diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 781c1dba96..6046fee42e 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -208,6 +208,14 @@ int ObSqlTransControl::implicit_end_trans(ObExecContext &exec_ctx, #endif ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx); CK (OB_NOT_NULL(session)); + if (OB_SUCCESS != ret) { + // do nothing + } else if (session->associated_xa()) { + // NOTE that not support dblink trans in this interface + // PLEASE handle implicit cases for dblink trans instead of this interface + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("executing do end trans in xa", K(ret), K(session->get_xid())); + } int64_t tx_id = 0; OX (tx_id = session->get_tx_id().get_id()); CHECK_TX_FREE_ROUTE(exec_ctx, session); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index bf72bca13c..62da798d2a 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -133,7 +133,7 @@ int ObTransService::finalize_tx_(ObTxDesc &tx) * - for tx which is a shadow copy of original tx (started on another server) * release just free its memory used */ -int ObTransService::release_tx(ObTxDesc &tx) +int ObTransService::release_tx(ObTxDesc &tx, const bool is_from_xa) { /* * for compatible with cross tenant session usage @@ -148,6 +148,9 @@ int ObTransService::release_tx(ObTxDesc &tx) MTL_SWITCH(tx.tenant_id_) { return MTL(ObTransService*)->release_tx(tx); } + } else if (NULL != tx.get_xa_ctx() && !is_from_xa) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected case", K(ret), K(is_from_xa), K(tx)); } else { ObTransTraceLog &tlog = tx.get_tlog(); REC_TRANS_TRACE_EXT(&tlog, release, OB_Y(ret), diff --git a/src/storage/tx/ob_tx_api.h b/src/storage/tx/ob_tx_api.h index 38052ade85..c8e569a273 100644 --- a/src/storage/tx/ob_tx_api.h +++ b/src/storage/tx/ob_tx_api.h @@ -100,10 +100,11 @@ int submit_commit_tx(ObTxDesc &tx, * this is the end of lifecycle of a transaction * * @tx: the target transaction's descriptor + * @is_from_xa: whether xa ctx calls this interface * * Return: OB_SUCCESS - OK */ -int release_tx(ObTxDesc &tx); +int release_tx(ObTxDesc &tx, const bool is_from_xa = false); /** * reuse_tx - reuse transaction descriptor diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 4b9c68611a..9323b50b81 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -158,7 +158,7 @@ int ObXACtx::init(const ObXATransID &xid, int ObXACtx::handle_timeout(const int64_t delay) { int ret = OB_SUCCESS; - TRANS_LOG(INFO, "start to handle timeout for xa trans", K(*this), "lbt", lbt()); + TRANS_LOG(INFO, "start to handle timeout for xa trans", K(*this)); if (OB_SUCC(lock_.wrlock(common::ObLatchIds::XA_CTX_LOCK, 5000000/*5 seconds*/))) { if (is_exiting_) { @@ -490,7 +490,7 @@ int ObXACtx::get_branch_info_(const ObXATransID &xid, void ObXACtx::set_terminated_() { - TRANS_LOG(INFO, "set terminated", K_(is_terminated), K(*this), "lbt", lbt()); + TRANS_LOG(INFO, "set terminated", K_(is_terminated), K(*this)); is_terminated_ = true; need_print_trace_log_ = true; REC_TRACE_EXT(tlog_, terminate, OB_ID(ctx_ref), get_uref()); @@ -503,8 +503,17 @@ int ObXACtx::xa_rollback_terminate_(const int cause) int ret = OB_SUCCESS; (void)unregister_timeout_task_(); - if (OB_FAIL(MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) { - TRANS_LOG(WARN, "abort tx for session terminate failed", K(ret), K(*this)); + if (OB_ISNULL(tx_desc_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "trans desc is null", K(ret), K(*this)); + } else { + ObTransID tx_id = tx_desc_->get_tx_id(); + const int tx_ref_count = tx_desc_->get_ref(); + if (tx_id != trans_id_ || MIN_TX_REF_COUNT > tx_ref_count) { + TRANS_LOG(ERROR, "unexpected trans desc", K(tx_ref_count), K(tx_id), K(*this)); + } else if (OB_FAIL(MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) { + TRANS_LOG(WARN, "abort tx for session terminate failed", K(ret), K(*this)); + } } set_terminated_(); @@ -804,7 +813,8 @@ int ObXACtx::process_xa_start_response(const obrpc::ObXAStartRPCResponse &resp) } else if (OB_FAIL(MTL(ObTransService *)->recover_tx(tx_info, tx_desc_))) { TRANS_LOG(WARN, "recover tx failed", K(ret), K(*this), K(tx_info)); } else { - // do nothing + // increment reference of tx_desc + tx_desc_->inc_ref(1); } TRANS_LOG(INFO, "xa start response", K(ret), K(*this)); @@ -1632,7 +1642,10 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, int ObXACtx::save_tx_desc_(ObTxDesc *tx_desc) { int ret = OB_SUCCESS; + // NOTE that the tx_desc should be valid tx_desc_ = tx_desc; + // increment tx_desc ref + tx_desc_->inc_ref(1); return ret; } @@ -2284,6 +2297,7 @@ void ObXACtx::try_exit_() int ObXACtx::set_exiting_() { int ret = OB_SUCCESS; + int tx_ref_count = -1; if (is_exiting_) { // do nothing @@ -2296,15 +2310,24 @@ int ObXACtx::set_exiting_() } else { is_exiting_ = true; if (NULL != tx_desc_) { - tx_desc_->reset_for_xa(); - MTL(ObTransService *)->release_tx(*tx_desc_); + ObTransID tx_id = tx_desc_->get_tx_id(); + tx_ref_count = tx_desc_->get_ref(); + if (tx_id != trans_id_ || MIN_TX_REF_COUNT > tx_ref_count) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected trans desc", K(ret), K(tx_ref_count), K(tx_id), K(*this)); + } else { + const bool is_from_xa = true; + tx_desc_->dec_ref(1); + // tx_desc_->reset_for_xa(); + MTL(ObTransService *)->release_tx(*tx_desc_, is_from_xa); + } tx_desc_ = NULL; } if (OB_FAIL(xa_ctx_mgr_->erase_xa_ctx(trans_id_))) { TRANS_LOG(WARN, "erase xa ctx failed", K(ret), K_(xid), K(*this)); } } - TRANS_LOG(INFO, "xa ctx set exiting", K(ret), K_(xid), K(*this)); + TRANS_LOG(INFO, "xa ctx set exiting", K(ret), K(tx_ref_count), K_(xid), K(*this)); return ret; } @@ -2911,13 +2934,7 @@ int ObXACtx::wait_xa_prepare(const ObXATransID &xid, const int64_t timeout_us) } if (OB_LIKELY(!is_exiting_)) { - is_exiting_ = true; - if (OB_NOT_NULL(xa_ctx_mgr_)) { - xa_ctx_mgr_->erase_xa_ctx(trans_id_); - } - // release tx desc - MTL(ObTransService*)->release_tx(*tx_desc_); - tx_desc_ = NULL; + set_exiting_(); } TRANS_LOG(INFO, "wait xa prepare", K(ret), K(*this)); @@ -2958,7 +2975,7 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid, ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid trans descriptor", K(ret), K(xid)); } else { - tx_desc_ = tx; + save_tx_desc_(tx); request_id_ = request_id; if (is_rollback) { xa_trans_state_ = ObXATransState::ROLLBACKING; diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index 449727303b..f89509b7f1 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -257,6 +257,8 @@ private: int get_dblink_client_(const common::sqlclient::DblinkDriverProto dblink_type, common::sqlclient::ObISQLConnection *dblink_conn, ObDBLinkClient *&dblink_client); +private: + static const int MIN_TX_REF_COUNT = 3; private: bool is_inited_; ObXACtxMgr *xa_ctx_mgr_;