From ee5b79ac2b538272374be60313fc6adeab7cfcaf Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 15 Oct 2019 22:46:19 +0800 Subject: [PATCH] Fix bug that memtable should be destroyed before finishing the load process (#1983) The parent mem tracker may be release before visiting it in child mem tracker, which cause segfault. --- be/src/olap/memtable_flush_executor.cpp | 4 ++++ be/src/runtime/load_channel.cpp | 6 +++--- be/src/runtime/load_channel.h | 4 ++-- be/src/runtime/load_channel_mgr.cpp | 22 ++++++++++++++++++---- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 1aa25e9fea..728ebf0acc 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -132,6 +132,8 @@ void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) { // if last flush of this tablet already failed, just skip if (ctx.flush_handler->is_cancelled()) { + // must release memtable before notifying + ctx.memtable.reset(); ctx.flush_handler->on_flush_cancelled(); continue; } @@ -143,6 +145,8 @@ void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) { res.flush_status = ctx.memtable->flush(); res.flush_time_ns = timer.elapsed_time(); res.flush_size_bytes = ctx.memtable->memory_usage(); + // must release memtable before notifying + ctx.memtable.reset(); // callback ctx.flush_handler->on_flush_finished(res); } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 44877bb5ca..7f47069c7c 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -29,7 +29,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker* // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. - _last_updated_time = time(nullptr); + _last_updated_time.store(time(nullptr)); } LoadChannel::~LoadChannel() { @@ -57,7 +57,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { RETURN_IF_ERROR(channel->open(params)); _opened = true; - _last_updated_time = time(nullptr); + _last_updated_time.store(time(nullptr)); return Status::OK(); } @@ -102,7 +102,7 @@ Status LoadChannel::add_batch( _finished_channel_ids.emplace(index_id); } } - _last_updated_time = time(nullptr); + _last_updated_time.store(time(nullptr)); return st; } diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 9e317abc33..d6ddc1be94 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -53,7 +53,7 @@ public: // cancel this channel Status cancel(); - time_t last_updated_time() const { return _last_updated_time; } + time_t last_updated_time() const { return _last_updated_time.load(); } const UniqueId& load_id() const { return _load_id; } @@ -84,7 +84,7 @@ private: // set to true if at least one tablets channel has been opened bool _opened = false; - time_t _last_updated_time; + std::atomic _last_updated_time; }; } diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 181499a025..bb76ab1e84 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -120,9 +120,9 @@ Status LoadChannelMgr::add_batch( _handle_mem_exceed_limit(); // 3. add batch to load channel - if (request.has_row_batch()) { - RETURN_IF_ERROR(channel->add_batch(request, tablet_vec)); - } + // batch may not exist in request(eg: eos request without batch), + // this case will be handled in load channel's add batch method. + RETURN_IF_ERROR(channel->add_batch(request, tablet_vec)); // 4. handle finish if (channel->is_finished()) { @@ -165,10 +165,20 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { UniqueId load_id(params.id()); + std::shared_ptr cancelled_channel; { std::lock_guard l(_lock); - _load_channels.erase(load_id); + if (_load_channels.find(load_id) != _load_channels.end()) { + cancelled_channel = _load_channels[load_id]; + _load_channels.erase(load_id); + } } + + if (cancelled_channel.get() != nullptr) { + cancelled_channel->cancel(); + LOG(INFO) << "load channel has been cancelled: " << load_id; + } + return Status::OK(); } @@ -179,7 +189,11 @@ Status LoadChannelMgr::_start_bg_worker() { ProfilerRegisterThread(); #endif +#ifndef BE_TEST uint32_t interval = 60; +#else + uint32_t interval = 1; +#endif while (!_is_stopped.load()) { _start_load_channels_clean(); sleep(interval);