Canonical tenant-level thread exit

This commit is contained in:
maosy
2023-03-02 18:02:26 +00:00
committed by ob-robot
parent 6a4317b34e
commit 1cd6a71941
13 changed files with 73 additions and 79 deletions

View File

@ -92,6 +92,7 @@
#include "rootserver/ob_common_ls_service.h"//ObCommonLSService
#include "rootserver/ob_tenant_info_report.h"//ObTenantInfoReportor
#include "rootserver/restore/ob_restore_scheduler.h" //ObRestoreService
#include "rootserver/ob_tenant_thread_helper.h"//ObTenantThreadHelper
#include "logservice/leader_coordinator/ob_leader_coordinator.h"
#include "storage/lob/ob_lob_manager.h"
#include "share/deadlock/ob_deadlock_detector_mgr.h"
@ -405,11 +406,11 @@ int ObMultiTenant::init(ObAddr myaddr,
MTL_BIND2(mtl_new_default, ObTenantMetaChecker::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObLSRecoveryReportor::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoLoader::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObCommonLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoReportor::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObPrimaryLSService::mtl_init, nullptr, rootserver::ObPrimaryLSService::mtl_stop, rootserver::ObPrimaryLSService::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObCommonLSService::mtl_init, nullptr, rootserver::ObCommonLSService::mtl_stop, rootserver::ObCommonLSService::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObTenantInfoReportor::mtl_init, nullptr, rootserver::ObTenantInfoReportor::mtl_stop, rootserver::ObTenantInfoReportor::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObRecoveryLSService::mtl_init, nullptr, rootserver::ObRecoveryLSService::mtl_stop, rootserver::ObRecoveryLSService::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, rootserver::ObRestoreService::mtl_init, nullptr, rootserver::ObRestoreService::mtl_stop, rootserver::ObRestoreService::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObLeaderCoordinator::mtl_init, coordinator::ObLeaderCoordinator::mtl_start, coordinator::ObLeaderCoordinator::mtl_stop, coordinator::ObLeaderCoordinator::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, coordinator::ObFailureDetector::mtl_init, coordinator::ObFailureDetector::mtl_start, coordinator::ObFailureDetector::mtl_stop, coordinator::ObFailureDetector::mtl_wait, mtl_destroy_default);
MTL_BIND2(ObLobManager::mtl_new, mtl_init_default, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);

View File

@ -35,11 +35,6 @@ using namespace palf;
namespace rootserver
{
//////////////ObCommonLSService
int ObCommonLSService::mtl_init(ObCommonLSService *&ka)
{
return ka->init();
}
int ObCommonLSService::init()
{
int ret = OB_SUCCESS;

View File

@ -65,10 +65,10 @@ class ObCommonLSService : public ObTenantThreadHelper,
public:
ObCommonLSService():inited_(false), tenant_id_(OB_INVALID_TENANT_ID) {}
virtual ~ObCommonLSService() {}
static int mtl_init(ObCommonLSService *&ka);
int init();
void destroy();
virtual void do_work() override;
DEFINE_MTL_FUNC(ObCommonLSService)
public:
virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();}

View File

@ -33,11 +33,6 @@ namespace rootserver
{
//////////////ObPrimaryLSService
int ObPrimaryLSService::mtl_init(ObPrimaryLSService *&ka)
{
return ka->init();
}
int ObPrimaryLSService::init()
{
int ret = OB_SUCCESS;

View File

@ -71,10 +71,10 @@ class ObPrimaryLSService : public ObTenantThreadHelper,
public:
ObPrimaryLSService():inited_(false), tenant_id_(OB_INVALID_TENANT_ID) {}
virtual ~ObPrimaryLSService() {}
static int mtl_init(ObPrimaryLSService *&ka);
int init();
void destroy();
virtual void do_work() override;
DEFINE_MTL_FUNC(ObPrimaryLSService)
public:
virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();}

View File

@ -41,10 +41,6 @@ using namespace palf;
namespace rootserver
{
int ObRecoveryLSService::mtl_init(ObRecoveryLSService*&ka)
{
return ka->init();
}
int ObRecoveryLSService::init()
{
int ret = OB_SUCCESS;

View File

@ -39,10 +39,10 @@ public:
ObRecoveryLSService() : inited_(false),
tenant_id_(OB_INVALID_TENANT_ID), proxy_(NULL) {}
virtual ~ObRecoveryLSService() {}
static int mtl_init(ObRecoveryLSService *&ka);
int init();
void destroy();
virtual void do_work() override;
DEFINE_MTL_FUNC(ObRecoveryLSService)
private:
void try_tenant_upgrade_end_();
int get_min_data_version_(uint64_t &compatible);

View File

@ -27,10 +27,6 @@ using namespace palf;
namespace rootserver
{
//////////////ObTenantInfoReportor
int ObTenantInfoReportor::mtl_init(ObTenantInfoReportor *&ka)
{
return ka->init();
}
int ObTenantInfoReportor::init()
{

View File

@ -54,10 +54,10 @@ class ObTenantInfoReportor : public ObTenantThreadHelper,
public:
ObTenantInfoReportor():inited_(false), tenant_id_(OB_INVALID_TENANT_ID) {}
virtual ~ObTenantInfoReportor() {}
static int mtl_init(ObTenantInfoReportor *&ka);
int init();
void destroy();
virtual void do_work() override;
DEFINE_MTL_FUNC(ObTenantInfoReportor)
public:
virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();}

View File

@ -80,19 +80,41 @@ int ObTenantThreadHelper::start()
void ObTenantThreadHelper::stop()
{
LOG_INFO("[TENANT THREAD] thread stop start", K(tg_id_), K(thread_name_));
LOG_INFO("[TENANT THREAD] thread logical stop start", K(tg_id_), K(thread_name_));
if (-1 != tg_id_) {
TG_REENTRANT_LOGICAL_STOP(tg_id_);
}
LOG_INFO("[TENANT THREAD] thread stop finish", K(tg_id_), K(thread_name_));
LOG_INFO("[TENANT THREAD] thread logical stop finish", K(tg_id_), K(thread_name_));
}
void ObTenantThreadHelper::wait()
{
LOG_INFO("[TENANT THREAD] thread wait start", K(tg_id_), K(thread_name_));
LOG_INFO("[TENANT THREAD] thread logical wait start", K(tg_id_), K(thread_name_));
if (-1 != tg_id_) {
TG_REENTRANT_LOGICAL_WAIT(tg_id_);
}
LOG_INFO("[TENANT THREAD] thread logical wait finish", K(tg_id_), K(thread_name_));
}
void ObTenantThreadHelper::mtl_thread_stop()
{
LOG_INFO("[TENANT THREAD] thread stop start", K(tg_id_), K(thread_name_));
if (-1 != tg_id_) {
TG_STOP(tg_id_);
}
LOG_INFO("[TENANT THREAD] thread stop finish", K(tg_id_), K(thread_name_));
}
void ObTenantThreadHelper::mtl_thread_wait()
{
LOG_INFO("[TENANT THREAD] thread wait start", K(tg_id_), K(thread_name_));
if (-1 != tg_id_) {
{
ObThreadCondGuard guard(thread_cond_);
thread_cond_.broadcast();
}
TG_WAIT(tg_id_);
is_first_time_to_start_ = true;
}
LOG_INFO("[TENANT THREAD] thread wait finish", K(tg_id_), K(thread_name_));
}
@ -100,12 +122,8 @@ void ObTenantThreadHelper::destroy()
{
LOG_INFO("[TENANT THREAD] thread destory start", K(tg_id_), K(thread_name_));
if (-1 != tg_id_) {
TG_STOP(tg_id_);
{
ObThreadCondGuard guard(thread_cond_);
thread_cond_.broadcast();
}
TG_WAIT(tg_id_);
mtl_thread_stop();
mtl_thread_wait();
TG_DESTROY(tg_id_);
tg_id_ = -1;
}

View File

@ -57,6 +57,8 @@ public:
int start();
void stop();
void wait();
void mtl_thread_stop();
void mtl_thread_wait();
int create(const char* thread_name, int tg_def_id, ObTenantThreadHelper &tenant_thread);
void idle(const int64_t idle_time_us);
public:
@ -79,7 +81,6 @@ public:
static int get_zone_priority(const ObZone &primary_zone,
const share::schema::ObTenantSchema &tenant_schema,
common::ObSqlString &primary_zone_str);
protected:
int wait_tenant_schema_and_version_ready_(
const uint64_t tenant_id, const uint64_t &data_version);
@ -93,7 +94,25 @@ private:
const char* thread_name_;
};
#define DEFINE_MTL_FUNC(TYPE)\
static int mtl_init(TYPE *&ka) {\
int ret = OB_SUCCESS;\
if (OB_ISNULL(ka)) {\
ret = OB_ERR_UNEXPECTED;\
} else if (OB_FAIL(ka->init())) {\
}\
return ret;\
}\
static void mtl_stop(TYPE *&ka) {\
if (OB_NOT_NULL(ka)) {\
ka->mtl_thread_stop();\
}\
}\
static void mtl_wait(TYPE *&ka) {\
if (OB_NOT_NULL(ka)) {\
ka->mtl_thread_wait();\
}\
}
}
}

View File

@ -56,21 +56,6 @@ ObRestoreService::ObRestoreService()
{
}
int ObRestoreService::mtl_init(ObRestoreService *&ka)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ka)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("restore service is null", KR(ret));
} else if (OB_FAIL(ka->init(
GCTX.schema_service_, GCTX.sql_proxy_,
GCTX.rs_rpc_proxy_, GCTX.srv_rpc_proxy_,
GCTX.lst_operator_, GCTX.self_addr()))) {
LOG_WARN("failed to init restore service", KR(ret));
}
return ret;
}
ObRestoreService::~ObRestoreService()
{
if (!has_set_stop()) {
@ -83,40 +68,34 @@ void ObRestoreService::destroy()
ObTenantThreadHelper::destroy();
inited_ = false;
}
int ObRestoreService::init(
ObMultiVersionSchemaService *schema_service,
ObMySQLProxy *sql_proxy,
ObCommonRpcProxy *rpc_proxy,
obrpc::ObSrvRpcProxy *srv_rpc_proxy,
ObLSTableOperator *lst_operator,
const common::ObAddr &self_addr)
int ObRestoreService::init()
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", KR(ret));
} else if (OB_ISNULL(schema_service) || OB_ISNULL(sql_proxy)
|| OB_ISNULL(rpc_proxy) || OB_ISNULL(srv_rpc_proxy)
|| OB_ISNULL(lst_operator)) {
} else if (OB_ISNULL(GCTX.schema_service_) || OB_ISNULL(GCTX.sql_proxy_)
|| OB_ISNULL(GCTX.rs_rpc_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_)
|| OB_ISNULL(GCTX.lst_operator_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), KP(schema_service), KP(sql_proxy),
KP(rpc_proxy), KP(srv_rpc_proxy), KP(lst_operator));
LOG_WARN("invalid argument", KR(ret), KP(GCTX.schema_service_), KP(GCTX.sql_proxy_),
KP(GCTX.rs_rpc_proxy_), KP(GCTX.srv_rpc_proxy_), KP(GCTX.lst_operator_));
} else if (OB_FAIL(ObTenantThreadHelper::create("REST_SER", lib::TGDefIDs::SimpleLSService, *this))) {
LOG_WARN("failed to create thread", KR(ret));
} else if (OB_FAIL(ObTenantThreadHelper::start())) {
LOG_WARN("fail to start thread", KR(ret));
} else if (OB_FAIL(upgrade_processors_.init(
ObBaseUpgradeProcessor::UPGRADE_MODE_PHYSICAL_RESTORE,
*sql_proxy, *srv_rpc_proxy, *rpc_proxy, *schema_service, *this))) {
*GCTX.sql_proxy_, *GCTX.srv_rpc_proxy_, *GCTX.rs_rpc_proxy_, *GCTX.schema_service_, *this))) {
LOG_WARN("fail to init upgrade processors", KR(ret));
} else {
schema_service_ = schema_service;
sql_proxy_ = sql_proxy;
rpc_proxy_ = rpc_proxy;
srv_rpc_proxy_ = srv_rpc_proxy;
lst_operator_ = lst_operator;
schema_service_ = GCTX.schema_service_;
sql_proxy_ = GCTX.sql_proxy_;
rpc_proxy_ = GCTX.rs_rpc_proxy_;
srv_rpc_proxy_ = GCTX.srv_rpc_proxy_;
lst_operator_ = GCTX.lst_operator_;
tenant_id_ = is_sys_tenant(MTL_ID()) ? MTL_ID() : gen_user_tenant_id(MTL_ID());
self_addr_ = self_addr;
self_addr_ = GCTX.self_addr();
inited_ = true;
}
return ret;

View File

@ -44,15 +44,10 @@ public:
public:
ObRestoreService();
virtual ~ObRestoreService();
static int mtl_init(ObRestoreService *&ka);
int init(share::schema::ObMultiVersionSchemaService *schema_service,
common::ObMySQLProxy *sql_proxy,
obrpc::ObCommonRpcProxy *rpc_proxy,
obrpc::ObSrvRpcProxy *srv_rpc_proxy,
share::ObLSTableOperator *lst_operator,
const common::ObAddr &self_addr);
int init();
virtual void do_work() override;
void destroy();
DEFINE_MTL_FUNC(ObRestoreService)
public:
virtual share::SCN get_rec_scn() override { return share::SCN::max_scn();}
virtual int flush(share::SCN &rec_scn) override { return OB_SUCCESS; }