[feat](nereids)disable join reorder if column stats is invalid #41790 (branch-2.1) (#42902)

## Proposed changes
pick #41790
Issue Number: close #xxx

<!--Describe your changes.-->
This commit is contained in:
minghong
2024-10-30 23:47:03 +08:00
committed by GitHub
parent 6d23020cb8
commit 0d008b5a43
10 changed files with 105 additions and 133 deletions

View File

@ -260,7 +260,8 @@ public class NereidsPlanner extends Planner {
&& !cascadesContext.isLeadingDisableJoinReorder()) {
List<LogicalOlapScan> scans = cascadesContext.getRewritePlan()
.collectToList(LogicalOlapScan.class::isInstance);
StatsCalculator.disableJoinReorderIfTableRowCountNotAvailable(scans, cascadesContext);
Optional<String> reason = StatsCalculator.disableJoinReorderIfStatsInvalid(scans, cascadesContext);
reason.ifPresent(LOG::info);
}
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {

View File

@ -18,6 +18,7 @@
package org.apache.doris.nereids.stats;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
@ -262,27 +263,75 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
groupExpression.setStatDerived(true);
}
/**
 * Tells whether {@code slot} is a {@link SlotReference} bound to a visible column.
 *
 * @param slot the slot to inspect
 * @return {@code true} only when the slot is a SlotReference, carries a column,
 *         and that column reports itself as visible; {@code false} otherwise
 */
private boolean isVisibleSlotReference(Slot slot) {
    if (!(slot instanceof SlotReference)) {
        return false;
    }
    // A slot reference without a backing column is treated as not visible.
    return ((SlotReference) slot).getColumn()
            .map(Column::isVisible)
            .orElse(false);
}
/**
 * Looks up the column statistic for {@code slot} on the table behind {@code catalogRelation}.
 *
 * @param catalogRelation relation whose table is queried
 * @param slot slot whose name identifies the column
 * @return whatever {@code getColumnStatistic} yields for (table, column name, index id)
 */
private ColumnStatistic getColumnStatsFromTableCache(CatalogRelation catalogRelation, SlotReference slot) {
    // Only an OlapScan exposes a selected index id; every other relation uses -1.
    long selectedIndexId = catalogRelation instanceof OlapScan
            ? ((OlapScan) catalogRelation).getSelectedIndexId()
            : -1;
    return getColumnStatistic(catalogRelation.getTable(), slot.getName(), selectedIndexId);
}
/**
 * Checks the cached column statistics of every visible output slot of {@code olapScan}
 * and reports the first one whose ndv looks inconsistent.
 *
 * <p>A known (not {@code isUnKnown}) statistic is considered invalid when either
 * <ul>
 *   <li>ndv is 0 while minExpr or maxExpr is non-null, or</li>
 *   <li>ndv is greater than 10 * {@code rowCount}.</li>
 * </ul>
 *
 * @param olapScan the scan whose output slots are checked
 * @param rowCount the table row count the ndv is compared against
 * @return a description of the first offending slot, or empty when none is found
 */
