[CP] fix round checkpoint scn is not consistent with piece checkpoint scn

This commit is contained in:
wxhwang 2024-04-28 06:18:16 +00:00 committed by ob-robot
parent 57b9848525
commit ead3e7cb36
3 changed files with 344 additions and 8 deletions

View File

@ -100,7 +100,7 @@ int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_in
LOG_WARN("backwards, checkpoint scn is bigger than limit scn", K(ret), K(round_info), K_(max_checkpoint_scn));
} else if (OB_FAIL(count_(round_info, summary, counter))) {
LOG_WARN("failed to to count", K(ret), K(round_info), K(summary));
} else if (OB_FAIL(gen_new_round_info_(round_info, counter, result.new_round_info_, need_checkpoint))) {
} else if (OB_FAIL(gen_new_round_info_(round_info, summary, counter, result.new_round_info_, need_checkpoint))) {
LOG_WARN("failed to decide next state", K(ret), K(round_info), K(counter), K(summary));
} else if (!need_checkpoint) {
} else if (OB_FAIL(checkpoint_(round_info, summary, result))) {
@ -177,8 +177,81 @@ int ObDestRoundCheckpointer::count_(
return ret;
}
int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundCheckpointer::Counter &counter,
ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const
int ObDestRoundCheckpointer::calc_next_checkpoint_scn_(
const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const Counter &counter,
SCN &next_checkpoint_scn) const
{
int ret = OB_SUCCESS;
SCN max_avail_piece_start_scn;
SCN max_avail_piece_checkpoint_scn;
int64_t max_avail_piece_id = 0;
// The next checkpoint scn can not exceed the max_checkpoint_scn_ which takes the GTS.
next_checkpoint_scn = MIN(max_checkpoint_scn_, counter.checkpoint_scn_);
if (OB_FAIL(ObTenantArchiveMgr::decide_piece_id(
old_round_info.start_scn_,
old_round_info.base_piece_id_,
old_round_info.piece_switch_interval_,
next_checkpoint_scn,
max_avail_piece_id))) {
LOG_WARN("failed to calc max available piece id", K(ret), K(old_round_info), K(next_checkpoint_scn));
} else if (OB_FAIL(ObTenantArchiveMgr::decide_piece_start_scn(
old_round_info.start_scn_,
old_round_info.base_piece_id_,
old_round_info.piece_switch_interval_,
max_avail_piece_id,
max_avail_piece_start_scn))) {
LOG_WARN("failed to calc max available piece start scn", K(ret), K(old_round_info), K(max_avail_piece_id));
}
// Consider 2 log streams, the log groups info are as following :
// 1001: [500, 600], [700, 1200]
// 1002: [500, 900]
// Then the reasonable next round checkpoint scn is 900. However, suppose the piece switch end scn is 1000,
// if we specify to restore until 800, the result is that it will return and cannot be recovered. As the
// log with range [700, 800] is in next piece, but the file status is BACKUP_FILE_INCOMPLETE which we will
// ignore during restore. In this case, the next round checkpoint scn will be adjust to 600, instead of 900.
max_avail_piece_checkpoint_scn = SCN::max_scn();
const ObArray<ObLSDestRoundSummary> &ls_round_list = summary.ls_round_list_;
for (int64_t i = 0; OB_SUCC(ret) && i < ls_round_list.count(); i++) {
const ObLSDestRoundSummary &ls_round = ls_round_list.at(i);
// search the piece
int64_t idx = ls_round.get_piece_idx(max_avail_piece_id);
if (-1 == idx) {
LOG_INFO("ls piece not found", K(ret), K(max_avail_piece_id), K(ls_round));
} else {
bool last_piece = false;
const ObLSDestRoundSummary::OnePiece &ls_piece = ls_round.piece_list_.at(idx);
if (OB_FAIL(ls_round.check_is_last_piece_for_deleted_ls(max_avail_piece_id, last_piece))) {
LOG_WARN("failed to check is last piece for deleted ls", K(ret));
} else if (last_piece) {
// If the ls is deleted, and this is the last piece. It should not
// affect the checkpoint_scn.
// Mark the last piece deleted for deleted ls. For example, piece 10 and 11 is found of
// a deleted ls for current checkpoint, piece 10 is not marked with deleted, but piece 11
// is marked with deleted.
// do nothing.
} else {
// checkpoint scn may be smaller than start scn for empty piece.
max_avail_piece_checkpoint_scn = MAX(max_avail_piece_start_scn, MIN(max_avail_piece_checkpoint_scn, ls_piece.checkpoint_scn_));
}
}
}
if (OB_SUCC(ret)) {
next_checkpoint_scn = MIN(next_checkpoint_scn, max_avail_piece_checkpoint_scn);
}
return ret;
}
int ObDestRoundCheckpointer::gen_new_round_info_(
const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const ObDestRoundCheckpointer::Counter &counter,
ObTenantArchiveRoundAttr &new_round_info,
bool &need_checkpoint) const
{
int ret = OB_SUCCESS;
// Current existing log stream count.
@ -193,11 +266,11 @@ int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr
old_round_info.piece_switch_interval_, counter.max_scn_, new_round_info.used_piece_id_))) {
LOG_WARN("failed to calc MAX piece id", K(ret), K(old_round_info), K(counter));
} else if (OB_FALSE_IT(new_round_info.max_scn_ = counter.max_scn_)) {
} else if (OB_FALSE_IT(next_checkpoint_scn = MIN(max_checkpoint_scn_, counter.checkpoint_scn_))) {
// Checkpoint can not over limit ts. However, if old round goes into STOPPING, then we will not
// move checkpoint_scn on.
} else if (OB_FAIL(calc_next_checkpoint_scn_(old_round_info, summary, counter, next_checkpoint_scn))) {
LOG_WARN("failed to calc next checkpoint scn", K(ret), K(old_round_info), K(summary), K(counter));
}
if (OB_FAIL(ret)) {
} else if (old_round_info.state_.is_beginning()) {
if (counter.not_start_cnt_ > 0) {

View File

@ -106,8 +106,17 @@ private:
bool can_do_checkpoint_(const ObTenantArchiveRoundAttr &round_info) const;
int count_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundSummary &summary, Counter &counter) const;
int gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const Counter &counter,
ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const;
int calc_next_checkpoint_scn_(
const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const Counter &counter,
SCN &next_checkpoint_scn) const;
int gen_new_round_info_(
const ObTenantArchiveRoundAttr &old_round_info,
const ObDestRoundSummary &summary,
const Counter &counter,
ObTenantArchiveRoundAttr &new_round_info,
bool &need_checkpoint) const;
// do checkpoint to checkpoint_ts.
int checkpoint_(const ObTenantArchiveRoundAttr &round_info, const ObDestRoundSummary &summary,

View File

@ -2184,6 +2184,260 @@ TEST_F(ArchiveCheckpointerTest, in_doing_06)
ASSERT_EQ(g_call_cnt, 3);
}
TEST_F(ArchiveCheckpointerTest, in_doing_07)
{
// old round's status is DOING.
ObTenantArchiveRoundAttr old_round;
fill_round(
ObArchiveRoundState::doing(),
"2022-01-01 00:00:00", /* start time */
"2022-01-01 00:00:20", /* checkpoint time */
"2022-01-01 00:00:30", /* max time */
1, /* used piece id */
0, /* frozen_input_bytes */
0, /* frozen_output_bytes */
100, /* active_input_bytes */
10, /* active_output_bytes */
0, /* deleted_input_bytes */
0, /* deleted_output_bytes */
old_round);
// 2 log streams are archiving.
ObDestRoundSummary summary;
// log stream 1001 is archiving.
ObLSDestRoundSummary ls_1001;
ObArchiveLSPieceSummary piece_1001_1;
fill_archive_ls_piece(
1001, /* ls id */
false, /* is deleted */
1, /* piece id */
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:00:00", /* start time */
"2022-01-01 00:00:50", /* checkpoint time */
0, /* min_lsn */
2000, /* max_lsn */
200, /* input_bytes */
20, /* output_bytes */
piece_1001_1);
ASSERT_EQ(ls_1001.add_one_piece(piece_1001_1), OB_SUCCESS);
// log stream 1002 is archiving.
ObLSDestRoundSummary ls_1002;
ObArchiveLSPieceSummary piece_1002_1;
fill_archive_ls_piece(
1002, /* ls id */
false, /* is deleted */
1, /* piece id */
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:00:00", /* start time */
"2022-01-01 00:00:40", /* checkpoint time */
0, /* min_lsn */
1000, /* max_lsn */
100, /* input_bytes */
10, /* output_bytes */
piece_1002_1);
ObArchiveLSPieceSummary piece_1002_2;
fill_archive_ls_piece(
1002, /* ls id */
false, /* is deleted */
2, /* piece id */
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:01:00", /* start time */
"2022-01-01 00:01:30", /* checkpoint time */
1000, /* min_lsn */
2000, /* max_lsn */
100, /* input_bytes */
10, /* output_bytes */
piece_1002_2);
ASSERT_EQ(ls_1002.add_one_piece(piece_1002_1), OB_SUCCESS);
ASSERT_EQ(ls_1002.add_one_piece(piece_1002_2), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1001), OB_SUCCESS);
ASSERT_EQ(summary.add_ls_dest_round_summary(ls_1002), OB_SUCCESS);
// All log streams are archiving, the next status is DOING.
class MockRoundHandler final: public ObArchiveRoundHandler
{
public:
int checkpoint_to(
const ObTenantArchiveRoundAttr &old_round,
const ObTenantArchiveRoundAttr &new_round,
const common::ObIArray<ObTenantArchivePieceAttr> &pieces) override
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::doing(), /* state */
"2022-01-01 00:00:40", /* checkpoint_time */
"2022-01-01 00:01:30", /* max_time */
2, /* used_piece_id */
0, /* frozen_input_bytes */
0, /* frozen_output_bytes */
400, /* active_input_bytes */
40, /* active_output_bytes */
0, /* deleted_input_bytes */
0, /* deleted_output_bytes */
expect_round);
ObTenantArchivePieceAttr expect_piece_1;
test.fill_piece(
old_round,
1, /* piece id */
"2022-01-01 00:00:40", /* checkpoint time */
"2022-01-01 00:00:50", /* max time */
300, /* input_bytes */
30, /* output_bytes */
ObArchivePieceStatus::active(), /* piece status */
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE, /* file status */
expect_piece_1);
ObTenantArchivePieceAttr expect_piece_2;
test.fill_piece(
old_round,
2, /* piece id */
"2022-01-01 00:01:00", /* checkpoint time */
"2022-01-01 00:01:30", /* max time */
100, /* input_bytes */
10, /* output_bytes */
ObArchivePieceStatus::active(), /* piece status */
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE, /* file status */
expect_piece_2);
ret = test.compare_two_rounds(new_round, expect_round);
if (OB_SUCC(ret)) {
if (pieces.count() != 2) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pieces count", K(ret), K(pieces));
}
}
if (OB_SUCC(ret)) {
const ObTenantArchivePieceAttr &piece_1 = pieces.at(0);
if (OB_FAIL(test.compare_two_pieces(piece_1, expect_piece_1))) {
LOG_WARN("not equal pieces", K(ret), K(piece_1), K(expect_piece_1));
}
}
if (OB_SUCC(ret)) {
const ObTenantArchivePieceAttr &piece_2 = pieces.at(1);
if (OB_FAIL(test.compare_two_pieces(piece_2, expect_piece_2))) {
LOG_WARN("not equal pieces", K(ret), K(piece_2), K(expect_piece_2));
}
}
return ret;
}
};
ObDestRoundCheckpointer::PieceGeneratedCb gen_piece_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObDestRoundCheckpointer::Result &result, const ObDestRoundCheckpointer::GeneratedPiece &piece)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObDestRoundCheckpointer::GeneratedPiece expect_piece;
if (piece.piece_info_.key_.piece_id_ == 1) {
test.fill_piece(
old_round,
1,
"2022-01-01 00:00:40",
"2022-01-01 00:00:50",
300,
30,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE,
expect_piece.piece_info_);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1001 = test.gen_checkpoint_ls_piece(
1001,
"2022-01-01 00:00:00",
"2022-01-01 00:00:50",
0,
2000,
200,
20);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece(
1002,
"2022-01-01 00:00:00",
"2022-01-01 00:00:40",
0,
1000,
100,
10);
expect_piece.ls_piece_list_.push_back(ls_piece_1001);
expect_piece.ls_piece_list_.push_back(ls_piece_1002);
} else {
test.fill_piece(
old_round,
2,
"2022-01-01 00:01:00",
"2022-01-01 00:01:30",
100,
10,
ObArchivePieceStatus::active(),
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE,
expect_piece.piece_info_);
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1002 = test.gen_checkpoint_ls_piece(
1002,
"2022-01-01 00:01:00",
"2022-01-01 00:01:30",
1000,
2000,
100,
10);
expect_piece.ls_piece_list_.push_back(ls_piece_1002);
}
ret = test.compare_two_checkpoint_pieces(piece, expect_piece);
return ret;
};
ObDestRoundCheckpointer::RoundCheckpointCb round_cb =
[](common::ObISQLClient *proxy, const ObTenantArchiveRoundAttr &old_round, const ObTenantArchiveRoundAttr &new_round)
{
int ret = OB_SUCCESS;
g_call_cnt++;
ArchiveCheckpointerTest test;
ObTenantArchiveRoundAttr expect_round;
test.fill_new_round(
old_round,
ObArchiveRoundState::doing(),
"2022-01-01 00:00:40",
"2022-01-01 00:01:30",
2,
0,
0,
400,
40,
0,
0,
expect_round);
ret = test.compare_two_rounds(new_round, expect_round);
return ret;
};
int ret = OB_SUCCESS;
g_call_cnt = 0;
MockRoundHandler mock_handler;
ObDestRoundCheckpointer checkpointer;
share::SCN limit_scn;
(void)limit_scn.convert_for_logservice(convert_timestr_2_scn("2022-01-01 00:00:45"));
ret = checkpointer.init(&mock_handler, gen_piece_cb, round_cb, limit_scn);
ASSERT_EQ(OB_SUCCESS, ret);
ret = checkpointer.checkpoint(old_round, summary);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(g_call_cnt, 4);
}
TEST_F(ArchiveCheckpointerTest, in_stopping_01)
{