[feature](Nereids) add rule split limit into two phase (#16797)

1. Add a rule split limit, like Limit(Origin) ==> Limit(Global) -> Gather -> Limit(Local)
2. Add a rule: limit-> sort ==> topN
3. fix a bug about topN
4. make the type of limit,offset long in topN
And because this rule is always beneficial, we add a rule in the rewrite phase
This commit is contained in:
谢健
2023-03-07 15:34:12 +08:00
committed by GitHub
parent caacee253d
commit 704faaed84
30 changed files with 367 additions and 148 deletions

View File

@ -808,9 +808,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PlanFragment inputFragment = topN.child(0).accept(this, context);
PlanFragment currentFragment = inputFragment;
//1. generate new fragment for sort when the child is exchangeNode
if (inputFragment.getPlanRoot() instanceof ExchangeNode) {
Preconditions.checkArgument(!topN.getSortPhase().isLocal());
//1. Generate new fragment for sort when the child is exchangeNode, If the child is
// mergingExchange, it means we have always generated a new fragment when processing mergeSort
if (inputFragment.getPlanRoot() instanceof ExchangeNode
&& !((ExchangeNode) inputFragment.getPlanRoot()).isMergingExchange()) {
// Except LocalTopN->MergeTopN, we don't allow localTopN's child is Exchange Node
Preconditions.checkArgument(!topN.getSortPhase().isLocal(),
"local sort requires any property but child is" + inputFragment.getPlanRoot());
DataPartition outputPartition = DataPartition.UNPARTITIONED;
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
inputFragment.setOutputPartition(outputPartition);
@ -822,20 +826,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// 2. According to the type of sort, generate physical plan
if (!topN.getSortPhase().isMerge()) {
// For localSort or Gather->Sort, we just need to add sortNode
// For localSort or Gather->Sort, we just need to add TopNNode
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
currentFragment.addPlanRoot(sortNode);
} else {
// For mergeSort, we need to push sortInfo to exchangeNode
if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
// if there is no exchange node for mergeSort
// e.g., localSort -> mergeSort
// e.g., mergeTopN -> localTopN
// It means the local has satisfied the Gather property. We can just ignore mergeSort
currentFragment.getPlanRoot().setOffset(topN.getOffset());
currentFragment.getPlanRoot().setLimit(topN.getLimit());
return currentFragment;
}
Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof SortNode);
Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof SortNode,
"mergeSort' child must be sortNode");
SortNode sortNode = (SortNode) inputFragment.getPlanRoot();
((ExchangeNode) currentFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
ExchangeNode exchangeNode = (ExchangeNode) currentFragment.getPlanRoot();
exchangeNode.setMergeInfo(sortNode.getSortInfo());
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
}
return currentFragment;
}
@ -1388,38 +1400,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
if (inputFragment == null) {
return inputFragment;
}
// For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the LocalLimit has already gathered
// The globalLimit can overwrite the limit and offset, so it's still correct
PlanNode child = inputFragment.getPlanRoot();
// physical plan: limit --> sort
// after translate, it could be:
// 1. limit->sort => set (limit and offset) on sort
// 2. limit->exchange->sort => set (limit and offset) on exchange, set sort.limit = limit+offset
if (child instanceof SortNode) {
SortNode sort = (SortNode) child;
sort.setLimit(physicalLimit.getLimit());
sort.setOffset(physicalLimit.getOffset());
return inputFragment;
}
if (child instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) child;
exchangeNode.setLimit(physicalLimit.getLimit());
// we do not check if this is a merging exchange here,
// since this guaranteed by translating logic plan to physical plan
exchangeNode.setOffset(physicalLimit.getOffset());
if (exchangeNode.getChild(0) instanceof SortNode) {
SortNode sort = (SortNode) exchangeNode.getChild(0);
sort.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset());
sort.setOffset(0);
}
return inputFragment;
}
// for other PlanNode, just set limit as limit+offset
child.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset());
PlanFragment planFragment = exchangeToMergeFragment(inputFragment, context);
planFragment.getPlanRoot().setLimit(physicalLimit.getLimit());
planFragment.getPlanRoot().setOffSetDirectly(physicalLimit.getOffset());
return planFragment;
child.setLimit(physicalLimit.getLimit());
child.setOffset(physicalLimit.getOffset());
return inputFragment;
}
@Override

View File

