add defense code in palf

This commit is contained in:
HaHaJeff
2023-02-27 14:11:06 +00:00
committed by ob-robot
parent af56a8d8cc
commit 9716f60c58
13 changed files with 444 additions and 67 deletions

View File

@ -29,5 +29,6 @@ ob_unittest_clog(test_ob_simple_log_arb test_ob_simple_log_arb.cpp)
ob_unittest_clog(test_ob_simple_log_single_replica_func test_ob_simple_log_single_replica_func.cpp)
ob_unittest_clog(test_ob_simple_arb_server_single_replica test_ob_simple_arb_server_single_replica.cpp)
ob_unittest_clog(test_ob_simple_arb_server_mutil_replica test_ob_simple_arb_server_mutil_replica.cpp)
ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_intergrity.cpp)
add_subdirectory(archiveservice)

View File

@ -0,0 +1,159 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <cstdio>
#include <gtest/gtest.h>
#include <signal.h>
#include <share/scn.h>
#define private public
#include "env/ob_simple_log_cluster_env.h"
#undef private
const std::string TEST_NAME = "data_integrity";
using namespace oceanbase::common;
using namespace oceanbase;
namespace oceanbase
{
using namespace logservice;
namespace unittest
{
class TestObSimpleLogDataIntergrity: public ObSimpleLogClusterTestEnv
{
public:
TestObSimpleLogDataIntergrity() : ObSimpleLogClusterTestEnv()
{}
};
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 2;
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 2;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
int pwrite_one_log_by_log_storage(PalfHandleImplGuard &leader, const LogGroupEntry &entry, const LSN &lsn)
{
int ret = OB_SUCCESS;
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
int dir_fd = log_storage->block_mgr_.dir_fd_;
block_id_t writable_block_id = log_storage->block_mgr_.curr_writable_block_id_;
LSN log_tail = log_storage->log_tail_;
offset_t write_offset = log_storage->get_phy_offset_(lsn);
char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
block_id_to_string(writable_block_id, block_path, OB_MAX_FILE_NAME_LENGTH);
int block_fd = -1;
if (-1 == (block_fd = ::openat(dir_fd, block_path, O_WRONLY))) {
ret = convert_sys_errno();
PALF_LOG(ERROR, "openat failed", K(ret), K(block_path), KPC(log_storage));
} else if (0 >= pwrite(block_fd,
entry.get_data_buf() - sizeof(LogGroupEntryHeader),
entry.get_serialize_size(), write_offset)) {
ret = convert_sys_errno();
PALF_LOG(ERROR, "pwrite failed", K(ret), K(block_path), KPC(log_storage), K(write_offset), K(log_tail), K(entry));
} else {
}
if (-1 != block_fd) {
::close(block_fd);
block_fd = -1;
}
return ret;
}
TEST_F(TestObSimpleLogDataIntergrity, accumlate_checksum)
{
SET_CASE_LOG_FILE(TEST_NAME, "accumlate_checksum");
OB_LOGGER.set_log_level("TRACE");
ObTimeGuard guard("accum_checksum", 0);
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
PALF_LOG(INFO, "start test accumlate checksum", K(id));
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 100 * 1024));
const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
EXPECT_EQ(OB_ITER_END, read_log(leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 1234));
const LSN end_max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, end_max_lsn));
LSN curr_lsn;
LogGroupEntry entry;
PalfGroupBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
EXPECT_EQ(OB_SUCCESS, iterator.next());
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
EXPECT_EQ(curr_lsn, max_lsn);
EXPECT_EQ(OB_SUCCESS, iterator.next());
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
EXPECT_EQ(OB_ITER_END, iterator.next());
EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
PALF_LOG(INFO, "start first check");
EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader, max_lsn));
PALF_LOG(INFO, "end first check");
EXPECT_EQ(OB_CHECKSUM_ERROR, read_log(leader));
EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, max_lsn));
EXPECT_EQ(OB_CHECKSUM_ERROR, read_group_log(leader, LSN(0)));
PALF_LOG(INFO, "end test accumlate checksum", K(id));
}
TEST_F(TestObSimpleLogDataIntergrity, log_corrupted)
{
SET_CASE_LOG_FILE(TEST_NAME, "log_corrupted");
OB_LOGGER.set_log_level("TRACE");
ObTimeGuard guard("log_corrupted", 0);
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
PALF_LOG(INFO, "start test log corrupted", K(id));
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id, 100 * 1024));
const LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn));
EXPECT_EQ(OB_ITER_END, read_log(leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, id, 1234));
const LSN end_max_lsn = leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, end_max_lsn));
LSN curr_lsn;
LogGroupEntry entry;
PalfGroupBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(max_lsn, iterator));
EXPECT_EQ(OB_SUCCESS, iterator.next());
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
EXPECT_EQ(curr_lsn, max_lsn);
EXPECT_EQ(OB_SUCCESS, iterator.next());
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, curr_lsn));
EXPECT_EQ(OB_ITER_END, iterator.next());
char *buf = const_cast<char*>(entry.buf_);
buf[4] = 1;
EXPECT_EQ(OB_SUCCESS, pwrite_one_log_by_log_storage(leader, entry, max_lsn));
PALF_LOG(INFO, "start first check");
EXPECT_EQ(OB_INVALID_DATA, read_log(leader, max_lsn));
PALF_LOG(INFO, "end first check");
EXPECT_EQ(OB_INVALID_DATA, read_log(leader));
EXPECT_EQ(OB_INVALID_DATA, read_group_log(leader, max_lsn));
EXPECT_EQ(OB_INVALID_DATA, read_group_log(leader, LSN(0)));
PALF_LOG(INFO, "end test log corrupted", K(id));
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
}

