[feat](Nereids) push down predicates with multi columns through LogicalWindow and LogicalPartitionTopN (#36828) (#36981)

cherry-pick #36828 to branch-2.1

The requirement for predicate pushdown through the window operator is
that the partition by slots of the window contains all slots in the
predicate. The original implementation of doris only allows predicate
pushdown with one slot. This PR relaxes this restriction and allows for
predicate pushdown with multiple slots. The same applies to the
predicate pushdown of the LogicalPartitionTopN operator. The following
sql is an example.

select
	*
from
	(
	select
		row_number() over(partition by id, value1 order by value1) as num,
		id,
		value1
	from
		push_down_multi_column_predicate_through_window_t ) t
where
	abs(id + value1)<4
	and num <= 2;


Co-authored-by: feiniaofeiafei <moailing@selectdb.com>
This commit is contained in:
feiniaofeiafei
2024-07-04 21:00:08 +08:00
committed by GitHub
parent c8978fc9d1
commit 4e4f3d204e
4 changed files with 89 additions and 24 deletions

View File

@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import java.util.HashSet;
import java.util.Set;
/**
@ -62,19 +63,17 @@ public class PushDownFilterThroughPartitionTopN extends OneRewriteRuleFactory {
// PushdownFilterThroughWindow
Builder<Expression> bottomConjunctsBuilder = ImmutableSet.builder();
Builder<Expression> upperConjunctsBuilder = ImmutableSet.builder();
for (Expression expr : filter.getConjuncts()) {
boolean pushed = false;
Set<Slot> exprInputSlots = expr.getInputSlots();
for (Expression partitionKey : partitionTopN.getPartitionKeys()) {
if (partitionKey instanceof SlotReference
&& exprInputSlots.size() == 1
&& partitionKey.getInputSlots().containsAll(exprInputSlots)) {
bottomConjunctsBuilder.add(expr);
pushed = true;
break;
}
Set<SlotReference> partitionKeySlots = new HashSet<>();
for (Expression partitionKey : partitionTopN.getPartitionKeys()) {
if (partitionKey instanceof SlotReference) {
partitionKeySlots.add((SlotReference) partitionKey);
}
if (!pushed) {
}
for (Expression expr : filter.getConjuncts()) {
Set<Slot> exprInputSlots = expr.getInputSlots();
if (partitionKeySlots.containsAll(exprInputSlots)) {
bottomConjunctsBuilder.add(expr);
} else {
upperConjunctsBuilder.add(expr);
}
}

View File

@ -64,18 +64,9 @@ public class PushDownFilterThroughWindow extends OneRewriteRuleFactory {
Set<Expression> bottomConjuncts = Sets.newHashSet();
Set<Expression> upperConjuncts = Sets.newHashSet();
for (Expression expr : filter.getConjuncts()) {
boolean pushed = false;
for (Expression partitionKey : commonPartitionKeys) {
// partitionKey is a single slot reference,
// we want to push expressions which have only one input slot, and the input slot is used as
// partition key in all windowExpressions.
if (partitionKey.getInputSlots().containsAll(expr.getInputSlots())) {
bottomConjuncts.add(expr);
pushed = true;
break;
}
}
if (!pushed) {
if (commonPartitionKeys.containsAll(expr.getInputSlots())) {
bottomConjuncts.add(expr);
} else {
upperConjuncts.add(expr);
}
}

View File

@ -0,0 +1,31 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !multi_column_predicate_push_down_window_shape --
PhysicalResultSink
--filter((num <= 2))
----PhysicalWindow
------PhysicalQuickSort[LOCAL_SORT]
--------PhysicalPartitionTopN
----------PhysicalProject
------------filter((abs((id + value1)) < 4))
--------------PhysicalOlapScan[push_down_multi_column_predicate_through_window_t]
-- !multi_column_predicate_push_down_window --
1 1 10
1 2 20
-- !multi_column_or_predicate_push_down_window_shape --
PhysicalResultSink
--filter((rc < 2))
----PhysicalWindow
------PhysicalQuickSort[LOCAL_SORT]
--------PhysicalPartitionTopN
----------PhysicalProject
------------filter(((t.id > 1) OR (t.value1 > 2)))
--------------PhysicalOlapScan[push_down_multi_column_predicate_through_window_t]
-- !multi_column_or_predicate_push_down_window --
1 10 1
2 20 1
3 30 1
4 40 1

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("push_down_filter_through_window") {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "set ignore_shape_nodes='PhysicalDistribute'"
sql "drop table if exists push_down_multi_column_predicate_through_window_t"
multi_sql """
CREATE TABLE push_down_multi_column_predicate_through_window_t (id INT, value1 INT, value2 VARCHAR(50))
properties("replication_num"="1");
INSERT INTO push_down_multi_column_predicate_through_window_t (id, value1, value2) VALUES(1, 10, 'A'),(2, 20, 'B'),(3, 30, 'C'),(4, 40, 'D');
"""
qt_multi_column_predicate_push_down_window_shape """
explain shape plan
select * from (select row_number() over(partition by id,value1 order by value1) as num, id, value1 from push_down_multi_column_predicate_through_window_t ) t
where abs(id+value1)<4 and num<=2;
"""
qt_multi_column_predicate_push_down_window """
select * from (select row_number() over(partition by id,value1 order by value1) as num, id, value1 from push_down_multi_column_predicate_through_window_t ) t
where abs(id+value1)<30 and num<=2 order by id,value1,num;
"""
qt_multi_column_or_predicate_push_down_window_shape """
explain shape plan
select * from (select id,value1, row_number() over(partition by id,value1 order by value1) rc from push_down_multi_column_predicate_through_window_t ) t where (id>1 or value1>2) and rc<2;
"""
qt_multi_column_or_predicate_push_down_window """
select * from (select id,value1, row_number() over(partition by id,value1 order by value1) rc from push_down_multi_column_predicate_through_window_t ) t where (id>1 or value1>2) and rc<2 order by 1,2 ;
"""
}