Fix sliding_cb and fill padding log contennt.

This commit is contained in:
obdev
2023-02-25 02:46:15 +00:00
committed by ob-robot
parent e66b618a25
commit 8f9004950c
10 changed files with 113 additions and 96 deletions

View File

@ -293,6 +293,7 @@ public:
// pass and retry until timeout
// check slide condition and execute sliding_cb before inc begin_sn
} else if (!cond(curr_begin, &(array_[idx]))) {
ret = cond.ret_;
break;
} else {
// begin_sn_ and end_sn_ locate in same index and ref count == 0,
@ -330,7 +331,7 @@ public:
break;
}
}
PALF_LOG(TRACE, "slide end", K(begin_sn_), K(end_sn_));
PALF_LOG(TRACE, "slide end", K(ret), K(begin_sn_), K(end_sn_));
}
return ret;

View File

@ -69,18 +69,23 @@ int LogChecksum::acquire_accum_checksum(const int64_t data_checksum,
int LogChecksum::verify_accum_checksum(const int64_t data_checksum,
const int64_t accum_checksum)
{
// This interface is re-entrant.
// If canlculated checksum is unexpected, the verify_checksum_ won't change.
int ret = common::OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
const int64_t old_verify_checksum = verify_checksum_;
verify_checksum_ = common::ob_crc64(verify_checksum_, const_cast<int64_t *>(&data_checksum),
const int64_t new_verify_checksum = common::ob_crc64(verify_checksum_, const_cast<int64_t *>(&data_checksum),
sizeof(data_checksum));
if (verify_checksum_ != accum_checksum) {
if (new_verify_checksum != accum_checksum) {
// Checksum error occurs, verify_checksum_ won't change.
ret = common::OB_CHECKSUM_ERROR;
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "log checksum error", "ret", ret, K_(palf_id), K(data_checksum),
K(accum_checksum), K(old_verify_checksum), K_(verify_checksum));
K(accum_checksum), K(old_verify_checksum), K(new_verify_checksum));
} else {
// Update verify_checksum_ when checking succeeds.
verify_checksum_ = new_verify_checksum;
PALF_LOG(TRACE, "verify_accum_checksum success", K(ret), K_(palf_id), K(data_checksum), K(accum_checksum),
K_(verify_checksum), K_(accum_checksum));
}

View File

@ -101,6 +101,7 @@ const int64_t PALF_SYNC_RPC_TIMEOUT_US = 500 * 1000;
const int64_t PALF_LOG_SYNC_DELAY_THRESHOLD_US = 3 * 1000 * 1000L; // 3 s
constexpr int64_t INVALID_PROPOSAL_ID = INT64_MAX;
constexpr int64_t PALF_INITIAL_PROPOSAL_ID = 0;
constexpr char PADDING_LOG_CONTENT_CHAR = '\0';
inline int64_t max_proposal_id(const int64_t a, const int64_t b)
{

View File

@ -235,12 +235,12 @@ int LogGroupBuffer::fill(const LSN &lsn,
return ret;
}
int LogGroupBuffer::fill_padding(const LSN &lsn,
const int64_t padding_len)
int LogGroupBuffer::fill_padding_body(const LSN &lsn,
const int64_t log_body_size)
{
int ret = OB_SUCCESS;
int64_t start_pos = 0;
const LSN end_lsn = lsn + padding_len;
const LSN end_lsn = lsn + log_body_size;
LSN start_lsn, reuse_lsn;
get_buffer_start_lsn_(start_lsn);
get_reuse_lsn_(reuse_lsn);
@ -248,9 +248,9 @@ int LogGroupBuffer::fill_padding(const LSN &lsn,
const int64_t available_buf_size = get_available_buffer_size();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!lsn.is_valid() || padding_len <= 0) {
} else if (!lsn.is_valid() || log_body_size <= 0) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid arguments", K(ret), K(lsn), K(padding_len));
PALF_LOG(WARN, "invalid arguments", K(ret), K(lsn), K(log_body_size));
} else if (lsn < start_lsn) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(WARN, "lsn is less than start_lsn", K(ret), K(lsn), K_(start_lsn));
@ -267,14 +267,14 @@ int LogGroupBuffer::fill_padding(const LSN &lsn,
PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn));
} else {
const int64_t group_buf_tail_len = reserved_buf_size - start_pos;
int64_t first_part_len = min(group_buf_tail_len, padding_len);
memset(data_buf_ + start_pos, 0, first_part_len);
if (padding_len > first_part_len) {
int64_t first_part_len = min(group_buf_tail_len, log_body_size);
memset(data_buf_ + start_pos, PADDING_LOG_CONTENT_CHAR, first_part_len);
if (log_body_size > first_part_len) {
// seeking to buffer's beginning
memset(data_buf_, 0, padding_len - first_part_len);
memset(data_buf_, PADDING_LOG_CONTENT_CHAR, log_body_size - first_part_len);
}
PALF_LOG(INFO, "fill padding success", K(ret), K(lsn), K(padding_len), K(start_pos), K(group_buf_tail_len),
K(first_part_len), "second_part_len", padding_len - first_part_len);
PALF_LOG(INFO, "fill padding body success", K(ret), K(lsn), K(log_body_size), K(start_pos), K(group_buf_tail_len),
K(first_part_len), "second_part_len", log_body_size - first_part_len);
}
return ret;
}

View File

@ -46,8 +46,8 @@ public:
int fill(const LSN &lsn,
const char *data,
const int64_t data_len);
int fill_padding(const LSN &lsn,
const int64_t padding_len);
int fill_padding_body(const LSN &lsn,
const int64_t log_body_size);
int get_log_buf(const LSN &lsn, const int64_t total_len, LogWriteBuf &log_buf);
bool can_handle_new_log(const LSN &lsn,
const int64_t total_len) const;

View File

@ -45,9 +45,6 @@ public:
// return total size of header and body, including the length of padding log
int64_t get_group_entry_size() const { return header_.get_serialize_size() +
header_.get_data_len(); }
// used for fetch_log, ignore the data len of padding entry
int64_t get_group_size_without_padding_data() const { return header_.get_serialize_size() +
(header_.is_padding_log() ? 0 : header_.get_data_len()); }
int get_log_min_scn(share::SCN &min_scn) const;
const share::SCN get_scn() const { return header_.get_max_scn(); }
LSN get_committed_end_lsn() const { return header_.get_committed_end_lsn(); }

View File

@ -122,7 +122,7 @@ private:
static constexpr int64_t PADDING_LOG_DATA_CHECKSUM = 0; // padding log的data_checksum为0
private:
// Binary visualization, for LogGroupEntryHeader, its' magic number
// is 0x4752, means GH(group header)
// is 0x4752, means GR(group header)
int16_t magic_;
// Upgrade compatible
int16_t version_;

View File

@ -634,7 +634,7 @@ int LogSlidingWindow::append_to_group_log_(const LSN &lsn,
int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
const int64_t log_id,
const SCN &scn,
const int64_t log_body_size, // LOG_HEADER_SIZE + log_data_len
const int64_t log_body_size, // log_entry_header_size + log_data_len
const LogType &log_type,
const char *log_data,
const int64_t data_len,
@ -687,12 +687,15 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
if (OB_FAIL(wait_group_buffer_ready_(lsn, log_body_size + LogGroupEntryHeader::HEADER_SER_SIZE))) {
PALF_LOG(ERROR, "group_buffer wait failed", K(ret), K_(palf_id), K_(self));
} else if (is_padding_log) {
// padding log, no need fill buf, because its padding length is maybe larger than group_buffer size.
const int64_t log_entry_buf_len = log_body_size;
// inc ref
log_task->ref(log_entry_buf_len);
const bool set_submit_tag_res = log_task->set_submit_log_exist();
assert(true == set_submit_tag_res);
// padding log, fill log body with '\0'.
if (OB_FAIL(group_buffer_.fill_padding_body(lsn + LogGroupEntryHeader::HEADER_SER_SIZE, log_body_size))) {
PALF_LOG(WARN, "group_buffer fill_padding_body failed", K(ret), K_(palf_id), K_(self), K(log_body_size));
} else {
// inc ref
log_task->ref(log_body_size);
const bool set_submit_tag_res = log_task->set_submit_log_exist();
assert(true == set_submit_tag_res);
}
} else {
int64_t pos = 0;
assert(LogEntryHeader::HEADER_SER_SIZE < TMP_HEADER_SER_BUF_LEN);
@ -1980,55 +1983,59 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
const int64_t log_submit_ts = log_task->get_submit_ts();
log_task->unlock();
int tmp_ret = OB_SUCCESS;
const int64_t fs_cb_begin_ts = ObTimeUtility::current_time();
if (OB_SUCCESS != (tmp_ret = palf_fs_cb_->update_end_lsn(palf_id_, log_end_lsn, log_proposal_id))) {
if (OB_EAGAIN == tmp_ret) {
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
PALF_LOG(WARN, "update_end_lsn eagain", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
} else {
PALF_LOG(WARN, "update_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
}
const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts;
fs_cb_cost_stat_.stat(fs_cb_cost);
if (fs_cb_cost > 1 * 1000) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb->update_end_lsn() cost too much time", K(tmp_ret), K_(palf_id), K_(self),
K(fs_cb_cost), K(log_id), K(log_begin_lsn), K(log_end_lsn), K(log_proposal_id));
}
const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts;
log_life_time_stat_.stat(log_life_time);
log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts);
log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts);
if (log_life_time > 100 * 1000) {
if (palf_reach_time_interval(100 * 1000, log_life_long_warn_time_)) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "log_task life cost too much time", K_(palf_id), K_(self), K(log_id), KPC(log_task),
K(fs_cb_begin_ts), K(log_life_time));
}
}
// Verifying accum_checksum firstly.
if (OB_FAIL(checksum_.verify_accum_checksum(log_task_header.data_checksum_,
log_task_header.accum_checksum_))) {
PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(ret), K(log_id), KPC(log_task));
PALF_LOG(ERROR, "verify_accum_checksum failed", K(ret), KPC(this), K(log_id), KPC(log_task));
} else {
// Call fs_cb.
int tmp_ret = OB_SUCCESS;
const int64_t fs_cb_begin_ts = ObTimeUtility::current_time();
if (OB_SUCCESS != (tmp_ret = palf_fs_cb_->update_end_lsn(palf_id_, log_end_lsn, log_proposal_id))) {
if (OB_EAGAIN == tmp_ret) {
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
PALF_LOG(WARN, "update_end_lsn eagain", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
} else {
PALF_LOG(WARN, "update_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
}
const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts;
fs_cb_cost_stat_.stat(fs_cb_cost);
if (fs_cb_cost > 1 * 1000) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb->update_end_lsn() cost too much time", K(tmp_ret), K_(palf_id), K_(self),
K(fs_cb_cost), K(log_id), K(log_begin_lsn), K(log_end_lsn), K(log_proposal_id));
}
const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts;
log_life_time_stat_.stat(log_life_time);
log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts);
log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts);
if (log_life_time > 100 * 1000) {
if (palf_reach_time_interval(100 * 1000, log_life_long_warn_time_)) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "log_task life cost too much time", K_(palf_id), K_(self), K(log_id), KPC(log_task),
K(fs_cb_begin_ts), K(log_life_time));
}
}
// update last_slide_lsn_
(void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \
log_proposal_id, log_accum_checksum);
}
if (OB_SUCC(ret)) {
(void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \
log_proposal_id, log_accum_checksum);
}
MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly
MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly
if (OB_SUCC(ret)
&& (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) {
// Check if need fetch log streamingly,
try_fetch_log_streamingly_(log_end_lsn);
if (OB_SUCC(ret)
&& (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) {
// Check if need fetch log streamingly,
try_fetch_log_streamingly_(log_end_lsn);
}
}
}
if (0 == log_id % 100) {
PALF_LOG(INFO, "sliding_cb finished", K_(palf_id), K_(self), K(ret), K(log_id));
PALF_LOG(INFO, "sliding_cb finished", K(ret), K_(palf_id), K_(self), K(ret), K(log_id));
}
}
return ret;
@ -2174,7 +2181,7 @@ int LogSlidingWindow::clean_cached_log(const int64_t begin_log_id,
{
// Caller holds palf_handle's wrlock.
// This func is used to clean cached log_task that has not been processed.
// The arg begin_log_id is expected to be equal with (last_submit_log_id + 1).
// The arg begin_log_id is expected to be equal to (last_submit_log_id + 1).
// Before executing clean op, we need double check prev_log info.
int ret = OB_SUCCESS;
const int64_t last_submit_log_id = get_last_submit_log_id_();
@ -2891,7 +2898,9 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
if (OB_SUCC(ret)
&& log_id > (last_submit_log_id + 1)
&& log_proposal_id != last_submit_log_pid) {
// if its proposal_id does not equal to last_submit_log_pid, we cannot cache it.
// if its proposal_id does not equal to last_submit_log_pid,
// and it's not continuous with last_submit_log_id, we cannot receive it.
// Only logs whose proposal_id is equal to last_submit_log_pid can be cached into sw.
ret = OB_EAGAIN;
PALF_LOG(WARN, "new log's proposal_id does not equal to last submit log's, and log_id is not continuous with last "\
"submit log, cannot receive(cache) it", K(ret), K_(palf_id), K_(self), K(log_id), K(log_proposal_id),
@ -2903,7 +2912,7 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
&& need_check_clean_log
&& log_id == last_submit_log_id + 1
&& log_proposal_id != last_submit_log_pid) {
// prev log matches, new log's proposal_id does not equal with last submit log,
// prev log matches, new log's proposal_id does not equal to last submit log's,
// and it is continuous with last submit log,
// check if need clean cached log_tasks.
if (INVALID_PROPOSAL_ID != last_submit_log_pid && log_proposal_id < last_submit_log_pid) {
@ -2928,27 +2937,30 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
if (OB_SUCC(ret)) {
bool is_local_log_valid = false;
if (!need_update_log_task_(group_entry_header, log_task, need_send_ack, is_local_log_valid, is_log_pid_match)) {
// local log_task is already valid
PALF_LOG(INFO, "no need update log", K(log_id), K_(palf_id), K_(self), K(need_send_ack), K(is_log_pid_match),
K(is_local_log_valid), K(is_prev_log_exist), K(is_prev_log_match), K(group_entry_header), KPC(log_task));
if (false == is_local_log_valid) {
// local log_task is invalid, and it does not need update, this means that it's maybe in PRE_FILL state.
} else if (false == is_log_pid_match) {
// local log_task's proposal_id does not match with new log.
// And (log_id <= last_submit_log_id) must be true, because:
// if log_id > last_submit_log_id, and there are too cases:
// + its proposal_id != last_submit_log_pid, ret cannot be OB_SUCCESS, it cannot reach here.
// + its proposal_id == last_submit_log_pid, the local log_task's propsal_id should be equal
// with last_submit_log_pid too.
// If not, local log_task's data is unexpected (it cannot be received).
// In summary, log_id <= last_submit_log_id, and it has passed prev log check.
//
// (log_id <= last_submit_log_id) must be true.
//
// Because if log_id > last_submit_log_id, there are only too cases:
// 1) new log_proposal_id != last_submit_log_pid, it cannot reach here, because it cannot be received.
//
// 2) new log_proposal_id == last_submit_log_pid, the local log_task's propsal_id should be equal
// with last_submit_log_pid too. If not, local log shouldn't exist (it should already be
// truncated by previous receive operation).
//
// In summary, (log_id <= last_submit_log_id) must be true.
if (lsn <= last_submit_end_lsn) {
// It means that this log is the first mismatch one with request server,
// because it has passed prev log check.
// We need truncate log at this lsn.
// We need truncate log at this log's begin lsn.
truncate_log_info.truncate_type_ = TRUNCATE_LOG;
truncate_log_info.truncate_log_id_ = log_id;
// lsn is expected to be equal with log_task's end_lsn.
// lsn is expected to be equal to log_task's end_lsn.
// So we can use lsn as the truncate_begin_lsn_.
truncate_log_info.truncate_begin_lsn_ = lsn;
log_task->lock();
@ -2960,8 +2972,10 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
K(last_submit_log_id), KPC(log_task), K(is_prev_log_exist), K(is_prev_log_match),
K(truncate_log_info));
} else {
// It means that the expected truncate pos should be some previous log.
// But it cannot pass prev log check, so this is unexpected!
// Unexpected case:
// (log_id <= last_submit_log_id) and (lsn > last_submit_end_lsn).
// If the expected truncating log is some previous one,
// this log cannot pass the prev log check.
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "lsn > last_submit_end_lsn and log_id <= last_submit_log_id, \
and local log_task's proposal_id != arg proposal_id, unexpected",
@ -4081,9 +4095,9 @@ int LogSlidingWindow::reset_location_cache_cb()
}
int LogSlidingWindow::get_min_scn_from_buf_(const LogGroupEntryHeader &header,
const char *buf,
const int64_t buf_len,
SCN &min_scn)
const char *buf,
const int64_t buf_len,
SCN &min_scn)
{
int ret = OB_SUCCESS;
LogEntryHeader log_entry_header;

View File

@ -3031,8 +3031,7 @@ int PalfHandleImpl::submit_fetch_log_resp_(const common::ObAddr &server,
LogWriteBuf write_buf;
// NB: 'curr_group_entry' generates by PalfGroupBufferIterator, the memory is safe before next();
const char *buf = curr_group_entry.get_data_buf() - curr_group_entry.get_header().get_serialize_size();
// buf_len ignores padding entry's data_len
const int64_t buf_len = curr_group_entry.get_group_size_without_padding_data();
const int64_t buf_len = curr_group_entry.get_group_entry_size();
int64_t pos = 0;
const int64_t curr_log_proposal_id = curr_group_entry.get_header().get_log_proposal_id();
if (OB_FAIL(write_buf.push_back(buf, buf_len))) {

View File

@ -172,28 +172,28 @@ TEST_F(TestLogGroupBuffer, test_fill_padding)
LSN lsn;
int64_t len = 0;
LSN reuse_lsn(1024);
EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.fill_padding_body(lsn, len));
LSN start_lsn(100);
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, len));
lsn = reuse_lsn;
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, len));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, len));
len = 100;
lsn.val_ = start_lsn.val_ - 1;
EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, len));
lsn.val_ = start_lsn.val_;
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn));
EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, len));
lsn = reuse_lsn;
len = log_group_buffer_.get_available_buffer_size() + 1;
EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill_padding_body(lsn, len));
len = 1024;
int64_t used_size = len;
const int64_t buf_size = log_group_buffer_.get_available_buffer_size();
LSN buf_end_lsn = reuse_lsn + (buf_size - (reuse_lsn.val_ - start_lsn.val_));
while (lsn + len < buf_end_lsn) {
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding(lsn, len));
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn, len));
lsn.val_ += len;
}
EXPECT_GT(lsn + len, buf_end_lsn);