From 6a8fdb45c68f8d4eeb1891fecd8c33bb2349ee92 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 31 May 2023 10:25:18 +0800 Subject: [PATCH] [Bug](runtimefilter) Fix waiting for runtime filter (#20155) --- .../apache/doris/planner/RuntimeFilter.java | 13 +++ .../java/org/apache/doris/qe/Coordinator.java | 105 ++++++++++++------ 2 files changed, 81 insertions(+), 37 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 20cb993e7c..484a666851 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -105,6 +105,8 @@ public final class RuntimeFilter { private boolean bitmapFilterNotIn = false; + private boolean useRemoteRfOpt = true; + /** * Internal representation of a runtime filter target. */ @@ -184,6 +186,17 @@ public final class RuntimeFilter { this.bitmapFilterNotIn = bitmapFilterNotIn; } + public void computeUseRemoteRfOpt() { + for (RuntimeFilterTarget target : targets) { + useRemoteRfOpt = useRemoteRfOpt && hasRemoteTargets && runtimeFilterType == TRuntimeFilterType.BLOOM + && target.expr instanceof SlotRef; + } + } + + public boolean getUseRemoteRfOpt() { + return useRemoteRfOpt; + } + /** * Serializes a runtime filter to Thrift. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index f2bbc3692d..f37de81217 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -100,6 +100,7 @@ import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterParams; +import org.apache.doris.thrift.TRuntimeFilterTargetParams; import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; @@ -3138,26 +3139,41 @@ public class Coordinator { params.params.setRuntimeFilterParams(new TRuntimeFilterParams()); params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr); if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { - for (Map.Entry> entry - : ridToTargetParam.entrySet()) { - Map targetParams = new HashMap<>(); - for (FRuntimeFilterTargetParam targetParam : entry.getValue()) { - if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) { - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids - .add(targetParam.targetFragmentInstanceId); - } else { - targetParams.put(targetParam.targetFragmentInstanceAddr, - new TRuntimeFilterTargetParamsV2()); - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr - = targetParam.targetFragmentInstanceAddr; - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids - = new ArrayList<>(); - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids - .add(targetParam.targetFragmentInstanceId); + for (RuntimeFilter rf : assignedRuntimeFilters) { + List fParams = ridToTargetParam.get(rf.getFilterId()); + rf.computeUseRemoteRfOpt(); + if (rf.getUseRemoteRfOpt()) { + Map targetParamsV2 = new HashMap<>(); + for (FRuntimeFilterTargetParam targetParam : fParams) { + if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) { + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } else { + targetParamsV2.put(targetParam.targetFragmentInstanceAddr, + new TRuntimeFilterTargetParamsV2()); + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_addr + = targetParam.targetFragmentInstanceAddr; + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_ids + = new ArrayList<>(); + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } } + params.params.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(), + new ArrayList(targetParamsV2.values())); + } else { + List targetParams = Lists.newArrayList(); + for (FRuntimeFilterTargetParam targetParam : fParams) { + targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId, + targetParam.targetFragmentInstanceAddr)); + } + params.params.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(), + targetParams); } - params.params.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(), - new ArrayList(targetParams.values())); } for (Map.Entry entry : ridToBuilderNum.entrySet()) { params.params.runtime_filter_params.putToRuntimeFilterBuilderNum( @@ -3226,27 +3242,42 @@ public class Coordinator { localParams.setRuntimeFilterParams(new TRuntimeFilterParams()); localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr); if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { - for (Map.Entry> entry - : ridToTargetParam.entrySet()) { - Map targetParams = new HashMap<>(); - for (FRuntimeFilterTargetParam targetParam : entry.getValue()) { - if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) { - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids - .add(targetParam.targetFragmentInstanceId); - } else { - targetParams.put(targetParam.targetFragmentInstanceAddr, - new TRuntimeFilterTargetParamsV2()); - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr - = targetParam.targetFragmentInstanceAddr; - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids - = new ArrayList<>(); - targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids - .add(targetParam.targetFragmentInstanceId); + for (RuntimeFilter rf : assignedRuntimeFilters) { + List fParams = ridToTargetParam.get(rf.getFilterId()); + rf.computeUseRemoteRfOpt(); + if (rf.getUseRemoteRfOpt()) { + Map targetParamsV2 = new HashMap<>(); + for (FRuntimeFilterTargetParam targetParam : fParams) { + if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) { + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } else { + targetParamsV2.put(targetParam.targetFragmentInstanceAddr, + new TRuntimeFilterTargetParamsV2()); + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_addr + = targetParam.targetFragmentInstanceAddr; + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_ids + = new ArrayList<>(); + targetParamsV2.get(targetParam.targetFragmentInstanceAddr) + .target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } } - } - localParams.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(), - new ArrayList(targetParams.values())); + localParams.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(), + new ArrayList(targetParamsV2.values())); + } else { + List targetParams = Lists.newArrayList(); + for (FRuntimeFilterTargetParam targetParam : fParams) { + targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId, + targetParam.targetFragmentInstanceAddr)); + } + localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(), + targetParams); + } } for (Map.Entry entry : ridToBuilderNum.entrySet()) { localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(