From 01a0f0915ffac8a067559cc97220deadd9a72ab9 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 1 Jan 2024 08:12:26 +0800 Subject: [PATCH] [fix](mtmv)fix insert overwrite will generate garbage temporary partition when restarting FE (#29075) --- .../java/org/apache/doris/catalog/Env.java | 23 +++ .../insertoverwrite/InsertOverwriteLog.java | 81 ++++++++ .../InsertOverwriteManager.java | 194 ++++++++++++++++++ .../insertoverwrite/InsertOverwriteTask.java | 96 +++++++++ .../insertoverwrite/InsertOverwriteUtil.java | 133 ++++++++++++ .../apache/doris/journal/JournalEntity.java | 6 + .../commands/InsertOverwriteTableCommand.java | 177 ++++------------ .../org/apache/doris/persist/EditLog.java | 10 + .../apache/doris/persist/OperationType.java | 2 + .../doris/persist/meta/MetaPersistMethod.java | 6 + .../persist/meta/PersistMetaModules.java | 3 +- 11 files changed, 588 insertions(+), 143 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteLog.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteTask.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index aba414ecdf..b4b4837385 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -146,6 +146,7 @@ import org.apache.doris.ha.MasterInfo; import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.httpv2.meta.MetaBaseAction; import org.apache.doris.httpv2.rest.RestApiStatusCode; +import org.apache.doris.insertoverwrite.InsertOverwriteManager; import org.apache.doris.job.base.AbstractJob; import 
org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.job.manager.JobManager; @@ -515,6 +516,8 @@ public class Env { private MTMVService mtmvService; + private InsertOverwriteManager insertOverwriteManager; + public List getFrontendInfos() { List res = new ArrayList<>(); @@ -748,6 +751,7 @@ public class Env { this.workloadActionPublisherThread = new WorkloadActionPublishThread("WorkloadActionPublisher", Config.workload_action_interval_ms, systemInfo); this.mtmvService = new MTMVService(); + this.insertOverwriteManager = new InsertOverwriteManager(); } public static void destroyCheckpoint() { @@ -807,6 +811,10 @@ public class Env { return mtmvService; } + public InsertOverwriteManager getInsertOverwriteManager() { + return insertOverwriteManager; + } + public TabletScheduler getTabletScheduler() { return tabletScheduler; } @@ -1495,6 +1503,8 @@ public class Env { // so no need to check 'isReady' flag in this method postProcessAfterMetadataReplayed(false); + insertOverwriteManager.allTaskFail(); + toMasterProgress = "start daemon threads"; // start all daemon threads that only running on MASTER FE @@ -1626,6 +1636,7 @@ public class Env { // binlog gcer binlogGcer.start(); columnIdFlusher.start(); + insertOverwriteManager.start(); } // start threads that should running on all FE @@ -2148,6 +2159,18 @@ public class Env { return checksum; } + public long loadInsertOverwrite(DataInputStream in, long checksum) throws IOException { + this.insertOverwriteManager = InsertOverwriteManager.read(in); + LOG.info("finished replay iot from image"); + return checksum; + } + + public long saveInsertOverwrite(CountingDataOutputStream out, long checksum) throws IOException { + this.insertOverwriteManager.write(out); + LOG.info("finished save iot to image"); + return checksum; + } + // Only called by checkpoint thread // return the latest image file's absolute path public String saveImage() throws IOException { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteLog.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteLog.java new file mode 100644 index 0000000000..0d51184380 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteLog.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package org.apache.doris.insertoverwrite;

import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Journal record for one change to the insert-overwrite task registry.
 *
 * <p>The record is persisted as a single JSON string (see {@link #write(DataOutput)} and
 * {@link #read(DataInput)}); the JSON keys are {@code id}, {@code task} and {@code type}.
 */
public class InsertOverwriteLog implements Writable {

    /** The kind of registry change a record describes. */
    public enum InsertOverwriteOpType {
        ADD,
        DROP,
        CANCEL
    }

    @SerializedName(value = "id")
    private long taskId;
    @SerializedName(value = "task")
    private InsertOverwriteTask task;
    @SerializedName(value = "type")
    private InsertOverwriteOpType opType;

    /**
     * @param taskId id of the task the record refers to
     * @param task   the task payload; may be null when the id alone identifies the change
     * @param opType what happened to the task
     */
    public InsertOverwriteLog(long taskId, InsertOverwriteTask task, InsertOverwriteOpType opType) {
        this.taskId = taskId;
        this.task = task;
        this.opType = opType;
    }

    public long getTaskId() {
        return taskId;
    }

    public InsertOverwriteTask getTask() {
        return task;
    }

    public InsertOverwriteOpType getOpType() {
        return opType;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("InsertOverwriteLog{");
        text.append("taskId=").append(taskId);
        text.append(", task=").append(task);
        text.append(", opType=").append(opType);
        text.append('}');
        return text.toString();
    }

    /** Serializes this record as one JSON string. */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /** Reads back a record produced by {@link #write(DataOutput)}. */
    public static InsertOverwriteLog read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, InsertOverwriteLog.class);
    }
}

// ---- patch framing for the next file, carried over from the original dump ----
// diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
// new file mode 100644
// index 0000000000..9835efa531
// --- /dev/null
// +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
// @@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.
// See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.insertoverwrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Registry of in-flight "insert overwrite" tasks and of the temp partitions each one created.
 *
 * <p>Every registration, removal and cancellation is written to the edit log and can be replayed
 * through {@link #replayInsertOverwriteLog(InsertOverwriteLog)}. The periodic run retries the
 * temp-partition cleanup of tasks that are marked as cancelled.
 */
public class InsertOverwriteManager extends MasterDaemon implements Writable {
    private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class);

    private static final long CLEAN_INTERVAL_SECOND = 10;

    // taskId -> task. Not final because read() swaps in a concurrent copy after deserialization.
    @SerializedName(value = "tasks")
    private Map<Long, InsertOverwriteTask> tasks = Maps.newConcurrentMap();

    public InsertOverwriteManager() {
        super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000);
    }

    /**
     * Registers an insert overwrite task and records it in the edit log.
     *
     * @param dbId id of the database that owns the target table
     * @param tableId id of the target table
     * @param tempPartitionNames names of the temp partitions the task will create
     * @return the id allocated for the new task
     */
    public long registerTask(long dbId, long tableId, List<String> tempPartitionNames) {
        long taskId = Env.getCurrentEnv().getNextId();
        InsertOverwriteTask task = new InsertOverwriteTask(dbId, tableId,
                tempPartitionNames);
        tasks.put(taskId, task);
        Env.getCurrentEnv().getEditLog()
                .logInsertOverwrite(new InsertOverwriteLog(taskId, task, InsertOverwriteOpType.ADD));
        return taskId;
    }

    /**
     * Handles a failed insert overwrite: tries to drop the task's temp partitions. If that works
     * the task is removed, otherwise it is marked as cancelled so the periodic run retries later.
     *
     * @param taskId id returned by {@link #registerTask(long, long, List)}
     */
    public void taskFail(long taskId) {
        boolean rollback = rollback(taskId);
        if (rollback) {
            removeTask(taskId);
        } else {
            cancelTask(taskId);
        }
    }

    /**
     * Handles a successful insert overwrite by dropping the task from the registry.
     *
     * @param taskId id returned by {@link #registerTask(long, long, List)}
     */
    public void taskSuccess(long taskId) {
        removeTask(taskId);
    }

    /**
     * Treats every registered task as failed; used during transferToMaster to drop all
     * temp partitions left behind.
     */
    public void allTaskFail() {
        LOG.info("try drop all temp partitions when transferToMaster");
        // Iterate over a snapshot: taskFail() removes entries from the live map.
        HashMap<Long, InsertOverwriteTask> copyTasks = Maps.newHashMap(tasks);
        for (Entry<Long, InsertOverwriteTask> entry : copyTasks.entrySet()) {
            taskFail(entry.getKey());
        }
    }

    /** Marks the task as cancelled and logs a CANCEL record; no-op for an unknown id. */
    private void cancelTask(long taskId) {
        // Single lookup instead of containsKey()+get(), which could observe a concurrent removal.
        InsertOverwriteTask task = tasks.get(taskId);
        if (task == null) {
            return;
        }
        LOG.info("cancel insert overwrite task: {}", task);
        task.setCancel(true);
        Env.getCurrentEnv().getEditLog()
                .logInsertOverwrite(new InsertOverwriteLog(taskId, null, InsertOverwriteOpType.CANCEL));
    }

    /** Removes the task and logs a DROP record; no-op for an unknown id. */
    private void removeTask(long taskId) {
        // remove() both tests for presence and removes, so only one caller logs the DROP.
        InsertOverwriteTask removed = tasks.remove(taskId);
        if (removed == null) {
            return;
        }
        LOG.info("remove insert overwrite task: {}", removed);
        Env.getCurrentEnv().getEditLog()
                .logInsertOverwrite(new InsertOverwriteLog(taskId, null, InsertOverwriteOpType.DROP));
    }

    /**
     * Drops the temp partitions that belong to a task.
     *
     * @param taskId id of the task to roll back
     * @return true when nothing is left to clean up (partitions dropped, task unknown, or the
     *         table can no longer be resolved); false when dropping failed and should be retried
     */
    private boolean rollback(long taskId) {
        InsertOverwriteTask task = tasks.get(taskId);
        if (task == null) {
            // Unknown or already removed task: there is nothing to roll back.
            LOG.warn("insert overwrite task {} not found, nothing to roll back", taskId);
            return true;
        }
        OlapTable olapTable;
        try {
            olapTable = task.getTable();
        } catch (DdlException e) {
            LOG.warn("can not get table, task: {}", task, e);
            return true;
        }
        return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());
    }

    /**
     * Applies one edit-log record to the in-memory registry.
     *
     * @param insertOverwriteLog the record to replay
     */
    public void replayInsertOverwriteLog(InsertOverwriteLog insertOverwriteLog) {
        switch (insertOverwriteLog.getOpType()) {
            case ADD:
                tasks.put(insertOverwriteLog.getTaskId(), insertOverwriteLog.getTask());
                break;
            case DROP:
                tasks.remove(insertOverwriteLog.getTaskId());
                break;
            case CANCEL:
                InsertOverwriteTask task = tasks.get(insertOverwriteLog.getTaskId());
                if (task != null) {
                    task.setCancel(true);
                }
                break;
            default:
                LOG.warn("error insertOverwriteLog: {}", insertOverwriteLog.toString());
        }
    }

    /**
     * Periodically retries dropping the temp partitions of cancelled tasks, i.e. tasks whose
     * earlier cleanup failed.
     */
    @Override
    protected void runAfterCatalogReady() {
        LOG.info("start clean insert overwrite temp partitions");
        // Iterate over a snapshot: removeTask() mutates the live map.
        HashMap<Long, InsertOverwriteTask> copyTasks = Maps.newHashMap(tasks);
        for (Entry<Long, InsertOverwriteTask> entry : copyTasks.entrySet()) {
            if (entry.getValue().isCancel()) {
                boolean rollback = rollback(entry.getKey());
                if (rollback) {
                    removeTask(entry.getKey());
                }
            }
        }
    }

    /** Serializes the manager as one JSON string. */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /**
     * Reads back a manager written by {@link #write(DataOutput)}.
     *
     * @return the restored manager, or an empty one if the stored JSON yields no object
     */
    public static InsertOverwriteManager read(DataInput in) throws IOException {
        String json = Text.readString(in);
        InsertOverwriteManager manager = GsonUtils.GSON.fromJson(json, InsertOverwriteManager.class);
        if (manager == null) {
            return new InsertOverwriteManager();
        }
        // NOTE(review): Gson presumably fills a Map-typed field with its own non-concurrent map
        // implementation, replacing the field initializer; copy into a concurrent map so the
        // restored manager keeps the same concurrency properties as a freshly built one.
        Map<Long, InsertOverwriteTask> restored = Maps.newConcurrentMap();
        if (manager.tasks != null) {
            for (Entry<Long, InsertOverwriteTask> entry : manager.tasks.entrySet()) {
                if (entry.getKey() != null && entry.getValue() != null) {
                    restored.put(entry.getKey(), entry.getValue());
                }
            }
        }
        manager.tasks = restored;
        return manager;
    }
}

// ---- patch framing for the next file, carried over from the original dump ----
// diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteTask.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteTask.java
// new file mode 100644
// index 0000000000..b37dba6a6f
// --- /dev/null
// +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteTask.java
// @@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more
// contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.insertoverwrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;

import com.google.gson.annotations.SerializedName;

import java.util.List;

/**
 * State of one insert overwrite operation: the target table (by database id and table id), the
 * temp partitions created for it, and whether it has been marked as cancelled.
 *
 * <p>Serialized with the JSON keys {@code cancel}, {@code dbId}, {@code tId} and {@code tpns}.
 */
public class InsertOverwriteTask {
    @SerializedName(value = "cancel")
    private boolean cancel;
    @SerializedName(value = "dbId")
    private long dbId;
    @SerializedName(value = "tId")
    private long tableId;
    // Element type restored: these are partition names and are consumed as Strings.
    @SerializedName(value = "tpns")
    private List<String> tempPartitionNames;

    /** Creates an empty task; fields are expected to be filled in afterwards. */
    public InsertOverwriteTask() {
    }

    /**
     * Creates a task that is not cancelled.
     *
     * @param dbId id of the database that owns the target table
     * @param tableId id of the target table
     * @param tempPartitionNames names of the temp partitions that belong to this task
     */
    public InsertOverwriteTask(long dbId, long tableId, List<String> tempPartitionNames) {
        this.dbId = dbId;
        this.tableId = tableId;
        this.tempPartitionNames = tempPartitionNames;
        this.cancel = false;
    }

    public boolean isCancel() {
        return cancel;
    }

    public void setCancel(boolean cancel) {
        this.cancel = cancel;
    }

    public long getDbId() {
        return dbId;
    }

    public void setDbId(long dbId) {
        this.dbId = dbId;
    }

    public long getTableId() {
        return tableId;
    }

    public void setTableId(long tableId) {
        this.tableId = tableId;
    }

    public List<String> getTempPartitionNames() {
        return tempPartitionNames;
    }

    public void setTempPartitionNames(List<String> tempPartitionNames) {
        this.tempPartitionNames = tempPartitionNames;
    }

    /**
     * Resolves the target table from the internal catalog.
     *
     * @return the table identified by {@code dbId} and {@code tableId}
     * @throws DdlException if the database or the table (requested as {@code TableType.OLAP})
     *         cannot be resolved
     */
    public OlapTable getTable() throws DdlException {
        return (OlapTable) Env.getCurrentEnv().getInternalCatalog().getDbOrDdlException(dbId)
                .getTableOrDdlException(tableId,
                        TableType.OLAP);
    }

    @Override
    public String toString() {
        return "InsertOverwriteTask{"
                + "cancel=" + cancel
                + ", dbId=" + dbId
                + ", tableId=" + tableId
                + ", tempPartitionNames=" + tempPartitionNames
                + '}';
    }
}

// ---- patch framing for the next file, carried over from the original dump ----
// diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
// new file mode 100644
// index 0000000000..af906d2653
// --- /dev/null
// +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
// @@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.insertoverwrite;

import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.PropertyAnalyzer;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Helpers for the temp-partition life cycle of an insert overwrite: create temp partitions,
 * swap them in for the real ones, and drop them again on failure.
 */
public class InsertOverwriteUtil {
    private static final Logger LOG = LogManager.getLogger(InsertOverwriteUtil.class);

    /** Prefix put in front of a partition name to form its temp partition name. */
    private static final String TEMP_PARTITION_PREFIX = "iot_temp_";
    /** Prefixed names longer than this are shortened. */
    private static final int MAX_PLAIN_NAME_LENGTH = 50;
    /** Number of leading characters kept when a name is shortened. */
    private static final int TRUNCATED_PREFIX_LENGTH = 30;

    /**
     * Creates one temp partition per entry of {@code partitionNames}, each modelled on the
     * partition at the same index.
     *
     * @param olapTable table that owns the partitions
     * @param partitionNames names of the existing partitions to copy the definition from
     * @param tempPartitionNames names of the temp partitions to create, index-aligned with
     *        {@code partitionNames}
     * @throws DdlException if a partition cannot be added
     */
    public static void addTempPartitions(OlapTable olapTable, List<String> partitionNames,
            List<String> tempPartitionNames) throws DdlException {
        for (int i = 0; i < partitionNames.size(); i++) {
            Env.getCurrentEnv().addPartitionLike((Database) olapTable.getDatabase(), olapTable.getName(),
                    new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true));
        }
    }

    /**
     * Replaces the given partitions with the given temp partitions while holding the table's
     * write lock. Returns without doing anything when the lock cannot be taken.
     *
     * @param olapTable table that owns the partitions
     * @param partitionNames names of the partitions to be replaced
     * @param tempPartitionNames names of the temp partitions that replace them
     * @throws DdlException if the replacement fails
     */
    public static void replacePartition(OlapTable olapTable, List<String> partitionNames,
            List<String> tempPartitionNames) throws DdlException {
        // Acquire before entering try: when writeLockIfExist() returns false the lock is
        // presumably not held, so the finally block must not run writeUnlock() in that case.
        if (!olapTable.writeLockIfExist()) {
            return;
        }
        try {
            Map<String, String> properties = Maps.newHashMap();
            properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false");
            ReplacePartitionClause replacePartitionClause = new ReplacePartitionClause(
                    new PartitionNames(false, partitionNames),
                    new PartitionNames(true, tempPartitionNames), properties);
            Env.getCurrentEnv()
                    .replaceTempPartition((Database) olapTable.getDatabase(), olapTable, replacePartitionClause);
        } finally {
            olapTable.writeUnlock();
        }
    }

    /**
     * Derives a temp partition name for every given partition name.
     *
     * <p>The name is {@code "iot_temp_" + partitionName}. If that is longer than 50 characters
     * it is shortened to its first 30 characters followed by a non-negative hash of the full
     * name, an underscore and the current time in milliseconds.
     *
     * @param partitionNames names of the partitions that will be overwritten
     * @return temp partition names, index-aligned with {@code partitionNames}
     */
    public static List<String> generateTempPartitionNames(List<String> partitionNames) {
        List<String> tempPartitionNames = new ArrayList<>(partitionNames.size());
        for (String partitionName : partitionNames) {
            String tempPartitionName = TEMP_PARTITION_PREFIX + partitionName;
            if (tempPartitionName.length() > MAX_PLAIN_NAME_LENGTH) {
                // Widen to long before abs(): Math.abs(Integer.MIN_VALUE) is still negative and
                // would put a '-' into the partition name.
                long hash = Math.abs((long) Objects.hash(tempPartitionName));
                tempPartitionName = tempPartitionName.substring(0, TRUNCATED_PREFIX_LENGTH) + hash
                        + "_" + System.currentTimeMillis();
            }
            tempPartitionNames.add(tempPartitionName);
        }
        return tempPartitionNames;
    }

    /**
     * Drops the given temp partitions while holding the table's write lock. Names for which the
     * table has no temp partition are skipped.
     *
     * @param olapTable table that owns the temp partitions
     * @param tempPartitionNames names of the temp partitions to drop
     * @return true if nothing is left to drop (including when the lock cannot be taken);
     *         false if dropping a partition failed
     */
    public static boolean dropPartitions(OlapTable olapTable, List<String> tempPartitionNames) {
        // Same locking rule as replacePartition(): only unlock what was actually locked.
        if (!olapTable.writeLockIfExist()) {
            return true;
        }
        try {
            for (String partitionName : tempPartitionNames) {
                if (olapTable.getPartition(partitionName, true) == null) {
                    continue;
                }
                Env.getCurrentEnv().dropPartition(
                        (Database) olapTable.getDatabase(), olapTable,
                        new DropPartitionClause(true, partitionName, true, true));
                LOG.info("successfully drop temp partition [{}] for [{}]", partitionName, olapTable.getName());
            }
        } catch (DdlException e) {
            // A failed cleanup leaves garbage behind until retried, so report it as a warning.
            LOG.warn("drop partition failed for [{}]", olapTable.getName(), e);
            return false;
        } finally {
            olapTable.writeUnlock();
        }
        return true;
    }
}

// ---- patch framing for the next file, carried over from the original dump ----
// diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
// index aae8f4e4e7..6f6bc94509 100644
// --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
// +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
// @@ -44,6 +44,7 @@ import org.apache.doris.datasource.InitCatalogLog; import
org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.datasource.InitTableLog; import org.apache.doris.ha.MasterInfo; +import org.apache.doris.insertoverwrite.InsertOverwriteLog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteInfo; @@ -888,6 +889,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_INSERT_OVERWRITE: { + data = InsertOverwriteLog.read(in); + isRead = true; + break; + } case OperationType.OP_ALTER_REPOSITORY: { data = Repository.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java index 95a36146c7..501f75b78d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java @@ -17,21 +17,13 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.analysis.AddPartitionLikeClause; -import org.apache.doris.analysis.AlterClause; -import org.apache.doris.analysis.AlterTableStmt; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.DropPartitionClause; -import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.ReplacePartitionClause; -import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; -import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import 
org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -45,7 +37,6 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.DdlExecutor; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; @@ -56,14 +47,10 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.UUID; /** * insert into select command implementation @@ -133,149 +120,55 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS targetTable.getQualifiedDbName() + ": " + targetTable.getName()); } - TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, - targetTable.getQualifiedDbName(), targetTable.getName()); ConnectContext.get().setSkipAuth(true); + List partitionNames = ((UnboundTableSink) logicalQuery).getPartitions(); + if (CollectionUtils.isEmpty(partitionNames)) { + partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); + } + List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); + long taskId = Env.getCurrentEnv().getInsertOverwriteManager() + .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); try { - List partitionNames = ((UnboundTableSink) logicalQuery).getPartitions(); - if (CollectionUtils.isEmpty(partitionNames)) { - partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); - } - List tempPartitionNames = 
addTempPartitions(ctx, tableName, partitionNames); - boolean insertRes = insertInto(ctx, executor, tempPartitionNames, tableName); - if (!insertRes) { - return; - } - replacePartition(ctx, tableName, partitionNames, tempPartitionNames); + InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + insertInto(ctx, executor, tempPartitionNames); + InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); + Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + } catch (Exception e) { + LOG.warn("insert into overwrite failed"); + Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + throw e; } finally { ConnectContext.get().setSkipAuth(false); } } - /** - * replacing partitionNames with tempPartitionNames - * - * @param ctx ctx - * @param tableName tableName - * @param partitionNames partitionNames - * @param tempPartitionNames tempPartitionNames - * @throws UserException UserException - */ - private void replacePartition(ConnectContext ctx, TableName tableName, List partitionNames, - List tempPartitionNames) - throws UserException { - // overwrite old partition with tmp partition - try { - List ops = new ArrayList<>(); - Map properties = new HashMap<>(); - properties.put("use_temp_partition_name", "false"); - ops.add(new ReplacePartitionClause(new PartitionNames(false, partitionNames), - new PartitionNames(true, tempPartitionNames), properties)); - AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops); - Env.getCurrentEnv().alterTable(alterTableStmt); - } catch (Exception e) { - LOG.warn("IOT overwrite table partitions error", e); - rollback(ctx, tableName, tempPartitionNames); - throw e; - } - } - /** * insert into select * * @param ctx ctx * @param executor executor * @param tempPartitionNames tempPartitionNames - * @param tableName tableName */ - private boolean insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames, - TableName tableName) { 
- try { - UnboundTableSink sink = (UnboundTableSink) logicalQuery; - UnboundTableSink copySink = new UnboundTableSink<>( - sink.getNameParts(), - sink.getColNames(), - sink.getHints(), - true, - tempPartitionNames, - sink.isPartialUpdate(), - sink.getDMLCommandType(), - (LogicalPlan) (sink.child(0))); - // for overwrite situation, we disable auto create partition. - InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName); - insertCommand.setAllowAutoPartition(false); - insertCommand.run(ctx, executor); - if (ctx.getState().getStateType() == MysqlStateType.ERR) { - String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); - LOG.warn("InsertInto state error:{}", errMsg); - rollback(ctx, tableName, tempPartitionNames); - return false; - } - return true; - } catch (Exception e) { - LOG.warn("InsertInto error", e); - rollback(ctx, tableName, tempPartitionNames); - return false; - } - } - - /** - * add some tempPartitions - * - * @param ctx ctx - * @param tableName tableName - * @param partitionNames partitionNames - * @return tempPartitionNames - * @throws Exception Exception - */ - private List addTempPartitions(ConnectContext ctx, TableName tableName, List partitionNames) + private void insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames) throws Exception { - List tempPartitionNames = new ArrayList<>(); - try { - // create tmp partitions with uuid - for (String partitionName : partitionNames) { - UUID uuid = UUID.randomUUID(); - // to comply with naming rules - String tempPartName = "tmp_partition_" + uuid.toString().replace('-', '_'); - List ops = new ArrayList<>(); - ops.add(new AddPartitionLikeClause(tempPartName, partitionName, true)); - - AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops); - Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx); - alterTableStmt.analyze(tempAnalyzer); - DdlExecutor.execute(ctx.getEnv(), alterTableStmt); - // only when execution 
succeeded, put the temp partition name into list - tempPartitionNames.add(tempPartName); - } - return tempPartitionNames; - } catch (Exception e) { - LOG.warn("IOT create tmp table partitions error", e); - rollback(ctx, tableName, tempPartitionNames); - throw e; - } - } - - /** - * delete temp partitions - * - * @param ctx ctx - * @param targetTableName targetTableName - * @param tempPartitionNames tempPartitionNames - */ - private void rollback(ConnectContext ctx, TableName targetTableName, - List tempPartitionNames) { - try { - for (String partitionName : tempPartitionNames) { - List ops = new ArrayList<>(); - ops.add(new DropPartitionClause(true, partitionName, true, true)); - AlterTableStmt dropTablePartitionStmt = new AlterTableStmt(targetTableName, ops); - Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx); - dropTablePartitionStmt.analyze(tempAnalyzer); - DdlExecutor.execute(ctx.getEnv(), dropTablePartitionStmt); - } - } catch (Exception ex) { - LOG.warn("IOT drop partitions error", ex); - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage()); + UnboundTableSink sink = (UnboundTableSink) logicalQuery; + UnboundTableSink copySink = new UnboundTableSink<>( + sink.getNameParts(), + sink.getColNames(), + sink.getHints(), + true, + tempPartitionNames, + sink.isPartialUpdate(), + sink.getDMLCommandType(), + (LogicalPlan) (sink.child(0))); + // for overwrite situation, we disable auto create partition. 
+ InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName); + insertCommand.setAllowAutoPartition(false); + insertCommand.run(ctx, executor); + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); + LOG.warn("InsertInto state error:{}", errMsg); + throw new UserException(errMsg); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index f78f417e66..cbbae9d178 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -52,6 +52,7 @@ import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.InitDatabaseLog; import org.apache.doris.ha.MasterInfo; +import org.apache.doris.insertoverwrite.InsertOverwriteLog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.journal.Journal; import org.apache.doris.journal.JournalCursor; @@ -1132,6 +1133,11 @@ public class EditLog { env.getAlterInstance().processAlterMTMV(alterMtmv, true); break; } + case OperationType.OP_INSERT_OVERWRITE: { + final InsertOverwriteLog insertOverwriteLog = (InsertOverwriteLog) journal.getData(); + env.getInsertOverwriteManager().replayInsertOverwriteLog(insertOverwriteLog); + break; + } case OperationType.OP_ALTER_REPOSITORY: { Repository repository = (Repository) journal.getData(); env.getBackupHandler().getRepoMgr().alterRepo(repository, true); @@ -1963,6 +1969,10 @@ public class EditLog { } + public void logInsertOverwrite(InsertOverwriteLog log) { + logEdit(OperationType.OP_INSERT_OVERWRITE, log); + } + public String getNotReadyReason() { if (journal == null) { return "journal is null"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 87b6b5bacf..d26ddc12f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -353,6 +353,8 @@ public class OperationType { public static final short OP_ALTER_REPOSITORY = 460; + public static final short OP_INSERT_OVERWRITE = 461; + /** * Get opcode name by op code. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index 37741d84af..2d777a50b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -240,6 +240,12 @@ public class MetaPersistMethod { break; case "JobTaskManager": break; + case "insertOverwrite": + metaPersistMethod.readMethod = + Env.class.getDeclaredMethod("loadInsertOverwrite", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Env.class.getDeclaredMethod("saveInsertOverwrite", CountingDataOutputStream.class, long.class); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index efaa732baa..32bf866f22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -39,7 +39,8 @@ public class PersistMetaModules { "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", "plugins", "deleteHandler", "sqlBlockRule", "policy", "globalFunction", "workloadGroups", - "binlogs", "resourceGroups", "AnalysisMgrV2", 
"AsyncJobManager", "workloadSchedPolicy"); + "binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "workloadSchedPolicy", + "insertOverwrite"); // Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES) public static final ImmutableList DEPRECATED_MODULE_NAMES = ImmutableList.of(