diff --git a/src/storage/column_store/ob_co_merge_dag.cpp b/src/storage/column_store/ob_co_merge_dag.cpp index cdf9504b5d..ee6497ecde 100644 --- a/src/storage/column_store/ob_co_merge_dag.cpp +++ b/src/storage/column_store/ob_co_merge_dag.cpp @@ -1140,7 +1140,7 @@ int ObCOMergeDagNet::create_co_execute_dags(share::ObIDag &schedule_dag) ret = OB_ERR_UNEXPECTED; LOG_WARN("merge ctx is null or schema invalid", K(ret), KPC(co_merge_ctx_)); SET_DAG_LOCATION(&schedule_dag); - } else if (OB_FAIL(choose_merge_batch_size(*co_merge_ctx_))) { + } else if (OB_FAIL(choose_merge_batch_size(co_merge_ctx_->array_count_))) { LOG_WARN("failed to choose merge batch size", K(ret)); } else if (OB_FAIL(inner_create_and_schedule_dags(&schedule_dag))) { LOG_WARN("failed to create and schedule dags", K(ret)); @@ -1149,14 +1149,13 @@ int ObCOMergeDagNet::create_co_execute_dags(share::ObIDag &schedule_dag) return ret; } -int ObCOMergeDagNet::choose_merge_batch_size(ObCOTabletMergeCtx &co_ctx) +int ObCOMergeDagNet::choose_merge_batch_size(const int64_t column_group_cnt) { int ret = OB_SUCCESS; - const int64_t column_group_cnt = co_ctx.array_count_; if (OB_UNLIKELY(column_group_cnt <= 0)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected count", K(ret), K(column_group_cnt), K(co_ctx)); + LOG_WARN("get unexpected count", K(ret), K(column_group_cnt)); } else if (column_group_cnt < ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE * 2) { merge_batch_size_ = column_group_cnt; } else { @@ -1211,13 +1210,15 @@ void ObCOMergeDagNet::try_update_merge_batch_size(const int64_t column_group_cnt int64_t batch_mem_allow_per_thread = MAX(mem_allow_used / merge_thread - ObCompactionEstimator::MAJOR_MEM_PER_THREAD, 0); int64_t mem_allow_batch_size = MAX(batch_mem_allow_per_thread / ObCompactionEstimator::CO_MAJOR_CG_BASE_MEM, 1); - if (mem_allow_batch_size >= column_group_cnt) { - merge_batch_size_ = column_group_cnt; - } else if (mem_allow_batch_size > merge_batch_size_) { - merge_batch_size_ = mem_allow_batch_size; - } else { - //TODO(@DanLing) use default batch size now, we need more experiment about mem stat to dec batch size + if (mem_allow_batch_size > merge_batch_size_ * 2) { + merge_batch_size_ = MIN(merge_batch_size_ * 2, column_group_cnt); + } else if (OB_TMP_FAIL(choose_merge_batch_size(column_group_cnt))) { + merge_batch_size_ = ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE; + LOG_WARN_RET(tmp_ret, "failed to choose merge batch size, use default batch size", K(column_group_cnt), K(merge_batch_size_)); } + + FLOG_INFO("[ADAPTIVE_SCHED] update co merge batch size", K(merge_thread), + K(mem_allow_used), K(batch_mem_allow_per_thread), K(mem_allow_batch_size), K(merge_batch_size_)); } int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag) diff --git a/src/storage/column_store/ob_co_merge_dag.h b/src/storage/column_store/ob_co_merge_dag.h index b122b8b0fd..4dda4ec701 100644 --- a/src/storage/column_store/ob_co_merge_dag.h +++ b/src/storage/column_store/ob_co_merge_dag.h @@ -318,7 +318,7 @@ private: const int64_t max_cg_idx, ObCOMergeBatchExeDag *&dag, const bool add_scheduler_flag = true); - int choose_merge_batch_size(ObCOTabletMergeCtx &co_ctx); + int choose_merge_batch_size(const int64_t column_group_cnt); int inner_schedule_finish_dag(ObIDag *parent_dag = nullptr); void try_update_merge_batch_size(const int64_t column_group_cnt); int inner_create_and_schedule_dags(ObIDag *parent_dag = nullptr);