[rf](nereids)prune rf for external db according to jump count (#29634)

* prune some rf for external db
This commit is contained in:
minghong
2024-01-08 16:33:43 +08:00
committed by yiguolei
parent c9e2f1934a
commit eea657a610
12 changed files with 217 additions and 28 deletions

View File

@ -69,6 +69,9 @@ public class PlanPostProcessors {
builder.add(new RuntimeFilterGenerator());
if (ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) {
builder.add(new RuntimeFilterPruner());
if (ConnectContext.get().getSessionVariable().runtimeFilterPruneForExternal) {
builder.add(new RuntimeFilterPrunerForExternalTable());
}
}
}
builder.add(new Validator());

View File

@ -96,6 +96,8 @@ public class RuntimeFilterContext {
public List<RuntimeFilter> prunedRF = Lists.newArrayList();
public final List<Plan> needRfPlans = Lists.newArrayList();
private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();
// exprId of target to runtime filter.
@ -197,14 +199,20 @@ public class RuntimeFilterContext {
RuntimeFilter rf = iter.next();
if (rf.getBuilderNode().equals(builderNode)) {
builderNode.getRuntimeFilters().remove(rf);
for (Slot target : rf.getTargetSlots()) {
if (target.getExprId().equals(targetId)) {
Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(target);
if (pair != null) {
pair.first.removeAppliedRuntimeFilter(rf);
}
for (int i = 0; i < rf.getTargetSlots().size(); i++) {
Slot targetSlot = rf.getTargetSlots().get(i);
if (targetSlot.getExprId().equals(targetId)) {
rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf);
}
}
// for (Slot target : rf.getTargetSlots()) {
// if (target.getExprId().equals(targetId)) {
// Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(target);
// if (pair != null) {
// pair.first.removeAppliedRuntimeFilter(rf);
// }
// }
// }
iter.remove();
prunedRF.add(rf);
}

View File

@ -186,7 +186,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
PhysicalRelation scan = ctx.getAliasTransferPair(targetSlot).first;
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
bitmapContains.child(0), ImmutableList.of(scanSlot),
ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L);
ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L, scan);
scan.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(join, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
@ -266,7 +266,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot),
TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv,
getMinMaxType(compare));
getMinMaxType(compare), scan);
scan.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
@ -625,7 +625,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
// build multi-target runtime filter
// since always on different join, set the expr_order as 0
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), targetList, type, 0, join, buildSideNdv);
equalTo.right(), targetList, type, 0, join, buildSideNdv, cteNode);
targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter));
for (Slot slot : targetList) {
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);

View File

@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Join;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
/**
* prune rf for external db
*/
public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor {
/**
* add parent to plan node and then remove rf if it is an external scan and only used as probe
*/
@Override
public Plan processRoot(Plan plan, CascadesContext ctx) {
plan = plan.accept(this, ctx);
RuntimeFilterContext rfCtx = ctx.getRuntimeFilterContext();
for (RuntimeFilter rf : rfCtx.getNereidsRuntimeFilter()) {
AbstractPhysicalJoin join = rf.getBuilderNode();
if (join instanceof PhysicalHashJoin) {
List<Plan> joinAncestors = getAncestors(rf.getBuilderNode());
for (int i = 0; i < rf.getTargetScans().size(); i++) {
PhysicalRelation scan = rf.getTargetScans().get(i);
if (canPrune(scan, joinAncestors)) {
rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join);
}
}
}
}
return plan;
}
@Override
public Plan visit(Plan plan, CascadesContext context) {
for (Plan child : plan.children()) {
child.setMutableState(MutableState.KEY_PARENT, plan);
child.accept(this, context);
}
setMaxChildRuntimeFilterJump(plan);
return plan;
}
@Override
public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, CascadesContext context) {
RuntimeFilterContext rfCtx = context.getRuntimeFilterContext();
List<Slot> slots = rfCtx.getTargetListByScan(scan);
int maxJump = -1;
for (Slot slot : slots) {
if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
for (RuntimeFilter rf : rfCtx.getTargetExprIdToFilter().get(slot.getExprId())) {
Optional<Object> oJump = rf.getBuilderNode().getMutableState(MutableState.KEY_RF_JUMP);
if (oJump.isPresent()) {
Integer jump = (Integer) (oJump.get());
if (jump > maxJump) {
maxJump = jump;
}
}
}
}
}
scan.setMutableState(MutableState.KEY_RF_JUMP, maxJump + 1);
return scan;
}
@Override
public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
join.right().accept(this, context);
join.right().setMutableState(MutableState.KEY_PARENT, join);
join.setMutableState(MutableState.KEY_RF_JUMP, join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
join.left().accept(this, context);
join.left().setMutableState(MutableState.KEY_PARENT, join);
return join;
}
private List<Plan> getAncestors(Plan plan) {
List<Plan> ancestors = Lists.newArrayList();
ancestors.add(plan);
Optional<Object> parent = plan.getMutableState(MutableState.KEY_PARENT);
while (parent.isPresent()) {
ancestors.add((Plan) parent.get());
parent = ((Plan) parent.get()).getMutableState(MutableState.KEY_PARENT);
}
return ancestors;
}
private boolean canPrune(PhysicalRelation scan, List<Plan> joinAndAncestors) {
if (!(scan instanceof PhysicalFileScan)) {
return false;
}
Plan cursor = scan;
Optional<Plan> parent = cursor.getMutableState(MutableState.KEY_PARENT);
while (parent.isPresent()) {
if (joinAndAncestors.contains(parent.get())) {
Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP);
if (oi.isPresent() && ConnectContext.get() != null
&& (int) (oi.get()) > ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
return true;
}
} else {
if (isBuildSide(parent.get(), cursor)) {
return false;
}
}
cursor = parent.get();
parent = cursor.getMutableState(MutableState.KEY_PARENT);
}
return false;
}
private boolean isBuildSide(Plan parent, Plan child) {
return parent instanceof Join && child.equals(parent.child(1));
}
private void setMaxChildRuntimeFilterJump(Plan plan) {
int maxJump = 0;
for (Plan child : plan.children()) {
Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP);
if (oi.isPresent()) {
int jump = (Integer) (oi.get());
if (jump > maxJump) {
maxJump = jump;
}
}
}
plan.setMutableState(MutableState.KEY_RF_JUMP, maxJump);
}
}

