From 1e7ef357414132a4848836c33df6f27856fdc5eb Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Sun, 23 Apr 2023 21:32:23 +0800 Subject: [PATCH] [fix](Nereids) two phase read for topn only support simple case (#18955) 1. topn must have a merge node 2. topn must be the top node of the plan --- .../translator/PhysicalPlanTranslator.java | 6 +- .../processor/post/PlanPostProcessor.java | 5 ++ .../processor/post/PlanPostProcessors.java | 2 +- .../nereids/processor/post/TopNScanOpt.java | 2 +- .../processor/post/TwoPhaseReadOpt.java | 55 +++++++++++++------ .../trees/plans/physical/PhysicalTopN.java | 2 +- .../postprocess/TopNRuntimeFilterTest.java | 4 +- 7 files changed, 49 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index e51e5469b7..7961fddd57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -884,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor sortTupleOutputList = new ArrayList<>(); List outputList = sort.getOutput(); - outputList.forEach(k -> { - sortTupleOutputList.add(ExpressionTranslator.translate(k, context)); - }); + outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context))); // 2. 
Generate new Tuple and get current slotRef for newOrderingExprList List newOrderingExprList = Lists.newArrayList(); TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java index fa6a9deaa9..5090acedf4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessor.java @@ -18,10 +18,15 @@ package org.apache.doris.nereids.processor.post; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; /** * PlanPostprocessor: a PlanVisitor to rewrite PhysicalPlan to new PhysicalPlan. */ public class PlanPostProcessor extends DefaultPlanRewriter { + + public Plan processRoot(Plan plan, CascadesContext ctx) { + return plan.accept(this, ctx); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index b96e7bbf3e..0843d1e04b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -47,7 +47,7 @@ public class PlanPostProcessors { public PhysicalPlan process(PhysicalPlan physicalPlan) { PhysicalPlan resultPlan = physicalPlan; for (PlanPostProcessor processor : getProcessors()) { - resultPlan = (PhysicalPlan) resultPlan.accept(processor, cascadesContext); + resultPlan = (PhysicalPlan) processor.processRoot(resultPlan, cascadesContext); } return resultPlan; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java index bb00983d2d..a938a231ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java @@ -78,7 +78,7 @@ public class TopNScanOpt extends PlanPostProcessor { olapScan = (PhysicalOlapScan) child; if (olapScan.getTable().isDupKeysOrMergeOnWrite()) { - topN.setMutableState(PhysicalTopN.TOPN_OPT, true); + topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true); } return topN; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java index 454caae435..543f908456 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; import org.apache.doris.nereids.trees.plans.algebra.Filter; import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; @@ -52,23 +53,40 @@ import java.util.Set; public class TwoPhaseReadOpt extends PlanPostProcessor { @Override - public PhysicalTopN visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { - topN.child().accept(this, ctx); - Plan child = topN.child(); - if (topN.getSortPhase() != SortPhase.LOCAL_SORT) { - return topN; + public Plan processRoot(Plan plan, CascadesContext ctx) { + if 
(plan instanceof PhysicalTopN) { + PhysicalTopN physicalTopN = (PhysicalTopN) plan; + if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) { + return plan.accept(this, ctx); + } } - if (topN.getOrderKeys().isEmpty()) { - return topN; + return plan; + } + + @Override + public PhysicalTopN visitPhysicalTopN(PhysicalTopN mergeTopN, CascadesContext ctx) { + mergeTopN.child().accept(this, ctx); + Plan child = mergeTopN.child(); + if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || !(mergeTopN.child() instanceof PhysicalDistribute)) { + return mergeTopN; + } + PhysicalDistribute distribute = (PhysicalDistribute) mergeTopN.child(); + if (!(distribute.child() instanceof PhysicalTopN)) { + return mergeTopN; + } + PhysicalTopN localTopN = (PhysicalTopN) distribute.child(); + + if (localTopN.getOrderKeys().isEmpty()) { + return mergeTopN; } // topn opt long topNOptLimitThreshold = getTopNOptLimitThreshold(); - if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) { - return topN; + if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) { + return mergeTopN; } - if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) { - return topN; + if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) { + return mergeTopN; } PhysicalOlapScan olapScan; @@ -81,18 +99,18 @@ public class TwoPhaseReadOpt extends PlanPostProcessor { if (child instanceof Project) { project = (PhysicalProject) child; // TODO: remove this after fix two phase read on project core - return topN; + return mergeTopN; } child = child.child(0); } if (!(child instanceof PhysicalOlapScan)) { - return topN; + return mergeTopN; } olapScan = (PhysicalOlapScan) child; // all order key must column from table if (!olapScan.getTable().getEnableLightSchemaChange()) { - return topN; + return mergeTopN; } Map projectRevertedMap = Maps.newHashMap(); @@ -114,22 +132,23 @@ public class 
TwoPhaseReadOpt extends PlanPostProcessor { if (filter != null) { filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds())); } - topN.getOrderKeys().stream() + localTopN.getOrderKeys().stream() .map(OrderKey::getExpr) .map(Slot.class::cast) .map(NamedExpression::getExprId) .map(projectRevertedMap::get) .filter(Objects::nonNull) .forEach(deferredMaterializedExprIds::remove); - topN.getOrderKeys().stream() + localTopN.getOrderKeys().stream() .map(OrderKey::getExpr) .map(Slot.class::cast) .map(NamedExpression::getExprId) .forEach(deferredMaterializedExprIds::remove); - topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true); + localTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true); + mergeTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true); olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds); - return topN; + return mergeTopN; } private long getTopNOptLimitThreshold() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 2ca9a4c51c..4a58f5d9e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -40,7 +40,7 @@ import java.util.Optional; */ public class PhysicalTopN extends AbstractPhysicalSort implements TopN { - public static final String TOPN_OPT = "topn_opt"; + public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter"; public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt"; private final long limit; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java index 9da544143a..6cbbcbf071 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java @@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase { new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN); PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0); - Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()); + Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); } // topn rf do not apply on string-like and float column @@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase { new PlanPostProcessors(checker.getCascadesContext()).process(plan); Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN); PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0); - Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()); + Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()); } }