Files
oceanbase/mittest/logservice/test_ob_simple_log_throttling_member_change.cpp

449 lines
21 KiB
C++

// owner: zjf225077
// owner group: log
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define private public
#include "env/ob_simple_log_cluster_env.h"
#undef private
const std::string TEST_NAME = "throttling_member_change";
using namespace oceanbase::common;
using namespace oceanbase;
namespace oceanbase
{
using namespace logservice;
using namespace palf;
namespace unittest
{
class TestObSimpleLogIOWorkerThrottlingV2 : public ObSimpleLogClusterTestEnv
{
public:
TestObSimpleLogIOWorkerThrottlingV2() : ObSimpleLogClusterTestEnv()
{}
void set_palf_disk_options(PalfEnvImpl &palf_env_impl, const PalfDiskOptions &disk_options)
{
palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options;
}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 5;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
bool ObSimpleLogClusterTestBase::need_shared_storage_ = false;
MockLocCB loc_cb;
TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_majority)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_throttling_majority");
OB_LOGGER.set_log_level("INFO");
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
PALF_LOG(INFO, "begin test throttling_majority", K(id));
int64_t leader_idx = 0;
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
PalfHandleImplGuard leader;
PalfEnv *palf_env = NULL;
ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
set_disk_options_for_throttling(palf_env->palf_env_impl_);
loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
ASSERT_EQ(5, palf_list.size());
const int64_t follower_B_idx = (leader_idx + 1);
const int64_t follower_C_idx = (leader_idx + 2);
const int64_t follower_D_idx = (leader_idx + 3);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
const int64_t KB = 1024L;
const int64_t MB = 1024 * 1024L;
PALF_LOG(INFO, "prepare for throttling");
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN follower_max_lsn = palf_list[1]->palf_handle_impl_->sw_.get_max_lsn();
PALF_LOG(INFO, "prepare max_lsn", K(max_lsn), K(follower_max_lsn));
wait_lsn_until_flushed(max_lsn, leader);
wait_lsn_until_flushed(max_lsn, *palf_list[1]);
wait_lsn_until_flushed(max_lsn, *palf_list[2]);
PALF_LOG(INFO, "[CASE 1]test throttling interval");
int64_t throttling_percentage = 60;
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
int64_t cur_ts = common::ObClockGenerator::getClock();
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
wait_lsn_until_flushed(max_lsn, *palf_list[1]);
wait_lsn_until_flushed(max_lsn, *palf_list[2]);
int64_t break_ts = common::ObClockGenerator::getClock();
int64_t used_time = break_ts- cur_ts;
PALF_LOG(INFO, "[CASE 1] ", K(used_time));
ASSERT_EQ(true, (used_time) > 14 * 1000 * 1000);
PALF_LOG(INFO, "[1.1] before change_replica_num 3->5");
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 3, 5, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
PALF_LOG(INFO, "[1.2] before change_replica_num 5->3");
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 5, 3, CONFIG_CHANGE_TIMEOUT));
PALF_LOG(INFO, "CASE[1.3] test remove_member while throttling");
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
usleep(500 * 1000);
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
usleep(500 * 1000);
PALF_LOG(INFO, "CASE[1.4] test add_member while throttling");
LogConfigVersion config_version;
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
PALF_LOG(INFO, "CASE[1.5] test switch_leader while throttling[major]");
int64_t new_leader_idx = 1;
PalfHandleImplGuard new_leader;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
int64_t switch_start_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
int64_t switch_end_ts = common::ObClockGenerator::getClock();
used_time = switch_end_ts - switch_start_ts;
PALF_LOG(INFO, "[CASE 1.5 end ] end switch_leader", K(used_time));
// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, new_leader);
loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr();
PALF_LOG(INFO, "end test throttling_major", K(id));
leader.reset();
new_leader.reset();
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
revert_cluster_palf_handle_guard(palf_list);
delete_paxos_group(id);
PALF_LOG(INFO, "destroy", K(id));
}
TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_minor_leader)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_throttling_minor_leader");
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
PALF_LOG(INFO, "begin test throttling_minor_leader", K(id));
int64_t leader_idx = 0;
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
PalfHandleImplGuard leader;
PalfEnv *palf_env = NULL;
ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_;
set_disk_options_for_throttling(palf_env->palf_env_impl_);
loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t follower_B_idx = (leader_idx + 1);
const int64_t follower_C_idx = (leader_idx + 2);
const int64_t follower_D_idx = (leader_idx + 3);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
const int64_t KB = 1024L;
const int64_t MB = 1024 * 1024L;
PALF_LOG(INFO, "[CASE 2]prepare for throttling");
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN follower_max_lsn = palf_list[1]->palf_handle_impl_->sw_.get_max_lsn();
PALF_LOG(INFO, "prepare max_lsn", K(max_lsn), K(follower_max_lsn));
wait_lsn_until_flushed(max_lsn, leader);
wait_lsn_until_flushed(max_lsn, *palf_list[1]);
wait_lsn_until_flushed(max_lsn, *palf_list[2]);
PALF_LOG(INFO, "[CASE 2]test throttling interval");
int64_t throttling_percentage = 60;
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
int64_t cur_ts = common::ObClockGenerator::getClock();
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
int64_t break_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(true, (break_ts - cur_ts) > 13 * 1000 * 1000);
PALF_LOG(INFO, "YYY ", K(break_ts- cur_ts));
PALF_LOG(INFO, "[CASE 2.1] before change_replica_num 3->5");
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 3, 5, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
PALF_LOG(INFO, "[CASE 2.2] before change_replica_num 5->3");
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 5, 3, CONFIG_CHANGE_TIMEOUT));
wait_lsn_until_flushed(max_lsn, leader);
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
PALF_LOG(INFO, "[CASE 2.3] test remove_member");
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
usleep(500 * 1000);
//OB_TIMEOUT: committed_end_lsn of new majority is smaller than commited_end_lsn of leader
EXPECT_UNTIL_EQ(OB_TIMEOUT, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
/*
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
usleep(500 * 1000);
PALF_LOG(INFO, "[CASE 2.4] test add_member");
ASSERT_EQ(OB_TIMEOUT, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
*/
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
PALF_LOG(INFO, "[CASE 2.5] test switch_leader");
int64_t new_leader_idx = 1;
PalfHandleImplGuard new_leader;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB));
int64_t switch_start_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
int64_t switch_end_ts = common::ObClockGenerator::getClock();
const int64_t used_time = switch_end_ts - switch_start_ts;
PALF_LOG(INFO, "[CASE 2.5 end ] end switch_leader", K(used_time));
// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, new_leader);
loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr();
usleep(5*1000*1000);//wait follower_c log sync
LSN old_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN leader_old_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 2, id, 512 * KB));
LSN cur_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN leader_cur_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
usleep(500 * 1000);
LSN new_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN leader_new_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
PALF_LOG(INFO, "before remove member",K(leader_old_max_lsn), K(leader_cur_max_lsn), K(leader_new_max_lsn), K(old_max_lsn), K(cur_max_lsn), K(new_max_lsn));
EXPECT_UNTIL_EQ(OB_TIMEOUT, new_leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
usleep(500 * 1000);
LogConfigVersion config_version;
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
EXPECT_UNTIL_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
PALF_LOG(INFO, "end test throttling_minor_leader", K(id));
leader.reset();
new_leader.reset();
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
set_palf_disk_options(palf_env->palf_env_impl_, disk_options);
revert_cluster_palf_handle_guard(palf_list);
delete_paxos_group(id);
PALF_LOG(INFO, "destroy", K(id));
}
TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_minor_follower)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_throttling_minor_follower");
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
PALF_LOG(INFO, "begin test throttling_minor_follower", K(id));
int64_t leader_idx = 0;
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
PalfHandleImplGuard leader;
PalfEnv *palf_env = NULL;
ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader));
ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
loc_cb.leader_ = get_cluster()[leader_idx]->get_addr();
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t follower_B_idx = (leader_idx + 1);
const int64_t follower_C_idx = (leader_idx + 2);
const int64_t follower_D_idx = (leader_idx + 3);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env));
set_disk_options_for_throttling(palf_env->palf_env_impl_);
const int64_t KB = 1024L;
const int64_t MB = 1024 * 1024L;
PALF_LOG(INFO, "[CASE 3]prepare for throttling");
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB));
LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN follower_max_lsn = palf_list[1]->palf_handle_impl_->sw_.get_max_lsn();
PALF_LOG(INFO, "prepare max_lsn", K(max_lsn), K(follower_max_lsn));
wait_lsn_until_flushed(max_lsn, leader);
PALF_LOG(INFO, "[CASE 3.1] before change_replica_num 3->5");
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 3, 5, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
PALF_LOG(INFO, "[CASE 3.2] before change_replica_num 5->3");
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 5, 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
usleep(500 * 1000);
PALF_LOG(INFO, "[CASE 3.4] test add_member(3-4)");
LogConfigVersion config_version;
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
PALF_LOG(INFO, "[CASE 3.3] test remove_member");
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
usleep(500 * 1000);
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_B_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
usleep(500 * 1000);
PALF_LOG(INFO, "[CASE 3.4] test add_member");
ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_B_idx]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB));
PALF_LOG(INFO, "[CASE 3.5] test switch_leader to C (not throttled replica)");
int64_t new_leader_idx = 2;
PalfHandleImplGuard new_leader;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB));
int64_t switch_start_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader));
int64_t switch_end_ts = common::ObClockGenerator::getClock();
int64_t used_time = switch_end_ts - switch_start_ts;
PALF_LOG(INFO, "[CASE 3.5 end ] end switch_leader", K(used_time));
// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, new_leader);
loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr();
PALF_LOG(INFO, "[CASE 3.6] test switch_leader to C(throttled replica)");
int64_t new_leader_idx_v2 = 1;
PalfHandleImplGuard new_leader_v2;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 512 * KB));
switch_start_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx_v2, new_leader_v2));
switch_end_ts = common::ObClockGenerator::getClock();
used_time = switch_end_ts - switch_start_ts;
PALF_LOG(INFO, "[CASE 3.5 end ] end switch_leader", K(used_time));
// ASSERT_EQ(true, used_time < 2 * 1000 * 1000);
new_leader_v2.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader_v2, 1, id, 1 * KB));
max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, new_leader);
loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr();
PALF_LOG(INFO, "end test throttling_minor_follower", K(id));
leader.reset();
new_leader.reset();
new_leader_v2.reset();
revert_cluster_palf_handle_guard(palf_list);
delete_paxos_group(id);
PALF_LOG(INFO, "destroy", K(id));
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
}