[feature](Nereids) pushdown filter through window (#18784)

Support the operator `PartitionTopN`, which partitions the input first and then performs the top-N operation within each partition. It is used in the following cases:
```
-- Support push the filter down to the window and generate the PartitionTopN.
-- The plan change from `window -> filter` to `partitionTopN -> window -> filter`.
explain select *  from (select * , row_number() over(partition by b order by a) as num from t ) tt where  num <= 10;

-- Support push the limit down to the window and generate the PartitionTopN. 
-- The plan change from `window -> limit` to `partitionTopN -> window -> limit `.
explain select row_number() over(partition by b order by a) as num from t limit 10;

-- Support push the topn down to the window and generate the PartitionTopN. 
-- The plan change from `window -> topn` to `partitionTopN -> window -> topn `.
explain select row_number() over(partition by b order by a) as num from t order by num limit 10;
```

The FE part detail design:
1. Add the following rewrite rules:
    - PUSHDOWN_FILTER_THROUGH_WINDOW
    - PUSH_LIMIT_THROUGH_PROJECT_WINDOW
    - PUSH_LIMIT_THROUGH_WINDOW
    - PUSHDOWN_TOP_N_THROUGH_PROJECTION_WINDOW
    - PUSHDOWN_TOP_N_THROUGH_WINDOW
2. Add the PartitionTopN node(LogicalPlan/ PhysicalPlan/ TranslatorPlan)
3. For the rewritten plan, several requirements must be met:
    - For the `Filter` part, only `<`, `<=` and `=` conditions are considered, and the filter conditions are kept in the plan.
    - For the `Window` part, only a single window function is supported, and it must be one of `row_number`, `rank` or `dense_rank`. The `partition by` key and the `order by` key cannot both be empty. The `Window Frame` should be `UNBOUNDED to CURRENT`.
4. For the `PhysicalPartitionTopN`, the requested property is `Any` and the output property is its child's property.

Those are the most important details. For everything else, please refer to the code directly.

Issue Number #18646

BE Part #19708
This commit is contained in:
Chengpeng Yan
2023-05-26 11:23:48 +08:00
committed by GitHub
parent 558f625d3b
commit dee9c2240f
25 changed files with 1496 additions and 1 deletions

View File

@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
@ -147,6 +148,16 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
childStatistics.getRowCount());
}
/**
 * Costs a PartitionTopN node from the row-count estimates of the node and of its only child.
 * The child's row count is passed as the first and third cost component, the node's own row
 * count as the second.
 */
@Override
public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
    Statistics ownStats = context.getStatisticsWithCheck();
    double inputRows = context.getChildStatistics(0).getRowCount();
    return CostV1.of(inputRows, ownStats.getRowCount(), inputRows);
}
@Override
public Cost visitPhysicalDistribute(
PhysicalDistribute<? extends Plan> distribute, PlanContext context) {

View File

@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
@ -156,6 +157,20 @@ class CostModelV2 extends PlanVisitor<Cost, PlanContext> {
return new CostV2(startCost, runCost, statistics.computeSize());
}
/**
 * Costs a PartitionTopN node as (child rows) * CMP_COST * ln(output rows), divided by the
 * backend number reported by the node's statistics. Start cost and run cost are the same value.
 */
@Override
public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
    Statistics statistics = context.getStatisticsWithCheck();
    Statistics childStatistics = context.getChildStatistics(0);
    // The value is arbitrary. The partitionTopN is generated in the rewrite phase,
    // and it only has one physical implementation. So this cost will not affect the result.
    // Clamp the logarithm's argument to 1: an output estimate of 0 rows would otherwise give
    // -Infinity (NaN once multiplied by a 0-row child), and any estimate below 1 a negative cost.
    double logOutputRows = Math.log(Math.max(1.0, statistics.getRowCount()));
    double runCost = childStatistics.getRowCount() * CMP_COST * logOutputRows
            / statistics.getBENumber();
    double startCost = runCost;
    return new CostV2(startCost, runCost, statistics.computeSize());
}
@Override
public Cost visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute, PlanContext context) {
Statistics childStatistics = context.getChildStatistics(0);

View File

@ -103,6 +103,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
@ -137,6 +138,7 @@ import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.NestedLoopJoinNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PartitionSortNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RepeatNode;
@ -994,6 +996,57 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return inputPlanFragment;
}
/**
 * Translates a PhysicalPartitionTopN into a legacy PartitionSortNode.
 * The child is translated first; the new node is built over the root of the child's fragment
 * and then registered on that same fragment through addPlanRoot.
 *
 * @param partitionTopN the physical node to translate
 * @param context translator context shared by the whole translation
 * @return the fragment produced for the child, which is also the fragment returned for this node
 */
@Override
public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
        PlanTranslatorContext context) {
    PlanFragment inputFragment = partitionTopN.child(0).accept(this, context);
    // NOTE(review): the former precondition compared the Nereids child plan against the legacy
    // ExchangeNode class. A Nereids plan is never an ExchangeNode, so the check could not fail
    // and has been dropped. If an exchange directly below this node must be rejected, test
    // inputFragment.getPlanRoot() instead.
    PartitionSortNode partitionSortNode = translatePartitionSortNode(partitionTopN,
            inputFragment.getPlanRoot(), context);
    addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
    return inputFragment;
}
/**
 * Builds the legacy PartitionSortNode for the given PhysicalPartitionTopN on top of childNode.
 * The SortInfo is assembled in the same way as in 'translateSortNode'.
 *
 * @param partitionTopN the physical node supplying function, keys, limits and output
 * @param childNode the already translated child plan node
 * @param context translator context used for expression translation and id generation
 * @return the new node, with cardinality set when the physical node carries statistics
 */
