fixed create arbration instance failed because concurrent create

This commit is contained in:
HaHaJeff 2024-08-13 06:02:43 +00:00 committed by ob-robot
parent 6c7d149173
commit f2cc96fbeb

View File

@ -13,7 +13,7 @@
#define private public
#include "env/ob_simple_log_cluster_env.h"
#undef private
#include <thread>
const std::string TEST_NAME = "single_arb_server";
using namespace oceanbase::common;
@ -327,6 +327,73 @@ TEST_F(TestObSimpleMutilArbServer, restart_arb)
EXPECT_EQ(OB_SUCCESS, restart_server(0));
}
TEST_F(TestObSimpleMutilArbServer, multi_thread)
{
SET_CASE_LOG_FILE(TEST_NAME, "restart_arb");
OB_LOGGER.set_log_level("TRACE");
ObISimpleLogServer *iserver = get_cluster()[0];
EXPECT_EQ(true, iserver->is_arb_server());
ObSimpleArbServer *arb_server = dynamic_cast<ObSimpleArbServer*>(iserver);
palflite::PalfEnvLiteMgr *palf_env_mgr = &arb_server->palf_env_mgr_;
int64_t cluster_id = 100;
arbserver::GCMsgEpoch epoch = arbserver::GCMsgEpoch(1, 1);
// test add tenant without cluster, generate placeholder
EXPECT_EQ(OB_SUCCESS, palf_env_mgr->create_palf_env_lite(palflite::PalfEnvKey(cluster_id, 1)));
EXPECT_TRUE(palf_env_mgr->is_cluster_placeholder_exists(cluster_id));
std::vector<int64_t> ls_ids = {1001, 1002, 1003, 1004, 1005, 1006, 1007};
int64_t create_success_count = 0;
auto create_func = [&]() {
for (auto ls_id : ls_ids) {
int ret = palf_env_mgr->create_arbitration_instance(
palflite::PalfEnvKey(cluster_id, 1), arb_server->self_, 1001,
ObTenantRole(ObTenantRole::PRIMARY_TENANT));
if (OB_SUCCESS == ret) {
ATOMIC_INC(&create_success_count);
}
if (OB_SUCCESS != ret) {
ASSERT_EQ(false, true);
} else {
}
}
};
int64_t remove_success_count = 0;
auto remove_func = [&] () {
for (auto ls_id : ls_ids) {
int ret = arb_server->palf_env_mgr_.delete_arbitration_instance(
palflite::PalfEnvKey(cluster_id, 1), arb_server->self_, ls_id);
if (OB_SUCCESS == ret) {
ATOMIC_INC(&remove_success_count);
}
if (OB_SUCCESS != ret) {
ASSERT_EQ(false, true);
} else {
}
}
};
int64_t thread_count = 8;
std::vector<std::thread> create_threads;
create_threads.reserve(thread_count);
for (int i = 0; i < thread_count; i++) {
create_threads.emplace_back(std::thread(create_func));
}
for (int i = 0; i < thread_count; i++) {
create_threads[i].join();
}
ASSERT_EQ(thread_count*ls_ids.size(), create_success_count);
std::vector<std::thread> remove_threads;
remove_threads.reserve(thread_count);
for (int i = 0; i < thread_count; i++) {
remove_threads.emplace_back(std::thread(remove_func));
}
for (int i = 0; i < thread_count; i++) {
remove_threads[i].join();
}
ASSERT_EQ(thread_count*ls_ids.size(), remove_success_count);
}
} // end unittest
} // end oceanbase