From eea657a6106db8fafff98ba706b74ee2eaa8d4a1 Mon Sep 17 00:00:00 2001 From: minghong Date: Mon, 8 Jan 2024 16:33:43 +0800 Subject: [PATCH] [rf](nereids)prune rf for external db according to jump count (#29634) * prune some rf for external db --- .../processor/post/PlanPostProcessors.java | 3 + .../processor/post/RuntimeFilterContext.java | 20 ++- .../post/RuntimeFilterGenerator.java | 6 +- .../RuntimeFilterPrunerForExternalTable.java | 158 ++++++++++++++++++ .../plans/physical/AbstractPhysicalPlan.java | 4 +- .../plans/physical/PhysicalHashJoin.java | 1 + .../trees/plans/physical/RuntimeFilter.java | 23 ++- .../doris/nereids/util/MutableState.java | 2 + .../org/apache/doris/qe/SessionVariable.java | 6 + .../limit_push_down/order_push_down.out | 18 +- .../push_down_top_n_through_union.out | 2 +- .../rf_prune/query72.out | 2 +- 12 files changed, 217 insertions(+), 28 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index 7e69db0477..17538d55d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -69,6 +69,9 @@ public class PlanPostProcessors { builder.add(new RuntimeFilterGenerator()); if (ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) { builder.add(new RuntimeFilterPruner()); + if (ConnectContext.get().getSessionVariable().runtimeFilterPruneForExternal) { + builder.add(new RuntimeFilterPrunerForExternalTable()); + } } } builder.add(new Validator()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index 9fa2346221..b432649903 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -96,6 +96,8 @@ public class RuntimeFilterContext { public List prunedRF = Lists.newArrayList(); + public final List needRfPlans = Lists.newArrayList(); + private final IdGenerator generator = RuntimeFilterId.createGenerator(); // exprId of target to runtime filter. @@ -197,14 +199,20 @@ public class RuntimeFilterContext { RuntimeFilter rf = iter.next(); if (rf.getBuilderNode().equals(builderNode)) { builderNode.getRuntimeFilters().remove(rf); - for (Slot target : rf.getTargetSlots()) { - if (target.getExprId().equals(targetId)) { - Pair pair = aliasTransferMap.get(target); - if (pair != null) { - pair.first.removeAppliedRuntimeFilter(rf); - } + for (int i = 0; i < rf.getTargetSlots().size(); i++) { + Slot targetSlot = rf.getTargetSlots().get(i); + if (targetSlot.getExprId().equals(targetId)) { + rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf); } } + // for (Slot target : rf.getTargetSlots()) { + // if (target.getExprId().equals(targetId)) { + // Pair pair = aliasTransferMap.get(target); + // if (pair != null) { + // pair.first.removeAppliedRuntimeFilter(rf); + // } + // } + // } iter.remove(); prunedRF.add(rf); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index b73b6e8bda..da28b40b03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -186,7 +186,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { PhysicalRelation scan = ctx.getAliasTransferPair(targetSlot).first; RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), bitmapContains.child(0), ImmutableList.of(scanSlot), - ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L); + ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L, scan); scan.addAppliedRuntimeFilter(filter); ctx.addJoinToTargetMap(join, scanSlot.getExprId()); ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); @@ -266,7 +266,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot), TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv, - getMinMaxType(compare)); + getMinMaxType(compare), scan); scan.addAppliedRuntimeFilter(filter); ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); @@ -625,7 +625,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { // build multi-target runtime filter // since always on different join, set the expr_order as 0 RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - equalTo.right(), targetList, type, 0, join, buildSideNdv); + equalTo.right(), targetList, type, 0, join, buildSideNdv, cteNode); targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter)); for (Slot slot : targetList) { ctx.setTargetExprIdToFilter(slot.getExprId(), filter); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java new file mode 100644 index 0000000000..dd104173b2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Join; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; +import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; +import org.apache.doris.nereids.util.MutableState; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Optional; + +/** + * prune rf for external db + */ +public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor { + /** + * add parent to plan node and then remove rf if it is an external scan and only used as probe + */ + @Override + public Plan processRoot(Plan plan, CascadesContext ctx) { + plan = plan.accept(this, ctx); + RuntimeFilterContext rfCtx = ctx.getRuntimeFilterContext(); + for (RuntimeFilter rf : rfCtx.getNereidsRuntimeFilter()) { + AbstractPhysicalJoin join = rf.getBuilderNode(); + if (join instanceof PhysicalHashJoin) { + List joinAncestors = getAncestors(rf.getBuilderNode()); + for (int i = 0; i < rf.getTargetScans().size(); i++) { + PhysicalRelation scan = rf.getTargetScans().get(i); + if (canPrune(scan, joinAncestors)) { + rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join); + } + } + } + } + return plan; + } + + @Override + public Plan visit(Plan plan, CascadesContext context) { + for (Plan child : plan.children()) { + child.setMutableState(MutableState.KEY_PARENT, plan); + child.accept(this, context); + } + setMaxChildRuntimeFilterJump(plan); + return plan; + } + + @Override + public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, CascadesContext context) { + RuntimeFilterContext rfCtx = context.getRuntimeFilterContext(); + List slots = rfCtx.getTargetListByScan(scan); + int maxJump = -1; + for (Slot slot : slots) { + if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) { + for (RuntimeFilter rf : rfCtx.getTargetExprIdToFilter().get(slot.getExprId())) { + Optional oJump = rf.getBuilderNode().getMutableState(MutableState.KEY_RF_JUMP); + if (oJump.isPresent()) { + Integer jump = (Integer) (oJump.get()); + if (jump > maxJump) { + maxJump = jump; + } + } + } + } + } + scan.setMutableState(MutableState.KEY_RF_JUMP, maxJump + 1); + return scan; + } + + @Override + public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin join, + CascadesContext context) { + join.right().accept(this, context); + join.right().setMutableState(MutableState.KEY_PARENT, join); + join.setMutableState(MutableState.KEY_RF_JUMP, join.right().getMutableState(MutableState.KEY_RF_JUMP).get()); + join.left().accept(this, context); + join.left().setMutableState(MutableState.KEY_PARENT, join); + return join; + } + + private List getAncestors(Plan plan) { + List ancestors = Lists.newArrayList(); + ancestors.add(plan); + Optional parent = plan.getMutableState(MutableState.KEY_PARENT); + while (parent.isPresent()) { + ancestors.add((Plan) parent.get()); + parent = ((Plan) parent.get()).getMutableState(MutableState.KEY_PARENT); + } + return ancestors; + } + + private boolean canPrune(PhysicalRelation scan, List joinAndAncestors) { + if (!(scan instanceof PhysicalFileScan)) { + return false; + } + Plan cursor = scan; + Optional parent = cursor.getMutableState(MutableState.KEY_PARENT); + while (parent.isPresent()) { + if (joinAndAncestors.contains(parent.get())) { + Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP); + if (oi.isPresent() && ConnectContext.get() != null + && (int) (oi.get()) > ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) { + return true; + } + } else { + if (isBuildSide(parent.get(), cursor)) { + return false; + } + } + cursor = parent.get(); + parent = cursor.getMutableState(MutableState.KEY_PARENT); + } + return false; + } + + private boolean isBuildSide(Plan parent, Plan child) { + return parent instanceof Join && child.equals(parent.child(1)); + } + + private void setMaxChildRuntimeFilterJump(Plan plan) { + int maxJump = 0; + for (Plan child : plan.children()) { + Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP); + if (oi.isPresent()) { + int jump = (Integer) (oi.get()); + if (jump > maxJump) { + maxJump = jump; + } + } + } + plan.setMutableState(MutableState.KEY_RF_JUMP, maxJump); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 5b668c0135..a2968ca808 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -120,14 +120,14 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi Preconditions.checkState(scanSlot != null, "scan slot is null"); if (filter != null) { this.addAppliedRuntimeFilter(filter); - filter.addTargetSlot(scanSlot); + filter.addTargetSlot(scanSlot, scan); filter.addTargetExpression(scanSlot); ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot); } else { filter = new RuntimeFilter(generator.getNextId(), - src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv); + src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan); this.addAppliedRuntimeFilter(filter); ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java index b1dd9a329c..cf1ff7fdf5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java @@ -257,6 +257,7 @@ public class PhysicalHashJoin< builder.append(" build RFs:").append(runtimeFilters.stream() .map(rf -> rf.shapeInfo()).collect(Collectors.joining(";"))); } + // builder.append("jump: ").append(getMutableState(MutableState.KEY_RF_JUMP)); return builder.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index d928c45078..a8f4c3cd7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -50,20 +50,24 @@ public class RuntimeFilter { // use for min-max filter only. specify if the min or max side is valid private final TMinMaxRuntimeFilterType tMinMaxType; + private final List targetScans = Lists.newArrayList(); + /** * constructor */ public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, TRuntimeFilterType type, - int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) { + int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv, + PhysicalRelation scan) { this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, - builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX); + builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan); } public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, List targetExpressions, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, - boolean bitmapFilterNotIn, long buildSideNdv) { + boolean bitmapFilterNotIn, long buildSideNdv, + PhysicalRelation scan) { this(id, src, targets, targetExpressions, type, exprOrder, - builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX); + builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan); } /** @@ -71,7 +75,8 @@ public class RuntimeFilter { */ public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, List targetExpressions, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, - boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType) { + boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType, + PhysicalRelation scan) { this.id = id; this.srcSlot = src; this.targetSlots = Lists.newArrayList(targets); @@ -83,6 +88,7 @@ public class RuntimeFilter { this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv; this.tMinMaxType = tMinMaxType; builderNode.addRuntimeFilter(this); + this.targetScans.add(scan); } public TMinMaxRuntimeFilterType gettMinMaxType() { @@ -125,8 +131,9 @@ public class RuntimeFilter { return buildSideNdv; } - public void addTargetSlot(Slot target) { + public void addTargetSlot(Slot target, PhysicalRelation scan) { targetSlots.add(target); + targetScans.add(scan); } public List getTargetSlots() { @@ -137,6 +144,10 @@ public class RuntimeFilter { targetExpressions.add(targetExpr); } + public List getTargetScans() { + return targetScans; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java index 37234e954d..6f03de77fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java @@ -25,6 +25,8 @@ import java.util.Optional; public interface MutableState { String KEY_GROUP = "group"; String KEY_FRAGMENT = "fragment"; + String KEY_PARENT = "parent"; + String KEY_RF_JUMP = "rf-jump"; Optional get(String key); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index be6e13e399..aca2e98c2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -567,6 +567,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; + @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external") + public boolean runtimeFilterPruneForExternal = true; + + @VariableMgr.VarAttr(name = "runtime_filter_jump_threshold") + public int runtimeFilterJumpThreshold = 2; + // using hashset instead of group by + count can improve performance // but may cause rpc failed when cluster has less BE // Whether this switch is turned on depends on the BE number diff --git a/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out b/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out index c95f0cfb3d..f22c364e98 100644 --- a/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out +++ b/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out @@ -12,7 +12,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalTopN[LOCAL_SORT] --------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() -----------PhysicalOlapScan[t1] apply RFs: RF0 +----------PhysicalOlapScan[t1] ----------PhysicalDistribute[DistributionSpecHash] ------------PhysicalOlapScan[t2] @@ -22,7 +22,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalTopN[LOCAL_SORT] --------hashJoin[LEFT_SEMI_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() -----------PhysicalOlapScan[t1] apply RFs: RF0 +----------PhysicalOlapScan[t1] ----------PhysicalDistribute[DistributionSpecHash] ------------PhysicalOlapScan[t2] @@ -114,7 +114,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalTopN[LOCAL_SORT] --------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() -----------PhysicalOlapScan[t1] apply RFs: RF0 +----------PhysicalOlapScan[t1] ----------PhysicalDistribute[DistributionSpecHash] ------------PhysicalOlapScan[t2] @@ -401,7 +401,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalTopN[LOCAL_SORT] --------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() -----------PhysicalOlapScan[t1] apply RFs: RF0 +----------PhysicalOlapScan[t1] ----------PhysicalDistribute[DistributionSpecHash] ------------PhysicalOlapScan[t2] @@ -683,7 +683,7 @@ PhysicalResultSink ----PhysicalDistribute[DistributionSpecGather] ------PhysicalTopN[LOCAL_SORT] --------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() -----------PhysicalOlapScan[t1] apply RFs: RF0 +----------PhysicalOlapScan[t1] ----------PhysicalDistribute[DistributionSpecHash] ------------PhysicalOlapScan[t2] @@ -754,7 +754,7 @@ PhysicalResultSink ----------PhysicalDistribute[DistributionSpecHash] ------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------------PhysicalDistribute[DistributionSpecHash] -----------------PhysicalOlapScan[t1] apply RFs: RF0 RF1 +----------------PhysicalOlapScan[t1] --------------PhysicalDistribute[DistributionSpecHash] ----------------PhysicalOlapScan[t2] ----------PhysicalDistribute[DistributionSpecHash] @@ -794,7 +794,7 @@ PhysicalResultSink ------PhysicalTopN[LOCAL_SORT] --------hashJoin[INNER_JOIN] hashCondition=((subq.id = t3.id)) otherCondition=() ----------hashJoin[INNER_JOIN] hashCondition=((subq.id = t2.id)) otherCondition=() -------------PhysicalOlapScan[t1] apply RFs: RF0 RF1 +------------PhysicalOlapScan[t1] ------------PhysicalDistribute[DistributionSpecHash] --------------PhysicalOlapScan[t2] ----------PhysicalDistribute[DistributionSpecHash] @@ -807,7 +807,7 @@ PhysicalResultSink ------PhysicalTopN[LOCAL_SORT] --------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t3.id)) otherCondition=() ----------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t2.id)) otherCondition=() -------------PhysicalOlapScan[t1] apply RFs: RF0 RF1 +------------PhysicalOlapScan[t1] ------------PhysicalDistribute[DistributionSpecHash] --------------PhysicalOlapScan[t2] ----------PhysicalDistribute[DistributionSpecHash] @@ -821,7 +821,7 @@ PhysicalResultSink --------hashAgg[LOCAL] ----------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t3.id)) otherCondition=() ------------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t2.id)) otherCondition=() ---------------PhysicalOlapScan[t1] apply RFs: RF0 RF1 +--------------PhysicalOlapScan[t1] --------------PhysicalDistribute[DistributionSpecHash] ----------------PhysicalOlapScan[t2] ------------PhysicalDistribute[DistributionSpecHash] diff --git a/regression-test/data/nereids_rules_p0/push_down_top_n/push_down_top_n_through_union.out b/regression-test/data/nereids_rules_p0/push_down_top_n/push_down_top_n_through_union.out index 5c0c8ba5ba..1135cd5270 100644 --- a/regression-test/data/nereids_rules_p0/push_down_top_n/push_down_top_n_through_union.out +++ b/regression-test/data/nereids_rules_p0/push_down_top_n/push_down_top_n_through_union.out @@ -101,7 +101,7 @@ PhysicalResultSink --------------PhysicalDistribute[DistributionSpecGather] ----------------PhysicalTopN[LOCAL_SORT] ------------------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() ---------------------PhysicalOlapScan[table1] apply RFs: RF0 +--------------------PhysicalOlapScan[table1] --------------------PhysicalOlapScan[table1] ----------PhysicalDistribute[DistributionSpecExecutionAny] ------------PhysicalTopN[MERGE_SORT] diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query72.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query72.out index 92933fa585..fe270dc642 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query72.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query72.out @@ -38,7 +38,7 @@ PhysicalResultSink ----------------------------------------------------PhysicalDistribute[DistributionSpecReplicated] ------------------------------------------------------PhysicalProject --------------------------------------------------------filter((d1.d_year = 2002)) -----------------------------------------------------------PhysicalOlapScan[date_dim] apply RFs: RF5 +----------------------------------------------------------PhysicalOlapScan[date_dim] ------------------------------------------------PhysicalDistribute[DistributionSpecHash] --------------------------------------------------PhysicalProject ----------------------------------------------------filter((customer_demographics.cd_marital_status = 'W'))