984 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			984 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #include "test_election_cluster.h"
 | |
| #include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
 | |
| #include "share/ob_cluster_version.h"
 | |
| #include "election/ob_election_async_log.h"
 | |
| #include <cstdio>
 | |
| 
 | |
| namespace oceanbase {
 | |
| namespace unittest {
 | |
| const static char* LOCAL_IP = "127.0.0.1";
 | |
| const static int32_t RPC_PORT = 34506;
 | |
| static int64_t membership_version = 0;
 | |
| 
 | |
| int ObTestElectionCluster::start_one_(const int64_t idx, const char* ip, const int32_t rpc_port)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
| 
 | |
|   if (idx < 0 || idx >= MAX_OB_TRANS_SERVICE_NUM || NULL == ip) {
 | |
|     ELECT_ASYNC_LOG(WARN, "invalid argument", K(idx), KP(ip));
 | |
|     ret = OB_INVALID_ARGUMENT;
 | |
|   } else {
 | |
|     ObAddr addr;
 | |
| 
 | |
|     if (!addr.set_ip_addr(ip, rpc_port)) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set ipv4 addr error", K(ret), K(ip), K(rpc_port));
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|     } else if (OB_SUCCESS != (ret = election_mgr_[idx].init(addr, &batch_rpc_, &eg_cb_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "election mgr init error", K(ret), K(idx));
 | |
|     } else if (OB_SUCCESS != (ret = election_rpc_[idx].init(&election_mgr_[idx], addr))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "election rpc init error", K(ret));
 | |
|     } else if (OB_SUCCESS != (ret = election_mgr_[idx].set_election_rpc(this))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "election mgr set rpc error", K(ret));
 | |
|     } else if (OB_SUCCESS != (ret = election_mgr_[idx].start())) {
 | |
|       ELECT_ASYNC_LOG(WARN, "election mgr start error", K(ret));
 | |
|     } else {
 | |
|       election_mgr_[idx].idx_ = idx;
 | |
|       election_mgr_[idx].addr_ = addr;
 | |
|       ELECT_ASYNC_LOG(INFO, "election mgr start success", K(idx), K(addr));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObTestElectionCluster::init()
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
| 
 | |
|   ip_ = LOCAL_IP;
 | |
|   base_rpc_port_ = RPC_PORT;
 | |
|   is_force_leader_ = false;
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObTestElectionCluster::start()
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
| 
 | |
|   for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     if (OB_SUCCESS != (ret = start_one_(i, ip_, base_rpc_port_ + (int32_t)i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start one election mgr error", K(ret), K(i));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "start one election mgr success", K(i));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     inited_ = true;
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void ObTestElectionCluster::destroy()
 | |
| {
 | |
|   ELECT_ASYNC_LOG(INFO, "ObTestElectionCluster destroy");
 | |
|   for (int64_t i = 0; i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     election_rpc_[i].destroy();
 | |
|     election_mgr_[i].destroy();
 | |
|   }
 | |
|   inited_ = false;
 | |
| }
 | |
| 
 | |
| int ObTestElectionCluster::post_election_msg(
 | |
|     const ObAddr& server, const ObPartitionKey& partition, const election::ObElectionMsg& msg)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
| 
 | |
|   if (!inited_) {
 | |
|     ELECT_ASYNC_LOG(WARN, "not init");
 | |
|     ret = OB_NOT_INIT;
 | |
|   } else if (!server.is_valid() || !msg.is_valid()) {
 | |
|     ret = OB_INVALID_ARGUMENT;
 | |
|     ELECT_ASYNC_LOG(WARN, "invalid argument", K(ret), K(server), K(msg));
 | |
|   } else {
 | |
|     if (!is_force_leader_) {
 | |
|       for (int32_t i = 0; i < MAX_ELECTION_MSG_COUNT; ++i) {
 | |
|         if (msg.get_msg_type() == i && rpc_loss_[i].times_-- > 0) {
 | |
|           ELECT_ASYNC_LOG(INFO, "message loss", "msg_type", i);
 | |
|           ret = OB_RPC_POST_ERROR;
 | |
|           break;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (OB_SUCC(ret)) {
 | |
|       const int32_t idx = server.get_port() - base_rpc_port_;
 | |
|       if (idx < 0 || idx > MAX_OB_TRANS_SERVICE_NUM) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         ELECT_ASYNC_LOG(WARN, "server address error", K(idx), K(server), K(base_rpc_port_));
 | |
|       } else {
 | |
|         ElectionRpcTask* rpc_task = ElectionRpcTaskFactory::alloc();
 | |
|         if (NULL == rpc_task) {
 | |
|           ELECT_ASYNC_LOG(WARN, "alloc election rpc task error");
 | |
|           ret = OB_ALLOCATE_MEMORY_FAILED;
 | |
|         } else {
 | |
|           ObElectionMsgBuffer msgbuf;
 | |
| 
 | |
|           if (OB_SUCCESS !=
 | |
|               (ret = serialization::encode_i32(
 | |
|                    msgbuf.get_data(), msgbuf.get_capacity(), msgbuf.get_position(), msg.get_msg_type()))) {
 | |
|             ELECT_ASYNC_LOG(WARN, "serialize msg type error", K(ret), "msg type", msg.get_msg_type());
 | |
|           } else if (OB_SUCCESS !=
 | |
|                      (ret = partition.serialize(msgbuf.get_data(), msgbuf.get_capacity(), msgbuf.get_position()))) {
 | |
|             ELECT_ASYNC_LOG(WARN, "serialize partition error", K(ret), K(partition));
 | |
|           } else if (OB_SUCCESS != msg.serialize(msgbuf.get_data(), msgbuf.get_capacity(), msgbuf.get_position())) {
 | |
|             ELECT_ASYNC_LOG(WARN, "serialize msg error", K(ret), K(msg));
 | |
|           } else {
 | |
|             rpc_task->server_ = server;
 | |
|             rpc_task->msgbuf_ = msgbuf;
 | |
|             rpc_task->partition_ = partition;
 | |
|             rpc_task->timestamp_ = ObClockGenerator::getClock();
 | |
| 
 | |
|             if (OB_SUCCESS != (ret = election_rpc_[idx].push(rpc_task))) {
 | |
|               ELECT_ASYNC_LOG(WARN, "push election rpc task error", K(ret), K(msg));
 | |
|               ElectionRpcTaskFactory::release(rpc_task);
 | |
|             } else {
 | |
|               ELECT_ASYNC_LOG(INFO, "election msg", K(idx), K(server), K(msg), K(partition));
 | |
|             }
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObTestElectionCluster::post_election_group_msg(const common::ObAddr& server, const ObElectionGroupId& eg_id,
 | |
|     const ObPartArrayBuffer& part_array_buf, const ObElectionMsg& msg)
 | |
| {
 | |
|   UNUSED(server);
 | |
|   UNUSED(eg_id);
 | |
|   UNUSED(part_array_buf);
 | |
|   UNUSED(msg);
 | |
|   return OB_SUCCESS;
 | |
| }
 | |
| 
 | |
| int ObTestElectionCluster::add_partition(const ObPartitionKey& partition, const ObAddr& leader)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   int64_t leader_epoch = 0;
 | |
|   ObIElection* unused = NULL;
 | |
| 
 | |
|   if (!inited_) {
 | |
|     ELECT_ASYNC_LOG(WARN, "not init");
 | |
|     ret = OB_NOT_INIT;
 | |
|   } else if (!partition.is_valid() || !leader.is_valid()) {
 | |
|     ELECT_ASYNC_LOG(WARN, "invalid argument", K(partition), K(leader));
 | |
|     ret = OB_INVALID_ARGUMENT;
 | |
|   } else {
 | |
|     ObAddr addr;
 | |
|     ObMemberList mlist;
 | |
| 
 | |
|     for (int64_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
 | |
|       if (!addr.set_ip_addr(ip_, base_rpc_port_ + (int32_t)i)) {
 | |
|         ELECT_ASYNC_LOG(WARN, "set ipv4 addr error", K_(ip), "port", base_rpc_port_ + i);
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|       } else {
 | |
|         if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
 | |
|           ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(i), K(mlist), K(addr));
 | |
|         } else if (OB_SUCCESS !=
 | |
|                    (ret = election_mgr_[i].add_partition(partition, OB_REPLICA_NUM, &election_cb_[i], unused))) {
 | |
|           ELECT_ASYNC_LOG(WARN, "election_mgr add partition error", K(ret), K(i));
 | |
|         } else {
 | |
|           ELECT_ASYNC_LOG(INFO, "election_mgr add partition success", K(i), K(partition), K(addr));
 | |
|         }
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     int64_t cur_ts = ObClockGenerator::getClock() - T_ELECT2;
 | |
|     for (int64_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
 | |
|       if (OB_SUCCESS != (election_mgr_[i].set_candidate(partition, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K(i), K(partition), K(mlist));
 | |
|       } else if (OB_SUCCESS != (ret = election_mgr_[i].start_partition(partition, leader, cur_ts, leader_epoch))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "election_mgr start partition error", K(ret), K(i), K(partition), K(leader));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "election_mgr start partition success", K(i), K(partition), K(mlist), K(leader));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObTestElectionCluster::remove_partition(const ObPartitionKey& partition)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
| 
 | |
|   if (!inited_) {
 | |
|     ELECT_ASYNC_LOG(WARN, "not init");
 | |
|     ret = OB_NOT_INIT;
 | |
|   } else if (!partition.is_valid()) {
 | |
|     ELECT_ASYNC_LOG(WARN, "invalid argument", K(partition));
 | |
|   } else {
 | |
|     for (int64_t i = 0; i < OB_REPLICA_NUM; ++i) {
 | |
|       if (OB_SUCCESS != (ret = election_mgr_[i].remove_partition(partition))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "remove partition error", K(ret), K(i), K(partition));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "remove partition success", K(i), K(partition));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void TestObElectionCluster::SetUp()
 | |
| {
 | |
|   oceanbase::election::ObAsyncLog::getLogger().init("test_election_cluster.log", OB_LOG_LEVEL_INFO, true);
 | |
|   test_partition_.init(combine_id(1, 3001), 0, 1);
 | |
|   EXPECT_TRUE(leader_.set_ip_addr(LOCAL_IP, RPC_PORT));
 | |
|   for (int64_t i = 0; i < OB_REPLICA_NUM; ++i) {
 | |
|     ObElectionPriority priority(true, OB_REPLICA_NUM, ObVersion(10), OB_REPLICA_NUM - i);
 | |
|     cluster_.election_cb_[i].init(priority);
 | |
|   }
 | |
| 
 | |
|   ObElectionPriority priority(true, 1, ObVersion(1), 1);
 | |
|   cluster_.election_cb_[3].init(priority);
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.init());
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.start());
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.add_partition(test_partition_, leader_));
 | |
| }
 | |
| 
 | |
| void TestObElectionCluster::TearTown()
 | |
| {
 | |
|   cluster_.remove_partition(test_partition_);
 | |
|   cluster_.destroy();
 | |
| }
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_get_leader)
 | |
| {
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_get_leader");
 | |
| 
 | |
|   for (int64_t i = 0; i < OB_REPLICA_NUM; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS,
 | |
|         cluster_.election_mgr_[i].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(TestObElectionCluster, change_leader_not_exist)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr next_leader;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
|   ELECT_ASYNC_LOG(INFO, "change_leader_not_exist");
 | |
|   sleep(5);
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(leader_, addr);
 | |
| 
 | |
|   const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[1].stop_partition(test_partition_));
 | |
| 
 | |
|   EXPECT_TRUE(next_leader.set_ip_addr(LOCAL_IP, RPC_PORT + 1));
 | |
|   const int32_t next_idx = 1;
 | |
|   int64_t old_membership_version = cluster_.election_cb_[next_idx].get_priority().get_membership_version();
 | |
|   int64_t old_data_version = cluster_.election_cb_[next_idx].get_priority().get_data_version();
 | |
|   ObTsWindows ts_windows;
 | |
|   cluster_.election_cb_[next_idx].priority_.set_membership_version(
 | |
|       cluster_.election_cb_[idx].get_priority().get_membership_version() + 1);
 | |
|   cluster_.election_cb_[next_idx].priority_.set_data_version(
 | |
|       cluster_.election_cb_[idx].get_priority().get_data_version());
 | |
|   if (OB_SUCCESS != (ret = cluster_.election_mgr_[idx].change_leader_async(test_partition_, next_leader, ts_windows))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "change leader error", K(ret), K_(test_partition), K(next_leader));
 | |
|   } else {
 | |
|     sleep(8);
 | |
|   }
 | |
|   //  EXPECT_EQ(OB_ELECTION_WARN_INVALID_LEADER, cluster_.election_mgr_[0].get_leader(test_partition_, addr,
 | |
|   //  previous_leader, leader_epoch, unused_bool));
 | |
|   cluster_.election_cb_[next_idx].priority_.set_membership_version(old_membership_version);
 | |
|   cluster_.election_cb_[next_idx].priority_.set_data_version(ObVersion(old_data_version));
 | |
| }
 | |
| 
 | |
| TEST_F(TestObElectionCluster, change_leader)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr next_leader;
 | |
|   ObAddr previous_leader;
 | |
|   ObTsWindows ts_windows;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
|   ELECT_ASYNC_LOG(INFO, "change_leader");
 | |
|   sleep(3);
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(leader_, addr);
 | |
| 
 | |
|   const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
 | |
|   EXPECT_TRUE(next_leader.set_ip_addr(LOCAL_IP, RPC_PORT + 1));
 | |
|   if (OB_SUCCESS != (ret = cluster_.election_mgr_[idx].change_leader_async(test_partition_, next_leader, ts_windows))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "change leader error", K(ret), K_(test_partition), K(next_leader));
 | |
|   } else {
 | |
|     sleep(3);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(addr, next_leader);
 | |
| }
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_vote_vote_leader_candidate_false)
 | |
| {
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_vote_vote_leader_candidate_false");
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(leader_, addr);
 | |
|   const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
 | |
|   // printf("idx=%d\n", idx);
 | |
| 
 | |
|   // set leader priority candidate is false
 | |
|   // cluster_.election_cb_[idx].priority_.set_candidate(false);
 | |
| 
 | |
|   sleep(5);
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[idx].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(leader_, addr);
 | |
| 
 | |
|   cluster_.election_cb_[idx].priority_.set_candidate(true);
 | |
| }
 | |
| 
 | |
| //  A(A, B, C)
 | |
| //  B(A, B, C)
 | |
| //  C(A, B, C)
 | |
| //  D(A, B, C)
 | |
| TEST_F(TestObElectionCluster, test_query_not_in_member)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   ObMemberList mlist;
 | |
|   int64_t leader_epoch = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_query_not_in_member");
 | |
|   ObIElectionMgr* query = &cluster_.election_mgr_[3];
 | |
|   ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   for (int32_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
 | |
|     if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
 | |
|     } else {
 | |
|       if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|     } else if (OB_SUCCESS !=
 | |
|                (ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|     } else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   bool unused_bool;
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, B, C)
 | |
| // B(A, B, C)
 | |
| // C(A, B, C)
 | |
| // D(B, C, D)
 | |
| TEST_F(TestObElectionCluster, test_query_not_in_leader_member_and_without_leader)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObMemberList mlist;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_query_not_in_leader_member_and_without_leader");
 | |
| 
 | |
|   ObIElectionMgr* query = &cluster_.election_mgr_[3];
 | |
|   ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   for (int32_t i = 1; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
 | |
|     } else {
 | |
|       EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|     } else if (OB_SUCCESS !=
 | |
|                (ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|     } else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, B, C)
 | |
| // B(A, B, C)
 | |
| // C(A, B, C)
 | |
| // D(A, B, D)
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_query_leader_and_with_leader_and_self)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   ObMemberList mlist;
 | |
|   int64_t leader_epoch = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_query_leader_and_with_leader_and_self");
 | |
| 
 | |
|   ObIElectionMgr* query = &cluster_.election_mgr_[3];
 | |
|   ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     if (i == 2) {  // skip
 | |
|       continue;
 | |
|     }
 | |
|     if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
 | |
|     } else {
 | |
|       if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|     } else if (OB_SUCCESS !=
 | |
|                (ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|     } else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   bool unused_bool;
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, B, C)
 | |
| // B(A, B, C)
 | |
| // C(A, B, C)
 | |
| // D(C, D)
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_query_leader_and_without_leader)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   ObMemberList mlist;
 | |
|   int64_t leader_epoch = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_query_leader_and_without_leader");
 | |
| 
 | |
|   ObIElectionMgr* query = &cluster_.election_mgr_[3];
 | |
|   ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     if (i == 0 || i == 1) {  // skip
 | |
|       continue;
 | |
|     }
 | |
|     if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
 | |
|     } else {
 | |
|       if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|     } else if (OB_SUCCESS !=
 | |
|                (ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|     } else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   bool unused_bool;
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, B, C)
 | |
| // B(A, B, C)
 | |
| // C(A, B, C)
 | |
| // D(B, C)
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_query_leader_and_without_leader_and_self)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   ObMemberList mlist;
 | |
|   int64_t leader_epoch = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ObIElectionMgr* query = &cluster_.election_mgr_[3];
 | |
|   ELECT_ASYNC_LOG(INFO, "test_query_leader_and_without_leader_and_self");
 | |
| 
 | |
|   ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
 | |
|   for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     if (i == 0 || i == 3) {  // skip
 | |
|       continue;
 | |
|     }
 | |
|     if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
 | |
|     } else {
 | |
|       if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|     } else if (OB_SUCCESS !=
 | |
|                (ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|     } else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   bool unused_bool;
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, B, C)
 | |
| // B(A, B, C)
 | |
| // C(A, B, C)
 | |
| // D(A, B, C)
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_query_leader_and_without_self)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   ObMemberList mlist;
 | |
|   int64_t leader_epoch = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "test_query_leader_and_without_self");
 | |
