Change the compaction thread number based on disks number (#1161)

Add 2 BE configs: base_compaction_num_threads_per_disk and cumulative_compaction_num_threads_per_disk to control the number of threads per disks.
This commit is contained in:
yiguolei
2019-05-15 14:17:11 +08:00
committed by Mingyu Chen
parent 47fb206cdf
commit 758adce761
6 changed files with 38 additions and 24 deletions

View File

@ -237,6 +237,7 @@ namespace config {
CONF_Int32(base_compaction_check_interval_seconds, "60");
CONF_Int64(base_compaction_num_cumulative_deltas, "5");
CONF_Int32(base_compaction_num_threads, "1");
CONF_Int32(base_compaction_num_threads_per_disk, "1");
CONF_Double(base_cumulative_delta_ratio, "0.3");
CONF_Int64(base_compaction_interval_seconds_since_last_operation, "604800");
CONF_Int32(base_compaction_write_mbytes_per_sec, "5");
@ -245,6 +246,7 @@ namespace config {
CONF_Int32(cumulative_compaction_check_interval_seconds, "10");
CONF_Int64(cumulative_compaction_num_singleton_deltas, "5");
CONF_Int32(cumulative_compaction_num_threads, "1");
CONF_Int32(cumulative_compaction_num_threads_per_disk, "1");
CONF_Int64(cumulative_compaction_budgeted_bytes, "104857600");
CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100");

View File

@ -310,6 +310,7 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash,
vector<ColumnData*>* base_data_sources,
uint64_t* row_count) {
OlapStopWatch watch;
// 1. 生成新base文件对应的olap index
SegmentGroup* new_base = new (std::nothrow) SegmentGroup(_table.get(),
_new_base_version,
@ -399,7 +400,9 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
<< "source_rows=" << source_rows
<< ", merged_rows=" << merged_rows
<< ", filted_rows=" << filted_rows
<< ", new_index_rows=" << new_base->num_rows();
<< ", new_index_rows=" << new_base->num_rows()
<< ", merged_version_num=" << _need_merged_versions.size()
<< ", time_us=" << watch.get_elapse_time_us();
}
LOG(INFO) << "succeed to do base compaction. table=" << _table->full_name()

View File

@ -369,6 +369,7 @@ bool CumulativeCompaction::_find_previous_version(const Version current_version,
OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
OLAPStatus res = OLAP_SUCCESS;
OlapStopWatch watch;
Merger merger(_table, _new_segment_group, READER_CUMULATIVE_COMPACTION);
// 1. merge delta files into new cumulative file
@ -412,7 +413,9 @@ OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
LOG(INFO) << "all row nums. source_rows=" << source_rows
<< ", merged_rows=" << merged_rows
<< ", filted_rows=" << filted_rows
<< ", new_index_rows=" << _new_segment_group->num_rows();
<< ", new_index_rows=" << _new_segment_group->num_rows()
<< ", merged_version_num=" << _need_merged_versions.size()
<< ", time_us=" << watch.get_elapse_time_us();
}
// 3. add new cumulative file into table

View File

@ -1833,8 +1833,8 @@ void OLAPEngine::start_clean_fd_cache() {
VLOG(10) << "end clean file descritpor cache";
}
void OLAPEngine::perform_cumulative_compaction() {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION);
void OLAPEngine::perform_cumulative_compaction(OlapStore* store) {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION, store);
if (best_table == nullptr) { return; }
DorisMetrics::cumulative_compaction_request_total.increment(1);
@ -1863,8 +1863,8 @@ void OLAPEngine::perform_cumulative_compaction() {
best_table->set_last_compaction_failure_time(0);
}
void OLAPEngine::perform_base_compaction() {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION);
void OLAPEngine::perform_base_compaction(OlapStore* store) {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION, store);
if (best_table == nullptr) { return; }
DorisMetrics::base_compaction_request_total.increment(1);
@ -1893,14 +1893,15 @@ void OLAPEngine::perform_base_compaction() {
best_table->set_last_compaction_failure_time(0);
}
OLAPTablePtr OLAPEngine::_find_best_tablet_to_compaction(CompactionType compaction_type) {
OLAPTablePtr OLAPEngine::_find_best_tablet_to_compaction(CompactionType compaction_type, OlapStore* store) {
ReadLock tablet_map_rdlock(&_tablet_map_lock);
uint32_t highest_score = 0;
OLAPTablePtr best_table;
int64_t now = UnixMillis();
for (tablet_map_t::value_type& table_ins : _tablet_map){
for (OLAPTablePtr& table_ptr : table_ins.second.table_arr) {
if (!table_ptr->is_used() || !table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) {
if (table_ptr->store()->path_hash() != store->path_hash()
|| !table_ptr->is_used() || !table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) {
continue;
}

View File

@ -182,8 +182,8 @@ public:
OLAPStatus clear();
void start_clean_fd_cache();
void perform_cumulative_compaction();
void perform_base_compaction();
void perform_cumulative_compaction(OlapStore* store);
void perform_base_compaction(OlapStore* store);
// 获取cache的使用情况信息
void get_cache_status(rapidjson::Document* document) const;
@ -531,7 +531,7 @@ private:
OLAPStatus _check_existed_or_else_create_dir(const std::string& path);
OLAPTablePtr _find_best_tablet_to_compaction(CompactionType compaction_type);
OLAPTablePtr _find_best_tablet_to_compaction(CompactionType compaction_type, OlapStore* store);
bool _can_do_compaction(OLAPTablePtr table);
void _cancel_unfinished_schema_change();
@ -588,7 +588,7 @@ private:
// Thread functions
// base compaction thread process function
void* _base_compaction_thread_callback(void* arg);
void* _base_compaction_thread_callback(void* arg, OlapStore* store);
// garbage sweep thread process function. clear snapshot and trash folder
void* _garbage_sweeper_thread_callback(void* arg);
@ -600,7 +600,7 @@ private:
void* _unused_index_thread_callback(void* arg);
// cumulative process function
void* _cumulative_compaction_thread_callback(void* arg);
void* _cumulative_compaction_thread_callback(void* arg, OlapStore* store);
// clean file descriptors cache
void* _fd_cache_clean_callback(void* arg);

View File

@ -57,23 +57,28 @@ OLAPStatus OLAPEngine::_start_bg_worker() {
[this] {
_unused_index_thread_callback(nullptr);
});
// convert store map to vector
std::vector<OlapStore*> store_vec;
for (auto& tmp_store : _store_map) {
store_vec.push_back(tmp_store.second);
}
int32_t store_num = store_vec.size();
// start be and ce threads for merge data
int32_t base_compaction_num_threads = config::base_compaction_num_threads;
int32_t base_compaction_num_threads = config::base_compaction_num_threads_per_disk * store_num;
_base_compaction_threads.reserve(base_compaction_num_threads);
for (uint32_t i = 0; i < base_compaction_num_threads; ++i) {
_base_compaction_threads.emplace_back(
[this] {
_base_compaction_thread_callback(nullptr);
[this, store_num, store_vec, i] {
_base_compaction_thread_callback(nullptr, store_vec[i % store_num]);
});
}
int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads;
int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads_per_disk * store_num;
_cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
_cumulative_compaction_threads.emplace_back(
[this] {
_cumulative_compaction_thread_callback(nullptr);
[this, store_num, store_vec, i] {
_cumulative_compaction_thread_callback(nullptr, store_vec[i % store_num]);
});
}
@ -104,7 +109,7 @@ void* OLAPEngine::_fd_cache_clean_callback(void* arg) {
return NULL;
}
void* OLAPEngine::_base_compaction_thread_callback(void* arg) {
void* OLAPEngine::_base_compaction_thread_callback(void* arg, OlapStore* store) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
@ -122,7 +127,7 @@ void* OLAPEngine::_base_compaction_thread_callback(void* arg) {
// cgroup is not initialized at this time
// add tid to cgroup
CgroupsMgr::apply_system_cgroup();
perform_base_compaction();
perform_base_compaction(store);
usleep(interval * 1000000);
}
@ -218,7 +223,7 @@ void* OLAPEngine::_unused_index_thread_callback(void* arg) {
return NULL;
}
void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) {
void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg, OlapStore* store) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
@ -235,7 +240,7 @@ void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) {
// cgroup is not initialized at this time
// add tid to cgroup
CgroupsMgr::apply_system_cgroup();
perform_cumulative_compaction();
perform_cumulative_compaction(store);
usleep(interval * 1000000);
}