diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 367d03fa86..8a8db0a3d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -128,6 +128,7 @@ public class CreateJobStmt extends DdlStmt { if (null != onceJobStartTimestamp) { if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp)); } @@ -149,6 +150,8 @@ public class CreateJobStmt extends DdlStmt { if (null != startsTimeStamp) { if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); + //To avoid immediate re-scheduling, set the start time of the timer 100ms before the current time. + timerDefinition.setStartTimeMs(System.currentTimeMillis()); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 46bc2c71ea..301222d543 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -57,9 +57,7 @@ public class JobExecutionConfiguration { if (executeType == JobExecuteType.INSTANT || executeType == JobExecuteType.MANUAL) { return; } - - checkTimerDefinition(immediate); - + checkTimerDefinition(); if (executeType == JobExecuteType.ONE_TIME) { validateStartTimeMs(); return; @@ -80,12 +78,12 @@ public class JobExecutionConfiguration { } } - private void checkTimerDefinition(boolean immediate) { + private void checkTimerDefinition() { if (timerDefinition == null) { throw new IllegalArgumentException( "timerDefinition cannot be null when executeType is not instant or manual"); } - timerDefinition.checkParams(immediate); + timerDefinition.checkParams(); } private void validateStartTimeMs() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index bcff4216c6..9068a18f69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -38,13 +38,7 @@ public class TimerDefinition { private Long latestSchedulerTimeMs; - public void checkParams(boolean immediate) { - if (null != startTimeMs && immediate) { - throw new IllegalArgumentException("startTimeMs must be null when immediate is true"); - } - if (null == startTimeMs && immediate) { - startTimeMs = System.currentTimeMillis(); - } + public void checkParams() { if (null == startTimeMs) { startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 5ba88c6e3c..33d12c30a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -124,6 +124,10 @@ public class JobScheduler, C> implements Closeable { schedulerInstantJob(job, TaskType.SCHEDULED, null); } } + if (job.getJobConfig().isImmediate() && JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) { + schedulerInstantJob(job, TaskType.SCHEDULED, null); + return; + } //RECURRING job and immediate is true if (job.getJobConfig().isImmediate()) { job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 6d01f09c5e..24c486baff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,8 +75,8 @@ public class JobExecutionConfigurationTest { JobExecutionConfiguration configuration = new JobExecutionConfiguration(); configuration.setExecuteType(JobExecuteType.ONE_TIME); configuration.setImmediate(true); - configuration.setImmediate(true); TimerDefinition timerDefinition = new TimerDefinition(); + timerDefinition.setStartTimeMs(0L); configuration.setTimerDefinition(timerDefinition); configuration.checkParams(); }