PalfBufferIterator supports returning padding logs.

This commit is contained in:
HaHaJeff
2023-04-28 03:41:40 +00:00
committed by ob-robot
parent 64dc54c87e
commit 58bb3d34b7
21 changed files with 887 additions and 332 deletions

View File

@ -106,13 +106,6 @@ int LogRequestHandler::get_self_addr_(common::ObAddr &self) const
return ret;
}
// could not enable replay service in mittest for now, just skip it
int ObLogReplayService::flashback(const share::ObLSID &id)
{
  // Mock for mittest: the replay service is not enabled here, so flashback
  // is a no-op that always reports success for the given log stream id.
  return OB_SUCCESS;
}
}
namespace unittest

View File

@ -222,6 +222,7 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
PalfHandleImplGuard leader;
share::SCN basic_scn = share::SCN::min_scn();
CLOG_LOG(INFO, "test replay begin", K(id));
OB_LOGGER.set_log_level("TRACE");
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
MockLSAdapter ls_adapter;
ls_adapter.init((ObLSService *)(0x1));
@ -279,16 +280,14 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
// 开启拉日志
bool is_done = false;
while (!is_done) {
rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done);
// 由于padding日志被受控回放,replay此时的回放位点最多为padding_header
rp_sv.is_replay_done(ls_id, padding_header, is_done);
usleep(10*1000);
CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header));
}
is_done = false;
CLOG_LOG(INFO, "runlin trace 3", K(iterator), KPC(rp_st));
// 预期replay的next_to_submit_lsn是padding
EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_);
EXPECT_EQ(OB_SUCCESS, rp_st->flashback());
// replay执行flashback后,next_to_submit_lsn是padding头
// 预期replay的next_to_submit_lsn是padding_header
EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_);
switch_flashback_to_append(leader, mode_version);
iterator_end_lsn = LSN(100000000);
@ -298,8 +297,6 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn();
LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
//更新replay的committed_end_lsn
rp_st->submit_log_task_.update_committed_end_lsn(max_lsn);
}
// Test2: flashback到padding头部, replay先执行flashback,再执行replay padding
{
@ -313,15 +310,12 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
iterator_end_lsn = padding_header;
// replay看到的committed位点是padding尾
rp_st->unblock_submit();
rp_st->submit_log_task_.committed_end_lsn_ = LSN(PALF_BLOCK_SIZE);
bool is_done = false;
while (!is_done) {
rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done);
rp_sv.is_replay_done(ls_id, padding_header, is_done);
usleep(10*1000);
CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header));
}
// 先执行replay的flashback
EXPECT_EQ(OB_SUCCESS, rp_st->flashback());
// 预期replay的next_to_submit_lsn是padding头
EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_);
// 修改iterator看到的终点为padding尾
@ -338,9 +332,8 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn();
LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
//更新replay的committed_end_lsn
rp_st->submit_log_task_.update_committed_end_lsn(max_lsn);
}
// Test3: flashback到padding尾部, replay先执行flashback,再执行replay padding
{
int ret = OB_SUCCESS;
@ -354,9 +347,8 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
iterator_end_lsn = padding_header;
// replay看到的committed位点是padding头
rp_st->unblock_submit();
rp_st->submit_log_task_.committed_end_lsn_ = (padding_header);
bool is_done = false;
// iterator尽管没有吐出padding日志,replay会直接更新next_to_submit_lsn到padding
// iterator由于文件长度,不会吐出padding日志,replay不会更新next_to_submit_lsn到padding尾
while (!is_done) {
rp_sv.is_replay_done(ls_id, padding_header, is_done);
usleep(10*1000);
@ -365,10 +357,8 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
is_done = false;
// 预期replay的next_to_submit_lsn是padding头
EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_);
// 先执行replay的flashback
EXPECT_EQ(OB_SUCCESS, rp_st->flashback());
// replay执行flashback后,next_to_submit_lsn是padding头, palf的committed位点是padding尾,不会更新next_to_submit_lsn
EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_);
// iterator能看到padding日志
iterator_end_lsn = LSN(PALF_BLOCK_SIZE);
rp_st->trigger_fetch_log();
while (!is_done) {
rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done);
@ -382,49 +372,67 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding)
padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn();
LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
//更新replay的committed_end_lsn
rp_st->submit_log_task_.update_committed_end_lsn(max_lsn);
}
// Test4: flashback到padding尾部, 先replay完,在执行flashback
{
int ret = OB_SUCCESS;
CLOG_LOG(WARN, "flashback to padding tailer case1");
int64_t abs_timeout_us = 4*1000*1000;
// flashback_scn为padding尾
SCN flashback_scn = SCN::minus(padding_tail_scn, 1);
switch_append_to_flashback(leader, mode_version);
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->flashback(mode_version, flashback_scn, abs_timeout_us));
// iterator看到的终点是padding日志尾
}
TEST_F(TestObSimpleLogReplayFunc, test_wait_replay_done)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_wait_replay_done");
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
ObLSID ls_id(id);
int64_t leader_idx = 0;
LSN basic_lsn(0);
PalfHandleImplGuard leader;
share::SCN basic_scn = share::SCN::min_scn();
CLOG_LOG(INFO, "test replay begin", K(id));
OB_LOGGER.set_log_level("TRACE");
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
MockLSAdapter ls_adapter;
ls_adapter.init((ObLSService *)(0x1));
ObLogReplayService rp_sv;
ObReplayStatus *rp_st = NULL;
PalfEnv *palf_env;
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
rp_sv.init(palf_env, &ls_adapter, get_cluster()[0]->get_allocator());
rp_sv.start();
get_cluster()[0]->get_tenant_base()->update_thread_cnt(10);
LSN iterator_end_lsn(0);
LSN *iterator_end_lsn_ptr = &iterator_end_lsn;
auto get_file_end_lsn =[iterator_end_lsn_ptr]() {
CLOG_LOG(INFO, "get_file_end_lsn", K(*iterator_end_lsn_ptr));
return *iterator_end_lsn_ptr;
};
iterator_end_lsn = LSN(PALF_BLOCK_SIZE);
// replay看到的committed位点是padding尾
rp_st->unblock_submit();
rp_st->submit_log_task_.next_to_submit_lsn_ = (iterator_end_lsn);
EXPECT_EQ(OB_SUCCESS, rp_sv.add_ls(ls_id, ObReplicaType::REPLICA_TYPE_FULL));
EXPECT_EQ(OB_SUCCESS, rp_sv.enable(ls_id, basic_lsn, basic_scn));
{
ObReplayStatusGuard guard;
EXPECT_EQ(OB_SUCCESS, rp_sv.get_replay_status_(ls_id, guard));
rp_st = guard.get_replay_status();
ls_adapter.rp_st_ = rp_st;
}
PalfBufferIterator &iterator = rp_st->submit_log_task_.iterator_;
iterator.iterator_storage_.get_file_end_lsn_ = get_file_end_lsn;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
int64_t remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.get_palf_handle_impl()->get_max_lsn() - sizeof(LogGroupEntryHeader) - sizeof(LogEntryHeader);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, remained_log_size));
EXPECT_EQ(LSN(PALF_BLOCK_SIZE), leader.get_palf_handle_impl()->get_max_lsn());
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
bool is_done = false;
// 无padding日志,committed位点是文件头
while (!is_done) {
rp_sv.is_replay_done(ls_id, iterator_end_lsn, is_done);
usleep(10*1000);
CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header));
rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done);
}
is_done = false;
// 预期replay的next_to_submit_lsn是padding尾
EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_);
// 先执行replay的flashback
EXPECT_EQ(OB_SUCCESS, rp_st->flashback());
// replay执行flashback后,next_to_submit_lsn是padding尾
EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_);
switch_flashback_to_append(leader, mode_version);
iterator_end_lsn = LSN(1000000000);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 100000));
LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
iterator_end_lsn = LSN(2*PALF_BLOCK_SIZE);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
// 有padding日志,committed位点是文件头
is_done =false;
while (!is_done) {
rp_sv.is_replay_done(ls_id, max_lsn, is_done);
usleep(10*1000);
CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header));
rp_st->trigger_fetch_log();
}
rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done);
}
}
} // unittest

View File

