[nereids] config global partition topn (#31476)
* [nereids] config global partition topn * [nereids] config global partition topn --------- Co-authored-by: zhongjian.xzj <zhongjian.xzj@zhongjianxzjdeMacBook-Pro.local>
This commit is contained in:
@ -17,18 +17,26 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.implementation;
|
||||
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.properties.OrderKey;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.OrderExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Implementation rule that convert logical partition-top-n to physical partition-top-n.
|
||||
@ -42,7 +50,8 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementati
|
||||
|
||||
private List<PhysicalPartitionTopN<? extends Plan>> generatePhysicalPartitionTopn(
|
||||
LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
|
||||
if (logicalPartitionTopN.getPartitionKeys().isEmpty()) {
|
||||
if (logicalPartitionTopN.getPartitionKeys().isEmpty()
|
||||
|| !checkTwoPhaseGlobalPartitionTopn(logicalPartitionTopN)) {
|
||||
// if no partition by keys, use local partition topn combined with further full sort
|
||||
List<OrderKey> orderKeys = !logicalPartitionTopN.getOrderKeys().isEmpty()
|
||||
? logicalPartitionTopN.getOrderKeys().stream()
|
||||
@ -99,6 +108,65 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementati
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* check if partition keys' ndv is almost near the total row count.
|
||||
* if yes, it is not suitable for two phase global partition topn.
|
||||
*/
|
||||
private boolean checkTwoPhaseGlobalPartitionTopn(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
|
||||
double globalPartitionTopnThreshold = ConnectContext.get().getSessionVariable()
|
||||
.getGlobalPartitionTopNThreshold();
|
||||
if (logicalPartitionTopN.getGroupExpression().isPresent()) {
|
||||
Group group = logicalPartitionTopN.getGroupExpression().get().getOwnerGroup();
|
||||
if (group != null && group.getStatistics() != null) {
|
||||
Statistics stats = group.getStatistics();
|
||||
double rowCount = stats.getRowCount();
|
||||
List<Expression> partitionKeys = logicalPartitionTopN.getPartitionKeys();
|
||||
if (!checkPartitionKeys(partitionKeys)) {
|
||||
return false;
|
||||
}
|
||||
List<ColumnStatistic> partitionByKeyStats = partitionKeys.stream()
|
||||
.map(partitionKey -> stats.findColumnStatistics(partitionKey))
|
||||
.filter(Objects::nonNull)
|
||||
.filter(e -> !e.isUnKnown)
|
||||
.collect(Collectors.toList());
|
||||
if (partitionByKeyStats.size() != partitionKeys.size()) {
|
||||
return false;
|
||||
} else {
|
||||
List<Double> ndvs = partitionByKeyStats.stream().map(s -> s.ndv)
|
||||
.filter(e -> e > 0 && !Double.isInfinite(e))
|
||||
.collect(Collectors.toList());
|
||||
if (ndvs.size() != partitionByKeyStats.size()) {
|
||||
return false;
|
||||
} else {
|
||||
double maxNdv = ndvs.stream().max(Double::compare).get();
|
||||
return rowCount / maxNdv >= globalPartitionTopnThreshold;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* global partition topn only take effect if partition keys are columns from basic table
|
||||
*/
|
||||
private boolean checkPartitionKeys(List<Expression> partitionKeys) {
|
||||
for (Expression expr : partitionKeys) {
|
||||
if (!(expr instanceof SlotReference)) {
|
||||
return false;
|
||||
} else {
|
||||
SlotReference slot = (SlotReference) expr;
|
||||
if (!slot.getColumn().isPresent() || !slot.getTable().isPresent()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private ImmutableList<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
|
||||
ImmutableList.Builder<OrderKey> builder = ImmutableList.builder();
|
||||
|
||||
|
||||
@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree";
|
||||
public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
|
||||
|
||||
public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = "global_partition_topn_threshold";
|
||||
|
||||
public static final String ENABLE_INFER_PREDICATE = "enable_infer_predicate";
|
||||
|
||||
public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
|
||||
@ -1027,6 +1029,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN)
|
||||
private boolean enablePartitionTopN = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD)
|
||||
private double globalPartitionTopNThreshold = 100;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_INFER_PREDICATE)
|
||||
private boolean enableInferPredicate = true;
|
||||
|
||||
@ -2533,6 +2538,14 @@ public class SessionVariable implements Serializable, Writable {
|
||||
this.enablePartitionTopN = enablePartitionTopN;
|
||||
}
|
||||
|
||||
public double getGlobalPartitionTopNThreshold() {
|
||||
return globalPartitionTopNThreshold;
|
||||
}
|
||||
|
||||
public void setGlobalPartitionTopnThreshold(int threshold) {
|
||||
this.globalPartitionTopNThreshold = threshold;
|
||||
}
|
||||
|
||||
public boolean isEnableFoldNondeterministicFn() {
|
||||
return enableFoldNondeterministicFn;
|
||||
}
|
||||
|
||||
@ -389,7 +389,7 @@ PhysicalResultSink
|
||||
--PhysicalLimit[GLOBAL]
|
||||
----PhysicalLimit[LOCAL]
|
||||
------PhysicalWindow
|
||||
--------PhysicalPartitionTopN
|
||||
--------PhysicalQuickSort[LOCAL_SORT]
|
||||
----------PhysicalPartitionTopN
|
||||
------------hashAgg[GLOBAL]
|
||||
--------------hashAgg[LOCAL]
|
||||
|
||||
@ -44,7 +44,7 @@ PhysicalResultSink
|
||||
----PhysicalProject
|
||||
------filter((T.b = 2) and (rn <= 2))
|
||||
--------PhysicalWindow
|
||||
----------PhysicalPartitionTopN
|
||||
----------PhysicalQuickSort[LOCAL_SORT]
|
||||
------------PhysicalDistribute[DistributionSpecHash]
|
||||
--------------PhysicalPartitionTopN
|
||||
----------------PhysicalProject
|
||||
|
||||
@ -6,7 +6,7 @@ PhysicalResultSink
|
||||
------PhysicalTopN[LOCAL_SORT]
|
||||
--------filter((rk <= 100))
|
||||
----------PhysicalWindow
|
||||
------------PhysicalPartitionTopN
|
||||
------------PhysicalQuickSort[LOCAL_SORT]
|
||||
--------------PhysicalDistribute[DistributionSpecHash]
|
||||
----------------PhysicalPartitionTopN
|
||||
------------------PhysicalProject
|
||||
|
||||
@ -6,7 +6,7 @@ PhysicalResultSink
|
||||
------PhysicalTopN[LOCAL_SORT]
|
||||
--------filter((rk <= 100))
|
||||
----------PhysicalWindow
|
||||
------------PhysicalPartitionTopN
|
||||
------------PhysicalQuickSort[LOCAL_SORT]
|
||||
--------------PhysicalDistribute[DistributionSpecHash]
|
||||
----------------PhysicalPartitionTopN
|
||||
------------------PhysicalProject
|
||||
|
||||
@ -0,0 +1,76 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_global_partition_topn_plan") {
|
||||
sql "SET enable_nereids_planner=true"
|
||||
sql "SET enable_fallback_to_original_planner=false"
|
||||
|
||||
sql "DROP TABLE IF EXISTS test_global_partition_topn_plan"
|
||||
sql """ CREATE TABLE `test_global_partition_topn_plan` (
|
||||
c1 int, c2 int, c3 int
|
||||
)ENGINE=OLAP
|
||||
distributed by hash(c1) buckets 10
|
||||
properties(
|
||||
"replication_allocation" = "tag.location.default: 1"
|
||||
);"""
|
||||
|
||||
sql """ alter table test_global_partition_topn_plan modify column c1 set stats('row_count'='52899687', 'ndv'='52899687', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """
|
||||
sql """ alter table test_global_partition_topn_plan modify column c2 set stats('row_count'='52899687', 'ndv'='23622730', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """
|
||||
sql """ alter table test_global_partition_topn_plan modify column c3 set stats('row_count'='52899687', 'ndv'='2', 'num_nulls'='0', 'min_value'='0', 'max_value'='1', 'data_size'='4'); """
|
||||
|
||||
sql "SET global_partition_topn_threshold=2"
|
||||
explain {
|
||||
sql("shape plan select rn from (select row_number() over (partition by c2 order by c3) as rn from test_global_partition_topn_plan) tmp where rn <= 100");
|
||||
contains"PhysicalPartitionTopN"
|
||||
notContains"PhysicalQuickSort"
|
||||
}
|
||||
|
||||
sql "SET global_partition_topn_threshold=3"
|
||||
explain {
|
||||
sql("shape plan select rn from (select row_number() over (partition by c2 order by c3) as rn from test_global_partition_topn_plan) tmp where rn <= 100");
|
||||
contains"PhysicalPartitionTopN"
|
||||
contains"PhysicalQuickSort"
|
||||
}
|
||||
|
||||
sql "SET global_partition_topn_threshold=100"
|
||||
explain {
|
||||
sql("shape plan select rn from (select row_number() over (partition by c3 order by c2) as rn from test_global_partition_topn_plan) tmp where rn <= 100");
|
||||
contains"PhysicalPartitionTopN"
|
||||
notContains"PhysicalQuickSort"
|
||||
}
|
||||
|
||||
sql "SET global_partition_topn_threshold=2"
|
||||
explain {
|
||||
sql("shape plan select rn from (select row_number() over (partition by c2, c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100");
|
||||
contains"PhysicalPartitionTopN"
|
||||
notContains"PhysicalQuickSort"
|
||||
}
|
||||
|
||||
sql "SET global_partition_topn_threshold=3"
|
||||
explain {
|
||||
sql("shape plan select rn from (select row_number() over (partition by c2, c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100");
|
||||
contains"PhysicalPartitionTopN"
|
||||
contains"PhysicalQuickSort"
|
||||
}
|
||||
|
||||
sql "SET global_partition_topn_threshold=2"
|
||||
explain {
|
||||
sql("shape plan select rn from (select row_number() over (partition by c2 + c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100");
|
||||
contains"PhysicalPartitionTopN"
|
||||
contains"PhysicalQuickSort"
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user