Fix column store blockscan check logic
This commit is contained in:
parent
960df4d65c
commit
debe792090
@ -49,6 +49,7 @@ public:
|
||||
virtual void SetUp();
|
||||
virtual void TearDown();
|
||||
virtual void prepare_schema();
|
||||
void prepare_co_query_param(const bool is_reverse);
|
||||
void generate_range(
|
||||
const int64_t start,
|
||||
const int64_t end,
|
||||
@ -135,6 +136,14 @@ void TestCOSSTableRowScanner::TearDown()
|
||||
TestIndexBlockDataPrepare::TearDown();
|
||||
}
|
||||
|
||||
void TestCOSSTableRowScanner::prepare_co_query_param(const bool is_reverse)
|
||||
{
|
||||
prepare_query_param(is_reverse);
|
||||
iter_param_.pd_storage_flag_.set_blockscan_pushdown(true);
|
||||
iter_param_.pd_storage_flag_.set_filter_pushdown(true);
|
||||
iter_param_.vectorized_enabled_ = true;
|
||||
}
|
||||
|
||||
void TestCOSSTableRowScanner::prepare_schema()
|
||||
{
|
||||
ObColumnSchemaV2 column;
|
||||
@ -382,7 +391,7 @@ void TestCOSSTableRowScanner::test_row_scan_only(const bool is_reverse)
|
||||
int64_t start = 0;
|
||||
int64_t end = row_cnt_ - 1;
|
||||
generate_range(start, end, range_);
|
||||
prepare_query_param(is_reverse);
|
||||
prepare_co_query_param(is_reverse);
|
||||
OK(scanner_.inner_open(iter_param_, context_, &sstable_, &range_));
|
||||
scanner_.block_row_store_ = &block_row_store_;
|
||||
if (is_reverse) {
|
||||
@ -400,7 +409,7 @@ void TestCOSSTableRowScanner::test_row_scan_and_column_scan(const bool is_revers
|
||||
int64_t start = 0;
|
||||
int64_t end = row_cnt_ - 1;
|
||||
generate_range(start, end, range_);
|
||||
prepare_query_param(is_reverse);
|
||||
prepare_co_query_param(is_reverse);
|
||||
OK(scanner_.inner_open(iter_param_, context_, &sstable_, &range_));
|
||||
scanner_.block_row_store_ = &block_row_store_;
|
||||
|
||||
@ -445,7 +454,7 @@ void TestCOSSTableRowScanner::test_row_scan_and_column_scan_with_multi_range1()
|
||||
const bool is_reverse = false;
|
||||
prepare_test_case(level_cnt);
|
||||
generate_ranges_case1(is_reverse);
|
||||
prepare_query_param(is_reverse);
|
||||
prepare_co_query_param(is_reverse);
|
||||
OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_));
|
||||
multi_scanner_.block_row_store_ = &block_row_store_;
|
||||
consume_rows_by_row_store(&multi_scanner_, range_row_ids_[0].start_row_id_,
|
||||
@ -494,7 +503,7 @@ void TestCOSSTableRowScanner::test_reverse_row_scan_and_column_scan_with_multi_r
|
||||
const bool is_reverse = true;
|
||||
prepare_test_case(level_cnt);
|
||||
generate_ranges_case1(is_reverse);
|
||||
prepare_query_param(is_reverse);
|
||||
prepare_co_query_param(is_reverse);
|
||||
OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_));
|
||||
multi_scanner_.block_row_store_ = &block_row_store_;
|
||||
consume_rows_by_row_store(&multi_scanner_, range_row_ids_[0].end_row_id_,
|
||||
@ -539,7 +548,7 @@ void TestCOSSTableRowScanner::test_row_scan_and_column_scan_with_multi_range2()
|
||||
const bool is_reverse = false;
|
||||
prepare_test_case(level_cnt);
|
||||
generate_ranges_case2(is_reverse);
|
||||
prepare_query_param(is_reverse);
|
||||
prepare_co_query_param(is_reverse);
|
||||
OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_));
|
||||
multi_scanner_.block_row_store_ = &block_row_store_;
|
||||
|
||||
@ -616,7 +625,7 @@ void TestCOSSTableRowScanner::test_reverse_row_scan_and_column_scan_with_multi_r
|
||||
const bool is_reverse = true;
|
||||
prepare_test_case(level_cnt);
|
||||
generate_ranges_case2(is_reverse);
|
||||
prepare_query_param(is_reverse);
|
||||
prepare_co_query_param(is_reverse);
|
||||
OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_));
|
||||
multi_scanner_.block_row_store_ = &block_row_store_;
|
||||
|
||||
|
@ -67,9 +67,6 @@ public:
|
||||
OB_INLINE bool can_blockscan() const { return can_blockscan_; }
|
||||
OB_INLINE bool filter_pushdown() const { return pd_filter_info_.is_pd_filter_; }
|
||||
OB_INLINE bool filter_applied() const { return filter_applied_; }
|
||||
// For columnar store.
|
||||
// TODO(hanling: it is used for compatible with row store and will be deprecated in the future)
|
||||
OB_INLINE void set_filter_applied(const bool is_filter_applied) { filter_applied_ = is_filter_applied; }
|
||||
OB_INLINE bool filter_is_null() const { return pd_filter_info_.is_pd_filter_ && nullptr == pd_filter_info_.filter_; }
|
||||
int apply_blockscan(
|
||||
blocksstable::ObIMicroBlockRowScanner µ_scanner,
|
||||
|
@ -272,8 +272,6 @@ int ObMultipleScanMerge::locate_blockscan_border()
|
||||
LOG_WARN("Unexpected null iter", K(ret), K(consumers_[0]));
|
||||
} else if (OB_FAIL(iter->refresh_blockscan_checker(border_key))) {
|
||||
LOG_WARN("Failed to check pushdown skip", K(ret), K(border_key));
|
||||
} else {
|
||||
block_row_store_->set_filter_applied(true);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -547,7 +545,7 @@ int ObMultipleScanMerge::can_batch_scan(bool &can_batch)
|
||||
} else if (OB_ISNULL(iter = iters_.at(consumers_[0]))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected null iter", K(ret), K(consumers_[0]), K(iters_), K(*this));
|
||||
} else if (iter->filter_applied()) {
|
||||
} else if (iter->can_batch_scan()) {
|
||||
can_batch = true;
|
||||
}
|
||||
}
|
||||
|
@ -59,6 +59,23 @@ void ObSSTableRowScanner<PrefetchType>::reuse()
|
||||
prefetcher_.reuse();
|
||||
}
|
||||
|
||||
template<typename PrefetchType>
|
||||
bool ObSSTableRowScanner<PrefetchType>::can_blockscan() const
|
||||
{
|
||||
return is_scan(type_) &&
|
||||
nullptr != block_row_store_ &&
|
||||
block_row_store_->can_blockscan();
|
||||
}
|
||||
|
||||
template<typename PrefetchType>
|
||||
bool ObSSTableRowScanner<PrefetchType>::can_batch_scan() const
|
||||
{
|
||||
return can_blockscan() &&
|
||||
block_row_store_->filter_applied() &&
|
||||
// can batch scan when only enable_pd_aggregate, as it uses own datum buffer and only return aggregated result
|
||||
(iter_param_->vectorized_enabled_ || iter_param_->enable_pd_aggregate());
|
||||
}
|
||||
|
||||
template<typename PrefetchType>
|
||||
int ObSSTableRowScanner<PrefetchType>::inner_open(
|
||||
const ObTableIterParam &iter_param,
|
||||
@ -236,8 +253,7 @@ int ObSSTableRowScanner<PrefetchType>::inner_get_next_row(const ObDatumRow *&sto
|
||||
if (OB_UNLIKELY(!is_opened_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObSSTableRowScanner has not been opened", K(ret));
|
||||
} else if (nullptr != block_row_store_ && !block_row_store_->is_disabled() && prefetcher_.switch_to_columnar_scan()) {
|
||||
block_row_store_->set_filter_applied(true);
|
||||
} else if (can_batch_scan()) {
|
||||
ret = OB_PUSHDOWN_STATUS_CHANGED;
|
||||
} else {
|
||||
while(OB_SUCC(ret)) {
|
||||
@ -308,7 +324,7 @@ int ObSSTableRowScanner<PrefetchType>::fetch_row(ObSSTableReadHandle &read_handl
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Fail to open cur data block", K(ret), KPC(this));
|
||||
}
|
||||
} else if (can_vectorize()) {
|
||||
} else if (can_batch_scan()) {
|
||||
ret = OB_PUSHDOWN_STATUS_CHANGED;
|
||||
LOG_TRACE("[Vectorized|Aggregate] pushdown status changed, fuse=>pushdown", K(ret),
|
||||
K(prefetcher_.cur_micro_data_fetch_idx_));
|
||||
@ -328,7 +344,7 @@ int ObSSTableRowScanner<PrefetchType>::fetch_row(ObSSTableReadHandle &read_handl
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Fail to open cur data block", K(ret), KPC(this));
|
||||
}
|
||||
} else if (can_vectorize()) {
|
||||
} else if (can_batch_scan()) {
|
||||
ret = OB_PUSHDOWN_STATUS_CHANGED;
|
||||
LOG_TRACE("[Vectorized|Aggregate] pushdown status changed, fuse=>pushdown", K(ret),
|
||||
K(prefetcher_.cur_micro_data_fetch_idx_));
|
||||
@ -353,13 +369,6 @@ int ObSSTableRowScanner<PrefetchType>::refresh_blockscan_checker(const blockssta
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename PrefetchType>
|
||||
bool ObSSTableRowScanner<PrefetchType>::can_vectorize() const
|
||||
{
|
||||
return (iter_param_->vectorized_enabled_ || iter_param_->enable_pd_aggregate()) &&
|
||||
nullptr != block_row_store_ && block_row_store_->filter_applied();
|
||||
}
|
||||
|
||||
template<typename PrefetchType>
|
||||
int ObSSTableRowScanner<PrefetchType>::get_next_rows()
|
||||
{
|
||||
@ -422,7 +431,7 @@ int ObSSTableRowScanner<PrefetchType>::fetch_rows(ObSSTableReadHandle &read_hand
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Fail to open cur data block", K(ret), KPC(this));
|
||||
}
|
||||
} else if (!can_vectorize()) {
|
||||
} else if (!can_batch_scan()) {
|
||||
ret = OB_PUSHDOWN_STATUS_CHANGED;
|
||||
LOG_TRACE("[Vectorized] pushdown status changed, pushdown=>fuse", K(ret),
|
||||
K(prefetcher_.cur_micro_data_fetch_idx_));
|
||||
@ -445,7 +454,7 @@ int ObSSTableRowScanner<PrefetchType>::fetch_rows(ObSSTableReadHandle &read_hand
|
||||
}
|
||||
} else if (need_prefetch && OB_FAIL(prefetcher_.prefetch())) {
|
||||
LOG_WARN("Fail to do prefetch", K(ret), K_(prefetcher));
|
||||
} else if (!can_vectorize()) {
|
||||
} else if (!can_batch_scan()) {
|
||||
ret = OB_PUSHDOWN_STATUS_CHANGED;
|
||||
LOG_TRACE("[Vectorized] pushdown status changed, pushdown=>fuse", K(ret),
|
||||
K(prefetcher_.cur_micro_data_fetch_idx_));
|
||||
@ -461,6 +470,22 @@ int ObSSTableRowScanner<PrefetchType>::fetch_rows(ObSSTableReadHandle &read_hand
|
||||
|
||||
/*************** For columnar store ****************/
|
||||
|
||||
template<>
|
||||
bool ObSSTableRowScanner<ObCOPrefetcher>::can_blockscan() const
|
||||
{
|
||||
return is_scan(type_) &&
|
||||
nullptr != block_row_store_ &&
|
||||
prefetcher_.switch_to_columnar_scan();
|
||||
}
|
||||
|
||||
template<>
|
||||
bool ObSSTableRowScanner<ObCOPrefetcher>::can_batch_scan() const
|
||||
{
|
||||
return can_blockscan() &&
|
||||
!block_row_store_->is_disabled() &&
|
||||
iter_param_->vectorized_enabled_ && iter_param_->enable_pd_filter();
|
||||
}
|
||||
|
||||
template<typename PrefetchType>
|
||||
int ObSSTableRowScanner<PrefetchType>::get_blockscan_start(
|
||||
ObCSRowId &start,
|
||||
|
@ -41,6 +41,8 @@ public:
|
||||
virtual ~ObSSTableRowScanner();
|
||||
virtual void reset();
|
||||
virtual void reuse();
|
||||
virtual bool can_blockscan() const override;
|
||||
virtual bool can_batch_scan() const override;
|
||||
TO_STRING_KV(K_(is_opened), K_(cur_range_idx), K_(prefetcher), KPC_(sstable));
|
||||
protected:
|
||||
int inner_open(
|
||||
@ -59,7 +61,6 @@ protected:
|
||||
|
||||
private:
|
||||
OB_INLINE int init_micro_scanner();
|
||||
OB_INLINE bool can_vectorize() const;
|
||||
int open_cur_data_block(ObSSTableReadHandle &read_handle);
|
||||
int fetch_rows(ObSSTableReadHandle &read_handle);
|
||||
// For columnar store
|
||||
|
@ -87,15 +87,13 @@ public:
|
||||
return OB_NOT_SUPPORTED;
|
||||
}
|
||||
virtual int set_ignore_shadow_row() { return OB_NOT_SUPPORTED; }
|
||||
bool can_blockscan() const
|
||||
virtual bool can_blockscan() const
|
||||
{
|
||||
return is_scan(type_) && is_sstable_iter_ &&
|
||||
nullptr != block_row_store_ && block_row_store_->can_blockscan();
|
||||
return false;
|
||||
}
|
||||
bool filter_applied() const
|
||||
virtual bool can_batch_scan() const
|
||||
{
|
||||
return is_scan(type_) && is_sstable_iter_ &&
|
||||
nullptr != block_row_store_ && block_row_store_->filter_applied();
|
||||
return false;
|
||||
}
|
||||
virtual int get_next_row(const blocksstable::ObDatumRow *&row);
|
||||
virtual int get_next_rows()
|
||||
|
@ -66,7 +66,7 @@ public:
|
||||
{
|
||||
return block_scan_state_;
|
||||
}
|
||||
OB_INLINE bool is_in_row_store_scan_mode()
|
||||
OB_INLINE bool is_in_row_store_scan_mode() const
|
||||
{
|
||||
return ROW_STORE_SCAN == block_scan_state_;
|
||||
}
|
||||
@ -88,7 +88,7 @@ public:
|
||||
{
|
||||
return PENDING_BLOCK_SCAN == block_scan_state_;
|
||||
}
|
||||
OB_INLINE virtual bool switch_to_columnar_scan()
|
||||
OB_INLINE virtual bool switch_to_columnar_scan() const
|
||||
{
|
||||
return !is_in_row_store_scan_mode();
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ ObCOSSTableRowScanner::ObCOSSTableRowScanner()
|
||||
reverse_scan_(false),
|
||||
is_limit_end_(false),
|
||||
state_(BEGIN),
|
||||
blockscan_state_(BLOCKSCAN_RANGE),
|
||||
blockscan_state_(MAX_STATE),
|
||||
group_by_project_idx_(0),
|
||||
group_size_(0),
|
||||
column_group_cnt_(-1),
|
||||
@ -133,7 +133,7 @@ void ObCOSSTableRowScanner::reset()
|
||||
group_size_ = 0;
|
||||
reverse_scan_ = false;
|
||||
state_ = BEGIN;
|
||||
blockscan_state_ = BLOCKSCAN_RANGE;
|
||||
blockscan_state_ = MAX_STATE;
|
||||
range_ = nullptr;
|
||||
is_limit_end_ = false;
|
||||
pending_end_row_id_ = OB_INVALID_CS_ROW_ID;
|
||||
@ -163,7 +163,7 @@ void ObCOSSTableRowScanner::reuse()
|
||||
group_size_ = 0;
|
||||
reverse_scan_ = false;
|
||||
state_ = BEGIN;
|
||||
blockscan_state_ = BLOCKSCAN_RANGE;
|
||||
blockscan_state_ = MAX_STATE;
|
||||
range_ = nullptr;
|
||||
is_limit_end_ = false;
|
||||
pending_end_row_id_ = OB_INVALID_CS_ROW_ID;
|
||||
@ -182,11 +182,11 @@ int ObCOSSTableRowScanner::get_next_rows()
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Fail to get blockscan start", K(ret), KPC(this));
|
||||
} else {
|
||||
set_filter_not_applied();
|
||||
blockscan_state_ = MAX_STATE;
|
||||
state_ = END;
|
||||
}
|
||||
} else if (BLOCKSCAN_FINISH == blockscan_state_) {
|
||||
set_filter_not_applied();
|
||||
blockscan_state_ = MAX_STATE;
|
||||
ret = OB_PUSHDOWN_STATUS_CHANGED;
|
||||
} else {
|
||||
LOG_DEBUG("[COLUMNSTORE] COScanner get_next_rows [change to filter_rows]", K(ret), K_(state), K_(blockscan_state),
|
||||
@ -202,6 +202,9 @@ int ObCOSSTableRowScanner::get_next_rows()
|
||||
} else if (OB_FAIL(filter_rows(blockscan_state_))) {
|
||||
if (OB_UNLIKELY(OB_ITER_END != ret)) {
|
||||
LOG_WARN("Fail to filter rows", K(ret), KPC(this));
|
||||
} else if (MAX_STATE == blockscan_state_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected blockscan state", K(ret), K_(blockscan_state));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
state_ = STATE_TRANSITION[blockscan_state_];
|
||||
@ -236,7 +239,7 @@ int ObCOSSTableRowScanner::get_next_rows()
|
||||
} else {
|
||||
batched_row_store_->set_end();
|
||||
}
|
||||
set_filter_not_applied();
|
||||
blockscan_state_ = MAX_STATE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -540,9 +543,6 @@ int ObCOSSTableRowScanner::filter_rows(BlockScanState &blockscan_state)
|
||||
} else {
|
||||
ret = filter_rows_without_limit(blockscan_state);
|
||||
}
|
||||
if (nullptr != block_row_store_) {
|
||||
block_row_store_->set_filter_applied(true);
|
||||
}
|
||||
LOG_TRACE("[COLUMNSTORE] COScanner filter_rows [end]", K(ret), K_(state), K_(blockscan_state),
|
||||
K_(current), K_(group_size), K_(end));
|
||||
return ret;
|
||||
@ -797,6 +797,9 @@ int ObCOSSTableRowScanner::inner_get_next_row(const ObDatumRow *&store_row)
|
||||
} else if (OB_FAIL(row_scanner_->inner_get_next_row_with_row_id(store_row, row_id))) {
|
||||
if (OB_UNLIKELY(OB_PUSHDOWN_STATUS_CHANGED != ret && OB_ITER_END != ret)) {
|
||||
LOG_WARN("Fail to get next row from row scanner", K(ret));
|
||||
} else if (OB_PUSHDOWN_STATUS_CHANGED == ret) {
|
||||
state_ = BEGIN;
|
||||
blockscan_state_ = BLOCKSCAN_RANGE;
|
||||
}
|
||||
} else if (nullptr == getter_project_iter_) {
|
||||
// All columns have been fetched in the row scanner.
|
||||
|
@ -45,6 +45,18 @@ public:
|
||||
const void *query_range) override;
|
||||
virtual void reset() override;
|
||||
virtual void reuse() override;
|
||||
virtual bool can_blockscan() const override
|
||||
{
|
||||
return is_scan(type_) &&
|
||||
nullptr != block_row_store_ &&
|
||||
MAX_STATE != blockscan_state_;
|
||||
}
|
||||
virtual bool can_batch_scan() const override
|
||||
{
|
||||
return can_blockscan() &&
|
||||
iter_param_->vectorized_enabled_ &&
|
||||
iter_param_->enable_pd_filter();
|
||||
}
|
||||
virtual int get_next_rows() override;
|
||||
TO_STRING_KV(KPC_(iter_param),
|
||||
KP_(access_ctx),
|
||||
@ -142,12 +154,6 @@ private:
|
||||
current_ += count;
|
||||
}
|
||||
}
|
||||
OB_INLINE void set_filter_not_applied()
|
||||
{
|
||||
if (nullptr != block_row_store_) {
|
||||
block_row_store_->set_filter_applied(false);
|
||||
}
|
||||
}
|
||||
OB_INLINE bool is_group_idx_expr(sql::ObExpr *e) const
|
||||
{
|
||||
return T_PSEUDO_GROUP_ID == e->type_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user