diff --git a/be/src/common/config.h b/be/src/common/config.h index 4d78596417..e978fc856f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -283,6 +283,9 @@ CONF_mInt32(max_compaction_threads, "10"); // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction. CONF_mInt64(total_permits_for_compaction_score, "10000"); +// sleep interval in ms after generated compaction tasks +CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10") + // Compaction task number per disk. CONF_mInt32(compaction_task_num_per_disk, "2"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 2026150732..19d2865075 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -324,7 +324,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { int round = 0; CompactionType compaction_type; - int32_t interval = 1; + + int64_t interval = config::generate_compaction_tasks_min_interval_ms; do { if (!config::disable_auto_compaction) { if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { @@ -337,8 +338,8 @@ void StorageEngine::_compaction_tasks_producer_callback() { std::vector tablets_compaction = _compaction_tasks_generator(compaction_type, data_dirs); if (tablets_compaction.size() == 0) { - _wakeup_producer_flag = 0; std::unique_lock lock(_compaction_producer_sleep_mutex); + _wakeup_producer_flag = 0; // It is necessary to wake up the thread on timeout to prevent deadlock // in case of no running compaction task. _compaction_producer_sleep_cv.wait_for(lock, std::chrono::milliseconds(2000), @@ -354,32 +355,19 @@ void StorageEngine::_compaction_tasks_producer_callback() { for (const auto& tablet : tablets_compaction) { int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet); if (permits > 0 && _permit_limiter.request(permits)) { - { - // Push to _tablet_submitted_compaction before submitting task - std::unique_lock lock(_tablet_submitted_compaction_mutex); - _tablet_submitted_compaction[tablet->data_dir()].emplace_back( - tablet->tablet_id()); - } + // Push to _tablet_submitted_compaction before submitting task + _push_tablet_into_submitted_compaction(tablet); auto st =_compaction_thread_pool->submit_func([=]() { CgroupsMgr::apply_system_cgroup(); tablet->execute_compaction(compaction_type); _permit_limiter.release(permits); - std::unique_lock lock(_tablet_submitted_compaction_mutex); - std::vector::iterator it_tablet = - find(_tablet_submitted_compaction[tablet->data_dir()].begin(), - _tablet_submitted_compaction[tablet->data_dir()].end(), - tablet->tablet_id()); - if (it_tablet != - _tablet_submitted_compaction[tablet->data_dir()].end()) { - _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); - _wakeup_producer_flag = 1; - _compaction_producer_sleep_cv.notify_one(); - } + _pop_tablet_from_submitted_compaction(tablet); // reset compaction tablet->reset_compaction(compaction_type); }); if (!st.ok()) { _permit_limiter.release(permits); + _pop_tablet_from_submitted_compaction(tablet); // reset compaction tablet->reset_compaction(compaction_type); } @@ -388,7 +376,7 @@ void StorageEngine::_compaction_tasks_producer_callback() { tablet->reset_compaction(compaction_type); } } - interval = 1; + interval = config::generate_compaction_tasks_min_interval_ms; } else { interval = config::check_auto_compaction_interval_seconds * 1000; } @@ -414,4 +402,26 @@ std::vector StorageEngine::_compaction_tasks_generator( } return tablets_compaction; } + +void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + _tablet_submitted_compaction[tablet->data_dir()].emplace_back( + tablet->tablet_id()); +} + +void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) { + std::unique_lock lock(_tablet_submitted_compaction_mutex); + std::vector::iterator it_tablet = + find(_tablet_submitted_compaction[tablet->data_dir()].begin(), + _tablet_submitted_compaction[tablet->data_dir()].end(), + tablet->tablet_id()); + if (it_tablet != + _tablet_submitted_compaction[tablet->data_dir()].end()) { + _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); + std::unique_lock lock(_compaction_producer_sleep_mutex); + _wakeup_producer_flag = 1; + _compaction_producer_sleep_cv.notify_one(); + } +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index e5aaf4e939..04964c9602 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -239,6 +239,8 @@ private: void _compaction_tasks_producer_callback(); vector _compaction_tasks_generator(CompactionType compaction_type, std::vector data_dirs); + void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet); + void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet); private: struct CompactionCandidate {