[fix](mtmv)fix insert overwrite will generate garbage temporary partition when restarting FE (#29075)

This commit is contained in:
zhangdong
2024-01-01 08:12:26 +08:00
committed by GitHub
parent 5985d216f3
commit 01a0f0915f
11 changed files with 588 additions and 143 deletions

View File

@ -146,6 +146,7 @@ import org.apache.doris.ha.MasterInfo;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.meta.MetaBaseAction;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.job.manager.JobManager;
@ -515,6 +516,8 @@ public class Env {
private MTMVService mtmvService;
private InsertOverwriteManager insertOverwriteManager;
public List<TFrontendInfo> getFrontendInfos() {
List<TFrontendInfo> res = new ArrayList<>();
@ -748,6 +751,7 @@ public class Env {
this.workloadActionPublisherThread = new WorkloadActionPublishThread("WorkloadActionPublisher",
Config.workload_action_interval_ms, systemInfo);
this.mtmvService = new MTMVService();
this.insertOverwriteManager = new InsertOverwriteManager();
}
public static void destroyCheckpoint() {
@ -807,6 +811,10 @@ public class Env {
return mtmvService;
}
public InsertOverwriteManager getInsertOverwriteManager() {
return insertOverwriteManager;
}
public TabletScheduler getTabletScheduler() {
return tabletScheduler;
}
@ -1495,6 +1503,8 @@ public class Env {
// so no need to check 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);
insertOverwriteManager.allTaskFail();
toMasterProgress = "start daemon threads";
// start all daemon threads that only running on MASTER FE
@ -1626,6 +1636,7 @@ public class Env {
// binlog gcer
binlogGcer.start();
columnIdFlusher.start();
insertOverwriteManager.start();
}
// start threads that should running on all FE
@ -2148,6 +2159,18 @@ public class Env {
return checksum;
}
public long loadInsertOverwrite(DataInputStream in, long checksum) throws IOException {
this.insertOverwriteManager = InsertOverwriteManager.read(in);
LOG.info("finished replay iot from image");
return checksum;
}
public long saveInsertOverwrite(CountingDataOutputStream out, long checksum) throws IOException {
this.insertOverwriteManager.write(out);
LOG.info("finished save iot to image");
return checksum;
}
// Only called by checkpoint thread
// return the latest image file's absolute path
public String saveImage() throws IOException {

View File

@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.insertoverwrite;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class InsertOverwriteLog implements Writable {
public enum InsertOverwriteOpType {
ADD,
DROP,
CANCEL
}
@SerializedName(value = "id")
private long taskId;
@SerializedName(value = "task")
private InsertOverwriteTask task;
@SerializedName(value = "type")
private InsertOverwriteOpType opType;
public InsertOverwriteLog(long taskId, InsertOverwriteTask task,
InsertOverwriteOpType opType) {
this.taskId = taskId;
this.task = task;
this.opType = opType;
}
public long getTaskId() {
return taskId;
}
public InsertOverwriteTask getTask() {
return task;
}
public InsertOverwriteOpType getOpType() {
return opType;
}
@Override
public String toString() {
return "InsertOverwriteLog{"
+ "taskId=" + taskId
+ ", task=" + task
+ ", opType=" + opType
+ '}';
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static InsertOverwriteLog read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), InsertOverwriteLog.class);
}
}

View File

@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.insertoverwrite;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
public class InsertOverwriteManager extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class);
private static final long CLEAN_INTERVAL_SECOND = 10;
@SerializedName(value = "tasks")
private Map<Long, InsertOverwriteTask> tasks = Maps.newConcurrentMap();
public InsertOverwriteManager() {
super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000);
}
/**
* register insert overwrite task
*
* @param dbId
* @param tableId
* @param tempPartitionNames
* @return taskId
*/
public long registerTask(long dbId, long tableId, List<String> tempPartitionNames) {
long taskId = Env.getCurrentEnv().getNextId();
InsertOverwriteTask task = new InsertOverwriteTask(dbId, tableId,
tempPartitionNames);
tasks.put(taskId, task);
Env.getCurrentEnv().getEditLog()
.logInsertOverwrite(new InsertOverwriteLog(taskId, task, InsertOverwriteOpType.ADD));
return taskId;
}
/**
* when insert overwrite fail, try drop temp partition
*
* @param taskId
*/
public void taskFail(long taskId) {
boolean rollback = rollback(taskId);
if (rollback) {
removeTask(taskId);
} else {
cancelTask(taskId);
}
}
/**
* when insert overwrite success, drop task
*
* @param taskId
*/
public void taskSuccess(long taskId) {
removeTask(taskId);
}
/**
* for transferToMaster, try drop all temp partitions
*/
public void allTaskFail() {
LOG.info("try drop all temp partitions when transferToMaster");
HashMap<Long, InsertOverwriteTask> copyTasks = Maps.newHashMap(tasks);
for (Entry<Long, InsertOverwriteTask> entry : copyTasks.entrySet()) {
taskFail(entry.getKey());
}
}
private void cancelTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("cancel insert overwrite task: {}", tasks.get(taskId));
tasks.get(taskId).setCancel(true);
Env.getCurrentEnv().getEditLog()
.logInsertOverwrite(new InsertOverwriteLog(taskId, null, InsertOverwriteOpType.CANCEL));
}
}
private void removeTask(long taskId) {
if (tasks.containsKey(taskId)) {
LOG.info("remove insert overwrite task: {}", tasks.get(taskId));
tasks.remove(taskId);
Env.getCurrentEnv().getEditLog()
.logInsertOverwrite(new InsertOverwriteLog(taskId, null, InsertOverwriteOpType.DROP));
}
}
/**
* drop temp partitions
*
* @param taskId
* @return if success
*/
private boolean rollback(long taskId) {
InsertOverwriteTask task = tasks.get(taskId);
OlapTable olapTable;
try {
olapTable = task.getTable();
} catch (DdlException e) {
LOG.warn("can not get table, task: {}", task);
return true;
}
return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());
}
/**
* replay logs
*
* @param insertOverwriteLog
*/
public void replayInsertOverwriteLog(InsertOverwriteLog insertOverwriteLog) {
switch (insertOverwriteLog.getOpType()) {
case ADD:
tasks.put(insertOverwriteLog.getTaskId(), insertOverwriteLog.getTask());
break;
case DROP:
tasks.remove(insertOverwriteLog.getTaskId());
break;
case CANCEL:
InsertOverwriteTask task = tasks.get(insertOverwriteLog.getTaskId());
if (task != null) {
task.setCancel(true);
}
break;
default:
LOG.warn("error insertOverwriteLog: {}", insertOverwriteLog.toString());
}
}
/**
* Regularly drop partitions that have failed dropped
*/
@Override
protected void runAfterCatalogReady() {
LOG.info("start clean insert overwrite temp partitions");
HashMap<Long, InsertOverwriteTask> copyTasks = Maps.newHashMap(tasks);
for (Entry<Long, InsertOverwriteTask> entry : copyTasks.entrySet()) {
if (entry.getValue().isCancel()) {
boolean rollback = rollback(entry.getKey());
if (rollback) {
removeTask(entry.getKey());
}
}
}
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static InsertOverwriteManager read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, InsertOverwriteManager.class);
}
}

