diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 1a15fd0151..f859f9cc7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -285,7 +285,8 @@ public class HiveMetaStoreCache {
             InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
             if (key.useSelfSplitter) {
-                result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf);
+                result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat,
+                        jobConf, key.getPartitionValues());
             } else {
                 InputSplit[] splits;
                 String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -300,7 +301,7 @@ public class HiveMetaStoreCache {
                 // Convert the hadoop split to Doris Split.
                 for (int i = 0; i < splits.length; i++) {
                     org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
-                    result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null));
+                    result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
                 }
             }
 
@@ -358,7 +359,11 @@ public class HiveMetaStoreCache {
     public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
-        partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
+        partitions.stream().forEach(p -> {
+            FileCacheKey fileCacheKey = new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+            fileCacheKey.setUseSelfSplitter(useSelfSplitter);
+            keys.add(fileCacheKey);
+        });
 
         Stream<FileCacheKey> stream;
         if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@@ -368,7 +373,14 @@ public class HiveMetaStoreCache {
         }
         List<FileCacheValue> fileLists = stream.map(k -> {
             try {
-                return fileCacheRef.get().get(k);
+                FileCacheValue fileCacheValue = fileCacheRef.get().get(k);
+                // Replace default hive partition with a null_string.
+                for (int i = 0; i < fileCacheValue.getValuesSize(); i++) {
+                    if (HIVE_DEFAULT_PARTITION.equals(fileCacheValue.getPartitionValues().get(i))) {
+                        fileCacheValue.getPartitionValues().set(i, FeConstants.null_string);
+                    }
+                }
+                return fileCacheValue;
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
@@ -412,7 +424,8 @@ public class HiveMetaStoreCache {
             PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
-                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(), null));
+                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+                        null, partition.getPartitionValues()));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -430,7 +443,7 @@ public class HiveMetaStoreCache {
             Table table = catalog.getClient().getTable(dbName, tblName);
             // we just need to assign the `location` field because the `equals` method of `FileCacheKey`
             // just compares the value of `location`
-            fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null));
+            fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null, null));
         }
     }
@@ -443,7 +456,8 @@ public class HiveMetaStoreCache {
         PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
         HivePartition partition = partitionCache.getIfPresent(partKey);
         if (partition != null) {
-            fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(), null));
+            fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+                    null, partition.getPartitionValues()));
             partitionCache.invalidate(partKey);
         }
     }
@@ -691,19 +705,18 @@ public class HiveMetaStoreCache {
         // Temp variable, use self file splitter or use InputFormat.getSplits.
         // Will remove after self splitter is stable.
         private boolean useSelfSplitter;
+        // The values of partitions.
+        // e.g. for file: hdfs://path/to/table/part1=a/part2=b/datafile
+        // partitionValues would be ["a", "b"]
+        protected List<String> partitionValues;
 
-        public FileCacheKey(String location, String inputFormat) {
+        public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
             this.location = location;
             this.inputFormat = inputFormat;
+            this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.useSelfSplitter = true;
         }
 
-        public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
-            this.location = location;
-            this.inputFormat = inputFormat;
-            this.useSelfSplitter = useSelfSplitter;
-        }
-
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
@@ -712,12 +725,13 @@ public class HiveMetaStoreCache {
             if (!(obj instanceof FileCacheKey)) {
                 return false;
             }
-            return location.equals(((FileCacheKey) obj).location);
+            return location.equals(((FileCacheKey) obj).location)
+                    && partitionValues.equals(((FileCacheKey) obj).partitionValues);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(location);
+            return Objects.hash(location, partitionValues);
         }
 
         @Override
@@ -733,6 +747,10 @@ public class HiveMetaStoreCache {
         // File split cache for old splitter. This is a temp variable.
         private List<FileSplit> splits;
         private boolean isSplittable;
+        // The values of partitions.
+        // e.g. for file: hdfs://path/to/table/part1=a/part2=b/datafile
+        // partitionValues would be ["a", "b"]
+        protected List<String> partitionValues;
 
         public void addFile(LocatedFileStatus file) {
             if (files == null) {
@@ -752,6 +770,10 @@ public class HiveMetaStoreCache {
             }
             splits.add(split);
         }
+
+        public int getValuesSize() {
+            return partitionValues == null ? 0 : partitionValues.size();
+        }
     }
 
     @Data
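The `FileCacheKey` hunks above make partition values part of the cache identity and normalize Hive's default partition before the values are handed downstream. A minimal self-contained sketch of those two ideas, with illustrative names rather than the real Doris types (`__HIVE_DEFAULT_PARTITION__` is Hive's sentinel for a null partition value; `"\N"` stands in here for `FeConstants.null_string`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Sketch of a file-cache key that treats two partitions as distinct even when
// they share a location, by folding partition values into equals/hashCode.
// Names are illustrative, not the actual Doris classes.
final class CacheKeySketch {
    static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";

    final String location;
    final List<String> partitionValues;

    CacheKeySketch(String location, List<String> partitionValues) {
        this.location = location;
        // Match the FileCacheKey constructor above: null becomes an empty list.
        this.partitionValues = partitionValues == null ? new ArrayList<>() : partitionValues;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CacheKeySketch)) {
            return false;
        }
        CacheKeySketch other = (CacheKeySketch) obj;
        // Both fields participate, mirroring the equals/hashCode hunks above.
        return location.equals(other.location) && partitionValues.equals(other.partitionValues);
    }

    @Override
    public int hashCode() {
        return Objects.hash(location, partitionValues);
    }

    // Mirror of the loop in getFilesByPartitions: rewrite Hive's default-partition
    // sentinel to the null marker the engine expects.
    static void replaceDefaultPartition(List<String> values, String nullMarker) {
        for (int i = 0; i < values.size(); i++) {
            if (HIVE_DEFAULT_PARTITION.equals(values.get(i))) {
                values.set(i, nullMarker);
            }
        }
    }

    public static void main(String[] args) {
        List<String> values = new ArrayList<>(Arrays.asList("a", HIVE_DEFAULT_PARTITION));
        replaceDefaultPartition(values, "\\N");
        System.out.println(values); // prints [a, \N]
    }
}
```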
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index dedbc9f469..bd0fa97cc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -22,6 +22,8 @@ import org.apache.doris.planner.Split;
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
+import java.util.List;
+
 @Data
 public class FileSplit extends Split {
     protected Path path;
@@ -33,13 +35,19 @@ public class FileSplit extends Split {
     // If the file length is not set, the file length will be fetched from the file system.
     protected long fileLength;
     protected TableFormatType tableFormatType;
+    // The values of partitions.
+    // e.g. for file: hdfs://path/to/table/part1=a/part2=b/datafile
+    // partitionValues would be ["a", "b"]
+    protected List<String> partitionValues;
 
-    public FileSplit(Path path, long start, long length, long fileLength, String[] hosts) {
+    public FileSplit(Path path, long start, long length, long fileLength,
+            String[] hosts, List<String> partitionValues) {
         this.path = path;
         this.start = start;
         this.length = length;
         this.fileLength = fileLength;
         this.hosts = hosts;
+        this.partitionValues = partitionValues;
     }
 
     public String[] getHosts() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index b17704251a..37bb395254 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -142,19 +142,20 @@ public class HiveSplitter implements Splitter {
                 if (fileCacheValue.getFiles() != null) {
                     boolean isSplittable = fileCacheValue.isSplittable();
                     for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
-                        allFiles.addAll(splitFile(status, isSplittable));
+                        allFiles.addAll(splitFile(status, isSplittable, fileCacheValue.getPartitionValues()));
                     }
                 }
             }
         }
 
-    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException {
+    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status,
+            boolean splittable, List<String> partitionValues) throws IOException {
         List<Split> result = Lists.newArrayList();
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", status.getPath());
             BlockLocation block = status.getBlockLocations()[0];
             result.add(new FileSplit(status.getPath(), 0, status.getLength(),
-                    status.getLength(), block.getHosts()));
+                    status.getLength(), block.getHosts(), partitionValues));
             return result;
         }
         long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
@@ -170,12 +171,12 @@ public class HiveSplitter implements Splitter {
                 bytesRemaining -= splitSize) {
             int location = getBlockIndex(blockLocations, length - bytesRemaining);
             result.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                    splitSize, length, blockLocations[location].getHosts()));
+                    splitSize, length, blockLocations[location].getHosts(), partitionValues));
         }
         if (bytesRemaining != 0L) {
             int location = getBlockIndex(blockLocations, length - bytesRemaining);
             result.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                    bytesRemaining, length, blockLocations[location].getHosts()));
+                    bytesRemaining, length, blockLocations[location].getHosts(), partitionValues));
         }
 
         LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
@@ -192,15 +193,17 @@ public class HiveSplitter implements Splitter {
 
     // Get File Status by using FileSystem API.
     public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat<?, ?> inputFormat,
-            JobConf jobConf) throws IOException {
+            JobConf jobConf,
+            List<String> partitionValues) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
+        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, false);
         HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
         result.setSplittable(splittable);
         while (locatedFileStatusRemoteIterator.hasNext()) {
             result.addFile(locatedFileStatusRemoteIterator.next());
         }
+        result.setPartitionValues(partitionValues);
         return result;
     }
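Both `splitFile` above and `TVFSplitter.addFileSplits` further down share the same chunking rule: keep cutting `splitSize`-byte ranges while more than 1.1x `splitSize` remains, then emit the remainder as one split, so the last split can be slightly oversized instead of producing a tiny tail. A standalone sketch of just that arithmetic (a toy `Range` record; block-host lookup omitted):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the chunking rule used by splitFile/addFileSplits:
// emit fixed-size ranges while more than 1.1x splitSize remains, then one tail.
public class SplitLoopSketch {
    record Range(long start, long length) {}

    static List<Range> split(long fileLength, long splitSize) {
        List<Range> result = new ArrayList<>();
        long bytesRemaining;
        for (bytesRemaining = fileLength; (double) bytesRemaining / (double) splitSize > 1.1D;
                bytesRemaining -= splitSize) {
            result.add(new Range(fileLength - bytesRemaining, splitSize));
        }
        if (bytesRemaining != 0L) {
            // The tail may be up to 1.1x splitSize; this avoids a tiny final split.
            result.add(new Range(fileLength - bytesRemaining, bytesRemaining));
        }
        return result;
    }

    public static void main(String[] args) {
        // A 300 MB file with a 128 MB split size yields three ranges:
        // (0, 128 MB), (128 MB, 128 MB), (256 MB, 44 MB).
        split(300L << 20, 128L << 20).forEach(System.out::println);
    }
}
```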
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 4f9e1d2e17..e1e1d71272 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -119,12 +119,16 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
             params.setProperties(locationProperties);
         }
 
+        List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
             TScanRangeLocations curLocations = newLocations(params, backendPolicy);
             FileSplit fileSplit = (FileSplit) split;
-            List<String> pathPartitionKeys = getPathPartitionKeys();
-            List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                    pathPartitionKeys, false);
+
+            // If fileSplit has partition values, use the values collected from hive partitions.
+            // Otherwise, use the values in file path.
+            List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
+                    ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
+                    : fileSplit.getPartitionValues();
 
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
             // external data lake table
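The `QueryScanProvider` hunk is the consumer side of the change: partition values carried on the split win, and parsing `k=v` segments out of the path is only a fallback. That is what makes partitions whose location does not follow the `k=v` directory layout resolvable. A sketch of the fallback under assumed names (`parseColumnsFromPath` below is a toy stand-in for `BrokerUtil.parseColumnsFromPath`, not the real API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the fallback in QueryScanProvider: prefer values attached to the
// split, else derive them from `k=v` path segments.
public class PartitionValueFallback {

    static List<String> resolve(List<String> splitValues, String path, List<String> keys) {
        return splitValues != null ? splitValues : parseColumnsFromPath(path, keys);
    }

    // Toy path parser: extracts the value after each "/key=" segment.
    static List<String> parseColumnsFromPath(String path, List<String> keys) {
        List<String> values = new ArrayList<>();
        for (String key : keys) {
            String marker = "/" + key + "=";
            int begin = path.indexOf(marker);
            if (begin < 0) {
                throw new IllegalArgumentException("partition key " + key + " not in path " + path);
            }
            int from = begin + marker.length();
            int to = path.indexOf('/', from);
            values.add(to < 0 ? path.substring(from) : path.substring(from, to));
        }
        return values;
    }

    public static void main(String[] args) {
        String path = "hdfs://nn/warehouse/t/part1=a/part2=b/datafile";
        // No values on the split -> parsed from the path: [a, b]
        System.out.println(resolve(null, path, Arrays.asList("part1", "part2")));
        // Values carried on the split win, e.g. for partitions with custom locations.
        System.out.println(resolve(Arrays.asList("x", "y"), path, List.of()));
    }
}
```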
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
index 5119f67f3b..e2f5e556aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
@@ -58,7 +58,7 @@ public class TVFSplitter implements Splitter {
             splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
             addFileSplits(path, fileLength, splitSize, splits);
         } else {
-            Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0]);
+            Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0], null);
             splits.add(split);
         }
     }
@@ -69,10 +69,10 @@ public class TVFSplitter implements Splitter {
         long bytesRemaining;
         for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D;
                 bytesRemaining -= splitSize) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0]));
+            splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null));
        }
         if (bytesRemaining != 0L) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0]));
+            splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null));
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 1a0f63d6f5..e840c9a876 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -28,7 +28,7 @@ import java.util.List;
 @Data
 public class IcebergSplit extends FileSplit {
     public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
-        super(file, start, length, fileLength, hosts);
+        super(file, start, length, fileLength, hosts, null);
     }
 
     private Analyzer analyzer;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out b/regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out
new file mode 100644
index 0000000000..15d4e8f232
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out
@@ -0,0 +1,41 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !one_partition1 --
+1	Zhangsan	part1
+2	Lisi	part2
+
+-- !one_partition2 --
+1	Zhangsan	part1
+
+-- !one_partition3 --
+2	Lisi	part2
+
+-- !one_partition4 --
+part1
+
+-- !one_partition5 --
+part2
+
+-- !one_partition6 --
+part1
+part2
+
+-- !two_partition1 --
+1	Zhangsan	part1_1	part2_1
+2	Lisi	part1_2	part2_2
+
+-- !two_partition2 --
+1	Zhangsan	part1_1	part2_1
+
+-- !two_partition3 --
+1	Zhangsan	part1_1	part2_1
+
+-- !two_partition4 --
+2	Lisi	part1_2	part2_2
+
+-- !two_partition5 --
+2	Lisi	part1_2	part2_2
+
+-- !two_partition6 --
+part1_1	part2_1
+part1_2	part2_2
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy b/regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy
new file mode 100644
index 0000000000..19bfbf36f1
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_partition_location", "p2") {
+    def one_partition1 = """select * from partition_location_1 order by id;"""
+    def one_partition2 = """select * from partition_location_1 where part='part1';"""
+    def one_partition3 = """select * from partition_location_1 where part='part2';"""
+    def one_partition4 = """select part from partition_location_1 where part='part1';"""
+    def one_partition5 = """select part from partition_location_1 where part='part2';"""
+    def one_partition6 = """select part from partition_location_1 order by part;"""
+
+    def two_partition1 = """select * from partition_location_2 order by id;"""
+    def two_partition2 = """select * from partition_location_2 where part1='part1_1';"""
+    def two_partition3 = """select * from partition_location_2 where part2='part2_1';"""
+    def two_partition4 = """select * from partition_location_2 where part1='part1_2';"""
+    def two_partition5 = """select * from partition_location_2 where part2='part2_2';"""
+    def two_partition6 = """select part1, part2 from partition_location_2 order by part1;"""
+
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "hive_partition_location"
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        sql """use multi_catalog;"""
+        qt_one_partition1 one_partition1
+        qt_one_partition2 one_partition2
+        qt_one_partition3 one_partition3
+        qt_one_partition4 one_partition4
+        qt_one_partition5 one_partition5
+        qt_one_partition6 one_partition6
+
+        qt_two_partition1 two_partition1
+        qt_two_partition2 two_partition2
+        qt_two_partition3 two_partition3
+        qt_two_partition4 two_partition4
+        qt_two_partition5 two_partition5
+        qt_two_partition6 two_partition6
+        sql """drop catalog if exists ${catalog_name};"""
+    }
+}