diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 9323b50b81..8d8a319e5f 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -73,6 +73,7 @@ void ObXACtx::reset() timeout_task_.reset(); xa_start_cond_.reset(); xa_sync_status_cond_.reset(); + sync_stmt_info_cond_.reset(); xa_branch_count_ = 0; xa_ref_count_ = 0; lock_grant_ = 0; @@ -295,6 +296,20 @@ int ObXACtx::wait_xa_sync_status_(const int64_t expired_time) return ret; } +int ObXACtx::wait_sync_stmt_info_(const int64_t expired_time) { + int ret = OB_SUCCESS; + int result = OB_SUCCESS; + if (0 > expired_time) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(expired_time)); + } else { + if (OB_FAIL(sync_stmt_info_cond_.wait(expired_time, result)) || OB_FAIL(result)) { + TRANS_LOG(WARN, "wait sync stmt info failed", K(ret), "context", *this, K(expired_time), K(result)); + } + } + return ret; +} + int ObXACtx::init_xa_branch_info_() { int ret = OB_SUCCESS; @@ -817,7 +832,7 @@ int ObXACtx::process_xa_start_response(const obrpc::ObXAStartRPCResponse &resp) tx_desc_->inc_ref(1); } - TRANS_LOG(INFO, "xa start response", K(ret), K(*this)); + TRANS_LOG(INFO, "xa start response", K(ret), K(*this), K(resp)); xa_sync_status_cond_.notify(ret); REC_TRACE_EXT(tlog_, xa_start_response, OB_Y(ret), OB_ID(ctx_ref), get_uref()); @@ -970,7 +985,7 @@ int ObXACtx::process_start_stmt_response(const obrpc::ObXAStartStmtRPCResponse & } TRANS_LOG(INFO, "process start stmt response", K(ret), K(res)); - xa_sync_status_cond_.notify(ret); + sync_stmt_info_cond_.notify(ret); REC_TRACE_EXT(tlog_, xa_start_stmt_response, OB_Y(ret), OB_ID(ctx_ref), get_uref()); return ret; @@ -1835,7 +1850,7 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id) // this flag indicates that a branch is executing normal stmt is_executing_ = true; start_stmt_cond_.reset(); - xa_sync_status_cond_.reset(); + sync_stmt_info_cond_.reset(); if (is_original) { // local if (OB_FAIL(start_stmt_local_(xid))) { @@ -1908,7 +1923,7 @@ int ObXACtx::start_stmt_local_(const ObXATransID &xid) } else { // notify SUCCESS start_stmt_cond_.notify(ret); - xa_sync_status_cond_.notify(ret); + sync_stmt_info_cond_.notify(ret); TRANS_LOG(INFO, "succeed to start stmt local", K(ret), K(xid)); } @@ -2384,7 +2399,7 @@ int ObXACtx::wait_start_stmt(const uint32_t session_id) } else { TRANS_LOG(WARN, "fail to wait cond", K(ret), K(result)); } - } else if (OB_FAIL(wait_xa_sync_status_(wait_time + 500000))) { + } else if (OB_FAIL(wait_sync_stmt_info_(wait_time + 500000))) { // TRANS_LOG(WARN, "unexpected status", K(ret)); if (OB_TIMEOUT == ret) { TRANS_LOG(WARN, "wait xa stmt info timeout, need retry", K(ret)); diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index f89509b7f1..0aee385cfe 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -178,6 +178,7 @@ private: int register_xa_timeout_task_(); void notify_xa_start_complete_(int ret_code); int wait_xa_sync_status_(const int64_t expired_time); + int wait_sync_stmt_info_(const int64_t expired_time); int update_xa_branch_info_(const ObXATransID &xid, const int64_t to_state, const common::ObAddr &addr, @@ -308,6 +309,7 @@ private: // can be removed when xid is from session ObXATransID executing_xid_; ObTransCond start_stmt_cond_; + ObTransCond sync_stmt_info_cond_; SyncXACb end_trans_cb_; // if dblink trans, record dblink client ObDBLinkClientArray dblink_client_array_;