diff --git a/be/src/common/config.h b/be/src/common/config.h index 08ac3a2104..fff9e60d3e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -237,6 +237,7 @@ namespace config { CONF_Int32(base_compaction_check_interval_seconds, "60"); CONF_Int64(base_compaction_num_cumulative_deltas, "5"); CONF_Int32(base_compaction_num_threads, "1"); + CONF_Int32(base_compaction_num_threads_per_disk, "1"); CONF_Double(base_cumulative_delta_ratio, "0.3"); CONF_Int64(base_compaction_interval_seconds_since_last_operation, "604800"); CONF_Int32(base_compaction_write_mbytes_per_sec, "5"); @@ -245,6 +246,7 @@ namespace config { CONF_Int32(cumulative_compaction_check_interval_seconds, "10"); CONF_Int64(cumulative_compaction_num_singleton_deltas, "5"); CONF_Int32(cumulative_compaction_num_threads, "1"); + CONF_Int32(cumulative_compaction_num_threads_per_disk, "1"); CONF_Int64(cumulative_compaction_budgeted_bytes, "104857600"); CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100"); diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 52332d8c42..ea98c00080 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -310,6 +310,7 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger, OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash, vector* base_data_sources, uint64_t* row_count) { + OlapStopWatch watch; // 1. 生成新base文件对应的olap index SegmentGroup* new_base = new (std::nothrow) SegmentGroup(_table.get(), _new_base_version, @@ -399,7 +400,9 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash << "source_rows=" << source_rows << ", merged_rows=" << merged_rows << ", filted_rows=" << filted_rows - << ", new_index_rows=" << new_base->num_rows(); + << ", new_index_rows=" << new_base->num_rows() + << ", merged_version_num=" << _need_merged_versions.size() + << ", time_us=" << watch.get_elapse_time_us(); } LOG(INFO) << "succeed to do base compaction. table=" << _table->full_name() diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index ba43b87d32..6a208bb13f 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -369,6 +369,7 @@ bool CumulativeCompaction::_find_previous_version(const Version current_version, OLAPStatus CumulativeCompaction::_do_cumulative_compaction() { OLAPStatus res = OLAP_SUCCESS; + OlapStopWatch watch; Merger merger(_table, _new_segment_group, READER_CUMULATIVE_COMPACTION); // 1. merge delta files into new cumulative file @@ -412,7 +413,9 @@ OLAPStatus CumulativeCompaction::_do_cumulative_compaction() { LOG(INFO) << "all row nums. source_rows=" << source_rows << ", merged_rows=" << merged_rows << ", filted_rows=" << filted_rows - << ", new_index_rows=" << _new_segment_group->num_rows(); + << ", new_index_rows=" << _new_segment_group->num_rows() + << ", merged_version_num=" << _need_merged_versions.size() + << ", time_us=" << watch.get_elapse_time_us(); } // 3. add new cumulative file into table diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index 75afc59320..19a62d7c8e 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -1833,8 +1833,8 @@ void OLAPEngine::start_clean_fd_cache() { VLOG(10) << "end clean file descritpor cache"; } -void OLAPEngine::perform_cumulative_compaction() { - OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION); +void OLAPEngine::perform_cumulative_compaction(OlapStore* store) { + OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION, store); if (best_table == nullptr) { return; } DorisMetrics::cumulative_compaction_request_total.increment(1); @@ -1863,8 +1863,8 @@ void OLAPEngine::perform_cumulative_compaction() { best_table->set_last_compaction_failure_time(0); } -void OLAPEngine::perform_base_compaction() { - OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION); +void OLAPEngine::perform_base_compaction(OlapStore* store) { + OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION, store); if (best_table == nullptr) { return; } DorisMetrics::base_compaction_request_total.increment(1); @@ -1893,14 +1893,15 @@ void OLAPEngine::perform_base_compaction() { best_table->set_last_compaction_failure_time(0); } -OLAPTablePtr OLAPEngine::_find_best_tablet_to_compaction(CompactionType compaction_type) { +OLAPTablePtr OLAPEngine::_find_best_tablet_to_compaction(CompactionType compaction_type, OlapStore* store) { ReadLock tablet_map_rdlock(&_tablet_map_lock); uint32_t highest_score = 0; OLAPTablePtr best_table; int64_t now = UnixMillis(); for (tablet_map_t::value_type& table_ins : _tablet_map){ for (OLAPTablePtr& table_ptr : table_ins.second.table_arr) { - if (!table_ptr->is_used() || !table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) { + if (table_ptr->store()->path_hash() != store->path_hash() + || !table_ptr->is_used() || !table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) { continue; } diff --git a/be/src/olap/olap_engine.h b/be/src/olap/olap_engine.h index 8928571b35..c8cc693f99 100644 --- a/be/src/olap/olap_engine.h +++ b/be/src/olap/olap_engine.h @@ -182,8 +182,8 @@ public: OLAPStatus clear(); void start_clean_fd_cache(); - void perform_cumulative_compaction(); - void perform_base_compaction(); + void perform_cumulative_compaction(OlapStore* store); + void perform_base_compaction(OlapStore* store); // 获取cache的使用情况信息 void get_cache_status(rapidjson::Document* document) const; @@ -531,7 +531,7 @@ private: OLAPStatus _check_existed_or_else_create_dir(const std::string& path); - OLAPTablePtr _find_best_tablet_to_compaction(CompactionType compaction_type); + OLAPTablePtr _find_best_tablet_to_compaction(CompactionType compaction_type, OlapStore* store); bool _can_do_compaction(OLAPTablePtr table); void _cancel_unfinished_schema_change(); @@ -588,7 +588,7 @@ private: // Thread functions // base compaction thread process function - void* _base_compaction_thread_callback(void* arg); + void* _base_compaction_thread_callback(void* arg, OlapStore* store); // garbage sweep thread process function. clear snapshot and trash folder void* _garbage_sweeper_thread_callback(void* arg); @@ -600,7 +600,7 @@ private: void* _unused_index_thread_callback(void* arg); // cumulative process function - void* _cumulative_compaction_thread_callback(void* arg); + void* _cumulative_compaction_thread_callback(void* arg, OlapStore* store); // clean file descriptors cache void* _fd_cache_clean_callback(void* arg); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index b236caeb3a..4cd45e1fa8 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -57,23 +57,28 @@ OLAPStatus OLAPEngine::_start_bg_worker() { [this] { _unused_index_thread_callback(nullptr); }); - + // convert store map to vector + std::vector store_vec; + for (auto& tmp_store : _store_map) { + store_vec.push_back(tmp_store.second); + } + int32_t store_num = store_vec.size(); // start be and ce threads for merge data - int32_t base_compaction_num_threads = config::base_compaction_num_threads; + int32_t base_compaction_num_threads = config::base_compaction_num_threads_per_disk * store_num; _base_compaction_threads.reserve(base_compaction_num_threads); for (uint32_t i = 0; i < base_compaction_num_threads; ++i) { _base_compaction_threads.emplace_back( - [this] { - _base_compaction_thread_callback(nullptr); + [this, store_num, store_vec, i] { + _base_compaction_thread_callback(nullptr, store_vec[i % store_num]); }); } - int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads; + int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads_per_disk * store_num; _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads); for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) { _cumulative_compaction_threads.emplace_back( - [this] { - _cumulative_compaction_thread_callback(nullptr); + [this, store_num, store_vec, i] { + _cumulative_compaction_thread_callback(nullptr, store_vec[i % store_num]); }); } @@ -104,7 +109,7 @@ void* OLAPEngine::_fd_cache_clean_callback(void* arg) { return NULL; } -void* OLAPEngine::_base_compaction_thread_callback(void* arg) { +void* OLAPEngine::_base_compaction_thread_callback(void* arg, OlapStore* store) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif @@ -122,7 +127,7 @@ void* OLAPEngine::_base_compaction_thread_callback(void* arg) { // cgroup is not initialized at this time // add tid to cgroup CgroupsMgr::apply_system_cgroup(); - perform_base_compaction(); + perform_base_compaction(store); usleep(interval * 1000000); } @@ -218,7 +223,7 @@ void* OLAPEngine::_unused_index_thread_callback(void* arg) { return NULL; } -void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) { +void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg, OlapStore* store) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif @@ -235,7 +240,7 @@ void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) { // cgroup is not initialized at this time // add tid to cgroup CgroupsMgr::apply_system_cgroup(); - perform_cumulative_compaction(); + perform_cumulative_compaction(store); usleep(interval * 1000000); }