diff --git a/src/share/backup/ob_archive_checkpoint.cpp b/src/share/backup/ob_archive_checkpoint.cpp index 74b25013b..4e8848748 100644 --- a/src/share/backup/ob_archive_checkpoint.cpp +++ b/src/share/backup/ob_archive_checkpoint.cpp @@ -100,7 +100,7 @@ int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_in LOG_WARN("backwards, checkpoint scn is bigger than limit scn", K(ret), K(round_info), K_(max_checkpoint_scn)); } else if (OB_FAIL(count_(round_info, summary, counter))) { LOG_WARN("failed to to count", K(ret), K(round_info), K(summary)); - } else if (OB_FAIL(gen_new_round_info_(round_info, counter, result.new_round_info_, need_checkpoint))) { + } else if (OB_FAIL(gen_new_round_info_(round_info, summary, counter, result.new_round_info_, need_checkpoint))) { LOG_WARN("failed to decide next state", K(ret), K(round_info), K(counter), K(summary)); } else if (!need_checkpoint) { } else if (OB_FAIL(checkpoint_(round_info, summary, result))) { @@ -177,8 +177,81 @@ int ObDestRoundCheckpointer::count_( return ret; } -int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundCheckpointer::Counter &counter, - ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const +int ObDestRoundCheckpointer::calc_next_checkpoint_scn_( + const ObTenantArchiveRoundAttr &old_round_info, + const ObDestRoundSummary &summary, + const Counter &counter, + SCN &next_checkpoint_scn) const +{ + int ret = OB_SUCCESS; + SCN max_avail_piece_start_scn; + SCN max_avail_piece_checkpoint_scn; + int64_t max_avail_piece_id = 0; + // The next checkpoint scn can not exceed the max_checkpoint_scn_ which takes the GTS. + next_checkpoint_scn = MIN(max_checkpoint_scn_, counter.checkpoint_scn_); + if (OB_FAIL(ObTenantArchiveMgr::decide_piece_id( + old_round_info.start_scn_, + old_round_info.base_piece_id_, + old_round_info.piece_switch_interval_, + next_checkpoint_scn, + max_avail_piece_id))) { + LOG_WARN("failed to calc max available piece id", K(ret), K(old_round_info), K(next_checkpoint_scn)); + } else if (OB_FAIL(ObTenantArchiveMgr::decide_piece_start_scn( + old_round_info.start_scn_, + old_round_info.base_piece_id_, + old_round_info.piece_switch_interval_, + max_avail_piece_id, + max_avail_piece_start_scn))) { + LOG_WARN("failed to calc max available piece start scn", K(ret), K(old_round_info), K(max_avail_piece_id)); + } + + // Consider 2 log streams, the log groups info are as following : + // 1001: [500, 600], [700, 1200] + // 1002: [500, 900] + // Then the reasonable next round checkpoint scn is 900. However, suppose the piece switch end scn is 1000, + // if we specify to restore until 800, the result is that it will return and cannot be recovered. As the + // log with range [700, 800] is in next piece, but the file status is BACKUP_FILE_INCOMPLETE which we will + // ignore during restore. In this case, the next round checkpoint scn will be adjust to 600, instead of 900. + max_avail_piece_checkpoint_scn = SCN::max_scn(); + const ObArray &ls_round_list = summary.ls_round_list_; + for (int64_t i = 0; OB_SUCC(ret) && i < ls_round_list.count(); i++) { + const ObLSDestRoundSummary &ls_round = ls_round_list.at(i); + // search the piece + int64_t idx = ls_round.get_piece_idx(max_avail_piece_id); + if (-1 == idx) { + LOG_INFO("ls piece not found", K(ret), K(max_avail_piece_id), K(ls_round)); + } else { + bool last_piece = false; + const ObLSDestRoundSummary::OnePiece &ls_piece = ls_round.piece_list_.at(idx); + if (OB_FAIL(ls_round.check_is_last_piece_for_deleted_ls(max_avail_piece_id, last_piece))) { + LOG_WARN("failed to check is last piece for deleted ls", K(ret)); + } else if (last_piece) { + // If the ls is deleted, and this is the last piece. It should not + // affect the checkpoint_scn. + // Mark the last piece deleted for deleted ls. For example, piece 10 and 11 is found of + // a deleted ls for current checkpoint, piece 10 is not marked with deleted, but piece 11 + // is marked with deleted. + // do nothing. + } else { + // checkpoint scn may be smaller than start scn for empty piece. + max_avail_piece_checkpoint_scn = MAX(max_avail_piece_start_scn, MIN(max_avail_piece_checkpoint_scn, ls_piece.checkpoint_scn_)); + } + } + } + + if (OB_SUCC(ret)) { + next_checkpoint_scn = MIN(next_checkpoint_scn, max_avail_piece_checkpoint_scn); + } + + return ret; +} + +int ObDestRoundCheckpointer::gen_new_round_info_( + const ObTenantArchiveRoundAttr &old_round_info, + const ObDestRoundSummary &summary, + const ObDestRoundCheckpointer::Counter &counter, + ObTenantArchiveRoundAttr &new_round_info, + bool &need_checkpoint) const { int ret = OB_SUCCESS; // Current existing log stream count. @@ -193,11 +266,11 @@ int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr old_round_info.piece_switch_interval_, counter.max_scn_, new_round_info.used_piece_id_))) { LOG_WARN("failed to calc MAX piece id", K(ret), K(old_round_info), K(counter)); } else if (OB_FALSE_IT(new_round_info.max_scn_ = counter.max_scn_)) { - } else if (OB_FALSE_IT(next_checkpoint_scn = MIN(max_checkpoint_scn_, counter.checkpoint_scn_))) { - // Checkpoint can not over limit ts. However, if old round goes into STOPPING, then we will not - // move checkpoint_scn on. + } else if (OB_FAIL(calc_next_checkpoint_scn_(old_round_info, summary, counter, next_checkpoint_scn))) { + LOG_WARN("failed to calc next checkpoint scn", K(ret), K(old_round_info), K(summary), K(counter)); } + if (OB_FAIL(ret)) { } else if (old_round_info.state_.is_beginning()) { if (counter.not_start_cnt_ > 0) { diff --git a/src/share/backup/ob_archive_checkpoint.h b/src/share/backup/ob_archive_checkpoint.h index d058922d1..b8c8d2e11 100644 --- a/src/share/backup/ob_archive_checkpoint.h +++ b/src/share/backup/ob_archive_checkpoint.h @@ -106,8 +106,17 @@ private: bool can_do_checkpoint_(const ObTenantArchiveRoundAttr &round_info) const; int count_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundSummary &summary, Counter &counter) const; - int gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const Counter &counter, - ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const; + int calc_next_checkpoint_scn_( + const ObTenantArchiveRoundAttr &old_round_info, + const ObDestRoundSummary &summary, + const Counter &counter, + SCN &next_checkpoint_scn) const; + int gen_new_round_info_( + const ObTenantArchiveRoundAttr &old_round_info, + const ObDestRoundSummary &summary, + const Counter &counter, + ObTenantArchiveRoundAttr &new_round_info, + bool &need_checkpoint) const; // do checkpoint to checkpoint_ts. int checkpoint_(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary, diff --git a/unittest/rootserver/test_archive_checkpoint.cpp b/unittest/rootserver/test_archive_checkpoint.cpp index 7cbe0ce6d..c819a3c2e 100644 --- a/unittest/rootserver/test_archive_checkpoint.cpp +++ b/unittest/rootserver/test_archive_checkpoint.cpp @@ -2184,6 +2184,260 @@ TEST_F(ArchiveCheckpointerTest, in_doing_06) ASSERT_EQ(g_call_cnt, 3); } +TEST_F(ArchiveCheckpointerTest, in_doing_07) +{ + // old round's status is DOING. + ObTenantArchiveRoundAttr old_round; + fill_round( + ObArchiveRoundState::doing(), + "2022-01-01 00:00:00", /* start time */ + "2022-01-01 00:00:20", /* checkpoint time */ + "2022-01-01 00:00:30", /* max time */ + 1, /* used piece id */ + 0, /* frozen_input_bytes */ + 0, /* frozen_output_bytes */ + 100, /* active_input_bytes */ + 10, /* active_output_bytes */ + 0, /* deleted_input_bytes */ + 0, /* deleted_output_bytes */ + old_round); + + // 2 log streams are archiving. + ObDestRoundSummary summary; + // log stream 1001 is archiving. + ObLSDestRoundSummary ls_1001; + ObArchiveLSPieceSummary piece_1001_1; + fill_archive_ls_piece( + 1001, /* ls id */ + false, /* is deleted */ + 1, /* piece id */ + ObArchiveRoundState::doing(), /* state */ + "2022-01-01 00:00:00", /* start time */ + "2022-01-01 00:00:50", /* checkpoint time */ + 0, /* min_lsn */ + 2000, /* max_lsn */ + 200, /* input_bytes */ + 20, /* output_bytes */ + piece_1001_1); + ASSERT_EQ(ls_1001.add_one_piece(piece_1001_1), OB_SUCCESS); + + // log stream 1002 is archiving. + ObLSDestRoundSummary ls_1002; + ObArchiveLSPieceSummary piece_1002_1; + fill_archive_ls_piece( + 1002, /* ls id */ + false, /* is deleted */ + 1, /* piece id */ + ObArchiveRoundState::doing(), /* state */ + "2022-01-01 00:00:00", /* start time */ + "2022-01-01 00:00:40", /* checkpoint time */ + 0, /* min_lsn */ + 1000, /* max_lsn */ + 100, /* input_bytes */ + 10, /* output_bytes */ + piece_1002_1); + + ObArchiveLSPieceSummary piece_1002_2; + fill_archive_ls_piece( + 1002, /* ls id */ + false, /* is deleted */ + 2, /* piece id */ + ObArchiveRoundState::doing(), /* state */ + "2022-01-01 00:01:00", /* start time */ + "2022-01-01 00:01:30", /* checkpoint time */ + 1000, /* min_lsn */ + 2000, /* max_lsn */ + 100, /* input_bytes */ + 10, /* output_bytes */ + piece_1002_2); + ASSERT_EQ(ls_1002.add_one_piece(piece_1002_1), OB_SUCCESS); + ASSERT_EQ(ls_1002.add_one_piece(piece_1002_2), OB_SUCCESS); + ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1001), OB_SUCCESS); + ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1002), OB_SUCCESS); + + + // All log streams are archiving, the next status is DOING. + class MockRoundHandler final: public ObArchiveRoundHandler + { + public: + int checkpoint_to( + const ObTenantArchiveRoundAttr &old_round, + const ObTenantArchiveRoundAttr &new_round, + const common::ObIArray &pieces) override + { + int ret = OB_SUCCESS; + g_call_cnt++; + ArchiveCheckpointerTest test; + ObTenantArchiveRoundAttr expect_round; + test.fill_new_round( + old_round, + ObArchiveRoundState::doing(), /* state */ + "2022-01-01 00:00:40", /* checkpoint_time */ + "2022-01-01 00:01:30", /* max_time */ + 2, /* used_piece_id */ + 0, /* frozen_input_bytes */ + 0, /* frozen_output_bytes */ + 400, /* active_input_bytes */ + 40, /* active_output_bytes */ + 0, /* deleted_input_bytes */ + 0, /* deleted_output_bytes */ + expect_round); + + ObTenantArchivePieceAttr expect_piece_1; + test.fill_piece( + old_round, + 1, /* piece id */ + "2022-01-01 00:00:40", /* checkpoint time */ + "2022-01-01 00:00:50", /* max time */ + 300, /* input_bytes */ + 30, /* output_bytes */ + ObArchivePieceStatus::active(), /* piece status */ + ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE, /* file status */ + expect_piece_1); + + ObTenantArchivePieceAttr expect_piece_2; + test.fill_piece( + old_round, + 2, /* piece id */ + "2022-01-01 00:01:00", /* checkpoint time */ + "2022-01-01 00:01:30", /* max time */ + 100, /* input_bytes */ + 10, /* output_bytes */ + ObArchivePieceStatus::active(), /* piece status */ + ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE, /* file status */ + expect_piece_2); + + + ret = test.compare_two_rounds(new_round, expect_round); + if (OB_SUCC(ret)) { + if (pieces.count() != 2) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid pieces count", K(ret), K(pieces)); + } + } + + if (OB_SUCC(ret)) { + const ObTenantArchivePieceAttr &piece_1 = pieces.at(0); + if (OB_FAIL(test.compare_two_pieces(piece_1, expect_piece_1))) { + LOG_WARN("not equal pieces", K(ret), K(piece_1), K(expect_piece_1)); + } + } + + if (OB_SUCC(ret)) { + const ObTenantArchivePieceAttr &piece_2 = pieces.at(1); + if (OB_FAIL(test.compare_two_pieces(piece_2, expect_piece_2))) { + LOG_WARN("not equal pieces", K(ret), K(piece_2), K(expect_piece_2)); + } + } + return ret; + } + }; + + ObDestRoundCheckpointer::PieceGeneratedCb gen_piece_cb = + [](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObDestRoundCheckpointer::Result &result, const ObDestRoundCheckpointer::GeneratedPiece &piece) + { + int ret = OB_SUCCESS; + g_call_cnt++; + ArchiveCheckpointerTest test; + ObDestRoundCheckpointer::GeneratedPiece expect_piece; + if (piece.piece_info_.key_.piece_id_ == 1) { + test.fill_piece( + old_round, + 1, + "2022-01-01 00:00:40", + "2022-01-01 00:00:50", + 300, + 30, + ObArchivePieceStatus::active(), + ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE, + expect_piece.piece_info_); + + ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1001 = test.gen_checkpoint_ls_piece( + 1001, + "2022-01-01 00:00:00", + "2022-01-01 00:00:50", + 0, + 2000, + 200, + 20); + + ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece( + 1002, + "2022-01-01 00:00:00", + "2022-01-01 00:00:40", + 0, + 1000, + 100, + 10); + + expect_piece.ls_piece_list_.push_back(ls_piece_1001); + expect_piece.ls_piece_list_.push_back(ls_piece_1002); + } else { + test.fill_piece( + old_round, + 2, + "2022-01-01 00:01:00", + "2022-01-01 00:01:30", + 100, + 10, + ObArchivePieceStatus::active(), + ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE, + expect_piece.piece_info_); + + ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece( + 1002, + "2022-01-01 00:01:00", + "2022-01-01 00:01:30", + 1000, + 2000, + 100, + 10); + expect_piece.ls_piece_list_.push_back(ls_piece_1002); + } + + ret = test.compare_two_checkpoint_pieces(piece, expect_piece); + return ret; + }; + + ObDestRoundCheckpointer::RoundCheckpointCb round_cb = + [](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObTenantArchiveRoundAttr &new_round) + { + int ret = OB_SUCCESS; + g_call_cnt++; + ArchiveCheckpointerTest test; + ObTenantArchiveRoundAttr expect_round; + test.fill_new_round( + old_round, + ObArchiveRoundState::doing(), + "2022-01-01 00:00:40", + "2022-01-01 00:01:30", + 2, + 0, + 0, + 400, + 40, + 0, + 0, + expect_round); + + ret = test.compare_two_rounds(new_round, expect_round); + return ret; + }; + + int ret = OB_SUCCESS; + g_call_cnt = 0; + MockRoundHandler mock_handler; + ObDestRoundCheckpointer checkpointer; + share::SCN limit_scn; + (void)limit_scn.convert_for_logservice(convert_timestr_2_scn("2022-01-01 00:00:45")); + ret = checkpointer.init(&mock_handler, gen_piece_cb, round_cb, limit_scn); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = checkpointer.checkpoint(old_round, summary); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(g_call_cnt, 4); +} + TEST_F(ArchiveCheckpointerTest, in_stopping_01) {