private Optional<String> checkNdvValidation(OlapScan olapScan, double rowCount) {
    for (Slot slot : ((Plan) olapScan).getOutput()) {
        if (!isVisibleSlotReference(slot)) {
            continue;
        }
        ColumnStatistic colStats = getColumnStatsFromTableCache(
                (CatalogRelation) olapScan, (SlotReference) slot);
        if (colStats.isUnKnown) {
            // Unknown stats carry no ndv worth validating.
            continue;
        }
        boolean zeroNdvButHasRange = colStats.ndv == 0
                && (colStats.minExpr != null || colStats.maxExpr != null);
        boolean ndvFarAboveRowCount = colStats.ndv > rowCount * 10;
        if (zeroNdvButHasRange || ndvFarAboveRowCount) {
            return Optional.of("slot " + slot.getName() + " has invalid column stats: " + colStats);
        }
    }
    return Optional.empty();
}
// NOTE(review): this span is a rendered diff hunk, not compilable source. Removed and
// added lines appear interleaved without +/- markers (two method signatures in a row,
// two `if (rowCount == -1 ...)` headers, `return;` next to `return reason;`).
// Read it as "old version merged with new version"; confirm the final shape against
// the real StatsCalculator.java before relying on any single line.
/**
* disable join reorder if any table row count is not available.
* disable join reorder if
* 1. any table rowCount is not available, or
* 2. col stats ndv=0 but minExpr or maxExpr is not null
* 3. ndv > 10 * rowCount
*/
// NOTE(review): presumably the void signature below is the removed one and the
// Optional<String>-returning signature after it is its replacement — confirm.
public static void disableJoinReorderIfTableRowCountNotAvailable(
List<LogicalOlapScan> scans, CascadesContext context) {
public static Optional<String> disableJoinReorderIfStatsInvalid(List<LogicalOlapScan> scans,
CascadesContext context) {
StatsCalculator calculator = new StatsCalculator(context);
// Without a ConnectContext there is no session variable to flip, so nothing is reported.
if (ConnectContext.get() == null) {
// ut case
return Optional.empty();
}
for (LogicalOlapScan scan : scans) {
double rowCount = calculator.getOlapTableRowCount(scan);
// analyzed rowCount may be zero, but BE-reported rowCount could be positive.
// check ndv validation when reported rowCount > 0
// NOTE(review): two versions of the "row count unavailable" condition follow; the one
// that also tests ConnectContext.get() looks like the pre-change line — confirm.
if (rowCount == -1 && ConnectContext.get() != null) {
// row count not available
if (rowCount == -1) {
LOG.info("disable join reorder since row count not available: "
+ scan.getTable().getNameWithFullQualifiers());
return Optional.of("table[" + scan.getTable().getName() + "] row count is invalid");
}
// ndv abnormal
Optional<String> reason = calculator.checkNdvValidation(scan, rowCount);
if (reason.isPresent()) {
try {
// Turns join reorder off through the session variable of the current connection.
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
LOG.info("disable join reorder since row count not available: "
+ scan.getTable().getNameWithFullQualifiers());
LOG.info("disable join reorder since col stats invalid: "
+ reason.get());
} catch (Exception e) {
// Failure to disable is only logged; the exception itself is not propagated.
LOG.info("disableNereidsJoinReorderOnce failed");
}
// NOTE(review): `return;` presumably belongs to the removed void version and
// `return reason;` to the new Optional-returning one — confirm.
return;
return reason;
}
}
return Optional.empty();
}
@Override

View File

@ -8,31 +8,31 @@ PhysicalResultSink
----------hashAgg[GLOBAL]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF4 cs_order_number->[cs_order_number]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4
----------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF3 cc_call_center_sk->[cs_call_center_sk]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF3 cc_call_center_sk->[cs_call_center_sk]
----------------------hashJoin[RIGHT_ANTI_JOIN] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=() build RFs:RF2 cs_order_number->[cr_order_number]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[catalog_returns] apply RFs: RF2
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
----------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF2 cs_order_number->[cs_order_number]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF2
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
--------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
----------------------------hashJoin[LEFT_ANTI_JOIN] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=()
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF0 RF1 RF3
------------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_state = 'WV'))
------------------------------------PhysicalOlapScan[customer_address]
----------------------------------PhysicalOlapScan[catalog_returns]
----------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------PhysicalProject
--------------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
----------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------PhysicalProject
--------------------------filter(cc_county IN ('Barrow County', 'Daviess County', 'Luce County', 'Richland County', 'Ziebach County'))
----------------------------PhysicalOlapScan[call_center]
--------------------------------filter((customer_address.ca_state = 'WV'))
----------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------PhysicalProject
------------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
--------------------------------PhysicalOlapScan[date_dim]
------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------PhysicalProject
----------------------filter(cc_county IN ('Barrow County', 'Daviess County', 'Luce County', 'Richland County', 'Ziebach County'))
------------------------PhysicalOlapScan[call_center]

View File

