[feature](mtmv)support cancel mtmv task command (#29252)

- `CANCEL MATERIALIZED VIEW TASK taskId on mvName`
- CANCEL MATERIALIZED VIEW TASK, tasks("type"="mv") and jobs("type"="mv") support check auth use priv of mv
- tasks and jobs add column mvName and mvDbName,you can use `select * from tasks("type"="mv") where MvName="xxx"` get all tasks of mv
- fix `desc mv all` error
- fix p0 The task sequence is incorrect
This commit is contained in:
zhangdong
2023-12-31 23:10:30 +08:00
committed by GitHub
parent b07ce175de
commit 5985d216f3
23 changed files with 396 additions and 17 deletions

View File

@ -187,7 +187,7 @@ public class DescribeStmt extends ShowStmt {
// show base table schema only
String procString = "/catalogs/" + catalog.getId() + "/" + db.getId() + "/" + table.getId() + "/"
+ TableProcDir.INDEX_SCHEMA + "/";
if (table.getType() == TableType.OLAP) {
if (table instanceof OlapTable) {
procString += ((OlapTable) table).getBaseIndexId();
} else {
if (partitionNames != null) {
@ -212,7 +212,7 @@ public class DescribeStmt extends ShowStmt {
}
} else {
Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName() + " ALL");
if (table.getType() == TableType.OLAP) {
if (table instanceof OlapTable) {
isOlapTable = true;
OlapTable olapTable = (OlapTable) table;
Set<String> bfColumns = olapTable.getCopiedBfColumns();

View File

@ -17,6 +17,7 @@
package org.apache.doris.job.extensions.mtmv;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -25,12 +26,15 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
@ -67,11 +71,14 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
new Column("Name", ScalarType.createStringType()),
new Column("MvId", ScalarType.createStringType()),
new Column("MvName", ScalarType.createStringType()),
new Column("MvDatabaseId", ScalarType.createStringType()),
new Column("MvDatabaseName", ScalarType.createStringType()),
new Column("ExecuteType", ScalarType.createStringType()),
new Column("RecurringStrategy", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
new Column("CreateTime", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@ -118,6 +125,7 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
@Override
public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext taskContext) {
LOG.info("begin create mtmv task, jobId: {}, taskContext: {}", super.getJobId(), taskContext);
if (taskContext == null) {
taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
}
@ -126,6 +134,7 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
ArrayList<MTMVTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks, taskType);
LOG.info("finish create mtmv task, task: {}", task);
return tasks;
}
@ -144,6 +153,8 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
List<MTMVTask> runningTasks = getRunningTasks();
for (MTMVTask task : runningTasks) {
if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) {
LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}",
task);
return false;
}
}
@ -195,14 +206,40 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
String dbName = "";
String mvName = "";
try {
MTMV mtmv = getMTMV();
dbName = mtmv.getQualifiedDbName();
mvName = mtmv.getName();
} catch (UserException e) {
LOG.warn("can not find mv", e);
}
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId)));
trow.addToColumnValue(new TCell().setStringVal(mvName));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId)));
trow.addToColumnValue(new TCell().setStringVal(dbName));
trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().convertRecurringStrategyToString()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(super.getComment()));
return trow;
}
public boolean hasPriv(UserIdentity userIdentity, PrivPredicate wanted) {
MTMV mtmv;
try {
mtmv = getMTMV();
} catch (UserException e) {
LOG.warn("can not find mv", e);
return false;
}
return Env.getCurrentEnv().getAccessManager()
.checkTblPriv(userIdentity, InternalCatalog.INTERNAL_CATALOG_NAME,
mtmv.getQualifiedDbName(), mtmv.getName(),
wanted);
}
private MTMV getMTMV() throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);

View File

