diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index d9b7b9bd12..e47021f0cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -137,29 +137,17 @@ public class Alter { if (!stmt.isForMTMV() && stmt.getTableName() == null) { throw new DdlException("Drop materialized view without table name is unsupported : " + stmt.toSql()); } - TableName tableName = !stmt.isForMTMV() ? stmt.getTableName() : stmt.getMTMVName(); - Database db; - OlapTable olapTable; - if (stmt.isIfExists()) { - try { - String dbName = tableName.getDb(); - db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); - String name = tableName.getTbl(); - olapTable = (OlapTable) db.getTableOrMetaException(name, - !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW); - } catch (Exception e) { - LOG.info("db or table not exists, msg={}", e.getMessage()); - return; - } - } else { - String dbName = tableName.getDb(); - db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); - String name = tableName.getTbl(); - olapTable = (OlapTable) db.getTableOrMetaException(name, - !stmt.isForMTMV() ? TableType.OLAP : TableType.MATERIALIZED_VIEW); - } + // drop materialized view if (!stmt.isForMTMV()) { + TableName tableName = stmt.getTableName(); + + // check db + String dbName = tableName.getDb(); + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); + + String name = tableName.getTbl(); + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(name, TableType.OLAP); ((MaterializedViewHandler) materializedViewHandler).processDropMaterializedView(stmt, db, olapTable); } else { DropTableStmt dropTableStmt = new DropTableStmt(stmt.isIfExists(), stmt.getMTMVName(), false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java index f2a49ee335..93d7389dae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java @@ -37,7 +37,7 @@ public class MVRefreshInfo { } public MVRefreshInfo(RefreshMethod method, MVRefreshTriggerInfo trigger) { - this(false, method, trigger); + this(trigger == null, method, trigger); } public MVRefreshInfo(boolean neverRefresh, RefreshMethod method, MVRefreshTriggerInfo trigger) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java index 1f1763c29f..72a810eeee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java @@ -20,13 +20,19 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.MVRefreshInfo; import org.apache.doris.analysis.MVRefreshInfo.BuildMode; import org.apache.doris.catalog.OlapTableFactory.MaterializedViewParams; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; +import org.apache.doris.meta.MetaContext; import org.apache.doris.persist.gson.GsonUtils; import com.google.gson.annotations.SerializedName; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -42,7 +48,7 @@ public class MaterializedView extends OlapTable { private final ReentrantLock mvTaskLock = new ReentrantLock(true); - public boolean tryMvTaskLock() { + public boolean tryLockMVTask() { try { return mvTaskLock.tryLock(5, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -50,7 +56,7 @@ public class MaterializedView extends OlapTable { } } - public void mvTaskUnLock() { + public void unLockMVTask() { this.mvTaskLock.unlock(); } @@ -100,4 +106,20 @@ public class MaterializedView extends OlapTable { query = materializedView.query; buildMode = materializedView.buildMode; } + + public MaterializedView clone(String mvName) throws IOException { + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeConstants.meta_version); + metaContext.setThreadLocalInfo(); + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(256); + MaterializedView cloned = new MaterializedView(); + this.write(new DataOutputStream(out)); + cloned.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray()))); + cloned.setName(mvName); + return cloned; + } finally { + MetaContext.remove(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java index c7ad2479df..5af572304e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobFactory.java @@ -73,8 +73,8 @@ public class MTMVJobFactory { MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid); job.setTriggerMode(TriggerMode.PERIODICAL); job.setSchedule(genJobSchedule(materializedView)); - job.setDbName(dbName); - job.setMvName(materializedView.getName()); + job.setDBName(dbName); + job.setMVName(materializedView.getName()); job.setQuery(materializedView.getQuery()); job.setCreateTime(MTMVUtils.getNowTimeStamp()); return job; @@ -84,8 +84,8 @@ public class MTMVJobFactory { String uid = UUID.randomUUID().toString(); MTMVJob job = new MTMVJob(materializedView.getName() + "_" + uid); job.setTriggerMode(TriggerMode.ONCE); - job.setDbName(dbName); - job.setMvName(materializedView.getName()); + job.setDBName(dbName); + job.setMVName(materializedView.getName()); job.setQuery(materializedView.getQuery()); job.setCreateTime(MTMVUtils.getNowTimeStamp()); return job; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index ca92bd48da..fe77ea4015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -285,14 +285,14 @@ public class MTMVJobManager { if (dbName == null) { jobList.addAll(nameToJobMap.values()); } else { - jobList.addAll(nameToJobMap.values().stream().filter(u -> u.getDbName().equals(dbName)) + jobList.addAll(nameToJobMap.values().stream().filter(u -> u.getDBName().equals(dbName)) .collect(Collectors.toList())); } return jobList.stream().sorted().collect(Collectors.toList()); } public List showJobs(String dbName, String mvName) { - return showJobs(dbName).stream().filter(u -> u.getMvName().equals(mvName)).collect(Collectors.toList()); + return showJobs(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList()); } private boolean tryLock() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java index 1fb6128acd..7fddfd91a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java @@ -19,9 +19,8 @@ package org.apache.doris.mtmv; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; -import org.apache.doris.mtmv.MTMVUtils.TaskState; -import org.apache.doris.mtmv.metadata.ChangeMTMVTask; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.doris.qe.ConnectContext; @@ -98,7 +97,9 @@ public class MTMVTaskExecutor implements Comparable { MTMVTaskContext taskContext = new MTMVTaskContext(); taskContext.setQuery(task.getQuery()); ctx = new ConnectContext(); - ctx.setDatabase(job.getDbName()); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDBName())); + ctx.setDatabase(job.getDBName()); ctx.setQualifiedUser(task.getUser()); ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%")); ctx.getState().reset(); @@ -113,10 +114,7 @@ public class MTMVTaskExecutor implements Comparable { Map properties = Maps.newHashMap(); taskContext.setProperties(properties); - processor.process(taskContext); - ChangeMTMVTask changeTask = new ChangeMTMVTask(job.getId(), task, TaskState.RUNNING, task.getState()); - Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask); - return task.getState() == TaskState.SUCCESS; + return processor.process(taskContext); } public ConnectContext getCtx() { @@ -136,9 +134,9 @@ public class MTMVTaskExecutor implements Comparable { } else { task.setCreateTime(createTime); } - task.setMvName(job.getMvName()); + task.setMVName(job.getMVName()); task.setUser(job.getUser()); - task.setDbName(job.getDbName()); + task.setDBName(job.getDBName()); task.setQuery(job.getQuery()); task.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_task_expired); task.setRetryTimes(job.getRetryPolicy().getTimes()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java index b069a2f2af..419412b3c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java @@ -17,7 +17,9 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.Env; import org.apache.doris.mtmv.MTMVUtils.TaskState; +import org.apache.doris.mtmv.metadata.ChangeMTMVTask; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.logging.log4j.LogManager; @@ -26,9 +28,11 @@ import org.apache.logging.log4j.Logger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class MTMVTaskExecutorPool { private static final Logger LOG = LogManager.getLogger(MTMVTaskExecutorPool.class); + private static final long RETRY_INTERVAL = TimeUnit.SECONDS.toMillis(30); private final ExecutorService taskPool = Executors.newCachedThreadPool(); public void executeTask(MTMVTaskExecutor taskExecutor) { @@ -39,7 +43,7 @@ public class MTMVTaskExecutorPool { if (task == null) { return; } - if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILED) { + if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) { LOG.warn("Task {} is in final status {} ", task.getTaskId(), task.getState()); return; } @@ -53,20 +57,31 @@ public class MTMVTaskExecutorPool { isSuccess = taskExecutor.executeTask(); if (isSuccess) { task.setState(TaskState.SUCCESS); - } else { - task.setState(TaskState.FAILED); + break; } - } catch (Exception ex) { - LOG.warn("failed to execute task.", ex); - } finally { - task.setFinishTime(MTMVUtils.getNowTimeStamp()); + } catch (Throwable t) { + LOG.warn("Failed to execute the task, taskId=" + task.getTaskId() + ".", t); } retryTimes--; - } while (!isSuccess && retryTimes >= 0); + + if (retryTimes > 0) { + try { + Thread.sleep(RETRY_INTERVAL); + } catch (InterruptedException e) { + LOG.warn("Failed to sleep.", e); + break; + } + } + } while (!isSuccess && retryTimes > 0); if (!isSuccess) { - task.setState(TaskState.FAILED); + task.setState(TaskState.FAILURE); task.setErrorCode(-1); } + task.setFinishTime(MTMVUtils.getNowTimeStamp()); + + ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING, + task.getState()); + Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask); }); taskExecutor.setFuture(future); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index e3682ab5cd..6cb90c262d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -278,20 +278,20 @@ public class MTMVTaskManager { } else { for (Queue pTaskQueue : getPendingTaskMap().values()) { taskList.addAll( - pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDbName().equals(dbName)) + pTaskQueue.stream().map(MTMVTaskExecutor::getTask).filter(u -> u.getDBName().equals(dbName)) .collect(Collectors.toList())); } taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask) - .filter(u -> u.getDbName().equals(dbName)).collect(Collectors.toList())); + .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList())); taskList.addAll( - getAllHistory().stream().filter(u -> u.getDbName().equals(dbName)).collect(Collectors.toList())); + getAllHistory().stream().filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList())); } return taskList.stream().sorted().collect(Collectors.toList()); } public List showTasks(String dbName, String mvName) { - return showTasks(dbName).stream().filter(u -> u.getMvName().equals(mvName)).collect(Collectors.toList()); + return showTasks(dbName).stream().filter(u -> u.getMVName().equals(mvName)).collect(Collectors.toList()); } public MTMVTask getTask(String taskId) throws AnalysisException { @@ -307,7 +307,7 @@ public class MTMVTaskManager { } public void replayCreateJobTask(MTMVTask task) { - if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILED) { + if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) { if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) { return; } @@ -326,10 +326,10 @@ public class MTMVTaskManager { arrangeToPendingTask(taskExecutor); break; case RUNNING: - task.setState(TaskState.FAILED); + task.setState(TaskState.FAILURE); addHistory(task); break; - case FAILED: + case FAILURE: case SUCCESS: addHistory(task); break; @@ -360,16 +360,17 @@ public class MTMVTaskManager { status.setState(TaskState.RUNNING); getRunningTaskMap().put(jobId, pendingTask); } - } else if (toStatus == TaskState.FAILED) { + } else if (toStatus == TaskState.FAILURE) { status.setMessage(changeTask.getErrorMessage()); status.setErrorCode(changeTask.getErrorCode()); - status.setState(TaskState.FAILED); + status.setState(TaskState.FAILURE); addHistory(status); } if (taskQueue.size() == 0) { getPendingTaskMap().remove(jobId); } - } else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS || toStatus == TaskState.FAILED)) { + } else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS + || toStatus == TaskState.FAILURE)) { MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId); if (runningTask == null) { return; @@ -425,10 +426,10 @@ public class MTMVTaskManager { MTMVTaskExecutor taskExecutor = tasks.poll(); taskExecutor.getTask().setMessage("Fe abort the task"); taskExecutor.getTask().setErrorCode(-1); - taskExecutor.getTask().setState(TaskState.FAILED); + taskExecutor.getTask().setState(TaskState.FAILURE); addHistory(taskExecutor.getTask()); changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.PENDING, - TaskState.FAILED); + TaskState.FAILURE); } pendingIter.remove(); } @@ -437,12 +438,12 @@ public class MTMVTaskManager { MTMVTaskExecutor taskExecutor = getRunningTaskMap().get(runningIter.next()); taskExecutor.getTask().setMessage("Fe abort the task"); taskExecutor.getTask().setErrorCode(-1); - taskExecutor.getTask().setState(TaskState.FAILED); + taskExecutor.getTask().setState(TaskState.FAILURE); taskExecutor.getTask().setFinishTime(MTMVUtils.getNowTimeStamp()); runningIter.remove(); addHistory(taskExecutor.getTask()); changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING, - TaskState.FAILED); + TaskState.FAILURE); } } finally { unlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java index 5ba0bccaaf..3e133af6db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java @@ -17,31 +17,20 @@ package org.apache.doris.mtmv; -import org.apache.doris.analysis.SqlParser; -import org.apache.doris.analysis.SqlScanner; -import org.apache.doris.analysis.StatementBase; -import org.apache.doris.analysis.UserIdentity; -import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedView; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.SqlParserUtils; -import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.mtmv.MTMVUtils.TaskState; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.system.SystemInfoService; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.StringReader; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -49,209 +38,118 @@ import java.util.concurrent.atomic.AtomicLong; public class MTMVTaskProcessor { private static final Logger LOG = LogManager.getLogger(MTMVTaskProcessor.class); private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); - private ConnectContext context; - void process(MTMVTaskContext context) throws Exception { + boolean process(MTMVTaskContext context) throws Exception { String taskId = context.getTask().getTaskId(); long jobId = context.getJob().getId(); - LOG.info("run mtmv logic start, task_id:{}, jobid:{}", taskId, jobId); - String tableName = context.getTask().getMvName(); - String tmpTableName = genTmpTableName(tableName); - DatabaseIf db = Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME) - .getDbOrAnalysisException(context.getTask().getDbName()); - MaterializedView table = (MaterializedView) db.getTableOrAnalysisException(tableName); - if (!table.tryMvTaskLock()) { - LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, "get lock fail"); - return; + LOG.info("Start to run a MTMV task, taskId={}, jobId={}.", taskId, jobId); + + String mvName = context.getTask().getMVName(); + String temporaryMVName = getTemporaryMVName(mvName); + Database db = context.getCtx().getEnv().getInternalCatalog() + .getDbOrMetaException(context.getTask().getDBName()); + MaterializedView mv = (MaterializedView) db.getTableOrAnalysisException(mvName); + + if (!mv.tryLockMVTask()) { + LOG.warn("Failed to run the MTMV task, taskId={}, jobId={}, msg={}.", taskId, jobId, + "Failed to get the lock"); + context.getTask().setMessage("Failed to get the lock."); + return false; } try { - //step1 create tmp table - String tmpCreateTableStmt = genCreateTempMaterializedViewStmt(context, tableName, tmpTableName); - //check whther tmp table exists, if exists means run mtmv task failed before, so need to drop it first - if (db.isTableExist(tmpTableName)) { - String dropStml = genDropStml(context, tmpTableName); - ConnectContext dropResult = execSQL(context, dropStml); - LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml, - dropResult.getState(), dropResult.getState().getInfoMessage()); - } - ConnectContext createTempTableResult = execSQL(context, tmpCreateTableStmt); - LOG.info("exec tmp table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, tmpCreateTableStmt, - createTempTableResult.getState(), createTempTableResult.getState().getInfoMessage()); - if (createTempTableResult.getState().getStateType() != QueryState.MysqlStateType.OK) { - throw new Throwable("create tmp table failed, sql:" + tmpCreateTableStmt); + // Check whether the temporary materialized view exists, we should drop the obsolete materialized view first + // because it was created by previous tasks which failed to complete their work. + dropMaterializedView(context, temporaryMVName); + + // Step 1: create the temporary materialized view. + String createStatement = generateCreateStatement(mv.clone(temporaryMVName)); + if (!executeSQL(context, createStatement)) { + throw new RuntimeException( + "Failed to create the temporary materialized view, sql=" + createStatement + "."); } - //step2 insert data to tmp table - String insertStmt = genInsertIntoStmt(context, tmpTableName); - ConnectContext insertDataResult = execSQL(context, insertStmt); - LOG.info("exec insert into stmt, taskid:{}, stmt:{}, ret:{}, msg:{}, effected_row:{}", taskId, insertStmt, - insertDataResult.getState(), insertDataResult.getState().getInfoMessage(), - insertDataResult.getState().getAffectedRows()); - if (insertDataResult.getState().getStateType() != QueryState.MysqlStateType.OK) { - throw new Throwable("insert data failed, sql:" + insertStmt); + // Step 2: insert data to the temporary materialized view. + String insertSelectStatement = generateInsertSelectStmt(context, temporaryMVName); + if (!executeSQL(context, insertSelectStatement)) { + throw new RuntimeException( + "Failed to insert data to the temporary materialized view, sql=" + insertSelectStatement + "."); + } + String insertInfoMessage = context.getCtx().getState().getInfoMessage(); + + // Step 3: swap the temporary materialized view with the original materialized view. + String swapStatement = generateSwapStatement(mvName, temporaryMVName); + if (!executeSQL(context, swapStatement)) { + throw new RuntimeException( + "Failed to swap the temporary materialized view with the original materialized view, sql=" + + swapStatement + "."); } - //step3 swap tmp table with origin table - String swapStmt = genSwapStmt(context, tableName, tmpTableName); - ConnectContext swapResult = execSQL(context, swapStmt); - LOG.info("exec swap stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, swapStmt, swapResult.getState(), - swapResult.getState().getInfoMessage()); - if (swapResult.getState().getStateType() != QueryState.MysqlStateType.OK) { - throw new Throwable("swap table failed, sql:" + swapStmt); - } - //step4 update task info - context.getTask().setMessage(insertDataResult.getState().getInfoMessage()); - context.getTask().setState(TaskState.SUCCESS); - LOG.info("run mtmv task success, task_id:{},jobid:{}", taskId, jobId); - } catch (AnalysisException e) { - LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage()); - context.getTask().setMessage("run task failed, caused by " + e.getMessage()); - context.getTask().setState(TaskState.FAILED); + context.getTask().setMessage(insertInfoMessage); + LOG.info("Run MTMV task successfully, taskId={}, jobId={}.", taskId, jobId); + return true; } catch (Throwable e) { - LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}", taskId, jobId, e.getMessage()); - context.getTask().setMessage("run task failed, caused by " + e.getMessage()); - context.getTask().setState(TaskState.FAILED); + context.getTask().setMessage(e.getMessage()); + throw e; } finally { - context.getTask().setFinishTime(MTMVUtils.getNowTimeStamp()); - table.mvTaskUnLock(); - //double check - if (db.isTableExist(tmpTableName)) { - String dropStml = genDropStml(context, tmpTableName); - ConnectContext dropResult = execSQL(context, dropStml); - LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{}, msg:{}", taskId, dropStml, - dropResult.getState(), dropResult.getState().getInfoMessage()); - } + mv.unLockMVTask(); + dropMaterializedView(context, temporaryMVName); } } - private String genDropStml(MTMVTaskContext context, String tableName) { - String stmt = "DROP MATERIALIZED VIEW if exists " + tableName; - LOG.info("gen drop stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt); - return stmt; + private String getTemporaryMVName(String mvName) { + return FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX + mvName; } - private String genTmpTableName(String tableName) { - String tmpTableName = FeConstants.TEMP_MATERIZLIZE_DVIEW_PREFIX + tableName; - return tmpTableName; - } - - // ALTER TABLE t1 REPLACE WITH TABLE t1_mirror PROPERTIES('swap' = 'false'); - private String genSwapStmt(MTMVTaskContext context, String tableName, String tmpTableName) { - String stmt = "ALTER TABLE " + tableName + " REPLACE WITH TABLE " + tmpTableName - + " PROPERTIES('swap' = 'false');"; - LOG.info("gen swap stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt); - return stmt; - } - - private String genInsertIntoStmt(MTMVTaskContext context, String tmpTableName) { - String query = context.getQuery(); - String stmt = "insert into " + tmpTableName + " " + query; - stmt = stmt.replaceAll(SystemInfoService.DEFAULT_CLUSTER + ":", ""); - LOG.info("gen insert into stmt, taskid:{}, stmt:{}", context.getTask().getTaskId(), stmt); - return stmt; - } - - private String genCreateTempMaterializedViewStmt(MTMVTaskContext context, String tableName, String tmpTableName) { - try { - String dbName = context.getTask().getDbName(); - String originViewStmt = getCreateViewStmt(dbName, tableName); - String tmpViewStmt = convertCreateViewStmt(originViewStmt, tmpTableName); - LOG.info("gen tmp table stmt, taskid:{}, originstml:{}, stmt:{}", context.getTask().getTaskId(), - originViewStmt.replaceAll("\n", " "), tmpViewStmt); - return tmpViewStmt; - } catch (Throwable e) { - LOG.warn("fail to gen tmp table stmt, taskid:{}, msg:{}", context.getTask().getTaskId(), e.getMessage()); - return ""; + private void dropMaterializedView(MTMVTaskContext context, String mvName) { + String dropStatement = generateDropStatement(mvName); + if (!executeSQL(context, dropStatement)) { + throw new RuntimeException( + "Failed to drop the temporary materialized view, sql=" + dropStatement + "."); } } - //Generate temporary view table statement - private String convertCreateViewStmt(String stmt, String tmpTable) { - stmt = stmt.replace("`", ""); - String regex = "CREATE MATERIALIZED VIEW.*\n"; - String replacement = "CREATE MATERIALIZED VIEW " + tmpTable + "\n"; - stmt = stmt.replaceAll(regex, replacement); - // regex = "BUILD.*\n"; - // stmt = stmt.replaceAll(regex, " BUILD deferred never REFRESH \n"); - stmt = stmt.replaceAll("\n", " "); - stmt = stmt.replaceAll(SystemInfoService.DEFAULT_CLUSTER + ":", ""); - return stmt; + private String generateDropStatement(String mvName) { + return "DROP MATERIALIZED VIEW IF EXISTS " + mvName; } - // get origin table create stmt from env - private String getCreateViewStmt(String dbName, String tableName) throws AnalysisException { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - DatabaseIf db = ctx.getEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME) - .getDbOrAnalysisException(dbName); - TableIf table = db.getTableOrAnalysisException(tableName); - table.readLock(); - try { - List createTableStmt = Lists.newArrayList(); - Env.getDdlStmt(table, createTableStmt, null, null, false, true /* hide password */, -1L); - if (createTableStmt.isEmpty()) { - return ""; - } - return createTableStmt.get(0); - } catch (Throwable e) { - //throw new AnalysisException(e.getMessage()); - } finally { - table.readUnlock(); - } - return ""; - } - - private ConnectContext execSQL(MTMVTaskContext context, String originStmt) throws AnalysisException, DdlException { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); + private boolean executeSQL(MTMVTaskContext context, String sql) { + ConnectContext ctx = context.getCtx(); ctx.setThreadLocalInfo(); - String fullDbName = ClusterNamespace - .getFullName(SystemInfoService.DEFAULT_CLUSTER, context.getTask().getDbName()); - ctx.setDatabase(fullDbName); - ctx.setQualifiedUser("root"); - ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp("root", "%")); ctx.getState().reset(); - - List stmts = null; - StatementBase parsedStmt = null; - stmts = parse(ctx, originStmt); - parsedStmt = stmts.get(0); try { - StmtExecutor executor = new StmtExecutor(ctx, parsedStmt); + StmtExecutor executor = new StmtExecutor(ctx, sql); ctx.setExecutor(executor); executor.execute(); } catch (Throwable e) { - LOG.warn("execSQL failed, taskid:{}, msg:{}, stmt:{}", context.getTask().getTaskId(), e.getMessage(), - originStmt); + QueryState queryState = new QueryState(); + queryState.setError(ErrorCode.ERR_INTERNAL_ERROR, e.getMessage()); + ctx.setState(queryState); } finally { - LOG.debug("execSQL succ, taskid:{}, stmt:{}", context.getTask().getTaskId(), originStmt); + ConnectContext.remove(); } - return ctx; + + if (ctx.getState().getStateType() == MysqlStateType.OK) { + LOG.info("Execute SQL successfully, taskId={}, sql={}.", context.getTask().getTaskId(), sql); + } else { + LOG.warn("Failed to execute SQL, taskId={}, sql={}, errorCode={}, message={}.", + context.getTask().getTaskId(), + sql, ctx.getState().getErrorCode(), ctx.getState().getErrorMessage()); + } + return ctx.getState().getStateType() == MysqlStateType.OK; } - private List parse(ConnectContext ctx, String originStmt) throws AnalysisException, DdlException { - // Parse statement with parser generated by CUP&FLEX - SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); - SqlParser parser = new SqlParser(input); - try { - return SqlParserUtils.getMultiStmts(parser); - } catch (Error e) { - throw new AnalysisException("Please check your sql, we meet an error when parsing.", e); - } catch (AnalysisException | DdlException e) { - String errorMessage = parser.getErrorMsg(originStmt); - LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e); - if (errorMessage == null) { - throw e; - } else { - throw new AnalysisException(errorMessage, e); - } - } catch (ArrayStoreException e) { - throw new AnalysisException("Sql parser can't convert the result to array, please check your sql.", e); - } catch (Exception e) { - // TODO(lingbin): we catch 'Exception' to prevent unexpected error, - // should be removed this try-catch clause future. - throw new AnalysisException("Internal Error, maybe syntax error or this is a bug"); - } + private String generateCreateStatement(MaterializedView mv) { + List createStatement = Lists.newArrayList(); + Env.getDdlStmt(mv, createStatement, null, null, false, true /* hide password */, -1L); + return createStatement.stream().findFirst().orElse(""); + } + + private String generateInsertSelectStmt(MTMVTaskContext context, String temporaryMVName) { + return "INSERT INTO " + temporaryMVName + " " + context.getQuery(); + } + + // ALTER TABLE t1 REPLACE WITH TABLE t1_mirror PROPERTIES('swap' = 'false'); + private String generateSwapStatement(String mvName, String temporaryMVName) { + return "ALTER TABLE " + mvName + " REPLACE WITH TABLE " + temporaryMVName + " PROPERTIES('swap' = 'false')"; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java index 027aadfc1b..874eb0510c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtils.java @@ -60,7 +60,7 @@ public class MTMVUtils { } public enum TaskState { - PENDING, RUNNING, FAILED, SUCCESS, + PENDING, RUNNING, FAILURE, SUCCESS, } enum TaskSubmitStatus { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java index 6ed264ce51..a8dd62b4a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVJob.java @@ -150,19 +150,19 @@ public class MTMVJob implements Writable, Comparable { this.createTime = createTime; } - public String getDbName() { + public String getDBName() { return dbName; } - public void setDbName(String dbName) { + public void setDBName(String dbName) { this.dbName = dbName; } - public String getMvName() { + public String getMVName() { return mvName; } - public void setMvName(String mvName) { + public void setMVName(String mvName) { this.mvName = mvName; } @@ -287,8 +287,8 @@ public class MTMVJob implements Writable, Comparable { list.add(getName()); list.add(getTriggerMode().toString()); list.add(getSchedule() == null ? "NULL" : getSchedule().toString()); - list.add(getDbName()); - list.add(getMvName()); + list.add(getDBName()); + list.add(getMVName()); list.add(getQuery().length() > 10240 ? getQuery().substring(0, 10240) : getQuery()); list.add(getUser()); list.add(getRetryPolicy().toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java index 98b0566643..603e98a560 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVTask.java @@ -117,19 +117,19 @@ public class MTMVTask implements Writable, Comparable { this.state = state; } - public String getDbName() { + public String getDBName() { return dbName; } - public void setDbName(String dbName) { + public void setDBName(String dbName) { this.dbName = dbName; } - public String getMvName() { + public String getMVName() { return mvName; } - public void setMvName(String mvName) { + public void setMVName(String mvName) { this.mvName = mvName; } @@ -222,8 +222,8 @@ public class MTMVTask implements Writable, Comparable { List list = Lists.newArrayList(); list.add(getTaskId()); list.add(getJobName()); - list.add(getDbName()); - list.add(getMvName()); + list.add(getDBName()); + list.add(getMVName()); list.add(getQuery().length() > 10240 ? getQuery().substring(0, 10240) : getQuery()); list.add(getUser()); list.add(Integer.toString(getPriority())); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java index 9985669449..c0545000a5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java @@ -125,7 +125,7 @@ public class MTMVJobManagerTest extends TestWithFeService { // index 7: RetryTimes Assertions.assertEquals("0", taskRow.get(7)); // index 8: State - Assertions.assertEquals("FAILED", taskRow.get(8)); + Assertions.assertEquals("FAILURE", taskRow.get(8)); // index 9: Message Assertions.assertEquals("", taskRow.get(9)); // index 10: ErrorCode diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java index 9756dd5db2..bee3c2e8bd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskExecutorTest.java @@ -38,7 +38,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService { executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis()); pool.executeTask(executor); executor.getFuture().get(); - Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState()); + Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState()); } @@ -51,7 +51,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService { executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis()); pool.executeTask(executor); executor.getFuture().get(); - Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState()); + Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState()); //Assertions.assertEquals("java.lang.Exception: my define error 1", executor.getTask().getMessage()); } @@ -67,7 +67,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService { executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis()); pool.executeTask(executor); executor.getFuture().get(); - Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState()); + Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState()); } @Test @@ -82,7 +82,7 @@ public class MTMVTaskExecutorTest extends TestWithFeService { executor.initTask(UUID.randomUUID().toString(), System.currentTimeMillis()); pool.executeTask(executor); executor.getFuture().get(); - Assertions.assertEquals(TaskState.FAILED, executor.getTask().getState()); + Assertions.assertEquals(TaskState.FAILURE, executor.getTask().getState()); //Assertions.assertEquals("java.lang.Exception: my define error 4", executor.getTask().getMessage()); } @@ -94,11 +94,12 @@ public class MTMVTaskExecutorTest extends TestWithFeService { this.times = times; } - void process(MTMVTaskContext context) throws Exception { + boolean process(MTMVTaskContext context) throws Exception { if (runTimes < times) { runTimes++; throw new Exception("my define error " + runTimes); } + return true; } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java index 76a96f45f3..5a81173715 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilsTest.java @@ -40,17 +40,17 @@ public class MTMVUtilsTest { public static MTMVJob createDummyJob() { MTMVJob job = new MTMVJob("dummy"); - job.setDbName(dbName); - job.setMvName(MV_NAME); + job.setDBName(dbName); + job.setMVName(MV_NAME); return job; } public static MTMVJob createOnceJob() { MTMVJob job = new MTMVJob(""); job.setTriggerMode(TriggerMode.ONCE); - job.setDbName(dbName); + job.setDBName(dbName); job.setName(O_JOB); - job.setMvName(MV_NAME); + job.setMVName(MV_NAME); return job; } @@ -59,9 +59,9 @@ public class MTMVUtilsTest { JobSchedule jobSchedule = new JobSchedule(System.currentTimeMillis() / 1000, 1, TimeUnit.SECONDS); job.setSchedule(jobSchedule); job.setTriggerMode(TriggerMode.PERIODICAL); - job.setDbName(dbName); + job.setDBName(dbName); job.setName(S_JOB); - job.setMvName(MV_NAME); + job.setMVName(MV_NAME); return job; } diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy index 9d50dd3286..6d3966614e 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy @@ -58,10 +58,7 @@ suite("test_create_mtmv") { """ sql """ CREATE MATERIALIZED VIEW ${mvName} - BUILD IMMEDIATE - REFRESH COMPLETE - START WITH "2022-10-27 19:35:00" - NEXT 60 second + BUILD IMMEDIATE REFRESH COMPLETE KEY(username) DISTRIBUTED BY HASH (username) buckets 1 PROPERTIES ('replication_num' = '1') @@ -72,15 +69,16 @@ suite("test_create_mtmv") { def show_task_meta = sql_meta "SHOW MTMV TASK FROM ${dbName}" def index = show_task_meta.indexOf(['State', 'CHAR']) def query = "SHOW MTMV TASK FROM ${dbName}" + def show_task_result def state do { - def show_task_result = sql "${query}" + show_task_result = sql "${query}" state = show_task_result.last().get(index) println "The state of ${query} is ${state}" Thread.sleep(1000); } while (state.equals('PENDING') || state.equals('RUNNING')) - assertEquals('SUCCESS', state) + assertEquals 'SUCCESS', state, show_task_result.last().toString() order_qt_select "SELECT * FROM ${mvName}" }