From be47a270133021b726163de276754413313a48ee Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Wed, 17 May 2023 21:39:59 +0800 Subject: [PATCH] [Fix](multi catalog, nereids)Fix FileQueryScanNode couldn't filter partition in nereids planner bug (#19564) Nereids planner add conjuncts to ScanNode after call finalize, this may cause external table scan node fail to filter useless partition, because external table do the partition prune in the finalize method. This pr is to fix this bug. In the rewrite stage, pass the conjuncts to LogicalFileScan object, and eventually pass to ScanNode while creating it. So that the ScanNode could use the conjuncts while doing finalize. Why not doing the partition prune in the LogicalFileScan like LogicalOlapScan doing? Because Iceberg api doesn't have the partition concept, it just accept a list of Conjuncts, so it's easier to pass the conjuncts to ScanNode (Hive, Icegerg, Hudi...) and doing the partition prune in there. --- .../translator/PhysicalPlanTranslator.java | 3 ++ .../nereids/jobs/batch/NereidsRewriter.java | 4 +- .../apache/doris/nereids/rules/RuleType.java | 1 + .../LogicalFileScanToPhysicalFileScan.java | 3 +- .../logical/PruneFileScanPartition.java | 43 +++++++++++++++++++ .../trees/plans/logical/LogicalFileScan.java | 30 +++++++++++-- .../plans/physical/PhysicalFileScan.java | 21 ++++++--- 7 files changed, 94 insertions(+), 11 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PruneFileScanPartition.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 87a3cc4318..1b151131db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -672,6 +672,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor ExpressionTranslator.translate(e, context)) + .forEach(scanNode::addConjunct); TableName tableName = new TableName(null, "", ""); TableRef ref = new TableRef(tableName, null, null); BaseTableRef tableRef = new BaseTableRef(ref, table, tableName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java index 3302c6ff06..b09de54a85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java @@ -61,6 +61,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.MergeProjects; import org.apache.doris.nereids.rules.rewrite.logical.MergeSetOperations; import org.apache.doris.nereids.rules.rewrite.logical.NormalizeAggregate; import org.apache.doris.nereids.rules.rewrite.logical.NormalizeSort; +import org.apache.doris.nereids.rules.rewrite.logical.PruneFileScanPartition; import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition; import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet; import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin; @@ -244,7 +245,8 @@ public class NereidsRewriter extends BatchRewriteJob { // generate one PhysicalLimit if current distribution is gather or two // PhysicalLimits with gather exchange new SplitLimit(), - new PruneOlapScanPartition() + new PruneOlapScanPartition(), + new PruneFileScanPartition() ) ), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index c910eb4381..7037537f48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -188,6 +188,7 @@ public enum RuleType { MATERIALIZED_INDEX_PROJECT_FILTER_SCAN(RuleTypeClass.REWRITE), MATERIALIZED_INDEX_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE), OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE), + FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE), OLAP_SCAN_TABLET_PRUNE(RuleTypeClass.REWRITE), PUSH_AGGREGATE_TO_OLAP_SCAN(RuleTypeClass.REWRITE), EXTRACT_SINGLE_TABLE_EXPRESSION_FROM_DISJUNCTION(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index e277c6b7b1..08e9e9e503 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -37,7 +37,8 @@ public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFact fileScan.getQualifier(), DistributionSpecAny.INSTANCE, Optional.empty(), - fileScan.getLogicalProperties()) + fileScan.getLogicalProperties(), + fileScan.getConjuncts()) ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PruneFileScanPartition.java new file mode 100644 index 0000000000..9df916e1b5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PruneFileScanPartition.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite.logical; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; + +/** + * Used to prune partition of file scan. For different external tables, there is no unified partition prune method. + * For example, Hive is using hive meta store api to get partitions. Iceberg is using Iceberg api to get FileScanTask, + * which doesn't return a partition list. So, here we simply pass the conjucts to LogicalFileScan, so that different + * external file ScanNode could do the partition filter by themselves. + */ +public class PruneFileScanPartition extends OneRewriteRuleFactory { + + @Override + public Rule build() { + return logicalFilter(logicalFileScan()).thenApply(ctx -> { + LogicalFilter filter = ctx.root; + LogicalFileScan scan = filter.child(); + LogicalFileScan rewrittenScan = scan.withConjuncts(filter.getConjuncts()); + return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan); + }).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index c8eb7b1da0..8d53a42cc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -20,33 +20,41 @@ package org.apache.doris.nereids.trees.plans.logical; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.ObjectId; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.Set; /** * Logical file scan for external catalog. */ public class LogicalFileScan extends LogicalRelation { + private final Set conjuncts; + /** * Constructor for LogicalFileScan. */ public LogicalFileScan(ObjectId id, ExternalTable table, List qualifier, Optional groupExpression, - Optional logicalProperties) { + Optional logicalProperties, + Set conjuncts) { super(id, PlanType.LOGICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties); + this.conjuncts = conjuncts; } public LogicalFileScan(ObjectId id, ExternalTable table, List qualifier) { - this(id, table, qualifier, Optional.empty(), Optional.empty()); + this(id, table, qualifier, Optional.empty(), Optional.empty(), Sets.newHashSet()); } @Override @@ -63,20 +71,34 @@ public class LogicalFileScan extends LogicalRelation { ); } + @Override + public boolean equals(Object o) { + return super.equals(o) && Objects.equals(conjuncts, ((LogicalFileScan) o).conjuncts); + } + @Override public LogicalFileScan withGroupExpression(Optional groupExpression) { return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression, - Optional.of(getLogicalProperties())); + Optional.of(getLogicalProperties()), conjuncts); } @Override public LogicalFileScan withLogicalProperties(Optional logicalProperties) { return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression, - logicalProperties); + logicalProperties, conjuncts); + } + + public LogicalFileScan withConjuncts(Set conjuncts) { + return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression, + Optional.of(getLogicalProperties()), conjuncts); } @Override public R accept(PlanVisitor visitor, C context) { return visitor.visitLogicalFileScan(this, context); } + + public Set getConjuncts() { + return this.conjuncts; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java index 536ef5d7da..5d05c011ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java @@ -22,6 +22,7 @@ import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DistributionSpec; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.ObjectId; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -31,6 +32,7 @@ import org.apache.doris.statistics.Statistics; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; /** * Physical file scan for external catalog. @@ -39,16 +41,18 @@ public class PhysicalFileScan extends PhysicalRelation { private final ExternalTable table; private final DistributionSpec distributionSpec; + private final Set conjuncts; /** * Constructor for PhysicalFileScan. */ public PhysicalFileScan(ObjectId id, ExternalTable table, List qualifier, DistributionSpec distributionSpec, Optional groupExpression, - LogicalProperties logicalProperties) { + LogicalProperties logicalProperties, Set conjuncts) { super(id, PlanType.PHYSICAL_FILE_SCAN, qualifier, groupExpression, logicalProperties); this.table = table; this.distributionSpec = distributionSpec; + this.conjuncts = conjuncts; } /** @@ -57,11 +61,12 @@ public class PhysicalFileScan extends PhysicalRelation { public PhysicalFileScan(ObjectId id, ExternalTable table, List qualifier, DistributionSpec distributionSpec, Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, - Statistics statistics) { + Statistics statistics, Set conjuncts) { super(id, PlanType.PHYSICAL_FILE_SCAN, qualifier, groupExpression, logicalProperties, physicalProperties, statistics); this.table = table; this.distributionSpec = distributionSpec; + this.conjuncts = conjuncts; } @Override @@ -97,12 +102,14 @@ public class PhysicalFileScan extends PhysicalRelation { @Override public PhysicalFileScan withGroupExpression(Optional groupExpression) { - return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, getLogicalProperties()); + return new PhysicalFileScan(id, table, qualifier, distributionSpec, + groupExpression, getLogicalProperties(), conjuncts); } @Override public PhysicalFileScan withLogicalProperties(Optional logicalProperties) { - return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, logicalProperties.get()); + return new PhysicalFileScan(id, table, qualifier, distributionSpec, + groupExpression, logicalProperties.get(), conjuncts); } @Override @@ -114,6 +121,10 @@ public class PhysicalFileScan extends PhysicalRelation { public PhysicalFileScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, getLogicalProperties(), - physicalProperties, statistics); + physicalProperties, statistics, conjuncts); + } + + public Set getConjuncts() { + return this.conjuncts; } }