[opt](nereids)push down filter through window #23935
select rank() over (partition by A, B) as r, sum(x) over(A, C) as s from T;
A is a common partition key for all windowExpressions, that is A is intersection of {A,B} and {A, C}
we could push filter A=1 through this window, since A is a common Partition key:
select * from (select a, row_number() over (partition by a) from win) T where a=1;
origin plan:
----filter((T.a = 1))
----------PhysicalWindow
------------PhysicalQuickSort
--------------PhysicalProject
------------------PhysicalOlapScan[win]
transformed to
----PhysicalWindow
------PhysicalQuickSort
--------PhysicalProject
----------filter((T.a = 1))
------------PhysicalOlapScan[win]
But C=1 can not be pushed through window.
This commit is contained in:
@ -49,6 +49,7 @@ import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
|
||||
import org.apache.doris.nereids.rules.rewrite.ConvertInnerOrCrossJoin;
|
||||
import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite;
|
||||
import org.apache.doris.nereids.rules.rewrite.CountLiteralToCountStar;
|
||||
import org.apache.doris.nereids.rules.rewrite.CreatePartitionTopNFromWindow;
|
||||
import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult;
|
||||
import org.apache.doris.nereids.rules.rewrite.EliminateAggregate;
|
||||
import org.apache.doris.nereids.rules.rewrite.EliminateDedupJoinCondition;
|
||||
@ -87,7 +88,6 @@ import org.apache.doris.nereids.rules.rewrite.PushFilterInsideJoin;
|
||||
import org.apache.doris.nereids.rules.rewrite.PushProjectIntoOneRowRelation;
|
||||
import org.apache.doris.nereids.rules.rewrite.PushProjectThroughUnion;
|
||||
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughProject;
|
||||
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughWindow;
|
||||
import org.apache.doris.nereids.rules.rewrite.PushdownLimit;
|
||||
import org.apache.doris.nereids.rules.rewrite.PushdownTopNThroughWindow;
|
||||
import org.apache.doris.nereids.rules.rewrite.ReorderJoin;
|
||||
@ -277,7 +277,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
|
||||
new SplitLimit(),
|
||||
new PushdownLimit(),
|
||||
new PushdownTopNThroughWindow(),
|
||||
new PushdownFilterThroughWindow()
|
||||
new CreatePartitionTopNFromWindow()
|
||||
)
|
||||
),
|
||||
// TODO: these rules should be implementation rules, and generate alternative physical plans.
|
||||
|
||||
@ -74,6 +74,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalTopNToPhysicalTopN;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalUnionToPhysicalUnion;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow;
|
||||
import org.apache.doris.nereids.rules.rewrite.ConvertOuterJoinToAntiJoin;
|
||||
import org.apache.doris.nereids.rules.rewrite.CreatePartitionTopNFromWindow;
|
||||
import org.apache.doris.nereids.rules.rewrite.EliminateOuterJoin;
|
||||
import org.apache.doris.nereids.rules.rewrite.MergeFilters;
|
||||
import org.apache.doris.nereids.rules.rewrite.MergeGenerates;
|
||||
@ -121,6 +122,7 @@ public class RuleSet {
|
||||
.build();
|
||||
|
||||
public static final List<RuleFactory> PUSH_DOWN_FILTERS = ImmutableList.of(
|
||||
new CreatePartitionTopNFromWindow(),
|
||||
new PushdownFilterThroughProject(),
|
||||
new PushdownFilterThroughSort(),
|
||||
new PushdownJoinOtherCondition(),
|
||||
@ -129,7 +131,6 @@ public class RuleSet {
|
||||
new PushdownFilterThroughAggregation(),
|
||||
new PushdownFilterThroughRepeat(),
|
||||
new PushdownFilterThroughSetOperation(),
|
||||
new PushdownFilterThroughWindow(),
|
||||
new PushdownProjectThroughLimit(),
|
||||
new EliminateOuterJoin(),
|
||||
new ConvertOuterJoinToAntiJoin(),
|
||||
@ -137,7 +138,9 @@ public class RuleSet {
|
||||
new MergeFilters(),
|
||||
new MergeGenerates(),
|
||||
new MergeLimits(),
|
||||
new PushdownAliasThroughJoin());
|
||||
new PushdownAliasThroughJoin(),
|
||||
new PushdownFilterThroughWindow()
|
||||
);
|
||||
|
||||
public static final List<Rule> IMPLEMENTATION_RULES = planRuleFactories()
|
||||
.add(new LogicalCTEProducerToPhysicalCTEProducer())
|
||||
|
||||
@ -0,0 +1,167 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.rules.rewrite;
|
||||
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
|
||||
import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThan;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* Push down the 'partitionTopN' into the 'window'.
|
||||
* It will convert the filter condition to the 'limit value' and push down below the 'window'.
|
||||
* But there are some restrictions, the details are explained below.
|
||||
* For example:
|
||||
* 'SELECT * FROM (
|
||||
* SELECT *, ROW_NUMBER() OVER (ORDER BY b) AS row_number
|
||||
* FROM t
|
||||
* ) AS tt WHERE row_number <= 100;'
|
||||
* The filter 'row_number <= 100' can be pushed down into the window operator.
|
||||
* The following will demonstrate how the plan changes:
|
||||
* Logical plan tree:
|
||||
* any_node
|
||||
* |
|
||||
* filter (row_number <= 100)
|
||||
* |
|
||||
* window (PARTITION BY a ORDER BY b)
|
||||
* |
|
||||
* any_node
|
||||
* transformed to:
|
||||
* any_node
|
||||
* |
|
||||
* filter (row_number <= 100)
|
||||
* |
|
||||
* window (PARTITION BY a ORDER BY b)
|
||||
* |
|
||||
* partition_topn(PARTITION BY: a, ORDER BY b, Partition Limit: 100)
|
||||
* |
|
||||
* any_node
|
||||
*/
|
||||
|
||||
public class CreatePartitionTopNFromWindow extends OneRewriteRuleFactory {
|
||||
@Override
|
||||
public Rule build() {
|
||||
return logicalFilter(logicalWindow()).thenApply(ctx -> {
|
||||
LogicalFilter<LogicalWindow<Plan>> filter = ctx.root;
|
||||
LogicalWindow<Plan> window = filter.child();
|
||||
|
||||
// We have already done such optimization rule, so just ignore it.
|
||||
if (window.child(0) instanceof LogicalPartitionTopN) {
|
||||
return filter;
|
||||
}
|
||||
|
||||
List<NamedExpression> windowExprs = window.getWindowExpressions();
|
||||
if (windowExprs.size() != 1) {
|
||||
return filter;
|
||||
}
|
||||
NamedExpression windowExpr = windowExprs.get(0);
|
||||
if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) {
|
||||
return filter;
|
||||
}
|
||||
|
||||
// Check the filter conditions. Now, we currently only support simple conditions of the form
|
||||
// 'column </ <=/ = constant'. We will extract some related conjuncts and do some check.
|
||||
Set<Expression> conjuncts = filter.getConjuncts();
|
||||
Set<Expression> relatedConjuncts = extractRelatedConjuncts(conjuncts, windowExpr.getExprId());
|
||||
|
||||
boolean hasPartitionLimit = false;
|
||||
long partitionLimit = Long.MAX_VALUE;
|
||||
|
||||
for (Expression conjunct : relatedConjuncts) {
|
||||
Preconditions.checkArgument(conjunct instanceof BinaryOperator);
|
||||
BinaryOperator op = (BinaryOperator) conjunct;
|
||||
Expression leftChild = op.children().get(0);
|
||||
Expression rightChild = op.children().get(1);
|
||||
|
||||
Preconditions.checkArgument(leftChild instanceof SlotReference
|
||||
&& rightChild instanceof IntegerLikeLiteral);
|
||||
|
||||
long limitVal = ((IntegerLikeLiteral) rightChild).getLongValue();
|
||||
// Adjust the value for 'limitVal' based on the comparison operators.
|
||||
if (conjunct instanceof LessThan) {
|
||||
limitVal--;
|
||||
}
|
||||
if (limitVal < 0) {
|
||||
return new LogicalEmptyRelation(ctx.statementContext.getNextRelationId(), filter.getOutput());
|
||||
}
|
||||
if (hasPartitionLimit) {
|
||||
partitionLimit = Math.min(partitionLimit, limitVal);
|
||||
} else {
|
||||
partitionLimit = limitVal;
|
||||
hasPartitionLimit = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasPartitionLimit) {
|
||||
return filter;
|
||||
}
|
||||
|
||||
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, false);
|
||||
if (!newWindow.isPresent()) {
|
||||
return filter;
|
||||
}
|
||||
return filter.withChildren(newWindow.get());
|
||||
}).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_WINDOW);
|
||||
}
|
||||
|
||||
private Set<Expression> extractRelatedConjuncts(Set<Expression> conjuncts, ExprId slotRefID) {
|
||||
Predicate<Expression> condition = conjunct -> {
|
||||
if (!(conjunct instanceof BinaryOperator)) {
|
||||
return false;
|
||||
}
|
||||
BinaryOperator op = (BinaryOperator) conjunct;
|
||||
Expression leftChild = op.children().get(0);
|
||||
Expression rightChild = op.children().get(1);
|
||||
|
||||
if (!(conjunct instanceof LessThan || conjunct instanceof LessThanEqual || conjunct instanceof EqualTo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: Now, we only support the column on the left side.
|
||||
if (!(leftChild instanceof SlotReference) || !(rightChild instanceof IntegerLikeLiteral)) {
|
||||
return false;
|
||||
}
|
||||
return ((SlotReference) leftChild).getExprId() == slotRefID;
|
||||
};
|
||||
|
||||
return conjuncts.stream()
|
||||
.filter(condition)
|
||||
.collect(ImmutableSet.toImmutableSet());
|
||||
}
|
||||
}
|
||||
@ -19,45 +19,22 @@ package org.apache.doris.nereids.rules.rewrite;
|
||||
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
|
||||
import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThan;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* Push down the 'filter' into the 'window'.
|
||||
* It will convert the filter condition to the 'limit value' and push down below the 'window'.
|
||||
* But there are some restrictions, the details are explained below.
|
||||
* For example:
|
||||
* 'SELECT * FROM (
|
||||
* SELECT *, ROW_NUMBER() OVER (ORDER BY b) AS row_number
|
||||
* FROM t
|
||||
* ) AS tt WHERE row_number <= 100;'
|
||||
* The filter 'row_number <= 100' can be pushed down into the window operator.
|
||||
* The following will demonstrate how the plan changes:
|
||||
* Push down the 'filter' into the 'window' if filter key is window partition key.
|
||||
* Logical plan tree:
|
||||
* any_node
|
||||
* |
|
||||
* filter (row_number <= 100)
|
||||
* filter (a <= 100)
|
||||
* |
|
||||
* window (PARTITION BY a ORDER BY b)
|
||||
* |
|
||||
@ -65,11 +42,9 @@ import java.util.function.Predicate;
|
||||
* transformed to:
|
||||
* any_node
|
||||
* |
|
||||
* filter (row_number <= 100)
|
||||
* |
|
||||
* window (PARTITION BY a ORDER BY b)
|
||||
* |
|
||||
* partition_topn(PARTITION BY: a, ORDER BY b, Partition Limit: 100)
|
||||
* filter (a <= 100)
|
||||
* |
|
||||
* any_node
|
||||
*/
|
||||
@ -81,88 +56,43 @@ public class PushdownFilterThroughWindow extends OneRewriteRuleFactory {
|
||||
return logicalFilter(logicalWindow()).thenApply(ctx -> {
|
||||
LogicalFilter<LogicalWindow<Plan>> filter = ctx.root;
|
||||
LogicalWindow<Plan> window = filter.child();
|
||||
|
||||
// We have already done such optimization rule, so just ignore it.
|
||||
if (window.child(0) instanceof LogicalPartitionTopN) {
|
||||
return filter;
|
||||
}
|
||||
|
||||
List<NamedExpression> windowExprs = window.getWindowExpressions();
|
||||
if (windowExprs.size() != 1) {
|
||||
return filter;
|
||||
}
|
||||
NamedExpression windowExpr = windowExprs.get(0);
|
||||
if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) {
|
||||
return filter;
|
||||
}
|
||||
|
||||
// Check the filter conditions. Now, we currently only support simple conditions of the form
|
||||
// 'column </ <=/ = constant'. We will extract some related conjuncts and do some check.
|
||||
Set<Expression> conjuncts = filter.getConjuncts();
|
||||
Set<Expression> relatedConjuncts = extractRelatedConjuncts(conjuncts, windowExpr.getExprId());
|
||||
|
||||
boolean hasPartitionLimit = false;
|
||||
long partitionLimit = Long.MAX_VALUE;
|
||||
|
||||
for (Expression conjunct : relatedConjuncts) {
|
||||
Preconditions.checkArgument(conjunct instanceof BinaryOperator);
|
||||
BinaryOperator op = (BinaryOperator) conjunct;
|
||||
Expression leftChild = op.children().get(0);
|
||||
Expression rightChild = op.children().get(1);
|
||||
|
||||
Preconditions.checkArgument(leftChild instanceof SlotReference
|
||||
&& rightChild instanceof IntegerLikeLiteral);
|
||||
|
||||
long limitVal = ((IntegerLikeLiteral) rightChild).getLongValue();
|
||||
// Adjust the value for 'limitVal' based on the comparison operators.
|
||||
if (conjunct instanceof LessThan) {
|
||||
limitVal--;
|
||||
// now we only handle single slot used as partition key
|
||||
// for example:
|
||||
// select * from (select T.*, rank() over(partition by c2+c3 order by c4) rn from T) abc where c2=1;
|
||||
// c2=1 cannot be pushed down.
|
||||
Set<SlotReference> commonPartitionKeys = window.getCommonPartitionKeyFromWindowExpressions();
|
||||
Set<Expression> bottomConjuncts = Sets.newHashSet();
|
||||
Set<Expression> upperConjuncts = Sets.newHashSet();
|
||||
for (Expression expr : filter.getConjuncts()) {
|
||||
boolean pushed = false;
|
||||
for (Expression partitionKey : commonPartitionKeys) {
|
||||
// partitionKey is a single slot reference,
|
||||
// we want to push expressions which have only one input slot, and the input slot is used as
|
||||
// partition key in all windowExpressions.
|
||||
if (partitionKey.getInputSlots().containsAll(expr.getInputSlots())) {
|
||||
bottomConjuncts.add(expr);
|
||||
pushed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (limitVal < 0) {
|
||||
return new LogicalEmptyRelation(ctx.statementContext.getNextRelationId(), filter.getOutput());
|
||||
}
|
||||
if (hasPartitionLimit) {
|
||||
partitionLimit = Math.min(partitionLimit, limitVal);
|
||||
} else {
|
||||
partitionLimit = limitVal;
|
||||
hasPartitionLimit = true;
|
||||
if (!pushed) {
|
||||
upperConjuncts.add(expr);
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasPartitionLimit) {
|
||||
return filter;
|
||||
if (bottomConjuncts.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, false);
|
||||
if (!newWindow.isPresent()) {
|
||||
return filter;
|
||||
LogicalFilter<Plan> bottomFilter = new LogicalFilter<>(bottomConjuncts, window.child());
|
||||
window = (LogicalWindow<Plan>) window.withChildren(bottomFilter);
|
||||
if (upperConjuncts.isEmpty()) {
|
||||
return window;
|
||||
} else {
|
||||
LogicalFilter<Plan> upperFilter = (LogicalFilter<Plan>) filter
|
||||
.withConjuncts(upperConjuncts).withChildren(window);
|
||||
return upperFilter;
|
||||
}
|
||||
return filter.withChildren(newWindow.get());
|
||||
}).toRule(RuleType.PUSHDOWN_FILTER_THROUGH_WINDOW);
|
||||
}
|
||||
|
||||
private Set<Expression> extractRelatedConjuncts(Set<Expression> conjuncts, ExprId slotRefID) {
|
||||
Predicate<Expression> condition = conjunct -> {
|
||||
if (!(conjunct instanceof BinaryOperator)) {
|
||||
return false;
|
||||
}
|
||||
BinaryOperator op = (BinaryOperator) conjunct;
|
||||
Expression leftChild = op.children().get(0);
|
||||
Expression rightChild = op.children().get(1);
|
||||
|
||||
if (!(conjunct instanceof LessThan || conjunct instanceof LessThanEqual || conjunct instanceof EqualTo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: Now, we only support the column on the left side.
|
||||
if (!(leftChild instanceof SlotReference) || !(rightChild instanceof IntegerLikeLiteral)) {
|
||||
return false;
|
||||
}
|
||||
return ((SlotReference) leftChild).getExprId() == slotRefID;
|
||||
};
|
||||
|
||||
return conjuncts.stream()
|
||||
.filter(condition)
|
||||
.collect(ImmutableSet.toImmutableSet());
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,9 +19,11 @@ package org.apache.doris.nereids.trees.plans.logical;
|
||||
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowFrame;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.window.DenseRank;
|
||||
@ -36,10 +38,14 @@ import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* logical node to deal with window functions;
|
||||
@ -224,4 +230,31 @@ public class LogicalWindow<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_T
|
||||
|
||||
return Optional.ofNullable(window);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* select rank() over (partition by A, B) as r, sum(x) over(A, C) as s from T;
|
||||
* A is a common partition key for all windowExpressions.
|
||||
* for a common Partition key A, we could push filter A=1 through this window.
|
||||
*/
|
||||
public Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
|
||||
ImmutableSet.Builder<SlotReference> commonPartitionKeySet = ImmutableSet.builder();
|
||||
Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
|
||||
for (Expression expr : windowExpressions) {
|
||||
if (expr instanceof Alias && expr.child(0) instanceof WindowExpression) {
|
||||
WindowExpression winExpr = (WindowExpression) expr.child(0);
|
||||
for (Expression partitionKey : winExpr.getPartitionKeys()) {
|
||||
int count = partitionKeyCount.getOrDefault(partitionKey, 0);
|
||||
partitionKeyCount.put(partitionKey, count + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
int winExprCount = windowExpressions.size();
|
||||
for (Map.Entry<Expression, Integer> entry : partitionKeyCount.entrySet()) {
|
||||
if (entry.getValue() == winExprCount && entry.getKey() instanceof SlotReference) {
|
||||
commonPartitionKeySet.add((SlotReference) entry.getKey());
|
||||
}
|
||||
}
|
||||
return commonPartitionKeySet.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class PushdownFilterThroughWindowTest implements MemoPatternMatchSupported {
|
||||
public class GeneratePartitionTopnFromWindowTest implements MemoPatternMatchSupported {
|
||||
private final LogicalOlapScan scan = new LogicalOlapScan(StatementScopeIdGenerator.newRelationId(), PlanConstructor.student,
|
||||
ImmutableList.of(""));
|
||||
|
||||
@ -72,7 +72,7 @@ public class PushdownFilterThroughWindowTest implements MemoPatternMatchSupporte
|
||||
* scan(student)
|
||||
*/
|
||||
@Test
|
||||
public void pushDownFilterThroughWindowTest() {
|
||||
public void testGeneratePartitionTopnFromWindow() {
|
||||
ConnectContext context = MemoTestUtils.createConnectContext();
|
||||
context.getSessionVariable().setEnablePartitionTopN(true);
|
||||
NamedExpression gender = scan.getOutput().get(1).toSlot();
|
||||
@ -96,7 +96,7 @@ public class PushdownFilterThroughWindowTest implements MemoPatternMatchSupporte
|
||||
.build();
|
||||
|
||||
PlanChecker.from(context, plan)
|
||||
.applyTopDown(new PushdownFilterThroughWindow())
|
||||
.applyTopDown(new CreatePartitionTopNFromWindow())
|
||||
.matches(
|
||||
logicalProject(
|
||||
logicalFilter(
|
||||
@ -0,0 +1,92 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.rules.rewrite;
|
||||
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowFrame;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
|
||||
import org.apache.doris.nereids.util.LogicalPlanBuilder;
|
||||
import org.apache.doris.nereids.util.MemoPatternMatchSupported;
|
||||
import org.apache.doris.nereids.util.MemoTestUtils;
|
||||
import org.apache.doris.nereids.util.PlanChecker;
|
||||
import org.apache.doris.nereids.util.PlanConstructor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class PushdowFilterThroughWindowTest implements MemoPatternMatchSupported {
|
||||
private final LogicalOlapScan scan = new LogicalOlapScan(StatementScopeIdGenerator.newRelationId(), PlanConstructor.student,
|
||||
ImmutableList.of(""));
|
||||
|
||||
@Test
|
||||
public void pushDownFilterThroughWindowTest() {
|
||||
ConnectContext context = MemoTestUtils.createConnectContext();
|
||||
NamedExpression age = scan.getOutput().get(3).toSlot();
|
||||
List<Expression> partitionKeyList = ImmutableList.of(age);
|
||||
WindowFrame windowFrame = new WindowFrame(WindowFrame.FrameUnitsType.ROWS,
|
||||
WindowFrame.FrameBoundary.newPrecedingBoundary(),
|
||||
WindowFrame.FrameBoundary.newCurrentRowBoundary());
|
||||
WindowExpression window1 = new WindowExpression(new RowNumber(), partitionKeyList,
|
||||
Lists.newArrayList(), windowFrame);
|
||||
Alias windowAlias1 = new Alias(window1, window1.toSql());
|
||||
List<NamedExpression> expressions = Lists.newArrayList(windowAlias1);
|
||||
LogicalWindow<LogicalOlapScan> window = new LogicalWindow<>(expressions, scan);
|
||||
Expression filterPredicate = new EqualTo(age, Literal.of(100));
|
||||
|
||||
LogicalPlan plan = new LogicalPlanBuilder(window)
|
||||
.filter(filterPredicate)
|
||||
.project(ImmutableList.of(0))
|
||||
.build();
|
||||
PlanChecker.from(context, plan)
|
||||
.applyTopDown(new PushdownFilterThroughWindow())
|
||||
.matches(
|
||||
logicalProject(
|
||||
logicalWindow(
|
||||
logicalFilter(
|
||||
logicalOlapScan()
|
||||
).when(filter -> {
|
||||
if (filter.getConjuncts().size() != 1) {
|
||||
return false;
|
||||
}
|
||||
Expression conj = filter.getConjuncts().iterator().next();
|
||||
if (!(conj instanceof EqualTo)) {
|
||||
return false;
|
||||
}
|
||||
EqualTo eq = (EqualTo) conj;
|
||||
return eq.left().equals(age);
|
||||
|
||||
})
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user