[Election] revoke() -> advance_ballot() + downgrade_priority()

This commit is contained in:
obdev 2024-02-22 03:16:30 +00:00 committed by ob-robot
parent 18f4f766a2
commit 5ff857ba8b
21 changed files with 187 additions and 64 deletions

View File

@ -90,9 +90,10 @@ int MockElection::change_leader_to(const common::ObAddr &dest_addr)
return ret;
}
int MockElection::revoke(const RoleChangeReason &reason)
int MockElection::temporarily_downgrade_protocol_priority(const int64_t time_us, const char *reason)
{
int ret = OB_SUCCESS;
UNUSED(time_us);
UNUSED(reason);
return ret;
}

View File

@ -35,7 +35,7 @@ public:
int64_t &cur_leader_epoch) const override final;
// 供role change service使用
int change_leader_to(const common::ObAddr &dest_addr) override final;
int revoke(const RoleChangeReason &reason) override final;
int temporarily_downgrade_protocol_priority(const int64_t time_us, const char *reason) override final;
// 拿本机地址
const common::ObAddr &get_self_addr() const override final;
// 打印日志

View File

@ -473,7 +473,7 @@ int RoleCoordinator::switch_follower_to_leader_(
CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls));
}
if (OB_FAIL(ret) && !retry_ctx.need_retry()) {
log_handler->revoke_leader();
log_handler->change_leader_to(GCTX.self_addr());
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), KPC(ls));
}
return ret;
@ -515,7 +515,7 @@ int RoleCoordinator::switch_leader_to_follower_forcedly_(
CLOG_LOG(INFO, "switch_leader_to_follower_forcedly_ success", K(ret), KPC(ls));
}
if (OB_FAIL(ret)) {
log_handler->revoke_leader();
log_handler->change_leader_to(GCTX.self_addr());
CLOG_LOG(WARN, "switch_leader_to_follower_forcedly_ failed", K(ret), K(new_proposal_id), K(new_role));
}
return ret;
@ -573,7 +573,7 @@ int RoleCoordinator::switch_leader_to_follower_gracefully_(
CLOG_LOG(INFO, "switch_to_follower_gracefully success", K(ret), K(new_role), K(new_proposal_id), K(dst_addr));
}
if (OB_FAIL(ret) || OB_LS_NEED_REVOKE == tmp_ret) {
log_handler->revoke_leader();
log_handler->change_leader_to(GCTX.self_addr());
CLOG_LOG(WARN, "switch_leader_to_follower_gracefully failed, revoke leader", K(ret), K(tmp_ret), K(dst_addr),
K(new_role), K(new_proposal_id));
ret = (OB_SUCCESS == ret ? tmp_ret : ret);

View File

@ -48,10 +48,11 @@ int ObLogHandlerBase::prepare_switch_role(common::ObRole &curr_role,
return ret;
}
int ObLogHandlerBase::revoke_leader()
int ObLogHandlerBase::advance_election_epoch_and_downgrade_priority(const int64_t downgrade_priority_time_us,
const char *reason)
{
RLockGuard guard(lock_);
return palf_handle_.revoke_leader(proposal_id_);
return palf_handle_.advance_election_epoch_and_downgrade_priority(proposal_id_, downgrade_priority_time_us, reason);
}
int ObLogHandlerBase::change_leader_to(const common::ObAddr &dst_addr)

View File

@ -45,7 +45,8 @@ public:
bool &is_pending_state) const;
// NB: only called by ObRoleChangeService
virtual void switch_role(const common::ObRole &role, const int64_t proposal_id) = 0;
int revoke_leader();
int advance_election_epoch_and_downgrade_priority(const int64_t downgrade_priority_time_us,
const char *reason);
int change_leader_to(const common::ObAddr &dst_addr);
int get_role_atomically(common::ObRole &role) const;

View File

