[fix](nereids) push down filter through partition topn (#24944)

Support pushing a filter down through partition topn when the filter is also able to pass through the window.
Fix a bug in CreatePartitionTopNFromWindow that could unexpectedly generate two partition topn nodes.
case:
select * from (select c2, row_number() over (partition by c2) as rn from t1) T where rn<=1 and c2 = 1;
before this pr:
| PhysicalResultSink                       |
| --PhysicalDistribute                     |
| ----filter((rn <= 1))                    |
| ------PhysicalWindow                     |
| --------PhysicalQuickSort                |
| ----------PhysicalDistribute             |
| ------------PhysicalPartitionTopN        |
| --------------filter((T.c2 = 1))         |
| ----------------PhysicalPartitionTopN    |
| ------------------PhysicalProject        |
| --------------------PhysicalOlapScan[t1] |
+------------------------------------------+
after:

| PhysicalResultSink                     |
| --PhysicalDistribute                   |
| ----filter((rn <= 1))                  |
| ------PhysicalWindow                   |
| --------PhysicalQuickSort              |
| ----------PhysicalDistribute           |
| ------------PhysicalPartitionTopN      |
| --------------PhysicalProject          |
| ----------------filter((T.c2 = 1))     |
| ------------------PhysicalOlapScan[t1] |
+----------------------------------------+
This commit is contained in:
xzj7019
2023-09-27 19:38:04 +08:00
committed by GitHub
parent 00786a3295
commit bb7f8d18a8
6 changed files with 212 additions and 3 deletions

View File

@ -84,6 +84,7 @@ import org.apache.doris.nereids.rules.rewrite.PushdownAliasThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushdownExpressionsInHashCondition;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughAggregation;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughPartitionTopN;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughRepeat;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughSetOperation;
@ -139,7 +140,8 @@ public class RuleSet {
new MergeGenerates(),
new MergeLimits(),
new PushdownAliasThroughJoin(),
new PushdownFilterThroughWindow()
new PushdownFilterThroughWindow(),
new PushdownFilterThroughPartitionTopN()
);
public static final List<Rule> IMPLEMENTATION_RULES = planRuleFactories()

View File

@ -101,6 +101,7 @@ public enum RuleType {
EXTRACT_AND_NORMALIZE_WINDOW_EXPRESSIONS(RuleTypeClass.REWRITE),
CHECK_AND_STANDARDIZE_WINDOW_FUNCTION_AND_FRAME(RuleTypeClass.REWRITE),
CHECK_MATCH_EXPRESSION(RuleTypeClass.REWRITE),
CREATE_PARTITION_TOPN_FOR_WINDOW(RuleTypeClass.REWRITE),
AGGREGATE_DISASSEMBLE(RuleTypeClass.REWRITE),
SIMPLIFY_AGG_GROUP_BY(RuleTypeClass.REWRITE),
DISTINCT_AGGREGATE_DISASSEMBLE(RuleTypeClass.REWRITE),
@ -149,6 +150,7 @@ public enum RuleType {
PUSHDOWN_FILTER_THROUGH_PROJECT(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_PROJECT_UNDER_LIMIT(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_WINDOW(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_PARTITION_TOPN(RuleTypeClass.REWRITE),
PUSHDOWN_PROJECT_THROUGH_LIMIT(RuleTypeClass.REWRITE),
PUSHDOWN_ALIAS_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSHDOWN_ALIAS_INTO_UNION_ALL(RuleTypeClass.REWRITE),

View File

@ -82,7 +82,10 @@ public class CreatePartitionTopNFromWindow extends OneRewriteRuleFactory {
LogicalWindow<Plan> window = filter.child();
// We have already done such optimization rule, so just ignore it.
if (window.child(0) instanceof LogicalPartitionTopN) {
if (window.child(0) instanceof LogicalPartitionTopN
|| (window.child(0) instanceof LogicalFilter
&& window.child(0).child(0) != null
&& window.child(0).child(0) instanceof LogicalPartitionTopN)) {
return filter;
}
@ -137,7 +140,7 @@ public class CreatePartitionTopNFromWindow extends OneRewriteRuleFactory {
return filter;
}
return filter.withChildren(newWindow.get());
}).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_WINDOW);
}).toRule(RuleType.CREATE_PARTITION_TOPN_FOR_WINDOW);
}
private Set<Expression> extractRelatedConjuncts(Set<Expression> conjuncts, ExprId slotRefID) {

View File

@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import java.util.Set;
/**
 * Rewrite rule that moves filter conjuncts below a 'partitionTopN' node when the
 * conjunct only refers to one of the partitionTopN's partition keys.
 *
 * <p>A conjunct is moved only if it reads exactly one input slot and that slot is
 * covered by a partition key which is itself a plain {@link SlotReference}. All other
 * conjuncts stay in a filter above the partitionTopN.
 *
 * <pre>
 * Before:
 *     any_node
 *        |
 *     filter (a <= 100)
 *        |
 *     partition topn (PARTITION BY a)
 *        |
 *     any_node
 *
 * After:
 *     any_node
 *        |
 *     partition topn (PARTITION BY a)
 *        |
 *     filter (a <= 100)
 *        |
 *     any_node
 * </pre>
 */
public class PushdownFilterThroughPartitionTopN extends OneRewriteRuleFactory {
    @Override
    public Rule build() {
        return logicalFilter(logicalPartitionTopN()).thenApply(ctx -> {
            LogicalFilter<LogicalPartitionTopN<Plan>> filter = ctx.root;
            LogicalPartitionTopN<Plan> partitionTopN = filter.child();

            // The eligibility check and the resulting plan shape mirror
            // PushdownFilterThroughWindow.
            Builder<Expression> pushedDown = ImmutableSet.builder();
            Builder<Expression> keptAbove = ImmutableSet.builder();
            for (Expression conjunct : filter.getConjuncts()) {
                if (refersOnlyToPartitionKey(conjunct, partitionTopN)) {
                    pushedDown.add(conjunct);
                } else {
                    keptAbove.add(conjunct);
                }
            }

            ImmutableSet<Expression> bottomConjuncts = pushedDown.build();
            if (bottomConjuncts.isEmpty()) {
                // Nothing can be moved: return null instead of a rewritten plan.
                return null;
            }
            ImmutableSet<Expression> upperConjuncts = keptAbove.build();

            // Insert the new filter between the partitionTopN and its original child.
            LogicalFilter<Plan> bottomFilter = new LogicalFilter<>(bottomConjuncts, partitionTopN.child());
            LogicalPartitionTopN<Plan> rewrittenTopN =
                    (LogicalPartitionTopN<Plan>) partitionTopN.withChildren(bottomFilter);

            // Drop the upper filter entirely when every conjunct was moved below.
            return upperConjuncts.isEmpty()
                    ? rewrittenTopN
                    : filter.withConjunctsAndChild(upperConjuncts, rewrittenTopN);
        }).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_PARTITION_TOPN);
    }

    /**
     * Tells whether the conjunct reads exactly one input slot and that slot is covered
     * by a partition key of the given partitionTopN that is a plain SlotReference.
     */
    private static boolean refersOnlyToPartitionKey(Expression conjunct,
            LogicalPartitionTopN<Plan> partitionTopN) {
        Set<Slot> conjunctSlots = conjunct.getInputSlots();
        if (conjunctSlots.size() != 1) {
            return false;
        }
        for (Expression partitionKey : partitionTopN.getPartitionKeys()) {
            if (partitionKey instanceof SlotReference
                    && partitionKey.getInputSlots().containsAll(conjunctSlots)) {
                return true;
            }
        }
        return false;
    }
}