Batch small logs to optimize the speed of fetching log

This commit is contained in:
obdev
2023-06-05 03:16:00 +00:00
committed by ob-robot
parent 0226f0695f
commit 89ed784966
19 changed files with 677 additions and 85 deletions

View File

@ -924,6 +924,7 @@ PCODE_DEF(OB_LOG_GET_PALF_STAT, 0x151D)
PCODE_DEF(OB_LOG_NOTIFY_FETCH_LOG, 0x151E)
PCODE_DEF(OB_LOG_GET_STAT, 0x151F)
PCODE_DEF(OB_LOG_FORCE_SET_LS_AS_SINGLE_REPLICA, 0x1520)
PCODE_DEF(OB_LOG_BATCH_FETCH_RESP, 0X1523)
// 1531-1550 for obesi
// PCODE_DEF(OB_ESI_IS_EXIST, 0x1531)

View File

@ -622,6 +622,9 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
case obrpc::OB_LOG_FETCH_REQ: {
PROCESS(LogFetchReqP);
}
case obrpc::OB_LOG_BATCH_FETCH_RESP: {
PROCESS(LogBatchFetchRespP);
}
case obrpc::OB_LOG_PREPARE_REQ: {
PROCESS(LogPrepareReqP);
}

View File

@ -589,9 +589,9 @@ TEST_F(TestObSimpleLogClusterArbService, test_multi_meta_block)
ASSERT_EQ(OB_SUCCESS, get_palf_handle_lite(OB_SERVER_TENANT_ID, id, arb_server, arb_guard));
PalfHandleLite *arb_palf = dynamic_cast<PalfHandleLite *>(arb_guard.palf_handle_impl_);
LogEngine *log_engine = &arb_palf->log_engine_;
LSN meta_tail = log_engine->log_meta_storage_.log_tail_;
LogStorage *meta_storage = &log_engine->log_meta_storage_;
EXPECT_EQ(OB_SUCCESS, log_engine->append_log_meta_(log_engine->log_meta_));
LSN meta_tail = log_engine->log_meta_storage_.log_tail_;
ASSERT_NE(meta_tail, LSN(log_engine->log_meta_storage_.logical_block_size_));
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());

View File

@ -1031,6 +1031,24 @@ int LogEngine::submit_fetch_log_req(const ObAddr &server,
return ret;
}
int LogEngine::submit_batch_fetch_log_resp(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
ret = log_net_service_.submit_batch_fetch_log_resp(
server, msg_proposal_id, prev_log_proposal_id,
prev_lsn, curr_lsn, write_buf);
}
return ret;
}
int LogEngine::submit_notify_rebuild_req(const ObAddr &server,
const LSN &base_lsn,
const LogInfo &base_prev_log_info)

View File

@ -347,7 +347,12 @@ public:
const int64_t fetch_log_size,
const int64_t fetch_log_count,
const int64_t accepted_mode_pid);
virtual int submit_batch_fetch_log_resp(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf);
virtual int submit_register_parent_req(const common::ObAddr &server,
const LogLearner &child_itself,
const bool is_to_leader);

View File

