finetune ObTimer and ObTenantTimezoneMgr
This commit is contained in:
5
deps/oblib/src/lib/task/ob_timer.cpp
vendored
5
deps/oblib/src/lib/task/ob_timer.cpp
vendored
@ -154,14 +154,13 @@ bool ObTimer::task_exist(const ObTimerTask &task)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTimer::schedule(ObTimerTask &task, const int64_t delay, const bool repeate /*=false*/)
|
||||
int ObTimer::schedule(ObTimerTask &task, const int64_t delay, const bool repeate /*=false*/, const bool immediate /*=false*/)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool schedule_immediately = false;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
ret = schedule_task(task, delay, repeate, schedule_immediately);
|
||||
ret = schedule_task(task, delay, repeate, immediate);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
2
deps/oblib/src/lib/task/ob_timer.h
vendored
2
deps/oblib/src/lib/task/ob_timer.h
vendored
@ -79,7 +79,7 @@ public:
|
||||
void wait(); // wait all running task finish
|
||||
void destroy();
|
||||
public:
|
||||
int schedule(ObTimerTask &task, const int64_t delay, bool repeate = false);
|
||||
int schedule(ObTimerTask &task, const int64_t delay, bool repeate = false, bool immediate = false);
|
||||
int schedule_repeate_task_immediately(ObTimerTask &task, const int64_t delay);
|
||||
bool task_exist(const common::ObTimerTask &task);
|
||||
int task_exist(const common::ObTimerTask &task, bool &exist)
|
||||
|
||||
16
deps/oblib/src/lib/thread/thread_mgr.h
vendored
16
deps/oblib/src/lib/thread/thread_mgr.h
vendored
@ -176,15 +176,15 @@ public:
|
||||
UNUSED(task);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
virtual int schedule(common::ObTimerTask &task, const int64_t delay, bool repeate = false)
|
||||
virtual int schedule(common::ObTimerTask &task, const int64_t delay, bool repeate = false, bool immediate = false)
|
||||
{
|
||||
UNUSEDx(task, delay, repeate);
|
||||
UNUSEDx(task, delay, repeate, immediate);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
virtual int schedule(int idx, common::ObTimerTask &task,
|
||||
const int64_t delay, bool repeate = false)
|
||||
const int64_t delay, bool repeate = false, bool immediate = false)
|
||||
{
|
||||
UNUSEDx(idx, task, delay, repeate);
|
||||
UNUSEDx(idx, task, delay, repeate, immediate);
|
||||
return common::OB_NOT_SUPPORTED;
|
||||
}
|
||||
virtual int task_exist(const common::ObTimerTask &task, bool &exist)
|
||||
@ -778,13 +778,13 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
int schedule(common::ObTimerTask &task, const int64_t delay, bool repeate = false) override
|
||||
int schedule(common::ObTimerTask &task, const int64_t delay, bool repeate = false, bool immediate = false) override
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
if (OB_ISNULL(timer_)) {
|
||||
ret = common::OB_ERR_UNEXPECTED;
|
||||
} else {
|
||||
ret = timer_->schedule(task, delay, repeate);
|
||||
ret = timer_->schedule(task, delay, repeate, immediate);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -968,7 +968,7 @@ public:
|
||||
}
|
||||
}
|
||||
int schedule(int idx, common::ObTimerTask &task, const int64_t delay,
|
||||
bool repeate = false) override
|
||||
bool repeate = false, bool immediate = false) override
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
@ -978,7 +978,7 @@ public:
|
||||
} else if (OB_ISNULL(timers_[idx])) {
|
||||
ret = common::OB_ERR_UNEXPECTED;
|
||||
} else {
|
||||
ret = timers_[idx]->schedule(task, delay, repeate);
|
||||
ret = timers_[idx]->schedule(task, delay, repeate, immediate);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -929,7 +929,11 @@ int ObServer::start()
|
||||
FLOG_INFO("check if schema ready", KR(ret), K(stop_), K(schema_ready));
|
||||
|
||||
bool timezone_usable = false;
|
||||
tenant_timezone_mgr_.set_start_refresh(true);
|
||||
if (FAILEDx(tenant_timezone_mgr_.start())) {
|
||||
LOG_ERROR("fail to start tenant timezone mgr", KR(ret));
|
||||
} else {
|
||||
FLOG_INFO("success to start tenant timezone mgr");
|
||||
}
|
||||
while (OB_SUCC(ret) && !stop_ && !timezone_usable) {
|
||||
timezone_usable = tenant_timezone_mgr_.is_usable();
|
||||
if (!timezone_usable) {
|
||||
|
||||
@ -28,22 +28,12 @@ namespace omt {
|
||||
void ObTenantTimezoneMgr::AddTenantTZTask::runTimerTask()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t delay = SLEEP_USECONDS;
|
||||
const bool repeat = false;
|
||||
if (OB_ISNULL(tenant_tz_mgr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("update all tenant task, tenant tz mgr is null", K(ret));
|
||||
} else if (tenant_tz_mgr_->get_start_refresh()) {
|
||||
if (OB_FAIL(tenant_tz_mgr_->update_timezone_map())) {
|
||||
} else if (OB_FAIL(tenant_tz_mgr_->update_timezone_map())) {
|
||||
LOG_WARN("tenant timezone mgr update tenant timezone map failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
delay = 1 * 500 * 1000;
|
||||
}
|
||||
|
||||
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, *this, delay, repeat))) {
|
||||
LOG_ERROR("schedule timezone update task failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
void ObTenantTimezoneMgr::DeleteTenantTZTask::runTimerTask()
|
||||
@ -52,17 +42,10 @@ void ObTenantTimezoneMgr::DeleteTenantTZTask::runTimerTask()
|
||||
if (OB_ISNULL(tenant_tz_mgr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("delete tenant task, tenant tz mgr is null", K(ret));
|
||||
} else if (tenant_tz_mgr_->get_start_refresh()) {
|
||||
if (OB_FAIL(tenant_tz_mgr_->remove_nonexist_tenant())) {
|
||||
} else if (OB_FAIL(tenant_tz_mgr_->remove_nonexist_tenant())) {
|
||||
LOG_WARN("remove nonexist tenants failed", K(ret));
|
||||
}
|
||||
}
|
||||
const int64_t delay = SLEEP_USECONDS;
|
||||
const bool repeat = false;
|
||||
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, *this, delay, repeat))) {
|
||||
LOG_ERROR("schedule timezone delete task failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
int ObTenantTimezoneMgr::UpdateTenantTZOp::operator() (common::hash::HashMapPair<uint64_t, ObTenantTimezone*> &entry)
|
||||
{
|
||||
@ -81,23 +64,16 @@ void ObTenantTimezoneMgr::UpdateTenantTZTask::runTimerTask()
|
||||
if (OB_ISNULL(tenant_tz_mgr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("delete tenant task, tenant tz mgr is null", K(ret));
|
||||
} else if (tenant_tz_mgr_->get_start_refresh()) {
|
||||
if (OB_FAIL(tenant_tz_mgr_->timezone_map_.foreach_refactored(update_op))) {
|
||||
} else if (OB_FAIL(tenant_tz_mgr_->timezone_map_.foreach_refactored(update_op))) {
|
||||
LOG_WARN("update tenant time zone failed", K(ret));
|
||||
}
|
||||
}
|
||||
const int64_t delay = SLEEP_USECONDS;
|
||||
const bool repeat = false;
|
||||
if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, *this, delay, repeat))) {
|
||||
LOG_ERROR("schedule timezone delete task failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
ObTenantTimezoneMgr::ObTenantTimezoneMgr()
|
||||
: allocator_("TenantTZ"), is_inited_(false), self_(), sql_proxy_(nullptr),
|
||||
rwlock_(ObLatchIds::TIMEZONE_LOCK),
|
||||
timezone_map_(), add_task_(this), delete_task_(this), update_task_(this),
|
||||
start_refresh_(false), usable_(false),
|
||||
usable_(false),
|
||||
schema_service_(nullptr)
|
||||
{
|
||||
tenant_tz_map_getter_ = ObTenantTimezoneMgr::get_tenant_timezone_default;
|
||||
@ -117,31 +93,36 @@ int ObTenantTimezoneMgr::init(ObMySQLProxy &sql_proxy, const ObAddr &server,
|
||||
share::schema::ObMultiVersionSchemaService &schema_service)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(TG_START(lib::TGDefIDs::TIMEZONE_MGR))) {
|
||||
LOG_WARN("fail to init timer", K(ret));
|
||||
} else {
|
||||
sql_proxy_ = &sql_proxy;
|
||||
self_ = server;
|
||||
schema_service_ = &schema_service;
|
||||
is_inited_ = true;
|
||||
}
|
||||
const int64_t delay = 0;
|
||||
const bool repeat = false;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(add_tenant_timezone(OB_SYS_TENANT_ID))) {
|
||||
if (OB_FAIL(add_tenant_timezone(OB_SYS_TENANT_ID))) {
|
||||
LOG_WARN("add tenant timezone info failed", K(ret));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, add_task_, delay, repeat))) {
|
||||
LOG_WARN("schedual time zone mgr failed", K(ret));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, delete_task_, delay, repeat))) {
|
||||
LOG_WARN("schedual time zone mgr failed", K(ret));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, update_task_, delay, repeat))) {
|
||||
LOG_WARN("schedual time zone mgr failed", K(ret));
|
||||
} else {
|
||||
tenant_tz_map_getter_ = ObTenantTimezoneMgr::get_tenant_timezone_static;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTimezoneMgr::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t delay = SLEEP_USECONDS;
|
||||
const bool repeat = true;
|
||||
const bool immediate = true;
|
||||
if (OB_FAIL(TG_START(lib::TGDefIDs::TIMEZONE_MGR))) {
|
||||
LOG_WARN("fail to start timer", K(ret));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, add_task_, delay, repeat, immediate))) {
|
||||
LOG_WARN("schedual time zone mgr failed", K(ret));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, delete_task_, delay, repeat, immediate))) {
|
||||
LOG_WARN("schedual time zone mgr failed", K(ret));
|
||||
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::TIMEZONE_MGR, update_task_, delay, repeat, immediate))) {
|
||||
LOG_WARN("schedual time zone mgr failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTenantTimezoneMgr::init(tenant_timezone_map_getter tz_map_getter)
|
||||
{
|
||||
tenant_tz_map_getter_ = tz_map_getter;
|
||||
|
||||
@ -66,7 +66,6 @@ private:
|
||||
int update_tenant_map(common::ObIArray<uint64_t> &latest_tenant_ids);
|
||||
|
||||
ObTenantTimezoneMgr *tenant_tz_mgr_;
|
||||
const uint64_t SLEEP_USECONDS = 5000000;
|
||||
};
|
||||
class DeleteTenantTZTask : public common::ObTimerTask
|
||||
{
|
||||
@ -78,7 +77,6 @@ private:
|
||||
void runTimerTask(void) override;
|
||||
|
||||
ObTenantTimezoneMgr *tenant_tz_mgr_;
|
||||
const uint64_t SLEEP_USECONDS = 60000000;
|
||||
};
|
||||
class UpdateTenantTZOp
|
||||
{
|
||||
@ -100,7 +98,6 @@ private:
|
||||
UpdateTenantTZTask &operator=(const UpdateTenantTZTask &) = delete;
|
||||
void runTimerTask(void) override;
|
||||
ObTenantTimezoneMgr *tenant_tz_mgr_;
|
||||
const uint64_t SLEEP_USECONDS = 5000000;
|
||||
};
|
||||
friend AddTenantTZTask;
|
||||
friend DeleteTenantTZTask;
|
||||
@ -119,6 +116,7 @@ public:
|
||||
share::schema::ObMultiVersionSchemaService &schema_service);
|
||||
// init interface for liboblog only.
|
||||
void init(tenant_timezone_map_getter tz_map_getter);
|
||||
int start();
|
||||
int add_tenant_timezone(uint64_t tenant_id);
|
||||
int del_tenant_timezone(uint64_t tenant_id);
|
||||
|
||||
@ -133,8 +131,6 @@ public:
|
||||
int update_timezone_map();
|
||||
int delete_tenant_timezone();
|
||||
bool is_inited() { return is_inited_; }
|
||||
bool get_start_refresh() { return start_refresh_; }
|
||||
void set_start_refresh(bool start) { start_refresh_ = start; }
|
||||
bool is_usable() { return usable_; }
|
||||
void set_usable() { usable_ = true; }
|
||||
|
||||
@ -163,12 +159,12 @@ private:
|
||||
AddTenantTZTask add_task_;
|
||||
DeleteTenantTZTask delete_task_;
|
||||
UpdateTenantTZTask update_task_;
|
||||
bool start_refresh_;
|
||||
bool usable_;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service_;
|
||||
public:
|
||||
// tenant timezone getter, observer and liboblog init it during start up.
|
||||
tenant_timezone_map_getter tenant_tz_map_getter_;
|
||||
const uint64_t SLEEP_USECONDS = 5000000;
|
||||
};
|
||||
|
||||
} // omt
|
||||
|
||||
Reference in New Issue
Block a user