From d0e5730bc3997f9950d38de5c2aba61bd8ce2317 Mon Sep 17 00:00:00 2001
From: Hongqin-Li
Date: Wed, 30 Oct 2024 17:44:17 +0000
Subject: [PATCH] [CP] Fix incorrect row flag caused by split lob meta minor

ObUncommittedRowScan now buffers rows per rowkey and drops a rowkey only
when all of its rows can be skipped. The per-rowkey buffer and its
"has unskippable row" flag are reset before each rowkey is collected.
---
 src/storage/ddl/ob_tablet_lob_split_task.cpp |   2 +-
 src/storage/ddl/ob_tablet_split_task.cpp     | 150 +++++++++++++++++--
 src/storage/ddl/ob_tablet_split_task.h       |  11 +-
 3 files changed, 151 insertions(+), 12 deletions(-)

diff --git a/src/storage/ddl/ob_tablet_lob_split_task.cpp b/src/storage/ddl/ob_tablet_lob_split_task.cpp
index 75d2995a3..a90746136 100644
--- a/src/storage/ddl/ob_tablet_lob_split_task.cpp
+++ b/src/storage/ddl/ob_tablet_lob_split_task.cpp
@@ -1840,7 +1840,7 @@ int ObTabletLobSplitUtil::open_uncommitted_scan_iters(ObLobSplitParam *param,
         ret = OB_ALLOCATE_MEMORY_FAILED;
         LOG_WARN("fail to alloc row scan", K(ret));
       } else if (FALSE_IT(new_scanner = new(buff)ObUncommittedRowScan())) {
-      } else if (OB_FAIL(new_scanner->init(scan_param, *sst, major_snapshot_version))) {
+      } else if (OB_FAIL(new_scanner->init(scan_param, *sst, major_snapshot_version, ObLobMetaUtil::LOB_META_COLUMN_CNT))) {
         LOG_WARN("fail to init row scanner", K(ret));
       } else if (OB_FAIL(iters.push_back(ObIStoreRowIteratorPtr(new_scanner)))) {
         LOG_WARN("fail to push back new row scanner", K(ret));
diff --git a/src/storage/ddl/ob_tablet_split_task.cpp b/src/storage/ddl/ob_tablet_split_task.cpp
index ebd712f9b..c7efcaa1f 100644
--- a/src/storage/ddl/ob_tablet_split_task.cpp
+++ b/src/storage/ddl/ob_tablet_split_task.cpp
@@ -2114,7 +2114,7 @@ int ObSnapshotRowScan::get_next_row(const ObDatumRow *&out_row)
 }
 
 ObUncommittedRowScan::ObUncommittedRowScan()
-  : row_scan_(), major_snapshot_version_(OB_INVALID_TIMESTAMP), trans_version_col_idx_(0)
+  : row_scan_(), row_scan_end_(false), next_row_(nullptr), major_snapshot_version_(OB_INVALID_TIMESTAMP), trans_version_col_idx_(0), row_queue_(), row_queue_allocator_(), row_queue_has_unskippable_row_(false)
 {
 }
 
