fix compaction parallel count

This commit is contained in:
Fengjingkun
2024-01-12 08:42:46 +00:00
committed by ob-robot
parent 41776006da
commit dd4a0bd926
2 changed files with 26 additions and 21 deletions

View File

@ -1446,9 +1446,9 @@ inline bool is_reserve_mode()
})
constexpr uint64_t operator "" _percentage(unsigned long long percentage)
constexpr double operator "" _percentage(unsigned long long percentage)
{
return percentage / 100;
return (percentage + 0.0) / 100;
}
#define ADAPTIVE_PERCENT 40_percentage

View File

@ -279,12 +279,10 @@ int ObParallelMergeCtx::init_parallel_mini_merge(compaction::ObBasicTabletMergeC
}
if (OB_SUCC(ret)) {
if (MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) {
calc_adaptive_parallel_degree(ObDagPrio::DAG_PRIO_COMPACTION_HIGH,
ObCompactionEstimator::MINI_MEM_PER_THREAD,
(total_bytes + ObCompactionEstimator::MINI_PARALLEL_BASE_MEM - 1) / ObCompactionEstimator::MINI_PARALLEL_BASE_MEM,
concurrent_cnt_);
}
calc_adaptive_parallel_degree(ObDagPrio::DAG_PRIO_COMPACTION_HIGH,
ObCompactionEstimator::MINI_MEM_PER_THREAD,
(total_bytes + ObCompactionEstimator::MINI_PARALLEL_BASE_MEM - 1) / ObCompactionEstimator::MINI_PARALLEL_BASE_MEM,
concurrent_cnt_);
ObArray<ObStoreRange> store_ranges;
store_ranges.set_attr(lib::ObMemAttr(MTL_ID(), "TmpMiniRanges", ObCtxIds::MERGE_NORMAL_CTX_ID));
@ -347,7 +345,7 @@ int ObParallelMergeCtx::init_parallel_mini_minor_merge(compaction::ObBasicTablet
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "Invalid argument to calc mini minor parallel degree", K(ret), K(tablet_size),
K(range_info), K(tables.count()));
} else if (MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) {
} else {
calc_adaptive_parallel_degree(ObDagPrio::DAG_PRIO_COMPACTION_MID,
ObCompactionEstimator::MINOR_MEM_PER_THREAD,
(range_info.total_size_ / tables.count() + tablet_size - 1) / tablet_size,
@ -392,20 +390,27 @@ void ObParallelMergeCtx::calc_adaptive_parallel_degree(
const int64_t origin_degree,
int64_t &parallel_degree)
{
int64_t worker_thread_cnt = 0;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(MTL(ObTenantDagScheduler *)->get_limit(prio, worker_thread_cnt))) {
worker_thread_cnt = ObCompactionEstimator::DEFAULT_MERGE_THREAD_CNT;
STORAGE_LOG_RET(WARN, tmp_ret, "failed to get worker thread cnt, use dfault value", K(prio), K(worker_thread_cnt));
}
worker_thread_cnt = MAX(worker_thread_cnt, PARALLEL_MERGE_TARGET_TASK_CNT);
int64_t dag_worker_limit = 0;
int64_t mem_allow_max_thread_cnt = lib::get_tenant_memory_remain(MTL_ID()) * ADAPTIVE_PERCENT / MAX(mem_per_thread, 1);
int64_t max_concurrent_cnt = MAX(MIN(worker_thread_cnt, mem_allow_max_thread_cnt), 1);
parallel_degree = MIN(max_concurrent_cnt, origin_degree);
STORAGE_LOG(INFO, "Success to calc adaptive parallel degree", K(prio), K(mem_per_thread),
K(origin_degree), K(mem_allow_max_thread_cnt), K(parallel_degree));
if (OB_TMP_FAIL(MTL(ObTenantDagScheduler *)->get_limit(prio, dag_worker_limit))) {
dag_worker_limit = ObCompactionEstimator::DEFAULT_MERGE_THREAD_CNT;
STORAGE_LOG_RET(WARN, tmp_ret, "failed to get worker thread cnt, use dfault value", K(prio), K(dag_worker_limit));
}
parallel_degree = MIN(MAX(dag_worker_limit, PARALLEL_MERGE_TARGET_TASK_CNT), origin_degree);
if (parallel_degree <= 2) {
// do nothing
} else if (MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) {
int64_t tenant_free_mem_byte = lib::get_tenant_memory_remain(MTL_ID());
int64_t mem_allow_max_thread_cnt = tenant_free_mem_byte * ADAPTIVE_PERCENT / MAX(mem_per_thread, 1);
if (mem_allow_max_thread_cnt < parallel_degree) {
parallel_degree = MAX(parallel_degree / 2, 2); // fix the parallel degree
}
STORAGE_LOG(INFO, "[ADAPTIVE_SCHED] calc adaptive parallel degree", K(prio), K(tenant_free_mem_byte), K(mem_per_thread),
K(dag_worker_limit), K(origin_degree), K(mem_allow_max_thread_cnt), K(parallel_degree));
}
}