set register_no into ObTxBufferNode to filter replay mds redo

This commit is contained in:
KyrielightWei 2023-10-17 09:13:57 +00:00 committed by ob-robot
parent daaf3a2e08
commit c5be3d3110
13 changed files with 883 additions and 326 deletions

View File

@ -23,4 +23,5 @@ endfunction()
ob_unittest_multi_replica(test_ob_multi_replica_basic)
ob_unittest_multi_replica(test_ob_dup_table_basic)
ob_unittest_multi_replica(test_ob_dup_table_leader_switch)
ob_unittest_multi_replica(test_ob_dup_table_tablet_gc)
ob_unittest_multi_replica(test_ob_dup_table_tablet_gc)
ob_unittest_multi_replica(test_mds_replay_from_ctx_table)

View File

@ -10,14 +10,15 @@
* See the Mulan PubL v2 for more details.
*/
#include <fstream>
#include "ob_multi_replica_test_base.h"
#include "lib/ob_errno.h"
#include "lib/oblog/ob_log.h"
#include "lib/profile/ob_trace_id.h"
#include "lib/time/ob_time_utility.h"
#include "lib/utility/ob_defer.h"
#include "logservice/palf/election/utils/election_common_define.h"
#include "ob_multi_replica_test_base.h"
#include "ob_multi_replica_util.h"
#include <fstream>
namespace oceanbase
{
@ -40,12 +41,12 @@ void init_log_and_gtest(int argc, char **argv)
std::string app_gtest_log_name = app_name + "_gtest.log";
std::string app_trace_log_name = app_name + "_trace.log";
system(("rm -rf " + app_log_name + "*").c_str());
system(("rm -rf " + app_rs_log_name + "*").c_str());
system(("rm -rf " + app_ele_log_name + "*").c_str());
system(("rm -rf " + app_gtest_log_name + "*").c_str());
system(("rm -rf " + app_trace_log_name + "*").c_str());
system(("rm -rf " + app_name + "_*").c_str());
// system(("rm -rf " + app_log_name + "*").c_str());
// system(("rm -rf " + app_rs_log_name + "*").c_str());
// system(("rm -rf " + app_ele_log_name + "*").c_str());
// system(("rm -rf " + app_gtest_log_name + "*").c_str());
// system(("rm -rf " + app_trace_log_name + "*").c_str());
// system(("rm -rf " + app_name + "_*").c_str());
init_gtest_output(app_gtest_log_name);
OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(),
@ -123,7 +124,8 @@ std::shared_ptr<observer::ObSimpleServerReplica> ObMultiReplicaTestBase::replica
bool ObMultiReplicaTestBase::is_started_ = false;
bool ObMultiReplicaTestBase::is_inited_ = false;
std::string ObMultiReplicaTestBase::env_prefix_;
std::string ObMultiReplicaTestBase::curr_dir_;
std::string ObMultiReplicaTestBase::app_name_;
std::string ObMultiReplicaTestBase::exec_dir_;
std::string ObMultiReplicaTestBase::env_prefix_path_;
std::string ObMultiReplicaTestBase::event_file_path_;
bool ObMultiReplicaTestBase::enable_env_warn_log_ = false;
@ -133,28 +135,48 @@ ObServerInfoList ObMultiReplicaTestBase::server_list_;
std::string ObMultiReplicaTestBase::rs_list_;
std::string ObMultiReplicaTestBase::local_ip_;
int ObMultiReplicaTestBase::child_pid_ = -1;
int ObMultiReplicaTestBase::child_pid2_ = -1;
int ObMultiReplicaTestBase::cur_zone_id_ = 0;
bool ObMultiReplicaTestBase::block_msg_ = false;
std::vector<int> ObMultiReplicaTestBase::zone_pids_;
int ObMultiReplicaTestBase::cur_zone_id_ = -1;
int ObMultiReplicaTestBase::restart_zone_id_ = -1;
int ObMultiReplicaTestBase::restart_no_ = 0;
ObMultiReplicaTestBase::ObMultiReplicaTestBase()
{
}
bool ObMultiReplicaTestBase::block_msg_ = false;
ObMultiReplicaTestBase::ObMultiReplicaTestBase() {}
ObMultiReplicaTestBase::~ObMultiReplicaTestBase() {}
int ObMultiReplicaTestBase::bootstrap_multi_replica(const std::string &env_prefix)
int ObMultiReplicaTestBase::bootstrap_multi_replica( const std::string & app_name,
const int restart_zone_id,
const int restart_no,
const std::string &env_prefix)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
if (is_valid_zone_id(restart_zone_id) && restart_no <= 0) {
ret = OB_INVALID_ARGUMENT;
SERVER_LOG(ERROR, "invalid restart arg", K(ret), K(restart_zone_id), K(restart_no));
}
if (!is_inited_ && OB_SUCC(ret)) {
env_prefix_ =
env_prefix + "_test_data"; //+ std::to_string(ObTimeUtility::current_time()) + "_";
curr_dir_ = get_current_dir_name();
env_prefix_path_ = curr_dir_ + "/" + env_prefix_;
exec_dir_ = get_current_dir_name();
env_prefix_path_ = exec_dir_ + "/" + env_prefix_;
event_file_path_ = env_prefix_path_ + "/" + CLUSTER_EVENT_FILE_NAME;
zone_pids_.resize(3);
restart_zone_id_ = restart_zone_id;
restart_no_ = restart_no;
app_name_ = app_name.substr(app_name.find_last_of("/\\") + 1);
// SERVER_LOG(INFO, "bootstrap_multi_replica arg", K(ret), K(getpid()), K(restart_zone_id_),
// K(restart_no_), K(env_prefix_path_.c_str()));
printf("bootstrap_multi_replica arg: pid=%d, restart_zone_id=%d, restart_no=%d, "
"env_prefix_path=%s\n",
getpid(), restart_zone_id_, restart_no_, env_prefix_path_.c_str());
if (OB_FAIL(init_replicas_())) {
SERVER_LOG(WARN, "init multi replica failed.", KR(ret));
}
@ -162,6 +184,14 @@ int ObMultiReplicaTestBase::bootstrap_multi_replica(const std::string &env_prefi
if (OB_FAIL(ret)) {
// do nothing
} else if (0 == cur_zone_id_) {
// wait zone process exit
int status = 0;
for (int i = 0; i < 3; i++) {
waitpid(zone_pids_[i], &status, 0);
SERVER_LOG(INFO, "wait zone pid exit successfully", KR(ret), K(cur_zone_id_), K(i),
K(zone_pids_[i]), K(status));
}
} else if (!is_started_) {
if (OB_FAIL(start())) {
SERVER_LOG(WARN, "start multi replica failed.", KR(ret));
@ -195,27 +225,28 @@ int ObMultiReplicaTestBase::wait_all_test_completed()
"MAX_ZONE_COUNT = %d\n",
ret, cur_zone_id_, MAX_ZONE_COUNT);
}
if (cur_zone_id_ == 1) {
int status = 0;
int status2 = 0;
waitpid(child_pid_, &status, 0);
waitpid(child_pid2_, &status2, 0);
if (0 != status || 0 != status2) {
fprintf(stdout,
"Child process exit with error code : [%d]%d, [%d]%d\n",
child_pid_, status, child_pid2_, status2);
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process exit with error code", K(child_pid_),
K(status), K(child_pid2_), K(status2));
ret = status;
return ret;
} else {
fprintf(stdout,
"Child process run all test cases done. [%d]%d, [%d]%d\n",
child_pid_, status, child_pid2_, status2);
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process run all test cases done",
K(child_pid_), K(status), K(child_pid2_), K(status2));
}
}
// if (cur_zone_id_ == 1) {
// int status = 0;
// int status2 = 0;
// waitpid(child_pid_, &status, 0);
// waitpid(child_pid2_, &status2, 0);
// if (0 != status || 0 != status2) {
// fprintf(stdout,
// "Child process exit with error code : [%d]%d, [%d]%d\n",
// child_pid_, status, child_pid2_, status2);
// SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process exit with error code",
// K(child_pid_),
// K(status), K(child_pid2_), K(status2));
// ret = status;
// return ret;
// } else {
// fprintf(stdout,
// "Child process run all test cases done. [%d]%d, [%d]%d\n",
// child_pid_, status, child_pid2_, status2);
// SERVER_LOG(INFO, "[ObMultiReplicaTestBase] Child process run all test cases done",
// K(child_pid_), K(status), K(child_pid2_), K(status2));
// }
// }
return ret;
}
@ -255,7 +286,7 @@ void ObMultiReplicaTestBase::TearDownTestCase()
// ASSERT_EQ(ret, OB_SUCCESS);
}
int fail_cnt = ::testing::UnitTest::GetInstance()->failed_test_case_count();
if (chdir(curr_dir_.c_str()) == 0) {
if (chdir(exec_dir_.c_str()) == 0) {
bool to_delete = true;
if (to_delete) {
// system((std::string("rm -rf ") + env_prefix_ + std::string("*")).c_str());
@ -266,93 +297,220 @@ void ObMultiReplicaTestBase::TearDownTestCase()
int ObMultiReplicaTestBase::init_replicas_()
{
SERVER_LOG(INFO, "init simple cluster test base");
SERVER_LOG(INFO, "init simple cluster test base", K(restart_zone_id_), K(restart_no_));
int ret = OB_SUCCESS;
system(("rm -rf " + env_prefix_).c_str());
if (!is_valid_zone_id(restart_zone_id_)) {
// for guard process
system(("rm -rf " + env_prefix_).c_str());
SERVER_LOG(INFO, "create dir and change work dir start.", K(env_prefix_.c_str()));
if (OB_FAIL(mkdir(env_prefix_.c_str(), 0777))) {
} else if (OB_FAIL(chdir(env_prefix_.c_str()))) {
// SERVER_LOG(INFO, "create dir and change work dir start.", K(env_prefix_.c_str()));
if (OB_FAIL(mkdir(env_prefix_.c_str(), 0777))) {
} else if (OB_FAIL(chdir(env_prefix_.c_str()))) {
} else {
const char *current_dir = env_prefix_.c_str();
// SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir));
}
std::string app_log_name = app_name_ + ".log";
std::string app_rs_log_name = app_name_ + "_rs.log";
std::string app_ele_log_name = app_name_ + "_election.log";
std::string app_gtest_log_name = app_name_ + "_gtest.log";
std::string app_trace_log_name = app_name_ + "_trace.log";
// system(("rm -rf " + app_log_name + "*").c_str());
// system(("rm -rf " + app_rs_log_name + "*").c_str());
// system(("rm -rf " + app_ele_log_name + "*").c_str());
// system(("rm -rf " + app_gtest_log_name + "*").c_str());
// system(("rm -rf " + app_trace_log_name + "*").c_str());
// system(("rm -rf " + app_name + "_*").c_str());
init_gtest_output(app_gtest_log_name);
OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(),
app_ele_log_name.c_str(), app_trace_log_name.c_str());
if (OB_SUCC(ret)) {
local_ip_ = get_local_ip();
if (local_ip_ == "") {
SERVER_LOG(WARN, "get_local_ip failed");
return -666666666;
}
}
// mkdir
std::vector<std::string> dirs;
rs_list_.clear();
rpc_ports_.clear();
server_list_.reset();
int server_fd = 0;
for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) {
std::string zone_dir = "zone" + std::to_string(i);
ret = mkdir(zone_dir.c_str(), 0777);
std::string data_dir = zone_dir + "/store";
dirs.push_back(data_dir);
dirs.push_back(zone_dir + "/run");
dirs.push_back(zone_dir + "/etc");
dirs.push_back(zone_dir + "/log");
dirs.push_back(zone_dir + "/wallet");
dirs.push_back(data_dir + "/clog");
dirs.push_back(data_dir + "/slog");
dirs.push_back(data_dir + "/sstable");
int64_t tmp_port = observer::ObSimpleServerReplica::get_rpc_port(server_fd);
rpc_ports_.push_back(tmp_port);
rs_list_ += local_ip_ + ":" + std::to_string(rpc_ports_[i - 1]) + ":"
+ std::to_string(rpc_ports_[i - 1] + 1);
if (i < MAX_ZONE_COUNT) {
rs_list_ += ";";
}
obrpc::ObServerInfo server_info;
server_info.zone_ = zone_dir.c_str();
server_info.server_ =
common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_ports_[i - 1]);
server_info.region_ = "sys_region";
server_list_.push_back(server_info);
}
if (OB_SUCC(ret)) {
for (auto &dir : dirs) {
ret = mkdir(dir.c_str(), 0777);
if (OB_FAIL(ret)) {
SERVER_LOG(ERROR, "ObSimpleServerReplica mkdir", K(ret), K(dir.c_str()));
return ret;
}
}
}
if (OB_SUCC(ret)) {
for (int j = 0; j < MAX_ZONE_COUNT && OB_SUCC(ret); j++) {
if (OB_FAIL(finish_event("ZONE" + std::to_string(j + 1) + "_RPC_PORT",
std::to_string(rpc_ports_[j])))) {
SERVER_LOG(ERROR, "write RPC_PORT event failed", K(ret), K(j), K(rpc_ports_[j]));
}
}
}
} else {
const char *current_dir = env_prefix_.c_str();
SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir));
}
if (OB_SUCC(ret)) {
local_ip_ = get_local_ip();
if (local_ip_ == "") {
SERVER_LOG(WARN, "get_local_ip failed");
return -666666666;
rs_list_.clear();
rpc_ports_.clear();
server_list_.reset();
if (OB_FAIL(chdir(env_prefix_.c_str()))) {
} else {
const char *current_dir = env_prefix_.c_str();
SERVER_LOG(INFO, "create dir and change work dir done.", K(current_dir));
}
}
std::string app_log_name = "zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + ".log";
std::string app_rs_log_name =
"zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_rs.log";
std::string app_ele_log_name =
"zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_election.log";
std::string app_gtest_log_name =
"zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_gtest.log";
std::string app_trace_log_name =
"zone" + std::to_string(restart_zone_id_) + "/" + app_name_ + "_trace.log";
// mkdir
std::vector<std::string> dirs;
rs_list_.clear();
rpc_ports_.clear();
server_list_.reset();
// system(("rm -rf " + app_log_name + "*").c_str());
// system(("rm -rf " + app_rs_log_name + "*").c_str());
// system(("rm -rf " + app_ele_log_name + "*").c_str());
// system(("rm -rf " + app_gtest_log_name + "*").c_str());
// system(("rm -rf " + app_trace_log_name + "*").c_str());
// system(("rm -rf " + app_name + "_*").c_str());
int server_fd = 0;
for (int i = 1; i <= MAX_ZONE_COUNT && OB_SUCC(ret); i++) {
std::string zone_dir = "zone" + std::to_string(i);
ret = mkdir(zone_dir.c_str(), 0777);
std::string data_dir = zone_dir + "/store";
dirs.push_back(data_dir);
dirs.push_back(zone_dir + "/run");
dirs.push_back(zone_dir + "/etc");
dirs.push_back(zone_dir + "/log");
dirs.push_back(zone_dir + "/wallet");
init_gtest_output(app_gtest_log_name);
OB_LOGGER.set_file_name(app_log_name.c_str(), true, false, app_rs_log_name.c_str(),
app_ele_log_name.c_str(), app_trace_log_name.c_str());
dirs.push_back(data_dir + "/clog");
dirs.push_back(data_dir + "/slog");
dirs.push_back(data_dir + "/sstable");
int64_t tmp_port = observer::ObSimpleServerReplica::get_rpc_port(server_fd);
rpc_ports_.push_back(tmp_port);
rs_list_ += local_ip_ + ":" + std::to_string(rpc_ports_[i - 1]) + ":"
+ std::to_string(rpc_ports_[i - 1] + 1);
if (i < MAX_ZONE_COUNT) {
rs_list_ += ";";
if (OB_SUCC(ret)) {
local_ip_ = get_local_ip();
if (local_ip_ == "") {
SERVER_LOG(WARN, "get_local_ip failed");
return -666666666;
}
}
obrpc::ObServerInfo server_info;
server_info.zone_ = zone_dir.c_str();
server_info.server_ =
common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), rpc_ports_[i - 1]);
server_info.region_ = "sys_region";
server_list_.push_back(server_info);
}
if (OB_SUCC(ret)) {
for (int j = 0; j < MAX_ZONE_COUNT && OB_SUCC(ret); j++) {
std::string rpc_port_str = "";
if (OB_FAIL(wait_event_finish("ZONE" + std::to_string(j + 1) + "_RPC_PORT", rpc_port_str,
5000 /*5s*/, 100 /*100ms*/))) {
if (OB_SUCC(ret)) {
for (auto &dir : dirs) {
ret = mkdir(dir.c_str(), 0777);
if (OB_FAIL(ret)) {
SERVER_LOG(ERROR, "ObSimpleServerReplica mkdir", K(ret), K(dir.c_str()));
return ret;
SERVER_LOG(ERROR, "read RPC_PORT event failed", K(ret), K(j), K(rpc_ports_[j]),
K(rpc_port_str.c_str()));
} else {
int tmp_rpc_port = std::stoi(rpc_port_str);
rpc_ports_.push_back(tmp_rpc_port);
rs_list_ += local_ip_ + ":" + rpc_port_str + ":" + std::to_string(tmp_rpc_port + 1);
if (j < MAX_ZONE_COUNT) {
rs_list_ += ";";
}
obrpc::ObServerInfo server_info;
std::string zone_dir = "zone" + std::to_string(j + 1);
server_info.zone_ = zone_dir.c_str();
server_info.server_ =
common::ObAddr(common::ObAddr::IPV4, local_ip_.c_str(), tmp_rpc_port);
server_info.region_ = "sys_region";
server_list_.push_back(server_info);
}
}
}
}
if (OB_SUCC(ret)) {
child_pid_ = fork();
child_pid2_ = -1;
if (!is_valid_zone_id(restart_zone_id_)) {
if (child_pid_ < 0) {
perror("fork");
exit(EXIT_FAILURE);
} else if (child_pid_ > 0) {
child_pid2_ = fork();
if (child_pid2_ > 0) {
ret = init_test_replica_(1);
} else if (child_pid2_ == 0) {
ret = init_test_replica_(3);
int prev_zone_pid = 999999;
for (int i = 0; i < 3; i++) {
if (prev_zone_pid > 0) {
prev_zone_pid = fork();
if (prev_zone_pid > 0) {
zone_pids_[i] = prev_zone_pid;
} else if (prev_zone_pid == 0) {
cur_zone_id_ = i + 1;
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init sub process zone id", K(i), K(getpid()),
K(cur_zone_id_), K(prev_zone_pid));
} else if (prev_zone_pid < 0) {
perror("fork");
exit(EXIT_FAILURE);
}
}
}
} else if (child_pid_ == 0) {
ret = init_test_replica_(2);
if (cur_zone_id_ < 0) {
// guard process
cur_zone_id_ = 0;
::testing::GTEST_FLAG(filter) = "GuardProcessTest";
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init guard zone id", K(ret), K(getpid()),
K(cur_zone_id_), K(zone_pids_[0]), K(zone_pids_[1]), K(zone_pids_[2]));
} else {
::testing::GTEST_FLAG(filter) =
ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[cur_zone_id_ - 1] + "*";
fprintf(stdout, "zone %d test_case_name = %s\n", cur_zone_id_,
ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[cur_zone_id_ - 1].c_str());
ret = init_test_replica_(cur_zone_id_);
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] init sub process replica", K(getpid()),
K(cur_zone_id_), K(prev_zone_pid));
}
} else {
cur_zone_id_ = restart_zone_id_;
std::string restart_case_name = TEST_CASE_BASE_NAME + std::string("_RESTART_")
+ std::to_string(restart_no_) + std::string("_ZONE")
+ std::to_string(restart_zone_id_) + "*";
::testing::GTEST_FLAG(filter) = restart_case_name.c_str();
fprintf(stdout, "restart pid%d zone %d test_case_name = %s\n", getpid(), restart_zone_id_,
restart_case_name.c_str());
ret = init_test_replica_(cur_zone_id_);
}
}
@ -360,21 +518,46 @@ int ObMultiReplicaTestBase::init_replicas_()
return ret;
}
int ObMultiReplicaTestBase::restart_zone(const int zone_id, const int restart_no)
{
int ret = OB_SUCCESS;
char exec_path[4096] = {0};
int return_val = 0;
fprintf(stdout,
"[RESTART %d] prepare restart zone %d with the restart_no of %d (cur_time = %ld)\n",
getpid(), zone_id, restart_no, ObTimeUtility::current_time());
if (zone_id != cur_zone_id_ || restart_no <= 0) {
ret = OB_INVALID_ARGUMENT;
SERVER_LOG(ERROR, "invalid restart arg", K(ret), K(zone_id), K(restart_no), K(cur_zone_id_),
K(getpid()));
} else if (0 >= (return_val = readlink("/proc/self/exe", exec_path, 4096))) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(ERROR, "realink for exec name error", K(ret), K(zone_id), K(restart_no),
K(cur_zone_id_), K(getpid()), K(return_val), K(exec_path));
} else if (OB_FAIL(chdir(exec_dir_.c_str()))) {
// } else if (OB_FALSE_IT())
} else if (0 > (return_val = execl(exec_path, exec_path, (std::to_string(zone_id)).c_str(),
(std::to_string(restart_no)).c_str(), NULL))) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(ERROR, "execl for restart error", K(ret), K(errno), K(zone_id), K(restart_no),
K(cur_zone_id_), K(getpid()), K(return_val), K(exec_path));
}
return ret;
}
int ObMultiReplicaTestBase::init_test_replica_(const int zone_id)
{
int ret = OB_SUCCESS;
::testing::GTEST_FLAG(filter) = ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[zone_id - 1] + "*";
// std::string output_file_path =
// env_prefix_path_ + "/" + "ZONE" + std::to_string(zone_id) + ".output";
// ::testing::GTEST_FLAG(output) = output_file_path;
fprintf(stdout, "zone %d test_case_name = %s\n", zone_id,
ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[zone_id - 1].c_str());
if (replica_ == nullptr) {
cur_zone_id_ = zone_id;
replica_ = std::make_shared<observer::ObSimpleServerReplica>(
"zone" + std::to_string(zone_id), zone_id, rpc_ports_[zone_id - 1], rs_list_, server_list_,
app_name_, "zone" + std::to_string(zone_id), zone_id, rpc_ports_[zone_id - 1], rs_list_,
server_list_, is_valid_zone_id(restart_zone_id_),
oceanbase::observer::ObServer::get_instance(), "./store", log_disk_size_, memory_size_);
} else {
SERVER_LOG(ERROR, "construct ObSimpleServerReplica repeatedlly", K(ret), K(zone_id),
@ -395,9 +578,8 @@ int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_do
int ret = OB_SUCCESS;
FILE *fp = fopen(event_file_path_.c_str(), "r");
if (fp == NULL) {
if(json_doc.IsObject())
{
fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str());
if (json_doc.IsObject()) {
fprintf(stdout, "Fail to open file! file_path = %s\n", event_file_path_.c_str());
}
ret = OB_ENTRY_NOT_EXIST;
return ret;
@ -492,8 +674,8 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name,
fclose(fp);
}
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event", K(event_name.c_str()),
K(event_content.c_str()));
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] [WAIT EVENT] write target event",
K(event_name.c_str()), K(event_content.c_str()));
return ret;
}
@ -608,7 +790,8 @@ int ObMultiReplicaTestBase::create_tenant(const char *tenant_name,
}
{
ObSqlString sql;
if (FAILEDx(sql.assign_fmt("alter system set _enable_parallel_table_creation = false tenant = all"))) {
if (FAILEDx(sql.assign_fmt(
"alter system set _enable_parallel_table_creation = false tenant = all"))) {
SERVER_LOG(WARN, "create_tenant", KR(ret));
} else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
SERVER_LOG(WARN, "create_tenant", KR(ret));
@ -698,19 +881,16 @@ int ObMultiReplicaTestBase::check_tenant_exist(bool &bool_ret, const char *tenan
return ret;
}
} // namespace unittest
} // namespace oceanbase
int ::oceanbase::omt::ObWorkerProcessor::process_err_test()
{
int ret = OB_SUCCESS;
if(ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_))
{
ret =OB_EAGAIN;
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] block msg process",K(ret));
if (ATOMIC_LOAD(&::oceanbase::unittest::ObMultiReplicaTestBase::block_msg_)) {
ret = OB_EAGAIN;
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] block msg process", K(ret));
}
return ret;

