[enhancement](Nereids) adjust runtime filter parameters when enable pipeline engine (#18427)

When the pipeline engine is enabled, we:

1. reduce non-broadcast join bloom filter size
2. convert IN_OR_BLOOM runtime filters to BLOOM filters if the filter has a remote target
This commit is contained in:
morrySnow
2023-04-07 15:48:25 +08:00
committed by GitHub
parent 63994e351f
commit c5d9e8529a
4 changed files with 33 additions and 18 deletions

View File

@ -543,8 +543,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
context.addScanNode(olapScanNode);
// translate runtime filter
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(olapScan.getId()).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, olapScanNode, context)
runtimeFilterTranslator -> runtimeFilterTranslator.getTargetOnScanNode(olapScan.getId()).forEach(
expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, olapScanNode, context)
)
);
olapScanNode.finalizeForNereids();

View File

@ -34,6 +34,7 @@ import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TRuntimeFilterType;
@ -125,11 +126,20 @@ public class RuntimeFilterTranslator {
if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
targetExpr = new CastExpr(src.getType(), targetExpr);
}
FilterSizeLimits filterSizeLimits = context.getLimits();
if (node instanceof HashJoinNode
&& !(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST)
&& ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enablePipelineEngine()
&& ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() > 0) {
filterSizeLimits = filterSizeLimits.adjustForParallel(
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
}
org.apache.doris.planner.RuntimeFilter origFilter
= org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
filter.getId(), node, src, filter.getExprOrder(), targetExpr,
ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)),
filter.getType(), context.getLimits());
filter.getType(), filterSizeLimits);
if (node instanceof HashJoinNode) {
origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
} else {
@ -151,7 +161,7 @@ public class RuntimeFilterTranslator {
origFilter.assignToPlanNodes();
origFilter.extractTargetsPosition();
// Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
if (!origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
&& ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
origFilter.setType(TRuntimeFilterType.BLOOM);

View File

@ -35,7 +35,6 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@ -54,12 +53,14 @@ import java.util.stream.Collectors;
* generate runtime filter
*/
public class RuntimeFilterGenerator extends PlanPostProcessor {
private static final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
private static final ImmutableSet<JoinType> DENIED_JOIN_TYPES = ImmutableSet.of(
JoinType.LEFT_ANTI_JOIN,
JoinType.FULL_OUTER_JOIN,
JoinType.LEFT_OUTER_JOIN,
JoinType.NULL_AWARE_LEFT_ANTI_JOIN
);
private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();
/**
@ -83,8 +84,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
join.right().accept(this, context);
join.left().accept(this, context);
if (deniedJoinType.contains(join.getJoinType()) || join.isMarkJoin()) {
// copy to avoid bug when next call of getOutputSet()
if (DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
Set<Slot> slots = join.getOutputSet();
slots.forEach(aliasTransferMap::remove);
} else {
@ -92,7 +92,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
// TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
// TODO: we will support it in later version.
// we will support it in later version.
for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
(EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
@ -102,7 +102,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
continue;
}
// currently, we can ensure children in the two side are corresponding to the equal_to's.
// so right maybe an expression and left is a slot or cast(slot)
// so right maybe an expression and left is a slot
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
@ -124,6 +124,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
@Override
public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
// TODO: we need to support all type join
if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) {
return join;
}
@ -201,13 +202,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
return scan;
}
@Override
public PhysicalStorageLayerAggregate visitPhysicalStorageLayerAggregate(
PhysicalStorageLayerAggregate storageLayerAggregate, CascadesContext context) {
storageLayerAggregate.getRelation().accept(this, context);
return storageLayerAggregate;
}
private static Slot checkTargetChild(Expression leftChild) {
Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
return expression instanceof Slot ? ((Slot) expression) : null;

View File

@ -117,6 +117,17 @@ public final class RuntimeFilterGenerator {
defaultValue = Math.max(defaultValue, minVal);
defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal));
}
private FilterSizeLimits(long maxVal, long minVal, long defaultVal) {
this.maxVal = BitUtil.roundUpToPowerOf2(maxVal);
this.minVal = BitUtil.roundUpToPowerOf2(minVal);
defaultVal = Math.max(defaultVal, this.minVal);
this.defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultVal, this.maxVal));
}
/**
 * Returns a new set of limits with the max, min and default sizes each divided
 * by {@code parallel}. The private constructor then re-rounds and re-clamps the
 * divided values.
 *
 * <p>A {@code parallel} value below 1 is treated as 1, so the call never divides
 * by zero and never produces negative limits; in that case the returned limits
 * are built from the current values unchanged.
 *
 * @param parallel number of parallel instances to divide the limits by
 * @return a new {@code FilterSizeLimits}; this instance is not modified
 */
public FilterSizeLimits adjustForParallel(int parallel) {
    // Guard against zero/negative input: integer division by zero would throw
    // ArithmeticException, and a negative divisor would yield negative sizes.
    final int divisor = Math.max(parallel, 1);
    return new FilterSizeLimits(maxVal / divisor, minVal / divisor, defaultVal / divisor);
}
}
// Contains size limits for bloom filters.
@ -191,7 +202,7 @@ public final class RuntimeFilterGenerator {
for (RuntimeFilter filter : filters) {
filter.extractTargetsPosition();
// Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
if (!filter.hasRemoteTargets() && filter.getType() == TRuntimeFilterType.IN_OR_BLOOM
if (filter.hasRemoteTargets() && filter.getType() == TRuntimeFilterType.IN_OR_BLOOM
&& ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
filter.setType(TRuntimeFilterType.BLOOM);