From 5985d216f33b1aa516d2a319165c229198e528fd Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Sun, 31 Dec 2023 23:10:30 +0800 Subject: [PATCH] [feature](mtmv)support cancel mtmv task command (#29252) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `CANCEL MATERIALIZED VIEW TASK taskId on mvName` - CANCEL MATERIALIZED VIEW TASK, tasks("type"="mv") and jobs("type"="mv") support check auth use priv of mv - tasks and jobs add column mvName and mvDbName,you can use `select * from tasks("type"="mv") where MvName="xxx"` get all tasks of mv - fix `desc mv all` error - fix p0 The task sequence is incorrect --- .../org/apache/doris/nereids/DorisParser.g4 | 1 + .../apache/doris/analysis/DescribeStmt.java | 4 +- .../doris/job/extensions/mtmv/MTMVJob.java | 43 ++++++++- .../doris/job/extensions/mtmv/MTMVTask.java | 38 +++++++- .../job/extensions/mtmv/MTMVTaskContext.java | 9 ++ .../apache/doris/mtmv/MTMVHookService.java | 10 ++ .../org/apache/doris/mtmv/MTMVJobManager.java | 8 +- .../doris/mtmv/MTMVRelationManager.java | 6 ++ .../org/apache/doris/mtmv/MTMVService.java | 9 ++ .../nereids/parser/LogicalPlanBuilder.java | 10 ++ .../doris/nereids/trees/plans/PlanType.java | 1 + .../plans/commands/CancelMTMVTaskCommand.java | 50 ++++++++++ .../commands/info/CancelMTMVTaskInfo.java | 91 +++++++++++++++++++ .../plans/commands/info/PauseMTMVInfo.java | 7 ++ .../plans/commands/info/RefreshMTMVInfo.java | 9 ++ .../plans/commands/info/ResumeMTMVInfo.java | 7 ++ .../trees/plans/visitor/CommandVisitor.java | 5 + .../JobsTableValuedFunction.java | 10 +- .../tablefunction/MetadataGenerator.java | 17 ++++ .../TasksTableValuedFunction.java | 10 +- .../doris/regression/suite/Suite.groovy | 2 +- .../suites/mtmv_p0/test_build_mtmv.groovy | 4 + .../suites/mtmv_p0/test_task_mtmv.groovy | 62 +++++++++++++ 23 files changed, 396 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelMTMVTaskCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CancelMTMVTaskInfo.java create mode 100644 regression-test/suites/mtmv_p0/test_task_mtmv.groovy diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 0aab15c884..880986a6ce 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -96,6 +96,7 @@ statement | DROP MATERIALIZED VIEW (IF EXISTS)? mvName=multipartIdentifier #dropMTMV | PAUSE MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #pauseMTMV | RESUME MATERIALIZED VIEW JOB ON mvName=multipartIdentifier #resumeMTMV + | CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON mvName=multipartIdentifier #cancelMTMVTask | ALTER TABLE table=relation ADD CONSTRAINT constraintName=errorCapturingIdentifier constraint #addConstraint diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java index cc5e8adb26..40c576b1c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -187,7 +187,7 @@ public class DescribeStmt extends ShowStmt { // show base table schema only String procString = "/catalogs/" + catalog.getId() + "/" + db.getId() + "/" + table.getId() + "/" + TableProcDir.INDEX_SCHEMA + "/"; - if (table.getType() == TableType.OLAP) { + if (table instanceof OlapTable) { procString += ((OlapTable) table).getBaseIndexId(); } else { if (partitionNames != null) { @@ -212,7 +212,7 @@ public class DescribeStmt extends ShowStmt { } } else { Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName() + " ALL"); - if (table.getType() == TableType.OLAP) { + if (table instanceof OlapTable) { isOlapTable = true; OlapTable olapTable = (OlapTable) table; Set bfColumns = olapTable.getCopiedBfColumns(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index 669a2806a5..678155170b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -17,6 +17,7 @@ package org.apache.doris.job.extensions.mtmv; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -25,12 +26,15 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.thrift.TCell; @@ -67,11 +71,14 @@ public class MTMVJob extends AbstractJob { public static final ImmutableList SCHEMA = ImmutableList.of( new Column("Id", ScalarType.createStringType()), new Column("Name", ScalarType.createStringType()), + new Column("MvId", ScalarType.createStringType()), + new Column("MvName", ScalarType.createStringType()), + new Column("MvDatabaseId", ScalarType.createStringType()), + new Column("MvDatabaseName", ScalarType.createStringType()), new Column("ExecuteType", ScalarType.createStringType()), new Column("RecurringStrategy", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), - new Column("CreateTime", ScalarType.createStringType()), - new Column("Comment", ScalarType.createStringType())); + new Column("CreateTime", ScalarType.createStringType())); public static final ImmutableMap COLUMN_TO_INDEX; @@ -118,6 +125,7 @@ public class MTMVJob extends AbstractJob { @Override public List createTasks(TaskType taskType, MTMVTaskContext taskContext) { + LOG.info("begin create mtmv task, jobId: {}, taskContext: {}", super.getJobId(), taskContext); if (taskContext == null) { taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); } @@ -126,6 +134,7 @@ public class MTMVJob extends AbstractJob { ArrayList tasks = new ArrayList<>(); tasks.add(task); super.initTasks(tasks, taskType); + LOG.info("finish create mtmv task, task: {}", task); return tasks; } @@ -144,6 +153,8 @@ public class MTMVJob extends AbstractJob { List runningTasks = getRunningTasks(); for (MTMVTask task : runningTasks) { if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) { + LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}", + task); return false; } } @@ -195,14 +206,40 @@ public class MTMVJob extends AbstractJob { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + String dbName = ""; + String mvName = ""; + try { + MTMV mtmv = getMTMV(); + dbName = mtmv.getQualifiedDbName(); + mvName = mtmv.getName(); + } catch (UserException e) { + LOG.warn("can not find mv", e); + } + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId))); + trow.addToColumnValue(new TCell().setStringVal(mvName)); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId))); + trow.addToColumnValue(new TCell().setStringVal(dbName)); trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().getExecuteType().name())); trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().convertRecurringStrategyToString())); trow.addToColumnValue(new TCell().setStringVal(super.getJobStatus().name())); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal(super.getComment())); return trow; } + public boolean hasPriv(UserIdentity userIdentity, PrivPredicate wanted) { + MTMV mtmv; + try { + mtmv = getMTMV(); + } catch (UserException e) { + LOG.warn("can not find mv", e); + return false; + } + return Env.getCurrentEnv().getAccessManager() + .checkTblPriv(userIdentity, InternalCatalog.INTERNAL_CATALOG_NAME, + mtmv.getQualifiedDbName(), mtmv.getName(), + wanted); + } + private MTMV getMTMV() throws DdlException, MetaNotFoundException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 7c11f0549a..cbfea9e713 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -25,7 +25,9 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; @@ -73,7 +75,9 @@ public class MTMVTask extends AbstractTask { new Column("JobId", ScalarType.createStringType()), new Column("JobName", ScalarType.createStringType()), new Column("MvId", ScalarType.createStringType()), + new Column("MvName", ScalarType.createStringType()), new Column("MvDatabaseId", ScalarType.createStringType()), + new Column("MvDatabaseName", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), new Column("CreateTime", ScalarType.createStringType()), @@ -213,8 +217,7 @@ public class MTMVTask extends AbstractTask { LOG.info("mtmv task before, taskId: {}", super.getTaskId()); super.before(); try { - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); - mtmv = (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); + mtmv = getMTMV(); if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); MTMVUtil.alignMvPartition(mtmv, relatedTable); @@ -225,6 +228,11 @@ public class MTMVTask extends AbstractTask { } } + private MTMV getMTMV() throws DdlException, MetaNotFoundException { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); + return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); + } + @Override public void runTask() throws JobException { LOG.info("mtmv task runTask, taskId: {}", super.getTaskId()); @@ -246,8 +254,19 @@ public class MTMVTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + String dbName = ""; + String mvName = ""; + try { + MTMV mtmv = getMTMV(); + dbName = mtmv.getQualifiedDbName(); + mvName = mtmv.getName(); + } catch (UserException e) { + LOG.warn("can not find mv", e); + } trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId))); + trow.addToColumnValue(new TCell().setStringVal(mvName)); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId))); + trow.addToColumnValue(new TCell().setStringVal(dbName)); trow.addToColumnValue(new TCell() .setStringVal(super.getStatus() == null ? FeConstants.null_string : super.getStatus().toString())); trow.addToColumnValue(new TCell().setStringVal(super.getErrMsg())); @@ -355,4 +374,19 @@ public class MTMVTask extends AbstractTask { public MTMVTaskContext getTaskContext() { return taskContext; } + + @Override + public String toString() { + return "MTMVTask{" + + "dbId=" + dbId + + ", mtmvId=" + mtmvId + + ", taskContext=" + taskContext + + ", needRefreshPartitions=" + needRefreshPartitions + + ", completedPartitions=" + completedPartitions + + ", refreshMode=" + refreshMode + + ", mtmv=" + mtmv + + ", relation=" + relation + + ", executor=" + executor + + "} " + super.toString(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java index adb6fd5ef7..1c53f5dd30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java @@ -55,4 +55,13 @@ public class MTMVTaskContext { public boolean isComplete() { return isComplete; } + + @Override + public String toString() { + return "MTMVTaskContext{" + + "triggerMode=" + triggerMode + + ", partitions=" + partitions + + ", isComplete=" + isComplete + + '}'; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java index 41bc506f5c..d9ab998458 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java @@ -23,6 +23,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; @@ -107,13 +108,22 @@ public interface MTMVHookService { /** * Triggered when pause mtmv + * * @param info */ void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException; /** * Triggered when resume mtmv + * * @param info */ void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException; + + /** + * cancel mtmv task + * + * @param info + */ + void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNotFoundException, JobException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index 8cf225c59c..f53a7b6086 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -37,6 +37,7 @@ import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; import org.apache.doris.job.extensions.mtmv.MTMVTaskContext; import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger; +import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; @@ -66,7 +67,6 @@ public class MTMVJobManager implements MTMVHookService { MTMVJob job = new MTMVJob(mtmv.getDatabase().getId(), mtmv.getId()); job.setJobId(Env.getCurrentEnv().getNextId()); job.setJobName(mtmv.getJobInfo().getJobName()); - job.setComment(mtmv.getName()); job.setCreateUser(ConnectContext.get().getCurrentUserIdentity()); job.setJobStatus(JobStatus.RUNNING); job.setJobConfig(getJobConfig(mtmv)); @@ -204,6 +204,12 @@ public class MTMVJobManager implements MTMVHookService { Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.RUNNING); } + @Override + public void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNotFoundException, JobException { + MTMVJob job = getJobByTableNameInfo(info.getMvName()); + job.cancelTaskById(info.getTaskId()); + } + private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb()); MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index 7be43771e4..35bf777acb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -27,6 +27,7 @@ import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; @@ -200,6 +201,11 @@ public class MTMVRelationManager implements MTMVHookService { } + @Override + public void cancelMTMVTask(CancelMTMVTaskInfo info) { + + } + private void processBaseTableChange(Table table, String msgPrefix) { BaseTableInfo baseTableInfo = new BaseTableInfo(table); Set mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 3ab05c92a4..5b7c188823 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -26,6 +26,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo; @@ -159,4 +160,12 @@ public class MTMVService { mtmvHookService.resumeMTMV(info); } } + + public void cancelMTMVTask(CancelMTMVTaskInfo info) throws MetaNotFoundException, DdlException, JobException { + Objects.requireNonNull(info); + LOG.info("cancelMTMVTask, CancelMTMVTaskInfo: {}", info); + for (MTMVHookService mtmvHookService : hooks.values()) { + mtmvHookService.cancelMTMVTask(info); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 7189051662..745ae17946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -55,6 +55,7 @@ import org.apache.doris.nereids.DorisParser.BracketJoinHintContext; import org.apache.doris.nereids.DorisParser.BracketRelationHintContext; import org.apache.doris.nereids.DorisParser.BuildModeContext; import org.apache.doris.nereids.DorisParser.CallProcedureContext; +import org.apache.doris.nereids.DorisParser.CancelMTMVTaskContext; import org.apache.doris.nereids.DorisParser.CollateContext; import org.apache.doris.nereids.DorisParser.ColumnDefContext; import org.apache.doris.nereids.DorisParser.ColumnDefsContext; @@ -326,6 +327,7 @@ import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.CallCommand; +import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.Constraint; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; @@ -351,6 +353,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo; import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRenameInfo; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; +import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; @@ -712,6 +715,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { return new ResumeMTMVCommand(new ResumeMTMVInfo(new TableNameInfo(nameParts))); } + @Override + public CancelMTMVTaskCommand visitCancelMTMVTask(CancelMTMVTaskContext ctx) { + List nameParts = visitMultipartIdentifier(ctx.mvName); + long taskId = Long.parseLong(ctx.taskId.getText()); + return new CancelMTMVTaskCommand(new CancelMTMVTaskInfo(new TableNameInfo(nameParts), taskId)); + } + @Override public AlterMTMVCommand visitAlterMTMV(AlterMTMVContext ctx) { List nameParts = visitMultipartIdentifier(ctx.mvName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 3db4a43834..3fe662b7ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -135,5 +135,6 @@ public enum PlanType { DROP_MTMV_COMMAND, PAUSE_MTMV_COMMAND, RESUME_MTMV_COMMAND, + CANCEL_MTMV_TASK_COMMAND, CALL_COMMAND } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelMTMVTaskCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelMTMVTaskCommand.java new file mode 100644 index 0000000000..e5c075da7e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelMTMVTaskCommand.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import java.util.Objects; + +/** + * cancel mtmv task + */ +public class CancelMTMVTaskCommand extends Command implements ForwardWithSync, NotAllowFallback { + private final CancelMTMVTaskInfo cancelMTMVTaskInfo; + + public CancelMTMVTaskCommand(CancelMTMVTaskInfo cancelMTMVTaskInfo) { + super(PlanType.CANCEL_MTMV_TASK_COMMAND); + this.cancelMTMVTaskInfo = Objects.requireNonNull(cancelMTMVTaskInfo, "require cancelMTMVTaskInfo object"); + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + cancelMTMVTaskInfo.analyze(ctx); + Env.getCurrentEnv().getMtmvService().cancelMTMVTask(cancelMTMVTaskInfo); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitCancelMTMVTaskCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CancelMTMVTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CancelMTMVTaskInfo.java new file mode 100644 index 0000000000..257769fe35 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CancelMTMVTaskInfo.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; + +import java.util.Objects; + +/** + * cancel mtmv task info + */ +public class CancelMTMVTaskInfo { + private final TableNameInfo mvName; + private final long taskId; + + public CancelMTMVTaskInfo(TableNameInfo mvName, long taskId) { + this.mvName = Objects.requireNonNull(mvName, "require mvName object"); + this.taskId = taskId; + } + + /** + * analyze pause info + * + * @param ctx ConnectContext + */ + public void analyze(ConnectContext ctx) { + mvName.analyze(ctx); + if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(), + mvName.getTbl(), PrivPredicate.CREATE)) { + String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + mvName.getDb() + ": " + mvName.getTbl()); + throw new AnalysisException(message); + } + try { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); + db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); + } catch (MetaNotFoundException | DdlException e) { + throw new AnalysisException(e.getMessage()); + } + } + + /** + * getMvName + * + * @return TableNameInfo + */ + public TableNameInfo getMvName() { + return mvName; + } + + /** + * get taskId + * + * @return taskId + */ + public long getTaskId() { + return taskId; + } + + @Override + public String toString() { + return "CancelMTMVTaskInfo{" + + "mvName=" + mvName + + ", taskId=" + taskId + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java index 15bbcff6f7..c744e66670 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/PauseMTMVInfo.java @@ -69,4 +69,11 @@ public class PauseMTMVInfo { public TableNameInfo getMvName() { return mvName; } + + @Override + public String toString() { + return "PauseMTMVInfo{" + + "mvName=" + mvName + + '}'; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java index e6a4368a85..315b30ae87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java @@ -100,4 +100,13 @@ public class RefreshMTMVInfo { public boolean isComplete() { return isComplete; } + + @Override + public String toString() { + return "RefreshMTMVInfo{" + + "mvName=" + mvName + + ", partitions=" + partitions + + ", isComplete=" + isComplete + + '}'; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java index a7f23bedfc..541d5c878e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ResumeMTMVInfo.java @@ -69,4 +69,11 @@ public class ResumeMTMVInfo { public TableNameInfo getMvName() { return mvName; } + + @Override + public String toString() { + return "ResumeMTMVInfo{" + + "mvName=" + mvName + + '}'; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index a48a8aaf98..297e80114d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -21,6 +21,7 @@ import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.CallCommand; +import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; @@ -123,6 +124,10 @@ public interface CommandVisitor { return visitCommand(resumeMTMVCommand, context); } + default R visitCancelMTMVTaskCommand(CancelMTMVTaskCommand cancelMTMVTaskCommand, C context) { + return visitCommand(cancelMTMVTaskCommand, context); + } + default R visitCallCommand(CallCommand callCommand, C context) { return visitCommand(callCommand, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java index 2a3a698eb9..260a483d18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java @@ -17,11 +17,12 @@ package org.apache.doris.tablefunction; -import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.extensions.mtmv.MTMVJob; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TJobsMetadataParams; @@ -64,9 +65,10 @@ public class JobsTableValuedFunction extends MetadataTableValuedFunction { throw new AnalysisException("Invalid job metadata query"); } this.jobType = jobType; - UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); - if (!userIdentity.isRootUser()) { - throw new AnalysisException("only root user can operate"); + if (jobType != JobType.MV) { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + throw new AnalysisException("only ADMIN priv can operate"); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 9c773b37dc..5b4069ae8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -31,6 +31,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.common.JobType; +import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -586,12 +587,20 @@ public class MetadataGenerator { TJobsMetadataParams jobsMetadataParams = params.getJobsMetadataParams(); String type = jobsMetadataParams.getType(); JobType jobType = JobType.valueOf(type); + TUserIdentity currentUserIdent = jobsMetadataParams.getCurrentUserIdent(); + UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent); List dataBatch = Lists.newArrayList(); TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); List jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType); for (org.apache.doris.job.base.AbstractJob job : jobList) { + if (job instanceof MTMVJob) { + MTMVJob mtmvJob = (MTMVJob) job; + if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) { + continue; + } + } dataBatch.add(job.getTvfInfo()); } result.setDataBatch(dataBatch); @@ -607,12 +616,20 @@ public class MetadataGenerator { TTasksMetadataParams tasksMetadataParams = params.getTasksMetadataParams(); String type = tasksMetadataParams.getType(); JobType jobType = JobType.valueOf(type); + TUserIdentity currentUserIdent = tasksMetadataParams.getCurrentUserIdent(); + UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent); List dataBatch = Lists.newArrayList(); TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); List jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType); for (org.apache.doris.job.base.AbstractJob job : jobList) { + if (job instanceof MTMVJob) { + MTMVJob mtmvJob = (MTMVJob) job; + if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) { + continue; + } + } List tasks = job.queryAllTasks(); for (AbstractTask task : tasks) { TRow tvfInfo = task.getTvfInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java index c2df1cc000..c7015f57ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java @@ -17,11 +17,12 @@ package org.apache.doris.tablefunction; -import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.job.common.JobType; import org.apache.doris.job.extensions.insert.InsertTask; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TMetaScanRange; @@ -64,9 +65,10 @@ public class TasksTableValuedFunction extends MetadataTableValuedFunction { throw new AnalysisException("Invalid task metadata query"); } this.jobType = jobType; - UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); - if (!userIdentity.isRootUser()) { - throw new AnalysisException("only root user can operate"); + if (jobType != JobType.MV) { + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + throw new AnalysisException("only ADMIN priv can operate"); + } } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 942f44f8d8..cd7a01b8b9 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -853,7 +853,7 @@ class Suite implements GroovyInterceptable { void waitingMTMVTaskFinished(String jobName) { Thread.sleep(2000); - String showTasks = "select TaskId,JobId,JobName,MvId,Status from tasks('type'='mv') where JobName = '${jobName}'" + String showTasks = "select TaskId,JobId,JobName,MvId,Status from tasks('type'='mv') where JobName = '${jobName}' order by CreateTime ASC" String status = "NULL" List> result long startTime = System.currentTimeMillis() diff --git a/regression-test/suites/mtmv_p0/test_build_mtmv.groovy b/regression-test/suites/mtmv_p0/test_build_mtmv.groovy index eb4560f8b0..05bc5e5322 100644 --- a/regression-test/suites/mtmv_p0/test_build_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_build_mtmv.groovy @@ -88,6 +88,10 @@ suite("test_build_mtmv") { logger.info("showCreateTableResult: " + showCreateTableResult.toString()) assertTrue(showCreateTableResult.toString().contains("CREATE MATERIALIZED VIEW `multi_mv_test_create_mtmv` (\n `aa` BIGINT NULL COMMENT 'aaa',\n `bb` VARCHAR(20) NULL\n) ENGINE=MATERIALIZED_VIEW\nCOMMENT 'comment1'\nDISTRIBUTED BY RANDOM BUCKETS 2\nPROPERTIES")) + def descTableAllResult = sql """desc ${mvName} all""" + logger.info("descTableAllResult: " + descTableAllResult.toString()) + assertTrue(descTableAllResult.toString().contains("${mvName}")) + // if not exist try { sql """ diff --git a/regression-test/suites/mtmv_p0/test_task_mtmv.groovy b/regression-test/suites/mtmv_p0/test_task_mtmv.groovy new file mode 100644 index 0000000000..a3e59afd66 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_task_mtmv.groovy @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_task_mtmv") { + def tableName = "t_test_task_mtmv_user" + def mvName = "multi_mv_test_task_mtmv" + def dbName = "regression_test_mtmv_p0" + sql """drop materialized view if exists ${mvName};""" + sql """drop table if exists `${tableName}`""" + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + event_day DATE, + id BIGINT, + username VARCHAR(20) + ) + DISTRIBUTED BY HASH(id) BUCKETS 10 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH COMPLETE ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvName} + """ + def jobName = getJobName(dbName, mvName); + waitingMTMVTaskFinished(jobName) + def taskIdArr = sql """ select TaskId from tasks('type'='mv') where MvName = '${mvName}';""" + def taskId = taskIdArr.get(0).get(0); + logger.info("taskId: " + taskId.toString()) + try { + sql """ + cancel MATERIALIZED VIEW TASK ${taskId} on ${mvName}; + """ + } catch (Exception e) { + log.info("cancel error msg: " + e.getMessage()) + assertTrue(e.getMessage().contains("no running task")); + } + sql """drop materialized view if exists ${mvName};""" + +}