diff --git a/mittest/mtlenv/storage/blocksstable/test_co_sstable_row_scanner.cpp b/mittest/mtlenv/storage/blocksstable/test_co_sstable_row_scanner.cpp index 7e5f23e79..3aa6f8154 100644 --- a/mittest/mtlenv/storage/blocksstable/test_co_sstable_row_scanner.cpp +++ b/mittest/mtlenv/storage/blocksstable/test_co_sstable_row_scanner.cpp @@ -49,6 +49,7 @@ public: virtual void SetUp(); virtual void TearDown(); virtual void prepare_schema(); + void prepare_co_query_param(const bool is_reverse); void generate_range( const int64_t start, const int64_t end, @@ -135,6 +136,14 @@ void TestCOSSTableRowScanner::TearDown() TestIndexBlockDataPrepare::TearDown(); } +void TestCOSSTableRowScanner::prepare_co_query_param(const bool is_reverse) +{ + prepare_query_param(is_reverse); + iter_param_.pd_storage_flag_.set_blockscan_pushdown(true); + iter_param_.pd_storage_flag_.set_filter_pushdown(true); + iter_param_.vectorized_enabled_ = true; +} + void TestCOSSTableRowScanner::prepare_schema() { ObColumnSchemaV2 column; @@ -382,7 +391,7 @@ void TestCOSSTableRowScanner::test_row_scan_only(const bool is_reverse) int64_t start = 0; int64_t end = row_cnt_ - 1; generate_range(start, end, range_); - prepare_query_param(is_reverse); + prepare_co_query_param(is_reverse); OK(scanner_.inner_open(iter_param_, context_, &sstable_, &range_)); scanner_.block_row_store_ = &block_row_store_; if (is_reverse) { @@ -400,7 +409,7 @@ void TestCOSSTableRowScanner::test_row_scan_and_column_scan(const bool is_revers int64_t start = 0; int64_t end = row_cnt_ - 1; generate_range(start, end, range_); - prepare_query_param(is_reverse); + prepare_co_query_param(is_reverse); OK(scanner_.inner_open(iter_param_, context_, &sstable_, &range_)); scanner_.block_row_store_ = &block_row_store_; @@ -445,7 +454,7 @@ void TestCOSSTableRowScanner::test_row_scan_and_column_scan_with_multi_range1() const bool is_reverse = false; prepare_test_case(level_cnt); generate_ranges_case1(is_reverse); - prepare_query_param(is_reverse); + prepare_co_query_param(is_reverse); OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_)); multi_scanner_.block_row_store_ = &block_row_store_; consume_rows_by_row_store(&multi_scanner_, range_row_ids_[0].start_row_id_, @@ -494,7 +503,7 @@ void TestCOSSTableRowScanner::test_reverse_row_scan_and_column_scan_with_multi_r const bool is_reverse = true; prepare_test_case(level_cnt); generate_ranges_case1(is_reverse); - prepare_query_param(is_reverse); + prepare_co_query_param(is_reverse); OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_)); multi_scanner_.block_row_store_ = &block_row_store_; consume_rows_by_row_store(&multi_scanner_, range_row_ids_[0].end_row_id_, @@ -539,7 +548,7 @@ void TestCOSSTableRowScanner::test_row_scan_and_column_scan_with_multi_range2() const bool is_reverse = false; prepare_test_case(level_cnt); generate_ranges_case2(is_reverse); - prepare_query_param(is_reverse); + prepare_co_query_param(is_reverse); OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_)); multi_scanner_.block_row_store_ = &block_row_store_; @@ -616,7 +625,7 @@ void TestCOSSTableRowScanner::test_reverse_row_scan_and_column_scan_with_multi_r const bool is_reverse = true; prepare_test_case(level_cnt); generate_ranges_case2(is_reverse); - prepare_query_param(is_reverse); + prepare_co_query_param(is_reverse); OK(multi_scanner_.inner_open(iter_param_, context_, &sstable_, &ranges_)); multi_scanner_.block_row_store_ = &block_row_store_; diff --git a/src/storage/access/ob_block_row_store.h b/src/storage/access/ob_block_row_store.h index 7916963c2..99c4a7887 100644 --- a/src/storage/access/ob_block_row_store.h +++ b/src/storage/access/ob_block_row_store.h @@ -67,9 +67,6 @@ public: OB_INLINE bool can_blockscan() const { return can_blockscan_; } OB_INLINE bool filter_pushdown() const { return pd_filter_info_.is_pd_filter_; } OB_INLINE bool filter_applied() const { return filter_applied_; } - // For columnar store. - // TODO(hanling: it is used for compatible with row store and will be deprecated in the future) - OB_INLINE void set_filter_applied(const bool is_filter_applied) { filter_applied_ = is_filter_applied; } OB_INLINE bool filter_is_null() const { return pd_filter_info_.is_pd_filter_ && nullptr == pd_filter_info_.filter_; } int apply_blockscan( blocksstable::ObIMicroBlockRowScanner µ_scanner, diff --git a/src/storage/access/ob_multiple_scan_merge.cpp b/src/storage/access/ob_multiple_scan_merge.cpp index ae6de1724..d2d708132 100644 --- a/src/storage/access/ob_multiple_scan_merge.cpp +++ b/src/storage/access/ob_multiple_scan_merge.cpp @@ -272,8 +272,6 @@ int ObMultipleScanMerge::locate_blockscan_border() LOG_WARN("Unexpected null iter", K(ret), K(consumers_[0])); } else if (OB_FAIL(iter->refresh_blockscan_checker(border_key))) { LOG_WARN("Failed to check pushdown skip", K(ret), K(border_key)); - } else { - block_row_store_->set_filter_applied(true); } } return ret; @@ -547,7 +545,7 @@ int ObMultipleScanMerge::can_batch_scan(bool &can_batch) } else if (OB_ISNULL(iter = iters_.at(consumers_[0]))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "Unexpected null iter", K(ret), K(consumers_[0]), K(iters_), K(*this)); - } else if (iter->filter_applied()) { + } else if (iter->can_batch_scan()) { can_batch = true; } } diff --git a/src/storage/access/ob_sstable_row_scanner.cpp b/src/storage/access/ob_sstable_row_scanner.cpp index b48928c9b..67371eb48 100644 --- a/src/storage/access/ob_sstable_row_scanner.cpp +++ b/src/storage/access/ob_sstable_row_scanner.cpp @@ -59,6 +59,23 @@ void ObSSTableRowScanner::reuse() prefetcher_.reuse(); } +template +bool ObSSTableRowScanner::can_blockscan() const +{ + return is_scan(type_) && + nullptr != block_row_store_ && + block_row_store_->can_blockscan(); +} + +template +bool ObSSTableRowScanner::can_batch_scan() const +{ + return can_blockscan() && + block_row_store_->filter_applied() && + // can batch scan when only enable_pd_aggregate, as it uses own datum buffer and only return aggregated result + (iter_param_->vectorized_enabled_ || iter_param_->enable_pd_aggregate()); +} + template int ObSSTableRowScanner::inner_open( const ObTableIterParam &iter_param, @@ -236,8 +253,7 @@ int ObSSTableRowScanner::inner_get_next_row(const ObDatumRow *&sto if (OB_UNLIKELY(!is_opened_)) { ret = OB_NOT_INIT; LOG_WARN("ObSSTableRowScanner has not been opened", K(ret)); - } else if (nullptr != block_row_store_ && !block_row_store_->is_disabled() && prefetcher_.switch_to_columnar_scan()) { - block_row_store_->set_filter_applied(true); + } else if (can_batch_scan()) { ret = OB_PUSHDOWN_STATUS_CHANGED; } else { while(OB_SUCC(ret)) { @@ -308,7 +324,7 @@ int ObSSTableRowScanner::fetch_row(ObSSTableReadHandle &read_handl if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("Fail to open cur data block", K(ret), KPC(this)); } - } else if (can_vectorize()) { + } else if (can_batch_scan()) { ret = OB_PUSHDOWN_STATUS_CHANGED; LOG_TRACE("[Vectorized|Aggregate] pushdown status changed, fuse=>pushdown", K(ret), K(prefetcher_.cur_micro_data_fetch_idx_)); @@ -328,7 +344,7 @@ int ObSSTableRowScanner::fetch_row(ObSSTableReadHandle &read_handl if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("Fail to open cur data block", K(ret), KPC(this)); } - } else if (can_vectorize()) { + } else if (can_batch_scan()) { ret = OB_PUSHDOWN_STATUS_CHANGED; LOG_TRACE("[Vectorized|Aggregate] pushdown status changed, fuse=>pushdown", K(ret), K(prefetcher_.cur_micro_data_fetch_idx_)); @@ -353,13 +369,6 @@ int ObSSTableRowScanner::refresh_blockscan_checker(const blockssta return ret; } -template -bool ObSSTableRowScanner::can_vectorize() const -{ - return (iter_param_->vectorized_enabled_ || iter_param_->enable_pd_aggregate()) && - nullptr != block_row_store_ && block_row_store_->filter_applied(); -} - template int ObSSTableRowScanner::get_next_rows() { @@ -422,7 +431,7 @@ int ObSSTableRowScanner::fetch_rows(ObSSTableReadHandle &read_hand if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("Fail to open cur data block", K(ret), KPC(this)); } - } else if (!can_vectorize()) { + } else if (!can_batch_scan()) { ret = OB_PUSHDOWN_STATUS_CHANGED; LOG_TRACE("[Vectorized] pushdown status changed, pushdown=>fuse", K(ret), K(prefetcher_.cur_micro_data_fetch_idx_)); @@ -445,7 +454,7 @@ int ObSSTableRowScanner::fetch_rows(ObSSTableReadHandle &read_hand } } else if (need_prefetch && OB_FAIL(prefetcher_.prefetch())) { LOG_WARN("Fail to do prefetch", K(ret), K_(prefetcher)); - } else if (!can_vectorize()) { + } else if (!can_batch_scan()) { ret = OB_PUSHDOWN_STATUS_CHANGED; LOG_TRACE("[Vectorized] pushdown status changed, pushdown=>fuse", K(ret), K(prefetcher_.cur_micro_data_fetch_idx_)); @@ -461,6 +470,22 @@ int ObSSTableRowScanner::fetch_rows(ObSSTableReadHandle &read_hand /*************** For columnar store ****************/ +template<> +bool ObSSTableRowScanner::can_blockscan() const +{ + return is_scan(type_) && + nullptr != block_row_store_ && + prefetcher_.switch_to_columnar_scan(); +} + +template<> +bool ObSSTableRowScanner::can_batch_scan() const +{ + return can_blockscan() && + !block_row_store_->is_disabled() && + iter_param_->vectorized_enabled_ && iter_param_->enable_pd_filter(); +} + template int ObSSTableRowScanner::get_blockscan_start( ObCSRowId &start, diff --git a/src/storage/access/ob_sstable_row_scanner.h b/src/storage/access/ob_sstable_row_scanner.h index 9ab47039f..8a85fba0e 100644 --- a/src/storage/access/ob_sstable_row_scanner.h +++ b/src/storage/access/ob_sstable_row_scanner.h @@ -41,6 +41,8 @@ public: virtual ~ObSSTableRowScanner(); virtual void reset(); virtual void reuse(); + virtual bool can_blockscan() const override; + virtual bool can_batch_scan() const override; TO_STRING_KV(K_(is_opened), K_(cur_range_idx), K_(prefetcher), KPC_(sstable)); protected: int inner_open( @@ -59,7 +61,6 @@ protected: private: OB_INLINE int init_micro_scanner(); - OB_INLINE bool can_vectorize() const; int open_cur_data_block(ObSSTableReadHandle &read_handle); int fetch_rows(ObSSTableReadHandle &read_handle); // For columnar store diff --git a/src/storage/access/ob_store_row_iterator.h b/src/storage/access/ob_store_row_iterator.h index b03039dc5..1e150b09a 100644 --- a/src/storage/access/ob_store_row_iterator.h +++ b/src/storage/access/ob_store_row_iterator.h @@ -87,15 +87,13 @@ public: return OB_NOT_SUPPORTED; } virtual int set_ignore_shadow_row() { return OB_NOT_SUPPORTED; } - bool can_blockscan() const + virtual bool can_blockscan() const { - return is_scan(type_) && is_sstable_iter_ && - nullptr != block_row_store_ && block_row_store_->can_blockscan(); + return false; } - bool filter_applied() const + virtual bool can_batch_scan() const { - return is_scan(type_) && is_sstable_iter_ && - nullptr != block_row_store_ && block_row_store_->filter_applied(); + return false; } virtual int get_next_row(const blocksstable::ObDatumRow *&row); virtual int get_next_rows() diff --git a/src/storage/column_store/ob_co_prefetcher.h b/src/storage/column_store/ob_co_prefetcher.h index 4c3e11afa..21cb50af2 100644 --- a/src/storage/column_store/ob_co_prefetcher.h +++ b/src/storage/column_store/ob_co_prefetcher.h @@ -66,7 +66,7 @@ public: { return block_scan_state_; } - OB_INLINE bool is_in_row_store_scan_mode() + OB_INLINE bool is_in_row_store_scan_mode() const { return ROW_STORE_SCAN == block_scan_state_; } @@ -88,7 +88,7 @@ public: { return PENDING_BLOCK_SCAN == block_scan_state_; } - OB_INLINE virtual bool switch_to_columnar_scan() + OB_INLINE virtual bool switch_to_columnar_scan() const { return !is_in_row_store_scan_mode(); } diff --git a/src/storage/column_store/ob_co_sstable_row_scanner.cpp b/src/storage/column_store/ob_co_sstable_row_scanner.cpp index 78c9633da..87773e58e 100644 --- a/src/storage/column_store/ob_co_sstable_row_scanner.cpp +++ b/src/storage/column_store/ob_co_sstable_row_scanner.cpp @@ -35,7 +35,7 @@ ObCOSSTableRowScanner::ObCOSSTableRowScanner() reverse_scan_(false), is_limit_end_(false), state_(BEGIN), - blockscan_state_(BLOCKSCAN_RANGE), + blockscan_state_(MAX_STATE), group_by_project_idx_(0), group_size_(0), column_group_cnt_(-1), @@ -133,7 +133,7 @@ void ObCOSSTableRowScanner::reset() group_size_ = 0; reverse_scan_ = false; state_ = BEGIN; - blockscan_state_ = BLOCKSCAN_RANGE; + blockscan_state_ = MAX_STATE; range_ = nullptr; is_limit_end_ = false; pending_end_row_id_ = OB_INVALID_CS_ROW_ID; @@ -163,7 +163,7 @@ void ObCOSSTableRowScanner::reuse() group_size_ = 0; reverse_scan_ = false; state_ = BEGIN; - blockscan_state_ = BLOCKSCAN_RANGE; + blockscan_state_ = MAX_STATE; range_ = nullptr; is_limit_end_ = false; pending_end_row_id_ = OB_INVALID_CS_ROW_ID; @@ -182,11 +182,11 @@ int ObCOSSTableRowScanner::get_next_rows() if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("Fail to get blockscan start", K(ret), KPC(this)); } else { - set_filter_not_applied(); + blockscan_state_ = MAX_STATE; state_ = END; } } else if (BLOCKSCAN_FINISH == blockscan_state_) { - set_filter_not_applied(); + blockscan_state_ = MAX_STATE; ret = OB_PUSHDOWN_STATUS_CHANGED; } else { LOG_DEBUG("[COLUMNSTORE] COScanner get_next_rows [change to filter_rows]", K(ret), K_(state), K_(blockscan_state), @@ -202,6 +202,9 @@ int ObCOSSTableRowScanner::get_next_rows() } else if (OB_FAIL(filter_rows(blockscan_state_))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("Fail to filter rows", K(ret), KPC(this)); + } else if (MAX_STATE == blockscan_state_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected blockscan state", K(ret), K_(blockscan_state)); } else { ret = OB_SUCCESS; state_ = STATE_TRANSITION[blockscan_state_]; @@ -236,7 +239,7 @@ int ObCOSSTableRowScanner::get_next_rows() } else { batched_row_store_->set_end(); } - set_filter_not_applied(); + blockscan_state_ = MAX_STATE; break; } } @@ -540,9 +543,6 @@ int ObCOSSTableRowScanner::filter_rows(BlockScanState &blockscan_state) } else { ret = filter_rows_without_limit(blockscan_state); } - if (nullptr != block_row_store_) { - block_row_store_->set_filter_applied(true); - } LOG_TRACE("[COLUMNSTORE] COScanner filter_rows [end]", K(ret), K_(state), K_(blockscan_state), K_(current), K_(group_size), K_(end)); return ret; @@ -797,6 +797,9 @@ int ObCOSSTableRowScanner::inner_get_next_row(const ObDatumRow *&store_row) } else if (OB_FAIL(row_scanner_->inner_get_next_row_with_row_id(store_row, row_id))) { if (OB_UNLIKELY(OB_PUSHDOWN_STATUS_CHANGED != ret && OB_ITER_END != ret)) { LOG_WARN("Fail to get next row from row scanner", K(ret)); + } else if (OB_PUSHDOWN_STATUS_CHANGED == ret) { + state_ = BEGIN; + blockscan_state_ = BLOCKSCAN_RANGE; } } else if (nullptr == getter_project_iter_) { // All columns have been fetched in the row scanner. diff --git a/src/storage/column_store/ob_co_sstable_row_scanner.h b/src/storage/column_store/ob_co_sstable_row_scanner.h index ced251fce..413de801b 100644 --- a/src/storage/column_store/ob_co_sstable_row_scanner.h +++ b/src/storage/column_store/ob_co_sstable_row_scanner.h @@ -45,6 +45,18 @@ public: const void *query_range) override; virtual void reset() override; virtual void reuse() override; + virtual bool can_blockscan() const override + { + return is_scan(type_) && + nullptr != block_row_store_ && + MAX_STATE != blockscan_state_; + } + virtual bool can_batch_scan() const override + { + return can_blockscan() && + iter_param_->vectorized_enabled_ && + iter_param_->enable_pd_filter(); + } virtual int get_next_rows() override; TO_STRING_KV(KPC_(iter_param), KP_(access_ctx), @@ -142,12 +154,6 @@ private: current_ += count; } } - OB_INLINE void set_filter_not_applied() - { - if (nullptr != block_row_store_) { - block_row_store_->set_filter_applied(false); - } - } OB_INLINE bool is_group_idx_expr(sql::ObExpr *e) const { return T_PSEUDO_GROUP_ID == e->type_;