Fix palf sw unexpected full bug.
This commit is contained in:
@ -145,10 +145,12 @@ bool LogGroupBuffer::can_handle_new_log(const LSN &lsn,
|
||||
} else if (!lsn.is_valid() || total_len <= 0 || !ref_reuse_lsn.is_valid()) {
|
||||
PALF_LOG_RET(WARN, OB_INVALID_ARGUMENT, "invalid arguments", K(bool_ret), K(lsn), K(total_len), K(ref_reuse_lsn));
|
||||
} else if (lsn < start_lsn) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "lsn is less than start_lsn", K(bool_ret), K(lsn), K_(start_lsn));
|
||||
PALF_LOG_RET(WARN, OB_INVALID_ARGUMENT, "lsn is less than start_lsn", K(bool_ret), K(lsn), K_(start_lsn));
|
||||
} else if (end_lsn > reuse_lsn + get_available_buffer_size()) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "end_lsn is larger than max reuse pos", K(bool_ret), K(lsn), K(end_lsn),
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
PALF_LOG_RET(WARN, OB_EAGAIN, "end_lsn is larger than max reuse pos", K(bool_ret), K(lsn), K(end_lsn),
|
||||
K(reuse_lsn), K_(available_buffer_size));
|
||||
}
|
||||
} else {
|
||||
bool_ret = true;
|
||||
}
|
||||
@ -189,31 +191,6 @@ int LogGroupBuffer::get_log_buf(const LSN &lsn, const int64_t total_len, LogWrit
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogGroupBuffer::wait(const LSN &lsn, const int64_t data_len)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const LSN end_lsn = lsn + data_len;
|
||||
LSN start_lsn, reuse_lsn;
|
||||
get_buffer_start_lsn_(start_lsn);
|
||||
get_reuse_lsn_(reuse_lsn);
|
||||
const int64_t buf_size = get_available_buffer_size();
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (!lsn.is_valid() || data_len <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid arguments", K(ret), K(lsn), K(data_len));
|
||||
} else if (lsn < start_lsn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
PALF_LOG(WARN, "lsn is less than start_lsn", K(ret), K(lsn), K(start_lsn));
|
||||
} else if (end_lsn > reuse_lsn + buf_size) {
|
||||
ret = OB_EAGAIN;
|
||||
PALF_LOG(WARN, "need retry", K(ret), K(lsn), K(data_len), K(end_lsn), K(reuse_lsn), K(start_lsn));
|
||||
} else {
|
||||
// wait success
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int LogGroupBuffer::fill(const LSN &lsn,
|
||||
const char *data,
|
||||
const int64_t data_len)
|
||||
|
@ -48,7 +48,6 @@ public:
|
||||
const int64_t data_len);
|
||||
int fill_padding(const LSN &lsn,
|
||||
const int64_t padding_len);
|
||||
int wait(const LSN &lsn, const int64_t data_len);
|
||||
int get_log_buf(const LSN &lsn, const int64_t total_len, LogWriteBuf &log_buf);
|
||||
bool can_handle_new_log(const LSN &lsn,
|
||||
const int64_t total_len) const;
|
||||
|
@ -337,9 +337,11 @@ bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size)
|
||||
// NB: 采用committed_lsn作为可复用起点的下界,避免写盘立即复用group_buffer导致follower的
|
||||
// group_buffer被uncommitted log填满而无法滑出
|
||||
} else if (!group_buffer_.can_handle_new_log(curr_end_lsn, valid_log_size, curr_committed_end_lsn)) {
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "group_buffer_ cannot handle new log now", K(tmp_ret), K_(palf_id), K_(self),
|
||||
K(valid_log_size), K(curr_end_lsn), K(curr_committed_end_lsn),
|
||||
"start_id", get_start_id(), "max_log_id", get_max_log_id());
|
||||
}
|
||||
} else {
|
||||
bool_ret = true;
|
||||
}
|
||||
@ -419,8 +421,10 @@ int LogSlidingWindow::submit_log(const char *buf,
|
||||
} else if (!leader_can_submit_new_log_(valid_log_size)
|
||||
|| !leader_can_submit_larger_log_(get_max_log_id() + 1)) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
PALF_LOG(WARN, "cannot submit new log now, try again", K(ret), K_(palf_id), K_(self),
|
||||
K(valid_log_size), K(buf_len), "start_id", get_start_id(), "max_log_id", get_max_log_id());
|
||||
}
|
||||
// sw_ cannot submit larger log
|
||||
} else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size,
|
||||
tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size))) {
|
||||
@ -553,9 +557,10 @@ int LogSlidingWindow::wait_group_buffer_ready_(const LSN &lsn, const int64_t dat
|
||||
// NB: 尽管已经使用'committed_end_lsn_'限制了'leader_can_submit_new_log_', 但我们依旧需要判断'group_buffer_'是否已经可以复用:
|
||||
// 1. 并发提交日志会导致所有日志都能进入到提交流程;
|
||||
// 2. 不能够使用'committed_end_lsn_'判断'group_buffer_'是否可以被复用, 因为'committed_end_lsn_'可能大于'max_flushed_end_lsn'.
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t wait_times = 0;
|
||||
while (OB_EAGAIN == (tmp_ret = group_buffer_.wait(lsn, data_len))) {
|
||||
LSN curr_committed_end_lsn;
|
||||
get_committed_end_lsn_(curr_committed_end_lsn);
|
||||
while (false == group_buffer_.can_handle_new_log(lsn, data_len, curr_committed_end_lsn)) {
|
||||
// 要填充的终点超过了buffer可复用的范围
|
||||
// 需要重试直到可复用终点推大
|
||||
static const int64_t MAX_SLEEP_US = 100;
|
||||
@ -565,9 +570,9 @@ int LogSlidingWindow::wait_group_buffer_ready_(const LSN &lsn, const int64_t dat
|
||||
sleep_us = MAX_SLEEP_US;
|
||||
}
|
||||
ob_usleep(sleep_us);
|
||||
PALF_LOG(WARN, "usleep wait", K(tmp_ret), K_(palf_id), K_(self), K(lsn), K(data_len));
|
||||
PALF_LOG(WARN, "usleep wait", K_(palf_id), K_(self), K(lsn), K(data_len), K(curr_committed_end_lsn));
|
||||
get_committed_end_lsn_(curr_committed_end_lsn);
|
||||
}
|
||||
ret = tmp_ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -2816,7 +2821,9 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server,
|
||||
K_(last_truncate_lsn), K_(is_rebuilding), K(src_server), K(lsn), KP(buf), K(buf_len));
|
||||
} else if (!group_buffer_.can_handle_new_log(lsn, buf_len)) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
PALF_LOG(WARN, "group_buffer_ cannot handle new log", K(ret), K_(palf_id), K_(self), K(lsn));
|
||||
}
|
||||
} else if (OB_FAIL(group_entry_header.deserialize(buf, buf_len, pos))) {
|
||||
PALF_LOG(WARN, "group_entry_header deserialize failed", K(ret), K_(palf_id), K_(self));
|
||||
} else if (!group_entry_header.check_integrity(buf + LogGroupEntryHeader::HEADER_SER_SIZE,
|
||||
|
@ -356,31 +356,33 @@ int PalfHandleImpl::submit_log(
|
||||
const int64_t curr_time_us = ObClockGenerator::getClock();
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
PALF_LOG(WARN, "PalfHandleImpl is not inited", K(ret));
|
||||
PALF_LOG(WARN, "PalfHandleImpl is not inited");
|
||||
} else if (NULL == buf || buf_len <= 0 || buf_len > MAX_LOG_BODY_SIZE
|
||||
|| !ref_scn.is_valid()
|
||||
|| ref_scn.convert_to_ts() > curr_time_us + MAX_ALLOWED_SKEW_FOR_REF_US) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid argument", K(ret), K_(palf_id), KP(buf), K(buf_len), K(ref_scn));
|
||||
PALF_LOG(WARN, "invalid argument", K_(palf_id), KP(buf), K(buf_len), K(ref_scn));
|
||||
} else {
|
||||
RLockGuard guard(lock_);
|
||||
if (false == palf_env_impl_->check_disk_space_enough()) {
|
||||
ret = OB_LOG_OUTOF_DISK_SPACE;
|
||||
if (palf_reach_time_interval(1 * 1000 * 1000, log_disk_full_warn_time_)) {
|
||||
PALF_LOG(WARN, "log outof disk space", K(ret), KPC(this), K(opts), K(ref_scn));
|
||||
PALF_LOG(WARN, "log outof disk space", KPC(this), K(opts), K(ref_scn));
|
||||
}
|
||||
} else if (!state_mgr_.can_append(opts.proposal_id, opts.need_check_proposal_id)) {
|
||||
ret = OB_NOT_MASTER;
|
||||
PALF_LOG(WARN, "cannot submit_log", K(ret), KPC(this), KP(buf), K(buf_len), "role",
|
||||
PALF_LOG(WARN, "cannot submit_log", KPC(this), KP(buf), K(buf_len), "role",
|
||||
state_mgr_.get_role(), "state", state_mgr_.get_state(), "proposal_id",
|
||||
state_mgr_.get_proposal_id(), K(opts), "mode_mgr can_append", mode_mgr_.can_append());
|
||||
} else if (OB_UNLIKELY(state_mgr_.is_changing_config_with_arb())) {
|
||||
ret = OB_EAGAIN;
|
||||
if (palf_reach_time_interval(200 * 1000, chaning_config_warn_time_)) {
|
||||
PALF_LOG(WARN, "can not submit log when memberlist is being changed", K(ret), KPC(this));
|
||||
PALF_LOG(WARN, "can not submit log when memberlist is being changed", KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(sw_.submit_log(buf, buf_len, ref_scn, lsn, scn))) {
|
||||
PALF_LOG(WARN, "submit_log failed", K(ret), KPC(this), KP(buf), K(buf_len));
|
||||
if (OB_EAGAIN != ret) {
|
||||
PALF_LOG(WARN, "submit_log failed", KPC(this), KP(buf), K(buf_len));
|
||||
}
|
||||
} else {
|
||||
PALF_LOG(TRACE, "submit_log success", K(ret), KPC(this), K(buf_len), K(lsn), K(scn));
|
||||
if (palf_reach_time_interval(2 * 1000 * 1000, append_size_stat_time_us_)) {
|
||||
|
@ -133,25 +133,6 @@ TEST_F(TestLogGroupBuffer, test_get_log_buf)
|
||||
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.get_log_buf(lsn, len, log_buf));
|
||||
}
|
||||
|
||||
TEST_F(TestLogGroupBuffer, test_wait)
|
||||
{
|
||||
LSN lsn;
|
||||
int64_t len = 0;
|
||||
EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.wait(lsn, len));
|
||||
LSN start_lsn(100);
|
||||
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.wait(lsn, len));
|
||||
lsn = start_lsn;
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.wait(lsn, len));
|
||||
len = 100;
|
||||
lsn.val_ = start_lsn.val_ - 1;
|
||||
EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.wait(lsn, len));
|
||||
lsn.val_ = start_lsn.val_ + log_group_buffer_.get_available_buffer_size();
|
||||
EXPECT_EQ(OB_EAGAIN, log_group_buffer_.wait(lsn, len));
|
||||
lsn.val_ = start_lsn.val_ + 100;
|
||||
EXPECT_EQ(OB_SUCCESS, log_group_buffer_.wait(lsn, len));
|
||||
}
|
||||
|
||||
TEST_F(TestLogGroupBuffer, test_fill)
|
||||
{
|
||||
LSN lsn;
|
||||
|
Reference in New Issue
Block a user