From c5d9e8529a9522b85215a6092fb55e8a26d77a1f Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Fri, 7 Apr 2023 15:48:25 +0800 Subject: [PATCH] [enhancement](Nereids) adjust runtime filter parameters when enable pipeline engine (#18427) when enable pipeline engine, we 1. reduce non-broadcast join bloom filter size 2. turn in or bloom filter to bloom filter if target is remote --- .../translator/PhysicalPlanTranslator.java | 4 ++-- .../translator/RuntimeFilterTranslator.java | 14 +++++++++++-- .../post/RuntimeFilterGenerator.java | 20 +++++++------------ .../doris/planner/RuntimeFilterGenerator.java | 13 +++++++++++- 4 files changed, 33 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 287531b0c6..e19a027844 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -543,8 +543,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor runtimeFilterGenerator.getTargetOnScanNode(olapScan.getId()).forEach( - expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, olapScanNode, context) + runtimeFilterTranslator -> runtimeFilterTranslator.getTargetOnScanNode(olapScan.getId()).forEach( + expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, olapScanNode, context) ) ); olapScanNode.finalizeForNereids(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 4d45aa05bc..6138596620 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -34,6 +34,7 @@ import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.HashJoinNode.DistributionMode; import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget; +import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TRuntimeFilterType; @@ -125,11 +126,20 @@ public class RuntimeFilterTranslator { if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) { targetExpr = new CastExpr(src.getType(), targetExpr); } + FilterSizeLimits filterSizeLimits = context.getLimits(); + if (node instanceof HashJoinNode + && !(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST) + && ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enablePipelineEngine() + && ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() > 0) { + filterSizeLimits = filterSizeLimits.adjustForParallel( + ConnectContext.get().getSessionVariable().getParallelExecInstanceNum()); + } org.apache.doris.planner.RuntimeFilter origFilter = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter( filter.getId(), node, src, filter.getExprOrder(), targetExpr, ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)), - filter.getType(), context.getLimits()); + filter.getType(), filterSizeLimits); if (node instanceof HashJoinNode) { origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); } else { @@ -151,7 +161,7 @@ public class RuntimeFilterTranslator { origFilter.assignToPlanNodes(); origFilter.extractTargetsPosition(); // Number of parallel instances are large for pipeline engine, so we prefer bloom filter. - if (!origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM + if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM && ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enablePipelineEngine()) { origFilter.setType(TRuntimeFilterType.BLOOM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index ec85e65d65..bcd2eedc7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -35,7 +35,6 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; @@ -54,12 +53,14 @@ import java.util.stream.Collectors; * generate runtime filter */ public class RuntimeFilterGenerator extends PlanPostProcessor { - private static final ImmutableSet deniedJoinType = ImmutableSet.of( + + private static final ImmutableSet DENIED_JOIN_TYPES = ImmutableSet.of( JoinType.LEFT_ANTI_JOIN, JoinType.FULL_OUTER_JOIN, JoinType.LEFT_OUTER_JOIN, JoinType.NULL_AWARE_LEFT_ANTI_JOIN ); + private final IdGenerator generator = RuntimeFilterId.createGenerator(); /** @@ -83,8 +84,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { Map> aliasTransferMap = ctx.getAliasTransferMap(); join.right().accept(this, context); join.left().accept(this, context); - if (deniedJoinType.contains(join.getJoinType()) || join.isMarkJoin()) { - // copy to avoid bug when next call of getOutputSet() + if (DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) { Set slots = join.getOutputSet(); slots.forEach(aliasTransferMap::remove); } else { @@ -92,7 +92,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin. - // TODO: we will support it in later version. + // we will support it in later version. for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) { EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet())); @@ -102,7 +102,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { continue; } // currently, we can ensure children in the two side are corresponding to the equal_to's. - // so right maybe an expression and left is a slot or cast(slot) + // so right maybe an expression and left is a slot Slot unwrappedSlot = checkTargetChild(equalTo.left()); // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join // contains join with denied join type. for example: a left join b on a.id = b.id @@ -124,6 +124,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { @Override public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin join, CascadesContext context) { + // TODO: we need to support all type join if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) { return join; } @@ -201,13 +202,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { return scan; } - @Override - public PhysicalStorageLayerAggregate visitPhysicalStorageLayerAggregate( - PhysicalStorageLayerAggregate storageLayerAggregate, CascadesContext context) { - storageLayerAggregate.getRelation().accept(this, context); - return storageLayerAggregate; - } - private static Slot checkTargetChild(Expression leftChild) { Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild); return expression instanceof Slot ? ((Slot) expression) : null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java index 23a67233a3..a7432fd9fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -117,6 +117,17 @@ public final class RuntimeFilterGenerator { defaultValue = Math.max(defaultValue, minVal); defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal)); } + + private FilterSizeLimits(long maxVal, long minVal, long defaultVal) { + this.maxVal = BitUtil.roundUpToPowerOf2(maxVal); + this.minVal = BitUtil.roundUpToPowerOf2(minVal); + defaultVal = Math.max(defaultVal, this.minVal); + this.defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultVal, this.maxVal)); + } + + public FilterSizeLimits adjustForParallel(int parallel) { + return new FilterSizeLimits(maxVal / parallel, minVal / parallel, defaultVal / parallel); + } } // Contains size limits for bloom filters. @@ -191,7 +202,7 @@ public final class RuntimeFilterGenerator { for (RuntimeFilter filter : filters) { filter.extractTargetsPosition(); // Number of parallel instances are large for pipeline engine, so we prefer bloom filter. - if (!filter.hasRemoteTargets() && filter.getType() == TRuntimeFilterType.IN_OR_BLOOM + if (filter.hasRemoteTargets() && filter.getType() == TRuntimeFilterType.IN_OR_BLOOM && ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enablePipelineEngine()) { filter.setType(TRuntimeFilterType.BLOOM);