add interface refresh_read_snaphost_tx_state for read latest
This commit is contained in:
parent
2939bcb2af
commit
ce48a373dd
@ -2141,6 +2141,44 @@ bool ObTransService::tx_need_reset_(const int error_code) const
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransService::refresh_read_snapshot_tx_state(ObTxReadSnapshot &snapshot, const ObTxDesc &tx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (snapshot.is_weak_read()) {
|
||||
} else {
|
||||
ObSpinLockGuard guard(tx.lock_);
|
||||
if (tx.is_in_tx()) {
|
||||
snapshot.core_.scn_ = tx.get_tx_seq();
|
||||
if (tx.is_tx_active()) {
|
||||
if (snapshot.is_ls_snapshot() && (snapshot.parts_.count() == 1 && snapshot.parts_.at(0).left_ == snapshot.snapshot_lsid_)) {
|
||||
// skip refresh
|
||||
} else if (snapshot.parts_.count() == tx.parts_.count()) {
|
||||
// skip refresh, tx.parts_ will never be reduced
|
||||
} else { // slow path
|
||||
if (!snapshot.core_.tx_id_.is_valid()) {
|
||||
snapshot.core_.tx_id_ = tx.tx_id_;
|
||||
}
|
||||
snapshot.parts_.reuse();
|
||||
if (OB_FAIL(snapshot.parts_.reserve(tx.parts_.count()))) {
|
||||
TRANS_LOG(WARN, "reserve space for snapshot's parts fail", K(ret), K(snapshot), K(tx));
|
||||
} else {
|
||||
ARRAY_FOREACH(tx.parts_, i) {
|
||||
const ObTxPart &it = tx.parts_.at(i);
|
||||
ret = snapshot.parts_.push_back(ObTxLSEpochPair(it.id_, it.epoch_));
|
||||
}
|
||||
}
|
||||
if (TC_REACH_TIME_INTERVAL(100_ms)) {
|
||||
TRANS_LOG(INFO, "[TxReadSnapshot] refresh tx state", K(snapshot), "tx_id", tx.tx_id_);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ret = OB_TRANS_HAS_DECIDED;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // transaction
|
||||
} // namespace
|
||||
#undef TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE
|
||||
|
@ -237,6 +237,22 @@ int get_ls_read_snapshot_version(const share::ObLSID &local_ls_id,
|
||||
int get_weak_read_snapshot_version(const int64_t max_read_stale_time,
|
||||
const bool local_single_ls,
|
||||
share::SCN &snapshot_version);
|
||||
/**
|
||||
* refresh_read_snapshot_tx_state - update snapshot's tx state part
|
||||
*
|
||||
* @snapshot: the snapshot to refresh
|
||||
* @tx: the target tx descriptor
|
||||
*
|
||||
* this interface try to update snapshot's tx state part by tx descriptor
|
||||
* it was used by conflict checker which need read latest state of current
|
||||
* transaction (while other read only read the data of before current stmt)
|
||||
*
|
||||
* Return:
|
||||
* OB_SUCCESS - OK
|
||||
* OB_HAS_DECIDED - transaction has terminated
|
||||
* OB_XXX - unexpected error
|
||||
*/
|
||||
int refresh_read_snapshot_tx_state(ObTxReadSnapshot &snapshot, const ObTxDesc &tx);
|
||||
/*
|
||||
* release_snapshot - release snapshot
|
||||
*
|
||||
|
Loading…
x
Reference in New Issue
Block a user