@ -25,7 +25,9 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
@ -73,7 +75,9 @@ public class MTMVTask extends AbstractTask {
new Column("JobId", ScalarType.createStringType()),
new Column("JobName", ScalarType.createStringType()),
new Column("MvId", ScalarType.createStringType()),
new Column("MvName", ScalarType.createStringType()),
new Column("MvDatabaseId", ScalarType.createStringType()),
new Column("MvDatabaseName", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
@ -213,8 +217,7 @@ public class MTMVTask extends AbstractTask {
LOG.info("mtmv task before, taskId: {}", super.getTaskId());
super.before();
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
mtmv = (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
mtmv = getMTMV();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
MTMVUtil.alignMvPartition(mtmv, relatedTable);
@ -225,6 +228,11 @@ public class MTMVTask extends AbstractTask {
}
}
private MTMV getMTMV() throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
}
@Override
public void runTask() throws JobException {
LOG.info("mtmv task runTask, taskId: {}", super.getTaskId());
@ -246,8 +254,19 @@ public class MTMVTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
String dbName = "";
String mvName = "";
try {
MTMV mtmv = getMTMV();
dbName = mtmv.getQualifiedDbName();
mvName = mtmv.getName();
} catch (UserException e) {
LOG.warn("can not find mv", e);
}
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId)));
trow.addToColumnValue(new TCell().setStringVal(mvName));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId)));
trow.addToColumnValue(new TCell().setStringVal(dbName));
trow.addToColumnValue(new TCell()
.setStringVal(super.getStatus() == null ? FeConstants.null_string : super.getStatus().toString()));
trow.addToColumnValue(new TCell().setStringVal(super.getErrMsg()));
@ -355,4 +374,19 @@ public class MTMVTask extends AbstractTask {
public MTMVTaskContext getTaskContext() {
return taskContext;
}
@Override
public String toString() {
return "MTMVTask{"
+ "dbId=" + dbId
+ ", mtmvId=" + mtmvId
+ ", taskContext=" + taskContext
+ ", needRefreshPartitions=" + needRefreshPartitions
+ ", completedPartitions=" + completedPartitions
+ ", refreshMode=" + refreshMode
+ ", mtmv=" + mtmv
+ ", relation=" + relation
+ ", executor=" + executor
+ "} " + super.toString();
}
}

View File

@ -55,4 +55,13 @@ public class MTMVTaskContext {
public boolean isComplete() {
return isComplete;
}
@Override
public String toString() {
return "MTMVTaskContext{"
+ "triggerMode=" + triggerMode
+ ", partitions=" + partitions
+ ", isComplete=" + isComplete
+ '}';
}
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
@ -107,13 +108,22 @@ public interface MTMVHookService {
/**
* Triggered when pause mtmv
*
* @param info
*/
void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException;
/**
* Triggered when resume mtmv
*
* @param info
*/
void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException;
/**
* cancel mtmv task
*
* @param info
*/
void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNotFoundException, JobException;
}

View File

@ -37,6 +37,7 @@ import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
@ -66,7 +67,6 @@ public class MTMVJobManager implements MTMVHookService {
MTMVJob job = new MTMVJob(mtmv.getDatabase().getId(), mtmv.getId());
job.setJobId(Env.getCurrentEnv().getNextId());
job.setJobName(mtmv.getJobInfo().getJobName());
job.setComment(mtmv.getName());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.setJobConfig(getJobConfig(mtmv));
@ -204,6 +204,12 @@ public class MTMVJobManager implements MTMVHookService {
Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.RUNNING);
}
@Override
public void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNotFoundException, JobException {
MTMVJob job = getJobByTableNameInfo(info.getMvName());
job.cancelTaskById(info.getTaskId());
}
private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW);

View File

@ -27,6 +27,7 @@ import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
@ -200,6 +201,11 @@ public class MTMVRelationManager implements MTMVHookService {
}
@Override
public void cancelMTMVTask(CancelMTMVTaskInfo info) {
}
private void processBaseTableChange(Table table, String msgPrefix) {
BaseTableInfo baseTableInfo = new BaseTableInfo(table);
Set<BaseTableInfo> mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo);

