diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 585b53da8f..61d5a8010c 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -892,12 +892,17 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated) (void) set_last_submit_log_info_(begin_lsn, log_end_lsn, tmp_log_id, \ group_entry_header.get_log_proposal_id()); if (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm()) { + bool is_need_fetch = false; + const LSN log_committed_end_lsn = group_entry_header.get_committed_end_lsn(); try_update_committed_lsn_for_fetch_(log_end_lsn, tmp_log_id, \ - group_entry_header.get_committed_end_lsn()); + log_committed_end_lsn, is_need_fetch); + if (is_need_fetch) { + try_fetch_log_streamingly_(log_committed_end_lsn); + } // Advance committed_end_lsn_, follower/reconfirm leader needs // the order with try_update_committed_lsn_for_fetch_() is important, // this exec order can avoid trigger failure of next round fetch by sliding_cb() - (void) try_advance_committed_lsn_(group_entry_header.get_committed_end_lsn()); + (void) try_advance_committed_lsn_(log_committed_end_lsn); is_committed_lsn_updated = true; } } @@ -1537,9 +1542,11 @@ void LogSlidingWindow::try_reset_last_fetch_log_info_(const LSN &expected_end_ls } } -void LogSlidingWindow::try_update_committed_lsn_for_fetch_(const LSN &log_end_lsn, +void LogSlidingWindow::try_update_committed_lsn_for_fetch_( + const LSN &log_end_lsn, const int64_t &log_id, - const LSN &log_committed_end_lsn) + const LSN &log_committed_end_lsn, + bool &is_need_fetch) { bool need_update = false; LSN last_fetch_end_lsn; @@ -1570,9 +1577,22 @@ void LogSlidingWindow::try_update_committed_lsn_for_fetch_(const LSN &log_end_ls || log_id == last_fetch_max_log_id_) { LSN committed_end_lsn; get_committed_end_lsn_(committed_end_lsn); + // The order is fatal: + // 1) update last_fetch_committed_end_lsn_ + // 2) check last slide end_lsn to decide fetching log streamingly or not. ATOMIC_STORE(&last_fetch_committed_end_lsn_.val_, log_committed_end_lsn.val_); + MEM_BARRIER(); + // The committed_end_lsn may already be updated at least to this value by previous group log. + // And the logs before log_committed_end_lsn have been slid out. + // For this scenario, it need triger fetch log streamingly now. + LSN last_slide_end_lsn; + get_last_slide_end_lsn_(last_slide_end_lsn); + if (committed_end_lsn >= log_committed_end_lsn + && last_slide_end_lsn >= log_committed_end_lsn) { + is_need_fetch = true; + } PALF_LOG(INFO, "update last fetch log info", K_(palf_id), K_(self), K(last_fetch_end_lsn), K(log_id), - K_(last_fetch_end_lsn), K_(last_fetch_committed_end_lsn), K(committed_end_lsn)); + K_(last_fetch_end_lsn), K_(last_fetch_committed_end_lsn), K(committed_end_lsn), K(last_slide_end_lsn)); } else { PALF_LOG(INFO, "last_fetch_max_log_id_ has changed, skip update", K_(palf_id), K_(self), K(last_fetch_end_lsn), K(log_id), K_(last_fetch_max_log_id), K_(last_fetch_end_lsn), @@ -1704,15 +1724,23 @@ int LogSlidingWindow::do_fetch_log_(const FetchTriggerType &trigger_type, // so just set it to PALF_SLIDING_WINDOW_SIZE const int64_t fetch_log_count = PALF_SLIDING_WINDOW_SIZE - skip_log_count; // Update last_fetch_end_lsn_ AND last_fetch_max_log_id_ + bool need_exec_fetch = true; do { ObSpinLockGuard guard(fetch_info_lock_); const LSN tmp_end_lsn = fetch_start_lsn + fetch_log_size; - ATOMIC_STORE(&last_fetch_end_lsn_.val_, tmp_end_lsn.val_); - last_fetch_max_log_id_ = fetch_start_log_id + fetch_log_count - 1; - last_fetch_committed_end_lsn_.reset(); + if (FetchTriggerType::SLIDING_CB == trigger_type + && !last_fetch_committed_end_lsn_.is_valid()) { + // Streamingly fetching may trigger more than one time, + // we need filter duplicated ops here. + need_exec_fetch = false; + } else { + ATOMIC_STORE(&last_fetch_end_lsn_.val_, tmp_end_lsn.val_); + last_fetch_max_log_id_ = fetch_start_log_id + fetch_log_count - 1; + last_fetch_committed_end_lsn_.reset(); + } } while(0); - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && need_exec_fetch) { FetchLogType fetch_type = FETCH_LOG_FOLLOWER; if (LEADER_RECONFIRM == trigger_type) { fetch_type = FETCH_LOG_LEADER_RECONFIRM; @@ -1863,6 +1891,8 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot log_proposal_id, log_accum_checksum); } + MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly + if (OB_SUCC(ret) && (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) { // Check if need fetch log streamingly, diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index a692da7de1..6a23bd5084 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -304,7 +304,8 @@ private: void try_reset_last_fetch_log_info_(const LSN &expected_end_lsn, const int64_t log_id); void try_update_committed_lsn_for_fetch_(const LSN &expected_end_lsn, const int64_t &expected_log_id, - const LSN &log_committed_end_lsn); + const LSN &log_committed_end_lsn, + bool &is_need_fetch); void try_fetch_log_streamingly_(const LSN &log_end_lsn); int do_fetch_log_(const FetchTriggerType &trigger_type, const common::ObAddr &dest,