private PartitionSortNode translatePartitionSortNode(PhysicalPartitionTopN<? extends Plan> partitionTopN,
        PlanNode childNode, PlanTranslatorContext context) {
    // 1. Translate the order keys against the child's slots and record their direction flags.
    List<OrderKey> orderKeyList = partitionTopN.getOrderKeys();
    List<Expr> oldOrderingExprList = Lists.newArrayList();
    List<Boolean> ascOrderList = Lists.newArrayList();
    List<Boolean> nullsFirstParamList = Lists.newArrayList();
    for (OrderKey orderKey : orderKeyList) {
        oldOrderingExprList.add(ExpressionTranslator.translate(orderKey.getExpr(), context));
        ascOrderList.add(orderKey.isAsc());
        nullsFirstParamList.add(orderKey.isNullFirst());
    }

    // Translate the output slots and the partition keys before the new tuple is generated.
    List<Slot> outputList = partitionTopN.getOutput();
    List<Expr> sortTupleOutputList = new ArrayList<>();
    for (Slot outputSlot : outputList) {
        sortTupleOutputList.add(ExpressionTranslator.translate(outputSlot, context));
    }
    List<Expr> partitionExprs = partitionTopN.getPartitionKeys().stream()
            .map(partitionKey -> ExpressionTranslator.translate(partitionKey, context))
            .collect(Collectors.toList());

    // 2. Generate the new tuple; generateTupleDesc fills newOrderingExprList.
    List<Expr> newOrderingExprList = Lists.newArrayList();
    TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null);

    // 3. Fill in the SortInfo and create the node.
    SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
    PartitionSortNode sortNode = new PartitionSortNode(context.nextPlanNodeId(), childNode,
            partitionTopN.getFunction(), partitionExprs, sortInfo, partitionTopN.hasGlobalLimit(),
            partitionTopN.getPartitionLimit(), sortTupleOutputList, oldOrderingExprList);

    if (partitionTopN.getStats() != null) {
        sortNode.setCardinality((long) partitionTopN.getStats().getRowCount());
    }
    updateLegacyPlanIdToPhysicalPlan(sortNode, partitionTopN);
    return sortNode;
}
@Override
public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTranslatorContext context) {
PlanFragment inputFragment = topN.child(0).accept(this, context);

View File

@ -67,7 +67,9 @@ import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition;
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughWindow;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownTopNThroughWindow;
import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
import org.apache.doris.nereids.rules.rewrite.logical.SemiJoinCommute;
import org.apache.doris.nereids.rules.rewrite.logical.SimplifyAggGroupBy;
@ -238,6 +240,14 @@ public class NereidsRewriter extends BatchRewriteJob {
)).addAll(RuleSet.PUSH_DOWN_FILTERS).build())
),
topic("Window optimization",
topDown(
new PushdownLimit(),
new PushdownTopNThroughWindow(),
new PushdownFilterThroughWindow()
)
),
// TODO: I think these rules should be implementation rules, and generate alternative physical plans.
topic("Table/Physical optimization",
topDown(

View File

@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
@ -132,6 +133,14 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
return new PhysicalProperties(DistributionSpecGather.INSTANCE, childOutputProperty.getOrderSpec());
}
/**
 * Derives the output properties of a PartitionTopN node: the distribution spec of its single
 * child is reused, and no order spec is passed on.
 */
@Override
public PhysicalProperties visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
        PlanContext context) {
    Preconditions.checkState(childrenOutputProperties.size() == 1);
    return new PhysicalProperties(childrenOutputProperties.get(0).getDistributionSpec());
}
@Override
public PhysicalProperties visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanContext context) {
// TODO: order spec do not process since we do not use it.

View File

@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@ -115,6 +116,12 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
/**
 * Requests PhysicalProperties.ANY from the children of a PartitionTopN node.
 */
@Override
public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.ANY);
return null;
}
@Override
public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, PlanContext context) {
JoinHint hint = hashJoin.getHint();

View File

@ -55,6 +55,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit
import org.apache.doris.nereids.rules.implementation.LogicalOlapScanToPhysicalOlapScan;
import org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysicalOlapTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalOneRowRelationToPhysicalOneRowRelation;
import org.apache.doris.nereids.rules.implementation.LogicalPartitionTopNToPhysicalPartitionTopN;
import org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject;
import org.apache.doris.nereids.rules.implementation.LogicalRepeatToPhysicalRepeat;
import org.apache.doris.nereids.rules.implementation.LogicalSchemaScanToPhysicalSchemaScan;
@ -75,6 +76,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughJoin;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughRepeat;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughSetOperation;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughWindow;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownJoinOtherCondition;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownProjectThroughLimit;
@ -113,6 +115,7 @@ public class RuleSet {
new PushdownFilterThroughAggregation(),
new PushdownFilterThroughRepeat(),
new PushdownFilterThroughSetOperation(),
new PushdownFilterThroughWindow(),
new PushdownProjectThroughLimit(),
new PushdownAliasThroughJoin(),
new EliminateOuterJoin(),
@ -136,6 +139,7 @@ public class RuleSet {
.add(new LogicalWindowToPhysicalWindow())
.add(new LogicalSortToPhysicalQuickSort())
.add(new LogicalTopNToPhysicalTopN())
.add(new LogicalPartitionTopNToPhysicalPartitionTopN())
.add(new LogicalAssertNumRowsToPhysicalAssertNumRows())
.add(new LogicalOneRowRelationToPhysicalOneRowRelation())
.add(new LogicalEmptyRelationToPhysicalEmptyRelation())

View File

@ -134,11 +134,15 @@ public enum RuleType {
PUSH_FILTER_INSIDE_JOIN(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_PROJECT(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_PROJECT_UNDER_LIMIT(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_WINDOW(RuleTypeClass.REWRITE),
PUSHDOWN_PROJECT_THROUGH_LIMIT(RuleTypeClass.REWRITE),
PUSHDOWN_ALIAS_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_SET_OPERATION(RuleTypeClass.REWRITE),
COLUMN_PRUNING(RuleTypeClass.REWRITE),
PUSHDOWN_TOP_N_THROUGH_PROJECTION_WINDOW(RuleTypeClass.REWRITE),
PUSHDOWN_TOP_N_THROUGH_WINDOW(RuleTypeClass.REWRITE),
TRANSPOSE_LOGICAL_SEMI_JOIN_LOGICAL_JOIN(RuleTypeClass.REWRITE),
TRANSPOSE_LOGICAL_SEMI_JOIN_LOGICAL_JOIN_PROJECT(RuleTypeClass.REWRITE),
LOGICAL_SEMI_JOIN_COMMUTE(RuleTypeClass.REWRITE),
@ -206,7 +210,9 @@ public enum RuleType {
// limit push down
PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_PROJECT_WINDOW(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_WINDOW(RuleTypeClass.REWRITE),
PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE),
// adjust nullable
ADJUST_NULLABLE(RuleTypeClass.REWRITE),
@ -258,6 +264,7 @@ public enum RuleType {
LOGICAL_FILTER_TO_PHYSICAL_FILTER_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_EMPTY_RELATION_TO_PHYSICAL_EMPTY_RELATION_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import com.google.common.collect.ImmutableList;
import java.util.List;
/**
 * Implementation rule that converts a logical partition-top-n into a physical partition-top-n.
 * The logical node's order expressions are unwrapped into order keys; every other attribute
 * (function, partition keys, limits, logical properties and child) is carried over unchanged.
 */
public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory {
    @Override
    public Rule build() {
        return logicalPartitionTopN().then(partitionTopN -> {
            // An empty order-key list simply collects to an empty immutable list.
            List<OrderKey> orderKeys = partitionTopN.getOrderKeys().stream()
                    .map(OrderExpression::getOrderKey)
                    .collect(ImmutableList.toImmutableList());

            return new PhysicalPartitionTopN<>(
                    partitionTopN.getFunction(),
                    partitionTopN.getPartitionKeys(),
                    orderKeys,
                    partitionTopN.hasGlobalLimit(),
                    partitionTopN.getPartitionLimit(),
                    partitionTopN.getLogicalProperties(),
                    partitionTopN.child());
        }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE);
    }
}

View File

@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
/**
 * Push down the 'filter' into the 'window'.
 * It will convert the filter condition to the 'limit value' and push down below the 'window'.
 * But there are some restrictions, the details are explained below.
 * For example:
 * 'SELECT * FROM (
 *     SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS row_number
 *     FROM t
 * ) AS tt WHERE row_number <= 100;'
 * The filter 'row_number <= 100' can be pushed down into the window operator.
 * The following will demonstrate how the plan changes:
 * Logical plan tree:
 *                 any_node
 *                   |
 *                filter (row_number <= 100)
 *                   |
 *                window (PARTITION BY a ORDER BY b)
 *                   |
 *                 any_node
 * transformed to:
 *                 any_node
 *                   |
 *                filter (row_number <= 100)
 *                   |
 *                window (PARTITION BY a ORDER BY b)
 *                   |
 *                partition_topn(PARTITION BY: a, ORDER BY b, Partition Limit: 100)
 *                   |
 *                 any_node
 *
 * Restrictions checked by this rule:
 * - the window holds exactly one window expression;
 * - only conjuncts of the form 'window_slot </ <=/ = integer_literal' are used; all other
 *   conjuncts are ignored, and the filter itself always stays in the plan.
 * Whether the window function and frame are eligible is decided by
 * LogicalWindow#pushPartitionLimitThroughWindow.
 */
public class PushdownFilterThroughWindow extends OneRewriteRuleFactory {

    @Override
    public Rule build() {
        return logicalFilter(logicalWindow()).then(filter -> {
            LogicalWindow<Plan> window = filter.child();

            // We have already done such optimization rule, so just ignore it.
            if (window.child(0) instanceof LogicalPartitionTopN) {
                return filter;
            }

            List<NamedExpression> windowExprs = window.getWindowExpressions();
            if (windowExprs.size() != 1) {
                return filter;
            }
            NamedExpression windowExpr = windowExprs.get(0);
            if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) {
                return filter;
            }

            // Check the filter conditions. Now, we currently only support simple conditions of the form
            // 'column </ <=/ = constant'. We will extract some related conjuncts and do some check.
            Set<Expression> conjuncts = filter.getConjuncts();
            Set<Expression> relatedConjuncts = extractRelatedConjuncts(conjuncts, windowExpr.getExprId());

            boolean hasPartitionLimit = false;
            // Set when some conjunct only admits values below zero.
            boolean admitsOnlyNegativeValues = false;
            long partitionLimit = Long.MAX_VALUE;

            for (Expression conjunct : relatedConjuncts) {
                Preconditions.checkArgument(conjunct instanceof BinaryOperator);
                BinaryOperator op = (BinaryOperator) conjunct;
                Expression leftChild = op.children().get(0);
                Expression rightChild = op.children().get(1);
                Preconditions.checkArgument(leftChild instanceof SlotReference
                        && rightChild instanceof IntegerLikeLiteral);

                long limitVal = ((IntegerLikeLiteral) rightChild).getLongValue();
                boolean strictlyLess = conjunct instanceof LessThan;
                // 'col < c' admits values up to c - 1. Test the bound before decrementing so that
                // Long.MIN_VALUE cannot wrap around to a huge positive limit.
                if (limitVal < 0 || (strictlyLess && limitVal == 0)) {
                    admitsOnlyNegativeValues = true;
                    limitVal = 0;
                } else if (strictlyLess) {
                    limitVal--;
                }

                partitionLimit = Math.min(partitionLimit, limitVal);
                hasPartitionLimit = true;
            }

            if (!hasPartitionLimit) {
                return filter;
            }

            Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, false);
            if (!newWindow.isPresent()) {
                return filter;
            }

            // A negative upper bound can never be met by a ranking function, but it can be met by
            // other window functions (e.g. a windowed sum). The empty relation is therefore only
            // produced after the window has been accepted for the push down.
            // NOTE(review): this presumes pushPartitionLimitThroughWindow rejects everything but
            // row_number/rank/dense_rank - confirm against LogicalWindow.
            if (admitsOnlyNegativeValues) {
                return new LogicalEmptyRelation(filter.getOutput());
            }
            return filter.withChildren(newWindow.get());
        }).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_WINDOW);
    }

    /**
     * Selects the conjuncts of the form 'slot </ <=/ = integer literal' whose left-hand slot
     * carries the given expression id.
     *
     * @param conjuncts all conjuncts of the filter
     * @param slotRefID expression id of the window expression's output
     * @return the matching conjuncts as an immutable set
     */
    private Set<Expression> extractRelatedConjuncts(Set<Expression> conjuncts, ExprId slotRefID) {
        Predicate<Expression> condition = conjunct -> {
            if (!(conjunct instanceof BinaryOperator)) {
                return false;
            }
            if (!(conjunct instanceof LessThan || conjunct instanceof LessThanEqual || conjunct instanceof EqualTo)) {
                return false;
            }
            BinaryOperator op = (BinaryOperator) conjunct;
            Expression leftChild = op.children().get(0);
            Expression rightChild = op.children().get(1);

            // TODO: Now, we only support the column on the left side.
            if (!(leftChild instanceof SlotReference) || !(rightChild instanceof IntegerLikeLiteral)) {
                return false;
            }
            // Compare by value: two ExprId instances may denote the same id.
            return ((SlotReference) leftChild).getExprId().equals(slotRefID);
        };

        return conjuncts.stream()
                .filter(condition)
                .collect(ImmutableSet.toImmutableSet());
    }
}

View File

@ -31,11 +31,13 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
/**
* Rules to push {@link org.apache.doris.nereids.trees.plans.logical.LogicalLimit} down.
@ -70,6 +72,31 @@ public class PushdownLimit implements RewriteRuleFactory {
return limit.withChildren(project.withChildren(newJoin));
}).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_JOIN),
// limit -> window
logicalLimit(logicalWindow())
.then(limit -> {
LogicalWindow<Plan> window = limit.child();
long partitionLimit = limit.getLimit() + limit.getOffset();
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true);
if (!newWindow.isPresent()) {
return limit;
}
return limit.withChildren(newWindow.get());
}).toRule(RuleType.PUSH_LIMIT_THROUGH_WINDOW),
// limit -> project -> window
logicalLimit(logicalProject(logicalWindow()))
.then(limit -> {
LogicalProject<LogicalWindow<Plan>> project = limit.child();
LogicalWindow<Plan> window = project.child();
long partitionLimit = limit.getLimit() + limit.getOffset();
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true);
if (!newWindow.isPresent()) {
return limit;
}
return limit.withChildren(project.withChildren(newWindow.get()));
}).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_WINDOW),
// limit -> union
logicalLimit(logicalUnion(multi()).when(union -> union.getQualifier() == Qualifier.ALL))
.whenNot(Limit::hasValidOffset)

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
/**
 * PushdownTopNThroughWindow push down the TopN through the Window and generate the PartitionTopN.
 * Two plan shapes are handled: 'topn -> window' and 'topn -> project -> window'.
 * The push down is attempted only when the window holds exactly one window expression and the
 * TopN orders, ascending, by exactly that expression's output slot. The TopN itself stays in
 * the plan; 'limit + offset' is handed to the window as the partition limit.
 */
public class PushdownTopNThroughWindow implements RewriteRuleFactory {

    @Override
    public List<Rule> buildRules() {
        return ImmutableList.of(
            // topn -> window
            logicalTopN(logicalWindow()).then(topn -> {
                LogicalWindow<Plan> window = topn.child();
                Optional<Plan> newWindow = pushTopNThroughWindow(topn, window);
                if (!newWindow.isPresent()) {
                    return topn;
                }
                return topn.withChildren(newWindow.get());
            }).toRule(RuleType.PUSHDOWN_TOP_N_THROUGH_WINDOW),

            // topn -> projection -> window
            logicalTopN(logicalProject(logicalWindow())).then(topn -> {
                LogicalProject<LogicalWindow<Plan>> project = topn.child();
                LogicalWindow<Plan> window = project.child();
                Optional<Plan> newWindow = pushTopNThroughWindow(topn, window);
                if (!newWindow.isPresent()) {
                    return topn;
                }
                return topn.withChildren(project.withChildren(newWindow.get()));
            }).toRule(RuleType.PUSHDOWN_TOP_N_THROUGH_PROJECTION_WINDOW)
        );
    }

    /**
     * Runs the checks shared by both rules and, when they pass, asks the window to accept the
     * partition limit.
     *
     * @return the rewritten window, or empty when the TopN cannot be pushed down
     */
    private Optional<Plan> pushTopNThroughWindow(LogicalTopN<?> topn, LogicalWindow<Plan> window) {
        ExprId windowExprId = getExprID4WindowFunc(window);
        if (windowExprId == null || !checkTopN4PartitionLimitPushDown(topn, windowExprId)) {
            return Optional.empty();
        }
        long partitionLimit = saturatedSum(topn.getLimit(), topn.getOffset());
        return window.pushPartitionLimitThroughWindow(partitionLimit, true);
    }

    /**
     * Adds limit and offset; when two non-negative operands overflow, Long.MAX_VALUE is returned
     * instead of the wrapped-around negative sum.
     */
    private static long saturatedSum(long limit, long offset) {
        long sum = limit + offset;
        if (limit >= 0 && offset >= 0 && sum < 0) {
            return Long.MAX_VALUE;
        }
        return sum;
    }

    /**
     * Returns the expression id of the window's single window expression, or null when the
     * window does not hold exactly one expression wrapping a WindowExpression.
     */
    private ExprId getExprID4WindowFunc(LogicalWindow<?> window) {
        List<NamedExpression> windowExprs = window.getWindowExpressions();
        if (windowExprs.size() != 1) {
            return null;
        }
        NamedExpression windowExpr = windowExprs.get(0);
        if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) {
            return null;
        }
        return windowExpr.getExprId();
    }

    /**
     * Checks that the TopN has exactly one order key, that it is ascending, and that it is a
     * slot reference carrying the given expression id.
     */
    private boolean checkTopN4PartitionLimitPushDown(LogicalTopN<?> topn, ExprId slotRefID) {
        List<OrderKey> orderKeys = topn.getOrderKeys();
        if (orderKeys.size() != 1) {
            return false;
        }
        OrderKey orderkey = orderKeys.get(0);
        if (!orderkey.isAsc()) {
            return false;
        }
        Expression orderKeyExpr = orderkey.getExpr();
        if (!(orderKeyExpr instanceof SlotReference)) {
            return false;
        }
        // Compare by value: two ExprId instances may denote the same id.
        return ((SlotReference) orderKeyExpr).getExprId().equals(slotRefID);
    }
}

View File

@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Generate;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.algebra.Repeat;
import org.apache.doris.nereids.trees.plans.algebra.Scan;
@ -58,6 +59,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
@ -83,6 +85,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
@ -117,6 +120,7 @@ import java.util.stream.Collectors;
*/
public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
public static double DEFAULT_AGGREGATE_RATIO = 0.5;
public static double DEFAULT_COLUMN_NDV_RATIO = 0.5;
private final GroupExpression groupExpression;
private boolean forbidUnknownColStats = false;
@ -276,6 +280,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return computeTopN(topN);
}
/**
 * Estimates statistics for a logical PartitionTopN by delegating to computePartitionTopN.
 */
@Override
public Statistics visitLogicalPartitionTopN(LogicalPartitionTopN<? extends Plan> partitionTopN, Void context) {
return computePartitionTopN(partitionTopN);
}
@Override
public Statistics visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, Void context) {
return JoinEstimation.estimate(groupExpression.childStatistics(0),
@ -325,6 +334,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return computeWindow(window);
}
/**
 * Estimates the statistics of a physical PartitionTopN node by delegating to
 * {@code computePartitionTopN}; the visitor context is not used.
 *
 * <p>The parameter is declared with a wildcard child type (not as a raw type) so that the
 * signature matches the one declared by {@code PlanVisitor}.
 */
@Override
public Statistics visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, Void context) {
    return computePartitionTopN(partitionTopN);
}
@Override
public Statistics visitPhysicalEmptyRelation(PhysicalEmptyRelation emptyRelation, Void context) {
return computeEmptyRelation(emptyRelation);
@ -545,6 +559,35 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return stats.withRowCount(Math.min(stats.getRowCount(), topN.getLimit()));
}
/**
 * Estimates the output row count of a PartitionTopN node from the statistics of its only child.
 *
 * <p>Without a global limit and with at least one partition key, every partition may emit up to
 * {@code partitionLimit} rows, so the estimate is {@code NDV(partition key) * partitionLimit},
 * capped by the child row count. The number of partitions is approximated by the largest NDV among
 * the partition keys whose column statistics are known; when none is known the child row count is
 * scaled by {@link #DEFAULT_COLUMN_NDV_RATIO}. In every other case the estimate is
 * {@code min(child row count, partitionLimit)}.
 *
 * @param partitionTopN the logical or physical PartitionTopN node being estimated
 * @return the child statistics with only the row count replaced
 */
private Statistics computePartitionTopN(PartitionTopN partitionTopN) {
    Statistics stats = groupExpression.childStatistics(0);
    double rowCount = stats.getRowCount();
    List<Expression> partitionKeys = partitionTopN.getPartitionKeys();
    if (!partitionTopN.hasGlobalLimit() && !partitionKeys.isEmpty()) {
        // If there is no global limit. So result for the cardinality estimation is:
        // NDV(partition key) * partitionLimit
        Map<Expression, ColumnStatistic> childSlotToColumnStats = stats.columnStatistics();
        List<ColumnStatistic> partitionByKeyStats = partitionKeys.stream()
                .filter(childSlotToColumnStats::containsKey)
                .map(childSlotToColumnStats::get)
                .filter(s -> !s.isUnKnown)
                .collect(Collectors.toList());
        if (partitionByKeyStats.isEmpty()) {
            // all column stats are unknown, use default ratio
            rowCount = rowCount * DEFAULT_COLUMN_NDV_RATIO;
        } else {
            // The list is non-empty here, so the max() Optional always holds a value.
            double partitionCount = partitionByKeyStats.stream().map(s -> s.ndv)
                    .max(Double::compare).get();
            // Each partition keeps at most partitionLimit rows; never exceed the input row count.
            rowCount = Math.min(rowCount, partitionCount * partitionTopN.getPartitionLimit());
        }
    } else {
        rowCount = Math.min(rowCount, partitionTopN.getPartitionLimit());
    }
    // TODO: for the filter push down window situation, we will prune the row count twice
    //   because we keep the pushed down filter. And it will be calculated twice, one of them in 'PartitionTopN'
    //   and the other is in 'Filter'. It's hard to dismiss.
    return stats.updateRowCountOnly(rowCount);
}
private Statistics computeLimit(Limit limit) {
Statistics stats = groupExpression.childStatistics(0);
return stats.withRowCount(Math.min(stats.getRowCount(), limit.getLimit()));

View File

@ -43,6 +43,7 @@ public enum PlanType {
LOGICAL_REPEAT,
LOGICAL_SORT,
LOGICAL_TOP_N,
LOGICAL_PARTITION_TOP_N,
LOGICAL_LIMIT,
LOGICAL_OLAP_SCAN,
LOGICAL_SCHEMA_SCAN,
@ -79,6 +80,7 @@ public enum PlanType {
PHYSICAL_REPEAT,
PHYSICAL_QUICK_SORT,
PHYSICAL_TOP_N,
PHYSICAL_PARTITION_TOP_N,
PHYSICAL_LOCAL_QUICK_SORT,
PHYSICAL_LIMIT,
PHYSICAL_HASH_JOIN,

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.algebra;
import org.apache.doris.nereids.trees.expressions.Expression;
import java.util.List;
/**
* Common interface for logical/physical PartitionTopN.
* It exposes the properties that are shared by both plan flavours, e.g. for cardinality estimation.
*/
public interface PartitionTopN {
/** Returns the row limit that is applied inside each partition. */
long getPartitionLimit();
/**
* Returns the global-limit flag this node was created with.
* NOTE(review): presumably set when the node is derived from a LIMIT / TopN rather than a filter - confirm
* against the rewrite rules.
*/
boolean hasGlobalLimit();
/** Returns the expressions the input is partitioned by; the list may be empty. */
List<Expression> getPartitionKeys();
}

View File

@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.types.WindowFuncType;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Logical partition-top-N plan.
 *
 * <p>The node keeps the output of its child unchanged and records the ranking function, the partition
 * keys, the order keys and the limits that describe the per-partition top-N.
 */
public class LogicalPartitionTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements PartitionTopN {
    private final WindowFuncType function;
    private final List<Expression> partitionKeys;
    private final List<OrderExpression> orderKeys;
    private final boolean hasGlobalLimit;
    private final long partitionLimit;

    /**
     * Builds the node from a window expression: its function, partition keys and order keys are reused.
     *
     * @throws IllegalArgumentException if the window function is not row_number(), rank() or dense_rank()
     */
    public LogicalPartitionTopN(WindowExpression windowExpr, boolean hasGlobalLimit, long partitionLimit,
            CHILD_TYPE child) {
        this(windowExpr.getFunction(), windowExpr.getPartitionKeys(), windowExpr.getOrderKeys(),
                hasGlobalLimit, partitionLimit, Optional.empty(),
                Optional.empty(), child);
    }

    /**
     * Builds the node from an already resolved function type, without group expression and
     * without precomputed logical properties.
     */
    public LogicalPartitionTopN(WindowFuncType windowFuncType, List<Expression> partitionKeys,
            List<OrderExpression> orderKeys, boolean hasGlobalLimit, long partitionLimit,
            CHILD_TYPE child) {
        this(windowFuncType, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
                Optional.empty(), Optional.empty(), child);
    }

    /**
     * Constructor for LogicalPartitionTopN.
     *
     * @throws NullPointerException if {@code partitionKeys} or {@code orderKeys} is null
     */
    public LogicalPartitionTopN(WindowFuncType windowFuncType, List<Expression> partitionKeys,
            List<OrderExpression> orderKeys, boolean hasGlobalLimit,
            long partitionLimit, Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
        super(PlanType.LOGICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
        this.function = windowFuncType;
        this.partitionKeys = ImmutableList.copyOf(Objects.requireNonNull(partitionKeys,
                "partitionKeys can not be null"));
        this.orderKeys = ImmutableList.copyOf(Objects.requireNonNull(orderKeys,
                "orderKeys can not be null"));
        this.hasGlobalLimit = hasGlobalLimit;
        this.partitionLimit = partitionLimit;
    }

    /**
     * Constructor for LogicalPartitionTopN that resolves the function type from a window function
     * expression and then behaves exactly like the {@link WindowFuncType} based constructor.
     *
     * @throws IllegalArgumentException if {@code expr} is not row_number(), rank() or dense_rank()
     * @throws NullPointerException if {@code partitionKeys} or {@code orderKeys} is null
     */
    public LogicalPartitionTopN(Expression expr, List<Expression> partitionKeys, List<OrderExpression> orderKeys,
            boolean hasGlobalLimit, long partitionLimit, Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
        this(toWindowFuncType(expr), partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
                groupExpression, logicalProperties, child);
    }

    /**
     * Maps a window function expression to its {@link WindowFuncType}.
     * The function is recognised by the textual form returned by {@code expr.toString()}.
     */
    private static WindowFuncType toWindowFuncType(Expression expr) {
        String funcName = expr.toString();
        switch (funcName) {
            case "row_number()":
                return WindowFuncType.ROW_NUMBER;
            case "rank()":
                return WindowFuncType.RANK;
            case "dense_rank()":
                return WindowFuncType.DENSE_RANK;
            default:
                throw new IllegalArgumentException(
                        "Unsupported window function for PartitionTopN: " + funcName);
        }
    }

    /** The node does not add or remove columns: its output is the output of its child. */
    @Override
    public List<Slot> computeOutput() {
        return child().getOutput();
    }

    public WindowFuncType getFunction() {
        return function;
    }

    @Override
    public List<Expression> getPartitionKeys() {
        return partitionKeys;
    }

    public List<OrderExpression> getOrderKeys() {
        return orderKeys;
    }

    @Override
    public boolean hasGlobalLimit() {
        return hasGlobalLimit;
    }

    @Override
    public long getPartitionLimit() {
        return partitionLimit;
    }

    @Override
    public String toString() {
        return Utils.toSqlString("LogicalPartitionTopN",
            "function", function,
            "partitionKeys", partitionKeys,
            "orderKeys", orderKeys,
            "hasGlobalLimit", hasGlobalLimit,
            "partitionLimit", partitionLimit
        );
    }

    /** Two nodes are equal when function, partition keys, order keys and both limits are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        LogicalPartitionTopN<?> that = (LogicalPartitionTopN<?>) o;
        return Objects.equals(this.function, that.function) && Objects.equals(this.partitionKeys, that.partitionKeys)
            && Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit
            && this.partitionLimit == that.partitionLimit;
    }

    @Override
    public int hashCode() {
        return Objects.hash(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitLogicalPartitionTopN(this, context);
    }

    /** Returns the partition keys followed by the order keys. */
    @Override
    public List<? extends Expression> getExpressions() {
        return new ImmutableList.Builder<Expression>()
            .addAll(partitionKeys)
            .addAll(orderKeys)
            .build();
    }

    /** Copies this node onto a new single child; group expression and logical properties are reset. */
    @Override
    public LogicalPartitionTopN<Plan> withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1);
        return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            children.get(0));
    }

    @Override
    public LogicalPartitionTopN<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
        return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            groupExpression, Optional.of(getLogicalProperties()), child());
    }

    @Override
    public LogicalPartitionTopN<Plan> withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
        return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            Optional.empty(), logicalProperties, child());
    }
}