View File

@ -26,6 +26,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
@ -159,4 +160,12 @@ public class MTMVService {
mtmvHookService.resumeMTMV(info);
}
}
public void cancelMTMVTask(CancelMTMVTaskInfo info) throws MetaNotFoundException, DdlException, JobException {
Objects.requireNonNull(info);
LOG.info("cancelMTMVTask, CancelMTMVTaskInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.cancelMTMVTask(info);
}
}
}

View File

@ -55,6 +55,7 @@ import org.apache.doris.nereids.DorisParser.BracketJoinHintContext;
import org.apache.doris.nereids.DorisParser.BracketRelationHintContext;
import org.apache.doris.nereids.DorisParser.BuildModeContext;
import org.apache.doris.nereids.DorisParser.CallProcedureContext;
import org.apache.doris.nereids.DorisParser.CancelMTMVTaskContext;
import org.apache.doris.nereids.DorisParser.CollateContext;
import org.apache.doris.nereids.DorisParser.ColumnDefContext;
import org.apache.doris.nereids.DorisParser.ColumnDefsContext;
@ -326,6 +327,7 @@ import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CallCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.Constraint;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
@ -351,6 +353,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRenameInfo;
import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
@ -712,6 +715,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return new ResumeMTMVCommand(new ResumeMTMVInfo(new TableNameInfo(nameParts)));
}
@Override
public CancelMTMVTaskCommand visitCancelMTMVTask(CancelMTMVTaskContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
long taskId = Long.parseLong(ctx.taskId.getText());
return new CancelMTMVTaskCommand(new CancelMTMVTaskInfo(new TableNameInfo(nameParts), taskId));
}
@Override
public AlterMTMVCommand visitAlterMTMV(AlterMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);

View File

@ -135,5 +135,6 @@ public enum PlanType {
DROP_MTMV_COMMAND,
PAUSE_MTMV_COMMAND,
RESUME_MTMV_COMMAND,
CANCEL_MTMV_TASK_COMMAND,
CALL_COMMAND
}

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import java.util.Objects;
/**
* cancel mtmv task
*/
public class CancelMTMVTaskCommand extends Command implements ForwardWithSync, NotAllowFallback {
private final CancelMTMVTaskInfo cancelMTMVTaskInfo;
public CancelMTMVTaskCommand(CancelMTMVTaskInfo cancelMTMVTaskInfo) {
super(PlanType.CANCEL_MTMV_TASK_COMMAND);
this.cancelMTMVTaskInfo = Objects.requireNonNull(cancelMTMVTaskInfo, "require cancelMTMVTaskInfo object");
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
cancelMTMVTaskInfo.analyze(ctx);
Env.getCurrentEnv().getMtmvService().cancelMTMVTask(cancelMTMVTaskInfo);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCancelMTMVTaskCommand(this, context);
}
}

View File

@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.info;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import java.util.Objects;
/**
* cancel mtmv task info
*/
public class CancelMTMVTaskInfo {
private final TableNameInfo mvName;
private final long taskId;
public CancelMTMVTaskInfo(TableNameInfo mvName, long taskId) {
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
this.taskId = taskId;
}
/**
* analyze pause info
*
* @param ctx ConnectContext
*/
public void analyze(ConnectContext ctx) {
mvName.analyze(ctx);
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(),
mvName.getTbl(), PrivPredicate.CREATE)) {
String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
mvName.getDb() + ": " + mvName.getTbl());
throw new AnalysisException(message);
}
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
} catch (MetaNotFoundException | DdlException e) {
throw new AnalysisException(e.getMessage());
}
}
/**
* getMvName
*
* @return TableNameInfo
*/
public TableNameInfo getMvName() {
return mvName;
}
/**
* get taskId
*
* @return taskId
*/
public long getTaskId() {
return taskId;
}
@Override
public String toString() {
return "CancelMTMVTaskInfo{"
+ "mvName=" + mvName
+ ", taskId=" + taskId
+ '}';
}
}

