From bd2fea0f7e7a3cac3daeb2220603c7f49894781e Mon Sep 17 00:00:00 2001 From: BinChenn Date: Mon, 21 Oct 2024 06:16:35 +0000 Subject: [PATCH] [opt] reduce the number of PushLogResp messages to the leader --- src/logservice/palf/log_req.h | 2 ++ src/logservice/palf/log_sliding_window.cpp | 23 ++++++++++++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/logservice/palf/log_req.h b/src/logservice/palf/log_req.h index f5ae9426f..af4d29c2b 100644 --- a/src/logservice/palf/log_req.h +++ b/src/logservice/palf/log_req.h @@ -32,6 +32,7 @@ enum PushLogType { PUSH_LOG = 0, FETCH_LOG_RESP = 1, + PUSH_LOG_WO_ACK = 2, }; inline const char *push_log_type_2_str(const PushLogType type) @@ -41,6 +42,7 @@ inline const char *push_log_type_2_str(const PushLogType type) { EXTRACT_PUSH_LOG_TYPE(PUSH_LOG); EXTRACT_PUSH_LOG_TYPE(FETCH_LOG_RESP); + EXTRACT_PUSH_LOG_TYPE(PUSH_LOG_WO_ACK); default: return "Invalid Type"; diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index c4f40b090..796780362 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -887,10 +887,11 @@ int LogSlidingWindow::try_push_log_to_children_(const int64_t curr_proposal_id, const bool need_presend_log = (state_mgr_->is_leader_active()) ? true : false; const bool is_fetch_log = false; const bool need_batch_push = need_use_batch_rpc_(log_write_buf.get_total_size(), is_fetch_log); + const PushLogType to_child_log_type = (GET_MIN_CLUSTER_VERSION() >= CLUSTER_CURRENT_VERSION)? PUSH_LOG_WO_ACK: PUSH_LOG; if (OB_FAIL(mm_->get_log_sync_children_list(children_list))) { PALF_LOG(WARN, "get_children_list failed", K(ret), K_(palf_id)); } else if (children_list.is_valid() - && OB_FAIL(log_engine_->submit_push_log_req(children_list, PUSH_LOG, curr_proposal_id, + && OB_FAIL(log_engine_->submit_push_log_req(children_list, to_child_log_type, curr_proposal_id, prev_log_pid, prev_lsn, lsn, log_write_buf, need_batch_push))) { PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K_(palf_id), K_(self)); } else if (false == need_presend_log) { @@ -1436,6 +1437,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) const LSN log_end_lsn = flush_cb_ctx.lsn_ + flush_cb_ctx.total_len_; const int64_t cb_begin_ts = ObTimeUtility::current_time(); bool is_fetch_log = false; + bool need_reply_ack = true; PALF_LOG(TRACE, "after_flush_log begin", K_(palf_id), K_(self), K(flush_cb_ctx)); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -1471,7 +1473,9 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) can_exec_cb = true; // update log_task's flushed_ts log_task->set_flushed_ts(cb_begin_ts); - is_fetch_log = log_task->is_fetch_log_type(); + PushLogType push_log_type = log_task->get_push_log_type(); + is_fetch_log = (PushLogType::FETCH_LOG_RESP == push_log_type); + need_reply_ack = (PushLogType::PUSH_LOG_WO_ACK != push_log_type); } log_task->unlock(); } @@ -1484,7 +1488,9 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) PALF_LOG(WARN, "get_log_task failed", K(ret), K(log_id), K_(palf_id), K_(self)); } else { log_task->set_flushed_ts(cb_begin_ts); - is_fetch_log = log_task->is_fetch_log_type(); + PushLogType push_log_type = log_task->get_push_log_type(); + is_fetch_log = (PushLogType::FETCH_LOG_RESP == push_log_type); + need_reply_ack = (PushLogType::PUSH_LOG_WO_ACK != push_log_type); } } @@ -1509,8 +1515,13 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) (void) mm_->get_config_version(config_version); // flush op for different role // migrating replicas do not send responses for reducing its impact on the leader - if (!leader.is_valid()) { - PALF_LOG(TRACE, "current leader is invalid, cannot send ack", K(ret), K_(palf_id), K_(self), + if (!need_reply_ack) { + if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { + PALF_LOG(INFO, "log_type do not need to reply ack", K(ret), K_(palf_id), K_(self), + K(log_end_lsn), K(leader)); + } + } else if (!leader.is_valid()) { + PALF_LOG(INFO, "current leader is invalid, cannot send ack", K(ret), K_(palf_id), K_(self), K(flush_cb_ctx), K(log_end_lsn), K(leader)); } else if (OB_UNLIKELY(config_version.is_initial_version())) { if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { @@ -3380,7 +3391,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server, LSN last_fetch_end_lsn; last_fetch_end_lsn.val_ = ATOMIC_LOAD(&last_fetch_end_lsn_.val_); if (OB_SUCC(ret) - && PUSH_LOG == push_log_type + && (PUSH_LOG == push_log_type || PUSH_LOG_WO_ACK == push_log_type) && last_fetch_end_lsn.is_valid() && log_end_lsn <= last_fetch_end_lsn) { // 只有当收到push log的end_lsn和log_id均处于last fetch范围内时