@ -59,7 +59,7 @@ public:
if (OB_UNLIKELY(msg.get_ballot_number() < p_acceptor->ballot_number_)) {
using T = typename ResponseType<RequestMsg>::type;
T reject_msg = create_reject_message_(p_acceptor->p_election_->get_self_addr(),
p_acceptor->p_election_->inner_priority_seed_,
p_acceptor->p_election_->generate_inner_priority_seed_(),
p_acceptor->p_election_->get_membership_version_(),
p_acceptor->p_election_->get_ls_biggest_min_cluster_version_ever_seen_(),
msg);
@ -321,7 +321,7 @@ void ElectionAcceptor::on_accept_request(const ElectionAcceptRequestMsg &accept_
*us_to_expired = lease_.get_lease_end_ts() - get_monotonic_ts();
// 3. 构造accept ok消息
ElectionAcceptResponseMsg accept_res_accept(p_election_->get_self_addr(),
p_election_->inner_priority_seed_,
p_election_->generate_inner_priority_seed_(),
p_election_->get_membership_version_(),
p_election_->get_ls_biggest_min_cluster_version_ever_seen_(),
accept_req);

View File

@ -195,7 +195,7 @@ int ElectionImpl::change_leader_to(const common::ObAddr &dest_addr)
if (CLICK_FAIL(proposer_.change_leader_to(dest_addr))) {
LOG_CHANGE_LEADER(WARN, "change leader to failed");
} else {
LOG_CHANGE_LEADER(INFO, "change leader to");
LOG_CHANGE_LEADER(INFO, "change leader to", K(lbt()));
}
return ret;
#undef PRINT_WRAPPER
@ -438,14 +438,19 @@ int ElectionImpl::send_(const ElectionChangeLeaderMsg &msg) const
#undef PRINT_WRAPPER
}
int ElectionImpl::revoke(const RoleChangeReason &reason)
int ElectionImpl::temporarily_downgrade_protocol_priority(const int64_t time_us, const char *reason)
{
#define PRINT_WRAPPER KR(ret), K(*this), K(time_us), K(reason)
ELECT_TIME_GUARD(500_ms);
int ret = OB_SUCCESS;
LockGuard lock_guard(lock_);
CHECK_ELECTION_INIT();
ret = proposer_.revoke(reason);
temporarily_downgrade_priority_info_.downgrade_expire_ts_ = get_monotonic_ts() + time_us;
temporarily_downgrade_priority_info_.interval_ = time_us;
temporarily_downgrade_priority_info_.reason_ = reason;
LOG_NONE(WARN, "temporarily downgrade protocol priority");
return ret;
#undef PRINT_WRAPPER
}
int ElectionImpl::add_inner_priority_seed_bit(const PRIORITY_SEED_BIT new_bit)
@ -517,6 +522,15 @@ uint64_t ElectionImpl::get_ls_biggest_min_cluster_version_ever_seen_() const
#undef PRINT_WRAPPER
}
uint64_t ElectionImpl::generate_inner_priority_seed_() const
{
uint64_t priority_seed = inner_priority_seed_;
if (get_monotonic_ts() < temporarily_downgrade_priority_info_.downgrade_expire_ts_) {
priority_seed |= (uint64_t)PRIORITY_SEED_BIT::SEED_TEMORARILY_DOWNGRADE_PRIORIY_BIT;
}
return priority_seed;
}
}
}
}

View File

