[4.1][xa] fix session terminate for xa trans

This commit is contained in:
obdev
2023-02-15 03:41:53 +00:00
committed by ob-robot
parent 8dd3ce1ac3
commit 4aa841b9e4
7 changed files with 316 additions and 286 deletions

View File

@ -194,13 +194,13 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags;
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end(xid, flags,
my_session->get_tx_desc()))) {
LOG_WARN("xa end failed", K(ret), K(stmt.get_xa_string()));
// 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚
// if (OB_TRANS_XA_RMFAIL != ret
// && OB_SUCCESS != (tmp_ret = ObSqlTransControl::explicit_end_trans(ctx, true))) {
// ret = tmp_ret;
// LOG_WARN("explicit end trans failed", K(ret));
// }
LOG_WARN("xa end failed", K(ret), K(xid));
// if branch fail is returned, clean trans in session
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
my_session->reset_tx_variable();
my_session->disassociate_xa();
ctx.set_need_disconnect(false);
}
} else {
my_session->reset_tx_variable();
my_session->disassociate_xa();

View File

@ -168,7 +168,7 @@ int ObXACtx::handle_timeout(const int64_t delay)
} else {
timeout_task_.set_running(true);
if (get_original_sche_addr() == GCONF.self_addr_) {
if (OB_FAIL(xa_rollback_terminate_())) {
if (OB_FAIL(xa_rollback_terminate_(ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "xa rollback terminate failed", K(ret), K(*this));
}
if (is_tightly_coupled_) {
@ -183,19 +183,17 @@ int ObXACtx::handle_timeout(const int64_t delay)
} else {
set_terminated_();
}
if (0 == xa_ref_count_) {
set_exiting_();
}
try_exit_();
timeout_task_.set_running(false);
}
lock_.unlock();
} else {
TRANS_LOG(WARN, "xa trans handle timeout failed", K(ret), K(*this));
if (OB_FAIL(register_timeout_task_(delay))) {
TRANS_LOG(WARN, "register timeout handler error", K(ret), K(*this));
}
TRANS_LOG(WARN, "xa trans handle timeout failed", K(ret), K(*this));
if (OB_FAIL(register_timeout_task_(delay))) {
TRANS_LOG(WARN, "register timeout handler error", K(ret), K(*this));
}
}
REC_TRACE_EXT(tlog_, handle_timeout, OB_Y(ret), OB_ID(ctx_ref), get_uref());
TRANS_LOG(INFO, "xa trans timeout", K(*this));
@ -221,13 +219,6 @@ int ObXACtx::kill()
return OB_NOT_SUPPORTED;
}
int ObXACtx::check_terminated()
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
return check_terminated_();
}
int ObXACtx::check_terminated_() const
{
int ret = OB_SUCCESS;
@ -492,12 +483,6 @@ int ObXACtx::get_branch_info_(const ObXATransID &xid,
return ret;
}
void ObXACtx::set_terminated()
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
set_terminated_();
}
void ObXACtx::set_terminated_()
{
TRANS_LOG(INFO, "set terminated", K_(is_terminated), K(*this), "lbt", lbt());
@ -506,14 +491,15 @@ void ObXACtx::set_terminated_()
REC_TRACE_EXT(tlog_, terminate, OB_ID(ctx_ref), get_uref());
}
int ObXACtx::xa_rollback_terminate_()
// this is used to roll back xa trans for exceptions
// e.g., timeout, session terminate, protocol error
int ObXACtx::xa_rollback_terminate_(const int cause)
{
int ret = OB_SUCCESS;
const bool is_rollback = true;
const int64_t request_id = ObTimeUtility::current_time();
if (OB_FAIL(one_phase_end_trans_(is_rollback, 0, request_id))) {
TRANS_LOG(WARN, "xa rollback terminate failed", K(ret), K(*this));
(void)unregister_timeout_task_();
if (OB_FAIL(MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) {
TRANS_LOG(WARN, "abort tx for session terminate failed", K(ret), K(*this));
}
set_terminated_();
@ -683,6 +669,8 @@ void ObXACtx::print_branch_info_() const
return;
}
// process xa start request from temporary scheduler
// if branch fail and ref count is zero, exit
int ObXACtx::process_xa_start(const obrpc::ObXAStartRPCRequest &req)
{
int ret = OB_SUCCESS;
@ -707,7 +695,7 @@ int ObXACtx::process_xa_start(const obrpc::ObXAStartRPCRequest &req)
}
}
TRANS_LOG(INFO, "xa start", K(ret), K(req));
TRANS_LOG(INFO, "xa start", K(ret), K(req), K_(xa_ref_count));
return ret;
}
@ -861,6 +849,9 @@ int ObXACtx::process_xa_end(const obrpc::ObXAEndRPCRequest &req)
}
}
}
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
try_exit_();
}
TRANS_LOG(INFO, "process xa end", K(ret), K(*this));
return ret;
@ -935,6 +926,9 @@ int ObXACtx::process_start_stmt(const obrpc::ObXAStartStmtRPCRequest &req)
}
}
}
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
try_exit_();
}
REC_TRACE_EXT(tlog_, xa_start_stmt_request, OB_Y(ret), OB_ID(ctx_ref), get_uref());
TRANS_LOG(INFO, "process start stmt", K(ret), K(req), K(*this));
@ -978,7 +972,6 @@ int ObXACtx::process_end_stmt(const obrpc::ObXAEndStmtRPCRequest &req)
{
int ret = OB_SUCCESS;
const ObXATransID &xid = req.get_xid();
ObAddr fake_addr;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_UNLIKELY(!req.is_valid())) {
@ -1006,6 +999,9 @@ int ObXACtx::process_end_stmt(const obrpc::ObXAEndStmtRPCRequest &req)
is_executing_ = false;
}
}
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
try_exit_();
}
return ret;
}
@ -1195,9 +1191,7 @@ int ObXACtx::recover_tx_for_dblink_callback(ObTxDesc *&tx_desc)
ret = OB_TRANS_XA_BRANCH_FAIL;
TRANS_LOG(WARN, "xa trans is terminated", K(ret), K(*this));
is_terminated_ = true;
if (0 == xa_ref_count_) {
set_exiting_();
}
try_exit_();
} else if (NULL == tx_desc_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected local tx desc", K(ret), K(*this));
@ -1229,9 +1223,7 @@ int ObXACtx::revert_tx_for_dblink_callback(ObTxDesc *&tx_desc)
tx_desc = NULL;
}
if (0 == xa_ref_count_) {
set_exiting_();
}
try_exit_();
TRANS_LOG(INFO, "revert tx for dblink callback", K(ret), K(*this));
return ret;
@ -1264,7 +1256,7 @@ int ObXACtx::xa_start(const ObXATransID &xid,
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xid", K(xid), K(xid_), K(*this));
} else if (OB_FAIL(xa_start_(xid, flags, timeout_seconds, tx_desc))) {
TRANS_LOG(WARN, "loose mode, xa start failed", K(ret), K(xid), K(flags), K(*this));
TRANS_LOG(WARN, "xa start failed", K(ret), K(xid), K(flags), K(*this));
} else {
// set global trans type to xa trans
tx_desc->set_global_tx_type(ObGlobalTxType::XA_TRANS);
@ -1300,6 +1292,9 @@ int ObXACtx::xa_start_second(const ObXATransID &xid,
} else if (is_exiting_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(flags), K(*this));
} else if (OB_FAIL(check_for_execution_(xid, is_new_branch))) {
//include branch fail
TRANS_LOG(WARN, "check for execution failed", K(ret), K(xid), K(*this));
} else {
if (is_original) {
if (OB_FAIL(xa_start_local_(xid, flags, timeout_seconds, is_new_branch, tx_desc))) {
@ -1338,7 +1333,7 @@ int ObXACtx::xa_start_remote_first(const ObXATransID &xid,
const bool is_join = ObXAFlag::is_tmjoin(flags) || ObXAFlag::is_tmresume(flags);
const bool is_original = (GCTX.self_addr() == original_sche_addr_);
const bool is_new_branch = !is_join;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_UNLIKELY(!xid.is_valid()) || 0 > timeout_seconds) {
ret = OB_INVALID_ARGUMENT;
@ -1400,9 +1395,6 @@ int ObXACtx::xa_start_(const ObXATransID &xid,
// if fail, the local variable tx_desc_ should be NULL
tx_desc_ = NULL;
//xa_ref_count_ is added only when success is returned
if (0 == xa_ref_count_) {
set_exiting_();
}
}
return ret;
@ -1422,9 +1414,6 @@ int ObXACtx::xa_start_local_(const ObXATransID &xid,
if (!is_new_branch && OB_FAIL(check_join_(xid))) {
TRANS_LOG(WARN, "check join failed", K(ret), K(xid), K(*this));
} else if (OB_FAIL(check_for_execution_(xid, is_new_branch))) {
//include branch fail
TRANS_LOG(WARN, "check for execution failed", K(ret), K(xid), K(*this));
} else if (OB_FAIL(update_xa_branch_info_(xid,
ObXATransState::ACTIVE,
GCTX.self_addr(),//ATTENTION, check here
@ -1496,9 +1485,6 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
// TODO, check loose couple mode but OB_TRANS_CTX_NOT_EXIST, check OB_TIMEOUT
if (is_tightly_coupled_ && (OB_TRANS_XA_BRANCH_FAIL == ret || OB_TRANS_CTX_NOT_EXIST == ret)) {
TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(result));
if (OB_FAIL(xa_service_->delete_xa_all_tightly_branch(tenant_id_, xid))) {
TRANS_LOG(WARN, "delete all xa tightly branch failed", K(ret), K(xid));
}
//rewrite code
ret = OB_TRANS_XA_BRANCH_FAIL;
} else {
@ -1516,14 +1502,10 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
// OB_TRANS_XA_BRANCH_FAIL,
// TODO, check whether need to delete inner table record
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
is_terminated_ = true;
}
// xa_ref_count_ is added only when success is returned
if (0 == xa_ref_count_ && !is_exiting_) {
is_exiting_ = true;
xa_ctx_mgr_->erase_xa_ctx(trans_id_);
set_terminated_();
}
} else {
// xa_ref_count_ is added only when success is returned
++xa_ref_count_;
tx_desc_->set_xid(xid);
tx_desc_->set_xa_ctx(this);
@ -1584,9 +1566,6 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid,
if (OB_FAIL(cond.wait(wait_time, result)) || OB_FAIL(result)) {
if (OB_TRANS_XA_BRANCH_FAIL == ret || OB_TRANS_CTX_NOT_EXIST == ret) {
TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(result));
if (OB_FAIL(xa_service_->delete_xa_all_tightly_branch(tenant_id_, xid))) {
TRANS_LOG(WARN, "delete all xa tightly branch failed", K(ret), K(xid));
}
//rewrite code
ret = OB_TRANS_XA_BRANCH_FAIL;
} else {
@ -1597,17 +1576,13 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid,
if (OB_FAIL(ret)) {
// OB_TRANS_XA_BRANCH_FAIL,
// TODO, check whether need to delete inner table record
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
is_terminated_ = true;
}
// xa_ref_count_ is added only when success is returned
if (0 == xa_ref_count_ && !is_exiting_) {
is_exiting_ = true;
xa_ctx_mgr_->erase_xa_ctx(trans_id_);
set_terminated_();
}
} else {
// xa_ref_count_ is increased only when success is returned
++xa_ref_count_;
// set tx_desc in session
tx_desc = tx_desc_;
}
return ret;
@ -1621,6 +1596,8 @@ int ObXACtx::save_tx_desc_(ObTxDesc *tx_desc)
}
// xa end in any scheduler
// regardless of the return code, decrease ref count
// if xa end fails, xa trans will fail. Therefore, branch fail is returned.
// @param [in] xid
// @param [in] flags
// @param [in/out] tx_desc
@ -1650,47 +1627,51 @@ int ObXACtx::xa_end(const ObXATransID &xid,
} else if (tx_desc_ != tx_desc) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected tx desc", K(ret), K(xid), K(*this));
} else if (is_terminated_) {
ret = OB_TRANS_XA_BRANCH_FAIL;
--xa_ref_count_;
if (0 == xa_ref_count_) {
set_exiting_();
}
TRANS_LOG(INFO, "xa trans is terminating", K(ret), K(*this));
} else if (!is_tightly_coupled_) {
// loosely coupled mode
if (is_original) {
if (OB_FAIL(xa_end_loose_local_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end loose local failed", K(ret), K(xid), K(*this));
}
} else {
if (OB_FAIL(xa_end_loose_remote_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end loose remote failed", K(ret), K(xid), K(*this));
}
}
} else if (OB_FAIL(check_for_execution_(xid, false))) {
// include branch fail
TRANS_LOG(WARN, "check for execution failed", K(ret), K(xid), K(*this));
} else {
//tightly coupled mode
if (is_original) {
if (OB_FAIL(xa_end_tight_local_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end tight local failed", K(ret), K(xid), K(*this));
if (!is_tightly_coupled_) {
// loosely coupled mode
if (is_original) {
if (OB_FAIL(xa_end_loose_local_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end loose local failed", K(ret), K(xid), K(*this));
}
} else {
if (OB_FAIL(xa_end_loose_remote_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end loose remote failed", K(ret), K(xid), K(*this));
}
}
} else {
if (OB_FAIL(xa_end_tight_remote_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end tight remote failed", K(ret), K(xid), K(*this));
//tightly coupled mode
if (is_original) {
if (OB_FAIL(xa_end_tight_local_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end tight local failed", K(ret), K(xid), K(*this));
}
} else {
if (OB_FAIL(xa_end_tight_remote_(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end tight remote failed", K(ret), K(xid), K(*this));
}
}
}
}
if (OB_SUCC(ret)) {
// trans_desc.set_xa_ctx(NULL);
// trans_desc.set_sql_trans_action(ObSQLTransAction::END_TRANS);
--xa_ref_count_;
if (0 == xa_ref_count_ && !is_original) {
set_exiting_();
--xa_ref_count_;
// if fail, force terminate
if (OB_FAIL(ret)) {
if (OB_TRANS_XA_BRANCH_FAIL != ret) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = xa_rollback_terminate_(ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "abort tx for session terminate failed", K(tmp_ret), K(*this));
}
// return branch fail
ret = OB_TRANS_XA_BRANCH_FAIL;
}
} else if (OB_TRANS_XA_BRANCH_FAIL == ret) {
// rewrite ret
ret = OB_SUCCESS;
}
if (OB_SUCC(ret) && is_original) {
// if succeed in original scheduler, do not exit
} else {
try_exit_();
}
REC_TRACE_EXT(tlog_, xa_end, OB_Y(ret), OB_ID(bqual), xid_.get_bqual_hash(),
@ -2011,6 +1992,9 @@ int ObXACtx::xa_end_loose_remote_(const ObXATransID &xid,
} else if (OB_FAIL(cond.wait(wait_time, result)) || OB_FAIL(result)) {
TRANS_LOG(WARN, "wait cond failed", K(ret), K(result), K(*this));
}
if (OB_SUCC(ret)) {
ret = result;
}
}
return ret;
@ -2076,53 +2060,55 @@ int ObXACtx::xa_end_tight_remote_(const ObXATransID &xid,
} else if (OB_FAIL(cond.wait(wait_time, result)) || OB_FAIL(result)) {
TRANS_LOG(WARN, "wait cond failed", K(ret), K(result), K(*this));
}
if (OB_SUCC(ret)) {
ret = result;
}
}
return ret;
}
int ObXACtx::clear_branch_for_xa_terminate(const ObXATransID &xid,
ObTxDesc *&tx_desc,
const bool delete_branch)
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
return clear_branch_for_xa_terminate_(xid, tx_desc, delete_branch);
}
int ObXACtx::clear_branch_for_xa_terminate_(const ObXATransID &xid,
ObTxDesc *&tx_desc,
const bool delete_branch)
// this is ONLY used for session terminate
// subtract ref count
// if ref count is zero, exit
int ObXACtx::clear_branch_for_xa_terminate(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_ISNULL(xa_ctx_mgr_) || OB_ISNULL(xa_service_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected null ptr", K(ret), KP(xa_ctx_mgr_), KP(xa_service_));
} else {
--xa_ref_count_;
if (!xa_ref_count_ && !is_exiting_) {
if (OB_FAIL(xa_ctx_mgr_->erase_xa_ctx(trans_id_))) {
TRANS_LOG(WARN, "erase xa ctx failed", K(ret), K(xid), K(*this));
}
is_exiting_ = true;
}
// tx_desc.set_xa_ctx(NULL);
// xa_ctx_mgr_->revert_xa_ctx(this);
if (delete_branch &&
OB_SUCCESS != (tmp_ret = xa_service_->delete_xa_all_tightly_branch(tenant_id_, xid))) {
TRANS_LOG(WARN, "delete xa tight branch failed", K(ret), K(xid));
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
if (!is_tightly_coupled_ && 0 != xa_ref_count_) {
// if loosely coupled mode, ref count must be zero
tmp_ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected xa ref count", K(tmp_ret), K_(xa_ref_count),
K(xid), K(*this));
}
try_exit_();
}
return ret;
}
int ObXACtx::set_exiting()
// if ref count is zero, exit
void ObXACtx::try_exit(const bool need_decrease_ref)
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
return set_exiting_();
if (need_decrease_ref) {
--xa_ref_count_;
}
try_exit_();
}
void ObXACtx::try_exit_()
{
if (0 == xa_ref_count_) {
set_exiting_();
}
}
int ObXACtx::set_exiting_()
@ -2255,6 +2241,9 @@ int ObXACtx::one_phase_end_trans(const ObXATransID &xid,
} else {
if (OB_FAIL(one_phase_end_trans_(is_rollback, timeout_us, request_id))) {
TRANS_LOG(WARN, "one phase xa end trans failed", K(ret), K(*this));
// in this case, the timeout task has been unregistered.
// therefore, if fail, xa ctx should exit
try_exit_();
}
}
@ -2291,11 +2280,6 @@ int ObXACtx::one_phase_end_trans_(const bool is_rollback, const int64_t timeout_
}
}
if (OB_FAIL(ret)) {
if (0 == xa_ref_count_) {
set_exiting_();
}
}
TRANS_LOG(INFO, "one phase end trans", K(ret), K(is_rollback), K(*this));
return ret;
@ -2324,46 +2308,67 @@ int ObXACtx::wait_one_phase_end_trans(const bool is_rollback, const int64_t time
set_exiting_();
}
TRANS_LOG(INFO, "wait one phase end trans", K(ret), K(is_rollback), K(*this));
return ret;
}
int ObXACtx::xa_rollback_session_terminate()
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
return xa_rollback_session_terminate_();
}
int ObXACtx::xa_rollback_session_terminate_()
// this is ONLY used for session terminate
// DO NOT decrease ref count
int ObXACtx::xa_rollback_session_terminate(bool &is_first_terminate)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (!is_inited_) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "ObXACtx not inited", K(ret));
} else if (is_terminated_) {
is_first_terminate = false;
TRANS_LOG(INFO, "transaction is terminating", K(ret), "context", *this);
} else if (ObXATransState::ACTIVE != xa_trans_state_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected state", K(ret), K(xa_trans_state_), K(*this));
} else {
if (!is_tightly_coupled_) {
xa_ref_count_--;
}
set_terminated_();
const bool is_rollback = true;
const int64_t timeout_us = 0;
const int64_t request_id = ObTimeUtility::current_time();
if (OB_FAIL(one_phase_end_trans_(is_rollback, timeout_us, request_id))) {
TRANS_LOG(WARN, "terminate xa trans failed", K(ret), K(*this));
is_first_terminate = true;
// regardless of coupled mode, set terminate
// regardless of original scheduler, abort trans
if (OB_SUCCESS != (tmp_ret = xa_rollback_terminate_(ObTxAbortCause::SESSION_DISCONNECT))) {
TRANS_LOG(WARN, "abort tx for session terminate failed", K(tmp_ret), K(*this));
}
}
TRANS_LOG(INFO, "rollback xa trans when session terminate", K(ret), K(*this));
return ret;
}
// process terminate request from temporary scheduler
// DO NOT decrease ref count
// if ref count is zero, exit
int ObXACtx::process_terminate(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (!is_inited_) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "ObXACtx not inited", K(ret));
} else if (is_terminated_) {
TRANS_LOG(INFO, "transaction is terminating", K(ret), "context", *this);
} else {
// regardless of coupled mode, set terminate
// regardless of original scheduler, abort trans
if (OB_SUCCESS != (tmp_ret = xa_rollback_terminate_(ObTxAbortCause::SESSION_DISCONNECT))) {
TRANS_LOG(WARN, "abort tx for session terminate failed", K(tmp_ret), K(*this));
}
}
try_exit_();
TRANS_LOG(INFO, "process terminate in original scheduler", K(ret), K(xid));
return ret;
}
int ObXACtx::try_heartbeat()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
const int64_t now = ObTimeUtility::current_time();
if (original_sche_addr_ != GCTX.self_addr()) {
@ -2395,8 +2400,8 @@ int ObXACtx::try_heartbeat()
// exit only
// no need to delete xa records
// if xa records exist, we can rely on garbage collection
if (OB_FAIL(MTL(ObTransService*)->stop_tx(*tx_desc_))) {
TRANS_LOG(WARN, "fail to stop transaction", K(ret), K(*this));
if (OB_SUCCESS != (tmp_ret = xa_rollback_terminate_(ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "fail to stop xa trans", K(tmp_ret), K(*this));
}
(void)set_exiting_();
} else if (now > branch_info.last_hb_ts_ + XA_HB_THRESHOLD) {
@ -2427,13 +2432,12 @@ int ObXACtx::try_heartbeat()
} else if (ObXATransState::ACTIVE != branch_info.state_) {
//do nothing
} else if (branch_info.unrespond_msg_cnt_ > MAX_UNRESPOND_XA_HB_CNT) {
const bool is_rollback = true;
const int64_t timeout_us = 0;
set_terminated_();
int64_t request_id = ObTimeUtility::current_time();
if (OB_FAIL(one_phase_end_trans_(is_rollback, timeout_us, request_id))) {
TRANS_LOG(WARN, "fail to rollback xa trans", K(ret), K(*this));
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = xa_rollback_terminate_(ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "fail to stop xa trans", K(tmp_ret), K(*this));
}
// if xa_ref_count is zero, xa ctx should exit
try_exit_();
TRANS_LOG(INFO, "scheduler unrespond, rollbacked", K(ret), K(branch_info), K(*this));
break;
} else if (now > branch_info.last_hb_ts_ + XA_HB_THRESHOLD) {
@ -2502,7 +2506,7 @@ int ObXACtx::check_for_execution_(const ObXATransID &xid, const bool is_new_bran
if (is_terminated_) {
ret = OB_TRANS_XA_BRANCH_FAIL;
TRANS_LOG(INFO, "xa trans is terminating", K(ret), K(*this));
TRANS_LOG(INFO, "xa trans is terminating", K(ret), K(xid));
} else if (is_exiting_) {
if (is_tightly_coupled_) {
ret = OB_TRANS_XA_BRANCH_FAIL;
@ -2521,11 +2525,11 @@ int ObXACtx::check_for_execution_(const ObXATransID &xid, const bool is_new_bran
} else {
//join, xa end, lock...
if (ObXATransState::PREPARING == xa_trans_state_) {
if (OB_FAIL(xa_rollback_terminate_())) {
TRANS_LOG(WARN, "rollback terminate failed", K(ret), K(xid), K(*this));
} else {
ret = OB_TRANS_XA_BRANCH_FAIL;
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = xa_rollback_terminate_(ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "rollback terminate failed", K(tmp_ret), K(xid), K(*this));
}
ret = OB_TRANS_XA_BRANCH_FAIL;
}
}
} else {
@ -2569,16 +2573,14 @@ int ObXACtx::xa_prepare(const ObXATransID &xid, const int64_t timeout_us)
TRANS_LOG(WARN, "fail to check terminate", K(ret), K(xid), K(*this));
} else if (OB_FAIL(xa_prepare_(xid, timeout_us, need_exit))) {
TRANS_LOG(WARN, "xa prepare failed", K(ret), K(xid), K(need_exit), K(*this));
}
if (OB_FAIL(ret)) {
if (need_exit) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->stop_tx(*tx_desc_))) {
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_,
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "fail to stop transaction", K(tmp_ret), K(*this));
}
set_terminated_();
set_exiting_();
tx_desc_ = NULL;
}
}
@ -2773,7 +2775,6 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid,
if (OB_FAIL(ret)) {
set_exiting_();
tx_desc_ = NULL;
}
REC_TRACE_EXT(tlog_, xa_end_trans, OB_Y(ret), OB_ID(is_rollback), is_rollback,
@ -2802,7 +2803,6 @@ int ObXACtx::wait_two_phase_end_trans(const ObXATransID &xid,
if (OB_LIKELY(!is_exiting_)) {
set_exiting_();
tx_desc_ = NULL;
}
return ret;

View File

@ -81,8 +81,6 @@ public:
bool is_tightly_coupled() const { return is_tightly_coupled_; }
int stmt_lock_with_guard(const ObXATransID &xid);
int stmt_unlock_with_guard(const ObXATransID &xid);
int check_terminated();
void set_terminated();
bool is_terminated() { return is_terminated_; }
int xa_scheduler_hb_req();
common::ObAddr get_original_sche_addr() { return original_sche_addr_; }
@ -115,11 +113,13 @@ public:
const bool is_rollback,
const int64_t timeout_us,
const int64_t request_id);
int xa_rollback_session_terminate();
int set_exiting();
int clear_branch_for_xa_terminate(const ObXATransID &xid,
ObTxDesc *&tx_desc,
const bool delete_branch);
// this is ONLY used for session terminate
int xa_rollback_session_terminate(bool &is_first_terminate);
// process terminate request from temporary scheduler
int process_terminate(const ObXATransID &xid);
void try_exit(const bool need_decrease_ref = false);
// this is ONLY used for session terminate
int clear_branch_for_xa_terminate(const ObXATransID &xid);
int try_heartbeat();
int response_for_heartbeat(const ObXATransID &xid, const ObAddr &original_addr);
int update_xa_branch_for_heartbeat(const ObXATransID &xid);
@ -141,11 +141,6 @@ public:
const int64_t timeout_us);
int wait_one_phase_end_trans(const bool is_rollback,
const int64_t timeout_us);
void dec_xa_ref_count()
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
xa_ref_count_--;
}
int64_t get_xa_ref_count()
{
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
@ -234,9 +229,6 @@ private:
const int64_t timeout_us,
const int64_t request_id);
int save_tx_desc_(ObTxDesc *tx_desc);
int clear_branch_for_xa_terminate_(const ObXATransID &xid,
ObTxDesc *&tx_desc,
const bool delete_branch);
int start_stmt_local_(const ObXATransID &xid);
int start_stmt_remote_(const ObXATransID &xid);
int end_stmt_local_(const ObXATransID &xid);
@ -244,9 +236,9 @@ private:
bool check_response_(const int64_t response_id,
const bool is_stmt_response) const;
int set_exiting_();
void try_exit_();
int check_for_execution_(const ObXATransID &xid, const bool is_new_branch);
int xa_rollback_session_terminate_();
int xa_rollback_terminate_();
int xa_rollback_terminate_(const int cause);
int xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool &need_exit);
int drive_prepare_(const ObXATransID &xid,

View File

@ -187,16 +187,17 @@ int ObXAService::xa_start_for_tm_promotion_(const int64_t flags,
//commit record
if (OB_FAIL(trans.end(true))) {
TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid));
const bool need_decrease_ref = true;
xa_ctx->try_exit(need_decrease_ref);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
} else {
//rollback record
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
TRANS_LOG(WARN, "rollback inner table trans failed", K(tmp_ret), K(xid));
}
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(xa_ctx)) {
xa_ctx_mgr_.erase_xa_ctx(trans_id);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
}
@ -361,18 +362,19 @@ int ObXAService::xa_start_for_tm_(const int64_t flags,
if (OB_FAIL(trans.end(true))) {
TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid));
}
const bool need_decrease_ref = true;
xa_ctx->try_exit(need_decrease_ref);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
} else {
//rollback record
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
TRANS_LOG(WARN, "rollback inner table trans failed", K(tmp_ret), K(xid));
}
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(xa_ctx)) {
xa_ctx_mgr_.erase_xa_ctx(trans_id);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
// txs_->remove_tx(*tx_desc);
// since tx_desc is not set into xa ctx, release tx desc explicitly
MTL(ObTransService *)->release_tx(*tx_desc);
tx_desc = NULL;
}

View File

@ -605,10 +605,8 @@ int ObXATerminateP::process()
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is null", K(ret), K(arg_));
} else {
if (!xa_ctx->is_tightly_coupled()) {
xa_ctx->set_exiting();
} else if (OB_FAIL(xa_ctx->xa_rollback_session_terminate())) {
TRANS_LOG(WARN, "xa rollback session terminate fail", K(ret), K_(arg));
if (OB_FAIL(xa_ctx->process_terminate(xid))) {
TRANS_LOG(WARN, "process terminate failed", K(ret), K(xid), K(tx_id));
}
xa_service->revert_xa_ctx(xa_ctx);
}

View File

@ -1009,7 +1009,7 @@ int ObXAService::xa_start(const ObXATransID &xid,
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "xa start failed", K(ret), K(xid), K(flags), K(timeout_seconds));
} else {
TRANS_LOG(INFO, "xa start", K(ret), K(xid), K(flags), K(timeout_seconds), "tx_id", tx_desc->get_tx_id());
TRANS_LOG(INFO, "xa start", K(ret), K(xid), K(flags), K(timeout_seconds), "tx_id", tx_desc->get_tx_id(), KPC(tx_desc));
}
return ret;
@ -1139,22 +1139,25 @@ int ObXAService::xa_start_(const ObXATransID &xid,
//commit record
if (OB_FAIL(trans.end(true))) {
TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid));
const bool need_decrease_ref = true;
xa_ctx->try_exit(need_decrease_ref);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
tx_desc = NULL;
}
} else {
//rollback record
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
TRANS_LOG(WARN, "rollback inner table trans failed", K(tmp_ret), K(xid));
}
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(xa_ctx)) {
xa_ctx->set_exiting();
xa_ctx_mgr_.erase_xa_ctx(trans_id);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
// since tx_desc is not set into xa ctx, release tx desc explicitly
MTL(ObTransService *)->release_tx(*tx_desc);
tx_desc = NULL;
}
} else {
// tightly coupled mode, xa start noflags
// this xa start is not the first for this xa trans
@ -1183,10 +1186,12 @@ int ObXAService::xa_start_(const ObXATransID &xid,
&xa_rpc_,
&timer_))) {
TRANS_LOG(WARN, "init xa ctx failed", K(ret), K(xid));
// if init fails, erase xa ctx
xa_ctx_mgr_.erase_xa_ctx(trans_id);
}
} else {
if (OB_FAIL(xa_ctx->wait_xa_start_complete())) {
TRANS_LOG(WARN, "wait xa astart complete", K(ret), K(xid));
TRANS_LOG(WARN, "wait xa start complete", K(ret), K(xid));
}
}
}
@ -1199,6 +1204,8 @@ int ObXAService::xa_start_(const ObXATransID &xid,
// Therefore, tx_desc shouled be synchronized from original scheduler.
if (OB_FAIL(xa_ctx->xa_start_remote_first(xid, flags, timeout_seconds, tx_desc))) {
TRANS_LOG(WARN, "xa ctx start failed", K(ret), K(xid));
// if fail, erase xa ctx
xa_ctx_mgr_.erase_xa_ctx(trans_id);
}
} else {
if (OB_FAIL(xa_ctx->xa_start_second(xid, flags, timeout_seconds, tx_desc))) {
@ -1208,6 +1215,9 @@ int ObXAService::xa_start_(const ObXATransID &xid,
xa_ctx = NULL;
need_retry = true;
alloc = (GCTX.self_addr() == sche_addr) ? false : true;
} else if (OB_TRANS_XA_BRANCH_FAIL == ret) {
const bool need_decrease_ref = false;
xa_ctx->try_exit(need_decrease_ref);
}
}
}
@ -1218,10 +1228,16 @@ int ObXAService::xa_start_(const ObXATransID &xid,
if (OB_NOT_NULL(xa_ctx)) {
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
if (OB_SUCCESS != (tmp_ret = delete_xa_record(tenant_id, xid))) {
TRANS_LOG(WARN, "delete xa record failed", K(tmp_ret), K(xid), K(flags));
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
if (OB_SUCCESS != (tmp_ret = delete_xa_all_tightly_branch(tenant_id, xid))) {
TRANS_LOG(WARN, "delete all xa tightly branch failed", K(tmp_ret), K(xid));
}
} else {
if (OB_SUCCESS != (tmp_ret = delete_xa_record(tenant_id, xid))) {
TRANS_LOG(WARN, "delete xa record failed", K(tmp_ret), K(xid), K(flags));
}
}
}
} // end if fail
}
// xa_start on new session, adjust tx_desc.sess_id_
if (OB_SUCC(ret)) {
@ -1240,6 +1256,7 @@ int ObXAService::xa_start_join_(const ObXATransID &xid,
ObTxDesc *&tx_desc)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObAddr scheduler_addr;
ObTransID trans_id;
const uint64_t tenant_id = MTL_ID();
@ -1289,6 +1306,8 @@ int ObXAService::xa_start_join_(const ObXATransID &xid,
&xa_rpc_,
&timer_))) {
TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid));
// if init fails, erase xa ctx
xa_ctx_mgr_.erase_xa_ctx(trans_id);
}
} else {
if (OB_FAIL(xa_ctx->wait_xa_start_complete())) {
@ -1305,17 +1324,21 @@ int ObXAService::xa_start_join_(const ObXATransID &xid,
// Therefore, tx_desc shouled be synchronized from original scheduler.
if (OB_FAIL(xa_ctx->xa_start_remote_first(xid, flags, timeout_seconds, tx_desc))) {
TRANS_LOG(WARN, "xa ctx start failed", K(ret), K(xid));
// if fail, erase xa ctx
xa_ctx_mgr_.erase_xa_ctx(trans_id);
}
} else {
if (OB_FAIL(xa_ctx->xa_start_second(xid, flags, timeout_seconds, tx_desc))) {
TRANS_LOG(WARN, "xa ctx start failed", K(ret), K(xid));
// TODO, ATTENTION, add control according to time or number of retry times,
// must be handled here or may affect error handling
if (is_tightly_coupled && OB_TRANS_IS_EXITING == ret) {
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
xa_ctx = NULL;
need_retry = true;
alloc = (GCTX.self_addr() == scheduler_addr) ? false : true;
} else if (OB_TRANS_XA_BRANCH_FAIL == ret) {
const bool need_decrease_ref = false;
xa_ctx->try_exit(need_decrease_ref);
}
}
}
@ -1326,6 +1349,10 @@ int ObXAService::xa_start_join_(const ObXATransID &xid,
if (OB_NOT_NULL(xa_ctx)) {
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
if (OB_TRANS_XA_BRANCH_FAIL == ret
&& OB_SUCCESS != (tmp_ret = delete_xa_all_tightly_branch(tenant_id, xid))) {
TRANS_LOG(WARN, "delete all xa tightly branch failed", K(tmp_ret), K(xid));
}
}
}
// xa_join/resume on new session, adjust tx_desc.sess_id_
@ -1351,7 +1378,7 @@ int ObXAService::xa_end(const ObXATransID &xid,
} else if (!tx_desc->is_xa_trans()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid));
} else if (tx_desc->get_xid() != xid) {
} else if (!xid.gtrid_equal_to(tx_desc->get_xid())) {
// tx->get_xid().gtrid != xid.gtrid
// oracle returns 0
ret = OB_TRANS_XA_NOTA;
@ -1361,6 +1388,10 @@ int ObXAService::xa_end(const ObXATransID &xid,
TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid));
} else if (OB_FAIL(xa_ctx->xa_end(xid, flags, tx_desc))) {
TRANS_LOG(WARN, "xa end failed", K(ret), K(xid), K(flags));
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
tx_desc = NULL;
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
} else {
tx_desc = NULL;
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
@ -2018,6 +2049,7 @@ int ObXAService::delete_xa_pending_record(const uint64_t tenant_id,
return ret;
}
// this is only used for session terminate
int ObXAService::handle_terminate_for_xa_branch(const ObXATransID &xid, ObTxDesc *tx_desc, const int64_t xa_end_timeout_seconds)
{
int ret = OB_SUCCESS;
@ -2032,59 +2064,39 @@ int ObXAService::handle_terminate_for_xa_branch(const ObXATransID &xid, ObTxDesc
const int64_t timeout_us = xa_end_timeout_seconds * 1000 * 1000;
ObXACtx *xa_ctx = tx_desc->get_xa_ctx();
ObTransID tx_id = tx_desc->tid();
const int64_t expire_ts = now + timeout_us;
int tmp_ret = OB_SUCCESS;
bool is_first_terminate = true;
TRANS_LOG(INFO, "start to terminate xa trans", K(xid), K(tx_id), "lbt", lbt());
if (NULL == xa_ctx) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is null, may be stmt fail or rollback", K(ret), K(tx_id), K(xid));
} else if (!xa_ctx->is_tightly_coupled()) {
if (OB_FAIL(xa_ctx->xa_rollback_session_terminate())) {
// loosely coupled mode
if (OB_FAIL(xa_ctx->xa_rollback_session_terminate(is_first_terminate))) {
TRANS_LOG(WARN, "rollback xa trans failed", K(ret), K(tx_id), K(xid));
}
// if tmp scheduler, we needs to send terminate request to the original scheduler
// send the terminate request to the original scheduler no matter whether the rollback succeeds
if (xa_ctx->get_original_sche_addr() != self) {
int result;
do {
obrpc::ObXATerminateRPCRequest req;
obrpc::ObXARPCCB<obrpc::OB_XA_TERMINATE> cb;
ObTransCond cond;
// rely on timeout of cb, therefore timeout of cond is set to max
const int64_t wait_time = (INT64_MAX - now) / 2;
if (OB_FAIL(cb.init(&cond))) {
TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret));
} else if (OB_FAIL(req.init(tx_id, xid, timeout_us))) {
TRANS_LOG(WARN, "init ObXATerminateRPCRequest failed", KR(ret), K(xid), K(tx_id));
} else if (OB_FAIL(xa_rpc_.xa_terminate(tenant_id, xa_ctx->get_original_sche_addr(), req, &cb))) {
TRANS_LOG(WARN, "xa proxy terminate failed", KR(ret),
K(req), K(xa_ctx->get_original_sche_addr()));
} else if (OB_FAIL(cond.wait(wait_time, result))) {
TRANS_LOG(WARN, "wait xa_terminate rpc callback failed", KR(ret),
K(req), K(xa_ctx->get_original_sche_addr()));
} else if (OB_SUCCESS != result) {
TRANS_LOG(WARN, "xa_terminate rpc failed result", K(result),
K(req), K(xa_ctx->get_original_sche_addr()));
}
} while (OB_SUCC(ret) && (OB_TIMEOUT == result && expire_ts > ObTimeUtility::current_time()));
if (OB_SUCC(ret)) {
if (OB_TRANS_CTX_NOT_EXIST == result || OB_TIMEOUT == result) {
// if trans ctx does not exist or the rpc is timeout, assume that the orginal scheduler has been exited
} else {
ret = result;
}
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "terminate remote original scheduler failed", K(ret),
K(xa_ctx->get_original_sche_addr()), K(tx_id), K(xid));
if (is_first_terminate && xa_ctx->get_original_sche_addr() != self) {
// original scheduler is in remote
// send terminate to original scheduler
if (OB_SUCCESS != (tmp_ret = terminate_to_original_(xid, tx_id,
xa_ctx->get_original_sche_addr(), timeout_us))) {
TRANS_LOG(WARN, "terminate remote original scheduler failed", K(tmp_ret), K(xid),
K(tx_id), K(xa_ctx->get_original_sche_addr()), K(timeout_us));
}
}
if (OB_SUCCESS != (tmp_ret = xa_ctx->clear_branch_for_xa_terminate(xid))) {
TRANS_LOG(WARN, "clear branch for xa terminate failed", K(ret), K(xid), K(tx_id));
}
if (OB_SUCCESS != (tmp_ret = delete_xa_branch(tenant_id, xid, false))) {
TRANS_LOG(WARN, "delete xa record failed", K(ret), K(xid), K(tx_id));
}
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
tx_desc = NULL;
TRANS_LOG(INFO, "handle terminate for loosely coupled xa branch", K(ret), K(tx_id), K(xid));
} else {
// tightly coupled mode
// regardless of the location of scheduler, try to acquire lock first
int64_t expired_time = now + 10000000; // 10s
while (!xa_ctx->is_terminated()
@ -2098,51 +2110,28 @@ int ObXAService::handle_terminate_for_xa_branch(const ObXATransID &xid, ObTxDesc
if (xa_ctx->is_terminated()) {
// avoid the terminate operations of different branches
TRANS_LOG(INFO, "xa trans has terminated", K(tx_id), K(xid));
} else if (xa_ctx->get_original_sche_addr() == self){
// original scheduler is in local
if (OB_FAIL(xa_ctx->xa_rollback_session_terminate())) {
TRANS_LOG(WARN, "rollback xa trans failed", K(ret), K(tx_id), K(xid));
} else {
TRANS_LOG(INFO, "rollback xa trans success", K(tx_id), K(xid));
}
} else {
// original scheduler is in remote
int result;
xa_ctx->set_terminated();
do {
obrpc::ObXATerminateRPCRequest req;
obrpc::ObXARPCCB<obrpc::OB_XA_TERMINATE> cb;
ObTransCond cond;
// rely on timeout of cb, therefore timeout of cond is set to max
const int64_t wait_time = (INT64_MAX - now) / 2;
if (OB_FAIL(cb.init(&cond))) {
TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret));
} else if (OB_FAIL(req.init(tx_id, xid, timeout_us))) {
TRANS_LOG(WARN, "init ObXATerminateRPCRequest failed", KR(ret), K(xid), K(tx_id));
} else if (OB_FAIL(xa_rpc_.xa_terminate(tenant_id, xa_ctx->get_original_sche_addr(), req, &cb))) {
TRANS_LOG(WARN, "xa proxy terminate failed", KR(ret), K(xa_ctx->get_original_sche_addr()), K(req));
} else if (OB_FAIL(cond.wait(wait_time, result))) {
TRANS_LOG(WARN, "wait xa_terminate rpc callback failed", KR(ret),
K(req), K(xa_ctx->get_original_sche_addr()));
} else if (OB_SUCCESS != result) {
TRANS_LOG(WARN, "xa_terminate rpc failed result", K(result),
K(req), K(xa_ctx->get_original_sche_addr()));
} else {
// do nothing
}
} while (OB_SUCC(ret) && (OB_TIMEOUT == result && expire_ts > ObTimeUtility::current_time()));
if (OB_SUCC(ret)) {
if (OB_TRANS_CTX_NOT_EXIST == result || OB_TIMEOUT == result) {
ret = OB_SUCCESS;
} else {
ret = result;
if (OB_FAIL(xa_ctx->xa_rollback_session_terminate(is_first_terminate))) {
TRANS_LOG(WARN, "rollback xa trans failed", K(ret), K(tx_id), K(xid));
}
if (is_first_terminate && xa_ctx->get_original_sche_addr() != self) {
// original scheduler is in remote
// send terminate to original scheduler
if (OB_SUCCESS != (tmp_ret = terminate_to_original_(xid, tx_id,
xa_ctx->get_original_sche_addr(), timeout_us))) {
TRANS_LOG(WARN, "terminate remote original scheduler failed", K(tmp_ret), K(xid),
K(tx_id), K(xa_ctx->get_original_sche_addr()), K(timeout_us));
}
}
}
if (OB_SUCCESS != (tmp_ret = xa_ctx->clear_branch_for_xa_terminate(xid, tx_desc, true))) {
if (OB_SUCCESS != (tmp_ret = xa_ctx->clear_branch_for_xa_terminate(xid))) {
TRANS_LOG(WARN, "clear branch for xa terminate failed", K(ret), K(xid), K(tx_id));
}
if (OB_SUCCESS != (tmp_ret = delete_xa_all_tightly_branch(tenant_id, xid))) {
TRANS_LOG(WARN, "delete xa tight branch failed", K(ret), K(xid));
}
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
tx_desc = NULL;
TRANS_LOG(INFO, "handle terminate for tightly coupled xa branch", K(ret), K(xid), K(tx_id));
}
}
@ -2150,6 +2139,53 @@ int ObXAService::handle_terminate_for_xa_branch(const ObXATransID &xid, ObTxDesc
return ret;
}
// send terminate rpc to original scheduler
int ObXAService::terminate_to_original_(const ObXATransID &xid,
const ObTransID &tx_id,
const ObAddr &original_sche_addr,
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
int result = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
const int64_t now = ObTimeUtility::current_time();
const int64_t expire_ts = now + timeout_us;
do {
obrpc::ObXATerminateRPCRequest req;
obrpc::ObXARPCCB<obrpc::OB_XA_TERMINATE> cb;
ObTransCond cond;
// rely on timeout of cb, therefore timeout of cond is set to max
const int64_t wait_time = (INT64_MAX - now) / 2;
if (OB_FAIL(cb.init(&cond))) {
TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret));
} else if (OB_FAIL(req.init(tx_id, xid, timeout_us))) {
TRANS_LOG(WARN, "init ObXATerminateRPCRequest failed", KR(ret), K(xid), K(tx_id));
} else if (OB_FAIL(xa_rpc_.xa_terminate(tenant_id, original_sche_addr, req, &cb))) {
TRANS_LOG(WARN, "xa proxy terminate failed", KR(ret), K(original_sche_addr), K(req));
} else if (OB_FAIL(cond.wait(wait_time, result))) {
TRANS_LOG(WARN, "wait xa_terminate rpc callback failed", KR(ret),
K(req), K(original_sche_addr));
} else if (OB_SUCCESS != result) {
TRANS_LOG(WARN, "xa_terminate rpc failed result", K(result),
K(req), K(original_sche_addr));
} else {
// do nothing
}
} while (OB_SUCC(ret) && (OB_TIMEOUT == result && expire_ts > ObTimeUtility::current_time()));
if (OB_SUCC(ret)) {
if (OB_TRANS_CTX_NOT_EXIST == result || OB_TIMEOUT == result) {
// if trans ctx does not exist or the rpc is timeout,
// assume that the orginal scheduler has exited
ret = OB_SUCCESS;
} else {
ret = result;
}
}
return ret;
}
int ObXAService::xa_rollback_all_changes(const ObXATransID &xid, ObTxDesc *&tx_desc, const int64_t stmt_expired_time)
{
int ret = OB_SUCCESS;
@ -2836,11 +2872,9 @@ void ObXAService::clear_xa_branch(const ObXATransID &xid, ObTxDesc *&tx_desc)
if (NULL == xa_ctx) {
TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "xa ctx is null", K(tx_id), K(xid));
} else {
xa_ctx->dec_xa_ref_count();
if (0 == xa_ctx->get_xa_ref_count()) {
xa_ctx->set_exiting();
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
const bool need_decrease_ref = true;
xa_ctx->try_exit(need_decrease_ref);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
}
tx_desc = NULL;

View File

@ -256,6 +256,10 @@ private:
int gc_invalid_xa_record_(const uint64_t tenant_id,
const bool check_self,
const int64_t gc_time_threshold);
int terminate_to_original_(const ObXATransID &xid,
const ObTransID &tx_id,
const ObAddr &original_sche_addr,
const int64_t timeout_us);
private:
// for 4.0 dblink
int xa_start_for_tm_promotion_(const int64_t flags,