[fix](Nereids) fix rf push down union (#29847)
Current union rf push down only support rf from parent join, but not support ancestor join. The pr fixes this problem on project/distribute node's rf pushing down checking.
This commit is contained in:
@ -657,6 +657,17 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
return window;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check runtime filter push down project/distribute pre-conditions.
|
||||
*/
|
||||
public static boolean checkPushDownPreconditionsForProjectOrDistribute(RuntimeFilterContext ctx, Slot slot) {
|
||||
if (slot == null || !ctx.aliasTransferMapContains(slot)) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check runtime filter push down pre-conditions, such as builder side join type, etc.
|
||||
*/
|
||||
|
||||
@ -134,22 +134,26 @@ public class PhysicalDistribute<CHILD_TYPE extends Plan> extends PhysicalUnary<C
|
||||
// currently, we can ensure children in the two side are corresponding to the equal_to's.
|
||||
// so right maybe an expression and left is a slot
|
||||
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
|
||||
|
||||
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
|
||||
// contains join with denied join type. for example: a left join b on a.id = b.id
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
|
||||
if (probeSlot == null) {
|
||||
return false;
|
||||
}
|
||||
PhysicalRelation scan = ctx.getAliasTransferPair(probeSlot).first;
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
|
||||
return false;
|
||||
if (RuntimeFilterGenerator.checkPushDownPreconditionsForProjectOrDistribute(ctx, probeSlot)) {
|
||||
PhysicalRelation scan = ctx.getAliasTransferPair(probeSlot).first;
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
|
||||
return false;
|
||||
}
|
||||
// TODO: global rf need merge stage which is heavy
|
||||
// add some rule, such as bc only is allowed for
|
||||
// pushing down through distribute, currently always pushing.
|
||||
AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child(0);
|
||||
return childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr,
|
||||
type, buildSideNdv, exprOrder);
|
||||
} else {
|
||||
// if probe slot doesn't exist in aliasTransferMap, then try to pass it to child
|
||||
AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child(0);
|
||||
return childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr,
|
||||
type, buildSideNdv, exprOrder);
|
||||
}
|
||||
// TODO: global rf need merge stage which is heavy
|
||||
// add some rule, such as bc only is allowed for
|
||||
// pushing down through distribute, currently always pushing.
|
||||
AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child(0);
|
||||
return childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr,
|
||||
type, buildSideNdv, exprOrder);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -241,20 +241,6 @@ public class PhysicalHashJoin<
|
||||
srcExpr, prob, type, buildSideNdv, exprOrder);
|
||||
}
|
||||
|
||||
// currently, we can ensure children in the two side are corresponding to the equal_to's.
|
||||
// so right maybe an expression and left is a slot
|
||||
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
|
||||
|
||||
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
|
||||
// contains join with denied join type. for example: a left join b on a.id = b.id
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
|
||||
return false;
|
||||
}
|
||||
PhysicalRelation scan = ctx.getAliasTransferPair(probeSlot).first;
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return pushedDown;
|
||||
}
|
||||
|
||||
|
||||
@ -163,46 +163,51 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
|
||||
// currently, we can ensure children in the two side are corresponding to the equal_to's.
|
||||
// so right maybe an expression and left is a slot
|
||||
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
|
||||
|
||||
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
|
||||
// contains join with denied join type. for example: a left join b on a.id = b.id
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, probeSlot)) {
|
||||
if (probeSlot == null) {
|
||||
return false;
|
||||
}
|
||||
PhysicalRelation scan = ctx.getAliasTransferPair(probeSlot).first;
|
||||
Preconditions.checkState(scan != null, "scan is null");
|
||||
if (scan instanceof PhysicalCTEConsumer) {
|
||||
// update the probeExpr
|
||||
int projIndex = -1;
|
||||
for (int i = 0; i < getProjects().size(); i++) {
|
||||
NamedExpression expr = getProjects().get(i);
|
||||
if (expr.getName().equals(probeSlot.getName())) {
|
||||
projIndex = i;
|
||||
break;
|
||||
|
||||
if (RuntimeFilterGenerator.checkPushDownPreconditionsForProjectOrDistribute(ctx, probeSlot)) {
|
||||
PhysicalRelation scan = ctx.getAliasTransferPair(probeSlot).first;
|
||||
Preconditions.checkState(scan != null, "scan is null");
|
||||
if (scan instanceof PhysicalCTEConsumer) {
|
||||
// update the probeExpr
|
||||
int projIndex = -1;
|
||||
for (int i = 0; i < getProjects().size(); i++) {
|
||||
NamedExpression expr = getProjects().get(i);
|
||||
if (expr.getName().equals(probeSlot.getName())) {
|
||||
projIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (projIndex < 0 || projIndex >= getProjects().size()) {
|
||||
// the pushed down path can't contain the probe expr
|
||||
return false;
|
||||
}
|
||||
NamedExpression newProbeExpr = this.getProjects().get(projIndex);
|
||||
if (newProbeExpr instanceof Alias) {
|
||||
newProbeExpr = (NamedExpression) newProbeExpr.child(0);
|
||||
}
|
||||
Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr);
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, newProbeSlot)) {
|
||||
return false;
|
||||
}
|
||||
scan = ctx.getAliasTransferPair(newProbeSlot).first;
|
||||
probeExpr = newProbeExpr;
|
||||
}
|
||||
if (projIndex < 0 || projIndex >= getProjects().size()) {
|
||||
// the pushed down path can't contain the probe expr
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
|
||||
return false;
|
||||
}
|
||||
NamedExpression newProbeExpr = this.getProjects().get(projIndex);
|
||||
if (newProbeExpr instanceof Alias) {
|
||||
newProbeExpr = (NamedExpression) newProbeExpr.child(0);
|
||||
}
|
||||
Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr);
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, newProbeSlot)) {
|
||||
return false;
|
||||
}
|
||||
scan = ctx.getAliasTransferPair(newProbeSlot).first;
|
||||
probeExpr = newProbeExpr;
|
||||
}
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(this, scan)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
|
||||
return child.pushDownRuntimeFilter(context, generator, builderNode,
|
||||
src, probeExpr, type, buildSideNdv, exprOrder);
|
||||
AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
|
||||
return child.pushDownRuntimeFilter(context, generator, builderNode,
|
||||
src, probeExpr, type, buildSideNdv, exprOrder);
|
||||
} else {
|
||||
// if probe slot doesn't exist in aliasTransferMap, then try to pass it to child
|
||||
AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
|
||||
return child.pushDownRuntimeFilter(context, generator, builderNode,
|
||||
src, probeExpr, type, buildSideNdv, exprOrder);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user