| 
 | |
|   ObIElectionMgr* query = &cluster_.election_mgr_[3];
 | |
|   ObIElectionCallback* query_election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   for (int64_t i = 0; OB_SUCC(ret) && i < MAX_OB_TRANS_SERVICE_NUM; ++i) {
 | |
|     if (i == 2) {  // skip
 | |
|       continue;
 | |
|     }
 | |
|     if (!(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + (int32_t)i))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set addr error", "ip", cluster_.ip_, "port", cluster_.base_rpc_port_ + i);
 | |
|     } else {
 | |
|       if (OB_SUCCESS != (ret = mlist.add_server(addr))) {
 | |
|         ELECT_ASYNC_LOG(WARN, "mlist add server error", K(ret), K(mlist), K(addr));
 | |
|       } else {
 | |
|         ELECT_ASYNC_LOG(INFO, "mlist add server", K(mlist), K(addr));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (OB_SUCC(ret)) {
 | |
|     if (OB_SUCCESS != (ret = query->add_partition(test_partition_, OB_REPLICA_NUM, query_election_cb, unused))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|     } else if (OB_SUCCESS !=
 | |
|                (ret = query->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|     } else if (OB_SUCCESS != (ret = query->start_partition(test_partition_))) {
 | |
|       ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|     } else {
 | |
|       ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   bool unused_bool;
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     addr.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, query->get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader_, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, query->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, D)
 | |
| // D(B, C)
 | |
| 
 | |
| TEST_F(TestObElectionCluster, query_vote_leader_no_query)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr leader;
 | |
|   ObAddr addr;
 | |
|   ObAddr next_leader;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   const int32_t idx = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "query_vote_leader_no_query");
 | |
| 
 | |
|   for (int64_t i = 0; OB_SUCC(ret) && i < OB_REPLICA_NUM; ++i) {
 | |
|     if (i != idx) {
 | |
|       EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[i].remove_partition(test_partition_));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   bool unused_bool;
 | |
|   EXPECT_EQ(OB_ELECTION_WARN_INVALID_LEADER,
 | |
|       cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
| 
 | |
|   ObMemberList mlist;
 | |
|   ObIElectionMgr* tmp_replica = &cluster_.election_mgr_[3];
 | |
| 
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
 | |
|   EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 3));
 | |
|   EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|   cluster_.election_mgr_[idx].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
 | |
| 
 | |
|   mlist.reset();
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 1));
 | |
|   mlist.add_server(addr);
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 2));
 | |
