fix bug of applying memory for fast heap mode

This commit is contained in:
obdev 2024-08-27 16:29:36 +00:00 committed by ob-robot
parent 478d5d29de
commit 27fca33fd0

View File

@ -298,6 +298,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
ObArray<int64_t> partitions;
int64_t store_server_count = all_leader_info_array.count();
int64_t coordinator_session_count = 0;
int64_t write_session_count = 0;
int64_t min_session_count = MAX(ctx_->param_.parallel_, 2);
int64_t max_session_count = (int64_t)tenant->unit_max_cpu() * 2;
int64_t total_session_count = MIN(ctx_->param_.parallel_, max_session_count * store_server_count);
@ -351,27 +352,38 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
}
}
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < store_server_count; i++) {
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
if (all_leader_info_array[i].addr_ == coordinator_addr) {
coordinator_session_count = unit.thread_count_;
}
min_session_count = MIN(min_session_count, unit.thread_count_);
int64_t min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_;
if (ctx_->schema_.is_heap_table_) {
if (min_unsort_memory <= memory_limit) {
need_sort = false;
unit.memory_size_ = min_unsort_memory;
} else {
need_sort = ctx_->param_.need_sort_; // allow forced non-sorting
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
for (int64_t i = 0; i < store_server_count; i++) {
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
if (all_leader_info_array[i].addr_ == coordinator_addr) {
coordinator_session_count = unit.thread_count_;
}
min_session_count = MIN(min_session_count, unit.thread_count_);
}
if (include_cur_addr && ctx_->param_.load_mode_ == ObDirectLoadMode::LOAD_DATA) {
// 协调节点和数据节点都在同一个节点上,对于load data模式,分出一半的线程用于解析数据,一半的线程用于存储数据
write_session_count = MIN(min_session_count, (coordinator_session_count + 1) / 2);
} else {
if (need_sort) {
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
write_session_count = min_session_count;
}
for (int64_t i = 0; i < store_server_count; i++) {
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
int64_t min_unsort_memory = 0;
if (ctx_->schema_.is_heap_table_) {
min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count;
if (min_unsort_memory <= memory_limit) {
need_sort = false;
unit.memory_size_ = min_unsort_memory;
} else {
need_sort = ctx_->param_.need_sort_; // allow forced non-sorting
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
}
} else {
unit.memory_size_ = MIN(min_unsort_memory, memory_limit);
min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_;
if (need_sort) {
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
} else {
unit.memory_size_ = MIN(min_unsort_memory, memory_limit);
}
}
}
}
@ -390,8 +402,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
} else {
ctx_->param_.need_sort_ = need_sort;
ctx_->param_.session_count_ = coordinator_session_count;
ctx_->param_.write_session_count_ =
(include_cur_addr ? MIN(min_session_count, (coordinator_session_count + 1) / 2) : min_session_count);
ctx_->param_.write_session_count_ = write_session_count;
ctx_->param_.exe_mode_ = (ctx_->schema_.is_heap_table_ ?
(need_sort ? ObTableLoadExeMode::MULTIPLE_HEAP_TABLE_COMPACT : ObTableLoadExeMode::FAST_HEAP_TABLE) :
(need_sort ? ObTableLoadExeMode::MEM_COMPACT : ObTableLoadExeMode::GENERAL_TABLE_COMPACT));