[opt](nereids) prune runtime redundant filters (#29828)

1. expand_runtime_filter_by_inner_join will create some redundant rfs,e.g., tpch q5 and q9, we need to remove one
2. hive: prune rf if target only used as probe
This commit is contained in:
minghong
2024-01-11 16:08:19 +08:00
committed by yiguolei
parent ed3c8bba87
commit f67a00ffbb
11 changed files with 152 additions and 44 deletions

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -125,7 +126,7 @@ public class RuntimeFilterContext {
private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newLinkedHashMap();
private final Set<Plan> effectiveSrcNodes = Sets.newHashSet();
private final Map<Plan, EffectiveSrcType> effectiveSrcNodes = Maps.newHashMap();
// cte to related joins map which can extract common runtime filter to cte inside
private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newLinkedHashMap();
@ -147,6 +148,30 @@ public class RuntimeFilterContext {
private int targetNullCount = 0;
private final List<ExpandRF> expandedRF = Lists.newArrayList();
/**
* info about expand rf by inner join
*/
public static class ExpandRF {
public AbstractPhysicalJoin buildNode;
public PhysicalRelation srcNode;
public PhysicalRelation target1;
public PhysicalRelation target2;
public EqualPredicate equal;
public ExpandRF(AbstractPhysicalJoin buildNode, PhysicalRelation srcNode,
PhysicalRelation target1, PhysicalRelation target2, EqualPredicate equal) {
this.buildNode = buildNode;
this.srcNode = srcNode;
this.target1 = target1;
this.target2 = target2;
}
}
public RuntimeFilterContext(SessionVariable sessionVariable) {
this.sessionVariable = sessionVariable;
this.limits = new FilterSizeLimits(sessionVariable);
@ -307,12 +332,23 @@ public class RuntimeFilterContext {
targetNullCount++;
}
public void addEffectiveSrcNode(Plan node) {
effectiveSrcNodes.add(node);
/**
* the selectivity produced by predicate or rf
*/
public enum EffectiveSrcType {
NATIVE, REF
}
public void addEffectiveSrcNode(Plan node, EffectiveSrcType type) {
effectiveSrcNodes.put(node, type);
}
public boolean isEffectiveSrcNode(Plan node) {
return effectiveSrcNodes.contains(node);
return effectiveSrcNodes.keySet().contains(node);
}
public EffectiveSrcType getEffectiveSrcType(Plan plan) {
return effectiveSrcNodes.get(plan);
}
@VisibleForTesting
@ -335,4 +371,22 @@ public class RuntimeFilterContext {
}
return olapSlot;
}
/**
* return the info about expand_runtime_filter_by_inner_join
*/
public ExpandRF getExpandRfByJoin(AbstractPhysicalJoin join) {
if (join instanceof PhysicalHashJoin) {
for (ExpandRF expand : expandedRF) {
if (expand.buildNode.equals(join)) {
return expand;
}
}
}
return null;
}
public List<ExpandRF> getExpandedRF() {
return expandedRF;
}
}

View File

@ -32,9 +32,12 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
@ -57,7 +60,9 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
if (!plan.children().isEmpty()) {
plan.child(0).accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(plan.child(0))) {
context.getRuntimeFilterContext().addEffectiveSrcNode(plan);
RuntimeFilterContext.EffectiveSrcType childType = context.getRuntimeFilterContext()
.getEffectiveSrcType(plan.child(0));
context.getRuntimeFilterContext().addEffectiveSrcNode(plan, childType);
}
}
return plan;
@ -66,13 +71,13 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext context) {
topN.child().accept(this, context);
context.getRuntimeFilterContext().addEffectiveSrcNode(topN);
context.getRuntimeFilterContext().addEffectiveSrcNode(topN, RuntimeFilterContext.EffectiveSrcType.NATIVE);
return topN;
}
public PhysicalLimit visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, CascadesContext context) {
limit.child().accept(this, context);
context.getRuntimeFilterContext().addEffectiveSrcNode(limit);
context.getRuntimeFilterContext().addEffectiveSrcNode(limit, RuntimeFilterContext.EffectiveSrcType.NATIVE);
return limit;
}
@ -80,11 +85,29 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
join.right().accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.right())) {
context.getRuntimeFilterContext().addEffectiveSrcNode(join);
RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
if (rfContext.isEffectiveSrcNode(join.right())) {
boolean enableExpand = false;
if (ConnectContext.get() != null) {
enableExpand = ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin;
}
if (enableExpand && rfContext.getEffectiveSrcType(join.right())
== RuntimeFilterContext.EffectiveSrcType.REF) {
RuntimeFilterContext.ExpandRF expand = rfContext.getExpandRfByJoin(join);
if (expand != null) {
Set<ExprId> outputExprIdOfExpandTargets = Sets.newHashSet();
outputExprIdOfExpandTargets.addAll(expand.target1.getOutputExprIds());
outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds());
rfContext.getTargetExprIdByFilterJoin(join)
.stream().filter(exprId -> outputExprIdOfExpandTargets.contains(exprId))
.forEach(exprId -> rfContext.removeFilter(exprId, join));
}
}
RuntimeFilterContext.EffectiveSrcType childType =
rfContext.getEffectiveSrcType(join.right());
context.getRuntimeFilterContext().addEffectiveSrcNode(join, childType);
} else {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
List<ExprId> exprIds = ctx.getTargetExprIdByFilterJoin(join);
List<ExprId> exprIds = rfContext.getTargetExprIdByFilterJoin(join);
if (exprIds != null && !exprIds.isEmpty()) {
boolean isEffective = false;
for (Expression expr : join.getEqualToConjuncts()) {
@ -93,13 +116,21 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
}
}
if (!isEffective) {
exprIds.stream().forEach(exprId -> context.getRuntimeFilterContext().removeFilter(exprId, join));
exprIds.stream().forEach(exprId -> rfContext.removeFilter(exprId, join));
}
}
}
join.left().accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.left())) {
context.getRuntimeFilterContext().addEffectiveSrcNode(join);
if (rfContext.isEffectiveSrcNode(join.left())) {
RuntimeFilterContext.EffectiveSrcType leftType =
rfContext.getEffectiveSrcType(join.left());
RuntimeFilterContext.EffectiveSrcType rightType =
rfContext.getEffectiveSrcType(join.right());
if (rightType == null
|| (rightType == RuntimeFilterContext.EffectiveSrcType.REF
&& leftType == RuntimeFilterContext.EffectiveSrcType.NATIVE)) {
rfContext.addEffectiveSrcNode(join, leftType);
}
}
return join;
}
@ -122,7 +153,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
.anyMatch(slot -> isVisibleColumn(slot));
if (visibleFilter) {
// skip filters like: __DORIS_DELETE_SIGN__ = 0
context.getRuntimeFilterContext().addEffectiveSrcNode(filter);
context.getRuntimeFilterContext().addEffectiveSrcNode(filter, RuntimeFilterContext.EffectiveSrcType.NATIVE);
}
return filter;
}
@ -134,7 +165,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
for (Slot slot : slots) {
//if this scan node is the target of any effective RF, it is effective source
if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
context.getRuntimeFilterContext().addEffectiveSrcNode(scan);
context.getRuntimeFilterContext().addEffectiveSrcNode(scan, RuntimeFilterContext.EffectiveSrcType.REF);
break;
}
}
@ -145,20 +176,23 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
public PhysicalAssertNumRows visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
CascadesContext context) {
assertNumRows.child().accept(this, context);
context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows);
context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows,
RuntimeFilterContext.EffectiveSrcType.NATIVE);
return assertNumRows;
}
@Override
public PhysicalHashAggregate visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> aggregate,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
aggregate.child().accept(this, context);
// q1: A join (select x, sum(y) as z from B group by x) T on A.a = T.x
// q2: A join (select x, sum(y) as z from B group by x) T on A.a = T.z
// RF on q1 is not effective, but RF on q2 is. But q1 is a more generous pattern, and hence agg is not
// regarded as an effective source. Let this RF judge by ndv.
if (context.getRuntimeFilterContext().isEffectiveSrcNode(aggregate.child(0))) {
context.getRuntimeFilterContext().addEffectiveSrcNode(aggregate);
if (ctx.isEffectiveSrcNode(aggregate.child(0))) {
RuntimeFilterContext.EffectiveSrcType childType = ctx.getEffectiveSrcType(aggregate.child());
ctx.addEffectiveSrcNode(aggregate, childType);
}
return aggregate;
}

View File

@ -97,7 +97,8 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor {
CascadesContext context) {
join.right().accept(this, context);
join.right().setMutableState(MutableState.KEY_PARENT, join);
join.setMutableState(MutableState.KEY_RF_JUMP, join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
join.setMutableState(MutableState.KEY_RF_JUMP,
join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
join.left().accept(this, context);
join.left().setMutableState(MutableState.KEY_PARENT, join);
return join;
@ -121,15 +122,18 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor {
Plan cursor = scan;
Optional<Plan> parent = cursor.getMutableState(MutableState.KEY_PARENT);
while (parent.isPresent()) {
if (joinAndAncestors.contains(parent.get())) {
Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP);
if (oi.isPresent() && ConnectContext.get() != null
&& (int) (oi.get()) > ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
return true;
}
} else {
if (isBuildSide(parent.get(), cursor)) {
return false;
if (parent.get() instanceof Join) {
if (joinAndAncestors.contains(parent.get())) {
Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP);
if (oi.isPresent() && ConnectContext.get() != null
&& (int) (oi.get())
> ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
return true;
}
} else {
if (isBuildSide(parent.get(), cursor)) {
return false;
}
}
}
cursor = parent.get();
@ -148,6 +152,9 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor {
Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP);
if (oi.isPresent()) {
int jump = (Integer) (oi.get());
if (child instanceof Join) {
jump++;
}
if (jump > maxJump) {
maxJump = jump;
}

View File

@ -207,17 +207,31 @@ public class PhysicalHashJoin<
"join child node is null");
Set<Expression> probExprList = Sets.newHashSet(probeExpr);
Pair<PhysicalRelation, Slot> pair = ctx.getAliasTransferMap().get(probeExpr);
PhysicalRelation target1 = (pair == null) ? null : pair.first;
PhysicalRelation target2 = null;
pair = ctx.getAliasTransferMap().get(srcExpr);
PhysicalRelation srcNode = (pair == null) ? null : pair.first;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin) {
if (!this.equals(builderNode) && this.getJoinType() == JoinType.INNER_JOIN) {
for (Expression expr : this.getHashJoinConjuncts()) {
EqualPredicate equalTo = (EqualPredicate) expr;
if (probeExpr.equals(equalTo.left())) {
probExprList.add(equalTo.right());
pair = ctx.getAliasTransferMap().get(equalTo.right());
target2 = (pair == null) ? null : pair.first;
} else if (probeExpr.equals(equalTo.right())) {
probExprList.add(equalTo.left());
pair = ctx.getAliasTransferMap().get(equalTo.left());
target2 = (pair == null) ? null : pair.first;
}
if (target2 != null) {
ctx.getExpandedRF().add(
new RuntimeFilterContext.ExpandRF(this, srcNode, target1, target2, equalTo));
}
}
probExprList.remove(srcExpr);
}
}
for (Expression prob : probExprList) {
@ -257,7 +271,6 @@ public class PhysicalHashJoin<
builder.append(" build RFs:").append(runtimeFilters.stream()
.map(rf -> rf.shapeInfo()).collect(Collectors.joining(";")));
}
// builder.append("jump: ").append(getMutableState(MutableState.KEY_RF_JUMP));
return builder.toString();
}