fix: xa_start and xa_start_stmt concurrency bug

This commit is contained in:
obdev
2024-02-23 03:15:48 +00:00
committed by ob-robot
parent 643ad4b2f9
commit 40b6589bd1
2 changed files with 22 additions and 5 deletions

View File

@ -73,6 +73,7 @@ void ObXACtx::reset()
timeout_task_.reset();
xa_start_cond_.reset();
xa_sync_status_cond_.reset();
sync_stmt_info_cond_.reset();
xa_branch_count_ = 0;
xa_ref_count_ = 0;
lock_grant_ = 0;
@ -295,6 +296,20 @@ int ObXACtx::wait_xa_sync_status_(const int64_t expired_time)
return ret;
}
int ObXACtx::wait_sync_stmt_info_(const int64_t expired_time) {
int ret = OB_SUCCESS;
int result = OB_SUCCESS;
if (0 > expired_time) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(expired_time));
} else {
if (OB_FAIL(sync_stmt_info_cond_.wait(expired_time, result)) || OB_FAIL(result)) {
TRANS_LOG(WARN, "wait sync stmt info failed", K(ret), "context", *this, K(expired_time), K(result));
}
}
return ret;
}
int ObXACtx::init_xa_branch_info_()
{
int ret = OB_SUCCESS;
@ -817,7 +832,7 @@ int ObXACtx::process_xa_start_response(const obrpc::ObXAStartRPCResponse &resp)
tx_desc_->inc_ref(1);
}
TRANS_LOG(INFO, "xa start response", K(ret), K(*this));
TRANS_LOG(INFO, "xa start response", K(ret), K(*this), K(resp));
xa_sync_status_cond_.notify(ret);
REC_TRACE_EXT(tlog_, xa_start_response, OB_Y(ret), OB_ID(ctx_ref), get_uref());
@ -970,7 +985,7 @@ int ObXACtx::process_start_stmt_response(const obrpc::ObXAStartStmtRPCResponse &
}
TRANS_LOG(INFO, "process start stmt response", K(ret), K(res));
xa_sync_status_cond_.notify(ret);
sync_stmt_info_cond_.notify(ret);
REC_TRACE_EXT(tlog_, xa_start_stmt_response, OB_Y(ret), OB_ID(ctx_ref), get_uref());
return ret;
@ -1835,7 +1850,7 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id)
// this flag indicates that a branch is executing normal stmt
is_executing_ = true;
start_stmt_cond_.reset();
xa_sync_status_cond_.reset();
sync_stmt_info_cond_.reset();
if (is_original) {
// local
if (OB_FAIL(start_stmt_local_(xid))) {
@ -1908,7 +1923,7 @@ int ObXACtx::start_stmt_local_(const ObXATransID &xid)
} else {
// notify SUCCESS
start_stmt_cond_.notify(ret);
xa_sync_status_cond_.notify(ret);
sync_stmt_info_cond_.notify(ret);
TRANS_LOG(INFO, "succeed to start stmt local", K(ret), K(xid));
}
@ -2384,7 +2399,7 @@ int ObXACtx::wait_start_stmt(const uint32_t session_id)
} else {
TRANS_LOG(WARN, "fail to wait cond", K(ret), K(result));
}
} else if (OB_FAIL(wait_xa_sync_status_(wait_time + 500000))) {
} else if (OB_FAIL(wait_sync_stmt_info_(wait_time + 500000))) {
// TRANS_LOG(WARN, "unexpected status", K(ret));
if (OB_TIMEOUT == ret) {
TRANS_LOG(WARN, "wait xa stmt info timeout, need retry", K(ret));

View File

@ -178,6 +178,7 @@ private:
int register_xa_timeout_task_();
void notify_xa_start_complete_(int ret_code);
int wait_xa_sync_status_(const int64_t expired_time);
int wait_sync_stmt_info_(const int64_t expired_time);
int update_xa_branch_info_(const ObXATransID &xid,
const int64_t to_state,
const common::ObAddr &addr,
@ -308,6 +309,7 @@ private:
// can be removed when xid is from session
ObXATransID executing_xid_;
ObTransCond start_stmt_cond_;
ObTransCond sync_stmt_info_cond_;
SyncXACb end_trans_cb_;
// if dblink trans, record dblink client
ObDBLinkClientArray dblink_client_array_;