set thread_count more than 2 when need_sort is true

Co-authored-by: coolfishchen <coolfishchen@gmail.com>
This commit is contained in:
obdev
2024-06-27 14:03:30 +00:00
committed by ob-robot
parent b4815393cc
commit 9f8a87b902
2 changed files with 14 additions and 2 deletions

View File

@ -323,7 +323,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) { if (OB_FAIL(partitions.push_back(all_leader_info_array[i].partition_id_array_.count()))) {
LOG_WARN("fail to push back", KR(ret)); LOG_WARN("fail to push back", KR(ret));
} else { } else {
unit.thread_count_ = MAX((ctx_->param_.need_sort_ && ctx_->param_.px_mode_ == false ? 2 : 1), unit.thread_count_ = MAX((!ctx_->schema_.is_heap_table_ && ctx_->param_.need_sort_ ? 2 : 1),
MIN(max_session_count, total_session_count * partitions[i] / total_partitions)); MIN(max_session_count, total_session_count * partitions[i] / total_partitions));
if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) { if (OB_FAIL(apply_arg.apply_array_.push_back(unit))) {
LOG_WARN("fail to push back", KR(ret)); LOG_WARN("fail to push back", KR(ret));

View File

@ -269,7 +269,11 @@ int ObTableLoadMemCompactor::inner_init()
const uint64_t tenant_id = MTL_ID(); const uint64_t tenant_id = MTL_ID();
store_ctx_ = compact_ctx_->store_ctx_; store_ctx_ = compact_ctx_->store_ctx_;
param_ = &(store_ctx_->ctx_->param_); param_ = &(store_ctx_->ctx_->param_);
if (OB_FAIL(init_scheduler())) { if (OB_UNLIKELY(param_->session_count_ < 2)) {
// 排序至少需要两个线程
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param_->session_count_));
} else if (OB_FAIL(init_scheduler())) {
LOG_WARN("fail to init_scheduler", KR(ret)); LOG_WARN("fail to init_scheduler", KR(ret));
} else { } else {
mem_ctx_.mem_dump_task_count_ = param_->session_count_ / 3; //暂时先写成1/3,后续再优化 mem_ctx_.mem_dump_task_count_ = param_->session_count_ / 3; //暂时先写成1/3,后续再优化
@ -294,6 +298,14 @@ int ObTableLoadMemCompactor::inner_init()
mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_;
mem_ctx_.dup_action_ = param_->dup_action_; mem_ctx_.dup_action_ = param_->dup_action_;
} }
if (OB_SUCC(ret)) {
if (param_->session_count_ - mem_ctx_.mem_dump_task_count_ <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("mem load thread cannot be zero", K(param_->session_count_), K(mem_ctx_.mem_dump_task_count_), KR(ret));
}
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (OB_FAIL(mem_ctx_.init())) { if (OB_FAIL(mem_ctx_.init())) {
LOG_WARN("fail to init compactor ctx", KR(ret)); LOG_WARN("fail to init compactor ctx", KR(ret));