[Fix](multi catalog)(nereids)Enable runtime filter for external table (#16855)

Enable runtime filter for external table.
This commit is contained in:
Jibing-Li
2023-02-21 10:35:58 +08:00
committed by GitHub
parent 4522aeb74a
commit 44fed0e99b
5 changed files with 28 additions and 14 deletions

View File

@ -531,8 +531,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
.build();
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, table, context);
tupleDescriptor.setTable(table);
SchemaScanNode scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getId()).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
scanNode.finalizeForNereids();
context.getScanNodes().add(scanNode);
PlanFragment planFragment =
@ -556,6 +560,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
Utils.execWithUncheckedException(fileScanNode::init);
context.addScanNode(fileScanNode);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(fileScan.getId()).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, fileScanNode, context)
)
);
Utils.execWithUncheckedException(fileScanNode::finalizeForNerieds);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
@ -571,6 +580,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction();
ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(tvfRelation.getId()).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
scanNode.finalizeForNereids();
context.addScanNode(scanNode);

View File

@ -29,8 +29,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -63,7 +63,7 @@ public class RuntimeFilterTranslator {
* @param node olap scan node
* @param ctx plan translator context
*/
public void translateRuntimeFilterTarget(Slot slot, OlapScanNode node, PlanTranslatorContext ctx) {
public void translateRuntimeFilterTarget(Slot slot, ScanNode node, PlanTranslatorContext ctx) {
context.getExprIdToOlapScanNodeSlotRef().put(slot.getExprId(), ctx.findSlotRef(slot.getExprId()));
context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node);
}
@ -94,7 +94,7 @@ public class RuntimeFilterTranslator {
ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)),
filter.getType(), context.getLimits());
origFilter.setIsBroadcast(node.getDistributionMode() == DistributionMode.BROADCAST);
OlapScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr());
ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr());
origFilter.addTarget(new RuntimeFilterTarget(
scanNode,
target,

View File

@ -27,9 +27,9 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import com.google.common.annotations.VisibleForTesting;
@ -71,7 +71,7 @@ public class RuntimeFilterContext {
// you can see disjoint set data structure to learn the processing detailed.
private final Map<NamedExpression, Pair<RelationId, Slot>> aliasTransferMap = Maps.newHashMap();
private final Map<Slot, OlapScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap();
private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap();
private final Set<Plan> effectiveSrcNodes = Sets.newHashSet();
private final SessionVariable sessionVariable;
@ -128,7 +128,7 @@ public class RuntimeFilterContext {
return aliasTransferMap;
}
public Map<Slot, OlapScanNode> getScanNodeOfLegacyRuntimeFilterTarget() {
public Map<Slot, ScanNode> getScanNodeOfLegacyRuntimeFilterTarget() {
return scanNodeOfLegacyRuntimeFilterTarget;
}

View File

@ -29,9 +29,9 @@ import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
@ -136,7 +136,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
@Override
public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, CascadesContext context) {
public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) {
// add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, Pair.of(scan.getId(), slot)));

View File

@ -30,9 +30,9 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.StatsDeriveResult;
@ -129,19 +129,19 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
}
@Override
public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan olapScan, CascadesContext context) {
public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext context) {
RuntimeFilterContext rfCtx = context.getRuntimeFilterContext();
List<Slot> slots = rfCtx.getTargetOnOlapScanNodeMap().get(olapScan.getId());
List<Slot> slots = rfCtx.getTargetOnOlapScanNodeMap().get(scan.getId());
if (slots != null) {
for (Slot slot : slots) {
//if this scan node is the target of any effective RF, it is effective source
if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
context.getRuntimeFilterContext().addEffectiveSrcNode(olapScan);
context.getRuntimeFilterContext().addEffectiveSrcNode(scan);
break;
}
}
}
return olapScan;
return scan;
}
// *******************************