@ -66,6 +66,35 @@ std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
constexpr int64_t timeout_ts_us = 3 * 1000 * 1000;
void read_padding_entry(PalfHandleImplGuard &leader, SCN padding_scn, LSN padding_log_lsn)
{
  // Verify a padding LogEntry can be iterated and validated from two start
  // points: the padding group entry itself, and the padding log entry that
  // sits right after the group entry header.
  const LSN start_lsns[2] = {
      padding_log_lsn,                                         // from the padding group entry
      padding_log_lsn + LogGroupEntryHeader::HEADER_SER_SIZE}; // from the padding log entry
  for (int64_t i = 0; i < 2; i++) {
    PalfBufferIterator iter;
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(start_lsns[i], iter));
    EXPECT_EQ(OB_SUCCESS, iter.next());
    LogEntry entry;
    LSN entry_lsn;
    EXPECT_EQ(OB_SUCCESS, iter.get_entry(entry, entry_lsn));
    // the entry must be a padding log, pass the integrity check,
    // and carry the expected padding SCN
    EXPECT_EQ(true, entry.header_.is_padding_log_());
    EXPECT_EQ(true, entry.check_integrity());
    EXPECT_EQ(padding_scn, entry.get_scn());
  }
}
TEST_F(TestObSimpleLogClusterSingleReplica, update_disk_options)
{
SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options");
@ -242,11 +271,21 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback)
remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.palf_handle_impl_->sw_.get_max_lsn();
EXPECT_LT(remained_log_size, 5*1024);
EXPECT_GT(remained_log_size, 0);
// 写一条大小为2KB的日志
// 写一条大小为5KB的日志
LSN padding_log_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 5*1024));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
// 验证读取padding是否成功
{
share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn();
padding_scn = padding_scn.minus(padding_scn, 1);
read_padding_entry(leader, padding_scn, padding_log_lsn);
}
PALF_LOG(INFO, "runlin trace print sw3", K(leader.palf_handle_impl_->sw_));
// Padding日志预期不占用日志条数,因此存在33条日志
// Padding日志占用日志条数,因此存在34条日志
EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header));
EXPECT_EQ(OB_SUCCESS, get_middle_scn(34, leader, mid_scn, header));
EXPECT_EQ(OB_ITER_END, get_middle_scn(35, leader, mid_scn, header));
EXPECT_LT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn());
max_scn = leader.palf_handle_impl_->sw_.get_max_scn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
@ -257,16 +296,25 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback)
PALF_LOG(INFO, "flashback to padding tail");
EXPECT_EQ(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE));
EXPECT_EQ(OB_ITER_END, read_log(leader));
// flashback后存在32条日志
EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header));
EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header));
// flashback后存在33条日志(包含padding日志)
EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header));
EXPECT_EQ(OB_ITER_END, get_middle_scn(34, leader, mid_scn, header));
// flashback到padding日志头部
// 验证读取padding是否成功
{
share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn();
padding_scn.minus(padding_scn, 1);
PALF_LOG(INFO, "begin read_padding_entry", K(padding_scn), K(padding_log_lsn));
read_padding_entry(leader, padding_scn, padding_log_lsn);
}
// flashback到padding日志头部,磁盘上还有32条日志
tmp_scn = leader.palf_handle_impl_->get_max_scn();
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 1), timeout_ts_us));
EXPECT_LT(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE));
EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header));
EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header));
EXPECT_EQ(padding_log_lsn, leader.palf_handle_impl_->get_max_lsn());
switch_flashback_to_append(leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx, 1000));
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(LSN(PALF_BLOCK_SIZE), leader));
@ -414,11 +462,11 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
{
unittest::PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000));
LogEntryHeader header_origin;
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
EXPECT_EQ(OB_SUCCESS, get_middle_scn(323, leader, max_scn, header_origin));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 1000));
wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(leader));
switch_append_to_flashback(leader, mode_version);
@ -430,7 +478,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
EXPECT_EQ(header_origin.data_checksum_, header_new.data_checksum_);
EXPECT_EQ(OB_ITER_END, get_middle_scn(324, leader, new_scn, header_new));
switch_flashback_to_append(leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000));
EXPECT_EQ(OB_SUCCESS, get_middle_scn(1323, leader, new_scn, header_new));
EXPECT_EQ(OB_ITER_END, get_middle_scn(1324, leader, new_scn, header_new));
EXPECT_EQ(OB_ITER_END, read_log(leader));
@ -444,7 +492,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode));
EXPECT_EQ(curr_mode_version, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx, 1000));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
ref_scn.convert_for_tx(1000);
@ -496,13 +544,56 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// 重启后继续提交日志
{
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
switch_flashback_to_append(new_leader, mode_version);
EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
share::SCN padding_scn = new_leader.get_palf_handle_impl()->get_max_scn();
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
switch_append_to_flashback(new_leader, mode_version);
// flashback到padding日志头后重启
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us));
EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
new_leader.reset();
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// 重启提交日志,不产生padding日志
{
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
LSN padding_start_lsn = new_leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
const int64_t remained_size = PALF_BLOCK_SIZE - lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE);
EXPECT_GE(remained_size, 0);
const int64_t group_entry_body_size = remained_size - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE;
PALF_LOG(INFO, "runlin trace print remained_size", K(remained_size), K(group_entry_body_size));
switch_flashback_to_append(new_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, leader_idx, group_entry_body_size));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn()));
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, new_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_start_lsn, iterator));
EXPECT_EQ(OB_SUCCESS, iterator.next());
LogEntry log_entry;
LSN check_lsn;
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(log_entry, check_lsn));
EXPECT_EQ(check_lsn, padding_start_lsn + LogGroupEntryHeader::HEADER_SER_SIZE);
EXPECT_EQ(false, log_entry.header_.is_padding_log_());
EXPECT_EQ(true, log_entry.check_integrity());
new_leader.reset();
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// 重启后继续提交日志
{
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx, 1000));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn()));
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
}
delete_paxos_group(id);
}
@ -1751,9 +1842,101 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
// 迭代新写入的日志成功
EXPECT_EQ(OB_SUCCESS, iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::min_scn()));
}
// 验证一条padding LogGroupEntry需要受控回放
{
const int64_t append_id = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard append_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(append_id, leader_idx, append_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 31, leader_idx, MAX_LOG_BODY_SIZE));
const LSN padding_start_lsn = append_leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 1, leader_idx, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(append_leader, append_leader.get_palf_handle_impl()->get_max_lsn()));
SCN padding_scn = append_leader.get_palf_handle_impl()->get_max_scn();
padding_scn = padding_scn.minus(padding_scn, 1);
const int64_t raw_write_id = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(raw_write_id, leader_idx, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(append_leader, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.get_palf_handle_impl()->get_max_lsn()));
switch_append_to_flashback(raw_write_leader, mode_version);
PalfBufferIterator buff_iterator;
PalfGroupBufferIterator group_buff_iterator;
PalfBufferIterator buff_iterator_padding_start;
PalfGroupBufferIterator group_buff_iterator_padding_start;
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator_padding_start));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator_padding_start));
SCN next_min_scn;
bool iterate_end_by_replayable_point = false;
EXPECT_EQ(OB_ITER_END, buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(OB_ITER_END, group_buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
// 一共有33条日志,包括padding
SCN replayable_point_scn = padding_scn.minus(padding_scn, 1);
// 直到padding日志受控回放
int ret = OB_SUCCESS;
while (OB_SUCC(buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
ret = OB_SUCCESS;
while (OB_SUCC(buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
EXPECT_EQ(OB_ITER_END, ret);
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, padding_scn);
ret = OB_SUCCESS;
while (OB_SUCC(group_buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
ret = OB_SUCCESS;
while (OB_SUCC(group_buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
EXPECT_EQ(OB_ITER_END, ret);
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, padding_scn);
EXPECT_EQ(false, buff_iterator.iterator_impl_.curr_entry_is_padding_);
EXPECT_EQ(false, group_buff_iterator.iterator_impl_.curr_entry_is_padding_);
// flashback到padding日志尾
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn, timeout_ts_us));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
LogEntry padding_log_entry;
LSN padding_log_lsn;
EXPECT_EQ(OB_SUCCESS, buff_iterator.get_entry(padding_log_entry, padding_log_lsn));
EXPECT_EQ(true, padding_log_entry.check_integrity());
EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_());
EXPECT_EQ(padding_scn, padding_log_entry.header_.scn_);
EXPECT_EQ(false, buff_iterator.iterator_impl_.padding_entry_scn_.is_valid());
EXPECT_EQ(OB_SUCCESS, group_buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
LogGroupEntry padding_group_entry;
LSN padding_group_lsn;
EXPECT_EQ(OB_SUCCESS, group_buff_iterator.get_entry(padding_group_entry, padding_group_lsn));
EXPECT_EQ(true, padding_group_entry.check_integrity());
EXPECT_EQ(true, padding_group_entry.header_.is_padding_log());
// 对于LogGroupEntry的iterator,在construct_padding_log_entry_后,不会重置padding状态
EXPECT_EQ(true, group_buff_iterator.iterator_impl_.padding_entry_scn_.is_valid());
EXPECT_EQ(padding_log_entry.header_.scn_, padding_group_entry.header_.max_scn_);
// flashback到padding日志头
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us));
// 预期是由于文件长度导致的OB_ITER_END
EXPECT_EQ(OB_ITER_END, buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_GE(next_min_scn, buff_iterator_padding_start.iterator_impl_.prev_entry_scn_);
EXPECT_EQ(OB_ITER_END, group_buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_GE(next_min_scn, group_buff_iterator_padding_start.iterator_impl_.prev_entry_scn_);
switch_flashback_to_append(raw_write_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 100, leader_idx, 1000));
EXPECT_EQ(OB_SUCCESS, buff_iterator_padding_start.next());
EXPECT_EQ(OB_SUCCESS, group_buff_iterator_padding_start.next());
}
}
TEST_F(TestObSimpleLogClusterSingleReplica, read_block_in_flashback)

View File

@ -358,8 +358,6 @@ int LogRequestHandler::handle_request<LogFlashbackMsg>(const LogFlashbackMsg &re
K(curr_access_mode), K(req));
} else if (OB_FAIL(palf_handle_guard.flashback(req.mode_version_, req.flashback_scn_, FLASHBACK_TIMEOUT_US))) {
CLOG_LOG(WARN, "flashback failed", K(ret), K(palf_id), K(req));
} else if (OB_FAIL(replay_srv->flashback(ls_id))) {
CLOG_LOG(WARN, "replay_service flashback failed", K(ret), K(ls_id));
} else if (OB_FAIL(get_rpc_proxy_(rpc_proxy))) {
CLOG_LOG(WARN, "get_rpc_proxy_ failed", K(ret), K(palf_id));
} else if (OB_FAIL(get_self_addr_(self))) {

View File

@ -77,6 +77,9 @@ enum ObLogBaseType
ARBITRATION_SERVICE_LOG_BASE_TYPE = 21,
HEARTBEAT_SERVICE_LOG_BASE_TYPE = 22,
// for padding log entry
PADDING_LOG_BASE_TYPE = 23,
// pay attention!!!
// add log type in log_base_type_to_string
// max value
@ -137,6 +140,8 @@ int log_base_type_to_string(const ObLogBaseType log_type,
strncpy(str ,"ARBITRATION_SERVICE", str_len);
} else if (log_type == HEARTBEAT_SERVICE_LOG_BASE_TYPE) {
strncpy(str ,"HEARTBEAT_SERVICE", str_len);
} else if (log_type == PADDING_LOG_BASE_TYPE) {
strncpy(str ,"PADDING_LOG_ENTRY", str_len);
} else {
ret = OB_INVALID_ARGUMENT;
}

View File

@ -68,6 +68,9 @@ int ObLSAdapter::replay(ObLogReplayTask *replay_task)
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, " log stream not exist", KPC(replay_task), K(ret));
} else if (ObLogBaseType::PADDING_LOG_BASE_TYPE == replay_task->log_type_) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "padding log entry can't be replayed, unexpected error", KPC(replay_task));
} else if (OB_FAIL(ls->replay(replay_task->log_type_,
replay_task->log_buf_,
replay_task->log_size_,

View File

@ -14,7 +14,6 @@
#include "lib/oblog/ob_log_module.h" // LOG*
#include "lib/ob_errno.h" // ERROR NUMBER
#include "lib/checksum/ob_crc64.h" // ob_crc64
namespace oceanbase
{
namespace palf

View File

@ -11,9 +11,10 @@
*/
#include "log_entry_header.h"
#include "lib/checksum/ob_crc64.h"
#include "lib/checksum/ob_crc64.h" // ob_crc64
#include "lib/checksum/ob_parity_check.h" // parity_check
#include "lib/ob_errno.h"
#include "lib/ob_errno.h" // errno
#include "logservice/ob_log_base_header.h" // ObLogBaseHeader
namespace oceanbase
{
@ -22,6 +23,7 @@ namespace palf
{
const int64_t LogEntryHeader::HEADER_SER_SIZE = sizeof(LogEntryHeader);
const int64_t LogEntryHeader::PADDING_LOG_ENTRY_SIZE = sizeof(LogEntryHeader) + sizeof(logservice::ObLogBaseHeader);
LogEntryHeader::LogEntryHeader()
: magic_(0),
@ -110,6 +112,53 @@ bool LogEntryHeader::check_header_checksum_() const
return (header_checksum == saved_header_checksum);
}
bool LogEntryHeader::is_padding_log_() const
{
  // A set PADDING_TYPE_MASK bit in flag_ marks this entry as a padding log.
  return 0 != (flag_ & PADDING_TYPE_MASK);
}
// static member function
// the format of out_buf:
// | LogEntryHeader | ObLogBaseHeader |
// @brief: serialize a padding LogEntryHeader plus an ObLogBaseHeader of type
//         PADDING_LOG_BASE_TYPE into out_buf.
// @param[in] padding_data_len: logical data length recorded in the padding
//            LogEntryHeader (covers the whole padding body, not just out_buf)
// @param[in] scn: SCN assigned to the padding log entry
// @param[in,out] out_buf: destination buffer for the two headers
// @param[in] padding_valid_data_len: usable length of out_buf; must be large
//            enough for both headers and no larger than padding_data_len
int LogEntryHeader::generate_padding_log_buf(const int64_t padding_data_len,
                                             const share::SCN &scn,
                                             char *out_buf,
                                             const int64_t padding_valid_data_len)
{
  int ret = OB_SUCCESS;
  LogEntryHeader header;
  logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE,
                                          logservice::ObReplayBarrierType::NO_NEED_BARRIER,
                                          0);
  const int64_t base_header_len = base_header.get_serialize_size();
  const int64_t header_len = header.get_serialize_size();
  const int64_t serialize_len = base_header_len + header_len;
  int64_t serialize_header_pos = 0;
  // ObLogBaseHeader is written immediately after the LogEntryHeader
  int64_t serialize_base_header_pos = serialize_header_pos + header_len;
  if (padding_data_len <= 0
      || !scn.is_valid()
      || NULL == out_buf
      || padding_valid_data_len < serialize_len
      || padding_data_len < padding_valid_data_len) {
    ret = OB_INVALID_ARGUMENT;
    PALF_LOG(WARN, "invalid argument", K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len));
  } else if (OB_FAIL(base_header.serialize(out_buf, padding_valid_data_len, serialize_base_header_pos))) {
    PALF_LOG(WARN, "serialize ObLogBaseHeader failed", K(padding_data_len), KP(out_buf), K(padding_valid_data_len),
             K(serialize_base_header_pos));
  } else if (FALSE_IT(serialize_base_header_pos = serialize_header_pos + header_len)) {
    // serialize() advanced the pos past the base header; rewind it so the
    // checksum below reads the ObLogBaseHeader bytes from their start offset
  } else if (OB_FAIL(header.generate_padding_header_(out_buf + serialize_base_header_pos,
                                                     base_header_len,
                                                     padding_data_len,
                                                     scn))) {
    PALF_LOG(WARN, "generate LogEntryHeader failed", K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len));
  } else if (OB_FAIL(header.serialize(out_buf, header_len, serialize_header_pos))) {
    PALF_LOG(WARN, "serialize LogEntryHeader failed", K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len));
  } else {
    PALF_LOG(INFO, "generate_padding_log_buf success", K(header), K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len));
  }
  return ret;
}
bool LogEntryHeader::check_header_integrity() const
{
return true == is_valid() && true == check_header_checksum_();
@ -118,6 +167,8 @@ bool LogEntryHeader::check_header_integrity() const
bool LogEntryHeader::check_integrity(const char *buf, const int64_t data_len) const
{
bool bool_ret = false;
// for padding log, only check integrity of ObLogBaseHeader
int64_t valid_data_len = is_padding_log_() ? sizeof(logservice::ObLogBaseHeader) : data_len;
if (NULL == buf || data_len <= 0) {
PALF_LOG_RET(WARN, OB_INVALID_ARGUMENT, "invalid arguments", KP(buf), K(data_len));
} else if (LogEntryHeader::MAGIC != magic_) {
@ -126,17 +177,40 @@ bool LogEntryHeader::check_integrity(const char *buf, const int64_t data_len) co
} else if (false == check_header_checksum_()) {
PALF_LOG_RET(WARN, OB_ERROR, "check header checsum failed", K(*this));
} else {
const int64_t tmp_data_checksum = common::ob_crc64(buf, data_len);
const int64_t tmp_data_checksum = common::ob_crc64(buf, valid_data_len);
if (data_checksum_ == tmp_data_checksum) {
bool_ret = true;
} else {
bool_ret = false;
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "data checksum mismatch", K_(data_checksum), K(tmp_data_checksum), K(data_len), KPC(this));
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "data checksum mismatch", K_(data_checksum), K(tmp_data_checksum), K(data_len),
K(valid_data_len), KPC(this));
}
}
return bool_ret;
}
// @brief: fill this LogEntryHeader as a padding header.
// @param[in] log_data: serialized ObLogBaseHeader bytes of the padding entry
//            (binary buffer, used only as checksum input)
// @param[in] base_header_len: length of the serialized ObLogBaseHeader
// @param[in] padding_data_len: logical data length recorded in log_size_
// @param[in] scn: SCN assigned to the padding log entry
int LogEntryHeader::generate_padding_header_(const char *log_data,
                                             const int64_t base_header_len,
                                             const int64_t padding_data_len,
                                             const share::SCN &scn)
{
  int ret = OB_SUCCESS;
  if (NULL == log_data || base_header_len <= 0 || padding_data_len <= 0 || !scn.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
  } else {
    magic_ = LogEntryHeader::MAGIC;
    version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION;
    log_size_ = padding_data_len;
    scn_ = scn;
    // for padding logs only the ObLogBaseHeader bytes are checksummed
    data_checksum_ = common::ob_crc64(log_data, base_header_len);
    flag_ = (flag_ | LogEntryHeader::PADDING_TYPE_MASK);
    // update header checksum after all member vars assigned
    (void) update_header_checksum_();
    // KP: log_data is a binary, non-NUL-terminated buffer — log its address,
    // never format it as a string value
    PALF_LOG(INFO, "generate_padding_header_ success", KPC(this), KP(log_data), K(base_header_len), K(padding_data_len));
  }
  return ret;
}
DEFINE_SERIALIZE(LogEntryHeader)
{
int ret = OB_SUCCESS;

View File

@ -38,6 +38,18 @@ public:
const share::SCN get_scn() const { return scn_; }
int64_t get_data_checksum() const { return data_checksum_; }
bool check_header_integrity() const;
// @brief: generate padding log entry
// @param[in]: padding_data_len, the data len of padding entry(the group_size_ in LogGroupEntry
// - sizeof(LogEntryHeader))
// @param[in]: scn, the SCN of padding log entry
// @param[in&out]: out_buf, out_buf just only include LogEntryHeader and ObLogBaseHeader
// @param[in]: padding_valid_data_len, the valid data len of padding LogEntry(just only include
// LogEntryHeader and ObLogBaseHeader).
static int generate_padding_log_buf(const int64_t padding_data_len,
const share::SCN &scn,
char *out_buf,
const int64_t padding_valid_data_len);
NEED_SERIALIZE_AND_DESERIALIZE;
TO_STRING_KV("magic", magic_,
"version", version_,
@ -48,12 +60,20 @@ public:
public:
static constexpr int16_t MAGIC = 0x4C48; // 'LH' means LOG ENTRY HEADER
static const int64_t HEADER_SER_SIZE;
static const int64_t PADDING_LOG_ENTRY_SIZE;
private:
bool get_header_parity_check_res_() const;
void update_header_checksum_();
bool check_header_checksum_() const;
bool is_padding_log_() const;
int generate_padding_header_(const char *log_data,
const int64_t base_header_len,
const int64_t padding_data_len,
const share::SCN &scn);
private:
static constexpr int16_t LOG_ENTRY_HEADER_VERSION = 1;
static constexpr int64_t PADDING_TYPE_MASK = 1 << 1;
private:
int16_t magic_;
int16_t version_;

View File

@ -201,7 +201,6 @@ int LogGroupBuffer::fill(const LSN &lsn,
LSN start_lsn, reuse_lsn;
get_buffer_start_lsn_(start_lsn);
get_reuse_lsn_(reuse_lsn);
const int64_t reserved_buf_size = get_reserved_buffer_size();
const int64_t available_buf_size = get_available_buffer_size();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -219,33 +218,27 @@ int LogGroupBuffer::fill(const LSN &lsn,
// double check: 要填充的终点超过了buffer可复用的范围
ret = OB_EAGAIN;
PALF_LOG(WARN, "end_lsn is greater than reuse end pos", K(ret), K(lsn), K(end_lsn), K(reuse_lsn), K(available_buf_size));
} else if (OB_FAIL(get_buffer_pos_(lsn, start_pos))) {
PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn));
} else if (OB_FAIL(fill_(lsn, data, data_len))) {
PALF_LOG(WARN, "fill data failed", K(lsn), K(data_len), KP(data_buf_));
} else {
const int64_t group_buf_tail_len = reserved_buf_size - start_pos;
int64_t first_part_len = min(group_buf_tail_len, data_len);
memcpy(data_buf_ + start_pos, data, first_part_len);
if (data_len > first_part_len) {
// seeking to buffer's beginning
memcpy(data_buf_, data + first_part_len, data_len - first_part_len);
}
PALF_LOG(TRACE, "fill group buffer success", K(ret), K(lsn), K(data_len), K(start_pos), K(group_buf_tail_len),
K(first_part_len), "second_part_len", data_len - first_part_len, KP(data_buf_));
PALF_LOG(TRACE, "fill group buffer success", K(ret), K(lsn), K(data_len), KP(data_buf_));
}
return ret;
}
int LogGroupBuffer::fill_padding_body(const LSN &lsn,
const char *data,
const int64_t data_len,
const int64_t log_body_size)
{
int ret = OB_SUCCESS;
int64_t start_pos = 0;
const LSN end_lsn = lsn + log_body_size;
LSN start_lsn, reuse_lsn;
get_buffer_start_lsn_(start_lsn);
get_reuse_lsn_(reuse_lsn);
const int64_t reserved_buf_size = get_reserved_buffer_size();
const int64_t available_buf_size = get_available_buffer_size();
const int64_t reserved_buf_size = get_reserved_buffer_size();
int64_t start_pos = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!lsn.is_valid() || log_body_size <= 0) {
@ -266,6 +259,7 @@ int LogGroupBuffer::fill_padding_body(const LSN &lsn,
} else if (OB_FAIL(get_buffer_pos_(lsn, start_pos))) {
PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn));
} else {
// reset data to zero firstly.
const int64_t group_buf_tail_len = reserved_buf_size - start_pos;
int64_t first_part_len = min(group_buf_tail_len, log_body_size);
memset(data_buf_ + start_pos, PADDING_LOG_CONTENT_CHAR, first_part_len);
@ -273,8 +267,15 @@ int LogGroupBuffer::fill_padding_body(const LSN &lsn,
// seeking to buffer's beginning
memset(data_buf_, PADDING_LOG_CONTENT_CHAR, log_body_size - first_part_len);
}
PALF_LOG(INFO, "fill padding body success", K(ret), K(lsn), K(log_body_size), K(start_pos), K(group_buf_tail_len),
K(first_part_len), "second_part_len", log_body_size - first_part_len);
// fill valid padding data.
if (OB_FAIL(fill_(lsn, data, data_len))) {
PALF_LOG(WARN, "fill padding data filled", K(ret), K(lsn), K(log_body_size), K(start_pos), K(data_len),
K(group_buf_tail_len), K(first_part_len), "second_part_len", data_len - first_part_len);
} else {
PALF_LOG(INFO, "fill padding log success", K(ret), K(lsn), K(log_body_size), K(start_pos), K(data_len),
K(group_buf_tail_len), K(first_part_len), "second_part_len", data_len - first_part_len);
}
}
return ret;
}
@ -395,5 +396,28 @@ int LogGroupBuffer::set_reuse_lsn(const LSN &new_reuse_lsn)
}
return ret;
}
// Copy 'data' of length 'data_len' into the group buffer at the physical
// position mapped from 'lsn', wrapping around to the buffer head when the
// tail segment is too short to hold the whole payload.
// @param[in] lsn: logical start position; translated to a buffer offset via get_buffer_pos_().
// @param[in] data: source bytes (must hold at least 'data_len' bytes).
// @param[in] data_len: number of bytes to copy.
// @return OB_SUCCESS, or the error returned by get_buffer_pos_().
// NOTE(review): no capacity/reuse-bound re-check here — callers (fill /
// fill_padding_body) are assumed to have validated the range first; confirm
// no other call sites bypass those checks.
int LogGroupBuffer::fill_(const LSN &lsn,
const char *data,
const int64_t data_len)
{
int ret = OB_SUCCESS;
int64_t start_pos = 0;
const int64_t reserved_buf_size = get_reserved_buffer_size();
if (OB_FAIL(get_buffer_pos_(lsn, start_pos))) {
PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn));
} else {
// bytes available between start_pos and the physical end of the buffer
const int64_t group_buf_tail_len = reserved_buf_size - start_pos;
int64_t first_part_len = min(group_buf_tail_len, data_len);
memcpy(data_buf_ + start_pos, data, first_part_len);
if (data_len > first_part_len) {
// seeking to buffer's beginning
memcpy(data_buf_, data+first_part_len, data_len - first_part_len);
}
PALF_LOG(TRACE, "fill data success", K(ret), K(lsn), K(data_len), K(start_pos), K(group_buf_tail_len),
K(first_part_len), "second_part_len", data_len - first_part_len);
}
return ret;
}
} // namespace palf
} // namespace oceanbase

