[Improvement](statistics)Improve show analyze performance. #22484
This commit is contained in:
@ -39,6 +39,7 @@ import java.lang.reflect.Type;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
@ -79,9 +80,14 @@ public class AnalysisInfo implements Writable {
|
||||
@SerializedName("jobId")
|
||||
public final long jobId;
|
||||
|
||||
// When this AnalysisInfo represents a task, this is the id of that task.
|
||||
@SerializedName("taskId")
|
||||
public final long taskId;
|
||||
|
||||
// When this AnalysisInfo represents a job, this is the list of ids of the tasks belonging to this job.
|
||||
@SerializedName("taskIds")
|
||||
public final List<Long> taskIds;
|
||||
|
||||
@SerializedName("catalogName")
|
||||
public final String catalogName;
|
||||
|
||||
@ -161,7 +167,7 @@ public class AnalysisInfo implements Writable {
|
||||
|
||||
public CronExpression cronExpression;
|
||||
|
||||
public AnalysisInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
|
||||
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogName, String dbName, String tblName,
|
||||
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
|
||||
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
|
||||
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
|
||||
@ -170,6 +176,7 @@ public class AnalysisInfo implements Writable {
|
||||
CronExpression cronExpression) {
|
||||
this.jobId = jobId;
|
||||
this.taskId = taskId;
|
||||
this.taskIds = taskIds;
|
||||
this.catalogName = catalogName;
|
||||
this.dbName = dbName;
|
||||
this.tblName = tblName;
|
||||
@ -241,6 +248,10 @@ public class AnalysisInfo implements Writable {
|
||||
return taskId == -1;
|
||||
}
|
||||
|
||||
public void addTaskId(long taskId) {
|
||||
taskIds.add(taskId);
|
||||
}
|
||||
|
||||
// TODO: use thrift
|
||||
public static AnalysisInfo fromResultRow(ResultRow resultRow) {
|
||||
try {
|
||||
|
||||
@ -25,12 +25,14 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
|
||||
import org.quartz.CronExpression;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class AnalysisInfoBuilder {
|
||||
private long jobId;
|
||||
private long taskId;
|
||||
private List<Long> taskIds;
|
||||
private String catalogName;
|
||||
private String dbName;
|
||||
private String tblName;
|
||||
@ -63,6 +65,7 @@ public class AnalysisInfoBuilder {
|
||||
public AnalysisInfoBuilder(AnalysisInfo info) {
|
||||
jobId = info.jobId;
|
||||
taskId = info.taskId;
|
||||
taskIds = info.taskIds;
|
||||
catalogName = info.catalogName;
|
||||
dbName = info.dbName;
|
||||
tblName = info.tblName;
|
||||
@ -98,6 +101,11 @@ public class AnalysisInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisInfoBuilder setTaskIds(List<Long> taskIds) {
|
||||
this.taskIds = taskIds;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisInfoBuilder setCatalogName(String catalogName) {
|
||||
this.catalogName = catalogName;
|
||||
return this;
|
||||
@ -218,7 +226,7 @@ public class AnalysisInfoBuilder {
|
||||
}
|
||||
|
||||
public AnalysisInfo build() {
|
||||
return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, partitionNames,
|
||||
return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
|
||||
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
|
||||
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
|
||||
externalTableLevelTask, partitionOnly, samplingPartition, cronExpression);
|
||||
@ -228,6 +236,7 @@ public class AnalysisInfoBuilder {
|
||||
return new AnalysisInfoBuilder()
|
||||
.setJobId(jobId)
|
||||
.setTaskId(taskId)
|
||||
.setTaskIds(taskIds)
|
||||
.setCatalogName(catalogName)
|
||||
.setDbName(dbName)
|
||||
.setTblName(tblName)
|
||||
|
||||
@ -59,6 +59,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -452,6 +453,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames,
|
||||
partitionNames, analysisType, analysisMode);
|
||||
taskInfoBuilder.setColToPartitions(colToPartitions);
|
||||
taskInfoBuilder.setTaskIds(Lists.newArrayList());
|
||||
|
||||
return taskInfoBuilder.build();
|
||||
}
|
||||
@ -524,6 +526,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
AnalysisInfoBuilder indexTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
|
||||
AnalysisInfo analysisInfo = indexTaskInfoBuilder.setIndexId(indexId)
|
||||
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
|
||||
jobInfo.addTaskId(taskId);
|
||||
if (isSync) {
|
||||
return;
|
||||
}
|
||||
@ -550,6 +553,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
AnalysisInfo analysisInfo = colTaskInfoBuilder.setColName(colName).setIndexId(indexId)
|
||||
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
|
||||
analysisTasks.put(taskId, createTask(analysisInfo));
|
||||
jobInfo.addTaskId(taskId);
|
||||
if (isSync) {
|
||||
continue;
|
||||
}
|
||||
@ -593,6 +597,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L).setLastExecTimeInMs(System.currentTimeMillis())
|
||||
.setTaskId(taskId).setColName("TableRowCount").setExternalTableLevelTask(true).build();
|
||||
analysisTasks.put(taskId, createTask(analysisInfo));
|
||||
jobInfo.addTaskId(taskId);
|
||||
if (isSync) {
|
||||
// For sync job, don't need to persist, return here and execute it immediately.
|
||||
return;
|
||||
@ -721,7 +726,10 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
}
|
||||
|
||||
public String getJobProgress(long jobId) {
|
||||
List<AnalysisInfo> tasks = findTasks(jobId);
|
||||
List<AnalysisInfo> tasks = findTasksByTaskIds(jobId);
|
||||
if (tasks == null) {
|
||||
return "N/A";
|
||||
}
|
||||
int finished = 0;
|
||||
int failed = 0;
|
||||
int inProgress = 0;
|
||||
@ -946,6 +954,14 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public List<AnalysisInfo> findTasksByTaskIds(long jobId) {
|
||||
AnalysisInfo jobInfo = analysisJobInfoMap.get(jobId);
|
||||
if (jobInfo != null && jobInfo.taskIds != null) {
|
||||
return jobInfo.taskIds.stream().map(id -> analysisTaskInfoMap.get(id)).collect(Collectors.toList());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void removeAll(List<AnalysisInfo> analysisInfos) {
|
||||
for (AnalysisInfo analysisInfo : analysisInfos) {
|
||||
analysisTaskInfoMap.remove(analysisInfo.taskId);
|
||||
|
||||
Reference in New Issue
Block a user