check ob_max_read_stale_time when set weak_read_version_refresh_interval & blacklist logic optimization
This commit is contained in:
@ -8992,6 +8992,59 @@ int ObRootService::update_stat_cache(const obrpc::ObUpdateStatCacheArg &arg)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObRootService::check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObSchemaGetterGuard sys_schema_guard;
|
||||||
|
ObArray<uint64_t> tenant_ids;
|
||||||
|
valid = true;
|
||||||
|
|
||||||
|
if (OB_ISNULL(GCTX.schema_service_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("schema service is null", KR(ret));
|
||||||
|
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, sys_schema_guard))) {
|
||||||
|
LOG_WARN("get sys schema guard failed", KR(ret));
|
||||||
|
} else if (OB_FAIL(sys_schema_guard.get_tenant_ids(tenant_ids))) {
|
||||||
|
LOG_WARN("get tenant ids failed", KR(ret));
|
||||||
|
} else {
|
||||||
|
ObSchemaGetterGuard schema_guard;
|
||||||
|
const ObSimpleTenantSchema *tenant_schema = NULL;
|
||||||
|
const ObSysVarSchema *var_schema = NULL;
|
||||||
|
ObObj obj;
|
||||||
|
int64_t session_max_stale_time = 0;
|
||||||
|
uint64_t tenant_id = OB_INVALID_TENANT_ID;
|
||||||
|
for (int64_t i = 0; OB_SUCC(ret) && valid && i < tenant_ids.count(); i++) {
|
||||||
|
tenant_id = tenant_ids[i];
|
||||||
|
if (OB_FAIL(sys_schema_guard.get_tenant_info(tenant_id, tenant_schema))) {
|
||||||
|
LOG_WARN("fail to get tenant schema", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_ISNULL(tenant_schema)) {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
LOG_WARN("tenant schema is null, skip and continue", KR(ret), K(tenant_id));
|
||||||
|
} else if (!tenant_schema->is_normal()) {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
LOG_WARN("tenant schema is not normal, skip and continue", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||||
|
LOG_WARN("get schema guard failed", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(schema_guard.get_tenant_system_variable(tenant_id,
|
||||||
|
OB_SV_MAX_READ_STALE_TIME, var_schema))) {
|
||||||
|
LOG_WARN("get tenant system variable failed", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_ISNULL(var_schema)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("var schema is null", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(var_schema->get_value(NULL, NULL, obj))) {
|
||||||
|
LOG_WARN("get value failed", KR(ret), K(tenant_id), K(obj));
|
||||||
|
} else if (OB_FAIL(obj.get_int(session_max_stale_time))) {
|
||||||
|
LOG_WARN("get int failed", KR(ret), K(tenant_id), K(obj));
|
||||||
|
} else if (refresh_interval > session_max_stale_time) {
|
||||||
|
valid = false;
|
||||||
|
LOG_USER_ERROR(OB_INVALID_ARGUMENT,
|
||||||
|
"weak_read_version_refresh_interval is larger than ob_max_read_stale_time");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObRootService::set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg)
|
int ObRootService::set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -9022,6 +9075,14 @@ int ObRootService::set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg)
|
|||||||
LOG_WARN("config invalid", "item", *item, K(ret), K(i), K(item->tenant_ids_.at(i)));
|
LOG_WARN("config invalid", "item", *item, K(ret), K(i), K(item->tenant_ids_.at(i)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (0 == STRCMP(item->name_.ptr(), WEAK_READ_VERSION_REFRESH_INTERVAL)) {
|
||||||
|
int64_t refresh_interval = ObConfigTimeParser::get(item->value_.ptr(), valid);
|
||||||
|
if (valid && OB_FAIL(check_weak_read_version_refresh_interval(refresh_interval, valid))) {
|
||||||
|
LOG_WARN("check refresh interval failed ", KR(ret), K(*item));
|
||||||
|
} else if (!valid) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("config invalid", KR(ret), K(*item));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -721,6 +721,7 @@ public:
|
|||||||
int submit_max_availability_mode_task(const common::ObProtectionLevel level, const int64_t cluster_version);
|
int submit_max_availability_mode_task(const common::ObProtectionLevel level, const int64_t cluster_version);
|
||||||
|
|
||||||
int submit_ddl_single_replica_build_task(share::ObAsyncTask &task);
|
int submit_ddl_single_replica_build_task(share::ObAsyncTask &task);
|
||||||
|
int check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid);
|
||||||
// may modify arg before taking effect
|
// may modify arg before taking effect
|
||||||
int set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg);
|
int set_config_pre_hook(obrpc::ObAdminSetConfigArg &arg);
|
||||||
// arg is readonly after take effect
|
// arg is readonly after take effect
|
||||||
|
|||||||
@ -53,6 +53,7 @@ const char* const CLUSTER_NAME = "cluster";
|
|||||||
const char* const FREEZE_TRIGGER_PERCENTAGE = "freeze_trigger_percentage";
|
const char* const FREEZE_TRIGGER_PERCENTAGE = "freeze_trigger_percentage";
|
||||||
const char* const WRITING_THROTTLEIUNG_TRIGGER_PERCENTAGE = "writing_throttling_trigger_percentage";
|
const char* const WRITING_THROTTLEIUNG_TRIGGER_PERCENTAGE = "writing_throttling_trigger_percentage";
|
||||||
const char* const COMPATIBLE = "compatible";
|
const char* const COMPATIBLE = "compatible";
|
||||||
|
const char* const WEAK_READ_VERSION_REFRESH_INTERVAL = "weak_read_version_refresh_interval";
|
||||||
class ObServerMemoryConfig;
|
class ObServerMemoryConfig;
|
||||||
|
|
||||||
class ObServerConfig : public ObCommonConfig
|
class ObServerConfig : public ObCommonConfig
|
||||||
|
|||||||
@ -638,6 +638,12 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session,
|
|||||||
if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(),
|
if (OB_FAIL(txs->get_weak_read_snapshot_version(session->get_ob_max_read_stale_time(),
|
||||||
snapshot_version))) {
|
snapshot_version))) {
|
||||||
TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs));
|
TRANS_LOG(WARN, "get weak read snapshot fail", KPC(txs));
|
||||||
|
int64_t stale_time = session->get_ob_max_read_stale_time();
|
||||||
|
int64_t refresh_interval = GCONF.weak_read_version_refresh_interval;
|
||||||
|
if (refresh_interval > stale_time) {
|
||||||
|
TRANS_LOG(WARN, "weak_read_version_refresh_interval is larger than ob_max_read_stale_time ",
|
||||||
|
K(refresh_interval), K(stale_time), KPC(txs));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
snapshot.init_weak_read(snapshot_version);
|
snapshot.init_weak_read(snapshot_version);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -243,17 +243,19 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
|
|||||||
if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) {
|
if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) {
|
||||||
TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result));
|
TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result));
|
||||||
} else if (LEADER == ls_info.ls_state_) {
|
} else if (LEADER == ls_info.ls_state_) {
|
||||||
// 该日志流是leader,不能加入黑名单
|
// cannot add leader into blacklist
|
||||||
|
} else if (ls_info.weak_read_scn_ == 0 && ls_info.migrate_status_ == OB_MIGRATE_STATUS_NONE) {
|
||||||
|
// log stream is initializing, should't be put into blacklist
|
||||||
} else {
|
} else {
|
||||||
max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id());
|
max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id());
|
||||||
int64_t max_stale_time_ns = max_stale_time * 1000;
|
int64_t max_stale_time_ns = max_stale_time * 1000;
|
||||||
if (curr_time_ns > ls_info.weak_read_scn_ + max_stale_time_ns) {
|
if (curr_time_ns > ls_info.weak_read_scn_ + max_stale_time_ns) {
|
||||||
// 时间戳落后,将对应日志流加入黑名单
|
// scn is out-of-time,add this log stream into blacklist
|
||||||
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
|
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
|
||||||
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
|
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
|
||||||
}
|
}
|
||||||
} else if (curr_time_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) {
|
} else if (curr_time_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) {
|
||||||
// 时间戳赶上,将对应日志流从黑名单中移除
|
// scn is new enough,remove this log stream in the blacklist
|
||||||
ls_bl_mgr_.remove(bl_key);
|
ls_bl_mgr_.remove(bl_key);
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
|||||||
Reference in New Issue
Block a user