[enhancement](Nereids): remove stats derivation in CostAndEnforce job (#24945)

1. remove stats derivation in CostAndEnforce job
2. enforce valid for each stats after estimating
This commit is contained in:
谢健
2023-09-27 16:31:03 +08:00
committed by GitHub
parent 5fc04b6aeb
commit 9562e280af
9 changed files with 102 additions and 99 deletions

View File

@ -88,7 +88,7 @@ public class ApplyRuleJob extends Job {
} else {
// The Join Commute rule preserves the operator's expression and children,
// thereby not altering the statistics. Hence, there is no need to derive statistics for it.
groupExpression.setStatDerived(true);
newGroupExpression.setStatDerived(true);
}
} else {
pushJob(new CostAndEnforcerJob(newGroupExpression, context));
@ -100,7 +100,7 @@ public class ApplyRuleJob extends Job {
// These implementation rules integrate rules for plan shape transformation.
pushJob(new DeriveStatsJob(newGroupExpression, context));
} else {
groupExpression.setStatDerived(true);
newGroupExpression.setStatDerived(true);
}
}

View File

@ -31,7 +31,6 @@ import org.apache.doris.nereids.properties.ChildrenPropertiesRegulator;
import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequestPropertyDeriver;
import org.apache.doris.nereids.stats.StatsCalculator;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@ -236,25 +235,13 @@ public class CostAndEnforcerJob extends Job implements Cloneable {
PhysicalProperties outputProperty = childOutputPropertyDeriver.getOutputProperties(groupExpression);
// update current group statistics and re-compute costs.
if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null)) {
// TODO: If it's error, add some warning log at least.
if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null)
&& groupExpression.getOwnerGroup().getStatistics() == null) {
// if we come here, mean that we have some error in stats calculator and should fix it.
LOG.warn("Nereids try to calculate cost without stats for group expression {}", groupExpression);
return false;
}
StatsCalculator statsCalculator = StatsCalculator.estimate(groupExpression,
context.getCascadesContext().getConnectContext().getSessionVariable().getForbidUnknownColStats(),
context.getCascadesContext().getConnectContext().getTotalColumnStatisticMap(),
context.getCascadesContext().getConnectContext().getSessionVariable().isPlayNereidsDump(),
context.getCascadesContext());
if (!context.getCascadesContext().getConnectContext().getSessionVariable().isPlayNereidsDump()
&& context.getCascadesContext().getConnectContext().getSessionVariable().isEnableMinidump()) {
context.getCascadesContext().getConnectContext().getTotalColumnStatisticMap()
.putAll(statsCalculator.getTotalColumnStatisticMap());
context.getCascadesContext().getConnectContext().getTotalHistogramMap()
.putAll(statsCalculator.getTotalHistogramMap());
}
// recompute cost after adjusting property
curNodeCost = CostCalculator.calculateCost(groupExpression, requestChildrenProperties);
groupExpression.setCost(curNodeCost);

View File

