fix the problem that the gap between triggering a rebuild and disabling voting may lead to unnecessary rebuilds
This commit is contained in:
@ -134,7 +134,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_upgrade)
|
||||
EXPECT_TRUE(is_upgraded(leader, id));
|
||||
|
||||
// test disable vote
|
||||
palf_list[another_f_idx]->palf_handle_impl_->disable_vote();
|
||||
palf_list[another_f_idx]->palf_handle_impl_->disable_vote(false/*no need check log missing*/);
|
||||
EXPECT_TRUE(is_degraded(leader, another_f_idx));
|
||||
palf_list[another_f_idx]->palf_handle_impl_->enable_vote();
|
||||
EXPECT_TRUE(is_upgraded(leader, id));
|
||||
|
||||
@ -1136,7 +1136,7 @@ int ObLogHandler::set_region(const common::ObRegion ®ion)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::disable_vote()
|
||||
int ObLogHandler::disable_vote(const bool need_check_log_missing)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RLockGuard guard(lock_);
|
||||
@ -1145,7 +1145,7 @@ int ObLogHandler::disable_vote()
|
||||
} else if (is_in_stop_state_) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
} else {
|
||||
ret = palf_handle_.disable_vote();
|
||||
ret = palf_handle_.disable_vote(need_check_log_missing);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -135,7 +135,7 @@ public:
|
||||
virtual int pend_submit_replay_log() = 0;
|
||||
virtual int restore_submit_replay_log() = 0;
|
||||
virtual bool is_replay_enabled() const = 0;
|
||||
virtual int disable_vote() = 0;
|
||||
virtual int disable_vote(const bool need_check_log_missing) = 0;
|
||||
virtual int enable_vote() = 0;
|
||||
virtual int register_rebuild_cb(palf::PalfRebuildCb *rebuild_cb) = 0;
|
||||
virtual int unregister_rebuild_cb() = 0;
|
||||
@ -458,10 +458,16 @@ public:
|
||||
// @brief, get max decided log ts considering both apply and replay.
|
||||
// @param[out] int64_t&, max decided log ts ns.
|
||||
int get_max_decided_scn(share::SCN &scn) override final;
|
||||
// @brief: store a persistent flag which means this paxos replica
|
||||
// can not reply ack when receiving logs.
|
||||
// @brief: store a persistent flag which means this paxos replica can not reply ack when receiving logs.
|
||||
// By default, paxos replica can reply ack.
|
||||
int disable_vote() override final;
|
||||
// @param[in] need_check_log_missing: for rebuilding caused by log missing, need to check whether log
|
||||
// is actually missing
|
||||
// @return:
|
||||
// OB_NOT_INIT: not inited
|
||||
// OB_NOT_RUNNING: in stop state
|
||||
// OB_OP_NOT_ALLOW: no need to rebuild, log is available. Rebuilding should be abandoned.
|
||||
// OB_LEADER_NOT_EXIST: no leader when double checking. rebuilding should retry.
|
||||
int disable_vote(const bool need_check_log_missing) override final;
|
||||
// @brief: store a persistent flag which means this paxos replica
|
||||
// can reply ack when receiving logs.
|
||||
// By default, paxos replica can reply ack.
|
||||
|
||||
@ -56,6 +56,7 @@ LogStateMgr::LogStateMgr()
|
||||
scan_disk_log_finished_(false),
|
||||
is_sync_enabled_(true),
|
||||
allow_vote_(true),
|
||||
allow_vote_persisted_(true),
|
||||
replica_type_(NORMAL_REPLICA),
|
||||
is_changing_config_with_arb_(false),
|
||||
last_set_changing_config_with_arb_time_us_(OB_INVALID_TIMESTAMP),
|
||||
@ -100,6 +101,7 @@ int LogStateMgr::init(const int64_t palf_id,
|
||||
state_ = INIT;
|
||||
scan_disk_log_finished_ = false;
|
||||
allow_vote_ = replica_property_meta.allow_vote_;
|
||||
allow_vote_persisted_ = replica_property_meta.allow_vote_;
|
||||
replica_type_ = replica_property_meta.replica_type_;
|
||||
is_sync_enabled_ = !is_arb_replica();
|
||||
is_inited_ = true;
|
||||
@ -1098,6 +1100,24 @@ bool LogStateMgr::is_allow_vote() const
|
||||
return ATOMIC_LOAD(&allow_vote_);
|
||||
}
|
||||
|
||||
bool LogStateMgr::is_allow_vote_persisted() const
|
||||
{
|
||||
|
||||
return ATOMIC_LOAD(&allow_vote_persisted_);
|
||||
}
|
||||
|
||||
int LogStateMgr::disable_vote_in_mem()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
ATOMIC_STORE(&allow_vote_, false);
|
||||
PALF_LOG(INFO, "disable_vote_in_mem success", K(ret), K_(palf_id), K_(self));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogStateMgr::disable_vote()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1105,6 +1125,7 @@ int LogStateMgr::disable_vote()
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
ATOMIC_STORE(&allow_vote_, false);
|
||||
ATOMIC_STORE(&allow_vote_persisted_, false);
|
||||
PALF_LOG(INFO, "disable_vote success", K(ret), K_(palf_id), K_(self));
|
||||
}
|
||||
return ret;
|
||||
@ -1117,6 +1138,7 @@ int LogStateMgr::enable_vote()
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
ATOMIC_STORE(&allow_vote_, true);
|
||||
ATOMIC_STORE(&allow_vote_persisted_, true);
|
||||
PALF_LOG(INFO, "enable_vote success", K(ret), K_(palf_id), K_(self));
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -92,6 +92,10 @@ public:
|
||||
virtual int disable_sync();
|
||||
virtual bool is_sync_enabled() const;
|
||||
virtual bool is_allow_vote() const;
|
||||
virtual bool is_allow_vote_persisted() const;
|
||||
//only modify allow_vote_
|
||||
virtual int disable_vote_in_mem();
|
||||
//modify allow_vote_ and allow_vote_persisted_
|
||||
virtual int disable_vote();
|
||||
virtual int enable_vote();
|
||||
virtual LogReplicaType get_replica_type() const;
|
||||
@ -102,7 +106,7 @@ public:
|
||||
virtual bool is_changing_config_with_arb() const;
|
||||
TO_STRING_KV(KP(this), K_(self), K_(palf_id), "role", role_to_string(role_), "replica_type", \
|
||||
replica_type_2_str(replica_type_), "state", replica_state_to_string(state_), K_(prepare_meta), \
|
||||
K_(leader), K_(leader_epoch), K_(is_sync_enabled), K_(allow_vote), K_(pending_end_lsn), \
|
||||
K_(leader), K_(leader_epoch), K_(is_sync_enabled), K_(allow_vote), K_(allow_vote_persisted), K_(pending_end_lsn), \
|
||||
K_(scan_disk_log_finished), K_(last_check_start_id), K_(is_changing_config_with_arb), \
|
||||
K_(reconfirm_start_time_us), KP_(palf_role_change_cb), K_(allow_vote));
|
||||
private:
|
||||
@ -191,6 +195,8 @@ private:
|
||||
// whether this replica is allowed to reply ack when receiving logs
|
||||
// it's true by default
|
||||
bool allow_vote_;
|
||||
// persisted value of allow_vote_, will be modified after meta is flushed
|
||||
bool allow_vote_persisted_;
|
||||
// whether this replica is an arbitration replica
|
||||
LogReplicaType replica_type_;
|
||||
// is changing config with arbitration member, stop appending logs
|
||||
|
||||
@ -404,11 +404,11 @@ int PalfHandle::get_access_mode(AccessMode &access_mode) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfHandle::disable_vote()
|
||||
int PalfHandle::disable_vote(const bool need_check_log_missing)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
CHECK_VALID;
|
||||
ret = palf_handle_impl_->disable_vote();
|
||||
ret = palf_handle_impl_->disable_vote(need_check_log_missing);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -325,9 +325,10 @@ public:
|
||||
bool is_vote_enabled() const;
|
||||
// @brief: store a persistent flag which means this paxos replica
|
||||
// can not reply ack when receiving logs.
|
||||
// @param[in] need_check_log_missing: for rebuilding caused by log missing, need to check whether log is actually missing
|
||||
// By default, paxos replica can reply ack.
|
||||
// @return:
|
||||
int disable_vote();
|
||||
int disable_vote(const bool need_check_log_missing);
|
||||
// @brief: store a persistent flag which means this paxos replica
|
||||
// can reply ack when receiving logs.
|
||||
// By default, paxos replica can reply ack.
|
||||
|
||||
@ -64,6 +64,7 @@ PalfHandleImpl::PalfHandleImpl()
|
||||
replace_member_print_time_us_(OB_INVALID_TIMESTAMP),
|
||||
config_change_print_time_us_(OB_INVALID_TIMESTAMP),
|
||||
last_rebuild_lsn_(),
|
||||
last_rebuild_meta_info_(),
|
||||
last_record_append_lsn_(PALF_INITIAL_LSN_VAL),
|
||||
has_set_deleted_(false),
|
||||
palf_env_impl_(NULL),
|
||||
@ -1216,19 +1217,34 @@ bool PalfHandleImpl::is_vote_enabled() const
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
int PalfHandleImpl::disable_vote()
|
||||
/*brief:disable_vote(need_check_log_missing), this function is reentrant.
|
||||
* step 1: check voting status, if already disabled, just return
|
||||
* step 2: for need_check_log_missing situation, double check whether it is really necessary to rebuild
|
||||
* step 3: set voting flag as false when necessary
|
||||
*/
|
||||
int PalfHandleImpl::disable_vote(const bool need_check_log_missing)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const PRIORITY_SEED_BIT new_election_inner_priority_seed = PRIORITY_SEED_BIT::SEED_IN_REBUILD_PHASE_BIT;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
// Update election priority firstly
|
||||
} else if (OB_FAIL(election_.add_inner_priority_seed_bit(new_election_inner_priority_seed))
|
||||
&& OB_ENTRY_EXIST != ret) {
|
||||
} else {
|
||||
//step 1: check vote status.
|
||||
bool vote_disabled = false;
|
||||
do {
|
||||
RLockGuard guard(lock_);
|
||||
if (!state_mgr_.is_allow_vote()) {
|
||||
PALF_LOG(INFO, "vote has already been disabled", KPC(this));
|
||||
vote_disabled = true;
|
||||
}
|
||||
} while(0);
|
||||
|
||||
if (!vote_disabled) {
|
||||
if (OB_FAIL(election_.add_inner_priority_seed_bit(new_election_inner_priority_seed)) && OB_ENTRY_EXIST != ret) {
|
||||
// Because this interface is idempotent, so we need ignore err code OB_ENTRY_EXIST.
|
||||
PALF_LOG(WARN, "election add_inner_priority_seed_bit for rebuild failed", KPC(this));
|
||||
// Update allow_vote flag
|
||||
} else if (OB_FAIL(set_allow_vote_flag_(false))) {
|
||||
} else if (OB_FAIL(set_allow_vote_flag_(false, need_check_log_missing))) {
|
||||
PALF_LOG(WARN, "set_allow_vote_flag failed", KPC(this));
|
||||
// rollback election priority when it encounters failure
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -1236,7 +1252,9 @@ int PalfHandleImpl::disable_vote()
|
||||
PALF_LOG(WARN, "election clear_inner_priority_seed_bit for rebuild failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
PALF_EVENT("disable_vote success", palf_id_, KPC(this));
|
||||
PALF_EVENT("disable_vote success", palf_id_, KPC(this), K(need_check_log_missing));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1248,14 +1266,14 @@ int PalfHandleImpl::enable_vote()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
// Update allow_vote flag firstly
|
||||
} else if (OB_FAIL(set_allow_vote_flag_(true))) {
|
||||
} else if (OB_FAIL(set_allow_vote_flag_(true, false/*no need check log misingg*/))) {
|
||||
PALF_LOG(WARN, "set_allow_vote_flag failed", KPC(this));
|
||||
} else if (OB_FAIL(election_.clear_inner_priority_seed_bit(election_inner_priority_seed))
|
||||
&& OB_ENTRY_NOT_EXIST != ret) {
|
||||
PALF_LOG(WARN, "election clear_inner_priority_seed_bit for rebuild failed", KPC(this));
|
||||
// rollback allow_vote flag when it encounters failure
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = set_allow_vote_flag_(false))) {
|
||||
if (OB_SUCCESS != (tmp_ret = set_allow_vote_flag_(false, false/*no need check log misingg*/))) {
|
||||
PALF_LOG(WARN, "rollback allow_vote flag failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
@ -1264,7 +1282,8 @@ int PalfHandleImpl::enable_vote()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfHandleImpl::set_allow_vote_flag_(const bool allow_vote)
|
||||
int PalfHandleImpl::set_allow_vote_flag_(const bool allow_vote,
|
||||
const bool need_check_log_missing)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(replica_meta_lock_);
|
||||
@ -1272,7 +1291,24 @@ int PalfHandleImpl::set_allow_vote_flag_(const bool allow_vote)
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
PALF_LOG(WARN, "can not enable_vote/disable_vote in arb_member", K(ret), KPC(this));
|
||||
} else {
|
||||
RLockGuard guard(lock_);
|
||||
WLockGuard guard(lock_);
|
||||
if (!allow_vote && need_check_log_missing) {
|
||||
//disable_vote and need check whether log is actually missing
|
||||
RebuildMetaInfo last_rebuild_meta_info;
|
||||
RebuildMetaInfo rebuild_meta_info;
|
||||
get_last_rebuild_meta_info_(last_rebuild_meta_info);
|
||||
if (last_rebuild_meta_info.is_valid()) {
|
||||
//check with local rebuild meta info
|
||||
(void)gen_rebuild_meta_info_(rebuild_meta_info);
|
||||
ret = (last_rebuild_meta_info == rebuild_meta_info) ? OB_SUCCESS : OB_OP_NOT_ALLOW;
|
||||
PALF_LOG(INFO, "double check whether need disable_vote", K(last_rebuild_meta_info),
|
||||
K(rebuild_meta_info), KPC(this));
|
||||
} else {
|
||||
ret = OB_OP_NOT_ALLOW;
|
||||
PALF_LOG(INFO, "maybe restart during rebuild, just return OB_OP_NOT_ALLOW", KPC(this));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
FlushMetaCbCtx flush_meta_cb_ctx;
|
||||
flush_meta_cb_ctx.type_ = REPLICA_PROPERTY_META;
|
||||
flush_meta_cb_ctx.allow_vote_ = allow_vote;
|
||||
@ -1285,11 +1321,17 @@ int PalfHandleImpl::set_allow_vote_flag_(const bool allow_vote)
|
||||
PALF_LOG(WARN, "election revoke failed", K(ret), K_(palf_id));
|
||||
} else if (OB_FAIL(log_engine_.submit_flush_replica_property_meta_task(flush_meta_cb_ctx, replica_property_meta))) {
|
||||
PALF_LOG(WARN, "submit_flush_replica_property_meta_task failed", K(ret), K(flush_meta_cb_ctx), K(replica_property_meta));
|
||||
} else {
|
||||
if (!allow_vote) {
|
||||
//for disable_vote, modify allow_vote in memory under protection of wlock
|
||||
state_mgr_.disable_vote_in_mem();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// wait until replica_property_meta has been flushed
|
||||
if (OB_SUCC(ret)) {
|
||||
while(allow_vote != state_mgr_.is_allow_vote()) {
|
||||
while(allow_vote != state_mgr_.is_allow_vote_persisted()) {
|
||||
ob_usleep(500);
|
||||
}
|
||||
}
|
||||
@ -2664,7 +2706,7 @@ int PalfHandleImpl::get_last_rebuild_lsn(LSN &last_rebuild_lsn) const
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
SpinLockGuard guard(last_rebuild_lsn_lock_);
|
||||
SpinLockGuard guard(last_rebuild_meta_info_lock_);
|
||||
last_rebuild_lsn = last_rebuild_lsn_;
|
||||
}
|
||||
return ret;
|
||||
@ -2702,6 +2744,19 @@ int PalfHandleImpl::check_need_advance_base_info_(const LSN &base_lsn,
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Fills rebuild_meta from sw_: the committed end LSN, plus the LSN and proposal id
// of the last submitted log (the log id reported by sw_ is discarded).
// Both sw_ getters return an error code. This function returns void, so a failure
// cannot be propagated to the caller; instead of dropping it silently, it is
// reported with a WARN log. On failure the affected fields keep whatever value
// the getter left in them.
void PalfHandleImpl::gen_rebuild_meta_info_(RebuildMetaInfo &rebuild_meta) const
{
  int tmp_ret = OB_SUCCESS;
  int64_t unused_log_id = -1;
  if (OB_SUCCESS != (tmp_ret = sw_.get_committed_end_lsn(rebuild_meta.committed_end_lsn_))) {
    PALF_LOG(WARN, "get_committed_end_lsn failed", K(tmp_ret), K_(palf_id));
  }
  if (OB_SUCCESS != (tmp_ret = sw_.get_last_submit_log_info(rebuild_meta.last_submit_lsn_,
                                                            unused_log_id,
                                                            rebuild_meta.last_submit_log_pid_))) {
    PALF_LOG(WARN, "get_last_submit_log_info failed", K(tmp_ret), K_(palf_id));
  }
}
|
||||
|
||||
// Copies last_rebuild_meta_info_ into rebuild_meta_info.
// The copy is made while holding last_rebuild_meta_info_lock_; the lock is
// released when the guard goes out of scope at the end of the function.
void PalfHandleImpl::get_last_rebuild_meta_info_(RebuildMetaInfo &rebuild_meta_info) const
{
  SpinLockGuard meta_info_guard(last_rebuild_meta_info_lock_);
  rebuild_meta_info = last_rebuild_meta_info_;
}
|
||||
|
||||
// caller should hold wlock when calling this function
|
||||
int PalfHandleImpl::check_need_rebuild_(const LSN &base_lsn,
|
||||
const LogInfo &base_prev_log_info,
|
||||
@ -2714,6 +2769,8 @@ int PalfHandleImpl::check_need_rebuild_(const LSN &base_lsn,
|
||||
int64_t last_submit_log_id;
|
||||
int64_t last_submit_log_pid;
|
||||
bool unused_bool;
|
||||
need_rebuild = false;
|
||||
need_fetch_log = false;
|
||||
if (!base_lsn.is_valid() || !base_prev_log_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid argument", K(ret), KPC(this), K(base_lsn), K(base_prev_log_info));
|
||||
@ -2723,8 +2780,7 @@ int PalfHandleImpl::check_need_rebuild_(const LSN &base_lsn,
|
||||
} else if (OB_FAIL(sw_.get_committed_end_lsn(committed_end_lsn))) {
|
||||
PALF_LOG(WARN, "get_committed_end_lsn failed", KR(ret), K_(palf_id));
|
||||
} else if (base_lsn <= committed_end_lsn) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
PALF_LOG(WARN, "base_lsn is less than or equal to local committed_end_lsn",
|
||||
PALF_LOG(INFO, "base_lsn is less than or equal to local committed_end_lsn",
|
||||
K(ret), K_(palf_id), K(base_lsn), K(committed_end_lsn));
|
||||
} else if (OB_FAIL(sw_.get_last_submit_log_info(last_submit_lsn, last_submit_log_id, last_submit_log_pid))) {
|
||||
PALF_LOG(WARN, "get_last_submit_log_info failed", KR(ret), K_(palf_id));
|
||||
@ -2802,11 +2858,12 @@ int PalfHandleImpl::handle_notify_rebuild_req(const common::ObAddr &server,
|
||||
// this will cause wrong rebuild.
|
||||
bool need_rebuild = false;
|
||||
bool need_fetch_log = false;
|
||||
RebuildMetaInfo rebuild_meta_info;
|
||||
do {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
// leader may send multiple notify_rebuild_req, when next req arrives, previous on_rebuild may
|
||||
// hold rlock, so try hold wlock and release it after timeout (1ms).
|
||||
const int64_t until_timeout_us = common::ObTimeUtility::current_time() + 1;
|
||||
const int64_t until_timeout_us = common::ObTimeUtility::current_time() + 1000;
|
||||
WLockGuardWithTimeout guard(lock_, until_timeout_us, tmp_ret);
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
PALF_LOG(INFO, "notify_rebuild wait lock timeout", K(ret), KPC(this), K(server), K(base_lsn),
|
||||
@ -2819,9 +2876,13 @@ int PalfHandleImpl::handle_notify_rebuild_req(const common::ObAddr &server,
|
||||
PALF_LOG(WARN, "invalid argument", K(ret), K_(palf_id), K(server), K(base_lsn));
|
||||
} else if (OB_FAIL(check_need_rebuild_(base_lsn, base_prev_log_info, need_rebuild, need_fetch_log))) {
|
||||
PALF_LOG(WARN, "check_need_rebuild failed", K(ret), KPC(this), K(server), K(base_lsn), K(base_prev_log_info));
|
||||
}
|
||||
} else if (need_rebuild) {
|
||||
//set rebuild_meta_info
|
||||
gen_rebuild_meta_info_(rebuild_meta_info);
|
||||
} else {}
|
||||
} while (0);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// can not hold wlock when exec on_rebuild
|
||||
if (need_rebuild) {
|
||||
if (OB_FAIL(rebuild_cb_wrapper_.on_rebuild(palf_id_, base_lsn))) {
|
||||
@ -2830,12 +2891,14 @@ int PalfHandleImpl::handle_notify_rebuild_req(const common::ObAddr &server,
|
||||
PALF_EVENT("on_rebuild success", palf_id_, K(ret), K_(self), K(server), K(base_lsn));
|
||||
}
|
||||
// Whether on_rebuild returns OB_SUCCESS or not, set value for rebuild_base_lsn_
|
||||
SpinLockGuard rebuild_guard(last_rebuild_lsn_lock_);
|
||||
SpinLockGuard rebuild_guard(last_rebuild_meta_info_lock_);
|
||||
last_rebuild_lsn_ = base_lsn;
|
||||
last_rebuild_meta_info_ = rebuild_meta_info;
|
||||
} else if (need_fetch_log && OB_FAIL(sw_.try_fetch_log(FetchTriggerType::NOTIFY_REBUILD,
|
||||
base_prev_log_info.lsn_, base_lsn, base_prev_log_info.log_id_+1))) {
|
||||
PALF_LOG(WARN, "try_fetch_log failed", KR(ret), KPC(this), K(server), K(base_lsn), K(base_prev_log_info));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -4107,7 +4170,7 @@ int PalfHandleImpl::stat(PalfStat &palf_stat)
|
||||
} else {
|
||||
LSN last_rebuild_lsn;
|
||||
do {
|
||||
SpinLockGuard guard(last_rebuild_lsn_lock_);
|
||||
SpinLockGuard guard(last_rebuild_meta_info_lock_);
|
||||
last_rebuild_lsn = last_rebuild_lsn_;
|
||||
} while (0);
|
||||
|
||||
|
||||
@ -183,6 +183,38 @@ struct LSKey {
|
||||
TO_STRING_KV(K_(id));
|
||||
};
|
||||
|
||||
struct RebuildMetaInfo
|
||||
{
|
||||
public:
|
||||
RebuildMetaInfo() {reset();}
|
||||
~ RebuildMetaInfo() {reset();}
|
||||
void reset()
|
||||
{
|
||||
committed_end_lsn_.reset();
|
||||
last_submit_lsn_.reset();
|
||||
last_submit_log_pid_ = INVALID_PROPOSAL_ID;
|
||||
}
|
||||
bool is_valid() const
|
||||
{
|
||||
return (committed_end_lsn_.is_valid()
|
||||
&& last_submit_lsn_.is_valid()
|
||||
&& INVALID_PROPOSAL_ID != last_submit_log_pid_);
|
||||
}
|
||||
bool operator==(const RebuildMetaInfo &other) const
|
||||
{
|
||||
return (committed_end_lsn_ == other.committed_end_lsn_
|
||||
&& last_submit_lsn_ == other.last_submit_lsn_
|
||||
&& last_submit_log_pid_ == other.last_submit_log_pid_);
|
||||
}
|
||||
TO_STRING_KV(K_(committed_end_lsn),
|
||||
K_(last_submit_lsn),
|
||||
K_(last_submit_log_pid));
|
||||
public:
|
||||
LSN committed_end_lsn_;
|
||||
LSN last_submit_lsn_;
|
||||
int64_t last_submit_log_pid_;
|
||||
};
|
||||
|
||||
// 日志服务的接口类,logservice以外的模块使用日志服务,只允许调用IPalfHandleImpl的接口
|
||||
class IPalfHandleImpl : public common::LinkHashValue<LSKey>
|
||||
{
|
||||
@ -576,7 +608,14 @@ public:
|
||||
// can not reply ack when receiving logs.
|
||||
// By default, paxos replica can reply ack.
|
||||
// This interface is idempotent.
|
||||
virtual int disable_vote() = 0;
|
||||
// @param[in] need_check_log_missing: reason for rebuilding. True means log missing, False means data
|
||||
// missing
|
||||
// @return:
|
||||
// OB_NOT_INIT: not inited
|
||||
// OB_NOT_RUNNING: in stop state
|
||||
// OB_OP_NOT_ALLOW: no need to rebuild. Rebuilding should be abandoned.
|
||||
// OB_LEADER_NOT_EXIST: no leader when double checking. rebuilding should retry.
|
||||
virtual int disable_vote(const bool need_check_log_missing) = 0;
|
||||
// @brief: store a persistent flag which means this paxos replica
|
||||
// can reply ack when receiving logs.
|
||||
// By default, paxos replica can reply ack.
|
||||
@ -710,7 +749,7 @@ public:
|
||||
int locate_by_scn_coarsely(const share::SCN &scn, LSN &result_lsn) override final;
|
||||
int locate_by_lsn_coarsely(const LSN &lsn, share::SCN &result_scn) override final;
|
||||
bool is_vote_enabled() const override final;
|
||||
int disable_vote() override final;
|
||||
int disable_vote(const bool need_check_log_missing) override final;
|
||||
int enable_vote() override final;
|
||||
public:
|
||||
int delete_block(const block_id_t &block_id) override final;
|
||||
@ -909,7 +948,11 @@ private:
|
||||
const LogModeMeta &mode_meta);
|
||||
int after_flush_snapshot_meta_(const LSN &lsn);
|
||||
int after_flush_replica_property_meta_(const bool allow_vote);
|
||||
int set_allow_vote_flag_(const bool allow_vote);
|
||||
/*
|
||||
*param[in] need_check_log_missing: for disable_vote invoke by rebuilding,
|
||||
true means need double check whether log is actually missing
|
||||
* */
|
||||
int set_allow_vote_flag_(const bool allow_vote, const bool need_check_log_missing);
|
||||
int get_prev_log_info_(const LSN &lsn, LogInfo &log_info);
|
||||
int get_prev_log_info_for_fetch_(const LSN &prev_lsn,
|
||||
const LSN &curr_lsn,
|
||||
@ -991,6 +1034,8 @@ private:
|
||||
int leader_sync_mode_meta_to_arb_member_();
|
||||
void is_in_sync_(bool &is_log_sync, bool &is_use_cache);
|
||||
int get_leader_max_scn_(SCN &max_scn, LSN &end_lsn);
|
||||
void gen_rebuild_meta_info_(RebuildMetaInfo &rebuild_meta) const;
|
||||
void get_last_rebuild_meta_info_(RebuildMetaInfo &rebuild_meta_info) const;
|
||||
private:
|
||||
class ElectionMsgSender : public election::ElectionMsgSender
|
||||
{
|
||||
@ -1077,8 +1122,9 @@ private:
|
||||
int64_t append_size_stat_time_us_;
|
||||
int64_t replace_member_print_time_us_;
|
||||
mutable int64_t config_change_print_time_us_;
|
||||
mutable SpinLock last_rebuild_lsn_lock_;
|
||||
mutable SpinLock last_rebuild_meta_info_lock_;//protect last_rebuild_lsn_ and last_rebuild_meta_info_
|
||||
LSN last_rebuild_lsn_;
|
||||
RebuildMetaInfo last_rebuild_meta_info_;//used for double checking whether it is necessary to rebuild
|
||||
LSN last_record_append_lsn_;
|
||||
// NB: only set has_set_deleted_ to true when this palf_handle has been deleted.
|
||||
bool has_set_deleted_;
|
||||
|
||||
@ -454,6 +454,7 @@ class ObString;
|
||||
ACT(AFTER_LS_GC_DELETE_ALL_TABLETS,)\
|
||||
ACT(BEFORE_ARCHIVE_ADD_LS_TASK,)\
|
||||
ACT(AFTER_UPDATE_INDEX_STATUS,)\
|
||||
ACT(BEFORE_MIGRATION_DISABLE_VOTE,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
||||
@ -315,6 +315,7 @@ int ObLSCompleteMigrationDagNet::update_migration_status_(ObLS *ls)
|
||||
bool is_finish = false;
|
||||
static const int64_t UPDATE_MIGRATION_STATUS_INTERVAL_MS = 100 * 1000; //100ms
|
||||
ObTenantDagScheduler *scheduler = nullptr;
|
||||
int32_t result = OB_SUCCESS;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -342,27 +343,30 @@ int ObLSCompleteMigrationDagNet::update_migration_status_(ObLS *ls)
|
||||
// TODO: muwei should not do this before ls create finished.
|
||||
if (OB_FAIL(ls->get_migration_status(current_migration_status))) {
|
||||
LOG_WARN("failed to get migration status", K(ret), K(ctx_));
|
||||
} else if (OB_FAIL(ctx_.get_result(result))) {
|
||||
LOG_WARN("failed to get result", K(ret), K(ctx_));
|
||||
} else if (ctx_.is_failed()) {
|
||||
if (ObMigrationOpType::REBUILD_LS_OP == ctx_.arg_.type_) {
|
||||
if (ObMigrationStatus::OB_MIGRATION_STATUS_REBUILD != current_migration_status) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("migration status is unexpected", K(ret), K(current_migration_status), K(ctx_));
|
||||
} else if (OB_NO_NEED_REBUILD == result) {
|
||||
new_migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
||||
} else {
|
||||
new_migration_status = current_migration_status;
|
||||
}
|
||||
} else if (OB_FAIL(ObMigrationStatusHelper::trans_fail_status(current_migration_status, new_migration_status))) {
|
||||
LOG_WARN("failed to trans fail status", K(ret), K(current_migration_status), K(new_migration_status));
|
||||
}
|
||||
} else {
|
||||
if (ObMigrationOpType::REBUILD_LS_OP == ctx_.arg_.type_
|
||||
&& OB_FAIL(ls->clear_saved_info())) {
|
||||
LOG_WARN("failed to clear ls saved info", K(ret), KPC(ls));
|
||||
} else {
|
||||
new_migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
//TODO(muwei): no need clear
|
||||
} else if (ObMigrationOpType::REBUILD_LS_OP == ctx_.arg_.type_ && ObMigrationStatus::OB_MIGRATION_STATUS_NONE == new_migration_status
|
||||
&& OB_FAIL(ls->clear_saved_info())) {
|
||||
LOG_WARN("failed to clear ls saved info", K(ret), KPC(ls));
|
||||
} else if (OB_FAIL(ls->set_migration_status(new_migration_status, ctx_.rebuild_seq_))) {
|
||||
LOG_WARN("failed to set migration status", K(ret), K(current_migration_status), K(new_migration_status), K(ctx_));
|
||||
} else {
|
||||
|
||||
@ -1035,6 +1035,7 @@ int ObStartMigrationTask::deal_with_local_ls_()
|
||||
int64_t proposal_id = 0;
|
||||
ObLSMeta local_ls_meta;
|
||||
logservice::ObLogService *log_service = nullptr;
|
||||
DEBUG_SYNC(BEFORE_MIGRATION_DISABLE_VOTE);
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("start migration task do not init", K(ret));
|
||||
@ -1059,6 +1060,16 @@ int ObStartMigrationTask::deal_with_local_ls_()
|
||||
LOG_WARN("leader cannot as add, migrate, change dst",
|
||||
K(ret), K(role), "myaddr", MYADDR, "arg", ctx_->arg_);
|
||||
}
|
||||
} else if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_
|
||||
&& OB_FAIL(ls->disable_vote(true/*need_check*/))) {
|
||||
LOG_WARN("failed to disable vote", K(ret), KPC(ctx_));
|
||||
if (OB_OP_NOT_ALLOW == ret) {
|
||||
if (ls->is_offline() && OB_FAIL(ls->online())) {
|
||||
LOG_WARN("failed to online ls", K(ret), KPC(ctx_));
|
||||
} else {
|
||||
ret = OB_NO_NEED_REBUILD;
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(ls->offline())) {
|
||||
LOG_WARN("failed to disable log", K(ret), KPC(ctx_));
|
||||
}
|
||||
@ -3012,9 +3023,6 @@ int ObDataTabletsMigrationTask::ls_online_()
|
||||
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls should not be NULL", K(ret), KP(ls));
|
||||
} else if (ObMigrationOpType::REBUILD_LS_OP == ctx_->arg_.type_
|
||||
&& OB_FAIL(ls->disable_vote())) {
|
||||
LOG_WARN("failed to disable vote", K(ret), KPC(ctx_));
|
||||
} else if (OB_FAIL(ls->online())) {
|
||||
LOG_WARN("failed to online ls", K(ret), KPC(ctx_));
|
||||
} else {
|
||||
|
||||
@ -564,6 +564,7 @@ bool ObMigrationUtils::is_need_retry_error(const int err)
|
||||
case OB_CHECKSUM_ERROR :
|
||||
case OB_DDL_SSTABLE_RANGE_CROSS :
|
||||
case OB_TENANT_NOT_EXIST :
|
||||
case OB_NO_NEED_REBUILD :
|
||||
bret = false;
|
||||
break;
|
||||
default:
|
||||
|
||||
@ -542,7 +542,7 @@ public:
|
||||
// @param[out] null.
|
||||
DELEGATE_WITH_RET(log_handler_, enable_vote, int);
|
||||
// @brief, palf disable vote
|
||||
// @param[in] null.
|
||||
// @param[in] need_check.
|
||||
// @param[out] null.
|
||||
DELEGATE_WITH_RET(log_handler_, disable_vote, int);
|
||||
DELEGATE_WITH_RET(log_handler_, add_member, int);
|
||||
|
||||
@ -412,7 +412,11 @@ public:
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int enable_vote() { return OB_SUCCESS; }
|
||||
int disable_vote() { return OB_SUCCESS; }
|
||||
int disable_vote(const bool need_check_log_missing)
|
||||
{
|
||||
UNUSED(need_check_log_missing);
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int get_election_leader(common::ObAddr &addr) const
|
||||
{
|
||||
UNUSED(addr);
|
||||
|
||||
Reference in New Issue
Block a user