[PALF] do not change config if majority of paxos members have crashed

This commit is contained in:
BinChenn
2023-06-01 06:53:49 +00:00
committed by ob-robot
parent a77662f444
commit 0bb9124896
12 changed files with 261 additions and 102 deletions

View File

@ -67,6 +67,11 @@ bool MockNetKeepAliveAdapter::is_server_stopped(const common::ObAddr &server)
return log_deliver_->need_filter_packet_by_blacklist(server);
}
bool MockNetKeepAliveAdapter::in_black(const common::ObAddr &server)
{
return log_deliver_->need_filter_packet_by_blacklist(server);
}
uint32_t get_local_addr(const char *dev_name)
{
int fd, intrface;

View File

@ -77,6 +77,7 @@ public:
int init(unittest::ObLogDeliver *log_deliver);
bool in_black_or_stopped(const common::ObAddr &server) override final;
bool is_server_stopped(const common::ObAddr &server) override final;
bool in_black(const common::ObAddr &server) override final;
private:
unittest::ObLogDeliver *log_deliver_;
};

View File

@ -663,6 +663,18 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_when_no_leader)
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
sleep(2);
LogConfigChangeArgs args(ObMember(palf_list[another_f_idx]->palf_handle_impl_->self_, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER);
const int64_t proposal_id = leader.palf_handle_impl_->state_mgr_.get_proposal_id();
const int64_t election_epoch = leader.palf_handle_impl_->state_mgr_.get_leader_epoch();
LogConfigVersion config_version;
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config(args, proposal_id, election_epoch, config_version));
// leader appended config meta, but did not apply config meta
EXPECT_NE(palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_,
palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_);
EXPECT_EQ(palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_,
palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_);
// block all networks of arb member, and the network from the follower to the leader
block_net(arb_replica_idx, another_f_idx, true);
block_net(arb_replica_idx, leader_idx, true);
@ -673,12 +685,6 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_when_no_leader)
sleep(1);
}
// leader appended config meta, but did not apply config meta
EXPECT_NE(palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_,
palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_);
EXPECT_EQ(palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_,
palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_);
// unblock_net
unblock_net(another_f_idx, leader_idx);
unblock_net(arb_replica_idx, leader_idx);
@ -771,7 +777,6 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_upgrade_when_no_leader)
PALF_LOG(INFO, "end test_2f1a_upgrade_when_no_leader", K(id));
}
} // end unittest
} // end oceanbase

View File

