diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 80e8b0cf5e..629c88c19b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -136,12 +136,6 @@ public class JobExecutionConfiguration { } long intervalValue = timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval()); long jobStartTimeMs = timerDefinition.getStartTimeMs(); - if (isImmediate()) { - jobStartTimeMs += intervalValue; - if (jobStartTimeMs > endTimeMs) { - return delayTimeSeconds; - } - } return getExecutionDelaySeconds(startTimeMs, endTimeMs, jobStartTimeMs, intervalValue, currentTimeMs); } @@ -171,6 +165,10 @@ public class JobExecutionConfiguration { long firstTriggerTime = windowStartTimeMs + (intervalMs - ((windowStartTimeMs - startTimeMs) % intervalMs)) % intervalMs; + // should filter result which smaller than start time + if (firstTriggerTime < startTimeMs) { + firstTriggerTime = startTimeMs; + } if (firstTriggerTime < currentTimeMs) { // Calculate how many intervals to add to get the largest trigger time < currentTimeMs long intervalsToAdd = (currentTimeMs - firstTriggerTime) / intervalMs; diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index fb0600b281..8196c00095 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -53,13 +53,20 @@ public class JobExecutionConfigurationTest { configuration.setExecuteType(JobExecuteType.RECURRING); TimerDefinition timerDefinition = new TimerDefinition(); - timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second in the future - timerDefinition.setInterval(10L); // Interval set to 10 milliseconds + timerDefinition.setStartTimeMs(700000L); // Start time set to 700 second in the future + timerDefinition.setInterval(10L); // Interval set to 10 minute timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); configuration.setTimerDefinition(timerDefinition); List delayTimes = configuration.getTriggerDelayTimes( 0L, 0L, 1100000L); + // test should filter result which smaller than start time + Assertions.assertEquals(1, delayTimes.size()); + Assertions.assertArrayEquals(new Long[]{700L}, delayTimes.toArray()); + + timerDefinition.setStartTimeMs(100000L); // Start time set to 100 second in the future + delayTimes = configuration.getTriggerDelayTimes( + 0L, 0L, 1100000L); Assertions.assertEquals(2, delayTimes.size()); Assertions.assertArrayEquals(new Long[]{100L, 700L}, delayTimes.toArray()); diff --git a/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out b/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out new file mode 100644 index 0000000000..79ac76f677 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !immediate -- +2 + +-- !deferred -- +1 + diff --git a/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy b/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy new file mode 100644 index 0000000000..8732eb20fd --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.Instant; +import java.time.ZoneId; +import org.junit.Assert; + +suite("test_immediate_starttime_mtmv","mtmv") { + String suiteName = "test_immediate_starttime_mtmv" + String tableName = "${suiteName}_table" + String mvName = "${suiteName}_mv" + + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE TABLE ${tableName} + ( + k2 INT, + k3 varchar(32) + ) + DISTRIBUTED BY HASH(k2) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """ + insert into ${tableName} values (2,1),(2,2); + """ + def currentMs = System.currentTimeMillis() + 10000; + def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); + def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + def startTime= dateTime.format(formatter); + sql """ + CREATE MATERIALIZED VIEW ${mvName} + REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}' + DISTRIBUTED BY hash(k2) BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * from ${tableName}; + """ + Thread.sleep(20000) + order_qt_immediate "SELECT count(*) from tasks('type'='mv') where MvName='${mvName}'" + + sql """drop materialized view if exists ${mvName};""" + currentMs = System.currentTimeMillis() + 10000; + dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); + formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + startTime= dateTime.format(formatter); + sql """ + CREATE MATERIALIZED VIEW ${mvName} + build deferred REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}' + DISTRIBUTED BY hash(k2) BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * from ${tableName}; + """ + Thread.sleep(20000) + order_qt_deferred "SELECT count(*) from tasks('type'='mv') where MvName='${mvName}'" + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" +}