View File

@ -35,6 +35,23 @@ int set_trace_id(char *buf);
void init_log_and_gtest(int argc, char **argv);
void init_gtest_output(std::string &gtest_log_name);
struct RestartIndex
{
int restart_zone_id_;
int restart_no_;
void reset()
{
restart_zone_id_ = -1;
restart_no_ = 0;
}
RestartIndex()
{
reset();
}
};
class ObMultiReplicaTestBase : public testing::Test
{
public:
@ -43,8 +60,12 @@ public:
ObMultiReplicaTestBase();
virtual ~ObMultiReplicaTestBase();
static int bootstrap_multi_replica(const std::string &env_prefix = "run_");
static int bootstrap_multi_replica(const std::string &app_name,
const int restart_zone_id = -1,
const int restart_no = 0,
const std::string &env_prefix = "run_");
static int wait_all_test_completed();
static int restart_zone(const int zone_id, const int restart_no);
static int start();
static int close();
observer::ObServer &get_curr_observer() { return replica_->get_observer(); }
@ -66,12 +87,14 @@ public:
int exec_write_sql_sys(const char *sql_str, int64_t &affected_rows);
int check_tenant_exist(bool &bool_ret, const char *tenant_name = DEFAULT_TEST_TENANT_NAME);
static std::string TEST_CASE_BASE_NAME;
static std::string ZONE_TEST_CASE_NAME[MAX_ZONE_COUNT];
protected:
static int init_replicas_();
static int init_test_replica_(const int zone_id);
static int is_valid_zone_id(int zone_id) { return zone_id >= 0; }
protected:
virtual void SetUp();
virtual void TearDown();
@ -85,7 +108,8 @@ protected:
static bool is_inited_;
static std::thread th_;
static std::string env_prefix_;
static std::string curr_dir_;
static std::string app_name_;
static std::string exec_dir_;
static std::string event_file_path_;
static std::string env_prefix_path_;
static bool enable_env_warn_log_;
@ -96,8 +120,9 @@ protected:
static std::string local_ip_;
static int cur_zone_id_;
static int child_pid_;
static int child_pid2_;
static int restart_zone_id_;
static int restart_no_;
static std::vector<int> zone_pids_;
static std::vector<int64_t> rpc_ports_;
static ObServerInfoList server_list_;

View File

@ -37,6 +37,7 @@
{ \
namespace unittest \
{ \
std::string ObMultiReplicaTestBase::TEST_CASE_BASE_NAME = STR_NAME(CUR_TEST_CASE_NAME); \
std::string ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[MAX_ZONE_COUNT] = { \
GET_ZONE_TEST_CLASS_STR(1), GET_ZONE_TEST_CLASS_STR(2), GET_ZONE_TEST_CLASS_STR(3)}; \
\
@ -62,22 +63,61 @@
} \
}
#define RESTART_ZONE_TEST_CASE_CALSS_NAME_INNER(TEST_CASE_NAME, ZONE_ID, RESTART_NO) \
TEST_CASE_NAME##_RESTART_##RESTART_NO##_ZONE##ZONE_ID
#define RESTART_ZONE_TEST_CASE_CALSS_NAME(TEST_CASE_NAME, ZONE_ID, RESTART_NO) \
RESTART_ZONE_TEST_CASE_CALSS_NAME_INNER(TEST_CASE_NAME, ZONE_ID, RESTART_NO)
#define GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, RESTART_NO) \
RESTART_ZONE_TEST_CASE_CALSS_NAME(CUR_TEST_CASE_NAME, ZONE_ID, RESTART_NO)
#define GET_RESTART_ZONE_TEST_CLASS_STR(ZONE_ID, REGISTER_NO) \
STR_NAME(GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO))
#define APPEND_RESTART_TEST_CASE_CLASS(ZONE_ID, REGISTER_NO) \
namespace oceanbase \
{ \
namespace unittest \
{ \
class GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO) : public ObMultiReplicaTestBase \
{ \
public: \
GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO)() : ObMultiReplicaTestBase() {} \
}; \
TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(ZONE_ID, REGISTER_NO), start_observer) {} \
} \
}
#define MULTI_REPLICA_TEST_MAIN_FUNCTION(TEST_DIR_PREFIX) \
int main(int argc, char **argv) \
{ \
int ret = OB_SUCCESS; \
int restart_zone_id = -1; \
int restart_zone_no = 0; \
if (argc < 1) { \
abort(); \
} \
std::string app_name = argv[0]; \
if (argc >= 3) { \
std::string tmp_str; \
tmp_str = argv[1]; \
restart_zone_id = std::stoi(tmp_str); \
tmp_str = argv[2]; \
restart_zone_no = std::stoi(tmp_str); \
} \
char *log_level = (char *)"INFO"; \
oceanbase::unittest::init_log_and_gtest(argc, argv); \
OB_LOGGER.set_log_level(log_level); \
::testing::InitGoogleTest(&argc, argv); \
if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::bootstrap_multi_replica( \
#TEST_DIR_PREFIX))) { \
app_name, restart_zone_id, restart_zone_no, #TEST_DIR_PREFIX))) { \
fprintf(stdout, "init test case failed. ret = %d", ret); \
return ret; \
} \
return RUN_ALL_TESTS(); \
}
// oceanbase::unittest::init_log_and_gtest(argc, argv);
namespace oceanbase
{
namespace unittest

View File

@ -79,24 +79,21 @@ int64_t ObSimpleServerReplica::get_rpc_port(int &server_fd)
return unittest::get_rpc_port(server_fd);
}
ObSimpleServerReplica::ObSimpleServerReplica(const std::string &env_prefix,
const int zone_id,
const int rpc_port,
const string &rs_list,
const ObServerInfoList &server_list,
ObServer &server ,
const std::string &dir_prefix,
const char *log_disk_size,
const char *memory_limit)
: server_(server),
zone_id_(zone_id),
rpc_port_(rpc_port),
rs_list_(rs_list),
server_info_list_(server_list),
data_dir_(dir_prefix),
run_dir_(env_prefix),
log_disk_size_(log_disk_size),
memory_limit_(memory_limit)
ObSimpleServerReplica::ObSimpleServerReplica(const std::string &app_name,
const std::string &env_prefix,
const int zone_id,
const int rpc_port,
const string &rs_list,
const ObServerInfoList &server_list,
bool is_restart,
ObServer &server,
const std::string &dir_prefix,
const char *log_disk_size,
const char *memory_limit)
: server_(server), zone_id_(zone_id), rpc_port_(rpc_port), rs_list_(rs_list),
server_info_list_(server_list), app_name_(app_name), data_dir_(dir_prefix),
run_dir_(env_prefix), log_disk_size_(log_disk_size), memory_limit_(memory_limit),
is_restart_(is_restart)
{
// if (ObSimpleServerReplicaRestartHelper::is_restart_) {
// std::string port_file_name = run_dir_ + std::string("/port.txt");
@ -106,7 +103,7 @@ ObSimpleServerReplica::ObSimpleServerReplica(const std::string &env_prefix,
// }
// fscanf(infile, "%d\n", &rpc_port_);
// } else {
// rpc_port_ = unittest::get_rpc_port(server_fd_);
// rpc_port_ = unittest::get_rpc_port(server_fd_);
// }
mysql_port_ = rpc_port_ + 1;
}
@ -172,16 +169,31 @@ int ObSimpleServerReplica::simple_init()
getpid(), zone_id_, rpc_port_, mysql_port_, zone_str.c_str(), server_info_list_.count(),
rs_list_.c_str());
// 因为改变了工作目录,设置为绝对路径
for (int i = 0; i < MAX_FD_FILE; i++) {
int len = strlen(OB_LOGGER.log_file_[i].filename_);
if (len > 0) {
std::string cur_file_name = OB_LOGGER.log_file_[i].filename_;
cur_file_name = cur_file_name.substr(cur_file_name.find_last_of("/\\") + 1);
std::string ab_file = std::string(curr_dir) + "/" + run_dir_ + "/"
+ std::string(OB_LOGGER.log_file_[i].filename_);
+ cur_file_name;
SERVER_LOG(INFO, "convert ab file", K(ab_file.c_str()));
MEMCPY(OB_LOGGER.log_file_[i].filename_, ab_file.c_str(), ab_file.size());
}
}
// std::string ab_file = std::string(curr_dir) + "/" + run_dir_ + "/" + app_name_;
//
// std::string app_log_name = ab_file + ".log";
// std::string app_rs_log_name = ab_file + "_rs.log";
// std::string app_ele_log_name = ab_file + "_election.log";
// std::string app_trace_log_name = ab_file + "_trace.log";
// OB_LOGGER.set_file_name(app_log_name.c_str(),
// true,
// false,
// app_rs_log_name.c_str(),
// app_ele_log_name.c_str(),
// app_trace_log_name.c_str());
ObPLogWriterCfg log_cfg;
ret = server_.init(opts, log_cfg);
@ -277,7 +289,7 @@ int ObSimpleServerReplica::simple_start()
{
int ret = OB_SUCCESS;
// bootstrap
if (zone_id_ == 1) {
if (zone_id_ == 1 && !is_restart_) {
std::thread th([this]() {
int64_t start_time = ObTimeUtility::current_time();
int ret = OB_SUCCESS;
@ -298,7 +310,7 @@ int ObSimpleServerReplica::simple_start()
SERVER_LOG(INFO, "ObSimpleServerReplica init succ prepare to start...", K(zone_id_), K(rpc_port_),
K(mysql_port_));
ret = server_.start();
if (zone_id_ == 1) {
if (zone_id_ == 1 && !is_restart_) {
th_.join();
fprintf(stdout, "[BOOTSTRAP SUCC] zone_id = %d, rpc_port = %d, mysql_port = %d\n", zone_id_,
rpc_port_, mysql_port_);

View File

@ -33,11 +33,13 @@ public:
static int64_t get_rpc_port(int &server_fd);
public:
ObSimpleServerReplica(const std::string &env_prefix,
ObSimpleServerReplica(const std::string &app_name,
const std::string &env_prefix,
const int zone_id,
const int rpc_port,
const string &rs_list,
const ObServerInfoList &server_list,
bool is_restart,
ObServer &server = ObServer::get_instance(),
const std::string &dir_prefix = "./store",
const char *log_disk_size = "10G",
@ -77,6 +79,7 @@ private:
int mysql_port_;
std::string rs_list_;
ObServerInfoList server_info_list_;
std::string app_name_;
std::string data_dir_;
std::string optstr_;
std::string run_dir_;
@ -91,6 +94,8 @@ private:
int server_fd_;
bool set_bootstrap_warn_log_;
bool is_restart_;
obrpc::ObNetClient bootstrap_client_;
obrpc::ObSrvRpcProxy bootstrap_srv_proxy_;
};

View File

@ -0,0 +1,234 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "env/ob_fast_bootstrap.h"
#include "env/ob_multi_replica_util.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx/ob_trans_part_ctx.h"
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
#define CUR_TEST_CASE_NAME ObMdsReplayFromCtxTable
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
APPEND_RESTART_TEST_CASE_CLASS(2, 1);
MULTI_REPLICA_TEST_MAIN_FUNCTION(test_mds_replay_from_ctx_table_);
namespace oceanbase
{
namespace unittest
{
// using namespace storage;
using namespace oceanbase::storage::checkpoint;
struct RegisterSuccTxArg
{
transaction::ObTransID tx_id_;
share::SCN register_scn1_;
int64_t register_no1_;
share::SCN register_scn2_;
int64_t register_no2_;
TO_STRING_KV(K(tx_id_), K(register_scn1_), K(register_no1_), K(register_scn2_), K(register_no2_));
OB_UNIS_VERSION(1);
};
OB_SERIALIZE_MEMBER(RegisterSuccTxArg,
tx_id_,
register_scn1_,
register_no1_,
register_scn2_,
register_no2_);
RegisterSuccTxArg register_succ_arg;
common::ObMySQLTransaction mysql_trans;
oceanbase::observer::ObInnerSQLConnection *register_mds_conn = nullptr;
void minor_freeze_tx_ctx_memtable(ObLS *ls)
{
TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable begin");
int ret = OB_SUCCESS;
ObCheckpointExecutor *checkpoint_executor = ls->get_checkpoint_executor();
ASSERT_NE(nullptr, checkpoint_executor);
ObTxCtxMemtable *tx_ctx_memtable = dynamic_cast<ObTxCtxMemtable *>(
dynamic_cast<ObLSTxService *>(
checkpoint_executor->handlers_[logservice::TRANS_SERVICE_LOG_BASE_TYPE])
->common_checkpoints_[ObCommonCheckpointType::TX_CTX_MEMTABLE_TYPE]);
ASSERT_EQ(true, tx_ctx_memtable->is_active_memtable());
ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable->flush(share::SCN::max_scn()));
// // TODO(handora.qc): use more graceful wait
// usleep(10 * 1000 * 1000);
// usleep(100 * 1000);
RETRY_UNTIL_TIMEOUT(tx_ctx_memtable->is_active_memtable(), 10 * 1000 * 1000, 100 * 1000);
ASSERT_EQ(OB_SUCCESS, ret);
TRANS_LOG(INFO, "minor_freeze_tx_ctx_memtable end");
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(1), register_mds_without_commit)
{
int ret = OB_SUCCESS;
common::ObMySQLProxy &sys_tenant_proxy = get_curr_simple_server().get_sql_proxy();
ASSERT_EQ(OB_SUCCESS, mysql_trans.start(GCTX.sql_proxy_, 1));
// ACQUIRE_CONN_FROM_SQL_PROXY(sys_conn, sys_tenant_proxy);
// ASSERT_EQ(OB_SUCCESS, mysql_trans.start_transaction(1, false));
register_mds_conn = static_cast<observer::ObInnerSQLConnection *>(mysql_trans.get_connection());
transaction::ObTxDesc *desc = register_mds_conn->get_session().get_tx_desc();
ASSERT_NE(nullptr, desc);
ASSERT_EQ(true, desc->is_valid());
std::string test_register_str1 = "TEST REGISTER MDS 1";
transaction::ObRegisterMdsFlag register_mds_flag1;
register_mds_flag1.need_flush_redo_instantly_ = true;
ASSERT_EQ(OB_SUCCESS, register_mds_flag1.mds_base_scn_.convert_for_tx(1000));
ASSERT_EQ(OB_SUCCESS,
register_mds_conn->register_multi_data_source(
1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS,
test_register_str1.c_str(), test_register_str1.size(), register_mds_flag1));
ObPartTransCtx *tx_ctx = nullptr;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(desc->get_tx_id(), false, tx_ctx));
RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
share::SCN register_1_scn = tx_ctx->exec_info_.max_applying_log_ts_;
std::string test_register_str2 = "TEST REGISTER MDS 2";
transaction::ObRegisterMdsFlag register_mds_flag2;
register_mds_flag2.need_flush_redo_instantly_ = true;
ASSERT_EQ(OB_SUCCESS, register_mds_flag2.mds_base_scn_.convert_for_tx(2000));
ASSERT_EQ(OB_SUCCESS,
register_mds_conn->register_multi_data_source(
1, ObLSID(1), transaction::ObTxDataSourceType::DDL_TRANS,
test_register_str2.c_str(), test_register_str2.size(), register_mds_flag2));
RETRY_UNTIL_TIMEOUT(tx_ctx->busy_cbs_.is_empty(), 5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
share::SCN register_2_scn = tx_ctx->exec_info_.max_applying_log_ts_;
ASSERT_EQ(true, register_2_scn > register_1_scn);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
register_succ_arg.tx_id_ = desc->get_tx_id();
register_succ_arg.register_scn1_ = register_1_scn;
register_succ_arg.register_no1_ = tx_ctx->exec_info_.multi_data_source_[0].get_register_no();
register_succ_arg.register_scn2_ = register_2_scn;
register_succ_arg.register_no2_ = tx_ctx->exec_info_.multi_data_source_[1].get_register_no();
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] register mds into tx and submit mds redo success",
K(ret), K(register_succ_arg), K(tx_ctx->exec_info_.multi_data_source_));
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
std::string tmp_str;
ASSERT_EQ(OB_SUCCESS,
EventArgSerTool<RegisterSuccTxArg>::serialize_arg(register_succ_arg, tmp_str));
ASSERT_EQ(OB_SUCCESS, finish_event("REGISTER_MDS_SUCC", tmp_str));
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), replay_mds_log_normal)
{
int ret = OB_SUCCESS;
std::string tmp_event_val;
ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000));
ASSERT_EQ(OB_SUCCESS,
EventArgSerTool<RegisterSuccTxArg>::deserialize_arg(register_succ_arg, tmp_event_val));
ObPartTransCtx *tx_ctx = nullptr;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx));
RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applied_log_ts_ == register_succ_arg.register_scn2_,
5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
// ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(),
// register_succ_arg.register_no1_);
// ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(),
// register_succ_arg.register_no2_);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
}
TEST_F(GET_ZONE_TEST_CLASS_NAME(2), dump_tx_ctx_table)
{
int ret = OB_SUCCESS;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
{
share::ObTenantSwitchGuard tenant_guard;
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(1));
minor_freeze_tx_ctx_memtable(ls_handle.get_ls());
}
ASSERT_EQ(restart_zone(2, 1), OB_SUCCESS);
}
TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), restart_zone2_from_tx_ctx_table)
{
int ret = OB_SUCCESS;
std::string tmp_event_val;
ASSERT_EQ(OB_SUCCESS, wait_event_finish("REGISTER_MDS_SUCC", tmp_event_val, 30 * 60 * 1000));
ASSERT_EQ(OB_SUCCESS,
EventArgSerTool<RegisterSuccTxArg>::deserialize_arg(register_succ_arg, tmp_event_val));
ObPartTransCtx *tx_ctx = nullptr;
GET_LS(1, 1, ls_handle);
ASSERT_EQ(ls_handle.is_valid(), true);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx));
RETRY_UNTIL_TIMEOUT(tx_ctx->start_recover_ts_ == register_succ_arg.register_scn2_,
5 * 1000 * 1000, 5000);
// tx_ctx->print_trace_log();
// TRANS_LOG(INFO, "after restart, print tx ctx",K(*tx_ctx));
ASSERT_EQ(ret, OB_SUCCESS);
RETRY_UNTIL_TIMEOUT(tx_ctx->exec_info_.max_applying_log_ts_ == register_succ_arg.register_scn2_,
5 * 1000 * 1000, 5000);
ASSERT_EQ(ret, OB_SUCCESS);
TRANS_LOG(INFO, "[ObMultiReplicaTestBase] Print mds in tx ctx after retarted", K(ret),
K(tx_ctx->trans_id_), K(tx_ctx->exec_info_.multi_data_source_));
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_.count(), 2);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[0].get_register_no(),
register_succ_arg.register_no1_);
ASSERT_EQ(tx_ctx->exec_info_.multi_data_source_[1].get_register_no(),
register_succ_arg.register_no2_);
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->revert_tx_ctx(tx_ctx));
}
} // namespace unittest
} // namespace oceanbase