View File

@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.insertoverwrite;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import com.google.gson.annotations.SerializedName;
import java.util.List;
public class InsertOverwriteTask {
@SerializedName(value = "cancel")
private boolean cancel;
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tId")
private long tableId;
@SerializedName(value = "tpns")
private List<String> tempPartitionNames;
public InsertOverwriteTask() {
}
public InsertOverwriteTask(long dbId, long tableId, List<String> tempPartitionNames) {
this.dbId = dbId;
this.tableId = tableId;
this.tempPartitionNames = tempPartitionNames;
this.cancel = false;
}
public boolean isCancel() {
return cancel;
}
public void setCancel(boolean cancel) {
this.cancel = cancel;
}
public long getDbId() {
return dbId;
}
public void setDbId(long dbId) {
this.dbId = dbId;
}
public long getTableId() {
return tableId;
}
public void setTableId(long tableId) {
this.tableId = tableId;
}
public List<String> getTempPartitionNames() {
return tempPartitionNames;
}
public void setTempPartitionNames(List<String> tempPartitionNames) {
this.tempPartitionNames = tempPartitionNames;
}
public OlapTable getTable() throws DdlException {
return (OlapTable) Env.getCurrentEnv().getInternalCatalog().getDbOrDdlException(dbId)
.getTableOrDdlException(tableId,
TableType.OLAP);
}
@Override
public String toString() {
return "InsertOverwriteTask{"
+ "cancel=" + cancel
+ ", dbId=" + dbId
+ ", tableId=" + tableId
+ ", tempPartitionNames=" + tempPartitionNames
+ '}';
}
}

View File

