From 0b1c82b02135cf9efacac94ff3334c3d76f3bcb8 Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Fri, 21 Jul 2023 23:31:30 +0800 Subject: [PATCH] [opt](nereids) enhance runtime filter pushdown (#21883) Current runtime filter can't be pushed down into complicated plan pattern, such as set operation as join child and cte sender as filter before shuffling. This pr refines the pushing down ability and can able to push the filter into different plan tree layer recursively, such as nested subquery, set op, cte sender, etc. --- .../translator/PhysicalPlanTranslator.java | 19 ++ .../translator/PlanTranslatorContext.java | 14 ++ .../translator/RuntimeFilterTranslator.java | 18 +- .../processor/post/RuntimeFilterContext.java | 7 + .../post/RuntimeFilterGenerator.java | 179 ++++++------------ .../plans/physical/AbstractPhysicalPlan.java | 64 +++++++ .../plans/physical/PhysicalCTEConsumer.java | 15 ++ .../plans/physical/PhysicalDistribute.java | 38 ++++ .../plans/physical/PhysicalHashAggregate.java | 35 ++++ .../plans/physical/PhysicalHashJoin.java | 77 ++++++++ .../trees/plans/physical/PhysicalProject.java | 61 ++++++ .../plans/physical/PhysicalSetOperation.java | 47 +++++ .../org/apache/doris/planner/CTEScanNode.java | 62 ++++++ .../apache/doris/planner/DataStreamSink.java | 54 ++++++ .../apache/doris/planner/HashJoinNode.java | 2 +- .../apache/doris/planner/PlanFragment.java | 5 +- .../apache/doris/planner/RuntimeFilter.java | 3 +- .../doris/statistics/StatisticalType.java | 1 + 18 files changed, 574 insertions(+), 127 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index f1dacc32e6..a407be03a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -128,6 +128,7 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.planner.AggregationNode; import org.apache.doris.planner.AnalyticEvalNode; import org.apache.doris.planner.AssertNumRowsNode; +import org.apache.doris.planner.CTEScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.EmptySetNode; @@ -296,6 +297,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor + runtimeFilterTranslator.getContext().getPlanNodeIdToCTEDataSinkMap() + .put(cteScanNode.getId(), dataStreamSink)); } else { inputFragment.setDestination(exchangeNode); inputFragment.setOutputPartition(dataPartition); @@ -813,6 +822,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor cteProducer = context.getCteProduceMap().get(cteId); Preconditions.checkState(cteProducer != null, "invalid cteProducer"); + context.getCteConsumerMap().put(cteId, cteConsumer); // set datasink to multicast data sink but do not set target now // target will be set when translate distribute DataStreamSink streamSink = new DataStreamSink(); @@ -821,11 +831,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor + runtimeFilterTranslator.getTargetOnScanNode(cteConsumer.getRelationId()).forEach( + expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context))); + context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode); + return multiCastFragment; } @@ -835,6 +853,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor cteProducerMap = Maps.newHashMap(); + private final Map cteConsumerMap = Maps.newHashMap(); + + private final Map cteScanNodeMap = Maps.newHashMap(); + public PlanTranslatorContext(CascadesContext ctx) { this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext()); } @@ -108,6 +114,14 @@ public class PlanTranslatorContext { return cteProducerMap; } + public Map getCteConsumerMap() { + return cteConsumerMap; + } + + public Map getCteScanNodeMap() { + return cteScanNodeMap; + } + public TupleDescriptor generateTupleDesc() { return descTable.createTupleDescriptor(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 6980f05fa5..982c8677aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -31,12 +31,15 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; +import org.apache.doris.planner.CTEScanNode; +import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.HashJoinNode.DistributionMode; import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.collect.ImmutableList; @@ -122,6 +125,7 @@ public class RuntimeFilterTranslator { hasInvalidTarget = true; break; } + ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr); Expr targetExpr; if (filter.getType() == TRuntimeFilterType.BITMAP) { if (curTargetExpression.equals(curTargetExpr)) { @@ -141,7 +145,6 @@ public class RuntimeFilterTranslator { SlotRef targetSlot = target.getSrcSlotRef(); TupleId targetTupleId = targetSlot.getDesc().getParent().getId(); SlotId targetSlotId = targetSlot.getSlotId(); - ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr); scanNodeList.add(scanNode); targetExprList.add(targetExpr); targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId))); @@ -157,7 +160,8 @@ public class RuntimeFilterTranslator { //bitmap rf requires isBroadCast=false, it always requires merge filter origFilter.setIsBroadcast(false); } - boolean isLocalTarget = scanNodeList.stream().allMatch(e -> e.getFragmentId().equals(node.getFragmentId())); + boolean isLocalTarget = scanNodeList.stream().allMatch(e -> + !(e instanceof CTEScanNode) && e.getFragmentId().equals(node.getFragmentId())); for (int i = 0; i < targetExprList.size(); i++) { ScanNode scanNode = scanNodeList.get(i); Expr targetExpr = targetExprList.get(i); @@ -165,7 +169,15 @@ public class RuntimeFilterTranslator { scanNode, targetExpr, true, isLocalTarget)); } origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn()); - context.getLegacyFilters().add(finalize(origFilter)); + org.apache.doris.planner.RuntimeFilter finalizedFilter = finalize(origFilter); + scanNodeList.stream().filter(e -> e.getStatisticalType() == StatisticalType.CTE_SCAN_NODE) + .forEach(f -> { + DataStreamSink sink = context.getPlanNodeIdToCTEDataSinkMap().get(f.getId()); + if (sink != null) { + sink.addRuntimeFilter(finalizedFilter); + } + }); + context.getLegacyFilters().add(finalizedFilter); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index 5aab5ab5dd..84f9f24e09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -33,6 +33,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; +import org.apache.doris.planner.DataStreamSink; +import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.planner.ScanNode; @@ -60,6 +62,7 @@ public class RuntimeFilterContext { // exprId of target to runtime filter. private final Map> targetExprIdToFilter = Maps.newHashMap(); + private final Map planNodeIdToCTEDataSinkMap = Maps.newHashMap(); private final Map> joinToTargetExprId = Maps.newHashMap(); // olap scan node that contains target of a runtime filter. @@ -165,6 +168,10 @@ public class RuntimeFilterContext { return exprIdToOlapScanNodeSlotRef; } + public Map getPlanNodeIdToCTEDataSinkMap() { + return planNodeIdToCTEDataSinkMap; + } + public Map> getAliasTransferMap() { return aliasTransferMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 1e5575e4ac..fc3b1f4ced 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -39,16 +39,13 @@ import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer; import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer; import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; -import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept; import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; -import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; @@ -152,8 +149,14 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { @Override public PhysicalCTEProducer visitPhysicalCTEProducer(PhysicalCTEProducer producer, CascadesContext context) { - CTEId id = producer.getCteId(); - context.getRuntimeFilterContext().getCteProduceMap().put(id, producer); + CTEId cteId = producer.getCteId(); + context.getRuntimeFilterContext().getCteProduceMap().put(cteId, producer); + Set processedCTE = context.getRuntimeFilterContext().getProcessedCTE(); + if (!processedCTE.contains(cteId)) { + PhysicalPlan inputPlanNode = (PhysicalPlan) producer.child(0); + inputPlanNode.accept(this, context); + processedCTE.add(cteId); + } return producer; } @@ -194,7 +197,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { TRuntimeFilterType type = TRuntimeFilterType.BITMAP; Set targetSlots = bitmapContains.child(1).getInputSlots(); for (Slot targetSlot : targetSlots) { - if (!checkCanPushDownFromJoinType(join, ctx, targetSlot)) { + if (!checkPushDownPreconditions(join, ctx, targetSlot)) { continue; } Slot olapScanSlot = aliasTransferMap.get(targetSlot).second; @@ -251,7 +254,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv); } - private static Slot checkTargetChild(Expression leftChild) { + public static Slot checkTargetChild(Expression leftChild) { Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild); return expression instanceof Slot ? ((Slot) expression) : null; } @@ -262,8 +265,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { List legalTypes = Arrays.stream(TRuntimeFilterType.values()) .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); - // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - // we will support it in later version. for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) { EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet())); @@ -272,112 +273,9 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { if (type == TRuntimeFilterType.BITMAP) { continue; } - if (join.left() instanceof PhysicalUnion - || join.left() instanceof PhysicalIntersect - || join.left() instanceof PhysicalExcept) { - doPushDownIntoSetOperation(join, ctx, equalTo, type, i); - } else { - doPushDownBasic(join, context, ctx, equalTo, type, i); - } - } - } - } - - private void doPushDownBasic(PhysicalHashJoin join, CascadesContext context, - RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) { - Map> aliasTransferMap = ctx.getAliasTransferMap(); - // currently, we can ensure children in the two side are corresponding to the equal_to's. - // so right maybe an expression and left is a slot - Slot unwrappedSlot = checkTargetChild(equalTo.left()); - // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join - // contains join with denied join type. for example: a left join b on a.id = b.id - if (!checkCanPushDownFromJoinType(join, ctx, unwrappedSlot)) { - return; - } - Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second; - PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first; - - Preconditions.checkState(olapScanSlot != null && scan != null); - - if (scan instanceof PhysicalCTEConsumer) { - Set processedCTE = context.getRuntimeFilterContext().getProcessedCTE(); - CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId(); - if (!processedCTE.contains(cteId)) { - PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext() - .getCteProduceMap().get(cteId); - PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0); - // process cte producer self recursively - inputPlanNode.accept(this, context); - processedCTE.add(cteId); - } - } else { - // in-filter is not friendly to pipeline - if (type == TRuntimeFilterType.IN_OR_BLOOM - && ctx.getSessionVariable().getEnablePipelineEngine() - && hasRemoteTarget(join, scan)) { - type = TRuntimeFilterType.BLOOM; - } - long buildSideNdv = getBuildSideNdv(join, equalTo); - RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv); - ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); - ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); - ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getRelationId(), olapScanSlot); - } - } - - private void doPushDownIntoSetOperation(PhysicalHashJoin join, - RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) { - Map> aliasTransferMap = ctx.getAliasTransferMap(); - List targetList = new ArrayList<>(); - int projIndex = -1; - for (int j = 0; j < join.left().children().size(); j++) { - PhysicalPlan child = (PhysicalPlan) join.left().child(j); - if (child instanceof PhysicalProject) { - PhysicalProject project = (PhysicalProject) child; - Slot leftSlot = checkTargetChild(equalTo.left()); - if (leftSlot == null) { - break; - } - for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) { - NamedExpression expr = (NamedExpression) project.getProjects().get(k); - if (expr.getName().equals(leftSlot.getName())) { - projIndex = k; - break; - } - } - Preconditions.checkState(projIndex >= 0 - && projIndex < project.getProjects().size()); - - NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex); - - SlotReference origSlot = null; - if (targetExpr instanceof Alias) { - origSlot = (SlotReference) targetExpr.child(0); - } else { - origSlot = (SlotReference) targetExpr; - } - Slot olapScanSlot = aliasTransferMap.get(origSlot).second; - if (!checkCanPushDownFromJoinType(join, ctx, olapScanSlot)) { - continue; - } - PhysicalRelation scan = aliasTransferMap.get(origSlot).first; - if (type == TRuntimeFilterType.IN_OR_BLOOM - && ctx.getSessionVariable().getEnablePipelineEngine() - && hasRemoteTarget(join, scan)) { - type = TRuntimeFilterType.BLOOM; - } - targetList.add(olapScanSlot); - ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); - ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getRelationId(), olapScanSlot); - } - } - if (!targetList.isEmpty()) { - long buildSideNdv = getBuildSideNdv(join, equalTo); - RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - equalTo.right(), targetList, type, exprOrder, join, buildSideNdv); - for (int j = 0; j < targetList.size(); j++) { - ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter); + long buildSideNdv = getBuildSideNdv(join, equalTo); + join.pushDownRuntimeFilter(context, generator, join, equalTo.right(), + equalTo.left(), type, buildSideNdv, i); } } } @@ -580,7 +478,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { Slot unwrappedSlot = checkTargetChild(equalTo.left()); // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join // contains join with denied join type. for example: a left join b on a.id = b.id - if (!checkCanPushDownFromJoinType(join, ctx, unwrappedSlot)) { + if (!checkPushDownPreconditions(join, ctx, unwrappedSlot)) { return; } Slot cteSlot = aliasTransferMap.get(unwrappedSlot).second; @@ -626,8 +524,11 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } } - private boolean checkCanPushDownFromJoinType(AbstractPhysicalJoin physicalJoin, - RuntimeFilterContext ctx, Slot slot) { + /** + * Check runtime filter push down pre-conditions, such as builder side join type, etc. + */ + public static boolean checkPushDownPreconditions(AbstractPhysicalJoin physicalJoin, + RuntimeFilterContext ctx, Slot slot) { Map> aliasTransferMap = ctx.getAliasTransferMap(); if (slot == null || !aliasTransferMap.containsKey(slot)) { return false; @@ -688,7 +589,47 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } } - private boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) { + public static boolean isCoveredByPlanNode(PhysicalPlan root, PhysicalRelation relation) { + Set relations = new HashSet<>(); + RuntimeFilterGenerator.getAllScanInfo(root, relations); + return relations.contains(relation); + } + + /** + * Get all relation node from current root plan. + */ + public static void getAllScanInfo(PhysicalPlan root, Set scans) { + if (root instanceof PhysicalRelation) { + scans.add((PhysicalRelation) root); + } else { + for (Object child : root.children()) { + getAllScanInfo((PhysicalPlan) child, scans); + } + } + } + + /** + * Check whether plan root contains cte consumer descendant. + */ + public static boolean hasCTEConsumerDescendant(PhysicalPlan root) { + if (root instanceof PhysicalCTEConsumer) { + return true; + } else if (root.children().size() == 1) { + return hasCTEConsumerDescendant((PhysicalPlan) root.child(0)); + } else { + for (Object child : root.children()) { + if (hasCTEConsumerDescendant((PhysicalPlan) child)) { + return true; + } + } + return false; + } + } + + /** + * Check whether runtime filter target is remote or local + */ + public static boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) { if (scan instanceof PhysicalCTEConsumer) { return true; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 14b989f414..01320ec075 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -17,16 +17,30 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -57,6 +71,56 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi return physicalProperties; } + /** + * Pushing down runtime filter into different plan node, such as olap scan node, cte sender node, etc. + */ + public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator generator, + AbstractPhysicalJoin builderNode, + Expression src, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map> aliasTransferMap = ctx.getAliasTransferMap(); + // currently, we can ensure children in the two side are corresponding to the equal_to's. + // so right maybe an expression and left is a slot + Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); + + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + return false; + } + + boolean pushedDown = false; + for (Object child : children) { + AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child; + pushedDown |= childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr, + type, buildSideNdv, exprOrder); + } + if (pushedDown) { + return true; + } + + Slot olapScanSlot = aliasTransferMap.get(probeSlot).second; + PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; + if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + return false; + } + Preconditions.checkState(olapScanSlot != null && scan != null); + + // in-filter is not friendly to pipeline + if (type == TRuntimeFilterType.IN_OR_BLOOM + && ctx.getSessionVariable().getEnablePipelineEngine() + && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)) { + type = TRuntimeFilterType.BLOOM; + } + org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + src, ImmutableList.of(olapScanSlot), type, exprOrder, builderNode, buildSideNdv); + ctx.addJoinToTargetMap(builderNode, olapScanSlot.getExprId()); + ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first.getRelationId(), olapScanSlot); + return true; + } + @Override public Plan getExplainPlan(ConnectContext ctx) { return this; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java index e6450a94e8..67056b1681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEConsumer.java @@ -17,17 +17,22 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.CTEId; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -134,4 +139,14 @@ public class PhysicalCTEConsumer extends PhysicalRelation { return Utils.toSqlString("PhysicalCteConsumer", "cteId", cteId); } + + @Override + public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator generator, + AbstractPhysicalJoin builderNode, + Expression src, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + // TODO: current cte internal pushing down is too complicated and it is not convenient to move the logic here. + // will refine it in the future. + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java index 6fc7f0bd44..ce6a0411b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java @@ -17,22 +17,32 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.DistributionSpec; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.json.JSONObject; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -118,4 +128,32 @@ public class PhysicalDistribute extends PhysicalUnary(distributionSpec, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } + + @Override + public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator generator, + AbstractPhysicalJoin builderNode, Expression src, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map> aliasTransferMap = ctx.getAliasTransferMap(); + // currently, we can ensure children in the two side are corresponding to the equal_to's. + // so right maybe an expression and left is a slot + Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); + + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + return false; + } + PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; + if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + return false; + } + // TODO: global rf need merge stage which is heavy + // add some rule, such as bc only is allowed for + // pushing down through distribute, currently always pushing. + AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child(0); + boolean pushedDown = childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr, + type, buildSideNdv, exprOrder); + return pushedDown; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java index 01c9065ab3..9a408bb9ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java @@ -17,13 +17,19 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.properties.RequireProperties; import org.apache.doris.nereids.properties.RequirePropertiesSupplier; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.plans.AggMode; import org.apache.doris.nereids.trees.plans.AggPhase; @@ -32,12 +38,15 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -283,4 +292,30 @@ public class PhysicalHashAggregate extends PhysicalUnar builder.append(getAggPhase()).append("]"); return builder.toString(); } + + @Override + public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator generator, + AbstractPhysicalJoin builderNode, Expression src, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map> aliasTransferMap = ctx.getAliasTransferMap(); + // currently, we can ensure children in the two side are corresponding to the equal_to's. + // so right maybe an expression and left is a slot + Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); + + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + return false; + } + PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; + if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + return false; + } + + AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0); + boolean pushedDown = child.pushDownRuntimeFilter(context, generator, builderNode, + src, probeExpr, type, buildSideNdv, exprOrder); + return pushedDown; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java index c8ed9ec8ed..8220856794 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java @@ -17,13 +17,19 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; @@ -31,13 +37,17 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -189,6 +199,73 @@ public class PhysicalHashJoin< groupExpression, getLogicalProperties(), physicalProperties, statistics, left(), right()); } + @Override + public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator generator, + AbstractPhysicalJoin builderNode, Expression srcExpr, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map> aliasTransferMap = ctx.getAliasTransferMap(); + + // if rf built between plan nodes containing cte both, for example both src slot and target slot are from cte, + // or two sub-queries both containing cte, disable this rf since this kind of cross-cte rf will make one side + // of cte to wait for a long time until another side cte consumer finished, which will make the rf into + // not ready state. + AbstractPhysicalPlan builderLeftNode = (AbstractPhysicalPlan) builderNode.child(0); + AbstractPhysicalPlan builderRightNode = (AbstractPhysicalPlan) builderNode.child(1); + Preconditions.checkState(builderLeftNode != null && builderRightNode != null, + "builder join node child node is null"); + if (RuntimeFilterGenerator.hasCTEConsumerDescendant(builderLeftNode) + && RuntimeFilterGenerator.hasCTEConsumerDescendant(builderRightNode)) { + return false; + } + + boolean pushedDown = false; + AbstractPhysicalPlan leftNode = (AbstractPhysicalPlan) child(0); + AbstractPhysicalPlan rightNode = (AbstractPhysicalPlan) child(1); + Preconditions.checkState(leftNode != null && rightNode != null, + "join child node is null"); + + pushedDown |= leftNode.pushDownRuntimeFilter(context, generator, builderNode, + srcExpr, probeExpr, type, buildSideNdv, exprOrder); + pushedDown |= rightNode.pushDownRuntimeFilter(context, generator, builderNode, + srcExpr, probeExpr, type, buildSideNdv, exprOrder); + + // currently, we can ensure children in the two side are corresponding to the equal_to's. + // so right maybe an expression and left is a slot + Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); + + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + return false; + } + Slot olapScanSlot = aliasTransferMap.get(probeSlot).second; + PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; + Preconditions.checkState(olapScanSlot != null && scan != null); + if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + return false; + } + + // TODO: if can't push down into join's chidren, try to + // find possible chance in upper layer + if (pushedDown) { + return true; + } + + // in-filter is not friendly to pipeline + if (type == TRuntimeFilterType.IN_OR_BLOOM + && ctx.getSessionVariable().getEnablePipelineEngine() + && RuntimeFilterGenerator.hasRemoteTarget(this, scan)) { + type = TRuntimeFilterType.BLOOM; + } + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + srcExpr, ImmutableList.of(olapScanSlot), type, exprOrder, this, buildSideNdv); + ctx.addJoinToTargetMap(this, olapScanSlot.getExprId()); + ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(probeSlot).first.getRelationId(), olapScanSlot); + return true; + } + private class ExprComparator implements Comparator { @Override public int compare(Expression e1, Expression e2) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java index 6c36045df4..5587a5ba74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalProject.java @@ -17,22 +17,32 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Project; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -146,4 +156,55 @@ public class PhysicalProject extends PhysicalUnary generator, + AbstractPhysicalJoin builderNode, Expression src, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map> aliasTransferMap = ctx.getAliasTransferMap(); + // currently, we can ensure children in the two side are corresponding to the equal_to's. + // so right maybe an expression and left is a slot + Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); + + // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join + // contains join with denied join type. for example: a left join b on a.id = b.id + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + return false; + } + PhysicalRelation scan = aliasTransferMap.get(probeSlot).first; + if (scan instanceof PhysicalCTEConsumer) { + // update the probeExpr + int projIndex = -1; + for (int i = 0; i < getProjects().size(); i++) { + NamedExpression expr = getProjects().get(i); + if (expr.getName().equals(probeSlot.getName())) { + projIndex = i; + break; + } + } + if (projIndex < 0 || projIndex >= getProjects().size()) { + // the pushed down path can't contain the probe expr + return false; + } + NamedExpression newProbeExpr = this.getProjects().get(projIndex); + if (newProbeExpr instanceof Alias) { + newProbeExpr = (NamedExpression) newProbeExpr.child(0); + } + Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr); + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, newProbeSlot)) { + return false; + } + scan = aliasTransferMap.get(newProbeSlot).first; + probeExpr = newProbeExpr; + } + if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) { + return false; + } + + AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0); + boolean pushedDown = child.pushDownRuntimeFilter(context, generator, builderNode, + src, probeExpr, type, buildSideNdv, exprOrder); + return pushedDown; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSetOperation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSetOperation.java index 640a5c1115..c77f4940b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSetOperation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSetOperation.java @@ -17,9 +17,14 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -28,7 +33,9 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.SetOperation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.Statistics; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.collect.ImmutableList; @@ -130,4 +137,44 @@ public abstract class PhysicalSetOperation extends AbstractPhysicalPlan implemen return children.size(); } + @Override + public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator generator, + AbstractPhysicalJoin builderNode, + Expression src, Expression probeExpr, + TRuntimeFilterType type, long buildSideNdv, int exprOrder) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + boolean pushedDown = false; + for (int i = 0; i < this.children().size(); i++) { + AbstractPhysicalPlan child = (AbstractPhysicalPlan) this.child(i); + // TODO: replace this special logic with dynamic handling + if (child instanceof PhysicalDistribute) { + child = (AbstractPhysicalPlan) child.child(0); + } + if (child instanceof PhysicalProject) { + PhysicalProject project = (PhysicalProject) child; + int projIndex = -1; + Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr); + if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) { + continue; + } + for (int j = 0; j < project.getProjects().size(); j++) { + NamedExpression expr = (NamedExpression) project.getProjects().get(j); + if (expr.getName().equals(probeSlot.getName())) { + projIndex = j; + break; + } + } + if (projIndex < 0 || projIndex >= project.getProjects().size()) { + continue; + } + NamedExpression newProbeExpr = (NamedExpression) project.getProjects().get(projIndex); + if (newProbeExpr instanceof Alias) { + newProbeExpr = (NamedExpression) newProbeExpr.child(0); + } + pushedDown |= child.pushDownRuntimeFilter(context, generator, builderNode, src, + newProbeExpr, type, buildSideNdv, exprOrder); + } + } + return pushedDown; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java new file mode 100644 index 0000000000..8a0c05c70e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TScanRangeLocations; + +import java.util.List; + +/** + * The cte scan node is just a cte consumer wrapper which is convenient for collecting + * cte target information. + */ +public class CTEScanNode extends ScanNode { + private static final PlanNodeId UNINITIAL_PLANNODEID = new PlanNodeId(-1); + + public CTEScanNode(TupleDescriptor desc) { + super(UNINITIAL_PLANNODEID, desc, "CTEScanNode", StatisticalType.CTE_SCAN_NODE); + } + + public CTEScanNode(PlanNodeId id, TupleDescriptor desc) { + super(id, desc, "CTEScanNode", StatisticalType.CTE_SCAN_NODE); + } + + public void setPlanNodeId(PlanNodeId id) { + this.id = id; + } + + @Override + protected void toThrift(TPlanNode msg) { + // NO real action to be taken, just a wrapper + } + + @Override + protected void createScanRangeLocations() throws UserException { + // NO real action to be taken, just a wrapper + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + // NO real action to be taken, just a wrapper + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index 6a43694527..8f5ce78304 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -28,9 +28,11 @@ import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDataStreamSink; import org.apache.doris.thrift.TExplainLevel; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.springframework.util.CollectionUtils; +import java.util.ArrayList; import java.util.List; /** @@ -48,6 +50,8 @@ public class DataStreamSink extends DataSink { protected List conjuncts = Lists.newArrayList(); + protected List runtimeFilters = Lists.newArrayList(); + public DataStreamSink() { } @@ -102,6 +106,18 @@ public class DataStreamSink extends DataSink { this.conjuncts.add(conjunct); } + public List getRuntimeFilters() { + return runtimeFilters; + } + + public void setRuntimeFilters(List runtimeFilters) { + this.runtimeFilters = runtimeFilters; + } + + public void addRuntimeFilter(RuntimeFilter runtimeFilter) { + this.runtimeFilters.add(runtimeFilter); + } + @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); @@ -114,6 +130,10 @@ public class DataStreamSink extends DataSink { Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts); strBuilder.append(prefix).append(" CONJUNCTS: ").append(expr.toSql()).append("\n"); } + if (!runtimeFilters.isEmpty()) { + strBuilder.append(prefix).append(" runtime filters: "); + strBuilder.append(getRuntimeFilterExplainString(false, false)); + } if (!CollectionUtils.isEmpty(projections)) { strBuilder.append(prefix).append(" PROJECTIONS: ") .append(PlanNode.getExplainString(projections)).append("\n"); @@ -124,6 +144,34 @@ public class DataStreamSink extends DataSink { return strBuilder.toString(); } + protected String getRuntimeFilterExplainString(boolean isBuildNode, boolean isBrief) { + if (runtimeFilters.isEmpty()) { + return ""; + } + List filtersStr = new ArrayList<>(); + for (RuntimeFilter filter : runtimeFilters) { + StringBuilder filterStr = new StringBuilder(); + filterStr.append(filter.getFilterId()); + if (!isBrief) { + filterStr.append("["); + filterStr.append(filter.getType().toString().toLowerCase()); + filterStr.append("]"); + if (isBuildNode) { + filterStr.append(" <- "); + filterStr.append(filter.getSrcExpr().toSql()); + filterStr.append("(").append(filter.getEstimateNdv()).append("/") + .append(filter.getExpectFilterSizeBytes()).append("/") + .append(filter.getFilterSizeBytes()).append(")"); + } else { + filterStr.append(" -> "); + filterStr.append(filter.getTargetExpr(getExchNodeId()).toSql()); + } + } + filtersStr.add(filterStr.toString()); + } + return Joiner.on(", ").join(filtersStr) + "\n"; + } + @Override protected TDataSink toThrift() { TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK); @@ -142,6 +190,12 @@ public class DataStreamSink extends DataSink { if (outputTupleDesc != null) { tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt()); } + + if (runtimeFilters != null) { + for (RuntimeFilter rf : runtimeFilters) { + tStreamSink.addToRuntimeFilters(rf.toThrift()); + } + } result.setStreamSink(tStreamSink); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 02418fc01a..73c48ec181 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -747,7 +747,7 @@ public class HashJoinNode extends JoinNodeBase { output.append(detailPrefix).append( String.format("cardinality=%,d", cardinality)).append("\n"); if (!runtimeFilters.isEmpty()) { - output.append(detailPrefix).append("Build RFs: "); + output.append(detailPrefix).append("runtime filters: "); output.append(getRuntimeFilterExplainString(true, true)); } return output.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 64ac4c3051..c398ed7a8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -33,7 +33,6 @@ import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; @@ -171,8 +170,8 @@ public class PlanFragment extends TreeNode { public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition, Set builderRuntimeFilterIds, Set targetRuntimeFilterIds) { this(id, root, partition); - this.builderRuntimeFilterIds = ImmutableSet.copyOf(builderRuntimeFilterIds); - this.targetRuntimeFilterIds = ImmutableSet.copyOf(targetRuntimeFilterIds); + this.builderRuntimeFilterIds = new HashSet<>(builderRuntimeFilterIds); + this.targetRuntimeFilterIds = new HashSet<>(targetRuntimeFilterIds); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index d45f4f2611..7b84548e0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -123,7 +123,8 @@ public final class RuntimeFilter { public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr, boolean isBoundByKeyColumns, boolean isLocalTarget) { - Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds())); + Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()) + || targetNode instanceof CTEScanNode); this.node = targetNode; this.expr = targetExpr; this.isBoundByKeyColumns = isBoundByKeyColumns; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 67dd9bb054..d1527df0ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -22,6 +22,7 @@ public enum StatisticalType { AGG_NODE, ANALYTIC_EVAL_NODE, ASSERT_NUM_ROWS_NODE, + CTE_SCAN_NODE, BROKER_SCAN_NODE, NESTED_LOOP_JOIN_NODE, EMPTY_SET_NODE,