branch-2.1: [fix](job scheduler) specifies both startTime and immediate, it will trigger one fewer task execution #50624 (#50897)

Cherry-picked from #50624

Co-authored-by: zhangdong <zhangdong@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-05-14 22:59:48 +08:00
committed by GitHub
parent 33df5ba180
commit c4d0e1e693
4 changed files with 102 additions and 8 deletions

View File

@ -136,12 +136,6 @@ public class JobExecutionConfiguration {
}
long intervalValue = timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval());
long jobStartTimeMs = timerDefinition.getStartTimeMs();
if (isImmediate()) {
jobStartTimeMs += intervalValue;
if (jobStartTimeMs > endTimeMs) {
return delayTimeSeconds;
}
}
return getExecutionDelaySeconds(startTimeMs, endTimeMs, jobStartTimeMs,
intervalValue, currentTimeMs);
}
@ -171,6 +165,10 @@ public class JobExecutionConfiguration {
long firstTriggerTime = windowStartTimeMs + (intervalMs - ((windowStartTimeMs - startTimeMs)
% intervalMs)) % intervalMs;
// should filter result which smaller than start time
if (firstTriggerTime < startTimeMs) {
firstTriggerTime = startTimeMs;
}
if (firstTriggerTime < currentTimeMs) {
// Calculate how many intervals to add to get the largest trigger time < currentTimeMs
long intervalsToAdd = (currentTimeMs - firstTriggerTime) / intervalMs;

View File

@ -53,13 +53,20 @@ public class JobExecutionConfigurationTest {
configuration.setExecuteType(JobExecuteType.RECURRING);
TimerDefinition timerDefinition = new TimerDefinition();
timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second in the future
timerDefinition.setInterval(10L); // Interval set to 10 milliseconds
timerDefinition.setStartTimeMs(700000L); // Start time set to 700 second in the future
timerDefinition.setInterval(10L); // Interval set to 10 minute
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
configuration.setTimerDefinition(timerDefinition);
List<Long> delayTimes = configuration.getTriggerDelayTimes(
0L, 0L, 1100000L);
// test should filter result which smaller than start time
Assertions.assertEquals(1, delayTimes.size());
Assertions.assertArrayEquals(new Long[]{700L}, delayTimes.toArray());
timerDefinition.setStartTimeMs(100000L); // Start time set to 100 second in the future
delayTimes = configuration.getTriggerDelayTimes(
0L, 0L, 1100000L);
Assertions.assertEquals(2, delayTimes.size());
Assertions.assertArrayEquals(new Long[]{100L, 700L}, delayTimes.toArray());

View File

@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !immediate --
2
-- !deferred --
1

View File

@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.Instant;
import java.time.ZoneId;
import org.junit.Assert;
suite("test_immediate_starttime_mtmv","mtmv") {
String suiteName = "test_immediate_starttime_mtmv"
String tableName = "${suiteName}_table"
String mvName = "${suiteName}_mv"
sql """drop table if exists `${tableName}`"""
sql """drop materialized view if exists ${mvName};"""
sql """
CREATE TABLE ${tableName}
(
k2 INT,
k3 varchar(32)
)
DISTRIBUTED BY HASH(k2) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
);
"""
sql """
insert into ${tableName} values (2,1),(2,2);
"""
def currentMs = System.currentTimeMillis() + 10000;
def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault());
def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
def startTime= dateTime.format(formatter);
sql """
CREATE MATERIALIZED VIEW ${mvName}
REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}'
DISTRIBUTED BY hash(k2) BUCKETS 2
PROPERTIES (
'replication_num' = '1'
)
AS
SELECT * from ${tableName};
"""
Thread.sleep(20000)
order_qt_immediate "SELECT count(*) from tasks('type'='mv') where MvName='${mvName}'"
sql """drop materialized view if exists ${mvName};"""
currentMs = System.currentTimeMillis() + 10000;
dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault());
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
startTime= dateTime.format(formatter);
sql """
CREATE MATERIALIZED VIEW ${mvName}
build deferred REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}'
DISTRIBUTED BY hash(k2) BUCKETS 2
PROPERTIES (
'replication_num' = '1'
)
AS
SELECT * from ${tableName};
"""
Thread.sleep(20000)
order_qt_deferred "SELECT count(*) from tasks('type'='mv') where MvName='${mvName}'"
sql """drop table if exists `${tableName}`"""
sql """drop materialized view if exists ${mvName};"""
}