branch-2.1: [fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation #45176 (#45353)
Cherry-picked from #45176 Co-authored-by: Calvin Kirs <guoqiang@selectdb.com>
This commit is contained in:
committed by
GitHub
parent
667f5e6e6a
commit
95ffc7a65f
@ -257,6 +257,17 @@ public class TimeUtils {
|
||||
return d.getTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a millisecond timestamp to a second-level timestamp.
|
||||
*
|
||||
* @param timestamp The millisecond timestamp to be converted.
|
||||
* @return The timestamp rounded to the nearest second (in milliseconds).
|
||||
*/
|
||||
public static long convertToSecondTimestamp(long timestamp) {
|
||||
// Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part
|
||||
return (timestamp / 1000) * 1000;
|
||||
}
|
||||
|
||||
public static long timeStringToLong(String timeStr, TimeZone timeZone) {
|
||||
DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
|
||||
dateFormatTimeZone.withZone(timeZone.toZoneId());
|
||||
|
||||
@ -155,7 +155,7 @@ public class JobExecutionConfiguration {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
return (startTimeMs - currentTimeMs) / 1000;
|
||||
return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
|
||||
}
|
||||
|
||||
// Returns a list of delay times in seconds for executing the job within the specified window
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.job.base;
|
||||
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.job.common.IntervalUnit;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -40,11 +41,15 @@ public class TimerDefinition {
|
||||
|
||||
public void checkParams() {
|
||||
if (null == startTimeMs) {
|
||||
startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval);
|
||||
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
|
||||
startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
|
||||
}
|
||||
if (null != endTimeMs && endTimeMs < startTimeMs) {
|
||||
throw new IllegalArgumentException("endTimeMs must be greater than the start time");
|
||||
}
|
||||
if (null != endTimeMs) {
|
||||
endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
|
||||
}
|
||||
|
||||
if (null != intervalUnit) {
|
||||
if (null == interval) {
|
||||
|
||||
@ -84,7 +84,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
|
||||
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
|
||||
taskDisruptorGroupManager.init();
|
||||
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
|
||||
latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
|
||||
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
|
||||
latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
|
||||
batchSchedulerTimerJob();
|
||||
cycleSystemSchedulerTasks();
|
||||
}
|
||||
@ -94,7 +95,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
|
||||
* Jobs will be re-registered after the task is completed
|
||||
*/
|
||||
private void cycleSystemSchedulerTasks() {
|
||||
log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
|
||||
log.info("re-register system scheduler timer tasks, time is " + TimeUtils
|
||||
.longToTimeStringWithms(System.currentTimeMillis()));
|
||||
timerTaskScheduler.newTimeout(timeout -> {
|
||||
batchSchedulerTimerJob();
|
||||
cycleSystemSchedulerTasks();
|
||||
@ -144,7 +146,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
|
||||
|
||||
|
||||
private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
|
||||
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
|
||||
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
|
||||
startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
|
||||
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
|
||||
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
|
||||
if (CollectionUtils.isEmpty(delaySeconds)) {
|
||||
log.info("skip job {} scheduler timer job, delay seconds is empty", job.getJobName());
|
||||
@ -190,7 +194,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
|
||||
|
||||
long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
|
||||
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
|
||||
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
|
||||
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
|
||||
this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
|
||||
}
|
||||
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
|
||||
log.info("execute timer job ids within last ten minutes window, last time window is {}",
|
||||
|
||||
@ -75,7 +75,14 @@ public class JobExecutionConfigurationTest {
|
||||
timerDefinition.setInterval(1L);
|
||||
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size());
|
||||
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size());
|
||||
timerDefinition.setStartTimeMs(1672531200000L);
|
||||
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
|
||||
timerDefinition.setInterval(1L);
|
||||
Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray());
|
||||
|
||||
List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L);
|
||||
|
||||
Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user