[fix](nereids) support cte rf pushdown for exttable (#24806)

Currently, runtime filter (rf) pushdown inside a CTE doesn't support external tables; this PR lifts that restriction.
In addition, since BE doesn't currently support rf on schema scans (for example, tables under information_schema), this PR also checks for such relations and prevents rf from being generated on them.
This commit is contained in:
xzj7019
2023-09-23 23:00:05 +08:00
committed by GitHub
parent 9cd9e195d8
commit ece2e3cd70
11 changed files with 86 additions and 59 deletions

View File

@ -42,7 +42,6 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
@ -84,7 +83,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
);
private static final Set<Class<? extends PhysicalPlan>> SPJ_PLAN = ImmutableSet.of(
PhysicalOlapScan.class,
PhysicalRelation.class,
PhysicalProject.class,
PhysicalFilter.class,
PhysicalDistribute.class,
@ -103,7 +102,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
* second step: if encounter project, collect the association of its child and it for pushing down through
* the project node.
* plan translation:
* third step: generate nereids runtime filter target at olap scan node fragment.
* third step: generate nereids runtime filter target at scan node fragment.
* fourth step: generate legacy runtime filter target and runtime filter at hash join node fragment.
* NOTICE: bottom-up travel the plan tree!!!
*/
@ -180,17 +179,17 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
TRuntimeFilterType type = TRuntimeFilterType.BITMAP;
Set<Slot> targetSlots = bitmapContains.child(1).getInputSlots();
for (Slot targetSlot : targetSlots) {
if (!checkPushDownPreconditions(join, ctx, targetSlot)) {
if (!checkPushDownPreconditionsForJoin(join, ctx, targetSlot)) {
continue;
}
Slot olapScanSlot = aliasTransferMap.get(targetSlot).second;
Slot scanSlot = aliasTransferMap.get(targetSlot).second;
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
bitmapContains.child(0), ImmutableList.of(olapScanSlot),
bitmapContains.child(0), ImmutableList.of(scanSlot),
ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.addJoinToTargetMap(join, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getRelationId(),
olapScanSlot);
scanSlot);
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
}
@ -467,9 +466,9 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!checkPushDownPreconditions(join, ctx, unwrappedSlot)) {
if (!checkPushDownPreconditionsForJoin(join, ctx, unwrappedSlot)) {
return;
}
Slot cteSlot = aliasTransferMap.get(unwrappedSlot).second;
@ -492,13 +491,16 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
} else if (!checkCanPushDownIntoBasicTable(project)) {
return;
} else {
Map<Slot, PhysicalOlapScan> pushDownBasicTableInfos = getPushDownBasicTablesInfos(project,
Map<Slot, PhysicalRelation> pushDownBasicTableInfos = getPushDownBasicTablesInfos(project,
(SlotReference) targetExpr, aliasTransferMap);
if (!pushDownBasicTableInfos.isEmpty()) {
List<Slot> targetList = new ArrayList<>();
for (Map.Entry<Slot, PhysicalOlapScan> entry : pushDownBasicTableInfos.entrySet()) {
for (Map.Entry<Slot, PhysicalRelation> entry : pushDownBasicTableInfos.entrySet()) {
Slot targetSlot = entry.getKey();
PhysicalOlapScan scan = entry.getValue();
PhysicalRelation scan = entry.getValue();
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(project, scan)) {
continue;
}
targetList.add(targetSlot);
ctx.addJoinToTargetMap(join, targetSlot.getExprId());
ctx.setTargetsOnScanNode(scan.getRelationId(), targetSlot);
@ -539,7 +541,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
/**
* Check runtime filter push down pre-conditions, such as builder side join type, etc.
*/
public static boolean checkPushDownPreconditions(AbstractPhysicalJoin physicalJoin,
public static boolean checkPushDownPreconditionsForJoin(AbstractPhysicalJoin physicalJoin,
RuntimeFilterContext ctx, Slot slot) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
if (slot == null || !aliasTransferMap.containsKey(slot)) {
@ -551,6 +553,21 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
}
/**
 * Relation-side gate for runtime filter push down.
 *
 * <p>A relation qualifies only when it reports that it accepts runtime filters
 * (via {@code canPushDownRuntimeFilter()}) and it is among the relations that
 * {@code getAllScanInfo} collects from {@code root}.
 *
 * @param root plan node from which candidate relations are collected
 * @param relation push down candidate; must not be null
 * @return true if both conditions above hold, false otherwise
 * @throws IllegalStateException if {@code relation} is null
 */
public static boolean checkPushDownPreconditionsForRelation(PhysicalPlan root, PhysicalRelation relation) {
    Preconditions.checkState(relation != null, "relation is null");
    if (relation.canPushDownRuntimeFilter()) {
        // the relation accepts runtime filters; it must also be covered by the given plan root
        Set<PhysicalRelation> coveredRelations = new HashSet<>();
        RuntimeFilterGenerator.getAllScanInfo(root, coveredRelations);
        return coveredRelations.contains(relation);
    }
    // the relation itself does not support runtime filter push down
    return false;
}
private boolean checkCanPushDownIntoBasicTable(PhysicalPlan root) {
// only support spj currently
List<PhysicalPlan> plans = Lists.newArrayList();
@ -558,13 +575,13 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
return plans.stream().allMatch(p -> SPJ_PLAN.stream().anyMatch(c -> c.isInstance(p)));
}
private Map<Slot, PhysicalOlapScan> getPushDownBasicTablesInfos(PhysicalPlan root, SlotReference slot,
private Map<Slot, PhysicalRelation> getPushDownBasicTablesInfos(PhysicalPlan root, SlotReference slot,
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap) {
Map<Slot, PhysicalOlapScan> basicTableInfos = new HashMap<>();
Map<Slot, PhysicalRelation> basicTableInfos = new HashMap<>();
Set<PhysicalHashJoin> joins = new HashSet<>();
ExprId exprId = slot.getExprId();
if (aliasTransferMap.get(slot) != null && aliasTransferMap.get(slot).first instanceof PhysicalOlapScan) {
basicTableInfos.put(slot, (PhysicalOlapScan) aliasTransferMap.get(slot).first);
if (aliasTransferMap.get(slot) != null) {
basicTableInfos.put(slot, aliasTransferMap.get(slot).first);
}
// try to find propagation condition from join
getAllJoinInfo(root, joins);
@ -575,12 +592,12 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
SlotReference leftSlot = (SlotReference) ((EqualTo) equalTo).left();
SlotReference rightSlot = (SlotReference) ((EqualTo) equalTo).right();
if (leftSlot.getExprId() == exprId && aliasTransferMap.get(rightSlot) != null) {
PhysicalOlapScan rightTable = (PhysicalOlapScan) aliasTransferMap.get(rightSlot).first;
PhysicalRelation rightTable = aliasTransferMap.get(rightSlot).first;
if (rightTable != null) {
basicTableInfos.put(rightSlot, rightTable);
}
} else if (rightSlot.getExprId() == exprId && aliasTransferMap.get(leftSlot) != null) {
PhysicalOlapScan leftTable = (PhysicalOlapScan) aliasTransferMap.get(leftSlot).first;
PhysicalRelation leftTable = aliasTransferMap.get(leftSlot).first;
if (leftTable != null) {
basicTableInfos.put(leftSlot, leftTable);
}
@ -601,12 +618,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
}
/**
 * Checks whether {@code relation} is one of the relations collected from {@code root}.
 *
 * <p>Fills a fresh set through {@code getAllScanInfo(root, ...)} and tests membership.
 *
 * @param root plan node from which relations are collected
 * @param relation relation to look for
 * @return true if the collected set contains {@code relation}
 */
public static boolean isCoveredByPlanNode(PhysicalPlan root, PhysicalRelation relation) {
Set<PhysicalRelation> relations = new HashSet<>();
RuntimeFilterGenerator.getAllScanInfo(root, relations);
return relations.contains(relation);
}
/**
* Get all relation node from current root plan.
*/

View File

@ -88,9 +88,9 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
// so right maybe an expression and left is a slot
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
return false;
}
@ -104,12 +104,11 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
return true;
}
Slot olapScanSlot = aliasTransferMap.get(probeSlot).second;
Slot scanSlot = aliasTransferMap.get(probeSlot).second;
PhysicalRelation scan = aliasTransferMap.get(probeSlot).first;
if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
return false;
}
Preconditions.checkState(olapScanSlot != null && scan != null);
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
@ -119,15 +118,16 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
}
org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter =
ctx.getRuntimeFilterBySrcAndType(src, type, builderNode);
Preconditions.checkState(scanSlot != null, "scan slot is null");
if (filter != null) {
filter.addTargetSlot(olapScanSlot);
filter.addTargetExpressoin(olapScanSlot);
filter.addTargetSlot(scanSlot);
filter.addTargetExpressoin(scanSlot);
} else {
filter = new RuntimeFilter(generator.getNextId(),
src, ImmutableList.of(olapScanSlot), type, exprOrder, builderNode, buildSideNdv);
ctx.addJoinToTargetMap(builderNode, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first.getRelationId(), olapScanSlot);
src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first.getRelationId(), scanSlot);
ctx.setRuntimeFilterIdentityToFilter(src, type, builderNode, filter);
}
return true;

View File

@ -150,4 +150,9 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
return super.pushDownRuntimeFilter(context, generator, builderNode,
src, probeExpr, type, buildSideNdv, exprOrder);
}
/**
 * Reports that runtime filters may be pushed down to this CTE consumer.
 *
 * @return always true
 */
@Override
public boolean canPushDownRuntimeFilter() {
return true;
}
}

View File

@ -123,4 +123,8 @@ public abstract class PhysicalCatalogRelation extends PhysicalRelation implement
return Utils.qualifiedName(qualifier, table.getName());
}
/**
 * Reports that runtime filters may be pushed down to this catalog relation.
 * Subclasses that cannot accept runtime filters override this to return false.
 *
 * @return always true at this level
 */
