[fix](group commit) Group commit http stream should not begin txn (#35494) (#35672)

## Proposed changes

Pick https://github.com/apache/doris/pull/35494
This commit is contained in:
meiyi
2024-05-30 20:57:59 +08:00
committed by GitHub
parent 64023c54bc
commit d6757e03de

View File

@ -86,6 +86,10 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
@Override
public void beginTransaction() {
if (isGroupCommitHttpStream()) {
LOG.info("skip begin transaction for group commit http stream");
return;
}
try {
if (ctx.isTxnModel()) {
TransactionEntry txnEntry = ctx.getTxnEntry();
@ -155,13 +159,15 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate((OlapTable) table);
if (!isGroupCommitHttpStream()) {
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
}
}
@ -279,4 +285,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
private boolean isGroupCommitHttpStream() {
return ConnectContext.get() != null && ConnectContext.get().isGroupCommitStreamLoadSql();
}
}