diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index c1591f9d78..1c94cb65ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -348,8 +348,18 @@ public class ChildOutputPropertyDeriver extends PlanVisitor partitionTopN, PlanContext context) { Preconditions.checkState(childrenOutputProperties.size() == 1); - PhysicalProperties childOutputProperty = childrenOutputProperties.get(0); - return new PhysicalProperties(childOutputProperty.getDistributionSpec()); + DistributionSpec childDistSpec = childrenOutputProperties.get(0).getDistributionSpec(); + + if (partitionTopN.getPhase().isTwoPhaseLocal() || partitionTopN.getPhase().isOnePhaseGlobal()) { + return new PhysicalProperties(childDistSpec); + } else { + Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal(), + "partition topn phase is not two phase global"); + Preconditions.checkState(childDistSpec instanceof DistributionSpecHash, + "child dist spec is not hash spec"); + + return new PhysicalProperties(childDistSpec, new OrderSpec(partitionTopN.getOrderKeys())); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index a3a91880c5..93f6e94789 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -153,6 +154,23 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { return true; } + @Override + public Boolean visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, Void context) { + if (partitionTopN.getPhase().isOnePhaseGlobal() && children.get(0).getPlan() instanceof PhysicalDistribute) { + // one phase partition topn, if the child is an enforced distribution, discard this + // and use two phase candidate. + return false; + } else if (partitionTopN.getPhase().isTwoPhaseGlobal() + && !(children.get(0).getPlan() instanceof PhysicalDistribute)) { + // two phase partition topn, if global's child is not distribution, which means + // the local distribution has met final requirement, discard this candidate. + return false; + } else { + visit(partitionTopN, context); + return true; + } + } + @Override public Boolean visitPhysicalFilter(PhysicalFilter filter, Void context) { // do not process must shuffle diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 870af51b92..ef8fd61415 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; @@ -44,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.JoinUtils; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -250,6 +252,21 @@ public class RequestPropertyDeriver extends PlanVisitor { return null; } + @Override + public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, PlanContext context) { + if (partitionTopN.getPhase().isTwoPhaseLocal()) { + addRequestPropertyToChildren(PhysicalProperties.ANY); + } else { + Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal() + || partitionTopN.getPhase().isOnePhaseGlobal(), + "partition topn phase is not two phase global or one phase global"); + PhysicalProperties properties = PhysicalProperties.createHash(partitionTopN.getPartitionKeys(), + ShuffleType.REQUIRE); + addRequestPropertyToChildren(properties); + } + return null; + } + @Override public Void visitPhysicalFileSink(PhysicalFileSink fileSink, PlanContext context) { addRequestPropertyToChildren(PhysicalProperties.GATHER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java index b7975e7ca6..2b6fcfe464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java @@ -21,6 +21,9 @@ import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import com.google.common.collect.ImmutableList; @@ -33,21 +36,85 @@ import java.util.List; public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalPartitionTopN().then(partitionTopN -> { - List orderKeys = !partitionTopN.getOrderKeys().isEmpty() - ? partitionTopN.getOrderKeys().stream() - .map(OrderExpression::getOrderKey) - .collect(ImmutableList.toImmutableList()) : + return logicalPartitionTopN().thenApplyMulti(ctx -> generatePhysicalPartitionTopn(ctx.root)) + .toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + } + + private List> generatePhysicalPartitionTopn( + LogicalPartitionTopN logicalPartitionTopN) { + if (logicalPartitionTopN.getPartitionKeys().isEmpty()) { + // if no partition by keys, use local partition topn combined with further full sort + List orderKeys = !logicalPartitionTopN.getOrderKeys().isEmpty() + ? logicalPartitionTopN.getOrderKeys().stream() + .map(OrderExpression::getOrderKey) + .collect(ImmutableList.toImmutableList()) : ImmutableList.of(); - return new PhysicalPartitionTopN<>( - partitionTopN.getFunction(), - partitionTopN.getPartitionKeys(), + PhysicalPartitionTopN onePhaseLocalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), orderKeys, - partitionTopN.hasGlobalLimit(), - partitionTopN.getPartitionLimit(), - partitionTopN.getLogicalProperties(), - partitionTopN.child()); - }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN, + logicalPartitionTopN.getLogicalProperties(), + logicalPartitionTopN.child(0)); + + return ImmutableList.of(onePhaseLocalPartitionTopN); + } else { + // if partition by keys exist, the order keys will be set as original partition keys combined with + // orderby keys, to meet upper window operator's order requirement. + ImmutableList fullOrderKeys = getAllOrderKeys(logicalPartitionTopN); + PhysicalPartitionTopN onePhaseGlobalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + fullOrderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN, + logicalPartitionTopN.getLogicalProperties(), + logicalPartitionTopN.child(0)); + + PhysicalPartitionTopN twoPhaseLocalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + fullOrderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN, + logicalPartitionTopN.getLogicalProperties(), + logicalPartitionTopN.child(0)); + + PhysicalPartitionTopN twoPhaseGlobalPartitionTopN = new PhysicalPartitionTopN<>( + logicalPartitionTopN.getFunction(), + logicalPartitionTopN.getPartitionKeys(), + fullOrderKeys, + logicalPartitionTopN.hasGlobalLimit(), + logicalPartitionTopN.getPartitionLimit(), + PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN, + logicalPartitionTopN.getLogicalProperties(), + twoPhaseLocalPartitionTopN); + + return ImmutableList.of(onePhaseGlobalPartitionTopN, twoPhaseGlobalPartitionTopN); + } + } + + private ImmutableList getAllOrderKeys(LogicalPartitionTopN logicalPartitionTopN) { + ImmutableList.Builder builder = ImmutableList.builder(); + + if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) { + builder.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey -> { + return new OrderKey(partitionKey, true, false); + }).collect(ImmutableList.toImmutableList())); + } + + if (!logicalPartitionTopN.getOrderKeys().isEmpty()) { + builder.addAll(logicalPartitionTopN.getOrderKeys().stream() + .map(OrderExpression::getOrderKey) + .collect(ImmutableList.toImmutableList()) + ); + } + + return builder.build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java new file mode 100644 index 0000000000..34520ae6c1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PartitionTopnPhase.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans; + +/** + * Represents different phase of partition topn and map it to the + * enum of partition topn phase definition of stale optimizer. + */ +public enum PartitionTopnPhase { + ONE_PHASE_GLOBAL_PTOPN("OnePhaseGlobalPartitionTopn"), + TWO_PHASE_LOCAL_PTOPN("TwoPhaseLocalPartitionTopn"), + TWO_PHASE_GLOBAL_PTOPN("TwoPhaseGlobalPartitionTopn"); + private final String name; + PartitionTopnPhase(String name) { + this.name = name; + } + + public boolean isOnePhaseGlobal() { + return this == ONE_PHASE_GLOBAL_PTOPN; + } + + public boolean isTwoPhaseLocal() { + return this == TWO_PHASE_LOCAL_PTOPN; + } + + public boolean isTwoPhaseGlobal() { + return this == TWO_PHASE_GLOBAL_PTOPN; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/SortPhase.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/SortPhase.java index 4361aa2d34..7a5157d125 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/SortPhase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/SortPhase.java @@ -18,8 +18,8 @@ package org.apache.doris.nereids.trees.plans; /** - * Represents different phase of agg and map it to the - * enum of agg phase definition of stale optimizer. + * Represents different phase of sort and map it to the + * enum of sort phase definition of stale optimizer. */ public enum SortPhase { MERGE_SORT("MergeSort"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java index 4166e7d903..971042db42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java @@ -23,6 +23,7 @@ import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.WindowFuncType; @@ -48,11 +49,12 @@ public class PhysicalPartitionTopN extends PhysicalUnar private final List orderKeys; private final Boolean hasGlobalLimit; private final long partitionLimit; + private final PartitionTopnPhase phase; public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, - Boolean hasGlobalLimit, long partitionLimit, + Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) { - this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase, Optional.empty(), logicalProperties, child); } @@ -60,7 +62,7 @@ public class PhysicalPartitionTopN extends PhysicalUnar * Constructor of PhysicalPartitionTopN. */ public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, - Boolean hasGlobalLimit, long partitionLimit, + Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase, Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child); @@ -69,15 +71,17 @@ public class PhysicalPartitionTopN extends PhysicalUnar this.orderKeys = ImmutableList.copyOf(orderKeys); this.hasGlobalLimit = hasGlobalLimit; this.partitionLimit = partitionLimit; + this.phase = phase; } /** * Constructor of PhysicalPartitionTopN. */ public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, - Boolean hasGlobalLimit, long partitionLimit, + Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase, Optional groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + PhysicalProperties physicalProperties, Statistics statistics, + CHILD_TYPE child) { super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties, statistics, child); this.function = function; @@ -85,6 +89,7 @@ public class PhysicalPartitionTopN extends PhysicalUnar this.orderKeys = orderKeys; this.hasGlobalLimit = hasGlobalLimit; this.partitionLimit = partitionLimit; + this.phase = phase; } public WindowFuncType getFunction() { @@ -110,6 +115,10 @@ public class PhysicalPartitionTopN extends PhysicalUnar return partitionLimit; } + public PartitionTopnPhase getPhase() { + return phase; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -125,7 +134,8 @@ public class PhysicalPartitionTopN extends PhysicalUnar return Objects.equals(this.function, that.function) && Objects.equals(this.partitionKeys, that.partitionKeys) && Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit - && this.partitionLimit == that.partitionLimit; + && this.partitionLimit == that.partitionLimit + && this.phase == that.phase; } @Override @@ -152,28 +162,28 @@ public class PhysicalPartitionTopN extends PhysicalUnar public PhysicalPartitionTopN withChildren(List children) { Preconditions.checkArgument(children.size() == 1); return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, - partitionLimit, groupExpression, getLogicalProperties(), physicalProperties, + partitionLimit, phase, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @Override public PhysicalPartitionTopN withGroupExpression(Optional groupExpression) { - return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, getLogicalProperties(), child()); + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase, + groupExpression, getLogicalProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase, groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalPartitionTopN withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @Override @@ -184,7 +194,8 @@ public class PhysicalPartitionTopN extends PhysicalUnar "orderKeys", orderKeys, "hasGlobalLimit", hasGlobalLimit, "partitionLimit", partitionLimit, - "stats", statistics + "stats", statistics, + "phase", phase ); } @@ -195,7 +206,7 @@ public class PhysicalPartitionTopN extends PhysicalUnar @Override public PhysicalPartitionTopN resetLogicalProperties() { - return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, - groupExpression, null, physicalProperties, statistics, child()); + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase, + groupExpression, null, physicalProperties, statistics, child()); } } diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query67.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query67.out index b9707e1b5a..8161837a95 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query67.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query67.out @@ -6,7 +6,7 @@ PhysicalResultSink ------PhysicalTopN --------filter((rk <= 100)) ----------PhysicalWindow -------------PhysicalQuickSort +------------PhysicalPartitionTopN --------------PhysicalDistribute ----------------PhysicalPartitionTopN ------------------PhysicalProject