[tx-route] fix receiving extra state duplicated on closing session
This commit is contained in:
@ -337,7 +337,11 @@ int ObTransService::txn_free_route__kill_session_(const uint32_t session_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx)
|
||||
int ObTransService::txn_free_route__handle_tx_exist_(
|
||||
const ObTransID &tx_id,
|
||||
ObTxnFreeRouteAuditRecord &audit_record,
|
||||
ObTxDesc *&tx,
|
||||
const bool is_update_extra)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxDesc *tmp_tx = NULL;
|
||||
@ -353,7 +357,13 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT
|
||||
uint32_t assoc_sess_id = tmp_tx->assoc_sess_id_;
|
||||
FLOG_WARN("tx found associate with other session, will kill the session",
|
||||
K(assoc_sess_id), K(tx_id));
|
||||
if (OB_FAIL(txn_free_route__kill_session_(assoc_sess_id))) {
|
||||
if (is_update_extra && OB_UNLIKELY(tmp_tx->is_tx_active() || !tmp_tx->is_clean())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
ObSpinLockGuard guard(tmp_tx->lock_);
|
||||
TRANS_LOG(WARN, "update extra exit trans should not be active",
|
||||
K(ret), K(assoc_sess_id), KPC(tmp_tx));
|
||||
tx_desc_mgr_.revert(*tmp_tx);
|
||||
} else if (OB_FAIL(txn_free_route__kill_session_(assoc_sess_id))) {
|
||||
TRANS_LOG(WARN, "kill old session failed", K(ret), K(assoc_sess_id));
|
||||
} else if (OB_FAIL(release_tx(*tmp_tx))) {
|
||||
TRANS_LOG(WARN, "release tx failed", K(ret), K(assoc_sess_id), K(tx_id));
|
||||
@ -368,6 +378,9 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT
|
||||
tx_desc_mgr_.revert(*tmp_tx);
|
||||
}
|
||||
}
|
||||
} else if (is_update_extra) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "XA-tx found when update extra state", K(ret), K_(self), K(tx_id), K_(tmp_tx->addr));
|
||||
} else if (tmp_tx->addr_ != self_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "XA-tx found but not on the orignal", K(ret), K_(self), K(tx_id), K_(tmp_tx->addr));
|
||||
@ -416,7 +429,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
|
||||
} else {
|
||||
if (OB_ISNULL(tx)) {
|
||||
if (OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx))) {
|
||||
if (OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx, false))) {
|
||||
TRANS_LOG(WARN, "handle tx exist fail", K(ret));
|
||||
} else if (OB_ISNULL(tx)) {
|
||||
audit_record.alloc_tx_ = true;
|
||||
@ -661,6 +674,9 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
} else if (header.flag_.is_fallback()) {
|
||||
audit_record.upd_fallback_ = true;
|
||||
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
|
||||
} else if (OB_ISNULL(tx) && // kill other session assoc `tx_id`
|
||||
OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx, true))) {
|
||||
TRANS_LOG(WARN, "handle tx exist fail", K(ret), K(header.tx_id_));
|
||||
} else {
|
||||
bool add_tx = OB_ISNULL(tx);
|
||||
bool replace_tx = OB_NOT_NULL(tx) && tx->tx_id_ != header.tx_id_;
|
||||
@ -694,23 +710,22 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
TRANS_LOG(WARN, "add tx to mgr fail", K(ret));
|
||||
} else if (FALSE_IT(tx->flags_.REPLICA_ = tx->addr_ != self_)) {
|
||||
}
|
||||
if (OB_FAIL(ret) && add_tx) {
|
||||
release_tx(*tx);
|
||||
tx = NULL;
|
||||
} else {
|
||||
int64_t elapsed_us = ObTimeUtility::current_time() - start_ts;
|
||||
ObTransTraceLog &tlog = tx->get_tlog();
|
||||
REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_extra, OB_Y(ret),
|
||||
OB_ID(txid), header.tx_id_.get_id(),
|
||||
OB_ID(from), before_tx_id.get_id(),
|
||||
OB_ID(time_used), elapsed_us,
|
||||
OB_ID(logic_clock), logic_clock,
|
||||
OB_ID(length), len,
|
||||
OB_ID(tag1), add_tx,
|
||||
OB_ID(tag2), replace_tx,
|
||||
OB_ID(ref), tx->get_ref(),
|
||||
OB_ID(thread_id), GETTID());
|
||||
}
|
||||
int64_t elapsed_us = ObTimeUtility::current_time() - start_ts;
|
||||
ObTransTraceLog &tlog = tx->get_tlog();
|
||||
REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_extra, OB_Y(ret),
|
||||
OB_ID(txid), header.tx_id_.get_id(),
|
||||
OB_ID(from), before_tx_id.get_id(),
|
||||
OB_ID(time_used), elapsed_us,
|
||||
OB_ID(logic_clock), logic_clock,
|
||||
OB_ID(length), len,
|
||||
OB_ID(tag1), add_tx,
|
||||
OB_ID(tag2), replace_tx,
|
||||
OB_ID(ref), tx->get_ref(),
|
||||
OB_ID(thread_id), GETTID());
|
||||
}
|
||||
if (OB_FAIL(ret) && add_tx && tx) {
|
||||
release_tx(*tx);
|
||||
tx = NULL;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
||||
@ -33,5 +33,5 @@ static int update_logic_clock_(const int64_t logic_clock, const ObTxDesc *tx, co
|
||||
bool need_fallback_(ObTxDesc &tx, int64_t &state_size);
|
||||
int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr);
|
||||
int txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx);
|
||||
int txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx);
|
||||
int txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObTxnFreeRouteAuditRecord &audit_record, ObTxDesc *&tx, const bool is_update_extra);
|
||||
int txn_free_route__kill_session_(const uint32_t session_id);
|
||||
|
||||
Reference in New Issue
Block a user