This reverts commit 3eac53f75d5f3eb05e958403efeb7578ad86e438.
This commit is contained in:
@ -34,6 +34,7 @@ import org.apache.doris.planner.HashJoinNode;
|
||||
import org.apache.doris.planner.HashJoinNode.DistributionMode;
|
||||
import org.apache.doris.planner.JoinNodeBase;
|
||||
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
|
||||
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TRuntimeFilterType;
|
||||
@ -125,11 +126,20 @@ public class RuntimeFilterTranslator {
|
||||
if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
|
||||
targetExpr = new CastExpr(src.getType(), targetExpr);
|
||||
}
|
||||
FilterSizeLimits filterSizeLimits = context.getLimits();
|
||||
if (node instanceof HashJoinNode
|
||||
&& !(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST)
|
||||
&& ConnectContext.get() != null
|
||||
&& ConnectContext.get().getSessionVariable().enablePipelineEngine()
|
||||
&& ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() > 0) {
|
||||
filterSizeLimits = filterSizeLimits.adjustForParallel(
|
||||
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
|
||||
}
|
||||
org.apache.doris.planner.RuntimeFilter origFilter
|
||||
= org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
|
||||
filter.getId(), node, src, filter.getExprOrder(), targetExpr,
|
||||
ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)),
|
||||
filter.getType(), context.getLimits(), filter.getBuildSideNdv());
|
||||
filter.getType(), filterSizeLimits);
|
||||
if (node instanceof HashJoinNode) {
|
||||
origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
|
||||
} else {
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.nereids.processor.post;
|
||||
import org.apache.doris.common.IdGenerator;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.stats.StatsMathUtil;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
@ -28,7 +27,6 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Not;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
|
||||
import org.apache.doris.nereids.trees.plans.AbstractPlan;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
import org.apache.doris.nereids.trees.plans.ObjectId;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
@ -48,7 +46,6 @@ import com.google.common.collect.ImmutableSet;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -114,24 +111,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
continue;
|
||||
}
|
||||
Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
|
||||
long buildSideNdv = -1L;
|
||||
AbstractPlan right = (AbstractPlan) join.right();
|
||||
if (right.getStats() != null) {
|
||||
List<Double> ndvs = join.getHashJoinConjuncts().stream()
|
||||
.map(Expression::getInputSlots)
|
||||
.flatMap(Set::stream)
|
||||
.filter(s -> right.getOutputExprIdSet().contains(s.getExprId()))
|
||||
.map(s -> right.getStats().columnStatistics().get(s))
|
||||
.filter(Objects::nonNull)
|
||||
.map(cs -> cs.ndv)
|
||||
.collect(Collectors.toList());
|
||||
buildSideNdv = (long) StatsMathUtil.jointNdv(ndvs);
|
||||
if (buildSideNdv <= 0 || buildSideNdv > right.getStats().getRowCount()) {
|
||||
buildSideNdv = (long) right.getStats().getRowCount();
|
||||
}
|
||||
}
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
equalTo.right(), olapScanSlot, type, i, join, buildSideNdv);
|
||||
equalTo.right(), olapScanSlot, type, i, join);
|
||||
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
|
||||
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
|
||||
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first, olapScanSlot);
|
||||
@ -169,7 +150,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
for (int i = 0; i < bitmapRFCount; i++) {
|
||||
Expression bitmapRuntimeFilterCondition = bitmapRuntimeFilterConditions.get(i);
|
||||
boolean isNot = bitmapRuntimeFilterCondition instanceof Not;
|
||||
BitmapContains bitmapContains;
|
||||
BitmapContains bitmapContains = null;
|
||||
if (bitmapRuntimeFilterCondition instanceof Not) {
|
||||
bitmapContains = (BitmapContains) bitmapRuntimeFilterCondition.child(0);
|
||||
} else {
|
||||
@ -182,7 +163,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
Slot olapScanSlot = aliasTransferMap.get(targetSlot).second;
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
bitmapContains.child(0), olapScanSlot,
|
||||
bitmapContains.child(1), type, i, join, isNot, -1L);
|
||||
bitmapContains.child(1), type, i, join, isNot);
|
||||
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
|
||||
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
|
||||
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first, olapScanSlot);
|
||||
|
||||
@ -17,11 +17,6 @@
|
||||
|
||||
package org.apache.doris.nereids.stats;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Math util for statistics derivation
|
||||
*/
|
||||
@ -64,21 +59,4 @@ public class StatsMathUtil {
|
||||
return a / nonZeroDivisor(b);
|
||||
}
|
||||
|
||||
/**
|
||||
* compute the multi columns unite ndv
|
||||
*/
|
||||
public static double jointNdv(List<Double> ndvs) {
|
||||
if (CollectionUtils.isEmpty(ndvs)) {
|
||||
return -1;
|
||||
}
|
||||
if (ndvs.stream().anyMatch(n -> n <= 0)) {
|
||||
return -1;
|
||||
}
|
||||
ndvs.sort(Collections.reverseOrder());
|
||||
double multiNdv = 1;
|
||||
for (int i = 0; i < ndvs.size(); i++) {
|
||||
multiNdv = multiNdv * Math.pow(ndvs.get(i), 1 / Math.pow(2, i));
|
||||
}
|
||||
return multiNdv;
|
||||
}
|
||||
}
|
||||
|
||||
@ -32,29 +32,27 @@ public class RuntimeFilter {
|
||||
private final Expression srcSlot;
|
||||
//bitmap filter support target expression like k1+1, abs(k1)
|
||||
//targetExpression is an expression on targetSlot, in which there is only one non-const slot
|
||||
private final Expression targetExpression;
|
||||
private final Slot targetSlot;
|
||||
private Expression targetExpression;
|
||||
private Slot targetSlot;
|
||||
private final int exprOrder;
|
||||
private final AbstractPhysicalJoin builderNode;
|
||||
private AbstractPhysicalJoin builderNode;
|
||||
|
||||
private final boolean bitmapFilterNotIn;
|
||||
|
||||
private final long buildSideNdv;
|
||||
private boolean bitmapFilterNotIn;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, TRuntimeFilterType type,
|
||||
int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) {
|
||||
this(id, src, target, target, type, exprOrder, builderNode, false, buildSideNdv);
|
||||
int exprOrder, AbstractPhysicalJoin builderNode) {
|
||||
this(id, src, target, target, type, exprOrder, builderNode, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, Expression targetExpression,
|
||||
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn,
|
||||
long buildSideNdv) {
|
||||
TRuntimeFilterType type,
|
||||
int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn) {
|
||||
this.id = id;
|
||||
this.srcSlot = src;
|
||||
this.targetSlot = target;
|
||||
@ -63,7 +61,6 @@ public class RuntimeFilter {
|
||||
this.exprOrder = exprOrder;
|
||||
this.builderNode = builderNode;
|
||||
this.bitmapFilterNotIn = bitmapFilterNotIn;
|
||||
this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv;
|
||||
}
|
||||
|
||||
public Expression getSrcExpr() {
|
||||
@ -98,7 +95,4 @@ public class RuntimeFilter {
|
||||
return targetExpression;
|
||||
}
|
||||
|
||||
public long getBuildSideNdv() {
|
||||
return buildSideNdv;
|
||||
}
|
||||
}
|
||||
|
||||
@ -317,14 +317,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
|
||||
return cardinality;
|
||||
}
|
||||
|
||||
public long getCardinalityAfterFilter() {
|
||||
if (cardinalityAfterFilter < 0) {
|
||||
return cardinality;
|
||||
} else {
|
||||
return cardinalityAfterFilter;
|
||||
}
|
||||
}
|
||||
|
||||
public int getNumNodes() {
|
||||
return numNodes;
|
||||
}
|
||||
|
||||
@ -134,8 +134,8 @@ public final class RuntimeFilter {
|
||||
}
|
||||
|
||||
private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder,
|
||||
Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots, TRuntimeFilterType type,
|
||||
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
|
||||
Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots,
|
||||
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
|
||||
this.id = filterId;
|
||||
this.builderNode = filterSrcNode;
|
||||
this.srcExpr = srcExpr;
|
||||
@ -143,7 +143,6 @@ public final class RuntimeFilter {
|
||||
this.origTargetExpr = origTargetExpr;
|
||||
this.targetSlotsByTid = targetSlots;
|
||||
this.runtimeFilterType = type;
|
||||
this.ndvEstimate = buildSizeNdv;
|
||||
computeNdvEstimate();
|
||||
calculateFilterSize(filterSizeLimits);
|
||||
}
|
||||
@ -151,9 +150,8 @@ public final class RuntimeFilter {
|
||||
// only for nereids planner
|
||||
public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr,
|
||||
int exprOrder, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots,
|
||||
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
|
||||
return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr,
|
||||
targetSlots, type, filterSizeLimits, buildSizeNdv);
|
||||
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
|
||||
return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr, targetSlots, type, filterSizeLimits);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -308,7 +306,7 @@ public final class RuntimeFilter {
|
||||
}
|
||||
|
||||
return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
|
||||
targetExpr, targetSlots, type, filterSizeLimits, -1L);
|
||||
targetExpr, targetSlots, type, filterSizeLimits);
|
||||
}
|
||||
|
||||
public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer, Expr joinPredicate,
|
||||
@ -345,7 +343,7 @@ public final class RuntimeFilter {
|
||||
|
||||
RuntimeFilter runtimeFilter =
|
||||
new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, targetExpr, targetSlots,
|
||||
type, filterSizeLimits, -1L);
|
||||
type, filterSizeLimits);
|
||||
runtimeFilter.setBitmapFilterNotIn(((BitmapFilterPredicate) joinPredicate).isNotIn());
|
||||
return runtimeFilter;
|
||||
}
|
||||
@ -517,9 +515,7 @@ public final class RuntimeFilter {
|
||||
}
|
||||
|
||||
public void computeNdvEstimate() {
|
||||
if (ndvEstimate < 0) {
|
||||
ndvEstimate = builderNode.getChild(1).getCardinalityAfterFilter();
|
||||
}
|
||||
ndvEstimate = builderNode.getChild(1).getCardinality();
|
||||
}
|
||||
|
||||
public void extractTargetsPosition() {
|
||||
|
||||
@ -117,6 +117,17 @@ public final class RuntimeFilterGenerator {
|
||||
defaultValue = Math.max(defaultValue, minVal);
|
||||
defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal));
|
||||
}
|
||||
|
||||
private FilterSizeLimits(long maxVal, long minVal, long defaultVal) {
|
||||
this.maxVal = BitUtil.roundUpToPowerOf2(maxVal);
|
||||
this.minVal = BitUtil.roundUpToPowerOf2(minVal);
|
||||
defaultVal = Math.max(defaultVal, this.minVal);
|
||||
this.defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultVal, this.maxVal));
|
||||
}
|
||||
|
||||
public FilterSizeLimits adjustForParallel(int parallel) {
|
||||
return new FilterSizeLimits(maxVal / parallel, minVal / parallel, defaultVal / parallel);
|
||||
}
|
||||
}
|
||||
|
||||
// Contains size limits for bloom filters.
|
||||
|
||||
Reference in New Issue
Block a user