From e02b126ceb53d677d7ed89f5c95cd6b711ee559d Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 6 Feb 2023 19:52:39 +0800 Subject: [PATCH] [Election] support set inner priority --- .../palf/election/algorithm/election_impl.cpp | 47 +++++++++++++++++++ .../palf/election/algorithm/election_impl.h | 3 ++ .../election/message/election_message.cpp | 7 ++- .../election/utils/election_common_define.h | 10 ++-- src/logservice/palf/palf_handle_impl.cpp | 9 ++-- unittest/logservice/test_ob_election.cpp | 45 ++++++++++++++++-- .../test_ob_election_message_compat.cpp | 2 +- 7 files changed, 110 insertions(+), 13 deletions(-) diff --git a/src/logservice/palf/election/algorithm/election_impl.cpp b/src/logservice/palf/election/algorithm/election_impl.cpp index f6228b0f9a..953290f054 100644 --- a/src/logservice/palf/election/algorithm/election_impl.cpp +++ b/src/logservice/palf/election/algorithm/election_impl.cpp @@ -10,6 +10,7 @@ * See the Mulan PubL v2 for more details. */ +#include "share/ob_errno.h" #include "share/ob_occam_time_guard.h" #include "election_impl.h" #include "lib/ob_errno.h" @@ -415,6 +416,52 @@ int ElectionImpl::revoke(const RoleChangeReason &reason) return ret; } +int ElectionImpl::add_inner_priority_seed_bit(const PRIORITY_SEED_BIT new_bit) +{ + #define PRINT_WRAPPER KR(ret), K(*this), K(new_bit) + int ret = OB_SUCCESS; + ELECT_TIME_GUARD(500_ms); + LockGuard lock_guard(lock_); + CLICK(); + CHECK_ELECTION_INIT(); + if (inner_priority_seed_ & static_cast(new_bit)) { + ret = OB_ENTRY_EXIST; + LOG_NONE(WARN, "set inner priority seed with new bit failed, caus it already exist"); + } else { + inner_priority_seed_ |= static_cast(new_bit); + } + return ret; + #undef PRINT_WRAPPER +} + +int ElectionImpl::clear_inner_priority_seed_bit(const PRIORITY_SEED_BIT old_bit) +{ + #define PRINT_WRAPPER KR(ret), K(*this), K(old_bit) + int ret = OB_SUCCESS; + ELECT_TIME_GUARD(500_ms); + LockGuard lock_guard(lock_); + CLICK(); + CHECK_ELECTION_INIT(); + if (!(inner_priority_seed_ | static_cast(old_bit))) { + ret = OB_ENTRY_NOT_EXIST; + LOG_NONE(WARN, "clear inner priority seed with old bit failed, caus it not exist"); + } else { + inner_priority_seed_ &= (~static_cast(old_bit)); + } + return ret; + #undef PRINT_WRAPPER +} + +int ElectionImpl::set_inner_priority_seed(const uint64_t seed) +{ + int ret = OB_SUCCESS; + ELECT_TIME_GUARD(500_ms); + LockGuard lock_guard(lock_); + CLICK(); + inner_priority_seed_ = seed; + return ret; +} + } } } diff --git a/src/logservice/palf/election/algorithm/election_impl.h b/src/logservice/palf/election/algorithm/election_impl.h index 7de7487f09..6efacae1c8 100644 --- a/src/logservice/palf/election/algorithm/election_impl.h +++ b/src/logservice/palf/election/algorithm/election_impl.h @@ -126,6 +126,9 @@ public: virtual int handle_message(const ElectionAcceptResponseMsg &msg) override final; virtual int handle_message(const ElectionChangeLeaderMsg &msg) override final; virtual const common::ObAddr &get_self_addr() const override; + int add_inner_priority_seed_bit(const PRIORITY_SEED_BIT new_bit); + int clear_inner_priority_seed_bit(const PRIORITY_SEED_BIT old_bit); + int set_inner_priority_seed(const uint64_t seed); TO_STRING_KV(K_(is_inited), K_(is_running), K_(proposer), K_(acceptor), KPC_(priority)); private:// 定向暴露给友元类 void refresh_priority_(); diff --git a/src/logservice/palf/election/message/election_message.cpp b/src/logservice/palf/election/message/election_message.cpp index 4f6d96e41e..974dac5f9e 100644 --- a/src/logservice/palf/election/message/election_message.cpp +++ b/src/logservice/palf/election/message/election_message.cpp @@ -159,7 +159,10 @@ ElectionPrepareRequestMsgMiddle::ElectionPrepareRequestMsgMiddle() : ElectionMsgBase(), role_(ObRole::INVALID_ROLE), is_buffer_valid_(false), -inner_priority_seed_(DEFAULT_SEED) { memset(priority_buffer_, 0, PRIORITY_BUFFER_SIZE); } +inner_priority_seed_(static_cast(PRIORITY_SEED_BIT::DEFAULT_SEED)) +{ + memset(priority_buffer_, 0, PRIORITY_BUFFER_SIZE); +} int ElectionPrepareRequestMsgMiddle::set(const ElectionPriority *priority, const common::ObRole role) { @@ -257,7 +260,7 @@ lease_started_ts_on_proposer_(0), lease_interval_(0), accepted_(false), is_buffer_valid_(false), -inner_priority_seed_(DEFAULT_SEED) +inner_priority_seed_(static_cast(PRIORITY_SEED_BIT::DEFAULT_SEED)) { memset(priority_buffer_, 0, PRIORITY_BUFFER_SIZE); } diff --git a/src/logservice/palf/election/utils/election_common_define.h b/src/logservice/palf/election/utils/election_common_define.h index 192313bb76..9d9fa8b15a 100644 --- a/src/logservice/palf/election/utils/election_common_define.h +++ b/src/logservice/palf/election/utils/election_common_define.h @@ -99,9 +99,13 @@ enum class LogPhase SET_MEMBER = 7, }; -// inner priority seed define -constexpr int64_t DEFAULT_SEED = (1ULL << 12); -constexpr int64_t SEED_NOT_NORMOL_REPLICA_BIT = (1ULL << 48); +// inner priority seed define, bigger means lower priority +enum class PRIORITY_SEED_BIT : uint64_t +{ + DEFAULT_SEED = (1ULL << 12), + SEED_IN_REBUILD_PHASE_BIT = (1ULL << 32), + SEED_NOT_NORMOL_REPLICA_BIT = (1ULL << 48), +}; constexpr int64_t MSG_DELAY_WARN_THRESHOLD = 200_ms; constexpr int64_t MAX_LEASE_TIME = 10_s; diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index a5db36ca9e..3e28af2772 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -2238,6 +2238,11 @@ int PalfHandleImpl::do_init_mem_( int ret = OB_SUCCESS; int pret = -1; const bool is_normal_replica = (log_meta.get_log_replica_property_meta().replica_type_ == NORMAL_REPLICA); + // inner priority seed: smaller means higher priority + // reserve some bits for future requirements + const uint64_t election_inner_priority_seed = is_normal_replica ? + static_cast(PRIORITY_SEED_BIT::DEFAULT_SEED) : + 0ULL | static_cast(PRIORITY_SEED_BIT::SEED_NOT_NORMOL_REPLICA_BIT); palf::PalfRoleChangeCbWrapper &role_change_cb_wrpper = role_change_cb_wrpper_; if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) { ret = OB_ERR_UNEXPECTED; @@ -2249,9 +2254,7 @@ int PalfHandleImpl::do_init_mem_( election_timer, &election_msg_sender_, self, - /*inner priority seed: smaller means higher priority*/ - /*reserve some bits for future requirements*/ - is_normal_replica ? DEFAULT_SEED : (0ULL | SEED_NOT_NORMOL_REPLICA_BIT), + election_inner_priority_seed, 1, [&role_change_cb_wrpper](int64_t id, const ObAddr &dest_addr){ diff --git a/unittest/logservice/test_ob_election.cpp b/unittest/logservice/test_ob_election.cpp index 6f4fa32d87..8357b64e7e 100644 --- a/unittest/logservice/test_ob_election.cpp +++ b/unittest/logservice/test_ob_election.cpp @@ -99,9 +99,9 @@ public: }; template -vector create_election_group(const int election_num, const vector &allowed_be_leader, TAKEOVER_OP &&op) +vector create_election_group(const int election_num, const vector &priority_seed, TAKEOVER_OP &&op) { - if (allowed_be_leader.size() != election_num) { + if (priority_seed.size() != election_num) { abort(); } vector v; @@ -134,9 +134,13 @@ vector create_election_group(const int election_num, const vecto &timer, &GlobalNetService, ObAddr(ObAddr::VER::IPV4, "127.0.0.1", port + index), - allowed_be_leader[index], + priority_seed[index], 1, - [](const int64_t, const ObAddr &){ return OB_SUCCESS; }, + [&election](const int64_t, const ObAddr &dest_addr) { + return THREAD_POOL.commit_task_ignore_ret([&election, dest_addr]() { + election->change_leader_to(dest_addr); + }); + }, [op, ret](Election *election, ObRole before, ObRole after, RoleChangeReason reason) { if (before == ObRole::FOLLOWER && after == ObRole::LEADER) { ELECT_LOG(INFO, "i become LEADER", K(obj_to_string(reason)), KPC(election)); @@ -559,6 +563,39 @@ TEST_F(TestElection, inner_priority_seed_valid_when_membership_version_equal) { ASSERT_EQ(stop_to_be_follower_count, 1); } +TEST_F(TestElection, set_inner_priority_seed) { + auto election_list = create_election_group(3, {(uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED, (uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED, (uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED}, [](){}); + for (auto &election_1 : election_list) { + for (auto &election_2 : election_list) { + GlobalNetService.connect(election_1, election_2); + } + } + this_thread::sleep_for(chrono::seconds(7)); + ObRole role; + int64_t _; + election_list[0]->get_role(role, _); + ASSERT_EQ(role, ObRole::LEADER); + + ASSERT_EQ(OB_SUCCESS, election_list[0]->add_inner_priority_seed_bit(PRIORITY_SEED_BIT::SEED_IN_REBUILD_PHASE_BIT)); + this_thread::sleep_for(chrono::seconds(3)); + election_list[1]->get_role(role, _); + ASSERT_EQ(role, ObRole::LEADER); + + for (auto iter = election_list.rbegin(); iter != election_list.rend(); ++iter) + (*iter)->stop(); + this_thread::sleep_for(chrono::seconds(2)); + for (auto &election_ : election_list) + delete election_; + + ASSERT_EQ(leader_takeover_times, 2); + ASSERT_EQ(leader_revoke_times, 2); + ASSERT_EQ(devote_to_be_leader_count, 1); + ASSERT_EQ(lease_expired_to_be_follower_count, 0); + ASSERT_EQ(change_leader_to_be_leader_count, 1); + ASSERT_EQ(change_leader_to_be_follower_count, 1); + ASSERT_EQ(stop_to_be_follower_count, 1); +} + } } diff --git a/unittest/logservice/test_ob_election_message_compat.cpp b/unittest/logservice/test_ob_election_message_compat.cpp index 093a26b25f..aa00fdcf5d 100644 --- a/unittest/logservice/test_ob_election_message_compat.cpp +++ b/unittest/logservice/test_ob_election_message_compat.cpp @@ -215,7 +215,7 @@ TEST_F(TestElectionMsgCompat, old_new_msg_serialize) { pos = 0; ASSERT_EQ(OB_SUCCESS, prepare_msg_new1.deserialize(buffer, buffer_size, pos)); ELECT_LOG(INFO, "test msg from old to new", K(prepare_msg_old1), K(prepare_msg_new1)); - ASSERT_EQ(DEFAULT_SEED, prepare_msg_new1.get_inner_priority_seed()); + ASSERT_EQ((uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED, prepare_msg_new1.get_inner_priority_seed()); pos = 0; ASSERT_EQ(OB_SUCCESS, prepare_msg_new2.serialize(buffer, buffer_size, pos)); pos = 0;