@ -220,6 +220,7 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
private void estimate() {
Plan plan = groupExpression.getPlan();
Statistics newStats = plan.accept(this, null);
newStats.enforceValid();
// We ensure that the rowCount remains unchanged in order to make the cost of each plan comparable.
if (groupExpression.getOwnerGroup().getStatistics() == null) {
groupExpression.getOwnerGroup().setStatistics(newStats);

View File

@ -123,6 +123,26 @@ public class Statistics {
}
}
public void enforceValid() {
for (Entry<Expression, ColumnStatistic> entry : expressionToColumnStats.entrySet()) {
ColumnStatistic columnStatistic = entry.getValue();
if (!checkColumnStatsValid(columnStatistic)) {
double ndv = Math.min(columnStatistic.ndv, rowCount);
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(columnStatistic);
columnStatisticBuilder.setNdv(ndv);
columnStatisticBuilder.setNumNulls(Math.min(columnStatistic.numNulls, rowCount - ndv));
columnStatisticBuilder.setCount(rowCount);
columnStatistic = columnStatisticBuilder.build();
}
expressionToColumnStats.put(entry.getKey(), columnStatistic);
}
}
public boolean checkColumnStatsValid(ColumnStatistic columnStatistic) {
return columnStatistic.ndv <= rowCount
&& columnStatistic.numNulls <= rowCount - columnStatistic.ndv;
}
public Statistics withSel(double sel) {
sel = StatsMathUtil.minNonNaN(sel, 1);
return withRowCount(rowCount * sel);

View File

@ -285,7 +285,7 @@ public class StatsCalculatorTest {
Assertions.assertEquals(1, limitStats.getRowCount());
ColumnStatistic slot1Stats = limitStats.columnStatistics().get(slot1);
Assertions.assertEquals(1, slot1Stats.ndv, 0.1);
Assertions.assertEquals(0.5, slot1Stats.numNulls);
Assertions.assertEquals(0, slot1Stats.numNulls, 0.1);
}
@Test
@ -311,6 +311,6 @@ public class StatsCalculatorTest {
Assertions.assertEquals(1, topNStats.getRowCount());
ColumnStatistic slot1Stats = topNStats.columnStatistics().get(slot1);
Assertions.assertEquals(1, slot1Stats.ndv, 0.1);
Assertions.assertEquals(0.5, slot1Stats.numNulls);
Assertions.assertEquals(0, slot1Stats.numNulls, 0.1);
}
}

View File

@ -9,27 +9,33 @@ PhysicalResultSink
------------PhysicalDistribute
--------------hashAgg[LOCAL]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk))otherCondition=()
------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk))otherCondition=()
--------------------PhysicalProject
----------------------filter((item.i_category = 'Jewelry'))
------------------------PhysicalOlapScan[item]
----------------------filter((customer_address.ca_gmt_offset = -7.00))
------------------------PhysicalOlapScan[customer_address]
--------------------PhysicalDistribute
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk))otherCondition=()
----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk))otherCondition=()
------------------------PhysicalProject
--------------------------filter((item.i_category = 'Jewelry'))
----------------------------PhysicalOlapScan[item]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((customer_address.ca_gmt_offset = -7.00))
------------------------------PhysicalOlapScan[customer_address]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk))otherCondition=()
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[customer]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk))otherCondition=()
--------------------------------------PhysicalProject
----------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk))otherCondition=()
----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk))otherCondition=()
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[customer]
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk))otherCondition=()
------------------------------------PhysicalProject
--------------------------------------filter((((promotion.p_channel_dmail = 'Y') OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
----------------------------------------PhysicalOlapScan[promotion]
------------------------------------PhysicalDistribute
--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk))otherCondition=()
----------------------------------------PhysicalDistribute
------------------------------------------PhysicalProject
--------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1999))
----------------------------------------------PhysicalOlapScan[date_dim]
----------------------------------------PhysicalDistribute
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk))otherCondition=()
----------------------------------------------PhysicalProject
@ -38,48 +44,38 @@ PhysicalResultSink
------------------------------------------------PhysicalProject
--------------------------------------------------filter((store.s_gmt_offset = -7.00))
----------------------------------------------------PhysicalOlapScan[store]
------------------------------------------PhysicalDistribute
--------------------------------------------PhysicalProject
----------------------------------------------filter((((promotion.p_channel_dmail = 'Y') OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
------------------------------------------------PhysicalOlapScan[promotion]
--------------------------------------PhysicalDistribute
----------------------------------------PhysicalProject
------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1999))
--------------------------------------------PhysicalOlapScan[date_dim]
----------PhysicalDistribute
------------hashAgg[GLOBAL]
--------------PhysicalDistribute
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk))otherCondition=()
--------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk))otherCondition=()
----------------------PhysicalProject
------------------------filter((item.i_category = 'Jewelry'))
--------------------------PhysicalOlapScan[item]
------------------------filter((customer_address.ca_gmt_offset = -7.00))
--------------------------PhysicalOlapScan[customer_address]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk))otherCondition=()
------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk))otherCondition=()
--------------------------PhysicalProject
----------------------------filter((item.i_category = 'Jewelry'))
------------------------------PhysicalOlapScan[item]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------filter((customer_address.ca_gmt_offset = -7.00))
--------------------------------PhysicalOlapScan[customer_address]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk))otherCondition=()
----------------------------------PhysicalDistribute
------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk))otherCondition=()
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1999))
--------------------------------------PhysicalOlapScan[date_dim]
--------------------------------PhysicalDistribute
----------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk))otherCondition=()
------------------------------------PhysicalProject
--------------------------------------PhysicalOlapScan[customer]
----------------------------------PhysicalDistribute
------------------------------------PhysicalProject
--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk))otherCondition=()
----------------------------------------PhysicalProject
------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk))otherCondition=()
--------------------------------------------PhysicalProject
----------------------------------------------PhysicalOlapScan[store_sales]
--------------------------------------------PhysicalDistribute
----------------------------------------------PhysicalProject
------------------------------------------------filter((store.s_gmt_offset = -7.00))
--------------------------------------------------PhysicalOlapScan[store]
----------------------------------------PhysicalDistribute
------------------------------------PhysicalDistribute
--------------------------------------PhysicalProject
----------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk))otherCondition=()
------------------------------------------PhysicalProject
--------------------------------------------filter((date_dim.d_moy = 11) and (date_dim.d_year = 1999))
----------------------------------------------PhysicalOlapScan[date_dim]
--------------------------------------------PhysicalOlapScan[store_sales]
------------------------------------------PhysicalDistribute
--------------------------------------------PhysicalProject
----------------------------------------------filter((store.s_gmt_offset = -7.00))
------------------------------------------------PhysicalOlapScan[store]

