From 3154082c616d6892528db390d264531614e838b8 Mon Sep 17 00:00:00 2001 From: DengzhiLiu Date: Thu, 8 Feb 2024 21:55:56 +0000 Subject: [PATCH] Adapt the new vec2.0 interface for single row calculation --- .../storage/blocksstable/test_cg_scanner.cpp | 1 + src/sql/engine/basic/ob_pushdown_filter.cpp | 64 ++++++++++++++----- src/sql/engine/basic/ob_pushdown_filter.h | 25 +++++--- src/sql/engine/expr/ob_batch_eval_util.h | 11 ++++ src/sql/engine/ob_bit_vector.h | 23 +++++++ src/sql/engine/ob_operator.cpp | 20 ++++++ src/sql/engine/ob_operator.h | 4 ++ src/storage/access/ob_multiple_merge.cpp | 20 +++++- src/storage/access/ob_multiple_merge.h | 1 + .../cs_encoding/ob_micro_block_cs_decoder.cpp | 2 +- .../encoding/ob_micro_block_decoder.cpp | 2 +- .../blocksstable/ob_micro_block_reader.cpp | 2 +- .../column_store/ob_virtual_cg_scanner.cpp | 12 ++-- .../column_store/ob_virtual_cg_scanner.h | 2 +- unittest/sql/engine/test_bit_vector.cpp | 6 ++ 15 files changed, 162 insertions(+), 33 deletions(-) diff --git a/mittest/mtlenv/storage/blocksstable/test_cg_scanner.cpp b/mittest/mtlenv/storage/blocksstable/test_cg_scanner.cpp index 91123ce85..289235f0d 100644 --- a/mittest/mtlenv/storage/blocksstable/test_cg_scanner.cpp +++ b/mittest/mtlenv/storage/blocksstable/test_cg_scanner.cpp @@ -594,6 +594,7 @@ TEST_F(TestCGScanner, test_filter) pd_filter.cell_data_ptrs_ = reinterpret_cast(buf3); buf3 = allocator_.alloc(sizeof(int64_t) * pd_filter.batch_size_); pd_filter.row_ids_ = reinterpret_cast(buf3); + pd_filter.skip_bit_ = to_bit_vector(allocator_.alloc(ObBitVector::memory_size(256))); pd_filter.is_inited_ = true; int64_t start = 5; diff --git a/src/sql/engine/basic/ob_pushdown_filter.cpp b/src/sql/engine/basic/ob_pushdown_filter.cpp index cb2da0e65..4ef8d9b02 100644 --- a/src/sql/engine/basic/ob_pushdown_filter.cpp +++ b/src/sql/engine/basic/ob_pushdown_filter.cpp @@ -1651,7 +1651,7 @@ ObPhysicalFilterExecutor::~ObPhysicalFilterExecutor() } } -int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, bool &filtered) +int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, const sql::ObBitVector &skip_bit, bool &filtered) { int ret = OB_SUCCESS; const common::ObIArray *column_exprs = get_cg_col_exprs(); @@ -1666,7 +1666,7 @@ int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64 LOG_WARN("Failed to convert object from datum", K(ret), K(datums[i])); } } - if (OB_SUCC(ret) && OB_FAIL(filter(eval_ctx, filtered))) { + if (OB_SUCC(ret) && OB_FAIL(filter(eval_ctx, skip_bit, filtered))) { LOG_WARN("Failed to calc filter", K(ret)); } } @@ -1875,15 +1875,26 @@ bool ObWhiteFilterExecutor::is_cmp_op_with_null_ref_value() const return is_cmp_op && has_single_null_ref_value; } -int ObWhiteFilterExecutor::filter(ObEvalCtx &eval_ctx, bool &filtered) +int ObWhiteFilterExecutor::filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) { int ret = OB_SUCCESS; filtered = false; - ObDatum *cmp_res = nullptr; - if (OB_FAIL(filter_.expr_->eval(eval_ctx, cmp_res))) { - LOG_WARN("Failed to eval", K(ret)); + if (!op_.enable_rich_format_) { + ObDatum *cmp_res = nullptr; + if (OB_FAIL(filter_.expr_->eval(eval_ctx, cmp_res))) { + LOG_WARN("Failed to eval", K(ret)); + } else { + filtered = is_row_filtered(*cmp_res); + } } else { - filtered = is_row_filtered(*cmp_res); + const int64_t batch_idx = eval_ctx.get_batch_idx(); + EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false); + if (OB_FAIL(filter_.expr_->eval_vector(eval_ctx, skip_bit, eval_bound))) { + LOG_WARN("Failed to eval vector", K(ret)); + } else { + ObIVector *res = filter_.expr_->get_vector(eval_ctx); + filtered = !res->is_true(batch_idx); + } } if (op_.is_vectorized()) { @@ -1902,16 +1913,30 @@ ObBlackFilterExecutor::~ObBlackFilterExecutor() } } -int ObBlackFilterExecutor::filter(ObEvalCtx &eval_ctx, bool &filtered) +int ObBlackFilterExecutor::filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) { int ret = OB_SUCCESS; filtered = false; - ObDatum *cmp_res = nullptr; - FOREACH_CNT_X(e, filter_.filter_exprs_, OB_SUCC(ret) && !filtered) { - if (OB_FAIL((*e)->eval(eval_ctx, cmp_res))) { - LOG_WARN("failed to filter child", K(ret)); - } else { - filtered = is_row_filtered(*cmp_res); + const bool enable_rich_format = op_.enable_rich_format_; + if (!enable_rich_format) { + ObDatum *cmp_res = nullptr; + FOREACH_CNT_X(e, filter_.filter_exprs_, OB_SUCC(ret) && !filtered) { + if (OB_FAIL((*e)->eval(eval_ctx, cmp_res))) { + LOG_WARN("failed to filter child", K(ret)); + } else { + filtered = is_row_filtered(*cmp_res); + } + } + } else { + const int64_t batch_idx = eval_ctx.get_batch_idx(); + EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false); + FOREACH_CNT_X(e, filter_.filter_exprs_, OB_SUCC(ret) && !filtered) { + if (OB_FAIL((*e)->eval_vector(eval_ctx, skip_bit, eval_bound))) { + LOG_WARN("Failed to evaluate vector", K(ret)); + } else { + ObIVector *res = (*e)->get_vector(eval_ctx); + filtered = !res->is_true(batch_idx); + } } } @@ -2564,6 +2589,10 @@ void PushdownFilterInfo::reset() allocator_->free(ref_bitmap_); ref_bitmap_ = nullptr; } + if (nullptr != skip_bit_) { + allocator_->free(skip_bit_); + skip_bit_ = nullptr; + } allocator_ = nullptr; } filter_ = nullptr; @@ -2614,7 +2643,8 @@ int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common col_capacity_ = out_col_cnt; } - if (OB_SUCC(ret) && (iter_param.vectorized_enabled_ || iter_param.enable_pd_aggregate())) { + if (OB_SUCC(ret) && (iter_param.vectorized_enabled_ || iter_param.enable_pd_aggregate() || + (nullptr != iter_param.op_ && iter_param.op_->enable_rich_format_))) { batch_size_ = iter_param.vectorized_enabled_ ? iter_param.op_->get_batch_size() : storage::AGGREGATE_STORE_BATCH_SIZE; if (OB_FAIL(col_datum_buf_.init(batch_size_, alloc))) { LOG_WARN("fail to init tmp col datum buf", K(ret)); @@ -2622,6 +2652,9 @@ int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common ret = common::OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc cell data ptr", K(ret), K(batch_size_)); } else if (FALSE_IT(cell_data_ptrs_ = reinterpret_cast(buf))) { + } else if (OB_ISNULL(skip_bit_ = to_bit_vector(alloc.alloc(ObBitVector::memory_size(batch_size_))))) { + ret = common::OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("Failed to alloc skip bit", K(ret), K_(batch_size)); } else if (OB_ISNULL(buf = alloc.alloc(sizeof(int64_t) * batch_size_))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc row_ids", K(ret), K(batch_size_)); @@ -2629,6 +2662,7 @@ int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common ret = common::OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc len_array_buf", K(ret), K_(batch_size)); } else { + skip_bit_->init(batch_size_); row_ids_ = reinterpret_cast(buf); len_array_ = reinterpret_cast(len_array_buf); } diff --git a/src/sql/engine/basic/ob_pushdown_filter.h b/src/sql/engine/basic/ob_pushdown_filter.h index 7313e0bec..601bf26db 100644 --- a/src/sql/engine/basic/ob_pushdown_filter.h +++ b/src/sql/engine/basic/ob_pushdown_filter.h @@ -711,9 +711,9 @@ public: datum_eval_flags_(nullptr) {} ~ObPhysicalFilterExecutor(); - int filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, bool &ret_val); + int filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, const sql::ObBitVector &skip_bit, bool &ret_val); virtual int init_evaluated_datums() override; - virtual int filter(ObEvalCtx &eval_ctx, bool &filtered) = 0; + virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) = 0; INHERIT_TO_STRING_KV("ObPhysicalFilterExecutor", ObPushdownFilterExecutor, K_(n_eval_infos), KP_(eval_infos)); protected: @@ -750,7 +750,7 @@ public: int get_datums_from_column(common::ObIArray &datum_infos); INHERIT_TO_STRING_KV("ObPushdownBlackFilterExecutor", ObPhysicalFilterExecutor, K_(filter), KP_(skip_bit)); - virtual int filter(ObEvalCtx &eval_ctx, bool &filtered) override; + virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) override; private: int eval_exprs_batch(ObBitVector &skip, const int64_t bsize); @@ -792,7 +792,7 @@ public: INHERIT_TO_STRING_KV("ObPushdownWhiteFilterExecutor", ObPushdownFilterExecutor, K_(null_param_contained), K_(datum_params), K(param_set_.created()), K_(filter)); - virtual int filter(ObEvalCtx &eval_ctx, bool &filtered) override; + virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) override; protected: void check_null_params(); int init_obj_set(); @@ -971,7 +971,7 @@ public: OB_INLINE int64_t get_batch_size() const { return expr_spec_.max_batch_size_; } // filter row for storage callback. // clear expression evaluated flag if row filtered. - OB_INLINE int filter_row_outside(const ObExprPtrIArray &exprs, bool &filtered); + OB_INLINE int filter_row_outside(const ObExprPtrIArray &exprs, const sql::ObBitVector &skip_bit, bool &filtered); // Notice: // clear one/current datum eval flag at a time, do NOT call it // unless fully understand this API. @@ -990,10 +990,14 @@ public: }; // filter row for storage callback. -OB_INLINE int ObPushdownOperator::filter_row_outside(const ObExprPtrIArray &exprs, bool &filtered) +OB_INLINE int ObPushdownOperator::filter_row_outside(const ObExprPtrIArray &exprs, const sql::ObBitVector &skip_bit, bool &filtered) { int ret = common::OB_SUCCESS; - ret = ObOperator::filter_row(eval_ctx_, exprs, filtered); + if (!enable_rich_format_) { + ret = ObOperator::filter_row(eval_ctx_, exprs, filtered); + } else { + ret = ObOperator::filter_row_vector(eval_ctx_, exprs, skip_bit, filtered); + } // always clear evaluated flag, because filter expr and table scan output expr may have // common expr, when eval filter expr, memory of dependence column may from storage, // if not filter and we don't clear eval flag, output expr will used the result datum @@ -1030,6 +1034,7 @@ struct PushdownFilterInfo row_ids_(nullptr), len_array_(nullptr), ref_bitmap_(nullptr), + skip_bit_(nullptr), col_datum_buf_(), allocator_(nullptr), param_(nullptr), @@ -1045,7 +1050,8 @@ struct PushdownFilterInfo ret = ret && (nullptr != datum_buf_); } if (0 < batch_size_) { - ret = ret && (nullptr != cell_data_ptrs_) && (nullptr != row_ids_); + ret = ret && (nullptr != cell_data_ptrs_) && (nullptr != row_ids_) + && (nullptr != skip_bit_); } return ret; } @@ -1068,7 +1074,7 @@ struct PushdownFilterInfo TO_STRING_KV(K_(is_pd_filter), K_(is_pd_to_cg), K_(start), K_(count), K_(col_capacity), K_(batch_size), KP_(datum_buf), KP_(filter), KP_(cell_data_ptrs), KP_(row_ids), KP_(ref_bitmap), - K_(col_datum_buf), KP_(param), KP_(context)); + K_(col_datum_buf), KP_(param), KP_(context), KP_(skip_bit)); bool is_inited_; bool is_pd_filter_; @@ -1084,6 +1090,7 @@ struct PushdownFilterInfo int64_t *row_ids_; uint32_t *len_array_; common::ObBitmap *ref_bitmap_; + sql::ObBitVector *skip_bit_; mutable TmpColDatumBuf col_datum_buf_; common::ObIAllocator *allocator_; const storage::ObTableIterParam *param_; diff --git a/src/sql/engine/expr/ob_batch_eval_util.h b/src/sql/engine/expr/ob_batch_eval_util.h index df72dcdac..acb5a9855 100644 --- a/src/sql/engine/expr/ob_batch_eval_util.h +++ b/src/sql/engine/expr/ob_batch_eval_util.h @@ -228,6 +228,7 @@ struct ObDoArithFixedVectorEval for (int64_t idx = bound.start(); idx < bound.end(); ++idx) { ArithOp::raw_op(res_arr[idx], left_arr[idx], right_arr[idx], args...); } + res_vec->get_nulls()->unset_all(bound.start(), bound.end()); if (expr.may_not_need_raw_check_ && ob_is_int_less_than_64(expr.args_[0]->datum_meta_.type_) && ob_is_int_less_than_64(expr.args_[1]->datum_meta_.type_)) { // do nothing @@ -273,6 +274,7 @@ struct ObDoArithFixedConstVectorEval for (int64_t idx = bound.start(); idx < bound.end(); ++idx) { ArithOp::raw_op(res_arr[idx], left_arr[idx], *right_val, args...); } + res_vec->get_nulls()->unset_all(bound.start(), bound.end()); if (expr.may_not_need_raw_check_ && ob_is_int_less_than_64(expr.args_[0]->datum_meta_.type_) && INT_MIN < *right_val < INT_MAX) { // do nothing @@ -318,6 +320,7 @@ struct ObDoArithConstFixedVectorEval for (int64_t idx = bound.start(); idx < bound.end(); ++idx) { ArithOp::raw_op(res_arr[idx], *left_val, right_arr[idx], args...); } + res_vec->get_nulls()->unset_all(bound.start(), bound.end()); if (expr.may_not_need_raw_check_ && INT_MIN < *left_val < INT_MAX && ob_is_int_less_than_64(expr.args_[1]->datum_meta_.type_)) { } else { @@ -394,11 +397,18 @@ inline int ObDoNumberVectorEval(VECTOR_EVAL_FUNC_ARG_DECL, const bool right_eval for (int idx = bound.start(); OB_SUCC(ret) && idx < bound.end(); idx++) { ret = nmb_eval_op(idx, left_vec, right_vec, res_vec, nmb_fast_op, local_alloc); } + if (std::is_same::value) { + reinterpret_cast(res_vec)->get_nulls()->unset_all(bound.start(), + bound.end()); + } } else { for (int idx = bound.start(); OB_SUCC(ret) && idx < bound.end(); idx++) { if (left_vec->is_null(idx) || right_vec->is_null(idx)) { res_vec->set_null(idx); } else { + if (std::is_same::value) { + res_vec->unset_null(idx); + } ret = nmb_eval_op(idx, left_vec, right_vec, res_vec, nmb_fast_op, local_alloc); } } @@ -806,6 +816,7 @@ struct ObVectorArithOpWrap : public Base if (left_vec.is_null(idx) || right_vec.is_null(idx)) { res_vec.set_null(idx); } else { + res_vec.unset_null(idx); ret = ObVectorArithOpWrap()(res_vec, left_vec, right_vec, idx, args...); } return ret; diff --git a/src/sql/engine/ob_bit_vector.h b/src/sql/engine/ob_bit_vector.h index 986b1136c..2d9db96e7 100644 --- a/src/sql/engine/ob_bit_vector.h +++ b/src/sql/engine/ob_bit_vector.h @@ -130,6 +130,8 @@ public: inline void set_all(const int64_t start_idx, const int64_t end_idx); + inline void unset_all(const int64_t start_idx, const int64_t end_idx); + inline bool is_all_true(const int64_t start_idx, const int64_t end_idx) const; inline void deep_copy(const ObBitVectorImpl &src, const int64_t start_idx, const int64_t end_idx); @@ -555,6 +557,27 @@ inline void ObBitVectorImpl::set_all(const int64_t start_idx, const in } } +template +inline void ObBitVectorImpl::unset_all(const int64_t start_idx, const int64_t end_idx) +{ + int64_t start_cnt = 0; + int64_t end_cnt = 0; + WordType start_mask = 0; + WordType end_mask = 0; + get_start_end_mask(start_idx, end_idx, start_mask, end_mask, start_cnt, end_cnt); + + if (start_cnt == end_cnt) { + data_[start_cnt] &= (~(start_mask & end_mask)); + } else { + data_[start_cnt] &= (~start_mask); + MEMSET(data_ + start_cnt + 1, 0x0, (end_cnt - start_cnt - 1) * BYTES_PER_WORD); + if (end_mask > 0) { + data_[end_cnt] &= (~end_mask); + } + } +} + + template inline void ObBitVectorImpl::deep_copy(const ObBitVectorImpl &src, const int64_t start_idx, const int64_t end_idx) diff --git a/src/sql/engine/ob_operator.cpp b/src/sql/engine/ob_operator.cpp index 2f9b167cd..fb023e663 100644 --- a/src/sql/engine/ob_operator.cpp +++ b/src/sql/engine/ob_operator.cpp @@ -1433,6 +1433,26 @@ int ObOperator::filter_row(ObEvalCtx &eval_ctx, const ObIArray &exprs, return ret; } +int ObOperator::filter_row_vector(ObEvalCtx &eval_ctx, + const common::ObIArray &exprs, + const sql::ObBitVector &skip_bit, + bool &filtered) +{ + int ret = OB_SUCCESS; + filtered = false; + const int64_t batch_idx = eval_ctx.get_batch_idx(); + EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false); + FOREACH_CNT_X(e, exprs, OB_SUCC(ret) && !filtered) { + if (OB_FAIL((*e)->eval_vector(eval_ctx, skip_bit, eval_bound))) { + LOG_WARN("Failed to evaluate vector", K(ret)); + } else { + ObIVector *res = (*e)->get_vector(eval_ctx); + filtered = !res->is_true(batch_idx); + } + } + return ret; +} + int ObOperator::filter(const common::ObIArray &exprs, bool &filtered) { return filter_row(eval_ctx_, exprs, filtered); diff --git a/src/sql/engine/ob_operator.h b/src/sql/engine/ob_operator.h index aa127c9ec..cab4d626f 100644 --- a/src/sql/engine/ob_operator.h +++ b/src/sql/engine/ob_operator.h @@ -476,6 +476,10 @@ public: static int filter_row(ObEvalCtx &eval_ctx, const common::ObIArray &exprs, bool &filtered); + static int filter_row_vector(ObEvalCtx &eval_ctx, + const common::ObIArray &exprs, + const sql::ObBitVector &skip_bit, + bool &filtered); ObBatchRows &get_brs() { return brs_; } // Drain exchange in data for PX, or producer DFO will be blocked. int drain_exch(); diff --git a/src/storage/access/ob_multiple_merge.cpp b/src/storage/access/ob_multiple_merge.cpp index e4dd53742..a484a72b2 100644 --- a/src/storage/access/ob_multiple_merge.cpp +++ b/src/storage/access/ob_multiple_merge.cpp @@ -63,6 +63,7 @@ ObMultipleMerge::ObMultipleMerge() get_table_param_(nullptr), read_memtable_only_(false), block_row_store_(nullptr), + skip_bit_(nullptr), out_project_cols_(), lob_reader_(), scan_state_(ScanState::NONE) @@ -86,6 +87,12 @@ ObMultipleMerge::~ObMultipleMerge() } block_row_store_ = nullptr; } + if (nullptr != skip_bit_) { + if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) { + access_ctx_->stmt_allocator_->free(skip_bit_); + } + skip_bit_ = nullptr; + } } int ObMultipleMerge::init( @@ -141,6 +148,7 @@ int ObMultipleMerge::init( get_table_param_ = &get_table_param; access_param_->iter_param_.set_table_param(get_table_param_); const ObITableReadInfo *read_info = access_param_->iter_param_.get_read_info(); + const int64_t batch_size = access_param_->iter_param_.vectorized_enabled_ ? access_param_->get_op()->get_batch_size() : 1; if (OB_SUCC(ret)) { if (OB_ISNULL(read_info)) { ret = OB_ERR_UNEXPECTED; @@ -162,7 +170,11 @@ int ObMultipleMerge::init( LOG_WARN("fail to alloc row store", K(ret)); } else if (param.iter_param_.is_use_iter_pool() && OB_FAIL(access_ctx_->alloc_iter_pool(access_param_->iter_param_.is_use_column_store()))) { LOG_WARN("Failed to init iter pool", K(ret)); + } else if (OB_ISNULL(skip_bit_ = to_bit_vector(access_ctx_->stmt_allocator_->alloc(ObBitVector::memory_size(batch_size))))) { + ret = common::OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("Failed to alloc skip bit", K(ret), K(batch_size)); } else { + skip_bit_->init(batch_size); access_ctx_->block_row_store_ = block_row_store_; inited_ = true; LOG_TRACE("succ to init multiple merge", K(*this)); @@ -766,6 +778,12 @@ void ObMultipleMerge::reset() } block_row_store_ = nullptr; } + if (nullptr != skip_bit_) { + if (OB_NOT_NULL(access_ctx_->stmt_allocator_)) { + access_ctx_->stmt_allocator_->free(skip_bit_); + } + skip_bit_ = nullptr; + } padding_allocator_.reset(); iters_.reset(); access_param_ = NULL; @@ -1073,7 +1091,7 @@ int ObMultipleMerge::check_filtered(const ObDatumRow &row, bool &filtered) && !access_param_->op_filters_->empty()) { // Execute filter in sql static typing engine. // %row is already projected to output expressions for main table scan. - if (OB_FAIL(access_param_->get_op()->filter_row_outside(*access_param_->op_filters_, filtered))) { + if (OB_FAIL(access_param_->get_op()->filter_row_outside(*access_param_->op_filters_, *skip_bit_, filtered))) { LOG_WARN("filter row failed", K(ret)); } } diff --git a/src/storage/access/ob_multiple_merge.h b/src/storage/access/ob_multiple_merge.h index ad18c168f..cf370f716 100644 --- a/src/storage/access/ob_multiple_merge.h +++ b/src/storage/access/ob_multiple_merge.h @@ -142,6 +142,7 @@ protected: ObGetTableParam *get_table_param_; bool read_memtable_only_; ObBlockRowStore *block_row_store_; + sql::ObBitVector *skip_bit_; common::ObSEArray out_project_cols_; ObLobDataReader lob_reader_; private: diff --git a/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp b/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp index eed61514f..a2c0870b0 100644 --- a/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp +++ b/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_decoder.cpp @@ -1340,7 +1340,7 @@ int ObMicroBlockCSDecoder::filter_pushdown_filter( } bool filtered = false; if (OB_SUCC(ret)) { - if (OB_FAIL(filter.filter(datum_buf, col_count, filtered))) { + if (OB_FAIL(filter.filter(datum_buf, col_count, *pd_filter_info.skip_bit_, filtered))) { LOG_WARN("Failed to filter row with black filter", K(ret), "datum_buf", common::ObArrayWrap(datum_buf, col_count)); } else if (!filtered) { diff --git a/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp b/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp index 37cc98435..315a38edf 100644 --- a/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp +++ b/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp @@ -1711,7 +1711,7 @@ int ObMicroBlockDecoder::filter_pushdown_filter( } bool filtered = false; if (OB_SUCC(ret)) { - if (OB_FAIL(filter.filter(datum_buf, col_count, filtered))) { + if (OB_FAIL(filter.filter(datum_buf, col_count, *pd_filter_info.skip_bit_, filtered))) { LOG_WARN("Failed to filter row with black filter", K(ret), "datum_buf", common::ObArrayWrap(datum_buf, col_count)); } else if (!filtered) { if (OB_FAIL(result_bitmap.set(offset))) { diff --git a/src/storage/blocksstable/ob_micro_block_reader.cpp b/src/storage/blocksstable/ob_micro_block_reader.cpp index 9fda19cd3..ace38afb6 100644 --- a/src/storage/blocksstable/ob_micro_block_reader.cpp +++ b/src/storage/blocksstable/ob_micro_block_reader.cpp @@ -757,7 +757,7 @@ int ObMicroBlockReader::filter_pushdown_filter( if (OB_SUCC(ret)) { if (filter.is_filter_black_node() || has_lob_out_row) { sql::ObPhysicalFilterExecutor &physical_filter = static_cast(filter); - if (OB_FAIL(physical_filter.filter(datum_buf, col_count, filtered))) { + if (OB_FAIL(physical_filter.filter(datum_buf, col_count, *pd_filter_info.skip_bit_, filtered))) { LOG_WARN("Failed to filter row with black filter", K(ret), K(row_idx)); } if (need_reuse_lob_locator) { diff --git a/src/storage/column_store/ob_virtual_cg_scanner.cpp b/src/storage/column_store/ob_virtual_cg_scanner.cpp index fac40d2a4..bf2b4363d 100644 --- a/src/storage/column_store/ob_virtual_cg_scanner.cpp +++ b/src/storage/column_store/ob_virtual_cg_scanner.cpp @@ -416,7 +416,7 @@ int ObDefaultCGScanner::apply_filter( } else { result = filter_result_; } - } else if (OB_FAIL(do_filter(filter, result))) { + } else if (OB_FAIL(do_filter(filter, *filter_info.skip_bit_, result))) { STORAGE_LOG(WARN, "failed to do filter", K(ret), KPC(filter)); } @@ -483,7 +483,7 @@ int ObDefaultCGScanner::get_next_row(const blocksstable::ObDatumRow *&datum_row) return ret; } -int ObDefaultCGScanner::do_filter(sql::ObPushdownFilterExecutor *filter, bool &result) +int ObDefaultCGScanner::do_filter(sql::ObPushdownFilterExecutor *filter, const sql::ObBitVector &skip_bit, bool &result) { int ret = OB_SUCCESS; bool filtered = false; @@ -491,7 +491,11 @@ int ObDefaultCGScanner::do_filter(sql::ObPushdownFilterExecutor *filter, bool &r if (filter->is_filter_node()) { if (filter->is_filter_black_node()) { sql::ObPhysicalFilterExecutor *black_filter = static_cast(filter); - if (OB_FAIL(black_filter->filter(default_row_.storage_datums_, filter->get_col_count(), filtered))) { + sql::ObPushdownOperator &pushdown_op = black_filter->get_op(); + if (pushdown_op.enable_rich_format_ && + OB_FAIL(storage::init_exprs_uniform_header(black_filter->get_cg_col_exprs(), pushdown_op.get_eval_ctx(), 1))) { + LOG_WARN("Failed to init exprs vector header", K(ret)); + } else if (OB_FAIL(black_filter->filter(default_row_.storage_datums_, filter->get_col_count(), skip_bit, filtered))) { LOG_WARN("Failed to filter row with black filter", K(ret), K(default_row_), KPC(black_filter)); } } else { @@ -514,7 +518,7 @@ int ObDefaultCGScanner::do_filter(sql::ObPushdownFilterExecutor *filter, bool &r if (OB_ISNULL(children[i])) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected null child filter", K(ret)); - } else if (OB_FAIL(do_filter(children[i], result))) { + } else if (OB_FAIL(do_filter(children[i], skip_bit, result))) { STORAGE_LOG(WARN, "failed to do filter", K(ret), KPC(children[i])); } else if ((result && filter->is_logic_or_node()) || (!result && filter->is_logic_and_node())) { break; diff --git a/src/storage/column_store/ob_virtual_cg_scanner.h b/src/storage/column_store/ob_virtual_cg_scanner.h index 6b829d523..36a852d67 100644 --- a/src/storage/column_store/ob_virtual_cg_scanner.h +++ b/src/storage/column_store/ob_virtual_cg_scanner.h @@ -105,7 +105,7 @@ public: private: int init_datum_infos_and_default_row(const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx); int init_cg_agg_cells(const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx); - int do_filter(sql::ObPushdownFilterExecutor *filter, bool &result); + int do_filter(sql::ObPushdownFilterExecutor *filter, const sql::ObBitVector &skip_bit, bool &result); int add_lob_header_if_need( const share::schema::ObColumnParam &column_param, ObIAllocator &allocator, diff --git a/unittest/sql/engine/test_bit_vector.cpp b/unittest/sql/engine/test_bit_vector.cpp index 863d38f78..282bc6da7 100644 --- a/unittest/sql/engine/test_bit_vector.cpp +++ b/unittest/sql/engine/test_bit_vector.cpp @@ -64,6 +64,12 @@ void test_range(ObBitVector *dest_bit_vector, ObBitVector *src_bit_vector, int64 dest_bit_vector->unset(middle); expect_range(dest_bit_vector, start, middle, end); + dest_bit_vector->set_all(start, end); + dest_bit_vector->unset_all(start, end); + for (int64_t i = 0; i < end + 100; i++) { + EXPECT_EQ(0, dest_bit_vector->at(i)); + } + src_bit_vector->unset(middle); dest_bit_vector->deep_copy(*src_bit_vector, start, end); expect_range(dest_bit_vector, start, middle, end);