diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index 285950816f..874e329c06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -68,7 +68,7 @@ public class AnalysisInfo implements Writable {
         // submit by user directly
         MANUAL,
         // submit by system automatically
-        SYSTEM
+        SYSTEM;
     }
 
     public enum ScheduleType {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index b67e4f76ac..6747ec322f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -31,7 +31,6 @@ import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
@@ -106,8 +105,97 @@ public class AnalysisManager extends Daemon implements Writable {
 
     private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());
     private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
+    private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
 
     private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
 
+    private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
+        AnalysisInfo info = w.info;
+        AnalysisState taskState = w.taskState;
+        String message = w.message;
+        long time = w.time;
+        if (analysisJobIdToTaskMap.get(info.jobId) == null) {
+            return null;
+        }
+        info.state = taskState;
+        info.message = message;
+        // Update the task cost time when task finished or failed. And only log the final state.
+        if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
+            info.timeCostInMs = time - info.lastExecTimeInMs;
+            info.lastExecTimeInMs = time;
+            logCreateAnalysisTask(info);
+        }
+        info.lastExecTimeInMs = time;
+        AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
+        // Synchronize the job state change in job level.
+        synchronized (job) {
+            job.lastExecTimeInMs = time;
+            // Set the job state to RUNNING when its first task becomes RUNNING.
+            if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
+                job.state = AnalysisState.RUNNING;
+                replayCreateAnalysisJob(job);
+            }
+            boolean allFinished = true;
+            boolean hasFailure = false;
+            for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
+                AnalysisInfo taskInfo = task.info;
+                if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
+                    allFinished = false;
+                    break;
+                }
+                if (taskInfo.state.equals(AnalysisState.FAILED)) {
+                    hasFailure = true;
+                }
+            }
+            if (allFinished) {
+                if (hasFailure) {
+                    job.state = AnalysisState.FAILED;
+                    logCreateAnalysisJob(job);
+                } else {
+                    job.state = AnalysisState.FINISHED;
+                    try {
+                        updateTableStats(job);
+                    } catch (Throwable e) {
+                        LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
+                    }
+                    logCreateAnalysisJob(job);
+                }
+                analysisJobIdToTaskMap.remove(job.jobId);
+            }
+        }
+        return null;
+    };
+
+
+    private final Function<TaskStatusWrapper, Void> systemJobStatusUpdater = w -> {
+        AnalysisInfo info = w.info;
+        info.state = w.taskState;
+        AnalysisInfo job = systemJobInfoMap.get(info.jobId);
+        if (job == null) {
+            return null;
+        }
+        for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
+            if (!task.info.state.equals(AnalysisState.FINISHED)) {
+                if (task.info.state.equals(AnalysisState.FAILED)) {
+                    systemJobInfoMap.remove(info.jobId);
+                }
+                return null;
+            }
+        }
+        try {
+            updateTableStats(job);
+        } catch (Throwable e) {
+            LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
+        } finally {
+            systemJobInfoMap.remove(info.jobId);
+        }
+        return null;
+    };
+
+    private final Function<TaskStatusWrapper, Void>[] updaters =
+            new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
+
+
     public AnalysisManager() {
         super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
         if (!Env.isCheckpointThread()) {
@@ -124,15 +212,15 @@ public class AnalysisManager extends Daemon implements Writable {
 
     private void clear() {
         clearMeta(analysisJobInfoMap, (a) ->
-                a.scheduleType.equals(ScheduleType.ONCE)
-                        && System.currentTimeMillis() - a.lastExecTimeInMs
-                        > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
+                        a.scheduleType.equals(ScheduleType.ONCE)
+                                && System.currentTimeMillis() - a.lastExecTimeInMs
+                                > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
                 (id) -> {
-            Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
-            return null;
-        });
+                    Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
+                    return null;
+                });
         clearMeta(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs
-                > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
+                        > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
                 (id) -> {
                     Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new AnalyzeDeletionLog(id));
                     return null;
@@ -140,7 +228,7 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     private void clearMeta(Map<Long, AnalysisInfo> infoMap, Predicate<AnalysisInfo> isExpired,
-            Function<Long, Void> writeLog) {
+                           Function<Long, Void> writeLog) {
         synchronized (infoMap) {
             List<Long> expired = new ArrayList<>();
             for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
@@ -190,8 +278,8 @@ public class AnalysisManager extends Daemon implements Writable {
         // columnNames null means to add all visitable columns.
         AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
                 table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())).map(
-                Column::getName).collect(
-                Collectors.toList()), db.getId(), table);
+                        Column::getName).collect(
+                        Collectors.toList()), db.getId(), table);
         try {
             analyzeTblStmt.check();
         } catch (AnalysisException analysisException) {
@@ -267,6 +355,7 @@ public class AnalysisManager extends Daemon implements Writable {
     public void createSystemAnalysisJob(AnalysisInfo info, AnalysisTaskExecutor analysisTaskExecutor)
             throws DdlException {
         AnalysisInfo jobInfo = buildAnalysisJobInfo(info);
+        systemJobInfoMap.put(info.jobId, info);
         if (jobInfo.colToPartitions.isEmpty()) {
             // No statistics need to be collected or updated
             return;
@@ -275,11 +364,8 @@ public class AnalysisManager extends Daemon implements Writable {
         Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
         createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
         createTaskForMVIdx(jobInfo, analysisTaskInfos, false);
-        if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
-            persistAnalysisJob(jobInfo);
-            analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
-        }
-        analysisTaskInfos.values().forEach(taskExecutor::submitTask);
+        analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
+        analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
     }
 
     private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
@@ -327,14 +413,15 @@ public class AnalysisManager extends Daemon implements Writable {
      * TODO Supports incremental collection of statistics from materialized views
      */
     private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
-            Set<String> partitionNames, AnalysisType analysisType, AnalysisMode analysisMode) throws DdlException {
+            Set<String> partitionNames, AnalysisType analysisType,
+            AnalysisMode analysisMode) throws DdlException {
         long tableId = table.getId();
 
         Map<String, Set<String>> columnToPartitions = columnNames.stream()
-            .collect(Collectors.toMap(
+                .collect(Collectors.toMap(
                         columnName -> columnName,
-                        columnName -> new HashSet<>(partitionNames)
-                ));
+                        columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames)
+                ));
 
         if (analysisType == AnalysisType.HISTOGRAM) {
             // Collecting histograms does not need to support incremental collection,
@@ -476,15 +563,8 @@ public class AnalysisManager extends Daemon implements Writable {
         taskInfoBuilder.setMaxBucketNum(jobInfo.maxBucketNum);
         taskInfoBuilder.setPeriodTimeInMs(jobInfo.periodTimeInMs);
         taskInfoBuilder.setLastExecTimeInMs(jobInfo.lastExecTimeInMs);
-        try {
-            TableIf table = StatisticsUtil
-                    .findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
-            Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, jobInfo.colToPartitions.keySet(),
-                    jobInfo.partitionNames, jobInfo.analysisType, jobInfo.analysisMode);
-            taskInfoBuilder.setColToPartitions(colToPartitions);
-        } catch (Throwable e) {
-            throw new RuntimeException(e);
-        }
+        taskInfoBuilder.setColToPartitions(jobInfo.colToPartitions);
+        taskInfoBuilder.setTaskIds(new ArrayList<>());
         return taskInfoBuilder.build();
     }
 
@@ -498,7 +578,7 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     private void createTaskForMVIdx(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
-            boolean isSync) throws DdlException {
+                                    boolean isSync) throws DdlException {
         TableIf table;
         try {
             table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
@@ -539,7 +619,7 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     private void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
-            boolean isSync) throws DdlException {
+                                          boolean isSync) throws DdlException {
         Map<String, Set<String>> columnToPartitions = jobInfo.colToPartitions;
         for (Entry<String, Set<String>> entry : columnToPartitions.entrySet()) {
             long indexId = -1;
@@ -580,8 +660,8 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     private void createTaskForExternalTable(AnalysisInfo jobInfo,
-            Map<Long, BaseAnalysisTask> analysisTasks,
-            boolean isSync) throws DdlException {
+                                            Map<Long, BaseAnalysisTask> analysisTasks,
+                                            boolean isSync) throws DdlException {
         TableIf table;
         try {
             table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
@@ -610,57 +690,8 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
-        if (analysisJobIdToTaskMap.get(info.jobId) == null) {
-            return;
-        }
-        info.state = taskState;
-        info.message = message;
-        // Update the task cost time when task finished or failed. And only log the final state.
-        if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
-            info.timeCostInMs = time - info.lastExecTimeInMs;
-            info.lastExecTimeInMs = time;
-            logCreateAnalysisTask(info);
-        }
-        info.lastExecTimeInMs = time;
-        AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
-        // Synchronize the job state change in job level.
-        synchronized (job) {
-            job.lastExecTimeInMs = time;
-            // Set the job state to RUNNING when its first task becomes RUNNING.
-            if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
-                job.state = AnalysisState.RUNNING;
-                replayCreateAnalysisJob(job);
-            }
-            boolean allFinished = true;
-            boolean hasFailure = false;
-            for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
-                AnalysisInfo taskInfo = task.info;
-                if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
-                    allFinished = false;
-                    break;
-                }
-                if (taskInfo.state.equals(AnalysisState.FAILED)) {
-                    hasFailure = true;
-                }
-            }
-            if (allFinished) {
-                if (hasFailure) {
-                    job.state = AnalysisState.FAILED;
-                    logCreateAnalysisJob(job);
-                } else {
-                    job.state = AnalysisState.FINISHED;
-                    if (job.jobType.equals(JobType.SYSTEM)) {
-                        try {
-                            updateTableStats(job);
-                        } catch (Throwable e) {
-                            LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
-                        }
-                    }
-                    logCreateAnalysisJob(job);
-                }
-                analysisJobIdToTaskMap.remove(job.jobId);
-            }
-        }
+        TaskStatusWrapper taskStatusWrapper = new TaskStatusWrapper(info, taskState, message, time);
+        updaters[info.jobType.ordinal()].apply(taskStatusWrapper);
     }
 
     private void updateTableStats(AnalysisInfo jobInfo) throws Throwable {
@@ -696,15 +727,6 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     private void updateOlapTableStats(OlapTable table, Map<String, String> params) throws Throwable {
-        for (Partition partition : table.getPartitions()) {
-            HashMap<String, String> partParams = Maps.newHashMap(params);
-            long rowCount = partition.getBaseIndex().getRowCount();
-            partParams.put("id", StatisticsUtil
-                    .constructId(params.get("id"), partition.getId()));
-            partParams.put("partId", String.valueOf(partition.getId()));
-            partParams.put("rowCount", String.valueOf(rowCount));
-            StatisticsRepository.persistTableStats(partParams);
-        }
 
         HashMap<String, String> tblParams = Maps.newHashMap(params);
         long rowCount = table.getRowCount();
@@ -717,12 +739,12 @@ public class AnalysisManager extends Daemon implements Writable {
         String state = stmt.getStateValue();
         TableName tblName = stmt.getDbTableName();
         return analysisJobInfoMap.values().stream()
-            .filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
-            .filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
-            .filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl())
-                    && a.dbName.equals(tblName.getDb()) && a.tblName.equals(tblName.getTbl()))
-            .sorted(Comparator.comparingLong(a -> a.jobId))
-            .collect(Collectors.toList());
+                .filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
+                .filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
+                .filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl())
+                        && a.dbName.equals(tblName.getDb()) && a.tblName.equals(tblName.getTbl()))
+                .sorted(Comparator.comparingLong(a -> a.jobId))
+                .collect(Collectors.toList());
     }
 
     public String getJobProgress(long jobId) {
@@ -765,12 +787,12 @@ public class AnalysisManager extends Daemon implements Writable {
     private ThreadPoolExecutor createThreadPoolForSyncAnalyze() {
         String poolName = "SYNC ANALYZE THREAD POOL";
         return new ThreadPoolExecutor(0,
-            ConnectContext.get().getSessionVariable().parallelSyncAnalyzeTaskNum,
-            0, TimeUnit.SECONDS,
+                ConnectContext.get().getSessionVariable().parallelSyncAnalyzeTaskNum,
+                0, TimeUnit.SECONDS,
                 new SynchronousQueue(),
                 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d")
-                .build(), new BlockedPolicy(poolName,
-                (int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
+                        .build(), new BlockedPolicy(poolName,
+                        (int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
     }
 
     public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
@@ -821,7 +843,7 @@ public class AnalysisManager extends Daemon implements Writable {
         if (!Env.getCurrentEnv().getAccessManager()
                 .checkTblPriv(ConnectContext.get(), analysisInfo.dbName, analysisInfo.tblName, PrivPredicate.SELECT)) {
             throw new RuntimeException("You need at least SELECT PRIV to corresponding table to kill this analyze"
-                + " job");
+                    + " job");
         }
     }
 
@@ -907,26 +929,26 @@ public class AnalysisManager extends Daemon implements Writable {
             }
             if (!colNames.isEmpty()) {
                 throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames)
-                    + "] Reasons: " + String.join(",", errorMessages));
+                        + "] Reasons: " + String.join(",", errorMessages));
             }
         }
 
         private void updateSyncTaskStatus(BaseAnalysisTask task, AnalysisState state) {
             Env.getCurrentEnv().getAnalysisManager()
-                .updateTaskStatus(task.info, state, "", System.currentTimeMillis());
+                    .updateTaskStatus(task.info, state, "", System.currentTimeMillis());
         }
     }
 
     public List<AnalysisInfo> findAutomaticAnalysisJobs() {
         synchronized (analysisJobInfoMap) {
             return analysisJobInfoMap.values().stream()
-                .filter(a ->
-                    a.scheduleType.equals(ScheduleType.AUTOMATIC)
-                        && (!(a.state.equals(AnalysisState.RUNNING)
-                        || a.state.equals(AnalysisState.PENDING)))
-                        && System.currentTimeMillis() - a.lastExecTimeInMs
-                        > TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes))
-                .collect(Collectors.toList());
+                    .filter(a ->
+                            a.scheduleType.equals(ScheduleType.AUTOMATIC)
+                                    && (!(a.state.equals(AnalysisState.RUNNING)
+                                    || a.state.equals(AnalysisState.PENDING)))
+                                    && System.currentTimeMillis() - a.lastExecTimeInMs
+                                    > TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes))
+                    .collect(Collectors.toList());
         }
     }
 
@@ -938,13 +960,13 @@ public class AnalysisManager extends Daemon implements Writable {
             }
             if (a.cronExpression == null) {
                 return a.scheduleType.equals(ScheduleType.PERIOD)
-                    && System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs;
+                        && System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs;
             }
             return a.cronExpression.getTimeAfter(new Date(a.lastExecTimeInMs)).before(new Date());
         };
         return analysisJobInfoMap.values().stream()
