[CP] Fix incorrect row flag caused by split lob meta minor
This commit is contained in:
parent
19c0279bcc
commit
d0e5730bc3
@ -1840,7 +1840,7 @@ int ObTabletLobSplitUtil::open_uncommitted_scan_iters(ObLobSplitParam *param,
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc row scan", K(ret));
|
||||
} else if (FALSE_IT(new_scanner = new(buff)ObUncommittedRowScan())) {
|
||||
} else if (OB_FAIL(new_scanner->init(scan_param, *sst, major_snapshot_version))) {
|
||||
} else if (OB_FAIL(new_scanner->init(scan_param, *sst, major_snapshot_version, ObLobMetaUtil::LOB_META_COLUMN_CNT))) {
|
||||
LOG_WARN("fail to init row scanner", K(ret));
|
||||
} else if (OB_FAIL(iters.push_back(ObIStoreRowIteratorPtr(new_scanner)))) {
|
||||
LOG_WARN("fail to push back new row scanner", K(ret));
|
||||
|
@ -2114,7 +2114,7 @@ int ObSnapshotRowScan::get_next_row(const ObDatumRow *&out_row)
|
||||
}
|
||||
|
||||
ObUncommittedRowScan::ObUncommittedRowScan()
|
||||
: row_scan_(), major_snapshot_version_(OB_INVALID_TIMESTAMP), trans_version_col_idx_(0)
|
||||
: row_scan_(), row_scan_end_(false), next_row_(nullptr), major_snapshot_version_(OB_INVALID_TIMESTAMP), trans_version_col_idx_(0), row_queue_(), row_queue_allocator_(), row_queue_has_unskippable_row_(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -2125,15 +2125,24 @@ ObUncommittedRowScan::~ObUncommittedRowScan()
|
||||
int ObUncommittedRowScan::init(
|
||||
const ObSplitScanParam param,
|
||||
ObSSTable &src_sstable,
|
||||
const int64_t major_snapshot_version)
|
||||
const int64_t major_snapshot_version,
|
||||
const int64_t schema_column_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(row_scan_.init(param, src_sstable))) {
|
||||
LOG_WARN("failed to init", K(ret));
|
||||
} else {
|
||||
row_scan_end_ = false;
|
||||
next_row_ = nullptr;
|
||||
major_snapshot_version_ = major_snapshot_version;
|
||||
trans_version_col_idx_ = ObMultiVersionRowkeyHelpper::get_trans_version_col_store_index(
|
||||
row_scan_.get_rowkey_read_info()->get_schema_rowkey_count(), true);
|
||||
if (OB_FAIL(row_queue_.init(schema_column_cnt + storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
|
||||
LOG_WARN("failed to init row queue", K(ret));
|
||||
} else {
|
||||
row_queue_allocator_.reset();
|
||||
row_queue_has_unskippable_row_ = false;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -2142,23 +2151,125 @@ int ObUncommittedRowScan::get_next_row(const ObDatumRow *&res_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObDatumRow *row = nullptr;
|
||||
bool can_skip = true;
|
||||
res_row = nullptr;
|
||||
while (OB_SUCC(ret) && can_skip) {
|
||||
if (OB_FAIL(row_scan_.get_next_row(row))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
while (OB_SUCC(ret) && !row_queue_.has_next()) {
|
||||
if (OB_FAIL(get_next_rowkey_rows())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("failed to get next rowkey rows", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(check_can_skip(*row, can_skip))) {
|
||||
LOG_WARN("failed to check can skip", K(ret));
|
||||
} else if (!row_queue_has_unskippable_row_) {
|
||||
row_queue_reuse();
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(row_queue_.get_next_row(row))) {
|
||||
LOG_WARN("failed to get next row", K(ret));
|
||||
} else {
|
||||
res_row = row;
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
|
||||
int ObUncommittedRowScan::get_next_rowkey_rows()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObDatumRow *row = nullptr;
|
||||
if (OB_UNLIKELY(row_queue_.has_next())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("previous rowkey rows still exist", K(ret));
|
||||
}
|
||||
|
||||
// collect rows from next_row_ to row_queue_
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(next_row_)) {
|
||||
if (OB_FAIL(row_queue_add(*next_row_))) {
|
||||
LOG_WARN("failed to add row", K(ret));
|
||||
} else {
|
||||
next_row_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// collect rows from row_scan_ to row_queue_ or next_row_
|
||||
while (OB_SUCC(ret) && !row_scan_end_ && nullptr == next_row_) {
|
||||
if (OB_FAIL(row_scan_.get_next_row(row))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
row_scan_end_ = true;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (OB_ISNULL(row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid row", K(ret));
|
||||
} else if (OB_FAIL(row_queue_add(*row))) {
|
||||
if (OB_SIZE_OVERFLOW == ret) {
|
||||
next_row_ = row;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to add row", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && !row_queue_.has_next()) {
|
||||
ret = OB_ITER_END;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObUncommittedRowScan::row_queue_add(const ObDatumRow &row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool rowkey_changed = false;
|
||||
if (row_queue_.has_next()) {
|
||||
const int64_t schema_rowkey_cnt = row_scan_.get_rowkey_read_info()->get_schema_rowkey_count();
|
||||
const blocksstable::ObStorageDatumUtils &datum_utils = row_scan_.get_rowkey_read_info()->get_datum_utils();
|
||||
const ObDatumRow *last_row_in_queue = row_queue_.get_last();
|
||||
ObDatumRowkey cur_key;
|
||||
ObDatumRowkey last_key;
|
||||
int compare_result = 0;
|
||||
if (OB_ISNULL(last_row_in_queue)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected last row is nullptr", K(ret), K(row_queue_));
|
||||
} else if (OB_FAIL(last_key.assign(last_row_in_queue->storage_datums_, schema_rowkey_cnt))) {
|
||||
LOG_WARN("failed to assign qu rowkey", K(ret));
|
||||
} else if (OB_FAIL(cur_key.assign(row.storage_datums_, schema_rowkey_cnt))) {
|
||||
LOG_WARN("failed to assign cur key", K(ret));
|
||||
} else if (OB_FAIL(cur_key.compare(last_key, datum_utils, compare_result))) {
|
||||
LOG_WARN("failed to compare last key", K(ret), K(cur_key), K(last_key));
|
||||
} else if (OB_UNLIKELY(compare_result < 0)) {
|
||||
ret = OB_ROWKEY_ORDER_ERROR;
|
||||
LOG_ERROR("input rowkey is less then last rowkey", K(ret), K(cur_key), K(last_key), K(ret));
|
||||
} else if (compare_result > 0) {
|
||||
rowkey_changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && !rowkey_changed) {
|
||||
bool can_skip = false;
|
||||
if (OB_FAIL(row_queue_.add_row(row, row_queue_allocator_))) {
|
||||
LOG_WARN("failed to add row", K(ret));
|
||||
} else if (OB_FAIL(check_can_skip(row, can_skip))) {
|
||||
LOG_WARN("failed to check can skip", K(ret));
|
||||
} else if (!can_skip) {
|
||||
row_queue_has_unskippable_row_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && rowkey_changed) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
} else if (OB_SIZE_OVERFLOW == ret) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("change errcode", K(ret), "real_ret", OB_SIZE_OVERFLOW);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObUncommittedRowScan::row_queue_reuse()
|
||||
{
|
||||
row_queue_.reuse();
|
||||
row_queue_allocator_.reuse();
|
||||
row_queue_has_unskippable_row_ = false;
|
||||
return;
|
||||
}
|
||||
|
||||
int ObUncommittedRowScan::check_can_skip(const ObDatumRow &row, bool &can_skip)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -383,14 +383,23 @@ public:
|
||||
int init(
|
||||
const ObSplitScanParam param,
|
||||
ObSSTable &src_sstable,
|
||||
const int64_t major_snapshot_version);
|
||||
const int64_t major_snapshot_version,
|
||||
const int64_t schema_column_cnt);
|
||||
virtual int get_next_row(const blocksstable::ObDatumRow *&tmp_row) override;
|
||||
private:
|
||||
int get_next_rowkey_rows();
|
||||
int row_queue_add(const ObDatumRow &row);
|
||||
void row_queue_reuse();
|
||||
int check_can_skip(const blocksstable::ObDatumRow &row, bool &can_skip);
|
||||
private:
|
||||
ObRowScan row_scan_;
|
||||
bool row_scan_end_;
|
||||
const ObDatumRow *next_row_;
|
||||
int64_t major_snapshot_version_;
|
||||
int64_t trans_version_col_idx_;
|
||||
blocksstable::ObRowQueue row_queue_;
|
||||
ObArenaAllocator row_queue_allocator_;
|
||||
bool row_queue_has_unskippable_row_;
|
||||
};
|
||||
|
||||
struct ObTabletSplitUtil final
|
||||
|
Loading…
x
Reference in New Issue
Block a user