From fb85dd28bde4a8c4bf7c05a2cb496be3f65b1b16 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 2 Nov 2022 16:07:01 +0000 Subject: [PATCH] Reply committed_info for fetch log request. --- src/logservice/palf/log_engine.cpp | 25 ++++++ src/logservice/palf/log_engine.h | 7 +- src/logservice/palf/log_net_service.cpp | 19 +++++ src/logservice/palf/log_net_service.h | 6 +- src/logservice/palf/log_sliding_window.cpp | 92 +++++++++++++++++++--- src/logservice/palf/log_sliding_window.h | 7 ++ src/logservice/palf/palf_handle_impl.cpp | 42 ++++++++-- src/logservice/palf/palf_handle_impl.h | 4 + 8 files changed, 181 insertions(+), 21 deletions(-) diff --git a/src/logservice/palf/log_engine.cpp b/src/logservice/palf/log_engine.cpp index ca1efb2234..e83ecf6b05 100644 --- a/src/logservice/palf/log_engine.cpp +++ b/src/logservice/palf/log_engine.cpp @@ -937,6 +937,31 @@ int LogEngine::submit_learner_keepalive_resp(const common::ObAddr &server, return ret; } +int LogEngine::submit_committed_info_req( + const common::ObAddr &server, + const int64_t &msg_proposal_id, + const int64_t prev_log_id, + const int64_t &prev_log_proposal_id, + const LSN &committed_end_lsn) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogEngine not init", K(ret), KPC(this)); + } else if (OB_FAIL(log_net_service_.submit_committed_info_req( + server, msg_proposal_id, + prev_log_id, prev_log_proposal_id, committed_end_lsn))) { + PALF_LOG(ERROR, "LogNetService submit_committed_info_req failed", K(ret), + KPC(this), K(server), + K(prev_log_id), K(prev_log_proposal_id), K(committed_end_lsn)); + } else { + PALF_LOG(TRACE, "submit_committed_info_req success", K(ret), KPC(this), + K(server), K(msg_proposal_id), K(prev_log_id), + K(prev_log_proposal_id), K(committed_end_lsn)); + } + return ret; +} + LogMeta LogEngine::get_log_meta() const { ObSpinLockGuard guard(log_meta_lock_); diff --git a/src/logservice/palf/log_engine.h b/src/logservice/palf/log_engine.h index c588f80a4a..3d24bcb0ea 100644 --- a/src/logservice/palf/log_engine.h +++ b/src/logservice/palf/log_engine.h @@ -320,7 +320,12 @@ public: int submit_notify_rebuild_req(const ObAddr &server, const LSN &base_lsn, const LogInfo &base_prev_log_info); - + int submit_committed_info_req( + const ObAddr &server, + const int64_t &msg_proposal_id, + const int64_t prev_log_id, + const int64_t &prev_log_proposal_id, + const LSN &committed_end_lsn); // @brief: this function used to send committed_info to child replica // @param[in] member_list: current paxos member list // @param[in] msg_proposal_id: the current proposal_id diff --git a/src/logservice/palf/log_net_service.cpp b/src/logservice/palf/log_net_service.cpp index 142c671502..cff9bbebc9 100644 --- a/src/logservice/palf/log_net_service.cpp +++ b/src/logservice/palf/log_net_service.cpp @@ -94,6 +94,25 @@ int LogNetService::submit_push_log_req( return ret; } +int LogNetService::submit_committed_info_req( + const common::ObAddr &server, + const int64_t &msg_proposal_id, + const int64_t prev_log_id, + const int64_t &prev_log_proposal_id, + const LSN &committed_end_lsn) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogNetService has not inited!!!", K(ret)); + } else { + CommittedInfo committed_info_req(msg_proposal_id, prev_log_id, + prev_log_proposal_id, committed_end_lsn); + ret = post_request_to_server_(server, committed_info_req); + } + return ret; +} + int LogNetService::submit_push_log_resp( const ObAddr &server, const int64_t &msg_proposal_id, diff --git a/src/logservice/palf/log_net_service.h b/src/logservice/palf/log_net_service.h index 2488b07844..7634e3a230 100644 --- a/src/logservice/palf/log_net_service.h +++ b/src/logservice/palf/log_net_service.h @@ -188,7 +188,11 @@ public: int submit_retire_child_req(const common::ObAddr &server, const LogLearner &parent_itself); int submit_learner_keepalive_req(const common::ObAddr &server, const LogLearner &sender_itself); int submit_learner_keepalive_resp(const common::ObAddr &server, const LogLearner &sender_itself); - + int submit_committed_info_req(const common::ObAddr &server, + const int64_t &msg_proposal_id, + const int64_t prev_log_id, + const int64_t &prev_log_proposal_id, + const LSN &committed_end_lsn); template int submit_committed_info_req( const List &member_list, diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 0d1524a6dd..cc875fefe5 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -3273,7 +3273,60 @@ int LogSlidingWindow::try_update_match_lsn_map_(const common::ObAddr &server, co return ret; } -int LogSlidingWindow::leader_broadcast_committed_info_(const LSN &committed_end_lsn) +int LogSlidingWindow::try_send_committed_info(const common::ObAddr &server, + const LSN &log_lsn, + const LSN &log_end_lsn, + const int64_t &log_proposal_id) +{ + int ret = OB_SUCCESS; + LSN committed_end_lsn; + get_committed_end_lsn_(committed_end_lsn); + const int64_t curr_proposal_id = state_mgr_->get_proposal_id(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!log_lsn.is_valid() || !log_end_lsn.is_valid() || INVALID_PROPOSAL_ID == log_proposal_id) { + ret = OB_INVALID_ARGUMENT; + } else if (state_mgr_->is_leader_active()) { + // Leader + int64_t last_log_id = OB_INVALID_LOG_ID; + int64_t last_log_proposal_id = INVALID_PROPOSAL_ID; + if (OB_FAIL(leader_get_committed_log_info_(committed_end_lsn, last_log_id, last_log_proposal_id)) + || OB_INVALID_LOG_ID == last_log_id) { + // no need send committed_info + } else if (OB_FAIL(log_engine_->submit_committed_info_req(server, curr_proposal_id, + last_log_id, log_proposal_id, committed_end_lsn))) { + PALF_LOG(WARN, "submit_committed_info_req failed", K(ret), K_(palf_id), K_(self), K(server)); + } + } else { + // Follower + int64_t last_slide_log_id = OB_INVALID_LOG_ID; + int64_t last_slide_log_ts = OB_INVALID_TIMESTAMP; + LSN last_slide_lsn; + LSN last_slide_end_lsn; + int64_t last_slide_log_pid = INVALID_PROPOSAL_ID; + int64_t last_slide_accum_checksum = -1; + get_last_slide_log_info_(last_slide_log_id, last_slide_log_ts, last_slide_lsn, \ + last_slide_end_lsn, last_slide_log_pid, last_slide_accum_checksum); + if (log_lsn == last_slide_lsn + && log_proposal_id == last_slide_log_pid + && committed_end_lsn == log_end_lsn) { + // If arg log does match with last slide log, follower can send committed_info to server. + OB_ASSERT(log_end_lsn == last_slide_end_lsn); + if (OB_FAIL(log_engine_->submit_committed_info_req(server, curr_proposal_id, + last_slide_log_id, log_proposal_id, committed_end_lsn))) { + PALF_LOG(WARN, "submit_committed_info_req failed", K(ret), K_(palf_id), K_(self), K(server)); + } else { + PALF_LOG(TRACE, "follower try_send_committed_info success", K(ret), K_(palf_id), K_(self), + K(last_slide_log_id), K(log_proposal_id), K(committed_end_lsn)); + } + } + } + return ret; +} + +int LogSlidingWindow::leader_get_committed_log_info_(const LSN &committed_end_lsn, + int64_t &log_id, + int64_t &log_proposal_id) { int ret = OB_SUCCESS; const int64_t max_log_id = get_max_log_id(); @@ -3289,25 +3342,38 @@ int LogSlidingWindow::leader_broadcast_committed_info_(const LSN &committed_end_ // log_task is invalid or not freezed, that means there is maybe new log after committed_end_lsn. // No need broadcast commonitted_info. } else { - int64_t log_proposal_id = INVALID_PROPOSAL_ID; LSN log_end_lsn; log_task->lock(); log_proposal_id = log_task->get_proposal_id(); log_end_lsn = log_task->get_begin_lsn() + LogGroupEntryHeader::HEADER_SER_SIZE + log_task->get_data_len(); log_task->unlock(); if (log_end_lsn == committed_end_lsn) { - ObMemberList dst_member_list; - const int64_t curr_proposal_id = state_mgr_->get_proposal_id(); - if (OB_FAIL(mm_->get_curr_member_list(dst_member_list))) { - PALF_LOG(WARN, "get_curr_member_list failed", K(ret), K_(palf_id), K_(self)); - } else if (OB_FAIL(dst_member_list.remove_server(self_))) { - PALF_LOG(WARN, "dst_member_list remove_server failed", K(ret), K_(palf_id), K_(self)); - } else if (dst_member_list.is_valid() - && OB_FAIL(log_engine_->submit_committed_info_req(dst_member_list, curr_proposal_id, - max_log_id, log_proposal_id, committed_end_lsn))) { - } else {} + log_id = max_log_id; } - PALF_LOG(TRACE, "leader_broadcast_committed_info_", K(ret), K_(palf_id), K_(self), K(max_log_id)); + } + return ret; +} + +int LogSlidingWindow::leader_broadcast_committed_info_(const LSN &committed_end_lsn) +{ + int ret = OB_SUCCESS; + const int64_t curr_proposal_id = state_mgr_->get_proposal_id(); + int64_t log_id = OB_INVALID_LOG_ID; + int64_t log_proposal_id = INVALID_PROPOSAL_ID; + ObMemberList dst_member_list; + if (OB_FAIL(leader_get_committed_log_info_(committed_end_lsn, log_id, log_proposal_id)) + || OB_INVALID_LOG_ID == log_id) { + // no need send committed_info + } else if (OB_FAIL(mm_->get_curr_member_list(dst_member_list))) { + PALF_LOG(WARN, "get_curr_member_list failed", K(ret), K_(palf_id), K_(self)); + } else if (OB_FAIL(dst_member_list.remove_server(self_))) { + PALF_LOG(WARN, "dst_member_list remove_server failed", K(ret), K_(palf_id), K_(self)); + } else if (dst_member_list.is_valid() + && OB_FAIL(log_engine_->submit_committed_info_req(dst_member_list, curr_proposal_id, + log_id, log_proposal_id, committed_end_lsn))) { + PALF_LOG(WARN, "submit_committed_info_req failed", K(ret), K_(palf_id), K_(self), K(log_id)); + } else { + PALF_LOG(TRACE, "leader_broadcast_committed_info_", K(ret), K_(palf_id), K_(self), K(log_id)); } return ret; } diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index b583637c15..240ad1dcc9 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -190,6 +190,10 @@ public: virtual int set_location_cache_cb(PalfLocationCacheCb *lc_cb); virtual int reset_location_cache_cb(); virtual int advance_reuse_lsn(const LSN &flush_log_end_lsn); + virtual int try_send_committed_info(const common::ObAddr &server, + const LSN &log_lsn, + const LSN &log_end_lsn, + const int64_t &log_proposal_id); TO_STRING_KV(K_(palf_id), K_(self), K_(lsn_allocator), K_(group_buffer), \ K_(last_submit_lsn), K_(last_submit_end_lsn), K_(last_submit_log_id), K_(last_submit_log_pid), \ K_(max_flushed_lsn), K_(max_flushed_end_lsn), K_(max_flushed_log_pid), K_(committed_end_lsn), \ @@ -303,6 +307,9 @@ private: const char *buf, const int64_t buf_len, int64_t &min_log_ts_ns); + int leader_get_committed_log_info_(const LSN &committed_end_lsn, + int64_t &log_id, + int64_t &log_proposal_id); int leader_broadcast_committed_info_(const LSN &committed_end_lsn); int submit_push_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, const LSN &lsn); inline int try_push_log_to_paxos_follower_(const int64_t curr_proposal_id, diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 82dc411dfc..3214b29e77 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -2760,13 +2760,36 @@ int PalfHandleImpl::fetch_log_from_storage(const common::ObAddr &server, return ret; } +int PalfHandleImpl::try_send_committed_info_(const ObAddr &server, + const LSN &log_lsn, + const LSN &log_end_lsn, + const int64_t &log_proposal_id) +{ + int ret = OB_SUCCESS; + AccessMode access_mode; + if (!log_lsn.is_valid() || !log_end_lsn.is_valid() || INVALID_PROPOSAL_ID == log_proposal_id) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(mode_mgr_.get_access_mode(access_mode))) { + PALF_LOG(WARN, "get_access_mode failed", K(ret), KPC(this)); + } else if (AccessMode::APPEND == access_mode) { + // No need send committed_info in APPEND mode, because leader will genenrate keeapAlive log periodically. + } else if (OB_FAIL(sw_.try_send_committed_info(server, log_lsn, log_end_lsn, log_proposal_id))) { + PALF_LOG(TRACE, "try_send_committed_info failed", K(ret), K_(palf_id), K_(self), + K(server), K(log_lsn), K(log_end_lsn), K(log_proposal_id)); + } else { + PALF_LOG(TRACE, "try_send_committed_info_ success", K(ret), K_(palf_id), K_(self), K(server), + K(log_lsn), K(log_end_lsn), K(log_proposal_id)); + } + return ret; +} + int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, - const FetchLogType fetch_type, - const int64_t &msg_proposal_id, - const LSN &prev_lsn, - const LSN &fetch_start_lsn, - const int64_t fetch_log_size, - const int64_t fetch_log_count) + const FetchLogType fetch_type, + const int64_t &msg_proposal_id, + const LSN &prev_lsn, + const LSN &fetch_start_lsn, + const int64_t fetch_log_size, + const int64_t fetch_log_count) { int ret = OB_SUCCESS; PalfGroupBufferIterator iterator; @@ -2822,6 +2845,7 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, bool is_reach_end = false; int64_t fetched_count = 0; LSN curr_log_end_lsn = curr_lsn + curr_group_entry.get_group_entry_size(); + LSN prev_log_end_lsn; int64_t prev_log_proposal_id = prev_log_info.log_proposal_id_; while (OB_SUCC(ret) && !is_reach_size_limit && !is_reach_count_limit && !is_reach_end && OB_SUCC(iterator.next())) { @@ -2853,12 +2877,18 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, K(prev_log_proposal_id), K(fetch_end_lsn), K(curr_log_end_lsn), K(is_reach_size_limit), K(fetch_log_size), K(fetched_count), K(is_reach_count_limit)); each_round_prev_lsn = curr_lsn; + prev_log_end_lsn = curr_log_end_lsn; prev_log_proposal_id = curr_group_entry.get_header().get_log_proposal_id(); } } if (OB_ITER_END == ret) { ret = OB_SUCCESS; } + // try send committed_info to server + if (OB_SUCC(ret)) { + RLockGuard guard(lock_); + (void) try_send_committed_info_(server, each_round_prev_lsn, prev_log_end_lsn, prev_log_proposal_id); + } } if (OB_FAIL(ret) && OB_ERR_OUT_OF_LOWER_BOUND == ret) { diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index 58c3163bd7..883c2593ad 100644 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -867,6 +867,10 @@ private: int construct_palf_base_info_(const LSN &max_committed_lsn, PalfBaseInfo &palf_base_info); int append_disk_log_to_sw_(const LSN &start_lsn); + int try_send_committed_info_(const common::ObAddr &server, + const LSN &log_lsn, + const LSN &log_end_lsn, + const int64_t &log_proposal_id); int fetch_log_from_storage_(const common::ObAddr &server, const FetchLogType fetch_type, const int64_t &msg_proposal_id,