[fix](hive) fix partition prune issue and some external table test cases (#24338)

1. Fix hive partition prune bug, introduced from #23845, will fail `test_hive_default_partition` test case.
2. Fix `test_local_tvf.groovy` test case, the path of local tvf should be relative path.
3. Fix `test_external_catalog_hive` test case, the `partitions` is now reserve keywords
4. Support `local` tvf in Nereids, but fix related issue like:

```
Caused by: java.lang.NullPointerException
        at org.apache.doris.nereids.stats.ExpressionEstimation.castMinMax(ExpressionEstimation.java:171) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.stats.ExpressionEstimation.visitCast(ExpressionEstimation.java:167) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.stats.ExpressionEstimation.visitCast(ExpressionEstimation.java:109) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.trees.expressions.Cast.accept(Cast.java:55) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.stats.ExpressionEstimation.visitAlias(ExpressionEstimation.java:394) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.stats.ExpressionEstimation.visitAlias(ExpressionEstimation.java:109) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.trees.expressions.Alias.accept(Alias.java:145) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.stats.ExpressionEstimation.estimate(ExpressionEstimation.java:119) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.nereids.stats.StatsCalculator.lambda$computeProject$7(StatsCalculator.java:785) ~[doris-fe.jar:1.2-SNAPSHOT]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_341]
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) ~[?:1.8.0_341]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_341]
```
This commit is contained in:
Mingyu Chen
2023-09-15 20:57:04 +08:00
committed by GitHub
parent 29824ccd9d
commit b407f275c8
15 changed files with 175 additions and 136 deletions

View File

@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
@ -33,7 +34,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
public final ImmutableList<TableValuedFunc> tableValuedFunctions = ImmutableList.of(
tableValued(Numbers.class, "numbers"),
tableValued(Hdfs.class, "hdfs"),
tableValued(S3.class, "s3")
tableValued(S3.class, "s3"),
tableValued(Local.class, "local")
);
public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
@ -41,3 +43,4 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
// Note: Do not add any code here!
private BuiltinTableValuedFunctions() {}
}

View File

@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
@ -55,31 +56,35 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalFilter(logicalFileScan()).when(p -> p.child().getSelectedPartitions() == null).thenApply(ctx -> {
LogicalFilter<LogicalFileScan> filter = ctx.root;
LogicalFileScan scan = filter.child();
ExternalTable tbl = scan.getTable();
SelectedPartitions selectedPartitions = new SelectedPartitions(0, Maps.newHashMap(), false);
return logicalFilter(logicalFileScan()).whenNot(p -> p.child().getSelectedPartitions().isPruned)
.thenApply(ctx -> {
LogicalFilter<LogicalFileScan> filter = ctx.root;
LogicalFileScan scan = filter.child();
ExternalTable tbl = scan.getTable();
// TODO(cmy): support other external table
if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) {
HMSExternalTable hiveTbl = (HMSExternalTable) tbl;
selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext);
}
SelectedPartitions selectedPartitions;
// TODO(cmy): support other external table
if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) {
HMSExternalTable hiveTbl = (HMSExternalTable) tbl;
selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext);
} else {
// set isPruned so that it won't go pass the partition prune again
selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true);
}
LogicalFileScan rewrittenScan = scan.withConjuncts(filter.getConjuncts())
.withSelectedPartitions(selectedPartitions);
return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan);
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
LogicalFileScan rewrittenScan = scan.withConjuncts(filter.getConjuncts())
.withSelectedPartitions(selectedPartitions);
return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan);
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
}
private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl,
LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, CascadesContext ctx) {
Map<Long, PartitionItem> selectedPartitionItems = Maps.newHashMap();
if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) {
// non partitioned table, return null.
// and it will be handled in HiveScanNode
return new SelectedPartitions(1, Maps.newHashMap(), false);
// non partitioned table, return NOT_PRUNED.
// non partition table will be handled in HiveScanNode.
return SelectedPartitions.NOT_PRUNED;
}
Map<String, Slot> scanOutput = scan.getOutput()
.stream()
@ -104,3 +109,4 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory {
return new SelectedPartitions(idToPartitionItem.size(), selectedPartitionItems, true);
}
}

View File

@ -95,6 +95,7 @@ import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import org.apache.commons.collections.CollectionUtils;
import java.time.Instant;
@ -163,7 +164,7 @@ public class ExpressionEstimation extends ExpressionVisitor<ColumnStatistic, Sta
return stats;
}
ColumnStatistic childColStats = cast.child().accept(this, context);
Preconditions.checkNotNull(childColStats, "childColStats is null");
return castMinMax(childColStats, cast.getDataType());
}
@ -842,3 +843,4 @@ public class ExpressionEstimation extends ExpressionVisitor<ColumnStatistic, Sta
return dateDiff(1, secondsDiff, context);
}
}

View File

@ -20,14 +20,11 @@ package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.tablefunction.HdfsTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -53,11 +50,6 @@ public class Hdfs extends TableValuedFunction {
}
}
@Override
public Statistics computeStats(List<Slot> slots) {
return new Statistics(0, new HashMap<>());
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitHdfs(this, context);

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
/**
* local
*/
public class Local extends TableValuedFunction {
public Local(Properties properties) {
super("local", properties);
}
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new LocalTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build LocalTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitLocal(this, context);
}
}

View File

