From d54cde51ba491d7c941c45a6cddfe107b018494f Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 2 Jan 2024 21:58:00 +0800 Subject: [PATCH] [fix](group commit) fix http stream group commit begin a txn (#29399) --- .../doris/analysis/NativeInsertStmt.java | 36 +++++++++++-------- ..._http_stream_lineitem_schema_change.groovy | 2 +- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index da82c4406c..576ebba9b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -382,7 +382,7 @@ public class NativeInsertStmt extends InsertStmt { label = new LabelName(db.getFullName(), insertType.labePrefix + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_")); } - if (!isExplain() && !isTransactionBegin) { + if (!isExplain() && !isTransactionBegin && !isGroupCommitStreamLoadSql) { if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), @@ -1020,15 +1020,17 @@ public class NativeInsertStmt extends InsertStmt { if (!allowAutoPartition) { ((OlapTableSink) dataSink).setAutoPartition(false); } - // add table indexes to transaction state - TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(db.getId(), transactionId); - if (txnState == null) { - throw new DdlException("txn does not exist: " + transactionId); - } - txnState.addTableIndexes((OlapTable) targetTable); - if (isPartialUpdate) { - txnState.setSchemaForPartialUpdate((OlapTable) targetTable); + if (!isGroupCommitStreamLoadSql) { + // add table indexes to transaction state + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), transactionId); + if (txnState == null) { + throw new DdlException("txn does not exist: " + transactionId); + } + txnState.addTableIndexes((OlapTable) targetTable); + if (isPartialUpdate) { + txnState.setSchemaForPartialUpdate((OlapTable) targetTable); + } } } } @@ -1110,11 +1112,17 @@ public class NativeInsertStmt extends InsertStmt { } public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException { - if (isGroupCommitStreamLoadSql && targetTable != null && (targetTable instanceof OlapTable) - && !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) { - throw new AnalysisException( - "table light_schema_change is false, can't do http_stream with group commit mode"); + // check if http stream meets group commit requirements. + // If not meets, throw exception (consider fallback to non group commit mode). + if (isGroupCommitStreamLoadSql) { + if (targetTable != null && (targetTable instanceof OlapTable) + && !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) { + throw new AnalysisException( + "table light_schema_change is false, can't do http_stream with group commit mode"); + } + return; } + // check if 'insert into' meets group commit requirements. If meets, set isGroupCommit to true if (isGroupCommit) { return; } diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy index dd0d40e645..7d43f0c91e 100644 --- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy @@ -368,7 +368,7 @@ l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c14, c15, c def process = { table_name -> - for (int i = 1; i <= 4; i++) { + for (int i = 1; i <= 5; i++) { switch (i) { case SC.TRUNCATE_TABLE.value: truncate(table_name)