|   mlist.add_server(addr);
 | |
| 
 | |
|   ObElectionPriority priority(false, 1, ObVersion(1), 1);
 | |
|   cluster_.election_cb_[3].init(priority);
 | |
|   ObIElectionCallback* election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   if (OB_SUCCESS != (ret = tmp_replica->add_partition(test_partition_, OB_REPLICA_NUM, election_cb, unused))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|   } else if (OB_SUCCESS !=
 | |
|              (ret = tmp_replica->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|   } else if (OB_SUCCESS != (ret = tmp_replica->start_partition(test_partition_))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|   } else {
 | |
|     ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   leader.reset();
 | |
|   EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|   leader.reset();
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
 | |
|   EXPECT_EQ(leader, addr);
 | |
| 
 | |
|   sleep(15);
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     leader.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS,
 | |
|         cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|     leader.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, tmp_replica->remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // A(A, B)
 | |
| // B(A, B)
 | |
| // D(B, C)
 | |
| 
 | |
| // A (A, D)
 | |
| // D (B, C)
 | |
| 
 | |
| TEST_F(TestObElectionCluster, query_vote_leader_need_query_to_without_query)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObAddr leader;
 | |
|   ObAddr addr;
 | |
|   ObAddr next_leader;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   const int32_t idx = 0;
 | |
|   ObIElection* unused = NULL;
 | |
|   ELECT_ASYNC_LOG(INFO, "query_vote_leader_need_query_to_without_query");
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].remove_partition(test_partition_));
 | |