-            .filter(p)
-            .collect(Collectors.toList());
+                .filter(p)
+                .collect(Collectors.toList());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
index ebe687d78e..bb068d3ac0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
@@ -17,12 +17,13 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.analysis.AnalyzeProperties;
+import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
@@ -38,8 +39,8 @@ import org.apache.logging.log4j.Logger;
 import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,7 +54,8 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
     private AnalysisTaskExecutor analysisTaskExecutor;
 
     public StatisticsAutoAnalyzer() {
-        super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
+        super("Automatic Analyzer",
+                TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2);
         analysisTaskExecutor = new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
     }
 
@@ -92,8 +94,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
                 continue;
             }
             AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
-            List<AnalysisInfo> analysisInfos = analysisManager.buildAnalysisInfosForDB(databaseIf,
-                    AnalyzeProperties.DEFAULT_PROP);
+            List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
             for (AnalysisInfo analysisInfo : analysisInfos) {
                 analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
                 if (analysisInfo == null) {
@@ -106,10 +107,34 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
                 }
             }
         }
-
-        analyzePeriodically();
     }
 
+    private List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) {
+        List<AnalysisInfo> analysisInfos = new ArrayList<>();
+        for (TableIf table : db.getTables()) {
+            if (table instanceof View) {
+                continue;
+            }
+            TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
+                    table.getName());
+            AnalysisInfo jobInfo = new AnalysisInfoBuilder()
+                    .setJobId(Env.getCurrentEnv().getNextId())
+                    .setCatalogName(db.getCatalog().getName())
+                    .setDbName(db.getFullName())
+                    .setTblName(tableName.getTbl())
+                    .setColName(
+                            table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
+                                    .map(Column::getName).collect(Collectors.joining(","))
+                    )
+                    .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
+                    .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
+                    .setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL)
+                    .setScheduleType(AnalysisInfo.ScheduleType.ONCE)
+                    .setJobType(JobType.SYSTEM).build();
+            analysisInfos.add(jobInfo);
+        }
+        return analysisInfos;
+    }
+
     private void analyzePeriodically() {
@@ -174,23 +199,17 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             return null;
         }
 
