From 59d8aa5a6f2f135bb8463ccd790ed3a77a20009c Mon Sep 17 00:00:00 2001
From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com>
Date: Wed, 26 Apr 2023 17:18:51 +0800
Subject: [PATCH] [Fix](multi catalog)Fix Hive partition path doesn't contain
 partition value case bug (#19053)

Hive supports creating a partition with a specific location. In this case,
the file path of the partition may not contain the partition name and value,
which causes Doris to fail to query the Hive partition. This PR fixes that bug.
---
 .../datasource/hive/HiveMetaStoreCache.java   | 54 ++++++++++-----
 .../doris/planner/external/FileSplit.java     | 10 ++-
 .../doris/planner/external/HiveSplitter.java  | 17 +++--
 .../planner/external/QueryScanProvider.java   | 10 ++-
 .../doris/planner/external/TVFSplitter.java   |  6 +-
 .../external/iceberg/IcebergSplit.java        |  2 +-
 .../hive/test_hive_partition_location.out     | 41 ++++++++++++
 .../hive/test_hive_partition_location.groovy  | 65 +++++++++++++++++++
 8 files changed, 174 insertions(+), 31 deletions(-)
 create mode 100644 regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out
 create mode 100644 regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 1a15fd0151..f859f9cc7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -285,7 +285,8 @@
             InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
             if (key.useSelfSplitter) {
-                result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf);
+                result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat,
+                        jobConf, key.getPartitionValues());
             } else {
                 InputSplit[] splits;
                 String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -300,7 +301,7 @@
                 // Convert the hadoop split to Doris Split.
                 for (int i = 0; i < splits.length; i++) {
                     org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
-                    result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null));
+                    result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
                 }
             }
 
@@ -358,7 +359,11 @@
     public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
-        partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
+        partitions.stream().forEach(p -> {
+            FileCacheKey fileCacheKey = new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+            fileCacheKey.setUseSelfSplitter(useSelfSplitter);
+            keys.add(fileCacheKey);
+        });
 
         Stream<FileCacheKey> stream;
         if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@@ -368,7 +373,14 @@
         }
         List<FileCacheValue> fileLists = stream.map(k -> {
             try {
-                return fileCacheRef.get().get(k);
+                FileCacheValue fileCacheValue = fileCacheRef.get().get(k);
+                // Replace default hive partition with a null_string.
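+                // "__HIVE_DEFAULT_PARTITION__" is the directory name Hive uses for rows
+                // whose partition value is NULL; mapping it to Doris' null_string lets
+                // the value be treated as NULL when the partition column is read back.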
+                for (int i = 0; i < fileCacheValue.getValuesSize(); i++) {
+                    if (HIVE_DEFAULT_PARTITION.equals(fileCacheValue.getPartitionValues().get(i))) {
+                        fileCacheValue.getPartitionValues().set(i, FeConstants.null_string);
+                    }
+                }
+                return fileCacheValue;
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
@@ -412,7 +424,8 @@
         PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
         HivePartition partition = partitionCache.getIfPresent(partKey);
         if (partition != null) {
-            fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(), null));
+            fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+                    null, partition.getPartitionValues()));
             partitionCache.invalidate(partKey);
         }
     }
@@ -430,7 +443,7 @@
             Table table = catalog.getClient().getTable(dbName, tblName);
             // we just need to assign the `location` field because the `equals` method of `FileCacheKey`
             // just compares the value of `location`
-            fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null));
+            fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null, null));
         }
     }
 
@@ -443,7 +456,8 @@
             PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
-                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(), null));
+                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+                        null, partition.getPartitionValues()));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -691,19 +705,18 @@
         // Temp variable, use self file splitter or use InputFormat.getSplits.
         // Will remove after self splitter is stable.
         private boolean useSelfSplitter;
+        // The values of the partition.
+        // e.g. for file hdfs://path/to/table/part1=a/part2=b/datafile
+        // partitionValues would be ["a", "b"]
+        protected List<String> partitionValues;
 
-        public FileCacheKey(String location, String inputFormat) {
+        public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
             this.location = location;
             this.inputFormat = inputFormat;
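+            // A null partitionValues is normalized to an empty list so that the
+            // equals() and hashCode() overrides below never see a null field.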
+            this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.useSelfSplitter = true;
         }
 
-        public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
-            this.location = location;
-            this.inputFormat = inputFormat;
-            this.useSelfSplitter = useSelfSplitter;
-        }
-
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
@@ -712,12 +725,13 @@
             if (!(obj instanceof FileCacheKey)) {
                 return false;
             }
-            return location.equals(((FileCacheKey) obj).location);
+            return location.equals(((FileCacheKey) obj).location)
+                    && partitionValues.equals(((FileCacheKey) obj).partitionValues);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(location);
+            return Objects.hash(location, partitionValues);
         }
 
         @Override
@@ -733,6 +747,10 @@
         // File split cache for old splitter. This is a temp variable.
         private List<Split> splits;
         private boolean isSplittable;
+        // The values of the partition.
+        // e.g. for file hdfs://path/to/table/part1=a/part2=b/datafile
+        // partitionValues would be ["a", "b"]
+        protected List<String> partitionValues;
 
         public void addFile(LocatedFileStatus file) {
             if (files == null) {
@@ -752,6 +770,10 @@
             }
             splits.add(split);
         }
+
+        public int getValuesSize() {
+            return partitionValues == null ? 0 : partitionValues.size();
+        }
     }
 
     @Data
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index dedbc9f469..bd0fa97cc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -22,6 +22,8 @@ import org.apache.doris.planner.Split;
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
+import java.util.List;
+
 @Data
 public class FileSplit extends Split {
     protected Path path;
@@ -33,13 +35,19 @@
     // If the file length is not set, the file length will be fetched from the file system.
     protected long fileLength;
     protected TableFormatType tableFormatType;
+    // The values of the partition.
+    // e.g. for file hdfs://path/to/table/part1=a/part2=b/datafile
+    // partitionValues would be ["a", "b"]
+    protected List<String> partitionValues;
 
-    public FileSplit(Path path, long start, long length, long fileLength, String[] hosts) {
+    public FileSplit(Path path, long start, long length, long fileLength,
+            String[] hosts, List<String> partitionValues) {
         this.path = path;
         this.start = start;
         this.length = length;
         this.fileLength = fileLength;
         this.hosts = hosts;
+        this.partitionValues = partitionValues;
     }
 
     public String[] getHosts() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index b17704251a..37bb395254 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -142,19 +142,20 @@
                 if (fileCacheValue.getFiles() != null) {
                     boolean isSplittable = fileCacheValue.isSplittable();
                     for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
-                        allFiles.addAll(splitFile(status, isSplittable));
+                        allFiles.addAll(splitFile(status, isSplittable, fileCacheValue.getPartitionValues()));
                     }
                 }
             }
         }
 
-    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException {
+    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status,
+            boolean splittable, List<String> partitionValues) throws IOException {
         List<Split> result = Lists.newArrayList();
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", status.getPath());
             BlockLocation block = status.getBlockLocations()[0];
             result.add(new FileSplit(status.getPath(), 0, status.getLength(),
-                    status.getLength(), block.getHosts()));
+                    status.getLength(), block.getHosts(), partitionValues));
             return result;
         }
         long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
@@ -170,12 +171,12 @@
                 bytesRemaining -= splitSize) {
             int location = getBlockIndex(blockLocations, length - bytesRemaining);
             result.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                    splitSize, length, blockLocations[location].getHosts()));
+                    splitSize, length, blockLocations[location].getHosts(), partitionValues));
         }
         if (bytesRemaining != 0L) {
             int location = getBlockIndex(blockLocations, length - bytesRemaining);
             result.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                    bytesRemaining, length, blockLocations[location].getHosts()));
+                    bytesRemaining, length, blockLocations[location].getHosts(), partitionValues));
         }
 
         LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
@@ -192,15 +193,17 @@
 
     // Get File Status by using FileSystem API.
     public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat<?, ?> inputFormat,
-                                                                 JobConf jobConf) throws IOException {
+                                                                 JobConf jobConf,
+                                                                 List<String> partitionValues) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
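+        // Only list files directly under the given location: the listing is no
+        // longer recursive, so files in nested subdirectories are not picked up.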
+        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, false);
         HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
         result.setSplittable(splittable);
         while (locatedFileStatusRemoteIterator.hasNext()) {
             result.addFile(locatedFileStatusRemoteIterator.next());
         }
+        result.setPartitionValues(partitionValues);
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 4f9e1d2e17..e1e1d71272 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -119,12 +119,16 @@
             params.setProperties(locationProperties);
         }
 
+        List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
             TScanRangeLocations curLocations = newLocations(params, backendPolicy);
             FileSplit fileSplit = (FileSplit) split;
-            List<String> pathPartitionKeys = getPathPartitionKeys();
-            List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                    pathPartitionKeys, false);
+
+            // If fileSplit has partition values, use the values collected from the Hive partitions.
+            // Otherwise, parse the values from the file path.
+            List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
+                    ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
+                    : fileSplit.getPartitionValues();
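+            // e.g. a default-layout path ".../part1=a/part2=b/datafile" parses to ["a", "b"],
+            // but a partition created with a custom LOCATION has no "key=value" segments in
+            // its path, so the values collected from the metastore must ride on the split.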
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
 
             // external data lake table
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
index 5119f67f3b..e2f5e556aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
@@ -58,7 +58,7 @@
             splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
             addFileSplits(path, fileLength, splitSize, splits);
         } else {
-            Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0]);
+            Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0], null);
             splits.add(split);
         }
     }
@@ -69,10 +69,10 @@
         long bytesRemaining;
         for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D;
                 bytesRemaining -= splitSize) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0]));
+            splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null));
         }
         if (bytesRemaining != 0L) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0]));
+            splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null));
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 1a0f63d6f5..e840c9a876 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -28,7 +28,7 @@ import java.util.List;
 @Data
 public class IcebergSplit extends FileSplit {
     public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
-        super(file, start, length, fileLength, hosts);
+        super(file, start, length, fileLength, hosts, null);
     }
 
     private Analyzer analyzer;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out b/regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out
new file mode 100644
index 0000000000..15d4e8f232
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_hive_partition_location.out
@@ -0,0 +1,41 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+
+-- !one_partition1 --
+1	Zhangsan	part1
+2	Lisi	part2
+
+-- !one_partition2 --
+1	Zhangsan	part1
+
+-- !one_partition3 --
+2	Lisi	part2
+
+-- !one_partition4 --
+part1
+
+-- !one_partition5 --
+part2
+
+-- !one_partition6 --
+part1
+part2
+
+-- !two_partition1 --
+1	Zhangsan	part1_1	part2_1
+2	Lisi	part1_2	part2_2
+
+-- !two_partition2 --
+1	Zhangsan	part1_1	part2_1
+
+-- !two_partition3 --
+1	Zhangsan	part1_1	part2_1
+
+-- !two_partition4 --
+2	Lisi	part1_2	part2_2
+
+-- !two_partition5 --
+2	Lisi	part1_2	part2_2
+
+-- !two_partition6 --
+part1_1	part2_1
+part1_2	part2_2
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy b/regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy
new file mode 100644
index 0000000000..19bfbf36f1
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_hive_partition_location.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_partition_location", "p2") {
+    def one_partition1 = """select * from partition_location_1 order by id;"""
+    def one_partition2 = """select * from partition_location_1 where part='part1';"""
+    def one_partition3 = """select * from partition_location_1 where part='part2';"""
+    def one_partition4 = """select part from partition_location_1 where part='part1';"""
+    def one_partition5 = """select part from partition_location_1 where part='part2';"""
+    def one_partition6 = """select part from partition_location_1 order by part;"""
+
+    def two_partition1 = """select * from partition_location_2 order by id;"""
+    def two_partition2 = """select * from partition_location_2 where part1='part1_1';"""
+    def two_partition3 = """select * from partition_location_2 where part2='part2_1';"""
+    def two_partition4 = """select * from partition_location_2 where part1='part1_2';"""
+    def two_partition5 = """select * from partition_location_2 where part2='part2_2';"""
+    def two_partition6 = """select part1, part2 from partition_location_2 order by part1;"""
+
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "hive_partition_location"
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        sql """use multi_catalog;"""
+        qt_one_partition1 one_partition1
+        qt_one_partition2 one_partition2
+        qt_one_partition3 one_partition3
+        qt_one_partition4 one_partition4
+        qt_one_partition5 one_partition5
+        qt_one_partition6 one_partition6
+
+        qt_two_partition1 two_partition1
+        qt_two_partition2 two_partition2
+        qt_two_partition3 two_partition3
+        qt_two_partition4 two_partition4
+        qt_two_partition5 two_partition5
+        qt_two_partition6 two_partition6
+        sql """drop catalog if exists ${catalog_name};"""
+    }
+}
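For context, the Hive-side tables this suite queries rely on partitions created
with explicit locations, so the data file paths carry no partition key/value
segments for Doris to parse. A minimal HiveQL sketch of that setup (table and
database names match the test; column types, file format, and paths are
illustrative assumptions, not taken from the test environment):

    CREATE TABLE multi_catalog.partition_location_1 (id INT, name STRING)
        PARTITIONED BY (part STRING);
    -- The partition directory name carries no "part=" segment:
    ALTER TABLE multi_catalog.partition_location_1
        ADD PARTITION (part='part2') LOCATION '/user/doris/custom_location/dir2';

Before this fix, Doris tried to recover the partition value from the file path
and failed for such partitions; with this patch the values collected from the
metastore are carried on each file split instead.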