@ -61,6 +61,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
import org.apache.doris.nereids.rules.rewrite.logical.SplitLimit;
import java.util.List;
@ -192,6 +193,7 @@ public class NereidsRewriter extends BatchRewriteJob {
new EliminateAggregate(),
new MergeSetOperations(),
new PushdownLimit(),
new SplitLimit(),
new BuildAggForUnion()
)),

View File

@ -194,6 +194,7 @@ import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
@ -1346,7 +1347,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
if (offsetToken != null) {
offset = Long.parseLong(offsetToken.getText());
}
return new LogicalLimit<>(limit, offset, input);
return new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, input);
});
}

View File

@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@ -39,10 +40,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
@ -105,13 +104,8 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
}
@Override
public PhysicalProperties visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
return new PhysicalProperties(DistributionSpecGather.INSTANCE, new OrderSpec(topN.getOrderKeys()));
}
@Override
public PhysicalProperties visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanContext context) {
public PhysicalProperties visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort,
PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
if (sort.getSortPhase().isLocal()) {
return new PhysicalProperties(

View File

@ -29,11 +29,12 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@ -94,7 +95,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
}
@Override
public Void visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanContext context) {
public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, PlanContext context) {
if (!sort.getSortPhase().isLocal()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
} else {
@ -103,6 +104,16 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
@Override
public Void visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, PlanContext context) {
if (limit.isGlobal()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
} else {
addRequestPropertyToChildren(PhysicalProperties.ANY);
}
return null;
}
@Override
public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, PlanContext context) {
JoinHint hint = hashJoin.getHint();

View File

@ -188,13 +188,15 @@ public enum RuleType {
INNER_TO_CROSS_JOIN(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
// split limit
SPLIT_LIMIT(RuleTypeClass.REWRITE),
// limit push down
PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_ONE_ROW_RELATION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_EMPTY_RELATION(RuleTypeClass.REWRITE),
PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE),
// adjust nullable
ADJUST_NULLABLE_ON_AGGREGATE(RuleTypeClass.REWRITE),
ADJUST_NULLABLE_ON_ASSERT_NUM_ROWS(RuleTypeClass.REWRITE),

View File

@ -30,6 +30,7 @@ public class LogicalLimitToPhysicalLimit extends OneImplementationRuleFactory {
return logicalLimit().then(limit -> new PhysicalLimit<>(
limit.getLimit(),
limit.getOffset(),
limit.getPhase(),
limit.getLogicalProperties(),
limit.child())
).toRule(RuleType.LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE);

View File

@ -38,10 +38,16 @@ public class LogicalTopNToPhysicalTopN extends OneImplementationRuleFactory {
.toRule(RuleType.LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE);
}
/**
* before: logicalTopN(off, limit)
* after:
* gatherTopN(limit, off, require gather)
* mergeTopN(limit, off, require gather) -> localTopN(off+limit, 0, require any)
*/
private List<PhysicalTopN<? extends Plan>> twoPhaseSort(LogicalTopN logicalTopN) {
PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
logicalTopN.getOffset(), SortPhase.LOCAL_SORT, logicalTopN.getLogicalProperties(), logicalTopN.child(0)
);
PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(),
logicalTopN.getLimit() + logicalTopN.getOffset(), 0, SortPhase.LOCAL_SORT,
logicalTopN.getLogicalProperties(), logicalTopN.child(0));
PhysicalTopN twoPhaseSort = new PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
logicalTopN.getOffset(), SortPhase.MERGE_SORT, logicalTopN.getLogicalProperties(), localSort);
PhysicalTopN onePhaseSort = new PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),

View File

@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
@ -117,7 +118,7 @@ public class ExistsApplyToJoin extends OneRewriteRuleFactory {
}
private Plan unCorrelatedNotExist(LogicalApply unapply) {
LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) unapply.right());
LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, (LogicalPlan) unapply.right());
Alias alias = new Alias(new Count(), "count(*)");
LogicalAggregate newAgg = new LogicalAggregate<>(new ArrayList<>(),
ImmutableList.of(alias), newLimit);
@ -128,7 +129,7 @@ public class ExistsApplyToJoin extends OneRewriteRuleFactory {
}
private Plan unCorrelatedExist(LogicalApply unapply) {
LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan) unapply.right());
LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, (LogicalPlan) unapply.right());
return new LogicalJoin<>(JoinType.CROSS_JOIN, (LogicalPlan) unapply.left(), newLimit);
}
}

