[feature](nereids) runtime filter prune when column stats are not available (#27099)

1. prune rf when column stats are not available
2. print rf in "explain shape plan", both join side and apply side
3. add regression case to check plan shape/rf/rf prune for tpch_sf1000 (stats are not available)
This commit is contained in:
minghong
2023-11-18 19:32:33 +08:00
committed by GitHub
parent b42828cf69
commit 329abc3452
761 changed files with 7853 additions and 14983 deletions

View File

@ -517,7 +517,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
context.addScanNode(scanNode);
ScanNode finalScanNode = scanNode;
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(fileScan.getRelationId()).forEach(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
@ -560,7 +560,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
Utils.execWithUncheckedException(esScanNode::init);
context.addScanNode(esScanNode);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(esScan.getRelationId()).forEach(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(esScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
)
);
@ -583,7 +583,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
Utils.execWithUncheckedException(jdbcScanNode::init);
context.addScanNode(jdbcScanNode);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(jdbcScan.getRelationId()).forEach(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(jdbcScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
)
);
@ -652,7 +652,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// TODO: process translate runtime filter in one place
// use real plan node to present rf apply and rf generator
context.getRuntimeTranslator().ifPresent(
runtimeFilterTranslator -> runtimeFilterTranslator.getTargetOnScanNode(olapScan.getRelationId())
runtimeFilterTranslator -> runtimeFilterTranslator.getContext().getTargetListByScan(olapScan)
.forEach(expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(
expr, olapScanNode, context)
)
@ -738,7 +738,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
SchemaScanNode finalScanNode = scanNode;
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getRelationId())
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(schemaScan)
.forEach(expr -> runtimeFilterGenerator
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
@ -760,7 +760,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
ScanNode scanNode = catalogFunction.getScanNode(tvfRelation.translatePlanNodeId(), tupleDescriptor);
Utils.execWithUncheckedException(scanNode::init);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(tvfRelation.getRelationId())
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(tvfRelation)
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
@ -977,7 +977,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.getTargetOnScanNode(cteConsumer.getRelationId()).forEach(
runtimeFilterTranslator.getContext().getTargetListByScan(cteConsumer).forEach(
expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context)));
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode);

View File

@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.CTEScanNode;
@ -47,7 +46,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -72,10 +70,6 @@ public class RuntimeFilterTranslator {
return context;
}
public List<Slot> getTargetOnScanNode(RelationId id) {
return context.getTargetOnOlapScanNodeMap().getOrDefault(id, Collections.emptyList());
}
/**
* translate runtime filter target.
* @param node olap scan node

View File

@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
@ -107,7 +106,7 @@ public class RuntimeFilterContext {
private final Map<Plan, List<ExprId>> joinToTargetExprId = Maps.newHashMap();
// olap scan node that contains target of a runtime filter.
private final Map<RelationId, List<Slot>> targetOnOlapScanNodeMap = Maps.newHashMap();
private final Map<PhysicalRelation, List<Slot>> targetOnOlapScanNodeMap = Maps.newHashMap();
private final List<org.apache.doris.planner.RuntimeFilter> legacyFilters = Lists.newArrayList();
@ -197,6 +196,14 @@ public class RuntimeFilterContext {
RuntimeFilter rf = iter.next();
if (rf.getBuilderNode().equals(builderNode)) {
builderNode.getRuntimeFilters().remove(rf);
for (Slot target : rf.getTargetSlots()) {
if (target.getExprId().equals(targetId)) {
Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(target);
if (pair != null) {
pair.first.removeAppliedRuntimeFilter(rf);
}
}
}
iter.remove();
prunedRF.add(rf);
}
@ -204,8 +211,8 @@ public class RuntimeFilterContext {
}
}
public void setTargetsOnScanNode(RelationId id, Slot slot) {
this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> Lists.newArrayList()).add(slot);
public void setTargetsOnScanNode(PhysicalRelation relation, Slot slot) {
this.targetOnOlapScanNodeMap.computeIfAbsent(relation, k -> Lists.newArrayList()).add(slot);
}
public RuntimeFilter getRuntimeFilterBySrcAndType(Expression src,
@ -247,8 +254,8 @@ public class RuntimeFilterContext {
return targetExprIdToFilter;
}
public Map<RelationId, List<Slot>> getTargetOnOlapScanNodeMap() {
return targetOnOlapScanNodeMap;
public List<Slot> getTargetListByScan(PhysicalRelation scan) {
return targetOnOlapScanNodeMap.getOrDefault(scan, Collections.emptyList());
}
public List<org.apache.doris.planner.RuntimeFilter> getLegacyFilters() {

View File

@ -183,12 +183,14 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
continue;
}
Slot scanSlot = aliasTransferMap.get(targetSlot).second;
PhysicalRelation scan = aliasTransferMap.get(targetSlot).first;
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
bitmapContains.child(0), ImmutableList.of(scanSlot),
ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L);
scan.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(join, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getRelationId(),
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first,
scanSlot);
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
@ -266,9 +268,10 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot),
TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv,
getMinMaxType(compare));
scan.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
ctx.setTargetsOnScanNode(scan, olapScanSlot);
}
}
}
@ -600,6 +603,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
(SlotReference) targetExpr, aliasTransferMap);
if (!pushDownBasicTableInfos.isEmpty()) {
List<Slot> targetList = new ArrayList<>();
List<PhysicalRelation> targetNodes = new ArrayList<>();
for (Map.Entry<Slot, PhysicalRelation> entry : pushDownBasicTableInfos.entrySet()) {
Slot targetSlot = entry.getKey();
PhysicalRelation scan = entry.getValue();
@ -607,13 +611,15 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
continue;
}
targetList.add(targetSlot);
targetNodes.add(scan);
ctx.addJoinToTargetMap(join, targetSlot.getExprId());
ctx.setTargetsOnScanNode(scan.getRelationId(), targetSlot);
ctx.setTargetsOnScanNode(scan, targetSlot);
}
// build multi-target runtime filter
// since always on different join, set the expr_order as 0
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), targetList, type, 0, join, buildSideNdv);
targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter));
for (Slot slot : targetList) {
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
}

View File

@ -25,12 +25,9 @@ import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.statistics.ColumnStatistic;
@ -54,12 +51,14 @@ import java.util.Set;
public class RuntimeFilterPruner extends PlanPostProcessor {
@Override
public PhysicalQuickSort visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, CascadesContext context) {
sort.child().accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(sort.child())) {
context.getRuntimeFilterContext().addEffectiveSrcNode(sort);
public Plan visit(Plan plan, CascadesContext context) {
if (!plan.children().isEmpty()) {
plan.child(0).accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(plan.child(0))) {
context.getRuntimeFilterContext().addEffectiveSrcNode(plan);
}
}
return sort;
return plan;
}
@Override
@ -97,16 +96,10 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
}
}
join.left().accept(this, context);
return join;
}
@Override
public PhysicalProject visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) {
project.child().accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(project.child())) {
context.getRuntimeFilterContext().addEffectiveSrcNode(project);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.left())) {
context.getRuntimeFilterContext().addEffectiveSrcNode(join);
}
return project;
return join;
}
@Override
@ -119,31 +112,17 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
@Override
public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, CascadesContext context) {
RuntimeFilterContext rfCtx = context.getRuntimeFilterContext();
List<Slot> slots = rfCtx.getTargetOnOlapScanNodeMap().get(scan.getRelationId());
if (slots != null) {
for (Slot slot : slots) {
//if this scan node is the target of any effective RF, it is effective source
if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
context.getRuntimeFilterContext().addEffectiveSrcNode(scan);
break;
}
List<Slot> slots = rfCtx.getTargetListByScan(scan);
for (Slot slot : slots) {
//if this scan node is the target of any effective RF, it is effective source
if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
context.getRuntimeFilterContext().addEffectiveSrcNode(scan);
break;
}
}
return scan;
}
// *******************************
// Physical enforcer
// *******************************
public PhysicalDistribute visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
CascadesContext context) {
distribute.child().accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(distribute.child())) {
context.getRuntimeFilterContext().addEffectiveSrcNode(distribute);
}
return distribute;
}
public PhysicalAssertNumRows visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
CascadesContext context) {
assertNumRows.child().accept(this, context);
@ -185,10 +164,11 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
return false;
}
}
//without column statistics, we can not judge if the rf is effective.
if (probeColumnStat.isUnKnown || buildColumnStat.isUnKnown) {
return true;
return false;
}
double buildNdvInProbeRange = buildColumnStat.ndvIntersection(probeColumnStat);
return probeColumnStat.ndv > buildNdvInProbeRange * (1 + ColumnStatistic.STATS_ERROR);
}

View File

@ -640,8 +640,7 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
throw new RuntimeException(String.format("Invalid slot: %s", slotReference.getExprId()));
}
ColumnStatistic cache;
if (ConnectContext.get() == null || !ConnectContext.get().getSessionVariable().enableStats
|| !FeConstants.enableInternalSchemaDb
if (!FeConstants.enableInternalSchemaDb
|| shouldIgnoreThisCol) {
cache = ColumnStatistic.UNKNOWN;
} else {
@ -663,7 +662,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
cache = columnStatisticBuilder.build();
}
}
columnStatisticMap.put(slotReference, cache);
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableStats) {
columnStatisticMap.put(slotReference, cache);
} else {
columnStatisticMap.put(slotReference, ColumnStatistic.UNKNOWN);
}
}
return new Statistics(rowCount, columnStatisticMap);
}

View File

@ -40,7 +40,9 @@ import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
@ -49,8 +51,8 @@ import javax.annotation.Nullable;
* Abstract class for all concrete physical plan.
*/
public abstract class AbstractPhysicalPlan extends AbstractPlan implements PhysicalPlan, Explainable {
protected final PhysicalProperties physicalProperties;
private final List<RuntimeFilter> appliedRuntimeFilters = Lists.newArrayList();
public AbstractPhysicalPlan(PlanType type, LogicalProperties logicalProperties, Plan... children) {
this(type, Optional.empty(), logicalProperties, children);
@ -121,13 +123,14 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
Preconditions.checkState(scanSlot != null, "scan slot is null");
if (filter != null) {
filter.addTargetSlot(scanSlot);
filter.addTargetExpressoin(scanSlot);
filter.addTargetExpression(scanSlot);
} else {
filter = new RuntimeFilter(generator.getNextId(),
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv);
this.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first.getRelationId(), scanSlot);
ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first, scanSlot);
ctx.setRuntimeFilterIdentityToFilter(src, type, builderNode, filter);
}
return true;
@ -144,4 +147,16 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
newPlan.setMutableState(MutableState.KEY_GROUP, from.getGroupIdAsString());
return newPlan;
}
public List<RuntimeFilter> getAppliedRuntimeFilters() {
return appliedRuntimeFilters;
}
public void addAppliedRuntimeFilter(RuntimeFilter filter) {
appliedRuntimeFilters.add(filter);
}
public void removeAppliedRuntimeFilter(RuntimeFilter filter) {
appliedRuntimeFilters.remove(filter);
}
}

