From 349b943e126c9bd9f7e4ed3530c1bbed60d1bf21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E5=81=A5?= Date: Wed, 19 Jun 2024 14:49:09 +0800 Subject: [PATCH] [opt](Nereids) Optimize Join Penalty Calculation Based on Build Side Data Volume (#36107) pick from master #35773 This PR introduces an optimization that adjusts the penalty applied during join operations based on the volume of data on the build side. Specifically, when the number of rows and width of the tables being joined are equal, the materialization costs are now considered more accurately. The update ensures that joins with a larger dataset on the build side incur a higher penalty, improving overall query performance and resource allocation. --- .../doris/nereids/cost/CostModelV1.java | 7 ++++ .../apache/doris/statistics/Statistics.java | 2 +- .../doris/nereids/cost/CostModelV1Test.java | 42 +++++++++++++++++++ .../eager_aggregate/basic.out | 10 ++--- .../push_down_count_through_join.out | 4 +- .../push_down_count_through_join_one_side.out | 4 +- .../push_down_max_through_join.out | 2 +- .../push_down_min_through_join.out | 2 +- .../push_down_sum_through_join.out | 8 ++-- .../push_down_sum_through_join_one_side.out | 2 +- .../eliminate_outer_join.out | 4 +- .../noStatsRfPrune/query14.out | 14 +++---- .../no_stats_shape/query14.out | 16 +++---- 13 files changed, 83 insertions(+), 34 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/cost/CostModelV1Test.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java index 483ccfbcc7..e824891365 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java @@ -298,6 +298,13 @@ class CostModelV1 extends PlanVisitor { if (rightConnectivity < leftConnectivity) { leftRowCount += 1; } + if (probeStats.getWidthInJoinCluster() == 
buildStats.getWidthInJoinCluster() + && probeStats.computeTupleSize() < buildStats.computeTupleSize()) { + // When the number of rows and the width on both sides of the join are the same, + // we need to consider the cost of materializing the output. + // When there is more data on the build side, a greater penalty will be given. + leftRowCount += 1e-3; + } } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java index aaa04ac052..3d961982d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java @@ -122,7 +122,7 @@ public class Statistics { && expressionToColumnStats.get(s).isUnKnown); } - private double computeTupleSize() { + public double computeTupleSize() { if (tupleSize <= 0) { double tempSize = 0.0; for (ColumnStatistic s : expressionToColumnStats.values()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/cost/CostModelV1Test.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/cost/CostModelV1Test.java new file mode 100644 index 0000000000..9ebd933d46 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/cost/CostModelV1Test.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.cost; + +import org.apache.doris.nereids.sqltest.SqlTestBase; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.util.PlanChecker; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class CostModelV1Test extends SqlTestBase { + @Test + void testMaterializingCost() { + String sql = "select T1.id, T2.id, T2.score from T1 left join T2 on T1.id = T2.id"; + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + Plan p = PlanChecker.from(connectContext) + .analyze(sql) + .rewrite() + .deriveStats() + .optimize() + .getBestPlanTree(); + Assertions.assertTrue(p.anyMatch(j -> j instanceof PhysicalHashJoin + && ((PhysicalHashJoin) j).getJoinType().isRightJoin()), "join should be flipped to a right join"); + } +} diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/basic.out b/regression-test/data/nereids_rules_p0/eager_aggregate/basic.out index a526355634..0f52d9c808 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/basic.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/basic.out @@ -5,11 +5,11 @@ PhysicalResultSink ----hashAgg[LOCAL] ------hashJoin[INNER_JOIN] hashCondition=((a.device_id = b.device_id)) otherCondition=() --------hashAgg[LOCAL] -----------filter((a.event_id = 'ad_click')) -------------PhysicalOlapScan[com_dd_library] ---------hashAgg[LOCAL] ----------filter((cast(experiment_id as DOUBLE) = 37.0)) ------------PhysicalOlapScan[shunt_log_com_dd_library] +--------hashAgg[LOCAL] 
+----------filter((a.event_id = 'ad_click')) +------------PhysicalOlapScan[com_dd_library] -- !2 -- PhysicalResultSink @@ -37,7 +37,7 @@ PhysicalResultSink ----hashAgg[LOCAL] ------hashJoin[INNER_JOIN] hashCondition=((a.device_id = b.device_id)) otherCondition=() --------hashAgg[LOCAL] -----------PhysicalOlapScan[com_dd_library] ---------hashAgg[LOCAL] ----------PhysicalOlapScan[shunt_log_com_dd_library] +--------hashAgg[LOCAL] +----------PhysicalOlapScan[com_dd_library] diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join.out b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join.out index 2cdf0df3cf..8c822ef4a6 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join.out @@ -21,7 +21,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[count_t] --------PhysicalOlapScan[count_t] @@ -272,7 +272,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[count_t] --------PhysicalOlapScan[count_t] diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join_one_side.out b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join_one_side.out index 6ad731d573..c3132e1811 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join_one_side.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_count_through_join_one_side.out @@ 
-20,7 +20,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[count_t_one_side] --------PhysicalOlapScan[count_t_one_side] @@ -257,7 +257,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[count_t_one_side] --------PhysicalOlapScan[count_t_one_side] diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_max_through_join.out b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_max_through_join.out index 9a7cfa6a4f..7725ac6eee 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_max_through_join.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_max_through_join.out @@ -20,7 +20,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[max_t] --------PhysicalOlapScan[max_t] diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_min_through_join.out b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_min_through_join.out index 3e2ccc6f43..c0efc8db16 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_min_through_join.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_min_through_join.out @@ -20,7 +20,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() 
+------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[min_t] --------PhysicalOlapScan[min_t] diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join.out b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join.out index 106d888207..2b08c5889d 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join.out @@ -21,7 +21,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] ----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[sum_t] --------PhysicalOlapScan[sum_t] @@ -122,14 +122,14 @@ PhysicalResultSink ----hashJoin[INNER_JOIN] hashCondition=((t1.name = t3.name)) otherCondition=() ------hashAgg[GLOBAL] --------hashAgg[LOCAL] +----------PhysicalOlapScan[sum_t] +------hashAgg[GLOBAL] +--------hashAgg[LOCAL] ----------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() ------------hashAgg[LOCAL] --------------PhysicalOlapScan[sum_t] ------------hashAgg[LOCAL] --------------PhysicalOlapScan[sum_t] -------hashAgg[GLOBAL] ---------hashAgg[LOCAL] -----------PhysicalOlapScan[sum_t] -- !groupby_pushdown_with_order_by -- PhysicalResultSink diff --git a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join_one_side.out b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join_one_side.out index 65d3a7b68f..f36cb82677 100644 --- a/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join_one_side.out +++ b/regression-test/data/nereids_rules_p0/eager_aggregate/push_down_sum_through_join_one_side.out @@ -20,7 +20,7 @@ PhysicalResultSink PhysicalResultSink --hashAgg[GLOBAL] 
----hashAgg[LOCAL] -------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() --------PhysicalOlapScan[sum_t_one_side] --------PhysicalOlapScan[sum_t_one_side] diff --git a/regression-test/data/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.out b/regression-test/data/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.out index 8c2e3ad837..c08e68d3c7 100644 --- a/regression-test/data/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.out +++ b/regression-test/data/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.out @@ -165,7 +165,7 @@ PhysicalResultSink ----PhysicalProject ------filter((count(id) > 1)) --------hashAgg[LOCAL] -----------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +----------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() ------------PhysicalProject --------------PhysicalOlapScan[t] ------------PhysicalProject @@ -221,7 +221,7 @@ PhysicalResultSink --PhysicalDistribute[DistributionSpecGather] ----hashAgg[LOCAL] ------PhysicalProject ---------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() +--------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=() ----------PhysicalProject ------------PhysicalOlapScan[t] ----------PhysicalProject diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query14.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query14.out index 8dd1172b32..3186b9c86b 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query14.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query14.out @@ -158,16 +158,16 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ------------------------------------PhysicalProject --------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[ws_sold_date_sk] ----------------------------------------PhysicalProject -------------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() ---------------------------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=() -----------------------------------------------PhysicalDistribute[DistributionSpecHash] -------------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) +------------------------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=() build RFs:RF17 ws_item_sk->[ss_item_sk] +--------------------------------------------PhysicalDistribute[DistributionSpecHash] +----------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF17 +--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() ----------------------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------------------PhysicalProject --------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF18 ---------------------------------------------PhysicalDistribute[DistributionSpecHash] -----------------------------------------------PhysicalProject -------------------------------------------------PhysicalOlapScan[item] +----------------------------------------------PhysicalDistribute[DistributionSpecHash] +------------------------------------------------PhysicalProject +--------------------------------------------------PhysicalOlapScan[item] ----------------------------------------PhysicalDistribute[DistributionSpecReplicated] 
------------------------------------------PhysicalProject --------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 2002)) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query14.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query14.out index 76221a53ac..c2b2f96ca8 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query14.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query14.out @@ -158,16 +158,16 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) ------------------------------------PhysicalProject --------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[ws_sold_date_sk] ----------------------------------------PhysicalProject -------------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF17 i_item_sk->[ss_item_sk,ws_item_sk] ---------------------------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=() build RFs:RF16 ws_item_sk->[ss_item_sk] -----------------------------------------------PhysicalDistribute[DistributionSpecHash] -------------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF16 RF17 +------------------------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((web_sales.ws_item_sk = cross_items.ss_item_sk)) otherCondition=() build RFs:RF17 ws_item_sk->[ss_item_sk] +--------------------------------------------PhysicalDistribute[DistributionSpecHash] +----------------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF17 +--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF16 
i_item_sk->[ws_item_sk] ----------------------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------------------PhysicalProject ---------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF17 RF18 ---------------------------------------------PhysicalDistribute[DistributionSpecHash] -----------------------------------------------PhysicalProject -------------------------------------------------PhysicalOlapScan[item] +--------------------------------------------------PhysicalOlapScan[web_sales] apply RFs: RF16 RF18 +----------------------------------------------PhysicalDistribute[DistributionSpecHash] +------------------------------------------------PhysicalProject +--------------------------------------------------PhysicalOlapScan[item] ----------------------------------------PhysicalDistribute[DistributionSpecReplicated] ------------------------------------------PhysicalProject --------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 2002))