[fix](group commit) fix http stream group commit begin a txn (#29399)

This commit is contained in:
meiyi
2024-01-02 21:58:00 +08:00
committed by GitHub
parent 5db496d844
commit d54cde51ba
2 changed files with 23 additions and 15 deletions

View File

@ -382,7 +382,7 @@ public class NativeInsertStmt extends InsertStmt {
label = new LabelName(db.getFullName(),
insertType.labePrefix + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"));
}
if (!isExplain() && !isTransactionBegin) {
if (!isExplain() && !isTransactionBegin && !isGroupCommitStreamLoadSql) {
if (targetTable instanceof OlapTable) {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
@ -1020,15 +1020,17 @@ public class NativeInsertStmt extends InsertStmt {
if (!allowAutoPartition) {
((OlapTableSink) dataSink).setAutoPartition(false);
}
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), transactionId);
if (txnState == null) {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
if (!isGroupCommitStreamLoadSql) {
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), transactionId);
if (txnState == null) {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
}
}
@ -1110,11 +1112,17 @@ public class NativeInsertStmt extends InsertStmt {
}
public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException {
if (isGroupCommitStreamLoadSql && targetTable != null && (targetTable instanceof OlapTable)
&& !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) {
throw new AnalysisException(
"table light_schema_change is false, can't do http_stream with group commit mode");
// check if http stream meets group commit requirements.
// If not meets, throw exception (consider fallback to non group commit mode).
if (isGroupCommitStreamLoadSql) {
if (targetTable != null && (targetTable instanceof OlapTable)
&& !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) {
throw new AnalysisException(
"table light_schema_change is false, can't do http_stream with group commit mode");
}
return;
}
// check if 'insert into' meets group commit requirements. If meets, set isGroupCommit to true
if (isGroupCommit) {
return;
}