@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.insertoverwrite;
import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.PropertyAnalyzer;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class InsertOverwriteUtil {
private static final Logger LOG = LogManager.getLogger(InsertOverwriteUtil.class);
/**
* add temp partitions
*
* @param olapTable
* @param partitionNames
* @param tempPartitionNames
* @throws DdlException
*/
public static void addTempPartitions(OlapTable olapTable, List<String> partitionNames,
List<String> tempPartitionNames) throws DdlException {
for (int i = 0; i < partitionNames.size(); i++) {
Env.getCurrentEnv().addPartitionLike((Database) olapTable.getDatabase(), olapTable.getName(),
new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
}
}
/**
* replace partitions
*
* @param olapTable
* @param partitionNames
* @param tempPartitionNames
* @throws DdlException
*/
public static void replacePartition(OlapTable olapTable, List<String> partitionNames,
List<String> tempPartitionNames) throws DdlException {
try {
if (!olapTable.writeLockIfExist()) {
return;
}
Map<String, String> properties = Maps.newHashMap();
properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false");
ReplacePartitionClause replacePartitionClause = new ReplacePartitionClause(
new PartitionNames(false, partitionNames),
new PartitionNames(true, tempPartitionNames), properties);
Env.getCurrentEnv()
.replaceTempPartition((Database) olapTable.getDatabase(), olapTable, replacePartitionClause);
} finally {
olapTable.writeUnlock();
}
}
/**
* generate temp partitionName
*
* @param partitionNames
* @return
*/
public static List<String> generateTempPartitionNames(List<String> partitionNames) {
List<String> tempPartitionNames = new ArrayList<String>(partitionNames.size());
for (String partitionName : partitionNames) {
String tempPartitionName = "iot_temp_" + partitionName;
if (tempPartitionName.length() > 50) {
tempPartitionName = tempPartitionName.substring(0, 30) + Math.abs(Objects.hash(tempPartitionName))
+ "_" + System.currentTimeMillis();
}
tempPartitionNames.add(tempPartitionName);
}
return tempPartitionNames;
}
/**
* drop temp partitions
*
* @param olapTable
* @param tempPartitionNames
* @return
*/
public static boolean dropPartitions(OlapTable olapTable, List<String> tempPartitionNames) {
try {
if (!olapTable.writeLockIfExist()) {
return true;
}
for (String partitionName : tempPartitionNames) {
if (olapTable.getPartition(partitionName, true) == null) {
continue;
}
Env.getCurrentEnv().dropPartition(
(Database) olapTable.getDatabase(), olapTable,
new DropPartitionClause(true, partitionName, true, true));
LOG.info("successfully drop temp partition [{}] for [{}]", partitionName, olapTable.getName());
}
} catch (DdlException e) {
LOG.info("drop partition failed for [{}]", olapTable.getName(), e);
return false;
} finally {
olapTable.writeUnlock();
}
return true;
}
}

View File

@ -44,6 +44,7 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.insertoverwrite.InsertOverwriteLog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteInfo;
@ -888,6 +889,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_INSERT_OVERWRITE: {
data = InsertOverwriteLog.read(in);
isRead = true;
break;
}
case OperationType.OP_ALTER_REPOSITORY: {
data = Repository.read(in);
isRead = true;

View File

@ -17,21 +17,13 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
@ -45,7 +37,6 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
@ -56,14 +47,10 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
/**
* insert into select command implementation
@ -133,149 +120,55 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
targetTable.getQualifiedDbName() + ": " + targetTable.getName());
}
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
targetTable.getQualifiedDbName(), targetTable.getName());
ConnectContext.get().setSkipAuth(true);
List<String> partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
if (CollectionUtils.isEmpty(partitionNames)) {
partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
}
List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
long taskId = Env.getCurrentEnv().getInsertOverwriteManager()
.registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames);
try {
List<String> partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
if (CollectionUtils.isEmpty(partitionNames)) {
partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
}
List<String> tempPartitionNames = addTempPartitions(ctx, tableName, partitionNames);
boolean insertRes = insertInto(ctx, executor, tempPartitionNames, tableName);
if (!insertRes) {
return;
}
replacePartition(ctx, tableName, partitionNames, tempPartitionNames);
InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames);
insertInto(ctx, executor, tempPartitionNames);
InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames);
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
} catch (Exception e) {
LOG.warn("insert into overwrite failed");
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
throw e;
} finally {
ConnectContext.get().setSkipAuth(false);
}
}
/**
* replacing partitionNames with tempPartitionNames
*
* @param ctx ctx
* @param tableName tableName
* @param partitionNames partitionNames
* @param tempPartitionNames tempPartitionNames
* @throws UserException UserException
*/
private void replacePartition(ConnectContext ctx, TableName tableName, List<String> partitionNames,
List<String> tempPartitionNames)
throws UserException {
// overwrite old partition with tmp partition
try {
List<AlterClause> ops = new ArrayList<>();
Map<String, String> properties = new HashMap<>();
properties.put("use_temp_partition_name", "false");
ops.add(new ReplacePartitionClause(new PartitionNames(false, partitionNames),
new PartitionNames(true, tempPartitionNames), properties));
AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops);
Env.getCurrentEnv().alterTable(alterTableStmt);
} catch (Exception e) {
LOG.warn("IOT overwrite table partitions error", e);
rollback(ctx, tableName, tempPartitionNames);
throw e;
}
}
/**
* insert into select
*
* @param ctx ctx
* @param executor executor
* @param tempPartitionNames tempPartitionNames
* @param tableName tableName
*/
private boolean insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames,
TableName tableName) {
try {
UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
UnboundTableSink<?> copySink = new UnboundTableSink<>(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
true,
tempPartitionNames,
sink.isPartialUpdate(),
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
// for overwrite situation, we disable auto create partition.
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName);
insertCommand.setAllowAutoPartition(false);
insertCommand.run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
LOG.warn("InsertInto state error:{}", errMsg);
rollback(ctx, tableName, tempPartitionNames);
return false;
}
return true;
} catch (Exception e) {
LOG.warn("InsertInto error", e);
rollback(ctx, tableName, tempPartitionNames);
return false;
}
}
/**
* add some tempPartitions
*
* @param ctx ctx
* @param tableName tableName
* @param partitionNames partitionNames
* @return tempPartitionNames
* @throws Exception Exception
*/
private List<String> addTempPartitions(ConnectContext ctx, TableName tableName, List<String> partitionNames)
private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
throws Exception {
List<String> tempPartitionNames = new ArrayList<>();
try {
// create tmp partitions with uuid
for (String partitionName : partitionNames) {
UUID uuid = UUID.randomUUID();
// to comply with naming rules
String tempPartName = "tmp_partition_" + uuid.toString().replace('-', '_');
List<AlterClause> ops = new ArrayList<>();
ops.add(new AddPartitionLikeClause(tempPartName, partitionName, true));
AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops);
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx);
alterTableStmt.analyze(tempAnalyzer);
DdlExecutor.execute(ctx.getEnv(), alterTableStmt);
// only when execution succeeded, put the temp partition name into list
tempPartitionNames.add(tempPartName);
}
return tempPartitionNames;
} catch (Exception e) {
LOG.warn("IOT create tmp table partitions error", e);
rollback(ctx, tableName, tempPartitionNames);
throw e;
}
}
/**
* delete temp partitions
*
* @param ctx ctx
* @param targetTableName targetTableName
* @param tempPartitionNames tempPartitionNames
*/
private void rollback(ConnectContext ctx, TableName targetTableName,
List<String> tempPartitionNames) {
try {
for (String partitionName : tempPartitionNames) {
List<AlterClause> ops = new ArrayList<>();
ops.add(new DropPartitionClause(true, partitionName, true, true));
AlterTableStmt dropTablePartitionStmt = new AlterTableStmt(targetTableName, ops);
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx);
dropTablePartitionStmt.analyze(tempAnalyzer);
DdlExecutor.execute(ctx.getEnv(), dropTablePartitionStmt);
}
} catch (Exception ex) {
LOG.warn("IOT drop partitions error", ex);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
UnboundTableSink<?> copySink = new UnboundTableSink<>(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
true,
tempPartitionNames,
sink.isPartialUpdate(),
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
// for overwrite situation, we disable auto create partition.
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName);
insertCommand.setAllowAutoPartition(false);
insertCommand.run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
LOG.warn("InsertInto state error:{}", errMsg);
throw new UserException(errMsg);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.insertoverwrite.InsertOverwriteLog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalCursor;
@ -1132,6 +1133,11 @@ public class EditLog {
env.getAlterInstance().processAlterMTMV(alterMtmv, true);
break;
}
case OperationType.OP_INSERT_OVERWRITE: {
final InsertOverwriteLog insertOverwriteLog = (InsertOverwriteLog) journal.getData();
env.getInsertOverwriteManager().replayInsertOverwriteLog(insertOverwriteLog);
break;
}
case OperationType.OP_ALTER_REPOSITORY: {
Repository repository = (Repository) journal.getData();
env.getBackupHandler().getRepoMgr().alterRepo(repository, true);
@ -1963,6 +1969,10 @@ public class EditLog {
}
public void logInsertOverwrite(InsertOverwriteLog log) {
logEdit(OperationType.OP_INSERT_OVERWRITE, log);
}
public String getNotReadyReason() {
if (journal == null) {
return "journal is null";

View File

@ -353,6 +353,8 @@ public class OperationType {
public static final short OP_ALTER_REPOSITORY = 460;
public static final short OP_INSERT_OVERWRITE = 461;
/**
* Get opcode name by op code.
**/

View File

@ -240,6 +240,12 @@ public class MetaPersistMethod {
break;
case "JobTaskManager":
break;
case "insertOverwrite":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadInsertOverwrite", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveInsertOverwrite", CountingDataOutputStream.class, long.class);
break;
default:
break;
}

View File

@ -39,7 +39,8 @@ public class PersistMetaModules {
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "globalFunction", "workloadGroups",
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy");
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy",
"insertOverwrite");
// Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES)
public static final ImmutableList<String> DEPRECATED_MODULE_NAMES = ImmutableList.of(