From 9aa878ea1f951693e8b5b4c21b0d52f140bd420b Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 20 Dec 2023 09:21:15 +0800 Subject: [PATCH] [Fix](Job)Fixed job scheduling missing certain time window schedules (#28659) Since scheduling itself consumes a certain amount of time, the start time of the time window should not be the current time, but the end time of the last schedule. --- .../doris/job/base/JobExecutionConfiguration.java | 4 ++-- .../org/apache/doris/job/scheduler/JobScheduler.java | 10 ++++++---- .../doris/job/base/JobExecutionConfigurationTest.java | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 16b9dd2428..0b44073464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -178,8 +178,8 @@ public class JobExecutionConfiguration { // Calculate the trigger time list for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) { - if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs() - || triggerTime < timerDefinition.getEndTimeMs())) { + if (null == timerDefinition.getEndTimeMs() + || triggerTime < timerDefinition.getEndTimeMs()) { timerDefinition.setLatestSchedulerTimeMs(triggerTime); timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 47e91d97b4..08bbbb6dba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -130,7 +130,7 @@ public class JobScheduler, C> implements Closeable { schedulerInstantJob(job, TaskType.SCHEDULED, null); } //if it's timer job and trigger last window already start, we will scheduler it immediately - cycleTimerJobScheduler(job); + cycleTimerJobScheduler(job, System.currentTimeMillis()); } @Override @@ -139,9 +139,9 @@ public class JobScheduler, C> implements Closeable { } - private void cycleTimerJobScheduler(T job) { + private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { List delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), - System.currentTimeMillis(), latestBatchSchedulerTimerTaskTimeMs); + startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isNotEmpty(delaySeconds)) { delaySeconds.forEach(delaySecond -> { TimerJobSchedulerTask timerJobSchedulerTask = new TimerJobSchedulerTask<>(timerJobDisruptor, job); @@ -170,6 +170,8 @@ public class JobScheduler, C> implements Closeable { * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger */ private void executeTimerJobIdsWithinLastTenMinutesWindow() { + + long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); } @@ -186,7 +188,7 @@ public class JobScheduler, C> implements Closeable { if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { continue; } - cycleTimerJobScheduler(job); + cycleTimerJobScheduler(job, lastTimeWindowMs); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 87d1430375..91678ee5c1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -64,7 +64,7 @@ public class JobExecutionConfigurationTest { Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( 1001000L, 0L, 1000000L); - Assertions.assertEquals(0, delayTimes.size()); + Assertions.assertEquals(1, delayTimes.size()); } }