[improvement](multi catalog) Use getPartitionsByNames to retrieve hive partitions (#21562)
Previously, Hive partitions were fetched one at a time through the HMS getPartition API, so each partition cost one RPC and performance degraded badly when a table had many partitions. This PR switches to getPartitionsByNames, which retrieves multiple partitions in a single API call. Fetching 90000 partitions now takes 14s instead of 108s.
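For context, a minimal sketch of the difference at the HMS client level, using Hive's IMetaStoreClient API; the client, db/table names, and value lists are illustrative placeholders, not code from this patch:

    // Before: one RPC per partition, O(N) metastore round trips.
    List<Partition> partitions = new ArrayList<>();
    for (List<String> partValues : allPartitionValues) {
        partitions.add(client.getPartition("db", "tbl", partValues));
    }

    // After: a single RPC for the whole batch. Names use Hive's
    // "k1=v1/k2=v2" convention, e.g. "country=China/city=Beijing".
    List<Partition> batch = client.getPartitionsByNames("db", "tbl", partitionNames);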
@@ -20,11 +20,13 @@ package org.apache.doris.datasource.hive;
 import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.backup.Status;
 import org.apache.doris.backup.Status.ErrCode;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -54,10 +56,12 @@ import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
+import com.google.common.collect.Streams;
 import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -157,7 +161,12 @@ public class HiveMetaStoreCache {
 
                     @Override
                     public HivePartition load(PartitionCacheKey key) {
-                        return loadPartitions(key);
+                        return loadPartition(key);
                     }
+
+                    @Override
+                    public Map<PartitionCacheKey, HivePartition> loadAll(Iterable<? extends PartitionCacheKey> keys) {
+                        return loadPartitions(keys);
+                    }
 
                 });
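The point of overriding loadAll is that Guava's LoadingCache.getAll(keys) only batches when the CacheLoader provides loadAll; otherwise it falls back to calling load once per missing key. A self-contained sketch of the pattern, with illustrative String/Integer types and assumed fetchOne/fetchBatch helpers:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.Map;

    LoadingCache<String, Integer> cache = CacheBuilder.newBuilder().build(
            new CacheLoader<String, Integer>() {
                @Override
                public Integer load(String key) throws Exception {
                    return fetchOne(key);        // single-key miss (assumed helper)
                }

                @Override
                public Map<String, Integer> loadAll(Iterable<? extends String> keys) throws Exception {
                    return fetchBatch(keys);     // one bulk call for all misses (assumed helper)
                }
            });
    // cache.getAll(keys) now invokes loadAll once for the keys not yet cached.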
@@ -304,7 +313,7 @@ public class HiveMetaStoreCache {
             }
         }
 
-    private HivePartition loadPartitions(PartitionCacheKey key) {
+    private HivePartition loadPartition(PartitionCacheKey key) {
         Partition partition = catalog.getClient().getPartition(key.dbName, key.tblName, key.values);
         StorageDescriptor sd = partition.getSd();
         if (LOG.isDebugEnabled()) {
@@ -315,6 +324,37 @@ public class HiveMetaStoreCache {
         return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
+    private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends PartitionCacheKey> keys) {
+        PartitionCacheKey oneKey = Iterables.get(keys, 0);
+        String dbName = oneKey.getDbName();
+        String tblName = oneKey.getTblName();
+        List<Column> partitionColumns = ((HMSExternalTable)
+                (catalog.getDbNullable(dbName).getTableNullable(tblName))).getPartitionColumns();
+        // A partitionName is like "country=China/city=Beijing" or "date=2023-02-01"
+        List<String> partitionNames = Streams.stream(keys).map(key -> {
+            StringBuilder sb = new StringBuilder();
+            Preconditions.checkState(key.getValues().size() == partitionColumns.size());
+            for (int i = 0; i < partitionColumns.size(); i++) {
+                sb.append(partitionColumns.get(i).getName());
+                sb.append("=");
+                sb.append(key.getValues().get(i));
+                sb.append("/");
+            }
+            sb.delete(sb.length() - 1, sb.length());
+            return sb.toString();
+        }).collect(Collectors.toList());
+        List<Partition> partitions = catalog.getClient().getPartitions(dbName, tblName, partitionNames);
+        // Compose the return result map.
+        Map<PartitionCacheKey, HivePartition> ret = new HashMap<>();
+        for (Partition partition : partitions) {
+            StorageDescriptor sd = partition.getSd();
+            ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()),
+                    new HivePartition(dbName, tblName, false,
+                            sd.getInputFormat(), sd.getLocation(), partition.getValues()));
+        }
+        return ret;
+    }
+
     // Get File Status by using FileSystem API.
     private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
             JobConf jobConf,
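For reference, the name format getPartitionsByNames expects can be reproduced standalone; this helper is an illustrative reduction of the StringBuilder loop above, not part of the patch:

    // Builds "country=China/city=Beijing" from parallel column/value lists.
    static String makePartitionName(List<String> columns, List<String> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sb.append('/');
            }
            sb.append(columns.get(i)).append('=').append(values.get(i));
        }
        return sb.toString();
    }

Note that neither this sketch nor the patch escapes special characters in partition values; Hive's own helper (org.apache.hadoop.hive.common.FileUtils.makePartName, if available on the classpath) does escape them.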
@@ -152,6 +152,20 @@ public class PooledHiveMetaStoreClient {
         }
     }
 
+    public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) {
+        try (CachedClient client = getClient()) {
+            try {
+                return client.client.getPartitionsByNames(dbName, tblName, partitionNames);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
+        } catch (Exception e) {
+            throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName,
+                    dbName, partitionNames);
+        }
+    }
+
     public List<Partition> getPartitionsByFilter(String dbName, String tblName, String filter) {
         try (CachedClient client = getClient()) {
             try {
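The new wrapper follows the same pattern as the surrounding pool methods: borrow a client with try-with-resources, record a failure via setThrowable so the pool can discard the broken connection, and rethrow wrapped in HMSClientException. A hypothetical call site (names are placeholders):

    List<String> names = Lists.newArrayList("date=2023-02-01", "date=2023-02-02");
    List<Partition> parts = pooledClient.getPartitions("my_db", "my_tbl", names);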
@@ -1739,13 +1739,17 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
         return getPartitionsByNames(getDefaultCatalog(conf), db_name, tbl_name, part_names);
     }
 
-    @Override
-    public List<Partition> getPartitionsByNames(String catName, String db_name, String tbl_name,
-            List<String> part_names) throws TException {
-        List<Partition> parts =
-                client.get_partitions_by_names(prependCatalogToDbName(catName, db_name, conf), tbl_name, part_names);
-        return deepCopyPartitions(filterHook.filterPartitions(parts));
-    }
+    @Override
+    public List<Partition> getPartitionsByNames(String catName, String db_name, String tbl_name,
+            List<String> part_names) throws TException {
+        if (hiveVersion == HiveVersion.V1_0 || hiveVersion == HiveVersion.V2_0 || hiveVersion == HiveVersion.V2_3) {
+            return deepCopyPartitions(
+                    filterHook.filterPartitions(client.get_partitions_by_names(db_name, tbl_name, part_names)));
+        } else {
+            return deepCopyPartitions(filterHook.filterPartitions(
+                    client.get_partitions_by_names(prependCatalogToDbName(catName, db_name, conf), tbl_name, part_names)));
+        }
+    }
 
     @Override
     public PartitionValuesResponse listPartitionValues(PartitionValuesRequest request)
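The version gate exists because metastores older than 3.x predate the catalog concept and reject catalog-qualified database names. A reduced, self-contained sketch of the idea; the enum values mirror the diff, while the "@catalog#database" encoding is an assumption based on Hive 3.x's prependCatalogToDbName convention:

    enum HiveVersion { V1_0, V2_0, V2_3, V3_0 }

    static String dbNameForThrift(HiveVersion v, String catName, String dbName) {
        // Pre-3.x Thrift servers only understand plain database names.
        if (v == HiveVersion.V1_0 || v == HiveVersion.V2_0 || v == HiveVersion.V2_3) {
            return dbName;
        }
        // Assumed 3.x encoding produced by prependCatalogToDbName.
        return "@" + catName + "#" + dbName;
    }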