[feature](Nereids): infer distinct from SetOperator (#21235)

Infer distinct from Distinct SetOperator, and put distinct above children to reduce data.

tpcds_sf100 q14:

before
100 rows in set (7.60 sec)

after
100 rows in set (6.80 sec)
This commit is contained in:
jakevin
2023-06-29 22:04:41 +08:00
committed by GitHub
parent f07e0d7686
commit 9756ff1e25
6 changed files with 127 additions and 63 deletions

View File

@ -61,6 +61,7 @@ import org.apache.doris.nereids.rules.rewrite.InferAggNotNull;
import org.apache.doris.nereids.rules.rewrite.InferFilterNotNull;
import org.apache.doris.nereids.rules.rewrite.InferJoinNotNull;
import org.apache.doris.nereids.rules.rewrite.InferPredicates;
import org.apache.doris.nereids.rules.rewrite.InferSetOperatorDistinct;
import org.apache.doris.nereids.rules.rewrite.InlineCTE;
import org.apache.doris.nereids.rules.rewrite.MergeFilters;
import org.apache.doris.nereids.rules.rewrite.MergeOneRowRelationIntoUnion;
@ -232,6 +233,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
bottomUp(new MergeSetOperations()),
bottomUp(new PushProjectIntoOneRowRelation()),
topDown(new MergeOneRowRelationIntoUnion()),
costBased(topDown(new InferSetOperatorDistinct())),
topDown(new BuildAggForUnion())
),

View File

