[improvement](statistic)Improve auto analyze visibility. (#29046)
Show auto analyze now shows running jobs as well, not only finished or failed jobs. Show analyze task status now shows auto analyze tasks as well. Remove some unused code. Auto analyze now processes catalogs, databases, and tables in ascending order of id (smallest id first).
This commit is contained in:
@ -2654,8 +2654,7 @@ public class ShowExecutor {
|
||||
|
||||
private void handleShowAnalyze() {
|
||||
ShowAnalyzeStmt showStmt = (ShowAnalyzeStmt) stmt;
|
||||
List<AnalysisInfo> results = Env.getCurrentEnv().getAnalysisManager()
|
||||
.showAnalysisJob(showStmt);
|
||||
List<AnalysisInfo> results = Env.getCurrentEnv().getAnalysisManager().showAnalysisJob(showStmt);
|
||||
List<List<String>> resultRows = Lists.newArrayList();
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
for (AnalysisInfo analysisInfo : results) {
|
||||
@ -2681,14 +2680,7 @@ public class ShowExecutor {
|
||||
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
|
||||
ZoneId.systemDefault())));
|
||||
row.add(analysisInfo.state.toString());
|
||||
try {
|
||||
row.add(showStmt.isAuto()
|
||||
? analysisInfo.progress
|
||||
: Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
|
||||
} catch (Exception e) {
|
||||
row.add("N/A");
|
||||
LOG.warn("Failed to get progress for job: {}", analysisInfo, e);
|
||||
}
|
||||
row.add(Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
|
||||
row.add(analysisInfo.scheduleType.toString());
|
||||
LocalDateTime startTime =
|
||||
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.startTime),
|
||||
|
||||
@ -78,7 +78,6 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
@ -96,8 +95,6 @@ import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AnalysisManager implements Writable {
|
||||
@ -119,9 +116,6 @@ public class AnalysisManager implements Writable {
|
||||
protected final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
|
||||
Collections.synchronizedNavigableMap(new TreeMap<>());
|
||||
|
||||
// Tracking system submitted job, keep in mem only
|
||||
protected final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
|
||||
|
||||
// Tracking and control sync analyze tasks, keep in mem only
|
||||
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
|
||||
|
||||
@ -129,123 +123,11 @@ public class AnalysisManager implements Writable {
|
||||
|
||||
private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();
|
||||
|
||||
// To be deprecated, keep it for meta compatibility now, will remove later.
|
||||
protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);
|
||||
|
||||
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
|
||||
AnalysisInfo info = w.info;
|
||||
AnalysisState taskState = w.taskState;
|
||||
String message = w.message;
|
||||
long time = w.time;
|
||||
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
|
||||
return null;
|
||||
}
|
||||
info.state = taskState;
|
||||
info.message = message;
|
||||
// Update the task cost time when task finished or failed. And only log the final state.
|
||||
if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
|
||||
info.timeCostInMs = time - info.lastExecTimeInMs;
|
||||
info.lastExecTimeInMs = time;
|
||||
logCreateAnalysisTask(info);
|
||||
}
|
||||
info.lastExecTimeInMs = time;
|
||||
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
|
||||
// Job may get deleted during execution.
|
||||
if (job == null) {
|
||||
return null;
|
||||
}
|
||||
// Synchronize the job state change in job level.
|
||||
synchronized (job) {
|
||||
job.lastExecTimeInMs = time;
|
||||
// Set the job state to RUNNING when its first task becomes RUNNING.
|
||||
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
|
||||
job.state = AnalysisState.RUNNING;
|
||||
job.markStartTime(System.currentTimeMillis());
|
||||
replayCreateAnalysisJob(job);
|
||||
}
|
||||
boolean allFinished = true;
|
||||
boolean hasFailure = false;
|
||||
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
|
||||
AnalysisInfo taskInfo = task.info;
|
||||
if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
|
||||
allFinished = false;
|
||||
break;
|
||||
}
|
||||
if (taskInfo.state.equals(AnalysisState.FAILED)) {
|
||||
hasFailure = true;
|
||||
}
|
||||
}
|
||||
if (allFinished) {
|
||||
if (hasFailure) {
|
||||
job.markFailed();
|
||||
} else {
|
||||
job.markFinished();
|
||||
try {
|
||||
updateTableStats(job);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
|
||||
}
|
||||
}
|
||||
logCreateAnalysisJob(job);
|
||||
analysisJobIdToTaskMap.remove(job.jobId);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
private final String progressDisplayTemplate = "%d Finished | %d Failed | %d In Progress | %d Total";
|
||||
|
||||
protected final Function<TaskStatusWrapper, Void> systemJobStatusUpdater = w -> {
|
||||
AnalysisInfo info = w.info;
|
||||
info.state = w.taskState;
|
||||
info.message = w.message;
|
||||
AnalysisInfo job = systemJobInfoMap.get(info.jobId);
|
||||
if (job == null) {
|
||||
return null;
|
||||
}
|
||||
synchronized (job) {
|
||||
// Set the job state to RUNNING when its first task becomes RUNNING.
|
||||
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
|
||||
job.state = AnalysisState.RUNNING;
|
||||
job.markStartTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
int failedCount = 0;
|
||||
StringJoiner reason = new StringJoiner(", ");
|
||||
Map<Long, BaseAnalysisTask> taskMap = analysisJobIdToTaskMap.get(info.jobId);
|
||||
for (BaseAnalysisTask task : taskMap.values()) {
|
||||
if (task.info.state.equals(AnalysisState.RUNNING) || task.info.state.equals(AnalysisState.PENDING)) {
|
||||
return null;
|
||||
}
|
||||
if (task.info.state.equals(AnalysisState.FAILED)) {
|
||||
failedCount++;
|
||||
reason.add(task.info.message);
|
||||
}
|
||||
}
|
||||
try {
|
||||
updateTableStats(job);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
|
||||
} finally {
|
||||
job.lastExecTimeInMs = System.currentTimeMillis();
|
||||
job.message = reason.toString();
|
||||
job.progress = String.format(progressDisplayTemplate,
|
||||
taskMap.size() - failedCount, failedCount, 0, taskMap.size());
|
||||
if (failedCount > 0) {
|
||||
job.message = reason.toString();
|
||||
job.markFailed();
|
||||
} else {
|
||||
job.markFinished();
|
||||
}
|
||||
autoJobs.offer(job);
|
||||
systemJobInfoMap.remove(info.jobId);
|
||||
analysisJobIdToTaskMap.remove(info.jobId);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
private final Function<TaskStatusWrapper, Void>[] updaters =
|
||||
new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
|
||||
|
||||
public AnalysisManager() {
|
||||
if (!Env.isCheckpointThread()) {
|
||||
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
|
||||
@ -524,7 +406,7 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void recordAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
|
||||
public void recordAnalysisJob(AnalysisInfo jobInfo) {
|
||||
if (jobInfo.scheduleType == ScheduleType.PERIOD && jobInfo.lastExecTimeInMs > 0) {
|
||||
return;
|
||||
}
|
||||
@ -554,13 +436,7 @@ public class AnalysisManager implements Writable {
|
||||
if (isSync) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
|
||||
replayCreateAnalysisTask(analysisInfo);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new DdlException("Failed to create analysis task", e);
|
||||
}
|
||||
replayCreateAnalysisTask(analysisInfo);
|
||||
}
|
||||
}
|
||||
|
||||
@ -594,16 +470,63 @@ public class AnalysisManager implements Writable {
|
||||
// For sync job, don't need to persist, return here and execute it immediately.
|
||||
return;
|
||||
}
|
||||
try {
|
||||
replayCreateAnalysisTask(analysisInfo);
|
||||
} catch (Exception e) {
|
||||
throw new DdlException("Failed to create analysis task", e);
|
||||
}
|
||||
replayCreateAnalysisTask(analysisInfo);
|
||||
}
|
||||
|
||||
public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
|
||||
TaskStatusWrapper taskStatusWrapper = new TaskStatusWrapper(info, taskState, message, time);
|
||||
updaters[info.jobType.ordinal()].apply(taskStatusWrapper);
|
||||
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
|
||||
return;
|
||||
}
|
||||
info.state = taskState;
|
||||
info.message = message;
|
||||
// Update the task cost time when task finished or failed. And only log the final state.
|
||||
if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
|
||||
info.timeCostInMs = time - info.lastExecTimeInMs;
|
||||
info.lastExecTimeInMs = time;
|
||||
logCreateAnalysisTask(info);
|
||||
}
|
||||
info.lastExecTimeInMs = time;
|
||||
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
|
||||
// Job may get deleted during execution.
|
||||
if (job == null) {
|
||||
return;
|
||||
}
|
||||
// Synchronize the job state change in job level.
|
||||
synchronized (job) {
|
||||
job.lastExecTimeInMs = time;
|
||||
// Set the job state to RUNNING when its first task becomes RUNNING.
|
||||
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
|
||||
job.state = AnalysisState.RUNNING;
|
||||
job.markStartTime(System.currentTimeMillis());
|
||||
replayCreateAnalysisJob(job);
|
||||
}
|
||||
boolean allFinished = true;
|
||||
boolean hasFailure = false;
|
||||
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
|
||||
AnalysisInfo taskInfo = task.info;
|
||||
if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
|
||||
allFinished = false;
|
||||
break;
|
||||
}
|
||||
if (taskInfo.state.equals(AnalysisState.FAILED)) {
|
||||
hasFailure = true;
|
||||
}
|
||||
}
|
||||
if (allFinished) {
|
||||
if (hasFailure) {
|
||||
job.markFailed();
|
||||
} else {
|
||||
job.markFinished();
|
||||
try {
|
||||
updateTableStats(job);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
|
||||
}
|
||||
}
|
||||
logCreateAnalysisJob(job);
|
||||
analysisJobIdToTaskMap.remove(job.jobId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -629,12 +552,6 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
|
||||
public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
|
||||
if (stmt.isAuto()) {
|
||||
// It's ok to sync on this field, it would only be assigned when instance init or do checkpoint
|
||||
synchronized (autoJobs) {
|
||||
return findShowAnalyzeResult(autoJobs, stmt);
|
||||
}
|
||||
}
|
||||
return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt);
|
||||
}
|
||||
|
||||
@ -650,6 +567,8 @@ public class AnalysisManager implements Writable {
|
||||
.filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
|
||||
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
|
||||
.filter(a -> tblName == null || a.tblId == tblId)
|
||||
.filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM)
|
||||
|| !stmt.isAuto() && a.jobType.equals(JobType.MANUAL))
|
||||
.sorted(Comparator.comparingLong(a -> a.jobId))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
@ -884,24 +803,6 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public List<AnalysisInfo> findPeriodicJobs() {
|
||||
synchronized (analysisJobInfoMap) {
|
||||
Predicate<AnalysisInfo> p = a -> {
|
||||
if (a.state.equals(AnalysisState.RUNNING)) {
|
||||
return false;
|
||||
}
|
||||
if (a.cronExpression == null) {
|
||||
return a.scheduleType.equals(ScheduleType.PERIOD)
|
||||
&& System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs;
|
||||
}
|
||||
return a.cronExpression.getTimeAfter(new Date(a.lastExecTimeInMs)).before(new Date());
|
||||
};
|
||||
return analysisJobInfoMap.values().stream()
|
||||
.filter(p)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
public List<AnalysisInfo> findTasks(long jobId) {
|
||||
synchronized (analysisTaskInfoMap) {
|
||||
return analysisTaskInfoMap.values().stream().filter(i -> i.jobId == jobId).collect(Collectors.toList());
|
||||
@ -979,10 +880,11 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
// To be deprecated, keep it for meta compatibility now, will remove later.
|
||||
private static void readAutoJobs(DataInput in, AnalysisManager analysisManager) throws IOException {
|
||||
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
|
||||
Collection<AnalysisInfo> autoJobs = GsonUtils.GSON.fromJson(Text.readString(in), type);
|
||||
analysisManager.autoJobs = analysisManager.createSimpleQueue(autoJobs, analysisManager);
|
||||
GsonUtils.GSON.fromJson(Text.readString(in), type);
|
||||
analysisManager.autoJobs = analysisManager.createSimpleQueue(null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1045,7 +947,7 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
|
||||
public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> taskInfos) {
|
||||
systemJobInfoMap.put(jobInfo.jobId, jobInfo);
|
||||
recordAnalysisJob(jobInfo);
|
||||
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
|
||||
}
|
||||
|
||||
@ -1117,10 +1019,6 @@ public class AnalysisManager implements Writable {
|
||||
idToAnalysisJob.remove(id);
|
||||
}
|
||||
|
||||
public boolean hasUnFinished() {
|
||||
return !analysisJobIdToTaskMap.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Only OlapTable and Hive HMSExternalTable can sample for now.
|
||||
* @param table
|
||||
|
||||
@ -38,7 +38,6 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.LocalTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -69,12 +68,12 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private void analyzeAll() {
|
||||
Set<CatalogIf> catalogs = Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog();
|
||||
List<CatalogIf> catalogs = getCatalogsInOrder();
|
||||
for (CatalogIf ctl : catalogs) {
|
||||
if (!ctl.enableAutoAnalyze()) {
|
||||
continue;
|
||||
}
|
||||
Collection<DatabaseIf> dbs = ctl.getAllDbs();
|
||||
List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
|
||||
for (DatabaseIf<TableIf> databaseIf : dbs) {
|
||||
if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
|
||||
continue;
|
||||
@ -89,6 +88,21 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
}
|
||||
}
|
||||
|
||||
public List<CatalogIf> getCatalogsInOrder() {
|
||||
return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
|
||||
.sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
|
||||
return catalog.getAllDbs().stream()
|
||||
.sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
|
||||
return db.getTables().stream()
|
||||
.sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
|
||||
List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
|
||||
for (AnalysisInfo analysisInfo : analysisInfos) {
|
||||
@ -109,7 +123,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
|
||||
protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) {
|
||||
List<AnalysisInfo> analysisInfos = new ArrayList<>();
|
||||
for (TableIf table : db.getTables()) {
|
||||
for (TableIf table : getTablesInOrder(db)) {
|
||||
try {
|
||||
if (skip(table)) {
|
||||
continue;
|
||||
|
||||
@ -52,10 +52,6 @@ public abstract class StatisticsCollector extends MasterDaemon {
|
||||
if (Env.isCheckpointThread()) {
|
||||
return;
|
||||
}
|
||||
if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
|
||||
LOG.info("Analyze tasks those submitted in last time is not finished, skip");
|
||||
return;
|
||||
}
|
||||
collect();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user