[fix](nereids) runtime filter push-down-cte column mapping bug #34875
This commit is contained in:
@ -533,54 +533,57 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
if (!checkProbeSlot(ctx, unwrappedSlot)) {
|
||||
return false;
|
||||
}
|
||||
Slot cteSlot = ctx.getAliasTransferPair(unwrappedSlot).second;
|
||||
Slot consumerOutputSlot = ctx.getAliasTransferPair(unwrappedSlot).second;
|
||||
PhysicalRelation cteNode = ctx.getAliasTransferPair(unwrappedSlot).first;
|
||||
long buildSideNdv = rf.getBuildSideNdv();
|
||||
if (cteNode instanceof PhysicalCTEConsumer && inputPlanNode instanceof PhysicalProject) {
|
||||
PhysicalProject<Plan> project = (PhysicalProject<Plan>) inputPlanNode;
|
||||
NamedExpression targetExpr = null;
|
||||
for (NamedExpression ne : project.getProjects()) {
|
||||
if (cteSlot.getName().equals(ne.getName())) {
|
||||
targetExpr = ne;
|
||||
break;
|
||||
}
|
||||
if (!(cteNode instanceof PhysicalCTEConsumer) || !(inputPlanNode instanceof PhysicalProject)) {
|
||||
return false;
|
||||
}
|
||||
Slot cteSlot = ((PhysicalCTEConsumer) cteNode).getProducerSlot(consumerOutputSlot);
|
||||
|
||||
PhysicalProject<Plan> project = (PhysicalProject<Plan>) inputPlanNode;
|
||||
NamedExpression targetExpr = null;
|
||||
for (NamedExpression ne : project.getProjects()) {
|
||||
if (cteSlot.getExprId().equals(ne.getExprId())) {
|
||||
targetExpr = ne;
|
||||
break;
|
||||
}
|
||||
Preconditions.checkState(targetExpr != null,
|
||||
"cannot find runtime filter cte.target: "
|
||||
+ cteSlot + "in project " + project.toString());
|
||||
if (targetExpr instanceof SlotReference && checkCanPushDownIntoBasicTable(project)) {
|
||||
Map<Slot, PhysicalRelation> pushDownBasicTableInfos = getPushDownBasicTablesInfos(project,
|
||||
(SlotReference) targetExpr, ctx);
|
||||
if (!pushDownBasicTableInfos.isEmpty()) {
|
||||
List<Slot> targetList = new ArrayList<>();
|
||||
List<Expression> targetExpressions = new ArrayList<>();
|
||||
List<PhysicalRelation> targetNodes = new ArrayList<>();
|
||||
for (Map.Entry<Slot, PhysicalRelation> entry : pushDownBasicTableInfos.entrySet()) {
|
||||
Slot targetSlot = entry.getKey();
|
||||
PhysicalRelation scan = entry.getValue();
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(project, scan)) {
|
||||
continue;
|
||||
}
|
||||
targetList.add(targetSlot);
|
||||
targetExpressions.add(targetSlot);
|
||||
targetNodes.add(scan);
|
||||
ctx.addJoinToTargetMap(rf.getBuilderNode(), targetSlot.getExprId());
|
||||
ctx.setTargetsOnScanNode(scan, targetSlot);
|
||||
}
|
||||
Preconditions.checkState(targetExpr != null,
|
||||
"cannot find runtime filter cte.target: "
|
||||
+ cteSlot + "in project " + project.toString());
|
||||
if (targetExpr instanceof SlotReference && checkCanPushDownIntoBasicTable(project)) {
|
||||
Map<Slot, PhysicalRelation> pushDownBasicTableInfos = getPushDownBasicTablesInfos(project,
|
||||
(SlotReference) targetExpr, ctx);
|
||||
if (!pushDownBasicTableInfos.isEmpty()) {
|
||||
List<Slot> targetList = new ArrayList<>();
|
||||
List<Expression> targetExpressions = new ArrayList<>();
|
||||
List<PhysicalRelation> targetNodes = new ArrayList<>();
|
||||
for (Map.Entry<Slot, PhysicalRelation> entry : pushDownBasicTableInfos.entrySet()) {
|
||||
Slot targetSlot = entry.getKey();
|
||||
PhysicalRelation scan = entry.getValue();
|
||||
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(project, scan)) {
|
||||
continue;
|
||||
}
|
||||
if (targetList.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
rf.getSrcExpr(), targetList, targetExpressions, rf.getType(), rf.getExprOrder(),
|
||||
rf.getBuilderNode(), buildSideNdv, rf.isBloomFilterSizeCalculatedByNdv(),
|
||||
rf.gettMinMaxType(), cteNode);
|
||||
targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter));
|
||||
for (Slot slot : targetList) {
|
||||
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
|
||||
}
|
||||
ctx.setRuntimeFilterIdentityToFilter(rf.getSrcExpr(), rf.getType(), rf.getBuilderNode(), filter);
|
||||
return true;
|
||||
targetList.add(targetSlot);
|
||||
targetExpressions.add(targetSlot);
|
||||
targetNodes.add(scan);
|
||||
ctx.addJoinToTargetMap(rf.getBuilderNode(), targetSlot.getExprId());
|
||||
ctx.setTargetsOnScanNode(scan, targetSlot);
|
||||
}
|
||||
if (targetList.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
rf.getSrcExpr(), targetList, targetExpressions, rf.getType(), rf.getExprOrder(),
|
||||
rf.getBuilderNode(), buildSideNdv, rf.isBloomFilterSizeCalculatedByNdv(),
|
||||
rf.gettMinMaxType(), cteNode);
|
||||
targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter));
|
||||
for (Slot slot : targetList) {
|
||||
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
|
||||
}
|
||||
ctx.setRuntimeFilterIdentityToFilter(rf.getSrcExpr(), rf.getType(), rf.getBuilderNode(), filter);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
||||
Reference in New Issue
Block a user