From 14e7eb762430b251869737eeef6ee3ac83ce3707 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Wed, 3 Jan 2024 11:21:28 +0800 Subject: [PATCH] [Opt](rf) Opt broadcast join remote runtime filter merge and wait (#29439) --- be/src/exprs/runtime_filter.cpp | 10 +++++++--- be/src/exprs/runtime_filter.h | 2 +- be/src/exprs/runtime_filter_slots.h | 6 +++--- be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 10 +++++----- be/src/vec/exec/join/vhash_join_node.cpp | 2 +- .../java/org/apache/doris/planner/RuntimeFilter.java | 4 ++++ .../main/java/org/apache/doris/qe/Coordinator.java | 12 ++++++++++-- 8 files changed, 32 insertions(+), 16 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 8e8e3dfd8c..f52a9574bf 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -985,7 +985,7 @@ Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* return Status::OK(); } -Status IRuntimeFilter::publish() { +Status IRuntimeFilter::publish(bool publish_local) { DCHECK(is_producer()); if (_is_global) { std::vector filters; @@ -1010,13 +1010,17 @@ Status IRuntimeFilter::publish() { filter->update_runtime_filter_type_to_profile(); filter->signal(); } - return Status::OK(); - } else { + } else if (!publish_local) { TNetworkAddress addr; DCHECK(_state != nullptr); RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); return push_to_remote(_state, &addr, _opt_remote_rf); + } else { + // remote broadcast join only push onetime in build shared hash table + // publish_local only set true on copy shared hash table + DCHECK(_is_broadcast_join); } + return Status::OK(); } Status IRuntimeFilter::get_push_expr_ctxs(std::list& probe_ctxs, diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 32fee99173..2673308ae7 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -228,7 +228,7 @@ public: // publish filter // push filter to remote node or push down it to scan_node - Status publish(); + Status publish(bool publish_local = false); RuntimeFilterType type() const { return _runtime_filter_type; } diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 7223b652a3..d539e295ae 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -204,10 +204,10 @@ public: } // publish runtime filter - Status publish() { + Status publish(bool publish_local = false) { for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { - RETURN_IF_ERROR(filter->publish()); + for (auto& filter : pair.second) { + RETURN_IF_ERROR(filter->publish(publish_local)); } } return Status::OK(); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 8c1ff85243..3d9fd50191 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -568,7 +568,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR( local_state._runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); - RETURN_IF_ERROR(local_state._runtime_filter_slots->publish()); + RETURN_IF_ERROR(local_state._runtime_filter_slots->publish(true)); return Status::OK(); }}, *local_state._shared_state->hash_table_variants); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index b67bc5ffd3..73a043070e 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -328,6 +328,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ cntVal = iter->second.first; { std::lock_guard l(*iter->second.second); + // Skip the other broadcast join runtime filter + if (cntVal->arrive_id.size() == 1 && cntVal->runtime_filter_desc.is_broadcast_join) { + return Status::OK(); + } MergeRuntimeFilterParams params(request, attach_data); ObjectPool* pool = cntVal->pool.get(); RuntimeFilterWrapperHolder holder; @@ -339,11 +343,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size; DCHECK_LE(merged_size, cntVal->producer_size); cntVal->merge_time += (MonotonicMillis() - start_merge); - if (merged_size < cntVal->producer_size) { - return Status::OK(); - } else { - merge_time = cntVal->merge_time; - } + merge_time = cntVal->merge_time; } if (merged_size == cntVal->producer_size) { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 60391716e2..b135ada723 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -824,7 +824,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc state, arg.hash_table->size())); RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); - RETURN_IF_ERROR(_runtime_filter_slots->publish()); + RETURN_IF_ERROR(_runtime_filter_slots->publish(true)); return Status::OK(); }}, *_hash_table_variants); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index d21f390c04..646a07221a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -604,6 +604,10 @@ public final class RuntimeFilter { isBroadcastJoin = isBroadcast; } + public boolean isBroadcast() { + return isBroadcastJoin; + } + public void computeNdvEstimate() { if (ndvEstimate < 0) { ndvEstimate = builderNode.getChild(1).getCardinalityAfterFilter(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 421fa1ac52..9bfca2802e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3627,6 +3627,9 @@ public class Coordinator implements CoordInterface { params.params.setRuntimeFilterParams(new TRuntimeFilterParams()); params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr); if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { + Set broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast) + .map(r -> r.getFilterId().asInt()).collect(Collectors.toSet()); + for (RuntimeFilter rf : assignedRuntimeFilters) { if (!ridToTargetParam.containsKey(rf.getFilterId())) { continue; @@ -3668,7 +3671,8 @@ public class Coordinator implements CoordInterface { } for (Map.Entry entry : ridToBuilderNum.entrySet()) { params.params.runtime_filter_params.putToRuntimeFilterBuilderNum( - entry.getKey().asInt(), entry.getValue()); + entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt()) + ? 1 : entry.getValue()); } for (RuntimeFilter rf : assignedRuntimeFilters) { params.params.runtime_filter_params.putToRidToRuntimeFilter( @@ -3753,6 +3757,9 @@ public class Coordinator implements CoordInterface { localParams.setRuntimeFilterParams(new TRuntimeFilterParams()); localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr); if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { + Set broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast) + .map(r -> r.getFilterId().asInt()).collect(Collectors.toSet()); + for (RuntimeFilter rf : assignedRuntimeFilters) { if (!ridToTargetParam.containsKey(rf.getFilterId())) { continue; @@ -3795,7 +3802,8 @@ public class Coordinator implements CoordInterface { } for (Map.Entry entry : ridToBuilderNum.entrySet()) { localParams.runtime_filter_params.putToRuntimeFilterBuilderNum( - entry.getKey().asInt(), entry.getValue()); + entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt()) ? 1 : + entry.getValue()); } for (RuntimeFilter rf : assignedRuntimeFilters) { localParams.runtime_filter_params.putToRidToRuntimeFilter(