diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 04c3ed9f8c..53a09999cd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -825,6 +825,8 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
_tasks.push_back(agent_task_req);
_worker_thread_condition_variable.notify_one();
}
+ LOG(INFO) << "wait for previous publish version task to be done"
+ << "transaction_id: " << publish_version_req.transaction_id;
break;
} else {
LOG_WARNING("failed to publish version")
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 58c2eb479e..f520ab5f64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1099,7 +1099,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
Env.getCurrentGlobalTransactionMgr()
.commitTransaction2PC(database, tableList, request.getTxnId(), 5000);
} else if (txnOperation.equalsIgnoreCase("abort")) {
- Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId());
+ Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId(), tableList);
} else {
throw new UserException("transaction operation should be \'commit\' or \'abort\'");
}
@@ -1221,9 +1221,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
}
long dbId = db.getId();
+ DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
+ TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
+ if (transactionState == null) {
+ throw new UserException("transaction [" + request.getTxnId() + "] not found");
+ }
+ List
tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
request.isSetReason() ? request.getReason() : "system cancel",
- TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 06359155cb..e32c24e373 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -297,13 +297,29 @@ public class GlobalTransactionMgr implements Writable {
}
public void abortTransaction(long dbId, long transactionId, String reason) throws UserException {
- abortTransaction(dbId, transactionId, reason, null);
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ TransactionState transactionState = getDatabaseTransactionMgr(dbId).getTransactionState(transactionId);
+ List tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
+ abortTransaction(dbId, transactionId, reason, null, tableList);
+ }
+
+ public void abortTransaction(long dbId, long transactionId, String reason, List tableList)
+ throws UserException {
+ abortTransaction(dbId, transactionId, reason, null, tableList);
}
public void abortTransaction(Long dbId, Long txnId, String reason,
- TxnCommitAttachment txnCommitAttachment) throws UserException {
+ TxnCommitAttachment txnCommitAttachment, List tableList) throws UserException {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
- dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment);
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout, tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
}
// for http cancel stream load api
@@ -312,9 +328,17 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.abortTransaction(label, reason);
}
- public void abortTransaction2PC(Long dbId, long transactionId) throws UserException {
+ public void abortTransaction2PC(Long dbId, long transactionId, List tableList) throws UserException {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
- dbTransactionMgr.abortTransaction2PC(transactionId);
+ if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000, TimeUnit.MILLISECONDS)) {
+ throw new UserException("get tableList write lock timeout, tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ dbTransactionMgr.abortTransaction2PC(transactionId);
+ } finally {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
}
/*