View File

@ -42,13 +42,16 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
public class MergeLimits extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalLimit(logicalLimit()).then(upperLimit -> {
LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
return new LogicalLimit<>(
Math.min(upperLimit.getLimit(), Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
bottomLimit.getOffset() + upperLimit.getOffset(),
bottomLimit.child()
);
}).toRule(RuleType.MERGE_LIMITS);
return logicalLimit(logicalLimit())
.when(upperLimit -> upperLimit.getPhase().equals(upperLimit.child().getPhase()))
.then(upperLimit -> {
LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
return new LogicalLimit<>(
Math.min(upperLimit.getLimit(),
Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
bottomLimit.getOffset() + upperLimit.getOffset(),
bottomLimit.getPhase(), bottomLimit.child()
);
}).toRule(RuleType.MERGE_LIMITS);
}
}

View File

@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import com.google.common.collect.ImmutableList;
@ -82,6 +84,16 @@ public class PushdownLimit implements RewriteRuleFactory {
return limit.withChildren(union.withChildren(newUnionChildren));
})
.toRule(RuleType.PUSH_LIMIT_THROUGH_UNION),
// limit -> sort ==> topN
logicalLimit(logicalSort())
.then(limit -> {
LogicalSort sort = limit.child();
LogicalTopN topN = new LogicalTopN(sort.getOrderKeys(),
limit.getLimit(),
limit.getOffset(),
sort.child(0));
return topN;
}).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
logicalLimit(logicalOneRowRelation())
.then(limit -> limit.getLimit() > 0
? limit.child() : new LogicalEmptyRelation(limit.child().getOutput()))

View File

@ -51,8 +51,8 @@ public class PushdownProjectThroughLimit extends OneRewriteRuleFactory {
return logicalProject(logicalLimit(any())).thenApply(ctx -> {
LogicalProject<LogicalLimit<Plan>> logicalProject = ctx.root;
LogicalLimit<Plan> logicalLimit = logicalProject.child();
return new LogicalLimit<>(logicalLimit.getLimit(),
logicalLimit.getOffset(), new LogicalProject<>(logicalProject.getProjects(),
return new LogicalLimit<>(logicalLimit.getLimit(), logicalLimit.getOffset(),
logicalLimit.getPhase(), new LogicalProject<>(logicalProject.getProjects(),
logicalLimit.child()));
}).toRule(RuleType.PUSHDOWN_PROJECT_THROUGH_LIMIT);
}

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
/**
* Split limit into two phase
* before:
* Limit(origin) limit, offset
* after:
* Limit(global) limit, offset
* |
* Limit(local) limit + offset, 0
*/
public class SplitLimit extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalLimit().when(limit -> !limit.isSplit())
.then(limit -> {
long l = limit.getLimit();
long o = limit.getOffset();
return new LogicalLimit<>(l, o,
LimitPhase.GLOBAL, new LogicalLimit<>(l + o, 0, LimitPhase.LOCAL, limit.child())
);
}).toRule(RuleType.SPLIT_LIMIT);
}
}

View File

@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans;
/**
* Limit phase for logical and physical limit, like
* LocalLimit -> Gather -> GlobalLimit
* Origin is used to mark the limit operator that has not been split into 2-phase
*/
public enum LimitPhase {
LOCAL("LOCAL"),
GLOBAL("GLOBAL"),
ORIGIN("ORIGIN");
private final String name;
LimitPhase(String name) {
this.name = name;
}
public boolean isLocal() {
return this == LOCAL;
}
}

View File

