[Enhancement](multi-catalog) Make meta cache batch loading concurrently. (#21471)

I will enhance the performance of querying the meta cache of HMS tables in two steps:
**Step 1**: use concurrent batch loading for the meta cache
**Step 2**: execute some other tasks concurrently as early as possible

**This PR covers Step 1 and mainly does the following:**
- Create a `CacheBulkLoader` for batch loading
- Remove the executor of the previous async cache loader and change the loader's type to `CacheBulkLoader` (we do not set any refresh strategy for the `LoadingCache`, so the previous executor was not useful)
- Use a `FixedCacheThreadPool` to replace the `CacheThreadPool` (the previous `CacheThreadPool` only logs a warning and never throws an exception when the pool is full)
- Remove parallel streams and use the `CacheBulkLoader` for batch loading (see the sketch after this list)
- Change the default value of `max_external_cache_loader_thread_pool_size` to 64, and set the pool size of the HMS client pool to `max_external_cache_loader_thread_pool_size`
- Fix the spelling mistake in `max_hive_table_catch_num` (now `max_hive_table_cache_num`)
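
For context, here is a minimal, self-contained sketch of the batch-loading pattern this PR introduces: a Guava `LoadingCache` whose `loadAll` fans individual `load` calls out to a shared fixed-size thread pool, so a single `getAll` fetches all missing keys concurrently. The class name, pool size, and keys below are illustrative only; the real implementation is the `CacheBulkLoader` added in this diff.

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class BulkLoadSketch {
    public static void main(String[] args) throws ExecutionException {
        // Shared fixed-size pool; sized 8 here just for the example.
        ExecutorService executor = Executors.newFixedThreadPool(8);

        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        // Stand-in for a single metastore call (e.g. loading one partition).
                        return "value-of-" + key;
                    }

                    @Override
                    public Map<String, String> loadAll(Iterable<? extends String> keys)
                            throws ExecutionException, InterruptedException {
                        // Submit one load() per missing key, then wait for all results,
                        // so getAll() fans out across the pool instead of loading serially.
                        List<String> keyList = Lists.newArrayList(keys);
                        List<Future<String>> futures = keyList.stream()
                                .map(k -> executor.submit(() -> load(k)))
                                .collect(Collectors.toList());
                        Map<String, String> result = new LinkedHashMap<>();
                        for (int i = 0; i < keyList.size(); i++) {
                            result.put(keyList.get(i), futures.get(i).get());
                        }
                        return result;
                    }
                });

        // getAll() triggers loadAll() for the keys that are not cached yet.
        Map<String, String> values = cache.getAll(Arrays.asList("k1", "k2", "k3"));
        System.out.println(values);
        executor.shutdown();
    }
}
```

Replacing per-key `get` calls in a parallel stream with one `getAll` per batch keeps the load concurrency bounded by a single, explicitly sized pool instead of the common ForkJoinPool.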
Xiangyu Wang
2023-07-06 15:18:30 +08:00
committed by GitHub
parent fde73b6cc6
commit bb3b6770b5
8 changed files with 233 additions and 101 deletions

View File

@@ -1694,7 +1694,7 @@ public class Config extends ConfigBase {
@ConfField(mutable = false, masterOnly = false, description = {"Hive表到分区名列表缓存的最大数量。",
"Max cache number of hive table to partition names list."})
public static long max_hive_table_catch_num = 1000;
public static long max_hive_table_cache_num = 1000;
@ConfField(mutable = false, masterOnly = false, description = {"获取Hive分区值时候的最大返回数量,-1代表没有限制。",
"Max number of hive partition values to return while list partitions, -1 means no limitation."})
@@ -1705,7 +1705,7 @@ public class Config extends ConfigBase {
* Max thread pool size for loading external meta cache
*/
@ConfField(mutable = false, masterOnly = false)
public static int max_external_cache_loader_thread_pool_size = 10;
public static int max_external_cache_loader_thread_pool_size = 64;
/**
* Max cache num of external catalog's file

View File

@@ -126,6 +126,14 @@ public class ThreadPoolManager {
poolName, needRegisterMetric);
}
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize,
String poolName, int timeoutSeconds,
boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, timeoutSeconds),
poolName, needRegisterMetric);
}
public static <T> ThreadPoolExecutor newDaemonFixedPriorityThreadPool(int numThread, int initQueueSize,
Comparator<T> comparator, Class<T> tClass,
String poolName, boolean needRegisterMetric) {

View File

@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.common.Pair;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public abstract class CacheBulkLoader<K, V> extends CacheLoader<K, V> {
protected abstract ExecutorService getExecutor();
@Override
public Map<K, V> loadAll(Iterable<? extends K> keys)
throws ExecutionException, InterruptedException {
List<Pair<? extends K, Future<V>>> pList = Streams.stream(keys)
.map(key -> Pair.of(key, getExecutor().submit(() -> load(key))))
.collect(Collectors.toList());
Map<K, V> vMap = Maps.newLinkedHashMap();
for (Pair<? extends K, Future<V>> p : pList) {
vMap.put(p.first, p.second.get());
}
return ImmutableMap.copyOf(vMap);
}
}

View File

@@ -30,7 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* Cache meta of external catalog
@@ -44,11 +44,13 @@ public class ExternalMetaCacheMgr {
private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
// catalog id -> table schema cache
private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
private Executor executor;
private ExecutorService executor;
public ExternalMetaCacheMgr() {
executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_external_cache_loader_thread_pool_size,
"ExternalMetaCacheMgr", true);
executor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"ExternalMetaCacheMgr", 120, true);
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
@@ -69,7 +71,7 @@ public class ExternalMetaCacheMgr {
if (cache == null) {
synchronized (schemaCacheMap) {
if (!schemaCacheMap.containsKey(catalog.getId())) {
schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, executor));
schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog));
}
cache = schemaCacheMap.get(catalog.getId());
}

View File

@@ -37,31 +37,30 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
// The schema cache for external table
public class ExternalSchemaCache {
private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class);
private ExternalCatalog catalog;
private final ExternalCatalog catalog;
private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
public ExternalSchemaCache(ExternalCatalog catalog, Executor executor) {
public ExternalSchemaCache(ExternalCatalog catalog) {
this.catalog = catalog;
init(executor);
init();
initMetrics();
}
private void init(Executor executor) {
private void init() {
schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num)
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
.build(CacheLoader.asyncReloading(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() {
.build(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() {
@Override
public ImmutableList<Column> load(SchemaCacheKey key) throws Exception {
public ImmutableList<Column> load(SchemaCacheKey key) {
return loadSchema(key);
}
}, executor));
});
}
private void initMetrics() {

View File

@@ -54,7 +54,7 @@ import java.util.Objects;
public class HMSExternalCatalog extends ExternalCatalog {
private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);
private static final int MAX_CLIENT_POOL_SIZE = 8;
private static final int MIN_CLIENT_POOL_SIZE = 8;
protected PooledHiveMetaStoreClient client;
// Record the latest synced event id when processing hive events
// Must set to -1 otherwise client.getNextNotification will throw exception
@@ -161,7 +161,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
}
client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE);
client = new PooledHiveMetaStoreClient(hiveConf,
Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size));
}
@Override

View File

@@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
@@ -89,11 +90,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// The cache of a hms catalog. 3 kind of caches:
// 1. partitionValuesCache: cache the partition values of a table, for partition prune.
@@ -101,17 +101,15 @@ import java.util.stream.Stream;
// 3. fileCache: cache the files of a location.
public class HiveMetaStoreCache {
private static final Logger LOG = LogManager.getLogger(HiveMetaStoreCache.class);
private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50;
public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
// After Hive 3, transactional tables will have a file '_orc_acid_version' with value >= '2'.
public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";
private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
private HMSExternalCatalog catalog;
private final HMSExternalCatalog catalog;
private JobConf jobConf;
private Executor executor;
private final ExecutorService executor;
// cache from <dbname-tblname> -> <values of partitions>
private LoadingCache<PartitionValueCacheKey, HivePartitionValues> partitionValuesCache;
@@ -121,7 +119,7 @@ public class HiveMetaStoreCache {
private volatile AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> fileCacheRef
= new AtomicReference<>();
public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
public HiveMetaStoreCache(HMSExternalCatalog catalog, ExecutorService executor) {
this.catalog = catalog;
this.executor = executor;
init();
@@ -129,24 +127,40 @@ public class HiveMetaStoreCache {
}
private void init() {
partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_catch_num)
/**
* Because the partitionValuesCache|partitionCache|fileCache use the same executor for batch loading,
* we need to be very careful and try to avoid circular dependencies between these tasks,
* which could cause thread deadlocks.
* */
partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num)
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
.build(CacheLoader.asyncReloading(
new CacheLoader<PartitionValueCacheKey, HivePartitionValues>() {
@Override
public HivePartitionValues load(PartitionValueCacheKey key) throws Exception {
return loadPartitionValues(key);
}
}, executor));
.build(new CacheBulkLoader<PartitionValueCacheKey, HivePartitionValues>() {
@Override
protected ExecutorService getExecutor() {
return HiveMetaStoreCache.this.executor;
}
@Override
public HivePartitionValues load(PartitionValueCacheKey key) {
return loadPartitionValues(key);
}
});
partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
.build(CacheLoader.asyncReloading(new CacheLoader<PartitionCacheKey, HivePartition>() {
.build(new CacheBulkLoader<PartitionCacheKey, HivePartition>() {
@Override
public HivePartition load(PartitionCacheKey key) throws Exception {
protected ExecutorService getExecutor() {
return HiveMetaStoreCache.this.executor;
}
@Override
public HivePartition load(PartitionCacheKey key) {
return loadPartitions(key);
}
}, executor));
});
setNewFileCache();
}
@@ -169,10 +183,18 @@ public class HiveMetaStoreCache {
if (fileMetaCacheTtlSecond >= HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) {
fileCacheBuilder.expireAfterWrite(fileMetaCacheTtlSecond, TimeUnit.SECONDS);
}
// if the file.meta.cache.ttl-second is equal 0, use the synchronous loader
// if the file.meta.cache.ttl-second greater than 0, use the asynchronous loader
CacheLoader<FileCacheKey, FileCacheValue> loader = getGuavaCacheLoader(executor,
fileMetaCacheTtlSecond);
CacheLoader<FileCacheKey, FileCacheValue> loader = new CacheBulkLoader<FileCacheKey, FileCacheValue>() {
@Override
protected ExecutorService getExecutor() {
return HiveMetaStoreCache.this.executor;
}
@Override
public FileCacheValue load(FileCacheKey key) {
return loadFiles(key);
}
};
LoadingCache<FileCacheKey, FileCacheValue> preFileCache = fileCacheRef.get();
@@ -374,6 +396,13 @@ public class HiveMetaStoreCache {
}
}
// Replace default hive partition with a null_string.
for (int i = 0; i < result.getValuesSize(); i++) {
if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) {
result.getPartitionValues().set(i, FeConstants.null_string);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("load #{} splits for {} in catalog {}", result.getFiles().size(), key, catalog.getName());
}
@@ -425,36 +454,23 @@ public class HiveMetaStoreCache {
public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean useSelfSplitter) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = Lists.newArrayListWithExpectedSize(partitions.size());
partitions.stream().forEach(p -> {
List<FileCacheKey> keys = partitions.stream().map(p -> {
FileCacheKey fileCacheKey = p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
p.getInputFormat(), useSelfSplitter)
: new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
fileCacheKey.setUseSelfSplitter(useSelfSplitter);
keys.add(fileCacheKey);
});
Stream<FileCacheKey> stream;
if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
stream = keys.stream();
} else {
stream = keys.parallelStream();
}
List<FileCacheValue> fileLists = stream.map(k -> {
try {
FileCacheValue fileCacheValue = fileCacheRef.get().get(k);
// Replace default hive partition with a null_string.
for (int i = 0; i < fileCacheValue.getValuesSize(); i++) {
if (HIVE_DEFAULT_PARTITION.equals(fileCacheValue.getPartitionValues().get(i))) {
fileCacheValue.getPartitionValues().set(i, FeConstants.null_string);
}
}
return fileCacheValue;
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return fileCacheKey;
}).collect(Collectors.toList());
List<FileCacheValue> fileLists;
try {
fileLists = fileCacheRef.get().getAll(keys).values().asList();
} catch (ExecutionException e) {
throw new CacheException("failed to get files from partitions in catalog %s",
e, catalog.getName());
}
LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
fileLists.stream().mapToInt(l -> l.getFiles() == null
? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
@@ -464,22 +480,17 @@ public class HiveMetaStoreCache {
public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) {
long start = System.currentTimeMillis();
List<PartitionCacheKey> keys = Lists.newArrayListWithExpectedSize(partitionValuesList.size());
partitionValuesList.stream().forEach(p -> keys.add(new PartitionCacheKey(dbName, name, p)));
List<PartitionCacheKey> keys = partitionValuesList.stream()
.map(p -> new PartitionCacheKey(dbName, name, p))
.collect(Collectors.toList());
Stream<PartitionCacheKey> stream;
if (partitionValuesList.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
stream = keys.stream();
} else {
stream = keys.parallelStream();
List<HivePartition> partitions;
try {
partitions = partitionCache.getAll(keys).values().asList();
} catch (ExecutionException e) {
throw new CacheException("failed to get partition in catalog %s", e, catalog.getName());
}
List<HivePartition> partitions = stream.map(k -> {
try {
return partitionCache.get(k);
} catch (ExecutionException e) {
throw new CacheException("failed to get partition for %s in catalog %s", e, k, catalog.getName());
}
}).collect(Collectors.toList());
LOG.debug("get #{} partitions in catalog {} cost: {} ms", partitions.size(), catalog.getName(),
(System.currentTimeMillis() - start));
return partitions;
@@ -665,30 +676,6 @@ public class HiveMetaStoreCache {
partitionValuesCache.put(key, values);
}
/***
* get the guava CacheLoader
* if the fileMetaCacheTtlSecond equal 0 , the synchronous loader is used
* if the fileMetaCacheTtlSecond greater than 0 , the asynchronous loader is used
* @param executor
* @param fileMetaCacheTtlSecond
* @return
*/
private CacheLoader<FileCacheKey, FileCacheValue> getGuavaCacheLoader(Executor executor,
int fileMetaCacheTtlSecond) {
CacheLoader<FileCacheKey, FileCacheValue> loader =
new CacheLoader<FileCacheKey, FileCacheValue>() {
@Override
public FileCacheValue load(FileCacheKey key) throws Exception {
return loadFiles(key);
}
};
if (fileMetaCacheTtlSecond == HMSExternalCatalog.FILE_META_CACHE_TTL_DISABLE_CACHE) {
return loader;
} else {
return CacheLoader.asyncReloading(loader, executor);
}
}
/***
* get fileCache ref
* @return

View File

@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
import org.apache.doris.common.util.CacheBulkLoader;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import org.apache.commons.collections.MapUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CacheBulkLoaderTest {
@Test
public void test() {
ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
10, 10, "TestThreadPool", 120, true);
LoadingCache<String, String> testCache = CacheBuilder.newBuilder().maximumSize(100)
.expireAfterAccess(1, TimeUnit.MINUTES)
.build(new CacheBulkLoader<String, String>() {
@Override
protected ExecutorService getExecutor() {
return executor;
}
@Override
public String load(String key) {
Assertions.assertTrue(Thread.currentThread().getName().startsWith("TestThreadPool"));
try {
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
return key.replace("k", "v");
}
});
List<String> testKeys = IntStream.range(1, 101).boxed()
.map(i -> String.format("k%d", i)).collect(Collectors.toList());
try {
Map<String, String> vMap = testCache.getAll(testKeys);
Assertions.assertTrue(MapUtils.isNotEmpty(vMap) && vMap.size() == testKeys.size());
for (String key : vMap.keySet()) {
Assertions.assertTrue(key.replace("k", "v").equals(vMap.get(key)));
}
} catch (ExecutionException e) {
e.printStackTrace();
Assertions.fail();
}
try {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
}