Adjust the read snapshot when max_read_stale_time is valid
This commit is contained in:
@ -401,7 +401,7 @@ int ObTableApiProcessorBase::setup_tx_snapshot_(transaction::ObTxDesc &trans_des
|
|||||||
} else {
|
} else {
|
||||||
SCN weak_read_snapshot;
|
SCN weak_read_snapshot;
|
||||||
if (OB_FAIL(txs->get_weak_read_snapshot_version(
|
if (OB_FAIL(txs->get_weak_read_snapshot_version(
|
||||||
transaction::ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID()),
|
-1, // system variable : max read stale time for user
|
||||||
weak_read_snapshot))) {
|
weak_read_snapshot))) {
|
||||||
LOG_WARN("fail to get weak read snapshot", K(ret));
|
LOG_WARN("fail to get weak read snapshot", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -642,7 +642,7 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session,
|
|||||||
if (cl == ObConsistencyLevel::WEAK || cl == ObConsistencyLevel::FROZEN) {
|
if (cl == ObConsistencyLevel::WEAK || cl == ObConsistencyLevel::FROZEN) {
|
||||||
SCN snapshot_version = SCN::min_scn();
|
SCN snapshot_version = SCN::min_scn();
|
||||||
if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(),
|
if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(),
|
||||||
snapshot_version))) {
|
snapshot_version))) {
|
||||||
TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs));
|
TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs));
|
||||||
int64_t stale_time = session->get_ob_max_read_stale_time();
|
int64_t stale_time = session->get_ob_max_read_stale_time();
|
||||||
int64_t refresh_interval = GCONF.weak_read_version_refresh_interval;
|
int64_t refresh_interval = GCONF.weak_read_version_refresh_interval;
|
||||||
@ -1188,7 +1188,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
|||||||
|| max_stale_time_us <= -2) {
|
|| max_stale_time_us <= -2) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_us));
|
LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_us));
|
||||||
} else if (-1 == max_stale_time_us) {
|
} else if (max_stale_time_us < 0) {
|
||||||
// no need check
|
// no need check
|
||||||
can_read = true;
|
can_read = true;
|
||||||
} else if (observer::ObServer::get_instance().get_self() == addr) {
|
} else if (observer::ObServer::get_instance().get_self() == addr) {
|
||||||
|
|||||||
@ -441,7 +441,7 @@ int ObMultiVersionGarbageCollector::study_min_unallocated_WRS(
|
|||||||
transaction::ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID());
|
transaction::ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID());
|
||||||
|
|
||||||
if (OB_FAIL(MTL(transaction::ObTransService*)->get_weak_read_snapshot_version(
|
if (OB_FAIL(MTL(transaction::ObTransService*)->get_weak_read_snapshot_version(
|
||||||
max_read_stale_time,
|
-1, // system variable : max read stale time for user
|
||||||
min_unallocated_WRS))) {
|
min_unallocated_WRS))) {
|
||||||
MVCC_LOG(WARN, "fail to get weak read snapshot", K(ret));
|
MVCC_LOG(WARN, "fail to get weak read snapshot", K(ret));
|
||||||
if (OB_REPLICA_NOT_READABLE == ret) {
|
if (OB_REPLICA_NOT_READABLE == ret) {
|
||||||
|
|||||||
@ -710,33 +710,42 @@ int ObTransService::get_ls_read_snapshot_version(const share::ObLSID &local_ls_i
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ERRSIM_POINT_DEF(ERRSIM_WEAK_READ_SNAPSHOT_DELAY_US);
|
int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_us_for_user,
|
||||||
|
|
||||||
int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_time,
|
|
||||||
SCN &snapshot)
|
SCN &snapshot)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool monotinic_read = true;;
|
bool monotinic_read = true;;
|
||||||
|
SCN wrs_scn;
|
||||||
|
|
||||||
// server weak read version
|
// server weak read version
|
||||||
if (!ObWeakReadUtil::enable_monotonic_weak_read(tenant_id_)) {
|
if (!ObWeakReadUtil::enable_monotonic_weak_read(tenant_id_)) {
|
||||||
if (OB_FAIL(GCTX.weak_read_service_->get_server_version(tenant_id_, snapshot))) {
|
if (OB_FAIL(GCTX.weak_read_service_->get_server_version(tenant_id_, wrs_scn))) {
|
||||||
TRANS_LOG(WARN, "get server read snapshot fail", K(ret), KPC(this));
|
TRANS_LOG(WARN, "get server read snapshot fail", K(ret), KPC(this));
|
||||||
}
|
}
|
||||||
monotinic_read = false;
|
monotinic_read = false;
|
||||||
// wrs cluster version
|
// wrs cluster version
|
||||||
} else if (OB_FAIL(GCTX.weak_read_service_->get_cluster_version(tenant_id_, snapshot))) {
|
} else if (OB_FAIL(GCTX.weak_read_service_->get_cluster_version(tenant_id_, wrs_scn))) {
|
||||||
TRANS_LOG(WARN, "get weak read snapshot fail", K(ret), KPC(this));
|
TRANS_LOG(WARN, "get weak read snapshot fail", K(ret), KPC(this));
|
||||||
} else if (-1 == max_read_stale_time) {
|
|
||||||
// no need to check barrier version
|
|
||||||
// do nothing
|
|
||||||
} else {
|
} else {
|
||||||
// check snapshot version barrier which is setted by user system variable
|
// do nothing
|
||||||
const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_read_stale_time
|
}
|
||||||
+ abs(ERRSIM_WEAK_READ_SNAPSHOT_DELAY_US);
|
if (OB_SUCC(ret)) {
|
||||||
if (snapshot.convert_to_ts() < snapshot_barrier) {
|
if (max_read_stale_us_for_user < 0) {
|
||||||
TRANS_LOG(WARN, "weak read snapshot too stale", K(snapshot),
|
// no need to check barrier version
|
||||||
K(snapshot_barrier), "delta", (snapshot_barrier - snapshot.convert_to_ts()));
|
snapshot = wrs_scn;
|
||||||
ret = OB_REPLICA_NOT_READABLE;
|
} else {
|
||||||
|
// check snapshot version barrier which is setted by user system variable
|
||||||
|
SCN gts_cache;
|
||||||
|
SCN current_scn;
|
||||||
|
if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id_, NULL, gts_cache))) {
|
||||||
|
TRANS_LOG(WARN, "get ts sync error", K(ret), K(max_read_stale_us_for_user));
|
||||||
|
} else {
|
||||||
|
const int64_t current_time_us = MTL_IS_PRIMARY_TENANT()
|
||||||
|
? std::max(ObTimeUtility::current_time(), gts_cache.convert_to_ts())
|
||||||
|
: 0 ;
|
||||||
|
current_scn.convert_from_ts(current_time_us - max_read_stale_us_for_user);
|
||||||
|
snapshot = SCN::max(wrs_scn, current_scn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TRANS_LOG(TRACE, "get weak-read snapshot", K(ret), K(snapshot), K(monotinic_read));
|
TRANS_LOG(TRACE, "get weak-read snapshot", K(ret), K(snapshot), K(monotinic_read));
|
||||||
|
|||||||
Reference in New Issue
Block a user