[bug](txn) fix concurrent txns's status data race when aborting txn (#17893)
This commit is contained in:
@ -825,6 +825,8 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
|
||||
_tasks.push_back(agent_task_req);
|
||||
_worker_thread_condition_variable.notify_one();
|
||||
}
|
||||
LOG(INFO) << "wait for previous publish version task to be done"
|
||||
<< "transaction_id: " << publish_version_req.transaction_id;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARNING("failed to publish version")
|
||||
|
||||
@ -1099,7 +1099,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
Env.getCurrentGlobalTransactionMgr()
|
||||
.commitTransaction2PC(database, tableList, request.getTxnId(), 5000);
|
||||
} else if (txnOperation.equalsIgnoreCase("abort")) {
|
||||
Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId());
|
||||
Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId(), tableList);
|
||||
} else {
|
||||
throw new UserException("transaction operation should be \'commit\' or \'abort\'");
|
||||
}
|
||||
@ -1221,9 +1221,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
|
||||
}
|
||||
long dbId = db.getId();
|
||||
DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
|
||||
TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
|
||||
if (transactionState == null) {
|
||||
throw new UserException("transaction [" + request.getTxnId() + "] not found");
|
||||
}
|
||||
List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
|
||||
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
|
||||
request.isSetReason() ? request.getReason() : "system cancel",
|
||||
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
|
||||
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -297,13 +297,29 @@ public class GlobalTransactionMgr implements Writable {
|
||||
}
|
||||
|
||||
public void abortTransaction(long dbId, long transactionId, String reason) throws UserException {
|
||||
abortTransaction(dbId, transactionId, reason, null);
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
TransactionState transactionState = getDatabaseTransactionMgr(dbId).getTransactionState(transactionId);
|
||||
List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
|
||||
abortTransaction(dbId, transactionId, reason, null, tableList);
|
||||
}
|
||||
|
||||
public void abortTransaction(long dbId, long transactionId, String reason, List<Table> tableList)
|
||||
throws UserException {
|
||||
abortTransaction(dbId, transactionId, reason, null, tableList);
|
||||
}
|
||||
|
||||
public void abortTransaction(Long dbId, Long txnId, String reason,
|
||||
TxnCommitAttachment txnCommitAttachment) throws UserException {
|
||||
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
|
||||
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
|
||||
dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment);
|
||||
if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, TimeUnit.MILLISECONDS)) {
|
||||
throw new UserException("get tableList write lock timeout, tableList=("
|
||||
+ StringUtils.join(tableList, ",") + ")");
|
||||
}
|
||||
try {
|
||||
dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment);
|
||||
} finally {
|
||||
MetaLockUtils.writeUnlockTables(tableList);
|
||||
}
|
||||
}
|
||||
|
||||
// for http cancel stream load api
|
||||
@ -312,9 +328,17 @@ public class GlobalTransactionMgr implements Writable {
|
||||
dbTransactionMgr.abortTransaction(label, reason);
|
||||
}
|
||||
|
||||
public void abortTransaction2PC(Long dbId, long transactionId) throws UserException {
|
||||
public void abortTransaction2PC(Long dbId, long transactionId, List<Table> tableList) throws UserException {
|
||||
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
|
||||
dbTransactionMgr.abortTransaction2PC(transactionId);
|
||||
if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, TimeUnit.MILLISECONDS)) {
|
||||
throw new UserException("get tableList write lock timeout, tableList=("
|
||||
+ StringUtils.join(tableList, ",") + ")");
|
||||
}
|
||||
try {
|
||||
dbTransactionMgr.abortTransaction2PC(transactionId);
|
||||
} finally {
|
||||
MetaLockUtils.writeUnlockTables(tableList);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user