Fix bug that memtable should be destroyed before finishing the load process (#1983)
The parent mem tracker may be release before visiting it in child mem tracker, which cause segfault.
This commit is contained in:
@ -132,6 +132,8 @@ void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) {
|
||||
|
||||
// if last flush of this tablet already failed, just skip
|
||||
if (ctx.flush_handler->is_cancelled()) {
|
||||
// must release memtable before notifying
|
||||
ctx.memtable.reset();
|
||||
ctx.flush_handler->on_flush_cancelled();
|
||||
continue;
|
||||
}
|
||||
@ -143,6 +145,8 @@ void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) {
|
||||
res.flush_status = ctx.memtable->flush();
|
||||
res.flush_time_ns = timer.elapsed_time();
|
||||
res.flush_size_bytes = ctx.memtable->memory_usage();
|
||||
// must release memtable before notifying
|
||||
ctx.memtable.reset();
|
||||
// callback
|
||||
ctx.flush_handler->on_flush_finished(res);
|
||||
}
|
||||
|
||||
@ -29,7 +29,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, MemTracker*
|
||||
// _last_updated_time should be set before being inserted to
|
||||
// _load_channels in load_channel_mgr, or it may be erased
|
||||
// immediately by gc thread.
|
||||
_last_updated_time = time(nullptr);
|
||||
_last_updated_time.store(time(nullptr));
|
||||
}
|
||||
|
||||
LoadChannel::~LoadChannel() {
|
||||
@ -57,7 +57,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
|
||||
RETURN_IF_ERROR(channel->open(params));
|
||||
|
||||
_opened = true;
|
||||
_last_updated_time = time(nullptr);
|
||||
_last_updated_time.store(time(nullptr));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -102,7 +102,7 @@ Status LoadChannel::add_batch(
|
||||
_finished_channel_ids.emplace(index_id);
|
||||
}
|
||||
}
|
||||
_last_updated_time = time(nullptr);
|
||||
_last_updated_time.store(time(nullptr));
|
||||
return st;
|
||||
}
|
||||
|
||||
|
||||
@ -53,7 +53,7 @@ public:
|
||||
// cancel this channel
|
||||
Status cancel();
|
||||
|
||||
time_t last_updated_time() const { return _last_updated_time; }
|
||||
time_t last_updated_time() const { return _last_updated_time.load(); }
|
||||
|
||||
const UniqueId& load_id() const { return _load_id; }
|
||||
|
||||
@ -84,7 +84,7 @@ private:
|
||||
// set to true if at least one tablets channel has been opened
|
||||
bool _opened = false;
|
||||
|
||||
time_t _last_updated_time;
|
||||
std::atomic<time_t> _last_updated_time;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@ -120,9 +120,9 @@ Status LoadChannelMgr::add_batch(
|
||||
_handle_mem_exceed_limit();
|
||||
|
||||
// 3. add batch to load channel
|
||||
if (request.has_row_batch()) {
|
||||
RETURN_IF_ERROR(channel->add_batch(request, tablet_vec));
|
||||
}
|
||||
// batch may not exist in request(eg: eos request without batch),
|
||||
// this case will be handled in load channel's add batch method.
|
||||
RETURN_IF_ERROR(channel->add_batch(request, tablet_vec));
|
||||
|
||||
// 4. handle finish
|
||||
if (channel->is_finished()) {
|
||||
@ -165,10 +165,20 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
|
||||
|
||||
Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
|
||||
UniqueId load_id(params.id());
|
||||
std::shared_ptr<LoadChannel> cancelled_channel;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_load_channels.erase(load_id);
|
||||
if (_load_channels.find(load_id) != _load_channels.end()) {
|
||||
cancelled_channel = _load_channels[load_id];
|
||||
_load_channels.erase(load_id);
|
||||
}
|
||||
}
|
||||
|
||||
if (cancelled_channel.get() != nullptr) {
|
||||
cancelled_channel->cancel();
|
||||
LOG(INFO) << "load channel has been cancelled: " << load_id;
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -179,7 +189,11 @@ Status LoadChannelMgr::_start_bg_worker() {
|
||||
ProfilerRegisterThread();
|
||||
#endif
|
||||
|
||||
#ifndef BE_TEST
|
||||
uint32_t interval = 60;
|
||||
#else
|
||||
uint32_t interval = 1;
|
||||
#endif
|
||||
while (!_is_stopped.load()) {
|
||||
_start_load_channels_clean();
|
||||
sleep(interval);
|
||||
|
||||
Reference in New Issue
Block a user