diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 548e06d65b..39532a6785 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.Split;
 import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveSplitter;
 
@@ -44,7 +45,6 @@ import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -53,6 +53,8 @@ import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -97,7 +99,7 @@ public class HiveMetaStoreCache {
     // cache from <dbname,tblname,partitionvalues> -> <partition>
     private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
     // the ref of cache from <location> -> <files>
-    private volatile AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> fileCacheRef
+    private volatile AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> fileCacheRef
             = new AtomicReference<>();
 
     public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
@@ -148,10 +150,10 @@ public class HiveMetaStoreCache {
         }
         // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
         // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
-        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader = getGuavaCacheLoader(executor,
+        CacheLoader<FileCacheKey, FileCacheValue> loader = getGuavaCacheLoader(executor,
                 fileMetaCacheTtlSecond);
 
-        LoadingCache<FileCacheKey, ImmutableList<FileSplit>> preFileCache = fileCacheRef.get();
+        LoadingCache<FileCacheKey, FileCacheValue> preFileCache = fileCacheRef.get();
 
         fileCacheRef.set(fileCacheBuilder.build(loader));
         if (Objects.nonNull(preFileCache)) {
@@ -262,7 +264,7 @@ public class HiveMetaStoreCache {
         return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
-    private ImmutableList<FileSplit> loadFiles(FileCacheKey key) {
+    private FileCacheValue loadFiles(FileCacheKey key) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -279,11 +281,11 @@ public class HiveMetaStoreCache {
             jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
             FileInputFormat.setInputPaths(jobConf, finalLocation);
             try {
-                FileSplit[] result;
+                FileCacheValue result;
                 InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
+                    result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf);
                 } else {
                     InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -294,18 +296,18 @@
                     } else {
                         splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
                     }
-                    result = new FileSplit[splits.length];
+                    result = new FileCacheValue();
                     // Convert the hadoop split to Doris Split.
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
-                        result[i] = new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null);
+                        result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null));
                     }
                 }
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName());
+                    LOG.debug("load #{} splits for {} in catalog {}", result.getFiles().size(), key, catalog.getName());
                 }
-                return ImmutableList.copyOf(result);
+                return result;
             } catch (Exception e) {
                 throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
             }
         } finally {
@@ -353,7 +355,7 @@ public class HiveMetaStoreCache {
         }
     }
 
-    public List<FileSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
+    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
         partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
@@ -364,19 +366,18 @@
         } else {
             stream = keys.parallelStream();
         }
-        List<ImmutableList<FileSplit>> fileLists = stream.map(k -> {
+        List<FileCacheValue> fileLists = stream.map(k -> {
             try {
                 return fileCacheRef.get().get(k);
             } catch (ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }).collect(Collectors.toList());
-        List<FileSplit> retFiles = Lists.newArrayListWithExpectedSize(
-                fileLists.stream().mapToInt(l -> l.size()).sum());
-        fileLists.stream().forEach(l -> retFiles.addAll(l));
         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
-                retFiles.size(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
-        return retFiles;
+                fileLists.stream().mapToInt(l -> l.getFiles() == null
+                        ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
+                partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
+        return fileLists;
     }
 
     public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) {
@@ -582,12 +583,12 @@ public class HiveMetaStoreCache {
      * @param fileMetaCacheTtlSecond
      * @return
      */
-    private CacheLoader<FileCacheKey, ImmutableList<FileSplit>> getGuavaCacheLoader(Executor executor,
+    private CacheLoader<FileCacheKey, FileCacheValue> getGuavaCacheLoader(Executor executor,
             int fileMetaCacheTtlSecond) {
-        CacheLoader<FileCacheKey, ImmutableList<FileSplit>> loader =
-                new CacheLoader<FileCacheKey, ImmutableList<FileSplit>>() {
+        CacheLoader<FileCacheKey, FileCacheValue> loader =
+                new CacheLoader<FileCacheKey, FileCacheValue>() {
                     @Override
-                    public ImmutableList<FileSplit> load(FileCacheKey key) throws Exception {
+                    public FileCacheValue load(FileCacheKey key) throws Exception {
                         return loadFiles(key);
                     }
                 };
@@ -602,7 +603,7 @@ public class HiveMetaStoreCache {
     * get fileCache ref
     * @return
     */
-    public AtomicReference<LoadingCache<FileCacheKey, ImmutableList<FileSplit>>> getFileCacheRef() {
+    public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheRef() {
         return fileCacheRef;
     }
 
@@ -694,7 +695,7 @@ public class HiveMetaStoreCache {
         public FileCacheKey(String location, String inputFormat) {
             this.location = location;
             this.inputFormat = inputFormat;
-            this.useSelfSplitter = false;
+            this.useSelfSplitter = true;
         }
 
         public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
@@ -725,6 +726,42 @@ public class HiveMetaStoreCache {
         }
     }
 
+    @Data
+    public static class FileCacheValue {
+        // File Cache for self splitter.
+        private List<HiveFileStatus> files;
+        // File split cache for old splitter. This is a temp variable.
+        private List<Split> splits;
+        private boolean isSplittable;
+
+        public void addFile(LocatedFileStatus file) {
+            if (files == null) {
+                files = Lists.newArrayList();
+            }
+            HiveFileStatus status = new HiveFileStatus();
+            status.setBlockLocations(file.getBlockLocations());
+            status.setPath(file.getPath());
+            status.length = file.getLen();
+            status.blockSize = file.getBlockSize();
+            files.add(status);
+        }
+
+        public void addSplit(Split split) {
+            if (splits == null) {
+                splits = Lists.newArrayList();
+            }
+            splits.add(split);
+        }
+    }
+
+    @Data
+    public static class HiveFileStatus {
+        BlockLocation[] blockLocations;
+        Path path;
+        long length;
+        long blockSize;
+    }
+
     @Data
     public static class HivePartitionValues {
         private long nextPartitionId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index 5a3af95c6d..b17704251a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -35,7 +35,6 @@ import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
 import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +50,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class HiveSplitter implements Splitter {
 
@@ -81,11 +79,11 @@ public class HiveSplitter implements Splitter {
                         partitionColumnTypes);
             }
             Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
-            boolean useSelfSplitter = false;
+            boolean useSelfSplitter = true;
             if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
-                    && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) {
+                    && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) {
                 LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
-                useSelfSplitter = true;
+                useSelfSplitter = false;
             }
 
             List<Split> allFiles = Lists.newArrayList();
@@ -135,15 +133,53 @@
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-            List<Split> allFiles, boolean useSelfSplitter) {
-        List<Split> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
-                    Joiner.on(",")
-                            .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
-                                    .collect(Collectors.toList())));
+            List<Split> allFiles, boolean useSelfSplitter) throws IOException {
+        for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
+                cache.getFilesByPartitions(partitions, useSelfSplitter)) {
+            if (fileCacheValue.getSplits() != null) {
+                allFiles.addAll(fileCacheValue.getSplits());
+            }
+            if (fileCacheValue.getFiles() != null) {
+                boolean isSplittable = fileCacheValue.isSplittable();
+                for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
+                    allFiles.addAll(splitFile(status, isSplittable));
+                }
+            }
         }
-        allFiles.addAll(files);
+    }
+
+    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException {
+        List<Split> result = Lists.newArrayList();
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", status.getPath());
+            BlockLocation block = status.getBlockLocations()[0];
+            result.add(new FileSplit(status.getPath(), 0, status.getLength(),
+                    status.getLength(), block.getHosts()));
+            return result;
+        }
+        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        if (splitSize <= 0) {
+            splitSize = status.getBlockSize();
+        }
+        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+        splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
+        BlockLocation[] blockLocations = status.getBlockLocations();
+        long length = status.getLength();
+        long bytesRemaining;
+        for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
+                bytesRemaining -= splitSize) {
+            int location = getBlockIndex(blockLocations, length - bytesRemaining);
+            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                    splitSize, length, blockLocations[location].getHosts()));
+        }
+        if (bytesRemaining != 0L) {
+            int location = getBlockIndex(blockLocations, length - bytesRemaining);
+            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                    bytesRemaining, length, blockLocations[location].getHosts()));
+        }
+
+        LOG.debug("Path {} includes {} splits.", status.getPath(), result.size());
+        return result;
     }
 
     public int getTotalPartitionNum() {
@@ -154,52 +190,18 @@
        return readPartitionNum;
     }
 
-    // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system.
-    public static FileSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
-                                            JobConf jobConf) throws IOException {
+    // Get File Status by using FileSystem API.
+    public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat<?, ?> inputFormat,
+                                                                 JobConf jobConf) throws IOException {
         FileSystem fs = path.getFileSystem(jobConf);
         boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
-        List<FileSplit> splits = Lists.newArrayList();
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
-        if (!locatedFileStatusRemoteIterator.hasNext()) {
-            LOG.debug("File status for path {} is empty.", path);
-            return new FileSplit[0];
-        }
-        if (!splittable) {
-            LOG.debug("Path {} is not splittable.", path);
-            while (locatedFileStatusRemoteIterator.hasNext()) {
-                LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-                BlockLocation block = status.getBlockLocations()[0];
-                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts()));
-            }
-            return splits.toArray(new FileSplit[splits.size()]);
-        }
-        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue();
+        result.setSplittable(splittable);
         while (locatedFileStatusRemoteIterator.hasNext()) {
-            LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-            if (splitSize <= 0) {
-                splitSize = status.getBlockSize();
-            }
-            // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-            splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
-            BlockLocation[] blockLocations = status.getBlockLocations();
-            long length = status.getLen();
-            long bytesRemaining;
-            for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
-                    bytesRemaining -= splitSize) {
-                int location = getBlockIndex(blockLocations, length - bytesRemaining);
-                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        splitSize, length, blockLocations[location].getHosts()));
-            }
-            if (bytesRemaining != 0L) {
-                int location = getBlockIndex(blockLocations, length - bytesRemaining);
-                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                        bytesRemaining, length, blockLocations[location].getHosts()));
-            }
+            result.addFile(locatedFileStatusRemoteIterator.next());
         }
-
-        LOG.debug("Path {} includes {} splits.", path, splits.size());
-        return splits.toArray(new FileSplit[splits.size()]);
+        return result;
     }
 
     private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index 67416619b0..10692a5f68 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -54,7 +54,6 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
-import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
@@ -62,7 +61,6 @@ import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -678,7 +676,7 @@ public class CatalogMgrTest extends TestWithFeService {
         HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName);
         HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
-        LoadingCache<HiveMetaStoreCache.FileCacheKey, ImmutableList<FileSplit>> preFileCache = metaStoreCache.getFileCacheRef().get();
+        LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> preFileCache = metaStoreCache.getFileCacheRef().get();
         // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal
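
Note on the slicing loop introduced in splitFile (previously inlined in getHiveSplits): it emits block-aligned ranges of splitSize bytes, but stops as soon as the remaining tail is within 10% of splitSize (the 1.1D factor), so a file never ends in a tiny trailing split. The standalone sketch below reproduces only that arithmetic; the class name, helper method, and byte counts are illustrative and not part of the patch.

import java.util.ArrayList;
import java.util.List;

public class SplitArithmeticSketch {
    // Mirrors HiveSplitter's DEFAULT_SPLIT_SIZE floor of 128MB.
    static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024;

    // Returns {offset, length} pairs, the ranges the splitFile loop would emit.
    static List<long[]> slice(long length, long splitSize) {
        // Min split size is DEFAULT_SPLIT_SIZE, as in the patch.
        splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE);
        List<long[]> ranges = new ArrayList<>();
        long bytesRemaining;
        // Stop once the tail is within 10% of splitSize, so no tiny split is emitted.
        for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
                bytesRemaining -= splitSize) {
            ranges.add(new long[] {length - bytesRemaining, splitSize});
        }
        if (bytesRemaining != 0L) {
            ranges.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return ranges;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // A 300MB file at a 128MB split size -> 128MB + 128MB + 44MB (three splits).
        for (long[] r : slice(300 * mb, 128 * mb)) {
            System.out.println("offset=" + r[0] / mb + "MB length=" + r[1] / mb + "MB");
        }
        // A 140MB file stays one split: 140 / 128 = 1.09 <= 1.1, so the loop never runs.
        System.out.println(slice(140 * mb, 128 * mb).size() + " split(s) for a 140MB file");
    }
}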
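
splitFile delegates to getBlockIndex to choose hosts for each range. That method's body is untouched by this patch and therefore not shown; for context, this is a sketch of the contract it is assumed to satisfy, in the style of Hadoop's FileInputFormat.getBlockIndex: find the block whose byte range covers the requested offset.

import org.apache.hadoop.fs.BlockLocation;

class BlockIndexSketch {
    // Assumed contract of HiveSplitter.getBlockIndex: return the index of the
    // block containing `offset`, so split hosts line up with block hosts.
    static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
        for (int i = 0; i < blkLocations.length; i++) {
            // A block covers [getOffset(), getOffset() + getLength()).
            if (blkLocations[i].getOffset() <= offset
                    && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset " + offset + " is outside every block location");
    }
}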
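
On the loader comments kept in the @@ -148,10 +150,10 @@ hunk: with file.meta.cache.ttl-second equal to 0 the file cache loads entries synchronously on first access, while a positive TTL wraps the loader so stale entries are reloaded asynchronously on the executor. A minimal sketch of that Guava pattern, assuming a builder roughly like fileCacheBuilder (the maximumSize value is illustrative, not taken from the patch):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class FileCacheLoaderSketch {
    static <K, V> LoadingCache<K, V> build(CacheLoader<K, V> loader, int ttlSecond, Executor executor) {
        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder().maximumSize(100000);
        if (ttlSecond > 0) {
            // Asynchronous: entries older than the TTL are reloaded on the executor,
            // and readers keep seeing the stale value until the reload finishes.
            builder.refreshAfterWrite(ttlSecond, TimeUnit.SECONDS);
            return builder.build(CacheLoader.asyncReloading(loader, executor));
        }
        // Synchronous: load() runs on the calling thread at first access.
        return builder.build(loader);
    }
}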