[fix](show-table-status) fix hive view NPE and external meta cache refresh issue (#22377)
@@ -2042,7 +2042,7 @@ public class Config extends ConfigBase {
     public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 1024;

     @ConfField(mutable = true, masterOnly = true, description = {
-            "用于强制设定内表的副本数,如果改参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。"
+            "用于强制设定内表的副本数,如果该参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。"
                     + "同时,建表语句中指定的副本标签等参数会被忽略。该参数不影响包括创建分区、修改表属性的操作。该参数建议仅用于测试环境",
             "Used to force the number of replicas of the internal table. If the config is greater than zero, "
                     + "the number of replicas specified by the user when creating the table will be ignored, "
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.S3Util;
@@ -98,6 +99,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -499,8 +501,18 @@ public class HiveMetaStoreCache {
         }
     }

-    public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-                                                     boolean useSelfSplitter) {
+    public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
+                                                              boolean useSelfSplitter) {
+        return getFilesByPartitions(partitions, useSelfSplitter, true);
+    }
+
+    public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
+                                                                 boolean useSelfSplitter) {
+        return getFilesByPartitions(partitions, useSelfSplitter, false);
+    }
+
+    private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
+                                                      boolean useSelfSplitter, boolean withCache) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
@@ -513,28 +525,58 @@ public class HiveMetaStoreCache {

         List<FileCacheValue> fileLists;
         try {
-            fileLists = fileCacheRef.get().getAll(keys).values().asList();
+            if (withCache) {
+                fileLists = fileCacheRef.get().getAll(keys).values().asList();
+            } else {
+                List<Pair<FileCacheKey, Future<FileCacheValue>>> pList = keys.stream()
+                        .map(key -> Pair.of(key, executor.submit(() -> loadFiles(key))))
+                        .collect(Collectors.toList());
+
+                fileLists = Lists.newArrayListWithExpectedSize(keys.size());
+                for (Pair<FileCacheKey, Future<FileCacheValue>> p : pList) {
+                    fileLists.add(p.second.get());
+                }
+            }
         } catch (ExecutionException e) {
             throw new CacheException("failed to get files from partitions in catalog %s",
-                e, catalog.getName());
+                    e, catalog.getName());
+        } catch (InterruptedException e) {
+            throw new CacheException("failed to get files from partitions in catalog %s with interrupted exception",
+                    e, catalog.getName());
         }

         LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
                 fileLists.stream().mapToInt(l -> l.getFiles() == null
-                    ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
+                        ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
                 partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
         return fileLists;
     }

-    public List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList) {
+    public List<HivePartition> getAllPartitionsWithCache(String dbName, String name,
+                                                         List<List<String>> partitionValuesList) {
+        return getAllPartitions(dbName, name, partitionValuesList, true);
+    }
+
+    public List<HivePartition> getAllPartitionsWithoutCache(String dbName, String name,
+                                                            List<List<String>> partitionValuesList) {
+        return getAllPartitions(dbName, name, partitionValuesList, false);
+    }
+
+    private List<HivePartition> getAllPartitions(String dbName, String name, List<List<String>> partitionValuesList,
+                                                 boolean withCache) {
         long start = System.currentTimeMillis();
         List<PartitionCacheKey> keys = partitionValuesList.stream()
-            .map(p -> new PartitionCacheKey(dbName, name, p))
-            .collect(Collectors.toList());
+                .map(p -> new PartitionCacheKey(dbName, name, p))
+                .collect(Collectors.toList());

         List<HivePartition> partitions;
         try {
-            partitions = partitionCache.getAll(keys).values().asList();
+            if (withCache) {
+                partitions = partitionCache.getAll(keys).values().asList();
+            } else {
+                Map<PartitionCacheKey, HivePartition> map = loadPartitions(keys);
+                partitions = map.values().stream().collect(Collectors.toList());
+            }
         } catch (ExecutionException e) {
             throw new CacheException("failed to get partition in catalog %s", e, catalog.getName());
         }
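Taken together, the HiveMetaStoreCache hunks split each lookup into a cache-backed variant for the query path and a cache-bypassing variant for metadata-only requests, so a command like `show table status` no longer populates or refreshes the per-catalog caches. A minimal, self-contained sketch of the same split using Guava's LoadingCache; the String key and value stand in for the real PartitionCacheKey / HivePartition types, so this is an illustration rather than Doris code:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

// Minimal sketch (not Doris code) of the with/without-cache split.
public class PartitionLookup {

    private final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(10_000)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) {
                    return loadPartition(key);
                }
            });

    // Query path: read through the cache so frequently used partitions stay resident.
    public List<String> getAllWithCache(List<String> keys) throws ExecutionException {
        return cache.getAll(keys).values().asList();
    }

    // Non-query path (e.g. statistics, SHOW TABLE STATUS): call the loader directly,
    // so a one-off scan neither evicts hot entries nor pollutes the cache.
    public List<String> getAllWithoutCache(List<String> keys) {
        return keys.stream().map(this::loadPartition).collect(Collectors.toList());
    }

    private String loadPartition(String key) {
        // Placeholder for the metastore RPC that the real loader performs.
        return "partition:" + key;
    }
}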
@@ -151,7 +151,7 @@ public class HiveScanNode extends FileQueryScanNode {
             partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
         }
         List<HivePartition> allPartitions =
-                cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList);
+                cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList);
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime();
         }
@@ -197,7 +197,7 @@ public class HiveScanNode extends FileQueryScanNode {
         if (hiveTransaction != null) {
             fileCaches = getFileSplitByTransaction(cache, partitions);
         } else {
-            fileCaches = cache.getFilesByPartitions(partitions, useSelfSplitter);
+            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter);
         }
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -882,7 +882,11 @@ public class ShowExecutor {
             // Row_format
             row.add(null);
             // Rows
-            row.add(String.valueOf(table.getRowCount()));
+            // Use estimatedRowCount(), not getRowCount().
+            // because estimatedRowCount() is an async call, it will not block, and it will call getRowCount()
+            // finally. So that for some table(especially external table),
+            // we can get the row count without blocking.
+            row.add(String.valueOf(table.estimatedRowCount()));
             // Avg_row_length
             row.add(String.valueOf(table.getAvgRowLength()));
             // Data_length
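The replacement comment describes estimatedRowCount() as an asynchronous call that returns immediately and eventually falls through to getRowCount(). A minimal sketch of that kind of non-blocking estimate, with hypothetical names rather than the actual TableIf API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a non-blocking row-count estimate: return the last known value
// immediately and refresh it in the background, so SHOW TABLE STATUS never blocks on a
// slow getRowCount() call (e.g. listing the files of an external table).
class RowCountEstimator {
    private final AtomicLong lastKnownRowCount = new AtomicLong(0);

    long estimatedRowCount() {
        // Kick off the (possibly slow) refresh asynchronously.
        CompletableFuture.runAsync(() -> lastKnownRowCount.set(getRowCount()));
        // Return whatever value we have right now without waiting.
        return lastKnownRowCount.get();
    }

    long getRowCount() {
        // Placeholder for the real, potentially blocking computation.
        return 42;
    }
}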
@@ -1408,7 +1408,6 @@ public class StmtExecutor {
             }
         }

-
         Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan();
         try (Scope scope = fetchResultSpan.makeCurrent()) {
             while (true) {
@@ -553,6 +553,9 @@ public class StatisticsUtil {
      * @return estimated row count
      */
     public static long getRowCountFromFileList(HMSExternalTable table) {
+        if (table.isView()) {
+            return 0;
+        }
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
         List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
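This early return is the hive-view NPE fix from the commit title: a view has no backing data files to list, so (as an assumption here) the file-listing code further down would dereference storage-descriptor fields that are null for views. A short hypothetical caller-side sketch, reusing the types from the diff:

// Hypothetical usage sketch: with the guard in place, statistics code can ask any HMS
// table, including a view, for a file-based row count without a pre-check of its own.
static long rowCountOrZero(HMSExternalTable hmsTable) {
    // A view reports 0 rows instead of throwing a NullPointerException while listing files.
    return StatisticsUtil.getRowCountFromFileList(hmsTable);
}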
@@ -562,6 +565,9 @@ public class StatisticsUtil {
         int totalPartitionSize = 1;
         // Get table partitions from cache.
         if (!partitionColumnTypes.isEmpty()) {
+            // It is ok to get partition values from cache,
+            // no need to worry that this call will invalid or refresh the cache.
+            // because it has enough space to keep partition info of all tables in cache.
             partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
         }
         if (partitionValues != null) {
@@ -582,14 +588,17 @@ public class StatisticsUtil {
             for (PartitionItem item : partitionItems) {
                 partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
             }
-            hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList);
+            // get partitions without cache, so that it will not invalid the cache when executing
+            // non query request such as `show table status`
+            hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(),
+                    partitionValuesList);
         } else {
             hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
                     table.getRemoteTable().getSd().getInputFormat(),
                     table.getRemoteTable().getSd().getLocation(), null));
         }
         // Get files for all partitions.
-        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitions(
+        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitionsWithoutCache(
                 hivePartitions, true);
         long totalSize = 0;
         // Calculate the total file size.
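The last hunk stops just before the size-to-rows conversion. As a rough illustration only (the actual formula in StatisticsUtil may weight columns differently), estimating rows from the summed file size amounts to dividing total bytes by an estimated bytes-per-row derived from the schema:

// Illustrative only: estimate the row count as total data size over an estimated row width.
// estimatedBytesPerRow would be derived from the schema's column types; the real Doris
// implementation may compute and clamp this differently.
static long estimateRowsFromFiles(long totalFileSizeBytes, int estimatedBytesPerRow) {
    if (totalFileSizeBytes <= 0 || estimatedBytesPerRow <= 0) {
        return 0;
    }
    return totalFileSizeBytes / estimatedBytesPerRow;
}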