[opt](load) use notify to replace polling for FlushToken #35796 (#36170)

cherry pick from #35796
This commit is contained in:
Xin Liao
2024-06-12 19:37:27 +08:00
committed by GitHub
parent 14ece32b87
commit ff517ab677
2 changed files with 14 additions and 11 deletions

View File

@ -95,6 +95,7 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
_stats.flush_running_count++;
}
return ret;
@ -103,16 +104,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
// NOTE: FlushToken's submit/cancel/wait run in one thread,
// so we don't need to make them mutually exclusive, std::atomic is enough.
void FlushToken::_wait_running_task_finish() {
while (true) {
int64_t flush_running_count = _stats.flush_running_count.load();
if (flush_running_count < 0) {
LOG(ERROR) << "flush_running_count < 0, this is not expected!";
}
if (flush_running_count == 0) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
}
void FlushToken::cancel() {
@ -155,7 +148,13 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
Defer defer {[&]() { _stats.flush_running_count--; }};
Defer defer {[&]() {
std::lock_guard<std::mutex> lock(_mutex);
_stats.flush_running_count--;
if (_stats.flush_running_count == 0) {
_cond.notify_one();
}
}};
if (_is_shutdown()) {
return;
}

View File

@ -18,6 +18,7 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iosfwd>
#include <memory>
@ -101,6 +102,9 @@ private:
std::atomic<bool> _shutdown = false;
ThreadPool* _thread_pool = nullptr;
std::mutex _mutex;
std::condition_variable _cond;
};
// MemTableFlushExecutor is responsible for flushing memtables to disk.