fix: read log after flashback may hang

HaHaJeff 2023-03-04 01:43:38 +00:00 committed by ob-robot
parent 9eba955ac6
commit f8e03ff190
3 changed files with 792 additions and 200 deletions
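For orientation (not part of the diff), a minimal sketch of the consuming side this fix matters for. drive_iterator and min_unreplayed_scn are hypothetical names; the next()/get_entry() signatures are the ones exercised by the tests below. If next_min_scn stops advancing after a flashback, the consumer's progress point stalls, which is the hang this commit addresses.

// Illustrative sketch only: a replay-style consumer driving a PalfBufferIterator.
int drive_iterator(PalfBufferIterator &iterator, const SCN &replayable_point_scn, SCN &min_unreplayed_scn)
{
  int ret = OB_SUCCESS;
  SCN next_min_scn;
  bool iterate_end_by_replayable_point = false;
  while (OB_SUCCESS == ret) {
    if (OB_SUCCESS != (ret = iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
      if (OB_ITER_END == ret && next_min_scn.is_valid()) {
        // No readable entry for now; next_min_scn is the smallest scn a future log can carry,
        // so the consumer can still advance its unreplayed point instead of stalling.
        min_unreplayed_scn = next_min_scn;
      }
    } else {
      const char *buf = NULL;
      int64_t buf_len = 0;
      SCN scn;
      LSN lsn;
      bool is_raw_write = false;
      if (OB_SUCCESS == (ret = iterator.get_entry(buf, buf_len, scn, lsn, is_raw_write))) {
        min_unreplayed_scn = SCN::plus(scn, 1);  // apply the entry, then record progress
      }
    }
  }
  return ret;
}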


@ -897,9 +897,8 @@ int ObSimpleLogClusterTestEnv::read_log(PalfHandleImplGuard &leader, const LSN &
bool is_raw_write = false;
if (OB_FAIL(iterator.next())) {
} else if (OB_FAIL(iterator.get_entry(buf, buf_len, scn, log_offset, is_raw_write))) {
} else if (true == is_raw_write){
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "the log mustn't be raw write", K(ret), K(iterator), K(log_offset));
} else {
PALF_LOG(TRACE, "print log entry", K(is_raw_write));
}
}
}
@ -920,6 +919,8 @@ int ObSimpleLogClusterTestEnv::read_group_log(PalfHandleImplGuard &leader, LSN l
} else if (iterator.iterator_impl_.curr_entry_is_raw_write_ != entry.get_header().is_raw_write()) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "raw write not match, unexpected error", K(ret), K(iterator));
} else {
PALF_LOG(TRACE, "print log group entry", K(entry));
}
}
}

View File

@ -662,7 +662,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
// case1:
// - verify whether the cache is cleared after mode_version changes
// - verify whether replayable_point_scn takes effect
// when the mode version changes, the cache is expected to be cleared
// in raw write mode, when replayable_point_scn is very small, OB_ITER_END is returned directly
PALF_LOG(INFO, "runlin trace case1", K(mode_version_v), K(*mode_version), K(max_scn_case1));
// when mode_version_v is an invalid value, the cache is expected not to be cleared
@ -721,19 +721,22 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
PALF_LOG(INFO, "runlin trace case3", K(iterator), K(max_scn_case3), K(end_lsn_v), K(max_scn_case2));
SCN first_scn = logts[0];
// when using the next(replayable_point_scn, &next_log_min_scn) interface,
// we forbid using the head of a LogEntry as the iterator start point
LSN first_log_start_lsn = lsns[0];
// we forbid using the head of a LogEntry as the iterator start point
LSN first_log_start_lsn = lsns[0] - sizeof(LogGroupEntryHeader);
LSN first_log_end_lsn = lsns[0]+log_size+sizeof(LogEntryHeader);
SCN next_log_min_scn;
bool iterate_end_by_replayable_point = false;
count = 5;
// simulate reaching the file end early; curr_entry_size is 0 at this point, so next_log_min_scn is max_scn_case2+1
// simulate reaching the file end early; no new log has been read, so next_log_min_scn is prev_entry_scn_+1
end_lsn_v = first_log_start_lsn - 1;
CLOG_LOG(INFO, "runlin trace 1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(max_scn_case2), K(first_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::plus(first_scn, 10000), next_log_min_scn, iterate_end_by_replayable_point));
// although file_end_lsn has been rolled back, curr_entry_ has not been read yet, so next_log_min_scn is still first_scn
EXPECT_EQ(SCN::plus(max_scn_case2, 1), next_log_min_scn);
EXPECT_EQ(SCN::plus(iterator.iterator_impl_.prev_entry_scn_, 1), next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
CLOG_LOG(INFO, "runlin trace 3.1", K(iterator), K(end_lsn_v), KPC(end_lsn));
EXPECT_EQ(first_log_start_lsn,
iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
// after one log is read successfully, next_log_min_scn is reset
// curr_entry is the log corresponding to first_log_ts
@ -742,10 +745,13 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
EXPECT_EQ(OB_SUCCESS, iterator.next(first_scn, next_log_min_scn, iterate_end_by_replayable_point)); count--;
// the iterator returns success, so next_log_min_scn should be OB_INVALID_TIMESTAMP
EXPECT_EQ(next_log_min_scn.is_valid(), false);
// prev_entry_scn_ in the iterator is set to first_scn
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, first_scn);
CLOG_LOG(INFO, "runlin trace 3", K(iterator), K(end_lsn_v), KPC(end_lsn));
{
// simulate reaching the file end early; the file end is file_log_end_lsn, and next_log_min_scn is expected to be the scn of the log corresponding to first_scn + 1
// simulate reaching the file end early; the file end is file_log_end_lsn
// next_log_min_scn is expected to be the scn of the log corresponding to first_scn + 1
SCN second_scn = logts[1];
EXPECT_EQ(OB_ITER_END, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point));
// the iterator returns OB_ITER_END, and next_log_min_scn is first_scn+1
@ -769,6 +775,8 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
// the iterator returns OB_ITER_END, and next_log_min_scn is replayable_point_scn + 1
PALF_LOG(INFO, "runlin trace 4.1", K(replayable_point_scn), K(next_log_min_scn),
K(iterator));
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
// call next again; next_log_min_scn is still expected to be replayable_point_scn+1
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
@ -782,33 +790,42 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
}
// simulate a file end lsn that is not the end of a group entry
{
{
// set the end point to the start of the third log's LogEntry
end_lsn_v = lsns[2]+10;
// set the timestamp to that of the third log
SCN third_scn = logts[2];
SCN replayable_point_scn = SCN::plus(third_scn, 10);
CLOG_LOG(INFO, "runlin trace 5.1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn));
// the log cached in memory is now the third log; the iterator has read a new log, but that log is unreadable because of end_lsn (since the log is not under controlled replay, curr_read_pos_ is advanced by 56)
// so next_log_min_scn is set to third_scn
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
CLOG_LOG(INFO, "runlin trace 5.1.1", K(iterator), K(next_log_min_scn), K(replayable_point_scn));
EXPECT_EQ(next_log_min_scn, third_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
// set the end point to the start of the third log's LogEntry
end_lsn_v = lsns[2];
SCN third_scn = logts[2];
SCN replayable_point_scn = third_scn.minus(third_scn, 1);
CLOG_LOG(INFO, "runlin trace 5.1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn));
// the log cached in memory is now the third log, but it is unreadable because of end_lsn,
// so next_log_min_scn is replayable_point_scn+1
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, false);
// since there cannot be any log between replayable_point_scn and curr_entry_, and replayable_point_scn < curr_entry_,
// prev_entry_scn_ is advanced to replayable_point_scn
prev_next_success_scn = replayable_point_scn;
EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
// verify that the third log cannot be emitted because of controlled replay (a rollback of replayable_point_scn cannot happen in practice; it is simulated deliberately for the test)
replayable_point_scn = SCN::minus(third_scn, 4);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
// since there cannot be any log between replayable_point_scn and curr_entry_, and replayable_point_scn < curr_entry_,
// since prev_entry_scn_ is now the timestamp of the second log, which is smaller than replayable_point_scn,
// next_min_scn is set to the minimum of the scn of curr_entry_ and replayable_point_scn+1,
// so prev_entry_scn_ is advanced to replayable_point_scn+1
// also, since prev_entry_scn_ is smaller than replayable_point_scn and there is no log between replayable_point_scn and prev_entry_scn_,
// prev_entry_scn_ is advanced to replayable_point_scn
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
prev_next_success_scn = replayable_point_scn;
EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
CLOG_LOG(INFO, "runlin trace 5.2", K(iterator), K(end_lsn_v), KPC(end_lsn));
CLOG_LOG(INFO, "runlin trace 5.2", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn));
// make replayable_point_scn smaller; since the last step of case4 iterated a log successfully, next_log_min_scn is
// prev_next_success_scn + 1
replayable_point_scn = SCN::minus(replayable_point_scn, 2);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(SCN::plus(replayable_point_scn, 2), iterator.iterator_impl_.prev_entry_scn_);
EXPECT_EQ(iterate_end_by_replayable_point, false);
}
// make replayable_point_scn smaller; the iterator will set next_min_scn to prev_next_success_scn + 1
replayable_point_scn = SCN::minus(replayable_point_scn, 2);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
}
end_lsn_v = LSN(1000000000);
while (count > 0) {
@ -817,6 +834,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
EXPECT_EQ(false, next_log_min_scn.is_valid());
count--;
}
CLOG_LOG(INFO, "runlin trace 6.1", K(iterator), K(end_lsn_v), K(max_scn_case3));
// there is no readable log on disk or after the controlled replay point, so the controlled replay point + 1 should be returned
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case3, next_log_min_scn, iterate_end_by_replayable_point));
@ -842,7 +860,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
PALF_LOG(INFO, "runlin trace 6.4", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(),
"new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
// case 7: after end_lsn_v is set to a very large value, keep 2M of data in memory; iterator next is expected to fail because of controlled replay, and prev_entry_scn_ stays unchanged
// replayable_point_scn is the timestamp of the first log - 2, and next_log_min_scn is the timestamp of the first appended LogEntry
// NB: if the data is not read into memory first, reading it may fail with OB_NEED_RETRY.
end_lsn_v = LSN(1000000000);
@ -852,11 +870,11 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
end_lsn_v = lsns_append[1]+2;
// curr_entry_ is now the second log; curr_entry is valid but unreadable
// simulate replayable_point_scn being greater than curr_entry_
PALF_LOG(INFO, "runlin trace 7.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]));
replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
// curr_entry_ is now the second log; curr_entry is valid but unreadable because of the file end lsn
// controlled replay does not apply to append logs
replayable_point_scn = SCN::plus(raw_write_leader.palf_handle_impl_->get_max_scn(), 1000000);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
PALF_LOG(INFO, "runlin trace 7.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(replayable_point_scn));
EXPECT_EQ(next_log_min_scn, logts_append[1]);
EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
EXPECT_EQ(iterate_end_by_replayable_point, false);
@ -966,28 +984,36 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
// advance the controlled replay point to the first log; end_lsn_v also becomes the start of the first log, so the log is unreadable because of end_lsn_v
// next_log_min_scn is expected to be prev_next_success_scn+1; since there may be logs between prev_next_success_scn
// and replayable_point_scn, next_log_min_scn cannot be set to replayable_point_scn+1
//
// simulate no log after prev_entry_, with replayable_point_scn greater than prev_entry_scn_
end_lsn_v = lsns_append[0];
replayable_point_scn = logts_append[0];
PALF_LOG(INFO, "runlin trace 8.4", K(iterator), K(replayable_point_scn), K(end_lsn_v),
PALF_LOG(INFO, "runlin trace 8.3.1", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
// advance the controlled replay point to the first log; end_lsn_v also becomes the start of the first log, so the log is unreadable because of end_lsn_v
// next_min_scn is expected to be replayable_point_scn.
// since this log did not need controlled replay in the previous next, curr_read_pos_ was advanced to the LogEntry header, so the next call returns OB_ITER_END directly without reading data
end_lsn_v = lsns_append[0]+10;
replayable_point_scn = logts_append[0];
PALF_LOG(INFO, "runlin trace 8.4", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(replayable_point_scn, next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
// simulate no log after prev_entry_, with replayable_point_scn smaller than prev_entry_scn_
PALF_LOG(INFO, "runlin trace 8.4.1", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(replayable_point_scn, next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
PALF_LOG(INFO, "runlin trace 8.4.2", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
// simulate no log after prev_entry_, with replayable_point_scn smaller than prev_entry_scn_ and all subsequent logs requiring controlled replay
// a rollback of replayable_point_scn never happens in practice; in this case next_min_scn returns prev_entry_scn_+1
replayable_point_scn = SCN::minus(prev_next_success_scn, 100);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
@ -1116,6 +1142,29 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_gc_block)
EXPECT_EQ(expect_scn, min_block_scn);
}
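// Test helper used below: a LogIOTask whose do_task_ blocks on cond_ so the log IO worker stalls,
// letting several subsequently submitted entries accumulate into a single LogGroupEntry until the
// test releases the worker via cond_.signal().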
class IOTaskCond : public LogIOTask {
public:
IOTaskCond(const int64_t palf_id, const int64_t palf_epoch) : LogIOTask(palf_id, palf_epoch) {}
virtual int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final
{
PALF_LOG(INFO, "before cond_wait");
cond_.wait();
PALF_LOG(INFO, "after cond_wait");
return OB_SUCCESS;
};
virtual int after_consume_(IPalfEnvImpl *palf_env_impl) override final
{
return OB_SUCCESS;
}
virtual LogIOTaskType get_io_task_type_() const { return LogIOTaskType::FLUSH_META_TYPE; }
int init(int64_t palf_id)
{
palf_id_ = palf_id;
return OB_SUCCESS;
};
virtual void free_this_(IPalfEnvImpl *impl) {UNUSED(impl);}
ObCond cond_;
};
TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_iterator_with_flashback");
@ -1130,49 +1179,445 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, 200));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200));
SCN max_scn1 = leader.palf_handle_impl_->get_max_scn();
LSN max_lsn1 = leader.palf_handle_impl_->get_max_lsn();
sleep(2);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, 200));
LSN end_pos_of_log1 = leader.palf_handle_impl_->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
SCN max_scn2 = leader.palf_handle_impl_->get_max_scn();
LSN end_pos_of_log2 = leader.palf_handle_impl_->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
// submit several logs to the raw_write leader
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator));
// iterating logs before the flashback succeeds
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1));
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1));
PALF_LOG(INFO, "runlin trace case1", K(iterator));
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn1));
// iterating logs before the flashback succeeds
SCN next_min_scn;
SCN tmp_scn; tmp_scn.val_ = 1000;
bool iterate_end_by_replayable_point = false;
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn1);
EXPECT_EQ(OB_ITER_END, iterator.next(
max_scn1, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
EXPECT_EQ(SCN::plus(max_scn1, 1), next_min_scn);
PALF_LOG(INFO, "runlin trace case1", K(iterator), K(end_pos_of_log1));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->inner_flashback(max_scn1));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->inner_flashback(max_scn2));
EXPECT_EQ(max_scn1, raw_write_leader.palf_handle_impl_->get_max_scn());
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn1));
EXPECT_EQ(iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_-sizeof(LogGroupEntryHeader)),
max_lsn1);
EXPECT_EQ(max_scn2, raw_write_leader.palf_handle_impl_->get_max_scn());
int64_t mode_version;
switch_flashback_to_append(raw_write_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 2, leader_idx, 333));
// there are three logs on disk: one has been iterated, another has not been iterated (raw_write), and the last one is Append
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 1, leader_idx, 333));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
EXPECT_EQ(OB_ITER_END, read_group_log(raw_write_leader, LSN(0)));
SCN max_scn3 = raw_write_leader.palf_handle_impl_->get_max_scn();
PALF_LOG(INFO, "runlin trace case2", K(iterator), K(max_scn3), "end_lsn:", raw_write_leader.palf_handle_impl_->get_end_lsn());
LSN iterator_end_lsn = iterator.iterator_storage_.end_lsn_;
// there are two logs in memory, so success is expected; the cache is cleared at this point
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3));
EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_);
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3));
// the iterator has several logs in memory, so success is expected; the cache is cleared, and the info of the previous log (a raw_write log) is cleared
// the iterator cursor is still expected to point at the end of the first log; because of controlled replay, iterate_end is returned
EXPECT_EQ(OB_ITER_END, iterator.next(
max_scn1, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_);
// the following two logs need to be read from disk, but they are not emitted because of controlled replay
EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_);
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn2, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn2);
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(false, iterator.iterator_impl_.curr_entry_is_raw_write_);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn3);
// raw_write_leader already has three logs: raw_write(1 log entry), raw_write(1), append(1),
// simulate one group entry containing multiple small logs
LSN last_lsn = raw_write_leader.palf_handle_impl_->get_max_lsn();
SCN last_scn = raw_write_leader.palf_handle_impl_->get_max_scn();
LogIOWorker *io_worker = &raw_write_leader.palf_env_impl_->log_io_worker_;
IOTaskCond cond(id_raw_write, raw_write_leader.palf_env_impl_->last_palf_epoch_);
io_worker->submit_io_task(&cond);
std::vector<LSN> lsns;
std::vector<SCN> scns;
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 10, 100, id_raw_write, lsns, scns));
int group_entry_num = 1;
int first_log_entry_index = 0, last_log_entry_index = 0;
for (int i = 1; i < 10; i++) {
if (lsns[i-1]+100+sizeof(LogEntryHeader) == lsns[i]) {
last_log_entry_index = i;
} else {
first_log_entry_index = i;
group_entry_num++;
PALF_LOG(INFO, "group entry", K(i-1));
}
if (first_log_entry_index - last_log_entry_index > 2) {
break;
}
}
leader.reset();
if (first_log_entry_index != 1 && last_log_entry_index != 9) {
PALF_LOG(INFO, "no group log has more than 2 log entry", K(first_log_entry_index), K(last_log_entry_index));
return;
}
cond.cond_.signal();
// verify flashback within a log that contains multiple LogEntries: the iterator has iterated to a LogEntry in the middle, and several LogEntries remain before the flashback point
// LogGroup LogGroup LogGroup LogGroup LogGroup(9 small logs)
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
{
const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard new_raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader));
PalfBufferIterator buff_iterator;
PalfGroupBufferIterator group_iterator;
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator));
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator));
SCN replayable_point_scn(SCN::min_scn());
// verify replayable_point_scn being min_scn
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// replayable_point_scn is the scn of the first log - 1
replayable_point_scn = SCN::minus(max_scn1, 1);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// replayable_point_scn is the scn of the first log
replayable_point_scn = max_scn1;
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, false);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, false);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// replayable_point_scn is the scn of the first log + 1
replayable_point_scn = SCN::plus(max_scn1, 1);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// successfully iterate the second and third logs
replayable_point_scn = last_scn;
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
// the fourth log must be a LogGroupEntry
replayable_point_scn = scns[0];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// iterate the first LogEntry of the fifth LogGroupEntry
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// because of controlled replay, neither buff_iterator nor group_iterator advances curr_read_pos_
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// successfully iterate the first LogEntry of the fifth LogGroupEntry
replayable_point_scn = scns[1];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// the group iterator is held back by controlled replay: the max_scn of the fifth log is greater than the controlled replay point
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
// since the min scn of the controlled group entry equals replayable_point_scn, next_min_scn is set to replayable_point_scn
EXPECT_EQ(next_min_scn, replayable_point_scn);
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// because controlled replay stopped at the first LogEntry, group_iterator did not advance curr_read_pos_, while buff_iter did
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// the cursor of buff_iterator has reached the first small log of the fifth group_entry
// the cursor of group_iterator has reached the beginning of the fifth group_entry
// scns[0] belongs to the fourth group entry, scns[1] - scns[9] belong to the fifth
// the fifth small log of the fifth group entry is flashbacked
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[4]));
EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[4]);
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader));
// submit one group_entry
// for buff_iterator, two group_entries remain unread: one raw_write (containing 4 small logs, with the cursor at the end of the first small log) and one append
// for group_iterator, three group_entries remain unread: one raw_write (containing 4 small logs, with the cursor at the group_entry header) and one append
EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn()));
// for buff_iterator
// lsns[2] is the start of the second small log, i.e. the end of the first small log
// verify that the cursor starts at the header of the first small log
// verify whether next returning iterate end clears the cache
// iterate the small logs written by raw_write
// iterate the small logs written by append
PALF_LOG(INFO, "rulin trace 1", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator));
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1));
EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_);
// iterate the second log
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// iterate the third log
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// iterate the fourth log
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// iterate the fifth log (a new GroupEntry, not under controlled replay)
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn()));
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn()));
// for group_iterator
// verify that the cursor starts at the beginning of the raw_write log
// verify whether next returning iterate end clears the cache
// iterate the large logs written by raw_write
// iterate the large logs written by append
PALF_LOG(INFO, "rulin trace 2", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator));
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1));
// iterate raw_write logs
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn()));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn()));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::max_scn()));
}
// verify flashback within a log that contains multiple LogEntries: the iterator has iterated to a LogEntry in the middle, and no LogEntry remains before the flashback point
// LogGroup LogGroup LogGroup LogGroup LogGroup(9 small logs)
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
{
const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard new_raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader));
PalfBufferIterator buff_iterator;
PalfGroupBufferIterator group_iterator;
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator));
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator));
// successfully iterate the first, second, and third logs
SCN replayable_point_scn(last_scn);
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
// the fourth log must be a LogGroupEntry
replayable_point_scn = scns[0];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// iterate the first LogEntry of the fifth LogGroupEntry
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// because of controlled replay, neither buff_iterator nor group_iterator advances curr_read_pos_
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// successfully iterate the first LogEntry of the fifth LogGroupEntry
replayable_point_scn = scns[1];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// the group iterator is held back by controlled replay: the max_scn of the fifth log is greater than the controlled replay point
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
// since the min scn of the controlled group entry equals replayable_point_scn, next_min_scn is set to replayable_point_scn
EXPECT_EQ(next_min_scn, replayable_point_scn);
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// because controlled replay stopped at the first LogEntry, group_iterator did not advance curr_read_pos_, while buff_iter did
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// iterating the log finds that controlled replay is required
EXPECT_EQ(OB_ITER_END, buff_iterator.next(scns[1], next_min_scn, iterate_end_by_replayable_point));
// the cursor of buff_iterator has reached the end of the first small log of the fifth group_entry
// the cursor of group_iterator has reached the beginning of the fifth group_entry
// scns[0] belongs to the fourth group entry, scns[1] - scns[9] belong to the fifth
// the second small log of the fifth group entry is flashbacked
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[2]));
EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[2]);
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader));
// submit one group_entry
// for buff_iterator, two group_entries remain unread: one raw_write (containing 4 small logs, with the cursor at the end of the first small log) and one append
// for group_iterator, three group_entries remain unread: one raw_write (containing 4 small logs, with the cursor at the group_entry header) and one append
EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn()));
// for buff_iterator
// lsns[2] is the start of the second small log, i.e. the end of the first small log
// verify that the cursor starts at the header of the first small log
// verify whether next returning iterate end clears the cache
// iterate the small logs written by raw_write
// iterate the small logs written by append
PALF_LOG(INFO, "rulin trace 3", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator));
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[2]);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1));
EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_);
// iterate the second small log
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// iterate the newly written LogGroupEntry, which does not need controlled replay
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn()));
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn()));
// for group_iterator
// verify that the cursor starts at the beginning of the raw_write log
// verify whether next returning iterate end clears the cache
// iterate the large logs written by raw_write
// iterate the large logs written by append
PALF_LOG(INFO, "rulin trace 4", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator));
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1));
// iterate raw_write logs
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn()));
// iterate the new GroupEntry
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::min_scn()));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn()));
}
// verify that when a LogGroupEntry needs controlled replay, the buff iterator must not update accumlate_checksum_ or curr_read_pos_
// LogGroup LogGroup LogGroup LogGroup LogGroup(9 small logs)
// last_scn scns[0] scns[1]...
{
const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard new_raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader));
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator));
SCN replayable_point_scn(last_scn);
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(last_scn, 1));
replayable_point_scn = scns[0];
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
// the log corresponding to scns[1] cannot be emitted
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(scns[0], 1));
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, scns[0]);
// flashback to scns[0]
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[0]));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn()));
// the log corresponding to scns[0] is raw write and has been flashbacked; the iterator stops at the end of scns[0]
// iterating the newly written log succeeds
EXPECT_EQ(OB_SUCCESS, iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::min_scn()));
}
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn3));
}


@ -46,6 +46,42 @@ enum class LogEntryType
};
// =========== LogEntryType end =============
enum class IterateEndReason {
DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY = 0,
DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY = 1,
DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA = 2,
DUE_TO_FILE_END_LSN_READ_NEW_DATA = 3,
MAX_TYPE = 4
};
struct IterateEndInfo {
IterateEndInfo()
{
reset();
}
~IterateEndInfo()
{
reset();
}
void reset()
{
reason_ = IterateEndReason::MAX_TYPE;
log_scn_.reset();
}
bool is_valid() const
{
return IterateEndReason::MAX_TYPE != reason_
&& log_scn_.is_valid();
}
bool is_iterate_end_by_replayable_point_scn() const
{
return IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY == reason_
|| IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY == reason_;
}
IterateEndReason reason_;
SCN log_scn_;
TO_STRING_KV(K_(reason), K_(log_scn));
};
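// A rough summary of how next() (later in this patch) consumes this info: the two
// DUE_TO_REPLAYBLE_POINT_SCN_* reasons make iterate_end_by_replayable_point true; for them and for
// DUE_TO_FILE_END_LSN_READ_NEW_DATA, next_min_scn is derived from log_scn_, replayable_point_scn + 1
// and prev_entry_scn_ + 1, while DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA falls back to prev_entry_scn_ + 1.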
// LogIteratorImpl provides the ability to iterate over all log entries.
template <class ENTRY>
class LogIteratorImpl
@ -120,7 +156,8 @@ private:
// OB_ITER_END
// OB_ERR_OUT_LOWER_BOUND
// OB_NEED_RETRY: means the data has been truncate concurrently
int get_next_entry_();
int get_next_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info);
// According to LogEntryType, deserialize different log entry
// The log format
@ -130,38 +167,79 @@ private:
// @retval
// OB_SUCCESS.
// OB_BUF_NOT_ENOUGH.
// OB_INVALID_DATA, means the log entry is not intact, need to check whether this
// log entry is the last one.
int parse_one_entry_();
// OB_INVALID_DATA
// -- means the log entry is not intact, need to check whether this log entry is the last one.
// OB_CHECKSUM_ERROR
// -- means the accumulated checksum does not match.
// OB_ITER_END
// -- means iteration of this log entry ended because of replayable_point_scn
int parse_one_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info);
template <
class ACTUAL_ENTRY>
int parse_one_specific_entry_(ACTUAL_ENTRY &actual_entry)
int parse_meta_entry_()
{
int ret = OB_SUCCESS;
const bool matched_type = std::is_same<ACTUAL_ENTRY, ENTRY>::value;
const bool matched_type = std::is_same<LogMetaEntry, ENTRY>::value;
int64_t pos = curr_read_pos_;
if (true == matched_type) {
if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
// When curr_entry_ is LogGroupEntry, need check accumlate checksum
// and check whether it's raw write
} else if (OB_FAIL(handle_each_log_group_entry_(curr_entry_))) {
PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this));
}
} else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this));
} else if (OB_FAIL(handle_each_log_group_entry_(actual_entry))) {
PALF_LOG(ERROR, "handle_each_log_group_entry_ failed", KPC(this), K(actual_entry));
} else {
ret = OB_EAGAIN;
advance_read_lsn_(actual_entry.get_payload_offset());
PALF_LOG(TRACE, "advance_read_lsn_ payload offset", K(ret), KPC(this), K(actual_entry), "payload offset",
actual_entry.get_payload_offset());
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "parse LogMetaEntry failed, unexpected error", K(ret), KPC(this));
}
return ret;
}
int parse_log_block_header_();
int parse_log_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info)
{
int ret = OB_SUCCESS;
const bool matched_type = std::is_same<LogEntry, ENTRY>::value;
int64_t pos = curr_read_pos_;
if (true == matched_type) {
if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
} else if (curr_entry_is_raw_write_ && curr_entry_.get_scn() > replayable_point_scn) {
ret = OB_ITER_END;
info.log_scn_ = curr_entry_.get_scn();
info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY;
PALF_LOG(TRACE, "iterate end by replayable_point", KPC(this), K(replayable_point_scn), K(info));
}
} else {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "parse LogEntry failed, unexpected error", KPC(this), K(replayable_point_scn), K(info));
}
return ret;
}
// When the entry in the file is a LogGroupEntry, handle it specifically.
// 1. for a raw write LogGroupEntry, if it's controlled by replayable_point_scn, there is no need to update
// 'curr_read_pos_' or 'accum_checksum_', because this log may be flashbacked.
// 2. for an append LogGroupEntry, handle it normally.
int parse_log_group_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info)
{
int ret = OB_SUCCESS;
const bool matched_type = std::is_same<LogGroupEntry, ENTRY>::value;
LogGroupEntry actual_entry;
int64_t pos = curr_read_pos_;
if (true == matched_type) {
if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
} else if (OB_FAIL(handle_each_log_group_entry_(curr_entry_, replayable_point_scn, info))) {
PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this), K(info), K(replayable_point_scn));
}
} else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this));
} else if (OB_FAIL(handle_each_log_group_entry_(actual_entry, replayable_point_scn, info))) {
PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this), K(actual_entry), K(info), K(replayable_point_scn));
} else {
ret = OB_EAGAIN;
advance_read_lsn_(actual_entry.get_payload_offset());
PALF_LOG(TRACE, "advance_read_lsn_ payload offset", K(ret), KPC(this), K(actual_entry), "payload offset",
actual_entry.get_payload_offset(), K(info), K(replayable_point_scn));
}
return ret;
}
int get_log_entry_type_(LogEntryType &log_entry_type);
@ -175,29 +253,84 @@ private:
void try_clean_up_cache_();
template <class T>
int handle_each_log_group_entry_(const T&entry)
// when T is not LogGroupEntry, there is no need to do anything
int handle_each_log_group_entry_(const T&entry,
const SCN &replayable_point_scn,
IterateEndInfo &info)
{
PALF_LOG(TRACE, "T is not LogGroupEntry, do no thing", K(entry));
return OB_SUCCESS;
}
template <>
int handle_each_log_group_entry_(const LogGroupEntry&entry)
// When T is LogGroupEntry, we need to:
// 1. check the accumulated checksum:
// - if the accumulated checksum does not match, return OB_CHECKSUM_ERROR
// - if the data checksum does not match, return OB_INVALID_DATA
// 2. check whether this entry needs to be controlled by 'replayable_point_scn':
// - if controlled by 'replayable_point_scn', return OB_ITER_END, and don't modify
// several fields in LogIteratorImpl ('curr_read_pos_', 'curr_entry_is_raw_write_')
// - if not controlled by 'replayable_point_scn', return OB_SUCCESS.
int handle_each_log_group_entry_(const LogGroupEntry&entry,
const SCN &replayable_point_scn,
IterateEndInfo &info)
{
int ret = OB_SUCCESS;
PALF_LOG(TRACE, "T is LogGroupEntry, do no thing", K(entry));
// if (OB_FAIL(verify_accum_checksum_(entry))) {
// PALF_LOG(ERROR, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry));
// } else {
bool curr_entry_is_raw_write = entry.get_header().is_raw_write();
int64_t new_accumlate_checksum = -1;
PALF_LOG(TRACE, "T is LogGroupEntry", K(entry));
if (OB_FAIL(verify_accum_checksum_(entry, new_accumlate_checksum))) {
PALF_LOG(WARN, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry));
// NB: when the current entry is raw write and its log scn is greater than
// replayable_point_scn, this log may be cleaned up, therefore we cannot update several fields of
// LogIteratorImpl and must return OB_ITER_END directly; otherwise, we may not parse the new LogGroupEntryHeader
// after the flashback position, which would cause a log that was appended to be controlled by replayable_point_scn.
//
// NB: we need to check whether the min scn of the LogGroupEntry is greater than
// replayable_point_scn:
// - if the LogGroupEntry has two LogEntries whose scns are 10 and 15 respectively, and
// replayable_point_scn is 12, we can read the first LogEntry, and
// we can update several fields like 'curr_entry_is_raw_write_', 'accum_checksum_'
// and the others ('curr_read_pos_'...). when the flashback scn is 12, the LogEntries
// after 10 will be truncated and a new LogGroupEntry will be generated; meanwhile,
// we will have advanced 'curr_read_pos_' to the end of the first LogEntry and can read the new LogGroupEntry
// correctly.
// - if the LogGroupEntry has one LogEntry whose scn is 13, these fields have
// not been updated because it is controlled by replayable_point_scn, so when the flashback
// scn is 12, we don't need to roll back these fields.
//
// NB: for PalfGroupBufferIterator, we should use the max scn to control replay and use the min scn
// as the log_scn_ of info. consider that replayable_point_scn is 12, the min scn of the group log
// is 7 and the max scn of the group is 15: we should not return this log; meanwhile, we should use
// scn 7 to update next_min_scn.
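// A worked example of the rule above (illustrative numbers from the NB comment, not tied to any test):
// replayable_point_scn = 12, raw write LogGroupEntry with min scn 7 and max scn 15.
// - PalfGroupBufferIterator (ENTRY == LogGroupEntry): 15 > 12, so the whole group is held back,
//   info.log_scn_ is set to 7, and next() can only advance next_min_scn up to 7.
// - PalfBufferIterator (ENTRY == LogEntry): 7 <= 12, so the group header is consumed normally; the
//   LogEntry with scn 7 is returned, and the LogEntry with scn 15 is held back in parse_log_entry_.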
} else if (true == curr_entry_is_raw_write) {
SCN min_scn;
bool is_group_iterator = std::is_same<ENTRY, LogGroupEntry>::value;
if (OB_FAIL(entry.get_log_min_scn(min_scn))) {
PALF_LOG(ERROR, "get_log_min_scn failed", K(ret), KPC(this), K(min_scn),
K(entry), K(replayable_point_scn));
} else if ((is_group_iterator && entry.get_scn() > replayable_point_scn)
|| (!is_group_iterator && min_scn > replayable_point_scn)) {
info.log_scn_ = min_scn;
info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY;
ret = OB_ITER_END;
PALF_LOG(TRACE, "iterate end by replayable_point", K(ret), KPC(this), K(min_scn),
K(entry), K(replayable_point_scn), K(info), K(is_group_iterator));
} else {
}
}
if (OB_SUCC(ret)) {
curr_entry_is_raw_write_ = entry.get_header().is_raw_write();
// }
return ret;
accumlate_checksum_ = new_accumlate_checksum;
}
return ret;
}
// @brief: accumulated checksum verification; only verify the checksum when accum_checksum_ is not -1.
// ret val:
// OB_SUCCESS
// OB_CHECKSUM_ERROR
int verify_accum_checksum_(const LogGroupEntry &entry);
int verify_accum_checksum_(const LogGroupEntry &entry,
int64_t &new_accumlate_checksum);
private:
static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2;
@ -278,9 +411,12 @@ int LogIteratorImpl<ENTRY>::init(const GetModeVersion &get_mode_version,
curr_read_buf_end_pos_ = 0;
next_round_pread_size_ = MAX_LOG_BUFFER_SIZE;
log_storage_ = log_storage;
curr_entry_.reset();
curr_entry_is_raw_write_ = false;
curr_entry_size_ = 0;
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
get_mode_version_ = get_mode_version;
prev_entry_scn_.reset();
accumlate_checksum_ = -1;
is_inited_ = true;
PALF_LOG(TRACE, "LogIteratorImpl init success", K(ret), KPC(this));
@ -295,9 +431,11 @@ void LogIteratorImpl<ENTRY>::reuse()
curr_read_buf_start_pos_ = 0;
curr_read_buf_end_pos_ = 0;
next_round_pread_size_ = MAX_LOG_BUFFER_SIZE;
curr_entry_.reset();
curr_entry_is_raw_write_ = false;
curr_entry_size_ = 0;
prev_entry_scn_.reset();
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
prev_entry_scn_.reset();
accumlate_checksum_ = -1;
}
@ -306,15 +444,19 @@ void LogIteratorImpl<ENTRY>::destroy()
{
if (IS_INIT) {
is_inited_ = false;
accumlate_checksum_ = -1;
prev_entry_scn_.reset();
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
curr_entry_size_ = 0;
curr_entry_is_raw_write_ = false;
curr_entry_.reset();
log_storage_ = NULL;
next_round_pread_size_ = 0;
buf_ = NULL;
curr_read_buf_end_pos_ = 0;
curr_read_buf_start_pos_ = 0;
curr_read_pos_ = 0;
init_mode_version_ = 0;
accumlate_checksum_ = -1;
}
}
@ -337,14 +479,17 @@ LSN LogIteratorImpl<ENTRY>::get_curr_read_lsn() const
// last entry.
// NB: for restarting, the committed offset of sliding window is invalid.
template <class ENTRY>
int LogIteratorImpl<ENTRY>::get_next_entry_()
int LogIteratorImpl<ENTRY>::get_next_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info)
{
int ret = OB_SUCCESS;
// NB: check whether the next entry needs to be read
// Assume that read size must greater or equal than 1 in each round.
if (true == log_storage_->check_iterate_end(curr_read_pos_ + 1)) {
ret = OB_ITER_END;
PALF_LOG(TRACE, "get_next_entry_ iterate end, not read new data", K(ret), KPC(this));
info.reason_ = IterateEndReason::DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA;
info.log_scn_.reset();
PALF_LOG(TRACE, "get_next_entry_ iterate end, not read new data", K(ret), KPC(this), K(info));
} else {
// In the truncate log case, the 'read_buf_' in LogIteratorStorage will not have
// intact data; to avoid reading data in a dead loop, return OB_NEED_RETRY.
@ -362,14 +507,16 @@ int LogIteratorImpl<ENTRY>::get_next_entry_()
int read_times = 0;
do {
int64_t header_size = 0;
if (OB_SUCC(parse_one_entry_())) {
if (OB_SUCC(parse_one_entry_(replayable_point_scn, info))) {
curr_entry_size_ = curr_entry_.get_serialize_size();
} else if (OB_BUF_NOT_ENOUGH == ret) {
if (OB_FAIL(read_data_from_storage_()) && OB_ITER_END != ret
&& OB_ERR_OUT_OF_LOWER_BOUND != ret) {
PALF_LOG(WARN, "read_data_from_storage_ failed", K(ret), KPC(this));
} else if (OB_ITER_END == ret) {
PALF_LOG(WARN, "has iterate to end of block", K(ret), KPC(this));
info.reason_ = IterateEndReason::DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA;
info.log_scn_.reset();
PALF_LOG(WARN, "has iterate to end of block", K(ret), KPC(this), K(info));
} else if (OB_ERR_OUT_OF_LOWER_BOUND == ret) {
PALF_LOG(WARN, "the block may be unlinked", K(ret), KPC(this));
} else {
@ -386,11 +533,13 @@ int LogIteratorImpl<ENTRY>::get_next_entry_()
}
} while (OB_EAGAIN == ret);
// NB: check whether the curr entry is readable
// NB: check whether the curr entry is readable given the file end lsn.
if (OB_SUCC(ret)
&& true == log_storage_->check_iterate_end(curr_read_pos_ + curr_entry_size_)) {
ret = OB_ITER_END;
PALF_LOG(TRACE, "get_next_entry_ iterate end, read new data", K(ret), KPC(this));
info.reason_ = IterateEndReason::DUE_TO_FILE_END_LSN_READ_NEW_DATA;
info.log_scn_ = curr_entry_.get_scn();
PALF_LOG(WARN, "get_next_entry_ iterate end, read new data", K(ret), KPC(this), K(info), K(replayable_point_scn));
}
}
return ret;
@ -414,25 +563,23 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
advance_read_lsn_(curr_entry_size_);
curr_entry_size_ = 0;
iterate_end_by_replayable_point = false;
IterateEndInfo info;
// NB: when returning OB_ITER_END, we need to try to clean up the cache, and we should not clean up the cache only when
// NB: when returning OB_ITER_END, we need to try to clean up the cache, and we should clean up the cache only when
// the log ts of the curr entry is greater than 'replayable_point_scn'; otherwise, we would return some logs
// which have been flashbacked. consider the following case:
// 1. T1, 'replayable_point_scn' is 10, the log ts of the curr entry is 15, but there is no flashback operation.
// 1. T1, 'replayable_point_scn' is 10, the log ts of the curr entry is 15, but there is no flashback operation.(no bad effect)
// 2. T2, 'replayable_point_scn' is 10, the logs on disk whose log ts is after 10 have been flashbacked, and
// next returns OB_ITER_END because of 'file end lsn'.
// next returns OB_ITER_END because of 'file end lsn'.(no bad effect)
// 3. T3, 'replayable_point_scn' has been advanced to 16, and several logs have been written on disk; however, the cache
// of the iterator has not been cleaned up, so the old logs will be returned.
//
// 1. T1, 'replayable_point_scn' is 10, the log ts of the curr entry is 15, but there is no flashback operation.
// 2. T2, 'replayable_point_scn' is 10, the logs on disk whose log ts is after 10 have been flashbacked, and
// new logs have been appended.
// 3. T3, 'replayable_point_scn' has been advanced to 16; however, the cache of the iterator has not been cleaned up,
// the old logs will be returned.
// the old logs will be returned.(bad effect)
//
// Therefore, we should try_clean_up_cache_ at the beginning of each round of next.
(void) try_clean_up_cache_();
if (OB_FAIL(get_next_entry_())) {
if (!replayable_point_scn.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argumetn", K(replayable_point_scn), KPC(this));
} else if (OB_FAIL(get_next_entry_(replayable_point_scn, info))) {
// NB: if the data which has been corrupted, clean cache.
// NB: if the accum_checksum_ is not match, return OB_CHECKSUM_ERROR.
if (OB_INVALID_DATA == ret) {
@ -444,68 +591,74 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
if (OB_ITER_END != ret) {
PALF_LOG(WARN, "get_next_entry_ failed", K(ret), KPC(this));
}
} else {
// NB: when current entry is raw write, and the log scn of current entry is greater than replayable_point_scn
// clean up cache when mode version has been changed.
if (true == curr_entry_is_raw_write_ && curr_entry_.get_scn() > replayable_point_scn) {
ret = OB_ITER_END;
iterate_end_by_replayable_point = true;
}
PALF_LOG(TRACE, "get_next_entry_ failed", K(ret), KPC(this), K(info), K(replayable_point_scn));
iterate_end_by_replayable_point = info.is_iterate_end_by_replayable_point_scn();
}
// If 'curr_entry_' can be iterated in this round, set 'prev_entry_scn_' to the log ts of 'curr_entry_'.
if (OB_SUCC(ret)) {
prev_entry_scn_ = curr_entry_.get_scn();
}
// if 'curr_entry_' is not readable, we should set 'next_min_scn' to the scn of 'prev_entry_' + 1,
// and if 'curr_entry_' is not readable due to replayable_point_scn, we should set 'next_min_scn'
// to replayable_point_scn + 1
//
// In case of 'next' return OB_ITER_END, we should set 'next_min_scn' which is the out parameter of 'next' to:
//
// 1. if 'prev_entry_scn_' is OB_INVALID_TIMESTAMP and 'curr_entry_' is not valid, means that there is no log before
// 'file end lsn', we should set 'next_min_scn' to OB_INVALID_TIMESTAMP
//
// 2. if 'curr_entry_' is valid but not readable, means that there is no readable log before 'file end lsn' or replayable_point_scn,
// we should set next_min_scn to std::min(replayable_point_scn, the log ts of 'curr_entry_'), however, replayable_point_scn may
// be smaller than 'prev_entry_scn_' (for example, the log entry corresponding to 'prev_entry_scn_' was written by APPEND, its scn
// may be greater than replayable_point_scn), we should set next_min_scn to std::max(std::min(replayable_point_scn + 1, the log ts
// of 'curr_entry_'), 'prev_entry_scn_' + 1).
//
// 3. if 'curr_entry_' is not valid and 'prev_entry_scn_' is not OB_INVALID_TIMESTAMP, means there is no log after 'file end lsn'
// or 'replayable_point_scn', we should set 'next_min_scn' to prev_entry_scn_ + 1;
// 1. if 'curr_entry_' iterate end by replayable point scn:
// we should set next_min_scn to std::min(replayable_point_scn+1, the log scn of 'curr_entry_'),
// however, replayable_point_scn may be smaller than 'prev_entry_scn_' (for example, the log
// entry corresponding to 'prev_entry_scn_' was written by APPEND, its scn may be greater than
// replayable_point_scn), we should set next_min_scn to std::max(
// std::min(replayable_point_scn + 1, the log scn of 'curr_entry_'), 'prev_entry_scn_' + 1).
//
// 2. otherwise, iterate end by file end lsn:
// - case 1: if 'curr_entry_' has been parsed from LogIteratorStorage, however, it's not readable
// due to file end lsn(consider that someone set the file end lsn to the middle of one
// LogGroupEntry), we should set next_min_scn to the max of min(curr_entry_'s scn,
// replayable_point_scn + 1) and prev_entry_scn_.
// - case 2: if 'curr_entry_' has not been parsed from LogIteratorStorage, we just set next_min_scn
// to prev_entry_scn_ + 1.
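// A worked numeric example of the two cases above (illustrative values only): assume
// prev_entry_scn_ = 5, replayable_point_scn = 9 and the scn of 'curr_entry_' is 10.
// - iterate end by replayable point scn: next_min_scn = max(min(9 + 1, 10), 5 + 1) = 10,
//   and prev_entry_scn_ is advanced to max(9, 5) = 9.
// - iterate end by file end lsn without parsing a new entry: next_min_scn = 5 + 1 = 6.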
if (OB_ITER_END == ret) {
if (0 == curr_entry_size_ && !prev_entry_scn_.is_valid()) {
if (!info.is_valid() && !prev_entry_scn_.is_valid()) {
next_min_scn.reset();
PALF_LOG(WARN, "there is no readable log, set next_min_scn to OB_INVALID_TIMESTAMP", K(ret), KPC(this));
} else if (0 != curr_entry_size_) {
if (!curr_entry_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "unexpected error, curr_entry_size_ is not zero but curr_entry_ is invalid", K(ret), KPC(this));
} else {
next_min_scn = MIN(
(replayable_point_scn.is_valid() ? SCN::plus(replayable_point_scn, 1) : SCN::min_scn()),
curr_entry_.get_scn());
}
PALF_LOG(WARN, "there is no readable log, set next_min_scn to OB_INVALID_TIMESTAMP",
K(ret), KPC(this), K(info), K(replayable_point_scn));
} else if (iterate_end_by_replayable_point || IterateEndReason::DUE_TO_FILE_END_LSN_READ_NEW_DATA == info.reason_) {
// when there is no log in [replayable_point_scn, 'curr_entry_'), we can advance next_min_scn.
next_min_scn = MIN(
(replayable_point_scn.is_valid() ? SCN::plus(replayable_point_scn, 1) : SCN::max_scn()),
info.log_scn_);
PALF_LOG(TRACE, "update next_min_scn to min of replayable_point_scn and log_group_entry_min_scn_", KPC(this), K(info));
next_min_scn = MAX(
(prev_entry_scn_.is_valid() ? SCN::plus(prev_entry_scn_, 1) : SCN::min_scn()),
next_min_scn);
// NB: we need to update 'prev_entry_scn_' to the max of 'replayable_point_scn' and 'prev_entry_scn_'
// when iteration ends by the replayable point, otherwise 'next_min_scn' may be smaller than the previous
// result, consider the following case:
// 1. T1, next returns OB_ITER_END because of the replayable point, the log ts of curr_entry_ is 10,
// 'prev_entry_scn_' is 5, 'replayable_point_scn' is 9, next_min_scn would be set to 10 because
// 'curr_entry_' is valid.
// 2. T2, several logs after 9 (log_ts) have been truncated, next returns OB_ITER_END because of
// 'file end lsn', 'curr_entry_' is invalid, 'replayable_point_scn' is 9, 'prev_entry_scn_'
// is 5, 'next_min_scn' would be set to 5.
if (replayable_point_scn < curr_entry_.get_scn()) {
PALF_LOG(TRACE, "update next_min_scn to max of prev_entry_scn_ and next_min_scn",
KPC(this), K(info), K(replayable_point_scn));
// NB: To keep 'next_min_scn' up to date, advance 'prev_entry_scn_' when 'curr_entry_' needs to be controlled by the replayable point scn.
// Consider the following case:
// T1, iterate an entry successfully, which sets prev_entry_scn_ to 5;
// T2, iteration is controlled by the replayable point scn, the scn of 'curr_entry_' is 10, and the replayable point scn is 8.
// T3, the logs after 8 have been flashbacked, and no new log has been written.
// If we don't advance 'prev_entry_scn_' to 8, the continuous point of the replay service cannot be updated
// to 8.
if (info.log_scn_ > replayable_point_scn) {
prev_entry_scn_ = MAX(replayable_point_scn, prev_entry_scn_);
PALF_LOG(TRACE, "update prev_entry_scn_ to replayable_point_scn", KPC(this), K(info), K(replayable_point_scn));
}
// NB: if we have not read any new log, we set next_min_scn based only on 'prev_entry_scn_'.
} else if (IterateEndReason::DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA == info.reason_) {
next_min_scn = prev_entry_scn_.is_valid() ? SCN::plus(prev_entry_scn_, 1) : SCN::min_scn();
PALF_LOG(TRACE, "update next_min_scn to prev_entry_scn_ + 1", KPC(this), K(info), K(replayable_point_scn));
} else {
// prev_entry_scn_ must be valid here
next_min_scn = SCN::plus(prev_entry_scn_, 1);
}
}
if (OB_FAIL(ret)) {
curr_entry_size_ = 0;
// To make debugging easier, don't reset 'curr_entry_'
// curr_entry_.reset();
}
return ret;
}
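A minimal standalone sketch of the next_min_scn rule documented above, not part of this commit; the helper name is hypothetical, but it only combines SCN utilities (SCN::plus, min_scn, max_scn, MIN/MAX) that already appear in this file:
// Illustrative sketch (not committed code): when iteration ends because of the
// replayable point scn or because new data was read up to the file end lsn, the rule is
// next_min_scn = max(min(replayable_point_scn + 1, curr_entry_scn), prev_entry_scn + 1).
static share::SCN calc_next_min_scn_sketch(const share::SCN &replayable_point_scn,
                                           const share::SCN &curr_entry_scn,
                                           const share::SCN &prev_entry_scn)
{
  const share::SCN upper = replayable_point_scn.is_valid()
      ? share::SCN::plus(replayable_point_scn, 1) : share::SCN::max_scn();
  const share::SCN lower = prev_entry_scn.is_valid()
      ? share::SCN::plus(prev_entry_scn, 1) : share::SCN::min_scn();
  return MAX(lower, MIN(upper, curr_entry_scn));
}
For example, with replayable_point_scn = 8, curr_entry_scn = 10 and prev_entry_scn = 5, the sketch yields 9, matching the flashback scenario described in the comments above.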
@ -528,7 +681,8 @@ int LogIteratorImpl<ENTRY>::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write
}
template<class ENTRY>
int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry)
int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry,
int64_t &new_accumlate_checksum)
{
int ret = OB_SUCCESS;
int64_t data_checksum = -1;
@ -537,12 +691,13 @@ int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry)
ret = OB_INVALID_DATA;
PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry));
} else if (-1 == accumlate_checksum_) {
accumlate_checksum_ = expected_verify_checksum;
PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this));
new_accumlate_checksum = expected_verify_checksum;
PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this),
K(new_accumlate_checksum));
} else if (OB_FAIL(LogChecksum::verify_accum_checksum(
accumlate_checksum_, data_checksum,
expected_verify_checksum, accumlate_checksum_))) {
PALF_LOG(ERROR, "verify checksum failed", K(ret), KPC(this), K(entry));
expected_verify_checksum, new_accumlate_checksum))) {
PALF_LOG(ERROR, "verify accumlate checksum failed", K(ret), KPC(this), K(entry));
} else {
PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry));
}
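For reference, a hedged sketch of what the accumulated-checksum chain check amounts to, not part of this commit; the fold function (ob_crc64) is an assumption for illustration, the real folding lives in LogChecksum::verify_accum_checksum:
// Illustrative only: fold data_checksum into the previous accumulated checksum and
// compare the result with the value recorded in the LogGroupEntry header.
static int verify_accum_checksum_sketch(const int64_t prev_accum,
                                        const int64_t data_checksum,
                                        const int64_t expected_accum,
                                        int64_t &new_accum)
{
  int ret = OB_SUCCESS;
  // assumed fold: the actual implementation may combine the values differently
  new_accum = static_cast<int64_t>(ob_crc64(static_cast<uint64_t>(prev_accum),
                                            &data_checksum, sizeof(data_checksum)));
  if (new_accum != expected_accum) {
    ret = OB_CHECKSUM_ERROR;
  }
  return ret;
}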
@ -555,7 +710,8 @@ int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry)
// entry type is not 'wanted_log_entry_type', skip the header of this entry. NB: for
// LOG_PADDING, the header size includes data_len.
template <class ENTRY>
int LogIteratorImpl<ENTRY>::parse_one_entry_()
int LogIteratorImpl<ENTRY>::parse_one_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info)
{
int ret = OB_SUCCESS;
const int MAGIC_NUMBER_SIZE = sizeof(int16_t);
@ -567,26 +723,17 @@ int LogIteratorImpl<ENTRY>::parse_one_entry_()
switch (actual_log_entry_type) {
case LogEntryType::GROUP_ENTRY_HEADER:
{
LogGroupEntry entry;
ret = parse_one_specific_entry_(entry);
ret = parse_log_group_entry_(replayable_point_scn, info);
break;
}
case LogEntryType::LOG_ENTRY_HEADER:
{
LogEntry entry;
ret = parse_one_specific_entry_(entry);
ret = parse_log_entry_(replayable_point_scn, info);
break;
}
case LogEntryType::LOG_META_ENTRY_HEADER:
{
LogMetaEntry entry;
ret = parse_one_specific_entry_(entry);
break;
}
case LogEntryType::LOG_INFO_BLOCK_HEADER:
{
ret = parse_log_block_header_();
if (OB_SUCC(ret)) ret = OB_EAGAIN;
ret = parse_meta_entry_();
break;
}
default:
@ -599,25 +746,6 @@ int LogIteratorImpl<ENTRY>::parse_one_entry_()
return ret;
}
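The new parse_* paths report why iteration stopped through the IterateEndInfo out parameter. Its real definition is not shown in this hunk; the following is only an assumed shape inferred from how next() uses it above (reason_, log_scn_, is_valid(), is_iterate_end_by_replayable_point_scn()), and the INVALID and DUE_TO_REPLAYABLE_POINT_SCN names are guesses:
// Assumed shape for illustration only; the two FILE_END_LSN reasons appear verbatim in next().
enum class IterateEndReason {
  INVALID,
  DUE_TO_REPLAYABLE_POINT_SCN,
  DUE_TO_FILE_END_LSN_READ_NEW_DATA,
  DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA,
};
struct IterateEndInfo {
  IterateEndReason reason_ = IterateEndReason::INVALID;
  share::SCN log_scn_;  // scn of the entry that could not be iterated, if any
  bool is_valid() const { return IterateEndReason::INVALID != reason_; }
  bool is_iterate_end_by_replayable_point_scn() const
  { return IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN == reason_; }
};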
template <class ENTRY>
int LogIteratorImpl<ENTRY>::parse_log_block_header_()
{
int ret = OB_SUCCESS;
int64_t pos = curr_read_pos_;
LogBlockHeader actual_entry;
if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this));
} else {
ret = OB_EAGAIN;
advance_read_lsn_(MAX_INFO_BLOCK_SIZE);
PALF_LOG(INFO, "parse_log_block_header_ success", K(ret), KPC(this));
}
if (OB_BUF_NOT_ENOUGH == ret) {
OB_ASSERT(true);
}
return ret;
}
template <class ENTRY>
void LogIteratorImpl<ENTRY>::advance_read_lsn_(const offset_t step)
{
@ -757,12 +885,30 @@ void LogIteratorImpl<ENTRY>::try_clean_up_cache_()
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "current_mode_version is unexpected", K(current_mode_version), KPC(this));
} else if (init_mode_version_ < current_mode_version) {
PALF_LOG_RET(WARN, OB_SUCCESS, "mode version has been changed, need reset cache buf", KPC(this), K(current_mode_version));
init_mode_version_ = current_mode_version;
LSN curr_read_lsn = log_storage_->get_lsn(curr_read_pos_);
// reuse LogIteratorStorage first, only the log before 'start_lsn_' + 'curr_read_pos_'
// has been consumed.
// NB: we need to ensure that we do not update 'curr_read_pos_' to 'curr_read_pos_' + sizeof(LogGroupEntryHeader)
// when the LogGroupEntryHeader after 'curr_read_pos_' needs to be controlled by replayable_point_scn.
log_storage_->reuse(curr_read_lsn);
curr_read_buf_start_pos_ = 0;
curr_read_pos_ = 0;
curr_read_buf_end_pos_ = 0;
curr_entry_.reset();
// NB: we cannot reset curr_entry_is_raw_write_, otherwise the log entry after replayable_point_scn may not be
// controlled by replayable_point_scn.
// curr_entry_is_raw_write_ = false;
curr_entry_size_ = 0;
init_mode_version_ = current_mode_version;
// we cannot reset prev_entry_scn_, otherwise, after flashback, if there are no readable logs on disk,
// we cannot return a valid next_min_scn.
// - prev_entry_.reset();
// we need to reset accumlate_checksum_, otherwise it still holds the accumulated checksum of the log before flashback,
// and iterating a new group log will fail.
accumlate_checksum_ = -1;
}
}
} // end namespace palf
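To show how the pieces above fit together after a flashback, here is a hedged sketch of a consumer loop driving next() with a replayable point scn; the loop shape and the bookkeeping are assumptions for illustration, not taken from the replay service:
// Illustrative consumer (assumed shape): when next() returns OB_ITER_END with a valid
// next_min_scn, every log below next_min_scn is settled, so the consumer can keep
// advancing even if a flashback removed the logs it was waiting for.
template <class ITERATOR>
int consume_logs_sketch(ITERATOR &iterator,
                        const share::SCN &replayable_point_scn,
                        share::SCN &next_min_scn)
{
  int ret = OB_SUCCESS;
  bool iterate_end_by_replayable_point = false;
  while (OB_SUCC(iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
    // hand the current entry to the consumer via iterator.get_entry(...)
  }
  if (OB_ITER_END == ret) {
    // next_min_scn may be invalid if no log has ever been read; otherwise it is the
    // lower bound of the next log that may appear, as computed in next() above.
    ret = OB_SUCCESS;
  }
  return ret;
}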