@ -22,7 +22,7 @@ package org.apache.doris.nereids.trees.plans.algebra;
*/
public interface TopN extends Sort {
int getOffset();
long getOffset();
int getLimit();
long getLimit();
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
@ -44,19 +45,28 @@ import java.util.Optional;
* offset 100
*/
public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements Limit {
private final LimitPhase phase;
private final long limit;
private final long offset;
public LogicalLimit(long limit, long offset, CHILD_TYPE child) {
this(limit, offset, Optional.empty(), Optional.empty(), child);
public LogicalLimit(long limit, long offset, LimitPhase phase, CHILD_TYPE child) {
this(limit, offset, phase, Optional.empty(), Optional.empty(), child);
}
public LogicalLimit(long limit, long offset, Optional<GroupExpression> groupExpression,
public LogicalLimit(long limit, long offset, LimitPhase phase, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_LIMIT, groupExpression, logicalProperties, child);
this.limit = limit;
this.offset = offset;
this.phase = phase;
}
public LimitPhase getPhase() {
return phase;
}
public boolean isSplit() {
return phase != LimitPhase.ORIGIN;
}
public long getLimit() {
@ -94,7 +104,7 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TY
return false;
}
LogicalLimit that = (LogicalLimit) o;
return limit == that.limit && offset == that.offset;
return limit == that.limit && offset == that.offset && phase == that.phase;
}
@Override
@ -108,17 +118,17 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TY
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalLimit<>(limit, offset, groupExpression, Optional.of(getLogicalProperties()), child());
return new LogicalLimit<>(limit, offset, phase, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new LogicalLimit<>(limit, offset, Optional.empty(), logicalProperties, child());
return new LogicalLimit<>(limit, offset, phase, Optional.empty(), logicalProperties, child());
}
@Override
public LogicalLimit<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new LogicalLimit<>(limit, offset, children.get(0));
return new LogicalLimit<>(limit, offset, phase, children.get(0));
}
}

View File

@ -41,17 +41,17 @@ import java.util.Optional;
public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements TopN {
private final List<OrderKey> orderKeys;
private final int limit;
private final int offset;
private final long limit;
private final long offset;
public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, CHILD_TYPE child) {
public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset, CHILD_TYPE child) {
this(orderKeys, limit, offset, Optional.empty(), Optional.empty(), child);
}
/**
* Constructor for LogicalSort.
*/
public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset, Optional<GroupExpression> groupExpression,
public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, child);
this.orderKeys = ImmutableList.copyOf(Objects.requireNonNull(orderKeys, "orderKeys can not be null"));
@ -68,11 +68,11 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP
return orderKeys;
}
public int getOffset() {
public long getOffset() {
return offset;
}
public int getLimit() {
public long getLimit() {
return limit;
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
@ -39,14 +40,14 @@ import java.util.Optional;
* Physical limit plan
*/
public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> implements Limit {
private final LimitPhase phase;
private final long limit;
private final long offset;
public PhysicalLimit(long limit, long offset,
LogicalProperties logicalProperties,
LimitPhase phase, LogicalProperties logicalProperties,
CHILD_TYPE child) {
this(limit, offset, Optional.empty(), logicalProperties, child);
this(limit, offset, phase, Optional.empty(), logicalProperties, child);
}
/**
@ -57,11 +58,12 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
* @param offset the number of tuples skipped.
*/
public PhysicalLimit(long limit, long offset,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
LimitPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties, child);
this.limit = limit;
this.offset = offset;
this.phase = phase;
}
/**
@ -70,14 +72,16 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
*
* @param limit the number of tuples retrieved.
* @param offset the number of tuples skipped.
* @param phase the phase of 2-phase limit.
*/
public PhysicalLimit(long limit, long offset, Optional<GroupExpression> groupExpression,
public PhysicalLimit(long limit, long offset, LimitPhase phase, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
StatsDeriveResult statsDeriveResult, CHILD_TYPE child) {
super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties, physicalProperties, statsDeriveResult,
child);
this.limit = limit;
this.offset = offset;
this.phase = phase;
}
public long getLimit() {
@ -88,10 +92,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
return offset;
}
public LimitPhase getPhase() {
return phase;
}
public boolean isGlobal() {
return phase == LimitPhase.GLOBAL;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
return new PhysicalLimit<>(limit, offset, getLogicalProperties(), children.get(0));
return new PhysicalLimit<>(limit, offset, phase, getLogicalProperties(), children.get(0));
}
@Override
@ -101,18 +113,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
@Override
public PhysicalLimit<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalLimit<>(limit, offset, groupExpression, getLogicalProperties(), child());
return new PhysicalLimit<>(limit, offset, phase, groupExpression, getLogicalProperties(), child());
}
@Override
public PhysicalLimit<CHILD_TYPE> withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new PhysicalLimit<>(limit, offset, logicalProperties.get(), child());
return new PhysicalLimit<>(limit, offset, phase, logicalProperties.get(), child());
}
@Override
public PhysicalLimit<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
StatsDeriveResult statsDeriveResult) {
return new PhysicalLimit<>(limit, offset, groupExpression, getLogicalProperties(), physicalProperties,
return new PhysicalLimit<>(limit, offset, phase, groupExpression, getLogicalProperties(), physicalProperties,
statsDeriveResult, child());
}
@ -125,7 +137,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
return false;
}
PhysicalLimit that = (PhysicalLimit) o;
return offset == that.offset && limit == that.limit;
return offset == that.offset && limit == that.limit && phase == that.phase;
}
@Override
@ -143,6 +155,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
return Utils.toSqlString("PhysicalLimit",
"limit", limit,
"offset", offset,
"phase", phase,
"stats", statsDeriveResult
);
}