@ -80,7 +80,7 @@ std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t TIMEOUT_US = 10 * 1000 * 1000L;
SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_during_degrading");
PALF_LOG(INFO, "begin test switch_leader_during_degrading", K(id));
@ -131,6 +131,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading)
// A can not degrades B successfully
EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
a_handle->palf_handle_impl_->config_mgr_.end_degrade();
for (auto srv: get_cluster()) {
srv->set_leader(id, a_addr, 3);
@ -238,6 +239,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de
// A can not degrades B successfully
EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
a_handle->palf_handle_impl_->config_mgr_.end_degrade();
for (auto srv: get_cluster()) {
srv->set_leader(id, b_addr, 3);
@ -293,6 +295,146 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de
PALF_LOG(INFO, "end test switch_leader_to_other_during_degrading", K(id));
}
// 1. 2F1A, the leader starts to degrade another F
// 2. after the config log has been accepted by the arb member, the leader revoked
// 3. the previous leader has been elected as the new leader
// 4. reconfirm may fail because leader's config_version is not same to that of the follower
TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_no_leader2)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t TIMEOUT_US = 10 * 1000 * 1000L;
SET_CASE_LOG_FILE(TEST_NAME, "test_2f1a_degrade_when_no_leader2");
PALF_LOG(INFO, "begin test test_2f1a_degrade_when_no_leader2", K(id));
{
int64_t leader_idx = 0;
int64_t arb_replica_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr();
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
IPalfHandleImpl *c_ihandle = NULL;
get_cluster()[c_idx]->get_palf_env()->get_palf_handle_impl(id, c_ihandle);
palflite::PalfHandleLite *c_handle = dynamic_cast<palflite::PalfHandleLite*>(c_ihandle);
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->stop();
dynamic_cast<ObSimpleLogServer*>(get_cluster()[b_idx])->log_service_.get_arbitration_service()->stop();
// 1. A tries to degrade B
block_net(leader_idx, b_idx);
int64_t degrade_b_pid = 0;
int64_t degrade_b_ele_epoch = 0;
LogConfigVersion degrade_b_version;
LogConfigChangeArgs degrade_b_args(ObMember(b_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(degrade_b_pid, degrade_b_ele_epoch, degrade_b_args.type_));
while (0 == leader.palf_handle_impl_->config_mgr_.state_) {
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
}
const LSN &leader_last_committed_end_lsn = leader.palf_handle_impl_->sw_.committed_end_lsn_;
sleep(2);
EXPECT_EQ(leader_last_committed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
// A sends config meta to C
while (-1 == leader.palf_handle_impl_->config_mgr_.last_submit_config_log_time_us_ ||
c_handle->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr)) {
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
usleep(500);
}
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// 2. the leader A revokes and takeover again
for (auto srv: get_cluster()) {
const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0);
srv->set_leader(id, addr1);
}
EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
// A can not degrades B successfully
EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
a_handle->palf_handle_impl_->config_mgr_.end_degrade();
for (auto srv: get_cluster()) {
srv->set_leader(id, a_addr, 3);
}
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
EXPECT_UNTIL_EQ(true, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
EXPECT_UNTIL_EQ(true, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(b_addr));
unblock_net(leader_idx, b_idx);
get_cluster()[c_idx]->get_palf_env()->revert_palf_handle_impl(c_ihandle);
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test test_2f1a_degrade_when_no_leader2", K(id));
}
TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_arb_crash)
{
OB_LOGGER.set_log_level("INFO");
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t TIMEOUT_US = 10 * 1000 * 1000L;
SET_CASE_LOG_FILE(TEST_NAME, "test_2f1a_degrade_when_arb_crash");
PALF_LOG(INFO, "begin test test_2f1a_degrade_when_arb_crash", K(id));
{
int64_t leader_idx = 0;
int64_t arb_replica_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
const int64_t b_idx = (leader_idx + 1) % 3;
const int64_t c_idx = (leader_idx + 2) % 3;
const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr();
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
// block the network from the leader to the arb member
block_net(leader_idx, arb_replica_idx);
// block the network from the leader to the follower
block_net(leader_idx, b_idx);
sleep(4);
// the leader can not degrade successfully
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
// start to degrade B manually, do not allow
LogConfigChangeArgs args(ObMember(b_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER);
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->one_stage_config_change_(args, 10 * 1000 * 1000));
EXPECT_EQ(false, leader.palf_handle_impl_->config_mgr_.is_sw_interrupted_by_degrade_);
EXPECT_EQ(false, leader.palf_handle_impl_->state_mgr_.is_changing_config_with_arb_);
// start to degrade B manually, do not allow
int64_t degrade_b_pid = 0;
int64_t degrade_b_ele_epoch = 0;
LogConfigVersion degrade_b_version;
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(degrade_b_pid, degrade_b_ele_epoch, args.type_));
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config(args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version));
EXPECT_EQ(true, leader.palf_handle_impl_->config_mgr_.is_sw_interrupted_by_degrade_);
EXPECT_EQ(true, leader.palf_handle_impl_->state_mgr_.is_changing_config_with_arb_);
unblock_net(leader_idx, arb_replica_idx);
unblock_net(leader_idx, b_idx);
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test test_2f1a_degrade_when_arb_crash", K(id));
}
} // end unittest
} // end oceanbase

View File