View File

@ -22,11 +22,17 @@ import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.window.DenseRank;
import org.apache.doris.nereids.trees.expressions.functions.window.Rank;
import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Window;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -159,4 +165,62 @@ public class LogicalWindow<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_T
public int hashCode() {
// Combine the window expressions and the isChecked flag into the hash.
return Objects.hash(windowExpressions, isChecked);
}
/**
 * Pushes a partition limit through this window by inserting a {@link LogicalPartitionTopN} between
 * the window and its current child.
 *
 * <p>The rewrite is skipped, and {@code Optional.empty()} is returned, when
 * <ul>
 *   <li>no connect context is available or the session variable behind
 *       {@code isEnablePartitionTopN()} is off,</li>
 *   <li>the child already is a {@link LogicalPartitionTopN},</li>
 *   <li>the window does not consist of exactly one window expression,</li>
 *   <li>the window function is not one of 'row_number()', 'rank()', 'dense_rank()',</li>
 *   <li>both the 'PARTITION' keys and the 'ORDER' keys are empty, or</li>
 *   <li>the window frame is missing or is not 'UNBOUNDED PRECEDING' to 'CURRENT ROW'.</li>
 * </ul>
 * Callers therefore have to check the returned {@link Optional}; this method never returns null.
 *
 * @param partitionLimit the limit to apply inside each partition
 * @param hasGlobalLimit the global-limit flag handed to the created {@link LogicalPartitionTopN}
 * @return the window on top of the new partition-top-N, or {@code Optional.empty()} if not applicable
 */
