diff --git a/mittest/logservice/test_ob_simple_log_flashback.cpp b/mittest/logservice/test_ob_simple_log_flashback.cpp index d8ed1d9718..b1c658c2ff 100644 --- a/mittest/logservice/test_ob_simple_log_flashback.cpp +++ b/mittest/logservice/test_ob_simple_log_flashback.cpp @@ -106,13 +106,6 @@ int LogRequestHandler::get_self_addr_(common::ObAddr &self) const return ret; } -// could not enable replay service in mittest for now, just skip it -int ObLogReplayService::flashback(const share::ObLSID &id) -{ - int ret = OB_SUCCESS; - return ret; -} - } namespace unittest diff --git a/mittest/logservice/test_ob_simple_log_replay.cpp b/mittest/logservice/test_ob_simple_log_replay.cpp index 4b8b972cbf..88d68bc460 100644 --- a/mittest/logservice/test_ob_simple_log_replay.cpp +++ b/mittest/logservice/test_ob_simple_log_replay.cpp @@ -222,6 +222,7 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) PalfHandleImplGuard leader; share::SCN basic_scn = share::SCN::min_scn(); CLOG_LOG(INFO, "test replay begin", K(id)); + OB_LOGGER.set_log_level("TRACE"); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); MockLSAdapter ls_adapter; ls_adapter.init((ObLSService *)(0x1)); @@ -279,16 +280,14 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) // 开启拉日志 bool is_done = false; while (!is_done) { - rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); + // 由于padding日志被受控回放,replay此时的回放位点最多为padding_header + rp_sv.is_replay_done(ls_id, padding_header, is_done); usleep(10*1000); CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); } is_done = false; CLOG_LOG(INFO, "runlin trace 3", K(iterator), KPC(rp_st)); - // 预期replay的next_to_submit_lsn是padding尾 - EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); - EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); - // replay执行flashback后,next_to_submit_lsn是padding头 + // 预期replay的next_to_submit_lsn是padding_header EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); switch_flashback_to_append(leader, mode_version); iterator_end_lsn = LSN(100000000); @@ -298,8 +297,6 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn(); LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); - //更新replay的committed_end_lsn - rp_st->submit_log_task_.update_committed_end_lsn(max_lsn); } // Test2: flashback到padding头部, replay先执行flashback,再执行replay padding { @@ -313,15 +310,12 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) iterator_end_lsn = padding_header; // replay看到的committed位点是padding尾 rp_st->unblock_submit(); - rp_st->submit_log_task_.committed_end_lsn_ = LSN(PALF_BLOCK_SIZE); bool is_done = false; while (!is_done) { - rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); + rp_sv.is_replay_done(ls_id, padding_header, is_done); usleep(10*1000); CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); } - // 先执行replay的flashback - EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); // 预期replay的next_to_submit_lsn是padding头 EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); // 修改iterator看到的终点为padding尾 @@ -338,9 +332,8 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn(); LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); - //更新replay的committed_end_lsn - 
rp_st->submit_log_task_.update_committed_end_lsn(max_lsn); } + // Test3: flashback到padding尾部, replay先执行flashback,再执行replay padding { int ret = OB_SUCCESS; @@ -354,9 +347,8 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) iterator_end_lsn = padding_header; // replay看到的committed位点是padding头 rp_st->unblock_submit(); - rp_st->submit_log_task_.committed_end_lsn_ = (padding_header); bool is_done = false; - // iterator尽管没有吐出padding日志,但replay会直接更新next_to_submit_lsn到padding尾部 + // iterator由于文件长度,不会吐出padding日志,replay的next_to_submit_lsn到padding头部 while (!is_done) { rp_sv.is_replay_done(ls_id, padding_header, is_done); usleep(10*1000); @@ -365,10 +357,8 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) is_done = false; // 预期replay的next_to_submit_lsn是padding头 EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); - // 先执行replay的flashback - EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); - // replay执行flashback后,next_to_submit_lsn是padding头, palf的committed位点是padding尾,不会更新next_to_submit_lsn - EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); + // iterator能看到padding日志 + iterator_end_lsn = LSN(PALF_BLOCK_SIZE); rp_st->trigger_fetch_log(); while (!is_done) { rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); @@ -382,49 +372,67 @@ TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn(); LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); - //更新replay的committed_end_lsn - rp_st->submit_log_task_.update_committed_end_lsn(max_lsn); } - // Test4: flashback到padding尾部, 先replay完,在执行flashback - { - int ret = OB_SUCCESS; - CLOG_LOG(WARN, "flashback to padding tailer case1"); - int64_t abs_timeout_us = 4*1000*1000; - // flashback_scn为padding尾 - SCN flashback_scn = SCN::minus(padding_tail_scn, 1); - switch_append_to_flashback(leader, mode_version); - EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->flashback(mode_version, flashback_scn, abs_timeout_us)); - // iterator看到的终点是padding日志尾 - iterator_end_lsn = LSN(PALF_BLOCK_SIZE); - // replay看到的committed位点是padding尾 - rp_st->unblock_submit(); - rp_st->submit_log_task_.next_to_submit_lsn_ = (iterator_end_lsn); - bool is_done = false; - while (!is_done) { - rp_sv.is_replay_done(ls_id, iterator_end_lsn, is_done); - usleep(10*1000); - CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); - } - is_done = false; - // 预期replay的next_to_submit_lsn是padding尾 - EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); - // 先执行replay的flashback - EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); - // replay执行flashback后,next_to_submit_lsn是padding尾 - EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); - switch_flashback_to_append(leader, mode_version); - iterator_end_lsn = LSN(1000000000); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 100000)); - LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); - EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); +} - while (!is_done) { - rp_sv.is_replay_done(ls_id, max_lsn, is_done); - usleep(10*1000); - CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); - rp_st->trigger_fetch_log(); - } +TEST_F(TestObSimpleLogReplayFunc, test_wait_replay_done) +{ + SET_CASE_LOG_FILE(TEST_NAME, "test_wait_replay_done"); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + ObLSID ls_id(id); + int64_t leader_idx = 0; + LSN basic_lsn(0); + PalfHandleImplGuard leader; + 
share::SCN basic_scn = share::SCN::min_scn(); + CLOG_LOG(INFO, "test replay begin", K(id)); + OB_LOGGER.set_log_level("TRACE"); + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + MockLSAdapter ls_adapter; + ls_adapter.init((ObLSService *)(0x1)); + ObLogReplayService rp_sv; + ObReplayStatus *rp_st = NULL; + PalfEnv *palf_env; + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); + rp_sv.init(palf_env, &ls_adapter, get_cluster()[0]->get_allocator()); + rp_sv.start(); + get_cluster()[0]->get_tenant_base()->update_thread_cnt(10); + LSN iterator_end_lsn(0); + LSN *iterator_end_lsn_ptr = &iterator_end_lsn; + auto get_file_end_lsn =[iterator_end_lsn_ptr]() { + CLOG_LOG(INFO, "get_file_end_lsn", K(*iterator_end_lsn_ptr)); + return *iterator_end_lsn_ptr; + }; + iterator_end_lsn = LSN(PALF_BLOCK_SIZE); + EXPECT_EQ(OB_SUCCESS, rp_sv.add_ls(ls_id, ObReplicaType::REPLICA_TYPE_FULL)); + EXPECT_EQ(OB_SUCCESS, rp_sv.enable(ls_id, basic_lsn, basic_scn)); + { + ObReplayStatusGuard guard; + EXPECT_EQ(OB_SUCCESS, rp_sv.get_replay_status_(ls_id, guard)); + rp_st = guard.get_replay_status(); + ls_adapter.rp_st_ = rp_st; + } + PalfBufferIterator &iterator = rp_st->submit_log_task_.iterator_; + iterator.iterator_storage_.get_file_end_lsn_ = get_file_end_lsn; + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); + int64_t remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.get_palf_handle_impl()->get_max_lsn() - sizeof(LogGroupEntryHeader) - sizeof(LogEntryHeader); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, remained_log_size)); + EXPECT_EQ(LSN(PALF_BLOCK_SIZE), leader.get_palf_handle_impl()->get_max_lsn()); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); + bool is_done = false; + // 无padding日志,committed位点是文件头 + while (!is_done) { + rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); + } + iterator_end_lsn = LSN(2*PALF_BLOCK_SIZE); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + // 有padding日志,committed位点是文件头 + is_done =false; + while (!is_done) { + rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); } } } // unitest diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index 5a1dc67319..61c227f014 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -66,6 +66,35 @@ std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; constexpr int64_t timeout_ts_us = 3 * 1000 * 1000; +void read_padding_entry(PalfHandleImplGuard &leader, SCN padding_scn, LSN padding_log_lsn) +{ + // 从padding group entry开始读取 + { + PalfBufferIterator iterator; + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_log_lsn, iterator)); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + LogEntry padding_log_entry; + LSN check_lsn; + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(padding_log_entry, check_lsn)); + EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); + EXPECT_EQ(true, padding_log_entry.check_integrity()); + 
EXPECT_EQ(padding_scn, padding_log_entry.get_scn()); + } + // 从padding log entry开始读取 + { + PalfBufferIterator iterator; + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_log_lsn+LogGroupEntryHeader::HEADER_SER_SIZE, iterator)); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + LogEntry padding_log_entry; + LSN check_lsn; + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(padding_log_entry, check_lsn)); + EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); + EXPECT_EQ(true, padding_log_entry.check_integrity()); + EXPECT_EQ(padding_scn, padding_log_entry.get_scn()); + } + +} + TEST_F(TestObSimpleLogClusterSingleReplica, update_disk_options) { SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options"); @@ -242,11 +271,21 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback) remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.palf_handle_impl_->sw_.get_max_lsn(); EXPECT_LT(remained_log_size, 5*1024); EXPECT_GT(remained_log_size, 0); - // 写一条大小为2KB的日志 + // 写一条大小为5KB的日志 + LSN padding_log_lsn = leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 5*1024)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); + // 验证读取padding是否成功 + { + share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn(); + padding_scn = padding_scn.minus(padding_scn, 1); + read_padding_entry(leader, padding_scn, padding_log_lsn); + } PALF_LOG(INFO, "runlin trace print sw3", K(leader.palf_handle_impl_->sw_)); - // Padding日志预期不占用日志条数,因此存在33条日志 + // Padding日志占用日志条数,因此存在34条日志 EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header)); + EXPECT_EQ(OB_SUCCESS, get_middle_scn(34, leader, mid_scn, header)); + EXPECT_EQ(OB_ITER_END, get_middle_scn(35, leader, mid_scn, header)); EXPECT_LT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn()); max_scn = leader.palf_handle_impl_->sw_.get_max_scn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); @@ -257,16 +296,25 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback) PALF_LOG(INFO, "flashback to padding tail"); EXPECT_EQ(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE)); EXPECT_EQ(OB_ITER_END, read_log(leader)); - // flashback后存在32条日志 - EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header)); - EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header)); + // flashback后存在33条日志(包含padding日志) + EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header)); + EXPECT_EQ(OB_ITER_END, get_middle_scn(34, leader, mid_scn, header)); - // flashback到padding日志头部 + // 验证读取padding是否成功 + { + share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn(); + padding_scn.minus(padding_scn, 1); + PALF_LOG(INFO, "begin read_padding_entry", K(padding_scn), K(padding_log_lsn)); + read_padding_entry(leader, padding_scn, padding_log_lsn); + } + + // flashback到padding日志头部,磁盘上还有32条日志 tmp_scn = leader.palf_handle_impl_->get_max_scn(); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 1), timeout_ts_us)); EXPECT_LT(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE)); EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header)); EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header)); + EXPECT_EQ(padding_log_lsn, leader.palf_handle_impl_->get_max_lsn()); switch_flashback_to_append(leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx, 
1000)); EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(LSN(PALF_BLOCK_SIZE), leader)); @@ -414,11 +462,11 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) { unittest::PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000)); LogEntryHeader header_origin; EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); EXPECT_EQ(OB_SUCCESS, get_middle_scn(323, leader, max_scn, header_origin)); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 1000)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); switch_append_to_flashback(leader, mode_version); @@ -430,7 +478,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) EXPECT_EQ(header_origin.data_checksum_, header_new.data_checksum_); EXPECT_EQ(OB_ITER_END, get_middle_scn(324, leader, new_scn, header_new)); switch_flashback_to_append(leader, mode_version); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000)); EXPECT_EQ(OB_SUCCESS, get_middle_scn(1323, leader, new_scn, header_new)); EXPECT_EQ(OB_ITER_END, get_middle_scn(1324, leader, new_scn, header_new)); EXPECT_EQ(OB_ITER_END, read_log(leader)); @@ -444,7 +492,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode)); EXPECT_EQ(curr_mode_version, mode_version); - EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx)); + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx, 1000)); wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); ref_scn.convert_for_tx(1000); @@ -496,13 +544,56 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); // 重启后继续提交日志 - PalfHandleImplGuard new_leader; - EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); - switch_flashback_to_append(new_leader, mode_version); - EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx)); - wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); - EXPECT_EQ(OB_ITER_END, read_log(new_leader)); - new_leader.reset(); + { + PalfHandleImplGuard new_leader; + EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); + switch_flashback_to_append(new_leader, mode_version); + EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE)); + share::SCN padding_scn = new_leader.get_palf_handle_impl()->get_max_scn(); + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx)); + wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); + EXPECT_EQ(OB_ITER_END, read_log(new_leader)); + switch_append_to_flashback(new_leader, mode_version); + // flashback到padding日志头后重启 + EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us)); + EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), 
PALF_BLOCK_SIZE)); + new_leader.reset(); + } + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + // 重启提交日志,不产生padding日志 + { + PalfHandleImplGuard new_leader; + EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); + LSN padding_start_lsn = new_leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE)); + const int64_t remained_size = PALF_BLOCK_SIZE - lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE); + EXPECT_GE(remained_size, 0); + const int64_t group_entry_body_size = remained_size - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE; + PALF_LOG(INFO, "runlin trace print remained_size", K(remained_size), K(group_entry_body_size)); + switch_flashback_to_append(new_leader, mode_version); + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, leader_idx, group_entry_body_size)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn())); + PalfBufferIterator iterator; + EXPECT_EQ(OB_SUCCESS, new_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_start_lsn, iterator)); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + LogEntry log_entry; + LSN check_lsn; + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(log_entry, check_lsn)); + EXPECT_EQ(check_lsn, padding_start_lsn + LogGroupEntryHeader::HEADER_SER_SIZE); + EXPECT_EQ(false, log_entry.header_.is_padding_log_()); + EXPECT_EQ(true, log_entry.check_integrity()); + new_leader.reset(); + } + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + // 重启后继续提交日志 + { + PalfHandleImplGuard new_leader; + EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); + EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE)); + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx, 1000)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn())); + EXPECT_EQ(OB_ITER_END, read_log(new_leader)); + } delete_paxos_group(id); } @@ -1751,9 +1842,101 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) // 迭代新写入的日志成功 EXPECT_EQ(OB_SUCCESS, iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(OB_ITER_END, iterator.next(SCN::min_scn())); - } + // 验证一条padding LogGroupEntry需要受控回放 + { + const int64_t append_id = ATOMIC_AAF(&palf_id_, 1); + PalfHandleImplGuard append_leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(append_id, leader_idx, append_leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 31, leader_idx, MAX_LOG_BODY_SIZE)); + const LSN padding_start_lsn = append_leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(append_leader, append_leader.get_palf_handle_impl()->get_max_lsn())); + SCN padding_scn = append_leader.get_palf_handle_impl()->get_max_scn(); + padding_scn = padding_scn.minus(padding_scn, 1); + + const int64_t raw_write_id = ATOMIC_AAF(&palf_id_, 1); + PalfHandleImplGuard raw_write_leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(raw_write_id, leader_idx, raw_write_leader)); + EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader)); + EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(append_leader, raw_write_leader)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.get_palf_handle_impl()->get_max_lsn())); 
+ switch_append_to_flashback(raw_write_leader, mode_version); + + PalfBufferIterator buff_iterator; + PalfGroupBufferIterator group_buff_iterator; + PalfBufferIterator buff_iterator_padding_start; + PalfGroupBufferIterator group_buff_iterator_padding_start; + EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator)); + EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator)); + EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator_padding_start)); + EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator_padding_start)); + SCN next_min_scn; + bool iterate_end_by_replayable_point = false; + EXPECT_EQ(OB_ITER_END, buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(OB_ITER_END, group_buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(true, iterate_end_by_replayable_point); + + // 一共有33条日志,包括padding + SCN replayable_point_scn = padding_scn.minus(padding_scn, 1); + // 直到padding日志受控回放 + int ret = OB_SUCCESS; + while (OB_SUCC(buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { + } + ret = OB_SUCCESS; + while (OB_SUCC(buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { + } + EXPECT_EQ(OB_ITER_END, ret); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(next_min_scn, padding_scn); + ret = OB_SUCCESS; + while (OB_SUCC(group_buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { + } + ret = OB_SUCCESS; + while (OB_SUCC(group_buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { + } + EXPECT_EQ(OB_ITER_END, ret); + EXPECT_EQ(true, iterate_end_by_replayable_point); + EXPECT_EQ(next_min_scn, padding_scn); + + EXPECT_EQ(false, buff_iterator.iterator_impl_.curr_entry_is_padding_); + EXPECT_EQ(false, group_buff_iterator.iterator_impl_.curr_entry_is_padding_); + // flashback到padding日志尾 + EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn, timeout_ts_us)); + EXPECT_EQ(OB_SUCCESS, buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); + LogEntry padding_log_entry; + LSN padding_log_lsn; + EXPECT_EQ(OB_SUCCESS, buff_iterator.get_entry(padding_log_entry, padding_log_lsn)); + EXPECT_EQ(true, padding_log_entry.check_integrity()); + EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); + EXPECT_EQ(padding_scn, padding_log_entry.header_.scn_); + EXPECT_EQ(false, buff_iterator.iterator_impl_.padding_entry_scn_.is_valid()); + + EXPECT_EQ(OB_SUCCESS, group_buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); + LogGroupEntry padding_group_entry; + LSN padding_group_lsn; + EXPECT_EQ(OB_SUCCESS, group_buff_iterator.get_entry(padding_group_entry, padding_group_lsn)); + EXPECT_EQ(true, padding_group_entry.check_integrity()); + EXPECT_EQ(true, padding_group_entry.header_.is_padding_log()); + // 对于LogGruopEntry的iterator,在construct_padding_log_entry_后,不会重置padding状态 + EXPECT_EQ(true, group_buff_iterator.iterator_impl_.padding_entry_scn_.is_valid()); + EXPECT_EQ(padding_log_entry.header_.scn_, 
padding_group_entry.header_.max_scn_); + // flashback到padding日志头 + EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us)); + // 预期是由于文件长度导致的OB_ITER_END + EXPECT_EQ(OB_ITER_END, buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(false, iterate_end_by_replayable_point); + EXPECT_GE(next_min_scn, buff_iterator_padding_start.iterator_impl_.prev_entry_scn_); + EXPECT_EQ(OB_ITER_END, group_buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); + EXPECT_EQ(false, iterate_end_by_replayable_point); + EXPECT_GE(next_min_scn, group_buff_iterator_padding_start.iterator_impl_.prev_entry_scn_); + switch_flashback_to_append(raw_write_leader, mode_version); + EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 100, leader_idx, 1000)); + EXPECT_EQ(OB_SUCCESS, buff_iterator_padding_start.next()); + EXPECT_EQ(OB_SUCCESS, group_buff_iterator_padding_start.next()); + } } TEST_F(TestObSimpleLogClusterSingleReplica, read_block_in_flashback) diff --git a/src/logservice/logrpc/ob_log_request_handler.cpp b/src/logservice/logrpc/ob_log_request_handler.cpp index 7660e889da..da46b2a64e 100644 --- a/src/logservice/logrpc/ob_log_request_handler.cpp +++ b/src/logservice/logrpc/ob_log_request_handler.cpp @@ -358,8 +358,6 @@ int LogRequestHandler::handle_request(const LogFlashbackMsg &re K(curr_access_mode), K(req)); } else if (OB_FAIL(palf_handle_guard.flashback(req.mode_version_, req.flashback_scn_, FLASHBACK_TIMEOUT_US))) { CLOG_LOG(WARN, "flashback failed", K(ret), K(palf_id), K(req)); - } else if (OB_FAIL(replay_srv->flashback(ls_id))) { - CLOG_LOG(WARN, "replay_service flashback failed", K(ret), K(ls_id)); } else if (OB_FAIL(get_rpc_proxy_(rpc_proxy))) { CLOG_LOG(WARN, "get_rpc_proxy_ failed", K(ret), K(palf_id)); } else if (OB_FAIL(get_self_addr_(self))) { diff --git a/src/logservice/ob_log_base_type.h b/src/logservice/ob_log_base_type.h index 84a6674174..65c10fe21f 100644 --- a/src/logservice/ob_log_base_type.h +++ b/src/logservice/ob_log_base_type.h @@ -77,6 +77,9 @@ enum ObLogBaseType ARBITRATION_SERVICE_LOG_BASE_TYPE = 21, HEARTBEAT_SERVICE_LOG_BASE_TYPE = 22, + + // for padding log entry + PADDING_LOG_BASE_TYPE = 23, // pay attention!!! 
// add log type in log_base_type_to_string // max value @@ -137,6 +140,8 @@ int log_base_type_to_string(const ObLogBaseType log_type, strncpy(str ,"ARBITRATION_SERVICE", str_len); } else if (log_type == HEARTBEAT_SERVICE_LOG_BASE_TYPE) { strncpy(str ,"HEARTBEAT_SERVICE", str_len); + } else if (log_type == PADDING_LOG_BASE_TYPE) { + strncpy(str ,"PADDING_LOG_ENTRY", str_len); } else { ret = OB_INVALID_ARGUMENT; } diff --git a/src/logservice/ob_ls_adapter.cpp b/src/logservice/ob_ls_adapter.cpp index f383543ce4..17cfc58859 100644 --- a/src/logservice/ob_ls_adapter.cpp +++ b/src/logservice/ob_ls_adapter.cpp @@ -68,6 +68,9 @@ int ObLSAdapter::replay(ObLogReplayTask *replay_task) } else if (OB_ISNULL(ls = ls_handle.get_ls())) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(ERROR, " log stream not exist", KPC(replay_task), K(ret)); + } else if (ObLogBaseType::PADDING_LOG_BASE_TYPE == replay_task->log_type_) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "padding log entry can't be replayed, unexpected error", KPC(replay_task)); } else if (OB_FAIL(ls->replay(replay_task->log_type_, replay_task->log_buf_, replay_task->log_size_, diff --git a/src/logservice/palf/log_entry.cpp b/src/logservice/palf/log_entry.cpp index a24baa20af..5cce3b89f8 100644 --- a/src/logservice/palf/log_entry.cpp +++ b/src/logservice/palf/log_entry.cpp @@ -11,10 +11,9 @@ */ #include "log_entry.h" -#include "lib/oblog/ob_log_module.h" // LOG* -#include "lib/ob_errno.h" // ERROR NUMBER -#include "lib/checksum/ob_crc64.h" // ob_crc64 - +#include "lib/oblog/ob_log_module.h" // LOG* +#include "lib/ob_errno.h" // ERROR NUMBER +#include "lib/checksum/ob_crc64.h" // ob_crc64 namespace oceanbase { namespace palf diff --git a/src/logservice/palf/log_entry_header.cpp b/src/logservice/palf/log_entry_header.cpp index 9af6aee76d..699328fc85 100644 --- a/src/logservice/palf/log_entry_header.cpp +++ b/src/logservice/palf/log_entry_header.cpp @@ -11,9 +11,10 @@ */ #include "log_entry_header.h" -#include "lib/checksum/ob_crc64.h" -#include "lib/checksum/ob_parity_check.h" // parity_check -#include "lib/ob_errno.h" +#include "lib/checksum/ob_crc64.h" // ob_crc64 +#include "lib/checksum/ob_parity_check.h" // parity_check +#include "lib/ob_errno.h" // errno +#include "logservice/ob_log_base_header.h" // ObLogBaseHeader namespace oceanbase { @@ -22,6 +23,7 @@ namespace palf { const int64_t LogEntryHeader::HEADER_SER_SIZE = sizeof(LogEntryHeader); +const int64_t LogEntryHeader::PADDING_LOG_ENTRY_SIZE = sizeof(LogEntryHeader) + sizeof(logservice::ObLogBaseHeader); LogEntryHeader::LogEntryHeader() : magic_(0), @@ -110,6 +112,53 @@ bool LogEntryHeader::check_header_checksum_() const return (header_checksum == saved_header_checksum); } +bool LogEntryHeader::is_padding_log_() const +{ + return (flag_ & PADDING_TYPE_MASK) > 0; +} + +// static member function +// the format of out_buf +// | LogEntryHeader | ObLogBaseHeader | +int LogEntryHeader::generate_padding_log_buf(const int64_t padding_data_len, + const share::SCN &scn, + char *out_buf, + const int64_t padding_valid_data_len) +{ + int ret = OB_SUCCESS; + LogEntryHeader header; + logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE, + logservice::ObReplayBarrierType::NO_NEED_BARRIER, + 0); + const int64_t base_header_len = base_header.get_serialize_size(); + const int64_t header_len = header.get_serialize_size(); + const int64_t serialize_len = base_header_len + header_len; + int64_t serialize_header_pos = 0; + int64_t serialize_base_header_pos = serialize_header_pos + 
header_len; + if (padding_data_len <= 0 + || !scn.is_valid() + || NULL == out_buf + || padding_valid_data_len < serialize_len + || padding_data_len < padding_valid_data_len) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "invalid argument", K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len)); + } else if (OB_FAIL(base_header.serialize(out_buf, padding_valid_data_len, serialize_base_header_pos))) { + PALF_LOG(WARN, "serialize ObLogBaseHeader failed", K(padding_data_len), KP(out_buf), K(padding_valid_data_len), + K(serialize_base_header_pos)); + } else if (FALSE_IT(serialize_base_header_pos = serialize_header_pos + header_len)) { + } else if (OB_FAIL(header.generate_padding_header_(out_buf+serialize_base_header_pos, + base_header_len, + padding_data_len, + scn))) { + PALF_LOG(WARN, "generate LogEntryHeader failed", K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len)); + } else if (OB_FAIL(header.serialize(out_buf, header_len, serialize_header_pos))) { + PALF_LOG(WARN, "serialize LogEntryHeader failed", K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len)); + } else { + PALF_LOG(INFO, "generate_padding_log_buf success", K(header), K(padding_data_len), K(scn), KP(out_buf), K(padding_valid_data_len)); + } + return ret; +} + bool LogEntryHeader::check_header_integrity() const { return true == is_valid() && true == check_header_checksum_(); @@ -118,6 +167,8 @@ bool LogEntryHeader::check_header_integrity() const bool LogEntryHeader::check_integrity(const char *buf, const int64_t data_len) const { bool bool_ret = false; + // for padding log, only check integrity of ObLogBaseHeader + int64_t valid_data_len = is_padding_log_() ? sizeof(logservice::ObLogBaseHeader) : data_len; if (NULL == buf || data_len <= 0) { PALF_LOG_RET(WARN, OB_INVALID_ARGUMENT, "invalid arguments", KP(buf), K(data_len)); } else if (LogEntryHeader::MAGIC != magic_) { @@ -126,17 +177,40 @@ bool LogEntryHeader::check_integrity(const char *buf, const int64_t data_len) co } else if (false == check_header_checksum_()) { PALF_LOG_RET(WARN, OB_ERROR, "check header checsum failed", K(*this)); } else { - const int64_t tmp_data_checksum = common::ob_crc64(buf, data_len); + const int64_t tmp_data_checksum = common::ob_crc64(buf, valid_data_len); if (data_checksum_ == tmp_data_checksum) { bool_ret = true; } else { bool_ret = false; - PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "data checksum mismatch", K_(data_checksum), K(tmp_data_checksum), K(data_len), KPC(this)); + PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "data checksum mismatch", K_(data_checksum), K(tmp_data_checksum), K(data_len), + K(valid_data_len), KPC(this)); } } return bool_ret; } +int LogEntryHeader::generate_padding_header_(const char *log_data, + const int64_t base_header_len, + const int64_t padding_data_len, + const share::SCN &scn) +{ + int ret = OB_SUCCESS; + if (NULL == log_data || base_header_len <= 0 || padding_data_len <= 0 || !scn.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else { + magic_ = LogEntryHeader::MAGIC; + version_ = LogEntryHeader::LOG_ENTRY_HEADER_VERSION; + log_size_ = padding_data_len; + scn_ = scn; + data_checksum_ = common::ob_crc64(log_data, base_header_len); + flag_ = (flag_ | LogEntryHeader::PADDING_TYPE_MASK); + // update header checksum after all member vars assigned + (void) update_header_checksum_(); + PALF_LOG(INFO, "generate_padding_header_ success", KPC(this), K(log_data), K(base_header_len), K(padding_data_len)); + } + return ret; +} + DEFINE_SERIALIZE(LogEntryHeader) { int ret = OB_SUCCESS; diff 
--git a/src/logservice/palf/log_entry_header.h b/src/logservice/palf/log_entry_header.h index 268b5b8eef..65e8ca60bc 100644 --- a/src/logservice/palf/log_entry_header.h +++ b/src/logservice/palf/log_entry_header.h @@ -38,6 +38,18 @@ public: const share::SCN get_scn() const { return scn_; } int64_t get_data_checksum() const { return data_checksum_; } bool check_header_integrity() const; + + // @brief: generate padding log entry + // @param[in]: padding_data_len, the data len of padding entry(the group_size_ in LogGroupEntry + // - sizeof(LogEntryHeader)) + // @param[in]: scn, the SCN of padding log entry + // @param[in&out]: out_buf, out_buf only includes the LogEntryHeader and ObLogBaseHeader + // @param[in]: padding_valid_data_len, the valid data len of padding LogEntry(only includes + // LogEntryHeader and ObLogBaseHeader). + static int generate_padding_log_buf(const int64_t padding_data_len, + const share::SCN &scn, + char *out_buf, + const int64_t padding_valid_data_len); NEED_SERIALIZE_AND_DESERIALIZE; TO_STRING_KV("magic", magic_, "version", version_, @@ -48,12 +60,20 @@ public: static constexpr int16_t MAGIC = 0x4C48; // 'LH' means LOG ENTRY HEADER static const int64_t HEADER_SER_SIZE; + static const int64_t PADDING_LOG_ENTRY_SIZE; private: bool get_header_parity_check_res_() const; void update_header_checksum_(); bool check_header_checksum_() const; + bool is_padding_log_() const; + + int generate_padding_header_(const char *log_data, + const int64_t base_header_len, + const int64_t padding_data_len, + const share::SCN &scn); private: static constexpr int16_t LOG_ENTRY_HEADER_VERSION = 1; + static constexpr int64_t PADDING_TYPE_MASK = 1 << 1; private: int16_t magic_; int16_t version_; diff --git a/src/logservice/palf/log_group_buffer.cpp b/src/logservice/palf/log_group_buffer.cpp index 9d88648715..79662366e6 100644 --- a/src/logservice/palf/log_group_buffer.cpp +++ b/src/logservice/palf/log_group_buffer.cpp @@ -201,7 +201,6 @@ int LogGroupBuffer::fill(const LSN &lsn, LSN start_lsn, reuse_lsn; get_buffer_start_lsn_(start_lsn); get_reuse_lsn_(reuse_lsn); - const int64_t reserved_buf_size = get_reserved_buffer_size(); const int64_t available_buf_size = get_available_buffer_size(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -219,33 +218,27 @@ int LogGroupBuffer::fill(const LSN &lsn, // double check: 要填充的终点超过了buffer可复用的范围 ret = OB_EAGAIN; PALF_LOG(WARN, "end_lsn is greater than reuse end pos", K(ret), K(lsn), K(end_lsn), K(reuse_lsn), K(available_buf_size)); - } else if (OB_FAIL(get_buffer_pos_(lsn, start_pos))) { - PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn)); + } else if (OB_FAIL(fill_(lsn, data, data_len))) { + PALF_LOG(WARN, "fill data failed", K(lsn), K(data_len), KP(data_buf_)); } else { - const int64_t group_buf_tail_len = reserved_buf_size - start_pos; - int64_t first_part_len = min(group_buf_tail_len, data_len); - memcpy(data_buf_ + start_pos, data, first_part_len); - if (data_len > first_part_len) { - // seeking to buffer's beginning - memcpy(data_buf_, data + first_part_len, data_len - first_part_len); - } - PALF_LOG(TRACE, "fill group buffer success", K(ret), K(lsn), K(data_len), K(start_pos), K(group_buf_tail_len), - K(first_part_len), "second_part_len", data_len - first_part_len, KP(data_buf_)); + PALF_LOG(TRACE, "fill group buffer success", K(ret), K(lsn), K(data_len), KP(data_buf_)); } return ret; } int LogGroupBuffer::fill_padding_body(const LSN &lsn, + const char *data, + const int64_t data_len, const int64_t log_body_size) { int ret = OB_SUCCESS; 
- int64_t start_pos = 0; const LSN end_lsn = lsn + log_body_size; LSN start_lsn, reuse_lsn; get_buffer_start_lsn_(start_lsn); get_reuse_lsn_(reuse_lsn); - const int64_t reserved_buf_size = get_reserved_buffer_size(); const int64_t available_buf_size = get_available_buffer_size(); + const int64_t reserved_buf_size = get_reserved_buffer_size(); + int64_t start_pos = 0; if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (!lsn.is_valid() || log_body_size <= 0) { @@ -266,6 +259,7 @@ int LogGroupBuffer::fill_padding_body(const LSN &lsn, } else if (OB_FAIL(get_buffer_pos_(lsn, start_pos))) { PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn)); } else { + // reset data to zero firstly. const int64_t group_buf_tail_len = reserved_buf_size - start_pos; int64_t first_part_len = min(group_buf_tail_len, log_body_size); memset(data_buf_ + start_pos, PADDING_LOG_CONTENT_CHAR, first_part_len); @@ -273,8 +267,15 @@ int LogGroupBuffer::fill_padding_body(const LSN &lsn, // seeking to buffer's beginning memset(data_buf_, PADDING_LOG_CONTENT_CHAR, log_body_size - first_part_len); } - PALF_LOG(INFO, "fill padding body success", K(ret), K(lsn), K(log_body_size), K(start_pos), K(group_buf_tail_len), - K(first_part_len), "second_part_len", log_body_size - first_part_len); + // fill valid padding data. + if (OB_FAIL(fill_(lsn, data, data_len))) { + PALF_LOG(WARN, "fill padding data filled", K(ret), K(lsn), K(log_body_size), K(start_pos), K(data_len), + K(group_buf_tail_len), K(first_part_len), "second_part_len", data_len - first_part_len); + } else { + PALF_LOG(INFO, "fill padding log success", K(ret), K(lsn), K(log_body_size), K(start_pos), K(data_len), + K(group_buf_tail_len), K(first_part_len), "second_part_len", data_len - first_part_len); + } + } return ret; } @@ -395,5 +396,28 @@ int LogGroupBuffer::set_reuse_lsn(const LSN &new_reuse_lsn) } return ret; } + +int LogGroupBuffer::fill_(const LSN &lsn, + const char *data, + const int64_t data_len) +{ + int ret = OB_SUCCESS; + int64_t start_pos = 0; + const int64_t reserved_buf_size = get_reserved_buffer_size(); + if (OB_FAIL(get_buffer_pos_(lsn, start_pos))) { + PALF_LOG(WARN, "get_buffer_pos_ failed", K(ret), K(lsn)); + } else { + const int64_t group_buf_tail_len = reserved_buf_size - start_pos; + int64_t first_part_len = min(group_buf_tail_len, data_len); + memcpy(data_buf_ + start_pos, data, first_part_len); + if (data_len > first_part_len) { + // seeking to buffer's beginning + memcpy(data_buf_, data+first_part_len, data_len - first_part_len); + } + PALF_LOG(TRACE, "fill data success", K(ret), K(lsn), K(data_len), K(start_pos), K(group_buf_tail_len), + K(first_part_len), "second_part_len", data_len - first_part_len); + } + return ret; +} } // namespace palf } // namespace oceanbase diff --git a/src/logservice/palf/log_group_buffer.h b/src/logservice/palf/log_group_buffer.h index 761bb453ef..dd265be267 100644 --- a/src/logservice/palf/log_group_buffer.h +++ b/src/logservice/palf/log_group_buffer.h @@ -47,6 +47,8 @@ public: const char *data, const int64_t data_len); int fill_padding_body(const LSN &lsn, + const char *data, + const int64_t data_len, const int64_t log_body_size); int get_log_buf(const LSN &lsn, const int64_t total_len, LogWriteBuf &log_buf); bool can_handle_new_log(const LSN &lsn, @@ -70,6 +72,9 @@ private: int get_buffer_pos_(const LSN &lsn, int64_t &start_pos) const; void get_buffer_start_lsn_(LSN &start_lsn) const; void get_reuse_lsn_(LSN &reuse_lsn) const; + int fill_(const LSN &lsn, + const char *data, + const int64_t data_len); private: 
// buffer起始位置对应的lsn LSN start_lsn_; diff --git a/src/logservice/palf/log_group_entry.h b/src/logservice/palf/log_group_entry.h index bcb09d0f93..8a48110340 100644 --- a/src/logservice/palf/log_group_entry.h +++ b/src/logservice/palf/log_group_entry.h @@ -40,8 +40,7 @@ public: bool check_integrity() const; bool check_integrity(int64_t &data_checksum) const; int64_t get_header_size() const { return header_.get_serialize_size(); } - int64_t get_payload_offset() const { return header_.get_serialize_size() + - (header_.is_padding_log() ? header_.get_data_len() : 0); } + int64_t get_payload_offset() const { return header_.get_serialize_size(); } int64_t get_data_len() const { return header_.get_data_len(); } // return total size of header and body, including the length of padding log int64_t get_group_entry_size() const { return header_.get_serialize_size() + diff --git a/src/logservice/palf/log_iterator_impl.h b/src/logservice/palf/log_iterator_impl.h index c1f4ac9d58..46c33e78de 100644 --- a/src/logservice/palf/log_iterator_impl.h +++ b/src/logservice/palf/log_iterator_impl.h @@ -18,16 +18,16 @@ #include "lib/alloc/alloc_assist.h" #include "lib/utility/ob_utility.h" #include "lib/utility/ob_macro_utils.h" -#include "lib/utility/ob_print_utils.h" // TO_STRING_KV -#include "log_define.h" // LogItemType -#include "log_block_header.h" // LogBlockHeader -#include "lsn.h" // LSN -#include "log_reader_utils.h" // ReadBuf -#include "log_entry.h" // LogEntry -#include "log_group_entry.h" // LogGroupEntry -#include "log_meta_entry.h" // LogMetaEntry -#include "log_iterator_storage.h" // LogIteratorStorage -#include "log_checksum.h" // LogChecksum +#include "lib/utility/ob_print_utils.h" // TO_STRING_KV +#include "log_define.h" // LogItemType +#include "log_block_header.h" // LogBlockHeader +#include "lsn.h" // LSN +#include "log_reader_utils.h" // ReadBuf +#include "log_entry.h" // LogEntry +#include "log_group_entry.h" // LogGroupEntry +#include "log_meta_entry.h" // LogMetaEntry +#include "log_iterator_storage.h" // LogIteratorStorage +#include "log_checksum.h" // LogChecksum namespace oceanbase { @@ -47,8 +47,8 @@ enum class LogEntryType // =========== LogEntryType end ============= enum class IterateEndReason { - DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY = 0, - DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY = 1, + DUE_TO_REPLAYABLE_POINT_SCN_LOG_GROUP_ENTRY = 0, + DUE_TO_REPLAYABLE_POINT_SCN_LOG_ENTRY = 1, DUE_TO_FILE_END_LSN_NOT_READ_NEW_DATA = 2, DUE_TO_FILE_END_LSN_READ_NEW_DATA = 3, MAX_TYPE = 4 @@ -74,8 +74,8 @@ struct IterateEndInfo { } bool is_iterate_end_by_replayable_point_scn() const { - return IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY == reason_ - || IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY == reason_; + return IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_ENTRY == reason_ + || IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_GROUP_ENTRY == reason_; } IterateEndReason reason_; SCN log_scn_; @@ -101,18 +101,18 @@ public: // - has iterated to the end of block. 
// OB_NEED_RETRY // - the data in cache is not integrity, and the integrity data has been truncate from disk, - // need read data from storage eagin.(data in cache will not been clean up, therefore, + // need read data from storage again.(data in cache will not been clean up, therefore, // user need used a new iterator to read data again) // - if the end_lsn get from get_file_end_lsn is smaller than 'log_tail_' of LogStorage, and it's - // not the exact boundary of LogGroupEntry(for PalgGroupeBufferIterator, or LogEntry for PalfBufferIterator), + // not the exact boundary of LogGroupEntry(for PalfGroupeBufferIterator, or LogEntry for PalfBufferIterator), // OB_NEED_RETRY may be return. // - if read_data_from_storage_ is concurrent with the last step of flashback, opening last block on disk may be failed // due to rename, return OB_NEED_RETRY in this case.(TODO by runlin: retry by myself) // OB_ERR_OUT_LOWER_BOUND // - block has been recycled // OB_CHECKSUM_ERROR - // - the accumlate checksum calc by accum_checksum_ and the data checksum of LogGroupEntry is not - // same as the accumlate checksum of LogGroupEntry + // - the accumulate checksum calc by accum_checksum_ and the data checksum of LogGroupEntry is not + // same as the accumulate checksum of LogGroupEntry int next(const share::SCN &replayable_point_scn); // param[in] replayable point scn, iterator will ensure that no log will return when the log scn is greater than @@ -124,7 +124,7 @@ public: // OB_INVALID_DATA. // OB_ITER_END, has iterated to the end of block. // OB_NEED_RETRY, the data in cache is not integrity, and the integrity data has been truncate from disk, - // need read data from storage eagin.(data in cache will not been clean up, therefore, + // need read data from storage again.(data in cache will not been clean up, therefore, // user need used a new iterator to read data again) // OB_ERR_OUT_LOWER_BOUND, block has been recycled // @@ -147,11 +147,12 @@ public: TO_STRING_KV(KP(buf_), K_(next_round_pread_size), K_(curr_read_pos), K_(curr_read_buf_start_pos), K_(curr_read_buf_end_pos), KPC(log_storage_), K_(curr_entry_is_raw_write), K_(curr_entry_size), - K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version), K_(accumlate_checksum)); + K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version), K_(accumulate_checksum), + K_(curr_entry_is_padding), K_(padding_entry_size), K_(padding_entry_scn)); private: // @brief get next entry from data storage or cache. - // @rtval + // @retval // OB_SUCCESS // OB_INVALID_DATA // OB_ITER_END @@ -171,7 +172,7 @@ private: // OB_INVALID_DATA // -- means log entry is not integrity, need check this log entry whether is the last one. // OB_CHECKSUM_ERROR - // -- means accumlate checksum is not matched. + // -- means accumulate checksum is not matched. 
// OB_ITER_END // -- means log entry is iterated end by replayable_point_scn int parse_one_entry_(const SCN &replayable_point_scn, @@ -199,11 +200,13 @@ private: const bool matched_type = std::is_same::value; int64_t pos = curr_read_pos_; if (true == matched_type) { - if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) { + if (curr_entry_is_padding_ && OB_FAIL(construct_padding_log_entry_(pos, padding_entry_size_))) { + PALF_LOG(WARN, "construct_padding_log_entry_ failed", KPC(this)); + } else if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) { } else if (curr_entry_is_raw_write_ && curr_entry_.get_scn() > replayable_point_scn) { ret = OB_ITER_END; info.log_scn_ = curr_entry_.get_scn(); - info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_ENTRY; + info.reason_ = IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_ENTRY; PALF_LOG(TRACE, "iterate end by replayable_point", KPC(this), K(replayable_point_scn), K(info)); } } else { @@ -215,7 +218,7 @@ private: // When entry in file is LogGroupEntry, handle it specifically. // 1. for raw write LogGroupEntry, if it's controlled by replayable_point_scn, no need update - // 'curr_read_pos_', 'accum_checksum_', because this log may be flashbacked. + // 'curr_read_pos_', 'accum_checksum_', because this log may be flashback. // 2. for append LogGroupEntry, handle it normally. int parse_log_group_entry_(const SCN &replayable_point_scn, IterateEndInfo &info) @@ -265,8 +268,8 @@ private: template <> // When T is LogGroupEntry, need do: - // 1. check accumlate checksum: - // - if accumlate checksum is not match, return OB_CHECKSUM_ERROR + // 1. check accumulate checksum: + // - if accumulate checksum is not match, return OB_CHECKSUM_ERROR // - if data checksum is not match, return OB_INVALID_DATA // 2. check this entry whether need control by 'replayable_point_scn': // - if control by 'replayable_point_scn', return OB_ITER_END, and don't modify @@ -278,14 +281,14 @@ private: { int ret = OB_SUCCESS; bool curr_entry_is_raw_write = entry.get_header().is_raw_write(); - int64_t new_accumlate_checksum = -1; + int64_t new_accumulate_checksum = -1; PALF_LOG(TRACE, "T is LogGroupEntry", K(entry)); - if (OB_FAIL(verify_accum_checksum_(entry, new_accumlate_checksum))) { + if (OB_FAIL(verify_accum_checksum_(entry, new_accumulate_checksum))) { PALF_LOG(WARN, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry)); // NB: when current entry is raw write, and the log scn of current entry is greater than // replayable_point_scn, this log may be clean up, therefore we can not update several fields of - // LogIteratorImpl, return OB_ITER_END directlly, otherwise, we may not parse new LogGroupEntryHeader - // after flashback position, this will cause one log which is append, but control by replable_point_scn. + // LogIteratorImpl, return OB_ITER_END directly, otherwise, we may not parse new LogGroupEntryHeader + // after flashback position, this will cause one log which is append, but control by replayable_point_scn. 
// // NB: we need check the min scn of LogGroupEntry whether has been greater than // replayable_point_scn: @@ -313,7 +316,7 @@ private: } else if ((is_group_iterator && entry.get_scn() > replayable_point_scn) || (!is_group_iterator && min_scn > replayable_point_scn)) { info.log_scn_ = min_scn; - info.reason_ = IterateEndReason::DUE_TO_REPLAYBLE_POINT_SCN_LOG_GROUP_ENTRY; + info.reason_ = IterateEndReason::DUE_TO_REPLAYABLE_POINT_SCN_LOG_GROUP_ENTRY; ret = OB_ITER_END; PALF_LOG(TRACE, "iterate end by replayable_point", K(ret), KPC(this), K(min_scn), K(entry), K(replayable_point_scn), K(info), K(is_group_iterator)); @@ -322,20 +325,42 @@ private: } if (OB_SUCC(ret)) { curr_entry_is_raw_write_ = entry.get_header().is_raw_write(); - accumlate_checksum_ = new_accumlate_checksum; + accumulate_checksum_ = new_accumulate_checksum; + // To support get PADDING entry, need record the meta info of PADDING entry + set_padding_info_(entry); } return ret; } - // @brief: accumlate checksum verify, only verify checkum when accum_checksum_ is not -1. + // @brief: accumulate checksum verify, only verify checksum when accum_checksum_ is not -1. // ret val: // OB_SUCCESS // OB_CHECKSUM_ERROR int verify_accum_checksum_(const LogGroupEntry &entry, - int64_t &new_accumlate_checksum); + int64_t &new_accumulate_checksum); + int construct_padding_log_entry_(const int64_t memset_start_pos, + const int64_t padding_log_entry_len); + bool is_padding_entry_end_lsn_(const LSN &lsn) + { + return 0 == lsn_2_offset(lsn, PALF_BLOCK_SIZE); + } + + void set_padding_info_(const LogGroupEntry &entry) + { + curr_entry_is_padding_ = entry.get_header().is_padding_log(); + padding_entry_size_ = entry.get_header().get_data_len(); + padding_entry_scn_ = entry.get_header().get_max_scn(); + } + + void reset_padding_info_() + { + curr_entry_is_padding_ = false; + padding_entry_size_ = 0; + padding_entry_scn_.reset(); + } private: static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2; - // In each `next_entry` round, need read data from `LogStorage` directlly, + // In each `next_entry` round, need read data from `LogStorage` directly, // to amortized reading cost, use `read_buf` to cache the last read result. // // NB: each log must not exceed than 2MB + 4KB. 
@@ -371,7 +396,11 @@ static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2; // share::SCN prev_entry_scn_; GetModeVersion get_mode_version_; - int64_t accumlate_checksum_; + int64_t accumulate_checksum_; + // To support get PADDING ENTRY, add following fields: + int64_t curr_entry_is_padding_; + int64_t padding_entry_size_; + SCN padding_entry_scn_; bool is_inited_; }; @@ -388,7 +417,10 @@ LogIteratorImpl::LogIteratorImpl() curr_entry_size_(0), init_mode_version_(0), prev_entry_scn_(), - accumlate_checksum_(-1), + accumulate_checksum_(-1), + curr_entry_is_padding_(false), + padding_entry_size_(0), + padding_entry_scn_(), is_inited_(false) { } @@ -418,7 +450,10 @@ int LogIteratorImpl::init(const GetModeVersion &get_mode_version, init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; get_mode_version_ = get_mode_version; prev_entry_scn_.reset(); - accumlate_checksum_ = -1; + accumulate_checksum_ = -1; + curr_entry_is_padding_ = false; + padding_entry_size_ = 0; + padding_entry_scn_.reset(); is_inited_ = true; PALF_LOG(TRACE, "LogIteratorImpl init success", K(ret), KPC(this)); } @@ -437,7 +472,10 @@ void LogIteratorImpl::reuse() curr_entry_size_ = 0; init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; prev_entry_scn_.reset(); - accumlate_checksum_ = -1; + accumulate_checksum_ = -1; + curr_entry_is_padding_ = false; + padding_entry_size_ = 0; + padding_entry_scn_.reset(); } template @@ -445,7 +483,10 @@ void LogIteratorImpl::destroy() { if (IS_INIT) { is_inited_ = false; - accumlate_checksum_ = -1; + padding_entry_scn_.reset(); + padding_entry_size_ = 0; + curr_entry_is_padding_ = false; + accumulate_checksum_ = -1; prev_entry_scn_.reset(); init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; curr_entry_size_ = 0; @@ -497,12 +538,12 @@ int LogIteratorImpl::get_next_entry_(const SCN &replayable_point_scn, // // For example, the end lsn of this log is 64MB, the start lsn of this log // is 62M, the end lsn of 'read_buf_' is 63MB, however, this log has been - // truncate from disk, and then new log which length is 1.5MB has writen, + // truncate from disk, and then new log which length is 1.5MB has written, // the log tail is 63.5MB. even if we read data from storage, the data is // always not integrity. // // We can limit the number of disk reads to 2, the reason is that: when - // we pasrs a PADDING entry with PalfBufferIterator, we need read data + // we parse a PADDING entry with PalfBufferIterator, we need read data // from disk again. // int read_times = 0; @@ -568,9 +609,9 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, // NB: when return OB_ITER_END, we need try to clean up cache, and we should clean up cache only when // the log ts of curr entry is greater than 'replayable_point_scn', otherwise, we would return some logs - // which has been flasback, consider following case: + // which has been flashback, consider following case: // 1. T1, 'replayable_point_scn' is 10, the log ts of curr entry is 15, but there is no flashback option.(no any bad effect) - // 2. T2, 'replayable_point_scn' is 10, the logs on disk which the log ts after 10 has been flashbacked, and + // 2. T2, 'replayable_point_scn' is 10, the logs on disk which the log ts after 10 has been flashback, and // return OB_ITER_END because of 'file end lsn'.(no any bad effect) // 3. 
T3, 'replayable_point_scn' has been advanced to 16, and write several logs on disk, however, the cache // of iterator has not been clean up, the old logs will be returned.(bad effect) @@ -579,7 +620,7 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, (void) try_clean_up_cache_(); if (!replayable_point_scn.is_valid()) { ret = OB_INVALID_ARGUMENT; - PALF_LOG(WARN, "invalid argumetn", K(replayable_point_scn), KPC(this)); + PALF_LOG(WARN, "invalid argument", K(replayable_point_scn), KPC(this)); } else if (OB_FAIL(get_next_entry_(replayable_point_scn, info))) { // NB: if the data which has been corrupted, clean cache. // NB: if the accum_checksum_ is not match, return OB_CHECKSUM_ERROR. @@ -609,11 +650,11 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, // 1. if 'curr_entry_' iterate end by replayable point scn: // we should set next_min_scn to std::min(replayable_point_scn+1, the log scn of 'curr_entry_'), // however, replayable_point_scn may be smaller than 'prev_entry_scn_'(for example, the log - // entry correspond to'prev_entry_scn_' was writen by APPEND, its' scn may be greater than - // replayable_point_scn), we shoud set next_min_scn to std::max( + // entry correspond to'prev_entry_scn_' was written by APPEND, its' scn may be greater than + // replayable_point_scn), we should set next_min_scn to std::max( // std::min(replayable_point_scn + 1, the log scn of 'curr_entry_'), 'prev_entry_scn_' + 1). // - // 2. oterwise, iterate end by file end lsn: + // 2. otherwise, iterate end by file end lsn: // - case 1: if 'curr_entry_' has been parsed from LogIteratorStorage, however, it's not readable // due to file end lsn(consider that someone set the file end lsn to the middle of one // LogGroupEntry), we should set next_min_scn to the max of min(curr_entry_'s scn, @@ -641,7 +682,7 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, // consider follower case: // T1, iterate an entry successfully, and make prev_entry_scn to 5; // T2, iterate control by readable point scn, scn of 'curr_entry_' is 10, and readable point scn is 8. - // T3, the logs after 8 have been flashabcked, and no log has been writen. + // T3, the logs after 8 have been flashback, and no log has been written. // if we don't advance 'prev_entry_scn_' to 8, the continuous point of replay service can not update // to 8. if (info.log_scn_ > replayable_point_scn) { @@ -653,7 +694,6 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, next_min_scn = prev_entry_scn_.is_valid() ? SCN::plus(prev_entry_scn_, 1) : SCN::min_scn(); PALF_LOG(TRACE, "update next_min_scn to prev_entry_scn_ + 1", KPC(this), K(info), K(replayable_point_scn)); } else { - } } if (OB_FAIL(ret)) { @@ -674,6 +714,12 @@ int LogIteratorImpl::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write } else if (OB_FAIL(entry.shallow_copy(curr_entry_))) { ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "shallow_copy failed", K(ret), KPC(this)); + // If the 'start_lsn' of PalfBufferIterator is not pointed to LogGroupEntry, + // before iterate next LogGroupEntry, the LogEntry will not be checked integrity, + // therefore, when accumulate_checksum_ is -1, check the integrity of entry. 
+ } else if (-1 == accumulate_checksum_ && !entry.check_integrity()) { + ret = OB_INVALID_DATA; + PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry)); } else { lsn = log_storage_->get_lsn(curr_read_pos_); is_raw_write = curr_entry_is_raw_write_; @@ -683,7 +729,7 @@ int LogIteratorImpl::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write template int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry, - int64_t &new_accumlate_checksum) + int64_t &new_accumulate_checksum) { int ret = OB_SUCCESS; int64_t data_checksum = -1; @@ -691,13 +737,13 @@ int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry, if (!entry.check_integrity(data_checksum)) { ret = OB_INVALID_DATA; PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry)); - } else if (-1 == accumlate_checksum_) { - new_accumlate_checksum = expected_verify_checksum; - PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this), - K(new_accumlate_checksum)); + } else if (-1 == accumulate_checksum_) { + new_accumulate_checksum = expected_verify_checksum; + PALF_LOG(INFO, "init accumulate_checksum to first LogGroupEntry", K(entry), KPC(this), + K(new_accumulate_checksum)); } else if (OB_FAIL(LogChecksum::verify_accum_checksum( - accumlate_checksum_, data_checksum, - expected_verify_checksum, new_accumlate_checksum))) { + accumulate_checksum_, data_checksum, + expected_verify_checksum, new_accumulate_checksum))) { PALF_LOG(WARN, "verify accumlate checksum failed", K(ret), KPC(this), K(entry)); } else { PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry)); @@ -705,6 +751,47 @@ int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry, return ret; } +template +int LogIteratorImpl::construct_padding_log_entry_(const int64_t memset_start_pos, + const int64_t padding_log_entry_len) +{ + int ret = OB_SUCCESS; + LSN padding_log_entry_start_lsn = log_storage_->get_lsn(memset_start_pos); + // defense code + if (!curr_entry_is_padding_) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "only call this function when LogGroupEntry is padding", KPC(this)); + } else if (!is_padding_entry_end_lsn_(padding_log_entry_start_lsn + padding_log_entry_len)) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "unexpected error, the end lsn of padding log entry is not the header of nexet block!!!", + KPC(this), K(memset_start_pos), K(padding_log_entry_len)); + } else if (memset_start_pos + padding_log_entry_len > curr_read_buf_end_pos_) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "unexpected error, the end pos of 'curr_read_buf_' is not enough!!!", + KPC(this), K(memset_start_pos), K(padding_log_entry_len)); + } else { + // set memory which contained padding log entry to PADDING_LOG_CONTENT_CHAR + memset(buf_+memset_start_pos, PADDING_LOG_CONTENT_CHAR, padding_log_entry_len); + // The format of LogEntry + // | LogEntryHeader | ObLogBaseHeader | data | + // NB: construct padding log entry by self + int64_t header_size = LogEntryHeader::HEADER_SER_SIZE; + int64_t padding_log_entry_data_len = padding_log_entry_len - header_size; + int64_t serialize_log_entry_header_pos = memset_start_pos; + if (OB_FAIL(LogEntryHeader::generate_padding_log_buf(padding_log_entry_data_len, + padding_entry_scn_, + buf_+serialize_log_entry_header_pos, + LogEntryHeader::PADDING_LOG_ENTRY_SIZE))) { + PALF_LOG(ERROR, "generate_padding_log_buf failed", KPC(this), K(padding_log_entry_data_len), + K(header_size), K(padding_log_entry_len), K(serialize_log_entry_header_pos)); + } else { + 
PALF_LOG(INFO, "generate padding log entry successfully", KPC(this)); + reset_padding_info_(); + } + } + return ret; +} + // step1. according to magic number, acquire log entry type; // step2. deserialize ENTRY from 'read_buf_', if buf not enough, return and run in next // round; step3. check entry integrity, if failed, return OB_INVALID_DATA; step4. if the @@ -821,6 +908,7 @@ int LogIteratorImpl::get_log_entry_type_(LogEntryType &log_entry_type) int ret = OB_SUCCESS; int16_t magic_number; int64_t pos = curr_read_pos_; + bool is_log_entry_iterator = std::is_same::value; // ensure that we can get the magic number of each log entry if (OB_FAIL( serialization::decode_i16(buf_, curr_read_buf_end_pos_, pos, &magic_number))) { @@ -833,6 +921,14 @@ int LogIteratorImpl::get_log_entry_type_(LogEntryType &log_entry_type) log_entry_type = LogEntryType::LOG_META_ENTRY_HEADER; } else if (LogBlockHeader::MAGIC == magic_number) { log_entry_type = LogEntryType::LOG_INFO_BLOCK_HEADER; + } else if (is_log_entry_iterator && curr_entry_is_padding_ + // defense code + // after iterate padding log entry successfully, assume next block has been corrupted, + // iterate next log entry in next block will be failed, we mustn't return padding log + // entry again.(In this case, curr_entry_is_padding_ is the value of prev log entry) + && !is_padding_entry_end_lsn_(log_storage_->get_lsn(curr_read_pos_))) { + log_entry_type = LogEntryType::LOG_ENTRY_HEADER; + PALF_LOG(INFO, "need consume padding log", KPC(this)); } else { ret = OB_INVALID_DATA; } @@ -851,7 +947,7 @@ int LogIteratorImpl::get_log_entry_type_(LogEntryType &log_entry_type) // When buf not enough to hold a complete log, need read data from 'curr', // however, to avoid read amplification, we need read data from 'end': // 1. limit read size into LOG_MAX_LOG_BUFFER_SIZE - valid_tail_part_size. -// 2. memove 'valid_tail_part' to the header of read_buf_. +// 2. memmove 'valid_tail_part' to the header of read_buf_. // 3. read data from 'end' and memcpy these data into read_buf_ + valid_tail_part_size. // // NB: when iterate to the end of block, need switch to next block. @@ -890,7 +986,7 @@ void LogIteratorImpl::try_clean_up_cache_() // reuse LogIteratorStorage firstly, only the log before 'start_lsn_' + 'curr_read_pos_' // has been consumed. // NB: need ensure that we can not update 'curr_read_pos_' to 'curr_read_pos_' + sizeof(LogGroupEntryHeader) - // when the LogGroupEntryHeader after 'curr_read_pos_' need be controlled by replable_point_scn. + // when the LogGroupEntryHeader after 'curr_read_pos_' need be controlled by replayable_point_scn. log_storage_->reuse(curr_read_lsn); curr_read_buf_start_pos_ = 0; curr_read_pos_ = 0; @@ -898,18 +994,28 @@ void LogIteratorImpl::try_clean_up_cache_() curr_entry_.reset(); // NB: we can not reset curr_entry_is_raw_write_, otherwise, the log entry after replayable_point_scn may no be - // controlled by replable_point_scn. + // controlled by replayable_point_scn. + // consider that: + // 1. At T1 timestamp, the LogGroupEntry has three LogEntry, the first LogEntry has been seen by ReplayService, + // however, the remained LogEntry can not been seen due to replayable_point_scn (current replayable_point_scn + // may be lag behind others). + // 2. At T2 timestamp, this replica is in flashback mode, the logs are not been flashback(flashback_scn is greater + // than replayable_point_scn, and there are several logs which SCN is greater than flashback_scn) + // 3. 
At T3 timestamp, ReplayService use next() function, iterator will try_clean_up_cache_ because mode version + // has been changed. if reset curr_entry_is_raw_write_, the second LogEntry may be seen by ReplayService even + // if the SCN of this LogEntry is greater than flashback_scn. // curr_entry_is_raw_write_ = false; curr_entry_size_ = 0; init_mode_version_ = current_mode_version; - // we can not reset prev_entry_scn_, otherwise, after flashback, if there is no logs which can be readbale on disk, + // we can not reset prev_entry_scn_, otherwise, after flashback, if there is no logs which can be readable on disk, // we can not return a valid next_min_scn. // - prev_entry_.reset(); // we need reset accum_checksum_, otherwise, the accum_checksum_ is the log before flashback, and iterate new // group log will fail. - accumlate_checksum_ = -1; + accumulate_checksum_ = -1; + reset_padding_info_(); } } } // end namespace palf diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 85238f33a7..106580afbd 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -720,8 +720,22 @@ int LogSlidingWindow::generate_new_group_log_(const LSN &lsn, if (OB_FAIL(wait_group_buffer_ready_(lsn, log_body_size + LogGroupEntryHeader::HEADER_SER_SIZE))) { PALF_LOG(ERROR, "group_buffer wait failed", K(ret), K_(palf_id), K_(self)); } else if (is_padding_log) { - // padding log, fill log body with '\0'. - if (OB_FAIL(group_buffer_.fill_padding_body(lsn + LogGroupEntryHeader::HEADER_SER_SIZE, log_body_size))) { + const int64_t padding_log_body_size = log_body_size - LogEntryHeader::HEADER_SER_SIZE; + const int64_t padding_valid_data_len = LogEntryHeader::PADDING_LOG_ENTRY_SIZE; + // padding_valid_data only include LogEntryHeader and ObLogBaseHeader + // The format like follow: + // | LogEntryHeader | ObLogBaseHeader| + // and the format of padding log entry like follow: + // | LogEntryHeader | ObLogBaseHeader| PADDING_LOG_CONTENT_CHAR | + // | 32 BYTE | 16 BYTE | padding_log_body_size - 48 BYTE | + char padding_valid_data[padding_valid_data_len]; + memset(padding_valid_data, 0, padding_valid_data_len); + if (OB_FAIL(LogEntryHeader::generate_padding_log_buf(padding_log_body_size, scn, padding_valid_data, padding_valid_data_len))) { + PALF_LOG(ERROR, "generate_padding_log_buf failed", K_(palf_id), K_(self), K(padding_valid_data_len), + K(scn), K(padding_log_body_size)); + } + // padding log, fill log body with PADDING_LOG_CONTENT_CHAR. 
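// A minimal standalone sketch (not the PALF implementation) of the padding-body layout in
// the comment above: a short valid prefix (LogEntryHeader + ObLogBaseHeader, 32 B + 16 B per
// that comment) followed by filler bytes. The filler value and all names below are
// hypothetical stand-ins; the caller is assumed to pass a prefix of at least 48 bytes and a
// body_size no smaller than the prefix.
#include <cstring>
#include <vector>

static constexpr char kSketchPaddingChar = '\0';       // stand-in for PADDING_LOG_CONTENT_CHAR
static constexpr int kSketchValidPrefixLen = 32 + 16;  // LogEntryHeader + ObLogBaseHeader per the layout comment

// Builds | valid prefix | filler ... | of total length body_size.
inline std::vector<char> build_sketch_padding_body(const char *valid_prefix, int body_size)
{
  std::vector<char> body(body_size, kSketchPaddingChar);        // fill the whole body with the padding char
  std::memcpy(body.data(), valid_prefix, kSketchValidPrefixLen); // overwrite the head with the serialized headers
  return body;
}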
+ else if (OB_FAIL(group_buffer_.fill_padding_body(lsn + LogGroupEntryHeader::HEADER_SER_SIZE, padding_valid_data, padding_valid_data_len, log_body_size))) { PALF_LOG(WARN, "group_buffer fill_padding_body failed", K(ret), K_(palf_id), K_(self), K(log_body_size)); } else { // inc ref diff --git a/src/logservice/replayservice/ob_log_replay_service.cpp b/src/logservice/replayservice/ob_log_replay_service.cpp index ee4d9c30b8..744cab58a6 100644 --- a/src/logservice/replayservice/ob_log_replay_service.cpp +++ b/src/logservice/replayservice/ob_log_replay_service.cpp @@ -575,25 +575,6 @@ int ObLogReplayService::is_replay_done(const share::ObLSID &id, return ret; } -int ObLogReplayService::flashback(const share::ObLSID &id) -{ - int ret = OB_SUCCESS; - ObReplayStatus *replay_status = NULL; - ObReplayStatusGuard guard; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - CLOG_LOG(WARN, "replay service not init", K(ret)); - } else if (OB_FAIL(get_replay_status_(id, guard))) { - CLOG_LOG(WARN, "guard get replay status failed", K(ret), K(id)); - } else if (NULL == (replay_status = guard.get_replay_status())) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "replay status is not exist", K(ret), K(id)); - } else if (OB_FAIL(replay_status->flashback())) { - CLOG_LOG(WARN, "replay status flashback failed", K(ret), K(id)); - } - return ret; -} - //通用接口, 受控回放时最终返回值为受控回放点前的最后一条日志的log_ts int ObLogReplayService::get_max_replayed_scn(const share::ObLSID &id, SCN &scn) { @@ -1005,6 +986,10 @@ int ObLogReplayService::fetch_and_submit_single_log_(ObReplayStatus &replay_stat CLOG_LOG(TRACE, "skip current log", K(replay_status), K(cur_lsn), K(log_size), K(cur_log_submit_scn)); } else if (OB_FAIL(header.deserialize(log_buf, log_size, header_pos))) { CLOG_LOG(WARN, "basic header deserialize failed", K(ret), K(header_pos), K(id)); + } else if (ObLogBaseType::PADDING_LOG_BASE_TYPE == header.get_log_type()) { + // For padding log entry, iterate next log directly. + need_iterate_next_log = true; + CLOG_LOG(INFO, "no need to replay padding log entry", KPC(submit_task), K(header)); } else if (header.need_pre_replay_barrier()) { // 前向barrier日志的replay task和log buf需要分别分配内存 if (OB_FAIL(fetch_pre_barrier_log_(replay_status, @@ -1077,7 +1062,6 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - LSN committed_end_lsn; ObReplayStatus *replay_status = NULL; if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -1090,11 +1074,7 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta ret = OB_ERR_UNEXPECTED; on_replay_error_(); CLOG_LOG(ERROR, "replay status is NULL", KPC(submit_task), KPC(replay_status), KR(ret)); - } else if (OB_FAIL(submit_task->get_committed_end_lsn(committed_end_lsn))) { - // getting committed_end_lsn first ensures palf has all logs until committed_end_lsn. - CLOG_LOG(ERROR, "failed to get_committed_end_lsn", KR(ret), K(committed_end_lsn), KPC(replay_status)); } else if (replay_status->try_rdlock()) { - (void)submit_task->get_committed_end_lsn(committed_end_lsn); const int64_t start_ts = ObClockGenerator::getClock(); bool need_submit_log = true; int64_t count = 0; @@ -1110,41 +1090,7 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta const SCN &replayable_point = replayable_point_.atomic_load(); need_submit_log = submit_task->has_remained_submit_log(replayable_point, iterate_end_by_replayable_point); - // consider 3 cases - // 1. has_remained_submit_log return true: - // just fetch this log. 
- // 2. has_remained_submit_log return false and is iterate_end_by_replayable_point: - // do nothing, if replayable_point changed, thread lease ensures this task retry. - // 3. has_remained_submit_log return true and not iterate_end_by_replayable_point: - // check whether has padding log. if (!need_submit_log) { - if (OB_FAIL(submit_task->get_next_to_submit_log_info(to_submit_lsn, to_submit_scn))) { - CLOG_LOG(ERROR, "failed to get_next_to_submit_log_info", KR(ret), K(to_submit_lsn), - K(to_submit_scn)); - } else if (!iterate_end_by_replayable_point - && committed_end_lsn.is_valid() - && to_submit_lsn < committed_end_lsn) { - //TODO: @runlin 移除padding日志时此处特殊处理需要一并移除 - //当前palf最后一条日志为padding,直接推大to_submit_lsn - if (lsn_2_offset(committed_end_lsn, PALF_BLOCK_SIZE) != 0) { - CLOG_LOG(WARN, "no log to fetch but committed_end_lsn is not new file header", - KR(ret), K(to_submit_lsn), K(committed_end_lsn), KPC(replay_status)); - } else if (1 != lsn_2_block(committed_end_lsn, PALF_BLOCK_SIZE) - - lsn_2_block(to_submit_lsn, PALF_BLOCK_SIZE)) { - CLOG_LOG(ERROR, "padding log committed_end_lsn is not continuous with to_submit_lsn", - KR(ret), K(to_submit_lsn), K(committed_end_lsn), KPC(replay_status)); - //padding日志scn和尾部日志重复,只更新lsn - } else if (OB_FAIL(submit_task->update_next_to_submit_lsn(committed_end_lsn))) { - // log info回退 - CLOG_LOG(ERROR, "failed to update_next_submit_log_info", KR(ret), K(committed_end_lsn), KPC(replay_status)); - replay_status->set_err_info(committed_end_lsn, to_submit_scn, ObLogBaseType::INVALID_LOG_BASE_TYPE, - 0, true, ObClockGenerator::getClock(), ret); - } else { - CLOG_LOG(INFO, "no log to fetch but committed_end_lsn not reached, last log may be padding", - KR(ret), K(to_submit_lsn), K(committed_end_lsn), K(to_submit_scn), KPC(replay_status)); - } - } - //end loop, return OB_SUCCESS for drop current task } else if (OB_SUCC(fetch_and_submit_single_log_(*replay_status, submit_task, to_submit_lsn, to_submit_scn, log_size))) { count++; diff --git a/src/logservice/replayservice/ob_log_replay_service.h b/src/logservice/replayservice/ob_log_replay_service.h index c13f97948d..1d86dcf247 100644 --- a/src/logservice/replayservice/ob_log_replay_service.h +++ b/src/logservice/replayservice/ob_log_replay_service.h @@ -148,7 +148,6 @@ public: int is_replay_done(const share::ObLSID &id, const palf::LSN &end_lsn, bool &is_done); - int flashback(const share::ObLSID &id); int get_max_replayed_scn(const share::ObLSID &id, share::SCN &scn); int submit_task(ObReplayServiceTask *task); int update_replayable_point(const share::SCN &replayable_scn); diff --git a/src/logservice/replayservice/ob_replay_status.cpp b/src/logservice/replayservice/ob_replay_status.cpp index f13dd662d2..83aff0c5b3 100644 --- a/src/logservice/replayservice/ob_replay_status.cpp +++ b/src/logservice/replayservice/ob_replay_status.cpp @@ -111,7 +111,6 @@ int ObReplayServiceSubmitTask::init(const palf::LSN &base_lsn, CLOG_LOG(ERROR, "base_scn is invalid", K(type_), K(base_lsn), K(base_scn), KR(ret)); } else { replay_status_ = replay_status; - committed_end_lsn_ = base_lsn; next_to_submit_lsn_ = base_lsn; next_to_submit_scn_.set_min(); base_lsn_ = base_lsn; @@ -121,7 +120,7 @@ int ObReplayServiceSubmitTask::init(const palf::LSN &base_lsn, // 在没有写入的情况下有可能已经到达边界 CLOG_LOG(WARN, "iterator next failed", K(iterator_), K(tmp_ret)); } - CLOG_LOG(INFO, "submit log task init success", K(type_), K(next_to_submit_lsn_), K(committed_end_lsn_), + CLOG_LOG(INFO, "submit log task init success", K(type_), K(next_to_submit_lsn_), 
K(next_to_submit_scn_), K(replay_status_), K(ret)); } return ret; @@ -131,7 +130,6 @@ void ObReplayServiceSubmitTask::reset() { ObLockGuard guard(lock_); next_to_submit_lsn_.reset(); - committed_end_lsn_.reset(); next_to_submit_scn_.reset(); base_lsn_.reset(); base_scn_.reset(); @@ -160,13 +158,6 @@ int ObReplayServiceSubmitTask::get_next_to_submit_log_info_(LSN &lsn, SCN &scn) return ret; } -int ObReplayServiceSubmitTask::get_committed_end_lsn(LSN &lsn) const -{ - int ret = OB_SUCCESS; - lsn = committed_end_lsn_; - return ret; -} - int ObReplayServiceSubmitTask::get_base_lsn(LSN &lsn) const { ObLockGuard guard(lock_); @@ -233,35 +224,6 @@ int ObReplayServiceSubmitTask::update_submit_log_meta_info(const LSN &lsn, return ret; } -int ObReplayServiceSubmitTask::update_committed_end_lsn(const LSN &lsn) -{ - return update_committed_end_lsn_(lsn); -} - -int ObReplayServiceSubmitTask::update_committed_end_lsn_(const LSN &lsn) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(lsn <= committed_end_lsn_)) { - ret = OB_INVALID_ARGUMENT; - CLOG_LOG(WARN, "update_committed_end_lsn invalid argument", K(type_), K(lsn), - K(committed_end_lsn_), K(ret)); - } else { - committed_end_lsn_ = lsn; - } - return ret; -} - -int ObReplayServiceSubmitTask::set_committed_end_lsn(const LSN &lsn) -{ - int ret = OB_SUCCESS; - ATOMIC_SET(&committed_end_lsn_.val_, lsn.val_); - if (next_to_submit_lsn_ > committed_end_lsn_) { - CLOG_LOG(INFO, "need rollback next_to_submit_lsn_", K(lsn), KPC(this)); - next_to_submit_lsn_ = committed_end_lsn_; - } - return ret; -} - int ObReplayServiceSubmitTask::need_skip(const SCN &scn, bool &need_skip) { int ret = OB_SUCCESS; @@ -838,35 +800,6 @@ void ObReplayStatus::switch_to_follower(const palf::LSN &begin_lsn) CLOG_LOG(INFO, "replay status switch_to_follower", KPC(this), K(begin_lsn)); } -int ObReplayStatus::flashback() -{ - int ret = OB_SUCCESS; - WLockGuardWithRetryInterval wguard(rwlock_, WRLOCK_TRY_THRESHOLD, WRLOCK_RETRY_INTERVAL); - if (OB_FAIL(flashback_())) { - CLOG_LOG(WARN, "replay status flashback failed", K(ret), KPC(this)); - } else { - CLOG_LOG(INFO, "replay status flashback success", K(ret), KPC(this)); - } - return ret; -} - -int ObReplayStatus::flashback_() -{ - int ret = OB_SUCCESS; - LSN committed_end_lsn; - if (OB_FAIL(palf_handle_.get_end_lsn(committed_end_lsn))) { - CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(this)); - } else if (OB_FAIL(submit_log_task_.set_committed_end_lsn(committed_end_lsn))) { - CLOG_LOG(WARN, "set_committed_end_lsn failed", K(ret), KPC(this), K(committed_end_lsn)); - } else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) { - CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_), - KPC(this), K(ret)); - } else { - // do nothing - } - return ret; -} - bool ObReplayStatus::has_remained_replay_task() const { int64_t count = 0; @@ -928,9 +861,6 @@ int ObReplayStatus::update_end_offset(const LSN &lsn) CLOG_LOG(ERROR, "invalid arguments", K(ls_id_), K(lsn), K(ret)); } else if (!need_submit_log()) { // leader do nothing, keep submit_log_task recording last round status as follower - } else if (OB_FAIL(submit_log_task_.update_committed_end_lsn(lsn))) { - // update offset and submit submit_log_task - CLOG_LOG(ERROR, "failed to update_apply_end_offset", KR(ret), K(ls_id_), K(lsn)); } else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) { CLOG_LOG(ERROR, "failed to submit submit_log_task to replay Service", K(submit_log_task_), KPC(this), K(ret)); @@ -1345,8 +1275,8 @@ int 
ObReplayStatus::stat(LSReplayStat &stat) const if (OB_FAIL(submit_log_task_.get_next_to_submit_log_info(stat.unsubmitted_lsn_, stat.unsubmitted_scn_))) { CLOG_LOG(WARN, "get_next_to_submit_log_info failed", KPC(this), K(ret)); - } else if (OB_FAIL(submit_log_task_.get_committed_end_lsn(stat.end_lsn_))) { - CLOG_LOG(WARN, "get_committed_end_lsn failed", KPC(this), K(ret)); + } else if (OB_FAIL(palf_handle_.get_end_lsn(stat.end_lsn_))) { + CLOG_LOG(WARN, "get_end_lsn from palf failed", KPC(this), K(ret)); } } return ret; diff --git a/src/logservice/replayservice/ob_replay_status.h b/src/logservice/replayservice/ob_replay_status.h index 207b392d7c..81017fb72e 100644 --- a/src/logservice/replayservice/ob_replay_status.h +++ b/src/logservice/replayservice/ob_replay_status.h @@ -270,7 +270,6 @@ class ObReplayServiceSubmitTask : public ObReplayServiceTask public: ObReplayServiceSubmitTask(): ObReplayServiceTask(), next_to_submit_lsn_(), - committed_end_lsn_(), next_to_submit_scn_(), base_lsn_(), base_scn_(), @@ -296,9 +295,6 @@ public: // 不允许回退 int update_submit_log_meta_info(const palf::LSN &lsn, const share::SCN &scn); int update_next_to_submit_lsn(const palf::LSN &lsn); - int update_committed_end_lsn(const palf::LSN &lsn); - // only for flashback, should not complicated with update_committed_end_lsn - int set_committed_end_lsn(const palf::LSN &lsn); int get_next_to_submit_log_info(palf::LSN &lsn, share::SCN &scn) const; int get_committed_end_lsn(palf::LSN &lsn) const; int get_base_lsn(palf::LSN &lsn) const; @@ -313,7 +309,6 @@ public: INHERIT_TO_STRING_KV("ObReplayServiceSubmitTask", ObReplayServiceTask, K(next_to_submit_lsn_), - K(committed_end_lsn_), K(next_to_submit_scn_), K(base_lsn_), K(base_scn_), @@ -321,7 +316,6 @@ public: private: int update_next_to_submit_lsn_(const palf::LSN &lsn); int update_next_to_submit_scn_(const share::SCN &scn); - int update_committed_end_lsn_(const palf::LSN &lsn); void set_next_to_submit_log_info_(const palf::LSN &lsn, const share::SCN &scn); int get_next_to_submit_log_info_(palf::LSN &lsn, share::SCN &scn) const; int get_base_lsn_(palf::LSN &lsn) const; @@ -330,8 +324,6 @@ private: private: // location of next log after the last log that has already been submit to replay, consider as left side of iterator palf::LSN next_to_submit_lsn_; - //location of the last log that need submit to replay, consider as right side of iterator - palf::LSN committed_end_lsn_; share::SCN next_to_submit_scn_; //initial log lsn when enable replay, for stat replay process palf::LSN base_lsn_; @@ -502,7 +494,6 @@ public: bool has_remained_replay_task() const; // update right margin of logs that need to replay int update_end_offset(const palf::LSN &lsn); - int flashback(); int push_log_replay_task(ObLogReplayTask &task); int batch_push_all_task_queue(); @@ -591,7 +582,6 @@ private: // 注销回调并清空任务 int disable_(); bool is_replay_enabled_() const; - int flashback_(); private: static const int64_t PENDING_COUNT_THRESHOLD = 100; static const int64_t EAGAIN_COUNT_THRESHOLD = 50000; diff --git a/unittest/logservice/test_log_checksum.cpp b/unittest/logservice/test_log_checksum.cpp index 1b20b3957c..5c0c7223d4 100644 --- a/unittest/logservice/test_log_checksum.cpp +++ b/unittest/logservice/test_log_checksum.cpp @@ -76,7 +76,6 @@ TEST(TestLogChecksum, test_log_checksum) EXPECT_TRUE(parity_check(v1)); EXPECT_FALSE(parity_check(v2)); } - } } diff --git a/unittest/logservice/test_log_entry_and_group_entry.cpp b/unittest/logservice/test_log_entry_and_group_entry.cpp index 940b035630..58c1b71f97 
100644 --- a/unittest/logservice/test_log_entry_and_group_entry.cpp +++ b/unittest/logservice/test_log_entry_and_group_entry.cpp @@ -14,12 +14,16 @@ #include "lib/ob_errno.h" #include "lib/net/ob_addr.h" // ObAddr #include "logservice/palf/log_define.h" -#include "logservice/palf/log_group_entry.h" -#include "logservice/palf/log_writer_utils.h" +#include "lib/checksum/ob_crc64.h" // ob_crc64 #define private public #include "logservice/palf/log_group_entry_header.h" #include "logservice/palf/log_entry.h" #include "share/scn.h" +#include "logservice/ob_log_base_header.h" // ObLogBaseHeader +#include "logservice/palf/log_group_buffer.h" +#include "logservice/palf/log_group_entry.h" +#include "logservice/palf/log_writer_utils.h" +#include "share/rc/ob_tenant_base.h" #undef private #include @@ -220,14 +224,199 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header) EXPECT_TRUE(log_group_entry2.check_integrity()); } +TEST(TestPaddingLogEntry, test_invalid_padding_log_entry) +{ + LogEntryHeader header; + char buf[1024]; + EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(NULL, 1, 1, share::SCN::min_scn())); + EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(buf, 0, 1, share::SCN::min_scn())); + EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(buf, 1, 0, share::SCN::min_scn())); + EXPECT_EQ(OB_INVALID_ARGUMENT, header.generate_padding_header_(buf, 1, 1, share::SCN::invalid_scn())); + EXPECT_EQ(OB_SUCCESS, header.generate_padding_header_(buf, 1, 1, share::SCN::min_scn())); + + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(0, share::SCN::min_scn(), buf, 1)); + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::invalid_scn(), buf, 1)); + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), NULL, 1)); + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), buf, 0)); + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), buf, 2)); + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(2, share::SCN::min_scn(), buf, 1)); + EXPECT_EQ(OB_INVALID_ARGUMENT, LogEntryHeader::generate_padding_log_buf(1, share::SCN::min_scn(), buf, 1)); + logservice::ObLogBaseHeader base_header; + const int64_t min_padding_valid_data_len = header.get_serialize_size() + base_header.get_serialize_size(); + EXPECT_EQ(OB_SUCCESS, LogEntryHeader::generate_padding_log_buf(1+min_padding_valid_data_len, share::SCN::min_scn(), buf, min_padding_valid_data_len)); +} + +TEST(TestPaddingLogEntry, test_padding_log_entry) +{ + PALF_LOG(INFO, "test_padding_log_entry"); + LogEntry padding_log_entry; + const int64_t padding_data_len = MAX_LOG_BODY_SIZE; + share::SCN padding_group_scn; + padding_group_scn.convert_for_logservice(ObTimeUtility::current_time_ns()); + LogEntryHeader padding_log_entry_header; + char base_header_data[1024] = {'\0'}; + logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE, + logservice::ObReplayBarrierType::NO_NEED_BARRIER); + int64_t serialize_pos = 0; + EXPECT_EQ(OB_SUCCESS, base_header.serialize(base_header_data, 1024, serialize_pos)); + // 生成padding log entry的有效数据 + EXPECT_EQ(OB_SUCCESS, padding_log_entry_header.generate_padding_header_( + base_header_data, base_header.get_serialize_size(), + padding_data_len-LogEntryHeader::HEADER_SER_SIZE, padding_group_scn)); + EXPECT_EQ(true, 
padding_log_entry_header.check_integrity(base_header_data, padding_data_len)); + + // padding group log format + // | GroupHeader | EntryHeader | BaseHeader | '\0' | + LogGroupEntry padding_group_entry; + LogGroupEntryHeader padding_group_entry_header; + + const int64_t padding_buffer_len = MAX_LOG_BUFFER_SIZE; + char *padding_buffer = reinterpret_cast(ob_malloc(padding_buffer_len, "unittest")); + ASSERT_NE(nullptr, padding_buffer); + memset(padding_buffer, PADDING_LOG_CONTENT_CHAR, padding_buffer_len); + { + // 将base_data的数据拷贝到对应位置 + memcpy(padding_buffer+padding_group_entry_header.get_serialize_size() + LogEntryHeader::HEADER_SER_SIZE, base_header_data, 1024); + padding_log_entry.header_ = padding_log_entry_header; + padding_log_entry.buf_ = padding_buffer+padding_group_entry_header.get_serialize_size() + LogEntryHeader::HEADER_SER_SIZE; + // 构造有效的padding_log_entry,用于后续的序列化操作 + EXPECT_EQ(true, padding_log_entry.check_integrity()); + EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); + } + { + LogEntry deserialize_padding_log_entry; + const int64_t tmp_padding_buffer_len = MAX_LOG_BUFFER_SIZE; + char *tmp_padding_buffer = reinterpret_cast(ob_malloc(tmp_padding_buffer_len, "unittest")); + ASSERT_NE(nullptr, tmp_padding_buffer); + int64_t pos = 0; + EXPECT_EQ(OB_SUCCESS, padding_log_entry.serialize(tmp_padding_buffer, tmp_padding_buffer_len, pos)); + pos = 0; + EXPECT_EQ(OB_SUCCESS, deserialize_padding_log_entry.deserialize(tmp_padding_buffer, tmp_padding_buffer_len, pos)); + EXPECT_EQ(true, deserialize_padding_log_entry.check_integrity()); + EXPECT_EQ(padding_log_entry.header_.data_checksum_, deserialize_padding_log_entry.header_.data_checksum_); + ob_free(tmp_padding_buffer); + tmp_padding_buffer = nullptr; + } + + serialize_pos = padding_group_entry_header.get_serialize_size(); + // 拷贝LogEntry到padding_buffer指定位置 + EXPECT_EQ(OB_SUCCESS, padding_log_entry.serialize(padding_buffer, padding_buffer_len, serialize_pos)); + + LogWriteBuf write_buf; + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(padding_buffer, padding_buffer_len)); + bool is_raw_write = false; + bool is_padding_log = true; + int64_t data_checksum = 0; + EXPECT_EQ(OB_SUCCESS, padding_group_entry_header.generate(is_raw_write, is_padding_log, write_buf, padding_data_len, padding_group_scn, 1, LSN(0), 1, data_checksum)); + padding_group_entry_header.update_accumulated_checksum(0); + padding_group_entry_header.update_header_checksum(); + padding_group_entry.header_ = padding_group_entry_header; + padding_group_entry.buf_ = padding_buffer + padding_group_entry_header.get_serialize_size(); + EXPECT_EQ(true, padding_group_entry.check_integrity()); + EXPECT_EQ(true, padding_group_entry.header_.is_padding_log()); + // 验证反序列化LogEntry + { + int64_t pos = 0; + LogEntry tmp_padding_log_entry; + EXPECT_EQ(OB_SUCCESS, tmp_padding_log_entry.deserialize(padding_group_entry.buf_, padding_group_entry.get_data_len(), pos)); + EXPECT_EQ(pos, padding_group_entry.get_data_len()); + EXPECT_EQ(true, tmp_padding_log_entry.check_integrity()); + EXPECT_EQ(true, tmp_padding_log_entry.header_.is_padding_log_()); + logservice::ObLogBaseHeader tmp_base_header; + pos = 0; + EXPECT_EQ(OB_SUCCESS, tmp_base_header.deserialize(tmp_padding_log_entry.buf_, tmp_padding_log_entry.get_data_len(), pos)); + EXPECT_EQ(base_header.log_type_, logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE); + } + + char *serialize_buffer = reinterpret_cast(ob_malloc(padding_buffer_len, "unittest")); + ASSERT_NE(nullptr, serialize_buffer); + memset(serialize_buffer, 
PADDING_LOG_CONTENT_CHAR, padding_buffer_len); + serialize_pos = 0; + // 验证序列化的数据是否符合预期 + EXPECT_EQ(OB_SUCCESS, padding_group_entry.serialize(serialize_buffer, padding_buffer_len, serialize_pos)); + EXPECT_EQ(serialize_pos, padding_data_len+padding_group_entry_header.get_serialize_size()); + + LogGroupEntry deserialize_group_entry; + serialize_pos = 0; + EXPECT_EQ(OB_SUCCESS, deserialize_group_entry.deserialize(serialize_buffer, padding_buffer_len, serialize_pos)); + EXPECT_EQ(true, deserialize_group_entry.check_integrity()); + EXPECT_EQ(true, deserialize_group_entry.header_.is_padding_log()); + EXPECT_EQ(padding_group_entry.header_, deserialize_group_entry.header_); + + // 验证反序列化LogEntry + { + int64_t pos = 0; + LogEntry tmp_padding_log_entry; + EXPECT_EQ(OB_SUCCESS, tmp_padding_log_entry.deserialize(deserialize_group_entry.buf_, padding_group_entry.get_data_len(), pos)); + EXPECT_EQ(pos, deserialize_group_entry.get_data_len()); + EXPECT_EQ(true, tmp_padding_log_entry.check_integrity()); + EXPECT_EQ(true, tmp_padding_log_entry.header_.is_padding_log_()); + logservice::ObLogBaseHeader tmp_base_header; + pos = 0; + EXPECT_EQ(OB_SUCCESS, tmp_base_header.deserialize(tmp_padding_log_entry.buf_, tmp_padding_log_entry.get_data_len(), pos)); + EXPECT_EQ(base_header.log_type_, logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE); + } + + LogGroupBuffer group_buffer; + LSN start_lsn(0); + + ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001); + // init MTL + share::ObTenantBase tbase(1001); + share::ObTenantEnv::set_tenant(&tbase); + EXPECT_EQ(OB_SUCCESS, group_buffer.init(start_lsn)); + + const int64_t padding_valid_data_len = deserialize_group_entry.get_header().get_serialize_size() + padding_log_entry_header.get_serialize_size() + base_header.get_serialize_size(); + // 填充有效的padding日志到group buffer,验证数据是否相等 + // padding_buffer包括LogGruopEntryHeader, LogEntryHeader, ObLogBaseHeader, padding log body(is filled with '\0') + EXPECT_EQ(OB_SUCCESS, group_buffer.fill_padding_body(start_lsn, serialize_buffer, padding_valid_data_len, padding_buffer_len)); + EXPECT_EQ(0, memcmp(group_buffer.data_buf_, serialize_buffer, deserialize_group_entry.get_serialize_size())); + PALF_LOG(INFO, "runlin trace", K(group_buffer.data_buf_), K(serialize_buffer), K(padding_buffer_len), KP(group_buffer.data_buf_), KP(padding_buffer)); + ob_free(padding_buffer); + padding_buffer = NULL; + ob_free(serialize_buffer); + serialize_buffer = NULL; + + group_buffer.destroy(); + ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001); +} + +TEST(TestPaddingLogEntry, test_generate_padding_log_entry) +{ + PALF_LOG(INFO, "test_generate_padding_log_entry"); + LogEntry padding_log_entry; + const int64_t padding_data_len = 1024; + const share::SCN padding_scn = share::SCN::min_scn(); + const int64_t padding_log_entry_len = padding_data_len + LogEntryHeader::HEADER_SER_SIZE; + char *out_buf = reinterpret_cast(ob_malloc(padding_log_entry_len, "unittest")); + ASSERT_NE(nullptr, out_buf); + LogEntryHeader padding_header; + logservice::ObLogBaseHeader base_header(logservice::ObLogBaseType::PADDING_LOG_BASE_TYPE, logservice::ObReplayBarrierType::NO_NEED_BARRIER, 0); + char base_header_buf[1024]; + memset(base_header_buf, 0, 1024); + int64_t serialize_base_header_pos = 0; + EXPECT_EQ(OB_SUCCESS, base_header.serialize(base_header_buf, 1024, serialize_base_header_pos)); + EXPECT_EQ(OB_SUCCESS, padding_header.generate_padding_header_(base_header_buf, base_header.get_serialize_size(), padding_data_len, padding_scn)); + 
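// A minimal standalone round-trip check (not an OceanBase test) in the same spirit as the
// assertions around it: serialize a small header-like struct, deserialize it into a fresh
// object, and expect the checksum field to survive unchanged. The struct and its
// serialize/deserialize helpers are hypothetical stand-ins, not LogEntryHeader.
#include <cstdint>
#include <cstring>
#include <gtest/gtest.h>

struct SketchHeader {
  std::int64_t data_checksum = 0;
  int serialize(char *buf, std::int64_t len, std::int64_t &pos) const {
    if (pos + static_cast<std::int64_t>(sizeof(*this)) > len) return -1;
    std::memcpy(buf + pos, this, sizeof(*this)); pos += sizeof(*this); return 0;
  }
  int deserialize(const char *buf, std::int64_t len, std::int64_t &pos) {
    if (pos + static_cast<std::int64_t>(sizeof(*this)) > len) return -1;
    std::memcpy(this, buf + pos, sizeof(*this)); pos += sizeof(*this); return 0;
  }
};

TEST(SketchPaddingRoundTrip, checksum_survives)
{
  char buf[64];
  std::int64_t pos = 0;
  SketchHeader in;
  in.data_checksum = 42;
  EXPECT_EQ(0, in.serialize(buf, sizeof(buf), pos));
  pos = 0;
  SketchHeader out;
  EXPECT_EQ(0, out.deserialize(buf, sizeof(buf), pos));
  EXPECT_EQ(in.data_checksum, out.data_checksum);
}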
EXPECT_EQ(true, padding_header.check_header_integrity()); + EXPECT_EQ(OB_SUCCESS, LogEntryHeader::generate_padding_log_buf(padding_data_len, padding_scn, out_buf, LogEntryHeader::PADDING_LOG_ENTRY_SIZE)); + int64_t pos = 0; + EXPECT_EQ(OB_SUCCESS, padding_log_entry.deserialize(out_buf, padding_log_entry_len, pos)); + EXPECT_EQ(true, padding_log_entry.check_integrity()); + EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); + EXPECT_EQ(padding_log_entry.header_.data_checksum_, padding_header.data_checksum_); + ob_free(out_buf); + out_buf = nullptr; +} + } // namespace unittest } // namespace oceanbase int main(int argc, char **argv) { + system("rm -f test_log_entry_and_group_entry.log"); OB_LOGGER.set_file_name("test_log_entry_and_group_entry.log", true); OB_LOGGER.set_log_level("INFO"); - PALF_LOG(INFO, "begin unittest::test_log_entry_and_grou_entry"); + PALF_LOG(INFO, "begin unittest::test_log_entry_and_group_entry"); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/unittest/logservice/test_log_group_buffer.cpp b/unittest/logservice/test_log_group_buffer.cpp index 72f0cf97ca..d2ac4f3eac 100644 --- a/unittest/logservice/test_log_group_buffer.cpp +++ b/unittest/logservice/test_log_group_buffer.cpp @@ -18,6 +18,7 @@ #define private public #include "logservice/palf/log_group_buffer.h" #include "logservice/palf/log_writer_utils.h" +#include "logservice/palf/log_entry_header.h" #undef private #include "share/rc/ob_tenant_base.h" @@ -63,6 +64,7 @@ void TestLogGroupBuffer::TearDown() { PALF_LOG(INFO, "TestLogGroupBuffer has TearDown"); PALF_LOG(INFO, "TearDown success"); + log_group_buffer_.destroy(); ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001); } @@ -170,35 +172,104 @@ TEST_F(TestLogGroupBuffer, test_fill) TEST_F(TestLogGroupBuffer, test_fill_padding) { LSN lsn; + const int64_t padding_valid_data_len = LogEntryHeader::PADDING_LOG_ENTRY_SIZE; + char padding_valid_data[padding_valid_data_len]; int64_t len = 0; LSN reuse_lsn(1024); - EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_NOT_INIT, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len)); LSN start_lsn(100); EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn)); - EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len)); lsn = reuse_lsn; - EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, len)); - EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_INVALID_ARGUMENT, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len)); len = 100; lsn.val_ = start_lsn.val_ - 1; - EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len)); lsn.val_ = start_lsn.val_; EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn)); - EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_ERR_UNEXPECTED, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len)); lsn = reuse_lsn; len = log_group_buffer_.get_available_buffer_size() + 1; - EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill_padding_body(lsn, 
padding_valid_data, padding_valid_data_len, len)); len = 1024; int64_t used_size = len; const int64_t buf_size = log_group_buffer_.get_available_buffer_size(); LSN buf_end_lsn = reuse_lsn + (buf_size - (reuse_lsn.val_ - start_lsn.val_)); while (lsn + len < buf_end_lsn) { - EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn, len)); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn, padding_valid_data, padding_valid_data_len, len)); lsn.val_ += len; } EXPECT_GT(lsn + len, buf_end_lsn); } +TEST_F(TestLogGroupBuffer, test_fill_padding_cross_bround) +{ + const int64_t padding_valid_data_len = LogEntryHeader::PADDING_LOG_ENTRY_SIZE; + char padding_valid_data[padding_valid_data_len]; + memset(padding_valid_data, 'c', padding_valid_data_len); + int64_t len = 0; + LSN start_lsn(0); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.init(start_lsn)); + const int64_t unittest_log_buffer_size = 4 * 1024 * 1024; + log_group_buffer_.available_buffer_size_ = log_group_buffer_.reserved_buffer_size_ = unittest_log_buffer_size; + // LSN为0,提交3条1M日志 + const int64_t log_size = 1*1024*1024; + char *buf = reinterpret_cast(ob_malloc(1*1024*1024, "unittest")); + ASSERT_NE(nullptr, buf); + LSN lsn0(0); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill(lsn0, buf, log_size)); + LSN lsn1(1*1024*1024); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill(lsn1, buf, log_size)); + LSN lsn2(2*1024*1024); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill(lsn2, buf, log_size)); + + // 更新reuse lsn为1M, 继续提交2M的padding日志 + LSN lsn3(3*1024*1024); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(lsn1)); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn3, padding_valid_data, + padding_valid_data_len, 2*log_size)); + LSN lsn4(5*1024*1024); + // 预期已经没有可用空间 + EXPECT_EQ(OB_EAGAIN, log_group_buffer_.fill(lsn4, buf, 1)); + // |5.5|2|3|4|5.0| + // 判断5.0的数据是否是padding, 5号日志为2M, 其余日志为1M + EXPECT_EQ(0, memcmp(padding_valid_data, log_group_buffer_.data_buf_+3*1024*1024, padding_valid_data_len)); + char *zero_data = reinterpret_cast(ob_malloc(2*log_size, "unittest")); + ASSERT_NE(nullptr, zero_data); + memset(zero_data, PADDING_LOG_CONTENT_CHAR, 2*1024*1024); + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+3*1024*1024+1024, zero_data, 1*1024*1024-1*1024)); + // 更新reuse_lsn为4M-LogEntryHeader::PADDING_LOG_ENTRY_SIZE,此时正好可以将padding日志的有效日志体放在group_buffer尾部 + LSN reuse_lsn = LSN(unittest_log_buffer_size - LogEntryHeader::PADDING_LOG_ENTRY_SIZE); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn)); + LSN lsn5(reuse_lsn); + // 提交2M的padding日志,有效日志长度为padding_valid_data_len + // 2.x表示2号日志被复写部分 + // |5.x|2.x|3|4|5.0|, 5.0开始的地方为reuse_lsn + // 故意将5.x的一些数据搞坏,验证fill_padding_body是否会清0 + memset(log_group_buffer_.data_buf_, 'x', 1*1024); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn5, padding_valid_data, padding_valid_data_len, 2*log_size)); + // 预期log_group_buffer的尾部数据恰好和padding_valid_data一样 + // log_group_buffer的padding剩余数据和padding_valid_data一样 + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+lsn5.val_, padding_valid_data, LogEntryHeader::PADDING_LOG_ENTRY_SIZE)); + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_, padding_valid_data+LogEntryHeader::PADDING_LOG_ENTRY_SIZE, padding_valid_data_len-LogEntryHeader::PADDING_LOG_ENTRY_SIZE)); + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+LogEntryHeader::PADDING_LOG_ENTRY_SIZE, zero_data, 2*log_size-padding_valid_data_len)); + + // 更新reuse_lsn为4M-LogEntryHeader::PADDING_LOG_ENTRY_SIZE+10,此时padding日志的有效日志体需要放在group_buffer的首尾 + 
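// A minimal standalone sketch (not the LogGroupBuffer implementation) of the wrap-around
// case exercised below: when the write offset sits near the physical end of a ring buffer,
// the first part of the data is copied to the buffer tail and the remainder wraps to the
// buffer head. All names are hypothetical; the caller is assumed to write at most
// ring.size() bytes per call.
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <vector>

inline void ring_write(std::vector<char> &ring, std::int64_t offset, const char *data, std::int64_t len)
{
  const std::int64_t ring_size = static_cast<std::int64_t>(ring.size());
  const std::int64_t start = offset % ring_size;                    // position inside the ring
  const std::int64_t tail_part = std::min(len, ring_size - start);  // bytes that fit before the physical end
  std::memcpy(ring.data() + start, data, static_cast<size_t>(tail_part));
  if (tail_part < len) {
    // the remaining bytes wrap around to the head of the buffer
    std::memcpy(ring.data(), data + tail_part, static_cast<size_t>(len - tail_part));
  }
}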
memset(log_group_buffer_.data_buf_, 'x', 1*1024); + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.inc_update_reuse_lsn(reuse_lsn+10)); + LSN lsn6(reuse_lsn+10); + const int64_t tail_padding_log_size = LogEntryHeader::PADDING_LOG_ENTRY_SIZE - 10; + EXPECT_EQ(OB_SUCCESS, log_group_buffer_.fill_padding_body(lsn6, padding_valid_data, padding_valid_data_len, 2*log_size)); + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+lsn6.val_, padding_valid_data, tail_padding_log_size)); + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_, padding_valid_data+tail_padding_log_size, padding_valid_data_len-tail_padding_log_size)); + EXPECT_EQ(0, memcmp(log_group_buffer_.data_buf_+padding_valid_data_len-tail_padding_log_size, zero_data, 2*log_size-padding_valid_data_len)); + + ob_free(buf); + buf = nullptr; + ob_free(zero_data); + zero_data = nullptr; +} + TEST_F(TestLogGroupBuffer, test_check_log_buf_wrapped) { LSN lsn;