View File

@ -40,10 +40,10 @@ import java.util.Optional;
*/
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
private final int limit;
private final int offset;
private final long limit;
private final long offset;
public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) {
this(orderKeys, limit, offset, phase, Optional.empty(), logicalProperties, child);
}
@ -51,7 +51,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
/**
* Constructor of PhysicalHashJoinNode.
*/
public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, child);
@ -63,7 +63,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
/**
* Constructor of PhysicalHashJoinNode.
*/
public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, StatsDeriveResult statsDeriveResult, CHILD_TYPE child) {
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, physicalProperties,
@ -73,11 +73,11 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
this.offset = offset;
}
public int getLimit() {
public long getLimit() {
return limit;
}
public int getOffset() {
public long getOffset() {
return offset;
}

View File

@ -317,7 +317,7 @@ public abstract class PlanVisitor<R, C> {
}
public R visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, C context) {
return visit(topN, context);
return visitAbstractPhysicalSort(topN, context);
}
public R visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, C context) {

View File

@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.FakePlan;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LeafPlan;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@ -361,7 +362,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void a2bc() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, student);
LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")))
.applyBottomUp(
@ -396,7 +397,7 @@ class MemoTest implements MemoPatternMatchSupported {
// invalid case
Assertions.assertThrows(IllegalStateException.class, () -> {
UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, student);
LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, student)
.applyBottomUp(
@ -414,7 +415,7 @@ class MemoTest implements MemoPatternMatchSupported {
UnboundRelation a = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
UnboundRelation a2 = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, a2);
LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, a2);
PlanChecker.from(connectContext, a)
.setMaxInvokeTimesPerRule(1000)
.applyBottomUp(
@ -479,8 +480,8 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void a2bcd() {
LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan);
LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, limit5);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, scan);
LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -507,7 +508,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2a() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -531,7 +532,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2NewA() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -555,7 +556,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2GroupB() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -577,7 +578,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2PlanB() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -599,7 +600,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2c() {
UnboundRelation relation = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, relation);
LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, relation);
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
PlanChecker.from(connectContext, limit10)
@ -622,10 +623,10 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2cd() {
UnboundRelation relation = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, relation);
LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, relation);
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, student);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -650,8 +651,8 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2cb() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -681,7 +682,7 @@ class MemoTest implements MemoPatternMatchSupported {
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.setMaxInvokeTimesPerRule(1000)
@ -707,8 +708,8 @@ class MemoTest implements MemoPatternMatchSupported {
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, student);
LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, limit5);
LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student);
LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -733,11 +734,11 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2cde() {
UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0, student);
LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0, LimitPhase.ORIGIN, student);
LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan);
LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, limit5);
LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, scan);
LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit3)
.applyBottomUp(
@ -766,8 +767,8 @@ class MemoTest implements MemoPatternMatchSupported {
public void abc2bac() {
UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, student);
LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, limit5);
LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student);
LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -805,8 +806,8 @@ class MemoTest implements MemoPatternMatchSupported {
public void abc2bc() {
UnboundRelation student = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, student);
LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, limit5);
LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0, LimitPhase.ORIGIN, student);
LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -829,7 +830,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testRewriteBottomPlanToOnePlan() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, student);
LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, student);
LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
@ -848,10 +849,10 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testRewriteBottomPlanToMultiPlan() {
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, score);
LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, score);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@ -892,7 +893,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testRecomputeLogicalProperties() {
UnboundRelation unboundTable = new UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("score"));
LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0, unboundTable);
LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, unboundTable);
LogicalOlapScan boundTable = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<Plan> boundLimit = unboundLimit.withChildren(ImmutableList.of(boundTable));
@ -924,7 +925,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateRootWithChildGroupInTwoLevels() {
LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan);
LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, scan);
PlanChecker.from(connectContext, limit)
.applyBottomUp(logicalLimit().then(LogicalLimit::child))
@ -936,7 +937,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateRootWithChildPlanInTwoLevels() {
LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan);
LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, scan);
PlanChecker.from(connectContext, limit)
.applyBottomUp(logicalLimit(any()).then(LogicalLimit::child))
@ -948,7 +949,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateTwoLevelsToOnePlan() {
LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, score);
LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, score);
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
@ -968,10 +969,10 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateTwoLevelsToTwoPlans() {
LogicalOlapScan score = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, score);
LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN, score);
LogicalOlapScan student = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, student);
LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit1)
.applyBottomUp(logicalLimit(any()).when(limit1::equals).then(l -> limit10))
@ -998,7 +999,7 @@ class MemoTest implements MemoPatternMatchSupported {
public void test() {
PlanChecker.from(MemoTestUtils.createConnectContext())
.analyze(new LogicalLimit<>(10, 0,
new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
LimitPhase.ORIGIN, new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
ImmutableList.of(new EqualTo(new UnboundSlot("sid"), new UnboundSlot("id"))),
new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score),
new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student)

View File

@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
@ -385,6 +386,7 @@ public class ChildOutputPropertyDeriverTest {
public void testTopN() {
SlotReference key = new SlotReference("col1", IntegerType.INSTANCE);
List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true, true));
// localSort require any
PhysicalTopN<GroupPlan> sort = new PhysicalTopN<>(orderKeys, 10, 10, SortPhase.LOCAL_SORT, logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(sort);
PhysicalProperties child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE,
@ -394,6 +396,17 @@ public class ChildOutputPropertyDeriverTest {
ChildOutputPropertyDeriver deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child));
PhysicalProperties result = deriver.getOutputProperties(groupExpression);
Assertions.assertEquals(orderKeys, result.getOrderSpec().getOrderKeys());
Assertions.assertEquals(DistributionSpecReplicated.INSTANCE, result.getDistributionSpec());
// merge/gather sort requires gather
sort = new PhysicalTopN<>(orderKeys, 10, 10, SortPhase.MERGE_SORT, logicalProperties, groupPlan);
groupExpression = new GroupExpression(sort);
child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE,
new OrderSpec(Lists.newArrayList(
new OrderKey(new SlotReference("ignored", IntegerType.INSTANCE), true, true))));
deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child));
result = deriver.getOutputProperties(groupExpression);
Assertions.assertEquals(orderKeys, result.getOrderSpec().getOrderKeys());
Assertions.assertEquals(DistributionSpecGather.INSTANCE, result.getDistributionSpec());
}
@ -401,7 +414,7 @@ public class ChildOutputPropertyDeriverTest {
public void testLimit() {
SlotReference key = new SlotReference("col1", IntegerType.INSTANCE);
List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true, true));
PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10, logicalProperties, groupPlan);
PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10, LimitPhase.ORIGIN, logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(limit);
PhysicalProperties child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE,
new OrderSpec(orderKeys));

