From 0fcdab468db5915fd6b30de819fc04398007abb2 Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Wed, 28 Feb 2024 18:47:11 +0800 Subject: [PATCH] [nereids] config global partition topn (#31476) * [nereids] config global partition topn * [nereids] config global partition topn --------- Co-authored-by: zhongjian.xzj --- ...lPartitionTopNToPhysicalPartitionTopN.java | 70 ++++++++++++++++- .../org/apache/doris/qe/SessionVariable.java | 13 ++++ .../limit_push_down/limit_push_down.out | 2 +- .../push_filter_through_ptopn.out | 2 +- .../noStatsRfPrune/query67.out | 2 +- .../no_stats_shape/query67.out | 2 +- .../test_global_partition_topn_plan.groovy | 76 +++++++++++++++++++ 7 files changed, 162 insertions(+), 5 deletions(-) create mode 100644 regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java index 2b6fcfe464..410e68f3ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java @@ -17,18 +17,26 @@ package org.apache.doris.nereids.rules.implementation; +import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. @@ -42,7 +50,8 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementati private List> generatePhysicalPartitionTopn( LogicalPartitionTopN logicalPartitionTopN) { - if (logicalPartitionTopN.getPartitionKeys().isEmpty()) { + if (logicalPartitionTopN.getPartitionKeys().isEmpty() + || !checkTwoPhaseGlobalPartitionTopn(logicalPartitionTopN)) { // if no partition by keys, use local partition topn combined with further full sort List orderKeys = !logicalPartitionTopN.getOrderKeys().isEmpty() ? logicalPartitionTopN.getOrderKeys().stream() @@ -99,6 +108,65 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementati } } + /** + * check if partition keys' ndv is almost near the total row count. + * if yes, it is not suitable for two phase global partition topn. + */ + private boolean checkTwoPhaseGlobalPartitionTopn(LogicalPartitionTopN logicalPartitionTopN) { + double globalPartitionTopnThreshold = ConnectContext.get().getSessionVariable() + .getGlobalPartitionTopNThreshold(); + if (logicalPartitionTopN.getGroupExpression().isPresent()) { + Group group = logicalPartitionTopN.getGroupExpression().get().getOwnerGroup(); + if (group != null && group.getStatistics() != null) { + Statistics stats = group.getStatistics(); + double rowCount = stats.getRowCount(); + List partitionKeys = logicalPartitionTopN.getPartitionKeys(); + if (!checkPartitionKeys(partitionKeys)) { + return false; + } + List partitionByKeyStats = partitionKeys.stream() + .map(partitionKey -> stats.findColumnStatistics(partitionKey)) + .filter(Objects::nonNull) + .filter(e -> !e.isUnKnown) + .collect(Collectors.toList()); + if (partitionByKeyStats.size() != partitionKeys.size()) { + return false; + } else { + List ndvs = partitionByKeyStats.stream().map(s -> s.ndv) + .filter(e -> e > 0 && !Double.isInfinite(e)) + .collect(Collectors.toList()); + if (ndvs.size() != partitionByKeyStats.size()) { + return false; + } else { + double maxNdv = ndvs.stream().max(Double::compare).get(); + return rowCount / maxNdv >= globalPartitionTopnThreshold; + } + } + } else { + return false; + } + } else { + return false; + } + } + + /** + * global partition topn only take effect if partition keys are columns from basic table + */ + private boolean checkPartitionKeys(List partitionKeys) { + for (Expression expr : partitionKeys) { + if (!(expr instanceof SlotReference)) { + return false; + } else { + SlotReference slot = (SlotReference) expr; + if (!slot.getColumn().isPresent() || !slot.getTable().isPresent()) { + return false; + } + } + } + return true; + } + private ImmutableList getAllOrderKeys(LogicalPartitionTopN logicalPartitionTopN) { ImmutableList.Builder builder = ImmutableList.builder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 207c48e1f2..ed2f1dcbd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree"; public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; + public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = "global_partition_topn_threshold"; + public static final String ENABLE_INFER_PREDICATE = "enable_infer_predicate"; public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000; @@ -1027,6 +1029,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN) private boolean enablePartitionTopN = true; + @VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD) + private double globalPartitionTopNThreshold = 100; + @VariableMgr.VarAttr(name = ENABLE_INFER_PREDICATE) private boolean enableInferPredicate = true; @@ -2533,6 +2538,14 @@ public class SessionVariable implements Serializable, Writable { this.enablePartitionTopN = enablePartitionTopN; } + public double getGlobalPartitionTopNThreshold() { + return globalPartitionTopNThreshold; + } + + public void setGlobalPartitionTopnThreshold(int threshold) { + this.globalPartitionTopNThreshold = threshold; + } + public boolean isEnableFoldNondeterministicFn() { return enableFoldNondeterministicFn; } diff --git a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out index 31ffeee07d..cbf8a09c7e 100644 --- a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out +++ b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out @@ -389,7 +389,7 @@ PhysicalResultSink --PhysicalLimit[GLOBAL] ----PhysicalLimit[LOCAL] ------PhysicalWindow ---------PhysicalPartitionTopN +--------PhysicalQuickSort[LOCAL_SORT] ----------PhysicalPartitionTopN ------------hashAgg[GLOBAL] --------------hashAgg[LOCAL] diff --git a/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out b/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out index e37f73ef79..dd23fd8ea5 100644 --- a/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out +++ b/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out @@ -44,7 +44,7 @@ PhysicalResultSink ----PhysicalProject ------filter((T.b = 2) and (rn <= 2)) --------PhysicalWindow -----------PhysicalPartitionTopN +----------PhysicalQuickSort[LOCAL_SORT] ------------PhysicalDistribute[DistributionSpecHash] --------------PhysicalPartitionTopN ----------------PhysicalProject diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out index bd3f6458a3..246d233728 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out @@ -6,7 +6,7 @@ PhysicalResultSink ------PhysicalTopN[LOCAL_SORT] --------filter((rk <= 100)) ----------PhysicalWindow -------------PhysicalPartitionTopN +------------PhysicalQuickSort[LOCAL_SORT] --------------PhysicalDistribute[DistributionSpecHash] ----------------PhysicalPartitionTopN ------------------PhysicalProject diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out index d069db662b..900fe97ff0 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out @@ -6,7 +6,7 @@ PhysicalResultSink ------PhysicalTopN[LOCAL_SORT] --------filter((rk <= 100)) ----------PhysicalWindow -------------PhysicalPartitionTopN +------------PhysicalQuickSort[LOCAL_SORT] --------------PhysicalDistribute[DistributionSpecHash] ----------------PhysicalPartitionTopN ------------------PhysicalProject diff --git a/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy b/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy new file mode 100644 index 0000000000..7141ac9fce --- /dev/null +++ b/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_global_partition_topn_plan") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + + sql "DROP TABLE IF EXISTS test_global_partition_topn_plan" + sql """ CREATE TABLE `test_global_partition_topn_plan` ( + c1 int, c2 int, c3 int + )ENGINE=OLAP + distributed by hash(c1) buckets 10 + properties( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ alter table test_global_partition_topn_plan modify column c1 set stats('row_count'='52899687', 'ndv'='52899687', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """ + sql """ alter table test_global_partition_topn_plan modify column c2 set stats('row_count'='52899687', 'ndv'='23622730', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """ + sql """ alter table test_global_partition_topn_plan modify column c3 set stats('row_count'='52899687', 'ndv'='2', 'num_nulls'='0', 'min_value'='0', 'max_value'='1', 'data_size'='4'); """ + + sql "SET global_partition_topn_threshold=2" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2 order by c3) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + notContains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=3" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2 order by c3) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + contains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=100" + explain { + sql("shape plan select rn from (select row_number() over (partition by c3 order by c2) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + notContains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=2" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2, c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + notContains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=3" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2, c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + contains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=2" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2 + c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + contains"PhysicalQuickSort" + } +}