From 5413e7bf2739bfa00d1bb687111af7424c1caa97 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 7 Feb 2024 17:43:30 +0000 Subject: [PATCH] [CP] fix: uncommitted txns may not be aborted after standby tenant switch over --- src/storage/tx/ob_trans_part_ctx.cpp | 17 +++++++++++++++-- src/storage/tx/ob_trans_part_ctx.h | 1 + src/storage/tx/ob_trans_service_v4.cpp | 12 +++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index fd957b708..53f784e33 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -7730,17 +7730,28 @@ int ObPartTransCtx::abort(const int reason) return ret; } +int ObPartTransCtx::handle_tx_keepalive_response(const int64_t status) +{ + int ret = OB_SUCCESS; + + if (OB_SUCCESS == lock_.try_lock()) { + CtxLockGuard guard(lock_, false); + ret = tx_keepalive_response_(status); + } + + return ret; +} int ObPartTransCtx::tx_keepalive_response_(const int64_t status) { int ret = OB_SUCCESS; - CtxLockGuard guard(lock_); if (OB_SWITCHING_TO_FOLLOWER_GRACEFULLY == ERRSIM_DELAY_TX_SUBMIT_LOG) { return ret; } - if ((OB_TRANS_CTX_NOT_EXIST == status || OB_TRANS_ROLLBACKED == status || OB_TRANS_KILLED == status) && can_be_recycled_()) { + if ((OB_TRANS_CTX_NOT_EXIST == status || OB_TRANS_ROLLBACKED == status || + OB_TRANS_KILLED == status || common::OB_TENANT_NOT_IN_SERVER == status) && can_be_recycled_()) { if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) { TRANS_LOG(WARN, "[TRANS GC] tx has quit, local tx will be aborted", K(status), KPC(this)); @@ -7917,6 +7928,8 @@ int ObPartTransCtx::do_local_abort_tx_() { int ret = OB_SUCCESS; + TRANS_LOG(WARN, "do_local_abort_tx_", KR(ret), K(*this)); + if (has_persisted_log_() || is_logging_()) { // part_trans_action_ = ObPartTransAction::ABORT; if (OB_FAIL(compensate_abort_log_())) { diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index b0b529cc5..4a1f2195a 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -837,6 +837,7 @@ public: bool is_logging_blocked(); bool is_xa_trans() const { return !exec_info_.xid_.empty(); } bool is_transfer_deleted() const { return transfer_deleted_; } + int handle_tx_keepalive_response(const int64_t status); private: int check_status_(); int tx_keepalive_response_(const int64_t status); diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index ad8bb4dfe..f8d35f4cd 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -944,7 +944,7 @@ int ObTransService::handle_trans_keepalive_response(const ObTxKeepaliveRespMsg & if (OB_FAIL(get_tx_ctx_(ls_id, tx_id, ctx))) { TRANS_LOG(WARN, "get tx ctx fail", K(tx_id), K(ls_id)); } else { - (void)ctx->tx_keepalive_response_(msg.status_); + (void)ctx->handle_tx_keepalive_response(msg.status_); } if (OB_NOT_NULL(ctx)) { revert_tx_ctx_(ctx); @@ -2377,6 +2377,16 @@ int ObTransService::handle_trans_msg_callback(const share::ObLSID &sender_ls_id, ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), K(tx_id), K(msg_type), K(status), K(receiver_addr), K(request_id)); + } else if (KEEPALIVE == msg_type && common::OB_TENANT_NOT_IN_SERVER == status) { + ObPartTransCtx *ctx = NULL; + if (OB_FAIL(get_tx_ctx_(sender_ls_id, tx_id, ctx))) { + TRANS_LOG(WARN, "get tx ctx fail", K(tx_id), K(sender_ls_id)); + } else { + (void)ctx->handle_tx_keepalive_response(status); + } + if (OB_NOT_NULL(ctx)) { + revert_tx_ctx_(ctx); + } } else if (common::OB_TENANT_NOT_IN_SERVER == status || common::OB_TRANS_RPC_TIMEOUT == status) { // upper layer do retry