View File

@ -47,6 +47,8 @@ public:
const char *data,
const int64_t data_len);
int fill_padding_body(const LSN &lsn,
const char *data,
const int64_t data_len,
const int64_t log_body_size);
int get_log_buf(const LSN &lsn, const int64_t total_len, LogWriteBuf &log_buf);
bool can_handle_new_log(const LSN &lsn,
@ -70,6 +72,9 @@ private:
int get_buffer_pos_(const LSN &lsn, int64_t &start_pos) const;
void get_buffer_start_lsn_(LSN &start_lsn) const;
void get_reuse_lsn_(LSN &reuse_lsn) const;
int fill_(const LSN &lsn,
const char *data,
const int64_t data_len);
private:
// buffer起始位置对应的lsn
LSN start_lsn_;

View File

@ -40,8 +40,7 @@ public:
bool check_integrity() const;
bool check_integrity(int64_t &data_checksum) const;
int64_t get_header_size() const { return header_.get_serialize_size(); }
int64_t get_payload_offset() const { return header_.get_serialize_size() +
(header_.is_padding_log() ? header_.get_data_len() : 0); }
int64_t get_payload_offset() const { return header_.get_serialize_size(); }
int64_t get_data_len() const { return header_.get_data_len(); }
// return total size of header and body, including the length of padding log
int64_t get_group_entry_size() const { return header_.get_serialize_size() +

View File

@ -47,8 +47,8 @@ enum class LogEntryType
// =========== LogEntryType end =============
enum class IterateEndReason {
DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY = 0,
DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY = 1,
DUE_TO_REPLAYABLE_POINT_SCN_LOG_GROUP_ENTRY = 0,
DUE_TO_REPLAYABLE_POINT_SCN_LOG_ENTRY = 1,
DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA = 2,
DUE_TO_FILE_END_LSN_READ_NEW_DATA = 3,
MAX_TYPE = 4
@ -74,8 +74,8 @@ struct IterateEndInfo {
}
bool is_iterate_end_by_replayable_point_scn() const
{
return IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY == reason_
|| IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY == reason_;
return IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_ENTRY == reason_
|| IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_GROUP_ENTRY == reason_;
}
IterateEndReason reason_;
SCN log_scn_;
@ -101,18 +101,18 @@ public:
// - has iterated to the end of block.
// OB_NEED_RETRY
// - the data in cache is not integrity, and the integrity data has been truncate from disk,
// need read data from storage eagin.(data in cache will not been clean up, therefore,
// need read data from storage again.(data in cache will not been clean up, therefore,
// user need used a new iterator to read data again)
// - if the end_lsn get from get_file_end_lsn is smaller than 'log_tail_' of LogStorage, and it's
// not the exact boundary of LogGroupEntry(for PalgGroupeBufferIterator, or LogEntry for PalfBufferIterator),
// not the exact boundary of LogGroupEntry(for PalfGroupeBufferIterator, or LogEntry for PalfBufferIterator),
// OB_NEED_RETRY may be return.
// - if read_data_from_storage_ is concurrent with the last step of flashback, opening last block on disk may be failed
// due to rename, return OB_NEED_RETRY in this case.(TODO by runlin: retry by myself)
// OB_ERR_OUT_LOWER_BOUND
// - block has been recycled
// OB_CHECKSUM_ERROR
// - the accumlate checksum calc by accum_checksum_ and the data checksum of LogGroupEntry is not
// same as the accumlate checksum of LogGroupEntry
// - the accumulate checksum calc by accum_checksum_ and the data checksum of LogGroupEntry is not
// same as the accumulate checksum of LogGroupEntry
int next(const share::SCN &replayable_point_scn);
// param[in] replayable point scn, iterator will ensure that no log will return when the log scn is greater than
@ -124,7 +124,7 @@ public:
// OB_INVALID_DATA.
// OB_ITER_END, has iterated to the end of block.
// OB_NEED_RETRY, the data in cache is not integrity, and the integrity data has been truncate from disk,
// need read data from storage eagin.(data in cache will not been clean up, therefore,
// need read data from storage again.(data in cache will not been clean up, therefore,
// user need used a new iterator to read data again)
// OB_ERR_OUT_LOWER_BOUND, block has been recycled
//
@ -147,11 +147,12 @@ public:
TO_STRING_KV(KP(buf_), K_(next_round_pread_size), K_(curr_read_pos), K_(curr_read_buf_start_pos),
K_(curr_read_buf_end_pos), KPC(log_storage_), K_(curr_entry_is_raw_write), K_(curr_entry_size),
K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version), K_(accumlate_checksum));
K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version), K_(accumulate_checksum),
K_(curr_entry_is_padding), K_(padding_entry_size), K_(padding_entry_scn));
private:
// @brief get next entry from data storage or cache.
// @rtval
// @retval
// OB_SUCCESS
// OB_INVALID_DATA
// OB_ITER_END
@ -171,7 +172,7 @@ private:
// OB_INVALID_DATA
// -- means log entry is not integrity, need check this log entry whether is the last one.
// OB_CHECKSUM_ERROR
// -- means accumlate checksum is not matched.
// -- means accumulate checksum is not matched.
// OB_ITER_END
// -- means log entry is iterated end by replayable_point_scn
int parse_one_entry_(const SCN &replayable_point_scn,
@ -199,11 +200,13 @@ private:
const bool matched_type = std::is_same<LogEntry, ENTRY>::value;
int64_t pos = curr_read_pos_;
if (true == matched_type) {
if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
if (curr_entry_is_padding_ && OB_FAIL(construct_padding_log_entry_(pos, padding_entry_size_))) {
PALF_LOG(WARN, "construct_padding_log_entry_ failed", KPC(this));
} else if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
} else if (curr_entry_is_raw_write_ && curr_entry_.get_scn() > replayable_point_scn) {
ret = OB_ITER_END;
info.log_scn_ = curr_entry_.get_scn();
info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY;
info.reason_ = IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_ENTRY;
PALF_LOG(TRACE, "iterate end by replayable_point", KPC(this), K(replayable_point_scn), K(info));
}
} else {
@ -215,7 +218,7 @@ private:
// When entry in file is LogGroupEntry, handle it specifically.
// 1. for raw write LogGroupEntry, if it's controlled by replayable_point_scn, no need update
// 'curr_read_pos_', 'accum_checksum_', because this log may be flashbacked.
// 'curr_read_pos_', 'accum_checksum_', because this log may be flashback.
// 2. for append LogGroupEntry, handle it normally.
int parse_log_group_entry_(const SCN &replayable_point_scn,
IterateEndInfo &info)
@ -265,8 +268,8 @@ private:
template <>
// When T is LogGroupEntry, need do:
// 1. check accumlate checksum:
// - if accumlate checksum is not match, return OB_CHECKSUM_ERROR
// 1. check accumulate checksum:
// - if accumulate checksum is not match, return OB_CHECKSUM_ERROR
// - if data checksum is not match, return OB_INVALID_DATA
// 2. check this entry whether need control by 'replayable_point_scn':
// - if control by 'replayable_point_scn', return OB_ITER_END, and don't modify
@ -278,14 +281,14 @@ private:
{
int ret = OB_SUCCESS;
bool curr_entry_is_raw_write = entry.get_header().is_raw_write();
int64_t new_accumlate_checksum = -1;
int64_t new_accumulate_checksum = -1;
PALF_LOG(TRACE, "T is LogGroupEntry", K(entry));
if (OB_FAIL(verify_accum_checksum_(entry, new_accumlate_checksum))) {
if (OB_FAIL(verify_accum_checksum_(entry, new_accumulate_checksum))) {
PALF_LOG(WARN, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry));
// NB: when current entry is raw write, and the log scn of current entry is greater than
// replayable_point_scn, this log may be clean up, therefore we can not update several fields of
// LogIteratorImpl, return OB_ITER_END directlly, otherwise, we may not parse new LogGroupEntryHeader
// after flashback position, this will cause one log which is append, but control by replable_point_scn.
// LogIteratorImpl, return OB_ITER_END directly, otherwise, we may not parse new LogGroupEntryHeader
// after flashback position, this will cause one log which is append, but control by replayable_point_scn.
//
// NB: we need check the min scn of LogGroupEntry whether has been greater than
// replayable_point_scn:
@ -313,7 +316,7 @@ private:
} else if ((is_group_iterator && entry.get_scn() > replayable_point_scn)
|| (!is_group_iterator && min_scn > replayable_point_scn)) {
info.log_scn_ = min_scn;
info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY;
info.reason_ = IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_GROUP_ENTRY;
ret = OB_ITER_END;
PALF_LOG(TRACE, "iterate end by replayable_point", K(ret), KPC(this), K(min_scn),
K(entry), K(replayable_point_scn), K(info), K(is_group_iterator));
@ -322,20 +325,42 @@ private:
}
if (OB_SUCC(ret)) {
curr_entry_is_raw_write_ = entry.get_header().is_raw_write();
accumlate_checksum_ = new_accumlate_checksum;
accumulate_checksum_ = new_accumulate_checksum;
// To support get PADDING entry, need record the meta info of PADDING entry
set_padding_info_(entry);
}
return ret;
}
// @brief: accumlate checksum verify, only verify checkum when accum_checksum_ is not -1.
// @brief: accumulate checksum verify, only verify checksum when accum_checksum_ is not -1.
// ret val:
// OB_SUCCESS
// OB_CHECKSUM_ERROR
int verify_accum_checksum_(const LogGroupEntry &entry,
int64_t &new_accumlate_checksum);
int64_t &new_accumulate_checksum);
int construct_padding_log_entry_(const int64_t memset_start_pos,
const int64_t padding_log_entry_len);
// Returns true iff 'lsn' sits exactly on a block boundary (offset 0 within a
// PALF_BLOCK_SIZE block) — a padding entry always fills up to the block end,
// so its end LSN is the head of the next block.
bool is_padding_entry_end_lsn_(const LSN &lsn)
{
return 0 == lsn_2_offset(lsn, PALF_BLOCK_SIZE);
}
// Record the meta info (is-padding flag, body size, SCN) of the current
// LogGroupEntry, so a padding LogEntry can be materialized later by
// construct_padding_log_entry_().
// NOTE(review): curr_entry_is_padding_ is declared as int64_t but assigned a
// bool here — TODO confirm whether it should be declared bool.
void set_padding_info_(const LogGroupEntry &entry)
{
curr_entry_is_padding_ = entry.get_header().is_padding_log();
padding_entry_size_ = entry.get_header().get_data_len();
padding_entry_scn_ = entry.get_header().get_max_scn();
}
// Clear the recorded padding meta info; called after the padding entry has
// been consumed (or when the iterator cache is cleaned up) so a stale flag
// cannot cause a padding entry to be returned twice.
void reset_padding_info_()
{
curr_entry_is_padding_ = false;
padding_entry_size_ = 0;
padding_entry_scn_.reset();
}
private:
static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2;
// In each `next_entry` round, need read data from `LogStorage` directlly,
// In each `next_entry` round, need read data from `LogStorage` directly,
// to amortized reading cost, use `read_buf` to cache the last read result.
//
// NB: each log must not exceed than 2MB + 4KB.
@ -371,7 +396,11 @@ static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2;
//
share::SCN prev_entry_scn_;
GetModeVersion get_mode_version_;
int64_t accumlate_checksum_;
int64_t accumulate_checksum_;
// To support get PADDING ENTRY, add following fields:
int64_t curr_entry_is_padding_;
int64_t padding_entry_size_;
SCN padding_entry_scn_;
bool is_inited_;
};
@ -388,7 +417,10 @@ LogIteratorImpl<ENTRY>::LogIteratorImpl()
curr_entry_size_(0),
init_mode_version_(0),
prev_entry_scn_(),
accumlate_checksum_(-1),
accumulate_checksum_(-1),
curr_entry_is_padding_(false),
padding_entry_size_(0),
padding_entry_scn_(),
is_inited_(false)
{
}
@ -418,7 +450,10 @@ int LogIteratorImpl<ENTRY>::init(const GetModeVersion &get_mode_version,
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
get_mode_version_ = get_mode_version;
prev_entry_scn_.reset();
accumlate_checksum_ = -1;
accumulate_checksum_ = -1;
curr_entry_is_padding_ = false;
padding_entry_size_ = 0;
padding_entry_scn_.reset();
is_inited_ = true;
PALF_LOG(TRACE, "LogIteratorImpl init success", K(ret), KPC(this));
}
@ -437,7 +472,10 @@ void LogIteratorImpl<ENTRY>::reuse()
curr_entry_size_ = 0;
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
prev_entry_scn_.reset();
accumlate_checksum_ = -1;
accumulate_checksum_ = -1;
curr_entry_is_padding_ = false;
padding_entry_size_ = 0;
padding_entry_scn_.reset();
}
template <class ENTRY>
@ -445,7 +483,10 @@ void LogIteratorImpl<ENTRY>::destroy()
{
if (IS_INIT) {
is_inited_ = false;
accumlate_checksum_ = -1;
padding_entry_scn_.reset();
padding_entry_size_ = 0;
curr_entry_is_padding_ = false;
accumulate_checksum_ = -1;
prev_entry_scn_.reset();
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
curr_entry_size_ = 0;
@ -497,12 +538,12 @@ int LogIteratorImpl<ENTRY>::get_next_entry_(const SCN &replayable_point_scn,
//
// For example, the end lsn of this log is 64MB, the start lsn of this log
// is 62M, the end lsn of 'read_buf_' is 63MB, however, this log has been
// truncate from disk, and then new log which length is 1.5MB has writen,
// truncate from disk, and then new log which length is 1.5MB has written,
// the log tail is 63.5MB. even if we read data from storage, the data is
// always not integrity.
//
// We can limit the number of disk reads to 2, the reason is that: when
// we pasrs a PADDING entry with PalfBufferIterator, we need read data
// we parse a PADDING entry with PalfBufferIterator, we need read data
// from disk again.
//
int read_times = 0;
@ -568,9 +609,9 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
// NB: when return OB_ITER_END, we need try to clean up cache, and we should clean up cache only when
// the log ts of curr entry is greater than 'replayable_point_scn', otherwise, we would return some logs
// which has been flasback, consider following case:
// which has been flashback, consider following case:
// 1. T1, 'replayable_point_scn' is 10, the log ts of curr entry is 15, but there is no flashback option.(no any bad effect)
// 2. T2, 'replayable_point_scn' is 10, the logs on disk which the log ts after 10 has been flashbacked, and
// 2. T2, 'replayable_point_scn' is 10, the logs on disk which the log ts after 10 has been flashback, and
// return OB_ITER_END because of 'file end lsn'.(no any bad effect)
// 3. T3, 'replayable_point_scn' has been advanced to 16, and write several logs on disk, however, the cache
// of iterator has not been clean up, the old logs will be returned.(bad effect)
@ -579,7 +620,7 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
(void) try_clean_up_cache_();
if (!replayable_point_scn.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argumetn", K(replayable_point_scn), KPC(this));
PALF_LOG(WARN, "invalid argument", K(replayable_point_scn), KPC(this));
} else if (OB_FAIL(get_next_entry_(replayable_point_scn, info))) {
// NB: if the data which has been corrupted, clean cache.
// NB: if the accum_checksum_ is not match, return OB_CHECKSUM_ERROR.
@ -609,11 +650,11 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
// 1. if 'curr_entry_' iterate end by replayable point scn:
// we should set next_min_scn to std::min(replayable_point_scn+1, the log scn of 'curr_entry_'),
// however, replayable_point_scn may be smaller than 'prev_entry_scn_'(for example, the log
// entry correspond to'prev_entry_scn_' was writen by APPEND, its' scn may be greater than
// replayable_point_scn), we shoud set next_min_scn to std::max(
// entry correspond to'prev_entry_scn_' was written by APPEND, its' scn may be greater than
// replayable_point_scn), we should set next_min_scn to std::max(
// std::min(replayable_point_scn + 1, the log scn of 'curr_entry_'), 'prev_entry_scn_' + 1).
//
// 2. oterwise, iterate end by file end lsn:
// 2. otherwise, iterate end by file end lsn:
// - case 1: if 'curr_entry_' has been parsed from LogIteratorStorage, however, it's not readable
// due to file end lsn(consider that someone set the file end lsn to the middle of one
// LogGroupEntry), we should set next_min_scn to the max of min(curr_entry_'s scn,
@ -641,7 +682,7 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
// consider follower case:
// T1, iterate an entry successfully, and make prev_entry_scn to 5;
// T2, iterate control by readable point scn, scn of 'curr_entry_' is 10, and readable point scn is 8.
// T3, the logs after 8 have been flashabcked, and no log has been writen.
// T3, the logs after 8 have been flashback, and no log has been written.
// if we don't advance 'prev_entry_scn_' to 8, the continuous point of replay service can not update
// to 8.
if (info.log_scn_ > replayable_point_scn) {
@ -653,7 +694,6 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
next_min_scn = prev_entry_scn_.is_valid() ? SCN::plus(prev_entry_scn_, 1) : SCN::min_scn();
PALF_LOG(TRACE, "update next_min_scn to prev_entry_scn_ + 1", KPC(this), K(info), K(replayable_point_scn));
} else {
}
}
if (OB_FAIL(ret)) {
@ -674,6 +714,12 @@ int LogIteratorImpl<ENTRY>::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write
} else if (OB_FAIL(entry.shallow_copy(curr_entry_))) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "shallow_copy failed", K(ret), KPC(this));
// If the 'start_lsn' of PalfBufferIterator is not pointed to LogGroupEntry,
// before iterate next LogGroupEntry, the LogEntry will not be checked integrity,
// therefore, when accumulate_checksum_ is -1, check the integrity of entry.
} else if (-1 == accumulate_checksum_ && !entry.check_integrity()) {
ret = OB_INVALID_DATA;
PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry));
} else {
lsn = log_storage_->get_lsn(curr_read_pos_);
is_raw_write = curr_entry_is_raw_write_;
@ -683,7 +729,7 @@ int LogIteratorImpl<ENTRY>::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write
template<class ENTRY>
int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry,
int64_t &new_accumlate_checksum)
int64_t &new_accumulate_checksum)
{
int ret = OB_SUCCESS;
int64_t data_checksum = -1;
@ -691,13 +737,13 @@ int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry,
if (!entry.check_integrity(data_checksum)) {
ret = OB_INVALID_DATA;
PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry));
} else if (-1 == accumlate_checksum_) {
new_accumlate_checksum = expected_verify_checksum;
PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this),
K(new_accumlate_checksum));
} else if (-1 == accumulate_checksum_) {
new_accumulate_checksum = expected_verify_checksum;
PALF_LOG(INFO, "init accumulate_checksum to first LogGroupEntry", K(entry), KPC(this),
K(new_accumulate_checksum));
} else if (OB_FAIL(LogChecksum::verify_accum_checksum(
accumlate_checksum_, data_checksum,
expected_verify_checksum, new_accumlate_checksum))) {
accumulate_checksum_, data_checksum,
expected_verify_checksum, new_accumulate_checksum))) {
PALF_LOG(WARN, "verify accumlate checksum failed", K(ret), KPC(this), K(entry));
} else {
PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry));
@ -705,6 +751,47 @@ int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry,
return ret;
}
// @brief: materialize a padding LogEntry in place inside 'buf_', so the
// iterator can hand a well-formed padding entry to its user.
// Layout of the constructed entry:
//   | LogEntryHeader | ObLogBaseHeader | PADDING_LOG_CONTENT_CHAR ... |
// @param[in] memset_start_pos: offset in 'buf_' where the padding entry starts.
// @param[in] padding_log_entry_len: total length of the padding LogEntry
//            (header included); its end must coincide with the next block head.
// @return OB_SUCCESS, OB_ERR_UNEXPECTED (a defensive check failed), or the
//         error of LogEntryHeader::generate_padding_log_buf().
template<class ENTRY>
int LogIteratorImpl<ENTRY>::construct_padding_log_entry_(const int64_t memset_start_pos,
                                                         const int64_t padding_log_entry_len)
{
  int ret = OB_SUCCESS;
  LSN padding_log_entry_start_lsn = log_storage_->get_lsn(memset_start_pos);
  // defense code
  if (!curr_entry_is_padding_) {
    ret = OB_ERR_UNEXPECTED;
    PALF_LOG(ERROR, "only call this function when LogGroupEntry is padding", KPC(this));
  } else if (!is_padding_entry_end_lsn_(padding_log_entry_start_lsn + padding_log_entry_len)) {
    ret = OB_ERR_UNEXPECTED;
    // fixed typo in the message: "nexet" -> "next"
    PALF_LOG(ERROR, "unexpected error, the end lsn of padding log entry is not the header of next block!!!",
             KPC(this), K(memset_start_pos), K(padding_log_entry_len));
  } else if (memset_start_pos + padding_log_entry_len > curr_read_buf_end_pos_) {
    ret = OB_ERR_UNEXPECTED;
    PALF_LOG(ERROR, "unexpected error, the end pos of 'curr_read_buf_' is not enough!!!",
             KPC(this), K(memset_start_pos), K(padding_log_entry_len));
  } else {
    // set memory which contained padding log entry to PADDING_LOG_CONTENT_CHAR
    memset(buf_+memset_start_pos, PADDING_LOG_CONTENT_CHAR, padding_log_entry_len);
    // The format of LogEntry
    // | LogEntryHeader | ObLogBaseHeader | data |
    // NB: construct padding log entry by self
    int64_t header_size = LogEntryHeader::HEADER_SER_SIZE;
    // body length excludes the LogEntryHeader itself
    int64_t padding_log_entry_data_len = padding_log_entry_len - header_size;
    int64_t serialize_log_entry_header_pos = memset_start_pos;
    if (OB_FAIL(LogEntryHeader::generate_padding_log_buf(padding_log_entry_data_len,
                                                         padding_entry_scn_,
                                                         buf_+serialize_log_entry_header_pos,
                                                         LogEntryHeader::PADDING_LOG_ENTRY_SIZE))) {
      PALF_LOG(ERROR, "generate_padding_log_buf failed", KPC(this), K(padding_log_entry_data_len),
               K(header_size), K(padding_log_entry_len), K(serialize_log_entry_header_pos));
    } else {
      PALF_LOG(INFO, "generate padding log entry successfully", KPC(this));
      // consume the padding meta so the entry cannot be returned twice
      reset_padding_info_();
    }
  }
  return ret;
}
// step1. according to magic number, acquire log entry type;
// step2. deserialize ENTRY from 'read_buf_', if buf not enough, return and run in next
// round; step3. check entry integrity, if failed, return OB_INVALID_DATA; step4. if the
@ -821,6 +908,7 @@ int LogIteratorImpl<ENTRY>::get_log_entry_type_(LogEntryType &log_entry_type)
int ret = OB_SUCCESS;
int16_t magic_number;
int64_t pos = curr_read_pos_;
bool is_log_entry_iterator = std::is_same<ENTRY, LogEntry>::value;
// ensure that we can get the magic number of each log entry
if (OB_FAIL(
serialization::decode_i16(buf_, curr_read_buf_end_pos_, pos, &magic_number))) {
@ -833,6 +921,14 @@ int LogIteratorImpl<ENTRY>::get_log_entry_type_(LogEntryType &log_entry_type)
log_entry_type = LogEntryType::LOG_META_ENTRY_HEADER;
} else if (LogBlockHeader::MAGIC == magic_number) {
log_entry_type = LogEntryType::LOG_INFO_BLOCK_HEADER;
} else if (is_log_entry_iterator && curr_entry_is_padding_
// defense code
// after iterate padding log entry successfully, assume next block has been corrupted,
// iterate next log entry in next block will be failed, we mustn't return padding log
// entry again.(In this case, curr_entry_is_padding_ is the value of prev log entry)
&& !is_padding_entry_end_lsn_(log_storage_->get_lsn(curr_read_pos_))) {
log_entry_type = LogEntryType::LOG_ENTRY_HEADER;
PALF_LOG(INFO, "need consume padding log", KPC(this));
} else {
ret = OB_INVALID_DATA;
}
@ -851,7 +947,7 @@ int LogIteratorImpl<ENTRY>::get_log_entry_type_(LogEntryType &log_entry_type)
// When buf not enough to hold a complete log, need read data from 'curr',
// however, to avoid read amplification, we need read data from 'end':
// 1. limit read size into LOG_MAX_LOG_BUFFER_SIZE - valid_tail_part_size.
// 2. memove 'valid_tail_part' to the header of read_buf_.
// 2. memmove 'valid_tail_part' to the header of read_buf_.
// 3. read data from 'end' and memcpy these data into read_buf_ + valid_tail_part_size.
//
// NB: when iterate to the end of block, need switch to next block.
@ -890,7 +986,7 @@ void LogIteratorImpl<ENTRY>::try_clean_up_cache_()
// reuse LogIteratorStorage firstly, only the log before 'start_lsn_' + 'curr_read_pos_'
// has been consumed.
// NB: need ensure that we can not update 'curr_read_pos_' to 'curr_read_pos_' + sizeof(LogGroupEntryHeader)
// when the LogGroupEntryHeader after 'curr_read_pos_' need be controlled by replable_point_scn.
// when the LogGroupEntryHeader after 'curr_read_pos_' need be controlled by replayable_point_scn.
log_storage_->reuse(curr_read_lsn);
curr_read_buf_start_pos_ = 0;
curr_read_pos_ = 0;
@ -898,18 +994,28 @@ void LogIteratorImpl<ENTRY>::try_clean_up_cache_()
curr_entry_.reset();
// NB: we can not reset curr_entry_is_raw_write_, otherwise, the log entry after replayable_point_scn may no be
// controlled by replable_point_scn.
// controlled by replayable_point_scn.
// consider that:
// 1. At T1 timestamp, the LogGroupEntry has three LogEntry, the first LogEntry has been seen by ReplayService,
// however, the remained LogEntry can not been seen due to replayable_point_scn (current replayable_point_scn
// may be lag behind others).
// 2. At T2 timestamp, this replica is in flashback mode, the logs are not been flashback(flashback_scn is greater
// than replayable_point_scn, and there are several logs which SCN is greater than flashback_scn)
// 3. At T3 timestamp, ReplayService use next() function, iterator will try_clean_up_cache_ because mode version
// has been changed. if reset curr_entry_is_raw_write_, the second LogEntry may be seen by ReplayService even
// if the SCN of this LogEntry is greater than flashback_scn.
// curr_entry_is_raw_write_ = false;
curr_entry_size_ = 0;
init_mode_version_ = current_mode_version;
// we can not reset prev_entry_scn_, otherwise, after flashback, if there is no logs which can be readbale on disk,
// we can not reset prev_entry_scn_, otherwise, after flashback, if there is no logs which can be readable on disk,
// we can not return a valid next_min_scn.
// - prev_entry_.reset();
// we need reset accum_checksum_, otherwise, the accum_checksum_ is the log before flashback, and iterate new
// group log will fail.
accumlate_checksum_ = -1;
accumulate_checksum_ = -1;
reset_padding_info_();
}
}
} // end namespace palf