| 
 | |
|   ObIElectionMgr* tmp_replica = &cluster_.election_mgr_[3];
 | |
|   ObMemberList mlist;
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
 | |
|   EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 1));
 | |
|   EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|   cluster_.election_mgr_[idx].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
 | |
|   cluster_.election_mgr_[idx + 1].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
 | |
| 
 | |
|   mlist.reset();
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 1));
 | |
|   mlist.add_server(addr);
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 2));
 | |
|   mlist.add_server(addr);
 | |
| 
 | |
|   ObElectionPriority priority(false, 1, ObVersion(1), 1);
 | |
|   cluster_.election_cb_[3].init(priority);
 | |
|   ObIElectionCallback* election_cb = &cluster_.election_cb_[3];
 | |
| 
 | |
|   if (OB_SUCCESS != (ret = tmp_replica->add_partition(test_partition_, OB_REPLICA_NUM, election_cb, unused))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "add partition error", K(ret), K_(test_partition));
 | |
|   } else if (OB_SUCCESS !=
 | |
|              (ret = tmp_replica->set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "set candidate error", K(ret), K_(test_partition), K(mlist));
 | |
|   } else if (OB_SUCCESS != (ret = tmp_replica->start_partition(test_partition_))) {
 | |
|     ELECT_ASYNC_LOG(WARN, "start candidate error", K(ret), K_(test_partition));
 | |
|   } else {
 | |
|     ELECT_ASYNC_LOG(INFO, "query start partition success", K_(test_partition), K(mlist));
 | |
|   }
 | |
