[Improvement](statistics)Collect stats for hive partition column using metadata (#24853)

Hive partition columns' stats could be calculated from hive metastore data. Doesn't need to execute sql to get the stats.
This PR is using hive partition metadata to collect partition column stats.
This commit is contained in:
Jibing-Li
2023-10-17 10:31:57 +08:00
committed by GitHub
parent a383a2bc83
commit 1130317b91
15 changed files with 447 additions and 48 deletions

View File

@ -42,8 +42,8 @@ public class AnalyzeProperties {
public static final String PROPERTY_NUM_BUCKETS = "num.buckets";
public static final String PROPERTY_ANALYSIS_TYPE = "analysis.type";
public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
public static final String PROPERTY_FORCE_FULL = "force.full";
public static final String PROPERTY_PARTITION_COLUMN_FROM_SQL = "partition.column.from.sql";
public static final AnalyzeProperties DEFAULT_PROP = new AnalyzeProperties(new HashMap<String, String>() {
{
@ -71,6 +71,7 @@ public class AnalyzeProperties {
.add(PROPERTY_PERIOD_SECONDS)
.add(PROPERTY_PERIOD_CRON)
.add(PROPERTY_FORCE_FULL)
.add(PROPERTY_PARTITION_COLUMN_FROM_SQL)
.build();
public AnalyzeProperties(Map<String, String> properties) {
@ -276,6 +277,10 @@ public class AnalyzeProperties {
return properties.containsKey(PROPERTY_SAMPLE_ROWS);
}
public boolean usingSqlForPartitionColumn() {
return properties.containsKey(PROPERTY_PARTITION_COLUMN_FROM_SQL);
}
public String toSQL() {
StringBuilder sb = new StringBuilder();
sb.append("PROPERTIES(");

View File

@ -97,4 +97,8 @@ public class AnalyzeStmt extends StatementBase {
public boolean forceFull() {
return analyzeProperties.forceFull();
}
public boolean usingSqlForPartitionColumn() {
return analyzeProperties.usingSqlForPartitionColumn();
}
}

View File

@ -87,10 +87,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
@ -298,16 +295,7 @@ public class HiveMetaStoreCache {
Preconditions.checkState(parts.length == types.size(), partitionName + " vs. " + types);
List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size());
for (String part : parts) {
String[] kv = part.split("=");
Preconditions.checkState(kv.length == 2, partitionName);
String partitionValue;
try {
// hive partition value maybe contains special characters like '=' and '/'
partitionValue = URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
// It should not be here
throw new RuntimeException(e);
}
String partitionValue = HiveUtil.getHivePartitionValue(part);
values.add(new PartitionValue(partitionValue, HIVE_DEFAULT_PARTITION.equals(partitionValue)));
}
try {

View File

@ -27,6 +27,7 @@ import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -46,8 +47,11 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
@ -221,4 +225,16 @@ public final class HiveUtil {
}
}
public static String getHivePartitionValue(String part) {
String[] kv = part.split("=");
Preconditions.checkState(kv.length == 2, String.format("Malformed partition name %s", part));
try {
// hive partition value maybe contains special characters like '=' and '/'
return URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
// It should not be here
throw new RuntimeException(e);
}
}
}

View File

@ -179,13 +179,17 @@ public class AnalysisInfo implements Writable {
@SerializedName("forceFull")
public final boolean forceFull;
@SerializedName("usingSqlForPartitionColumn")
public final boolean usingSqlForPartitionColumn;
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, long sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull) {
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@ -219,6 +223,7 @@ public class AnalysisInfo implements Writable {
this.cronExprStr = cronExpression.getCronExpression();
}
this.forceFull = forceFull;
this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
}
@Override
@ -259,6 +264,7 @@ public class AnalysisInfo implements Writable {
sj.add("cronExpr: " + cronExprStr);
}
sj.add("forceFull: " + forceFull);
sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn);
return sj.toString();
}

View File

@ -60,6 +60,7 @@ public class AnalysisInfoBuilder {
private long partitionCount;
private CronExpression cronExpression;
private boolean forceFull;
private boolean usingSqlForPartitionColumn;
public AnalysisInfoBuilder() {
}
@ -95,6 +96,7 @@ public class AnalysisInfoBuilder {
partitionCount = info.partitionCount;
cronExpression = info.cronExpression;
forceFull = info.forceFull;
usingSqlForPartitionColumn = info.usingSqlForPartitionColumn;
}
public AnalysisInfoBuilder setJobId(long jobId) {
@ -237,12 +239,19 @@ public class AnalysisInfoBuilder {
return this;
}
public void setCronExpression(CronExpression cronExpression) {
public AnalysisInfoBuilder setCronExpression(CronExpression cronExpression) {
this.cronExpression = cronExpression;
return this;
}
public void setForceFull(boolean forceFull) {
public AnalysisInfoBuilder setForceFull(boolean forceFull) {
this.forceFull = forceFull;
return this;
}
public AnalysisInfoBuilder setUsingSqlForPartitionColumn(boolean usingSqlForPartitionColumn) {
this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
return this;
}
public AnalysisInfo build() {
@ -250,7 +259,7 @@ public class AnalysisInfoBuilder {
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull);
cronExpression, forceFull, usingSqlForPartitionColumn);
}
public AnalysisInfoBuilder copy() {
@ -281,6 +290,9 @@ public class AnalysisInfoBuilder {
.setSamplingPartition(samplingPartition)
.setPartitionOnly(partitionOnly)
.setAllPartition(isAllPartition)
.setPartitionCount(partitionCount);
.setPartitionCount(partitionCount)
.setCronExpression(cronExpression)
.setForceFull(forceFull)
.setUsingSqlForPartitionColumn(usingSqlForPartitionColumn);
}
}

View File

@ -537,6 +537,7 @@ public class AnalysisManager extends Daemon implements Writable {
infoBuilder.setLastExecTimeInMs(0);
infoBuilder.setCronExpression(cronExpression);
infoBuilder.setForceFull(stmt.forceFull());
infoBuilder.setUsingSqlForPartitionColumn(stmt.usingSqlForPartitionColumn());
if (analysisMethod == AnalysisMethod.SAMPLE) {
infoBuilder.setSamplePercent(samplePercent);
infoBuilder.setSampleRows(sampleRows);

View File

@ -93,6 +93,24 @@ public abstract class BaseAnalysisTask {
+ " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
+ " ) t1, \n";
protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "NULL AS part_id, "
+ "${row_count} AS row_count, "
+ "${ndv} AS ndv, "
+ "${null_count} AS null_count, "
+ "'${min}' AS min, "
+ "'${max}' AS max, "
+ "${data_size} AS data_size, "
+ "NOW() ";
protected AnalysisInfo info;
protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;

View File

@ -21,12 +21,15 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -118,7 +121,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Get table row count
*/
private void getTableStats() throws Exception {
Map<String, String> params = buildTableStatsParams(null);
Map<String, String> params = buildStatsParams(null);
List<ResultRow> columnResult =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(ANALYZE_TABLE_COUNT_TEMPLATE));
@ -132,6 +135,33 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Get column statistics and insert the result to __internal_schema.column_statistics
*/
private void getTableColumnStats() throws Exception {
if (isPartitionOnly) {
getPartitionNames();
List<String> partitionAnalysisSQLs = new ArrayList<>();
for (String partId : this.partitionNames) {
partitionAnalysisSQLs.add(generateSqlForPartition(partId));
}
execSQLs(partitionAnalysisSQLs);
} else {
if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
try {
getPartitionColumnStats();
} catch (Exception e) {
LOG.warn("Failed to collect stats for partition col {} using metadata, "
+ "fallback to normal collection", col.getName(), e);
getOrdinaryColumnStats();
}
} else {
getOrdinaryColumnStats();
}
}
}
private boolean isPartitionColumn() {
return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName()));
}
private void getOrdinaryColumnStats() throws Exception {
// An example sql for a column stats:
// INSERT INTO __internal_schema.column_statistics
// SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
@ -148,26 +178,100 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
// MAX(`r_regionkey`) AS max,
// 0 AS data_size,
// NOW() FROM `hive`.`tpch100`.`region`
if (isPartitionOnly) {
getPartitionNames();
List<String> partitionAnalysisSQLs = new ArrayList<>();
for (String partId : this.partitionNames) {
partitionAnalysisSQLs.add(generateSqlForPartition(partId));
StringBuilder sb = new StringBuilder();
sb.append(ANALYZE_TABLE_TEMPLATE);
Map<String, String> params = buildStatsParams("NULL");
params.put("dataSizeFunction", getDataSizeFunction(col));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
}
private void getPartitionColumnStats() throws Exception {
Set<String> partitionNames = table.getPartitionNames();
Set<String> ndvPartValues = Sets.newHashSet();
long numNulls = 0;
long dataSize = 0;
String min = null;
String max = null;
for (String names : partitionNames) {
// names is like "date=20230101" for one level partition
// and like "date=20230101/hour=12" for two level partition
String[] parts = names.split("/");
for (String part : parts) {
if (part.startsWith(col.getName())) {
String value = HiveUtil.getHivePartitionValue(part);
// HIVE_DEFAULT_PARTITION hive partition value when the partition name is not specified.
if (value == null || value.isEmpty() || value.equals(HiveMetaStoreCache.HIVE_DEFAULT_PARTITION)) {
numNulls += 1;
continue;
}
ndvPartValues.add(value);
dataSize += col.getType().isStringType() ? value.length() : col.getType().getSlotSize();
min = updateMinValue(min, value);
max = updateMaxValue(max, value);
}
}
execSQLs(partitionAnalysisSQLs);
} else {
StringBuilder sb = new StringBuilder();
sb.append(ANALYZE_TABLE_TEMPLATE);
Map<String, String> params = buildTableStatsParams("NULL");
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("colName", col.getName());
params.put("colId", info.colName);
params.put("dataSizeFunction", getDataSizeFunction(col));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
}
// Estimate the row count. This value is inaccurate if the table stats is empty.
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;
dataSize = dataSize * count / partitionNames.size();
numNulls = numNulls * count / partitionNames.size();
int ndv = ndvPartValues.size();
Map<String, String> params = buildStatsParams("NULL");
params.put("row_count", String.valueOf(count));
params.put("ndv", String.valueOf(ndv));
params.put("null_count", String.valueOf(numNulls));
params.put("min", min);
params.put("max", max);
params.put("data_size", String.valueOf(dataSize));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
executeInsertSql(sql);
}
private String updateMinValue(String currentMin, String value) {
if (currentMin == null) {
return value;
}
if (col.getType().isFixedPointType()) {
if (Long.parseLong(value) < Long.parseLong(currentMin)) {
return value;
} else {
return currentMin;
}
}
if (col.getType().isFloatingPointType() || col.getType().isDecimalV2() || col.getType().isDecimalV3()) {
if (Double.parseDouble(value) < Double.parseDouble(currentMin)) {
return value;
} else {
return currentMin;
}
}
return value.compareTo(currentMin) < 0 ? value : currentMin;
}
private String updateMaxValue(String currentMax, String value) {
if (currentMax == null) {
return value;
}
if (col.getType().isFixedPointType()) {
if (Long.parseLong(value) > Long.parseLong(currentMax)) {
return value;
} else {
return currentMax;
}
}
if (col.getType().isFloatingPointType() || col.getType().isDecimalV2() || col.getType().isDecimalV3()) {
if (Double.parseDouble(value) > Double.parseDouble(currentMax)) {
return value;
} else {
return currentMax;
}
}
return value.compareTo(currentMax) > 0 ? value : currentMax;
}
private void getPartitionNames() {
@ -198,11 +302,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
sb.append(" and ");
}
}
Map<String, String> params = buildTableStatsParams(partId);
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("colName", col.getName());
params.put("colId", info.colName);
Map<String, String> params = buildStatsParams(partId);
params.put("dataSizeFunction", getDataSizeFunction(col));
return new StringSubstitutor(params).replace(sb.toString());
}
@ -262,7 +362,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
}
private Map<String, String> buildTableStatsParams(String partId) {
private Map<String, String> buildStatsParams(String partId) {
Map<String, String> commonParams = new HashMap<>();
String id = StatisticsUtil.constructId(tbl.getId(), -1);
if (partId == null) {
@ -271,12 +371,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
id = StatisticsUtil.constructId(id, partId);
commonParams.put("partId", "\'" + partId + "\'");
}
commonParams.put("internalDB", FeConstants.INTERNAL_DB_NAME);
commonParams.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
commonParams.put("id", id);
commonParams.put("catalogId", String.valueOf(catalog.getId()));
commonParams.put("dbId", String.valueOf(db.getId()));
commonParams.put("tblId", String.valueOf(tbl.getId()));
commonParams.put("indexId", "-1");
commonParams.put("idxId", "-1");
commonParams.put("colName", info.colName);
commonParams.put("colId", info.colName);
commonParams.put("catalogName", catalog.getName());
commonParams.put("dbName", db.getFullName());
commonParams.put("tblName", tbl.getName());

View File

@ -629,7 +629,12 @@ public class StatisticsUtil {
}
// Estimate row count: totalSize/estimatedRowSize
long estimatedRowSize = 0;
List<Column> partitionColumns = table.getPartitionColumns();
for (Column column : table.getFullSchema()) {
// Partition column shouldn't count to the row size, because it is not in the data file.
if (partitionColumns != null && partitionColumns.contains(column)) {
continue;
}
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {