[feature-wip](MTMV) Support alter statement (#16817)
Steps: 1. drop the old MTMV jobs 2. clear the old task records and clean the running and pending tasks 3. set the new scheduler info in MTMV and replay it in followers. 4. create a job in the master node. Note that if you change the refresh info of MTMV, the old MTMV tasks will be cleaned.
This commit is contained in:
@ -19,7 +19,6 @@ package org.apache.doris.alter;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterMaterializedViewStmt;
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.AlterViewStmt;
|
||||
@ -47,6 +46,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DataProperty;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedView;
|
||||
import org.apache.doris.catalog.MysqlTable;
|
||||
import org.apache.doris.catalog.OdbcTable;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
@ -60,13 +60,17 @@ import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.View;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DynamicPartitionUtil;
|
||||
import org.apache.doris.common.util.MetaLockUtils;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.mtmv.MTMVJobFactory;
|
||||
import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus;
|
||||
import org.apache.doris.mtmv.metadata.MTMVJob;
|
||||
import org.apache.doris.persist.AlterMultiMaterializedView;
|
||||
import org.apache.doris.persist.AlterViewInfo;
|
||||
import org.apache.doris.persist.BatchModifyPartitionsInfo;
|
||||
import org.apache.doris.persist.ModifyCommentOperationLog;
|
||||
@ -507,10 +511,39 @@ public class Alter {
|
||||
}
|
||||
}
|
||||
|
||||
public void processAlterMaterializedView(AlterMaterializedViewStmt stmt) throws UserException {
|
||||
TableName tbl = stmt.getTable();
|
||||
Env.getCurrentEnv().getInternalCatalog().getDb(tbl.getDb());
|
||||
throw new DdlException("ALTER MATERIALIZED VIEW is not implemented: " + stmt.toSql());
|
||||
public void processAlterMaterializedView(AlterMultiMaterializedView alterView, boolean isReplay)
|
||||
throws UserException {
|
||||
TableName tbl = alterView.getMvName();
|
||||
MaterializedView olapTable = null;
|
||||
try {
|
||||
// 1. check mv exist
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tbl.getDb());
|
||||
olapTable = (MaterializedView) db.getTableOrMetaException(tbl.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
|
||||
// 2. drop old job and kill the associated tasks
|
||||
Env.getCurrentEnv().getMTMVJobManager().dropJobByName(tbl.getDb(), tbl.getTbl());
|
||||
|
||||
// 3. overwrite the refresh info in the memory of fe.
|
||||
olapTable.writeLock();
|
||||
olapTable.setRefreshInfo(alterView.getInfo());
|
||||
|
||||
// 4. log it and replay it in the follower
|
||||
if (!isReplay) {
|
||||
Env.getCurrentEnv().getEditLog().logAlterMTMV(alterView);
|
||||
// 5. master node generate new jobs
|
||||
if (Config.enable_mtmv_scheduler_framework && MTMVJobFactory.isGenerateJob(olapTable)) {
|
||||
List<MTMVJob> jobs = MTMVJobFactory.buildJob(olapTable, db.getFullName());
|
||||
for (MTMVJob job : jobs) {
|
||||
Env.getCurrentEnv().getMTMVJobManager().createJob(job, false);
|
||||
}
|
||||
LOG.info("Alter mv success with new mv job created.");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (olapTable != null) {
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// entry of processing replace table
|
||||
|
||||
@ -171,6 +171,7 @@ import org.apache.doris.mtmv.MTMVJobManager;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerManager;
|
||||
import org.apache.doris.mysql.privilege.Auth;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.AlterMultiMaterializedView;
|
||||
import org.apache.doris.persist.BackendIdsUpdateInfo;
|
||||
import org.apache.doris.persist.BackendReplicasInfo;
|
||||
import org.apache.doris.persist.BackendTabletsInfo;
|
||||
@ -3666,7 +3667,8 @@ public class Env {
|
||||
}
|
||||
|
||||
public void alterMaterializedView(AlterMaterializedViewStmt stmt) throws UserException {
|
||||
this.alter.processAlterMaterializedView(stmt);
|
||||
AlterMultiMaterializedView alter = new AlterMultiMaterializedView(stmt.getTable(), stmt.getRefreshInfo());
|
||||
this.alter.processAlterMaterializedView(alter, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -88,6 +88,10 @@ public class MaterializedView extends OlapTable {
|
||||
return refreshInfo;
|
||||
}
|
||||
|
||||
public void setRefreshInfo(MVRefreshInfo info) {
|
||||
refreshInfo = info;
|
||||
}
|
||||
|
||||
public String getQuery() {
|
||||
return query;
|
||||
}
|
||||
|
||||
@ -61,6 +61,7 @@ import org.apache.doris.mtmv.metadata.DropMTMVTask;
|
||||
import org.apache.doris.mtmv.metadata.MTMVJob;
|
||||
import org.apache.doris.mtmv.metadata.MTMVTask;
|
||||
import org.apache.doris.mysql.privilege.UserPropertyInfo;
|
||||
import org.apache.doris.persist.AlterMultiMaterializedView;
|
||||
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
|
||||
import org.apache.doris.persist.AlterUserOperationLog;
|
||||
import org.apache.doris.persist.AlterViewInfo;
|
||||
@ -776,6 +777,11 @@ public class JournalEntity implements Writable {
|
||||
isRead = true;
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_MTMV_STMT: {
|
||||
data = AlterMultiMaterializedView.read(in);
|
||||
isRead = true;
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_USER: {
|
||||
data = AlterUserOperationLog.read(in);
|
||||
isRead = true;
|
||||
|
||||
@ -44,6 +44,7 @@ import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -283,8 +284,20 @@ public class MTMVJobManager {
|
||||
LOG.info("change job:{}", changeJob.getJobId());
|
||||
}
|
||||
|
||||
public void dropJobByName(String dbName, String mvName) {
|
||||
for (String jobName : nameToJobMap.keySet()) {
|
||||
MTMVJob job = nameToJobMap.get(jobName);
|
||||
if (job.getMVName().equals(mvName) && job.getDBName().equals(dbName)) {
|
||||
dropJobs(Collections.singletonList(job.getId()), false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void dropJobs(List<Long> jobIds, boolean isReplay) {
|
||||
// keep nameToJobMap and manualTaskMap consist
|
||||
if (jobIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (!tryLock()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -442,6 +442,9 @@ public class MTMVTaskManager {
|
||||
}
|
||||
|
||||
public void dropTasks(List<String> taskIds, boolean isReplay) {
|
||||
if (taskIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (!tryLock()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -0,0 +1,61 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.persist;
|
||||
|
||||
import org.apache.doris.analysis.MVRefreshInfo;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class AlterMultiMaterializedView implements Writable {
|
||||
|
||||
@SerializedName(value = "mvName")
|
||||
private TableName mvName;
|
||||
|
||||
@SerializedName(value = "info")
|
||||
private MVRefreshInfo info;
|
||||
|
||||
public AlterMultiMaterializedView(TableName mvName, MVRefreshInfo info) {
|
||||
this.mvName = mvName;
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
public MVRefreshInfo getInfo() {
|
||||
return info;
|
||||
}
|
||||
|
||||
public TableName getMvName() {
|
||||
return mvName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, GsonUtils.GSON.toJson(this));
|
||||
}
|
||||
|
||||
public static AlterMultiMaterializedView read(DataInput in) throws IOException {
|
||||
return GsonUtils.GSON.fromJson(Text.readString(in), AlterMultiMaterializedView.class);
|
||||
}
|
||||
}
|
||||
@ -915,6 +915,11 @@ public class EditLog {
|
||||
env.getMTMVJobManager().replayDropJobTasks(dropTask.getTaskIds());
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_MTMV_STMT: {
|
||||
final AlterMultiMaterializedView alterView = (AlterMultiMaterializedView) journal.getData();
|
||||
env.getAlterInstance().processAlterMaterializedView(alterView, true);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_USER: {
|
||||
final AlterUserOperationLog log = (AlterUserOperationLog) journal.getData();
|
||||
env.getAuth().replayAlterUser(log);
|
||||
@ -1713,4 +1718,8 @@ public class EditLog {
|
||||
public void logAlterUser(AlterUserOperationLog log) {
|
||||
logEdit(OperationType.OP_ALTER_USER, log);
|
||||
}
|
||||
|
||||
public void logAlterMTMV(AlterMultiMaterializedView log) {
|
||||
logEdit(OperationType.OP_ALTER_MTMV_STMT, log);
|
||||
}
|
||||
}
|
||||
|
||||
@ -258,6 +258,8 @@ public class OperationType {
|
||||
public static final short OP_DROP_MTMV_TASK = 341;
|
||||
public static final short OP_CHANGE_MTMV_TASK = 342;
|
||||
|
||||
public static final short OP_ALTER_MTMV_STMT = 345;
|
||||
|
||||
public static final short OP_DROP_EXTERNAL_TABLE = 350;
|
||||
public static final short OP_DROP_EXTERNAL_DB = 351;
|
||||
public static final short OP_CREATE_EXTERNAL_TABLE = 352;
|
||||
|
||||
@ -125,6 +125,18 @@ suite("test_create_mtmv") {
|
||||
assertEquals 'SUCCESS', state, show_task_result.last().toString()
|
||||
assertEquals 2, show_task_result.size()
|
||||
|
||||
// test alter mtmv
|
||||
sql """
|
||||
alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with "2022-11-03 00:00:00" next 2 DAY
|
||||
"""
|
||||
show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}"
|
||||
def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR'])
|
||||
|
||||
show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
|
||||
assertEquals 1, show_job_result.size()
|
||||
|
||||
assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)', show_job_result.last().get(scheduleIndex).toString(), show_job_result.last().toString()
|
||||
|
||||
sql """
|
||||
DROP MATERIALIZED VIEW ${mvName}
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user