[fix](nereids) removeRuntimeFilter() removes more than one RFs if there are different types of RF from the buildNode to the target (#31197)

this bug can be produced on tpcds 95, when set runtime filter type: min-max + bloom
This commit is contained in:
minghong
2024-02-21 15:53:23 +08:00
committed by yiguolei
parent 49411a3139
commit f1fbfeba2f
8 changed files with 61 additions and 34 deletions

View File

@ -181,12 +181,12 @@ public class RuntimeFilterContext {
}
/**
* remove rf from builderNode to target
* remove the given target from runtime filters from builderNode to target with all runtime filter types
*
* @param targetId rf target
* @param builderNode rf src
*/
public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) {
public void removeFilters(ExprId targetId, PhysicalHashJoin builderNode) {
List<RuntimeFilter> filters = targetExprIdToFilter.get(targetId);
if (filters != null) {
Iterator<RuntimeFilter> filterIter = filters.iterator();
@ -219,6 +219,33 @@ public class RuntimeFilterContext {
}
}
/**
* remove one target from rf, and if there is no target, remove the rf
*/
public void removeFilter(RuntimeFilter rf, ExprId targetId) {
Iterator<Slot> targetSlotIter = rf.getTargetSlots().listIterator();
Iterator<PhysicalRelation> targetScanIter = rf.getTargetScans().iterator();
Iterator<Expression> targetExpressionIter = rf.getTargetExpressions().iterator();
Slot targetSlot;
PhysicalRelation targetScan;
while (targetScanIter.hasNext() && targetSlotIter.hasNext() && targetExpressionIter.hasNext()) {
targetExpressionIter.next();
targetScan = targetScanIter.next();
targetSlot = targetSlotIter.next();
if (targetSlot.getExprId().equals(targetId)) {
targetScan.removeAppliedRuntimeFilter(rf);
targetExpressionIter.remove();
targetScanIter.remove();
targetSlotIter.remove();
}
}
if (rf.getTargetSlots().isEmpty()) {
rf.getBuilderNode().getRuntimeFilters().remove(rf);
targetExprIdToFilter.get(targetId).remove(rf);
prunedRF.add(rf);
}
}
public void setTargetsOnScanNode(PhysicalRelation relation, Slot slot) {
this.targetOnOlapScanNodeMap.computeIfAbsent(relation, k -> Lists.newArrayList()).add(slot);
}

View File

@ -224,8 +224,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
);
if (pushedDown) {
rfCtx.removeFilter(
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
(PhysicalHashJoin) rfToPush.getBuilderNode());
rfToPush,
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next());
}
}
}

View File

@ -147,7 +147,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds());
rfContext.getTargetExprIdByFilterJoin(join)
.stream().filter(exprId -> outputExprIdOfExpandTargets.contains(exprId))
.forEach(exprId -> rfContext.removeFilter(exprId, join));
.forEach(exprId -> rfContext.removeFilters(exprId, join));
}
}
RuntimeFilterContext.EffectiveSrcType childType =
@ -163,7 +163,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
}
}
if (!isEffective) {
exprIds.stream().forEach(exprId -> rfContext.removeFilter(exprId, join));
exprIds.stream().forEach(exprId -> rfContext.removeFilters(exprId, join));
}
}
}

View File

@ -52,7 +52,7 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor {
for (int i = 0; i < rf.getTargetScans().size(); i++) {
PhysicalRelation scan = rf.getTargetScans().get(i);
if (canPrune(scan, joinAncestors)) {
rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join);
rfCtx.removeFilters(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join);
}
}
}

View File

