[4.1][xa] refine for xa two phase commit

This commit is contained in:
obdev
2022-12-29 15:41:59 +00:00
committed by ob-robot
parent 9822def517
commit 8887f061fa
4 changed files with 28 additions and 20 deletions

View File

@ -681,7 +681,8 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id,
const share::ObLSID &coord,
const int64_t timeout_us,
const bool is_rollback,
ObITxCallback &cb)
ObITxCallback &cb,
ObTxDesc *&tx_desc)
{
int ret = OB_SUCCESS;
int64_t now = ObClockGenerator::getClock();
@ -691,6 +692,8 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id,
TRANS_LOG(WARN, "alloc tx fail", K(ret), KPC(this));
} else if (OB_FAIL(tx_desc_mgr_.add_with_txid(tx_id, *tx))) {
TRANS_LOG(WARN, "add tx to txMgr fail", K(ret), K(tx));
tx_desc_mgr_.revert(*tx);
tx = NULL;
} else {
tx->commit_expire_ts_ = now + timeout_us;
tx->coord_id_ = coord;
@ -701,16 +704,15 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id,
} else if (OB_FAIL(register_commit_retry_task_(*tx))) {
TRANS_LOG(WARN, "fail to register retry commit task", K(ret), K(*tx));
} else {
int tmp_ret = OB_SUCCESS;
if (is_rollback) {
// two phase rollback
ObTxSubRollbackMsg msg;
tx->state_ = ObTxDesc::State::SUB_ROLLBACKING;
if (OB_FAIL(build_tx_sub_rollback_msg_(*tx, msg))) {
TRANS_LOG(WARN, "fail to build tx sub-rollback msg", K(ret), K(*tx));
} else if (OB_FAIL(rpc_->post_msg(tx->coord_id_, msg))) {
TRANS_LOG(WARN, "fail to post tx sub-rollback msg", K(ret), K(*tx), K(msg));
// send msg fail won't cause commit fail, later driven by retry timer
ret = OB_SUCCESS;
} else if (OB_SUCCESS != (tmp_ret = rpc_->post_msg(tx->coord_id_, msg))) {
TRANS_LOG(WARN, "fail to post tx sub-rollback msg", K(tmp_ret), K(*tx), K(msg));
}
} else {
// two phase commit
@ -718,15 +720,20 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id,
tx->state_ = ObTxDesc::State::SUB_COMMITTING;
if (OB_FAIL(build_tx_sub_commit_msg_(*tx, msg))) {
TRANS_LOG(WARN, "fail to build tx sub-commit msg", K(ret), K(*tx));
} else if (OB_FAIL(rpc_->post_msg(tx->coord_id_, msg))) {
TRANS_LOG(WARN, "fail to post tx sub-commit msg", K(ret), K(*tx), K(msg));
// send msg fail won't cause commit fail, later driven by retry timer
ret = OB_SUCCESS;
} else if (OB_SUCCESS != (tmp_ret = rpc_->post_msg(tx->coord_id_, msg))) {
TRANS_LOG(WARN, "fail to post tx sub-commit msg", K(tmp_ret), K(*tx), K(msg));
}
}
}
if (OB_SUCCESS != ret && OB_NOT_NULL(tx)) {
tx_desc_mgr_.remove(*tx);
tx_desc_mgr_.revert(*tx);
tx = NULL;
} else {
tx_desc = tx;
}
TRANS_LOG(INFO, "end two phase tx", K(tx_id), K(is_rollback), K(xid), KP(&cb));
}
TRANS_LOG(INFO, "end two phase tx", K(ret), K(tx_id), K(is_rollback), K(xid), KP(&cb));
return ret;
}
@ -2397,7 +2404,7 @@ int ObTransService::handle_sub_rollback_timeout_(ObTxDesc &tx, const int64_t del
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpect trans state", K(ret), K_(tx.state), K(tx));
} else if (tx.commit_expire_ts_ <= now) {
TRANS_LOG(WARN, "sub prepare timeout", K_(tx.commit_expire_ts), K(tx));
TRANS_LOG(WARN, "sub rollback timeout", K_(tx.commit_expire_ts), K(tx));
const bool is_rollback = true;
ret = handle_sub_end_tx_result_(tx, is_rollback, OB_TRANS_STMT_TIMEOUT);
} else {
@ -2423,7 +2430,7 @@ int ObTransService::handle_sub_commit_timeout_(ObTxDesc &tx, const int64_t delay
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpect trans state", K(ret), K_(tx.state), K(tx));
} else if (tx.commit_expire_ts_ <= now) {
TRANS_LOG(WARN, "sub prepare timeout", K_(tx.commit_expire_ts), K(tx));
TRANS_LOG(WARN, "sub commit timeout", K_(tx.commit_expire_ts), K(tx));
const bool is_rollback = false;
ret = handle_sub_end_tx_result_(tx, is_rollback, OB_TRANS_STMT_TIMEOUT);
} else {
@ -2728,11 +2735,8 @@ int ObTransService::handle_sub_commit_result(const ObTransID &tx_id,
}
ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result);
}
tx_desc_mgr_.remove(*tx);
tx->lock_.unlock();
tx->execute_commit_cb();
// revert of add
tx_desc_mgr_.revert(*tx);
}
if (OB_NOT_NULL(tx)) {
tx_desc_mgr_.revert(*tx);
@ -2776,11 +2780,8 @@ int ObTransService::handle_sub_rollback_result(const ObTransID &tx_id,
}
ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result);
}
tx_desc_mgr_.remove(*tx);
tx->lock_.unlock();
tx->execute_commit_cb();
// revert of add
tx_desc_mgr_.revert(*tx);
}
if (OB_NOT_NULL(tx)) {
tx_desc_mgr_.revert(*tx);

View File

@ -47,7 +47,8 @@ int end_two_phase_tx(const ObTransID &tx_id,
const share::ObLSID &ls_id,
const int64_t timeout_us,
const bool is_rollback,
ObITxCallback &cb);
ObITxCallback &cb,
ObTxDesc *&tx_desc);
/*
* acquire transaction's coordinator

View File

@ -287,6 +287,7 @@ int ObTxCycleTwoPhaseCommitter::retransmit_upstream_msg_(const ObTxState state)
} else {
need_respond = false;
}
break;
}
case ObTxState::PREPARE: {
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP;

View File

@ -2498,11 +2498,16 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid,
} else if (OB_FAIL(check_trans_state_(is_rollback, request_id, false))) {
TRANS_LOG(WARN, "check trans state fail", K(ret), K(xid), K(is_rollback), K(timeout_us));
} else {
ObTxDesc *tx = NULL;
if (OB_FAIL(MTL(ObTransService*)->end_two_phase_tx(trans_id_, xid, coord, timeout_us,
is_rollback, end_trans_cb_))) {
is_rollback, end_trans_cb_, tx))) {
TRANS_LOG(WARN, "fail to end trans for two phase commit", K(ret), K(xid), K(coord),
K(is_rollback), K(timeout_us), K(*this));
} else if (OB_ISNULL(tx)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid trans descriptor", K(ret), K(xid));
} else {
tx_desc_ = tx;
request_id_ = request_id;
if (is_rollback) {
xa_trans_state_ = ObXATransState::ROLLBACKING;