View File

@ -141,6 +141,7 @@ ob_set_subtarget(ob_logservice palf
palf/palf_options.cpp
palf/log_mode_mgr.cpp
palf/log_updater.cpp
palf/log_io_utils.cpp
)
ob_set_subtarget(ob_logservice palf_election

View File

@ -14,7 +14,6 @@
#include <algorithm> // std::sort
#include <cstdio> // renameat
#include <fcntl.h> // ::open
#include "lib/lock/ob_spin_lock.h"
#include "lib/ob_define.h" // some constexpr
#include "lib/ob_errno.h" // OB_SUCCESS...
#include "lib/container/ob_se_array_iterator.h" // ObSEArrayIterator
@ -22,6 +21,7 @@
#include "share/ob_errno.h" // OB_NO_SUCH_FILE_OR_DIRECTORY
#include "log_writer_utils.h" // LogWriteBuf
#include "lsn.h" // LSN
#include "log_io_uitls.h" // openat_with_retry
namespace oceanbase
{
@ -345,13 +345,17 @@ int LogBlockMgr::do_truncate_(const block_id_t block_id,
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "unexpected error, block id is not same sa curr_writable_block_id_", K(ret),
KPC(this), K(block_id));
} else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))
&& OB_FAIL(curr_writable_handler_.close()
&& OB_FAIL(curr_writable_handler_.open(block_path)))) {
} else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) {
PALF_LOG(ERROR, "block_id_ti_string failed", K(ret), K(block_id), KPC(this));
} else if (OB_FAIL(curr_writable_handler_.close())) {
PALF_LOG(ERROR, "close curr_writable_handler_ failed", K(ret), K(block_id), KPC(this));
} else if (OB_FAIL(curr_writable_handler_.open(block_path))) {
PALF_LOG(ERROR, "open block after delete_block_from_back_to_front_until_ failed",
K(ret), K(block_id), KPC(this));
} else if (OB_FAIL(curr_writable_handler_.truncate(offset))) {
PALF_LOG(WARN, "truncate curr_writable_handler_ failed", K(ret), K(block_id), K(offset));
} else if (OB_FAIL(check_after_truncate_(block_path, offset))) {
PALF_LOG(ERROR, "check_after_truncate_ failed", K(ret), K(block_id), K(offset));
} else if (OB_FAIL(curr_writable_handler_.load_data(offset))) {
PALF_LOG(WARN, "load_data failed", K(ret), K(block_id), K(offset));
} else {
@ -361,6 +365,57 @@ int LogBlockMgr::do_truncate_(const block_id_t block_id,
return ret;
}
int LogBlockMgr::check_after_truncate_(const char *block_path, const offset_t offset)
{
int ret = OB_SUCCESS;
int fd = -1;
char *buf = NULL;
const int buf_len = 8*1024;
char *expected_data = NULL;
offset_t read_offset = lower_align(offset, LOG_DIO_ALIGN_SIZE);
const int backoff = offset - read_offset;
// The min length of PADDING is 4K, therefore, read_offset may be 64MB-4KB,
// in_read_size is 4K, otherwise, in_read_size is 8K.
const int in_read_size = MIN(buf_len, log_block_size_-read_offset);
OB_ASSERT(backoff < LOG_DIO_ALIGN_SIZE);
if (OB_FAIL(openat_with_retry(dir_fd_, block_path, LOG_READ_FLAG, FILE_OPEN_MODE, fd))) {
PALF_LOG(ERROR, "openat_with_retry failed", KPC(this), K(block_path));
} else if (NULL == (expected_data = \
reinterpret_cast<char*>(ob_malloc(buf_len, "LogBlockMgr")))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(ERROR, "malloc failed", KPC(this));
} else if (FALSE_IT(memset(expected_data, 0, buf_len))) {
} else if (NULL == (buf = \
reinterpret_cast<char*>(ob_malloc_align(LOG_DIO_ALIGN_SIZE, buf_len, "LogBlockMgr")))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(ERROR, "malloc failed", KPC(this));
} else if (in_read_size!= (ob_pread(fd, buf, in_read_size, read_offset))) {
ret = convert_sys_errno();
PALF_LOG(ERROR, "ob_pread failed", KPC(this), K(fd), K(offset));
// TODO by runlin: after support reuse block, need use another method.
} else if (0 != MEMCMP(buf+backoff, expected_data, in_read_size-backoff)) {
ret = OB_ERR_UNEXPECTED;
while (OB_FAIL(ret)) {
PALF_LOG(ERROR, "after truncate, data is not zero", KPC(this), K(fd), K(offset),
KP(buf), KP(expected_data), K(in_read_size), K(backoff));
usleep(1000*1000);
}
} else {
PALF_LOG(INFO, "check_after_truncate_ success", KPC(this), K(block_path), K(offset));
}
if (-1 != fd && OB_FAIL(close_with_retry(fd))) {
PALF_LOG(ERROR, "close_with_retry failed", KPC(this), K(block_path));
}
if (NULL != buf) {
ob_free_align(buf);
}
if (NULL != expected_data) {
ob_free(expected_data);
}
return ret;
}
int LogBlockMgr::delete_block_from_back_to_front_until_(const block_id_t block_id)
{
int ret = OB_SUCCESS;

View File

@ -93,6 +93,8 @@ private:
int do_rename_and_fsync_(const char *block_path, const char *tmp_block_path);
bool empty_() const;
int try_recovery_last_block_(const char *log_dir);
int check_after_truncate_(const char *block_path, const offset_t offset);
const int64_t SLEEP_TS_US = 1 * 1000;
private:

View File

@ -72,24 +72,40 @@ int LogChecksum::verify_accum_checksum(const int64_t data_checksum,
// This interface is re-entrant.
// If canlculated checksum is unexpected, the verify_checksum_ won't change.
int ret = common::OB_SUCCESS;
int64_t new_verify_checksum = -1;
int64_t old_verify_checksum = verify_checksum_;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
const int64_t old_verify_checksum = verify_checksum_;
const int64_t new_verify_checksum = common::ob_crc64(verify_checksum_, const_cast<int64_t *>(&data_checksum),
sizeof(data_checksum));
if (new_verify_checksum != accum_checksum) {
// Checksum error occurs, verify_checksum_ won't change.
ret = common::OB_CHECKSUM_ERROR;
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "log checksum error", "ret", ret, K_(palf_id), K(data_checksum),
K(accum_checksum), K(old_verify_checksum), K(new_verify_checksum));
} else if (OB_FAIL(verify_accum_checksum(old_verify_checksum, data_checksum, accum_checksum, new_verify_checksum))) {
PALF_LOG(ERROR, "verify_accum_checksum failed", K(data_checksum), K(accum_checksum), K(old_verify_checksum));
} else {
// Update verify_checksum_ when checking succeeds.
verify_checksum_ = new_verify_checksum;
PALF_LOG(TRACE, "verify_accum_checksum success", K(ret), K_(palf_id), K(data_checksum), K(accum_checksum),
K_(verify_checksum), K_(accum_checksum));
}
return ret;
}
int LogChecksum::verify_accum_checksum(const int64_t old_accum_checksum,
const int64_t data_checksum,
const int64_t expected_accum_checksum,
int64_t &new_accum_checksum)
{
int ret = common::OB_SUCCESS;
new_accum_checksum = common::ob_crc64(old_accum_checksum, const_cast<int64_t *>(&data_checksum),
sizeof(data_checksum));
if (new_accum_checksum != expected_accum_checksum) {
ret = common::OB_CHECKSUM_ERROR;
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "log checksum error", "ret", ret, K(data_checksum),
K(expected_accum_checksum), K(old_accum_checksum), K(new_accum_checksum));
} else {
PALF_LOG(TRACE, "verify_accum_checksum success", K(ret), K(data_checksum),
K(expected_accum_checksum), K(old_accum_checksum), K(new_accum_checksum));
}
return ret;
}

