[CP] reduce the rpc count of PushLogResp

This commit is contained in:
HaHaJeff 2024-07-15 12:47:13 +00:00 committed by ob-robot
parent 409cc130af
commit 6f6d3fcd08
5 changed files with 35 additions and 7 deletions

View File

@ -291,8 +291,8 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r
// 1. leader can not commit logs
block_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
block_pcode(leader_idx, ObRpcPacketCode::OB_BATCH);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
sleep(1);
EXPECT_GT(leader.palf_handle_impl_->sw_.last_submit_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_);
// 2. remove D
@ -303,6 +303,7 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r
// 3. leader can commit logs
unblock_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
unblock_pcode(leader_idx, ObRpcPacketCode::OB_BATCH);
// 4. check if the leader can commit logs after D has been removed from match_lsn_map
EXPECT_UNTIL_EQ(leader.palf_handle_impl_->sw_.committed_end_lsn_, leader.palf_handle_impl_->sw_.last_submit_end_lsn_);
@ -400,6 +401,7 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r
// 2. leader can not commit logs
block_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
block_pcode(leader_idx, ObRpcPacketCode::OB_BATCH);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
sleep(1);
EXPECT_GT(leader.palf_handle_impl_->sw_.last_submit_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_);
@ -412,6 +414,7 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r
// 3. leader can commit logs
unblock_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
unblock_pcode(leader_idx, ObRpcPacketCode::OB_BATCH);
// 4. check if the leader can commit logs after C has been removed from match_lsn_map
EXPECT_UNTIL_EQ(leader.palf_handle_impl_->sw_.committed_end_lsn_, leader.palf_handle_impl_->sw_.last_submit_end_lsn_);

View File

