[Fix](multi catalog, nereids)Fix FileQueryScanNode couldn't filter partition in nereids planner bug (#19564)
The Nereids planner adds conjuncts to the ScanNode after calling finalize. This may cause an external table scan node to fail to filter out useless partitions, because external tables do their partition pruning in the finalize method. This PR fixes that bug: in the rewrite stage, the conjuncts are passed to the LogicalFileScan object and eventually passed to the ScanNode when it is created, so that the ScanNode can use the conjuncts during finalize. Why not do the partition pruning in LogicalFileScan, as LogicalOlapScan does? Because the Iceberg API doesn't have the partition concept; it just accepts a list of conjuncts, so it's easier to pass the conjuncts to the ScanNode (Hive, Iceberg, Hudi...) and do the partition pruning there.
This commit is contained in:
@ -672,6 +672,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
|
||||
}
|
||||
Preconditions.checkNotNull(scanNode);
|
||||
fileScan.getConjuncts().stream()
|
||||
.map(e -> ExpressionTranslator.translate(e, context))
|
||||
.forEach(scanNode::addConjunct);
|
||||
TableName tableName = new TableName(null, "", "");
|
||||
TableRef ref = new TableRef(tableName, null, null);
|
||||
BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
|
||||
|
||||
@ -61,6 +61,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.MergeProjects;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.MergeSetOperations;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.NormalizeAggregate;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.NormalizeSort;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.PruneFileScanPartition;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
|
||||
@ -244,7 +245,8 @@ public class NereidsRewriter extends BatchRewriteJob {
|
||||
// generate one PhysicalLimit if current distribution is gather or two
|
||||
// PhysicalLimits with gather exchange
|
||||
new SplitLimit(),
|
||||
new PruneOlapScanPartition()
|
||||
new PruneOlapScanPartition(),
|
||||
new PruneFileScanPartition()
|
||||
)
|
||||
),
|
||||
|
||||
|
||||
@ -188,6 +188,7 @@ public enum RuleType {
|
||||
MATERIALIZED_INDEX_PROJECT_FILTER_SCAN(RuleTypeClass.REWRITE),
|
||||
MATERIALIZED_INDEX_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE),
|
||||
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
|
||||
FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
|
||||
OLAP_SCAN_TABLET_PRUNE(RuleTypeClass.REWRITE),
|
||||
PUSH_AGGREGATE_TO_OLAP_SCAN(RuleTypeClass.REWRITE),
|
||||
EXTRACT_SINGLE_TABLE_EXPRESSION_FROM_DISJUNCTION(RuleTypeClass.REWRITE),
|
||||
|
||||
@ -37,7 +37,8 @@ public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFact
|
||||
fileScan.getQualifier(),
|
||||
DistributionSpecAny.INSTANCE,
|
||||
Optional.empty(),
|
||||
fileScan.getLogicalProperties())
|
||||
fileScan.getLogicalProperties(),
|
||||
fileScan.getConjuncts())
|
||||
).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,43 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.rules.rewrite.logical;
|
||||
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
|
||||
/**
|
||||
* Used to prune partition of file scan. For different external tables, there is no unified partition prune method.
|
||||
* For example, Hive is using hive meta store api to get partitions. Iceberg is using Iceberg api to get FileScanTask,
|
||||
* which doesn't return a partition list. So, here we simply pass the conjuncts to LogicalFileScan, so that different
|
||||
* external file ScanNode could do the partition filter by themselves.
|
||||
*/
|
||||
public class PruneFileScanPartition extends OneRewriteRuleFactory {
|
||||
|
||||
// Builds the rewrite rule registered as RuleType.FILE_SCAN_PARTITION_PRUNE.
// The pattern matches a LogicalFilter whose direct child is a LogicalFileScan.
@Override
|
||||
public Rule build() {
|
||||
return logicalFilter(logicalFileScan()).thenApply(ctx -> {
|
||||
LogicalFilter<LogicalFileScan> filter = ctx.root;
|
||||
LogicalFileScan scan = filter.child();
|
||||
// Copy the filter's conjuncts onto the scan; no partitions are pruned by this rule itself.
LogicalFileScan rewrittenScan = scan.withConjuncts(filter.getConjuncts());
|
||||
// The filter is rebuilt above the rewritten scan with the same conjuncts, so it is not removed.
return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan);
|
||||
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
|
||||
}
|
||||
}
|
||||
@ -20,33 +20,41 @@ package org.apache.doris.nereids.trees.plans.logical;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.ObjectId;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Logical file scan for external catalog.
|
||||
*/
|
||||
public class LogicalFileScan extends LogicalRelation {
|
||||
|
||||
private final Set<Expression> conjuncts;
|
||||
|
||||
/**
|
||||
* Constructor for LogicalFileScan.
|
||||
*/
|
||||
public LogicalFileScan(ObjectId id, ExternalTable table, List<String> qualifier,
|
||||
Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties) {
|
||||
Optional<LogicalProperties> logicalProperties,
|
||||
Set<Expression> conjuncts) {
|
||||
super(id, PlanType.LOGICAL_FILE_SCAN, table, qualifier,
|
||||
groupExpression, logicalProperties);
|
||||
this.conjuncts = conjuncts;
|
||||
}
|
||||
|
||||
public LogicalFileScan(ObjectId id, ExternalTable table, List<String> qualifier) {
|
||||
this(id, table, qualifier, Optional.empty(), Optional.empty());
|
||||
this(id, table, qualifier, Optional.empty(), Optional.empty(), Sets.newHashSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -63,20 +71,34 @@ public class LogicalFileScan extends LogicalRelation {
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return super.equals(o) && Objects.equals(conjuncts, ((LogicalFileScan) o).conjuncts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression,
|
||||
Optional.of(getLogicalProperties()));
|
||||
Optional.of(getLogicalProperties()), conjuncts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalFileScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
|
||||
return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression,
|
||||
logicalProperties);
|
||||
logicalProperties, conjuncts);
|
||||
}
|
||||
|
||||
public LogicalFileScan withConjuncts(Set<Expression> conjuncts) {
|
||||
return new LogicalFileScan(id, (ExternalTable) table, qualifier, groupExpression,
|
||||
Optional.of(getLogicalProperties()), conjuncts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitLogicalFileScan(this, context);
|
||||
}
|
||||
|
||||
public Set<Expression> getConjuncts() {
|
||||
return this.conjuncts;
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.DistributionSpec;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.ObjectId;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
@ -31,6 +32,7 @@ import org.apache.doris.statistics.Statistics;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Physical file scan for external catalog.
|
||||
@ -39,16 +41,18 @@ public class PhysicalFileScan extends PhysicalRelation {
|
||||
|
||||
private final ExternalTable table;
|
||||
private final DistributionSpec distributionSpec;
|
||||
private final Set<Expression> conjuncts;
|
||||
|
||||
/**
|
||||
* Constructor for PhysicalFileScan.
|
||||
*/
|
||||
public PhysicalFileScan(ObjectId id, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties) {
|
||||
LogicalProperties logicalProperties, Set<Expression> conjuncts) {
|
||||
super(id, PlanType.PHYSICAL_FILE_SCAN, qualifier, groupExpression, logicalProperties);
|
||||
this.table = table;
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -57,11 +61,12 @@ public class PhysicalFileScan extends PhysicalRelation {
|
||||
public PhysicalFileScan(ObjectId id, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
|
||||
Statistics statistics) {
|
||||
Statistics statistics, Set<Expression> conjuncts) {
|
||||
super(id, PlanType.PHYSICAL_FILE_SCAN, qualifier, groupExpression, logicalProperties,
|
||||
physicalProperties, statistics);
|
||||
this.table = table;
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -97,12 +102,14 @@ public class PhysicalFileScan extends PhysicalRelation {
|
||||
|
||||
@Override
|
||||
public PhysicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, getLogicalProperties());
|
||||
return new PhysicalFileScan(id, table, qualifier, distributionSpec,
|
||||
groupExpression, getLogicalProperties(), conjuncts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalFileScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
|
||||
return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, logicalProperties.get());
|
||||
return new PhysicalFileScan(id, table, qualifier, distributionSpec,
|
||||
groupExpression, logicalProperties.get(), conjuncts);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -114,6 +121,10 @@ public class PhysicalFileScan extends PhysicalRelation {
|
||||
public PhysicalFileScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
|
||||
Statistics statistics) {
|
||||
return new PhysicalFileScan(id, table, qualifier, distributionSpec, groupExpression, getLogicalProperties(),
|
||||
physicalProperties, statistics);
|
||||
physicalProperties, statistics, conjuncts);
|
||||
}
|
||||
|
||||
public Set<Expression> getConjuncts() {
|
||||
return this.conjuncts;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user