[Enhancement](auto-partition) change the behaviour when insert overwrite an auto partition table #28683
If we specific target partition(s) when inserting overwrite an auto partition table, before: could create new partition now: behalf just like non-auto partition table
This commit is contained in:
@ -60,7 +60,7 @@ public class CreateTableAsSelectStmt extends DdlStmt {
|
||||
this.columnNames = columnNames;
|
||||
this.queryStmt = queryStmt;
|
||||
this.insertStmt = new NativeInsertStmt(createTableStmt.getDbTbl(), null, null,
|
||||
queryStmt, null, columnNames);
|
||||
queryStmt, null, columnNames, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -160,6 +160,7 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
private InsertType insertType = InsertType.NATIVE_INSERT;
|
||||
|
||||
boolean hasEmptyTargetColumns = false;
|
||||
private boolean allowAutoPartition = true;
|
||||
|
||||
enum InsertType {
|
||||
NATIVE_INSERT("insert_"),
|
||||
@ -193,13 +194,14 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
|
||||
// Ctor for CreateTableAsSelectStmt and InsertOverwriteTableStmt
|
||||
public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, LabelName label,
|
||||
QueryStmt queryStmt, List<String> planHints, List<String> targetColumnNames) {
|
||||
QueryStmt queryStmt, List<String> planHints, List<String> targetColumnNames, boolean allowAutoPartition) {
|
||||
super(label, null, null);
|
||||
this.tblName = name;
|
||||
this.targetPartitionNames = targetPartitionNames;
|
||||
this.queryStmt = queryStmt;
|
||||
this.planHints = planHints;
|
||||
this.targetColumnNames = targetColumnNames;
|
||||
this.allowAutoPartition = allowAutoPartition;
|
||||
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
|
||||
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
|
||||
}
|
||||
@ -1014,6 +1016,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
public void complete() throws UserException {
|
||||
if (!isExplain() && targetTable instanceof OlapTable) {
|
||||
((OlapTableSink) dataSink).complete(analyzer);
|
||||
if (!allowAutoPartition) {
|
||||
((OlapTableSink) dataSink).setAutoPartition(false);
|
||||
}
|
||||
// add table indexes to transaction state
|
||||
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
|
||||
.getTransactionState(db.getId(), transactionId);
|
||||
|
||||
@ -152,7 +152,8 @@ public class InsertExecutor {
|
||||
/**
|
||||
* finalize sink to complete enough info for sink execution
|
||||
*/
|
||||
public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean isFromInsert) {
|
||||
public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean isFromInsert,
|
||||
boolean allowAutoPartition) {
|
||||
if (!(sink instanceof OlapTableSink)) {
|
||||
return;
|
||||
}
|
||||
@ -170,6 +171,9 @@ public class InsertExecutor {
|
||||
false,
|
||||
isStrictMode);
|
||||
olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
|
||||
if (!allowAutoPartition) {
|
||||
olapTableSink.setAutoPartition(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
@ -82,6 +82,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
* When source it's from job scheduler,it will be set.
|
||||
*/
|
||||
private long jobId;
|
||||
private boolean allowAutoPartition;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
@ -90,6 +91,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
super(PlanType.INSERT_INTO_TABLE_COMMAND);
|
||||
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
|
||||
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
|
||||
// only insert overwrite will disable it.
|
||||
this.allowAutoPartition = true;
|
||||
}
|
||||
|
||||
public void setLabelName(Optional<String> labelName) {
|
||||
@ -100,6 +103,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
this.jobId = jobId;
|
||||
}
|
||||
|
||||
public void setAllowAutoPartition(boolean allowAutoPartition) {
|
||||
this.allowAutoPartition = allowAutoPartition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
|
||||
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
|
||||
@ -160,7 +167,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
physicalOlapTableSink.getTargetTable(), label, planner);
|
||||
insertExecutor.beginTransaction();
|
||||
insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(),
|
||||
physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT);
|
||||
physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT, this.allowAutoPartition);
|
||||
} finally {
|
||||
targetTableIf.readUnlock();
|
||||
}
|
||||
|
||||
@ -201,7 +201,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
|
||||
sink.isPartialUpdate(),
|
||||
sink.getDMLCommandType(),
|
||||
(LogicalPlan) (sink.child(0)));
|
||||
new InsertIntoTableCommand(copySink, labelName).run(ctx, executor);
|
||||
// for overwrite situation, we disable auto create partition.
|
||||
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName);
|
||||
insertCommand.setAllowAutoPartition(false);
|
||||
insertCommand.run(ctx, executor);
|
||||
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
|
||||
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
|
||||
LOG.warn("InsertInto state error:{}", errMsg);
|
||||
|
||||
@ -172,6 +172,10 @@ public class OlapTableSink extends DataSink {
|
||||
tDataSink.getOlapTableSink().setLoadId(newLoadId);
|
||||
}
|
||||
|
||||
public void setAutoPartition(boolean var) {
|
||||
tDataSink.getOlapTableSink().getPartition().setEnableAutomaticPartition(var);
|
||||
}
|
||||
|
||||
// must called after tupleDescriptor is computed
|
||||
public void complete(Analyzer analyzer) throws UserException {
|
||||
for (Long partitionId : partitionIds) {
|
||||
|
||||
@ -2522,7 +2522,7 @@ public class StmtExecutor {
|
||||
// after success create table insert data
|
||||
try {
|
||||
parsedStmt = new NativeInsertStmt(tmpTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
|
||||
iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols());
|
||||
iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true);
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
@ -2595,7 +2595,7 @@ public class StmtExecutor {
|
||||
try {
|
||||
parsedStmt = new NativeInsertStmt(targetTableName, new PartitionNames(true, tempPartitionName),
|
||||
new LabelName(iotStmt.getDb(), iotStmt.getLabel()), iotStmt.getQueryStmt(),
|
||||
iotStmt.getHints(), iotStmt.getCols());
|
||||
iotStmt.getHints(), iotStmt.getCols(), false);
|
||||
parsedStmt.setUserInfo(context.getCurrentUserIdentity());
|
||||
execute();
|
||||
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
|
||||
|
||||
Reference in New Issue
Block a user