From cb698570608cd6b9781586bff22bd98edca92f83 Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Wed, 29 Mar 2023 10:45:31 +0000 Subject: [PATCH] fixed replay error after failover error due to padding. --- .../logservice/test_ob_simple_log_replay.cpp | 225 +++++++++++++++++- .../replayservice/ob_log_replay_service.cpp | 1 + .../replayservice/ob_replay_status.cpp | 7 + 3 files changed, 231 insertions(+), 2 deletions(-) diff --git a/mittest/logservice/test_ob_simple_log_replay.cpp b/mittest/logservice/test_ob_simple_log_replay.cpp index 8115105be..4b8b972cb 100644 --- a/mittest/logservice/test_ob_simple_log_replay.cpp +++ b/mittest/logservice/test_ob_simple_log_replay.cpp @@ -14,10 +14,13 @@ #include #include #define private public +#define protected public #include "logservice/ob_ls_adapter.h" #include "share/scn.h" #include "env/ob_simple_log_cluster_env.h" +#include "logservice/palf/palf_iterator.h" #undef private +#undef protected const std::string TEST_NAME = "replay_func"; using namespace oceanbase::common; @@ -103,13 +106,15 @@ public: ObReplayStatus *rp_st_; }; -int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3; -int64_t ObSimpleLogClusterTestBase::node_cnt_ = 3; +int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1; +int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1; std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; TEST_F(TestObSimpleLogReplayFunc, basic_replay) { + + SET_CASE_LOG_FILE(TEST_NAME, "basic_replay"); const int64_t task_count = 1024; const int64_t id = ATOMIC_AAF(&palf_id_, 1); ObLSID ls_id(id); @@ -206,6 +211,222 @@ TEST_F(TestObSimpleLogReplayFunc, basic_replay) rp_sv.destroy(); CLOG_LOG(INFO, "test replay finish", K(id)); } + +TEST_F(TestObSimpleLogReplayFunc, test_flashback_to_padding) +{ + SET_CASE_LOG_FILE(TEST_NAME, "flashback_to_padding"); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + ObLSID ls_id(id); + int64_t leader_idx = 0; + LSN basic_lsn(0); + PalfHandleImplGuard leader; + share::SCN basic_scn = share::SCN::min_scn(); + CLOG_LOG(INFO, "test replay begin", K(id)); + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + MockLSAdapter ls_adapter; + ls_adapter.init((ObLSService *)(0x1)); + ObLogReplayService rp_sv; + ObReplayStatus *rp_st = NULL; + PalfEnv *palf_env; + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); + rp_sv.init(palf_env, &ls_adapter, get_cluster()[0]->get_allocator()); + rp_sv.start(); + get_cluster()[0]->get_tenant_base()->update_thread_cnt(10); + LSN iterator_end_lsn(0); + LSN *iterator_end_lsn_ptr = &iterator_end_lsn; + auto get_file_end_lsn =[iterator_end_lsn_ptr]() { + CLOG_LOG(INFO, "get_file_end_lsn", K(*iterator_end_lsn_ptr)); + return *iterator_end_lsn_ptr; + }; + EXPECT_EQ(OB_SUCCESS, rp_sv.add_ls(ls_id, ObReplicaType::REPLICA_TYPE_FULL)); + EXPECT_EQ(OB_SUCCESS, rp_sv.enable(ls_id, basic_lsn, basic_scn)); + { + ObReplayStatusGuard guard; + EXPECT_EQ(OB_SUCCESS, rp_sv.get_replay_status_(ls_id, guard)); + rp_st = guard.get_replay_status(); + ls_adapter.rp_st_ = rp_st; + } + PalfBufferIterator &iterator = rp_st->submit_log_task_.iterator_; + iterator.iterator_storage_.get_file_end_lsn_ = get_file_end_lsn; + // 停止拉日志 + rp_st->block_submit(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); + LogStorage *log_storage = &leader.get_palf_handle_impl()->log_engine_.log_storage_; + LSN padding_header = log_storage->log_tail_; + SCN padding_header_scn = leader.get_palf_handle_impl()->get_max_scn(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); + LSN log_tail = log_storage->log_tail_; + SCN padding_tail_scn = leader.get_palf_handle_impl()->get_end_scn(); + EXPECT_LE(padding_header, LSN(PALF_BLOCK_SIZE)); + EXPECT_GE(padding_header+MAX_LOG_BODY_SIZE, LSN(PALF_BLOCK_SIZE)); + int64_t mode_version; + switch_append_to_flashback(leader, mode_version); + + // Test1: flashback到padding头部, replay先回放到padding再执行flashback + { + int ret = OB_SUCCESS; + CLOG_LOG(WARN, "flashback to padding header case1"); + int64_t abs_timeout_us = 4*1000*1000; + SCN flashback_scn = padding_header_scn; + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->flashback(mode_version, flashback_scn, abs_timeout_us)); + // iterator看到的终点是padding日志尾 + iterator_end_lsn = LSN(PALF_BLOCK_SIZE); + // replay看到的committed位点是padding尾 + rp_st->unblock_submit(); + EXPECT_EQ(OB_SUCCESS, rp_st->update_end_offset(LSN(PALF_BLOCK_SIZE))); + // 开启拉日志 + bool is_done = false; + while (!is_done) { + rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); + usleep(10*1000); + CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); + } + is_done = false; + CLOG_LOG(INFO, "runlin trace 3", K(iterator), KPC(rp_st)); + // 预期replay的next_to_submit_lsn是padding尾 + EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); + EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); + // replay执行flashback后,next_to_submit_lsn是padding头 + EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); + switch_flashback_to_append(leader, mode_version); + iterator_end_lsn = LSN(100000000); + // 停止拉日志 + rp_st->block_submit(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn(); + LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + //更新replay的committed_end_lsn + rp_st->submit_log_task_.update_committed_end_lsn(max_lsn); + } + // Test2: flashback到padding头部, replay先执行flashback,再执行replay padding + { + int ret = OB_SUCCESS; + CLOG_LOG(WARN, "flashback to padding header case2"); + int64_t abs_timeout_us = 4*1000*1000; + SCN flashback_scn = padding_header_scn; + switch_append_to_flashback(leader, mode_version); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->flashback(mode_version, flashback_scn, abs_timeout_us)); + // iterator看到的终点是padding日志头 + iterator_end_lsn = padding_header; + // replay看到的committed位点是padding尾 + rp_st->unblock_submit(); + rp_st->submit_log_task_.committed_end_lsn_ = LSN(PALF_BLOCK_SIZE); + bool is_done = false; + while (!is_done) { + rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); + usleep(10*1000); + CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); + } + // 先执行replay的flashback + EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); + // 预期replay的next_to_submit_lsn是padding头 + EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); + // 修改iterator看到的终点为padding尾 + iterator_end_lsn = LSN(PALF_BLOCK_SIZE); + // 触发拉日志 + rp_st->trigger_fetch_log(); + sleep(1); + // 预期replay的next_to_submit_lsn是padding头 + EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); + switch_flashback_to_append(leader, mode_version); + // 停止拉日志 + rp_st->block_submit(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn(); + LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + //更新replay的committed_end_lsn + rp_st->submit_log_task_.update_committed_end_lsn(max_lsn); + } + // Test3: flashback到padding尾部, replay先执行flashback,再执行replay padding + { + int ret = OB_SUCCESS; + CLOG_LOG(WARN, "flashback to padding tailer case1"); + int64_t abs_timeout_us = 4*1000*1000; + // flashback_scn为padding尾 + SCN flashback_scn = SCN::minus(padding_tail_scn, 1); + switch_append_to_flashback(leader, mode_version); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->flashback(mode_version, flashback_scn, abs_timeout_us)); + // iterator看到的终点是padding日志头 + iterator_end_lsn = padding_header; + // replay看到的committed位点是padding头 + rp_st->unblock_submit(); + rp_st->submit_log_task_.committed_end_lsn_ = (padding_header); + bool is_done = false; + // iterator尽管没有吐出padding日志,但replay会直接更新next_to_submit_lsn到padding尾部 + while (!is_done) { + rp_sv.is_replay_done(ls_id, padding_header, is_done); + usleep(10*1000); + CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); + } + is_done = false; + // 预期replay的next_to_submit_lsn是padding头 + EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); + // 先执行replay的flashback + EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); + // replay执行flashback后,next_to_submit_lsn是padding头, palf的committed位点是padding尾,不会更新next_to_submit_lsn + EXPECT_EQ(padding_header, rp_st->submit_log_task_.next_to_submit_lsn_); + rp_st->trigger_fetch_log(); + while (!is_done) { + rp_sv.is_replay_done(ls_id, LSN(PALF_BLOCK_SIZE), is_done); + usleep(10*1000); + CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); + } + EXPECT_EQ(LSN(PALF_BLOCK_SIZE), rp_st->submit_log_task_.next_to_submit_lsn_); + switch_flashback_to_append(leader, mode_version); + rp_st->block_submit(); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, MAX_LOG_BODY_SIZE)); + padding_tail_scn = leader.get_palf_handle_impl()->get_max_scn(); + LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + //更新replay的committed_end_lsn + rp_st->submit_log_task_.update_committed_end_lsn(max_lsn); + + } + // Test4: flashback到padding尾部, 先replay完,在执行flashback + { + int ret = OB_SUCCESS; + CLOG_LOG(WARN, "flashback to padding tailer case1"); + int64_t abs_timeout_us = 4*1000*1000; + // flashback_scn为padding尾 + SCN flashback_scn = SCN::minus(padding_tail_scn, 1); + switch_append_to_flashback(leader, mode_version); + EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->flashback(mode_version, flashback_scn, abs_timeout_us)); + // iterator看到的终点是padding日志尾 + iterator_end_lsn = LSN(PALF_BLOCK_SIZE); + // replay看到的committed位点是padding尾 + rp_st->unblock_submit(); + rp_st->submit_log_task_.next_to_submit_lsn_ = (iterator_end_lsn); + bool is_done = false; + while (!is_done) { + rp_sv.is_replay_done(ls_id, iterator_end_lsn, is_done); + usleep(10*1000); + CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); + } + is_done = false; + // 预期replay的next_to_submit_lsn是padding尾 + EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); + // 先执行replay的flashback + EXPECT_EQ(OB_SUCCESS, rp_st->flashback()); + // replay执行flashback后,next_to_submit_lsn是padding尾 + EXPECT_EQ(iterator_end_lsn, rp_st->submit_log_task_.next_to_submit_lsn_); + switch_flashback_to_append(leader, mode_version); + iterator_end_lsn = LSN(1000000000); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 100000)); + LSN max_lsn = leader.get_palf_handle_impl()->get_max_lsn(); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, max_lsn)); + + while (!is_done) { + rp_sv.is_replay_done(ls_id, max_lsn, is_done); + usleep(10*1000); + CLOG_LOG(WARN, "not replay done", KPC(rp_st), K(padding_header)); + rp_st->trigger_fetch_log(); + } + } +} } // unitest } // oceanbase diff --git a/src/logservice/replayservice/ob_log_replay_service.cpp b/src/logservice/replayservice/ob_log_replay_service.cpp index 7e885af3c..dfd7b92fa 100644 --- a/src/logservice/replayservice/ob_log_replay_service.cpp +++ b/src/logservice/replayservice/ob_log_replay_service.cpp @@ -1097,6 +1097,7 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta // getting committed_end_lsn first ensures palf has all logs until committed_end_lsn. CLOG_LOG(ERROR, "failed to get_committed_end_lsn", KR(ret), K(committed_end_lsn), KPC(replay_status)); } else if (replay_status->try_rdlock()) { + (void)submit_task->get_committed_end_lsn(committed_end_lsn); const int64_t start_ts = ObClockGenerator::getClock(); bool need_submit_log = true; int64_t count = 0; diff --git a/src/logservice/replayservice/ob_replay_status.cpp b/src/logservice/replayservice/ob_replay_status.cpp index 437a56658..36fc3a0cb 100644 --- a/src/logservice/replayservice/ob_replay_status.cpp +++ b/src/logservice/replayservice/ob_replay_status.cpp @@ -255,6 +255,10 @@ int ObReplayServiceSubmitTask::set_committed_end_lsn(const LSN &lsn) { int ret = OB_SUCCESS; ATOMIC_SET(&committed_end_lsn_.val_, lsn.val_); + if (next_to_submit_lsn_ > committed_end_lsn_) { + CLOG_LOG(INFO, "need rollback next_to_submit_lsn_", K(lsn), KPC(this)); + next_to_submit_lsn_ = committed_end_lsn_; + } return ret; } @@ -854,6 +858,9 @@ int ObReplayStatus::flashback_() CLOG_LOG(WARN, "get_end_lsn failed", K(ret), KPC(this)); } else if (OB_FAIL(submit_log_task_.set_committed_end_lsn(committed_end_lsn))) { CLOG_LOG(WARN, "set_committed_end_lsn failed", K(ret), KPC(this), K(committed_end_lsn)); + } else if (OB_FAIL(submit_task_to_replay_service_(submit_log_task_))) { + CLOG_LOG(ERROR, "failed to submit submit_log_task to replay service", K(submit_log_task_), + KPC(this), K(ret)); } else { // do nothing }