[improvement](compaction) start 1 cumu compaction thread each disk by default (#29430)

This commit is contained in:
Yongqiang YANG
2024-01-03 20:48:11 +08:00
committed by GitHub
parent d93812d23f
commit d6cb2d6d5c
5 changed files with 75 additions and 44 deletions

View File

@ -1528,7 +1528,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
tablet->published_count.fetch_add(1);
int64_t published_count = tablet->published_count.load();
if (published_count % 10 == 0) {
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
if (!st.ok()) [[unlikely]] {

View File

@ -365,8 +365,8 @@ DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760");
// This config can be set to limit thread number in compaction thread pool.
DEFINE_mInt32(max_base_compaction_threads, "4");
DEFINE_mInt32(max_cumu_compaction_threads, "10");
DEFINE_mInt32(max_single_replica_compaction_threads, "10");
DEFINE_mInt32(max_cumu_compaction_threads, "-1");
DEFINE_mInt32(max_single_replica_compaction_threads, "-1");
DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");

View File

@ -93,6 +93,33 @@ volatile uint32_t g_schema_change_active_threads = 0;
static const uint64_t DEFAULT_SEED = 104729;
static const uint64_t MOD_PRIME = 7652413;
static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) {
int32_t threads_num = config::max_cumu_compaction_threads;
if (threads_num == -1) {
threads_num = data_dirs_num;
}
threads_num = threads_num <= 0 ? 1 : threads_num;
return threads_num;
}
static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
int32_t threads_num = config::max_base_compaction_threads;
if (threads_num == -1) {
threads_num = data_dirs_num;
}
threads_num = threads_num <= 0 ? 1 : threads_num;
return threads_num;
}
static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
int32_t threads_num = config::max_single_replica_compaction_threads;
if (threads_num == -1) {
threads_num = data_dirs_num;
}
threads_num = threads_num <= 0 ? 1 : threads_num;
return threads_num;
}
Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "unused_rowset_monitor_thread",
@ -118,17 +145,22 @@ Status StorageEngine::start_bg_threads() {
data_dirs.push_back(tmp_store.second);
}
auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size());
auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size());
auto single_replica_compaction_threads =
get_single_replica_compaction_threads_num(data_dirs.size());
RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(config::max_base_compaction_threads)
.set_max_threads(config::max_base_compaction_threads)
.set_min_threads(base_compaction_threads)
.set_max_threads(base_compaction_threads)
.build(&_base_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
.set_min_threads(config::max_cumu_compaction_threads)
.set_max_threads(config::max_cumu_compaction_threads)
.set_min_threads(cumu_compaction_threads)
.set_max_threads(cumu_compaction_threads)
.build(&_cumu_compaction_thread_pool));
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
.set_min_threads(config::max_single_replica_compaction_threads)
.set_max_threads(config::max_single_replica_compaction_threads)
.set_min_threads(single_replica_compaction_threads)
.set_max_threads(single_replica_compaction_threads)
.build(&_single_replica_compaction_thread_pool));
if (config::enable_segcompaction) {
@ -458,64 +490,62 @@ void StorageEngine::_tablet_path_check_callback() {
}
void StorageEngine::_adjust_compaction_thread_num() {
if (_base_compaction_thread_pool->max_threads() != config::max_base_compaction_threads) {
auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
int old_max_threads = _base_compaction_thread_pool->max_threads();
Status status =
_base_compaction_thread_pool->set_max_threads(config::max_base_compaction_threads);
Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
<< " to " << config::max_base_compaction_threads;
<< " to " << base_compaction_threads_num;
}
}
if (_base_compaction_thread_pool->min_threads() != config::max_base_compaction_threads) {
if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) {
int old_min_threads = _base_compaction_thread_pool->min_threads();
Status status =
_base_compaction_thread_pool->set_min_threads(config::max_base_compaction_threads);
Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
<< " to " << config::max_base_compaction_threads;
<< " to " << base_compaction_threads_num;
}
}
if (_cumu_compaction_thread_pool->max_threads() != config::max_cumu_compaction_threads) {
auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size());
if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) {
int old_max_threads = _cumu_compaction_thread_pool->max_threads();
Status status =
_cumu_compaction_thread_pool->set_max_threads(config::max_cumu_compaction_threads);
Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
<< " to " << config::max_cumu_compaction_threads;
<< " to " << cumu_compaction_threads_num;
}
}
if (_cumu_compaction_thread_pool->min_threads() != config::max_cumu_compaction_threads) {
if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) {
int old_min_threads = _cumu_compaction_thread_pool->min_threads();
Status status =
_cumu_compaction_thread_pool->set_min_threads(config::max_cumu_compaction_threads);
Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
<< " to " << config::max_cumu_compaction_threads;
<< " to " << cumu_compaction_threads_num;
}
}
auto single_replica_compaction_threads_num =
get_single_replica_compaction_threads_num(_store_map.size());
if (_single_replica_compaction_thread_pool->max_threads() !=
config::max_single_replica_compaction_threads) {
single_replica_compaction_threads_num) {
int old_max_threads = _single_replica_compaction_thread_pool->max_threads();
Status status = _single_replica_compaction_thread_pool->set_max_threads(
config::max_single_replica_compaction_threads);
single_replica_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update single replica compaction thread pool max_threads from "
<< old_max_threads << " to "
<< config::max_single_replica_compaction_threads;
<< old_max_threads << " to " << single_replica_compaction_threads_num;
}
}
if (_single_replica_compaction_thread_pool->min_threads() !=
config::max_single_replica_compaction_threads) {
single_replica_compaction_threads_num) {
int old_min_threads = _single_replica_compaction_thread_pool->min_threads();
Status status = _single_replica_compaction_thread_pool->set_min_threads(
config::max_single_replica_compaction_threads);
single_replica_compaction_threads_num);
if (status.ok()) {
VLOG_NOTICE << "update single replica compaction thread pool min_threads from "
<< old_min_threads << " to "
<< config::max_single_replica_compaction_threads;
<< old_min_threads << " to " << single_replica_compaction_threads_num;
}
}
}

