From a8d690272f8f146dad0e049d9f857e3e030d6ca1 Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Wed, 9 Aug 2023 12:13:21 +0800 Subject: [PATCH] [refactor](Nereids) let topn runtime filter as PhysicalTopN's attr (#22745) The original implement use MutableMap on PhysicalTopN. It is easy to lose if we rewrite the plan after this processor. The new implement use attr to indict whether use topn runtime filter --- .../translator/PhysicalPlanTranslator.java | 2 +- .../nereids/processor/post/TopNScanOpt.java | 34 +++++++++--- .../PhysicalDeferMaterializeTopN.java | 14 ++--- .../trees/plans/physical/PhysicalTopN.java | 52 ++++++++++++------- .../postprocess/TopNRuntimeFilterTest.java | 11 ++-- 5 files changed, 71 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 41be06be44..4144987bae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1698,7 +1698,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { - topN.child().accept(this, ctx); + Plan child = topN.child().accept(this, ctx); + topN = rewriteTopN(topN); + if (child != topN.child()) { + topN.withChildren(child); + } + return topN; + } + + @Override + public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, + CascadesContext context) { + Plan child = topN.child().accept(this, context); + if (child != topN.child()) { + topN = topN.withChildren(ImmutableList.of(child)); + } + PhysicalTopN rewrittenTopN = rewriteTopN(topN.getPhysicalTopN()); + if (topN.getPhysicalTopN() != rewrittenTopN) { + topN = topN.withPhysicalTopN(rewrittenTopN); + } + return topN; + } + + private PhysicalTopN rewriteTopN(PhysicalTopN topN) { Plan child = topN.child(); if (topN.getSortPhase() != SortPhase.LOCAL_SORT) { return topN; @@ -79,18 +103,12 @@ public class TopNScanOpt extends PlanPostProcessor { olapScan = (OlapScan) child; if (olapScan.getTable().isDupKeysOrMergeOnWrite()) { - topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true); + return topN.withEnableRuntimeFilter(true); } return topN; } - @Override - public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN topN, - CascadesContext context) { - return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), context)); - } - private long getTopNOptLimitThreshold() { if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { return ConnectContext.get().getSessionVariable().topnOptLimitThreshold; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java index 2c2a53761a..f5db3ff42f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java @@ -91,13 +91,13 @@ public class PhysicalDeferMaterializeTopN return physicalTopN.getLimit(); } - public Plan withPhysicalTopN(PhysicalTopN physicalTopN) { + public PhysicalDeferMaterializeTopN withPhysicalTopN(PhysicalTopN physicalTopN) { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), physicalProperties, statistics, physicalTopN.child()); } @Override - public Plan withChildren(List children) { + public PhysicalDeferMaterializeTopN withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size()); return new PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))), @@ -111,13 +111,14 @@ public class PhysicalDeferMaterializeTopN } @Override - public Plan withGroupExpression(Optional groupExpression) { + public PhysicalDeferMaterializeTopN withGroupExpression(Optional groupExpression) { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @Override - public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + public PhysicalDeferMaterializeTopN withGroupExprLogicalPropChildren( + Optional groupExpression, Optional logicalProperties, List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size()); @@ -127,7 +128,8 @@ public class PhysicalDeferMaterializeTopN } @Override - public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + public PhysicalDeferMaterializeTopN withPhysicalPropertiesAndStats( + PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -138,7 +140,7 @@ public class PhysicalDeferMaterializeTopN } @Override - public PhysicalDeferMaterializeTopN resetLogicalProperties() { + public PhysicalDeferMaterializeTopN resetLogicalProperties() { return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression, null, physicalProperties, statistics, child()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 7df18fd010..bccc2cad41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -41,37 +41,38 @@ import java.util.Optional; */ public class PhysicalTopN extends AbstractPhysicalSort implements TopN { - public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter"; - private final long limit; private final long offset; + private final boolean enableRuntimeFilter; public PhysicalTopN(List orderKeys, long limit, long offset, SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) { - this(orderKeys, limit, offset, phase, Optional.empty(), logicalProperties, child); + this(orderKeys, limit, offset, phase, false, Optional.empty(), logicalProperties, child); } /** * Constructor of PhysicalHashJoinNode. */ public PhysicalTopN(List orderKeys, long limit, long offset, - SortPhase phase, Optional groupExpression, LogicalProperties logicalProperties, - CHILD_TYPE child) { - this(orderKeys, limit, offset, phase, groupExpression, logicalProperties, - null, null, child); + SortPhase phase, boolean enableRuntimeFilter, + Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, logicalProperties, null, null, child); } /** * Constructor of PhysicalHashJoinNode. */ public PhysicalTopN(List orderKeys, long limit, long offset, - SortPhase phase, Optional groupExpression, LogicalProperties logicalProperties, + SortPhase phase, boolean enableRuntimeFilter, + Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, physicalProperties, statistics, child); Objects.requireNonNull(orderKeys, "orderKeys should not be null in PhysicalTopN."); this.limit = limit; this.offset = offset; + this.enableRuntimeFilter = enableRuntimeFilter; } public long getLimit() { @@ -82,6 +83,10 @@ public class PhysicalTopN extends AbstractPhysicalSort< return offset; } + public boolean isEnableRuntimeFilter() { + return enableRuntimeFilter; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -94,12 +99,12 @@ public class PhysicalTopN extends AbstractPhysicalSort< return false; } PhysicalTopN that = (PhysicalTopN) o; - return limit == that.limit && offset == that.offset; + return limit == that.limit && offset == that.offset && enableRuntimeFilter == that.enableRuntimeFilter; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), limit, offset); + return Objects.hash(super.hashCode(), limit, offset, enableRuntimeFilter); } @Override @@ -107,33 +112,39 @@ public class PhysicalTopN extends AbstractPhysicalSort< return visitor.visitPhysicalTopN(this, context); } + public PhysicalTopN withEnableRuntimeFilter(boolean enableRuntimeFilter) { + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, getLogicalProperties(), child()); + } + @Override public PhysicalTopN withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalTopN's children size must be 1, but real is %s", children.size()); - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @Override public PhysicalTopN withGroupExpression(Optional groupExpression) { - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, getLogicalProperties(), child()); + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, getLogicalProperties(), child()); } @Override - public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + public PhysicalTopN withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalTopN's children size must be 1, but real is %s", children.size()); - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, logicalProperties.get(), - children.get(0)); + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalTopN withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, - getLogicalProperties(), physicalProperties, statistics, child()); + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @Override @@ -142,7 +153,8 @@ public class PhysicalTopN extends AbstractPhysicalSort< "limit", limit, "offset", offset, "orderKeys", orderKeys, - "phase", phase.toString() + "phase", phase.toString(), + "enableRuntimeFilter", enableRuntimeFilter ); } @@ -152,8 +164,8 @@ public class PhysicalTopN extends AbstractPhysicalSort< } @Override - public PhysicalTopN resetLogicalProperties() { - return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, + public PhysicalTopN resetLogicalProperties() { + return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression, null, physicalProperties, statistics, child()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java index 944ebcf3e8..f4fdf6f44f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java @@ -22,7 +22,6 @@ import org.apache.doris.nereids.processor.post.PlanPostProcessors; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.util.PlanChecker; import org.junit.jupiter.api.Assertions; @@ -41,12 +40,11 @@ public class TopNRuntimeFilterTest extends SSBTestBase { .rewrite() .implement(); PhysicalPlan plan = checker.getPhysicalPlan(); - new PlanPostProcessors(checker.getCascadesContext()).process(plan); + plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN); PhysicalDeferMaterializeTopN localTopN = (PhysicalDeferMaterializeTopN) plan.child(0).child(0); - Assertions.assertTrue(localTopN.getPhysicalTopN() - .getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); + Assertions.assertTrue(localTopN.getPhysicalTopN().isEnableRuntimeFilter()); } // topn rf do not apply on string-like and float column @@ -57,11 +55,10 @@ public class TopNRuntimeFilterTest extends SSBTestBase { .rewrite() .implement(); PhysicalPlan plan = checker.getPhysicalPlan(); - new PlanPostProcessors(checker.getCascadesContext()).process(plan); + plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN); PhysicalDeferMaterializeTopN localTopN = (PhysicalDeferMaterializeTopN) plan.child(0).child(0); - Assertions.assertFalse(localTopN.getPhysicalTopN() - .getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); + Assertions.assertFalse(localTopN.getPhysicalTopN().isEnableRuntimeFilter()); } }