@ -65,11 +65,11 @@ public:
const int64_t restart_counter,
const ObFunction<int(const int64_t, const ObAddr &)> &prepare_change_leader_cb,
const ObFunction<void(ElectionImpl *, common::ObRole, common::ObRole, RoleChangeReason)> &cb = DefaultRoleChangeCallBack());
int revoke(const RoleChangeReason &reason) override;
virtual void stop() override final;
virtual int can_set_memberlist(const palf::LogConfigVersion &new_config_version) const override final;
virtual int set_memberlist(const MemberList &new_memberlist) override final;
virtual int change_leader_to(const common::ObAddr &dest_addr) override final;
virtual int temporarily_downgrade_protocol_priority(const int64_t time_us, const char *reason) override final;
/**
* @description: epoch
* @param {ObRole} &role LEADER或者FOLLOWER
@ -131,7 +131,7 @@ public:
int clear_inner_priority_seed_bit(const PRIORITY_SEED_BIT old_bit);
int set_inner_priority_seed(const uint64_t seed);
TO_STRING_KV(K_(is_inited), K_(is_running), K_(proposer), K_(acceptor),
K_(ls_biggest_min_cluster_version_ever_seen), KPC_(priority));
K_(ls_biggest_min_cluster_version_ever_seen), KPC_(priority), K_(temporarily_downgrade_priority_info));
private:// 定向暴露给友元类
void handle_message_base_(const ElectionMsgBase &message_base);
void refresh_priority_();
@ -279,6 +279,7 @@ private:// 定向暴露给友元类
int send_(const ElectionChangeLeaderMsg &msg) const;
uint64_t get_ls_biggest_min_cluster_version_ever_seen_() const;
const char *print_version_pretty_(const uint64_t version) const;
uint64_t generate_inner_priority_seed_() const;
private:
bool is_inited_;
bool is_running_;
@ -292,6 +293,18 @@ private:
ObFunction<int(const int64_t, const ObAddr &)> prepare_change_leader_cb_;// 切主回调
ObFunction<void(ElectionImpl *,common::ObRole,common::ObRole,RoleChangeReason)> role_change_cb_;// 角色状态变更回调
uint64_t inner_priority_seed_;// 协议内选举优先级
struct TemporarilyDowngradePriorityInfo {
TemporarilyDowngradePriorityInfo()
: downgrade_expire_ts_(0),
interval_(0),
reason_(nullptr) {}
int64_t downgrade_expire_ts_;// 触发临时降低选举优先级的结束时间
int64_t interval_;// 临时降低选举优先级的持续时长
const char *reason_; // 临时降低选举优先级的原因
TO_STRING_KV("downgrade_expire_ts", TimeSpanWrapper(downgrade_expire_ts_),
"interval", ObTimeLiteralPrettyPrinter(interval_),
K_(reason));
} temporarily_downgrade_priority_info_;
common::ObOccamTimer *timer_;// 选举定时任务的定时器
LsBiggestMinClusterVersionEverSeen ls_biggest_min_cluster_version_ever_seen_;// 为仲裁副本维护的日志流级别的min_cluster_version值,用于处理选举兼容性升级相关问题
EventRecorder event_recorder_;// 事件汇报模块

View File

@ -294,12 +294,17 @@ int ElectionProposer::reschedule_or_register_prepare_task_after_(const int64_t d
int ret = OB_SUCCESS;
LockGuard lock_guard(p_election_->lock_);
if (check_leader()) {// Leader不应该靠定时任务主动做Prepare,只能被动触发Prepare
LOG_RENEW_LEASE(INFO, "leader not allow do prepare in timer task before lease expired, this log may printed when message delay too large", K(*this));
if (prepare_success_ballot_ == ballot_number_) {
LOG_RENEW_LEASE(INFO, "leader not allow do prepare in timer task before lease expired, this log may printed when message delay too large", K(*this));
} else {// 需要进行leader prepare推大用于续约的ballot number
LOG_RENEW_LEASE(INFO, "prepare_success_ballot_ not same as ballot_number_, do leader prepare to advance it");
this->prepare(role_);
}
} else {
if (role_ == ObRole::LEADER) {
role_ = ObRole::FOLLOWER;
}
this->prepare(role_);// 只有Follower可以走到这里
this->prepare(role_);
}
return false;
}))) {
@ -334,7 +339,7 @@ void ElectionProposer::stop()
if (leader_revoke_if_lease_expired_(RoleChangeReason::StopToRevoke)) {
LOG_DESTROY(INFO, "leader revoke because election is stopped");
}
#undef PRINT_WRAPPER
#undef PRINT_WRAPPER
}
void ElectionProposer::prepare(const ObRole role)
@ -368,7 +373,7 @@ void ElectionProposer::prepare(const ObRole role)
restart_counter_,
ballot_number_,
p_election_->get_ls_biggest_min_cluster_version_ever_seen_(),
p_election_->inner_priority_seed_,
p_election_->generate_inner_priority_seed_(),
p_election_->get_membership_version_());
(void) p_election_->refresh_priority_();
if (CLICK_FAIL(prepare_req.set(p_election_->get_priority_(),
@ -430,7 +435,7 @@ void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepa
restart_counter_,
ballot_number_,
p_election_->get_ls_biggest_min_cluster_version_ever_seen_(),
p_election_->inner_priority_seed_,
p_election_->generate_inner_priority_seed_(),
p_election_->get_membership_version_());
if (CLICK_FAIL(prepare_followed_req.set(p_election_->get_priority_(),
role_))) {
@ -576,7 +581,7 @@ void ElectionProposer::on_accept_response(const ElectionAcceptResponseMsg &accep
ObStringHolder higher_than_cached_msg_reason;
(void) p_election_->refresh_priority_();
ElectionAcceptResponseMsg mock_self_accept_response_msg(p_election_->self_addr_,
p_election_->inner_priority_seed_,
p_election_->generate_inner_priority_seed_(),
p_election_->get_membership_version_(),
p_election_->get_ls_biggest_min_cluster_version_ever_seen_(),
ElectionAcceptRequestMsg(p_election_->id_,
@ -743,7 +748,7 @@ int64_t ElectionProposer::to_string(char *buf, const int64_t buf_len) const
common::databuff_printf(buf, buf_len, pos, ", switch_source_leader_addr:%s",
to_cstring(switch_source_leader_addr_));
}
common::databuff_printf(buf, buf_len, pos, ", priority_seed:0x%lx", (unsigned long)p_election_->inner_priority_seed_);
common::databuff_printf(buf, buf_len, pos, ", priority_seed:0x%lx", (unsigned long)p_election_->generate_inner_priority_seed_());
common::databuff_printf(buf, buf_len, pos, ", restart_counter:%ld", restart_counter_);
common::databuff_printf(buf, buf_len, pos, ", last_do_prepare_ts:%s", ObTime2Str::ob_timestamp_str_range<YEAR, USECOND>(last_do_prepare_ts_));
if (OB_NOT_NULL(p_election_)) {
@ -753,23 +758,6 @@ int64_t ElectionProposer::to_string(char *buf, const int64_t buf_len) const
return pos;
}
int ElectionProposer::revoke(const RoleChangeReason &reason)
{
ELECT_TIME_GUARD(500_ms);
#define PRINT_WRAPPER K(*this)
int ret = OB_SUCCESS;
if (!check_leader()) {
ret = OB_NOT_MASTER;
LOG_NONE(WARN, "i am not leader, but someone ask me to revoke", K(lbt()));
}
leader_lease_and_epoch_.reset();
if (!leader_revoke_if_lease_expired_(reason)) {
LOG_NONE(WARN, "somethig wrong when revoke", K(lbt()));
}
return ret;
#undef PRINT_WRAPPER
}
bool ElectionProposer::is_self_in_memberlist_() const
{
bool ret = false;

View File

@ -91,7 +91,6 @@ public:
}
return ret;
}
int revoke(const RoleChangeReason &reason);
public:
// 发prepare请求
void prepare(const common::ObRole role);

View File

@ -41,7 +41,6 @@ enum class RoleChangeReason
LeaseExpiredToRevoke = 3, // 有主连任失败,Lease超时,从Leader变为Follower
ChangeLeaderToRevoke = 4, // 切主流程旧主从Leader变为Follower
StopToRevoke = 5,// 选举leader调用stop接口后leader卸任
AskToRevoke = 6,// 有人要求选举卸任(???)
};
class ElectionProposer;
@ -73,7 +72,7 @@ public:
int64_t &cur_leader_epoch) const = 0;
// 供role change service使用
virtual int change_leader_to(const common::ObAddr &dest_addr) = 0;
virtual int revoke(const RoleChangeReason &reason) = 0;
virtual int temporarily_downgrade_protocol_priority(const int64_t time_us, const char *reason) = 0;
// 拿本机地址
virtual const common::ObAddr &get_self_addr() const = 0;
// 打印日志

View File

@ -103,6 +103,8 @@ enum class LogPhase
enum class PRIORITY_SEED_BIT : uint64_t
{
DEFAULT_SEED = (1ULL << 12),
TEST_BIT = (1ULL << 13),
SEED_TEMORARILY_DOWNGRADE_PRIORIY_BIT = (1ULL << 22),
SEED_IN_REBUILD_PHASE_BIT = (1ULL << 32),
SEED_NOT_NORMOL_REPLICA_BIT = (1ULL << 48),
};

View File

@ -102,7 +102,7 @@ inline int64_t count_if(const common::ObArray<T> &array, ObFunction<bool(const T
}
template <typename T>
inline void bubble_sort_desc(common::ObArray<T> &array)
inline void bubble_sort_desc(common::ObIArray<T> &array)
{
if(array.count() >= 2 ) {
for (int64_t i = 0; i < array.count() - 1; ++i) {
@ -120,7 +120,7 @@ inline int get_sorted_majority_one_desc(const common::ObArray<T> &array, T &majo
{
ELECT_TIME_GUARD(500_ms);
int ret = common::OB_SUCCESS;
common::ObArray<T> temp_array;
common::ObSEArray<T, 7> temp_array;
for (int64_t idx = 0; idx < array.count() && OB_SUCC(ret); ++idx) {
if (CLICK_FAIL(temp_array.push_back(array.at(idx)))) {
ELECT_LOG(ERROR, "assign temp array failed", KR(ret));
@ -173,9 +173,6 @@ inline const char *obj_to_string<RoleChangeReason>(const RoleChangeReason &v)
case RoleChangeReason::StopToRevoke:
ret = "election stopped to revoke";
break;
case RoleChangeReason::AskToRevoke:
ret = "someone asking election to revoke";
break;
default:
break;
}

View File

@ -703,10 +703,14 @@ int PalfHandle::reset_election_priority()
return ret;
}
int PalfHandle::revoke_leader(const int64_t proposal_id)
int PalfHandle::advance_election_epoch_and_downgrade_priority(const int64_t proposal_id,
const int64_t downgrade_priority_time_us,
const char *reason)
{
CHECK_VALID;
return palf_handle_impl_->revoke_leader(proposal_id);
return palf_handle_impl_->advance_election_epoch_and_downgrade_priority(proposal_id,
downgrade_priority_time_us,
reason);
}
int PalfHandle::stat(PalfStat &palf_stat) const

View File

@ -417,7 +417,9 @@ public:
// - OB_NOT_INIT
int get_arbitration_member(common::ObMember &arb_member) const;
#endif
int revoke_leader(const int64_t proposal_id);
int advance_election_epoch_and_downgrade_priority(const int64_t proposal_id,
const int64_t downgrade_priority_time_us,
const char *reason);
int change_leader_to(const common::ObAddr &dst_addr);
// @brief: change AccessMode of palf.
// @param[in] const int64_t &proposal_id: current proposal_id of leader

View File

@ -4570,20 +4570,24 @@ int PalfHandleImpl::get_election_leader_without_lock_(ObAddr &addr) const
return ret;
}
int PalfHandleImpl::revoke_leader(const int64_t proposal_id)
int PalfHandleImpl::advance_election_epoch_and_downgrade_priority(const int64_t proposal_id,
const int64_t downgrade_priority_time_us,
const char *reason)
{
int ret = OB_SUCCESS;
RLockGuard guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "PalfHandleImpl not inited", K(ret), K_(palf_id));
PALF_LOG(WARN, "PalfHandleImpl not inited", K(ret), K_(palf_id), K(reason));
} else if (false == state_mgr_.can_revoke(proposal_id)) {
ret = OB_NOT_MASTER;
PALF_LOG(WARN, "revoke_leader failed, not master", K(ret), K_(palf_id), K(proposal_id));
} else if (OB_FAIL(election_.revoke(RoleChangeReason::AskToRevoke))) {
PALF_LOG(WARN, "PalfHandleImpl revoke leader failed", K(ret), K_(palf_id));
PALF_LOG(WARN, "advance election epoch failed, not master", K(ret), K_(palf_id), K(proposal_id), K(reason));
} else if (OB_FAIL(election_.change_leader_to(GCTX.self_addr()))) {
PALF_LOG(WARN, "fail to change leader to self", K(ret), K_(palf_id), K(proposal_id), K(reason));
} else if (OB_FAIL(election_.temporarily_downgrade_protocol_priority(downgrade_priority_time_us, reason))) {
PALF_LOG(WARN, "PalfHandleImpl revoke leader failed", K(ret), K_(palf_id), K(reason));
} else {
PALF_LOG(INFO, "PalfHandleImpl revoke leader success", K(ret), K_(palf_id));
PALF_LOG(INFO, "PalfHandleImpl revoke leader success", K(ret), K_(palf_id), K(reason));
}
return ret;
}

View File

@ -805,7 +805,9 @@ public:
virtual int set_election_priority(election::ElectionPriority *priority) = 0;
virtual int reset_election_priority() = 0;
// ==================== Callback end ========================
virtual int revoke_leader(const int64_t proposal_id) = 0;
virtual int advance_election_epoch_and_downgrade_priority(const int64_t proposal_id,
const int64_t downgrade_priority_time_us,
const char *reason) = 0;
virtual int flashback(const int64_t mode_version,
const share::SCN &flashback_scn,
const int64_t timeout_us) = 0;
@ -1139,7 +1141,9 @@ public:
int handle_config_change_pre_check(const ObAddr &server,
const LogGetMCStReq &req,
LogGetMCStResp &resp) override final;
int revoke_leader(const int64_t proposal_id) override final;
int advance_election_epoch_and_downgrade_priority(const int64_t proposal_id,
const int64_t downgrade_priority_time_us,
const char *reason) override final;
int stat(PalfStat &palf_stat) override final;
int handle_register_parent_req(const LogLearner &child,
const bool is_to_leader) override final;

View File

@ -622,7 +622,7 @@ int ObRoleChangeService::switch_follower_to_leader_(
CLOG_LOG(INFO, "switch_follower_to_leader_ success", K(ret), KPC(ls));
}
if (OB_FAIL(ret) && !retry_ctx.need_retry()) {
log_handler->revoke_leader();
log_handler->advance_election_epoch_and_downgrade_priority(0_s, "palf switch follower to leader failed");
CLOG_LOG(WARN, "switch_follower_to_leader_ failed", K(ret), KPC(ls));
}
return ret;
@ -664,7 +664,7 @@ int ObRoleChangeService::switch_leader_to_follower_forcedly_(
CLOG_LOG(INFO, "switch_leader_to_follower_forcedly_ success", K(ret), KPC(ls));
}
if (OB_FAIL(ret)) {
log_handler->revoke_leader();
log_handler->advance_election_epoch_and_downgrade_priority(0_s, "palf switch leader to follower forcedly failed");
CLOG_LOG(WARN, "switch_leader_to_follower_forcedly_ failed", K(ret), K(new_proposal_id), K(new_role));
}
return ret;
@ -722,7 +722,7 @@ int ObRoleChangeService::switch_leader_to_follower_gracefully_(
CLOG_LOG(INFO, "switch_to_follower_gracefully success", K(ret), K(new_role), K(new_proposal_id), K(dst_addr));
}
if (OB_FAIL(ret) || OB_LS_NEED_REVOKE == tmp_ret) {
log_handler->revoke_leader();
log_handler->advance_election_epoch_and_downgrade_priority(0_s, "palf switch leader to follower gracefully failed");
CLOG_LOG(WARN, "switch_leader_to_follower_gracefully failed, revoke leader", K(ret), K(tmp_ret), K(dst_addr),
K(new_role), K(new_proposal_id));
ret = (OB_SUCCESS == ret ? tmp_ret : ret);
@ -785,7 +785,7 @@ int ObRoleChangeService::switch_follower_to_leader_restore_(
ATOMIC_SET(&cur_task_info_.log_type_, ObLogBaseType::INVALID_LOG_BASE_TYPE);
}
if (OB_FAIL(ret)) {
log_restore_handler->revoke_leader();
log_restore_handler->advance_election_epoch_and_downgrade_priority(0_s, "palf switch follower to leader restore failed");
}
return ret;
}
@ -819,7 +819,7 @@ int ObRoleChangeService::switch_leader_to_follower_gracefully_restore_(
} else {
}
if (OB_FAIL(ret) || OB_LS_NEED_REVOKE == tmp_ret) {
log_restore_handler->revoke_leader();
log_restore_handler->advance_election_epoch_and_downgrade_priority(0_s, "palf switch leader to follower gracefully restore failed");
CLOG_LOG(WARN, "switch_leader_to_follower_gracefully failed, revoke leader", K(ret), K(tmp_ret), K(dst_addr));
ret = (OB_SUCCESS == ret ? tmp_ret : ret);
}

View File

@ -81,6 +81,13 @@ public:
UNUSED(dest_addr);
return ret;
}
virtual int temporarily_downgrade_protocol_priority(const int64_t time_us, const char *reason) override final
{
int ret = OB_SUCCESS;
UNUSED(time_us);
UNUSED(reason);
return OB_SUCCESS;
}
// 拿本机地址
const common::ObAddr &get_self_addr() const override final
{

View File

@ -596,6 +596,92 @@ TEST_F(TestElection, set_inner_priority_seed) {
ASSERT_EQ(stop_to_be_follower_count, 1);
}
TEST_F(TestElection, advance_ballot) {
// 创建paxos group
auto election_list = create_election_group(3, {(uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED, (uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED, (uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED}, [](){});
// 建立网络连接
for (auto &election_1 : election_list) {
for (auto &election_2 : election_list) {
GlobalNetService.connect(election_1, election_2);
}
}
this_thread::sleep_for(chrono::seconds(7));// 等待第一轮选举结果,为election[0]
ObRole role;
int64_t old_epoch;
election_list[0]->get_role(role, old_epoch);
ASSERT_EQ(role, ObRole::LEADER);
// 推大leader的ballot number之后,leader将会重新进行paxos两阶段确定新的epoch值
ASSERT_EQ(OB_SUCCESS, election_list[0]->change_leader_to(election_list[0]->get_self_addr()));
this_thread::sleep_for(chrono::seconds(5));// wait for advance prepare_success_ballot
int64_t new_epoch;
election_list[0]->get_role(role, new_epoch);
ASSERT_EQ(role, ObRole::LEADER);
ASSERT_EQ(old_epoch + 1, new_epoch);
// 析构+清理动作
for (auto iter = election_list.rbegin(); iter != election_list.rend(); ++iter)
(*iter)->stop();
this_thread::sleep_for(chrono::seconds(2));
for (auto &election_ : election_list)
delete election_;
// 测试过程中发生的事件数量断言
ASSERT_EQ(leader_takeover_times, 2);
ASSERT_EQ(leader_revoke_times, 2);
ASSERT_EQ(devote_to_be_leader_count, 1);
ASSERT_EQ(lease_expired_to_be_follower_count, 0);
ASSERT_EQ(change_leader_to_be_leader_count, 1);
ASSERT_EQ(change_leader_to_be_follower_count, 1);
ASSERT_EQ(stop_to_be_follower_count, 1);
}
TEST_F(TestElection, temporarily_downgrade_protocol_priority) {
// 创建paxos group,初始场景下,election[0]的优先级为默认,election[1]/[2]的优先级被默认降低
auto election_list = create_election_group(3, {(uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED,
(uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED | (uint64_t)PRIORITY_SEED_BIT::TEST_BIT,
(uint64_t)PRIORITY_SEED_BIT::DEFAULT_SEED | (uint64_t)PRIORITY_SEED_BIT::TEST_BIT}, [](){});
// 建立网络连接
for (auto &election_1 : election_list) {
for (auto &election_2 : election_list) {
GlobalNetService.connect(election_1, election_2);
}
}
this_thread::sleep_for(chrono::seconds(7));// 等待第一轮选举结果,为election[0]
ObRole role;
int64_t _;
election_list[0]->get_role(role, _);
ASSERT_EQ(role, ObRole::LEADER);
// 临时降低election[0]的优先级2s,此时发生切主:election[0] -> election[1]
ASSERT_EQ(OB_SUCCESS, election_list[0]->temporarily_downgrade_protocol_priority(2_s, "test"));
this_thread::sleep_for(chrono::seconds(2));
election_list[1]->get_role(role, _);
ASSERT_EQ(role, ObRole::LEADER);
// 等待2s后,election[0]的优先级恢复,此时发生切主:election[1] -> election[0]
this_thread::sleep_for(chrono::seconds(2));
election_list[0]->get_role(role, _);
ASSERT_EQ(role, ObRole::LEADER);
// 析构+清理动作
for (auto iter = election_list.rbegin(); iter != election_list.rend(); ++iter)
(*iter)->stop();
this_thread::sleep_for(chrono::seconds(2));
for (auto &election_ : election_list)
delete election_;
// 测试过程中发生的事件数量断言
ASSERT_EQ(leader_takeover_times, 3);
ASSERT_EQ(leader_revoke_times, 3);
ASSERT_EQ(devote_to_be_leader_count, 1);
ASSERT_EQ(lease_expired_to_be_follower_count, 0);
ASSERT_EQ(change_leader_to_be_leader_count, 2);
ASSERT_EQ(change_leader_to_be_follower_count, 2);
ASSERT_EQ(stop_to_be_follower_count, 1);
}
}
}

View File

@ -144,8 +144,9 @@ public:
UNUSED(dst_addr);
return OB_SUCCESS;
}
virtual int revoke_leader()
virtual int advance_election_epoch_and_downgrade_priority(const int64_t downgrade_priority_time_us, const char *reason)
{
UNUSED(downgrade_priority_time_us, reason);
curr_role_ = FOLLOWER;
new_role_ = FOLLOWER;
new_proposal_id_++;