[opt](statistics) optimize Incremental statistics collection and statistics cleaning (#18971)
This PR mainly optimizes the following: - Statistics collection: invalid historical statistics (those left behind by deleted partitions) are cleaned up before collection, so that they do not distort the final table-level statistics. - Incremental statistics collection: in incremental mode, only the partitions whose statistics have not been collected yet need to be analyzed. TODO: support incremental collection of materialized view statistics.
This commit is contained in:
@ -54,11 +54,14 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AnalysisManager {
|
||||
|
||||
@ -104,11 +107,22 @@ public class AnalysisManager {
|
||||
if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) {
|
||||
throw new DdlException("Stats table not available, please make sure your cluster status is normal");
|
||||
}
|
||||
|
||||
Map<String, Set<String>> columnToPartitions = validateAndGetPartitions(stmt);
|
||||
if (columnToPartitions.isEmpty()) {
|
||||
// No statistics need to be collected or updated
|
||||
return;
|
||||
}
|
||||
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
TableIf table = stmt.getTable();
|
||||
AnalysisType analysisType = stmt.getAnalysisType();
|
||||
boolean isSync = stmt.isSync();
|
||||
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId);
|
||||
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
|
||||
createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos, stmt.isSync());
|
||||
createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType(), stmt.isSync());
|
||||
createTaskForEachColumns(columnToPartitions, taskInfoBuilder, analysisTaskInfos, analysisType, isSync);
|
||||
createTaskForMVIdx(table, taskInfoBuilder, analysisTaskInfos, analysisType, isSync);
|
||||
|
||||
if (stmt.isSync()) {
|
||||
syncExecute(analysisTaskInfos.values());
|
||||
@ -137,6 +151,81 @@ public class AnalysisManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the partitions for which statistics are to be collected. First verify that
|
||||
* there are partitions that have been deleted but have historical statistics(invalid statistics),
|
||||
* if there are these partitions, we need to delete them to avoid errors in summary table level statistics.
|
||||
* Then get the partitions for which statistics need to be collected based on collection mode (incremental/full).
|
||||
* <p>
|
||||
* note:
|
||||
* If there is no invalid statistics, it does not need to collect/update
|
||||
* statistics if the following conditions are met:
|
||||
* - in full collection mode, the partitioned table does not have partitions
|
||||
* - in incremental collection mode, partition statistics already exist
|
||||
* <p>
|
||||
* TODO Supports incremental collection of statistics from materialized views
|
||||
*/
|
||||
private Map<String, Set<String>> validateAndGetPartitions(AnalyzeStmt stmt) throws DdlException {
|
||||
TableIf table = stmt.getTable();
|
||||
long tableId = table.getId();
|
||||
Set<String> columnNames = stmt.getColumnNames();
|
||||
Set<String> partitionNames = table.getPartitionNames();
|
||||
|
||||
Map<String, Set<String>> columnToPartitions = columnNames.stream()
|
||||
.collect(Collectors.toMap(
|
||||
columnName -> columnName,
|
||||
columnName -> new HashSet<>(partitionNames)
|
||||
));
|
||||
|
||||
if (stmt.getAnalysisType() == AnalysisType.HISTOGRAM) {
|
||||
// Collecting histograms does not need to support incremental collection,
|
||||
// and will automatically cover historical statistics
|
||||
return columnToPartitions;
|
||||
}
|
||||
|
||||
// Get the partition granularity statistics that have been collected
|
||||
Map<String, Set<Long>> existColAndPartsForStats = StatisticsRepository
|
||||
.fetchColAndPartsForStats(tableId);
|
||||
|
||||
if (existColAndPartsForStats.isEmpty()) {
|
||||
// There is no historical statistical information, no need to do validation
|
||||
return columnToPartitions;
|
||||
}
|
||||
|
||||
Set<Long> existPartIdsForStats = new HashSet<>();
|
||||
existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
|
||||
Map<Long, String> idToPartition = StatisticsUtil.getPartitionIdToName(table);
|
||||
// Get an invalid set of partitions (those partitions were deleted)
|
||||
Set<Long> invalidPartIds = existPartIdsForStats.stream()
|
||||
.filter(id -> !idToPartition.containsKey(id)).collect(Collectors.toSet());
|
||||
|
||||
if (!invalidPartIds.isEmpty()) {
|
||||
// Delete invalid partition statistics to avoid affecting table statistics
|
||||
StatisticsRepository.dropStatistics(invalidPartIds);
|
||||
}
|
||||
|
||||
if (stmt.isIncremental() && stmt.getAnalysisType() == AnalysisType.COLUMN) {
|
||||
existColAndPartsForStats.values().forEach(partIds -> partIds.removeAll(invalidPartIds));
|
||||
// In incremental collection mode, just collect the uncollected partition statistics
|
||||
existColAndPartsForStats.forEach((columnName, partitionIds) -> {
|
||||
Set<String> existPartitions = partitionIds.stream()
|
||||
.map(idToPartition::get)
|
||||
.collect(Collectors.toSet());
|
||||
columnToPartitions.computeIfPresent(columnName, (colName, partNames) -> {
|
||||
partNames.removeAll(existPartitions);
|
||||
return partNames;
|
||||
});
|
||||
});
|
||||
if (invalidPartIds.isEmpty()) {
|
||||
// There is no invalid statistics, so there is no need to update table statistics,
|
||||
// remove columns that do not require re-collection of statistics
|
||||
columnToPartitions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
|
||||
}
|
||||
}
|
||||
|
||||
return columnToPartitions;
|
||||
}
|
||||
|
||||
private AnalysisTaskInfoBuilder buildCommonTaskInfo(AnalyzeStmt stmt, long jobId) {
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder = new AnalysisTaskInfoBuilder();
|
||||
String catalogName = stmt.getCatalogName();
|
||||
@ -212,9 +301,11 @@ public class AnalysisManager {
|
||||
AnalysisTaskInfo analysisTaskInfo = indexTaskInfoBuilder.setIndexId(indexId)
|
||||
.setTaskId(taskId).build();
|
||||
analysisTasks.put(taskId, createTask(analysisTaskInfo));
|
||||
if (isSync) {
|
||||
return;
|
||||
}
|
||||
// TODO Temporarily save the statistics synchronous task,
|
||||
// which is mainly used to test the incremental collection of statistics.
|
||||
// if (isSync) {
|
||||
// return;
|
||||
// }
|
||||
try {
|
||||
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
|
||||
} catch (Exception e) {
|
||||
@ -226,24 +317,32 @@ public class AnalysisManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void createTaskForEachColumns(Set<String> colNames, AnalysisTaskInfoBuilder taskInfoBuilder,
|
||||
Map<Long, BaseAnalysisTask> analysisTasks, boolean isSync) throws DdlException {
|
||||
for (String colName : colNames) {
|
||||
private void createTaskForEachColumns(Map<String, Set<String>> columnToPartitions,
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder, Map<Long, BaseAnalysisTask> analysisTasks,
|
||||
AnalysisType analysisType, boolean isSync) throws DdlException {
|
||||
for (Entry<String, Set<String>> entry : columnToPartitions.entrySet()) {
|
||||
Set<String> partitionNames = entry.getValue();
|
||||
AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy();
|
||||
if (analysisType != AnalysisType.HISTOGRAM) {
|
||||
// Histograms do not need to specify partitions
|
||||
colTaskInfoBuilder.setPartitionNames(partitionNames);
|
||||
}
|
||||
long indexId = -1;
|
||||
String colName = entry.getKey();
|
||||
long taskId = Env.getCurrentEnv().getNextId();
|
||||
AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName)
|
||||
.setIndexId(indexId).setTaskId(taskId).build();
|
||||
analysisTasks.put(taskId, createTask(analysisTaskInfo));
|
||||
if (isSync) {
|
||||
continue;
|
||||
}
|
||||
// TODO Temporarily save the statistics synchronous task,
|
||||
// which is mainly used to test the incremental collection of statistics.
|
||||
// if (isSync) {
|
||||
// continue;
|
||||
// }
|
||||
try {
|
||||
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
|
||||
} catch (Exception e) {
|
||||
throw new DdlException("Failed to create analysis task", e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,9 +17,12 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class AnalysisTaskInfo {
|
||||
@ -60,6 +63,8 @@ public class AnalysisTaskInfo {
|
||||
|
||||
public final String tblName;
|
||||
|
||||
public final Set<String> partitionNames;
|
||||
|
||||
public final String colName;
|
||||
|
||||
public final Long indexId;
|
||||
@ -86,7 +91,7 @@ public class AnalysisTaskInfo {
|
||||
public final ScheduleType scheduleType;
|
||||
|
||||
public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
|
||||
String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod,
|
||||
Set<String> partitionNames, String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod,
|
||||
AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum,
|
||||
String message, int lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) {
|
||||
this.jobId = jobId;
|
||||
@ -94,6 +99,7 @@ public class AnalysisTaskInfo {
|
||||
this.catalogName = catalogName;
|
||||
this.dbName = dbName;
|
||||
this.tblName = tblName;
|
||||
this.partitionNames = partitionNames;
|
||||
this.colName = colName;
|
||||
this.indexId = indexId;
|
||||
this.jobType = jobType;
|
||||
@ -115,6 +121,7 @@ public class AnalysisTaskInfo {
|
||||
sj.add("CatalogName: " + catalogName);
|
||||
sj.add("DBName: " + dbName);
|
||||
sj.add("TableName: " + tblName);
|
||||
sj.add("PartitionNames: " + StatisticsUtil.joinElementsToString(partitionNames, ","));
|
||||
sj.add("ColumnName: " + colName);
|
||||
sj.add("TaskType: " + analysisType.toString());
|
||||
sj.add("TaskMethod: " + analysisMethod.toString());
|
||||
|
||||
@ -22,12 +22,15 @@ import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class AnalysisTaskInfoBuilder {
|
||||
private long jobId;
|
||||
private long taskId;
|
||||
private String catalogName;
|
||||
private String dbName;
|
||||
private String tblName;
|
||||
private Set<String> partitionNames;
|
||||
private String colName;
|
||||
private Long indexId;
|
||||
private JobType jobType;
|
||||
@ -66,6 +69,11 @@ public class AnalysisTaskInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisTaskInfoBuilder setPartitionNames(Set<String> partitionNames) {
|
||||
this.partitionNames = partitionNames;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisTaskInfoBuilder setColName(String colName) {
|
||||
this.colName = colName;
|
||||
return this;
|
||||
@ -127,7 +135,7 @@ public class AnalysisTaskInfoBuilder {
|
||||
}
|
||||
|
||||
public AnalysisTaskInfo build() {
|
||||
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName,
|
||||
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, partitionNames,
|
||||
colName, indexId, jobType, analysisMethod, analysisType, samplePercent,
|
||||
sampleRows, maxBucketNum, message, lastExecTimeInMs, state, scheduleType);
|
||||
}
|
||||
@ -139,6 +147,7 @@ public class AnalysisTaskInfoBuilder {
|
||||
.setCatalogName(catalogName)
|
||||
.setDbName(dbName)
|
||||
.setTblName(tblName)
|
||||
.setPartitionNames(partitionNames)
|
||||
.setColName(colName)
|
||||
.setIndexId(indexId)
|
||||
.setJobType(jobType)
|
||||
|
||||
@ -74,7 +74,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
List<String> partitionAnalysisSQLs = new ArrayList<>();
|
||||
try {
|
||||
tbl.readLock();
|
||||
Set<String> partNames = tbl.getPartitionNames();
|
||||
Set<String> partNames = info.partitionNames;
|
||||
for (String partName : partNames) {
|
||||
Partition part = tbl.getPartition(partName);
|
||||
if (part == null) {
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -108,6 +109,11 @@ public class StatisticsRepository {
|
||||
+ " ORDER BY update_time "
|
||||
+ "LIMIT ${limit} OFFSET ${offset}";
|
||||
|
||||
private static final String FETCH_STATS_PART_ID = "SELECT col_id, part_id FROM "
|
||||
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.STATISTIC_TBL_NAME
|
||||
+ " WHERE tbl_id = ${tblId}"
|
||||
+ " AND part_id IS NOT NULL";
|
||||
|
||||
public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
|
||||
ResultRow resultRow = queryColumnStatisticById(tableId, colName);
|
||||
if (resultRow == null) {
|
||||
@ -180,12 +186,17 @@ public class StatisticsRepository {
|
||||
return stringJoiner.toString();
|
||||
}
|
||||
|
||||
public static void dropStatistics(long tblId, Set<String> colNames) throws DdlException {
|
||||
dropStatistics(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
dropStatistics(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
public static void dropStatistics(Set<Long> partIds) throws DdlException {
|
||||
dropStatisticsByPartId(partIds, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
}
|
||||
|
||||
public static void dropStatistics(long tblId, Set<String> colNames, String statsTblName) throws DdlException {
|
||||
public static void dropStatistics(long tblId, Set<String> colNames) throws DdlException {
|
||||
dropStatisticsByColName(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
dropStatisticsByColName(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
}
|
||||
|
||||
public static void dropStatisticsByColName(long tblId, Set<String> colNames, String statsTblName)
|
||||
throws DdlException {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
String right = colNames.stream().map(s -> "'" + s + "'").collect(Collectors.joining(","));
|
||||
String inPredicate = String.format("tbl_id = %s AND %s IN (%s)", tblId, "col_id", right);
|
||||
@ -198,12 +209,17 @@ public class StatisticsRepository {
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> void buildPredicate(String fieldName, Set<T> fieldValues, StringBuilder predicate) {
|
||||
StringJoiner predicateBuilder = new StringJoiner(",", "(", ")");
|
||||
fieldValues.stream().map(value -> String.format("'%s'", value))
|
||||
.forEach(predicateBuilder::add);
|
||||
String partPredicate = String.format(" AND %s IN %s", fieldName, predicateBuilder);
|
||||
predicate.append(partPredicate);
|
||||
public static void dropStatisticsByPartId(Set<Long> partIds, String statsTblName) throws DdlException {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
String right = StatisticsUtil.joinElementsToString(partIds, ",");
|
||||
String inPredicate = String.format(" part_id IN (%s)", right);
|
||||
params.put("tblName", statsTblName);
|
||||
params.put("condition", inPredicate);
|
||||
try {
|
||||
StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(DROP_TABLE_STATISTICS_TEMPLATE));
|
||||
} catch (Exception e) {
|
||||
throw new DdlException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
|
||||
@ -241,7 +257,7 @@ public class StatisticsRepository {
|
||||
builder.setCount(Double.parseDouble(rowCount));
|
||||
}
|
||||
if (ndv != null) {
|
||||
Double dNdv = Double.parseDouble(ndv);
|
||||
double dNdv = Double.parseDouble(ndv);
|
||||
builder.setNdv(dNdv);
|
||||
builder.setOriginalNdv(dNdv);
|
||||
}
|
||||
@ -298,4 +314,28 @@ public class StatisticsRepository {
|
||||
params.put("now", String.valueOf(System.currentTimeMillis()));
|
||||
return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FIND_EXPIRED_JOBS));
|
||||
}
|
||||
|
||||
public static Map<String, Set<Long>> fetchColAndPartsForStats(long tblId) {
|
||||
Map<String, String> params = Maps.newHashMap();
|
||||
params.put("tblId", String.valueOf(tblId));
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String partSql = stringSubstitutor.replace(FETCH_STATS_PART_ID);
|
||||
List<ResultRow> resultRows = StatisticsUtil.execStatisticQuery(partSql);
|
||||
|
||||
Map<String, Set<Long>> columnToPartitions = Maps.newHashMap();
|
||||
|
||||
resultRows.forEach(row -> {
|
||||
try {
|
||||
String colId = row.getColumnValue("col_id");
|
||||
String partId = row.getColumnValue("part_id");
|
||||
columnToPartitions.computeIfAbsent(colId,
|
||||
k -> new HashSet<>()).add(Long.valueOf(partId));
|
||||
} catch (NumberFormatException | DdlException e) {
|
||||
LOG.warn("Failed to obtain the column and partition for statistics.{}",
|
||||
e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
return columnToPartitions;
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,10 +63,12 @@ import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -345,4 +347,19 @@ public class StatisticsUtil {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static Map<Long, String> getPartitionIdToName(TableIf table) {
|
||||
return table.getPartitionNames().stream()
|
||||
.map(table::getPartition)
|
||||
.collect(Collectors.toMap(
|
||||
Partition::getId,
|
||||
Partition::getName
|
||||
));
|
||||
}
|
||||
|
||||
public static <T> String joinElementsToString(Collection<T> values, String delimiter) {
|
||||
StringJoiner builder = new StringJoiner(delimiter);
|
||||
values.forEach(v -> builder.add(String.valueOf(v)));
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user