diff --git a/mittest/logservice/CMakeLists.txt b/mittest/logservice/CMakeLists.txt
index 1086108d2..6bab340bd 100644
--- a/mittest/logservice/CMakeLists.txt
+++ b/mittest/logservice/CMakeLists.txt
@@ -29,6 +29,6 @@ ob_unittest_clog(test_ob_simple_log_arb test_ob_simple_log_arb.cpp)
 ob_unittest_clog(test_ob_simple_log_single_replica_func test_ob_simple_log_single_replica_func.cpp)
 ob_unittest_clog(test_ob_simple_arb_server_single_replica test_ob_simple_arb_server_single_replica.cpp)
 ob_unittest_clog(test_ob_simple_arb_server_mutil_replica test_ob_simple_arb_server_mutil_replica.cpp)
-#ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_intergrity.cpp)
+ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_intergrity.cpp)
 
 add_subdirectory(archiveservice)
diff --git a/mittest/logservice/test_ob_simple_log_basic_func.cpp b/mittest/logservice/test_ob_simple_log_basic_func.cpp
index e19f7fab1..8af9f0556 100644
--- a/mittest/logservice/test_ob_simple_log_basic_func.cpp
+++ b/mittest/logservice/test_ob_simple_log_basic_func.cpp
@@ -415,7 +415,7 @@ TEST_F(TestObSimpleLogClusterBasicFunc, data_corrupted)
 {
   SET_CASE_LOG_FILE(TEST_NAME, "data_corrupted");
   ObTimeGuard guard("data_corrupted", 0);
-  OB_LOGGER.set_log_level("INFO");
+  OB_LOGGER.set_log_level("TRACE");
   const int64_t id = ATOMIC_AAF(&palf_id_, 1);
   PALF_LOG(INFO, "start advance_base_lsn", K(id));
   int64_t leader_idx = 0;
diff --git a/mittest/logservice/test_ob_simple_log_data_intergrity.cpp b/mittest/logservice/test_ob_simple_log_data_intergrity.cpp
index 81a203998..76ed3a381 100644
--- a/mittest/logservice/test_ob_simple_log_data_intergrity.cpp
+++ b/mittest/logservice/test_ob_simple_log_data_intergrity.cpp
@@ -34,8 +34,8 @@ public:
   {}
 };
 
-int64_t ObSimpleLogClusterTestBase::member_cnt_ = 2;
-int64_t ObSimpleLogClusterTestBase::node_cnt_ = 2;
+int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
+int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
 std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
 bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
 
