## Proposed changes

bp: #40729
```diff
@@ -101,7 +101,7 @@ public class ExternalMetaCacheMgr {
         commonRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
                 Config.max_external_cache_loader_thread_pool_size,
-                Config.max_external_cache_loader_thread_pool_size * 1000,
+                Config.max_external_cache_loader_thread_pool_size * 10000,
                 "CommonRefreshExecutor", 10, true);

         // The queue size should be large enough,
```
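The queue here is the work queue of a fixed-size pool, so its capacity bounds how many refresh tasks can be pending at once; widening the multiplier from 1000 to 10000 simply enlarges that buffer, because a queue that is too small makes submissions fail instead of wait. Below is a minimal plain-JDK sketch of that failure mode; Doris's `ThreadPoolManager.newDaemonFixedThreadPool` wraps a `ThreadPoolExecutor` similarly, but this demo uses only `java.util.concurrent` and deliberately tiny, invented sizes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueSizeDemo {
    public static void main(String[] args) {
        // Fixed pool of 1 thread with a deliberately tiny work queue of 2 slots.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        try {
            for (int i = 0; i < 10; i++) {
                // Each task sleeps, so the queue fills almost immediately.
                pool.execute(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            // With the default AbortPolicy, the 4th submission is refused:
            // 1 running + 2 queued is all this configuration can hold.
            System.out.println("rejected: " + e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```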
```diff
@@ -149,7 +149,7 @@ public class HiveMetaStoreCache {
         CacheFactory partitionCacheFactory = new CacheFactory(
-                OptionalLong.of(28800L),
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
                 OptionalLong.empty(),
                 Config.max_hive_partition_cache_num,
                 true,
                 null);
```
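The expiry passed to `CacheFactory` moves from a hardcoded 28800 seconds (8 hours) to a value derived from `Config.external_cache_expire_time_minutes_after_access`, so operators can tune how long partition entries survive after their last access. A hedged sketch of the equivalent wiring in Caffeine, the cache library visible in the test at the end of this diff (`CacheFactory` itself is Doris-internal, and the loader and default value here are placeholders):

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.util.concurrent.TimeUnit;

public class ExpiryDemo {
    // Stand-in for Config.external_cache_expire_time_minutes_after_access.
    static long expireMinutesAfterAccess = 480; // old hardcoded value: 28800 s = 480 min

    public static void main(String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
                // Entries are dropped this long after they were last touched,
                // mirroring the OptionalLong handed to CacheFactory above.
                .expireAfterAccess(expireMinutesAfterAccess * 60L, TimeUnit.SECONDS)
                .maximumSize(10_000)
                .build(key -> "loaded:" + key); // loader invoked on cache miss

        System.out.println(cache.get("part1=a")); // loads and caches
        System.out.println(cache.get("part1=a")); // served from cache
    }
}
```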
```diff
@@ -481,7 +481,8 @@ public class HiveMetaStoreCache {
         List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
                 ? FileCacheKey.createDummyCacheKey(
                         p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
-                : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
+                : new FileCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
+                        p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
                 .collect(Collectors.toList());

         List<FileCacheValue> fileLists;
```
```diff
@@ -553,38 +554,19 @@ public class HiveMetaStoreCache {
     }

     public void invalidateTableCache(String dbName, String tblName) {
-        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
-        HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
-        if (partitionValues != null) {
-            long start = System.currentTimeMillis();
-            for (List<String> values : partitionValues.partitionValuesMap.values()) {
-                PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
-                HivePartition partition = partitionCache.getIfPresent(partKey);
-                if (partition != null) {
-                    fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                            null, partition.getPartitionValues(), null));
-                    partitionCache.invalidate(partKey);
-                }
+        partitionValuesCache.invalidate(new PartitionValueCacheKey(dbName, tblName, null));
+        partitionCache.asMap().keySet().forEach(k -> {
+            if (k.isSameTable(dbName, tblName)) {
+                partitionCache.invalidate(k);
             }
-            partitionValuesCache.invalidate(key);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("invalid table cache for {}.{} in catalog {}, cache num: {}, cost: {} ms",
-                        dbName, tblName, catalog.getName(), partitionValues.partitionValuesMap.size(),
-                        (System.currentTimeMillis() - start));
+        });
+        long id = Util.genIdByName(dbName, tblName);
+        LoadingCache<FileCacheKey, FileCacheValue> fileCache = fileCacheRef.get();
+        fileCache.asMap().keySet().forEach(k -> {
+            if (k.isSameTable(id)) {
+                fileCache.invalidate(k);
             }
-        } else {
-            /**
-             * A file cache entry can be created reference to
-             * {@link org.apache.doris.planner.external.HiveSplitter#getSplits},
-             * so we need to invalidate it if this is a non-partitioned table.
-             * We use {@link org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheKey#createDummyCacheKey}
-             * to avoid invocation by Hms Client, because this method may be invoked when salve FE replay journal logs,
-             * and FE will exit if some network problems occur.
-             * */
-            FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, null);
-            fileCacheRef.get().invalidate(fileCacheKey);
-        }
+        });
     }

     public void invalidatePartitionCache(String dbName, String tblName, String partitionName) {
```
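The rewritten `invalidateTableCache` no longer discovers what to drop by walking `partitionValuesCache`: it invalidates the partition-values entry directly, then scans the live key sets of `partitionCache` and the file cache and removes every key whose table matches, so entries are cleaned up even when no partition-values entry exists. A minimal sketch of the same scan-and-invalidate pattern over a Caffeine cache (the `Key` record is a hypothetical stand-in for `FileCacheKey`; `asMap()` returns a concurrent view, so invalidating while iterating is safe):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class InvalidateByTableDemo {
    // Hypothetical stand-in for FileCacheKey: carries the id of its owning table.
    record Key(long tableId, String path) {}

    public static void main(String[] args) {
        Cache<Key, String> fileCache = Caffeine.newBuilder().maximumSize(1_000).build();
        fileCache.put(new Key(1L, "hdfs://t1/f1"), "v1");
        fileCache.put(new Key(1L, "hdfs://t1/f2"), "v2");
        fileCache.put(new Key(2L, "hdfs://t2/f1"), "v3");

        long idToDrop = 1L;
        // Same pattern as the patched invalidateTableCache: walk the live key set
        // and invalidate every key that belongs to the table, no metastore call needed.
        fileCache.asMap().keySet().forEach(k -> {
            if (k.tableId() == idToDrop) {
                fileCache.invalidate(k);
            }
        });

        System.out.println(fileCache.estimatedSize()); // 1
    }
}
```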
```diff
@@ -596,7 +578,7 @@ public class HiveMetaStoreCache {
             PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
-                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
+                fileCacheRef.get().invalidate(new FileCacheKey(dbName, tblName, partition.getPath(),
                         null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
```
```diff
@@ -740,10 +722,21 @@ public class HiveMetaStoreCache {
      * get fileCache ref
      * @return
      */
     @VisibleForTesting
     public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheRef() {
         return fileCacheRef;
     }

+    @VisibleForTesting
+    public LoadingCache<PartitionValueCacheKey, HivePartitionValues> getPartitionValuesCache() {
+        return partitionValuesCache;
+    }
+
+    @VisibleForTesting
+    public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
+        return partitionCache;
+    }
+
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
             boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
```
```diff
@@ -925,6 +918,10 @@ public class HiveMetaStoreCache {
                     && Objects.equals(values, ((PartitionCacheKey) obj).values);
         }

+        boolean isSameTable(String dbName, String tblName) {
+            return this.dbName.equals(dbName) && this.tblName.equals(tblName);
+        }
+
         @Override
         public int hashCode() {
             return Objects.hash(dbName, tblName, values);
```
```diff
@@ -949,18 +946,21 @@ public class HiveMetaStoreCache {
         // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
+        private long id;

-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
+        public FileCacheKey(String dbName, String tblName, String location, String inputFormat,
+                List<String> partitionValues, String bindBrokerName) {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.bindBrokerName = bindBrokerName;
+            this.id = Util.genIdByName(dbName, tblName);
         }

         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
                 String inputFormat,
                 String bindBrokerName) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
+            FileCacheKey fileCacheKey = new FileCacheKey(dbName, tblName, location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
             return fileCacheKey;
         }
```
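`FileCacheKey` now computes a numeric table id once, in the constructor, via `Util.genIdByName(dbName, tblName)`; `isSameTable(long)` then costs a single `long` comparison per key during the full key-set scan shown earlier. `Util.genIdByName` is Doris-internal and its implementation is not part of this diff, so the sketch below substitutes a hypothetical stable 64-bit hash with the same contract (same names in, same id out):

```java
import java.nio.charset.StandardCharsets;

public class NameIdDemo {
    // Hypothetical stand-in for Util.genIdByName: derives a stable 64-bit id
    // from the qualified table name. The real Doris implementation may differ.
    static long genIdByName(String dbName, String tblName) {
        String qualified = dbName + "." + tblName;
        long h = 1125899906842597L; // arbitrary odd seed, polynomial-hash style
        for (byte b : qualified.getBytes(StandardCharsets.UTF_8)) {
            h = 31 * h + b;
        }
        return h;
    }

    public static void main(String[] args) {
        long id = genIdByName("db", "tb");
        // FileCacheKey stores this once at construction; isSameTable(long) then
        // reduces to one primitive comparison per key during cache-wide scans.
        System.out.println(id == genIdByName("db", "tb")); // true: stable per name
    }
}
```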
```diff
@@ -980,6 +980,10 @@ public class HiveMetaStoreCache {
                     && Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
         }

+        boolean isSameTable(long id) {
+            return this.id == id;
+        }
+
         @Override
         public int hashCode() {
             if (dummyKey != null) {
```
```diff
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.common.ThreadPoolManager;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class HiveMetaStoreCacheTest {
+
+    @Test
+    public void testInvalidateTableCache() {
+        ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, 1, "refresh", 1, false);
+        ThreadPoolExecutor listExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                1, 1, "file", 1, false);
+
+        HiveMetaStoreCache hiveMetaStoreCache = new HiveMetaStoreCache(
+                new HMSExternalCatalog(1L, "catalog", null, new HashMap<>(), null), executor, listExecutor);
+
+        LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> fileCache = hiveMetaStoreCache.getFileCacheRef().get();
+        LoadingCache<HiveMetaStoreCache.PartitionCacheKey, HivePartition> partitionCache = hiveMetaStoreCache.getPartitionCache();
+        LoadingCache<HiveMetaStoreCache.PartitionValueCacheKey, HiveMetaStoreCache.HivePartitionValues> partitionValuesCache = hiveMetaStoreCache.getPartitionValuesCache();
+
+        String dbName = "db";
+        String tbName = "tb";
+        String tbName2 = "tb2";
+
+        putCache(fileCache, partitionCache, partitionValuesCache, dbName, tbName);
+        Assertions.assertEquals(2, fileCache.asMap().size());
+        Assertions.assertEquals(1, partitionCache.asMap().size());
+        Assertions.assertEquals(1, partitionValuesCache.asMap().size());
+
+        putCache(fileCache, partitionCache, partitionValuesCache, dbName, tbName2);
+        Assertions.assertEquals(4, fileCache.asMap().size());
+        Assertions.assertEquals(2, partitionCache.asMap().size());
+        Assertions.assertEquals(2, partitionValuesCache.asMap().size());
+
+        hiveMetaStoreCache.invalidateTableCache(dbName, tbName2);
+        Assertions.assertEquals(2, fileCache.asMap().size());
+        Assertions.assertEquals(1, partitionCache.asMap().size());
+        Assertions.assertEquals(1, partitionValuesCache.asMap().size());
+
+        hiveMetaStoreCache.invalidateTableCache(dbName, tbName);
+        Assertions.assertEquals(0, fileCache.asMap().size());
+        Assertions.assertEquals(0, partitionCache.asMap().size());
+        Assertions.assertEquals(0, partitionValuesCache.asMap().size());
+    }
+
+    private void putCache(
+            LoadingCache<HiveMetaStoreCache.FileCacheKey, HiveMetaStoreCache.FileCacheValue> fileCache,
+            LoadingCache<HiveMetaStoreCache.PartitionCacheKey, HivePartition> partitionCache,
+            LoadingCache<HiveMetaStoreCache.PartitionValueCacheKey, HiveMetaStoreCache.HivePartitionValues> partitionValuesCache,
+            String dbName, String tbName) {
+        HiveMetaStoreCache.FileCacheKey fileCacheKey1 = new HiveMetaStoreCache.FileCacheKey(dbName, tbName, tbName, "", new ArrayList<>(), null);
+        HiveMetaStoreCache.FileCacheKey fileCacheKey2 = HiveMetaStoreCache.FileCacheKey.createDummyCacheKey(dbName, tbName, tbName, "", null);
+        fileCache.put(fileCacheKey1, new HiveMetaStoreCache.FileCacheValue());
+        fileCache.put(fileCacheKey2, new HiveMetaStoreCache.FileCacheValue());
+
+        HiveMetaStoreCache.PartitionCacheKey partitionCacheKey = new HiveMetaStoreCache.PartitionCacheKey(
+                dbName,
+                tbName,
+                new ArrayList<>()
+        );
+        partitionCache.put(partitionCacheKey, new HivePartition(dbName, tbName, false, "", "", new ArrayList<>(), new HashMap<>()));
+
+        HiveMetaStoreCache.PartitionValueCacheKey partitionValueCacheKey = new HiveMetaStoreCache.PartitionValueCacheKey(dbName, tbName, new ArrayList<>());
+        partitionValuesCache.put(partitionValueCacheKey, new HiveMetaStoreCache.HivePartitionValues());
+
+    }
+}
```