Fix weak read timeout bug
This commit is contained in:
@ -377,7 +377,7 @@ int ObTenantWeakReadClusterService::start_service()
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("WRS leader epoch is invalid", KR(ret), K(leader_epoch));
|
||||
} else {
|
||||
SCN cur_min_version, cur_max_version, gts_scn;
|
||||
SCN cur_min_version, cur_max_version, gts_scn, gts_barrier_scn;
|
||||
bool record_exist = false;
|
||||
|
||||
begin_query_ts = ObTimeUtility::current_time();
|
||||
@ -387,37 +387,41 @@ int ObTenantWeakReadClusterService::start_service()
|
||||
LOG_WARN("query cluster version range from WRS table fail", KR(ret));
|
||||
} else if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, gts_scn))) {
|
||||
LOG_WARN("get gts error", K(ret), K(cur_min_version), K(cur_max_version));
|
||||
} else if (OB_FAIL(gts_barrier_scn.convert_from_ts(gts_scn.convert_to_ts() - max_stale_time))) {
|
||||
LOG_WARN("convert from ts error", K(ret), K(gts_scn), K(gts_barrier_scn));
|
||||
} else {
|
||||
begin_persist_ts = ObTimeUtility::current_time();
|
||||
|
||||
// new weak read version delay should smaller than max_stale_time
|
||||
int64_t new_version = MAX(cur_max_version.convert_to_ts(), gts_scn.convert_to_ts() - max_stale_time);
|
||||
SCN new_version_scn;
|
||||
if (OB_FAIL(new_version_scn.convert_from_ts(new_version))) {
|
||||
LOG_ERROR("convert for gts fail", KR(ret));
|
||||
SCN new_version_scn = SCN::max(cur_max_version, gts_barrier_scn);
|
||||
SCN new_min_version = new_version_scn;
|
||||
SCN new_max_version = generate_max_version_(new_min_version);
|
||||
int64_t affected_rows = 0;
|
||||
|
||||
// do persist
|
||||
if (OB_FAIL(persist_version_if_need_(cur_min_version, cur_max_version,
|
||||
new_min_version, new_max_version, record_exist, affected_rows))) {
|
||||
LOG_WARN("persist version if need fail", KR(ret), K(cluster_service_tablet_id_), K(cur_min_version),
|
||||
K(cur_max_version), K(new_min_version), K(new_max_version), K(max_stale_time),
|
||||
K(record_exist), K(affected_rows), K(error_count_for_change_leader_),
|
||||
K(last_error_tstamp_for_change_leader_));
|
||||
} else if (new_min_version < min_version_ || new_max_version < max_version_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("unexpected new_min_version or new_max_version", K(new_min_version),
|
||||
K(new_max_version),
|
||||
K(min_version_),
|
||||
K(max_version_),
|
||||
K(current_version_));
|
||||
} else {
|
||||
SCN new_min_version = new_version_scn;
|
||||
SCN new_max_version = generate_max_version_(new_min_version);
|
||||
int64_t affected_rows = 0;
|
||||
// init version
|
||||
min_version_ = new_min_version;
|
||||
max_version_ = new_max_version;
|
||||
current_version_.atomic_set(new_version_scn);
|
||||
|
||||
// do persist
|
||||
if (OB_FAIL(persist_version_if_need_(cur_min_version, cur_max_version,
|
||||
new_min_version, new_max_version, record_exist, affected_rows))) {
|
||||
LOG_WARN("persist version if need fail", KR(ret), K(cluster_service_tablet_id_), K(cur_min_version),
|
||||
K(cur_max_version), K(new_min_version), K(new_max_version), K(max_stale_time),
|
||||
K(record_exist), K(affected_rows), K(error_count_for_change_leader_),
|
||||
K(last_error_tstamp_for_change_leader_));
|
||||
} else {
|
||||
// init version
|
||||
min_version_ = new_min_version;
|
||||
max_version_ = new_max_version;
|
||||
current_version_.atomic_set(new_version_scn);
|
||||
|
||||
// weak read service start success
|
||||
leader_epoch_ = leader_epoch;
|
||||
ATOMIC_STORE(&in_service_, true);
|
||||
ATOMIC_STORE(&start_service_tstamp_, ObTimeUtility::current_time());
|
||||
}
|
||||
// weak read service start success
|
||||
leader_epoch_ = leader_epoch;
|
||||
ATOMIC_STORE(&in_service_, true);
|
||||
ATOMIC_STORE(&start_service_tstamp_, ObTimeUtility::current_time());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user