From 22deeecbe180d13a23ecb9876f0078be6cd631db Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Fri, 7 Apr 2023 00:07:23 +0800 Subject: [PATCH] [Improvement](multi catalog)Cache File for Hive Table, instead of cache file splits. (#18419) Currently, the session variable for Split size will not take effect after the file splits are cached. 1. This PR is to cache file for Hive Table, instead of cache file splits. And split the file every time using the current split size. 2. Use self splitter by default. --- .../datasource/hive/HiveMetaStoreCache.java | 85 ++++++++++---- .../doris/planner/external/HiveSplitter.java | 110 +++++++++--------- .../doris/datasource/CatalogMgrTest.java | 4 +- 3 files changed, 118 insertions(+), 81 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 548e06d65b..39532a6785 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.ColumnBound; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; +import org.apache.doris.planner.Split; import org.apache.doris.planner.external.FileSplit; import org.apache.doris.planner.external.HiveSplitter; @@ -44,7 +45,6 @@ import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; @@ -53,6 +53,8 @@ import com.google.common.collect.TreeRangeMap; import lombok.Data; import org.apache.commons.lang.math.NumberUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hive.metastore.api.Partition; @@ -97,7 +99,7 @@ public class HiveMetaStoreCache { // cache from -> private LoadingCache partitionCache; // the ref of cache from -> - private volatile AtomicReference>> fileCacheRef + private volatile AtomicReference> fileCacheRef = new AtomicReference<>(); public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) { @@ -148,10 +150,10 @@ public class HiveMetaStoreCache { } // if the file.meta.cache.ttl-second is equal 0, use the synchronous loader // if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader - CacheLoader> loader = getGuavaCacheLoader(executor, + CacheLoader loader = getGuavaCacheLoader(executor, fileMetaCacheTtlSecond); - LoadingCache> preFileCache = fileCacheRef.get(); + LoadingCache preFileCache = fileCacheRef.get(); fileCacheRef.set(fileCacheBuilder.build(loader)); if (Objects.nonNull(preFileCache)) { @@ -262,7 +264,7 @@ public class HiveMetaStoreCache { return new HivePartition(sd.getInputFormat(), sd.getLocation(), key.values); } - private ImmutableList loadFiles(FileCacheKey key) { + private FileCacheValue loadFiles(FileCacheKey key) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); @@ -279,11 +281,11 @@ public class HiveMetaStoreCache { jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); FileInputFormat.setInputPaths(jobConf, finalLocation); try { - FileSplit[] result; + FileCacheValue result; InputFormat inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false); // TODO: This is a temp config, will remove it after the HiveSplitter is stable. if (key.useSelfSplitter) { - result = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf); + result = HiveSplitter.getFileCache(new Path(finalLocation), inputFormat, jobConf); } else { InputSplit[] splits; String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME); @@ -294,18 +296,18 @@ public class HiveMetaStoreCache { } else { splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */); } - result = new FileSplit[splits.length]; + result = new FileCacheValue(); // Convert the hadoop split to Doris Split. for (int i = 0; i < splits.length; i++) { org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]); - result[i] = new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null); + result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null)); } } if (LOG.isDebugEnabled()) { - LOG.debug("load #{} splits for {} in catalog {}", result.length, key, catalog.getName()); + LOG.debug("load #{} splits for {} in catalog {}", result.getFiles().size(), key, catalog.getName()); } - return ImmutableList.copyOf(result); + return result; } catch (Exception e) { throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName()); } @@ -353,7 +355,7 @@ public class HiveMetaStoreCache { } } - public List getFilesByPartitions(List partitions, boolean useSelfSplitter) { + public List getFilesByPartitions(List partitions, boolean useSelfSplitter) { long start = System.currentTimeMillis(); List keys = Lists.newArrayListWithExpectedSize(partitions.size()); partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter))); @@ -364,19 +366,18 @@ public class HiveMetaStoreCache { } else { stream = keys.parallelStream(); } - List> fileLists = stream.map(k -> { + List fileLists = stream.map(k -> { try { return fileCacheRef.get().get(k); } catch (ExecutionException e) { throw new RuntimeException(e); } }).collect(Collectors.toList()); - List retFiles = Lists.newArrayListWithExpectedSize( - fileLists.stream().mapToInt(l -> l.size()).sum()); - fileLists.stream().forEach(l -> retFiles.addAll(l)); LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms", - retFiles.size(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); - return retFiles; + fileLists.stream().mapToInt(l -> l.getFiles() == null + ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(), + partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); + return fileLists; } public List getAllPartitions(String dbName, String name, List> partitionValuesList) { @@ -582,12 +583,12 @@ public class HiveMetaStoreCache { * @param fileMetaCacheTtlSecond * @return */ - private CacheLoader> getGuavaCacheLoader(Executor executor, + private CacheLoader getGuavaCacheLoader(Executor executor, int fileMetaCacheTtlSecond) { - CacheLoader> loader = - new CacheLoader>() { + CacheLoader loader = + new CacheLoader() { @Override - public ImmutableList load(FileCacheKey key) throws Exception { + public FileCacheValue load(FileCacheKey key) throws Exception { return loadFiles(key); } }; @@ -602,7 +603,7 @@ public class HiveMetaStoreCache { * get fileCache ref * @return */ - public AtomicReference>> getFileCacheRef() { + public AtomicReference> getFileCacheRef() { return fileCacheRef; } @@ -694,7 +695,7 @@ public class HiveMetaStoreCache { public FileCacheKey(String location, String inputFormat) { this.location = location; this.inputFormat = inputFormat; - this.useSelfSplitter = false; + this.useSelfSplitter = true; } public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) { @@ -725,6 +726,42 @@ public class HiveMetaStoreCache { } } + @Data + public static class FileCacheValue { + // File Cache for self splitter. + private List files; + // File split cache for old splitter. This is a temp variable. + private List splits; + private boolean isSplittable; + + public void addFile(LocatedFileStatus file) { + if (files == null) { + files = Lists.newArrayList(); + } + HiveFileStatus status = new HiveFileStatus(); + status.setBlockLocations(file.getBlockLocations()); + status.setPath(file.getPath()); + status.length = file.getLen(); + status.blockSize = file.getBlockSize(); + files.add(status); + } + + public void addSplit(Split split) { + if (splits == null) { + splits = Lists.newArrayList(); + } + splits.add(split); + } + } + + @Data + public static class HiveFileStatus { + BlockLocation[] blockLocations; + Path path; + long length; + long blockSize; + } + @Data public static class HivePartitionValues { private long nextPartitionId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java index 5a3af95c6d..b17704251a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java @@ -35,7 +35,6 @@ import org.apache.doris.planner.Split; import org.apache.doris.planner.Splitter; import org.apache.doris.qe.ConnectContext; -import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; @@ -51,7 +50,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class HiveSplitter implements Splitter { @@ -81,11 +79,11 @@ public class HiveSplitter implements Splitter { partitionColumnTypes); } Map properties = hmsTable.getCatalog().getCatalogProperty().getProperties(); - boolean useSelfSplitter = false; + boolean useSelfSplitter = true; if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER) - && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) { + && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) { LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName()); - useSelfSplitter = true; + useSelfSplitter = false; } List allFiles = Lists.newArrayList(); @@ -135,15 +133,53 @@ public class HiveSplitter implements Splitter { } private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, - List allFiles, boolean useSelfSplitter) { - List files = cache.getFilesByPartitions(partitions, useSelfSplitter); - if (LOG.isDebugEnabled()) { - LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(), - Joiner.on(",") - .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath()) - .collect(Collectors.toList()))); + List allFiles, boolean useSelfSplitter) throws IOException { + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : + cache.getFilesByPartitions(partitions, useSelfSplitter)) { + if (fileCacheValue.getSplits() != null) { + allFiles.addAll(fileCacheValue.getSplits()); + } + if (fileCacheValue.getFiles() != null) { + boolean isSplittable = fileCacheValue.isSplittable(); + for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { + allFiles.addAll(splitFile(status, isSplittable)); + } + } } - allFiles.addAll(files); + } + + private List splitFile(HiveMetaStoreCache.HiveFileStatus status, boolean splittable) throws IOException { + List result = Lists.newArrayList(); + if (!splittable) { + LOG.debug("Path {} is not splittable.", status.getPath()); + BlockLocation block = status.getBlockLocations()[0]; + result.add(new FileSplit(status.getPath(), 0, status.getLength(), + status.getLength(), block.getHosts())); + return result; + } + long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + if (splitSize <= 0) { + splitSize = status.getBlockSize(); + } + // Min split size is DEFAULT_SPLIT_SIZE(128MB). + splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE; + BlockLocation[] blockLocations = status.getBlockLocations(); + long length = status.getLength(); + long bytesRemaining; + for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; + bytesRemaining -= splitSize) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + result.add(new FileSplit(status.getPath(), length - bytesRemaining, + splitSize, length, blockLocations[location].getHosts())); + } + if (bytesRemaining != 0L) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + result.add(new FileSplit(status.getPath(), length - bytesRemaining, + bytesRemaining, length, blockLocations[location].getHosts())); + } + + LOG.debug("Path {} includes {} splits.", status.getPath(), result.size()); + return result; } public int getTotalPartitionNum() { @@ -154,52 +190,18 @@ public class HiveSplitter implements Splitter { return readPartitionNum; } - // Get splits by using FileSystem API, the splits are blocks in HDFS or S3 like storage system. - public static FileSplit[] getHiveSplits(Path path, InputFormat inputFormat, - JobConf jobConf) throws IOException { + // Get File Status by using FileSystem API. + public static HiveMetaStoreCache.FileCacheValue getFileCache(Path path, InputFormat inputFormat, + JobConf jobConf) throws IOException { FileSystem fs = path.getFileSystem(jobConf); boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path); - List splits = Lists.newArrayList(); RemoteIterator locatedFileStatusRemoteIterator = fs.listFiles(path, true); - if (!locatedFileStatusRemoteIterator.hasNext()) { - LOG.debug("File status for path {} is empty.", path); - return new FileSplit[0]; - } - if (!splittable) { - LOG.debug("Path {} is not splittable.", path); - while (locatedFileStatusRemoteIterator.hasNext()) { - LocatedFileStatus status = locatedFileStatusRemoteIterator.next(); - BlockLocation block = status.getBlockLocations()[0]; - splits.add(new FileSplit(status.getPath(), 0, status.getLen(), status.getLen(), block.getHosts())); - } - return splits.toArray(new FileSplit[splits.size()]); - } - long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + HiveMetaStoreCache.FileCacheValue result = new HiveMetaStoreCache.FileCacheValue(); + result.setSplittable(splittable); while (locatedFileStatusRemoteIterator.hasNext()) { - LocatedFileStatus status = locatedFileStatusRemoteIterator.next(); - if (splitSize <= 0) { - splitSize = status.getBlockSize(); - } - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE; - BlockLocation[] blockLocations = status.getBlockLocations(); - long length = status.getLen(); - long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; - bytesRemaining -= splitSize) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - splits.add(new FileSplit(status.getPath(), length - bytesRemaining, - splitSize, length, blockLocations[location].getHosts())); - } - if (bytesRemaining != 0L) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - splits.add(new FileSplit(status.getPath(), length - bytesRemaining, - bytesRemaining, length, blockLocations[location].getHosts())); - } + result.addFile(locatedFileStatusRemoteIterator.next()); } - - LOG.debug("Path {} includes {} splits.", path, splits.size()); - return splits.toArray(new FileSplit[splits.size()]); + return result; } private static int getBlockIndex(BlockLocation[] blkLocations, long offset) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java index 67416619b0..10692a5f68 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java @@ -54,7 +54,6 @@ import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.planner.ColumnBound; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; -import org.apache.doris.planner.external.FileSplit; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSet; import org.apache.doris.system.SystemInfoService; @@ -62,7 +61,6 @@ import org.apache.doris.utframe.TestWithFeService; import com.google.common.base.Preconditions; import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; @@ -678,7 +676,7 @@ public class CatalogMgrTest extends TestWithFeService { HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog(catalogName); HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog); - LoadingCache> preFileCache = metaStoreCache.getFileCacheRef().get(); + LoadingCache preFileCache = metaStoreCache.getFileCacheRef().get(); // 1. properties contains `file.meta.cache.ttl-second`, it should not be equal