diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index efa1cd35be..ddc52a9016 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -924,6 +924,7 @@ PCODE_DEF(OB_LOG_GET_PALF_STAT, 0x151D) PCODE_DEF(OB_LOG_NOTIFY_FETCH_LOG, 0x151E) PCODE_DEF(OB_LOG_GET_STAT, 0x151F) PCODE_DEF(OB_LOG_FORCE_SET_LS_AS_SINGLE_REPLICA, 0x1520) +PCODE_DEF(OB_LOG_BATCH_FETCH_RESP, 0X1523) // 1531-1550 for obesi // PCODE_DEF(OB_ESI_IS_EXIST, 0x1531) diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index ec9624faf3..e5d0010707 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -622,6 +622,9 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req) case obrpc::OB_LOG_FETCH_REQ: { PROCESS(LogFetchReqP); } + case obrpc::OB_LOG_BATCH_FETCH_RESP: { + PROCESS(LogBatchFetchRespP); + } case obrpc::OB_LOG_PREPARE_REQ: { PROCESS(LogPrepareReqP); } diff --git a/mittest/logservice/test_ob_simple_log_arb.cpp b/mittest/logservice/test_ob_simple_log_arb.cpp index 66be6d469a..e1736651a9 100644 --- a/mittest/logservice/test_ob_simple_log_arb.cpp +++ b/mittest/logservice/test_ob_simple_log_arb.cpp @@ -589,9 +589,9 @@ TEST_F(TestObSimpleLogClusterArbService, test_multi_meta_block) ASSERT_EQ(OB_SUCCESS, get_palf_handle_lite(OB_SERVER_TENANT_ID, id, arb_server, arb_guard)); PalfHandleLite *arb_palf = dynamic_cast(arb_guard.palf_handle_impl_); LogEngine *log_engine = &arb_palf->log_engine_; - LSN meta_tail = log_engine->log_meta_storage_.log_tail_; LogStorage *meta_storage = &log_engine->log_meta_storage_; EXPECT_EQ(OB_SUCCESS, log_engine->append_log_meta_(log_engine->log_meta_)); + LSN meta_tail = log_engine->log_meta_storage_.log_tail_; ASSERT_NE(meta_tail, LSN(log_engine->log_meta_storage_.logical_block_size_)); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); diff --git a/src/logservice/palf/log_engine.cpp b/src/logservice/palf/log_engine.cpp index 14e27db73d..9bbdeb4266 100644 --- a/src/logservice/palf/log_engine.cpp +++ b/src/logservice/palf/log_engine.cpp @@ -1031,6 +1031,24 @@ int LogEngine::submit_fetch_log_req(const ObAddr &server, return ret; } +int LogEngine::submit_batch_fetch_log_resp(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else { + ret = log_net_service_.submit_batch_fetch_log_resp( + server, msg_proposal_id, prev_log_proposal_id, + prev_lsn, curr_lsn, write_buf); + } + return ret; +} + int LogEngine::submit_notify_rebuild_req(const ObAddr &server, const LSN &base_lsn, const LogInfo &base_prev_log_info) diff --git a/src/logservice/palf/log_engine.h b/src/logservice/palf/log_engine.h index a4c70a1feb..68334dd22d 100644 --- a/src/logservice/palf/log_engine.h +++ b/src/logservice/palf/log_engine.h @@ -347,7 +347,12 @@ public: const int64_t fetch_log_size, const int64_t fetch_log_count, const int64_t accepted_mode_pid); - + virtual int submit_batch_fetch_log_resp(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf); virtual int submit_register_parent_req(const common::ObAddr &server, const LogLearner &child_itself, const bool is_to_leader); diff --git a/src/logservice/palf/log_io_task.cpp b/src/logservice/palf/log_io_task.cpp index 20148b1c39..1d8eb4f74f 100644 --- a/src/logservice/palf/log_io_task.cpp +++ b/src/logservice/palf/log_io_task.cpp @@ -623,7 +623,7 @@ int BatchLogIOFlushLogTask::push_flush_cb_to_thread_pool_(int tg_id, IPalfEnvImp LogIOFlushLogTask *io_task = io_task_array_[i]; if (NULL == io_task) { PALF_LOG(WARN, "io_task is nullptr, may be its' epoch has changed", K(ret), KP(io_task), - KPC(this)); + KPC(this)); } else if (FALSE_IT(io_task->push_cb_into_cb_pool_ts_ = current_time)) { } else if (OB_FAIL(push_task_into_cb_thread_pool(tg_id, io_task))) { // avoid memory leak when push task into cb thread pool failed. diff --git a/src/logservice/palf/log_net_service.cpp b/src/logservice/palf/log_net_service.cpp index f6a249d771..5d045835a7 100644 --- a/src/logservice/palf/log_net_service.cpp +++ b/src/logservice/palf/log_net_service.cpp @@ -159,6 +159,26 @@ int LogNetService::submit_fetch_log_req( return ret; } +int LogNetService::submit_batch_fetch_log_resp( + const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogNetService not inited!!!", K(ret), K(palf_id_)); + } else { + LogBatchFetchResp batch_fetch_log_resp(msg_proposal_id, prev_log_proposal_id, + prev_lsn, curr_lsn, write_buf); + ret = post_request_to_server_(server, batch_fetch_log_resp); + } + return ret; +} + int LogNetService::submit_notify_rebuild_req( const ObAddr &server, const LSN &base_lsn, diff --git a/src/logservice/palf/log_net_service.h b/src/logservice/palf/log_net_service.h index b9da819371..27dfa783d6 100644 --- a/src/logservice/palf/log_net_service.h +++ b/src/logservice/palf/log_net_service.h @@ -128,6 +128,14 @@ public: const int64_t fetch_log_count, const int64_t accepted_mode_pid); + int submit_batch_fetch_log_resp( + const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf); + int submit_notify_rebuild_req( const ObAddr &server, const LSN &base_lsn, @@ -261,8 +269,7 @@ int LogNetService::post_request_to_server_( { int ret = common::OB_SUCCESS; if (OB_FAIL(log_rpc_->post_request(server, palf_id_, req))) { - // PALF_LOG(WARN, "LogRpc post_request failed", K(ret), K(palf_id_), - // K(req), K(server)); + PALF_LOG(WARN, "LogRpc post_request failed", K(ret), K(palf_id_), K(req), K(server)); } else { PALF_LOG(TRACE, "post_request_to_server_ success", K(ret), K(server), K(palf_id_), K(req)); } diff --git a/src/logservice/palf/log_req.cpp b/src/logservice/palf/log_req.cpp index d669129c64..e6bff4be54 100644 --- a/src/logservice/palf/log_req.cpp +++ b/src/logservice/palf/log_req.cpp @@ -773,5 +773,97 @@ OB_SERIALIZE_MEMBER(LogGetStatResp, max_scn_, end_lsn_); // ================= LogGetStatResp end ================ +LogBatchFetchResp::LogBatchFetchResp() + : msg_proposal_id_(INVALID_PROPOSAL_ID), + prev_log_proposal_id_(INVALID_PROPOSAL_ID), + prev_lsn_(), + curr_lsn_(), + write_buf_() +{ +} + +LogBatchFetchResp::LogBatchFetchResp(const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf) + : msg_proposal_id_(msg_proposal_id), + prev_log_proposal_id_(prev_log_proposal_id), + prev_lsn_(prev_lsn), + curr_lsn_(curr_lsn), + write_buf_(write_buf) +{ +} + +LogBatchFetchResp::~LogBatchFetchResp() +{ + reset(); +} + +bool LogBatchFetchResp::is_valid() const +{ + return INVALID_PROPOSAL_ID != msg_proposal_id_ + && true == curr_lsn_.is_valid() + && true == write_buf_.is_valid(); +} + +void LogBatchFetchResp::reset() +{ + msg_proposal_id_ = INVALID_PROPOSAL_ID; + prev_log_proposal_id_ = INVALID_PROPOSAL_ID; + prev_lsn_.reset(); + curr_lsn_.reset(); + write_buf_.reset(); +} + +OB_DEF_SERIALIZE(LogBatchFetchResp) +{ + int ret = OB_SUCCESS; + int64_t new_pos = pos; + if (NULL == buf || pos < 0 || pos > buf_len) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, new_pos, msg_proposal_id_)) + || OB_FAIL(serialization::encode_i64(buf, buf_len, new_pos, prev_log_proposal_id_)) + || OB_FAIL(prev_lsn_.serialize(buf, buf_len, new_pos)) + || OB_FAIL(curr_lsn_.serialize(buf, buf_len, new_pos)) + || OB_FAIL(write_buf_.serialize(buf, buf_len, new_pos))) { + PALF_LOG(ERROR, "LogBatchFetchResp serialize failed", K(ret), K(new_pos)); + } else { + pos = new_pos; + } + return ret; +} + +OB_DEF_DESERIALIZE(LogBatchFetchResp) +{ + int ret = OB_SUCCESS; + int64_t new_pos = pos; + if (NULL == buf || pos < 0 || pos > data_len) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(serialization::decode_i64(buf, data_len, new_pos, + reinterpret_cast(&msg_proposal_id_))) + || OB_FAIL(serialization::decode_i64(buf, data_len, new_pos, + reinterpret_cast(&prev_log_proposal_id_))) + || OB_FAIL(prev_lsn_.deserialize(buf, data_len, new_pos)) + || OB_FAIL(curr_lsn_.deserialize(buf, data_len, new_pos)) + || OB_FAIL(write_buf_.deserialize(buf, data_len, new_pos))) { + PALF_LOG(ERROR, "LogBatchFetchResp deserialize failed", K(ret), K(new_pos)); + } else { + pos = new_pos; + } + return ret; +} + +OB_DEF_SERIALIZE_SIZE(LogBatchFetchResp) +{ + int64_t size = 0; + size += serialization::encoded_length_i64(msg_proposal_id_); + size += serialization::encoded_length_i64(prev_log_proposal_id_); + size += prev_lsn_.get_serialize_size(); + size += curr_lsn_.get_serialize_size(); + size += write_buf_.get_serialize_size(); + return size; +} + } // end namespace palf } // end namespace oceanbase diff --git a/src/logservice/palf/log_req.h b/src/logservice/palf/log_req.h index 22d30018d3..fa06ebbb61 100644 --- a/src/logservice/palf/log_req.h +++ b/src/logservice/palf/log_req.h @@ -105,6 +105,26 @@ public: int64_t accepted_mode_pid_; }; +struct LogBatchFetchResp { + OB_UNIS_VERSION(1); +public: + LogBatchFetchResp(); + LogBatchFetchResp(const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf); + ~LogBatchFetchResp(); + bool is_valid() const; + void reset(); + TO_STRING_KV(K_(msg_proposal_id), K_(prev_log_proposal_id), K_(prev_lsn), K_(curr_lsn), K_(write_buf)); + int64_t msg_proposal_id_; + int64_t prev_log_proposal_id_; + LSN prev_lsn_; + LSN curr_lsn_; + LogWriteBuf write_buf_; +}; + struct NotifyRebuildReq { OB_UNIS_VERSION(1); public: diff --git a/src/logservice/palf/log_request_handler.cpp b/src/logservice/palf/log_request_handler.cpp index 26176ef040..e84b783965 100644 --- a/src/logservice/palf/log_request_handler.cpp +++ b/src/logservice/palf/log_request_handler.cpp @@ -190,6 +190,32 @@ int LogRequestHandler::handle_request( return ret; } +template <> +int LogRequestHandler::handle_request( + const int64_t palf_id, + const ObAddr &server, + const LogBatchFetchResp &req) +{ + int ret = common::OB_SUCCESS; + if (false == is_valid_palf_id(palf_id) || false == req.is_valid()) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(req), KPC(palf_env_impl_)); + } else { + const char *buf = req.write_buf_.write_buf_[0].buf_; + const int64_t buf_len = req.write_buf_.write_buf_[0].buf_len_; + IPalfHandleImplGuard guard; + if (OB_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) { + PALF_LOG(WARN, "PalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->receive_batch_log(server, req.msg_proposal_id_, + req.prev_log_proposal_id_, req.prev_lsn_, req.curr_lsn_, buf, buf_len))) { + PALF_LOG(WARN, "PalfHandleImpl receive_batch_log failed", K(ret), K(server), K(req), KPC(palf_env_impl_)); + } else { + PALF_LOG(TRACE, "PalfHandleImpl receive_batch_log success", K(ret), K(server), K(req), KPC(palf_env_impl_)); + } + } + return ret; +} + template <> int LogRequestHandler::handle_request( const int64_t palf_id, diff --git a/src/logservice/palf/log_rpc_processor.h b/src/logservice/palf/log_rpc_processor.h index 07bcac1c98..0f14eb65c6 100644 --- a/src/logservice/palf/log_rpc_processor.h +++ b/src/logservice/palf/log_rpc_processor.h @@ -47,6 +47,11 @@ DEFINE_RPC_PROCESSOR(LogFetchReqP, LogFetchReq, obrpc::OB_LOG_FETCH_REQ); +DEFINE_RPC_PROCESSOR(LogBatchFetchRespP, + obrpc::LogRpcProxyV2, + LogBatchFetchResp, + obrpc::OB_LOG_BATCH_FETCH_RESP); + DEFINE_RPC_PROCESSOR(LogPrepareReqP, obrpc::LogRpcProxyV2, LogPrepareReq, diff --git a/src/logservice/palf/log_rpc_proxy.cpp b/src/logservice/palf/log_rpc_proxy.cpp index 37cf7b8d1f..91758ee83a 100644 --- a/src/logservice/palf/log_rpc_proxy.cpp +++ b/src/logservice/palf/log_rpc_proxy.cpp @@ -74,6 +74,8 @@ DEFINE_RPC_PROXY_POST_FUNCTION(LogPushResp, OB_LOG_PUSH_RESP); DEFINE_RPC_PROXY_POST_FUNCTION(LogFetchReq, OB_LOG_FETCH_REQ); +DEFINE_RPC_PROXY_POST_FUNCTION(LogBatchFetchResp, + OB_LOG_BATCH_FETCH_RESP); DEFINE_RPC_PROXY_POST_FUNCTION(LogPrepareReq, OB_LOG_PREPARE_REQ); DEFINE_RPC_PROXY_POST_FUNCTION(LogPrepareResp, diff --git a/src/logservice/palf/log_rpc_proxy.h b/src/logservice/palf/log_rpc_proxy.h index f61da0dda3..c61feb8619 100644 --- a/src/logservice/palf/log_rpc_proxy.h +++ b/src/logservice/palf/log_rpc_proxy.h @@ -40,6 +40,9 @@ public: DECLARE_RPC_PROXY_POST_FUNCTION(PR3, LogFetchReq, OB_LOG_FETCH_REQ); + DECLARE_RPC_PROXY_POST_FUNCTION(PR3, + LogBatchFetchResp, + OB_LOG_BATCH_FETCH_RESP); DECLARE_RPC_PROXY_POST_FUNCTION(PR3, LogPrepareReq, OB_LOG_PREPARE_REQ); diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 1bf10c0574..0120970785 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -1804,11 +1804,11 @@ void LogSlidingWindow::try_update_committed_lsn_for_fetch_( LSN last_committed_end_lsn; last_fetch_end_lsn.val_ = ATOMIC_LOAD(&last_fetch_end_lsn_.val_); last_committed_end_lsn.val_ = ATOMIC_LOAD(&last_fetch_committed_end_lsn_.val_); + int64_t last_fetch_max_log_id = OB_INVALID_LOG_ID; + get_last_fetch_info_(last_fetch_end_lsn, last_committed_end_lsn, last_fetch_max_log_id); if (!last_fetch_end_lsn.is_valid() || last_committed_end_lsn.is_valid()) { // no need update } else { - int64_t last_fetch_max_log_id = OB_INVALID_LOG_ID; - get_last_fetch_info_(last_fetch_end_lsn, last_committed_end_lsn, last_fetch_max_log_id); if (last_fetch_end_lsn.is_valid() && !last_committed_end_lsn.is_valid() && (log_end_lsn >= last_fetch_end_lsn || log_id == last_fetch_max_log_id)) { @@ -1825,7 +1825,7 @@ void LogSlidingWindow::try_update_committed_lsn_for_fetch_( K(log_id), K_(last_fetch_max_log_id), K_(last_fetch_end_lsn), K_(last_fetch_committed_end_lsn)); } else if (last_fetch_end_lsn == last_fetch_end_lsn_ - || log_id == last_fetch_max_log_id_) { + || last_fetch_max_log_id == last_fetch_max_log_id_) { LSN committed_end_lsn; get_committed_end_lsn_(committed_end_lsn); // The order is fatal: diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index d9aa92a4ea..638b039e68 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -2617,14 +2617,14 @@ int PalfHandleImpl::handle_committed_info(const common::ObAddr &server, return ret; } -int PalfHandleImpl::receive_log(const common::ObAddr &server, - const PushLogType push_log_type, - const int64_t &msg_proposal_id, - const LSN &prev_lsn, - const int64_t &prev_log_proposal_id, - const LSN &lsn, - const char *buf, - const int64_t buf_len) +int PalfHandleImpl::receive_log_(const common::ObAddr &server, + const PushLogType push_log_type, + const int64_t &msg_proposal_id, + const LSN &prev_lsn, + const int64_t &prev_log_proposal_id, + const LSN &lsn, + const char *buf, + const int64_t buf_len) { int ret = OB_SUCCESS; TruncateLogInfo truncate_log_info; @@ -2712,6 +2712,68 @@ int PalfHandleImpl::receive_log(const common::ObAddr &server, return ret; } +int PalfHandleImpl::receive_log(const common::ObAddr &server, + const PushLogType push_log_type, + const int64_t &msg_proposal_id, + const LSN &prev_lsn, + const int64_t &prev_log_proposal_id, + const LSN &lsn, + const char *buf, + const int64_t buf_len) +{ + return receive_log_(server, push_log_type, msg_proposal_id, prev_lsn, prev_log_proposal_id, lsn, buf, buf_len); +} + +int PalfHandleImpl::receive_batch_log(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len) +{ + int ret = OB_SUCCESS; + int64_t start_ts = ObTimeUtility::current_time(); + MemPalfGroupBufferIterator iterator; + MemoryStorage storage; + auto get_file_end_lsn = [curr_lsn, buf_len] () { return curr_lsn + buf_len; }; + if (OB_FAIL(iterator.init(curr_lsn, get_file_end_lsn, &storage))) { + PALF_LOG(ERROR, "init iterator failed", K(ret), KPC(this)); + } else if (OB_FAIL(storage.init(curr_lsn))) { + PALF_LOG(ERROR, "init storage failed", K(ret), KPC(this)); + } else if (OB_FAIL(storage.append(buf, buf_len))) { + PALF_LOG(ERROR, "storage append failed", K(ret), KPC(this)); + } else { + LSN prev_lsn_each_round = prev_lsn; + int64_t prev_log_proposal_id_each_round = prev_log_proposal_id; + LSN curr_lsn_each_round = curr_lsn; + int64_t curr_log_proposal_id = 0; + const char *buf_each_round = NULL; + int64_t buf_len_each_round = 0; + int64_t count = 0; + while (OB_SUCC(iterator.next())) { + if (OB_FAIL(iterator.get_entry(buf_each_round, buf_len_each_round, curr_lsn_each_round, curr_log_proposal_id))) { + PALF_LOG(ERROR, "get_entry failed", K(ret), KPC(this), K(iterator), KP(buf_each_round)); + } else if (OB_FAIL(receive_log_(server, FETCH_LOG_RESP, msg_proposal_id, prev_lsn_each_round, + prev_log_proposal_id_each_round, curr_lsn_each_round, buf_each_round, buf_len_each_round))) { + PALF_LOG(WARN, "receive_log failed", K(ret), KPC(this), K(iterator), K(server), K(FETCH_LOG_RESP), K(msg_proposal_id), + K(prev_lsn_each_round), K(prev_log_proposal_id_each_round), K(curr_lsn_each_round), KP(buf_each_round), + K(buf_len_each_round)); + } + prev_lsn_each_round = curr_lsn_each_round; + prev_log_proposal_id_each_round = curr_log_proposal_id; + count++; + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + PALF_LOG(TRACE, "receive_batch_log finished", K(ret), KPC(this), K(server), K(count), K(prev_lsn), K(curr_lsn), + K(buf_len), K(cost_ts), K_(sw), K(iterator)); + } + return ret; +} + int PalfHandleImpl::submit_group_log(const PalfAppendOptions &opts, const LSN &lsn, const char *buf, @@ -3066,9 +3128,12 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, FetchLogStat &fetch_stat) { int ret = OB_SUCCESS; - int64_t send_cost = 0, get_entry_cost = 0; + LSN prev_lsn_each_round = prev_lsn; + LSN prev_end_lsn_each_round = prev_lsn; + int64_t prev_log_proposal_id_each_round = INVALID_PROPOSAL_ID; PalfGroupBufferIterator iterator; const LSN fetch_end_lsn = fetch_start_lsn + fetch_log_size; + int64_t fetched_count = 0; const bool need_check_prev_log = (prev_lsn.is_valid() && PALF_INITIAL_LSN_VAL < fetch_start_lsn.val_); LSN max_flushed_end_lsn; LSN committed_end_lsn; @@ -3098,9 +3163,7 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, // max_flushed_end_lsn may be truncated by concurrent truncate, so itreator need handle this // case when it try to read some log which is being truncated. - auto get_file_end_lsn = [&]() { - return max_flushed_end_lsn; - }; + auto get_file_end_lsn = [&]() { return max_flushed_end_lsn; }; LogInfo prev_log_info; const bool no_need_fetch_log = (prev_lsn >= max_flushed_end_lsn) || (AccessMode::FLASHBACK == access_mode); @@ -3108,6 +3171,9 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, int64_t replica_num = 0; (void) config_mgr_.get_curr_member_list(member_list, replica_num); const bool is_dest_in_memberlist = (member_list.contains(server)); + // Rpc delay increases enormously when it's size exceeds 2M. + const int64_t MAX_BATCH_LOG_SIZE_EACH_ROUND = 2 * 1024 * 1024 - 1024; + char *batch_log_buf = NULL; if (no_need_fetch_log) { PALF_LOG(INFO, "no need fetch_log_from_storage", K(ret), KPC(this), K(server), K(fetch_start_lsn), K(prev_lsn), K(max_flushed_end_lsn), K(access_mode)); @@ -3125,34 +3191,61 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, } } else if (check_need_hook_fetch_log_(fetch_type, fetch_start_lsn)) { ret = OB_ERR_OUT_OF_LOWER_BOUND; + } else if (FALSE_IT(prev_log_proposal_id_each_round = prev_log_info.log_proposal_id_)) { } else if (OB_FAIL(iterator.init(fetch_start_lsn, get_file_end_lsn, log_engine_.get_log_storage()))) { - PALF_LOG(WARN, "PalfGroupBufferIterator init failed", K(ret), K_(palf_id)); - } else { - bool need_print_error = false; + PALF_LOG(ERROR, "init iterator failed", K(ret), K_(palf_id)); + } else if (FALSE_IT(iterator.set_need_print_error(false))) { // NB: Fetch log will be concurrent with truncate, the content on disk will not integrity, need igore // read log error. - iterator.set_need_print_error(need_print_error); - LSN each_round_prev_lsn = prev_lsn; + } else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0 && + OB_NOT_NULL(batch_log_buf = (char *)mtl_malloc(MAX_BATCH_LOG_SIZE_EACH_ROUND, "BatchLogBuf"))) { + // When group log size > 10K, we just send it without aggregating. + const int64_t MAX_NEED_BATCH_LOG_SIZE = 10 * 1024; + // When batched log count reaches max_batch_log_count_each_round, we end the aggregation. + const int64_t max_batch_log_count_each_round = PALF_SLIDING_WINDOW_SIZE; + int64_t remained_count = fetch_log_count; + bool is_reach_end = false; + bool skip_next = false; + BatchFetchParams batch_fetch_params; + batch_fetch_params.batch_log_buf_ = batch_log_buf; + batch_fetch_params.can_batch_size_ = MAX_BATCH_LOG_SIZE_EACH_ROUND; + batch_fetch_params.last_log_lsn_prev_round_ = prev_lsn_each_round; + batch_fetch_params.last_log_end_lsn_prev_round_ = prev_end_lsn_each_round; + batch_fetch_params.last_log_proposal_id_prev_round_ = prev_log_proposal_id_each_round; + while (OB_SUCC(ret) && remained_count > 0 && !is_reach_end && + batch_fetch_params.last_log_end_lsn_prev_round_ < fetch_end_lsn) { + batch_fetch_params.can_batch_count_ = MIN(remained_count, max_batch_log_count_each_round); + batch_fetch_params.has_consumed_count_ = 0; + if (OB_FAIL(batch_fetch_log_each_round_(server, msg_proposal_id, iterator, is_limitted_by_end_lsn, + is_dest_in_memberlist, replayable_point, fetch_end_lsn, committed_end_lsn, MAX_NEED_BATCH_LOG_SIZE, + batch_fetch_params, skip_next, is_reach_end, fetch_stat)) && OB_ITER_END != ret) { + PALF_LOG(WARN, "batch_fetch_log_each_round_ failed", K(ret), KPC(this), K(iterator)); + } else { + remained_count -= batch_fetch_params.has_consumed_count_; + } + } + prev_lsn_each_round = batch_fetch_params.last_log_lsn_prev_round_; + prev_end_lsn_each_round = batch_fetch_params.last_log_end_lsn_prev_round_; + prev_log_proposal_id_each_round = batch_fetch_params.last_log_proposal_id_prev_round_; + fetched_count = fetch_log_count - remained_count; + } else { + if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0) { + PALF_LOG(WARN, "allocate batch_log_buf memory failed", KPC(this), K(server), K(prev_lsn)); + } LogGroupEntry curr_group_entry; LSN curr_lsn; + LSN curr_log_end_lsn; bool is_reach_size_limit = false; // whether the total fetched size exceeds fetch_log_size bool is_reach_count_limit = false; bool is_reach_end = false; - int64_t fetched_count = 0; int64_t total_size = 0; - int64_t read_cost = 0; - LSN curr_log_end_lsn = curr_lsn + curr_group_entry.get_group_entry_size(); - LSN prev_log_end_lsn; - int64_t prev_log_proposal_id = prev_log_info.log_proposal_id_; - int64_t read_begin_time = ObTimeUtility::current_time(), send_begin_time = read_begin_time, tmp_ts = 0; + int64_t send_cost = 0; + int64_t send_begin_time = ObTimeUtility::current_time(); while (OB_SUCC(ret) && !is_reach_size_limit && !is_reach_count_limit && !is_reach_end && OB_SUCC(iterator.next())) { - tmp_ts = ObTimeUtility::current_time(); - read_cost += tmp_ts - read_begin_time; if (OB_FAIL(iterator.get_entry(curr_group_entry, curr_lsn))) { PALF_LOG(ERROR, "PalfGroupBufferIterator get_entry failed", K(ret), K_(palf_id), K(curr_group_entry), K(curr_lsn), K(iterator)); - } else if (FALSE_IT(get_entry_cost += ObTimeUtility::current_time() - tmp_ts)) { } else if (FALSE_IT(curr_log_end_lsn = curr_lsn + curr_group_entry.get_group_entry_size())) { } else if (is_limitted_by_end_lsn && curr_log_end_lsn > committed_end_lsn) { // Only leader replica can send uncommitted logs to others, @@ -3168,10 +3261,10 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, PALF_LOG(INFO, "non paxos member could not fetch logs which scn is bigger than replayable_point, end fetch", K_(palf_id), K(server), K(msg_proposal_id), K(curr_lsn), K(replayable_point)); } else if (FALSE_IT(send_begin_time = ObTimeUtility::current_time())) { - } else if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id, \ - each_round_prev_lsn, curr_lsn, curr_group_entry))) { + } else if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id_each_round, \ + prev_lsn_each_round, curr_lsn, curr_group_entry))) { PALF_LOG(WARN, "submit_fetch_log_resp_ failed", K(ret), K_(palf_id), K(server), - K(msg_proposal_id), K(each_round_prev_lsn), K(fetch_start_lsn)); + K(msg_proposal_id), K(prev_lsn_each_round), K(fetch_start_lsn)); } else { send_cost += ObTimeUtility::current_time() - send_begin_time; fetched_count++; @@ -3184,31 +3277,34 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, is_reach_size_limit = true; } PALF_LOG(TRACE, "fetch one log success", K(ret), K_(palf_id), K_(self), K(server), K(prev_lsn), - K(fetch_start_lsn), K(each_round_prev_lsn), K(curr_lsn), K(curr_group_entry), - K(prev_log_proposal_id), K(fetch_end_lsn), K(curr_log_end_lsn), K(is_reach_size_limit), + K(fetch_start_lsn), K(prev_lsn_each_round), K(curr_lsn), K(curr_group_entry), + K(prev_log_proposal_id_each_round), K(fetch_end_lsn), K(curr_log_end_lsn), K(is_reach_size_limit), K(fetch_log_size), K(fetched_count), K(is_reach_count_limit)); - each_round_prev_lsn = curr_lsn; - prev_log_end_lsn = curr_log_end_lsn; - prev_log_proposal_id = curr_group_entry.get_header().get_log_proposal_id(); + prev_lsn_each_round = curr_lsn; + prev_end_lsn_each_round = curr_log_end_lsn; + prev_log_proposal_id_each_round = curr_group_entry.get_header().get_log_proposal_id(); } - read_begin_time = ObTimeUtility::current_time(); - } - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; - } - // try send committed_info to server - if (OB_SUCC(ret)) { - RLockGuard guard(lock_); - (void) try_send_committed_info_(server, each_round_prev_lsn, prev_log_end_lsn, prev_log_proposal_id); } // update fetch statistic info fetch_stat.total_size_ = total_size; fetch_stat.group_log_cnt_ = fetched_count; - fetch_stat.read_cost_ = read_cost; - fetch_stat.get_cost_ = get_entry_cost; fetch_stat.send_cost_ = send_cost; } - + if (batch_log_buf != NULL) { + mtl_free(batch_log_buf); + batch_log_buf = NULL; + } + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + PALF_LOG(INFO, "fetch_log_from_storage_ finished", K(ret), KPC(this), K(prev_lsn), K(iterator), + K(fetch_log_count), "has_fetched_log_count", fetched_count); + // try send committed_info to server + if (OB_SUCC(ret)) { + RLockGuard guard(lock_); + (void) try_send_committed_info_(server, prev_lsn_each_round, prev_end_lsn_each_round, + prev_log_proposal_id_each_round); + } if (OB_FAIL(ret) && OB_ERR_OUT_OF_LOWER_BOUND == ret) { // ret is OB_ERR_OUT_OF_LOWER_BOUND, need notify dst server to trigger rebuild LSN base_lsn = log_engine_.get_log_meta().get_log_snapshot_meta().base_lsn_; @@ -3230,6 +3326,152 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server, return ret; } +int PalfHandleImpl::batch_fetch_log_each_round_(const common::ObAddr &server, + const int64_t msg_proposal_id, + PalfGroupBufferIterator &iterator, + const bool is_limitted_by_end_lsn, + const bool is_dest_in_memberlist, + const share::SCN& replayable_point, + const LSN &fetch_end_lsn, + const LSN &committed_end_lsn, + const int64_t max_need_batch_log_size, + BatchFetchParams &batch_fetch_params, + bool &skip_next, + bool &is_reach_end, + FetchLogStat &fetch_stat) +{ + int ret = OB_SUCCESS; + int64_t remained_count = batch_fetch_params.can_batch_count_; + int64_t remained_size = batch_fetch_params.can_batch_size_; + LSN prev_lsn = batch_fetch_params.last_log_lsn_prev_round_; + LSN prev_end_lsn = batch_fetch_params.last_log_end_lsn_prev_round_; + int64_t prev_log_proposal_id = batch_fetch_params.last_log_proposal_id_prev_round_; + int64_t has_consumed_count = batch_fetch_params.has_consumed_count_; + LSN curr_lsn; + LSN curr_end_lsn; + int64_t curr_log_proposal_id; + LSN first_log_lsn; + const char *curr_log_buf = NULL; + int64_t curr_log_buf_len = 0; + int64_t log_end_pos = 0; + bool has_reach_threshold = false; + share::SCN curr_scn; + bool is_raw_write; + + int64_t start_ts = ObTimeUtility::current_time(); + while (OB_SUCC(ret) && !is_reach_end && remained_count > 0 && remained_size > 0) { + if (!skip_next && OB_FAIL(iterator.next())) { + PALF_LOG(WARN, "iterator next failed", K(ret), KPC(this), K(iterator), K(replayable_point), K(prev_lsn)); + } else if (FALSE_IT(skip_next = false)) { + } else if (OB_FAIL(iterator.get_entry(curr_log_buf, curr_log_buf_len, curr_scn, curr_lsn, curr_log_proposal_id, + is_raw_write))){ + PALF_LOG(WARN, "iterator get_entry failed", K(ret), KPC(this), K(iterator), K(log_end_pos)); + } else if (false == is_dest_in_memberlist && is_raw_write && replayable_point.is_valid() && + curr_scn > replayable_point) { + is_reach_end = true; + PALF_LOG(INFO, "non paxos member could not fetch logs which scn is bigger than replayable_point, end fetch", + K_(palf_id), K(server), K(msg_proposal_id), K(curr_lsn), K(replayable_point)); + } else if (FALSE_IT(curr_end_lsn = curr_lsn + curr_log_buf_len)) { + } else if (is_limitted_by_end_lsn && curr_end_lsn > committed_end_lsn){ + // Only leader replica can send uncommitted logs to others, + // the other replicas just send committed logs to avoid unexpected rewriting. + is_reach_end = true; + PALF_LOG(INFO, "reach committed_end_lsn(not leader active replica), end fetch", K(ret), K_(palf_id), K(server), + K(msg_proposal_id), K(curr_lsn), K(curr_end_lsn), K(committed_end_lsn)); + } else if (curr_log_buf_len >= max_need_batch_log_size ) { + has_reach_threshold = true; + PALF_LOG(TRACE, "group log is bigger than batch_log_size_threshold", K(ret), K_(palf_id), K(server), + K(msg_proposal_id), K(curr_lsn), K(curr_end_lsn), K(max_need_batch_log_size)); + break; + } else if (remained_size >= curr_log_buf_len ) { + if (!first_log_lsn.is_valid()) { + first_log_lsn = curr_lsn; + } + MEMCPY(batch_fetch_params.batch_log_buf_ + log_end_pos, curr_log_buf, curr_log_buf_len); + log_end_pos += curr_log_buf_len; + prev_lsn = curr_lsn; + prev_end_lsn = curr_end_lsn; + prev_log_proposal_id = curr_log_proposal_id; + has_consumed_count++; + remained_count--; + remained_size -= curr_log_buf_len; + if (curr_end_lsn >= fetch_end_lsn) { + PALF_LOG(INFO, "fetched log has reached fetch_end_lsn", K(ret), KPC(this), K(curr_lsn), K(curr_end_lsn), + K(fetch_end_lsn)); + is_reach_end = true; + } + } else { + // batch the group log next round + skip_next = true; + PALF_LOG(TRACE, "batched log has exceeded can_batch_size", K(ret), KPC(this), K(batch_fetch_params.can_batch_size_), + K(remained_size), K(prev_lsn), K(curr_lsn)); + break; + } + } + int64_t batch_end_ts = common::ObTimeUtility::current_time(); + if (OB_ITER_END == ret) { + is_reach_end = true; + ret = OB_SUCCESS; + } + if (OB_FAIL(ret)) { + PALF_LOG(WARN, "batch log failed", K(ret), KPC(this), K(iterator), K(log_end_pos), + K(curr_lsn), K(has_consumed_count), K(first_log_lsn)); + } else { + if (1 < has_consumed_count) { + ret = submit_batch_fetch_log_resp_(server, msg_proposal_id, batch_fetch_params.last_log_proposal_id_prev_round_, + batch_fetch_params.last_log_lsn_prev_round_, first_log_lsn, batch_fetch_params.batch_log_buf_, log_end_pos); + } else if (1 == has_consumed_count) { + ret = submit_fetch_log_resp_(server, msg_proposal_id, batch_fetch_params.last_log_proposal_id_prev_round_, + batch_fetch_params.last_log_lsn_prev_round_, first_log_lsn, batch_fetch_params.batch_log_buf_, log_end_pos); + } else { + PALF_LOG(TRACE, "no log is aggregated", K(ret), KPC(this), K(log_end_pos), K(curr_lsn), + K(has_consumed_count), K(first_log_lsn)); + } + } + + if (OB_SUCC(ret) && has_reach_threshold) { + if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id, prev_lsn, + curr_lsn, curr_log_buf, curr_log_buf_len))) { + PALF_LOG(WARN, "submit_fetch_log_resp_ failed", K(ret), KPC(this), K(prev_lsn), + K(curr_lsn), K(curr_log_buf_len)); + } else { + log_end_pos += curr_log_buf_len; + prev_lsn = curr_lsn; + prev_end_lsn = curr_end_lsn; + prev_log_proposal_id = curr_log_proposal_id; + has_consumed_count++; + remained_count--; + remained_size -= curr_log_buf_len; + PALF_LOG(TRACE, "submit_fetch_log_resp_ success", K(ret), KPC(this), K(prev_lsn), + K(curr_lsn), K(curr_log_buf_len)); + } + } + int64_t send_end_ts = common::ObTimeUtility::current_time(); + if (OB_SUCC(ret) && 0 != has_consumed_count) { + batch_fetch_params.last_log_lsn_prev_round_ = prev_lsn; + batch_fetch_params.last_log_end_lsn_prev_round_ = prev_end_lsn; + batch_fetch_params.last_log_proposal_id_prev_round_ = prev_log_proposal_id; + batch_fetch_params.has_consumed_count_ = has_consumed_count; + int64_t total_size = log_end_pos; + int64_t batch_cost = batch_end_ts - start_ts; + int64_t send_cost = send_end_ts - batch_end_ts; + int64_t total_cost = send_end_ts - start_ts; + fetch_stat.total_size_ += total_size; + fetch_stat.group_log_cnt_ += has_consumed_count; + fetch_stat.send_cost_ += send_cost; + int64_t avg_log_size = total_size / has_consumed_count; + int64_t avg_log_cost = total_cost / has_consumed_count; + int64_t avg_batch_cost = batch_cost / has_consumed_count; + int64_t avg_send_cost = send_cost/ has_consumed_count; + PALF_LOG(TRACE, "batch_fetch_log_one_round_ success", K(ret), KPC(this), K(is_reach_end), + K(batch_fetch_params.can_batch_size_), K(remained_size), K(batch_fetch_params.can_batch_count_), + K(has_consumed_count), K(total_size), K(total_cost), K(batch_cost), K(send_cost), K(avg_log_size), + K(avg_log_cost), K(avg_send_cost), K(avg_batch_cost), K(first_log_lsn), + K(batch_fetch_params.last_log_lsn_prev_round_), K(iterator)); + } + return ret; +} + int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, const int64_t &prev_log_proposal_id, @@ -3257,6 +3499,54 @@ int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server, return ret; } +int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server, + const int64_t &msg_proposal_id, + const int64_t &prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len) +{ + int ret = OB_SUCCESS; + LogWriteBuf write_buf; + // NB: 'curr_group_entry' generates by PalfGroupBufferIterator, the memory is safe before next(); + if (OB_FAIL(write_buf.push_back(buf, buf_len))) { + PALF_LOG(WARN, "push_back buf into LogWriteBuf failed", K(ret)); + } else if (OB_FAIL(log_engine_.submit_push_log_req(server, FETCH_LOG_RESP, msg_proposal_id, prev_log_proposal_id, + prev_lsn, curr_lsn, write_buf))) { + PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K(server), K(msg_proposal_id), K(prev_log_proposal_id), + K(prev_lsn), K(curr_lsn), K(write_buf)); + } else { + PALF_LOG(TRACE, "submit_fetch_log_resp_ success", K(ret), K(server), K(msg_proposal_id), K(prev_log_proposal_id), + K(prev_lsn), K(curr_lsn), K(write_buf)); + } + return ret; +} + +int PalfHandleImpl::submit_batch_fetch_log_resp_(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len) +{ + int ret = OB_SUCCESS; + LogWriteBuf write_buf; + int64_t pos = 0; + if (OB_FAIL(write_buf.push_back(buf, buf_len))) { + PALF_LOG(WARN, "push_back buf into LogWriteBuf failed", K(ret)); + } else if (OB_FAIL(log_engine_.submit_batch_fetch_log_resp(server, msg_proposal_id, prev_log_proposal_id, + prev_lsn, curr_lsn, write_buf))) { + PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K(server), K(msg_proposal_id), K(prev_log_proposal_id), + K(prev_lsn), K(curr_lsn), K(write_buf)); + } else { + PALF_LOG(TRACE, "submit_batch_fetch_log_resp_ success", K(ret), K(server), KPC(this), K(msg_proposal_id), + K(prev_log_proposal_id), K(buf_len), K(prev_lsn), K(curr_lsn), K(write_buf)); + } + return ret; +} + // NB: 1. there is no need to distinguish between reconfirm or follower active; // 2. whether msg_proposal_id is equal to currentry proposal_id or not, response logs to the requester. int PalfHandleImpl::get_log(const common::ObAddr &server, diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index 3f5e1886b3..f4e6e126be 100755 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -150,6 +150,33 @@ struct FetchLogStat { K_(send_cost)); }; +struct BatchFetchParams { + BatchFetchParams() { reset(); } + ~BatchFetchParams() { reset(); } + void reset() { + can_batch_count_ = 0; + can_batch_size_ = 0; + batch_log_buf_ = NULL; + last_log_lsn_prev_round_.reset(); + last_log_end_lsn_prev_round_.reset(); + last_log_proposal_id_prev_round_ = 0; + } + TO_STRING_KV(K_(can_batch_count), + K_(can_batch_size), + KP_(batch_log_buf), + K_(last_log_lsn_prev_round), + K_(last_log_end_lsn_prev_round), + K_(last_log_proposal_id_prev_round), + K_(has_consumed_count)); + int64_t can_batch_count_; + int64_t can_batch_size_; + char *batch_log_buf_; + LSN last_log_lsn_prev_round_; + LSN last_log_end_lsn_prev_round_; + int64_t last_log_proposal_id_prev_round_; + int64_t has_consumed_count_; +}; + struct LSKey { LSKey() : id_(-1) {} explicit LSKey(const int64_t id) : id_(id) {} @@ -567,6 +594,13 @@ public: const LSN &lsn, const char *buf, const int64_t buf_len) = 0; + virtual int receive_batch_log(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len) = 0; virtual int ack_log(const common::ObAddr &server, const int64_t &proposal_id, const LSN &log_end_lsn) = 0; @@ -911,6 +945,13 @@ public: const LSN &lsn, const char *buf, const int64_t buf_len) override final; + int receive_batch_log(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len) override final; int ack_log(const common::ObAddr &server, const int64_t &proposal_id, const LSN &log_end_lsn) override final; @@ -1015,6 +1056,14 @@ private: const LSN &log_lsn, const LSN &log_end_lsn, const int64_t &log_proposal_id); + int receive_log_(const common::ObAddr &server, + const PushLogType push_log_type, + const int64_t &msg_proposal_id, + const LSN &prev_lsn, + const int64_t &prev_log_proposal_id, + const LSN &lsn, + const char *buf, + const int64_t buf_len); int fetch_log_from_storage_(const common::ObAddr &server, const FetchLogType fetch_type, const int64_t &msg_proposal_id, @@ -1024,12 +1073,39 @@ private: const int64_t fetch_log_count, const SCN &replayable_point, FetchLogStat &fetch_stat); + int batch_fetch_log_each_round_(const common::ObAddr &server, + const int64_t msg_proposal_id, + PalfGroupBufferIterator &iterator, + const bool is_limitted_by_end_lsn, + const bool is_dest_in_memberlist, + const share::SCN& replayable_point, + const LSN &fetch_end_lsn, + const LSN &committed_end_lsn, + const int64_t batch_log_size_threshold, + BatchFetchParams &batch_fetch_params, + bool &skip_next, + bool &is_reach_end, + FetchLogStat &fetch_stat); int submit_fetch_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, const int64_t &prev_log_proposal_id, const LSN &prev_lsn, const LSN &curr_lsn, const LogGroupEntry &curr_group_entry); + int submit_fetch_log_resp_(const common::ObAddr &server, + const int64_t &msg_proposal_id, + const int64_t &prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len); + int submit_batch_fetch_log_resp_(const common::ObAddr &server, + const int64_t msg_proposal_id, + const int64_t prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const char *buf, + const int64_t buf_len); int try_update_proposal_id_(const common::ObAddr &server, const int64_t &proposal_id); int get_binary_search_range_(const share::SCN &scn, diff --git a/src/logservice/palf/palf_iterator.h b/src/logservice/palf/palf_iterator.h index 1eb7527feb..5a4203d448 100644 --- a/src/logservice/palf/palf_iterator.h +++ b/src/logservice/palf/palf_iterator.h @@ -183,23 +183,6 @@ public: } return ret; } - int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, bool &is_raw_write) - { - int ret = OB_SUCCESS; - LogEntryType entry; - OB_ASSERT((std::is_same::value) == true); - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, is_raw_write)) && OB_ITER_END != ret) { - PALF_LOG(WARN, "PalfIterator get_entry failed", K(ret), K(entry), K(lsn), KPC(this)); - } else { - buffer = entry.get_data_buf(); - nbytes = entry.get_data_len(); - scn = entry.get_scn(); - PALF_LOG(TRACE, "PalfIterator get_entry success", K(ret), KPC(this), K(entry), K(is_raw_write)); - } - return ret; - } int get_entry(const char *&buffer, LogEntryType &entry, LSN& lsn) { int ret = OB_SUCCESS; @@ -217,21 +200,23 @@ public: } int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn) { - int ret = OB_SUCCESS; - LogEntryType entry; bool unused_is_raw_write = false; - OB_ASSERT((std::is_same::value) == true); - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, unused_is_raw_write)) && OB_ITER_END != ret) { - PALF_LOG(WARN, "PalfIterator get_entry failed", K(ret), K(entry), K(lsn), KPC(this)); - } else { - buffer = entry.get_data_buf(); - nbytes = entry.get_data_len(); - scn = entry.get_scn(); - PALF_LOG(TRACE, "PalfIterator get_entry success", K(iterator_impl_), K(ret), KPC(this), K(entry)); - } - return ret; + return get_entry_(buffer, nbytes, scn, lsn, unused_is_raw_write); + } + int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, bool &is_raw_write) + { + return get_entry_(buffer, nbytes, scn, lsn, is_raw_write); + } + int get_entry(const char *&buffer, int64_t &nbytes, LSN &lsn, int64_t &log_proposal_id) + { + share::SCN unused_scn; + bool unused_is_raw_write = false; + return get_entry_(buffer, nbytes, unused_scn, lsn, log_proposal_id, unused_is_raw_write); + } + int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, int64_t &log_proposal_id, + bool &is_raw_write) + { + return get_entry_(buffer, nbytes, scn, lsn, log_proposal_id, is_raw_write); } bool is_inited() const { @@ -284,6 +269,44 @@ private: return ret; } + int get_entry_(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, bool &is_raw_write) + { + int ret = OB_SUCCESS; + LogEntryType entry; + OB_ASSERT((std::is_same::value) == true); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, is_raw_write)) && OB_ITER_END != ret) { + PALF_LOG(WARN, "PalfIterator get_entry failed", K(ret), K(entry), K(lsn), KPC(this)); + } else { + buffer = entry.get_data_buf(); + nbytes = entry.get_data_len(); + scn = entry.get_scn(); + PALF_LOG(TRACE, "PalfIterator get_entry success", K(iterator_impl_), K(ret), KPC(this), K(entry)); + } + return ret; + } + + int get_entry_(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, int64_t &log_proposal_id, + bool &is_raw_write) + { + int ret = OB_SUCCESS; + LogEntryType entry; + OB_ASSERT((std::is_same::value) == true); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, is_raw_write)) && OB_ITER_END != ret) { + PALF_LOG(WARN, "PalfIterator get_group_entry failed", K(ret), K(entry), K(lsn), KPC(this)); + } else { + buffer = entry.get_data_buf() - entry.get_header_size(); + nbytes = entry.get_serialize_size(); + scn = entry.get_scn(); + log_proposal_id = entry.get_header().get_log_proposal_id(); + PALF_LOG(TRACE, "PalfIterator get_group_entry success", K(iterator_impl_), K(ret), KPC(this), K(entry)); + } + return ret; + } + private: PalfIteratorStorage iterator_storage_; LogIteratorImpl iterator_impl_; diff --git a/src/observer/ob_srv_xlator_primary.cpp b/src/observer/ob_srv_xlator_primary.cpp index eafb786e71..d7b6a4e80b 100644 --- a/src/observer/ob_srv_xlator_primary.cpp +++ b/src/observer/ob_srv_xlator_primary.cpp @@ -197,6 +197,7 @@ void oceanbase::observer::init_srv_xlator_for_palfenv(ObSrvRpcXlator *xlator) RPC_PROCESSOR(palf::LogPushReqP); RPC_PROCESSOR(palf::LogPushRespP); RPC_PROCESSOR(palf::LogFetchReqP); + RPC_PROCESSOR(palf::LogBatchFetchRespP); RPC_PROCESSOR(palf::LogPrepareReqP); RPC_PROCESSOR(palf::LogPrepareRespP); RPC_PROCESSOR(palf::LogChangeConfigMetaReqP);