From f55055096bfdea24b2fd15186b6b1ba71c092af0 Mon Sep 17 00:00:00 2001 From: walter Date: Sat, 29 Mar 2025 08:59:23 +0800 Subject: [PATCH] branch-2.1: [improve](binlog) Allow commit txn without waiting txn publish #48961 (#49266) cherry pick from #48961 --- .../doris/load/loadv2/BrokerLoadJob.java | 2 +- .../doris/load/loadv2/SparkLoadJob.java | 2 +- .../doris/service/FrontendServiceImpl.java | 20 +++++++++--- .../transaction/GlobalTransactionMgr.java | 24 +++++++++++--- .../apache/doris/load/DeleteHandlerTest.java | 3 +- .../doris/load/loadv2/SparkLoadJobTest.java | 3 +- .../DatabaseTransactionMgrTest.java | 3 +- .../transaction/GlobalTransactionMgrTest.java | 31 ++++++++++++------- gensrc/thrift/FrontendService.thrift | 2 ++ 9 files changed, 64 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 3b8428a435..1b76ce42bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -321,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob { .add("txn_id", transactionId) .add("msg", "Load job try to commit txn") .build()); - Env.getCurrentGlobalTransactionMgr().commitTransaction( + Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock( dbId, tableList, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp, state, failMsg)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 9aac4b3655..f7accb126e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -653,7 +653,7 @@ public class SparkLoadJob extends BulkLoadJob { 
Lists.newArrayList(tableToLoadPartitions.keySet())); MetaLockUtils.writeLockTablesOrMetaException(tableList); try { - Env.getCurrentGlobalTransactionMgr().commitTransaction( + Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock( dbId, tableList, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp, state, failMsg)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7c1b6a45b4..d48418c7f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1747,11 +1747,21 @@ public class FrontendServiceImpl implements FrontendService.Iface { long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000; // Step 5: commit and publish - return Env.getCurrentGlobalTransactionMgr() - .commitAndPublishTransaction(db, tableList, - request.getTxnId(), - TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, - TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + if (!request.isOnlyCommit()) { + return Env.getCurrentGlobalTransactionMgr() + .commitAndPublishTransaction(db, tableList, + request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + } else { + // only_commit is set in the request: commit the txn and return without waiting for publish. 
+ Env.getCurrentGlobalTransactionMgr() + .commitTransaction(db, tableList, + request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs, + TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment())); + return true; + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 15d6daf30d..3bee819a1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -226,10 +226,10 @@ public class GlobalTransactionMgr implements Writable { } @Deprecated - public void commitTransaction(long dbId, List tableList, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); + commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, null); } /** @@ -241,7 +241,7 @@ public class GlobalTransactionMgr implements Writable { * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time * @note callers should get all tables' write locks before call this api */ - public void commitTransaction(long dbId, List
tableList, long transactionId, + public void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { if (Config.disable_load_job) { @@ -255,6 +255,20 @@ public class GlobalTransactionMgr implements Writable { dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); } + public void commitTransaction(DatabaseIf db, List
tableList, long transactionId, + List tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment) + throws UserException { + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + + StringUtils.join(tableList, ",") + ")"); + } + try { + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } + } + private void commitTransaction2PC(long dbId, long transactionId) throws UserException { if (Config.disable_load_job) { @@ -282,7 +296,7 @@ public class GlobalTransactionMgr implements Writable { + StringUtils.join(tableList, ",") + ")"); } try { - commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -313,7 +327,7 @@ public class GlobalTransactionMgr implements Writable { } stopWatch.stop(); LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms." 
- + " data will be visable later.", transactionId, stopWatch.getTime()); + + " data will be visible later.", transactionId, stopWatch.getTime()); } public void abortTransaction(long dbId, long transactionId, String reason) throws UserException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java index 18f29ac30d..bf1b82019e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java @@ -358,7 +358,8 @@ public class DeleteHandlerTest { new Expectations(globalTransactionMgr) { { try { - globalTransactionMgr.commitTransaction(anyLong, (List
) any, anyLong, (List) any, (TxnCommitAttachment) any); + globalTransactionMgr.commitTransactionWithoutLock( + anyLong, (List
) any, anyLong, (List) any, (TxnCommitAttachment) any); } catch (UserException e) { // CHECKSTYLE IGNORE THIS LINE } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index e3916cfb18..f1e9942c7e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -377,7 +377,8 @@ public class SparkLoadJobTest { AgentTaskExecutor.submit((AgentBatchTask) any); Env.getCurrentGlobalTransactionMgr(); result = transactionMgr; - transactionMgr.commitTransaction(dbId, (List
) any, transactionId, (List) any, + transactionMgr.commitTransactionWithoutLock( + dbId, (List
) any, transactionId, (List) any, (LoadJobFinalOperation) any); } }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 5f182ef50b..76ae778153 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets, null); TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1); setTransactionFinishPublish(transactionState1, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 7910df48be..2779a5d510 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -172,8 +172,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets, null); + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo2); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); // follower catalog replay the transaction @@ -235,7 +236,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { @@ -265,7 +267,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit @@ -366,7 +369,8 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table 
testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + masterTransMgr.commitTransactionWithoutLock( + 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(jobStatistic, "currentTotalRows")); @@ -439,7 +443,8 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); + masterTransMgr.commitTransactionWithoutLock( + 1L, Lists.newArrayList(testTable1), 1L, transTablets, txnCommitAttachment); // current total rows and error rows will be reset after job pause, so here they should be 0. 
RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); @@ -471,7 +476,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -537,7 +543,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo2); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets, null); // follower catalog replay the transaction @@ -607,7 +614,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { @@ -624,7 +632,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(testTable1), transactionId2, + masterTransMgr.commitTransactionWithoutLock( + CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets, null); transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 3018924064..210f1ada0a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -824,6 +824,8 @@ struct TCommitTxnRequest { 10: optional i64 thrift_rpc_timeout_ms 11: optional string token 12: optional i64 db_id + // used for ccr + 15: optional bool only_commit // only commit txn, without waiting txn publish } struct TCommitTxnResult {