View File

@ -720,8 +720,22 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn,
if (OB_FAIL(wait_group_buffer_ready_(lsn, log_body_size + LogGroupEntryHeader::HEADER_SER_SIZE))) {
PALF_LOG(ERROR, "group_buffer wait failed", K(ret), K_(palf_id), K_(self));
} else if (is_padding_log) {
// padding log, fill log body with '\0'.
if (OB_FAIL(group_buffer_.fill_padding_body(lsn + LogGroupEntryHeader::HEADER_SER_SIZE, log_body_size))) {
const int64_t padding_log_body_size = log_body_size - LogEntryHeader::HEADER_SER_SIZE;
const int64_t padding_valid_data_len = LogEntryHeader::PADDING_LOG_ENTRY_SIZE;
// padding_valid_data only include LogEntryHeader and ObLogBaseHeader
// The format like follow:
// | LogEntryHeader | ObLogBaseHeader|
// and the format of padding log entry like follow:
// | LogEntryHeader | ObLogBaseHeader| PADDING_LOG_CONTENT_CHAR |
// | 32 BYTE | 16 BYTE | padding_log_body_size - 48 BYTE |
char padding_valid_data[padding_valid_data_len];
memset(padding_valid_data, 0, padding_valid_data_len);
if (OB_FAIL(LogEntryHeader::generate_padding_log_buf(padding_log_body_size, scn, padding_valid_data, padding_valid_data_len))) {
PALF_LOG(ERROR, "generate_padding_log_buf failed", K_(palf_id), K_(self), K(padding_valid_data_len),
K(scn), K(padding_log_body_size));
}
// padding log, fill log body with PADDING_LOG_CONTENT_CHAR.
else if (OB_FAIL(group_buffer_.fill_padding_body(lsn + LogGroupEntryHeader::HEADER_SER_SIZE, padding_valid_data, padding_valid_data_len, log_body_size))) {
PALF_LOG(WARN, "group_buffer fill_padding_body failed", K(ret), K_(palf_id), K_(self), K(log_body_size));
} else {
// inc ref

View File

@ -575,25 +575,6 @@ int ObLogReplayService::is_replay_done(const share::ObLSID &id,
return ret;
}
// Flashback the replay status of log stream 'id': look the status up through
// a guard (which keeps it alive for the duration of the call) and delegate to
// ObReplayStatus::flashback().
// @param[in] id: log stream id.
// @return OB_NOT_INIT if the service is not initialized;
//         OB_ERR_UNEXPECTED if the guard holds no status;
//         otherwise the result of get_replay_status_ / ObReplayStatus::flashback.
int ObLogReplayService::flashback(const share::ObLSID &id)
{
int ret = OB_SUCCESS;
ObReplayStatus *replay_status = NULL;
ObReplayStatusGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "replay service not init", K(ret));
} else if (OB_FAIL(get_replay_status_(id, guard))) {
CLOG_LOG(WARN, "guard get replay status failed", K(ret), K(id));
} else if (NULL == (replay_status = guard.get_replay_status())) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "replay status is not exist", K(ret), K(id));
} else if (OB_FAIL(replay_status->flashback())) {
CLOG_LOG(WARN, "replay status flashback failed", K(ret), K(id));
}
return ret;
}
//通用接口, 受控回放时最终返回值为受控回放点前的最后一条日志的log_ts
int ObLogReplayService::get_max_replayed_scn(const share::ObLSID &id, SCN &scn)
{
@ -1005,6 +986,10 @@ int ObLogReplayService::fetch_and_submit_single_log_(ObReplayStatus &replay_stat
CLOG_LOG(TRACE, "skip current log", K(replay_status), K(cur_lsn), K(log_size), K(cur_log_submit_scn));
} else if (OB_FAIL(header.deserialize(log_buf, log_size, header_pos))) {
CLOG_LOG(WARN, "basic header deserialize failed", K(ret), K(header_pos), K(id));
} else if (ObLogBaseType::PADDING_LOG_BASE_TYPE == header.get_log_type()) {
// For padding log entry, iterate next log directly.
need_iterate_next_log = true;
CLOG_LOG(INFO, "no need to replay padding log entry", KPC(submit_task), K(header));
} else if (header.need_pre_replay_barrier()) {
// 前向barrier日志的replay task和log buf需要分别分配内存
if (OB_FAIL(fetch_pre_barrier_log_(replay_status,
@ -1077,7 +1062,6 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
LSN committed_end_lsn;
ObReplayStatus *replay_status = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -1090,11 +1074,7 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta
ret = OB_ERR_UNEXPECTED;
on_replay_error_();
CLOG_LOG(ERROR, "replay status is NULL", KPC(submit_task), KPC(replay_status), KR(ret));
} else if (OB_FAIL(submit_task->get_committed_end_lsn(committed_end_lsn))) {
// getting committed_end_lsn first ensures palf has all logs until committed_end_lsn.
CLOG_LOG(ERROR, "failed to get_committed_end_lsn", KR(ret), K(committed_end_lsn), KPC(replay_status));
} else if (replay_status->try_rdlock()) {
(void)submit_task->get_committed_end_lsn(committed_end_lsn);
const int64_t start_ts = ObClockGenerator::getClock();
bool need_submit_log = true;
int64_t count = 0;
@ -1110,41 +1090,7 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta
const SCN &replayable_point = replayable_point_.atomic_load();
need_submit_log = submit_task->has_remained_submit_log(replayable_point,
iterate_end_by_replayable_point);
// consider 3 cases
// 1. has_remained_submit_log return true:
// just fetch this log.
// 2. has_remained_submit_log return false and is iterate_end_by_replayable_point:
// do nothing, if replayable_point changed, thread lease ensures this task retry.
// 3. has_remained_submit_log return true and not iterate_end_by_replayable_point:
// check whether has padding log.
if (!need_submit_log) {
if (OB_FAIL(submit_task->get_next_to_submit_log_info(to_submit_lsn, to_submit_scn))) {
CLOG_LOG(ERROR, "failed to get_next_to_submit_log_info", KR(ret), K(to_submit_lsn),
K(to_submit_scn));
} else if (!iterate_end_by_replayable_point
&& committed_end_lsn.is_valid()
&& to_submit_lsn < committed_end_lsn) {
//TODO: @runlin 移除padding日志时此处特殊处理需要一并移除
//当前palf最后一条日志为padding,直接推大to_submit_lsn
if (lsn_2_offset(committed_end_lsn, PALF_BLOCK_SIZE) != 0) {
CLOG_LOG(WARN, "no log to fetch but committed_end_lsn is not new file header",
KR(ret), K(to_submit_lsn), K(committed_end_lsn), KPC(replay_status));
} else if (1 != lsn_2_block(committed_end_lsn, PALF_BLOCK_SIZE) -
lsn_2_block(to_submit_lsn, PALF_BLOCK_SIZE)) {
CLOG_LOG(ERROR, "padding log committed_end_lsn is not continuous with to_submit_lsn",
KR(ret), K(to_submit_lsn), K(committed_end_lsn), KPC(replay_status));
//padding日志scn和尾部日志重复,只更新lsn
} else if (OB_FAIL(submit_task->update_next_to_submit_lsn(committed_end_lsn))) {
// log info回退
CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), K(committed_end_lsn), KPC(replay_status));
replay_status->set_err_info(committed_end_lsn, to_submit_scn, ObLogBaseType::INVALID_LOG_BASE_TYPE,
0, true, ObClockGenerator::getClock(), ret);
} else {
CLOG_LOG(INFO, "no log to fetch but committed_end_lsn not reached, last log may be padding",
KR(ret), K(to_submit_lsn), K(committed_end_lsn), K(to_submit_scn), KPC(replay_status));
}
}
//end loop, return OB_SUCCESS for drop current task
} else if (OB_SUCC(fetch_and_submit_single_log_(*replay_status, submit_task, to_submit_lsn,
to_submit_scn, log_size))) {
count++;

View File

@ -148,7 +148,6 @@ public:
int is_replay_done(const share::ObLSID &id,
const palf::LSN &end_lsn,
bool &is_done);
int flashback(const share::ObLSID &id);
int get_max_replayed_scn(const share::ObLSID &id, share::SCN &scn);
int submit_task(ObReplayServiceTask *task);
int update_replayable_point(const share::SCN &replayable_scn);

View File

@ -111,7 +111,6 @@ int ObReplayServiceSubmitTask::init(const palf::LSN &base_lsn,
CLOG_LOG(ERROR, "base_scn is invalid", K(type_), K(base_lsn), K(base_scn), KR(ret));
} else {
replay_status_ = replay_status;
committed_end_lsn_ = base_lsn;
next_to_submit_lsn_ = base_lsn;
next_to_submit_scn_.set_min();
base_lsn_ = base_lsn;
@ -121,7 +120,7 @@ int ObReplayServiceSubmitTask::init(const palf::LSN &base_lsn,
// 在没有写入的情况下有可能已经到达边界
CLOG_LOG(WARN, "iterator next failed", K(iterator_), K(tmp_ret));
}
CLOG_LOG(INFO, "submit log task init success", K(type_), K(next_to_submit_lsn_), K(committed_end_lsn_),
CLOG_LOG(INFO, "submit log task init success", K(type_), K(next_to_submit_lsn_),
K(next_to_submit_scn_), K(replay_status_), K(ret));
}
return ret;
@ -131,7 +130,6 @@ void ObReplayServiceSubmitTask::reset()
{
ObLockGuard<ObSpinLock> guard(lock_);
next_to_submit_lsn_.reset();
committed_end_lsn_.reset();
next_to_submit_scn_.reset();
base_lsn_.reset();
base_scn_.reset();
@ -160,13 +158,6 @@ int ObReplayServiceSubmitTask::get_next_to_submit_log_info_(LSN &lsn, SCN &scn)
return ret;
}
// Read out the current committed end lsn, i.e. the right margin of the
// logs that still need to be submitted to replay. Always succeeds.
int ObReplayServiceSubmitTask::get_committed_end_lsn(LSN &lsn) const
{
  lsn = committed_end_lsn_;
  return OB_SUCCESS;
}
int ObReplayServiceSubmitTask::get_base_lsn(LSN &lsn) const
{
ObLockGuard<ObSpinLock> guard(lock_);
@ -233,35 +224,6 @@ int ObReplayServiceSubmitTask::update_submit_log_meta_info(const LSN &lsn,
return ret;
}
// Public wrapper: advance committed_end_lsn_ monotonically; see
// update_committed_end_lsn_ for the strictly-increasing check.
int ObReplayServiceSubmitTask::update_committed_end_lsn(const LSN &lsn)
{
  return update_committed_end_lsn_(lsn);
}
int ObReplayServiceSubmitTask::update_committed_end_lsn_(const LSN &lsn)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(lsn <= committed_end_lsn_)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "update_committed_end_lsn invalid argument", K(type_), K(lsn),
K(committed_end_lsn_), K(ret));
} else {
committed_end_lsn_ = lsn;
}
return ret;
}
// Used only by flashback: forcibly overwrite committed_end_lsn_ (the
// value may move backwards), and roll next_to_submit_lsn_ back as well
// if it now lies beyond the new committed end. Always succeeds.
int ObReplayServiceSubmitTask::set_committed_end_lsn(const LSN &lsn)
{
  ATOMIC_SET(&committed_end_lsn_.val_, lsn.val_);
  if (next_to_submit_lsn_ > committed_end_lsn_) {
    CLOG_LOG(INFO, "need rollback next_to_submit_lsn_", K(lsn), KPC(this));
    next_to_submit_lsn_ = committed_end_lsn_;
  }
  return OB_SUCCESS;
}
int ObReplayServiceSubmitTask::need_skip(const SCN &scn, bool &need_skip)
{
int ret = OB_SUCCESS;
@ -838,35 +800,6 @@ void ObReplayStatus::switch_to_follower(const palf::LSN &begin_lsn)
CLOG_LOG(INFO, "replay status switch_to_follower", KPC(this), K(begin_lsn));
}
// Flashback this replay status under the write lock so no concurrent
// submit/replay races with the rollback of the committed end lsn.
int ObReplayStatus::flashback()
{
  int ret = OB_SUCCESS;
  WLockGuardWithRetryInterval wguard(rwlock_, WRLOCK_TRY_THRESHOLD, WRLOCK_RETRY_INTERVAL);
  ret = flashback_();
  if (OB_SUCCESS != ret) {
    CLOG_LOG(WARN, "replay status flashback failed", K(ret), KPC(this));
  } else {
    CLOG_LOG(INFO, "replay status flashback success", K(ret), KPC(this));
  }
  return ret;
}
// Flashback implementation; caller must hold the write lock.
// Refresh the submit task's committed end lsn from palf's current end
// lsn (set_committed_end_lsn also rolls next_to_submit_lsn_ back if
// needed), then re-submit the submit task so fetching resumes from the
// adjusted position.
int ObReplayStatus::flashback_()
{
  int ret = OB_SUCCESS;
  LSN committed_end_lsn;
  if (OB_FAIL(palf_handle_.get_end_lsn(committed_end_lsn))) {
    CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(this));
  } else if (OB_FAIL(submit_log_task_.set_committed_end_lsn(committed_end_lsn))) {
    CLOG_LOG(WARN, "set_committed_end_lsn failed", K(ret), KPC(this), K(committed_end_lsn));
  } else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
    CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_),
             KPC(this), K(ret));
  } else {
    // do nothing
  }
  return ret;
}
bool ObReplayStatus::has_remained_replay_task() const
{
int64_t count = 0;
@ -928,9 +861,6 @@ int ObReplayStatus::update_end_offset(const LSN &lsn)
CLOG_LOG(ERROR, "invalid arguments", K(ls_id_), K(lsn), K(ret));
} else if (!need_submit_log()) {
// leader do nothing, keep submit_log_task recording last round status as follower
} else if (OB_FAIL(submit_log_task_.update_committed_end_lsn(lsn))) {
// update offset and submit submit_log_task
CLOG_LOG(ERROR, "failed to update_apply_end_offset", KR(ret), K(ls_id_), K(lsn));
} else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) {
CLOG_LOG(ERROR, "failed to submit submit_log_task to replay Service", K(submit_log_task_),
KPC(this), K(ret));
@ -1345,8 +1275,8 @@ int ObReplayStatus::stat(LSReplayStat &stat) const
if (OB_FAIL(submit_log_task_.get_next_to_submit_log_info(stat.unsubmitted_lsn_,
stat.unsubmitted_scn_))) {
CLOG_LOG(WARN, "get_next_to_submit_log_info failed", KPC(this), K(ret));
} else if (OB_FAIL(submit_log_task_.get_committed_end_lsn(stat.end_lsn_))) {
CLOG_LOG(WARN, "get_committed_end_lsn failed", KPC(this), K(ret));
} else if (OB_FAIL(palf_handle_.get_end_lsn(stat.end_lsn_))) {
CLOG_LOG(WARN, "get_end_lsn from palf failed", KPC(this), K(ret));
}
}
return ret;

