diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 1ebe44aabf..247bf0ca81 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -95,6 +95,7 @@ Status FlushToken::submit(std::unique_ptr mem_table) { this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time); Status ret = _thread_pool->submit(std::move(task)); if (ret.ok()) { + // _wait_running_task_finish was executed after this function, so no need to notify _cond here _stats.flush_running_count++; } return ret; @@ -103,16 +104,8 @@ Status FlushToken::submit(std::unique_ptr mem_table) { // NOTE: FlushToken's submit/cancel/wait run in one thread, // so we don't need to make them mutually exclusive, std::atomic is enough. void FlushToken::_wait_running_task_finish() { - while (true) { - int64_t flush_running_count = _stats.flush_running_count.load(); - if (flush_running_count < 0) { - LOG(ERROR) << "flush_running_count < 0, this is not expected!"; - } - if (flush_running_count == 0) { - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } + std::unique_lock lock(_mutex); + _cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; }); } void FlushToken::cancel() { @@ -155,7 +148,13 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in void FlushToken::_flush_memtable(std::unique_ptr memtable_ptr, int32_t segment_id, int64_t submit_task_time) { - Defer defer {[&]() { _stats.flush_running_count--; }}; + Defer defer {[&]() { + std::lock_guard lock(_mutex); + _stats.flush_running_count--; + if (_stats.flush_running_count == 0) { + _cond.notify_one(); + } + }}; if (_is_shutdown()) { return; } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 9896b8382d..1576e68fc7 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -101,6 +102,9 @@ private: std::atomic _shutdown = false; ThreadPool* _thread_pool = nullptr; + + std::mutex _mutex; + std::condition_variable _cond; }; // MemTableFlushExecutor is responsible for flushing memtables to disk.