@@ -50,12 +50,18 @@ int pwrite_one_log_by_log_storage(PalfHandleImplGuard &leader, const LogGroupEnt
   char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
   block_id_to_string(writable_block_id, block_path, OB_MAX_FILE_NAME_LENGTH);
   int block_fd = -1;
-  if (-1 == (block_fd = ::openat(dir_fd, block_path, O_WRONLY))) {
+  int64_t pos = 0;
+  char *serialize_buf = reinterpret_cast<char *>(ob_malloc(entry.get_serialize_size(), "MitTest"));
+  if (NULL == serialize_buf) {
+    ret = OB_ALLOCATE_MEMORY_FAILED;
+  } else if (-1 == (block_fd = ::openat(dir_fd, block_path, O_WRONLY))) {
     ret = convert_sys_errno();
     PALF_LOG(ERROR, "openat failed", K(ret), K(block_path), KPC(log_storage));
+  } else if (OB_FAIL(entry.serialize(serialize_buf, entry.get_serialize_size(), pos))) {
+    PALF_LOG(ERROR, "serialize failed", K(ret), K(block_path), KPC(log_storage), K(entry));
   } else if (0 >= pwrite(block_fd,
-                         entry.get_data_buf() - sizeof(LogGroupEntryHeader),
-                         entry.get_serialize_size(), write_offset)) {
+                         serialize_buf,
+                         entry.get_serialize_size(), write_offset)) {
     ret = convert_sys_errno();
     PALF_LOG(ERROR, "pwrite failed", K(ret), K(block_path), KPC(log_storage), K(write_offset), K(log_tail), K(entry));
   } else {
@@ -65,6 +71,42 @@ int pwrite_one_log_by_log_storage(PalfHandleImplGuard &leader, const LogGroupEnt
     ::close(block_fd);
     block_fd = -1;
   }
+  if (NULL != serialize_buf) {
+    ob_free(serialize_buf);
+  }
+  return ret;
+}
+// Callback that mutates the serialized bytes of a LogGroupEntry in place to inject a data fault.
+typedef ObFunction<void(char *)> DataFaultInject;
+// Serializes 'entry' into a freshly allocated buffer, lets 'inject' corrupt those bytes and then
+// re-points 'entry' at the corrupted image.
+// On success the buffer is handed back through 'output_buf' and the caller must ob_free() it once
+// 'entry' is no longer used; on failure the buffer (if it was allocated) is released here.
+int make_log_group_entry_partial_error(LogGroupEntry &entry, char *&output_buf, DataFaultInject &inject)
+{
+  int ret = OB_SUCCESS;
+  if (!entry.is_valid() || !inject.is_valid()) {
+    ret = OB_INVALID_ARGUMENT;
+    return ret;
+  }
+  int64_t pos = 0;
+  char *serialize_buf = reinterpret_cast<char *>(ob_malloc(entry.get_serialize_size(), "MitTest"));
+  if (NULL == serialize_buf) {
+    ret = OB_ALLOCATE_MEMORY_FAILED;
+  } else if (OB_FAIL(entry.serialize(serialize_buf, entry.get_serialize_size(), pos))) {
+    PALF_LOG(ERROR, "serialize failed", K(ret), K(entry));
+  } else {
+    inject(serialize_buf);
+    pos = 0;
+    // NOTE(review): the result of deserialize() is not checked - presumably because the injected
+    // fault may make it fail (confirm); buf_ is re-pointed by hand on the next line either way.
+    entry.deserialize(serialize_buf, entry.get_serialize_size(), pos);
+    entry.buf_ = serialize_buf + entry.header_.get_serialize_size();
+    output_buf = serialize_buf;
+  }
+  if (OB_FAIL(ret) && NULL != serialize_buf) {
+    ob_free(serialize_buf);
+  }
   return ret;
 }
 
@@ -73,38 +115,212 @@ TEST_F(TestObSimpleLogDataIntergrity, accumlate_checksum)
   SET_CASE_LOG_FILE(TEST_NAME, "accumlate_checksum");
   OB_LOGGER.set_log_level("TRACE");
   ObTimeGuard guard("accum_checksum", 0);
-  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
-  const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
+  int64_t id = ATOMIC_AAF(&palf_id_, 1);
   PALF_LOG(INFO, "start test accumlate checksum", K(id));
   int64_t leader_idx = 0;
 
-  PalfHandleImplGuard leader;
-  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
-  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 100 * 1024));
-  const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
-  EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
-  EXPECT_EQ(OB_ITER_END, read_log(leader));
-  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 1234));
-  const LSN end_max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
-  EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, end_max_lsn));
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 100 * 1024));
+    const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
+    EXPECT_EQ(OB_ITER_END, read_log(leader));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 1234));
+    const LSN end_max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, end_max_lsn));
+
+    LSN curr_lsn;
+    LogGroupEntry entry;
+    PalfGroupBufferIterator iterator;
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
+    EXPECT_EQ(OB_SUCCESS, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
+    EXPECT_EQ(curr_lsn, max_lsn);
+    EXPECT_EQ(OB_SUCCESS, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
+    EXPECT_EQ(OB_ITER_END, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
+    PALF_LOG(INFO, "start first check");
+    EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader, max_lsn));
+    PALF_LOG(INFO, "end first check");
+    EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader));
+    EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, max_lsn));
+    EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, LSN(0)));
+  }
+  EXPECT_EQ(OB_SUCCESS, delete_paxos_group(id));
+  PALF_LOG(INFO, "runlin trace delete_paxos_group");
+  // Simulate a non-atomic write of the last log (the LogEntry was not written at all): reading reports OB_INVALID_DATA, restart succeeds, and log_tail is expected to be the head of that log.
+  LSN expected_log_tail;
+  {
+    id = ATOMIC_AAF(&palf_id_, 1);
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 100 * 1024));
+    const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 100 * 1024));
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
+    PalfGroupBufferIterator iterator;
+    LogGroupEntry entry;
+    LSN curr_lsn;
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
+    EXPECT_EQ(OB_SUCCESS, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
+    EXPECT_EQ(curr_lsn, max_lsn);
+    // The LogEntry is completely corrupted: every byte after sizeof(LogGroupEntryHeader) is zeroed.
+    char *output_buf = NULL;
+    int64_t pos = sizeof(LogGroupEntryHeader);
+    DataFaultInject inject = [&pos, &entry](char *buf) {
+      int64_t memset_len = entry.get_serialize_size()-pos;
+      memset(buf+pos, 0, memset_len);
+    };
+    EXPECT_EQ(OB_SUCCESS, make_log_group_entry_partial_error(entry, output_buf, inject));
+    EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
+    EXPECT_EQ(OB_ITER_END, iterator.next());
+    EXPECT_EQ(OB_INVALID_DATA, read_log(leader));
+    if (NULL != output_buf) {
+      ob_free(output_buf);
+    }
+    expected_log_tail = curr_lsn;
+  }
+  PALF_LOG(INFO, "runlin trace first restart_paxos_groups begin");
+  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
+  PALF_LOG(INFO, "runlin trace first restart_paxos_groups end");
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
+    EXPECT_EQ(expected_log_tail, leader.palf_handle_impl_->get_max_lsn());
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 100 * 1024));
+  }
+  PALF_LOG(INFO, "runlin trace second restart_paxos_groups begin");
+  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
+  PALF_LOG(INFO, "runlin trace second restart_paxos_groups end");
+  // Simulate a non-atomic write of the last log (the LogEntry was partially written: its data checksum and everything after it are zeroed): reading reports OB_CHECKSUM_ERROR, restart succeeds, and log_tail is expected to be the head of that log.
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 100 * 1024));
+    const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 100 * 1024));
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
+    PalfGroupBufferIterator iterator;
+    LogGroupEntry entry;
+    LSN curr_lsn;
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
+    EXPECT_EQ(OB_SUCCESS, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
+    EXPECT_EQ(curr_lsn, max_lsn);
+    // Zero the LogEntry's data checksum and all the data that follows it.
+    // LogEntryHeader layout: 16bit(magic) 16bit(version) 32bit(size) 64bit(scn) data_checksum
+    char *output_buf = NULL;
+    int64_t pos = sizeof(LogGroupEntryHeader) + 16;
+    DataFaultInject inject = [&pos, &entry](char *buf) {
+      int64_t memset_len = entry.get_serialize_size()-pos;
+      memset(buf+pos, 0, memset_len);
+    };
+    EXPECT_EQ(OB_SUCCESS, make_log_group_entry_partial_error(entry, output_buf, inject));
+    EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
+    EXPECT_EQ(OB_ITER_END, iterator.next());
+    EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader));
+    if (NULL != output_buf) {
+      ob_free(output_buf);
+    }
+    expected_log_tail = curr_lsn;
+  }
+  PALF_LOG(INFO, "runlin trace third restart_paxos_groups begin");
+  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
+  PALF_LOG(INFO, "runlin trace third restart_paxos_groups end");
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
+    EXPECT_EQ(expected_log_tail, leader.palf_handle_impl_->get_max_lsn());
+  }
+  // Simulate a bit flip in the last LogEntryHeader: reading reports OB_INVALID_DATA, restart succeeds, and log_tail is expected to be the head of that log.
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 100 * 1024));
+    const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 100 * 1024));
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
+    PalfGroupBufferIterator iterator;
+    LogGroupEntry entry;
+    LSN curr_lsn;
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
+    EXPECT_EQ(OB_SUCCESS, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
+    EXPECT_EQ(curr_lsn, max_lsn);
+    // Flip one random bit of the byte at sizeof(LogGroupEntryHeader) + 14 (inside the LogEntryHeader laid out as below).
+    // LogEntryHeader layout: 16bit(magic) 16bit(version) 32bit(size) 64bit(scn) data_checksum
+    char *output_buf = NULL;
+    int64_t pos = sizeof(LogGroupEntryHeader) + 14;
+    DataFaultInject inject = [&pos](char *buf) {
+      char ch = buf[pos];
+      int random_bit = rand() % 8;
+      int bit_value = 1 << random_bit;
+      char tmp_ch = (ch ^ bit_value);
+      PALF_LOG(INFO, "runlin trace print", K(pos), K(random_bit), K(bit_value), K(ch), K(tmp_ch));
+      buf[pos] = tmp_ch;
+    };
+    EXPECT_EQ(OB_SUCCESS, make_log_group_entry_partial_error(entry, output_buf, inject));
+    EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
+    EXPECT_EQ(OB_ITER_END, iterator.next());
+    EXPECT_EQ(OB_INVALID_DATA, read_log(leader));
+    if (NULL != output_buf) {
+      ob_free(output_buf);
+    }
+    expected_log_tail = curr_lsn;
+  }
+  PALF_LOG(INFO, "runlin trace fourth restart_paxos_groups begin");
+  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
+  PALF_LOG(INFO, "runlin trace fourth restart_paxos_groups end");
+  // Simulate a bit flip in the last LogGroupEntryHeader: reading reports OB_INVALID_DATA, restart succeeds, and log_tail is expected to be the head of that log.
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 100 * 1024));
+    const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 100 * 1024));
+    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
+    PalfGroupBufferIterator iterator;
+    LogGroupEntry entry;
+    LSN curr_lsn;
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
+    EXPECT_EQ(OB_SUCCESS, iterator.next());
+    EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
+    EXPECT_EQ(curr_lsn, max_lsn);
+    // Flip one random bit of the byte at offset 14 of the serialized entry, i.e. inside the LogGroupEntryHeader.
+    char *output_buf = NULL;
+    int64_t pos = 14;
+    DataFaultInject inject = [&pos](char *buf) {
+      char ch = buf[pos];
+      int random_bit = rand() % 8;
+      int bit_value = 1 << random_bit;
+      char tmp_ch = (ch ^ bit_value);
+      PALF_LOG(INFO, "runlin trace print", K(pos), K(random_bit), K(bit_value), K(ch), K(tmp_ch));
+      buf[pos] = tmp_ch;
+    };
+    EXPECT_EQ(OB_SUCCESS, make_log_group_entry_partial_error(entry, output_buf, inject));
+    EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
+    EXPECT_EQ(OB_ITER_END, iterator.next());
+    EXPECT_EQ(OB_INVALID_DATA, read_log(leader));
+    if (NULL != output_buf) {
+      ob_free(output_buf);
+    }
+    expected_log_tail = curr_lsn;
+  }
+  PALF_LOG(INFO, "runlin trace five restart_paxos_groups begin");
+  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
+  {
+    PalfHandleImplGuard leader;
+    EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
+    EXPECT_EQ(expected_log_tail, leader.palf_handle_impl_->get_max_lsn());
+  }
 
