diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index f4306e53a0..70ba5e9ee6 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -681,7 +681,8 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id, const share::ObLSID &coord, const int64_t timeout_us, const bool is_rollback, - ObITxCallback &cb) + ObITxCallback &cb, + ObTxDesc *&tx_desc) { int ret = OB_SUCCESS; int64_t now = ObClockGenerator::getClock(); @@ -691,6 +692,8 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id, TRANS_LOG(WARN, "alloc tx fail", K(ret), KPC(this)); } else if (OB_FAIL(tx_desc_mgr_.add_with_txid(tx_id, *tx))) { TRANS_LOG(WARN, "add tx to txMgr fail", K(ret), K(tx)); + tx_desc_mgr_.revert(*tx); + tx = NULL; } else { tx->commit_expire_ts_ = now + timeout_us; tx->coord_id_ = coord; @@ -701,16 +704,15 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id, } else if (OB_FAIL(register_commit_retry_task_(*tx))) { TRANS_LOG(WARN, "fail to register retry commit task", K(ret), K(*tx)); } else { + int tmp_ret = OB_SUCCESS; if (is_rollback) { // two phase rollback ObTxSubRollbackMsg msg; tx->state_ = ObTxDesc::State::SUB_ROLLBACKING; if (OB_FAIL(build_tx_sub_rollback_msg_(*tx, msg))) { TRANS_LOG(WARN, "fail to build tx sub-rollback msg", K(ret), K(*tx)); - } else if (OB_FAIL(rpc_->post_msg(tx->coord_id_, msg))) { - TRANS_LOG(WARN, "fail to post tx sub-rollback msg", K(ret), K(*tx), K(msg)); - // send msg fail won't cause commit fail, later driven by retry timer - ret = OB_SUCCESS; + } else if (OB_SUCCESS != (tmp_ret = rpc_->post_msg(tx->coord_id_, msg))) { + TRANS_LOG(WARN, "fail to post tx sub-rollback msg", K(tmp_ret), K(*tx), K(msg)); } } else { // two phase commit @@ -718,15 +720,20 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id, tx->state_ = ObTxDesc::State::SUB_COMMITTING; if (OB_FAIL(build_tx_sub_commit_msg_(*tx, msg))) { TRANS_LOG(WARN, "fail to build tx sub-commit msg", K(ret), K(*tx)); - } else if (OB_FAIL(rpc_->post_msg(tx->coord_id_, msg))) { - TRANS_LOG(WARN, "fail to post tx sub-commit msg", K(ret), K(*tx), K(msg)); - // send msg fail won't cause commit fail, later driven by retry timer - ret = OB_SUCCESS; + } else if (OB_SUCCESS != (tmp_ret = rpc_->post_msg(tx->coord_id_, msg))) { + TRANS_LOG(WARN, "fail to post tx sub-commit msg", K(tmp_ret), K(*tx), K(msg)); } } } + if (OB_SUCCESS != ret && OB_NOT_NULL(tx)) { + tx_desc_mgr_.remove(*tx); + tx_desc_mgr_.revert(*tx); + tx = NULL; + } else { + tx_desc = tx; + } } - TRANS_LOG(INFO, "end two phase tx", K(tx_id), K(is_rollback), K(xid), KP(&cb)); + TRANS_LOG(INFO, "end two phase tx", K(ret), K(tx_id), K(is_rollback), K(xid), KP(&cb)); return ret; } @@ -2397,7 +2404,7 @@ int ObTransService::handle_sub_rollback_timeout_(ObTxDesc &tx, const int64_t del ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpect trans state", K(ret), K_(tx.state), K(tx)); } else if (tx.commit_expire_ts_ <= now) { - TRANS_LOG(WARN, "sub prepare timeout", K_(tx.commit_expire_ts), K(tx)); + TRANS_LOG(WARN, "sub rollback timeout", K_(tx.commit_expire_ts), K(tx)); const bool is_rollback = true; ret = handle_sub_end_tx_result_(tx, is_rollback, OB_TRANS_STMT_TIMEOUT); } else { @@ -2423,7 +2430,7 @@ int ObTransService::handle_sub_commit_timeout_(ObTxDesc &tx, const int64_t delay ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpect trans state", K(ret), K_(tx.state), K(tx)); } else if (tx.commit_expire_ts_ <= now) { - TRANS_LOG(WARN, "sub prepare timeout", K_(tx.commit_expire_ts), K(tx)); + TRANS_LOG(WARN, "sub commit timeout", K_(tx.commit_expire_ts), K(tx)); const bool is_rollback = false; ret = handle_sub_end_tx_result_(tx, is_rollback, OB_TRANS_STMT_TIMEOUT); } else { @@ -2728,11 +2735,8 @@ int ObTransService::handle_sub_commit_result(const ObTransID &tx_id, } ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result); } - tx_desc_mgr_.remove(*tx); tx->lock_.unlock(); tx->execute_commit_cb(); - // revert of add - tx_desc_mgr_.revert(*tx); } if (OB_NOT_NULL(tx)) { tx_desc_mgr_.revert(*tx); @@ -2776,11 +2780,8 @@ int ObTransService::handle_sub_rollback_result(const ObTransID &tx_id, } ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result); } - tx_desc_mgr_.remove(*tx); tx->lock_.unlock(); tx->execute_commit_cb(); - // revert of add - tx_desc_mgr_.revert(*tx); } if (OB_NOT_NULL(tx)) { tx_desc_mgr_.revert(*tx); diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index a5922486b0..8219e669c1 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -47,7 +47,8 @@ int end_two_phase_tx(const ObTransID &tx_id, const share::ObLSID &ls_id, const int64_t timeout_us, const bool is_rollback, - ObITxCallback &cb); + ObITxCallback &cb, + ObTxDesc *&tx_desc); /* * acquire transaction's coordinator diff --git a/src/storage/tx/ob_two_phase_downstream_committer.cpp b/src/storage/tx/ob_two_phase_downstream_committer.cpp index c00cf49fed..eb26c24ced 100644 --- a/src/storage/tx/ob_two_phase_downstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_downstream_committer.cpp @@ -287,6 +287,7 @@ int ObTxCycleTwoPhaseCommitter::retransmit_upstream_msg_(const ObTxState state) } else { need_respond = false; } + break; } case ObTxState::PREPARE: { msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP; diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 32c2c967b1..4552fbf57f 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -2498,11 +2498,16 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid, } else if (OB_FAIL(check_trans_state_(is_rollback, request_id, false))) { TRANS_LOG(WARN, "check trans state fail", K(ret), K(xid), K(is_rollback), K(timeout_us)); } else { + ObTxDesc *tx = NULL; if (OB_FAIL(MTL(ObTransService*)->end_two_phase_tx(trans_id_, xid, coord, timeout_us, - is_rollback, end_trans_cb_))) { + is_rollback, end_trans_cb_, tx))) { TRANS_LOG(WARN, "fail to end trans for two phase commit", K(ret), K(xid), K(coord), K(is_rollback), K(timeout_us), K(*this)); + } else if (OB_ISNULL(tx)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid trans descriptor", K(ret), K(xid)); } else { + tx_desc_ = tx; request_id_ = request_id; if (is_rollback) { xa_trans_state_ = ObXATransState::ROLLBACKING;