From 3af5267ab68ad80706065c831404dc4d812dc4b5 Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Sun, 26 Mar 2023 08:11:08 +0000 Subject: [PATCH] fixed restart failed due to log blocks' integrity after failover --- .../logservice/test_ob_simple_log_engine.cpp | 68 +++++++++++++++++++ ...test_ob_simple_log_single_replica_func.cpp | 59 ++++++++++++++-- src/logservice/palf/log_block_mgr.cpp | 10 +-- src/logservice/palf/log_block_mgr.h | 2 +- src/logservice/palf/log_define.cpp | 34 +++++++++- src/logservice/palf/log_define.h | 10 ++- src/logservice/palf/log_storage.cpp | 16 +++-- 7 files changed, 180 insertions(+), 19 deletions(-) diff --git a/mittest/logservice/test_ob_simple_log_engine.cpp b/mittest/logservice/test_ob_simple_log_engine.cpp index 62f6bfedb..c6fbcdcb8 100644 --- a/mittest/logservice/test_ob_simple_log_engine.cpp +++ b/mittest/logservice/test_ob_simple_log_engine.cpp @@ -183,6 +183,74 @@ int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1; std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; +// 验证flashback过程中宕机重启 +TEST_F(TestObSimpleLogClusterLogEngine, flashback_restart) +{ + SET_CASE_LOG_FILE(TEST_NAME, "flashback_restart"); + OB_LOGGER.set_log_level("TRACE"); + PALF_LOG(INFO, "begin flashback_restart"); + PalfHandleImplGuard leader; + int64_t id_1 = ATOMIC_AAF(&palf_id_, 1); + int64_t leader_idx_1 = 0; + PalfEnv *palf_env = NULL; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader)); + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 66, leader_idx_1, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); + SCN scn; + LogStorage *log_storage = &leader.get_palf_handle_impl()->log_engine_.log_storage_; + LSN log_tail = log_storage->log_tail_; + scn = leader.get_palf_handle_impl()->get_end_scn(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 33, leader_idx_1, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); + int64_t mode_version; + AccessMode mode; + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->get_access_mode(mode_version, mode)); + LSN flashback_lsn(PALF_BLOCK_SIZE*lsn_2_block(log_tail, PALF_BLOCK_SIZE)); + EXPECT_EQ(OB_SUCCESS, log_storage->begin_flashback(flashback_lsn)); + leader.reset(); + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + + { + PalfHandleImplGuard leader1; + EXPECT_EQ(OB_SUCCESS, get_leader(id_1, leader1, leader_idx_1)); + LogStorage *log_storage = &leader1.get_palf_handle_impl()->log_engine_.log_storage_; + EXPECT_LE(2, log_storage->block_mgr_.max_block_id_); + EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.create_tmp_block_handler(2)); + EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_cb_(3)); + EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.delete_block_from_back_to_front_until(2)); + { + LogBlockMgr *block_mgr = &log_storage->block_mgr_; + int block_id = 2; + int ret = OB_SUCCESS; + // 1. rename "block_id.tmp" to "block_id.flashback" + // 2. delete "block_id", make sure each block has returned into BlockPool + // 3. rename "block_id.flashback" to "block_id" + // NB: for restart, the block which named 'block_id.flashback' must be renamed to 'block_id' + char tmp_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'}; + char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'}; + char flashback_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'}; + if (block_id != block_mgr->curr_writable_block_id_) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "block_id is not same as curr_writable_handler_, unexpected error", + K(ret), K(block_id), KPC(block_mgr)); + } else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) { + PALF_LOG(ERROR, "block_id_to_string failed", K(ret), K(block_id)); + } else if (OB_FAIL(block_id_to_tmp_string(block_id, tmp_block_path, OB_MAX_FILE_NAME_LENGTH))) { + PALF_LOG(ERROR, "block_id_to_tmp_string failed", K(ret), K(block_id)); + } else if (OB_FAIL(block_id_to_flashback_string(block_id, flashback_block_path, OB_MAX_FILE_NAME_LENGTH))) { + PALF_LOG(ERROR, "block_id_to_flashback_string failed", K(ret), K(block_id)); + } else if (OB_FAIL(block_mgr->do_rename_and_fsync_(tmp_block_path, flashback_block_path))) { + PALF_LOG(ERROR, "do_rename_and_fsync_ failed", K(ret), KPC(block_mgr)); + } else { + PALF_LOG(INFO, "rename_tmp_block_handler_to_normal success", K(ret), KPC(block_mgr)); + } + } + } + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); +} + TEST_F(TestObSimpleLogClusterLogEngine, exception_path) { SET_CASE_LOG_FILE(TEST_NAME, "exception_path"); diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index 8ccf2a0ea..23560eadf 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -436,6 +436,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) EXPECT_EQ(OB_ITER_END, read_log(leader)); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + { // 验证重启场景 PalfHandleImplGuard new_leader; int64_t curr_mode_version = INVALID_PROPOSAL_ID; @@ -448,14 +449,58 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) EXPECT_EQ(OB_ITER_END, read_log(new_leader)); ref_scn.convert_for_tx(1000); LogEntryHeader header_new; + LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_; + block_id_t max_block_id = log_storage->block_mgr_.max_block_id_; EXPECT_EQ(OB_SUCCESS, get_middle_scn(1329, new_leader, max_scn, header_new)); + // flashback跨文件场景重启 + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 33, leader_idx, MAX_LOG_BODY_SIZE)); + wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); + EXPECT_LE(max_block_id, log_storage->block_mgr_.max_block_id_); switch_append_to_flashback(new_leader, mode_version); EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); + EXPECT_GE(max_block_id, log_storage->block_mgr_.max_block_id_); switch_flashback_to_append(new_leader, mode_version); - PALF_LOG(INFO, "flashback after restart"); + } + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + { + PalfHandleImplGuard new_leader; + int64_t curr_mode_version = INVALID_PROPOSAL_ID; + AccessMode curr_access_mode = AccessMode::INVALID_ACCESS_MODE; + EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); + EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode)); + + // flashback到某个文件的尾部 + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 65, leader_idx, MAX_LOG_BODY_SIZE)); + wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); + switch_append_to_flashback(new_leader, mode_version); + LSN lsn(PALF_BLOCK_SIZE); + LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_; + SCN block_end_scn; + { + PalfGroupBufferIterator iterator; + auto get_file_end_lsn = [](){ + return LSN(PALF_BLOCK_SIZE); + }; + EXPECT_EQ(OB_SUCCESS, iterator.init(LSN(0), get_file_end_lsn, log_storage)); + LogGroupEntry entry; + LSN lsn; + while (OB_SUCCESS == iterator.next()) { + EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, lsn)); + } + block_end_scn = entry.get_scn(); + } + EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, block_end_scn, timeout_ts_us)); + EXPECT_EQ(lsn, log_storage->log_tail_); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); - EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx)); - sleep(1); + } + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + + // 重启后继续提交日志 + PalfHandleImplGuard new_leader; + EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); + switch_flashback_to_append(new_leader, mode_version); + EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx)); + wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); new_leader.reset(); delete_paxos_group(id); @@ -491,8 +536,14 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_truncate_failed) FileDirectoryUtils::get_file_size(block_path, file_size); EXPECT_EQ(file_size, PALF_PHY_BLOCK_SIZE); get_leader(id, leader, leader_idx); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1000)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, id, MAX_LOG_BODY_SIZE)); + wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader); EXPECT_EQ(OB_ITER_END, read_log(leader)); + // 验证truncate文件尾后重启 + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->log_engine_.truncate(LSN(PALF_BLOCK_SIZE))); + EXPECT_EQ(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_); + leader.reset(); + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); } TEST_F(TestObSimpleLogClusterSingleReplica, test_meta) diff --git a/src/logservice/palf/log_block_mgr.cpp b/src/logservice/palf/log_block_mgr.cpp index 503c4b469..ae3e02433 100644 --- a/src/logservice/palf/log_block_mgr.cpp +++ b/src/logservice/palf/log_block_mgr.cpp @@ -65,7 +65,7 @@ int LogBlockMgr::init(const char *log_dir, PALF_LOG(ERROR, "::open failed", K(ret), K(log_dir)); } else if (OB_FAIL(curr_writable_handler_.init(dir_fd_, log_block_size, align_size, align_buf_size))) { PALF_LOG(ERROR, "init curr_writable_handler_ failed", K(ret), K(log_dir)); - } else if (OB_FAIL(do_scan_dir_(log_dir, initial_block_id))) { + } else if (OB_FAIL(do_scan_dir_(log_dir, initial_block_id, log_block_pool))) { PALF_LOG(ERROR, "do_scan_dir_ failed", K(ret), K(log_dir)); } else if (OB_FAIL(try_recovery_last_block_(log_dir))) { PALF_LOG(ERROR, "try_recovery_last_block_ failed", K(ret), KPC(this)); @@ -349,7 +349,7 @@ int LogBlockMgr::do_truncate_(const block_id_t block_id, PALF_LOG(ERROR, "unexpected error, block id is not same sa curr_writable_block_id_", K(ret), KPC(this), K(block_id)); } else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) { - PALF_LOG(ERROR, "block_id_ti_string failed", K(ret), K(block_id), KPC(this)); + PALF_LOG(ERROR, "block_id_to_string failed", K(ret), K(block_id), KPC(this)); } else if (OB_FAIL(curr_writable_handler_.close())) { PALF_LOG(ERROR, "close curr_writable_handler_ failed", K(ret), K(block_id), KPC(this)); } else if (OB_FAIL(curr_writable_handler_.open(block_path))) { @@ -458,14 +458,16 @@ int LogBlockMgr::do_delete_block_(const block_id_t block_id) return ret; } -int LogBlockMgr::do_scan_dir_(const char *dir, const block_id_t initial_block_id) +int LogBlockMgr::do_scan_dir_(const char *dir, + const block_id_t initial_block_id, + ILogBlockPool *log_block_pool) { int ret = OB_SUCCESS; if (LOG_INVALID_BLOCK_ID != min_block_id_ || LOG_INVALID_BLOCK_ID != max_block_id_) { ret = OB_ERR_UNEXPECTED; PALF_LOG(WARN, "unexpected error, the cache data must be invalid", K(ret), K(min_block_id_), K(max_block_id_)); } else { - TrimLogDirectoryFunctor functor(dir); + TrimLogDirectoryFunctor functor(dir, log_block_pool); if (OB_FAIL(scan_dir(dir, functor))) { PALF_LOG(WARN, "scan_dir failed", K(ret), K(dir)); } else { diff --git a/src/logservice/palf/log_block_mgr.h b/src/logservice/palf/log_block_mgr.h index fa1400e05..fa081dbaf 100644 --- a/src/logservice/palf/log_block_mgr.h +++ b/src/logservice/palf/log_block_mgr.h @@ -89,7 +89,7 @@ private: int do_delete_block_(const block_id_t block_id); int delete_block_from_back_to_front_until_(const block_id_t block_id); int do_truncate_(const block_id_t block_id, const offset_t offset); - int do_scan_dir_(const char *dir, const block_id_t initial_block_id); + int do_scan_dir_(const char *dir, const block_id_t initial_block_id, ILogBlockPool *log_block_pool); int do_rename_and_fsync_(const char *block_path, const char *tmp_block_path); bool empty_() const; int try_recovery_last_block_(const char *log_dir); diff --git a/src/logservice/palf/log_define.cpp b/src/logservice/palf/log_define.cpp index 3bf45a315..ea39dd814 100644 --- a/src/logservice/palf/log_define.cpp +++ b/src/logservice/palf/log_define.cpp @@ -14,6 +14,7 @@ #include "lib/list/ob_dlist.h" #include "share/ob_errno.h" #include "linux/falloc.h" // FALLOC_FL_ZERO_RANGE for linux kernel 4.9 +#include "log_block_pool_interface.h" namespace oceanbase { @@ -165,7 +166,7 @@ int TrimLogDirectoryFunctor::func(const dirent *entry) // do nothing, skip invalid block like tmp } else { if (true == str_is_flashback_block - && OB_FAIL(rename_flashback_to_normal(entry_name))) { + && OB_FAIL(rename_flashback_to_normal_(entry_name))) { PALF_LOG(ERROR, "rename_flashback_to_normal failed", K(ret), K(dir_), K(entry_name)); } if (OB_SUCC(ret)) { @@ -182,7 +183,7 @@ int TrimLogDirectoryFunctor::func(const dirent *entry) return ret; } -int TrimLogDirectoryFunctor::rename_flashback_to_normal(const char *file_name) +int TrimLogDirectoryFunctor::rename_flashback_to_normal_(const char *file_name) { int ret = OB_SUCCESS; int dir_fd = -1; @@ -191,7 +192,9 @@ int TrimLogDirectoryFunctor::rename_flashback_to_normal(const char *file_name) const int64_t SLEEP_TS_US = 10 * 1000; if (-1 == (dir_fd = ::open(dir_, O_DIRECTORY | O_RDONLY))) { ret = convert_sys_errno(); - } else { + } else if (OB_FAIL(try_to_remove_block_(dir_fd, normal_file_name))) { + PALF_LOG(ERROR, "try_to_remove_block_ failed", K(file_name), K(normal_file_name)); + } else { do { if (-1 == ::renameat(dir_fd, file_name, dir_fd, normal_file_name)) { ret = convert_sys_errno(); @@ -211,6 +214,31 @@ int TrimLogDirectoryFunctor::rename_flashback_to_normal(const char *file_name) return ret; } +int TrimLogDirectoryFunctor::try_to_remove_block_(const int dir_fd, const char *file_name) +{ + int ret = OB_SUCCESS; + int fd = -1; + if (-1 == (fd = ::openat(dir_fd, file_name, LOG_READ_FLAG))) { + ret = convert_sys_errno(); + } + // if file not exist, return OB_SUCCESS; + if (OB_FAIL(ret)) { + if (OB_NO_SUCH_FILE_OR_DIRECTORY == ret) { + ret = OB_SUCCESS; + PALF_LOG(INFO, "before rename flashback to normal and after delete normal file, restart!!!", K(file_name)); + } else { + PALF_LOG(ERROR, "open file failed", K(file_name)); + } + } else if (OB_FAIL(log_block_pool_->remove_block_at(dir_fd, file_name))) { + PALF_LOG(ERROR, "remove_block_at failed", K(dir_fd), K(file_name)); + } + if (-1 != fd && -1 == ::close(fd)) { + ret = convert_sys_errno(); + PALF_LOG(ERROR, "close fd failed", K(file_name)); + } + return ret; +} + int reuse_block_at(const int dir_fd, const char *block_path) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h index 001a4570d..f0d1a4fc1 100644 --- a/src/logservice/palf/log_define.h +++ b/src/logservice/palf/log_define.h @@ -45,6 +45,7 @@ typedef uint64_t offset_t; constexpr int64_t INVALID_PALF_ID = -1; class LSN; class LogWriteBuf; +class ILogBlockPool; // ==================== palf env start ============================= const int64_t MIN_DISK_SIZE_PER_PALF_INSTANCE = 512 * 1024 * 1024ul; @@ -351,22 +352,25 @@ private: class TrimLogDirectoryFunctor : public ObBaseDirFunctor { public: - TrimLogDirectoryFunctor(const char *dir) + TrimLogDirectoryFunctor(const char *dir, ILogBlockPool *log_block_pool) : dir_(dir), min_block_id_(LOG_INVALID_BLOCK_ID), - max_block_id_(LOG_INVALID_BLOCK_ID) + max_block_id_(LOG_INVALID_BLOCK_ID), + log_block_pool_(log_block_pool) { } virtual ~TrimLogDirectoryFunctor() = default; - int rename_flashback_to_normal(const char *file_name); int func(const dirent *entry) override final; block_id_t get_min_block_id() const { return min_block_id_; } block_id_t get_max_block_id() const { return max_block_id_; } private: + int rename_flashback_to_normal_(const char *file_name); + int try_to_remove_block_(const int dir_fd, const char *file_name); const char *dir_; block_id_t min_block_id_; block_id_t max_block_id_; + ILogBlockPool *log_block_pool_; DISALLOW_COPY_AND_ASSIGN(TrimLogDirectoryFunctor); }; diff --git a/src/logservice/palf/log_storage.cpp b/src/logservice/palf/log_storage.cpp index b11be27c5..8847b481f 100644 --- a/src/logservice/palf/log_storage.cpp +++ b/src/logservice/palf/log_storage.cpp @@ -312,6 +312,11 @@ int LogStorage::inner_truncate_(const LSN &lsn) int ret = OB_SUCCESS; const block_id_t lsn_block_id = lsn_2_block(lsn, logical_block_size_); const block_id_t log_tail_block_id = lsn_2_block(log_tail_, logical_block_size_); + // 'expected_next_block_id' used to check whether disk is integral, we make sure that either it's + // empty or it doesn't exist. + // because the padding log is submitted by next log, even if the 'lsn' is the end lsn of padding + // the block after 'lsn_block_id' must exist. we just set expected_next_block_id to 'lsn_block_id' + 1 + // and the block after 'lsn_block_id' will be reset to empty. const block_id_t expected_next_block_id = lsn_block_id + 1; if (lsn_block_id != log_tail_block_id && OB_FAIL(update_manifest_cb_(expected_next_block_id))) { PALF_LOG(WARN, @@ -418,9 +423,12 @@ int LogStorage::end_flashback(const LSN &start_lsn_of_block) { int ret = OB_SUCCESS; const block_id_t block_id = lsn_2_block(start_lsn_of_block, logical_block_size_); - // update manifest - const block_id_t log_tail_block_id = lsn_2_block(log_tail_, logical_block_size_); - const block_id_t expected_next_block_id = log_tail_block_id + 1; + // NB: 'expected_next_block_id' is used to check whether disk is integral, we make sure that either it's + // empty or it doesn't exist. + // we can set 'expected_next_block_id' to 'block_id' + 1 because of the block of 'start_lsn_of_block' + // must exist. even if the block after 'block_id' have been deleted, the block of 'expected_next_block_id' + // will not exist. + const block_id_t expected_next_block_id = block_id + 1; if (OB_FAIL(update_manifest_cb_(expected_next_block_id))) { PALF_LOG(WARN, "update_manifest_cb_ failed", K(ret), KPC(this), K(block_id), K(expected_next_block_id), K(start_lsn_of_block)); @@ -615,7 +623,7 @@ int LogStorage::inner_switch_block_() { int ret = OB_SUCCESS; const block_id_t block_id = lsn_2_block(log_tail_, logical_block_size_); - // 'expected_next_block_id' used to check whether disk is integral, we make sure that either it's + // 'expected_next_block_id' is used to check whether disk is integral, we make sure that either it's // empty or it doesn't exist. const block_id_t expected_next_block_id = block_id + 1; if (OB_FAIL(block_mgr_.switch_next_block(block_id))) {