From 9756ff1e259fae3f29143f55e01aac549a73f612 Mon Sep 17 00:00:00 2001 From: jakevin Date: Thu, 29 Jun 2023 22:04:41 +0800 Subject: [PATCH] [feature](Nereids): infer distinct from SetOperator (#21235) Infer distinct from Distinct SetOperator, and put distinct above children to reduce data. tpcds_sf100 q14: before 100 rows in set (7.60 sec) after 100 rows in set (6.80 sec) --- .../doris/nereids/jobs/executor/Rewriter.java | 2 + .../apache/doris/nereids/rules/RuleType.java | 1 + .../ProjectWithDistinctToAggregate.java | 36 ++++---- .../rewrite/InferSetOperatorDistinct.java | 54 +++++++++++ .../trees/plans/logical/LogicalAggregate.java | 7 ++ .../shape/query14.out | 90 ++++++++++--------- 6 files changed, 127 insertions(+), 63 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index f7bd7c1a5a..ba3d71d896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -61,6 +61,7 @@ import org.apache.doris.nereids.rules.rewrite.InferAggNotNull; import org.apache.doris.nereids.rules.rewrite.InferFilterNotNull; import org.apache.doris.nereids.rules.rewrite.InferJoinNotNull; import org.apache.doris.nereids.rules.rewrite.InferPredicates; +import org.apache.doris.nereids.rules.rewrite.InferSetOperatorDistinct; import org.apache.doris.nereids.rules.rewrite.InlineCTE; import org.apache.doris.nereids.rules.rewrite.MergeFilters; import org.apache.doris.nereids.rules.rewrite.MergeOneRowRelationIntoUnion; @@ -232,6 +233,7 @@ public class Rewriter extends AbstractBatchJobExecutor { bottomUp(new MergeSetOperations()), bottomUp(new PushProjectIntoOneRowRelation()), topDown(new MergeOneRowRelationIntoUnion()), + 
costBased(topDown(new InferSetOperatorDistinct())), topDown(new BuildAggForUnion()) ), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index bf95f4468e..b7e75c3659 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -107,6 +107,7 @@ public enum RuleType { ELIMINATE_HINT(RuleTypeClass.REWRITE), INFER_PREDICATES(RuleTypeClass.REWRITE), INFER_AGG_NOT_NULL(RuleTypeClass.REWRITE), + INFER_SET_OPERATOR_DISTINCT(RuleTypeClass.REWRITE), INFER_FILTER_NOT_NULL(RuleTypeClass.REWRITE), INFER_JOIN_NOT_NULL(RuleTypeClass.REWRITE), // subquery analyze diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java index 1b6b270ab7..230eef9e1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ProjectWithDistinctToAggregate.java @@ -22,38 +22,32 @@ import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; - -import com.google.common.collect.Lists; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; /** * ProjectWithDistinctToAggregate. - * + *

* example sql: * <pre>
- * select distinct value
- * from tbl
- * </pre>
+ * select distinct value from tbl * - * origin plan: transformed plan: - * - * LogicalProject(projects=[distinct value]) LogicalAggregate(groupBy=[value], output=[value]) - * | => | - * LogicalOlapScan(table=tbl) LogicalOlapScan(table=tbl) + * LogicalProject(projects=[distinct value]) + * | + * LogicalOlapScan(table=tbl) + * => + * LogicalAggregate(groupBy=[value], output=[value]) + * | + * LogicalOlapScan(table=tbl) + * */ public class ProjectWithDistinctToAggregate extends OneAnalysisRuleFactory { @Override public Rule build() { return RuleType.PROJECT_WITH_DISTINCT_TO_AGGREGATE.build( - logicalProject().then(project -> { - if (project.isDistinct() && project.getProjects() - .stream() - .noneMatch(this::hasAggregateFunction)) { - return new LogicalAggregate<>(Lists.newArrayList(project.getProjects()), project.getProjects(), - project.child()); - } else { - return project; - } - }) + logicalProject() + .when(LogicalProject::isDistinct) + .whenNot(project -> project.getProjects().stream().anyMatch(this::hasAggregateFunction)) + .then(project -> new LogicalAggregate<>(project.getProjects(), project.child())) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java new file mode 100644 index 0000000000..c77edb2e17 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InferSetOperatorDistinct.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * Infer Distinct from SetOperator; + * Example: + * <pre>
+ *                    Intersect
+ *   Intersect ->        |
+ *                  Agg for Distinct
+ * </pre>
+ */ +public class InferSetOperatorDistinct extends OneRewriteRuleFactory { + @Override + public Rule build() { + return logicalSetOperation() + .when(operation -> operation.getQualifier() == Qualifier.DISTINCT) + .then(setOperation -> { + List newChildren = setOperation.children().stream() + .map(child -> new LogicalAggregate<>(ImmutableList.copyOf(child.getOutput()), child)) + .collect(ImmutableList.toImmutableList()); + if (newChildren.equals(setOperation.children())) { + return null; + } + return setOperation.withChildren(newChildren); + }).toRule(RuleType.INFER_SET_OPERATOR_DISTINCT); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java index e9926d4d92..d6136a0812 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java @@ -76,6 +76,13 @@ public class LogicalAggregate false, Optional.empty(), child); } + /** + * Distinct Agg + */ + public LogicalAggregate(List namedExpressions, CHILD_TYPE child) { + this(ImmutableList.copyOf(namedExpressions), namedExpressions, false, Optional.empty(), child); + } + public LogicalAggregate(List groupByExpressions, List outputExpressions, boolean ordinalIsResolved, CHILD_TYPE child) { this(groupByExpressions, outputExpressions, false, ordinalIsResolved, Optional.empty(), diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query14.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query14.out index 4877f71f8c..ecf6a69c87 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query14.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query14.out @@ -4,49 +4,55 @@ CteAnchor[cteId= ( CTEId#8=] ) --CteProducer[cteId= ( CTEId#8=] ) ----PhysicalProject 
------hashJoin[INNER_JOIN](item.i_brand_id = t.brand_id)(item.i_class_id = t.class_id)(item.i_category_id = t.category_id) ---------PhysicalIntersect -----------PhysicalDistribute -------------PhysicalProject ---------------hashJoin[INNER_JOIN](store_sales.ss_item_sk = iss.i_item_sk) -----------------hashJoin[INNER_JOIN](store_sales.ss_sold_date_sk = d1.d_date_sk) -------------------PhysicalProject ---------------------PhysicalOlapScan[store_sales] -------------------PhysicalDistribute ---------------------PhysicalProject -----------------------filter((d1.d_year <= 2002)(d1.d_year >= 2000)) -------------------------PhysicalOlapScan[date_dim] -----------------PhysicalDistribute -------------------PhysicalProject ---------------------PhysicalOlapScan[item] -----------PhysicalDistribute -------------PhysicalProject ---------------hashJoin[INNER_JOIN](catalog_sales.cs_item_sk = ics.i_item_sk) -----------------hashJoin[INNER_JOIN](catalog_sales.cs_sold_date_sk = d2.d_date_sk) -------------------PhysicalProject ---------------------PhysicalOlapScan[catalog_sales] -------------------PhysicalDistribute ---------------------PhysicalProject -----------------------filter((d2.d_year >= 2000)(d2.d_year <= 2002)) -------------------------PhysicalOlapScan[date_dim] -----------------PhysicalDistribute -------------------PhysicalProject ---------------------PhysicalOlapScan[item] -----------PhysicalDistribute -------------PhysicalProject ---------------hashJoin[INNER_JOIN](web_sales.ws_item_sk = iws.i_item_sk) -----------------hashJoin[INNER_JOIN](web_sales.ws_sold_date_sk = d3.d_date_sk) -------------------PhysicalProject ---------------------PhysicalOlapScan[web_sales] -------------------PhysicalDistribute ---------------------PhysicalProject -----------------------filter((d3.d_year <= 2002)(d3.d_year >= 2000)) -------------------------PhysicalOlapScan[date_dim] -----------------PhysicalDistribute -------------------PhysicalProject ---------------------PhysicalOlapScan[item] 
+--------PhysicalProject +----------PhysicalOlapScan[item] --------PhysicalDistribute -----------PhysicalProject -------------PhysicalOlapScan[item] +----------PhysicalIntersect +------------hashAgg[GLOBAL] +--------------PhysicalDistribute +----------------hashAgg[LOCAL] +------------------PhysicalProject +--------------------hashJoin[INNER_JOIN](store_sales.ss_item_sk = iss.i_item_sk) +----------------------hashJoin[INNER_JOIN](store_sales.ss_sold_date_sk = d1.d_date_sk) +------------------------PhysicalProject +--------------------------PhysicalOlapScan[store_sales] +------------------------PhysicalDistribute +--------------------------PhysicalProject +----------------------------filter((d1.d_year <= 2002)(d1.d_year >= 2000)) +------------------------------PhysicalOlapScan[date_dim] +----------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------PhysicalOlapScan[item] +------------hashAgg[GLOBAL] +--------------PhysicalDistribute +----------------hashAgg[LOCAL] +------------------PhysicalProject +--------------------hashJoin[INNER_JOIN](catalog_sales.cs_item_sk = ics.i_item_sk) +----------------------hashJoin[INNER_JOIN](catalog_sales.cs_sold_date_sk = d2.d_date_sk) +------------------------PhysicalProject +--------------------------PhysicalOlapScan[catalog_sales] +------------------------PhysicalDistribute +--------------------------PhysicalProject +----------------------------filter((d2.d_year >= 2000)(d2.d_year <= 2002)) +------------------------------PhysicalOlapScan[date_dim] +----------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------PhysicalOlapScan[item] +------------hashAgg[GLOBAL] +--------------PhysicalDistribute +----------------hashAgg[LOCAL] +------------------PhysicalProject +--------------------hashJoin[INNER_JOIN](web_sales.ws_item_sk = iws.i_item_sk) +----------------------hashJoin[INNER_JOIN](web_sales.ws_sold_date_sk = d3.d_date_sk) 
+------------------------PhysicalProject +--------------------------PhysicalOlapScan[web_sales] +------------------------PhysicalDistribute +--------------------------PhysicalProject +----------------------------filter((d3.d_year <= 2002)(d3.d_year >= 2000)) +------------------------------PhysicalOlapScan[date_dim] +----------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------PhysicalOlapScan[item] --CteAnchor[cteId= ( CTEId#10=] ) ----CteProducer[cteId= ( CTEId#10=] ) ------hashAgg[GLOBAL]