[Fix](multi catalog)Fix Hive partition path doesn't contain partition value case bug (#19053)

Hive supports creating a partition with a specific location. In this case, the file path for the created partition may not contain the partition name and value, which will cause Doris to fail to query the Hive partition.
This PR fixes that bug.
This commit is contained in:
Jibing-Li
2023-04-26 17:18:51 +08:00
committed by GitHub
parent 60cda12e57
commit 59d8aa5a6f
8 changed files with 174 additions and 31 deletions

View File

@ -285,7 +285,8 @@ public class HiveMetaStoreCache {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf);
result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat,
jobConf, key.getPartitionValues());
} else {
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@ -300,7 +301,7 @@ public class HiveMetaStoreCache {
// Convert the hadoop split to Doris Split.
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null));
result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
}
}
@ -358,7 +359,11 @@ public class HiveMetaStoreCache {
public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
partitions.stream().forEach(p -> {
FileCacheKey fileCacheKey = new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
fileCacheKey.setUseSelfSplitter(useSelfSplitter);
keys.add(fileCacheKey);
});
Stream<FileCacheKey> stream;
if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@ -368,7 +373,14 @@ public class HiveMetaStoreCache {
}
List<FileCacheValue> fileLists = stream.map(k -> {
try {
return fileCacheRef.get().get(k);
FileCacheValue fileCacheValue = fileCacheRef.get().get(k);
// Replace default hive partition with a null_string.
for (int i = 0; i < fileCacheValue.getValuesSize(); i++) {
if (HIVE_DEFAULT_PARTITION.equals(fileCacheValue.getPartitionValues().get(i))) {
fileCacheValue.getPartitionValues().set(i, FeConstants.null_string);
}
}
return fileCacheValue;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
@ -412,7 +424,8 @@ public class HiveMetaStoreCache {
PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(), null));
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
null, partition.getPartitionValues()));
partitionCache.invalidate(partKey);
}
}
@ -430,7 +443,7 @@ public class HiveMetaStoreCache {
Table table = catalog.getClient().getTable(dbName, tblName);
// we just need to assign the `location` field because the `equals` method of `FileCacheKey`
// just compares the value of `location`
// NOTE(review): after this change `equals` also compares `partitionValues` — verify that a key
// built with null partitionValues still matches the cached partition-level keys it must invalidate
fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null));
fileCacheRef.get().invalidate(new FileCacheKey(table.getSd().getLocation(), null, null));
}
}
@ -443,7 +456,8 @@ public class HiveMetaStoreCache {
PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(), null));
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
null, partition.getPartitionValues()));
partitionCache.invalidate(partKey);
}
}
@ -691,19 +705,18 @@ public class HiveMetaStoreCache {
// Temp variable, use self file splitter or use InputFormat.getSplits.
// Will remove after self splitter is stable.
private boolean useSelfSplitter;
// The values of partitions.
// e.g. for file: hdfs://path/to/table/part1=a/part2=b/datafile
// partitionValues would be ["a", "b"]
protected List<String> partitionValues;
public FileCacheKey(String location, String inputFormat) {
public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
this.location = location;
this.inputFormat = inputFormat;
this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
this.useSelfSplitter = true;
}
public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
this.location = location;
this.inputFormat = inputFormat;
this.useSelfSplitter = useSelfSplitter;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
@ -712,12 +725,13 @@ public class HiveMetaStoreCache {
if (!(obj instanceof FileCacheKey)) {
return false;
}
return location.equals(((FileCacheKey) obj).location);
return location.equals(((FileCacheKey) obj).location)
&& partitionValues.equals(((FileCacheKey) obj).partitionValues);
}
@Override
public int hashCode() {
return Objects.hash(location);
return Objects.hash(location, partitionValues);
}
@Override
@ -733,6 +747,10 @@ public class HiveMetaStoreCache {
// File split cache for old splitter. This is a temp variable.
private List<Split> splits;
private boolean isSplittable;
// The values of partitions.
// e.g. for file: hdfs://path/to/table/part1=a/part2=b/datafile
// partitionValues would be ["a", "b"]
protected List<String> partitionValues;
public void addFile(LocatedFileStatus file) {
if (files == null) {
@ -752,6 +770,10 @@ public class HiveMetaStoreCache {
}
splits.add(split);
}
public int getValuesSize() {
return partitionValues == null ? 0 : partitionValues.size();
}
}
@Data

View File

@ -22,6 +22,8 @@ import org.apache.doris.planner.Split;
import lombok.Data;
import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
public class FileSplit extends Split {
protected Path path;
@ -33,13 +35,19 @@ public class FileSplit extends Split {
// If the file length is not set, the file length will be fetched from the file system.
protected long fileLength;
protected TableFormatType tableFormatType;
// The values of partitions.
// e.g. for file: hdfs://path/to/table/part1=a/part2=b/datafile
// partitionValues would be ["a", "b"]
protected List<String> partitionValues;
public FileSplit(Path path, long start, long length, long fileLength, String[] hosts) {
public FileSplit(Path path, long start, long length, long fileLength,
String[] hosts, List<String> partitionValues) {
this.path = path;
this.start = start;
this.length = length;
this.fileLength = fileLength;
this.hosts = hosts;
this.partitionValues = partitionValues;
}
public String[] getHosts() {

View File

@ -142,19 +142,20 @@ public class HiveSplitter implements Splitter {
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
allFiles.addAll(splitFile(status, isSplittable));
allFiles.addAll(splitFile(status, isSplittable, fileCacheValue.getPartitionValues()));
}
}
}
}
private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException {
private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status,
boolean splittable, List<String> partitionValues) throws IOException {
List<Split> result = Lists.newArrayList();
if (!splittable) {
LOG.debug("Path {} is not splittable.", status.getPath());
BlockLocation block = status.getBlockLocations()[0];
result.add(new FileSplit(status.getPath(), 0, status.getLength(),
status.getLength(), block.getHosts()));
status.getLength(), block.getHosts(), partitionValues));
return result;
}
long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
@ -170,12 +171,12 @@ public class HiveSplitter implements Splitter {
bytesRemaining -= splitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
result.add(new FileSplit(status.getPath(), length - bytesRemaining,
splitSize, length, blockLocations[location].getHosts()));
splitSize, length, blockLocations[location].getHosts(), partitionValues));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
result.add(new FileSplit(status.getPath(), length - bytesRemaining,
bytesRemaining, length, blockLocations[location].getHosts()));
bytesRemaining, length, blockLocations[location].getHosts(), partitionValues));
}
LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
@ -192,15 +193,17 @@ public class HiveSplitter implements Splitter {
// Get File Status by using FileSystem API.
public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat<?, ?> inputFormat,
JobConf jobConf) throws IOException {
JobConf jobConf,
List<String> partitionValues) throws IOException {
FileSystem fs = path.getFileSystem(jobConf);
boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, false);
HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
result.setSplittable(splittable);
while (locatedFileStatusRemoteIterator.hasNext()) {
result.addFile(locatedFileStatusRemoteIterator.next());
}
result.setPartitionValues(partitionValues);
return result;
}

View File

@ -119,12 +119,16 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
params.setProperties(locationProperties);
}
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
TScanRangeLocations curLocations = newLocations(params, backendPolicy);
FileSplit fileSplit = (FileSplit) split;
List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false);
// If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
: fileSplit.getPartitionValues();
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
// external data lake table

View File

@ -58,7 +58,7 @@ public class TVFSplitter implements Splitter {
splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
addFileSplits(path, fileLength, splitSize, splits);
} else {
Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0]);
Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0], null);
splits.add(split);
}
}
@ -69,10 +69,10 @@ public class TVFSplitter implements Splitter {
long bytesRemaining;
for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D;
bytesRemaining -= splitSize) {
splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0]));
splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null));
}
if (bytesRemaining != 0L) {
splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0]));
splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null));
}
}

View File

@ -28,7 +28,7 @@ import java.util.List;
@Data
public class IcebergSplit extends FileSplit {
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
super(file, start, length, fileLength, hosts);
super(file, start, length, fileLength, hosts, null);
}
private Analyzer analyzer;