From 64d0e28ed05d48d82d7e8bc99c9ffbffbde3d4bf Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Fri, 7 Jul 2023 10:37:33 +0800 Subject: [PATCH] [improvement](multi catalog)Use getPartitionsByNames to retrieve hive partitions (#21562) Before, we get hive partition using HMS getPartition api. In this case, each partition need to call the api once. The performance is very poor when partition number is large. This pr use getPartitionsByNames to get multiple partitions in one api call. To get 90000 partitions, the time costing is reduced to 14s from 108s. --- .../datasource/hive/HiveMetaStoreCache.java | 44 ++++++++++++++++++- .../hive/PooledHiveMetaStoreClient.java | 14 ++++++ .../hive/metastore/HiveMetaStoreClient.java | 18 +++++--- 3 files changed, 67 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 3de4a5d1d2..0b8e9d5bc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -20,11 +20,13 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.PartitionValue; import org.apache.doris.backup.Status; import org.apache.doris.backup.Status.ErrCode; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; @@ -54,10 +56,12 @@ import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; +import com.google.common.collect.Streams; import com.google.common.collect.TreeRangeMap; import lombok.Data; import org.apache.commons.lang3.math.NumberUtils; @@ -157,7 +161,12 @@ public class HiveMetaStoreCache { @Override public HivePartition load(PartitionCacheKey key) { - return loadPartitions(key); + return loadPartition(key); + } + + @Override + public Map loadAll(Iterable keys) { + return loadPartitions(keys); } }); @@ -304,7 +313,7 @@ public class HiveMetaStoreCache { } } - private HivePartition loadPartitions(PartitionCacheKey key) { + private HivePartition loadPartition(PartitionCacheKey key) { Partition partition = catalog.getClient().getPartition(key.dbName, key.tblName, key.values); StorageDescriptor sd = partition.getSd(); if (LOG.isDebugEnabled()) { @@ -315,6 +324,37 @@ public class HiveMetaStoreCache { return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values); } + private Map loadPartitions(Iterable keys) { + PartitionCacheKey oneKey = Iterables.get(keys, 0); + String dbName = oneKey.getDbName(); + String tblName = oneKey.getTblName(); + List partitionColumns = ((HMSExternalTable) + (catalog.getDbNullable(dbName).getTableNullable(tblName))).getPartitionColumns(); + // A partitionName is like "country=China/city=Beijing" or "date=2023-02-01" + List partitionNames = Streams.stream(keys).map(key -> { + StringBuilder sb = new StringBuilder(); + Preconditions.checkState(key.getValues().size() == partitionColumns.size()); + for (int i = 0; i < partitionColumns.size(); i++) { + sb.append(partitionColumns.get(i).getName()); + sb.append("="); + sb.append(key.getValues().get(i)); + sb.append("/"); + } + sb.delete(sb.length() - 1, sb.length()); + return sb.toString(); + }).collect(Collectors.toList()); + List partitions = catalog.getClient().getPartitions(dbName, tblName, partitionNames); + // Compose the return result map. + Map ret = new HashMap<>(); + for (Partition partition : partitions) { + StorageDescriptor sd = partition.getSd(); + ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()), + new HivePartition(dbName, tblName, false, + sd.getInputFormat(), sd.getLocation(), partition.getValues())); + } + return ret; + } + // Get File Status by using FileSystem API. private FileCacheValue getFileCache(String location, InputFormat inputFormat, JobConf jobConf, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java index 9f9a1a457f..f3c2557a1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java @@ -152,6 +152,20 @@ public class PooledHiveMetaStoreClient { } } + public List getPartitions(String dbName, String tblName, List partitionNames) { + try (CachedClient client = getClient()) { + try { + return client.client.getPartitionsByNames(dbName, tblName, partitionNames); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName, + dbName, partitionNames); + } + } + public List getPartitionsByFilter(String dbName, String tblName, String filter) { try (CachedClient client = getClient()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 852c4a289d..f117b1fb12 100644 --- a/fe/fe-core/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -1739,13 +1739,17 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { return getPartitionsByNames(getDefaultCatalog(conf), db_name, tbl_name, part_names); } - @Override - public List getPartitionsByNames(String catName, String db_name, String tbl_name, - List part_names) throws TException { - List parts = - client.get_partitions_by_names(prependCatalogToDbName(catName, db_name, conf), tbl_name, part_names); - return deepCopyPartitions(filterHook.filterPartitions(parts)); - } + @Override + public List getPartitionsByNames(String catName, String db_name, String tbl_name, + List part_names) throws TException { + if (hiveVersion == HiveVersion.V1_0 || hiveVersion == HiveVersion.V2_0 || hiveVersion == HiveVersion.V2_3) { + return deepCopyPartitions( + filterHook.filterPartitions(client.get_partitions_by_names(db_name, tbl_name, part_names))); + } else { + return deepCopyPartitions(filterHook.filterPartitions( + client.get_partitions_by_names(prependCatalogToDbName(catName, db_name, conf), tbl_name, part_names))); + } + } @Override public PartitionValuesResponse listPartitionValues(PartitionValuesRequest request)