Uing Small Files to Resolve Concurrent Reading and Writing of Archived Metainformation

This commit is contained in:
obdev 2023-07-21 08:42:22 +00:00 committed by ob-robot
parent 325413f334
commit ef07d04a9c
11 changed files with 801 additions and 13 deletions

View File

@ -48,6 +48,7 @@ ob_set_subtarget(ob_share backup
backup/ob_backup_config.cpp
backup/ob_log_restore_config.cpp
backup/ob_log_restore_struct.cpp
backup/ob_archive_checkpoint_mgr.cpp
)
ob_set_subtarget(ob_share cache

View File

@ -0,0 +1,236 @@
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#define USING_LOG_PREFIX SHARE
#include "ob_archive_checkpoint_mgr.h"
#include "lib/alloc/alloc_assist.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/string/ob_sql_string.h"
#include "share/backup/ob_backup_struct.h"
#include "share/backup/ob_archive_path.h"
using namespace oceanbase;
using namespace share;
bool ObGetMaxCheckpointOp::is_valid() const
{
return max_checkpoint_scn_ >= 0
&& file_name_ != nullptr
&& type_ <= ObBackupFileSuffix::BACKUP
&& type_ >= ObBackupFileSuffix::NONE;
}
int ObGetMaxCheckpointOp::func(const dirent *entry)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(entry)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, entry is null", K(ret));
} else if (OB_ISNULL(entry->d_name) || !is_valid()) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, d_name is null", K(ret));
} else {
uint64_t checkpoint_scn = 0;
if (OB_FAIL(ObBackupPath::parse_checkpoint(entry->d_name, file_name_, type_, checkpoint_scn))) {
OB_LOG(WARN, "failed to get checkpoint scn", K(ret), KP(entry->d_name));
} else if (checkpoint_scn > max_checkpoint_scn_) {
max_checkpoint_scn_ = checkpoint_scn;
}
}
return ret;
}
bool ObDelHisCheckpointFileOp::is_valid() const
{
return checkpoint_scn_ >= 0
&& !path_.is_empty()
&& file_name_ != nullptr
&& type_ <= ObBackupFileSuffix::BACKUP
&& type_ >= ObBackupFileSuffix::NONE
&& storage_info_ != nullptr;
}
int ObDelHisCheckpointFileOp::func(const dirent *entry)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(entry)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, entry is null", K(ret));
} else if (OB_ISNULL(entry->d_name) || !is_valid()) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid list entry, d_name is null", K(ret));
} else {
uint64_t checkpoint_scn = 0;
ObBackupPath full_path = path_;
ObBackupIoAdapter io_util;
if (OB_FAIL(ObBackupPath::parse_checkpoint(entry->d_name, file_name_, type_, checkpoint_scn))) {
OB_LOG(WARN, "failed to get checkpoint scn", K(ret), KP(entry->d_name));
} else if (checkpoint_scn >= checkpoint_scn_ || 0 == checkpoint_scn) {
//do nothing
} else if (OB_FAIL(full_path.join_checkpoint_info_file(file_name_, checkpoint_scn, type_))) {
OB_LOG(WARN, "failed to set full path for del file",
K(ret), K(checkpoint_scn), KP(file_name_), K(type_));
} else if (OB_FAIL(io_util.del_file(full_path.get_ptr(), storage_info_))) {
OB_LOG(WARN, "failed to delete file", K(ret), K(full_path));
}
}
return ret;
}
/**
* ------------------------------ObArchiveCheckpointMgr---------------------
*/
int ObArchiveCheckpointMgr::init(
const ObBackupPath &path,
const char *file_name,
const ObBackupFileSuffix &type,
const ObBackupStorageInfo *storage_info)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObArchiveCheckpointMgr init twice", K(ret));
} else if (path.is_empty()
|| OB_ISNULL(file_name)
|| type > ObBackupFileSuffix::BACKUP
|| type < ObBackupFileSuffix::NONE
|| OB_ISNULL(storage_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(path), KP(file_name), K(type));
} else {
path_ = path;
file_name_ = file_name;
type_ = type;
storage_info_ = storage_info;
is_inited_ = true;
}
return ret;
}
bool ObArchiveCheckpointMgr::is_valid() const
{
return !path_.is_empty()
&& type_ <= ObBackupFileSuffix::BACKUP
&& type_ >= ObBackupFileSuffix::NONE
&& file_name_ != nullptr
&& storage_info_ != nullptr;
}
void ObArchiveCheckpointMgr::reset()
{
is_inited_ = false;
path_.reset();
file_name_ = nullptr;
type_ = ObBackupFileSuffix::NONE;
storage_info_ = nullptr;
}
int ObArchiveCheckpointMgr::check_is_tagging_(const ObBackupStorageInfo *storage_info, bool &is_tagging) const
{
int ret = OB_SUCCESS;
is_tagging = false;
if (OB_STORAGE_OSS == storage_info_ -> device_type_) {
//TODO(zhixing.yh) Adapt the analytic interface in storage_info
}
return ret;
}
int ObArchiveCheckpointMgr::write(const uint64_t checkpoint_scn) const
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
uint64_t max_checkpoint_scn = 0;
ObBackupPath full_path = path_; //checkpoint scn file path
ObBackupPath dir_path = path_; //checkpoint dir file path
ObBackupIoAdapter io_util;
bool is_tagging = false;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Archive checkpoint mgr not init", K(ret));
} else if (checkpoint_scn <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument!", K(ret), K(checkpoint_scn));
} else if (OB_FAIL(check_is_tagging_(storage_info_, is_tagging))) {
LOG_WARN("failed to judge delete mode", K(ret));
} else if (OB_FAIL(full_path.join_checkpoint_info_file(file_name_, checkpoint_scn, type_))) {
LOG_WARN("failed to get piece checkpoint file path",
K(ret), K(checkpoint_scn), KP(file_name_), K(full_path), K(type_));
} else if (OB_FAIL(write_checkpoint_file_(full_path))) {
LOG_WARN("failed to write checkpoint file", K(ret), K(full_path));
}
//if the delete mode is not 'tagging', need to list files for deleting smaller checkpoint scn files
if (OB_SUCC(ret) && !is_tagging && OB_TMP_FAIL(del_history_files_(dir_path, checkpoint_scn))) {
LOG_WARN("failed to delete files", K(ret), K(dir_path), K(checkpoint_scn), K(tmp_ret));
}
return ret;
}
int ObArchiveCheckpointMgr::read(uint64_t &max_checkpoint_scn) const
{
int ret = OB_SUCCESS;
ObArray<common::ObString> file_names;
common::ObArenaAllocator allocator;
ObBackupPath checkpoint_dir_path = path_;
max_checkpoint_scn = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Archive checkpoint mgr not init", K(ret));
} else if (OB_FAIL(get_max_checkpoint_scn_(checkpoint_dir_path, max_checkpoint_scn))) {
LOG_WARN("failed to get max checkpoint scn", K(ret), K(checkpoint_dir_path));
}
return ret;
}
int ObArchiveCheckpointMgr::get_max_checkpoint_scn_(
const ObBackupPath &path,
uint64_t &max_checkpoint_scn) const
{
int ret = OB_SUCCESS;
ObBackupIoAdapter io_util;
max_checkpoint_scn = 0;
ObGetMaxCheckpointOp max_checkpoint_op(max_checkpoint_scn, file_name_, type_);
if (OB_FAIL(io_util.list_files(path.get_ptr(), storage_info_, max_checkpoint_op))) {
LOG_WARN("failed to get max checkpoint scn", K(ret), K(path));
}
return ret;
}
int ObArchiveCheckpointMgr::del_history_files_(
const ObBackupPath &dir_path,
const uint64_t write_checkpoint_scn) const
{
int ret = OB_SUCCESS;
ObBackupIoAdapter io_util;
ObDelHisCheckpointFileOp del_his_file_op(write_checkpoint_scn, dir_path, file_name_, type_, storage_info_);
if (OB_FAIL(io_util.list_files(dir_path.get_ptr(), storage_info_, del_his_file_op))) {
LOG_WARN("failed to del history checkpoint file",
K(ret), K(dir_path), K(write_checkpoint_scn), K(path_), KP(file_name_), K(type_));
}
return ret;
}
int ObArchiveCheckpointMgr::write_checkpoint_file_(const ObBackupPath &path) const
{
int ret = OB_SUCCESS;
int64_t pos = 0;
ObBackupIoAdapter io_util;
char buf = '\0';
const int64_t buf_size = 0;
if (OB_FAIL(io_util.mk_parent_dir(path.get_ptr(), storage_info_))) {
LOG_WARN("failed to mk dir.", K(ret), K(path));
} else if (OB_FAIL(io_util.write_single_file(path.get_ptr(), storage_info_, &buf, buf_size))) {
LOG_WARN("failed to write single file.", K(ret), K(path));
} else {
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
FLOG_INFO("succeed to write checkpoint file.", K(path));
}
}
return ret;
}