| 
 | |
|   sleep(10);
 | |
|   leader.reset();
 | |
|   bool unused_bool;
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
 | |
|   EXPECT_EQ(leader, addr);
 | |
| 
 | |
|   sleep(15);
 | |
|   for (int64_t i = 0; i < 10; ++i) {
 | |
|     leader.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS,
 | |
|         cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_EQ(leader, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[1].remove_partition(test_partition_));
 | |
| 
 | |
|   sleep(10);
 | |
|   EXPECT_EQ(OB_ELECTION_WARN_INVALID_LEADER,
 | |
|       cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
| 
 | |
|   mlist.reset();
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
 | |
|   EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|   EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + 3));
 | |
|   EXPECT_EQ(OB_SUCCESS, mlist.add_server(addr));
 | |
|   cluster_.election_mgr_[idx].set_candidate(test_partition_, OB_REPLICA_NUM, mlist, ++membership_version);
 | |
| 
 | |
|   sleep(15);
 | |
|   for (int64_t i = 0; i < 1; ++i) {
 | |
|     leader.reset();
 | |
|     EXPECT_EQ(OB_SUCCESS,
 | |
|         cluster_.election_mgr_[idx].get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|     EXPECT_TRUE(addr.set_ip_addr(cluster_.ip_, cluster_.base_rpc_port_ + idx));
 | |
|     EXPECT_EQ(leader, addr);
 | |
|     sleep(1);
 | |
|   }
 | |
|   EXPECT_EQ(OB_SUCCESS, tmp_replica->get_leader(test_partition_, leader, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(OB_SUCCESS, tmp_replica->remove_partition(test_partition_));
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].remove_partition(test_partition_));
 | |
| }
 | |
