From 386c55f00aa9cd2f178eefb1d41f6e082e0715cd Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 25 Sep 2024 10:56:10 +0800 Subject: [PATCH] [fix](transaction) commit txn check txn status (#40064) (#41227) pick https://github.com/apache/doris/pull/40064/ --- .../doris/service/FrontendServiceImpl.java | 16 ++++--------- .../transaction/DatabaseTransactionMgr.java | 18 ++++++--------- .../transaction/GlobalTransactionMgr.java | 5 +++- .../DatabaseTransactionMgrTest.java | 3 ++- .../transaction/GlobalTransactionMgrTest.java | 23 ++++++++++++------- 5 files changed, 32 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index be05c02316..29e1793490 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1712,13 +1712,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new UserException("transaction [" + request.getTxnId() + "] not found"); } List tableIdList = transactionState.getTableIdList(); - List tableList = new ArrayList<>(); - List tables = new ArrayList<>(); // if table was dropped, transaction must be aborted - tableList = db.getTablesOnIdOrderOrThrowException(tableIdList); - for (Table table : tableList) { - tables.add(table.getName()); - } + List
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList); // Step 3: check auth if (request.isSetAuthCode()) { @@ -1726,6 +1721,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } else if (request.isSetToken()) { checkToken(request.getToken()); } else { + List tables = tableList.stream().map(Table::getName).collect(Collectors.toList()); checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables, request.getUserIp(), PrivPredicate.LOAD); } @@ -1901,12 +1897,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { throw new UserException("transaction [" + request.getTxnId() + "] not found"); } List tableIdList = transactionState.getTableIdList(); - List
tableList = new ArrayList<>(); - List tables = new ArrayList<>(); - tableList = db.getTablesOnIdOrderOrThrowException(tableIdList); - for (Table table : tableList) { - tables.add(table.getName()); - } + List
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList); // Step 3: check auth if (request.isSetAuthCode()) { @@ -1914,6 +1905,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { } else if (request.isSetToken()) { checkToken(request.getToken()); } else { + List tables = tableList.stream().map(Table::getName).collect(Collectors.toList()); checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables, request.getUserIp(), PrivPredicate.LOAD); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index e779476975..61721e7d89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -92,7 +92,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** @@ -126,14 +125,6 @@ public class DatabaseTransactionMgr { // transactionId -> running TransactionState private final Map idToRunningTransactionState = Maps.newHashMap(); - /** - * the multi table ids that are in transaction, used to check whether a table is in transaction - * multi table transaction state - * txnId -> tableId list - */ - private final ConcurrentHashMap> multiTableRunningTransactionTableIdMaps = - new ConcurrentHashMap<>(); - // transactionId -> final status TransactionState private final Map idToFinalStatusTransactionState = Maps.newHashMap(); @@ -472,8 +463,13 @@ public class DatabaseTransactionMgr { checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds, tableToPartition, totalInvolvedBackends); - unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition, - totalInvolvedBackends, db); + writeLock(); + try { + unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition, + totalInvolvedBackends, db); + } finally { + writeUnlock(); + } LOG.info("transaction:[{}] successfully pre-committed", transactionState); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 928fa78550..15d6daf30d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable { } } - public void preCommitTransaction2PC(long dbId, List
tableList, long transactionId, + private void preCommitTransaction2PC(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { if (Config.disable_load_job) { @@ -225,6 +225,7 @@ public class GlobalTransactionMgr implements Writable { dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, tabletCommitInfos, txnCommitAttachment); } + @Deprecated public void commitTransaction(long dbId, List
tableList, long transactionId, List tabletCommitInfos) throws UserException { @@ -692,6 +693,7 @@ public class GlobalTransactionMgr implements Writable { TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); long coordStartTime = transactionState.getCoordinator().startTime; if (coordStartTime < beStartTime) { + // does not hold table write lock dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null); } } catch (UserException e) { @@ -709,6 +711,7 @@ public class GlobalTransactionMgr implements Writable { = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit); for (Pair txnInfo : transactionIdByCoordinateBe) { try { + // does not hold table write lock DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null); } catch (UserException e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 437f1bcb20..5f182ef50b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, + transTablets, null); TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1); setTransactionFinishPublish(transactionState1, Lists.newArrayList(CatalogTestUtil.testBackendId1, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 5c8f72723a..8ae95f35f2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -173,7 +173,7 @@ public class GlobalTransactionMgrTest { Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, - transTablets); + transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); // check status is committed Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo2); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + transTablets, null); // follower catalog replay the transaction transactionState = fakeEditLog.getTransaction(transactionId); @@ -234,7 +235,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2); @@ -263,7 +265,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + transTablets, null); transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); @@ -468,7 +471,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo3); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + transTablets, null); TransactionState transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); slaveTransMgr.replayUpsertTransactionState(transactionState); @@ -533,7 +537,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo2); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) .getTableOrMetaException(CatalogTestUtil.testTableId1); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, + transTablets, null); // follower catalog replay the transaction transactionState = fakeEditLog.getTransaction(transactionId); @@ -602,7 +607,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo3); try { - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + transTablets, null); Assert.fail(); } catch (TabletQuorumFailedException e) { transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2); @@ -618,7 +624,8 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo1); transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets); + masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, + transTablets, null); transactionState = fakeEditLog.getTransaction(transactionId2); // check status is commit Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());