View File

@ -0,0 +1,112 @@
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#ifndef SRC_SHARE_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_H_
#define SRC_SHARE_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_H_
#include "share/backup/ob_backup_io_adapter.h"
#include "share/backup/ob_backup_path.h"
#include "share/backup/ob_archive_store.h"
#include "lib/utility/ob_unify_serialize.h"
namespace oceanbase
{
namespace share
{
class ObGetMaxCheckpointOp : public ObBaseDirEntryOperator
{
public:
ObGetMaxCheckpointOp(
uint64_t& max_checkpoint_scn,
const char *file_name,
const ObBackupFileSuffix &type)
: max_checkpoint_scn_(max_checkpoint_scn),
file_name_(file_name),
type_(type) {}
virtual ~ObGetMaxCheckpointOp() {}
bool is_valid() const;
int func(const dirent *entry);
private:
uint64_t& max_checkpoint_scn_;
const char *file_name_;
ObBackupFileSuffix type_;
DISALLOW_COPY_AND_ASSIGN(ObGetMaxCheckpointOp);
};
//delete files with smaller checkpoint_scn in file name
class ObDelHisCheckpointFileOp : public ObBaseDirEntryOperator
{
public:
ObDelHisCheckpointFileOp(
const uint64_t checkpoint_scn,
const ObBackupPath &path,
const char *file_name,
const ObBackupFileSuffix &type,
const share::ObBackupStorageInfo *storage_info)
: checkpoint_scn_(checkpoint_scn),
path_(path),
file_name_(file_name),
type_(type),
storage_info_(storage_info) {}
virtual ~ObDelHisCheckpointFileOp() {}
bool is_valid() const;
int func(const dirent *entry) ;
private:
uint64_t checkpoint_scn_;
ObBackupPath path_;
const char *file_name_;
ObBackupFileSuffix type_;
const share::ObBackupStorageInfo *storage_info_;
DISALLOW_COPY_AND_ASSIGN(ObDelHisCheckpointFileOp);
};
class ObArchiveCheckpointMgr final
{
public:
ObArchiveCheckpointMgr()
: is_inited_(false),
path_(),
file_name_(nullptr),
type_(),
storage_info_(nullptr) {}
~ObArchiveCheckpointMgr() {}
int init(
const ObBackupPath &path,
const char *file_name,
const ObBackupFileSuffix &type,
const ObBackupStorageInfo *storage_info);
void reset();
bool is_valid() const;
int write(const uint64_t checkpoint_scn) const;
int read(uint64_t &checkpoint_scn) const;
private:
int get_max_checkpoint_scn_(const ObBackupPath &path, uint64_t &max_checkpoint_scn) const;
int del_history_files_(const ObBackupPath &dir_path, const uint64_t write_checkpoint_scn) const;
int write_checkpoint_file_(const ObBackupPath &path) const;
int check_is_tagging_(const ObBackupStorageInfo *storage_info, bool &is_tagging) const;
TO_STRING_KV(K_(is_inited), K_(path), KP_(file_name), K_(type));
private:
bool is_inited_;
ObBackupPath path_;
const char *file_name_;
ObBackupFileSuffix type_;
const ObBackupStorageInfo *storage_info_;
DISALLOW_COPY_AND_ASSIGN(ObArchiveCheckpointMgr);
};
}
}
#endif /* SRC_SHARE_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_H_*/

