diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 6372657378..2683754d77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -42,7 +42,6 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; @@ -84,7 +83,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { ); private static final Set> SPJ_PLAN = ImmutableSet.of( - PhysicalOlapScan.class, + PhysicalRelation.class, PhysicalProject.class, PhysicalFilter.class, PhysicalDistribute.class, @@ -103,7 +102,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { * second step: if encounter project, collect the association of its child and it for pushing down through * the project node. * plan translation: - * third step: generate nereids runtime filter target at olap scan node fragment. + * third step: generate nereids runtime filter target at scan node fragment. * forth step: generate legacy runtime filter target and runtime filter at hash join node fragment. * NOTICE: bottom-up travel the plan tree!!! */ @@ -180,17 +179,17 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { TRuntimeFilterType type = TRuntimeFilterType.BITMAP; Set targetSlots = bitmapContains.child(1).getInputSlots(); for (Slot targetSlot : targetSlots) { - if (!checkPushDownPreconditions(join, ctx, targetSlot)) { + if (!checkPushDownPreconditionsForJoin(join, ctx, targetSlot)) { continue; } - Slot olapScanSlot = aliasTransferMap.get(targetSlot).second; + Slot scanSlot = aliasTransferMap.get(targetSlot).second; RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - bitmapContains.child(0), ImmutableList.of(olapScanSlot), + bitmapContains.child(0), ImmutableList.of(scanSlot), ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L); - ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); - ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); + ctx.addJoinToTargetMap(join, scanSlot.getExprId()); + ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getRelationId(), - olapScanSlot); + scanSlot); join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition); } } @@ -467,9 +466,9 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { Map> aliasTransferMap = ctx.getAliasTransferMap(); PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0); Slot unwrappedSlot = checkTargetChild(equalTo.left()); - // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // aliasTransMap doesn't contain the key, means that the path from the scan to the join // contains join with denied join type. for example: a left join b on a.id = b.id - if (!checkPushDownPreconditions(join, ctx, unwrappedSlot)) { + if (!checkPushDownPreconditionsForJoin(join, ctx, unwrappedSlot)) { return; } Slot cteSlot = aliasTransferMap.get(unwrappedSlot).second; @@ -492,13 +491,16 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } else if (!checkCanPushDownIntoBasicTable(project)) { return; } else { - Map pushDownBasicTableInfos = getPushDownBasicTablesInfos(project, + Map pushDownBasicTableInfos = getPushDownBasicTablesInfos(project, (SlotReference) targetExpr, aliasTransferMap); if (!pushDownBasicTableInfos.isEmpty()) { List targetList = new ArrayList<>(); - for (Map.Entry entry : pushDownBasicTableInfos.entrySet()) { + for (Map.Entry entry : pushDownBasicTableInfos.entrySet()) { Slot targetSlot = entry.getKey(); - PhysicalOlapScan scan = entry.getValue(); + PhysicalRelation scan = entry.getValue(); + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(project, scan)) { + continue; + } targetList.add(targetSlot); ctx.addJoinToTargetMap(join, targetSlot.getExprId()); ctx.setTargetsOnScanNode(scan.getRelationId(), targetSlot); @@ -539,7 +541,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { /** * Check runtime filter push down pre-conditions, such as builder side join type, etc. */ - public static boolean checkPushDownPreconditions(AbstractPhysicalJoin physicalJoin, + public static boolean checkPushDownPreconditionsForJoin(AbstractPhysicalJoin physicalJoin, RuntimeFilterContext ctx, Slot slot) { Map> aliasTransferMap = ctx.getAliasTransferMap(); if (slot == null || !aliasTransferMap.containsKey(slot)) { @@ -551,6 +553,21 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } } + /** + * Check runtime filter push down relation related pre-conditions. + */ + public static boolean checkPushDownPreconditionsForRelation(PhysicalPlan root, PhysicalRelation relation) { + Preconditions.checkState(relation != null, "relation is null"); + // check if the relation supports runtime filter push down + if (!relation.canPushDownRuntimeFilter()) { + return false; + } + // check if the plan root can cover the push down candidate relation + Set relations = new HashSet<>(); + RuntimeFilterGenerator.getAllScanInfo(root, relations); + return relations.contains(relation); + } + private boolean checkCanPushDownIntoBasicTable(PhysicalPlan root) { // only support spj currently List plans = Lists.newArrayList(); @@ -558,13 +575,13 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { return plans.stream().allMatch(p -> SPJ_PLAN.stream().anyMatch(c -> c.isInstance(p))); } - private Map getPushDownBasicTablesInfos(PhysicalPlan root, SlotReference slot, + private Map getPushDownBasicTablesInfos(PhysicalPlan root, SlotReference slot, Map> aliasTransferMap) { - Map basicTableInfos = new HashMap<>(); + Map basicTableInfos = new HashMap<>(); Set joins = new HashSet<>(); ExprId exprId = slot.getExprId(); - if (aliasTransferMap.get(slot) != null && aliasTransferMap.get(slot).first instanceof PhysicalOlapScan) { - basicTableInfos.put(slot, (PhysicalOlapScan) aliasTransferMap.get(slot).first); + if (aliasTransferMap.get(slot) != null) { + basicTableInfos.put(slot, aliasTransferMap.get(slot).first); } // try to find propagation condition from join getAllJoinInfo(root, joins); @@ -575,12 +592,12 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { SlotReference leftSlot = (SlotReference) ((EqualTo) equalTo).left(); SlotReference rightSlot = (SlotReference) ((EqualTo) equalTo).right(); if (leftSlot.getExprId() == exprId && aliasTransferMap.get(rightSlot) != null) { - PhysicalOlapScan rightTable = (PhysicalOlapScan) aliasTransferMap.get(rightSlot).first; + PhysicalRelation rightTable = aliasTransferMap.get(rightSlot).first; if (rightTable != null) { basicTableInfos.put(rightSlot, rightTable); } } else if (rightSlot.getExprId() == exprId && aliasTransferMap.get(leftSlot) != null) { - PhysicalOlapScan leftTable = (PhysicalOlapScan) aliasTransferMap.get(leftSlot).first; + PhysicalRelation leftTable = aliasTransferMap.get(leftSlot).first; if (leftTable != null) { basicTableInfos.put(leftSlot, leftTable); } @@ -601,12 +618,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } } - public static boolean isCoveredByPlanNode(PhysicalPlan root, PhysicalRelation relation) { - Set relations = new HashSet<>(); - RuntimeFilterGenerator.getAllScanInfo(root, relations); - return relations.contains(relation); - } - /** * Get all relation node from current root plan. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 783d374901..d1f3a410f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -88,9 +88,9 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi // so right maybe an expression and left is a slot Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); - // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // aliasTransMap doesn't contain the key, means that the path from the scan to the join // contains join with denied join type. for example: a left join b on a.id = b.id - if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) { return false; } @@ -104,12 +104,11 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi return true; } - Slot olapScanSlot = aliasTransferMap.get(probeSlot).second; + Slot scanSlot = aliasTransferMap.get(probeSlot).second; PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; - if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) { return false; } - Preconditions.checkState(olapScanSlot != null && scan != null); // in-filter is not friendly to pipeline if (type == TRuntimeFilterType.IN_OR_BLOOM @@ -119,15 +118,16 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi } org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter = ctx.getRuntimeFilterBySrcAndType(src, type, builderNode); + Preconditions.checkState(scanSlot != null, "scan slot is null"); if (filter != null) { - filter.addTargetSlot(olapScanSlot); - filter.addTargetExpressoin(olapScanSlot); + filter.addTargetSlot(scanSlot); + filter.addTargetExpressoin(scanSlot); } else { filter = new RuntimeFilter(generator.getNextId(), - src, ImmutableList.of(olapScanSlot), type, exprOrder, builderNode, buildSideNdv); - ctx.addJoinToTargetMap(builderNode, olapScanSlot.getExprId()); - ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); - ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first.getRelationId(), olapScanSlot); + src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv); + ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); + ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first.getRelationId(), scanSlot); ctx.setRuntimeFilterIdentityToFilter(src, type, builderNode, filter); } return true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java index d9991d4c13..38259e155e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java @@ -150,4 +150,9 @@ public class PhysicalCTEConsumer extends PhysicalRelation { return super.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr, type, buildSideNdv, exprOrder); } + + @Override + public boolean canPushDownRuntimeFilter() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java index fc359fe305..57e3b94221 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java @@ -123,4 +123,8 @@ public abstract class PhysicalCatalogRelation extends PhysicalRelation implement return Utils.qualifiedName(qualifier, table.getName()); } + @Override + public boolean canPushDownRuntimeFilter() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java index be56cf6dfe..e3b547dd88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java @@ -139,22 +139,21 @@ public class PhysicalDistribute extends PhysicalUnary extends PhysicalUnar // so right maybe an expression and left is a slot Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); - // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // aliasTransMap doesn't contain the key, means that the path from the scan to the join // contains join with denied join type. for example: a left join b on a.id = b.id - if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) { return false; } PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; - if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) { return false; } AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0); - boolean pushedDown = child.pushDownRuntimeFilter(context, generator, builderNode, + return child.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr, type, buildSideNdv, exprOrder); - return pushedDown; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java index c0c9b602b3..bdc14ec338 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java @@ -256,19 +256,18 @@ public class PhysicalHashJoin< pushedDown |= rightNode.pushDownRuntimeFilter(context, generator, builderNode, srcExpr, prob, type, buildSideNdv, exprOrder); } + // currently, we can ensure children in the two side are corresponding to the equal_to's. // so right maybe an expression and left is a slot Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join // contains join with denied join type. for example: a left join b on a.id = b.id - if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) { return false; } - Slot olapScanSlot = aliasTransferMap.get(probeSlot).second; PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; - Preconditions.checkState(olapScanSlot != null && scan != null); - if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java index 4d7ed51681..5066d2ad91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java @@ -167,12 +167,13 @@ public class PhysicalProject extends PhysicalUnary extends PhysicalUnary