View File

@ -127,4 +127,17 @@ public abstract class PhysicalCatalogRelation extends PhysicalRelation implement
public boolean canPushDownRuntimeFilter() {
return true;
}
@Override
public String shapeInfo() {
StringBuilder shapeBuilder = new StringBuilder();
shapeBuilder.append(this.getClass().getSimpleName())
.append("[").append(table.getName()).append("]");
if (!getAppliedRuntimeFilters().isEmpty()) {
shapeBuilder.append(" apply RFs:");
getAppliedRuntimeFilters()
.stream().forEach(rf -> shapeBuilder.append(" RF").append(rf.getId().asInt()));
}
return shapeBuilder.toString();
}
}

View File

@ -255,7 +255,11 @@ public class PhysicalHashJoin<
builder.append(hashJoinConjuncts.stream().map(conjunct -> conjunct.shapeInfo())
.sorted().collect(Collectors.joining(" and ", " hashCondition=(", ")")));
builder.append(otherJoinConjuncts.stream().map(cond -> cond.shapeInfo())
.sorted().collect(Collectors.joining(" and ", "otherCondition=(", ")")));
.sorted().collect(Collectors.joining(" and ", " otherCondition=(", ")")));
if (!runtimeFilters.isEmpty()) {
builder.append(" build RFs:").append(runtimeFilters.stream()
.map(rf -> rf.shapeInfo()).collect(Collectors.joining(";")));
}
return builder.toString();
}