@ -72,5 +72,17 @@ bool ObNetKeepAliveAdapter::is_server_stopped(const common::ObAddr &server)
}
return bool_ret;
}
bool ObNetKeepAliveAdapter::in_black(const common::ObAddr &server)
{
bool bool_ret = false;
bool in_blacklist = false;
bool unused_is_server_stopped = false;
if (OB_SUCCESS != in_black_or_stopped_(server, in_blacklist, unused_is_server_stopped)) {
} else {
bool_ret = in_blacklist;
}
return bool_ret;
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -29,6 +29,7 @@ public:
virtual ~IObNetKeepAliveAdapter() {}
virtual bool in_black_or_stopped(const common::ObAddr &server) = 0;
virtual bool is_server_stopped(const common::ObAddr &server) = 0;
virtual bool in_black(const common::ObAddr &server) = 0;
};
class ObNetKeepAliveAdapter : public IObNetKeepAliveAdapter {
@ -37,6 +38,7 @@ public:
~ObNetKeepAliveAdapter() override;
bool in_black_or_stopped(const common::ObAddr &server) override final;
bool is_server_stopped(const common::ObAddr &server) override final;
bool in_black(const common::ObAddr &server) override final;
private:
int in_black_or_stopped_(const common::ObAddr &server,
bool &in_black,

View File

@ -1374,9 +1374,8 @@ int LogConfigMgr::append_config_meta_(const int64_t curr_proposal_id,
} else if (has_arb_member && OB_FAIL(state_mgr_->set_changing_config_with_arb())) {
PALF_LOG(ERROR, "set_changing_config_with_arb failed", KR(ret), K_(palf_id), K_(self), K(args));
} else if (has_arb_member &&
OB_FAIL(wait_log_barrier_(args, new_config_info.log_sync_memberlist_,
new_config_info.log_sync_replica_num_))) {
PALF_LOG(WARN, "check_follower_sync_status_ eagain", KR(ret), K_(palf_id), K_(self), K(args), K(new_config_info));
OB_FAIL(wait_log_barrier_(args, new_config_info))) {
PALF_LOG(WARN, "wait_log_barrier_ eagain", KR(ret), K_(palf_id), K_(self), K(args), K(new_config_info));
if (OB_LOG_NOT_SYNC == ret) {
state_mgr_->reset_changing_config_with_arb();
}
@ -1933,18 +1932,16 @@ void LogConfigChangeArgs::reset()
}
int LogConfigMgr::check_follower_sync_status(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const LogConfigInfo &new_config_info,
bool &added_member_has_new_version) const
{
int ret = OB_SUCCESS;
SpinLockGuard guard(lock_);
return check_follower_sync_status_(args, new_member_list, new_replica_num, added_member_has_new_version);
return check_follower_sync_status_(args, new_config_info, added_member_has_new_version);
}
int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num) const
const LogConfigInfo &new_config_info) const
{
int ret = OB_SUCCESS;
LSN first_committed_end_lsn;
@ -1957,12 +1954,11 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
constexpr bool need_remote_check = false;
const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
LSN prev_log_end_lsn;
if (new_member_list.get_member_number() == 0) {
if (new_config_info.log_sync_memberlist_.get_member_number() == 0) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num,
need_purge_throttling, need_remote_check, conn_timeout_us, first_committed_end_lsn, unused_bool, unused_lsn, unused_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self),
K(new_member_list), K(new_replica_num));
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_config_info, need_purge_throttling,
need_remote_check, conn_timeout_us, first_committed_end_lsn, unused_bool, unused_lsn, unused_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_config_info));
} else if (need_skip_log_barrier) {
ret = OB_SUCCESS;
PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
@ -1998,14 +1994,13 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args,
start_wait_barrier_time_us_ = curr_ts_us;
}
}
PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self), K(first_committed_end_lsn),
K(prev_log_end_lsn), K(new_member_list), K(new_replica_num));
PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self),
K(first_committed_end_lsn), K(prev_log_end_lsn), K(new_config_info));
return ret;
}
int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const LogConfigInfo &new_config_info,
bool &added_member_has_new_version) const
{
int ret = OB_SUCCESS;
@ -2022,15 +2017,13 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
(void) sw_->get_committed_end_lsn(first_leader_committed_end_lsn);
const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
if (new_member_list.get_member_number() == 0) {
if (new_config_info.log_sync_memberlist_.get_member_number() == 0) {
ret = OB_INVALID_ARGUMENT;
} else if (new_member_list.get_member_number() == 1) {
ret = OB_SUCCESS;
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, need_purge_throttling,
need_remote_check, conn_timeout_us, first_committed_end_lsn, added_member_has_new_version, added_member_flushed_end_lsn,
added_member_last_slide_log_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_member_list),
K(new_replica_num), K(added_member_has_new_version));
} else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_config_info, need_purge_throttling,
need_remote_check, conn_timeout_us, first_committed_end_lsn, added_member_has_new_version,
added_member_flushed_end_lsn, added_member_last_slide_log_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_config_info),
K(added_member_has_new_version));
} else if (need_skip_log_barrier) {
ret = OB_SUCCESS;
PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
@ -2038,11 +2031,11 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
} else if (first_committed_end_lsn >= first_leader_committed_end_lsn) {
// if committed lsn of new majority do not retreat, then start config change
PALF_LOG(INFO, "majority of new_member_list are sync with leader, start config change", K(ret), K_(palf_id), K_(self),
K(first_committed_end_lsn), K(first_leader_committed_end_lsn), K(new_member_list), K(new_replica_num), K(conn_timeout_us));
K(first_committed_end_lsn), K(first_leader_committed_end_lsn), K(new_config_info), K(conn_timeout_us));
// when quorum has been changed (e.g., 1 -> 2), committed_end_lsn of new memberlist may always be behind the committed_end_lsn of
// leader, so we relax the condition for adding members which has changed quorum
} else if (is_add_log_sync_member_list(args.type_) &&
(new_replica_num / 2) > (config_meta_.curr_.log_sync_replica_num_ / 2) &&
(new_config_info.log_sync_replica_num_ / 2) > (config_meta_.curr_.log_sync_replica_num_ / 2) &&
config_meta_.curr_.arbitration_member_.is_valid()) {
if (added_member_flushed_end_lsn.is_valid() &&
first_leader_committed_end_lsn - added_member_flushed_end_lsn < LEADER_DEFAULT_GROUP_BUFFER_SIZE &&
@ -2050,17 +2043,17 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
leader_last_slide_log_id - added_member_last_slide_log_id < PALF_SLIDING_WINDOW_SIZE)) {
ret = OB_SUCCESS;
PALF_LOG(INFO, "the gap between the leader and added member is smaller than the group_buffer_size",
K(ret), K_(palf_id), K_(self), K(args), K(new_replica_num), K(first_leader_committed_end_lsn),
K(ret), K_(palf_id), K_(self), K(args), K(new_config_info), K(first_leader_committed_end_lsn),
K(added_member_flushed_end_lsn), K(leader_last_slide_log_id), K(added_member_last_slide_log_id));
} else {
ret = OB_EAGAIN;
PALF_LOG(INFO, "the gap between the leader and added member is larger than the group_buffer_size, skip",
K(ret), K_(palf_id), K_(self), K(args), K(new_replica_num), K(first_leader_committed_end_lsn),
K(ret), K_(palf_id), K_(self), K(args), K(new_config_info), K(first_leader_committed_end_lsn),
K(added_member_flushed_end_lsn), K(leader_last_slide_log_id), K(added_member_last_slide_log_id));
}
} else {
PALF_LOG(INFO, "majority of new_member_list aren't sync with leader", K_(palf_id), K_(self), K(first_committed_end_lsn),
K(first_leader_committed_end_lsn), K(new_member_list), K(new_replica_num), K(conn_timeout_us));
K(first_leader_committed_end_lsn), K(new_config_info), K(conn_timeout_us));
// committed_lsn of new majority is behind than old majority's, we want to know if
// they can catch up with leader during config change timeout. If they can, start config change
ob_usleep(500 * 1000);
@ -2068,15 +2061,15 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
int64_t expected_sync_time_s;
int64_t sync_speed_gap;
added_member_has_new_version = is_add_member_list(args.type_)? false: true;
if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, false/*no need purge throttling*/,
if (OB_FAIL(sync_get_committed_end_lsn_(args, new_config_info, false/*no need purge throttling*/,
need_remote_check, conn_timeout_us, second_committed_end_lsn, added_member_has_new_version,
added_member_flushed_end_lsn, added_member_last_slide_log_id))) {
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_member_list),
K(new_replica_num), K(added_member_has_new_version));
PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self),
K(new_config_info), K(added_member_has_new_version));
} else if (second_committed_end_lsn >= second_leader_committed_end_lsn) {
// if committed lsn of new majority do not retreat, then start config change
PALF_LOG(INFO, "majority of new_member_list are sync with leader, start config change", K_(palf_id), K_(self),
K(second_committed_end_lsn), K(second_leader_committed_end_lsn), K(new_member_list), K(new_replica_num), K(conn_timeout_us));
K(second_committed_end_lsn), K(second_leader_committed_end_lsn), K(new_config_info), K(conn_timeout_us));
} else if (FALSE_IT(sync_speed_gap = ((second_committed_end_lsn - first_committed_end_lsn) * 2) - \
((second_leader_committed_end_lsn - first_leader_committed_end_lsn) * 2) )) {
} else if (sync_speed_gap <= 0) {
@ -2133,8 +2126,8 @@ int LogConfigMgr::check_servers_lsn_and_version_(const common::ObAddr &server,
get_from_local = true;
} else if (OB_FAIL(log_engine_->submit_config_change_pre_check_req(server, config_version, need_purge_throttling,
conn_timeout_us, resp))) {
PALF_LOG(WARN, "submit_config_change_pre_check_req failed", KR(ret), K_(palf_id), K_(self),
K(server), K(need_purge_throttling), K(config_version), K(conn_timeout_us), K(resp));
// PALF_LOG(WARN, "submit_config_change_pre_check_req failed", KR(ret), K_(palf_id), K_(self),
// K(server), K(need_purge_throttling), K(config_version), K(conn_timeout_us), K(resp));
has_same_version = false;
} else if (false == resp.is_normal_replica_) {
has_same_version = false;
@ -2155,8 +2148,7 @@ int LogConfigMgr::check_servers_lsn_and_version_(const common::ObAddr &server,
// 2. check if the config_version of added member are same to current config_version.
// if the config change don't add member to list, return true
int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const LogConfigInfo &new_config_info,
const bool need_purge_throttling,
const bool need_remote_check,
const int64_t conn_timeout_us,
@ -2166,68 +2158,73 @@ int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args,
int64_t &added_member_last_slide_log_id) const
{
int ret = OB_SUCCESS, tmp_ret = OB_SUCCESS;
int64_t resp_cnt = 0;
const LogConfigVersion config_version = config_meta_.curr_.config_version_;
int64_t log_sync_resp_cnt = 0, paxos_resp_cnt = 0;
LSN lsn_array[OB_MAX_MEMBER_NUMBER];
const LogConfigVersion &config_version = config_meta_.curr_.config_version_;
const common::ObMemberList new_log_sync_memberlist = new_config_info.log_sync_memberlist_;
const int64_t new_log_sync_replica_num = new_config_info.log_sync_replica_num_;
common::ObMemberList new_paxos_memberlist;
int64_t new_paxos_replica_num = 0;
GlobalLearnerList unused_list;
added_member_has_new_version = is_add_member_list(args.type_)? false: true;
added_member_flushed_end_lsn.reset();
added_member_last_slide_log_id = 0;
for (int64_t i = 0; i < new_member_list.get_member_number(); ++i) {
if (OB_FAIL(new_config_info.convert_to_complete_config(new_paxos_memberlist, new_paxos_replica_num, unused_list))) {
PALF_LOG(WARN, "convert_to_complete_config failed", K(ret), K_(palf_id), K_(self), K(new_config_info));
}
for (int64_t i = 0; OB_SUCC(ret) && i < new_paxos_memberlist.get_member_number(); ++i) {
common::ObAddr server;
bool is_added_member = false;
bool force_remote_check = false;
LSN max_flushed_end_lsn;
bool has_same_version = false;
if (OB_SUCCESS != (tmp_ret = new_member_list.get_server_by_index(i, server))) {
PALF_LOG(ERROR, "get_server_by_index failed", KR(ret), K_(palf_id), K_(self), K(i),
"new_member_list size:", new_member_list.get_member_number());
} else if (FALSE_IT(is_added_member = is_add_member_list(args.type_) && (args.server_.get_server() == server))) {
bool is_arb_member = false;
int64_t last_slide_log_id = 0;
if (OB_SUCCESS != (tmp_ret = new_paxos_memberlist.get_server_by_index(i, server))) {
PALF_LOG(ERROR, "get_server_by_index failed", KR(ret), K_(palf_id), K_(self), K(i), K(new_paxos_memberlist));
} else if (FALSE_IT(is_added_member = (is_add_member_list(args.type_) && (args.server_.get_server() == server)))) {
} else if (FALSE_IT(is_arb_member = (server == new_config_info.arbitration_member_.get_server()))) {
} else if (FALSE_IT(force_remote_check = is_added_member || need_purge_throttling || need_remote_check)) {
} else if (OB_SUCCESS != (tmp_ret = check_servers_lsn_and_version_(server, config_version,
conn_timeout_us, force_remote_check, need_purge_throttling, max_flushed_end_lsn, has_same_version,
added_member_last_slide_log_id))) {
last_slide_log_id))) {
// PALF_LOG(WARN, "check_servers_lsn_and_version_ failed", K(ret), K(tmp_ret), K_(palf_id), K_(self), K(server),
// K(config_version), K(conn_timeout_us), K(force_remote_check), K(max_flushed_end_lsn), K(has_same_version));
} else if (false == is_arb_member) {
lsn_array[log_sync_resp_cnt++] = max_flushed_end_lsn;
paxos_resp_cnt++;
} else {
lsn_array[resp_cnt++] = max_flushed_end_lsn;
paxos_resp_cnt++;
}
added_member_has_new_version = (is_added_member)? has_same_version: added_member_has_new_version;
added_member_flushed_end_lsn = (is_added_member)? max_flushed_end_lsn: added_member_flushed_end_lsn;
}
// added member isn't in new_member_list, e.g., add arb member
if (ADD_ARB_MEMBER == args.type_) {
LSN max_flushed_end_lsn;
bool has_same_version = false;
if (OB_SUCCESS != (tmp_ret = check_servers_lsn_and_version_(args.server_.get_server(),
config_version, conn_timeout_us, true/*force_check*/, false/*need_purge_throttle*/, max_flushed_end_lsn,
has_same_version, added_member_last_slide_log_id))) {
PALF_LOG(WARN, "check_servers_lsn_and_version_ failed", K(ret), K_(palf_id), K_(self), K(args), K(config_version),
K(conn_timeout_us), K(max_flushed_end_lsn), K(has_same_version));
}
added_member_has_new_version = has_same_version;
added_member_last_slide_log_id = (is_added_member)? last_slide_log_id: added_member_last_slide_log_id;
}
if (false == added_member_has_new_version) {
ret = OB_EAGAIN;
PALF_LOG(WARN, "added member don't have new version, eagain", K(ret), K_(palf_id),
K_(self), K(args), K(config_version));
} else if (resp_cnt < new_replica_num / 2 + 1) {
} else if ((paxos_resp_cnt < new_paxos_replica_num / 2 + 1) ||
(log_sync_resp_cnt < new_log_sync_replica_num / 2 + 1)) {
// do not recv majority resp, can not change member
ret = OB_EAGAIN;
PALF_LOG(WARN, "connection timeout with majority of new_member_list, can't change member!",
K_(palf_id), K_(self), K(new_member_list), K(new_replica_num), K(resp_cnt), K(conn_timeout_us));
K_(palf_id), K_(self), K(new_paxos_replica_num), K(paxos_resp_cnt),
K(new_log_sync_replica_num), K(log_sync_resp_cnt), K(conn_timeout_us));
} else {
std::sort(lsn_array, lsn_array + resp_cnt, LSNCompare());
committed_end_lsn = lsn_array[new_replica_num / 2];
std::sort(lsn_array, lsn_array + log_sync_resp_cnt, LSNCompare());
committed_end_lsn = lsn_array[new_log_sync_replica_num / 2];
}
PALF_LOG(INFO, "sync_get_committed_end_lsn_ finish", K(ret), K_(palf_id), K_(self), K(args),
K(new_member_list), K(new_replica_num), K(need_purge_throttling), K(need_remote_check),
K(new_config_info), K(need_purge_throttling), K(need_remote_check),
K(conn_timeout_us), K(committed_end_lsn), K(added_member_has_new_version),
K(added_member_flushed_end_lsn), K(added_member_last_slide_log_id),
"lsn_array:", common::ObArrayWrap<LSN>(lsn_array, resp_cnt));
K(paxos_resp_cnt), K(new_paxos_replica_num), K(log_sync_resp_cnt), K(new_log_sync_replica_num),
"lsn_array:", common::ObArrayWrap<LSN>(lsn_array, log_sync_resp_cnt));
return ret;
}

