[master][tx-route] fix check alive after session killed internally

This commit is contained in:
chinaxing 2023-02-13 07:12:35 +00:00 committed by ob-robot
parent 8ff89731d7
commit 41ff8d0ce0
9 changed files with 131 additions and 81 deletions

View File

@ -934,7 +934,8 @@ int ObTransService::tx_free_route_check_alive(ObTxnFreeRouteCtx &ctx, const ObTx
m.tx_id_ = tx.tx_id_;
m.sender_ = self_;
m.receiver_ = ctx.txn_addr_;
m.session_id_ = session_id;
m.req_sess_id_ = session_id;
m.tx_sess_id_ = tx.sess_id_;
ret = rpc_->post_msg(ctx.txn_addr_, m);
bool print_log = OB_FAIL(ret);
#ifndef NDEBUG
@ -949,33 +950,20 @@ int ObTransService::tx_free_route_check_alive(ObTxnFreeRouteCtx &ctx, const ObTx
return ret;
}
int ObTransService::tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg)
int ObTransService::tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg, const int retcode)
{
int ret = OB_SUCCESS;
ObTxDesc *tx = NULL;
if (OB_FAIL(tx_desc_mgr_.get(msg.tx_id_, tx))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_TRANS_CTX_NOT_EXIST;
}
} else if (OB_ISNULL(tx)) {
ret = OB_TRANS_CTX_NOT_EXIST;
} else if (tx->is_xa_trans() && tx->xa_start_addr_ != self_) {
ret = OB_TRANS_CTX_NOT_EXIST;
}
if (OB_NOT_NULL(tx)) {
tx_desc_mgr_.revert(*tx);
}
ObTxFreeRouteCheckAliveRespMsg m;
m.request_id_ = msg.request_id_;
m.receiver_ = msg.sender_;
m.sender_ = self_;
m.tx_id_ = msg.tx_id_;
m.session_id_ = msg.session_id_;
m.ret_ = ret;
m.req_sess_id_ = msg.req_sess_id_;
m.ret_ = retcode;
ret = rpc_->post_msg(m.receiver_, m);
#ifndef NDEBUG
TRANS_LOG(INFO, "[txn free route] handle check txn alive", K(ret), K_(msg.sender),
K_(msg.tx_id), K_(msg.session_id), K_(msg.request_id));
TRANS_LOG(INFO, "[txn free route] handle check txn alive", K(retcode), K_(msg.sender),
K_(msg.tx_id), K_(msg.req_sess_id), K_(msg.request_id));
#endif
return ret;
}

View File

@ -9,7 +9,7 @@ LST_DO(DEF_FREE_ROUTE_API, (;), static, dynamic, parts, extra)
int calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx);
int tx_free_route_check_alive(ObTxnFreeRouteCtx &ctx, const ObTxDesc &tx, const uint32_t session_id);
int tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg);
int tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg, const int retcode);
int tx_free_route_handle_push_state(const ObTxFreeRoutePushState &msg);
private:
int clean_txn_state_(ObTxDesc *&tx, const ObTransID &tx_id);

View File

