From 151120c90758ce2a83047805f04e96f12bc16079 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Thu, 3 Aug 2023 21:22:37 +0800 Subject: [PATCH] [Improvement](statistics)Improve show analyze performance. #22484 --- .../apache/doris/statistics/AnalysisInfo.java | 13 ++++++++++++- .../doris/statistics/AnalysisInfoBuilder.java | 11 ++++++++++- .../doris/statistics/AnalysisManager.java | 18 +++++++++++++++++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index 53032778cc..285950816f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -39,6 +39,7 @@ import java.lang.reflect.Type; import java.text.ParseException; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.StringJoiner; @@ -79,9 +80,14 @@ public class AnalysisInfo implements Writable { @SerializedName("jobId") public final long jobId; + // When this AnalysisInfo represent a task, this is the task id for it. @SerializedName("taskId") public final long taskId; + // When this AnalysisInfo represent a job, this is the list of task ids belong to this job. + @SerializedName("taskIds") + public final List taskIds; + @SerializedName("catalogName") public final String catalogName; @@ -161,7 +167,7 @@ public class AnalysisInfo implements Writable { public CronExpression cronExpression; - public AnalysisInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, + public AnalysisInfo(long jobId, long taskId, List taskIds, String catalogName, String dbName, String tblName, Map> colToPartitions, Set partitionNames, String colName, Long indexId, JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message, @@ -170,6 +176,7 @@ public class AnalysisInfo implements Writable { CronExpression cronExpression) { this.jobId = jobId; this.taskId = taskId; + this.taskIds = taskIds; this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; @@ -241,6 +248,10 @@ public class AnalysisInfo implements Writable { return taskId == -1; } + public void addTaskId(long taskId) { + taskIds.add(taskId); + } + // TODO: use thrift public static AnalysisInfo fromResultRow(ResultRow resultRow) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 5efce203de..dc368fcdf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -25,12 +25,14 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.quartz.CronExpression; +import java.util.List; import java.util.Map; import java.util.Set; public class AnalysisInfoBuilder { private long jobId; private long taskId; + private List taskIds; private String catalogName; private String dbName; private String tblName; @@ -63,6 +65,7 @@ public class AnalysisInfoBuilder { public AnalysisInfoBuilder(AnalysisInfo info) { jobId = info.jobId; taskId = info.taskId; + taskIds = info.taskIds; catalogName = info.catalogName; dbName = info.dbName; tblName = info.tblName; @@ -98,6 +101,11 @@ public class AnalysisInfoBuilder { return this; } + public AnalysisInfoBuilder setTaskIds(List taskIds) { + this.taskIds = taskIds; + return this; + } + public AnalysisInfoBuilder setCatalogName(String catalogName) { this.catalogName = catalogName; return this; @@ -218,7 +226,7 @@ public class AnalysisInfoBuilder { } public AnalysisInfo build() { - return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, partitionNames, + return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, cronExpression); @@ -228,6 +236,7 @@ public class AnalysisInfoBuilder { return new AnalysisInfoBuilder() .setJobId(jobId) .setTaskId(taskId) + .setTaskIds(taskIds) .setCatalogName(catalogName) .setDbName(dbName) .setTblName(tblName) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 24e64ceabb..b67e4f76ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -59,6 +59,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; @@ -452,6 +453,7 @@ public class AnalysisManager extends Daemon implements Writable { Map> colToPartitions = validateAndGetPartitions(table, columnNames, partitionNames, analysisType, analysisMode); taskInfoBuilder.setColToPartitions(colToPartitions); + taskInfoBuilder.setTaskIds(Lists.newArrayList()); return taskInfoBuilder.build(); } @@ -524,6 +526,7 @@ public class AnalysisManager extends Daemon implements Writable { AnalysisInfoBuilder indexTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo); AnalysisInfo analysisInfo = indexTaskInfoBuilder.setIndexId(indexId) .setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build(); + jobInfo.addTaskId(taskId); if (isSync) { return; } @@ -550,6 +553,7 @@ public class AnalysisManager extends Daemon implements Writable { AnalysisInfo analysisInfo = colTaskInfoBuilder.setColName(colName).setIndexId(indexId) .setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build(); analysisTasks.put(taskId, createTask(analysisInfo)); + jobInfo.addTaskId(taskId); if (isSync) { continue; } @@ -593,6 +597,7 @@ public class AnalysisManager extends Daemon implements Writable { AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L).setLastExecTimeInMs(System.currentTimeMillis()) .setTaskId(taskId).setColName("TableRowCount").setExternalTableLevelTask(true).build(); analysisTasks.put(taskId, createTask(analysisInfo)); + jobInfo.addTaskId(taskId); if (isSync) { // For sync job, don't need to persist, return here and execute it immediately. return; @@ -721,7 +726,10 @@ public class AnalysisManager extends Daemon implements Writable { } public String getJobProgress(long jobId) { - List tasks = findTasks(jobId); + List tasks = findTasksByTaskIds(jobId); + if (tasks == null) { + return "N/A"; + } int finished = 0; int failed = 0; int inProgress = 0; @@ -946,6 +954,14 @@ public class AnalysisManager extends Daemon implements Writable { } } + public List findTasksByTaskIds(long jobId) { + AnalysisInfo jobInfo = analysisJobInfoMap.get(jobId); + if (jobInfo != null && jobInfo.taskIds != null) { + return jobInfo.taskIds.stream().map(id -> analysisTaskInfoMap.get(id)).collect(Collectors.toList()); + } + return null; + } + public void removeAll(List analysisInfos) { for (AnalysisInfo analysisInfo : analysisInfos) { analysisTaskInfoMap.remove(analysisInfo.taskId);