@@ -2125,15 +2125,24 @@ ObUncommittedRowScan::~ObUncommittedRowScan()
 int ObUncommittedRowScan::init(
     const ObSplitScanParam param,
     ObSSTable &src_sstable,
-    const int64_t major_snapshot_version)
+    const int64_t major_snapshot_version,
+    const int64_t schema_column_cnt)
 {
   int ret = OB_SUCCESS;
   if (OB_FAIL(row_scan_.init(param, src_sstable))) {
     LOG_WARN("failed to init", K(ret));
   } else {
+    row_scan_end_ = false;
+    next_row_ = nullptr;
     major_snapshot_version_ = major_snapshot_version;
     trans_version_col_idx_ = ObMultiVersionRowkeyHelpper::get_trans_version_col_store_index(
         row_scan_.get_rowkey_read_info()->get_schema_rowkey_count(), true);
+    if (OB_FAIL(row_queue_.init(schema_column_cnt + storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
+      LOG_WARN("failed to init row queue", K(ret));
+    } else {
+      row_queue_allocator_.reset();
+      row_queue_has_unskippable_row_ = false;
+    }
   }
   return ret;
 }
@@ -2142,23 +2151,144 @@ int ObUncommittedRowScan::get_next_row(const ObDatumRow *&res_row)
 {
   int ret = OB_SUCCESS;
   const ObDatumRow *row = nullptr;
-  bool can_skip = true;
   res_row = nullptr;
-  while (OB_SUCC(ret) && can_skip) {
-    if (OB_FAIL(row_scan_.get_next_row(row))) {
-      if (OB_UNLIKELY(OB_ITER_END != ret)) {
-        LOG_WARN("fail to get next row", K(ret));
+  // Refill the queue one rowkey at a time. A rowkey whose rows are all skippable
+  // is dropped as a whole; otherwise every buffered row of that rowkey is returned.
+  while (OB_SUCC(ret) && !row_queue_.has_next()) {
+    if (OB_FAIL(get_next_rowkey_rows())) {
+      if (OB_ITER_END != ret) {
+        LOG_WARN("failed to get next rowkey rows", K(ret));
       }
-    } else if (OB_FAIL(check_can_skip(*row, can_skip))) {
-      LOG_WARN("failed to check can skip", K(ret));
+    } else if (!row_queue_has_unskippable_row_) {
+      row_queue_reuse();
     }
   }
-  if (OB_SUCC(ret)) {
+  if (OB_FAIL(ret)) {
+  } else if (OB_FAIL(row_queue_.get_next_row(row))) {
+    LOG_WARN("failed to get next row", K(ret));
+  } else {
     res_row = row;
   }
   return ret;
 };
 
+// Buffers the rows of the next rowkey from row_scan_ into row_queue_.
+// The first row of the following rowkey, if any, is parked in next_row_ and
+// queued by the next call. Returns OB_ITER_END when no row could be queued.
+int ObUncommittedRowScan::get_next_rowkey_rows()
+{
+  int ret = OB_SUCCESS;
+  const ObDatumRow *row = nullptr;
+  if (OB_UNLIKELY(row_queue_.has_next())) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("previous rowkey rows still exist", K(ret));
+  } else {
+    // The previous rowkey is fully consumed: drop its rows and clear
+    // row_queue_has_unskippable_row_, otherwise the flag stays set for every
+    // later rowkey and all-skippable rowkeys would no longer be filtered out.
+    row_queue_reuse();
+  }
+
+  // collect rows from next_row_ to row_queue_
+  if (OB_SUCC(ret) && OB_NOT_NULL(next_row_)) {
+    if (OB_FAIL(row_queue_add(*next_row_))) {
+      LOG_WARN("failed to add row", K(ret));
+    } else {
+      next_row_ = nullptr;
+    }
+  }
+
+  // collect rows from row_scan_ to row_queue_ or next_row_
+  while (OB_SUCC(ret) && !row_scan_end_ && nullptr == next_row_) {
+    if (OB_FAIL(row_scan_.get_next_row(row))) {
+      if (OB_ITER_END == ret) {
+        row_scan_end_ = true;
+        ret = OB_SUCCESS;
+      } else {
+        LOG_WARN("fail to get next row", K(ret));
+      }
+    } else if (OB_ISNULL(row)) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("invalid row", K(ret));
+    } else if (OB_FAIL(row_queue_add(*row))) {
+      if (OB_SIZE_OVERFLOW == ret) {
+        next_row_ = row;
+        ret = OB_SUCCESS;
+      } else {
+        LOG_WARN("failed to add row", K(ret));
+      }
+    }
+  }
+
+  if (OB_SUCC(ret) && !row_queue_.has_next()) {
+    ret = OB_ITER_END;
+  }
+  return ret;
+}
+
+// Queues `row` when row_queue_ has no pending row or `row` shares the schema
+// rowkey of the last queued row, and records whether it is unskippable.
+// Returns OB_SIZE_OVERFLOW (row not queued) when `row` starts a greater rowkey,
+// and OB_ROWKEY_ORDER_ERROR when its rowkey is smaller than the last queued one.
+int ObUncommittedRowScan::row_queue_add(const ObDatumRow &row)
+{
+  int ret = OB_SUCCESS;
+  bool rowkey_changed = false;
+  if (row_queue_.has_next()) {
+    const int64_t schema_rowkey_cnt = row_scan_.get_rowkey_read_info()->get_schema_rowkey_count();
+    const blocksstable::ObStorageDatumUtils &datum_utils = row_scan_.get_rowkey_read_info()->get_datum_utils();
+    const ObDatumRow *last_row_in_queue = row_queue_.get_last();
+    ObDatumRowkey cur_key;
+    ObDatumRowkey last_key;
+    int compare_result = 0;
+    if (OB_ISNULL(last_row_in_queue)) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("unexpected last row is nullptr", K(ret), K(row_queue_));
+    } else if (OB_FAIL(last_key.assign(last_row_in_queue->storage_datums_, schema_rowkey_cnt))) {
+      LOG_WARN("failed to assign qu rowkey", K(ret));
+    } else if (OB_FAIL(cur_key.assign(row.storage_datums_, schema_rowkey_cnt))) {
+      LOG_WARN("failed to assign cur key", K(ret));
+    } else if (OB_FAIL(cur_key.compare(last_key, datum_utils, compare_result))) {
+      LOG_WARN("failed to compare last key", K(ret), K(cur_key), K(last_key));
+    } else if (OB_UNLIKELY(compare_result < 0)) {
+      ret = OB_ROWKEY_ORDER_ERROR;
+      LOG_ERROR("input rowkey is less then last rowkey", K(ret), K(cur_key), K(last_key));
+    } else if (compare_result > 0) {
+      rowkey_changed = true;
+    }
+  }
+
+  if (OB_SUCC(ret) && !rowkey_changed) {
+    bool can_skip = false;
+    if (OB_FAIL(row_queue_.add_row(row, row_queue_allocator_))) {
+      LOG_WARN("failed to add row", K(ret));
+    } else if (OB_FAIL(check_can_skip(row, can_skip))) {
+      LOG_WARN("failed to check can skip", K(ret));
+    } else if (!can_skip) {
+      row_queue_has_unskippable_row_ = true;
+    }
+  }
+
+  // OB_SIZE_OVERFLOW is reserved here as the "rowkey changed" signal, so a real
+  // OB_SIZE_OVERFLOW from the calls above is remapped to OB_ERR_UNEXPECTED.
+  if (OB_SUCC(ret) && rowkey_changed) {
+    ret = OB_SIZE_OVERFLOW;
+  } else if (OB_SIZE_OVERFLOW == ret) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("change errcode", K(ret), "real_ret", OB_SIZE_OVERFLOW);
+  }
+  return ret;
+}
+
+// Drops all buffered rows, reuses their allocator and clears the unskippable flag.
+void ObUncommittedRowScan::row_queue_reuse()
+{
+  row_queue_.reuse();
+  row_queue_allocator_.reuse();
+  row_queue_has_unskippable_row_ = false;
+  return;
+}
+
 int ObUncommittedRowScan::check_can_skip(const ObDatumRow &row, bool &can_skip)
 {
   int ret = OB_SUCCESS;
diff --git a/src/storage/ddl/ob_tablet_split_task.h b/src/storage/ddl/ob_tablet_split_task.h
index 47d9e24a0..a7305e14d 100644
--- a/src/storage/ddl/ob_tablet_split_task.h
+++ b/src/storage/ddl/ob_tablet_split_task.h
@@ -383,14 +383,23 @@ public:
   int init(
       const ObSplitScanParam param,
       ObSSTable &src_sstable,
-      const int64_t major_snapshot_version);
+      const int64_t major_snapshot_version,
+      const int64_t schema_column_cnt);
   virtual int get_next_row(const blocksstable::ObDatumRow *&tmp_row) override;
 private:
+  int get_next_rowkey_rows();
+  int row_queue_add(const ObDatumRow &row);
+  void row_queue_reuse();
   int check_can_skip(const blocksstable::ObDatumRow &row, bool &can_skip);
 private:
   ObRowScan row_scan_;
+  bool row_scan_end_; // set once row_scan_ has returned OB_ITER_END
+  const ObDatumRow *next_row_; // first row of the following rowkey, fetched from row_scan_ but not queued yet
   int64_t major_snapshot_version_;
   int64_t trans_version_col_idx_;
+  blocksstable::ObRowQueue row_queue_; // rows of the rowkey currently being returned
+  ObArenaAllocator row_queue_allocator_; // allocator handed to row_queue_.add_row()
+  bool row_queue_has_unskippable_row_; // true if check_can_skip() refused to skip at least one queued row
 };
 
 struct ObTabletSplitUtil final