View File

@ -16,9 +16,16 @@
#define private public
#include "env/ob_multi_replica_test_base.h"
#include "env/ob_multi_replica_util.h"
#include "env/ob_fast_bootstrap.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#define CUR_TEST_CASE_NAME ObSimpleMultiReplicaExampleTest
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
APPEND_RESTART_TEST_CASE_CLASS(1, 1)
namespace oceanbase
{
namespace unittest
@ -27,12 +34,6 @@ namespace unittest
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
std::string ObMultiReplicaTestBase::ZONE_TEST_CASE_NAME[MAX_ZONE_COUNT] = {
"ObSimpleMultiReplicaExampleTest_ZONE1", "ObSimpleMultiReplicaExampleTest_ZONE2",
"ObSimpleMultiReplicaExampleTest_ZONE3"};
static const std::string TEST_DIR_PREFIX = "test_multi_replica_basic_";
class TestRunCtx
{
public:
@ -42,25 +43,6 @@ public:
TestRunCtx RunCtx;
class ObSimpleMultiReplicaExampleTest_ZONE1 : public ObMultiReplicaTestBase
{
public:
ObSimpleMultiReplicaExampleTest_ZONE1() : ObMultiReplicaTestBase() {}
};
class ObSimpleMultiReplicaExampleTest_ZONE2 : public ObMultiReplicaTestBase
{
public:
ObSimpleMultiReplicaExampleTest_ZONE2() : ObMultiReplicaTestBase() {}
};
class ObSimpleMultiReplicaExampleTest_ZONE3 : public ObMultiReplicaTestBase
{
public:
ObSimpleMultiReplicaExampleTest_ZONE3() : ObMultiReplicaTestBase() {}
};
TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, observer_start)
{
SERVER_LOG(INFO, "observer_start succ");
@ -78,7 +60,6 @@ TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, add_tenant)
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
}
TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, create_table)
{
int ret = OB_SUCCESS;
@ -123,13 +104,12 @@ TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, create_table)
}
}
TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, delete_tenant)
TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, restart_zone1)
{
ASSERT_EQ(OB_SUCCESS, delete_tenant());
// ASSERT_EQ(OB_SUCCESS, delete_tenant());
ASSERT_EQ(OB_SUCCESS, ObMultiReplicaTestBase::restart_zone(1, 1));
}
TEST_F(ObSimpleMultiReplicaExampleTest_ZONE1, end)
{
RunCtx.time_sec_ = 0;
@ -154,41 +134,61 @@ TEST_F(ObSimpleMultiReplicaExampleTest_ZONE3, end)
}
}
TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(1, 1), restart_zone1)
{
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
int return_code = 0;
int ret = OB_SUCCESS;
int c = 0;
int time_sec = 0;
char *log_level = (char *)"INFO";
while (EOF != (c = getopt(argc, argv, "t:l:"))) {
switch (c) {
case 't':
time_sec = atoi(optarg);
break;
case 'l':
log_level = optarg;
oceanbase::unittest::ObMultiReplicaTestBase::enable_env_warn_log_ = false;
break;
default:
break;
}
}
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level(log_level);
LOG_INFO("main>>>");
oceanbase::unittest::RunCtx.time_sec_ = time_sec;
::testing::InitGoogleTest(&argc, argv);
if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::bootstrap_multi_replica(
oceanbase::unittest::TEST_DIR_PREFIX))) {
fprintf(stdout, "init test case failed. ret = %d", ret);
return ret;
}
return_code = RUN_ALL_TESTS();
return return_code;
}
MULTI_REPLICA_TEST_MAIN_FUNCTION( "test_multi_replica_basic_" );
// int main(int argc, char **argv)
// {
// int return_code = 0;
// int ret = OB_SUCCESS;
// int c = 0;
// int time_sec = 0;
// char *log_level = (char *)"INFO";
// while (EOF != (c = getopt(argc, argv, "t:l:"))) {
// switch (c) {
// case 't':
// time_sec = atoi(optarg);
// break;
// case 'l':
// log_level = optarg;
// oceanbase::unittest::ObMultiReplicaTestBase::enable_env_warn_log_ = false;
// break;
// default:
// break;
// }
// }
// oceanbase::unittest::init_log_and_gtest(argc, argv);
// OB_LOGGER.set_log_level(log_level);
//
// for(int i =0; i < argc; i++)
// {
// printf("MAIN FUNC : argc = %d, i =%d, arv = %s\n", argc, i, argv[i]);
// }
//
// int restart_zone_id = -1;
// int restart_zone_no = 0;
// if(argc >= 3)
// {
// std::string tmp_str;
// tmp_str = argv[1];
// restart_zone_id = std::stoi(tmp_str);
// tmp_str = argv[2];
// restart_zone_no = std::stoi(tmp_str);
// }
// LOG_INFO("main>>>");
// oceanbase::unittest::RunCtx.time_sec_ = time_sec;
// ::testing::InitGoogleTest(&argc, argv);
// if (OB_FAIL(oceanbase::unittest::ObMultiReplicaTestBase::bootstrap_multi_replica(
// restart_zone_id, restart_zone_no, oceanbase::unittest::TEST_DIR_PREFIX))) {
// fprintf(stdout, "init test case failed. ret = %d", ret);
// return ret;
// }
// return_code = RUN_ALL_TESTS();
// return return_code;
// }

