(Refactor)[Statistics] Fix lock risks in Statistics Job (#9256)

* (Refactor)[Statistics] Fix lock risks in Statistics Job

1. Remove lock nesting between job and task
2. Solve the deadlock problem during job update
3. Avoid printing the log while holding the lock

* Add log
This commit is contained in:
EmmyMiao87
2022-04-29 10:46:24 +08:00
committed by GitHub
parent 2fa19113ab
commit c132abd2bd
3 changed files with 87 additions and 92 deletions

View File

@ -20,6 +20,7 @@ package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -110,141 +111,131 @@ public class StatisticsJob {
}
public long getId() {
return this.id;
return id;
}
public long getDbId() {
return this.dbId;
return dbId;
}
public Set<Long> getTblIds() {
return this.tblIds;
return tblIds;
}
public Map<Long, List<String>> getTableIdToColumnName() {
return this.tableIdToColumnName;
return tableIdToColumnName;
}
public Map<String, String> getProperties() {
return this.properties;
return properties;
}
public List<StatisticsTask> getTasks() {
return this.tasks;
return tasks;
}
public List<String> getErrorMsgs() {
return this.errorMsgs;
return errorMsgs;
}
public JobState getJobState() {
return this.jobState;
return jobState;
}
public long getCreateTime() {
return this.createTime;
return createTime;
}
public long getStartTime() {
return this.startTime;
return startTime;
}
public long getFinishTime() {
return this.finishTime;
return finishTime;
}
public int getProgress() {
return this.progress;
return progress;
}
public void updateJobState(JobState newState) throws IllegalStateException {
public void updateJobState(JobState newState) throws DdlException {
LOG.info("To change statistics job(id={}) state from {} to {}", id, jobState, newState);
writeLock();
JobState fromState = jobState;
try {
// PENDING -> SCHEDULING/FAILED/CANCELLED
if (jobState == JobState.PENDING) {
if (newState == JobState.SCHEDULING) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else if (newState == JobState.FAILED) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else if (newState == JobState.CANCELLED) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else {
LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
throw new IllegalStateException("Invalid job state transition from PENDING to " + newState);
}
return;
}
// SCHEDULING -> RUNNING/FAILED/CANCELLED
if (jobState == JobState.SCHEDULING) {
if (newState == JobState.RUNNING) {
this.jobState = newState;
// job start running, set start time
this.startTime = System.currentTimeMillis();
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else if (newState == JobState.FAILED) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else if (newState == JobState.CANCELLED) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else {
LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState);
}
return;
}
// RUNNING -> FINISHED/FAILED/CANCELLED
if (jobState == JobState.RUNNING) {
if (newState == JobState.FINISHED) {
// set finish time
this.finishTime = System.currentTimeMillis();
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else if (newState == JobState.FAILED) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else if (newState == JobState.CANCELLED) {
this.jobState = newState;
LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
} else {
LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState);
}
return;
}
// unsupported transition
LOG.info("Invalid job(id={}) state transition from {} to {}", id, jobState, newState);
throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState);
unprotectedUpdateJobState(newState);
} catch (DdlException e) {
LOG.warn(e.getMessage(), e);
throw e;
} finally {
writeUnlock();
LOG.info("Statistics job(id={}) current state is {} ", id, jobState);
}
LOG.info("Statistics job(id={}) state changed from {} to {}", id, fromState, jobState);
}
public void updateJobInfoByTaskId(Long taskId, String errorMsg) {
writeLock();
private void unprotectedUpdateJobState(JobState newState) throws DdlException {
// PENDING -> PENDING/SCHEDULING/FAILED/CANCELLED
if (jobState == JobState.PENDING) {
switch (newState) {
case PENDING:
case SCHEDULING:
break;
case FAILED:
case CANCELLED:
finishTime = System.currentTimeMillis();
break;
default:
throw new DdlException("Invalid job state transition from " + jobState + " to " + newState);
}
}
// SCHEDULING -> RUNNING/FAILED/CANCELLED
else if (jobState == JobState.SCHEDULING) {
switch (newState) {
case RUNNING:
startTime = System.currentTimeMillis();
break;
case FAILED:
case CANCELLED:
finishTime = System.currentTimeMillis();
break;
default:
throw new DdlException("Invalid job state transition from " + jobState + " to " + newState);
}
}
// RUNNING -> FINISHED/FAILED/CANCELLED
else if (jobState == JobState.RUNNING) {
switch (newState) {
case FINISHED:
case FAILED:
case CANCELLED:
// set finish time
finishTime = System.currentTimeMillis();
break;
default:
throw new DdlException("Invalid job state transition from " + jobState + " to " + newState);
}
} else {
// TODO
throw new DdlException("Invalid job state transition from " + jobState + " to " + newState);
}
jobState = newState;
}
public void updateJobInfoByTaskId(Long taskId, String errorMsg) throws DdlException {
writeLock();
try {
for (StatisticsTask task : this.tasks) {
for (StatisticsTask task : tasks) {
if (taskId == task.getId()) {
if (Strings.isNullOrEmpty(errorMsg)) {
this.progress += 1;
if (this.progress == this.tasks.size()) {
updateJobState(StatisticsJob.JobState.FINISHED);
progress += 1;
if (progress == tasks.size()) {
unprotectedUpdateJobState(StatisticsJob.JobState.FINISHED);
}
task.updateTaskState(StatisticsTask.TaskState.FINISHED);
} else {
this.errorMsgs.add(errorMsg);
errorMsgs.add(errorMsg);
task.updateTaskState(StatisticsTask.TaskState.FAILED);
updateJobState(StatisticsJob.JobState.FAILED);
unprotectedUpdateJobState(StatisticsJob.JobState.FAILED);
}
return;
}

View File

@ -86,7 +86,12 @@ public class StatisticsJobScheduler extends MasterDaemon {
LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId());
} catch (DdlException e) {
pendingJobQueue.remove();
pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
try {
// TODO change to without exception
pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
} catch (DdlException ddlException) {
LOG.fatal(ddlException.getMessage(), e);
}
LOG.info("Failed to schedule the statistical job(id={})", pendingJob.getId(), e);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.DdlException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -135,10 +136,9 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
@Override
public abstract StatisticsTaskResult call() throws Exception;
public void updateTaskState(TaskState newState) throws IllegalStateException{
// please retain job lock firstly
public void updateTaskState(TaskState newState) throws DdlException {
LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState);
writeLock();
try {
// PENDING -> RUNNING/FAILED
if (taskState == TaskState.PENDING) {
@ -152,7 +152,7 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
} else {
LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
throw new IllegalStateException("Invalid task state transition from PENDING to " + newState);
throw new DdlException("Invalid task state transition from PENDING to " + newState);
}
return;
}
@ -169,14 +169,13 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
} else {
LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
throw new IllegalStateException("Invalid task state transition from RUNNING to " + newState);
throw new DdlException("Invalid task state transition from RUNNING to " + newState);
}
}
LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
throw new IllegalStateException("Invalid task state transition from " + taskState + " to " + newState);
throw new DdlException("Invalid task state transition from " + taskState + " to " + newState);
} finally {
writeUnlock();
LOG.info("Statistics task(id={}) current state is {}", id, taskState);
}
}