Fix bug in skip scan with splitted ranges
This commit is contained in:
@ -27,6 +27,7 @@ ObMultipleSkipScanMerge::ObMultipleSkipScanMerge()
|
||||
datums_cnt_(0),
|
||||
datums_(nullptr),
|
||||
origin_range_(nullptr),
|
||||
skip_scan_range_(nullptr),
|
||||
range_allocator_("SS_RANGE"),
|
||||
rowkey_allocator_("SS_ROWKEY")
|
||||
{
|
||||
@ -61,8 +62,6 @@ int ObMultipleSkipScanMerge::init(
|
||||
} else if (FALSE_IT(datums_ = new (buf) ObStorageDatum[datums_cnt_])) {
|
||||
} else if (OB_FAIL(prepare_range(start_key_of_scan_rowkey_range(), scan_rowkey_range_))) {
|
||||
STORAGE_LOG(WARN, "Fail to prepare distinct scan range", K(ret));
|
||||
} else if (OB_FAIL(prepare_range(start_key_of_scan_rows_range(), scan_rows_range_))) {
|
||||
STORAGE_LOG(WARN, "Fail to prepare skip scan range", K(ret));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "success to init ObMultipleSkipScanMerge", K(param), K(context), K(get_table_param),
|
||||
K(schema_rowkey_cnt_), K(ss_rowkey_prefix_cnt_));
|
||||
@ -85,6 +84,7 @@ void ObMultipleSkipScanMerge::reset()
|
||||
}
|
||||
datums_ = nullptr;
|
||||
origin_range_ = nullptr;
|
||||
skip_scan_range_ = nullptr;
|
||||
range_allocator_.reset();
|
||||
rowkey_allocator_.reset();
|
||||
ObMultipleScanMerge::reset();
|
||||
@ -107,6 +107,7 @@ int ObMultipleSkipScanMerge::open(const blocksstable::ObDatumRange &range, const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
origin_range_ = ⦥
|
||||
skip_scan_range_ = &skip_scan_range;
|
||||
if (RETIRED_TO_SCAN == state_) {
|
||||
if (OB_FAIL(ObMultipleScanMerge::open(range))) {
|
||||
STORAGE_LOG(WARN, "Fail to open ObMultipleScanMerge", K(ret), K(range));
|
||||
@ -138,20 +139,6 @@ int ObMultipleSkipScanMerge::open_skip_scan(const blocksstable::ObDatumRange &ra
|
||||
prepare_rowkey(end_key_of_scan_rowkey_range(), range.end_key_, schema_rowkey_cnt_, false);
|
||||
scan_rowkey_range_.set_border_flag(range.get_border_flag());
|
||||
scan_rowkey_range_.set_group_idx(range.get_group_idx());
|
||||
// generate key range for outputing rows
|
||||
for (int64_t i = 0; i < ss_rowkey_prefix_cnt_; ++i) {
|
||||
start_key_of_scan_rows_range()[i].set_min();
|
||||
end_key_of_scan_rows_range()[i].set_max();
|
||||
}
|
||||
prepare_rowkey(start_key_of_scan_rows_range() + ss_rowkey_prefix_cnt_,
|
||||
skip_scan_range.start_key_,
|
||||
schema_rowkey_cnt_ - ss_rowkey_prefix_cnt_,
|
||||
true);
|
||||
prepare_rowkey(end_key_of_scan_rows_range() + ss_rowkey_prefix_cnt_,
|
||||
skip_scan_range.end_key_,
|
||||
schema_rowkey_cnt_ - ss_rowkey_prefix_cnt_,
|
||||
false);
|
||||
scan_rows_range_.set_border_flag(skip_scan_range.get_border_flag());
|
||||
scan_rowkey_range_.set_group_idx(range.get_group_idx());
|
||||
const ObColDescIArray *col_descs = nullptr;
|
||||
if (OB_ISNULL(col_descs = access_param_->iter_param_.get_out_col_descs())) {
|
||||
@ -159,8 +146,6 @@ int ObMultipleSkipScanMerge::open_skip_scan(const blocksstable::ObDatumRange &ra
|
||||
TRANS_LOG(WARN, "Unexpected null out cols", K(ret));
|
||||
} else if (OB_FAIL(scan_rowkey_range_.prepare_memtable_readable(*col_descs, rowkey_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to transfer store rowkey", K(ret), K(scan_rowkey_range_));
|
||||
} else if (OB_FAIL(scan_rows_range_.prepare_memtable_readable(*col_descs, range_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to transfer store rowkey", K(ret), K(scan_rows_range_));
|
||||
}
|
||||
STORAGE_LOG(TRACE, "open skip scan", K(ret), K(schema_rowkey_cnt_), K(ss_rowkey_prefix_cnt_),
|
||||
K(scan_rows_range_), K(range), K(skip_scan_range));
|
||||
@ -390,6 +375,8 @@ int ObMultipleSkipScanMerge::update_scan_rows_range(blocksstable::ObDatumRow &ro
|
||||
range_allocator_.reuse();
|
||||
if (should_check_interrupt() && OB_FAIL(THIS_WORKER.check_status())) {
|
||||
STORAGE_LOG(WARN, "query interrupt", K(ret));
|
||||
} else if (OB_FAIL(prepare_scan_row_range())) {
|
||||
STORAGE_LOG(WARN, "Fail to prepare scan row range", K(ret));
|
||||
} else if (should_retire_to_scan()) {
|
||||
// too many distinct prefix, retire to normal scan
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ss_rowkey_prefix_cnt_; ++i) {
|
||||
@ -404,8 +391,10 @@ int ObMultipleSkipScanMerge::update_scan_rows_range(blocksstable::ObDatumRow &ro
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (access_ctx_->query_flag_.is_reverse_scan()) {
|
||||
scan_rows_range_.set_start_key(origin_range_->get_start_key());
|
||||
set_border_falg(true, *origin_range_, scan_rows_range_);
|
||||
} else {
|
||||
scan_rows_range_.set_end_key(origin_range_->get_end_key());
|
||||
set_border_falg(false, *origin_range_, scan_rows_range_);
|
||||
}
|
||||
STORAGE_LOG(TRACE, "should retire to normal scan", K(ret), K(scan_rows_range_));
|
||||
} else {
|
||||
@ -422,10 +411,10 @@ int ObMultipleSkipScanMerge::update_scan_rows_range(blocksstable::ObDatumRow &ro
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
// check current skip scan range may exceed original range?
|
||||
// adjust scan rows range according original query range
|
||||
// one case is in parallel execution, splitted range
|
||||
bool exceeded = false;
|
||||
if (OB_FAIL(check_range_exceeded(exceeded))) {
|
||||
if (OB_FAIL(shrink_scan_rows_range(exceeded))) {
|
||||
STORAGE_LOG(WARN, "Fail to check range exceed", K(ret));
|
||||
} else if (exceeded) {
|
||||
ret = OB_ITER_END;
|
||||
@ -494,23 +483,79 @@ int ObMultipleSkipScanMerge::update_scan_rowkey_range()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultipleSkipScanMerge::check_range_exceeded(bool &exceeded)
|
||||
int ObMultipleSkipScanMerge::shrink_scan_rows_range(bool &exceeded)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int cmp_ret = 0;
|
||||
exceeded = false;
|
||||
const ObStorageDatumUtils &datum_utils = access_param_->iter_param_.get_read_info()->get_datum_utils();
|
||||
if (OB_FAIL(scan_rows_range_.end_key_.compare(scan_rowkey_range_.start_key_, datum_utils, cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Fail to compare", K(ret), K(scan_rows_range_.end_key_), K(scan_rowkey_range_.start_key_));
|
||||
} else if (cmp_ret < 0 || (0 == cmp_ret && (scan_rows_range_.is_right_open() || scan_rowkey_range_.is_left_open()))) {
|
||||
exceeded = true;
|
||||
} else if (OB_FAIL(scan_rows_range_.start_key_.compare(scan_rowkey_range_.end_key_, datum_utils, cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Fail to compare", K(ret), K(scan_rows_range_.end_key_), K(scan_rowkey_range_.start_key_));
|
||||
} else if (cmp_ret > 0 || (0 == cmp_ret && (scan_rows_range_.is_left_open() || scan_rowkey_range_.is_right_open()))) {
|
||||
if (OB_FAIL(scan_rows_range_.start_key_.compare(origin_range_->start_key_, datum_utils, cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Fail to compare", K(ret));
|
||||
} else if (cmp_ret > 0) {
|
||||
} else {
|
||||
set_border_falg(true, *origin_range_, scan_rows_range_);
|
||||
if (cmp_ret < 0) {
|
||||
scan_rows_range_.start_key_ = origin_range_->start_key_;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(scan_rows_range_.end_key_.compare(origin_range_->end_key_, datum_utils, cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Fail to compare", K(ret));
|
||||
} else if (cmp_ret < 0) {
|
||||
} else {
|
||||
set_border_falg(false, *origin_range_, scan_rows_range_);
|
||||
if (cmp_ret > 0) {
|
||||
scan_rows_range_.end_key_ = origin_range_->end_key_;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(scan_rows_range_.start_key_.compare(scan_rows_range_.end_key_, datum_utils, cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Fail to compare", K(ret), K(scan_rows_range_));
|
||||
} else if (cmp_ret > 0 || (0 == cmp_ret && (scan_rows_range_.is_left_open() || scan_rows_range_.is_right_open()))) {
|
||||
exceeded = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultipleSkipScanMerge::prepare_scan_row_range()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(prepare_range(start_key_of_scan_rows_range(), scan_rows_range_))) {
|
||||
STORAGE_LOG(WARN, "Fail to prepare skip scan range", K(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; i < ss_rowkey_prefix_cnt_; ++i) {
|
||||
start_key_of_scan_rows_range()[i].set_min();
|
||||
end_key_of_scan_rows_range()[i].set_max();
|
||||
}
|
||||
prepare_rowkey(start_key_of_scan_rows_range() + ss_rowkey_prefix_cnt_,
|
||||
skip_scan_range_->start_key_,
|
||||
schema_rowkey_cnt_ - ss_rowkey_prefix_cnt_,
|
||||
true);
|
||||
prepare_rowkey(end_key_of_scan_rows_range() + ss_rowkey_prefix_cnt_,
|
||||
skip_scan_range_->end_key_,
|
||||
schema_rowkey_cnt_ - ss_rowkey_prefix_cnt_,
|
||||
false);
|
||||
scan_rows_range_.set_border_flag(skip_scan_range_->get_border_flag());
|
||||
scan_rows_range_.set_group_idx(origin_range_->get_group_idx());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObMultipleSkipScanMerge::set_border_falg(const bool is_left, const blocksstable::ObDatumRange &src, blocksstable::ObDatumRange &dst)
|
||||
{
|
||||
if (is_left) {
|
||||
if (src.is_left_open()) {
|
||||
dst.set_left_open();
|
||||
} else {
|
||||
dst.set_left_closed();
|
||||
}
|
||||
} else {
|
||||
if (src.is_right_open()) {
|
||||
dst.set_right_open();
|
||||
} else {
|
||||
dst.set_right_closed();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user