[Improvement](multi catalog) Hive splitter: get HDFS/S3 splits by using the FileSystem API (#17706)

Use the FileSystem API to get splits for files in HDFS/S3 instead of calling InputFormat.getSplits.
The splits are based on the blocks in HDFS/S3.
Author: Jibing-Li
Date: 2023-03-15 00:25:00 +08:00
Committed by: GitHub
Parent: b28f31f98d
Commit: 02220560c5
6 changed files with 144 additions and 15 deletions
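For context, a minimal sketch of the FileSystem API usage this change builds on (the class name, path argument, and output format are illustrative, not part of the commit): list a table or partition location recursively and read each file's block layout, which is what the new splitter turns into splits.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class BlockLayoutProbe {
        public static void main(String[] args) throws Exception {
            // Illustrative location; in Doris this would be the table/partition location from HMS.
            Path dir = new Path(args[0]);
            FileSystem fs = dir.getFileSystem(new Configuration());
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true);
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                // Each block (offset, length, hosts) is a candidate split boundary.
                System.out.println(status.getPath() + ": " + status.getBlockLocations().length
                        + " blocks, blockSize=" + status.getBlockSize() + ", len=" + status.getLen());
            }
        }
    }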

File: Config.java

@ -1714,6 +1714,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = false)
public static long file_scan_node_split_num = 128;
@ConfField(mutable = true, masterOnly = false)
public static long file_split_size = 0; // 0 means use the block size in HDFS/S3 as split size
/**
* If set to TRUE, FE will:
* 1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling;
@ -2057,6 +2060,7 @@ public class Config extends ConfigBase {
@ConfField(mutable = false, masterOnly = false)
public static String mysql_load_server_secure_path = "";
@ConfField(mutable = false, masterOnly = false)
public static int mysql_load_thread_pool = 4;
}
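A small sketch of how the new file_split_size knob is meant to be read, based on the comment above and on the HiveSplitter code later in this commit (the helper name is hypothetical and not part of the change):

    // Hypothetical helper: pick the effective split size for one file.
    static long effectiveSplitSize(org.apache.hadoop.fs.FileStatus status) {
        long configured = Config.file_split_size;
        // 0 (the default) means "use the file's HDFS/S3 block size as the split size".
        return configured > 0 ? configured : status.getBlockSize();
    }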

File: HMSExternalCatalog.java

@ -53,6 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
protected PooledHiveMetaStoreClient client;
// Record the latest synced event id when processing hive events
private long lastSyncedEventId;
public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
/**
* Default constructor for HMSExternalCatalog.
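The new enable.self.splitter key is read as an ordinary catalog property. A sketch of the gating logic (it mirrors the check added in HiveSplitter further down; the catalog variable and a java.util.Map import are assumptions, not part of this hunk):

    // Sketch only: decide whether this HMS catalog should use the new FileSystem-based splitter.
    Map<String, String> props = catalog.getCatalogProperty().getProperties();
    boolean useSelfSplitter = "true".equalsIgnoreCase(props.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER));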

File: HiveMetaStoreCache.java

@ -36,6 +36,7 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
import org.apache.doris.planner.external.HiveSplitter;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -50,6 +51,7 @@ import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@ -249,16 +251,20 @@ public class HiveMetaStoreCache {
try {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
InputSplit[] splits;
- String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
- // TODO: Implement getSplits logic by ourselves, don't call inputFormat.getSplits anymore.
- if (!Strings.isNullOrEmpty(remoteUser)) {
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
- splits = ugi.doAs(
- (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+ // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
+ if (key.useSelfSplitter) {
+ splits = HiveSplitter.getHiveSplits(new Path(finalLocation), inputFormat, jobConf);
} else {
- splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
+ String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
+ if (!Strings.isNullOrEmpty(remoteUser)) {
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
+ splits = ugi.doAs(
+ (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+ } else {
+ splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("load #{} files for {} in catalog {}", splits.length, key, catalog.getName());
}
@ -310,10 +316,10 @@ public class HiveMetaStoreCache {
}
}
- public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions) {
+ public List<InputSplit> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
- partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat())));
+ partitions.stream().forEach(p -> keys.add(new FileCacheKey(p.getPath(), p.getInputFormat(), useSelfSplitter)));
Stream<FileCacheKey> stream;
if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
@ -601,10 +607,20 @@ public class HiveMetaStoreCache {
private String location;
// not in key
private String inputFormat;
// Temp variable, use self file splitter or use InputFormat.getSplits.
// Will remove after self splitter is stable.
private boolean useSelfSplitter;
public FileCacheKey(String location, String inputFormat) {
this.location = location;
this.inputFormat = inputFormat;
this.useSelfSplitter = false;
}
public FileCacheKey(String location, String inputFormat, boolean useSelfSplitter) {
this.location = location;
this.inputFormat = inputFormat;
this.useSelfSplitter = useSelfSplitter;
}
@Override

File: HiveUtil.java

@ -25,6 +25,8 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@ -40,6 +42,8 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
/**
@ -178,4 +182,32 @@ public final class HiveUtil {
}
}
public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path) {
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}
// use reflection to get the protected isSplitable method on FileInputFormat
Method method = null;
for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
break;
} catch (NoSuchMethodException ignored) {
LOG.warn("Class {} doesn't contain isSplitable method.", clazz);
}
}
if (method == null) {
return false;
}
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, fileSystem, path);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
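FileInputFormat#isSplitable(FileSystem, Path) is protected, which is why the helper above looks it up reflectively instead of calling it directly. A usage sketch (paths are illustrative; TextInputFormat, JobConf, FileSystem and Path are the usual Hadoop mapred/fs classes):

    JobConf jobConf = new JobConf();
    FileSystem fs = FileSystem.getLocal(jobConf);
    TextInputFormat text = new TextInputFormat();
    text.configure(jobConf); // TextInputFormat needs configure() before it can look up compression codecs
    // Plain text is splittable; gzip-compressed text is not, because gzip has no splittable codec.
    boolean plain = HiveUtil.isSplittable(text, fs, new Path("/tmp/data.txt"));   // true
    boolean gzip = HiveUtil.isSplittable(text, fs, new Path("/tmp/data.txt.gz")); // false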

File: FileSplitStrategy.java

@ -34,7 +34,7 @@ public class FileSplitStrategy {
}
public boolean hasNext() {
- return totalSplitSize > Config.file_scan_node_split_size || splitNum > Config.file_scan_node_split_num;
+ return totalSplitSize >= Config.file_scan_node_split_size || splitNum >= Config.file_scan_node_split_num;
}
public void next() {
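The only functional change in this file is the boundary condition: a batch that reaches the size or count limit exactly now triggers a cut, instead of being allowed to grow one step past it. A tiny illustration (the values and local variables are made up; only the Config fields are real):

    long totalSplitSize = Config.file_scan_node_split_size; // exactly at the size limit
    long splitNum = Config.file_scan_node_split_num;        // exactly at the count limit
    // The old '>' evaluated to false here, so one more split could still be appended to the batch;
    // the new '>=' evaluates to true, so the batch is closed as soon as a limit is reached.
    boolean cutBatch = totalSplitSize >= Config.file_scan_node_split_size
            || splitNum >= Config.file_scan_node_split_num;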

File: HiveSplitter.java

@ -23,11 +23,13 @@ import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.Split;
@ -35,12 +37,20 @@ import org.apache.doris.planner.Splitter;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -73,6 +83,13 @@ public class HiveSplitter implements Splitter {
hivePartitionValues = cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
partitionColumnTypes);
}
Map<String, String> properties = hmsTable.getCatalog().getCatalogProperty().getProperties();
boolean useSelfSplitter = false;
if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
&& properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("true")) {
LOG.debug("Using self splitter for hmsTable {}", hmsTable.getName());
useSelfSplitter = true;
}
List<Split> allFiles = Lists.newArrayList();
if (hivePartitionValues != null) {
@ -99,13 +116,13 @@ public class HiveSplitter implements Splitter {
List<HivePartition> partitions = cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
partitionValuesList);
// 4. get all files of partitions
- getFileSplitByPartitions(cache, partitions, allFiles);
+ getFileSplitByPartitions(cache, partitions, allFiles, useSelfSplitter);
} else {
// unpartitioned table, create a dummy partition to save location and inputformat,
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
hmsTable.getRemoteTable().getSd().getLocation(), null);
- getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles);
+ getFileSplitByPartitions(cache, Lists.newArrayList(dummyPartition), allFiles, useSelfSplitter);
this.totalPartitionNum = 1;
this.readPartitionNum = 1;
}
@ -121,8 +138,8 @@ public class HiveSplitter implements Splitter {
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
- List<Split> allFiles) {
- List<InputSplit> files = cache.getFilesByPartitions(partitions);
+ List<Split> allFiles, boolean useSelfSplitter) {
+ List<InputSplit> files = cache.getFilesByPartitions(partitions, useSelfSplitter);
if (LOG.isDebugEnabled()) {
LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
Joiner.on(",")
@ -152,4 +169,63 @@ public class HiveSplitter implements Splitter {
public int getReadPartitionNum() {
return readPartitionNum;
}
// Get splits by using the FileSystem API. The splits are based on the blocks in HDFS or S3-like storage systems.
public static InputSplit[] getHiveSplits(Path path, InputFormat<?, ?> inputFormat,
JobConf jobConf) throws IOException {
FileSystem fs = path.getFileSystem(jobConf);
boolean splittable = HiveUtil.isSplittable(inputFormat, fs, path);
List<InputSplit> splits = Lists.newArrayList();
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(path, true);
if (!locatedFileStatusRemoteIterator.hasNext()) {
LOG.debug("File status for path {} is empty.", path);
return new InputSplit[0];
}
if (!splittable) {
LOG.debug("Path {} is not splittable.", path);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
BlockLocation block = status.getBlockLocations()[0];
splits.add(new FileSplit(status.getPath(), 0, status.getLen(), block.getHosts()));
}
return splits.toArray(new InputSplit[splits.size()]);
}
long splitSize = Config.file_split_size;
boolean useBlockSize = (splitSize <= 0);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
if (useBlockSize) {
splitSize = status.getBlockSize();
}
BlockLocation[] blockLocations = status.getBlockLocations();
long length = status.getLen();
long bytesRemaining;
for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
bytesRemaining -= splitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
splitSize, blockLocations[location].getHosts()));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
splits.add(new FileSplit(status.getPath(), length - bytesRemaining,
bytesRemaining, blockLocations[location].getHosts()));
}
}
LOG.debug("Path {} includes {} splits.", path, splits.size());
return splits.toArray(new InputSplit[splits.size()]);
}
private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
for (int i = 0; i < blkLocations.length; ++i) {
if (blkLocations[i].getOffset() <= offset
&& offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length - 1];
long fileLength = last.getOffset() + last.getLength() - 1L;
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
}
}
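As a worked example of the carving loop in getHiveSplits (sizes are illustrative; file_split_size is left at its default of 0, so a 128 MB block size is used as the split size): a 300 MB file yields [0, 128 MB), [128 MB, 256 MB) and a 44 MB tail split, while a 140 MB file stays a single split because 140 / 128 ≈ 1.09 is within the 1.1 slack factor. A standalone sketch of the same arithmetic:

    // Standalone sketch mirroring the loop in getHiveSplits; not part of the commit.
    long splitSize = 128L << 20; // block size used as the split size (file_split_size = 0)
    for (long length : new long[] {300L << 20, 140L << 20}) {
        long remaining = length;
        while ((double) remaining / (double) splitSize > 1.1D) {
            System.out.printf("split [%d, %d)%n", length - remaining, length - remaining + splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0L) {
            // The tail may be up to 1.1 * splitSize, which avoids creating a tiny trailing split.
            System.out.printf("tail  [%d, %d)%n", length - remaining, length);
        }
    }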