diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 3b8428a435..1b76ce42bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -321,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 9aac4b3655..f7accb126e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -653,7 +653,7 @@ public class SparkLoadJob extends BulkLoadJob {
Lists.newArrayList(tableToLoadPartitions.keySet()));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
dbId, tableList, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 7c1b6a45b4..d48418c7f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1747,11 +1747,21 @@ public class FrontendServiceImpl implements FrontendService.Iface {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
// Step 5: commit and publish
- return Env.getCurrentGlobalTransactionMgr()
- .commitAndPublishTransaction(db, tableList,
- request.getTxnId(),
- TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
- TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ if (!request.isOnlyCommit()) {
+ return Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(db, tableList,
+ request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ } else {
+ // single table commit, so don't need to wait for publish.
+ Env.getCurrentGlobalTransactionMgr()
+ .commitTransaction(db, tableList,
+ request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ return true;
+ }
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 15d6daf30d..3bee819a1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -226,10 +226,10 @@ public class GlobalTransactionMgr implements Writable {
}
@Deprecated
- public void commitTransaction(long dbId, List
tableList,
+ public void commitTransactionWithoutLock(long dbId, List tableList,
long transactionId, List tabletCommitInfos)
throws UserException {
- commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null);
+ commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null);
}
/**
@@ -241,7 +241,7 @@ public class GlobalTransactionMgr implements Writable {
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get all tables' write locks before call this api
*/
- public void commitTransaction(long dbId, List tableList, long transactionId,
+ public void commitTransactionWithoutLock(long dbId, List tableList, long transactionId,
List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@@ -255,6 +255,20 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
}
+ public void commitTransaction(DatabaseIf db, List tableList, long transactionId,
+ List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout, tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ }
+
private void commitTransaction2PC(long dbId, long transactionId)
throws UserException {
if (Config.disable_load_job) {
@@ -282,7 +296,7 @@ public class GlobalTransactionMgr implements Writable {
+ StringUtils.join(tableList, ",") + ")");
}
try {
- commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
+ commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
@@ -313,7 +327,7 @@ public class GlobalTransactionMgr implements Writable {
}
stopWatch.stop();
LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
- + " data will be visable later.", transactionId, stopWatch.getTime());
+ + " data will be visible later.", transactionId, stopWatch.getTime());
}
public void abortTransaction(long dbId, long transactionId, String reason) throws UserException {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 18f29ac30d..bf1b82019e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -358,7 +358,8 @@ public class DeleteHandlerTest {
new Expectations(globalTransactionMgr) {
{
try {
- globalTransactionMgr.commitTransaction(anyLong, (List) any, anyLong, (List) any, (TxnCommitAttachment) any);
+ globalTransactionMgr.commitTransactionWithoutLock(
+ anyLong, (List) any, anyLong, (List) any, (TxnCommitAttachment) any);
} catch (UserException e) {
// CHECKSTYLE IGNORE THIS LINE
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e3916cfb18..f1e9942c7e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -377,7 +377,8 @@ public class SparkLoadJobTest {
AgentTaskExecutor.submit((AgentBatchTask) any);
Env.getCurrentGlobalTransactionMgr();
result = transactionMgr;
- transactionMgr.commitTransaction(dbId, (List) any, transactionId, (List) any,
+ transactionMgr.commitTransactionWithoutLock(
+ dbId, (List) any, transactionId, (List) any,
(LoadJobFinalOperation) any);
}
};
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index 5f182ef50b..76ae778153 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 7910df48be..2779a5d510 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -172,8 +172,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
- transTablets, null);
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
@@ -235,7 +236,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -265,7 +267,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
@@ -366,7 +369,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows"));
@@ -439,7 +443,8 @@ public class GlobalTransactionMgrTest {
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
+ masterTransMgr.commitTransactionWithoutLock(
+ 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment);
// current total rows and error rows will be reset after job pause, so here they should be 0.
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic");
@@ -471,7 +476,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@@ -537,7 +543,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
@@ -607,7 +614,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
@@ -624,7 +632,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
- masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
+ masterTransMgr.commitTransactionWithoutLock(
+ CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 3018924064..210f1ada0a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -824,6 +824,8 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
+ // used for ccr
+ 15: optional bool only_commit // only commit txn, without waiting txn publish
}
struct TCommitTxnResult {