[Improve](Job) Create task adds concurrency control (#29144)

This commit is contained in:
Calvin Kirs
2023-12-28 10:24:39 +08:00
committed by GitHub
parent 8b225c6c3c
commit 1284975b9b
2 changed files with 21 additions and 9 deletions

View File

@ -50,6 +50,8 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Data
@ -105,7 +107,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
createUser, jobConfig, System.currentTimeMillis(), null);
}
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
@ -114,8 +116,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
String executeSql) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
@ -125,11 +126,12 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
}
private List<T> runningTasks = new ArrayList<>();
private Lock createTaskLock = new ReentrantLock();
@Override
public void cancelAllTasks() throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
@ -201,13 +203,22 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
return createTasks(taskType, taskContext);
try {
//it's better to use tryLock and add timeout limit
createTaskLock.lock();
if (!isReadyForScheduling(taskContext)) {
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
List<T> tasks = createTasks(taskType, taskContext);
tasks.forEach(task -> log.info("common create task, job id is {}, task id is {}", jobId, task.getTaskId()));
return tasks;
} finally {
createTaskLock.unlock();
}
}
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
runningTasks = new ArrayList<>();
}
tasks.forEach(task -> {
task.setTaskType(taskType);
task.setJobId(getJobId());
@ -260,6 +271,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.runningTasks = new ArrayList<>();
job.createTaskLock = new ReentrantLock();
return job;
}

View File

@ -217,7 +217,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
Long createTimeMs,
String executeSql) {
super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql, null);
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
}