Files
oceanbase/unittest/election/test_election_cluster.cpp
oceanbase-admin cea7de1475 init push
2021-05-31 22:56:52 +08:00

984 lines
36 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "test_election_cluster.h"
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
#include "share/ob_cluster_version.h"
#include "election/ob_election_async_log.h"
#include <cstdio>
namespace oceanbase {
namespace unittest {
const static char* LOCAL_IP = "127.0.0.1";
const static int32_t RPC_PORT = 34506;
static int64_t membership_version = 0;
int ObTestElectionCluster::start_one_(const int64_t idx, const char* ip, const int32_t rpc_port)
{
int ret = OB_SUCCESS;
if (idx < 0 || idx >= MAX_OB_TRANS_SERVICE_NUM || NULL == ip) {
ELECT_ASYNC_LOG(WARN, "invalid argument", K(idx), KP(ip));
ret = OB_INVALID_ARGUMENT;
} else {
ObAddr addr;
if (!addr.set_ip_addr(ip, rpc_port)) {
ELECT_ASYNC_LOG(WARN, "set ipv4 addr error", K(ret), K(ip), K(rpc_port));
ret = OB_INVALID_ARGUMENT;
} else if (OB_SUCCESS != (ret = election_mgr_[idx].init(addr, &batch_rpc_, &eg_cb_))) {
ELECT_ASYNC_LOG(WARN, "election mgr init error", K(ret), K(idx));
} else if (OB_SUCCESS != (ret = election_rpc_[idx].init(&election_mgr_[idx], addr))) {
ELECT_ASYNC_LOG(WARN, "election rpc init error", K(ret));
} else if (OB_SUCCESS != (ret = election_mgr_[idx].set_election_rpc(this))) {
ELECT_ASYNC_LOG(WARN, "election mgr set rpc error", K(ret));
} else if (OB_SUCCESS != (ret = election_mgr_[idx].start())) {
ELECT_ASYNC_LOG(WARN, "election mgr start error", K(ret));
} else {
election_mgr_[idx].idx_ = idx;
election_mgr_[idx].addr_ = addr;
ELECT_ASYNC_LOG(INFO, "election mgr start success", K(idx), K(addr));
}
}
return ret;
}
int ObTestElectionCluster::init()
{
int ret = OB_SUCCESS;
ip_ = LOCAL_IP;
base_rpc_port_ = RPC_PORT;
is_force_leader_ = false;
return ret;
}
int ObTestElectionCluster::start()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
if (OB_SUCCESS != (ret = start_one_(i, ip_, base_rpc_port_ + (int32_t)i))) {
ELECT_ASYNC_LOG(WARN, "start one election mgr error", K(ret), K(i));
} else {
ELECT_ASYNC_LOG(INFO, "start one election mgr success", K(i));
}
}
if (OB_SUCC(ret)) {
inited_ = true;
}
return ret;
}
void ObTestElectionCluster::destroy()
{
ELECT_ASYNC_LOG(INFO, "ObTestElectionCluster destroy");
for (int64_t i = 0; i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
election_rpc_[i].destroy();
election_mgr_[i].destroy();
}
inited_ = false;
}
int ObTestElectionCluster::post_election_msg(
const ObAddr& server, const ObPartitionKey& partition, const election::ObElectionMsg& msg)
{
int ret = OB_SUCCESS;
if (!inited_) {
ELECT_ASYNC_LOG(WARN, "not init");
ret = OB_NOT_INIT;
} else if (!server.is_valid() || !msg.is_valid()) {
ret = OB_INVALID_ARGUMENT;
ELECT_ASYNC_LOG(WARN, "invalid argument", K(ret), K(server), K(msg));
} else {
if (!is_force_leader_) {
for (int32_t i = 0; i < MAX_ELECTION_MSG_COUNT; ++i) {
if (msg.get_msg_type() == i && rpc_loss_[i].times_-- > 0) {
ELECT_ASYNC_LOG(INFO, "message loss", "msg_type", i);
ret = OB_RPC_POST_ERROR;
break;
}
}
}
if (OB_SUCC(ret)) {
const int32_t idx = server.get_port() - base_rpc_port_;
if (idx < 0 || idx > MAX_OB_TRANS_SERVICE_NUM) {
ret = OB_ERR_UNEXPECTED;
ELECT_ASYNC_LOG(WARN, "server address error", K(idx), K(server), K(base_rpc_port_));
} else {
ElectionRpcTask* rpc_task = ElectionRpcTaskFactory::alloc();
if (NULL == rpc_task) {
ELECT_ASYNC_LOG(WARN, "alloc election rpc task error");
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
ObElectionMsgBuffer msgbuf;
if (OB_SUCCESS !=
(ret = serialization::encode_i32(
msgbuf.get_data(), msgbuf.get_capacity(), msgbuf.get_position(), msg.get_msg_type()))) {
ELECT_ASYNC_LOG(WARN, "serialize msg type error", K(ret), "msg type", msg.get_msg_type());
} else if (OB_SUCCESS !=
(ret = partition.serialize(msgbuf.get_data(), msgbuf.get_capacity(), msgbuf.get_position()))) {
ELECT_ASYNC_LOG(WARN, "serialize partition error", K(ret), K(partition));
} else if (OB_SUCCESS != msg.serialize(msgbuf.get_data(), msgbuf.get_capacity(), msgbuf.get_position())) {
ELECT_ASYNC_LOG(WARN, "serialize msg error", K(ret), K(msg));
} else {
rpc_task->server_ = server;
rpc_task->msgbuf_ = msgbuf;
rpc_task->partition_ = partition;
rpc_task->timestamp_ = ObClockGenerator::getClock();
if (OB_SUCCESS != (ret = election_rpc_[idx].push(rpc_task))) {
ELECT_ASYNC_LOG(WARN, "push election rpc task error", K(ret), K(msg));
ElectionRpcTaskFactory::release(rpc_task);
} else {
ELECT_ASYNC_LOG(INFO, "election msg", K(idx), K(server), K(msg), K(partition));
}
}
}
}
}
}
return ret;
}
int ObTestElectionCluster::post_election_group_msg(const common::ObAddr& server, const ObElectionGroupId& eg_id,
const ObPartArrayBuffer& part_array_buf, const ObElectionMsg& msg)
{
UNUSED(server);
UNUSED(eg_id);
UNUSED(part_array_buf);
UNUSED(msg);
return OB_SUCCESS;
}
int ObTestElectionCluster::add_partition(const ObPartitionKey& partition, const ObAddr& leader)
{
int ret = OB_SUCCESS;
int64_t leader_epoch = 0;
ObIElection* unused = NULL;
if (!inited_) {
ELECT_ASYNC_LOG(WARN, "not init");
ret = OB_NOT_INIT;
} else if (!partition.is_valid() || !leader.is_valid()) {
ELECT_ASYNC_LOG(WARN, "invalid argument", K(partition), K(leader));
ret = OB_INVALID_ARGUMENT;
} else {
ObAddr addr;
ObMemberList mlist;
for (int64_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
if (!addr.set_ip_addr(ip_, base_rpc_port_ + (int32_t)i)) {
ELECT_ASYNC_LOG(WARN, "set ipv4 addr error", K_(ip), "port", base_rpc_port_ + i);
ret = OB_ERR_UNEXPECTED;
} else {
if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(i), K(mlist), K(addr));
} else if (OB_SUCCESS !=
(ret = election_mgr_[i].add_partition(partition, OB_REPLICA_NUM, &election_cb_[i], unused))) {
ELECT_ASYNC_LOG(WARN, "election_mgr add partition error", K(ret), K(i));
} else {
ELECT_ASYNC_LOG(INFO, "election_mgr add partition success", K(i), K(partition), K(addr));
}
}
}
int64_t cur_ts = ObClockGenerator::getClock() - T_ELECT2;
for (int64_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
if (OB_SUCCESS != (election_mgr_[i].set_candidate(partition, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K(i), K(partition), K(mlist));
} else if (OB_SUCCESS != (ret = election_mgr_[i].start_partition(partition, leader, cur_ts, leader_epoch))) {
ELECT_ASYNC_LOG(WARN, "election_mgr start partition error", K(ret), K(i), K(partition), K(leader));
} else {
ELECT_ASYNC_LOG(INFO, "election_mgr start partition success", K(i), K(partition), K(mlist), K(leader));
}
}
}
return ret;
}
int ObTestElectionCluster::remove_partition(const ObPartitionKey& partition)
{
int ret = OB_SUCCESS;
if (!inited_) {
ELECT_ASYNC_LOG(WARN, "not init");
ret = OB_NOT_INIT;
} else if (!partition.is_valid()) {
ELECT_ASYNC_LOG(WARN, "invalid argument", K(partition));
} else {
for (int64_t i = 0; i < OB_REPLICA_NUM; ++i) {
if (OB_SUCCESS != (ret = election_mgr_[i].remove_partition(partition))) {
ELECT_ASYNC_LOG(WARN, "remove partition error", K(ret), K(i), K(partition));
} else {
ELECT_ASYNC_LOG(INFO, "remove partition success", K(i), K(partition));
}
}
}
return ret;
}
void TestObElectionCluster::SetUp()
{
oceanbase::election::ObAsyncLog::getLogger().init("test_election_cluster.log", OB_LOG_LEVEL_INFO, true);
test_partition_.init(combine_id(1, 3001), 0, 1);
EXPECT_TRUE(leader_.set_ip_addr(LOCAL_IP, RPC_PORT));
for (int64_t i = 0; i < OB_REPLICA_NUM; ++i) {
ObElectionPriority priority(true, OB_REPLICA_NUM, ObVersion(10), OB_REPLICA_NUM - i);
cluster_.election_cb_[i].init(priority);
}
ObElectionPriority priority(true, 1, ObVersion(1), 1);
cluster_.election_cb_[3].init(priority);
EXPECT_EQ(OB_SUCCESS, cluster_.init());
EXPECT_EQ(OB_SUCCESS, cluster_.start());
EXPECT_EQ(OB_SUCCESS, cluster_.add_partition(test_partition_, leader_));
}
void TestObElectionCluster::TearTown()
{
cluster_.remove_partition(test_partition_);
cluster_.destroy();
}
TEST_F(TestObElectionCluster, test_get_leader)
{
ObAddr addr;
ObAddr previous_leader;
int64_t leader_epoch = 0;
bool unused_bool;
ELECT_ASYNC_LOG(INFO, "test_get_leader");
for (int64_t i = 0; i < OB_REPLICA_NUM; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[i].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
}
}
TEST_F(TestObElectionCluster, change_leader_not_exist)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr next_leader;
ObAddr previous_leader;
int64_t leader_epoch = 0;
bool unused_bool;
ELECT_ASYNC_LOG(INFO, "change_leader_not_exist");
sleep(5);
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[1].stop_partition(test_partition_));
EXPECT_TRUE(next_leader.set_ip_addr(LOCAL_IP, RPC_PORT + 1));
const int32_t next_idx = 1;
int64_t old_membership_version = cluster_.election_cb_[next_idx].get_priority().get_membership_version();
int64_t old_data_version = cluster_.election_cb_[next_idx].get_priority().get_data_version();
ObTsWindows ts_windows;
cluster_.election_cb_[next_idx].priority_.set_membership_version(
cluster_.election_cb_[idx].get_priority().get_membership_version() + 1);
cluster_.election_cb_[next_idx].priority_.set_data_version(
cluster_.election_cb_[idx].get_priority().get_data_version());
if (OB_SUCCESS != (ret = cluster_.election_mgr_[idx].change_leader_async(test_partition_, next_leader, ts_windows))) {
ELECT_ASYNC_LOG(WARN, "change leader error", K(ret), K_(test_partition), K(next_leader));
} else {
sleep(8);
}
// EXPECT_EQ(OB_ELECTION_WARN_INVALID_LEADER, cluster_.election_mgr_[0].get_leader(test_partition_, addr,
// previous_leader, leader_epoch, unused_bool));
cluster_.election_cb_[next_idx].priority_.set_membership_version(old_membership_version);
cluster_.election_cb_[next_idx].priority_.set_data_version(ObVersion(old_data_version));
}
TEST_F(TestObElectionCluster, change_leader)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr next_leader;
ObAddr previous_leader;
ObTsWindows ts_windows;
int64_t leader_epoch = 0;
bool unused_bool;
ELECT_ASYNC_LOG(INFO, "change_leader");
sleep(3);
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
EXPECT_TRUE(next_leader.set_ip_addr(LOCAL_IP, RPC_PORT + 1));
if (OB_SUCCESS != (ret = cluster_.election_mgr_[idx].change_leader_async(test_partition_, next_leader, ts_windows))) {
ELECT_ASYNC_LOG(WARN, "change leader error", K(ret), K_(test_partition), K(next_leader));
} else {
sleep(3);
}
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(addr, next_leader);
}
TEST_F(TestObElectionCluster, test_vote_vote_leader_candidate_false)
{
ObAddr addr;
ObAddr previous_leader;
int64_t leader_epoch = 0;
bool unused_bool;
ELECT_ASYNC_LOG(INFO, "test_vote_vote_leader_candidate_false");
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
// printf("idx=%d\n", idx);
// set leader priority candidate is false
// cluster_.election_cb_[idx].priority_.set_candidate(false);
sleep(5);
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[idx].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
cluster_.election_cb_[idx].priority_.set_candidate(true);
}
// A(A, B, C)
// B(A, B, C)
// C(A, B, C)
// D(A, B, C)
TEST_F(TestObElectionCluster, test_query_not_in_member)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr previous_leader;
ObMemberList mlist;
int64_t leader_epoch = 0;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "test_query_not_in_member");
ObIElectionMgr* query = &cluster_.election_mgr_[3];
ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
for (int32_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + i))) {
ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
} else {
if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
} else {
ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
}
sleep(10);
bool unused_bool;
for (int64_t i = 0; i < 10; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
}
// A(A, B, C)
// B(A, B, C)
// C(A, B, C)
// D(B, C, D)
TEST_F(TestObElectionCluster, test_query_not_in_leader_member_and_without_leader)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObMemberList mlist;
ObAddr previous_leader;
int64_t leader_epoch = 0;
bool unused_bool;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "test_query_not_in_leader_member_and_without_leader");
ObIElectionMgr* query = &cluster_.election_mgr_[3];
ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
for (int32_t i = 1; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + i))) {
ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
} else {
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
}
sleep(10);
for (int64_t i = 0; i < 10; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
}
// A(A, B, C)
// B(A, B, C)
// C(A, B, C)
// D(A, B, D)
TEST_F(TestObElectionCluster, test_query_leader_and_with_leader_and_self)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr previous_leader;
ObMemberList mlist;
int64_t leader_epoch = 0;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "test_query_leader_and_with_leader_and_self");
ObIElectionMgr* query = &cluster_.election_mgr_[3];
ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
if (i == 2) { // skip
continue;
}
if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
} else {
if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
} else {
ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
}
sleep(10);
bool unused_bool;
for (int64_t i = 0; i < 10; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
}
// A(A, B, C)
// B(A, B, C)
// C(A, B, C)
// D(C, D)
TEST_F(TestObElectionCluster, test_query_leader_and_without_leader)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr previous_leader;
ObMemberList mlist;
int64_t leader_epoch = 0;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "test_query_leader_and_without_leader");
ObIElectionMgr* query = &cluster_.election_mgr_[3];
ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
if (i == 0 || i == 1) { // skip
continue;
}
if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
} else {
if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
} else {
ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
}
sleep(10);
bool unused_bool;
for (int64_t i = 0; i < 10; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
}
// A(A, B, C)
// B(A, B, C)
// C(A, B, C)
// D(B, C)
TEST_F(TestObElectionCluster, test_query_leader_and_without_leader_and_self)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr previous_leader;
ObMemberList mlist;
int64_t leader_epoch = 0;
ObIElection* unused = NULL;
ObIElectionMgr* query = &cluster_.election_mgr_[3];
ELECT_ASYNC_LOG(INFO, "test_query_leader_and_without_leader_and_self");
ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
if (i == 0 || i == 3) { // skip
continue;
}
if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
} else {
if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
} else {
ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
}
sleep(10);
bool unused_bool;
for (int64_t i = 0; i < 10; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
}
// A(A, B, C)
// B(A, B, C)
// C(A, B, C)
// D(A, B, C)
TEST_F(TestObElectionCluster, test_query_leader_and_without_self)
{
int ret = OB_SUCCESS;
ObAddr addr;
ObAddr previous_leader;
ObMemberList mlist;
int64_t leader_epoch = 0;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "test_query_leader_and_without_self");
ObIElectionMgr* query = &cluster_.election_mgr_[3];
ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
if (i == 2) { // skip
continue;
}
if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
} else {
if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
} else {
ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
}
}
}
if (OB_SUCC(ret)) {
if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
}
sleep(10);
bool unused_bool;
for (int64_t i = 0; i < 10; ++i) {
addr.reset();
EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
}
// A(A, D)
// D(B, C)
TEST_F(TestObElectionCluster, query_vote_leader_no_query)
{
int ret = OB_SUCCESS;
ObAddr leader;
ObAddr addr;
ObAddr next_leader;
ObAddr previous_leader;
int64_t leader_epoch = 0;
const int32_t idx = 0;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "query_vote_leader_no_query");
for (int64_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
if (i != idx) {
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[i].remove_partition(test_partition_));
}
}
sleep(10);
bool unused_bool;
EXPECT_EQ(OB_ELECTION_WARN_INVALID_LEADER,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
ObMemberList mlist;
ObIElectionMgr* tmp_replica = &cluster_.election_mgr_[3];
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 3));
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
cluster_.election_mgr_[idx].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
mlist.reset();
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 1));
mlist.add_server(addr);
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 2));
mlist.add_server(addr);
ObElectionPriority priority(false, 1, ObVersion(1), 1);
cluster_.election_cb_[3].init(priority);
ObIElectionCallback* election_cb = &cluster_.election_cb_[3];
if (OB_SUCCESS != (ret = tmp_replica->add_partition(test_partition_, OB_REPLICA_NUM, election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = tmp_replica->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = tmp_replica->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
sleep(10);
leader.reset();
EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
leader.reset();
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
EXPECT_EQ(leader, addr);
sleep(15);
for (int64_t i = 0; i < 10; ++i) {
leader.reset();
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
leader.reset();
EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, tmp_replica->remove_partition(test_partition_));
}
// A(A, B)
// B(A, B)
// D(B, C)
// A (A, D)
// D (B, C)
TEST_F(TestObElectionCluster, query_vote_leader_need_query_to_without_query)
{
int ret = OB_SUCCESS;
ObAddr leader;
ObAddr addr;
ObAddr next_leader;
ObAddr previous_leader;
int64_t leader_epoch = 0;
const int32_t idx = 0;
ObIElection* unused = NULL;
ELECT_ASYNC_LOG(INFO, "query_vote_leader_need_query_to_without_query");
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].remove_partition(test_partition_));
ObIElectionMgr* tmp_replica = &cluster_.election_mgr_[3];
ObMemberList mlist;
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 1));
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
cluster_.election_mgr_[idx].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
cluster_.election_mgr_[idx + 1].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
mlist.reset();
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 1));
mlist.add_server(addr);
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 2));
mlist.add_server(addr);
ObElectionPriority priority(false, 1, ObVersion(1), 1);
cluster_.election_cb_[3].init(priority);
ObIElectionCallback* election_cb = &cluster_.election_cb_[3];
if (OB_SUCCESS != (ret = tmp_replica->add_partition(test_partition_, OB_REPLICA_NUM, election_cb, unused))) {
ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
} else if (OB_SUCCESS !=
(ret = tmp_replica->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
} else if (OB_SUCCESS != (ret = tmp_replica->start_partition(test_partition_))) {
ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
} else {
ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
}
sleep(10);
leader.reset();
bool unused_bool;
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
EXPECT_EQ(leader, addr);
sleep(15);
for (int64_t i = 0; i < 10; ++i) {
leader.reset();
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[1].remove_partition(test_partition_));
sleep(10);
EXPECT_EQ(OB_ELECTION_WARN_INVALID_LEADER,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
mlist.reset();
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 3));
EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
cluster_.election_mgr_[idx].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
sleep(15);
for (int64_t i = 0; i < 1; ++i) {
leader.reset();
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
EXPECT_EQ(leader, addr);
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(OB_SUCCESS, tmp_replica->remove_partition(test_partition_));
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].remove_partition(test_partition_));
}
// TEST_F(TestObElectionCluster, send_vote_vote_message_loss)
//{
// int ret = OB_SUCCESS;
// ObAddr addr;
// ObAddr next_leader;
// int64_t leader_epoch = 0;
// ELECT_ASYNC_LOG(INFO, "send_vote_vote_message_loss");
// EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[0].get_leader(test_partition_, addr, leader_epoch));
// EXPECT_EQ(leader_, addr);
// const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
// EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].stop_partition(test_partition_));
// cluster_.rpc_loss_[OB_ELECTION_VOTE_VOTE].times_ = 1;
// EXPECT_TRUE(next_leader.set_ip_addr(LOCAL_IP, RPC_PORT + 1));
// int32_t next_idx = next_leader.get_port() - cluster_.base_rpc_port_;
// int64_t old_membership_version = cluster_.election_cb_[next_idx].get_priority().get_membership_version();
// int64_t old_data_version = cluster_.election_cb_[next_idx].get_priority().get_data_version();
// cluster_.election_cb_[next_idx].priority_.set_membership_version(cluster_.election_cb_[idx].get_priority().get_membership_version()
// + 1);
// cluster_.election_cb_[next_idx].priority_.set_data_version(cluster_.election_cb_[idx].get_priority().get_data_version());
// if (OB_SUCCESS != (ret = cluster_.election_mgr_[idx].change_leader_async(test_partition_, next_leader))) {
// ELECT_ASYNC_LOG(WARN, "change leader error", K(ret), K_(test_partition), K(next_leader));
//} else {
// sleep(6);
//}
// EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[0].get_leader(test_partition_, addr, leader_epoch));
// EXPECT_EQ(next_leader, addr);
// EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[next_idx].leader_revoke(test_partition_));
// EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].start_partition(test_partition_));
// cluster_.election_cb_[next_idx].priority_.set_membership_version(old_membership_version);
// cluster_.election_cb_[next_idx].priority_.set_data_version(ObVersion(old_data_version));
//}
TEST_F(TestObElectionCluster, test_get_valid_candidate)
{
ObAddr addr;
ObAddr previous_leader;
int64_t leader_epoch = 0;
bool unused_bool;
ELECT_ASYNC_LOG(INFO, "test_get_valid_candiadte");
sleep(5);
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
ObMemberList valid_list;
ObMemberList curr_list;
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].get_curr_candidate(test_partition_, curr_list));
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].get_valid_candidate(test_partition_, valid_list));
EXPECT_EQ(OB_REPLICA_NUM, valid_list.get_member_number());
EXPECT_EQ(valid_list.get_member_number(), curr_list.get_member_number());
for (int64_t index = 0; index < curr_list.get_member_number(); ++index) {
ObAddr addr;
EXPECT_EQ(OB_SUCCESS, curr_list.get_server_by_index(index, addr));
EXPECT_TRUE(valid_list.contains(addr));
}
}
TEST_F(TestObElectionCluster, force_leader)
{
ObAddr addr;
ObAddr previous_leader;
int64_t leader_epoch = 0;
bool unused_bool;
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
EXPECT_EQ(leader_, addr);
const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
const uint32_t revoke_type = ObElection::RevokeType::LEASE_EXPIRED;
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].leader_revoke(test_partition_, revoke_type));
cluster_.rpc_loss_[OB_ELECTION_DEVOTE_PREPARE].times_ = 1000;
cluster_.rpc_loss_[OB_ELECTION_DEVOTE_VOTE].times_ = 1000;
cluster_.rpc_loss_[OB_ELECTION_DEVOTE_SUCCESS].times_ = 1000;
sleep(10);
cluster_.is_force_leader_ = true;
EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].force_leader_async(test_partition_));
sleep(15);
EXPECT_EQ(OB_SUCCESS,
cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
// EXPECT_EQ(addr, cluster_.election_mgr_[2].addr_);
}
} // namespace unittest
} // namespace oceanbase
int main(int argc, char** argv)
{
int ret = -1;
oceanbase::election::ObAsyncLog::getLogger().init("test_election_cluster.log", OB_LOG_LEVEL_INFO, true);
oceanbase::common::ObClusterVersion::get_instance().init(CLUSTER_VERSION_1500);
EXPECT_EQ(OB_SUCCESS, ObTenantMutilAllocatorMgr::get_instance().init());
if (OB_FAIL(oceanbase::common::ObClockGenerator::init())) {
ELECT_ASYNC_LOG(WARN, "clock generator init error.", K(ret));
} else {
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
}
return ret;
}