diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index c37e810d5d..7446ca6bae 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -854,8 +854,6 @@ public class Env { // If not using bdb, we need to notify the FE type transfer manually. notifyNewFETypeTransfer(FrontendNodeType.MASTER); } - // 7. start mtmv jobManager - mtmvJobManager.start(); } // wait until FE is ready. @@ -1412,7 +1410,8 @@ public class Env { if (Config.enable_hms_events_incremental_sync) { metastoreEventsProcessor.start(); } - + // start mtmv jobManager + mtmvJobManager.start(); } // start threads that should running on all FE @@ -1462,6 +1461,9 @@ public class Env { startNonMasterDaemonThreads(); MetricRepo.init(); + + // stop mtmv scheduler + mtmvJobManager.stop(); } // Set global variable 'lower_case_table_names' only when the cluster is initialized. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 4aa4ce71c3..c8214ec7d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -956,7 +956,7 @@ public class InternalCatalog implements CatalogIf { if (table instanceof MaterializedView && Config.enable_mtmv_scheduler_framework) { List dropIds = Env.getCurrentEnv().getMTMVJobManager().showJobs(db.getFullName(), table.getName()) .stream().map(MTMVJob::getId).collect(Collectors.toList()); - Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, false); + Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, isReplay); LOG.info("Drop related {} mv job.", dropIds.size()); } LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 070eec0725..0c0e49f617 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -744,7 +744,7 @@ public class JournalEntity implements Writable { isRead = true; break; } - case OperationType.OP_ALTER_MTMV_JOB: { + case OperationType.OP_CHANGE_MTMV_JOB: { data = ChangeMTMVJob.read(in); isRead = true; break; @@ -759,7 +759,7 @@ public class JournalEntity implements Writable { isRead = true; break; } - case OperationType.OP_ALTER_MTMV_TASK: { + case OperationType.OP_CHANGE_MTMV_TASK: { data = ChangeMTMVTask.read(in); isRead = true; break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java index 5af572304e..aef9b92386 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java @@ -57,14 +57,18 @@ public class MTMVJobFactory { public static List buildJob(MaterializedView materializedView, String dbName) { List jobs = new ArrayList<>(); - if (materializedView.getBuildMode() == BuildMode.IMMEDIATE) { - jobs.add(genOnceJob(materializedView, dbName)); - } MVRefreshTriggerInfo triggerInfo = materializedView.getRefreshInfo().getTriggerInfo(); + boolean isRunPeriodJobImmediate = false; if (triggerInfo != null && triggerInfo.getRefreshTrigger() == RefreshTrigger.INTERVAL) { - jobs.add(genPeriodicalJob(materializedView, dbName)); + MTMVJob job = genPeriodicalJob(materializedView, dbName); + isRunPeriodJobImmediate = MTMVUtils.getDelaySeconds(job) == 0; + jobs.add(job); } + // if the PeriodicalJob run immediate since an early start time, don't run the immediate build. + if (!isRunPeriodJobImmediate && materializedView.getBuildMode() == BuildMode.IMMEDIATE) { + jobs.add(genOnceJob(materializedView, dbName)); + } return jobs; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index fe77ea4015..fd9f55ce10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -62,9 +62,9 @@ public class MTMVJobManager { private final MTMVTaskManager taskManager; - private final ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); - private final ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1); private final ReentrantLock reentrantLock; @@ -82,8 +82,17 @@ public class MTMVJobManager { if (isStarted.compareAndSet(false, true)) { taskManager.clearUnfinishedTasks(); + // check the scheduler before using it + // since it may be shutdown when master change to follower without process shutdown. + if (periodScheduler.isShutdown()) { + periodScheduler = Executors.newScheduledThreadPool(1); + } + registerJobs(); + if (cleanerScheduler.isShutdown()) { + cleanerScheduler = Executors.newScheduledThreadPool(1); + } cleanerScheduler.scheduleAtFixedRate(() -> { if (!Env.getCurrentEnv().isMaster()) { return; @@ -99,29 +108,43 @@ public class MTMVJobManager { } finally { unlock(); } - }, 0, 1, TimeUnit.DAYS); + }, 0, 1, TimeUnit.MINUTES); taskManager.startTaskScheduler(); } } + public void stop() { + if (isStarted.compareAndSet(true, false)) { + periodScheduler.shutdown(); + cleanerScheduler.shutdown(); + taskManager.stopTaskScheduler(); + } + } + private void registerJobs() { + int num = nameToJobMap.size(); + int periodNum = 0; + int onceNum = 0; for (MTMVJob job : nameToJobMap.values()) { - if (job.getState() != JobState.ACTIVE) { + if (!job.getState().equals(JobState.ACTIVE)) { continue; } if (job.getTriggerMode() == TriggerMode.PERIODICAL) { JobSchedule schedule = job.getSchedule(); ScheduledFuture future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()), - MTMVUtils.getDelaySeconds(job), schedule.getPeriod(), schedule.getTimeUnit()); + MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS); periodFutureMap.put(job.getId(), future); + periodNum++; } else if (job.getTriggerMode() == TriggerMode.ONCE) { if (job.getRetryPolicy() == TaskRetryPolicy.ALWAYS || job.getRetryPolicy() == TaskRetryPolicy.TIMES) { MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams(); submitJobTask(job.getName(), executeOption); + onceNum++; } } } + LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num); } public void createJob(MTMVJob job, boolean isReplay) throws DdlException { @@ -146,27 +169,33 @@ public class MTMVJobManager { idToJobMap.put(job.getId(), job); if (!isReplay) { // log job before submit any task. - Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job); + Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job); ScheduledFuture future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()), - MTMVUtils.getDelaySeconds(job), schedule.getPeriod(), schedule.getTimeUnit()); + MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS); periodFutureMap.put(job.getId(), future); } } else if (job.getTriggerMode() == TriggerMode.ONCE) { - job.setState(JobState.ACTIVE); - job.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_job_expired); + // only change once job state from unknown to active. if job is completed, only put it in map + if (job.getState() == JobState.UNKNOWN) { + job.setState(JobState.ACTIVE); + job.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_job_expired); + } nameToJobMap.put(job.getName(), job); idToJobMap.put(job.getId(), job); if (!isReplay) { - Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job); + Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job); MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams(); submitJobTask(job.getName(), executeOption); } } else if (job.getTriggerMode() == TriggerMode.MANUAL) { - job.setState(JobState.ACTIVE); + // only change once job state from unknown to active. if job is completed, only put it in map + if (job.getState() == JobState.UNKNOWN) { + job.setState(JobState.ACTIVE); + } nameToJobMap.put(job.getName(), job); idToJobMap.put(job.getId(), job); if (!isReplay) { - Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job); + Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job); } } else { throw new DdlException("Unsupported trigger mode for multi-table mv."); @@ -235,7 +264,7 @@ public class MTMVJobManager { job.setState(changeJob.getToStatus()); job.setLastModifyTime(changeJob.getLastModifyTime()); if (!isReplay) { - Env.getCurrentEnv().getEditLog().logChangeScheduleJob(changeJob); + Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob); } } finally { unlock(); @@ -268,7 +297,7 @@ public class MTMVJobManager { } if (!isReplay) { - Env.getCurrentEnv().getEditLog().logDropScheduleJob(jobIds); + Env.getCurrentEnv().getEditLog().logDropMTMVJob(jobIds); } } finally { unlock(); @@ -343,11 +372,7 @@ public class MTMVJobManager { } public void replayDropJobTasks(List taskIds) { - Map index = Maps.newHashMapWithExpectedSize(taskIds.size()); - for (String taskId : taskIds) { - index.put(taskId, null); - } - taskManager.getAllHistory().removeIf(runStatus -> index.containsKey(runStatus.getTaskId())); + taskManager.dropTasks(taskIds, true); } public void removeExpiredJobs() { @@ -382,7 +407,7 @@ public class MTMVJobManager { unlock(); } - dropJobs(jobIdsToDelete, true); + dropJobs(jobIdsToDelete, false); } public MTMVJob getJob(String jobName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java index 7fddfd91a4..da1685f3ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java @@ -125,6 +125,10 @@ public class MTMVTaskExecutor implements Comparable { return task; } + public void setTask(MTMVTask task) { + this.task = task; + } + public MTMVTask initTask(String taskId, Long createTime) { MTMVTask task = new MTMVTask(); task.setTaskId(taskId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java index 419412b3c3..f68f68cbaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java @@ -81,7 +81,7 @@ public class MTMVTaskExecutorPool { ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING, task.getState()); - Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask); + Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask); }); taskExecutor.setFuture(future); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index 6cb90c262d..55508b18c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -37,10 +37,12 @@ import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; import java.util.Deque; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -67,7 +69,7 @@ public class MTMVTaskManager { // keep track of all the completed tasks private final Deque historyQueue = Queues.newLinkedBlockingDeque(); - private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1); private final MTMVJobManager mtmvJobManager; @@ -76,6 +78,9 @@ public class MTMVTaskManager { } public void startTaskScheduler() { + if (taskScheduler.isShutdown()) { + taskScheduler = Executors.newScheduledThreadPool(1); + } taskScheduler.scheduleAtFixedRate(() -> { if (!tryLock()) { return; @@ -91,6 +96,10 @@ public class MTMVTaskManager { }, 0, 1, TimeUnit.SECONDS); } + public void stopTaskScheduler() { + taskScheduler.shutdown(); + } + public MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) { // duplicate submit if (taskExecutor.getTask() != null) { @@ -113,7 +122,8 @@ public class MTMVTaskManager { String taskId = UUID.randomUUID().toString(); MTMVTask task = taskExecutor.initTask(taskId, MTMVUtils.getNowTimeStamp()); task.setPriority(params.getPriority()); - Env.getCurrentEnv().getEditLog().logCreateScheduleTask(task); + LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, taskExecutor.getJob().getName()); + Env.getCurrentEnv().getEditLog().logCreateMTMVTask(task); arrangeToPendingTask(taskExecutor); return MTMVUtils.TaskSubmitStatus.SUBMITTED; } @@ -228,7 +238,7 @@ public class MTMVTaskManager { private void changeAndLogTaskStatus(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) { ChangeMTMVTask changeTask = new ChangeMTMVTask(jobId, task, fromStatus, toStatus); - Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask); + Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask); } public boolean tryLock() { @@ -322,7 +332,7 @@ public class MTMVTaskManager { return; } MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job); - taskExecutor.initTask(task.getTaskId(), task.getCreateTime()); + taskExecutor.setTask(task); arrangeToPendingTask(taskExecutor); break; case RUNNING: @@ -399,19 +409,32 @@ public class MTMVTaskManager { } try { Deque taskHistory = getAllHistory(); - Iterator iterator = taskHistory.iterator(); - while (iterator.hasNext()) { - MTMVTask task = iterator.next(); + for (MTMVTask task : taskHistory) { long expireTime = task.getExpireTime(); if (currentTime > expireTime) { historyToDelete.add(task.getTaskId()); - iterator.remove(); } } } finally { unlock(); } - LOG.info("remove task history:{}", historyToDelete); + dropTasks(historyToDelete, false); + } + + public void dropTasks(List taskIds, boolean isReplay) { + if (!tryLock()) { + return; + } + try { + Set taskSet = new HashSet<>(taskIds); + getAllHistory().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId())); + if (!isReplay) { + Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds); + } + } finally { + unlock(); + } + LOG.info("drop task history:{}", taskIds); } public void clearUnfinishedTasks() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java index 874eb0510c..d421d2f50a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java @@ -85,7 +85,6 @@ public class MTMVUtils { return getDelaySeconds(job, LocalDateTime.now()); } - // this method only for test public static long getDelaySeconds(MTMVJob job, LocalDateTime now) { long lastModifyTime = job.getLastModifyTime(); long nextTime = 0; @@ -117,10 +116,10 @@ public class MTMVUtils { switch (strTimeUnit.toUpperCase()) { case "SECOND": return TimeUnit.SECONDS; + case "MINUTE": + return TimeUnit.MINUTES; case "HOUR": return TimeUnit.HOURS; - case "DAY": - return TimeUnit.DAYS; default: return TimeUnit.DAYS; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java index a8dd62b4a9..8a348b7f99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java @@ -259,8 +259,8 @@ public class MTMVJob implements Writable, Comparable { } public String toString() { - return " (START " + LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), ZoneId.systemDefault()) - + " EVERY(" + period + " " + timeUnit + "))"; + return "START " + LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), ZoneId.systemDefault()) + + " EVERY(" + period + " " + timeUnit + ")"; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 03ef40086d..f6a71b5377 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -881,7 +881,7 @@ public class EditLog { env.getMTMVJobManager().replayCreateJob(job); break; } - case OperationType.OP_ALTER_MTMV_JOB: { + case OperationType.OP_CHANGE_MTMV_JOB: { final ChangeMTMVJob changeJob = (ChangeMTMVJob) journal.getData(); env.getMTMVJobManager().replayUpdateJob(changeJob); break; @@ -896,7 +896,7 @@ public class EditLog { env.getMTMVJobManager().replayCreateJobTask(task); break; } - case OperationType.OP_ALTER_MTMV_TASK: { + case OperationType.OP_CHANGE_MTMV_TASK: { final ChangeMTMVTask changeTask = (ChangeMTMVTask) journal.getData(); env.getMTMVJobManager().replayUpdateTask(changeTask); break; @@ -1613,27 +1613,27 @@ public class EditLog { logEdit(id, log); } - public void logCreateScheduleJob(MTMVJob job) { + public void logCreateMTMVJob(MTMVJob job) { logEdit(OperationType.OP_CREATE_MTMV_JOB, job); } - public void logDropScheduleJob(List jobIds) { + public void logDropMTMVJob(List jobIds) { logEdit(OperationType.OP_DROP_MTMV_JOB, new DropMTMVJob(jobIds)); } - public void logChangeScheduleJob(ChangeMTMVJob changeJob) { - logEdit(OperationType.OP_ALTER_MTMV_JOB, changeJob); + public void logChangeMTMVJob(ChangeMTMVJob changeJob) { + logEdit(OperationType.OP_CHANGE_MTMV_JOB, changeJob); } - public void logCreateScheduleTask(MTMVTask task) { + public void logCreateMTMVTask(MTMVTask task) { logEdit(OperationType.OP_CREATE_MTMV_TASK, task); } - public void logAlterScheduleTask(ChangeMTMVTask changeTaskRecord) { - logEdit(OperationType.OP_ALTER_MTMV_TASK, changeTaskRecord); + public void logChangeMTMVTask(ChangeMTMVTask changeTaskRecord) { + logEdit(OperationType.OP_CHANGE_MTMV_TASK, changeTaskRecord); } - public void logAlterScheduleTask(List taskIds) { + public void logDropMTMVTasks(List taskIds) { logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 813b4c4c49..73ead272e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -251,11 +251,11 @@ public class OperationType { // scheduler job and task 330-350 public static final short OP_CREATE_MTMV_JOB = 330; public static final short OP_DROP_MTMV_JOB = 331; - public static final short OP_ALTER_MTMV_JOB = 332; + public static final short OP_CHANGE_MTMV_JOB = 332; public static final short OP_CREATE_MTMV_TASK = 340; public static final short OP_DROP_MTMV_TASK = 341; - public static final short OP_ALTER_MTMV_TASK = 342; + public static final short OP_CHANGE_MTMV_TASK = 342; public static final short OP_DROP_EXTERNAL_TABLE = 350; public static final short OP_DROP_EXTERNAL_DB = 351; diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy index 6d3966614e..61534f7758 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy @@ -16,16 +16,12 @@ // under the License. suite("test_create_mtmv") { - def dbName = "db_mtmv" def tableName = "t_user" def tableNamePv = "t_user_pv" def mvName = "multi_mv" sql """ ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true"); """ - sql "DROP DATABASE IF EXISTS ${dbName};" - sql "CREATE DATABASE ${dbName};" - sql "USE ${dbName};" sql """ CREATE TABLE IF NOT EXISTS `${tableName}` ( @@ -42,7 +38,7 @@ suite("test_create_mtmv") { INSERT INTO ${tableName} VALUES("2022-10-26",1,"clz"),("2022-10-28",2,"zhangsang"),("2022-10-29",3,"lisi"); """ sql """ - create table ${tableNamePv}( + create table IF NOT EXISTS ${tableNamePv}( event_day DATE, id BIGINT, pv BIGINT @@ -66,9 +62,9 @@ suite("test_create_mtmv") { SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; """ - def show_task_meta = sql_meta "SHOW MTMV TASK FROM ${dbName}" + def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}" def index = show_task_meta.indexOf(['State', 'CHAR']) - def query = "SHOW MTMV TASK FROM ${dbName}" + def query = "SHOW MTMV TASK ON ${mvName}" def show_task_result def state do { @@ -80,5 +76,35 @@ suite("test_create_mtmv") { assertEquals 'SUCCESS', state, show_task_result.last().toString() order_qt_select "SELECT * FROM ${mvName}" + + sql """ + DROP MATERIALIZED VIEW ${mvName} + """ + + // test only one job created when build IMMEDIATE and start time is before now. + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD IMMEDIATE REFRESH COMPLETE + start with "2022-11-03 00:00:00" next 1 DAY + KEY(username) + DISTRIBUTED BY HASH (username) buckets 1 + PROPERTIES ('replication_num' = '1') + AS + SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; + """ + // wait task to be finished to avoid task leak in suite. + do { + show_task_result = sql "${query}" + state = show_task_result.last().get(index) + println "The state of ${query} is ${state}" + Thread.sleep(1000); + } while (state.equals('PENDING') || state.equals('RUNNING')) + + def show_job_result = sql "SHOW MTMV JOB ON ${mvName}" + assertEquals 1, show_job_result.size() + + sql """ + DROP MATERIALIZED VIEW ${mvName} + """ }