reorder replay status switch to follower process
This commit is contained in:
@ -1006,10 +1006,10 @@ int ObLogHandler::pend_submit_replay_log()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (FALSE_IT(id = id_)) {
|
||||
} else if (OB_FAIL(replay_service_->set_submit_log_pending(id))) {
|
||||
CLOG_LOG(WARN, "failed to set_submit_log_pending", K(ret), K(id));
|
||||
} else if (OB_FAIL(replay_service_->block_submit_log(id))) {
|
||||
CLOG_LOG(WARN, "failed to block_submit_log", K(ret), K(id));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "set_submit_log_pending success", K(ret), K(id));
|
||||
CLOG_LOG(INFO, "block_submit_log success", K(ret), K(id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1022,10 +1022,10 @@ int ObLogHandler::restore_submit_replay_log()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (FALSE_IT(id = id_)) {
|
||||
} else if (OB_FAIL(replay_service_->erase_submit_log_pending(id))) {
|
||||
CLOG_LOG(WARN, "failed to erase_submit_log_pending", K(ret), K(id));
|
||||
} else if (OB_FAIL(replay_service_->unblock_submit_log(id))) {
|
||||
CLOG_LOG(WARN, "failed to unblock_submit_log", K(ret), K(id));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "erase_submit_log_pending success", K(ret), K(id));
|
||||
CLOG_LOG(INFO, "unblock_submit_log success", K(ret), K(id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -473,7 +473,7 @@ int ObLogReplayService::is_enabled(const share::ObLSID &id, bool &is_enabled)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogReplayService::set_submit_log_pending(const share::ObLSID &id)
|
||||
int ObLogReplayService::block_submit_log(const share::ObLSID &id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObReplayStatus *replay_status = NULL;
|
||||
@ -487,12 +487,12 @@ int ObLogReplayService::set_submit_log_pending(const share::ObLSID &id)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
CLOG_LOG(WARN, "replay status is not exist", K(ret), K(id));
|
||||
} else {
|
||||
replay_status->set_pending();
|
||||
replay_status->block_submit();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogReplayService::erase_submit_log_pending(const share::ObLSID &id)
|
||||
int ObLogReplayService::unblock_submit_log(const share::ObLSID &id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObReplayStatus *replay_status = NULL;
|
||||
@ -506,7 +506,7 @@ int ObLogReplayService::erase_submit_log_pending(const share::ObLSID &id)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
CLOG_LOG(WARN, "replay status is not exist", K(ret), K(id));
|
||||
} else {
|
||||
replay_status->erase_pending();
|
||||
replay_status->unblock_submit();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -140,8 +140,8 @@ public:
|
||||
const share::SCN &base_scn);
|
||||
int disable(const share::ObLSID &id);
|
||||
int is_enabled(const share::ObLSID &id, bool &is_enabled);
|
||||
int set_submit_log_pending(const share::ObLSID &id);
|
||||
int erase_submit_log_pending(const share::ObLSID &id);
|
||||
int block_submit_log(const share::ObLSID &id);
|
||||
int unblock_submit_log(const share::ObLSID &id);
|
||||
int switch_to_leader(const share::ObLSID &id);
|
||||
int switch_to_follower(const share::ObLSID &id,
|
||||
const palf::LSN &begin_lsn);
|
||||
|
@ -759,23 +759,26 @@ bool ObReplayStatus::is_enabled_without_lock() const
|
||||
return is_replay_enabled_();
|
||||
}
|
||||
|
||||
void ObReplayStatus::set_pending()
|
||||
void ObReplayStatus::block_submit()
|
||||
{
|
||||
WLockGuard guard(rolelock_);
|
||||
is_submit_blocked_ = true;
|
||||
CLOG_LOG(INFO, "replay status set pending", KPC(this));
|
||||
CLOG_LOG(INFO, "replay status block submit", KPC(this));
|
||||
}
|
||||
|
||||
void ObReplayStatus::erase_pending()
|
||||
void ObReplayStatus::unblock_submit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
do {
|
||||
WLockGuard guard(rolelock_);
|
||||
is_submit_blocked_ = false;
|
||||
CLOG_LOG(INFO, "replay status erase pending", KPC(this));
|
||||
CLOG_LOG(INFO, "replay status unblock submit", KPC(this));
|
||||
} while (0);
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
|
||||
RLockGuard rlock_guard(rwlock_);
|
||||
if (!is_enabled_) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
|
||||
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_),
|
||||
KPC(this), K(ret));
|
||||
}
|
||||
@ -802,19 +805,31 @@ void ObReplayStatus::switch_to_leader()
|
||||
void ObReplayStatus::switch_to_follower(const palf::LSN &begin_lsn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// 1.switch role after reset iterator, or fscb may push submit task with
|
||||
// old iterator, which will fetch logs smaller than max decided scn.
|
||||
// 2.submit task after switch role, or this task may be discarded if role
|
||||
// still be leader, and no more submit task being submitted.
|
||||
do {
|
||||
WLockGuard guard(rolelock_);
|
||||
WLockGuardWithRetryInterval wguard(rwlock_, WRLOCK_TRY_THRESHOLD, WRLOCK_RETRY_INTERVAL);
|
||||
if (!is_enabled_) {
|
||||
// do nothing
|
||||
} else {
|
||||
(void)submit_log_task_.reset_iterator(palf_handle_, begin_lsn);
|
||||
}
|
||||
} while (0);
|
||||
do {
|
||||
WLockGuard role_guard(rolelock_);
|
||||
role_ = FOLLOWER;
|
||||
} while (0);
|
||||
WLockGuardWithRetryInterval wguard(rwlock_, WRLOCK_TRY_THRESHOLD, WRLOCK_RETRY_INTERVAL);
|
||||
if (is_enabled_) {
|
||||
(void)submit_log_task_.reset_iterator(palf_handle_, begin_lsn);
|
||||
if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
|
||||
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_),
|
||||
KPC(this), K(ret));
|
||||
}
|
||||
} else {
|
||||
|
||||
RLockGuard rguard(rwlock_);
|
||||
if (!is_enabled_) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
|
||||
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_),
|
||||
KPC(this), K(ret));
|
||||
} else {
|
||||
// success
|
||||
}
|
||||
CLOG_LOG(INFO, "replay status switch_to_follower", KPC(this), K(begin_lsn));
|
||||
}
|
||||
@ -909,20 +924,12 @@ int ObReplayStatus::update_end_offset(const LSN &lsn)
|
||||
CLOG_LOG(ERROR, "invalid arguments", K(ls_id_), K(lsn), K(ret));
|
||||
} else if (!need_submit_log()) {
|
||||
// leader do nothing, keep submit_log_task recording last round status as follower
|
||||
} else {
|
||||
} else if (OB_FAIL(submit_log_task_.update_committed_end_lsn(lsn))) {
|
||||
// update offset and submit submit_log_task
|
||||
{
|
||||
if (OB_FAIL(submit_log_task_.update_committed_end_lsn(lsn))) {
|
||||
CLOG_LOG(ERROR, "failed to update_apply_end_offset", KR(ret), K(ls_id_),
|
||||
K(lsn));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
|
||||
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay Service", K(submit_log_task_),
|
||||
KPC(this), K(ret));
|
||||
}
|
||||
}
|
||||
CLOG_LOG(ERROR, "failed to update_apply_end_offset", KR(ret), K(ls_id_), K(lsn));
|
||||
} else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
|
||||
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay Service", K(submit_log_task_),
|
||||
KPC(this), K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -310,7 +310,8 @@ public:
|
||||
K(committed_end_lsn_),
|
||||
K(next_to_submit_scn_),
|
||||
K(base_lsn_),
|
||||
K(base_scn_));
|
||||
K(base_scn_),
|
||||
K(iterator_));
|
||||
private:
|
||||
int update_next_to_submit_lsn_(const palf::LSN &lsn);
|
||||
int update_next_to_submit_scn_(const share::SCN &scn);
|
||||
@ -465,8 +466,8 @@ public:
|
||||
// for follower speed_limit
|
||||
// 1. avoid more replay cause OOM because speed_limit cannot work when freeze
|
||||
// 2. quick improving max_undecided_log to reduce freeze cost
|
||||
void set_pending();
|
||||
void erase_pending();
|
||||
void block_submit();
|
||||
void unblock_submit();
|
||||
|
||||
bool need_submit_log() const;
|
||||
bool try_rdlock()
|
||||
|
Reference in New Issue
Block a user