View File

@ -32,6 +32,10 @@ public:
int64_t &accum_checksum);
virtual int verify_accum_checksum(const int64_t data_checksum,
const int64_t accum_checksum);
static int verify_accum_checksum(const int64_t old_accum_checksum,
const int64_t data_checksum,
const int64_t expected_accum_checksum,
int64_t &new_accum_checksum);
virtual void set_accum_checksum(const int64_t accum_checksum);
virtual void set_verify_checksum(const int64_t verify_checksum);
virtual int rollback_accum_checksum(const int64_t curr_accum_checksum);

View File

@ -71,10 +71,17 @@ void LogGroupEntry::reset()
bool LogGroupEntry::check_integrity() const
{
int64_t data_len = header_.get_data_len();
return header_.check_integrity(buf_, data_len);
int64_t unused_data_checksum = -1;
return check_integrity(unused_data_checksum);
}
bool LogGroupEntry::check_integrity(int64_t &data_checksum) const
{
int64_t data_len = header_.get_data_len();
return header_.check_integrity(buf_, data_len, data_checksum);
}
int LogGroupEntry::truncate(const SCN &upper_limit_scn, const int64_t pre_accum_checksum)
{
return header_.truncate(buf_, get_data_len(), upper_limit_scn, pre_accum_checksum);

View File

@ -38,6 +38,7 @@ public:
bool is_valid() const;
void reset();
bool check_integrity() const;
bool check_integrity(int64_t &data_checksum) const;
int64_t get_header_size() const { return header_.get_serialize_size(); }
int64_t get_payload_offset() const { return header_.get_serialize_size() +
(header_.is_padding_log() ? header_.get_data_len() : 0); }

View File

@ -382,6 +382,7 @@ bool LogGroupEntryHeader::check_log_checksum_(const char *buf,
PALF_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "Invalid argument!!!", K(buf), K(data_len), K(group_size_));
} else if (is_padding_log()) {
bool_ret = true;
group_data_checksum = PADDING_LOG_DATA_CHECKSUM;
PALF_LOG(INFO, "This is a padding log, no need check log checksum", K(bool_ret), K(data_len));
} else {
int64_t pos = 0;

View File

@ -0,0 +1,29 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_LOG_IO_UTILS_
#define OCEANBASE_LOGSERVICE_LOG_IO_UTILS_
namespace oceanbase
{
namespace palf
{
int openat_with_retry(const int dir_fd,
const char *block_path,
const int flag,
const int mode,
int &fd);
int close_with_retry(const int fd);
} // end namespace palf
} // end namespace oceanbase
#endif

View File

@ -0,0 +1,65 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "log_io_uitls.h"
#include "log_define.h"
namespace oceanbase
{
namespace palf
{
const int64_t RETRY_INTERVAL = 10*1000;
int openat_with_retry(const int dir_fd,
const char *block_path,
const int flag,
const int mode,
int &fd)
{
int ret = OB_SUCCESS;
if (-1 == dir_fd || NULL == block_path || -1 == flag || -1 == mode) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument", K(dir_fd), KP(block_path), K(flag), K(mode));
} else {
do {
if (-1 == (fd = ::openat(dir_fd, block_path, flag, mode))) {
ret = convert_sys_errno();
PALF_LOG(ERROR, "open block failed", K(ret), K(errno), K(block_path), K(dir_fd));
ob_usleep(RETRY_INTERVAL);
} else {
ret = OB_SUCCESS;
break;
}
} while (OB_FAIL(ret));
}
return ret;
}
int close_with_retry(const int fd)
{
int ret = OB_SUCCESS;
if (-1 == fd) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument", K(fd));
} else {
do {
if (-1 == (::close(fd))) {
ret = convert_sys_errno();
PALF_LOG(ERROR, "open block failed", K(ret), K(errno), K(fd));
ob_usleep(RETRY_INTERVAL);
} else {
ret = OB_SUCCESS;
break;
}
} while (OB_FAIL(ret));
}
return ret;
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -27,6 +27,7 @@
#include "log_group_entry.h" // LogGroupEntry
#include "log_meta_entry.h" // LogMetaEntry
#include "log_iterator_storage.h" // LogIteratorStorage
#include "log_checksum.h" // LogChecksum
namespace oceanbase
{
@ -60,11 +61,21 @@ public:
// @retval
// OB_SUCCESS.
// OB_INVALID_DATA.
// OB_ITER_END, has iterated to the end of block.
// OB_NEED_RETRY, the data in cache is not integrity, and the integrity data has been truncate from disk,
// OB_ITER_END
// - has iterated to the end of block.
// OB_NEED_RETRY
// - the data in cache is not integrity, and the integrity data has been truncate from disk,
// need read data from storage eagin.(data in cache will not been clean up, therefore,
// user need used a new iterator to read data again)
// OB_ERR_OUT_LOWER_BOUND, block has been recycled
// - if the end_lsn get from get_file_end_lsn is smaller than 'log_tail_' of LogStorage, and it's
// not the exact boundary of LogGroupEntry(for PalgGroupeBufferIterator, or LogEntry for PalfBufferIterator),
// OB_NEED_RETRY may be return.
//
// OB_ERR_OUT_LOWER_BOUND
// - block has been recycled
// OB_CHECKSUM_ERROR
// - the accumlate checksum calc by accum_checksum_ and the data checksum of LogGroupEntry is not
// same as the accumlate checksum of LogGroupEntry
int next(const share::SCN &replayable_point_scn);
// param[in] replayable point scn, iterator will ensure that no log will return when the log scn is greater than
@ -87,7 +98,6 @@ public:
// OB_SUCCESS
// OB_INVALID_DATA
// OB_ITER_END
// OB_ITER_END
// NB: if the last write option success, but the data has been
// corrupted, we also regard it as the last write option is
// not atomic.
@ -100,7 +110,7 @@ public:
TO_STRING_KV(KP(buf_), K_(next_round_pread_size), K_(curr_read_pos), K_(curr_read_buf_start_pos),
K_(curr_read_buf_end_pos), KPC(log_storage_), K_(curr_entry_is_raw_write), K_(curr_entry_size),
K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version));
K_(prev_entry_scn), K_(curr_entry), K_(init_mode_version), K_(accumlate_checksum));
private:
// @brief get next entry from data storage or cache.
@ -125,18 +135,23 @@ private:
int parse_one_entry_();
template <
class TMP_ENTRY,
class ACTUAL_ENTRY>
int parse_one_specific_entry_(TMP_ENTRY &entry, ACTUAL_ENTRY &actual_entry)
int parse_one_specific_entry_(ACTUAL_ENTRY &actual_entry)
{
int ret = OB_SUCCESS;
const bool matched_type = std::is_same<ACTUAL_ENTRY, TMP_ENTRY>::value;
const bool matched_type = std::is_same<ACTUAL_ENTRY, ENTRY>::value;
int64_t pos = curr_read_pos_;
if (true == matched_type) {
if (OB_FAIL(entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
if (OB_FAIL(curr_entry_.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
// When curr_entry_ is LogGroupEntry, need check accumlate checksum
// and check whether it's raw write
} else if (OB_FAIL(handle_each_log_group_entry_(curr_entry_))) {
PALF_LOG(WARN, "handle_each_log_group_entry_ failed", KPC(this));
}
} else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this));
} else if (OB_FAIL(handle_each_log_group_entry_(actual_entry))) {
PALF_LOG(ERROR, "handle_each_log_group_entry_ failed", KPC(this), K(actual_entry));
} else {
ret = OB_EAGAIN;
advance_read_lsn_(actual_entry.get_payload_offset());
@ -146,30 +161,6 @@ private:
return ret;
}
template <
class ACTUAL_ENTRY>
int parse_one_specific_entry_(LogGroupEntry &entry, ACTUAL_ENTRY &actual_entry)
{
int ret = OB_SUCCESS;
const bool matched_type = std::is_same<ACTUAL_ENTRY, LogGroupEntry>::value;
int64_t pos = curr_read_pos_;
if (true == matched_type) {
if (OB_FAIL(entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
} else {
curr_entry_is_raw_write_ = entry.get_header().is_raw_write();
}
} else if (OB_FAIL(actual_entry.deserialize(buf_, curr_read_buf_end_pos_, pos))) {
PALF_LOG(TRACE, "deserialize entry failed", K(ret), KPC(this));
} else {
ret = OB_EAGAIN;
advance_read_lsn_(actual_entry.get_payload_offset());
PALF_LOG(TRACE, "advance_read_lsn_ payload offset", K(ret), KPC(this), K(actual_entry), "payload offset",
actual_entry.get_payload_offset());
}
return ret;
}
int parse_log_block_header_();
int get_log_entry_type_(LogEntryType &log_entry_type);
@ -183,8 +174,33 @@ private:
void advance_read_lsn_(const offset_t step);
void try_clean_up_cache_();
template <class T>
int handle_each_log_group_entry_(const T&entry)
{
PALF_LOG(TRACE, "T is not LogGroupEntry, do no thing", K(entry));
return OB_SUCCESS;
}
template <>
int handle_each_log_group_entry_(const LogGroupEntry&entry)
{
int ret = OB_SUCCESS;
PALF_LOG(TRACE, "T is LogGroupEntry, do no thing", K(entry));
if (OB_FAIL(verify_accum_checksum_(entry))) {
PALF_LOG(ERROR, "verify_accum_checksum_ failed", K(ret), KPC(this), K(entry));
} else {
curr_entry_is_raw_write_ = entry.get_header().is_raw_write();
}
return ret;
}
// @brief: accumlate checksum verify, only verify checkum when accum_checksum_ is not -1.
// ret val:
// OB_SUCCESS
// OB_CHECKSUM_ERROR
int verify_accum_checksum_(const LogGroupEntry &entry);
private:
static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2;
static constexpr int MAX_READ_TIMES_IN_EACH_NEXT = 2;
// In each `next_entry` round, need read data from `LogStorage` directlly,
// to amortized reading cost, use `read_buf` to cache the last read result.
//
@ -221,6 +237,7 @@ private:
//
share::SCN prev_entry_scn_;
GetModeVersion get_mode_version_;
int64_t accumlate_checksum_;
bool is_inited_;
};
@ -237,6 +254,7 @@ LogIteratorImpl<ENTRY>::LogIteratorImpl()
curr_entry_size_(0),
init_mode_version_(0),
prev_entry_scn_(),
accumlate_checksum_(-1),
is_inited_(false)
{
}
@ -263,6 +281,7 @@ int LogIteratorImpl<ENTRY>::init(const GetModeVersion &get_mode_version,
curr_entry_size_ = 0;
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
get_mode_version_ = get_mode_version;
accumlate_checksum_ = -1;
is_inited_ = true;
PALF_LOG(TRACE, "LogIteratorImpl init success", K(ret), KPC(this));
}
@ -279,6 +298,7 @@ void LogIteratorImpl<ENTRY>::reuse()
curr_entry_size_ = 0;
prev_entry_scn_.reset();
init_mode_version_ = PALF_INITIAL_PROPOSAL_ID;
accumlate_checksum_ = -1;
}
template <class ENTRY>
@ -294,6 +314,7 @@ void LogIteratorImpl<ENTRY>::destroy()
curr_read_buf_start_pos_ = 0;
curr_read_pos_ = 0;
init_mode_version_ = 0;
accumlate_checksum_ = -1;
}
}
@ -412,9 +433,8 @@ int LogIteratorImpl<ENTRY>::next(const share::SCN &replayable_point_scn,
// Therefore, we should try_clean_up_cache_ in the beginning of each round of next.
(void) try_clean_up_cache_();
if (OB_FAIL(get_next_entry_())) {
// NB: if get_next_entry_ failed, set 'curr_entry_size_' to 0, ensure 'is_valid'
// return false.
// NB: if the data which has been corrupted, clean cache.
// NB: if the accum_checksum_ is not match, return OB_CHECKSUM_ERROR.
if (OB_INVALID_DATA == ret) {
PALF_LOG(WARN, "read invalid data, need clean cache", K(ret), KPC(this));
log_storage_->reuse(log_storage_->get_lsn(curr_read_pos_));
@ -500,9 +520,6 @@ int LogIteratorImpl<ENTRY>::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write
} else if (OB_FAIL(entry.shallow_copy(curr_entry_))) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "shallow_copy failed", K(ret), KPC(this));
} else if (false == entry.check_integrity()) {
ret = OB_INVALID_DATA;
PALF_LOG(WARN, "data has been corrupted, attention!!!", K(ret), KPC(this));
} else {
lsn = log_storage_->get_lsn(curr_read_pos_);
is_raw_write = curr_entry_is_raw_write_;
@ -510,6 +527,28 @@ int LogIteratorImpl<ENTRY>::get_entry(ENTRY &entry, LSN &lsn, bool &is_raw_write
return ret;
}
template<class ENTRY>
int LogIteratorImpl<ENTRY>::verify_accum_checksum_(const LogGroupEntry &entry)
{
int ret = OB_SUCCESS;
int64_t data_checksum = -1;
int64_t expected_verify_checksum = entry.get_header().get_accum_checksum();
if (!entry.check_integrity(data_checksum)) {
ret = OB_INVALID_DATA;
PALF_LOG(WARN, "invalid data", K(ret), KPC(this), K(entry));
} else if (-1 == accumlate_checksum_) {
accumlate_checksum_ = expected_verify_checksum;
PALF_LOG(INFO, "init accumlate_checksum to first LogGroupEntry", K(entry), KPC(this));
} else if (OB_FAIL(LogChecksum::verify_accum_checksum(
accumlate_checksum_, data_checksum,
expected_verify_checksum, accumlate_checksum_))) {
PALF_LOG(ERROR, "verify checksum failed", K(ret), KPC(this), K(entry));
} else {
PALF_LOG(TRACE, "verify_accum_checksum_ success", K(ret), KPC(this), K(entry));
}
return ret;
}
// step1. according to magic number, acquire log entry type;
// step2. deserialize ENTRY from 'read_buf_', if buf not enough, return and run in next
// round; step3. check entry integrity, if failed, return OB_INVALID_DATA; step4. if the
@ -529,22 +568,19 @@ int LogIteratorImpl<ENTRY>::parse_one_entry_()
case LogEntryType::GROUP_ENTRY_HEADER:
{
LogGroupEntry entry;
ret = parse_one_specific_entry_(curr_entry_, entry);
if (true == entry.is_valid()) {
curr_entry_is_raw_write_ = entry.get_header().is_raw_write();
}
ret = parse_one_specific_entry_(entry);
break;
}
case LogEntryType::LOG_ENTRY_HEADER:
{
LogEntry entry;
ret = parse_one_specific_entry_(curr_entry_, entry);
ret = parse_one_specific_entry_(entry);
break;
}
case LogEntryType::LOG_META_ENTRY_HEADER:
{
LogMetaEntry entry;
ret = parse_one_specific_entry_(curr_entry_, entry);
ret = parse_one_specific_entry_(entry);
break;
}
case LogEntryType::LOG_INFO_BLOCK_HEADER: