diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md index 60c1dfd1cd..ec11fd3168 100644 --- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md @@ -71,7 +71,6 @@ Result description: * PAUSED: Paused * STOPPED: end (manually triggered by the user) * FINISHED: Finished - * WAITING_FINISH: pending completion ### Example diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md index 009e2cb4e4..1b5da59908 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md @@ -71,7 +71,6 @@ SHOW JOBS 用于展示当前 DB 下所有作业的运行状态,SHOW JOB FOR jo * PAUSED:暂停 * STOPPED:结束(用户手动触发) * FINISHED: 完成 - * WAITING_FINISH: 待结束 ### Example diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java index ae204ef948..f01686a521 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java @@ -37,8 +37,6 @@ public enum JobStatus { */ STOPPED, - WAITING_FINISH, - /** * When the task is finished, the finished state will be triggered. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java deleted file mode 100644 index 3428c1724f..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.scheduler.constants; - -import lombok.Getter; - -/** - * System scheduler event job - * They will start when scheduler starts,don't use this job in other place,it just for system inner scheduler - */ -public enum SystemJob { - - /** - * System cycle scheduler event job, it will start cycle scheduler - */ - SYSTEM_SCHEDULER_JOB("system_scheduler_event_job", 1L); - - @Getter - private final String description; - @Getter - private final Long id; - - SystemJob(String description, Long id) { - this.description = description; - this.id = id; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 297537f712..0b309ec3ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -18,8 +18,6 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.catalog.Env; -import org.apache.doris.scheduler.constants.JobStatus; -import org.apache.doris.scheduler.constants.SystemJob; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.scheduler.job.Job; @@ -71,10 +69,6 @@ public class TaskHandler implements WorkHandler { */ @Override public void onEvent(TaskEvent event) { - if (checkIsSystemEvent(event)) { - onSystemEvent(); - return; - } switch (event.getTaskType()) { case TimerJobTask: onTimerJobTaskHandle(event); @@ -97,14 +91,14 @@ public class TaskHandler implements WorkHandler { long jobId = taskEvent.getId(); Job job = timerJobManager.getJob(jobId); if (job == null) { - log.info("Event job is null, eventJobId: {}", jobId); + log.info("job is null, jobId: {}", jobId); return; } - if (!job.isRunning() && !job.getJobStatus().equals(JobStatus.WAITING_FINISH)) { - log.info("Event job is not running, eventJobId: {}", jobId); + if (!job.isRunning()) { + log.info("job is not running, eventJobId: {}", jobId); return; } - log.debug("Event job is running, eventJobId: {}", jobId); + log.debug("job is running, eventJobId: {}", jobId); JobTask jobTask = new JobTask(jobId); try { jobTask.setStartTimeMs(System.currentTimeMillis()); @@ -147,27 +141,6 @@ public class TaskHandler implements WorkHandler { } } - /** - * Handles a system event by scheduling batch scheduler tasks. - */ - private void onSystemEvent() { - try { - timerJobManager.batchSchedulerTasks(); - } catch (Exception e) { - log.error("System batch scheduler execute failed", e); - } - } - - /** - * Checks whether the specified event task is a system event. - * - * @param event The event task to be checked. - * @return true if the event task is a system event, false otherwise. - */ - private boolean checkIsSystemEvent(TaskEvent event) { - return Objects.equals(event.getId(), SystemJob.SYSTEM_SCHEDULER_JOB.getId()); - } - private void updateJobStatusIfPastEndTime(Job job) { if (job.isExpired()) { job.finish(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java index 3d29b0b842..e65cf07754 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java @@ -18,6 +18,7 @@ package org.apache.doris.scheduler.job; import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; @@ -129,6 +130,14 @@ public class Job implements Writable { @SerializedName("errMsg") private String errMsg; + /** + * if we want to start the job immediately, we can set this flag to true. + * The default value is false. + * when we set this flag to true, the start time will be set to current time. + * we don't need to serialize this field. + */ + private boolean immediatelyStart = false; + public boolean isRunning() { return jobStatus == JobStatus.RUNNING; } @@ -187,21 +196,29 @@ public class Job implements Writable { this.jobStatus = JobStatus.STOPPED; } - public boolean checkJobParam() { + public void checkJobParam() throws DdlException { if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) { - return false; + throw new DdlException("startTimeMs must be greater than current time"); + } + if (immediatelyStart && startTimeMs != 0L) { + throw new DdlException("immediately start and startTimeMs can't be set at the same time"); + } + if (immediatelyStart) { + startTimeMs = System.currentTimeMillis(); } if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) { - return false; + throw new DdlException("endTimeMs must be greater than current time"); } if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) { - return false; + throw new DdlException("cycle job must set intervalMs"); } if (null == jobCategory) { - return false; + throw new DdlException("jobCategory must be set"); + } + if (null == executor) { + throw new DdlException("Job executor must be set"); } - return null != executor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index 2b3b922b37..9000dba470 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -53,10 +53,12 @@ public class TimerJobManager implements Closeable, Writable { private long lastBatchSchedulerTimestamp; + private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600; + /** - * batch scheduler interval time + * batch scheduler interval ms time */ - private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L; + private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L; private boolean isClosed = false; @@ -88,9 +90,7 @@ public class TimerJobManager implements Closeable, Writable { } public Long registerJob(Job job) throws DdlException { - if (!job.checkJobParam()) { - throw new DdlException("Job param is invalid, please check time param"); - } + job.checkJobParam(); checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory()); jobMap.putIfAbsent(job.getJobId(), job); initAndSchedulerJob(job); @@ -144,9 +144,9 @@ public class TimerJobManager implements Closeable, Writable { Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), job.getIntervalMs(), job.isCycleJob()); job.setNextExecuteTimeMs(nextExecuteTimeMs); - if (job.getNextExecuteTimeMs() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) { + if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) { List executeTimestamp = findTasksBetweenTime(job, - BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp, + lastBatchSchedulerTimestamp, job.getNextExecuteTimeMs()); if (!executeTimestamp.isEmpty()) { for (Long timestamp : executeTimestamp) { @@ -300,11 +300,6 @@ public class TimerJobManager implements Closeable, Writable { List jobExecuteTimes = new ArrayList<>(); if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) { jobExecuteTimes.add(nextExecuteTime); - if (job.isStreamingJob()) { - job.setJobStatus(JobStatus.RUNNING); - } else { - job.setJobStatus(JobStatus.WAITING_FINISH); - } return jobExecuteTimes; } while (endTimeEndWindow >= nextExecuteTime) { @@ -323,18 +318,18 @@ public class TimerJobManager implements Closeable, Writable { private void executeJobIdsWithinLastTenMinutesWindow() { // if the task executes for more than 10 minutes, it will be delay, so, // set lastBatchSchedulerTimestamp to current time - if (lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) { + if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) { this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); } + this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; if (jobMap.isEmpty()) { - this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; return; } jobMap.forEach((k, v) -> { if (v.isRunning() && (v.getNextExecuteTimeMs() - + v.getIntervalMs() < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) { + + v.getIntervalMs() < lastBatchSchedulerTimestamp)) { List executeTimes = findTasksBetweenTime( - v, lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, + v, lastBatchSchedulerTimestamp, v.getNextExecuteTimeMs()); if (!executeTimes.isEmpty()) { for (Long executeTime : executeTimes) { @@ -343,7 +338,6 @@ public class TimerJobManager implements Closeable, Writable { } } }); - this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; } /** @@ -351,10 +345,7 @@ public class TimerJobManager implements Closeable, Writable { * Jobs will be re-registered after the task is completed */ private void cycleSystemSchedulerTasks() { - dorisTimer.newTimeout(timeout -> { - batchSchedulerTasks(); - cycleSystemSchedulerTasks(); - }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS); + dorisTimer.newTimeout(timeout -> batchSchedulerTasks(), BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); } /** diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java index 86c0dcdca9..fd871be962 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java @@ -132,6 +132,18 @@ public class TimerJobManagerTest { Assertions.assertEquals(2, testExecuteCount.get()); } + @Test + public void testCycleSchedulerWithImmediatelyStart(@Mocked Env env) throws DdlException { + setContext(env); + long startTimestamp = System.currentTimeMillis(); + job.setImmediatelyStart(true); + timerJobManager.registerJob(job); + //consider the time of the first execution and give some buffer time + Awaitility.await().atMost(16, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() + >= startTimestamp + 15000L); + Assertions.assertEquals(3, testExecuteCount.get()); + } + @Test public void testOneTimeJob(@Mocked Env env) throws DdlException { setContext(env);