From 0bb91248969e421153355d8a6ef77f66a75b5971 Mon Sep 17 00:00:00 2001 From: BinChenn Date: Thu, 1 Jun 2023 06:53:49 +0000 Subject: [PATCH] [PALF] do not change config if majority of paxos members have crashed --- .../logservice/env/ob_simple_log_server.cpp | 5 + mittest/logservice/env/ob_simple_log_server.h | 1 + mittest/logservice/test_ob_simple_log_arb.cpp | 19 ++- .../test_ob_simple_log_arb_mock_ele.cpp | 144 +++++++++++++++++- src/logservice/ob_net_keepalive_adapter.cpp | 12 ++ src/logservice/ob_net_keepalive_adapter.h | 2 + src/logservice/palf/log_config_mgr.cpp | 129 ++++++++-------- src/logservice/palf/log_config_mgr.h | 12 +- src/logservice/palf/log_define.h | 2 +- src/logservice/palf/palf_handle_impl.cpp | 33 ++-- src/logservice/palf/palf_handle_impl.h | 3 +- .../test_ob_arbitration_service.cpp | 1 + 12 files changed, 261 insertions(+), 102 deletions(-) diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index ba32314b78..c9cc4b719c 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -67,6 +67,11 @@ bool MockNetKeepAliveAdapter::is_server_stopped(const common::ObAddr &server) return log_deliver_->need_filter_packet_by_blacklist(server); } +bool MockNetKeepAliveAdapter::in_black(const common::ObAddr &server) +{ + return log_deliver_->need_filter_packet_by_blacklist(server); +} + uint32_t get_local_addr(const char *dev_name) { int fd, intrface; diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h index 498a5e5acd..84f45ca26b 100644 --- a/mittest/logservice/env/ob_simple_log_server.h +++ b/mittest/logservice/env/ob_simple_log_server.h @@ -77,6 +77,7 @@ public: int init(unittest::ObLogDeliver *log_deliver); bool in_black_or_stopped(const common::ObAddr &server) override final; bool is_server_stopped(const common::ObAddr &server) override final; + bool in_black(const common::ObAddr &server) override final; private: unittest::ObLogDeliver *log_deliver_; }; diff --git a/mittest/logservice/test_ob_simple_log_arb.cpp b/mittest/logservice/test_ob_simple_log_arb.cpp index 3a30a52315..66be6d469a 100644 --- a/mittest/logservice/test_ob_simple_log_arb.cpp +++ b/mittest/logservice/test_ob_simple_log_arb.cpp @@ -663,6 +663,18 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_when_no_leader) EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id)); sleep(2); + LogConfigChangeArgs args(ObMember(palf_list[another_f_idx]->palf_handle_impl_->self_, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER); + const int64_t proposal_id = leader.palf_handle_impl_->state_mgr_.get_proposal_id(); + const int64_t election_epoch = leader.palf_handle_impl_->state_mgr_.get_leader_epoch(); + LogConfigVersion config_version; + EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config(args, proposal_id, election_epoch, config_version)); + + // leader appended config meta, but did not apply config meta + EXPECT_NE(palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_, + palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_); + EXPECT_EQ(palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_, + palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_); + // block all networks of arb member, and the network from the follower to the leader block_net(arb_replica_idx, another_f_idx, true); block_net(arb_replica_idx, leader_idx, true); @@ -673,12 +685,6 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_when_no_leader) sleep(1); } - // leader appended config meta, but did not apply config meta - EXPECT_NE(palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_, - palf_list[leader_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_); - EXPECT_EQ(palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.log_ms_meta_.curr_.config_version_, - palf_list[another_f_idx]->get_palf_handle_impl()->config_mgr_.config_meta_.curr_.config_version_); - // unblock_net unblock_net(another_f_idx, leader_idx); unblock_net(arb_replica_idx, leader_idx); @@ -771,7 +777,6 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_upgrade_when_no_leader) PALF_LOG(INFO, "end test_2f1a_upgrade_when_no_leader", K(id)); } - } // end unittest } // end oceanbase diff --git a/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp b/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp index c688f749a3..9542a2fa40 100644 --- a/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp +++ b/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp @@ -80,7 +80,7 @@ std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading) { int ret = OB_SUCCESS; - const int64_t id = ATOMIC_AAF(&palf_id_, 1); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); const int64_t TIMEOUT_US = 10 * 1000 * 1000L; SET_CASE_LOG_FILE(TEST_NAME, "switch_leader_during_degrading"); PALF_LOG(INFO, "begin test switch_leader_during_degrading", K(id)); @@ -131,6 +131,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading) // A can not degrades B successfully EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version)); + a_handle->palf_handle_impl_->config_mgr_.end_degrade(); for (auto srv: get_cluster()) { srv->set_leader(id, a_addr, 3); @@ -238,6 +239,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de // A can not degrades B successfully EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version)); + a_handle->palf_handle_impl_->config_mgr_.end_degrade(); for (auto srv: get_cluster()) { srv->set_leader(id, b_addr, 3); @@ -293,6 +295,146 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de PALF_LOG(INFO, "end test switch_leader_to_other_during_degrading", K(id)); } +// 1. 2F1A, the leader starts to degrade another F +// 2. after the config log has been accepted by the arb member, the leader revoked +// 3. the previous leader has been elected as the new leader +// 4. reconfirm may fail because leader's config_version is not same to that of the follower +TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_no_leader2) +{ + int ret = OB_SUCCESS; + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + const int64_t TIMEOUT_US = 10 * 1000 * 1000L; + SET_CASE_LOG_FILE(TEST_NAME, "test_2f1a_degrade_when_no_leader2"); + PALF_LOG(INFO, "begin test test_2f1a_degrade_when_no_leader2", K(id)); + { + int64_t leader_idx = 0; + int64_t arb_replica_idx = 0; + PalfHandleImplGuard leader; + std::vector palf_list; + EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id)); + EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); + + const int64_t b_idx = (leader_idx + 1) % 3; + const int64_t c_idx = (leader_idx + 2) % 3; + const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr(); + const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr(); + const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr(); + PalfHandleImplGuard *a_handle = palf_list[leader_idx]; + PalfHandleImplGuard *b_handle = palf_list[b_idx]; + IPalfHandleImpl *c_ihandle = NULL; + get_cluster()[c_idx]->get_palf_env()->get_palf_handle_impl(id, c_ihandle); + palflite::PalfHandleLite *c_handle = dynamic_cast(c_ihandle); + dynamic_cast(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->stop(); + dynamic_cast(get_cluster()[b_idx])->log_service_.get_arbitration_service()->stop(); + + // 1. A tries to degrade B + block_net(leader_idx, b_idx); + int64_t degrade_b_pid = 0; + int64_t degrade_b_ele_epoch = 0; + LogConfigVersion degrade_b_version; + LogConfigChangeArgs degrade_b_args(ObMember(b_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER); + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(degrade_b_pid, degrade_b_ele_epoch, degrade_b_args.type_)); + + while (0 == leader.palf_handle_impl_->config_mgr_.state_) { + EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version)); + } + const LSN &leader_last_committed_end_lsn = leader.palf_handle_impl_->sw_.committed_end_lsn_; + sleep(2); + EXPECT_EQ(leader_last_committed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_); + + // A sends config meta to C + while (-1 == leader.palf_handle_impl_->config_mgr_.last_submit_config_log_time_us_ || + c_handle->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr)) { + EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version)); + usleep(500); + } + + EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr)); + EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr)); + + // 2. the leader A revokes and takeover again + for (auto srv: get_cluster()) { + const ObAddr addr1(ObAddr::IPV4, "0.0.0.0", 0); + srv->set_leader(id, addr1); + } + EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active()); + + // A can not degrades B successfully + EXPECT_EQ(OB_NOT_MASTER, a_handle->palf_handle_impl_->config_mgr_.change_config_(degrade_b_args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version)); + a_handle->palf_handle_impl_->config_mgr_.end_degrade(); + + for (auto srv: get_cluster()) { + srv->set_leader(id, a_addr, 3); + } + dynamic_cast(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start(); + EXPECT_UNTIL_EQ(true, a_handle->palf_handle_impl_->state_mgr_.is_leader_active()); + EXPECT_UNTIL_EQ(true, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(b_addr)); + + unblock_net(leader_idx, b_idx); + get_cluster()[c_idx]->get_palf_env()->revert_palf_handle_impl(c_ihandle); + revert_cluster_palf_handle_guard(palf_list); + } + delete_paxos_group(id); + PALF_LOG(INFO, "end test test_2f1a_degrade_when_no_leader2", K(id)); +} + +TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_arb_crash) +{ + OB_LOGGER.set_log_level("INFO"); + int ret = OB_SUCCESS; + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + const int64_t TIMEOUT_US = 10 * 1000 * 1000L; + SET_CASE_LOG_FILE(TEST_NAME, "test_2f1a_degrade_when_arb_crash"); + PALF_LOG(INFO, "begin test test_2f1a_degrade_when_arb_crash", K(id)); + { + int64_t leader_idx = 0; + int64_t arb_replica_idx = 0; + PalfHandleImplGuard leader; + std::vector palf_list; + EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id)); + EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); + + const int64_t b_idx = (leader_idx + 1) % 3; + const int64_t c_idx = (leader_idx + 2) % 3; + const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr(); + const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr(); + const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr(); + PalfHandleImplGuard *a_handle = palf_list[leader_idx]; + PalfHandleImplGuard *b_handle = palf_list[b_idx]; + + // block the network from the leader to the arb member + block_net(leader_idx, arb_replica_idx); + // block the network from the leader to the follower + block_net(leader_idx, b_idx); + sleep(4); + // the leader can not degrade successfully + EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr)); + EXPECT_TRUE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr)); + + // start to degrade B manually, do not allow + LogConfigChangeArgs args(ObMember(b_addr, 1), 0, DEGRADE_ACCEPTOR_TO_LEARNER); + EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->one_stage_config_change_(args, 10 * 1000 * 1000)); + EXPECT_EQ(false, leader.palf_handle_impl_->config_mgr_.is_sw_interrupted_by_degrade_); + EXPECT_EQ(false, leader.palf_handle_impl_->state_mgr_.is_changing_config_with_arb_); + + // start to degrade B manually, do not allow + int64_t degrade_b_pid = 0; + int64_t degrade_b_ele_epoch = 0; + LogConfigVersion degrade_b_version; + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->config_mgr_.start_change_config(degrade_b_pid, degrade_b_ele_epoch, args.type_)); + EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.change_config(args, degrade_b_pid, degrade_b_ele_epoch, degrade_b_version)); + EXPECT_EQ(true, leader.palf_handle_impl_->config_mgr_.is_sw_interrupted_by_degrade_); + EXPECT_EQ(true, leader.palf_handle_impl_->state_mgr_.is_changing_config_with_arb_); + + unblock_net(leader_idx, arb_replica_idx); + unblock_net(leader_idx, b_idx); + revert_cluster_palf_handle_guard(palf_list); + } + delete_paxos_group(id); + PALF_LOG(INFO, "end test test_2f1a_degrade_when_arb_crash", K(id)); +} } // end unittest } // end oceanbase diff --git a/src/logservice/ob_net_keepalive_adapter.cpp b/src/logservice/ob_net_keepalive_adapter.cpp index ed019adc57..c682017601 100644 --- a/src/logservice/ob_net_keepalive_adapter.cpp +++ b/src/logservice/ob_net_keepalive_adapter.cpp @@ -72,5 +72,17 @@ bool ObNetKeepAliveAdapter::is_server_stopped(const common::ObAddr &server) } return bool_ret; } + +bool ObNetKeepAliveAdapter::in_black(const common::ObAddr &server) +{ + bool bool_ret = false; + bool in_blacklist = false; + bool unused_is_server_stopped = false; + if (OB_SUCCESS != in_black_or_stopped_(server, in_blacklist, unused_is_server_stopped)) { + } else { + bool_ret = in_blacklist; + } + return bool_ret; +} } // end namespace logservice } // end namespace oceanbase diff --git a/src/logservice/ob_net_keepalive_adapter.h b/src/logservice/ob_net_keepalive_adapter.h index ef3af146c9..d141776c57 100644 --- a/src/logservice/ob_net_keepalive_adapter.h +++ b/src/logservice/ob_net_keepalive_adapter.h @@ -29,6 +29,7 @@ public: virtual ~IObNetKeepAliveAdapter() {} virtual bool in_black_or_stopped(const common::ObAddr &server) = 0; virtual bool is_server_stopped(const common::ObAddr &server) = 0; + virtual bool in_black(const common::ObAddr &server) = 0; }; class ObNetKeepAliveAdapter : public IObNetKeepAliveAdapter { @@ -37,6 +38,7 @@ public: ~ObNetKeepAliveAdapter() override; bool in_black_or_stopped(const common::ObAddr &server) override final; bool is_server_stopped(const common::ObAddr &server) override final; + bool in_black(const common::ObAddr &server) override final; private: int in_black_or_stopped_(const common::ObAddr &server, bool &in_black, diff --git a/src/logservice/palf/log_config_mgr.cpp b/src/logservice/palf/log_config_mgr.cpp index a8a0ab80f9..1f4919b11a 100644 --- a/src/logservice/palf/log_config_mgr.cpp +++ b/src/logservice/palf/log_config_mgr.cpp @@ -1374,9 +1374,8 @@ int LogConfigMgr::append_config_meta_(const int64_t curr_proposal_id, } else if (has_arb_member && OB_FAIL(state_mgr_->set_changing_config_with_arb())) { PALF_LOG(ERROR, "set_changing_config_with_arb failed", KR(ret), K_(palf_id), K_(self), K(args)); } else if (has_arb_member && - OB_FAIL(wait_log_barrier_(args, new_config_info.log_sync_memberlist_, - new_config_info.log_sync_replica_num_))) { - PALF_LOG(WARN, "check_follower_sync_status_ eagain", KR(ret), K_(palf_id), K_(self), K(args), K(new_config_info)); + OB_FAIL(wait_log_barrier_(args, new_config_info))) { + PALF_LOG(WARN, "wait_log_barrier_ eagain", KR(ret), K_(palf_id), K_(self), K(args), K(new_config_info)); if (OB_LOG_NOT_SYNC == ret) { state_mgr_->reset_changing_config_with_arb(); } @@ -1933,18 +1932,16 @@ void LogConfigChangeArgs::reset() } int LogConfigMgr::check_follower_sync_status(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num, + const LogConfigInfo &new_config_info, bool &added_member_has_new_version) const { int ret = OB_SUCCESS; SpinLockGuard guard(lock_); - return check_follower_sync_status_(args, new_member_list, new_replica_num, added_member_has_new_version); + return check_follower_sync_status_(args, new_config_info, added_member_has_new_version); } int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num) const + const LogConfigInfo &new_config_info) const { int ret = OB_SUCCESS; LSN first_committed_end_lsn; @@ -1957,12 +1954,11 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args, constexpr bool need_remote_check = false; const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier(); LSN prev_log_end_lsn; - if (new_member_list.get_member_number() == 0) { + if (new_config_info.log_sync_memberlist_.get_member_number() == 0) { ret = OB_INVALID_ARGUMENT; - } else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, - need_purge_throttling, need_remote_check, conn_timeout_us, first_committed_end_lsn, unused_bool, unused_lsn, unused_id))) { - PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), - K(new_member_list), K(new_replica_num)); + } else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_config_info, need_purge_throttling, + need_remote_check, conn_timeout_us, first_committed_end_lsn, unused_bool, unused_lsn, unused_id))) { + PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_config_info)); } else if (need_skip_log_barrier) { ret = OB_SUCCESS; PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \ @@ -1998,14 +1994,13 @@ int LogConfigMgr::wait_log_barrier_(const LogConfigChangeArgs &args, start_wait_barrier_time_us_ = curr_ts_us; } } - PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self), K(first_committed_end_lsn), - K(prev_log_end_lsn), K(new_member_list), K(new_replica_num)); + PALF_LOG(INFO, "waiting for log barrier", K(ret), K_(palf_id), K_(self), + K(first_committed_end_lsn), K(prev_log_end_lsn), K(new_config_info)); return ret; } int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num, + const LogConfigInfo &new_config_info, bool &added_member_has_new_version) const { int ret = OB_SUCCESS; @@ -2022,15 +2017,13 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args, (void) sw_->get_committed_end_lsn(first_leader_committed_end_lsn); const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier(); - if (new_member_list.get_member_number() == 0) { + if (new_config_info.log_sync_memberlist_.get_member_number() == 0) { ret = OB_INVALID_ARGUMENT; - } else if (new_member_list.get_member_number() == 1) { - ret = OB_SUCCESS; - } else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, need_purge_throttling, - need_remote_check, conn_timeout_us, first_committed_end_lsn, added_member_has_new_version, added_member_flushed_end_lsn, - added_member_last_slide_log_id))) { - PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_member_list), - K(new_replica_num), K(added_member_has_new_version)); + } else if (OB_FAIL(sync_get_committed_end_lsn_(args, new_config_info, need_purge_throttling, + need_remote_check, conn_timeout_us, first_committed_end_lsn, added_member_has_new_version, + added_member_flushed_end_lsn, added_member_last_slide_log_id))) { + PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_config_info), + K(added_member_has_new_version)); } else if (need_skip_log_barrier) { ret = OB_SUCCESS; PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \ @@ -2038,11 +2031,11 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args, } else if (first_committed_end_lsn >= first_leader_committed_end_lsn) { // if committed lsn of new majority do not retreat, then start config change PALF_LOG(INFO, "majority of new_member_list are sync with leader, start config change", K(ret), K_(palf_id), K_(self), - K(first_committed_end_lsn), K(first_leader_committed_end_lsn), K(new_member_list), K(new_replica_num), K(conn_timeout_us)); + K(first_committed_end_lsn), K(first_leader_committed_end_lsn), K(new_config_info), K(conn_timeout_us)); // when quorum has been changed (e.g., 1 -> 2), committed_end_lsn of new memberlist may always be behind the committed_end_lsn of // leader, so we relax the condition for adding members which has changed quorum } else if (is_add_log_sync_member_list(args.type_) && - (new_replica_num / 2) > (config_meta_.curr_.log_sync_replica_num_ / 2) && + (new_config_info.log_sync_replica_num_ / 2) > (config_meta_.curr_.log_sync_replica_num_ / 2) && config_meta_.curr_.arbitration_member_.is_valid()) { if (added_member_flushed_end_lsn.is_valid() && first_leader_committed_end_lsn - added_member_flushed_end_lsn < LEADER_DEFAULT_GROUP_BUFFER_SIZE && @@ -2050,17 +2043,17 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args, leader_last_slide_log_id - added_member_last_slide_log_id < PALF_SLIDING_WINDOW_SIZE)) { ret = OB_SUCCESS; PALF_LOG(INFO, "the gap between the leader and added member is smaller than the group_buffer_size", - K(ret), K_(palf_id), K_(self), K(args), K(new_replica_num), K(first_leader_committed_end_lsn), + K(ret), K_(palf_id), K_(self), K(args), K(new_config_info), K(first_leader_committed_end_lsn), K(added_member_flushed_end_lsn), K(leader_last_slide_log_id), K(added_member_last_slide_log_id)); } else { ret = OB_EAGAIN; PALF_LOG(INFO, "the gap between the leader and added member is larger than the group_buffer_size, skip", - K(ret), K_(palf_id), K_(self), K(args), K(new_replica_num), K(first_leader_committed_end_lsn), + K(ret), K_(palf_id), K_(self), K(args), K(new_config_info), K(first_leader_committed_end_lsn), K(added_member_flushed_end_lsn), K(leader_last_slide_log_id), K(added_member_last_slide_log_id)); } } else { PALF_LOG(INFO, "majority of new_member_list aren't sync with leader", K_(palf_id), K_(self), K(first_committed_end_lsn), - K(first_leader_committed_end_lsn), K(new_member_list), K(new_replica_num), K(conn_timeout_us)); + K(first_leader_committed_end_lsn), K(new_config_info), K(conn_timeout_us)); // committed_lsn of new majority is behind than old majority's, we want to know if // they can catch up with leader during config change timeout. If they can, start config change ob_usleep(500 * 1000); @@ -2068,15 +2061,15 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args, int64_t expected_sync_time_s; int64_t sync_speed_gap; added_member_has_new_version = is_add_member_list(args.type_)? false: true; - if (OB_FAIL(sync_get_committed_end_lsn_(args, new_member_list, new_replica_num, false/*no need purge throttling*/, + if (OB_FAIL(sync_get_committed_end_lsn_(args, new_config_info, false/*no need purge throttling*/, need_remote_check, conn_timeout_us, second_committed_end_lsn, added_member_has_new_version, added_member_flushed_end_lsn, added_member_last_slide_log_id))) { - PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), K(new_member_list), - K(new_replica_num), K(added_member_has_new_version)); + PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self), + K(new_config_info), K(added_member_has_new_version)); } else if (second_committed_end_lsn >= second_leader_committed_end_lsn) { // if committed lsn of new majority do not retreat, then start config change PALF_LOG(INFO, "majority of new_member_list are sync with leader, start config change", K_(palf_id), K_(self), - K(second_committed_end_lsn), K(second_leader_committed_end_lsn), K(new_member_list), K(new_replica_num), K(conn_timeout_us)); + K(second_committed_end_lsn), K(second_leader_committed_end_lsn), K(new_config_info), K(conn_timeout_us)); } else if (FALSE_IT(sync_speed_gap = ((second_committed_end_lsn - first_committed_end_lsn) * 2) - \ ((second_leader_committed_end_lsn - first_leader_committed_end_lsn) * 2) )) { } else if (sync_speed_gap <= 0) { @@ -2133,8 +2126,8 @@ int LogConfigMgr::check_servers_lsn_and_version_(const common::ObAddr &server, get_from_local = true; } else if (OB_FAIL(log_engine_->submit_config_change_pre_check_req(server, config_version, need_purge_throttling, conn_timeout_us, resp))) { - PALF_LOG(WARN, "submit_config_change_pre_check_req failed", KR(ret), K_(palf_id), K_(self), - K(server), K(need_purge_throttling), K(config_version), K(conn_timeout_us), K(resp)); + // PALF_LOG(WARN, "submit_config_change_pre_check_req failed", KR(ret), K_(palf_id), K_(self), + // K(server), K(need_purge_throttling), K(config_version), K(conn_timeout_us), K(resp)); has_same_version = false; } else if (false == resp.is_normal_replica_) { has_same_version = false; @@ -2155,8 +2148,7 @@ int LogConfigMgr::check_servers_lsn_and_version_(const common::ObAddr &server, // 2. check if the config_version of added member are same to current config_version. // if the config change don't add member to list, return true int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num, + const LogConfigInfo &new_config_info, const bool need_purge_throttling, const bool need_remote_check, const int64_t conn_timeout_us, @@ -2166,68 +2158,73 @@ int LogConfigMgr::sync_get_committed_end_lsn_(const LogConfigChangeArgs &args, int64_t &added_member_last_slide_log_id) const { int ret = OB_SUCCESS, tmp_ret = OB_SUCCESS; - int64_t resp_cnt = 0; - const LogConfigVersion config_version = config_meta_.curr_.config_version_; + int64_t log_sync_resp_cnt = 0, paxos_resp_cnt = 0; LSN lsn_array[OB_MAX_MEMBER_NUMBER]; + const LogConfigVersion &config_version = config_meta_.curr_.config_version_; + const common::ObMemberList new_log_sync_memberlist = new_config_info.log_sync_memberlist_; + const int64_t new_log_sync_replica_num = new_config_info.log_sync_replica_num_; + common::ObMemberList new_paxos_memberlist; + int64_t new_paxos_replica_num = 0; + GlobalLearnerList unused_list; added_member_has_new_version = is_add_member_list(args.type_)? false: true; added_member_flushed_end_lsn.reset(); added_member_last_slide_log_id = 0; - for (int64_t i = 0; i < new_member_list.get_member_number(); ++i) { + if (OB_FAIL(new_config_info.convert_to_complete_config(new_paxos_memberlist, new_paxos_replica_num, unused_list))) { + PALF_LOG(WARN, "convert_to_complete_config failed", K(ret), K_(palf_id), K_(self), K(new_config_info)); + } + + for (int64_t i = 0; OB_SUCC(ret) && i < new_paxos_memberlist.get_member_number(); ++i) { common::ObAddr server; bool is_added_member = false; bool force_remote_check = false; LSN max_flushed_end_lsn; bool has_same_version = false; - if (OB_SUCCESS != (tmp_ret = new_member_list.get_server_by_index(i, server))) { - PALF_LOG(ERROR, "get_server_by_index failed", KR(ret), K_(palf_id), K_(self), K(i), - "new_member_list size:", new_member_list.get_member_number()); - } else if (FALSE_IT(is_added_member = is_add_member_list(args.type_) && (args.server_.get_server() == server))) { + bool is_arb_member = false; + int64_t last_slide_log_id = 0; + if (OB_SUCCESS != (tmp_ret = new_paxos_memberlist.get_server_by_index(i, server))) { + PALF_LOG(ERROR, "get_server_by_index failed", KR(ret), K_(palf_id), K_(self), K(i), K(new_paxos_memberlist)); + } else if (FALSE_IT(is_added_member = (is_add_member_list(args.type_) && (args.server_.get_server() == server)))) { + } else if (FALSE_IT(is_arb_member = (server == new_config_info.arbitration_member_.get_server()))) { } else if (FALSE_IT(force_remote_check = is_added_member || need_purge_throttling || need_remote_check)) { } else if (OB_SUCCESS != (tmp_ret = check_servers_lsn_and_version_(server, config_version, conn_timeout_us, force_remote_check, need_purge_throttling, max_flushed_end_lsn, has_same_version, - added_member_last_slide_log_id))) { + last_slide_log_id))) { // PALF_LOG(WARN, "check_servers_lsn_and_version_ failed", K(ret), K(tmp_ret), K_(palf_id), K_(self), K(server), // K(config_version), K(conn_timeout_us), K(force_remote_check), K(max_flushed_end_lsn), K(has_same_version)); + } else if (false == is_arb_member) { + lsn_array[log_sync_resp_cnt++] = max_flushed_end_lsn; + paxos_resp_cnt++; } else { - lsn_array[resp_cnt++] = max_flushed_end_lsn; + paxos_resp_cnt++; } added_member_has_new_version = (is_added_member)? has_same_version: added_member_has_new_version; added_member_flushed_end_lsn = (is_added_member)? max_flushed_end_lsn: added_member_flushed_end_lsn; - } - - // added member isn't in new_member_list, e.g., add arb member - if (ADD_ARB_MEMBER == args.type_) { - LSN max_flushed_end_lsn; - bool has_same_version = false; - if (OB_SUCCESS != (tmp_ret = check_servers_lsn_and_version_(args.server_.get_server(), - config_version, conn_timeout_us, true/*force_check*/, false/*need_purge_throttle*/, max_flushed_end_lsn, - has_same_version, added_member_last_slide_log_id))) { - PALF_LOG(WARN, "check_servers_lsn_and_version_ failed", K(ret), K_(palf_id), K_(self), K(args), K(config_version), - K(conn_timeout_us), K(max_flushed_end_lsn), K(has_same_version)); - } - added_member_has_new_version = has_same_version; + added_member_last_slide_log_id = (is_added_member)? last_slide_log_id: added_member_last_slide_log_id; } if (false == added_member_has_new_version) { ret = OB_EAGAIN; PALF_LOG(WARN, "added member don't have new version, eagain", K(ret), K_(palf_id), K_(self), K(args), K(config_version)); - } else if (resp_cnt < new_replica_num / 2 + 1) { + } else if ((paxos_resp_cnt < new_paxos_replica_num / 2 + 1) || + (log_sync_resp_cnt < new_log_sync_replica_num / 2 + 1)) { // do not recv majority resp, can not change member ret = OB_EAGAIN; PALF_LOG(WARN, "connection timeout with majority of new_member_list, can't change member!", - K_(palf_id), K_(self), K(new_member_list), K(new_replica_num), K(resp_cnt), K(conn_timeout_us)); + K_(palf_id), K_(self), K(new_paxos_replica_num), K(paxos_resp_cnt), + K(new_log_sync_replica_num), K(log_sync_resp_cnt), K(conn_timeout_us)); } else { - std::sort(lsn_array, lsn_array + resp_cnt, LSNCompare()); - committed_end_lsn = lsn_array[new_replica_num / 2]; + std::sort(lsn_array, lsn_array + log_sync_resp_cnt, LSNCompare()); + committed_end_lsn = lsn_array[new_log_sync_replica_num / 2]; } PALF_LOG(INFO, "sync_get_committed_end_lsn_ finish", K(ret), K_(palf_id), K_(self), K(args), - K(new_member_list), K(new_replica_num), K(need_purge_throttling), K(need_remote_check), + K(new_config_info), K(need_purge_throttling), K(need_remote_check), K(conn_timeout_us), K(committed_end_lsn), K(added_member_has_new_version), K(added_member_flushed_end_lsn), K(added_member_last_slide_log_id), - "lsn_array:", common::ObArrayWrap(lsn_array, resp_cnt)); + K(paxos_resp_cnt), K(new_paxos_replica_num), K(log_sync_resp_cnt), K(new_log_sync_replica_num), + "lsn_array:", common::ObArrayWrap(lsn_array, log_sync_resp_cnt)); return ret; } diff --git a/src/logservice/palf/log_config_mgr.h b/src/logservice/palf/log_config_mgr.h index 7cd2880ab2..70754f0973 100644 --- a/src/logservice/palf/log_config_mgr.h +++ b/src/logservice/palf/log_config_mgr.h @@ -326,12 +326,10 @@ public: virtual int submit_broadcast_leader_info(const int64_t proposal_id) const; virtual void reset_status(); int check_follower_sync_status(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num, + const LogConfigInfo &new_config_info, bool &added_member_has_new_version) const; int wait_log_barrier_(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num) const; + const LogConfigInfo &new_config_info) const; int sync_meta_for_arb_election_leader(); bool need_sync_to_degraded_learners() const; // ================ Config Change ================== @@ -436,8 +434,7 @@ private: bool &has_same_version, int64_t &last_slide_log_id) const; int sync_get_committed_end_lsn_(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num, + const LogConfigInfo &new_config_info, const bool need_purge_throttling, const bool need_remote_check, const int64_t conn_timeout_us, @@ -446,8 +443,7 @@ private: LSN &added_member_flushed_end_lsn, int64_t &added_member_last_slide_log_id) const; int check_follower_sync_status_(const LogConfigChangeArgs &args, - const ObMemberList &new_member_list, - const int64_t new_replica_num, + const LogConfigInfo &new_config_info, bool &added_member_has_new_version) const; int pre_sync_config_log_and_mode_meta_(const common::ObMember &server, const int64_t proposal_id, diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h index 3490914b1e..e89e6aec5f 100644 --- a/src/logservice/palf/log_define.h +++ b/src/logservice/palf/log_define.h @@ -96,7 +96,7 @@ const int64_t MATCH_LSN_ADVANCE_DELAY_THRESHOLD_US = 1 * 1000 * 1000L; const int64_t PALF_RECONFIRM_FETCH_MAX_LSN_INTERVAL = 1 * 1000 * 1000; const int64_t PALF_FETCH_LOG_INTERVAL_US = 2 * 1000 * 1000L; // 2s // Control the fetch interval trigger by outer(eg. config change pre check) by 500ms. -const int64_t PALF_FETCH_LOG_OUTER_TRIGGER_INTERVAL = 100 * 1000 * 1000L; +const int64_t PALF_FETCH_LOG_OUTER_TRIGGER_INTERVAL = 500 * 1000L; // 500 ms const int64_t PALF_FETCH_LOG_RENEW_LEADER_INTERVAL_US = 5 * 1000 * 1000; // 5s const int64_t PALF_LEADER_RECONFIRM_SYNC_TIMEOUT_US = 10 * 1000 * 1000L; // 10s const int64_t PREPARE_LOG_BUFFER_SIZE = 2048; diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 0f6c9e230c..4984a1d19c 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -943,21 +943,16 @@ int PalfHandleImpl::check_args_and_generate_config_(const LogConfigChangeArgs &a const int64_t proposal_id, const int64_t election_epoch, bool &is_already_finished, - common::ObMemberList &log_sync_memberlist, - int64_t &log_sync_repclia_num) const + LogConfigInfo &new_config_info) const { int ret = OB_SUCCESS; RLockGuard guard(lock_); - LogConfigInfo config_info; if (OB_FAIL(config_mgr_.check_args_and_generate_config(args, proposal_id, - election_epoch, is_already_finished, config_info))) { + election_epoch, is_already_finished, new_config_info))) { if (palf_reach_time_interval(100 * 1000, config_change_print_time_us_)) { PALF_LOG(WARN, "check_args_and_generate_config failed", K(ret), KPC(this), K(args)); } - } else { - log_sync_memberlist = config_info.log_sync_memberlist_; - log_sync_repclia_num = config_info.log_sync_replica_num_; - } + } else { } return ret; } @@ -969,9 +964,8 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args, int64_t proposal_id = INVALID_PROPOSAL_ID; int64_t election_epoch = INVALID_PROPOSAL_ID; bool is_already_finished = false; - ObMemberList new_log_sync_memberlist; - int64_t new_log_sync_replica_num = 0; int get_lock = OB_EAGAIN; + LogConfigInfo new_config_info; if (DEGRADE_ACCEPTOR_TO_LEARNER == args.type_) { // for concurrent DEGRADE if (ATOMIC_BCAS(&has_higher_prio_config_change_, false, true)) { @@ -994,7 +988,7 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args, PALF_LOG(WARN, "start_change_config failed", KR(ret), KPC(this), K(args)); } else if (FALSE_IT(doing_degrade = (args.type_ == DEGRADE_ACCEPTOR_TO_LEARNER))) { } else if (OB_FAIL(check_args_and_generate_config_(args, proposal_id, election_epoch, - is_already_finished, new_log_sync_memberlist, new_log_sync_replica_num))) { + is_already_finished, new_config_info))) { if (palf_reach_time_interval(100 * 1000, config_change_print_time_us_)) { PALF_LOG(WARN, "check_args_and_generate_config failed", KR(ret), KPC(this), K(args)); } @@ -1030,13 +1024,12 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args, ret = OB_NOT_MASTER; PALF_LOG(WARN, "leader has been switched, try to change config again", KR(ret), KPC(this), K(proposal_id), K(curr_proposal_id)); - } else if (OB_SUCC(config_mgr_.check_follower_sync_status(args, new_log_sync_memberlist, - new_log_sync_replica_num, added_member_has_new_version))) { + } else if (OB_SUCC(config_mgr_.check_follower_sync_status(args, new_config_info, + added_member_has_new_version))) { // check log synchronization progress of new memberlist majority synchronically break; } else if (OB_EAGAIN != ret) { - PALF_LOG(WARN, "check_follower_sync_status_ fails", K(ret), K_(palf_id), - K(new_log_sync_memberlist), K(new_log_sync_replica_num)); + PALF_LOG(WARN, "check_follower_sync_status_ fails", K(ret), K_(palf_id), K(new_config_info)); } else if (is_upgrade_or_degrade(args.type_)) { ret = OB_EAGAIN; PALF_LOG(WARN, "degrade/upgrade eagain, arb_reason: check_follower_sync_status_ return false", @@ -1101,8 +1094,14 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args, PALF_LOG(INFO, "one_stage_config_change finish", KR(ret), KPC(this), K(args), K(config_version), K(timeout_us), K(time_guard)); ret = (OB_LOG_NOT_SYNC == ret)? OB_EAGAIN: ret; - if (OB_TIMEOUT == ret && config_version.is_valid()) { - config_mgr_.after_config_change_timeout(config_version); + if (OB_FAIL(ret)) { + if (config_version.is_valid()) { + config_mgr_.after_config_change_timeout(config_version); + } else { + // encounter unexpected error, reset flag + WLockGuard guard(lock_); + state_mgr_.reset_changing_config_with_arb(); + } } } if (OB_SUCCESS == get_lock) { diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index d8bb27b8eb..3f5e1886b3 100755 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -1046,8 +1046,7 @@ private: const int64_t proposal_id, const int64_t election_epoch, bool &is_already_finished, - common::ObMemberList &log_sync_memberlist, - int64_t &log_sync_repclia_num) const; + LogConfigInfo &new_config_info) const; int one_stage_config_change_(const LogConfigChangeArgs &args, const int64_t timeout_us); int check_need_rebuild_(const LSN &base_lsn, const LogInfo &base_prev_log_info, diff --git a/unittest/logservice/test_ob_arbitration_service.cpp b/unittest/logservice/test_ob_arbitration_service.cpp index b6636338b5..8db309eb60 100644 --- a/unittest/logservice/test_ob_arbitration_service.cpp +++ b/unittest/logservice/test_ob_arbitration_service.cpp @@ -28,6 +28,7 @@ public: ~MockNetKeepAliveAdapter() { } bool in_black_or_stopped(const common::ObAddr &server) override final {return false;} bool is_server_stopped(const common::ObAddr &server) override final {return false;} + bool in_black(const common::ObAddr &server) override final {return false;} }; const ObAddr addr1(ObAddr::IPV4, "127.0.0.1", 1000);