diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d963b52c7f..548e06d65b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveSplitter;
 
 import com.google.common.base.Preconditions;
@@ -96,7 +97,7 @@ public class HiveMetaStoreCache {
     // cache from <dbname-tblname-partition_values> -> <partition info>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // the ref of cache from <file cache key> -> <files>
-    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> fileCacheRef
+    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
             = new AtomicReference<>();
 
     public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
@@ -147,10 +148,10 @@ public class HiveMetaStoreCache {
         }
         // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
         // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
-        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader = getGuavaCacheLoader(executor,
+        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
                 fileMetaCacheTtlSecond);
 
-        LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = fileCacheRef.get();
+        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
         fileCacheRef.set(fileCacheBuilder.build(loader));
         if (Objects.nonNull(preFileCache)) {
@@ -261,7 +262,7 @@
         return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
-    private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
+    private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -278,12 +279,13 @@
             jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
             FileInputFormat.setInputPaths(jobConf, finalLocation);
             try {
+                FileSplit[] result;
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
-                InputSplit[] splits;
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
+                    result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
                 } else {
+                    InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
                     if (!Strings.isNullOrEmpty(remoteUser)) {
                         UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
@@ -292,12 +294,18 @@
                     } else {
                         splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
                     }
+                    result = new FileSplit[splits.length];
+                    // Convert the hadoop split to Doris Split.
+                    for (int i = 0; i < splits.length; i++) {
+                        org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
+                        result[i] = new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
+                    }
                 }
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
+                    LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
                 }
-                return ImmutableList.copyOf(splits);
+                return ImmutableList.copyOf(result);
             } catch (Exception e) {
                 throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
             }
@@ -345,7 +353,7 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
+    public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
@@ -356,14 +364,14 @@
         } else {
             stream = keys.parallelStream();
         }
-        List<ImmutableList<InputSplit>> fileLists = stream.map(k -> {
+        List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
             try {
                 return fileCacheRef.get().get(k);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }).collect(Collectors.toList());
-        List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize(
+        List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
                 fileLists.stream().mapToInt(l -> l.size()).sum());
         fileLists.stream().forEach(l -> retFiles.addAll(l));
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
@@ -574,12 +582,12 @@
      * @param fileMetaCacheTtlSecond
      * @return
      */
-    private CacheLoader<FileCacheKey, ImmutableList<InputSplit>> getGuavaCacheLoader(Executor executor,
+    private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
             int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> loader =
-                new CacheLoader<FileCacheKey, ImmutableList<InputSplit>>() {
+        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
+                new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
                     @Override
-                    public ImmutableList<InputSplit> load(FileCacheKey key) throws Exception {
+                    public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
                         return loadFiles(key);
                     }
                 };
@@ -594,7 +602,7 @@
      * get fileCache ref
      * @return
      */
-    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<InputSplit>>> getFileCacheRef() {
+    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
         return fileCacheRef;
     }
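Note on the sync/async loader choice the comments above refer to: when file.meta.cache.ttl-second is greater than 0 the cache wraps its loader so reloads run on an executor instead of blocking the reading thread. A minimal, self-contained sketch of that Guava pattern follows. It is not the Doris implementation: the String key/value types, the FileCacheSketch name, and the refreshAfterWrite policy are placeholders (the real key is FileCacheKey and the value is ImmutableList<FileSplit>, and the exact expiry policy is not visible in this hunk).

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class FileCacheSketch {
    LoadingCache<String, ImmutableList<String>> build(Executor executor, int ttlSecond) {
        CacheLoader<String, ImmutableList<String>> loader =
                new CacheLoader<String, ImmutableList<String>>() {
                    @Override
                    public ImmutableList<String> load(String key) {
                        return ImmutableList.of(); // stand-in for loadFiles(key)
                    }
                };
        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder().maximumSize(10000);
        if (ttlSecond > 0) {
            // ttl > 0: stale entries are reloaded asynchronously on the executor.
            return builder.refreshAfterWrite(ttlSecond, TimeUnit.SECONDS)
                    .build(CacheLoader.asyncReloading(loader, executor));
        }
        // ttl == 0: plain synchronous loader; entries are never refreshed by time.
        return builder.build(loader);
    }
}

Guava's CacheLoader.asyncReloading only moves reload() work onto the executor; a read that misses the cache entirely still loads synchronously.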
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index e22025800d..6c6089b16c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -758,7 +758,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
             for (TFileRangeDesc file : fileRangeDescs) {
                 output.append(prefix).append(" ").append(file.getPath())
                         .append(" start: ").append(file.getStartOffset())
-                        .append(" length: ").append(file.getFileSize())
+                        .append(" length: ").append(file.getSize())
                         .append("\n");
             }
         } else {
@@ -766,7 +766,7 @@
                 TFileRangeDesc file = fileRangeDescs.get(i);
                 output.append(prefix).append(" ").append(file.getPath())
                         .append(" start: ").append(file.getStartOffset())
-                        .append(" length: ").append(file.getFileSize())
+                        .append(" length: ").append(file.getSize())
                         .append("\n");
             }
             int other = size - 4;
@@ -774,7 +774,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
             TFileRangeDesc file = fileRangeDescs.get(size - 1);
             output.append(prefix).append(" ").append(file.getPath())
                     .append(" start: ").append(file.getStartOffset())
-                    .append(" length: ").append(file.getFileSize())
+                    .append(" length: ").append(file.getSize())
                     .append("\n");
         }
     }
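Why getFileSize() -> getSize() is a correctness fix and not a rename: a TFileRangeDesc describes one byte range of a file, and the value printed under the "length:" label should be the range's own size, not the size of the containing file, which the old code overstated for every range smaller than its file. A sketch of the assumed field semantics follows; FileRangeSketch and its fields mirror that assumption and are illustrative, not the Thrift-generated class.

class FileRangeSketch {
    long startOffset; // first byte of this range within the file
    long size;        // bytes in this range -> what "length:" should print
    long fileSize;    // bytes in the whole file; one file may span many ranges

    long getSize() {
        return size;
    }

    long getFileSize() {
        return fileSize;
    }
}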
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index f7f09b6da6..9c8dec303b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -42,9 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -138,20 +136,14 @@ public class HiveSplitter implements Splitter {
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
                                           List<Split> allFiles, boolean useSelfSplitter) {
-        List<InputSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
+        List<FileSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(), Joiner.on(",")
                     .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
                             .collect(Collectors.toList())));
         }
-        allFiles.addAll(files.stream().map(file -> {
-            FileSplit fs = (FileSplit) file;
-            org.apache.doris.planner.external.FileSplit split = new org.apache.doris.planner.external.FileSplit(
-                    fs.getPath(), fs.getStart(), fs.getLength(), -1, null
-            );
-            return split;
-        }).collect(Collectors.toList()));
+        allFiles.addAll(files);
     }
 
     public int getTotalPartitionNum() {
@@ -163,24 +155,24 @@ public class HiveSplitter implements Splitter {
     }
 
     // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
-    public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
+    public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
             JobConf jobConf) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        List<InputSplit> splits = Lists.newArrayList();
+        List<FileSplit> splits = Lists.newArrayList();
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
         if (!locatedFileStatusRemoteIterator.hasNext()) {
             LOG.debug("File status for path {} is empty.", path);
-            return new InputSplit[0];
+            return new FileSplit[0];
         }
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", path);
             while (locatedFileStatusRemoteIterator.hasNext()) {
                 LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
                 BlockLocation block = status.getBlockLocations()[0];
-                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
+                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
             }
-            return splits.toArray(new InputSplit[splits.size()]);
+            return splits.toArray(new FileSplit[splits.size()]);
         }
         long splitSize = Config.file_split_size;
         boolean useDefaultBlockSize = (splitSize <= 0);
@@ -196,17 +188,17 @@ public class HiveSplitter implements Splitter {
                     bytesRemaining -= splitSize) {
                 int location = getBlockIndex(blockLocations, length - bytesRemaining);
                 splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        splitSize, blockLocations[location].getHosts()));
+                        splitSize, length, blockLocations[location].getHosts()));
             }
             if (bytesRemaining != 0L) {
                 int location = getBlockIndex(blockLocations, length - bytesRemaining);
                 splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        bytesRemaining, blockLocations[location].getHosts()));
+                        bytesRemaining, length, blockLocations[location].getHosts()));
             }
         }
 
         LOG.debug("Path {} includes {} splits.", path, splits.size());
-        return splits.toArray(new InputSplit[splits.size()]);
+        return splits.toArray(new FileSplit[splits.size()]);
     }
 
     private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
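For reference, a standalone sketch of the splitting scheme getHiveSplits implements above: cut a file into fixed-size byte ranges and tag each range with the hosts of the HDFS block containing its start offset. The Block/Range types are hypothetical stand-ins, and the 1.1 "slop" factor follows Hadoop's FileInputFormat convention; the exact loop condition in HiveSplitter is not visible in this hunk. Note how each Range carries the whole-file length, matching the new FileSplit(path, start, length, fileLength, hosts) constructor shape used in the patch.

import java.util.ArrayList;
import java.util.List;

class SplitSchemeSketch {
    // Bytes [offset, offset + len) of the file live on these hosts.
    static class Block {
        final long offset;
        final long len;
        final String[] hosts;

        Block(long offset, long len, String[] hosts) {
            this.offset = offset;
            this.len = len;
            this.hosts = hosts;
        }
    }

    // One split: `len` bytes at `start` of a file `fileLen` bytes long.
    static class Range {
        final long start;
        final long len;
        final long fileLen;
        final String[] hosts;

        Range(long start, long len, long fileLen, String[] hosts) {
            this.start = start;
            this.len = len;
            this.fileLen = fileLen;
            this.hosts = hosts;
        }
    }

    static List<Range> split(long fileLen, long splitSize, Block[] blocks) {
        List<Range> out = new ArrayList<>();
        long remaining = fileLen;
        // Emit full-size splits while well over one split's worth remains.
        while ((double) remaining / splitSize > 1.1D) {
            int idx = blockIndex(blocks, fileLen - remaining);
            out.add(new Range(fileLen - remaining, splitSize, fileLen, blocks[idx].hosts));
            remaining -= splitSize;
        }
        // Whatever is left becomes one final, possibly smaller, split.
        if (remaining != 0L) {
            int idx = blockIndex(blocks, fileLen - remaining);
            out.add(new Range(fileLen - remaining, remaining, fileLen, blocks[idx].hosts));
        }
        return out;
    }

    // Index of the block containing the given file offset.
    static int blockIndex(Block[] blocks, long offset) {
        for (int i = 0; i < blocks.length; i++) {
            if (offset >= blocks[i].offset && offset < blocks[i].offset + blocks[i].len) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset " + offset + " is past the end of the file");
    }
}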
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index c93be4a3ac..67416619b0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -54,6 +54,7 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
@@ -66,7 +67,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
-import org.apache.hadoop.mapred.InputSplit;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -678,7 +678,7 @@ public class CatalogMgrTest extends TestWithFeService {
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
-        LoadingCache<FileCacheKey, ImmutableList<InputSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal