prevent shared-timer from being destroyed prematurely
This commit is contained in:
9
deps/oblib/src/lib/thread/thread_mgr.h
vendored
9
deps/oblib/src/lib/thread/thread_mgr.h
vendored
@ -124,6 +124,7 @@ public:
|
|||||||
virtual int start() = 0;
|
virtual int start() = 0;
|
||||||
virtual void stop() = 0;
|
virtual void stop() = 0;
|
||||||
virtual void wait() = 0;
|
virtual void wait() = 0;
|
||||||
|
virtual void wait_only() {}
|
||||||
|
|
||||||
virtual int set_runnable(TGRunnable &runnable)
|
virtual int set_runnable(TGRunnable &runnable)
|
||||||
{
|
{
|
||||||
@ -798,6 +799,12 @@ public:
|
|||||||
destroy();
|
destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void wait_only() override
|
||||||
|
{
|
||||||
|
if (timer_ != nullptr) {
|
||||||
|
timer_->wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int schedule(common::ObTimerTask &task, const int64_t delay, bool repeate = false, bool immediate = false) override
|
int schedule(common::ObTimerTask &task, const int64_t delay, bool repeate = false, bool immediate = false) override
|
||||||
{
|
{
|
||||||
@ -1111,10 +1118,12 @@ public:
|
|||||||
#define TG_SET_THREAD_CNT(tg_id, count) TG_INVOKE(tg_id, set_thread_cnt, count)
|
#define TG_SET_THREAD_CNT(tg_id, count) TG_INVOKE(tg_id, set_thread_cnt, count)
|
||||||
#define TG_WAIT_R(tg_id) TG_INVOKE(tg_id, wait)
|
#define TG_WAIT_R(tg_id) TG_INVOKE(tg_id, wait)
|
||||||
#define TG_WAIT(tg_id) do { int r = TG_INVOKE(tg_id, wait); UNUSED(r); } while (0)
|
#define TG_WAIT(tg_id) do { int r = TG_INVOKE(tg_id, wait); UNUSED(r); } while (0)
|
||||||
|
#define TG_WAIT_ONLY(tg_id) do { int r = TG_INVOKE(tg_id, wait_only); UNUSED(r); } while (0)
|
||||||
#define TG_STOP_R(tg_id) TG_INVOKE(tg_id, stop)
|
#define TG_STOP_R(tg_id) TG_INVOKE(tg_id, stop)
|
||||||
#define TG_STOP(tg_id) do { int r = TG_INVOKE(tg_id, stop); UNUSED(r); } while (0)
|
#define TG_STOP(tg_id) do { int r = TG_INVOKE(tg_id, stop); UNUSED(r); } while (0)
|
||||||
#define TG_CANCEL_R(tg_id, args...) TG_INVOKE(tg_id, cancel, args)
|
#define TG_CANCEL_R(tg_id, args...) TG_INVOKE(tg_id, cancel, args)
|
||||||
#define TG_CANCEL(tg_id, args...) do { int r = TG_INVOKE(tg_id, cancel, args); UNUSED(r); } while (0)
|
#define TG_CANCEL(tg_id, args...) do { int r = TG_INVOKE(tg_id, cancel, args); UNUSED(r); } while (0)
|
||||||
|
#define TG_CANCEL_TASK_R(tg_id, args...) TG_INVOKE(tg_id, cancel_task, args)
|
||||||
#define TG_CANCEL_TASK(tg_id, args...) do { int r = TG_INVOKE(tg_id, cancel_task, args); UNUSED(r); } while (0)
|
#define TG_CANCEL_TASK(tg_id, args...) do { int r = TG_INVOKE(tg_id, cancel_task, args); UNUSED(r); } while (0)
|
||||||
#define TG_WAIT_TASK(tg_id, args...) do { int r = TG_INVOKE(tg_id, wait_task, args); UNUSED(r); } while (0)
|
#define TG_WAIT_TASK(tg_id, args...) do { int r = TG_INVOKE(tg_id, wait_task, args); UNUSED(r); } while (0)
|
||||||
#define TG_CANCEL_ALL(tg_id) TG_INVOKE(tg_id, cancel_all)
|
#define TG_CANCEL_ALL(tg_id) TG_INVOKE(tg_id, cancel_all)
|
||||||
|
@ -46,7 +46,11 @@ TEST(TG, timer)
|
|||||||
::usleep(60000);
|
::usleep(60000);
|
||||||
ASSERT_EQ(1, task.task_run_count_);
|
ASSERT_EQ(1, task.task_run_count_);
|
||||||
ASSERT_EQ(OB_SUCCESS, TG_STOP_R(tg_id));
|
ASSERT_EQ(OB_SUCCESS, TG_STOP_R(tg_id));
|
||||||
|
TG_WAIT_ONLY(tg_id);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, TG_CANCEL_R(tg_id, task));
|
||||||
ASSERT_EQ(OB_SUCCESS, TG_WAIT_R(tg_id));
|
ASSERT_EQ(OB_SUCCESS, TG_WAIT_R(tg_id));
|
||||||
|
// TG_WAIT = wait + destroy
|
||||||
|
ASSERT_EQ(OB_ERR_UNEXPECTED, TG_CANCEL_R(tg_id, task));
|
||||||
|
|
||||||
// restart
|
// restart
|
||||||
ASSERT_EQ(OB_SUCCESS, TG_START(tg_id));
|
ASSERT_EQ(OB_SUCCESS, TG_START(tg_id));
|
||||||
|
@ -2436,7 +2436,7 @@ void ObSharedTimer::mtl_wait(ObSharedTimer *&st)
|
|||||||
if (st != NULL) {
|
if (st != NULL) {
|
||||||
int &tg_id = st->tg_id_;
|
int &tg_id = st->tg_id_;
|
||||||
if (tg_id > 0) {
|
if (tg_id > 0) {
|
||||||
TG_WAIT(tg_id);
|
TG_WAIT_ONLY(tg_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user