Fix lsn or log_id exceeds upper bound bug.
This commit is contained in:
parent
b5b1d2d57f
commit
9a13e1b863
@ -63,6 +63,7 @@ public:
|
||||
int inc_update_reuse_lsn(const LSN &new_reuse_lsn);
|
||||
// set reuse_lsn, used for truncate case(trucate/rebuild)
|
||||
int set_reuse_lsn(const LSN &new_reuse_lsn);
|
||||
void get_reuse_lsn(LSN &reuse_lsn) const { return get_reuse_lsn_(reuse_lsn); }
|
||||
TO_STRING_KV("log_group_buffer: start_lsn", start_lsn_, "reuse_lsn", reuse_lsn_, "reserved_buffer_size",
|
||||
reserved_buffer_size_, "available_buffer_size", available_buffer_size_);
|
||||
private:
|
||||
|
@ -330,7 +330,7 @@ bool LogSlidingWindow::leader_can_submit_larger_log_(const int64_t log_id) const
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size)
|
||||
bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size, LSN &lsn_upper_bound)
|
||||
{
|
||||
// Check whether leader can submit new log.
|
||||
// The valid_log_size does not consider group_header for generating new group log case.
|
||||
@ -339,6 +339,13 @@ bool LogSlidingWindow::leader_can_submit_new_log_(const int64_t valid_log_size)
|
||||
LSN curr_end_lsn;
|
||||
LSN curr_committed_end_lsn;
|
||||
get_committed_end_lsn_(curr_committed_end_lsn);
|
||||
// calculate lsn_upper_bound
|
||||
LSN buffer_reuse_lsn;
|
||||
(void) group_buffer_.get_reuse_lsn(buffer_reuse_lsn);
|
||||
const int64_t group_buffer_size = group_buffer_.get_available_buffer_size();
|
||||
LSN reuse_base_lsn = MIN(curr_committed_end_lsn, buffer_reuse_lsn);
|
||||
lsn_upper_bound = reuse_base_lsn + group_buffer_size;
|
||||
|
||||
if (OB_SUCCESS != (tmp_ret = lsn_allocator_.get_curr_end_lsn(curr_end_lsn))) {
|
||||
PALF_LOG_RET(WARN, tmp_ret, "get_curr_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(valid_log_size));
|
||||
// NB: 采用committed_lsn作为可复用起点的下界,避免写盘立即复用group_buffer导致follower的
|
||||
@ -419,13 +426,15 @@ int LogSlidingWindow::submit_log(const char *buf,
|
||||
int64_t padding_size = 0;
|
||||
// group log valid size (without padding part)
|
||||
const int64_t valid_log_size = LogEntryHeader::HEADER_SER_SIZE + buf_len;
|
||||
LSN tmp_lsn;
|
||||
const int64_t start_log_id = get_start_id();
|
||||
const int64_t log_id_upper_bound = start_log_id + PALF_MAX_LEADER_SUBMIT_LOG_COUNT - 1;
|
||||
LSN tmp_lsn, lsn_upper_bound;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (NULL == buf || buf_len <= 0 || buf_len > MAX_LOG_BODY_SIZE || (!ref_scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid arguments", K(ret), K_(palf_id), K_(self), K(buf_len), KP(buf));
|
||||
} else if (!leader_can_submit_new_log_(valid_log_size)
|
||||
} else if (!leader_can_submit_new_log_(valid_log_size, lsn_upper_bound)
|
||||
|| !leader_can_submit_larger_log_(get_max_log_id() + 1)) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
@ -433,7 +442,7 @@ int LogSlidingWindow::submit_log(const char *buf,
|
||||
K(valid_log_size), K(buf_len), "start_id", get_start_id(), "max_log_id", get_max_log_id());
|
||||
}
|
||||
// sw_ cannot submit larger log
|
||||
} else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size,
|
||||
} else if (OB_FAIL(lsn_allocator_.alloc_lsn_scn(ref_scn, valid_log_size, log_id_upper_bound, lsn_upper_bound,
|
||||
tmp_lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_size))) {
|
||||
PALF_LOG(WARN, "alloc_lsn_scn failed", K(ret), K_(palf_id), K_(self));
|
||||
} else if (OB_FAIL(leader_wait_sw_slot_ready_(log_id))) {
|
||||
@ -658,7 +667,7 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
|
||||
PALF_LOG(WARN, "invalid argumetns", K(ret), K_(palf_id), K_(self), K(lsn), K(scn), K(log_id), K(log_body_size),
|
||||
K(log_type), KP(log_data), K(data_len));
|
||||
} else if (OB_FAIL(guard.get_log_task(log_id, log_task))) {
|
||||
PALF_LOG(WARN, "get_log_task_ failed", K(ret), K(log_id), K_(palf_id), K_(self));
|
||||
PALF_LOG(ERROR, "get_log_task_ failed", K(ret), K(log_id), K_(palf_id), K_(self), "start_log_id", get_start_id(), "max_log_id", get_max_log_id());
|
||||
} else {
|
||||
LogEntryHeader log_entry_header;
|
||||
LogGroupEntryHeader header;
|
||||
|
@ -273,7 +273,7 @@ private:
|
||||
int leader_wait_sw_slot_ready_(const int64_t log_id);
|
||||
bool can_receive_larger_log_(const int64_t log_id) const;
|
||||
bool leader_can_submit_larger_log_(const int64_t log_id) const;
|
||||
bool leader_can_submit_new_log_(const int64_t valid_log_size);
|
||||
bool leader_can_submit_new_log_(const int64_t valid_log_size, LSN &lsn_upper_bound);
|
||||
bool leader_can_submit_group_log_(const LSN &lsn, const int64_t group_log_size);
|
||||
void get_committed_end_lsn_(LSN &out_lsn) const;
|
||||
int get_max_flushed_log_info_(LSN &lsn,
|
||||
|
@ -288,6 +288,8 @@ int LSNAllocator::try_freeze(LSN &last_lsn, int64_t &last_log_id)
|
||||
|
||||
int LSNAllocator::alloc_lsn_scn(const SCN &base_scn,
|
||||
const int64_t size, // 已包含LogHeader size
|
||||
const int64_t log_id_upper_bound,
|
||||
const LSN &lsn_upper_bound,
|
||||
LSN &lsn,
|
||||
int64_t &log_id,
|
||||
SCN &scn,
|
||||
@ -298,9 +300,9 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn,
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (size <= 0 || !base_scn.is_valid()) {
|
||||
} else if (size <= 0 || !base_scn.is_valid() || log_id_upper_bound <= 0 || !lsn_upper_bound.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid arguments", K(ret), K(base_scn), K(size));
|
||||
PALF_LOG(WARN, "invalid arguments", K(ret), K(base_scn), K(size), K(log_id_upper_bound), K(lsn_upper_bound));
|
||||
} else {
|
||||
// 生成新日志时需加上log_group_entry_header的size
|
||||
const int64_t new_group_log_size = size + LogGroupEntryHeader::HEADER_SER_SIZE;
|
||||
@ -448,7 +450,22 @@ int LSNAllocator::alloc_lsn_scn(const SCN &base_scn,
|
||||
next.log_id_delta_ = tmp_next_log_id_delta;
|
||||
next.scn_delta_ = tmp_next_scn_delta;
|
||||
|
||||
if (CAS128(&lsn_ts_meta_, last, next)) {
|
||||
int64_t new_log_id = is_new_group_log ? (last_log_id + 1) : last_log_id;
|
||||
if (need_gen_padding_entry) {
|
||||
// Padding entry also consumes one log_id.
|
||||
new_log_id++;
|
||||
}
|
||||
LSN new_max_lsn;
|
||||
new_max_lsn.val_ = next.lsn_val_;
|
||||
if (new_log_id > log_id_upper_bound || new_max_lsn > lsn_upper_bound) {
|
||||
ret = OB_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(100 * 1000)) {
|
||||
PALF_LOG(INFO, "log_id or lsn will exceed upper bound, need retry", K(ret), K(size),
|
||||
K(new_log_id), K(new_max_lsn), K(is_new_group_log), K(need_gen_padding_entry),
|
||||
K(log_id_upper_bound), K(lsn_upper_bound));
|
||||
}
|
||||
break;
|
||||
} else if (CAS128(&lsn_ts_meta_, last, next)) {
|
||||
lsn.val_ = last.lsn_val_;
|
||||
if (is_new_group_log) {
|
||||
log_id = last_log_id + 1;
|
||||
|
@ -57,6 +57,8 @@ public:
|
||||
//
|
||||
int alloc_lsn_scn(const share::SCN &base_scn,
|
||||
const int64_t size,
|
||||
const int64_t log_id_upper_bound,
|
||||
const LSN &lsn_upper_bound,
|
||||
LSN &lsn,
|
||||
int64_t &log_id,
|
||||
share::SCN &scn,
|
||||
|
@ -182,14 +182,20 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_alloc_lsn_scn)
|
||||
bool need_gen_padding_entry = false;
|
||||
int64_t padding_len = 0;
|
||||
|
||||
EXPECT_EQ(OB_NOT_INIT, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn,
|
||||
int64_t log_id_upper_bound = 99999;
|
||||
LSN lsn_upper_bound(9999999999);
|
||||
|
||||
EXPECT_EQ(OB_NOT_INIT, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.init(initial_log_id, initial_scn, start_lsn));
|
||||
int64_t invalid_size = 0;
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, invalid_size, lsn, log_id, scn,
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, invalid_size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, size, 0, lsn_upper_bound, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len));
|
||||
LSN invalid_lsn;
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.alloc_lsn_scn(b_scn, size, 1, invalid_lsn, lsn, log_id, scn, is_new_log, need_gen_padding_entry, padding_len));
|
||||
// test alloc_lsn_scn()
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn,
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
EXPECT_EQ(initial_log_id + 1, log_id);
|
||||
}
|
||||
@ -232,7 +238,9 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_truncate)
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.truncate(tmp_lsn, truncate_log_id, new_scn));
|
||||
tmp_lsn.val_ = 100;
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.truncate(tmp_lsn, truncate_log_id, new_scn));
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn,
|
||||
int64_t log_id_upper_bound = 99999;
|
||||
LSN lsn_upper_bound(9999999999);
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
EXPECT_EQ(truncate_log_id + 1, log_id);
|
||||
// test truncate()
|
||||
@ -240,14 +248,14 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_truncate)
|
||||
EXPECT_EQ(OB_INVALID_ARGUMENT, lsn_allocator_.inc_update_last_log_info(tmp_lsn, tmp_log_id, tmp_scn));
|
||||
tmp_lsn.val_ = 10; // no need update
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.inc_update_last_log_info(tmp_lsn, tmp_log_id, tmp_scn));
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn,
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
EXPECT_EQ(truncate_log_id + 1, log_id); // 聚合到上一条日志中
|
||||
// update success
|
||||
tmp_lsn.val_ = 10000000;
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.inc_update_last_log_info(tmp_lsn, tmp_log_id, tmp_scn));
|
||||
size = 2 * 1024 * 1024;
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn,
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
EXPECT_EQ(tmp_log_id + 1, log_id);
|
||||
|
||||
@ -255,7 +263,7 @@ TEST_F(TestLSNAllocator, test_lsn_allocator_truncate)
|
||||
EXPECT_EQ(OB_STATE_NOT_MATCH, lsn_allocator_.try_freeze_by_time(end_lsn, end_log_id));
|
||||
// 生成一条新的小日志,预期is_need_cut会为false
|
||||
size = 10;
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, lsn, log_id, scn,
|
||||
EXPECT_EQ(OB_SUCCESS, lsn_allocator_.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, lsn, log_id, scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
|
||||
EXPECT_EQ(log_id, lsn_allocator_.get_max_log_id());
|
||||
@ -277,10 +285,13 @@ TEST_F(TestLSNAllocator, test_alloc_offset_single_thread)
|
||||
|
||||
int64_t avg_cost = 0;
|
||||
int64_t ROUND = 1;
|
||||
int64_t count = 1000000;
|
||||
int64_t log_id_upper_bound = 999999999;
|
||||
LSN lsn_upper_bound(ROUND * count * MAX_LOG_BUFFER_SIZE);
|
||||
for (int64_t j = 0; j < ROUND; j++) {
|
||||
int64_t idx = rand() % LOG_LOG_CNT;
|
||||
const int64_t begin_ts = ObTimeUtility::current_time_ns();
|
||||
for (int i = 0; i < 1000000; i++) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
share::SCN b_scn = share::SCN::base_scn();
|
||||
int64_t size = log_size_array[idx];
|
||||
LSN ret_offset;
|
||||
@ -290,7 +301,7 @@ TEST_F(TestLSNAllocator, test_alloc_offset_single_thread)
|
||||
bool need_gen_padding_entry = false;
|
||||
int64_t padding_len = 0;
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(b_scn, size, ret_offset, ret_log_id, ret_scn,
|
||||
EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(b_scn, size, log_id_upper_bound, lsn_upper_bound, ret_offset, ret_log_id, ret_scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
}
|
||||
int64_t cost = ObTimeUtility::current_time_ns() - begin_ts;
|
||||
@ -335,12 +346,14 @@ public:
|
||||
bool is_new_log = false;
|
||||
bool need_gen_padding_entry = false;
|
||||
int64_t padding_len = 0;
|
||||
int64_t log_id_upper_bound = 9999999;
|
||||
LSN lsn_upper_bound(999999 * MAX_LOG_BUFFER_SIZE);
|
||||
|
||||
for (int j = 0; j < 1; j++) {
|
||||
const int64_t begin_ts = ObTimeUtility::current_time_ns();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
// size = log_size_array[i % LOG_LOG_CNT];
|
||||
EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(scn, size, ret_offset, ret_log_id, ret_scn,
|
||||
EXPECT_EQ(OB_SUCCESS, golbal_lsn_allocator.alloc_lsn_scn(scn, size, log_id_upper_bound, lsn_upper_bound, ret_offset, ret_log_id, ret_scn,
|
||||
is_new_log, need_gen_padding_entry, padding_len));
|
||||
}
|
||||
int64_t cost = ObTimeUtility::current_time_ns() - begin_ts;
|
||||
|
Loading…
x
Reference in New Issue
Block a user