[cherry-pick](branch-2.1) Pick "[Enhancement](txn) Block new insert into if schema change happens during transaction (#39483)" (#40115)

## Proposed changes

Pick #39483 

<!--Describe your changes.-->
This commit is contained in:
abmdocrt
2024-08-30 10:01:10 +08:00
committed by GitHub
parent 45e35e8a99
commit 0b9817ea4b
17 changed files with 466 additions and 5 deletions

View File

@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
@ -114,6 +115,16 @@ public class BatchInsertIntoTableCommand extends Command implements NoForward, E
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
sink = ((PhysicalOlapTableSink<?>) plan.get());
Table targetTable = sink.getTargetTable();
if (ctx.getTxnEntry().isFirstTxnInsert()) {
ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable) targetTable).getBaseSchemaVersion());
ctx.getTxnEntry().setFirstTxnInsert(false);
} else {
if (((OlapTable) targetTable).getBaseSchemaVersion() != ctx.getTxnEntry().getTxnSchemaVersion()) {
throw new AnalysisException("There are schema changes in one transaction, "
+ "you can commit this transaction with formal data or rollback "
+ "this whole transaction.");
}
}
// should set columns of sink since we maybe generate some invisible columns
List<Column> fullSchema = sink.getTargetTable().getFullSchema();
List<Column> targetSchema = Lists.newArrayList();

View File

@ -58,10 +58,6 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH
logicalProperties, physicalProperties, statistics, child);
}
public List<NamedExpression> getOutputExprs() {
return outputExprs;
}
@Override
public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,

View File

@ -1978,6 +1978,7 @@ public class StmtExecutor {
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
: context.getSessionVariable().getInsertMaxFilterRatio()));
context.getTxnEntry().setFirstTxnInsert(true);
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
.append(TransactionStatus.PREPARE.name());

View File

@ -67,6 +67,8 @@ public class TransactionEntry {
private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
private long rowsInTransaction = 0;
private Types.PUniqueId pLoadId;
private boolean isFirstTxnInsert = false;
private volatile int txnSchemaVersion = -1;
// for insert into select for multi tables
private boolean isTransactionBegan = false;
@ -164,6 +166,22 @@ public class TransactionEntry {
this.pLoadId = pLoadId;
}
public boolean isFirstTxnInsert() {
return isFirstTxnInsert;
}
public void setFirstTxnInsert(boolean firstTxnInsert) {
isFirstTxnInsert = firstTxnInsert;
}
public int getTxnSchemaVersion() {
return txnSchemaVersion;
}
public void setTxnSchemaVersion(int txnSchemaVersion) {
this.txnSchemaVersion = txnSchemaVersion;
}
// Used for insert into select
public void beginTransaction(DatabaseIf database, TableIf table)
throws DdlException, BeginTransactionException, MetaNotFoundException, AnalysisException,