[improvement](stats) Reduce unnecessary SQL from full auto analyze #22583

1. Remove a bunch of SQL statements related to partition-level information
2. Fix duplicate SQL submission
3. Fix a bug where table stats were not updated after a system job finished (see the lifecycle sketch below)
Author: AKIRA
Date: 2023-08-04 15:52:25 +08:00 (committed by GitHub)
Parent: 7d1e08eafa
Commit: 56e8ad197c
6 changed files with 238 additions and 194 deletions
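For context, the lifecycle of a SYSTEM (auto analyze) job after this change, pieced together from the hunks below. This is a hedged sketch for orientation, not code taken verbatim from any single place in the patch:

// 1. createSystemAnalysisJob registers the job and submits each column task exactly once.
systemJobInfoMap.put(info.jobId, info);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);

// 2. Each task reports back through updateTaskStatus, which dispatches on the job type.
updaters[info.jobType.ordinal()].apply(new TaskStatusWrapper(info, taskState, message, time));

// 3. When every task of the system job is FINISHED, the table-level stats are refreshed;
//    this is the fix for stats staying stale after a system job completed.
updateTableStats(job);
systemJobInfoMap.remove(info.jobId);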


@ -68,7 +68,7 @@ public class AnalysisInfo implements Writable {
// submit by user directly
MANUAL,
// submit by system automatically
SYSTEM
SYSTEM;
}
public enum ScheduleType {


@ -31,7 +31,6 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
@ -106,8 +105,97 @@ public class AnalysisManager extends Daemon implements Writable {
private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());
private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
AnalysisInfo info = w.info;
AnalysisState taskState = w.taskState;
String message = w.message;
long time = w.time;
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
return null;
}
info.state = taskState;
info.message = message;
// Update the task cost time when task finished or failed. And only log the final state.
if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
info.timeCostInMs = time - info.lastExecTimeInMs;
info.lastExecTimeInMs = time;
logCreateAnalysisTask(info);
}
info.lastExecTimeInMs = time;
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
// Synchronize the job state change in job level.
synchronized (job) {
job.lastExecTimeInMs = time;
// Set the job state to RUNNING when its first task becomes RUNNING.
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
job.state = AnalysisState.RUNNING;
replayCreateAnalysisJob(job);
}
boolean allFinished = true;
boolean hasFailure = false;
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
AnalysisInfo taskInfo = task.info;
if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
allFinished = false;
break;
}
if (taskInfo.state.equals(AnalysisState.FAILED)) {
hasFailure = true;
}
}
if (allFinished) {
if (hasFailure) {
job.state = AnalysisState.FAILED;
logCreateAnalysisJob(job);
} else {
job.state = AnalysisState.FINISHED;
try {
updateTableStats(job);
} catch (Throwable e) {
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
}
logCreateAnalysisJob(job);
}
analysisJobIdToTaskMap.remove(job.jobId);
}
}
return null;
};
private final Function<TaskStatusWrapper, Void> systemJobStatusUpdater = w -> {
AnalysisInfo info = w.info;
info.state = w.taskState;
AnalysisInfo job = systemJobInfoMap.get(info.jobId);
if (job == null) {
return null;
}
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
if (!task.info.state.equals(AnalysisState.FINISHED)) {
if (task.info.state.equals(AnalysisState.FAILED)) {
systemJobInfoMap.remove(info.jobId);
}
return null;
}
}
try {
updateTableStats(job);
} catch (Throwable e) {
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
} finally {
systemJobInfoMap.remove(info.jobId);
}
return null;
};
private final Function<TaskStatusWrapper, Void>[] updaters =
new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
public AnalysisManager() {
super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
if (!Env.isCheckpointThread()) {
@ -124,15 +212,15 @@ public class AnalysisManager extends Daemon implements Writable {
private void clear() {
clearMeta(analysisJobInfoMap, (a) ->
a.scheduleType.equals(ScheduleType.ONCE)
&& System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
a.scheduleType.equals(ScheduleType.ONCE)
&& System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
(id) -> {
Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
return null;
});
Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
return null;
});
clearMeta(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
(id) -> {
Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new AnalyzeDeletionLog(id));
return null;
@ -140,7 +228,7 @@ public class AnalysisManager extends Daemon implements Writable {
}
private void clearMeta(Map<Long, AnalysisInfo> infoMap, Predicate<AnalysisInfo> isExpired,
Function<Long, Void> writeLog) {
Function<Long, Void> writeLog) {
synchronized (infoMap) {
List<Long> expired = new ArrayList<>();
for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
@ -190,8 +278,8 @@ public class AnalysisManager extends Daemon implements Writable {
// columnNames null means to add all visitable columns.
AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())).map(
Column::getName).collect(
Collectors.toList()), db.getId(), table);
Column::getName).collect(
Collectors.toList()), db.getId(), table);
try {
analyzeTblStmt.check();
} catch (AnalysisException analysisException) {
@ -267,6 +355,7 @@ public class AnalysisManager extends Daemon implements Writable {
public void createSystemAnalysisJob(AnalysisInfo info, AnalysisTaskExecutor analysisTaskExecutor)
throws DdlException {
AnalysisInfo jobInfo = buildAnalysisJobInfo(info);
systemJobInfoMap.put(info.jobId, info);
if (jobInfo.colToPartitions.isEmpty()) {
// No statistics need to be collected or updated
return;
@ -275,11 +364,8 @@ public class AnalysisManager extends Daemon implements Writable {
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
createTaskForMVIdx(jobInfo, analysisTaskInfos, false);
if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
persistAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
}
analysisTaskInfos.values().forEach(taskExecutor::submitTask);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
}
private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
@ -327,14 +413,15 @@ public class AnalysisManager extends Daemon implements Writable {
* TODO Supports incremental collection of statistics from materialized views
*/
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType, AnalysisMode analysisMode) throws DdlException {
Set<String> partitionNames, AnalysisType analysisType,
AnalysisMode analysisMode) throws DdlException {
long tableId = table.getId();
Map<String, Set<String>> columnToPartitions = columnNames.stream()
.collect(Collectors.toMap(
.collect(Collectors.toMap(
columnName -> columnName,
columnName -> new HashSet<>(partitionNames)
));
columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames)
));
if (analysisType == AnalysisType.HISTOGRAM) {
// Collecting histograms does not need to support incremental collection,
@ -476,15 +563,8 @@ public class AnalysisManager extends Daemon implements Writable {
taskInfoBuilder.setMaxBucketNum(jobInfo.maxBucketNum);
taskInfoBuilder.setPeriodTimeInMs(jobInfo.periodTimeInMs);
taskInfoBuilder.setLastExecTimeInMs(jobInfo.lastExecTimeInMs);
try {
TableIf table = StatisticsUtil
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, jobInfo.colToPartitions.keySet(),
jobInfo.partitionNames, jobInfo.analysisType, jobInfo.analysisMode);
taskInfoBuilder.setColToPartitions(colToPartitions);
} catch (Throwable e) {
throw new RuntimeException(e);
}
taskInfoBuilder.setColToPartitions(jobInfo.colToPartitions);
taskInfoBuilder.setTaskIds(new ArrayList<>());
return taskInfoBuilder.build();
}
@ -498,7 +578,7 @@ public class AnalysisManager extends Daemon implements Writable {
}
private void createTaskForMVIdx(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
boolean isSync) throws DdlException {
boolean isSync) throws DdlException {
TableIf table;
try {
table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
@ -539,7 +619,7 @@ public class AnalysisManager extends Daemon implements Writable {
}
private void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
boolean isSync) throws DdlException {
boolean isSync) throws DdlException {
Map<String, Set<String>> columnToPartitions = jobInfo.colToPartitions;
for (Entry<String, Set<String>> entry : columnToPartitions.entrySet()) {
long indexId = -1;
@ -580,8 +660,8 @@ public class AnalysisManager extends Daemon implements Writable {
}
private void createTaskForExternalTable(AnalysisInfo jobInfo,
Map<Long, BaseAnalysisTask> analysisTasks,
boolean isSync) throws DdlException {
Map<Long, BaseAnalysisTask> analysisTasks,
boolean isSync) throws DdlException {
TableIf table;
try {
table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
@ -610,57 +690,8 @@ public class AnalysisManager extends Daemon implements Writable {
}
public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
return;
}
info.state = taskState;
info.message = message;
// Update the task cost time when task finished or failed. And only log the final state.
if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
info.timeCostInMs = time - info.lastExecTimeInMs;
info.lastExecTimeInMs = time;
logCreateAnalysisTask(info);
}
info.lastExecTimeInMs = time;
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
// Synchronize the job state change in job level.
synchronized (job) {
job.lastExecTimeInMs = time;
// Set the job state to RUNNING when its first task becomes RUNNING.
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
job.state = AnalysisState.RUNNING;
replayCreateAnalysisJob(job);
}
boolean allFinished = true;
boolean hasFailure = false;
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
AnalysisInfo taskInfo = task.info;
if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
allFinished = false;
break;
}
if (taskInfo.state.equals(AnalysisState.FAILED)) {
hasFailure = true;
}
}
if (allFinished) {
if (hasFailure) {
job.state = AnalysisState.FAILED;
logCreateAnalysisJob(job);
} else {
job.state = AnalysisState.FINISHED;
if (job.jobType.equals(JobType.SYSTEM)) {
try {
updateTableStats(job);
} catch (Throwable e) {
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
}
}
logCreateAnalysisJob(job);
}
analysisJobIdToTaskMap.remove(job.jobId);
}
}
TaskStatusWrapper taskStatusWrapper = new TaskStatusWrapper(info, taskState, message, time);
updaters[info.jobType.ordinal()].apply(taskStatusWrapper);
}
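The dispatch above relies on the declaration order of the JobType enum shown in the first hunk (MANUAL before SYSTEM). A minimal illustration of the mapping, assuming that order:

// updaters[0] == userJobStatusUpdater   -> handles JobType.MANUAL tasks
// updaters[1] == systemJobStatusUpdater -> handles JobType.SYSTEM tasks
TaskStatusWrapper wrapper = new TaskStatusWrapper(info, taskState, message, time);
updaters[info.jobType.ordinal()].apply(wrapper);

Note that this couples the array layout to the enum declaration order, so the two have to stay in sync.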
private void updateTableStats(AnalysisInfo jobInfo) throws Throwable {
@ -696,15 +727,6 @@ public class AnalysisManager extends Daemon implements Writable {
}
private void updateOlapTableStats(OlapTable table, Map<String, String> params) throws Throwable {
for (Partition partition : table.getPartitions()) {
HashMap<String, String> partParams = Maps.newHashMap(params);
long rowCount = partition.getBaseIndex().getRowCount();
partParams.put("id", StatisticsUtil
.constructId(params.get("id"), partition.getId()));
partParams.put("partId", String.valueOf(partition.getId()));
partParams.put("rowCount", String.valueOf(rowCount));
StatisticsRepository.persistTableStats(partParams);
}
HashMap<String, String> tblParams = Maps.newHashMap(params);
long rowCount = table.getRowCount();
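With the per-partition loop gone, updateOlapTableStats persists a single table-level row. A rough sketch of the remaining body, with the exact parameter keys assumed by analogy with the deleted per-partition code:

HashMap<String, String> tblParams = Maps.newHashMap(params);
long rowCount = table.getRowCount();
tblParams.put("rowCount", String.valueOf(rowCount));   // key name assumed, mirroring the removed code
StatisticsRepository.persistTableStats(tblParams);      // one statement instead of one per partition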
@ -717,12 +739,12 @@ public class AnalysisManager extends Daemon implements Writable {
String state = stmt.getStateValue();
TableName tblName = stmt.getDbTableName();
return analysisJobInfoMap.values().stream()
.filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl())
&& a.dbName.equals(tblName.getDb()) && a.tblName.equals(tblName.getTbl()))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
.filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl())
&& a.dbName.equals(tblName.getDb()) && a.tblName.equals(tblName.getTbl()))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
}
public String getJobProgress(long jobId) {
@ -765,12 +787,12 @@ public class AnalysisManager extends Daemon implements Writable {
private ThreadPoolExecutor createThreadPoolForSyncAnalyze() {
String poolName = "SYNC ANALYZE THREAD POOL";
return new ThreadPoolExecutor(0,
ConnectContext.get().getSessionVariable().parallelSyncAnalyzeTaskNum,
0, TimeUnit.SECONDS,
ConnectContext.get().getSessionVariable().parallelSyncAnalyzeTaskNum,
0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d")
.build(), new BlockedPolicy(poolName,
(int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
.build(), new BlockedPolicy(poolName,
(int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
}
public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
@ -821,7 +843,7 @@ public class AnalysisManager extends Daemon implements Writable {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), analysisInfo.dbName, analysisInfo.tblName, PrivPredicate.SELECT)) {
throw new RuntimeException("You need at least SELECT PRIV to corresponding table to kill this analyze"
+ " job");
+ " job");
}
}
@ -907,26 +929,26 @@ public class AnalysisManager extends Daemon implements Writable {
}
if (!colNames.isEmpty()) {
throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames)
+ "] Reasons: " + String.join(",", errorMessages));
+ "] Reasons: " + String.join(",", errorMessages));
}
}
private void updateSyncTaskStatus(BaseAnalysisTask task, AnalysisState state) {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info, state, "", System.currentTimeMillis());
.updateTaskStatus(task.info, state, "", System.currentTimeMillis());
}
}
public List<AnalysisInfo> findAutomaticAnalysisJobs() {
synchronized (analysisJobInfoMap) {
return analysisJobInfoMap.values().stream()
.filter(a ->
a.scheduleType.equals(ScheduleType.AUTOMATIC)
&& (!(a.state.equals(AnalysisState.RUNNING)
|| a.state.equals(AnalysisState.PENDING)))
&& System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes))
.collect(Collectors.toList());
.filter(a ->
a.scheduleType.equals(ScheduleType.AUTOMATIC)
&& (!(a.state.equals(AnalysisState.RUNNING)
|| a.state.equals(AnalysisState.PENDING)))
&& System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes))
.collect(Collectors.toList());
}
}
@ -938,13 +960,13 @@ public class AnalysisManager extends Daemon implements Writable {
}
if (a.cronExpression == null) {
return a.scheduleType.equals(ScheduleType.PERIOD)
&& System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs;
&& System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs;
}
return a.cronExpression.getTimeAfter(new Date(a.lastExecTimeInMs)).before(new Date());
};
return analysisJobInfoMap.values().stream()
.filter(p)
.collect(Collectors.toList());
.filter(p)
.collect(Collectors.toList());
}
}


@ -17,12 +17,13 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeProperties;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.MasterDaemon;
@ -38,8 +39,8 @@ import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -53,7 +54,8 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
private AnalysisTaskExecutor analysisTaskExecutor;
public StatisticsAutoAnalyzer() {
super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2);
analysisTaskExecutor = new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
}
@ -92,8 +94,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
continue;
}
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
List<AnalysisInfo> analysisInfos = analysisManager.buildAnalysisInfosForDB(databaseIf,
AnalyzeProperties.DEFAULT_PROP);
List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
if (analysisInfo == null) {
@ -106,10 +107,34 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
}
}
}
}
}
analyzePeriodically();
private List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<TableIf> db) {
List<AnalysisInfo> analysisInfos = new ArrayList<>();
for (TableIf table : db.getTables()) {
if (table instanceof View) {
continue;
}
TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
table.getName());
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
.setCatalogName(db.getCatalog().getName())
.setDbName(db.getFullName())
.setTblName(tableName.getTbl())
.setColName(
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())).map(
Column::getName).collect(Collectors.joining(","))
)
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL)
.setScheduleType(AnalysisInfo.ScheduleType.ONCE)
.setJobType(JobType.SYSTEM).build();
analysisInfos.add(jobInfo);
}
return analysisInfos;
}
private void analyzePeriodically() {
@ -174,23 +199,17 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
return null;
}
if (tblStats == TableStatistic.UNKNOWN) {
return jobInfo;
}
if (!needReanalyzeTable(table, tblStats)) {
if (!(needReanalyzeTable(table, tblStats) || tblStats == TableStatistic.UNKNOWN)) {
return null;
}
Set<String> needRunPartitions = new HashSet<>();
Set<String> statsPartitions = jobInfo.colToPartitions.values()
.stream()
.flatMap(Collection::stream)
Set<String> needRunPartitions = table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= lastExecTimeInMs).map(Partition::getName)
.collect(Collectors.toSet());
checkAnalyzedPartitions(table, statsPartitions, needRunPartitions, lastExecTimeInMs);
checkNewPartitions(table, needRunPartitions, lastExecTimeInMs);
if (needRunPartitions.isEmpty()) {
return null;
}
@ -205,65 +224,22 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
return tblHealth < StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
}
private void checkAnalyzedPartitions(TableIf table, Set<String> statsPartitions,
Set<String> needRunPartitions, long lastExecTimeInMs) {
for (String statsPartition : statsPartitions) {
Partition partition = table.getPartition(statsPartition);
if (partition == null) {
// Partition that has been deleted also need to
// be reanalyzed (delete partition statistics later)
needRunPartitions.add(statsPartition);
continue;
}
TableStatistic partitionStats = null;
try {
partitionStats = StatisticsRepository
.fetchTableLevelOfPartStats(partition.getId());
} catch (DdlException e) {
LOG.warn("Failed to fetch part stats", e);
continue;
}
if (needReanalyzePartition(lastExecTimeInMs, partition, partitionStats)
|| partitionStats == TableStatistic.UNKNOWN) {
needRunPartitions.add(partition.getName());
}
}
}
private boolean needReanalyzePartition(long lastExecTimeInMs, Partition partition, TableStatistic partStats) {
long partUpdateTime = partition.getVisibleVersionTime();
if (partUpdateTime < lastExecTimeInMs) {
return false;
}
long pRowCount = partition.getBaseIndex().getRowCount();
long pUpdateRows = Math.abs(pRowCount - partStats.rowCount);
int partHealth = StatisticsUtil.getTableHealth(pRowCount, pUpdateRows);
return partHealth < StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
}
private void checkNewPartitions(TableIf table, Set<String> needRunPartitions, long lastExecTimeInMs) {
Set<String> partitionNames = table.getPartitionNames();
partitionNames.removeAll(needRunPartitions);
needRunPartitions.addAll(
partitionNames.stream()
.map(table::getPartition)
.filter(partition -> partition.getVisibleVersionTime() >= lastExecTimeInMs)
.map(Partition::getName)
.collect(Collectors.toSet())
);
}
private AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
Set<String> needRunPartitions) {
Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
colToPartitions.keySet().forEach(colName -> {
Column column = table.getColumn(colName);
if (column != null) {
newColToPartitions.put(colName, needRunPartitions);
if (colToPartitions == null) {
for (Column c : table.getColumns()) {
newColToPartitions.put(c.getName(), needRunPartitions);
}
});
} else {
colToPartitions.keySet().forEach(colName -> {
Column column = table.getColumn(colName);
if (column != null) {
newColToPartitions.put(colName, needRunPartitions);
}
});
}
return new AnalysisInfoBuilder(jobInfo)
.setColToPartitions(newColToPartitions).build();
}


@ -395,7 +395,7 @@ public class StatisticsRepository {
if (resultRows.size() == 1) {
return TableStatistic.fromResultRow(resultRows.get(0));
}
throw new DdlException("Query result is not as expected: " + sql);
return TableStatistic.UNKNOWN;
}
public static TableStatistic fetchTableLevelOfPartStats(long partId) throws DdlException {
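Returning TableStatistic.UNKNOWN instead of throwing lets callers treat a missing stats row as an ordinary case. The auto analyzer above does exactly that when deciding whether to re-analyze a table; a sketch of that caller-side check, repeated here for cross-reference:

// In getReAnalyzeRequiredPart: a table with no stats row (UNKNOWN) is scheduled for analysis
// instead of failing the whole auto analyze pass with a DdlException.
if (!(needReanalyzeTable(table, tblStats) || tblStats == TableStatistic.UNKNOWN)) {
    return null;   // stats are fresh enough, skip this table
}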


@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
public class TaskStatusWrapper {
public final AnalysisInfo info;
public final AnalysisState taskState;
public final String message;
public final long time;
public TaskStatusWrapper(AnalysisInfo info, AnalysisState taskState, String message, long time) {
this.info = info;
this.taskState = taskState;
this.message = message;
this.time = time;
}
}


@ -17,6 +17,9 @@
package org.apache.doris.statistics;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
@ -42,12 +45,22 @@ public class AnalysisManagerTest {
};
new MockUp<AnalysisInfo>() {
@Mock
public String toString() {
return "";
}
};
AnalysisInfo job = new AnalysisInfoBuilder().setJobId(1)
.setState(AnalysisState.PENDING).setJobType(AnalysisInfo.JobType.MANUAL).build();
.setState(AnalysisState.PENDING).setAnalysisType(AnalysisType.FUNDAMENTALS)
.setJobType(AnalysisInfo.JobType.MANUAL).build();
AnalysisInfo taskInfo1 = new AnalysisInfoBuilder().setJobId(1)
.setTaskId(2).setState(AnalysisState.PENDING).build();
.setTaskId(2).setJobType(JobType.MANUAL).setAnalysisType(AnalysisType.FUNDAMENTALS)
.setState(AnalysisState.PENDING).build();
AnalysisInfo taskInfo2 = new AnalysisInfoBuilder().setJobId(1)
.setTaskId(3).setState(AnalysisState.PENDING).build();
.setTaskId(3).setAnalysisType(AnalysisType.FUNDAMENTALS).setJobType(JobType.MANUAL)
.setState(AnalysisState.PENDING).build();
AnalysisManager manager = new AnalysisManager();
manager.replayCreateAnalysisJob(job);
manager.replayCreateAnalysisTask(taskInfo1);