[CP] fix: xa_ctx may be freed in remote_xa_prepare retry process
This commit is contained in:
@ -2907,6 +2907,7 @@ int ObXACtx::wait_xa_prepare(const ObXATransID &xid, const int64_t timeout_us)
|
||||
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
|
||||
if (OB_SUCC(ret) || OB_ERR_READ_ONLY_TRANSACTION == ret) {
|
||||
xa_trans_state_ = ObXATransState::PREPARED;
|
||||
MTL(ObXAService*)->get_xa_cache().insert_prepare_cache_item(xid, ObXATransState::PREPARED);
|
||||
}
|
||||
|
||||
if (OB_LIKELY(!is_exiting_)) {
|
||||
@ -2941,8 +2942,10 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid,
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(*this));
|
||||
} else if (OB_FAIL(check_trans_state_(is_rollback, request_id, false))) {
|
||||
if (!((is_rollback && OB_TRANS_ROLLBACKED == ret)
|
||||
|| (!is_rollback && OB_TRANS_COMMITED == ret))) {
|
||||
if (is_rollback && ObXATransState::ROLLBACKING == xa_trans_state_) {
|
||||
ret = OB_SUCCESS;
|
||||
} else if (!((is_rollback && OB_TRANS_ROLLBACKED == ret)
|
||||
|| (!is_rollback && OB_TRANS_COMMITED == ret))) {
|
||||
TRANS_LOG(WARN, "check trans state fail", K(ret), K(xid), K(is_rollback), K(timeout_us));
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -119,6 +119,8 @@ ObXATransID &ObXATransID::operator=(const ObXATransID &xid)
|
||||
bqual_str_.assign_buffer(bqual_buf_, sizeof(bqual_buf_));
|
||||
gtrid_str_.write(xid.gtrid_str_.ptr(), xid.gtrid_str_.length());
|
||||
bqual_str_.write(xid.bqual_str_.ptr(), xid.bqual_str_.length());
|
||||
g_hv_ = xid.g_hv_;
|
||||
b_hv_ = xid.b_hv_;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -181,6 +181,13 @@ public:
|
||||
int64_t get_format_id() const { return format_id_; }
|
||||
uint64_t get_gtrid_hash() const { return g_hv_; }
|
||||
uint64_t get_bqual_hash() const { return b_hv_; }
|
||||
uint64_t get_hash() const {
|
||||
if (0 == g_hv_ || 0 == b_hv_) {
|
||||
g_hv_ = murmurhash(gtrid_str_.ptr(), gtrid_str_.length(), 0) % HASH_SIZE;
|
||||
b_hv_ = murmurhash(bqual_str_.ptr(), bqual_str_.length(), 0) % HASH_SIZE;
|
||||
}
|
||||
return (g_hv_ + b_hv_) / 11;
|
||||
}
|
||||
bool empty() const;
|
||||
// empty xid is also valid
|
||||
bool is_valid() const;
|
||||
@ -206,8 +213,8 @@ private:
|
||||
char bqual_buf_[MAX_BQUAL_LENGTH];
|
||||
common::ObString bqual_str_;
|
||||
int64_t format_id_;
|
||||
uint64_t g_hv_;
|
||||
uint64_t b_hv_;
|
||||
mutable uint64_t g_hv_;
|
||||
mutable uint64_t b_hv_;
|
||||
};
|
||||
|
||||
struct ObXABranchInfo
|
||||
|
||||
@ -1795,6 +1795,7 @@ int ObXAService::two_phase_xa_rollback_(const ObXATransID &xid,
|
||||
}
|
||||
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
|
||||
}
|
||||
xa_cache_.clean_prepare_cache_item(xid);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -2232,9 +2233,14 @@ int ObXAService::local_xa_prepare_(const ObXATransID &xid,
|
||||
int ret = OB_SUCCESS;
|
||||
bool alloc = false;
|
||||
ObXACtx *xa_ctx = NULL;
|
||||
|
||||
if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(tx_id, alloc, xa_ctx))) {
|
||||
TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(xid), K(tx_id), KP(xa_ctx));
|
||||
int64_t xa_state = ObXATransState::UNKNOWN;
|
||||
if (OB_SUCCESS == xa_cache_.query_prepare_cache_item(xid, xa_state) && ObXATransState::PREPARED == xa_state) {
|
||||
TRANS_LOG(INFO, "xa_cache hit", K(xid), K(tx_id));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(xid), K(tx_id), KP(xa_ctx));
|
||||
}
|
||||
} else if (OB_ISNULL(xa_ctx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "xa ctx is null", K(ret), K(xid), K(tx_id));
|
||||
@ -2850,6 +2856,8 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
|
||||
has_tx_level_temp_table = ObXAFlag::contain_temp_table(end_flag);
|
||||
}
|
||||
|
||||
xa_cache_.clean_prepare_cache_item(xid);
|
||||
|
||||
TRANS_LOG(INFO, "two phase xa commit", K(ret), K(xid), K(tx_id), K(coordinator));
|
||||
return ret;
|
||||
}
|
||||
@ -2873,6 +2881,48 @@ void ObXAService::clear_xa_branch(const ObXATransID &xid, ObTxDesc *&tx_desc)
|
||||
TRANS_LOG(INFO, "clear xa branch", K(xid), K(tx_id));
|
||||
}
|
||||
|
||||
// XACache
|
||||
int ObXACache::query_prepare_cache_item(const ObXATransID &xid, int64_t &state)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int idx = xid.get_hash() % XA_PREPARE_CACHE_COUNT;
|
||||
ObXACacheItem &item = xa_prepare_cache_[idx];
|
||||
ObSpinLockGuard guard(item.lock_);
|
||||
if (item.is_valid_to_query(xid)) {
|
||||
state = item.state_;
|
||||
} else {
|
||||
ret = OB_HASH_NOT_EXIST;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObXACache::insert_prepare_cache_item(const ObXATransID &xid, int64_t state)
|
||||
{
|
||||
int idx = xid.get_hash() % XA_PREPARE_CACHE_COUNT;
|
||||
ObXACacheItem &item = xa_prepare_cache_[idx];
|
||||
ObSpinLockGuard guard(item.lock_);
|
||||
if (item.is_valid_to_set()) {
|
||||
clean_prepare_cache_item_(item);
|
||||
item.state_ = state;
|
||||
item.xid_ = xid;
|
||||
item.create_timestamp_ = ObTimeUtility::current_time();
|
||||
}
|
||||
}
|
||||
|
||||
void ObXACache::clean_prepare_cache_item(const ObXATransID &xid) {
|
||||
int idx = xid.get_hash() % XA_PREPARE_CACHE_COUNT;
|
||||
ObXACacheItem &item = xa_prepare_cache_[idx];
|
||||
ObSpinLockGuard guard(item.lock_);
|
||||
if (item.is_valid_to_query(xid)) {
|
||||
clean_prepare_cache_item_(item);
|
||||
}
|
||||
}
|
||||
|
||||
void ObXACache::clean_prepare_cache_item_(ObXACacheItem &item) {
|
||||
// should get lock in upper layer
|
||||
item.reset();
|
||||
}
|
||||
|
||||
}//transaction
|
||||
|
||||
|
||||
|
||||
@ -45,6 +45,49 @@ class ObTransService;
|
||||
class ObXATransID;
|
||||
class ObStmtParam;
|
||||
|
||||
struct ObXACacheItem {
|
||||
public:
|
||||
static const int64_t XA_CACHE_ITEM_EXPIRE_TIME = 2000000; // 2s
|
||||
ObXACacheItem(): state_(ObXATransState::UNKNOWN), create_timestamp_(0) {}
|
||||
~ObXACacheItem() { reset(); }
|
||||
void reset()
|
||||
{
|
||||
state_ = ObXATransState::UNKNOWN;
|
||||
create_timestamp_ = 0;
|
||||
xid_.reset();
|
||||
}
|
||||
|
||||
bool is_valid_to_query(ObXATransID xid)
|
||||
{
|
||||
return this->xid_.all_equal_to(xid);
|
||||
}
|
||||
|
||||
bool is_valid_to_set()
|
||||
{
|
||||
return 0 == this->create_timestamp_ ||
|
||||
this->create_timestamp_ + XA_CACHE_ITEM_EXPIRE_TIME < ObTimeUtility::current_time();
|
||||
}
|
||||
|
||||
public:
|
||||
ObSpinLock lock_;
|
||||
ObXATransID xid_;
|
||||
int64_t state_;
|
||||
int64_t create_timestamp_;
|
||||
TO_STRING_KV(K(xid_), K(state_), K(create_timestamp_));
|
||||
};
|
||||
|
||||
class ObXACache {
|
||||
public:
|
||||
static const uint64_t XA_PREPARE_CACHE_COUNT = 8000;
|
||||
int query_prepare_cache_item(const ObXATransID &xid, int64_t &state);
|
||||
void insert_prepare_cache_item(const ObXATransID &xid, int64_t state);
|
||||
void clean_prepare_cache_item(const ObXATransID &xid);
|
||||
private:
|
||||
void clean_prepare_cache_item_(ObXACacheItem &item);
|
||||
private:
|
||||
ObXACacheItem xa_prepare_cache_[XA_PREPARE_CACHE_COUNT];
|
||||
};
|
||||
|
||||
class ObXAService
|
||||
{
|
||||
public:
|
||||
@ -279,6 +322,7 @@ private:
|
||||
public:
|
||||
int xa_scheduler_hb_req();
|
||||
int gc_invalid_xa_record(const uint64_t tenant_id);
|
||||
ObXACache &get_xa_cache() { return xa_cache_; }
|
||||
private:
|
||||
ObXACtxMgr xa_ctx_mgr_;
|
||||
obrpc::ObXARpcProxy xa_proxy_;
|
||||
@ -289,6 +333,7 @@ private:
|
||||
bool is_running_;
|
||||
bool is_inited_;
|
||||
ObXAStatistics xa_statistics_;
|
||||
ObXACache xa_cache_;
|
||||
};
|
||||
|
||||
}//transaction
|
||||
|
||||
Reference in New Issue
Block a user