[opt](nereids) do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join (#30148)

1. do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join
    tpcds1T, q48 improved from 4.x sec to 1.x sec
    2. skip some redundant runtime filter
    example: A join B on A.a1=B.b and A.a1 = A.a2
    RF B.b->(A.a1, A.a2)
    however, RF(B.b->A.a2) is implied by RF(B.b->A.a1) and A.a1=A.a2
    we skip RF(B.b->A.a2)
    Issue Number: close #xxx
This commit is contained in:
minghong
2024-01-22 09:22:19 +08:00
committed by yiguolei
parent 1bb1d35f70
commit 332b9cb619
7 changed files with 56 additions and 36 deletions

View File

@ -38,7 +38,6 @@ import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TRuntimeFilterType;
@ -185,11 +184,6 @@ public class RuntimeFilterTranslator {
origFilter.markFinalized();
origFilter.assignToPlanNodes();
origFilter.extractTargetsPosition();
// Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
&& SessionVariable.enablePipelineEngine()) {
origFilter.setType(TRuntimeFilterType.BLOOM);
}
return origFilter;
}
}

View File

@ -567,7 +567,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
PhysicalHashJoin<? extends Plan, ? extends Plan> join = innerEntry.getValue();
Preconditions.checkState(join != null);
TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
if (ctx.getSessionVariable().getEnablePipelineEngine()) {
if (ctx.getSessionVariable().getEnablePipelineEngine() && !join.isBroadCastJoin()) {
type = TRuntimeFilterType.BLOOM;
}
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(

View File

@ -19,6 +19,8 @@ package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -251,4 +253,17 @@ public abstract class AbstractPhysicalJoin<
return Utils.toSqlString(this.getClass().getSimpleName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
args.toArray());
}
/**
 * Reports whether this join is a broadcast join.
 *
 * <p>The join counts as broadcast only when the child at index 1 is a
 * {@link PhysicalDistribute} whose distribution spec is a
 * {@link DistributionSpecReplicated}.
 *
 * @return {@code true} if this is a broadcast join, {@code false} otherwise
 */
public boolean isBroadCastJoin() {
    // Without a distribute node on child(1) there is no replicated spec to inspect.
    if (!(child(1) instanceof PhysicalDistribute)) {
        return false;
    }
    DistributionSpec childSpec = ((PhysicalDistribute) child(1)).getDistributionSpec();
    return childSpec instanceof DistributionSpecReplicated;
}
}

View File

@ -112,19 +112,26 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().getEnablePipelineEngine()
&& RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)) {
&& RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)
&& !builderNode.isBroadCastJoin()) {
type = TRuntimeFilterType.BLOOM;
}
org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter =
ctx.getRuntimeFilterBySrcAndType(src, type, builderNode);
Preconditions.checkState(scanSlot != null, "scan slot is null");
if (filter != null) {
this.addAppliedRuntimeFilter(filter);
filter.addTargetSlot(scanSlot, scan);
filter.addTargetExpression(scanSlot);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
if (!filter.hasTargetScan(scan)) {
// A join B on A.a1=B.b and A.a1 = A.a2
// RF B.b->(A.a1, A.a2)
// however, RF(B.b->A.a2) is implied by RF(B.b->A.a1) and A.a1=A.a2
// we skip RF(B.b->A.a2)
this.addAppliedRuntimeFilter(filter);
filter.addTargetSlot(scanSlot, scan);
filter.addTargetExpression(scanSlot);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
}
} else {
filter = new RuntimeFilter(generator.getNextId(),
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan);

View File

@ -148,6 +148,10 @@ public class RuntimeFilter {
return targetScans;
}
/**
 * Checks whether the given scan is already among this filter's target scans.
 *
 * @param scan the relation to look up in {@code targetScans}
 * @return {@code true} if {@code targetScans} contains {@code scan}
 */
public boolean hasTargetScan(PhysicalRelation scan) {
    final boolean alreadyTargeted = targetScans.contains(scan);
    return alreadyTargeted;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();