From 2a12251b8331a2ce6e189b5bbd6e385b87d71668 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sat, 31 Aug 2024 19:24:53 +0800 Subject: [PATCH] [branch-2.1][Feat](Job)After a job is paused, it can be manually triggered to execute. (#40180) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … pick (#39565) --- .../apache/doris/job/base/AbstractJob.java | 26 ++++++++++++++----- .../job/extensions/insert/InsertTask.java | 8 +++++- .../doris/job/scheduler/JobScheduler.java | 5 ++-- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 3f595d6daf..94a0b0146c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -212,8 +212,9 @@ public abstract class AbstractJob implements Job commonCreateTasks(TaskType taskType, C taskContext) { - if (!getJobStatus().equals(JobStatus.RUNNING)) { - log.warn("job is not running, job id is {}", jobId); + if (!canCreateTask(taskType)) { + log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId, + jobStatus, taskType); return new ArrayList<>(); } if (!isReadyForScheduling(taskContext)) { @@ -235,6 +236,19 @@ public abstract class AbstractJob implements Job tasks, TaskType taskType) { tasks.forEach(task -> { task.setTaskType(taskType); @@ -307,7 +321,7 @@ public abstract class AbstractJob implements Job implements Job, C> implements Closeable { clearEndJob(job); continue; } - if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { - continue; + if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) { + cycleTimerJobScheduler(job, lastTimeWindowMs); } - cycleTimerJobScheduler(job, lastTimeWindowMs); } }