@ -93,7 +93,7 @@ int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_in
|
||||
} else if (round_info.checkpoint_scn_ > max_checkpoint_scn_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("backwards, checkpoint scn is bigger than limit scn", K(ret), K(round_info), K_(max_checkpoint_scn));
|
||||
} else if (OB_FAIL(count_(summary, counter))) {
|
||||
} else if (OB_FAIL(count_(round_info, summary, counter))) {
|
||||
LOG_WARN("failed to to count", K(ret), K(round_info), K(summary));
|
||||
} else if (OB_FAIL(gen_new_round_info_(round_info, counter, result.new_round_info_, need_checkpoint))) {
|
||||
LOG_WARN("failed to decide next state", K(ret), K(round_info), K(counter), K(summary));
|
||||
@ -105,7 +105,10 @@ int ObDestRoundCheckpointer::checkpoint(const ObTenantArchiveRoundAttr &round_in
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDestRoundCheckpointer::count_(const ObDestRoundSummary &summary, ObDestRoundCheckpointer::Counter &counter) const
|
||||
int ObDestRoundCheckpointer::count_(
|
||||
const ObTenantArchiveRoundAttr &old_round_info,
|
||||
const ObDestRoundSummary &summary,
|
||||
ObDestRoundCheckpointer::Counter &counter) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -155,9 +158,13 @@ int ObDestRoundCheckpointer::count_(const ObDestRoundSummary &summary, ObDestRou
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (counter.not_start_cnt_ > 0) {
|
||||
counter.max_active_piece_id_ = 0;
|
||||
if (OB_SUCC(ret)) {
|
||||
// adjust scn
|
||||
counter.checkpoint_scn_ = MAX(old_round_info.start_scn_, counter.checkpoint_scn_);
|
||||
counter.max_scn_ = MAX(old_round_info.start_scn_, counter.max_scn_);
|
||||
if (counter.not_start_cnt_ > 0) {
|
||||
counter.max_active_piece_id_ = old_round_info.used_piece_id_;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("print count result", K(ret), K(counter));
|
||||
@ -190,9 +197,6 @@ int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr
|
||||
} else if (old_round_info.state_.is_beginning()) {
|
||||
if (counter.not_start_cnt_ > 0) {
|
||||
need_checkpoint = false;
|
||||
} else if (next_checkpoint_scn < old_round_info.start_scn_) {
|
||||
need_checkpoint = false;
|
||||
} else if (OB_FALSE_IT(new_round_info.checkpoint_scn_ = next_checkpoint_scn)) {
|
||||
} else if (counter.interrupted_cnt_ > 0) {
|
||||
ObSqlString comment;
|
||||
new_round_info.state_.set_interrupted();
|
||||
@ -202,6 +206,9 @@ int ObDestRoundCheckpointer::gen_new_round_info_(const ObTenantArchiveRoundAttr
|
||||
LOG_WARN("failed to assign comment", K(ret), K(new_round_info), K(counter), K(comment));
|
||||
}
|
||||
LOG_INFO("switch to INTERRUPTED state", K(ret), K(old_round_info), K(counter), K(new_round_info));
|
||||
} else if (next_checkpoint_scn <= old_round_info.start_scn_) {
|
||||
need_checkpoint = false;
|
||||
} else if (OB_FALSE_IT(new_round_info.checkpoint_scn_ = next_checkpoint_scn)) {
|
||||
} else if (counter.doing_cnt_ == actual_count) {
|
||||
new_round_info.state_.set_doing();
|
||||
LOG_INFO("switch to DOING state", K(old_round_info), K(counter), K(new_round_info));
|
||||
@ -404,7 +411,11 @@ int ObDestRoundCheckpointer::generate_one_piece_(const ObTenantArchiveRoundAttr
|
||||
} else if (piece_id == max_active_piece_id) {
|
||||
piece.piece_info_.checkpoint_scn_ = MIN(new_round_info.checkpoint_scn_, piece.piece_info_.checkpoint_scn_);
|
||||
piece.piece_info_.status_.set_active();
|
||||
piece.piece_info_.file_status_ = ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE;
|
||||
if (piece.piece_info_.checkpoint_scn_ > piece.piece_info_.start_scn_) {
|
||||
piece.piece_info_.file_status_ = ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE;
|
||||
} else {
|
||||
piece.piece_info_.file_status_ = ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE;
|
||||
}
|
||||
} else {
|
||||
// piece_id > max_active_piece_id
|
||||
piece.piece_info_.status_.set_active();
|
||||
|
||||
@ -104,7 +104,7 @@ private:
|
||||
|
||||
bool can_do_checkpoint_(const ObTenantArchiveRoundAttr &round_info) const;
|
||||
|
||||
int count_(const ObDestRoundSummary &summary, Counter &counter) const;
|
||||
int count_(const ObTenantArchiveRoundAttr &old_round_info, const ObDestRoundSummary &summary, Counter &counter) const;
|
||||
int gen_new_round_info_(const ObTenantArchiveRoundAttr &old_round_info, const Counter &counter,
|
||||
ObTenantArchiveRoundAttr &new_round_info, bool &need_checkpoint) const;
|
||||
|
||||
|
||||
@ -396,7 +396,7 @@ ObPieceInfoDesc::ObPieceInfoDesc()
|
||||
bool ObPieceInfoDesc::is_valid() const
|
||||
{
|
||||
return 0 <= dest_id_ && OB_START_LOG_ARCHIVE_ROUND_ID < round_id_
|
||||
&& 0 < piece_id_ && !filelist_.empty();
|
||||
&& 0 < piece_id_;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
|
||||
#define USING_LOG_PREFIX SHARE
|
||||
#include "share/backup/ob_archive_struct.h"
|
||||
#include "share/backup/ob_tenant_archive_mgr.h"
|
||||
#include "lib/ob_define.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include "lib/utility/ob_macro_utils.h"
|
||||
@ -279,6 +280,43 @@ int ObTenantArchiveRoundAttr::generate_next_round(const int64_t incarnation,
|
||||
next_round.piece_switch_interval_ = piece_switch_interval;
|
||||
next_round.path_ = path;
|
||||
|
||||
next_round.frozen_input_bytes_ = 0;
|
||||
next_round.frozen_input_bytes_ = 0;
|
||||
next_round.active_input_bytes_ = 0;
|
||||
next_round.active_output_bytes_ = 0;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantArchiveRoundAttr::generate_first_piece(ObTenantArchivePieceAttr &first_piece) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!state_.is_beginning()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("round state is not BEGINNING", K(ret), K(*this));
|
||||
} else if (OB_FAIL(first_piece.set_path(path_))) {
|
||||
LOG_WARN("failed to set path", K(ret), K(*this));
|
||||
} else if (OB_FAIL(ObTenantArchiveMgr::decide_piece_end_scn(start_scn_, base_piece_id_, piece_switch_interval_, base_piece_id_, first_piece.end_scn_))) {
|
||||
LOG_WARN("failed to get end scn", K(ret), K(*this));
|
||||
} else {
|
||||
first_piece.key_.tenant_id_ = key_.tenant_id_;
|
||||
first_piece.key_.dest_id_ = dest_id_;
|
||||
first_piece.key_.round_id_ = round_id_;
|
||||
first_piece.key_.piece_id_ = base_piece_id_;
|
||||
first_piece.incarnation_ = incarnation_;
|
||||
first_piece.dest_no_ = key_.dest_no_;
|
||||
first_piece.file_count_ = 0;
|
||||
first_piece.start_scn_ = start_scn_;
|
||||
first_piece.checkpoint_scn_ = start_scn_;
|
||||
first_piece.max_scn_ = start_scn_;
|
||||
first_piece.compatible_ = compatible_;
|
||||
first_piece.input_bytes_ = 0;
|
||||
first_piece.output_bytes_ = 0;
|
||||
first_piece.set_active();
|
||||
first_piece.file_status_ = ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE;
|
||||
first_piece.cp_file_id_ = 0;
|
||||
first_piece.cp_file_offset_ = 0;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -103,6 +103,7 @@ struct ObArchiveRoundState
|
||||
|
||||
// current round
|
||||
struct ObTenantArchiveHisRoundAttr;
|
||||
struct ObTenantArchivePieceAttr;
|
||||
// Define dest round table row structure.
|
||||
struct ObTenantArchiveRoundAttr final : public ObIInnerTableRow
|
||||
{
|
||||
@ -253,6 +254,8 @@ struct ObTenantArchiveRoundAttr final : public ObIInnerTableRow
|
||||
int generate_next_round(const int64_t incarnation, const int64_t dest_id,
|
||||
const int64_t piece_switch_interval, const ObBackupPathString &path,
|
||||
ObTenantArchiveRoundAttr &next_round) const;
|
||||
// Generate first piece of the round with status 'ACTIVE' and file_status 'INCOMPLETE'
|
||||
int generate_first_piece(ObTenantArchivePieceAttr &first_piece) const;
|
||||
ObTenantArchiveHisRoundAttr generate_his_round() const;
|
||||
// Generate initial round, and set status to 'PREPARE'.
|
||||
static int generate_initial_round(const Key &key, const int64_t incarnation,
|
||||
|
||||
@ -88,7 +88,9 @@ int ObTenantArchiveMgr::decide_piece_id(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid start_piece_id", K(ret), K(piece_start_scn), K(start_piece_id), K(piece_switch_interval), K(scn));
|
||||
} else {
|
||||
piece_id = (scn.convert_to_ts() - piece_start_scn.convert_to_ts()) / piece_switch_interval + start_piece_id;
|
||||
const int64_t piece_switch_interval_ns = piece_switch_interval * 1000;
|
||||
const uint64_t delta = scn.get_val_for_inner_table_field() - piece_start_scn.get_val_for_inner_table_field();
|
||||
piece_id = delta / piece_switch_interval_ns + start_piece_id;
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -111,8 +113,10 @@ int ObTenantArchiveMgr::decide_piece_start_scn(
|
||||
} else if (0 >= piece_switch_interval) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid piece_switch_interval", K(ret), K(piece_start_scn), K(start_piece_id), K(piece_switch_interval), K(piece_id));
|
||||
} else if (OB_FAIL(start_scn.convert_from_ts(piece_start_scn.convert_to_ts() + (piece_id - start_piece_id) * piece_switch_interval))) {
|
||||
LOG_WARN("fail to set start scn", K(ret), K(piece_start_scn), K(piece_id), K(start_piece_id), K(piece_switch_interval));
|
||||
} else {
|
||||
const int64_t piece_switch_interval_ns = piece_switch_interval * 1000;
|
||||
const uint64_t delta = piece_switch_interval_ns * (piece_id - start_piece_id);
|
||||
start_scn = SCN::plus(piece_start_scn, delta);
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -134,65 +138,6 @@ int ObTenantArchiveMgr::decide_piece_end_scn(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantArchiveMgr::decide_first_piece_start_scn(
|
||||
const SCN &start_scn,
|
||||
const int64_t piece_switch_interval,
|
||||
SCN &piece_start_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
const int64_t ONE_DAY = 24 * 3600 * 1000000L; // us
|
||||
const int64_t ONE_HOUR = 3600 * 1000000L; // us
|
||||
const int64_t ONE_MINUTE = 60 * 1000000L; // us
|
||||
const int64_t TEN_SECONDS = 10 * 60 * 1000000L; // us
|
||||
// If 'piece_switch_interval' is equal or bigger than one day, then it must be an integer multiple of day.
|
||||
if (ONE_DAY <= piece_switch_interval) {
|
||||
if (0 != piece_switch_interval % ONE_DAY) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("piece switch interval must be an integer multiple of day", K(ret), K(start_scn), K(piece_switch_interval));
|
||||
} else if (OB_FAIL(piece_start_scn.convert_from_ts((start_scn.convert_to_ts() / ONE_DAY) * ONE_DAY))) {
|
||||
LOG_WARN("fail to set piece start scn", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
// If 'piece_switch_interval' is equal or bigger than one hour, then it must be an integer multiple of hour.
|
||||
else if (ONE_HOUR <= piece_switch_interval) {
|
||||
if (0 != piece_switch_interval % ONE_HOUR) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("piece switch interval must be an integer multiple of hour", K(ret), K(start_scn), K(piece_switch_interval));
|
||||
} else if (OB_FAIL(piece_start_scn.convert_from_ts((start_scn.convert_to_ts() / ONE_HOUR) * ONE_HOUR))) {
|
||||
LOG_WARN("fail to set piece start scn", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
// If 'piece_switch_interval' is equal or bigger than one minute, then it must be an integer multiple of minute.
|
||||
else if (ONE_MINUTE <= piece_switch_interval) {
|
||||
if (0 != piece_switch_interval % ONE_MINUTE) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("piece switch interval must be an integer multiple of minute", K(ret), K(start_scn), K(piece_switch_interval));
|
||||
} else if (OB_FAIL(piece_start_scn.convert_from_ts((start_scn.convert_to_ts() / ONE_MINUTE) * ONE_MINUTE))) {
|
||||
LOG_WARN("fail to set piece start scn", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
// If 'piece_switch_interval' is equal or bigger than 10 seconds, then it must be an integer multiple of 10 seconds.
|
||||
else if (TEN_SECONDS <= piece_switch_interval) {
|
||||
if (0 != piece_switch_interval % TEN_SECONDS) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("piece switch interval must be an integer multiple of 10 seconds", K(ret), K(start_scn), K(piece_switch_interval));
|
||||
} else if (OB_FAIL(piece_start_scn.convert_from_ts((start_scn.convert_to_ts() / TEN_SECONDS) * TEN_SECONDS))) {
|
||||
LOG_WARN("fail to set piece start scn", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
else {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("piece switch interval is not valid", K(ret), K(start_scn), K(piece_switch_interval));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantArchiveMgr::timestamp_to_day(const int64_t ts, int64_t &day)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -56,10 +56,6 @@ public:
|
||||
const int64_t piece_switch_interval,
|
||||
const int64_t piece_id,
|
||||
SCN &end_scn);
|
||||
static int decide_first_piece_start_scn(
|
||||
const SCN &start_scn,
|
||||
const int64_t piece_switch_interval,
|
||||
SCN &piece_start_scn);
|
||||
// 'ts' must be with unit us.
|
||||
static int timestamp_to_day(const int64_t ts, int64_t &day);
|
||||
};
|
||||
|
||||
@ -83,13 +83,18 @@ int ObArchiveRoundHandler::start_trans_(common::ObMySQLTransaction &trans)
|
||||
int ObArchiveRoundHandler::start_archive(const ObTenantArchiveRoundAttr &round, ObTenantArchiveRoundAttr &new_round)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObTenantArchivePieceAttr first_piece;
|
||||
ObArray<ObTenantArchivePieceAttr> pieces;
|
||||
if (!round.state_.is_prepare()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid round", K(ret), K(round));
|
||||
} else if (OB_FAIL(prepare_beginning_dest_round_(round, new_round))) {
|
||||
LOG_WARN("failed to prepare beginning dest round", K(ret), K(round));
|
||||
} else if (OB_FAIL(checkpoint_to(round, new_round))) {
|
||||
} else if (OB_FAIL(new_round.generate_first_piece(first_piece))) {
|
||||
LOG_WARN("failed to generate first piece", K(ret));
|
||||
} else if (OB_FAIL(pieces.push_back(first_piece))) {
|
||||
LOG_WARN("failed to push back first piece", K(ret));
|
||||
} else if (OB_FAIL(checkpoint_to(round, new_round, pieces))) {
|
||||
LOG_WARN("failed to checkpoint", K(ret), K(round), K(new_round));
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -43,6 +43,7 @@ public:
|
||||
{
|
||||
const int64_t ONE_SECOND_US = 1000 * 1000;
|
||||
struct tm timeinfo;
|
||||
memset(&timeinfo, 0, sizeof(timeinfo));
|
||||
strptime(time_str.ptr(), "%Y-%m-%d %H:%M:%S", &timeinfo);
|
||||
time_t timestamp = mktime(&timeinfo);
|
||||
return timestamp * ONE_SECOND_US;
|
||||
@ -613,7 +614,7 @@ TEST_F(ArchiveCheckpointerTest, in_beginning_02)
|
||||
100,
|
||||
10,
|
||||
ObArchivePieceStatus::active(),
|
||||
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE,
|
||||
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE,
|
||||
expect_piece);
|
||||
|
||||
|
||||
@ -646,7 +647,7 @@ TEST_F(ArchiveCheckpointerTest, in_beginning_02)
|
||||
100,
|
||||
10,
|
||||
ObArchivePieceStatus::active(),
|
||||
ObBackupFileStatus::STATUS::BACKUP_FILE_AVAILABLE,
|
||||
ObBackupFileStatus::STATUS::BACKUP_FILE_INCOMPLETE,
|
||||
expect_piece.piece_info_);
|
||||
|
||||
ObDestRoundCheckpointer::GeneratedLSPiece ls_piece_1001 = test.gen_checkpoint_ls_piece(
|
||||
|
||||
Reference in New Issue
Block a user