public Optional<Plan> pushPartitionLimitThroughWindow(long partitionLimit, boolean hasGlobalLimit) {
    // Without a connect context there is no session variable to consult: treat the feature as disabled.
    ConnectContext connectContext = ConnectContext.get();
    if (connectContext == null || !connectContext.getSessionVariable().isEnablePartitionTopN()) {
        return Optional.empty();
    }
    // We have already done such optimization rule, so just ignore it.
    if (child(0) instanceof LogicalPartitionTopN) {
        return Optional.empty();
    }

    // Check the window function. There are some restrictions for window function:
    // 1. The number of window function should be 1.
    // 2. The window function should be one of the 'row_number()', 'rank()', 'dense_rank()'.
    // 3. The window frame should be 'UNBOUNDED' to 'CURRENT'.
    // 4. The 'PARTITION' key and 'ORDER' key can not be empty at the same time.
    if (windowExpressions.size() != 1) {
        return Optional.empty();
    }
    NamedExpression windowExpr = windowExpressions.get(0);
    if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) {
        return Optional.empty();
    }
    WindowExpression windowFunc = (WindowExpression) windowExpr.child(0);

    // Check the window function name.
    if (!(windowFunc.getFunction() instanceof RowNumber
            || windowFunc.getFunction() instanceof Rank
            || windowFunc.getFunction() instanceof DenseRank)) {
        return Optional.empty();
    }

    // Check the partition key and order key.
    if (windowFunc.getPartitionKeys().isEmpty() && windowFunc.getOrderKeys().isEmpty()) {
        return Optional.empty();
    }

    // Check the window type and window frame: an explicit frame is required.
    Optional<WindowFrame> windowFrame = windowFunc.getWindowFrame();
    if (!windowFrame.isPresent()) {
        return Optional.empty();
    }
    WindowFrame frame = windowFrame.get();
    if (frame.getLeftBoundary().getFrameBoundType() != WindowFrame.FrameBoundType.UNBOUNDED_PRECEDING
            || frame.getRightBoundary().getFrameBoundType() != WindowFrame.FrameBoundType.CURRENT_ROW) {
        return Optional.empty();
    }

    LogicalWindow<?> window = (LogicalWindow<?>) withChildren(new LogicalPartitionTopN<>(windowFunc, hasGlobalLimit,
            partitionLimit, child(0)));
    return Optional.ofNullable(window);
}
}

