From dd4a0bd9261ff15ab8c2c2c68fd8322a96b614ef Mon Sep 17 00:00:00 2001 From: Fengjingkun Date: Fri, 12 Jan 2024 08:42:46 +0000 Subject: [PATCH] fix compaction parallel count --- src/share/scheduler/ob_tenant_dag_scheduler.h | 4 +- .../ob_partition_parallel_merge_ctx.cpp | 43 +++++++++++-------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index e6b551c8a5..7da08947b4 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -1446,9 +1446,9 @@ inline bool is_reserve_mode() }) -constexpr uint64_t operator "" _percentage(unsigned long long percentage) +constexpr double operator "" _percentage(unsigned long long percentage) { - return percentage / 100; + return (percentage + 0.0) / 100; } #define ADAPTIVE_PERCENT 40_percentage diff --git a/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp b/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp index 1b76cdffa1..a003f1f662 100644 --- a/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp +++ b/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp @@ -279,12 +279,10 @@ int ObParallelMergeCtx::init_parallel_mini_merge(compaction::ObBasicTabletMergeC } if (OB_SUCC(ret)) { - if (MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) { - calc_adaptive_parallel_degree(ObDagPrio::DAG_PRIO_COMPACTION_HIGH, - ObCompactionEstimator::MINI_MEM_PER_THREAD, - (total_bytes + ObCompactionEstimator::MINI_PARALLEL_BASE_MEM - 1) / ObCompactionEstimator::MINI_PARALLEL_BASE_MEM, - concurrent_cnt_); - } + calc_adaptive_parallel_degree(ObDagPrio::DAG_PRIO_COMPACTION_HIGH, + ObCompactionEstimator::MINI_MEM_PER_THREAD, + (total_bytes + ObCompactionEstimator::MINI_PARALLEL_BASE_MEM - 1) / ObCompactionEstimator::MINI_PARALLEL_BASE_MEM, + concurrent_cnt_); ObArray store_ranges; store_ranges.set_attr(lib::ObMemAttr(MTL_ID(), "TmpMiniRanges", ObCtxIds::MERGE_NORMAL_CTX_ID)); @@ -347,7 +345,7 @@ int ObParallelMergeCtx::init_parallel_mini_minor_merge(compaction::ObBasicTablet ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "Invalid argument to calc mini minor parallel degree", K(ret), K(tablet_size), K(range_info), K(tables.count())); - } else if (MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) { + } else { calc_adaptive_parallel_degree(ObDagPrio::DAG_PRIO_COMPACTION_MID, ObCompactionEstimator::MINOR_MEM_PER_THREAD, (range_info.total_size_ / tables.count() + tablet_size - 1) / tablet_size, @@ -392,20 +390,27 @@ void ObParallelMergeCtx::calc_adaptive_parallel_degree( const int64_t origin_degree, int64_t ¶llel_degree) { - int64_t worker_thread_cnt = 0; - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(MTL(ObTenantDagScheduler *)->get_limit(prio, worker_thread_cnt))) { - worker_thread_cnt = ObCompactionEstimator::DEFAULT_MERGE_THREAD_CNT; - STORAGE_LOG_RET(WARN, tmp_ret, "failed to get worker thread cnt, use dfault value", K(prio), K(worker_thread_cnt)); - } - worker_thread_cnt = MAX(worker_thread_cnt, PARALLEL_MERGE_TARGET_TASK_CNT); + int64_t dag_worker_limit = 0; - int64_t mem_allow_max_thread_cnt = lib::get_tenant_memory_remain(MTL_ID()) * ADAPTIVE_PERCENT / MAX(mem_per_thread, 1); - int64_t max_concurrent_cnt = MAX(MIN(worker_thread_cnt, mem_allow_max_thread_cnt), 1); - parallel_degree = MIN(max_concurrent_cnt, origin_degree); - STORAGE_LOG(INFO, "Success to calc adaptive parallel degree", K(prio), K(mem_per_thread), - K(origin_degree), K(mem_allow_max_thread_cnt), K(parallel_degree)); + if (OB_TMP_FAIL(MTL(ObTenantDagScheduler *)->get_limit(prio, dag_worker_limit))) { + dag_worker_limit = ObCompactionEstimator::DEFAULT_MERGE_THREAD_CNT; + STORAGE_LOG_RET(WARN, tmp_ret, "failed to get worker thread cnt, use dfault value", K(prio), K(dag_worker_limit)); + } + parallel_degree = MIN(MAX(dag_worker_limit, PARALLEL_MERGE_TARGET_TASK_CNT), origin_degree); + + if (parallel_degree <= 2) { + // do nothing + } else if (MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) { + int64_t tenant_free_mem_byte = lib::get_tenant_memory_remain(MTL_ID()); + int64_t mem_allow_max_thread_cnt = tenant_free_mem_byte * ADAPTIVE_PERCENT / MAX(mem_per_thread, 1); + if (mem_allow_max_thread_cnt < parallel_degree) { + parallel_degree = MAX(parallel_degree / 2, 2); // fix the parallel degree + } + + STORAGE_LOG(INFO, "[ADAPTIVE_SCHED] calc adaptive parallel degree", K(prio), K(tenant_free_mem_byte), K(mem_per_thread), + K(dag_worker_limit), K(origin_degree), K(mem_allow_max_thread_cnt), K(parallel_degree)); + } }