[runtimefilter](nerieds)support Non equal runtime filter for nested loop join #25193
This commit is contained in:
@ -180,6 +180,7 @@ import org.apache.doris.tablefunction.TableValuedFunctionIf;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
import org.apache.doris.thrift.TPartitionType;
|
||||
import org.apache.doris.thrift.TPushAggOp;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -1392,7 +1393,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
.getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
|
||||
filters.forEach(filter -> runtimeFilterTranslator
|
||||
.createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context));
|
||||
if (!filters.isEmpty()) {
|
||||
if (filters.stream().anyMatch(filter -> filter.getType() == TRuntimeFilterType.BITMAP)) {
|
||||
nestedLoopJoinNode.setOutputLeftSideOnly(true);
|
||||
}
|
||||
});
|
||||
|
||||
@ -152,8 +152,8 @@ public class RuntimeFilterTranslator {
|
||||
if (!hasInvalidTarget) {
|
||||
org.apache.doris.planner.RuntimeFilter origFilter
|
||||
= org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
|
||||
filter.getId(), node, src, filter.getExprOrder(), targetExprList,
|
||||
targetTupleIdMapList, filter.getType(), context.getLimits(), filter.getBuildSideNdv());
|
||||
filter, node, src, targetExprList,
|
||||
targetTupleIdMapList, context.getLimits());
|
||||
if (node instanceof HashJoinNode) {
|
||||
origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
|
||||
} else {
|
||||
|
||||
@ -23,9 +23,14 @@ import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.stats.ExpressionEstimation;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.CTEId;
|
||||
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
|
||||
import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.GreaterThan;
|
||||
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThan;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Not;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
@ -53,6 +58,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.JoinUtils;
|
||||
import org.apache.doris.planner.RuntimeFilterId;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -113,6 +119,10 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
|
||||
join.right().accept(this, context);
|
||||
join.left().accept(this, context);
|
||||
if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
|
||||
join.right().getOutput().forEach(slot ->
|
||||
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
|
||||
}
|
||||
collectPushDownCTEInfos(join, context);
|
||||
if (!getPushDownCTECandidates(ctx).isEmpty()) {
|
||||
pushDownRuntimeFilterIntoCTE(ctx);
|
||||
@ -142,29 +152,19 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
return producer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
|
||||
CascadesContext context) {
|
||||
// TODO: we need to support all type join
|
||||
join.right().accept(this, context);
|
||||
join.left().accept(this, context);
|
||||
private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
|
||||
RuntimeFilterContext ctx) {
|
||||
if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) {
|
||||
return join;
|
||||
return;
|
||||
}
|
||||
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
|
||||
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
|
||||
|
||||
if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) == 0) {
|
||||
//only generate BITMAP filter for nested loop join
|
||||
return join;
|
||||
}
|
||||
List<Slot> leftSlots = join.left().getOutput();
|
||||
List<Slot> rightSlots = join.right().getOutput();
|
||||
List<Expression> bitmapRuntimeFilterConditions = JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots,
|
||||
rightSlots, join.getOtherJoinConjuncts());
|
||||
if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots, join.getOtherJoinConjuncts())
|
||||
.first.isEmpty()) {
|
||||
return join;
|
||||
return;
|
||||
}
|
||||
int bitmapRFCount = bitmapRuntimeFilterConditions.size();
|
||||
for (int i = 0; i < bitmapRFCount; i++) {
|
||||
@ -193,6 +193,104 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A join B on B.x < A.x
|
||||
* transform B.x < A.x to A.x > B.x,
|
||||
* otherwise return null
|
||||
*/
|
||||
private ComparisonPredicate normalizeNonEqual(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
|
||||
Expression expr) {
|
||||
if (!(expr instanceof ComparisonPredicate)) {
|
||||
return null;
|
||||
}
|
||||
if (!(expr.child(0) instanceof SlotReference)) {
|
||||
return null;
|
||||
}
|
||||
if (!(expr.child(1) instanceof SlotReference)) {
|
||||
return null;
|
||||
}
|
||||
if (! join.left().getOutput().contains(expr.child(0))
|
||||
|| ! join.right().getOutput().contains(expr.child(1))) {
|
||||
if (join.left().getOutput().contains(expr.child(1))
|
||||
&& join.right().getOutput().contains(expr.child(0))) {
|
||||
return ((ComparisonPredicate) expr).commute();
|
||||
}
|
||||
} else {
|
||||
return (ComparisonPredicate) expr;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private TMinMaxRuntimeFilterType getMinMaxType(ComparisonPredicate compare) {
|
||||
if (compare instanceof LessThan || compare instanceof LessThanEqual) {
|
||||
return TMinMaxRuntimeFilterType.MAX;
|
||||
}
|
||||
if (compare instanceof GreaterThan || compare instanceof GreaterThanEqual) {
|
||||
return TMinMaxRuntimeFilterType.MIN;
|
||||
}
|
||||
return TMinMaxRuntimeFilterType.MIN_MAX;
|
||||
}
|
||||
|
||||
/**
|
||||
* A join B on A.x < B.y
|
||||
* min-max filter (A.x < N, N=max(B.y)) could be applied to A.x
|
||||
*/
|
||||
private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
|
||||
RuntimeFilterContext ctx) {
|
||||
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
|
||||
int hashCondionSize = join.getHashJoinConjuncts().size();
|
||||
for (int idx = 0; idx < join.getOtherJoinConjuncts().size(); idx++) {
|
||||
int exprOrder = idx + hashCondionSize;
|
||||
Expression expr = join.getOtherJoinConjuncts().get(exprOrder);
|
||||
ComparisonPredicate compare = normalizeNonEqual(join, expr);
|
||||
if (compare != null) {
|
||||
Slot unwrappedSlot = checkTargetChild(compare.child(0));
|
||||
if (unwrappedSlot == null) {
|
||||
continue;
|
||||
}
|
||||
Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(unwrappedSlot);
|
||||
if (pair == null) {
|
||||
continue;
|
||||
}
|
||||
Slot olapScanSlot = pair.second;
|
||||
PhysicalRelation scan = pair.first;
|
||||
Preconditions.checkState(olapScanSlot != null && scan != null);
|
||||
long buildSideNdv = getBuildSideNdv(join, compare);
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot),
|
||||
TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv,
|
||||
getMinMaxType(compare));
|
||||
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
|
||||
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
|
||||
ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
|
||||
CascadesContext context) {
|
||||
// TODO: we need to support all type join
|
||||
join.right().accept(this, context);
|
||||
join.left().accept(this, context);
|
||||
|
||||
if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
|
||||
join.right().getOutput().forEach(slot ->
|
||||
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
|
||||
return join;
|
||||
}
|
||||
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
|
||||
|
||||
if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) != 0) {
|
||||
generateBitMapRuntimeFilterForNLJ(join, ctx);
|
||||
}
|
||||
|
||||
if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.MIN_MAX.getValue()) != 0) {
|
||||
generateMinMaxRuntimeFilter(join, ctx);
|
||||
}
|
||||
|
||||
return join;
|
||||
}
|
||||
|
||||
@ -233,14 +331,16 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
return relation;
|
||||
}
|
||||
|
||||
private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
|
||||
// runtime filter build side ndv
|
||||
private long getBuildSideNdv(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
|
||||
ComparisonPredicate compare) {
|
||||
AbstractPlan right = (AbstractPlan) join.right();
|
||||
//make ut test friendly
|
||||
if (right.getStats() == null) {
|
||||
return -1L;
|
||||
}
|
||||
ExpressionEstimation estimator = new ExpressionEstimation();
|
||||
ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
|
||||
ColumnStatistic buildColStats = compare.right().accept(estimator, right.getStats());
|
||||
return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
|
||||
}
|
||||
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Join;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.JoinUtils;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -41,6 +42,7 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Abstract class for all physical join node.
|
||||
@ -214,4 +216,30 @@ public abstract class AbstractPhysicalJoin<
|
||||
? ImmutableList.of(markJoinSlotReference.get()) : ImmutableList.of())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
List<Object> args = Lists.newArrayList("type", joinType,
|
||||
"hashCondition", hashJoinConjuncts,
|
||||
"otherCondition", otherJoinConjuncts,
|
||||
"stats", statistics);
|
||||
if (markJoinSlotReference.isPresent()) {
|
||||
args.add("isMarkJoin");
|
||||
args.add("true");
|
||||
}
|
||||
if (markJoinSlotReference.isPresent()) {
|
||||
args.add("MarkJoinSlotReference");
|
||||
args.add(markJoinSlotReference.get());
|
||||
}
|
||||
if (hint != JoinHint.NONE) {
|
||||
args.add("hint");
|
||||
args.add(hint);
|
||||
}
|
||||
if (!runtimeFilters.isEmpty()) {
|
||||
args.add("runtimeFilters");
|
||||
args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList()));
|
||||
}
|
||||
return Utils.toSqlString(this.getClass().getName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
|
||||
args.toArray());
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,14 +31,12 @@ import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.plans.AbstractPlan;
|
||||
import org.apache.doris.nereids.trees.plans.JoinHint;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.MutableState;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.planner.RuntimeFilterId;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
@ -140,33 +138,6 @@ public class PhysicalHashJoin<
|
||||
return visitor.visitPhysicalHashJoin(this, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
List<Object> args = Lists.newArrayList("type", joinType,
|
||||
"hashJoinCondition", hashJoinConjuncts,
|
||||
"otherJoinCondition", otherJoinConjuncts,
|
||||
"stats", statistics,
|
||||
"fr", getMutableState(AbstractPlan.FRAGMENT_ID));
|
||||
if (markJoinSlotReference.isPresent()) {
|
||||
args.add("isMarkJoin");
|
||||
args.add("true");
|
||||
}
|
||||
if (markJoinSlotReference.isPresent()) {
|
||||
args.add("MarkJoinSlotReference");
|
||||
args.add(markJoinSlotReference.get());
|
||||
}
|
||||
if (hint != JoinHint.NONE) {
|
||||
args.add("hint");
|
||||
args.add(hint);
|
||||
}
|
||||
if (!runtimeFilters.isEmpty()) {
|
||||
args.add("runtimeFilters");
|
||||
args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList()));
|
||||
}
|
||||
return Utils.toSqlString("PhysicalHashJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(),
|
||||
args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalHashJoin<Plan, Plan> withChildren(List<Plan> children) {
|
||||
Preconditions.checkArgument(children.size() == 2);
|
||||
|
||||
@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.MutableState;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -113,17 +112,17 @@ public class PhysicalNestedLoopJoin<
|
||||
return visitor.visitPhysicalNestedLoopJoin(this, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
// TODO: Maybe we could pull up this to the abstract class in the future.
|
||||
return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(),
|
||||
"type", joinType,
|
||||
"otherJoinCondition", otherJoinConjuncts,
|
||||
"isMarkJoin", markJoinSlotReference.isPresent(),
|
||||
"markJoinSlotReference", markJoinSlotReference.isPresent() ? markJoinSlotReference.get() : "empty",
|
||||
"stats", statistics
|
||||
);
|
||||
}
|
||||
// @Override
|
||||
// public String toString() {
|
||||
// // TODO: Maybe we could pull up this to the abstract class in the future.
|
||||
// return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(),
|
||||
// "type", joinType,
|
||||
// "otherJoinCondition", otherJoinConjuncts,
|
||||
// "isMarkJoin", markJoinSlotReference.isPresent(),
|
||||
// "markJoinSlotReference", markJoinSlotReference.isPresent() ? markJoinSlotReference.get() : "empty",
|
||||
// "stats", statistics
|
||||
// );
|
||||
// }
|
||||
|
||||
@Override
|
||||
public PhysicalNestedLoopJoin<Plan, Plan> withChildren(List<Plan> children) {
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.physical;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.planner.RuntimeFilterId;
|
||||
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -45,21 +46,31 @@ public class RuntimeFilter {
|
||||
private final boolean bitmapFilterNotIn;
|
||||
|
||||
private final long buildSideNdv;
|
||||
// use for min-max filter only. specify if the min or max side is valid
|
||||
private final TMinMaxRuntimeFilterType tMinMaxType;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, TRuntimeFilterType type,
|
||||
int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) {
|
||||
this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, builderNode, false, buildSideNdv);
|
||||
this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
|
||||
builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX);
|
||||
}
|
||||
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
|
||||
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode,
|
||||
boolean bitmapFilterNotIn, long buildSideNdv) {
|
||||
this(id, src, targets, targetExpressions, type, exprOrder,
|
||||
builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX);
|
||||
}
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
|
||||
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn,
|
||||
long buildSideNdv) {
|
||||
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode,
|
||||
boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType) {
|
||||
this.id = id;
|
||||
this.srcSlot = src;
|
||||
this.targetSlots = Lists.newArrayList(targets);
|
||||
@ -69,9 +80,14 @@ public class RuntimeFilter {
|
||||
this.builderNode = builderNode;
|
||||
this.bitmapFilterNotIn = bitmapFilterNotIn;
|
||||
this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv;
|
||||
this.tMinMaxType = tMinMaxType;
|
||||
builderNode.addRuntimeFilter(this);
|
||||
}
|
||||
|
||||
public TMinMaxRuntimeFilterType gettMinMaxType() {
|
||||
return tMinMaxType;
|
||||
}
|
||||
|
||||
public Expression getSrcExpr() {
|
||||
return srcSlot;
|
||||
}
|
||||
|
||||
@ -150,24 +150,7 @@ public class DataStreamSink extends DataSink {
|
||||
}
|
||||
List<String> filtersStr = new ArrayList<>();
|
||||
for (RuntimeFilter filter : runtimeFilters) {
|
||||
StringBuilder filterStr = new StringBuilder();
|
||||
filterStr.append(filter.getFilterId());
|
||||
if (!isBrief) {
|
||||
filterStr.append("[");
|
||||
filterStr.append(filter.getType().toString().toLowerCase());
|
||||
filterStr.append("]");
|
||||
if (isBuildNode) {
|
||||
filterStr.append(" <- ");
|
||||
filterStr.append(filter.getSrcExpr().toSql());
|
||||
filterStr.append("(").append(filter.getEstimateNdv()).append("/")
|
||||
.append(filter.getExpectFilterSizeBytes()).append("/")
|
||||
.append(filter.getFilterSizeBytes()).append(")");
|
||||
} else {
|
||||
filterStr.append(" -> ");
|
||||
filterStr.append(filter.getTargetExpr(getExchNodeId()).toSql());
|
||||
}
|
||||
}
|
||||
filtersStr.add(filterStr.toString());
|
||||
filtersStr.add(filter.getExplainString(isBuildNode, isBrief, getExchNodeId()));
|
||||
}
|
||||
return Joiner.on(", ").join(filtersStr) + "\n";
|
||||
}
|
||||
|
||||
@ -1116,24 +1116,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
|
||||
}
|
||||
List<String> filtersStr = new ArrayList<>();
|
||||
for (RuntimeFilter filter : runtimeFilters) {
|
||||
StringBuilder filterStr = new StringBuilder();
|
||||
filterStr.append(filter.getFilterId());
|
||||
if (!isBrief) {
|
||||
filterStr.append("[");
|
||||
filterStr.append(filter.getType().toString().toLowerCase());
|
||||
filterStr.append("]");
|
||||
if (isBuildNode) {
|
||||
filterStr.append(" <- ");
|
||||
filterStr.append(filter.getSrcExpr().toSql());
|
||||
filterStr.append("(").append(filter.getEstimateNdv()).append("/")
|
||||
.append(filter.getExpectFilterSizeBytes()).append("/")
|
||||
.append(filter.getFilterSizeBytes()).append(")");
|
||||
} else {
|
||||
filterStr.append(" -> ");
|
||||
filterStr.append(filter.getTargetExpr(getId()).toSql());
|
||||
}
|
||||
}
|
||||
filtersStr.add(filterStr.toString());
|
||||
filtersStr.add(filter.getExplainString(isBuildNode, isBrief, getId()));
|
||||
}
|
||||
return Joiner.on(", ").join(filtersStr) + "\n";
|
||||
}
|
||||
|
||||
@ -35,6 +35,7 @@ import org.apache.doris.common.IdGenerator;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
|
||||
import org.apache.doris.thrift.TRuntimeFilterDesc;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
|
||||
@ -109,6 +110,8 @@ public final class RuntimeFilter {
|
||||
|
||||
private boolean useRemoteRfOpt = true;
|
||||
|
||||
private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType;
|
||||
|
||||
/**
|
||||
* Internal representation of a runtime filter target.
|
||||
*/
|
||||
@ -142,8 +145,10 @@ public final class RuntimeFilter {
|
||||
}
|
||||
|
||||
private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder,
|
||||
List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots, TRuntimeFilterType type,
|
||||
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
|
||||
List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots,
|
||||
TRuntimeFilterType type,
|
||||
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv,
|
||||
TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType) {
|
||||
this.id = filterId;
|
||||
this.builderNode = filterSrcNode;
|
||||
this.srcExpr = srcExpr;
|
||||
@ -152,16 +157,27 @@ public final class RuntimeFilter {
|
||||
this.targetSlotsByTid = ImmutableList.copyOf(targetSlots);
|
||||
this.runtimeFilterType = type;
|
||||
this.ndvEstimate = buildSizeNdv;
|
||||
this.tMinMaxRuntimeFilterType = tMinMaxRuntimeFilterType;
|
||||
computeNdvEstimate();
|
||||
calculateFilterSize(filterSizeLimits);
|
||||
}
|
||||
|
||||
private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder,
|
||||
List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots, TRuntimeFilterType type,
|
||||
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
|
||||
this(filterId, filterSrcNode, srcExpr, exprOrder, origTargetExprs,
|
||||
targetSlots, type, filterSizeLimits, buildSizeNdv, TMinMaxRuntimeFilterType.MIN_MAX);
|
||||
}
|
||||
|
||||
// only for nereids planner
|
||||
public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr,
|
||||
int exprOrder, List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots,
|
||||
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
|
||||
return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs,
|
||||
targetSlots, type, filterSizeLimits, buildSizeNdv);
|
||||
public static RuntimeFilter fromNereidsRuntimeFilter(
|
||||
org.apache.doris.nereids.trees.plans.physical.RuntimeFilter nereidsFilter,
|
||||
JoinNodeBase node, Expr srcExpr, List<Expr> origTargetExprs,
|
||||
List<Map<TupleId, List<SlotId>>> targetSlots,
|
||||
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
|
||||
return new RuntimeFilter(nereidsFilter.getId(), node, srcExpr, nereidsFilter.getExprOrder(), origTargetExprs,
|
||||
targetSlots, nereidsFilter.getType(), filterSizeLimits, nereidsFilter.getBuildSideNdv(),
|
||||
nereidsFilter.gettMinMaxType());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -224,6 +240,9 @@ public final class RuntimeFilter {
|
||||
tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
|
||||
tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
|
||||
}
|
||||
if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) {
|
||||
tFilter.setMinMaxType(tMinMaxRuntimeFilterType);
|
||||
}
|
||||
tFilter.setOptRemoteRf(optRemoteRf);
|
||||
return tFilter;
|
||||
}
|
||||
@ -256,6 +275,18 @@ public final class RuntimeFilter {
|
||||
return runtimeFilterType;
|
||||
}
|
||||
|
||||
public String getTypeDesc() {
|
||||
String desc = runtimeFilterType.toString().toLowerCase();
|
||||
if (runtimeFilterType == TRuntimeFilterType.MIN_MAX) {
|
||||
if (tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MIN) {
|
||||
desc = "min";
|
||||
} else if (tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MAX) {
|
||||
desc = "max";
|
||||
}
|
||||
}
|
||||
return desc;
|
||||
}
|
||||
|
||||
public void setType(TRuntimeFilterType type) {
|
||||
runtimeFilterType = type;
|
||||
}
|
||||
@ -685,4 +716,25 @@ public final class RuntimeFilter {
|
||||
public long getExpectFilterSizeBytes() {
|
||||
return expectFilterSizeBytes;
|
||||
}
|
||||
|
||||
public String getExplainString(boolean isBuildNode, boolean isBrief, PlanNodeId targetNodeId) {
|
||||
StringBuilder filterStr = new StringBuilder();
|
||||
filterStr.append(getFilterId());
|
||||
if (!isBrief) {
|
||||
filterStr.append("[");
|
||||
filterStr.append(getTypeDesc());
|
||||
filterStr.append("]");
|
||||
if (isBuildNode) {
|
||||
filterStr.append(" <- ");
|
||||
filterStr.append(getSrcExpr().toSql());
|
||||
filterStr.append("(").append(getEstimateNdv()).append("/")
|
||||
.append(getExpectFilterSizeBytes()).append("/")
|
||||
.append(getFilterSizeBytes()).append(")");
|
||||
} else {
|
||||
filterStr.append(" -> ");
|
||||
filterStr.append(getTargetExpr(targetNodeId).toSql());
|
||||
}
|
||||
}
|
||||
return filterStr.toString();
|
||||
}
|
||||
}
|
||||
|
||||
111
regression-test/suites/correctness_p0/test_runtime_filter.groovy
Normal file
111
regression-test/suites/correctness_p0/test_runtime_filter.groovy
Normal file
@ -0,0 +1,111 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
// The cases is copied from https://github.com/trinodb/trino/tree/master
|
||||
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
|
||||
// and modified by Doris.
|
||||
|
||||
suite("test_runtime_filter") {
|
||||
|
||||
sql """ DROP TABLE IF EXISTS rf_tblA """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS rf_tblA (
|
||||
a int
|
||||
)
|
||||
DUPLICATE KEY(a)
|
||||
DISTRIBUTED BY HASH(a) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
)
|
||||
"""
|
||||
|
||||
sql """ DROP TABLE IF EXISTS rf_tblB """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS rf_tblB (
|
||||
b int
|
||||
)
|
||||
DUPLICATE KEY(b)
|
||||
DISTRIBUTED BY HASH(b) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
)
|
||||
"""
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS rf_tblC (
|
||||
c int
|
||||
)
|
||||
DUPLICATE KEY(c)
|
||||
DISTRIBUTED BY HASH(c) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
)
|
||||
"""
|
||||
|
||||
sql "set enable_pipeline_engine=true;"
|
||||
sql "set runtime_filter_type=4"
|
||||
sql "set enable_nereids_planner=true"
|
||||
sql "set enable_fallback_to_original_planner=false"
|
||||
sql "set disable_join_reorder=true"
|
||||
|
||||
explain{
|
||||
sql ("""select * from rf_tblA join rf_tblB on a < b""")
|
||||
contains "runtime filters: RF000[max] -> a"
|
||||
contains "runtime filters: RF000[max] <- b"
|
||||
}
|
||||
|
||||
explain{
|
||||
sql ("""select * from rf_tblA join rf_tblB on a > b""")
|
||||
contains "runtime filters: RF000[min] -> a"
|
||||
contains "runtime filters: RF000[min] <- b"
|
||||
}
|
||||
|
||||
explain{
|
||||
sql ("""select * from rf_tblA join rf_tblB on b < a""")
|
||||
contains "runtime filters: RF000[min] -> a"
|
||||
contains "runtime filters: RF000[min] <- b"
|
||||
}
|
||||
|
||||
explain{
|
||||
sql ("""select * from rf_tblA right outer join rf_tblB on a < b""")
|
||||
contains "runtime filters: RF000[max] <- b"
|
||||
contains "runtime filters: RF000[max] -> a"
|
||||
}
|
||||
|
||||
explain{
|
||||
sql ("""select * from rf_tblA left join rf_tblB on a < b; """)
|
||||
notContains "runtime filters"
|
||||
}
|
||||
|
||||
explain{
|
||||
sql ("""select * from rf_tblA full outer join rf_tblB on a = b; """)
|
||||
notContains "runtime filters"
|
||||
}
|
||||
|
||||
explain{
|
||||
sql ("""
|
||||
with x as (select * from rf_tblA join rf_tblB on a=b)
|
||||
select * from x join rf_tblC on x.b <= rf_tblC.c
|
||||
union
|
||||
select * from x join rf_tblC on x.b <= rf_tblC.c
|
||||
""")
|
||||
contains "runtime filters: RF001[max] -> b"
|
||||
contains "runtime filters: RF002[max] -> b"
|
||||
contains "runtime filters: RF001[max] <- c"
|
||||
contains "runtime filters: RF002[max] <- c"
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user