@Override
public boolean canPushDownRuntimeFilter() {
return true;
}
}

View File

@ -139,22 +139,21 @@ public class PhysicalDistribute<CHILD_TYPE extends Plan> extends PhysicalUnary<C
// so right maybe an expression and left is a slot
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
return false;
}
PhysicalRelation scan = aliasTransferMap.get(probeSlot).first;
if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
return false;
}
// TODO: global rf need merge stage which is heavy
// add some rule, such as bc only is allowed for
// pushing down through distribute, currently always pushing.
AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child(0);
boolean pushedDown = childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr,
return childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr,
type, buildSideNdv, exprOrder);
return pushedDown;
}
@Override

View File

@ -303,20 +303,19 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan> extends PhysicalUnar
// so right maybe an expression and left is a slot
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
return false;
}
PhysicalRelation scan = aliasTransferMap.get(probeSlot).first;
if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
return false;
}
AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
boolean pushedDown = child.pushDownRuntimeFilter(context, generator, builderNode,
return child.pushDownRuntimeFilter(context, generator, builderNode,
src, probeExpr, type, buildSideNdv, exprOrder);
return pushedDown;
}
@Override

View File

@ -256,19 +256,18 @@ public class PhysicalHashJoin<
pushedDown |= rightNode.pushDownRuntimeFilter(context, generator, builderNode,
srcExpr, prob, type, buildSideNdv, exprOrder);
}
// currently, we can ensure children in the two side are corresponding to the equal_to's.
// so right maybe an expression and left is a slot
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
return false;
}
Slot olapScanSlot = aliasTransferMap.get(probeSlot).second;
PhysicalRelation scan = aliasTransferMap.get(probeSlot).first;
Preconditions.checkState(olapScanSlot != null && scan != null);
if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
return false;
}

