diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 7e9526d8da..51b4d4073c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1812,6 +1812,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long scheduler_mtmv_task_expired = 24 * 60 * 60L; // 1day + @ConfField(mutable = true, masterOnly = true) + public static boolean keep_scheduler_mtmv_task_when_job_deleted = false; + /** * The candidate of the backend node for federation query such as hive table and es table query. * If the backend of computation role is less than this value, it will acquire some mix backend. diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 5bb3323d82..3ea07a3579 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt; import org.apache.doris.analysis.DropMaterializedViewStmt; import org.apache.doris.analysis.DropPartitionClause; import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.MVRefreshInfo.RefreshMethod; import org.apache.doris.analysis.ModifyColumnCommentClause; import org.apache.doris.analysis.ModifyDistributionClause; import org.apache.doris.analysis.ModifyEngineClause; @@ -65,6 +66,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.ModifyCommentOperationLog; @@ -157,7 +159,15 @@ public class Alter { } public void processRefreshMaterializedView(RefreshMaterializedViewStmt stmt) throws DdlException { - throw new DdlException("Refresh materialized view is not implemented: " + stmt.toSql()); + if (stmt.getRefreshMethod() != RefreshMethod.COMPLETE) { + throw new DdlException("Now only support REFRESH COMPLETE."); + } + String db = stmt.getMvName().getDb(); + String tbl = stmt.getMvName().getTbl(); + TaskSubmitStatus status = Env.getCurrentEnv().getMTMVJobManager().refreshMTMVTask(db, tbl); + if (status != TaskSubmitStatus.SUBMITTED) { + throw new DdlException("Refresh MaterializedView with " + status.toString()); + } } private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, List alterClauses, @@ -497,6 +507,8 @@ public class Alter { } public void processAlterMaterializedView(AlterMaterializedViewStmt stmt) throws UserException { + TableName tbl = stmt.getTable(); + Env.getCurrentEnv().getInternalCatalog().getDb(tbl.getDb()); throw new DdlException("ALTER MATERIALIZED VIEW is not implemented: " + stmt.toSql()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java index a4331d9f5b..a16860cfac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterMaterializedViewStmt.java @@ -33,6 +33,14 @@ public class AlterMaterializedViewStmt extends DdlStmt { this.info = info; } + public TableName getTable() { + return mvName; + } + + public MVRefreshInfo getRefreshInfo() { + return info; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException { mvName.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index fd9f55ce10..13941c49a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -239,6 +239,16 @@ public class MTMVJobManager { return taskManager.killTask(job.getId(), clearPending); } + public MTMVUtils.TaskSubmitStatus refreshMTMVTask(String dbName, String mvName) throws DdlException { + for (String jobName : nameToJobMap.keySet()) { + MTMVJob job = nameToJobMap.get(jobName); + if (job.getMVName().equals(mvName) && job.getDBName().equals(dbName)) { + return submitJobTask(jobName); + } + } + throw new DdlException("No job find for the MaterializedView " + dbName + "." + mvName + " ."); + } + public MTMVUtils.TaskSubmitStatus submitJobTask(String jobName) { return submitJobTask(jobName, new MTMVTaskExecuteParams()); } @@ -292,6 +302,9 @@ public class MTMVJobManager { periodFutureMap.remove(job.getId()); } killJobTask(job.getName(), true); + if (!Config.keep_scheduler_mtmv_task_when_job_deleted) { + taskManager.clearTasksByJobName(job.getName(), isReplay); + } idToJobMap.remove(job.getId()); nameToJobMap.remove(job.getName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index 55508b18c6..79f15ae4fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -399,6 +399,25 @@ public class MTMVTaskManager { } } + public void clearTasksByJobName(String jobName, boolean isReplay) { + List clearTasks = Lists.newArrayList(); + + if (!tryLock()) { + return; + } + try { + Deque taskHistory = getAllHistory(); + for (MTMVTask task : taskHistory) { + if (task.getJobName().equals(jobName)) { + clearTasks.add(task.getTaskId()); + } + } + } finally { + unlock(); + } + dropTasks(clearTasks, isReplay); + } + public void removeExpiredTasks() { long currentTime = MTMVUtils.getNowTimeStamp(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java index d421d2f50a..ffaf5af888 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java @@ -63,7 +63,7 @@ public class MTMVUtils { PENDING, RUNNING, FAILURE, SUCCESS, } - enum TaskSubmitStatus { + public enum TaskSubmitStatus { SUBMITTED, REJECTED, FAILED } diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy index 61534f7758..c506519912 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy @@ -66,10 +66,12 @@ suite("test_create_mtmv") { def index = show_task_meta.indexOf(['State', 'CHAR']) def query = "SHOW MTMV TASK ON ${mvName}" def show_task_result - def state + def state = "PENDING" do { show_task_result = sql "${query}" - state = show_task_result.last().get(index) + if (!show_task_result.isEmpty()) { + state = show_task_result.last().get(index) + } println "The state of ${query} is ${state}" Thread.sleep(1000); } while (state.equals('PENDING') || state.equals('RUNNING')) @@ -93,9 +95,12 @@ suite("test_create_mtmv") { SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; """ // wait task to be finished to avoid task leak in suite. + state = "PENDING" do { show_task_result = sql "${query}" - state = show_task_result.last().get(index) + if (!show_task_result.isEmpty()) { + state = show_task_result.last().get(index) + } println "The state of ${query} is ${state}" Thread.sleep(1000); } while (state.equals('PENDING') || state.equals('RUNNING')) @@ -103,6 +108,23 @@ suite("test_create_mtmv") { def show_job_result = sql "SHOW MTMV JOB ON ${mvName}" assertEquals 1, show_job_result.size() + // test REFRESH make sure only define one mv and already run a task. + sql """ + REFRESH MATERIALIZED VIEW ${mvName} COMPLETE + """ + state = "PENDING" + do { + show_task_result = sql "${query}" + if (!show_task_result.isEmpty()) { + state = show_task_result.last().get(index) + } + println "The state of ${query} is ${state}" + Thread.sleep(1000); + } while (state.equals('PENDING') || state.equals('RUNNING')) + + assertEquals 'SUCCESS', state, show_task_result.last().toString() + assertEquals 2, show_task_result.size() + sql """ DROP MATERIALIZED VIEW ${mvName} """