View File

@ -180,11 +180,6 @@ public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapSca
getLogicalProperties(), physicalProperties, statistics, tableSample);
}
@Override
public String shapeInfo() {
return this.getClass().getSimpleName() + "[" + table.getName() + "]";
}
@Override
public JSONObject toJson() {
JSONObject olapScan = super.toJson();

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;
/**
* runtime filter
@ -128,7 +129,11 @@ public class RuntimeFilter {
targetSlots.add(target);
}
public void addTargetExpressoin(Expression targetExpr) {
public List<Slot> getTargetSlots() {
return targetSlots;
}
public void addTargetExpression(Expression targetExpr) {
targetExpressions.add(targetExpr);
}
@ -142,4 +147,17 @@ public class RuntimeFilter {
.append(")");
return sb.toString();
}
/**
* print rf in explain shape plan
* @return brief version of toString()
*/
public String shapeInfo() {
StringBuilder sb = new StringBuilder();
sb.append("RF").append(id.asInt())
.append(" ").append(getSrcExpr().toSql()).append("->[").append(
targetSlots.stream().map(slot -> slot.getName()).collect(Collectors.joining(",")))
.append("]");
return sb.toString();
}
}

View File

@ -267,6 +267,10 @@ public class ColumnStatistic {
if (isUnKnown) {
return 1;
}
if (Double.isInfinite(minValue) || Double.isInfinite(maxValue)
|| Double.isInfinite(other.minValue) || Double.isInfinite(other.maxValue)) {
return 1;
}
if (maxValue == minValue) {
if (minValue <= other.maxValue && minValue >= other.minValue) {
return 1;