// owner: zjf225077 // owner group: log /** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define private public #include "env/ob_simple_log_cluster_env.h" #undef private const std::string TEST_NAME = "throttling_member_change"; using namespace oceanbase::common; using namespace oceanbase; namespace oceanbase { using namespace logservice; using namespace palf; namespace unittest { class TestObSimpleLogIOWorkerThrottlingV2 : public ObSimpleLogClusterTestEnv { public: TestObSimpleLogIOWorkerThrottlingV2() : ObSimpleLogClusterTestEnv() {} void set_palf_disk_options(PalfEnvImpl &palf_env_impl, const PalfDiskOptions &disk_options) { palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options; palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_ = disk_options; } }; int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3; int64_t ObSimpleLogClusterTestBase::node_cnt_ = 5; std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; bool ObSimpleLogClusterTestBase::need_shared_storage_ = false; MockLocCB loc_cb; TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_majority) { SET_CASE_LOG_FILE(TEST_NAME, "test_throttling_majority"); OB_LOGGER.set_log_level("INFO"); int ret = OB_SUCCESS; const int64_t id = ATOMIC_AAF(&palf_id_, 1); PALF_LOG(INFO, "begin test throttling_majority", K(id)); int64_t leader_idx = 0; const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s PalfHandleImplGuard leader; PalfEnv *palf_env = NULL; ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader)); ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_; set_disk_options_for_throttling(palf_env->palf_env_impl_); loc_cb.leader_ = get_cluster()[leader_idx]->get_addr(); std::vector palf_list; EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); ASSERT_EQ(5, palf_list.size()); const int64_t follower_B_idx = (leader_idx + 1); const int64_t follower_C_idx = (leader_idx + 2); const int64_t follower_D_idx = (leader_idx + 3); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); const int64_t KB = 1024L; const int64_t MB = 1024 * 1024L; PALF_LOG(INFO, "prepare for throttling"); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB)); LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); LSN follower_max_lsn = palf_list[1]->palf_handle_impl_->sw_.get_max_lsn(); PALF_LOG(INFO, "prepare max_lsn", K(max_lsn), K(follower_max_lsn)); wait_lsn_until_flushed(max_lsn, leader); wait_lsn_until_flushed(max_lsn, *palf_list[1]); wait_lsn_until_flushed(max_lsn, *palf_list[2]); PALF_LOG(INFO, "[CASE 1]test throttling interval"); int64_t throttling_percentage = 60; ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage; ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env)); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage; ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env)); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage; ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); int64_t cur_ts = common::ObClockGenerator::getClock(); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); wait_lsn_until_flushed(max_lsn, *palf_list[1]); wait_lsn_until_flushed(max_lsn, *palf_list[2]); int64_t break_ts = common::ObClockGenerator::getClock(); int64_t used_time = break_ts- cur_ts; PALF_LOG(INFO, "[CASE 1] ", K(used_time)); ASSERT_EQ(true, (used_time) > 14 * 1000 * 1000); PALF_LOG(INFO, "[1.1] before change_replica_num 3->5"); ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 3, 5, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); PALF_LOG(INFO, "[1.2] before change_replica_num 5->3"); ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 5, 3, CONFIG_CHANGE_TIMEOUT)); PALF_LOG(INFO, "CASE[1.3] test remove_member while throttling"); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB)); usleep(500 * 1000); ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB)); usleep(500 * 1000); PALF_LOG(INFO, "CASE[1.4] test add_member while throttling"); LogConfigVersion config_version; ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version)); ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); PALF_LOG(INFO, "CASE[1.5] test switch_leader while throttling[major]"); int64_t new_leader_idx = 1; PalfHandleImplGuard new_leader; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB)); int64_t switch_start_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader)); int64_t switch_end_ts = common::ObClockGenerator::getClock(); used_time = switch_end_ts - switch_start_ts; PALF_LOG(INFO, "[CASE 1.5 end ] end switch_leader", K(used_time)); // ASSERT_EQ(true, used_time < 2 * 1000 * 1000); new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB)); max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, new_leader); loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr(); PALF_LOG(INFO, "end test throttling_major", K(id)); leader.reset(); new_leader.reset(); ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); revert_cluster_palf_handle_guard(palf_list); delete_paxos_group(id); PALF_LOG(INFO, "destroy", K(id)); } TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_minor_leader) { SET_CASE_LOG_FILE(TEST_NAME, "test_throttling_minor_leader"); int ret = OB_SUCCESS; const int64_t id = ATOMIC_AAF(&palf_id_, 1); PALF_LOG(INFO, "begin test throttling_minor_leader", K(id)); int64_t leader_idx = 0; const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s PalfHandleImplGuard leader; PalfEnv *palf_env = NULL; ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader)); ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); const PalfDiskOptions disk_options = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_; set_disk_options_for_throttling(palf_env->palf_env_impl_); loc_cb.leader_ = get_cluster()[leader_idx]->get_addr(); std::vector palf_list; EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); const int64_t follower_B_idx = (leader_idx + 1); const int64_t follower_C_idx = (leader_idx + 2); const int64_t follower_D_idx = (leader_idx + 3); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); const int64_t KB = 1024L; const int64_t MB = 1024 * 1024L; PALF_LOG(INFO, "[CASE 2]prepare for throttling"); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB)); LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); LSN follower_max_lsn = palf_list[1]->palf_handle_impl_->sw_.get_max_lsn(); PALF_LOG(INFO, "prepare max_lsn", K(max_lsn), K(follower_max_lsn)); wait_lsn_until_flushed(max_lsn, leader); wait_lsn_until_flushed(max_lsn, *palf_list[1]); wait_lsn_until_flushed(max_lsn, *palf_list[2]); PALF_LOG(INFO, "[CASE 2]test throttling interval"); int64_t throttling_percentage = 60; ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = throttling_percentage; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); int64_t cur_ts = common::ObClockGenerator::getClock(); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); int64_t break_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(true, (break_ts - cur_ts) > 13 * 1000 * 1000); PALF_LOG(INFO, "YYY ", K(break_ts- cur_ts)); PALF_LOG(INFO, "[CASE 2.1] before change_replica_num 3->5"); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 3, 5, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); PALF_LOG(INFO, "[CASE 2.2] before change_replica_num 5->3"); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 5, 3, CONFIG_CHANGE_TIMEOUT)); wait_lsn_until_flushed(max_lsn, leader); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); PALF_LOG(INFO, "[CASE 2.3] test remove_member"); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB)); usleep(500 * 1000); //OB_TIMEOUT: committed_end_lsn of new majority is smaller than commited_end_lsn of leader EXPECT_UNTIL_EQ(OB_TIMEOUT, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); /* ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB)); usleep(500 * 1000); PALF_LOG(INFO, "[CASE 2.4] test add_member"); ASSERT_EQ(OB_TIMEOUT, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); */ max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); PALF_LOG(INFO, "[CASE 2.5] test switch_leader"); int64_t new_leader_idx = 1; PalfHandleImplGuard new_leader; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 512 * KB)); int64_t switch_start_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader)); int64_t switch_end_ts = common::ObClockGenerator::getClock(); const int64_t used_time = switch_end_ts - switch_start_ts; PALF_LOG(INFO, "[CASE 2.5 end ] end switch_leader", K(used_time)); // ASSERT_EQ(true, used_time < 2 * 1000 * 1000); new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB)); max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, new_leader); loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr(); usleep(5*1000*1000);//wait follower_c log sync LSN old_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); LSN leader_old_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 2, id, 512 * KB)); LSN cur_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); LSN leader_cur_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); usleep(500 * 1000); LSN new_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); LSN leader_new_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); PALF_LOG(INFO, "before remove member",K(leader_old_max_lsn), K(leader_cur_max_lsn), K(leader_new_max_lsn), K(old_max_lsn), K(cur_max_lsn), K(new_max_lsn)); EXPECT_UNTIL_EQ(OB_TIMEOUT, new_leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB)); usleep(500 * 1000); LogConfigVersion config_version; ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version)); EXPECT_UNTIL_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB)); PALF_LOG(INFO, "end test throttling_minor_leader", K(id)); leader.reset(); new_leader.reset(); ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); set_palf_disk_options(palf_env->palf_env_impl_, disk_options); revert_cluster_palf_handle_guard(palf_list); delete_paxos_group(id); PALF_LOG(INFO, "destroy", K(id)); } TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_minor_follower) { SET_CASE_LOG_FILE(TEST_NAME, "test_throttling_minor_follower"); int ret = OB_SUCCESS; const int64_t id = ATOMIC_AAF(&palf_id_, 1); PALF_LOG(INFO, "begin test throttling_minor_follower", K(id)); int64_t leader_idx = 0; const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s PalfHandleImplGuard leader; PalfEnv *palf_env = NULL; ASSERT_EQ(OB_SUCCESS, create_paxos_group(id, &loc_cb, leader_idx, leader)); ASSERT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); loc_cb.leader_ = get_cluster()[leader_idx]->get_addr(); std::vector palf_list; EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); const int64_t follower_B_idx = (leader_idx + 1); const int64_t follower_C_idx = (leader_idx + 2); const int64_t follower_D_idx = (leader_idx + 3); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_B_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_C_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); ASSERT_EQ(OB_SUCCESS, get_palf_env(follower_D_idx, palf_env)); set_disk_options_for_throttling(palf_env->palf_env_impl_); const int64_t KB = 1024L; const int64_t MB = 1024 * 1024L; PALF_LOG(INFO, "[CASE 3]prepare for throttling"); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 140, id, 2 * MB)); LSN max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); LSN follower_max_lsn = palf_list[1]->palf_handle_impl_->sw_.get_max_lsn(); PALF_LOG(INFO, "prepare max_lsn", K(max_lsn), K(follower_max_lsn)); wait_lsn_until_flushed(max_lsn, leader); PALF_LOG(INFO, "[CASE 3.1] before change_replica_num 3->5"); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 3, 5, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); PALF_LOG(INFO, "[CASE 3.2] before change_replica_num 5->3"); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->change_replica_num(get_member_list(), 5, 3, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); usleep(500 * 1000); PALF_LOG(INFO, "[CASE 3.4] test add_member(3-4)"); LogConfigVersion config_version; ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version)); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT)); PALF_LOG(INFO, "[CASE 3.3] test remove_member"); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); usleep(500 * 1000); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_B_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); usleep(500 * 1000); PALF_LOG(INFO, "[CASE 3.4] test add_member"); ASSERT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version)); EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[follower_B_idx]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * KB)); PALF_LOG(INFO, "[CASE 3.5] test switch_leader to C (not throttled replica)"); int64_t new_leader_idx = 2; PalfHandleImplGuard new_leader; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 512 * KB)); int64_t switch_start_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx, new_leader)); int64_t switch_end_ts = common::ObClockGenerator::getClock(); int64_t used_time = switch_end_ts - switch_start_ts; PALF_LOG(INFO, "[CASE 3.5 end ] end switch_leader", K(used_time)); // ASSERT_EQ(true, used_time < 2 * 1000 * 1000); new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB)); max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, new_leader); loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr(); PALF_LOG(INFO, "[CASE 3.6] test switch_leader to C(throttled replica)"); int64_t new_leader_idx_v2 = 1; PalfHandleImplGuard new_leader_v2; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 512 * KB)); switch_start_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(OB_SUCCESS, switch_leader(id, new_leader_idx_v2, new_leader_v2)); switch_end_ts = common::ObClockGenerator::getClock(); used_time = switch_end_ts - switch_start_ts; PALF_LOG(INFO, "[CASE 3.5 end ] end switch_leader", K(used_time)); // ASSERT_EQ(true, used_time < 2 * 1000 * 1000); new_leader_v2.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader_v2, 1, id, 1 * KB)); max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, new_leader); loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr(); PALF_LOG(INFO, "end test throttling_minor_follower", K(id)); leader.reset(); new_leader.reset(); new_leader_v2.reset(); revert_cluster_palf_handle_guard(palf_list); delete_paxos_group(id); PALF_LOG(INFO, "destroy", K(id)); } } // end unittest } // end oceanbase int main(int argc, char **argv) { RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME); }