diff --git a/src/logservice/palf/log_group_buffer.cpp b/src/logservice/palf/log_group_buffer.cpp index 0e08f6839b..08e8daa616 100644 --- a/src/logservice/palf/log_group_buffer.cpp +++ b/src/logservice/palf/log_group_buffer.cpp @@ -145,10 +145,12 @@ bool LogGroupBuffer::can_handle_new_log(const LSN &lsn, } else if (!lsn.is_valid() || total_len <= 0 || !ref_reuse_lsn.is_valid()) { PALF_LOG_RET(WARN, OB_INVALID_ARGUMENT, "invalid arguments", K(bool_ret), K(lsn), K(total_len), K(ref_reuse_lsn)); } else if (lsn < start_lsn) { - PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "lsn is less than start_lsn", K(bool_ret), K(lsn), K_(start_lsn)); + PALF_LOG_RET(WARN, OB_INVALID_ARGUMENT, "lsn is less than start_lsn", K(bool_ret), K(lsn), K_(start_lsn)); } else if (end_lsn > reuse_lsn + get_available_buffer_size()) { - PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "end_lsn is larger than max reuse pos", K(bool_ret), K(lsn), K(end_lsn), - K(reuse_lsn), K_(available_buffer_size)); + if (REACH_TIME_INTERVAL(1000 * 1000)) { + PALF_LOG_RET(WARN, OB_EAGAIN, "end_lsn is larger than max reuse pos", K(bool_ret), K(lsn), K(end_lsn), + K(reuse_lsn), K_(available_buffer_size)); + } } else { bool_ret = true; } @@ -189,31 +191,6 @@ int LogGroupBuffer::get_log_buf(const LSN &lsn, const int64_t total_len, LogWrit return ret; } -int LogGroupBuffer::wait(const LSN &lsn, const int64_t data_len) -{ - int ret = OB_SUCCESS; - const LSN end_lsn = lsn + data_len; - LSN start_lsn, reuse_lsn; - get_buffer_start_lsn_(start_lsn); - get_reuse_lsn_(reuse_lsn); - const int64_t buf_size = get_available_buffer_size(); - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else if (!lsn.is_valid() || data_len <= 0) { - ret = OB_INVALID_ARGUMENT; - PALF_LOG(WARN, "invalid arguments", K(ret), K(lsn), K(data_len)); - } else if (lsn < start_lsn) { - ret = OB_ERR_UNEXPECTED; - PALF_LOG(WARN, "lsn is less than start_lsn", K(ret), K(lsn), K(start_lsn)); - } else if (end_lsn > reuse_lsn + buf_size) { - ret = OB_EAGAIN; - PALF_LOG(WARN, "need retry", K(ret), K(lsn), K(data_len), K(end_lsn), K(reuse_lsn), K(start_lsn)); - } else { - // wait success - } - return ret; -} - int LogGroupBuffer::fill(const LSN &lsn, const char *data, const int64_t data_len) diff --git a/src/logservice/palf/log_group_buffer.h b/src/logservice/palf/log_group_buffer.h index f825e8b8a5..212640781d 100644 --- a/src/logservice/palf/log_group_buffer.h +++ b/src/logservice/palf/log_group_buffer.h @@ -48,7 +48,6 @@ public: const int64_t data_len); int fill_padding(const LSN &lsn, const int64_t padding_len); - int wait(const LSN &lsn, const int64_t data_len); int get_log_buf(const LSN &lsn, const int64_t total_len, LogWriteBuf &log_buf); bool can_handle_new_log(const LSN &lsn, const int64_t total_len) const; diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index e323781c01..77eb46076e 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -337,9 +337,11 @@ bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size) // NB: 采用committed_lsn作为可复用起点的下界,避免写盘立即复用group_buffer导致follower的 // group_buffer被uncommitted log填满而无法滑出 } else if (!group_buffer_.can_handle_new_log(curr_end_lsn, valid_log_size, curr_committed_end_lsn)) { - PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "group_buffer_ cannot handle new log now", K(tmp_ret), K_(palf_id), K_(self), - K(valid_log_size), K(curr_end_lsn), K(curr_committed_end_lsn), - "start_id", get_start_id(), "max_log_id", get_max_log_id()); + if (REACH_TIME_INTERVAL(1000 * 1000)) { + PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "group_buffer_ cannot handle new log now", K(tmp_ret), K_(palf_id), K_(self), + K(valid_log_size), K(curr_end_lsn), K(curr_committed_end_lsn), + "start_id", get_start_id(), "max_log_id", get_max_log_id()); + } } else { bool_ret = true; } @@ -419,8 +421,10 @@ int LogSlidingWindow::submit_log(const char *buf, } else if (!leader_can_submit_new_log_(valid_log_size) || !leader_can_submit_larger_log_(get_max_log_id() + 1)) { ret = OB_EAGAIN; - PALF_LOG(WARN, "cannot submit new log now, try again", K(ret), K_(palf_id), K_(self), - K(valid_log_size), K(buf_len), "start_id", get_start_id(), "max_log_id", get_max_log_id()); + if (REACH_TIME_INTERVAL(1000 * 1000)) { + PALF_LOG(WARN, "cannot submit new log now, try again", K(ret), K_(palf_id), K_(self), + K(valid_log_size), K(buf_len), "start_id", get_start_id(), "max_log_id", get_max_log_id()); + } // sw_ cannot submit larger log } else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size, tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size))) { @@ -553,9 +557,10 @@ int LogSlidingWindow::wait_group_buffer_ready_(const LSN &lsn, const int64_t dat // NB: 尽管已经使用'committed_end_lsn_'限制了'leader_can_submit_new_log_', 但我们依旧需要判断'group_buffer_'是否已经可以复用: // 1. 并发提交日志会导致所有日志都能进入到提交流程; // 2. 不能够使用'committed_end_lsn_'判断'group_buffer_'是否可以被复用, 因为'committed_end_lsn_'可能大于'max_flushed_end_lsn'. - int tmp_ret = OB_SUCCESS; int64_t wait_times = 0; - while (OB_EAGAIN == (tmp_ret = group_buffer_.wait(lsn, data_len))) { + LSN curr_committed_end_lsn; + get_committed_end_lsn_(curr_committed_end_lsn); + while (false == group_buffer_.can_handle_new_log(lsn, data_len, curr_committed_end_lsn)) { // 要填充的终点超过了buffer可复用的范围 // 需要重试直到可复用终点推大 static const int64_t MAX_SLEEP_US = 100; @@ -565,9 +570,9 @@ int LogSlidingWindow::wait_group_buffer_ready_(const LSN &lsn, const int64_t dat sleep_us = MAX_SLEEP_US; } ob_usleep(sleep_us); - PALF_LOG(WARN, "usleep wait", K(tmp_ret), K_(palf_id), K_(self), K(lsn), K(data_len)); + PALF_LOG(WARN, "usleep wait", K_(palf_id), K_(self), K(lsn), K(data_len), K(curr_committed_end_lsn)); + get_committed_end_lsn_(curr_committed_end_lsn); } - ret = tmp_ret; return ret; } @@ -2816,7 +2821,9 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server, K_(last_truncate_lsn), K_(is_rebuilding), K(src_server), K(lsn), KP(buf), K(buf_len)); } else if (!group_buffer_.can_handle_new_log(lsn, buf_len)) { ret = OB_EAGAIN; - PALF_LOG(WARN, "group_buffer_ cannot handle new log", K(ret), K_(palf_id), K_(self), K(lsn)); + if (REACH_TIME_INTERVAL(1000 * 1000)) { + PALF_LOG(WARN, "group_buffer_ cannot handle new log", K(ret), K_(palf_id), K_(self), K(lsn)); + } } else if (OB_FAIL(group_entry_header.deserialize(buf, buf_len, pos))) { PALF_LOG(WARN, "group_entry_header deserialize failed", K(ret), K_(palf_id), K_(self)); } else if (!group_entry_header.check_integrity(buf + LogGroupEntryHeader::HEADER_SER_SIZE, diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 3e28af2772..c8c3260976 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -356,31 +356,33 @@ int PalfHandleImpl::submit_log( const int64_t curr_time_us = ObClockGenerator::getClock(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - PALF_LOG(WARN, "PalfHandleImpl is not inited", K(ret)); + PALF_LOG(WARN, "PalfHandleImpl is not inited"); } else if (NULL == buf || buf_len <= 0 || buf_len > MAX_LOG_BODY_SIZE || !ref_scn.is_valid() || ref_scn.convert_to_ts() > curr_time_us + MAX_ALLOWED_SKEW_FOR_REF_US) { ret = OB_INVALID_ARGUMENT; - PALF_LOG(WARN, "invalid argument", K(ret), K_(palf_id), KP(buf), K(buf_len), K(ref_scn)); + PALF_LOG(WARN, "invalid argument", K_(palf_id), KP(buf), K(buf_len), K(ref_scn)); } else { RLockGuard guard(lock_); if (false == palf_env_impl_->check_disk_space_enough()) { ret = OB_LOG_OUTOF_DISK_SPACE; if (palf_reach_time_interval(1 * 1000 * 1000, log_disk_full_warn_time_)) { - PALF_LOG(WARN, "log outof disk space", K(ret), KPC(this), K(opts), K(ref_scn)); + PALF_LOG(WARN, "log outof disk space", KPC(this), K(opts), K(ref_scn)); } } else if (!state_mgr_.can_append(opts.proposal_id, opts.need_check_proposal_id)) { ret = OB_NOT_MASTER; - PALF_LOG(WARN, "cannot submit_log", K(ret), KPC(this), KP(buf), K(buf_len), "role", + PALF_LOG(WARN, "cannot submit_log", KPC(this), KP(buf), K(buf_len), "role", state_mgr_.get_role(), "state", state_mgr_.get_state(), "proposal_id", state_mgr_.get_proposal_id(), K(opts), "mode_mgr can_append", mode_mgr_.can_append()); } else if (OB_UNLIKELY(state_mgr_.is_changing_config_with_arb())) { ret = OB_EAGAIN; if (palf_reach_time_interval(200 * 1000, chaning_config_warn_time_)) { - PALF_LOG(WARN, "can not submit log when memberlist is being changed", K(ret), KPC(this)); + PALF_LOG(WARN, "can not submit log when memberlist is being changed", KPC(this)); } } else if (OB_FAIL(sw_.submit_log(buf, buf_len, ref_scn, lsn, scn))) { - PALF_LOG(WARN, "submit_log failed", K(ret), KPC(this), KP(buf), K(buf_len)); + if (OB_EAGAIN != ret) { + PALF_LOG(WARN, "submit_log failed", KPC(this), KP(buf), K(buf_len)); + } } else { PALF_LOG(TRACE, "submit_log success", K(ret), KPC(this), K(buf_len), K(lsn), K(scn)); if (palf_reach_time_interval(2 * 1000 * 1000, append_size_stat_time_us_)) { diff --git a/unittest/logservice/test_log_group_buffer.cpp b/unittest/logservice/test_log_group_buffer.cpp index c8cd645467..3961a38fa3 100644 --- a/unittest/logservice/test_log_group_buffer.cpp +++ b/unittest/logservice/test_log_group_buffer.cpp @@ -133,25 +133,6 @@ TEST_F(TestLogGroupBuffer, test_get_log_buf) EXPECT_EQ(OB_SUCCESS, log_group_buffer_.get_log_buf(lsn, len, log_buf)); } -TEST_F(TestLogGroupBuffer, test_wait) -{ - LSN lsn; - int64_t len = 0; - EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.wait(lsn, len)); - LSN start_lsn(100); - EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn)); - EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.wait(lsn, len)); - lsn = start_lsn; - EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.wait(lsn, len)); - len = 100; - lsn.val_ = start_lsn.val_ - 1; - EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.wait(lsn, len)); - lsn.val_ = start_lsn.val_ + log_group_buffer_.get_available_buffer_size(); - EXPECT_EQ(OB_EAGAIN, log_group_buffer_.wait(lsn, len)); - lsn.val_ = start_lsn.val_ + 100; - EXPECT_EQ(OB_SUCCESS, log_group_buffer_.wait(lsn, len)); -} - TEST_F(TestLogGroupBuffer, test_fill) { LSN lsn;