add interface refresh_read_snaphost_tx_state for read latest
This commit is contained in:
@ -2141,6 +2141,44 @@ bool ObTransService::tx_need_reset_(const int error_code) const
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTransService::refresh_read_snapshot_tx_state(ObTxReadSnapshot &snapshot, const ObTxDesc &tx)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (snapshot.is_weak_read()) {
|
||||||
|
} else {
|
||||||
|
ObSpinLockGuard guard(tx.lock_);
|
||||||
|
if (tx.is_in_tx()) {
|
||||||
|
snapshot.core_.scn_ = tx.get_tx_seq();
|
||||||
|
if (tx.is_tx_active()) {
|
||||||
|
if (snapshot.is_ls_snapshot() && (snapshot.parts_.count() == 1 && snapshot.parts_.at(0).left_ == snapshot.snapshot_lsid_)) {
|
||||||
|
// skip refresh
|
||||||
|
} else if (snapshot.parts_.count() == tx.parts_.count()) {
|
||||||
|
// skip refresh, tx.parts_ will never be reduced
|
||||||
|
} else { // slow path
|
||||||
|
if (!snapshot.core_.tx_id_.is_valid()) {
|
||||||
|
snapshot.core_.tx_id_ = tx.tx_id_;
|
||||||
|
}
|
||||||
|
snapshot.parts_.reuse();
|
||||||
|
if (OB_FAIL(snapshot.parts_.reserve(tx.parts_.count()))) {
|
||||||
|
TRANS_LOG(WARN, "reserve space for snapshot's parts fail", K(ret), K(snapshot), K(tx));
|
||||||
|
} else {
|
||||||
|
ARRAY_FOREACH(tx.parts_, i) {
|
||||||
|
const ObTxPart &it = tx.parts_.at(i);
|
||||||
|
ret = snapshot.parts_.push_back(ObTxLSEpochPair(it.id_, it.epoch_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (TC_REACH_TIME_INTERVAL(100_ms)) {
|
||||||
|
TRANS_LOG(INFO, "[TxReadSnapshot] refresh tx state", K(snapshot), "tx_id", tx.tx_id_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ret = OB_TRANS_HAS_DECIDED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
} // transaction
|
} // transaction
|
||||||
} // namespace
|
} // namespace
|
||||||
#undef TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE
|
#undef TXN_API_SANITY_CHECK_FOR_TXN_FREE_ROUTE
|
||||||
|
|||||||
@ -237,6 +237,22 @@ int get_ls_read_snapshot_version(const share::ObLSID &local_ls_id,
|
|||||||
int get_weak_read_snapshot_version(const int64_t max_read_stale_time,
|
int get_weak_read_snapshot_version(const int64_t max_read_stale_time,
|
||||||
const bool local_single_ls,
|
const bool local_single_ls,
|
||||||
share::SCN &snapshot_version);
|
share::SCN &snapshot_version);
|
||||||
|
/**
|
||||||
|
* refresh_read_snapshot_tx_state - update snapshot's tx state part
|
||||||
|
*
|
||||||
|
* @snapshot: the snapshot to refresh
|
||||||
|
* @tx: the target tx descriptor
|
||||||
|
*
|
||||||
|
* this interface try to update snapshot's tx state part by tx descriptor
|
||||||
|
* it was used by conflict checker which need read latest state of current
|
||||||
|
* transaction (while other read only read the data of before current stmt)
|
||||||
|
*
|
||||||
|
* Return:
|
||||||
|
* OB_SUCCESS - OK
|
||||||
|
* OB_HAS_DECIDED - transaction has terminated
|
||||||
|
* OB_XXX - unexpected error
|
||||||
|
*/
|
||||||
|
int refresh_read_snapshot_tx_state(ObTxReadSnapshot &snapshot, const ObTxDesc &tx);
|
||||||
/*
|
/*
|
||||||
* release_snapshot - release snapshot
|
* release_snapshot - release snapshot
|
||||||
*
|
*
|
||||||
|
|||||||
Reference in New Issue
Block a user