View File

@ -482,9 +482,9 @@ There are two ways to configure BE configuration items:
* Default value: 10485760
#### `max_base_compaction_threads`
git
* Type: int32
* Description: The maximum of thread number in base compaction thread pool.
* Description: The maximum of thread number in base compaction thread pool, -1 means one thread per disk.
* Default value: 4
#### `generate_compaction_tasks_interval_ms`
@ -625,8 +625,8 @@ BaseCompaction:546859:
#### `max_cumu_compaction_threads`
* Type: int32
* Description: The maximum of thread number in cumulative compaction thread pool.
* Default value: 10
* Description: The maximum of thread number in cumulative compaction thread pool, -1 means one thread per disk.
* Default value: -1
#### `enable_segcompaction`
@ -686,8 +686,8 @@ BaseCompaction:546859:
#### `max_single_replica_compaction_threads`
* Type: int32
* Description: The maximum of thread number in single replica compaction thread pool.
* Default value: 10
* Description: The maximum of thread number in single replica compaction thread pool. -1 means one thread per disk.
* Default value: -1
#### `update_replica_infos_interval_seconds`

View File

@ -498,7 +498,7 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下
#### `max_base_compaction_threads`
* 类型:int32
* 描述:Base Compaction线程池中线程数量的最大值。
* 描述:Base Compaction线程池中线程数量的最大值, -1 表示每个磁盘一个线程
* 默认值:4
#### `generate_compaction_tasks_interval_ms`
@ -639,8 +639,8 @@ BaseCompaction:546859:
#### `max_cumu_compaction_threads`
* 类型:int32
* 描述:Cumulative Compaction线程池中线程数量的最大值。
* 默认值:10
* 描述:Cumulative Compaction线程池中线程数量的最大值, -1 表示每个磁盘一个线程
* 默认值:-1
#### `enable_segcompaction`
@ -712,8 +712,8 @@ BaseCompaction:546859:
#### `max_single_replica_compaction_threads`
* 类型:int32
* 描述:Single Replica Compaction 线程池中线程数量的最大值。
* 默认值:10
* 描述:Single Replica Compaction 线程池中线程数量的最大值, -1 表示每个磁盘一个线程
* 默认值:-1
#### `update_replica_infos_interval_seconds`