View File

@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.types.WindowFuncType;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Physical partition-top-N plan.
 *
 * <p>Carries the ranking function, the partition keys, the order keys and the limits of a
 * per-partition top-N. The key lists are stored as immutable copies.
 */
public class PhysicalPartitionTopN<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> implements PartitionTopN {
    private final WindowFuncType function;
    private final List<Expression> partitionKeys;
    private final List<OrderKey> orderKeys;
    // Stored as a primitive so that equals() compares the value, not the identity of a boxed Boolean.
    private final boolean hasGlobalLimit;
    private final long partitionLimit;

    /**
     * Constructor of PhysicalPartitionTopN without a group expression.
     */
    public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
            Boolean hasGlobalLimit, long partitionLimit,
            LogicalProperties logicalProperties, CHILD_TYPE child) {
        this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
                Optional.empty(), logicalProperties, child);
    }

    /**
     * Constructor of PhysicalPartitionTopN.
     *
     * @throws NullPointerException if {@code partitionKeys}, {@code orderKeys} or {@code hasGlobalLimit} is null
     */
    public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
            Boolean hasGlobalLimit, long partitionLimit,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
            CHILD_TYPE child) {
        super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child);
        this.function = function;
        this.partitionKeys = ImmutableList.copyOf(partitionKeys);
        this.orderKeys = ImmutableList.copyOf(orderKeys);
        this.hasGlobalLimit = hasGlobalLimit;
        this.partitionLimit = partitionLimit;
    }

    /**
     * Constructor of PhysicalPartitionTopN that also carries physical properties and statistics.
     *
     * @throws NullPointerException if {@code partitionKeys}, {@code orderKeys} or {@code hasGlobalLimit} is null
     */
    public PhysicalPartitionTopN(WindowFuncType function, List<Expression> partitionKeys, List<OrderKey> orderKeys,
            Boolean hasGlobalLimit, long partitionLimit,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
            PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
        super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties,
                statistics, child);
        this.function = function;
        // Copy defensively, like the other constructor; copying an ImmutableList is a no-op.
        this.partitionKeys = ImmutableList.copyOf(partitionKeys);
        this.orderKeys = ImmutableList.copyOf(orderKeys);
        this.hasGlobalLimit = hasGlobalLimit;
        this.partitionLimit = partitionLimit;
    }

    public WindowFuncType getFunction() {
        return function;
    }

    @Override
    public List<Expression> getPartitionKeys() {
        return partitionKeys;
    }

    public List<OrderKey> getOrderKeys() {
        return orderKeys;
    }

    @Override
    public boolean hasGlobalLimit() {
        return hasGlobalLimit;
    }

    @Override
    public long getPartitionLimit() {
        return partitionLimit;
    }

    /**
     * Two nodes are equal when the super class considers them equal and function, partition keys,
     * order keys and both limits are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        PhysicalPartitionTopN<?> that = (PhysicalPartitionTopN<?>) o;
        return Objects.equals(this.function, that.function)
            && Objects.equals(this.partitionKeys, that.partitionKeys)
            && Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit
            && this.partitionLimit == that.partitionLimit;
    }

    @Override
    public int hashCode() {
        return Objects.hash(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalPartitionTopN(this, context);
    }

    /** Returns the partition keys followed by the expressions of the order keys. */
    @Override
    public List<? extends Expression> getExpressions() {
        return new ImmutableList.Builder<Expression>()
            .addAll(partitionKeys)
            .addAll(orderKeys.stream()
                .map(OrderKey::getExpr)
                .collect(Collectors.toList()))
            .build();
    }

    /** Copies this node onto a new single child, keeping the current logical properties. */
    @Override
    public PhysicalPartitionTopN<Plan> withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1);
        return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            getLogicalProperties(), children.get(0));
    }

    @Override
    public PhysicalPartitionTopN<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
        return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            groupExpression, getLogicalProperties(), child());
    }

    @Override
    public PhysicalPartitionTopN<CHILD_TYPE> withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
        return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            Optional.empty(), logicalProperties.get(), child());
    }

    @Override
    public PhysicalPartitionTopN<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
                                                                   Statistics statistics) {
        return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit,
            groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
    }

    @Override
    public String toString() {
        return Utils.toSqlString("PhysicalPartitionTopN[" + id.asInt() + "]" + getGroupIdAsString(),
            "function", function,
            "partitionKeys", partitionKeys,
            "orderKeys", orderKeys,
            "hasGlobalLimit", hasGlobalLimit,
            "partitionLimit", partitionLimit
        );
    }
}

