diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 41be06be44..4144987bae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1698,7 +1698,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { - topN.child().accept(this, ctx); + Plan child = topN.child().accept(this, ctx); + topN = rewriteTopN(topN); + if (child != topN.child()) { + topN.withChildren(child); + } + return topN; + } + + @Override + public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, + CascadesContext context) { + Plan child = topN.child().accept(this, context); + if (child != topN.child()) { + topN = topN.withChildren(ImmutableList.of(child)); + } + PhysicalTopN rewrittenTopN = rewriteTopN(topN.getPhysicalTopN()); + if (topN.getPhysicalTopN() != rewrittenTopN) { + topN = topN.withPhysicalTopN(rewrittenTopN); + } + return topN; + } + + private PhysicalTopN rewriteTopN(PhysicalTopN topN) { Plan child = topN.child(); if (topN.getSortPhase() != SortPhase.LOCAL_SORT) { return topN; @@ -79,18 +103,12 @@ public class TopNScanOpt extends PlanPostProcessor { olapScan = (OlapScan) child; if (olapScan.getTable().isDupKeysOrMergeOnWrite()) { - topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true); + return topN.withEnableRuntimeFilter(true); } return topN; } - @Override - public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, - CascadesContext context) { - return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), context)); - } - private long getTopNOptLimitThreshold() { if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { return ConnectContext.get().getSessionVariable().topnOptLimitThreshold; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java index 2c2a53761a..f5db3ff42f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java @@ -91,13 +91,13 @@ public class PhysicalDeferMaterializeTopN return physicalTopN.getLimit(); } - public Plan withPhysicalTopN(PhysicalTopN physicalTopN) { + public PhysicalDeferMaterializeTopN withPhysicalTopN(PhysicalTopN physicalTopN) { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), physicalProperties, statistics, physicalTopN.child()); } @Override - public Plan withChildren(List children) { + public PhysicalDeferMaterializeTopN withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size()); return new PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))), @@ -111,13 +111,14 @@ public class PhysicalDeferMaterializeTopN } @Override - public Plan withGroupExpression(Optional groupExpression) { + public PhysicalDeferMaterializeTopN withGroupExpression(Optional groupExpression) { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @Override - public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + public PhysicalDeferMaterializeTopN withGroupExprLogicalPropChildren( + Optional groupExpression, Optional logicalProperties, List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size()); @@ -127,7 +128,8 @@ public class PhysicalDeferMaterializeTopN } @Override - public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + public PhysicalDeferMaterializeTopN withPhysicalPropertiesAndStats( + PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -138,7 +140,7 @@ public class PhysicalDeferMaterializeTopN } @Override - public PhysicalDeferMaterializeTopN resetLogicalProperties() { + public PhysicalDeferMaterializeTopN resetLogicalProperties() { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, null, physicalProperties, statistics, child()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 7df18fd010..bccc2cad41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -41,37 +41,38 @@ import java.util.Optional; */ public class PhysicalTopN extends AbstractPhysicalSort implements TopN { - public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter"; - private final long limit; private final long offset; + private final boolean enableRuntimeFilter; public PhysicalTopN(List orderKeys, long limit, long offset, SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) { - this(orderKeys, limit, offset, phase, Optional.empty(), logicalProperties, child); + this(orderKeys, limit, offset, phase, false, Optional.empty(), logicalProperties, child); } /** * Constructor of PhysicalHashJoinNode. */ public PhysicalTopN(List orderKeys, long limit, long offset, - SortPhase phase, Optional groupExpression, LogicalProperties logicalProperties, - CHILD_TYPE child) { - this(orderKeys, limit, offset, phase, groupExpression, logicalProperties, - null, null, child); + SortPhase phase, boolean enableRuntimeFilter, + Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, logicalProperties, null, null, child); } /** * Constructor of PhysicalHashJoinNode. */ public PhysicalTopN(List orderKeys, long limit, long offset, - SortPhase phase, Optional groupExpression, LogicalProperties logicalProperties, + SortPhase phase, boolean enableRuntimeFilter, + Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, physicalProperties, statistics, child); Objects.requireNonNull(orderKeys, "orderKeys should not be null in PhysicalTopN."); this.limit = limit; this.offset = offset; + this.enableRuntimeFilter = enableRuntimeFilter; } public long getLimit() { @@ -82,6 +83,10 @@ public class PhysicalTopN extends AbstractPhysicalSort< return offset; } + public boolean isEnableRuntimeFilter() { + return enableRuntimeFilter; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -94,12 +99,12 @@ public class PhysicalTopN extends AbstractPhysicalSort< return false; } PhysicalTopN that = (PhysicalTopN) o; - return limit == that.limit && offset == that.offset; + return limit == that.limit && offset == that.offset && enableRuntimeFilter == that.enableRuntimeFilter; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), limit, offset); + return Objects.hash(super.hashCode(), limit, offset, enableRuntimeFilter); } @Override @@ -107,33 +112,39 @@ public class PhysicalTopN extends AbstractPhysicalSort< return visitor.visitPhysicalTopN(this, context); } + public PhysicalTopN withEnableRuntimeFilter(boolean enableRuntimeFilter) { + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, getLogicalProperties(), child()); + } + @Override public PhysicalTopN withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalTopN's children size must be 1, but real is %s", children.size()); - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @Override public PhysicalTopN withGroupExpression(Optional groupExpression) { - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, getLogicalProperties(), child()); + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, getLogicalProperties(), child()); } @Override - public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + public PhysicalTopN withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalTopN's children size must be 1, but real is %s", children.size()); - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, logicalProperties.get(), - children.get(0)); + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalTopN withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, - getLogicalProperties(), physicalProperties, statistics, child()); + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @Override @@ -142,7 +153,8 @@ public class PhysicalTopN extends AbstractPhysicalSort< "limit", limit, "offset", offset, "orderKeys", orderKeys, - "phase", phase.toString() + "phase", phase.toString(), + "enableRuntimeFilter", enableRuntimeFilter ); } @@ -152,8 +164,8 @@ public class PhysicalTopN extends AbstractPhysicalSort< } @Override - public PhysicalTopN resetLogicalProperties() { - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, + public PhysicalTopN resetLogicalProperties() { + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression, null, physicalProperties, statistics, child()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java index 944ebcf3e8..f4fdf6f44f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java @@ -22,7 +22,6 @@ import org.apache.doris.nereids.processor.post.PlanPostProcessors; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.util.PlanChecker; import org.junit.jupiter.api.Assertions; @@ -41,12 +40,11 @@ public class TopNRuntimeFilterTest extends SSBTestBase { .rewrite() .implement(); PhysicalPlan plan = checker.getPhysicalPlan(); - new PlanPostProcessors(checker.getCascadesContext()).process(plan); + plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN); PhysicalDeferMaterializeTopN localTopN = (PhysicalDeferMaterializeTopN) plan.child(0).child(0); - Assertions.assertTrue(localTopN.getPhysicalTopN() - .getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); + Assertions.assertTrue(localTopN.getPhysicalTopN().isEnableRuntimeFilter()); } // topn rf do not apply on string-like and float column @@ -57,11 +55,10 @@ public class TopNRuntimeFilterTest extends SSBTestBase { .rewrite() .implement(); PhysicalPlan plan = checker.getPhysicalPlan(); - new PlanPostProcessors(checker.getCascadesContext()).process(plan); + plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN); PhysicalDeferMaterializeTopN localTopN = (PhysicalDeferMaterializeTopN) plan.child(0).child(0); - Assertions.assertFalse(localTopN.getPhysicalTopN() - .getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); + Assertions.assertFalse(localTopN.getPhysicalTopN().isEnableRuntimeFilter()); } }