[feature](mtmv)MTMV pause and resume (#28887)
- PAUSE MATERIALIZED VIEW JOB ON mv1 - RESUME MATERIALIZED VIEW JOB ON mv1 - fix when drop db,not drop job - add lock for one materialized view can only run one task at a time
This commit is contained in:
@ -519,6 +519,11 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Table table : tableList) {
|
||||
if (table.getType() == TableType.MATERIALIZED_VIEW) {
|
||||
Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
|
||||
}
|
||||
}
|
||||
unprotectDropDb(db, stmt.isForceDrop(), false, 0);
|
||||
} finally {
|
||||
MetaLockUtils.writeUnlockTables(tableList);
|
||||
|
||||
@ -47,9 +47,12 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
|
||||
private ReentrantReadWriteLock jobRwLock;
|
||||
|
||||
private static final ShowResultSetMetaData JOB_META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
.addColumn(new Column("JobId", ScalarType.createVarchar(20)))
|
||||
@ -98,12 +101,14 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
|
||||
private long mtmvId;
|
||||
|
||||
public MTMVJob() {
|
||||
jobRwLock = new ReentrantReadWriteLock(true);
|
||||
}
|
||||
|
||||
public MTMVJob(long dbId, long mtmvId) {
|
||||
this.dbId = dbId;
|
||||
this.mtmvId = mtmvId;
|
||||
super.setCreateTimeMs(System.currentTimeMillis());
|
||||
jobRwLock = new ReentrantReadWriteLock(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -203,6 +208,22 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
|
||||
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
|
||||
}
|
||||
|
||||
public void readLock() {
|
||||
this.jobRwLock.readLock().lock();
|
||||
}
|
||||
|
||||
public void readUnlock() {
|
||||
this.jobRwLock.readLock().unlock();
|
||||
}
|
||||
|
||||
public void writeLock() {
|
||||
this.jobRwLock.writeLock().lock();
|
||||
}
|
||||
|
||||
public void writeUnlock() {
|
||||
this.jobRwLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, GsonUtils.GSON.toJson(this));
|
||||
|
||||
@ -220,6 +220,17 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runTask() throws JobException {
|
||||
MTMVJob job = (MTMVJob) getJobOrJobException();
|
||||
try {
|
||||
job.writeLock();
|
||||
super.runTask();
|
||||
} finally {
|
||||
job.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TRow getTvfInfo() {
|
||||
TRow trow = new TRow();
|
||||
@ -276,8 +287,10 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
|
||||
private void after() {
|
||||
Env.getCurrentEnv()
|
||||
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
|
||||
if (mtmv != null) {
|
||||
Env.getCurrentEnv()
|
||||
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
|
||||
}
|
||||
mtmv = null;
|
||||
relation = null;
|
||||
executor = null;
|
||||
|
||||
@ -135,4 +135,12 @@ public abstract class AbstractTask implements Task {
|
||||
return job == null ? "" : job.getJobName();
|
||||
}
|
||||
|
||||
public Job getJobOrJobException() throws JobException {
|
||||
AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId);
|
||||
if (job == null) {
|
||||
throw new JobException("job not exist, jobId:" + jobId);
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -21,8 +21,11 @@ import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
|
||||
import org.apache.doris.persist.AlterMTMV;
|
||||
|
||||
/**
|
||||
@ -77,7 +80,7 @@ public interface MTMVHookService {
|
||||
* @throws DdlException
|
||||
* @throws MetaNotFoundException
|
||||
*/
|
||||
void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException;
|
||||
void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException;
|
||||
|
||||
/**
|
||||
* triggered when mtmv task finish
|
||||
@ -101,4 +104,16 @@ public interface MTMVHookService {
|
||||
* @param table
|
||||
*/
|
||||
void alterTable(Table table);
|
||||
|
||||
/**
|
||||
* Triggered when pause mtmv
|
||||
* @param info
|
||||
*/
|
||||
void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException;
|
||||
|
||||
/**
|
||||
* Triggered when resume mtmv
|
||||
* @param info
|
||||
*/
|
||||
void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException;
|
||||
}
|
||||
|
||||
@ -37,7 +37,10 @@ import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
|
||||
import org.apache.doris.persist.AlterMTMV;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
@ -167,23 +170,11 @@ public class MTMVJobManager implements MTMVHookService {
|
||||
* @throws MetaNotFoundException
|
||||
*/
|
||||
@Override
|
||||
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb());
|
||||
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getMvName().getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
|
||||
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
|
||||
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
|
||||
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
|
||||
}
|
||||
try {
|
||||
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
|
||||
info.isComplete());
|
||||
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), mtmvTaskContext);
|
||||
|
||||
} catch (JobException e) {
|
||||
e.printStackTrace();
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
|
||||
MTMVJob job = getJobByTableNameInfo(info.getMvName());
|
||||
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
|
||||
info.isComplete());
|
||||
Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -201,4 +192,27 @@ public class MTMVJobManager implements MTMVHookService {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
|
||||
MTMVJob job = getJobByTableNameInfo(info.getMvName());
|
||||
Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.PAUSED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
|
||||
MTMVJob job = getJobByTableNameInfo(info.getMvName());
|
||||
Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.RUNNING);
|
||||
}
|
||||
|
||||
private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
|
||||
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
|
||||
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
|
||||
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
|
||||
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
|
||||
}
|
||||
return jobs.get(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -24,9 +24,12 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.job.common.TaskStatus;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
|
||||
import org.apache.doris.persist.AlterMTMV;
|
||||
|
||||
@ -187,6 +190,16 @@ public class MTMVRelationManager implements MTMVHookService {
|
||||
processBaseTableChange(table, "The base table has been updated:");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
|
||||
|
||||
}
|
||||
|
||||
private void processBaseTableChange(Table table, String msgPrefix) {
|
||||
BaseTableInfo baseTableInfo = new BaseTableInfo(table);
|
||||
Set<BaseTableInfo> mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo);
|
||||
|
||||
@ -23,9 +23,12 @@ import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
|
||||
import org.apache.doris.persist.AlterMTMV;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
@ -108,7 +111,7 @@ public class MTMVService {
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException {
|
||||
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
|
||||
Objects.requireNonNull(info);
|
||||
LOG.info("refreshMTMV, RefreshMTMVInfo: {}", info);
|
||||
for (MTMVHookService mtmvHookService : hooks.values()) {
|
||||
@ -140,4 +143,20 @@ public class MTMVService {
|
||||
mtmvHookService.refreshComplete(mtmv, cache, task);
|
||||
}
|
||||
}
|
||||
|
||||
public void pauseMTMV(PauseMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
|
||||
Objects.requireNonNull(info);
|
||||
LOG.info("pauseMTMV, PauseMTMVInfo: {}", info);
|
||||
for (MTMVHookService mtmvHookService : hooks.values()) {
|
||||
mtmvHookService.pauseMTMV(info);
|
||||
}
|
||||
}
|
||||
|
||||
public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
|
||||
Objects.requireNonNull(info);
|
||||
LOG.info("resumeMTMV, ResumeMTMVInfo: {}", info);
|
||||
for (MTMVHookService mtmvHookService : hooks.values()) {
|
||||
mtmvHookService.resumeMTMV(info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -123,6 +123,7 @@ import org.apache.doris.nereids.DorisParser.ParenthesizedExpressionContext;
|
||||
import org.apache.doris.nereids.DorisParser.PartitionSpecContext;
|
||||
import org.apache.doris.nereids.DorisParser.PartitionValueDefContext;
|
||||
import org.apache.doris.nereids.DorisParser.PartitionsDefContext;
|
||||
import org.apache.doris.nereids.DorisParser.PauseMTMVContext;
|
||||
import org.apache.doris.nereids.DorisParser.PlanTypeContext;
|
||||
import org.apache.doris.nereids.DorisParser.PredicateContext;
|
||||
import org.apache.doris.nereids.DorisParser.PredicatedContext;
|
||||
@ -142,6 +143,7 @@ import org.apache.doris.nereids.DorisParser.RefreshScheduleContext;
|
||||
import org.apache.doris.nereids.DorisParser.RefreshTriggerContext;
|
||||
import org.apache.doris.nereids.DorisParser.RegularQuerySpecificationContext;
|
||||
import org.apache.doris.nereids.DorisParser.RelationContext;
|
||||
import org.apache.doris.nereids.DorisParser.ResumeMTMVContext;
|
||||
import org.apache.doris.nereids.DorisParser.RollupDefContext;
|
||||
import org.apache.doris.nereids.DorisParser.RollupDefsContext;
|
||||
import org.apache.doris.nereids.DorisParser.RowConstructorContext;
|
||||
@ -339,7 +341,9 @@ import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
|
||||
@ -360,7 +364,9 @@ import org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition.MaxValue;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.StepPartition;
|
||||
@ -688,6 +694,18 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
return new DropMTMVCommand(new DropMTMVInfo(new TableNameInfo(nameParts), ctx.EXISTS() != null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PauseMTMVCommand visitPauseMTMV(PauseMTMVContext ctx) {
|
||||
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
|
||||
return new PauseMTMVCommand(new PauseMTMVInfo(new TableNameInfo(nameParts)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResumeMTMVCommand visitResumeMTMV(ResumeMTMVContext ctx) {
|
||||
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
|
||||
return new ResumeMTMVCommand(new ResumeMTMVInfo(new TableNameInfo(nameParts)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlterMTMVCommand visitAlterMTMV(AlterMTMVContext ctx) {
|
||||
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
|
||||
|
||||
@ -133,5 +133,7 @@ public enum PlanType {
|
||||
DROP_CONSTRAINT_COMMAND,
|
||||
REFRESH_MTMV_COMMAND,
|
||||
DROP_MTMV_COMMAND,
|
||||
PAUSE_MTMV_COMMAND,
|
||||
RESUME_MTMV_COMMAND,
|
||||
CALL_COMMAND
|
||||
}
|
||||
|
||||
@ -0,0 +1,50 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* pause mtmv
|
||||
*/
|
||||
public class PauseMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback {
|
||||
private final PauseMTMVInfo pauseMTMVInfo;
|
||||
|
||||
public PauseMTMVCommand(PauseMTMVInfo pauseMTMVInfo) {
|
||||
super(PlanType.PAUSE_MTMV_COMMAND);
|
||||
this.pauseMTMVInfo = Objects.requireNonNull(pauseMTMVInfo, "require pauseMTMVInfo object");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
|
||||
pauseMTMVInfo.analyze(ctx);
|
||||
Env.getCurrentEnv().getMtmvService().pauseMTMV(pauseMTMVInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitPauseMTMVCommand(this, context);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,50 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* resume mtmv
|
||||
*/
|
||||
public class ResumeMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback {
|
||||
private final ResumeMTMVInfo resumeMTMVInfo;
|
||||
|
||||
public ResumeMTMVCommand(ResumeMTMVInfo resumeMTMVInfo) {
|
||||
super(PlanType.RESUME_MTMV_COMMAND);
|
||||
this.resumeMTMVInfo = Objects.requireNonNull(resumeMTMVInfo, "require resumeMTMVInfo object");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
|
||||
resumeMTMVInfo.analyze(ctx);
|
||||
Env.getCurrentEnv().getMtmvService().resumeMTMV(resumeMTMVInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitResumeMTMVCommand(this, context);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,72 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands.info;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* pause mtmv info
|
||||
*/
|
||||
public class PauseMTMVInfo {
|
||||
private final TableNameInfo mvName;
|
||||
|
||||
public PauseMTMVInfo(TableNameInfo mvName) {
|
||||
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
|
||||
}
|
||||
|
||||
/**
|
||||
* analyze pause info
|
||||
*
|
||||
* @param ctx ConnectContext
|
||||
*/
|
||||
public void analyze(ConnectContext ctx) {
|
||||
mvName.analyze(ctx);
|
||||
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(),
|
||||
mvName.getTbl(), PrivPredicate.CREATE)) {
|
||||
String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE",
|
||||
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
|
||||
mvName.getDb() + ": " + mvName.getTbl());
|
||||
throw new AnalysisException(message);
|
||||
}
|
||||
try {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
|
||||
db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
} catch (MetaNotFoundException | DdlException e) {
|
||||
throw new AnalysisException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* getMvName
|
||||
*
|
||||
* @return TableNameInfo
|
||||
*/
|
||||
public TableNameInfo getMvName() {
|
||||
return mvName;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,72 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands.info;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* resume mtmv info
|
||||
*/
|
||||
public class ResumeMTMVInfo {
|
||||
private final TableNameInfo mvName;
|
||||
|
||||
public ResumeMTMVInfo(TableNameInfo mvName) {
|
||||
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
|
||||
}
|
||||
|
||||
/**
|
||||
* analyze resume info
|
||||
*
|
||||
* @param ctx ConnectContext
|
||||
*/
|
||||
public void analyze(ConnectContext ctx) {
|
||||
mvName.analyze(ctx);
|
||||
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), mvName.getDb(),
|
||||
mvName.getTbl(), PrivPredicate.CREATE)) {
|
||||
String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("CREATE",
|
||||
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
|
||||
mvName.getDb() + ": " + mvName.getTbl());
|
||||
throw new AnalysisException(message);
|
||||
}
|
||||
try {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
|
||||
db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
} catch (MetaNotFoundException | DdlException e) {
|
||||
throw new AnalysisException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* getMvName
|
||||
*
|
||||
* @return TableNameInfo
|
||||
*/
|
||||
public TableNameInfo getMvName() {
|
||||
return mvName;
|
||||
}
|
||||
}
|
||||
@ -34,7 +34,9 @@ import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ResumeMTMVCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
|
||||
|
||||
/** CommandVisitor. */
|
||||
@ -113,6 +115,14 @@ public interface CommandVisitor<R, C> {
|
||||
return visitCommand(dropMTMVCommand, context);
|
||||
}
|
||||
|
||||
default R visitPauseMTMVCommand(PauseMTMVCommand pauseMTMVCommand, C context) {
|
||||
return visitCommand(pauseMTMVCommand, context);
|
||||
}
|
||||
|
||||
default R visitResumeMTMVCommand(ResumeMTMVCommand resumeMTMVCommand, C context) {
|
||||
return visitCommand(resumeMTMVCommand, context);
|
||||
}
|
||||
|
||||
default R visitCallCommand(CallCommand callCommand, C context) {
|
||||
return visitCommand(callCommand, context);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user