View File

@ -326,12 +326,10 @@ public:
virtual int submit_broadcast_leader_info(const int64_t proposal_id) const;
virtual void reset_status();
int check_follower_sync_status(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const LogConfigInfo &new_config_info,
bool &added_member_has_new_version) const;
int wait_log_barrier_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num) const;
const LogConfigInfo &new_config_info) const;
int sync_meta_for_arb_election_leader();
bool need_sync_to_degraded_learners() const;
// ================ Config Change ==================
@ -436,8 +434,7 @@ private:
bool &has_same_version,
int64_t &last_slide_log_id) const;
int sync_get_committed_end_lsn_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const LogConfigInfo &new_config_info,
const bool need_purge_throttling,
const bool need_remote_check,
const int64_t conn_timeout_us,
@ -446,8 +443,7 @@ private:
LSN &added_member_flushed_end_lsn,
int64_t &added_member_last_slide_log_id) const;
int check_follower_sync_status_(const LogConfigChangeArgs &args,
const ObMemberList &new_member_list,
const int64_t new_replica_num,
const LogConfigInfo &new_config_info,
bool &added_member_has_new_version) const;
int pre_sync_config_log_and_mode_meta_(const common::ObMember &server,
const int64_t proposal_id,

View File

@ -96,7 +96,7 @@ const int64_t MATCH_LSN_ADVANCE_DELAY_THRESHOLD_US = 1 * 1000 * 1000L;
const int64_t PALF_RECONFIRM_FETCH_MAX_LSN_INTERVAL = 1 * 1000 * 1000;
const int64_t PALF_FETCH_LOG_INTERVAL_US = 2 * 1000 * 1000L; // 2s
// Control the fetch interval trigger by outer(eg. config change pre check) by 500ms.
const int64_t PALF_FETCH_LOG_OUTER_TRIGGER_INTERVAL = 100 * 1000 * 1000L;
const int64_t PALF_FETCH_LOG_OUTER_TRIGGER_INTERVAL = 500 * 1000L; // 500 ms
const int64_t PALF_FETCH_LOG_RENEW_LEADER_INTERVAL_US = 5 * 1000 * 1000; // 5s
const int64_t PALF_LEADER_RECONFIRM_SYNC_TIMEOUT_US = 10 * 1000 * 1000L; // 10s
const int64_t PREPARE_LOG_BUFFER_SIZE = 2048;

View File

@ -943,21 +943,16 @@ int PalfHandleImpl::check_args_and_generate_config_(const LogConfigChangeArgs &a
const int64_t proposal_id,
const int64_t election_epoch,
bool &is_already_finished,
common::ObMemberList &log_sync_memberlist,
int64_t &log_sync_repclia_num) const
LogConfigInfo &new_config_info) const
{
int ret = OB_SUCCESS;
RLockGuard guard(lock_);
LogConfigInfo config_info;
if (OB_FAIL(config_mgr_.check_args_and_generate_config(args, proposal_id,
election_epoch, is_already_finished, config_info))) {
election_epoch, is_already_finished, new_config_info))) {
if (palf_reach_time_interval(100 * 1000, config_change_print_time_us_)) {
PALF_LOG(WARN, "check_args_and_generate_config failed", K(ret), KPC(this), K(args));
}
} else {
log_sync_memberlist = config_info.log_sync_memberlist_;
log_sync_repclia_num = config_info.log_sync_replica_num_;
}
} else { }
return ret;
}
@ -969,9 +964,8 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args,
int64_t proposal_id = INVALID_PROPOSAL_ID;
int64_t election_epoch = INVALID_PROPOSAL_ID;
bool is_already_finished = false;
ObMemberList new_log_sync_memberlist;
int64_t new_log_sync_replica_num = 0;
int get_lock = OB_EAGAIN;
LogConfigInfo new_config_info;
if (DEGRADE_ACCEPTOR_TO_LEARNER == args.type_) {
// for concurrent DEGRADE
if (ATOMIC_BCAS(&has_higher_prio_config_change_, false, true)) {
@ -994,7 +988,7 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args,
PALF_LOG(WARN, "start_change_config failed", KR(ret), KPC(this), K(args));
} else if (FALSE_IT(doing_degrade = (args.type_ == DEGRADE_ACCEPTOR_TO_LEARNER))) {
} else if (OB_FAIL(check_args_and_generate_config_(args, proposal_id, election_epoch,
is_already_finished, new_log_sync_memberlist, new_log_sync_replica_num))) {
is_already_finished, new_config_info))) {
if (palf_reach_time_interval(100 * 1000, config_change_print_time_us_)) {
PALF_LOG(WARN, "check_args_and_generate_config failed", KR(ret), KPC(this), K(args));
}
@ -1030,13 +1024,12 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args,
ret = OB_NOT_MASTER;
PALF_LOG(WARN, "leader has been switched, try to change config again", KR(ret), KPC(this),
K(proposal_id), K(curr_proposal_id));
} else if (OB_SUCC(config_mgr_.check_follower_sync_status(args, new_log_sync_memberlist,
new_log_sync_replica_num, added_member_has_new_version))) {
} else if (OB_SUCC(config_mgr_.check_follower_sync_status(args, new_config_info,
added_member_has_new_version))) {
// check log synchronization progress of new memberlist majority synchronically
break;
} else if (OB_EAGAIN != ret) {
PALF_LOG(WARN, "check_follower_sync_status_ fails", K(ret), K_(palf_id),
K(new_log_sync_memberlist), K(new_log_sync_replica_num));
PALF_LOG(WARN, "check_follower_sync_status_ fails", K(ret), K_(palf_id), K(new_config_info));
} else if (is_upgrade_or_degrade(args.type_)) {
ret = OB_EAGAIN;
PALF_LOG(WARN, "degrade/upgrade eagain, arb_reason: check_follower_sync_status_ return false",
@ -1101,8 +1094,14 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args,
PALF_LOG(INFO, "one_stage_config_change finish", KR(ret), KPC(this), K(args), K(config_version),
K(timeout_us), K(time_guard));
ret = (OB_LOG_NOT_SYNC == ret)? OB_EAGAIN: ret;
if (OB_TIMEOUT == ret && config_version.is_valid()) {
config_mgr_.after_config_change_timeout(config_version);
if (OB_FAIL(ret)) {
if (config_version.is_valid()) {
config_mgr_.after_config_change_timeout(config_version);
} else {
// encounter unexpected error, reset flag
WLockGuard guard(lock_);
state_mgr_.reset_changing_config_with_arb();
}
}
}
if (OB_SUCCESS == get_lock) {

View File

@ -1046,8 +1046,7 @@ private:
const int64_t proposal_id,
const int64_t election_epoch,
bool &is_already_finished,
common::ObMemberList &log_sync_memberlist,
int64_t &log_sync_repclia_num) const;
LogConfigInfo &new_config_info) const;
int one_stage_config_change_(const LogConfigChangeArgs &args, const int64_t timeout_us);
int check_need_rebuild_(const LSN &base_lsn,
const LogInfo &base_prev_log_info,

View File

@ -28,6 +28,7 @@ public:
~MockNetKeepAliveAdapter() { }
bool in_black_or_stopped(const common::ObAddr &server) override final {return false;}
bool is_server_stopped(const common::ObAddr &server) override final {return false;}
bool in_black(const common::ObAddr &server) override final {return false;}
};
const ObAddr addr1(ObAddr::IPV4, "127.0.0.1", 1000);