[Improve](Job)Job internal interface provides immediate scheduling (#23735)

Delete meaningless job status
System scheduling is executed in the time wheel
Optimize window calculation code
This commit is contained in:
Calvin Kirs
2023-09-01 12:50:08 +08:00
committed by GitHub
parent c31cb5fd11
commit e88c218390
8 changed files with 51 additions and 104 deletions

View File

@ -71,7 +71,6 @@ Result description:
* PAUSED: Paused
* STOPPED: end (manually triggered by the user)
* FINISHED: Finished
* WAITING_FINISH: pending completion
### Example

View File

@ -71,7 +71,6 @@ SHOW JOBS 用于展示当前 DB 下所有作业的运行状态,SHOW JOB FOR jo
* PAUSED:暂停
* STOPPED:结束(用户手动触发)
* FINISHED: 完成
* WAITING_FINISH: 待结束
### Example

View File

@ -37,8 +37,6 @@ public enum JobStatus {
*/
STOPPED,
WAITING_FINISH,
/**
* When the task is finished, the finished state will be triggered.
*/

View File

@ -1,42 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.constants;
import lombok.Getter;
/**
* System scheduler event job
* They will start when scheduler starts,don't use this job in other place,it just for system inner scheduler
*/
public enum SystemJob {
/**
* System cycle scheduler event job, it will start cycle scheduler
*/
SYSTEM_SCHEDULER_JOB("system_scheduler_event_job", 1L);
@Getter
private final String description;
@Getter
private final Long id;
SystemJob(String description, Long id) {
this.description = description;
this.id = id;
}
}

View File

@ -18,8 +18,6 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.SystemJob;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.job.Job;
@ -71,10 +69,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
*/
@Override
public void onEvent(TaskEvent event) {
if (checkIsSystemEvent(event)) {
onSystemEvent();
return;
}
switch (event.getTaskType()) {
case TimerJobTask:
onTimerJobTaskHandle(event);
@ -97,14 +91,14 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
long jobId = taskEvent.getId();
Job job = timerJobManager.getJob(jobId);
if (job == null) {
log.info("Event job is null, eventJobId: {}", jobId);
log.info("job is null, jobId: {}", jobId);
return;
}
if (!job.isRunning() && !job.getJobStatus().equals(JobStatus.WAITING_FINISH)) {
log.info("Event job is not running, eventJobId: {}", jobId);
if (!job.isRunning()) {
log.info("job is not running, eventJobId: {}", jobId);
return;
}
log.debug("Event job is running, eventJobId: {}", jobId);
log.debug("job is running, eventJobId: {}", jobId);
JobTask jobTask = new JobTask(jobId);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
@ -147,27 +141,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
}
/**
* Handles a system event by scheduling batch scheduler tasks.
*/
private void onSystemEvent() {
try {
timerJobManager.batchSchedulerTasks();
} catch (Exception e) {
log.error("System batch scheduler execute failed", e);
}
}
/**
* Checks whether the specified event task is a system event.
*
* @param event The event task to be checked.
* @return true if the event task is a system event, false otherwise.
*/
private boolean checkIsSystemEvent(TaskEvent event) {
return Objects.equals(event.getId(), SystemJob.SYSTEM_SCHEDULER_JOB.getId());
}
private void updateJobStatusIfPastEndTime(Job job) {
if (job.isExpired()) {
job.finish();

View File

@ -18,6 +18,7 @@
package org.apache.doris.scheduler.job;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
@ -129,6 +130,14 @@ public class Job implements Writable {
@SerializedName("errMsg")
private String errMsg;
/**
* if we want to start the job immediately, we can set this flag to true.
* The default value is false.
* when we set this flag to true, the start time will be set to current time.
* we don't need to serialize this field.
*/
private boolean immediatelyStart = false;
public boolean isRunning() {
return jobStatus == JobStatus.RUNNING;
}
@ -187,21 +196,29 @@ public class Job implements Writable {
this.jobStatus = JobStatus.STOPPED;
}
public boolean checkJobParam() {
public void checkJobParam() throws DdlException {
if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
return false;
throw new DdlException("startTimeMs must be greater than current time");
}
if (immediatelyStart && startTimeMs != 0L) {
throw new DdlException("immediately start and startTimeMs can't be set at the same time");
}
if (immediatelyStart) {
startTimeMs = System.currentTimeMillis();
}
if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
return false;
throw new DdlException("endTimeMs must be greater than current time");
}
if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
return false;
throw new DdlException("cycle job must set intervalMs");
}
if (null == jobCategory) {
return false;
throw new DdlException("jobCategory must be set");
}
if (null == executor) {
throw new DdlException("Job executor must be set");
}
return null != executor;
}

View File

@ -53,10 +53,12 @@ public class TimerJobManager implements Closeable, Writable {
private long lastBatchSchedulerTimestamp;
private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
/**
* batch scheduler interval time
* batch scheduler interval ms time
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L;
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
private boolean isClosed = false;
@ -88,9 +90,7 @@ public class TimerJobManager implements Closeable, Writable {
}
public Long registerJob(Job job) throws DdlException {
if (!job.checkJobParam()) {
throw new DdlException("Job param is invalid, please check time param");
}
job.checkJobParam();
checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory());
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
@ -144,9 +144,9 @@ public class TimerJobManager implements Closeable, Writable {
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.isCycleJob());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
if (job.getNextExecuteTimeMs() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
List<Long> executeTimestamp = findTasksBetweenTime(job,
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp,
lastBatchSchedulerTimestamp,
job.getNextExecuteTimeMs());
if (!executeTimestamp.isEmpty()) {
for (Long timestamp : executeTimestamp) {
@ -300,11 +300,6 @@ public class TimerJobManager implements Closeable, Writable {
List<Long> jobExecuteTimes = new ArrayList<>();
if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
if (job.isStreamingJob()) {
job.setJobStatus(JobStatus.RUNNING);
} else {
job.setJobStatus(JobStatus.WAITING_FINISH);
}
return jobExecuteTimes;
}
while (endTimeEndWindow >= nextExecuteTime) {
@ -323,18 +318,18 @@ public class TimerJobManager implements Closeable, Writable {
private void executeJobIdsWithinLastTenMinutesWindow() {
// if the task executes for more than 10 minutes, it will be delay, so,
// set lastBatchSchedulerTimestamp to current time
if (lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) {
if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) {
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
}
this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
if (jobMap.isEmpty()) {
this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
return;
}
jobMap.forEach((k, v) -> {
if (v.isRunning() && (v.getNextExecuteTimeMs()
+ v.getIntervalMs() < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
+ v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
List<Long> executeTimes = findTasksBetweenTime(
v, lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
v, lastBatchSchedulerTimestamp,
v.getNextExecuteTimeMs());
if (!executeTimes.isEmpty()) {
for (Long executeTime : executeTimes) {
@ -343,7 +338,6 @@ public class TimerJobManager implements Closeable, Writable {
}
}
});
this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
}
/**
@ -351,10 +345,7 @@ public class TimerJobManager implements Closeable, Writable {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
dorisTimer.newTimeout(timeout -> {
batchSchedulerTasks();
cycleSystemSchedulerTasks();
}, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
dorisTimer.newTimeout(timeout -> batchSchedulerTasks(), BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**

View File

@ -132,6 +132,18 @@ public class TimerJobManagerTest {
Assertions.assertEquals(2, testExecuteCount.get());
}
@Test
public void testCycleSchedulerWithImmediatelyStart(@Mocked Env env) throws DdlException {
setContext(env);
long startTimestamp = System.currentTimeMillis();
job.setImmediatelyStart(true);
timerJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(16, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
>= startTimestamp + 15000L);
Assertions.assertEquals(3, testExecuteCount.get());
}
@Test
public void testOneTimeJob(@Mocked Env env) throws DdlException {
setContext(env);