[opt](nereids) enable two phase partition topn opt #23870

Enable two phase partition topn optimization, instead of original full sort at the second phase.
E.g, partial plan of tpcds q67 is as following and a full sort after exchange will have performance impact, especially if the window column's ndv is very high and the number of window is huge.

------PhysicalTopN
--------filter((rk <= 100))
----------PhysicalWindow
------------PhysicalQuickSort
--------------PhysicalDistribute
----------------PhysicalPartitionTopN
------------------PhysicalProject

Under this scenario, the second phase full sort can be transformed to a global PhysicalPartitionTopN and reduce the cost from full sort. The plan will be optimized to the following:

------PhysicalTopN
--------filter((rk <= 100))
----------PhysicalWindow
------------PhysicalPartitionTopN
--------------PhysicalDistribute
----------------PhysicalPartitionTopN
------------------PhysicalProject
This commit is contained in:
xzj7019
2023-09-15 10:30:34 +08:00
committed by GitHub
parent 23f01ddf3a
commit 00bb32cfc0
8 changed files with 200 additions and 33 deletions

View File

@ -348,8 +348,18 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
public PhysicalProperties visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
PhysicalProperties childOutputProperty = childrenOutputProperties.get(0);
return new PhysicalProperties(childOutputProperty.getDistributionSpec());
DistributionSpec childDistSpec = childrenOutputProperties.get(0).getDistributionSpec();
if (partitionTopN.getPhase().isTwoPhaseLocal() || partitionTopN.getPhase().isOnePhaseGlobal()) {
return new PhysicalProperties(childDistSpec);
} else {
Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal(),
"partition topn phase is not two phase global");
Preconditions.checkState(childDistSpec instanceof DistributionSpecHash,
"child dist spec is not hash spec");
return new PhysicalProperties(childDistSpec, new OrderSpec(partitionTopN.getOrderKeys()));
}
}
@Override

View File

@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@ -153,6 +154,23 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
return true;
}
@Override
public Boolean visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, Void context) {
if (partitionTopN.getPhase().isOnePhaseGlobal() && children.get(0).getPlan() instanceof PhysicalDistribute) {
// one phase partition topn, if the child is an enforced distribution, discard this
// and use two phase candidate.
return false;
} else if (partitionTopN.getPhase().isTwoPhaseGlobal()
&& !(children.get(0).getPlan() instanceof PhysicalDistribute)) {
// two phase partition topn, if global's child is not distribution, which means
// the local distribution has met final requirement, discard this candidate.
return false;
} else {
visit(partitionTopN, context);
return true;
}
}
@Override
public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void context) {
// do not process must shuffle

View File

@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
@ -44,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -250,6 +252,21 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
@Override
public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
if (partitionTopN.getPhase().isTwoPhaseLocal()) {
addRequestPropertyToChildren(PhysicalProperties.ANY);
} else {
Preconditions.checkState(partitionTopN.getPhase().isTwoPhaseGlobal()
|| partitionTopN.getPhase().isOnePhaseGlobal(),
"partition topn phase is not two phase global or one phase global");
PhysicalProperties properties = PhysicalProperties.createHash(partitionTopN.getPartitionKeys(),
ShuffleType.REQUIRE);
addRequestPropertyToChildren(properties);
}
return null;
}
@Override
public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);

View File

