[4.1][xa] create xa savepoint after acquiring local stmt lock

This commit is contained in:
obdev 2023-03-21 17:03:57 +00:00 committed by ob-robot
parent d760428489
commit 3d997e7f47
3 changed files with 21 additions and 7 deletions

View File

@ -1776,11 +1776,18 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id)
} else if (OB_ISNULL(tx_desc_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "trans desc is null", K(ret), K(*this));
} else if (OB_FAIL(create_xa_savepoint_if_need_(xid, session_id))) {
TRANS_LOG(WARN, "check xa savepoint fail", K(ret), K(xid), K(session_id), K(*this));
} else if (is_exiting_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is exiting", K(ret), K(*this));
} else if (is_terminated_) {
// NOTE that anohter error code maybe required for loosely coupled mode
ret = OB_TRANS_XA_BRANCH_FAIL;
TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(*this));
} else if (!is_tightly_coupled_) {
// loosely coupled mode
// do nothing
if (OB_FAIL(create_xa_savepoint_if_need_(xid, session_id))) {
TRANS_LOG(WARN, "check xa savepoint fail", K(ret), K(xid), K(session_id), K(*this));
}
} else {
// tightly coupled mode
if (is_executing_) {
@ -1924,6 +1931,9 @@ int ObXACtx::end_stmt(const ObXATransID &xid)
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(*this));
} else if (is_exiting_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is exiting", K(ret), K(*this));
} else if (!is_tightly_coupled_) {
// do nothing
} else if (OB_UNLIKELY(!is_executing_)) {
@ -2269,7 +2279,7 @@ bool ObXACtx::check_response_(const int64_t response_id,
// wait the result of start stmt
// this function is called only when SUCCESS is returned by start_stmt (local/remote)
int ObXACtx::wait_start_stmt()
int ObXACtx::wait_start_stmt(const uint32_t session_id)
{
int ret = OB_SUCCESS;
int result = OB_SUCCESS;
@ -2311,8 +2321,12 @@ int ObXACtx::wait_start_stmt()
TRANS_LOG(WARN, "fail to wait start stmt", K(ret), K(*this));
is_executing_ = false;
executing_xid_.reset();
} else if (OB_FAIL(create_xa_savepoint_if_need_(executing_xid_, session_id))) {
TRANS_LOG(WARN, "check xa savepoint fail", K(ret), K(session_id), K(*this));
} else {
// do nothing
}
}
}
return ret;
}

View File

@ -106,7 +106,7 @@ public:
ObTxDesc *&tx_desc);
int xa_end(const ObXATransID &xid, const int64_t flags, ObTxDesc *&tx_desc);
int start_stmt(const ObXATransID &xid, const uint32_t session_id);
int wait_start_stmt();
int wait_start_stmt(const uint32_t session_id);
int end_stmt(const ObXATransID &xid);
const ObXATransID &get_executing_xid() const { return executing_xid_; }
int one_phase_end_trans(const ObXATransID &xid,

View File

@ -1428,7 +1428,7 @@ int ObXAService::start_stmt(const ObXATransID &xid, const uint32_t session_id, O
TRANS_LOG(WARN, "unexpected trans descriptor", K(ret), K(tx_id), K(xid));
} else if (OB_FAIL(xa_ctx->start_stmt(xid, session_id))) {
TRANS_LOG(WARN, "xa trans start stmt failed", K(ret), K(tx_id), K(xid));
} else if (OB_FAIL(xa_ctx->wait_start_stmt())) {
} else if (OB_FAIL(xa_ctx->wait_start_stmt(session_id))) {
TRANS_LOG(WARN, "fail to wait start stmt", K(ret), K(tx_id), K(xid));
} else {
TRANS_LOG(INFO, "xa trans start stmt", K(ret), K(tx_id), K(xid));