View File

@ -46,6 +46,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
@ -77,6 +78,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
@ -216,6 +218,10 @@ public abstract class PlanVisitor<R, C> {
return visit(topN, context);
}
/** Visits a logical PartitionTopN; the default implementation falls back to {@code visit}. */
public R visitLogicalPartitionTopN(LogicalPartitionTopN<? extends Plan> partitionTopN, C context) {
return visit(partitionTopN, context);
}
/** Visits a logical limit; the default implementation falls back to {@code visit}. */
public R visitLogicalLimit(LogicalLimit<? extends Plan> limit, C context) {
return visit(limit, context);
}
@ -336,6 +342,10 @@ public abstract class PlanVisitor<R, C> {
return visitAbstractPhysicalSort(topN, context);
}
/** Visits a physical PartitionTopN; the default implementation falls back to {@code visit}. */
public R visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, C context) {
return visit(partitionTopN, context);
}
/** Visits a physical limit; the default implementation falls back to {@code visit}. */
public R visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, C context) {
return visit(limit, context);
}

View File

@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.types;
/**
* WindowFuncType represents the window function which is used in the PartitionTopN.
*/
public enum WindowFuncType {
// The row_number() window function.
ROW_NUMBER,
// The rank() window function.
RANK,
// The dense_rank() window function.
DENSE_RANK,
}

View File

