Fix restart failure related to log block integrity after failover

This commit is contained in:
HaHaJeff 2023-03-26 08:11:08 +00:00 committed by ob-robot
parent 92032e7a19
commit 3af5267ab6
7 changed files with 180 additions and 19 deletions

View File

@ -183,6 +183,74 @@ int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
// Verify a crash and restart in the middle of a flashback.
//
// Outline of the test:
//   1. Create a paxos group, submit two batches of logs and wait for each to commit.
//   2. Call LogStorage::begin_flashback() directly and restart the cluster.
//   3. After the restart, drive part of the block-level flashback sequence by hand,
//      stopping once block 2 has been renamed from "<id>.tmp" to "<id>.flashback".
//   4. Restart again and expect the restart to succeed.
TEST_F(TestObSimpleLogClusterLogEngine, flashback_restart)
{
SET_CASE_LOG_FILE(TEST_NAME, "flashback_restart");
OB_LOGGER.set_log_level("TRACE");
PALF_LOG(INFO, "begin flashback_restart");
PalfHandleImplGuard leader;
int64_t id_1 = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx_1 = 0;
PalfEnv *palf_env = NULL;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader));
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env));
// First batch of logs; log_tail is sampled right after this batch has committed.
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 66, leader_idx_1, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
SCN scn;
LogStorage *log_storage = &leader.get_palf_handle_impl()->log_engine_.log_storage_;
LSN log_tail = log_storage->log_tail_;
// NOTE(review): 'scn' is assigned here but not read again in this test.
scn = leader.get_palf_handle_impl()->get_end_scn();
// Second batch of logs, written after log_tail was sampled.
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 33, leader_idx_1, MAX_LOG_BODY_SIZE));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
int64_t mode_version;
AccessMode mode;
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->get_access_mode(mode_version, mode));
// flashback_lsn = PALF_BLOCK_SIZE * lsn_2_block(log_tail, PALF_BLOCK_SIZE); presumably the
// start LSN of the block that contains the sampled log_tail -- confirm against lsn_2_block.
LSN flashback_lsn(PALF_BLOCK_SIZE*lsn_2_block(log_tail, PALF_BLOCK_SIZE));
// Only begin_flashback() is called; end_flashback() is never called before the restart below.
EXPECT_EQ(OB_SUCCESS, log_storage->begin_flashback(flashback_lsn));
leader.reset();
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard leader1;
EXPECT_EQ(OB_SUCCESS, get_leader(id_1, leader1, leader_idx_1));
LogStorage *log_storage = &leader1.get_palf_handle_impl()->log_engine_.log_storage_;
// The steps below operate on block 2, so at least blocks 0..2 must exist after the restart.
EXPECT_LE(2, log_storage->block_mgr_.max_block_id_);
EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.create_tmp_block_handler(2));
EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_cb_(3));
EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.delete_block_from_back_to_front_until(2));
{
LogBlockMgr *block_mgr = &log_storage->block_mgr_;
int block_id = 2;
int ret = OB_SUCCESS;
// The full sequence for turning a tmp block into a normal block is:
// 1. rename "block_id.tmp" to "block_id.flashback"
// 2. delete "block_id", make sure each block has returned into BlockPool
// 3. rename "block_id.flashback" to "block_id"
// NB: for restart, the block which named 'block_id.flashback' must be renamed to 'block_id'
//
// Only step 1 is executed in this block; steps 2 and 3 are left undone, presumably to
// simulate a crash between them so that the next restart has to finish the job.
// NOTE(review): 'ret' is only logged below and never asserted, so a failure in this
// sequence does not fail the test by itself.
char tmp_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
char flashback_block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
if (block_id != block_mgr->curr_writable_block_id_) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "block_id is not same as curr_writable_handler_, unexpected error",
K(ret), K(block_id), KPC(block_mgr));
} else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) {
PALF_LOG(ERROR, "block_id_to_string failed", K(ret), K(block_id));
} else if (OB_FAIL(block_id_to_tmp_string(block_id, tmp_block_path, OB_MAX_FILE_NAME_LENGTH))) {
PALF_LOG(ERROR, "block_id_to_tmp_string failed", K(ret), K(block_id));
} else if (OB_FAIL(block_id_to_flashback_string(block_id, flashback_block_path, OB_MAX_FILE_NAME_LENGTH))) {
PALF_LOG(ERROR, "block_id_to_flashback_string failed", K(ret), K(block_id));
} else if (OB_FAIL(block_mgr->do_rename_and_fsync_(tmp_block_path, flashback_block_path))) {
PALF_LOG(ERROR, "do_rename_and_fsync_ failed", K(ret), KPC(block_mgr));
} else {
PALF_LOG(INFO, "rename_tmp_block_handler_to_normal success", K(ret), KPC(block_mgr));
}
}
}
// Restart with the "<id>.flashback" block left on disk; the restart is expected to succeed.
// NOTE(review): the restart is issued twice; presumably the second call checks that recovery
// is repeatable -- confirm this is intentional and not a duplicated line.
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
}
TEST_F(TestObSimpleLogClusterLogEngine, exception_path)
{
SET_CASE_LOG_FILE(TEST_NAME, "exception_path");

View File

@ -436,6 +436,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
EXPECT_EQ(OB_ITER_END, read_log(leader));
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
// 验证重启场景
PalfHandleImplGuard new_leader;
int64_t curr_mode_version = INVALID_PROPOSAL_ID;
@ -448,14 +449,58 @@ TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
ref_scn.convert_for_tx(1000);
LogEntryHeader header_new;
LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_;
block_id_t max_block_id = log_storage->block_mgr_.max_block_id_;
EXPECT_EQ(OB_SUCCESS, get_middle_scn(1329, new_leader, max_scn, header_new));
// flashback跨文件场景重启
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 33, leader_idx, MAX_LOG_BODY_SIZE));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_LE(max_block_id, log_storage->block_mgr_.max_block_id_);
switch_append_to_flashback(new_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
EXPECT_GE(max_block_id, log_storage->block_mgr_.max_block_id_);
switch_flashback_to_append(new_leader, mode_version);
PALF_LOG(INFO, "flashback after restart");
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard new_leader;
int64_t curr_mode_version = INVALID_PROPOSAL_ID;
AccessMode curr_access_mode = AccessMode::INVALID_ACCESS_MODE;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode));
// flashback到某个文件的尾部
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 65, leader_idx, MAX_LOG_BODY_SIZE));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
switch_append_to_flashback(new_leader, mode_version);
LSN lsn(PALF_BLOCK_SIZE);
LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_;
SCN block_end_scn;
{
PalfGroupBufferIterator iterator;
auto get_file_end_lsn = [](){
return LSN(PALF_BLOCK_SIZE);
};
EXPECT_EQ(OB_SUCCESS, iterator.init(LSN(0), get_file_end_lsn, log_storage));
LogGroupEntry entry;
LSN lsn;
while (OB_SUCCESS == iterator.next()) {
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, lsn));
}
block_end_scn = entry.get_scn();
}
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, block_end_scn, timeout_ts_us));
EXPECT_EQ(lsn, log_storage->log_tail_);
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx));
sleep(1);
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// 重启后继续提交日志
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
switch_flashback_to_append(new_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
new_leader.reset();
delete_paxos_group(id);
@ -491,8 +536,14 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_truncate_failed)
FileDirectoryUtils::get_file_size(block_path, file_size);
EXPECT_EQ(file_size, PALF_PHY_BLOCK_SIZE);
get_leader(id, leader, leader_idx);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1000));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, id, MAX_LOG_BODY_SIZE));
wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
EXPECT_EQ(OB_ITER_END, read_log(leader));
// 验证truncate文件尾后重启
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->log_engine_.truncate(LSN(PALF_BLOCK_SIZE)));
EXPECT_EQ(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
leader.reset();
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_meta)

View File

@ -65,7 +65,7 @@ int LogBlockMgr::init(const char *log_dir,
PALF_LOG(ERROR, "::open failed", K(ret), K(log_dir));
} else if (OB_FAIL(curr_writable_handler_.init(dir_fd_, log_block_size, align_size, align_buf_size))) {
PALF_LOG(ERROR, "init curr_writable_handler_ failed", K(ret), K(log_dir));
} else if (OB_FAIL(do_scan_dir_(log_dir, initial_block_id))) {
} else if (OB_FAIL(do_scan_dir_(log_dir, initial_block_id, log_block_pool))) {
PALF_LOG(ERROR, "do_scan_dir_ failed", K(ret), K(log_dir));
} else if (OB_FAIL(try_recovery_last_block_(log_dir))) {
PALF_LOG(ERROR, "try_recovery_last_block_ failed", K(ret), KPC(this));
@ -349,7 +349,7 @@ int LogBlockMgr::do_truncate_(const block_id_t block_id,
PALF_LOG(ERROR, "unexpected error, block id is not same sa curr_writable_block_id_", K(ret),
KPC(this), K(block_id));
} else if (OB_FAIL(block_id_to_string(block_id, block_path, OB_MAX_FILE_NAME_LENGTH))) {
PALF_LOG(ERROR, "block_id_ti_string failed", K(ret), K(block_id), KPC(this));
PALF_LOG(ERROR, "block_id_to_string failed", K(ret), K(block_id), KPC(this));
} else if (OB_FAIL(curr_writable_handler_.close())) {
PALF_LOG(ERROR, "close curr_writable_handler_ failed", K(ret), K(block_id), KPC(this));
} else if (OB_FAIL(curr_writable_handler_.open(block_path))) {
@ -458,14 +458,16 @@ int LogBlockMgr::do_delete_block_(const block_id_t block_id)
return ret;
}
int LogBlockMgr::do_scan_dir_(const char *dir, const block_id_t initial_block_id)
int LogBlockMgr::do_scan_dir_(const char *dir,
const block_id_t initial_block_id,
ILogBlockPool *log_block_pool)
{
int ret = OB_SUCCESS;
if (LOG_INVALID_BLOCK_ID != min_block_id_ || LOG_INVALID_BLOCK_ID != max_block_id_) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(WARN, "unexpected error, the cache data must be invalid", K(ret), K(min_block_id_), K(max_block_id_));
} else {
TrimLogDirectoryFunctor functor(dir);
TrimLogDirectoryFunctor functor(dir, log_block_pool);
if (OB_FAIL(scan_dir(dir, functor))) {
PALF_LOG(WARN, "scan_dir failed", K(ret), K(dir));
} else {

View File

@ -89,7 +89,7 @@ private:
int do_delete_block_(const block_id_t block_id);
int delete_block_from_back_to_front_until_(const block_id_t block_id);
int do_truncate_(const block_id_t block_id, const offset_t offset);
int do_scan_dir_(const char *dir, const block_id_t initial_block_id);
int do_scan_dir_(const char *dir, const block_id_t initial_block_id, ILogBlockPool *log_block_pool);
int do_rename_and_fsync_(const char *block_path, const char *tmp_block_path);
bool empty_() const;
int try_recovery_last_block_(const char *log_dir);

View File

@ -14,6 +14,7 @@
#include "lib/list/ob_dlist.h"
#include "share/ob_errno.h"
#include "linux/falloc.h" // FALLOC_FL_ZERO_RANGE for linux kernel 4.9
#include "log_block_pool_interface.h"
namespace oceanbase
{
@ -165,7 +166,7 @@ int TrimLogDirectoryFunctor::func(const dirent *entry)
// do nothing, skip invalid block like tmp
} else {
if (true == str_is_flashback_block
&& OB_FAIL(rename_flashback_to_normal(entry_name))) {
&& OB_FAIL(rename_flashback_to_normal_(entry_name))) {
PALF_LOG(ERROR, "rename_flashback_to_normal failed", K(ret), K(dir_), K(entry_name));
}
if (OB_SUCC(ret)) {
@ -182,7 +183,7 @@ int TrimLogDirectoryFunctor::func(const dirent *entry)
return ret;
}
int TrimLogDirectoryFunctor::rename_flashback_to_normal(const char *file_name)
int TrimLogDirectoryFunctor::rename_flashback_to_normal_(const char *file_name)
{
int ret = OB_SUCCESS;
int dir_fd = -1;
@ -191,7 +192,9 @@ int TrimLogDirectoryFunctor::rename_flashback_to_normal(const char *file_name)
const int64_t SLEEP_TS_US = 10 * 1000;
if (-1 == (dir_fd = ::open(dir_, O_DIRECTORY | O_RDONLY))) {
ret = convert_sys_errno();
} else {
} else if (OB_FAIL(try_to_remove_block_(dir_fd, normal_file_name))) {
PALF_LOG(ERROR, "try_to_remove_block_ failed", K(file_name), K(normal_file_name));
} else {
do {
if (-1 == ::renameat(dir_fd, file_name, dir_fd, normal_file_name)) {
ret = convert_sys_errno();
@ -211,6 +214,31 @@ int TrimLogDirectoryFunctor::rename_flashback_to_normal(const char *file_name)
return ret;
}
int TrimLogDirectoryFunctor::try_to_remove_block_(const int dir_fd, const char *file_name)
{
int ret = OB_SUCCESS;
int fd = -1;
if (-1 == (fd = ::openat(dir_fd, file_name, LOG_READ_FLAG))) {
ret = convert_sys_errno();
}
// if file not exist, return OB_SUCCESS;
if (OB_FAIL(ret)) {
if (OB_NO_SUCH_FILE_OR_DIRECTORY == ret) {
ret = OB_SUCCESS;
PALF_LOG(INFO, "before rename flashback to normal and after delete normal file, restart!!!", K(file_name));
} else {
PALF_LOG(ERROR, "open file failed", K(file_name));
}
} else if (OB_FAIL(log_block_pool_->remove_block_at(dir_fd, file_name))) {
PALF_LOG(ERROR, "remove_block_at failed", K(dir_fd), K(file_name));
}
if (-1 != fd && -1 == ::close(fd)) {
ret = convert_sys_errno();
PALF_LOG(ERROR, "close fd failed", K(file_name));
}
return ret;
}
int reuse_block_at(const int dir_fd, const char *block_path)
{
int ret = OB_SUCCESS;

View File

@ -45,6 +45,7 @@ typedef uint64_t offset_t;
constexpr int64_t INVALID_PALF_ID = -1;
class LSN;
class LogWriteBuf;
class ILogBlockPool;
// ==================== palf env start =============================
const int64_t MIN_DISK_SIZE_PER_PALF_INSTANCE = 512 * 1024 * 1024ul;
@ -351,22 +352,25 @@ private:
// Directory functor handed to scan_dir() by LogBlockMgr::do_scan_dir_().
// func() is called with directory entries; when an entry is a flashback block it is
// renamed back to its normal name, and the functor exposes a min/max block id through
// the getters below.
class TrimLogDirectoryFunctor : public ObBaseDirFunctor
{
public:
TrimLogDirectoryFunctor(const char *dir)
TrimLogDirectoryFunctor(const char *dir, ILogBlockPool *log_block_pool)
: dir_(dir),
min_block_id_(LOG_INVALID_BLOCK_ID),
max_block_id_(LOG_INVALID_BLOCK_ID)
max_block_id_(LOG_INVALID_BLOCK_ID),
log_block_pool_(log_block_pool)
{
}
virtual ~TrimLogDirectoryFunctor() = default;
int rename_flashback_to_normal(const char *file_name);
// Callback for one directory entry.
int func(const dirent *entry) override final;
// Both ids start as LOG_INVALID_BLOCK_ID.
block_id_t get_min_block_id() const { return min_block_id_; }
block_id_t get_max_block_id() const { return max_block_id_; }
private:
// Renames a flashback block file to its normal block name; a normal block that already
// exists under that name is removed first via try_to_remove_block_().
int rename_flashback_to_normal_(const char *file_name);
// Removes 'file_name' under 'dir_fd' through log_block_pool_ if the file exists;
// a missing file is treated as success.
int try_to_remove_block_(const int dir_fd, const char *file_name);
// Path of the log directory being scanned.
const char *dir_;
block_id_t min_block_id_;
block_id_t max_block_id_;
// Block pool used to remove blocks; the pointer is stored as given by the constructor.
ILogBlockPool *log_block_pool_;
DISALLOW_COPY_AND_ASSIGN(TrimLogDirectoryFunctor);
};

View File

@ -312,6 +312,11 @@ int LogStorage::inner_truncate_(const LSN &lsn)
int ret = OB_SUCCESS;
const block_id_t lsn_block_id = lsn_2_block(lsn, logical_block_size_);
const block_id_t log_tail_block_id = lsn_2_block(log_tail_, logical_block_size_);
// 'expected_next_block_id' used to check whether disk is integral, we make sure that either it's
// empty or it doesn't exist.
// because the padding log is submitted by next log, even if the 'lsn' is the end lsn of padding
// the block after 'lsn_block_id' must exist. we just set expected_next_block_id to 'lsn_block_id' + 1
// and the block after 'lsn_block_id' will be reset to empty.
const block_id_t expected_next_block_id = lsn_block_id + 1;
if (lsn_block_id != log_tail_block_id && OB_FAIL(update_manifest_cb_(expected_next_block_id))) {
PALF_LOG(WARN,
@ -418,9 +423,12 @@ int LogStorage::end_flashback(const LSN &start_lsn_of_block)
{
int ret = OB_SUCCESS;
const block_id_t block_id = lsn_2_block(start_lsn_of_block, logical_block_size_);
// update manifest
const block_id_t log_tail_block_id = lsn_2_block(log_tail_, logical_block_size_);
const block_id_t expected_next_block_id = log_tail_block_id + 1;
// NB: 'expected_next_block_id' is used to check whether disk is integral, we make sure that either it's
// empty or it doesn't exist.
// we can set 'expected_next_block_id' to 'block_id' + 1 because of the block of 'start_lsn_of_block'
// must exist. even if the block after 'block_id' have been deleted, the block of 'expected_next_block_id'
// will not exist.
const block_id_t expected_next_block_id = block_id + 1;
if (OB_FAIL(update_manifest_cb_(expected_next_block_id))) {
PALF_LOG(WARN, "update_manifest_cb_ failed", K(ret), KPC(this), K(block_id),
K(expected_next_block_id), K(start_lsn_of_block));
@ -615,7 +623,7 @@ int LogStorage::inner_switch_block_()
{
int ret = OB_SUCCESS;
const block_id_t block_id = lsn_2_block(log_tail_, logical_block_size_);
// 'expected_next_block_id' used to check whether disk is integral, we make sure that either it's
// 'expected_next_block_id' is used to check whether disk is integral, we make sure that either it's
// empty or it doesn't exist.
const block_id_t expected_next_block_id = block_id + 1;
if (OB_FAIL(block_mgr_.switch_next_block(block_id))) {