[opt](Nereids) make runtime filter target support expression (#30131)
the target expression should be: 1. only one numeric slot, or 2. cast for any data type example: select * from T1 join T2 on abs(T1.a) = T2.a RF T2.a->abs(T1.a)
This commit is contained in:
@ -115,30 +115,27 @@ public class RuntimeFilterTranslator {
|
||||
List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new ArrayList<>();
|
||||
List<ScanNode> scanNodeList = new ArrayList<>();
|
||||
boolean hasInvalidTarget = false;
|
||||
for (int i = 0; i < filter.getTargetExprs().size(); i++) {
|
||||
Slot curTargetExpr = filter.getTargetExprs().get(i);
|
||||
for (int i = 0; i < filter.getTargetExpressions().size(); i++) {
|
||||
Slot curTargetSlot = filter.getTargetSlots().get(i);
|
||||
Expression curTargetExpression = filter.getTargetExpressions().get(i);
|
||||
Expr target = context.getExprIdToOlapScanNodeSlotRef().get(curTargetExpr.getExprId());
|
||||
Expr target = context.getExprIdToOlapScanNodeSlotRef().get(curTargetSlot.getExprId());
|
||||
if (target == null) {
|
||||
context.setTargetNullCount();
|
||||
hasInvalidTarget = true;
|
||||
break;
|
||||
}
|
||||
ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr);
|
||||
ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetSlot);
|
||||
Expr targetExpr;
|
||||
if (filter.getType() == TRuntimeFilterType.BITMAP) {
|
||||
if (curTargetExpression.equals(curTargetExpr)) {
|
||||
targetExpr = target;
|
||||
} else {
|
||||
RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator(
|
||||
context.getExprIdToOlapScanNodeSlotRef());
|
||||
targetExpr = curTargetExpression.accept(translator, ctx);
|
||||
}
|
||||
} else {
|
||||
if (curTargetSlot.equals(curTargetExpression)) {
|
||||
targetExpr = target;
|
||||
} else {
|
||||
RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator(
|
||||
context.getExprIdToOlapScanNodeSlotRef());
|
||||
targetExpr = curTargetExpression.accept(translator, ctx);
|
||||
}
|
||||
|
||||
// adjust data type
|
||||
if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
|
||||
if (!src.getType().equals(targetExpr.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
|
||||
targetExpr = new CastExpr(src.getType(), targetExpr);
|
||||
}
|
||||
SlotRef targetSlot = target.getSrcSlotRef();
|
||||
|
||||
@ -206,7 +206,7 @@ public class RuntimeFilterContext {
|
||||
}
|
||||
|
||||
public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
|
||||
Preconditions.checkArgument(filter.getTargetExprs().stream().anyMatch(expr -> expr.getExprId() == id));
|
||||
Preconditions.checkArgument(filter.getTargetSlots().stream().anyMatch(expr -> expr.getExprId() == id));
|
||||
this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter);
|
||||
}
|
||||
|
||||
|
||||
@ -315,7 +315,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
if (expression.children().isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
Expression expr = ExpressionUtils.getExpressionCoveredByCast(expression.child(0));
|
||||
Expression expr = ExpressionUtils.getSingleNumericSlotOrExpressionCoveredByCast(expression.child(0));
|
||||
if (expr instanceof NamedExpression
|
||||
&& ctx.aliasTransferMapContains((NamedExpression) expr)) {
|
||||
if (expression instanceof Alias) {
|
||||
@ -357,7 +357,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
}
|
||||
|
||||
public static Slot checkTargetChild(Expression leftChild) {
|
||||
Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
|
||||
Expression expression = ExpressionUtils.getSingleNumericSlotOrExpressionCoveredByCast(leftChild);
|
||||
return expression instanceof Slot ? ((Slot) expression) : null;
|
||||
}
|
||||
|
||||
@ -611,6 +611,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
(SlotReference) targetExpr, ctx);
|
||||
if (!pushDownBasicTableInfos.isEmpty()) {
|
||||
List<Slot> targetList = new ArrayList<>();
|
||||
List<Expression> targetExpressions = new ArrayList<>();
|
||||
List<PhysicalRelation> targetNodes = new ArrayList<>();
|
||||
for (Map.Entry<Slot, PhysicalRelation> entry : pushDownBasicTableInfos.entrySet()) {
|
||||
Slot targetSlot = entry.getKey();
|
||||
@ -619,6 +620,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
continue;
|
||||
}
|
||||
targetList.add(targetSlot);
|
||||
targetExpressions.add(targetSlot);
|
||||
targetNodes.add(scan);
|
||||
ctx.addJoinToTargetMap(join, targetSlot.getExprId());
|
||||
ctx.setTargetsOnScanNode(scan, targetSlot);
|
||||
@ -626,7 +628,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
// build multi-target runtime filter
|
||||
// since always on different join, set the expr_order as 0
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
equalTo.right(), targetList, type, 0, join, buildSideNdv, cteNode);
|
||||
equalTo.right(), targetList, targetExpressions, type, 0, join, buildSideNdv, cteNode);
|
||||
targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter));
|
||||
for (Slot slot : targetList) {
|
||||
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
|
||||
|
||||
@ -126,19 +126,19 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
|
||||
// however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2
|
||||
// we skip RF(B.b->A.a2)
|
||||
this.addAppliedRuntimeFilter(filter);
|
||||
filter.addTargetSlot(scanSlot, scan);
|
||||
filter.addTargetExpression(scanSlot);
|
||||
filter.addTargetSlot(scanSlot, probeExpr, scan);
|
||||
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
|
||||
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
|
||||
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
|
||||
}
|
||||
} else {
|
||||
filter = new RuntimeFilter(generator.getNextId(),
|
||||
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan);
|
||||
src, ImmutableList.of(scanSlot), ImmutableList.of(probeExpr),
|
||||
type, exprOrder, builderNode, buildSideNdv, scan);
|
||||
this.addAppliedRuntimeFilter(filter);
|
||||
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
|
||||
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
|
||||
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
|
||||
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeSlot).first, scanSlot);
|
||||
ctx.setRuntimeFilterIdentityToFilter(src, type, builderNode, filter);
|
||||
}
|
||||
return true;
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Project;
|
||||
@ -198,7 +199,15 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (probeExpr instanceof SlotReference) {
|
||||
for (NamedExpression namedExpression : projects) {
|
||||
if (namedExpression instanceof Alias
|
||||
&& namedExpression.getExprId() == ((SlotReference) probeExpr).getExprId()) {
|
||||
probeExpr = ((Alias) namedExpression).child();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
|
||||
return child.pushDownRuntimeFilter(context, generator, builderNode,
|
||||
src, probeExpr, type, buildSideNdv, exprOrder);
|
||||
|
||||
@ -23,7 +23,6 @@ import org.apache.doris.planner.RuntimeFilterId;
|
||||
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
@ -55,10 +54,10 @@ public class RuntimeFilter {
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, TRuntimeFilterType type,
|
||||
int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv,
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
|
||||
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv,
|
||||
PhysicalRelation scan) {
|
||||
this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
|
||||
this(id, src, targets, targetExpressions, type, exprOrder,
|
||||
builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan);
|
||||
}
|
||||
|
||||
@ -99,10 +98,6 @@ public class RuntimeFilter {
|
||||
return srcSlot;
|
||||
}
|
||||
|
||||
public List<Slot> getTargetExprs() {
|
||||
return targetSlots;
|
||||
}
|
||||
|
||||
public RuntimeFilterId getId() {
|
||||
return id;
|
||||
}
|
||||
@ -131,7 +126,8 @@ public class RuntimeFilter {
|
||||
return buildSideNdv;
|
||||
}
|
||||
|
||||
public void addTargetSlot(Slot target, PhysicalRelation scan) {
|
||||
public void addTargetSlot(Slot target, Expression targetExpression, PhysicalRelation scan) {
|
||||
targetExpressions.add(targetExpression);
|
||||
targetSlots.add(target);
|
||||
targetScans.add(scan);
|
||||
}
|
||||
@ -140,10 +136,6 @@ public class RuntimeFilter {
|
||||
return targetSlots;
|
||||
}
|
||||
|
||||
public void addTargetExpression(Expression targetExpr) {
|
||||
targetExpressions.add(targetExpr);
|
||||
}
|
||||
|
||||
public List<PhysicalRelation> getTargetScans() {
|
||||
return targetScans;
|
||||
}
|
||||
@ -156,7 +148,7 @@ public class RuntimeFilter {
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("RF").append(id.asInt())
|
||||
.append("[").append(getSrcExpr()).append("->").append(targetSlots)
|
||||
.append("[").append(getSrcExpr()).append("->").append(targetExpressions)
|
||||
.append("(ndv/size = ").append(buildSideNdv).append("/")
|
||||
.append(org.apache.doris.planner.RuntimeFilter.expectRuntimeFilterSize(buildSideNdv))
|
||||
.append(")");
|
||||
@ -171,7 +163,7 @@ public class RuntimeFilter {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("RF").append(id.asInt())
|
||||
.append(" ").append(getSrcExpr().toSql()).append("->[").append(
|
||||
targetSlots.stream().map(slot -> slot.getName()).collect(Collectors.joining(",")))
|
||||
targetExpressions.stream().map(expr -> expr.toSql()).collect(Collectors.joining(",")))
|
||||
.append("]");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@ -742,6 +742,25 @@ public class ExpressionUtils {
|
||||
return expression;
|
||||
}
|
||||
|
||||
/**
|
||||
* the expressions can be used as runtime filter targets
|
||||
*/
|
||||
public static Expression getSingleNumericSlotOrExpressionCoveredByCast(Expression expression) {
|
||||
if (expression.getInputSlots().size() == 1) {
|
||||
Slot slot = expression.getInputSlots().iterator().next();
|
||||
if (slot.getDataType() instanceof NumericType) {
|
||||
return expression.getInputSlots().iterator().next();
|
||||
}
|
||||
}
|
||||
// for other datatype, only support cast.
|
||||
// example: T1 join T2 on subStr(T1.a, 1,4) = subStr(T2.a, 1,4)
|
||||
// the cost of subStr is too high, and hence we do not generate RF subStr(T2.a, 1,4)->subStr(T1.a, 1,4)
|
||||
while (expression instanceof Cast) {
|
||||
expression = ((Cast) expression).child();
|
||||
}
|
||||
return expression;
|
||||
}
|
||||
|
||||
/**
|
||||
* To check whether a slot is constant after passing through a filter
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user