From 9a13e1b8638e0ed5e974854f65d05892dcf53bd5 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 14 Mar 2023 17:10:55 +0000 Subject: [PATCH] Fix lsn or log_id exceeds upper bound bug. --- src/logservice/palf/log_group_buffer.h | 1 + src/logservice/palf/log_sliding_window.cpp | 19 +++++++++---- src/logservice/palf/log_sliding_window.h | 2 +- src/logservice/palf/lsn_allocator.cpp | 23 +++++++++++++-- src/logservice/palf/lsn_allocator.h | 2 ++ unittest/logservice/test_lsn_allocator.cpp | 33 +++++++++++++++------- 6 files changed, 61 insertions(+), 19 deletions(-) diff --git a/src/logservice/palf/log_group_buffer.h b/src/logservice/palf/log_group_buffer.h index cde36e38d..761bb453e 100644 --- a/src/logservice/palf/log_group_buffer.h +++ b/src/logservice/palf/log_group_buffer.h @@ -63,6 +63,7 @@ public: int inc_update_reuse_lsn(const LSN &new_reuse_lsn); // set reuse_lsn, used for truncate case(trucate/rebuild) int set_reuse_lsn(const LSN &new_reuse_lsn); + void get_reuse_lsn(LSN &reuse_lsn) const { return get_reuse_lsn_(reuse_lsn); } TO_STRING_KV("log_group_buffer: start_lsn", start_lsn_, "reuse_lsn", reuse_lsn_, "reserved_buffer_size", reserved_buffer_size_, "available_buffer_size", available_buffer_size_); private: diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 843a809b5..4c20be22b 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -330,7 +330,7 @@ bool LogSlidingWindow::leader_can_submit_larger_log_(const int64_t log_id) const return bool_ret; } -bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size) +bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size, LSN &lsn_upper_bound) { // Check whether leader can submit new log. // The valid_log_size does not consider group_header for generating new group log case. @@ -339,6 +339,13 @@ bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size) LSN curr_end_lsn; LSN curr_committed_end_lsn; get_committed_end_lsn_(curr_committed_end_lsn); + // calculate lsn_upper_bound + LSN buffer_reuse_lsn; + (void) group_buffer_.get_reuse_lsn(buffer_reuse_lsn); + const int64_t group_buffer_size = group_buffer_.get_available_buffer_size(); + LSN reuse_base_lsn = MIN(curr_committed_end_lsn, buffer_reuse_lsn); + lsn_upper_bound = reuse_base_lsn + group_buffer_size; + if (OB_SUCCESS != (tmp_ret = lsn_allocator_.get_curr_end_lsn(curr_end_lsn))) { PALF_LOG_RET(WARN, tmp_ret, "get_curr_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(valid_log_size)); // NB: 采用committed_lsn作为可复用起点的下界,避免写盘立即复用group_buffer导致follower的 @@ -419,13 +426,15 @@ int LogSlidingWindow::submit_log(const char *buf, int64_t padding_size = 0; // group log valid size (without padding part) const int64_t valid_log_size = LogEntryHeader::HEADER_SER_SIZE + buf_len; - LSN tmp_lsn; + const int64_t start_log_id = get_start_id(); + const int64_t log_id_upper_bound = start_log_id + PALF_MAX_LEADER_SUBMIT_LOG_COUNT - 1; + LSN tmp_lsn, lsn_upper_bound; if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (NULL == buf || buf_len <= 0 || buf_len > MAX_LOG_BODY_SIZE || (!ref_scn.is_valid())) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid arguments", K(ret), K_(palf_id), K_(self), K(buf_len), KP(buf)); - } else if (!leader_can_submit_new_log_(valid_log_size) + } else if (!leader_can_submit_new_log_(valid_log_size, lsn_upper_bound) || !leader_can_submit_larger_log_(get_max_log_id() + 1)) { ret = OB_EAGAIN; if (REACH_TIME_INTERVAL(1000 * 1000)) { @@ -433,7 +442,7 @@ int LogSlidingWindow::submit_log(const char *buf, K(valid_log_size), K(buf_len), "start_id", get_start_id(), "max_log_id", get_max_log_id()); } // sw_ cannot submit larger log - } else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size, + } else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size, log_id_upper_bound, lsn_upper_bound, tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size))) { PALF_LOG(WARN, "alloc_lsn_scn failed", K(ret), K_(palf_id), K_(self)); } else if (OB_FAIL(leader_wait_sw_slot_ready_(log_id))) { @@ -658,7 +667,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, PALF_LOG(WARN, "invalid argumetns", K(ret), K_(palf_id), K_(self), K(lsn), K(scn), K(log_id), K(log_body_size), K(log_type), KP(log_data), K(data_len)); } else if (OB_FAIL(guard.get_log_task(log_id, log_task))) { - PALF_LOG(WARN, "get_log_task_ failed", K(ret), K(log_id), K_(palf_id), K_(self)); + PALF_LOG(ERROR, "get_log_task_ failed", K(ret), K(log_id), K_(palf_id), K_(self), "start_log_id", get_start_id(), "max_log_id", get_max_log_id()); } else { LogEntryHeader log_entry_header; LogGroupEntryHeader header; diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index df754f930..820686183 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -273,7 +273,7 @@ private: int leader_wait_sw_slot_ready_(const int64_t log_id); bool can_receive_larger_log_(const int64_t log_id) const; bool leader_can_submit_larger_log_(const int64_t log_id) const; - bool leader_can_submit_new_log_(const int64_t valid_log_size); + bool leader_can_submit_new_log_(const int64_t valid_log_size, LSN &lsn_upper_bound); bool leader_can_submit_group_log_(const LSN &lsn, const int64_t group_log_size); void get_committed_end_lsn_(LSN &out_lsn) const; int get_max_flushed_log_info_(LSN &lsn, diff --git a/src/logservice/palf/lsn_allocator.cpp b/src/logservice/palf/lsn_allocator.cpp index 8bc6e233f..84c9e37b4 100644 --- a/src/logservice/palf/lsn_allocator.cpp +++ b/src/logservice/palf/lsn_allocator.cpp @@ -288,6 +288,8 @@ int LSNAllocator::try_freeze(LSN &last_lsn, int64_t &last_log_id) int LSNAllocator::alloc_lsn_scn(const SCN &base_scn, const int64_t size, // 已包含LogHeader size + const int64_t log_id_upper_bound, + const LSN &lsn_upper_bound, LSN &lsn, int64_t &log_id, SCN &scn, @@ -298,9 +300,9 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn, int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (size <= 0 || !base_scn.is_valid()) { + } else if (size <= 0 || !base_scn.is_valid() || log_id_upper_bound <= 0 || !lsn_upper_bound.is_valid()) { ret = OB_INVALID_ARGUMENT; - PALF_LOG(WARN, "invalid arguments", K(ret), K(base_scn), K(size)); + PALF_LOG(WARN, "invalid arguments", K(ret), K(base_scn), K(size), K(log_id_upper_bound), K(lsn_upper_bound)); } else { // 生成新日志时需加上log_group_entry_header的size const int64_t new_group_log_size = size + LogGroupEntryHeader::HEADER_SER_SIZE; @@ -448,7 +450,22 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn, next.log_id_delta_ = tmp_next_log_id_delta; next.scn_delta_ = tmp_next_scn_delta; - if (CAS128(&lsn_ts_meta_, last, next)) { + int64_t new_log_id = is_new_group_log ? (last_log_id + 1) : last_log_id; + if (need_gen_padding_entry) { + // Padding entry also consumes one log_id. + new_log_id++; + } + LSN new_max_lsn; + new_max_lsn.val_ = next.lsn_val_; + if (new_log_id > log_id_upper_bound || new_max_lsn > lsn_upper_bound) { + ret = OB_EAGAIN; + if (REACH_TIME_INTERVAL(100 * 1000)) { + PALF_LOG(INFO, "log_id or lsn will exceed upper bound, need retry", K(ret), K(size), + K(new_log_id), K(new_max_lsn), K(is_new_group_log), K(need_gen_padding_entry), + K(log_id_upper_bound), K(lsn_upper_bound)); + } + break; + } else if (CAS128(&lsn_ts_meta_, last, next)) { lsn.val_ = last.lsn_val_; if (is_new_group_log) { log_id = last_log_id + 1; diff --git a/src/logservice/palf/lsn_allocator.h b/src/logservice/palf/lsn_allocator.h index e3400def8..a5a07a885 100644 --- a/src/logservice/palf/lsn_allocator.h +++ b/src/logservice/palf/lsn_allocator.h @@ -57,6 +57,8 @@ public: // int alloc_lsn_scn(const share::SCN &base_scn, const int64_t size, + const int64_t log_id_upper_bound, + const LSN &lsn_upper_bound, LSN &lsn, int64_t &log_id, share::SCN &scn, diff --git a/unittest/logservice/test_lsn_allocator.cpp b/unittest/logservice/test_lsn_allocator.cpp index d236e7966..bd1b847d0 100644 --- a/unittest/logservice/test_lsn_allocator.cpp +++ b/unittest/logservice/test_lsn_allocator.cpp @@ -182,14 +182,20 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_alloc_lsn_scn) bool need_gen_padding_entry = false; int64_t padding_len = 0; - EXPECT_EQ(OB_NOT_INIT, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn, + int64_t log_id_upper_bound = 99999; + LSN lsn_upper_bound(9999999999); + + EXPECT_EQ(OB_NOT_INIT, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); EXPECT_EQ(OB_SUCCESS, lsn_allocator_.init(initial_log_id, initial_scn, start_lsn)); int64_t invalid_size = 0; - EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, invalid_size, lsn, log_id, scn, + EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, invalid_size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); + EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, size, 0, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); + LSN invalid_lsn; + EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, size, 1, invalid_lsn, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); // test alloc_lsn_scn() - EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn, + EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); EXPECT_EQ(initial_log_id + 1, log_id); } @@ -232,7 +238,9 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_truncate) EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.truncate(tmp_lsn, truncate_log_id, new_scn)); tmp_lsn.val_ = 100; EXPECT_EQ(OB_SUCCESS, lsn_allocator_.truncate(tmp_lsn, truncate_log_id, new_scn)); - EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn, + int64_t log_id_upper_bound = 99999; + LSN lsn_upper_bound(9999999999); + EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); EXPECT_EQ(truncate_log_id + 1, log_id); // test truncate() @@ -240,14 +248,14 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_truncate) EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.inc_update_last_log_info(tmp_lsn, tmp_log_id, tmp_scn)); tmp_lsn.val_ = 10; // no need update EXPECT_EQ(OB_SUCCESS, lsn_allocator_.inc_update_last_log_info(tmp_lsn, tmp_log_id, tmp_scn)); - EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn, + EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); EXPECT_EQ(truncate_log_id + 1, log_id); // 聚合到上一条日志中 // update success tmp_lsn.val_ = 10000000; EXPECT_EQ(OB_SUCCESS, lsn_allocator_.inc_update_last_log_info(tmp_lsn, tmp_log_id, tmp_scn)); size = 2 * 1024 * 1024; - EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn, + EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); EXPECT_EQ(tmp_log_id + 1, log_id); @@ -255,7 +263,7 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_truncate) EXPECT_EQ(OB_STATE_NOT_MATCH, lsn_allocator_.try_freeze_by_time(end_lsn, end_log_id)); // 生成一条新的小日志,预期is_need_cut会为false size = 10; - EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn, + EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len)); EXPECT_EQ(log_id, lsn_allocator_.get_max_log_id()); @@ -277,10 +285,13 @@ TEST_F(TestLSNAllocator, test_alloc_offset_single_thread) int64_t avg_cost = 0; int64_t ROUND = 1; + int64_t count = 1000000; + int64_t log_id_upper_bound = 999999999; + LSN lsn_upper_bound(ROUND * count * MAX_LOG_BUFFER_SIZE); for (int64_t j = 0; j < ROUND; j++) { int64_t idx = rand() % LOG_LOG_CNT; const int64_t begin_ts = ObTimeUtility::current_time_ns(); - for (int i = 0; i < 1000000; i++) { + for (int i = 0; i < count; i++) { share::SCN b_scn = share::SCN::base_scn(); int64_t size = log_size_array[idx]; LSN ret_offset; @@ -290,7 +301,7 @@ TEST_F(TestLSNAllocator, test_alloc_offset_single_thread) bool need_gen_padding_entry = false; int64_t padding_len = 0; - EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(b_scn, size, ret_offset, ret_log_id, ret_scn, + EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, ret_offset, ret_log_id, ret_scn, is_new_log, need_gen_padding_entry, padding_len)); } int64_t cost = ObTimeUtility::current_time_ns() - begin_ts; @@ -335,12 +346,14 @@ public: bool is_new_log = false; bool need_gen_padding_entry = false; int64_t padding_len = 0; + int64_t log_id_upper_bound = 9999999; + LSN lsn_upper_bound(999999 * MAX_LOG_BUFFER_SIZE); for (int j = 0; j < 1; j++) { const int64_t begin_ts = ObTimeUtility::current_time_ns(); for (int i = 0; i < 1000; i++) { // size = log_size_array[i % LOG_LOG_CNT]; - EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(scn, size, ret_offset, ret_log_id, ret_scn, + EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(scn, size, log_id_upper_bound, lsn_upper_bound, ret_offset, ret_log_id, ret_scn, is_new_log, need_gen_padding_entry, padding_len)); } int64_t cost = ObTimeUtility::current_time_ns() - begin_ts;