diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 4b83a0cf9..38cf696b8 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -86,6 +86,7 @@ #include "observer/report/ob_tenant_meta_checker.h" #include "storage/high_availability/ob_storage_ha_service.h" #include "rootserver/ob_tenant_recovery_reportor.h"//ObTenantRecoveryReportor +#include "rootserver/ob_standby_schema_refresh_trigger.h"//ObStandbySchemaRefreshTrigger #include "rootserver/ob_tenant_info_loader.h"//ObTenantInfoLoader #include "rootserver/ob_primary_ls_service.h"//ObLSService #include "rootserver/ob_recovery_ls_service.h"//ObRecoveryLSService @@ -401,6 +402,7 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, rootserver::ObRestoreMajorFreezeService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObTenantMetaChecker::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObTenantRecoveryReportor::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObStandbySchemaRefreshTrigger::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoLoader::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, rootserver::ObPrimaryLSService::mtl_stop, rootserver::ObPrimaryLSService::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, rootserver::ObRecoveryLSService::mtl_stop, rootserver::ObRecoveryLSService::mtl_wait, mtl_destroy_default); diff --git a/src/rootserver/CMakeLists.txt b/src/rootserver/CMakeLists.txt index a1138982a..2d7fc13db 100644 --- a/src/rootserver/CMakeLists.txt +++ b/src/rootserver/CMakeLists.txt @@ -67,6 +67,7 @@ ob_set_subtarget(ob_rootserver common ob_zone_manager.cpp ob_zone_unit_provider.cpp ob_tenant_recovery_reportor.cpp + ob_standby_schema_refresh_trigger.cpp ob_tenant_info_loader.cpp ob_primary_ls_service.cpp ob_recovery_ls_service.cpp diff --git a/src/rootserver/ob_standby_schema_refresh_trigger.cpp b/src/rootserver/ob_standby_schema_refresh_trigger.cpp new file mode 100644 index 000000000..6c815c415 --- /dev/null +++ b/src/rootserver/ob_standby_schema_refresh_trigger.cpp @@ -0,0 +1,146 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS +#include "ob_standby_schema_refresh_trigger.h" +#include "observer/ob_server_struct.h" // GCTX +#include "observer/ob_service.h" // ObService +#include "share/rc/ob_tenant_base.h" // MTL +#include "share/ob_schema_status_proxy.h" // ObSchemaStatusProxy +#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader + +#define STAT(level, fmt, args...) RS_LOG(level, "[STANDBY_SCHEMA_REFRESH_TRIGGER] " fmt, ##args) +#define ISTAT(fmt, args...) STAT(INFO, fmt, ##args) +#define WSTAT(fmt, args...) STAT(WARN, fmt, ##args) +#define TSTAT(fmt, args...) STAT(TRACE, fmt, ##args) + +namespace oceanbase +{ +using namespace share; +using namespace common; +namespace rootserver +{ +int ObStandbySchemaRefreshTrigger::init() +{ + int ret = OB_SUCCESS; + sql_proxy_ = GCTX.sql_proxy_; + + if (IS_INIT) { + ret = OB_INIT_TWICE; + WSTAT("init twice", KR(ret)); + } else if (OB_ISNULL(sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + WSTAT("sql proxy is null", KR(ret)); + } else if (OB_FAIL(ObTenantThreadHelper::create("StandbySchem", + lib::TGDefIDs::SimpleLSService, *this))) { + WSTAT("failed to create STANDBY_SCHEMA_REFRESH_TRIGGER", KR(ret)); + } else if (OB_FAIL(ObTenantThreadHelper::start())) { + WSTAT("failed to start STANDBY_SCHEMA_REFRESH_TRIGGER", KR(ret)); + } else { + tenant_id_ = MTL_ID(); + is_inited_ = true; + } + + return ret; +} + +void ObStandbySchemaRefreshTrigger::do_work() +{ + ISTAT("standby schema refresh trigger start"); + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + WSTAT("inner stat error", KR(ret), K_(is_inited)); + } else { + while (!has_set_stop()) { + ObCurTraceId::init(GCONF.self_addr_); + if (!is_user_tenant(tenant_id_)) { + } else if (OB_FAIL(submit_tenant_refresh_schema_task_())) { + WSTAT("submit_tenant_refresh_schema_task_ failed", KR(ret)); + } + + ISTAT("finish one round", KR(ret)); + idle(DEFAULT_IDLE_TIME); + } + } +} + +void ObStandbySchemaRefreshTrigger::destroy() +{ + ISTAT("standby schema refresh trigger destory", KPC(this)); + ObTenantThreadHelper::destroy(); + tenant_id_ = OB_INVALID_TENANT_ID; + sql_proxy_ = NULL; + is_inited_ = false; +} + +int ObStandbySchemaRefreshTrigger::check_inner_stat_() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + WSTAT("not init", KR(ret)); + } else if (OB_ISNULL(sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + WSTAT("Member variables is NULL", KR(ret), KP(sql_proxy_)); + } + return ret; +} + +int ObStandbySchemaRefreshTrigger::submit_tenant_refresh_schema_task_() +{ + int ret = OB_SUCCESS; + ObAllTenantInfo tenant_info; + rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*); + + if (OB_FAIL(check_inner_stat_())) { + WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_)); + } else if (!is_user_tenant(tenant_id_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("only run for user tenant", KR(ret), K_(tenant_id)); + } else if (OB_ISNULL(GCTX.ob_service_) || OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(sql_proxy_) || OB_ISNULL(tenant_info_loader)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("pointer is null", KR(ret), KP(GCTX.ob_service_), KP(GCTX.schema_service_), KP(sql_proxy_), KP(tenant_info_loader)); + } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) { + LOG_WARN("fail to get tenant info", KR(ret), K_(tenant_id)); + } else if (tenant_info.is_standby() && tenant_info.is_normal_status()) { + ObRefreshSchemaStatus schema_status; + ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_; + if (OB_ISNULL(schema_status_proxy)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema_status_proxy is null", KR(ret)); + } else if (OB_FAIL(schema_status_proxy->get_refresh_schema_status(tenant_id_, schema_status))) { + LOG_WARN("fail to get schema status", KR(ret), K(tenant_id_)); + } else if (common::OB_INVALID_TIMESTAMP == schema_status.snapshot_timestamp_) { + int64_t version_in_inner_table = OB_INVALID_VERSION; + int64_t local_schema_version = OB_INVALID_VERSION; + if (OB_FAIL(GCTX.schema_service_->get_tenant_refreshed_schema_version( + tenant_id_, local_schema_version))) { + LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(GCTX.schema_service_->get_schema_version_in_inner_table( + *sql_proxy_, schema_status, version_in_inner_table))) { + LOG_WARN("fail to get_schema_version_in_inner_table", KR(ret), K(schema_status)); + } else if (local_schema_version > version_in_inner_table) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("local_schema_version > version_in_inner_table", KR(ret), K_(tenant_id), + K(local_schema_version), K(version_in_inner_table)); + } else if (local_schema_version == version_in_inner_table) { + // do nothing + } else if (OB_FAIL(GCTX.ob_service_->submit_async_refresh_schema_task(tenant_id_, version_in_inner_table))) { + LOG_WARN("failed to submit_async_refresh_schema_task", KR(ret), K_(tenant_id)); + } + } + } + return ret; +} + +} +} diff --git a/src/rootserver/ob_standby_schema_refresh_trigger.h b/src/rootserver/ob_standby_schema_refresh_trigger.h new file mode 100644 index 000000000..dae084fc7 --- /dev/null +++ b/src/rootserver/ob_standby_schema_refresh_trigger.h @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_ROOTSERVER_STANDBY_SCHEMA_REFRESH_TRIGGER_H +#define OCEANBASE_ROOTSERVER_STANDBY_SCHEMA_REFRESH_TRIGGER_H + +#include "lib/thread/ob_reentrant_thread.h"//ObRsReentrantThread +#include "lib/utility/ob_print_utils.h" //TO_STRING_KV +#include "share/ob_tenant_info_proxy.h"//ObAllTenantInfo +#include "rootserver/ob_primary_ls_service.h"//ObTenantThreadHelper + +namespace oceanbase { +namespace common +{ +class ObMySQLProxy; +} +namespace share +{ +class SCN; +} +namespace rootserver +{ + +class ObStandbySchemaRefreshTrigger : public ObTenantThreadHelper +{ +public: + ObStandbySchemaRefreshTrigger() : is_inited_(false), + tenant_id_(OB_INVALID_TENANT_ID), sql_proxy_(NULL) {} + virtual ~ObStandbySchemaRefreshTrigger() {} + int init(); + void destroy(); + virtual void do_work() override; + + DEFINE_MTL_FUNC(ObStandbySchemaRefreshTrigger) + +private: + int check_inner_stat_(); + int submit_tenant_refresh_schema_task_(); + const static int64_t DEFAULT_IDLE_TIME = 1000 * 1000; // 1s + +public: + TO_STRING_KV(K_(is_inited), K_(tenant_id), KP_(sql_proxy)); + +private: + bool is_inited_; + uint64_t tenant_id_; + common::ObMySQLProxy *sql_proxy_; +}; + +} // namespace rootserver +} // namespace oceanbase + +#endif /* !OCEANBASE_ROOTSERVER_STANDBY_SCHEMA_REFRESH_TRIGGER_H */ \ No newline at end of file diff --git a/src/rootserver/ob_tenant_info_loader.cpp b/src/rootserver/ob_tenant_info_loader.cpp index 8f9b858c3..f2207049a 100644 --- a/src/rootserver/ob_tenant_info_loader.cpp +++ b/src/rootserver/ob_tenant_info_loader.cpp @@ -153,7 +153,7 @@ int ObTenantInfoLoader::get_valid_sts_after(const int64_t specified_time_us, sha } else { LOG_WARN("failed to get tenant info", KR(ret)); } - } else if (refresh_time_us < specified_time_us) { + } else if (refresh_time_us <= specified_time_us) { ret = OB_NEED_WAIT; LOG_TRACE("tenant info cache is old, need wait", KR(ret), K(refresh_time_us), K(specified_time_us), K(tenant_info)); wakeup(); @@ -206,7 +206,7 @@ int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id, common:: { int ret = OB_SUCCESS; ObAllTenantInfo new_tenant_info; - const int64_t new_refresh_time_us = ObTimeUtility::current_time(); + const int64_t new_refresh_time_us = ObClockGenerator::getCurrentTime(); if (OB_ISNULL(sql_proxy) || !is_user_tenant(tenant_id)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), KP(sql_proxy)); diff --git a/src/rootserver/ob_tenant_recovery_reportor.cpp b/src/rootserver/ob_tenant_recovery_reportor.cpp index de0f52696..9f0d26b14 100644 --- a/src/rootserver/ob_tenant_recovery_reportor.cpp +++ b/src/rootserver/ob_tenant_recovery_reportor.cpp @@ -141,10 +141,6 @@ void ObTenantRecoveryReportor::run2() } } - if (OB_SUCCESS != (tmp_ret = submit_tenant_refresh_schema_task_())) { - LOG_WARN("failed to submit_tenant_refresh_schema_task_", KR(tmp_ret)); - } - //更新受控回放位点到replayservice if (OB_SUCCESS != (tmp_ret = update_replayable_point_())) { LOG_WARN("failed to update_replayable_point", KR(tmp_ret)); @@ -156,50 +152,6 @@ void ObTenantRecoveryReportor::run2() } } -int ObTenantRecoveryReportor::submit_tenant_refresh_schema_task_() -{ - int ret = OB_SUCCESS; - ObAllTenantInfo tenant_info; - rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*); - - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KR(ret)); - } else if (OB_ISNULL(GCTX.ob_service_) || OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(sql_proxy_) || OB_ISNULL(tenant_info_loader)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("pointer is null", KR(ret), KP(GCTX.ob_service_), KP(GCTX.schema_service_), KP(sql_proxy_), KP(tenant_info_loader)); - } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) { - LOG_WARN("fail to get tenant info", KR(ret), K_(tenant_id)); - } else if (tenant_info.is_standby() && tenant_info.is_normal_status()) { - ObRefreshSchemaStatus schema_status; - ObSchemaStatusProxy *schema_status_proxy = GCTX.schema_status_proxy_; - if (OB_ISNULL(schema_status_proxy)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("schema_status_proxy is null", KR(ret)); - } else if (OB_FAIL(schema_status_proxy->get_refresh_schema_status(tenant_id_, schema_status))) { - LOG_WARN("fail to get schema status", KR(ret), K(tenant_id_)); - } else if (common::OB_INVALID_TIMESTAMP == schema_status.snapshot_timestamp_) { - int64_t version_in_inner_table = OB_INVALID_VERSION; - int64_t local_schema_version = OB_INVALID_VERSION; - if (OB_FAIL(GCTX.schema_service_->get_tenant_refreshed_schema_version( - tenant_id_, local_schema_version))) { - LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(GCTX.schema_service_->get_schema_version_in_inner_table( - *sql_proxy_, schema_status, version_in_inner_table))) { - LOG_WARN("fail to get_schema_version_in_inner_table", KR(ret), K(schema_status)); - } else if (local_schema_version > version_in_inner_table) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("local_schema_version > version_in_inner_table", KR(ret), K_(tenant_id), - K(local_schema_version), K(version_in_inner_table)); - } else if (local_schema_version == version_in_inner_table) { - // do nothing - } else if (OB_FAIL(GCTX.ob_service_->submit_async_refresh_schema_task(tenant_id_, version_in_inner_table))) { - LOG_WARN("failed to submit_async_refresh_schema_task", KR(ret), K_(tenant_id)); - } - } - } - return ret; -} int ObTenantRecoveryReportor::update_ls_recovery_stat_() { int ret = OB_SUCCESS; diff --git a/src/share/rc/ob_tenant_base.h b/src/share/rc/ob_tenant_base.h index 968b70c04..5649830cc 100644 --- a/src/share/rc/ob_tenant_base.h +++ b/src/share/rc/ob_tenant_base.h @@ -129,6 +129,7 @@ namespace rootserver class ObRestoreService; class ObRecoveryLSService; class ObArbitrationService; + class ObStandbySchemaRefreshTrigger; } namespace observer { @@ -198,7 +199,8 @@ using ObPartTransCtxObjPool = common::ObServerObjectPoolget_palf_role(share::GTS_LS, role, tmp_epoch))) { TRANS_LOG(WARN, "get ObStandbyTimestampService role fail", KR(ret)); } else { - ATOMIC_STORE(&switch_to_leader_ts_, ObTimeUtility::current_time()); + ATOMIC_STORE(&switch_to_leader_ts_, ObClockGenerator::getCurrentTime()); epoch_ = tmp_epoch; int64_t type = MTL(ObTimestampAccess *)->get_service_type(); if (ObTimestampAccess::ServiceType::FOLLOWER == type) {