From c132abd2bdbbd6e86b61eedc84473a891fb69880 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 29 Apr 2022 10:46:24 +0800 Subject: [PATCH] (Refactor)[Statistics] Fix lock risks in Statistics Job (#9256) * (Refactor)[Statistics] Fix lock risks in Statistics Job 1. Remove lock nesting between job and task 2. Solve the deadlock problem during job update 3. Avoid printing the log while holding the lock * Add log --- .../doris/statistics/StatisticsJob.java | 159 +++++++++--------- .../statistics/StatisticsJobScheduler.java | 7 +- .../doris/statistics/StatisticsTask.java | 13 +- 3 files changed, 87 insertions(+), 92 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java index bfe496dbdd..224ef91883 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java @@ -20,6 +20,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -110,141 +111,131 @@ public class StatisticsJob { } public long getId() { - return this.id; + return id; } public long getDbId() { - return this.dbId; + return dbId; } public Set getTblIds() { - return this.tblIds; + return tblIds; } public Map> getTableIdToColumnName() { - return this.tableIdToColumnName; + return tableIdToColumnName; } public Map getProperties() { - return this.properties; + return properties; } public List getTasks() { - return this.tasks; + return tasks; } public List getErrorMsgs() { - return this.errorMsgs; + return errorMsgs; } public JobState getJobState() { - return this.jobState; + return jobState; } public long getCreateTime() { - return this.createTime; + return createTime; } public long getStartTime() { - return this.startTime; + return startTime; } public long getFinishTime() { - return this.finishTime; + return finishTime; } public int getProgress() { - return this.progress; + return progress; } - public void updateJobState(JobState newState) throws IllegalStateException { + public void updateJobState(JobState newState) throws DdlException { LOG.info("To change statistics job(id={}) state from {} to {}", id, jobState, newState); writeLock(); - + JobState fromState = jobState; try { - // PENDING -> SCHEDULING/FAILED/CANCELLED - if (jobState == JobState.PENDING) { - if (newState == JobState.SCHEDULING) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.FAILED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.CANCELLED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else { - LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from PENDING to " + newState); - } - return; - } - - // SCHEDULING -> RUNNING/FAILED/CANCELLED - if (jobState == JobState.SCHEDULING) { - if (newState == JobState.RUNNING) { - this.jobState = newState; - // job start running, set start time - this.startTime = System.currentTimeMillis(); - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.FAILED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.CANCELLED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else { - LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState); - } - return; - } - - // RUNNING -> FINISHED/FAILED/CANCELLED - if (jobState == JobState.RUNNING) { - if (newState == JobState.FINISHED) { - // set finish time - this.finishTime = System.currentTimeMillis(); - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.FAILED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else if (newState == JobState.CANCELLED) { - this.jobState = newState; - LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState); - } else { - LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState); - } - return; - } - - // unsupported transition - LOG.info("Invalid job(id={}) state transition from {} to {}", id, jobState, newState); - throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState); + unprotectedUpdateJobState(newState); + } catch (DdlException e) { + LOG.warn(e.getMessage(), e); + throw e; } finally { writeUnlock(); - LOG.info("Statistics job(id={}) current state is {} ", id, jobState); } + LOG.info("Statistics job(id={}) state changed from {} to {}", id, fromState, jobState); } - public void updateJobInfoByTaskId(Long taskId, String errorMsg) { - writeLock(); + private void unprotectedUpdateJobState(JobState newState) throws DdlException { + // PENDING -> PENDING/SCHEDULING/FAILED/CANCELLED + if (jobState == JobState.PENDING) { + switch (newState) { + case PENDING: + case SCHEDULING: + break; + case FAILED: + case CANCELLED: + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + } + // SCHEDULING -> RUNNING/FAILED/CANCELLED + else if (jobState == JobState.SCHEDULING) { + switch (newState) { + case RUNNING: + startTime = System.currentTimeMillis(); + break; + case FAILED: + case CANCELLED: + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + } + // RUNNING -> FINISHED/FAILED/CANCELLED + else if (jobState == JobState.RUNNING) { + switch (newState) { + case FINISHED: + case FAILED: + case CANCELLED: + // set finish time + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + } else { + // TODO + throw new DdlException("Invalid job state transition from " + jobState + " to " + newState); + } + jobState = newState; + } + public void updateJobInfoByTaskId(Long taskId, String errorMsg) throws DdlException { + writeLock(); try { - for (StatisticsTask task : this.tasks) { + for (StatisticsTask task : tasks) { if (taskId == task.getId()) { if (Strings.isNullOrEmpty(errorMsg)) { - this.progress += 1; - if (this.progress == this.tasks.size()) { - updateJobState(StatisticsJob.JobState.FINISHED); + progress += 1; + if (progress == tasks.size()) { + unprotectedUpdateJobState(StatisticsJob.JobState.FINISHED); } task.updateTaskState(StatisticsTask.TaskState.FINISHED); } else { - this.errorMsgs.add(errorMsg); + errorMsgs.add(errorMsg); task.updateTaskState(StatisticsTask.TaskState.FAILED); - updateJobState(StatisticsJob.JobState.FAILED); + unprotectedUpdateJobState(StatisticsJob.JobState.FAILED); } return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java index b62fea4381..d09da09de8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java @@ -86,7 +86,12 @@ public class StatisticsJobScheduler extends MasterDaemon { LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId()); } catch (DdlException e) { pendingJobQueue.remove(); - pendingJob.updateJobState(StatisticsJob.JobState.FAILED); + try { + // TODO change to without exception + pendingJob.updateJobState(StatisticsJob.JobState.FAILED); + } catch (DdlException ddlException) { + LOG.fatal(ddlException.getMessage(), e); + } LOG.info("Failed to schedule the statistical job(id={})", pendingJob.getId(), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java index a98df1753d..fa236584c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.DdlException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -135,10 +136,9 @@ public abstract class StatisticsTask implements Callable { @Override public abstract StatisticsTaskResult call() throws Exception; - public void updateTaskState(TaskState newState) throws IllegalStateException{ + // please retain job lock firstly + public void updateTaskState(TaskState newState) throws DdlException { LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState); - writeLock(); - try { // PENDING -> RUNNING/FAILED if (taskState == TaskState.PENDING) { @@ -152,7 +152,7 @@ public abstract class StatisticsTask implements Callable { LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); } else { LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new IllegalStateException("Invalid task state transition from PENDING to " + newState); + throw new DdlException("Invalid task state transition from PENDING to " + newState); } return; } @@ -169,14 +169,13 @@ public abstract class StatisticsTask implements Callable { LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); } else { LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new IllegalStateException("Invalid task state transition from RUNNING to " + newState); + throw new DdlException("Invalid task state transition from RUNNING to " + newState); } } LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new IllegalStateException("Invalid task state transition from " + taskState + " to " + newState); + throw new DdlException("Invalid task state transition from " + taskState + " to " + newState); } finally { - writeUnlock(); LOG.info("Statistics task(id={}) current state is {}", id, taskState); } }