View File

@ -167,12 +167,13 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
// so right maybe an expression and left is a slot
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
return false;
}
PhysicalRelation scan = aliasTransferMap.get(probeSlot).first;
Preconditions.checkState(scan != null, "scan is null");
if (scan instanceof PhysicalCTEConsumer) {
// update the probeExpr
int projIndex = -1;
@ -192,20 +193,19 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
newProbeExpr = (NamedExpression) newProbeExpr.child(0);
}
Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr);
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, newProbeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, newProbeSlot)) {
return false;
}
scan = aliasTransferMap.get(newProbeSlot).first;
probeExpr = newProbeExpr;
}
if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
return false;
}
AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
boolean pushedDown = child.pushDownRuntimeFilter(context, generator, builderNode,
return child.pushDownRuntimeFilter(context, generator, builderNode,
src, probeExpr, type, buildSideNdv, exprOrder);
return pushedDown;
}
@Override

View File

@ -83,4 +83,8 @@ public abstract class PhysicalRelation extends PhysicalLeaf implements Relation
public RelationId getRelationId() {
return relationId;
}
/**
 * Tells whether a runtime filter may be pushed down to this relation.
 * The default is to refuse; relation types that support it override this method.
 *
 * @return false by default
 */
public boolean canPushDownRuntimeFilter() {
return false;
}
}

View File

@ -82,4 +82,10 @@ public class PhysicalSchemaScan extends PhysicalCatalogRelation {
public String toString() {
return Utils.toSqlString("PhysicalSchemaScan");
}
/**
 * Reports that runtime filters must not be pushed down to a schema scan.
 *
 * @return always false
 */
@Override
public boolean canPushDownRuntimeFilter() {
// BE does not currently support runtime filters on schema scans
return false;
}
}

View File

@ -178,7 +178,7 @@ public abstract class PhysicalSetOperation extends AbstractPhysicalPlan implemen
newProbeExpr = (NamedExpression) newProbeExpr.child(0);
}
Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr);
if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, newProbeSlot)) {
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, newProbeSlot)) {
continue;
}
pushedDown |= child.pushDownRuntimeFilter(context, generator, builderNode, src,