[opt] reduce the number of PushLogResp messages to the leader
This commit is contained in:
parent
6c616f2f4e
commit
bd2fea0f7e
@ -32,6 +32,7 @@ enum PushLogType
|
||||
{
|
||||
PUSH_LOG = 0,
|
||||
FETCH_LOG_RESP = 1,
|
||||
PUSH_LOG_WO_ACK = 2,
|
||||
};
|
||||
|
||||
inline const char *push_log_type_2_str(const PushLogType type)
|
||||
@ -41,6 +42,7 @@ inline const char *push_log_type_2_str(const PushLogType type)
|
||||
{
|
||||
EXTRACT_PUSH_LOG_TYPE(PUSH_LOG);
|
||||
EXTRACT_PUSH_LOG_TYPE(FETCH_LOG_RESP);
|
||||
EXTRACT_PUSH_LOG_TYPE(PUSH_LOG_WO_ACK);
|
||||
|
||||
default:
|
||||
return "Invalid Type";
|
||||
|
@ -887,10 +887,11 @@ int LogSlidingWindow::try_push_log_to_children_(const int64_t curr_proposal_id,
|
||||
const bool need_presend_log = (state_mgr_->is_leader_active()) ? true : false;
|
||||
const bool is_fetch_log = false;
|
||||
const bool need_batch_push = need_use_batch_rpc_(log_write_buf.get_total_size(), is_fetch_log);
|
||||
const PushLogType to_child_log_type = (GET_MIN_CLUSTER_VERSION() >= CLUSTER_CURRENT_VERSION)? PUSH_LOG_WO_ACK: PUSH_LOG;
|
||||
if (OB_FAIL(mm_->get_log_sync_children_list(children_list))) {
|
||||
PALF_LOG(WARN, "get_children_list failed", K(ret), K_(palf_id));
|
||||
} else if (children_list.is_valid()
|
||||
&& OB_FAIL(log_engine_->submit_push_log_req(children_list, PUSH_LOG, curr_proposal_id,
|
||||
&& OB_FAIL(log_engine_->submit_push_log_req(children_list, to_child_log_type, curr_proposal_id,
|
||||
prev_log_pid, prev_lsn, lsn, log_write_buf, need_batch_push))) {
|
||||
PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K_(palf_id), K_(self));
|
||||
} else if (false == need_presend_log) {
|
||||
@ -1436,6 +1437,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
|
||||
const LSN log_end_lsn = flush_cb_ctx.lsn_ + flush_cb_ctx.total_len_;
|
||||
const int64_t cb_begin_ts = ObTimeUtility::current_time();
|
||||
bool is_fetch_log = false;
|
||||
bool need_reply_ack = true;
|
||||
PALF_LOG(TRACE, "after_flush_log begin", K_(palf_id), K_(self), K(flush_cb_ctx));
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -1471,7 +1473,9 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
|
||||
can_exec_cb = true;
|
||||
// update log_task's flushed_ts
|
||||
log_task->set_flushed_ts(cb_begin_ts);
|
||||
is_fetch_log = log_task->is_fetch_log_type();
|
||||
PushLogType push_log_type = log_task->get_push_log_type();
|
||||
is_fetch_log = (PushLogType::FETCH_LOG_RESP == push_log_type);
|
||||
need_reply_ack = (PushLogType::PUSH_LOG_WO_ACK != push_log_type);
|
||||
}
|
||||
log_task->unlock();
|
||||
}
|
||||
@ -1484,7 +1488,9 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
|
||||
PALF_LOG(WARN, "get_log_task failed", K(ret), K(log_id), K_(palf_id), K_(self));
|
||||
} else {
|
||||
log_task->set_flushed_ts(cb_begin_ts);
|
||||
is_fetch_log = log_task->is_fetch_log_type();
|
||||
PushLogType push_log_type = log_task->get_push_log_type();
|
||||
is_fetch_log = (PushLogType::FETCH_LOG_RESP == push_log_type);
|
||||
need_reply_ack = (PushLogType::PUSH_LOG_WO_ACK != push_log_type);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1509,8 +1515,13 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
|
||||
(void) mm_->get_config_version(config_version);
|
||||
// flush op for different role
|
||||
// migrating replicas do not send responses for reducing its impact on the leader
|
||||
if (!leader.is_valid()) {
|
||||
PALF_LOG(TRACE, "current leader is invalid, cannot send ack", K(ret), K_(palf_id), K_(self),
|
||||
if (!need_reply_ack) {
|
||||
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
|
||||
PALF_LOG(INFO, "log_type do not need to reply ack", K(ret), K_(palf_id), K_(self),
|
||||
K(log_end_lsn), K(leader));
|
||||
}
|
||||
} else if (!leader.is_valid()) {
|
||||
PALF_LOG(INFO, "current leader is invalid, cannot send ack", K(ret), K_(palf_id), K_(self),
|
||||
K(flush_cb_ctx), K(log_end_lsn), K(leader));
|
||||
} else if (OB_UNLIKELY(config_version.is_initial_version())) {
|
||||
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
|
||||
@ -3380,7 +3391,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
|
||||
LSN last_fetch_end_lsn;
|
||||
last_fetch_end_lsn.val_ = ATOMIC_LOAD(&last_fetch_end_lsn_.val_);
|
||||
if (OB_SUCC(ret)
|
||||
&& PUSH_LOG == push_log_type
|
||||
&& (PUSH_LOG == push_log_type || PUSH_LOG_WO_ACK == push_log_type)
|
||||
&& last_fetch_end_lsn.is_valid()
|
||||
&& log_end_lsn <= last_fetch_end_lsn) {
|
||||
// 只有当收到push log的end_lsn和log_id均处于last fetch范围内时
|
||||
|
Loading…
x
Reference in New Issue
Block a user