| 
 | |
| // TEST_F(TestObElectionCluster, send_vote_vote_message_loss)
 | |
| //{
 | |
| // int ret = OB_SUCCESS;
 | |
| // ObAddr addr;
 | |
| // ObAddr next_leader;
 | |
| // int64_t leader_epoch = 0;
 | |
| 
 | |
| // ELECT_ASYNC_LOG(INFO, "send_vote_vote_message_loss");
 | |
| // EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[0].get_leader(test_partition_, addr, leader_epoch));
 | |
| // EXPECT_EQ(leader_, addr);
 | |
| 
 | |
| // const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
 | |
| 
 | |
| // EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].stop_partition(test_partition_));
 | |
| 
 | |
| // cluster_.rpc_loss_[OB_ELECTION_VOTE_VOTE].times_ = 1;
 | |
| // EXPECT_TRUE(next_leader.set_ip_addr(LOCAL_IP, RPC_PORT + 1));
 | |
| // int32_t next_idx = next_leader.get_port() - cluster_.base_rpc_port_;
 | |
| // int64_t old_membership_version = cluster_.election_cb_[next_idx].get_priority().get_membership_version();
 | |
| // int64_t old_data_version = cluster_.election_cb_[next_idx].get_priority().get_data_version();
 | |
| // cluster_.election_cb_[next_idx].priority_.set_membership_version(cluster_.election_cb_[idx].get_priority().get_membership_version()
 | |
