diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 970c5b7bea..992b810922 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -337,7 +337,11 @@ int ObTransService::txn_free_route__kill_session_(const uint32_t session_id) return ret; } -int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx) +int ObTransService::txn_free_route__handle_tx_exist_( + const ObTransID &tx_id, + ObTxnFreeRouteAuditRecord &audit_record, + ObTxDesc *&tx, + const bool is_update_extra) { int ret = OB_SUCCESS; ObTxDesc *tmp_tx = NULL; @@ -353,7 +357,13 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT uint32_t assoc_sess_id = tmp_tx->assoc_sess_id_; FLOG_WARN("tx found associate with other session, will kill the session", K(assoc_sess_id), K(tx_id)); - if (OB_FAIL(txn_free_route__kill_session_(assoc_sess_id))) { + if (is_update_extra && OB_UNLIKELY(tmp_tx->is_tx_active() || !tmp_tx->is_clean())) { + ret = OB_ERR_UNEXPECTED; + ObSpinLockGuard guard(tmp_tx->lock_); + TRANS_LOG(WARN, "update extra exit trans should not be active", + K(ret), K(assoc_sess_id), KPC(tmp_tx)); + tx_desc_mgr_.revert(*tmp_tx); + } else if (OB_FAIL(txn_free_route__kill_session_(assoc_sess_id))) { TRANS_LOG(WARN, "kill old session failed", K(ret), K(assoc_sess_id)); } else if (OB_FAIL(release_tx(*tmp_tx))) { TRANS_LOG(WARN, "release tx failed", K(ret), K(assoc_sess_id), K(tx_id)); @@ -368,6 +378,9 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT tx_desc_mgr_.revert(*tmp_tx); } } + } else if (is_update_extra) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "XA-tx found when update extra state", K(ret), K_(self), K(tx_id), K_(tmp_tx->addr)); } else if (tmp_tx->addr_ != self_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "XA-tx found but not on the orignal", K(ret), K_(self), K(tx_id), K_(tmp_tx->addr)); @@ -416,7 +429,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i ret = txn_free_route__sanity_check_fallback_(tx, ctx); } else { if (OB_ISNULL(tx)) { - if (OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx))) { + if (OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx, false))) { TRANS_LOG(WARN, "handle tx exist fail", K(ret)); } else if (OB_ISNULL(tx)) { audit_record.alloc_tx_ = true; @@ -661,6 +674,9 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id } else if (header.flag_.is_fallback()) { audit_record.upd_fallback_ = true; ret = txn_free_route__sanity_check_fallback_(tx, ctx); + } else if (OB_ISNULL(tx) && // kill other session assoc `tx_id` + OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx, true))) { + TRANS_LOG(WARN, "handle tx exist fail", K(ret), K(header.tx_id_)); } else { bool add_tx = OB_ISNULL(tx); bool replace_tx = OB_NOT_NULL(tx) && tx->tx_id_ != header.tx_id_; @@ -694,23 +710,22 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id TRANS_LOG(WARN, "add tx to mgr fail", K(ret)); } else if (FALSE_IT(tx->flags_.REPLICA_ = tx->addr_ != self_)) { } - if (OB_FAIL(ret) && add_tx) { - release_tx(*tx); - tx = NULL; - } else { - int64_t elapsed_us = ObTimeUtility::current_time() - start_ts; - ObTransTraceLog &tlog = tx->get_tlog(); - REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_extra, OB_Y(ret), - OB_ID(txid), header.tx_id_.get_id(), - OB_ID(from), before_tx_id.get_id(), - OB_ID(time_used), elapsed_us, - OB_ID(logic_clock), logic_clock, - OB_ID(length), len, - OB_ID(tag1), add_tx, - OB_ID(tag2), replace_tx, - OB_ID(ref), tx->get_ref(), - OB_ID(thread_id), GETTID()); - } + int64_t elapsed_us = ObTimeUtility::current_time() - start_ts; + ObTransTraceLog &tlog = tx->get_tlog(); + REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_extra, OB_Y(ret), + OB_ID(txid), header.tx_id_.get_id(), + OB_ID(from), before_tx_id.get_id(), + OB_ID(time_used), elapsed_us, + OB_ID(logic_clock), logic_clock, + OB_ID(length), len, + OB_ID(tag1), add_tx, + OB_ID(tag2), replace_tx, + OB_ID(ref), tx->get_ref(), + OB_ID(thread_id), GETTID()); + } + if (OB_FAIL(ret) && add_tx && tx) { + release_tx(*tx); + tx = NULL; } } if (OB_SUCC(ret)) { diff --git a/src/storage/tx/ob_tx_free_route_api.h b/src/storage/tx/ob_tx_free_route_api.h index ded334c87a..19e6e156f6 100644 --- a/src/storage/tx/ob_tx_free_route_api.h +++ b/src/storage/tx/ob_tx_free_route_api.h @@ -33,5 +33,5 @@ static int update_logic_clock_(const int64_t logic_clock, const ObTxDesc *tx, co bool need_fallback_(ObTxDesc &tx, int64_t &state_size); int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr); int txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx); -int txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx); +int txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx, const bool is_update_extra); int txn_free_route__kill_session_(const uint32_t session_id);