diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 47157cf74d..ffb92520be 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -735,7 +735,9 @@ public: _is_bloomfilter = true; // we won't use this class to insert or find any data // so any type is ok - _context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT)); + _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type == INVALID_TYPE + ? PrimitiveType::TYPE_INT + : _column_return_type)); return _context.bloom_filter_func->assign(data, bloom_filter->filter_length()); } @@ -950,12 +952,6 @@ vectorized::SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() return _wrapper->_context; } -void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) { - _wrapper->_filter_type = other->_wrapper->_filter_type; - _wrapper->_is_bloomfilter = other->is_bloomfilter(); - _wrapper->_context = other->_wrapper->_context; -} - void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t start) { DCHECK(is_producer()); _wrapper->insert_batch(column, start); @@ -988,7 +984,6 @@ Status IRuntimeFilter::publish(bool publish_local) { filter->signal(); } } - return Status::OK(); } else if (_has_local_target) { std::vector filters; RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); @@ -1112,6 +1107,10 @@ bool IRuntimeFilter::is_ready_or_timeout() { } } +PrimitiveType IRuntimeFilter::column_type() const { + return _wrapper->column_type(); +} + void IRuntimeFilter::signal() { DCHECK(is_consumer()); if (_enable_pipeline_exec) { @@ -1249,16 +1248,12 @@ Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state, } Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state, - const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool, - std::unique_ptr* wrapper) { - int filter_type = param->request->filter_type(); - PrimitiveType column_type = 
PrimitiveType::INVALID_TYPE; - if (param->request->has_in_filter()) { - column_type = to_primitive_type(param->request->in_filter().column_type()); - } - wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type), - param->request->filter_id())); - + const UpdateRuntimeFilterParamsV2* param, + RuntimePredicateWrapper** wrapper) { + auto filter_type = param->request->filter_type(); + PrimitiveType column_type = param->column_type; + *wrapper = param->pool->add(new RuntimePredicateWrapper( + state, param->pool, column_type, get_type(filter_type), param->request->filter_id())); switch (filter_type) { case PFilterType::IN_FILTER: { DCHECK(param->request->has_in_filter()); @@ -1650,8 +1645,9 @@ bool IRuntimeFilter::is_bloomfilter() { return _wrapper->is_bloomfilter(); } -template -Status IRuntimeFilter::_update_filter(const T* param) { +Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { + _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); + if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { const PInFilter in_filter = param->request->in_filter(); set_ignored(in_filter.ignored_msg()); @@ -1665,19 +1661,25 @@ Status IRuntimeFilter::_update_filter(const T* param) { } this->signal(); - _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); return Status::OK(); } -Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { - return _update_filter(param); -} - -Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, - int64_t start_apply) { +void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t merge_time, + int64_t start_apply) { _profile->add_info_string("UpdateTime", std::to_string(MonotonicMillis() - start_apply) + " ms"); - return _update_filter(param); + _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms"); + 
// keep the applied filter from ending up with a wrong column_return_type; remove + // this code in the future + if (_wrapper->column_type() != wrapper->column_type()) { + wrapper->_column_return_type = _wrapper->_column_return_type; + } + auto origin_type = _wrapper->get_real_type(); + _wrapper = wrapper; + if (origin_type != _wrapper->get_real_type()) { + update_runtime_filter_type_to_profile(); + } + this->signal(); } Status RuntimePredicateWrapper::get_push_exprs(std::list& probe_ctxs, @@ -1691,8 +1693,8 @@ Status RuntimePredicateWrapper::get_push_exprs(std::listroot()->type().type) && is_string_type(_column_return_type)) || _filter_type == RuntimeFilterType::BITMAP_FILTER) - << " prob_expr->root()->type().type: " << probe_ctx->root()->type().type - << " _column_return_type: " << _column_return_type + << " prob_expr->root()->type().type: " << int(probe_ctx->root()->type().type) + << " _column_return_type: " << int(_column_return_type) << " _filter_type: " << IRuntimeFilter::to_string(_filter_type); auto real_filter_type = get_real_type(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 6d69302c2e..8fcc4cee17 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -157,13 +157,10 @@ struct UpdateRuntimeFilterParams { }; struct UpdateRuntimeFilterParamsV2 { - UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req, - butil::IOBufAsZeroCopyInputStream* data_stream, - ObjectPool* obj_pool) - : request(req), data(data_stream), pool(obj_pool) {} const PPublishFilterRequestV2* request; butil::IOBufAsZeroCopyInputStream* data; ObjectPool* pool = nullptr; + PrimitiveType column_type = INVALID_TYPE; }; struct MergeRuntimeFilterParams { @@ -220,8 +217,6 @@ public: vectorized::SharedRuntimeFilterContext& get_shared_context_ref(); - void copy_from_other(IRuntimeFilter* other); - // insert data to build filter void insert_batch(vectorized::ColumnPtr column, size_t start); @@ -231,6 +226,8 @@ public: RuntimeFilterType type() 
const { return _runtime_filter_type; } + PrimitiveType column_type() const; + Status get_push_expr_ctxs(std::list& probe_ctxs, std::vector& push_exprs, bool is_late_arrival); @@ -276,20 +273,21 @@ public: Status merge_from(const RuntimePredicateWrapper* wrapper); - // for ut static Status create_wrapper(RuntimeFilterParamsContext* state, const MergeRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr* wrapper); static Status create_wrapper(RuntimeFilterParamsContext* state, const UpdateRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr* wrapper); + static Status create_wrapper(RuntimeFilterParamsContext* state, - const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool, - std::unique_ptr* wrapper); + const UpdateRuntimeFilterParamsV2* param, + RuntimePredicateWrapper** wrapper); void change_to_bloom_filter(); Status init_bloom_filter(const size_t build_bf_cardinality); Status update_filter(const UpdateRuntimeFilterParams* param); - Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply); + void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t merge_time, + int64_t start_apply); void set_ignored(const std::string& msg); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 3dcf84ace0..7a738b8c06 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -34,9 +34,9 @@ class VRuntimeFilterSlots { public: VRuntimeFilterSlots( const std::vector>& build_expr_ctxs, - const std::vector& runtime_filter_descs, bool is_global = false) + const std::vector& runtime_filters, bool is_global = false) : _build_expr_context(build_expr_ctxs), - _runtime_filter_descs(runtime_filter_descs), + _runtime_filters(runtime_filters), _is_global(is_global) {} Status init(RuntimeState* state, int64_t hash_table_size) { @@ -75,33 +75,28 @@ public: // ordered vector: IN, IN_OR_BLOOM, others. // so we can ignore other filter if IN Predicate exists. 
- std::vector sorted_runtime_filter_descs(_runtime_filter_descs); - auto compare_desc = [](TRuntimeFilterDesc& d1, TRuntimeFilterDesc& d2) { - if (d1.type == d2.type) { + auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) { + if (d1->type() == d2->type()) { return false; - } else if (d1.type == TRuntimeFilterType::IN) { + } else if (d1->type() == RuntimeFilterType::IN_FILTER) { return true; - } else if (d2.type == TRuntimeFilterType::IN) { + } else if (d2->type() == RuntimeFilterType::IN_FILTER) { return false; - } else if (d1.type == TRuntimeFilterType::IN_OR_BLOOM) { + } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) { return true; - } else if (d2.type == TRuntimeFilterType::IN_OR_BLOOM) { + } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) { return false; } else { - return d1.type < d2.type; + return d1->type() < d2->type(); } }; - std::sort(sorted_runtime_filter_descs.begin(), sorted_runtime_filter_descs.end(), - compare_desc); + std::sort(_runtime_filters.begin(), _runtime_filters.end(), compare_desc); // do not create 'in filter' when hash_table size over limit const auto max_in_num = state->runtime_filter_max_in_num(); const bool over_max_in_num = (hash_table_size >= max_in_num); - for (auto& filter_desc : sorted_runtime_filter_descs) { - IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, - &runtime_filter)); + for (auto* runtime_filter : _runtime_filters) { if (runtime_filter->expr_order() < 0 || runtime_filter->expr_order() >= _build_expr_context.size()) { return Status::InternalError( @@ -133,10 +128,10 @@ public: bool exists_in_filter = has_in_filter[runtime_filter->expr_order()]; if (is_in_filter && over_max_in_num) { VLOG_DEBUG << "fragment instance " << print_id(state->fragment_instance_id()) - << " ignore runtime filter(in filter id " << filter_desc.filter_id - << ") because: in_num(" << hash_table_size << ") >= max_in_num(" - << 
max_in_num << ")"; - RETURN_IF_ERROR(ignore_local_filter(filter_desc.filter_id)); + << " ignore runtime filter(in filter id " + << runtime_filter->filter_id() << ") because: in_num(" + << hash_table_size << ") >= max_in_num(" << max_in_num << ")"; + RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id())); continue; } else if (!is_in_filter && exists_in_filter) { // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created @@ -144,15 +139,16 @@ public: VLOG_DEBUG << "fragment instance " << print_id(state->fragment_instance_id()) << " ignore runtime filter(" << IRuntimeFilter::to_string(runtime_filter->type()) << " id " - << filter_desc.filter_id << ") because: already exists in filter"; - RETURN_IF_ERROR(ignore_local_filter(filter_desc.filter_id)); + << runtime_filter->filter_id() + << ") because: already exists in filter"; + RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id())); continue; } } else if (is_in_filter && over_max_in_num) { std::string msg = fmt::format( "fragment instance {} ignore runtime filter(in filter id {}) because: " "in_num({}) >= max_in_num({})", - print_id(state->fragment_instance_id()), filter_desc.filter_id, + print_id(state->fragment_instance_id()), runtime_filter->filter_id(), hash_table_size, max_in_num); RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg)); continue; @@ -163,7 +159,7 @@ public: !over_max_in_num)) { has_in_filter[runtime_filter->expr_order()] = true; } - _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter); + _runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter); } return Status::OK(); @@ -171,8 +167,8 @@ public: void insert(const vectorized::Block* block) { for (int i = 0; i < _build_expr_context.size(); ++i) { - auto iter = _runtime_filters.find(i); - if (iter == _runtime_filters.end()) { + auto iter = _runtime_filters_map.find(i); + if (iter == _runtime_filters_map.end()) { continue; } @@ -186,7 +182,7 @@ public: // publish 
runtime filter Status publish(bool publish_local = false) { - for (auto& pair : _runtime_filters) { + for (auto& pair : _runtime_filters_map) { for (auto& filter : pair.second) { RETURN_IF_ERROR(filter->publish(publish_local)); } @@ -195,7 +191,7 @@ public: } void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { - for (auto& it : _runtime_filters) { + for (auto& it : _runtime_filters_map) { for (auto& filter : it.second) { context->runtime_filters[filter->filter_id()] = filter->get_shared_context_ref(); } @@ -203,7 +199,7 @@ public: } Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { - for (auto& it : _runtime_filters) { + for (auto& it : _runtime_filters_map) { for (auto& filter : it.second) { auto filter_id = filter->filter_id(); auto ret = context->runtime_filters.find(filter_id); @@ -216,14 +212,14 @@ public: return Status::OK(); } - bool empty() { return _runtime_filters.empty(); } + bool empty() { return _runtime_filters_map.empty(); } private: const std::vector>& _build_expr_context; - const std::vector& _runtime_filter_descs; + std::vector _runtime_filters; const bool _is_global = false; // prob_contition index -> [IRuntimeFilter] - std::map> _runtime_filters; + std::map> _runtime_filters_map; }; } // namespace doris diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h index 7b1a2063d1..1d496ddf55 100644 --- a/be/src/exprs/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter_slots_cross.h @@ -34,17 +34,14 @@ namespace doris { // this class used in cross join node class VRuntimeFilterSlotsCross { public: - VRuntimeFilterSlotsCross(const std::vector& runtime_filter_descs, + VRuntimeFilterSlotsCross(const std::vector& runtime_filters, const vectorized::VExprContextSPtrs& src_expr_ctxs) - : _runtime_filter_descs(runtime_filter_descs), filter_src_expr_ctxs(src_expr_ctxs) {} + : _runtime_filters(runtime_filters), filter_src_expr_ctxs(src_expr_ctxs) {} 
~VRuntimeFilterSlotsCross() = default; Status init(RuntimeState* state) { - for (auto& filter_desc : _runtime_filter_descs) { - IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, - &runtime_filter)); + for (auto* runtime_filter : _runtime_filters) { if (runtime_filter == nullptr) { return Status::InternalError("runtime filter is nullptr"); } @@ -53,7 +50,6 @@ public: runtime_filter->has_remote_target()) { return Status::InternalError("cross join runtime filter has remote target"); } - _runtime_filters.push_back(runtime_filter); } return Status::OK(); } @@ -85,9 +81,8 @@ public: bool empty() const { return _runtime_filters.empty(); } private: - const std::vector& _runtime_filter_descs; + const std::vector& _runtime_filters; const vectorized::VExprContextSPtrs filter_src_expr_ctxs; - std::vector _runtime_filters; }; } // namespace doris diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index fb459163f6..5cde609f1c 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -373,7 +373,7 @@ Status PushBrokerReader::init() { TQueryOptions query_options; TQueryGlobals query_globals; _runtime_state = RuntimeState::create_unique(params, query_options, query_globals, - ExecEnv::GetInstance()); + ExecEnv::GetInstance(), nullptr); DescriptorTbl* desc_tbl = nullptr; Status status = DescriptorTbl::create(_runtime_state->obj_pool(), _t_desc_tbl, &desc_tbl); if (UNLIKELY(!status.ok())) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 4597bf9876..19de8fb7bd 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -97,14 +97,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo // Hash Table Init _hash_table_init(state); - _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < 
p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1, - p._use_global_rf, p._child_x->parallel_tasks())); - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( - p._runtime_filter_descs[i].filter_id, &_runtime_filters[i])); + p._runtime_filter_descs[i], state->query_options(), &_runtime_filters[i], + _build_expr_ctxs.size() == 1, p._use_global_rf, p._child_x->parallel_tasks())); } return Status::OK(); @@ -121,10 +118,6 @@ bool HashJoinBuildSinkLocalState::build_unique() const { return _parent->cast()._build_unique; } -std::vector& HashJoinBuildSinkLocalState::runtime_filter_descs() const { - return _parent->cast()._runtime_filter_descs; -} - void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { auto& p = _parent->cast(); _shared_state->short_circuit_for_probe = @@ -386,9 +379,7 @@ HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope _partition_exprs(tnode.__isset.distribute_expr_lists && !_is_broadcast_join ? 
tnode.distribute_expr_lists[1] : std::vector {}), - _use_global_rf(use_global_rf) { - _runtime_filter_descs = tnode.runtime_filters; -} + _use_global_rf(use_global_rf) {} Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) { if (_is_broadcast_join) { @@ -548,13 +539,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* __builtin_unreachable(); }, [&](auto&& arg) -> Status { - if (_runtime_filter_descs.empty()) { + if (local_state._runtime_filters.empty()) { return Status::OK(); } local_state._runtime_filter_slots = - std::make_shared(_build_expr_ctxs, - _runtime_filter_descs, - use_global_rf); + std::make_shared( + _build_expr_ctxs, local_state._runtime_filters, + use_global_rf); RETURN_IF_ERROR(local_state._runtime_filter_slots->init( state, arg.hash_table->size())); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index c3d6038b3e..56a651e421 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -70,7 +70,6 @@ public: void init_short_circuit_for_probe(); bool build_unique() const; - std::vector& runtime_filter_descs() const; std::shared_ptr arena() { return _shared_state->arena; } void add_hash_buckets_info(const std::string& info) const { @@ -101,7 +100,6 @@ protected: // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; - std::vector _runtime_filters; bool _should_build_hash_table = true; int64_t _build_side_mem_used = 0; int64_t _build_side_last_mem_used = 0; @@ -190,7 +188,6 @@ private: std::shared_ptr _shared_hashtable_controller; vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr; - std::vector _runtime_filter_descs; const std::vector _partition_exprs; const bool _use_global_rf; diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp index 6b930ff8a5..1141acc650 100644 --- 
a/be/src/pipeline/exec/join_build_sink_operator.cpp +++ b/be/src/pipeline/exec/join_build_sink_operator.cpp @@ -78,7 +78,8 @@ JoinBuildSinkOperatorX::JoinBuildSinkOperatorX(ObjectPool* pool, : tnode.hash_join_node.__isset.is_mark ? tnode.hash_join_node.is_mark : false), _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && - !_is_mark_join) { + !_is_mark_join), + _runtime_filter_descs(tnode.runtime_filters) { _init_join_op(); if (_is_mark_join) { DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN || diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index f7c1415d37..9cf5be80f8 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -33,6 +33,8 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState { public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + const std::vector& runtime_filters() const { return _runtime_filters; } + protected: JoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : PipelineXSinkLocalState(parent, state) {} @@ -43,6 +45,7 @@ protected: RuntimeProfile::Counter* _build_rows_counter = nullptr; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; + std::vector _runtime_filters; }; template @@ -75,6 +78,8 @@ protected: // 2. In build phase, we stop materialize build side when we meet the first null value and set _has_null_in_build_side to true. // 3. In probe phase, if _has_null_in_build_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join. 
const bool _short_circuit_for_null_in_build_side; + + const std::vector _runtime_filter_descs; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index aec93c66b6..c30bb5ad67 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -40,24 +40,20 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i])); } + _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - p._runtime_filter_descs[i], state->query_options())); + p._runtime_filter_descs[i], state->query_options(), &_runtime_filters[i])); } return Status::OK(); } -const std::vector& NestedLoopJoinBuildSinkLocalState::runtime_filter_descs() { - return _parent->cast()._runtime_filter_descs; -} - NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), - _runtime_filter_descs(tnode.runtime_filters), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 27f847cc00..e89d6a9d0b 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -65,7 +65,6 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - 
const std::vector& runtime_filter_descs(); vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } RuntimeProfile::Counter* runtime_filter_compute_timer() { return _runtime_filter_compute_timer; @@ -115,7 +114,6 @@ private: vectorized::VExprContextSPtrs _filter_src_expr_ctxs; - const std::vector _runtime_filter_descs; const bool _is_output_left_side_only; RowDescriptor _row_descriptor; }; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 1fb50c9ea0..6c183a96f6 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -229,13 +229,12 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re // 1. init _runtime_state _runtime_state = RuntimeState::create_unique( local_params.fragment_instance_id, request.query_id, request.fragment_id, - request.query_options, _query_ctx->query_globals, _exec_env); + request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); if (local_params.__isset.runtime_filter_params) { _runtime_state->set_runtime_filter_params(local_params.runtime_filter_params); } _runtime_state->set_task_execution_context(shared_from_this()); - _runtime_state->set_query_ctx(_query_ctx.get()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); // TODO should be combine with plan_fragment_executor.prepare funciton diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index b8a3bf7e92..b247d0ce83 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -75,9 +75,7 @@ public: TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; } - RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) { - return _runtime_state.get(); - } + RuntimeState* get_runtime_state() { return _runtime_state.get(); } virtual RuntimeFilterMgr* 
get_runtime_filter_mgr(UniqueId /*fragment_instance_id*/) { return _runtime_state->runtime_filter_mgr(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 13336ea7ea..67c5da9706 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -182,8 +182,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r // 1. Set up the global runtime state. _runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, - _exec_env); - _runtime_state->set_query_ctx(_query_ctx.get()); + _exec_env, _query_ctx.get()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); SCOPED_ATTACH_TASK(_runtime_state.get()); @@ -479,7 +478,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto init_runtime_state = [&](std::unique_ptr& runtime_state) { runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); - runtime_state->set_query_ctx(_query_ctx.get()); runtime_state->set_task_execution_context(shared_from_this()); runtime_state->set_be_number(local_params.backend_num); @@ -566,7 +564,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _task_runtime_states.push_back(RuntimeState::create_unique( this, local_params.fragment_instance_id, request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, - _exec_env)); + _exec_env, _query_ctx.get())); auto& task_runtime_state = _task_runtime_states.back(); init_runtime_state(task_runtime_state); auto cur_task_id = _total_tasks++; diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp index 33fb034b60..de1fd04688 100644 --- a/be/src/runtime/fold_constant_executor.cpp +++ b/be/src/runtime/fold_constant_executor.cpp @@ -136,7 +136,7 @@ Status 
FoldConstantExecutor::_init(const TQueryGlobals& query_globals, fragment_params.params = params; fragment_params.protocol_version = PaloInternalServiceVersion::V1; _runtime_state = RuntimeState::create_unique(fragment_params.params, query_options, - query_globals, ExecEnv::GetInstance()); + query_globals, ExecEnv::GetInstance(), nullptr); DescriptorTbl* desc_tbl = nullptr; Status status = DescriptorTbl::create(_runtime_state->obj_pool(), TDescriptorTable(), &desc_tbl); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 2eff857a3d..e42467bd6c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -807,7 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr handler; static_cast(_runtimefilter_controller.add_entity( params.local_params[i], params.query_id, params.query_options, &handler, - RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId())))); + RuntimeFilterParamsContext::create(context->get_runtime_state()))); context->set_merge_controller_handler(handler); const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; { @@ -887,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr handler; static_cast(_runtimefilter_controller.add_entity( local_params, params.query_id, params.query_options, &handler, - RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId())))); + RuntimeFilterParamsContext::create(context->get_runtime_state()))); context->set_merge_controller_handler(handler); { @@ -1366,20 +1366,29 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, pool = &fragment_executor->get_query_ctx()->obj_pool; } - UpdateRuntimeFilterParamsV2 params(request, attach_data, pool); - int filter_id = request->filter_id(); + // 1. 
get the target filters std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters)); + RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters)); - IRuntimeFilter* first_filter = nullptr; - for (auto filter : filters) { - if (!first_filter) { - RETURN_IF_ERROR(filter->update_filter(&params, start_apply)); - first_filter = filter; - } else { - filter->copy_from_other(first_filter); + // 2. create the filter wrapper to replace or ignore the target filters + if (request->has_in_filter() && request->in_filter().has_ignored_msg()) { + const auto& in_filter = request->in_filter(); + + std::ranges::for_each(filters, [&in_filter](auto& filter) { + filter->set_ignored(in_filter.ignored_msg()); filter->signal(); - } + }); + } else if (!filters.empty()) { + UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, + filters[0]->column_type()}; + RuntimePredicateWrapper* filter_wrapper = nullptr; + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper( + runtime_filter_mgr->get_runtime_filter_context_state(), &params, + &filter_wrapper)); + + std::ranges::for_each(filters, [&](auto& filter) { + filter->update_filter(filter_wrapper, request->merge_time(), start_apply); + }); } } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index cbeb38204b..9040fa1204 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -126,9 +126,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { // VLOG_CRITICAL << "request:\n" << apache::thrift::ThriftDebugString(request); const TQueryGlobals& query_globals = _query_ctx->query_globals; - _runtime_state = - RuntimeState::create_unique(params, request.query_options, query_globals, _exec_env); - _runtime_state->set_query_ctx(_query_ctx.get()); + _runtime_state = RuntimeState::create_unique(params, request.query_options, query_globals, + _exec_env, 
_query_ctx.get()); _runtime_state->set_task_execution_context(shared_from_this()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 8281ae2ea6..155bb430ec 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -58,17 +58,6 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParams ExecEnv::GetInstance()->experimental_mem_tracker()); } -Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter** target) { - std::lock_guard l(_lock); - auto iter = _producer_map.find(filter_id); - if (iter == _producer_map.end()) { - return Status::InvalidArgument("unknown filter: {}, role: PRODUCER", filter_id); - } - - *target = iter->second; - return Status::OK(); -} - Status RuntimeFilterMgr::get_consume_filters(const int filter_id, std::vector& consumer_filters) { std::lock_guard l(_lock); @@ -101,10 +90,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } // TODO: union the remote opt and global two case as one case to one judge - bool remote_opt_or_global = - (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets && - desc.type == TRuntimeFilterType::BLOOM) || - is_global; + bool remote_opt_or_global = (desc.__isset.opt_remote_rf && desc.opt_remote_rf) || is_global; if (!has_exist) { IRuntimeFilter* filter; @@ -122,6 +108,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, + IRuntimeFilter** producer_filter, bool build_bf_exactly, bool is_global, int parallel_tasks) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); @@ -133,11 +120,10 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc if (iter != _producer_map.end()) { return 
Status::InvalidArgument("filter has registed"); } - IRuntimeFilter* filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, - RuntimeFilterRole::PRODUCER, -1, &filter, + RuntimeFilterRole::PRODUCER, -1, producer_filter, build_bf_exactly, is_global, parallel_tasks)); - _producer_map.emplace(key, filter); + _producer_map.emplace(key, *producer_filter); return Status::OK(); } @@ -196,7 +182,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, const std::vector* targetv2_info, const int producer_size) { - std::unique_lock guard(_filter_map_mutex); std::shared_ptr cnt_val = std::make_shared(); // runtime_filter_desc and target will be released, // so we need to copy to cnt_val @@ -206,9 +191,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->pool.reset(new ObjectPool()); cnt_val->filter = cnt_val->pool->add( new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool, runtime_filter_desc)); - auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options)); + + std::unique_lock guard(_filter_map_mutex); _filter_map.emplace(filter_id, CntlValwithLock {cnt_val, std::make_unique()}); return Status::OK(); } @@ -303,7 +289,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ if (merged_size == cnt_val->producer_size) { if (opt_remote_rf) { DCHECK_GT(cnt_val->targetv2_info.size(), 0); - DCHECK(cnt_val->filter->is_bloomfilter()); // Optimize merging phase iff: // 1. All BE has been upgraded (e.g. _opt_remote_rf) // 2. FE has been upgraded (e.g. 
cnt_val->targetv2_info.size() > 0) @@ -492,8 +477,6 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(QueryContext* que params->query_id.set_hi(query_ctx->query_id().hi); params->query_id.set_lo(query_ctx->query_id().lo); - // params->fragment_instance_id.set_hi(state->fragment_instance_id().hi); - // params->fragment_instance_id.set_lo(state->fragment_instance_id().lo); params->be_exec_version = query_ctx->be_exec_version(); params->query_ctx = query_ctx; params->_obj_pool = &query_ctx->obj_pool; diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index de55e34fc1..31552ba623 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -73,15 +73,13 @@ public: Status get_consume_filters(const int filter_id, std::vector& consumer_filters); - Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); - // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, IRuntimeFilter** consumer_filter, bool build_bf_exactly = false, bool is_global = false); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - bool build_bf_exactly = false, bool is_global = false, - int parallel_tasks = 0); + IRuntimeFilter** producer_filter, bool build_bf_exactly = false, + bool is_global = false, int parallel_tasks = 0); // update filter by remote Status update_filter(const PPublishFilterRequest* request, @@ -91,6 +89,8 @@ public: Status get_merge_addr(TNetworkAddress* addr); + RuntimeFilterParamsContext* get_runtime_filter_context_state() const { return _state; } + private: struct ConsumerFilterHolder { int node_id; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index d84b86f229..d25d914147 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -76,7 +76,7 @@ RuntimeState::RuntimeState(const TUniqueId& 
fragment_instance_id, RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env) + ExecEnv* exec_env, QueryContext* ctx) : _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)), _load_channel_profile(""), _obj_pool(new ObjectPool()), @@ -93,7 +93,8 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr) { + _error_log_file(nullptr), + _query_ctx(ctx) { Status status = init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); @@ -106,7 +107,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env) + const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx) : _profile("Fragment " + print_id(instance_id)), _load_channel_profile(""), _obj_pool(new ObjectPool()), @@ -125,7 +126,8 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr) { + _error_log_file(nullptr), + _query_ctx(ctx) { [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); _runtime_filter_mgr.reset( @@ -135,7 +137,7 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id, const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env) + ExecEnv* exec_env, QueryContext* ctx) : 
_profile("Fragment " + print_id(instance_id)), _load_channel_profile(""), _obj_pool(new ObjectPool()), @@ -155,14 +157,15 @@ RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr) { + _error_log_file(nullptr), + _query_ctx(ctx) { [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); } RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env) + ExecEnv* exec_env, QueryContext* ctx) : _profile("PipelineX " + std::to_string(fragment_id)), _load_channel_profile(""), _obj_pool(new ObjectPool()), @@ -181,7 +184,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr) { + _error_log_file(nullptr), + _query_ctx(ctx) { // TODO: do we really need instance id? 
Status status = init(TUniqueId(), query_options, query_globals, exec_env); DCHECK(status.ok()); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 38053d3cb6..cdc7e83f04 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -70,20 +70,20 @@ public: RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env); + ExecEnv* exec_env, QueryContext* ctx); RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, - ExecEnv* exec_env); + ExecEnv* exec_env, QueryContext* ctx); // for only use in pipelineX RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id, const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env); + const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx); // Used by pipelineX. This runtime state is only used for setup. RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, - const TQueryGlobals& query_globals, ExecEnv* exec_env); + const TQueryGlobals& query_globals, ExecEnv* exec_env, QueryContext* ctx); // RuntimeState for executing expr in fe-support. 
RuntimeState(const TQueryGlobals& query_globals); @@ -466,8 +466,6 @@ public: _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr; } - void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; } - QueryContext* get_query_ctx() { return _query_ctx; } void set_query_mem_tracker(const std::shared_ptr& tracker) { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 3ca828a7f3..67dffa4b20 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -92,7 +92,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}) { - _runtime_filter_descs = tnode.runtime_filters; _arena = std::make_shared(); _hash_table_variants = std::make_shared(); _process_hashtable_ctx_variants = std::make_unique(); @@ -180,12 +179,10 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { } #endif - _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - _runtime_filter_descs[i], state->query_options(), _probe_expr_ctxs.size() == 1)); - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( - _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); + _runtime_filter_descs[i], state->query_options(), &_runtime_filters[i], + _probe_expr_ctxs.size() == 1)); } // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need @@ -795,11 +792,11 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc __builtin_unreachable(); }, [&](auto&& arg) -> Status { - if (_runtime_filter_descs.empty()) { + if (_runtime_filters.empty()) { return Status::OK(); } _runtime_filter_slots = std::make_shared( - _build_expr_ctxs, 
_runtime_filter_descs); + _build_expr_ctxs, _runtime_filters); RETURN_IF_ERROR(_runtime_filter_slots->init( state, arg.hash_table->size())); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index a017633e5c..58451c360e 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -53,9 +53,7 @@ template struct HashCRC32; namespace doris { - class ObjectPool; -class IRuntimeFilter; class DescriptorTbl; class RuntimeState; @@ -77,11 +75,11 @@ class HashJoinNode; template Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* parent, bool is_global = false) { - if (parent->runtime_filter_descs().empty()) { + if (parent->runtime_filters().empty()) { return Status::OK(); } parent->_runtime_filter_slots = std::make_shared( - parent->_build_expr_ctxs, parent->runtime_filter_descs(), is_global); + parent->_build_expr_ctxs, parent->runtime_filters(), is_global); RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows())); @@ -236,7 +234,6 @@ public: DataTypes right_table_data_types() { return _right_table_data_types; } DataTypes left_table_data_types() { return _left_table_data_types; } bool build_unique() const { return _build_unique; } - std::vector& runtime_filter_descs() { return _runtime_filter_descs; } std::shared_ptr arena() { return _arena; } protected: @@ -400,9 +397,6 @@ private: friend Status process_runtime_filter_build(RuntimeState* state, vectorized::Block* block, Parent* parent, bool is_global); - std::vector _runtime_filter_descs; - - std::vector _runtime_filters; std::atomic_bool _probe_open_finish = false; std::vector _build_col_ids; }; diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 3436209c4c..6fab6b8b91 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -79,7 +79,9 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const 
TPlanNode& tnode, const Des : tnode.hash_join_node.__isset.is_mark && tnode.hash_join_node.is_mark), _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && - !_is_mark_join) { + !_is_mark_join), + _runtime_filter_descs(tnode.runtime_filters) { + _runtime_filters.resize(_runtime_filter_descs.size()); _init_join_op(); if (_is_mark_join) { DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN || diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 95d10a0a07..0e6ac3c983 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -35,7 +35,7 @@ namespace doris { class ObjectPool; class RuntimeState; - +class IRuntimeFilter; } // namespace doris namespace doris::vectorized { @@ -74,6 +74,8 @@ public: [[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; } + const std::vector& runtime_filters() const { return _runtime_filters; } + protected: // Construct the intermediate blocks to store the results from join operation. 
void _construct_mutable_join_block(); @@ -147,6 +149,9 @@ protected: RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; RuntimeProfile::Counter* _join_filter_timer = nullptr; RuntimeProfile::Counter* _build_output_block_timer = nullptr; + + std::vector _runtime_filter_descs; + std::vector _runtime_filters; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 08d4c7d448..150068096b 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -34,7 +34,6 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" #include "exec/exec_node.h" -#include "exprs/runtime_filter.h" #include "exprs/runtime_filter_slots_cross.h" #include "gutil/integral_types.h" #include "pipeline/exec/nested_loop_join_build_operator.h" @@ -43,14 +42,12 @@ #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "util/simd/bits.h" -#include "vec/columns/column_const.h" #include "vec/columns/column_filter_helper.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" #include "vec/core/column_with_type_and_name.h" -#include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_number.h" #include "vec/exprs/vexpr.h" @@ -65,10 +62,10 @@ namespace doris::vectorized { template Status RuntimeFilterBuild::operator()(RuntimeState* state) { - if (_parent->runtime_filter_descs().empty()) { + if (_parent->runtime_filters().empty()) { return Status::OK(); } - VRuntimeFilterSlotsCross runtime_filter_slots(_parent->runtime_filter_descs(), + VRuntimeFilterSlotsCross runtime_filter_slots(_parent->runtime_filters(), _parent->filter_src_expr_ctxs()); RETURN_IF_ERROR(runtime_filter_slots.init(state)); @@ -96,8 +93,7 @@ 
VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod _matched_rows_done(false), _left_block_pos(0), _left_side_eos(false), - _old_version_flag(!tnode.__isset.nested_loop_join_node), - _runtime_filter_descs(tnode.runtime_filters) { + _old_version_flag(!tnode.__isset.nested_loop_join_node) { _left_block = Block::create_shared(); } @@ -123,7 +119,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr); RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - _runtime_filter_descs[i], state->query_options())); + _runtime_filter_descs[i], state->query_options(), &_runtime_filters[i])); } RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs)); return Status::OK(); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 7326485927..a8021bb425 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -97,7 +97,6 @@ public: std::shared_ptr get_left_block() { return _left_block; } - std::vector& runtime_filter_descs() { return _runtime_filter_descs; } VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } RuntimeProfile::Counter* runtime_filter_compute_timer() { return _runtime_filter_compute_timer; @@ -267,7 +266,6 @@ private: MutableColumns _dst_columns; - std::vector _runtime_filter_descs; VExprContextSPtrs _filter_src_expr_ctxs; bool _is_output_left_side_only = false; bool _need_more_input_data = true; diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 5e2d90bf62..eba11dac45 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -53,10 +53,7 @@ Status 
RuntimeFilterConsumer::_register_runtime_filter(bool is_global) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { - DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets); - // Optimize merging phase iff: - // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) - // 2. This filter is bloom filter (only bloom filter should be used for merging) + DCHECK(filter_desc.has_remote_targets); RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, is_global)); diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 6f983ca5bf..b47d756345 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -41,7 +41,6 @@ public: _profile = _runtime_state.runtime_profile(); _runtime_state.init_mem_trackers(); static_cast(_runtime_state.init(unique_id, query_options, query_globals, _env)); - _runtime_state.set_query_ctx(query_ctx); } void init(); @@ -75,7 +74,6 @@ private: TUniqueId unique_id; TQueryOptions query_options; TQueryGlobals query_globals; - QueryContext* query_ctx = nullptr; }; void VWalScannerTest::init_desc_table() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 58d4d3eabe..ade087e2de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -108,8 +108,6 @@ public final class RuntimeFilter { private boolean bitmapFilterNotIn = false; - private boolean useRemoteRfOpt = true; - private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType; private boolean bloomFilterSizeCalculatedByNdv = false; @@ -207,17 +205,6 @@ public final 
class RuntimeFilter { this.bitmapFilterNotIn = bitmapFilterNotIn; } - public void computeUseRemoteRfOpt() { - for (RuntimeFilterTarget target : targets) { - useRemoteRfOpt = useRemoteRfOpt && hasRemoteTargets && runtimeFilterType == TRuntimeFilterType.BLOOM - && target.expr instanceof SlotRef; - } - } - - public boolean getUseRemoteRfOpt() { - return useRemoteRfOpt; - } - /** * Serializes a runtime filter to Thrift. */ @@ -229,12 +216,8 @@ public final class RuntimeFilter { tFilter.setIsBroadcastJoin(isBroadcastJoin); tFilter.setHasLocalTargets(hasLocalTargets); tFilter.setHasRemoteTargets(hasRemoteTargets); - boolean optRemoteRf = true; for (RuntimeFilterTarget target : targets) { tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift()); - // TODO: now only support SlotRef - optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType == TRuntimeFilterType.BLOOM - && target.expr instanceof SlotRef; } tFilter.setType(runtimeFilterType); tFilter.setBloomFilterSizeBytes(filterSizeBytes); @@ -245,7 +228,7 @@ public final class RuntimeFilter { if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) { tFilter.setMinMaxType(tMinMaxRuntimeFilterType); } - tFilter.setOptRemoteRf(optRemoteRf); + tFilter.setOptRemoteRf(hasRemoteTargets); tFilter.setBloomFilterSizeCalculatedByNdv(bloomFilterSizeCalculatedByNdv); return tFilter; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 57dfde62f6..083d3cd3fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3676,8 +3676,7 @@ public class Coordinator implements CoordInterface { continue; } List fParams = ridToTargetParam.get(rf.getFilterId()); - rf.computeUseRemoteRfOpt(); - if (rf.getUseRemoteRfOpt()) { + if (rf.hasRemoteTargets()) { Map targetParamsV2 = new HashMap<>(); for (FRuntimeFilterTargetParam 
targetParam : fParams) { if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) { @@ -3810,8 +3809,7 @@ public class Coordinator implements CoordInterface { continue; } List fParams = ridToTargetParam.get(rf.getFilterId()); - rf.computeUseRemoteRfOpt(); - if (rf.getUseRemoteRfOpt()) { + if (rf.hasRemoteTargets()) { Map targetParamsV2 = new HashMap<>(); for (FRuntimeFilterTargetParam targetParam : fParams) { if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {