From e9f0e97a5a25c7966a3ccd1cfdeed674931220c7 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 8 Feb 2024 08:04:10 +0000 Subject: [PATCH] [bug fix] fix core at TenantConfigUpdateTask::runTimerTask --- deps/oblib/src/lib/task/ob_timer.cpp | 44 ++-- deps/oblib/src/lib/task/ob_timer.h | 5 +- deps/oblib/unittest/lib/CMakeLists.txt | 1 + .../unittest/lib/task/test_cancel_task.cpp | 194 ++++++++++++++++++ deps/oblib/unittest/lib/task/test_timer.cpp | 3 +- src/observer/omt/ob_tenant_config.h | 1 - 6 files changed, 221 insertions(+), 27 deletions(-) create mode 100644 deps/oblib/unittest/lib/task/test_cancel_task.cpp diff --git a/deps/oblib/src/lib/task/ob_timer.cpp b/deps/oblib/src/lib/task/ob_timer.cpp index a49a95ebb..bed0f2979 100644 --- a/deps/oblib/src/lib/task/ob_timer.cpp +++ b/deps/oblib/src/lib/task/ob_timer.cpp @@ -216,7 +216,7 @@ int ObTimer::schedule_task(ObTimerTask &task, const int64_t delay, const bool re int ObTimer::insert_token(const Token &token) { int ret = OB_SUCCESS; - int32_t max_task_num= max_task_num_; + int64_t max_task_num= max_task_num_; if (!is_inited_) { ret = OB_NOT_INIT; } else { @@ -250,34 +250,34 @@ int ObTimer::cancel_task(const ObTimerTask &task) ObMonitor::Lock guard(monitor_); if (!is_inited_) { ret = OB_NOT_INIT; - } else { - if (&task == uncanceled_task_) { + } else if (&task == uncanceled_task_) { // repeat cancel, do-nothing - } else if (&task == running_task_) { + } else { + if (&task == running_task_) { if (uncanceled_task_ != NULL) { ret = OB_ERR_UNEXPECTED; } else { - ATOMIC_STORE(&const_cast(task).timer_, nullptr); + // the token corresponding to running_task_ has been removed in run1(), + // so no need remove here uncanceled_task_ = &const_cast(task); OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task)); } - } else { - int64_t pos = -1; - for (int64_t i = 0; i < tasks_num_; ++i) { - if (&task == tokens_[i].task) { - pos = i; - break; + } + if (OB_SUCC(ret)) { + ATOMIC_STORE(&const_cast(task).timer_, nullptr); + // for any tokens_[i].task == &task, we need remove tokens_[i] + int64_t i = 0; + while(i < tasks_num_) { + if (tokens_[i].task == &task) { + if (i + 1 < tasks_num_) { + memmove(&tokens_[i], &tokens_[i + 1], sizeof(tokens_[0]) * (tasks_num_ - i - 1)); + } + --tasks_num_; + OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task)); + } else { + ++i; } } - if (-1 == pos) { - // not found, do-nothing - } else { - ATOMIC_STORE(&const_cast(task).timer_, nullptr); - memmove(&tokens_[pos], &tokens_[pos + 1], - sizeof(tokens_[0]) * (tasks_num_ - pos - 1)); - --tasks_num_; - OB_LOG(INFO, "cancel task", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_), K(task)); - } } } return ret; @@ -481,8 +481,8 @@ void ObTimer::run1() void ObTimer::dump() const { - for (int32_t i = 0; i < tasks_num_; ++i) { - printf("%d : %ld %ld %p\n", i, tokens_[i].scheduled_time, tokens_[i].delay, tokens_[i].task); + for (int64_t i = 0; i < tasks_num_; ++i) { + printf("%ld : %ld %ld %p\n", i, tokens_[i].scheduled_time, tokens_[i].delay, tokens_[i].task); } } diff --git a/deps/oblib/src/lib/task/ob_timer.h b/deps/oblib/src/lib/task/ob_timer.h index 58835410d..7a9dde6b9 100644 --- a/deps/oblib/src/lib/task/ob_timer.h +++ b/deps/oblib/src/lib/task/ob_timer.h @@ -89,7 +89,7 @@ public: int cancel_task(const ObTimerTask &task); int wait_task(const ObTimerTask &task); void cancel_all(); - int32_t get_tasks_num() const { return tasks_num_; } + int64_t get_tasks_num() const { return tasks_num_; } void dump() const; private: struct Token @@ -100,7 +100,6 @@ private: TO_STRING_KV(K(scheduled_time), K(delay), KP(task), KPC(task)); int64_t scheduled_time; int64_t delay; - bool canceled_; ObTimerTask *task; }; int insert_token(const Token &token); @@ -109,7 +108,7 @@ private: DISALLOW_COPY_AND_ASSIGN(ObTimer); private: const static int64_t ELAPSED_TIME_LOG_THREASHOLD = 10 * 60 * 1000 * 1000; // 10 mins - int32_t tasks_num_; + int64_t tasks_num_; int64_t max_task_num_; int64_t wakeup_time_; bool is_inited_; diff --git a/deps/oblib/unittest/lib/CMakeLists.txt b/deps/oblib/unittest/lib/CMakeLists.txt index 1011f704e..a0d0c377e 100644 --- a/deps/oblib/unittest/lib/CMakeLists.txt +++ b/deps/oblib/unittest/lib/CMakeLists.txt @@ -98,6 +98,7 @@ oblib_addtest(stat/test_diagnose_info.cpp) oblib_addtest(stat/test_stat_template.cpp) oblib_addtest(string/test_fixed_length_string.cpp) oblib_addtest(task/test_timer.cpp) +oblib_addtest(task/test_cancel_task.cpp) oblib_addtest(test_fixed_array.cpp) oblib_addtest(test_worker.cpp) oblib_addtest(test_work_queue.cpp) diff --git a/deps/oblib/unittest/lib/task/test_cancel_task.cpp b/deps/oblib/unittest/lib/task/test_cancel_task.cpp new file mode 100644 index 000000000..afe7ef9fd --- /dev/null +++ b/deps/oblib/unittest/lib/task/test_cancel_task.cpp @@ -0,0 +1,194 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#define private public +#include "lib/task/ob_timer.h" +#undef private + +using namespace oceanbase::lib; +namespace oceanbase +{ +namespace common +{ + +class TaskCommon : public ObTimerTask +{ +public: + TaskCommon() + : task_run_count_(0) + {} + void runTimerTask() + { + ++task_run_count_; + ::usleep(exec_time_); + } + + int64_t task_run_count_; + int64_t exec_time_ = 100000; // 100ms +}; + +class TaskCancelSelf : public ObTimerTask +{ +public: + TaskCancelSelf(ObTimer &t) + : task_run_count_(0), timer_(t) + {} + void runTimerTask() + { + int ret = OB_SUCCESS; + ::usleep(20000); // 20ms + if (OB_FAIL(timer_.cancel_task(*this))) { + fprintf(stderr, "[%s: %d] call cancel_task failed, ret=%d\n", __FUNCTION__, __LINE__, ret); + return; + } + ++task_run_count_; // call cancel first, then self-increment + ::usleep(20000); // 20ms + } + + int64_t task_run_count_; + ObTimer &timer_; +}; + +class TaskRescheduleAndCancel : public ObTimerTask +{ +public: + TaskRescheduleAndCancel(ObTimer &t) + : task_run_count_(0), timer_(t) + {} + void runTimerTask() + { + int ret = OB_SUCCESS; + ++task_run_count_; + ::usleep(20000); // 20ms + if (OB_FAIL(timer_.schedule(*this, 0, false))) { // repeate = false + fprintf(stderr, "[%s: %d] call schedule failed, ret=%d\n", __FUNCTION__, __LINE__, ret); + } else if (OB_FAIL(timer_.cancel_task(*this))) { // both cancel the running one + // and the re-scheduled one + fprintf(stderr, "[%s: %d] call cancel_task failed, ret=%d\n", __FUNCTION__, __LINE__, ret); + } + ::usleep(20000); // 20ms + } + + int64_t task_run_count_; + ObTimer &timer_; +}; + +// case1: cancel the task immediately +TEST(TestCancelTask, cancel_immediately) +{ + ObTimer timer; + ASSERT_EQ(OB_SUCCESS, timer.init()); + ASSERT_EQ(OB_SUCCESS, timer.start()); + TaskCommon task; + TaskCommon task_another; + task_another.exec_time_ = 20000; // 20ms + ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true)); + ASSERT_EQ(OB_SUCCESS, timer.schedule(task_another, 0, false)); + ASSERT_EQ(2, timer.get_tasks_num()); + ASSERT_EQ(OB_SUCCESS, timer.cancel(task)); // cancel it + ASSERT_EQ(1, timer.get_tasks_num()); // cancel task, do not affect task_another + ASSERT_EQ(OB_SUCCESS, timer.cancel(task)); // test duplicate cancel + timer.wait_task(task); + timer.wait_task(task_another); + ASSERT_EQ(0, task.task_run_count_); + ASSERT_EQ(1, task_another.task_run_count_); + timer.destroy(); +} + +// case2: cancel the running task +TEST(TestCancelTask, cancel_running) +{ + ObTimer timer; + ASSERT_EQ(OB_SUCCESS, timer.init()); + ASSERT_EQ(OB_SUCCESS, timer.start()); + TaskCommon task; + task.exec_time_ = 100000; // 100ms + ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true)); // repeate = true + ASSERT_EQ(1, timer.get_tasks_num()); + ::usleep(50000); + ASSERT_EQ(1, task.task_run_count_); // task is running + // the running task has been removed from the task array + ASSERT_EQ(0, timer.get_tasks_num()); + ASSERT_EQ(OB_SUCCESS, timer.cancel(task)); // cancel it + timer.wait_task(task); + ASSERT_EQ(1, task.task_run_count_); // the repeat task has been canceled. + timer.destroy(); +} + +// case3: cancel the non-running task +TEST(TestCancelTask, cancel_non_running) +{ + ObTimer timer; + ASSERT_EQ(OB_SUCCESS, timer.init()); + ASSERT_EQ(OB_SUCCESS, timer.start()); + TaskCommon task1; + task1.exec_time_ = 100000; // 100ms + TaskCommon task2; + task2.exec_time_ = 50000; //50ms + ASSERT_EQ(OB_SUCCESS, timer.schedule(task1, 0, false)); // t1 + ASSERT_EQ(OB_SUCCESS, timer.schedule(task2, 10000, true)); // t2 + ASSERT_EQ(OB_SUCCESS, timer.schedule(task2, 30000, true)); // t2 + ASSERT_EQ(OB_SUCCESS, timer.schedule(task2, 50000, true)); // t4 + ASSERT_EQ(4, timer.get_tasks_num()); // 4 tasks were scheduled + ::usleep(50000); + ASSERT_EQ(1, task1.task_run_count_); // t1 is running + // t1 (i.e., the running task) has been removed from the task array + ASSERT_EQ(3, timer.get_tasks_num()); + ASSERT_EQ(OB_SUCCESS, timer.cancel(task2)); // cancel task2 (i.e., t2, t3, t4) + ASSERT_EQ(0, timer.get_tasks_num()); // no tasks in task array + // task2 did not run once, because all scheduling for it has been canceled + ASSERT_EQ(0, task2.task_run_count_); + ASSERT_EQ(OB_SUCCESS, timer.cancel(task2)); // test duplicate cancel + timer.wait_task(task1); + timer.wait_task(task2); + timer.destroy(); +} + +// case4: cancel self +TEST(TestCancelTask, cancel_self) +{ + ObTimer timer; + ASSERT_EQ(OB_SUCCESS, timer.init()); + ASSERT_TRUE(timer.inited()); + TaskCancelSelf task(timer); + ASSERT_EQ(OB_SUCCESS, timer.start()); + ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 10000, true)); // repeate = true + timer.wait_task(task); + // when canceling a running task, it can't be stopped immediately, + // but has to wait until the end of the current round. + ASSERT_EQ(1, task.task_run_count_); + timer.destroy(); +} + +// case5: re-schedule slef, then cancel +TEST(TestCancelTask, reschedule_self_and_cancel) +{ + ObTimer timer; + ASSERT_EQ(OB_SUCCESS, timer.init()); + ASSERT_TRUE(timer.inited()); + TaskRescheduleAndCancel task(timer); + ASSERT_EQ(OB_SUCCESS, timer.start()); + ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0)); + timer.wait_task(task); + ASSERT_EQ(task.task_run_count_, 1); + timer.destroy(); +} + +} // end namespace common +} // end namespace oceanbase + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/deps/oblib/unittest/lib/task/test_timer.cpp b/deps/oblib/unittest/lib/task/test_timer.cpp index dd0eff48f..9ae5b369a 100644 --- a/deps/oblib/unittest/lib/task/test_timer.cpp +++ b/deps/oblib/unittest/lib/task/test_timer.cpp @@ -144,7 +144,8 @@ TEST(TestTimer, task_cancel_wait) task.exec_time_ = 1000000; int64_t cur_time = ObTimeUtility::current_time(); ASSERT_EQ(OB_SUCCESS, timer.schedule(task, 0, true)); - usleep(10000); + // it must sleep for enough time to ensure that the task has started running + usleep(50000); ASSERT_EQ(OB_SUCCESS, timer.cancel_task(task)); // repeat cancel ASSERT_EQ(OB_SUCCESS, timer.cancel_task(task)); diff --git a/src/observer/omt/ob_tenant_config.h b/src/observer/omt/ob_tenant_config.h index a92b8eed7..3a8402191 100644 --- a/src/observer/omt/ob_tenant_config.h +++ b/src/observer/omt/ob_tenant_config.h @@ -52,7 +52,6 @@ public: virtual ~TenantConfigUpdateTask() {} TenantConfigUpdateTask(const TenantConfigUpdateTask &) = delete; TenantConfigUpdateTask &operator=(const TenantConfigUpdateTask &) = delete; - void set_tenant_config(ObTenantConfig *config) { tenant_config_ = config; } void runTimerTask(void) override; ObTenantConfigMgr *config_mgr_; ObTenantConfig *tenant_config_;