@ -107,6 +107,7 @@ public enum RuleType {
ELIMINATE_HINT(RuleTypeClass.REWRITE),
INFER_PREDICATES(RuleTypeClass.REWRITE),
INFER_AGG_NOT_NULL(RuleTypeClass.REWRITE),
INFER_SET_OPERATOR_DISTINCT(RuleTypeClass.REWRITE),
INFER_FILTER_NOT_NULL(RuleTypeClass.REWRITE),
INFER_JOIN_NOT_NULL(RuleTypeClass.REWRITE),
// subquery analyze

View File

@ -22,38 +22,32 @@ import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import com.google.common.collect.Lists;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
/**
* ProjectWithDistinctToAggregate.
*
* <p>
* example sql:
* <pre>
* select distinct value
* from tbl
* </pre>
* select distinct value from tbl
*
* origin plan: transformed plan:
*
* LogicalProject(projects=[distinct value]) LogicalAggregate(groupBy=[value], output=[value])
* | => |
* LogicalOlapScan(table=tbl) LogicalOlapScan(table=tbl)
* LogicalProject(projects=[distinct value])
* |
* LogicalOlapScan(table=tbl)
* =>
* LogicalAggregate(groupBy=[value], output=[value])
* |
* LogicalOlapScan(table=tbl)
* </pre>
*/
public class ProjectWithDistinctToAggregate extends OneAnalysisRuleFactory {
@Override
public Rule build() {
return RuleType.PROJECT_WITH_DISTINCT_TO_AGGREGATE.build(
logicalProject().then(project -> {
if (project.isDistinct() && project.getProjects()
.stream()
.noneMatch(this::hasAggregateFunction)) {
return new LogicalAggregate<>(Lists.newArrayList(project.getProjects()), project.getProjects(),
project.child());
} else {
return project;
}
})
logicalProject()
.when(LogicalProject::isDistinct)
.whenNot(project -> project.getProjects().stream().anyMatch(this::hasAggregateFunction))
.then(project -> new LogicalAggregate<>(project.getProjects(), project.child()))
);
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import com.google.common.collect.ImmutableList;
import java.util.List;
/**
* Infer Distinct from SetOperator;
* Example:
* <pre>
* Intersect
* Intersect -> |
* Agg for Distinct
* </pre>
*/
public class InferSetOperatorDistinct extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalSetOperation()
.when(operation -> operation.getQualifier() == Qualifier.DISTINCT)
.then(setOperation -> {
List<Plan> newChildren = setOperation.children().stream()
.map(child -> new LogicalAggregate<>(ImmutableList.copyOf(child.getOutput()), child))
.collect(ImmutableList.toImmutableList());
if (newChildren.equals(setOperation.children())) {
return null;
}
return setOperation.withChildren(newChildren);
}).toRule(RuleType.INFER_SET_OPERATOR_DISTINCT);
}
}

View File

@ -76,6 +76,13 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
false, Optional.empty(), child);
}
/**
* Distinct Agg
*/
public LogicalAggregate(List<NamedExpression> namedExpressions, CHILD_TYPE child) {
this(ImmutableList.copyOf(namedExpressions), namedExpressions, false, Optional.empty(), child);
}
public LogicalAggregate(List<Expression> groupByExpressions,
List<NamedExpression> outputExpressions, boolean ordinalIsResolved, CHILD_TYPE child) {
this(groupByExpressions, outputExpressions, false, ordinalIsResolved, Optional.empty(),

View File

@ -4,49 +4,55 @@ CteAnchor[cteId= ( CTEId#8=] )
--CteProducer[cteId= ( CTEId#8=] )
----PhysicalProject
------hashJoin[INNER_JOIN](item.i_brand_id = t.brand_id)(item.i_class_id = t.class_id)(item.i_category_id = t.category_id)
--------PhysicalIntersect
----------PhysicalDistribute
------------PhysicalProject
--------------hashJoin[INNER_JOIN](store_sales.ss_item_sk = iss.i_item_sk)
----------------hashJoin[INNER_JOIN](store_sales.ss_sold_date_sk = d1.d_date_sk)
------------------PhysicalProject
--------------------PhysicalOlapScan[store_sales]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------filter((d1.d_year <= 2002)(d1.d_year >= 2000))
------------------------PhysicalOlapScan[date_dim]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------PhysicalOlapScan[item]
----------PhysicalDistribute
------------PhysicalProject
--------------hashJoin[INNER_JOIN](catalog_sales.cs_item_sk = ics.i_item_sk)
----------------hashJoin[INNER_JOIN](catalog_sales.cs_sold_date_sk = d2.d_date_sk)
------------------PhysicalProject
--------------------PhysicalOlapScan[catalog_sales]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------filter((d2.d_year >= 2000)(d2.d_year <= 2002))
------------------------PhysicalOlapScan[date_dim]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------PhysicalOlapScan[item]
----------PhysicalDistribute
------------PhysicalProject
--------------hashJoin[INNER_JOIN](web_sales.ws_item_sk = iws.i_item_sk)
----------------hashJoin[INNER_JOIN](web_sales.ws_sold_date_sk = d3.d_date_sk)
------------------PhysicalProject
--------------------PhysicalOlapScan[web_sales]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------filter((d3.d_year <= 2002)(d3.d_year >= 2000))
------------------------PhysicalOlapScan[date_dim]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------PhysicalOlapScan[item]
--------PhysicalProject
----------PhysicalOlapScan[item]
--------PhysicalDistribute
----------PhysicalProject
------------PhysicalOlapScan[item]
----------PhysicalIntersect
------------hashAgg[GLOBAL]
--------------PhysicalDistribute
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN](store_sales.ss_item_sk = iss.i_item_sk)
----------------------hashJoin[INNER_JOIN](store_sales.ss_sold_date_sk = d1.d_date_sk)
------------------------PhysicalProject
--------------------------PhysicalOlapScan[store_sales]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((d1.d_year <= 2002)(d1.d_year >= 2000))
------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------PhysicalOlapScan[item]
------------hashAgg[GLOBAL]
--------------PhysicalDistribute
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN](catalog_sales.cs_item_sk = ics.i_item_sk)
----------------------hashJoin[INNER_JOIN](catalog_sales.cs_sold_date_sk = d2.d_date_sk)
------------------------PhysicalProject
--------------------------PhysicalOlapScan[catalog_sales]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((d2.d_year >= 2000)(d2.d_year <= 2002))
------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------PhysicalOlapScan[item]
------------hashAgg[GLOBAL]
--------------PhysicalDistribute
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN](web_sales.ws_item_sk = iws.i_item_sk)
----------------------hashJoin[INNER_JOIN](web_sales.ws_sold_date_sk = d3.d_date_sk)
------------------------PhysicalProject
--------------------------PhysicalOlapScan[web_sales]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((d3.d_year <= 2002)(d3.d_year >= 2000))
------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------PhysicalOlapScan[item]
--CteAnchor[cteId= ( CTEId#10=] )
----CteProducer[cteId= ( CTEId#10=] )
------hashAgg[GLOBAL]