Fix fetch log streamingly bug.
This commit is contained in:
@ -892,12 +892,17 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated)
|
|||||||
(void) set_last_submit_log_info_(begin_lsn, log_end_lsn, tmp_log_id, \
|
(void) set_last_submit_log_info_(begin_lsn, log_end_lsn, tmp_log_id, \
|
||||||
group_entry_header.get_log_proposal_id());
|
group_entry_header.get_log_proposal_id());
|
||||||
if (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm()) {
|
if (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm()) {
|
||||||
|
bool is_need_fetch = false;
|
||||||
|
const LSN log_committed_end_lsn = group_entry_header.get_committed_end_lsn();
|
||||||
try_update_committed_lsn_for_fetch_(log_end_lsn, tmp_log_id, \
|
try_update_committed_lsn_for_fetch_(log_end_lsn, tmp_log_id, \
|
||||||
group_entry_header.get_committed_end_lsn());
|
log_committed_end_lsn, is_need_fetch);
|
||||||
|
if (is_need_fetch) {
|
||||||
|
try_fetch_log_streamingly_(log_committed_end_lsn);
|
||||||
|
}
|
||||||
// Advance committed_end_lsn_, follower/reconfirm leader needs
|
// Advance committed_end_lsn_, follower/reconfirm leader needs
|
||||||
// the order with try_update_committed_lsn_for_fetch_() is important,
|
// the order with try_update_committed_lsn_for_fetch_() is important,
|
||||||
// this exec order can avoid trigger failure of next round fetch by sliding_cb()
|
// this exec order can avoid trigger failure of next round fetch by sliding_cb()
|
||||||
(void) try_advance_committed_lsn_(group_entry_header.get_committed_end_lsn());
|
(void) try_advance_committed_lsn_(log_committed_end_lsn);
|
||||||
is_committed_lsn_updated = true;
|
is_committed_lsn_updated = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1537,9 +1542,11 @@ void LogSlidingWindow::try_reset_last_fetch_log_info_(const LSN &expected_end_ls
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void LogSlidingWindow::try_update_committed_lsn_for_fetch_(const LSN &log_end_lsn,
|
void LogSlidingWindow::try_update_committed_lsn_for_fetch_(
|
||||||
|
const LSN &log_end_lsn,
|
||||||
const int64_t &log_id,
|
const int64_t &log_id,
|
||||||
const LSN &log_committed_end_lsn)
|
const LSN &log_committed_end_lsn,
|
||||||
|
bool &is_need_fetch)
|
||||||
{
|
{
|
||||||
bool need_update = false;
|
bool need_update = false;
|
||||||
LSN last_fetch_end_lsn;
|
LSN last_fetch_end_lsn;
|
||||||
@ -1570,9 +1577,22 @@ void LogSlidingWindow::try_update_committed_lsn_for_fetch_(const LSN &log_end_ls
|
|||||||
|| log_id == last_fetch_max_log_id_) {
|
|| log_id == last_fetch_max_log_id_) {
|
||||||
LSN committed_end_lsn;
|
LSN committed_end_lsn;
|
||||||
get_committed_end_lsn_(committed_end_lsn);
|
get_committed_end_lsn_(committed_end_lsn);
|
||||||
|
// The order is fatal:
|
||||||
|
// 1) update last_fetch_committed_end_lsn_
|
||||||
|
// 2) check last slide end_lsn to decide fetching log streamingly or not.
|
||||||
ATOMIC_STORE(&last_fetch_committed_end_lsn_.val_, log_committed_end_lsn.val_);
|
ATOMIC_STORE(&last_fetch_committed_end_lsn_.val_, log_committed_end_lsn.val_);
|
||||||
|
MEM_BARRIER();
|
||||||
|
// The committed_end_lsn may already be updated at least to this value by previous group log.
|
||||||
|
// And the logs before log_committed_end_lsn have been slid out.
|
||||||
|
// For this scenario, it need triger fetch log streamingly now.
|
||||||
|
LSN last_slide_end_lsn;
|
||||||
|
get_last_slide_end_lsn_(last_slide_end_lsn);
|
||||||
|
if (committed_end_lsn >= log_committed_end_lsn
|
||||||
|
&& last_slide_end_lsn >= log_committed_end_lsn) {
|
||||||
|
is_need_fetch = true;
|
||||||
|
}
|
||||||
PALF_LOG(INFO, "update last fetch log info", K_(palf_id), K_(self), K(last_fetch_end_lsn), K(log_id),
|
PALF_LOG(INFO, "update last fetch log info", K_(palf_id), K_(self), K(last_fetch_end_lsn), K(log_id),
|
||||||
K_(last_fetch_end_lsn), K_(last_fetch_committed_end_lsn), K(committed_end_lsn));
|
K_(last_fetch_end_lsn), K_(last_fetch_committed_end_lsn), K(committed_end_lsn), K(last_slide_end_lsn));
|
||||||
} else {
|
} else {
|
||||||
PALF_LOG(INFO, "last_fetch_max_log_id_ has changed, skip update", K_(palf_id), K_(self),
|
PALF_LOG(INFO, "last_fetch_max_log_id_ has changed, skip update", K_(palf_id), K_(self),
|
||||||
K(last_fetch_end_lsn), K(log_id), K_(last_fetch_max_log_id), K_(last_fetch_end_lsn),
|
K(last_fetch_end_lsn), K(log_id), K_(last_fetch_max_log_id), K_(last_fetch_end_lsn),
|
||||||
@ -1704,15 +1724,23 @@ int LogSlidingWindow::do_fetch_log_(const FetchTriggerType &trigger_type,
|
|||||||
// so just set it to PALF_SLIDING_WINDOW_SIZE
|
// so just set it to PALF_SLIDING_WINDOW_SIZE
|
||||||
const int64_t fetch_log_count = PALF_SLIDING_WINDOW_SIZE - skip_log_count;
|
const int64_t fetch_log_count = PALF_SLIDING_WINDOW_SIZE - skip_log_count;
|
||||||
// Update last_fetch_end_lsn_ AND last_fetch_max_log_id_
|
// Update last_fetch_end_lsn_ AND last_fetch_max_log_id_
|
||||||
|
bool need_exec_fetch = true;
|
||||||
do {
|
do {
|
||||||
ObSpinLockGuard guard(fetch_info_lock_);
|
ObSpinLockGuard guard(fetch_info_lock_);
|
||||||
const LSN tmp_end_lsn = fetch_start_lsn + fetch_log_size;
|
const LSN tmp_end_lsn = fetch_start_lsn + fetch_log_size;
|
||||||
ATOMIC_STORE(&last_fetch_end_lsn_.val_, tmp_end_lsn.val_);
|
if (FetchTriggerType::SLIDING_CB == trigger_type
|
||||||
last_fetch_max_log_id_ = fetch_start_log_id + fetch_log_count - 1;
|
&& !last_fetch_committed_end_lsn_.is_valid()) {
|
||||||
last_fetch_committed_end_lsn_.reset();
|
// Streamingly fetching may trigger more than one time,
|
||||||
|
// we need filter duplicated ops here.
|
||||||
|
need_exec_fetch = false;
|
||||||
|
} else {
|
||||||
|
ATOMIC_STORE(&last_fetch_end_lsn_.val_, tmp_end_lsn.val_);
|
||||||
|
last_fetch_max_log_id_ = fetch_start_log_id + fetch_log_count - 1;
|
||||||
|
last_fetch_committed_end_lsn_.reset();
|
||||||
|
}
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret) && need_exec_fetch) {
|
||||||
FetchLogType fetch_type = FETCH_LOG_FOLLOWER;
|
FetchLogType fetch_type = FETCH_LOG_FOLLOWER;
|
||||||
if (LEADER_RECONFIRM == trigger_type) {
|
if (LEADER_RECONFIRM == trigger_type) {
|
||||||
fetch_type = FETCH_LOG_LEADER_RECONFIRM;
|
fetch_type = FETCH_LOG_LEADER_RECONFIRM;
|
||||||
@ -1863,6 +1891,8 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
|
|||||||
log_proposal_id, log_accum_checksum);
|
log_proposal_id, log_accum_checksum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly
|
||||||
|
|
||||||
if (OB_SUCC(ret)
|
if (OB_SUCC(ret)
|
||||||
&& (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) {
|
&& (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) {
|
||||||
// Check if need fetch log streamingly,
|
// Check if need fetch log streamingly,
|
||||||
|
|||||||
@ -304,7 +304,8 @@ private:
|
|||||||
void try_reset_last_fetch_log_info_(const LSN &expected_end_lsn, const int64_t log_id);
|
void try_reset_last_fetch_log_info_(const LSN &expected_end_lsn, const int64_t log_id);
|
||||||
void try_update_committed_lsn_for_fetch_(const LSN &expected_end_lsn,
|
void try_update_committed_lsn_for_fetch_(const LSN &expected_end_lsn,
|
||||||
const int64_t &expected_log_id,
|
const int64_t &expected_log_id,
|
||||||
const LSN &log_committed_end_lsn);
|
const LSN &log_committed_end_lsn,
|
||||||
|
bool &is_need_fetch);
|
||||||
void try_fetch_log_streamingly_(const LSN &log_end_lsn);
|
void try_fetch_log_streamingly_(const LSN &log_end_lsn);
|
||||||
int do_fetch_log_(const FetchTriggerType &trigger_type,
|
int do_fetch_log_(const FetchTriggerType &trigger_type,
|
||||||
const common::ObAddr &dest,
|
const common::ObAddr &dest,
|
||||||
|
|||||||
Reference in New Issue
Block a user