Speed up creating a standby tenant when the tenant is not created on the RS server

This commit is contained in:
obdev
2023-05-18 12:11:35 +00:00
committed by ob-robot
parent d6593b3fea
commit 96cead500e
9 changed files with 209 additions and 69 deletions

View File

@ -14,11 +14,16 @@
#include "ob_create_standby_from_net_actor.h"
#include "share/schema/ob_multi_version_schema_service.h" // ObMultiVersionSchemaService
#include "observer/ob_server_struct.h" // GCTX
#include "observer/ob_service.h" // ObService
#include "share/rc/ob_tenant_base.h" // MTL_ID
#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
#include "share/ob_rpc_struct.h" // ObCreateTenantEndArg
#include "rootserver/restore/ob_restore_scheduler.h" //reset_schema_status
#include "rootserver/ob_rs_async_rpc_proxy.h" // ObSwitchSchemaProxy
#include "share/ob_common_rpc_proxy.h" // create_tenant_end
#include "share/ob_schema_status_proxy.h"//ObSchemaStatusProxy
#include "share/ob_rpc_struct.h" // ObBroadcastSchemaArg
#include "share/schema/ob_multi_version_schema_service.h" // for GSCHEMASERVICE
#define STAT(level, fmt, args...) RS_LOG(level, "[NET_STANDBY_TNT_SERVICE] " fmt, ##args)
#define ISTAT(fmt, args...) STAT(INFO, fmt, ##args)
@ -63,6 +68,7 @@ int ObCreateStandbyFromNetActor::init()
WSTAT("failed to start NET_STANDBY_TNT_SERVICE", KR(ret));
} else {
tenant_id_ = MTL_ID();
schema_broadcasted_ = false;
is_inited_ = true;
}
@ -95,6 +101,7 @@ void ObCreateStandbyFromNetActor::destroy()
is_inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
sql_proxy_ = NULL;
schema_broadcasted_ = false;
}
int ObCreateStandbyFromNetActor::check_inner_stat_()
@ -137,6 +144,8 @@ int ObCreateStandbyFromNetActor::check_has_user_ls(const uint64_t tenant_id, com
int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLSRecoveryStat recovery_stat;
ObLSRecoveryStatOperator ls_recovery_operator;
rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
obrpc::ObCreateTenantEndArg arg;
@ -148,7 +157,7 @@ int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
if (OB_ISNULL(rs_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
WSTAT("rs_rpc_proxy is null", KP(rs_rpc_proxy));
WSTAT("pointer is null", KP(rs_rpc_proxy));
} else if (OB_FAIL(check_inner_stat_())) {
WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
} else if (OB_FAIL(ls_recovery_operator.get_tenant_min_user_ls_create_scn(tenant_id_, *sql_proxy_,
@ -159,19 +168,38 @@ int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
WSTAT("tenant info loader should not be NULL", KR(ret), KP(tenant_info_loader));
} else {
ISTAT("start to wait whether can finish restore", K_(tenant_id), K(min_user_ls_create_scn));
int64_t retry_cnt_after_sync_user_ls = 0;
// wait 1 minute, sleep 1s and retry 60 times
for (int64_t retry_cnt = 60; OB_SUCC(ret) && retry_cnt > 0 && !has_set_stop(); --retry_cnt) {
if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
bool is_dropped = false;
if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id_, is_dropped))) {
LOG_WARN("tenant has been dropped", KR(ret), K_(tenant_id));
} else if (is_dropped) {
ret = OB_TENANT_HAS_BEEN_DROPPED;
LOG_WARN("tenant has been dropped", KR(ret), K_(tenant_id));
} else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
WSTAT("failed to get tenant info", KR(ret));
} else if (tenant_info.get_standby_scn() >= min_user_ls_create_scn) {
ISTAT("tenant readable scn can read inner table", K(tenant_info), K(min_user_ls_create_scn));
retry_cnt_after_sync_user_ls++;
ISTAT("tenant readable scn can read inner table", K(tenant_info), K(min_user_ls_create_scn),
K(retry_cnt_after_sync_user_ls));
if (OB_FAIL(ObRestoreService::reset_schema_status(tenant_id_, sql_proxy_))) {
WSTAT("failed to reset schema status", KR(ret), K_(tenant_id), K(tenant_info), K(min_user_ls_create_scn));
bool is_refreshed = false;
if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_schema_has_been_refreshed(tenant_id_, is_refreshed))) {
LOG_WARN("fail to check tenant schema has been refreshed", KR(ret), K_(tenant_id), K(retry_cnt_after_sync_user_ls));
} else if (!is_refreshed || !schema_broadcasted_) {
if (50 < retry_cnt_after_sync_user_ls) {
WSTAT("schema has not refreshed", KR(ret), KR(tmp_ret), K(is_refreshed),
K_(schema_broadcasted), K(retry_cnt_after_sync_user_ls));
}
if (OB_TMP_FAIL(refresh_schema_())) {
WSTAT("failed to refresh schema", KR(ret), KR(tmp_ret), K(is_refreshed), K_(schema_broadcasted),
K(retry_cnt_after_sync_user_ls));
}
} else if (OB_FAIL(rs_rpc_proxy->create_tenant_end(arg))) {
WSTAT("fail to execute create tenant end", KR(ret), K_(tenant_id), K(arg));
WSTAT("fail to execute create tenant end", KR(ret), K_(tenant_id), K(arg), K(retry_cnt_after_sync_user_ls));
} else {
ISTAT("execute create_tenant_end", KR(ret), K_(tenant_id), K(arg));
ISTAT("execute create_tenant_end", KR(ret), K_(tenant_id), K(arg), K(retry_cnt_after_sync_user_ls));
break;
}
}
@ -184,6 +212,44 @@ int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
return ret;
}
int ObCreateStandbyFromNetActor::refresh_schema_()
{
int ret = OB_SUCCESS;
observer::ObService *ob_service = GCTX.ob_service_;
int tmp_ret = OB_SUCCESS;
ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_;
ObRefreshSchemaStatus refresh_schema_status;
if (OB_ISNULL(ob_service) || OB_ISNULL(schema_status_proxy)) {
ret = OB_ERR_UNEXPECTED;
WSTAT("pointer is null", KP(ob_service), KP(schema_status_proxy));
} else if (OB_FAIL(check_inner_stat_())) {
WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
} else if (OB_FAIL(schema_status_proxy->get_refresh_schema_status(tenant_id_, refresh_schema_status))) {
LOG_WARN("fail to get refresh schema status", KR(ret), K_(tenant_id));
} else if (refresh_schema_status.snapshot_timestamp_ == 0) {
if (OB_FAIL(ObRestoreService::reset_schema_status(tenant_id_, sql_proxy_))) {
WSTAT("failed to reset schema status", KR(ret), K_(tenant_id));
}
}
if (OB_SUCC(ret) && !schema_broadcasted_) {
obrpc::ObBroadcastSchemaArg arg;
arg.tenant_id_ = tenant_id_;
if (OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.rs_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pointer is null", KR(ret), KP(GCTX.rs_mgr_), KP(GCTX.rs_rpc_proxy_));
} else if (OB_FAIL(GCTX.rs_rpc_proxy_->to_rs(*GCTX.rs_mgr_).broadcast_schema(arg))) {
LOG_WARN("failed to broadcast schema", KR(ret), K(arg));
} else {
schema_broadcasted_ = true;
}
}
ISTAT("refresh_schema finished", KR(ret), K_(tenant_id), K_(schema_broadcasted), K(refresh_schema_status));
return ret;
}
int ObCreateStandbyFromNetActor::do_creating_standby_tenant()
{
int ret = OB_SUCCESS;

View File

@ -35,7 +35,7 @@ class ObCreateStandbyFromNetActor : public ObTenantThreadHelper
{
public:
ObCreateStandbyFromNetActor() : is_inited_(false),
tenant_id_(OB_INVALID_TENANT_ID), sql_proxy_(NULL), idle_time_(DEFAULT_IDLE_TIME) {}
tenant_id_(OB_INVALID_TENANT_ID), sql_proxy_(NULL), schema_broadcasted_(false), idle_time_(DEFAULT_IDLE_TIME) {}
virtual ~ObCreateStandbyFromNetActor() {}
int init();
void destroy();
@ -50,17 +50,19 @@ private:
int finish_restore_if_possible_();
int64_t get_idle_interval_us_() { return ATOMIC_LOAD(&idle_time_); }
int set_idle_interval_us_(const int64_t idle_time);
int refresh_schema_();
const static int64_t DEFAULT_IDLE_TIME = 1000 * 1000; // 1s
const static int64_t MAX_IDLE_TIME = 3600L * 1000 * 1000; // 3600s
public:
TO_STRING_KV(K_(is_inited), K_(tenant_id), KP_(sql_proxy), K_(idle_time));
TO_STRING_KV(K_(is_inited), K_(tenant_id), KP_(sql_proxy), K_(schema_broadcasted), K_(idle_time));
private:
bool is_inited_;
uint64_t tenant_id_;
common::ObMySQLProxy *sql_proxy_;
bool schema_broadcasted_;
int64_t idle_time_;
};

View File

@ -21558,8 +21558,13 @@ int ObDDLService::create_tenant_end(const uint64_t tenant_id)
ObDDLSQLTransaction trans(schema_service_, true, false, false, false);
DEBUG_SYNC(BEFORE_CREATE_TENANT_END);
ObTenantSchema new_tenant_schema;
ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_;
ObRefreshSchemaStatus schema_status;
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("variable is not init", KR(ret));
} else if (OB_ISNULL(schema_status_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid schema status proxy", KR(ret));
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(
tenant_id, sql_proxy_, false, tenant_info))) {
LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id));
@ -21569,6 +21574,14 @@ int ObDDLService::create_tenant_end(const uint64_t tenant_id)
} else if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service is null", K(ret), KP_(schema_service));
/*
After the network standby tenant has synchronized its inner tables, the schema refresh switch
is turned on. However, the standby tenant may not be on the same observer as RS, so RS may use
a stale cache during create tenant end, which may cause create tenant end to fail.
So here we force a reload of the refresh schema status to refresh the cache.
*/
} else if (OB_FAIL(schema_status_proxy->load_refresh_schema_status(tenant_id, schema_status))) {
LOG_WARN("fail to load refresh schema status", KR(ret), K(tenant_id));
} else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(OB_SYS_TENANT_ID, schema_guard))) {
LOG_WARN("fail to get schema guard with version in inner table", K(ret));
} else if (OB_FAIL(get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) {

View File

@ -466,6 +466,9 @@ int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id,
sql_proxy, false /* for_update */, ora_rowscn, new_tenant_info))) {
LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id));
} else if (INT64_MAX == ora_rowscn || 0 == ora_rowscn) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid ora_rowscn", KR(ret), K(ora_rowscn), K(tenant_id), K(new_tenant_info), K(lbt()));
} else {
/**
* Only need to refer to tenant role, no need to refer to switchover status.
@ -487,7 +490,7 @@ int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
last_sql_update_time_ = new_refresh_time_us;
} else {
ret = OB_EAGAIN;
LOG_WARN("refresh tenant info conflict", K(new_tenant_info), K(new_refresh_time_us),
LOG_WARN("refresh tenant info conflict", KR(ret), K(new_tenant_info), K(new_refresh_time_us),
K(tenant_id), K(tenant_info_), K(last_sql_update_time_), K(ora_rowscn_), K(ora_rowscn));
}
}
@ -507,7 +510,7 @@ int ObAllTenantInfoCache::update_tenant_info_cache(
{
int ret = OB_SUCCESS;
refreshed = false;
if (!new_tenant_info.is_valid() || 0 == new_ora_rowscn) {
if (!new_tenant_info.is_valid() || 0 == new_ora_rowscn || INT64_MAX == new_ora_rowscn) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(new_tenant_info), K(new_ora_rowscn));
} else {

View File

@ -195,7 +195,7 @@ int ObBackupConfigParserGenerator::set_restore_source_type_(const common::ObSqlS
} else if ((is_location && is_service) || (!is_location && !is_service)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to parse restore source type", K(ret), K(value));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "parse log restore source type");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log restore source type");
} else if (is_location) {
restore_source_type_ = share::ObLogRestoreSourceType::LOCATION;
} else if (is_service) {

View File

@ -112,7 +112,6 @@ int ObSchemaStatusProxy::get_refresh_schema_status(
ObRefreshSchemaStatus &refresh_schema_status)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
refresh_schema_status.reset();
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("check inner stat failed", K(ret));
@ -122,12 +121,32 @@ int ObSchemaStatusProxy::get_refresh_schema_status(
} else if (OB_FAIL(schema_status_cache_.get_refactored(refresh_tenant_id, refresh_schema_status))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to get schema_status from cache", K(ret), K(refresh_tenant_id));
} else if (OB_FAIL(trans.start(&sql_proxy_, OB_SYS_TENANT_ID))) { //overwrite ret
LOG_WARN("fail to start", K(ret));
} else if (OB_FAIL(load_refresh_schema_status(refresh_tenant_id, refresh_schema_status))) { //overwrite ret
LOG_WARN("fail to load_refresh_schema_status", KR(ret), K(refresh_tenant_id));
}
}
return ret;
}
int ObSchemaStatusProxy::load_refresh_schema_status(
const uint64_t refresh_tenant_id,
ObRefreshSchemaStatus &refresh_schema_status)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
refresh_schema_status.reset();
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("check inner stat failed", KR(ret));
} else if (OB_INVALID_TENANT_ID == refresh_tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(refresh_tenant_id));
} else if (OB_FAIL(trans.start(&sql_proxy_, OB_SYS_TENANT_ID))) {
LOG_WARN("fail to start trans", KR(ret));
} else {
ObCoreTableProxy core_table(OB_ALL_SCHEMA_STATUS_TNAME, trans, OB_SYS_TENANT_ID);
if (OB_FAIL(core_table.load())) {
LOG_WARN("fail to load core table", K(ret));
LOG_WARN("fail to load core table", KR(ret));
} else {
uint64_t tenant_id = OB_INVALID_TENANT_ID;
int64_t snapshot_timestamp = OB_INVALID_TIMESTAMP;
@ -139,14 +158,14 @@ int ObSchemaStatusProxy::get_refresh_schema_status(
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("fail to next", K(ret));
LOG_WARN("fail to next", KR(ret));
}
} else if (OB_FAIL(core_table.get_uint(TENANT_ID_CNAME, tenant_id))) {
LOG_WARN("fail to get int", K(ret));
LOG_WARN("fail to get int", KR(ret));
} else if (OB_FAIL(core_table.get_int(SNAPSHOT_TIMESTAMP_CNAME, snapshot_timestamp))) {
LOG_WARN("fail to get int", K(ret));
LOG_WARN("fail to get int", KR(ret));
} else if (OB_FAIL(core_table.get_int(READABLE_SCHEMA_VERSION_CNAME, readable_schema_version))) {
LOG_WARN("fail to get int", K(ret));
LOG_WARN("fail to get int", KR(ret));
}
if (OB_FAIL(ret)) {
} else if (refresh_tenant_id == tenant_id) {
@ -160,18 +179,18 @@ int ObSchemaStatusProxy::get_refresh_schema_status(
if (OB_SUCC(ret)) {
if (!exist) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("tenant refresh schema status not exist", K(ret), K(refresh_tenant_id));
LOG_WARN("tenant refresh schema status not exist", KR(ret), K(refresh_tenant_id));
} else {
ObSchemaStatusUpdater updater(refresh_schema_status);
if (OB_FAIL(schema_status_cache_.atomic_refactored(refresh_tenant_id, updater))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to update schema_status", K(ret), K(refresh_tenant_id), K(refresh_schema_status));
LOG_WARN("fail to update schema_status", KR(ret), K(refresh_tenant_id), K(refresh_schema_status));
} else if (OB_FAIL(schema_status_cache_.set_refactored(refresh_tenant_id, refresh_schema_status))) {
if (OB_HASH_EXIST == ret) {
LOG_WARN("concurrent set, just ignore", K(ret), K(refresh_tenant_id), K(refresh_schema_status));
LOG_WARN("concurrent set, just ignore", KR(ret), K(refresh_tenant_id), K(refresh_schema_status));
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to set schema_status", K(ret), K(refresh_tenant_id), K(refresh_schema_status));
LOG_WARN("fail to set schema_status", KR(ret), K(refresh_tenant_id), K(refresh_schema_status));
}
}
}
@ -179,17 +198,17 @@ int ObSchemaStatusProxy::get_refresh_schema_status(
}
}
}
if (trans.is_started()) {
bool is_commit = (OB_SUCCESS == ret);
int tmp_ret = trans.end(is_commit);
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("fail to commit transaction", K(tmp_ret), K(ret), K(is_commit));
LOG_WARN("fail to commit transaction", KR(tmp_ret), KR(ret), K(is_commit));
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
}
}
}
return ret;
}

View File

@ -86,6 +86,9 @@ public:
const share::schema::ObRefreshSchemaStatus &refresh_schema_status);
int update_schema_status(const share::schema::ObRefreshSchemaStatus &cur_schema_status);
int load_refresh_schema_status(
const uint64_t refresh_tenant_id,
schema::ObRefreshSchemaStatus &refresh_schema_status);
private:
int check_inner_stat();

View File

@ -1998,6 +1998,37 @@ int ObMultiVersionSchemaService::check_if_tenant_has_been_dropped(
return ret;
}
/**
 * Report whether the schema of `tenant_id` is full according to
 * refresh_full_schema_map_.
 *
 * The tenant counts as refreshed only when both the sys tenant's entry and the
 * tenant's own entry in refresh_full_schema_map_ say the schema is no longer
 * "not full".
 *
 * @param tenant_id     tenant to check; must not be OB_INVALID_TENANT_ID
 * @param is_refreshed  [out] set to false on entry; true only when both checks pass
 * @return OB_SUCCESS on success; OB_INNER_STAT_ERROR when check_inner_stat() fails;
 *         OB_INVALID_ARGUMENT for an invalid tenant id; otherwise the error code
 *         returned by the map lookup.
 */
int ObMultiVersionSchemaService::check_if_tenant_schema_has_been_refreshed(
    const uint64_t tenant_id,
    bool &is_refreshed)
{
  int ret = OB_SUCCESS;
  is_refreshed = false;
  bool sys_schema_not_full = false;
  bool tenant_schema_not_full = false;
  if (!check_inner_stat()) {
    ret = OB_INNER_STAT_ERROR;
    LOG_WARN("inner stat error", KR(ret));
  } else if (OB_INVALID_TENANT_ID == tenant_id) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
  } else if (OB_FAIL(refresh_full_schema_map_.get_refactored(OB_SYS_TENANT_ID, sys_schema_not_full))) {
    // Previously silent: surface the lookup failure instead of relying on the TRACE log.
    LOG_WARN("fail to get refresh full schema flag of sys tenant", KR(ret), K(tenant_id));
  } else if (sys_schema_not_full) {
    // The observer may not have started service yet; is_refreshed stays false.
  } else if (OB_FAIL(refresh_full_schema_map_.get_refactored(tenant_id, tenant_schema_not_full))) {
    // Previously silent: surface the lookup failure instead of relying on the TRACE log.
    LOG_WARN("fail to get refresh full schema flag of tenant", KR(ret), K(tenant_id));
  } else if (tenant_schema_not_full) {
    // The tenant's schema is not full yet; is_refreshed stays false.
  } else {
    // The tenant's schema is full.
    is_refreshed = true;
  }
  LOG_TRACE("check if tenant schema has been refreshed", KR(ret), K(sys_schema_not_full), K(tenant_schema_not_full), K(is_refreshed), K(tenant_id));
  return ret;
}
int ObMultiVersionSchemaService::check_is_creating_standby_tenant(
const uint64_t tenant_id,
bool &is_creating_standby)

View File

@ -264,6 +264,9 @@ public:
int check_if_tenant_has_been_dropped(
const uint64_t tenant_id,
bool &is_dropped);
int check_if_tenant_schema_has_been_refreshed(
const uint64_t tenant_id,
bool &is_refreshed);
int check_is_creating_standby_tenant(
const uint64_t tenant_id,
bool &is_creating_standby);