[improvement](statistics, multi catalog) Estimate hive table row count based on file size. (#21207)
Support estimating table row count based on total file size. With sample size = 3000 (total partition count 87491), the cache load time is 45s. With sample size = 100000 (more than the total partition count of 87505), the cache load time is 388s.
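The estimate itself is a simple ratio: total file size divided by an estimated per-row size, where the row size is the sum of each column's fixed slot size. A minimal, self-contained sketch of that arithmetic, with hypothetical sizes standing in for Doris's column.getDataType().getSlotSize():

import java.util.List;

public class RowCountArithmetic {
    public static void main(String[] args) {
        // Hypothetical inputs: 10 GB of data files, and per-column slot sizes
        // (e.g. INT = 4 bytes, BIGINT = 8, a VARCHAR slot = 16).
        long totalSize = 10L * 1024 * 1024 * 1024;
        List<Integer> slotSizes = List.of(4, 8, 16);

        long estimatedRowSize = slotSizes.stream().mapToLong(Integer::longValue).sum();
        System.out.println(totalSize / estimatedRowSize); // ~383 million rows
    }
}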
HMSExternalTable.java
@@ -79,9 +79,7 @@ public class HMSExternalTable extends ExternalTable {
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
 
-    public static final String NUM_ROWS = "numRows";
-    public static final String NUM_FILES = "numFiles";
-
+    private static final String NUM_ROWS = "numRows";
 
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
@@ -269,7 +267,24 @@ public class HMSExternalTable extends ExternalTable {
 
     @Override
     public long getRowCount() {
-        return 0;
+        makeSureInitialized();
+        long rowCount;
+        switch (dlaType) {
+            case HIVE:
+                rowCount = StatisticsUtil.getHiveRowCount(this);
+                break;
+            case ICEBERG:
+                rowCount = StatisticsUtil.getIcebergRowCount(this);
+                break;
+            default:
+                LOG.warn("getRowCount for dlaType {} is not supported.", dlaType);
+                rowCount = -1;
+        }
+        if (rowCount == -1) {
+            LOG.debug("Will estimate row count from file list.");
+            rowCount = StatisticsUtil.getRowCountFromFileList(this);
+        }
+        return rowCount;
     }
 
     @Override
@@ -416,10 +431,12 @@ public class HMSExternalTable extends ExternalTable {
             Optional<TableStatistic> tableStatistics = Env.getCurrentEnv().getStatisticsCache().getTableStatistics(
                     catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id);
             if (tableStatistics.isPresent()) {
-                return tableStatistics.get().rowCount;
+                long rowCount = tableStatistics.get().rowCount;
+                LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
+                return rowCount;
             }
         } catch (Exception e) {
-            LOG.warn(String.format("Fail to get row count for table %s", name), e);
+            LOG.warn("Fail to get row count for table {}", name, e);
         }
         return 1;
     }
@@ -576,6 +593,5 @@ public class HMSExternalTable extends ExternalTable {
             builder.setMaxValue(Double.MAX_VALUE);
         }
     }
 
-
 }
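The new getRowCount() treats -1 as a "statistics unavailable" sentinel and only then falls back to the file-list estimate. A condensed sketch of that fallback pattern, with hypothetical suppliers standing in for the StatisticsUtil calls:

import java.util.function.LongSupplier;

public class FallbackSketch {
    // fromStats stands in for getHiveRowCount/getIcebergRowCount, which return -1
    // when metadata carries no row count; fromFiles for getRowCountFromFileList.
    static long rowCount(LongSupplier fromStats, LongSupplier fromFiles) {
        long fromMeta = fromStats.getAsLong();
        return fromMeta == -1 ? fromFiles.getAsLong() : fromMeta;
    }

    public static void main(String[] args) {
        // Metadata statistics missing (-1), so the file-based estimate wins.
        System.out.println(rowCount(() -> -1, () -> 383_000_000L));
    }
}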
StatisticConstants.java
@@ -44,6 +44,8 @@ public class StatisticConstants {
 
     public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
 
+    public static final int ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS = 12;
+
     /**
      * Bucket count fot column_statistics and analysis_job table.
      */
StatisticsCache.java
@@ -72,7 +72,7 @@ public class StatisticsCache {
     private final AsyncLoadingCache<StatisticsCacheKey, Optional<TableStatistic>> tableStatisticsCache =
             Caffeine.newBuilder()
                     .maximumSize(Config.stats_cache_size)
-                    .expireAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .expireAfterWrite(Duration.ofHours(StatisticConstants.ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS))
                     .executor(threadPool)
                     .buildAsync(tableStatisticsCacheLoader);
 
@@ -143,8 +143,9 @@ public class StatisticsCache {
         StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tableId);
         try {
             CompletableFuture<Optional<TableStatistic>> f = tableStatisticsCache.get(k);
-            // Synchronous return the cache value for table row count.
-            return f.get();
+            if (f.isDone()) {
+                return f.get();
+            }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
 
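The second hunk replaces a blocking f.get() with an isDone() check, so a thread asking for a row count never waits on a slow metastore load: it gets the cached value if the load already finished and otherwise lets it complete in the background. A self-contained sketch of that pattern with Caffeine; the key type and loader here are made up, with Thread.sleep as a placeholder for a slow metastore round trip:

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class NonBlockingReadSketch {
    // Hypothetical loader standing in for TableStatisticsCacheLoader.
    private final AsyncLoadingCache<Long, Optional<Long>> rowCounts = Caffeine.newBuilder()
            .maximumSize(1_000)
            .expireAfterWrite(Duration.ofHours(12))
            .buildAsync(tableId -> {
                Thread.sleep(1_000); // pretend the metastore call is slow
                return Optional.of(42L);
            });

    Optional<Long> getRowCount(long tableId) throws Exception {
        CompletableFuture<Optional<Long>> f = rowCounts.get(tableId);
        // Return the value only if the load already finished; otherwise report
        // "unknown" and let the load finish in the background.
        return f.isDone() ? f.get() : Optional.empty();
    }
}

The first caller triggers the async load and immediately sees Optional.empty(); once the future completes, later callers read the cached value. That mirrors how getTableStatistics now degrades to an unknown row count instead of stalling the calling thread.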
TableStatisticsCacheLoader.java
@@ -49,7 +49,7 @@ public class TableStatisticsCacheLoader extends StatisticsCacheLoader<Optional<TableStatistic>>
             long rowCount = table.getRowCount();
             long lastAnalyzeTimeInMs = System.currentTimeMillis();
             String updateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(lastAnalyzeTimeInMs));
-            Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
+            return Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
         } catch (Exception e) {
             LOG.warn(String.format("Fail to get row count for table %d", key.tableId), e);
         }
 
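This one-line change fixes a dropped return value: the old code constructed the TableStatistic but discarded it, because Optional.of(...) on its own is a no-op expression statement, so control fell through to whatever the method returned afterwards (presumably an empty result). The bug pattern in isolation, as a hypothetical method:

import java.util.Optional;

public class DroppedResultSketch {
    static Optional<Long> load(long rowCount) {
        Optional.of(rowCount); // value built, never assigned or returned: a no-op
        return Optional.empty(); // so callers always saw an absent statistic
    }
}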
StatisticsUtil.java
@@ -32,17 +32,25 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import org.apache.doris.qe.AutoCloseConnectContext;
@@ -59,9 +67,15 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringSubstitutor;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.text.SimpleDateFormat;
@@ -80,10 +94,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class StatisticsUtil {
+    private static final Logger LOG = LogManager.getLogger(StatisticsUtil.class);
+
     private static final String ID_DELIMITER = "-";
     private static final String VALUES_DELIMITER = ",";
 
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String NUM_ROWS = "numRows";
+
     private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
     public static List<ResultRow> executeQuery(String template, Map<String, String> params) {
@@ -461,4 +479,119 @@ public class StatisticsUtil {
             return (int) (healthCoefficient * 100.0);
         }
     }
+
+    /**
+     * Estimate hive table row count.
+     * First get it from remote table parameters. If not found, estimate it : totalSize/estimatedRowSize
+     * @param table Hive HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getHiveRowCount(HMSExternalTable table) {
+        Map<String, String> parameters = table.getRemoteTable().getParameters();
+        if (parameters == null) {
+            return -1;
+        }
+        // Table parameters contains row count, simply get and return it.
+        if (parameters.containsKey(NUM_ROWS)) {
+            return Long.parseLong(parameters.get(NUM_ROWS));
+        }
+        if (!parameters.containsKey(TOTAL_SIZE)) {
+            return -1;
+        }
+        // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
+        long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE));
+        long estimatedRowSize = 0;
+        for (Column column : table.getFullSchema()) {
+            estimatedRowSize += column.getDataType().getSlotSize();
+        }
+        if (estimatedRowSize == 0) {
+            return 1;
+        }
+        return totalSize / estimatedRowSize;
+    }
+
+    /**
+     * Estimate iceberg table row count.
+     * Get the row count by adding all task file recordCount.
+     * @param table Iceberg HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getIcebergRowCount(HMSExternalTable table) {
+        long rowCount = 0;
+        try {
+            Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table);
+            TableScan tableScan = icebergTable.newScan().includeColumnStats();
+            for (FileScanTask task : tableScan.planFiles()) {
+                rowCount += task.file().recordCount();
+            }
+            return rowCount;
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e);
+        }
+        return -1;
+    }
+
+    /**
+     * Estimate hive table row count : totalFileSize/estimatedRowSize
+     * @param table Hive HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getRowCountFromFileList(HMSExternalTable table) {
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
+        List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
+        HiveMetaStoreCache.HivePartitionValues partitionValues = null;
+        List<HivePartition> hivePartitions = Lists.newArrayList();
+        int samplePartitionSize = Config.hive_stats_partition_sample_size;
+        int totalPartitionSize = 1;
+        // Get table partitions from cache.
+        if (!partitionColumnTypes.isEmpty()) {
+            partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
+        }
+        if (partitionValues != null) {
+            Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
+            totalPartitionSize = idToPartitionItem.size();
+            Collection<PartitionItem> partitionItems;
+            List<List<String>> partitionValuesList;
+            // If partition number is too large, randomly choose part of them to estimate the whole table.
+            if (samplePartitionSize < totalPartitionSize) {
+                List<PartitionItem> items = new ArrayList<>(idToPartitionItem.values());
+                Collections.shuffle(items);
+                partitionItems = items.subList(0, samplePartitionSize);
+                partitionValuesList = Lists.newArrayListWithCapacity(samplePartitionSize);
+            } else {
+                partitionItems = idToPartitionItem.values();
+                partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize);
+            }
+            for (PartitionItem item : partitionItems) {
+                partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
+            }
+            hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList);
+        } else {
+            hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
+                    table.getRemoteTable().getSd().getInputFormat(),
+                    table.getRemoteTable().getSd().getLocation(), null));
+        }
+        // Get files for all partitions.
+        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitions(hivePartitions, true);
+        long totalSize = 0;
+        // Calculate the total file size.
+        for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
+            for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) {
+                totalSize += file.getLength();
+            }
+        }
+        // Estimate row count: totalSize/estimatedRowSize
+        long estimatedRowSize = 0;
+        for (Column column : table.getFullSchema()) {
+            estimatedRowSize += column.getDataType().getSlotSize();
+        }
+        if (estimatedRowSize == 0) {
+            return 1;
+        }
+        if (samplePartitionSize < totalPartitionSize) {
+            totalSize = totalSize * totalPartitionSize / samplePartitionSize;
+        }
+        return totalSize / estimatedRowSize;
+    }
 }
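getRowCountFromFileList bounds the number of partitions whose files are listed by Config.hive_stats_partition_sample_size, then scales the observed size back up by totalPartitionSize / samplePartitionSize. A worked version of that extrapolation, plugging in the partition counts from the commit message; the 600 GB sampled size and 32-byte row width are made-up inputs for illustration:

public class SampleExtrapolation {
    public static void main(String[] args) {
        // Partition counts from the commit message: 87491 total, sample of 3000.
        int totalPartitionSize = 87_491;
        int samplePartitionSize = 3_000;
        long totalSize = 600L * 1024 * 1024 * 1024; // bytes seen across sampled files

        // Mirrors: totalSize = totalSize * totalPartitionSize / samplePartitionSize
        totalSize = totalSize * totalPartitionSize / samplePartitionSize;
        long estimatedRowSize = 32; // assumed per-row slot size
        System.out.println(totalSize / estimatedRowSize);
    }
}

This is also where the timing numbers in the commit message come from: listing files for a 3000-partition sample took 45s, while a sample size larger than the partition count (i.e. listing every partition) took 388s, with the estimate differing only by the extrapolation factor.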