diff --git a/src/share/system_variable/ob_system_variable_init.json b/src/share/system_variable/ob_system_variable_init.json index 6728ced6a2..3846c6e2fc 100644 --- a/src/share/system_variable/ob_system_variable_init.json +++ b/src/share/system_variable/ob_system_variable_init.json @@ -2911,7 +2911,7 @@ "ob_max_read_stale_time": { "id": 10137, "name": "ob_max_read_stale_time", - "value": "5000000", + "value": "-1", "data_type": "int", "info": "max stale time(us) for weak read query ", "flags": "GLOBAL | SESSION | NEED_SERIALIZE", diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 11366af20c..decc001531 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -1183,11 +1183,12 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id, can_read = false; if (!ls_id.is_valid() - || !addr.is_valid() - || max_stale_time_us <= 0) { + || !addr.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_us)); + } else if (-1 == max_stale_time_us) { + // no need check + can_read = true; } else if (observer::ObServer::get_instance().get_self() == addr) { storage::ObLSService *ls_svr = MTL(storage::ObLSService *); storage::ObLSHandle handle; @@ -1201,16 +1202,12 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id, } else if (OB_ISNULL(ls = handle.get_ls())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("id service log stream not exist"); - } else { - can_read = true; - } - /* } else if (ObTimeUtility::current_time() - max_stale_time_us < ls->get_ls_wrs_handler()->get_ls_weak_read_ts().convert_to_ts()) { can_read = true; } else if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { LOG_WARN("log stream unreadable", K(ls_id), K(addr), K(max_stale_time_us)); - }*/ + } } else { LOG_TRACE("log stream is not local", K(ls_id), K(addr)); } diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 7485126e5d..9c429fe21f 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -715,7 +715,6 @@ ERRSIM_POINT_DEF(ERRSIM_WEAK_READ_SNAPSHOT_DELAY_US); int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_time, SCN &snapshot) { - UNUSED(max_read_stale_time); int ret = OB_SUCCESS; bool monotinic_read = true;; // server weak read version @@ -727,12 +726,12 @@ int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_ // wrs cluster version } else if (OB_FAIL(GCTX.weak_read_service_->get_cluster_version(tenant_id_, snapshot))) { TRANS_LOG(WARN, "get weak read snapshot fail", K(ret), KPC(this)); - } else { + } else if (-1 == max_read_stale_time) { + // no need to check barrier version // do nothing - } - if (OB_SUCC(ret)) { - int64_t max_stale_time = ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id_, 0); - const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_stale_time + } else { + // check snapshot version barrier which is setted by user system variable + const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_read_stale_time + abs(ERRSIM_WEAK_READ_SNAPSHOT_DELAY_US); if (snapshot.convert_to_ts() < snapshot_barrier) { TRANS_LOG(WARN, "weak read snapshot too stale", K(snapshot), diff --git a/src/storage/tx/wrs/ob_black_list.cpp b/src/storage/tx/wrs/ob_black_list.cpp index 39abe79b1c..3067237e7e 100644 --- a/src/storage/tx/wrs/ob_black_list.cpp +++ b/src/storage/tx/wrs/ob_black_list.cpp @@ -15,6 +15,7 @@ #include "observer/ob_server_struct.h" // for GCTX #include "deps/oblib/src/common/ob_role.h" // role #include "src/storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil +#include "src/storage/tx/ob_ts_mgr.h" namespace oceanbase { @@ -235,26 +236,28 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result) { int ret = OB_SUCCESS; int64_t max_stale_time = 0; - int64_t curr_time_ns = ObTimeUtility::current_time_ns(); while (OB_SUCC(result->next())) { ObBLKey bl_key; ObLsInfo ls_info; + SCN gts_scn; if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) { TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result)); } else if (LEADER == ls_info.ls_state_) { // cannot add leader into blacklist } else if (ls_info.weak_read_scn_ == 0 && ls_info.migrate_status_ == OB_MIGRATE_STATUS_NONE) { // log stream is initializing, should't be put into blacklist + } else if (OB_FAIL(OB_TS_MGR.get_gts(bl_key.get_tenant_id(), NULL, gts_scn))) { + TRANS_LOG(WARN, "get gts scn error", K(ret), K(bl_key)); } else { max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id()); int64_t max_stale_time_ns = max_stale_time * 1000; - if (curr_time_ns > ls_info.weak_read_scn_ + max_stale_time_ns) { + if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns) { // scn is out-of-time,add this log stream into blacklist if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) { TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info)); } - } else if (curr_time_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) { + } else if (gts_scn.get_val_for_gts() + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) { // scn is new enough,remove this log stream in the blacklist ls_bl_mgr_.remove(bl_key); } else { diff --git a/src/storage/tx/wrs/ob_ls_wrs_handler.cpp b/src/storage/tx/wrs/ob_ls_wrs_handler.cpp index 94a433cadf..f78aa445fe 100644 --- a/src/storage/tx/wrs/ob_ls_wrs_handler.cpp +++ b/src/storage/tx/wrs/ob_ls_wrs_handler.cpp @@ -81,6 +81,7 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls, { int ret = OB_SUCCESS; SCN timestamp; + SCN gts_scn; const ObLSID &ls_id = ls.get_ls_id(); need_skip = false; ObMigrationStatus status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE; @@ -99,10 +100,12 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls, STORAGE_LOG(DEBUG, "fail to generate weak read timestamp", KR(ret), K(max_stale_time)); need_skip = true; ret = OB_SUCCESS; + } else if (OB_TS_MGR.get_gts(MTL_ID(), NULL, gts_scn)) { + TRANS_LOG(WARN, "get gts scn error", K(ls_id), K(max_stale_time)); } else if (OB_FAIL(ls.get_migration_status(status)) || ObMigrationStatus::OB_MIGRATION_STATUS_NONE == status ) { // check the weak read timestamp of the migrated ls - if (timestamp.get_val_for_logservice() > ObTimeUtility::current_time() - 500 * 1000) { + if (timestamp.convert_to_ts() > gts_scn.convert_to_ts() - 500 * 1000) { STORAGE_LOG(TRACE, "ls received the latest log", K(timestamp)); // clog chases within 500ms, then clear the mark need_skip = false; @@ -110,8 +113,8 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls, need_skip = true; } } else { - int64_t snapshot_version_barrier = ObTimeUtility::current_time() - max_stale_time; - if (timestamp.get_val_for_logservice() <= snapshot_version_barrier) { + int64_t snapshot_version_barrier = gts_scn.convert_to_ts() - max_stale_time; + if (timestamp.convert_to_ts() <= snapshot_version_barrier) { // rule out these ls to avoid too old weak read timestamp need_skip = true; } else { @@ -171,8 +174,9 @@ int ObLSWRSHandler::generate_weak_read_timestamp_(ObLS &ls, const int64_t max_st || REACH_TIME_INTERVAL(10 * 1000 * 1000)) { TRANS_LOG(INFO, "get wrs ts", K(ls_id), "delta", current_us - timestamp.convert_to_ts(), - K(timestamp), - K(min_tx_service_ts)); + "log_service_ts", min_log_service_scn.convert_to_ts(), + "min_tx_service_ts", min_tx_service_ts.convert_to_ts(), + K(timestamp)); // print keep alive info ls.get_keep_alive_ls_handler()->print_stat_info(); } diff --git a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp index a7c6c1952e..67491bad8c 100644 --- a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp +++ b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.cpp @@ -20,6 +20,7 @@ #include "storage/tx_storage/ob_ls_service.h" #include "storage/tx_storage/ob_ls_map.h" #include "storage/tx_storage/ob_ls_handle.h" +#include "storage/tx/ob_ts_mgr.h" #include "logservice/ob_log_service.h" #include "ob_tenant_weak_read_cluster_service.h" @@ -340,8 +341,7 @@ int ObTenantWeakReadClusterService::start_service() int64_t leader_epoch = 0; int64_t begin_ts = ObTimeUtility::current_time(); int64_t after_lock_time = 0, begin_query_ts = 0, begin_persist_ts = 0, end_ts = 0; - const int64_t max_stale_time = - ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID()) * 1000; + const int64_t max_stale_time = ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID()); const int64_t tenant_id = MTL_ID(); ISTAT("begin start service", K(tenant_id), K(is_in_service()), K_(can_update_version)); @@ -368,7 +368,7 @@ int ObTenantWeakReadClusterService::start_service() ret = OB_ERR_UNEXPECTED; LOG_ERROR("WRS leader epoch is invalid", KR(ret), K(leader_epoch)); } else { - SCN cur_min_version, cur_max_version; + SCN cur_min_version, cur_max_version, gts_scn; bool record_exist = false; begin_query_ts = ObTimeUtility::current_time(); @@ -376,13 +376,15 @@ int ObTenantWeakReadClusterService::start_service() // query cluster weak read version range in WRS table if (OB_FAIL(query_cluster_version_range_(cur_min_version, cur_max_version, record_exist))) { LOG_WARN("query cluster version range from WRS table fail", KR(ret)); + } else if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, gts_scn))) { + LOG_WARN("get gts error", K(ret), K(cur_min_version), K(cur_max_version)); } else { begin_persist_ts = ObTimeUtility::current_time(); // new weak read version delay should smaller than max_stale_time - int64_t new_version = MAX(cur_max_version.get_val_for_gts(), ObTimeUtility::current_time_ns() - max_stale_time); + int64_t new_version = MAX(cur_max_version.convert_to_ts(), gts_scn.convert_to_ts() - max_stale_time); SCN new_version_scn; - if (OB_FAIL(new_version_scn.convert_for_gts(new_version))) { + if (OB_FAIL(new_version_scn.convert_from_ts(new_version))) { LOG_ERROR("convert for gts fail", KR(ret)); } else { SCN new_min_version = new_version_scn; @@ -468,13 +470,13 @@ void ObTenantWeakReadClusterService::get_serve_info(bool &in_service, int64_t &l leader_epoch = leader_epoch_; } -int ObTenantWeakReadClusterService::get_version(SCN &version) const +int ObTenantWeakReadClusterService::get_cluster_version(SCN &version) const { SCN min_version, max_version; - return get_version(version, min_version, max_version); + return get_cluster_version(version, min_version, max_version); } -int ObTenantWeakReadClusterService::get_version(SCN &version, SCN &min_version, +int ObTenantWeakReadClusterService::get_cluster_version(SCN &version, SCN &min_version, SCN &max_version) const { int ret = OB_SUCCESS; @@ -612,7 +614,7 @@ bool ObTenantWeakReadClusterService::check_can_update_version_() return can_update_version_; } -int ObTenantWeakReadClusterService::update_version(int64_t &affected_rows) +int ObTenantWeakReadClusterService::update_cluster_version(int64_t &affected_rows) { int ret = OB_SUCCESS; SCN new_version; @@ -640,8 +642,8 @@ int ObTenantWeakReadClusterService::update_version(int64_t &affected_rows) "tenant_id", MTL_ID(), K(cur_leader_epoch), K(leader_epoch_)); } else if (! check_can_update_version_()) { TRANS_LOG(TRACE, "can not update version"); - } else if (FALSE_IT(new_version = compute_version_(skipped_server_count, need_print))) { - // nothing to do + } else if (OB_FAIL(compute_cluster_version_(skipped_server_count, need_print, new_version))) { + TRANS_LOG(WARN, "compute version error", K(ret), K(skipped_server_count), K(need_print)); } else if (OB_UNLIKELY(!new_version.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid CLUSTER weak read version", K(new_version), KR(ret)); @@ -682,13 +684,19 @@ int ObTenantWeakReadClusterService::update_version(int64_t &affected_rows) return ret; } -SCN ObTenantWeakReadClusterService::compute_version_(int64_t &skipped_servers, bool need_print) const +int ObTenantWeakReadClusterService::compute_cluster_version_(int64_t &skipped_servers, bool need_print, SCN &version) const { + int ret = OB_SUCCESS; /// min weak read version delay should not smaller than max_stale_time - SCN base_version = ObWeakReadUtil::generate_min_weak_read_version(MTL_ID()); - // weak read version should increase monotonically - base_version = SCN::max(current_version_, base_version); - return cluster_version_mgr_.get_version(base_version, skipped_servers, need_print); + SCN base_version; + if (OB_FAIL(ObWeakReadUtil::generate_min_weak_read_version(MTL_ID(), base_version))) { + TRANS_LOG(WARN, "generate min weak read version error", K(ret), K(skipped_servers), K(need_print)); + } else { + // weak read version should increase monotonically + base_version = SCN::max(current_version_, base_version); + version = cluster_version_mgr_.get_cluster_version(base_version, skipped_servers, need_print); + } + return ret; } bool ObTenantWeakReadClusterService::is_service_master() const diff --git a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.h b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.h index ea80aa50f9..9cb5aebd64 100644 --- a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.h +++ b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_service.h @@ -80,8 +80,8 @@ public: /// @retval OB_NOT_MASTER self is in service, but not wrs leader, should stop service /// @retval OB_NEED_RETRY wrs not ready,need retry /// @retval OTHER CODE fail - int get_version(share::SCN &version) const; - int get_version(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const; + int get_cluster_version(share::SCN &version) const; + int get_cluster_version(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const; /// update wrs version /// @@ -90,7 +90,7 @@ public: /// @retval OB_NOT_MASTER self is in service, but not wrs leader, should stop service /// @retval OB_NEED_RETRY need retry /// @retval OTHER CODE fail - int update_version(int64_t &affected_rows); + int update_cluster_version(int64_t &affected_rows); bool need_print_skipped_server(); @@ -142,8 +142,8 @@ private: return share::SCN::plus(min_version, WRS_VERSION_GAP_FOR_PERSISTENCE * 1000); } void stop_service_impl_(); - share::SCN compute_version_(int64_t &skipped_servers, bool need_print) const; - int get_version_(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const; + int compute_cluster_version_(int64_t &skipped_servers, bool need_print, share::SCN &new_version) const; + int get_cluster_version_(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const; bool need_force_change_leader_(); int force_change_leader_() const; int verify_candidate_server_(const common::ObAddr &server) const; diff --git a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.cpp b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.cpp index 092a9b3519..18148b16ae 100644 --- a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.cpp +++ b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.cpp @@ -121,7 +121,7 @@ int ObTenantWeakReadClusterVersionMgr::update_server_version(const common::ObAdd return ret; } -SCN ObTenantWeakReadClusterVersionMgr::get_version(const SCN base_version, +SCN ObTenantWeakReadClusterVersionMgr::get_cluster_version(const SCN base_version, int64_t &skip_server_count, const bool force_print) const { @@ -150,7 +150,7 @@ SCN ObTenantWeakReadClusterVersionMgr::get_version(const SCN base_version, min_version = version; } } - LOG_DEBUG("[WEAK_READ_SERVER_VERSION_MGR] get version from server", K_(tenant_id), K(i), + LOG_DEBUG("[WEAK_READ_SERVER_VERSION_MGR] get cluster version from server", K_(tenant_id), K(i), K(need_skip), K(base_version), K(min_version), "server_verion", svr_array_.at(i)); } diff --git a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.h b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.h index 23bba5a10e..1a924daeff 100644 --- a/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.h +++ b/src/storage/tx/wrs/ob_tenant_weak_read_cluster_version_mgr.h @@ -56,7 +56,7 @@ public: /// get min server version which not smaller than base_version /// if no statisfied server version, return base_version - share::SCN get_version(const share::SCN base_version, int64_t &skip_server_count, const bool need_print_server_info) const; + share::SCN get_cluster_version(const share::SCN base_version, int64_t &skip_server_count, const bool need_print_server_info) const; // get server count in cluster master cached registered servers int64_t get_server_count() const; diff --git a/src/storage/tx/wrs/ob_tenant_weak_read_service.cpp b/src/storage/tx/wrs/ob_tenant_weak_read_service.cpp index a4a32a0a9c..dfd9262631 100644 --- a/src/storage/tx/wrs/ob_tenant_weak_read_service.cpp +++ b/src/storage/tx/wrs/ob_tenant_weak_read_service.cpp @@ -207,7 +207,7 @@ int ObTenantWeakReadService::get_cluster_version_internal_(SCN &version, int ret = OB_SUCCESS; if (OB_UNLIKELY(! inited_)) { ret = OB_NOT_INIT; - } else if (OB_FAIL(cluster_service_.get_version(version))) { + } else if (OB_FAIL(cluster_service_.get_cluster_version(version))) { LOG_WARN("get weak read cluster version fail", KR(ret), K(version), K(tenant_id_)); if (OB_NEED_RETRY == ret) { // self may be WRS Leader while not ready, need retry @@ -306,11 +306,19 @@ int ObTenantWeakReadService::update_server_version_with_part_info(const int64_t } int ObTenantWeakReadService::generate_server_version(const int64_t epoch_tstamp, - const bool need_print_status) + const bool need_print_status) { - SCN base_version_when_no_valid_partition = ObWeakReadUtil::generate_min_weak_read_version(tenant_id_); - return svr_version_mgr_.generate_new_version(tenant_id_, epoch_tstamp, - base_version_when_no_valid_partition, need_print_status); + int ret = OB_SUCCESS; + SCN base_version_when_no_valid_partition; + if (OB_FAIL(ObWeakReadUtil::generate_min_weak_read_version(tenant_id_, base_version_when_no_valid_partition))) { + TRANS_LOG(WARN, "generate min weak read version error", K(ret), K_(tenant_id)); + } else { + ret = svr_version_mgr_.generate_new_version(tenant_id_, + epoch_tstamp, + base_version_when_no_valid_partition, + need_print_status); + } + return ret; } void ObTenantWeakReadService::get_weak_read_stat(ObTenantWeakReadStat &wrs_stat) const @@ -352,7 +360,7 @@ void ObTenantWeakReadService::get_weak_read_stat(ObTenantWeakReadStat &wrs_stat) wrs_stat.self_ = self_; // cluster info - cluster_service_.get_version(current_cluster_version, min_cluster_version, max_cluster_version); + cluster_service_.get_cluster_version(current_cluster_version, min_cluster_version, max_cluster_version); wrs_stat.cluster_version_ = current_cluster_version; wrs_stat.cluster_version_delta_ = in_service? (cur_tstamp - wrs_stat.cluster_version_.convert_to_ts(ignore_invalid)):0; wrs_stat.min_cluster_version_ = min_cluster_version; @@ -504,7 +512,7 @@ void ObTenantWeakReadService::print_stat_() svr_version_mgr_.get_version(sv); if (in_cluster_service) { - get_cluster_version_err = cluster_service_.get_version(cluster_version, min_cluster_version, + get_cluster_version_err = cluster_service_.get_cluster_version(cluster_version, min_cluster_version, max_cluster_version); } @@ -871,7 +879,7 @@ void ObTenantWeakReadService::generate_cluster_version_() { int ret = OB_SUCCESS; int64_t affected_rows = 0; - if (OB_FAIL(cluster_service_.update_version(affected_rows))) { + if (OB_FAIL(cluster_service_.update_cluster_version(affected_rows))) { bool need_stop_service = false; bool need_self_check = need_force_self_check_(ret, affected_rows, need_stop_service); if (need_self_check) { diff --git a/src/storage/tx/wrs/ob_weak_read_util.cpp b/src/storage/tx/wrs/ob_weak_read_util.cpp index aa4d7b718a..3a32f3a52d 100644 --- a/src/storage/tx/wrs/ob_weak_read_util.cpp +++ b/src/storage/tx/wrs/ob_weak_read_util.cpp @@ -15,6 +15,7 @@ #include #include #include +#include "storage/tx/ob_ts_mgr.h" namespace oceanbase { using namespace common; @@ -44,9 +45,9 @@ int64_t ObWeakReadUtil::replica_keepalive_interval() // 2. all partitions offline // 3. all partitions delay too much or in invalid status // 4. all partitions in migrating and readable snapshot version delay more than 500ms -SCN ObWeakReadUtil::generate_min_weak_read_version(const uint64_t tenant_id) +int ObWeakReadUtil::generate_min_weak_read_version(const uint64_t tenant_id, SCN &scn) { - SCN base_version_when_no_valid_partition; + int ret = OB_SUCCESS; int64_t max_stale_time = 0; bool tenant_config_exist = false; // generating min weak version version should statisfy following constraint @@ -69,15 +70,20 @@ SCN ObWeakReadUtil::generate_min_weak_read_version(const uint64_t tenant_id) ); max_stale_time = std::max(max_stale_time, static_cast(DEFAULT_REPLICA_KEEPALIVE_INTERVAL)); - // the unit of max_stale_time is us,we should change to ns - base_version_when_no_valid_partition.convert_from_ts(ObTimeUtility::current_time() - max_stale_time); + SCN tmp_scn; + if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, tmp_scn))) { + TRANS_LOG(WARN, "get gts cache error", K(ret), K(tenant_id)); + } else { + // the unit of max_stale_time is us,we should change to ns + scn.convert_from_ts(tmp_scn.convert_to_ts() - max_stale_time); + } if ((!tenant_config_exist) && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "tenant not exist when generate min weak read version, use default max stale time instead", - K(tenant_id), K(base_version_when_no_valid_partition), K(lbt())); + K(tenant_id), K(tmp_scn), K(lbt())); } - return base_version_when_no_valid_partition; + return ret; } bool ObWeakReadUtil::enable_monotonic_weak_read(const uint64_t tenant_id) diff --git a/src/storage/tx/wrs/ob_weak_read_util.h b/src/storage/tx/wrs/ob_weak_read_util.h index 5480415bc9..dee954a7eb 100644 --- a/src/storage/tx/wrs/ob_weak_read_util.h +++ b/src/storage/tx/wrs/ob_weak_read_util.h @@ -31,7 +31,7 @@ public: static const int64_t DEFAULT_REPLICA_KEEPALIVE_INTERVAL = 3000 * 1000L; static const int64_t IGNORE_TENANT_EXIST_WARN = 1; static int64_t replica_keepalive_interval(); - static share::SCN generate_min_weak_read_version(const uint64_t tenant_id); + static int generate_min_weak_read_version(const uint64_t tenant_id, share::SCN &scn); static bool enable_monotonic_weak_read(const uint64_t tenant_id); static int64_t max_stale_time_for_weak_consistency(const uint64_t tenant_id, int64_t ignore_warn = 0); static bool check_weak_read_service_available();