[fix](multi-catalog) enable self splitter by default (#30846)
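
This change makes Doris' own Hive file splitter the only split path for HMS catalogs: the temporary `enable.self.splitter` catalog property, the `useSelfSplitter` flag threaded through `ExternalCatalog`, `HiveMetaStoreCache`, `HiveScanNode`, and `StatisticsUtil`, and the fallback that built splits via Hadoop's `InputFormat.getSplits` are all removed. Only the `broker.name` (`bindBrokerName`) option is kept.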
@@ -610,16 +610,6 @@ public abstract class ExternalCatalog
         return specifiedDatabaseMap;
     }
 
-    public boolean useSelfSplitter() {
-        Map<String, String> properties = catalogProperty.getProperties();
-        boolean ret = true;
-        if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
-                && properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false")) {
-            ret = false;
-        }
-        return ret;
-    }
-
     public String bindBrokerName() {
         Map<String, String> properties = catalogProperty.getProperties();
         if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
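For context, the deleted `ExternalCatalog.useSelfSplitter()` was a simple property toggle: the splitter defaulted to on and only an explicit `"false"` turned it off. A minimal standalone sketch of that now-removed toggle (the class name is hypothetical; the key mirrors `HMSExternalCatalog.ENABLE_SELF_SPLITTER`):

```java
import java.util.Map;

// Sketch of the toggle this commit deletes; illustrative only.
final class SelfSplitterToggle {
    static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";

    static boolean useSelfSplitter(Map<String, String> catalogProperties) {
        // Default was true; only an explicit "false" disabled the self splitter.
        return !"false".equalsIgnoreCase(catalogProperties.get(ENABLE_SELF_SPLITTER));
    }
}
```

After this commit the property is ignored, while `bindBrokerName()` continues to read catalog properties as shown above.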
@@ -54,7 +54,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     protected HMSCachedClient client;
 
-    public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
     public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
     // broker name for file split and query scan.
     public static final String BIND_BROKER_NAME = "broker.name";
@@ -78,8 +78,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
@@ -424,34 +422,8 @@ public class HiveMetaStoreCache {
             }
             FileInputFormat.setInputPaths(jobConf, finalLocation.get());
             try {
-                FileCacheValue result;
-                // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
-                if (key.useSelfSplitter) {
-                    result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
-                            key.getPartitionValues(), key.bindBrokerName);
-                } else {
-                    InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
-                    InputSplit[] splits;
-                    String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
-                    if (!Strings.isNullOrEmpty(remoteUser)) {
-                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-                        splits = ugi.doAs(
-                            (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
-                    } else {
-                        splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
-                    }
-                    result = new FileCacheValue();
-                    // Convert the hadoop split to Doris Split.
-                    for (int i = 0; i < splits.length; i++) {
-                        org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
-                        // todo: get modification time
-                        String dataFilePath = fs.getPath().toString();
-                        LocationPath locationPath = new LocationPath(dataFilePath, catalog.getProperties());
-                        Path splitFilePath = locationPath.toScanRangeLocation();
-                        result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
-                    }
-                }
-
+                FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
+                        key.getPartitionValues(), key.bindBrokerName);
                 // Replace default hive partition with a null_string.
                 for (int i = 0; i < result.getValuesSize(); i++) {
                     if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) {
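The removed `else` branch was the pre-self-splitter path: obtain an `InputFormat`, call `getSplits` (under a remote-user UGI when one is configured, so HDFS sees the intended Hadoop user), and convert each Hadoop `FileSplit` into a Doris split. A self-contained sketch of that UGI pattern using only Hadoop APIs (the helper class and method names are hypothetical):

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;

// Illustrates the fallback the deleted branch relied on: run getSplits() as a
// remote user when one is configured, otherwise call it directly.
final class GetSplitsAsRemoteUser {
    static InputSplit[] getSplits(InputFormat<?, ?> format, JobConf conf, String remoteUser) throws Exception {
        if (remoteUser == null || remoteUser.isEmpty()) {
            // numSplits = 0 lets the HDFS block size drive the split count.
            return format.getSplits(conf, 0);
        }
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
        return ugi.doAs((PrivilegedExceptionAction<InputSplit[]>) () -> format.getSplits(conf, 0));
    }
}
```

After the change, `getFileCache` is invoked unconditionally for every cache key.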
@@ -509,24 +481,23 @@ public class HiveMetaStoreCache {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-                                                              boolean useSelfSplitter, String bindBrokerName) {
-        return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
+                                                              String bindBrokerName) {
+        return getFilesByPartitions(partitions, true, bindBrokerName);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-                                                                 boolean useSelfSplitter, String bindBrokerName) {
-        return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
+                                                                 String bindBrokerName) {
+        return getFilesByPartitions(partitions, false, bindBrokerName);
     }
 
     private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-                                                      boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
+                                                      boolean withCache, String bindBrokerName) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
                     ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                            p.getInputFormat(), useSelfSplitter, bindBrokerName)
+                            p.getInputFormat(), bindBrokerName)
                     : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
-            fileCacheKey.setUseSelfSplitter(useSelfSplitter);
             return fileCacheKey;
         }).collect(Collectors.toList());
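Note that the literal `true`/`false` still passed by the two public wrappers now binds to the `withCache` parameter of the private `getFilesByPartitions`, not to the removed splitter flag, so the cache-versus-no-cache behavior of the wrappers is unchanged.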
@@ -621,7 +592,7 @@ public class HiveMetaStoreCache {
              * and FE will exit if some network problems occur.
              * */
             FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, false, null);
+                    dbName, tblName, null, null, null);
             fileCacheRef.get().invalidate(fileCacheKey);
         }
     }
@@ -963,9 +934,6 @@ public class HiveMetaStoreCache {
         private String inputFormat;
         // Broker name for file split and file scan.
         private String bindBrokerName;
-        // Temp variable, use self file splitter or use InputFormat.getSplits.
-        // Will remove after self splitter is stable.
-        private boolean useSelfSplitter;
         // The values of partitions.
         // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
         // partitionValues would be ["part1", "part2"]
@@ -975,16 +943,14 @@ public class HiveMetaStoreCache {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
-            this.useSelfSplitter = true;
             this.bindBrokerName = bindBrokerName;
         }
 
         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
-                                                       String inputFormat, boolean useSelfSplitter,
+                                                       String inputFormat,
                                                        String bindBrokerName) {
             FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
-            fileCacheKey.useSelfSplitter = useSelfSplitter;
             return fileCacheKey;
         }
 
@@ -196,15 +196,9 @@ public class HiveScanNode extends FileQueryScanNode {
         try {
             HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                     .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
-            boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
             String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
-            if (bindBrokerName != null && useSelfSplitter == false) {
-                // useSelfSplitter must be true if bindBrokerName is set.
-                throw new UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if "
-                        + HMSExternalCatalog.BIND_BROKER_NAME + " is set");
-            }
             List<Split> allFiles = Lists.newArrayList();
-            getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter, bindBrokerName);
+            getFileSplitByPartitions(cache, getPartitions(), allFiles, bindBrokerName);
             LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
                     allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
             return allFiles;
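The deleted guard that rejected `broker.name` together with `enable.self.splitter=false` is no longer reachable: with the property gone, self splitting is always on, so a bound broker can never be combined with the legacy split path.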
@@ -217,13 +211,12 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-                                          List<Split> allFiles, boolean useSelfSplitter,
-                                          String bindBrokerName) throws IOException {
+                                          List<Split> allFiles, String bindBrokerName) throws IOException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
             fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
         } else {
-            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter, bindBrokerName);
+            fileCaches = cache.getFilesByPartitionsWithCache(partitions, bindBrokerName);
         }
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -720,7 +720,7 @@ public class StatisticsUtil {
         }
         // Get files for all partitions.
         String bindBrokerName = table.getCatalog().bindBrokerName();
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true, bindBrokerName);
+        return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
     }
 
     /**
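Because this call site already hard-coded `true` for the old `useSelfSplitter` argument, dropping the parameter leaves the statistics file-listing behavior unchanged.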