diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6270c5795b..b20d073814 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -268,13 +268,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor distribute, PlanTranslatorContext context) { - PlanFragment inputFragment = distribute.child().accept(this, context); - List> distributeExprLists = getDistributeExprs(distribute.child()); + Plan child = distribute.child(); + PlanFragment inputFragment = child.accept(this, context); + List> distributeExprLists = getDistributeExprs(child); // TODO: why need set streaming here? should remove this. if (inputFragment.getPlanRoot() instanceof AggregationNode - && distribute.child() instanceof PhysicalHashAggregate - && context.getFirstAggregateInFragment(inputFragment) == distribute.child()) { - PhysicalHashAggregate hashAggregate = (PhysicalHashAggregate) distribute.child(); + && child instanceof PhysicalHashAggregate + && context.getFirstAggregateInFragment(inputFragment) == child) { + PhysicalHashAggregate hashAggregate = (PhysicalHashAggregate) child; if (hashAggregate.getAggPhase() == AggPhase.LOCAL && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) { AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot(); @@ -285,23 +286,26 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor validOutputIds = distribute.getOutputExprIds(); - if (distribute.child() instanceof PhysicalHashAggregate) { + if (child instanceof PhysicalHashAggregate) { // we must add group by keys to output list, // otherwise we could not process aggregate's output without group by keys - List keys = ((PhysicalHashAggregate) distribute.child()).getGroupByExpressions().stream() + List keys = ((PhysicalHashAggregate) child).getGroupByExpressions().stream() .filter(SlotReference.class::isInstance) .map(SlotReference.class::cast) .map(SlotReference::getExprId) .collect(Collectors.toList()); keys.addAll(validOutputIds); validOutputIds = keys; + } else if (child instanceof PhysicalLimit && ((PhysicalLimit) child).getPhase().isGlobal()) { + // because sort already contains Offset, we don't need to handle PhysicalTopN + exchangeNode.setOffset(((PhysicalLimit) child).getOffset()); } if (inputFragment instanceof MultiCastPlanFragment) { // TODO: remove this logic when we split to multi-window in logical window to physical window conversion MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - if (!(distribute.child() instanceof PhysicalProject)) { + if (!(child instanceof PhysicalProject)) { List projectionExprs = new ArrayList<>(); PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute); Preconditions.checkState(consumer != null, "consumer not found"); @@ -1591,7 +1595,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor limit, CascadesContext context) { + if (limit.getPhase().isLocal() || limit.getOffset() == 0) { + return limit; + } + + return new PhysicalDistribute<>(DistributionSpecGather.INSTANCE, + limit.withLimit(limit.getLimit() + limit.getOffset())); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index 17538d55d4..6d85bebb2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -61,6 +61,7 @@ public class PlanPostProcessors { builder.add(new PushDownFilterThroughProject()); builder.add(new MergeProjectPostProcessor()); builder.add(new RecomputeLogicalPropertiesProcessor()); + builder.add(new AddOffsetIntoDistribute()); builder.add(new TopNScanOpt()); // after generate rf, DO NOT replace PLAN NODE builder.add(new FragmentProcessor()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java index ab7f5f811e..bbc2143df6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java @@ -101,6 +101,11 @@ public class PhysicalLimit extends PhysicalUnary(limit, offset, phase, groupExpression, getLogicalProperties(), + physicalProperties, statistics, children.get(0)); + } + @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1);