fixed restart failed because of manifest.

This commit is contained in:
HaHaJeff 2023-04-18 12:45:15 +00:00 committed by ob-robot
parent ba51bb38c6
commit 4ebbfc88a9
5 changed files with 159 additions and 34 deletions

View File

@ -217,7 +217,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, flashback_restart)
LogStorage *log_storage = &leader1.get_palf_handle_impl()->log_engine_.log_storage_;
EXPECT_LE(2, log_storage->block_mgr_.max_block_id_);
EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.create_tmp_block_handler(2));
EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_cb_(3));
EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_(3));
EXPECT_EQ(OB_SUCCESS, log_storage->block_mgr_.delete_block_from_back_to_front_until(2));
{
LogBlockMgr *block_mgr = &log_storage->block_mgr_;

View File

@ -620,9 +620,94 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_restart)
sleep(1);
EXPECT_EQ(OB_ERR_UNEXPECTED, restart_paxos_groups());
system(log_fd);
PALF_LOG(INFO, "first restart_paxos_groups");
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
// 验证切文件过程中宕机重启
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 33, id, MAX_LOG_BODY_SIZE));
wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
block_id_t min_block_id, max_block_id;
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
LogStorage *meta_storage = &leader.get_palf_handle_impl()->log_engine_.log_meta_storage_;
EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
EXPECT_EQ(1, max_block_id);
// 模拟只switch block,但没有更新manifest, 此时manifest依旧是1, 宕机重启后由于2号文件为空,manifest会被更新为2
EXPECT_EQ(OB_SUCCESS, log_storage->truncate(LSN(PALF_BLOCK_SIZE)));
EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_(1));
EXPECT_EQ(PALF_BLOCK_SIZE, log_storage->curr_block_writable_size_);
EXPECT_EQ(1, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
}
PALF_LOG(INFO, "second restart_paxos_groups");
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
//检查manifest是否为3
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, MAX_LOG_BODY_SIZE));
LogStorage *meta_storage = &leader.get_palf_handle_impl()->log_engine_.log_meta_storage_;
EXPECT_EQ(2, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
}
PALF_LOG(INFO, "third restart_paxos_groups");
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// 验证重启后新建日志流
{
PalfHandleImplGuard leader;
id = ATOMIC_AAF(&palf_id_, 1);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 66, id, MAX_LOG_BODY_SIZE));
wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
EXPECT_EQ(OB_ITER_END, read_log(leader));
}
// 验证truncate或flashback过程中,修改完manifest后,删除文件前宕机重启(删除1个文件)
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
block_id_t min_block_id, max_block_id;
// 此时manifest为3
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
LogStorage *meta_storage = &leader.get_palf_handle_impl()->log_engine_.log_meta_storage_;
EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
EXPECT_EQ(2, max_block_id);
EXPECT_EQ(3, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
// truncate 或 flashback会先更新manifest为2
EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_(2));
EXPECT_EQ(2, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
}
// 验证truncate或flashback过程中,修改完manifest后,truncaet/flashback正好将最后一个文件空
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
block_id_t min_block_id, max_block_id;
// 此时manifest为2
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
LogStorage *meta_storage = &leader.get_palf_handle_impl()->log_engine_.log_meta_storage_;
EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
EXPECT_EQ(2, max_block_id);
// 尽管manifest为2,但在这种场景下,2号文件是可以删除的
EXPECT_EQ(2, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
EXPECT_EQ(OB_SUCCESS, log_storage->truncate(LSN(2*PALF_BLOCK_SIZE)));
EXPECT_EQ(OB_SUCCESS, log_storage->update_manifest_(2));
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
block_id_t min_block_id, max_block_id;
// 重启之后,由于磁盘上最大的文件为2,同时该文件为空,此时会更新manifest为3
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
LogStorage *meta_storage = &leader.get_palf_handle_impl()->log_engine_.log_meta_storage_;
EXPECT_EQ(OB_SUCCESS, log_storage->get_block_id_range(min_block_id, max_block_id));
EXPECT_EQ(2, max_block_id);
EXPECT_EQ(3, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, MAX_LOG_BODY_SIZE));
wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
EXPECT_EQ(3, lsn_2_block(meta_storage->log_block_header_.min_lsn_, PALF_BLOCK_SIZE));
}
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)