View File

@ -42,7 +42,7 @@ namespace transaction
// ObTxBufferNode
//#####################################################
OB_SERIALIZE_MEMBER(ObTxBufferNode, type_, data_);
OB_SERIALIZE_MEMBER(ObTxBufferNode, type_, data_, register_no_);
int ObTxBufferNode::init(const ObTxDataSourceType type,
const ObString &data,
@ -63,6 +63,23 @@ int ObTxBufferNode::init(const ObTxDataSourceType type,
return ret;
}
int ObTxBufferNode::set_mds_register_no(const uint64_t register_no)
{
int ret = OB_SUCCESS;
if (register_no <= 0 || !is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(register_no), KPC(this));
} else if (register_no_ > 0) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "invalid register no", K(ret), K(register_no), KPC(this));
} else {
register_no_ = register_no;
// TRANS_LOG(INFO, "set register no in mds node", K(ret), KPC(this));
}
return ret;
}
void ObTxBufferNode::replace_data(const common::ObString &data)
{
if (nullptr != data_.ptr()) {
@ -158,8 +175,21 @@ int ObMulSourceTxDataNotifier::notify(const ObTxBufferNodeArray &array,
tmp_notify_arg.redo_submitted_ = node.is_submitted();
tmp_notify_arg.redo_synced_ = node.is_synced();
if (i > 0) {
const ObTxBufferNode &prev_node = array.at(i - 1);
if (ObTxBufferNode::is_valid_register_no(prev_node.get_register_no())
&& ObTxBufferNode::is_valid_register_no(node.get_register_no())
&& prev_node.get_register_no() >= node.get_register_no()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected register no for the mds_node", K(ret), K(i), K(array.count()),
K(arg), K(node), K(prev_node));
}
}
OB_ASSERT(node.type_ != ObTxDataSourceType::BEFORE_VERSION_4_1);
if (node.type_ < ObTxDataSourceType::BEFORE_VERSION_4_1
if(OB_FAIL(ret)) {
//do nothing
} else if (node.type_ < ObTxDataSourceType::BEFORE_VERSION_4_1
&& ObTxDataSourceType::CREATE_TABLET_NEW_MDS != node.type_
&& ObTxDataSourceType::DELETE_TABLET_NEW_MDS != node.type_
&& ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != node.type_) {

View File

@ -13,6 +13,7 @@
#ifndef OCEANBASE_TRANSACTION_OB_MULTI_DATA_SOURCE_
#define OCEANBASE_TRANSACTION_OB_MULTI_DATA_SOURCE_
#include "share/ob_cluster_version.h"
#include "lib/container/ob_se_array.h"
#include "lib/list/ob_list.h"
#include "lib/utility/ob_unify_serialize.h"
@ -87,14 +88,20 @@ class ObTxBufferNode
public:
ObTxBufferNode() : type_(ObTxDataSourceType::UNKNOWN), data_() { reset(); }
~ObTxBufferNode() {}
int init(const ObTxDataSourceType type, const common::ObString &data, const share::SCN &base_scn, storage::mds::BufferCtx *ctx);
int init(const ObTxDataSourceType type,
const common::ObString &data,
const share::SCN &base_scn,
storage::mds::BufferCtx *ctx);
bool is_valid() const
{
return type_ > ObTxDataSourceType::UNKNOWN && type_ < ObTxDataSourceType::MAX_TYPE
&& data_.length() > 0;
bool valid_member = false;
valid_member = type_ > ObTxDataSourceType::UNKNOWN && type_ < ObTxDataSourceType::MAX_TYPE
&& data_.length() > 0;
return valid_member;
}
void reset()
{
register_no_ = 0;
type_ = ObTxDataSourceType::UNKNOWN;
data_.reset();
has_submitted_ = false;
@ -102,6 +109,10 @@ public:
mds_base_scn_.reset();
}
static bool is_valid_register_no(const int64_t register_no) { return register_no > 0; }
int set_mds_register_no(const uint64_t register_no);
uint64_t get_register_no() const { return register_no_; }
// only for some mds types of CDC
// can not be used by observer functions
bool allow_to_use_mds_big_segment() { return type_ == ObTxDataSourceType::DDL_TRANS; }
@ -121,7 +132,7 @@ public:
const share::SCN &get_base_scn() { return mds_base_scn_; }
bool operator==(const ObTxBufferNode & buffer_node) const;
bool operator==(const ObTxBufferNode &buffer_node) const;
void log_sync_fail()
{
@ -129,8 +140,10 @@ public:
has_synced_ = false;
}
storage::mds::BufferCtxNode &get_buffer_ctx_node() const { return buffer_ctx_node_; }
TO_STRING_KV(K(has_submitted_), K(has_synced_), K_(type), K(data_.length()));
TO_STRING_KV(K(register_no_), K(has_submitted_), K(has_synced_), K_(type), K(data_.length()));
private:
uint64_t register_no_;
bool has_submitted_;
bool has_synced_;
share::SCN mds_base_scn_;

View File

@ -6045,34 +6045,34 @@ int ObPartTransCtx::gen_total_mds_array_(ObTxBufferNodeArray &mds_array) const
}
int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
ObTxBufferNodeArray &incremental_array,
bool need_replace)
ObTxBufferNodeArray &incremental_array,
bool need_replace)
{
auto process_with_buffer_ctx = [this](const ObTxBufferNode &old_node,
mds::BufferCtx *&new_ctx) -> int {
int ret = OB_SUCCESS;
if (old_node.get_data_source_type() <= ObTxDataSourceType::UNKNOWN ||
old_node.get_data_source_type() >= ObTxDataSourceType::MAX_TYPE) {
if (old_node.get_data_source_type() <= ObTxDataSourceType::UNKNOWN
|| old_node.get_data_source_type() >= ObTxDataSourceType::MAX_TYPE) {
ret = OB_ERR_UNDEFINED;
TRANS_LOG(ERROR, "unexpected mds type", KR(ret), K(*this));
} else if (old_node.get_data_source_type() <= ObTxDataSourceType::BEFORE_VERSION_4_1
&& ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) {
&& ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) {
TRANS_LOG(INFO, "old mds type, no need process with buffer ctx",
K(old_node.get_data_source_type()), K(*this));
K(old_node.get_data_source_type()), K(*this));
} else {
if (OB_ISNULL(old_node.get_buffer_ctx_node().get_ctx())) {// this is replay path, create ctx
if (OB_FAIL(mds::MdsFactory::create_buffer_ctx(old_node.get_data_source_type(),
trans_id_,
if (OB_ISNULL(old_node.get_buffer_ctx_node().get_ctx())) { // this is replay path, create ctx
if (OB_FAIL(mds::MdsFactory::create_buffer_ctx(old_node.get_data_source_type(), trans_id_,
new_ctx))) {
TRANS_LOG(WARN, "fail to create buffer ctx", KR(ret), KPC(new_ctx), K(*this), K(old_node));
TRANS_LOG(WARN, "fail to create buffer ctx", KR(ret), KPC(new_ctx), K(*this),
K(old_node));
}
} else {// this is recover path, copy ctx
if (OB_FAIL(mds::MdsFactory::deep_copy_buffer_ctx(trans_id_,
*(old_node.buffer_ctx_node_.get_ctx()),
new_ctx))) {
TRANS_LOG(WARN, "fail to deep copy buffer ctx", KR(ret), KPC(new_ctx), K(*this), K(old_node));
} else { // this is recover path, copy ctx
if (OB_FAIL(mds::MdsFactory::deep_copy_buffer_ctx(
trans_id_, *(old_node.buffer_ctx_node_.get_ctx()), new_ctx))) {
TRANS_LOG(WARN, "fail to deep copy buffer ctx", KR(ret), KPC(new_ctx), K(*this),
K(old_node));
}
}
}
@ -6114,20 +6114,26 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
mds::BufferCtx *new_ctx = nullptr;
if (OB_FAIL(process_with_buffer_ctx(node, new_ctx))) {
TRANS_LOG(WARN, "process_with_buffer_ctx failed", KR(ret), K(*this));
} else if (OB_FAIL(new_node.init(node.get_data_source_type(),
data,
node.mds_base_scn_,
} else if (OB_FAIL(new_node.init(node.get_data_source_type(), data, node.mds_base_scn_,
new_ctx))) {
mtl_free(data.ptr());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService*)->get_buffer_ctx_allocator().free(new_ctx);
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
}
TRANS_LOG(WARN, "init new node failed", KR(ret), K(*this));
} else if (ObTxBufferNode::is_valid_register_no(node.get_register_no())
&& OB_FAIL(new_node.set_mds_register_no(node.get_register_no()))) {
mtl_free(data.ptr());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
}
TRANS_LOG(WARN, "set mds register_no failed", KR(ret), K(*this));
} else if (OB_FAIL(tmp_buf_arr.push_back(new_node))) {
mtl_free(data.ptr());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService*)->get_buffer_ctx_allocator().free(new_ctx);
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
}
TRANS_LOG(WARN, "push multi source data failed", KR(ret), K(*this));
@ -6161,15 +6167,55 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_buf_arr.count(); ++i) {
const int64_t tmp_buf_array_cnt = tmp_buf_arr.count();
const int64_t ctx_mds_array_cnt = exec_info_.multi_data_source_.count();
int64_t max_register_no_in_ctx = 0;
if (exec_info_.multi_data_source_.count() > 0) {
max_register_no_in_ctx =
exec_info_.multi_data_source_[ctx_mds_array_cnt - 1].get_register_no();
}
int64_t ctx_array_start_index = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_buf_array_cnt; ++i) {
if (is_follower_()) {
tmp_buf_arr[i].set_submitted();
tmp_buf_arr[i].set_synced();
}
if (OB_FAIL(exec_info_.multi_data_source_.push_back(tmp_buf_arr[i]))) {
TRANS_LOG(WARN, "push back exec_info_.multi_data_source_ failed", K(ret));
} else if (OB_FAIL(incremental_array.push_back(tmp_buf_arr[i]))) {
TRANS_LOG(WARN, "push back incremental_array failed", K(ret));
if (ObTxBufferNode::is_valid_register_no(max_register_no_in_ctx)
&& ObTxBufferNode::is_valid_register_no(tmp_buf_arr[i].get_register_no())
&& tmp_buf_arr[i].get_register_no() <= max_register_no_in_ctx) {
while ((!ObTxBufferNode::is_valid_register_no(
exec_info_.multi_data_source_[ctx_array_start_index].get_register_no())
|| tmp_buf_arr[i].get_register_no()
> exec_info_.multi_data_source_[ctx_array_start_index].get_register_no())
&& ctx_array_start_index < ctx_mds_array_cnt) {
ctx_array_start_index++;
}
if (tmp_buf_arr[i].get_register_no()
== exec_info_.multi_data_source_[ctx_array_start_index].get_register_no()) {
mtl_free(tmp_buf_arr[i].data_.ptr());
tmp_buf_arr[i].buffer_ctx_node_.destroy_ctx();
if (OB_FAIL(incremental_array.push_back(
exec_info_.multi_data_source_[ctx_array_start_index]))) {
TRANS_LOG(WARN, "push back incremental_array failed", K(ret));
}
TRANS_LOG(INFO, "filter mds node replay by the register_no", K(ret), K(trans_id_),
K(ls_id_), K(i), K(ctx_array_start_index), K(tmp_buf_arr[i].get_register_no()),
K(exec_info_.multi_data_source_[ctx_array_start_index]));
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "we can not find a mds node in ctx with the same register_no", K(ret),
K(i), K(ctx_array_start_index), K(tmp_buf_arr[i].get_register_no()),
K(exec_info_.multi_data_source_[ctx_array_start_index]), KPC(this));
}
} else {
if (OB_FAIL(exec_info_.multi_data_source_.push_back(tmp_buf_arr[i]))) {
TRANS_LOG(WARN, "push back exec_info_.multi_data_source_ failed", K(ret));
} else if (OB_FAIL(incremental_array.push_back(tmp_buf_arr[i]))) {
TRANS_LOG(WARN, "push back incremental_array failed", K(ret));
}
}
}
}
@ -6550,6 +6596,8 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou
ret = OB_LOG_TOO_LARGE;
TRANS_LOG(WARN, "too large mds buf node", K(ret), K(tmp_array.get_serialize_size()));
//#endif
} else if (OB_FAIL(mds_cache_.try_recover_max_register_no(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "recover max register no failed", K(ret), K(mds_cache_), KPC(this));
} else if (OB_FAIL(mds_cache_.insert_mds_node(node))) {
TRANS_LOG(WARN, "register multi source data failed", KR(ret), K(data_source_type),
K(*this));

View File

@ -24,6 +24,7 @@ void ObTxMDSCache::reset()
mds_list_.reset();
submitted_iterator_ = mds_list_.end(); // ObTxBufferNodeList::iterator();
need_retry_submit_mds_ = false;
max_register_no_ = 0;
}
void ObTxMDSCache::destroy()
@ -39,13 +40,31 @@ void ObTxMDSCache::destroy()
}
}
int ObTxMDSCache::insert_mds_node(const ObTxBufferNode &buf_node)
int ObTxMDSCache::try_recover_max_register_no(const ObTxBufferNodeArray &node_array)
{
int ret = OB_SUCCESS;
int64_t array_cnt = node_array.count();
if (array_cnt > 0) {
int64_t max_register_no = node_array[array_cnt - 1].get_register_no();
if (max_register_no > max_register_no_) {
max_register_no_ = max_register_no;
}
}
return ret;
}
int ObTxMDSCache::insert_mds_node(ObTxBufferNode &buf_node)
{
int ret = OB_SUCCESS;
if (!buf_node.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "insert MDS buf node", K(ret));
} else if (OB_FALSE_IT(max_register_no_++)) {
//do nothing
} else if (OB_FAIL(buf_node.set_mds_register_no(max_register_no_))) {
TRANS_LOG(WARN, "set mds register no failed", K(ret), K(buf_node), KPC(this));
} else if (OB_FAIL(mds_list_.push_back(buf_node))) {
TRANS_LOG(WARN, "push back MDS buf node", K(ret));
} else {
@ -63,6 +82,7 @@ int ObTxMDSCache::rollback_last_mds_node()
if (OB_FAIL(mds_list_.pop_back())) {
TRANS_LOG(WARN, "pop back last node failed", K(ret));
} else {
TRANS_LOG(INFO, "rollback the last mds node", K(ret), K(buf_node), KPC(this));
share::mtl_free(buf_node.get_ptr());
buf_node.get_buffer_ctx_node().destroy_ctx();
}
@ -309,7 +329,18 @@ int ObTxMDSRange::move_from_cache_to_arr(ObTxMDSCache &mds_cache,
TRANS_LOG(WARN, "empty range in move function", K(ret), KPC(tx_ctx_));
} else {
for (int64_t i = 0; i < range_array_.count() && OB_SUCC(ret); i++) {
if (OB_FAIL(mds_cache.earse_from_cache(range_array_[i]))) {
if (!ObTxBufferNode::is_valid_register_no(range_array_[i].get_register_no())) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(ERROR, "invalid register no for a mds node in cache", K(ret), K(i),
K(range_array_[i]));
} else if (!mds_durable_arr.empty()
&& ObTxBufferNode::is_valid_register_no(
mds_durable_arr[mds_durable_arr.count() - 1].get_register_no())
&& range_array_[i].get_register_no()
<= mds_durable_arr[mds_durable_arr.count() - 1].get_register_no()) {
TRANS_LOG(ERROR, "invalid smaller register no", K(ret), K(i), K(range_array_[i]),
K(mds_durable_arr[mds_durable_arr.count() - 1]));
} else if (OB_FAIL(mds_cache.earse_from_cache(range_array_[i]))) {
TRANS_LOG(WARN, "earse from mds cache failed", K(ret), K(range_array_[i]), K(mds_cache),
K(mds_durable_arr));
} else if (OB_FALSE_IT(range_array_[i].set_synced())) {
@ -324,70 +355,6 @@ int ObTxMDSRange::move_from_cache_to_arr(ObTxMDSCache &mds_cache,
return ret;
}
// int ObTxMDSRange::move_to(ObTxBufferNodeArray &tx_buffer_node_arr)
// {
// int ret = OB_SUCCESS;
//
// ObTxBufferNodeList::iterator del_iterator, next_iterator;
//
// if (OB_ISNULL(list_ptr_)) {
// ret = OB_NOT_INIT;
// TRANS_LOG(WARN, "MDS range is not init", K(ret));
// } else if (start_iter_ == list_ptr_->end() || 0 == count_) {
// // empty MDS range
// TRANS_LOG(WARN, "use empty mds range when move", K(this), K(lbt()));
// } else {
// int64_t i = 0;
// del_iterator = list_ptr_->end();
// next_iterator = start_iter_;
//
// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end(); i++) {
// del_iterator = next_iterator;
// next_iterator++;
//
// if (!del_iterator->is_submitted()) {
// ret = OB_ERR_UNEXPECTED;
// TRANS_LOG(WARN, "try to move unsubmitted MDS node", K(ret));
// } else if (OB_FALSE_IT(del_iterator->set_synced())) {
// TRANS_LOG(WARN, "set synced MDS node failed", K(*del_iterator));
// } else if (OB_FAIL(tx_buffer_node_arr.push_back(*del_iterator))) {
// TRANS_LOG(WARN, "push back MDS node failed", K(ret));
// } else if (OB_FAIL(list_ptr_->erase(del_iterator))) {
// TRANS_LOG(WARN, "earse from MDS list failed", K(ret));
// }
// }
// }
//
// return ret;
// }
//
// int ObTxMDSRange::copy_to(ObTxBufferNodeArray &tx_buffer_node_arr) const
// {
// int ret = OB_SUCCESS;
//
// ObTxBufferNodeList::iterator next_iterator;
//
// if (OB_ISNULL(list_ptr_)) {
// ret = OB_NOT_INIT;
// TRANS_LOG(WARN, "MDS range is not init", K(ret));
// } else if (start_iter_ == list_ptr_->end() || 0 == count_) {
// // empty MDS range
// TRANS_LOG(WARN, "use empty mds range when copy", K(this), K(lbt()));
// } else {
// int64_t i = 0;
// next_iterator = start_iter_;
//
// for (i = 0; i < count_ && OB_SUCC(ret) && next_iterator != list_ptr_->end();
// i++, next_iterator++) {
// if (OB_FAIL(tx_buffer_node_arr.push_back(*next_iterator))) {
// TRANS_LOG(WARN, "push back MDS node failed", K(ret));
// }
// }
// }
//
// return ret;
// }
//
int ObTxMDSRange::range_submitted(ObTxMDSCache &cache)
{
int ret = OB_SUCCESS;

View File

@ -33,7 +33,8 @@ public:
void reset();
void destroy();
int insert_mds_node(const ObTxBufferNode &buf_node);
int try_recover_max_register_no(const ObTxBufferNodeArray & node_array);
int insert_mds_node(ObTxBufferNode &buf_node);
int rollback_last_mds_node();
int fill_mds_log(ObPartTransCtx *ctx,
ObTxMultiDataSourceLog &mds_log,
@ -61,10 +62,11 @@ public:
void set_need_retry_submit_mds(bool need_retry) { need_retry_submit_mds_ = need_retry; };
bool need_retry_submit_mds() { return need_retry_submit_mds_; }
TO_STRING_KV(K(unsubmitted_size_), K(mds_list_.size()));
TO_STRING_KV(K(unsubmitted_size_), K(mds_list_.size()), K(max_register_no_));
private:
// TransModulePageAllocator allocator_;
uint64_t max_register_no_;
bool need_retry_submit_mds_;
int64_t unsubmitted_size_;
ObTxBufferNodeList mds_list_;