Fix skip scan bug, retire to normal scan in some case
This commit is contained in:
@ -19,12 +19,14 @@ namespace storage
|
||||
ObMultipleSkipScanMerge::ObMultipleSkipScanMerge()
|
||||
: ObMultipleScanMerge(),
|
||||
state_(SCAN_ROWKEY),
|
||||
scan_rowkey_cnt_(0),
|
||||
schema_rowkey_cnt_(0),
|
||||
ss_rowkey_prefix_cnt_(0),
|
||||
scan_rowkey_range_(),
|
||||
scan_rows_range_(),
|
||||
datums_cnt_(0),
|
||||
datums_(nullptr),
|
||||
origin_range_(nullptr),
|
||||
range_allocator_("SS_RANGE"),
|
||||
rowkey_allocator_("SS_ROWKEY")
|
||||
{
|
||||
@ -72,6 +74,7 @@ int ObMultipleSkipScanMerge::init(
|
||||
void ObMultipleSkipScanMerge::reset()
|
||||
{
|
||||
state_ = SCAN_ROWKEY;
|
||||
scan_rowkey_cnt_ = 0;
|
||||
schema_rowkey_cnt_ = 0;
|
||||
ss_rowkey_prefix_cnt_ = 0;
|
||||
scan_rowkey_range_.reset();
|
||||
@ -81,6 +84,7 @@ void ObMultipleSkipScanMerge::reset()
|
||||
access_ctx_->stmt_allocator_->free(datums_);
|
||||
}
|
||||
datums_ = nullptr;
|
||||
origin_range_ = nullptr;
|
||||
range_allocator_.reset();
|
||||
rowkey_allocator_.reset();
|
||||
ObMultipleScanMerge::reset();
|
||||
@ -88,7 +92,9 @@ void ObMultipleSkipScanMerge::reset()
|
||||
|
||||
void ObMultipleSkipScanMerge::reuse()
|
||||
{
|
||||
state_ = SCAN_ROWKEY;
|
||||
if (RETIRED_TO_SCAN != state_) {
|
||||
state_ = SCAN_ROWKEY;
|
||||
}
|
||||
reuse_datums();
|
||||
range_allocator_.reuse();
|
||||
rowkey_allocator_.reuse();
|
||||
@ -98,10 +104,25 @@ void ObMultipleSkipScanMerge::reuse()
|
||||
// range: the original key range to scan rows
|
||||
// skip_scan_range: the key range only contains suffix columns in rowkey
|
||||
int ObMultipleSkipScanMerge::open(const blocksstable::ObDatumRange &range, const blocksstable::ObDatumRange &skip_scan_range)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
origin_range_ = ⦥
|
||||
if (RETIRED_TO_SCAN == state_) {
|
||||
if (OB_FAIL(ObMultipleScanMerge::open(range))) {
|
||||
STORAGE_LOG(WARN, "Fail to open ObMultipleScanMerge", K(ret), K(range));
|
||||
}
|
||||
} else if (OB_FAIL(open_skip_scan(range, skip_scan_range))) {
|
||||
STORAGE_LOG(WARN, "Fail to open skip scan", K(ret), K(range), K(skip_scan_range));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultipleSkipScanMerge::open_skip_scan(const blocksstable::ObDatumRange &range, const blocksstable::ObDatumRange &skip_scan_range)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool exceeded = false;
|
||||
const int64_t skip_range_datum_cnt = schema_rowkey_cnt_ - ss_rowkey_prefix_cnt_;
|
||||
access_ctx_->range_allocator_ = &range_allocator_;
|
||||
if (skip_scan_range.is_whole_range()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
STORAGE_LOG(WARN, "not supported index skip scan plan", K(ret));
|
||||
@ -165,6 +186,7 @@ int ObMultipleSkipScanMerge::inner_get_next_row(blocksstable::ObDatumRow &row)
|
||||
}
|
||||
} else {
|
||||
state_ = UPDATE_SCAN_ROWS_RANGE;
|
||||
scan_rowkey_cnt_++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -176,6 +198,8 @@ int ObMultipleSkipScanMerge::inner_get_next_row(blocksstable::ObDatumRow &row)
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "Fail to update scan rows range", K(ret), K(row));
|
||||
}
|
||||
} else if (should_retire_to_scan()) {
|
||||
state_ = RETIRED_TO_SCAN;
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "skip scan update scan rows range", K(row), K(scan_rows_range_));
|
||||
state_ = SCAN_ROWS;
|
||||
@ -216,6 +240,11 @@ int ObMultipleSkipScanMerge::inner_get_next_row(blocksstable::ObDatumRow &row)
|
||||
ret = OB_ITER_END;
|
||||
break;
|
||||
}
|
||||
case RETIRED_TO_SCAN: {
|
||||
ret = ObMultipleScanMerge::inner_get_next_row(row);
|
||||
got_row = true;
|
||||
break;
|
||||
}
|
||||
default : {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected state", K(state_));
|
||||
@ -245,6 +274,7 @@ int ObMultipleSkipScanMerge::inner_get_next_rows()
|
||||
}
|
||||
} else {
|
||||
state_ = UPDATE_SCAN_ROWS_RANGE;
|
||||
scan_rowkey_cnt_++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -256,6 +286,8 @@ int ObMultipleSkipScanMerge::inner_get_next_rows()
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "Fail to update scan rows range", K(ret), K(unprojected_row_));
|
||||
}
|
||||
} else if (should_retire_to_scan()) {
|
||||
state_ = RETIRED_TO_SCAN;
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "skip scan update scan rows range", K(unprojected_row_), K(scan_rows_range_));
|
||||
state_ = SCAN_ROWS;
|
||||
@ -299,6 +331,11 @@ int ObMultipleSkipScanMerge::inner_get_next_rows()
|
||||
ret = OB_ITER_END;
|
||||
break;
|
||||
}
|
||||
case RETIRED_TO_SCAN: {
|
||||
ret = ObMultipleScanMerge::inner_get_next_rows();
|
||||
end_loop = true;
|
||||
break;
|
||||
}
|
||||
default : {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected state", K(state_));
|
||||
@ -351,15 +388,39 @@ int ObMultipleSkipScanMerge::update_scan_rows_range(blocksstable::ObDatumRow &ro
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
range_allocator_.reuse();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ss_rowkey_prefix_cnt_; ++i) {
|
||||
ObStorageDatum &prefix_of_start_key = start_key_of_scan_rows_range()[i];
|
||||
ObStorageDatum &prefix_of_end_key = end_key_of_scan_rows_range()[i];
|
||||
prefix_of_start_key.reuse();
|
||||
prefix_of_end_key.reuse();
|
||||
if (OB_FAIL(prefix_of_start_key.deep_copy(row.storage_datums_[i], range_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to deep copy start key's datum", K(ret), K(i), K(row), K(scan_rows_range_));
|
||||
} else if (OB_FAIL(prefix_of_end_key.deep_copy(row.storage_datums_[i], range_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to deep copy end key's datum", K(ret), K(i), K(row), K(scan_rows_range_));
|
||||
if (should_check_interrupt()) {
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
STORAGE_LOG(WARN, "query interrupt, ", K(ret));
|
||||
}
|
||||
} else if (should_retire_to_scan()) {
|
||||
// too many distinct prefix, retire to normal scan
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ss_rowkey_prefix_cnt_; ++i) {
|
||||
ObStorageDatum &prefix_of_rows_key = access_ctx_->query_flag_.is_reverse_scan() ?
|
||||
end_key_of_scan_rows_range()[i] :
|
||||
start_key_of_scan_rows_range()[i];
|
||||
prefix_of_rows_key.reuse();
|
||||
if (OB_FAIL(prefix_of_rows_key.deep_copy(row.storage_datums_[i], range_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to deep copy start key's datum", K(ret), K(i), K(row), K(scan_rows_range_));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (access_ctx_->query_flag_.is_reverse_scan()) {
|
||||
scan_rows_range_.set_start_key(origin_range_->get_start_key());
|
||||
} else {
|
||||
scan_rows_range_.set_end_key(origin_range_->get_end_key());
|
||||
}
|
||||
STORAGE_LOG(TRACE, "should retire to normal scan", K(ret), K(scan_rows_range_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ss_rowkey_prefix_cnt_; ++i) {
|
||||
ObStorageDatum &prefix_of_start_key = start_key_of_scan_rows_range()[i];
|
||||
ObStorageDatum &prefix_of_end_key = end_key_of_scan_rows_range()[i];
|
||||
prefix_of_start_key.reuse();
|
||||
prefix_of_end_key.reuse();
|
||||
if (OB_FAIL(prefix_of_start_key.deep_copy(row.storage_datums_[i], range_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to deep copy start key's datum", K(ret), K(i), K(row), K(scan_rows_range_));
|
||||
} else if (OB_FAIL(prefix_of_end_key.deep_copy(row.storage_datums_[i], range_allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to deep copy end key's datum", K(ret), K(i), K(row), K(scan_rows_range_));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
||||
@ -43,6 +43,9 @@ private:
|
||||
const static int32_t START_KEY_OFFSET_OF_SCAN_ROWS_RANGE = 2;
|
||||
const static int32_t END_KEY_OFFSET_OF_SCAN_ROWS_RANGE = 3;
|
||||
const static int32_t SKIP_SCAN_ROWKEY_DATUMS_ARRAY_CNT = 4;
|
||||
const static int64_t SKIP_SCAN_CHECK_INTERRUPT_CNT = 100;
|
||||
const static int64_t SKIP_SCAN_RETIRE_TO_NORMAL_SCAN_LIMIT = 1000;
|
||||
int open_skip_scan(const blocksstable::ObDatumRange &range, const blocksstable::ObDatumRange &skip_scan_range);
|
||||
int prepare_range(blocksstable::ObStorageDatum *datums, blocksstable::ObDatumRange &range);
|
||||
void prepare_rowkey(blocksstable::ObStorageDatum *datums, const blocksstable::ObDatumRowkey &rowkey,
|
||||
const int64_t datum_cnt, const bool is_min);
|
||||
@ -65,6 +68,14 @@ private:
|
||||
}
|
||||
}
|
||||
}
|
||||
OB_INLINE bool should_check_interrupt() const
|
||||
{
|
||||
return 0 == scan_rowkey_cnt_ % SKIP_SCAN_CHECK_INTERRUPT_CNT;
|
||||
}
|
||||
OB_INLINE bool should_retire_to_scan() const
|
||||
{
|
||||
return scan_rowkey_cnt_ > SKIP_SCAN_RETIRE_TO_NORMAL_SCAN_LIMIT;
|
||||
}
|
||||
|
||||
enum SkipScanState {
|
||||
SCAN_ROWKEY,
|
||||
@ -72,14 +83,17 @@ private:
|
||||
SCAN_ROWS,
|
||||
UPDATE_SCAN_ROWKEY_RANGE,
|
||||
SCAN_FINISHED,
|
||||
RETIRED_TO_SCAN,
|
||||
};
|
||||
SkipScanState state_;
|
||||
int64_t scan_rowkey_cnt_;
|
||||
int64_t schema_rowkey_cnt_;
|
||||
int64_t ss_rowkey_prefix_cnt_;
|
||||
blocksstable::ObDatumRange scan_rowkey_range_;
|
||||
blocksstable::ObDatumRange scan_rows_range_;
|
||||
int64_t datums_cnt_;
|
||||
blocksstable::ObStorageDatum *datums_;
|
||||
const blocksstable::ObDatumRange *origin_range_;
|
||||
common::ObArenaAllocator range_allocator_;
|
||||
common::ObArenaAllocator rowkey_allocator_;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user