View File

@ -19,6 +19,7 @@
#include "lib/oblog/ob_log_module.h"
#include "lib/utility/utility.h"
#include "share/backup/ob_archive_path.h"
#include "share/backup/ob_archive_checkpoint_mgr.h"
using namespace oceanbase;
using namespace common;
@ -838,15 +839,35 @@ int ObArchiveStore::is_piece_checkpoint_file_exist(const int64_t dest_id, const
int ObArchiveStore::read_piece_checkpoint(const int64_t dest_id, const int64_t round_id, const int64_t piece_id, const int64_t file_id, ObPieceCheckpointDesc &desc) const
{
int ret = OB_SUCCESS;
ObBackupPath full_path;
ObBackupPath dir_path;
ObBackupPath meta_full_path;
const ObBackupDest &dest = get_backup_dest();
if (!is_init()) {
ret = OB_NOT_INIT;
LOG_WARN("ObArchiveStore not init", K(ret));
} else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, file_id, full_path))) {
LOG_WARN("failed to get piece checkpoint file path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id), K(file_id));
} else if (OB_FAIL(read_single_file(full_path.get_ptr(), desc))) {
LOG_WARN("failed to read single file", K(ret), K(full_path), K(dest_id), K(round_id), K(piece_id));
} else {
ObArchiveCheckpointMgr mgr;
uint64_t max_checkpoint_scn = 0;
if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, 0, meta_full_path))) {
LOG_WARN("failed to get checkpoint meta file path", K(ret), K(dest), K(round_id), K(dest_id), K(piece_id));
} else if (OB_FAIL(read_single_file(meta_full_path.get_ptr(), desc))) {
LOG_WARN("failed to read mate file", K(ret), K(meta_full_path));
} else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(dest, dest_id, round_id, piece_id, dir_path))) {
LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id));
} else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) {
LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path));
} else if (OB_FAIL(mgr.read(max_checkpoint_scn))) {
LOG_WARN("failed to read checkpoint scn", K(ret), K(max_checkpoint_scn));
} else if (0 == max_checkpoint_scn) {
//do nothing
} else if (OB_FAIL(desc.checkpoint_scn_.convert_for_inner_table_field(max_checkpoint_scn))) {
LOG_WARN("failed to set checkpoint scn", K(ret), K(max_checkpoint_scn));
} else if (OB_FAIL(desc.max_scn_.convert_for_inner_table_field(max_checkpoint_scn))) {
LOG_WARN("failed to set max scn", K(ret), K(max_checkpoint_scn));
}
if (OB_SUCC(ret)) {
FLOG_INFO("succeed to read checkpoint desc.", K(desc));
}
}
return ret;
}
@ -855,14 +876,32 @@ int ObArchiveStore::write_piece_checkpoint(const int64_t dest_id, const int64_t
{
int ret = OB_SUCCESS;
ObBackupPath full_path;
ObBackupPath dir_path;
const ObBackupDest &dest = get_backup_dest();
if (!is_init()) {
ret = OB_NOT_INIT;
LOG_WARN("ObArchiveStore not init", K(ret));
} else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, file_id, full_path))) {
LOG_WARN("failed to get piece checkpoint file path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id), K(file_id));
} else if (OB_FAIL(write_single_file(full_path.get_ptr(), desc))) {
LOG_WARN("failed to write single file", K(ret), K(full_path));
} else {
ObArchiveCheckpointMgr mgr;
bool meta_is_exist = false;
if (OB_FAIL(is_piece_checkpoint_file_exist(dest_id, round_id, piece_id, 0, meta_is_exist))) {
LOG_WARN("failed to check checkpoint meta file exist.", K(ret));
} else if (!meta_is_exist) {
if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_file_path(dest, dest_id, round_id, piece_id, 0, full_path))) {
LOG_WARN("failed to get piece checkpoint meta file path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id));
} else if (OB_FAIL(write_single_file(full_path.get_ptr(), desc))) {
LOG_WARN("failed to write checkpoint index file", K(ret), K(desc), K(full_path));
}
}
if (OB_FAIL(ret)) {
//do noting
} else if (OB_FAIL(ObArchivePathUtil::get_piece_checkpoint_dir_path(dest, dest_id, round_id, piece_id, dir_path))) {
LOG_WARN("failed to get piece checkpoint dir path", K(ret), K(dest), K(dest_id), K(round_id), K(piece_id));
} else if (OB_FAIL(mgr.init(dir_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, get_storage_info()))) {
LOG_WARN("failed to init ObArchiveCheckPointMgr", K(ret), K(dir_path));
} else if (OB_FAIL(mgr.write(desc.checkpoint_scn_.get_val_for_inner_table_field()))) {
LOG_WARN("failed to write checkpoint info", K(ret), K(desc));
}
}
return ret;
}