@ -0,0 +1,222 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.nereids.types.WindowFuncType;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionSortNode;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TSortInfo;
import org.apache.doris.thrift.TopNAlgorithm;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* PartitionSortNode.
* PartitionSortNode is only used in the Nereids.
*/
public class PartitionSortNode extends PlanNode {
private static final Logger LOG = LogManager.getLogger(PartitionSortNode.class);
private List<Expr> resolvedTupleExprs;
private final WindowFuncType function;
private final List<Expr> partitionExprs;
private final SortInfo info;
private final boolean hasGlobalLimit;
private final long partitionLimit;
private boolean isUnusedExprRemoved = false;
private ArrayList<Boolean> nullabilityChangedFlags = Lists.newArrayList();
/**
* Constructor.
*/
public PartitionSortNode(PlanNodeId id, PlanNode input, WindowFuncType function, List<Expr> partitionExprs,
SortInfo info, boolean hasGlobalLimit, long partitionLimit,
List<Expr> outputList, List<Expr> orderingExpr) {
super(id, "PartitionTopN", StatisticalType.PARTITION_TOPN_NODE);
this.function = function;
this.partitionExprs = partitionExprs;
this.info = info;
this.hasGlobalLimit = hasGlobalLimit;
this.partitionLimit = partitionLimit;
this.tupleIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
this.nullableTupleIds.addAll(input.getNullableTupleIds());
this.children.add(input);
List<Expr> resolvedTupleExprs = new ArrayList<>();
for (Expr order : orderingExpr) {
if (!resolvedTupleExprs.contains(order)) {
resolvedTupleExprs.add(order);
}
}
for (Expr output : outputList) {
if (!resolvedTupleExprs.contains(output)) {
resolvedTupleExprs.add(output);
}
}
this.resolvedTupleExprs = ImmutableList.copyOf(resolvedTupleExprs);
info.setSortTupleSlotExprs(resolvedTupleExprs);
nullabilityChangedFlags.clear();
for (int i = 0; i < resolvedTupleExprs.size(); i++) {
nullabilityChangedFlags.add(false);
}
Preconditions.checkArgument(info.getOrderingExprs().size() == info.getIsAscOrder().size());
}
public SortInfo getSortInfo() {
return info;
}
@Override
public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
super.getMaterializedIds(analyzer, ids);
Expr.getIds(info.getOrderingExprs(), null, ids);
}
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
if (detailLevel == TExplainLevel.BRIEF) {
return "";
}
StringBuilder output = new StringBuilder();
// Add the function name.
String funcName;
if (function == WindowFuncType.ROW_NUMBER) {
funcName = "row_number";
} else if (function == WindowFuncType.RANK) {
funcName = "rank";
} else {
funcName = "dense_rank";
}
output.append(prefix).append("functions: ").append(funcName).append("\n");
// Add the partition expr.
List<String> strings = Lists.newArrayList();
if (!partitionExprs.isEmpty()) {
output.append(prefix).append("partition by: ");
for (Expr partitionExpr : partitionExprs) {
strings.add(partitionExpr.toSql());
}
output.append(Joiner.on(", ").join(strings));
output.append("\n");
}
// Add the order by.
output.append(prefix).append("order by: ");
Iterator<Expr> expr = info.getOrderingExprs().iterator();
Iterator<Boolean> isAsc = info.getIsAscOrder().iterator();
boolean start = true;
while (expr.hasNext()) {
if (start) {
start = false;
} else {
output.append(", ");
}
output.append(expr.next().toSql()).append(" ");
output.append(isAsc.next() ? "ASC" : "DESC");
}
output.append("\n");
// Add the limit information;
output.append(prefix).append("has global limit: ").append(hasGlobalLimit).append("\n");
output.append(prefix).append("partition limit: ").append(partitionLimit).append("\n");
return output.toString();
}
private void removeUnusedExprs() {
if (!isUnusedExprRemoved) {
if (resolvedTupleExprs != null) {
List<SlotDescriptor> slotDescriptorList = this.info.getSortTupleDescriptor().getSlots();
for (int i = slotDescriptorList.size() - 1; i >= 0; i--) {
if (!slotDescriptorList.get(i).isMaterialized()) {
resolvedTupleExprs.remove(i);
nullabilityChangedFlags.remove(i);
}
}
}
isUnusedExprRemoved = true;
}
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.PARTITION_SORT_NODE;
TSortInfo sortInfo = info.toThrift();
Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for tupleIds in PartitionSortNode");
removeUnusedExprs();
if (resolvedTupleExprs != null) {
sortInfo.setSortTupleSlotExprs(Expr.treesToThrift(resolvedTupleExprs));
// FIXME this is a bottom line solution for wrong nullability of resolvedTupleExprs
// remove the following line after nereids online
sortInfo.setSlotExprsNullabilityChangedFlags(nullabilityChangedFlags);
}
TopNAlgorithm topNAlgorithm;
if (function == WindowFuncType.ROW_NUMBER) {
topNAlgorithm = TopNAlgorithm.ROW_NUMBER;
} else if (function == WindowFuncType.RANK) {
topNAlgorithm = TopNAlgorithm.RANK;
} else {
topNAlgorithm = TopNAlgorithm.DENSE_RANK;
}
TPartitionSortNode partitionSortNode = new TPartitionSortNode();
partitionSortNode.setTopNAlgorithm(topNAlgorithm);
partitionSortNode.setPartitionExprs(Expr.treesToThrift(partitionExprs));
partitionSortNode.setSortInfo(sortInfo);
partitionSortNode.setHasGlobalLimit(hasGlobalLimit);
partitionSortNode.setPartitionInnerLimit(partitionLimit);
msg.partition_sort_node = partitionSortNode;
}
@Override
public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException {
removeUnusedExprs();
List<Expr> materializedTupleExprs = new ArrayList<>(resolvedTupleExprs);
List<SlotId> result = Lists.newArrayList();
Expr.getIds(materializedTupleExprs, null, result);
return new HashSet<>(result);
}
}

View File

@ -173,6 +173,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree";
public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
public static final String ENABLE_INFER_PREDICATE = "enable_infer_predicate";
public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
@ -639,6 +641,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_BUSHY_TREE, needForward = true)
private boolean enableBushyTree = false;
// Session switch exposed as 'enable_partition_topn'; disabled by default.
@VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN)
private boolean enablePartitionTopN = false;
@VariableMgr.VarAttr(name = ENABLE_INFER_PREDICATE)
private boolean enableInferPredicate = true;
@ -1641,6 +1646,14 @@ public class SessionVariable implements Serializable, Writable {
this.enableBushyTree = enableBushyTree;
}
/** Returns the current value of the 'enable_partition_topn' session variable. */
public boolean isEnablePartitionTopN() {
return enablePartitionTopN;
}
/** Sets the value of the 'enable_partition_topn' session variable. */
public void setEnablePartitionTopN(boolean enablePartitionTopN) {
this.enablePartitionTopN = enablePartitionTopN;
}
/**
 * Reports the current value of the {@code returnObjectDataAsBinary} flag.
 *
 * @return the stored flag value
 */
public boolean isReturnObjectDataAsBinary() {
    return this.returnObjectDataAsBinary;
}

View File

