fix bug: register interrupt with guard not has a return code

This commit is contained in:
obdev
2023-11-06 11:43:57 +00:00
committed by ob-robot
parent e987af6eae
commit 607bfa740b
7 changed files with 293 additions and 173 deletions

View File

@ -15,105 +15,192 @@
#define private public
#include "lib/coro/testing.h"
#include "share/interrupt/ob_interrupt_rpc_proxy.h"
#include "share/interrupt/ob_global_interrupt_call.h"
#include "sql/engine/px/ob_px_interruption.h"
#include "rpc/testing.h"
#include <thread>
using namespace oceanbase::obrpc;
namespace oceanbase {
namespace common {
TEST(ObGlobalInterruptManager, normal)
namespace oceanbase
{
namespace common
{
rpctesting::Service service;
ObInterruptProcessor p;
service.init();
service.reg_processor(&p);
ObInterruptRpcProxy proxy;
service.get_proxy(proxy);
rpctesting::Service g_service;
ObInterruptProcessor g_processor;
ObInterruptRpcProxy g_proxy;
ObGlobalInterruptManager *g_manager(nullptr);
void init_rpc()
{
int ret = OB_SUCCESS;
int temp_id = 1;
int icode = 1;
ret = g_service.init();
ASSERT_EQ(OB_SUCCESS, ret);
ret = g_service.reg_processor(&g_processor);
ASSERT_EQ(OB_SUCCESS, ret);
ret = g_service.get_proxy(g_proxy);
ASSERT_EQ(OB_SUCCESS, ret);
}
TEST(ObGlobalInterruptManager, init_manager)
{
int ret = OB_SUCCESS;
init_rpc();
ObAddr local(ObAddr::IPV4, "127.0.0.1", 10086);
ObAddr dst = service.get_dst();
/// Singleton acquisition
ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance();
ASSERT_EQ(false, nullptr == manager);
ObAddr dst = g_service.get_dst();
// prepare mocked ObInterruptibleTaskID, ObInterruptCode,
ObInterruptibleTaskID interrupt_id(1, 1);
ObInterruptStackInfo interrupt_stack_info;
interrupt_stack_info.set_info(1 /*from tid*/, local /*from_svr_addr*/,
"mock error" /*extra_msg*/);
ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info);
ObInterruptChecker *checker = nullptr;
/// Perform various operations without initialization, correct feedback
ret = manager->interrupt(temp_id, icode);
// Singleton acquisition
g_manager = ObGlobalInterruptManager::getInstance();
ASSERT_EQ(false, nullptr == g_manager);
// test init ObGlobalInterruptManager
ret = g_manager->interrupt(interrupt_id, interrupt_code);
ASSERT_EQ(OB_NOT_INIT, ret);
ret = manager->interrupt(local, temp_id, icode);
ret = g_manager->interrupt(local, interrupt_id, interrupt_code);
ASSERT_EQ(OB_NOT_INIT, ret);
ret = manager->register_checker(checker);
ret = g_manager->register_checker(checker, interrupt_id);
ASSERT_EQ(OB_NOT_INIT, ret);
ret = manager->unregister_checker(checker);
ret = g_manager->unregister_checker(checker, interrupt_id);
ASSERT_EQ(OB_NOT_INIT, ret);
/// No RPC initialization is illegal
ret = manager->init(local, nullptr);
// No RPC initialization is illegal
ret = g_manager->init(local, nullptr);
ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
/// Regular initialization
ret = manager->init(local, &proxy);
// Regular initialization
ret = g_manager->init(local, &g_proxy);
ASSERT_EQ(OB_SUCCESS, ret);
/// Initialize correctly and feedback error many times
ret = manager->init(local, &proxy);
// Initialize correctly and feedback error many times
ret = g_manager->init(local, &g_proxy);
ASSERT_EQ(OB_INIT_TWICE, ret);
}
ret = manager->register_checker(checker);
ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
TEST(ObGlobalInterruptManager, register_and_unregister)
{
int ret = OB_SUCCESS;
ObAddr local(ObAddr::IPV4, "127.0.0.1", 10086);
ObAddr dst = g_service.get_dst();
ret = manager->unregister_checker(checker);
ASSERT_EQ(OB_INVALID_ARGUMENT, ret);
// prepare mocked ObInterruptibleTaskID, ObInterruptCode,
ObInterruptibleTaskID interrupt_id(1, 1);
ObInterruptStackInfo interrupt_stack_info;
interrupt_stack_info.set_info(1 /*from tid*/, local /*from_svr_addr*/,
"mock error" /*extra_msg*/);
ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info);
/// Get the inspector
checker = new ObInterruptChecker(true, temp_id);
/// Test self-registration
ret = checker->register_checker(temp_id);
// prepare checker
ObInterruptChecker *checker = nullptr;
checker = new ObInterruptChecker(false, 1); // no coroutine again
// Test self-registration
ret = checker->register_checker(interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
/// Test duplicate registration
ret = checker->register_checker(temp_id);
ASSERT_EQ(OB_INIT_TWICE, ret);
ret = checker->register_checker(temp_id + 1);
ASSERT_EQ(OB_INIT_TWICE, ret);
ret = manager->register_checker(checker);
ASSERT_EQ(OB_INIT_TWICE, ret);
// Test duplicate registration
ret = checker->register_checker(interrupt_id);
ASSERT_EQ(OB_HASH_EXIST, ret);
ret = g_manager->register_checker(checker, interrupt_id);
ASSERT_EQ(OB_HASH_EXIST, ret);
/// Semaphore detection without interruption
// unregister checker
ret = g_manager->unregister_checker(checker, interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
// duplicate unregister checker
ret = g_manager->unregister_checker(checker, interrupt_id);
ASSERT_EQ(OB_HASH_NOT_EXIST, ret);
EXPECT_EQ(0, g_manager->map_.size());
delete checker;
// Test group registration and deregistration
ObInterruptChecker *checker0 = new ObInterruptChecker(false, 1);
ObInterruptChecker *checker1 = new ObInterruptChecker(false, 1);
ObInterruptChecker *checker2 = new ObInterruptChecker(false, 1);
ObInterruptChecker *checker3 = new ObInterruptChecker(false, 1);
ret = g_manager->register_checker(checker0, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->register_checker(checker1, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->register_checker(checker2, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->register_checker(checker3, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->unregister_checker(checker0, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->unregister_checker(checker1, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->unregister_checker(checker2, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ret = g_manager->unregister_checker(checker3, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
delete checker0;
delete checker1;
delete checker2;
delete checker3;
EXPECT_EQ(0, g_manager->map_.size());
}
TEST(ObGlobalInterruptManager, interrupt)
{
int ret = OB_SUCCESS;
ObAddr local(ObAddr::IPV4, "127.0.0.1", 10086);
ObAddr dst = g_service.get_dst();
// prepare mocked ObInterruptibleTaskID, ObInterruptCode,
ObInterruptibleTaskID interrupt_id(1, 1);
ObInterruptStackInfo interrupt_stack_info;
interrupt_stack_info.set_info(1 /*from tid*/, local /*from_svr_addr*/,
"mock error" /*extra_msg*/);
ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info);
// prepare checker && register checker
ObInterruptChecker *checker = nullptr;
checker = new ObInterruptChecker(false, 1); // no coroutine again
ret = checker->register_checker(interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
// Semaphore detection without interruption
bool r = checker->is_interrupted();
ASSERT_EQ(false, r);
/// Local execution interrupt check semaphore
ret = manager->interrupt(temp_id, icode);
// Local execution interrupt check semaphore
ret = g_manager->interrupt(interrupt_id, interrupt_code);
ASSERT_EQ(OB_SUCCESS, ret);
r = checker->is_interrupted();
ASSERT_EQ(true, r);
/// Release checker
ret = manager->unregister_checker(checker);
// unregister checker
ret = g_manager->unregister_checker(checker, interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
/// The interrupt can still be called after the checker is released
ret = manager->interrupt(local, temp_id, icode);
// The interrupt can still be called after the checker is unregistered
// but can not actually do interrupt, the return code is OB_HASH_NOT_EXIST BUT overwrite by
// OB_SUCCESS
ret = g_manager->interrupt(local, interrupt_id, interrupt_code);
ASSERT_EQ(OB_SUCCESS, ret);
/// Downgrade to local interrupt test when remote interrupt is passed to local address
manager->register_checker(checker);
ret = manager->interrupt(local, temp_id, icode);
// Downgrade to local interrupt test when remote interrupt is passed to local address
g_manager->register_checker(checker, interrupt_id);
ret = g_manager->interrupt(local, interrupt_id, interrupt_code);
ASSERT_EQ(OB_SUCCESS, ret);
r = checker->is_interrupted();
@ -124,125 +211,139 @@ TEST(ObGlobalInterruptManager, normal)
r = checker->is_interrupted();
ASSERT_EQ(false, r);
manager->interrupt(temp_id, icode);
g_manager->interrupt(interrupt_id, interrupt_code);
r = checker->is_interrupted();
ASSERT_EQ(true, r);
/// rpc call test
// rpc call test
checker->clear_status();
ret = manager->interrupt(dst, temp_id, icode);
ret = g_manager->interrupt(dst, interrupt_id, interrupt_code);
ASSERT_EQ(OB_SUCCESS, ret);
sleep(1);
r = checker->is_interrupted();
ASSERT_EQ(true, r);
// Automatic logout of test destruction
// unregister checker
ret = g_manager->unregister_checker(checker, interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
//Automatic logout of test destruction
delete checker;
ret = manager->map_.get_refactored(temp_id, checker);
ObInterruptCheckerNode *checker_node = nullptr;
ret = g_manager->map_.get_refactored(interrupt_id, checker_node);
EXPECT_EQ(OB_HASH_NOT_EXIST, ret);
// Test group registration, interruption and deregistration
ObInterruptChecker *checker0 = new ObInterruptChecker(true, 1);
ObInterruptChecker *checker1 = new ObInterruptChecker(true, 1);
ObInterruptChecker *checker2 = new ObInterruptChecker(true, 1);
ObInterruptChecker *checker3 = new ObInterruptChecker(true, 1);
ret = manager->register_checker(checker0);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->register_checker(checker1);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->register_checker(checker2);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->register_checker(checker3);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->unregister_checker(checker0);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->unregister_checker(checker1);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->unregister_checker(checker2);
EXPECT_EQ(OB_SUCCESS, ret);
ret = manager->unregister_checker(checker3);
EXPECT_EQ(OB_SUCCESS, ret);
delete checker0;
delete checker1;
delete checker2;
delete checker3;
EXPECT_EQ(0, g_manager->map_.size());
delete checker;
}
// The following is in coroutine mode
// Simulate regular usage scenarios
// And test the use of several global functions
enum RegMode
{
ManagerInterface = 0,
CheckerInterface = 1,
Guard = 2,
SetInterruptable = 3,
};
int total = -1;
int fail = 0;
int ready = 0;
uint64_t n[40];
static void mulit_reg_unreg(RegMode reg_mod) {
int ret = OB_SUCCESS;
ObRandom random;
uint64_t slp_time = abs(random.get_int32()) % 100 + 1;
ObInterruptibleTaskID interrupt_id(1, 1);
ObInterruptChecker *checker = new ObInterruptChecker(false, 1);
switch (reg_mod) {
case RegMode::ManagerInterface: {
ret = g_manager->register_checker(checker, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
ob_usleep(slp_time);
ret = g_manager->unregister_checker(checker, interrupt_id);
EXPECT_EQ(OB_SUCCESS, ret);
break;
}
case RegMode::CheckerInterface: {
ret = checker->register_checker(interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
ob_usleep(slp_time);
checker->unregister_checker(interrupt_id);
break;
}
case RegMode::Guard: {
sql::ObPxInterruptGuard px_int_guard(interrupt_id);
break;
}
case RegMode::SetInterruptable: {
ret = SET_INTERRUPTABLE(interrupt_id);
ASSERT_EQ(OB_SUCCESS, ret);
ob_usleep(slp_time);
UNSET_INTERRUPTABLE(interrupt_id);
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
break;
}
}
auto apool = cotesting::FlexPool {
[&] {
int idx = ATOMIC_AAF(&total, 1);
uint64_t cid = CO_ID();
n[idx] = cid;
SET_INTERRUPTABLE(cid);
/// The first 39 coroutines are waiting to be terminated
if (idx < 39) {
while (ready == 0) {
::usleep(1000);
}
if (IS_INTERRUPTED()) {
ASSERT_EQ(idx + 10, GET_INTERRUPT_CODE());
return;
}
fail++;
}
/// The 40th coroutine is responsible for terminating them with three scenarios respectively
else {
for (int i = 0; i < 39; i++) {
int k = i % 3;
if (k == 0)
manager->interrupt(n[i], i + 10);
else if (k == 1)
manager->interrupt(local, n[i], i + 10);
else
manager->interrupt(dst, n[i], i + 10);
}
ASSERT_EQ(false, IS_INTERRUPTED());
ready++;
}
},
4, 10};
apool.start(true);
ASSERT_EQ(0, fail);
delete checker;
}
int xcnt = 0;
// Test group interrupted
SET_INTERRUPTABLE(10000);
TEST(ObGlobalInterruptManager, multi_thread_register_unregister)
{
const int64_t thread_cnt = 2048;
std::thread threads[thread_cnt];
for (int64_t i = 0; i < thread_cnt; ++i) {
RegMode reg_mod = static_cast<RegMode>(i % 4);
threads[i] = std::thread(mulit_reg_unreg, reg_mod);
}
auto bpool = cotesting::FlexPool{
[&] {
SET_INTERRUPTABLE(10000);
ATOMIC_INC(&xcnt);
while (!IS_INTERRUPTED()) {
CO_YIELD();
}
EXPECT_EQ(1, GET_INTERRUPT_CODE());
ATOMIC_DEC(&xcnt);
},
10, 10};
bpool.start(false);
// Wait for all coroutines to execute set_interruptable
while (xcnt != 100) {};
for (int i = 0; i < thread_cnt; ++i) {
threads[i].join();
}
// Execute interrupt for taskid = 10000
manager->interrupt(10000, 1);
bpool.wait();
// Coroutine interrupt count is cleared to 0
ASSERT_EQ(0, xcnt);
// The current thread also receives an interrupt signal
ASSERT_EQ(true, IS_INTERRUPTED());
ASSERT_EQ(1, GET_INTERRUPT_CODE());
EXPECT_EQ(0, g_manager->map_.size());
}
static void thread_wait_interrupt(void *ready_thr_cnt)
{
ObInterruptibleTaskID interrupt_id(2, 2);
SET_INTERRUPTABLE(interrupt_id);
std::atomic<int> *ready_cnt = static_cast<std::atomic<int>*> (ready_thr_cnt);
ready_cnt->fetch_add(1);
while (!IS_INTERRUPTED()) {
ob_usleep(100);
}
UNSET_INTERRUPTABLE(interrupt_id);
}
TEST(ObGlobalInterruptManager, interrupt_all_threads)
{
const int64_t thread_cnt = 64;
std::thread threads[thread_cnt];
std::atomic<int> ready_thr_cnt{0};
for (int64_t i = 0; i < thread_cnt; ++i) {
threads[i] = std::thread(thread_wait_interrupt, &ready_thr_cnt);
}
// waiting all threads ready
while (ready_thr_cnt.load() != thread_cnt) {
ob_usleep(100);
}
ObInterruptibleTaskID interrupt_id(2, 2);
ObInterruptStackInfo interrupt_stack_info;
ObAddr dst = g_service.get_dst();
interrupt_stack_info.set_info(1 /*from tid*/, dst /*from_svr_addr*/,
"mock error" /*extra_msg*/);
ObInterruptCode interrupt_code(OB_RPC_CONNECT_ERROR, interrupt_stack_info);
g_manager->interrupt(dst, interrupt_id, interrupt_code);
for (int i = 0; i < thread_cnt; ++i) {
threads[i].join();
}
EXPECT_EQ(0, g_manager->map_.size());
}
} // namespace common
@ -250,7 +351,8 @@ TEST(ObGlobalInterruptManager, normal)
int main(int argc, char **argv)
{
oceanbase::common::ObLogger::get_logger().set_log_level("DEBUG");
OB_LOGGER.set_file_name("test_ob_interrupt_manager.log", true, true);
OB_LOGGER.set_log_level("DEBUG");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}