-  LSN curr_lsn;
-  LogGroupEntry entry;
-  PalfGroupBufferIterator iterator;
-  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
-  EXPECT_EQ(OB_SUCCESS, iterator.next());
-  EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
-  EXPECT_EQ(curr_lsn, max_lsn);
-  EXPECT_EQ(OB_SUCCESS, iterator.next());
-  EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
-  EXPECT_EQ(OB_ITER_END, iterator.next());
-  EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
-  PALF_LOG(INFO, "start first check");
-  EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader, max_lsn));
-  PALF_LOG(INFO, "end first check");
-  EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader));
-  EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, max_lsn));
-  EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, LSN(0)));
   PALF_LOG(INFO, "end test accumlate checksum", K(id));
 }
 
@@ -138,8 +354,7 @@ TEST_F(TestObSimpleLogDataIntergrity, log_corrupted)
   EXPECT_EQ(OB_SUCCESS, iterator.next());
   EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
   EXPECT_EQ(OB_ITER_END, iterator.next());
-  char *buf = const_cast<char *>(entry.buf_);
-  buf[4] = 1;
+  entry.header_.group_size_ = 2;
   EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
   PALF_LOG(INFO, "start first check");
   EXPECT_EQ(OB_INVALID_DATA, read_log(leader, max_lsn));
