From 672acb8784cbe4e44c73d7ef618b2162543b64df Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 4 Aug 2023 16:55:10 +0800 Subject: [PATCH] [fix](show-table-status) fix hive view NPE and external meta cache refresh issue (#22377) --- .../java/org/apache/doris/common/Config.java | 2 +- .../datasource/hive/HiveMetaStoreCache.java | 60 ++++++++++++++++--- .../doris/planner/external/HiveScanNode.java | 4 +- .../org/apache/doris/qe/ShowExecutor.java | 6 +- .../org/apache/doris/qe/StmtExecutor.java | 1 - .../doris/statistics/util/StatisticsUtil.java | 13 +++- 6 files changed, 70 insertions(+), 16 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 11c85db137..5721928b98 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2042,7 +2042,7 @@ public class Config extends ConfigBase { public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 1024; @ConfField(mutable = true, masterOnly = true, description = { - "用于强制设定内表的副本数,如果改参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。" + "用于强制设定内表的副本数,如果该参数大于零,则用户在建表时指定的副本数将被忽略,而使用本参数设置的值。" + "同时,建表语句中指定的副本标签等参数会被忽略。该参数不影响包括创建分区、修改表属性的操作。该参数建议仅用于测试环境", "Used to force the number of replicas of the internal table. If the config is greater than zero, " + "the number of replicas specified by the user when creating the table will be ignored, " diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index d3e4750c3b..e7e621948e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.S3Util; @@ -98,6 +99,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -499,8 +501,18 @@ public class HiveMetaStoreCache { } } - public List getFilesByPartitions(List partitions, - boolean useSelfSplitter) { + public List getFilesByPartitionsWithCache(List partitions, + boolean useSelfSplitter) { + return getFilesByPartitions(partitions, useSelfSplitter, true); + } + + public List getFilesByPartitionsWithoutCache(List partitions, + boolean useSelfSplitter) { + return getFilesByPartitions(partitions, useSelfSplitter, false); + } + + private List getFilesByPartitions(List partitions, + boolean useSelfSplitter, boolean withCache) { long start = System.currentTimeMillis(); List keys = partitions.stream().map(p -> { FileCacheKey fileCacheKey = p.isDummyPartition() @@ -513,28 +525,58 @@ public class HiveMetaStoreCache { List fileLists; try { - fileLists = fileCacheRef.get().getAll(keys).values().asList(); + if (withCache) { + fileLists = fileCacheRef.get().getAll(keys).values().asList(); + } else { + List>> pList = keys.stream() + .map(key -> Pair.of(key, executor.submit(() -> loadFiles(key)))) + .collect(Collectors.toList()); + + fileLists = Lists.newArrayListWithExpectedSize(keys.size()); + for (Pair> p : pList) { + fileLists.add(p.second.get()); + } + } } catch (ExecutionException e) { throw new CacheException("failed to get files from partitions in catalog %s", - e, catalog.getName()); + e, catalog.getName()); + } catch (InterruptedException e) { + throw new CacheException("failed to get files from partitions in catalog %s with interrupted exception", + e, catalog.getName()); } LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms", fileLists.stream().mapToInt(l -> l.getFiles() == null - ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(), + ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); return fileLists; } - public List getAllPartitions(String dbName, String name, List> partitionValuesList) { + public List getAllPartitionsWithCache(String dbName, String name, + List> partitionValuesList) { + return getAllPartitions(dbName, name, partitionValuesList, true); + } + + public List getAllPartitionsWithoutCache(String dbName, String name, + List> partitionValuesList) { + return getAllPartitions(dbName, name, partitionValuesList, false); + } + + private List getAllPartitions(String dbName, String name, List> partitionValuesList, + boolean withCache) { long start = System.currentTimeMillis(); List keys = partitionValuesList.stream() - .map(p -> new PartitionCacheKey(dbName, name, p)) - .collect(Collectors.toList()); + .map(p -> new PartitionCacheKey(dbName, name, p)) + .collect(Collectors.toList()); List partitions; try { - partitions = partitionCache.getAll(keys).values().asList(); + if (withCache) { + partitions = partitionCache.getAll(keys).values().asList(); + } else { + Map map = loadPartitions(keys); + partitions = map.values().stream().collect(Collectors.toList()); + } } catch (ExecutionException e) { throw new CacheException("failed to get partition in catalog %s", e, catalog.getName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index f43a6b1dce..211f6e8056 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -151,7 +151,7 @@ public class HiveScanNode extends FileQueryScanNode { partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); } List allPartitions = - cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList); + cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime(); } @@ -197,7 +197,7 @@ public class HiveScanNode extends FileQueryScanNode { if (hiveTransaction != null) { fileCaches = getFileSplitByTransaction(cache, partitions); } else { - fileCaches = cache.getFilesByPartitions(partitions, useSelfSplitter); + fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter); } if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 9ee8cb91c2..bd9a57ccb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -882,7 +882,11 @@ public class ShowExecutor { // Row_format row.add(null); // Rows - row.add(String.valueOf(table.getRowCount())); + // Use estimatedRowCount(), not getRowCount(). + // because estimatedRowCount() is an async call, it will not block, and it will call getRowCount() + // finally. So that for some table(especially external table), + // we can get the row count without blocking. + row.add(String.valueOf(table.estimatedRowCount())); // Avg_row_length row.add(String.valueOf(table.getAvgRowLength())); // Data_length diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 59982ff6b0..cc445a9cbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1408,7 +1408,6 @@ public class StmtExecutor { } } - Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan(); try (Scope scope = fetchResultSpan.makeCurrent()) { while (true) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0c586a0e62..767e76d790 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -553,6 +553,9 @@ public class StatisticsUtil { * @return estimated row count */ public static long getRowCountFromFileList(HMSExternalTable table) { + if (table.isView()) { + return 0; + } HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) table.getCatalog()); List partitionColumnTypes = table.getPartitionColumnTypes(); @@ -562,6 +565,9 @@ public class StatisticsUtil { int totalPartitionSize = 1; // Get table partitions from cache. if (!partitionColumnTypes.isEmpty()) { + // It is ok to get partition values from cache, + // no need to worry that this call will invalid or refresh the cache. + // because it has enough space to keep partition info of all tables in cache. partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes); } if (partitionValues != null) { @@ -582,14 +588,17 @@ public class StatisticsUtil { for (PartitionItem item : partitionItems) { partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList()); } - hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList); + // get partitions without cache, so that it will not invalid the cache when executing + // non query request such as `show table status` + hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(), + partitionValuesList); } else { hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true, table.getRemoteTable().getSd().getInputFormat(), table.getRemoteTable().getSd().getLocation(), null)); } // Get files for all partitions. - List filesByPartitions = cache.getFilesByPartitions( + List filesByPartitions = cache.getFilesByPartitionsWithoutCache( hivePartitions, true); long totalSize = 0; // Calculate the total file size.