From 607bfa740b271ede14ac58a6df186f468fa88c15 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 6 Nov 2023 11:43:57 +0000 Subject: [PATCH] fix bug: register interrupt with guard not has a return code --- .../interrupt/ob_global_interrupt_call.cpp | 7 +- src/sql/engine/px/ob_px_interruption.cpp | 6 +- src/sql/engine/px/ob_px_interruption.h | 2 + src/sql/engine/px/ob_px_rpc_processor.cpp | 32 +- src/sql/engine/px/ob_px_worker.cpp | 4 +- unittest/share/interrupt/CMakeLists.txt | 3 +- .../share/interrupt/test_interrupt_system.cpp | 412 +++++++++++------- 7 files changed, 293 insertions(+), 173 deletions(-) diff --git a/src/share/interrupt/ob_global_interrupt_call.cpp b/src/share/interrupt/ob_global_interrupt_call.cpp index ab2857464..58be7c433 100644 --- a/src/share/interrupt/ob_global_interrupt_call.cpp +++ b/src/share/interrupt/ob_global_interrupt_call.cpp @@ -140,6 +140,11 @@ int ObGlobalInterruptManager::register_checker(ObInterruptChecker *checker, } while (ret == OB_HASH_NOT_EXIST); if (OB_SUCC(ret)) { ATOMIC_INC(&(checker->ref_count_)); + } else { + LOG_WARN("failed to register_checker"); + if (OB_NOT_NULL(checker_node)) { + ob_delete(checker_node); + } } } return ret; @@ -159,7 +164,7 @@ int ObGlobalInterruptManager::unregister_checker(ObInterruptChecker *checker, LIB_LOG(ERROR, "invaild checker pointer"); } else { ObInterruptGetCheckerNodeCall get_node_call(checker); - if (OB_HASH_NOT_EXIST == map_.read_atomic(tid, get_node_call)) { + if (OB_HASH_NOT_EXIST == (ret = map_.read_atomic(tid, get_node_call))) { LIB_LOG(ERROR, "unregister checker failed", K(ret)); } else if (!get_node_call.is_checker_exist()) { ret = OB_HASH_NOT_EXIST; diff --git a/src/sql/engine/px/ob_px_interruption.cpp b/src/sql/engine/px/ob_px_interruption.cpp index 93400038f..d19b086b3 100644 --- a/src/sql/engine/px/ob_px_interruption.cpp +++ b/src/sql/engine/px/ob_px_interruption.cpp @@ -25,12 +25,14 @@ OB_SERIALIZE_MEMBER(ObPxInterruptID, query_interrupt_id_, px_interrupt_id_); ObPxInterruptGuard::ObPxInterruptGuard(const ObInterruptibleTaskID &interrupt_id) { interrupt_id_ = interrupt_id; - SET_INTERRUPTABLE(interrupt_id_); + interrupt_reg_ret_ = SET_INTERRUPTABLE(interrupt_id_); } ObPxInterruptGuard::~ObPxInterruptGuard() { - UNSET_INTERRUPTABLE(interrupt_id_); + if (OB_SUCCESS == interrupt_reg_ret_) { + UNSET_INTERRUPTABLE(interrupt_id_); + } } int ObInterruptUtil::broadcast_px(ObIArray &dfos, int int_code) diff --git a/src/sql/engine/px/ob_px_interruption.h b/src/sql/engine/px/ob_px_interruption.h index 4e8f76f87..992320201 100644 --- a/src/sql/engine/px/ob_px_interruption.h +++ b/src/sql/engine/px/ob_px_interruption.h @@ -55,8 +55,10 @@ class ObPxInterruptGuard public: ObPxInterruptGuard(const common::ObInterruptibleTaskID &interrupt_id_); ~ObPxInterruptGuard(); + int get_interrupt_reg_ret() { return interrupt_reg_ret_; } private: common::ObInterruptibleTaskID interrupt_id_; + int interrupt_reg_ret_; }; diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index 7c9f6b1ef..d355f935f 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -73,11 +73,15 @@ int ObInitSqcP::process() */ if (OB_NOT_NULL(sqc_handler)) { ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); - SET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_); - unregister_interrupt_ = true; + if (OB_FAIL(SET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_))) { + LOG_WARN("sqc failed to SET_INTERRUPTABLE"); + } else { + unregister_interrupt_ = true; + } } - if (OB_ISNULL(sqc_handler)) { + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(sqc_handler)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Sqc handler can't be nullptr", K(ret)); } else if (OB_FAIL(sqc_handler->init_env())) { @@ -408,15 +412,19 @@ int ObInitFastSqcP::process() ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); arg.sqc_.set_task_count(1); ObPxInterruptGuard px_int_guard(arg.sqc_.get_interrupt_id().px_interrupt_id_); - lib::CompatModeGuard g(session->get_compatibility_mode() == ORACLE_MODE ? - lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL); - sqc_handler->set_tenant_id(session->get_effective_tenant_id()); - LOG_TRACE("process dfo", - K(arg), - K(session->get_compatibility_mode()), - K(sqc_handler->get_reserved_px_thread_count())); - if (OB_FAIL(startup_normal_sqc(*sqc_handler))) { - LOG_WARN("fail to startup normal sqc", K(ret)); + if (OB_FAIL(px_int_guard.get_interrupt_reg_ret())) { + LOG_WARN("fast sqc failed to SET_INTERRUPTABLE"); + } else { + lib::CompatModeGuard g(session->get_compatibility_mode() == ORACLE_MODE ? + lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL); + sqc_handler->set_tenant_id(session->get_effective_tenant_id()); + LOG_TRACE("process dfo", + K(arg), + K(session->get_compatibility_mode()), + K(sqc_handler->get_reserved_px_thread_count())); + if (OB_FAIL(startup_normal_sqc(*sqc_handler))) { + LOG_WARN("fail to startup normal sqc", K(ret)); + } } } diff --git a/src/sql/engine/px/ob_px_worker.cpp b/src/sql/engine/px/ob_px_worker.cpp index a62307d57..fc8b49076 100644 --- a/src/sql/engine/px/ob_px_worker.cpp +++ b/src/sql/engine/px/ob_px_worker.cpp @@ -163,7 +163,9 @@ void PxWorkerFunctor::operator ()() const bool enable_trace_log = lib::is_trace_log_enabled(); //ensure PX worker skip updating timeout_ts_ by ntp offset THIS_WORKER.set_ntp_offset(0); - if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) { + if (OB_FAIL(px_int_guard.get_interrupt_reg_ret())) { + LOG_WARN("px worker failed to SET_INTERRUPTABLE"); + } else if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) { THIS_WORKER.set_worker_level(sqc_handler->get_rpc_level()); THIS_WORKER.set_curr_request_level(sqc_handler->get_rpc_level()); LOG_TRACE("init flt ctx", K(sqc_handler->get_flt_ctx())); diff --git a/unittest/share/interrupt/CMakeLists.txt b/unittest/share/interrupt/CMakeLists.txt index 82146a508..91be5125e 100644 --- a/unittest/share/interrupt/CMakeLists.txt +++ b/unittest/share/interrupt/CMakeLists.txt @@ -1,2 +1 @@ -##ob_unittest(test_interrupt_system.cpp) - +ob_unittest(test_interrupt_system) diff --git a/unittest/share/interrupt/test_interrupt_system.cpp b/unittest/share/interrupt/test_interrupt_system.cpp index c55a7a0d2..813b0b48f 100644 --- a/unittest/share/interrupt/test_interrupt_system.cpp +++ b/unittest/share/interrupt/test_interrupt_system.cpp @@ -15,105 +15,192 @@ #define private public -#include "lib/coro/testing.h" #include "share/interrupt/ob_interrupt_rpc_proxy.h" #include "share/interrupt/ob_global_interrupt_call.h" +#include "sql/engine/px/ob_px_interruption.h" #include "rpc/testing.h" +#include using namespace oceanbase::obrpc; -namespace oceanbase { -namespace common { - -TEST(ObGlobalInterruptManager, normal) +namespace oceanbase +{ +namespace common { - rpctesting::Service service; - ObInterruptProcessor p; - service.init(); - service.reg_processor(&p); - ObInterruptRpcProxy proxy; - service.get_proxy(proxy); +rpctesting::Service g_service; +ObInterruptProcessor g_processor; +ObInterruptRpcProxy g_proxy; +ObGlobalInterruptManager *g_manager(nullptr); +void init_rpc() +{ int ret = OB_SUCCESS; - int temp_id = 1; - int icode = 1; + ret = g_service.init(); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = g_service.reg_processor(&g_processor); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = g_service.get_proxy(g_proxy); + ASSERT_EQ(OB_SUCCESS, ret); +} + +TEST(ObGlobalInterruptManager, init_manager) +{ + int ret = OB_SUCCESS; + init_rpc(); ObAddr local(ObAddr::IPV4, "127.0.0.1", 10086); - ObAddr dst = service.get_dst(); - - /// Singleton acquisition - ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance(); - ASSERT_EQ(false, nullptr == manager); + ObAddr dst = g_service.get_dst(); + // prepare mocked ObInterruptibleTaskID, ObInterruptCode, + ObInterruptibleTaskID interrupt_id(1, 1); + ObInterruptStackInfo interrupt_stack_info; + interrupt_stack_info.set_info(1 /*from tid*/, local /*from_svr_addr*/, + "mock error" /*extra_msg*/); + ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info); ObInterruptChecker *checker = nullptr; - /// Perform various operations without initialization, correct feedback - ret = manager->interrupt(temp_id, icode); + // Singleton acquisition + g_manager = ObGlobalInterruptManager::getInstance(); + ASSERT_EQ(false, nullptr == g_manager); + + // test init ObGlobalInterruptManager + ret = g_manager->interrupt(interrupt_id, interrupt_code); ASSERT_EQ(OB_NOT_INIT, ret); - ret = manager->interrupt(local, temp_id, icode); + ret = g_manager->interrupt(local, interrupt_id, interrupt_code); ASSERT_EQ(OB_NOT_INIT, ret); - ret = manager->register_checker(checker); + ret = g_manager->register_checker(checker, interrupt_id); ASSERT_EQ(OB_NOT_INIT, ret); - ret = manager->unregister_checker(checker); + ret = g_manager->unregister_checker(checker, interrupt_id); ASSERT_EQ(OB_NOT_INIT, ret); - /// No RPC initialization is illegal - ret = manager->init(local, nullptr); + // No RPC initialization is illegal + ret = g_manager->init(local, nullptr); ASSERT_EQ(OB_INVALID_ARGUMENT, ret); - /// Regular initialization - ret = manager->init(local, &proxy); + // Regular initialization + ret = g_manager->init(local, &g_proxy); ASSERT_EQ(OB_SUCCESS, ret); - /// Initialize correctly and feedback error many times - ret = manager->init(local, &proxy); + // Initialize correctly and feedback error many times + ret = g_manager->init(local, &g_proxy); ASSERT_EQ(OB_INIT_TWICE, ret); +} - ret = manager->register_checker(checker); - ASSERT_EQ(OB_INVALID_ARGUMENT, ret); +TEST(ObGlobalInterruptManager, register_and_unregister) +{ + int ret = OB_SUCCESS; + ObAddr local(ObAddr::IPV4, "127.0.0.1", 10086); + ObAddr dst = g_service.get_dst(); - ret = manager->unregister_checker(checker); - ASSERT_EQ(OB_INVALID_ARGUMENT, ret); + // prepare mocked ObInterruptibleTaskID, ObInterruptCode, + ObInterruptibleTaskID interrupt_id(1, 1); + ObInterruptStackInfo interrupt_stack_info; + interrupt_stack_info.set_info(1 /*from tid*/, local /*from_svr_addr*/, + "mock error" /*extra_msg*/); + ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info); - /// Get the inspector - checker = new ObInterruptChecker(true, temp_id); - /// Test self-registration - ret = checker->register_checker(temp_id); + // prepare checker + ObInterruptChecker *checker = nullptr; + checker = new ObInterruptChecker(false, 1); // no coroutine again + + // Test self-registration + ret = checker->register_checker(interrupt_id); ASSERT_EQ(OB_SUCCESS, ret); - /// Test duplicate registration - ret = checker->register_checker(temp_id); - ASSERT_EQ(OB_INIT_TWICE, ret); - ret = checker->register_checker(temp_id + 1); - ASSERT_EQ(OB_INIT_TWICE, ret); - ret = manager->register_checker(checker); - ASSERT_EQ(OB_INIT_TWICE, ret); + // Test duplicate registration + ret = checker->register_checker(interrupt_id); + ASSERT_EQ(OB_HASH_EXIST, ret); + ret = g_manager->register_checker(checker, interrupt_id); + ASSERT_EQ(OB_HASH_EXIST, ret); - /// Semaphore detection without interruption + // unregister checker + ret = g_manager->unregister_checker(checker, interrupt_id); + ASSERT_EQ(OB_SUCCESS, ret); + + // duplicate unregister checker + ret = g_manager->unregister_checker(checker, interrupt_id); + ASSERT_EQ(OB_HASH_NOT_EXIST, ret); + EXPECT_EQ(0, g_manager->map_.size()); + + delete checker; + + // Test group registration and deregistration + ObInterruptChecker *checker0 = new ObInterruptChecker(false, 1); + ObInterruptChecker *checker1 = new ObInterruptChecker(false, 1); + ObInterruptChecker *checker2 = new ObInterruptChecker(false, 1); + ObInterruptChecker *checker3 = new ObInterruptChecker(false, 1); + ret = g_manager->register_checker(checker0, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->register_checker(checker1, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->register_checker(checker2, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->register_checker(checker3, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->unregister_checker(checker0, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->unregister_checker(checker1, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->unregister_checker(checker2, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ret = g_manager->unregister_checker(checker3, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + delete checker0; + delete checker1; + delete checker2; + delete checker3; + + EXPECT_EQ(0, g_manager->map_.size()); +} + +TEST(ObGlobalInterruptManager, interrupt) +{ + int ret = OB_SUCCESS; + ObAddr local(ObAddr::IPV4, "127.0.0.1", 10086); + ObAddr dst = g_service.get_dst(); + + // prepare mocked ObInterruptibleTaskID, ObInterruptCode, + ObInterruptibleTaskID interrupt_id(1, 1); + ObInterruptStackInfo interrupt_stack_info; + interrupt_stack_info.set_info(1 /*from tid*/, local /*from_svr_addr*/, + "mock error" /*extra_msg*/); + ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info); + + // prepare checker && register checker + ObInterruptChecker *checker = nullptr; + checker = new ObInterruptChecker(false, 1); // no coroutine again + ret = checker->register_checker(interrupt_id); + ASSERT_EQ(OB_SUCCESS, ret); + + // Semaphore detection without interruption bool r = checker->is_interrupted(); ASSERT_EQ(false, r); - /// Local execution interrupt check semaphore - ret = manager->interrupt(temp_id, icode); + // Local execution interrupt check semaphore + ret = g_manager->interrupt(interrupt_id, interrupt_code); ASSERT_EQ(OB_SUCCESS, ret); r = checker->is_interrupted(); ASSERT_EQ(true, r); - /// Release checker - ret = manager->unregister_checker(checker); + // unregister checker + ret = g_manager->unregister_checker(checker, interrupt_id); ASSERT_EQ(OB_SUCCESS, ret); - /// The interrupt can still be called after the checker is released - ret = manager->interrupt(local, temp_id, icode); + // The interrupt can still be called after the checker is unregistered + // but can not actually do interrupt, the return code is OB_HASH_NOT_EXIST BUT overwrite by + // OB_SUCCESS + ret = g_manager->interrupt(local, interrupt_id, interrupt_code); ASSERT_EQ(OB_SUCCESS, ret); - /// Downgrade to local interrupt test when remote interrupt is passed to local address - manager->register_checker(checker); - ret = manager->interrupt(local, temp_id, icode); + // Downgrade to local interrupt test when remote interrupt is passed to local address + g_manager->register_checker(checker, interrupt_id); + ret = g_manager->interrupt(local, interrupt_id, interrupt_code); ASSERT_EQ(OB_SUCCESS, ret); r = checker->is_interrupted(); @@ -124,125 +211,139 @@ TEST(ObGlobalInterruptManager, normal) r = checker->is_interrupted(); ASSERT_EQ(false, r); - manager->interrupt(temp_id, icode); + g_manager->interrupt(interrupt_id, interrupt_code); r = checker->is_interrupted(); ASSERT_EQ(true, r); - /// rpc call test + // rpc call test checker->clear_status(); - ret = manager->interrupt(dst, temp_id, icode); + ret = g_manager->interrupt(dst, interrupt_id, interrupt_code); ASSERT_EQ(OB_SUCCESS, ret); + sleep(1); r = checker->is_interrupted(); ASSERT_EQ(true, r); + // Automatic logout of test destruction + // unregister checker + ret = g_manager->unregister_checker(checker, interrupt_id); + ASSERT_EQ(OB_SUCCESS, ret); - - //Automatic logout of test destruction - delete checker; - - ret = manager->map_.get_refactored(temp_id, checker); + ObInterruptCheckerNode *checker_node = nullptr; + ret = g_manager->map_.get_refactored(interrupt_id, checker_node); EXPECT_EQ(OB_HASH_NOT_EXIST, ret); - // Test group registration, interruption and deregistration - ObInterruptChecker *checker0 = new ObInterruptChecker(true, 1); - ObInterruptChecker *checker1 = new ObInterruptChecker(true, 1); - ObInterruptChecker *checker2 = new ObInterruptChecker(true, 1); - ObInterruptChecker *checker3 = new ObInterruptChecker(true, 1); - ret = manager->register_checker(checker0); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->register_checker(checker1); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->register_checker(checker2); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->register_checker(checker3); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->unregister_checker(checker0); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->unregister_checker(checker1); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->unregister_checker(checker2); - EXPECT_EQ(OB_SUCCESS, ret); - ret = manager->unregister_checker(checker3); - EXPECT_EQ(OB_SUCCESS, ret); - delete checker0; - delete checker1; - delete checker2; - delete checker3; + EXPECT_EQ(0, g_manager->map_.size()); + delete checker; +} - // The following is in coroutine mode - // Simulate regular usage scenarios - // And test the use of several global functions +enum RegMode +{ + ManagerInterface = 0, + CheckerInterface = 1, + Guard = 2, + SetInterruptable = 3, +}; - int total = -1; - int fail = 0; - int ready = 0; - uint64_t n[40]; +static void mulit_reg_unreg(RegMode reg_mod) { + int ret = OB_SUCCESS; + ObRandom random; + uint64_t slp_time = abs(random.get_int32()) % 100 + 1; + ObInterruptibleTaskID interrupt_id(1, 1); + ObInterruptChecker *checker = new ObInterruptChecker(false, 1); + switch (reg_mod) { + case RegMode::ManagerInterface: { + ret = g_manager->register_checker(checker, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + ob_usleep(slp_time); + ret = g_manager->unregister_checker(checker, interrupt_id); + EXPECT_EQ(OB_SUCCESS, ret); + break; + } + case RegMode::CheckerInterface: { + ret = checker->register_checker(interrupt_id); + ASSERT_EQ(OB_SUCCESS, ret); + ob_usleep(slp_time); + checker->unregister_checker(interrupt_id); + break; + } + case RegMode::Guard: { + sql::ObPxInterruptGuard px_int_guard(interrupt_id); + break; + } + case RegMode::SetInterruptable: { + ret = SET_INTERRUPTABLE(interrupt_id); + ASSERT_EQ(OB_SUCCESS, ret); + ob_usleep(slp_time); + UNSET_INTERRUPTABLE(interrupt_id); + break; + } + default: { + ret = OB_ERR_UNEXPECTED; + break; + } + } - auto apool = cotesting::FlexPool { - [&] { - int idx = ATOMIC_AAF(&total, 1); - uint64_t cid = CO_ID(); - n[idx] = cid; - SET_INTERRUPTABLE(cid); - /// The first 39 coroutines are waiting to be terminated - if (idx < 39) { - while (ready == 0) { - ::usleep(1000); - } - if (IS_INTERRUPTED()) { - ASSERT_EQ(idx + 10, GET_INTERRUPT_CODE()); - return; - } - fail++; - } - /// The 40th coroutine is responsible for terminating them with three scenarios respectively - else { - for (int i = 0; i < 39; i++) { - int k = i % 3; - if (k == 0) - manager->interrupt(n[i], i + 10); - else if (k == 1) - manager->interrupt(local, n[i], i + 10); - else - manager->interrupt(dst, n[i], i + 10); - } - ASSERT_EQ(false, IS_INTERRUPTED()); - ready++; - } - }, - 4, 10}; - apool.start(true); - ASSERT_EQ(0, fail); + delete checker; +} - int xcnt = 0; - // Test group interrupted - SET_INTERRUPTABLE(10000); +TEST(ObGlobalInterruptManager, multi_thread_register_unregister) +{ + const int64_t thread_cnt = 2048; + std::thread threads[thread_cnt]; + for (int64_t i = 0; i < thread_cnt; ++i) { + RegMode reg_mod = static_cast(i % 4); + threads[i] = std::thread(mulit_reg_unreg, reg_mod); + } - auto bpool = cotesting::FlexPool{ - [&] { - SET_INTERRUPTABLE(10000); - ATOMIC_INC(&xcnt); - while (!IS_INTERRUPTED()) { - CO_YIELD(); - } - EXPECT_EQ(1, GET_INTERRUPT_CODE()); - ATOMIC_DEC(&xcnt); - }, - 10, 10}; - bpool.start(false); - // Wait for all coroutines to execute set_interruptable - while (xcnt != 100) {}; + for (int i = 0; i < thread_cnt; ++i) { + threads[i].join(); + } - // Execute interrupt for taskid = 10000 - manager->interrupt(10000, 1); - bpool.wait(); - // Coroutine interrupt count is cleared to 0 - ASSERT_EQ(0, xcnt); - // The current thread also receives an interrupt signal - ASSERT_EQ(true, IS_INTERRUPTED()); - ASSERT_EQ(1, GET_INTERRUPT_CODE()); + EXPECT_EQ(0, g_manager->map_.size()); +} + +static void thread_wait_interrupt(void *ready_thr_cnt) +{ + ObInterruptibleTaskID interrupt_id(2, 2); + SET_INTERRUPTABLE(interrupt_id); + std::atomic *ready_cnt = static_cast*> (ready_thr_cnt); + ready_cnt->fetch_add(1); + while (!IS_INTERRUPTED()) { + ob_usleep(100); + } + UNSET_INTERRUPTABLE(interrupt_id); +} + +TEST(ObGlobalInterruptManager, interrupt_all_threads) +{ + const int64_t thread_cnt = 64; + std::thread threads[thread_cnt]; + std::atomic ready_thr_cnt{0}; + for (int64_t i = 0; i < thread_cnt; ++i) { + threads[i] = std::thread(thread_wait_interrupt, &ready_thr_cnt); + } + + // waiting all threads ready + while (ready_thr_cnt.load() != thread_cnt) { + ob_usleep(100); + } + + ObInterruptibleTaskID interrupt_id(2, 2); + ObInterruptStackInfo interrupt_stack_info; + ObAddr dst = g_service.get_dst(); + interrupt_stack_info.set_info(1 /*from tid*/, dst /*from_svr_addr*/, + "mock error" /*extra_msg*/); + ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info); + + g_manager->interrupt(dst, interrupt_id, interrupt_code); + + for (int i = 0; i < thread_cnt; ++i) { + threads[i].join(); + } + + EXPECT_EQ(0, g_manager->map_.size()); } } // namespace common @@ -250,7 +351,8 @@ TEST(ObGlobalInterruptManager, normal) int main(int argc, char **argv) { - oceanbase::common::ObLogger::get_logger().set_log_level("DEBUG"); + OB_LOGGER.set_file_name("test_ob_interrupt_manager.log", true, true); + OB_LOGGER.set_log_level("DEBUG"); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }