[PALF] fix get_log_barrier bug add additional mittest cases

This commit is contained in:
BinChenn
2023-05-26 09:41:11 +00:00
committed by ob-robot
parent 4bea8f35ec
commit 0a0a5671ba
13 changed files with 1084 additions and 49 deletions

View File

@ -2,7 +2,8 @@ set(OBLOGSVR_TEST_SRCS
env/ob_simple_log_server.cpp
env/ob_simple_arb_server.cpp
env/ob_simple_log_cluster_testbase.cpp
env/ob_simple_log_cluster_env.cpp)
env/ob_simple_log_cluster_env.cpp
env/mock_election.cpp)
add_library(oblogsvr_test ${OBLOGSVR_TEST_SRCS})
target_include_directories(oblogsvr_test PUBLIC
@ -33,6 +34,8 @@ ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_inte
ob_unittest_clog(test_ob_simple_log_throttling test_ob_simple_log_throttling.cpp)
ob_unittest_clog(test_ob_simple_log_throttling_member_change test_ob_simple_log_throttling_member_change.cpp)
ob_unittest_clog(test_ob_simple_log_throttling_arb test_ob_simple_log_throttling_arb.cpp)
ob_unittest_clog(test_ob_simple_log_config_change_mock_ele test_ob_simple_log_config_change_mock_ele.cpp)
ob_unittest_clog(test_ob_simple_log_arb_mock_ele test_ob_simple_log_arb_mock_ele.cpp)
add_subdirectory(archiveservice)

170
mittest/logservice/env/mock_election.cpp vendored Normal file
View File

@ -0,0 +1,170 @@
/**
* Copyright (c)2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "mock_election.h"
namespace oceanbase
{
using namespace palf::election;
namespace unittest
{
MockElection::MockElection()
: id_(0),
self_(),
role_(common::ObRole::FOLLOWER),
epoch_(0),
leader_(),
is_inited_(false) { }
MockElection::MockElection(const int64_t id, const common::ObAddr &self)
: id_(id),
self_(self),
role_(common::ObRole::FOLLOWER),
epoch_(0),
leader_(),
is_inited_(true) { }
void MockElection::stop()
{
}
int MockElection::init(const int64_t id, const common::ObAddr &self)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
} else if (id <= 0 || false == self.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else {
id_ = id;
self_ = self;
is_inited_ = true;
SERVER_LOG(INFO, "MockElection::init success", K(id), K(self));
}
return ret;
}
int MockElection::set_memberlist(const MemberList &new_member_list)
{
int ret = OB_SUCCESS;
UNUSED(new_member_list);
return ret;
}
int MockElection::get_role(common::ObRole &role, int64_t &epoch)const
{
int ret = OB_SUCCESS;
role = role_;
epoch = (common::ObRole::LEADER == role_)? epoch_: 0;
return ret;
}
int MockElection::get_current_leader_likely(common::ObAddr &addr, int64_t &cur_leader_epoch) const
{
int ret = OB_SUCCESS;
addr = leader_;
cur_leader_epoch = epoch_;
return ret;
}
int MockElection::change_leader_to(const common::ObAddr &dest_addr)
{
int ret = OB_SUCCESS;
UNUSED(dest_addr);
return ret;
}
int MockElection::revoke(const RoleChangeReason &reason)
{
int ret = OB_SUCCESS;
UNUSED(reason);
return ret;
}
const common::ObAddr &MockElection::get_self_addr() const
{
return self_;
}
int64_t MockElection::to_string(char *buf, const int64_t buf_len)const
{
UNUSEDx(buf, buf_len);
return 0;
}
int MockElection::set_priority(ElectionPriority *priority)
{
int ret = OB_SUCCESS;
UNUSED(priority);
return ret;
}
int MockElection::reset_priority()
{
int ret = OB_SUCCESS;
return ret;
}
// 处理消息
int MockElection::handle_message(const ElectionPrepareRequestMsg &msg)
{
int ret = OB_SUCCESS;
UNUSED(msg);
return ret;
}
int MockElection::handle_message(const ElectionAcceptRequestMsg &msg)
{
int ret = OB_SUCCESS;
UNUSED(msg);
return ret;
}
int MockElection::handle_message(const ElectionPrepareResponseMsg &msg)
{
int ret = OB_SUCCESS;
UNUSED(msg);
return ret;
}
int MockElection::handle_message(const ElectionAcceptResponseMsg &msg)
{
int ret = OB_SUCCESS;
UNUSED(msg);
return ret;
}
int MockElection::handle_message(const ElectionChangeLeaderMsg &msg)
{
int ret = OB_SUCCESS;
UNUSED(msg);
return ret;
}
int MockElection::set_leader(const common::ObAddr &leader, const int64_t new_epoch = 0)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (0 > new_epoch) {
ret = OB_INVALID_ARGUMENT;
} else {
role_ = (self_ == leader)? common::ObRole::LEADER: common::ObRole::FOLLOWER;
epoch_ = (0 == new_epoch)? ++epoch_: new_epoch;
leader_ = leader;
}
SERVER_LOG(INFO, "MockElection::set_leader success", K(ret), K_(self), K_(id), K(leader), K(new_epoch),
K(role_), K(epoch_), K(leader_));
return ret;
}
}// unittest
}// oceanbase

61
mittest/logservice/env/mock_election.h vendored Normal file
View File

@ -0,0 +1,61 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "logservice/palf/election/interface/election.h"
#include "logservice/palf/palf_handle_impl.h"
namespace oceanbase
{
using namespace palf::election;
namespace unittest
{
class MockElection : public Election, public common::LinkHashValue<palf::LSKey>
{
public:
MockElection();
MockElection(const int64_t id, const common::ObAddr &self);
virtual ~MockElection() { }
int init(const int64_t id, const common::ObAddr &self);
void stop() override final;
// 设置成员列表
int set_memberlist(const MemberList &new_member_list) override final;
// 获取选举当前的角色
int get_role(common::ObRole &role, int64_t &epoch) const override final;
// 如果自己是leader,那么拿到的就是准确的leader,如果自己不是leader,那么拿到lease的owner
int get_current_leader_likely(common::ObAddr &addr,
int64_t &cur_leader_epoch) const override final;
// 供role change service使用
int change_leader_to(const common::ObAddr &dest_addr) override final;
int revoke(const RoleChangeReason &reason) override final;
// 拿本机地址
const common::ObAddr &get_self_addr() const override final;
// 打印日志
int64_t to_string(char *buf, const int64_t buf_len) const override final;
// 设置选举优先级
int set_priority(ElectionPriority *priority) override final;
int reset_priority() override final;
// 处理消息
int handle_message(const ElectionPrepareRequestMsg &msg) override final;
int handle_message(const ElectionAcceptRequestMsg &msg) override final;
int handle_message(const ElectionPrepareResponseMsg &msg) override final;
int handle_message(const ElectionAcceptResponseMsg &msg) override final;
int handle_message(const ElectionChangeLeaderMsg &msg) override final;
int set_leader(const common::ObAddr &leader, const int64_t new_epoch);
private:
int64_t id_;
common::ObAddr self_;
common::ObRole role_;
int64_t epoch_;
common::ObAddr leader_;
bool is_inited_;
};
}// unittest
}// oceanbase

View File

@ -82,6 +82,7 @@ int ObSimpleArbServer::simple_init(const std::string &cluster_name,
arb_opts.self_ = addr;
arb_opts.negotiation_enable_ = 0;
arb_opts.mittest_ = true;
ObMemAttr ele_attr(1, ObNewModIds::OB_ELECTION);
int ret = OB_SUCCESS;
tenant_base_ = OB_NEW(ObTenantBase, "TestBase", node_id);
auto malloc = ObMallocAllocator::get_instance();
@ -117,6 +118,8 @@ int ObSimpleArbServer::simple_init(const std::string &cluster_name,
} else if (OB_FAIL(palf_env_mgr_.init(logserver_dir.c_str(), addr,
srv_network_frame_.get_req_transport()))) {
CLOG_LOG(WARN, "PalfEnvLiteMgr init failed", K(ret), K(addr), K(clog_dir.c_str()));
} else if (OB_FAIL(mock_election_map_.init(ele_attr))) {
SERVER_LOG(ERROR, "mock_election_map_ init fail", K(ret));
} else if (OB_FAIL(timer_.init(lib::TGDefIDs::ArbServerTimer, &palf_env_mgr_))) {
CLOG_LOG(WARN, "timer init failed", K(ret), K(addr), K(clog_dir.c_str()));
} else {

View File

@ -117,6 +117,54 @@ public:
{blacklist_.reset_rpc_loss(src);}
bool is_arb_server() const override final {return true;}
int64_t get_node_id() {return node_id_;}
int create_mock_election(const int64_t palf_id, MockElection *&mock_election) override final
{
int ret = OB_SUCCESS;
mock_election = NULL;
void *buf = NULL;
ObMemAttr attr(1, ObNewModIds::OB_ELECTION);
if (OB_ISNULL(buf = ob_malloc(sizeof(MockElection), attr))) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(ERROR, "ob_malloc failed", K(palf_id));
} else if (FALSE_IT(mock_election = new (buf) MockElection)) {
} else if (OB_FAIL(mock_election->init(palf_id, self_))) {
SERVER_LOG(WARN, "mock_election->init failed", K(palf_id), K_(self));
} else if (OB_FAIL(mock_election_map_.insert_and_get(palf::LSKey(palf_id), mock_election))) {
SERVER_LOG(WARN, "create_mock_election failed", K(palf_id));
} else {
SERVER_LOG(INFO, "create_mock_election success", K(palf_id), K_(self), KP(mock_election));
}
if (OB_FAIL(ret) && NULL != mock_election) {
ob_free(mock_election);
mock_election = NULL;
}
return ret;
}
int remove_mock_election(const int64_t palf_id) override final
{
int ret = OB_SUCCESS;
if (OB_FAIL(mock_election_map_.del(palf::LSKey(palf_id))) && OB_ENTRY_NOT_EXIST != ret) {
SERVER_LOG(WARN, "del failed", K(palf_id));
} else {
ret = OB_SUCCESS;
SERVER_LOG(INFO, "remove_mock_election success", K(palf_id), K_(self));
}
return ret;
}
int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0)
{
int ret = OB_SUCCESS;
MockElection *mock_election= NULL;
if (OB_FAIL(mock_election_map_.get(palf::LSKey(palf_id), mock_election))) {
SERVER_LOG(WARN, "get failed", K(palf_id));
} else if (OB_FAIL(mock_election->set_leader(leader, new_epoch))) {
SERVER_LOG(WARN, "set_leader failed", K(palf_id), KP(mock_election), K(leader), K(new_epoch));
}
if (OB_NOT_NULL(mock_election)) {
mock_election_map_.revert(mock_election);
}
return ret;
}
public:
int simple_init(const std::string &cluster_name,
const common::ObAddr &addr,
@ -143,6 +191,7 @@ private:
int64_t node_id_;
ObMittestBlacklist blacklist_;
ObFunction<bool(const ObAddr &src)> filter_;
MockElectionMap mock_election_map_;
bool is_inited_;
};
}

View File

@ -212,6 +212,7 @@ int ObSimpleLogClusterTestEnv::delete_paxos_group(const int64_t id)
CLOG_LOG(WARN, "remove_ls failed", K(ret));
break;
}
svr->remove_mock_election(id);
svr->revert_palf_env(palf_env);
}
return ret;
@ -220,14 +221,16 @@ int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, int64_t &lea
{
PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
return create_paxos_group(id, palf_base_info, NULL, leader_idx, leader);
const bool with_mock_election = false;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, palf::PalfLocationCacheCb *loc_cb, int64_t &leader_idx, PalfHandleImplGuard &leader)
{
PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
return create_paxos_group(id, palf_base_info, loc_cb, leader_idx, leader);
const bool with_mock_election = false;
return create_paxos_group(id, palf_base_info, loc_cb, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const share::SCN &create_scn, int64_t &leader_idx, PalfHandleImplGuard &leader)
@ -236,7 +239,8 @@ int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const share:
PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
palf_base_info.prev_log_info_.scn_ = create_scn;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, leader);
const bool with_mock_election = false;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const LSN &lsn, int64_t &leader_idx, PalfHandleImplGuard &leader)
@ -245,20 +249,28 @@ int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const LSN &l
PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
palf_base_info.curr_lsn_ = lsn;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, leader);
const bool with_mock_election = false;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const PalfBaseInfo &palf_base_info, int64_t &leader_idx, PalfHandleImplGuard &leader)
{
return create_paxos_group(id, palf_base_info, NULL, leader_idx, leader);
const bool with_mock_election = false;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const PalfBaseInfo &palf_base_info, palf::PalfLocationCacheCb *loc_cb, int64_t &leader_idx, PalfHandleImplGuard &leader)
int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id,
const PalfBaseInfo &palf_base_info,
palf::PalfLocationCacheCb *loc_cb,
int64_t &leader_idx,
const bool with_mock_election,
PalfHandleImplGuard &leader)
{
int ret = OB_SUCCESS;
for (auto svr : get_cluster()) {
ObTenantEnv::set_tenant(svr->get_tenant_base());
IPalfHandleImpl* handle = NULL;
MockElection *mock_election = NULL;
share::ObLSID sid(id);
if (svr->get_palf_env() == NULL) {
ret = OB_ERR_UNEXPECTED;
@ -267,6 +279,20 @@ int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const PalfBa
CLOG_LOG(WARN, "create_palf_handle_impl failed", K(ret), K(id), KPC(svr));
break;
} else {
if (with_mock_election) {
if (OB_FAIL(svr->create_mock_election(id, mock_election))) {
CLOG_LOG(WARN, "create_mock_election failed", K(ret), K(id), KPC(svr));
break;
} else {
common::ObAddr leader_addr;
const ObMemberList &member_list = get_member_list();
member_list.get_server_by_index(0, leader_addr);
PalfHandleImpl *palf_handle_impl = dynamic_cast<PalfHandleImpl*>(handle);
mock_election->set_leader(leader_addr, 1);
palf_handle_impl->state_mgr_.election_ = mock_election;
palf_handle_impl->config_mgr_.election_ = mock_election;
}
}
handle->set_location_cache_cb(loc_cb);
const ObMemberList &member_list = get_member_list();
GlobalLearnerList learner_list;
@ -285,13 +311,35 @@ int ObSimpleLogClusterTestEnv::create_paxos_group(const int64_t id, const PalfBa
return ret;
}
int ObSimpleLogClusterTestEnv::create_paxos_group_with_mock_election(
const int64_t id,
int64_t &leader_idx,
PalfHandleImplGuard &leader)
{
PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
const bool with_mock_election = true;
return create_paxos_group(id, palf_base_info, NULL, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb(
const int64_t id,
int64_t &arb_replica_idx,
int64_t &leader_idx,
PalfHandleImplGuard &leader)
{
return create_paxos_group_with_arb(id, NULL, arb_replica_idx, leader_idx, leader);
const bool with_mock_election = false;
return create_paxos_group_with_arb(id, NULL, arb_replica_idx, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb_mock_election(
const int64_t id,
int64_t &arb_replica_idx,
int64_t &leader_idx,
PalfHandleImplGuard &leader)
{
const bool with_mock_election = true;
return create_paxos_group_with_arb(id, NULL, arb_replica_idx, leader_idx, with_mock_election, leader);
}
int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb(
@ -299,6 +347,7 @@ int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb(
palf::PalfLocationCacheCb *loc_cb,
int64_t &arb_replica_idx,
int64_t &leader_idx,
const bool with_mock_election,
PalfHandleImplGuard &leader)
{
int ret = OB_SUCCESS;
@ -318,43 +367,60 @@ int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb(
if (-1 == arb_replica_idx) {
ret = OB_NOT_SUPPORTED;
PALF_LOG(ERROR, "there is not any arb server");
} else if (OB_FAIL(member_list.get_member_by_index(arb_replica_idx, arb_replica))) {
PALF_LOG(ERROR, "get_member_by_index failed", K(ret), K(arb_replica_idx));
} else {
if (OB_FAIL(member_list.get_member_by_index(arb_replica_idx, arb_replica))) {
PALF_LOG(ERROR, "get_member_by_index failed", K(ret), K(arb_replica_idx));
} else {
member_list.remove_member(arb_replica);
for (auto svr : get_cluster()) {
ObTenantEnv::set_tenant(svr->get_tenant_base());
IPalfHandleImpl *handle = NULL;
share::ObLSID sid(id);
if (svr->get_palf_env() == NULL) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "svr is null", KPC(svr));
break;
} else if (OB_FAIL(svr->get_palf_env()->create_palf_handle_impl(id, palf::AccessMode::APPEND, palf_base_info, handle))) {
CLOG_LOG(WARN, "create_palf_handle_impl failed", K(ret), K(id), KPC(svr));
} else if (!svr->is_arb_server() && OB_FAIL(handle->set_initial_member_list(member_list, arb_replica, get_member_cnt()-1, learner_list))) {
CLOG_LOG(ERROR, "set_initial_member_list failed", K(ret), K(id), KPC(svr));
} else {
handle->set_location_cache_cb(loc_cb);
handle->set_paxos_member_region_map(get_member_region_map());
CLOG_LOG(INFO, "set_initial_member_list success", K(member_list));
}
if (NULL == svr->get_palf_env()) {
ret = OB_ERR_UNEXPECTED;
ObSimpleLogServer *log_p = dynamic_cast<ObSimpleLogServer*>(svr);
ObSimpleArbServer *arb_p = dynamic_cast<ObSimpleArbServer*>(svr);
CLOG_LOG(ERROR, "palf env is nullptr", K(ret), K(svr->is_arb_server()), KPC(svr), KP(log_p), K(arb_p), KPC(log_p), KPC(arb_p));
} else {
}
if (NULL != handle) {
svr->get_palf_env()->revert_palf_handle_impl(handle);
member_list.remove_member(arb_replica);
for (auto svr : get_cluster()) {
ObTenantEnv::set_tenant(svr->get_tenant_base());
IPalfHandleImpl *handle = NULL;
MockElection *mock_election = NULL;
share::ObLSID sid(id);
if (svr->get_palf_env() == NULL) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "svr is null", KPC(svr));
break;
} else if (OB_FAIL(svr->get_palf_env()->create_palf_handle_impl(id, palf::AccessMode::APPEND, palf_base_info, handle))) {
CLOG_LOG(WARN, "create_palf_handle_impl failed", K(ret), K(id), KPC(svr));
} else if (!svr->is_arb_server() && OB_FAIL(handle->set_initial_member_list(member_list, arb_replica, get_member_cnt()-1, learner_list))) {
CLOG_LOG(ERROR, "set_initial_member_list failed", K(ret), K(id), KPC(svr));
} else {
common::ObAddr leader_addr;
member_list.get_server_by_index(0, leader_addr);
if (with_mock_election) {
if (OB_FAIL(svr->create_mock_election(id, mock_election))) {
CLOG_LOG(WARN, "create_mock_election failed", K(ret), K(id), KPC(svr));
break;
} else if (false == svr->is_arb_server()) {
PalfHandleImpl *palf_handle_impl = dynamic_cast<PalfHandleImpl*>(handle);
mock_election->set_leader(leader_addr, 1);
palf_handle_impl->state_mgr_.election_ = mock_election;
palf_handle_impl->config_mgr_.election_ = mock_election;
} else {
palflite::PalfHandleLite *palf_handle_lite = dynamic_cast<palflite::PalfHandleLite*>(handle);
mock_election->set_leader(leader_addr, 1);
palf_handle_lite->state_mgr_.election_ = mock_election;
palf_handle_lite->config_mgr_.election_ = mock_election;
}
}
handle->set_location_cache_cb(loc_cb);
handle->set_paxos_member_region_map(get_member_region_map());
CLOG_LOG(INFO, "set_initial_member_list success", K(member_list));
}
if (OB_SUCC(ret)) {
ret = get_leader(id, leader, leader_idx);
ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
if (NULL == svr->get_palf_env()) {
ret = OB_ERR_UNEXPECTED;
ObSimpleLogServer *log_p = dynamic_cast<ObSimpleLogServer*>(svr);
ObSimpleArbServer *arb_p = dynamic_cast<ObSimpleArbServer*>(svr);
CLOG_LOG(ERROR, "palf env is nullptr", K(ret), K(svr->is_arb_server()), KPC(svr), KP(log_p), K(arb_p), KPC(log_p), KPC(arb_p));
} else {
}
if (NULL != handle) {
svr->get_palf_env()->revert_palf_handle_impl(handle);
}
}
if (OB_SUCC(ret)) {
ret = get_leader(id, leader, leader_idx);
ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
}
}
return ret;

View File

@ -72,6 +72,17 @@ class MockLSAdapter;
ObClusterVersion::get_instance().update_data_version(DATA_CURRENT_VERSION); \
return RUN_ALL_TESTS();
#define EXPECT_UNTIL_EQ(x, y) while(!(x == y)) \
{ usleep(500); \
SERVER_LOG(INFO, "EXPECT_UNTIL_EQ WAIT", \
"file", oceanbase::common::occam::get_file_name_without_dir(__FILE__), \
"line", __LINE__); }
#define EXPECT_UNTIL_NE(x, y) while(x == y) \
{ usleep(500); \
SERVER_LOG(INFO, "EXPECT_UNTIL_NE WAIT", \
"file", oceanbase::common::occam::get_file_name_without_dir(__FILE__), \
"line", __LINE__); }
void init_gtest_output(std::string &gtest_log_name);
int generate_data(char *&buf, int buf_len, int &real_data_size, const int wanted_data_size);
int generate_specifice_size_data(char *&buf, int buf_len, int wanted_data_size);
@ -141,10 +152,26 @@ public:
int create_paxos_group(const int64_t id, const share::SCN &create_scn, int64_t &leader_idx, PalfHandleImplGuard &leader);
int create_paxos_group(const int64_t id, const LSN &lsn, int64_t &leader_idx, PalfHandleImplGuard &leader);
int create_paxos_group(const int64_t id, const PalfBaseInfo &info, int64_t &leader_idx, PalfHandleImplGuard &leader);
int create_paxos_group(const int64_t id, const PalfBaseInfo &info, palf::PalfLocationCacheCb *loc_cb, int64_t &leader_idx, PalfHandleImplGuard &leader);
int create_paxos_group(const int64_t id,
const PalfBaseInfo &info,
palf::PalfLocationCacheCb *loc_cb,
int64_t &leader_idx,
const bool with_mock_election,
PalfHandleImplGuard &leader);
int create_paxos_group_with_arb(const int64_t id, int64_t &arb_replica_idx, int64_t &leader_idx, PalfHandleImplGuard &leader);
int create_paxos_group_with_arb(const int64_t id, palf::PalfLocationCacheCb *loc_cb, int64_t &arb_replica_idx,
int64_t &leader_idx, PalfHandleImplGuard &leader);
int create_paxos_group_with_arb(const int64_t id,
palf::PalfLocationCacheCb *loc_cb,
int64_t &arb_replica_idx,
int64_t &leader_idx,
const bool with_mock_election,
PalfHandleImplGuard &leader);
int create_paxos_group_with_mock_election(const int64_t id,
int64_t &leader_idx,
PalfHandleImplGuard &leader);
int create_paxos_group_with_arb_mock_election(const int64_t id,
int64_t &arb_replica_idx,
int64_t &leader_idx,
PalfHandleImplGuard &leader);
virtual int delete_paxos_group(const int64_t id);
virtual int update_disk_options(const int64_t server_id, const int64_t log_block_number);
virtual int restart_paxos_groups();

View File

@ -287,12 +287,15 @@ int ObSimpleLogServer::init_log_service_()
std::string clog_dir = clog_dir_ + "/tenant_1";
allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_);
ObMemAttr attr(1, "SimpleLog");
ObMemAttr ele_attr(1, ObNewModIds::OB_ELECTION);
net_keepalive_ = MTL_NEW(MockNetKeepAliveAdapter, "SimpleLog");
if (OB_FAIL(net_keepalive_->init(&deliver_))) {
} else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &ls_service_,
&location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_))) {
SERVER_LOG(ERROR, "init_log_service_ fail", K(ret));
} else if (OB_FAIL(mock_election_map_.init(ele_attr))) {
SERVER_LOG(ERROR, "mock_election_map_ init fail", K(ret));
} else {
palf_env_ = log_service_.get_palf_env();
palf_env_->palf_env_impl_.log_rpc_.tenant_id_ = OB_SERVER_TENANT_ID;

View File

@ -12,6 +12,8 @@
#pragma once
#include "lib/hash/ob_hashset.h"
#include "lib/hash/ob_hashset.h"
#include "lib/hash/ob_linear_hash_map.h"
#include "lib/ob_errno.h"
#include "lib/thread/ob_simple_thread_pool.h"
#include "lib/thread/thread_mgr_interface.h"
@ -30,6 +32,7 @@
#include "logservice/palf/log_rpc_processor.h"
#include "logservice/palf/palf_env.h"
#include "logservice/ob_arbitration_service.h"
#include "mock_election.h"
#include "mock_ob_locality_manager.h"
#include "mock_ob_meta_reporter.h"
#include "lib/net/ob_addr.h"
@ -160,6 +163,33 @@ private:
int64_t node_id_;
};
class MockElectionAlloc
{
public:
typedef common::LinkHashNode<palf::LSKey> Node;
static MockElection *alloc_value() { return NULL; }
static void free_value(MockElection *mock_election)
{
ob_free(mock_election);
mock_election = NULL;
}
static Node *alloc_node(MockElection *val)
{
UNUSED(val);
ObMemAttr attr(1, ObNewModIds::OB_ELECTION);
Node* node = (Node*)ob_malloc(sizeof(Node), attr);
new(node) Node();
return node;
}
static void free_node(Node *node)
{
node->~Node();
ob_free(node);
node = NULL;
}
};
typedef common::ObLinkHashMap<palf::LSKey, MockElection, MockElectionAlloc> MockElectionMap;
class ObISimpleLogServer
{
public:
@ -190,6 +220,9 @@ public:
virtual int get_palf_env(PalfEnv *&palf_env) = 0;
virtual bool is_arb_server() const {return false;};
virtual int64_t get_node_id() = 0;
virtual int create_mock_election(const int64_t palf_id, MockElection *&mock_election) = 0;
virtual int remove_mock_election(const int64_t palf_id) = 0;
virtual int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0) = 0;
DECLARE_PURE_VIRTUAL_TO_STRING;
};
@ -249,6 +282,54 @@ public:
{ deliver_.set_rpc_loss(src, loss_rate); }
void reset_rpc_loss(const ObAddr &src) override final
{ deliver_.reset_rpc_loss(src); }
int create_mock_election(const int64_t palf_id, MockElection *&mock_election) override final
{
int ret = OB_SUCCESS;
mock_election = NULL;
void *buf = NULL;
ObMemAttr attr(1, ObNewModIds::OB_ELECTION);
if (OB_ISNULL(buf = ob_malloc(sizeof(MockElection), attr))) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(ERROR, "ob_malloc failed", K(palf_id));
} else if (FALSE_IT(mock_election = new (buf) MockElection)) {
} else if (OB_FAIL(mock_election->init(palf_id, addr_))) {
SERVER_LOG(WARN, "mock_election->init failed", K(palf_id), K_(addr));
} else if (OB_FAIL(mock_election_map_.insert_and_get(palf::LSKey(palf_id), mock_election))) {
SERVER_LOG(WARN, "create_mock_election failed", K(palf_id));
} else {
SERVER_LOG(INFO, "create_mock_election success", K(palf_id), K_(addr), KP(mock_election));
}
if (OB_FAIL(ret) && NULL != mock_election) {
ob_free(mock_election);
mock_election = NULL;
}
return ret;
}
int remove_mock_election(const int64_t palf_id) override final
{
int ret = OB_SUCCESS;
if (OB_FAIL(mock_election_map_.del(palf::LSKey(palf_id))) && OB_ENTRY_NOT_EXIST != ret) {
SERVER_LOG(WARN, "del failed", K(palf_id));
} else {
ret = OB_SUCCESS;
SERVER_LOG(INFO, "remove_mock_election success", K(palf_id), K_(addr));
}
return ret;
}
int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0)
{
int ret = OB_SUCCESS;
MockElection *mock_election= NULL;
if (OB_FAIL(mock_election_map_.get(palf::LSKey(palf_id), mock_election))) {
SERVER_LOG(WARN, "get failed", K(palf_id));
} else if (OB_FAIL(mock_election->set_leader(leader, new_epoch))) {
SERVER_LOG(WARN, "set_leader failed", K(palf_id), KP(mock_election), K(leader), K(new_epoch));
}
if (OB_NOT_NULL(mock_election)) {
mock_election_map_.revert(mock_election);
}
return ret;
}
TO_STRING_KV(K_(node_id), K_(addr), KP(palf_env_));
protected:
@ -285,6 +366,7 @@ private:
MockNetKeepAliveAdapter *net_keepalive_;
ObSrvRpcProxy srv_proxy_;
logservice::coordinator::ObFailureDetector detector_;
MockElectionMap mock_election_map_;
};
} // end unittest

View File

@ -0,0 +1,303 @@
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#include <cstdio>
#include <gtest/gtest.h>
#include <signal.h>
#define private public
#include "env/ob_simple_log_cluster_env.h"
#undef private
const std::string TEST_NAME = "arb_mock_ele";
using namespace oceanbase::common;
using namespace oceanbase;
namespace oceanbase
{
using namespace logservice;
namespace logservice
{
void ObArbitrationService::update_arb_timeout_()
{
arb_timeout_us_ = 2 * 1000 * 1000L;
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
CLOG_LOG_RET(WARN, OB_ERR_UNEXPECTED, "update_arb_timeout_", K_(self), K_(arb_timeout_us));
}
}
}
namespace unittest
{
class TestObSimpleLogClusterArbMockEleService : public ObSimpleLogClusterTestEnv
{
public:
TestObSimpleLogClusterArbMockEleService() : ObSimpleLogClusterTestEnv()
{}
bool is_degraded(const PalfHandleImplGuard &leader,
const int64_t degraded_server_idx)
{
bool has_degraded = false;
while (!has_degraded) {
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr());
sleep(1);
PALF_LOG(INFO, "wait degrade");
}
return has_degraded;
}
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id)
{
bool has_upgraded = false;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
while (!has_upgraded) {
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
common::GlobalLearnerList degraded_learner_list;
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
has_upgraded = (0 == degraded_learner_list.get_member_number());
sleep(1);
PALF_LOG(INFO, "wait upgrade");
}
return has_upgraded;
}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 5;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = true;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t TIMEOUT_US = 10 * 1000 * 1000L;
SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_during_degrading");
PALF_LOG(INFO, "begin test switch_leader_during_degrading", K(id));
{
int64_t leader_idx = 0;
int64_t arb_replica_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr();
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->stop();
dynamic_cast<ObSimpleLogServer*>(get_cluster()[b_idx])->log_service_.get_arbitration_service()->stop();
// 1. A tries to degrade B
int64_t degrade_b_pid = 0;
int64_t degrade_b_ele_epoch = 0;
LogConfigVersion degrade_b_version;
LogConfigChangeArgs degrade_b_args(ObMember(b_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(degrade_b_pid, degrade_b_ele_epoch, degrade_b_args.type_));
while (0 == leader.palf_handle_impl_->config_mgr_.state_) {
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
}
const LSN &leader_last_committed_end_lsn = leader.palf_handle_impl_->sw_.committed_end_lsn_;
sleep(2);
EXPECT_EQ(leader_last_committed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 2. the leader A revokes and takeover again
for (auto srv: get_cluster()) {
const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0);
srv->set_leader(id, addr1);
}
EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// A can not degrades B successfully
EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
for (auto srv: get_cluster()) {
srv->set_leader(id, a_addr, 3);
}
EXPECT_UNTIL_EQ(true, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// 3. leader A degrades B again
leader.reset();
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
EXPECT_LE(leader_last_committed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->one_stage_config_change_(degrade_b_args, TIMEOUT_US));
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 4. leader A tries to upgrade B
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
int64_t upgrade_b_pid = 0;
int64_t upgrade_b_ele_epoch = 0;
LogConfigVersion upgrade_b_version;
LogConfigChangeArgs upgrade_b_args(ObMember(b_addr, 1), 0, UPGRADE_LEARNER_TO_ACCEPTOR);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(upgrade_b_pid, upgrade_b_ele_epoch, upgrade_b_args.type_));
while (0 == leader.palf_handle_impl_->config_mgr_.state_) {
leader.palf_handle_impl_->config_mgr_.change_config_(upgrade_b_args, upgrade_b_pid, upgrade_b_ele_epoch, upgrade_b_version);
}
const LSN &leader_max_flushed_end_lsn = leader.palf_handle_impl_->sw_.max_flushed_end_lsn_;
EXPECT_EQ(leader.palf_handle_impl_->sw_.max_flushed_end_lsn_, b_handle->palf_handle_impl_->sw_.max_flushed_end_lsn_);
// 5. the leader A revokes and takeover again
for (auto srv: get_cluster()) {
const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0);
srv->set_leader(id, addr1);
}
EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// A can not upgrades B successfully
EXPECT_EQ(OB_NOT_MASTER, leader.palf_handle_impl_->config_mgr_.change_config_(upgrade_b_args, upgrade_b_pid, upgrade_b_ele_epoch, upgrade_b_version));
for (auto srv: get_cluster()) {
srv->set_leader(id, a_addr, 3);
}
EXPECT_UNTIL_EQ(true, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_EQ(leader_max_flushed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test switch_leader_during_degrading", K(id));
}
TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_degrading)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t TIMEOUT_US = 10 * 1000 * 1000L;
SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_to_other_during_degrading");
PALF_LOG(INFO, "begin test switch_leader_to_other_during_degrading", K(id));
{
int64_t leader_idx = 0;
int64_t arb_replica_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr();
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->stop();
dynamic_cast<ObSimpleLogServer*>(get_cluster()[b_idx])->log_service_.get_arbitration_service()->stop();
// 1. A tries to degrade B
int64_t degrade_b_pid = 0;
int64_t degrade_b_ele_epoch = 0;
LogConfigVersion degrade_b_version;
LogConfigChangeArgs degrade_b_args(ObMember(b_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(degrade_b_pid, degrade_b_ele_epoch, degrade_b_args.type_));
while (0 == leader.palf_handle_impl_->config_mgr_.state_) {
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
}
const LSN &leader_last_committed_end_lsn = leader.palf_handle_impl_->sw_.committed_end_lsn_;
sleep(2);
EXPECT_EQ(leader_last_committed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 2. the leader A revokes and B takeover
for (auto srv: get_cluster()) {
const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0);
srv->set_leader(id, addr1);
}
EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// A can not degrades B successfully
EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
for (auto srv: get_cluster()) {
srv->set_leader(id, b_addr, 3);
}
EXPECT_UNTIL_EQ(true, b_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// 3. leader B degrades A
leader.reset();
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(a_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(a_addr));
LogConfigChangeArgs degrade_a_args(ObMember(a_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->one_stage_config_change_(degrade_a_args, TIMEOUT_US));
// 4. leader B tries to upgrade A
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
int64_t upgrade_a_pid = 0;
int64_t upgrade_a_ele_epoch = 0;
LogConfigVersion upgrade_a_version;
LogConfigChangeArgs upgrade_a_args(ObMember(a_addr, 1), 0, UPGRADE_LEARNER_TO_ACCEPTOR);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(upgrade_a_pid, upgrade_a_ele_epoch, upgrade_a_args.type_));
while (0 == leader.palf_handle_impl_->config_mgr_.state_) {
leader.palf_handle_impl_->config_mgr_.change_config_(upgrade_a_args, upgrade_a_pid, upgrade_a_ele_epoch, upgrade_a_version);
}
EXPECT_EQ(a_handle->palf_handle_impl_->sw_.max_flushed_end_lsn_, b_handle->palf_handle_impl_->sw_.max_flushed_end_lsn_);
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(upgrade_a_args, upgrade_a_pid, upgrade_a_ele_epoch, upgrade_a_version));
EXPECT_UNTIL_EQ(a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.config_version_, b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.config_version_);
// 5. the leader B revokes and A takeover
for (auto srv: get_cluster()) {
const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0);
srv->set_leader(id, addr1);
}
// A can not upgrades B successfully
EXPECT_EQ(OB_NOT_MASTER, leader.palf_handle_impl_->config_mgr_.change_config_(upgrade_a_args, upgrade_a_pid, upgrade_a_ele_epoch, upgrade_a_version));
EXPECT_UNTIL_EQ(false, leader.palf_handle_impl_->state_mgr_.is_leader_active());
for (auto srv: get_cluster()) {
srv->set_leader(id, a_addr, 3);
}
EXPECT_UNTIL_EQ(true, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test switch_leader_to_other_during_degrading", K(id));
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
}

View File

@ -0,0 +1,266 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <cstdio>
#include <gtest/gtest.h>
#include <signal.h>
#include <share/scn.h>
#define private public
#include "env/ob_simple_log_cluster_env.h"
#undef private
const std::string TEST_NAME = "config_change_mock_ele";
using namespace oceanbase::common;
using namespace oceanbase;
namespace oceanbase
{
using namespace logservice;
namespace unittest
{
class TestObSimpleLogClusterConfigChangeMockEle : public ObSimpleLogClusterTestEnv
{
public:
TestObSimpleLogClusterConfigChangeMockEle() : ObSimpleLogClusterTestEnv()
{}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 7;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
MockLocCB loc_cb;
// switch leader after appending config log
TEST_F(TestObSimpleLogClusterConfigChangeMockEle, switch_leader_during_removing_member1)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_during_removing_member1");
PALF_LOG(INFO, "begin test switch_leader_during_removing_member1", K(id));
{
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_mock_election(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
PalfHandleImplGuard *c_handle = palf_list[c_idx];
// 1. A starts to remove B
int64_t remove_b_pid = 0;
int64_t remove_b_ele_epoch = 0;
LogConfigVersion remove_b_version;
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
LogConfigChangeArgs args(common::ObMember(b_addr, 1), 2, REMOVE_MEMBER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(remove_b_pid, remove_b_ele_epoch, args.type_));
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 2. B is elected to be the leader
block_net(leader_idx, b_idx);
for (auto srv: get_cluster()) {
srv->set_leader(id, b_addr);
}
EXPECT_UNTIL_EQ(true, b_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// 3. A can not remove B successfully
EXPECT_EQ(OB_NOT_MASTER, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
unblock_net(leader_idx, b_idx);
// 4. A's memberlist will be reset
EXPECT_UNTIL_EQ(true, leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
leader.reset();
// 5. B remove C
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
LogConfigChangeArgs remove_c_args(common::ObMember(c_addr, 1), 2, REMOVE_MEMBER);
int64_t remove_c_pid = 0;
int64_t remove_c_ele_epoch = 0;
LogConfigVersion remove_c_version;
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(remove_c_pid, remove_c_ele_epoch, remove_c_args.type_));
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(remove_c_args, remove_c_pid, remove_c_ele_epoch, remove_c_version));
EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(c_addr));
EXPECT_FALSE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(c_addr));
EXPECT_TRUE(c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(c_addr));
// 6. the leader B revokes and takeover again
for (auto srv: get_cluster()) {
const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0);
srv->set_leader(id, addr1);
}
EXPECT_UNTIL_EQ(false, b_handle->palf_handle_impl_->state_mgr_.is_leader_active());
for (auto srv: get_cluster()) {
srv->set_leader(id, b_addr, 3);
}
EXPECT_UNTIL_EQ(true, b_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// 7. C will be removed successfully
EXPECT_FALSE(a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(c_addr));
EXPECT_FALSE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(c_addr));
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test switch_leader_during_removing_member1", K(id));
}
// switch leader after sending config meta to followers
TEST_F(TestObSimpleLogClusterConfigChangeMockEle, switch_leader_during_removing_member2)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_during_removing_member2");
PALF_LOG(INFO, "begin test switch_leader_during_removing_member2", K(id));
{
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_mock_election(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
PalfHandleImplGuard *b_handle = palf_list[b_idx];
PalfHandleImplGuard *c_handle = palf_list[c_idx];
// 1. A starts to remove B
int64_t remove_b_pid = 0;
int64_t remove_b_ele_epoch = 0;
LogConfigVersion remove_b_version;
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
LogConfigChangeArgs args(common::ObMember(b_addr, 1), 2, REMOVE_MEMBER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(remove_b_pid, remove_b_ele_epoch, args.type_));
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 1. A sends config meta to C
block_net(leader_idx, b_idx);
while (-1 == leader.palf_handle_impl_->config_mgr_.last_submit_config_log_time_us_ ||
c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr)) {
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
usleep(500);
}
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_UNTIL_EQ(true, b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_UNTIL_EQ(false, c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 2. B is elected to be the leader
for (auto srv: get_cluster()) {
srv->set_leader(id, b_addr);
}
EXPECT_UNTIL_EQ(true, b_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// 3. A can not remove B successfully
EXPECT_EQ(OB_NOT_MASTER, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
unblock_net(leader_idx, b_idx);
// 4. A's memberlist will be reset
EXPECT_UNTIL_EQ(true, leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test switch_leader_during_removing_member2", K(id));
}
TEST_F(TestObSimpleLogClusterConfigChangeMockEle, switch_leader_during_removing_member3)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_during_removing_member3");
PALF_LOG(INFO, "begin test switch_leader_during_removing_member3", K(id));
{
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_mock_election(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
PalfHandleImplGuard *c_handle = palf_list[c_idx];
// 1. A starts to remove B
int64_t remove_b_pid = 0;
int64_t remove_b_ele_epoch = 0;
LogConfigVersion remove_b_version;
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
LogConfigChangeArgs args(common::ObMember(b_addr, 1), 2, REMOVE_MEMBER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(remove_b_pid, remove_b_ele_epoch, args.type_));
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// A sends config meta to C
block_net(leader_idx, b_idx);
while (-1 == leader.palf_handle_impl_->config_mgr_.last_submit_config_log_time_us_ ||
c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr)) {
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
usleep(500);
}
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_UNTIL_EQ(true, b_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_UNTIL_EQ(false, c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 2. C is elected to be the leader
for (auto srv: get_cluster()) {
srv->set_leader(id, c_addr);
}
EXPECT_UNTIL_EQ(true, c_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// 3. A can not remove B successfully
EXPECT_EQ(OB_NOT_MASTER, leader.palf_handle_impl_->config_mgr_.change_config_(args, remove_b_pid, remove_b_ele_epoch, remove_b_version));
unblock_net(leader_idx, b_idx);
// 4. A's memberlist will be re-sync by C successfully
EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_UNTIL_EQ(false, c_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
leader.reset();
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test switch_leader_during_removing_member3", K(id));
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
}

View File

@ -102,7 +102,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
common::ObMember dummy_member;
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
loc_cb.leader_ = leader.palf_handle_impl_->self_;
@ -233,7 +233,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_leader)
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
common::ObMember dummy_member;
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
loc_cb.leader_ = leader.palf_handle_impl_->self_;
@ -351,7 +351,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_minor_follower)
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
common::ObMember dummy_member;
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
loc_cb.leader_ = leader.palf_handle_impl_->self_;
@ -484,7 +484,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_4f1a_degrade_upgrade)
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
common::ObMember dummy_member;
std::vector<PalfHandleImplGuard*> palf_list;
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, create_paxos_group_with_arb(id, &loc_cb, arb_replica_idx, leader_idx, false, leader));
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));

View File

@ -1956,7 +1956,7 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
constexpr bool need_purge_throttling = true;
constexpr bool need_remote_check = false;
const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
LSN prev_log_end_lsn = prev_end_lsn_;
LSN prev_log_end_lsn;
if (new_member_list.get_member_number() == 0) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num,
@ -1967,6 +1967,8 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
ret = OB_SUCCESS;
PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
"accepted_mode_meta", mode_mgr_->get_accepted_mode_meta());
} else if (OB_FAIL(get_log_barrier_(unused_lsn, prev_log_end_lsn, unused_id))) {
PALF_LOG(WARN, "get_log_barrier_ failed", KR(ret), K_(palf_id), K_(self));
} else if (FALSE_IT(ret = (first_committed_end_lsn >= prev_log_end_lsn)? OB_SUCCESS: OB_EAGAIN)) {
} else if (OB_EAGAIN == ret) {
// committed_end_lsn do not change during 2s, skip the reconfiguration