From acc14d7e4c73976704101d2f94b059da4edeb912 Mon Sep 17 00:00:00 2001 From: jakevin Date: Wed, 29 Nov 2023 21:12:42 +0800 Subject: [PATCH] [feature](Planner): Push down LimitDistinct through Union (#27745) --- .../doris/nereids/jobs/executor/Rewriter.java | 2 + .../apache/doris/nereids/rules/RuleType.java | 1 + .../PushDownLimitDistinctThroughUnion.java | 100 ++++++++++++++++++ .../PushDownTopNDistinctThroughUnion.java | 1 + .../doris/nereids/util/ExpressionUtils.java | 14 +++ .../push_down_limit_distinct.out | 21 ++++ .../push_down_limit_distinct.groovy | 54 ++++++++++ 7 files changed, 193 insertions(+) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java create mode 100644 regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out create mode 100644 regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index e523fc5163..c850cdae58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -95,6 +95,7 @@ import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin; import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject; import org.apache.doris.nereids.rules.rewrite.PushDownLimit; import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin; +import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughUnion; import org.apache.doris.nereids.rules.rewrite.PushDownMinMaxThroughJoin; import org.apache.doris.nereids.rules.rewrite.PushDownSumThroughJoin; import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughJoin; @@ -286,6 +287,7 @@ public class Rewriter extends AbstractBatchJobExecutor { new PushDownLimit(), new PushDownTopNThroughJoin(), new PushDownLimitDistinctThroughJoin(), + new PushDownLimitDistinctThroughUnion(), new PushDownTopNThroughWindow(), new PushDownTopNThroughUnion() ), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 09e01136d8..0aa71f39ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -269,6 +269,7 @@ public enum RuleType { // limit distinct push down PUSH_DOWN_LIMIT_DISTINCT_THROUGH_JOIN(RuleTypeClass.REWRITE), PUSH_DOWN_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE), + PUSH_DOWN_LIMIT_DISTINCT_THROUGH_UNION(RuleTypeClass.REWRITE), // adjust nullable ADJUST_NULLABLE(RuleTypeClass.REWRITE), ADJUST_CONJUNCTS_RETURN_TYPE(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java new file mode 100644 index 0000000000..b685f680c9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughUnion.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalLimit; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.nereids.util.ExpressionUtils; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + *
+ * LIMIT-Distinct
+ * -> Union All
+ * -> child plan1
+ * -> child plan2
+ * -> child plan3
+ *
+ * rewritten to
+ *
+ * LIMIT-Distinct
+ * -> Union All
+ *   -> LIMIT-Distinct
+ *     -> child plan1
+ *   -> LIMIT-Distinct
+ *     -> LIMIT plan2
+ *   -> TopN-Distinct
+ *     -> LIMIT plan3
+ * 
+ */ +public class PushDownLimitDistinctThroughUnion implements RewriteRuleFactory { + + @Override + public List buildRules() { + return ImmutableList.of( + logicalLimit(logicalAggregate(logicalUnion().when(union -> union.getQualifier() == Qualifier.ALL)) + .when(agg -> agg.isDistinct())) + .then(limit -> { + LogicalAggregate agg = limit.child(); + LogicalUnion union = agg.child(); + + List newChildren = new ArrayList<>(); + for (Plan child : union.children()) { + Map replaceMap = new HashMap<>(); + for (int i = 0; i < union.getOutputs().size(); ++i) { + NamedExpression output = union.getOutputs().get(i); + replaceMap.put(output, child.getOutput().get(i)); + } + + List newGroupBy = agg.getGroupByExpressions().stream() + .map(expr -> ExpressionUtils.replace(expr, replaceMap)) + .collect(Collectors.toList()); + List newOutputs = agg.getOutputs().stream() + .map(expr -> ExpressionUtils.replace(expr, replaceMap)) + .collect(Collectors.toList()); + + LogicalAggregate newAgg = new LogicalAggregate<>(newGroupBy, newOutputs, child); + LogicalLimit newLimit = limit.withLimitChild(limit.getLimit() + limit.getOffset(), + 0, newAgg); + + newChildren.add(newLimit); + } + + if (union.children().equals(newChildren)) { + return null; + } + return limit.withChildren(agg.withChildren(union.withChildren(newChildren))); + }) + .toRule(RuleType.PUSH_DOWN_LIMIT_DISTINCT_THROUGH_UNION) + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java index 15023cb514..7563ecd516 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownTopNDistinctThroughUnion.java @@ -46,6 +46,7 @@ import java.util.Map; * * rewritten to * + * TopN-Distinct * -> Union All * -> TopN-Distinct * -> child plan1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java index c27254a7ed..3149d140fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java @@ -23,6 +23,7 @@ import org.apache.doris.nereids.rules.exploration.mv.SlotMapping; import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; import org.apache.doris.nereids.rules.expression.rules.FoldConstantRule; import org.apache.doris.nereids.trees.TreeNode; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.And; import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; @@ -300,6 +301,19 @@ public class ExpressionUtils { return expr.accept(ExpressionReplacer.INSTANCE, replaceMap); } + /** + * replace NameExpression. + */ + public static NamedExpression replace(NamedExpression expr, + Map replaceMap) { + Expression newExpr = expr.accept(ExpressionReplacer.INSTANCE, replaceMap); + if (newExpr instanceof NamedExpression) { + return (NamedExpression) newExpr; + } else { + return new Alias(expr.getExprId(), newExpr, expr.getName()); + } + } + public static List replace(List exprs, Map replaceMap) { return exprs.stream() diff --git a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out new file mode 100644 index 0000000000..1cb6435483 --- /dev/null +++ b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !basic -- +PhysicalResultSink +--PhysicalLimit[GLOBAL] +----PhysicalDistribute +------PhysicalLimit[LOCAL] +--------hashAgg[GLOBAL] +----------PhysicalDistribute +------------hashAgg[LOCAL] +--------------PhysicalUnion +----------------PhysicalDistribute +------------------PhysicalLimit[LOCAL] +--------------------hashAgg[LOCAL] +----------------------PhysicalProject +------------------------PhysicalOlapScan[t] +----------------PhysicalDistribute +------------------PhysicalLimit[LOCAL] +--------------------hashAgg[LOCAL] +----------------------PhysicalProject +------------------------PhysicalOlapScan[t] + diff --git a/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy b/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy new file mode 100644 index 0000000000..cbb509db32 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct.groovy @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("push_down_limit_distinct_through_join") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + + sql """ + DROP TABLE IF EXISTS t; + """ + + sql """ + CREATE TABLE IF NOT EXISTS t( + `id` int(32), + `score` int(64) NULL, + `name` varchar(64) NULL + ) ENGINE = OLAP + DISTRIBUTED BY HASH(id) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql "insert into t values (1, 1, 'a')" + sql "insert into t values (2, null, 'a')" + sql "insert into t values (3, 1, null)" + sql "insert into t values (4, 2, 'b')" + sql "insert into t values (5, null, 'b')" + sql "insert into t values (6, 2, null)" + sql "insert into t values (7, 3, 'c')" + sql "insert into t values (8, null, 'c')" + sql "insert into t values (9, 3, null)" + sql "insert into t values (10, null, null)" + + sql "SET ENABLE_NEREIDS_RULES=push_down_limit_distinct_through_join" + + qt_basic """ + explain shape plan (select id from t t1 union select id from t t2) limit 0, 200; + """ +} \ No newline at end of file