[branch-2.1][Feat](Job)After a job is paused, it can be manually triggered to execute. (#40180)
… pick (#39565)
This commit is contained in:
@ -212,8 +212,9 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
}
|
||||
|
||||
public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
|
||||
if (!getJobStatus().equals(JobStatus.RUNNING)) {
|
||||
log.warn("job is not running, job id is {}", jobId);
|
||||
if (!canCreateTask(taskType)) {
|
||||
log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId,
|
||||
jobStatus, taskType);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
if (!isReadyForScheduling(taskContext)) {
|
||||
@ -235,6 +236,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
}
|
||||
}
|
||||
|
||||
private boolean canCreateTask(TaskType taskType) {
|
||||
JobStatus currentJobStatus = getJobStatus();
|
||||
|
||||
switch (taskType) {
|
||||
case SCHEDULED:
|
||||
return currentJobStatus.equals(JobStatus.RUNNING);
|
||||
case MANUAL:
|
||||
return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported TaskType: " + taskType);
|
||||
}
|
||||
}
|
||||
|
||||
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
|
||||
tasks.forEach(task -> {
|
||||
task.setTaskType(taskType);
|
||||
@ -307,7 +321,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
@Override
|
||||
public void onTaskFail(T task) throws JobException {
|
||||
failedTaskCount.incrementAndGet();
|
||||
updateJobStatusIfEnd(false);
|
||||
updateJobStatusIfEnd(false, task.getTaskType());
|
||||
runningTasks.remove(task);
|
||||
logUpdateOperation();
|
||||
}
|
||||
@ -315,16 +329,16 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
@Override
|
||||
public void onTaskSuccess(T task) throws JobException {
|
||||
succeedTaskCount.incrementAndGet();
|
||||
updateJobStatusIfEnd(true);
|
||||
updateJobStatusIfEnd(true, task.getTaskType());
|
||||
runningTasks.remove(task);
|
||||
logUpdateOperation();
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException {
|
||||
private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType) throws JobException {
|
||||
JobExecuteType executeType = getJobConfig().getExecuteType();
|
||||
if (executeType.equals(JobExecuteType.MANUAL)) {
|
||||
if (executeType.equals(JobExecuteType.MANUAL) || taskType.equals(TaskType.MANUAL)) {
|
||||
return;
|
||||
}
|
||||
switch (executeType) {
|
||||
|
||||
@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask {
|
||||
new Column("Status", ScalarType.createStringType()),
|
||||
new Column("ErrorMsg", ScalarType.createStringType()),
|
||||
new Column("CreateTime", ScalarType.createStringType()),
|
||||
new Column("StartTime", ScalarType.createStringType()),
|
||||
new Column("FinishTime", ScalarType.createStringType()),
|
||||
new Column("TrackingUrl", ScalarType.createStringType()),
|
||||
new Column("LoadStatistic", ScalarType.createStringType()),
|
||||
@ -247,6 +248,8 @@ public class InsertTask extends AbstractTask {
|
||||
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
|
||||
// create time
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
|
||||
: TimeUtils.longToTimeString(getStartTimeMs())));
|
||||
// load end time
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
|
||||
// tracking url
|
||||
@ -274,7 +277,10 @@ public class InsertTask extends AbstractTask {
|
||||
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
|
||||
: TimeUtils.longToTimeString(getStartTimeMs())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(null == getFinishTimeMs() ? ""
|
||||
: TimeUtils.longToTimeString(getFinishTimeMs())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
|
||||
|
||||
@ -188,10 +188,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
|
||||
clearEndJob(job);
|
||||
continue;
|
||||
}
|
||||
if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
|
||||
continue;
|
||||
if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) {
|
||||
cycleTimerJobScheduler(job, lastTimeWindowMs);
|
||||
}
|
||||
cycleTimerJobScheduler(job, lastTimeWindowMs);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user