From 02220560c59bdb18cdf924b62ae9574c5bedee84 Mon Sep 17 00:00:00 2001
From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com>
Date: Wed, 15 Mar 2023 00:25:00 +0800
Subject: [PATCH] [Improvement](multi catalog)Hive splitter. Get HDFS/S3
 splits by using FileSystem api (#17706)

Use the FileSystem API to get splits for files in HDFS/S3 instead of calling
InputFormat.getSplits. The splits are based on the blocks in HDFS/S3.
---
 .../java/org/apache/doris/common/Config.java  |  4 +
 .../doris/datasource/HMSExternalCatalog.java  |  1 +
 .../datasource/hive/HiveMetaStoreCache.java   | 36 +++++---
 .../doris/external/hive/util/HiveUtil.java    | 32 +++++++
 .../planner/external/FileSplitStrategy.java   |  2 +-
 .../doris/planner/external/HiveSplitter.java  | 84 ++++++++++++++++++-
 6 files changed, 144 insertions(+), 15 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1f7561c8cb..b2f33c0dea 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1714,6 +1714,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static long file_scan_node_split_num = 128;
 
+    @ConfField(mutable = true, masterOnly = false)
+    public static long file_split_size = 0; // 0 means use the block size in HDFS/S3 as the split size
+
     /**
      * If set to TRUE, FE will:
      * 1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling;
@@ -2057,6 +2060,7 @@
 
     @ConfField(mutable = false, masterOnly = false)
     public static String mysql_load_server_secure_path = "";
 
+    @ConfField(mutable = false, masterOnly = false)
     public static int mysql_load_thread_pool = 4;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 0cf7afdc53..80443ca8b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -53,6 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
     protected PooledHiveMetaStoreClient client;
     // Record the latest synced event id when processing hive events
    private long lastSyncedEventId;
+    public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
 
     /**
      * Default constructor for HMSExternalCatalog.
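
Note on the two knobs introduced above: enable.self.splitter is a per-catalog
property (read in HiveSplitter below) and defaults to off, so existing catalogs
keep the InputFormat.getSplits path; file_split_size only takes effect once the
self splitter is active, and its default of 0 falls back to the HDFS/S3 block
size. A standalone sketch of the split-size arithmetic follows the patch.
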
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index c2313eb885..156f6eba66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.planner.external.HiveSplitter;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -50,6 +51,7 @@ import com.google.common.collect.RangeMap;
 import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -249,16 +251,20 @@ public class HiveMetaStoreCache {
         try {
             InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             InputSplit[] splits;
-            String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
-
-            // TODO: Implement getSplits logic by ourselves, don't call inputFormat.getSplits anymore.
-            if (!Strings.isNullOrEmpty(remoteUser)) {
-                UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-                splits = ugi.doAs(
-                        (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+            // TODO: This is a temp config; remove it after the HiveSplitter is stable.
+            if (key.useSelfSplitter) {
+                splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
             } else {
-                splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
+                String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
+                if (!Strings.isNullOrEmpty(remoteUser)) {
+                    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
+                    splits = ugi.doAs(
+                            (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+                } else {
+                    splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
+                }
             }
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
             }
@@ -310,10 +316,10 @@
         }
     }
 
-    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions) {
+    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
-        partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat())));
+        partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
 
         Stream<FileCacheKey> stream;
         if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@@ -601,10 +607,20 @@
         private String location;
         // not in key
         private String inputFormat;
+        // Temp variable: whether to use the self file splitter or InputFormat.getSplits.
+        // Will be removed after the self splitter is stable.
+        private boolean useSelfSplitter;
 
         public FileCacheKey(String location, String inputFormat) {
             this.location = location;
             this.inputFormat = inputFormat;
+            this.useSelfSplitter = false;
+        }
+
+        public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
+            this.location = location;
+            this.inputFormat = inputFormat;
+            this.useSelfSplitter = useSelfSplitter;
         }
 
         @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index 1586788a30..f3a617d3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -40,6 +42,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.List;
 
 /**
@@ -178,4 +182,32 @@
         }
     }
 
+    public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path) {
+        // ORC uses a custom InputFormat but is always splittable
+        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
+            return true;
+        }
+
+        // use reflection to find the isSplitable method declared on FileInputFormat (Hadoop spells it with one 't')
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                LOG.warn("Class {} doesn't contain isSplitable method.", clazz);
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, fileSystem, path);
+        } catch (InvocationTargetException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
index e574aeb9d2..8fd7f2d16a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
@@ -34,7 +34,7 @@ public class FileSplitStrategy {
     }
 
     public boolean hasNext() {
-        return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num;
+        return totalSplitSize >= Config.file_scan_node_split_size || splitNum >= Config.file_scan_node_split_num;
     }
 
     public void next() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index a49935b9ee..3dc2253f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -23,11 +23,13 @@ import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.Split;
@@ -35,12 +37,20 @@ import org.apache.doris.planner.Splitter;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +83,13 @@ public class HiveSplitter implements Splitter {
             hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
                     partitionColumnTypes);
         }
+        Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
+        boolean useSelfSplitter = false;
+        if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
+                && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) {
+            LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
+            useSelfSplitter = true;
+        }
 
         List<Split> allFiles = Lists.newArrayList();
         if (hivePartitionValues != null) {
@@ -99,13 +116,13 @@ public class HiveSplitter implements Splitter {
             List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
                     partitionValuesList);
             // 4. get all files of partitions
-            getFileSplitByPartitions(cache, partitions, allFiles);
+            getFileSplitByPartitions(cache, partitions, allFiles, useSelfSplitter);
         } else {
             // unpartitioned table, create a dummy partition to save location and inputformat,
             // so that we can unify the interface.
             HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
                     hmsTable.getRemoteTable().getSd().getLocation(), null);
-            getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles);
+            getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles, useSelfSplitter);
             this.totalPartitionNum = 1;
             this.readPartitionNum = 1;
         }
@@ -121,8 +138,8 @@ public class HiveSplitter implements Splitter {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-            List<Split> allFiles) {
-        List<FileCacheValue> files = cache.getFilesByPartitions(partitions);
+            List<Split> allFiles, boolean useSelfSplitter) {
+        List<FileCacheValue> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
                     Joiner.on(",")
@@ -152,4 +169,63 @@
     public int getReadPartitionNum() {
         return readPartitionNum;
     }
+
+    // Get splits using the FileSystem API. The splits are based on blocks in HDFS or S3-like storage systems.
+    public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
+            JobConf jobConf) throws IOException {
+        FileSystem fs = path.getFileSystem(jobConf);
+        boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
+        List<InputSplit> splits = Lists.newArrayList();
+        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
+        if (!locatedFileStatusRemoteIterator.hasNext()) {
+            LOG.debug("File status for path {} is empty.", path);
+            return new InputSplit[0];
+        }
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", path);
+            while (locatedFileStatusRemoteIterator.hasNext()) {
+                LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
+                BlockLocation block = status.getBlockLocations()[0];
+                splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
+            }
+            return splits.toArray(new InputSplit[splits.size()]);
+        }
+        long splitSize = Config.file_split_size;
+        boolean useBlockSize = (splitSize <= 0);
+        while (locatedFileStatusRemoteIterator.hasNext()) {
+            LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
+            if (useBlockSize) {
+                splitSize = status.getBlockSize();
+            }
+            BlockLocation[] blockLocations = status.getBlockLocations();
+            long length = status.getLen();
+            long bytesRemaining;
+            for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
+                    bytesRemaining -= splitSize) {
+                int location = getBlockIndex(blockLocations, length - bytesRemaining);
+                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                        splitSize, blockLocations[location].getHosts()));
+            }
+            if (bytesRemaining != 0L) {
+                int location = getBlockIndex(blockLocations, length - bytesRemaining);
+                splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
+                        bytesRemaining, blockLocations[location].getHosts()));
+            }
+        }
+
+        LOG.debug("Path {} includes {} splits.", path, splits.size());
+        return splits.toArray(new InputSplit[splits.size()]);
+    }
+
+    private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+        for (int i = 0; i < blkLocations.length; ++i) {
+            if (blkLocations[i].getOffset() <= offset
+                    && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
+                return i;
+            }
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1L;
+        throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
+    }
+    }
 }
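
The 1.1D threshold in getHiveSplits plays the same role as FileInputFormat's
SPLIT_SLOP constant in Hadoop: a trailing chunk of at most 1.1x the split size
is emitted as a single split rather than leaving a tiny tail. Below is a
minimal, self-contained sketch of just that arithmetic; the class and method
names are illustrative and not part of the patch.

import java.util.ArrayList;
import java.util.List;

// Illustrative demo, not part of the patch.
public class SplitSlopDemo {
    // Carve a file of fileLength bytes into (offset, length) pairs using the
    // same loop as HiveSplitter.getHiveSplits: keep cutting splitSize-sized
    // splits while more than 1.1x splitSize remains, then emit the rest whole.
    static List<long[]> carve(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining;
        for (bytesRemaining = fileLength; (double) bytesRemaining / (double) splitSize > 1.1D;
                bytesRemaining -= splitSize) {
            splits.add(new long[] {fileLength - bytesRemaining, splitSize});
        }
        if (bytesRemaining != 0L) {
            splits.add(new long[] {fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 140 MB file, 128 MB split size: 140 / 128 ~= 1.09 <= 1.1,
        // so the whole file becomes one 140 MB split.
        System.out.println(carve(140 * mb, 128 * mb).size()); // prints 1
        // 300 MB file, 128 MB split size: three splits of 128, 128 and 44 MB.
        for (long[] s : carve(300 * mb, 128 * mb)) {
            System.out.println((s[0] / mb) + " MB offset, " + (s[1] / mb) + " MB length");
        }
    }
}

The new path stays opt-in: only catalogs whose properties set
enable.self.splitter to "true" take the HiveSplitter.getHiveSplits branch, and
the fe.conf option file_split_size can then override the storage block size as
the target split size.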