optimize implement of get_max_applied_scn()

This commit is contained in:
obdev
2024-02-10 09:10:04 +00:00
committed by ob-robot
parent 9f7f594b74
commit fc51a52d1b
4 changed files with 245 additions and 10 deletions

View File

@ -692,23 +692,28 @@ int ObApplyStatus::get_max_applied_scn(SCN &scn)
} else if (OB_UNLIKELY(is_in_stop_state_)) {
// stop后不会再上任, 始终返回上轮作为leader时缓存的值
} else if (FOLLOWER == role_) {
palf::LSN palf_end_lsn;
//The max_applied_cb_scn_ undergoes asynchronous updating, and under circumstances where a
//transiting to a follower role, there exists a possibility that its recorded value might underestimate the actual one.
//Upon a log stream replica's shift from the leader role to a follower role, it guarantees the
//application of every log entry that has been confirmed. Thus, while in the follower phase, the
//value of max_applied_cb_scn_ can be securely incremented to match palf_committed_end_scn_.
palf::LSN apply_end_lsn;
SCN palf_end_scn;
bool is_done = false;
if (OB_FAIL(is_apply_done(is_done, apply_end_lsn))) {
const SCN cur_palf_committed_end_scn = palf_committed_end_scn_.atomic_load();
if (max_applied_cb_scn_ > cur_palf_committed_end_scn) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "invalid max_applied_cb_scn", KPC(this));
} else if (max_applied_cb_scn_ == cur_palf_committed_end_scn) {
//no need to push up
} else if (OB_FAIL(is_apply_done(is_done, apply_end_lsn))) {
CLOG_LOG(WARN, "check is_apply_done failed", K(ret), KPC(this));
} else if (!is_done) {
// follower期间cb未完全回调之前暂不做任何更新
// 始终返回上轮作为leader时缓存的值
// 所有cb回调完成后, 尝试推进一次最大连续回调位点
} else if (OB_FAIL(palf_handle_.get_end_scn(palf_end_scn))) {
CLOG_LOG(WARN, "get_end_scn failed", K(ret), KPC(this));
} else if (OB_FAIL(palf_handle_.get_end_lsn(palf_end_lsn))) {
CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(this));
} else if (palf_end_lsn == apply_end_lsn) {
max_applied_cb_scn_ = palf_end_scn;
CLOG_LOG(INFO, "update max_applied_cb_scn_", K(ret), KPC(this));
} else if (max_applied_cb_scn_ < cur_palf_committed_end_scn) {
max_applied_cb_scn_ = cur_palf_committed_end_scn;
CLOG_LOG(INFO, "update max_applied_cb_scn_", K(cur_palf_committed_end_scn), KPC(this));
}
} else if ((!last_check_scn.is_valid()) || last_check_scn == max_applied_cb_scn_) {
if (OB_FAIL(update_last_check_scn_())) {
@ -800,7 +805,9 @@ void ObApplyStatus::reset_meta()
lib::ObMutexGuard guard(mutex_);
last_check_scn_.reset();
max_applied_cb_scn_.reset();
// palf_committed_end_scn_ also should be reset along with palf_committed_end_lsn_.
palf_committed_end_lsn_.val_ = 0;
palf_committed_end_scn_.reset();
}
int ObApplyStatus::submit_task_to_apply_service_(ObApplyServiceTask &task)