From 1cd6a7194102d7211dc8feda092d1981fb5fe2e5 Mon Sep 17 00:00:00 2001 From: maosy <630014370@qq.com> Date: Thu, 2 Mar 2023 18:02:26 +0000 Subject: [PATCH] Canonical tenant-level thread exit --- src/observer/omt/ob_multi_tenant.cpp | 11 +++-- src/rootserver/ob_common_ls_service.cpp | 5 -- src/rootserver/ob_common_ls_service.h | 2 +- src/rootserver/ob_primary_ls_service.cpp | 5 -- src/rootserver/ob_primary_ls_service.h | 2 +- src/rootserver/ob_recovery_ls_service.cpp | 4 -- src/rootserver/ob_recovery_ls_service.h | 2 +- src/rootserver/ob_tenant_info_report.cpp | 4 -- src/rootserver/ob_tenant_info_report.h | 2 +- src/rootserver/ob_tenant_thread_helper.cpp | 36 ++++++++++---- src/rootserver/ob_tenant_thread_helper.h | 23 ++++++++- .../restore/ob_restore_scheduler.cpp | 47 +++++-------------- src/rootserver/restore/ob_restore_scheduler.h | 9 +--- 13 files changed, 73 insertions(+), 79 deletions(-) diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 43517ce327..177b3c44ee 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -92,6 +92,7 @@ #include "rootserver/ob_common_ls_service.h"//ObCommonLSService #include "rootserver/ob_tenant_info_report.h"//ObTenantInfoReportor #include "rootserver/restore/ob_restore_scheduler.h" //ObRestoreService +#include "rootserver/ob_tenant_thread_helper.h"//ObTenantThreadHelper #include "logservice/leader_coordinator/ob_leader_coordinator.h" #include "storage/lob/ob_lob_manager.h" #include "share/deadlock/ob_deadlock_detector_mgr.h" @@ -405,11 +406,11 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, ObTenantMetaChecker::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObLSRecoveryReportor::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoLoader::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); - MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, rootserver::ObCommonLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoReportor::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, rootserver::ObPrimaryLSService::mtl_stop, rootserver::ObPrimaryLSService::mtl_wait, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObCommonLSService::mtl_init, nullptr, rootserver::ObCommonLSService::mtl_stop, rootserver::ObCommonLSService::mtl_wait, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoReportor::mtl_init, nullptr, rootserver::ObTenantInfoReportor::mtl_stop, rootserver::ObTenantInfoReportor::mtl_wait, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, rootserver::ObRecoveryLSService::mtl_stop, rootserver::ObRecoveryLSService::mtl_wait, mtl_destroy_default); + MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, rootserver::ObRestoreService::mtl_stop, rootserver::ObRestoreService::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, mtl_destroy_default); MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); diff --git a/src/rootserver/ob_common_ls_service.cpp b/src/rootserver/ob_common_ls_service.cpp index 35ebeebfb1..a9bb6a35c0 100755 --- a/src/rootserver/ob_common_ls_service.cpp +++ b/src/rootserver/ob_common_ls_service.cpp @@ -35,11 +35,6 @@ using namespace palf; namespace rootserver { //////////////ObCommonLSService -int ObCommonLSService::mtl_init(ObCommonLSService *&ka) -{ - return ka->init(); -} - int ObCommonLSService::init() { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_common_ls_service.h b/src/rootserver/ob_common_ls_service.h index d0131a856a..eb3af16e35 100644 --- a/src/rootserver/ob_common_ls_service.h +++ b/src/rootserver/ob_common_ls_service.h @@ -65,10 +65,10 @@ class ObCommonLSService : public ObTenantThreadHelper, public: ObCommonLSService():inited_(false), tenant_id_(OB_INVALID_TENANT_ID) {} virtual ~ObCommonLSService() {} - static int mtl_init(ObCommonLSService *&ka); int init(); void destroy(); virtual void do_work() override; + DEFINE_MTL_FUNC(ObCommonLSService) public: virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();} diff --git a/src/rootserver/ob_primary_ls_service.cpp b/src/rootserver/ob_primary_ls_service.cpp index 662c1ea255..90c6956094 100755 --- a/src/rootserver/ob_primary_ls_service.cpp +++ b/src/rootserver/ob_primary_ls_service.cpp @@ -33,11 +33,6 @@ namespace rootserver { //////////////ObPrimaryLSService -int ObPrimaryLSService::mtl_init(ObPrimaryLSService *&ka) -{ - return ka->init(); -} - int ObPrimaryLSService::init() { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_primary_ls_service.h b/src/rootserver/ob_primary_ls_service.h index 5fd742a37c..f370d968f5 100644 --- a/src/rootserver/ob_primary_ls_service.h +++ b/src/rootserver/ob_primary_ls_service.h @@ -71,10 +71,10 @@ class ObPrimaryLSService : public ObTenantThreadHelper, public: ObPrimaryLSService():inited_(false), tenant_id_(OB_INVALID_TENANT_ID) {} virtual ~ObPrimaryLSService() {} - static int mtl_init(ObPrimaryLSService *&ka); int init(); void destroy(); virtual void do_work() override; + DEFINE_MTL_FUNC(ObPrimaryLSService) public: virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();} diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index 83045cd07d..cdbbedeb47 100644 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -41,10 +41,6 @@ using namespace palf; namespace rootserver { -int ObRecoveryLSService::mtl_init(ObRecoveryLSService*&ka) -{ - return ka->init(); -} int ObRecoveryLSService::init() { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index 3d57426a67..53d85b3f34 100644 --- a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -39,10 +39,10 @@ public: ObRecoveryLSService() : inited_(false), tenant_id_(OB_INVALID_TENANT_ID), proxy_(NULL) {} virtual ~ObRecoveryLSService() {} - static int mtl_init(ObRecoveryLSService *&ka); int init(); void destroy(); virtual void do_work() override; + DEFINE_MTL_FUNC(ObRecoveryLSService) private: void try_tenant_upgrade_end_(); int get_min_data_version_(uint64_t &compatible); diff --git a/src/rootserver/ob_tenant_info_report.cpp b/src/rootserver/ob_tenant_info_report.cpp index 51237bfd82..2d27d523b5 100755 --- a/src/rootserver/ob_tenant_info_report.cpp +++ b/src/rootserver/ob_tenant_info_report.cpp @@ -27,10 +27,6 @@ using namespace palf; namespace rootserver { //////////////ObTenantInfoReportor -int ObTenantInfoReportor::mtl_init(ObTenantInfoReportor *&ka) -{ - return ka->init(); -} int ObTenantInfoReportor::init() { diff --git a/src/rootserver/ob_tenant_info_report.h b/src/rootserver/ob_tenant_info_report.h index 40f546535b..a387b386c0 100644 --- a/src/rootserver/ob_tenant_info_report.h +++ b/src/rootserver/ob_tenant_info_report.h @@ -54,10 +54,10 @@ class ObTenantInfoReportor : public ObTenantThreadHelper, public: ObTenantInfoReportor():inited_(false), tenant_id_(OB_INVALID_TENANT_ID) {} virtual ~ObTenantInfoReportor() {} - static int mtl_init(ObTenantInfoReportor *&ka); int init(); void destroy(); virtual void do_work() override; + DEFINE_MTL_FUNC(ObTenantInfoReportor) public: virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();} diff --git a/src/rootserver/ob_tenant_thread_helper.cpp b/src/rootserver/ob_tenant_thread_helper.cpp index 7b36b3ecf3..0b7d78610f 100755 --- a/src/rootserver/ob_tenant_thread_helper.cpp +++ b/src/rootserver/ob_tenant_thread_helper.cpp @@ -80,19 +80,41 @@ int ObTenantThreadHelper::start() void ObTenantThreadHelper::stop() { - LOG_INFO("[TENANT THREAD] thread stop start", K(tg_id_), K(thread_name_)); + LOG_INFO("[TENANT THREAD] thread logical stop start", K(tg_id_), K(thread_name_)); if (-1 != tg_id_) { TG_REENTRANT_LOGICAL_STOP(tg_id_); } - LOG_INFO("[TENANT THREAD] thread stop finish", K(tg_id_), K(thread_name_)); + LOG_INFO("[TENANT THREAD] thread logical stop finish", K(tg_id_), K(thread_name_)); } void ObTenantThreadHelper::wait() { - LOG_INFO("[TENANT THREAD] thread wait start", K(tg_id_), K(thread_name_)); + LOG_INFO("[TENANT THREAD] thread logical wait start", K(tg_id_), K(thread_name_)); if (-1 != tg_id_) { TG_REENTRANT_LOGICAL_WAIT(tg_id_); } + LOG_INFO("[TENANT THREAD] thread logical wait finish", K(tg_id_), K(thread_name_)); +} +void ObTenantThreadHelper::mtl_thread_stop() +{ + LOG_INFO("[TENANT THREAD] thread stop start", K(tg_id_), K(thread_name_)); + if (-1 != tg_id_) { + TG_STOP(tg_id_); + } + LOG_INFO("[TENANT THREAD] thread stop finish", K(tg_id_), K(thread_name_)); +} + +void ObTenantThreadHelper::mtl_thread_wait() +{ + LOG_INFO("[TENANT THREAD] thread wait start", K(tg_id_), K(thread_name_)); + if (-1 != tg_id_) { + { + ObThreadCondGuard guard(thread_cond_); + thread_cond_.broadcast(); + } + TG_WAIT(tg_id_); + is_first_time_to_start_ = true; + } LOG_INFO("[TENANT THREAD] thread wait finish", K(tg_id_), K(thread_name_)); } @@ -100,12 +122,8 @@ void ObTenantThreadHelper::destroy() { LOG_INFO("[TENANT THREAD] thread destory start", K(tg_id_), K(thread_name_)); if (-1 != tg_id_) { - TG_STOP(tg_id_); - { - ObThreadCondGuard guard(thread_cond_); - thread_cond_.broadcast(); - } - TG_WAIT(tg_id_); + mtl_thread_stop(); + mtl_thread_wait(); TG_DESTROY(tg_id_); tg_id_ = -1; } diff --git a/src/rootserver/ob_tenant_thread_helper.h b/src/rootserver/ob_tenant_thread_helper.h index 95bad81044..a88c32d083 100644 --- a/src/rootserver/ob_tenant_thread_helper.h +++ b/src/rootserver/ob_tenant_thread_helper.h @@ -57,6 +57,8 @@ public: int start(); void stop(); void wait(); + void mtl_thread_stop(); + void mtl_thread_wait(); int create(const char* thread_name, int tg_def_id, ObTenantThreadHelper &tenant_thread); void idle(const int64_t idle_time_us); public: @@ -79,7 +81,6 @@ public: static int get_zone_priority(const ObZone &primary_zone, const share::schema::ObTenantSchema &tenant_schema, common::ObSqlString &primary_zone_str); - protected: int wait_tenant_schema_and_version_ready_( const uint64_t tenant_id, const uint64_t &data_version); @@ -93,7 +94,25 @@ private: const char* thread_name_; }; - +#define DEFINE_MTL_FUNC(TYPE)\ + static int mtl_init(TYPE *&ka) {\ + int ret = OB_SUCCESS;\ + if (OB_ISNULL(ka)) {\ + ret = OB_ERR_UNEXPECTED;\ + } else if (OB_FAIL(ka->init())) {\ + }\ + return ret;\ + }\ + static void mtl_stop(TYPE *&ka) {\ + if (OB_NOT_NULL(ka)) {\ + ka->mtl_thread_stop();\ + }\ + }\ + static void mtl_wait(TYPE *&ka) {\ + if (OB_NOT_NULL(ka)) {\ + ka->mtl_thread_wait();\ + }\ + } } } diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 00323580b2..49dc2c27b4 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -56,21 +56,6 @@ ObRestoreService::ObRestoreService() { } -int ObRestoreService::mtl_init(ObRestoreService *&ka) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(ka)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("restore service is null", KR(ret)); - } else if (OB_FAIL(ka->init( - GCTX.schema_service_, GCTX.sql_proxy_, - GCTX.rs_rpc_proxy_, GCTX.srv_rpc_proxy_, - GCTX.lst_operator_, GCTX.self_addr()))) { - LOG_WARN("failed to init restore service", KR(ret)); - } - return ret; -} - ObRestoreService::~ObRestoreService() { if (!has_set_stop()) { @@ -83,40 +68,34 @@ void ObRestoreService::destroy() ObTenantThreadHelper::destroy(); inited_ = false; } -int ObRestoreService::init( - ObMultiVersionSchemaService *schema_service, - ObMySQLProxy *sql_proxy, - ObCommonRpcProxy *rpc_proxy, - obrpc::ObSrvRpcProxy *srv_rpc_proxy, - ObLSTableOperator *lst_operator, - const common::ObAddr &self_addr) +int ObRestoreService::init() { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; LOG_WARN("init twice", KR(ret)); - } else if (OB_ISNULL(schema_service) || OB_ISNULL(sql_proxy) - || OB_ISNULL(rpc_proxy) || OB_ISNULL(srv_rpc_proxy) - || OB_ISNULL(lst_operator)) { + } else if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(GCTX.sql_proxy_) + || OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_) + || OB_ISNULL(GCTX.lst_operator_)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), KP(schema_service), KP(sql_proxy), - KP(rpc_proxy), KP(srv_rpc_proxy), KP(lst_operator)); + LOG_WARN("invalid argument", KR(ret), KP(GCTX.schema_service_), KP(GCTX.sql_proxy_), + KP(GCTX.rs_rpc_proxy_), KP(GCTX.srv_rpc_proxy_), KP(GCTX.lst_operator_)); } else if (OB_FAIL(ObTenantThreadHelper::create("REST_SER", lib::TGDefIDs::SimpleLSService, *this))) { LOG_WARN("failed to create thread", KR(ret)); } else if (OB_FAIL(ObTenantThreadHelper::start())) { LOG_WARN("fail to start thread", KR(ret)); } else if (OB_FAIL(upgrade_processors_.init( ObBaseUpgradeProcessor::UPGRADE_MODE_PHYSICAL_RESTORE, - *sql_proxy, *srv_rpc_proxy, *rpc_proxy, *schema_service, *this))) { + *GCTX.sql_proxy_, *GCTX.srv_rpc_proxy_, *GCTX.rs_rpc_proxy_, *GCTX.schema_service_, *this))) { LOG_WARN("fail to init upgrade processors", KR(ret)); } else { - schema_service_ = schema_service; - sql_proxy_ = sql_proxy; - rpc_proxy_ = rpc_proxy; - srv_rpc_proxy_ = srv_rpc_proxy; - lst_operator_ = lst_operator; + schema_service_ = GCTX.schema_service_; + sql_proxy_ = GCTX.sql_proxy_; + rpc_proxy_ = GCTX.rs_rpc_proxy_; + srv_rpc_proxy_ = GCTX.srv_rpc_proxy_; + lst_operator_ = GCTX.lst_operator_; tenant_id_ = is_sys_tenant(MTL_ID()) ? MTL_ID() : gen_user_tenant_id(MTL_ID()); - self_addr_ = self_addr; + self_addr_ = GCTX.self_addr(); inited_ = true; } return ret; diff --git a/src/rootserver/restore/ob_restore_scheduler.h b/src/rootserver/restore/ob_restore_scheduler.h index 3ddcb5441b..3ff790b870 100644 --- a/src/rootserver/restore/ob_restore_scheduler.h +++ b/src/rootserver/restore/ob_restore_scheduler.h @@ -44,15 +44,10 @@ public: public: ObRestoreService(); virtual ~ObRestoreService(); - static int mtl_init(ObRestoreService *&ka); - int init(share::schema::ObMultiVersionSchemaService *schema_service, - common::ObMySQLProxy *sql_proxy, - obrpc::ObCommonRpcProxy *rpc_proxy, - obrpc::ObSrvRpcProxy *srv_rpc_proxy, - share::ObLSTableOperator *lst_operator, - const common::ObAddr &self_addr); + int init(); virtual void do_work() override; void destroy(); + DEFINE_MTL_FUNC(ObRestoreService) public: virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();} virtual int flush(share::SCN &rec_scn) override { return OB_SUCCESS; }