View File

@ -120,14 +120,14 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
Preconditions.checkState(scanSlot != null, "scan slot is null");
if (filter != null) {
this.addAppliedRuntimeFilter(filter);
filter.addTargetSlot(scanSlot);
filter.addTargetSlot(scanSlot, scan);
filter.addTargetExpression(scanSlot);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
} else {
filter = new RuntimeFilter(generator.getNextId(),
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv);
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan);
this.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);

View File

@ -257,6 +257,7 @@ public class PhysicalHashJoin<
builder.append(" build RFs:").append(runtimeFilters.stream()
.map(rf -> rf.shapeInfo()).collect(Collectors.joining(";")));
}
// builder.append("jump: ").append(getMutableState(MutableState.KEY_RF_JUMP));
return builder.toString();
}

View File

@ -50,20 +50,24 @@ public class RuntimeFilter {
// use for min-max filter only. specify if the min or max side is valid
private final TMinMaxRuntimeFilterType tMinMaxType;
private final List<PhysicalRelation> targetScans = Lists.newArrayList();
/**
* constructor
*/
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, TRuntimeFilterType type,
int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) {
int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv,
PhysicalRelation scan) {
this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX);
builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan);
}
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode,
boolean bitmapFilterNotIn, long buildSideNdv) {
boolean bitmapFilterNotIn, long buildSideNdv,
PhysicalRelation scan) {
this(id, src, targets, targetExpressions, type, exprOrder,
builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX);
builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan);
}
/**
@ -71,7 +75,8 @@ public class RuntimeFilter {
*/
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode,
boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType) {
boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType,
PhysicalRelation scan) {
this.id = id;
this.srcSlot = src;
this.targetSlots = Lists.newArrayList(targets);
@ -83,6 +88,7 @@ public class RuntimeFilter {
this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv;
this.tMinMaxType = tMinMaxType;
builderNode.addRuntimeFilter(this);
this.targetScans.add(scan);
}
public TMinMaxRuntimeFilterType gettMinMaxType() {
@ -125,8 +131,9 @@ public class RuntimeFilter {
return buildSideNdv;
}
public void addTargetSlot(Slot target) {
public void addTargetSlot(Slot target, PhysicalRelation scan) {
targetSlots.add(target);
targetScans.add(scan);
}
public List<Slot> getTargetSlots() {
@ -137,6 +144,10 @@ public class RuntimeFilter {
targetExpressions.add(targetExpr);
}
public List<PhysicalRelation> getTargetScans() {
return targetScans;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View File

@ -25,6 +25,8 @@ import java.util.Optional;
public interface MutableState {
String KEY_GROUP = "group";
String KEY_FRAGMENT = "fragment";
String KEY_PARENT = "parent";
String KEY_RF_JUMP = "rf-jump";
<T> Optional<T> get(String key);

View File

@ -567,6 +567,12 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;
@VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
public boolean runtimeFilterPruneForExternal = true;
@VariableMgr.VarAttr(name = "runtime_filter_jump_threshold")
public int runtimeFilterJumpThreshold = 2;
// using hashset instead of group by + count can improve performance
// but may cause rpc failed when cluster has less BE
// Whether this switch is turned on depends on the BE number

View File

@ -12,7 +12,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
----------PhysicalOlapScan[t1] apply RFs: RF0
----------PhysicalOlapScan[t1]
----------PhysicalDistribute[DistributionSpecHash]
------------PhysicalOlapScan[t2]
@ -22,7 +22,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[LEFT_SEMI_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
----------PhysicalOlapScan[t1] apply RFs: RF0
----------PhysicalOlapScan[t1]
----------PhysicalDistribute[DistributionSpecHash]
------------PhysicalOlapScan[t2]
@ -114,7 +114,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
----------PhysicalOlapScan[t1] apply RFs: RF0
----------PhysicalOlapScan[t1]
----------PhysicalDistribute[DistributionSpecHash]
------------PhysicalOlapScan[t2]
@ -401,7 +401,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
----------PhysicalOlapScan[t1] apply RFs: RF0
----------PhysicalOlapScan[t1]
----------PhysicalDistribute[DistributionSpecHash]
------------PhysicalOlapScan[t2]
@ -683,7 +683,7 @@ PhysicalResultSink
----PhysicalDistribute[DistributionSpecGather]
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
----------PhysicalOlapScan[t1] apply RFs: RF0
----------PhysicalOlapScan[t1]
----------PhysicalDistribute[DistributionSpecHash]
------------PhysicalOlapScan[t2]
@ -754,7 +754,7 @@ PhysicalResultSink
----------PhysicalDistribute[DistributionSpecHash]
------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
--------------PhysicalDistribute[DistributionSpecHash]
----------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
----------------PhysicalOlapScan[t1]
--------------PhysicalDistribute[DistributionSpecHash]
----------------PhysicalOlapScan[t2]
----------PhysicalDistribute[DistributionSpecHash]
@ -794,7 +794,7 @@ PhysicalResultSink
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[INNER_JOIN] hashCondition=((subq.id = t3.id)) otherCondition=()
----------hashJoin[INNER_JOIN] hashCondition=((subq.id = t2.id)) otherCondition=()
------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
------------PhysicalOlapScan[t1]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalOlapScan[t2]
----------PhysicalDistribute[DistributionSpecHash]
@ -807,7 +807,7 @@ PhysicalResultSink
------PhysicalTopN[LOCAL_SORT]
--------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t3.id)) otherCondition=()
----------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t2.id)) otherCondition=()
------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
------------PhysicalOlapScan[t1]
------------PhysicalDistribute[DistributionSpecHash]
--------------PhysicalOlapScan[t2]
----------PhysicalDistribute[DistributionSpecHash]
@ -821,7 +821,7 @@ PhysicalResultSink
--------hashAgg[LOCAL]
----------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t3.id)) otherCondition=()
------------hashJoin[INNER_JOIN] hashCondition=((subq2.id = t2.id)) otherCondition=()
--------------PhysicalOlapScan[t1] apply RFs: RF0 RF1
--------------PhysicalOlapScan[t1]
--------------PhysicalDistribute[DistributionSpecHash]
----------------PhysicalOlapScan[t2]
------------PhysicalDistribute[DistributionSpecHash]

View File

@ -101,7 +101,7 @@ PhysicalResultSink
--------------PhysicalDistribute[DistributionSpecGather]
----------------PhysicalTopN[LOCAL_SORT]
------------------hashJoin[INNER_JOIN] hashCondition=((t1.id = t2.id)) otherCondition=()
--------------------PhysicalOlapScan[table1] apply RFs: RF0
--------------------PhysicalOlapScan[table1]
--------------------PhysicalOlapScan[table1]
----------PhysicalDistribute[DistributionSpecExecutionAny]
------------PhysicalTopN[MERGE_SORT]

View File

@ -38,7 +38,7 @@ PhysicalResultSink
----------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
------------------------------------------------------PhysicalProject
--------------------------------------------------------filter((d1.d_year = 2002))
----------------------------------------------------------PhysicalOlapScan[date_dim] apply RFs: RF5
----------------------------------------------------------PhysicalOlapScan[date_dim]
------------------------------------------------PhysicalDistribute[DistributionSpecHash]
--------------------------------------------------PhysicalProject
----------------------------------------------------filter((customer_demographics.cd_marital_status = 'W'))