optimize adaptive batch size for co merge
This commit is contained in:
@ -1140,7 +1140,7 @@ int ObCOMergeDagNet::create_co_execute_dags(share::ObIDag &schedule_dag)
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("merge ctx is null or schema invalid", K(ret), KPC(co_merge_ctx_));
|
LOG_WARN("merge ctx is null or schema invalid", K(ret), KPC(co_merge_ctx_));
|
||||||
SET_DAG_LOCATION(&schedule_dag);
|
SET_DAG_LOCATION(&schedule_dag);
|
||||||
} else if (OB_FAIL(choose_merge_batch_size(*co_merge_ctx_))) {
|
} else if (OB_FAIL(choose_merge_batch_size(co_merge_ctx_->array_count_))) {
|
||||||
LOG_WARN("failed to choose merge batch size", K(ret));
|
LOG_WARN("failed to choose merge batch size", K(ret));
|
||||||
} else if (OB_FAIL(inner_create_and_schedule_dags(&schedule_dag))) {
|
} else if (OB_FAIL(inner_create_and_schedule_dags(&schedule_dag))) {
|
||||||
LOG_WARN("failed to create and schedule dags", K(ret));
|
LOG_WARN("failed to create and schedule dags", K(ret));
|
||||||
@ -1149,14 +1149,13 @@ int ObCOMergeDagNet::create_co_execute_dags(share::ObIDag &schedule_dag)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObCOMergeDagNet::choose_merge_batch_size(ObCOTabletMergeCtx &co_ctx)
|
int ObCOMergeDagNet::choose_merge_batch_size(const int64_t column_group_cnt)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const int64_t column_group_cnt = co_ctx.array_count_;
|
|
||||||
|
|
||||||
if (OB_UNLIKELY(column_group_cnt <= 0)) {
|
if (OB_UNLIKELY(column_group_cnt <= 0)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("get unexpected count", K(ret), K(column_group_cnt), K(co_ctx));
|
LOG_WARN("get unexpected count", K(ret), K(column_group_cnt));
|
||||||
} else if (column_group_cnt < ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE * 2) {
|
} else if (column_group_cnt < ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE * 2) {
|
||||||
merge_batch_size_ = column_group_cnt;
|
merge_batch_size_ = column_group_cnt;
|
||||||
} else {
|
} else {
|
||||||
@ -1211,13 +1210,15 @@ void ObCOMergeDagNet::try_update_merge_batch_size(const int64_t column_group_cnt
|
|||||||
int64_t batch_mem_allow_per_thread = MAX(mem_allow_used / merge_thread - ObCompactionEstimator::MAJOR_MEM_PER_THREAD, 0);
|
int64_t batch_mem_allow_per_thread = MAX(mem_allow_used / merge_thread - ObCompactionEstimator::MAJOR_MEM_PER_THREAD, 0);
|
||||||
int64_t mem_allow_batch_size = MAX(batch_mem_allow_per_thread / ObCompactionEstimator::CO_MAJOR_CG_BASE_MEM, 1);
|
int64_t mem_allow_batch_size = MAX(batch_mem_allow_per_thread / ObCompactionEstimator::CO_MAJOR_CG_BASE_MEM, 1);
|
||||||
|
|
||||||
if (mem_allow_batch_size >= column_group_cnt) {
|
if (mem_allow_batch_size > merge_batch_size_ * 2) {
|
||||||
merge_batch_size_ = column_group_cnt;
|
merge_batch_size_ = MIN(merge_batch_size_ * 2, column_group_cnt);
|
||||||
} else if (mem_allow_batch_size > merge_batch_size_) {
|
} else if (OB_TMP_FAIL(choose_merge_batch_size(column_group_cnt))) {
|
||||||
merge_batch_size_ = mem_allow_batch_size;
|
merge_batch_size_ = ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE;
|
||||||
} else {
|
LOG_WARN_RET(tmp_ret, "failed to choose merge batch size, use default batch size", K(column_group_cnt), K(merge_batch_size_));
|
||||||
//TODO(@DanLing) use default batch size now, we need more experiment about mem stat to dec batch size
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FLOG_INFO("[ADAPTIVE_SCHED] update co merge batch size", K(merge_thread),
|
||||||
|
K(mem_allow_used), K(batch_mem_allow_per_thread), K(mem_allow_batch_size), K(merge_batch_size_));
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag)
|
int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag)
|
||||||
|
|||||||
@ -318,7 +318,7 @@ private:
|
|||||||
const int64_t max_cg_idx,
|
const int64_t max_cg_idx,
|
||||||
ObCOMergeBatchExeDag *&dag,
|
ObCOMergeBatchExeDag *&dag,
|
||||||
const bool add_scheduler_flag = true);
|
const bool add_scheduler_flag = true);
|
||||||
int choose_merge_batch_size(ObCOTabletMergeCtx &co_ctx);
|
int choose_merge_batch_size(const int64_t column_group_cnt);
|
||||||
int inner_schedule_finish_dag(ObIDag *parent_dag = nullptr);
|
int inner_schedule_finish_dag(ObIDag *parent_dag = nullptr);
|
||||||
void try_update_merge_batch_size(const int64_t column_group_cnt);
|
void try_update_merge_batch_size(const int64_t column_group_cnt);
|
||||||
int inner_create_and_schedule_dags(ObIDag *parent_dag = nullptr);
|
int inner_create_and_schedule_dags(ObIDag *parent_dag = nullptr);
|
||||||
|
|||||||
Reference in New Issue
Block a user