View File

@ -270,7 +270,6 @@ class ObReplayServiceSubmitTask : public ObReplayServiceTask
public:
ObReplayServiceSubmitTask(): ObReplayServiceTask(),
next_to_submit_lsn_(),
committed_end_lsn_(),
next_to_submit_scn_(),
base_lsn_(),
base_scn_(),
@ -296,9 +295,6 @@ public:
// 不允许回退
int update_submit_log_meta_info(const palf::LSN &lsn, const share::SCN &scn);
int update_next_to_submit_lsn(const palf::LSN &lsn);
int update_committed_end_lsn(const palf::LSN &lsn);
// only for flashback, should not complicated with update_committed_end_lsn
int set_committed_end_lsn(const palf::LSN &lsn);
int get_next_to_submit_log_info(palf::LSN &lsn, share::SCN &scn) const;
int get_committed_end_lsn(palf::LSN &lsn) const;
int get_base_lsn(palf::LSN &lsn) const;
@ -313,7 +309,6 @@ public:
INHERIT_TO_STRING_KV("ObReplayServiceSubmitTask", ObReplayServiceTask,
K(next_to_submit_lsn_),
K(committed_end_lsn_),
K(next_to_submit_scn_),
K(base_lsn_),
K(base_scn_),
@ -321,7 +316,6 @@ public:
private:
int update_next_to_submit_lsn_(const palf::LSN &lsn);
int update_next_to_submit_scn_(const share::SCN &scn);
int update_committed_end_lsn_(const palf::LSN &lsn);
void set_next_to_submit_log_info_(const palf::LSN &lsn, const share::SCN &scn);
int get_next_to_submit_log_info_(palf::LSN &lsn, share::SCN &scn) const;
int get_base_lsn_(palf::LSN &lsn) const;
@ -330,8 +324,6 @@ private:
private:
// location of next log after the last log that has already been submit to replay, consider as left side of iterator
palf::LSN next_to_submit_lsn_;
//location of the last log that need submit to replay, consider as right side of iterator
palf::LSN committed_end_lsn_;
share::SCN next_to_submit_scn_;
//initial log lsn when enable replay, for stat replay process
palf::LSN base_lsn_;
@ -502,7 +494,6 @@ public:
bool has_remained_replay_task() const;
// update right margin of logs that need to replay
int update_end_offset(const palf::LSN &lsn);
int flashback();
int push_log_replay_task(ObLogReplayTask &task);
int batch_push_all_task_queue();
@ -591,7 +582,6 @@ private:
// 注销回调并清空任务
int disable_();
bool is_replay_enabled_() const;
int flashback_();
private:
static const int64_t PENDING_COUNT_THRESHOLD = 100;
static const int64_t EAGAIN_COUNT_THRESHOLD = 50000;

View File

@ -76,7 +76,6 @@ TEST(TestLogChecksum, test_log_checksum)
EXPECT_TRUE(parity_check(v1));
EXPECT_FALSE(parity_check(v2));
}
}
}

