fix log archive checkpoint issuing LIST and TAGGING requests too frequently when delete mode is tagging
This commit is contained in:
parent
f1845ce8e9
commit
315a2dd5ec
13
deps/oblib/src/lib/restore/ob_storage_info.cpp
vendored
13
deps/oblib/src/lib/restore/ob_storage_info.cpp
vendored
@ -35,7 +35,8 @@ const char *get_storage_checksum_type_str(const ObStorageChecksumType &type)
|
||||
|
||||
//***********************ObObjectStorageInfo***************************
|
||||
ObObjectStorageInfo::ObObjectStorageInfo()
|
||||
: device_type_(ObStorageType::OB_STORAGE_MAX_TYPE),
|
||||
: delete_mode_(ObIStorageUtil::DELETE),
|
||||
device_type_(ObStorageType::OB_STORAGE_MAX_TYPE),
|
||||
checksum_type_(ObStorageChecksumType::OB_MD5_ALGO)
|
||||
{
|
||||
endpoint_[0] = '\0';
|
||||
@ -51,6 +52,7 @@ ObObjectStorageInfo::~ObObjectStorageInfo()
|
||||
|
||||
void ObObjectStorageInfo::reset()
|
||||
{
|
||||
delete_mode_ = ObIStorageUtil::DELETE;
|
||||
device_type_ = ObStorageType::OB_STORAGE_MAX_TYPE;
|
||||
checksum_type_ = ObStorageChecksumType::OB_MD5_ALGO;
|
||||
endpoint_[0] = '\0';
|
||||
@ -230,8 +232,8 @@ int ObObjectStorageInfo::parse_storage_info_(const char *storage_info, bool &has
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode) const
|
||||
//TODO(shifagndan): define delete mode as enum
|
||||
int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(delete_mode)) {
|
||||
@ -240,6 +242,10 @@ int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode) const
|
||||
} else if (0 != strcmp(delete_mode, "delete") && 0 != strcmp(delete_mode, "tagging")) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "delete mode is invalid", K(ret), K(delete_mode));
|
||||
} else if (0 == strcmp(delete_mode, "delete")) {
|
||||
delete_mode_ = ObIStorageUtil::DELETE;
|
||||
} else {
|
||||
delete_mode_ = ObIStorageUtil::TAGGING;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -324,6 +330,7 @@ int ObObjectStorageInfo::set_storage_info_field_(const char *info, char *field,
|
||||
int ObObjectStorageInfo::assign(const ObObjectStorageInfo &storage_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
delete_mode_ = storage_info.delete_mode_;
|
||||
device_type_ = storage_info.device_type_;
|
||||
checksum_type_ = storage_info.checksum_type_;
|
||||
MEMCPY(endpoint_, storage_info.endpoint_, sizeof(endpoint_));
|
||||
|
4
deps/oblib/src/lib/restore/ob_storage_info.h
vendored
4
deps/oblib/src/lib/restore/ob_storage_info.h
vendored
@ -76,6 +76,7 @@ public:
|
||||
ObStorageChecksumType get_checksum_type() const;
|
||||
const char *get_checksum_type_str() const;
|
||||
virtual int get_storage_info_str(char *storage_info, const int64_t info_len) const;
|
||||
int get_delete_mode() const { return delete_mode_; }
|
||||
|
||||
virtual bool is_valid() const;
|
||||
virtual void reset();
|
||||
@ -88,11 +89,12 @@ public:
|
||||
protected:
|
||||
virtual int get_access_key_(char *key_buf, const int64_t key_buf_len) const;
|
||||
virtual int parse_storage_info_(const char *storage_info, bool &has_appid);
|
||||
int check_delete_mode_(const char *delete_mode) const;
|
||||
int check_delete_mode_(const char *delete_mode);
|
||||
int set_checksum_type_(const char *checksum_type_str);
|
||||
int set_storage_info_field_(const char *info, char *field, const int64_t length);
|
||||
|
||||
public:
|
||||
int delete_mode_;
|
||||
// TODO: Rename device_type_ to storage_protocol_type_ for better clarity
|
||||
// Prefix in the storage_info string, such as 's3://', indicates the protocol used to access the target
|
||||
// Currently, both OBS and GCS are accessed via the s3 protocol,
|
||||
|
@ -71,7 +71,7 @@ void ObArchiveSchedulerService::run2()
|
||||
{
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t round = 0;
|
||||
share::ObLogArchiveStatus::STATUS last_log_archive_status = ObLogArchiveStatus::INVALID;
|
||||
ObArchiveRoundState round_state;
|
||||
|
||||
FLOG_INFO("ObArchiveSchedulerService run start");
|
||||
if (IS_NOT_INIT) {
|
||||
@ -80,18 +80,19 @@ void ObArchiveSchedulerService::run2()
|
||||
} else {
|
||||
while (true) {
|
||||
++round;
|
||||
round_state.set_invalid();
|
||||
ObCurTraceId::init(GCONF.self_addr_);
|
||||
FLOG_INFO("start do ObArchiveSchedulerService round", K(round));
|
||||
if (has_set_stop()) {
|
||||
tmp_ret = OB_IN_STOP_STATE;
|
||||
LOG_WARN_RET(tmp_ret, "exit for stop state", K(tmp_ret));
|
||||
break;
|
||||
} else if (OB_SUCCESS != (tmp_ret = process_())) {
|
||||
} else if (OB_SUCCESS != (tmp_ret = process_(round_state))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to do process", K(tmp_ret));
|
||||
}
|
||||
|
||||
int64_t checkpoint_interval = 1 * 1000 * 1000L;
|
||||
set_checkpoint_interval_(checkpoint_interval);
|
||||
int64_t checkpoint_interval = 120 * 1000 * 1000L;
|
||||
set_checkpoint_interval_(checkpoint_interval, round_state);
|
||||
idle();
|
||||
}
|
||||
}
|
||||
@ -259,7 +260,7 @@ int ObArchiveSchedulerService::stop_tenant_archive_(const uint64_t tenant_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveSchedulerService::process_()
|
||||
int ObArchiveSchedulerService::process_(ObArchiveRoundState &round_state)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -271,6 +272,7 @@ int ObArchiveSchedulerService::process_()
|
||||
const bool lock = false;
|
||||
bool no_dest = false;
|
||||
bool no_round = false;
|
||||
round_state.set_invalid();
|
||||
|
||||
ObArchiveHandler tenant_scheduler;
|
||||
const uint64_t tenant_id = tenant_id_;
|
||||
@ -307,6 +309,8 @@ int ObArchiveSchedulerService::process_()
|
||||
no_round = true;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
round_state = round.state_;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -315,17 +319,23 @@ int ObArchiveSchedulerService::process_()
|
||||
if (no_round || round.state_.is_stop() || round.state_.is_stopping()) {
|
||||
} else if (OB_FAIL(tenant_scheduler.disable_archive(dest_no))) {
|
||||
LOG_WARN("failed to disable archive", K(ret), K(tenant_id), K(dest_no), K(dest_state));
|
||||
} else {
|
||||
round_state.set_stopping();
|
||||
}
|
||||
} else if (dest_state.is_defer()) {
|
||||
if (no_round || round.state_.is_stop() || round.state_.is_suspend() || round.state_.is_suspending()) {
|
||||
} else if (OB_FAIL(tenant_scheduler.defer_archive(dest_no))) {
|
||||
LOG_WARN("failed to defer archive", K(ret), K(tenant_id), K(dest_no), K(dest_state));
|
||||
} else {
|
||||
round_state.set_suspending();
|
||||
}
|
||||
} else {
|
||||
// dest is enable
|
||||
if (no_round || round.state_.is_suspend() || round.state_.is_stop()) {
|
||||
if (OB_FAIL(tenant_scheduler.enable_archive(dest_no))) {
|
||||
LOG_WARN("failed to enable archive", K(ret), K(tenant_id), K(dest_no), K(dest_state));
|
||||
} else {
|
||||
round_state.set_beginning();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -371,22 +381,33 @@ int ObArchiveSchedulerService::get_all_tenant_ids_(common::ObIArray<uint64_t> &t
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObArchiveSchedulerService::set_checkpoint_interval_(const int64_t interval_us)
|
||||
void ObArchiveSchedulerService::set_checkpoint_interval_(const int64_t interval_us, const share::ObArchiveRoundState &round_state)
|
||||
{
|
||||
const int64_t max_idle_us = interval_us / 2 - RESERVED_FETCH_US;
|
||||
int64_t max_idle_us = interval_us / 2 - RESERVED_FETCH_US;
|
||||
int64_t idle_time_us = 0;
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
|
||||
const int64_t lag_target = tenant_config.is_valid() ? tenant_config->archive_lag_target : 0L;
|
||||
|
||||
if (interval_us <= 0) {
|
||||
idle_time_us = MAX_IDLE_INTERVAL_US;
|
||||
} else {
|
||||
if (max_idle_us <= MIN_IDLE_INTERVAL_US) {
|
||||
idle_time_us = MIN_IDLE_INTERVAL_US;
|
||||
idle_time_us = max(MIN_IDLE_INTERVAL_US, lag_target / 2);
|
||||
} else if (max_idle_us > MAX_IDLE_INTERVAL_US) {
|
||||
idle_time_us = MAX_IDLE_INTERVAL_US;
|
||||
idle_time_us = min(MAX_IDLE_INTERVAL_US, max(lag_target / 2, MIN_IDLE_INTERVAL_US));
|
||||
} else {
|
||||
idle_time_us = max_idle_us;
|
||||
}
|
||||
}
|
||||
|
||||
if (idle_time_us > FAST_IDLE_INTERVAL_US
|
||||
&& (round_state.is_prepare()
|
||||
|| round_state.is_beginning()
|
||||
|| round_state.is_suspending()
|
||||
|| round_state.is_stopping())) {
|
||||
idle_time_us = FAST_IDLE_INTERVAL_US;
|
||||
}
|
||||
|
||||
if (idle_time_us != get_idle_time()) {
|
||||
FLOG_INFO("change idle_time_us", "idle_time_ts_", get_idle_time(), K(idle_time_us));
|
||||
set_idle_time(idle_time_us);
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "lib/mysqlclient/ob_isql_client.h"
|
||||
#include "lib/container/ob_iarray.h"
|
||||
#include "share/backup/ob_backup_struct.h"
|
||||
#include "share/backup/ob_archive_struct.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -41,8 +42,7 @@ public:
|
||||
const int64_t RESERVED_FETCH_US = 10 * 1000 * 1000; // 10s, used for fetch observer log archive status
|
||||
const int64_t MIN_IDLE_INTERVAL_US = 2 * 1000 * 1000; // 2s
|
||||
const int64_t FAST_IDLE_INTERVAL_US = 10 * 1000 * 1000; // 10s, used during BEGINNING or STOPPING
|
||||
//const int64_t MAX_IDLE_INTERVAL_US = 60 * 1000 * 1000; // 60s
|
||||
const int64_t MAX_IDLE_INTERVAL_US = 10 * 1000 * 1000; // 60s
|
||||
const int64_t MAX_IDLE_INTERVAL_US = 60 * 1000 * 1000; // 60s
|
||||
DEFINE_MTL_FUNC(ObArchiveSchedulerService);
|
||||
int init();
|
||||
void run2() override;
|
||||
@ -64,7 +64,7 @@ public:
|
||||
int stop_archive(const uint64_t tenant_id, const common::ObIArray<uint64_t> &archive_tenant_ids);
|
||||
|
||||
private:
|
||||
int process_();
|
||||
int process_(share::ObArchiveRoundState &round_state);
|
||||
int start_tenant_archive_(const uint64_t tenant_id);
|
||||
// Return the first error that failed to start archive if force_start is true. Otherwise,
|
||||
// ignore all error.
|
||||
@ -75,7 +75,7 @@ private:
|
||||
int stop_tenant_archive_(const uint64_t tenant_id);
|
||||
int get_all_tenant_ids_(common::ObIArray<uint64_t> &tenantid_array);
|
||||
|
||||
void set_checkpoint_interval_(const int64_t interval_us);
|
||||
void set_checkpoint_interval_(const int64_t interval_us, const share::ObArchiveRoundState &round_state);
|
||||
int open_tenant_archive_mode_(const common::ObIArray<uint64_t> &tenant_ids_array);
|
||||
int open_tenant_archive_mode_(const uint64_t tenant_id);
|
||||
int close_tenant_archive_mode_(const common::ObIArray<uint64_t> &tenant_ids_array);
|
||||
|
@ -129,7 +129,7 @@ static int record_piece_extend_info(
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int record_piece_checkpoint(const ObTenantArchivePieceAttr &piece_info, const ObArchiveStore &store)
|
||||
static int record_piece_checkpoint(const ObTenantArchiveRoundAttr &old_round_info, const ObTenantArchivePieceAttr &piece_info, const ObArchiveStore &store)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!(piece_info.status_.is_active()
|
||||
@ -147,7 +147,7 @@ static int record_piece_checkpoint(const ObTenantArchivePieceAttr &piece_info, c
|
||||
checkpoint_desc.checkpoint_scn_ = piece_info.checkpoint_scn_;
|
||||
checkpoint_desc.max_scn_ = piece_info.max_scn_;
|
||||
checkpoint_desc.end_scn_ = piece_info.end_scn_;
|
||||
if (OB_FAIL(store.write_piece_checkpoint(piece_info.key_.dest_id_, piece_info.key_.round_id_, piece_info.key_.piece_id_, 0, checkpoint_desc))) {
|
||||
if (OB_FAIL(store.write_piece_checkpoint(piece_info.key_.dest_id_, piece_info.key_.round_id_, piece_info.key_.piece_id_, 0, old_round_info.checkpoint_scn_, checkpoint_desc))) {
|
||||
LOG_WARN("failed to write piece checkpoint info file", K(ret), K(piece_info), K(checkpoint_desc));
|
||||
}
|
||||
}
|
||||
@ -233,6 +233,7 @@ static int record_piece_inner_placeholder(const ObTenantArchivePieceAttr &piece_
|
||||
static int record_single_piece_info(const ObTenantArchivePieceAttr &piece_info, const ObArchiveStore &store)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool is_exist = false;
|
||||
ObSinglePieceDesc single_piece_desc;
|
||||
if (!piece_info.status_.is_frozen()) {
|
||||
@ -244,7 +245,7 @@ static int record_single_piece_info(const ObTenantArchivePieceAttr &piece_info,
|
||||
} else if (OB_FAIL(store.write_single_piece(piece_info.key_.dest_id_, piece_info.key_.round_id_, piece_info.key_.piece_id_, single_piece_desc))) {
|
||||
LOG_WARN("failed to write single piece info file", K(ret), K(piece_info), K(single_piece_desc));
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -340,7 +341,7 @@ static int piece_generated_cb(
|
||||
LOG_WARN("failed to record piece start", K(ret), K(old_round_info), K(piece));
|
||||
} else if (OB_FAIL(record_piece_extend_info(*sql_proxy, old_round_info, result, piece.piece_info_, store))) {
|
||||
LOG_WARN("failed to record piece extend info", K(ret));
|
||||
} else if (OB_FAIL(record_piece_checkpoint(piece_info, store))) {
|
||||
} else if (OB_FAIL(record_piece_checkpoint(old_round_info, piece_info, store))) {
|
||||
LOG_WARN("failed to record piece checkpoint", K(ret), K(old_round_info), K(piece));
|
||||
} else if (OB_FAIL(record_piece_info(piece, store))) {
|
||||
LOG_WARN("failed to record piece info", K(ret), K(old_round_info), K(piece));
|
||||
|
@ -66,6 +66,7 @@ int ObDelHisCheckpointFileOp::func(const dirent *entry)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "invalid list entry, d_name is null", K(ret));
|
||||
} else {
|
||||
handled_file_num_++;
|
||||
uint64_t checkpoint_scn = 0;
|
||||
ObBackupPath full_path = path_;
|
||||
ObBackupIoAdapter io_util;
|
||||
@ -135,13 +136,16 @@ int ObArchiveCheckpointMgr::check_is_tagging_(const ObBackupStorageInfo *storage
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_tagging = false;
|
||||
if (OB_STORAGE_OSS == storage_info_ -> device_type_) {
|
||||
//TODO(zhixing.yh) Adapt the analytic interface in storage_info
|
||||
if (OB_ISNULL(storage_info)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("storage info is nullptr", K(ret));
|
||||
} else if (ObIStorageUtil::TAGGING == storage_info->get_delete_mode()) {
|
||||
is_tagging = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveCheckpointMgr::write(const uint64_t checkpoint_scn) const
|
||||
int ObArchiveCheckpointMgr::write(const uint64_t old_checkpoint_scn, const uint64_t checkpoint_scn) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -149,24 +153,23 @@ int ObArchiveCheckpointMgr::write(const uint64_t checkpoint_scn) const
|
||||
ObBackupPath full_path = path_; //checkpoint scn file path
|
||||
ObBackupPath dir_path = path_; //checkpoint dir file path
|
||||
ObBackupIoAdapter io_util;
|
||||
bool is_tagging = false;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("Archive checkpoint mgr not init", K(ret));
|
||||
} else if (checkpoint_scn <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument!", K(ret), K(checkpoint_scn));
|
||||
} else if (OB_FAIL(check_is_tagging_(storage_info_, is_tagging))) {
|
||||
LOG_WARN("failed to judge delete mode", K(ret));
|
||||
} else if (old_checkpoint_scn >= checkpoint_scn) { //do nothing
|
||||
} else if (OB_FAIL(full_path.join_checkpoint_info_file(file_name_, checkpoint_scn, type_))) {
|
||||
LOG_WARN("failed to get piece checkpoint file path",
|
||||
K(ret), K(checkpoint_scn), KP(file_name_), K(full_path), K(type_));
|
||||
} else if (OB_FAIL(write_checkpoint_file_(full_path))) {
|
||||
LOG_WARN("failed to write checkpoint file", K(ret), K(full_path));
|
||||
}
|
||||
//if the delete mode is not 'tagging', need to list files for deleting smaller checkpoint scn files
|
||||
if (OB_SUCC(ret) && !is_tagging && OB_TMP_FAIL(del_history_files_(dir_path, checkpoint_scn))) {
|
||||
LOG_WARN("failed to delete files", K(ret), K(dir_path), K(checkpoint_scn), K(tmp_ret));
|
||||
// delete only the last ckpt file:
|
||||
// 1. reduce listing requests
|
||||
// 2. guarantee all expired files can be eventually removed
|
||||
} else if (OB_TMP_FAIL(del_last_ckpt_file_(dir_path, old_checkpoint_scn))) {
|
||||
LOG_WARN("failed to delete last ckpt file", K(ret), K(dir_path), K(old_checkpoint_scn), K(tmp_ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -202,16 +205,40 @@ int ObArchiveCheckpointMgr::get_max_checkpoint_scn_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveCheckpointMgr::del_history_files_(
|
||||
int ObArchiveCheckpointMgr::del_last_ckpt_file_(
|
||||
const ObBackupPath &dir_path,
|
||||
const uint64_t old_checkpoint_scn) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBackupIoAdapter io_util;
|
||||
ObBackupPath file_path = dir_path;
|
||||
|
||||
if (dir_path.is_empty()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("dir path is empty", K(ret), K(dir_path));
|
||||
} else if (0 == old_checkpoint_scn) { // 0 is reserved
|
||||
} else if (OB_FAIL(file_path.join_checkpoint_info_file(file_name_, old_checkpoint_scn, type_))) {
|
||||
LOG_WARN("fail to join checkpoint info file", K(ret), K(old_checkpoint_scn));
|
||||
} else if (OB_FAIL(io_util.del_file(file_path.get_obstr(), storage_info_))) {
|
||||
LOG_WARN("fail to delete file", K(ret), K(file_path));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveCheckpointMgr::del_history_files(
|
||||
const uint64_t write_checkpoint_scn) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBackupIoAdapter io_util;
|
||||
ObDelHisCheckpointFileOp del_his_file_op(write_checkpoint_scn, dir_path, file_name_, type_, storage_info_);
|
||||
if (OB_FAIL(io_util.list_files(dir_path.get_ptr(), storage_info_, del_his_file_op))) {
|
||||
LOG_WARN("failed to del history checkpoint file",
|
||||
K(ret), K(dir_path), K(write_checkpoint_scn), K(path_), KP(file_name_), K(type_));
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObArchiveCheckpointMgr not inited", K(ret));
|
||||
} else {
|
||||
ObDelHisCheckpointFileOp del_his_file_op(write_checkpoint_scn, path_, file_name_, type_, storage_info_);
|
||||
if (OB_FAIL(io_util.list_files(path_.get_ptr(), storage_info_, del_his_file_op))) {
|
||||
LOG_WARN("failed to del history checkpoint file",
|
||||
K(ret), K(path_), K(write_checkpoint_scn), K(path_), KP(file_name_), K(type_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -55,17 +55,19 @@ public:
|
||||
path_(path),
|
||||
file_name_(file_name),
|
||||
type_(type),
|
||||
storage_info_(storage_info) {}
|
||||
storage_info_(storage_info),
|
||||
handled_file_num_(0) {}
|
||||
virtual ~ObDelHisCheckpointFileOp() {}
|
||||
bool is_valid() const;
|
||||
int func(const dirent *entry) ;
|
||||
|
||||
int64_t get_handed_file_num() { return handled_file_num_; };
|
||||
private:
|
||||
uint64_t checkpoint_scn_;
|
||||
ObBackupPath path_;
|
||||
const char *file_name_;
|
||||
ObBackupFileSuffix type_;
|
||||
const share::ObBackupStorageInfo *storage_info_;
|
||||
int64_t handled_file_num_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObDelHisCheckpointFileOp);
|
||||
};
|
||||
@ -87,11 +89,12 @@ public:
|
||||
const ObBackupStorageInfo *storage_info);
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
int write(const uint64_t checkpoint_scn) const;
|
||||
int write(const uint64_t old_checkpoint_scn, const uint64_t checkpoint_scn) const;
|
||||
int read(uint64_t &checkpoint_scn) const;
|
||||
int del_history_files(const uint64_t write_checkpoint_scn) const;
|
||||
private:
|
||||
int get_max_checkpoint_scn_(const ObBackupPath &path, uint64_t &max_checkpoint_scn) const;
|
||||
int del_history_files_(const ObBackupPath &dir_path, const uint64_t write_checkpoint_scn) const;
|
||||
int del_last_ckpt_file_(const ObBackupPath &dir_path, const uint64_t write_checkpoint_scn) const;
|
||||
int write_checkpoint_file_(const ObBackupPath &path) const;
|
||||
int check_is_tagging_(const ObBackupStorageInfo *storage_info, bool &is_tagging) const;
|
||||
|
||||
|
@ -892,7 +892,8 @@ int ObArchiveStore::read_piece_checkpoint(const int64_t dest_id, const int64_t r
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const ObPieceCheckpointDesc &desc) const
|
||||
int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id,
|
||||
const int64_t file_id, const share::SCN &old_checkpoint_scn, const ObPieceCheckpointDesc &desc) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBackupPath full_path;
|
||||
@ -919,13 +920,33 @@ int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t
|
||||
LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id));
|
||||
} else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) {
|
||||
LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path));
|
||||
} else if (OB_FAIL(mgr.write(desc.checkpoint_scn_.get_val_for_inner_table_field()))) {
|
||||
} else if (OB_FAIL(mgr.write(old_checkpoint_scn.get_val_for_inner_table_field(),
|
||||
desc.checkpoint_scn_.get_val_for_inner_table_field()))) {
|
||||
LOG_WARN("failed to write checkpoint info", K(ret), K(desc));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveStore::delete_piece_his_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const uint64_t checkpoint_scn) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBackupPath dir_path;
|
||||
const ObBackupDest &dest = get_backup_dest();
|
||||
ObArchiveCheckpointMgr mgr;
|
||||
if (!is_init()) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObArchiveStore not init", K(ret));
|
||||
} else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(dest, dest_id, round_id, piece_id, dir_path))) {
|
||||
LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id));
|
||||
} else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) {
|
||||
LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path));
|
||||
} else if (OB_FAIL(mgr.del_history_files(checkpoint_scn))) {
|
||||
LOG_WARN("fail to delete all checkpoint files", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveStore::read_piece_checkpoint(ObPieceCheckpointDesc &desc) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -378,7 +378,8 @@ public:
|
||||
// oss://archive/d[dest_id]r[round_id]p[piece_id]/checkpoint/checkpoint_info.[file_id].obarc
|
||||
int is_piece_checkpoint_file_exist(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, bool &is_exist) const;
|
||||
int read_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, ObPieceCheckpointDesc &desc) const;
|
||||
int write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const ObPieceCheckpointDesc &desc) const;
|
||||
int write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const share::SCN &old_checkpoint_scn, const ObPieceCheckpointDesc &desc) const;
|
||||
int delete_piece_his_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const uint64_t checkpoint_scn) const;
|
||||
// oss://[user_specified_path]/checkpoint/checkpoint_info.0.obarc
|
||||
int read_piece_checkpoint(ObPieceCheckpointDesc &desc) const;
|
||||
// oss://archive/d[dest_id]r[round_id]p[piece_id]/piece_d[dest_id]r[round_id]p[piece_id]_20220601T120000_20220602T120000.obarc
|
||||
|
@ -6227,13 +6227,14 @@ int ObLSBackupComplementLogTask::copy_checkpoint_info(const ObTenantArchivePiece
|
||||
ObPieceCheckpointDesc desc;
|
||||
bool is_exist = false;
|
||||
const int64_t file_id = 0;
|
||||
const share::SCN old_ckpt_scn = SCN::min_scn(); //set 0, but will not delete
|
||||
if (OB_FAIL(src_store.is_piece_checkpoint_file_exist(src_dest_id, round_id, piece_id, file_id, is_exist))) {
|
||||
LOG_WARN("failed to check is piece checkpoint file file exist", K(ret), K(src_dest_id), K(round_id), K(piece_id));
|
||||
} else if (!is_exist) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(src_store.read_piece_checkpoint(src_dest_id, round_id, piece_id, file_id, desc))) {
|
||||
LOG_WARN("failed to read piece checkpoint", K(ret), K(piece_attr));
|
||||
} else if (OB_FAIL(dest_store.write_piece_checkpoint(dest_dest_id, round_id, piece_id, file_id, desc))) {
|
||||
} else if (OB_FAIL(dest_store.write_piece_checkpoint(dest_dest_id, round_id, piece_id, file_id, old_ckpt_scn, desc))) {
|
||||
LOG_WARN("failed to write piece checkpoint", K(ret), K(piece_attr));
|
||||
}
|
||||
return ret;
|
||||
|
@ -245,17 +245,17 @@ int TestArchiveCheckpointMgr::test_write_and_read_checkpoint(const ObStorageType
|
||||
uint64_t checkpoint = 0;
|
||||
if (OB_FAIL(mgr.init(root_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, &storage_info_))) {
|
||||
LOG_WARN("failed to init checkpoint mgr", K(ret), K(root_path));
|
||||
} else if (OB_FAIL(mgr.write(10))) {
|
||||
} else if (OB_FAIL(mgr.write(10, 9))) {
|
||||
LOG_WARN("failed to write files", K(ret), K(root_path));
|
||||
} else if (OB_FAIL(mgr.write(9))) {
|
||||
} else if (OB_FAIL(mgr.write(9, 8))) {
|
||||
LOG_WARN("failed to write files", K(ret), K(root_path));
|
||||
} else if (OB_FAIL(mgr.read(checkpoint))) {
|
||||
LOG_WARN("failed to read files", K(ret));
|
||||
} else if (checkpoint != 10) {
|
||||
ret = OB_ERROR;
|
||||
} else if (OB_FAIL(mgr.write(100))) {
|
||||
} else if (OB_FAIL(mgr.write(100, 99))) {
|
||||
LOG_WARN("failed to write files", K(ret), K(root_path));
|
||||
} else if (OB_FAIL(mgr.write(50))) {
|
||||
} else if (OB_FAIL(mgr.write(50, 49))) {
|
||||
LOG_WARN("failed to write files", K(ret), K(root_path));
|
||||
} else if (OB_FAIL(mgr.read(checkpoint))) {
|
||||
LOG_WARN("failed to read files", K(ret));
|
||||
|
Loading…
x
Reference in New Issue
Block a user