[Fix](multi catalog statistics) Improve external table statistics collection (#22224)
Improve external table statistics collection, covering logging, observability, and several bug fixes:
1. Add a Running state for statistics jobs.
2. Add progress to SHOW ANALYZE jobs (n/m tasks finished, n/m tasks failed, and so on).
3. Add analyze time cost to SHOW ANALYZE tasks.
4. Make task failure messages clearer.
5. Synchronize the job status update code in updateTaskStatus.
6. Fix an NPE in HMSAnalysisTask (avoid refreshing the statistics cache if the collection SQL failed).
7. Return an error message when a synchronous collection times out.
8. Improve log levels.
9. Fix misuse of logCreateAnalysisJob for tasks.
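As an illustration of item 2, the new progress column is derived by counting per-task states. Below is a minimal, self-contained sketch of that counting logic; the class name and the sample task states are hypothetical, while the format string is the one introduced in AnalysisManager.getJobProgress in this patch.

import java.util.Arrays;
import java.util.List;

public class ProgressSketch {
    enum AnalysisState { PENDING, RUNNING, FINISHED, FAILED }

    public static void main(String[] args) {
        // hypothetical sample data: one task per state of interest
        List<AnalysisState> tasks = Arrays.asList(
                AnalysisState.FINISHED, AnalysisState.FAILED, AnalysisState.RUNNING);
        int finished = 0;
        int failed = 0;
        int inProgress = 0;
        for (AnalysisState s : tasks) {
            switch (s) {
                case FINISHED:
                    finished++;
                    break;
                case FAILED:
                    failed++;
                    break;
                default:
                    inProgress++;
                    break;
            }
        }
        // prints: 1 Finished/1 Failed/1 In Progress/3 Total
        System.out.println(String.format("%d Finished/%d Failed/%d In Progress/%d Total",
                finished, failed, inProgress, tasks.size()));
    }
}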
@@ -84,6 +84,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
     private final TableName tableName;
     private List<String> columnNames;
     private List<String> partitionNames;
+    private boolean isAllColumns;

     // after analyzed
     private long dbId;
@@ -98,6 +99,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
         this.partitionNames = partitionNames == null ? null : partitionNames.getPartitionNames();
         this.columnNames = columnNames;
         this.analyzeProperties = properties;
+        this.isAllColumns = columnNames == null;
     }

     public AnalyzeTblStmt(AnalyzeProperties analyzeProperties, TableName tableName, List<String> columnNames, long dbId,
@@ -107,6 +109,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
         this.columnNames = columnNames;
         this.dbId = dbId;
         this.table = table;
+        this.isAllColumns = columnNames == null;
     }

     @Override
@@ -128,6 +131,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
         DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
         dbId = db.getId();
         table = db.getTableOrAnalysisException(tblName);
+        isAllColumns = columnNames == null;
         check();
     }

@@ -301,4 +305,8 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
     public Database getDb() throws AnalysisException {
         return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
     }
+
+    public boolean isAllColumns() {
+        return isAllColumns;
+    }
 }
@@ -64,6 +64,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
             .add("message")
             .add("last_exec_time_in_ms")
             .add("state")
+            .add("progress")
             .add("schedule_type")
             .build();

@@ -33,7 +33,8 @@ public class ShowAnalyzeTaskStatus extends ShowStmt {
             .addColumn(new Column("task_id", ScalarType.createVarchar(100)))
             .addColumn(new Column("col_name", ScalarType.createVarchar(1000)))
             .addColumn(new Column("message", ScalarType.createVarchar(1000)))
             .addColumn(new Column("last_exec_time_in_ms", ScalarType.createVarchar(1000)))
             .addColumn(new Column("last_state_change_time", ScalarType.createVarchar(1000)))
+            .addColumn(new Column("time_cost_in_ms", ScalarType.createVarchar(1000)))
             .addColumn(new Column("state", ScalarType.createVarchar(1000))).build();

     private final long jobId;
@@ -2562,6 +2562,7 @@ public class ShowExecutor {
                     LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
                             ZoneId.systemDefault())));
             row.add(analysisInfo.state.toString());
+            row.add(Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
             row.add(analysisInfo.scheduleType.toString());
             resultRows.add(row);
         }
@@ -2762,6 +2763,7 @@ public class ShowExecutor {
             row.add(TimeUtils.DATETIME_FORMAT.format(
                     LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
                             ZoneId.systemDefault())));
+            row.add(String.valueOf(analysisInfo.timeCostInMs));
             row.add(analysisInfo.state.toString());
             rows.add(row);
         }
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.analysis.AddPartitionLikeClause;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.AlterTableStmt;
+import org.apache.doris.analysis.AnalyzeDBStmt;
 import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.Analyzer;
@@ -1149,7 +1150,7 @@ public class StmtExecutor {
         if (mysqlLoadId != null) {
             Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
         }
-        if (parsedStmt instanceof AnalyzeTblStmt) {
+        if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
             Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
         }
     }
@@ -2485,7 +2486,7 @@ public class StmtExecutor {
                 analyze(context.getSessionVariable().toThrift());
             }
         } catch (Exception e) {
-            throw new RuntimeException("Failed to execute internal SQL", e);
+            throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
         }
         planner.getFragments();
         RowBatch batch;
@@ -2495,7 +2496,7 @@ public class StmtExecutor {
             QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                     new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
         } catch (UserException e) {
-            throw new RuntimeException("Failed to execute internal SQL", e);
+            throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
         }

         Span queryScheduleSpan = context.getTracer()
@@ -2504,7 +2505,7 @@ public class StmtExecutor {
             coord.exec();
         } catch (Exception e) {
             queryScheduleSpan.recordException(e);
-            throw new RuntimeException("Failed to execute internal SQL", e);
+            throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
         } finally {
             queryScheduleSpan.end();
         }
@@ -2521,7 +2522,7 @@ public class StmtExecutor {
             }
         } catch (Exception e) {
             fetchResultSpan.recordException(e);
-            throw new RuntimeException("Failed to execute internal SQL", e);
+            throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
         } finally {
             fetchResultSpan.end();
         }
@@ -131,6 +131,10 @@ public class AnalysisInfo implements Writable {
     @SerializedName("lastExecTimeInMs")
     public long lastExecTimeInMs;

+    // finished or failed
+    @SerializedName("timeCostInMs")
+    public long timeCostInMs;
+
     @SerializedName("state")
     public AnalysisState state;

@@ -161,8 +165,9 @@ public class AnalysisInfo implements Writable {
             Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
             JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
             int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
-            long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask,
-            boolean partitionOnly, boolean samplingPartition, CronExpression cronExpression) {
+            long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
+            boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
+            CronExpression cronExpression) {
         this.jobId = jobId;
         this.taskId = taskId;
         this.catalogName = catalogName;
@@ -182,6 +187,7 @@ public class AnalysisInfo implements Writable {
         this.periodTimeInMs = periodTimeInMs;
         this.message = message;
         this.lastExecTimeInMs = lastExecTimeInMs;
+        this.timeCostInMs = timeCostInMs;
         this.state = state;
         this.scheduleType = scheduleType;
         this.externalTableLevelTask = isExternalTableLevelTask;
@@ -218,6 +224,9 @@ public class AnalysisInfo implements Writable {
         if (lastExecTimeInMs > 0) {
             sj.add("LastExecTime: " + StatisticsUtil.getReadableTime(lastExecTimeInMs));
         }
+        if (timeCostInMs > 0) {
+            sj.add("timeCost: " + timeCostInMs);
+        }
         if (periodTimeInMs > 0) {
             sj.add("periodTimeInMs: " + StatisticsUtil.getReadableTime(periodTimeInMs));
         }
@@ -275,6 +284,8 @@ public class AnalysisInfo implements Writable {
         analysisInfoBuilder.setPeriodTimeInMs(StatisticsUtil.convertStrToInt(periodTimeInMs));
         String lastExecTimeInMs = resultRow.getColumnValue("last_exec_time_in_ms");
         analysisInfoBuilder.setLastExecTimeInMs(StatisticsUtil.convertStrToLong(lastExecTimeInMs));
+        String timeCostInMs = resultRow.getColumnValue("time_cost_in_ms");
+        analysisInfoBuilder.setTimeCostInMs(StatisticsUtil.convertStrToLong(timeCostInMs));
         String message = resultRow.getColumnValue("message");
         analysisInfoBuilder.setMessage(message);
         return analysisInfoBuilder.build();
@@ -47,6 +47,7 @@ public class AnalysisInfoBuilder {
     private int sampleRows;
     private long periodTimeInMs;
     private long lastExecTimeInMs;
+    private long timeCostInMs;
     private AnalysisState state;
     private ScheduleType scheduleType;
     private String message = "";
@@ -79,6 +80,7 @@ public class AnalysisInfoBuilder {
         maxBucketNum = info.maxBucketNum;
         message = info.message;
         lastExecTimeInMs = info.lastExecTimeInMs;
+        timeCostInMs = info.timeCostInMs;
         state = info.state;
         scheduleType = info.scheduleType;
         externalTableLevelTask = info.externalTableLevelTask;
@@ -181,6 +183,11 @@ public class AnalysisInfoBuilder {
         return this;
     }

+    public AnalysisInfoBuilder setTimeCostInMs(long timeCostInMs) {
+        this.timeCostInMs = timeCostInMs;
+        return this;
+    }
+
     public AnalysisInfoBuilder setState(AnalysisState state) {
         this.state = state;
         return this;
@@ -213,7 +220,7 @@ public class AnalysisInfoBuilder {
     public AnalysisInfo build() {
         return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, partitionNames,
                 colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
-                sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType,
+                sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
                 externalTableLevelTask, partitionOnly, samplingPartition, cronExpression);
     }

@@ -237,6 +244,7 @@ public class AnalysisInfoBuilder {
                 .setMaxBucketNum(maxBucketNum)
                 .setMessage(message)
                 .setLastExecTimeInMs(lastExecTimeInMs)
+                .setTimeCostInMs(timeCostInMs)
                 .setState(state)
                 .setScheduleType(scheduleType)
                 .setExternalTableLevelTask(externalTableLevelTask);
@@ -43,6 +43,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Daemon;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.AnalyzeDeletionLog;
@@ -184,10 +185,9 @@ public class AnalysisManager extends Daemon implements Writable {
         }
         TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
                 table.getName());
+        // columnNames null means to add all visitable columns.
         AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
-                table.getBaseSchema().stream().map(
-                        Column::getName).collect(
-                        Collectors.toList()), db.getId(), table);
+                null, db.getId(), table);
         try {
             analyzeTblStmt.check();
         } catch (AnalysisException analysisException) {
@@ -229,8 +229,9 @@ public class AnalysisManager extends Daemon implements Writable {
         Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
         createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
         createTaskForMVIdx(jobInfo, analysisTaskInfos, isSync);
-        createTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
-
+        if (stmt.isAllColumns()) {
+            createTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
+        }
         if (!isSync) {
             persistAnalysisJob(jobInfo);
             analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
@@ -513,12 +514,12 @@ public class AnalysisManager extends Daemon implements Writable {
                     long taskId = Env.getCurrentEnv().getNextId();
                     AnalysisInfoBuilder indexTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
                     AnalysisInfo analysisInfo = indexTaskInfoBuilder.setIndexId(indexId)
-                            .setTaskId(taskId).build();
+                            .setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
                     if (isSync) {
                         return;
                     }
                     analysisTasks.put(taskId, createTask(analysisInfo));
-                    logCreateAnalysisJob(analysisInfo);
+                    logCreateAnalysisTask(analysisInfo);
                 }
             } finally {
                 olapTable.readUnlock();
@@ -538,7 +539,7 @@ public class AnalysisManager extends Daemon implements Writable {
                 colTaskInfoBuilder.setColToPartitions(Collections.singletonMap(colName, entry.getValue()));
             }
             AnalysisInfo analysisInfo = colTaskInfoBuilder.setColName(colName).setIndexId(indexId)
-                    .setTaskId(taskId).build();
+                    .setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
             analysisTasks.put(taskId, createTask(analysisInfo));
             if (isSync) {
                 continue;
@@ -553,13 +554,15 @@ public class AnalysisManager extends Daemon implements Writable {
         }
     }

-    private void logCreateAnalysisTask(AnalysisInfo analysisInfo) {
-        analysisTaskInfoMap.put(analysisInfo.taskId, analysisInfo);
+    // Change to public for unit test.
+    public void logCreateAnalysisTask(AnalysisInfo analysisInfo) {
+        replayCreateAnalysisTask(analysisInfo);
         Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo);
     }

-    private void logCreateAnalysisJob(AnalysisInfo analysisJob) {
-        analysisJobInfoMap.put(analysisJob.jobId, analysisJob);
+    // Change to public for unit test.
+    public void logCreateAnalysisJob(AnalysisInfo analysisJob) {
+        replayCreateAnalysisJob(analysisJob);
         Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob);
     }

@@ -578,63 +581,71 @@ public class AnalysisManager extends Daemon implements Writable {
         }
         AnalysisInfoBuilder colTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
         long taskId = Env.getCurrentEnv().getNextId();
-        AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L)
-                .setTaskId(taskId).setExternalTableLevelTask(true).build();
+        AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L).setLastExecTimeInMs(System.currentTimeMillis())
+                .setTaskId(taskId).setColName("TableRowCount").setExternalTableLevelTask(true).build();
         analysisTasks.put(taskId, createTask(analysisInfo));
         if (isSync) {
+            // For sync job, don't need to persist, return here and execute it immediately.
             return;
         }
         try {
-            logCreateAnalysisJob(analysisInfo);
+            logCreateAnalysisTask(analysisInfo);
         } catch (Exception e) {
             throw new DdlException("Failed to create analysis task", e);
         }
     }

-    public void updateTaskStatus(AnalysisInfo info, AnalysisState jobState, String message, long time) {
+    public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
         if (analysisJobIdToTaskMap.get(info.jobId) == null) {
             return;
         }
-        info.state = jobState;
+        info.state = taskState;
         info.message = message;
-        info.lastExecTimeInMs = time;
-        logCreateAnalysisTask(info);
+        // Update the task cost time when task finished or failed. And only log the final state.
+        if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
+            info.timeCostInMs = time - info.lastExecTimeInMs;
+            info.lastExecTimeInMs = time;
+            logCreateAnalysisTask(info);
+        }

         AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
-        job.lastExecTimeInMs = time;
-        if (info.state.equals(AnalysisState.RUNNING) && !job.state.equals(AnalysisState.PENDING)) {
-            job.state = AnalysisState.RUNNING;
-            Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(job);
-        }
-        boolean allFinished = true;
-        boolean hasFailure = false;
-        for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
-            AnalysisInfo taskInfo = task.info;
-            if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
-                allFinished = false;
-                break;
-            }
-            if (taskInfo.state.equals(AnalysisState.FAILED)) {
-                hasFailure = true;
-            }
-        }
-        if (allFinished) {
-            if (hasFailure) {
-                job.state = AnalysisState.FAILED;
-                logCreateAnalysisJob(job);
-            } else {
-                job.state = AnalysisState.FINISHED;
-                if (job.jobType.equals(JobType.SYSTEM)) {
-                    try {
-                        updateTableStats(job);
-                    } catch (Throwable e) {
-                        LOG.warn("Failed to update Table statistics in job: {}", info.toString());
-                    }
-                }
-                logCreateAnalysisJob(job);
-            }
-            analysisJobIdToTaskMap.remove(job.jobId);
-        }
+        // Synchronize the job state change in job level.
+        synchronized (job) {
+            job.lastExecTimeInMs = time;
+            // Set the job state to RUNNING when its first task becomes RUNNING.
+            if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
+                job.state = AnalysisState.RUNNING;
+                replayCreateAnalysisJob(job);
+            }
+            boolean allFinished = true;
+            boolean hasFailure = false;
+            for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
+                AnalysisInfo taskInfo = task.info;
+                if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
+                    allFinished = false;
+                    break;
+                }
+                if (taskInfo.state.equals(AnalysisState.FAILED)) {
+                    hasFailure = true;
+                }
+            }
+            if (allFinished) {
+                if (hasFailure) {
+                    job.state = AnalysisState.FAILED;
+                    logCreateAnalysisJob(job);
+                } else {
+                    job.state = AnalysisState.FINISHED;
+                    if (job.jobType.equals(JobType.SYSTEM)) {
+                        try {
+                            updateTableStats(job);
+                        } catch (Throwable e) {
+                            LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
+                        }
+                    }
+                    logCreateAnalysisJob(job);
+                }
+                analysisJobIdToTaskMap.remove(job.jobId);
+            }
+        }
     }
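A note on the synchronized block above: tasks of one job finish on different executor threads, so the read-check-update of the shared job state must be atomic or a final transition can be lost. The following simplified, runnable sketch shows the same pattern; JobInfo and its state machine are stand-ins, not the actual Doris classes.

public class JobStateSketch {
    enum State { PENDING, RUNNING, FINISHED }

    static class JobInfo {
        volatile State state = State.PENDING;
    }

    // called concurrently by task threads; locking the job object makes the
    // promotion from PENDING to RUNNING happen exactly once
    static void onTaskStateChange(JobInfo job, State taskState) {
        synchronized (job) {
            if (taskState == State.RUNNING && job.state == State.PENDING) {
                job.state = State.RUNNING;
            }
            // a real implementation would also aggregate all task states here
            // and move the job to FINISHED or FAILED, as the patch does
        }
    }

    public static void main(String[] args) throws InterruptedException {
        JobInfo job = new JobInfo();
        Thread t1 = new Thread(() -> onTaskStateChange(job, State.RUNNING));
        Thread t2 = new Thread(() -> onTaskStateChange(job, State.RUNNING));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(job.state); // RUNNING
    }
}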
@@ -700,6 +711,28 @@ public class AnalysisManager extends Daemon implements Writable {
                 .collect(Collectors.toList());
     }

+    public String getJobProgress(long jobId) {
+        List<AnalysisInfo> tasks = findTasks(jobId);
+        int finished = 0;
+        int failed = 0;
+        int inProgress = 0;
+        int total = tasks.size();
+        for (AnalysisInfo info : tasks) {
+            switch (info.state) {
+                case FINISHED:
+                    finished++;
+                    break;
+                case FAILED:
+                    failed++;
+                    break;
+                default:
+                    inProgress++;
+                    break;
+            }
+        }
+        return String.format("%d Finished/%d Failed/%d In Progress/%d Total", finished, failed, inProgress, total);
+    }
+
     private void syncExecute(Collection<BaseAnalysisTask> tasks) {
         SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks);
         ConnectContext ctx = ConnectContext.get();
@@ -813,8 +846,11 @@ public class AnalysisManager extends Daemon implements Writable {

         public void execute() {
             List<String> colNames = new ArrayList<>();
+            List<String> errorMessages = new ArrayList<>();
             for (BaseAnalysisTask task : tasks) {
                 if (cancelled) {
+                    colNames.add(task.info.colName);
+                    errorMessages.add("Cancelled");
                     continue;
                 }
                 try {
@@ -822,12 +858,14 @@ public class AnalysisManager extends Daemon implements Writable {
                     updateSyncTaskStatus(task, AnalysisState.FINISHED);
                 } catch (Throwable t) {
                     colNames.add(task.info.colName);
+                    errorMessages.add(Util.getRootCauseMessage(t));
                     updateSyncTaskStatus(task, AnalysisState.FAILED);
                     LOG.warn("Failed to analyze, info: {}", task, t);
                 }
             }
             if (!colNames.isEmpty()) {
-                throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames));
+                throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames)
+                        + "] Reasons: " + String.join(",", errorMessages));
             }
         }
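With the errorMessages list above, a synchronous collection that fails or times out now reports the reason for every failed column instead of only the column names (item 7 in the commit message). A small sketch of the resulting message follows; the column names and reasons are made-up examples, while the concatenation mirrors the throw above.

import java.util.Arrays;
import java.util.List;

public class SyncErrorMessageSketch {
    public static void main(String[] args) {
        List<String> colNames = Arrays.asList("c_custkey", "c_name");
        List<String> errorMessages = Arrays.asList("Cancelled", "query timeout");
        // prints: Failed to analyze following columns:[c_custkey,c_name] Reasons: Cancelled,query timeout
        System.out.println("Failed to analyze following columns:[" + String.join(",", colNames)
                + "] Reasons: " + String.join(",", errorMessages));
    }
}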
@@ -920,4 +958,9 @@ public class AnalysisManager extends Daemon implements Writable {
             entry.getValue().write(out);
         }
     }
+
+    // For unit test use only.
+    public void addToJobIdTasksMap(long jobId, Map<Long, BaseAnalysisTask> tasks) {
+        analysisJobIdToTaskMap.put(jobId, tasks);
+    }
 }
@@ -17,7 +17,6 @@

 package org.apache.doris.statistics;

-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
@@ -101,9 +100,6 @@ public class AnalysisTaskExecutor extends Thread {
         BaseAnalysisTask task = taskScheduler.getPendingTasks();
         AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
         executors.submit(taskWrapper);
-        Env.getCurrentEnv().getAnalysisManager()
-                .updateTaskStatus(task.info,
-                        AnalysisState.RUNNING, "", System.currentTimeMillis());
     }

     public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
@@ -18,6 +18,7 @@
 package org.apache.doris.statistics;

 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.util.Util;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -62,16 +63,17 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
         } finally {
             if (!task.killed) {
                 if (except != null) {
-                    LOG.warn("Failed to execute task", except);
+                    LOG.warn("Analyze {} failed.", task.toString(), except);
                     Env.getCurrentEnv().getAnalysisManager()
                             .updateTaskStatus(task.info,
-                                    AnalysisState.FAILED, except.getMessage(), System.currentTimeMillis());
+                                    AnalysisState.FAILED, Util.getRootCauseMessage(except), System.currentTimeMillis());
                 } else {
+                    LOG.debug("Analyze {} finished, cost time:{}", task.toString(),
+                            System.currentTimeMillis() - startTime);
                     Env.getCurrentEnv().getAnalysisManager()
                             .updateTaskStatus(task.info,
                                     AnalysisState.FINISHED, "", System.currentTimeMillis());
                 }
-                LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
             }
         }
     }
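Items 4 and 8 of the commit message show up here: the failure path now logs which task failed and reports the root cause rather than the top-level exception message. The sketch below illustrates why that matters for nested exceptions; it walks the cause chain with plain Java and is not the Doris Util implementation.

public class RootCauseSketch {
    static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    public static void main(String[] args) {
        Exception leaf = new IllegalStateException("connection refused");
        Exception wrapped = new RuntimeException("Failed to execute internal SQL", leaf);
        System.out.println(wrapped.getMessage());            // Failed to execute internal SQL
        System.out.println(rootCause(wrapped).getMessage()); // connection refused
    }
}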
@@ -171,12 +171,20 @@ public abstract class BaseAnalysisTask {
                 break;
             } catch (Throwable t) {
+                LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
+                if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
                     throw new RuntimeException(t);
+                }
             }
         }
     }

     public abstract void doExecute() throws Exception;

+    protected void setTaskStateToRunning() {
+        Env.getCurrentEnv().getAnalysisManager()
+                .updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
+    }
+
     public void cancel() {
         killed = true;
         if (stmtExecutor != null) {
@@ -184,7 +192,7 @@ public abstract class BaseAnalysisTask {
         }
         Env.getCurrentEnv().getAnalysisManager()
                 .updateTaskStatus(info, AnalysisState.FAILED,
-                        String.format("Job has been cancelled: %s", info.toString()), -1);
+                        String.format("Job has been cancelled: %s", info.message), System.currentTimeMillis());
     }

     public long getLastExecTime() {
@@ -218,4 +226,11 @@ public abstract class BaseAnalysisTask {
         return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows);
     }
+
+    @Override
+    public String toString() {
+        return String.format("Job id [%d], Task id [%d], catalog [%s], db [%s], table [%s], column [%s]",
+                info.jobId, info.taskId, catalog.getName(), db.getFullName(), tbl.getName(),
+                col == null ? "TableRowCount" : col.getName());
+    }
 }
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.util.InternalQueryResult;
 import org.apache.doris.statistics.util.StatisticsUtil;
@@ -105,6 +106,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
     }

     public void doExecute() throws Exception {
+        setTaskStateToRunning();
         if (isTableLevelTask) {
             getTableStats();
         } else {
@@ -190,11 +192,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
                 params.put("dataSizeFunction", getDataSizeFunction(col));
                 StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
                 String sql = stringSubstitutor.replace(sb.toString());
-                try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-                    r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-                    this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-                    this.stmtExecutor.execute();
-                }
+                executeInsertSql(sql);
             }
         } else {
             StringBuilder sb = new StringBuilder();
@@ -233,12 +231,27 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         params.put("dataSizeFunction", getDataSizeFunction(col));
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            this.stmtExecutor.execute();
-            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
-        }
+        executeInsertSql(sql);
+        Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(
+                catalog.getId(), db.getId(), tbl.getId(), -1, col.getName());
     }

+    private void executeInsertSql(String sql) throws Exception {
+        long startTime = System.currentTimeMillis();
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            r.connectContext.setExecutor(stmtExecutor);
+            this.stmtExecutor.execute();
+            QueryState queryState = r.connectContext.getState();
+            if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
+                LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
+                        info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+                throw new RuntimeException(queryState.getErrorMessage());
+            }
+            LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
+                    info.catalogName, info.dbName, info.colName, sql, (System.currentTimeMillis() - startTime)));
+        }
+    }
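This is the NPE fix from item 6: executeInsertSql surfaces the SQL error before the caller reaches the cache refresh, so a failed collection no longer refreshes the statistics cache. A simplified, runnable sketch of that ordering follows; the types are stand-ins, not the actual Doris classes.

public class FailFastSketch {
    enum StateType { OK, ERR }

    static StateType runInsert(String sql) {
        return StateType.ERR; // pretend the collection SQL failed
    }

    static void analyzeColumn() {
        StateType state = runInsert("INSERT INTO column_statistics ...");
        if (state == StateType.ERR) {
            // fail fast: previously the refresh ran regardless of the result
            throw new RuntimeException("collection sql failed");
        }
        System.out.println("cache refreshed"); // only reached on success
    }

    public static void main(String[] args) {
        try {
            analyzeColumn();
        } catch (RuntimeException e) {
            System.out.println("failed before cache refresh: " + e.getMessage());
        }
    }
}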
@@ -59,6 +59,7 @@ public class HistogramTask extends BaseAnalysisTask {

     @Override
     public void doExecute() throws Exception {
+        setTaskStateToRunning();
         Map<String, String> params = new HashMap<>();
         params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
         params.put("histogramStatTbl", StatisticConstants.HISTOGRAM_TBL_NAME);
@@ -87,6 +87,7 @@ public class MVAnalysisTask extends BaseAnalysisTask {

     @Override
     public void doExecute() throws Exception {
+        setTaskStateToRunning();
         for (Column column : meta.getSchema()) {
             SelectStmt selectOne = (SelectStmt) selectStmt.clone();
             TableRef tableRef = selectOne.getTableRefs().get(0);
@@ -60,6 +60,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
     }

     public void doExecute() throws Exception {
+        setTaskStateToRunning();
         Map<String, String> params = new HashMap<>();
         params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
         params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
@@ -169,7 +169,11 @@ public class StatisticsCache {
     }

     public void refreshColStatsSync(long tblId, long idxId, String colName) {
-        columnStatisticsCache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
+        columnStatisticsCache.synchronous().refresh(new StatisticsCacheKey(-1, -1, tblId, idxId, colName));
+    }
+
+    public void refreshColStatsSync(long catalogId, long dbId, long tblId, long idxId, String colName) {
+        columnStatisticsCache.synchronous().refresh(new StatisticsCacheKey(catalogId, dbId, tblId, idxId, colName));
     }

     public void refreshHistogramSync(long tblId, long idxId, String colName) {
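The extra catalogId and dbId parameters exist because, with multiple catalogs, a table id alone no longer identifies a table. The sketch below makes the point with a simplified stand-in for StatisticsCacheKey; the field names follow the patch and the id values are hypothetical.

import java.util.Objects;

public class CacheKeySketch {
    static final class Key {
        final long catalogId;
        final long dbId;
        final long tableId;
        final long idxId;
        final String colName;

        Key(long catalogId, long dbId, long tableId, long idxId, String colName) {
            this.catalogId = catalogId;
            this.dbId = dbId;
            this.tableId = tableId;
            this.idxId = idxId;
            this.colName = colName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return catalogId == k.catalogId && dbId == k.dbId && tableId == k.tableId
                    && idxId == k.idxId && colName.equals(k.colName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(catalogId, dbId, tableId, idxId, colName);
        }
    }

    public static void main(String[] args) {
        // the same table id in two different catalogs must map to different keys
        Key internal = new Key(-1, -1, 42, -1, "c_name");
        Key external = new Key(7, 8, 42, -1, "c_name");
        System.out.println(internal.equals(external)); // false
    }
}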
@@ -174,6 +174,8 @@ public class StatisticsUtil {
         sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.setEnableNereidsPlanner(false);
         sessionVariable.enableProfile = false;
+        sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
+        sessionVariable.insertTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
         connectContext.setEnv(Env.getCurrentEnv());
         connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
         connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AnalysisManagerTest {
+    @Test
+    public void testUpdateTaskStatus(@Mocked BaseAnalysisTask task1,
+            @Mocked BaseAnalysisTask task2) {
+
+        new MockUp<AnalysisManager>() {
+            @Mock
+            public void logCreateAnalysisTask(AnalysisInfo job) {
+            }
+
+            @Mock
+            public void logCreateAnalysisJob(AnalysisInfo job) {
+            }
+
+        };
+
+        AnalysisInfo job = new AnalysisInfoBuilder().setJobId(1)
+                .setState(AnalysisState.PENDING).setJobType(AnalysisInfo.JobType.MANUAL).build();
+        AnalysisInfo taskInfo1 = new AnalysisInfoBuilder().setJobId(1)
+                .setTaskId(2).setState(AnalysisState.PENDING).build();
+        AnalysisInfo taskInfo2 = new AnalysisInfoBuilder().setJobId(1)
+                .setTaskId(3).setState(AnalysisState.PENDING).build();
+        AnalysisManager manager = new AnalysisManager();
+        manager.replayCreateAnalysisJob(job);
+        manager.replayCreateAnalysisTask(taskInfo1);
+        manager.replayCreateAnalysisTask(taskInfo2);
+        Map<Long, BaseAnalysisTask> tasks = new HashMap<>();
+
+        task1.info = taskInfo1;
+        task2.info = taskInfo2;
+        tasks.put(2L, task1);
+        tasks.put(3L, task2);
+        manager.addToJobIdTasksMap(1, tasks);
+
+        Assertions.assertEquals(job.state, AnalysisState.PENDING);
+        manager.updateTaskStatus(taskInfo1, AnalysisState.RUNNING, "", 0);
+        Assertions.assertEquals(job.state, AnalysisState.RUNNING);
+        manager.updateTaskStatus(taskInfo2, AnalysisState.RUNNING, "", 0);
+        Assertions.assertEquals(job.state, AnalysisState.RUNNING);
+        manager.updateTaskStatus(taskInfo1, AnalysisState.FINISHED, "", 0);
+        Assertions.assertEquals(job.state, AnalysisState.RUNNING);
+        manager.updateTaskStatus(taskInfo2, AnalysisState.FINISHED, "", 0);
+        Assertions.assertEquals(job.state, AnalysisState.FINISHED);
+    }
+}