@ -38,6 +38,7 @@ public enum StatisticalType {
MYSQL_SCAN_NODE,
ODBC_SCAN_NODE,
OLAP_SCAN_NODE,
PARTITION_TOPN_NODE,
REPEAT_NODE,
SELECT_NODE,
SET_OPERATION_NODE,

View File

@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.types.WindowFuncType;
import org.apache.doris.nereids.util.LogicalPlanBuilder;
import org.apache.doris.nereids.util.MemoPatternMatchSupported;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
 * Verifies that {@code PushdownFilterThroughWindow} places a partition top-N node beneath a window
 * when the filter above that window bounds the window function's result.
 */
public class PushdownFilterThroughWindowTest implements MemoPatternMatchSupported {
    private final LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student,
            ImmutableList.of(""));

    /*-
     * Plan handed to the rule:
     *  project
     *    |
     *  filter row_number <= 100
     *    |
     *  window(ROW_NUMBER() as row_number PARTITION BY gender ORDER BY age)
     *    |
     *  scan(student)
     *
     * Plan expected afterwards (the filter is kept, a partitionTopN appears above the scan):
     *  project
     *    |
     *  filter row_number <= 100
     *    |
     *  window(ROW_NUMBER() as row_number PARTITION BY gender ORDER BY age)
     *    |
     *  partitionTopN(row_number(), partition by gender, order by age, hasGlobalLimit: false, partitionLimit: 100)
     *    |
     *  scan(student)
     */
    @Test
    public void pushDownFilterThroughWindowTest() {
        // The test switches the session flag on before applying the rule.
        ConnectContext connectContext = MemoTestUtils.createConnectContext();
        connectContext.getSessionVariable().setEnablePartitionTopN(true);

        // Scan outputs 1 and 3 serve as the partition key (gender) and the order key (age).
        NamedExpression genderSlot = scan.getOutput().get(1).toSlot();
        NamedExpression ageSlot = scan.getOutput().get(3).toSlot();
        List<Expression> expectedPartitionKeys = ImmutableList.of(genderSlot);
        List<OrderExpression> expectedOrderKeys = ImmutableList.of(
                new OrderExpression(new OrderKey(ageSlot, true, true)));

        WindowFrame frame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
                WindowFrame.FrameBoundary.newPrecedingBoundary(),
                WindowFrame.FrameBoundary.newCurrentRowBoundary());
        WindowExpression rowNumberOverWindow = new WindowExpression(
                new RowNumber(), expectedPartitionKeys, expectedOrderKeys, frame);
        Alias rowNumberAlias = new Alias(rowNumberOverWindow, rowNumberOverWindow.toSql());
        List<NamedExpression> windowOutputs = Lists.newArrayList(rowNumberAlias);
        LogicalWindow<LogicalOlapScan> windowPlan = new LogicalWindow<>(windowOutputs, scan);

        // NOTE(review): index 4 is expected to be the window alias, after the scan's own columns -- confirm.
        Expression rowNumberBound = new LessThanEqual(windowPlan.getOutput().get(4).toSlot(), Literal.of(100));

        LogicalPlan plan = new LogicalPlanBuilder(windowPlan)
                .filter(rowNumberBound)
                .project(ImmutableList.of(0))
                .build();

        PlanChecker.from(connectContext, plan)
                .applyTopDown(new PushdownFilterThroughWindow())
                .matches(
                    logicalProject(
                        logicalFilter(
                            logicalWindow(
                                logicalPartitionTopN(
                                    logicalOlapScan()
                                ).when(partitionTopN ->
                                    partitionTopN.getFunction() == WindowFuncType.ROW_NUMBER
                                        && partitionTopN.getPartitionKeys().equals(expectedPartitionKeys)
                                        && partitionTopN.getOrderKeys().equals(expectedOrderKeys)
                                        && !partitionTopN.hasGlobalLimit()
                                        && partitionTopN.getPartitionLimit() == 100
                                )
                            )
                        ).when(keptFilter -> keptFilter.getConjuncts().equals(ImmutableSet.of(rowNumberBound)))
                    )
                );
    }
}

View File

@ -19,15 +19,28 @@ package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.pattern.PatternDescriptor;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.window.Rank;
import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.types.WindowFuncType;
import org.apache.doris.nereids.util.LogicalPlanBuilder;
import org.apache.doris.nereids.util.MemoPatternMatchSupported;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
@ -35,9 +48,11 @@ import org.apache.doris.nereids.util.PlanConstructor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -49,7 +64,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSupported {
private Plan scanScore = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
private final LogicalOlapScan scanScore = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
private Plan scanStudent = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
@Override
@ -246,6 +261,91 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup
);
}
/**
 * A limit above a ROW_NUMBER window with no partition keys: after the full rewrite the plan must
 * match limit -> window -> partitionTopN -> scan, with the limit value carried into the
 * partitionTopN as its partition limit and the global-limit flag set.
 */
@Test
public void testLimitPushWindow() {
    // The test switches the session flag on before running the rewrite.
    ConnectContext connectContext = MemoTestUtils.createConnectContext();
    connectContext.getSessionVariable().setEnablePartitionTopN(true);

    NamedExpression gradeSlot = scanScore.getOutput().get(2).toSlot();
    List<Expression> expectedPartitionKeys = ImmutableList.of();
    List<OrderExpression> expectedOrderKeys = ImmutableList.of(
            new OrderExpression(new OrderKey(gradeSlot, true, true)));

    WindowFrame frame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
            WindowFrame.FrameBoundary.newPrecedingBoundary(),
            WindowFrame.FrameBoundary.newCurrentRowBoundary());
    WindowExpression rowNumberOverWindow = new WindowExpression(
            new RowNumber(), expectedPartitionKeys, expectedOrderKeys, frame);
    Alias rowNumberAlias = new Alias(rowNumberOverWindow, rowNumberOverWindow.toSql());
    List<NamedExpression> windowOutputs = Lists.newArrayList(rowNumberAlias);
    LogicalWindow<LogicalOlapScan> windowPlan = new LogicalWindow<>(windowOutputs, scanScore);

    LogicalPlan plan = new LogicalPlanBuilder(windowPlan)
            .limit(100)
            .build();

    PlanChecker.from(connectContext, plan)
            .rewrite()
            .matches(
                logicalLimit(
                    logicalWindow(
                        logicalPartitionTopN(
                            logicalOlapScan()
                        ).when(partitionTopN ->
                            partitionTopN.getFunction() == WindowFuncType.ROW_NUMBER
                                && partitionTopN.getPartitionKeys().equals(expectedPartitionKeys)
                                && partitionTopN.getOrderKeys().equals(expectedOrderKeys)
                                && partitionTopN.hasGlobalLimit()
                                && partitionTopN.getPartitionLimit() == 100
                        )
                    )
                ).when(keptLimit -> keptLimit.getLimit() == 100)
            );
}
/**
 * A sort + limit above a RANK window with no partition keys: after the full rewrite the plan must
 * match topN -> window -> partitionTopN -> scan, where the partitionTopN uses the RANK function,
 * has the global-limit flag set and carries the limit value (100) as its partition limit.
 */
@Test
public void testTopNPushWindow() {
    // The test switches the session flag on before running the rewrite.
    ConnectContext context = MemoTestUtils.createConnectContext();
    context.getSessionVariable().setEnablePartitionTopN(true);

    NamedExpression grade = scanScore.getOutput().get(2).toSlot();
    List<Expression> partitionKeyList = ImmutableList.of();
    List<OrderExpression> orderKeyList = ImmutableList.of(new OrderExpression(
            new OrderKey(grade, true, true)));

    // This test builds the RANK window with a RANGE frame.
    WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.RANGE,
            WindowFrame.FrameBoundary.newPrecedingBoundary(),
            WindowFrame.FrameBoundary.newCurrentRowBoundary());
    WindowExpression window1 = new WindowExpression(new Rank(), partitionKeyList, orderKeyList, windowFrame);
    Alias windowAlias1 = new Alias(window1, window1.toSql());
    List<NamedExpression> expressions = Lists.newArrayList(windowAlias1);
    LogicalWindow<LogicalOlapScan> window = new LogicalWindow<>(expressions, scanScore);

    // Sort on the window function's own output.
    List<OrderKey> orderKey = ImmutableList.of(
            new OrderKey(windowAlias1.toSlot(), true, true)
    );
    // Fully parameterised; the original declared this with the raw type LogicalWindow.
    LogicalSort<LogicalWindow<LogicalOlapScan>> sort = new LogicalSort<>(orderKey, window);

    LogicalPlan plan = new LogicalPlanBuilder(sort)
            .limit(100)
            .build();

    PlanChecker.from(context, plan)
            .rewrite()
            .matches(
                logicalTopN(
                    logicalWindow(
                        logicalPartitionTopN(
                            logicalOlapScan()
                        ).when(logicalPartitionTopN -> {
                            WindowFuncType funName = logicalPartitionTopN.getFunction();
                            boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit();
                            long partitionLimit = logicalPartitionTopN.getPartitionLimit();
                            return funName == WindowFuncType.RANK && hasGlobalLimit && partitionLimit == 100;
                        })
                    )
                )
            );
}
private void test(JoinType joinType, boolean hasProject, PatternDescriptor<? extends Plan> pattern) {
Plan plan = generatePlan(joinType, hasProject);
PlanChecker.from(MemoTestUtils.createConnectContext())