diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 360e0f0e79..f8a1700767 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -8992,6 +8992,59 @@ int ObRootService::update_stat_cache(const obrpc::ObUpdateStatCacheArg &arg) return ret; } +int ObRootService::check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid) +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard sys_schema_guard; + ObArray tenant_ids; + valid = true; + + if (OB_ISNULL(GCTX.schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema service is null", KR(ret)); + } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, sys_schema_guard))) { + LOG_WARN("get sys schema guard failed", KR(ret)); + } else if (OB_FAIL(sys_schema_guard.get_tenant_ids(tenant_ids))) { + LOG_WARN("get tenant ids failed", KR(ret)); + } else { + ObSchemaGetterGuard schema_guard; + const ObSimpleTenantSchema *tenant_schema = NULL; + const ObSysVarSchema *var_schema = NULL; + ObObj obj; + int64_t session_max_stale_time = 0; + uint64_t tenant_id = OB_INVALID_TENANT_ID; + for (int64_t i = 0; OB_SUCC(ret) && valid && i < tenant_ids.count(); i++) { + tenant_id = tenant_ids[i]; + if (OB_FAIL(sys_schema_guard.get_tenant_info(tenant_id, tenant_schema))) { + LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(tenant_schema)) { + ret = OB_SUCCESS; + LOG_WARN("tenant schema is null, skip and continue", KR(ret), K(tenant_id)); + } else if (!tenant_schema->is_normal()) { + ret = OB_SUCCESS; + LOG_WARN("tenant schema is not normal, skip and continue", KR(ret), K(tenant_id)); + } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("get schema guard failed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_tenant_system_variable(tenant_id, + OB_SV_MAX_READ_STALE_TIME, var_schema))) { + LOG_WARN("get tenant system variable failed", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(var_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("var schema is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(var_schema->get_value(NULL, NULL, obj))) { + LOG_WARN("get value failed", KR(ret), K(tenant_id), K(obj)); + } else if (OB_FAIL(obj.get_int(session_max_stale_time))) { + LOG_WARN("get int failed", KR(ret), K(tenant_id), K(obj)); + } else if (refresh_interval > session_max_stale_time) { + valid = false; + LOG_USER_ERROR(OB_INVALID_ARGUMENT, + "weak_read_version_refresh_interval is larger than ob_max_read_stale_time"); + } + } + } + return ret; +} + int ObRootService::set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg) { int ret = OB_SUCCESS; @@ -9022,6 +9075,14 @@ int ObRootService::set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg) LOG_WARN("config invalid", "item", *item, K(ret), K(i), K(item->tenant_ids_.at(i))); } } + } else if (0 == STRCMP(item->name_.ptr(), WEAK_READ_VERSION_REFRESH_INTERVAL)) { + int64_t refresh_interval = ObConfigTimeParser::get(item->value_.ptr(), valid); + if (valid && OB_FAIL(check_weak_read_version_refresh_interval(refresh_interval, valid))) { + LOG_WARN("check refresh interval failed ", KR(ret), K(*item)); + } else if (!valid) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("config invalid", KR(ret), K(*item)); + } } } return ret; diff --git a/src/rootserver/ob_root_service.h b/src/rootserver/ob_root_service.h index 7e4037a020..ac356ad226 100644 --- a/src/rootserver/ob_root_service.h +++ b/src/rootserver/ob_root_service.h @@ -721,6 +721,7 @@ public: int submit_max_availability_mode_task(const common::ObProtectionLevel level, const int64_t cluster_version); int submit_ddl_single_replica_build_task(share::ObAsyncTask &task); + int check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid); // may modify arg before taking effect int set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg); // arg is readonly after take effect diff --git a/src/share/config/ob_server_config.h b/src/share/config/ob_server_config.h index 3708537ba4..23665e9db3 100644 --- a/src/share/config/ob_server_config.h +++ b/src/share/config/ob_server_config.h @@ -53,6 +53,7 @@ const char* const CLUSTER_NAME = "cluster"; const char* const FREEZE_TRIGGER_PERCENTAGE = "freeze_trigger_percentage"; const char* const WRITING_THROTTLEIUNG_TRIGGER_PERCENTAGE = "writing_throttling_trigger_percentage"; const char* const COMPATIBLE = "compatible"; +const char* const WEAK_READ_VERSION_REFRESH_INTERVAL = "weak_read_version_refresh_interval"; class ObServerMemoryConfig; class ObServerConfig : public ObCommonConfig diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 9daa01d2ae..c0c318fb3d 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -636,8 +636,14 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session, if (cl == ObConsistencyLevel::WEAK || cl == ObConsistencyLevel::FROZEN) { SCN snapshot_version = SCN::min_scn(); if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(), - snapshot_version))) { + snapshot_version))) { TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs)); + int64_t stale_time = session->get_ob_max_read_stale_time(); + int64_t refresh_interval = GCONF.weak_read_version_refresh_interval; + if (refresh_interval > stale_time) { + TRANS_LOG(WARN, "weak_read_version_refresh_interval is larger than ob_max_read_stale_time ", + K(refresh_interval), K(stale_time), KPC(txs)); + } } else { snapshot.init_weak_read(snapshot_version); } diff --git a/src/storage/tx/wrs/ob_black_list.cpp b/src/storage/tx/wrs/ob_black_list.cpp index 5f46cb1f84..39abe79b1c 100644 --- a/src/storage/tx/wrs/ob_black_list.cpp +++ b/src/storage/tx/wrs/ob_black_list.cpp @@ -243,17 +243,19 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result) if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) { TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result)); } else if (LEADER == ls_info.ls_state_) { - // 该日志流是leader,不能加入黑名单 + // cannot add leader into blacklist + } else if (ls_info.weak_read_scn_ == 0 && ls_info.migrate_status_ == OB_MIGRATE_STATUS_NONE) { + // log stream is initializing, should't be put into blacklist } else { max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id()); int64_t max_stale_time_ns = max_stale_time * 1000; if (curr_time_ns > ls_info.weak_read_scn_ + max_stale_time_ns) { - // 时间戳落后,将对应日志流加入黑名单 + // scn is out-of-time,add this log stream into blacklist if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) { TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info)); } } else if (curr_time_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) { - // 时间戳赶上,将对应日志流从黑名单中移除 + // scn is new enough,remove this log stream in the blacklist ls_bl_mgr_.remove(bl_key); } else { // do nothing