/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include "ob_multi_replica_test_base.h" #include "ob_multi_replica_util.h" namespace oceanbase { namespace unittest { int set_trace_id(char *buf) { return ObCurTraceId::get_trace_id()->set(buf); } void init_log_and_gtest(int argc, char **argv) { if (argc < 1) { abort(); } std::string app_name = argv[0]; app_name = app_name.substr(app_name.find_last_of("/\\") + 1); std::string app_log_name = app_name + ".log"; std::string app_rs_log_name = app_name + "_rs.log"; std::string app_ele_log_name = app_name + "_election.log"; std::string app_gtest_log_name = app_name + "_gtest.log"; std::string app_trace_log_name = app_name + "_trace.log"; // system(("rm -rf " + app_log_name + "*").c_str()); // system(("rm -rf " + app_rs_log_name + "*").c_str()); // system(("rm -rf " + app_ele_log_name + "*").c_str()); // system(("rm -rf " + app_gtest_log_name + "*").c_str()); // system(("rm -rf " + app_trace_log_name + "*").c_str()); // system(("rm -rf " + app_name + "_*").c_str()); init_gtest_output(app_gtest_log_name); OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(), app_ele_log_name.c_str(), app_trace_log_name.c_str()); } void init_gtest_output(std::string >est_log_name) { // 判断是否处于Farm中 char *mit_network_start_port_env = getenv("mit_network_start_port"); char *mit_network_port_num_env = getenv("mit_network_port_num"); if (mit_network_start_port_env != nullptr && mit_network_port_num_env != nullptr) { std::string gtest_file_name = gtest_log_name; int fd = open(gtest_file_name.c_str(), O_RDWR | O_CREAT, 0666); if (fd == 0) { ob_abort(); } dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); } } uint32_t get_local_addr(const char *dev_name) { int fd, intrface; struct ifreq buf[16]; struct ifconf ifc; if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { return 0; } ifc.ifc_len = sizeof(buf); ifc.ifc_buf = (caddr_t)buf; if (ioctl(fd, SIOCGIFCONF, (char *)&ifc) != 0) { close(fd); return 0; } intrface = static_cast(ifc.ifc_len / sizeof(struct ifreq)); while (intrface-- > 0) { if (ioctl(fd, SIOCGIFFLAGS, (char *)&buf[intrface]) != 0) { continue; } if ((buf[intrface].ifr_flags & IFF_LOOPBACK) != 0) continue; if (!(buf[intrface].ifr_flags & IFF_UP)) continue; if (dev_name != NULL && strcmp(dev_name, buf[intrface].ifr_name)) continue; if (!(ioctl(fd, SIOCGIFADDR, (char *)&buf[intrface]))) { close(fd); return ((struct sockaddr_in *)(&buf[intrface].ifr_addr))->sin_addr.s_addr; } } close(fd); return 0; } std::string get_local_ip() { uint32_t ip = get_local_addr("bond0"); if (ip == 0) { ip = get_local_addr("eth0"); } if (ip == 0) { return ""; } return inet_ntoa(*(struct in_addr *)(&ip)); } const char *ObMultiReplicaTestBase::log_disk_size_ = "10G"; const char *ObMultiReplicaTestBase::memory_size_ = "10G"; std::shared_ptr ObMultiReplicaTestBase::replica_ = nullptr; bool ObMultiReplicaTestBase::is_started_ = false; bool ObMultiReplicaTestBase::is_inited_ = false; std::string ObMultiReplicaTestBase::env_prefix_; std::string ObMultiReplicaTestBase::app_name_; std::string ObMultiReplicaTestBase::exec_dir_; std::string ObMultiReplicaTestBase::env_prefix_path_; std::string ObMultiReplicaTestBase::event_file_path_; bool ObMultiReplicaTestBase::enable_env_warn_log_ = false; std::vector ObMultiReplicaTestBase::rpc_ports_; ObServerInfoList ObMultiReplicaTestBase::server_list_; std::string ObMultiReplicaTestBase::rs_list_; std::string ObMultiReplicaTestBase::local_ip_; std::vector ObMultiReplicaTestBase::zone_pids_; int ObMultiReplicaTestBase::cur_zone_id_ = -1; int ObMultiReplicaTestBase::restart_zone_id_ = -1; int ObMultiReplicaTestBase::restart_no_ = 0; bool ObMultiReplicaTestBase::block_msg_ = false; ObMultiReplicaTestBase::ObMultiReplicaTestBase() {} ObMultiReplicaTestBase::~ObMultiReplicaTestBase() {} int ObMultiReplicaTestBase::bootstrap_multi_replica( const std::string & app_name, const int restart_zone_id, const int restart_no, const std::string &env_prefix) { int ret = OB_SUCCESS; if (is_valid_zone_id(restart_zone_id) && restart_no <= 0) { ret = OB_INVALID_ARGUMENT; SERVER_LOG(ERROR, "invalid restart arg", K(ret), K(restart_zone_id), K(restart_no)); } if (!is_inited_ && OB_SUCC(ret)) { env_prefix_ = env_prefix + "_test_data"; //+ std::to_string(ObTimeUtility::current_time()) + "_"; exec_dir_ = get_current_dir_name(); env_prefix_path_ = exec_dir_ + "/" + env_prefix_; event_file_path_ = env_prefix_path_ + "/" + CLUSTER_EVENT_FILE_NAME; zone_pids_.resize(3); restart_zone_id_ = restart_zone_id; restart_no_ = restart_no; app_name_ = app_name.substr(app_name.find_last_of("/\\") + 1); // SERVER_LOG(INFO, "bootstrap_multi_replica arg", K(ret), K(getpid()), K(restart_zone_id_), // K(restart_no_), K(env_prefix_path_.c_str())); printf("bootstrap_multi_replica arg: pid=%d, restart_zone_id=%d, restart_no=%d, " "env_prefix_path=%s\n", getpid(), restart_zone_id_, restart_no_, env_prefix_path_.c_str()); if (OB_FAIL(init_replicas_())) { SERVER_LOG(WARN, "init multi replica failed.", KR(ret)); } } if (OB_FAIL(ret)) { // do nothing } else if (0 == cur_zone_id_) { // wait zone process exit int status = 0; for (int i = 0; i < 3; i++) { waitpid(zone_pids_[i], &status, 0); SERVER_LOG(INFO, "wait zone pid exit successfully", KR(ret), K(cur_zone_id_), K(i), K(zone_pids_[i]), K(status)); } } else if (!is_started_) { if (OB_FAIL(start())) { SERVER_LOG(WARN, "start multi replica failed.", KR(ret)); sleep(5); abort(); } } return ret; } int ObMultiReplicaTestBase::wait_all_test_completed() { int ret = OB_SUCCESS; std::string zone_str = "ZONE" + std::to_string(cur_zone_id_); if (OB_FAIL(finish_event(TEST_CASE_FINSH_EVENT_PREFIX + zone_str, zone_str))) { SERVER_LOG(WARN, "write test finish event failed", K(ret), K(zone_str.c_str())); } else { for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) { zone_str = "ZONE" + std::to_string(i); if (OB_FAIL( wait_event_finish(TEST_CASE_FINSH_EVENT_PREFIX + zone_str, zone_str, INT64_MAX))) { fprintf(stdout, "[WAIT EVENT] wait target event failed : ret = %d, zone_str = %s\n", ret, zone_str.c_str()); } } SERVER_LOG(INFO, "ObMultiReplicaTestBase [WAIT EVENT] find all finish event", K(ret), K(cur_zone_id_), K(TEST_CASE_FINSH_EVENT_PREFIX)); fprintf(stdout, "[WAIT EVENT] wait all test case successfully, ret = %d, cur_zone_id = %d, " "MAX_ZONE_COUNT = %d\n", ret, cur_zone_id_, MAX_ZONE_COUNT); } // if (cur_zone_id_ == 1) { // int status = 0; // int status2 = 0; // waitpid(child_pid_, &status, 0); // waitpid(child_pid2_, &status2, 0); // if (0 != status || 0 != status2) { // fprintf(stdout, // "Child process exit with error code : [%d]%d, [%d]%d\n", // child_pid_, status, child_pid2_, status2); // SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process exit with error code", // K(child_pid_), // K(status), K(child_pid2_), K(status2)); // ret = status; // return ret; // } else { // fprintf(stdout, // "Child process run all test cases done. [%d]%d, [%d]%d\n", // child_pid_, status, child_pid2_, status2); // SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process run all test cases done", // K(child_pid_), K(status), K(child_pid2_), K(status2)); // } // } return ret; } void ObMultiReplicaTestBase::SetUp() { std::string cur_test_case_name = ::testing::UnitTest::GetInstance()->current_test_case()->name(); std::string cur_test_info_name = ::testing::UnitTest::GetInstance()->current_test_info()->name(); SERVER_LOG(INFO, "[ObMultiReplicaTestBase] SetUp", K(cur_test_case_name.c_str()), K(cur_test_info_name.c_str())); } void ObMultiReplicaTestBase::TearDown() { std::string cur_test_case_name = ::testing::UnitTest::GetInstance()->current_test_case()->name(); std::string cur_test_info_name = ::testing::UnitTest::GetInstance()->current_test_info()->name(); SERVER_LOG(INFO, "[ObMultiReplicaTestBase] TearDown", K(cur_test_case_name.c_str()), K(cur_test_info_name.c_str())); } void ObMultiReplicaTestBase::SetUpTestCase() { SERVER_LOG(INFO, "[ObMultiReplicaTestBase] SetUpTestCase"); } void ObMultiReplicaTestBase::TearDownTestCase() { SERVER_LOG(INFO, "[ObMultiReplicaTestBase] TearDownTestCase"); int ret = OB_SUCCESS; const bool enable_failed_sleep = false; int fail_cnt = ::testing::UnitTest::GetInstance()->failed_test_case_count(); if (fail_cnt > 0 && enable_failed_sleep) { fprintf(stdout, "[SLEEP] FAIL %d TEST CASE, WAIT TO KILL", fail_cnt); usleep(30 * 60 * 1000 * 1000); } // fprintf(stdout, ">>>>>>> AFTER RUN TEST: pid = %d\n", getpid()); if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::wait_all_test_completed())) { fprintf(stdout, "wait test case completed failed. ret = %d", ret); } if (OB_NOT_NULL(replica_)) { // ret = close(); // ASSERT_EQ(ret, OB_SUCCESS); } if (chdir(exec_dir_.c_str()) == 0) { bool to_delete = true; if (to_delete) { // system((std::string("rm -rf ") + env_prefix_ + std::string("*")).c_str()); } } _Exit(fail_cnt); } int ObMultiReplicaTestBase::init_replicas_() { SERVER_LOG(INFO, "init simple cluster test base", K(restart_zone_id_), K(restart_no_)); int ret = OB_SUCCESS; if (!is_valid_zone_id(restart_zone_id_)) { // for guard process system(("rm -rf " + env_prefix_).c_str()); // SERVER_LOG(INFO, "create dir and change work dir start.", K(env_prefix_.c_str())); if (OB_FAIL(mkdir(env_prefix_.c_str(), 0777))) { } else if (OB_FAIL(chdir(env_prefix_.c_str()))) { } else { const char *current_dir = env_prefix_.c_str(); // SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir)); } std::string app_log_name = app_name_ + ".log"; std::string app_rs_log_name = app_name_ + "_rs.log"; std::string app_ele_log_name = app_name_ + "_election.log"; std::string app_gtest_log_name = app_name_ + "_gtest.log"; std::string app_trace_log_name = app_name_ + "_trace.log"; // system(("rm -rf " + app_log_name + "*").c_str()); // system(("rm -rf " + app_rs_log_name + "*").c_str()); // system(("rm -rf " + app_ele_log_name + "*").c_str()); // system(("rm -rf " + app_gtest_log_name + "*").c_str()); // system(("rm -rf " + app_trace_log_name + "*").c_str()); // system(("rm -rf " + app_name + "_*").c_str()); init_gtest_output(app_gtest_log_name); OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(), app_ele_log_name.c_str(), app_trace_log_name.c_str()); if (OB_SUCC(ret)) { local_ip_ = get_local_ip(); if (local_ip_ == "") { SERVER_LOG(WARN, "get_local_ip failed"); return -666666666; } } // mkdir std::vector dirs; rs_list_.clear(); rpc_ports_.clear(); server_list_.reset(); int server_fd = 0; for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) { std::string zone_dir = "zone" + std::to_string(i); ret = mkdir(zone_dir.c_str(), 0777); std::string data_dir = zone_dir + "/store"; dirs.push_back(data_dir); dirs.push_back(zone_dir + "/run"); dirs.push_back(zone_dir + "/etc"); dirs.push_back(zone_dir + "/log"); dirs.push_back(zone_dir + "/wallet"); dirs.push_back(data_dir + "/clog"); dirs.push_back(data_dir + "/slog"); dirs.push_back(data_dir + "/sstable"); int64_t tmp_port = observer::ObSimpleServerReplica::get_rpc_port(server_fd); rpc_ports_.push_back(tmp_port); rs_list_ += local_ip_ + ":" + std::to_string(rpc_ports_[i - 1]) + ":" + std::to_string(rpc_ports_[i - 1] + 1); if (i < MAX_ZONE_COUNT) { rs_list_ += ";"; } obrpc::ObServerInfo server_info; server_info.zone_ = zone_dir.c_str(); server_info.server_ = common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_ports_[i - 1]); server_info.region_ = "sys_region"; server_list_.push_back(server_info); } if (OB_SUCC(ret)) { for (auto &dir : dirs) { ret = mkdir(dir.c_str(), 0777); if (OB_FAIL(ret)) { SERVER_LOG(ERROR, "ObSimpleServerReplica mkdir", K(ret), K(dir.c_str())); return ret; } } } if (OB_SUCC(ret)) { for (int j = 0; j < MAX_ZONE_COUNT && OB_SUCC(ret); j++) { if (OB_FAIL(finish_event("ZONE" + std::to_string(j + 1) + "_RPC_PORT", std::to_string(rpc_ports_[j])))) { SERVER_LOG(ERROR, "write RPC_PORT event failed", K(ret), K(j), K(rpc_ports_[j])); } } } } else { rs_list_.clear(); rpc_ports_.clear(); server_list_.reset(); if (OB_FAIL(chdir(env_prefix_.c_str()))) { } else { const char *current_dir = env_prefix_.c_str(); SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir)); } std::string app_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + ".log"; std::string app_rs_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_rs.log"; std::string app_ele_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_election.log"; std::string app_gtest_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_gtest.log"; std::string app_trace_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_trace.log"; // system(("rm -rf " + app_log_name + "*").c_str()); // system(("rm -rf " + app_rs_log_name + "*").c_str()); // system(("rm -rf " + app_ele_log_name + "*").c_str()); // system(("rm -rf " + app_gtest_log_name + "*").c_str()); // system(("rm -rf " + app_trace_log_name + "*").c_str()); // system(("rm -rf " + app_name + "_*").c_str()); init_gtest_output(app_gtest_log_name); OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(), app_ele_log_name.c_str(), app_trace_log_name.c_str()); if (OB_SUCC(ret)) { local_ip_ = get_local_ip(); if (local_ip_ == "") { SERVER_LOG(WARN, "get_local_ip failed"); return -666666666; } } if (OB_SUCC(ret)) { for (int j = 0; j < MAX_ZONE_COUNT && OB_SUCC(ret); j++) { std::string rpc_port_str = ""; if (OB_FAIL(wait_event_finish("ZONE" + std::to_string(j + 1) + "_RPC_PORT", rpc_port_str, 30000 /*30s*/, 100 /*100ms*/))) { SERVER_LOG(ERROR, "read RPC_PORT event failed", K(ret), K(j), K(rpc_port_str.c_str())); } else { int tmp_rpc_port = std::stoi(rpc_port_str); rpc_ports_.push_back(tmp_rpc_port); rs_list_ += local_ip_ + ":" + rpc_port_str + ":" + std::to_string(tmp_rpc_port + 1); if (j < MAX_ZONE_COUNT) { rs_list_ += ";"; } obrpc::ObServerInfo server_info; std::string zone_dir = "zone" + std::to_string(j + 1); server_info.zone_ = zone_dir.c_str(); server_info.server_ = common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), tmp_rpc_port); server_info.region_ = "sys_region"; server_list_.push_back(server_info); } } } } if (OB_SUCC(ret)) { if (!is_valid_zone_id(restart_zone_id_)) { int prev_zone_pid = 999999; for (int i = 0; i < 3; i++) { if (prev_zone_pid > 0) { prev_zone_pid = fork(); if (prev_zone_pid > 0) { zone_pids_[i] = prev_zone_pid; } else if (prev_zone_pid == 0) { cur_zone_id_ = i + 1; SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init sub process zone id", K(i), K(getpid()), K(cur_zone_id_), K(prev_zone_pid)); } else if (prev_zone_pid < 0) { perror("fork"); exit(EXIT_FAILURE); } } } if (cur_zone_id_ < 0) { // guard process cur_zone_id_ = 0; ::testing::GTEST_FLAG(filter) = "GuardProcessTest"; SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init guard zone id", K(ret), K(getpid()), K(cur_zone_id_), K(zone_pids_[0]), K(zone_pids_[1]), K(zone_pids_[2])); } else { ::testing::GTEST_FLAG(filter) = ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[cur_zone_id_ - 1] + "*"; fprintf(stdout, "zone %d test_case_name = %s\n", cur_zone_id_, ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[cur_zone_id_ - 1].c_str()); ret = init_test_replica_(cur_zone_id_); SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init sub process replica", K(getpid()), K(cur_zone_id_), K(prev_zone_pid)); } } else { cur_zone_id_ = restart_zone_id_; std::string restart_case_name = TEST_CASE_BASE_NAME + std::string("_RESTART_") + std::to_string(restart_no_) + std::string("_ZONE") + std::to_string(restart_zone_id_) + "*"; ::testing::GTEST_FLAG(filter) = restart_case_name.c_str(); fprintf(stdout, "restart pid%d zone %d test_case_name = %s\n", getpid(), restart_zone_id_, restart_case_name.c_str()); ret = init_test_replica_(cur_zone_id_); } } is_inited_ = true; return ret; } int ObMultiReplicaTestBase::restart_zone(const int zone_id, const int restart_no) { int ret = OB_SUCCESS; char exec_path[4096] = {0}; int return_val = 0; fprintf(stdout, "[RESTART %d] prepare restart zone %d with the restart_no of %d (cur_time = %ld)\n", getpid(), zone_id, restart_no, ObTimeUtility::current_time()); if (zone_id != cur_zone_id_ || restart_no <= 0) { ret = OB_INVALID_ARGUMENT; SERVER_LOG(ERROR, "invalid restart arg", K(ret), K(zone_id), K(restart_no), K(cur_zone_id_), K(getpid())); } else if (0 >= (return_val = readlink("/proc/self/exe", exec_path, 4096))) { ret = OB_ERR_UNEXPECTED; SERVER_LOG(ERROR, "realink for exec name error", K(ret), K(zone_id), K(restart_no), K(cur_zone_id_), K(getpid()), K(return_val), K(exec_path)); } else if (OB_FAIL(chdir(exec_dir_.c_str()))) { // } else if (OB_FALSE_IT()) } else if (0 > (return_val = execl(exec_path, exec_path, (std::to_string(zone_id)).c_str(), (std::to_string(restart_no)).c_str(), NULL))) { ret = OB_ERR_UNEXPECTED; SERVER_LOG(ERROR, "execl for restart error", K(ret), K(errno), K(zone_id), K(restart_no), K(cur_zone_id_), K(getpid()), K(return_val), K(exec_path)); } return ret; } int ObMultiReplicaTestBase::init_test_replica_(const int zone_id) { int ret = OB_SUCCESS; if (replica_ == nullptr) { cur_zone_id_ = zone_id; replica_ = std::make_shared( app_name_, "zone" + std::to_string(zone_id), zone_id, rpc_ports_[zone_id - 1], rs_list_, server_list_, is_valid_zone_id(restart_zone_id_), oceanbase::observer::ObServer::get_instance(), "./store", log_disk_size_, memory_size_); } else { SERVER_LOG(ERROR, "construct ObSimpleServerReplica repeatedlly", K(ret), K(zone_id), K(rpc_ports_[zone_id - 1]), K(rs_list_.c_str())); } if (replica_ != nullptr) { int ret = replica_->simple_init(); if (OB_FAIL(ret)) { SERVER_LOG(ERROR, "init replica failed", K(ret), K(zone_id)); } } return ret; } int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_doc) { int ret = OB_SUCCESS; FILE *fp = fopen(event_file_path_.c_str(), "rb"); if (fp == NULL) { if (json_doc.IsObject()) { fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str()); } ret = OB_ENTRY_NOT_EXIST; return ret; } char read_buffer[4 * 1024]; rapidjson::FileReadStream rs(fp, read_buffer, sizeof(read_buffer)); json_doc.ParseStream(rs); if (json_doc.HasParseError()) { ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "[ObMultiReplicaTestBase] Parse EVENT JSON ERROR", K(ret), K(json_doc.GetParseError())); fprintf(stdout, "Parse Event Json Error\n"); } fclose(fp); return OB_SUCCESS; } int ObMultiReplicaTestBase::wait_event_finish(const std::string &event_name, std::string &event_content, int64_t wait_timeout_ms, int64_t retry_interval_ms) { int ret = OB_SUCCESS; bool find_event = false; int64_t start_time = ObTimeUtility::fast_current_time(); while (OB_SUCC(ret) && !find_event) { rapidjson::Document json_doc; if (OB_FAIL(read_cur_json_document_(json_doc))) { SERVER_LOG(WARN, "read existed json document failed", K(ret)); if (ret == OB_ENTRY_NOT_EXIST) { ret = OB_SUCCESS; } } else { rapidjson::Value::ConstMemberIterator iter = json_doc.FindMember(event_name.c_str()); if (iter == json_doc.MemberEnd()) { SERVER_LOG(WARN, "[ObMultiReplicaTestBase] [WAIT EVENT] not find target event", K(ret), K(event_name.c_str())); ret = OB_SUCCESS; } else { find_event = true; event_content = std::string(iter->value.GetString(), iter->value.GetStringLength()); fprintf(stdout, "[WAIT EVENT] find target event : EVENT_KEY = %s; EVENT_VAL = %s\n", event_name.c_str(), iter->value.GetString()); SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] find target event", K(event_name.c_str()), K(iter->value.GetString())); } } if (!find_event) { if (wait_timeout_ms != INT64_MAX && ObTimeUtility::fast_current_time() - start_time > wait_timeout_ms * 1000) { ret = OB_TIMEOUT; break; } else { usleep(retry_interval_ms * 1000); } } else { break; } } return ret; } int ObMultiReplicaTestBase::finish_event(const std::string &event_name, const std::string &event_content) { int ret = OB_SUCCESS; rapidjson::Document json_doc; json_doc.Parse("{}"); if (OB_FAIL(read_cur_json_document_(json_doc))) { SERVER_LOG(WARN, "read existed json document failed", K(ret)); if (ret == OB_ENTRY_NOT_EXIST) { ret = OB_SUCCESS; } } if (OB_SUCC(ret)) { FILE *fp = fopen(event_file_path_.c_str(), "w"); char write_buffer[4 * 1024]; rapidjson::FileWriteStream file_w_stream(fp, write_buffer, sizeof(write_buffer)); rapidjson::PrettyWriter prettywriter(file_w_stream); json_doc.AddMember(rapidjson::StringRef(event_name.c_str(), event_name.size()), rapidjson::StringRef(event_content.c_str(), event_content.size()), json_doc.GetAllocator()); json_doc.Accept(prettywriter); fclose(fp); } fprintf(stdout, "[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s\n", event_name.c_str(), event_content.c_str()); SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event", K(event_name.c_str()), K(event_content.c_str())); return ret; } int ObMultiReplicaTestBase::start() { SERVER_LOG(INFO, "start simple cluster test base"); OB_LOGGER.set_enable_log_limit(false); // oceanbase::palf::election::GLOBAL_INIT_ELECTION_MODULE(); // oceanbase::palf::election::INIT_TS = 1; // oceanbase::palf::election::MAX_TST = 100 * 1000; GCONF.enable_perf_event = false; GCONF.enable_sql_audit = true; GCONF.enable_record_trace_log = false; int32_t log_level; bool change_log_level = false; if (enable_env_warn_log_) { if (OB_LOGGER.get_log_level() > OB_LOG_LEVEL_WARN) { change_log_level = true; log_level = OB_LOGGER.get_log_level(); OB_LOGGER.set_log_level("WARN"); } } int ret = replica_->simple_start(); is_started_ = true; if (change_log_level) { OB_LOGGER.set_log_level(log_level); } return ret; } int ObMultiReplicaTestBase::close() { int ret = OB_SUCCESS; if (OB_NOT_NULL(replica_)) { ret = replica_->simple_close(); } return ret; } int ObMultiReplicaTestBase::create_tenant(const char *tenant_name, const char *memory_size, const char *log_disk_size, const bool oracle_mode, const char *primary_zone) { SERVER_LOG(INFO, "create tenant start"); int32_t log_level; bool change_log_level = false; if (enable_env_warn_log_) { if (OB_LOGGER.get_log_level() > OB_LOG_LEVEL_WARN) { change_log_level = true; log_level = OB_LOGGER.get_log_level(); OB_LOGGER.set_log_level("WARN"); } } int ret = OB_SUCCESS; common::ObMySQLProxy &sql_proxy = replica_->get_sql_proxy(); int64_t affected_rows = 0; { ObSqlString sql; if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt("set session ob_trx_timeout=1000000000000;"))) { SERVER_LOG(WARN, "set session", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "set session", K(ret)); } } { ObSqlString sql; if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt("set session ob_query_timeout=1000000000000;"))) { SERVER_LOG(WARN, "set session", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "set session", K(ret)); } } { ObSqlString sql; if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt("create resource unit box_ym_%s max_cpu 2, memory_size '%s', " "log_disk_size='%s';", tenant_name, memory_size, log_disk_size))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } } { ObSqlString sql; if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt("create resource pool pool_ym_%s unit = 'box_ym_%s', " "unit_num = 1, zone_list = ('zone1', 'zone2', 'zone3');", tenant_name, tenant_name))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } } { ObSqlString sql; if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt( "create tenant %s replica_num = 3, primary_zone='%s', " "resource_pool_list=('pool_ym_%s') set ob_tcp_invited_nodes='%%'%s", tenant_name, primary_zone, tenant_name, oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } } { ObSqlString sql; if (FAILEDx(sql.assign_fmt( "alter system set _enable_parallel_table_creation = false tenant = all"))) { SERVER_LOG(WARN, "create_tenant", KR(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", KR(ret)); } usleep(5 * 1000 * 1000L); // 5s } if (change_log_level) { OB_LOGGER.set_log_level(log_level); } SERVER_LOG(INFO, "create tenant finish", K(ret)); return ret; } int ObMultiReplicaTestBase::delete_tenant(const char *tenant_name) { ObSqlString sql; common::ObMySQLProxy &sql_proxy = replica_->get_sql_proxy(); sql.assign_fmt("drop tenant %s force", tenant_name); int64_t affected_rows = 0; return sql_proxy.write(sql.ptr(), affected_rows); } int ObMultiReplicaTestBase::get_tenant_id(uint64_t &tenant_id, const char *tenant_name) { SERVER_LOG(INFO, "get_tenant_id"); int ret = OB_SUCCESS; ObSqlString sql; common::ObMySQLProxy &sql_proxy = replica_->get_sql_proxy(); sql.assign_fmt("select tenant_id from oceanbase.__all_tenant where tenant_name = '%s'", tenant_name); SMART_VAR(ObMySQLProxy::MySQLResult, res) { if (OB_FAIL(sql_proxy.read(res, sql.ptr()))) { SERVER_LOG(WARN, "get_tenant_id", K(ret)); } else { sqlclient::ObMySQLResult *result = res.get_result(); if (result != nullptr && OB_SUCC(result->next())) { ret = result->get_uint("tenant_id", tenant_id); SERVER_LOG(WARN, "get_tenant_id", K(ret)); } else { ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "get_tenant_id", K(ret)); } } } return ret; } int ObMultiReplicaTestBase::exec_write_sql_sys(const char *sql_str, int64_t &affected_rows) { int ret = OB_SUCCESS; ObSqlString sql; common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy(); return sql_proxy.write(sql_str, affected_rows); } int ObMultiReplicaTestBase::check_tenant_exist(bool &bool_ret, const char *tenant_name) { int ret = OB_SUCCESS; bool_ret = true; uint64_t tenant_id; if (OB_FAIL(get_tenant_id(tenant_id, tenant_name))) { SERVER_LOG(WARN, "get_tenant_id failed", K(ret)); } else { ObSqlString sql; common::ObMySQLProxy &sql_proxy = replica_->get_sql_proxy(); sql.assign_fmt("select tenant_id from oceanbase.gv$ob_units where tenant_id= '%" PRIu64 "' ", tenant_id); SMART_VAR(ObMySQLProxy::MySQLResult, res) { if (OB_FAIL(sql_proxy.read(res, sql.ptr()))) { SERVER_LOG(WARN, "get gv$ob_units", K(ret)); } else { sqlclient::ObMySQLResult *result = res.get_result(); if (result != nullptr && OB_SUCC(result->next())) { bool_ret = true; } else if (result == nullptr) { bool_ret = false; } else { ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "get_tenant_id", K(ret)); } } } } return ret; } } // namespace unittest } // namespace oceanbase OB_NOINLINE int ::oceanbase::omt::ObWorkerProcessor::process_err_test() { int ret = OB_SUCCESS; if (ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_)) { ret = OB_EAGAIN; SERVER_LOG(INFO, "[ERRSIM] block msg process", K(ret)); } return ret; }