diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index fd4e19f574..550961b61b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2023,4 +2023,9 @@ public class Config extends ConfigBase { "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) public static boolean disallow_create_catalog_with_resource = true; + + @ConfField(mutable = true, masterOnly = false, description = { + "Hive行数估算分区采样数", + "Sample size for hive row count estimation."}) + public static int hive_stats_partition_sample_size = 3000; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index c9f03fd80c..1aee1803ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -79,9 +79,7 @@ public class HMSExternalTable extends ExternalTable { private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties"; private static final String TBL_PROP_INSERT_ONLY = "insert_only"; - - public static final String NUM_ROWS = "numRows"; - public static final String NUM_FILES = "numFiles"; + private static final String NUM_ROWS = "numRows"; static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); @@ -269,7 +267,24 @@ public class HMSExternalTable extends ExternalTable { @Override public long getRowCount() { - return 0; + makeSureInitialized(); + long rowCount; + switch (dlaType) { + case HIVE: + rowCount = StatisticsUtil.getHiveRowCount(this); + break; + case ICEBERG: + rowCount = StatisticsUtil.getIcebergRowCount(this); + break; + default: + LOG.warn("getRowCount for dlaType {} is not supported.", dlaType); + rowCount = -1; + } + if (rowCount == -1) { + LOG.debug("Will estimate row count from file list."); + rowCount = StatisticsUtil.getRowCountFromFileList(this); + } + return rowCount; } @Override @@ -416,10 +431,12 @@ public class HMSExternalTable extends ExternalTable { Optional tableStatistics = Env.getCurrentEnv().getStatisticsCache().getTableStatistics( catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id); if (tableStatistics.isPresent()) { - return tableStatistics.get().rowCount; + long rowCount = tableStatistics.get().rowCount; + LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount); + return rowCount; } } catch (Exception e) { - LOG.warn(String.format("Fail to get row count for table %s", name), e); + LOG.warn("Fail to get row count for table {}", name, e); } return 1; } @@ -576,6 +593,5 @@ public class HMSExternalTable extends ExternalTable { builder.setMaxValue(Double.MAX_VALUE); } } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index 89167d64f2..eb9572bb74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -44,6 +44,8 @@ public class StatisticConstants { public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2; + public static final int ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS = 12; + /** * Bucket count fot column_statistics and analysis_job table. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 2963cbea00..f46e19f529 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -72,7 +72,7 @@ public class StatisticsCache { private final AsyncLoadingCache> tableStatisticsCache = Caffeine.newBuilder() .maximumSize(Config.stats_cache_size) - .expireAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS)) + .expireAfterWrite(Duration.ofHours(StatisticConstants.ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS)) .executor(threadPool) .buildAsync(tableStatisticsCacheLoader); @@ -143,8 +143,9 @@ public class StatisticsCache { StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tableId); try { CompletableFuture> f = tableStatisticsCache.get(k); - // Synchronous return the cache value for table row count. - return f.get(); + if (f.isDone()) { + return f.get(); + } } catch (Exception e) { LOG.warn("Unexpected exception while returning Histogram", e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java index 6847dd6b97..817e74540f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java @@ -49,7 +49,7 @@ public class TableStatisticsCacheLoader extends StatisticsCacheLoader executeQuery(String template, Map params) { @@ -461,4 +479,119 @@ public class StatisticsUtil { return (int) (healthCoefficient * 100.0); } } + + /** + * Estimate hive table row count. + * First get it from remote table parameters. If not found, estimate it : totalSize/estimatedRowSize + * @param table Hive HMSExternalTable to estimate row count. + * @return estimated row count + */ + public static long getHiveRowCount(HMSExternalTable table) { + Map parameters = table.getRemoteTable().getParameters(); + if (parameters == null) { + return -1; + } + // Table parameters contains row count, simply get and return it. + if (parameters.containsKey(NUM_ROWS)) { + return Long.parseLong(parameters.get(NUM_ROWS)); + } + if (!parameters.containsKey(TOTAL_SIZE)) { + return -1; + } + // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize + long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE)); + long estimatedRowSize = 0; + for (Column column : table.getFullSchema()) { + estimatedRowSize += column.getDataType().getSlotSize(); + } + if (estimatedRowSize == 0) { + return 1; + } + return totalSize / estimatedRowSize; + } + + /** + * Estimate iceberg table row count. + * Get the row count by adding all task file recordCount. + * @param table Iceberg HMSExternalTable to estimate row count. + * @return estimated row count + */ + public static long getIcebergRowCount(HMSExternalTable table) { + long rowCount = 0; + try { + Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table); + TableScan tableScan = icebergTable.newScan().includeColumnStats(); + for (FileScanTask task : tableScan.planFiles()) { + rowCount += task.file().recordCount(); + } + return rowCount; + } catch (Exception e) { + LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e); + } + return -1; + } + + /** + * Estimate hive table row count : totalFileSize/estimatedRowSize + * @param table Hive HMSExternalTable to estimate row count. + * @return estimated row count + */ + public static long getRowCountFromFileList(HMSExternalTable table) { + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) table.getCatalog()); + List partitionColumnTypes = table.getPartitionColumnTypes(); + HiveMetaStoreCache.HivePartitionValues partitionValues = null; + List hivePartitions = Lists.newArrayList(); + int samplePartitionSize = Config.hive_stats_partition_sample_size; + int totalPartitionSize = 1; + // Get table partitions from cache. + if (!partitionColumnTypes.isEmpty()) { + partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes); + } + if (partitionValues != null) { + Map idToPartitionItem = partitionValues.getIdToPartitionItem(); + totalPartitionSize = idToPartitionItem.size(); + Collection partitionItems; + List> partitionValuesList; + // If partition number is too large, randomly choose part of them to estimate the whole table. + if (samplePartitionSize < totalPartitionSize) { + List items = new ArrayList<>(idToPartitionItem.values()); + Collections.shuffle(items); + partitionItems = items.subList(0, samplePartitionSize); + partitionValuesList = Lists.newArrayListWithCapacity(samplePartitionSize); + } else { + partitionItems = idToPartitionItem.values(); + partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize); + } + for (PartitionItem item : partitionItems) { + partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList()); + } + hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList); + } else { + hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true, + table.getRemoteTable().getSd().getInputFormat(), + table.getRemoteTable().getSd().getLocation(), null)); + } + // Get files for all partitions. + List filesByPartitions = cache.getFilesByPartitions(hivePartitions, true); + long totalSize = 0; + // Calculate the total file size. + for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) { + for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) { + totalSize += file.getLength(); + } + } + // Estimate row count: totalSize/estimatedRowSize + long estimatedRowSize = 0; + for (Column column : table.getFullSchema()) { + estimatedRowSize += column.getDataType().getSlotSize(); + } + if (estimatedRowSize == 0) { + return 1; + } + if (samplePartitionSize < totalPartitionSize) { + totalSize = totalSize * totalPartitionSize / samplePartitionSize; + } + return totalSize / estimatedRowSize; + } }