@ -8,31 +8,31 @@ PhysicalResultSink
----------hashAgg[GLOBAL]
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF4 cs_order_number->[cs_order_number]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------PhysicalProject
----------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4
----------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF3 cc_call_center_sk->[cs_call_center_sk]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_call_center_sk = call_center.cc_call_center_sk)) otherCondition=() build RFs:RF3 cc_call_center_sk->[cs_call_center_sk]
----------------------hashJoin[RIGHT_ANTI_JOIN] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=() build RFs:RF2 cs_order_number->[cr_order_number]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[catalog_returns] apply RFs: RF2
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
----------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((cs1.cs_order_number = cs2.cs_order_number)) otherCondition=(( not (cs_warehouse_sk = cs_warehouse_sk))) build RFs:RF2 cs_order_number->[cs_order_number]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF2
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF1 d_date_sk->[cs_ship_date_sk]
--------------------------hashJoin[INNER_JOIN] hashCondition=((cs1.cs_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF0 ca_address_sk->[cs_ship_addr_sk]
----------------------------hashJoin[LEFT_ANTI_JOIN] hashCondition=((cs1.cs_order_number = cr1.cr_order_number)) otherCondition=()
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF0 RF1 RF3
------------------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------------------PhysicalProject
----------------------------------filter((customer_address.ca_state = 'WV'))
------------------------------------PhysicalOlapScan[customer_address]
----------------------------------PhysicalOlapScan[catalog_returns]
----------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------PhysicalProject
--------------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
----------------------------------PhysicalOlapScan[date_dim]
----------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------PhysicalProject
--------------------------filter(cc_county IN ('Barrow County', 'Daviess County', 'Luce County', 'Richland County', 'Ziebach County'))
----------------------------PhysicalOlapScan[call_center]
--------------------------------filter((customer_address.ca_state = 'WV'))
----------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------PhysicalProject
------------------------------filter((date_dim.d_date <= '2002-05-31') and (date_dim.d_date >= '2002-04-01'))
--------------------------------PhysicalOlapScan[date_dim]
------------------PhysicalDistribute[DistributionSpecReplicated]
--------------------PhysicalProject
----------------------filter(cc_county IN ('Barrow County', 'Daviess County', 'Luce County', 'Richland County', 'Ziebach County'))
------------------------PhysicalOlapScan[call_center]

View File

@ -2336,7 +2336,7 @@ suite("load") {
"""
sql """
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='42', 'ndv'='0', 'num_nulls'='42', 'min_value'='0', 'max_value'='179769313', 'data_size'='168')
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='42', 'ndv'='0', 'num_nulls'='42', 'data_size'='168')
"""
sql """

View File

@ -2336,7 +2336,7 @@ suite("load") {
"""
sql """
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='42', 'ndv'='0', 'num_nulls'='42', 'min_value'='0', 'max_value'='179769313', 'data_size'='168')
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='42', 'ndv'='0', 'num_nulls'='42', 'data_size'='168')
"""
sql """

View File

@ -2340,7 +2340,7 @@ suite("load") {
"""
sql """
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='42', 'ndv'='0', 'num_nulls'='42', 'min_value'='0', 'max_value'='179769313', 'data_size'='168')
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='42', 'ndv'='0', 'num_nulls'='42', 'data_size'='168')
"""
sql """

View File

@ -1299,7 +1299,7 @@ alter table web_page modify column wp_max_ad_count set stats ('row_count'='2040'
"""
sql """
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='30', 'ndv'='0', 'min_value'='2415022', 'max_value'='2488070', 'avg_size'='120', 'max_size'='120' )
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='30', 'ndv'='0', 'num_nulls'='30', 'avg_size'='120', 'max_size'='120' )
"""
sql """
@ -2018,10 +2018,6 @@ sql """
alter table ship_mode modify column sm_contract set stats ('row_count'='20', 'ndv'='20', 'min_value'='2mM8l', 'max_value'='yVfotg7Tio3MVhBg6Bkn', 'avg_size'='252', 'max_size'='252' )
"""
sql """
alter table call_center modify column cc_closed_date_sk set stats ('row_count'='30', 'ndv'='0', 'min_value'='0', 'max_value'='0', 'avg_size'='120', 'max_size'='120' )
"""
sql """
alter table customer_address modify column ca_zip set stats ('row_count'='1000000', 'ndv'='7733', 'min_value'='', 'max_value'='99981', 'avg_size'='4848150', 'max_size'='4848150' )
"""

View File

@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
suite('tpcds_sf100_stats') {
    String db = context.config.getDbNameByFile(new File(context.file.parent))
    sql "use ${db}"

    // Tables whose column stats are dumped to the log, in the order they are queried.
    List<String> tables = [
        'call_center',
        'catalog_page',
        'catalog_returns',
        'catalog_sales',
        'customer',
        'customer_address',
        'customer_demographics',
        'date_dim',
        'dbgen_version',
        'household_demographics',
        'income_band',
        'inventory',
        'item',
        'promotion',
        'reason',
        'ship_mode',
        'store',
        'store_returns',
        'store_sales',
        'time_dim',
        'warehouse',
        'web_page',
        'web_returns',
        'web_sales',
        'web_site',
    ]

    // Run one `show column stats` per table and log the raw result; nothing is asserted.
    for (String table : tables) {
        def stats = sql(' show column stats ' + table + ' ;')
        logger.info("${stats}")
    }
}

View File

@ -2754,7 +2754,7 @@ PARTITION `p599` VALUES IN (599)
sql """drop stats alter_test"""
alter_result = sql """show table stats alter_test"""
assertEquals("", alter_result[0][7])
sql """alter table alter_test modify column id set stats ('row_count'='100', 'ndv'='0', 'num_nulls'='0.0', 'data_size'='2.69975443E8', 'min_value'='1', 'max_value'='2');"""
sql """alter table alter_test modify column id set stats ('row_count'='100', 'ndv'='0', 'num_nulls'='0.0', 'max_value'='2');"""
alter_result = sql """show column stats alter_test(id)"""
logger.info("show column alter_test(id) stats: " + alter_result)
assertEquals(1, alter_result.size())