From 9f8a87b9024ba0abbec475d912b4e0b227a418bc Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 27 Jun 2024 14:03:30 +0000 Subject: [PATCH] set thread_count more than 2 when need_sort is true Co-authored-by: coolfishchen --- .../table_load/ob_table_load_coordinator.cpp | 2 +- .../table_load/ob_table_load_mem_compactor.cpp | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 02424f42ab..edc6671f3b 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -323,7 +323,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) { LOG_WARN("fail to push back", KR(ret)); } else { - unit.thread_count_ = MAX((ctx_->param_.need_sort_ && ctx_->param_.px_mode_ == false ? 2 : 1), + unit.thread_count_ = MAX((!ctx_->schema_.is_heap_table_ && ctx_->param_.need_sort_ ? 2 : 1), MIN(max_session_count, total_session_count * partitions[i] / total_partitions)); if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { LOG_WARN("fail to push back", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index 84c1746f24..d426ab12c5 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -269,7 +269,11 @@ int ObTableLoadMemCompactor::inner_init() const uint64_t tenant_id = MTL_ID(); store_ctx_ = compact_ctx_->store_ctx_; param_ = &(store_ctx_->ctx_->param_); - if (OB_FAIL(init_scheduler())) { + if (OB_UNLIKELY(param_->session_count_ < 2)) { + // 排序至少需要两个线程 + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(param_->session_count_)); + } else if (OB_FAIL(init_scheduler())) { LOG_WARN("fail to init_scheduler", KR(ret)); } else { mem_ctx_.mem_dump_task_count_ = param_->session_count_ / 3; //暂时先写成1/3,后续再优化 @@ -294,6 +298,14 @@ int ObTableLoadMemCompactor::inner_init() mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; mem_ctx_.dup_action_ = param_->dup_action_; } + + if (OB_SUCC(ret)) { + if (param_->session_count_ - mem_ctx_.mem_dump_task_count_ <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("mem load thread cannot be zero", K(param_->session_count_), K(mem_ctx_.mem_dump_task_count_), KR(ret)); + } + } + if (OB_SUCC(ret)) { if (OB_FAIL(mem_ctx_.init())) { LOG_WARN("fail to init compactor ctx", KR(ret));