[master] reset session txDesc when receive new txstate
This commit is contained in:
@ -196,6 +196,7 @@ TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this));
|
|||||||
private:
|
private:
|
||||||
int check_ls_status_(const share::ObLSID &ls_id, bool &leader);
|
int check_ls_status_(const share::ObLSID &ls_id, bool &leader);
|
||||||
int init_tx_(ObTxDesc &tx, const uint32_t session_id);
|
int init_tx_(ObTxDesc &tx, const uint32_t session_id);
|
||||||
|
int reinit_tx_(ObTxDesc &tx, const uint32_t session_id);
|
||||||
int start_tx_(ObTxDesc &tx);
|
int start_tx_(ObTxDesc &tx);
|
||||||
int abort_tx_(ObTxDesc &tx, const int cause, bool cleanup = true);
|
int abort_tx_(ObTxDesc &tx, const int cause, bool cleanup = true);
|
||||||
void abort_tx__(ObTxDesc &tx, const bool cleanup);
|
void abort_tx__(ObTxDesc &tx, const bool cleanup);
|
||||||
|
@ -148,10 +148,9 @@ int ObTransService::release_tx(ObTxDesc &tx, const bool is_from_xa)
|
|||||||
MTL_SWITCH(tx.tenant_id_) {
|
MTL_SWITCH(tx.tenant_id_) {
|
||||||
return MTL(ObTransService*)->release_tx(tx);
|
return MTL(ObTransService*)->release_tx(tx);
|
||||||
}
|
}
|
||||||
// FIXME: open check later
|
} else if (NULL != tx.get_xa_ctx() && !is_from_xa) {
|
||||||
// } else if (NULL != tx.get_xa_ctx() && !is_from_xa) {
|
ret = OB_ERR_UNEXPECTED;
|
||||||
// ret = OB_ERR_UNEXPECTED;
|
TRANS_LOG(ERROR, "unexpected case", K(ret), K(is_from_xa), K(tx));
|
||||||
// TRANS_LOG(ERROR, "unexpected case", K(ret), K(is_from_xa), K(tx));
|
|
||||||
} else {
|
} else {
|
||||||
ObTransTraceLog &tlog = tx.get_tlog();
|
ObTransTraceLog &tlog = tx.get_tlog();
|
||||||
REC_TRANS_TRACE_EXT(&tlog, release, OB_Y(ret),
|
REC_TRANS_TRACE_EXT(&tlog, release, OB_Y(ret),
|
||||||
@ -212,9 +211,7 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
// it is safe to operate tx without lock when not shared
|
// it is safe to operate tx without lock when not shared
|
||||||
uint32_t session_id = tx.sess_id_;
|
ret = reinit_tx_(tx, tx.sess_id_);
|
||||||
tx.reset();
|
|
||||||
ret = init_tx_(tx, session_id);
|
|
||||||
}
|
}
|
||||||
TRANS_LOG(TRACE, "reuse tx", K(ret), K(orig_tx_id), K(tx));
|
TRANS_LOG(TRACE, "reuse tx", K(ret), K(orig_tx_id), K(tx));
|
||||||
ObTransTraceLog &tlog = tx.get_tlog();
|
ObTransTraceLog &tlog = tx.get_tlog();
|
||||||
@ -228,6 +225,12 @@ int ObTransService::reuse_tx(ObTxDesc &tx)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTransService::reinit_tx_(ObTxDesc &tx, const uint32_t session_id)
|
||||||
|
{
|
||||||
|
tx.reset();
|
||||||
|
return init_tx_(tx, session_id);
|
||||||
|
}
|
||||||
|
|
||||||
int ObTransService::stop_tx(ObTxDesc &tx)
|
int ObTransService::stop_tx(ObTxDesc &tx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
@ -431,7 +431,11 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
|||||||
} else if (tx->tx_id_ != header.tx_id_) {
|
} else if (tx->tx_id_ != header.tx_id_) {
|
||||||
// replace
|
// replace
|
||||||
audit_record.replace_tx_ = true;
|
audit_record.replace_tx_ = true;
|
||||||
tx_desc_mgr_.remove(*tx);
|
if (!tx->flags_.SHADOW_) {
|
||||||
|
tx_desc_mgr_.remove(*tx);
|
||||||
|
}
|
||||||
|
// reset tx to cleanup for new txn
|
||||||
|
reinit_tx_(*tx, session_id);
|
||||||
need_add_tx = true;
|
need_add_tx = true;
|
||||||
} else {
|
} else {
|
||||||
// update
|
// update
|
||||||
|
Reference in New Issue
Block a user