diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java index bfe496dbdd..224ef91883 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java @@ -20,6 +20,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -110,141 +111,131 @@ public class StatisticsJob { } public long getId() { - return this.id; + return id; } public long getDbId() { - return this.dbId; + return dbId; } public Set getTblIds() { - return this.tblIds; + return tblIds; } public Map> getTableIdToColumnName() { - return this.tableIdToColumnName; + return tableIdToColumnName; } public Map getProperties() { - return this.properties; + return properties; } public List getTasks() { - return this.tasks; + return tasks; } public List getErrorMsgs() { - return this.errorMsgs; + return errorMsgs; } public JobState getJobState() { - return this.jobState; + return jobState; } public long getCreateTime() { - return this.createTime; + return createTime; } public long getStartTime() { - return this.startTime; + return startTime; } public long getFinishTime() { - return this.finishTime; + return finishTime; } public int getProgress() { - return this.progress; + return progress; } - public void updateJobState(JobState newState) throws IllegalStateException { + public void updateJobState(JobState newState) throws DdlException { LOG.info("To change statistics job(id={}) state from {} to {}", id, jobState, newState); writeLock(); - + JobState fromState = jobState; try { - // PENDING -> SCHEDULING/FAILED/CANCELLED - if (jobState == JobState.PENDING) { - if (newState == JobState.SCHEDULING) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.FAILED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.CANCELLED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else { - LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from PENDING to " + newState); - } - return; - } - - // SCHEDULING -> RUNNING/FAILED/CANCELLED - if (jobState == JobState.SCHEDULING) { - if (newState == JobState.RUNNING) { - this.jobState = newState; - // job start running, set start time - this.startTime = System.currentTimeMillis(); - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.FAILED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.CANCELLED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else { - LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState); - } - return; - } - - // RUNNING -> FINISHED/FAILED/CANCELLED - if (jobState == JobState.RUNNING) { - if (newState == JobState.FINISHED) { - // set finish time - this.finishTime = System.currentTimeMillis(); - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.FAILED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.CANCELLED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else { - LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState); - } - return; - } - - // unsupported transition - LOG.info("Invalid job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState); + unprotectedUpdateJobState(newState); + } catch (DdlException e) { + LOG.warn(e.getMessage(), e); + throw e; } finally { writeUnlock(); - LOG.info("Statistics job(id={}) current state is {} ", id, jobState); } + LOG.info("Statistics job(id={}) state changed from {} to {}", id, fromState, jobState); } - public void updateJobInfoByTaskId(Long taskId, String errorMsg) { - writeLock(); + private void unprotectedUpdateJobState(JobState newState) throws DdlException { + // PENDING -> PENDING/SCHEDULING/FAILED/CANCELLED + if (jobState == JobState.PENDING) { + switch (newState) { + case PENDING: + case SCHEDULING: + break; + case FAILED: + case CANCELLED: + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + } + // SCHEDULING -> RUNNING/FAILED/CANCELLED + else if (jobState == JobState.SCHEDULING) { + switch (newState) { + case RUNNING: + startTime = System.currentTimeMillis(); + break; + case FAILED: + case CANCELLED: + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + } + // RUNNING -> FINISHED/FAILED/CANCELLED + else if (jobState == JobState.RUNNING) { + switch (newState) { + case FINISHED: + case FAILED: + case CANCELLED: + // set finish time + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + } else { + // TODO + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + jobState = newState; + } + public void updateJobInfoByTaskId(Long taskId, String errorMsg) throws DdlException { + writeLock(); try { - for (StatisticsTask task : this.tasks) { + for (StatisticsTask task : tasks) { if (taskId == task.getId()) { if (Strings.isNullOrEmpty(errorMsg)) { - this.progress += 1; - if (this.progress == this.tasks.size()) { - updateJobState(StatisticsJob.JobState.FINISHED); + progress += 1; + if (progress == tasks.size()) { + unprotectedUpdateJobState(StatisticsJob.JobState.FINISHED); } task.updateTaskState(StatisticsTask.TaskState.FINISHED); } else { - this.errorMsgs.add(errorMsg); + errorMsgs.add(errorMsg); task.updateTaskState(StatisticsTask.TaskState.FAILED); - updateJobState(StatisticsJob.JobState.FAILED); + unprotectedUpdateJobState(StatisticsJob.JobState.FAILED); } return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java index b62fea4381..d09da09de8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java @@ -86,7 +86,12 @@ public class StatisticsJobScheduler extends MasterDaemon { LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId()); } catch (DdlException e) { pendingJobQueue.remove(); - pendingJob.updateJobState(StatisticsJob.JobState.FAILED); + try { + // TODO change to without exception + pendingJob.updateJobState(StatisticsJob.JobState.FAILED); + } catch (DdlException ddlException) { + LOG.fatal(ddlException.getMessage(), e); + } LOG.info("Failed to schedule the statistical job(id={})", pendingJob.getId(), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java index a98df1753d..fa236584c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.DdlException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -135,10 +136,9 @@ public abstract class StatisticsTask implements Callable { @Override public abstract StatisticsTaskResult call() throws Exception; - public void updateTaskState(TaskState newState) throws IllegalStateException{ + // please retain job lock firstly + public void updateTaskState(TaskState newState) throws DdlException { LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState); - writeLock(); - try { // PENDING -> RUNNING/FAILED if (taskState == TaskState.PENDING) { @@ -152,7 +152,7 @@ public abstract class StatisticsTask implements Callable { LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); } else { LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new IllegalStateException("Invalid task state transition from PENDING to " + newState); + throw new DdlException("Invalid task state transition from PENDING to " + newState); } return; } @@ -169,14 +169,13 @@ public abstract class StatisticsTask implements Callable { LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); } else { LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new IllegalStateException("Invalid task state transition from RUNNING to " + newState); + throw new DdlException("Invalid task state transition from RUNNING to " + newState); } } LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new IllegalStateException("Invalid task state transition from " + taskState + " to " + newState); + throw new DdlException("Invalid task state transition from " + taskState + " to " + newState); } finally { - writeUnlock(); LOG.info("Statistics task(id={}) current state is {}", id, taskState); } }