@ -21,6 +21,9 @@ import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import com.google.common.collect.ImmutableList;
@ -33,21 +36,85 @@ import java.util.List;
public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalPartitionTopN().then(partitionTopN -> {
List<OrderKey> orderKeys = !partitionTopN.getOrderKeys().isEmpty()
? partitionTopN.getOrderKeys().stream()
.map(OrderExpression::getOrderKey)
.collect(ImmutableList.toImmutableList()) :
return logicalPartitionTopN().thenApplyMulti(ctx -> generatePhysicalPartitionTopn(ctx.root))
.toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
}
private List<PhysicalPartitionTopN<? extends Plan>> generatePhysicalPartitionTopn(
LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
if (logicalPartitionTopN.getPartitionKeys().isEmpty()) {
// if no partition by keys, use local partition topn combined with further full sort
List<OrderKey> orderKeys = !logicalPartitionTopN.getOrderKeys().isEmpty()
? logicalPartitionTopN.getOrderKeys().stream()
.map(OrderExpression::getOrderKey)
.collect(ImmutableList.toImmutableList()) :
ImmutableList.of();
return new PhysicalPartitionTopN<>(
partitionTopN.getFunction(),
partitionTopN.getPartitionKeys(),
PhysicalPartitionTopN<Plan> onePhaseLocalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
orderKeys,
partitionTopN.hasGlobalLimit(),
partitionTopN.getPartitionLimit(),
partitionTopN.getLogicalProperties(),
partitionTopN.child());
}).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
logicalPartitionTopN.child(0));
return ImmutableList.of(onePhaseLocalPartitionTopN);
} else {
// if partition by keys exist, the order keys will be set as original partition keys combined with
// orderby keys, to meet upper window operator's order requirement.
ImmutableList<OrderKey> fullOrderKeys = getAllOrderKeys(logicalPartitionTopN);
PhysicalPartitionTopN<Plan> onePhaseGlobalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
fullOrderKeys,
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
logicalPartitionTopN.child(0));
PhysicalPartitionTopN<Plan> twoPhaseLocalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
fullOrderKeys,
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
logicalPartitionTopN.child(0));
PhysicalPartitionTopN<Plan> twoPhaseGlobalPartitionTopN = new PhysicalPartitionTopN<>(
logicalPartitionTopN.getFunction(),
logicalPartitionTopN.getPartitionKeys(),
fullOrderKeys,
logicalPartitionTopN.hasGlobalLimit(),
logicalPartitionTopN.getPartitionLimit(),
PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN,
logicalPartitionTopN.getLogicalProperties(),
twoPhaseLocalPartitionTopN);
return ImmutableList.of(onePhaseGlobalPartitionTopN, twoPhaseGlobalPartitionTopN);
}
}
private ImmutableList<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
ImmutableList.Builder<OrderKey> builder = ImmutableList.builder();
if (!logicalPartitionTopN.getPartitionKeys().isEmpty()) {
builder.addAll(logicalPartitionTopN.getPartitionKeys().stream().map(partitionKey -> {
return new OrderKey(partitionKey, true, false);
}).collect(ImmutableList.toImmutableList()));
}
if (!logicalPartitionTopN.getOrderKeys().isEmpty()) {
builder.addAll(logicalPartitionTopN.getOrderKeys().stream()
.map(OrderExpression::getOrderKey)
.collect(ImmutableList.toImmutableList())
);
}
return builder.build();
}
}

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans;
/**
* Represents different phase of partition topn and map it to the
* enum of partition topn phase definition of stale optimizer.
*/
public enum PartitionTopnPhase {
ONE_PHASE_GLOBAL_PTOPN("OnePhaseGlobalPartitionTopn"),
TWO_PHASE_LOCAL_PTOPN("TwoPhaseLocalPartitionTopn"),
TWO_PHASE_GLOBAL_PTOPN("TwoPhaseGlobalPartitionTopn");
private final String name;
PartitionTopnPhase(String name) {
this.name = name;
}
public boolean isOnePhaseGlobal() {
return this == ONE_PHASE_GLOBAL_PTOPN;
}
public boolean isTwoPhaseLocal() {
return this == TWO_PHASE_LOCAL_PTOPN;
}
public boolean isTwoPhaseGlobal() {
return this == TWO_PHASE_GLOBAL_PTOPN;
}
}

View File

@ -18,8 +18,8 @@
package org.apache.doris.nereids.trees.plans;
/**
* Represents different phase of agg and map it to the
* enum of agg phase definition of stale optimizer.
* Represents different phase of sort and map it to the
* enum of sort phase definition of stale optimizer.
*/
public enum SortPhase {
MERGE_SORT("MergeSort"),

View File

@ -23,6 +23,7 @@ import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.WindowFuncType;
@ -48,11 +49,12 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
private final List<OrderKey> orderKeys;
private final Boolean hasGlobalLimit;
private final long partitionLimit;
private final PartitionTopnPhase phase;
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase,
LogicalProperties logicalProperties, CHILD_TYPE child) {
this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
Optional.empty(), logicalProperties, child);
}
@ -60,7 +62,7 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
* Constructor of PhysicalPartitionTopN.
*/
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
@ -69,15 +71,17 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
this.orderKeys = ImmutableList.copyOf(orderKeys);
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.phase = phase;
}
/**
* Constructor of PhysicalPartitionTopN.
*/
public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
Boolean hasGlobalLimit, long partitionLimit,
Boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
PhysicalProperties physicalProperties, Statistics statistics,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties,
statistics, child);
this.function = function;
@ -85,6 +89,7 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
this.orderKeys = orderKeys;
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.phase = phase;
}
public WindowFuncType getFunction() {
@ -110,6 +115,10 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
return partitionLimit;
}
public PartitionTopnPhase getPhase() {
return phase;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -125,7 +134,8 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
return Objects.equals(this.function, that.function)
&& Objects.equals(this.partitionKeys, that.partitionKeys)
&& Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit
&& this.partitionLimit == that.partitionLimit;
&& this.partitionLimit == that.partitionLimit
&& this.phase == that.phase;
}
@Override
@ -152,28 +162,28 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
public PhysicalPartitionTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit,
partitionLimit, groupExpression, getLogicalProperties(), physicalProperties,
partitionLimit, phase, groupExpression, getLogicalProperties(), physicalProperties,
statistics, children.get(0));
}
@Override
public PhysicalPartitionTopN<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, getLogicalProperties(), child());
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalPartitionTopN<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@Override
@ -184,7 +194,8 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
"orderKeys", orderKeys,
"hasGlobalLimit", hasGlobalLimit,
"partitionLimit", partitionLimit,
"stats", statistics
"stats", statistics,
"phase", phase
);
}
@ -195,7 +206,7 @@ public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnar
@Override
public PhysicalPartitionTopN<CHILD_TYPE> resetLogicalProperties() {
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
groupExpression, null, physicalProperties, statistics, child());
return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, phase,
groupExpression, null, physicalProperties, statistics, child());
}
}