diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index fb303de5f6..2f37b05633 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -256,6 +256,11 @@ public: range.remove_fixed_value(*value); } + static void add_value_range(ColumnValueRange& range, SQLFilterOp op, + CppType* value) { + range.add_range(op, *value); + } + static ColumnValueRange create_empty_column_value_range() { return ColumnValueRange::create_empty_column_value_range(""); } diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index ea2b0dc70d..4097c710fe 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -158,7 +158,6 @@ protected: int conj_idx, int child_idx); friend class OlapScanner; - friend class vectorized::VOlapScanner; // Tuple id resolved in prepare() to set _tuple_desc; TupleId _tuple_id; diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h index eb0cf52e98..1fe0e21f71 100644 --- a/be/src/exec/olap_utils.h +++ b/be/src/exec/olap_utils.h @@ -184,6 +184,9 @@ inline int get_olap_size(PrimitiveType type) { return 0; } +template +static constexpr bool always_false_v = false; + inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) { switch (type) { case TExprOpcode::LT: @@ -215,4 +218,23 @@ inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) { return FILTER_IN; } +inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool opposite) { + if (function_name == "lt") { + return opposite ? FILTER_LARGER : FILTER_LESS; + } else if (function_name == "gt") { + return opposite ? FILTER_LESS : FILTER_LARGER; + } else if (function_name == "le") { + return opposite ? FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL; + } else if (function_name == "ge") { + return opposite ? FILTER_LESS_OR_EQUAL : FILTER_LARGER_OR_EQUAL; + } else if (function_name == "eq") { + return opposite ? FILTER_NOT_IN : FILTER_IN; + } else if (function_name == "ne") { + return opposite ? FILTER_IN : FILTER_NOT_IN; + } else { + DCHECK(false) << "Function Name: " << function_name; + return FILTER_IN; + } +} + } // namespace doris diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp index 26cd7a209b..fcf8589ac6 100644 --- a/be/src/exprs/bloomfilter_predicate.cpp +++ b/be/src/exprs/bloomfilter_predicate.cpp @@ -43,12 +43,13 @@ BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other) _filtered_rows(), _scan_rows() {} -Status BloomFilterPredicate::prepare(RuntimeState* state, IBloomFilterFuncBase* filter) { +Status BloomFilterPredicate::prepare(RuntimeState* state, + std::shared_ptr filter) { // DCHECK(filter != nullptr); if (_is_prepare) { return Status::OK(); } - _filter.reset(filter); + _filter = filter; if (nullptr == _filter.get()) { return Status::InternalError("Unknown column type."); } diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h index 7f420a2d8e..5fcf604518 100644 --- a/be/src/exprs/bloomfilter_predicate.h +++ b/be/src/exprs/bloomfilter_predicate.h @@ -365,7 +365,7 @@ public: return pool->add(new BloomFilterPredicate(*this)); } using Predicate::prepare; - Status prepare(RuntimeState* state, IBloomFilterFuncBase* bloomfilterfunc); + Status prepare(RuntimeState* state, std::shared_ptr bloomfilterfunc); std::shared_ptr get_bloom_filter_func() { return _filter; } diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 5983b11fcc..89e832f3d0 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1000,7 +1000,7 @@ private: int32_t _max_in_num = -1; std::unique_ptr _minmax_func; std::unique_ptr _hybrid_set; - std::unique_ptr _bloomfilter_func; + std::shared_ptr _bloomfilter_func; bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; @@ -1061,6 +1061,14 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list* push_expr_ctx return Status::OK(); } +Status IRuntimeFilter::get_push_expr_ctxs(std::vector* push_vexprs) { + DCHECK(is_consumer()); + if (!_is_ignored) { + return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx); + } + return Status::OK(); +} + Status IRuntimeFilter::get_push_expr_ctxs(std::list* push_expr_ctxs, ExprContext* probe_ctx) { DCHECK(is_producer()); @@ -1649,7 +1657,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); auto bloom_pred = _pool->add(new BloomFilterPredicate(node)); - RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release())); + RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func)); bloom_pred->add_child(Expr::copy(_pool, prob_expr->root())); ExprContext* ctx = _pool->add(new ExprContext(bloom_pred)); container->push_back(ctx); @@ -1677,6 +1685,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectorsize() > 0 ? true : vprob_expr->root()->is_nullable()); TExprNode node; node.__set_type(type_desc); node.__set_node_type(TExprNodeType::IN_PRED); @@ -1684,6 +1694,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectorsize() > 0 ? true + : vprob_expr->root()->is_nullable()); // VInPredicate doris::vectorized::VExpr* expr = nullptr; @@ -1732,12 +1744,14 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectorroot()->is_nullable()); TExprNode node; node.__set_type(type_desc); node.__set_node_type(TExprNodeType::BLOOM_PRED); node.__set_opcode(TExprOpcode::RT_FILTER); node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); + node.__set_is_nullable(vprob_expr->root()->is_nullable()); auto bloom_pred = _pool->add(new doris::vectorized::VBloomPredicate(node)); bloom_pred->set_filter(_bloomfilter_func); auto cloned_vexpr = vprob_expr->root()->clone(_pool); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b4382d8874..587c0aa87a 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -154,6 +154,8 @@ public: // only consumer could call this Status get_push_expr_ctxs(std::list* push_expr_ctxs); + Status get_push_expr_ctxs(std::vector* push_vexprs); + // This function is used by UT and producer Status get_push_expr_ctxs(std::list* push_expr_ctxs, ExprContext* probe_ctx); diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 4b4ddf313f..865b2d838b 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -30,12 +30,22 @@ #include "vec/core/block.h" #include "vec/data_types/data_type_decimal.h" #include "vec/exec/volap_scanner.h" +#include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vexpr.h" +#include "vec/exprs/vruntimefilter_wrapper.h" +#include "vec/functions/in.h" namespace doris::vectorized { using doris::operator<<; +#define RETURN_IF_PUSH_DOWN(stmt) \ + if (!push_down) { \ + stmt; \ + } else { \ + return; \ + } + VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), _tuple_id(tnode.olap_scan_node.tuple_id), @@ -59,7 +69,6 @@ VOlapScanNode::VOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Des Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); - _direct_conjunct_size = _conjunct_ctxs.size(); const TQueryOptions& query_options = state->query_options(); if (query_options.__isset.max_scan_key_num) { @@ -244,6 +253,7 @@ Status VOlapScanNode::open(RuntimeState* state) { // acquire runtime filter _runtime_filter_ctxs.resize(_runtime_filter_descs.size()); + std::vector vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { auto& filter_desc = _runtime_filter_descs[i]; IRuntimeFilter* runtime_filter = nullptr; @@ -257,21 +267,12 @@ Status VOlapScanNode::open(RuntimeState* state) { ready = runtime_filter->await(); } if (ready) { - std::list expr_context; - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context)); + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); _runtime_filter_ctxs[i].apply_mark = true; _runtime_filter_ctxs[i].runtimefilter = runtime_filter; - - for (auto ctx : expr_context) { - ctx->prepare(state, row_desc()); - ctx->open(state); - int index = _conjunct_ctxs.size(); - _conjunct_ctxs.push_back(ctx); - // it's safe to store address from a fix-resized vector - _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i]; - } } } + RETURN_IF_ERROR(_append_rf_into_conjuncts(state, vexprs)); return Status::OK(); } @@ -469,7 +470,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { if (_vconjunct_ctx_ptr) { _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); } - _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); + _vconjunct_ctx_ptr.reset(new VExprContext*); *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; _runtime_filter_ready_flag[i] = true; } @@ -621,136 +622,41 @@ Status VOlapScanNode::_add_blocks(std::vector& block) { return Status::OK(); } -void VOlapScanNode::eval_const_conjuncts() { - for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - // if conjunct is constant, compute direct and set eos = true - if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { - void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr); - if (value == nullptr || *reinterpret_cast(value) == false) { - _eos = true; - break; - } - } - } -} - Status VOlapScanNode::normalize_conjuncts() { std::vector slots = _tuple_desc->slots(); for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) { switch (slots[slot_idx]->type().type) { - case TYPE_TINYINT: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_SMALLINT: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_INT: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_BIGINT: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_LARGEINT: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_CHAR: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - case TYPE_VARCHAR: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - case TYPE_HLL: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - case TYPE_STRING: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DATE: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - case TYPE_DATETIME: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DATEV2: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DATETIMEV2: { - ColumnValueRange range(slots[slot_idx]->col_name(), - slots[slot_idx]->type().precision, - slots[slot_idx]->type().scale); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DECIMALV2: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DECIMAL32: { - ColumnValueRange range(slots[slot_idx]->col_name(), - slots[slot_idx]->type().precision, - slots[slot_idx]->type().scale); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DECIMAL64: { - ColumnValueRange range(slots[slot_idx]->col_name(), - slots[slot_idx]->type().precision, - slots[slot_idx]->type().scale); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_DECIMAL128: { - ColumnValueRange range(slots[slot_idx]->col_name(), - slots[slot_idx]->type().precision, - slots[slot_idx]->type().scale); - normalize_predicate(range, slots[slot_idx]); - break; - } - - case TYPE_BOOLEAN: { - ColumnValueRange range(slots[slot_idx]->col_name()); - normalize_predicate(range, slots[slot_idx]); - break; - } - +#define M(NAME) \ + case TYPE_##NAME: { \ + ColumnValueRange range(slots[slot_idx]->col_name(), \ + slots[slot_idx]->type().precision, \ + slots[slot_idx]->type().scale); \ + _id_to_slot_column_value_range[slots[slot_idx]->id()] = \ + std::pair {slots[slot_idx], range}; \ + break; \ + } +#define APPLY_FOR_PRIMITIVE_TYPE(M) \ + M(TINYINT) \ + M(SMALLINT) \ + M(INT) \ + M(BIGINT) \ + M(LARGEINT) \ + M(CHAR) \ + M(DATE) \ + M(DATETIME) \ + M(DATEV2) \ + M(DATETIMEV2) \ + M(VARCHAR) \ + M(STRING) \ + M(HLL) \ + M(DECIMAL32) \ + M(DECIMAL64) \ + M(DECIMAL128) \ + M(DECIMALV2) \ + M(BOOLEAN) + APPLY_FOR_PRIMITIVE_TYPE(M) +#undef M default: { VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slots[slot_idx]->col_name() << "]"; @@ -758,6 +664,28 @@ Status VOlapScanNode::normalize_conjuncts() { } } } + if (_vconjunct_ctx_ptr) { + if ((*_vconjunct_ctx_ptr)->root()) { + VExpr* new_root = _normalize_predicate(_runtime_state, (*_vconjunct_ctx_ptr)->root()); + if (new_root) { + (*_vconjunct_ctx_ptr)->set_root(new_root); + } else { + (*(_vconjunct_ctx_ptr.get()))->mark_as_stale(); + _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); + _vconjunct_ctx_ptr.reset(nullptr); + } + } + } + for (auto& it : _id_to_slot_column_value_range) { + std::visit( + [&](auto&& range) { + if (range.is_empty_value_range()) { + _eos = true; + } + }, + it.second.second); + _column_value_ranges[it.second.first->col_name()] = it.second.second; + } return Status::OK(); } @@ -786,67 +714,6 @@ static std::string olap_filters_to_string(const std::vector& return filters_string; } -Status VOlapScanNode::build_function_filters() { - for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - ExprContext* ex_ctx = _conjunct_ctxs[conj_idx]; - Expr* fn_expr = ex_ctx->root(); - bool opposite = false; - - if (TExprNodeType::COMPOUND_PRED == fn_expr->node_type() && - TExprOpcode::COMPOUND_NOT == fn_expr->op()) { - fn_expr = fn_expr->get_child(0); - opposite = true; - } - - // currently only support like / not like - if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type() && - "like" == fn_expr->fn().name.function_name) { - doris_udf::FunctionContext* func_cxt = - ex_ctx->fn_context(fn_expr->get_fn_context_index()); - - if (!func_cxt) { - continue; - } - if (fn_expr->children().size() != 2) { - continue; - } - SlotRef* slot_ref = nullptr; - Expr* literal_expr = nullptr; - - if (TExprNodeType::SLOT_REF == fn_expr->get_child(0)->node_type()) { - literal_expr = fn_expr->get_child(1); - slot_ref = (SlotRef*)(fn_expr->get_child(0)); - } else if (TExprNodeType::SLOT_REF == fn_expr->get_child(1)->node_type()) { - literal_expr = fn_expr->get_child(0); - slot_ref = (SlotRef*)(fn_expr->get_child(1)); - } else { - continue; - } - - if (TExprNodeType::STRING_LITERAL != literal_expr->node_type()) continue; - - const SlotDescriptor* slot_desc = nullptr; - std::vector slot_ids; - slot_ref->get_slot_ids(&slot_ids); - for (SlotDescriptor* slot : _tuple_desc->slots()) { - if (slot->id() == slot_ids[0]) { - slot_desc = slot; - break; - } - } - - if (!slot_desc) { - continue; - } - std::string col = slot_desc->col_name(); - StringVal val = literal_expr->get_string_val(ex_ctx, nullptr); - _push_down_functions.emplace_back(opposite, col, func_cxt, val); - _pushed_func_conjuncts_index.insert(conj_idx); - } - } - return Status::OK(); -} - Status VOlapScanNode::build_key_ranges_and_filters() { const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; @@ -898,62 +765,23 @@ Status VOlapScanNode::build_key_ranges_and_filters() { Status VOlapScanNode::start_scan(RuntimeState* state) { RETURN_IF_CANCELLED(state); - VLOG_CRITICAL << "Eval Const Conjuncts"; - // 1. Eval const conjuncts to find whether eos = true - eval_const_conjuncts(); - VLOG_CRITICAL << "NormalizeConjuncts"; - // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts may - // set eos = true RETURN_IF_ERROR(normalize_conjuncts()); - // 1 and 2 step dispose find conjuncts set eos = true, return directly if (_eos) { return Status::OK(); } VLOG_CRITICAL << "BuildKeyRangesAndFilters"; - // 3.1 Using `Key Column`'s ColumnValueRange to split ScanRange to several `Sub ScanRange` RETURN_IF_ERROR(build_key_ranges_and_filters()); - // 3.2 Function pushdown - if (config::enable_function_pushdown) RETURN_IF_ERROR(build_function_filters()); - - VLOG_CRITICAL << "Filter idle conjuncts"; - // 4.1 Filter idle conjunct which already trans to olap filters - // this must be after build_scan_key, it will free the StringValue memory - remove_pushed_conjuncts(state); VLOG_CRITICAL << "StartScanThread"; - // 5. Start multi thread to read several `Sub Sub ScanRange` RETURN_IF_ERROR(start_scan_thread(state)); return Status::OK(); } -template -Status VOlapScanNode::normalize_predicate(ColumnValueRange& range, SlotDescriptor* slot) { - // 1. Normalize InPredicate, add to ColumnValueRange - RETURN_IF_ERROR(normalize_in_and_eq_predicate(slot, &range)); - - // 2. Normalize NotInPredicate, add to ColumnValueRange - RETURN_IF_ERROR(normalize_not_in_and_not_eq_predicate(slot, &range)); - - // 3. Normalize BinaryPredicate , add to ColumnValueRange - RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range)); - - // 3. Normalize BloomFilterPredicate, push down by hash join node - RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot)); - - // 4. Check whether range is empty, set _eos - if (range.is_empty_value_range()) _eos = true; - - // 5. Add range to Column->ColumnValueRange map - _column_value_ranges[slot->col_name()] = range; - - return Status::OK(); -} - -static bool ignore_cast(SlotDescriptor* slot, Expr* expr) { +static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { if ((slot->type().is_date_type() || slot->type().is_date_v2_type() || slot->type().is_datetime_v2_type()) && (expr->type().is_date_type() || expr->type().is_date_v2_type() || @@ -966,152 +794,83 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) { return false; } -bool VOlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot, - doris::InPredicate* pred) { - if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { - // not a slot ref(column) - return false; - } - - std::vector slot_ids; - if (pred->get_child(0)->get_slot_ids(&slot_ids) != 1) { - // not a single column predicate - return false; - } - - if (slot_ids[0] != slot->id()) { - // predicate not related to current column - return false; - } - - if (pred->get_child(0)->type().type != slot->type().type) { - if (!ignore_cast(slot, pred->get_child(0))) { - // the type of predicate not match the slot's type - return false; - } - } - - VLOG_CRITICAL << slot->col_name() << " fixed_values add num: " << pred->hybrid_set()->size(); - - // if there are too many elements in InPredicate, exceed the limit, - // we will not push any condition of this column to storage engine. - // because too many conditions pushed down to storage engine may even - // slow down the query process. - // ATTN: This is just an experience value. You may need to try - // different thresholds to improve performance. - if (pred->hybrid_set()->size() > _max_pushdown_conditions_per_column) { - VLOG_NOTICE << "Predicate value num " << pred->hybrid_set()->size() << " exceed limit " - << _max_pushdown_conditions_per_column; - return false; - } - - return true; -} - -std::pair VOlapScanNode::should_push_down_eq_predicate(doris::SlotDescriptor* slot, - doris::Expr* pred, int conj_idx, - int child_idx) { - auto result_pair = std::make_pair(false, nullptr); - - // Do not get slot_ref of column, should not push_down to Storage Engine - if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) { - return result_pair; - } - - std::vector slot_ids; - if (pred->get_child(child_idx)->get_slot_ids(&slot_ids) != 1) { - // not a single column predicate - return result_pair; - } - - if (slot_ids[0] != slot->id()) { - // predicate not related to current column - return result_pair; - } - - if (pred->get_child(child_idx)->type().type != slot->type().type) { - if (!ignore_cast(slot, pred->get_child(child_idx))) { - // the type of predicate not match the slot's type - return result_pair; - } - } - - Expr* expr = pred->get_child(1 - child_idx); - if (!expr->is_constant()) { - // only handle constant value - return result_pair; - } - - // get value in result pair - result_pair = std::make_pair( - true, _conjunct_ctxs[conj_idx]->get_value(expr, nullptr, slot->type().precision, - slot->type().scale)); - - return result_pair; -} - -template -Status VOlapScanNode::change_fixed_value_range(ColumnValueRange& temp_range, - void* value, const ChangeFixedValueRangeFunc& func) { - switch (primitive_type) { - case TYPE_DATE: { - DateTimeValue date_value = *reinterpret_cast(value); - // There is must return empty data in olap_scan_node, - // Because data value loss accuracy - if (!date_value.check_loss_accuracy_cast_to_date()) { - func(temp_range, - reinterpret_cast::CppType*>( +template +Status VOlapScanNode::change_value_range(ColumnValueRange& temp_range, void* value, + const ChangeFixedValueRangeFunc& func, + const std::string& fn_name, int slot_ref_child) { + if constexpr (PrimitiveType == TYPE_DATE) { + DateTimeValue date_value; + reinterpret_cast(value)->convert_vec_dt_to_dt(&date_value); + if constexpr (IsFixed) { + if (!date_value.check_loss_accuracy_cast_to_date()) { + func(temp_range, + reinterpret_cast::CppType*>( + &date_value)); + } + } else { + if (date_value.check_loss_accuracy_cast_to_date()) { + if (fn_name == "lt" || fn_name == "ge") { + ++date_value; + } + } + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>( &date_value)); } - break; - } - case TYPE_DECIMALV2: - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_HLL: - case TYPE_DATETIME: - case TYPE_DATETIMEV2: - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_LARGEINT: - case TYPE_DECIMAL32: - case TYPE_DECIMAL64: - case TYPE_DECIMAL128: - case TYPE_STRING: { - func(temp_range, - reinterpret_cast::CppType*>(value)); - break; - } - case TYPE_BOOLEAN: { - bool v = *reinterpret_cast(value); - func(temp_range, - reinterpret_cast::CppType*>(&v)); - break; - } - case TYPE_DATEV2: { + } else if constexpr (PrimitiveType == TYPE_DATETIME) { + DateTimeValue date_value; + reinterpret_cast(value)->convert_vec_dt_to_dt(&date_value); + if constexpr (IsFixed) { + func(temp_range, + reinterpret_cast::CppType*>( + &date_value)); + } else { + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>( + reinterpret_cast(&date_value))); + } + } else if constexpr (PrimitiveType == TYPE_DATEV2) { DateV2Value datetimev2_value = *reinterpret_cast*>(value); - if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) { - DateV2Value date_v2; + if constexpr (IsFixed) { + if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) { + DateV2Value date_v2; + date_v2.set_date_uint32( + binary_cast, uint64_t>(datetimev2_value) >> + TIME_PART_LENGTH); + func(temp_range, &date_v2); + } + } else { + doris::vectorized::DateV2Value date_v2; date_v2.set_date_uint32( binary_cast, uint64_t>(datetimev2_value) >> TIME_PART_LENGTH); - if constexpr (primitive_type == PrimitiveType::TYPE_DATEV2) { - func(temp_range, &date_v2); - } else { - __builtin_unreachable(); + if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) { + if (fn_name == "lt" || fn_name == "ge") { + ++date_v2; + } } + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), &date_v2); } - break; - } - default: { - LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" - << primitive_type << "]"; - return Status::InternalError("Normalize filter fail, Unsupported Primitive type"); - } + } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType == TYPE_CHAR) || + (PrimitiveType == TYPE_VARCHAR) || (PrimitiveType == TYPE_HLL) || + (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType == TYPE_TINYINT) || + (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == TYPE_INT) || + (PrimitiveType == TYPE_BIGINT) || (PrimitiveType == TYPE_LARGEINT) || + (PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType == TYPE_DECIMAL64) || + (PrimitiveType == TYPE_DECIMAL128) || (PrimitiveType == TYPE_STRING) || + (PrimitiveType == TYPE_BOOLEAN)) { + if constexpr (IsFixed) { + func(temp_range, + reinterpret_cast::CppType*>(value)); + } else { + func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), + reinterpret_cast::CppType*>(value)); + } + } else { + static_assert(always_false_v); } + return Status::OK(); } @@ -1127,440 +886,6 @@ bool VOlapScanNode::is_key_column(const std::string& key_name) { return res != _olap_scan_node.key_column_name.end(); } -void VOlapScanNode::remove_pushed_conjuncts(RuntimeState* state) { - if (_pushed_conjuncts_index.empty() && _pushed_func_conjuncts_index.empty()) { - return; - } - - // dispose direct conjunct first - std::vector new_conjunct_ctxs; - for (int i = 0; i < _direct_conjunct_size; ++i) { - if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) { - _conjunct_ctxs[i]->close(state); // pushed condition, just close - } else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) { - _pushed_func_conjunct_ctxs.emplace_back( - _conjunct_ctxs[i]); // pushed functions, need keep ctxs - } else { - new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); - } - } - auto new_direct_conjunct_size = new_conjunct_ctxs.size(); - - // dispose hash join push down conjunct second - for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) { - if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) { - _conjunct_ctxs[i]->close(state); // pushed condition, just close - } else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) { - _pushed_func_conjunct_ctxs.emplace_back( - _conjunct_ctxs[i]); // pushed functions, need keep ctxs - } else { - new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); - } - } - - _conjunct_ctxs = std::move(new_conjunct_ctxs); - _direct_conjunct_size = new_direct_conjunct_size; - - // TODO: support vbloom_filter_predicate/vbinary_predicate and merge unpushed predicate to _vconjunct_ctx - for (auto push_down_ctx : _pushed_conjuncts_index) { - auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx); - if (iter != _conjunctid_to_runtime_filter_ctxs.end()) { - iter->second->runtimefilter->set_push_down_profile(); - } - } - - // set vconjunct_ctx is empty, if all conjunct - if (_direct_conjunct_size == 0) { - if (_vconjunct_ctx_ptr != nullptr) { - (*_vconjunct_ctx_ptr)->close(state); - _vconjunct_ctx_ptr = nullptr; - } - } - - // filter idle conjunct in vexpr_contexts - auto checker = [&](int index) { return _pushed_conjuncts_index.count(index); }; - _peel_pushed_vconjunct(state, checker); -} - -// Construct the ColumnValueRange for one specified column -// It will only handle the InPredicate and eq BinaryPredicate in conjunct_ctxs. -// It will try to push down conditions of that column as much as possible, -// But if the number of conditions exceeds the limit, none of conditions will be pushed down. -template -Status VOlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, - ColumnValueRange* range) { - std::vector filter_conjuncts_index; - for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - // create empty range as temp range, temp range should do intersection on range - auto temp_range = ColumnValueRange::create_empty_column_value_range( - slot->type().precision, slot->type().scale); - - // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' - if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) { - InPredicate* pred = static_cast(_conjunct_ctxs[conj_idx]->root()); - if (!should_push_down_in_predicate(slot, pred)) { - continue; - } - - // begin to push InPredicate value into ColumnValueRange - HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin(); - while (iter->has_next()) { - // column in (nullptr) is always false so continue to - // dispose next item - if (nullptr == iter->get_value()) { - iter->next(); - continue; - } - auto value = const_cast(iter->get_value()); - RETURN_IF_ERROR(change_fixed_value_range( - temp_range, value, ColumnValueRange::add_fixed_value_range)); - iter->next(); - } - - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - } - range->intersection(temp_range); - } // end of handle in predicate - // 2. Normalize eq conjuncts like 'where col = value' - else if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() && - FILTER_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) { - Expr* pred = _conjunct_ctxs[conj_idx]->root(); - DCHECK(pred->get_num_children() == 2); - - for (int child_idx = 0; child_idx < 2; ++child_idx) { - // TODO: should use C++17 structured bindlings to refactor this code in the future: - // 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);' - // make code tidier and readabler - auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx); - if (!result_pair.first) { - continue; - } - - auto value = result_pair.second; - // where A = nullptr should return empty result set - if (value != nullptr) { - RETURN_IF_ERROR(change_fixed_value_range( - temp_range, value, ColumnValueRange::add_fixed_value_range)); - } - - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - } - range->intersection(temp_range); - } // end for each binary predicate child - } // end of handling eq binary predicate - } - - // exceed limit, no conditions will be pushed down to storage engine. - if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) { - range->set_whole_value_range(); - } else { - std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), - std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); - } - return Status::OK(); -} - -// Construct the ColumnValueRange for one specified column -// It will only handle the NotInPredicate and not eq BinaryPredicate in conjunct_ctxs. -// It will try to push down conditions of that column as much as possible, -// But if the number of conditions exceeds the limit, none of conditions will be pushed down. -template -Status VOlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, - ColumnValueRange* range) { - // If the conjunct of slot is fixed value, will change the fixed value set of column value range - // else add value to not in range and push down predicate directly - bool is_fixed_range = range->is_fixed_value_range(); - auto not_in_range = ColumnValueRange::create_empty_column_value_range(range->column_name()); - - std::vector filter_conjuncts_index; - for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - // 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)' - if (TExprOpcode::FILTER_NOT_IN == _conjunct_ctxs[conj_idx]->root()->op()) { - InPredicate* pred = static_cast(_conjunct_ctxs[conj_idx]->root()); - if (!should_push_down_in_predicate(slot, pred)) { - continue; - } - - // begin to push InPredicate value into ColumnValueRange - auto iter = pred->hybrid_set()->begin(); - while (iter->has_next()) { - // column not in (nullptr) is always true - if (nullptr == iter->get_value()) { - continue; - } - auto value = const_cast(iter->get_value()); - if (is_fixed_range) { - RETURN_IF_ERROR(change_fixed_value_range( - *range, value, ColumnValueRange::remove_fixed_value_range)); - } else { - RETURN_IF_ERROR(change_fixed_value_range( - not_in_range, value, ColumnValueRange::add_fixed_value_range)); - } - iter->next(); - } - - // only where a in ('a', 'b', nullptr) contain nullptr will - // clear temp_range to whole range, no need do intersection - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - } - } // end of handle not in predicate - - // 2. Normalize eq conjuncts like 'where col != value' - if (TExprNodeType::BINARY_PRED == _conjunct_ctxs[conj_idx]->root()->node_type() && - FILTER_NOT_IN == to_olap_filter_type(_conjunct_ctxs[conj_idx]->root()->op(), false)) { - Expr* pred = _conjunct_ctxs[conj_idx]->root(); - DCHECK(pred->get_num_children() == 2); - - for (int child_idx = 0; child_idx < 2; ++child_idx) { - // TODO: should use C++17 structured bindlings to refactor this code in the future: - // 'auto [should_push_down, value] = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx);' - // make code tidier and readabler - auto result_pair = should_push_down_eq_predicate(slot, pred, conj_idx, child_idx); - if (!result_pair.first) { - continue; - } - auto value = result_pair.second; - - if (is_fixed_range) { - RETURN_IF_ERROR(change_fixed_value_range( - *range, value, ColumnValueRange::remove_fixed_value_range)); - } else { - RETURN_IF_ERROR(change_fixed_value_range( - not_in_range, value, ColumnValueRange::add_fixed_value_range)); - } - - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - } - } // end for each binary predicate child - } // end of handling eq binary predicate - } - - // exceed limit, no conditions will be pushed down to storage engine. - if (is_fixed_range || - not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { - if (!is_fixed_range) { - // push down not in condition to storage engine - not_in_range.to_in_condition(_olap_filter, false); - } - std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), - std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); - } - return Status::OK(); -} - -template -bool VOlapScanNode::normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot, - const std::string& is_null_str, - ColumnValueRange* range) { - if (expr->node_type() != TExprNodeType::SLOT_REF) { - return false; - } - - std::vector slot_ids; - if (1 != expr->get_slot_ids(&slot_ids)) { - return false; - } - - if (slot_ids[0] != slot->id()) { - return false; - } - - auto temp_range = ColumnValueRange::create_empty_column_value_range(slot->type().precision, - slot->type().scale); - temp_range.set_contain_null(is_null_str == "null"); - range->intersection(temp_range); - - return true; -} - -template -Status VOlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, - ColumnValueRange* range) { - std::vector filter_conjuncts_index; - - for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - Expr* root_expr = _conjunct_ctxs[conj_idx]->root(); - if (TExprNodeType::BINARY_PRED != root_expr->node_type() || - FILTER_IN == to_olap_filter_type(root_expr->op(), false) || - FILTER_NOT_IN == to_olap_filter_type(root_expr->op(), false)) { - if (TExprNodeType::FUNCTION_CALL == root_expr->node_type()) { - std::string is_null_str; - // 1. dispose the where pred "A is null" and "A is not null" - if (root_expr->is_null_scalar_function(is_null_str) && - normalize_is_null_predicate(root_expr->get_child(0), slot, is_null_str, - range)) { - // if column is key column should push down conjunct storage engine - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - } - } - } - continue; - } - - // 2. dispose the where pred "A <,<=" and "A >,>=" - Expr* pred = _conjunct_ctxs[conj_idx]->root(); - DCHECK(pred->get_num_children() == 2); - - for (int child_idx = 0; child_idx < 2; ++child_idx) { - if (Expr::type_without_cast(pred->get_child(child_idx)) != TExprNodeType::SLOT_REF) { - continue; - } - if (pred->get_child(child_idx)->type().type != slot->type().type) { - if (!ignore_cast(slot, pred->get_child(child_idx))) { - continue; - } - } - - std::vector slot_ids; - - if (1 == pred->get_child(child_idx)->get_slot_ids(&slot_ids)) { - if (slot_ids[0] != slot->id()) { - continue; - } - - Expr* expr = pred->get_child(1 - child_idx); - - // for case: where col_a > col_b - if (!expr->is_constant()) { - continue; - } - - void* value = _conjunct_ctxs[conj_idx]->get_value( - expr, nullptr, slot->type().precision, slot->type().scale); - // for case: where col > null - if (value == nullptr) { - continue; - } - - switch (slot->type().type) { - case TYPE_DATE: { - DateTimeValue date_value = *reinterpret_cast(value); - // NOTE: Datetime may be truncated to a date column, so we call ++operator for date_value - // for example: '2010-01-01 00:00:01' will be truncate to '2010-01-01' - if (date_value.check_loss_accuracy_cast_to_date()) { - if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) { - ++date_value; - } - } - range->add_range(to_olap_filter_type(pred->op(), child_idx), - *reinterpret_cast::CppType*>( - &date_value)); - break; - } - case TYPE_DATEV2: { - DateV2Value datetimev2_value = - *reinterpret_cast*>(value); - doris::vectorized::DateV2Value date_v2; - date_v2.set_date_uint32(binary_cast, uint64_t>( - datetimev2_value) >> - TIME_PART_LENGTH); - if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) { - if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) { - ++date_v2; - } - } - if constexpr (T == PrimitiveType::TYPE_DATEV2) { - range->add_range(to_olap_filter_type(pred->op(), child_idx), date_v2); - break; - } else { - __builtin_unreachable(); - } - } - case TYPE_TINYINT: - case TYPE_DECIMALV2: - case TYPE_DECIMAL32: - case TYPE_DECIMAL64: - case TYPE_DECIMAL128: - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_HLL: - case TYPE_DATETIME: - case TYPE_DATETIMEV2: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_LARGEINT: - case TYPE_BOOLEAN: - case TYPE_STRING: { - range->add_range( - to_olap_filter_type(pred->op(), child_idx), - *reinterpret_cast::CppType*>(value)); - break; - } - - default: { - LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" - << expr->type() << "]"; - return Status::InternalError( - "Normalize filter fail, Unsupported Primitive type"); - } - } - - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - } - - VLOG_CRITICAL << slot->col_name() << " op: " - << static_cast(to_olap_filter_type(pred->op(), child_idx)) - << " value: " - << *reinterpret_cast::CppType*>( - value); - } - } - } - - std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), - std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); - - return Status::OK(); -} - -Status VOlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { - std::vector filter_conjuncts_index; - - for (int conj_idx = _direct_conjunct_size; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - Expr* root_expr = _conjunct_ctxs[conj_idx]->root(); - if (TExprNodeType::BLOOM_PRED != root_expr->node_type()) continue; - - Expr* pred = _conjunct_ctxs[conj_idx]->root(); - DCHECK(pred->get_num_children() == 1); - - if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { - continue; - } - if (pred->get_child(0)->type().type != slot->type().type) { - if (!ignore_cast(slot, pred->get_child(0))) { - continue; - } - } - - std::vector slot_ids; - - if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) { - if (slot_ids[0] != slot->id()) { - continue; - } - // only key column of bloom filter will push down to storage engine - if (is_key_column(slot->col_name())) { - filter_conjuncts_index.emplace_back(conj_idx); - _bloom_filters_push_down.emplace_back( - slot->col_name(), - (reinterpret_cast(pred))->get_bloom_filter_func()); - } - } - } - - std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), - std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); - - return Status::OK(); -} - Status VOlapScanNode::start_scan_thread(RuntimeState* state) { if (_scan_ranges.empty()) { _transfer_done = true; @@ -1700,8 +1025,6 @@ Status VOlapScanNode::close(RuntimeState* state) { DCHECK(runtime_filter != nullptr); runtime_filter->consumer_close(); } - // pushed functions close - Expr::close(_pushed_func_conjunct_ctxs, state); for (auto& ctx : _stale_vexpr_ctxs) { (*ctx)->close(state); @@ -2019,4 +1342,533 @@ Status VOlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& sca return Status::OK(); } +template +bool VOlapScanNode::_should_push_down_in_predicate(VInPredicate* pred, VExprContext* expr_ctx) { + if (pred->is_not_in() != IsNotIn) { + return false; + } + InState* state = reinterpret_cast( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + HybridSetBase* set = state->hybrid_set.get(); + + // if there are too many elements in InPredicate, exceed the limit, + // we will not push any condition of this column to storage engine. + // because too many conditions pushed down to storage engine may even + // slow down the query process. + // ATTN: This is just an experience value. You may need to try + // different thresholds to improve performance. + if (set->size() > _max_pushdown_conditions_per_column) { + VLOG_NOTICE << "Predicate value num " << set->size() << " exceed limit " + << _max_pushdown_conditions_per_column; + return false; + } + return true; +} + +bool VOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_call, + VExprContext* expr_ctx, + std::string* constant_str, + doris_udf::FunctionContext** fn_ctx) { + // Now only `like` function filters is supported to push down + if (fn_call->fn().name.function_name != "like") { + return false; + } + + const auto& children = fn_call->children(); + doris_udf::FunctionContext* func_cxt = expr_ctx->fn_context(fn_call->fn_context_index()); + DCHECK(func_cxt != nullptr); + DCHECK(children.size() == 2); + for (size_t i = 0; i < children.size(); i++) { + if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + if (!children[1 - i]->is_constant()) { + // only handle constant value + return false; + } else { + DCHECK(children[1 - i]->type().is_string_type()); + if (const ColumnConst* const_column = check_and_get_column( + children[1 - i]->get_const_col(expr_ctx)->column_ptr)) { + *constant_str = const_column->get_data_at(0).to_string(); + } else { + return false; + } + } + } + *fn_ctx = func_cxt; + return true; +} + +bool VOlapScanNode::_should_push_down_binary_predicate( + VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val, + int* slot_ref_child, const std::function& fn_checker) { + if (!fn_checker(fn_call->fn().name.function_name)) { + return false; + } + + const auto& children = fn_call->children(); + DCHECK(children.size() == 2); + for (size_t i = 0; i < children.size(); i++) { + if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + if (!children[1 - i]->is_constant()) { + // only handle constant value + return false; + } else { + if (const ColumnConst* const_column = check_and_get_column( + children[1 - i]->get_const_col(expr_ctx)->column_ptr)) { + *slot_ref_child = i; + *constant_val = const_column->get_data_at(0); + } else { + return false; + } + } + } + return true; +} + +bool VOlapScanNode::_is_predicate_acting_on_slot( + VExpr* expr, + const std::function&, const VSlotRef**, VExpr**)>& checker, + SlotDescriptor** slot_desc, ColumnValueRangeType** range) { + const VSlotRef* slot_ref = nullptr; + VExpr* child_contains_slot = nullptr; + if (!checker(expr->children(), &slot_ref, &child_contains_slot)) { + // not a slot ref(column) + return false; + } + + auto entry = _id_to_slot_column_value_range.find(slot_ref->slot_id()); + if (_id_to_slot_column_value_range.end() == entry) { + return false; + } + *slot_desc = entry->second.first; + DCHECK(child_contains_slot != nullptr); + if (child_contains_slot->type().type != (*slot_desc)->type().type) { + if (!ignore_cast(*slot_desc, child_contains_slot)) { + // the type of predicate not match the slot's type + return false; + } + } + *range = &(entry->second.second); + return true; +} + +template +Status VOlapScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, + ColumnValueRange& range, bool* push_down) { + auto temp_range = ColumnValueRange::create_empty_column_value_range(slot->type().precision, + slot->type().scale); + bool effect = false; + // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' + if (TExprNodeType::IN_PRED == expr->node_type()) { + VInPredicate* pred = static_cast(expr); + if (!_should_push_down_in_predicate(pred, expr_ctx)) { + return Status::OK(); + } + + // begin to push InPredicate value into ColumnValueRange + InState* state = reinterpret_cast( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); + auto fn_name = std::string(""); + while (iter->has_next()) { + // column in (nullptr) is always false so continue to + // dispose next item + if (nullptr == iter->get_value()) { + iter->next(); + continue; + } + auto value = const_cast(iter->get_value()); + RETURN_IF_ERROR(change_value_range( + temp_range, value, ColumnValueRange::add_fixed_value_range, fn_name)); + iter->next(); + } + + range.intersection(temp_range); + effect = true; + } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 2); + auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; }; + + StringRef value; + int slot_ref_child = -1; + if (_should_push_down_binary_predicate(reinterpret_cast(expr), expr_ctx, + &value, &slot_ref_child, eq_checker)) { + DCHECK(slot_ref_child >= 0); + // where A = nullptr should return empty result set + auto fn_name = std::string(""); + if (value.data != nullptr) { + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || + T == TYPE_HLL) { + auto val = StringValue(value.data, value.size); + RETURN_IF_ERROR(change_value_range( + temp_range, reinterpret_cast(&val), + ColumnValueRange::add_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(change_value_range( + temp_range, reinterpret_cast(const_cast(value.data)), + ColumnValueRange::add_fixed_value_range, fn_name)); + } + range.intersection(temp_range); + effect = true; + } + } + } + + // exceed limit, no conditions will be pushed down to storage engine. + if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) { + range.set_whole_value_range(); + } else { + *push_down = effect; + } + return Status::OK(); +} + +template +Status VOlapScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, + ColumnValueRange& range, + bool* push_down) { + bool is_fixed_range = range.is_fixed_value_range(); + auto not_in_range = ColumnValueRange::create_empty_column_value_range(range.column_name()); + bool effect = false; + // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' + if (TExprNodeType::IN_PRED == expr->node_type()) { + VInPredicate* pred = static_cast(expr); + if (!_should_push_down_in_predicate(pred, expr_ctx)) { + return Status::OK(); + } + + // begin to push InPredicate value into ColumnValueRange + InState* state = reinterpret_cast( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); + auto fn_name = std::string(""); + while (iter->has_next()) { + // column not in (nullptr) is always true + if (nullptr == iter->get_value()) { + continue; + } + auto value = const_cast(iter->get_value()); + if (is_fixed_range) { + RETURN_IF_ERROR(change_value_range( + range, value, ColumnValueRange::remove_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(change_value_range( + not_in_range, value, ColumnValueRange::add_fixed_value_range, fn_name)); + } + iter->next(); + } + effect = true; + } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 2); + + auto ne_checker = [](const std::string& fn_name) { return fn_name == "ne"; }; + StringRef value; + int slot_ref_child = -1; + if (_should_push_down_binary_predicate(reinterpret_cast(expr), expr_ctx, + &value, &slot_ref_child, ne_checker)) { + DCHECK(slot_ref_child >= 0); + // where A = nullptr should return empty result set + if (value.data != nullptr) { + auto fn_name = std::string(""); + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || + T == TYPE_HLL) { + auto val = StringValue(value.data, value.size); + if (is_fixed_range) { + RETURN_IF_ERROR(change_value_range( + range, reinterpret_cast(&val), + ColumnValueRange::remove_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(change_value_range( + not_in_range, reinterpret_cast(&val), + ColumnValueRange::add_fixed_value_range, fn_name)); + } + } else { + if (is_fixed_range) { + RETURN_IF_ERROR(change_value_range( + range, reinterpret_cast(const_cast(value.data)), + ColumnValueRange::remove_fixed_value_range, fn_name)); + } else { + RETURN_IF_ERROR(change_value_range( + not_in_range, + reinterpret_cast(const_cast(value.data)), + ColumnValueRange::add_fixed_value_range, fn_name)); + } + } + effect = true; + } + } + } + + if (is_fixed_range || + not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { + if (!is_fixed_range) { + // push down not in condition to storage engine + not_in_range.to_in_condition(_olap_filter, false); + } + *push_down = effect; + } + return Status::OK(); +} + +template +Status VOlapScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down) { + if (TExprNodeType::FUNCTION_CALL == expr->node_type()) { + if (reinterpret_cast(expr)->fn().name.function_name == "is_null_pred") { + auto temp_range = ColumnValueRange::create_empty_column_value_range( + slot->type().precision, slot->type().scale); + temp_range.set_contain_null(true); + range.intersection(temp_range); + *push_down = true; + } else if (reinterpret_cast(expr)->fn().name.function_name == + "is_not_null_pred") { + auto temp_range = ColumnValueRange::create_empty_column_value_range( + slot->type().precision, slot->type().scale); + temp_range.set_contain_null(false); + range.intersection(temp_range); + *push_down = true; + } + } + return Status::OK(); +} + +template +Status VOlapScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, + ColumnValueRange& range, + bool* push_down) { + if (TExprNodeType::BINARY_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 2); + + auto noneq_checker = [](const std::string& fn_name) { + return fn_name != "ne" && fn_name != "eq"; + }; + StringRef value; + int slot_ref_child = -1; + if (_should_push_down_binary_predicate(reinterpret_cast(expr), expr_ctx, + &value, &slot_ref_child, noneq_checker)) { + DCHECK(slot_ref_child >= 0); + const std::string& fn_name = + reinterpret_cast(expr)->fn().name.function_name; + + // where A = nullptr should return empty result set + if (value.data != nullptr) { + *push_down = true; + if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || + T == TYPE_HLL) { + auto val = StringValue(value.data, value.size); + RETURN_IF_ERROR(change_value_range(range, reinterpret_cast(&val), + ColumnValueRange::add_value_range, + fn_name, slot_ref_child)); + } else { + RETURN_IF_ERROR(change_value_range( + range, reinterpret_cast(const_cast(value.data)), + ColumnValueRange::add_value_range, fn_name, slot_ref_child)); + } + } + } + } + return Status::OK(); +} + +Status VOlapScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, bool* push_down) { + if (TExprNodeType::BLOOM_PRED == expr->node_type()) { + DCHECK(expr->children().size() == 1); + _bloom_filters_push_down.emplace_back(slot->col_name(), expr->get_bloom_filter_func()); + *push_down = true; + } + return Status::OK(); +} + +Status VOlapScanNode::_normalize_function_filters(VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, bool* push_down) { + if (!config::enable_function_pushdown) { + return Status::OK(); + } + bool opposite = false; + VExpr* fn_expr = expr; + if (TExprNodeType::COMPOUND_PRED == expr->node_type() && + expr->fn().name.function_name == "not") { + fn_expr = fn_expr->children()[0]; + opposite = true; + } + + if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type()) { + doris_udf::FunctionContext* fn_ctx = nullptr; + std::string str; + if (_should_push_down_function_filter(reinterpret_cast(fn_expr), + expr_ctx, &str, &fn_ctx)) { + std::string col = slot->col_name(); + StringVal val(reinterpret_cast(str.data()), str.size()); + _push_down_functions.emplace_back(opposite, col, fn_ctx, val); + *push_down = true; + } + } + return Status::OK(); +} + +void VOlapScanNode::eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down) { + char* constant_val = nullptr; + if (vexpr->is_constant()) { + if (const ColumnConst* const_column = + check_and_get_column(vexpr->get_const_col(expr_ctx)->column_ptr)) { + constant_val = const_cast(const_column->get_data_at(0).data); + if (constant_val == nullptr || *reinterpret_cast(constant_val) == false) { + *push_down = true; + _eos = true; + } + } else { + LOG(WARNING) << "Expr[" << vexpr->debug_string() + << "] is a constant but doesn't contain a const column!"; + } + } +} + +VExpr* VOlapScanNode::_normalize_predicate(RuntimeState* state, VExpr* conjunct_expr_root) { + static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); }; + auto in_predicate_checker = [](const std::vector& children, const VSlotRef** slot, + VExpr** child_contains_slot) { + if (children.empty() || + VExpr::expr_without_cast(children[0])->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + return false; + } + *slot = reinterpret_cast(VExpr::expr_without_cast(children[0])); + *child_contains_slot = children[0]; + return true; + }; + auto eq_predicate_checker = [](const std::vector& children, const VSlotRef** slot, + VExpr** child_contains_slot) { + for (const VExpr* child : children) { + if (VExpr::expr_without_cast(child)->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + continue; + } + *slot = reinterpret_cast(VExpr::expr_without_cast(child)); + *child_contains_slot = const_cast(child); + return true; + } + return false; + }; + + if (conjunct_expr_root != nullptr) { + if (is_leaf(conjunct_expr_root)) { + auto impl = conjunct_expr_root->get_impl(); + VExpr* cur_expr = impl ? const_cast(impl) : conjunct_expr_root; + SlotDescriptor* slot; + ColumnValueRangeType* range = nullptr; + bool push_down = false; + eval_const_conjuncts(cur_expr, *(_vconjunct_ctx_ptr.get()), &push_down); + if (!push_down && + (_is_predicate_acting_on_slot(cur_expr, in_predicate_checker, &slot, &range) || + _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range))) { + std::visit( + [&](auto& value_range) { + RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, + &push_down)); + if (is_key_column(slot->col_name())) { + RETURN_IF_PUSH_DOWN(_normalize_bloom_filter( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &push_down)); + RETURN_IF_PUSH_DOWN(_normalize_function_filters( + cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &push_down)); + } + }, + *range); + } + if (push_down && is_key_column(slot->col_name())) { + return nullptr; + } else { + return conjunct_expr_root; + } + } else { + VExpr* left_child = _normalize_predicate(state, conjunct_expr_root->children()[0]); + VExpr* right_child = _normalize_predicate(state, conjunct_expr_root->children()[1]); + + if (left_child != nullptr && right_child != nullptr) { + conjunct_expr_root->set_children({left_child, right_child}); + return conjunct_expr_root; + } else { + // here only close the and expr self, do not close the child + conjunct_expr_root->set_children({}); + conjunct_expr_root->close(state, *_vconjunct_ctx_ptr, + (*_vconjunct_ctx_ptr)->get_function_state_scope()); + } + + // here do not close Expr* now + return left_child != nullptr ? left_child : right_child; + } + } + return conjunct_expr_root; +} + +Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector& vexprs) { + if (!vexprs.empty()) { + auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; + for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) { + if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) { + continue; + } + TFunction fn; + TFunctionName fn_name; + fn_name.__set_db_name(""); + fn_name.__set_function_name("and"); + fn.__set_name(fn_name); + fn.__set_binary_type(TFunctionBinaryType::BUILTIN); + std::vector arg_types; + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_arg_types(arg_types); + fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_has_var_args(false); + + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); + texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); + texpr_node.__set_fn(fn); + texpr_node.__set_is_nullable(last_expr->is_nullable() || vexprs[j]->is_nullable()); + VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); + new_node->add_child(last_expr); + DCHECK((vexprs[j])->get_impl() != nullptr); + new_node->add_child(vexprs[j]); + last_expr = new_node; + _rf_vexpr_set.insert(vexprs[j]); + } + auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); + if (_vconjunct_ctx_ptr) { + (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr); + } + RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc())); + RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(state)); + if (_vconjunct_ctx_ptr) { + (*(_vconjunct_ctx_ptr.get()))->mark_as_stale(); + _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); + } + _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); + *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; + } + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index 4ead17f9b3..e047f34c10 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -26,6 +26,9 @@ #include "gen_cpp/PlanNodes_types.h" #include "olap/tablet.h" #include "util/progress_updater.h" +#include "vec/exprs/vectorized_fn_call.h" +#include "vec/exprs/vin_predicate.h" +#include "vec/exprs/vslot_ref.h" namespace doris { class ObjectPool; @@ -63,40 +66,15 @@ private: // In order to ensure the accuracy of the query result // only key column conjuncts will be remove as idle conjunct bool is_key_column(const std::string& key_name); - void remove_pushed_conjuncts(RuntimeState* state); Status start_scan(RuntimeState* state); - void eval_const_conjuncts(); Status normalize_conjuncts(); Status build_key_ranges_and_filters(); - Status build_function_filters(); - template - Status normalize_predicate(ColumnValueRange& range, SlotDescriptor* slot); - - template - Status normalize_in_and_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range); - - template - Status normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range); - - template - Status normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range); - - Status normalize_bloom_filter_predicate(SlotDescriptor* slot); - - template - static bool normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot, - const std::string& is_null_str, - ColumnValueRange* range); - bool should_push_down_in_predicate(SlotDescriptor* slot, InPredicate* in_pred); - - template - static Status change_fixed_value_range(ColumnValueRange& range, void* value, - const ChangeFixedValueRangeFunc& func); - - std::pair should_push_down_eq_predicate(SlotDescriptor* slot, Expr* pred, - int conj_idx, int child_idx); + template + static Status change_value_range(ColumnValueRange& range, void* value, + const ChangeFixedValueRangeFunc& func, + const std::string& fn_name, int slot_ref_child = -1); void transfer_thread(RuntimeState* state); void scanner_thread(VOlapScanner* scanner); @@ -114,6 +92,54 @@ private: return _runtime_filter_descs; } + template + Status _normalize_in_and_eq_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + template + Status _normalize_not_in_and_not_eq_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + + template + Status _normalize_noneq_binary_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + + template + Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange& range, + bool* push_down); + + Status _normalize_bloom_filter(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, bool* push_down); + + void eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down); + + vectorized::VExpr* _normalize_predicate(RuntimeState* state, + vectorized::VExpr* conjunct_expr_root); + + Status _normalize_function_filters(vectorized::VExpr* expr, VExprContext* expr_ctx, + SlotDescriptor* slot, bool* push_down); + + template + bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext* expr_ctx); + + bool _is_predicate_acting_on_slot(VExpr* expr, + const std::function&, + const VSlotRef**, VExpr**)>& checker, + SlotDescriptor** slot_desc, ColumnValueRangeType** range); + + bool _should_push_down_binary_predicate( + VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val, + int* slot_ref_child, const std::function& fn_checker); + + bool _should_push_down_function_filter(VectorizedFnCall* fn_call, VExprContext* expr_ctx, + std::string* constant_str, + doris_udf::FunctionContext** fn_ctx); + + Status _append_rf_into_conjuncts(RuntimeState* state, std::vector& vexprs); + // Tuple id resolved in prepare() to set _tuple_desc; TupleId _tuple_id; // doris scan node used to scan doris @@ -124,9 +150,6 @@ private: int _tuple_idx; // string slots std::vector _string_slots; - // conjunct's index which already be push down storage engine - // should be remove in olap_scan_node, no need check this conjunct again - std::set _pushed_conjuncts_index; // collection slots std::vector _collection_slots; @@ -149,11 +172,6 @@ private: // push down functions to storage engine // only support scalar functions, now just support like / not like std::vector _push_down_functions; - // functions conjunct's index which already be push down storage engine - std::set _pushed_func_conjuncts_index; - // need keep these conjunct to the end of scan node, - // since some memory referenced by pushed function filters - std::vector _pushed_func_conjunct_ctxs; // Pool for storing allocated scanner objects. We don't want to use the // runtime pool to ensure that the scanner objects are deleted before this @@ -183,7 +201,6 @@ private: // Used in Scan thread to ensure thread-safe std::atomic_bool _scanner_done; std::atomic_bool _transfer_done; - size_t _direct_conjunct_size; int _total_assign_num; int _nice; @@ -321,6 +338,8 @@ private: phmap::flat_hash_set _rf_vexpr_set; std::vector> _stale_vexpr_ctxs; + phmap::flat_hash_map> + _id_to_slot_column_value_range; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index c032aee301..6ce3182c1c 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -122,10 +122,6 @@ Status VOlapScanner::open() { SCOPED_TIMER(_parent->_reader_init_timer); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) { - _use_pushdown_conjuncts = true; - } - _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); auto res = _tablet_reader->init(_tablet_reader_params); diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index 1ea8639c6c..7584eb47c9 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -83,10 +83,6 @@ public: int64_t update_wait_worker_timer() const { return _watcher.elapsed_time(); } - void set_use_pushdown_conjuncts(bool has_pushdown_conjuncts) { - _use_pushdown_conjuncts = has_pushdown_conjuncts; - } - std::vector* mutable_runtime_filter_marks() { return &_runtime_filter_marks; } TabletStorageType get_storage_type(); @@ -117,7 +113,6 @@ private: bool _aggregation; bool _need_agg_finalize = true; bool _has_update_counter = false; - bool _use_pushdown_conjuncts = false; TabletReader::ReaderParams _tablet_reader_params; std::unique_ptr _tablet_reader; diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index 6f73b2c69d..c708efdda0 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -90,7 +90,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result const std::string& VBloomPredicate::expr_name() const { return _expr_name; } -void VBloomPredicate::set_filter(std::unique_ptr& filter) { - _filter.reset(filter.release()); +void VBloomPredicate::set_filter(std::shared_ptr& filter) { + _filter = filter; } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h index f715b9e40e..0f00d6ca55 100644 --- a/be/src/vec/exprs/vbloom_predicate.h +++ b/be/src/vec/exprs/vbloom_predicate.h @@ -37,7 +37,9 @@ public: return pool->add(new VBloomPredicate(*this)); } const std::string& expr_name() const override; - void set_filter(std::unique_ptr& filter); + void set_filter(std::shared_ptr& filter); + + std::shared_ptr get_bloom_filter_func() const override { return _filter; } private: std::shared_ptr _filter; diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 909feb324f..d493a87a65 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -354,7 +354,7 @@ Status VExpr::init_function_context(VExprContext* context, void VExpr::close_function_context(VExprContext* context, FunctionContext::FunctionStateScope scope, const FunctionBasePtr& function) const { - if (_fn_context_index != -1) { + if (_fn_context_index != -1 && !context->_stale) { FunctionContext* fn_ctx = context->fn_context(_fn_context_index); function->close(fn_ctx, FunctionContext::THREAD_LOCAL); if (scope == FunctionContext::FRAGMENT_LOCAL) { diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index b6921753e8..9d2d4ce64b 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -21,6 +21,7 @@ #include #include "common/status.h" +#include "exprs/bloomfilter_predicate.h" #include "gen_cpp/Exprs_types.h" #include "runtime/types.h" #include "udf/udf_internal.h" @@ -137,6 +138,25 @@ public: /// expr. virtual ColumnPtrWrapper* get_const_col(VExprContext* context); + int fn_context_index() { return _fn_context_index; }; + + static const VExpr* expr_without_cast(const VExpr* expr) { + if (expr->node_type() == doris::TExprNodeType::CAST_EXPR) { + return expr_without_cast(expr->_children[0]); + } + return expr; + } + + // If this expr is a RuntimeFilterWrapper, this method will return an underlying rf expression + virtual const VExpr* get_impl() const { return nullptr; } + + // If this expr is a BloomPredicate, this method will return a BloomFilterFunc + virtual std::shared_ptr get_bloom_filter_func() const { + LOG(FATAL) << "Method 'get_bloom_filter_func()' is not supported in expression: " + << this->debug_string(); + return nullptr; + } + protected: /// Simple debug string that provides no expr subclass-specific information std::string debug_string(const std::string& expr_name) const { diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index dfb4223741..da4a3a96eb 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -28,7 +28,8 @@ VExprContext::VExprContext(VExpr* expr) _prepared(false), _opened(false), _closed(false), - _last_result_column_id(-1) {} + _last_result_column_id(-1), + _stale(false) {} VExprContext::~VExprContext() { DCHECK(!_prepared || _closed); @@ -98,6 +99,12 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) { return _root->open(state, *new_ctx, FunctionContext::THREAD_LOCAL); } +void VExprContext::clone_fn_contexts(VExprContext* other) { + for (auto& _fn_context : _fn_contexts) { + other->_fn_contexts.push_back(_fn_context->impl()->clone(other->_pool.get())); + } +} + int VExprContext::register_func(RuntimeState* state, const FunctionContext::TypeDesc& return_type, const std::vector& arg_types, int varargs_buffer_size) { diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index fb31925136..b454833ed3 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -69,6 +69,13 @@ public: return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; } + void clone_fn_contexts(VExprContext* other); + + void mark_as_stale() { + DCHECK(!_stale); + _stale = true; + } + private: friend class VExpr; @@ -91,5 +98,7 @@ private: std::unique_ptr _pool; int _last_result_column_id; + + bool _stale; }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h index 64cc9acefa..6b2264d40a 100644 --- a/be/src/vec/exprs/vin_predicate.h +++ b/be/src/vec/exprs/vin_predicate.h @@ -41,6 +41,10 @@ public: virtual std::string debug_string() const override; + const FunctionBasePtr function() { return _function; }; + + const bool is_not_in() { return _is_not_in; }; + private: FunctionBasePtr _function; std::string _expr_name; diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 755d243fd8..91a6bdbcac 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -44,6 +44,8 @@ public: return _impl->get_const_col(context); } + const VExpr* get_impl() const override { return _impl; } + private: VExpr* _impl; diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h index 1bc78a4c5c..0f8540fc9e 100644 --- a/be/src/vec/exprs/vslot_ref.h +++ b/be/src/vec/exprs/vslot_ref.h @@ -41,6 +41,8 @@ public: const int column_id() const { return _column_id; } + const int slot_id() const { return _slot_id; } + private: FunctionPtr _function; int _slot_id; diff --git a/be/src/vec/functions/in.cpp b/be/src/vec/functions/in.cpp index 4e5877a6ec..b0bc1d137f 100644 --- a/be/src/vec/functions/in.cpp +++ b/be/src/vec/functions/in.cpp @@ -16,164 +16,12 @@ // under the License. // This file is copied from -#include +#include -#include "exprs/create_predicate_function.h" -#include "vec/columns/column_const.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_set.h" -#include "vec/columns/columns_number.h" -#include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_number.h" -#include "vec/functions/function.h" #include "vec/functions/simple_function_factory.h" namespace doris::vectorized { -struct InState { - bool use_set = true; - - // only use in null in set - bool null_in_set = false; - std::unique_ptr hybrid_set; -}; - -template -class FunctionIn : public IFunction { -public: - static constexpr auto name = negative ? "not_in" : "in"; - - static FunctionPtr create() { return std::make_shared(); } - - String get_name() const override { return name; } - - bool is_variadic() const override { return true; } - - size_t get_number_of_arguments() const override { return 0; } - - DataTypePtr get_return_type_impl(const DataTypes& args) const override { - for (const auto& arg : args) { - if (arg->is_nullable()) return make_nullable(std::make_shared()); - } - return std::make_shared(); - } - - bool use_default_implementation_for_nulls() const override { return false; } - - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { - if (scope == FunctionContext::THREAD_LOCAL) { - return Status::OK(); - } - auto* state = new InState(); - context->set_function_state(scope, state); - state->hybrid_set.reset( - vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type))); - - DCHECK(context->get_num_args() >= 1); - for (int i = 1; i < context->get_num_args(); ++i) { - const auto& const_column_ptr = context->get_constant_col(i); - if (const_column_ptr != nullptr) { - auto const_data = const_column_ptr->column_ptr->get_data_at(0); - if (const_data.data == nullptr) { - state->null_in_set = true; - } else { - state->hybrid_set->insert((void*)const_data.data, const_data.size); - } - } else { - state->use_set = false; - break; - } - } - return Status::OK(); - } - - Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, - size_t result, size_t input_rows_count) override { - auto in_state = reinterpret_cast( - context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - if (!in_state) { - return Status::RuntimeError("funciton context for function '{}' must have Set;", - get_name()); - } - auto res = ColumnUInt8::create(); - ColumnUInt8::Container& vec_res = res->get_data(); - vec_res.resize(input_rows_count); - - ColumnUInt8::MutablePtr col_null_map_to; - col_null_map_to = ColumnUInt8::create(input_rows_count); - auto& vec_null_map_to = col_null_map_to->get_data(); - - /// First argument may be a single column. - const ColumnWithTypeAndName& left_arg = block.get_by_position(arguments[0]); - auto materialized_column = left_arg.column->convert_to_full_column_if_const(); - - if (in_state->use_set) { - for (size_t i = 0; i < input_rows_count; ++i) { - const auto& ref_data = materialized_column->get_data_at(i); - if (ref_data.data) { - vec_res[i] = negative ^ - in_state->hybrid_set->find((void*)ref_data.data, ref_data.size); - if (in_state->null_in_set) { - vec_null_map_to[i] = negative == vec_res[i]; - } else { - vec_null_map_to[i] = false; - } - } else { - vec_null_map_to[i] = true; - } - } - } else { - std::vector set_columns; - for (int i = 1; i < arguments.size(); ++i) { - set_columns.emplace_back(block.get_by_position(arguments[i]).column); - } - - for (size_t i = 0; i < input_rows_count; ++i) { - const auto& ref_data = materialized_column->get_data_at(i); - if (ref_data.data == nullptr) { - vec_null_map_to[i] = true; - continue; - } - - std::unique_ptr hybrid_set( - vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type))); - bool null_in_set = false; - - for (const auto& set_column : set_columns) { - auto set_data = set_column->get_data_at(i); - if (set_data.data == nullptr) - null_in_set = true; - else - hybrid_set->insert((void*)(set_data.data), set_data.size); - } - vec_res[i] = negative ^ hybrid_set->find((void*)ref_data.data, ref_data.size); - if (null_in_set) { - vec_null_map_to[i] = negative == vec_res[i]; - } else { - vec_null_map_to[i] = false; - } - } - } - - if (block.get_by_position(result).type->is_nullable()) { - block.replace_by_position( - result, ColumnNullable::create(std::move(res), std::move(col_null_map_to))); - } else { - block.replace_by_position(result, std::move(res)); - } - - return Status::OK(); - } - - Status close(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { - if (scope == FunctionContext::FRAGMENT_LOCAL) { - delete reinterpret_cast( - context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); - } - return Status::OK(); - } -}; - void register_function_in(SimpleFunctionFactory& factory) { factory.register_function>(); factory.register_function>(); diff --git a/be/src/vec/functions/in.h b/be/src/vec/functions/in.h new file mode 100644 index 0000000000..d10661fa48 --- /dev/null +++ b/be/src/vec/functions/in.h @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from + +#pragma once + +#include + +#include "exprs/create_predicate_function.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_set.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/functions/function.h" + +namespace doris::vectorized { + +struct InState { + bool use_set = true; + + // only use in null in set + bool null_in_set = false; + std::unique_ptr hybrid_set; +}; + +template +class FunctionIn : public IFunction { +public: + static constexpr auto name = negative ? "not_in" : "in"; + + static FunctionPtr create() { return std::make_shared(); } + + String get_name() const override { return name; } + + bool is_variadic() const override { return true; } + + size_t get_number_of_arguments() const override { return 0; } + + DataTypePtr get_return_type_impl(const DataTypes& args) const override { + for (const auto& arg : args) { + if (arg->is_nullable()) return make_nullable(std::make_shared()); + } + return std::make_shared(); + } + + bool use_default_implementation_for_nulls() const override { return false; } + + Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + if (scope == FunctionContext::THREAD_LOCAL) { + return Status::OK(); + } + auto* state = new InState(); + context->set_function_state(scope, state); + state->hybrid_set.reset( + vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type))); + + DCHECK(context->get_num_args() >= 1); + for (int i = 1; i < context->get_num_args(); ++i) { + const auto& const_column_ptr = context->get_constant_col(i); + if (const_column_ptr != nullptr) { + auto const_data = const_column_ptr->column_ptr->get_data_at(0); + if (const_data.data == nullptr) { + state->null_in_set = true; + } else { + state->hybrid_set->insert((void*)const_data.data, const_data.size); + } + } else { + state->use_set = false; + break; + } + } + return Status::OK(); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count) override { + auto in_state = reinterpret_cast( + context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + if (!in_state) { + return Status::RuntimeError("funciton context for function '{}' must have Set;", + get_name()); + } + auto res = ColumnUInt8::create(); + ColumnUInt8::Container& vec_res = res->get_data(); + vec_res.resize(input_rows_count); + + ColumnUInt8::MutablePtr col_null_map_to; + col_null_map_to = ColumnUInt8::create(input_rows_count); + auto& vec_null_map_to = col_null_map_to->get_data(); + + /// First argument may be a single column. + const ColumnWithTypeAndName& left_arg = block.get_by_position(arguments[0]); + auto materialized_column = left_arg.column->convert_to_full_column_if_const(); + + if (in_state->use_set) { + for (size_t i = 0; i < input_rows_count; ++i) { + const auto& ref_data = materialized_column->get_data_at(i); + if (ref_data.data) { + vec_res[i] = negative ^ + in_state->hybrid_set->find((void*)ref_data.data, ref_data.size); + if (in_state->null_in_set) { + vec_null_map_to[i] = negative == vec_res[i]; + } else { + vec_null_map_to[i] = false; + } + } else { + vec_null_map_to[i] = true; + } + } + } else { + std::vector set_columns; + for (int i = 1; i < arguments.size(); ++i) { + set_columns.emplace_back(block.get_by_position(arguments[i]).column); + } + + for (size_t i = 0; i < input_rows_count; ++i) { + const auto& ref_data = materialized_column->get_data_at(i); + if (ref_data.data == nullptr) { + vec_null_map_to[i] = true; + continue; + } + + std::unique_ptr hybrid_set( + vec_create_set(convert_type_to_primitive(context->get_arg_type(0)->type))); + bool null_in_set = false; + + for (const auto& set_column : set_columns) { + auto set_data = set_column->get_data_at(i); + if (set_data.data == nullptr) + null_in_set = true; + else + hybrid_set->insert((void*)(set_data.data), set_data.size); + } + vec_res[i] = negative ^ hybrid_set->find((void*)ref_data.data, ref_data.size); + if (null_in_set) { + vec_null_map_to[i] = negative == vec_res[i]; + } else { + vec_null_map_to[i] = false; + } + } + } + + if (block.get_by_position(result).type->is_nullable()) { + block.replace_by_position( + result, ColumnNullable::create(std::move(res), std::move(col_null_map_to))); + } else { + block.replace_by_position(result, std::move(res)); + } + + return Status::OK(); + } + + Status close(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + if (scope == FunctionContext::FRAGMENT_LOCAL) { + delete reinterpret_cast( + context->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + } + return Status::OK(); + } +}; + +} // namespace doris::vectorized