-        if (tblStats == TableStatistic.UNKNOWN) {
-            return jobInfo;
-        }
-
-        if (!needReanalyzeTable(table, tblStats)) {
+        if (!(needReanalyzeTable(table, tblStats) || tblStats == TableStatistic.UNKNOWN)) {
             return null;
         }
 
-        Set<String> needRunPartitions = new HashSet<>();
-        Set<String> statsPartitions = jobInfo.colToPartitions.values()
-                .stream()
-                .flatMap(Collection::stream)
+        Set<String> needRunPartitions = table.getPartitionNames().stream()
+                .map(table::getPartition)
+                .filter(Partition::hasData)
+                .filter(partition ->
+                        partition.getVisibleVersionTime() >= lastExecTimeInMs).map(Partition::getName)
                 .collect(Collectors.toSet());
 
-        checkAnalyzedPartitions(table, statsPartitions, needRunPartitions, lastExecTimeInMs);
-        checkNewPartitions(table, needRunPartitions, lastExecTimeInMs);
-
         if (needRunPartitions.isEmpty()) {
             return null;
         }
@@ -205,65 +224,22 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
         return tblHealth < StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
     }
 
-    private void checkAnalyzedPartitions(TableIf table, Set<String> statsPartitions,
-            Set<String> needRunPartitions, long lastExecTimeInMs) {
-        for (String statsPartition : statsPartitions) {
-            Partition partition = table.getPartition(statsPartition);
-            if (partition == null) {
-                // Partition that has been deleted also need to
-                // be reanalyzed (delete partition statistics later)
-                needRunPartitions.add(statsPartition);
-                continue;
-            }
-            TableStatistic partitionStats = null;
-            try {
-                partitionStats = StatisticsRepository
-                        .fetchTableLevelOfPartStats(partition.getId());
-            } catch (DdlException e) {
-                LOG.warn("Failed to fetch part stats", e);
-                continue;
-            }
-
-            if (needReanalyzePartition(lastExecTimeInMs, partition, partitionStats)
-                    || partitionStats == TableStatistic.UNKNOWN) {
-                needRunPartitions.add(partition.getName());
-            }
-        }
-    }
-
-    private boolean needReanalyzePartition(long lastExecTimeInMs, Partition partition, TableStatistic partStats) {
-        long partUpdateTime = partition.getVisibleVersionTime();
-        if (partUpdateTime < lastExecTimeInMs) {
-            return false;
-        }
-        long pRowCount = partition.getBaseIndex().getRowCount();
-        long pUpdateRows = Math.abs(pRowCount - partStats.rowCount);
-        int partHealth = StatisticsUtil.getTableHealth(pRowCount, pUpdateRows);
-        return partHealth < StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
-    }
-
-    private void checkNewPartitions(TableIf table, Set<String> needRunPartitions, long lastExecTimeInMs) {
-        Set<String> partitionNames = table.getPartitionNames();
-        partitionNames.removeAll(needRunPartitions);
-        needRunPartitions.addAll(
-                partitionNames.stream()
-                        .map(table::getPartition)
-                        .filter(partition -> partition.getVisibleVersionTime() >= lastExecTimeInMs)
-                        .map(Partition::getName)
-                        .collect(Collectors.toSet())
-        );
-    }
-
     private AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, Set<String> needRunPartitions) {
         Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
         Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
-        colToPartitions.keySet().forEach(colName -> {
-            Column column = table.getColumn(colName);
-            if (column != null) {
-                newColToPartitions.put(colName, needRunPartitions);
+        if (colToPartitions == null) {
+            for (Column c : table.getColumns()) {
+                newColToPartitions.put(c.getName(), needRunPartitions);
             }
-        });
+        } else {
+            colToPartitions.keySet().forEach(colName -> {
+                Column column = table.getColumn(colName);
+                if (column != null) {
+                    newColToPartitions.put(colName, needRunPartitions);
+                }
+            });
+        }
         return new AnalysisInfoBuilder(jobInfo)
                 .setColToPartitions(newColToPartitions).build();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index 7a043e7708..b7b717b5b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -395,7 +395,7 @@ public class StatisticsRepository {
         if (resultRows.size() == 1) {
             return TableStatistic.fromResultRow(resultRows.get(0));
         }
-        throw new DdlException("Query result is not as expected: " + sql);
+        return TableStatistic.UNKNOWN;
     }
 
     public static TableStatistic fetchTableLevelOfPartStats(long partId) throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TaskStatusWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TaskStatusWrapper.java
new file mode 100644
index 0000000000..d74b14267d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TaskStatusWrapper.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+public class TaskStatusWrapper {
+
+    public final AnalysisInfo info;
+    public final AnalysisState taskState;
+    public final String message;
+    public final long time;
+
+    public TaskStatusWrapper(AnalysisInfo info, AnalysisState taskState, String message, long time) {
+        this.info = info;
+        this.taskState = taskState;
+        this.message = message;
+        this.time = time;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index 253f9c9332..8e809f9de7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
@@ -42,12 +45,22 @@ public class AnalysisManagerTest {
 
         };
 
+        new MockUp<AnalysisInfo>() {
+            @Mock
+            public String toString() {
+                return "";
+            }
+        };
+
         AnalysisInfo job = new AnalysisInfoBuilder().setJobId(1)
-                .setState(AnalysisState.PENDING).setJobType(AnalysisInfo.JobType.MANUAL).build();
+                .setState(AnalysisState.PENDING).setAnalysisType(AnalysisType.FUNDAMENTALS)
+                .setJobType(AnalysisInfo.JobType.MANUAL).build();
         AnalysisInfo taskInfo1 = new AnalysisInfoBuilder().setJobId(1)
-                .setTaskId(2).setState(AnalysisState.PENDING).build();
+                .setTaskId(2).setJobType(JobType.MANUAL).setAnalysisType(AnalysisType.FUNDAMENTALS)
+                .setState(AnalysisState.PENDING).build();
         AnalysisInfo taskInfo2 = new AnalysisInfoBuilder().setJobId(1)
-                .setTaskId(3).setState(AnalysisState.PENDING).build();
+                .setTaskId(3).setAnalysisType(AnalysisType.FUNDAMENTALS).setJobType(JobType.MANUAL)
+                .setState(AnalysisState.PENDING).build();
         AnalysisManager manager = new AnalysisManager();
         manager.replayCreateAnalysisJob(job);
         manager.replayCreateAnalysisTask(taskInfo1);