From ab32299ba40bef55d4ba626a43df8b313c7718dc Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Fri, 16 Jun 2023 20:26:00 +0800 Subject: [PATCH] [feature](nereids) Support multi target rf #20714 Support multi target runtime filter, mainly for set operation, such as union/intersect/except. --- be/src/exprs/runtime_filter.cpp | 19 +- be/src/exprs/runtime_filter.h | 2 + be/src/exprs/runtime_filter_slots.h | 12 +- be/src/runtime/fragment_mgr.cpp | 16 +- be/src/runtime/runtime_filter_mgr.cpp | 153 +++++++++------- be/src/runtime/runtime_filter_mgr.h | 29 ++-- be/src/vec/columns/column_nullable.h | 6 +- be/src/vec/exec/join/vhash_join_node.cpp | 5 +- .../vec/exec/join/vnested_loop_join_node.cpp | 4 +- .../vec/exec/runtime_filter_consumer_node.cpp | 164 ++++++++++++++++++ .../vec/exec/runtime_filter_consumer_node.h | 73 ++++++++ be/src/vec/exec/scan/vscan_node.cpp | 138 +-------------- be/src/vec/exec/scan/vscan_node.h | 37 +--- be/src/vec/exec/vdata_gen_scan_node.cpp | 12 +- be/src/vec/exec/vselect_node.cpp | 20 ++- be/src/vec/exec/vselect_node.h | 7 +- .../translator/PhysicalPlanTranslator.java | 2 +- .../translator/RuntimeFilterTranslator.java | 103 ++++++----- .../processor/post/RuntimeFilterContext.java | 10 +- .../post/RuntimeFilterGenerator.java | 108 +++++++++--- .../trees/plans/physical/RuntimeFilter.java | 28 +-- .../apache/doris/planner/RuntimeFilter.java | 26 +-- .../doris/planner/RuntimeFilterGenerator.java | 41 +++-- .../postprocess/RuntimeFilterTest.java | 2 +- 24 files changed, 634 insertions(+), 383 deletions(-) create mode 100644 be/src/vec/exec/runtime_filter_consumer_node.cpp create mode 100644 be/src/vec/exec/runtime_filter_consumer_node.h diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c11ab8715c..2c6f3662bb 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1105,6 +1105,12 @@ Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterC return Status::OK(); } +void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) { + _wrapper->_filter_type = other->_wrapper->_filter_type; + _wrapper->_is_bloomfilter = other->is_bloomfilter(); + _wrapper->_context = other->_wrapper->_context; +} + void IRuntimeFilter::insert(const void* data) { DCHECK(is_producer()); if (!_is_ignored) { @@ -1126,13 +1132,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, Status IRuntimeFilter::publish() { DCHECK(is_producer()); if (_has_local_target) { - IRuntimeFilter* consumer_filter = nullptr; - RETURN_IF_ERROR( - _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter)); + std::vector filters; + RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filters(_filter_id, filters)); // push down - consumer_filter->_wrapper = _wrapper; - consumer_filter->update_runtime_filter_type_to_profile(); - consumer_filter->signal(); + for (auto filter : filters) { + filter->_wrapper = _wrapper; + filter->update_runtime_filter_type_to_profile(); + filter->signal(); + } return Status::OK(); } else { TNetworkAddress addr; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 7042dccc1c..a4fd241a28 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -207,6 +207,8 @@ public: void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); + void copy_from_other(IRuntimeFilter* other); + // insert data to build filter // only used for producer void insert(const void* data); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 08532251bc..816e62c731 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -51,11 +51,13 @@ public: std::map has_in_filter; auto ignore_local_filter = [state](int filter_id) { - IRuntimeFilter* consumer_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter); - DCHECK(consumer_filter != nullptr); - consumer_filter->set_ignored(); - consumer_filter->signal(); + std::vector filters; + state->runtime_filter_mgr()->get_consume_filters(filter_id, filters); + DCHECK(!filters.empty()); + for (auto filter : filters) { + filter->set_ignored(); + filter->signal(); + } }; auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c98bebafa1..4581768199 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1276,9 +1276,19 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, UpdateRuntimeFilterParamsV2 params(request, attach_data, pool); int filter_id = request->filter_id(); - IRuntimeFilter* real_filter = nullptr; - RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter)); - RETURN_IF_ERROR(real_filter->update_filter(¶ms, start_apply)); + std::vector filters; + RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters)); + + IRuntimeFilter* first_filter = nullptr; + for (auto filter : filters) { + if (!first_filter) { + RETURN_IF_ERROR(filter->update_filter(¶ms, start_apply)); + first_filter = filter; + } else { + filter->copy_from_other(first_filter); + filter->signal(); + } + } } return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index e00149a7fa..e2b4c525ac 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -55,91 +55,124 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx) : _query_ctx(query_ctx) {} -RuntimeFilterMgr::~RuntimeFilterMgr() = default; - Status RuntimeFilterMgr::init() { _tracker = std::make_unique("RuntimeFilterMgr", ExecEnv::GetInstance()->experimental_mem_tracker()); return Status::OK(); } -Status RuntimeFilterMgr::get_filter_by_role(const int filter_id, const RuntimeFilterRole role, - IRuntimeFilter** target) { +Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter** target) { int32_t key = filter_id; - std::map* filter_map = nullptr; - if (role == RuntimeFilterRole::CONSUMER) { - filter_map = &_consumer_map; - } else { - filter_map = &_producer_map; + auto iter = _producer_map.find(key); + if (iter == _producer_map.end()) { + LOG(WARNING) << "unknown runtime filter: " << key << ", role: PRODUCER"; + return Status::InvalidArgument("unknown filter"); } - auto iter = filter_map->find(key); - if (iter == filter_map->end()) { - return Status::InternalError("get filter failed, key={}, role={}", key, (int)role); - } - *target = iter->second.filter; + *target = iter->second; return Status::OK(); } -Status RuntimeFilterMgr::get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter) { - return get_filter_by_role(filter_id, RuntimeFilterRole::CONSUMER, consumer_filter); +Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_id, + IRuntimeFilter** consumer_filter) { + auto iter = _consumer_map.find(filter_id); + if (iter == _consumer_map.cend()) { + LOG(WARNING) << "unknown runtime filter: " << filter_id << ", role: consumer"; + return Status::InvalidArgument("unknown filter"); + } + + for (auto& item : iter->second) { + if (item.node_id == node_id) { + *consumer_filter = item.filter; + return Status::OK(); + } + } + + return Status::InvalidArgument( + fmt::format("unknown filter, filter_id: {}, node_id: {}", filter_id, node_id)); } -Status RuntimeFilterMgr::get_producer_filter(const int filter_id, - IRuntimeFilter** producer_filter) { - return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter); +Status RuntimeFilterMgr::get_consume_filters(const int filter_id, + std::vector& consumer_filters) { + int32_t key = filter_id; + auto iter = _consumer_map.find(key); + if (iter == _consumer_map.end()) { + LOG(WARNING) << "unknown runtime filter: " << key << ", role: consumer"; + return Status::InvalidArgument("unknown filter"); + } + for (auto& holder : iter->second) { + consumer_filters.emplace_back(holder.filter); + } + return Status::OK(); } -Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role, - const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id, - bool build_bf_exactly) { - DCHECK(role != RuntimeFilterRole::CONSUMER || node_id >= 0); +Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, + const TQueryOptions& options, int node_id, + bool build_bf_exactly) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; - std::map* filter_map = nullptr; - if (role == RuntimeFilterRole::CONSUMER) { - filter_map = &_consumer_map; - } else { - filter_map = &_producer_map; - } - VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role; - - auto iter = filter_map->find(key); - - RuntimeFilterMgrVal filter_mgr_val; - filter_mgr_val.role = role; - - if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role == RuntimeFilterRole::CONSUMER && - desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) { + auto iter = _consumer_map.find(key); + if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets && + desc.type == TRuntimeFilterType::BLOOM) { // if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances DCHECK(_query_ctx != nullptr); - if (iter != filter_map->end()) { - return Status::OK(); - } + { std::lock_guard l(_lock); - iter = filter_map->find(key); - if (iter != filter_map->end()) { - return Status::OK(); + + iter = _consumer_map.find(key); + if (iter != _consumer_map.end()) { + for (auto holder : iter->second) { + if (holder.node_id == node_id) { + return Status::OK(); + } + } } + IRuntimeFilter* filter; RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, - &options, role, node_id, &filter_mgr_val.filter, - build_bf_exactly)); - filter_map->emplace(key, filter_mgr_val); + + &options, RuntimeFilterRole::CONSUMER, node_id, + &filter, build_bf_exactly)); + _consumer_map[key].emplace_back(node_id, filter); } } else { DCHECK(_state != nullptr); - if (iter != filter_map->end()) { - return Status::InvalidArgument("filter has registed"); - } - RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id, - &filter_mgr_val.filter, build_bf_exactly)); - filter_map->emplace(key, filter_mgr_val); - } + if (iter != _consumer_map.end()) { + for (auto holder : iter->second) { + if (holder.node_id == node_id) { + return Status::InvalidArgument("filter has registered"); + } + } + } + + IRuntimeFilter* filter; + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::CONSUMER, node_id, &filter, + build_bf_exactly)); + _consumer_map[key].emplace_back(node_id, filter); + } + return Status::OK(); +} + +Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, + const TQueryOptions& options, + bool build_bf_exactly) { + SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); + int32_t key = desc.filter_id; + auto iter = _producer_map.find(key); + + DCHECK(_state != nullptr); + if (iter != _producer_map.end()) { + return Status::InvalidArgument("filter has registed"); + } + IRuntimeFilter* filter; + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::PRODUCER, -1, &filter, + build_bf_exactly)); + _producer_map.emplace(key, filter); return Status::OK(); } @@ -148,9 +181,13 @@ Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); UpdateRuntimeFilterParams params(request, data, &_pool); int filter_id = request->filter_id(); - IRuntimeFilter* real_filter = nullptr; - RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter)); - return real_filter->update_filter(¶ms); + std::vector filters; + RETURN_IF_ERROR(get_consume_filters(filter_id, filters)); + for (auto filter : filters) { + RETURN_IF_ERROR(filter->update_filter(¶ms)); + } + + return Status::OK(); } void RuntimeFilterMgr::set_runtime_filter_params( diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 0a0a70ec7a..f3cb32813f 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -68,18 +68,22 @@ public: RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx); - ~RuntimeFilterMgr(); + ~RuntimeFilterMgr() = default; Status init(); - // get a consumer filter by filter-id - Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter); + Status get_consume_filter(const int filter_id, const int node_id, + IRuntimeFilter** consumer_filter); + + Status get_consume_filters(const int filter_id, std::vector& consumer_filters); Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); - // regist filter - Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id = -1, - bool build_bf_exactly = false); + + // register filter + Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, + int node_id, bool build_bf_exactly = false); + Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, + bool build_bf_exactly = false); // update filter by remote Status update_filter(const PPublishFilterRequest* request, @@ -90,19 +94,16 @@ public: Status get_merge_addr(TNetworkAddress* addr); private: - Status get_filter_by_role(const int filter_id, const RuntimeFilterRole role, - IRuntimeFilter** target); - - struct RuntimeFilterMgrVal { - RuntimeFilterRole role; // consumer or producer + struct ConsumerFilterHolder { + int node_id; IRuntimeFilter* filter; }; // RuntimeFilterMgr is owned by RuntimeState, so we only // use filter_id as key // key: "filter-id" /// TODO: should it need protected by a mutex? - std::map _consumer_map; - std::map _producer_map; + std::map> _consumer_map; + std::map _producer_map; RuntimeState* _state; QueryContext* _query_ctx; diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 7d4001f6fe..d5ca7f844b 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -98,7 +98,11 @@ public: const char* get_family_name() const override { return "Nullable"; } std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; } MutableColumnPtr clone_resized(size_t size) const override; - size_t size() const override { return nested_column->size(); } + size_t size() const override { + return nested_column->size( + + ); + } bool is_null_at(size_t n) const override { return assert_cast(*null_map).get_data()[n] != 0; } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 0770d26e74..2a239c48b0 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -392,9 +392,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options(), -1, - _probe_expr_ctxs.size() == 1)); + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( + _runtime_filter_descs[i], state->query_options(), _probe_expr_ctxs.size() == 1)); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 2a603e7680..0b50860874 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -124,8 +124,8 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { std::vector filter_src_exprs; for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr); - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( + _runtime_filter_descs[i], state->query_options())); } RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs)); return Status::OK(); diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer_node.cpp new file mode 100644 index 0000000000..dd631ce66e --- /dev/null +++ b/be/src/vec/exec/runtime_filter_consumer_node.cpp @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/runtime_filter_consumer_node.h" + +namespace doris::vectorized { + +RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} + +Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + _state = state; + RETURN_IF_ERROR(_register_runtime_filter()); + return Status::OK(); +} + +Status RuntimeFilterConsumerNode::_register_runtime_filter() { + int filter_size = _runtime_filter_descs.size(); + _runtime_filter_ctxs.reserve(filter_size); + _runtime_filter_ready_flag.reserve(filter_size); + for (int i = 0; i < filter_size; ++i) { + IRuntimeFilter* runtime_filter = nullptr; + const auto& filter_desc = _runtime_filter_descs[i]; + if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { + DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets); + // Optimize merging phase iff: + // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) + // 2. This filter is bloom filter (only bloom filter should be used for merging) + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( + filter_desc, _state->query_options(), id(), false)); + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( + filter_desc.filter_id, id(), &runtime_filter)); + } else { + RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter( + filter_desc, _state->query_options(), id(), false)); + RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter( + filter_desc.filter_id, id(), &runtime_filter)); + } + _runtime_filter_ctxs.emplace_back(runtime_filter); + _runtime_filter_ready_flag.emplace_back(false); + } + return Status::OK(); +} + +bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() { + if (!_blocked_by_rf) { + return true; + } + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + if (!runtime_filter->is_ready_or_timeout()) { + return false; + } + } + _blocked_by_rf = false; + return true; +} + +Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) { + SCOPED_TIMER(_acquire_runtime_filter_timer); + VExprSPtrs vexprs; + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + bool ready = runtime_filter->is_ready(); + if (!ready && wait) { + ready = runtime_filter->await(); + } + if (ready && !_runtime_filter_ctxs[i].apply_mark) { + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); + _runtime_filter_ctxs[i].apply_mark = true; + } else if ((wait || !runtime_filter->is_ready_or_timeout()) && + runtime_filter->current_state() == RuntimeFilterState::NOT_READY && + !_runtime_filter_ctxs[i].apply_mark) { + _blocked_by_rf = true; + } else if (!_runtime_filter_ctxs[i].apply_mark) { + DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); + _is_all_rf_applied = false; + } + } + RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); + if (_blocked_by_rf) { + return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); + } + + return Status::OK(); +} + +Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { + if (vexprs.empty()) { + return Status::OK(); + } + + for (auto& expr : vexprs) { + VExprContextSPtr conjunct = VExprContext::create_shared(expr); + RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor)); + RETURN_IF_ERROR(conjunct->open(_state)); + _rf_vexpr_set.insert(expr); + _conjuncts.emplace_back(conjunct); + } + + return Status::OK(); +} + +Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { + if (_is_all_rf_applied) { + *arrived_rf_num = _runtime_filter_descs.size(); + return Status::OK(); + } + + // This method will be called in scanner thread. + // So need to add lock + std::unique_lock l(_rf_locks); + if (_is_all_rf_applied) { + *arrived_rf_num = _runtime_filter_descs.size(); + return Status::OK(); + } + + // 1. Check if are runtime filter ready but not applied. + VExprSPtrs exprs; + int current_arrived_rf_num = 0; + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + if (_runtime_filter_ctxs[i].apply_mark) { + ++current_arrived_rf_num; + continue; + } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { + RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs( + &exprs, _row_descriptor, _state)); + ++current_arrived_rf_num; + _runtime_filter_ctxs[i].apply_mark = true; + } + } + // 2. Append unapplied runtime filters to vconjunct_ctx_ptr + if (!exprs.empty()) { + RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs)); + } + if (current_arrived_rf_num == _runtime_filter_descs.size()) { + _is_all_rf_applied = true; + } + + *arrived_rf_num = current_arrived_rf_num; + return Status::OK(); +} + +void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) { + _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime"); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer_node.h new file mode 100644 index 0000000000..518e0e865c --- /dev/null +++ b/be/src/vec/exec/runtime_filter_consumer_node.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/exec_node.h" +#include "exprs/runtime_filter.h" + +namespace doris::vectorized { + +class RuntimeFilterConsumerNode : public ExecNode { +public: + RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~RuntimeFilterConsumerNode() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + + // Try append late arrived runtime filters. + // Return num of filters which are applied already. + Status try_append_late_arrival_runtime_filter(int* arrived_rf_num); + + bool runtime_filters_are_ready_or_timeout(); + +protected: + // Register and get all runtime filters at Init phase. + Status _register_runtime_filter(); + // Get all arrived runtime filters at Open phase. + Status _acquire_runtime_filter(bool wait = true); + // Append late-arrival runtime filters to the vconjunct_ctx. + Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs); + + void _prepare_rf_timer(RuntimeProfile* profile); + + // For runtime filters + struct RuntimeFilterContext { + RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {} + RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {} + // set to true if this runtime filter is already applied to vconjunct_ctx_ptr + bool apply_mark; + IRuntimeFilter* runtime_filter; + }; + + RuntimeState* _state; + + std::vector _runtime_filter_ctxs; + + std::vector _runtime_filter_descs; + // Set to true if the runtime filter is ready. + std::vector _runtime_filter_ready_flag; + doris::Mutex _rf_locks; + phmap::flat_hash_set _rf_vexpr_set; + // True means all runtime filters are applied to scanners + bool _is_all_rf_applied = true; + bool _blocked_by_rf = false; + + RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 4c55150e53..852998c25c 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -94,8 +94,7 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { } Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); - _state = state; + RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state)); _is_pipeline_scan = state->enable_pipeline_exec(); const TQueryOptions& query_options = state->query_options(); @@ -109,9 +108,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } else { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } - - RETURN_IF_ERROR(_register_runtime_filter()); - return Status::OK(); } @@ -149,7 +145,8 @@ Status VScanNode::prepare(RuntimeState* state) { // if you want to add some profile in scan node, even it have not new VScanner object // could add here, not in the _init_profile() function _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime"); - _acquire_runtime_filter_timer = ADD_TIMER(_runtime_profile, "AcuireRuntimeFilterTime"); + + _prepare_rf_timer(_runtime_profile.get()); _open_timer = ADD_TIMER(_runtime_profile, "OpenTime"); _alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime"); @@ -311,95 +308,6 @@ Status VScanNode::_start_scanners(const std::list& scanners) { return Status::OK(); } -Status VScanNode::_register_runtime_filter() { - int filter_size = _runtime_filter_descs.size(); - _runtime_filter_ctxs.reserve(filter_size); - _runtime_filter_ready_flag.reserve(filter_size); - for (int i = 0; i < filter_size; ++i) { - IRuntimeFilter* runtime_filter = nullptr; - const auto& filter_desc = _runtime_filter_descs[i]; - if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { - DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets); - // Optimize merging phase iff: - // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) - // 2. This filter is bloom filter (only bloom filter should be used for merging) - RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(), - false)); - RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, &runtime_filter)); - } else { - RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(), - false)); - RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, - &runtime_filter)); - } - _runtime_filter_ctxs.emplace_back(runtime_filter); - _runtime_filter_ready_flag.emplace_back(false); - } - return Status::OK(); -} - -bool VScanNode::runtime_filters_are_ready_or_timeout() { - if (!_blocked_by_rf) { - return true; - } - for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - if (!runtime_filter->is_ready_or_timeout()) { - return false; - } - } - _blocked_by_rf = false; - return true; -} - -Status VScanNode::_acquire_runtime_filter(bool wait) { - SCOPED_TIMER(_acquire_runtime_filter_timer); - VExprSPtrs vexprs; - for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - bool ready = runtime_filter->is_ready(); - if (!ready && wait) { - ready = runtime_filter->await(); - } - if (ready && !_runtime_filter_ctxs[i].apply_mark) { - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); - _runtime_filter_ctxs[i].apply_mark = true; - } else if ((wait || !runtime_filter->is_ready_or_timeout()) && - runtime_filter->current_state() == RuntimeFilterState::NOT_READY && - !_runtime_filter_ctxs[i].apply_mark) { - _blocked_by_rf = true; - } else if (!_runtime_filter_ctxs[i].apply_mark) { - DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); - _is_all_rf_applied = false; - } - } - RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); - if (_blocked_by_rf) { - return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); - } - - return Status::OK(); -} - -Status VScanNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { - if (vexprs.empty()) { - return Status::OK(); - } - - for (auto& expr : vexprs) { - VExprContextSPtr conjunct = VExprContext::create_shared(expr); - RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor)); - RETURN_IF_ERROR(conjunct->open(_state)); - _rf_vexpr_set.insert(expr); - _conjuncts.emplace_back(conjunct); - } - - return Status::OK(); -} - Status VScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); @@ -1280,46 +1188,6 @@ Status VScanNode::_change_value_range(ColumnValueRange& temp_rang return Status::OK(); } -Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { - if (_is_all_rf_applied) { - *arrived_rf_num = _runtime_filter_descs.size(); - return Status::OK(); - } - - // This method will be called in scanner thread. - // So need to add lock - std::unique_lock l(_rf_locks); - if (_is_all_rf_applied) { - *arrived_rf_num = _runtime_filter_descs.size(); - return Status::OK(); - } - - // 1. Check if are runtime filter ready but not applied. - VExprSPtrs exprs; - int current_arrived_rf_num = 0; - for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - if (_runtime_filter_ctxs[i].apply_mark) { - ++current_arrived_rf_num; - continue; - } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { - RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs( - &exprs, _row_descriptor, _state)); - ++current_arrived_rf_num; - _runtime_filter_ctxs[i].apply_mark = true; - } - } - // 2. Append unapplied runtime filters to vconjunct_ctx_ptr - if (!exprs.empty()) { - RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs)); - } - if (current_arrived_rf_num == _runtime_filter_descs.size()) { - _is_all_rf_applied = true; - } - - *arrived_rf_num = current_arrived_rf_num; - return Status::OK(); -} - Status VScanNode::clone_conjunct_ctxs(VExprContextSPtrs& conjuncts) { if (!_conjuncts.empty()) { std::unique_lock l(_rf_locks); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 8d235a8365..bb1089d349 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -43,6 +43,7 @@ #include "runtime/runtime_state.h" #include "util/lock.h" #include "util/runtime_profile.h" +#include "vec/exec/runtime_filter_consumer_node.h" #include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vscanner.h" #include "vec/runtime/shared_scanner_controller.h" @@ -87,10 +88,10 @@ struct FilterPredicates { std::vector>> in_filters; }; -class VScanNode : public ExecNode { +class VScanNode : public RuntimeFilterConsumerNode { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) { + : RuntimeFilterConsumerNode(pool, tnode, descs) { if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { // Which means the request could be fullfilled in a single segment iterator request. if (tnode.limit > 0 && tnode.limit < 1024) { @@ -140,10 +141,6 @@ public: void set_no_agg_finalize() { _need_agg_finalize = false; } - // Try append late arrived runtime filters. - // Return num of filters which are applied already. - Status try_append_late_arrival_runtime_filter(int* arrived_rf_num); - // Clone current _conjuncts to conjuncts, if exists. Status clone_conjunct_ctxs(VExprContextSPtrs& conjuncts); @@ -156,7 +153,6 @@ public: Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; - bool runtime_filters_are_ready_or_timeout(); Status try_close(); @@ -242,7 +238,6 @@ protected: Status _prepare_scanners(); - RuntimeState* _state; bool _is_pipeline_scan = false; bool _shared_scan_opt = false; // For load scan node, there should be both input and output tuple descriptor. @@ -256,24 +251,6 @@ protected: int _max_scan_key_num; int _max_pushdown_conditions_per_column; - // For runtime filters - struct RuntimeFilterContext { - RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {} - RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {} - // set to true if this runtime filter is already applied to vconjunct_ctx_ptr - bool apply_mark; - IRuntimeFilter* runtime_filter; - }; - std::vector _runtime_filter_ctxs; - - std::vector _runtime_filter_descs; - // Set to true if the runtime filter is ready. - std::vector _runtime_filter_ready_flag; - doris::Mutex _rf_locks; - phmap::flat_hash_set _rf_vexpr_set; - // True means all runtime filters are applied to scanners - bool _is_all_rf_applied = true; - // Each scan node will generates a ScannerContext to manage all Scanners. // See comments of ScannerContext for more details std::shared_ptr _scanner_ctx; @@ -315,7 +292,6 @@ protected: std::vector _not_in_value_ranges; bool _need_agg_finalize = true; - bool _blocked_by_rf = false; // If the query like select * from table limit 10; then the query should run in // single scanner to avoid too many scanners which will cause lots of useless read. bool _should_run_serial = false; @@ -371,13 +347,6 @@ protected: std::vector _col_distribute_ids; private: - // Register and get all runtime filters at Init phase. - Status _register_runtime_filter(); - // Get all arrived runtime filters at Open phase. - Status _acquire_runtime_filter(bool wait = true); - // Append late-arrival runtime filters to the vconjunct_ctx. - Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs); - Status _normalize_conjuncts(); Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context, VExprSPtr& output_expr); diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp index 35494e93d0..95ce6cbc8a 100644 --- a/be/src/vec/exec/vdata_gen_scan_node.cpp +++ b/be/src/vec/exec/vdata_gen_scan_node.cpp @@ -82,15 +82,15 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* state) { for (const auto& filter_desc : _runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { - RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id(), false)); + RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( + filter_desc, state->query_options(), id(), false)); RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, &runtime_filter)); + filter_desc.filter_id, id(), &runtime_filter)); } else { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id(), false)); + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter( + filter_desc, state->query_options(), id(), false)); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, - &runtime_filter)); + id(), &runtime_filter)); } runtime_filter->init_profile(_runtime_profile.get()); } diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp index ee1628cd19..626fd5ce96 100644 --- a/be/src/vec/exec/vselect_node.cpp +++ b/be/src/vec/exec/vselect_node.cpp @@ -37,22 +37,34 @@ class TPlanNode; namespace vectorized { VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), _child_eos(false) {} + : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {} Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) { - return ExecNode::init(tnode, state); + return RuntimeFilterConsumerNode::init(tnode, state); } Status VSelectNode::prepare(RuntimeState* state) { - return ExecNode::prepare(state); + return RuntimeFilterConsumerNode::prepare(state); } Status VSelectNode::open(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state)); RETURN_IF_ERROR(child(0)->open(state)); return Status::OK(); } +Status VSelectNode::alloc_resource(RuntimeState* state) { + if (_opened) { + return Status::OK(); + } + + RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state)); + RETURN_IF_ERROR(_acquire_runtime_filter()); + RETURN_IF_CANCELLED(state); + _opened = true; + return Status::OK(); +} + Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h index d6783d7237..140009e4b3 100644 --- a/be/src/vec/exec/vselect_node.h +++ b/be/src/vec/exec/vselect_node.h @@ -17,7 +17,7 @@ #pragma once #include "common/status.h" -#include "exec/exec_node.h" +#include "vec/exec/runtime_filter_consumer_node.h" namespace doris { class DescriptorTbl; @@ -28,7 +28,7 @@ class TPlanNode; namespace vectorized { class Block; -class VSelectNode final : public ExecNode { +class VSelectNode final : public RuntimeFilterConsumerNode { public: VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; @@ -38,9 +38,12 @@ public: Status close(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status alloc_resource(RuntimeState* state) override; + private: // true if last get_next() call on child signalled eos bool _child_eos; + bool _opened = false; }; } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 8593b750ba..10cb05acba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1441,7 +1441,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor { - List filters = runtimeFilterTranslator + Set filters = runtimeFilterTranslator .getRuntimeFilterOfHashJoinNode(nestedLoopJoin); filters.forEach(filter -> runtimeFilterTranslator .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 9df4e74ea4..da4bb2e131 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.processor.post.RuntimeFilterContext; import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.ObjectId; @@ -41,9 +42,11 @@ import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; /** * translate runtime filter @@ -57,7 +60,7 @@ public class RuntimeFilterTranslator { context.generatePhysicalHashJoinToRuntimeFilter(); } - public List getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) { + public Set getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) { return context.getRuntimeFilterOnHashJoinNode(join); } @@ -101,51 +104,65 @@ public class RuntimeFilterTranslator { * @param ctx plan translator context */ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, PlanTranslatorContext ctx) { - Expr target = context.getExprIdToOlapScanNodeSlotRef().get(filter.getTargetExpr().getExprId()); - if (target == null) { - context.setTargetNullCount(); - return; - } - Expr targetExpr; - if (filter.getType() == TRuntimeFilterType.BITMAP) { - if (filter.getTargetExpression().equals(filter.getTargetExpr())) { - targetExpr = target; - } else { - RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator( - context.getExprIdToOlapScanNodeSlotRef()); - targetExpr = filter.getTargetExpression().accept(translator, ctx); - } - } else { - targetExpr = target; - } - Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx); - SlotRef targetSlot = target.getSrcSlotRef(); - TupleId targetTupleId = targetSlot.getDesc().getParent().getId(); - SlotId targetSlotId = targetSlot.getSlotId(); - // adjust data type - if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) { - targetExpr = new CastExpr(src.getType(), targetExpr); + List targetExprList = new ArrayList<>(); + List>> targetTupleIdMapList = new ArrayList<>(); + List scanNodeList = new ArrayList<>(); + boolean hasInvalidTarget = false; + for (int i = 0; i < filter.getTargetExprs().size(); i++) { + Slot curTargetExpr = filter.getTargetExprs().get(i); + Expression curTargetExpression = filter.getTargetExpressions().get(i); + Expr target = context.getExprIdToOlapScanNodeSlotRef().get(curTargetExpr.getExprId()); + if (target == null) { + context.setTargetNullCount(); + hasInvalidTarget = true; + break; + } + Expr targetExpr; + if (filter.getType() == TRuntimeFilterType.BITMAP) { + if (curTargetExpression.equals(curTargetExpr)) { + targetExpr = target; + } else { + RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator( + context.getExprIdToOlapScanNodeSlotRef()); + targetExpr = curTargetExpression.accept(translator, ctx); + } + } else { + targetExpr = target; + } + // adjust data type + if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) { + targetExpr = new CastExpr(src.getType(), targetExpr); + } + SlotRef targetSlot = target.getSrcSlotRef(); + TupleId targetTupleId = targetSlot.getDesc().getParent().getId(); + SlotId targetSlotId = targetSlot.getSlotId(); + ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr); + scanNodeList.add(scanNode); + targetExprList.add(targetExpr); + targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId))); } - org.apache.doris.planner.RuntimeFilter origFilter - = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter( - filter.getId(), node, src, filter.getExprOrder(), targetExpr, - ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)), - filter.getType(), context.getLimits(), filter.getBuildSideNdv()); - if (node instanceof HashJoinNode) { - origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); - } else { - //bitmap rf requires isBroadCast=false, it always requires merge filter - origFilter.setIsBroadcast(false); + if (!hasInvalidTarget) { + org.apache.doris.planner.RuntimeFilter origFilter + = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter( + filter.getId(), node, src, filter.getExprOrder(), targetExprList, + targetTupleIdMapList, filter.getType(), context.getLimits(), filter.getBuildSideNdv()); + if (node instanceof HashJoinNode) { + origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); + } else { + //bitmap rf requires isBroadCast=false, it always requires merge filter + origFilter.setIsBroadcast(false); + } + boolean isLocalTarget = scanNodeList.stream().allMatch(e -> e.getFragmentId().equals(node.getFragmentId())); + for (int i = 0; i < targetExprList.size(); i++) { + ScanNode scanNode = scanNodeList.get(i); + Expr targetExpr = targetExprList.get(i); + origFilter.addTarget(new RuntimeFilterTarget( + scanNode, targetExpr, true, isLocalTarget)); + } + origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn()); + context.getLegacyFilters().add(finalize(origFilter)); } - ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr()); - origFilter.addTarget(new RuntimeFilterTarget( - scanNode, - targetExpr, - true, - scanNode.getFragmentId().equals(node.getFragmentId()))); - origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn()); - context.getLegacyFilters().add(finalize(origFilter)); } private org.apache.doris.planner.RuntimeFilter finalize(org.apache.doris.planner.RuntimeFilter origFilter) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index 69986cde67..b9d2ac301e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -67,7 +67,7 @@ public class RuntimeFilterContext { // exprId to olap scan node slotRef because the slotRef will be changed when translating. private final Map exprIdToOlapScanNodeSlotRef = Maps.newHashMap(); - private final Map> runtimeFilterOnHashJoinNode = Maps.newHashMap(); + private final Map> runtimeFilterOnHashJoinNode = Maps.newHashMap(); // alias -> alias's child, if there's a key that is alias's child, the key-value will change by this way // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv will be C -> A. @@ -97,7 +97,7 @@ public class RuntimeFilterContext { } public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) { - Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id); + Preconditions.checkArgument(filter.getTargetExprs().stream().anyMatch(expr -> expr.getExprId() == id)); this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter); } @@ -135,13 +135,13 @@ public class RuntimeFilterContext { return scanNodeOfLegacyRuntimeFilterTarget; } - public List getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) { - return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptyList()); + public Set getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) { + return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptySet()); } public void generatePhysicalHashJoinToRuntimeFilter() { targetExprIdToFilter.values().forEach(filters -> filters.forEach(filter -> runtimeFilterOnHashJoinNode - .computeIfAbsent(filter.getBuilderNode(), k -> Lists.newArrayList()).add(filter))); + .computeIfAbsent(filter.getBuilderNode(), k -> Sets.newHashSet()).add(filter))); } public Map> getTargetExprIdToFilter() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index bb0272c3a7..9ec1a34c9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -27,15 +27,19 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; @@ -44,8 +48,10 @@ import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -105,29 +111,81 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { if (type == TRuntimeFilterType.BITMAP) { continue; } - // currently, we can ensure children in the two side are corresponding to the equal_to's. - // so right maybe an expression and left is a slot - Slot unwrappedSlot = checkTargetChild(equalTo.left()); - // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join - // contains join with denied join type. for example: a left join b on a.id = b.id - if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) { - continue; - } - Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second; - PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first; - // in-filter is not friendly to pipeline - if (type == TRuntimeFilterType.IN_OR_BLOOM - && ctx.getSessionVariable().enablePipelineEngine() - && hasRemoteTarget(join, scan)) { - type = TRuntimeFilterType.BLOOM; - } + if (join.left() instanceof PhysicalUnion + || join.left() instanceof PhysicalIntersect + || join.left() instanceof PhysicalExcept) { + List targetList = new ArrayList<>(); + int projIndex = -1; + for (int j = 0; j < join.left().children().size(); j++) { + PhysicalPlan child = (PhysicalPlan) join.left().child(j); + if (child instanceof PhysicalProject) { + PhysicalProject project = (PhysicalProject) child; + Slot leftSlot = checkTargetChild(equalTo.left()); + if (leftSlot == null) { + break; + } + for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) { + NamedExpression expr = (NamedExpression) project.getProjects().get(k); + if (expr.getName().equals(leftSlot.getName())) { + projIndex = k; + break; + } + } + Preconditions.checkState(projIndex >= 0 + && projIndex < project.getProjects().size()); - long buildSideNdv = getBuildSideNdv(join, equalTo); - RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - equalTo.right(), olapScanSlot, type, i, join, buildSideNdv); - ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); - ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); - ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot); + NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex); + + SlotReference origSlot = null; + if (targetExpr instanceof Alias) { + origSlot = (SlotReference) targetExpr.child(0); + } else { + origSlot = (SlotReference) targetExpr; + } + Slot olapScanSlot = aliasTransferMap.get(origSlot).second; + PhysicalRelation scan = aliasTransferMap.get(origSlot).first; + if (type == TRuntimeFilterType.IN_OR_BLOOM + && ctx.getSessionVariable().enablePipelineEngine() + && hasRemoteTarget(join, scan)) { + type = TRuntimeFilterType.BLOOM; + } + targetList.add(olapScanSlot); + ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); + ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot); + } + } + if (!targetList.isEmpty()) { + long buildSideNdv = getBuildSideNdv(join, equalTo); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + equalTo.right(), targetList, type, i, join, buildSideNdv); + for (int j = 0; j < targetList.size(); j++) { + ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter); + } + } + } else { + // currently, we can ensure children in the two side are corresponding to the equal_to's. + // so right maybe an expression and left is a slot + Slot unwrappedSlot = checkTargetChild(equalTo.left()); + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) { + continue; + } + Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second; + PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first; + // in-filter is not friendly to pipeline + if (type == TRuntimeFilterType.IN_OR_BLOOM + && ctx.getSessionVariable().enablePipelineEngine() + && hasRemoteTarget(join, scan)) { + type = TRuntimeFilterType.BLOOM; + } + long buildSideNdv = getBuildSideNdv(join, equalTo); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + equalTo.right(), ImmutableList.of(olapScanSlot), type, i, join, buildSideNdv); + ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); + ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot); + } } } } @@ -194,8 +252,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { if (targetSlot != null && aliasTransferMap.containsKey(targetSlot)) { Slot olapScanSlot = aliasTransferMap.get(targetSlot).second; RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - bitmapContains.child(0), olapScanSlot, - bitmapContains.child(1), type, i, join, isNot, -1L); + bitmapContains.child(0), ImmutableList.of(olapScanSlot), + ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L); ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(), @@ -221,7 +279,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { if (expr instanceof NamedExpression && aliasTransferMap.containsKey((NamedExpression) expr)) { if (expression instanceof Alias) { Alias alias = ((Alias) expression); - aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(expr)); + aliasTransferMap.put(alias.toSlot(), aliasTransferMap.get(expr)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index 703a043b48..ff907f62aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -22,6 +22,10 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.thrift.TRuntimeFilterType; +import com.google.common.collect.ImmutableList; + +import java.util.List; + /** * runtime filter */ @@ -32,8 +36,8 @@ public class RuntimeFilter { private final Expression srcSlot; //bitmap filter support target expression like k1+1, abs(k1) //targetExpression is an expression on targetSlot, in which there is only one non-const slot - private final Expression targetExpression; - private final Slot targetSlot; + private final List targetExpressions; + private final List targetSlots; private final int exprOrder; private final AbstractPhysicalJoin builderNode; @@ -44,21 +48,21 @@ public class RuntimeFilter { /** * constructor */ - public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, TRuntimeFilterType type, + public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) { - this(id, src, target, target, type, exprOrder, builderNode, false, buildSideNdv); + this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, builderNode, false, buildSideNdv); } /** * constructor */ - public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, Expression targetExpression, + public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, List targetExpressions, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn, long buildSideNdv) { this.id = id; this.srcSlot = src; - this.targetSlot = target; - this.targetExpression = targetExpression; + this.targetSlots = ImmutableList.copyOf(targets); + this.targetExpressions = ImmutableList.copyOf(targetExpressions); this.type = type; this.exprOrder = exprOrder; this.builderNode = builderNode; @@ -71,8 +75,8 @@ public class RuntimeFilter { return srcSlot; } - public Slot getTargetExpr() { - return targetSlot; + public List getTargetExprs() { + return targetSlots; } public RuntimeFilterId getId() { @@ -95,8 +99,8 @@ public class RuntimeFilter { return bitmapFilterNotIn; } - public Expression getTargetExpression() { - return targetExpression; + public List getTargetExpressions() { + return targetExpressions; } public long getBuildSideNdv() { @@ -107,7 +111,7 @@ public class RuntimeFilter { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("RF").append(id.asInt()) - .append("[").append(getSrcExpr()).append("->").append(targetSlot) + .append("[").append(getSrcExpr()).append("->").append(targetSlots) .append("(ndv/size = ").append(buildSideNdv).append("/") .append(org.apache.doris.planner.RuntimeFilter.expectRuntimeFilterSize(buildSideNdv)) .append(")"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 484a666851..d45f4f2611 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -71,12 +72,12 @@ public final class RuntimeFilter { // The position of expr in the join condition private final int exprOrder; // Expr (lhs of join predicate) from which the targetExprs_ are generated. - private final Expr origTargetExpr; + private final List origTargetExprs; // Runtime filter targets private final List targets = new ArrayList<>(); // Slots from base table tuples that have value transfer from the slots // of 'origTargetExpr'. The slots are grouped by tuple id. - private final Map> targetSlotsByTid; + private final List>> targetSlotsByTid; // If true, the join node building this filter is executed using a broadcast join; // set in the DistributedPlanner.createHashJoinFragment() private boolean isBroadcastJoin; @@ -139,14 +140,14 @@ public final class RuntimeFilter { } private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder, - Expr origTargetExpr, Map> targetSlots, TRuntimeFilterType type, + List origTargetExprs, List>> targetSlots, TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) { this.id = filterId; this.builderNode = filterSrcNode; this.srcExpr = srcExpr; this.exprOrder = exprOrder; - this.origTargetExpr = origTargetExpr; - this.targetSlotsByTid = targetSlots; + this.origTargetExprs = ImmutableList.copyOf(origTargetExprs); + this.targetSlotsByTid = ImmutableList.copyOf(targetSlots); this.runtimeFilterType = type; this.ndvEstimate = buildSizeNdv; computeNdvEstimate(); @@ -155,9 +156,9 @@ public final class RuntimeFilter { // only for nereids planner public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr, - int exprOrder, Expr origTargetExpr, Map> targetSlots, + int exprOrder, List origTargetExprs, List>> targetSlots, TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) { - return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr, + return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs, targetSlots, type, filterSizeLimits, buildSizeNdv); } @@ -237,11 +238,11 @@ public final class RuntimeFilter { return srcExpr; } - public Expr getOrigTargetExpr() { - return origTargetExpr; + public List getOrigTargetExprs() { + return origTargetExprs; } - public Map> getTargetSlots() { + public List>> getTargetSlots() { return targetSlotsByTid; } @@ -329,7 +330,7 @@ public final class RuntimeFilter { } return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, - targetExpr, targetSlots, type, filterSizeLimits, -1L); + ImmutableList.of(targetExpr), ImmutableList.of(targetSlots), type, filterSizeLimits, -1L); } public static RuntimeFilter create(IdGenerator idGen, Analyzer analyzer, Expr joinPredicate, @@ -365,7 +366,8 @@ public final class RuntimeFilter { } RuntimeFilter runtimeFilter = - new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, targetExpr, targetSlots, + new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, + ImmutableList.of(targetExpr), ImmutableList.of(targetSlots), type, filterSizeLimits, -1L); runtimeFilter.setBitmapFilterNotIn(((BitmapFilterPredicate) joinPredicate).isNotIn()); return runtimeFilter; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java index 7de864f50a..fb1b4da92b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -317,10 +317,12 @@ public final class RuntimeFilterGenerator { * destination node for that filter. */ private void registerRuntimeFilter(RuntimeFilter filter) { - Map> targetSlotsByTid = filter.getTargetSlots(); - Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty()); - for (TupleId tupleId : targetSlotsByTid.keySet()) { - registerRuntimeFilter(filter, tupleId); + List>> targetSlotsByTids = filter.getTargetSlots(); + for (Map> targetSlotsByTid : targetSlotsByTids) { + Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty()); + for (TupleId tupleId : targetSlotsByTid.keySet()) { + registerRuntimeFilter(filter, tupleId); + } } } @@ -328,7 +330,7 @@ public final class RuntimeFilterGenerator { * Registers a runtime filter with a specific target tuple id. */ private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) { - Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); + Preconditions.checkState(filter.getTargetSlots().stream().anyMatch(e -> e.containsKey(targetTid))); List filters = runtimeFiltersByTid.computeIfAbsent(targetTid, k -> new ArrayList<>()); Preconditions.checkState(!filter.isFinalized()); filters.add(filter); @@ -344,9 +346,11 @@ public final class RuntimeFilterGenerator { for (RuntimeFilter.RuntimeFilterTarget target : runtimeFilter.getTargets()) { targetTupleIds.addAll(target.node.getTupleIds()); } - for (TupleId tupleId : runtimeFilter.getTargetSlots().keySet()) { - if (!targetTupleIds.contains(tupleId)) { - runtimeFiltersByTid.get(tupleId).remove(runtimeFilter); + for (Map> slots : runtimeFilter.getTargetSlots()) { + for (TupleId tupleId : slots.keySet()) { + if (!targetTupleIds.contains(tupleId)) { + runtimeFiltersByTid.get(tupleId).remove(runtimeFilter); + } } } runtimeFilter.markFinalized(); @@ -424,14 +428,22 @@ public final class RuntimeFilterGenerator { * the scan node with target tuple descriptor 'targetTid'. */ private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid) { - Expr targetExpr = filter.getOrigTargetExpr(); + Preconditions.checkState(filter.getTargetSlots().size() == filter.getOrigTargetExprs().size()); + Expr targetExpr = null; + for (int i = 0; i < filter.getOrigTargetExprs().size(); i++) { + if (filter.getTargetSlots().get(i).containsKey(targetTid)) { + targetExpr = filter.getOrigTargetExprs().get(i); + break; + } + } + Preconditions.checkState(targetExpr != null); // if there is a subquery on the left side of join, in order to push to scan in the subquery, // targetExpr will return false as long as there is a slotref parent node that is not targetTid. // But when this slotref can be transferred to the targetTid slot, such as Aa + Bb = Cc, // targetTid is B, if Aa can be transferred to Ba, that is, Aa and Ba are equivalent columns, // then replace Aa with Ba, and then calculate for targetTid targetExpr if (!targetExpr.isBound(targetTid)) { - Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); + Preconditions.checkState(filter.getTargetSlots().stream().anyMatch(e -> e.containsKey(targetTid))); // Modify the filter target expr using the equivalent slots from the scan node // on which the filter will be applied. ExprSubstitutionMap smap = new ExprSubstitutionMap(); @@ -440,7 +452,14 @@ public final class RuntimeFilterGenerator { // all slots of targetSlotsByTid. targetExpr.collect(SlotRef.class, exprSlots); // targetExpr specifies the id of the slotRef node in the `tupleID` - List sids = filter.getTargetSlots().get(targetTid); + List sids = new ArrayList<>(); + for (Map> map : filter.getTargetSlots()) { + if (map.containsKey(targetTid)) { + sids = map.get(targetTid); + break; + } + } + Preconditions.checkState(!sids.isEmpty()); for (SlotRef slotRef : exprSlots) { for (SlotId sid : sids) { if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java index e4dd754be9..699df19ba4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java @@ -252,7 +252,7 @@ public class RuntimeFilterTest extends SSBTestBase { for (RuntimeFilter filter : filters) { Assertions.assertTrue(colNames.contains(Pair.of( filter.getSrcExpr().toSql(), - filter.getTargetExpr().getName()))); + filter.getTargetExprs().get(0).getName()))); } } }