View File

@ -460,6 +460,59 @@ int ObBackupPath::join_tenant_meta_index_file(const ObBackupDataType &backup_typ
return ret;
}
// param case: file_name -> 'checkpoint_info', checkpoint -> 1632889932327676777, type -> ARCHIVE
// result : oss://backup/[file_name].[checkpoint].obarc
int ObBackupPath::join_checkpoint_info_file(const common::ObString &file_name, const uint64_t checkpoint, const ObBackupFileSuffix &type)
{
int ret = OB_SUCCESS;
if (cur_pos_ <= 0) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K(*this));
} else if (file_name.length() <= 0 ) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(file_name));
} else if (OB_FAIL(databuff_printf(path_, sizeof(path_), cur_pos_, "/%.*s", file_name.length(), file_name.ptr()))) {
LOG_WARN("failed to join file name", K(ret), K(file_name), K(*this));
} else if (OB_FAIL(databuff_printf(path_, sizeof(path_), cur_pos_, ".%lu", checkpoint))) {
LOG_WARN("failed to join checkpoint", K(ret), K(checkpoint), K(*this));
} else if (OB_FAIL(add_backup_suffix(type))) {
LOG_WARN("failed to add backup file suffix", K(ret), K(type), K(*this));
} else if (OB_FAIL(trim_right_backslash())) {
OB_LOG(WARN, "fail to trim_right_backslash", K(ret));
}
return ret;
}
// param case: entry_d_name -> 'checkpoint_info.1678226622262333112.obarc', file_name -> 'checkpoint_info', type -> ARCHIVE
// result : checkpoint -> 1678226622262333112
int ObBackupPath::parse_checkpoint(const common::ObString &entry_d_name, const common::ObString &file_name, const ObBackupFileSuffix &type, uint64_t &checkpoint)
{
int ret = OB_SUCCESS;
checkpoint = 0;
ObBackupPath tmp_path; //format string for sscanf
char tmp_file_name[OB_MAX_FILE_NAME_LENGTH] = { 0 };
if (entry_d_name.length() <= 0 || file_name.length() <= 0 || type > ObBackupFileSuffix::BACKUP || type < ObBackupFileSuffix::NONE) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(entry_d_name), K(file_name));
} else if (OB_FAIL(databuff_printf(tmp_file_name, sizeof(tmp_file_name), "%s.%%lu", file_name.ptr()))) {
LOG_WARN("failed to join tmp file name", K(ret), K(file_name));
} else if (OB_FAIL(tmp_path.init(tmp_file_name))) {
LOG_WARN("failed to init tmp path", K(ret), K(tmp_file_name));
} else if (OB_FAIL(tmp_path.add_backup_suffix(type))) {
LOG_WARN("failed to add backup file suffix", K(ret), K(type), K(tmp_path));
} else if (OB_FAIL(tmp_path.trim_right_backslash())) {
OB_LOG(WARN, "fail to trim_right_backslash", K(ret));
} else if (1 == sscanf(entry_d_name.ptr(), tmp_path.get_ptr(), &checkpoint)) {
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
OB_LOG(INFO, "succeed to get checkpoint scn", K(ret), K(entry_d_name), K(checkpoint), K(tmp_path));
}
} else {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "failed to get checkpoint", K(ret), K(entry_d_name), K(file_name), K(type), K(checkpoint), K(tmp_path));
}
return ret;
}
common::ObString ObBackupPath::get_obstr() const
{
return ObString(cur_pos_, path_);

View File

@ -55,6 +55,8 @@ public:
int join_meta_info_turn_and_retry(const int64_t turn_id, const int64_t retry_id);
int join_tenant_macro_range_index_file(const share::ObBackupDataType &type, const int64_t retry_id);
int join_tenant_meta_index_file(const share::ObBackupDataType &type, const int64_t retry_id, const bool is_sec_meta);
int join_checkpoint_info_file(const common::ObString &path, const uint64_t checkpoint, const ObBackupFileSuffix &type);
static int parse_checkpoint(const common::ObString &entry_d_name, const common::ObString &file_name, const ObBackupFileSuffix &type, uint64_t &checkpoint);
int add_backup_suffix(const ObBackupFileSuffix &type);
const char *get_ptr() const { return path_; }

View File

@ -92,12 +92,13 @@ int ObBackupSerializeHeaderWrapper::deserialize(const char *buf, const int64_t d
LOG_WARN("serializer is null", K(ret));
} else {
const ObBackupCommonHeader *common_header = reinterpret_cast<const ObBackupCommonHeader*>(buf);
int64_t pos = common_header->header_length_;
int64_t header_len = common_header->header_length_;
uint16_t data_type = common_header->data_type_;
uint16_t version = common_header->data_version_;
pos = pos + header_len;
if (OB_FAIL(common_header->check_header_checksum())) {
LOG_WARN("failed to check common header", K(ret));
} else if (common_header->data_zlength_ > data_len - pos) {
} else if (common_header->data_zlength_ > data_len - header_len) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("buf size is too small", K(ret), K(data_len), K(*common_header));
} else if (data_type != get_data_type()) {
@ -106,9 +107,9 @@ int ObBackupSerializeHeaderWrapper::deserialize(const char *buf, const int64_t d
} else if (version != get_data_version()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("data version not match", K(ret), K(*common_header), K(get_data_version()));
} else if (OB_FAIL(common_header->check_data_checksum(buf + pos, common_header->data_zlength_))) {
} else if (OB_FAIL(common_header->check_data_checksum(buf + header_len, common_header->data_zlength_))) {
LOG_WARN("failed to check checksum", K(ret), K(*common_header));
} else if (OB_FAIL(serializer_->deserialize(buf, pos + common_header->data_zlength_, pos))) {
} else if (OB_FAIL(serializer_->deserialize(buf, header_len + common_header->data_zlength_, pos))) {
LOG_WARN("failed to do deserialize", K(ret), K(*common_header));
} else if (!serializer_->is_valid()) {
ret = OB_ERR_UNEXPECTED;

View File

@ -413,6 +413,7 @@ const char *const OB_STR_CLUSTER_VERSION = "cluster_version";
const char *const OB_BACKUP_SUFFIX=".obbak";
const char *const OB_ARCHIVE_SUFFIX=".obarc";
const char *const OB_STR_MIN_RESTORE_SCN_DISPLAY = "min_restore_scn_display";
const char *const OB_STR_CHECKPOINT_FILE_NAME = "checkpoint_info";
enum ObBackupFileType
{

View File

@ -1,3 +1,4 @@
storage_unittest(test_backup_path)
storage_unittest(test_backup_struct)
storage_unittest(test_log_archive_backup_info_mgr)
storage_unittest(test_archive_checkpoint_mgr)

View File

@ -0,0 +1,318 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#define private public
#include "lib/restore/ob_storage.h"
#include "lib/restore/ob_storage_oss_base.cpp"
#undef private
#include <gtest/gtest.h>
#include "lib/utility/ob_test_util.h"
#include "share/backup/ob_backup_io_adapter.h"
#include "share/backup/ob_backup_struct.h"
#define private public
#include "share/backup/ob_archive_checkpoint_mgr.h"
#undef private
#include "test_ob_backup_dest_config.h"
#include "lib/allocator/page_arena.h"
#include "lib/oblog/ob_log.h"
#include "lib/oblog/ob_log_module.h"
#include "share/ob_force_print_log.h"
#include "share/backup/ob_backup_path.h"
#include "share/ob_thread_pool.h"
#include "apr_allocator.h"
using namespace oceanbase::common;
using namespace oceanbase::share;
const ObStorageType TEST_STORAGE_TYPES[] = {
OB_STORAGE_FILE,
OB_STORAGE_OSS,
};
class TestArchiveCheckpointMgr: public ::testing::Test
{
public:
TestArchiveCheckpointMgr() {}
virtual ~TestArchiveCheckpointMgr() {}
virtual void SetUp()
{
}
virtual void TearDown()
{
}
static void SetUpTestCase()
{
}
static void TearDownTestCase()
{
}
// function members
static int get_root_path(const ObStorageType &type, ObBackupPath &root_path);
static int get_storage_info(const ObStorageType &type, ObString &storage_info);
int clean_root_dir(const ObStorageType &type);
int clean_dir(const ObStorageType &type, const ObString &dir_uri);
int generate_simple_files(const ObStorageType &type);
// test cases
typedef int (TestArchiveCheckpointMgr::*TEST_FUNCTION) (const ObStorageType &type);
int run_test_func(TEST_FUNCTION function);
int test_write_and_read_checkpoint(const ObStorageType &type);
ObBackupStorageInfo storage_info_;
private:
// disallow copy
DISALLOW_COPY_AND_ASSIGN(TestArchiveCheckpointMgr);
};
int TestArchiveCheckpointMgr::run_test_func(TEST_FUNCTION f)
{
int ret = OB_SUCCESS;
const int64_t type_count = sizeof(TEST_STORAGE_TYPES) / sizeof(ObStorageType);
ObBackupPath path;
ObString storage_info;
for (int64_t i = 0; OB_SUCC(ret) && i < type_count; ++i) {
const ObStorageType type = TEST_STORAGE_TYPES[i];
const int64_t start_ts = ObTimeUtility::current_time();
storage_info_.reset();
FLOG_INFO("start run_test_func", K(type));
if (OB_FAIL(get_root_path(type, path))) {
LOG_WARN("failed to get root patch", K(ret), K(type));
} else if (path.is_empty()) {
LOG_INFO("path is not set, skip unittest", K(type));
} else if (OB_FAIL(get_storage_info(type, storage_info))) {
LOG_WARN("failed to get storage info", K(ret), K(type));
} else if (OB_FAIL(storage_info_.set(type, storage_info.ptr()))) {
LOG_INFO("path is not set, skip unittest", K(type));
} else if (OB_FAIL(clean_root_dir(type))) {
LOG_WARN("failed to clean root dir", K(ret), K(type));
} else if (OB_FAIL((this->*f)(type))) {
LOG_WARN("failed to to test function", K(ret), K(type));
} else if (OB_FAIL(clean_root_dir(type))) {
LOG_WARN("failed to clean root dir", K(ret), K(type));
}
const int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
FLOG_INFO("end run_test_func", K(ret), K(type), K(cost_ts));
}
return ret;
}
int TestArchiveCheckpointMgr::get_root_path(const ObStorageType &type, ObBackupPath &root_path)
{
int ret = OB_SUCCESS;
root_path.reset();
if (OB_STORAGE_OSS == type) {
if (OB_FAIL(root_path.init(oss_root_path))) {
LOG_WARN("failed to init oss root path", K(ret));
}
} else if (OB_STORAGE_FILE == type) {
if (OB_FAIL(root_path.init(file_root_path))) {
LOG_WARN("failed to init file root path", K(ret));
}
} else {
ret = OB_ERROR;
LOG_ERROR("unknown type", K(type));
}
return ret;
}
int TestArchiveCheckpointMgr::get_storage_info(const ObStorageType &type, ObString &storage_info)
{
int ret = OB_SUCCESS;
storage_info.reset();
if (OB_STORAGE_OSS == type) {
storage_info = ObString(oss_storage_info);
} else if (OB_STORAGE_FILE == type) {
storage_info = ObString(file_storage_info);
} else {
ret = OB_ERROR;
LOG_ERROR("unknown type", K(type));
}
return ret;
}
int TestArchiveCheckpointMgr::generate_simple_files(const ObStorageType &type)
{
int ret = OB_SUCCESS;
ObBackupPath root_path;
ObBackupPath tmp_root_path;
ObBackupIoAdapter util;
char buf[] = "test";
int64_t start_ts = ObTimeUtility::current_time();
LOG_INFO("start generate_simple_files", K(type));
if (OB_FAIL(get_root_path(type, root_path))) {
LOG_WARN("failed to get root path", K(ret), K(root_path));
}
//├── test_checkpoint
//│   ├── file1
//│   ├── file2
//│   ├── file3
for (int64_t i = 1; OB_SUCC(ret) && i <= 4; ++i) {
tmp_root_path.reset();
tmp_root_path = root_path;
if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, i, ObBackupFileSuffix::ARCHIVE))) {
LOG_WARN("failed to join path", K(ret));
} else if (OB_FAIL(util.mk_parent_dir(tmp_root_path.get_ptr(), &storage_info_))) {
LOG_WARN("failed to mk dir", K(ret));
} else if (OB_FAIL(util.write_single_file(tmp_root_path.get_ptr(), &storage_info_, buf, sizeof(buf)))) {
LOG_WARN("failed to write dir less", K(ret));
}
}
const int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
LOG_INFO("finish prepare simple dir", K(type), K(cost_ts), K(ret));
return ret;
}
int TestArchiveCheckpointMgr::clean_root_dir(const ObStorageType &type)
{
int ret = OB_SUCCESS;
ObBackupPath root_path;
if (OB_FAIL(get_root_path(type, root_path))) {
LOG_WARN("failed to get root path", K(ret), K(root_path));
} else if (OB_FAIL(clean_dir(type, root_path.get_ptr()))) {
LOG_WARN("failed to clean dir", K(ret), K(type), K(root_path));
}
return ret;
}
int TestArchiveCheckpointMgr::clean_dir(const ObStorageType &type, const ObString &dir_uri)
{
int ret = OB_SUCCESS;
ObBackupIoAdapter util;
char path[OB_MAX_BACKUP_PATH_LENGTH] = "";
ObArray <ObString> file_names;
ObArenaAllocator allocator;
char uri_del[OB_MAX_URI_LENGTH];
ObFileListArrayOp op(file_names, allocator);
if (OB_FAIL(util.list_files(dir_uri, &storage_info_, op))) {
LOG_WARN("failed to list files", K(ret), K(dir_uri));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < file_names.count(); ++i) {
const ObString file = file_names.at(i);
if (OB_FAIL(databuff_printf(path, sizeof(path), "%.*s/%.*s",
dir_uri.length(), dir_uri.ptr(), file.length(), file.ptr()))) {
LOG_WARN("failed to join path", K(ret));
} else if (OB_FAIL(util.del_file(path, &storage_info_))) {
LOG_WARN("failed to clean dir", K(ret), K(type), K(path));
}
LOG_INFO("del_file", K(path));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(util.del_dir(dir_uri, &storage_info_))) {
LOG_WARN("failed to del dir", K(ret), K(dir_uri));
}
LOG_INFO("del_dir", K(dir_uri));
}
return ret;
};
int TestArchiveCheckpointMgr::test_write_and_read_checkpoint(const ObStorageType &type)
{
int ret = OB_SUCCESS;
int64_t start_ts = ObTimeUtility::current_time();
ObBackupPath root_path;
ObBackupPath tmp_root_path;
ObBackupIoAdapter util;
if (OB_FAIL(get_root_path(type, root_path))) {
LOG_WARN("failed to get root path", K(ret), K(root_path));
} else {
ObArchiveCheckpointMgr mgr;
bool is_exist = true;
uint64_t checkpoint = 0;
if (OB_FAIL(mgr.init(root_path, OB_STR_CHECKPOINT_FILE_NAME, ObBackupFileSuffix::ARCHIVE, &storage_info_))) {
LOG_WARN("failed to init checkpoint mgr", K(ret), K(root_path));
} else if (OB_FAIL(mgr.write(10))) {
LOG_WARN("failed to write files", K(ret), K(root_path));
} else if (OB_FAIL(mgr.write(9))) {
LOG_WARN("failed to write files", K(ret), K(root_path));
} else if (OB_FAIL(mgr.read(checkpoint))) {
LOG_WARN("failed to read files", K(ret));
} else if (checkpoint != 10) {
ret = OB_ERROR;
} else if (OB_FAIL(mgr.write(100))) {
LOG_WARN("failed to write files", K(ret), K(root_path));
} else if (OB_FAIL(mgr.write(50))) {
LOG_WARN("failed to write files", K(ret), K(root_path));
} else if (OB_FAIL(mgr.read(checkpoint))) {
LOG_WARN("failed to read files", K(ret));
} else if (checkpoint != 100) {
ret = OB_ERROR;
} else {
tmp_root_path.reset();
tmp_root_path = root_path;
if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, 10, ObBackupFileSuffix::ARCHIVE))) {
LOG_WARN("failed to join path", K(ret));
} else if (OB_FAIL(util.is_exist(tmp_root_path.get_ptr(), &storage_info_, is_exist))) {
LOG_WARN("failed to judge file exist", K(ret), K(tmp_root_path));
} else if (is_exist) {
ret = OB_ERROR;
}
tmp_root_path.reset();
tmp_root_path = root_path;
if (OB_SUCC(ret)) {
if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, 9, ObBackupFileSuffix::ARCHIVE))) {
LOG_WARN("failed to join path", K(ret));
} else if (OB_FAIL(util.is_exist(tmp_root_path.get_ptr(), &storage_info_, is_exist))) {
LOG_WARN("failed to judge file exist", K(ret), K(tmp_root_path));
} else if (is_exist) {
ret = OB_ERROR;
}
}
tmp_root_path.reset();
tmp_root_path = root_path;
if (OB_SUCC(ret)) {
if (OB_FAIL(tmp_root_path.join_checkpoint_info_file(OB_STR_CHECKPOINT_FILE_NAME, 50, ObBackupFileSuffix::ARCHIVE))) {
LOG_WARN("failed to join path", K(ret));
} else if (OB_FAIL(util.is_exist(tmp_root_path.get_ptr(), &storage_info_, is_exist))) {
LOG_WARN("failed to judge file exist", K(ret), K(tmp_root_path));
} else if (!is_exist) {
ret = OB_ERROR;
}
}
}
}
const int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
LOG_INFO("finish test_list_util", K(type), K(cost_ts), K(ret));
return ret;
}
TEST_F(TestArchiveCheckpointMgr, test_write_read_op)
{
TEST_FUNCTION f = &TestArchiveCheckpointMgr::test_write_and_read_checkpoint;
int ret = run_test_func(f);
ASSERT_EQ(OB_SUCCESS, ret);
}
int main(int argc, char **argv)
{
system("rm -f test_archive_checkpoint_mgr.log");
OB_LOGGER.set_file_name("test_archive_checkpoint_mgr.log");
OB_LOGGER.set_log_level("INFO");
::testing::InitGoogleTest(&argc,argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,24 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef UNITTEST_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_CONFIG_H_
#define UNITTEST_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_CONFIG_H_
// file path use relative path, object path use absolute path
const char *file_root_path = "";
const char *file_storage_info = "";
const char *oss_root_path = "";//"oss://xxx/unittest"
const char *oss_storage_info = "host=xxxx&access_id=xxx&access_key=xxx&delete=xxx";
#endif /* UNITTEST_BACKUP_OB_ARCHIVE_CHECKPOINT_MGR_CONFIG_H_ */