diff --git a/mittest/logservice/test_ob_simple_log_config_change_mock_ele.cpp b/mittest/logservice/test_ob_simple_log_config_change_mock_ele.cpp index 8f5b714e1..31b084ed1 100644 --- a/mittest/logservice/test_ob_simple_log_config_change_mock_ele.cpp +++ b/mittest/logservice/test_ob_simple_log_config_change_mock_ele.cpp @@ -291,8 +291,8 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r // 1. leader can not commit logs block_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP); + block_pcode(leader_idx, ObRpcPacketCode::OB_BATCH); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id)); - sleep(1); EXPECT_GT(leader.palf_handle_impl_->sw_.last_submit_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_); // 2. remove D @@ -303,6 +303,7 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r // 3. leader can commit logs unblock_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP); + unblock_pcode(leader_idx, ObRpcPacketCode::OB_BATCH); // 4. check if the leader can commit logs after D has been removed from match_lsn_map EXPECT_UNTIL_EQ(leader.palf_handle_impl_->sw_.committed_end_lsn_, leader.palf_handle_impl_->sw_.last_submit_end_lsn_); @@ -400,6 +401,7 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r // 2. leader can not commit logs block_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP); + block_pcode(leader_idx, ObRpcPacketCode::OB_BATCH); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id)); sleep(1); EXPECT_GT(leader.palf_handle_impl_->sw_.last_submit_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_); @@ -412,6 +414,7 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_r // 3. leader can commit logs unblock_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP); + unblock_pcode(leader_idx, ObRpcPacketCode::OB_BATCH); // 4. check if the leader can commit logs after C has been removed from match_lsn_map EXPECT_UNTIL_EQ(leader.palf_handle_impl_->sw_.committed_end_lsn_, leader.palf_handle_impl_->sw_.last_submit_end_lsn_); diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 7d4743e80..47de59794 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -1433,6 +1433,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) const int64_t log_id = flush_cb_ctx.log_id_; const LSN log_end_lsn = flush_cb_ctx.lsn_ + flush_cb_ctx.total_len_; const int64_t cb_begin_ts = ObTimeUtility::current_time(); + bool is_fetch_log = false; PALF_LOG(TRACE, "after_flush_log begin", K_(palf_id), K_(self), K(flush_cb_ctx)); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -1468,6 +1469,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) can_exec_cb = true; // update log_task's flushed_ts log_task->set_flushed_ts(cb_begin_ts); + is_fetch_log = log_task->is_fetch_log_type(); } log_task->unlock(); } @@ -1480,6 +1482,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) PALF_LOG(WARN, "get_log_task failed", K(ret), K(log_id), K_(palf_id), K_(self)); } else { log_task->set_flushed_ts(cb_begin_ts); + is_fetch_log = log_task->is_fetch_log_type(); } } @@ -1512,7 +1515,7 @@ int LogSlidingWindow::after_flush_log(const FlushLogCbCtx &flush_cb_ctx) PALF_LOG(INFO, "migrating replicas do not send responses", K(ret), K_(palf_id), K_(self), K(log_end_lsn), K(leader)); } - } else if (OB_FAIL(submit_push_log_resp_(leader, flush_cb_ctx.curr_proposal_id_, log_end_lsn))) { + } else if (OB_FAIL(submit_push_log_resp_(leader, flush_cb_ctx.curr_proposal_id_, log_end_lsn, is_fetch_log))) { PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(leader), K(flush_cb_ctx)); } else {} } else {} @@ -3296,7 +3299,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server, } else if (need_send_ack) { // This log matches with msg and it has been flushed, just sending ack directly. const int64_t curr_proposal_id = state_mgr_->get_proposal_id(); - if (OB_FAIL(submit_push_log_resp_(src_server, curr_proposal_id, log_end_lsn))) { + if (OB_FAIL(submit_push_log_resp_(src_server, curr_proposal_id, log_end_lsn, PushLogType::FETCH_LOG_RESP == push_log_type))) { PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(src_server)); } else { PALF_LOG(INFO, "submit_push_log_resp succ", K(ret), K_(palf_id), K_(self), K(src_server), K(curr_proposal_id), @@ -3352,6 +3355,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server, log_task->set_group_log_checksum(group_log_data_checksum); (void) log_task->set_freezed(); log_task->set_freeze_ts(ObTimeUtility::current_time()); + log_task->set_push_log_type(push_log_type); } log_task->unlock(); } @@ -3409,17 +3413,19 @@ int LogSlidingWindow::submit_push_log_resp(const common::ObAddr &server) const int64_t curr_proposal_id = state_mgr_->get_proposal_id(); LSN committed_end_lsn; get_committed_end_lsn_(committed_end_lsn); - ret = submit_push_log_resp_(server, curr_proposal_id, committed_end_lsn); + const bool is_fetch_log = false; + ret = submit_push_log_resp_(server, curr_proposal_id, committed_end_lsn, is_fetch_log); } return ret; } int LogSlidingWindow::submit_push_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, - const LSN &log_end_lsn) + const LSN &log_end_lsn, + const bool &is_fetch_log) { int ret = OB_SUCCESS; - const bool is_need_batch = need_use_batch_rpc_(0); + const bool is_need_batch = need_use_batch_rpc_(0) || is_fetch_log; if (state_mgr_->is_allow_vote() && OB_FAIL(log_engine_->submit_push_log_resp(server, msg_proposal_id, log_end_lsn, is_need_batch))) { PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(server), K(log_end_lsn)); diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index af4b9689e..06f3e4f2a 100755 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -461,7 +461,10 @@ private: int64_t &log_id, int64_t &log_proposal_id); int leader_broadcast_committed_info_(const LSN &committed_end_lsn); - int submit_push_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, const LSN &lsn); + int submit_push_log_resp_(const common::ObAddr &server, + const int64_t &msg_proposal_id, + const LSN &lsn, + const bool &is_fetch_log); inline int try_push_log_to_paxos_follower_(const int64_t curr_proposal_id, const int64_t prev_log_pid, const LSN &prev_lsn, diff --git a/src/logservice/palf/log_task.cpp b/src/logservice/palf/log_task.cpp index b0dcd500b..d28841de1 100644 --- a/src/logservice/palf/log_task.cpp +++ b/src/logservice/palf/log_task.cpp @@ -142,6 +142,7 @@ LogTask::LogTask() freeze_ts_(OB_INVALID_TIMESTAMP), submit_ts_(OB_INVALID_TIMESTAMP), flushed_ts_(OB_INVALID_TIMESTAMP), + push_log_type_(PushLogType::PUSH_LOG), lock_() { reset(); @@ -436,5 +437,15 @@ void LogTask::set_flushed_ts(const int64_t ts) ATOMIC_STORE(&flushed_ts_, ts); } +void LogTask::set_push_log_type(const PushLogType &push_log_type) +{ + ATOMIC_STORE(&push_log_type_, push_log_type); +} + +bool LogTask::is_fetch_log_type() const +{ + return PushLogType::FETCH_LOG_RESP == ATOMIC_LOAD(&push_log_type_); +} + } // namespace palf } // namespace oceanbase diff --git a/src/logservice/palf/log_task.h b/src/logservice/palf/log_task.h index 0405a7e2f..8820d2f6f 100644 --- a/src/logservice/palf/log_task.h +++ b/src/logservice/palf/log_task.h @@ -18,6 +18,7 @@ #include "fixed_sliding_window.h" #include "log_define.h" // block_id_t #include "lsn.h" +#include "log_req.h" // PushLogType namespace oceanbase { @@ -141,10 +142,13 @@ public: void set_freeze_ts(const int64_t ts); void set_submit_ts(const int64_t ts); void set_flushed_ts(const int64_t ts); + void set_push_log_type(const PushLogType &push_log_type); int64_t get_gen_ts() const { return ATOMIC_LOAD(&(gen_ts_)); } int64_t get_freeze_ts() const { return ATOMIC_LOAD(&(freeze_ts_)); } int64_t get_submit_ts() const { return ATOMIC_LOAD(&(submit_ts_)); } int64_t get_flushed_ts() const { return ATOMIC_LOAD(&(flushed_ts_)); } + PushLogType get_push_log_type() const { return ATOMIC_LOAD(&push_log_type_);} + bool is_fetch_log_type() const; TO_STRING_KV(K_(header), K_(state_map), K_(ref_cnt), K_(gen_ts), K_(freeze_ts), K_(submit_ts), K_(flushed_ts), "gen_to_freeze cost time", freeze_ts_ - gen_ts_, @@ -162,6 +166,7 @@ private: mutable int64_t freeze_ts_; mutable int64_t submit_ts_; mutable int64_t flushed_ts_; + mutable PushLogType push_log_type_; mutable common::ObLatch lock_; }; } // end namespace palf