Fixed a bug in calculating whether to use sorting mode
This commit is contained in:
@ -331,6 +331,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 协调节点不存在数据分区时,需要申请线程资源,但不需要申请内存,为零
|
||||||
if (OB_SUCC(ret) && !include_cur_addr) {
|
if (OB_SUCC(ret) && !include_cur_addr) {
|
||||||
ObDirectLoadResourceUnit unit;
|
ObDirectLoadResourceUnit unit;
|
||||||
unit.addr_ = coordinator_addr;
|
unit.addr_ = coordinator_addr;
|
||||||
@ -369,6 +370,7 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
|||||||
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||||
int64_t min_unsort_memory = 0;
|
int64_t min_unsort_memory = 0;
|
||||||
if (ctx_->schema_.is_heap_table_) {
|
if (ctx_->schema_.is_heap_table_) {
|
||||||
|
// 直接写宏块需要的内存,对于非排序模式,每个分区各自写宏块,所以要乘分区数
|
||||||
min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count;
|
min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * write_session_count;
|
||||||
if (min_unsort_memory <= memory_limit) {
|
if (min_unsort_memory <= memory_limit) {
|
||||||
need_sort = false;
|
need_sort = false;
|
||||||
@ -378,13 +380,31 @@ int ObTableLoadCoordinator::gen_apply_arg(ObDirectLoadResourceApplyArg &apply_ar
|
|||||||
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
min_unsort_memory = MACROBLOCK_BUFFER_SIZE * partitions[i] * unit.thread_count_;
|
// 取写宏块或写临时文件需要内存的最小值,对于非排序模式,每个分区各自写临时文件,所以要乘分区数
|
||||||
|
min_unsort_memory = SSTABLE_BUFFER_SIZE * partitions[i] * unit.thread_count_;
|
||||||
if (need_sort) {
|
if (need_sort) {
|
||||||
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||||
} else {
|
} else {
|
||||||
unit.memory_size_ = MIN(min_unsort_memory, memory_limit);
|
// hint指定不排序,如果不排序内存大于内存上限,要改成走排序模式,一般是分区数较大的场景
|
||||||
|
if (min_unsort_memory < memory_limit) {
|
||||||
|
unit.memory_size_ = MAX(min_unsort_memory, MACROBLOCK_BUFFER_SIZE * write_session_count);
|
||||||
|
} else {
|
||||||
|
need_sort = true;
|
||||||
|
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (need_sort) {
|
||||||
|
// 只要有一个节点走排序模式,所有节点统一走排序模式
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (need_sort) {
|
||||||
|
// 排序模式,所有节点都分配固定的最小排序内存
|
||||||
|
for (int64_t i = 0; i < store_server_count; i++) {
|
||||||
|
ObDirectLoadResourceUnit &unit = apply_arg.apply_array_[i];
|
||||||
|
unit.memory_size_ = MIN(ObTableLoadAssignedMemoryManager::MIN_SORT_MEMORY_PER_TASK, memory_limit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,10 +85,10 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
enum class ObTableLoadExeMode {
|
enum class ObTableLoadExeMode {
|
||||||
FAST_HEAP_TABLE = 0,
|
FAST_HEAP_TABLE = 0, //快速堆表
|
||||||
GENERAL_TABLE_COMPACT = 1,
|
GENERAL_TABLE_COMPACT = 1, // 非堆表不排序
|
||||||
MULTIPLE_HEAP_TABLE_COMPACT = 2,
|
MULTIPLE_HEAP_TABLE_COMPACT = 2, //堆表排序
|
||||||
MEM_COMPACT = 3,
|
MEM_COMPACT = 3, //非堆表排序
|
||||||
MAX_TYPE
|
MAX_TYPE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user