diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 992eb35f4b..7a400d9363 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -531,8 +531,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getId()).forEach( + expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context) + ) + ); scanNode.finalizeForNereids(); context.getScanNodes().add(scanNode); PlanFragment planFragment = @@ -556,6 +560,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor runtimeFilterGenerator.getTargetOnScanNode(fileScan.getId()).forEach( + expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, fileScanNode, context) + ) + ); Utils.execWithUncheckedException(fileScanNode::finalizeForNerieds); // Create PlanFragment DataPartition dataPartition = DataPartition.RANDOM; @@ -571,6 +580,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor runtimeFilterGenerator.getTargetOnScanNode(tvfRelation.getId()).forEach( + expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context) + ) + ); scanNode.finalizeForNereids(); context.addScanNode(scanNode); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 92b03f4c20..78b2dbdf25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -29,8 +29,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.HashJoinNode.DistributionMode; -import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget; +import org.apache.doris.planner.ScanNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -63,7 +63,7 @@ public class RuntimeFilterTranslator { * @param node olap scan node * @param ctx plan translator context */ - public void translateRuntimeFilterTarget(Slot slot, OlapScanNode node, PlanTranslatorContext ctx) { + public void translateRuntimeFilterTarget(Slot slot, ScanNode node, PlanTranslatorContext ctx) { context.getExprIdToOlapScanNodeSlotRef().put(slot.getExprId(), ctx.findSlotRef(slot.getExprId())); context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node); } @@ -94,7 +94,7 @@ public class RuntimeFilterTranslator { ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)), filter.getType(), context.getLimits()); origFilter.setIsBroadcast(node.getDistributionMode() == DistributionMode.BROADCAST); - OlapScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr()); + ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr()); origFilter.addTarget(new RuntimeFilterTarget( scanNode, target, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index 3882a92f15..e00775f520 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -27,9 +27,9 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; -import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits; import org.apache.doris.planner.RuntimeFilterId; +import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.SessionVariable; import com.google.common.annotations.VisibleForTesting; @@ -71,7 +71,7 @@ public class RuntimeFilterContext { // you can see disjoint set data structure to learn the processing detailed. private final Map> aliasTransferMap = Maps.newHashMap(); - private final Map scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap(); + private final Map scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap(); private final Set effectiveSrcNodes = Sets.newHashSet(); private final SessionVariable sessionVariable; @@ -128,7 +128,7 @@ public class RuntimeFilterContext { return aliasTransferMap; } - public Map getScanNodeOfLegacyRuntimeFilterTarget() { + public Map getScanNodeOfLegacyRuntimeFilterTarget() { return scanNodeOfLegacyRuntimeFilterTarget; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 57e10c50f9..7c8658fc37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -29,9 +29,9 @@ import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.nereids.util.ExpressionUtils; @@ -136,7 +136,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } @Override - public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext context) { + public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) { // add all the slots in map. RuntimeFilterContext ctx = context.getRuntimeFilterContext(); scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan.getId(), slot))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java index 48abc974a1..7d0613260e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java @@ -30,9 +30,9 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.StatsDeriveResult; @@ -129,19 +129,19 @@ public class RuntimeFilterPruner extends PlanPostProcessor { } @Override - public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan olapScan, CascadesContext context) { + public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) { RuntimeFilterContext rfCtx = context.getRuntimeFilterContext(); - List slots = rfCtx.getTargetOnOlapScanNodeMap().get(olapScan.getId()); + List slots = rfCtx.getTargetOnOlapScanNodeMap().get(scan.getId()); if (slots != null) { for (Slot slot : slots) { //if this scan node is the target of any effective RF, it is effective source if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) { - context.getRuntimeFilterContext().addEffectiveSrcNode(olapScan); + context.getRuntimeFilterContext().addEffectiveSrcNode(scan); break; } } } - return olapScan; + return scan; } // *******************************