From c5be3d3110f74de97ff273ecdea01d2b518a9aea Mon Sep 17 00:00:00 2001 From: KyrielightWei Date: Tue, 17 Oct 2023 09:13:57 +0000 Subject: [PATCH] set register_no into ObTxBufferNode to filter replay mds redo --- mittest/multi_replica/CMakeLists.txt | 3 +- .../env/ob_multi_replica_test_base.cpp | 438 ++++++++++++------ .../env/ob_multi_replica_test_base.h | 35 +- .../multi_replica/env/ob_multi_replica_util.h | 44 +- .../multi_replica/env/ob_simple_replica.cpp | 56 ++- mittest/multi_replica/env/ob_simple_replica.h | 7 +- .../test_mds_replay_from_ctx_table.cpp | 234 ++++++++++ .../test_ob_multi_replica_basic.cpp | 128 ++--- src/storage/tx/ob_multi_data_source.cpp | 34 +- src/storage/tx/ob_multi_data_source.h | 23 +- src/storage/tx/ob_trans_part_ctx.cpp | 102 ++-- src/storage/tx/ob_tx_ctx_mds.cpp | 99 ++-- src/storage/tx/ob_tx_ctx_mds.h | 6 +- 13 files changed, 883 insertions(+), 326 deletions(-) create mode 100644 mittest/multi_replica/test_mds_replay_from_ctx_table.cpp diff --git a/mittest/multi_replica/CMakeLists.txt b/mittest/multi_replica/CMakeLists.txt index 7c4fc05c9..156b167b8 100644 --- a/mittest/multi_replica/CMakeLists.txt +++ b/mittest/multi_replica/CMakeLists.txt @@ -23,4 +23,5 @@ endfunction() ob_unittest_multi_replica(test_ob_multi_replica_basic) ob_unittest_multi_replica(test_ob_dup_table_basic) ob_unittest_multi_replica(test_ob_dup_table_leader_switch) -ob_unittest_multi_replica(test_ob_dup_table_tablet_gc) \ No newline at end of file +ob_unittest_multi_replica(test_ob_dup_table_tablet_gc) +ob_unittest_multi_replica(test_mds_replay_from_ctx_table) diff --git a/mittest/multi_replica/env/ob_multi_replica_test_base.cpp b/mittest/multi_replica/env/ob_multi_replica_test_base.cpp index 9e267e4e8..183dec98a 100644 --- a/mittest/multi_replica/env/ob_multi_replica_test_base.cpp +++ b/mittest/multi_replica/env/ob_multi_replica_test_base.cpp @@ -10,14 +10,15 @@ * See the Mulan PubL v2 for more details. */ -#include -#include "ob_multi_replica_test_base.h" #include "lib/ob_errno.h" #include "lib/oblog/ob_log.h" #include "lib/profile/ob_trace_id.h" #include "lib/time/ob_time_utility.h" #include "lib/utility/ob_defer.h" #include "logservice/palf/election/utils/election_common_define.h" +#include "ob_multi_replica_test_base.h" +#include "ob_multi_replica_util.h" +#include namespace oceanbase { @@ -40,12 +41,12 @@ void init_log_and_gtest(int argc, char **argv) std::string app_gtest_log_name = app_name + "_gtest.log"; std::string app_trace_log_name = app_name + "_trace.log"; - system(("rm -rf " + app_log_name + "*").c_str()); - system(("rm -rf " + app_rs_log_name + "*").c_str()); - system(("rm -rf " + app_ele_log_name + "*").c_str()); - system(("rm -rf " + app_gtest_log_name + "*").c_str()); - system(("rm -rf " + app_trace_log_name + "*").c_str()); - system(("rm -rf " + app_name + "_*").c_str()); + // system(("rm -rf " + app_log_name + "*").c_str()); + // system(("rm -rf " + app_rs_log_name + "*").c_str()); + // system(("rm -rf " + app_ele_log_name + "*").c_str()); + // system(("rm -rf " + app_gtest_log_name + "*").c_str()); + // system(("rm -rf " + app_trace_log_name + "*").c_str()); + // system(("rm -rf " + app_name + "_*").c_str()); init_gtest_output(app_gtest_log_name); OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(), @@ -123,7 +124,8 @@ std::shared_ptr ObMultiReplicaTestBase::replica bool ObMultiReplicaTestBase::is_started_ = false; bool ObMultiReplicaTestBase::is_inited_ = false; std::string ObMultiReplicaTestBase::env_prefix_; -std::string ObMultiReplicaTestBase::curr_dir_; +std::string ObMultiReplicaTestBase::app_name_; +std::string ObMultiReplicaTestBase::exec_dir_; std::string ObMultiReplicaTestBase::env_prefix_path_; std::string ObMultiReplicaTestBase::event_file_path_; bool ObMultiReplicaTestBase::enable_env_warn_log_ = false; @@ -133,28 +135,48 @@ ObServerInfoList ObMultiReplicaTestBase::server_list_; std::string ObMultiReplicaTestBase::rs_list_; std::string ObMultiReplicaTestBase::local_ip_; -int ObMultiReplicaTestBase::child_pid_ = -1; -int ObMultiReplicaTestBase::child_pid2_ = -1; -int ObMultiReplicaTestBase::cur_zone_id_ = 0; -bool ObMultiReplicaTestBase::block_msg_ = false; +std::vector ObMultiReplicaTestBase::zone_pids_; +int ObMultiReplicaTestBase::cur_zone_id_ = -1; +int ObMultiReplicaTestBase::restart_zone_id_ = -1; +int ObMultiReplicaTestBase::restart_no_ = 0; -ObMultiReplicaTestBase::ObMultiReplicaTestBase() -{ -} +bool ObMultiReplicaTestBase::block_msg_ = false; + +ObMultiReplicaTestBase::ObMultiReplicaTestBase() {} ObMultiReplicaTestBase::~ObMultiReplicaTestBase() {} -int ObMultiReplicaTestBase::bootstrap_multi_replica(const std::string &env_prefix) +int ObMultiReplicaTestBase::bootstrap_multi_replica( const std::string & app_name, + const int restart_zone_id, + const int restart_no, + const std::string &env_prefix) { int ret = OB_SUCCESS; - if (!is_inited_) { + if (is_valid_zone_id(restart_zone_id) && restart_no <= 0) { + ret = OB_INVALID_ARGUMENT; + SERVER_LOG(ERROR, "invalid restart arg", K(ret), K(restart_zone_id), K(restart_no)); + } + + if (!is_inited_ && OB_SUCC(ret)) { env_prefix_ = env_prefix + "_test_data"; //+ std::to_string(ObTimeUtility::current_time()) + "_"; - curr_dir_ = get_current_dir_name(); - env_prefix_path_ = curr_dir_ + "/" + env_prefix_; + exec_dir_ = get_current_dir_name(); + env_prefix_path_ = exec_dir_ + "/" + env_prefix_; event_file_path_ = env_prefix_path_ + "/" + CLUSTER_EVENT_FILE_NAME; + + zone_pids_.resize(3); + restart_zone_id_ = restart_zone_id; + restart_no_ = restart_no; + app_name_ = app_name.substr(app_name.find_last_of("/\\") + 1); + // SERVER_LOG(INFO, "bootstrap_multi_replica arg", K(ret), K(getpid()), K(restart_zone_id_), + // K(restart_no_), K(env_prefix_path_.c_str())); + + printf("bootstrap_multi_replica arg: pid=%d, restart_zone_id=%d, restart_no=%d, " + "env_prefix_path=%s\n", + getpid(), restart_zone_id_, restart_no_, env_prefix_path_.c_str()); + if (OB_FAIL(init_replicas_())) { SERVER_LOG(WARN, "init multi replica failed.", KR(ret)); } @@ -162,6 +184,14 @@ int ObMultiReplicaTestBase::bootstrap_multi_replica(const std::string &env_prefi if (OB_FAIL(ret)) { // do nothing + } else if (0 == cur_zone_id_) { + // wait zone process exit + int status = 0; + for (int i = 0; i < 3; i++) { + waitpid(zone_pids_[i], &status, 0); + SERVER_LOG(INFO, "wait zone pid exit successfully", KR(ret), K(cur_zone_id_), K(i), + K(zone_pids_[i]), K(status)); + } } else if (!is_started_) { if (OB_FAIL(start())) { SERVER_LOG(WARN, "start multi replica failed.", KR(ret)); @@ -195,27 +225,28 @@ int ObMultiReplicaTestBase::wait_all_test_completed() "MAX_ZONE_COUNT = %d\n", ret, cur_zone_id_, MAX_ZONE_COUNT); } - if (cur_zone_id_ == 1) { - int status = 0; - int status2 = 0; - waitpid(child_pid_, &status, 0); - waitpid(child_pid2_, &status2, 0); - if (0 != status || 0 != status2) { - fprintf(stdout, - "Child process exit with error code : [%d]%d, [%d]%d\n", - child_pid_, status, child_pid2_, status2); - SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process exit with error code", K(child_pid_), - K(status), K(child_pid2_), K(status2)); - ret = status; - return ret; - } else { - fprintf(stdout, - "Child process run all test cases done. [%d]%d, [%d]%d\n", - child_pid_, status, child_pid2_, status2); - SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process run all test cases done", - K(child_pid_), K(status), K(child_pid2_), K(status2)); - } - } + // if (cur_zone_id_ == 1) { + // int status = 0; + // int status2 = 0; + // waitpid(child_pid_, &status, 0); + // waitpid(child_pid2_, &status2, 0); + // if (0 != status || 0 != status2) { + // fprintf(stdout, + // "Child process exit with error code : [%d]%d, [%d]%d\n", + // child_pid_, status, child_pid2_, status2); + // SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process exit with error code", + // K(child_pid_), + // K(status), K(child_pid2_), K(status2)); + // ret = status; + // return ret; + // } else { + // fprintf(stdout, + // "Child process run all test cases done. [%d]%d, [%d]%d\n", + // child_pid_, status, child_pid2_, status2); + // SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process run all test cases done", + // K(child_pid_), K(status), K(child_pid2_), K(status2)); + // } + // } return ret; } @@ -255,7 +286,7 @@ void ObMultiReplicaTestBase::TearDownTestCase() // ASSERT_EQ(ret, OB_SUCCESS); } int fail_cnt = ::testing::UnitTest::GetInstance()->failed_test_case_count(); - if (chdir(curr_dir_.c_str()) == 0) { + if (chdir(exec_dir_.c_str()) == 0) { bool to_delete = true; if (to_delete) { // system((std::string("rm -rf ") + env_prefix_ + std::string("*")).c_str()); @@ -266,93 +297,220 @@ void ObMultiReplicaTestBase::TearDownTestCase() int ObMultiReplicaTestBase::init_replicas_() { - SERVER_LOG(INFO, "init simple cluster test base"); + SERVER_LOG(INFO, "init simple cluster test base", K(restart_zone_id_), K(restart_no_)); int ret = OB_SUCCESS; - system(("rm -rf " + env_prefix_).c_str()); + if (!is_valid_zone_id(restart_zone_id_)) { + // for guard process + system(("rm -rf " + env_prefix_).c_str()); - SERVER_LOG(INFO, "create dir and change work dir start.", K(env_prefix_.c_str())); - if (OB_FAIL(mkdir(env_prefix_.c_str(), 0777))) { - } else if (OB_FAIL(chdir(env_prefix_.c_str()))) { + // SERVER_LOG(INFO, "create dir and change work dir start.", K(env_prefix_.c_str())); + if (OB_FAIL(mkdir(env_prefix_.c_str(), 0777))) { + } else if (OB_FAIL(chdir(env_prefix_.c_str()))) { + } else { + const char *current_dir = env_prefix_.c_str(); + // SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir)); + } + + std::string app_log_name = app_name_ + ".log"; + std::string app_rs_log_name = app_name_ + "_rs.log"; + std::string app_ele_log_name = app_name_ + "_election.log"; + std::string app_gtest_log_name = app_name_ + "_gtest.log"; + std::string app_trace_log_name = app_name_ + "_trace.log"; + + // system(("rm -rf " + app_log_name + "*").c_str()); + // system(("rm -rf " + app_rs_log_name + "*").c_str()); + // system(("rm -rf " + app_ele_log_name + "*").c_str()); + // system(("rm -rf " + app_gtest_log_name + "*").c_str()); + // system(("rm -rf " + app_trace_log_name + "*").c_str()); + // system(("rm -rf " + app_name + "_*").c_str()); + + init_gtest_output(app_gtest_log_name); + OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(), + app_ele_log_name.c_str(), app_trace_log_name.c_str()); + + if (OB_SUCC(ret)) { + local_ip_ = get_local_ip(); + if (local_ip_ == "") { + SERVER_LOG(WARN, "get_local_ip failed"); + return -666666666; + } + } + + // mkdir + std::vector dirs; + rs_list_.clear(); + rpc_ports_.clear(); + server_list_.reset(); + + int server_fd = 0; + for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) { + std::string zone_dir = "zone" + std::to_string(i); + ret = mkdir(zone_dir.c_str(), 0777); + std::string data_dir = zone_dir + "/store"; + dirs.push_back(data_dir); + dirs.push_back(zone_dir + "/run"); + dirs.push_back(zone_dir + "/etc"); + dirs.push_back(zone_dir + "/log"); + dirs.push_back(zone_dir + "/wallet"); + + dirs.push_back(data_dir + "/clog"); + dirs.push_back(data_dir + "/slog"); + dirs.push_back(data_dir + "/sstable"); + + int64_t tmp_port = observer::ObSimpleServerReplica::get_rpc_port(server_fd); + rpc_ports_.push_back(tmp_port); + + rs_list_ += local_ip_ + ":" + std::to_string(rpc_ports_[i - 1]) + ":" + + std::to_string(rpc_ports_[i - 1] + 1); + + if (i < MAX_ZONE_COUNT) { + rs_list_ += ";"; + } + + obrpc::ObServerInfo server_info; + server_info.zone_ = zone_dir.c_str(); + server_info.server_ = + common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_ports_[i - 1]); + server_info.region_ = "sys_region"; + server_list_.push_back(server_info); + } + + if (OB_SUCC(ret)) { + for (auto &dir : dirs) { + ret = mkdir(dir.c_str(), 0777); + if (OB_FAIL(ret)) { + SERVER_LOG(ERROR, "ObSimpleServerReplica mkdir", K(ret), K(dir.c_str())); + return ret; + } + } + } + + if (OB_SUCC(ret)) { + for (int j = 0; j < MAX_ZONE_COUNT && OB_SUCC(ret); j++) { + if (OB_FAIL(finish_event("ZONE" + std::to_string(j + 1) + "_RPC_PORT", + std::to_string(rpc_ports_[j])))) { + + SERVER_LOG(ERROR, "write RPC_PORT event failed", K(ret), K(j), K(rpc_ports_[j])); + } + } + } } else { - const char *current_dir = env_prefix_.c_str(); - SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir)); - } - if (OB_SUCC(ret)) { - local_ip_ = get_local_ip(); - if (local_ip_ == "") { - SERVER_LOG(WARN, "get_local_ip failed"); - return -666666666; + rs_list_.clear(); + rpc_ports_.clear(); + server_list_.reset(); + if (OB_FAIL(chdir(env_prefix_.c_str()))) { + } else { + const char *current_dir = env_prefix_.c_str(); + SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir)); } - } + std::string app_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + ".log"; + std::string app_rs_log_name = + "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_rs.log"; + std::string app_ele_log_name = + "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_election.log"; + std::string app_gtest_log_name = + "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_gtest.log"; + std::string app_trace_log_name = + "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_trace.log"; - // mkdir - std::vector dirs; - rs_list_.clear(); - rpc_ports_.clear(); - server_list_.reset(); + // system(("rm -rf " + app_log_name + "*").c_str()); + // system(("rm -rf " + app_rs_log_name + "*").c_str()); + // system(("rm -rf " + app_ele_log_name + "*").c_str()); + // system(("rm -rf " + app_gtest_log_name + "*").c_str()); + // system(("rm -rf " + app_trace_log_name + "*").c_str()); + // system(("rm -rf " + app_name + "_*").c_str()); - int server_fd = 0; - for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) { - std::string zone_dir = "zone" + std::to_string(i); - ret = mkdir(zone_dir.c_str(), 0777); - std::string data_dir = zone_dir + "/store"; - dirs.push_back(data_dir); - dirs.push_back(zone_dir + "/run"); - dirs.push_back(zone_dir + "/etc"); - dirs.push_back(zone_dir + "/log"); - dirs.push_back(zone_dir + "/wallet"); + init_gtest_output(app_gtest_log_name); + OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(), + app_ele_log_name.c_str(), app_trace_log_name.c_str()); - dirs.push_back(data_dir + "/clog"); - dirs.push_back(data_dir + "/slog"); - dirs.push_back(data_dir + "/sstable"); - - int64_t tmp_port = observer::ObSimpleServerReplica::get_rpc_port(server_fd); - rpc_ports_.push_back(tmp_port); - - rs_list_ += local_ip_ + ":" + std::to_string(rpc_ports_[i - 1]) + ":" - + std::to_string(rpc_ports_[i - 1] + 1); - - if (i < MAX_ZONE_COUNT) { - rs_list_ += ";"; + if (OB_SUCC(ret)) { + local_ip_ = get_local_ip(); + if (local_ip_ == "") { + SERVER_LOG(WARN, "get_local_ip failed"); + return -666666666; + } } - obrpc::ObServerInfo server_info; - server_info.zone_ = zone_dir.c_str(); - server_info.server_ = - common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_ports_[i - 1]); - server_info.region_ = "sys_region"; - server_list_.push_back(server_info); - } + if (OB_SUCC(ret)) { + for (int j = 0; j < MAX_ZONE_COUNT && OB_SUCC(ret); j++) { + std::string rpc_port_str = ""; + if (OB_FAIL(wait_event_finish("ZONE" + std::to_string(j + 1) + "_RPC_PORT", rpc_port_str, + 5000 /*5s*/, 100 /*100ms*/))) { - if (OB_SUCC(ret)) { - for (auto &dir : dirs) { - ret = mkdir(dir.c_str(), 0777); - if (OB_FAIL(ret)) { - SERVER_LOG(ERROR, "ObSimpleServerReplica mkdir", K(ret), K(dir.c_str())); - return ret; + SERVER_LOG(ERROR, "read RPC_PORT event failed", K(ret), K(j), K(rpc_ports_[j]), + K(rpc_port_str.c_str())); + } else { + + int tmp_rpc_port = std::stoi(rpc_port_str); + rpc_ports_.push_back(tmp_rpc_port); + + rs_list_ += local_ip_ + ":" + rpc_port_str + ":" + std::to_string(tmp_rpc_port + 1); + + if (j < MAX_ZONE_COUNT) { + rs_list_ += ";"; + } + + obrpc::ObServerInfo server_info; + std::string zone_dir = "zone" + std::to_string(j + 1); + server_info.zone_ = zone_dir.c_str(); + server_info.server_ = + common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), tmp_rpc_port); + server_info.region_ = "sys_region"; + server_list_.push_back(server_info); + } } } } if (OB_SUCC(ret)) { - child_pid_ = fork(); - child_pid2_ = -1; + if (!is_valid_zone_id(restart_zone_id_)) { - if (child_pid_ < 0) { - perror("fork"); - exit(EXIT_FAILURE); - } else if (child_pid_ > 0) { - child_pid2_ = fork(); - if (child_pid2_ > 0) { - ret = init_test_replica_(1); - } else if (child_pid2_ == 0) { - ret = init_test_replica_(3); + int prev_zone_pid = 999999; + for (int i = 0; i < 3; i++) { + if (prev_zone_pid > 0) { + prev_zone_pid = fork(); + if (prev_zone_pid > 0) { + zone_pids_[i] = prev_zone_pid; + } else if (prev_zone_pid == 0) { + cur_zone_id_ = i + 1; + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init sub process zone id", K(i), K(getpid()), + K(cur_zone_id_), K(prev_zone_pid)); + } else if (prev_zone_pid < 0) { + + perror("fork"); + exit(EXIT_FAILURE); + } + } } - } else if (child_pid_ == 0) { - ret = init_test_replica_(2); + if (cur_zone_id_ < 0) { + // guard process + cur_zone_id_ = 0; + + ::testing::GTEST_FLAG(filter) = "GuardProcessTest"; + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init guard zone id", K(ret), K(getpid()), + K(cur_zone_id_), K(zone_pids_[0]), K(zone_pids_[1]), K(zone_pids_[2])); + } else { + ::testing::GTEST_FLAG(filter) = + ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[cur_zone_id_ - 1] + "*"; + fprintf(stdout, "zone %d test_case_name = %s\n", cur_zone_id_, + ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[cur_zone_id_ - 1].c_str()); + ret = init_test_replica_(cur_zone_id_); + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init sub process replica", K(getpid()), + K(cur_zone_id_), K(prev_zone_pid)); + } + } else { + cur_zone_id_ = restart_zone_id_; + std::string restart_case_name = TEST_CASE_BASE_NAME + std::string("_RESTART_") + + std::to_string(restart_no_) + std::string("_ZONE") + + std::to_string(restart_zone_id_) + "*"; + ::testing::GTEST_FLAG(filter) = restart_case_name.c_str(); + fprintf(stdout, "restart pid%d zone %d test_case_name = %s\n", getpid(), restart_zone_id_, + restart_case_name.c_str()); + ret = init_test_replica_(cur_zone_id_); } } @@ -360,21 +518,46 @@ int ObMultiReplicaTestBase::init_replicas_() return ret; } +int ObMultiReplicaTestBase::restart_zone(const int zone_id, const int restart_no) +{ + int ret = OB_SUCCESS; + char exec_path[4096] = {0}; + int return_val = 0; + + fprintf(stdout, + "[RESTART %d] prepare restart zone %d with the restart_no of %d (cur_time = %ld)\n", + getpid(), zone_id, restart_no, ObTimeUtility::current_time()); + + if (zone_id != cur_zone_id_ || restart_no <= 0) { + ret = OB_INVALID_ARGUMENT; + SERVER_LOG(ERROR, "invalid restart arg", K(ret), K(zone_id), K(restart_no), K(cur_zone_id_), + K(getpid())); + } else if (0 >= (return_val = readlink("/proc/self/exe", exec_path, 4096))) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(ERROR, "realink for exec name error", K(ret), K(zone_id), K(restart_no), + K(cur_zone_id_), K(getpid()), K(return_val), K(exec_path)); + } else if (OB_FAIL(chdir(exec_dir_.c_str()))) { + // } else if (OB_FALSE_IT()) + + } else if (0 > (return_val = execl(exec_path, exec_path, (std::to_string(zone_id)).c_str(), + (std::to_string(restart_no)).c_str(), NULL))) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(ERROR, "execl for restart error", K(ret), K(errno), K(zone_id), K(restart_no), + K(cur_zone_id_), K(getpid()), K(return_val), K(exec_path)); + } + + return ret; +} + int ObMultiReplicaTestBase::init_test_replica_(const int zone_id) { int ret = OB_SUCCESS; - ::testing::GTEST_FLAG(filter) = ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[zone_id - 1] + "*"; - // std::string output_file_path = - // env_prefix_path_ + "/" + "ZONE" + std::to_string(zone_id) + ".output"; - // ::testing::GTEST_FLAG(output) = output_file_path; - fprintf(stdout, "zone %d test_case_name = %s\n", zone_id, - ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[zone_id - 1].c_str()); - if (replica_ == nullptr) { cur_zone_id_ = zone_id; replica_ = std::make_shared( - "zone" + std::to_string(zone_id), zone_id, rpc_ports_[zone_id - 1], rs_list_, server_list_, + app_name_, "zone" + std::to_string(zone_id), zone_id, rpc_ports_[zone_id - 1], rs_list_, + server_list_, is_valid_zone_id(restart_zone_id_), oceanbase::observer::ObServer::get_instance(), "./store", log_disk_size_, memory_size_); } else { SERVER_LOG(ERROR, "construct ObSimpleServerReplica repeatedlly", K(ret), K(zone_id), @@ -395,9 +578,8 @@ int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_do int ret = OB_SUCCESS; FILE *fp = fopen(event_file_path_.c_str(), "r"); if (fp == NULL) { - if(json_doc.IsObject()) - { - fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str()); + if (json_doc.IsObject()) { + fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str()); } ret = OB_ENTRY_NOT_EXIST; return ret; @@ -492,8 +674,8 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name, fclose(fp); } - SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event", K(event_name.c_str()), - K(event_content.c_str())); + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event", + K(event_name.c_str()), K(event_content.c_str())); return ret; } @@ -608,7 +790,8 @@ int ObMultiReplicaTestBase::create_tenant(const char *tenant_name, } { ObSqlString sql; - if (FAILEDx(sql.assign_fmt("alter system set _enable_parallel_table_creation = false tenant = all"))) { + if (FAILEDx(sql.assign_fmt( + "alter system set _enable_parallel_table_creation = false tenant = all"))) { SERVER_LOG(WARN, "create_tenant", KR(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", KR(ret)); @@ -698,19 +881,16 @@ int ObMultiReplicaTestBase::check_tenant_exist(bool &bool_ret, const char *tenan return ret; } - } // namespace unittest } // namespace oceanbase - int ::oceanbase::omt::ObWorkerProcessor::process_err_test() { int ret = OB_SUCCESS; - if(ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_)) - { - ret =OB_EAGAIN; - SERVER_LOG(INFO, "[ObMultiReplicaTestBase] block msg process",K(ret)); + if (ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_)) { + ret = OB_EAGAIN; + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] block msg process", K(ret)); } return ret; diff --git a/mittest/multi_replica/env/ob_multi_replica_test_base.h b/mittest/multi_replica/env/ob_multi_replica_test_base.h index 7fe1c22d4..5381a3c79 100644 --- a/mittest/multi_replica/env/ob_multi_replica_test_base.h +++ b/mittest/multi_replica/env/ob_multi_replica_test_base.h @@ -35,6 +35,23 @@ int set_trace_id(char *buf); void init_log_and_gtest(int argc, char **argv); void init_gtest_output(std::string >est_log_name); +struct RestartIndex +{ + int restart_zone_id_; + int restart_no_; + + void reset() + { + restart_zone_id_ = -1; + restart_no_ = 0; + } + + RestartIndex() + { + reset(); + } +}; + class ObMultiReplicaTestBase : public testing::Test { public: @@ -43,8 +60,12 @@ public: ObMultiReplicaTestBase(); virtual ~ObMultiReplicaTestBase(); - static int bootstrap_multi_replica(const std::string &env_prefix = "run_"); + static int bootstrap_multi_replica(const std::string &app_name, + const int restart_zone_id = -1, + const int restart_no = 0, + const std::string &env_prefix = "run_"); static int wait_all_test_completed(); + static int restart_zone(const int zone_id, const int restart_no); static int start(); static int close(); observer::ObServer &get_curr_observer() { return replica_->get_observer(); } @@ -66,12 +87,14 @@ public: int exec_write_sql_sys(const char *sql_str, int64_t &affected_rows); int check_tenant_exist(bool &bool_ret, const char *tenant_name = DEFAULT_TEST_TENANT_NAME); + static std::string TEST_CASE_BASE_NAME; static std::string ZONE_TEST_CASE_NAME[MAX_ZONE_COUNT]; - protected: static int init_replicas_(); static int init_test_replica_(const int zone_id); + static int is_valid_zone_id(int zone_id) { return zone_id >= 0; } + protected: virtual void SetUp(); virtual void TearDown(); @@ -85,7 +108,8 @@ protected: static bool is_inited_; static std::thread th_; static std::string env_prefix_; - static std::string curr_dir_; + static std::string app_name_; + static std::string exec_dir_; static std::string event_file_path_; static std::string env_prefix_path_; static bool enable_env_warn_log_; @@ -96,8 +120,9 @@ protected: static std::string local_ip_; static int cur_zone_id_; - static int child_pid_; - static int child_pid2_; + static int restart_zone_id_; + static int restart_no_; + static std::vector zone_pids_; static std::vector rpc_ports_; static ObServerInfoList server_list_; diff --git a/mittest/multi_replica/env/ob_multi_replica_util.h b/mittest/multi_replica/env/ob_multi_replica_util.h index 62f17f37f..f329d4335 100644 --- a/mittest/multi_replica/env/ob_multi_replica_util.h +++ b/mittest/multi_replica/env/ob_multi_replica_util.h @@ -37,6 +37,7 @@ { \ namespace unittest \ { \ + std::string ObMultiReplicaTestBase::TEST_CASE_BASE_NAME = STR_NAME(CUR_TEST_CASE_NAME); \ std::string ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[MAX_ZONE_COUNT] = { \ GET_ZONE_TEST_CLASS_STR(1), GET_ZONE_TEST_CLASS_STR(2), GET_ZONE_TEST_CLASS_STR(3)}; \ \ @@ -62,22 +63,61 @@ } \ } + +#define RESTART_ZONE_TEST_CASE_CALSS_NAME_INNER(TEST_CASE_NAME, ZONE_ID, RESTART_NO) \ + TEST_CASE_NAME##_RESTART_##RESTART_NO##_ZONE##ZONE_ID +#define RESTART_ZONE_TEST_CASE_CALSS_NAME(TEST_CASE_NAME, ZONE_ID, RESTART_NO) \ + RESTART_ZONE_TEST_CASE_CALSS_NAME_INNER(TEST_CASE_NAME, ZONE_ID, RESTART_NO) + +#define GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, RESTART_NO) \ + RESTART_ZONE_TEST_CASE_CALSS_NAME(CUR_TEST_CASE_NAME, ZONE_ID, RESTART_NO) + +#define GET_RESTART_ZONE_TEST_CLASS_STR(ZONE_ID, REGISTER_NO) \ + STR_NAME(GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO)) + +#define APPEND_RESTART_TEST_CASE_CLASS(ZONE_ID, REGISTER_NO) \ + namespace oceanbase \ + { \ + namespace unittest \ + { \ + class GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO) : public ObMultiReplicaTestBase \ + { \ + public: \ + GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO)() : ObMultiReplicaTestBase() {} \ + }; \ + TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO), start_observer) {} \ + } \ + } #define MULTI_REPLICA_TEST_MAIN_FUNCTION(TEST_DIR_PREFIX) \ int main(int argc, char **argv) \ { \ int ret = OB_SUCCESS; \ + int restart_zone_id = -1; \ + int restart_zone_no = 0; \ + if (argc < 1) { \ + abort(); \ + } \ + std::string app_name = argv[0]; \ + if (argc >= 3) { \ + std::string tmp_str; \ + tmp_str = argv[1]; \ + restart_zone_id = std::stoi(tmp_str); \ + tmp_str = argv[2]; \ + restart_zone_no = std::stoi(tmp_str); \ + } \ char *log_level = (char *)"INFO"; \ - oceanbase::unittest::init_log_and_gtest(argc, argv); \ OB_LOGGER.set_log_level(log_level); \ ::testing::InitGoogleTest(&argc, argv); \ if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::bootstrap_multi_replica( \ - #TEST_DIR_PREFIX))) { \ + app_name, restart_zone_id, restart_zone_no, #TEST_DIR_PREFIX))) { \ fprintf(stdout, "init test case failed. ret = %d", ret); \ return ret; \ } \ return RUN_ALL_TESTS(); \ } + // oceanbase::unittest::init_log_and_gtest(argc, argv); + namespace oceanbase { namespace unittest diff --git a/mittest/multi_replica/env/ob_simple_replica.cpp b/mittest/multi_replica/env/ob_simple_replica.cpp index 087a8509c..26459c0d3 100644 --- a/mittest/multi_replica/env/ob_simple_replica.cpp +++ b/mittest/multi_replica/env/ob_simple_replica.cpp @@ -79,24 +79,21 @@ int64_t ObSimpleServerReplica::get_rpc_port(int &server_fd) return unittest::get_rpc_port(server_fd); } -ObSimpleServerReplica::ObSimpleServerReplica(const std::string &env_prefix, - const int zone_id, - const int rpc_port, - const string &rs_list, - const ObServerInfoList &server_list, - ObServer &server , - const std::string &dir_prefix, - const char *log_disk_size, - const char *memory_limit) - : server_(server), - zone_id_(zone_id), - rpc_port_(rpc_port), - rs_list_(rs_list), - server_info_list_(server_list), - data_dir_(dir_prefix), - run_dir_(env_prefix), - log_disk_size_(log_disk_size), - memory_limit_(memory_limit) +ObSimpleServerReplica::ObSimpleServerReplica(const std::string &app_name, + const std::string &env_prefix, + const int zone_id, + const int rpc_port, + const string &rs_list, + const ObServerInfoList &server_list, + bool is_restart, + ObServer &server, + const std::string &dir_prefix, + const char *log_disk_size, + const char *memory_limit) + : server_(server), zone_id_(zone_id), rpc_port_(rpc_port), rs_list_(rs_list), + server_info_list_(server_list), app_name_(app_name), data_dir_(dir_prefix), + run_dir_(env_prefix), log_disk_size_(log_disk_size), memory_limit_(memory_limit), + is_restart_(is_restart) { // if (ObSimpleServerReplicaRestartHelper::is_restart_) { // std::string port_file_name = run_dir_ + std::string("/port.txt"); @@ -106,7 +103,7 @@ ObSimpleServerReplica::ObSimpleServerReplica(const std::string &env_prefix, // } // fscanf(infile, "%d\n", &rpc_port_); // } else { - // rpc_port_ = unittest::get_rpc_port(server_fd_); + // rpc_port_ = unittest::get_rpc_port(server_fd_); // } mysql_port_ = rpc_port_ + 1; } @@ -172,16 +169,31 @@ int ObSimpleServerReplica::simple_init() getpid(), zone_id_, rpc_port_, mysql_port_, zone_str.c_str(), server_info_list_.count(), rs_list_.c_str()); + // 因为改变了工作目录,设置为绝对路径 for (int i = 0; i < MAX_FD_FILE; i++) { int len = strlen(OB_LOGGER.log_file_[i].filename_); if (len > 0) { + std::string cur_file_name = OB_LOGGER.log_file_[i].filename_; + cur_file_name = cur_file_name.substr(cur_file_name.find_last_of("/\\") + 1); std::string ab_file = std::string(curr_dir) + "/" + run_dir_ + "/" - + std::string(OB_LOGGER.log_file_[i].filename_); + + cur_file_name; SERVER_LOG(INFO, "convert ab file", K(ab_file.c_str())); MEMCPY(OB_LOGGER.log_file_[i].filename_, ab_file.c_str(), ab_file.size()); } } + // std::string ab_file = std::string(curr_dir) + "/" + run_dir_ + "/" + app_name_; + // + // std::string app_log_name = ab_file + ".log"; + // std::string app_rs_log_name = ab_file + "_rs.log"; + // std::string app_ele_log_name = ab_file + "_election.log"; + // std::string app_trace_log_name = ab_file + "_trace.log"; + // OB_LOGGER.set_file_name(app_log_name.c_str(), + // true, + // false, + // app_rs_log_name.c_str(), + // app_ele_log_name.c_str(), + // app_trace_log_name.c_str()); ObPLogWriterCfg log_cfg; ret = server_.init(opts, log_cfg); @@ -277,7 +289,7 @@ int ObSimpleServerReplica::simple_start() { int ret = OB_SUCCESS; // bootstrap - if (zone_id_ == 1) { + if (zone_id_ == 1 && !is_restart_) { std::thread th([this]() { int64_t start_time = ObTimeUtility::current_time(); int ret = OB_SUCCESS; @@ -298,7 +310,7 @@ int ObSimpleServerReplica::simple_start() SERVER_LOG(INFO, "ObSimpleServerReplica init succ prepare to start...", K(zone_id_), K(rpc_port_), K(mysql_port_)); ret = server_.start(); - if (zone_id_ == 1) { + if (zone_id_ == 1 && !is_restart_) { th_.join(); fprintf(stdout, "[BOOTSTRAP SUCC] zone_id = %d, rpc_port = %d, mysql_port = %d\n", zone_id_, rpc_port_, mysql_port_); diff --git a/mittest/multi_replica/env/ob_simple_replica.h b/mittest/multi_replica/env/ob_simple_replica.h index 64b798646..66f3a06ce 100644 --- a/mittest/multi_replica/env/ob_simple_replica.h +++ b/mittest/multi_replica/env/ob_simple_replica.h @@ -33,11 +33,13 @@ public: static int64_t get_rpc_port(int &server_fd); public: - ObSimpleServerReplica(const std::string &env_prefix, + ObSimpleServerReplica(const std::string &app_name, + const std::string &env_prefix, const int zone_id, const int rpc_port, const string &rs_list, const ObServerInfoList &server_list, + bool is_restart, ObServer &server = ObServer::get_instance(), const std::string &dir_prefix = "./store", const char *log_disk_size = "10G", @@ -77,6 +79,7 @@ private: int mysql_port_; std::string rs_list_; ObServerInfoList server_info_list_; + std::string app_name_; std::string data_dir_; std::string optstr_; std::string run_dir_; @@ -91,6 +94,8 @@ private: int server_fd_; bool set_bootstrap_warn_log_; + bool is_restart_; + obrpc::ObNetClient bootstrap_client_; obrpc::ObSrvRpcProxy bootstrap_srv_proxy_; }; diff --git a/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp b/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp new file mode 100644 index 000000000..ac3950362 --- /dev/null +++ b/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp @@ -0,0 +1,234 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#define USING_LOG_PREFIX SERVER +#define protected public +#define private public + +#include "env/ob_fast_bootstrap.h" +#include "env/ob_multi_replica_util.h" +#include "lib/mysqlclient/ob_mysql_result.h" +#include "storage/tx/ob_trans_part_ctx.h" + +using namespace oceanbase::transaction; +using namespace oceanbase::storage; + +#define CUR_TEST_CASE_NAME ObMdsReplayFromCtxTable + +DEFINE_MULTI_ZONE_TEST_CASE_CLASS + +APPEND_RESTART_TEST_CASE_CLASS(2, 1); + +MULTI_REPLICA_TEST_MAIN_FUNCTION(test_mds_replay_from_ctx_table_); + +namespace oceanbase +{ +namespace unittest +{ + +// using namespace storage; +using namespace oceanbase::storage::checkpoint; + +struct RegisterSuccTxArg +{ + transaction::ObTransID tx_id_; + share::SCN register_scn1_; + int64_t register_no1_; + share::SCN register_scn2_; + int64_t register_no2_; + + TO_STRING_KV(K(tx_id_), K(register_scn1_), K(register_no1_), K(register_scn2_), K(register_no2_)); + + OB_UNIS_VERSION(1); +}; + +OB_SERIALIZE_MEMBER(RegisterSuccTxArg, + tx_id_, + register_scn1_, + register_no1_, + register_scn2_, + register_no2_); + +RegisterSuccTxArg register_succ_arg; + +common::ObMySQLTransaction mysql_trans; +oceanbase::observer::ObInnerSQLConnection *register_mds_conn = nullptr; + +void minor_freeze_tx_ctx_memtable(ObLS *ls) +{ + TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable begin"); + int ret = OB_SUCCESS; + + ObCheckpointExecutor *checkpoint_executor = ls->get_checkpoint_executor(); + ASSERT_NE(nullptr, checkpoint_executor); + ObTxCtxMemtable *tx_ctx_memtable = dynamic_cast( + dynamic_cast( + checkpoint_executor->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE]) + ->common_checkpoints_[ObCommonCheckpointType::TX_CTX_MEMTABLE_TYPE]); + ASSERT_EQ(true, tx_ctx_memtable->is_active_memtable()); + ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable->flush(share::SCN::max_scn())); + + // // TODO(handora.qc): use more graceful wait + // usleep(10 * 1000 * 1000); + // usleep(100 * 1000); + + RETRY_UNTIL_TIMEOUT(tx_ctx_memtable->is_active_memtable(), 10 * 1000 * 1000, 100 * 1000); + ASSERT_EQ(OB_SUCCESS, ret); + + TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable end"); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(1), register_mds_without_commit) +{ + int ret = OB_SUCCESS; + common::ObMySQLProxy &sys_tenant_proxy = get_curr_simple_server().get_sql_proxy(); + + ASSERT_EQ(OB_SUCCESS, mysql_trans.start(GCTX.sql_proxy_, 1)); + + // ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, sys_tenant_proxy); + + // ASSERT_EQ(OB_SUCCESS, mysql_trans.start_transaction(1, false)); + register_mds_conn = static_cast(mysql_trans.get_connection()); + transaction::ObTxDesc *desc = register_mds_conn->get_session().get_tx_desc(); + ASSERT_NE(nullptr, desc); + ASSERT_EQ(true, desc->is_valid()); + + std::string test_register_str1 = "TEST REGISTER MDS 1"; + transaction::ObRegisterMdsFlag register_mds_flag1; + register_mds_flag1.need_flush_redo_instantly_ = true; + ASSERT_EQ(OB_SUCCESS, register_mds_flag1.mds_base_scn_.convert_for_tx(1000)); + ASSERT_EQ(OB_SUCCESS, + register_mds_conn->register_multi_data_source( + 1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS, + test_register_str1.c_str(), test_register_str1.size(), register_mds_flag1)); + + ObPartTransCtx *tx_ctx = nullptr; + GET_LS(1, 1, ls_handle); + ASSERT_EQ(ls_handle.is_valid(), true); + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(desc->get_tx_id(), false, tx_ctx)); + + RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000); + ASSERT_EQ(ret, OB_SUCCESS); + share::SCN register_1_scn = tx_ctx->exec_info_.max_applying_log_ts_; + + std::string test_register_str2 = "TEST REGISTER MDS 2"; + transaction::ObRegisterMdsFlag register_mds_flag2; + register_mds_flag2.need_flush_redo_instantly_ = true; + ASSERT_EQ(OB_SUCCESS, register_mds_flag2.mds_base_scn_.convert_for_tx(2000)); + ASSERT_EQ(OB_SUCCESS, + register_mds_conn->register_multi_data_source( + 1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS, + test_register_str2.c_str(), test_register_str2.size(), register_mds_flag2)); + + RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000); + ASSERT_EQ(ret, OB_SUCCESS); + share::SCN register_2_scn = tx_ctx->exec_info_.max_applying_log_ts_; + + ASSERT_EQ(true, register_2_scn > register_1_scn); + ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2); + + register_succ_arg.tx_id_ = desc->get_tx_id(); + register_succ_arg.register_scn1_ = register_1_scn; + register_succ_arg.register_no1_ = tx_ctx->exec_info_.multi_data_source_[0].get_register_no(); + register_succ_arg.register_scn2_ = register_2_scn; + register_succ_arg.register_no2_ = tx_ctx->exec_info_.multi_data_source_[1].get_register_no(); + + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] register mds into tx and submit mds redo success", + K(ret), K(register_succ_arg), K(tx_ctx->exec_info_.multi_data_source_)); + + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx)); + + std::string tmp_str; + ASSERT_EQ(OB_SUCCESS, + EventArgSerTool::serialize_arg(register_succ_arg, tmp_str)); + ASSERT_EQ(OB_SUCCESS, finish_event("REGISTER_MDS_SUCC", tmp_str)); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(2), replay_mds_log_normal) +{ + int ret = OB_SUCCESS; + std::string tmp_event_val; + ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000)); + ASSERT_EQ(OB_SUCCESS, + EventArgSerTool::deserialize_arg(register_succ_arg, tmp_event_val)); + + ObPartTransCtx *tx_ctx = nullptr; + GET_LS(1, 1, ls_handle); + ASSERT_EQ(ls_handle.is_valid(), true); + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx)); + + RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applied_log_ts_ == register_succ_arg.register_scn2_, + 5 * 1000 * 1000, 5000); + ASSERT_EQ(ret, OB_SUCCESS); + + ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2); + // ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(), + // register_succ_arg.register_no1_); + // ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(), + // register_succ_arg.register_no2_); + + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx)); +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(2), dump_tx_ctx_table) +{ + int ret = OB_SUCCESS; + GET_LS(1, 1, ls_handle); + ASSERT_EQ(ls_handle.is_valid(), true); + + { + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(1)); + minor_freeze_tx_ctx_memtable(ls_handle.get_ls()); + } + + ASSERT_EQ(restart_zone(2, 1), OB_SUCCESS); +} + +TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), restart_zone2_from_tx_ctx_table) +{ + int ret = OB_SUCCESS; + + std::string tmp_event_val; + ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000)); + ASSERT_EQ(OB_SUCCESS, + EventArgSerTool::deserialize_arg(register_succ_arg, tmp_event_val)); + + ObPartTransCtx *tx_ctx = nullptr; + GET_LS(1, 1, ls_handle); + ASSERT_EQ(ls_handle.is_valid(), true); + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx)); + + RETRY_UNTIL_TIMEOUT(tx_ctx->start_recover_ts_ == register_succ_arg.register_scn2_, + 5 * 1000 * 1000, 5000); + // tx_ctx->print_trace_log(); + // TRANS_LOG(INFO, "after restart, print tx ctx",K(*tx_ctx)); + ASSERT_EQ(ret, OB_SUCCESS); + RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applying_log_ts_ == register_succ_arg.register_scn2_, + 5 * 1000 * 1000, 5000); + ASSERT_EQ(ret, OB_SUCCESS); + + TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Print mds in tx ctx after retarted", K(ret), + K(tx_ctx->trans_id_), K(tx_ctx->exec_info_.multi_data_source_)); + ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2); + ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(), + register_succ_arg.register_no1_); + ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(), + register_succ_arg.register_no2_); + + ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx)); +} + +} // namespace unittest + +} // namespace oceanbase diff --git a/mittest/multi_replica/test_ob_multi_replica_basic.cpp b/mittest/multi_replica/test_ob_multi_replica_basic.cpp index 43cb8ff1f..f04e4a588 100644 --- a/mittest/multi_replica/test_ob_multi_replica_basic.cpp +++ b/mittest/multi_replica/test_ob_multi_replica_basic.cpp @@ -16,9 +16,16 @@ #define private public #include "env/ob_multi_replica_test_base.h" +#include "env/ob_multi_replica_util.h" #include "env/ob_fast_bootstrap.h" #include "lib/mysqlclient/ob_mysql_result.h" +#define CUR_TEST_CASE_NAME ObSimpleMultiReplicaExampleTest + +DEFINE_MULTI_ZONE_TEST_CASE_CLASS + +APPEND_RESTART_TEST_CASE_CLASS(1, 1) + namespace oceanbase { namespace unittest @@ -27,12 +34,6 @@ namespace unittest using namespace oceanbase::transaction; using namespace oceanbase::storage; -std::string ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[MAX_ZONE_COUNT] = { - "ObSimpleMultiReplicaExampleTest_ZONE1", "ObSimpleMultiReplicaExampleTest_ZONE2", - "ObSimpleMultiReplicaExampleTest_ZONE3"}; - -static const std::string TEST_DIR_PREFIX = "test_multi_replica_basic_"; - class TestRunCtx { public: @@ -42,25 +43,6 @@ public: TestRunCtx RunCtx; -class ObSimpleMultiReplicaExampleTest_ZONE1 : public ObMultiReplicaTestBase -{ -public: - ObSimpleMultiReplicaExampleTest_ZONE1() : ObMultiReplicaTestBase() {} - -}; - -class ObSimpleMultiReplicaExampleTest_ZONE2 : public ObMultiReplicaTestBase -{ -public: - ObSimpleMultiReplicaExampleTest_ZONE2() : ObMultiReplicaTestBase() {} -}; - -class ObSimpleMultiReplicaExampleTest_ZONE3 : public ObMultiReplicaTestBase -{ -public: - ObSimpleMultiReplicaExampleTest_ZONE3() : ObMultiReplicaTestBase() {} -}; - TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, observer_start) { SERVER_LOG(INFO, "observer_start succ"); @@ -78,7 +60,6 @@ TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, add_tenant) ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); } - TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, create_table) { int ret = OB_SUCCESS; @@ -123,13 +104,12 @@ TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, create_table) } } - -TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, delete_tenant) +TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, restart_zone1) { - ASSERT_EQ(OB_SUCCESS, delete_tenant()); + // ASSERT_EQ(OB_SUCCESS, delete_tenant()); + ASSERT_EQ(OB_SUCCESS, ObMultiReplicaTestBase::restart_zone(1, 1)); } - TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, end) { RunCtx.time_sec_ = 0; @@ -154,41 +134,61 @@ TEST_F(ObSimpleMultiReplicaExampleTest_ZONE3, end) } } +TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(1, 1), restart_zone1) +{ +} + } // end unittest } // end oceanbase -int main(int argc, char **argv) -{ - int return_code = 0; - int ret = OB_SUCCESS; - int c = 0; - int time_sec = 0; - char *log_level = (char *)"INFO"; - while (EOF != (c = getopt(argc, argv, "t:l:"))) { - switch (c) { - case 't': - time_sec = atoi(optarg); - break; - case 'l': - log_level = optarg; - oceanbase::unittest::ObMultiReplicaTestBase::enable_env_warn_log_ = false; - break; - default: - break; - } - } - oceanbase::unittest::init_log_and_gtest(argc, argv); - OB_LOGGER.set_log_level(log_level); - - LOG_INFO("main>>>"); - oceanbase::unittest::RunCtx.time_sec_ = time_sec; - ::testing::InitGoogleTest(&argc, argv); - if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::bootstrap_multi_replica( - oceanbase::unittest::TEST_DIR_PREFIX))) { - fprintf(stdout, "init test case failed. ret = %d", ret); - return ret; - } - return_code = RUN_ALL_TESTS(); - return return_code; -} +MULTI_REPLICA_TEST_MAIN_FUNCTION( "test_multi_replica_basic_" ); +// int main(int argc, char **argv) +// { +// int return_code = 0; +// int ret = OB_SUCCESS; +// int c = 0; +// int time_sec = 0; +// char *log_level = (char *)"INFO"; +// while (EOF != (c = getopt(argc, argv, "t:l:"))) { +// switch (c) { +// case 't': +// time_sec = atoi(optarg); +// break; +// case 'l': +// log_level = optarg; +// oceanbase::unittest::ObMultiReplicaTestBase::enable_env_warn_log_ = false; +// break; +// default: +// break; +// } +// } +// oceanbase::unittest::init_log_and_gtest(argc, argv); +// OB_LOGGER.set_log_level(log_level); +// +// for(int i =0; i < argc; i++) +// { +// printf("MAIN FUNC : argc = %d, i =%d, arv = %s\n", argc, i, argv[i]); +// } +// +// int restart_zone_id = -1; +// int restart_zone_no = 0; +// if(argc >= 3) +// { +// std::string tmp_str; +// tmp_str = argv[1]; +// restart_zone_id = std::stoi(tmp_str); +// tmp_str = argv[2]; +// restart_zone_no = std::stoi(tmp_str); +// } +// LOG_INFO("main>>>"); +// oceanbase::unittest::RunCtx.time_sec_ = time_sec; +// ::testing::InitGoogleTest(&argc, argv); +// if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::bootstrap_multi_replica( +// restart_zone_id, restart_zone_no, oceanbase::unittest::TEST_DIR_PREFIX))) { +// fprintf(stdout, "init test case failed. ret = %d", ret); +// return ret; +// } +// return_code = RUN_ALL_TESTS(); +// return return_code; +// } diff --git a/src/storage/tx/ob_multi_data_source.cpp b/src/storage/tx/ob_multi_data_source.cpp index bc706e3df..65f1961d3 100644 --- a/src/storage/tx/ob_multi_data_source.cpp +++ b/src/storage/tx/ob_multi_data_source.cpp @@ -42,7 +42,7 @@ namespace transaction // ObTxBufferNode //##################################################### -OB_SERIALIZE_MEMBER(ObTxBufferNode, type_, data_); +OB_SERIALIZE_MEMBER(ObTxBufferNode, type_, data_, register_no_); int ObTxBufferNode::init(const ObTxDataSourceType type, const ObString &data, @@ -63,6 +63,23 @@ int ObTxBufferNode::init(const ObTxDataSourceType type, return ret; } +int ObTxBufferNode::set_mds_register_no(const uint64_t register_no) +{ + int ret = OB_SUCCESS; + if (register_no <= 0 || !is_valid()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(register_no), KPC(this)); + } else if (register_no_ > 0) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "invalid register no", K(ret), K(register_no), KPC(this)); + } else { + register_no_ = register_no; + // TRANS_LOG(INFO, "set register no in mds node", K(ret), KPC(this)); + } + + return ret; +} + void ObTxBufferNode::replace_data(const common::ObString &data) { if (nullptr != data_.ptr()) { @@ -158,8 +175,21 @@ int ObMulSourceTxDataNotifier::notify(const ObTxBufferNodeArray &array, tmp_notify_arg.redo_submitted_ = node.is_submitted(); tmp_notify_arg.redo_synced_ = node.is_synced(); + if (i > 0) { + const ObTxBufferNode &prev_node = array.at(i - 1); + if (ObTxBufferNode::is_valid_register_no(prev_node.get_register_no()) + && ObTxBufferNode::is_valid_register_no(node.get_register_no()) + && prev_node.get_register_no() >= node.get_register_no()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected register no for the mds_node", K(ret), K(i), K(array.count()), + K(arg), K(node), K(prev_node)); + } + } + OB_ASSERT(node.type_ != ObTxDataSourceType::BEFORE_VERSION_4_1); - if (node.type_ < ObTxDataSourceType::BEFORE_VERSION_4_1 + if(OB_FAIL(ret)) { + //do nothing + } else if (node.type_ < ObTxDataSourceType::BEFORE_VERSION_4_1 && ObTxDataSourceType::CREATE_TABLET_NEW_MDS != node.type_ && ObTxDataSourceType::DELETE_TABLET_NEW_MDS != node.type_ && ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != node.type_) { diff --git a/src/storage/tx/ob_multi_data_source.h b/src/storage/tx/ob_multi_data_source.h index 055c02e0a..5c3ed08c1 100644 --- a/src/storage/tx/ob_multi_data_source.h +++ b/src/storage/tx/ob_multi_data_source.h @@ -13,6 +13,7 @@ #ifndef OCEANBASE_TRANSACTION_OB_MULTI_DATA_SOURCE_ #define OCEANBASE_TRANSACTION_OB_MULTI_DATA_SOURCE_ +#include "share/ob_cluster_version.h" #include "lib/container/ob_se_array.h" #include "lib/list/ob_list.h" #include "lib/utility/ob_unify_serialize.h" @@ -87,14 +88,20 @@ class ObTxBufferNode public: ObTxBufferNode() : type_(ObTxDataSourceType::UNKNOWN), data_() { reset(); } ~ObTxBufferNode() {} - int init(const ObTxDataSourceType type, const common::ObString &data, const share::SCN &base_scn, storage::mds::BufferCtx *ctx); + int init(const ObTxDataSourceType type, + const common::ObString &data, + const share::SCN &base_scn, + storage::mds::BufferCtx *ctx); bool is_valid() const { - return type_ > ObTxDataSourceType::UNKNOWN && type_ < ObTxDataSourceType::MAX_TYPE - && data_.length() > 0; + bool valid_member = false; + valid_member = type_ > ObTxDataSourceType::UNKNOWN && type_ < ObTxDataSourceType::MAX_TYPE + && data_.length() > 0; + return valid_member; } void reset() { + register_no_ = 0; type_ = ObTxDataSourceType::UNKNOWN; data_.reset(); has_submitted_ = false; @@ -102,6 +109,10 @@ public: mds_base_scn_.reset(); } + static bool is_valid_register_no(const int64_t register_no) { return register_no > 0; } + int set_mds_register_no(const uint64_t register_no); + uint64_t get_register_no() const { return register_no_; } + // only for some mds types of CDC // can not be used by observer functions bool allow_to_use_mds_big_segment() { return type_ == ObTxDataSourceType::DDL_TRANS; } @@ -121,7 +132,7 @@ public: const share::SCN &get_base_scn() { return mds_base_scn_; } - bool operator==(const ObTxBufferNode & buffer_node) const; + bool operator==(const ObTxBufferNode &buffer_node) const; void log_sync_fail() { @@ -129,8 +140,10 @@ public: has_synced_ = false; } storage::mds::BufferCtxNode &get_buffer_ctx_node() const { return buffer_ctx_node_; } - TO_STRING_KV(K(has_submitted_), K(has_synced_), K_(type), K(data_.length())); + TO_STRING_KV(K(register_no_), K(has_submitted_), K(has_synced_), K_(type), K(data_.length())); + private: + uint64_t register_no_; bool has_submitted_; bool has_synced_; share::SCN mds_base_scn_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 9f1baef5b..75d9fda52 100755 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -6045,34 +6045,34 @@ int ObPartTransCtx::gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const } int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, - ObTxBufferNodeArray &incremental_array, - bool need_replace) + ObTxBufferNodeArray &incremental_array, + bool need_replace) { auto process_with_buffer_ctx = [this](const ObTxBufferNode &old_node, mds::BufferCtx *&new_ctx) -> int { int ret = OB_SUCCESS; - if (old_node.get_data_source_type() <= ObTxDataSourceType::UNKNOWN || - old_node.get_data_source_type() >= ObTxDataSourceType::MAX_TYPE) { + if (old_node.get_data_source_type() <= ObTxDataSourceType::UNKNOWN + || old_node.get_data_source_type() >= ObTxDataSourceType::MAX_TYPE) { ret = OB_ERR_UNDEFINED; TRANS_LOG(ERROR, "unexpected mds type", KR(ret), K(*this)); } else if (old_node.get_data_source_type() <= ObTxDataSourceType::BEFORE_VERSION_4_1 - && ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type() - && ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type() - && ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) { + && ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type() + && ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type() + && ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) { TRANS_LOG(INFO, "old mds type, no need process with buffer ctx", - K(old_node.get_data_source_type()), K(*this)); + K(old_node.get_data_source_type()), K(*this)); } else { - if (OB_ISNULL(old_node.get_buffer_ctx_node().get_ctx())) {// this is replay path, create ctx - if (OB_FAIL(mds::MdsFactory::create_buffer_ctx(old_node.get_data_source_type(), - trans_id_, + if (OB_ISNULL(old_node.get_buffer_ctx_node().get_ctx())) { // this is replay path, create ctx + if (OB_FAIL(mds::MdsFactory::create_buffer_ctx(old_node.get_data_source_type(), trans_id_, new_ctx))) { - TRANS_LOG(WARN, "fail to create buffer ctx", KR(ret), KPC(new_ctx), K(*this), K(old_node)); + TRANS_LOG(WARN, "fail to create buffer ctx", KR(ret), KPC(new_ctx), K(*this), + K(old_node)); } - } else {// this is recover path, copy ctx - if (OB_FAIL(mds::MdsFactory::deep_copy_buffer_ctx(trans_id_, - *(old_node.buffer_ctx_node_.get_ctx()), - new_ctx))) { - TRANS_LOG(WARN, "fail to deep copy buffer ctx", KR(ret), KPC(new_ctx), K(*this), K(old_node)); + } else { // this is recover path, copy ctx + if (OB_FAIL(mds::MdsFactory::deep_copy_buffer_ctx( + trans_id_, *(old_node.buffer_ctx_node_.get_ctx()), new_ctx))) { + TRANS_LOG(WARN, "fail to deep copy buffer ctx", KR(ret), KPC(new_ctx), K(*this), + K(old_node)); } } } @@ -6114,20 +6114,26 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, mds::BufferCtx *new_ctx = nullptr; if (OB_FAIL(process_with_buffer_ctx(node, new_ctx))) { TRANS_LOG(WARN, "process_with_buffer_ctx failed", KR(ret), K(*this)); - } else if (OB_FAIL(new_node.init(node.get_data_source_type(), - data, - node.mds_base_scn_, + } else if (OB_FAIL(new_node.init(node.get_data_source_type(), data, node.mds_base_scn_, new_ctx))) { mtl_free(data.ptr()); if (OB_NOT_NULL(new_ctx)) { - MTL(mds::ObTenantMdsService*)->get_buffer_ctx_allocator().free(new_ctx); + MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx); new_ctx = nullptr; } TRANS_LOG(WARN, "init new node failed", KR(ret), K(*this)); + } else if (ObTxBufferNode::is_valid_register_no(node.get_register_no()) + && OB_FAIL(new_node.set_mds_register_no(node.get_register_no()))) { + mtl_free(data.ptr()); + if (OB_NOT_NULL(new_ctx)) { + MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx); + new_ctx = nullptr; + } + TRANS_LOG(WARN, "set mds register_no failed", KR(ret), K(*this)); } else if (OB_FAIL(tmp_buf_arr.push_back(new_node))) { mtl_free(data.ptr()); if (OB_NOT_NULL(new_ctx)) { - MTL(mds::ObTenantMdsService*)->get_buffer_ctx_allocator().free(new_ctx); + MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx); new_ctx = nullptr; } TRANS_LOG(WARN, "push multi source data failed", KR(ret), K(*this)); @@ -6161,15 +6167,55 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, } else { - for (int64_t i = 0; OB_SUCC(ret) && i < tmp_buf_arr.count(); ++i) { + const int64_t tmp_buf_array_cnt = tmp_buf_arr.count(); + const int64_t ctx_mds_array_cnt = exec_info_.multi_data_source_.count(); + int64_t max_register_no_in_ctx = 0; + if (exec_info_.multi_data_source_.count() > 0) { + max_register_no_in_ctx = + exec_info_.multi_data_source_[ctx_mds_array_cnt - 1].get_register_no(); + } + int64_t ctx_array_start_index = 0; + + for (int64_t i = 0; OB_SUCC(ret) && i < tmp_buf_array_cnt; ++i) { if (is_follower_()) { tmp_buf_arr[i].set_submitted(); tmp_buf_arr[i].set_synced(); } - if (OB_FAIL(exec_info_.multi_data_source_.push_back(tmp_buf_arr[i]))) { - TRANS_LOG(WARN, "push back exec_info_.multi_data_source_ failed", K(ret)); - } else if (OB_FAIL(incremental_array.push_back(tmp_buf_arr[i]))) { - TRANS_LOG(WARN, "push back incremental_array failed", K(ret)); + if (ObTxBufferNode::is_valid_register_no(max_register_no_in_ctx) + && ObTxBufferNode::is_valid_register_no(tmp_buf_arr[i].get_register_no()) + && tmp_buf_arr[i].get_register_no() <= max_register_no_in_ctx) { + while ((!ObTxBufferNode::is_valid_register_no( + exec_info_.multi_data_source_[ctx_array_start_index].get_register_no()) + || tmp_buf_arr[i].get_register_no() + > exec_info_.multi_data_source_[ctx_array_start_index].get_register_no()) + && ctx_array_start_index < ctx_mds_array_cnt) { + ctx_array_start_index++; + } + if (tmp_buf_arr[i].get_register_no() + == exec_info_.multi_data_source_[ctx_array_start_index].get_register_no()) { + + mtl_free(tmp_buf_arr[i].data_.ptr()); + tmp_buf_arr[i].buffer_ctx_node_.destroy_ctx(); + if (OB_FAIL(incremental_array.push_back( + exec_info_.multi_data_source_[ctx_array_start_index]))) { + TRANS_LOG(WARN, "push back incremental_array failed", K(ret)); + } + TRANS_LOG(INFO, "filter mds node replay by the register_no", K(ret), K(trans_id_), + K(ls_id_), K(i), K(ctx_array_start_index), K(tmp_buf_arr[i].get_register_no()), + K(exec_info_.multi_data_source_[ctx_array_start_index])); + } else { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "we can not find a mds node in ctx with the same register_no", K(ret), + K(i), K(ctx_array_start_index), K(tmp_buf_arr[i].get_register_no()), + K(exec_info_.multi_data_source_[ctx_array_start_index]), KPC(this)); + } + + } else { + if (OB_FAIL(exec_info_.multi_data_source_.push_back(tmp_buf_arr[i]))) { + TRANS_LOG(WARN, "push back exec_info_.multi_data_source_ failed", K(ret)); + } else if (OB_FAIL(incremental_array.push_back(tmp_buf_arr[i]))) { + TRANS_LOG(WARN, "push back incremental_array failed", K(ret)); + } } } } @@ -6550,6 +6596,8 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou ret = OB_LOG_TOO_LARGE; TRANS_LOG(WARN, "too large mds buf node", K(ret), K(tmp_array.get_serialize_size())); //#endif + } else if (OB_FAIL(mds_cache_.try_recover_max_register_no(exec_info_.multi_data_source_))) { + TRANS_LOG(WARN, "recover max register no failed", K(ret), K(mds_cache_), KPC(this)); } else if (OB_FAIL(mds_cache_.insert_mds_node(node))) { TRANS_LOG(WARN, "register multi source data failed", KR(ret), K(data_source_type), K(*this)); diff --git a/src/storage/tx/ob_tx_ctx_mds.cpp b/src/storage/tx/ob_tx_ctx_mds.cpp index 34b1f1af8..11957f1f7 100644 --- a/src/storage/tx/ob_tx_ctx_mds.cpp +++ b/src/storage/tx/ob_tx_ctx_mds.cpp @@ -24,6 +24,7 @@ void ObTxMDSCache::reset() mds_list_.reset(); submitted_iterator_ = mds_list_.end(); // ObTxBufferNodeList::iterator(); need_retry_submit_mds_ = false; + max_register_no_ = 0; } void ObTxMDSCache::destroy() @@ -39,13 +40,31 @@ void ObTxMDSCache::destroy() } } -int ObTxMDSCache::insert_mds_node(const ObTxBufferNode &buf_node) +int ObTxMDSCache::try_recover_max_register_no(const ObTxBufferNodeArray &node_array) +{ + int ret = OB_SUCCESS; + int64_t array_cnt = node_array.count(); + if (array_cnt > 0) { + int64_t max_register_no = node_array[array_cnt - 1].get_register_no(); + if (max_register_no > max_register_no_) { + max_register_no_ = max_register_no; + } + } + + return ret; +} + +int ObTxMDSCache::insert_mds_node(ObTxBufferNode &buf_node) { int ret = OB_SUCCESS; if (!buf_node.is_valid()) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "insert MDS buf node", K(ret)); + } else if (OB_FALSE_IT(max_register_no_++)) { + //do nothing + } else if (OB_FAIL(buf_node.set_mds_register_no(max_register_no_))) { + TRANS_LOG(WARN, "set mds register no failed", K(ret), K(buf_node), KPC(this)); } else if (OB_FAIL(mds_list_.push_back(buf_node))) { TRANS_LOG(WARN, "push back MDS buf node", K(ret)); } else { @@ -63,6 +82,7 @@ int ObTxMDSCache::rollback_last_mds_node() if (OB_FAIL(mds_list_.pop_back())) { TRANS_LOG(WARN, "pop back last node failed", K(ret)); } else { + TRANS_LOG(INFO, "rollback the last mds node", K(ret), K(buf_node), KPC(this)); share::mtl_free(buf_node.get_ptr()); buf_node.get_buffer_ctx_node().destroy_ctx(); } @@ -309,7 +329,18 @@ int ObTxMDSRange::move_from_cache_to_arr(ObTxMDSCache &mds_cache, TRANS_LOG(WARN, "empty range in move function", K(ret), KPC(tx_ctx_)); } else { for (int64_t i = 0; i < range_array_.count() && OB_SUCC(ret); i++) { - if (OB_FAIL(mds_cache.earse_from_cache(range_array_[i]))) { + if (!ObTxBufferNode::is_valid_register_no(range_array_[i].get_register_no())) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(ERROR, "invalid register no for a mds node in cache", K(ret), K(i), + K(range_array_[i])); + } else if (!mds_durable_arr.empty() + && ObTxBufferNode::is_valid_register_no( + mds_durable_arr[mds_durable_arr.count() - 1].get_register_no()) + && range_array_[i].get_register_no() + <= mds_durable_arr[mds_durable_arr.count() - 1].get_register_no()) { + TRANS_LOG(ERROR, "invalid smaller register no", K(ret), K(i), K(range_array_[i]), + K(mds_durable_arr[mds_durable_arr.count() - 1])); + } else if (OB_FAIL(mds_cache.earse_from_cache(range_array_[i]))) { TRANS_LOG(WARN, "earse from mds cache failed", K(ret), K(range_array_[i]), K(mds_cache), K(mds_durable_arr)); } else if (OB_FALSE_IT(range_array_[i].set_synced())) { @@ -324,70 +355,6 @@ int ObTxMDSRange::move_from_cache_to_arr(ObTxMDSCache &mds_cache, return ret; } -// int ObTxMDSRange::move_to(ObTxBufferNodeArray &tx_buffer_node_arr) -// { -// int ret = OB_SUCCESS; -// -// ObTxBufferNodeList::iterator del_iterator, next_iterator; -// -// if (OB_ISNULL(list_ptr_)) { -// ret = OB_NOT_INIT; -// TRANS_LOG(WARN, "MDS range is not init", K(ret)); -// } else if (start_iter_ == list_ptr_->end() || 0 == count_) { -// // empty MDS range -// TRANS_LOG(WARN, "use empty mds range when move", K(this), K(lbt())); -// } else { -// int64_t i = 0; -// del_iterator = list_ptr_->end(); -// next_iterator = start_iter_; -// -// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); i++) { -// del_iterator = next_iterator; -// next_iterator++; -// -// if (!del_iterator->is_submitted()) { -// ret = OB_ERR_UNEXPECTED; -// TRANS_LOG(WARN, "try to move unsubmitted MDS node", K(ret)); -// } else if (OB_FALSE_IT(del_iterator->set_synced())) { -// TRANS_LOG(WARN, "set synced MDS node failed", K(*del_iterator)); -// } else if (OB_FAIL(tx_buffer_node_arr.push_back(*del_iterator))) { -// TRANS_LOG(WARN, "push back MDS node failed", K(ret)); -// } else if (OB_FAIL(list_ptr_->erase(del_iterator))) { -// TRANS_LOG(WARN, "earse from MDS list failed", K(ret)); -// } -// } -// } -// -// return ret; -// } -// -// int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const -// { -// int ret = OB_SUCCESS; -// -// ObTxBufferNodeList::iterator next_iterator; -// -// if (OB_ISNULL(list_ptr_)) { -// ret = OB_NOT_INIT; -// TRANS_LOG(WARN, "MDS range is not init", K(ret)); -// } else if (start_iter_ == list_ptr_->end() || 0 == count_) { -// // empty MDS range -// TRANS_LOG(WARN, "use empty mds range when copy", K(this), K(lbt())); -// } else { -// int64_t i = 0; -// next_iterator = start_iter_; -// -// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); -// i++, next_iterator++) { -// if (OB_FAIL(tx_buffer_node_arr.push_back(*next_iterator))) { -// TRANS_LOG(WARN, "push back MDS node failed", K(ret)); -// } -// } -// } -// -// return ret; -// } -// int ObTxMDSRange::range_submitted(ObTxMDSCache &cache) { int ret = OB_SUCCESS; diff --git a/src/storage/tx/ob_tx_ctx_mds.h b/src/storage/tx/ob_tx_ctx_mds.h index f02edf7f4..d8a26d541 100644 --- a/src/storage/tx/ob_tx_ctx_mds.h +++ b/src/storage/tx/ob_tx_ctx_mds.h @@ -33,7 +33,8 @@ public: void reset(); void destroy(); - int insert_mds_node(const ObTxBufferNode &buf_node); + int try_recover_max_register_no(const ObTxBufferNodeArray & node_array); + int insert_mds_node(ObTxBufferNode &buf_node); int rollback_last_mds_node(); int fill_mds_log(ObPartTransCtx *ctx, ObTxMultiDataSourceLog &mds_log, @@ -61,10 +62,11 @@ public: void set_need_retry_submit_mds(bool need_retry) { need_retry_submit_mds_ = need_retry; }; bool need_retry_submit_mds() { return need_retry_submit_mds_; } - TO_STRING_KV(K(unsubmitted_size_), K(mds_list_.size())); + TO_STRING_KV(K(unsubmitted_size_), K(mds_list_.size()), K(max_register_no_)); private: // TransModulePageAllocator allocator_; + uint64_t max_register_no_; bool need_retry_submit_mds_; int64_t unsubmitted_size_; ObTxBufferNodeList mds_list_;