@ -3,13 +3,13 @@
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
----PhysicalProject
------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number]
------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number];RF1 ws_order_number->[ws_order_number]
--------PhysicalDistribute[DistributionSpecHash]
----------PhysicalProject
------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF7
------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF14 RF15
--------PhysicalDistribute[DistributionSpecHash]
----------PhysicalProject
------------PhysicalOlapScan[web_sales] apply RFs: RF7
------------PhysicalOlapScan[web_sales] apply RFs: RF14 RF15
--PhysicalResultSink
----PhysicalTopN[MERGE_SORT]
------PhysicalTopN[LOCAL_SORT]
@ -19,25 +19,25 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------hashAgg[GLOBAL]
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[ws_order_number]
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF12 ws_order_number->[ws_order_number];RF13 ws_order_number->[ws_order_number]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------PhysicalProject
--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF6
----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF5 web_site_sk->[ws_web_site_sk]
------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk]
--------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF3 d_date_sk->[ws_ship_date_sk]
----------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number]
--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF12 RF13
----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF10 web_site_sk->[ws_web_site_sk];RF11 web_site_sk->[ws_web_site_sk]
------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF8 ca_address_sk->[ws_ship_addr_sk];RF9 ca_address_sk->[ws_ship_addr_sk]
--------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF6 d_date_sk->[ws_ship_date_sk];RF7 d_date_sk->[ws_ship_date_sk]
----------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[wr_order_number];RF5 ws_order_number->[wr_order_number];RF14 ws_order_number->[ws_order_number,ws_order_number];RF15 ws_order_number->[ws_order_number,ws_order_number]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF1 wr_order_number->[ws_order_number]
--------------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF2 wr_order_number->[ws_order_number];RF3 wr_order_number->[ws_order_number]
----------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------PhysicalProject
--------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF1
--------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF2 RF3
----------------------------------PhysicalDistribute[DistributionSpecHash]
------------------------------------PhysicalProject
--------------------------------------PhysicalOlapScan[web_returns] apply RFs: RF2
--------------------------------------PhysicalOlapScan[web_returns] apply RFs: RF4 RF5
------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF3 RF4 RF5
----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF6 RF7 RF8 RF9 RF10 RF11
----------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------PhysicalProject
--------------------------------filter((date_dim.d_date <= '1999-04-02') and (date_dim.d_date >= '1999-02-01'))

View File

@ -3,13 +3,13 @@
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
----PhysicalProject
------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number]
------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number];RF1 ws_order_number->[ws_order_number]
--------PhysicalDistribute[DistributionSpecHash]
----------PhysicalProject
------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF7
------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF14 RF15
--------PhysicalDistribute[DistributionSpecHash]
----------PhysicalProject
------------PhysicalOlapScan[web_sales] apply RFs: RF7
------------PhysicalOlapScan[web_sales] apply RFs: RF14 RF15
--PhysicalResultSink
----PhysicalTopN[MERGE_SORT]
------PhysicalTopN[LOCAL_SORT]
@ -19,25 +19,25 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
--------------hashAgg[GLOBAL]
----------------hashAgg[LOCAL]
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[ws_order_number,wr_order_number]
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF12 ws_order_number->[ws_order_number,wr_order_number];RF13 ws_order_number->[ws_order_number,wr_order_number]
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF10 wr_order_number->[ws_order_number];RF11 wr_order_number->[ws_order_number]
--------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF10 RF11 RF12 RF13
--------------------------PhysicalDistribute[DistributionSpecHash]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF7 ws_order_number->[ws_order_number,ws_order_number]
------------------------------PhysicalOlapScan[web_returns] apply RFs: RF12 RF13
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF14 ws_order_number->[ws_order_number,ws_order_number];RF15 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF3 web_site_sk->[ws_web_site_sk]
----------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF2 d_date_sk->[ws_ship_date_sk]
------------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF1 ca_address_sk->[ws_ship_addr_sk]
--------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF6 web_site_sk->[ws_web_site_sk];RF7 web_site_sk->[ws_web_site_sk]
----------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF4 d_date_sk->[ws_ship_date_sk];RF5 d_date_sk->[ws_ship_date_sk]
------------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF2 ca_address_sk->[ws_ship_addr_sk];RF3 ca_address_sk->[ws_ship_addr_sk]
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF1 RF2 RF3
----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF2 RF3 RF4 RF5 RF6 RF7
--------------------------------PhysicalDistribute[DistributionSpecReplicated]
----------------------------------PhysicalProject
------------------------------------filter((customer_address.ca_state = 'NC'))

View File

@ -28,7 +28,7 @@ sql 'set enable_runtime_filter_prune=false'
sql 'set parallel_pipeline_task_num=8'
sql 'set forbid_unknown_col_stats=false'
sql 'set enable_stats=false'
sql "set runtime_filter_type=8"
sql "set runtime_filter_type=12"
sql 'set broadcast_row_count_limit = 30000000'
sql 'set enable_nereids_timeout = false'
sql 'SET enable_pipeline_engine = true'

View File

@ -29,7 +29,7 @@ suite("query95") {
sql 'set forbid_unknown_col_stats=true'
sql 'set enable_nereids_timeout = false'
sql 'set enable_runtime_filter_prune=false'
sql 'set runtime_filter_type=8'
sql 'set runtime_filter_type=12'
def ds = """with ws_wh as
(select ws1.ws_order_number,ws1.ws_warehouse_sk wh1,ws2.ws_warehouse_sk wh2
from web_sales ws1,web_sales ws2