View File

@ -69,4 +69,11 @@ public class PauseMTMVInfo {
public TableNameInfo getMvName() {
return mvName;
}
@Override
public String toString() {
return "PauseMTMVInfo{"
+ "mvName=" + mvName
+ '}';
}
}

View File

@ -100,4 +100,13 @@ public class RefreshMTMVInfo {
public boolean isComplete() {
return isComplete;
}
@Override
public String toString() {
return "RefreshMTMVInfo{"
+ "mvName=" + mvName
+ ", partitions=" + partitions
+ ", isComplete=" + isComplete
+ '}';
}
}

View File

@ -69,4 +69,11 @@ public class ResumeMTMVInfo {
public TableNameInfo getMvName() {
return mvName;
}
@Override
public String toString() {
return "ResumeMTMVInfo{"
+ "mvName=" + mvName
+ '}';
}
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CallCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
@ -123,6 +124,10 @@ public interface CommandVisitor<R, C> {
return visitCommand(resumeMTMVCommand, context);
}
default R visitCancelMTMVTaskCommand(CancelMTMVTaskCommand cancelMTMVTaskCommand, C context) {
return visitCommand(cancelMTMVTaskCommand, context);
}
default R visitCallCommand(CallCommand callCommand, C context) {
return visitCommand(callCommand, context);
}

View File

@ -17,11 +17,12 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TJobsMetadataParams;
@ -64,9 +65,10 @@ public class JobsTableValuedFunction extends MetadataTableValuedFunction {
throw new AnalysisException("Invalid job metadata query");
}
this.jobType = jobType;
UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
if (!userIdentity.isRootUser()) {
throw new AnalysisException("only root user can operate");
if (jobType != JobType.MV) {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
throw new AnalysisException("only ADMIN priv can operate");
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
@ -586,12 +587,20 @@ public class MetadataGenerator {
TJobsMetadataParams jobsMetadataParams = params.getJobsMetadataParams();
String type = jobsMetadataParams.getType();
JobType jobType = JobType.valueOf(type);
TUserIdentity currentUserIdent = jobsMetadataParams.getCurrentUserIdent();
UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent);
List<TRow> dataBatch = Lists.newArrayList();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
for (org.apache.doris.job.base.AbstractJob job : jobList) {
if (job instanceof MTMVJob) {
MTMVJob mtmvJob = (MTMVJob) job;
if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
continue;
}
}
dataBatch.add(job.getTvfInfo());
}
result.setDataBatch(dataBatch);
@ -607,12 +616,20 @@ public class MetadataGenerator {
TTasksMetadataParams tasksMetadataParams = params.getTasksMetadataParams();
String type = tasksMetadataParams.getType();
JobType jobType = JobType.valueOf(type);
TUserIdentity currentUserIdent = tasksMetadataParams.getCurrentUserIdent();
UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent);
List<TRow> dataBatch = Lists.newArrayList();
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
List<org.apache.doris.job.base.AbstractJob> jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType);
for (org.apache.doris.job.base.AbstractJob job : jobList) {
if (job instanceof MTMVJob) {
MTMVJob mtmvJob = (MTMVJob) job;
if (!mtmvJob.hasPriv(userIdentity, PrivPredicate.SHOW)) {
continue;
}
}
List<AbstractTask> tasks = job.queryAllTasks();
for (AbstractTask task : tasks) {
TRow tvfInfo = task.getTvfInfo();

View File

@ -17,11 +17,12 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TMetaScanRange;
@ -64,9 +65,10 @@ public class TasksTableValuedFunction extends MetadataTableValuedFunction {
throw new AnalysisException("Invalid task metadata query");
}
this.jobType = jobType;
UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
if (!userIdentity.isRootUser()) {
throw new AnalysisException("only root user can operate");
if (jobType != JobType.MV) {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
throw new AnalysisException("only ADMIN priv can operate");
}
}
}