[bug fix] fix core at TenantConfigUpdateTask::runTimerTask
This commit is contained in:
38
deps/oblib/src/lib/task/ob_timer.cpp
vendored
38
deps/oblib/src/lib/task/ob_timer.cpp
vendored
@ -216,7 +216,7 @@ int ObTimer::schedule_task(ObTimerTask &task, const int64_t delay, const bool re
|
|||||||
int ObTimer::insert_token(const Token &token)
|
int ObTimer::insert_token(const Token &token)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int32_t max_task_num= max_task_num_;
|
int64_t max_task_num= max_task_num_;
|
||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
} else {
|
} else {
|
||||||
@ -250,33 +250,33 @@ int ObTimer::cancel_task(const ObTimerTask &task)
|
|||||||
ObMonitor<Mutex>::Lock guard(monitor_);
|
ObMonitor<Mutex>::Lock guard(monitor_);
|
||||||
if (!is_inited_) {
|
if (!is_inited_) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
} else {
|
} else if (&task == uncanceled_task_) {
|
||||||
if (&task == uncanceled_task_) {
|
|
||||||
// repeat cancel, do-nothing
|
// repeat cancel, do-nothing
|
||||||
} else if (&task == running_task_) {
|
} else {
|
||||||
|
if (&task == running_task_) {
|
||||||
if (uncanceled_task_ != NULL) {
|
if (uncanceled_task_ != NULL) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
} else {
|
} else {
|
||||||
ATOMIC_STORE(&const_cast<ObTimerTask&>(task).timer_, nullptr);
|
// the token corresponding to running_task_ has been removed in run1(),
|
||||||
|
// so no need remove here
|
||||||
uncanceled_task_ = &const_cast<ObTimerTask&>(task);
|
uncanceled_task_ = &const_cast<ObTimerTask&>(task);
|
||||||
OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task));
|
OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
int64_t pos = -1;
|
|
||||||
for (int64_t i = 0; i < tasks_num_; ++i) {
|
|
||||||
if (&task == tokens_[i].task) {
|
|
||||||
pos = i;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
if (OB_SUCC(ret)) {
|
||||||
if (-1 == pos) {
|
|
||||||
// not found, do-nothing
|
|
||||||
} else {
|
|
||||||
ATOMIC_STORE(&const_cast<ObTimerTask&>(task).timer_, nullptr);
|
ATOMIC_STORE(&const_cast<ObTimerTask&>(task).timer_, nullptr);
|
||||||
memmove(&tokens_[pos], &tokens_[pos + 1],
|
// for any tokens_[i].task == &task, we need remove tokens_[i]
|
||||||
sizeof(tokens_[0]) * (tasks_num_ - pos - 1));
|
int64_t i = 0;
|
||||||
|
while(i < tasks_num_) {
|
||||||
|
if (tokens_[i].task == &task) {
|
||||||
|
if (i + 1 < tasks_num_) {
|
||||||
|
memmove(&tokens_[i], &tokens_[i + 1], sizeof(tokens_[0]) * (tasks_num_ - i - 1));
|
||||||
|
}
|
||||||
--tasks_num_;
|
--tasks_num_;
|
||||||
OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task));
|
OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task));
|
||||||
|
} else {
|
||||||
|
++i;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -481,8 +481,8 @@ void ObTimer::run1()
|
|||||||
|
|
||||||
void ObTimer::dump() const
|
void ObTimer::dump() const
|
||||||
{
|
{
|
||||||
for (int32_t i = 0; i < tasks_num_; ++i) {
|
for (int64_t i = 0; i < tasks_num_; ++i) {
|
||||||
printf("%d : %ld %ld %p\n", i, tokens_[i].scheduled_time, tokens_[i].delay, tokens_[i].task);
|
printf("%ld : %ld %ld %p\n", i, tokens_[i].scheduled_time, tokens_[i].delay, tokens_[i].task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
5
deps/oblib/src/lib/task/ob_timer.h
vendored
5
deps/oblib/src/lib/task/ob_timer.h
vendored
@ -89,7 +89,7 @@ public:
|
|||||||
int cancel_task(const ObTimerTask &task);
|
int cancel_task(const ObTimerTask &task);
|
||||||
int wait_task(const ObTimerTask &task);
|
int wait_task(const ObTimerTask &task);
|
||||||
void cancel_all();
|
void cancel_all();
|
||||||
int32_t get_tasks_num() const { return tasks_num_; }
|
int64_t get_tasks_num() const { return tasks_num_; }
|
||||||
void dump() const;
|
void dump() const;
|
||||||
private:
|
private:
|
||||||
struct Token
|
struct Token
|
||||||
@ -100,7 +100,6 @@ private:
|
|||||||
TO_STRING_KV(K(scheduled_time), K(delay), KP(task), KPC(task));
|
TO_STRING_KV(K(scheduled_time), K(delay), KP(task), KPC(task));
|
||||||
int64_t scheduled_time;
|
int64_t scheduled_time;
|
||||||
int64_t delay;
|
int64_t delay;
|
||||||
bool canceled_;
|
|
||||||
ObTimerTask *task;
|
ObTimerTask *task;
|
||||||
};
|
};
|
||||||
int insert_token(const Token &token);
|
int insert_token(const Token &token);
|
||||||
@ -109,7 +108,7 @@ private:
|
|||||||
DISALLOW_COPY_AND_ASSIGN(ObTimer);
|
DISALLOW_COPY_AND_ASSIGN(ObTimer);
|
||||||
private:
|
private:
|
||||||
const static int64_t ELAPSED_TIME_LOG_THREASHOLD = 10 * 60 * 1000 * 1000; // 10 mins
|
const static int64_t ELAPSED_TIME_LOG_THREASHOLD = 10 * 60 * 1000 * 1000; // 10 mins
|
||||||
int32_t tasks_num_;
|
int64_t tasks_num_;
|
||||||
int64_t max_task_num_;
|
int64_t max_task_num_;
|
||||||
int64_t wakeup_time_;
|
int64_t wakeup_time_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
|
|||||||
1
deps/oblib/unittest/lib/CMakeLists.txt
vendored
1
deps/oblib/unittest/lib/CMakeLists.txt
vendored
@ -98,6 +98,7 @@ oblib_addtest(stat/test_diagnose_info.cpp)
|
|||||||
oblib_addtest(stat/test_stat_template.cpp)
|
oblib_addtest(stat/test_stat_template.cpp)
|
||||||
oblib_addtest(string/test_fixed_length_string.cpp)
|
oblib_addtest(string/test_fixed_length_string.cpp)
|
||||||
oblib_addtest(task/test_timer.cpp)
|
oblib_addtest(task/test_timer.cpp)
|
||||||
|
oblib_addtest(task/test_cancel_task.cpp)
|
||||||
oblib_addtest(test_fixed_array.cpp)
|
oblib_addtest(test_fixed_array.cpp)
|
||||||
oblib_addtest(test_worker.cpp)
|
oblib_addtest(test_worker.cpp)
|
||||||
oblib_addtest(test_work_queue.cpp)
|
oblib_addtest(test_work_queue.cpp)
|
||||||
|
|||||||
194
deps/oblib/unittest/lib/task/test_cancel_task.cpp
vendored
Normal file
194
deps/oblib/unittest/lib/task/test_cancel_task.cpp
vendored
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2021 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#define private public
|
||||||
|
#include "lib/task/ob_timer.h"
|
||||||
|
#undef private
|
||||||
|
|
||||||
|
using namespace oceanbase::lib;
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace common
|
||||||
|
{
|
||||||
|
|
||||||
|
class TaskCommon : public ObTimerTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TaskCommon()
|
||||||
|
: task_run_count_(0)
|
||||||
|
{}
|
||||||
|
void runTimerTask()
|
||||||
|
{
|
||||||
|
++task_run_count_;
|
||||||
|
::usleep(exec_time_);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t task_run_count_;
|
||||||
|
int64_t exec_time_ = 100000; // 100ms
|
||||||
|
};
|
||||||
|
|
||||||
|
class TaskCancelSelf : public ObTimerTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TaskCancelSelf(ObTimer &t)
|
||||||
|
: task_run_count_(0), timer_(t)
|
||||||
|
{}
|
||||||
|
void runTimerTask()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
::usleep(20000); // 20ms
|
||||||
|
if (OB_FAIL(timer_.cancel_task(*this))) {
|
||||||
|
fprintf(stderr, "[%s: %d] call cancel_task failed, ret=%d\n", __FUNCTION__, __LINE__, ret);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
++task_run_count_; // call cancel first, then self-increment
|
||||||
|
::usleep(20000); // 20ms
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t task_run_count_;
|
||||||
|
ObTimer &timer_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class TaskRescheduleAndCancel : public ObTimerTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TaskRescheduleAndCancel(ObTimer &t)
|
||||||
|
: task_run_count_(0), timer_(t)
|
||||||
|
{}
|
||||||
|
void runTimerTask()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
++task_run_count_;
|
||||||
|
::usleep(20000); // 20ms
|
||||||
|
if (OB_FAIL(timer_.schedule(*this, 0, false))) { // repeate = false
|
||||||
|
fprintf(stderr, "[%s: %d] call schedule failed, ret=%d\n", __FUNCTION__, __LINE__, ret);
|
||||||
|
} else if (OB_FAIL(timer_.cancel_task(*this))) { // both cancel the running one
|
||||||
|
// and the re-scheduled one
|
||||||
|
fprintf(stderr, "[%s: %d] call cancel_task failed, ret=%d\n", __FUNCTION__, __LINE__, ret);
|
||||||
|
}
|
||||||
|
::usleep(20000); // 20ms
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t task_run_count_;
|
||||||
|
ObTimer &timer_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// case1: cancel the task immediately
|
||||||
|
TEST(TestCancelTask, cancel_immediately)
|
||||||
|
{
|
||||||
|
ObTimer timer;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.init());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.start());
|
||||||
|
TaskCommon task;
|
||||||
|
TaskCommon task_another;
|
||||||
|
task_another.exec_time_ = 20000; // 20ms
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task_another, 0, false));
|
||||||
|
ASSERT_EQ(2, timer.get_tasks_num());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.cancel(task)); // cancel it
|
||||||
|
ASSERT_EQ(1, timer.get_tasks_num()); // cancel task, do not affect task_another
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.cancel(task)); // test duplicate cancel
|
||||||
|
timer.wait_task(task);
|
||||||
|
timer.wait_task(task_another);
|
||||||
|
ASSERT_EQ(0, task.task_run_count_);
|
||||||
|
ASSERT_EQ(1, task_another.task_run_count_);
|
||||||
|
timer.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
// case2: cancel the running task
|
||||||
|
TEST(TestCancelTask, cancel_running)
|
||||||
|
{
|
||||||
|
ObTimer timer;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.init());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.start());
|
||||||
|
TaskCommon task;
|
||||||
|
task.exec_time_ = 100000; // 100ms
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true)); // repeate = true
|
||||||
|
ASSERT_EQ(1, timer.get_tasks_num());
|
||||||
|
::usleep(50000);
|
||||||
|
ASSERT_EQ(1, task.task_run_count_); // task is running
|
||||||
|
// the running task has been removed from the task array
|
||||||
|
ASSERT_EQ(0, timer.get_tasks_num());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.cancel(task)); // cancel it
|
||||||
|
timer.wait_task(task);
|
||||||
|
ASSERT_EQ(1, task.task_run_count_); // the repeat task has been canceled.
|
||||||
|
timer.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
// case3: cancel the non-running task
|
||||||
|
TEST(TestCancelTask, cancel_non_running)
|
||||||
|
{
|
||||||
|
ObTimer timer;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.init());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.start());
|
||||||
|
TaskCommon task1;
|
||||||
|
task1.exec_time_ = 100000; // 100ms
|
||||||
|
TaskCommon task2;
|
||||||
|
task2.exec_time_ = 50000; //50ms
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task1, 0, false)); // t1
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task2, 10000, true)); // t2
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task2, 30000, true)); // t2
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task2, 50000, true)); // t4
|
||||||
|
ASSERT_EQ(4, timer.get_tasks_num()); // 4 tasks were scheduled
|
||||||
|
::usleep(50000);
|
||||||
|
ASSERT_EQ(1, task1.task_run_count_); // t1 is running
|
||||||
|
// t1 (i.e., the running task) has been removed from the task array
|
||||||
|
ASSERT_EQ(3, timer.get_tasks_num());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.cancel(task2)); // cancel task2 (i.e., t2, t3, t4)
|
||||||
|
ASSERT_EQ(0, timer.get_tasks_num()); // no tasks in task array
|
||||||
|
// task2 did not run once, because all scheduling for it has been canceled
|
||||||
|
ASSERT_EQ(0, task2.task_run_count_);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.cancel(task2)); // test duplicate cancel
|
||||||
|
timer.wait_task(task1);
|
||||||
|
timer.wait_task(task2);
|
||||||
|
timer.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
// case4: cancel self
|
||||||
|
TEST(TestCancelTask, cancel_self)
|
||||||
|
{
|
||||||
|
ObTimer timer;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.init());
|
||||||
|
ASSERT_TRUE(timer.inited());
|
||||||
|
TaskCancelSelf task(timer);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.start());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 10000, true)); // repeate = true
|
||||||
|
timer.wait_task(task);
|
||||||
|
// when canceling a running task, it can't be stopped immediately,
|
||||||
|
// but has to wait until the end of the current round.
|
||||||
|
ASSERT_EQ(1, task.task_run_count_);
|
||||||
|
timer.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
// case5: re-schedule slef, then cancel
|
||||||
|
TEST(TestCancelTask, reschedule_self_and_cancel)
|
||||||
|
{
|
||||||
|
ObTimer timer;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.init());
|
||||||
|
ASSERT_TRUE(timer.inited());
|
||||||
|
TaskRescheduleAndCancel task(timer);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.start());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0));
|
||||||
|
timer.wait_task(task);
|
||||||
|
ASSERT_EQ(task.task_run_count_, 1);
|
||||||
|
timer.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end namespace common
|
||||||
|
} // end namespace oceanbase
|
||||||
|
|
||||||
|
int main(int argc, char **argv)
|
||||||
|
{
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
3
deps/oblib/unittest/lib/task/test_timer.cpp
vendored
3
deps/oblib/unittest/lib/task/test_timer.cpp
vendored
@ -144,7 +144,8 @@ TEST(TestTimer, task_cancel_wait)
|
|||||||
task.exec_time_ = 1000000;
|
task.exec_time_ = 1000000;
|
||||||
int64_t cur_time = ObTimeUtility::current_time();
|
int64_t cur_time = ObTimeUtility::current_time();
|
||||||
ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true));
|
ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true));
|
||||||
usleep(10000);
|
// it must sleep for enough time to ensure that the task has started running
|
||||||
|
usleep(50000);
|
||||||
ASSERT_EQ(OB_SUCCESS, timer.cancel_task(task));
|
ASSERT_EQ(OB_SUCCESS, timer.cancel_task(task));
|
||||||
// repeat cancel
|
// repeat cancel
|
||||||
ASSERT_EQ(OB_SUCCESS, timer.cancel_task(task));
|
ASSERT_EQ(OB_SUCCESS, timer.cancel_task(task));
|
||||||
|
|||||||
@ -52,7 +52,6 @@ public:
|
|||||||
virtual ~TenantConfigUpdateTask() {}
|
virtual ~TenantConfigUpdateTask() {}
|
||||||
TenantConfigUpdateTask(const TenantConfigUpdateTask &) = delete;
|
TenantConfigUpdateTask(const TenantConfigUpdateTask &) = delete;
|
||||||
TenantConfigUpdateTask &operator=(const TenantConfigUpdateTask &) = delete;
|
TenantConfigUpdateTask &operator=(const TenantConfigUpdateTask &) = delete;
|
||||||
void set_tenant_config(ObTenantConfig *config) { tenant_config_ = config; }
|
|
||||||
void runTimerTask(void) override;
|
void runTimerTask(void) override;
|
||||||
ObTenantConfigMgr *config_mgr_;
|
ObTenantConfigMgr *config_mgr_;
|
||||||
ObTenantConfig *tenant_config_;
|
ObTenantConfig *tenant_config_;
|
||||||
|
|||||||
Reference in New Issue
Block a user