View File

@ -14,12 +14,16 @@
#include "lib/ob_errno.h"
#include "lib/net/ob_addr.h" // ObAddr
#include "logservice/palf/log_define.h"
#include "logservice/palf/log_group_entry.h"
#include "logservice/palf/log_writer_utils.h"
#include "lib/checksum/ob_crc64.h" // ob_crc64
#define private public
#include "logservice/palf/log_group_entry_header.h"
#include "logservice/palf/log_entry.h"
#include "share/scn.h"
#include "logservice/ob_log_base_header.h" // ObLogBaseHeader
#include "logservice/palf/log_group_buffer.h"
#include "logservice/palf/log_group_entry.h"
#include "logservice/palf/log_writer_utils.h"
#include "share/rc/ob_tenant_base.h"
#undef private
#include <gtest/gtest.h>
@ -220,14 +224,199 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header)
EXPECT_TRUE(log_group_entry2.check_integrity());
}
// Negative-path coverage for padding log generation: every malformed
// combination of buffer / length / scn must be rejected with
// OB_INVALID_ARGUMENT, while the minimal well-formed inputs succeed.
TEST(TestPaddingLogEntry, test_invalid_padding_log_entry)
{
  LogEntryHeader header;
  char buf[1024];
  // generate_padding_header_: NULL buffer, zero buffer length, zero
  // padding length and an invalid scn are each invalid arguments.
  EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(NULL, 1, 1, share::SCN::min_scn()));
  EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(buf, 0, 1, share::SCN::min_scn()));
  EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(buf, 1, 0, share::SCN::min_scn()));
  EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(buf, 1, 1, share::SCN::invalid_scn()));
  EXPECT_EQ(OB_SUCCESS, header.generate_padding_header_(buf, 1, 1, share::SCN::min_scn()));
  // generate_padding_log_buf: zero body size, invalid scn, NULL output
  // buffer, zero/oversized/undersized output length are all rejected.
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(0, share::SCN::min_scn(), buf, 1));
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::invalid_scn(), buf, 1));
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), NULL, 1));
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), buf, 0));
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), buf, 2));
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(2, share::SCN::min_scn(), buf, 1));
  EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), buf, 1));
  // The smallest valid padding log: body must be at least one byte larger
  // than LogEntryHeader + ObLogBaseHeader, with valid_data sized to hold
  // exactly those two headers.
  logservice::ObLogBaseHeader base_header;
  const int64_t min_padding_valid_data_len = header.get_serialize_size() + base_header.get_serialize_size();
  EXPECT_EQ(OB_SUCCESS, LogEntryHeader::generate_padding_log_buf(1+min_padding_valid_data_len, share::SCN::min_scn(), buf, min_padding_valid_data_len));
}
// End-to-end coverage of a full-size padding log: build a padding
// LogEntry, wrap it into a padding LogGroupEntry, round-trip both
// through serialize/deserialize, and finally fill the padding body into
// a LogGroupBuffer and compare the bytes.
TEST(TestPaddingLogEntry, test_padding_log_entry)
{
  PALF_LOG(INFO, "test_padding_log_entry");
  LogEntry padding_log_entry;
  const int64_t padding_data_len = MAX_LOG_BODY_SIZE;
  share::SCN padding_group_scn;
  padding_group_scn.convert_for_logservice(ObTimeUtility::current_time_ns());
  LogEntryHeader padding_log_entry_header;
  char base_header_data[1024] = {'\0'};
  logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE,
                                          logservice::ObReplayBarrierType::NO_NEED_BARRIER);
  int64_t serialize_pos = 0;
  EXPECT_EQ(OB_SUCCESS, base_header.serialize(base_header_data, 1024, serialize_pos));
  // Generate the valid data of the padding log entry.
  EXPECT_EQ(OB_SUCCESS, padding_log_entry_header.generate_padding_header_(
      base_header_data, base_header.get_serialize_size(),
      padding_data_len-LogEntryHeader::HEADER_SER_SIZE, padding_group_scn));
  EXPECT_EQ(true, padding_log_entry_header.check_integrity(base_header_data, padding_data_len));
  // padding group log format
  // | GroupHeader | EntryHeader | BaseHeader | '\0' |
  LogGroupEntry padding_group_entry;
  LogGroupEntryHeader padding_group_entry_header;
  const int64_t padding_buffer_len = MAX_LOG_BUFFER_SIZE;
  char *padding_buffer = reinterpret_cast<char *>(ob_malloc(padding_buffer_len, "unittest"));
  ASSERT_NE(nullptr, padding_buffer);
  memset(padding_buffer, PADDING_LOG_CONTENT_CHAR, padding_buffer_len);
  {
    // Copy the base header data into its position inside the buffer.
    memcpy(padding_buffer+padding_group_entry_header.get_serialize_size() + LogEntryHeader::HEADER_SER_SIZE, base_header_data, 1024);
    padding_log_entry.header_ = padding_log_entry_header;
    padding_log_entry.buf_ = padding_buffer+padding_group_entry_header.get_serialize_size() + LogEntryHeader::HEADER_SER_SIZE;
    // Build a valid padding_log_entry for the serialization steps below.
    EXPECT_EQ(true, padding_log_entry.check_integrity());
    EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_());
  }
  {
    // Round-trip the padding LogEntry through serialize/deserialize and
    // verify integrity plus data checksum stability.
    LogEntry deserialize_padding_log_entry;
    const int64_t tmp_padding_buffer_len = MAX_LOG_BUFFER_SIZE;
    char *tmp_padding_buffer = reinterpret_cast<char *>(ob_malloc(tmp_padding_buffer_len, "unittest"));
    ASSERT_NE(nullptr, tmp_padding_buffer);
    int64_t pos = 0;
    EXPECT_EQ(OB_SUCCESS, padding_log_entry.serialize(tmp_padding_buffer, tmp_padding_buffer_len, pos));
    pos = 0;
    EXPECT_EQ(OB_SUCCESS, deserialize_padding_log_entry.deserialize(tmp_padding_buffer, tmp_padding_buffer_len, pos));
    EXPECT_EQ(true, deserialize_padding_log_entry.check_integrity());
    EXPECT_EQ(padding_log_entry.header_.data_checksum_, deserialize_padding_log_entry.header_.data_checksum_);
    ob_free(tmp_padding_buffer);
    tmp_padding_buffer = nullptr;
  }
  serialize_pos = padding_group_entry_header.get_serialize_size();
  // Copy the LogEntry into its position inside padding_buffer.
  EXPECT_EQ(OB_SUCCESS, padding_log_entry.serialize(padding_buffer, padding_buffer_len, serialize_pos));
  LogWriteBuf write_buf;
  EXPECT_EQ(OB_SUCCESS, write_buf.push_back(padding_buffer, padding_buffer_len));
  bool is_raw_write = false;
  bool is_padding_log = true;
  int64_t data_checksum = 0;
  EXPECT_EQ(OB_SUCCESS, padding_group_entry_header.generate(is_raw_write, is_padding_log, write_buf, padding_data_len, padding_group_scn, 1, LSN(0), 1, data_checksum));
  padding_group_entry_header.update_accumulated_checksum(0);
  padding_group_entry_header.update_header_checksum();
  padding_group_entry.header_ = padding_group_entry_header;
  padding_group_entry.buf_ = padding_buffer + padding_group_entry_header.get_serialize_size();
  EXPECT_EQ(true, padding_group_entry.check_integrity());
  EXPECT_EQ(true, padding_group_entry.header_.is_padding_log());
  // Verify the LogEntry can be deserialized out of the group entry body.
  {
    int64_t pos = 0;
    LogEntry tmp_padding_log_entry;
    EXPECT_EQ(OB_SUCCESS, tmp_padding_log_entry.deserialize(padding_group_entry.buf_, padding_group_entry.get_data_len(), pos));
    EXPECT_EQ(pos, padding_group_entry.get_data_len());
    EXPECT_EQ(true, tmp_padding_log_entry.check_integrity());
    EXPECT_EQ(true, tmp_padding_log_entry.header_.is_padding_log_());
    logservice::ObLogBaseHeader tmp_base_header;
    pos = 0;
    EXPECT_EQ(OB_SUCCESS, tmp_base_header.deserialize(tmp_padding_log_entry.buf_, tmp_padding_log_entry.get_data_len(), pos));
    EXPECT_EQ(base_header.log_type_, logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE);
  }
  char *serialize_buffer = reinterpret_cast<char *>(ob_malloc(padding_buffer_len, "unittest"));
  ASSERT_NE(nullptr, serialize_buffer);
  memset(serialize_buffer, PADDING_LOG_CONTENT_CHAR, padding_buffer_len);
  serialize_pos = 0;
  // Verify the serialized bytes of the whole group entry match expectations.
  EXPECT_EQ(OB_SUCCESS, padding_group_entry.serialize(serialize_buffer, padding_buffer_len, serialize_pos));
  EXPECT_EQ(serialize_pos, padding_data_len+padding_group_entry_header.get_serialize_size());
  LogGroupEntry deserialize_group_entry;
  serialize_pos = 0;
  EXPECT_EQ(OB_SUCCESS, deserialize_group_entry.deserialize(serialize_buffer, padding_buffer_len, serialize_pos));
  EXPECT_EQ(true, deserialize_group_entry.check_integrity());
  EXPECT_EQ(true, deserialize_group_entry.header_.is_padding_log());
  EXPECT_EQ(padding_group_entry.header_, deserialize_group_entry.header_);
  // Verify the LogEntry can be deserialized out of the round-tripped group entry.
  {
    int64_t pos = 0;
    LogEntry tmp_padding_log_entry;
    EXPECT_EQ(OB_SUCCESS, tmp_padding_log_entry.deserialize(deserialize_group_entry.buf_, padding_group_entry.get_data_len(), pos));
    EXPECT_EQ(pos, deserialize_group_entry.get_data_len());
    EXPECT_EQ(true, tmp_padding_log_entry.check_integrity());
    EXPECT_EQ(true, tmp_padding_log_entry.header_.is_padding_log_());
    logservice::ObLogBaseHeader tmp_base_header;
    pos = 0;
    EXPECT_EQ(OB_SUCCESS, tmp_base_header.deserialize(tmp_padding_log_entry.buf_, tmp_padding_log_entry.get_data_len(), pos));
    EXPECT_EQ(base_header.log_type_, logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE);
  }
  LogGroupBuffer group_buffer;
  LSN start_lsn(0);
  ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
  // init MTL (LogGroupBuffer allocates from the tenant allocator)
  share::ObTenantBase tbase(1001);
  share::ObTenantEnv::set_tenant(&tbase);
  EXPECT_EQ(OB_SUCCESS, group_buffer.init(start_lsn));
  const int64_t padding_valid_data_len = deserialize_group_entry.get_header().get_serialize_size() + padding_log_entry_header.get_serialize_size() + base_header.get_serialize_size();
  // Fill the valid padding log into the group buffer and compare bytes.
  // padding_buffer holds LogGroupEntryHeader, LogEntryHeader,
  // ObLogBaseHeader, then the padding log body (filled with '\0').
  EXPECT_EQ(OB_SUCCESS, group_buffer.fill_padding_body(start_lsn, serialize_buffer, padding_valid_data_len, padding_buffer_len));
  EXPECT_EQ(0, memcmp(group_buffer.data_buf_, serialize_buffer, deserialize_group_entry.get_serialize_size()));
  PALF_LOG(INFO, "runlin trace", K(group_buffer.data_buf_), K(serialize_buffer), K(padding_buffer_len), KP(group_buffer.data_buf_), KP(padding_buffer));
  ob_free(padding_buffer);
  padding_buffer = NULL;
  ob_free(serialize_buffer);
  serialize_buffer = NULL;
  group_buffer.destroy();
  ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
}
// Verify that generate_padding_log_buf produces a buffer which
// deserializes into a valid padding LogEntry whose data checksum
// matches a header generated directly via generate_padding_header_.
TEST(TestPaddingLogEntry, test_generate_padding_log_entry)
{
  PALF_LOG(INFO, "test_generate_padding_log_entry");
  LogEntry padding_log_entry;
  const int64_t padding_data_len = 1024;
  const share::SCN padding_scn = share::SCN::min_scn();
  const int64_t padding_log_entry_len = padding_data_len + LogEntryHeader::HEADER_SER_SIZE;
  char *out_buf = reinterpret_cast<char*>(ob_malloc(padding_log_entry_len, "unittest"));
  ASSERT_NE(nullptr, out_buf);
  // Reference header built directly from a serialized ObLogBaseHeader.
  LogEntryHeader padding_header;
  logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE, logservice::ObReplayBarrierType::NO_NEED_BARRIER, 0);
  char base_header_buf[1024];
  memset(base_header_buf, 0, 1024);
  int64_t serialize_base_header_pos = 0;
  EXPECT_EQ(OB_SUCCESS, base_header.serialize(base_header_buf, 1024, serialize_base_header_pos));
  EXPECT_EQ(OB_SUCCESS, padding_header.generate_padding_header_(base_header_buf, base_header.get_serialize_size(), padding_data_len, padding_scn));
  EXPECT_EQ(true, padding_header.check_header_integrity());
  // Buffer generated by the public helper under test.
  EXPECT_EQ(OB_SUCCESS, LogEntryHeader::generate_padding_log_buf(padding_data_len, padding_scn, out_buf, LogEntryHeader::PADDING_LOG_ENTRY_SIZE));
  int64_t pos = 0;
  EXPECT_EQ(OB_SUCCESS, padding_log_entry.deserialize(out_buf, padding_log_entry_len, pos));
  EXPECT_EQ(true, padding_log_entry.check_integrity());
  EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_());
  // Both construction paths must agree on the data checksum.
  EXPECT_EQ(padding_log_entry.header_.data_checksum_, padding_header.data_checksum_);
  ob_free(out_buf);
  out_buf = nullptr;
}
} // namespace unittest
} // namespace oceanbase
// Test entry point: reset the log file, init logging, then run all
// gtest cases in this binary.
int main(int argc, char **argv)
{
  system("rm -f test_log_entry_and_group_entry.log");
  OB_LOGGER.set_file_name("test_log_entry_and_group_entry.log", true);
  OB_LOGGER.set_log_level("INFO");
  // Fixed: the startup banner was logged twice, once with the typo
  // "grou_entry"; keep only the corrected line.
  PALF_LOG(INFO, "begin unittest::test_log_entry_and_group_entry");
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

View File

@ -18,6 +18,7 @@
#define private public
#include "logservice/palf/log_group_buffer.h"
#include "logservice/palf/log_writer_utils.h"
#include "logservice/palf/log_entry_header.h"
#undef private
#include "share/rc/ob_tenant_base.h"
@ -63,6 +64,7 @@ void TestLogGroupBuffer::TearDown()
{
PALF_LOG(INFO, "TestLogGroupBuffer has TearDown");
PALF_LOG(INFO, "TearDown success");
log_group_buffer_.destroy();
ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
}
@ -170,35 +172,104 @@ TEST_F(TestLogGroupBuffer, test_fill)
// Argument and boundary checks for LogGroupBuffer::fill_padding_body:
// not-init, invalid lsn/len, lsn below start, lsn below reuse point,
// oversized len, then repeated successful fills up to the buffer end.
// Fixed: removed leftover duplicate calls using the obsolete two-arg
// fill_padding_body(lsn, len) signature; zero-initialize the valid-data
// buffer before passing it; dropped the unused local used_size.
TEST_F(TestLogGroupBuffer, test_fill_padding)
{
  LSN lsn;
  const int64_t padding_valid_data_len = LogEntryHeader::PADDING_LOG_ENTRY_SIZE;
  char padding_valid_data[padding_valid_data_len];
  memset(padding_valid_data, 0, padding_valid_data_len);
  int64_t len = 0;
  LSN reuse_lsn(1024);
  // Buffer not initialized yet.
  EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
  LSN start_lsn(100);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn));
  // Invalid lsn (not set) and invalid len (0).
  EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
  lsn = reuse_lsn;
  EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
  len = 100;
  // lsn below the buffer's start lsn is unexpected.
  lsn.val_ = start_lsn.val_ - 1;
  EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
  lsn.val_ = start_lsn.val_;
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn));
  // lsn below the reuse lsn is unexpected.
  EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
  lsn = reuse_lsn;
  // A fill larger than the available buffer must return OB_EAGAIN.
  len = log_group_buffer_.get_available_buffer_size() + 1;
  EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
  // Fill 1KB padding bodies until we reach the usable end of the buffer.
  len = 1024;
  const int64_t buf_size = log_group_buffer_.get_available_buffer_size();
  LSN buf_end_lsn = reuse_lsn + (buf_size - (reuse_lsn.val_ - start_lsn.val_));
  while (lsn + len < buf_end_lsn) {
    EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len));
    lsn.val_ += len;
  }
  EXPECT_GT(lsn + len, buf_end_lsn);
}
// Verify fill_padding_body when the padding log wraps around the ring
// buffer boundary: the valid data (headers) may land wholly at the tail,
// or be split across tail and head, and the remainder of the padding
// body must be filled with PADDING_LOG_CONTENT_CHAR.
TEST_F(TestLogGroupBuffer, test_fill_padding_cross_bround)
{
  const int64_t padding_valid_data_len = LogEntryHeader::PADDING_LOG_ENTRY_SIZE;
  char padding_valid_data[padding_valid_data_len];
  memset(padding_valid_data, 'c', padding_valid_data_len);
  int64_t len = 0;
  LSN start_lsn(0);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn));
  // Shrink the ring buffer to 4MB so wrap-around is easy to construct.
  const int64_t unittest_log_buffer_size = 4 * 1024 * 1024;
  log_group_buffer_.available_buffer_size_ = log_group_buffer_.reserved_buffer_size_ = unittest_log_buffer_size;
  // Starting at LSN 0, submit three 1MB logs.
  const int64_t log_size = 1*1024*1024;
  char *buf = reinterpret_cast<char*>(ob_malloc(1*1024*1024, "unittest"));
  ASSERT_NE(nullptr, buf);
  LSN lsn0(0);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill(lsn0, buf, log_size));
  LSN lsn1(1*1024*1024);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill(lsn1, buf, log_size));
  LSN lsn2(2*1024*1024);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill(lsn2, buf, log_size));
  // Advance reuse lsn to 1MB, then submit a 2MB padding log.
  LSN lsn3(3*1024*1024);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(lsn1));
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn3, padding_valid_data,
                                                            padding_valid_data_len, 2*log_size));
  LSN lsn4(5*1024*1024);
  // Expect no free space left now.
  EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill(lsn4, buf, 1));
  // |5.5|2|3|4|5.0|
  // Check that the data at 5.0 is padding; log 5 is 2MB, the others 1MB.
  EXPECT_EQ(0, memcmp(padding_valid_data, log_group_buffer_.data_buf_+3*1024*1024, padding_valid_data_len));
  char *zero_data = reinterpret_cast<char*>(ob_malloc(2*log_size, "unittest"));
  ASSERT_NE(nullptr, zero_data);
  memset(zero_data, PADDING_LOG_CONTENT_CHAR, 2*1024*1024);
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+3*1024*1024+1024, zero_data, 1*1024*1024-1*1024));
  // Set reuse_lsn to 4MB - PADDING_LOG_ENTRY_SIZE, so the padding log's
  // valid body fits exactly at the tail of the group_buffer.
  LSN reuse_lsn = LSN(unittest_log_buffer_size - LogEntryHeader::PADDING_LOG_ENTRY_SIZE);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn));
  LSN lsn5(reuse_lsn);
  // Submit a 2MB padding log whose valid data length is padding_valid_data_len.
  // 2.x denotes the overwritten part of log 2.
  // |5.x|2.x|3|4|5.0|, where 5.0 starts at reuse_lsn.
  // Deliberately corrupt part of 5.x to verify fill_padding_body zeroes it.
  memset(log_group_buffer_.data_buf_, 'x', 1*1024);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn5, padding_valid_data, padding_valid_data_len, 2*log_size));
  // Expect the tail of the group_buffer to equal padding_valid_data exactly,
  // and the wrapped remainder of the padding body at the head to be zeroed.
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+lsn5.val_, padding_valid_data, LogEntryHeader::PADDING_LOG_ENTRY_SIZE));
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_, padding_valid_data+LogEntryHeader::PADDING_LOG_ENTRY_SIZE, padding_valid_data_len-LogEntryHeader::PADDING_LOG_ENTRY_SIZE));
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+LogEntryHeader::PADDING_LOG_ENTRY_SIZE, zero_data, 2*log_size-padding_valid_data_len));
  // Set reuse_lsn to 4MB - PADDING_LOG_ENTRY_SIZE + 10, so the padding log's
  // valid body must be split across the tail and head of the group_buffer.
  memset(log_group_buffer_.data_buf_, 'x', 1*1024);
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn+10));
  LSN lsn6(reuse_lsn+10);
  const int64_t tail_padding_log_size = LogEntryHeader::PADDING_LOG_ENTRY_SIZE - 10;
  EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn6, padding_valid_data, padding_valid_data_len, 2*log_size));
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+lsn6.val_, padding_valid_data, tail_padding_log_size));
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_, padding_valid_data+tail_padding_log_size, padding_valid_data_len-tail_padding_log_size));
  EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+padding_valid_data_len-tail_padding_log_size, zero_data, 2*log_size-padding_valid_data_len));
  ob_free(buf);
  buf = nullptr;
  ob_free(zero_data);
  zero_data = nullptr;
}
TEST_F(TestLogGroupBuffer, test_check_log_buf_wrapped)
{
LSN lsn;