From d6cb2d6d5c0ccbc454a801d7ef68c7241bb3cd6c Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 3 Jan 2024 20:48:11 +0800 Subject: [PATCH] [improvement](compaction) start 1 cumu compaction thread each disk by default (#29430) --- be/src/agent/task_worker_pool.cpp | 3 +- be/src/common/config.cpp | 4 +- be/src/olap/olap_server.cpp | 90 ++++++++++++------- docs/en/docs/admin-manual/config/be-config.md | 12 +-- .../docs/admin-manual/config/be-config.md | 10 +-- 5 files changed, 75 insertions(+), 44 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f9d43ba231..a845b6253f 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1528,7 +1528,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) { tablet->published_count.fetch_add(1); int64_t published_count = tablet->published_count.load(); - if (published_count % 10 == 0) { + if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) && + published_count % 20 == 0) { auto st = _engine.submit_compaction_task( tablet, CompactionType::CUMULATIVE_COMPACTION, true); if (!st.ok()) [[unlikely]] { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 040fc5ae21..a53885bd13 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -365,8 +365,8 @@ DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760"); // This config can be set to limit thread number in compaction thread pool. DEFINE_mInt32(max_base_compaction_threads, "4"); -DEFINE_mInt32(max_cumu_compaction_threads, "10"); -DEFINE_mInt32(max_single_replica_compaction_threads, "10"); +DEFINE_mInt32(max_cumu_compaction_threads, "-1"); +DEFINE_mInt32(max_single_replica_compaction_threads, "-1"); DEFINE_Bool(enable_base_compaction_idle_sched, "true"); DEFINE_mInt64(base_compaction_min_rowset_num, "5"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 9ee24dee19..60c309584a 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -93,6 +93,33 @@ volatile uint32_t g_schema_change_active_threads = 0; static const uint64_t DEFAULT_SEED = 104729; static const uint64_t MOD_PRIME = 7652413; +static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) { + int32_t threads_num = config::max_cumu_compaction_threads; + if (threads_num == -1) { + threads_num = data_dirs_num; + } + threads_num = threads_num <= 0 ? 1 : threads_num; + return threads_num; +} + +static int32_t get_base_compaction_threads_num(size_t data_dirs_num) { + int32_t threads_num = config::max_base_compaction_threads; + if (threads_num == -1) { + threads_num = data_dirs_num; + } + threads_num = threads_num <= 0 ? 1 : threads_num; + return threads_num; +} + +static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) { + int32_t threads_num = config::max_single_replica_compaction_threads; + if (threads_num == -1) { + threads_num = data_dirs_num; + } + threads_num = threads_num <= 0 ? 1 : threads_num; + return threads_num; +} + Status StorageEngine::start_bg_threads() { RETURN_IF_ERROR(Thread::create( "StorageEngine", "unused_rowset_monitor_thread", @@ -118,17 +145,22 @@ Status StorageEngine::start_bg_threads() { data_dirs.push_back(tmp_store.second); } + auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size()); + auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size()); + auto single_replica_compaction_threads = + get_single_replica_compaction_threads_num(data_dirs.size()); + RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") - .set_min_threads(config::max_base_compaction_threads) - .set_max_threads(config::max_base_compaction_threads) + .set_min_threads(base_compaction_threads) + .set_max_threads(base_compaction_threads) .build(&_base_compaction_thread_pool)); RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") - .set_min_threads(config::max_cumu_compaction_threads) - .set_max_threads(config::max_cumu_compaction_threads) + .set_min_threads(cumu_compaction_threads) + .set_max_threads(cumu_compaction_threads) .build(&_cumu_compaction_thread_pool)); RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool") - .set_min_threads(config::max_single_replica_compaction_threads) - .set_max_threads(config::max_single_replica_compaction_threads) + .set_min_threads(single_replica_compaction_threads) + .set_max_threads(single_replica_compaction_threads) .build(&_single_replica_compaction_thread_pool)); if (config::enable_segcompaction) { @@ -458,64 +490,62 @@ void StorageEngine::_tablet_path_check_callback() { } void StorageEngine::_adjust_compaction_thread_num() { - if (_base_compaction_thread_pool->max_threads() != config::max_base_compaction_threads) { + auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size()); + if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) { int old_max_threads = _base_compaction_thread_pool->max_threads(); - Status status = - _base_compaction_thread_pool->set_max_threads(config::max_base_compaction_threads); + Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num); if (status.ok()) { VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads - << " to " << config::max_base_compaction_threads; + << " to " << base_compaction_threads_num; } } - if (_base_compaction_thread_pool->min_threads() != config::max_base_compaction_threads) { + if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) { int old_min_threads = _base_compaction_thread_pool->min_threads(); - Status status = - _base_compaction_thread_pool->set_min_threads(config::max_base_compaction_threads); + Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num); if (status.ok()) { VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads - << " to " << config::max_base_compaction_threads; + << " to " << base_compaction_threads_num; } } - if (_cumu_compaction_thread_pool->max_threads() != config::max_cumu_compaction_threads) { + auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size()); + if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) { int old_max_threads = _cumu_compaction_thread_pool->max_threads(); - Status status = - _cumu_compaction_thread_pool->set_max_threads(config::max_cumu_compaction_threads); + Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num); if (status.ok()) { VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads - << " to " << config::max_cumu_compaction_threads; + << " to " << cumu_compaction_threads_num; } } - if (_cumu_compaction_thread_pool->min_threads() != config::max_cumu_compaction_threads) { + if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) { int old_min_threads = _cumu_compaction_thread_pool->min_threads(); - Status status = - _cumu_compaction_thread_pool->set_min_threads(config::max_cumu_compaction_threads); + Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num); if (status.ok()) { VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads - << " to " << config::max_cumu_compaction_threads; + << " to " << cumu_compaction_threads_num; } } + auto single_replica_compaction_threads_num = + get_single_replica_compaction_threads_num(_store_map.size()); if (_single_replica_compaction_thread_pool->max_threads() != - config::max_single_replica_compaction_threads) { + single_replica_compaction_threads_num) { int old_max_threads = _single_replica_compaction_thread_pool->max_threads(); Status status = _single_replica_compaction_thread_pool->set_max_threads( - config::max_single_replica_compaction_threads); + single_replica_compaction_threads_num); if (status.ok()) { VLOG_NOTICE << "update single replica compaction thread pool max_threads from " - << old_max_threads << " to " - << config::max_single_replica_compaction_threads; + << old_max_threads << " to " << single_replica_compaction_threads_num; } } if (_single_replica_compaction_thread_pool->min_threads() != - config::max_single_replica_compaction_threads) { + single_replica_compaction_threads_num) { int old_min_threads = _single_replica_compaction_thread_pool->min_threads(); Status status = _single_replica_compaction_thread_pool->set_min_threads( - config::max_single_replica_compaction_threads); + single_replica_compaction_threads_num); if (status.ok()) { VLOG_NOTICE << "update single replica compaction thread pool min_threads from " - << old_min_threads << " to " - << config::max_single_replica_compaction_threads; + << old_min_threads << " to " << single_replica_compaction_threads_num; } } } diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 1b8ee9423d..ec53e9a811 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -482,9 +482,9 @@ There are two ways to configure BE configuration items: * Default value: 10485760 #### `max_base_compaction_threads` - +git * Type: int32 -* Description: The maximum of thread number in base compaction thread pool. +* Description: The maximum of thread number in base compaction thread pool, -1 means one thread per disk. * Default value: 4 #### `generate_compaction_tasks_interval_ms` @@ -625,8 +625,8 @@ BaseCompaction:546859: #### `max_cumu_compaction_threads` * Type: int32 -* Description: The maximum of thread number in cumulative compaction thread pool. -* Default value: 10 +* Description: The maximum of thread number in cumulative compaction thread pool, -1 means one thread per disk. +* Default value: -1 #### `enable_segcompaction` @@ -686,8 +686,8 @@ BaseCompaction:546859: #### `max_single_replica_compaction_threads` * Type: int32 -* Description: The maximum of thread number in single replica compaction thread pool. -* Default value: 10 +* Description: The maximum of thread number in single replica compaction thread pool. -1 means one thread per disk. +* Default value: -1 #### `update_replica_infos_interval_seconds` diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 7b99c1f2cb..db4e6764a2 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -498,7 +498,7 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下 #### `max_base_compaction_threads` * 类型:int32 -* 描述:Base Compaction线程池中线程数量的最大值。 +* 描述:Base Compaction线程池中线程数量的最大值, -1 表示每个磁盘一个线程。 * 默认值:4 #### `generate_compaction_tasks_interval_ms` @@ -639,8 +639,8 @@ BaseCompaction:546859: #### `max_cumu_compaction_threads` * 类型:int32 -* 描述:Cumulative Compaction线程池中线程数量的最大值。 -* 默认值:10 +* 描述:Cumulative Compaction线程池中线程数量的最大值, -1 表示每个磁盘一个线程。 +* 默认值:-1 #### `enable_segcompaction` @@ -712,8 +712,8 @@ BaseCompaction:546859: #### `max_single_replica_compaction_threads` * 类型:int32 -* 描述:Single Replica Compaction 线程池中线程数量的最大值。 -* 默认值:10 +* 描述:Single Replica Compaction 线程池中线程数量的最大值, -1 表示每个磁盘一个线程。 +* 默认值:-1 #### `update_replica_infos_interval_seconds`