fix direct load build heap table multiple merge task

This commit is contained in:
suz-yang
2024-08-21 07:24:51 +00:00
committed by ob-robot
parent d9203e2b94
commit b263784bf1

View File

@ -634,38 +634,37 @@ int ObDirectLoadTabletMergeCtx::build_heap_table_multiple_merge_task(
merge_task = nullptr;
}
}
if (OB_SUCC(ret)) {
// for imported data, construct task by multiple heap table
for (int64_t i = 0; OB_SUCC(ret) && !param_.is_fast_heap_table_ && i < multiple_heap_table_array_.count(); ++i) {
ObDirectLoadMultipleHeapTable *heap_table = multiple_heap_table_array_.at(i);
ObDirectLoadPartitionHeapTableMultipleMergeTask *merge_task = nullptr;
int64_t row_count = 0;
ObTabletCacheInterval pk_interval;
if (OB_FAIL(heap_table->get_tablet_row_count(tablet_id_, param_.table_data_desc_, row_count))) {
LOG_WARN("fail to get tablet row count", KR(ret), K(tablet_id_));
} else if (0 == row_count) {
// ignore
} else if (OB_FAIL(get_autoincrement_value(row_count, pk_interval))) {
LOG_WARN("fail to get autoincrement value", KR(ret), K(row_count));
} else if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionHeapTableMultipleMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, heap_table, pk_interval, parallel_idx++))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
LOG_WARN("fail to push back merge task", KR(ret));
}
if (OB_FAIL(ret)) {
if (nullptr != merge_task) {
merge_task->~ObDirectLoadPartitionHeapTableMultipleMergeTask();
allocator_.free(merge_task);
merge_task = nullptr;
}
}
if (OB_SUCC(ret)) {
// for imported data, construct task by multiple heap table
for (int64_t i = 0; OB_SUCC(ret) && !param_.is_fast_heap_table_ && i < multiple_heap_table_array_.count(); ++i) {
ObDirectLoadMultipleHeapTable *heap_table = multiple_heap_table_array_.at(i);
ObDirectLoadPartitionHeapTableMultipleMergeTask *merge_task = nullptr;
int64_t row_count = 0;
ObTabletCacheInterval pk_interval;
if (OB_FAIL(heap_table->get_tablet_row_count(tablet_id_, param_.table_data_desc_, row_count))) {
LOG_WARN("fail to get tablet row count", KR(ret), K(tablet_id_));
} else if (0 == row_count) {
// ignore
} else if (OB_FAIL(get_autoincrement_value(row_count, pk_interval))) {
LOG_WARN("fail to get autoincrement value", KR(ret), K(row_count));
} else if (OB_ISNULL(merge_task = OB_NEWx(ObDirectLoadPartitionHeapTableMultipleMergeTask, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObDirectLoadPartitionHeapTableMultipleMergeTask", KR(ret));
} else if (OB_FAIL(merge_task->init(ctx_, param_, this, heap_table, pk_interval, parallel_idx++))) {
LOG_WARN("fail to init merge task", KR(ret));
} else if (OB_FAIL(task_array_.push_back(merge_task))) {
LOG_WARN("fail to push back merge task", KR(ret));
}
if (OB_FAIL(ret)) {
if (nullptr != merge_task) {
merge_task->~ObDirectLoadPartitionHeapTableMultipleMergeTask();
allocator_.free(merge_task);
merge_task = nullptr;
}
}
}
}
return ret;
}