Fix direct load multiple merge range splitter's handling of the origin table

This commit is contained in:
suz-yang
2023-07-19 14:12:11 +00:00
committed by ob-robot
parent 4a023c0a24
commit c459d3c14c
2 changed files with 102 additions and 131 deletions

View File

@ -187,6 +187,64 @@ int ObDirectLoadRangeSplitUtils::construct_multiple_rowkey_iter(
return ret;
}
int ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
ObDirectLoadOriginTable *origin_table,
const ObDatumRange &scan_range,
ObIAllocator &allocator,
int64_t &total_block_count,
ObIArray<ObIDirectLoadDatumRowkeyIterator *> &rowkey_iters)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(nullptr == origin_table || !origin_table->is_valid() || !scan_range.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KPC(origin_table), K(scan_range));
} else {
const ObITableReadInfo &read_info =
origin_table->get_tablet_handle().get_obj()->get_rowkey_read_info();
if (nullptr != origin_table->get_major_sstable()) {
ObSSTable *major_sstable = origin_table->get_major_sstable();
ObIDirectLoadDatumRowkeyIterator *rowkey_iter = nullptr;
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_rowkey_iter(
major_sstable, scan_range, read_info, allocator, rowkey_iter))) {
LOG_WARN("fail to construct rowkey iter", KR(ret));
} else if (OB_FAIL(rowkey_iters.push_back(rowkey_iter))) {
LOG_WARN("fail to push back rowkey iter", KR(ret));
} else {
total_block_count += major_sstable->get_data_macro_block_count();
}
if (OB_FAIL(ret)) {
if (nullptr != rowkey_iter) {
rowkey_iter->~ObIDirectLoadDatumRowkeyIterator();
allocator.free(rowkey_iter);
rowkey_iter = nullptr;
}
}
} else {
const ObIArray<blocksstable::ObSSTable *> &ddl_sstables = origin_table->get_ddl_sstables();
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_sstables.count(); ++i) {
ObSSTable *ddl_sstable = ddl_sstables.at(i);
ObIDirectLoadDatumRowkeyIterator *rowkey_iter = nullptr;
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_rowkey_iter(
ddl_sstable, scan_range, read_info, allocator, rowkey_iter))) {
LOG_WARN("fail to construct rowkey iter", KR(ret));
} else if (OB_FAIL(rowkey_iters.push_back(rowkey_iter))) {
LOG_WARN("fail to push back rowkey iter", KR(ret));
} else {
total_block_count += ddl_sstable->get_data_macro_block_count();
}
if (OB_FAIL(ret)) {
if (nullptr != rowkey_iter) {
rowkey_iter->~ObIDirectLoadDatumRowkeyIterator();
allocator.free(rowkey_iter);
rowkey_iter = nullptr;
}
}
}
}
}
return ret;
}
/**
* ObDirectLoadRowkeyMergeRangeSplitter
*/
@ -455,8 +513,9 @@ int ObDirectLoadMergeRangeSplitter::init(ObDirectLoadOriginTable *origin_table,
} else {
allocator_.set_tenant_id(MTL_ID());
scan_range_.set_whole_range();
if (OB_FAIL(construct_origin_table_rowkey_iter(origin_table))) {
LOG_WARN("fail to construct origin sstable rowkey itre", KR(ret));
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
origin_table, scan_range_, allocator_, total_block_count_, rowkey_iters_))) {
LOG_WARN("fail to construct origin table rowkey iters", KR(ret));
} else if (OB_FAIL(construct_sstable_rowkey_iters(sstable_array))) {
LOG_WARN("fail to construct sstable rowkey itres", KR(ret));
} else if (OB_FAIL(
@ -471,55 +530,6 @@ int ObDirectLoadMergeRangeSplitter::init(ObDirectLoadOriginTable *origin_table,
return ret;
}
// Appends rowkey iterators for origin_table to rowkey_iters_ and adds the
// corresponding data macro block counts to total_block_count_.
//
// The work is delegated to the shared
// ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters helper rather
// than kept as a private copy of the same logic. The helper also validates
// origin_table before it is dereferenced and reports OB_INVALID_ARGUMENT for a
// null or invalid table.
int ObDirectLoadMergeRangeSplitter::construct_origin_table_rowkey_iter(
  ObDirectLoadOriginTable *origin_table)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
        origin_table, scan_range_, allocator_, total_block_count_, rowkey_iters_))) {
    LOG_WARN("fail to construct origin table rowkey iters", KR(ret));
  }
  return ret;
}
int ObDirectLoadMergeRangeSplitter::construct_sstable_rowkey_iters(
const ObIArray<ObDirectLoadSSTable *> &sstable_array)
{
@ -601,8 +611,9 @@ int ObDirectLoadMultipleMergeTabletRangeSplitter::init(
tablet_id_ = tablet_id;
allocator_.set_tenant_id(MTL_ID());
scan_range_.set_whole_range();
if (OB_FAIL(construct_origin_table_rowkey_iter(origin_table))) {
LOG_WARN("fail to construct origin sstable rowkey itre", KR(ret));
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
origin_table, scan_range_, allocator_, total_block_count_, rowkey_iters_))) {
LOG_WARN("fail to construct origin table rowkey iters", KR(ret));
} else if (OB_FAIL(
construct_sstable_rowkey_iters(sstable_array, table_data_desc, datum_utils))) {
LOG_WARN("fail to construct sstable rowkey itres", KR(ret));
@ -618,55 +629,6 @@ int ObDirectLoadMultipleMergeTabletRangeSplitter::init(
return ret;
}
// Appends rowkey iterators for origin_table to rowkey_iters_ and adds the
// corresponding data macro block counts to total_block_count_.
//
// The work is delegated to the shared
// ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters helper rather
// than kept as a private copy of the same logic. The helper also validates
// origin_table before it is dereferenced and reports OB_INVALID_ARGUMENT for a
// null or invalid table.
int ObDirectLoadMultipleMergeTabletRangeSplitter::construct_origin_table_rowkey_iter(
  ObDirectLoadOriginTable *origin_table)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
        origin_table, scan_range_, allocator_, total_block_count_, rowkey_iters_))) {
    LOG_WARN("fail to construct origin table rowkey iters", KR(ret));
  }
  return ret;
}
int ObDirectLoadMultipleMergeTabletRangeSplitter::construct_sstable_rowkey_iters(
const ObIArray<ObDirectLoadMultipleSSTable *> &sstable_array,
const ObDirectLoadTableDataDesc &table_data_desc,
@ -846,38 +808,43 @@ int ObDirectLoadMultipleMergeRangeSplitter::get_rowkeys_by_origin(
ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
ObIDirectLoadDatumRowkeyIterator *rowkey_iter = nullptr;
ObDirectLoadDatumRowkeyCompare compare;
RowkeyMerger rowkey_merger;
ObDatumRange scan_range;
scan_range.set_whole_range();
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_rowkey_iter(
origin_table->get_major_sstable(), scan_range,
origin_table->get_tablet_handle().get_obj()->get_rowkey_read_info(), allocator,
rowkey_iter))) {
LOG_WARN("fail to construct rowkey iter", KR(ret));
} else {
const ObDatumRowkey *datum_rowkey = nullptr;
ObDatumRowkey copied_rowkey;
int64_t count = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(rowkey_iter->get_next_rowkey(datum_rowkey))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next rowkey", KR(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else if (++count >= BLOCK_COUNT_PER_RANGE) {
if (OB_FAIL(datum_rowkey->deep_copy(copied_rowkey, allocator))) {
LOG_WARN("fail to deep copy rowkey", KR(ret));
} else if (OB_FAIL(rowkey_array.push_back(copied_rowkey))) {
LOG_WARN("fail to push back", KR(ret));
} else {
count = 0;
}
ObArray<ObIDirectLoadDatumRowkeyIterator *> rowkey_iters;
int64_t unused_total_block_count = 0;
const ObDatumRowkey *datum_rowkey = nullptr;
ObDatumRowkey copied_rowkey;
int64_t count = 0;
if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters(
origin_table, scan_range, allocator, unused_total_block_count, rowkey_iters))) {
LOG_WARN("fail to construct origin table rowkey iters", KR(ret));
} else if (OB_FAIL(compare.init(*datum_utils_))) {
LOG_WARN("fail to init rowkey compare", KR(ret));
} else if (OB_FAIL(rowkey_merger.init(rowkey_iters, &compare))) {
LOG_WARN("fail to init rowkey merger", KR(ret));
}
while (OB_SUCC(ret)) {
if (OB_FAIL(rowkey_merger.get_next_rowkey(datum_rowkey))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to get next rowkey", KR(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else if (++count >= BLOCK_COUNT_PER_RANGE) {
if (OB_FAIL(datum_rowkey->deep_copy(copied_rowkey, allocator))) {
LOG_WARN("fail to deep copy rowkey", KR(ret));
} else if (OB_FAIL(rowkey_array.push_back(copied_rowkey))) {
LOG_WARN("fail to push back", KR(ret));
} else {
count = 0;
}
}
}
if (nullptr != rowkey_iter) {
for (int64_t i = 0; i < rowkey_iters.count(); ++i) {
ObIDirectLoadDatumRowkeyIterator *rowkey_iter = rowkey_iters.at(i);
rowkey_iter->~ObIDirectLoadDatumRowkeyIterator();
allocator.free(rowkey_iter);
rowkey_iter = nullptr;

View File

@ -43,6 +43,12 @@ public:
const ObDirectLoadTableDataDesc &table_data_desc,
common::ObIAllocator &allocator,
ObIDirectLoadMultipleDatumRowkeyIterator *&rowkey_iter);
// Append rowkey iterators for origin_table to rowkey_iters; entries already in
// rowkey_iters are kept.
// If origin_table has a major sstable, one iterator is built for it; otherwise
// one iterator is built for each of its ddl sstables.
// total_block_count is an in/out accumulator: the data macro block count of
// every sstable whose iterator was appended is added to it.
// allocator is handed to construct_rowkey_iter and is used to free an iterator
// that could not be appended.
// Returns OB_INVALID_ARGUMENT when origin_table is null or invalid, or when
// scan_range is invalid.
static int construct_origin_table_rowkey_iters(ObDirectLoadOriginTable *origin_table,
const blocksstable::ObDatumRange &scan_range,
common::ObIAllocator &allocator,
int64_t &total_block_count,
common::ObIArray<ObIDirectLoadDatumRowkeyIterator *> &rowkey_iters);
};
class ObDirectLoadRowkeyMergeRangeSplitter
@ -101,7 +107,6 @@ public:
int split_range(common::ObIArray<blocksstable::ObDatumRange> &range_array,
int64_t max_range_count, common::ObIAllocator &allocator);
private:
int construct_origin_table_rowkey_iter(ObDirectLoadOriginTable *origin_table);
int construct_sstable_rowkey_iters(const common::ObIArray<ObDirectLoadSSTable *> &sstable_array);
private:
common::ObArenaAllocator allocator_;
@ -125,7 +130,6 @@ public:
int split_range(common::ObIArray<blocksstable::ObDatumRange> &range_array,
int64_t max_range_count, common::ObIAllocator &allocator);
private:
int construct_origin_table_rowkey_iter(ObDirectLoadOriginTable *origin_table);
int construct_sstable_rowkey_iters(
const common::ObIArray<ObDirectLoadMultipleSSTable *> &sstable_array,
const ObDirectLoadTableDataDesc &table_data_desc,