View File

@ -90,11 +90,11 @@ int LogEngine::init(const int64_t palf_id,
const int64_t log_meta_storage_block_ize)
{
int ret = OB_SUCCESS;
auto log_meta_storage_update_manifest_cb = [](const block_id_t max_block_id) {
auto log_meta_storage_update_manifest_cb = [](const block_id_t max_block_id, const bool in_restart) {
// do nothing
return OB_SUCCESS;
};
auto log_storage_update_manifest_cb = [this](const block_id_t max_block_id) {
auto log_storage_update_manifest_cb = [this](const block_id_t max_block_id, const bool in_restart) {
return this->update_manifest(max_block_id);
};
if (IS_INIT) {
@ -185,17 +185,28 @@ int LogEngine::load(const int64_t palf_id,
{
int ret = OB_SUCCESS;
ObTimeGuard guard("load", 0);
auto log_meta_storage_update_manifest_cb = [&](const block_id_t max_block_id) {
block_id_t expected_next_block_id = LOG_INVALID_BLOCK_ID;
auto log_meta_storage_update_manifest_cb = [](const block_id_t new_expected_next_block_id, const bool in_restart) {
// do nothing
return OB_SUCCESS;
};
auto log_storage_update_manifest_cb = [&](const block_id_t max_block_id) {
return this->update_manifest(max_block_id);
auto log_storage_update_manifest_cb = [&expected_next_block_id, this](const block_id_t new_expected_next_block_id, const bool in_restart) {
int ret = OB_SUCCESS;
if (in_restart) {
if (new_expected_next_block_id == expected_next_block_id + 1) {
PALF_LOG(INFO, "need update_manifest in restart", K(in_restart), K(new_expected_next_block_id), K(expected_next_block_id));
ret = this->update_manifest(new_expected_next_block_id);
} else {
PALF_LOG(INFO, "no need update_manifest in restart", K(in_restart), K(new_expected_next_block_id), K(expected_next_block_id));
}
} else {
ret = this->update_manifest(new_expected_next_block_id);
}
return ret;
};
LSN last_group_entry_header_lsn;
LSN last_meta_entry_start_lsn;
LogMetaEntryHeader unused_meta_entry_header;
block_id_t expected_next_block_id = LOG_INVALID_BLOCK_ID;
if (IS_INIT) {
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "LogEngine has initted!!!", K(ret), K(palf_id));
@ -779,7 +790,10 @@ int LogEngine::update_base_lsn_used_for_gc(const LSN &lsn)
int LogEngine::update_manifest(const block_id_t block_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(log_meta_storage_.update_manifest_used_for_meta_storage(block_id))) {
if (!is_valid_block_id(block_id)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument!!!", KPC(this), K(block_id));
} else if (OB_FAIL(log_meta_storage_.update_manifest_used_for_meta_storage(block_id))) {
PALF_LOG(WARN, "update_manifest_used_for_meta_storage failed", K(ret), K_(palf_id), K_(is_inited));
} else {
PALF_LOG(INFO,
@ -1378,7 +1392,7 @@ int LogEngine::try_clear_up_holes_and_check_storage_integrity_(
//
// Ensure that:
// 1. check LogStorage integrity only when 'expected_next_block_id' is greater than
// 'base_block_id';
// 'base_block_id', consider rebuild, all blocks on disk may be deleted.;
// 2. 'min_block_id' must be smaller than or equal to 'base_block_id';
// 3. the last block is integral, means that 'max_block_id' is ethier equal to
// 'expected_next_block_id'(last block is
@ -1437,13 +1451,17 @@ bool LogEngine::check_last_block_whether_is_integrity_(const block_id_t expected
const block_id_t max_block_id,
const LSN &log_storage_tail)
{
// 1. 'expected_next_block_id' == 'max_block_id' + 1, last block is not empty
// 2. 'expected_next_block_id' == 'max_block_id', last block is empty(no data and LogBlockHeader)
// 3. 'expected_next_block_id' < 'max_block_id', means there is a 'truncate' opt.
// NB:
// 1. 'expected_next_block_id' == 'max_block_id' + 1, normal case
// 2. 'expected_next_block_id' <= 'max_block_id', means:
// 1. a 'truncate' or 'flashback' opt before stop palf, we need update manifest first,
// and stop palf before delete blocks on disk, 'expected_next_block_id' is smaller
// than or equal to 'max_block_id'.
// 2. a switch block opt before stop palf, and just create new block on disk success,
// expected_next_block_id is equal to 'max_block_id', and 'max_block_id' is empty,
// we need update manifest to max_block_id + 1 in process of restart.
return expected_next_block_id == max_block_id + 1
|| (expected_next_block_id == max_block_id
&& LSN(max_block_id * PALF_BLOCK_SIZE) == log_storage_tail)
|| expected_next_block_id < max_block_id;
|| expected_next_block_id <= max_block_id;
}
void LogEngine::reset_min_block_info_guarded_by_lock_(const block_id_t min_block_id, const SCN &min_block_max_scn)

View File

@ -83,7 +83,7 @@ int LogStorage::load_manifest_for_meta_storage(block_id_t &expected_next_block_i
//
// If we need support switch block when write failed, the solution is that:
// 1. only delete prev block when in append_meta interface;
// 2. if last meta block is empty, we alose need read its block header.
// 2. if last meta block is empty, we also need read its block header.
} else if (OB_FAIL(
read_block_header_(last_block_id, log_block_header_))) {
PALF_LOG(WARN, "read_block_header_ failed", K(ret), KPC(this));
@ -312,15 +312,13 @@ int LogStorage::inner_truncate_(const LSN &lsn)
int ret = OB_SUCCESS;
const block_id_t lsn_block_id = lsn_2_block(lsn, logical_block_size_);
const block_id_t log_tail_block_id = lsn_2_block(log_tail_, logical_block_size_);
// 'expected_next_block_id' used to check whether disk is integral, we make sure that either it's
// empty or it doesn't exist.
// because the padding log is submitted by next log, even if the 'lsn' is the end lsn of padding
// the block after 'lsn_block_id' must exist. we just set expected_next_block_id to 'lsn_block_id' + 1
// and the block after 'lsn_block_id' will be reset to empty.
// constriaints: 'expected_next_block_id' is used to check whether blocks on disk are integral,
// we make sure that the content in each block_id which is greater than or equal to
// 'expected_next_block_id' are not been used.
const block_id_t expected_next_block_id = lsn_block_id + 1;
if (lsn_block_id != log_tail_block_id && OB_FAIL(update_manifest_cb_(expected_next_block_id))) {
if (lsn_block_id != log_tail_block_id && OB_FAIL(update_manifest_(expected_next_block_id))) {
PALF_LOG(WARN,
"inner_truncat_ update_manifest_cb_ failed",
"inner_truncat_ update_manifest_ failed",
K(ret),
K(expected_next_block_id),
KPC(this));
@ -423,14 +421,14 @@ int LogStorage::end_flashback(const LSN &start_lsn_of_block)
{
int ret = OB_SUCCESS;
const block_id_t block_id = lsn_2_block(start_lsn_of_block, logical_block_size_);
// NB: 'expected_next_block_id' is used to check whether disk is integral, we make sure that either it's
// empty or it doesn't exist.
// constriaints: 'expected_next_block_id' is used to check whether blocks on disk are integral,
// we make sure that the content in each block_id which is greater than or equal to
// 'expected_next_block_id' are not been used.
// we can set 'expected_next_block_id' to 'block_id' + 1 because of the block of 'start_lsn_of_block'
// must exist. even if the block after 'block_id' have been deleted, the block of 'expected_next_block_id'
// will not exist.
// must exist.(we will delete each block after 'block_id', not include 'block_id')
const block_id_t expected_next_block_id = block_id + 1;
if (OB_FAIL(update_manifest_cb_(expected_next_block_id))) {
PALF_LOG(WARN, "update_manifest_cb_ failed", K(ret), KPC(this), K(block_id),
if (OB_FAIL(update_manifest_(expected_next_block_id))) {
PALF_LOG(WARN, "update_manifest_ failed", K(ret), KPC(this), K(block_id),
K(expected_next_block_id), K(start_lsn_of_block));
} else if (OB_FAIL(block_mgr_.delete_block_from_back_to_front_until(block_id))) {
PALF_LOG(ERROR, "delete_block_from_back_to_front_until failed", K(ret),
@ -557,6 +555,24 @@ int LogStorage::load_last_block_(const block_id_t min_block_id,
// update 'curr_block_id_' of LogBlockHeader
OB_ASSERT(curr_block_writable_size_ <= logical_block_size_);
}
// update manifest when last block is empty, because we update manifest after create new block, if stop observer between
// create new block and update manifest, after restart we can append log to this block and will not update manifest because
// the last block has been created successfully before restart. and then resatrt will fail because new write option will
// no longer switch block. the constriaints of manifest are broken.
//
// constriaints: 'expected_next_block_id' is used to check whether blocks on disk are integral, we make sure that the content
// in each block_id which is greater than or equal to 'expected_next_block_id' is not been used.
//
const bool in_restart = true;
if (logical_block_size_ == curr_block_writable_size_) {
const block_id_t expected_next_block_id = max_block_id + 1;
// for restart, update_manifest_cb_ will check whther expected_next_block_id is 'manifest' + 1
if (OB_FAIL(update_manifest_cb_(expected_next_block_id, in_restart))) {
PALF_LOG(WARN, "update_manifest_ failed", KPC(this), K(expected_next_block_id));
} else {
PALF_LOG(INFO, "need update manifest in restart", KPC(this), K(expected_next_block_id));
}
}
return ret;
}
@ -637,8 +653,8 @@ int LogStorage::inner_switch_block_()
const block_id_t expected_next_block_id = block_id + 1;
if (OB_FAIL(block_mgr_.switch_next_block(block_id))) {
PALF_LOG(ERROR, "switch_next_block failed", K(ret));
} else if (OB_FAIL(update_manifest_cb_(expected_next_block_id))) {
PALF_LOG(WARN, "update_manifest_cb_ failed", K(ret), KPC(this), K(block_id));
} else if (OB_FAIL(update_manifest_(expected_next_block_id))) {
PALF_LOG(WARN, "update_manifest_ failed", K(ret), KPC(this), K(block_id));
} else {
PALF_LOG(INFO, "inner_switch_block_ success", K(ret), K(log_block_header_),
K(block_id));
@ -851,5 +867,10 @@ void LogStorage::reset_log_tail_for_last_block_(const LSN &lsn, bool last_block_
need_append_block_header_ = (curr_block_writable_size_ == logical_block_size_) ? true : false;
log_tail_ = readable_log_tail_ = lsn;
}
int LogStorage::update_manifest_(const block_id_t expected_next_block_id, const bool in_restart)
{
return update_manifest_cb_(expected_next_block_id, in_restart);
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -39,7 +39,7 @@ class ReadBuf;
class LogStorage : public ILogStorage
{
public:
using UpdateManifestCallback = ObFunction<int(const block_id_t)>;
using UpdateManifestCallback = ObFunction<int(const block_id_t, const bool in_restart)>;
LogStorage();
~LogStorage();
int init(const char *log_dir,
@ -167,6 +167,7 @@ private:
ReadBuf &read_buf,
int64_t &out_read_size);
void reset_log_tail_for_last_block_(const LSN &lsn, bool last_block_exist);
int update_manifest_(const block_id_t expected_next_block_id, const bool in_restart = false);
private:
// Used to perform IO tasks in the background
LogBlockMgr block_mgr_;