@ -623,7 +623,7 @@ int BatchLogIOFlushLogTask::push_flush_cb_to_thread_pool_(int tg_id, IPalfEnvImp
LogIOFlushLogTask *io_task = io_task_array_[i];
if (NULL == io_task) {
PALF_LOG(WARN, "io_task is nullptr, may be its' epoch has changed", K(ret), KP(io_task),
KPC(this));
KPC(this));
} else if (FALSE_IT(io_task->push_cb_into_cb_pool_ts_ = current_time)) {
} else if (OB_FAIL(push_task_into_cb_thread_pool(tg_id, io_task))) {
// avoid memory leak when push task into cb thread pool failed.

View File

@ -159,6 +159,26 @@ int LogNetService::submit_fetch_log_req(
return ret;
}
int LogNetService::submit_batch_fetch_log_resp(
const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogNetService not inited!!!", K(ret), K(palf_id_));
} else {
LogBatchFetchResp batch_fetch_log_resp(msg_proposal_id, prev_log_proposal_id,
prev_lsn, curr_lsn, write_buf);
ret = post_request_to_server_(server, batch_fetch_log_resp);
}
return ret;
}
int LogNetService::submit_notify_rebuild_req(
const ObAddr &server,
const LSN &base_lsn,

View File

@ -128,6 +128,14 @@ public:
const int64_t fetch_log_count,
const int64_t accepted_mode_pid);
int submit_batch_fetch_log_resp(
const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf);
int submit_notify_rebuild_req(
const ObAddr &server,
const LSN &base_lsn,
@ -261,8 +269,7 @@ int LogNetService::post_request_to_server_(
{
int ret = common::OB_SUCCESS;
if (OB_FAIL(log_rpc_->post_request(server, palf_id_, req))) {
// PALF_LOG(WARN, "LogRpc post_request failed", K(ret), K(palf_id_),
// K(req), K(server));
PALF_LOG(WARN, "LogRpc post_request failed", K(ret), K(palf_id_), K(req), K(server));
} else {
PALF_LOG(TRACE, "post_request_to_server_ success", K(ret), K(server), K(palf_id_), K(req));
}

View File

@ -773,5 +773,97 @@ OB_SERIALIZE_MEMBER(LogGetStatResp, max_scn_, end_lsn_);
// ================= LogGetStatResp end ================
LogBatchFetchResp::LogBatchFetchResp()
: msg_proposal_id_(INVALID_PROPOSAL_ID),
prev_log_proposal_id_(INVALID_PROPOSAL_ID),
prev_lsn_(),
curr_lsn_(),
write_buf_()
{
}
LogBatchFetchResp::LogBatchFetchResp(const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf)
: msg_proposal_id_(msg_proposal_id),
prev_log_proposal_id_(prev_log_proposal_id),
prev_lsn_(prev_lsn),
curr_lsn_(curr_lsn),
write_buf_(write_buf)
{
}
LogBatchFetchResp::~LogBatchFetchResp()
{
reset();
}
bool LogBatchFetchResp::is_valid() const
{
return INVALID_PROPOSAL_ID != msg_proposal_id_
&& true == curr_lsn_.is_valid()
&& true == write_buf_.is_valid();
}
void LogBatchFetchResp::reset()
{
msg_proposal_id_ = INVALID_PROPOSAL_ID;
prev_log_proposal_id_ = INVALID_PROPOSAL_ID;
prev_lsn_.reset();
curr_lsn_.reset();
write_buf_.reset();
}
OB_DEF_SERIALIZE(LogBatchFetchResp)
{
int ret = OB_SUCCESS;
int64_t new_pos = pos;
if (NULL == buf || pos < 0 || pos > buf_len) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, new_pos, msg_proposal_id_))
|| OB_FAIL(serialization::encode_i64(buf, buf_len, new_pos, prev_log_proposal_id_))
|| OB_FAIL(prev_lsn_.serialize(buf, buf_len, new_pos))
|| OB_FAIL(curr_lsn_.serialize(buf, buf_len, new_pos))
|| OB_FAIL(write_buf_.serialize(buf, buf_len, new_pos))) {
PALF_LOG(ERROR, "LogBatchFetchResp serialize failed", K(ret), K(new_pos));
} else {
pos = new_pos;
}
return ret;
}
OB_DEF_DESERIALIZE(LogBatchFetchResp)
{
int ret = OB_SUCCESS;
int64_t new_pos = pos;
if (NULL == buf || pos < 0 || pos > data_len) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, new_pos,
reinterpret_cast<int64_t *>(&msg_proposal_id_)))
|| OB_FAIL(serialization::decode_i64(buf, data_len, new_pos,
reinterpret_cast<int64_t *>(&prev_log_proposal_id_)))
|| OB_FAIL(prev_lsn_.deserialize(buf, data_len, new_pos))
|| OB_FAIL(curr_lsn_.deserialize(buf, data_len, new_pos))
|| OB_FAIL(write_buf_.deserialize(buf, data_len, new_pos))) {
PALF_LOG(ERROR, "LogBatchFetchResp deserialize failed", K(ret), K(new_pos));
} else {
pos = new_pos;
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(LogBatchFetchResp)
{
int64_t size = 0;
size += serialization::encoded_length_i64(msg_proposal_id_);
size += serialization::encoded_length_i64(prev_log_proposal_id_);
size += prev_lsn_.get_serialize_size();
size += curr_lsn_.get_serialize_size();
size += write_buf_.get_serialize_size();
return size;
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -105,6 +105,26 @@ public:
int64_t accepted_mode_pid_;
};
struct LogBatchFetchResp {
OB_UNIS_VERSION(1);
public:
LogBatchFetchResp();
LogBatchFetchResp(const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf);
~LogBatchFetchResp();
bool is_valid() const;
void reset();
TO_STRING_KV(K_(msg_proposal_id), K_(prev_log_proposal_id), K_(prev_lsn), K_(curr_lsn), K_(write_buf));
int64_t msg_proposal_id_;
int64_t prev_log_proposal_id_;
LSN prev_lsn_;
LSN curr_lsn_;
LogWriteBuf write_buf_;
};
struct NotifyRebuildReq {
OB_UNIS_VERSION(1);
public:

View File

@ -190,6 +190,32 @@ int LogRequestHandler::handle_request<LogFetchReq>(
return ret;
}
template <>
int LogRequestHandler::handle_request<LogBatchFetchResp>(
const int64_t palf_id,
const ObAddr &server,
const LogBatchFetchResp &req)
{
int ret = common::OB_SUCCESS;
if (false == is_valid_palf_id(palf_id) || false == req.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(req), KPC(palf_env_impl_));
} else {
const char *buf = req.write_buf_.write_buf_[0].buf_;
const int64_t buf_len = req.write_buf_.write_buf_[0].buf_len_;
IPalfHandleImplGuard guard;
if (OB_FAIL(palf_env_impl_->get_palf_handle_impl(palf_id, guard))) {
PALF_LOG(WARN, "PalfEnvImpl get_palf_handle_impl failed", K(ret), K(palf_id));
} else if (OB_FAIL(guard.get_palf_handle_impl()->receive_batch_log(server, req.msg_proposal_id_,
req.prev_log_proposal_id_, req.prev_lsn_, req.curr_lsn_, buf, buf_len))) {
PALF_LOG(WARN, "PalfHandleImpl receive_batch_log failed", K(ret), K(server), K(req), KPC(palf_env_impl_));
} else {
PALF_LOG(TRACE, "PalfHandleImpl receive_batch_log success", K(ret), K(server), K(req), KPC(palf_env_impl_));
}
}
return ret;
}
template <>
int LogRequestHandler::handle_request<LogPrepareReq>(
const int64_t palf_id,

View File

@ -47,6 +47,11 @@ DEFINE_RPC_PROCESSOR(LogFetchReqP,
LogFetchReq,
obrpc::OB_LOG_FETCH_REQ);
DEFINE_RPC_PROCESSOR(LogBatchFetchRespP,
obrpc::LogRpcProxyV2,
LogBatchFetchResp,
obrpc::OB_LOG_BATCH_FETCH_RESP);
DEFINE_RPC_PROCESSOR(LogPrepareReqP,
obrpc::LogRpcProxyV2,
LogPrepareReq,

View File

@ -74,6 +74,8 @@ DEFINE_RPC_PROXY_POST_FUNCTION(LogPushResp,
OB_LOG_PUSH_RESP);
DEFINE_RPC_PROXY_POST_FUNCTION(LogFetchReq,
OB_LOG_FETCH_REQ);
DEFINE_RPC_PROXY_POST_FUNCTION(LogBatchFetchResp,
OB_LOG_BATCH_FETCH_RESP);
DEFINE_RPC_PROXY_POST_FUNCTION(LogPrepareReq,
OB_LOG_PREPARE_REQ);
DEFINE_RPC_PROXY_POST_FUNCTION(LogPrepareResp,

View File

@ -40,6 +40,9 @@ public:
DECLARE_RPC_PROXY_POST_FUNCTION(PR3,
LogFetchReq,
OB_LOG_FETCH_REQ);
DECLARE_RPC_PROXY_POST_FUNCTION(PR3,
LogBatchFetchResp,
OB_LOG_BATCH_FETCH_RESP);
DECLARE_RPC_PROXY_POST_FUNCTION(PR3,
LogPrepareReq,
OB_LOG_PREPARE_REQ);

View File

@ -1804,11 +1804,11 @@ void LogSlidingWindow::try_update_committed_lsn_for_fetch_(
LSN last_committed_end_lsn;
last_fetch_end_lsn.val_ = ATOMIC_LOAD(&last_fetch_end_lsn_.val_);
last_committed_end_lsn.val_ = ATOMIC_LOAD(&last_fetch_committed_end_lsn_.val_);
int64_t last_fetch_max_log_id = OB_INVALID_LOG_ID;
get_last_fetch_info_(last_fetch_end_lsn, last_committed_end_lsn, last_fetch_max_log_id);
if (!last_fetch_end_lsn.is_valid() || last_committed_end_lsn.is_valid()) {
// no need update
} else {
int64_t last_fetch_max_log_id = OB_INVALID_LOG_ID;
get_last_fetch_info_(last_fetch_end_lsn, last_committed_end_lsn, last_fetch_max_log_id);
if (last_fetch_end_lsn.is_valid()
&& !last_committed_end_lsn.is_valid()
&& (log_end_lsn >= last_fetch_end_lsn || log_id == last_fetch_max_log_id)) {
@ -1825,7 +1825,7 @@ void LogSlidingWindow::try_update_committed_lsn_for_fetch_(
K(log_id), K_(last_fetch_max_log_id), K_(last_fetch_end_lsn),
K_(last_fetch_committed_end_lsn));
} else if (last_fetch_end_lsn == last_fetch_end_lsn_
|| log_id == last_fetch_max_log_id_) {
|| last_fetch_max_log_id == last_fetch_max_log_id_) {
LSN committed_end_lsn;
get_committed_end_lsn_(committed_end_lsn);
// The order is fatal:

View File

@ -2617,14 +2617,14 @@ int PalfHandleImpl::handle_committed_info(const common::ObAddr &server,
return ret;
}
int PalfHandleImpl::receive_log(const common::ObAddr &server,
const PushLogType push_log_type,
const int64_t &msg_proposal_id,
const LSN &prev_lsn,
const int64_t &prev_log_proposal_id,
const LSN &lsn,
const char *buf,
const int64_t buf_len)
int PalfHandleImpl::receive_log_(const common::ObAddr &server,
const PushLogType push_log_type,
const int64_t &msg_proposal_id,
const LSN &prev_lsn,
const int64_t &prev_log_proposal_id,
const LSN &lsn,
const char *buf,
const int64_t buf_len)
{
int ret = OB_SUCCESS;
TruncateLogInfo truncate_log_info;
@ -2712,6 +2712,68 @@ int PalfHandleImpl::receive_log(const common::ObAddr &server,
return ret;
}
int PalfHandleImpl::receive_log(const common::ObAddr &server,
const PushLogType push_log_type,
const int64_t &msg_proposal_id,
const LSN &prev_lsn,
const int64_t &prev_log_proposal_id,
const LSN &lsn,
const char *buf,
const int64_t buf_len)
{
return receive_log_(server, push_log_type, msg_proposal_id, prev_lsn, prev_log_proposal_id, lsn, buf, buf_len);
}
int PalfHandleImpl::receive_batch_log(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len)
{
int ret = OB_SUCCESS;
int64_t start_ts = ObTimeUtility::current_time();
MemPalfGroupBufferIterator iterator;
MemoryStorage storage;
auto get_file_end_lsn = [curr_lsn, buf_len] () { return curr_lsn + buf_len; };
if (OB_FAIL(iterator.init(curr_lsn, get_file_end_lsn, &storage))) {
PALF_LOG(ERROR, "init iterator failed", K(ret), KPC(this));
} else if (OB_FAIL(storage.init(curr_lsn))) {
PALF_LOG(ERROR, "init storage failed", K(ret), KPC(this));
} else if (OB_FAIL(storage.append(buf, buf_len))) {
PALF_LOG(ERROR, "storage append failed", K(ret), KPC(this));
} else {
LSN prev_lsn_each_round = prev_lsn;
int64_t prev_log_proposal_id_each_round = prev_log_proposal_id;
LSN curr_lsn_each_round = curr_lsn;
int64_t curr_log_proposal_id = 0;
const char *buf_each_round = NULL;
int64_t buf_len_each_round = 0;
int64_t count = 0;
while (OB_SUCC(iterator.next())) {
if (OB_FAIL(iterator.get_entry(buf_each_round, buf_len_each_round, curr_lsn_each_round, curr_log_proposal_id))) {
PALF_LOG(ERROR, "get_entry failed", K(ret), KPC(this), K(iterator), KP(buf_each_round));
} else if (OB_FAIL(receive_log_(server, FETCH_LOG_RESP, msg_proposal_id, prev_lsn_each_round,
prev_log_proposal_id_each_round, curr_lsn_each_round, buf_each_round, buf_len_each_round))) {
PALF_LOG(WARN, "receive_log failed", K(ret), KPC(this), K(iterator), K(server), K(FETCH_LOG_RESP), K(msg_proposal_id),
K(prev_lsn_each_round), K(prev_log_proposal_id_each_round), K(curr_lsn_each_round), KP(buf_each_round),
K(buf_len_each_round));
}
prev_lsn_each_round = curr_lsn_each_round;
prev_log_proposal_id_each_round = curr_log_proposal_id;
count++;
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
PALF_LOG(TRACE, "receive_batch_log finished", K(ret), KPC(this), K(server), K(count), K(prev_lsn), K(curr_lsn),
K(buf_len), K(cost_ts), K_(sw), K(iterator));
}
return ret;
}
int PalfHandleImpl::submit_group_log(const PalfAppendOptions &opts,
const LSN &lsn,
const char *buf,
@ -3066,9 +3128,12 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
FetchLogStat &fetch_stat)
{
int ret = OB_SUCCESS;
int64_t send_cost = 0, get_entry_cost = 0;
LSN prev_lsn_each_round = prev_lsn;
LSN prev_end_lsn_each_round = prev_lsn;
int64_t prev_log_proposal_id_each_round = INVALID_PROPOSAL_ID;
PalfGroupBufferIterator iterator;
const LSN fetch_end_lsn = fetch_start_lsn + fetch_log_size;
int64_t fetched_count = 0;
const bool need_check_prev_log = (prev_lsn.is_valid() && PALF_INITIAL_LSN_VAL < fetch_start_lsn.val_);
LSN max_flushed_end_lsn;
LSN committed_end_lsn;
@ -3098,9 +3163,7 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
// max_flushed_end_lsn may be truncated by concurrent truncate, so itreator need handle this
// case when it try to read some log which is being truncated.
auto get_file_end_lsn = [&]() {
return max_flushed_end_lsn;
};
auto get_file_end_lsn = [&]() { return max_flushed_end_lsn; };
LogInfo prev_log_info;
const bool no_need_fetch_log = (prev_lsn >= max_flushed_end_lsn) ||
(AccessMode::FLASHBACK == access_mode);
@ -3108,6 +3171,9 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
int64_t replica_num = 0;
(void) config_mgr_.get_curr_member_list(member_list, replica_num);
const bool is_dest_in_memberlist = (member_list.contains(server));
// Rpc delay increases enormously when it's size exceeds 2M.
const int64_t MAX_BATCH_LOG_SIZE_EACH_ROUND = 2 * 1024 * 1024 - 1024;
char *batch_log_buf = NULL;
if (no_need_fetch_log) {
PALF_LOG(INFO, "no need fetch_log_from_storage", K(ret), KPC(this), K(server), K(fetch_start_lsn), K(prev_lsn),
K(max_flushed_end_lsn), K(access_mode));
@ -3125,34 +3191,61 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
}
} else if (check_need_hook_fetch_log_(fetch_type, fetch_start_lsn)) {
ret = OB_ERR_OUT_OF_LOWER_BOUND;
} else if (FALSE_IT(prev_log_proposal_id_each_round = prev_log_info.log_proposal_id_)) {
} else if (OB_FAIL(iterator.init(fetch_start_lsn, get_file_end_lsn, log_engine_.get_log_storage()))) {
PALF_LOG(WARN, "PalfGroupBufferIterator init failed", K(ret), K_(palf_id));
} else {
bool need_print_error = false;
PALF_LOG(ERROR, "init iterator failed", K(ret), K_(palf_id));
} else if (FALSE_IT(iterator.set_need_print_error(false))) {
// NB: Fetch log will be concurrent with truncate, the content on disk will not integrity, need igore
// read log error.
iterator.set_need_print_error(need_print_error);
LSN each_round_prev_lsn = prev_lsn;
} else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0 &&
OB_NOT_NULL(batch_log_buf = (char *)mtl_malloc(MAX_BATCH_LOG_SIZE_EACH_ROUND, "BatchLogBuf"))) {
// When group log size > 10K, we just send it without aggregating.
const int64_t MAX_NEED_BATCH_LOG_SIZE = 10 * 1024;
// When batched log count reaches max_batch_log_count_each_round, we end the aggregation.
const int64_t max_batch_log_count_each_round = PALF_SLIDING_WINDOW_SIZE;
int64_t remained_count = fetch_log_count;
bool is_reach_end = false;
bool skip_next = false;
BatchFetchParams batch_fetch_params;
batch_fetch_params.batch_log_buf_ = batch_log_buf;
batch_fetch_params.can_batch_size_ = MAX_BATCH_LOG_SIZE_EACH_ROUND;
batch_fetch_params.last_log_lsn_prev_round_ = prev_lsn_each_round;
batch_fetch_params.last_log_end_lsn_prev_round_ = prev_end_lsn_each_round;
batch_fetch_params.last_log_proposal_id_prev_round_ = prev_log_proposal_id_each_round;
while (OB_SUCC(ret) && remained_count > 0 && !is_reach_end &&
batch_fetch_params.last_log_end_lsn_prev_round_ < fetch_end_lsn) {
batch_fetch_params.can_batch_count_ = MIN(remained_count, max_batch_log_count_each_round);
batch_fetch_params.has_consumed_count_ = 0;
if (OB_FAIL(batch_fetch_log_each_round_(server, msg_proposal_id, iterator, is_limitted_by_end_lsn,
is_dest_in_memberlist, replayable_point, fetch_end_lsn, committed_end_lsn, MAX_NEED_BATCH_LOG_SIZE,
batch_fetch_params, skip_next, is_reach_end, fetch_stat)) && OB_ITER_END != ret) {
PALF_LOG(WARN, "batch_fetch_log_each_round_ failed", K(ret), KPC(this), K(iterator));
} else {
remained_count -= batch_fetch_params.has_consumed_count_;
}
}
prev_lsn_each_round = batch_fetch_params.last_log_lsn_prev_round_;
prev_end_lsn_each_round = batch_fetch_params.last_log_end_lsn_prev_round_;
prev_log_proposal_id_each_round = batch_fetch_params.last_log_proposal_id_prev_round_;
fetched_count = fetch_log_count - remained_count;
} else {
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0) {
PALF_LOG(WARN, "allocate batch_log_buf memory failed", KPC(this), K(server), K(prev_lsn));
}
LogGroupEntry curr_group_entry;
LSN curr_lsn;
LSN curr_log_end_lsn;
bool is_reach_size_limit = false; // whether the total fetched size exceeds fetch_log_size
bool is_reach_count_limit = false;
bool is_reach_end = false;
int64_t fetched_count = 0;
int64_t total_size = 0;
int64_t read_cost = 0;
LSN curr_log_end_lsn = curr_lsn + curr_group_entry.get_group_entry_size();
LSN prev_log_end_lsn;
int64_t prev_log_proposal_id = prev_log_info.log_proposal_id_;
int64_t read_begin_time = ObTimeUtility::current_time(), send_begin_time = read_begin_time, tmp_ts = 0;
int64_t send_cost = 0;
int64_t send_begin_time = ObTimeUtility::current_time();
while (OB_SUCC(ret) && !is_reach_size_limit && !is_reach_count_limit && !is_reach_end
&& OB_SUCC(iterator.next())) {
tmp_ts = ObTimeUtility::current_time();
read_cost += tmp_ts - read_begin_time;
if (OB_FAIL(iterator.get_entry(curr_group_entry, curr_lsn))) {
PALF_LOG(ERROR, "PalfGroupBufferIterator get_entry failed", K(ret), K_(palf_id),
K(curr_group_entry), K(curr_lsn), K(iterator));
} else if (FALSE_IT(get_entry_cost += ObTimeUtility::current_time() - tmp_ts)) {
} else if (FALSE_IT(curr_log_end_lsn = curr_lsn + curr_group_entry.get_group_entry_size())) {
} else if (is_limitted_by_end_lsn && curr_log_end_lsn > committed_end_lsn) {
// Only leader replica can send uncommitted logs to others,
@ -3168,10 +3261,10 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
PALF_LOG(INFO, "non paxos member could not fetch logs which scn is bigger than replayable_point, end fetch",
K_(palf_id), K(server), K(msg_proposal_id), K(curr_lsn), K(replayable_point));
} else if (FALSE_IT(send_begin_time = ObTimeUtility::current_time())) {
} else if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id, \
each_round_prev_lsn, curr_lsn, curr_group_entry))) {
} else if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id_each_round, \
prev_lsn_each_round, curr_lsn, curr_group_entry))) {
PALF_LOG(WARN, "submit_fetch_log_resp_ failed", K(ret), K_(palf_id), K(server),
K(msg_proposal_id), K(each_round_prev_lsn), K(fetch_start_lsn));
K(msg_proposal_id), K(prev_lsn_each_round), K(fetch_start_lsn));
} else {
send_cost += ObTimeUtility::current_time() - send_begin_time;
fetched_count++;
@ -3184,31 +3277,34 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
is_reach_size_limit = true;
}
PALF_LOG(TRACE, "fetch one log success", K(ret), K_(palf_id), K_(self), K(server), K(prev_lsn),
K(fetch_start_lsn), K(each_round_prev_lsn), K(curr_lsn), K(curr_group_entry),
K(prev_log_proposal_id), K(fetch_end_lsn), K(curr_log_end_lsn), K(is_reach_size_limit),
K(fetch_start_lsn), K(prev_lsn_each_round), K(curr_lsn), K(curr_group_entry),
K(prev_log_proposal_id_each_round), K(fetch_end_lsn), K(curr_log_end_lsn), K(is_reach_size_limit),
K(fetch_log_size), K(fetched_count), K(is_reach_count_limit));
each_round_prev_lsn = curr_lsn;
prev_log_end_lsn = curr_log_end_lsn;
prev_log_proposal_id = curr_group_entry.get_header().get_log_proposal_id();
prev_lsn_each_round = curr_lsn;
prev_end_lsn_each_round = curr_log_end_lsn;
prev_log_proposal_id_each_round = curr_group_entry.get_header().get_log_proposal_id();
}
read_begin_time = ObTimeUtility::current_time();
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
// try send committed_info to server
if (OB_SUCC(ret)) {
RLockGuard guard(lock_);
(void) try_send_committed_info_(server, each_round_prev_lsn, prev_log_end_lsn, prev_log_proposal_id);
}
// update fetch statistic info
fetch_stat.total_size_ = total_size;
fetch_stat.group_log_cnt_ = fetched_count;
fetch_stat.read_cost_ = read_cost;
fetch_stat.get_cost_ = get_entry_cost;
fetch_stat.send_cost_ = send_cost;
}
if (batch_log_buf != NULL) {
mtl_free(batch_log_buf);
batch_log_buf = NULL;
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
PALF_LOG(INFO, "fetch_log_from_storage_ finished", K(ret), KPC(this), K(prev_lsn), K(iterator),
K(fetch_log_count), "has_fetched_log_count", fetched_count);
// try send committed_info to server
if (OB_SUCC(ret)) {
RLockGuard guard(lock_);
(void) try_send_committed_info_(server, prev_lsn_each_round, prev_end_lsn_each_round,
prev_log_proposal_id_each_round);
}
if (OB_FAIL(ret) && OB_ERR_OUT_OF_LOWER_BOUND == ret) {
// ret is OB_ERR_OUT_OF_LOWER_BOUND, need notify dst server to trigger rebuild
LSN base_lsn = log_engine_.get_log_meta().get_log_snapshot_meta().base_lsn_;
@ -3230,6 +3326,152 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
return ret;
}
int PalfHandleImpl::batch_fetch_log_each_round_(const common::ObAddr &server,
const int64_t msg_proposal_id,
PalfGroupBufferIterator &iterator,
const bool is_limitted_by_end_lsn,
const bool is_dest_in_memberlist,
const share::SCN& replayable_point,
const LSN &fetch_end_lsn,
const LSN &committed_end_lsn,
const int64_t max_need_batch_log_size,
BatchFetchParams &batch_fetch_params,
bool &skip_next,
bool &is_reach_end,
FetchLogStat &fetch_stat)
{
int ret = OB_SUCCESS;
int64_t remained_count = batch_fetch_params.can_batch_count_;
int64_t remained_size = batch_fetch_params.can_batch_size_;
LSN prev_lsn = batch_fetch_params.last_log_lsn_prev_round_;
LSN prev_end_lsn = batch_fetch_params.last_log_end_lsn_prev_round_;
int64_t prev_log_proposal_id = batch_fetch_params.last_log_proposal_id_prev_round_;
int64_t has_consumed_count = batch_fetch_params.has_consumed_count_;
LSN curr_lsn;
LSN curr_end_lsn;
int64_t curr_log_proposal_id;
LSN first_log_lsn;
const char *curr_log_buf = NULL;
int64_t curr_log_buf_len = 0;
int64_t log_end_pos = 0;
bool has_reach_threshold = false;
share::SCN curr_scn;
bool is_raw_write;
int64_t start_ts = ObTimeUtility::current_time();
while (OB_SUCC(ret) && !is_reach_end && remained_count > 0 && remained_size > 0) {
if (!skip_next && OB_FAIL(iterator.next())) {
PALF_LOG(WARN, "iterator next failed", K(ret), KPC(this), K(iterator), K(replayable_point), K(prev_lsn));
} else if (FALSE_IT(skip_next = false)) {
} else if (OB_FAIL(iterator.get_entry(curr_log_buf, curr_log_buf_len, curr_scn, curr_lsn, curr_log_proposal_id,
is_raw_write))){
PALF_LOG(WARN, "iterator get_entry failed", K(ret), KPC(this), K(iterator), K(log_end_pos));
} else if (false == is_dest_in_memberlist && is_raw_write && replayable_point.is_valid() &&
curr_scn > replayable_point) {
is_reach_end = true;
PALF_LOG(INFO, "non paxos member could not fetch logs which scn is bigger than replayable_point, end fetch",
K_(palf_id), K(server), K(msg_proposal_id), K(curr_lsn), K(replayable_point));
} else if (FALSE_IT(curr_end_lsn = curr_lsn + curr_log_buf_len)) {
} else if (is_limitted_by_end_lsn && curr_end_lsn > committed_end_lsn){
// Only leader replica can send uncommitted logs to others,
// the other replicas just send committed logs to avoid unexpected rewriting.
is_reach_end = true;
PALF_LOG(INFO, "reach committed_end_lsn(not leader active replica), end fetch", K(ret), K_(palf_id), K(server),
K(msg_proposal_id), K(curr_lsn), K(curr_end_lsn), K(committed_end_lsn));
} else if (curr_log_buf_len >= max_need_batch_log_size ) {
has_reach_threshold = true;
PALF_LOG(TRACE, "group log is bigger than batch_log_size_threshold", K(ret), K_(palf_id), K(server),
K(msg_proposal_id), K(curr_lsn), K(curr_end_lsn), K(max_need_batch_log_size));
break;
} else if (remained_size >= curr_log_buf_len ) {
if (!first_log_lsn.is_valid()) {
first_log_lsn = curr_lsn;
}
MEMCPY(batch_fetch_params.batch_log_buf_ + log_end_pos, curr_log_buf, curr_log_buf_len);
log_end_pos += curr_log_buf_len;
prev_lsn = curr_lsn;
prev_end_lsn = curr_end_lsn;
prev_log_proposal_id = curr_log_proposal_id;
has_consumed_count++;
remained_count--;
remained_size -= curr_log_buf_len;
if (curr_end_lsn >= fetch_end_lsn) {
PALF_LOG(INFO, "fetched log has reached fetch_end_lsn", K(ret), KPC(this), K(curr_lsn), K(curr_end_lsn),
K(fetch_end_lsn));
is_reach_end = true;
}
} else {
// batch the group log next round
skip_next = true;
PALF_LOG(TRACE, "batched log has exceeded can_batch_size", K(ret), KPC(this), K(batch_fetch_params.can_batch_size_),
K(remained_size), K(prev_lsn), K(curr_lsn));
break;
}
}
int64_t batch_end_ts = common::ObTimeUtility::current_time();
if (OB_ITER_END == ret) {
is_reach_end = true;
ret = OB_SUCCESS;
}
if (OB_FAIL(ret)) {
PALF_LOG(WARN, "batch log failed", K(ret), KPC(this), K(iterator), K(log_end_pos),
K(curr_lsn), K(has_consumed_count), K(first_log_lsn));
} else {
if (1 < has_consumed_count) {
ret = submit_batch_fetch_log_resp_(server, msg_proposal_id, batch_fetch_params.last_log_proposal_id_prev_round_,
batch_fetch_params.last_log_lsn_prev_round_, first_log_lsn, batch_fetch_params.batch_log_buf_, log_end_pos);
} else if (1 == has_consumed_count) {
ret = submit_fetch_log_resp_(server, msg_proposal_id, batch_fetch_params.last_log_proposal_id_prev_round_,
batch_fetch_params.last_log_lsn_prev_round_, first_log_lsn, batch_fetch_params.batch_log_buf_, log_end_pos);
} else {
PALF_LOG(TRACE, "no log is aggregated", K(ret), KPC(this), K(log_end_pos), K(curr_lsn),
K(has_consumed_count), K(first_log_lsn));
}
}
if (OB_SUCC(ret) && has_reach_threshold) {
if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id, prev_lsn,
curr_lsn, curr_log_buf, curr_log_buf_len))) {
PALF_LOG(WARN, "submit_fetch_log_resp_ failed", K(ret), KPC(this), K(prev_lsn),
K(curr_lsn), K(curr_log_buf_len));
} else {
log_end_pos += curr_log_buf_len;
prev_lsn = curr_lsn;
prev_end_lsn = curr_end_lsn;
prev_log_proposal_id = curr_log_proposal_id;
has_consumed_count++;
remained_count--;
remained_size -= curr_log_buf_len;
PALF_LOG(TRACE, "submit_fetch_log_resp_ success", K(ret), KPC(this), K(prev_lsn),
K(curr_lsn), K(curr_log_buf_len));
}
}
int64_t send_end_ts = common::ObTimeUtility::current_time();
if (OB_SUCC(ret) && 0 != has_consumed_count) {
batch_fetch_params.last_log_lsn_prev_round_ = prev_lsn;
batch_fetch_params.last_log_end_lsn_prev_round_ = prev_end_lsn;
batch_fetch_params.last_log_proposal_id_prev_round_ = prev_log_proposal_id;
batch_fetch_params.has_consumed_count_ = has_consumed_count;
int64_t total_size = log_end_pos;
int64_t batch_cost = batch_end_ts - start_ts;
int64_t send_cost = send_end_ts - batch_end_ts;
int64_t total_cost = send_end_ts - start_ts;
fetch_stat.total_size_ += total_size;
fetch_stat.group_log_cnt_ += has_consumed_count;
fetch_stat.send_cost_ += send_cost;
int64_t avg_log_size = total_size / has_consumed_count;
int64_t avg_log_cost = total_cost / has_consumed_count;
int64_t avg_batch_cost = batch_cost / has_consumed_count;
int64_t avg_send_cost = send_cost/ has_consumed_count;
PALF_LOG(TRACE, "batch_fetch_log_one_round_ success", K(ret), KPC(this), K(is_reach_end),
K(batch_fetch_params.can_batch_size_), K(remained_size), K(batch_fetch_params.can_batch_count_),
K(has_consumed_count), K(total_size), K(total_cost), K(batch_cost), K(send_cost), K(avg_log_size),
K(avg_log_cost), K(avg_send_cost), K(avg_batch_cost), K(first_log_lsn),
K(batch_fetch_params.last_log_lsn_prev_round_), K(iterator));
}
return ret;
}
int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t &prev_log_proposal_id,
@ -3257,6 +3499,54 @@ int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server,
return ret;
}
int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t &prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len)
{
int ret = OB_SUCCESS;
LogWriteBuf write_buf;
// NB: 'curr_group_entry' generates by PalfGroupBufferIterator, the memory is safe before next();
if (OB_FAIL(write_buf.push_back(buf, buf_len))) {
PALF_LOG(WARN, "push_back buf into LogWriteBuf failed", K(ret));
} else if (OB_FAIL(log_engine_.submit_push_log_req(server, FETCH_LOG_RESP, msg_proposal_id, prev_log_proposal_id,
prev_lsn, curr_lsn, write_buf))) {
PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K(server), K(msg_proposal_id), K(prev_log_proposal_id),
K(prev_lsn), K(curr_lsn), K(write_buf));
} else {
PALF_LOG(TRACE, "submit_fetch_log_resp_ success", K(ret), K(server), K(msg_proposal_id), K(prev_log_proposal_id),
K(prev_lsn), K(curr_lsn), K(write_buf));
}
return ret;
}
int PalfHandleImpl::submit_batch_fetch_log_resp_(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len)
{
int ret = OB_SUCCESS;
LogWriteBuf write_buf;
int64_t pos = 0;
if (OB_FAIL(write_buf.push_back(buf, buf_len))) {
PALF_LOG(WARN, "push_back buf into LogWriteBuf failed", K(ret));
} else if (OB_FAIL(log_engine_.submit_batch_fetch_log_resp(server, msg_proposal_id, prev_log_proposal_id,
prev_lsn, curr_lsn, write_buf))) {
PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K(server), K(msg_proposal_id), K(prev_log_proposal_id),
K(prev_lsn), K(curr_lsn), K(write_buf));
} else {
PALF_LOG(TRACE, "submit_batch_fetch_log_resp_ success", K(ret), K(server), KPC(this), K(msg_proposal_id),
K(prev_log_proposal_id), K(buf_len), K(prev_lsn), K(curr_lsn), K(write_buf));
}
return ret;
}
// NB: 1. there is no need to distinguish between reconfirm or follower active;
// 2. whether msg_proposal_id is equal to currentry proposal_id or not, response logs to the requester.
int PalfHandleImpl::get_log(const common::ObAddr &server,

View File

@ -150,6 +150,33 @@ struct FetchLogStat {
K_(send_cost));
};
struct BatchFetchParams {
BatchFetchParams() { reset(); }
~BatchFetchParams() { reset(); }
void reset() {
can_batch_count_ = 0;
can_batch_size_ = 0;
batch_log_buf_ = NULL;
last_log_lsn_prev_round_.reset();
last_log_end_lsn_prev_round_.reset();
last_log_proposal_id_prev_round_ = 0;
}
TO_STRING_KV(K_(can_batch_count),
K_(can_batch_size),
KP_(batch_log_buf),
K_(last_log_lsn_prev_round),
K_(last_log_end_lsn_prev_round),
K_(last_log_proposal_id_prev_round),
K_(has_consumed_count));
int64_t can_batch_count_;
int64_t can_batch_size_;
char *batch_log_buf_;
LSN last_log_lsn_prev_round_;
LSN last_log_end_lsn_prev_round_;
int64_t last_log_proposal_id_prev_round_;
int64_t has_consumed_count_;
};
struct LSKey {
LSKey() : id_(-1) {}
explicit LSKey(const int64_t id) : id_(id) {}
@ -567,6 +594,13 @@ public:
const LSN &lsn,
const char *buf,
const int64_t buf_len) = 0;
virtual int receive_batch_log(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len) = 0;
virtual int ack_log(const common::ObAddr &server,
const int64_t &proposal_id,
const LSN &log_end_lsn) = 0;
@ -911,6 +945,13 @@ public:
const LSN &lsn,
const char *buf,
const int64_t buf_len) override final;
int receive_batch_log(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len) override final;
int ack_log(const common::ObAddr &server,
const int64_t &proposal_id,
const LSN &log_end_lsn) override final;
@ -1015,6 +1056,14 @@ private:
const LSN &log_lsn,
const LSN &log_end_lsn,
const int64_t &log_proposal_id);
int receive_log_(const common::ObAddr &server,
const PushLogType push_log_type,
const int64_t &msg_proposal_id,
const LSN &prev_lsn,
const int64_t &prev_log_proposal_id,
const LSN &lsn,
const char *buf,
const int64_t buf_len);
int fetch_log_from_storage_(const common::ObAddr &server,
const FetchLogType fetch_type,
const int64_t &msg_proposal_id,
@ -1024,12 +1073,39 @@ private:
const int64_t fetch_log_count,
const SCN &replayable_point,
FetchLogStat &fetch_stat);
int batch_fetch_log_each_round_(const common::ObAddr &server,
const int64_t msg_proposal_id,
PalfGroupBufferIterator &iterator,
const bool is_limitted_by_end_lsn,
const bool is_dest_in_memberlist,
const share::SCN& replayable_point,
const LSN &fetch_end_lsn,
const LSN &committed_end_lsn,
const int64_t batch_log_size_threshold,
BatchFetchParams &batch_fetch_params,
bool &skip_next,
bool &is_reach_end,
FetchLogStat &fetch_stat);
int submit_fetch_log_resp_(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t &prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogGroupEntry &curr_group_entry);
int submit_fetch_log_resp_(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t &prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len);
int submit_batch_fetch_log_resp_(const common::ObAddr &server,
const int64_t msg_proposal_id,
const int64_t prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const char *buf,
const int64_t buf_len);
int try_update_proposal_id_(const common::ObAddr &server,
const int64_t &proposal_id);
int get_binary_search_range_(const share::SCN &scn,

View File

@ -183,23 +183,6 @@ public:
}
return ret;
}
int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, bool &is_raw_write)
{
int ret = OB_SUCCESS;
LogEntryType entry;
OB_ASSERT((std::is_same<LogEntryType, LogEntry>::value) == true);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, is_raw_write)) && OB_ITER_END != ret) {
PALF_LOG(WARN, "PalfIterator get_entry failed", K(ret), K(entry), K(lsn), KPC(this));
} else {
buffer = entry.get_data_buf();
nbytes = entry.get_data_len();
scn = entry.get_scn();
PALF_LOG(TRACE, "PalfIterator get_entry success", K(ret), KPC(this), K(entry), K(is_raw_write));
}
return ret;
}
int get_entry(const char *&buffer, LogEntryType &entry, LSN& lsn)
{
int ret = OB_SUCCESS;
@ -217,21 +200,23 @@ public:
}
int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn)
{
int ret = OB_SUCCESS;
LogEntryType entry;
bool unused_is_raw_write = false;
OB_ASSERT((std::is_same<LogEntryType, LogEntry>::value) == true);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, unused_is_raw_write)) && OB_ITER_END != ret) {
PALF_LOG(WARN, "PalfIterator get_entry failed", K(ret), K(entry), K(lsn), KPC(this));
} else {
buffer = entry.get_data_buf();
nbytes = entry.get_data_len();
scn = entry.get_scn();
PALF_LOG(TRACE, "PalfIterator get_entry success", K(iterator_impl_), K(ret), KPC(this), K(entry));
}
return ret;
return get_entry_(buffer, nbytes, scn, lsn, unused_is_raw_write);
}
int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, bool &is_raw_write)
{
return get_entry_(buffer, nbytes, scn, lsn, is_raw_write);
}
int get_entry(const char *&buffer, int64_t &nbytes, LSN &lsn, int64_t &log_proposal_id)
{
share::SCN unused_scn;
bool unused_is_raw_write = false;
return get_entry_(buffer, nbytes, unused_scn, lsn, log_proposal_id, unused_is_raw_write);
}
int get_entry(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, int64_t &log_proposal_id,
bool &is_raw_write)
{
return get_entry_(buffer, nbytes, scn, lsn, log_proposal_id, is_raw_write);
}
bool is_inited() const
{
@ -284,6 +269,44 @@ private:
return ret;
}
int get_entry_(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, bool &is_raw_write)
{
int ret = OB_SUCCESS;
LogEntryType entry;
OB_ASSERT((std::is_same<LogEntryType, LogEntry>::value) == true);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, is_raw_write)) && OB_ITER_END != ret) {
PALF_LOG(WARN, "PalfIterator get_entry failed", K(ret), K(entry), K(lsn), KPC(this));
} else {
buffer = entry.get_data_buf();
nbytes = entry.get_data_len();
scn = entry.get_scn();
PALF_LOG(TRACE, "PalfIterator get_entry success", K(iterator_impl_), K(ret), KPC(this), K(entry));
}
return ret;
}
int get_entry_(const char *&buffer, int64_t &nbytes, share::SCN &scn, LSN &lsn, int64_t &log_proposal_id,
bool &is_raw_write)
{
int ret = OB_SUCCESS;
LogEntryType entry;
OB_ASSERT((std::is_same<LogEntryType, LogGroupEntry>::value) == true);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(iterator_impl_.get_entry(entry, lsn, is_raw_write)) && OB_ITER_END != ret) {
PALF_LOG(WARN, "PalfIterator get_group_entry failed", K(ret), K(entry), K(lsn), KPC(this));
} else {
buffer = entry.get_data_buf() - entry.get_header_size();
nbytes = entry.get_serialize_size();
scn = entry.get_scn();
log_proposal_id = entry.get_header().get_log_proposal_id();
PALF_LOG(TRACE, "PalfIterator get_group_entry success", K(iterator_impl_), K(ret), KPC(this), K(entry));
}
return ret;
}
private:
PalfIteratorStorage iterator_storage_;
LogIteratorImpl<LogEntryType> iterator_impl_;

View File

@ -197,6 +197,7 @@ void oceanbase::observer::init_srv_xlator_for_palfenv(ObSrvRpcXlator *xlator)
RPC_PROCESSOR(palf::LogPushReqP);
RPC_PROCESSOR(palf::LogPushRespP);
RPC_PROCESSOR(palf::LogFetchReqP);
RPC_PROCESSOR(palf::LogBatchFetchRespP);
RPC_PROCESSOR(palf::LogPrepareReqP);
RPC_PROCESSOR(palf::LogPrepareRespP);
RPC_PROCESSOR(palf::LogChangeConfigMetaReqP);