From ef07d04a9cbd23c7dbf428a1186687dc438963e4 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 21 Jul 2023 08:42:22 +0000 Subject: [PATCH] Uing Small Files to Resolve Concurrent Reading and Writing of Archived Metainformation --- src/share/CMakeLists.txt | 1 + .../backup/ob_archive_checkpoint_mgr.cpp | 236 +++++++++++++ src/share/backup/ob_archive_checkpoint_mgr.h | 112 ++++++ src/share/backup/ob_archive_store.cpp | 57 +++- src/share/backup/ob_backup_path.cpp | 53 +++ src/share/backup/ob_backup_path.h | 2 + .../backup/ob_backup_serialize_provider.cpp | 9 +- src/share/backup/ob_backup_struct.h | 1 + unittest/share/backup/CMakeLists.txt | 1 + .../backup/test_archive_checkpoint_mgr.cpp | 318 ++++++++++++++++++ .../share/backup/test_ob_backup_dest_config.h | 24 ++ 11 files changed, 801 insertions(+), 13 deletions(-) create mode 100644 src/share/backup/ob_archive_checkpoint_mgr.cpp create mode 100644 src/share/backup/ob_archive_checkpoint_mgr.h create mode 100644 unittest/share/backup/test_archive_checkpoint_mgr.cpp create mode 100644 unittest/share/backup/test_ob_backup_dest_config.h diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index 877f1a89f..d2fae2503 100755 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -48,6 +48,7 @@ ob_set_subtarget(ob_share backup backup/ob_backup_config.cpp backup/ob_log_restore_config.cpp backup/ob_log_restore_struct.cpp + backup/ob_archive_checkpoint_mgr.cpp ) ob_set_subtarget(ob_share cache diff --git a/src/share/backup/ob_archive_checkpoint_mgr.cpp b/src/share/backup/ob_archive_checkpoint_mgr.cpp new file mode 100644 index 000000000..0b243371d --- /dev/null +++ b/src/share/backup/ob_archive_checkpoint_mgr.cpp @@ -0,0 +1,236 @@ +// Copyright (c) 2021 OceanBase +// OceanBase is licensed under Mulan PubL v2. +// You can use this software according to the terms and conditions of the Mulan PubL v2. +// You may obtain a copy of Mulan PubL v2 at: +// http://license.coscl.org.cn/MulanPubL-2.0 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PubL v2 for more details. +#define USING_LOG_PREFIX SHARE +#include "ob_archive_checkpoint_mgr.h" +#include "lib/alloc/alloc_assist.h" +#include "lib/oblog/ob_log_module.h" +#include "lib/string/ob_sql_string.h" +#include "share/backup/ob_backup_struct.h" +#include "share/backup/ob_archive_path.h" + +using namespace oceanbase; +using namespace share; + +bool ObGetMaxCheckpointOp::is_valid() const +{ + return max_checkpoint_scn_ >= 0 + && file_name_ != nullptr + && type_ <= ObBackupFileSuffix::BACKUP + && type_ >= ObBackupFileSuffix::NONE; +} + +int ObGetMaxCheckpointOp::func(const dirent *entry) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(entry)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid list entry, entry is null", K(ret)); + } else if (OB_ISNULL(entry->d_name) || !is_valid()) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid list entry, d_name is null", K(ret)); + } else { + uint64_t checkpoint_scn = 0; + if (OB_FAIL(ObBackupPath::parse_checkpoint(entry->d_name, file_name_, type_, checkpoint_scn))) { + OB_LOG(WARN, "failed to get checkpoint scn", K(ret), KP(entry->d_name)); + } else if (checkpoint_scn > max_checkpoint_scn_) { + max_checkpoint_scn_ = checkpoint_scn; + } + } + return ret; +} + +bool ObDelHisCheckpointFileOp::is_valid() const +{ + return checkpoint_scn_ >= 0 + && !path_.is_empty() + && file_name_ != nullptr + && type_ <= ObBackupFileSuffix::BACKUP + && type_ >= ObBackupFileSuffix::NONE + && storage_info_ != nullptr; +} + +int ObDelHisCheckpointFileOp::func(const dirent *entry) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(entry)) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid list entry, entry is null", K(ret)); + } else if (OB_ISNULL(entry->d_name) || !is_valid()) { + ret = OB_INVALID_ARGUMENT; + OB_LOG(WARN, "invalid list entry, d_name is null", K(ret)); + } else { + uint64_t checkpoint_scn = 0; + ObBackupPath full_path = path_; + ObBackupIoAdapter io_util; + if (OB_FAIL(ObBackupPath::parse_checkpoint(entry->d_name, file_name_, type_, checkpoint_scn))) { + OB_LOG(WARN, "failed to get checkpoint scn", K(ret), KP(entry->d_name)); + } else if (checkpoint_scn >= checkpoint_scn_ || 0 == checkpoint_scn) { + //do nothing + } else if (OB_FAIL(full_path.join_checkpoint_info_file(file_name_, checkpoint_scn, type_))) { + OB_LOG(WARN, "failed to set full path for del file", + K(ret), K(checkpoint_scn), KP(file_name_), K(type_)); + } else if (OB_FAIL(io_util.del_file(full_path.get_ptr(), storage_info_))) { + OB_LOG(WARN, "failed to delete file", K(ret), K(full_path)); + } + } + return ret; +} + +/** + * ------------------------------ObArchiveCheckpointMgr--------------------- + */ +int ObArchiveCheckpointMgr::init( + const ObBackupPath &path, + const char *file_name, + const ObBackupFileSuffix &type, + const ObBackupStorageInfo *storage_info) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObArchiveCheckpointMgr init twice", K(ret)); + } else if (path.is_empty() + || OB_ISNULL(file_name) + || type > ObBackupFileSuffix::BACKUP + || type < ObBackupFileSuffix::NONE + || OB_ISNULL(storage_info)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(path), KP(file_name), K(type)); + } else { + path_ = path; + file_name_ = file_name; + type_ = type; + storage_info_ = storage_info; + is_inited_ = true; + } + return ret; +} + +bool ObArchiveCheckpointMgr::is_valid() const +{ + return !path_.is_empty() + && type_ <= ObBackupFileSuffix::BACKUP + && type_ >= ObBackupFileSuffix::NONE + && file_name_ != nullptr + && storage_info_ != nullptr; +} + +void ObArchiveCheckpointMgr::reset() +{ + is_inited_ = false; + path_.reset(); + file_name_ = nullptr; + type_ = ObBackupFileSuffix::NONE; + storage_info_ = nullptr; +} + +int ObArchiveCheckpointMgr::check_is_tagging_(const ObBackupStorageInfo *storage_info, bool &is_tagging) const +{ + int ret = OB_SUCCESS; + is_tagging = false; + if (OB_STORAGE_OSS == storage_info_ -> device_type_) { + //TODO(zhixing.yh) Adapt the analytic interface in storage_info + } + return ret; +} + +int ObArchiveCheckpointMgr::write(const uint64_t checkpoint_scn) const +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + uint64_t max_checkpoint_scn = 0; + ObBackupPath full_path = path_; //checkpoint scn file path + ObBackupPath dir_path = path_; //checkpoint dir file path + ObBackupIoAdapter io_util; + bool is_tagging = false; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("Archive checkpoint mgr not init", K(ret)); + } else if (checkpoint_scn <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument!", K(ret), K(checkpoint_scn)); + } else if (OB_FAIL(check_is_tagging_(storage_info_, is_tagging))) { + LOG_WARN("failed to judge delete mode", K(ret)); + } else if (OB_FAIL(full_path.join_checkpoint_info_file(file_name_, checkpoint_scn, type_))) { + LOG_WARN("failed to get piece checkpoint file path", + K(ret), K(checkpoint_scn), KP(file_name_), K(full_path), K(type_)); + } else if (OB_FAIL(write_checkpoint_file_(full_path))) { + LOG_WARN("failed to write checkpoint file", K(ret), K(full_path)); + } + //if the delete mode is not 'tagging', need to list files for deleting smaller checkpoint scn files + if (OB_SUCC(ret) && !is_tagging && OB_TMP_FAIL(del_history_files_(dir_path, checkpoint_scn))) { + LOG_WARN("failed to delete files", K(ret), K(dir_path), K(checkpoint_scn), K(tmp_ret)); + } + + return ret; +} + +int ObArchiveCheckpointMgr::read(uint64_t &max_checkpoint_scn) const +{ + int ret = OB_SUCCESS; + ObArray file_names; + common::ObArenaAllocator allocator; + ObBackupPath checkpoint_dir_path = path_; + max_checkpoint_scn = 0; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("Archive checkpoint mgr not init", K(ret)); + } else if (OB_FAIL(get_max_checkpoint_scn_(checkpoint_dir_path, max_checkpoint_scn))) { + LOG_WARN("failed to get max checkpoint scn", K(ret), K(checkpoint_dir_path)); + } + return ret; +} + +int ObArchiveCheckpointMgr::get_max_checkpoint_scn_( + const ObBackupPath &path, + uint64_t &max_checkpoint_scn) const +{ + int ret = OB_SUCCESS; + ObBackupIoAdapter io_util; + max_checkpoint_scn = 0; + ObGetMaxCheckpointOp max_checkpoint_op(max_checkpoint_scn, file_name_, type_); + if (OB_FAIL(io_util.list_files(path.get_ptr(), storage_info_, max_checkpoint_op))) { + LOG_WARN("failed to get max checkpoint scn", K(ret), K(path)); + } + return ret; +} + +int ObArchiveCheckpointMgr::del_history_files_( + const ObBackupPath &dir_path, + const uint64_t write_checkpoint_scn) const +{ + int ret = OB_SUCCESS; + ObBackupIoAdapter io_util; + ObDelHisCheckpointFileOp del_his_file_op(write_checkpoint_scn, dir_path, file_name_, type_, storage_info_); + if (OB_FAIL(io_util.list_files(dir_path.get_ptr(), storage_info_, del_his_file_op))) { + LOG_WARN("failed to del history checkpoint file", + K(ret), K(dir_path), K(write_checkpoint_scn), K(path_), KP(file_name_), K(type_)); + } + return ret; +} + +int ObArchiveCheckpointMgr::write_checkpoint_file_(const ObBackupPath &path) const +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + ObBackupIoAdapter io_util; + char buf = '\0'; + const int64_t buf_size = 0; + if (OB_FAIL(io_util.mk_parent_dir(path.get_ptr(), storage_info_))) { + LOG_WARN("failed to mk dir.", K(ret), K(path)); + } else if (OB_FAIL(io_util.write_single_file(path.get_ptr(), storage_info_, &buf, buf_size))) { + LOG_WARN("failed to write single file.", K(ret), K(path)); + } else { + if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { + FLOG_INFO("succeed to write checkpoint file.", K(path)); + } + } + return ret; +} \ No newline at end of file diff --git a/src/share/backup/ob_archive_checkpoint_mgr.h b/src/share/backup/ob_archive_checkpoint_mgr.h new file mode 100644 index 000000000..f0bc9eb57 --- /dev/null +++ b/src/share/backup/ob_archive_checkpoint_mgr.h @@ -0,0 +1,112 @@ +// Copyright (c) 2021 OceanBase +// OceanBase is licensed under Mulan PubL v2. +// You can use this software according to the terms and conditions of the Mulan PubL v2. +// You may obtain a copy of Mulan PubL v2 at: +// http://license.coscl.org.cn/MulanPubL-2.0 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PubL v2 for more details. + +#ifndef SRC_SHARE_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_H_ +#define SRC_SHARE_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_H_ + +#include "share/backup/ob_backup_io_adapter.h" +#include "share/backup/ob_backup_path.h" +#include "share/backup/ob_archive_store.h" +#include "lib/utility/ob_unify_serialize.h" + +namespace oceanbase +{ +namespace share +{ +class ObGetMaxCheckpointOp : public ObBaseDirEntryOperator +{ +public: + ObGetMaxCheckpointOp( + uint64_t& max_checkpoint_scn, + const char *file_name, + const ObBackupFileSuffix &type) + : max_checkpoint_scn_(max_checkpoint_scn), + file_name_(file_name), + type_(type) {} + virtual ~ObGetMaxCheckpointOp() {} + bool is_valid() const; + int func(const dirent *entry); +private: + uint64_t& max_checkpoint_scn_; + const char *file_name_; + ObBackupFileSuffix type_; + + DISALLOW_COPY_AND_ASSIGN(ObGetMaxCheckpointOp); +}; + +//delete files with smaller checkpoint_scn in file name +class ObDelHisCheckpointFileOp : public ObBaseDirEntryOperator +{ +public: + ObDelHisCheckpointFileOp( + const uint64_t checkpoint_scn, + const ObBackupPath &path, + const char *file_name, + const ObBackupFileSuffix &type, + const share::ObBackupStorageInfo *storage_info) + : checkpoint_scn_(checkpoint_scn), + path_(path), + file_name_(file_name), + type_(type), + storage_info_(storage_info) {} + virtual ~ObDelHisCheckpointFileOp() {} + bool is_valid() const; + int func(const dirent *entry) ; + +private: + uint64_t checkpoint_scn_; + ObBackupPath path_; + const char *file_name_; + ObBackupFileSuffix type_; + const share::ObBackupStorageInfo *storage_info_; + + DISALLOW_COPY_AND_ASSIGN(ObDelHisCheckpointFileOp); +}; + +class ObArchiveCheckpointMgr final +{ +public: + ObArchiveCheckpointMgr() + : is_inited_(false), + path_(), + file_name_(nullptr), + type_(), + storage_info_(nullptr) {} + ~ObArchiveCheckpointMgr() {} + int init( + const ObBackupPath &path, + const char *file_name, + const ObBackupFileSuffix &type, + const ObBackupStorageInfo *storage_info); + void reset(); + bool is_valid() const; + int write(const uint64_t checkpoint_scn) const; + int read(uint64_t &checkpoint_scn) const; +private: + int get_max_checkpoint_scn_(const ObBackupPath &path, uint64_t &max_checkpoint_scn) const; + int del_history_files_(const ObBackupPath &dir_path, const uint64_t write_checkpoint_scn) const; + int write_checkpoint_file_(const ObBackupPath &path) const; + int check_is_tagging_(const ObBackupStorageInfo *storage_info, bool &is_tagging) const; + + TO_STRING_KV(K_(is_inited), K_(path), KP_(file_name), K_(type)); + +private: + bool is_inited_; + ObBackupPath path_; + const char *file_name_; + ObBackupFileSuffix type_; + const ObBackupStorageInfo *storage_info_; + DISALLOW_COPY_AND_ASSIGN(ObArchiveCheckpointMgr); +}; + +} +} + +#endif /* SRC_SHARE_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_H_*/ diff --git a/src/share/backup/ob_archive_store.cpp b/src/share/backup/ob_archive_store.cpp index 68c16917a..a3bf4fdc8 100644 --- a/src/share/backup/ob_archive_store.cpp +++ b/src/share/backup/ob_archive_store.cpp @@ -19,6 +19,7 @@ #include "lib/oblog/ob_log_module.h" #include "lib/utility/utility.h" #include "share/backup/ob_archive_path.h" +#include "share/backup/ob_archive_checkpoint_mgr.h" using namespace oceanbase; using namespace common; @@ -838,15 +839,35 @@ int ObArchiveStore::is_piece_checkpoint_file_exist(const int64_t dest_id, const int ObArchiveStore::read_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, ObPieceCheckpointDesc &desc) const { int ret = OB_SUCCESS; - ObBackupPath full_path; + ObBackupPath dir_path; + ObBackupPath meta_full_path; const ObBackupDest &dest = get_backup_dest(); if (!is_init()) { ret = OB_NOT_INIT; LOG_WARN("ObArchiveStore not init", K(ret)); - } else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, file_id, full_path))) { - LOG_WARN("failed to get piece checkpoint file path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id), K(file_id)); - } else if (OB_FAIL(read_single_file(full_path.get_ptr(), desc))) { - LOG_WARN("failed to read single file", K(ret), K(full_path), K(dest_id), K(round_id), K(piece_id)); + } else { + ObArchiveCheckpointMgr mgr; + uint64_t max_checkpoint_scn = 0; + if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, 0, meta_full_path))) { + LOG_WARN("failed to get checkpoint meta file path", K(ret), K(dest), K(round_id), K(dest_id), K(piece_id)); + } else if (OB_FAIL(read_single_file(meta_full_path.get_ptr(), desc))) { + LOG_WARN("failed to read mate file", K(ret), K(meta_full_path)); + } else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(dest, dest_id, round_id, piece_id, dir_path))) { + LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id)); + } else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) { + LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path)); + } else if (OB_FAIL(mgr.read(max_checkpoint_scn))) { + LOG_WARN("failed to read checkpoint scn", K(ret), K(max_checkpoint_scn)); + } else if (0 == max_checkpoint_scn) { + //do nothing + } else if (OB_FAIL(desc.checkpoint_scn_.convert_for_inner_table_field(max_checkpoint_scn))) { + LOG_WARN("failed to set checkpoint scn", K(ret), K(max_checkpoint_scn)); + } else if (OB_FAIL(desc.max_scn_.convert_for_inner_table_field(max_checkpoint_scn))) { + LOG_WARN("failed to set max scn", K(ret), K(max_checkpoint_scn)); + } + if (OB_SUCC(ret)) { + FLOG_INFO("succeed to read checkpoint desc.", K(desc)); + } } return ret; } @@ -855,14 +876,32 @@ int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t { int ret = OB_SUCCESS; ObBackupPath full_path; + ObBackupPath dir_path; const ObBackupDest &dest = get_backup_dest(); if (!is_init()) { ret = OB_NOT_INIT; LOG_WARN("ObArchiveStore not init", K(ret)); - } else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, file_id, full_path))) { - LOG_WARN("failed to get piece checkpoint file path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id), K(file_id)); - } else if (OB_FAIL(write_single_file(full_path.get_ptr(), desc))) { - LOG_WARN("failed to write single file", K(ret), K(full_path)); + } else { + ObArchiveCheckpointMgr mgr; + bool meta_is_exist = false; + if (OB_FAIL(is_piece_checkpoint_file_exist(dest_id, round_id, piece_id, 0, meta_is_exist))) { + LOG_WARN("failed to check checkpoint meta file exist.", K(ret)); + } else if (!meta_is_exist) { + if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, 0, full_path))) { + LOG_WARN("failed to get piece checkpoint meta file path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id)); + } else if (OB_FAIL(write_single_file(full_path.get_ptr(), desc))) { + LOG_WARN("failed to write checkpoint index file", K(ret), K(desc), K(full_path)); + } + } + if (OB_FAIL(ret)) { + //do noting + } else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(dest, dest_id, round_id, piece_id, dir_path))) { + LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id)); + } else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) { + LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path)); + } else if (OB_FAIL(mgr.write(desc.checkpoint_scn_.get_val_for_inner_table_field()))) { + LOG_WARN("failed to write checkpoint info", K(ret), K(desc)); + } } return ret; } diff --git a/src/share/backup/ob_backup_path.cpp b/src/share/backup/ob_backup_path.cpp index ab810c2f6..206802113 100755 --- a/src/share/backup/ob_backup_path.cpp +++ b/src/share/backup/ob_backup_path.cpp @@ -460,6 +460,59 @@ int ObBackupPath::join_tenant_meta_index_file(const ObBackupDataType &backup_typ return ret; } +// param case: file_name -> 'checkpoint_info', checkpoint -> 1632889932327676777, type -> ARCHIVE +// result : oss://backup/[file_name].[checkpoint].obarc +int ObBackupPath::join_checkpoint_info_file(const common::ObString &file_name, const uint64_t checkpoint, const ObBackupFileSuffix &type) +{ + int ret = OB_SUCCESS; + if (cur_pos_ <= 0) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret), K(*this)); + } else if (file_name.length() <= 0 ) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(file_name)); + } else if (OB_FAIL(databuff_printf(path_, sizeof(path_), cur_pos_, "/%.*s", file_name.length(), file_name.ptr()))) { + LOG_WARN("failed to join file name", K(ret), K(file_name), K(*this)); + } else if (OB_FAIL(databuff_printf(path_, sizeof(path_), cur_pos_, ".%lu", checkpoint))) { + LOG_WARN("failed to join checkpoint", K(ret), K(checkpoint), K(*this)); + } else if (OB_FAIL(add_backup_suffix(type))) { + LOG_WARN("failed to add backup file suffix", K(ret), K(type), K(*this)); + } else if (OB_FAIL(trim_right_backslash())) { + OB_LOG(WARN, "fail to trim_right_backslash", K(ret)); + } + return ret; +} + +// param case: entry_d_name -> 'checkpoint_info.1678226622262333112.obarc', file_name -> 'checkpoint_info', type -> ARCHIVE +// result : checkpoint -> 1678226622262333112 +int ObBackupPath::parse_checkpoint(const common::ObString &entry_d_name, const common::ObString &file_name, const ObBackupFileSuffix &type, uint64_t &checkpoint) +{ + int ret = OB_SUCCESS; + checkpoint = 0; + ObBackupPath tmp_path; //format string for sscanf + char tmp_file_name[OB_MAX_FILE_NAME_LENGTH] = { 0 }; + if (entry_d_name.length() <= 0 || file_name.length() <= 0 || type > ObBackupFileSuffix::BACKUP || type < ObBackupFileSuffix::NONE) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret), K(entry_d_name), K(file_name)); + } else if (OB_FAIL(databuff_printf(tmp_file_name, sizeof(tmp_file_name), "%s.%%lu", file_name.ptr()))) { + LOG_WARN("failed to join tmp file name", K(ret), K(file_name)); + } else if (OB_FAIL(tmp_path.init(tmp_file_name))) { + LOG_WARN("failed to init tmp path", K(ret), K(tmp_file_name)); + } else if (OB_FAIL(tmp_path.add_backup_suffix(type))) { + LOG_WARN("failed to add backup file suffix", K(ret), K(type), K(tmp_path)); + } else if (OB_FAIL(tmp_path.trim_right_backslash())) { + OB_LOG(WARN, "fail to trim_right_backslash", K(ret)); + } else if (1 == sscanf(entry_d_name.ptr(), tmp_path.get_ptr(), &checkpoint)) { + if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { + OB_LOG(INFO, "succeed to get checkpoint scn", K(ret), K(entry_d_name), K(checkpoint), K(tmp_path)); + } + } else { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "failed to get checkpoint", K(ret), K(entry_d_name), K(file_name), K(type), K(checkpoint), K(tmp_path)); + } + return ret; +} + common::ObString ObBackupPath::get_obstr() const { return ObString(cur_pos_, path_); diff --git a/src/share/backup/ob_backup_path.h b/src/share/backup/ob_backup_path.h index 84fc948fa..2d3c6e0b4 100755 --- a/src/share/backup/ob_backup_path.h +++ b/src/share/backup/ob_backup_path.h @@ -55,6 +55,8 @@ public: int join_meta_info_turn_and_retry(const int64_t turn_id, const int64_t retry_id); int join_tenant_macro_range_index_file(const share::ObBackupDataType &type, const int64_t retry_id); int join_tenant_meta_index_file(const share::ObBackupDataType &type, const int64_t retry_id, const bool is_sec_meta); + int join_checkpoint_info_file(const common::ObString &path, const uint64_t checkpoint, const ObBackupFileSuffix &type); + static int parse_checkpoint(const common::ObString &entry_d_name, const common::ObString &file_name, const ObBackupFileSuffix &type, uint64_t &checkpoint); int add_backup_suffix(const ObBackupFileSuffix &type); const char *get_ptr() const { return path_; } diff --git a/src/share/backup/ob_backup_serialize_provider.cpp b/src/share/backup/ob_backup_serialize_provider.cpp index f2f55e9f2..35a5431a7 100644 --- a/src/share/backup/ob_backup_serialize_provider.cpp +++ b/src/share/backup/ob_backup_serialize_provider.cpp @@ -92,12 +92,13 @@ int ObBackupSerializeHeaderWrapper::deserialize(const char *buf, const int64_t d LOG_WARN("serializer is null", K(ret)); } else { const ObBackupCommonHeader *common_header = reinterpret_cast(buf); - int64_t pos = common_header->header_length_; + int64_t header_len = common_header->header_length_; uint16_t data_type = common_header->data_type_; uint16_t version = common_header->data_version_; + pos = pos + header_len; if (OB_FAIL(common_header->check_header_checksum())) { LOG_WARN("failed to check common header", K(ret)); - } else if (common_header->data_zlength_ > data_len - pos) { + } else if (common_header->data_zlength_ > data_len - header_len) { ret = OB_ERR_UNEXPECTED; LOG_WARN("buf size is too small", K(ret), K(data_len), K(*common_header)); } else if (data_type != get_data_type()) { @@ -106,9 +107,9 @@ int ObBackupSerializeHeaderWrapper::deserialize(const char *buf, const int64_t d } else if (version != get_data_version()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("data version not match", K(ret), K(*common_header), K(get_data_version())); - } else if (OB_FAIL(common_header->check_data_checksum(buf + pos, common_header->data_zlength_))) { + } else if (OB_FAIL(common_header->check_data_checksum(buf + header_len, common_header->data_zlength_))) { LOG_WARN("failed to check checksum", K(ret), K(*common_header)); - } else if (OB_FAIL(serializer_->deserialize(buf, pos + common_header->data_zlength_, pos))) { + } else if (OB_FAIL(serializer_->deserialize(buf, header_len + common_header->data_zlength_, pos))) { LOG_WARN("failed to do deserialize", K(ret), K(*common_header)); } else if (!serializer_->is_valid()) { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/backup/ob_backup_struct.h b/src/share/backup/ob_backup_struct.h index a5bdb47a6..1e357034a 100755 --- a/src/share/backup/ob_backup_struct.h +++ b/src/share/backup/ob_backup_struct.h @@ -413,6 +413,7 @@ const char *const OB_STR_CLUSTER_VERSION = "cluster_version"; const char *const OB_BACKUP_SUFFIX=".obbak"; const char *const OB_ARCHIVE_SUFFIX=".obarc"; const char *const OB_STR_MIN_RESTORE_SCN_DISPLAY = "min_restore_scn_display"; +const char *const OB_STR_CHECKPOINT_FILE_NAME = "checkpoint_info"; enum ObBackupFileType { diff --git a/unittest/share/backup/CMakeLists.txt b/unittest/share/backup/CMakeLists.txt index 6ed9a18a9..b116e7750 100644 --- a/unittest/share/backup/CMakeLists.txt +++ b/unittest/share/backup/CMakeLists.txt @@ -1,3 +1,4 @@ storage_unittest(test_backup_path) storage_unittest(test_backup_struct) storage_unittest(test_log_archive_backup_info_mgr) +storage_unittest(test_archive_checkpoint_mgr) \ No newline at end of file diff --git a/unittest/share/backup/test_archive_checkpoint_mgr.cpp b/unittest/share/backup/test_archive_checkpoint_mgr.cpp new file mode 100644 index 000000000..689c802d5 --- /dev/null +++ b/unittest/share/backup/test_archive_checkpoint_mgr.cpp @@ -0,0 +1,318 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX SHARE +#define private public +#include "lib/restore/ob_storage.h" +#include "lib/restore/ob_storage_oss_base.cpp" +#undef private + +#include +#include "lib/utility/ob_test_util.h" +#include "share/backup/ob_backup_io_adapter.h" +#include "share/backup/ob_backup_struct.h" +#define private public +#include "share/backup/ob_archive_checkpoint_mgr.h" +#undef private +#include "test_ob_backup_dest_config.h" +#include "lib/allocator/page_arena.h" +#include "lib/oblog/ob_log.h" +#include "lib/oblog/ob_log_module.h" +#include "share/ob_force_print_log.h" +#include "share/backup/ob_backup_path.h" +#include "share/ob_thread_pool.h" +#include "apr_allocator.h" + + +using namespace oceanbase::common; +using namespace oceanbase::share; + +const ObStorageType TEST_STORAGE_TYPES[] = { + OB_STORAGE_FILE, + OB_STORAGE_OSS, +}; + +class TestArchiveCheckpointMgr: public ::testing::Test +{ +public: + TestArchiveCheckpointMgr() {} + virtual ~TestArchiveCheckpointMgr() {} + virtual void SetUp() + { + } + virtual void TearDown() + { + } + + static void SetUpTestCase() + { + } + + static void TearDownTestCase() + { + } + + // function members + static int get_root_path(const ObStorageType &type, ObBackupPath &root_path); + static int get_storage_info(const ObStorageType &type, ObString &storage_info); + int clean_root_dir(const ObStorageType &type); + int clean_dir(const ObStorageType &type, const ObString &dir_uri); + int generate_simple_files(const ObStorageType &type); + + // test cases + typedef int (TestArchiveCheckpointMgr::*TEST_FUNCTION) (const ObStorageType &type); + int run_test_func(TEST_FUNCTION function); + int test_write_and_read_checkpoint(const ObStorageType &type); + + ObBackupStorageInfo storage_info_; + +private: + // disallow copy + DISALLOW_COPY_AND_ASSIGN(TestArchiveCheckpointMgr); +}; + +int TestArchiveCheckpointMgr::run_test_func(TEST_FUNCTION f) +{ + int ret = OB_SUCCESS; + const int64_t type_count = sizeof(TEST_STORAGE_TYPES) / sizeof(ObStorageType); + ObBackupPath path; + ObString storage_info; + for (int64_t i = 0; OB_SUCC(ret) && i < type_count; ++i) { + const ObStorageType type = TEST_STORAGE_TYPES[i]; + const int64_t start_ts = ObTimeUtility::current_time(); + storage_info_.reset(); + FLOG_INFO("start run_test_func", K(type)); + if (OB_FAIL(get_root_path(type, path))) { + LOG_WARN("failed to get root patch", K(ret), K(type)); + } else if (path.is_empty()) { + LOG_INFO("path is not set, skip unittest", K(type)); + } else if (OB_FAIL(get_storage_info(type, storage_info))) { + LOG_WARN("failed to get storage info", K(ret), K(type)); + } else if (OB_FAIL(storage_info_.set(type, storage_info.ptr()))) { + LOG_INFO("path is not set, skip unittest", K(type)); + } else if (OB_FAIL(clean_root_dir(type))) { + LOG_WARN("failed to clean root dir", K(ret), K(type)); + } else if (OB_FAIL((this->*f)(type))) { + LOG_WARN("failed to to test function", K(ret), K(type)); + } else if (OB_FAIL(clean_root_dir(type))) { + LOG_WARN("failed to clean root dir", K(ret), K(type)); + } + const int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + FLOG_INFO("end run_test_func", K(ret), K(type), K(cost_ts)); + } + + return ret; +} + +int TestArchiveCheckpointMgr::get_root_path(const ObStorageType &type, ObBackupPath &root_path) +{ + int ret = OB_SUCCESS; + root_path.reset(); + + if (OB_STORAGE_OSS == type) { + if (OB_FAIL(root_path.init(oss_root_path))) { + LOG_WARN("failed to init oss root path", K(ret)); + } + } else if (OB_STORAGE_FILE == type) { + if (OB_FAIL(root_path.init(file_root_path))) { + LOG_WARN("failed to init file root path", K(ret)); + } + } else { + ret = OB_ERROR; + LOG_ERROR("unknown type", K(type)); + } + return ret; +} + +int TestArchiveCheckpointMgr::get_storage_info(const ObStorageType &type, ObString &storage_info) +{ + int ret = OB_SUCCESS; + storage_info.reset(); + if (OB_STORAGE_OSS == type) { + storage_info = ObString(oss_storage_info); + } else if (OB_STORAGE_FILE == type) { + storage_info = ObString(file_storage_info); + } else { + ret = OB_ERROR; + LOG_ERROR("unknown type", K(type)); + } + return ret; +} + +int TestArchiveCheckpointMgr::generate_simple_files(const ObStorageType &type) +{ + int ret = OB_SUCCESS; + ObBackupPath root_path; + ObBackupPath tmp_root_path; + ObBackupIoAdapter util; + char buf[] = "test"; + int64_t start_ts = ObTimeUtility::current_time(); + + LOG_INFO("start generate_simple_files", K(type)); + if (OB_FAIL(get_root_path(type, root_path))) { + LOG_WARN("failed to get root path", K(ret), K(root_path)); + } + + //├── test_checkpoint + //│   ├── file1 + //│   ├── file2 + //│   ├── file3 + + for (int64_t i = 1; OB_SUCC(ret) && i <= 4; ++i) { + tmp_root_path.reset(); + tmp_root_path = root_path; + if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, i, ObBackupFileSuffix::ARCHIVE))) { + LOG_WARN("failed to join path", K(ret)); + } else if (OB_FAIL(util.mk_parent_dir(tmp_root_path.get_ptr(), &storage_info_))) { + LOG_WARN("failed to mk dir", K(ret)); + } else if (OB_FAIL(util.write_single_file(tmp_root_path.get_ptr(), &storage_info_, buf, sizeof(buf)))) { + LOG_WARN("failed to write dir less", K(ret)); + } + } + + const int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + LOG_INFO("finish prepare simple dir", K(type), K(cost_ts), K(ret)); + return ret; +} + +int TestArchiveCheckpointMgr::clean_root_dir(const ObStorageType &type) +{ + int ret = OB_SUCCESS; + ObBackupPath root_path; + if (OB_FAIL(get_root_path(type, root_path))) { + LOG_WARN("failed to get root path", K(ret), K(root_path)); + } else if (OB_FAIL(clean_dir(type, root_path.get_ptr()))) { + LOG_WARN("failed to clean dir", K(ret), K(type), K(root_path)); + } + return ret; +} + +int TestArchiveCheckpointMgr::clean_dir(const ObStorageType &type, const ObString &dir_uri) +{ + int ret = OB_SUCCESS; + ObBackupIoAdapter util; + char path[OB_MAX_BACKUP_PATH_LENGTH] = ""; + ObArray file_names; + ObArenaAllocator allocator; + char uri_del[OB_MAX_URI_LENGTH]; + ObFileListArrayOp op(file_names, allocator); + + if (OB_FAIL(util.list_files(dir_uri, &storage_info_, op))) { + LOG_WARN("failed to list files", K(ret), K(dir_uri)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < file_names.count(); ++i) { + const ObString file = file_names.at(i); + if (OB_FAIL(databuff_printf(path, sizeof(path), "%.*s/%.*s", + dir_uri.length(), dir_uri.ptr(), file.length(), file.ptr()))) { + LOG_WARN("failed to join path", K(ret)); + } else if (OB_FAIL(util.del_file(path, &storage_info_))) { + LOG_WARN("failed to clean dir", K(ret), K(type), K(path)); + } + LOG_INFO("del_file", K(path)); + } + } + + if (OB_SUCC(ret)) { + if (OB_FAIL(util.del_dir(dir_uri, &storage_info_))) { + LOG_WARN("failed to del dir", K(ret), K(dir_uri)); + } + LOG_INFO("del_dir", K(dir_uri)); + } + + return ret; +}; + +int TestArchiveCheckpointMgr::test_write_and_read_checkpoint(const ObStorageType &type) +{ + int ret = OB_SUCCESS; + int64_t start_ts = ObTimeUtility::current_time(); + ObBackupPath root_path; + ObBackupPath tmp_root_path; + ObBackupIoAdapter util; + if (OB_FAIL(get_root_path(type, root_path))) { + LOG_WARN("failed to get root path", K(ret), K(root_path)); + } else { + ObArchiveCheckpointMgr mgr; + bool is_exist = true; + uint64_t checkpoint = 0; + if (OB_FAIL(mgr.init(root_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, &storage_info_))) { + LOG_WARN("failed to init checkpoint mgr", K(ret), K(root_path)); + } else if (OB_FAIL(mgr.write(10))) { + LOG_WARN("failed to write files", K(ret), K(root_path)); + } else if (OB_FAIL(mgr.write(9))) { + LOG_WARN("failed to write files", K(ret), K(root_path)); + } else if (OB_FAIL(mgr.read(checkpoint))) { + LOG_WARN("failed to read files", K(ret)); + } else if (checkpoint != 10) { + ret = OB_ERROR; + } else if (OB_FAIL(mgr.write(100))) { + LOG_WARN("failed to write files", K(ret), K(root_path)); + } else if (OB_FAIL(mgr.write(50))) { + LOG_WARN("failed to write files", K(ret), K(root_path)); + } else if (OB_FAIL(mgr.read(checkpoint))) { + LOG_WARN("failed to read files", K(ret)); + } else if (checkpoint != 100) { + ret = OB_ERROR; + } else { + tmp_root_path.reset(); + tmp_root_path = root_path; + if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, 10, ObBackupFileSuffix::ARCHIVE))) { + LOG_WARN("failed to join path", K(ret)); + } else if (OB_FAIL(util.is_exist(tmp_root_path.get_ptr(), &storage_info_, is_exist))) { + LOG_WARN("failed to judge file exist", K(ret), K(tmp_root_path)); + } else if (is_exist) { + ret = OB_ERROR; + } + tmp_root_path.reset(); + tmp_root_path = root_path; + if (OB_SUCC(ret)) { + if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, 9, ObBackupFileSuffix::ARCHIVE))) { + LOG_WARN("failed to join path", K(ret)); + } else if (OB_FAIL(util.is_exist(tmp_root_path.get_ptr(), &storage_info_, is_exist))) { + LOG_WARN("failed to judge file exist", K(ret), K(tmp_root_path)); + } else if (is_exist) { + ret = OB_ERROR; + } + } + tmp_root_path.reset(); + tmp_root_path = root_path; + if (OB_SUCC(ret)) { + if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, 50, ObBackupFileSuffix::ARCHIVE))) { + LOG_WARN("failed to join path", K(ret)); + } else if (OB_FAIL(util.is_exist(tmp_root_path.get_ptr(), &storage_info_, is_exist))) { + LOG_WARN("failed to judge file exist", K(ret), K(tmp_root_path)); + } else if (!is_exist) { + ret = OB_ERROR; + } + } + } + } + + const int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + LOG_INFO("finish test_list_util", K(type), K(cost_ts), K(ret)); + return ret; +} + +TEST_F(TestArchiveCheckpointMgr, test_write_read_op) +{ + TEST_FUNCTION f = &TestArchiveCheckpointMgr::test_write_and_read_checkpoint; + int ret = run_test_func(f); + ASSERT_EQ(OB_SUCCESS, ret); +} + +int main(int argc, char **argv) +{ + system("rm -f test_archive_checkpoint_mgr.log"); + OB_LOGGER.set_file_name("test_archive_checkpoint_mgr.log"); + OB_LOGGER.set_log_level("INFO"); + ::testing::InitGoogleTest(&argc,argv); + return RUN_ALL_TESTS(); +} diff --git a/unittest/share/backup/test_ob_backup_dest_config.h b/unittest/share/backup/test_ob_backup_dest_config.h new file mode 100644 index 000000000..7e1ac6db6 --- /dev/null +++ b/unittest/share/backup/test_ob_backup_dest_config.h @@ -0,0 +1,24 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + + +#ifndef UNITTEST_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_CONFIG_H_ +#define UNITTEST_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_CONFIG_H_ + + // file path use relative path, object path use absolute path +const char *file_root_path = ""; +const char *file_storage_info = ""; + +const char *oss_root_path = "";//"oss://xxx/unittest" +const char *oss_storage_info = "host=xxxx&access_id=xxx&access_key=xxx&delete=xxx"; + +#endif /* UNITTEST_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_CONFIG_H_ */ \ No newline at end of file