diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java index 820649331c..fd587a42c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan; @@ -147,6 +148,16 @@ class CostModelV1 extends PlanVisitor { childStatistics.getRowCount()); } + @Override + public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, PlanContext context) { + Statistics statistics = context.getStatisticsWithCheck(); + Statistics childStatistics = context.getChildStatistics(0); + return CostV1.of( + childStatistics.getRowCount(), + statistics.getRowCount(), + childStatistics.getRowCount()); + } + @Override public Cost visitPhysicalDistribute( PhysicalDistribute distribute, PlanContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV2.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV2.java index fec5b811cf..c90f28c6b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV2.java @@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan; @@ -156,6 +157,20 @@ class CostModelV2 extends PlanVisitor { return new CostV2(startCost, runCost, statistics.computeSize()); } + @Override + public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, PlanContext context) { + Statistics statistics = context.getStatisticsWithCheck(); + Statistics childStatistics = context.getChildStatistics(0); + + // Random set a value. The partitionTopN is generated in the rewrite phase, + // and it only has one physical implementation. So this cost will not affect the result. + double runCost = childStatistics.getRowCount() * CMP_COST * Math.log(statistics.getRowCount()) + / statistics.getBENumber(); + + double startCost = runCost; + return new CostV2(startCost, runCost, statistics.computeSize()); + } + @Override public Cost visitPhysicalDistribute(PhysicalDistribute distribute, PlanContext context) { Statistics childStatistics = context.getChildStatistics(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a8872ed59e..9e567bbe36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -103,6 +103,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; @@ -137,6 +138,7 @@ import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.NestedLoopJoinNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.PartitionSortNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.RepeatNode; @@ -994,6 +996,57 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor partitionTopN, + PlanTranslatorContext context) { + PlanFragment inputFragment = partitionTopN.child(0).accept(this, context); + + Preconditions.checkArgument(!(partitionTopN.child(0) instanceof ExchangeNode)); + PartitionSortNode partitionSortNode = translatePartitionSortNode(partitionTopN, + inputFragment.getPlanRoot(), context); + addPlanRoot(inputFragment, partitionSortNode, partitionTopN); + + return inputFragment; + } + + private PartitionSortNode translatePartitionSortNode(PhysicalPartitionTopN partitionTopN, + PlanNode childNode, PlanTranslatorContext context) { + // Generate the SortInfo, similar to 'translateSortNode'. + List oldOrderingExprList = Lists.newArrayList(); + List ascOrderList = Lists.newArrayList(); + List nullsFirstParamList = Lists.newArrayList(); + List orderKeyList = partitionTopN.getOrderKeys(); + // 1. Get previous slotRef + orderKeyList.forEach(k -> { + oldOrderingExprList.add(ExpressionTranslator.translate(k.getExpr(), context)); + ascOrderList.add(k.isAsc()); + nullsFirstParamList.add(k.isNullFirst()); + }); + List sortTupleOutputList = new ArrayList<>(); + List outputList = partitionTopN.getOutput(); + outputList.forEach(k -> { + sortTupleOutputList.add(ExpressionTranslator.translate(k, context)); + }); + List partitionExprs = partitionTopN.getPartitionKeys().stream() + .map(e -> ExpressionTranslator.translate(e, context)) + .collect(Collectors.toList()); + // 2. Generate new Tuple and get current slotRef for newOrderingExprList + List newOrderingExprList = Lists.newArrayList(); + TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null); + // 3. fill in SortInfo members + SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc); + + PartitionSortNode partitionSortNode = new PartitionSortNode(context.nextPlanNodeId(), childNode, + partitionTopN.getFunction(), partitionExprs, sortInfo, partitionTopN.hasGlobalLimit(), + partitionTopN.getPartitionLimit(), sortTupleOutputList, oldOrderingExprList); + + if (partitionTopN.getStats() != null) { + partitionSortNode.setCardinality((long) partitionTopN.getStats().getRowCount()); + } + updateLegacyPlanIdToPhysicalPlan(partitionSortNode, partitionTopN); + return partitionSortNode; + } + @Override public PlanFragment visitPhysicalTopN(PhysicalTopN topN, PlanTranslatorContext context) { PlanFragment inputFragment = topN.child(0).accept(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java index b1bb239e2c..324a0b95ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java @@ -67,7 +67,9 @@ import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition; import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet; import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin; import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject; +import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughWindow; import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit; +import org.apache.doris.nereids.rules.rewrite.logical.PushdownTopNThroughWindow; import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin; import org.apache.doris.nereids.rules.rewrite.logical.SemiJoinCommute; import org.apache.doris.nereids.rules.rewrite.logical.SimplifyAggGroupBy; @@ -238,6 +240,14 @@ public class NereidsRewriter extends BatchRewriteJob { )).addAll(RuleSet.PUSH_DOWN_FILTERS).build()) ), + topic("Window optimization", + topDown( + new PushdownLimit(), + new PushdownTopNThroughWindow(), + new PushdownFilterThroughWindow() + ) + ), + // TODO: I think these rules should be implementation rules, and generate alternative physical plans. topic("Table/Physical optimization", topDown( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 069a4811e6..339378e244 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; @@ -132,6 +133,14 @@ public class ChildOutputPropertyDeriver extends PlanVisitor partitionTopN, + PlanContext context) { + Preconditions.checkState(childrenOutputProperties.size() == 1); + PhysicalProperties childOutputProperty = childrenOutputProperties.get(0); + return new PhysicalProperties(childOutputProperty.getDistributionSpec()); + } + @Override public PhysicalProperties visitPhysicalProject(PhysicalProject project, PlanContext context) { // TODO: order spec do not process since we do not use it. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index ff2f67909e..5ab11c015d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; @@ -115,6 +116,12 @@ public class RequestPropertyDeriver extends PlanVisitor { return null; } + @Override + public Void visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, PlanContext context) { + addRequestPropertyToChildren(PhysicalProperties.ANY); + return null; + } + @Override public Void visitPhysicalHashJoin(PhysicalHashJoin hashJoin, PlanContext context) { JoinHint hint = hashJoin.getHint(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index 319e0cebd5..67ac4b8758 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -55,6 +55,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit import org.apache.doris.nereids.rules.implementation.LogicalOlapScanToPhysicalOlapScan; import org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysicalOlapTableSink; import org.apache.doris.nereids.rules.implementation.LogicalOneRowRelationToPhysicalOneRowRelation; +import org.apache.doris.nereids.rules.implementation.LogicalPartitionTopNToPhysicalPartitionTopN; import org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject; import org.apache.doris.nereids.rules.implementation.LogicalRepeatToPhysicalRepeat; import org.apache.doris.nereids.rules.implementation.LogicalSchemaScanToPhysicalSchemaScan; @@ -75,6 +76,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughJoin; import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughProject; import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughRepeat; import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughSetOperation; +import org.apache.doris.nereids.rules.rewrite.logical.PushdownFilterThroughWindow; import org.apache.doris.nereids.rules.rewrite.logical.PushdownJoinOtherCondition; import org.apache.doris.nereids.rules.rewrite.logical.PushdownProjectThroughLimit; @@ -113,6 +115,7 @@ public class RuleSet { new PushdownFilterThroughAggregation(), new PushdownFilterThroughRepeat(), new PushdownFilterThroughSetOperation(), + new PushdownFilterThroughWindow(), new PushdownProjectThroughLimit(), new PushdownAliasThroughJoin(), new EliminateOuterJoin(), @@ -136,6 +139,7 @@ public class RuleSet { .add(new LogicalWindowToPhysicalWindow()) .add(new LogicalSortToPhysicalQuickSort()) .add(new LogicalTopNToPhysicalTopN()) + .add(new LogicalPartitionTopNToPhysicalPartitionTopN()) .add(new LogicalAssertNumRowsToPhysicalAssertNumRows()) .add(new LogicalOneRowRelationToPhysicalOneRowRelation()) .add(new LogicalEmptyRelationToPhysicalEmptyRelation()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 836ebdde91..266554955d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -134,11 +134,15 @@ public enum RuleType { PUSH_FILTER_INSIDE_JOIN(RuleTypeClass.REWRITE), PUSHDOWN_FILTER_THROUGH_PROJECT(RuleTypeClass.REWRITE), PUSHDOWN_FILTER_THROUGH_PROJECT_UNDER_LIMIT(RuleTypeClass.REWRITE), + PUSHDOWN_FILTER_THROUGH_WINDOW(RuleTypeClass.REWRITE), PUSHDOWN_PROJECT_THROUGH_LIMIT(RuleTypeClass.REWRITE), PUSHDOWN_ALIAS_THROUGH_JOIN(RuleTypeClass.REWRITE), PUSHDOWN_FILTER_THROUGH_SET_OPERATION(RuleTypeClass.REWRITE), COLUMN_PRUNING(RuleTypeClass.REWRITE), + PUSHDOWN_TOP_N_THROUGH_PROJECTION_WINDOW(RuleTypeClass.REWRITE), + PUSHDOWN_TOP_N_THROUGH_WINDOW(RuleTypeClass.REWRITE), + TRANSPOSE_LOGICAL_SEMI_JOIN_LOGICAL_JOIN(RuleTypeClass.REWRITE), TRANSPOSE_LOGICAL_SEMI_JOIN_LOGICAL_JOIN_PROJECT(RuleTypeClass.REWRITE), LOGICAL_SEMI_JOIN_COMMUTE(RuleTypeClass.REWRITE), @@ -206,7 +210,9 @@ public enum RuleType { // limit push down PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE), PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE), + PUSH_LIMIT_THROUGH_PROJECT_WINDOW(RuleTypeClass.REWRITE), PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE), + PUSH_LIMIT_THROUGH_WINDOW(RuleTypeClass.REWRITE), PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE), // adjust nullable ADJUST_NULLABLE(RuleTypeClass.REWRITE), @@ -258,6 +264,7 @@ public enum RuleType { LOGICAL_FILTER_TO_PHYSICAL_FILTER_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_EMPTY_RELATION_TO_PHYSICAL_EMPTY_RELATION_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java new file mode 100644 index 0000000000..b7975e7ca6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * Implementation rule that convert logical partition-top-n to physical partition-top-n. + */ +public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalPartitionTopN().then(partitionTopN -> { + List orderKeys = !partitionTopN.getOrderKeys().isEmpty() + ? partitionTopN.getOrderKeys().stream() + .map(OrderExpression::getOrderKey) + .collect(ImmutableList.toImmutableList()) : + ImmutableList.of(); + + return new PhysicalPartitionTopN<>( + partitionTopN.getFunction(), + partitionTopN.getPartitionKeys(), + orderKeys, + partitionTopN.hasGlobalLimit(), + partitionTopN.getPartitionLimit(), + partitionTopN.getLogicalProperties(), + partitionTopN.child()); + }).toRule(RuleType.LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughWindow.java new file mode 100644 index 0000000000..9162345290 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughWindow.java @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite.logical; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; +import org.apache.doris.nereids.trees.expressions.BinaryOperator; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.LessThan; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; + +/** + * Push down the 'filter' into the 'window'. + * It will convert the filter condition to the 'limit value' and push down below the 'window'. + * But there are some restrictions, the details are explained below. + * For example: + * 'SELECT * FROM ( + * SELECT *, ROW_NUMBER() OVER (ORDER BY b) AS row_number + * FROM t + * ) AS tt WHERE row_number <= 100;' + * The filter 'row_number <= 100' can be pushed down into the window operator. + * The following will demonstrate how the plan changes: + * Logical plan tree: + * any_node + * | + * filter (row_number <= 100) + * | + * window (PARTITION BY a ORDER BY b) + * | + * any_node + * transformed to: + * any_node + * | + * filter (row_number <= 100) + * | + * window (PARTITION BY a ORDER BY b) + * | + * partition_topn(PARTITION BY: a, ORDER BY b, Partition Limit: 100) + * | + * any_node + */ + +public class PushdownFilterThroughWindow extends OneRewriteRuleFactory { + + @Override + public Rule build() { + return logicalFilter(logicalWindow()).then(filter -> { + LogicalWindow window = filter.child(); + + // We have already done such optimization rule, so just ignore it. + if (window.child(0) instanceof LogicalPartitionTopN) { + return filter; + } + + List windowExprs = window.getWindowExpressions(); + if (windowExprs.size() != 1) { + return filter; + } + NamedExpression windowExpr = windowExprs.get(0); + if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) { + return filter; + } + + // Check the filter conditions. Now, we currently only support simple conditions of the form + // 'column conjuncts = filter.getConjuncts(); + Set relatedConjuncts = extractRelatedConjuncts(conjuncts, windowExpr.getExprId()); + + boolean hasPartitionLimit = false; + long partitionLimit = Long.MAX_VALUE; + + for (Expression conjunct : relatedConjuncts) { + Preconditions.checkArgument(conjunct instanceof BinaryOperator); + BinaryOperator op = (BinaryOperator) conjunct; + Expression leftChild = op.children().get(0); + Expression rightChild = op.children().get(1); + + Preconditions.checkArgument(leftChild instanceof SlotReference + && rightChild instanceof IntegerLikeLiteral); + + long limitVal = ((IntegerLikeLiteral) rightChild).getLongValue(); + // Adjust the value for 'limitVal' based on the comparison operators. + if (conjunct instanceof LessThan) { + limitVal--; + } + if (limitVal < 0) { + return new LogicalEmptyRelation(filter.getOutput()); + } + if (hasPartitionLimit) { + partitionLimit = Math.min(partitionLimit, limitVal); + } else { + partitionLimit = limitVal; + hasPartitionLimit = true; + } + } + + if (!hasPartitionLimit) { + return filter; + } + + Optional newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, false); + if (!newWindow.isPresent()) { + return filter; + } + return filter.withChildren(newWindow.get()); + }).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_WINDOW); + } + + private Set extractRelatedConjuncts(Set conjuncts, ExprId slotRefID) { + Predicate condition = conjunct -> { + if (!(conjunct instanceof BinaryOperator)) { + return false; + } + BinaryOperator op = (BinaryOperator) conjunct; + Expression leftChild = op.children().get(0); + Expression rightChild = op.children().get(1); + + if (!(conjunct instanceof LessThan || conjunct instanceof LessThanEqual || conjunct instanceof EqualTo)) { + return false; + } + + // TODO: Now, we only support the column on the left side. + if (!(leftChild instanceof SlotReference) || !(rightChild instanceof IntegerLikeLiteral)) { + return false; + } + return ((SlotReference) leftChild).getExprId() == slotRefID; + }; + + return conjuncts.stream() + .filter(condition) + .collect(ImmutableSet.toImmutableSet()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java index c989b53e24..953f11435f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java @@ -31,11 +31,13 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalSort; import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.util.List; +import java.util.Optional; /** * Rules to push {@link org.apache.doris.nereids.trees.plans.logical.LogicalLimit} down. @@ -70,6 +72,31 @@ public class PushdownLimit implements RewriteRuleFactory { return limit.withChildren(project.withChildren(newJoin)); }).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_JOIN), + // limit -> window + logicalLimit(logicalWindow()) + .then(limit -> { + LogicalWindow window = limit.child(); + long partitionLimit = limit.getLimit() + limit.getOffset(); + Optional newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true); + if (!newWindow.isPresent()) { + return limit; + } + return limit.withChildren(newWindow.get()); + }).toRule(RuleType.PUSH_LIMIT_THROUGH_WINDOW), + + // limit -> project -> window + logicalLimit(logicalProject(logicalWindow())) + .then(limit -> { + LogicalProject> project = limit.child(); + LogicalWindow window = project.child(); + long partitionLimit = limit.getLimit() + limit.getOffset(); + Optional newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true); + if (!newWindow.isPresent()) { + return limit; + } + return limit.withChildren(project.withChildren(newWindow.get())); + }).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_WINDOW), + // limit -> union logicalLimit(logicalUnion(multi()).when(union -> union.getQualifier() == Qualifier.ALL)) .whenNot(Limit::hasValidOffset) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownTopNThroughWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownTopNThroughWindow.java new file mode 100644 index 0000000000..92acb4bf51 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownTopNThroughWindow.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite.logical; + +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * PushdownTopNThroughWindow push down the TopN through the Window and generate the PartitionTopN. + */ +public class PushdownTopNThroughWindow implements RewriteRuleFactory { + @Override + public List buildRules() { + return ImmutableList.of( + // topn -> window + logicalTopN(logicalWindow()).then(topn -> { + LogicalWindow window = topn.child(); + ExprId windowExprId = getExprID4WindowFunc(window); + if (windowExprId == null) { + return topn; + } + + if (!checkTopN4PartitionLimitPushDown(topn, windowExprId)) { + return topn; + } + long partitionLimit = topn.getLimit() + topn.getOffset(); + Optional newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true); + if (!newWindow.isPresent()) { + return topn; + } + return topn.withChildren(newWindow.get()); + }).toRule(RuleType.PUSHDOWN_TOP_N_THROUGH_WINDOW), + + // topn -> projection -> window + logicalTopN(logicalProject(logicalWindow())).then(topn -> { + LogicalProject> project = topn.child(); + LogicalWindow window = project.child(); + ExprId windowExprId = getExprID4WindowFunc(window); + if (windowExprId == null) { + return topn; + } + + if (!checkTopN4PartitionLimitPushDown(topn, windowExprId)) { + return topn; + } + long partitionLimit = topn.getLimit() + topn.getOffset(); + Optional newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true); + if (!newWindow.isPresent()) { + return topn; + } + return topn.withChildren(project.withChildren(newWindow.get())); + }).toRule(RuleType.PUSHDOWN_TOP_N_THROUGH_PROJECTION_WINDOW) + ); + } + + private ExprId getExprID4WindowFunc(LogicalWindow window) { + List windowExprs = window.getWindowExpressions(); + if (windowExprs.size() != 1) { + return null; + } + NamedExpression windowExpr = windowExprs.get(0); + if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) { + return null; + } + return windowExpr.getExprId(); + } + + private boolean checkTopN4PartitionLimitPushDown(LogicalTopN topn, ExprId slotRefID) { + List orderKeys = topn.getOrderKeys(); + if (orderKeys.size() != 1) { + return false; + } + + OrderKey orderkey = orderKeys.get(0); + if (!orderkey.isAsc()) { + return false; + } + + Expression orderKeyExpr = orderkey.getExpr(); + if (!(orderKeyExpr instanceof SlotReference)) { + return false; + } + + return ((SlotReference) orderKeyExpr).getExprId() == slotRefID; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index daceb9494b..5755719aa0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Filter; import org.apache.doris.nereids.trees.plans.algebra.Generate; import org.apache.doris.nereids.trees.plans.algebra.Limit; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; +import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN; import org.apache.doris.nereids.trees.plans.algebra.Project; import org.apache.doris.nereids.trees.plans.algebra.Repeat; import org.apache.doris.nereids.trees.plans.algebra.Scan; @@ -58,6 +59,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan; @@ -83,6 +85,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; @@ -117,6 +120,7 @@ import java.util.stream.Collectors; */ public class StatsCalculator extends DefaultPlanVisitor { public static double DEFAULT_AGGREGATE_RATIO = 0.5; + public static double DEFAULT_COLUMN_NDV_RATIO = 0.5; private final GroupExpression groupExpression; private boolean forbidUnknownColStats = false; @@ -276,6 +280,11 @@ public class StatsCalculator extends DefaultPlanVisitor { return computeTopN(topN); } + @Override + public Statistics visitLogicalPartitionTopN(LogicalPartitionTopN partitionTopN, Void context) { + return computePartitionTopN(partitionTopN); + } + @Override public Statistics visitLogicalJoin(LogicalJoin join, Void context) { return JoinEstimation.estimate(groupExpression.childStatistics(0), @@ -325,6 +334,11 @@ public class StatsCalculator extends DefaultPlanVisitor { return computeWindow(window); } + @Override + public Statistics visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, Void context) { + return computePartitionTopN(partitionTopN); + } + @Override public Statistics visitPhysicalEmptyRelation(PhysicalEmptyRelation emptyRelation, Void context) { return computeEmptyRelation(emptyRelation); @@ -545,6 +559,35 @@ public class StatsCalculator extends DefaultPlanVisitor { return stats.withRowCount(Math.min(stats.getRowCount(), topN.getLimit())); } + private Statistics computePartitionTopN(PartitionTopN partitionTopN) { + Statistics stats = groupExpression.childStatistics(0); + double rowCount = stats.getRowCount(); + List partitionKeys = partitionTopN.getPartitionKeys(); + if (!partitionTopN.hasGlobalLimit() && !partitionKeys.isEmpty()) { + // If there is no global limit. So result for the cardinality estimation is: + // NDV(partition key) * partitionLimit + Map childSlotToColumnStats = stats.columnStatistics(); + List partitionByKeyStats = partitionKeys.stream() + .filter(childSlotToColumnStats::containsKey) + .map(childSlotToColumnStats::get) + .filter(s -> !s.isUnKnown) + .collect(Collectors.toList()); + if (partitionByKeyStats.isEmpty()) { + // all column stats are unknown, use default ratio + rowCount = rowCount * DEFAULT_COLUMN_NDV_RATIO; + } else { + rowCount = Math.min(rowCount, partitionByKeyStats.stream().map(s -> s.ndv) + .max(Double::compare).get()); + } + } else { + rowCount = Math.min(rowCount, partitionTopN.getPartitionLimit()); + } + // TODO: for the filter push down window situation, we will prune the row count twice + // because we keep the pushed down filter. And it will be calculated twice, one of them in 'PartitionTopN' + // and the other is in 'Filter'. It's hard to dismiss. + return stats.updateRowCountOnly(rowCount); + } + private Statistics computeLimit(Limit limit) { Statistics stats = groupExpression.childStatistics(0); return stats.withRowCount(Math.min(stats.getRowCount(), limit.getLimit())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 0928975f64..c0e4450cb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -43,6 +43,7 @@ public enum PlanType { LOGICAL_REPEAT, LOGICAL_SORT, LOGICAL_TOP_N, + LOGICAL_PARTITION_TOP_N, LOGICAL_LIMIT, LOGICAL_OLAP_SCAN, LOGICAL_SCHEMA_SCAN, @@ -79,6 +80,7 @@ public enum PlanType { PHYSICAL_REPEAT, PHYSICAL_QUICK_SORT, PHYSICAL_TOP_N, + PHYSICAL_PARTITION_TOP_N, PHYSICAL_LOCAL_QUICK_SORT, PHYSICAL_LIMIT, PHYSICAL_HASH_JOIN, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/PartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/PartitionTopN.java new file mode 100644 index 0000000000..e3a32353ec --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/PartitionTopN.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.algebra; + +import org.apache.doris.nereids.trees.expressions.Expression; + +import java.util.List; + +/** + * Common interface for logical/physical PartitionTopN. + */ +public interface PartitionTopN { + long getPartitionLimit(); + + boolean hasGlobalLimit(); + + List getPartitionKeys(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPartitionTopN.java new file mode 100644 index 0000000000..22e0996822 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalPartitionTopN.java @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.WindowFuncType; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Logical partition-top-N plan. + */ +public class LogicalPartitionTopN extends LogicalUnary implements PartitionTopN { + private final WindowFuncType function; + private final List partitionKeys; + private final List orderKeys; + private final boolean hasGlobalLimit; + private final long partitionLimit; + + public LogicalPartitionTopN(WindowExpression windowExpr, boolean hasGlobalLimit, long partitionLimit, + CHILD_TYPE child) { + this(windowExpr.getFunction(), windowExpr.getPartitionKeys(), windowExpr.getOrderKeys(), + hasGlobalLimit, partitionLimit, Optional.empty(), + Optional.empty(), child); + } + + public LogicalPartitionTopN(WindowFuncType windowFuncType, List partitionKeys, + List orderKeys, boolean hasGlobalLimit, long partitionLimit, + CHILD_TYPE child) { + this(windowFuncType, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + Optional.empty(), Optional.empty(), child); + } + + /** + * Constructor for LogicalPartitionTopN. + */ + public LogicalPartitionTopN(WindowFuncType windowFuncType, List partitionKeys, + List orderKeys, boolean hasGlobalLimit, + long partitionLimit, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { + super(PlanType.LOGICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child); + this.function = windowFuncType; + this.partitionKeys = ImmutableList.copyOf(Objects.requireNonNull(partitionKeys, + "partitionKeys can not be null")); + this.orderKeys = ImmutableList.copyOf(Objects.requireNonNull(orderKeys, + "orderKeys can not be null")); + this.hasGlobalLimit = hasGlobalLimit; + this.partitionLimit = partitionLimit; + } + + /** + * Constructor for LogicalPartitionTopN. + */ + public LogicalPartitionTopN(Expression expr, List partitionKeys, List orderKeys, + boolean hasGlobalLimit, long partitionLimit, Optional groupExpression, + Optional logicalProperties, CHILD_TYPE child) { + super(PlanType.LOGICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child); + String funcName = expr.toString(); + if (funcName.equals("row_number()")) { + this.function = WindowFuncType.ROW_NUMBER; + } else if (funcName.equals("rank()")) { + this.function = WindowFuncType.RANK; + } else { + Preconditions.checkArgument(funcName.equals("dense_rank()")); + this.function = WindowFuncType.DENSE_RANK; + } + this.partitionKeys = ImmutableList.copyOf( + Objects.requireNonNull(partitionKeys, "partitionKeys can not be null")); + this.orderKeys = ImmutableList.copyOf( + Objects.requireNonNull(orderKeys, "orderKeys can not be null")); + this.hasGlobalLimit = hasGlobalLimit; + this.partitionLimit = partitionLimit; + } + + @Override + public List computeOutput() { + return child().getOutput(); + } + + public WindowFuncType getFunction() { + return function; + } + + @Override + public List getPartitionKeys() { + return partitionKeys; + } + + public List getOrderKeys() { + return orderKeys; + } + + @Override + public boolean hasGlobalLimit() { + return hasGlobalLimit; + } + + @Override + public long getPartitionLimit() { + return partitionLimit; + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalPartitionTopN", + "function", function, + "partitionKeys", partitionKeys, + "orderKeys", orderKeys, + "hasGlobalLimit", hasGlobalLimit, + "partitionLimit", partitionLimit + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LogicalPartitionTopN that = (LogicalPartitionTopN) o; + return Objects.equals(this.function, that.function) && Objects.equals(this.partitionKeys, that.partitionKeys) + && Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit + && this.partitionLimit == that.partitionLimit; + } + + @Override + public int hashCode() { + return Objects.hash(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalPartitionTopN(this, context); + } + + @Override + public List getExpressions() { + return new ImmutableList.Builder() + .addAll(partitionKeys) + .addAll(orderKeys) + .build(); + } + + @Override + public LogicalPartitionTopN withChildren(List children) { + Preconditions.checkArgument(children.size() == 1); + return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + children.get(0)); + } + + @Override + public LogicalPartitionTopN withGroupExpression(Optional groupExpression) { + return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public LogicalPartitionTopN withLogicalProperties(Optional logicalProperties) { + return new LogicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + Optional.empty(), logicalProperties, child()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java index 3e4a61b3f2..a1056f0109 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java @@ -22,11 +22,17 @@ import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.expressions.WindowFrame; +import org.apache.doris.nereids.trees.expressions.functions.window.DenseRank; +import org.apache.doris.nereids.trees.expressions.functions.window.Rank; +import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Window; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -159,4 +165,62 @@ public class LogicalWindow extends LogicalUnary pushPartitionLimitThroughWindow(long partitionLimit, boolean hasGlobalLimit) { + if (!ConnectContext.get().getSessionVariable().isEnablePartitionTopN()) { + return Optional.empty(); + } + // We have already done such optimization rule, so just ignore it. + if (child(0) instanceof LogicalPartitionTopN) { + return Optional.empty(); + } + + // Check the window function. There are some restrictions for window function: + // 1. The number of window function should be 1. + // 2. The window function should be one of the 'row_number()', 'rank()', 'dense_rank()'. + // 3. The window frame should be 'UNBOUNDED' to 'CURRENT'. + // 4. The 'PARTITION' key and 'ORDER' key can not be empty at the same time. + if (windowExpressions.size() != 1) { + return Optional.empty(); + } + NamedExpression windowExpr = windowExpressions.get(0); + if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) { + return Optional.empty(); + } + + WindowExpression windowFunc = (WindowExpression) windowExpr.child(0); + // Check the window function name. + if (!(windowFunc.getFunction() instanceof RowNumber + || windowFunc.getFunction() instanceof Rank + || windowFunc.getFunction() instanceof DenseRank)) { + return Optional.empty(); + } + + // Check the partition key and order key. + if (windowFunc.getPartitionKeys().isEmpty() && windowFunc.getOrderKeys().isEmpty()) { + return Optional.empty(); + } + + // Check the window type and window frame. + Optional windowFrame = windowFunc.getWindowFrame(); + if (windowFrame.isPresent()) { + WindowFrame frame = windowFrame.get(); + if (!(frame.getLeftBoundary().getFrameBoundType() == WindowFrame.FrameBoundType.UNBOUNDED_PRECEDING + && frame.getRightBoundary().getFrameBoundType() == WindowFrame.FrameBoundType.CURRENT_ROW)) { + return Optional.empty(); + } + } else { + return Optional.empty(); + } + + LogicalWindow window = (LogicalWindow) withChildren(new LogicalPartitionTopN<>(windowFunc, hasGlobalLimit, + partitionLimit, child(0))); + + return Optional.ofNullable(window); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java new file mode 100644 index 0000000000..8dbe29228d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPartitionTopN.java @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.WindowFuncType; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Physical partition-top-N plan. + */ +public class PhysicalPartitionTopN extends PhysicalUnary implements PartitionTopN { + private final WindowFuncType function; + private final List partitionKeys; + private final List orderKeys; + private final Boolean hasGlobalLimit; + private final long partitionLimit; + + public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, + Boolean hasGlobalLimit, long partitionLimit, + LogicalProperties logicalProperties, CHILD_TYPE child) { + this(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + Optional.empty(), logicalProperties, child); + } + + /** + * Constructor of PhysicalPartitionTopN. + */ + public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, + Boolean hasGlobalLimit, long partitionLimit, + Optional groupExpression, LogicalProperties logicalProperties, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, child); + this.function = function; + this.partitionKeys = ImmutableList.copyOf(partitionKeys); + this.orderKeys = ImmutableList.copyOf(orderKeys); + this.hasGlobalLimit = hasGlobalLimit; + this.partitionLimit = partitionLimit; + } + + /** + * Constructor of PhysicalPartitionTopN. + */ + public PhysicalPartitionTopN(WindowFuncType function, List partitionKeys, List orderKeys, + Boolean hasGlobalLimit, long partitionLimit, + Optional groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { + super(PlanType.PHYSICAL_PARTITION_TOP_N, groupExpression, logicalProperties, physicalProperties, + statistics, child); + this.function = function; + this.partitionKeys = partitionKeys; + this.orderKeys = orderKeys; + this.hasGlobalLimit = hasGlobalLimit; + this.partitionLimit = partitionLimit; + } + + public WindowFuncType getFunction() { + return function; + } + + @Override + public List getPartitionKeys() { + return partitionKeys; + } + + public List getOrderKeys() { + return orderKeys; + } + + @Override + public boolean hasGlobalLimit() { + return hasGlobalLimit; + } + + @Override + public long getPartitionLimit() { + return partitionLimit; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + PhysicalPartitionTopN that = (PhysicalPartitionTopN) o; + return Objects.equals(this.function, that.function) + && Objects.equals(this.partitionKeys, that.partitionKeys) + && Objects.equals(this.orderKeys, that.orderKeys) && this.hasGlobalLimit == that.hasGlobalLimit + && this.partitionLimit == that.partitionLimit; + } + + @Override + public int hashCode() { + return Objects.hash(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalPartitionTopN(this, context); + } + + @Override + public List getExpressions() { + return new ImmutableList.Builder() + .addAll(partitionKeys) + .addAll(orderKeys.stream() + .map(OrderKey::getExpr) + .collect(Collectors.toList())) + .build(); + } + + @Override + public PhysicalPartitionTopN withChildren(List children) { + Preconditions.checkArgument(children.size() == 1); + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + getLogicalProperties(), children.get(0)); + } + + @Override + public PhysicalPartitionTopN withGroupExpression(Optional groupExpression) { + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public PhysicalPartitionTopN withLogicalProperties(Optional logicalProperties) { + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + Optional.empty(), logicalProperties.get(), child()); + } + + @Override + public PhysicalPartitionTopN withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, + Statistics statistics) { + return new PhysicalPartitionTopN<>(function, partitionKeys, orderKeys, hasGlobalLimit, partitionLimit, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public String toString() { + return Utils.toSqlString("PhysicalPartitionTopN[" + id.asInt() + "]" + getGroupIdAsString(), + "function", function, + "partitionKeys", partitionKeys, + "orderKeys", orderKeys, + "hasGlobalLimit", hasGlobalLimit, + "partitionLimit", partitionLimit + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index b3f7dd4e6f..63c83b31da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -46,6 +46,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; @@ -77,6 +78,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; @@ -216,6 +218,10 @@ public abstract class PlanVisitor { return visit(topN, context); } + public R visitLogicalPartitionTopN(LogicalPartitionTopN partitionTopN, C context) { + return visit(partitionTopN, context); + } + public R visitLogicalLimit(LogicalLimit limit, C context) { return visit(limit, context); } @@ -336,6 +342,10 @@ public abstract class PlanVisitor { return visitAbstractPhysicalSort(topN, context); } + public R visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, C context) { + return visit(partitionTopN, context); + } + public R visitPhysicalLimit(PhysicalLimit limit, C context) { return visit(limit, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/WindowFuncType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/WindowFuncType.java new file mode 100644 index 0000000000..557e63954e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/WindowFuncType.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.types; + +/** + * WindowFuncType represent the window function which is used in the PartitionTopN. + */ +public enum WindowFuncType { + ROW_NUMBER, + RANK, + DENSE_RANK, +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java new file mode 100644 index 0000000000..d27b54af7b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PartitionSortNode.java @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.SortInfo; +import org.apache.doris.common.NotImplementedException; +import org.apache.doris.nereids.types.WindowFuncType; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TPartitionSortNode; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TSortInfo; +import org.apache.doris.thrift.TopNAlgorithm; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * PartitionSortNode. + * PartitionSortNode is only used in the Nereids. + */ +public class PartitionSortNode extends PlanNode { + private static final Logger LOG = LogManager.getLogger(PartitionSortNode.class); + private List resolvedTupleExprs; + private final WindowFuncType function; + private final List partitionExprs; + private final SortInfo info; + private final boolean hasGlobalLimit; + private final long partitionLimit; + + private boolean isUnusedExprRemoved = false; + private ArrayList nullabilityChangedFlags = Lists.newArrayList(); + + /** + * Constructor. + */ + public PartitionSortNode(PlanNodeId id, PlanNode input, WindowFuncType function, List partitionExprs, + SortInfo info, boolean hasGlobalLimit, long partitionLimit, + List outputList, List orderingExpr) { + super(id, "PartitionTopN", StatisticalType.PARTITION_TOPN_NODE); + this.function = function; + this.partitionExprs = partitionExprs; + this.info = info; + this.hasGlobalLimit = hasGlobalLimit; + this.partitionLimit = partitionLimit; + this.tupleIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId())); + this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId())); + this.nullableTupleIds.addAll(input.getNullableTupleIds()); + this.children.add(input); + + List resolvedTupleExprs = new ArrayList<>(); + for (Expr order : orderingExpr) { + if (!resolvedTupleExprs.contains(order)) { + resolvedTupleExprs.add(order); + } + } + for (Expr output : outputList) { + if (!resolvedTupleExprs.contains(output)) { + resolvedTupleExprs.add(output); + } + } + this.resolvedTupleExprs = ImmutableList.copyOf(resolvedTupleExprs); + info.setSortTupleSlotExprs(resolvedTupleExprs); + + nullabilityChangedFlags.clear(); + for (int i = 0; i < resolvedTupleExprs.size(); i++) { + nullabilityChangedFlags.add(false); + } + Preconditions.checkArgument(info.getOrderingExprs().size() == info.getIsAscOrder().size()); + } + + public SortInfo getSortInfo() { + return info; + } + + @Override + public void getMaterializedIds(Analyzer analyzer, List ids) { + super.getMaterializedIds(analyzer, ids); + Expr.getIds(info.getOrderingExprs(), null, ids); + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + if (detailLevel == TExplainLevel.BRIEF) { + return ""; + } + + StringBuilder output = new StringBuilder(); + + // Add the function name. + String funcName; + if (function == WindowFuncType.ROW_NUMBER) { + funcName = "row_number"; + } else if (function == WindowFuncType.RANK) { + funcName = "rank"; + } else { + funcName = "dense_rank"; + } + output.append(prefix).append("functions: ").append(funcName).append("\n"); + + // Add the partition expr. + List strings = Lists.newArrayList(); + if (!partitionExprs.isEmpty()) { + output.append(prefix).append("partition by: "); + + for (Expr partitionExpr : partitionExprs) { + strings.add(partitionExpr.toSql()); + } + + output.append(Joiner.on(", ").join(strings)); + output.append("\n"); + } + + // Add the order by. + output.append(prefix).append("order by: "); + Iterator expr = info.getOrderingExprs().iterator(); + Iterator isAsc = info.getIsAscOrder().iterator(); + boolean start = true; + while (expr.hasNext()) { + if (start) { + start = false; + } else { + output.append(", "); + } + output.append(expr.next().toSql()).append(" "); + output.append(isAsc.next() ? "ASC" : "DESC"); + } + output.append("\n"); + + // Add the limit information; + output.append(prefix).append("has global limit: ").append(hasGlobalLimit).append("\n"); + output.append(prefix).append("partition limit: ").append(partitionLimit).append("\n"); + + return output.toString(); + } + + private void removeUnusedExprs() { + if (!isUnusedExprRemoved) { + if (resolvedTupleExprs != null) { + List slotDescriptorList = this.info.getSortTupleDescriptor().getSlots(); + for (int i = slotDescriptorList.size() - 1; i >= 0; i--) { + if (!slotDescriptorList.get(i).isMaterialized()) { + resolvedTupleExprs.remove(i); + nullabilityChangedFlags.remove(i); + } + } + } + isUnusedExprRemoved = true; + } + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.PARTITION_SORT_NODE; + + TSortInfo sortInfo = info.toThrift(); + Preconditions.checkState(tupleIds.size() == 1, "Incorrect size for tupleIds in PartitionSortNode"); + removeUnusedExprs(); + if (resolvedTupleExprs != null) { + sortInfo.setSortTupleSlotExprs(Expr.treesToThrift(resolvedTupleExprs)); + // FIXME this is a bottom line solution for wrong nullability of resolvedTupleExprs + // remove the following line after nereids online + sortInfo.setSlotExprsNullabilityChangedFlags(nullabilityChangedFlags); + } + + TopNAlgorithm topNAlgorithm; + if (function == WindowFuncType.ROW_NUMBER) { + topNAlgorithm = TopNAlgorithm.ROW_NUMBER; + } else if (function == WindowFuncType.RANK) { + topNAlgorithm = TopNAlgorithm.RANK; + } else { + topNAlgorithm = TopNAlgorithm.DENSE_RANK; + } + + TPartitionSortNode partitionSortNode = new TPartitionSortNode(); + partitionSortNode.setTopNAlgorithm(topNAlgorithm); + partitionSortNode.setPartitionExprs(Expr.treesToThrift(partitionExprs)); + partitionSortNode.setSortInfo(sortInfo); + partitionSortNode.setHasGlobalLimit(hasGlobalLimit); + partitionSortNode.setPartitionInnerLimit(partitionLimit); + msg.partition_sort_node = partitionSortNode; + } + + @Override + public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { + removeUnusedExprs(); + List materializedTupleExprs = new ArrayList<>(resolvedTupleExprs); + List result = Lists.newArrayList(); + Expr.getIds(materializedTupleExprs, null, result); + return new HashSet<>(result); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1278ce46fa..042344f5b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -173,6 +173,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_BUSHY_TREE = "enable_bushy_tree"; + public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; + public static final String ENABLE_INFER_PREDICATE = "enable_infer_predicate"; public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000; @@ -639,6 +641,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_BUSHY_TREE, needForward = true) private boolean enableBushyTree = false; + @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN) + private boolean enablePartitionTopN = false; + @VariableMgr.VarAttr(name = ENABLE_INFER_PREDICATE) private boolean enableInferPredicate = true; @@ -1641,6 +1646,14 @@ public class SessionVariable implements Serializable, Writable { this.enableBushyTree = enableBushyTree; } + public boolean isEnablePartitionTopN() { + return enablePartitionTopN; + } + + public void setEnablePartitionTopN(boolean enablePartitionTopN) { + this.enablePartitionTopN = enablePartitionTopN; + } + public boolean isReturnObjectDataAsBinary() { return returnObjectDataAsBinary; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 1d1857856d..3a4a283c79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -38,6 +38,7 @@ public enum StatisticalType { MYSQL_SCAN_NODE, ODBC_SCAN_NODE, OLAP_SCAN_NODE, + PARTITION_TOPN_NODE, REPEAT_NODE, SELECT_NODE, SET_OPERATION_NODE, diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughWindowTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughWindowTest.java new file mode 100644 index 0000000000..5c70cf40a2 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownFilterThroughWindowTest.java @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite.logical; + +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.expressions.WindowFrame; +import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; +import org.apache.doris.nereids.types.WindowFuncType; +import org.apache.doris.nereids.util.LogicalPlanBuilder; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.nereids.util.PlanConstructor; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +import java.util.List; + +public class PushdownFilterThroughWindowTest implements MemoPatternMatchSupported { + private final LogicalOlapScan scan = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student, + ImmutableList.of("")); + + /*- + * origin plan: + * project + * | + * filter row_number <= 100 + * | + * window(ROW_NUMBER() as row_number PARTITION BY gender ORDER BY age) + * | + * scan(student) + * + * transformed plan: + * project + * | + * filter row_number <= 100 + * | + * window(ROW_NUMBER() as row_number PARTITION BY gender ORDER BY age) + * | + * partitionTopN(row_number(), partition by gender, order by age, hasGlobalLimit: false, partitionLimit: 100) + * | + * scan(student) + */ + @Test + public void pushDownFilterThroughWindowTest() { + ConnectContext context = MemoTestUtils.createConnectContext(); + context.getSessionVariable().setEnablePartitionTopN(true); + NamedExpression gender = scan.getOutput().get(1).toSlot(); + NamedExpression age = scan.getOutput().get(3).toSlot(); + + List partitionKeyList = ImmutableList.of(gender); + List orderKeyList = ImmutableList.of(new OrderExpression( + new OrderKey(age, true, true))); + WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS, + WindowFrame.FrameBoundary.newPrecedingBoundary(), + WindowFrame.FrameBoundary.newCurrentRowBoundary()); + WindowExpression window1 = new WindowExpression(new RowNumber(), partitionKeyList, orderKeyList, windowFrame); + Alias windowAlias1 = new Alias(window1, window1.toSql()); + List expressions = Lists.newArrayList(windowAlias1); + LogicalWindow window = new LogicalWindow<>(expressions, scan); + Expression filterPredicate = new LessThanEqual(window.getOutput().get(4).toSlot(), Literal.of(100)); + + LogicalPlan plan = new LogicalPlanBuilder(window) + .filter(filterPredicate) + .project(ImmutableList.of(0)) + .build(); + + PlanChecker.from(context, plan) + .applyTopDown(new PushdownFilterThroughWindow()) + .matches( + logicalProject( + logicalFilter( + logicalWindow( + logicalPartitionTopN( + logicalOlapScan() + ).when(logicalPartitionTopN -> { + WindowFuncType funName = logicalPartitionTopN.getFunction(); + List partitionKeys = logicalPartitionTopN.getPartitionKeys(); + List orderKeys = logicalPartitionTopN.getOrderKeys(); + boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit(); + long partitionLimit = logicalPartitionTopN.getPartitionLimit(); + return funName == WindowFuncType.ROW_NUMBER && partitionKeys.equals(partitionKeyList) + && orderKeys.equals(orderKeyList) && !hasGlobalLimit && partitionLimit == 100; + }) + ) + ).when(filter -> filter.getConjuncts().equals(ImmutableSet.of(filterPredicate))) + ) + ); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java index f961fa4b8d..6f7a0af4b4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java @@ -19,15 +19,28 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.pattern.PatternDescriptor; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.expressions.WindowFrame; +import org.apache.doris.nereids.trees.expressions.functions.window.Rank; +import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.LimitPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalSort; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; +import org.apache.doris.nereids.types.WindowFuncType; +import org.apache.doris.nereids.util.LogicalPlanBuilder; import org.apache.doris.nereids.util.MemoPatternMatchSupported; import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; @@ -35,9 +48,11 @@ import org.apache.doris.nereids.util.PlanConstructor; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -49,7 +64,7 @@ import java.util.function.Function; import java.util.stream.Collectors; class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSupported { - private Plan scanScore = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); + private final LogicalOlapScan scanScore = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score); private Plan scanStudent = new LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student); @Override @@ -246,6 +261,91 @@ class PushdownLimitTest extends TestWithFeService implements MemoPatternMatchSup ); } + @Test + public void testLimitPushWindow() { + ConnectContext context = MemoTestUtils.createConnectContext(); + context.getSessionVariable().setEnablePartitionTopN(true); + NamedExpression grade = scanScore.getOutput().get(2).toSlot(); + + List partitionKeyList = ImmutableList.of(); + List orderKeyList = ImmutableList.of(new OrderExpression( + new OrderKey(grade, true, true))); + WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS, + WindowFrame.FrameBoundary.newPrecedingBoundary(), + WindowFrame.FrameBoundary.newCurrentRowBoundary()); + WindowExpression window1 = new WindowExpression(new RowNumber(), partitionKeyList, orderKeyList, windowFrame); + Alias windowAlias1 = new Alias(window1, window1.toSql()); + List expressions = Lists.newArrayList(windowAlias1); + LogicalWindow window = new LogicalWindow<>(expressions, scanScore); + + LogicalPlan plan = new LogicalPlanBuilder(window) + .limit(100) + .build(); + + PlanChecker.from(context, plan) + .rewrite() + .matches( + logicalLimit( + logicalWindow( + logicalPartitionTopN( + logicalOlapScan() + ).when(logicalPartitionTopN -> { + WindowFuncType funName = logicalPartitionTopN.getFunction(); + List partitionKeys = logicalPartitionTopN.getPartitionKeys(); + List orderKeys = logicalPartitionTopN.getOrderKeys(); + boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit(); + long partitionLimit = logicalPartitionTopN.getPartitionLimit(); + return funName == WindowFuncType.ROW_NUMBER && partitionKeys.equals(partitionKeyList) + && orderKeys.equals(orderKeyList) && hasGlobalLimit && partitionLimit == 100; + }) + ) + ).when(limit -> limit.getLimit() == 100) + ); + } + + @Test + public void testTopNPushWindow() { + ConnectContext context = MemoTestUtils.createConnectContext(); + context.getSessionVariable().setEnablePartitionTopN(true); + NamedExpression grade = scanScore.getOutput().get(2).toSlot(); + + List partitionKeyList = ImmutableList.of(); + List orderKeyList = ImmutableList.of(new OrderExpression( + new OrderKey(grade, true, true))); + WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.RANGE, + WindowFrame.FrameBoundary.newPrecedingBoundary(), + WindowFrame.FrameBoundary.newCurrentRowBoundary()); + WindowExpression window1 = new WindowExpression(new Rank(), partitionKeyList, orderKeyList, windowFrame); + Alias windowAlias1 = new Alias(window1, window1.toSql()); + List expressions = Lists.newArrayList(windowAlias1); + LogicalWindow window = new LogicalWindow<>(expressions, scanScore); + List orderKey = ImmutableList.of( + new OrderKey(windowAlias1.toSlot(), true, true) + ); + LogicalSort sort = new LogicalSort<>(orderKey, window); + + LogicalPlan plan = new LogicalPlanBuilder(sort) + .limit(100) + .build(); + + PlanChecker.from(context, plan) + .rewrite() + .matches( + logicalTopN( + logicalWindow( + logicalPartitionTopN( + logicalOlapScan() + ).when(logicalPartitionTopN -> { + WindowFuncType funName = logicalPartitionTopN.getFunction(); + boolean hasGlobalLimit = logicalPartitionTopN.hasGlobalLimit(); + long partitionLimit = logicalPartitionTopN.getPartitionLimit(); + return funName == WindowFuncType.RANK && hasGlobalLimit && partitionLimit == 100; + }) + ) + ) + ); + } + private void test(JoinType joinType, boolean hasProject, PatternDescriptor pattern) { Plan plan = generatePlan(joinType, hasProject); PlanChecker.from(MemoTestUtils.createConnectContext())