[CP] [dblink][4.x] add defensive measure for unexpected ref of tx desc
This commit is contained in:
		| @ -208,6 +208,14 @@ int ObSqlTransControl::implicit_end_trans(ObExecContext &exec_ctx, | ||||
| #endif | ||||
|   ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx); | ||||
|   CK (OB_NOT_NULL(session)); | ||||
|   if (OB_SUCCESS != ret) { | ||||
|     // do nothing | ||||
|   } else if (session->associated_xa()) { | ||||
|     // NOTE that not support dblink trans in this interface | ||||
|     // PLEASE handle implicit cases for dblink trans instead of this interface | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     LOG_ERROR("executing do end trans in xa", K(ret), K(session->get_xid())); | ||||
|   } | ||||
|   int64_t tx_id = 0; | ||||
|   OX (tx_id = session->get_tx_id().get_id()); | ||||
|   CHECK_TX_FREE_ROUTE(exec_ctx, session); | ||||
|  | ||||
| @ -133,7 +133,7 @@ int ObTransService::finalize_tx_(ObTxDesc &tx) | ||||
|  * - for tx which is a shadow copy of original tx (started on another server) | ||||
|  *   release just free its memory used | ||||
|  */ | ||||
| int ObTransService::release_tx(ObTxDesc &tx) | ||||
| int ObTransService::release_tx(ObTxDesc &tx, const bool is_from_xa) | ||||
| { | ||||
|   /* | ||||
|    * for compatible with cross tenant session usage | ||||
| @ -148,6 +148,9 @@ int ObTransService::release_tx(ObTxDesc &tx) | ||||
|     MTL_SWITCH(tx.tenant_id_) { | ||||
|       return MTL(ObTransService*)->release_tx(tx); | ||||
|     } | ||||
|   } else if (NULL != tx.get_xa_ctx() && !is_from_xa) { | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     TRANS_LOG(ERROR, "unexpected case", K(ret), K(is_from_xa), K(tx)); | ||||
|   } else { | ||||
|     ObTransTraceLog &tlog = tx.get_tlog(); | ||||
|     REC_TRANS_TRACE_EXT(&tlog, release, OB_Y(ret), | ||||
|  | ||||
| @ -100,10 +100,11 @@ int submit_commit_tx(ObTxDesc &tx, | ||||
|  * this is the end of lifecycle of a transaction | ||||
|  * | ||||
|  * @tx:         the target transaction's descriptor | ||||
|  * @is_from_xa: whether xa ctx calls this interface | ||||
|  * | ||||
|  * Return: OB_SUCCESS - OK | ||||
|  */ | ||||
| int release_tx(ObTxDesc &tx); | ||||
| int release_tx(ObTxDesc &tx, const bool is_from_xa = false); | ||||
|  | ||||
| /** | ||||
|  * reuse_tx - reuse transaction descriptor | ||||
|  | ||||
| @ -158,7 +158,7 @@ int ObXACtx::init(const ObXATransID &xid, | ||||
| int ObXACtx::handle_timeout(const int64_t delay) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   TRANS_LOG(INFO, "start to handle timeout for xa trans", K(*this), "lbt", lbt()); | ||||
|   TRANS_LOG(INFO, "start to handle timeout for xa trans", K(*this)); | ||||
|  | ||||
|   if (OB_SUCC(lock_.wrlock(common::ObLatchIds::XA_CTX_LOCK, 5000000/*5 seconds*/))) { | ||||
|     if (is_exiting_) { | ||||
| @ -490,7 +490,7 @@ int ObXACtx::get_branch_info_(const ObXATransID &xid, | ||||
|  | ||||
| void ObXACtx::set_terminated_() | ||||
| { | ||||
|   TRANS_LOG(INFO, "set terminated", K_(is_terminated), K(*this), "lbt", lbt()); | ||||
|   TRANS_LOG(INFO, "set terminated", K_(is_terminated), K(*this)); | ||||
|   is_terminated_ = true; | ||||
|   need_print_trace_log_ = true; | ||||
|   REC_TRACE_EXT(tlog_, terminate, OB_ID(ctx_ref), get_uref()); | ||||
| @ -503,8 +503,17 @@ int ObXACtx::xa_rollback_terminate_(const int cause) | ||||
|   int ret = OB_SUCCESS; | ||||
|  | ||||
|   (void)unregister_timeout_task_(); | ||||
|   if (OB_FAIL(MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) { | ||||
|     TRANS_LOG(WARN, "abort tx for session terminate failed", K(ret), K(*this)); | ||||
|   if (OB_ISNULL(tx_desc_)) { | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     TRANS_LOG(ERROR, "trans desc is null", K(ret), K(*this)); | ||||
|   } else { | ||||
|     ObTransID tx_id = tx_desc_->get_tx_id(); | ||||
|     const int tx_ref_count = tx_desc_->get_ref(); | ||||
|     if (tx_id != trans_id_ || MIN_TX_REF_COUNT > tx_ref_count) { | ||||
|       TRANS_LOG(ERROR, "unexpected trans desc", K(tx_ref_count), K(tx_id), K(*this)); | ||||
|     } else if (OB_FAIL(MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) { | ||||
|       TRANS_LOG(WARN, "abort tx for session terminate failed", K(ret), K(*this)); | ||||
|     } | ||||
|   } | ||||
|   set_terminated_(); | ||||
|  | ||||
| @ -804,7 +813,8 @@ int ObXACtx::process_xa_start_response(const obrpc::ObXAStartRPCResponse &resp) | ||||
|   } else if (OB_FAIL(MTL(ObTransService *)->recover_tx(tx_info, tx_desc_))) { | ||||
|     TRANS_LOG(WARN, "recover tx failed", K(ret), K(*this), K(tx_info)); | ||||
|   } else { | ||||
|     // do nothing | ||||
|     // increment reference of tx_desc | ||||
|     tx_desc_->inc_ref(1); | ||||
|   } | ||||
|  | ||||
|   TRANS_LOG(INFO, "xa start response", K(ret), K(*this)); | ||||
| @ -1632,7 +1642,10 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, | ||||
| int ObXACtx::save_tx_desc_(ObTxDesc *tx_desc) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   // NOTE that the tx_desc should be valid | ||||
|   tx_desc_ = tx_desc; | ||||
|   // increment tx_desc ref | ||||
|   tx_desc_->inc_ref(1); | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| @ -2284,6 +2297,7 @@ void ObXACtx::try_exit_() | ||||
| int ObXACtx::set_exiting_() | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   int tx_ref_count = -1; | ||||
|  | ||||
|   if (is_exiting_) { | ||||
|     // do nothing | ||||
| @ -2296,15 +2310,24 @@ int ObXACtx::set_exiting_() | ||||
|   } else { | ||||
|     is_exiting_ = true; | ||||
|     if (NULL != tx_desc_) { | ||||
|       tx_desc_->reset_for_xa(); | ||||
|       MTL(ObTransService *)->release_tx(*tx_desc_); | ||||
|       ObTransID tx_id = tx_desc_->get_tx_id(); | ||||
|       tx_ref_count = tx_desc_->get_ref(); | ||||
|       if (tx_id != trans_id_ || MIN_TX_REF_COUNT > tx_ref_count) { | ||||
|         ret = OB_ERR_UNEXPECTED; | ||||
|         TRANS_LOG(ERROR, "unexpected trans desc", K(ret), K(tx_ref_count), K(tx_id), K(*this)); | ||||
|       } else { | ||||
|         const bool is_from_xa = true; | ||||
|         tx_desc_->dec_ref(1); | ||||
|         // tx_desc_->reset_for_xa(); | ||||
|         MTL(ObTransService *)->release_tx(*tx_desc_, is_from_xa); | ||||
|       } | ||||
|       tx_desc_ = NULL; | ||||
|     } | ||||
|     if (OB_FAIL(xa_ctx_mgr_->erase_xa_ctx(trans_id_))) { | ||||
|       TRANS_LOG(WARN, "erase xa ctx failed", K(ret), K_(xid), K(*this)); | ||||
|     } | ||||
|   } | ||||
|   TRANS_LOG(INFO, "xa ctx set exiting", K(ret), K_(xid), K(*this)); | ||||
|   TRANS_LOG(INFO, "xa ctx set exiting", K(ret), K(tx_ref_count), K_(xid), K(*this)); | ||||
|  | ||||
|   return ret; | ||||
| } | ||||
| @ -2911,13 +2934,7 @@ int ObXACtx::wait_xa_prepare(const ObXATransID &xid, const int64_t timeout_us) | ||||
|   } | ||||
|  | ||||
|   if (OB_LIKELY(!is_exiting_)) { | ||||
|     is_exiting_ = true; | ||||
|     if (OB_NOT_NULL(xa_ctx_mgr_)) { | ||||
|       xa_ctx_mgr_->erase_xa_ctx(trans_id_); | ||||
|     } | ||||
|     // release tx desc | ||||
|     MTL(ObTransService*)->release_tx(*tx_desc_); | ||||
|     tx_desc_ = NULL; | ||||
|     set_exiting_(); | ||||
|   } | ||||
|  | ||||
|   TRANS_LOG(INFO, "wait xa prepare", K(ret), K(*this)); | ||||
| @ -2958,7 +2975,7 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid, | ||||
|       ret = OB_INVALID_ARGUMENT; | ||||
|       TRANS_LOG(WARN, "invalid trans descriptor", K(ret), K(xid)); | ||||
|     } else { | ||||
|       tx_desc_ = tx; | ||||
|       save_tx_desc_(tx); | ||||
|       request_id_ = request_id; | ||||
|       if (is_rollback) { | ||||
|         xa_trans_state_ = ObXATransState::ROLLBACKING; | ||||
|  | ||||
| @ -257,6 +257,8 @@ private: | ||||
|   int get_dblink_client_(const common::sqlclient::DblinkDriverProto dblink_type, | ||||
|                          common::sqlclient::ObISQLConnection *dblink_conn, | ||||
|                          ObDBLinkClient *&dblink_client); | ||||
| private: | ||||
|   static const int MIN_TX_REF_COUNT = 3; | ||||
| private: | ||||
|   bool is_inited_; | ||||
|   ObXACtxMgr *xa_ctx_mgr_; | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 obdev
					obdev