| // + 1);
 | |
| // cluster_.election_cb_[next_idx].priority_.set_data_version(cluster_.election_cb_[idx].get_priority().get_data_version());
 | |
| // if (OB_SUCCESS != (ret = cluster_.election_mgr_[idx].change_leader_async(test_partition_, next_leader))) {
 | |
| // ELECT_ASYNC_LOG(WARN, "change leader error", K(ret), K_(test_partition), K(next_leader));
 | |
| //} else {
 | |
| // sleep(6);
 | |
| //}
 | |
| // EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[0].get_leader(test_partition_, addr, leader_epoch));
 | |
| // EXPECT_EQ(next_leader, addr);
 | |
| // EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[next_idx].leader_revoke(test_partition_));
 | |
| // EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].start_partition(test_partition_));
 | |
| // cluster_.election_cb_[next_idx].priority_.set_membership_version(old_membership_version);
 | |
| // cluster_.election_cb_[next_idx].priority_.set_data_version(ObVersion(old_data_version));
 | |
| //}
 | |
| 
 | |
| TEST_F(TestObElectionCluster, test_get_valid_candidate)
 | |
| {
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
| 
 | |
|   ELECT_ASYNC_LOG(INFO, "test_get_valid_candiadte");
 | |
| 
 | |
|   sleep(5);
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(leader_, addr);
 | |
| 
 | |
|   const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
 | |
|   ObMemberList valid_list;
 | |
|   ObMemberList curr_list;
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].get_curr_candidate(test_partition_, curr_list));
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].get_valid_candidate(test_partition_, valid_list));
 | |