View File

@ -22,6 +22,7 @@ import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@ -108,7 +109,7 @@ public class ImplementationTest {
public void toPhysicalLimitTest() {
int limit = 10;
int offset = 100;
LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(limit, offset, groupPlan);
LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(limit, offset, LimitPhase.LOCAL, groupPlan);
PhysicalPlan physicalPlan = executeImplementationRule(logicalLimit);
Assertions.assertEquals(PlanType.PHYSICAL_LIMIT, physicalPlan.getType());
PhysicalLimit<GroupPlan> physicalLimit = (PhysicalLimit<GroupPlan>) physicalPlan;

View File

@ -18,12 +18,17 @@
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
import com.google.common.collect.Lists;
@ -31,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.stream.Collectors;
/**
* MergeConsecutiveFilter ut
@ -39,7 +45,7 @@ public class EliminateLimitTest {
@Test
public void testEliminateLimit() {
LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan);
LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, scan);
CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(limit);
List<Rule> rules = Lists.newArrayList(new EliminateLimit().build());
@ -48,4 +54,17 @@ public class EliminateLimitTest {
Plan actual = cascadesContext.getMemo().copyOut();
Assertions.assertTrue(actual instanceof LogicalEmptyRelation);
}
@Test
public void testLimitSort() {
LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
LogicalLimit limit = new LogicalLimit<>(1, 1, LimitPhase.ORIGIN,
new LogicalSort<>(scan.getOutput().stream().map(c -> new OrderKey(c, true, true)).collect(Collectors.toList()),
scan));
Plan actual = PlanChecker.from(MemoTestUtils.createConnectContext(), limit)
.rewrite()
.getPlan();
Assertions.assertTrue(actual instanceof LogicalTopN);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.RelationUtil;
import org.apache.doris.nereids.util.MemoTestUtils;
@ -33,10 +34,10 @@ import java.util.List;
public class MergeLimitsTest {
@Test
public void testMergeConsecutiveLimits() {
LogicalLimit limit3 = new LogicalLimit<>(3, 5, new UnboundRelation(
LogicalLimit limit3 = new LogicalLimit<>(3, 5, LimitPhase.ORIGIN, new UnboundRelation(
RelationUtil.newRelationId(), Lists.newArrayList("db", "t")));
LogicalLimit limit2 = new LogicalLimit<>(2, 0, limit3);
LogicalLimit limit1 = new LogicalLimit<>(10, 0, limit2);
LogicalLimit limit2 = new LogicalLimit<>(2, 0, LimitPhase.ORIGIN, limit3);
LogicalLimit limit1 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit2);
CascadesContext context = MemoTestUtils.createCascadesContext(limit1);
List<Rule> rules = Lists.newArrayList(new MergeLimits().build());

View File

@ -22,6 +22,7 @@ import org.apache.doris.nereids.pattern.PatternDescriptor;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@ -204,7 +205,6 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup
// plan among fragments has duplicate elements.
(s1, s2) -> s1)
);
// limit is push down to left scan of `t1`.
Assertions.assertEquals(2, nameToScan.size());
Assertions.assertEquals(5, nameToScan.get("t1").getLimit());
@ -212,6 +212,14 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup
);
}
@Test
public void testLimitPushSort() {
PlanChecker.from(connectContext)
.analyze("select k1 from t1 order by k1 limit 1")
.rewrite()
.matches(logicalTopN());
}
@Test
public void testLimitPushUnion() {
PlanChecker.from(connectContext)
@ -229,8 +237,10 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup
logicalOlapScan().when(scan -> "t2".equals(scan.getTable().getName()))
),
logicalLimit(
logicalProject(
logicalOlapScan().when(scan -> "t3".equals(scan.getTable().getName()))
logicalLimit(
logicalProject(
logicalOlapScan().when(scan -> "t3".equals(scan.getTable().getName()))
)
)
)
)
@ -261,12 +271,12 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup
if (hasProject) {
// return limit -> project -> join
return new LogicalLimit<>(10, 0, new LogicalProject<>(
return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, new LogicalProject<>(
ImmutableList.of(new UnboundSlot("sid"), new UnboundSlot("id")),
join));
} else {
// return limit -> join
return new LogicalLimit<>(10, 0, join);
return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, join);
}
}
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
import org.junit.jupiter.api.Test;
public class SplitLimitTest {
private final LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
@Test
void testSplitLimit() {
Plan plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, scan1);
plan = PlanChecker.from(MemoTestUtils.createConnectContext(), plan)
.rewrite()
.getPlan();
plan.anyMatch(x -> x instanceof LogicalLimit && ((LogicalLimit<?>) x).isSplit());
}
}

View File

@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.plans.FakePlan;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@ -279,7 +281,9 @@ public class StatsCalculatorTest {
GroupPlan groupPlan = new GroupPlan(childGroup);
childGroup.setStatistics(childStats);
LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(1, 2, groupPlan);
LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(1, 2,
LimitPhase.GLOBAL, new LogicalLimit<>(1, 2, LimitPhase.LOCAL, groupPlan)
);
GroupExpression groupExpression = new GroupExpression(logicalLimit, ImmutableList.of(childGroup));
Group ownerGroup = newGroup();
ownerGroup.addGroupExpression(groupExpression);

View File

@ -46,7 +46,7 @@ public class PlanToStringTest {
@Test
public void testLogicalLimit(@Mocked Plan child) {
LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, child);
LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, child);
Assertions.assertEquals("LogicalLimit ( limit=0, offset=0 )", plan.toString());
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@ -122,7 +123,7 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder limit(long limit, long offset) {
LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit, offset, this.plan);
LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, this.plan);
return from(limitPlan);
}