@ -20,15 +20,11 @@ package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** s3 */
@ -39,7 +35,7 @@ public class S3 extends TableValuedFunction {
@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, (List) getArgumentsTypes());
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}
@Override
@ -53,11 +49,6 @@ public class S3 extends TableValuedFunction {
}
}
@Override
public Statistics computeStats(List<Slot> slots) {
return new Statistics(0, new HashMap<>());
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitS3(this, context);

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.BoundFunction;
@ -29,13 +30,16 @@ import org.apache.doris.nereids.trees.expressions.functions.CustomSignature;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -59,7 +63,20 @@ public abstract class TableValuedFunction extends BoundFunction implements Unary
protected abstract TableValuedFunctionIf toCatalogFunction();
public abstract Statistics computeStats(List<Slot> slots);
/**
* For most of tvf, eg, s3/local/hdfs, the column stats is unknown.
* The derived tvf can override this method to compute the column stats.
*
* @param slots the slots of the tvf
* @return the column stats of the tvf
*/
public Statistics computeStats(List<Slot> slots) {
Map<Expression, ColumnStatistic> columnToStatistics = Maps.newHashMap();
for (Slot slot : slots) {
columnToStatistics.put(slot, ColumnStatistic.UNKNOWN);
}
return new Statistics(0, columnToStatistics);
}
public Properties getTVFProperties() {
return (Properties) child(0);

View File

@ -18,6 +18,7 @@
package org.apache.doris.nereids.trees.expressions.visitor;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
@ -34,6 +35,10 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(hdfs, context);
}
default R visitLocal(Local local, C context) {
return visitTableValuedFunction(local, context);
}
default R visitS3(S3 s3, C context) {
return visitTableValuedFunction(s3, context);
}

View File

@ -29,7 +29,7 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import lombok.Getter;
@ -64,7 +64,7 @@ public class LogicalFileScan extends LogicalCatalogRelation {
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
Sets.newHashSet(), SelectedPartitions.EMPTY);
Sets.newHashSet(), SelectedPartitions.NOT_PRUNED);
}
@Override
@ -121,39 +121,51 @@ public class LogicalFileScan extends LogicalCatalogRelation {
* Mainly for hive table partition pruning.
*/
public static class SelectedPartitions {
public static SelectedPartitions EMPTY = new SelectedPartitions(0, Maps.newHashMap(), false);
// NOT_PRUNED means the Nereids planner does not handle the partition pruning.
// This can be treated as the initial value of SelectedPartitions.
// Or used to indicate that the partition pruning is not processed.
public static SelectedPartitions NOT_PRUNED = new SelectedPartitions(0, ImmutableMap.of(), false);
/**
* total partition number
*/
public long totalPartitionNum = 0;
public final long totalPartitionNum;
/**
* partition id -> partition item
*/
public Map<Long, PartitionItem> selectedPartitions;
public final Map<Long, PartitionItem> selectedPartitions;
/**
* true means the result is after partition pruning
* false means the partition pruning is not processed.
*/
public boolean isPartitionPruned;
public final boolean isPruned;
/**
* Constructor for SelectedPartitions.
*/
public SelectedPartitions(long totalPartitionNum, Map<Long, PartitionItem> selectedPartitions,
boolean isPartitionPruned) {
boolean isPruned) {
this.totalPartitionNum = totalPartitionNum;
this.selectedPartitions = selectedPartitions;
this.isPartitionPruned = isPartitionPruned;
if (this.selectedPartitions == null) {
this.selectedPartitions = Maps.newHashMap();
}
this.selectedPartitions = ImmutableMap.copyOf(Objects.requireNonNull(selectedPartitions,
"selectedPartitions is null"));
this.isPruned = isPruned;
}
@Override
public boolean equals(Object o) {
return isPartitionPruned == ((SelectedPartitions) o).isPartitionPruned && Objects.equals(
selectedPartitions.keySet(), ((SelectedPartitions) o).selectedPartitions.keySet());
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SelectedPartitions that = (SelectedPartitions) o;
return isPruned == that.isPruned && Objects.equals(
selectedPartitions.keySet(), that.selectedPartitions.keySet());
}
@Override
public int hashCode() {
return Objects.hash(selectedPartitions, isPruned);
}
}
}

View File

@ -83,7 +83,7 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
"stats", statistics,
"conjuncts", conjuncts,
"selected partitions num",
selectedPartitions.isPartitionPruned ? selectedPartitions.selectedPartitions.size() : "unknown"
selectedPartitions.isPruned ? selectedPartitions.selectedPartitions.size() : "unknown"
);
}

View File

@ -85,6 +85,7 @@ public class HiveScanNode extends FileQueryScanNode {
protected final HMSExternalTable hmsTable;
private HiveTransaction hiveTransaction = null;
// will only be set in Nereids, for lagency planner, it should be null
@Setter
private SelectedPartitions selectedPartitions = null;
@ -127,7 +128,7 @@ public class HiveScanNode extends FileQueryScanNode {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
// partitioned table
boolean isPartitionPruned = selectedPartitions == null ? false : selectedPartitions.isPartitionPruned;
boolean isPartitionPruned = selectedPartitions == null ? false : selectedPartitions.isPruned;
Collection<PartitionItem> partitionItems;
if (!isPartitionPruned) {
// partitionItems is null means that the partition is not pruned by Nereids,