@ -1433,6 +1433,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
const int64_t log_id = flush_cb_ctx.log_id_;
const LSN log_end_lsn = flush_cb_ctx.lsn_ + flush_cb_ctx.total_len_;
const int64_t cb_begin_ts = ObTimeUtility::current_time();
bool is_fetch_log = false;
PALF_LOG(TRACE, "after_flush_log begin", K_(palf_id), K_(self), K(flush_cb_ctx));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -1468,6 +1469,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
can_exec_cb = true;
// update log_task's flushed_ts
log_task->set_flushed_ts(cb_begin_ts);
is_fetch_log = log_task->is_fetch_log_type();
}
log_task->unlock();
}
@ -1480,6 +1482,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
PALF_LOG(WARN, "get_log_task failed", K(ret), K(log_id), K_(palf_id), K_(self));
} else {
log_task->set_flushed_ts(cb_begin_ts);
is_fetch_log = log_task->is_fetch_log_type();
}
}
@ -1512,7 +1515,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx)
PALF_LOG(INFO, "migrating replicas do not send responses", K(ret), K_(palf_id), K_(self),
K(log_end_lsn), K(leader));
}
} else if (OB_FAIL(submit_push_log_resp_(leader, flush_cb_ctx.curr_proposal_id_, log_end_lsn))) {
} else if (OB_FAIL(submit_push_log_resp_(leader, flush_cb_ctx.curr_proposal_id_, log_end_lsn, is_fetch_log))) {
PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(leader), K(flush_cb_ctx));
} else {}
} else {}
@ -3296,7 +3299,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
} else if (need_send_ack) {
// This log matches with msg and it has been flushed, just sending ack directly.
const int64_t curr_proposal_id = state_mgr_->get_proposal_id();
if (OB_FAIL(submit_push_log_resp_(src_server, curr_proposal_id, log_end_lsn))) {
if (OB_FAIL(submit_push_log_resp_(src_server, curr_proposal_id, log_end_lsn, PushLogType::FETCH_LOG_RESP == push_log_type))) {
PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(src_server));
} else {
PALF_LOG(INFO, "submit_push_log_resp succ", K(ret), K_(palf_id), K_(self), K(src_server), K(curr_proposal_id),
@ -3352,6 +3355,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
log_task->set_group_log_checksum(group_log_data_checksum);
(void) log_task->set_freezed();
log_task->set_freeze_ts(ObTimeUtility::current_time());
log_task->set_push_log_type(push_log_type);
}
log_task->unlock();
}
@ -3409,17 +3413,19 @@ int LogSlidingWindow::submit_push_log_resp(const common::ObAddr &server)
const int64_t curr_proposal_id = state_mgr_->get_proposal_id();
LSN committed_end_lsn;
get_committed_end_lsn_(committed_end_lsn);
ret = submit_push_log_resp_(server, curr_proposal_id, committed_end_lsn);
const bool is_fetch_log = false;
ret = submit_push_log_resp_(server, curr_proposal_id, committed_end_lsn, is_fetch_log);
}
return ret;
}
int LogSlidingWindow::submit_push_log_resp_(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const LSN &log_end_lsn)
const LSN &log_end_lsn,
const bool &is_fetch_log)
{
int ret = OB_SUCCESS;
const bool is_need_batch = need_use_batch_rpc_(0);
const bool is_need_batch = need_use_batch_rpc_(0) || is_fetch_log;
if (state_mgr_->is_allow_vote() &&
OB_FAIL(log_engine_->submit_push_log_resp(server, msg_proposal_id, log_end_lsn, is_need_batch))) {
PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(server), K(log_end_lsn));

View File

@ -461,7 +461,10 @@ private:
int64_t &log_id,
int64_t &log_proposal_id);
int leader_broadcast_committed_info_(const LSN &committed_end_lsn);
int submit_push_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, const LSN &lsn);
int submit_push_log_resp_(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const LSN &lsn,
const bool &is_fetch_log);
inline int try_push_log_to_paxos_follower_(const int64_t curr_proposal_id,
const int64_t prev_log_pid,
const LSN &prev_lsn,

View File

@ -142,6 +142,7 @@ LogTask::LogTask()
freeze_ts_(OB_INVALID_TIMESTAMP),
submit_ts_(OB_INVALID_TIMESTAMP),
flushed_ts_(OB_INVALID_TIMESTAMP),
push_log_type_(PushLogType::PUSH_LOG),
lock_()
{
reset();
@ -436,5 +437,15 @@ void LogTask::set_flushed_ts(const int64_t ts)
ATOMIC_STORE(&flushed_ts_, ts);
}
void LogTask::set_push_log_type(const PushLogType &push_log_type)
{
ATOMIC_STORE(&push_log_type_, push_log_type);
}
bool LogTask::is_fetch_log_type() const
{
return PushLogType::FETCH_LOG_RESP == ATOMIC_LOAD(&push_log_type_);
}
} // namespace palf
} // namespace oceanbase

View File

@ -18,6 +18,7 @@
#include "fixed_sliding_window.h"
#include "log_define.h" // block_id_t
#include "lsn.h"
#include "log_req.h" // PushLogType
namespace oceanbase
{
@ -141,10 +142,13 @@ public:
void set_freeze_ts(const int64_t ts);
void set_submit_ts(const int64_t ts);
void set_flushed_ts(const int64_t ts);
void set_push_log_type(const PushLogType &push_log_type);
int64_t get_gen_ts() const { return ATOMIC_LOAD(&(gen_ts_)); }
int64_t get_freeze_ts() const { return ATOMIC_LOAD(&(freeze_ts_)); }
int64_t get_submit_ts() const { return ATOMIC_LOAD(&(submit_ts_)); }
int64_t get_flushed_ts() const { return ATOMIC_LOAD(&(flushed_ts_)); }
PushLogType get_push_log_type() const { return ATOMIC_LOAD(&push_log_type_);}
bool is_fetch_log_type() const;
TO_STRING_KV(K_(header), K_(state_map), K_(ref_cnt),
K_(gen_ts), K_(freeze_ts), K_(submit_ts), K_(flushed_ts),
"gen_to_freeze cost time", freeze_ts_ - gen_ts_,
@ -162,6 +166,7 @@ private:
mutable int64_t freeze_ts_;
mutable int64_t submit_ts_;
mutable int64_t flushed_ts_;
mutable PushLogType push_log_type_;
mutable common::ObLatch lock_;
};
} // end namespace palf