diff --git a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp index 0a74ea3968..b421f8e1d0 100644 --- a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp +++ b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp @@ -377,7 +377,7 @@ int ObTenantWeakReadClusterService::start_service() ret = OB_ERR_UNEXPECTED; LOG_ERROR("WRS leader epoch is invalid", KR(ret), K(leader_epoch)); } else { - SCN cur_min_version, cur_max_version, gts_scn; + SCN cur_min_version, cur_max_version, gts_scn, gts_barrier_scn; bool record_exist = false; begin_query_ts = ObTimeUtility::current_time(); @@ -387,37 +387,41 @@ int ObTenantWeakReadClusterService::start_service() LOG_WARN("query cluster version range from WRS table fail", KR(ret)); } else if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, gts_scn))) { LOG_WARN("get gts error", K(ret), K(cur_min_version), K(cur_max_version)); + } else if (OB_FAIL(gts_barrier_scn.convert_from_ts(gts_scn.convert_to_ts() - max_stale_time))) { + LOG_WARN("convert from ts error", K(ret), K(gts_scn), K(gts_barrier_scn)); } else { begin_persist_ts = ObTimeUtility::current_time(); // new weak read version delay should smaller than max_stale_time - int64_t new_version = MAX(cur_max_version.convert_to_ts(), gts_scn.convert_to_ts() - max_stale_time); - SCN new_version_scn; - if (OB_FAIL(new_version_scn.convert_from_ts(new_version))) { - LOG_ERROR("convert for gts fail", KR(ret)); + SCN new_version_scn = SCN::max(cur_max_version, gts_barrier_scn); + SCN new_min_version = new_version_scn; + SCN new_max_version = generate_max_version_(new_min_version); + int64_t affected_rows = 0; + + // do persist + if (OB_FAIL(persist_version_if_need_(cur_min_version, cur_max_version, + new_min_version, new_max_version, record_exist, affected_rows))) { + LOG_WARN("persist version if need fail", KR(ret), K(cluster_service_tablet_id_), K(cur_min_version), + K(cur_max_version), K(new_min_version), K(new_max_version), K(max_stale_time), + K(record_exist), K(affected_rows), K(error_count_for_change_leader_), + K(last_error_tstamp_for_change_leader_)); + } else if (new_min_version < min_version_ || new_max_version < max_version_) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected new_min_version or new_max_version", K(new_min_version), + K(new_max_version), + K(min_version_), + K(max_version_), + K(current_version_)); } else { - SCN new_min_version = new_version_scn; - SCN new_max_version = generate_max_version_(new_min_version); - int64_t affected_rows = 0; + // init version + min_version_ = new_min_version; + max_version_ = new_max_version; + current_version_.atomic_set(new_version_scn); - // do persist - if (OB_FAIL(persist_version_if_need_(cur_min_version, cur_max_version, - new_min_version, new_max_version, record_exist, affected_rows))) { - LOG_WARN("persist version if need fail", KR(ret), K(cluster_service_tablet_id_), K(cur_min_version), - K(cur_max_version), K(new_min_version), K(new_max_version), K(max_stale_time), - K(record_exist), K(affected_rows), K(error_count_for_change_leader_), - K(last_error_tstamp_for_change_leader_)); - } else { - // init version - min_version_ = new_min_version; - max_version_ = new_max_version; - current_version_.atomic_set(new_version_scn); - - // weak read service start success - leader_epoch_ = leader_epoch; - ATOMIC_STORE(&in_service_, true); - ATOMIC_STORE(&start_service_tstamp_, ObTimeUtility::current_time()); - } + // weak read service start success + leader_epoch_ = leader_epoch; + ATOMIC_STORE(&in_service_, true); + ATOMIC_STORE(&start_service_tstamp_, ObTimeUtility::current_time()); } } }