diff --git a/src/logservice/palf/log_iterator_impl.h b/src/logservice/palf/log_iterator_impl.h
index 70b94acca..c1f4ac9d5 100644
--- a/src/logservice/palf/log_iterator_impl.h
+++ b/src/logservice/palf/log_iterator_impl.h
@@ -698,7 +698,7 @@ int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry,
   } else if (OB_FAIL(LogChecksum::verify_accum_checksum(
                  accumlate_checksum_, data_checksum,
                  expected_verify_checksum, new_accumlate_checksum))) {
-    PALF_LOG(ERROR, "verify accumlate checksum failed", K(ret), KPC(this), K(entry));
+    PALF_LOG(WARN, "verify accumlate checksum failed", K(ret), KPC(this), K(entry));
   } else {
     PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry));
   }
diff --git a/src/logservice/palf/log_storage.h b/src/logservice/palf/log_storage.h
index 1bda72a39..1b6ceb912 100644
--- a/src/logservice/palf/log_storage.h
+++ b/src/logservice/palf/log_storage.h
@@ -260,6 +260,7 @@ int LogStorage::locate_log_tail_and_last_valid_entry_header_(const block_id_t mi
   update_log_tail_guarded_by_lock_(LSN((max_block_id + 1) * logical_block_size_));
   // the last block may has not valid data, we need iterate prev block
   // for GC, we must ensure that the block which include 'max_committed_lsn' will no be reused
+  const bool need_print_error = false;
   while (OB_SUCC(ret) && true == is_valid_block_id(iterate_block_id)
          && iterate_block_id >= min_block_id) {
     // NB: 'log_tail_' need point to the tail of 'iterate_block_id', because 'pread' interface
@@ -272,6 +273,7 @@ int LogStorage::locate_log_tail_and_last_valid_entry_header_(const block_id_t mi
     if (OB_FAIL(iterator.init(start_lsn, get_file_end_lsn, this))) {
       PALF_LOG(WARN, "PalfGroupBufferIterator init failed", K(ret), K(start_lsn));
     } else {
+      iterator.set_need_print_error(need_print_error);
       EntryType curr_entry;
       LSN curr_lsn;
       while (OB_SUCC(ret) && OB_SUCC(iterator.next())) {
@@ -283,7 +285,7 @@ int LogStorage::locate_log_tail_and_last_valid_entry_header_(const block_id_t mi
         }
       }
       if (OB_ITER_END == ret
-          || (OB_INVALID_DATA == ret && true == iterator.check_is_the_last_entry())) {
+          || ((OB_CHECKSUM_ERROR == ret || OB_INVALID_DATA == ret) && true == iterator.check_is_the_last_entry())) {
         ret = OB_SUCCESS;
         // NB: lsn is valid when there are some valid data in last block, otherwise, we need
         // iterate prev block.
diff --git a/src/logservice/palf/palf_iterator.h b/src/logservice/palf/palf_iterator.h
index 079fdd33d..2df6bfeff 100644
--- a/src/logservice/palf/palf_iterator.h
+++ b/src/logservice/palf/palf_iterator.h
@@ -27,7 +27,7 @@ template
 class PalfIterator
 {
 public:
-  PalfIterator() : iterator_storage_(), iterator_impl_(), is_inited_(false) {}
+  PalfIterator() : iterator_storage_(), iterator_impl_(), need_print_error_(true), is_inited_(false) {}
   ~PalfIterator() {destroy();}
 
   int init(const LSN &start_offset,
@@ -97,12 +97,12 @@ public:
   {
     int ret = OB_SUCCESS;
     const share::SCN replayable_point_scn = SCN::max_scn();
+    bool iterate_end_by_replayable_point = false;
+    SCN next_min_scn;
     if (IS_NOT_INIT) {
       ret = OB_NOT_INIT;
-    } else if (OB_FAIL(iterator_impl_.next(replayable_point_scn)) && OB_ITER_END != ret) {
-      PALF_LOG(WARN, "PalfIterator next failed", K(ret), KPC(this));
     } else {
-      PALF_LOG(TRACE, "PalfIterator next success", K(ret), KPC(this));
+      ret = next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point);
     }
     return ret;
   }
@@ -121,12 +121,12 @@ public:
   int next(const share::SCN &replayable_point_scn)
   {
     int ret = OB_SUCCESS;
+    bool iterate_end_by_replayable_point = false;
+    SCN next_min_scn;
     if (IS_NOT_INIT) {
       ret = OB_NOT_INIT;
-    } else if (OB_FAIL(iterator_impl_.next(replayable_point_scn)) && OB_ITER_END != ret) {
-      PALF_LOG(WARN, "PalfIterator next failed", K(ret), KPC(this));
     } else {
-      PALF_LOG(TRACE, "PalfIterator next success", K(ret), KPC(this));
+      ret = next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point);
     }
     return ret;
   }
@@ -144,7 +144,9 @@ public:
   //                        need read data from storage eagin.(data in cache will not been clean up, therefore,
   //                        user need used a new iterator to read data again)
   //    OB_ERR_OUT_LOWER_BOUND, block has been recycled
-  int next(const share::SCN &replayable_point_scn, share::SCN &next_min_scn, bool &iterate_end_by_replayable_point)
+  int next(const share::SCN &replayable_point_scn,
+           share::SCN &next_min_scn,
+           bool &iterate_end_by_replayable_point)
   {
     int ret = OB_SUCCESS;
     if (IS_NOT_INIT) {
@@ -152,6 +154,7 @@ public:
     } else if (OB_FAIL(iterator_impl_.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))
         && OB_ITER_END != ret) {
       PALF_LOG(WARN, "PalfIterator next failed", K(ret), KPC(this));
+      print_error_log(ret);
     } else {
       PALF_LOG(TRACE, "PalfIterator next success", K(iterator_impl_), K(ret), KPC(this),
           K(replayable_point_scn), K(next_min_scn), K(iterate_end_by_replayable_point));
@@ -243,6 +246,19 @@ public:
   {
     return iterator_impl_.get_curr_read_lsn();
   }
+  // Emits an ERROR log when 'ret' is OB_INVALID_DATA or OB_CHECKSUM_ERROR, unless error printing
+  // has been switched off through set_need_print_error(false).
+  void print_error_log(int ret) const
+  {
+    if (need_print_error_ && (OB_INVALID_DATA == ret || OB_CHECKSUM_ERROR == ret)) {
+      PALF_LOG_RET(ERROR, ret, "invalid data or checksum error!!!", KPC(this));
+    }
+  }
+  // Enables or disables the ERROR log emitted by print_error_log().
+  void set_need_print_error(const bool need_print_error)
+  {
+    need_print_error_ = need_print_error;
+  }
   TO_STRING_KV(K_(iterator_impl));
 
 private:
@@ -271,6 +287,7 @@ private:
 private:
   PalfIteratorStorage iterator_storage_;
   LogIteratorImpl iterator_impl_;
+  bool need_print_error_;
   bool is_inited_;
 };
 