diff --git a/deps/oblib/src/lib/restore/ob_storage_info.cpp b/deps/oblib/src/lib/restore/ob_storage_info.cpp index 591c55412..412e75391 100644 --- a/deps/oblib/src/lib/restore/ob_storage_info.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_info.cpp @@ -35,7 +35,8 @@ const char *get_storage_checksum_type_str(const ObStorageChecksumType &type) //***********************ObObjectStorageInfo*************************** ObObjectStorageInfo::ObObjectStorageInfo() - : device_type_(ObStorageType::OB_STORAGE_MAX_TYPE), + : delete_mode_(ObIStorageUtil::DELETE), + device_type_(ObStorageType::OB_STORAGE_MAX_TYPE), checksum_type_(ObStorageChecksumType::OB_MD5_ALGO) { endpoint_[0] = '\0'; @@ -51,6 +52,7 @@ ObObjectStorageInfo::~ObObjectStorageInfo() void ObObjectStorageInfo::reset() { + delete_mode_ = ObIStorageUtil::DELETE; device_type_ = ObStorageType::OB_STORAGE_MAX_TYPE; checksum_type_ = ObStorageChecksumType::OB_MD5_ALGO; endpoint_[0] = '\0'; @@ -230,8 +232,8 @@ int ObObjectStorageInfo::parse_storage_info_(const char *storage_info, bool &has } return ret; } - -int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode) const +//TODO(shifagndan): define delete mode as enum +int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode) { int ret = OB_SUCCESS; if (OB_ISNULL(delete_mode)) { @@ -240,6 +242,10 @@ int ObObjectStorageInfo::check_delete_mode_(const char *delete_mode) const } else if (0 != strcmp(delete_mode, "delete") && 0 != strcmp(delete_mode, "tagging")) { ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "delete mode is invalid", K(ret), K(delete_mode)); + } else if (0 == strcmp(delete_mode, "delete")) { + delete_mode_ = ObIStorageUtil::DELETE; + } else { + delete_mode_ = ObIStorageUtil::TAGGING; } return ret; } @@ -324,6 +330,7 @@ int ObObjectStorageInfo::set_storage_info_field_(const char *info, char *field, int ObObjectStorageInfo::assign(const ObObjectStorageInfo &storage_info) { int ret = OB_SUCCESS; + delete_mode_ = storage_info.delete_mode_; device_type_ = storage_info.device_type_; checksum_type_ = storage_info.checksum_type_; MEMCPY(endpoint_, storage_info.endpoint_, sizeof(endpoint_)); diff --git a/deps/oblib/src/lib/restore/ob_storage_info.h b/deps/oblib/src/lib/restore/ob_storage_info.h index 5a851b33a..8daee6299 100644 --- a/deps/oblib/src/lib/restore/ob_storage_info.h +++ b/deps/oblib/src/lib/restore/ob_storage_info.h @@ -76,6 +76,7 @@ public: ObStorageChecksumType get_checksum_type() const; const char *get_checksum_type_str() const; virtual int get_storage_info_str(char *storage_info, const int64_t info_len) const; + int get_delete_mode() const { return delete_mode_; } virtual bool is_valid() const; virtual void reset(); @@ -88,11 +89,12 @@ public: protected: virtual int get_access_key_(char *key_buf, const int64_t key_buf_len) const; virtual int parse_storage_info_(const char *storage_info, bool &has_appid); - int check_delete_mode_(const char *delete_mode) const; + int check_delete_mode_(const char *delete_mode); int set_checksum_type_(const char *checksum_type_str); int set_storage_info_field_(const char *info, char *field, const int64_t length); public: + int delete_mode_; // TODO: Rename device_type_ to storage_protocol_type_ for better clarity // Prefix in the storage_info string, such as 's3://', indicates the protocol used to access the target // Currently, both OBS and GCS are accessed via the s3 protocol, diff --git a/src/rootserver/backup/ob_archive_scheduler_service.cpp b/src/rootserver/backup/ob_archive_scheduler_service.cpp index d71094012..9d0aa38ce 100644 --- a/src/rootserver/backup/ob_archive_scheduler_service.cpp +++ b/src/rootserver/backup/ob_archive_scheduler_service.cpp @@ -71,7 +71,7 @@ void ObArchiveSchedulerService::run2() { int tmp_ret = OB_SUCCESS; int64_t round = 0; - share::ObLogArchiveStatus::STATUS last_log_archive_status = ObLogArchiveStatus::INVALID; + ObArchiveRoundState round_state; FLOG_INFO("ObArchiveSchedulerService run start"); if (IS_NOT_INIT) { @@ -80,18 +80,19 @@ void ObArchiveSchedulerService::run2() } else { while (true) { ++round; + round_state.set_invalid(); ObCurTraceId::init(GCONF.self_addr_); FLOG_INFO("start do ObArchiveSchedulerService round", K(round)); if (has_set_stop()) { tmp_ret = OB_IN_STOP_STATE; LOG_WARN_RET(tmp_ret, "exit for stop state", K(tmp_ret)); break; - } else if (OB_SUCCESS != (tmp_ret = process_())) { + } else if (OB_SUCCESS != (tmp_ret = process_(round_state))) { LOG_WARN_RET(tmp_ret, "failed to do process", K(tmp_ret)); } - int64_t checkpoint_interval = 1 * 1000 * 1000L; - set_checkpoint_interval_(checkpoint_interval); + int64_t checkpoint_interval = 120 * 1000 * 1000L; + set_checkpoint_interval_(checkpoint_interval, round_state); idle(); } } @@ -259,7 +260,7 @@ int ObArchiveSchedulerService::stop_tenant_archive_(const uint64_t tenant_id) return ret; } -int ObArchiveSchedulerService::process_() +int ObArchiveSchedulerService::process_(ObArchiveRoundState &round_state) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -271,6 +272,7 @@ int ObArchiveSchedulerService::process_() const bool lock = false; bool no_dest = false; bool no_round = false; + round_state.set_invalid(); ObArchiveHandler tenant_scheduler; const uint64_t tenant_id = tenant_id_; @@ -307,6 +309,8 @@ int ObArchiveSchedulerService::process_() no_round = true; ret = OB_SUCCESS; } + } else { + round_state = round.state_; } if (OB_FAIL(ret)) { @@ -315,17 +319,23 @@ int ObArchiveSchedulerService::process_() if (no_round || round.state_.is_stop() || round.state_.is_stopping()) { } else if (OB_FAIL(tenant_scheduler.disable_archive(dest_no))) { LOG_WARN("failed to disable archive", K(ret), K(tenant_id), K(dest_no), K(dest_state)); + } else { + round_state.set_stopping(); } } else if (dest_state.is_defer()) { if (no_round || round.state_.is_stop() || round.state_.is_suspend() || round.state_.is_suspending()) { } else if (OB_FAIL(tenant_scheduler.defer_archive(dest_no))) { LOG_WARN("failed to defer archive", K(ret), K(tenant_id), K(dest_no), K(dest_state)); + } else { + round_state.set_suspending(); } } else { // dest is enable if (no_round || round.state_.is_suspend() || round.state_.is_stop()) { if (OB_FAIL(tenant_scheduler.enable_archive(dest_no))) { LOG_WARN("failed to enable archive", K(ret), K(tenant_id), K(dest_no), K(dest_state)); + } else { + round_state.set_beginning(); } } } @@ -371,22 +381,33 @@ int ObArchiveSchedulerService::get_all_tenant_ids_(common::ObIArray &t return ret; } -void ObArchiveSchedulerService::set_checkpoint_interval_(const int64_t interval_us) +void ObArchiveSchedulerService::set_checkpoint_interval_(const int64_t interval_us, const share::ObArchiveRoundState &round_state) { - const int64_t max_idle_us = interval_us / 2 - RESERVED_FETCH_US; + int64_t max_idle_us = interval_us / 2 - RESERVED_FETCH_US; int64_t idle_time_us = 0; + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); + const int64_t lag_target = tenant_config.is_valid() ? tenant_config->archive_lag_target : 0L; + if (interval_us <= 0) { idle_time_us = MAX_IDLE_INTERVAL_US; } else { if (max_idle_us <= MIN_IDLE_INTERVAL_US) { - idle_time_us = MIN_IDLE_INTERVAL_US; + idle_time_us = max(MIN_IDLE_INTERVAL_US, lag_target / 2); } else if (max_idle_us > MAX_IDLE_INTERVAL_US) { - idle_time_us = MAX_IDLE_INTERVAL_US; + idle_time_us = min(MAX_IDLE_INTERVAL_US, max(lag_target / 2, MIN_IDLE_INTERVAL_US)); } else { idle_time_us = max_idle_us; } } + if (idle_time_us > FAST_IDLE_INTERVAL_US + && (round_state.is_prepare() + || round_state.is_beginning() + || round_state.is_suspending() + || round_state.is_stopping())) { + idle_time_us = FAST_IDLE_INTERVAL_US; + } + if (idle_time_us != get_idle_time()) { FLOG_INFO("change idle_time_us", "idle_time_ts_", get_idle_time(), K(idle_time_us)); set_idle_time(idle_time_us); diff --git a/src/rootserver/backup/ob_archive_scheduler_service.h b/src/rootserver/backup/ob_archive_scheduler_service.h index 7d55883f4..9e01bc23b 100644 --- a/src/rootserver/backup/ob_archive_scheduler_service.h +++ b/src/rootserver/backup/ob_archive_scheduler_service.h @@ -17,6 +17,7 @@ #include "lib/mysqlclient/ob_isql_client.h" #include "lib/container/ob_iarray.h" #include "share/backup/ob_backup_struct.h" +#include "share/backup/ob_archive_struct.h" namespace oceanbase { @@ -41,8 +42,7 @@ public: const int64_t RESERVED_FETCH_US = 10 * 1000 * 1000; // 10s, used for fetch observer log archive status const int64_t MIN_IDLE_INTERVAL_US = 2 * 1000 * 1000; // 2s const int64_t FAST_IDLE_INTERVAL_US = 10 * 1000 * 1000; // 10s, used during BEGINNING or STOPPING - //const int64_t MAX_IDLE_INTERVAL_US = 60 * 1000 * 1000; // 60s - const int64_t MAX_IDLE_INTERVAL_US = 10 * 1000 * 1000; // 60s + const int64_t MAX_IDLE_INTERVAL_US = 60 * 1000 * 1000; // 60s DEFINE_MTL_FUNC(ObArchiveSchedulerService); int init(); void run2() override; @@ -64,7 +64,7 @@ public: int stop_archive(const uint64_t tenant_id, const common::ObIArray &archive_tenant_ids); private: - int process_(); + int process_(share::ObArchiveRoundState &round_state); int start_tenant_archive_(const uint64_t tenant_id); // Return the first error that failed to start archive if force_start is true. Otherwise, // ignore all error. @@ -75,7 +75,7 @@ private: int stop_tenant_archive_(const uint64_t tenant_id); int get_all_tenant_ids_(common::ObIArray &tenantid_array); - void set_checkpoint_interval_(const int64_t interval_us); + void set_checkpoint_interval_(const int64_t interval_us, const share::ObArchiveRoundState &round_state); int open_tenant_archive_mode_(const common::ObIArray &tenant_ids_array); int open_tenant_archive_mode_(const uint64_t tenant_id); int close_tenant_archive_mode_(const common::ObIArray &tenant_ids_array); diff --git a/src/rootserver/backup/ob_tenant_archive_scheduler.cpp b/src/rootserver/backup/ob_tenant_archive_scheduler.cpp index 2a58b0bf8..138d204f9 100644 --- a/src/rootserver/backup/ob_tenant_archive_scheduler.cpp +++ b/src/rootserver/backup/ob_tenant_archive_scheduler.cpp @@ -129,7 +129,7 @@ static int record_piece_extend_info( return ret; } -static int record_piece_checkpoint(const ObTenantArchivePieceAttr &piece_info, const ObArchiveStore &store) +static int record_piece_checkpoint(const ObTenantArchiveRoundAttr &old_round_info, const ObTenantArchivePieceAttr &piece_info, const ObArchiveStore &store) { int ret = OB_SUCCESS; if (!(piece_info.status_.is_active() @@ -147,7 +147,7 @@ static int record_piece_checkpoint(const ObTenantArchivePieceAttr &piece_info, c checkpoint_desc.checkpoint_scn_ = piece_info.checkpoint_scn_; checkpoint_desc.max_scn_ = piece_info.max_scn_; checkpoint_desc.end_scn_ = piece_info.end_scn_; - if (OB_FAIL(store.write_piece_checkpoint(piece_info.key_.dest_id_, piece_info.key_.round_id_, piece_info.key_.piece_id_, 0, checkpoint_desc))) { + if (OB_FAIL(store.write_piece_checkpoint(piece_info.key_.dest_id_, piece_info.key_.round_id_, piece_info.key_.piece_id_, 0, old_round_info.checkpoint_scn_, checkpoint_desc))) { LOG_WARN("failed to write piece checkpoint info file", K(ret), K(piece_info), K(checkpoint_desc)); } } @@ -233,6 +233,7 @@ static int record_piece_inner_placeholder(const ObTenantArchivePieceAttr &piece_ static int record_single_piece_info(const ObTenantArchivePieceAttr &piece_info, const ObArchiveStore &store) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; bool is_exist = false; ObSinglePieceDesc single_piece_desc; if (!piece_info.status_.is_frozen()) { @@ -244,7 +245,7 @@ static int record_single_piece_info(const ObTenantArchivePieceAttr &piece_info, } else if (OB_FAIL(store.write_single_piece(piece_info.key_.dest_id_, piece_info.key_.round_id_, piece_info.key_.piece_id_, single_piece_desc))) { LOG_WARN("failed to write single piece info file", K(ret), K(piece_info), K(single_piece_desc)); } - + return ret; } @@ -340,7 +341,7 @@ static int piece_generated_cb( LOG_WARN("failed to record piece start", K(ret), K(old_round_info), K(piece)); } else if (OB_FAIL(record_piece_extend_info(*sql_proxy, old_round_info, result, piece.piece_info_, store))) { LOG_WARN("failed to record piece extend info", K(ret)); - } else if (OB_FAIL(record_piece_checkpoint(piece_info, store))) { + } else if (OB_FAIL(record_piece_checkpoint(old_round_info, piece_info, store))) { LOG_WARN("failed to record piece checkpoint", K(ret), K(old_round_info), K(piece)); } else if (OB_FAIL(record_piece_info(piece, store))) { LOG_WARN("failed to record piece info", K(ret), K(old_round_info), K(piece)); diff --git a/src/share/backup/ob_archive_checkpoint_mgr.cpp b/src/share/backup/ob_archive_checkpoint_mgr.cpp index 0b243371d..1f2ea17b0 100644 --- a/src/share/backup/ob_archive_checkpoint_mgr.cpp +++ b/src/share/backup/ob_archive_checkpoint_mgr.cpp @@ -66,6 +66,7 @@ int ObDelHisCheckpointFileOp::func(const dirent *entry) ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "invalid list entry, d_name is null", K(ret)); } else { + handled_file_num_++; uint64_t checkpoint_scn = 0; ObBackupPath full_path = path_; ObBackupIoAdapter io_util; @@ -135,13 +136,16 @@ int ObArchiveCheckpointMgr::check_is_tagging_(const ObBackupStorageInfo *storage { int ret = OB_SUCCESS; is_tagging = false; - if (OB_STORAGE_OSS == storage_info_ -> device_type_) { - //TODO(zhixing.yh) Adapt the analytic interface in storage_info + if (OB_ISNULL(storage_info)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("storage info is nullptr", K(ret)); + } else if (ObIStorageUtil::TAGGING == storage_info->get_delete_mode()) { + is_tagging = true; } return ret; } -int ObArchiveCheckpointMgr::write(const uint64_t checkpoint_scn) const +int ObArchiveCheckpointMgr::write(const uint64_t old_checkpoint_scn, const uint64_t checkpoint_scn) const { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -149,24 +153,23 @@ int ObArchiveCheckpointMgr::write(const uint64_t checkpoint_scn) const ObBackupPath full_path = path_; //checkpoint scn file path ObBackupPath dir_path = path_; //checkpoint dir file path ObBackupIoAdapter io_util; - bool is_tagging = false; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("Archive checkpoint mgr not init", K(ret)); } else if (checkpoint_scn <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument!", K(ret), K(checkpoint_scn)); - } else if (OB_FAIL(check_is_tagging_(storage_info_, is_tagging))) { - LOG_WARN("failed to judge delete mode", K(ret)); + } else if (old_checkpoint_scn >= checkpoint_scn) { //do nothing } else if (OB_FAIL(full_path.join_checkpoint_info_file(file_name_, checkpoint_scn, type_))) { LOG_WARN("failed to get piece checkpoint file path", K(ret), K(checkpoint_scn), KP(file_name_), K(full_path), K(type_)); } else if (OB_FAIL(write_checkpoint_file_(full_path))) { LOG_WARN("failed to write checkpoint file", K(ret), K(full_path)); - } - //if the delete mode is not 'tagging', need to list files for deleting smaller checkpoint scn files - if (OB_SUCC(ret) && !is_tagging && OB_TMP_FAIL(del_history_files_(dir_path, checkpoint_scn))) { - LOG_WARN("failed to delete files", K(ret), K(dir_path), K(checkpoint_scn), K(tmp_ret)); + // delete only the last ckpt file: + // 1. reduce listing requests + // 2. guarantee all expired files can be eventually removed + } else if (OB_TMP_FAIL(del_last_ckpt_file_(dir_path, old_checkpoint_scn))) { + LOG_WARN("failed to delete last ckpt file", K(ret), K(dir_path), K(old_checkpoint_scn), K(tmp_ret)); } return ret; @@ -202,16 +205,40 @@ int ObArchiveCheckpointMgr::get_max_checkpoint_scn_( return ret; } -int ObArchiveCheckpointMgr::del_history_files_( +int ObArchiveCheckpointMgr::del_last_ckpt_file_( const ObBackupPath &dir_path, + const uint64_t old_checkpoint_scn) const +{ + int ret = OB_SUCCESS; + ObBackupIoAdapter io_util; + ObBackupPath file_path = dir_path; + + if (dir_path.is_empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("dir path is empty", K(ret), K(dir_path)); + } else if (0 == old_checkpoint_scn) { // 0 is reserved + } else if (OB_FAIL(file_path.join_checkpoint_info_file(file_name_, old_checkpoint_scn, type_))) { + LOG_WARN("fail to join checkpoint info file", K(ret), K(old_checkpoint_scn)); + } else if (OB_FAIL(io_util.del_file(file_path.get_obstr(), storage_info_))) { + LOG_WARN("fail to delete file", K(ret), K(file_path)); + } + return ret; +} + +int ObArchiveCheckpointMgr::del_history_files( const uint64_t write_checkpoint_scn) const { int ret = OB_SUCCESS; ObBackupIoAdapter io_util; - ObDelHisCheckpointFileOp del_his_file_op(write_checkpoint_scn, dir_path, file_name_, type_, storage_info_); - if (OB_FAIL(io_util.list_files(dir_path.get_ptr(), storage_info_, del_his_file_op))) { - LOG_WARN("failed to del history checkpoint file", - K(ret), K(dir_path), K(write_checkpoint_scn), K(path_), KP(file_name_), K(type_)); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObArchiveCheckpointMgr not inited", K(ret)); + } else { + ObDelHisCheckpointFileOp del_his_file_op(write_checkpoint_scn, path_, file_name_, type_, storage_info_); + if (OB_FAIL(io_util.list_files(path_.get_ptr(), storage_info_, del_his_file_op))) { + LOG_WARN("failed to del history checkpoint file", + K(ret), K(path_), K(write_checkpoint_scn), K(path_), KP(file_name_), K(type_)); + } } return ret; } diff --git a/src/share/backup/ob_archive_checkpoint_mgr.h b/src/share/backup/ob_archive_checkpoint_mgr.h index f0bc9eb57..ed63d4340 100644 --- a/src/share/backup/ob_archive_checkpoint_mgr.h +++ b/src/share/backup/ob_archive_checkpoint_mgr.h @@ -55,17 +55,19 @@ public: path_(path), file_name_(file_name), type_(type), - storage_info_(storage_info) {} + storage_info_(storage_info), + handled_file_num_(0) {} virtual ~ObDelHisCheckpointFileOp() {} bool is_valid() const; int func(const dirent *entry) ; - + int64_t get_handed_file_num() { return handled_file_num_; }; private: uint64_t checkpoint_scn_; ObBackupPath path_; const char *file_name_; ObBackupFileSuffix type_; const share::ObBackupStorageInfo *storage_info_; + int64_t handled_file_num_; DISALLOW_COPY_AND_ASSIGN(ObDelHisCheckpointFileOp); }; @@ -87,11 +89,12 @@ public: const ObBackupStorageInfo *storage_info); void reset(); bool is_valid() const; - int write(const uint64_t checkpoint_scn) const; + int write(const uint64_t old_checkpoint_scn, const uint64_t checkpoint_scn) const; int read(uint64_t &checkpoint_scn) const; + int del_history_files(const uint64_t write_checkpoint_scn) const; private: int get_max_checkpoint_scn_(const ObBackupPath &path, uint64_t &max_checkpoint_scn) const; - int del_history_files_(const ObBackupPath &dir_path, const uint64_t write_checkpoint_scn) const; + int del_last_ckpt_file_(const ObBackupPath &dir_path, const uint64_t write_checkpoint_scn) const; int write_checkpoint_file_(const ObBackupPath &path) const; int check_is_tagging_(const ObBackupStorageInfo *storage_info, bool &is_tagging) const; diff --git a/src/share/backup/ob_archive_store.cpp b/src/share/backup/ob_archive_store.cpp index bcadc210b..3087f4a93 100644 --- a/src/share/backup/ob_archive_store.cpp +++ b/src/share/backup/ob_archive_store.cpp @@ -892,7 +892,8 @@ int ObArchiveStore::read_piece_checkpoint(const int64_t dest_id, const int64_t r return ret; } -int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const ObPieceCheckpointDesc &desc) const +int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, + const int64_t file_id, const share::SCN &old_checkpoint_scn, const ObPieceCheckpointDesc &desc) const { int ret = OB_SUCCESS; ObBackupPath full_path; @@ -919,13 +920,33 @@ int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id)); } else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) { LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path)); - } else if (OB_FAIL(mgr.write(desc.checkpoint_scn_.get_val_for_inner_table_field()))) { + } else if (OB_FAIL(mgr.write(old_checkpoint_scn.get_val_for_inner_table_field(), + desc.checkpoint_scn_.get_val_for_inner_table_field()))) { LOG_WARN("failed to write checkpoint info", K(ret), K(desc)); } } return ret; } +int ObArchiveStore::delete_piece_his_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const uint64_t checkpoint_scn) const +{ + int ret = OB_SUCCESS; + ObBackupPath dir_path; + const ObBackupDest &dest = get_backup_dest(); + ObArchiveCheckpointMgr mgr; + if (!is_init()) { + ret = OB_NOT_INIT; + LOG_WARN("ObArchiveStore not init", K(ret)); + } else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(dest, dest_id, round_id, piece_id, dir_path))) { + LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id)); + } else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) { + LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path)); + } else if (OB_FAIL(mgr.del_history_files(checkpoint_scn))) { + LOG_WARN("fail to delete all checkpoint files", K(ret)); + } + return ret; +} + int ObArchiveStore::read_piece_checkpoint(ObPieceCheckpointDesc &desc) const { int ret = OB_SUCCESS; diff --git a/src/share/backup/ob_archive_store.h b/src/share/backup/ob_archive_store.h index 67bd82d23..152d32f9d 100644 --- a/src/share/backup/ob_archive_store.h +++ b/src/share/backup/ob_archive_store.h @@ -378,7 +378,8 @@ public: // oss://archive/d[dest_id]r[round_id]p[piece_id]/checkpoint/checkpoint_info.[file_id].obarc int is_piece_checkpoint_file_exist(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, bool &is_exist) const; int read_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, ObPieceCheckpointDesc &desc) const; - int write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const ObPieceCheckpointDesc &desc) const; + int write_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const share::SCN &old_checkpoint_scn, const ObPieceCheckpointDesc &desc) const; + int delete_piece_his_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, const uint64_t checkpoint_scn) const; // oss://[user_specified_path]/checkpoint/checkpoint_info.0.obarc int read_piece_checkpoint(ObPieceCheckpointDesc &desc) const; // oss://archive/d[dest_id]r[round_id]p[piece_id]/piece_d[dest_id]r[round_id]p[piece_id]_20220601T120000_20220602T120000.obarc diff --git a/src/storage/backup/ob_backup_task.cpp b/src/storage/backup/ob_backup_task.cpp index d392765b2..6cbd40144 100644 --- a/src/storage/backup/ob_backup_task.cpp +++ b/src/storage/backup/ob_backup_task.cpp @@ -6227,13 +6227,14 @@ int ObLSBackupComplementLogTask::copy_checkpoint_info(const ObTenantArchivePiece ObPieceCheckpointDesc desc; bool is_exist = false; const int64_t file_id = 0; + const share::SCN old_ckpt_scn = SCN::min_scn(); //set 0, but will not delete if (OB_FAIL(src_store.is_piece_checkpoint_file_exist(src_dest_id, round_id, piece_id, file_id, is_exist))) { LOG_WARN("failed to check is piece checkpoint file file exist", K(ret), K(src_dest_id), K(round_id), K(piece_id)); } else if (!is_exist) { // do nothing } else if (OB_FAIL(src_store.read_piece_checkpoint(src_dest_id, round_id, piece_id, file_id, desc))) { LOG_WARN("failed to read piece checkpoint", K(ret), K(piece_attr)); - } else if (OB_FAIL(dest_store.write_piece_checkpoint(dest_dest_id, round_id, piece_id, file_id, desc))) { + } else if (OB_FAIL(dest_store.write_piece_checkpoint(dest_dest_id, round_id, piece_id, file_id, old_ckpt_scn, desc))) { LOG_WARN("failed to write piece checkpoint", K(ret), K(piece_attr)); } return ret; diff --git a/unittest/share/backup/test_archive_checkpoint_mgr.cpp b/unittest/share/backup/test_archive_checkpoint_mgr.cpp index 689c802d5..4d54c3f90 100644 --- a/unittest/share/backup/test_archive_checkpoint_mgr.cpp +++ b/unittest/share/backup/test_archive_checkpoint_mgr.cpp @@ -245,17 +245,17 @@ int TestArchiveCheckpointMgr::test_write_and_read_checkpoint(const ObStorageType uint64_t checkpoint = 0; if (OB_FAIL(mgr.init(root_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, &storage_info_))) { LOG_WARN("failed to init checkpoint mgr", K(ret), K(root_path)); - } else if (OB_FAIL(mgr.write(10))) { + } else if (OB_FAIL(mgr.write(10, 9))) { LOG_WARN("failed to write files", K(ret), K(root_path)); - } else if (OB_FAIL(mgr.write(9))) { + } else if (OB_FAIL(mgr.write(9, 8))) { LOG_WARN("failed to write files", K(ret), K(root_path)); } else if (OB_FAIL(mgr.read(checkpoint))) { LOG_WARN("failed to read files", K(ret)); } else if (checkpoint != 10) { ret = OB_ERROR; - } else if (OB_FAIL(mgr.write(100))) { + } else if (OB_FAIL(mgr.write(100, 99))) { LOG_WARN("failed to write files", K(ret), K(root_path)); - } else if (OB_FAIL(mgr.write(50))) { + } else if (OB_FAIL(mgr.write(50, 49))) { LOG_WARN("failed to write files", K(ret), K(root_path)); } else if (OB_FAIL(mgr.read(checkpoint))) { LOG_WARN("failed to read files", K(ret));