From 504ec324bb7e2dada2ca0a4a9c1db7f815cad234 Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Mon, 13 Nov 2023 16:27:23 +0800 Subject: [PATCH] Revert "[refactor](scan) delete bloom_filter_predicate (#26499)" (#26851) This reverts commit 2bb3ef198144954583aea106591959ee09932cba. --- be/src/exprs/create_predicate_function.h | 12 +- be/src/olap/bloom_filter_predicate.h | 197 ++++++++++++++++++ be/src/olap/olap_common.h | 1 - be/src/olap/predicate_creator.h | 1 + be/src/olap/reader.cpp | 16 ++ be/src/olap/reader.h | 3 + .../rowset/segment_v2/segment_iterator.cpp | 3 +- be/src/pipeline/exec/olap_scan_operator.cpp | 2 - be/src/pipeline/exec/olap_scan_operator.h | 1 - be/src/pipeline/exec/scan_operator.cpp | 20 ++ be/src/pipeline/exec/scan_operator.h | 3 + be/src/vec/exec/scan/new_olap_scan_node.cpp | 2 - be/src/vec/exec/scan/new_olap_scan_node.h | 1 - be/src/vec/exec/scan/new_olap_scanner.cpp | 1 - be/src/vec/exec/scan/vscan_node.cpp | 17 ++ be/src/vec/exec/scan/vscan_node.h | 3 + 16 files changed, 272 insertions(+), 11 deletions(-) create mode 100644 be/src/olap/bloom_filter_predicate.h diff --git a/be/src/exprs/create_predicate_function.h b/be/src/exprs/create_predicate_function.h index f4e0601459..0e792563ac 100644 --- a/be/src/exprs/create_predicate_function.h +++ b/be/src/exprs/create_predicate_function.h @@ -17,11 +17,11 @@ #pragma once -#include "bloom_filter_func.h" #include "exprs/hybrid_set.h" #include "exprs/minmax_predicate.h" #include "function_filter.h" #include "olap/bitmap_filter_predicate.h" +#include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" #include "olap/in_list_predicate.h" #include "olap/like_column_predicate.h" @@ -225,6 +225,16 @@ inline auto create_bitmap_filter(PrimitiveType type) { return create_bitmap_predicate_function(type); } +template +ColumnPredicate* create_olap_column_predicate(uint32_t column_id, + const std::shared_ptr& filter, + int be_exec_version, const TabletColumn*) { + std::shared_ptr filter_olap; + filter_olap.reset(create_bloom_filter(PT)); + filter_olap->light_copy(filter.get()); + return new BloomFilterColumnPredicate(column_id, filter, be_exec_version); +} + template ColumnPredicate* create_olap_column_predicate(uint32_t column_id, const std::shared_ptr& filter, diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h new file mode 100644 index 0000000000..d2816be996 --- /dev/null +++ b/be/src/olap/bloom_filter_predicate.h @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exprs/bloom_filter_func.h" +#include "exprs/runtime_filter.h" +#include "olap/column_predicate.h" +#include "runtime/primitive_type.h" +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/predicate_column.h" +#include "vec/exprs/vruntimefilter_wrapper.h" + +namespace doris { + +// only use in runtime filter and segment v2 + +template +class BloomFilterColumnPredicate : public ColumnPredicate { +public: + using SpecificFilter = BloomFilterFunc; + + BloomFilterColumnPredicate(uint32_t column_id, + const std::shared_ptr& filter, + int be_exec_version) + : ColumnPredicate(column_id), + _filter(filter), + _specific_filter(reinterpret_cast(_filter.get())), + _be_exec_version(be_exec_version) {} + ~BloomFilterColumnPredicate() override = default; + + PredicateType type() const override { return PredicateType::BF; } + + Status evaluate(BitmapIndexIterator* iterators, uint32_t num_rows, + roaring::Roaring* roaring) const override { + return Status::OK(); + } + + uint16_t evaluate(const vectorized::IColumn& column, uint16_t* sel, + uint16_t size) const override; + +private: + template + uint16_t evaluate(const vectorized::IColumn& column, const uint8_t* null_map, uint16_t* sel, + uint16_t size) const { + if constexpr (is_nullable) { + DCHECK(null_map); + } + + uint24_t tmp_uint24_value; + auto get_cell_value = [&tmp_uint24_value](auto& data) { + if constexpr (std::is_same_v, uint32_t> && + T == PrimitiveType::TYPE_DATE) { + memcpy((char*)(&tmp_uint24_value), (char*)(&data), sizeof(uint24_t)); + return (const char*)&tmp_uint24_value; + } else { + return (const char*)&data; + } + }; + + uint16_t new_size = 0; + if (column.is_column_dictionary()) { + auto* dict_col = reinterpret_cast(&column); + if (_be_exec_version >= 2) { + for (uint16_t i = 0; i < size; i++) { + uint16_t idx = sel[i]; + sel[new_size] = idx; + if constexpr (is_nullable) { + new_size += !null_map[idx] && _specific_filter->find_uint32_t( + dict_col->get_crc32_hash_value(idx)); + } else { + new_size += _specific_filter->find_uint32_t( + dict_col->get_crc32_hash_value(idx)); + } + } + } else { + for (uint16_t i = 0; i < size; i++) { + uint16_t idx = sel[i]; + sel[new_size] = idx; + if constexpr (is_nullable) { + new_size += !null_map[idx] && + _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); + } else { + new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx)); + } + } + } + } else if (is_string_type(T) && _be_exec_version >= 2) { + auto& pred_col = + reinterpret_cast< + const vectorized::PredicateColumnType>*>( + &column) + ->get_data(); + + auto pred_col_data = pred_col.data(); + const bool is_dense_column = pred_col.size() == size; + for (uint16_t i = 0; i < size; i++) { + uint16_t idx = is_dense_column ? i : sel[i]; + if constexpr (is_nullable) { + if (!null_map[idx] && + _specific_filter->find_crc32_hash(get_cell_value(pred_col_data[idx]))) { + sel[new_size++] = idx; + } + } else { + if (_specific_filter->find_crc32_hash(get_cell_value(pred_col_data[idx]))) { + sel[new_size++] = idx; + } + } + } + } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) { + const auto& data = + reinterpret_cast< + const vectorized::PredicateColumnType>*>( + &column) + ->get_data(); + new_size = _specific_filter->find_fixed_len_olap_engine((char*)data.data(), null_map, + sel, size, data.size() != size); + } else { + auto& pred_col = + reinterpret_cast< + const vectorized::PredicateColumnType>*>( + &column) + ->get_data(); + + auto pred_col_data = pred_col.data(); +#define EVALUATE_WITH_NULL_IMPL(IDX) \ + !null_map[IDX] && _specific_filter->find_olap_engine(get_cell_value(pred_col_data[IDX])) +#define EVALUATE_WITHOUT_NULL_IMPL(IDX) \ + _specific_filter->find_olap_engine(get_cell_value(pred_col_data[IDX])) + EVALUATE_BY_SELECTOR(EVALUATE_WITH_NULL_IMPL, EVALUATE_WITHOUT_NULL_IMPL) +#undef EVALUATE_WITH_NULL_IMPL +#undef EVALUATE_WITHOUT_NULL_IMPL + } + return new_size; + } + + std::string _debug_string() const override { + std::string info = "BloomFilterColumnPredicate(" + type_to_string(T) + ")"; + return info; + } + + int get_filter_id() const override { + int filter_id = _filter->get_filter_id(); + DCHECK(filter_id != -1); + return filter_id; + } + bool is_filter() const override { return true; } + + std::shared_ptr _filter; + SpecificFilter* _specific_filter; // owned by _filter + mutable bool _always_true = false; + mutable bool _has_calculate_filter = false; + int _be_exec_version; +}; + +template +uint16_t BloomFilterColumnPredicate::evaluate(const vectorized::IColumn& column, uint16_t* sel, + uint16_t size) const { + uint16_t new_size = 0; + if (_always_true) { + return size; + } + if (column.is_nullable()) { + auto* nullable_col = reinterpret_cast(&column); + auto& null_map_data = nullable_col->get_null_map_column().get_data(); + new_size = + evaluate(nullable_col->get_nested_column(), null_map_data.data(), sel, size); + } else { + new_size = evaluate(column, nullptr, sel, size); + } + // If the pass rate is very high, for example > 50%, then the bloomfilter is useless. + // Some bloomfilter is useless, for example ssb 4.3, it consumes a lot of cpu but it is + // useless. + _evaluated_rows += size; + _passed_rows += new_size; + vectorized::VRuntimeFilterWrapper::calculate_filter( + _evaluated_rows - _passed_rows, _evaluated_rows, _has_calculate_filter, _always_true); + return new_size; +} + +} //namespace doris diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 37a0ce8813..1921902a9d 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -313,7 +313,6 @@ struct OlapReaderStatistics { int64_t rows_vec_cond_filtered = 0; int64_t rows_short_circuit_cond_filtered = 0; - int64_t rows_common_expr_filtered = 0; int64_t vec_cond_input_rows = 0; int64_t short_circuit_cond_input_rows = 0; int64_t rows_vec_del_cond_filtered = 0; diff --git a/be/src/olap/predicate_creator.h b/be/src/olap/predicate_creator.h index efe92c6dbd..742336ec77 100644 --- a/be/src/olap/predicate_creator.h +++ b/be/src/olap/predicate_creator.h @@ -23,6 +23,7 @@ #include "exec/olap_utils.h" #include "exprs/create_predicate_function.h" #include "exprs/hybrid_set.h" +#include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" #include "olap/comparison_predicate.h" #include "olap/in_list_predicate.h" diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index e98b56a0fc..dfc99e58ce 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -489,6 +489,11 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) { } } + // Only key column bloom filter will push down to storage engine + for (const auto& filter : read_params.bloom_filters) { + _col_predicates.emplace_back(_parse_to_predicate(filter)); + } + for (const auto& filter : read_params.bitmap_filters) { _col_predicates.emplace_back(_parse_to_predicate(filter)); } @@ -562,6 +567,17 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( } } +ColumnPredicate* TabletReader::_parse_to_predicate( + const std::pair>& bloom_filter) { + int32_t index = _tablet_schema->field_index(bloom_filter.first); + if (index < 0) { + return nullptr; + } + const TabletColumn& column = _tablet_schema->column(index); + return create_column_predicate(index, bloom_filter.second, column.type(), + _reader_context.runtime_state->be_exec_version(), &column); +} + ColumnPredicate* TabletReader::_parse_to_predicate( const std::pair>& in_filter) { int32_t index = _tablet_schema->field_index(in_filter.first); diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 11806f5ca9..b0d5ed1fa2 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -240,6 +240,9 @@ protected: void _init_conditions_param_except_leafnode_of_andnode(const ReaderParams& read_params); + ColumnPredicate* _parse_to_predicate( + const std::pair>& bloom_filter); + ColumnPredicate* _parse_to_predicate( const std::pair>& bitmap_filter); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 38e24cd7c8..9f4b57d046 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -37,6 +37,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "io/io_common.h" +#include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" #include "olap/field.h" #include "olap/iterators.h" @@ -2135,9 +2136,7 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts_and_filter_block( _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter)); - const auto origin_size = selected_size; selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); - _opts.stats->rows_common_expr_filtered += (origin_size - selected_size); return Status::OK(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 224a58659f..a77dba4c00 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -72,8 +72,6 @@ Status OlapScanLocalState::_init_profile() { ADD_COUNTER(_segment_profile, "RowsVectorPredInput", TUnit::UNIT); _rows_short_circuit_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT); - _rows_common_expr_filtered_counter = - ADD_COUNTER(_segment_profile, "RowsCommonExprFiltered", TUnit::UNIT); _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime"); diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index d3c6658139..0527fa6f44 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -114,7 +114,6 @@ private: RuntimeProfile::Counter* _rows_short_circuit_cond_filtered_counter = nullptr; RuntimeProfile::Counter* _rows_vec_cond_input_counter = nullptr; RuntimeProfile::Counter* _rows_short_circuit_cond_input_counter = nullptr; - RuntimeProfile::Counter* _rows_common_expr_filtered_counter = nullptr; RuntimeProfile::Counter* _vec_cond_timer = nullptr; RuntimeProfile::Counter* _short_cond_timer = nullptr; RuntimeProfile::Counter* _expr_filter_timer = nullptr; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 4a1d512400..601bf5b8b9 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -356,6 +356,9 @@ Status ScanLocalState::_normalize_predicate( RETURN_IF_PUSH_DOWN( _normalize_bitmap_filter(cur_expr, context, slot, &pdt), status); + RETURN_IF_PUSH_DOWN( + _normalize_bloom_filter(cur_expr, context, slot, &pdt), + status); if (state()->enable_function_pushdown()) { RETURN_IF_PUSH_DOWN(_normalize_function_filters( cur_expr, context, slot, &pdt), @@ -428,6 +431,23 @@ Status ScanLocalState::_normalize_predicate( return Status::OK(); } +template +Status ScanLocalState::_normalize_bloom_filter(vectorized::VExpr* expr, + vectorized::VExprContext* expr_ctx, + SlotDescriptor* slot, + vectorized::VScanNode::PushDownType* pdt) { + if (TExprNodeType::BLOOM_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 1); + vectorized::VScanNode::PushDownType temp_pdt = _should_push_down_bloom_filter(); + if (temp_pdt != vectorized::VScanNode::PushDownType::UNACCEPTABLE) { + _filter_predicates.bloom_filters.emplace_back(slot->col_name(), + expr->get_bloom_filter_func()); + *pdt = temp_pdt; + } + } + return Status::OK(); +} + template Status ScanLocalState::_normalize_bitmap_filter(vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 8ee9b4b5b4..68d006006f 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -287,6 +287,9 @@ protected: Status _eval_const_conjuncts(vectorized::VExpr* vexpr, vectorized::VExprContext* expr_ctx, vectorized::VScanNode::PushDownType* pdt); + Status _normalize_bloom_filter(vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, + SlotDescriptor* slot, vectorized::VScanNode::PushDownType* pdt); + Status _normalize_bitmap_filter(vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, vectorized::VScanNode::PushDownType* pdt); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 1a8017d37f..af4c780757 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -133,8 +133,6 @@ Status NewOlapScanNode::_init_profile() { ADD_COUNTER(_segment_profile, "RowsVectorPredInput", TUnit::UNIT); _rows_short_circuit_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT); - _rows_common_expr_filtered_counter = - ADD_COUNTER(_segment_profile, "RowsCommonExprFiltered", TUnit::UNIT); _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime"); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 7a25d743c4..93039c6182 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -138,7 +138,6 @@ private: RuntimeProfile::Counter* _rows_short_circuit_cond_filtered_counter = nullptr; RuntimeProfile::Counter* _rows_vec_cond_input_counter = nullptr; RuntimeProfile::Counter* _rows_short_circuit_cond_input_counter = nullptr; - RuntimeProfile::Counter* _rows_common_expr_filtered_counter = nullptr; RuntimeProfile::Counter* _vec_cond_timer = nullptr; RuntimeProfile::Counter* _short_cond_timer = nullptr; RuntimeProfile::Counter* _expr_filter_timer = nullptr; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 0726c71e45..53417039c8 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -551,7 +551,6 @@ void NewOlapScanner::_update_counters_before_close() { COUNTER_UPDATE(Parent->_rows_vec_cond_input_counter, stats.vec_cond_input_rows); \ COUNTER_UPDATE(Parent->_rows_short_circuit_cond_input_counter, \ stats.short_circuit_cond_input_rows); \ - COUNTER_UPDATE(Parent->_rows_common_expr_filtered_counter, stats.rows_common_expr_filtered); \ for (auto& [id, info] : stats.filter_info) { \ Parent->add_filter_info(id, info); \ } \ diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index f708f4b475..d711aa43b4 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -522,6 +522,9 @@ Status VScanNode::_normalize_predicate(const VExprSPtr& conjunct_expr_root, VExp RETURN_IF_PUSH_DOWN( _normalize_bitmap_filter(cur_expr, context, slot, &pdt), status); + RETURN_IF_PUSH_DOWN( + _normalize_bloom_filter(cur_expr, context, slot, &pdt), + status); if (_state->enable_function_pushdown()) { RETURN_IF_PUSH_DOWN(_normalize_function_filters( cur_expr, context, slot, &pdt), @@ -594,6 +597,20 @@ Status VScanNode::_normalize_predicate(const VExprSPtr& conjunct_expr_root, VExp return Status::OK(); } +Status VScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, + PushDownType* pdt) { + if (TExprNodeType::BLOOM_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 1); + PushDownType temp_pdt = _should_push_down_bloom_filter(); + if (temp_pdt != PushDownType::UNACCEPTABLE) { + _filter_predicates.bloom_filters.emplace_back(slot->col_name(), + expr->get_bloom_filter_func()); + *pdt = temp_pdt; + } + } + return Status::OK(); +} + Status VScanNode::_normalize_bitmap_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, PushDownType* pdt) { if (TExprNodeType::BITMAP_PRED == expr->node_type()) { diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 5110ce18ac..73961f5913 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -365,6 +365,9 @@ private: VExprSPtr& output_expr); Status _eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, PushDownType* pdt); + Status _normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, + PushDownType* pdt); + Status _normalize_bitmap_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, PushDownType* pdt);