[fix](nereids) fix runtime filter expr order (#21480)
Current runtime filter pushing down to cte internal, we construct the runtime filter expr_order with incremental number, which is not correct. For cte internal rf pushing down, the join node will be always different, the expr_order should be fixed as 0 without incrementation, otherwise, it will lead the checking for expr_order and probe_expr_size illegal or wrong query result. This pr will revert 2827bc1 temporarily, it will break the cte rf pushing down plan pattern.
This commit is contained in:
@ -557,7 +557,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
continue;
|
||||
}
|
||||
Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
|
||||
int exprOrder = 0;
|
||||
for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
|
||||
EqualTo equalTo = innerEntry.getKey();
|
||||
PhysicalHashJoin join = innerEntry.getValue();
|
||||
@ -568,15 +567,14 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
}
|
||||
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
|
||||
equalTo, join.child(0).getOutputSet()));
|
||||
doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, exprOrder++, cteProducer);
|
||||
doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, cteProducer);
|
||||
}
|
||||
ctx.getPushedDownCTE().add(cteProducer.getCteId());
|
||||
}
|
||||
}
|
||||
|
||||
private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
|
||||
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder,
|
||||
PhysicalCTEProducer cteProducer) {
|
||||
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, PhysicalCTEProducer cteProducer) {
|
||||
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
|
||||
PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
|
||||
Slot unwrappedSlot = checkTargetChild(equalTo.left());
|
||||
@ -617,8 +615,9 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
ctx.setTargetsOnScanNode(scan.getId(), targetSlot);
|
||||
}
|
||||
// build multi-target runtime filter
|
||||
// since always on different join, set the expr_order as 0
|
||||
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
|
||||
equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
|
||||
equalTo.right(), targetList, type, 0, join, buildSideNdv);
|
||||
for (Slot slot : targetList) {
|
||||
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
|
||||
}
|
||||
@ -662,12 +661,12 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
if (equalTo instanceof EqualTo) {
|
||||
SlotReference leftSlot = (SlotReference) ((EqualTo) equalTo).left();
|
||||
SlotReference rightSlot = (SlotReference) ((EqualTo) equalTo).right();
|
||||
if (leftSlot.getExprId() == exprId) {
|
||||
if (leftSlot.getExprId() == exprId && aliasTransferMap.get(rightSlot) != null) {
|
||||
PhysicalOlapScan rightTable = (PhysicalOlapScan) aliasTransferMap.get(rightSlot).first;
|
||||
if (rightTable != null) {
|
||||
basicTableInfos.put(rightSlot, rightTable);
|
||||
}
|
||||
} else if (rightSlot.getExprId() == exprId) {
|
||||
} else if (rightSlot.getExprId() == exprId && aliasTransferMap.get(leftSlot) != null) {
|
||||
PhysicalOlapScan leftTable = (PhysicalOlapScan) aliasTransferMap.get(leftSlot).first;
|
||||
if (leftTable != null) {
|
||||
basicTableInfos.put(leftSlot, leftTable);
|
||||
|
||||
@ -111,8 +111,8 @@ public class Statistics {
|
||||
ColumnStatistic columnStatistic = entry.getValue();
|
||||
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(columnStatistic);
|
||||
columnStatisticBuilder.setNdv(Math.min(columnStatistic.ndv, rowCount));
|
||||
double numNulls = Math.min(columnStatistic.numNulls, rowCount - columnStatistic.ndv);
|
||||
columnStatisticBuilder.setNumNulls(numNulls);
|
||||
double nullFactor = (rowCount - columnStatistic.numNulls) / rowCount;
|
||||
columnStatisticBuilder.setNumNulls(nullFactor * rowCount);
|
||||
columnStatisticBuilder.setCount(rowCount);
|
||||
statistics.addColumnStats(entry.getKey(), columnStatisticBuilder.build());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user