From 9716f60c5838f17bc73b5b58dd0e848501004598 Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Mon, 27 Feb 2023 14:11:06 +0000 Subject: [PATCH] add defense code in palf --- mittest/logservice/CMakeLists.txt | 1 + .../test_ob_simple_log_data_intergrity.cpp | 159 ++++++++++++++++++ src/logservice/CMakeLists.txt | 1 + src/logservice/palf/log_block_mgr.cpp | 63 ++++++- src/logservice/palf/log_block_mgr.h | 2 + src/logservice/palf/log_checksum.cpp | 44 +++-- src/logservice/palf/log_checksum.h | 4 + src/logservice/palf/log_group_entry.cpp | 11 +- src/logservice/palf/log_group_entry.h | 1 + .../palf/log_group_entry_header.cpp | 1 + src/logservice/palf/log_io_uitls.h | 29 ++++ src/logservice/palf/log_io_utils.cpp | 65 +++++++ src/logservice/palf/log_iterator_impl.h | 130 ++++++++------ 13 files changed, 444 insertions(+), 67 deletions(-) create mode 100644 mittest/logservice/test_ob_simple_log_data_intergrity.cpp create mode 100644 src/logservice/palf/log_io_uitls.h create mode 100644 src/logservice/palf/log_io_utils.cpp diff --git a/mittest/logservice/CMakeLists.txt b/mittest/logservice/CMakeLists.txt index a3419212f8..6bab340bd4 100644 --- a/mittest/logservice/CMakeLists.txt +++ b/mittest/logservice/CMakeLists.txt @@ -29,5 +29,6 @@ ob_unittest_clog(test_ob_simple_log_arb test_ob_simple_log_arb.cpp) ob_unittest_clog(test_ob_simple_log_single_replica_func test_ob_simple_log_single_replica_func.cpp) ob_unittest_clog(test_ob_simple_arb_server_single_replica test_ob_simple_arb_server_single_replica.cpp) ob_unittest_clog(test_ob_simple_arb_server_mutil_replica test_ob_simple_arb_server_mutil_replica.cpp) +ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_intergrity.cpp) add_subdirectory(archiveservice) diff --git a/mittest/logservice/test_ob_simple_log_data_intergrity.cpp b/mittest/logservice/test_ob_simple_log_data_intergrity.cpp new file mode 100644 index 0000000000..81a203998c --- /dev/null +++ 
b/mittest/logservice/test_ob_simple_log_data_intergrity.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#include +#include +#define private public +#include "env/ob_simple_log_cluster_env.h" +#undef private + +const std::string TEST_NAME = "data_integrity"; + +using namespace oceanbase::common; +using namespace oceanbase; +namespace oceanbase +{ +using namespace logservice; +namespace unittest +{ +class TestObSimpleLogDataIntergrity: public ObSimpleLogClusterTestEnv +{ +public: + TestObSimpleLogDataIntergrity() : ObSimpleLogClusterTestEnv() + {} +}; + +int64_t ObSimpleLogClusterTestBase::member_cnt_ = 2; +int64_t ObSimpleLogClusterTestBase::node_cnt_ = 2; +std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; +bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; + +int pwrite_one_log_by_log_storage(PalfHandleImplGuard &leader, const LogGroupEntry &entry, const LSN &lsn) +{ + int ret = OB_SUCCESS; + LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_; + int dir_fd = log_storage->block_mgr_.dir_fd_; + block_id_t writable_block_id = log_storage->block_mgr_.curr_writable_block_id_; + LSN log_tail = log_storage->log_tail_; + offset_t write_offset = log_storage->get_phy_offset_(lsn); + char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'}; + block_id_to_string(writable_block_id, block_path, OB_MAX_FILE_NAME_LENGTH); + int block_fd = -1; + if (-1 == (block_fd = ::openat(dir_fd, block_path, O_WRONLY))) { + ret 
= convert_sys_errno(); + PALF_LOG(ERROR, "openat failed", K(ret), K(block_path), KPC(log_storage)); + } else if (0 >= pwrite(block_fd, + entry.get_data_buf() - sizeof(LogGroupEntryHeader), + entry.get_serialize_size(), write_offset)) { + ret = convert_sys_errno(); + PALF_LOG(ERROR, "pwrite failed", K(ret), K(block_path), KPC(log_storage), K(write_offset), K(log_tail), K(entry)); + } else { + + } + if (-1 != block_fd) { + ::close(block_fd); + block_fd = -1; + } + return ret; +} + +TEST_F(TestObSimpleLogDataIntergrity, accumlate_checksum) +{ + SET_CASE_LOG_FILE(TEST_NAME, "accumlate_checksum"); + OB_LOGGER.set_log_level("TRACE"); + ObTimeGuard guard("accum_checksum", 0); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1); + PALF_LOG(INFO, "start test accumlate checksum", K(id)); + int64_t leader_idx = 0; + + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 100 * 1024)); + const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + EXPECT_EQ(OB_ITER_END, read_log(leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 1234)); + const LSN end_max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, end_max_lsn)); + + LSN curr_lsn; + LogGroupEntry entry; + PalfGroupBufferIterator iterator; + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator)); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn)); + EXPECT_EQ(curr_lsn, max_lsn); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn)); + EXPECT_EQ(OB_ITER_END, iterator.next()); + EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn)); + PALF_LOG(INFO, "start first check"); + 
EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader, max_lsn)); + PALF_LOG(INFO, "end first check"); + EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader)); + EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, max_lsn)); + EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, LSN(0))); + PALF_LOG(INFO, "end test accumlate checksum", K(id)); +} + +TEST_F(TestObSimpleLogDataIntergrity, log_corrupted) +{ + SET_CASE_LOG_FILE(TEST_NAME, "log_corrupted"); + OB_LOGGER.set_log_level("TRACE"); + ObTimeGuard guard("log_corrupted", 0); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1); + PALF_LOG(INFO, "start test log corrupted", K(id)); + int64_t leader_idx = 0; + + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 100 * 1024)); + const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + EXPECT_EQ(OB_ITER_END, read_log(leader)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 1234)); + const LSN end_max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, end_max_lsn)); + + LSN curr_lsn; + LogGroupEntry entry; + PalfGroupBufferIterator iterator; + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator)); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn)); + EXPECT_EQ(curr_lsn, max_lsn); + EXPECT_EQ(OB_SUCCESS, iterator.next()); + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn)); + EXPECT_EQ(OB_ITER_END, iterator.next()); + char *buf = const_cast(entry.buf_); + buf[4] = 1; + EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn)); + PALF_LOG(INFO, "start first check"); + EXPECT_EQ(OB_INVALID_DATA, read_log(leader, max_lsn)); + PALF_LOG(INFO, "end first check"); + 
EXPECT_EQ(OB_INVALID_DATA, read_log(leader)); + EXPECT_EQ(OB_INVALID_DATA, read_group_log(leader, max_lsn)); + EXPECT_EQ(OB_INVALID_DATA, read_group_log(leader, LSN(0))); + PALF_LOG(INFO, "end test log corrupted", K(id)); +} + +} // end unittest +} // end oceanbase + +int main(int argc, char **argv) +{ + RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME); +} diff --git a/src/logservice/CMakeLists.txt b/src/logservice/CMakeLists.txt index 4441628dc9..8ebad69809 100644 --- a/src/logservice/CMakeLists.txt +++ b/src/logservice/CMakeLists.txt @@ -141,6 +141,7 @@ ob_set_subtarget(ob_logservice palf palf/palf_options.cpp palf/log_mode_mgr.cpp palf/log_updater.cpp + palf/log_io_utils.cpp ) ob_set_subtarget(ob_logservice palf_election diff --git a/src/logservice/palf/log_block_mgr.cpp b/src/logservice/palf/log_block_mgr.cpp index cd4f30ffd4..ca07926150 100644 --- a/src/logservice/palf/log_block_mgr.cpp +++ b/src/logservice/palf/log_block_mgr.cpp @@ -14,7 +14,6 @@ #include // std::sort #include // renameat #include // ::open -#include "lib/lock/ob_spin_lock.h" #include "lib/ob_define.h" // some constexpr #include "lib/ob_errno.h" // OB_SUCCESS... 
#include "lib/container/ob_se_array_iterator.h" // ObSEArrayIterator @@ -22,6 +21,7 @@ #include "share/ob_errno.h" // OB_NO_SUCH_FILE_OR_DIRECTORY #include "log_writer_utils.h" // LogWriteBuf #include "lsn.h" // LSN +#include "log_io_uitls.h" // openat_with_retry namespace oceanbase { @@ -345,13 +345,17 @@ int LogBlockMgr::do_truncate_(const block_id_t block_id, ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "unexpected error, block id is not same sa curr_writable_block_id_", K(ret), KPC(this), K(block_id)); - } else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH)) - && OB_FAIL(curr_writable_handler_.close() - && OB_FAIL(curr_writable_handler_.open(block_path)))) { + } else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) { + PALF_LOG(ERROR, "block_id_ti_string failed", K(ret), K(block_id), KPC(this)); + } else if (OB_FAIL(curr_writable_handler_.close())) { + PALF_LOG(ERROR, "close curr_writable_handler_ failed", K(ret), K(block_id), KPC(this)); + } else if (OB_FAIL(curr_writable_handler_.open(block_path))) { PALF_LOG(ERROR, "open block after delete_block_from_back_to_front_until_ failed", K(ret), K(block_id), KPC(this)); } else if (OB_FAIL(curr_writable_handler_.truncate(offset))) { PALF_LOG(WARN, "truncate curr_writable_handler_ failed", K(ret), K(block_id), K(offset)); + } else if (OB_FAIL(check_after_truncate_(block_path, offset))) { + PALF_LOG(ERROR, "check_after_truncate_ failed", K(ret), K(block_id), K(offset)); } else if (OB_FAIL(curr_writable_handler_.load_data(offset))) { PALF_LOG(WARN, "load_data failed", K(ret), K(block_id), K(offset)); } else { @@ -361,6 +365,57 @@ int LogBlockMgr::do_truncate_(const block_id_t block_id, return ret; } +int LogBlockMgr::check_after_truncate_(const char *block_path, const offset_t offset) +{ + int ret = OB_SUCCESS; + int fd = -1; + char *buf = NULL; + const int buf_len = 8*1024; + char *expected_data = NULL; + offset_t read_offset = lower_align(offset, 
LOG_DIO_ALIGN_SIZE); + const int backoff = offset - read_offset; + // The min length of PADDING is 4K, therefore, read_offset may be 64MB-4KB, + // in_read_size is 4K, otherwise, in_read_size is 8K. + const int in_read_size = MIN(buf_len, log_block_size_-read_offset); + OB_ASSERT(backoff < LOG_DIO_ALIGN_SIZE); + if (OB_FAIL(openat_with_retry(dir_fd_, block_path, LOG_READ_FLAG, FILE_OPEN_MODE, fd))) { + PALF_LOG(ERROR, "openat_with_retry failed", KPC(this), K(block_path)); + } else if (NULL == (expected_data = \ + reinterpret_cast(ob_malloc(buf_len, "LogBlockMgr")))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + PALF_LOG(ERROR, "malloc failed", KPC(this)); + } else if (FALSE_IT(memset(expected_data, 0, buf_len))) { + } else if (NULL == (buf = \ + reinterpret_cast(ob_malloc_align(LOG_DIO_ALIGN_SIZE, buf_len, "LogBlockMgr")))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + PALF_LOG(ERROR, "malloc failed", KPC(this)); + } else if (in_read_size!= (ob_pread(fd, buf, in_read_size, read_offset))) { + ret = convert_sys_errno(); + PALF_LOG(ERROR, "ob_pread failed", KPC(this), K(fd), K(offset)); + // TODO by runlin: after support reuse block, need use another method. 
+ } else if (0 != MEMCMP(buf+backoff, expected_data, in_read_size-backoff)) { + ret = OB_ERR_UNEXPECTED; + while (OB_FAIL(ret)) { + PALF_LOG(ERROR, "after truncate, data is not zero", KPC(this), K(fd), K(offset), + KP(buf), KP(expected_data), K(in_read_size), K(backoff)); + usleep(1000*1000); + } + } else { + PALF_LOG(INFO, "check_after_truncate_ success", KPC(this), K(block_path), K(offset)); + } + + if (-1 != fd && OB_FAIL(close_with_retry(fd))) { + PALF_LOG(ERROR, "close_with_retry failed", KPC(this), K(block_path)); + } + if (NULL != buf) { + ob_free_align(buf); + } + if (NULL != expected_data) { + ob_free(expected_data); + } + return ret; +} + int LogBlockMgr::delete_block_from_back_to_front_until_(const block_id_t block_id) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/log_block_mgr.h b/src/logservice/palf/log_block_mgr.h index 9b1fe0d8ae..fa1400e05b 100644 --- a/src/logservice/palf/log_block_mgr.h +++ b/src/logservice/palf/log_block_mgr.h @@ -93,6 +93,8 @@ private: int do_rename_and_fsync_(const char *block_path, const char *tmp_block_path); bool empty_() const; int try_recovery_last_block_(const char *log_dir); + + int check_after_truncate_(const char *block_path, const offset_t offset); const int64_t SLEEP_TS_US = 1 * 1000; private: diff --git a/src/logservice/palf/log_checksum.cpp b/src/logservice/palf/log_checksum.cpp index f01e1b7e79..d5973cdfef 100644 --- a/src/logservice/palf/log_checksum.cpp +++ b/src/logservice/palf/log_checksum.cpp @@ -72,24 +72,40 @@ int LogChecksum::verify_accum_checksum(const int64_t data_checksum, // This interface is re-entrant. // If canlculated checksum is unexpected, the verify_checksum_ won't change. 
int ret = common::OB_SUCCESS; + int64_t new_verify_checksum = -1; + int64_t old_verify_checksum = verify_checksum_; if (IS_NOT_INIT) { ret = OB_NOT_INIT; + } else if (OB_FAIL(verify_accum_checksum(old_verify_checksum, data_checksum, accum_checksum, new_verify_checksum))) { + PALF_LOG(ERROR, "verify_accum_checksum failed", K(data_checksum), K(accum_checksum), K(old_verify_checksum)); } else { - const int64_t old_verify_checksum = verify_checksum_; - const int64_t new_verify_checksum = common::ob_crc64(verify_checksum_, const_cast(&data_checksum), - sizeof(data_checksum)); - if (new_verify_checksum != accum_checksum) { - // Checksum error occurs, verify_checksum_ won't change. - ret = common::OB_CHECKSUM_ERROR; - LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "log checksum error", "ret", ret, K_(palf_id), K(data_checksum), - K(accum_checksum), K(old_verify_checksum), K(new_verify_checksum)); - } else { - // Update verify_checksum_ when checking succeeds. - verify_checksum_ = new_verify_checksum; - PALF_LOG(TRACE, "verify_accum_checksum success", K(ret), K_(palf_id), K(data_checksum), K(accum_checksum), - K_(verify_checksum), K_(accum_checksum)); - } + // Update verify_checksum_ when checking succeeds. 
+ verify_checksum_ = new_verify_checksum; + PALF_LOG(TRACE, "verify_accum_checksum success", K(ret), K_(palf_id), K(data_checksum), K(accum_checksum), + K_(verify_checksum), K_(accum_checksum)); } + + return ret; +} + +int LogChecksum::verify_accum_checksum(const int64_t old_accum_checksum, + const int64_t data_checksum, + const int64_t expected_accum_checksum, + int64_t &new_accum_checksum) +{ + + int ret = common::OB_SUCCESS; + new_accum_checksum = common::ob_crc64(old_accum_checksum, const_cast(&data_checksum), + sizeof(data_checksum)); + if (new_accum_checksum != expected_accum_checksum) { + ret = common::OB_CHECKSUM_ERROR; + LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "log checksum error", "ret", ret, K(data_checksum), + K(expected_accum_checksum), K(old_accum_checksum), K(new_accum_checksum)); + } else { + PALF_LOG(TRACE, "verify_accum_checksum success", K(ret), K(data_checksum), + K(expected_accum_checksum), K(old_accum_checksum), K(new_accum_checksum)); + } + return ret; } diff --git a/src/logservice/palf/log_checksum.h b/src/logservice/palf/log_checksum.h index 38124ecbab..e176992988 100644 --- a/src/logservice/palf/log_checksum.h +++ b/src/logservice/palf/log_checksum.h @@ -32,6 +32,10 @@ public: int64_t &accum_checksum); virtual int verify_accum_checksum(const int64_t data_checksum, const int64_t accum_checksum); + static int verify_accum_checksum(const int64_t old_accum_checksum, + const int64_t data_checksum, + const int64_t expected_accum_checksum, + int64_t &new_accum_checksum); virtual void set_accum_checksum(const int64_t accum_checksum); virtual void set_verify_checksum(const int64_t verify_checksum); virtual int rollback_accum_checksum(const int64_t curr_accum_checksum); diff --git a/src/logservice/palf/log_group_entry.cpp b/src/logservice/palf/log_group_entry.cpp index 7ecefb4047..5e49b2c275 100644 --- a/src/logservice/palf/log_group_entry.cpp +++ b/src/logservice/palf/log_group_entry.cpp @@ -71,10 +71,17 @@ void LogGroupEntry::reset() bool 
LogGroupEntry::check_integrity() const { - int64_t data_len = header_.get_data_len(); - return header_.check_integrity(buf_, data_len); + int64_t unused_data_checksum = -1; + return check_integrity(unused_data_checksum); } +bool LogGroupEntry::check_integrity(int64_t &data_checksum) const +{ + int64_t data_len = header_.get_data_len(); + return header_.check_integrity(buf_, data_len, data_checksum); +} + + int LogGroupEntry::truncate(const SCN &upper_limit_scn, const int64_t pre_accum_checksum) { return header_.truncate(buf_, get_data_len(), upper_limit_scn, pre_accum_checksum); diff --git a/src/logservice/palf/log_group_entry.h b/src/logservice/palf/log_group_entry.h index db705d7c15..bcb09d0f93 100644 --- a/src/logservice/palf/log_group_entry.h +++ b/src/logservice/palf/log_group_entry.h @@ -38,6 +38,7 @@ public: bool is_valid() const; void reset(); bool check_integrity() const; + bool check_integrity(int64_t &data_checksum) const; int64_t get_header_size() const { return header_.get_serialize_size(); } int64_t get_payload_offset() const { return header_.get_serialize_size() + (header_.is_padding_log() ? 
header_.get_data_len() : 0); } diff --git a/src/logservice/palf/log_group_entry_header.cpp b/src/logservice/palf/log_group_entry_header.cpp index 6c7384479f..0dde7fdf0c 100644 --- a/src/logservice/palf/log_group_entry_header.cpp +++ b/src/logservice/palf/log_group_entry_header.cpp @@ -382,6 +382,7 @@ bool LogGroupEntryHeader::check_log_checksum_(const char *buf, PALF_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "Invalid argument!!!", K(buf), K(data_len), K(group_size_)); } else if (is_padding_log()) { bool_ret = true; + group_data_checksum = PADDING_LOG_DATA_CHECKSUM; PALF_LOG(INFO, "This is a padding log, no need check log checksum", K(bool_ret), K(data_len)); } else { int64_t pos = 0; diff --git a/src/logservice/palf/log_io_uitls.h b/src/logservice/palf/log_io_uitls.h new file mode 100644 index 0000000000..4c44682dfb --- /dev/null +++ b/src/logservice/palf/log_io_uitls.h @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#ifndef OCEANBASE_LOGSERVICE_LOG_IO_UTILS_ +#define OCEANBASE_LOGSERVICE_LOG_IO_UTILS_ +namespace oceanbase +{ +namespace palf +{ + +int openat_with_retry(const int dir_fd, + const char *block_path, + const int flag, + const int mode, + int &fd); +int close_with_retry(const int fd); + +} // end namespace palf +} // end namespace oceanbase +#endif \ No newline at end of file diff --git a/src/logservice/palf/log_io_utils.cpp b/src/logservice/palf/log_io_utils.cpp new file mode 100644 index 0000000000..475b495498 --- /dev/null +++ b/src/logservice/palf/log_io_utils.cpp @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ +#include "log_io_uitls.h" +#include "log_define.h" +namespace oceanbase +{ +namespace palf +{ + +const int64_t RETRY_INTERVAL = 10*1000; +int openat_with_retry(const int dir_fd, + const char *block_path, + const int flag, + const int mode, + int &fd) +{ + int ret = OB_SUCCESS; + if (-1 == dir_fd || NULL == block_path || -1 == flag || -1 == mode) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "invalid argument", K(dir_fd), KP(block_path), K(flag), K(mode)); + } else { + do { + if (-1 == (fd = ::openat(dir_fd, block_path, flag, mode))) { + ret = convert_sys_errno(); + PALF_LOG(ERROR, "open block failed", K(ret), K(errno), K(block_path), K(dir_fd)); + ob_usleep(RETRY_INTERVAL); + } else { + ret = OB_SUCCESS; + break; + } + } while (OB_FAIL(ret)); + } + return ret; +} +int close_with_retry(const int fd) +{ + int ret = OB_SUCCESS; + if (-1 == fd) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "invalid argument", K(fd)); + } else { + do { + if (-1 == (::close(fd))) { + ret = convert_sys_errno(); + PALF_LOG(ERROR, "open block failed", K(ret), K(errno), K(fd)); + ob_usleep(RETRY_INTERVAL); + } else { + ret = OB_SUCCESS; + break; + } + } while (OB_FAIL(ret)); + } + return ret; +} +} // end namespace palf +} // end namespace oceanbase \ No newline at end of file diff --git a/src/logservice/palf/log_iterator_impl.h b/src/logservice/palf/log_iterator_impl.h index a95ef88ffc..f69a166524 100644 --- a/src/logservice/palf/log_iterator_impl.h +++ b/src/logservice/palf/log_iterator_impl.h @@ -27,6 +27,7 @@ #include "log_group_entry.h" // LogGroupEntry #include "log_meta_entry.h" // LogMetaEntry #include "log_iterator_storage.h" // LogIteratorStorage +#include "log_checksum.h" // LogChecksum namespace oceanbase { @@ -60,11 +61,21 @@ public: // @retval // OB_SUCCESS. // OB_INVALID_DATA. - // OB_ITER_END, has iterated to the end of block. 
- // OB_NEED_RETRY, the data in cache is not integrity, and the integrity data has been truncate from disk, - // need read data from storage eagin.(data in cache will not been clean up, therefore, - // user need used a new iterator to read data again) - // OB_ERR_OUT_LOWER_BOUND, block has been recycled + // OB_ITER_END + // - has iterated to the end of block. + // OB_NEED_RETRY + // - the data in cache is not intact, and the intact data has been truncated from disk, + // need to read data from storage again. (data in cache will not be cleaned up, therefore, + // the user needs to use a new iterator to read data again) + // - if the end_lsn got from get_file_end_lsn is smaller than 'log_tail_' of LogStorage, and it's + // not the exact boundary of LogGroupEntry (for PalfGroupBufferIterator, or LogEntry for PalfBufferIterator), + // OB_NEED_RETRY may be returned. + // + // OB_ERR_OUT_LOWER_BOUND + // - block has been recycled + // OB_CHECKSUM_ERROR + // - the accumulated checksum calculated from accumlate_checksum_ and the data checksum of LogGroupEntry is not + // the same as the accumulated checksum of LogGroupEntry int next(const share::SCN &replayable_point_scn); // param[in] replayable point scn, iterator will ensure that no log will return when the log scn is greater than @@ -87,7 +98,6 @@ public: // OB_SUCCESS // OB_INVALID_DATA // OB_ITER_END - // OB_ITER_END // NB: if the last write option success, but the data has been // corrupted, we also regard it as the last write option is // not atomic. @@ -100,7 +110,7 @@ public: TO_STRING_KV(KP(buf_), K_(next_round_pread_size), K_(curr_read_pos), K_(curr_read_buf_start_pos), K_(curr_read_buf_end_pos), KPC(log_storage_), K_(curr_entry_is_raw_write), K_(curr_entry_size), - K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version)); + K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version), K_(accumlate_checksum)); private: // @brief get next entry from data storage or cache. 
@@ -125,18 +135,23 @@ private: int parse_one_entry_(); template < - class TMP_ENTRY, class ACTUAL_ENTRY> - int parse_one_specific_entry_(TMP_ENTRY &entry, ACTUAL_ENTRY &actual_entry) + int parse_one_specific_entry_(ACTUAL_ENTRY &actual_entry) { int ret = OB_SUCCESS; - const bool matched_type = std::is_same::value; + const bool matched_type = std::is_same::value; int64_t pos = curr_read_pos_; if (true == matched_type) { - if (OB_FAIL(entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { + if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) { + // When curr_entry_ is LogGroupEntry, need check accumlate checksum + // and check whether it's raw write + } else if (OB_FAIL(handle_each_log_group_entry_(curr_entry_))) { + PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this)); } } else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this)); + } else if (OB_FAIL(handle_each_log_group_entry_(actual_entry))) { + PALF_LOG(ERROR, "handle_each_log_group_entry_ failed", KPC(this), K(actual_entry)); } else { ret = OB_EAGAIN; advance_read_lsn_(actual_entry.get_payload_offset()); @@ -146,30 +161,6 @@ private: return ret; } - template < - class ACTUAL_ENTRY> - int parse_one_specific_entry_(LogGroupEntry &entry, ACTUAL_ENTRY &actual_entry) - { - int ret = OB_SUCCESS; - const bool matched_type = std::is_same::value; - int64_t pos = curr_read_pos_; - if (true == matched_type) { - if (OB_FAIL(entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { - } else { - curr_entry_is_raw_write_ = entry.get_header().is_raw_write(); - } - } else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) { - PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this)); - } else { - ret = OB_EAGAIN; - advance_read_lsn_(actual_entry.get_payload_offset()); - PALF_LOG(TRACE, "advance_read_lsn_ payload offset", K(ret), KPC(this), K(actual_entry), "payload offset", - 
actual_entry.get_payload_offset()); - } - return ret; - } - - int parse_log_block_header_(); int get_log_entry_type_(LogEntryType &log_entry_type); @@ -183,8 +174,33 @@ private: void advance_read_lsn_(const offset_t step); void try_clean_up_cache_(); + template + int handle_each_log_group_entry_(const T&entry) + { + PALF_LOG(TRACE, "T is not LogGroupEntry, do no thing", K(entry)); + return OB_SUCCESS; + } + /* NOTE(review): the explicit specialization below is declared at class scope; clang accepts this but some compilers (e.g. GCC) reject it - a plain overload taking const LogGroupEntry& would be portable. Confirm the supported toolchains. */ + template <> + int handle_each_log_group_entry_(const LogGroupEntry&entry) + { + int ret = OB_SUCCESS; + PALF_LOG(TRACE, "T is LogGroupEntry, do no thing", K(entry)); /* NOTE(review): trace text is misleading - the checksum is verified right below */ + if (OB_FAIL(verify_accum_checksum_(entry))) { + PALF_LOG(ERROR, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry)); + } else { + curr_entry_is_raw_write_ = entry.get_header().is_raw_write(); + } + return ret; + } + // @brief: verify the accumulated checksum; it is only verified when accumlate_checksum_ is not -1. + // ret val: + // OB_SUCCESS + // OB_INVALID_DATA (entry.check_integrity() failed) or OB_CHECKSUM_ERROR + int verify_accum_checksum_(const LogGroupEntry &entry); + private: - static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2; +static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2; // In each `next_entry` round, need read data from `LogStorage` directlly, // to amortized reading cost, use `read_buf` to cache the last read result. 
// @@ -221,6 +237,7 @@ private: // share::SCN prev_entry_scn_; GetModeVersion get_mode_version_; + int64_t accumlate_checksum_; bool is_inited_; }; @@ -237,6 +254,7 @@ LogIteratorImpl::LogIteratorImpl() curr_entry_size_(0), init_mode_version_(0), prev_entry_scn_(), + accumlate_checksum_(-1), is_inited_(false) { } @@ -263,6 +281,7 @@ int LogIteratorImpl::init(const GetModeVersion &get_mode_version, curr_entry_size_ = 0; init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; get_mode_version_ = get_mode_version; + accumlate_checksum_ = -1; is_inited_ = true; PALF_LOG(TRACE, "LogIteratorImpl init success", K(ret), KPC(this)); } @@ -279,6 +298,7 @@ void LogIteratorImpl::reuse() curr_entry_size_ = 0; prev_entry_scn_.reset(); init_mode_version_ = PALF_INITIAL_PROPOSAL_ID; + accumlate_checksum_ = -1; } template @@ -294,6 +314,7 @@ void LogIteratorImpl::destroy() curr_read_buf_start_pos_ = 0; curr_read_pos_ = 0; init_mode_version_ = 0; + accumlate_checksum_ = -1; } } @@ -412,9 +433,8 @@ int LogIteratorImpl::next(const share::SCN &replayable_point_scn, // Therefore, we should try_clean_up_cache_ in the beginning of each round of next. (void) try_clean_up_cache_(); if (OB_FAIL(get_next_entry_())) { - // NB: if get_next_entry_ failed, set 'curr_entry_size_' to 0, ensure 'is_valid' - // return false. // NB: if the data which has been corrupted, clean cache. + // NB: if the accum_checksum_ is not match, return OB_CHECKSUM_ERROR. 
if (OB_INVALID_DATA == ret) { PALF_LOG(WARN, "read invalid data, need clean cache", K(ret), KPC(this)); log_storage_->reuse(log_storage_->get_lsn(curr_read_pos_)); @@ -500,9 +520,6 @@ int LogIteratorImpl::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write } else if (OB_FAIL(entry.shallow_copy(curr_entry_))) { ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "shallow_copy failed", K(ret), KPC(this)); - } else if (false == entry.check_integrity()) { - ret = OB_INVALID_DATA; - PALF_LOG(WARN, "data has been corrupted, attention!!!", K(ret), KPC(this)); } else { lsn = log_storage_->get_lsn(curr_read_pos_); is_raw_write = curr_entry_is_raw_write_; @@ -510,6 +527,28 @@ int LogIteratorImpl::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write return ret; } +template +int LogIteratorImpl::verify_accum_checksum_(const LogGroupEntry &entry) +{ + int ret = OB_SUCCESS; + int64_t data_checksum = -1; + int64_t expected_verify_checksum = entry.get_header().get_accum_checksum(); + if (!entry.check_integrity(data_checksum)) { + ret = OB_INVALID_DATA; + PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry)); + } else if (-1 == accumlate_checksum_) { + accumlate_checksum_ = expected_verify_checksum; + PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this)); + } else if (OB_FAIL(LogChecksum::verify_accum_checksum( + accumlate_checksum_, data_checksum, + expected_verify_checksum, accumlate_checksum_))) { + PALF_LOG(ERROR, "verify checksum failed", K(ret), KPC(this), K(entry)); + } else { + PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry)); + } + return ret; +} + // step1. according to magic number, acquire log entry type; // step2. deserialize ENTRY from 'read_buf_', if buf not enough, return and run in next // round; step3. check entry integrity, if failed, return OB_INVALID_DATA; step4. 
if the @@ -529,22 +568,19 @@ int LogIteratorImpl::parse_one_entry_() case LogEntryType::GROUP_ENTRY_HEADER: { LogGroupEntry entry; - ret = parse_one_specific_entry_(curr_entry_, entry); - if (true == entry.is_valid()) { - curr_entry_is_raw_write_ = entry.get_header().is_raw_write(); - } + ret = parse_one_specific_entry_(entry); break; } case LogEntryType::LOG_ENTRY_HEADER: { LogEntry entry; - ret = parse_one_specific_entry_(curr_entry_, entry); + ret = parse_one_specific_entry_(entry); break; } case LogEntryType::LOG_META_ENTRY_HEADER: { LogMetaEntry entry; - ret = parse_one_specific_entry_(curr_entry_, entry); + ret = parse_one_specific_entry_(entry); break; } case LogEntryType::LOG_INFO_BLOCK_HEADER: