[Improvement](multi catalog)Change hive metastore cache split value type to Doris defined Split. Fix split file length -1 bug (#18319)
HiveMetastoreCache type for file split was Hadoop InputSplit. In this pr, change it to Doris defined Split This change could avoid convert it every time. Also fix the explain verbose result return -1 for split file length.
This commit is contained in:
@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.planner.ColumnBound;
|
||||
import org.apache.doris.planner.ListPartitionPrunerV2;
|
||||
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
|
||||
import org.apache.doris.planner.external.FileSplit;
|
||||
import org.apache.doris.planner.external.HiveSplitter;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -96,7 +97,7 @@ public class HiveMetaStoreCache {
|
||||
// cache from <dbname-tblname-partition_values> -> <partition info>
|
||||
private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
|
||||
// the ref of cache from <location> -> <file list>
|
||||
private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> fileCacheRef
|
||||
private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
|
||||
= new AtomicReference<>();
|
||||
|
||||
public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
|
||||
@ -147,10 +148,10 @@ public class HiveMetaStoreCache {
|
||||
}
|
||||
// if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
|
||||
// if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
|
||||
CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader = getGuavaCacheLoader(executor,
|
||||
CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
|
||||
fileMetaCacheTtlSecond);
|
||||
|
||||
LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = fileCacheRef.get();
|
||||
LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
|
||||
|
||||
fileCacheRef.set(fileCacheBuilder.build(loader));
|
||||
if (Objects.nonNull(preFileCache)) {
|
||||
@ -261,7 +262,7 @@ public class HiveMetaStoreCache {
|
||||
return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
|
||||
}
|
||||
|
||||
private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
|
||||
private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
|
||||
@ -278,12 +279,13 @@ public class HiveMetaStoreCache {
|
||||
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
|
||||
FileInputFormat.setInputPaths(jobConf, finalLocation);
|
||||
try {
|
||||
FileSplit[] result;
|
||||
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
|
||||
InputSplit[] splits;
|
||||
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
|
||||
if (key.useSelfSplitter) {
|
||||
splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
|
||||
result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
|
||||
} else {
|
||||
InputSplit[] splits;
|
||||
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
|
||||
if (!Strings.isNullOrEmpty(remoteUser)) {
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
@ -292,12 +294,18 @@ public class HiveMetaStoreCache {
|
||||
} else {
|
||||
splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
|
||||
}
|
||||
result = new FileSplit[splits.length];
|
||||
// Convert the hadoop split to Doris Split.
|
||||
for (int i = 0; i < splits.length; i++) {
|
||||
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
|
||||
result[i] = new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
|
||||
LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
|
||||
}
|
||||
return ImmutableList.copyOf(splits);
|
||||
return ImmutableList.copyOf(result);
|
||||
} catch (Exception e) {
|
||||
throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
|
||||
}
|
||||
@ -345,7 +353,7 @@ public class HiveMetaStoreCache {
|
||||
}
|
||||
}
|
||||
|
||||
public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
|
||||
public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
|
||||
long start = System.currentTimeMillis();
|
||||
List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
|
||||
partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
|
||||
@ -356,14 +364,14 @@ public class HiveMetaStoreCache {
|
||||
} else {
|
||||
stream = keys.parallelStream();
|
||||
}
|
||||
List<ImmutableList<InputSplit>> fileLists = stream.map(k -> {
|
||||
List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
|
||||
try {
|
||||
return fileCacheRef.get().get(k);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize(
|
||||
List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
|
||||
fileLists.stream().mapToInt(l -> l.size()).sum());
|
||||
fileLists.stream().forEach(l -> retFiles.addAll(l));
|
||||
LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
|
||||
@ -574,12 +582,12 @@ public class HiveMetaStoreCache {
|
||||
* @param fileMetaCacheTtlSecond
|
||||
* @return
|
||||
*/
|
||||
private CacheLoader<FileCacheKey, ImmutableList<InputSplit>> getGuavaCacheLoader(Executor executor,
|
||||
private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
|
||||
int fileMetaCacheTtlSecond) {
|
||||
CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader =
|
||||
new CacheLoader<FileCacheKey, ImmutableList<InputSplit>>() {
|
||||
CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
|
||||
new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
|
||||
@Override
|
||||
public ImmutableList<InputSplit> load(FileCacheKey key) throws Exception {
|
||||
public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
|
||||
return loadFiles(key);
|
||||
}
|
||||
};
|
||||
@ -594,7 +602,7 @@ public class HiveMetaStoreCache {
|
||||
* get fileCache ref
|
||||
* @return
|
||||
*/
|
||||
public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> getFileCacheRef() {
|
||||
public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
|
||||
return fileCacheRef;
|
||||
}
|
||||
|
||||
|
||||
@ -758,7 +758,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
for (TFileRangeDesc file : fileRangeDescs) {
|
||||
output.append(prefix).append(" ").append(file.getPath())
|
||||
.append(" start: ").append(file.getStartOffset())
|
||||
.append(" length: ").append(file.getFileSize())
|
||||
.append(" length: ").append(file.getSize())
|
||||
.append("\n");
|
||||
}
|
||||
} else {
|
||||
@ -766,7 +766,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
TFileRangeDesc file = fileRangeDescs.get(i);
|
||||
output.append(prefix).append(" ").append(file.getPath())
|
||||
.append(" start: ").append(file.getStartOffset())
|
||||
.append(" length: ").append(file.getFileSize())
|
||||
.append(" length: ").append(file.getSize())
|
||||
.append("\n");
|
||||
}
|
||||
int other = size - 4;
|
||||
@ -774,7 +774,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
|
||||
TFileRangeDesc file = fileRangeDescs.get(size - 1);
|
||||
output.append(prefix).append(" ").append(file.getPath())
|
||||
.append(" start: ").append(file.getStartOffset())
|
||||
.append(" length: ").append(file.getFileSize())
|
||||
.append(" length: ").append(file.getSize())
|
||||
.append("\n");
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,9 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.mapred.FileSplit;
|
||||
import org.apache.hadoop.mapred.InputFormat;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -138,20 +136,14 @@ public class HiveSplitter implements Splitter {
|
||||
|
||||
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
|
||||
List<Split> allFiles, boolean useSelfSplitter) {
|
||||
List<InputSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
|
||||
List<FileSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
|
||||
Joiner.on(",")
|
||||
.join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
allFiles.addAll(files.stream().map(file -> {
|
||||
FileSplit fs = (FileSplit) file;
|
||||
org.apache.doris.planner.external.FileSplit split = new org.apache.doris.planner.external.FileSplit(
|
||||
fs.getPath(), fs.getStart(), fs.getLength(), -1, null
|
||||
);
|
||||
return split;
|
||||
}).collect(Collectors.toList()));
|
||||
allFiles.addAll(files);
|
||||
}
|
||||
|
||||
public int getTotalPartitionNum() {
|
||||
@ -163,24 +155,24 @@ public class HiveSplitter implements Splitter {
|
||||
}
|
||||
|
||||
// Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
|
||||
public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
|
||||
public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
|
||||
JobConf jobConf) throws IOException {
|
||||
FileSystem fs = path.getFileSystem(jobConf);
|
||||
boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
|
||||
List<InputSplit> splits = Lists.newArrayList();
|
||||
List<FileSplit> splits = Lists.newArrayList();
|
||||
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
|
||||
if (!locatedFileStatusRemoteIterator.hasNext()) {
|
||||
LOG.debug("File status for path {} is empty.", path);
|
||||
return new InputSplit[0];
|
||||
return new FileSplit[0];
|
||||
}
|
||||
if (!splittable) {
|
||||
LOG.debug("Path {} is not splittable.", path);
|
||||
while (locatedFileStatusRemoteIterator.hasNext()) {
|
||||
LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
|
||||
BlockLocation block = status.getBlockLocations()[0];
|
||||
splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
|
||||
splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
|
||||
}
|
||||
return splits.toArray(new InputSplit[splits.size()]);
|
||||
return splits.toArray(new FileSplit[splits.size()]);
|
||||
}
|
||||
long splitSize = Config.file_split_size;
|
||||
boolean useDefaultBlockSize = (splitSize <= 0);
|
||||
@ -196,17 +188,17 @@ public class HiveSplitter implements Splitter {
|
||||
bytesRemaining -= splitSize) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
|
||||
splitSize, blockLocations[location].getHosts()));
|
||||
splitSize, length, blockLocations[location].getHosts()));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
|
||||
bytesRemaining, blockLocations[location].getHosts()));
|
||||
bytesRemaining, length, blockLocations[location].getHosts()));
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("Path {} includes {} splits.", path, splits.size());
|
||||
return splits.toArray(new InputSplit[splits.size()]);
|
||||
return splits.toArray(new FileSplit[splits.size()]);
|
||||
}
|
||||
|
||||
private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
|
||||
|
||||
@ -54,6 +54,7 @@ import org.apache.doris.mysql.privilege.Auth;
|
||||
import org.apache.doris.planner.ColumnBound;
|
||||
import org.apache.doris.planner.ListPartitionPrunerV2;
|
||||
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
|
||||
import org.apache.doris.planner.external.FileSplit;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowResultSet;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
@ -66,7 +67,6 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Range;
|
||||
import com.google.common.collect.RangeMap;
|
||||
import org.apache.hadoop.mapred.InputSplit;
|
||||
import org.junit.Assert;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -678,7 +678,7 @@ public class CatalogMgrTest extends TestWithFeService {
|
||||
|
||||
HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
|
||||
HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
|
||||
LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
|
||||
LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
|
||||
|
||||
|
||||
// 1. properties contains `file.meta.cache.ttl-second`, it should not be equal
|
||||
|
||||
Reference in New Issue
Block a user