[fix](nereids) make runtime filter order stable #29203

This commit is contained in:
minghong
2023-12-29 01:00:27 +08:00
committed by GitHub
parent 69c90b1640
commit ce13a1d951
7 changed files with 17 additions and 15 deletions

View File

@ -121,17 +121,17 @@ public class RuntimeFilterContext {
// you can see disjoint set data structure to learn the processing detailed.
private final Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = Maps.newHashMap();
private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap();
private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newLinkedHashMap();
private final Set<Plan> effectiveSrcNodes = Sets.newHashSet();
// cte to related joins map which can extract common runtime filter to cte inside
private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newHashMap();
private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newLinkedHashMap();
// cte candidates which can be pushed into common runtime filter into from outside
private final Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = Maps.newLinkedHashMap();
private final Map<CTEId, PhysicalCTEProducer> cteProducerMap = Maps.newHashMap();
private final Map<CTEId, PhysicalCTEProducer> cteProducerMap = Maps.newLinkedHashMap();
// cte whose runtime filter has been extracted
private final Set<CTEId> processedCTE = Sets.newHashSet();

View File

@ -71,6 +71,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -392,7 +393,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Set<CTEId> cteIds = new HashSet<>();
Set<CTEId> cteIds = new LinkedHashSet<>(); // use LinkedHashSet to make runtime filter order stable
PhysicalPlan leftChild = (PhysicalPlan) join.left();
PhysicalPlan rightChild = (PhysicalPlan) join.right();
@ -404,7 +405,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
for (CTEId id : cteIds) {
if (ctx.getCteToJoinsMap().get(id) == null) {
Set<PhysicalHashJoin> newJoin = new HashSet<>();
// use LinkedHashSet to make runtime filter order stable
Set<PhysicalHashJoin> newJoin = new LinkedHashSet<>();
newJoin.add(join);
ctx.getCteToJoinsMap().put(id, newJoin);
} else {

View File

@ -21,14 +21,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=()
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF6 wr_order_number->[ws_order_number,ws_order_number]
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF7 wr_order_number->[ws_order_number,ws_order_number]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF6 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )

View File

@ -21,14 +21,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=()
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF6 wr_order_number->[ws_order_number,ws_order_number]
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF7 wr_order_number->[ws_order_number,ws_order_number]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF6 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )

View File

@ -21,14 +21,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=()
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF6 wr_order_number->[ws_order_number,ws_order_number]
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF7 wr_order_number->[ws_order_number,ws_order_number]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF6 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )

View File

@ -21,14 +21,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=()
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF6 wr_order_number->[ws_order_number,ws_order_number]
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF7 wr_order_number->[ws_order_number,ws_order_number]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF6 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )

View File

@ -21,14 +21,14 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
------------------PhysicalProject
--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=()
----------------------PhysicalProject
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF6 wr_order_number->[ws_order_number,ws_order_number]
------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number];RF7 wr_order_number->[ws_order_number,ws_order_number]
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[web_returns]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number]
----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[ws_order_number];RF6 ws_order_number->[ws_order_number,ws_order_number]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )