[feat](meta) Reuse HMS statistics analyzed by Spark engine for Analyze Task. (#28525)

Taking the Idea further from PR #24853 (#24853)
Column statistics are already analyzed by Spark and available in HMS. This PR proposes to reuse those analyzed stats from the external source when the ANALYZE command is executed with the WITH SQL clause.

Spark analyzes and stores the statistics in table properties instead of HiveColumnStatistics. In this PR, we try to get the statistics from these properties and make them available to Doris.
This commit is contained in:
Nitin-Kashyap
2024-01-12 14:18:27 +05:30
committed by yiguolei
parent 7b30119537
commit d127527af3
2 changed files with 85 additions and 5 deletions

View File

@ -34,6 +34,7 @@ import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.HMSAnalysisTask;
import org.apache.doris.statistics.StatsType;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
@ -41,6 +42,7 @@ import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@ -81,7 +83,7 @@ public class HMSExternalTable extends ExternalTable {
private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
private static final String TBL_PROP_INSERT_ONLY = "insert_only";
@ -89,6 +91,15 @@ public class HMSExternalTable extends ExternalTable {
private static final String NUM_ROWS = "numRows";
// Prefix of Spark-written column statistics keys in the HMS table parameters:
// spark.sql.statistics.colStats.<columnName>.<statName>
private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats.";
// Suffixes of the individual per-column statistic keys written by Spark's
// ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS.
private static final String SPARK_STATS_MAX = ".max";
private static final String SPARK_STATS_MIN = ".min";
private static final String SPARK_STATS_NDV = ".distinctCount";
private static final String SPARK_STATS_NULLS = ".nullCount";
private static final String SPARK_STATS_AVG_LEN = ".avgLen";
// BUG FIX: was ".avgLen" (copy-paste); Spark stores the maximum column length
// under the ".maxLen" property suffix.
private static final String SPARK_STATS_MAX_LEN = ".maxLen";
private static final String SPARK_STATS_HISTOGRAM = ".histogram";
static {
SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@ -109,6 +120,17 @@ public class HMSExternalTable extends ExternalTable {
SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat");
}
// Maps each Doris stat type to the suffix of the corresponding Spark statistics
// property stored in the HMS table parameters (see SPARK_COL_STATS prefix).
static {
    Map<StatsType, String> sparkKeySuffixByType = Maps.newHashMap();
    sparkKeySuffixByType.put(StatsType.NDV, SPARK_STATS_NDV);
    sparkKeySuffixByType.put(StatsType.AVG_SIZE, SPARK_STATS_AVG_LEN);
    sparkKeySuffixByType.put(StatsType.MAX_SIZE, SPARK_STATS_MAX_LEN);
    sparkKeySuffixByType.put(StatsType.NUM_NULLS, SPARK_STATS_NULLS);
    sparkKeySuffixByType.put(StatsType.MIN_VALUE, SPARK_STATS_MIN);
    sparkKeySuffixByType.put(StatsType.MAX_VALUE, SPARK_STATS_MAX);
    sparkKeySuffixByType.put(StatsType.HISTOGRAM, SPARK_STATS_HISTOGRAM);
    MAP_SPARK_STATS_TO_DORIS = sparkKeySuffixByType;
}
private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
private List<Column> partitionColumns;
@ -536,6 +558,31 @@ public class HMSExternalTable extends ExternalTable {
LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name);
}
/**
 * Checks whether Spark-generated column statistics for {@code colName} exist in
 * the HMS table parameters (any key prefixed with
 * "spark.sql.statistics.colStats.&lt;colName&gt;.").
 *
 * @param colName the column to look up
 * @return true if at least one Spark stats property exists for the column
 */
public boolean hasColumnStatistics(String colName) {
    Map<String, String> parameters = remoteTable.getParameters();
    // Hoist the prefix out of the lambda; anyMatch is the idiomatic,
    // short-circuiting form of filter(...).findAny().isPresent().
    String prefix = SPARK_COL_STATS + colName + ".";
    return parameters.keySet().stream().anyMatch(k -> k.startsWith(prefix));
}
/**
 * Copies Spark-generated column statistics from the HMS table parameters into
 * {@code stats}. For each requested stat type, the matching Spark property value
 * is stored under the caller-supplied target key; a property that is absent is
 * recorded as the literal string "NULL".
 *
 * @param colName    column whose statistics are requested
 * @param statsTypes mapping from stat type to the target key used in {@code stats}
 * @param stats      output map populated with the property values
 * @return false when no Spark statistics exist for the column at all, true otherwise
 */
public boolean fillColumnStatistics(String colName, Map<StatsType, String> statsTypes, Map<String, String> stats) {
    makeSureInitialized();
    if (!hasColumnStatistics(colName)) {
        return false;
    }
    Map<String, String> parameters = remoteTable.getParameters();
    for (Map.Entry<StatsType, String> requested : statsTypes.entrySet()) {
        // "-" yields a key that can never exist, so unmapped stat types
        // deliberately fall into the "NULL" branch below.
        String suffix = MAP_SPARK_STATS_TO_DORIS.getOrDefault(requested.getKey(), "-");
        String propertyKey = SPARK_COL_STATS + colName + suffix;
        if (parameters.containsKey(propertyKey)) {
            stats.put(requested.getValue(), parameters.get(propertyKey));
        } else {
            // should not happen, spark would have all type (except histogram)
            stats.put(requested.getValue(), "NULL");
        }
    }
    return true;
}
@Override
public Optional<ColumnStatistic> getColumnStatistic(String colName) {
makeSureInitialized();

View File

@ -20,6 +20,7 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@ -89,12 +90,18 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Get column statistics and insert the result to __internal_schema.column_statistics
*/
protected void getTableColumnStats() throws Exception {
if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
if (!info.usingSqlForPartitionColumn) {
try {
getPartitionColumnStats();
if (isPartitionColumn()) {
getPartitionColumnStats();
} else {
getHmsColumnStats();
}
} catch (Exception e) {
LOG.warn("Failed to collect stats for partition col {} using metadata, "
+ "fallback to normal collection", col.getName(), e);
LOG.warn("Failed to collect stats for {}col {} using metadata, "
+ "fallback to normal collection",
isPartitionColumn() ? "partition " : "", col.getName(), e);
/* retry using sql way! */
getOrdinaryColumnStats();
}
} else {
@ -209,6 +216,32 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
runQuery(sql);
}
/**
 * Collects the Spark-analyzed column stats (ndv, null count, min/max, avg length)
 * through HMS table-parameter metadata and inserts them into
 * __internal_schema.column_statistics.
 *
 * @throws AnalysisException when the table carries no Spark statistics for this
 *         column; the caller catches it and falls back to SQL-based collection.
 */
private void getHmsColumnStats() throws Exception {
    TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
    long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;

    Map<String, String> params = buildStatsParams("NULL");
    Map<StatsType, String> statsParams = new HashMap<>();
    statsParams.put(StatsType.NDV, "ndv");
    statsParams.put(StatsType.NUM_NULLS, "null_count");
    statsParams.put(StatsType.MIN_VALUE, "min");
    statsParams.put(StatsType.MAX_VALUE, "max");
    statsParams.put(StatsType.AVG_SIZE, "avg_len");

    // BUG FIX: fillColumnStatistics returns true on success and false when no
    // Spark stats exist. The original condition was inverted — it threw on the
    // happy path and silently continued when stats were missing.
    if (!table.fillColumnStatistics(info.colName, statsParams, params)) {
        throw new AnalysisException("some column stats not available");
    }

    // avg_len may be the string "NULL" if Spark omitted it; the resulting
    // NumberFormatException propagates and triggers the SQL fallback upstream.
    long dataSize = Long.parseLong(params.get("avg_len")) * count;
    params.put("row_count", String.valueOf(count));
    params.put("data_size", String.valueOf(dataSize));

    StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
    String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
    runQuery(sql);
}
private String updateMinValue(String currentMin, String value) {
if (currentMin == null) {
return value;