| 
 | |
|   EXPECT_EQ(OB_REPLICA_NUM, valid_list.get_member_number());
 | |
|   EXPECT_EQ(valid_list.get_member_number(), curr_list.get_member_number());
 | |
|   for (int64_t index = 0; index < curr_list.get_member_number(); ++index) {
 | |
|     ObAddr addr;
 | |
|     EXPECT_EQ(OB_SUCCESS, curr_list.get_server_by_index(index, addr));
 | |
|     EXPECT_TRUE(valid_list.contains(addr));
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(TestObElectionCluster, force_leader)
 | |
| {
 | |
|   ObAddr addr;
 | |
|   ObAddr previous_leader;
 | |
|   int64_t leader_epoch = 0;
 | |
|   bool unused_bool;
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
|   EXPECT_EQ(leader_, addr);
 | |
|   const int32_t idx = addr.get_port() - cluster_.base_rpc_port_;
 | |
|   const uint32_t revoke_type = ObElection::RevokeType::LEASE_EXPIRED;
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[idx].leader_revoke(test_partition_, revoke_type));
 | |
|   cluster_.rpc_loss_[OB_ELECTION_DEVOTE_PREPARE].times_ = 1000;
 | |
|   cluster_.rpc_loss_[OB_ELECTION_DEVOTE_VOTE].times_ = 1000;
 | |
|   cluster_.rpc_loss_[OB_ELECTION_DEVOTE_SUCCESS].times_ = 1000;
 | |
|   sleep(10);
 | |
|   cluster_.is_force_leader_ = true;
 | |
|   EXPECT_EQ(OB_SUCCESS, cluster_.election_mgr_[2].force_leader_async(test_partition_));
 | |
|   sleep(15);
 | |
|   EXPECT_EQ(OB_SUCCESS,
 | |
|       cluster_.election_mgr_[0].get_leader(test_partition_, addr, previous_leader, leader_epoch, unused_bool));
 | |
| 
 | |
|   //  EXPECT_EQ(addr, cluster_.election_mgr_[2].addr_);
 | |
| }
 | |
| }  // namespace unittest
 | |
| }  // namespace oceanbase
 | |
| 
 | |
| int main(int argc, char** argv)
 | |
| {
 | |
|   int ret = -1;
 | |
| 
 | |
|   oceanbase::election::ObAsyncLog::getLogger().init("test_election_cluster.log", OB_LOG_LEVEL_INFO, true);
 | |
|   oceanbase::common::ObClusterVersion::get_instance().init(CLUSTER_VERSION_1500);
 | |
|   EXPECT_EQ(OB_SUCCESS, ObTenantMutilAllocatorMgr::get_instance().init());
 | |
| 
 | |
|   if (OB_FAIL(oceanbase::common::ObClockGenerator::init())) {
 | |
|     ELECT_ASYNC_LOG(WARN, "clock generator init error.", K(ret));
 | |
|   } else {
 | |
|     testing::InitGoogleTest(&argc, argv);
 | |
|     ret = RUN_ALL_TESTS();
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | 
