[Feature](insert) support insert overwrite stmt (#19616)
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.alter;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.AddPartitionLikeClause;
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
@ -260,7 +261,7 @@ public class Alter {
|
||||
}
|
||||
} else if (alterClause instanceof DropPartitionFromIndexClause) {
|
||||
// do nothing
|
||||
} else if (alterClause instanceof AddPartitionClause) {
|
||||
} else if (alterClause instanceof AddPartitionClause || alterClause instanceof AddPartitionLikeClause) {
|
||||
needProcessOutsideTableLock = true;
|
||||
} else {
|
||||
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
|
||||
@ -484,6 +485,12 @@ public class Alter {
|
||||
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
|
||||
}
|
||||
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause);
|
||||
} else if (alterClause instanceof AddPartitionLikeClause) {
|
||||
if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
|
||||
DynamicPartitionUtil.checkAlterAllowed(
|
||||
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
|
||||
}
|
||||
Env.getCurrentEnv().addPartitionLike(db, tableName, (AddPartitionLikeClause) alterClause);
|
||||
} else if (alterClause instanceof ModifyPartitionClause) {
|
||||
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
|
||||
Map<String, String> properties = clause.getProperties();
|
||||
|
||||
@ -0,0 +1,61 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.alter.AlterOpType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
public class AddPartitionLikeClause extends AlterTableClause {
|
||||
@Getter
|
||||
private final String partitionName;
|
||||
|
||||
@Getter
|
||||
private final String existedPartitionName;
|
||||
|
||||
@Getter
|
||||
private final Boolean isTempPartition;
|
||||
|
||||
public AddPartitionLikeClause(String partitionName,
|
||||
String existedPartitionName,
|
||||
boolean isTempPartition) {
|
||||
super(AlterOpType.ADD_PARTITION);
|
||||
this.partitionName = partitionName;
|
||||
this.existedPartitionName = existedPartitionName;
|
||||
this.isTempPartition = isTempPartition;
|
||||
this.needTableStable = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ADD PARTITION ").append(partitionName).append(" LIKE ");
|
||||
sb.append(existedPartitionName);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toSql();
|
||||
}
|
||||
}
|
||||
@ -51,7 +51,8 @@ public class CreateTableAsSelectStmt extends DdlStmt {
|
||||
this.createTableStmt = createTableStmt;
|
||||
this.columnNames = columnNames;
|
||||
this.queryStmt = queryStmt;
|
||||
this.insertStmt = new NativeInsertStmt(createTableStmt.getDbTbl(), queryStmt);
|
||||
this.insertStmt = new NativeInsertStmt(createTableStmt.getDbTbl(), null, null,
|
||||
queryStmt, null, columnNames);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,73 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class InsertOverwriteTableStmt extends DdlStmt {
|
||||
|
||||
private final InsertTarget target;
|
||||
|
||||
@Getter
|
||||
private final String label;
|
||||
|
||||
@Getter
|
||||
private final List<String> cols;
|
||||
|
||||
private final InsertSource source;
|
||||
|
||||
@Getter
|
||||
private final List<String> hints;
|
||||
|
||||
public InsertOverwriteTableStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
|
||||
List<String> hints) {
|
||||
this.target = target;
|
||||
this.label = label;
|
||||
this.cols = cols;
|
||||
this.source = source;
|
||||
this.hints = hints;
|
||||
}
|
||||
|
||||
public String getDb() {
|
||||
return target.getTblName().getDb();
|
||||
}
|
||||
|
||||
public String getTbl() {
|
||||
return target.getTblName().getTbl();
|
||||
}
|
||||
|
||||
public QueryStmt getQueryStmt() {
|
||||
return source.getQueryStmt();
|
||||
}
|
||||
|
||||
public List<String> getPartitionNames() {
|
||||
if (target.getPartitionNames() == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return target.getPartitionNames().getPartitionNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
}
|
||||
}
|
||||
@ -144,13 +144,17 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
|
||||
}
|
||||
|
||||
// Ctor for CreateTableAsSelectStmt
|
||||
public NativeInsertStmt(TableName name, QueryStmt queryStmt) {
|
||||
// Ctor for CreateTableAsSelectStmt and InsertOverwriteTableStmt
|
||||
public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, LabelName label,
|
||||
QueryStmt queryStmt, List<String> planHints, List<String> targetColumnNames) {
|
||||
this.tblName = name;
|
||||
this.targetPartitionNames = null;
|
||||
this.targetColumnNames = null;
|
||||
this.targetPartitionNames = targetPartitionNames;
|
||||
this.label = label;
|
||||
this.queryStmt = queryStmt;
|
||||
this.planHints = null;
|
||||
this.planHints = planHints;
|
||||
this.targetColumnNames = targetColumnNames;
|
||||
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
|
||||
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
|
||||
}
|
||||
|
||||
public boolean isValuesOrConstantSelect() {
|
||||
|
||||
@ -71,6 +71,15 @@ public class PartitionDesc {
|
||||
return this.singlePartitionDescs;
|
||||
}
|
||||
|
||||
public SinglePartitionDesc getSinglePartitionDescByName(String partitionName) {
|
||||
for (SinglePartitionDesc singlePartitionDesc : this.singlePartitionDescs) {
|
||||
if (singlePartitionDesc.getPartitionName().equals(partitionName)) {
|
||||
return singlePartitionDesc;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<String> getPartitionColNames() {
|
||||
return partitionColNames;
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import org.apache.doris.alter.MaterializedViewHandler;
|
||||
import org.apache.doris.alter.SchemaChangeHandler;
|
||||
import org.apache.doris.alter.SystemHandler;
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.AddPartitionLikeClause;
|
||||
import org.apache.doris.analysis.AdminCheckTabletsStmt;
|
||||
import org.apache.doris.analysis.AdminCheckTabletsStmt.CheckType;
|
||||
import org.apache.doris.analysis.AdminCleanTrashStmt;
|
||||
@ -2713,6 +2714,11 @@ public class Env {
|
||||
getInternalCatalog().addPartition(db, tableName, addPartitionClause);
|
||||
}
|
||||
|
||||
public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
|
||||
throws DdlException {
|
||||
getInternalCatalog().addPartitionLike(db, tableName, addPartitionLikeClause);
|
||||
}
|
||||
|
||||
public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
|
||||
getInternalCatalog().replayAddPartition(info);
|
||||
}
|
||||
|
||||
@ -318,7 +318,7 @@ public class PartitionInfo implements Writable {
|
||||
if (expr == MaxLiteral.MAX_VALUE) {
|
||||
return PartitionValue.MAX_VALUE;
|
||||
} else if (expr instanceof DateLiteral) {
|
||||
return new PartitionValue(expr.toSql());
|
||||
return new PartitionValue(expr.getStringValue());
|
||||
} else {
|
||||
return new PartitionValue(expr.getRealValue().toString());
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.AddPartitionLikeClause;
|
||||
import org.apache.doris.analysis.AddRollupClause;
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
|
||||
@ -1254,6 +1255,51 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
}
|
||||
|
||||
public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
|
||||
throws DdlException {
|
||||
try {
|
||||
Table table = db.getTableOrDdlException(tableName);
|
||||
|
||||
if (table.getType() != TableType.OLAP) {
|
||||
throw new DdlException("Only support create partition from a OLAP table");
|
||||
}
|
||||
|
||||
// Lock the table to prevent other SQL from performing write operation during table structure modification
|
||||
AddPartitionClause clause = null;
|
||||
table.readLock();
|
||||
try {
|
||||
String partitionName = addPartitionLikeClause.getPartitionName();
|
||||
String existedName = addPartitionLikeClause.getExistedPartitionName();
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
Partition part = olapTable.getPartition(existedName);
|
||||
if (part == null) {
|
||||
throw new DdlException("Failed to ADD PARTITION" + partitionName + " LIKE "
|
||||
+ existedName + ". Reason: " + "partition " + existedName + "not exist");
|
||||
}
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
PartitionDesc partitionDesc = partitionInfo.toPartitionDesc((OlapTable) table);
|
||||
SinglePartitionDesc oldPartitionDesc = partitionDesc.getSinglePartitionDescByName(existedName);
|
||||
if (oldPartitionDesc == null) {
|
||||
throw new DdlException("Failed to ADD PARTITION" + partitionName + " LIKE "
|
||||
+ existedName + ". Reason: " + "partition " + existedName + "desc not exist");
|
||||
}
|
||||
DistributionDesc distributionDesc = part.getDistributionInfo().toDistributionDesc();
|
||||
SinglePartitionDesc newPartitionDesc = new SinglePartitionDesc(false, partitionName,
|
||||
oldPartitionDesc.getPartitionKeyDesc(), oldPartitionDesc.getProperties());
|
||||
Map<String, String> properties = newPartitionDesc.getProperties();
|
||||
clause = new AddPartitionClause(newPartitionDesc, distributionDesc,
|
||||
properties, addPartitionLikeClause.getIsTempPartition());
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
}
|
||||
addPartition(db, tableName, clause);
|
||||
|
||||
} catch (UserException e) {
|
||||
throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName()
|
||||
+ " LIKE " + addPartitionLikeClause.getExistedPartitionName() + ". Reason: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
|
||||
SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
|
||||
DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
|
||||
|
||||
@ -17,31 +17,42 @@
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionLikeClause;
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.AnalyzeStmt;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.ArrayLiteral;
|
||||
import org.apache.doris.analysis.CreateTableAsSelectStmt;
|
||||
import org.apache.doris.analysis.CreateTableLikeStmt;
|
||||
import org.apache.doris.analysis.DdlStmt;
|
||||
import org.apache.doris.analysis.DecimalLiteral;
|
||||
import org.apache.doris.analysis.DeleteStmt;
|
||||
import org.apache.doris.analysis.DropPartitionClause;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.ExecuteStmt;
|
||||
import org.apache.doris.analysis.ExplainOptions;
|
||||
import org.apache.doris.analysis.ExportStmt;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.FloatLiteral;
|
||||
import org.apache.doris.analysis.InsertOverwriteTableStmt;
|
||||
import org.apache.doris.analysis.InsertStmt;
|
||||
import org.apache.doris.analysis.KillStmt;
|
||||
import org.apache.doris.analysis.LabelName;
|
||||
import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.analysis.LoadStmt;
|
||||
import org.apache.doris.analysis.LoadType;
|
||||
import org.apache.doris.analysis.LockTablesStmt;
|
||||
import org.apache.doris.analysis.NativeInsertStmt;
|
||||
import org.apache.doris.analysis.NullLiteral;
|
||||
import org.apache.doris.analysis.OutFileClause;
|
||||
import org.apache.doris.analysis.PartitionNames;
|
||||
import org.apache.doris.analysis.PrepareStmt;
|
||||
import org.apache.doris.analysis.Queriable;
|
||||
import org.apache.doris.analysis.QueryStmt;
|
||||
import org.apache.doris.analysis.RedirectStatus;
|
||||
import org.apache.doris.analysis.ReplacePartitionClause;
|
||||
import org.apache.doris.analysis.ReplaceTableClause;
|
||||
import org.apache.doris.analysis.SelectListItem;
|
||||
import org.apache.doris.analysis.SelectStmt;
|
||||
import org.apache.doris.analysis.SetOperationStmt;
|
||||
@ -166,6 +177,7 @@ import java.io.StringReader;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -657,6 +669,8 @@ public class StmtExecutor {
|
||||
handleTransactionStmt();
|
||||
} else if (parsedStmt instanceof CreateTableAsSelectStmt) {
|
||||
handleCtasStmt();
|
||||
} else if (parsedStmt instanceof InsertOverwriteTableStmt) {
|
||||
handleIotStmt();
|
||||
} else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InsertStmt is its subclass
|
||||
InsertStmt insertStmt = (InsertStmt) parsedStmt;
|
||||
if (insertStmt.needLoadManager()) {
|
||||
@ -855,7 +869,8 @@ public class StmtExecutor {
|
||||
|
||||
if (parsedStmt instanceof QueryStmt
|
||||
|| (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager())
|
||||
|| parsedStmt instanceof CreateTableAsSelectStmt) {
|
||||
|| parsedStmt instanceof CreateTableAsSelectStmt
|
||||
|| parsedStmt instanceof InsertOverwriteTableStmt) {
|
||||
if (Config.enable_resource_group && context.sessionVariable.enablePipelineEngine()) {
|
||||
analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr()
|
||||
.getResourceGroup(context.sessionVariable.resourceGroup));
|
||||
@ -866,6 +881,10 @@ public class StmtExecutor {
|
||||
if (parsedStmt instanceof QueryStmt) {
|
||||
queryStmt = (QueryStmt) parsedStmt;
|
||||
queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
|
||||
} else if (parsedStmt instanceof InsertOverwriteTableStmt) {
|
||||
InsertOverwriteTableStmt parsedStmt = (InsertOverwriteTableStmt) this.parsedStmt;
|
||||
queryStmt = parsedStmt.getQueryStmt();
|
||||
queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
|
||||
} else if (parsedStmt instanceof CreateTableAsSelectStmt) {
|
||||
CreateTableAsSelectStmt parsedStmt = (CreateTableAsSelectStmt) this.parsedStmt;
|
||||
queryStmt = parsedStmt.getQueryStmt();
|
||||
@ -2096,21 +2115,21 @@ public class StmtExecutor {
|
||||
// Maybe our bug
|
||||
LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
return;
|
||||
}
|
||||
// after success create table insert data
|
||||
if (MysqlStateType.OK.equals(context.getState().getStateType())) {
|
||||
try {
|
||||
parsedStmt = ctasStmt.getInsertStmt();
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("CTAS insert data error, stmt={}", ctasStmt.toSql());
|
||||
handleCtasRollback(ctasStmt.getCreateTableStmt().getDbTbl());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("CTAS insert data error, stmt={}", ctasStmt.toSql(), e);
|
||||
try {
|
||||
parsedStmt = ctasStmt.getInsertStmt();
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("CTAS insert data error, stmt={}", ctasStmt.toSql());
|
||||
handleCtasRollback(ctasStmt.getCreateTableStmt().getDbTbl());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("CTAS insert data error, stmt={}", ctasStmt.toSql(), e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
handleCtasRollback(ctasStmt.getCreateTableStmt().getDbTbl());
|
||||
}
|
||||
}
|
||||
|
||||
@ -2127,6 +2146,184 @@ public class StmtExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleIotStmt() {
|
||||
InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt;
|
||||
if (iotStmt.getPartitionNames().size() == 0) {
|
||||
// insert overwrite table
|
||||
handleOverwriteTable(iotStmt);
|
||||
} else {
|
||||
// insert overwrite table with partition
|
||||
handleOverwritePartition(iotStmt);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleOverwriteTable(InsertOverwriteTableStmt iotStmt) {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
// to comply with naming rules
|
||||
TableName tmpTableName = new TableName(null, iotStmt.getDb(), "tmp_table_" + uuid.toString().replace('-', '_'));
|
||||
TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl());
|
||||
try {
|
||||
// create a tmp table with uuid
|
||||
parsedStmt = new CreateTableLikeStmt(false, tmpTableName, targetTableName, null, false);
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
// if create tmp table err, return
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
// There is already an error message in the execute() function, so there is no need to set it here
|
||||
LOG.warn("IOT create table error, stmt={}", originStmt.originStmt);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Maybe our bug
|
||||
LOG.warn("IOT create a tmp table error, stmt={}", originStmt.originStmt, e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
return;
|
||||
}
|
||||
// after success create table insert data
|
||||
try {
|
||||
parsedStmt = new NativeInsertStmt(tmpTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
|
||||
iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols());
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql());
|
||||
handleIotRollback(tmpTableName);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
handleIotRollback(tmpTableName);
|
||||
return;
|
||||
}
|
||||
|
||||
// overwrite old table with tmp table
|
||||
try {
|
||||
List<AlterClause> ops = new ArrayList<>();
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put("swap", "false");
|
||||
ops.add(new ReplaceTableClause(tmpTableName.getTbl(), properties));
|
||||
parsedStmt = new AlterTableStmt(targetTableName, ops);
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("IOT overwrite table error, stmt={}", parsedStmt.toSql());
|
||||
handleIotRollback(tmpTableName);
|
||||
return;
|
||||
}
|
||||
context.getState().setOk();
|
||||
} catch (Exception e) {
|
||||
// Maybe our bug
|
||||
LOG.warn("IOT overwrite table error, stmt={}", parsedStmt.toSql(), e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
handleIotRollback(tmpTableName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) {
|
||||
TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl());
|
||||
List<String> partitionNames = iotStmt.getPartitionNames();
|
||||
List<String> tempPartitionName = new ArrayList<>();
|
||||
try {
|
||||
// create tmp partitions with uuid
|
||||
for (String partitionName : partitionNames) {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
// to comply with naming rules
|
||||
String tempPartName = "tmp_partition_" + uuid.toString().replace('-', '_');
|
||||
List<AlterClause> ops = new ArrayList<>();
|
||||
ops.add(new AddPartitionLikeClause(tempPartName, partitionName, true));
|
||||
parsedStmt = new AlterTableStmt(targetTableName, ops);
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("IOT create tmp partitions error, stmt={}", originStmt.originStmt);
|
||||
handleIotPartitionRollback(targetTableName, tempPartitionName);
|
||||
return;
|
||||
}
|
||||
// only when execution succeeded, put the temp partition name into list
|
||||
tempPartitionName.add(tempPartName);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Maybe our bug
|
||||
LOG.warn("IOT create tmp table partitions error, stmt={}", originStmt.originStmt, e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
handleIotPartitionRollback(targetTableName, tempPartitionName);
|
||||
return;
|
||||
}
|
||||
// after success add tmp partitions
|
||||
try {
|
||||
parsedStmt = new NativeInsertStmt(targetTableName, new PartitionNames(true, tempPartitionName),
|
||||
new LabelName(iotStmt.getDb(), iotStmt.getLabel()), iotStmt.getQueryStmt(),
|
||||
iotStmt.getHints(), iotStmt.getCols());
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql());
|
||||
handleIotPartitionRollback(targetTableName, tempPartitionName);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
handleIotPartitionRollback(targetTableName, tempPartitionName);
|
||||
return;
|
||||
}
|
||||
|
||||
// overwrite old partition with tmp partition
|
||||
try {
|
||||
List<AlterClause> ops = new ArrayList<>();
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put("use_temp_partition_name", "false");
|
||||
ops.add(new ReplacePartitionClause(new PartitionNames(false, partitionNames),
|
||||
new PartitionNames(true, tempPartitionName), properties));
|
||||
parsedStmt = new AlterTableStmt(targetTableName, ops);
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
LOG.warn("IOT overwrite table partitions error, stmt={}", parsedStmt.toSql());
|
||||
handleIotPartitionRollback(targetTableName, tempPartitionName);
|
||||
return;
|
||||
}
|
||||
context.getState().setOk();
|
||||
} catch (Exception e) {
|
||||
// Maybe our bug
|
||||
LOG.warn("IOT overwrite table partitions error, stmt={}", parsedStmt.toSql(), e);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
|
||||
handleIotPartitionRollback(targetTableName, tempPartitionName);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleIotRollback(TableName table) {
|
||||
// insert error drop the tmp table
|
||||
DropTableStmt dropTableStmt = new DropTableStmt(true, table, true);
|
||||
try {
|
||||
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), context);
|
||||
dropTableStmt.analyze(tempAnalyzer);
|
||||
DdlExecutor.execute(context.getEnv(), dropTableStmt);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("IOT drop table error, stmt={}", parsedStmt.toSql(), ex);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void handleIotPartitionRollback(TableName targetTableName, List<String> tempPartitionNames) {
|
||||
// insert error drop the tmp partitions
|
||||
try {
|
||||
for (String partitionName : tempPartitionNames) {
|
||||
List<AlterClause> ops = new ArrayList<>();
|
||||
ops.add(new DropPartitionClause(true, partitionName, true, true));
|
||||
AlterTableStmt dropTablePartitionStmt = new AlterTableStmt(targetTableName, ops);
|
||||
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), context);
|
||||
dropTablePartitionStmt.analyze(tempAnalyzer);
|
||||
DdlExecutor.execute(context.getEnv(), dropTablePartitionStmt);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("IOT drop partitions error, stmt={}", parsedStmt.toSql(), ex);
|
||||
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
|
||||
if (statisticsForAuditLog == null) {
|
||||
statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
|
||||
|
||||
Reference in New Issue
Block a user