From 90c2da54bd1189ad899de19cd08dac61a032815f Mon Sep 17 00:00:00 2001 From: weizuo93 <68884553+weizuo93@users.noreply.github.com> Date: Sat, 30 Jan 2021 16:35:36 +0800 Subject: [PATCH] [Bug] Fix bug and add graceful exit for compaction producer (#5124) 1. add graceful exit mechanism for the compaction producer thread. 2. if compaction task submits unsuccessfully, the compaction task should pop from `_tablet_submitted_compaction`. --- be/src/common/config.h | 3 +++ be/src/olap/olap_server.cpp | 50 +++++++++++++++++++++--------------- be/src/olap/storage_engine.h | 2 ++ 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 4d78596417..e978fc856f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -283,6 +283,9 @@ CONF_mInt32(max_compaction_threads, "10"); // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction. CONF_mInt64(total_permits_for_compaction_score, "10000"); +// sleep interval in ms after generated compaction tasks +CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10") + // Compaction task number per disk. CONF_mInt32(compaction_task_num_per_disk, "2"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 2026150732..19d2865075 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -324,7 +324,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { int round = 0; CompactionType compaction_type; - int32_t interval = 1; + + int64_t interval = config::generate_compaction_tasks_min_interval_ms; do { if (!config::disable_auto_compaction) { if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { @@ -337,8 +338,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { std::vector tablets_compaction = _compaction_tasks_generator(compaction_type, data_dirs); if (tablets_compaction.size() == 0) { - _wakeup_producer_flag = 0; std::unique_lock lock(_compaction_producer_sleep_mutex); + _wakeup_producer_flag = 0; // It is necessary to wake up the thread on timeout to prevent deadlock // in case of no running compaction task. _compaction_producer_sleep_cv.wait_for(lock, std::chrono::milliseconds(2000), @@ -354,32 +355,19 @@ void StorageEngine::_compaction_tasks_producer_callback() { for (const auto& tablet : tablets_compaction) { int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet); if (permits > 0 && _permit_limiter.request(permits)) { - { - // Push to _tablet_submitted_compaction before submitting task - std::unique_lock lock(_tablet_submitted_compaction_mutex); - _tablet_submitted_compaction[tablet->data_dir()].emplace_back( - tablet->tablet_id()); - } + // Push to _tablet_submitted_compaction before submitting task + _push_tablet_into_submitted_compaction(tablet); auto st =_compaction_thread_pool->submit_func([=]() { CgroupsMgr::apply_system_cgroup(); tablet->execute_compaction(compaction_type); _permit_limiter.release(permits); - std::unique_lock lock(_tablet_submitted_compaction_mutex); - std::vector::iterator it_tablet = - find(_tablet_submitted_compaction[tablet->data_dir()].begin(), - _tablet_submitted_compaction[tablet->data_dir()].end(), - tablet->tablet_id()); - if (it_tablet != - _tablet_submitted_compaction[tablet->data_dir()].end()) { - _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); - _wakeup_producer_flag = 1; - _compaction_producer_sleep_cv.notify_one(); - } + _pop_tablet_from_submitted_compaction(tablet); // reset compaction tablet->reset_compaction(compaction_type); }); if (!st.ok()) { _permit_limiter.release(permits); + _pop_tablet_from_submitted_compaction(tablet); // reset compaction tablet->reset_compaction(compaction_type); } @@ -388,7 +376,7 @@ void StorageEngine::_compaction_tasks_producer_callback() { tablet->reset_compaction(compaction_type); } } - interval = 1; + interval = config::generate_compaction_tasks_min_interval_ms; } else { interval = config::check_auto_compaction_interval_seconds * 1000; } @@ -414,4 +402,26 @@ std::vector StorageEngine::_compaction_tasks_generator( } return tablets_compaction; } + +void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + _tablet_submitted_compaction[tablet->data_dir()].emplace_back( + tablet->tablet_id()); +} + +void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + std::vector::iterator it_tablet = + find(_tablet_submitted_compaction[tablet->data_dir()].begin(), + _tablet_submitted_compaction[tablet->data_dir()].end(), + tablet->tablet_id()); + if (it_tablet != + _tablet_submitted_compaction[tablet->data_dir()].end()) { + _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); + std::unique_lock lock(_compaction_producer_sleep_mutex); + _wakeup_producer_flag = 1; + _compaction_producer_sleep_cv.notify_one(); + } +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index e5aaf4e939..04964c9602 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -239,6 +239,8 @@ private: void _compaction_tasks_producer_callback(); vector _compaction_tasks_generator(CompactionType compaction_type, std::vector data_dirs); + void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet); + void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet); private: struct CompactionCandidate {