@ -1,8 +1,8 @@
#include "ob_tx_free_route_msg.h"
namespace oceanbase {
namespace transaction {
OB_SERIALIZE_MEMBER(ObTxFreeRouteCheckAliveMsg, request_id_, session_id_, tx_id_, sender_, receiver_);
OB_SERIALIZE_MEMBER(ObTxFreeRouteCheckAliveRespMsg, request_id_, session_id_, tx_id_, sender_, receiver_, ret_);
OB_SERIALIZE_MEMBER(ObTxFreeRouteCheckAliveMsg, request_id_, req_sess_id_, tx_sess_id_, tx_id_, sender_, receiver_);
OB_SERIALIZE_MEMBER(ObTxFreeRouteCheckAliveRespMsg, request_id_, req_sess_id_, tx_id_, sender_, receiver_, ret_);
OB_DEF_SERIALIZE_SIZE(ObTxFreeRoutePushState)
{

View File

@ -62,15 +62,20 @@ struct ObTxFreeRouteCheckAliveMsg : ObTxFreeRouteMsg
{
ObTxFreeRouteCheckAliveMsg() : ObTxFreeRouteMsg(TX_FREE_ROUTE_CHECK_ALIVE) {}
int64_t request_id_;
uint32_t session_id_;
uint32_t req_sess_id_;
uint32_t tx_sess_id_;
ObTransID tx_id_;
ObAddr sender_;
ObAddr receiver_;
bool is_valid() const {
return request_id_ > 0 && session_id_ > 0 && tx_id_.is_valid()
&& sender_.is_valid() && receiver_.is_valid();
return request_id_ > 0
&& req_sess_id_ > 0
&& tx_sess_id_ > 0
&& tx_id_.is_valid()
&& sender_.is_valid()
&& receiver_.is_valid();
}
TO_STRING_KV(K_(type), K_(request_id), K_(session_id), K_(tx_id), K_(sender), K_(receiver));
TO_STRING_KV(K_(type), K_(request_id), K_(req_sess_id), K_(tx_sess_id), K_(tx_id), K_(sender), K_(receiver));
OB_UNIS_VERSION(1);
};
@ -78,16 +83,19 @@ struct ObTxFreeRouteCheckAliveRespMsg : ObTxFreeRouteMsg
{
ObTxFreeRouteCheckAliveRespMsg() : ObTxFreeRouteMsg(TX_FREE_ROUTE_CHECK_ALIVE_RESP) {}
int64_t request_id_;
uint32_t session_id_;
uint32_t req_sess_id_;
ObTransID tx_id_;
ObAddr sender_;
ObAddr receiver_;
int ret_;
bool is_valid() const {
return request_id_ > 0 && session_id_ > 0 && tx_id_.is_valid()
&& sender_.is_valid() && receiver_.is_valid();
return request_id_ > 0
&& req_sess_id_ > 0
&& tx_id_.is_valid()
&& sender_.is_valid()
&& receiver_.is_valid();
}
TO_STRING_KV(K_(type), K_(request_id), K_(session_id), K_(tx_id), K_(ret), K_(sender), K_(receiver));
TO_STRING_KV(K_(type), K_(request_id), K_(req_sess_id), K_(tx_id), K_(ret), K_(sender), K_(receiver));
OB_UNIS_VERSION(1);
};

View File

@ -10,25 +10,49 @@
namespace oceanbase {
namespace obrpc {
/*
* check txn is alive on the session
* skip the situation session was killed or un-exist
*/
int ObTxFreeRouteCheckAliveP::process()
{
int ret = OB_SUCCESS;
// get txn by tx_id
// if txn not exist return with OB_TRANS_CTX_NOT_EXIST
transaction::ObTransID sess_tx_id;
{
sql::ObSQLSessionInfo *session = NULL;
auto session_mgr = GCTX.session_mgr_;
if (OB_FAIL(session_mgr->get_session(arg_.tx_sess_id_, session))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SESSION_NOT_FOUND;
} else {
TRANS_LOG(WARN, "get session fail", K(ret), K_(arg_.tx_sess_id));
}
} else if (OB_FAIL(session->get_query_lock().trylock())) {
TRANS_LOG(WARN, "try session query_lock failed", K(ret), K_(arg_.tx_sess_id));
} else {
if (session->get_session_state() == sql::ObSQLSessionState::SESSION_KILLED) {
ret = OB_SESSION_KILLED;
} else {
sess_tx_id = session->get_tx_id();
ret = sess_tx_id == arg_.tx_id_ ? OB_SUCCESS : OB_TRANS_CTX_NOT_EXIST;
}
session->get_query_lock().unlock();
}
}
auto txs = MTL(transaction::ObTransService*);
if (OB_ISNULL(txs)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "can not get trans service", KR(ret));
} else if (OB_FAIL(txs->tx_free_route_handle_check_alive(arg_))) {
if (OB_TRANS_CTX_NOT_EXIST != ret) {
TRANS_LOG(WARN, "[tx free route] handle check alive fail", K(ret), K(arg_));
}
} else if (OB_FAIL(txs->tx_free_route_handle_check_alive(arg_, ret))) {
TRANS_LOG(WARN, "[tx free route] handle check alive fail", K(ret), K(arg_));
}
return ret;
}
int ObTxFreeRouteCheckAliveRespP::process()
{
// if ret is OB_SESSION_NOT_FOUND || OB_SESSION_KILLED
// kill_session
// if ret is OB_TRANS_CTX_NOT_EXIST
// 1. get session by session_id
// 2. lock session
@ -37,54 +61,73 @@ int ObTxFreeRouteCheckAliveRespP::process()
// 5. unlock session
// 6. revert session
int ret = OB_SUCCESS;
if (arg_.ret_ == OB_TRANS_CTX_NOT_EXIST) {
if (OB_SESSION_NOT_FOUND == arg_.ret_ || OB_SESSION_KILLED == arg_.ret_) {
TRANS_LOG(INFO, "[txn free route] txn start session has quit, close current session", "respMsg", arg_);
ret = kill_session_();
} else if (OB_TRANS_CTX_NOT_EXIST == arg_.ret_) {
TRANS_LOG(INFO, "[txn free route] txn has quit on start node, release tx on current node", "respMsg", arg_);
auto session_id = arg_.session_id_;
sql::ObSQLSessionInfo *session = NULL;
auto session_mgr = GCTX.session_mgr_;
if (OB_FAIL(session_mgr->get_session(session_id, session))) {
TRANS_LOG(WARN, "get session fail", K(ret), K(session_id));
} else if (session->get_is_in_retry()) {
// skip quit txn if session in Query Retry
} else {
{
sql::ObSQLSessionInfo::LockGuard query_lock_guard(session->get_query_lock());
sql::ObSQLSessionInfo::LockGuard data_lock_guard(session->get_thread_data_lock());
auto &ctx = session->get_txn_free_route_ctx();
auto &tx_desc = session->get_tx_desc();
if (ctx.version() != arg_.request_id_) {
TRANS_LOG(INFO, "skip handle checkAliveResp, staled", K(arg_), K(ctx.version()));
} else if (OB_NOT_NULL(tx_desc) && tx_desc->get_tx_id() == arg_.tx_id_) {
// mark idle release, if an Query has release query_lock but not send txn state Packet yet,
// it can sens the txn was released in plan (not a surprise)
ctx.set_idle_released();
// for XA txn temporary session on the XA orig node, the TxDesc is shared
// between XA'Ctx and the temporary session, so release ref is required
if (tx_desc->is_xa_trans() && tx_desc->get_addr() == GCONF.self_addr_) {
MTL_SWITCH(session->get_effective_tenant_id()) {
ret = MTL(transaction::ObTransService*)->release_tx_ref(*tx_desc);
}
tx_desc = NULL;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sql::ObSqlTransControl::reset_session_tx_state(session, false))) {
TRANS_LOG(WARN, "[txn free route] release session tx failed", K(ret), KPC(session));
} else if (OB_NOT_NULL(tx_desc)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "txn not released", KR(ret));
} else {
TRANS_LOG(INFO, "[txn free route] release tx success");
}
}
}
session_mgr->revert_session(session);
}
ret = release_session_tx_();
} else if (OB_SUCCESS != arg_.ret_) {
TRANS_LOG(WARN, "[txn free route] check txn alive fail", "respMsg", arg_);
}
return ret;
}
int ObTxFreeRouteCheckAliveRespP::kill_session_()
{
int ret = OB_SUCCESS;
sql::ObSessionGetterGuard guard(*GCTX.session_mgr_, arg_.req_sess_id_);
sql::ObSQLSessionInfo *session = NULL;
if (OB_SUCC(guard.get_session(session))) {
if (session->get_is_in_retry()) {
} else if (OB_FAIL(GCTX.session_mgr_->kill_session(*session))) {
TRANS_LOG(WARN, "kill session fail", K(ret));
}
}
return ret;
}
/*
 * Release the txn state held by the requesting session (arg_.req_sess_id_)
 * after the check-alive response reported the txn as gone.
 *
 * Nothing is released (and OB_SUCCESS is returned) when:
 *  - the session is in query retry,
 *  - the response is stale (ctx.version() != arg_.request_id_), or
 *  - the session has no tx_desc or its tx_id differs from arg_.tx_id_.
 *
 * Returns the error of the session lookup, of release_tx_ref (XA case), or of
 * reset_session_tx_state; OB_ERR_UNEXPECTED if tx_desc is still non-NULL
 * after the reset.
 */
int ObTxFreeRouteCheckAliveRespP::release_session_tx_()
{
  int ret = OB_SUCCESS;
  // NOTE(review): guard presumably gives the session reference back when it
  // goes out of scope - confirm against ObSessionGetterGuard
  sql::ObSessionGetterGuard guard(*GCTX.session_mgr_, arg_.req_sess_id_);
  sql::ObSQLSessionInfo *session = NULL;
  if (OB_FAIL(guard.get_session(session))) {
    // lookup failed: the error is returned without logging
  } else if (session->get_is_in_retry()) {
    // skip quit txn if session in Query Retry
  } else {
    // query lock is taken first, then the thread-data lock; both guards live
    // until the end of this else-branch
    sql::ObSQLSessionInfo::LockGuard query_lock_guard(session->get_query_lock());
    sql::ObSQLSessionInfo::LockGuard data_lock_guard(session->get_thread_data_lock());
    auto &ctx = session->get_txn_free_route_ctx();
    // bound by reference: the assignment `tx_desc = NULL` below writes through
    // it (presumably into the session's own pointer - TODO confirm)
    auto &tx_desc = session->get_tx_desc();
    if (ctx.version() != arg_.request_id_) {
      // the response answers an older request than the session's current one
      TRANS_LOG(INFO, "skip handle checkAliveResp, staled", K(arg_), K(ctx.version()));
    } else if (OB_NOT_NULL(tx_desc) && tx_desc->get_tx_id() == arg_.tx_id_) {
      // mark idle release: if a query has released query_lock but has not sent the
      // txn state packet yet, it can sense that the txn was released on purpose
      // (not a surprise)
      ctx.set_idle_released();
      // for an XA txn temporary session on the XA origin node, the TxDesc is shared
      // between the XA ctx and the temporary session, so releasing the ref is required
      if (tx_desc->is_xa_trans() && tx_desc->get_addr() == GCONF.self_addr_) {
        MTL_SWITCH(session->get_effective_tenant_id()) {
          ret = MTL(transaction::ObTransService*)->release_tx_ref(*tx_desc);
        }
        // cleared regardless of whether release_tx_ref succeeded
        tx_desc = NULL;
      }
      if (OB_FAIL(ret)) {
        // releasing the XA ref failed: skip the reset and return that error
      } else if (OB_FAIL(sql::ObSqlTransControl::reset_session_tx_state(session, false))) {
        TRANS_LOG(WARN, "[txn free route] release session tx failed", K(ret), KPC(session));
      } else if (OB_NOT_NULL(tx_desc)) {
        // the reset succeeded but tx_desc is still set: treated as a bug
        ret = OB_ERR_UNEXPECTED;
        TRANS_LOG(ERROR, "txn not released", KR(ret));
      } else {
        TRANS_LOG(INFO, "[txn free route] release tx success");
      }
    }
  }
  return ret;
}
int ObTxFreeRoutePushStateP::process()
{
int ret = OB_SUCCESS;

View File

@ -18,6 +18,8 @@ public:
protected:
int process();
private:
int kill_session_();
int release_session_tx_();
DISALLOW_COPY_AND_ASSIGN(ObTxFreeRouteCheckAliveRespP);
};

View File

@ -998,7 +998,7 @@ int ObXAService::xa_start(const ObXATransID &xid,
}
} else if (ObXAFlag::is_tmjoin(flags) || ObXAFlag::is_tmresume(flags)) {
// } else if (ObXAFlag::contain_tmjoin(flags) || ObXAFlag::contain_tmresume(flags)) {
if (OB_FAIL(xa_start_join_(xid, flags, timeout_seconds, tx_desc))) {
if (OB_FAIL(xa_start_join_(xid, flags, timeout_seconds, session_id, tx_desc))) {
TRANS_LOG(WARN, "xa start join failed", K(ret), K(flags), K(xid), K(tx_desc));
}
} else {
@ -1223,6 +1223,10 @@ int ObXAService::xa_start_(const ObXATransID &xid,
}
}
}
// xa_start on new session, adjust tx_desc.sess_id_
if (OB_SUCC(ret)) {
tx_desc->set_sessid(session_id);
}
}
}
@ -1232,6 +1236,7 @@ int ObXAService::xa_start_(const ObXATransID &xid,
int ObXAService::xa_start_join_(const ObXATransID &xid,
const int64_t flags,
const int64_t timeout_seconds,
const uint32_t session_id,
ObTxDesc *&tx_desc)
{
int ret = OB_SUCCESS;
@ -1323,7 +1328,10 @@ int ObXAService::xa_start_join_(const ObXATransID &xid,
}
}
}
// xa_join/resume on new session, adjust tx_desc.sess_id_
if (OB_SUCC(ret)) {
tx_desc->set_sessid(session_id);
}
return ret;
}
@ -1762,7 +1770,7 @@ int ObXAService::one_phase_xa_rollback_(const ObXATransID &xid,
if (OB_FAIL(xa_ctx->one_phase_end_trans(xid, true/*is_rollback*/, timeout_us, request_id))) {
TRANS_LOG(WARN, "one phase xa rollback failed", K(ret), K(tx_id));
} else if (OB_FAIL(xa_ctx->wait_one_phase_end_trans(true/*is_rollback*/, timeout_us))) {
TRANS_LOG(WARN, "fail to wait one phase xa end trans", K(ret), K(xid), K(trans_id));
TRANS_LOG(WARN, "fail to wait one phase xa end trans", K(ret), K(xid), K(tx_id));
}
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}

View File

@ -206,6 +206,7 @@ private:
int xa_start_join_(const ObXATransID &xid,
const int64_t flags,
const int64_t timeout_seconds,
const uint32_t session_id,
ObTxDesc *&tx_desc);
int one_phase_xa_commit_(const ObXATransID &xid,
const int64_t timeout_us,

View File

@ -406,7 +406,7 @@ int ObTxNode::handle_msg_(MsgPack *pkt)
ObTxFreeRouteCheckAliveMsg msg;
OZ(msg.deserialize(buf, size, pos));
TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
OZ(txs_.tx_free_route_handle_check_alive(msg));
OZ(txs_.tx_free_route_handle_check_alive(msg, OB_TRANS_CTX_NOT_EXIST));
break;
}
case TX_FREE_ROUTE_CHECK_ALIVE_RESP: