[fix](transaction) commit txn check txn status (#40064) (#41227)

pick https://github.com/apache/doris/pull/40064/
This commit is contained in:
meiyi
2024-09-25 10:56:10 +08:00
committed by GitHub
parent 5b3b2cec80
commit 386c55f00a
5 changed files with 32 additions and 33 deletions

View File

@ -1712,13 +1712,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
List<String> tables = new ArrayList<>();
// if table was dropped, transaction must be aborted
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
for (Table table : tableList) {
tables.add(table.getName());
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
// Step 3: check auth
if (request.isSetAuthCode()) {
@ -1726,6 +1721,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
@ -1901,12 +1897,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
List<String> tables = new ArrayList<>();
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
for (Table table : tableList) {
tables.add(table.getName());
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
// Step 3: check auth
if (request.isSetAuthCode()) {
@ -1914,6 +1905,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}

View File

@ -92,7 +92,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
@ -126,14 +125,6 @@ public class DatabaseTransactionMgr {
// transactionId -> running TransactionState
private final Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();
/**
* the multi table ids that are in transaction, used to check whether a table is in transaction
* multi table transaction state
* txnId -> tableId list
*/
private final ConcurrentHashMap<Long, List<Long>> multiTableRunningTransactionTableIdMaps =
new ConcurrentHashMap<>();
// transactionId -> final status TransactionState
private final Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
@ -472,8 +463,13 @@ public class DatabaseTransactionMgr {
checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds,
tableToPartition, totalInvolvedBackends);
unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
writeLock();
try {
unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
} finally {
writeUnlock();
}
LOG.info("transaction:[{}] successfully pre-committed", transactionState);
}

View File

@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable {
}
}
public void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
private void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
@ -225,6 +225,7 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
}
@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
@ -692,6 +693,7 @@ public class GlobalTransactionMgr implements Writable {
TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second);
long coordStartTime = transactionState.getCoordinator().startTime;
if (coordStartTime < beStartTime) {
// does not hold table write lock
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null);
}
} catch (UserException e) {
@ -709,6 +711,7 @@ public class GlobalTransactionMgr implements Writable {
= getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
// does not hold table write lock
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first);
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null);
} catch (UserException e) {

View File

@ -111,7 +111,8 @@ public class DatabaseTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,

View File

@ -173,7 +173,7 @@ public class GlobalTransactionMgrTest {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets);
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@ -213,7 +213,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
transactionState = fakeEditLog.getTransaction(transactionId);
@ -234,7 +235,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@ -263,7 +265,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
@ -468,7 +471,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
slaveTransMgr.replayUpsertTransactionState(transactionState);
@ -533,7 +537,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
// follower catalog replay the transaction
transactionState = fakeEditLog.getTransaction(transactionId);
@ -602,7 +607,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
@ -618,7 +624,8 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());