View File

@ -12,35 +12,34 @@ PhysicalResultSink
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((tmp.sold_item_sk = item.i_item_sk))otherCondition=()
------------------------PhysicalDistribute
--------------------------PhysicalUnion
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = web_sales.ws_sold_date_sk))otherCondition=()
------------------------PhysicalUnion
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = web_sales.ws_sold_date_sk))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[web_sales]
----------------------------------PhysicalDistribute
------------------------------------PhysicalProject
--------------------------------------filter((date_dim.d_moy = 12) and (date_dim.d_year = 1998))
----------------------------------------PhysicalOlapScan[date_dim]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = catalog_sales.cs_sold_date_sk))otherCondition=()
------------------------------------filter((date_dim.d_moy = 12) and (date_dim.d_year = 1998))
--------------------------------------PhysicalOlapScan[date_dim]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = catalog_sales.cs_sold_date_sk))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[catalog_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[catalog_sales]
----------------------------------PhysicalDistribute
------------------------------------PhysicalProject
--------------------------------------filter((date_dim.d_moy = 12) and (date_dim.d_year = 1998))
----------------------------------------PhysicalOlapScan[date_dim]
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = store_sales.ss_sold_date_sk))otherCondition=()
------------------------------------filter((date_dim.d_moy = 12) and (date_dim.d_year = 1998))
--------------------------------------PhysicalOlapScan[date_dim]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN] hashCondition=((date_dim.d_date_sk = store_sales.ss_sold_date_sk))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[store_sales]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[store_sales]
----------------------------------PhysicalDistribute
------------------------------------PhysicalProject
--------------------------------------filter((date_dim.d_moy = 12) and (date_dim.d_year = 1998))
----------------------------------------PhysicalOlapScan[date_dim]
------------------------------------filter((date_dim.d_moy = 12) and (date_dim.d_year = 1998))
--------------------------------------PhysicalOlapScan[date_dim]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((item.i_manager_id = 1))

View File

@ -96,5 +96,5 @@ limit 100;
// File file = new File(outFile)
// file.write(getRuntimeFilters(plan))
assertEquals("RF10[ss_item_sk->[i_item_sk],RF9[c_current_addr_sk->[ca_address_sk],RF8[ss_customer_sk->[c_customer_sk],RF7[d_date_sk->[ss_sold_date_sk],RF6[p_promo_sk->[ss_promo_sk],RF5[s_store_sk->[ss_store_sk],RF4[ss_item_sk->[i_item_sk],RF3[c_current_addr_sk->[ca_address_sk],RF2[ss_customer_sk->[c_customer_sk],RF1[d_date_sk->[ss_sold_date_sk],RF0[s_store_sk->[ss_store_sk]", getRuntimeFilters(plan))
assertEquals("RF10[c_current_addr_sk->[ca_address_sk],RF9[ss_item_sk->[i_item_sk],RF8[ss_customer_sk->[c_customer_sk],RF7[ss_promo_sk->[p_promo_sk],RF6[ss_sold_date_sk->[d_date_sk],RF5[s_store_sk->[ss_store_sk],RF4[c_current_addr_sk->[ca_address_sk],RF3[ss_item_sk->[i_item_sk],RF2[ss_sold_date_sk->[d_date_sk],RF1[ss_customer_sk->[c_customer_sk],RF0[s_store_sk->[ss_store_sk]", getRuntimeFilters(plan))
}

View File

@ -92,5 +92,5 @@ suite("ds_rf71") {
// File file = new File(outFile)
// file.write(getRuntimeFilters(plan))
assertEquals("RF0[d_date_sk->[ws_sold_date_sk],RF1[d_date_sk->[cs_sold_date_sk],RF2[d_date_sk->[ss_sold_date_sk]", getRuntimeFilters(plan))
assertEquals("RF3[i_item_sk->[ws_item_sk, cs_item_sk, ss_item_sk],RF0[d_date_sk->[ws_sold_date_sk],RF1[d_date_sk->[cs_sold_date_sk],RF2[d_date_sk->[ss_sold_date_sk]", getRuntimeFilters(plan))
}