add -t to support check tagging
This commit is contained in:
1
deps/oblib/src/lib/restore/ob_i_storage.h
vendored
1
deps/oblib/src/lib/restore/ob_i_storage.h
vendored
@ -50,6 +50,7 @@ public:
|
||||
virtual int delete_tmp_files(const common::ObString& dir_path, const common::ObString& storage_info) = 0;
|
||||
virtual int is_empty_directory(
|
||||
const common::ObString& uri, const common::ObString& storage_info, bool& is_empty_directory) = 0;
|
||||
virtual int is_tagging(const common::ObString& uri, const common::ObString& storage_info, bool& is_tagging) = 0;
|
||||
virtual int check_backup_dest_lifecycle(
|
||||
const common::ObString& path, const common::ObString& storage_info, bool& is_set_lifecycle) = 0;
|
||||
virtual int list_directories(const common::ObString& dir_path, const common::ObString& storage_info,
|
||||
|
||||
33
deps/oblib/src/lib/restore/ob_storage.cpp
vendored
33
deps/oblib/src/lib/restore/ob_storage.cpp
vendored
@ -656,6 +656,39 @@ int ObStorageUtil::del_dir(const common::ObString &uri, const common::ObString &
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageUtil::is_tagging(const common::ObString &uri, const common::ObString &storage_info, bool &is_tagging)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t start_ts = ObTimeUtility::current_time();
|
||||
ObIStorageUtil *util = NULL;
|
||||
is_tagging = false;
|
||||
int64_t retry_times = 0;
|
||||
bool need_retry = true;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
|
||||
ret = OB_BACKUP_IO_PROHIBITED;
|
||||
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
|
||||
} else if (OB_FAIL(get_util(uri, util))) {
|
||||
STORAGE_LOG(WARN, "failed to get util", K(ret), K(uri));
|
||||
} else {
|
||||
while (OB_SUCC(ret) && need_retry) {
|
||||
need_retry = false;
|
||||
if (OB_FAIL(util->is_tagging(uri, storage_info, is_tagging))) {
|
||||
const int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
|
||||
STORAGE_LOG(WARN, "failed to check is tagging", K(ret), K(cost_ts), K(retry_times), K(uri));
|
||||
if (cost_ts < max_retry_duraion_us_) {
|
||||
usleep(retry_sleep_us_);
|
||||
++retry_times;
|
||||
need_retry = true;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
print_access_storage_log("is_tagging", uri, start_ts);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageUtil::get_pkeys_from_dir(
|
||||
const common::ObString &uri, const common::ObString &storage_info, common::ObIArray<common::ObPartitionKey> &pkeys)
|
||||
{
|
||||
|
||||
1
deps/oblib/src/lib/restore/ob_storage.h
vendored
1
deps/oblib/src/lib/restore/ob_storage.h
vendored
@ -126,6 +126,7 @@ public:
|
||||
int del_dir(const common::ObString& uri, const common::ObString& storage_info);
|
||||
int get_pkeys_from_dir(const common::ObString& uri, const common::ObString& storage_info,
|
||||
common::ObIArray<common::ObPartitionKey>& pkeys);
|
||||
int is_tagging(const common::ObString& uri, const common::ObString& storage_info, bool& is_tagging);
|
||||
// uri is directory
|
||||
int delete_tmp_files(const common::ObString& uri, const common::ObString& storage_info);
|
||||
int is_empty_directory(const common::ObString& uri, const common::ObString& storage_info, bool& is_empty_directory);
|
||||
|
||||
2161
deps/oblib/src/lib/restore/ob_storage_cos.cpp
vendored
Normal file
2161
deps/oblib/src/lib/restore/ob_storage_cos.cpp
vendored
Normal file
File diff suppressed because it is too large
Load Diff
534
deps/oblib/src/lib/restore/ob_storage_cos.h
vendored
Normal file
534
deps/oblib/src/lib/restore/ob_storage_cos.h
vendored
Normal file
@ -0,0 +1,534 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef SRC_LIBRARY_SRC_LIB_RESTORE_OB_STORAGE_COS_H_
|
||||
#define SRC_LIBRARY_SRC_LIB_RESTORE_OB_STORAGE_COS_H_
|
||||
|
||||
#include "lib/allocator/page_arena.h"
|
||||
#include "lib/container/ob_array.h"
|
||||
#include "ob_i_storage.h"
|
||||
#include "cos/ob_storage_cos_obj.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace common {
|
||||
|
||||
// Before using cos, you need to initialize cos enviroment.
|
||||
// Thread safe guaranteed by user.
|
||||
int init_cos_env();
|
||||
|
||||
// You need to clean cos resource when not use cos any more.
|
||||
// Thread safe guaranteed by user.
|
||||
void fin_cos_env();
|
||||
|
||||
class ObCosUtil : public ObIStorageUtil {
|
||||
public:
|
||||
ObCosUtil()
|
||||
{}
|
||||
virtual ~ObCosUtil()
|
||||
{}
|
||||
|
||||
int is_exist(const ObString &uri, const ObString &storage_info, bool &exist) override;
|
||||
|
||||
int get_file_length(const ObString &uri, const ObString &storage_info, int64_t &file_length) override;
|
||||
|
||||
int write_single_file(
|
||||
const ObString &uri, const ObString &storage_info, const char *buf, const int64_t size) override;
|
||||
|
||||
// cos no dir
|
||||
int mkdir(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int del_file(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int update_file_modify_time(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int list_files(const ObString &dir_path, const ObString &storage_info, common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObString> &file_names) override;
|
||||
|
||||
int del_dir(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int get_pkeys_from_dir(
|
||||
const ObString &dir_path, const ObString &storage_info, common::ObIArray<common::ObPartitionKey> &pkeys) override;
|
||||
|
||||
int delete_tmp_files(const ObString &dir_path, const ObString &storage_info) override;
|
||||
|
||||
int is_empty_directory(
|
||||
const common::ObString &uri, const common::ObString &storage_info, bool &is_empty_directory) override;
|
||||
int is_tagging(const common::ObString &uri, const common::ObString &storage_info, bool &is_tagging) override;
|
||||
int get_file_meta(const ObString &uri, const ObString &storage_info, bool &exist, qcloud_cos::CosObjectMeta &meta);
|
||||
|
||||
int check_backup_dest_lifecycle(
|
||||
const common::ObString &dir_path, const common::ObString &storage_info, bool &is_set_lifecycle) override;
|
||||
|
||||
int list_directories(const ObString &dir_path, const ObString &storage_info, common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObString> &directory_names) override;
|
||||
};
|
||||
|
||||
// Random accesss object
|
||||
class ObCosRandomAccessObject : public ObIStorageReader {
|
||||
public:
|
||||
ObCosRandomAccessObject();
|
||||
|
||||
virtual ~ObCosRandomAccessObject()
|
||||
{}
|
||||
|
||||
int open(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int pread(char *buf, const int64_t buf_size, int64_t offset, int64_t &read_size) override;
|
||||
|
||||
int close() override;
|
||||
|
||||
int64_t get_length() const override
|
||||
{
|
||||
return meta_.file_length;
|
||||
}
|
||||
|
||||
bool is_opened() const override
|
||||
{
|
||||
return is_opened_;
|
||||
}
|
||||
|
||||
private:
|
||||
// basic cos object
|
||||
ObCosObject obj_;
|
||||
qcloud_cos::CosObjectMeta meta_;
|
||||
bool is_opened_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosRandomAccessObject);
|
||||
};
|
||||
|
||||
// Over write object
|
||||
class ObCosOverWriteObject : public ObIStorageWriter {
|
||||
public:
|
||||
ObCosOverWriteObject();
|
||||
virtual ~ObCosOverWriteObject()
|
||||
{}
|
||||
|
||||
int open(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
// append write, make sure no concurrent writer.
|
||||
int write(const char *buf, const int64_t size) override;
|
||||
|
||||
int pwrite(const char *buf, const int64_t size, const int64_t offset) override;
|
||||
|
||||
int close() override;
|
||||
|
||||
// Return the amount of written data after open.
|
||||
// The returned value is undefined if have not opened before.
|
||||
int64_t get_length() const override
|
||||
{
|
||||
return file_length_;
|
||||
}
|
||||
|
||||
bool is_opened() const override
|
||||
{
|
||||
return is_opened_;
|
||||
}
|
||||
|
||||
// The returned value is undefined if have not opened before.
|
||||
int64_t get_object_size() const
|
||||
{
|
||||
return object_size_;
|
||||
}
|
||||
|
||||
private:
|
||||
// get the whole object from cos.
|
||||
int get_(char *buf, const int64_t buf_size);
|
||||
|
||||
inline const ObString &bucket_name_string_() const
|
||||
{
|
||||
return obj_.bucket_name_string();
|
||||
}
|
||||
|
||||
inline const ObString &object_name_string_() const
|
||||
{
|
||||
return obj_.object_name_string();
|
||||
}
|
||||
|
||||
private:
|
||||
// basic cos object
|
||||
ObCosObject obj_;
|
||||
// TODO: to rename to data_written_amount
|
||||
// The current value represents the amount of data written between open and close.
|
||||
int64_t file_length_;
|
||||
// The total object size in cos.
|
||||
int64_t object_size_;
|
||||
bool is_opened_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosOverWriteObject);
|
||||
};
|
||||
|
||||
class ObCosMetaMgr : public ObIStorageMetaWrapper {
|
||||
public:
|
||||
ObCosMetaMgr()
|
||||
{}
|
||||
virtual ~ObCosMetaMgr()
|
||||
{}
|
||||
|
||||
int get(const common::ObString &uri, const common::ObString &storage_info, char *buf, const int64_t buf_size,
|
||||
int64_t &read_size);
|
||||
|
||||
int set(const common::ObString &uri, const common::ObString &storage_info, const char *buf, const int64_t buf_size);
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosMetaMgr);
|
||||
};
|
||||
|
||||
// ObCosSlice
|
||||
class ObCosSlice {
|
||||
public:
|
||||
enum Mask {
|
||||
OB_COS_SLICE_INVALID_MASK = 0,
|
||||
OB_COS_SLICE_ID_MASK = 0x01,
|
||||
OB_COS_MULTI_VERSION_MASK = 0x02, // multi-version slice
|
||||
};
|
||||
|
||||
struct Option {
|
||||
int64_t mask;
|
||||
int64_t sliceid;
|
||||
int64_t version;
|
||||
|
||||
inline bool is_multi_version_slice() const
|
||||
{
|
||||
return 0 != (mask & Mask::OB_COS_MULTI_VERSION_MASK);
|
||||
}
|
||||
|
||||
TO_STRING_KV(K(mask), K(sliceid), K(version));
|
||||
};
|
||||
|
||||
// slice id marker in slice name.
|
||||
static constexpr const char *COS_SLICE_MARKER = "@part@_";
|
||||
// version marker in slice name.
|
||||
static constexpr const char *COS_VERSION_MAKER = "@version@_";
|
||||
// Concatenation to connect the slice id part and version part to a slice name.
|
||||
static constexpr const char *COS_SLICE_CONCATENATION = "-";
|
||||
|
||||
ObCosSlice(const Option &option);
|
||||
|
||||
~ObCosSlice()
|
||||
{}
|
||||
|
||||
// A valid slice name must include slice id, version is optional.
|
||||
static bool is_a_valid_slice_name(const ObString &path, bool &only);
|
||||
|
||||
// Parse slice id and version from slice name.
|
||||
static int parse_slice_name(const ObString &path, ObCosSlice::Option &option);
|
||||
|
||||
static int get_container_name(const ObString &slice_name, ObString &container_name, common::ObIAllocator &allocator);
|
||||
|
||||
inline bool is_valid() const
|
||||
{
|
||||
return 0 != (option_.mask & Mask::OB_COS_SLICE_ID_MASK) && 0 < option_.sliceid;
|
||||
}
|
||||
|
||||
inline bool is_multi_version_slice() const
|
||||
{
|
||||
return option_.is_multi_version_slice();
|
||||
}
|
||||
|
||||
int build_slice_name(char *buffer, int64_t buff_size);
|
||||
|
||||
TO_STRING_KV(K_(option));
|
||||
|
||||
private:
|
||||
Option option_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosSlice);
|
||||
};
|
||||
|
||||
// ObCosContainer
|
||||
class ObCosContainer : public ObCosBase {
|
||||
public:
|
||||
struct Option {
|
||||
int64_t version;
|
||||
bool open_version = false;
|
||||
|
||||
// threshold used to switch to next slice.
|
||||
int64_t threshold = 0; // default 0
|
||||
|
||||
TO_STRING_KV(K(version), K(open_version), K(threshold));
|
||||
};
|
||||
|
||||
ObCosContainer() : ObCosBase()
|
||||
{}
|
||||
|
||||
~ObCosContainer()
|
||||
{}
|
||||
|
||||
static inline int64_t get_start_slice_id()
|
||||
{
|
||||
// slice id start from 1.
|
||||
return 1LL;
|
||||
}
|
||||
|
||||
static inline int64_t generate_next_slice_id(int64_t current_slice_id)
|
||||
{
|
||||
return current_slice_id + 1;
|
||||
}
|
||||
|
||||
int build_bucket_and_container_name(const ObString &uri)
|
||||
{
|
||||
return build_bucket_and_object_name(uri);
|
||||
}
|
||||
|
||||
const ObString &container_name_string() const
|
||||
{
|
||||
return object_name_string();
|
||||
}
|
||||
|
||||
const qcloud_cos::CosStringBuffer &container_name() const
|
||||
{
|
||||
return object_name();
|
||||
}
|
||||
|
||||
using ObCosBase::head_meta;
|
||||
|
||||
// Query container meta, returned the slices and each slice length.
|
||||
int head_meta(bool &is_exist, qcloud_cos::CosObjectMeta &meta, common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObString> &slice_names, common::ObIArray<int64_t> &slice_lengths);
|
||||
|
||||
int head_meta(bool &is_exist, qcloud_cos::CosObjectMeta &meta) override;
|
||||
|
||||
using ObCosBase::del;
|
||||
|
||||
// delete all slices in container
|
||||
int del(int64_t &deleted_cnt) override;
|
||||
|
||||
// list all slices in container
|
||||
int list_slices(bool &is_exist, common::ObIAllocator &allocator, common::ObIArray<ObString> &slice_names,
|
||||
common::ObIArray<int64_t> &slice_lengths);
|
||||
|
||||
// return max slice id in container
|
||||
int find_max_slice_option(const int64_t version, bool &is_exist, int64_t &container_size,
|
||||
ObCosSlice::Option &max_slice_option, int64_t &last_slice_size);
|
||||
|
||||
// let slice number in container not too much, limit 800
|
||||
static const int64_t MAX_SLICE_ID = 800;
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosContainer);
|
||||
};
|
||||
|
||||
class ObCosWritableContainer : public ObIStorageWriter {
|
||||
public:
|
||||
ObCosWritableContainer();
|
||||
|
||||
ObCosWritableContainer(const ObCosContainer::Option &option);
|
||||
|
||||
virtual ~ObCosWritableContainer()
|
||||
{}
|
||||
|
||||
void set_option(const ObCosContainer::Option &option)
|
||||
{
|
||||
option_ = option;
|
||||
}
|
||||
|
||||
int open(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
// append write
|
||||
int write(const char *buf, const int64_t size) override;
|
||||
|
||||
int pwrite(const char *buf, const int64_t size, const int64_t offset) override;
|
||||
|
||||
int close() override;
|
||||
|
||||
int64_t get_length() const override
|
||||
{
|
||||
return length_;
|
||||
}
|
||||
|
||||
bool is_opened() const override
|
||||
{
|
||||
return is_opened_;
|
||||
}
|
||||
|
||||
private:
|
||||
int64_t get_slice_mask() const
|
||||
{
|
||||
int64_t mask = ObCosSlice::Mask::OB_COS_SLICE_INVALID_MASK;
|
||||
mask |= ObCosSlice::Mask::OB_COS_SLICE_ID_MASK;
|
||||
if (option_.open_version) {
|
||||
mask |= ObCosSlice::Mask::OB_COS_MULTI_VERSION_MASK;
|
||||
}
|
||||
|
||||
return mask;
|
||||
}
|
||||
|
||||
int open_current_slice();
|
||||
|
||||
int build_current_slice_name(char *slice_name_buff, int32_t buff_size);
|
||||
|
||||
int switch_to_next_slice(const int64_t size);
|
||||
|
||||
bool need_switch_next_slice(const int64_t size) const;
|
||||
|
||||
const ObString &container_name_string() const
|
||||
{
|
||||
return container_.container_name_string();
|
||||
}
|
||||
|
||||
const ObString &bucket_name_string() const
|
||||
{
|
||||
return container_.bucket_name_string();
|
||||
}
|
||||
|
||||
private:
|
||||
ObCosContainer::Option option_;
|
||||
bool is_opened_;
|
||||
int64_t length_;
|
||||
int64_t container_size_;
|
||||
int64_t last_slice_size_;
|
||||
ObCosContainer container_;
|
||||
ObCosOverWriteObject overwrite_obj_;
|
||||
int64_t current_slice_id_;
|
||||
// record the uri and storage info when open object.
|
||||
ObString uri_;
|
||||
ObString storage_info_;
|
||||
ObArenaAllocator allocator_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosWritableContainer);
|
||||
};
|
||||
|
||||
// ObCosRandomAccessContainer
|
||||
class ObCosRandomAccessContainer : public ObIStorageReader {
|
||||
public:
|
||||
ObCosRandomAccessContainer();
|
||||
|
||||
virtual ~ObCosRandomAccessContainer()
|
||||
{}
|
||||
|
||||
int open(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int pread(char *buf, const int64_t buf_size, int64_t offset, int64_t &read_size) override;
|
||||
|
||||
int close() override;
|
||||
|
||||
int64_t get_length() const override
|
||||
{
|
||||
return meta_.file_length;
|
||||
}
|
||||
|
||||
bool is_opened() const override
|
||||
{
|
||||
return is_opened_;
|
||||
}
|
||||
|
||||
private:
|
||||
int build_slice_name(int64_t slice_idx, char *slice_name_buff, int32_t buff_size);
|
||||
|
||||
int pread_from_slice(int64_t slice_idx, char *buf, const int64_t buf_size, int64_t offset, int64_t &read_size);
|
||||
|
||||
private:
|
||||
ObCosContainer container_;
|
||||
bool is_opened_;
|
||||
qcloud_cos::CosObjectMeta meta_;
|
||||
ObArenaAllocator allocator_;
|
||||
ObArray<ObString> slice_names_array_;
|
||||
ObArray<int64_t> slice_lengths_array_;
|
||||
// record the uri and storage info when open object.
|
||||
ObString uri_;
|
||||
ObString storage_info_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosRandomAccessContainer);
|
||||
};
|
||||
|
||||
// For a uri is either an object or container, however we do not it.
|
||||
// ObCosRandomAccessReader is actually acted as an adapter. It will
|
||||
// check which type the uri is, and then decides the corresponding reader.
|
||||
class ObCosRandomAccessReader : public ObIStorageReader {
|
||||
public:
|
||||
ObCosRandomAccessReader();
|
||||
|
||||
virtual ~ObCosRandomAccessReader()
|
||||
{}
|
||||
|
||||
int open(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
int pread(char *buf, const int64_t buf_size, int64_t offset, int64_t &read_size) override;
|
||||
|
||||
int close() override;
|
||||
|
||||
int64_t get_length() const override
|
||||
{
|
||||
int64_t length = -1;
|
||||
if (NULL != reader_) {
|
||||
length = reader_->get_length();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
bool is_opened() const override
|
||||
{
|
||||
return NULL != reader_;
|
||||
}
|
||||
|
||||
private:
|
||||
ObCosRandomAccessContainer random_access_container_;
|
||||
ObCosRandomAccessObject random_access_object_;
|
||||
ObIStorageReader *reader_;
|
||||
};
|
||||
|
||||
// Over write object
|
||||
class ObCosAppender : public ObIStorageWriter {
|
||||
public:
|
||||
ObCosAppender();
|
||||
virtual ~ObCosAppender()
|
||||
{}
|
||||
|
||||
int open(const ObString &uri, const ObString &storage_info) override;
|
||||
|
||||
// append write, make sure no concurrent writer.
|
||||
int write(const char *buf, const int64_t size) override;
|
||||
|
||||
int pwrite(const char *buf, const int64_t size, const int64_t offset) override;
|
||||
|
||||
int close() override;
|
||||
|
||||
// Return the amount of written data after open.
|
||||
// The returned value is undefined if have not opened before.
|
||||
int64_t get_length() const override
|
||||
{
|
||||
int64_t length = -1;
|
||||
if (NULL != writer_) {
|
||||
length = writer_->get_length();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
bool is_opened() const override
|
||||
{
|
||||
return NULL != writer_;
|
||||
}
|
||||
|
||||
void set_obj_type(ObCosObjectType type)
|
||||
{
|
||||
obj_type_ = type;
|
||||
}
|
||||
|
||||
void set_container_option(ObCosContainer::Option option)
|
||||
{
|
||||
writable_container_.set_option(option);
|
||||
}
|
||||
|
||||
private:
|
||||
ObCosWritableContainer writable_container_;
|
||||
ObCosOverWriteObject overwrite_obj_;
|
||||
ObIStorageWriter *writer_;
|
||||
|
||||
ObCosObjectType obj_type_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObCosAppender);
|
||||
};
|
||||
|
||||
} // namespace common
|
||||
} // namespace oceanbase
|
||||
|
||||
#endif
|
||||
14
deps/oblib/src/lib/restore/ob_storage_file.cpp
vendored
14
deps/oblib/src/lib/restore/ob_storage_file.cpp
vendored
@ -128,6 +128,20 @@ int ObStorageFileUtil::is_exist(const common::ObString& uri, const common::ObStr
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageFileUtil::is_tagging(const common::ObString &uri, const common::ObString &storage_info, bool &is_tagging)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_tagging = false;
|
||||
UNUSED(storage_info);
|
||||
if (uri.empty()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid args", K(ret), K(uri));
|
||||
} else {
|
||||
is_tagging = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageFileUtil::get_file_length(
|
||||
const common::ObString& uri, const common::ObString& storage_info, int64_t& file_length)
|
||||
{
|
||||
|
||||
1
deps/oblib/src/lib/restore/ob_storage_file.h
vendored
1
deps/oblib/src/lib/restore/ob_storage_file.h
vendored
@ -36,6 +36,7 @@ public:
|
||||
virtual int delete_tmp_files(const common::ObString& dir_path, const common::ObString& storage_info);
|
||||
virtual int is_empty_directory(
|
||||
const common::ObString& uri, const common::ObString& storage_info, bool& is_empty_directory);
|
||||
virtual int is_tagging(const common::ObString& uri, const common::ObString& storage_info, bool& is_tagging);
|
||||
virtual int check_backup_dest_lifecycle(
|
||||
const common::ObString& dir_path, const common::ObString& storage_info, bool& is_set_lifecycle);
|
||||
virtual int list_directories(const common::ObString& uri, const common::ObString& storage_info,
|
||||
|
||||
@ -22,7 +22,6 @@
|
||||
|
||||
namespace oceanbase {
|
||||
using namespace common;
|
||||
|
||||
namespace common {
|
||||
|
||||
ObStorageOssStaticVar::ObStorageOssStaticVar() : compressor_(NULL), compress_type_(INVALID_COMPRESSOR)
|
||||
@ -1311,6 +1310,57 @@ int ObStorageOssUtil::tagging_object(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageOssUtil::is_tagging(const common::ObString &uri, const common::ObString &storage_info, bool &is_tagging)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t OB_MAX_TAGGING_STR_LENGTH = 16;
|
||||
common::ObArenaAllocator allocator;
|
||||
ObString bucket_str;
|
||||
ObString object_str;
|
||||
if (uri.empty()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
OB_LOG(WARN, "name is empty", K(ret));
|
||||
} else if (OB_FAIL(init(storage_info))) {
|
||||
OB_LOG(WARN, "failed to init storage_info", K(ret), K(storage_info), K(uri));
|
||||
} else if (OB_FAIL(get_bucket_object_name(uri, bucket_str, object_str, allocator))) {
|
||||
OB_LOG(WARN, "bucket or object name is empty", K(ret), K(uri), K(bucket_str), K(object_str));
|
||||
} else {
|
||||
aos_string_t bucket;
|
||||
aos_string_t object;
|
||||
aos_table_t *head_resp_headers = NULL;
|
||||
aos_list_t tag_list;
|
||||
oss_tag_content_t *b;
|
||||
aos_status_t *aos_ret = NULL;
|
||||
aos_str_set(&bucket, bucket_str.ptr());
|
||||
aos_str_set(&object, object_str.ptr());
|
||||
aos_list_init(&tag_list);
|
||||
if (OB_ISNULL(aos_ret = oss_get_object_tagging(oss_option_, &bucket, &object, &tag_list, &head_resp_headers)) ||
|
||||
!aos_status_is_ok(aos_ret)) {
|
||||
ret = OB_OSS_ERROR;
|
||||
OB_LOG(WARN, "get object tag fail", K(ret), K(uri));
|
||||
print_oss_info(aos_ret);
|
||||
} else {
|
||||
aos_list_for_each_entry(oss_tag_content_t, b, &tag_list, node)
|
||||
{
|
||||
char key_str[OB_MAX_TAGGING_STR_LENGTH];
|
||||
char value_str[OB_MAX_TAGGING_STR_LENGTH];
|
||||
if (OB_FAIL(databuff_printf(key_str, OB_MAX_TAGGING_STR_LENGTH, "%.*s", b->key.len, b->key.data))) {
|
||||
OB_LOG(WARN, "failed to databuff printf key str", K(ret));
|
||||
} else if (OB_FAIL(databuff_printf(value_str, OB_MAX_TAGGING_STR_LENGTH, "%.*s", b->key.len, b->value.data))) {
|
||||
OB_LOG(WARN, "failed to databuff printf value str", K(ret));
|
||||
} else if (0 == strcmp("delete_mode", key_str) && 0 == strcmp("tagging", value_str)) {
|
||||
is_tagging = true;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObStorageOssUtil::del_file(const common::ObString& uri, const common::ObString& storage_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1728,8 +1778,9 @@ int ObStorageOssUtil::is_empty_directory(
|
||||
}
|
||||
}
|
||||
|
||||
// 最后重置,保证该接口可重复调用
|
||||
|
||||
OB_LOG(DEBUG, "is empty directory", K(dir_path));
|
||||
reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1994,7 +2045,6 @@ int ObStorageOssAppendWriter::close()
|
||||
OB_LOG(WARN, "oss writer cannot close before it is opened");
|
||||
} else {
|
||||
is_opened_ = false;
|
||||
// 释放内存
|
||||
allocator_.clear();
|
||||
|
||||
reset();
|
||||
|
||||
@ -205,6 +205,7 @@ public:
|
||||
virtual int delete_tmp_files(const common::ObString& dir_path, const common::ObString& storage_info);
|
||||
virtual int is_empty_directory(
|
||||
const common::ObString& uri, const common::ObString& storage_info, bool& is_empty_directory);
|
||||
virtual int is_tagging(const common::ObString& uri, const common::ObString& storage_info, bool& is_tagging);
|
||||
virtual int check_backup_dest_lifecycle(
|
||||
const common::ObString& path, const common::ObString& storage_info, bool& is_set_lifecycle);
|
||||
virtual int list_directories(const common::ObString& uri, const common::ObString& storage_info,
|
||||
|
||||
@ -528,7 +528,8 @@ ObAdminDumpBackupDataExecutor::ObAdminDumpBackupDataExecutor()
|
||||
is_quiet_(false),
|
||||
offset_(0),
|
||||
data_length_(0),
|
||||
check_exist_(false)
|
||||
check_exist_(false),
|
||||
check_tagging_(false)
|
||||
{
|
||||
MEMSET(data_path_, 0, common::OB_MAX_URI_LENGTH);
|
||||
MEMSET(storage_info_, 0, OB_MAX_BACKUP_STORAGE_INFO_LENGTH);
|
||||
@ -637,7 +638,7 @@ int ObAdminDumpBackupDataExecutor::parse_cmd(int argc, char *argv[])
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
int opt = 0;
|
||||
const char* opt_string = "hd:s:o:l:f:qc";
|
||||
const char* opt_string = "hd:s:o:l:f:qct";
|
||||
|
||||
struct option longopts[] = {
|
||||
// commands
|
||||
@ -649,7 +650,8 @@ int ObAdminDumpBackupDataExecutor::parse_cmd(int argc, char *argv[])
|
||||
{ "data_length", 1, NULL, 'l'},
|
||||
{ "file_uri", 1, NULL , 'f'},
|
||||
{ "quiet", 0, NULL, 'q' },
|
||||
{ "check_exist", 0, NULL, 'c'}
|
||||
{ "check_exist", 0, NULL, 'c'},
|
||||
{ "check_tagging", 0, NULL, 't'},
|
||||
};
|
||||
|
||||
int index = -1;
|
||||
@ -683,6 +685,10 @@ int ObAdminDumpBackupDataExecutor::parse_cmd(int argc, char *argv[])
|
||||
check_exist_ = true;
|
||||
break;
|
||||
}
|
||||
case 't': {
|
||||
check_tagging_ = true;
|
||||
break;
|
||||
}
|
||||
case 'f': {
|
||||
char current_absolute_path[MAX_PATH_SIZE] = "";
|
||||
if (optarg[0] != '/') {
|
||||
@ -697,7 +703,6 @@ int ObAdminDumpBackupDataExecutor::parse_cmd(int argc, char *argv[])
|
||||
current_absolute_path[length]= '/';
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(databuff_printf(data_path_, sizeof(data_path_), "%s%s%s", OB_FILE_PREFIX, current_absolute_path, optarg))) {
|
||||
STORAGE_LOG(WARN, "failed to printf file uri", K(ret), K(optarg));
|
||||
@ -1643,6 +1648,7 @@ void ObAdminDumpBackupDataExecutor::print_usage()
|
||||
#endif
|
||||
printf(HELP_FMT, "-q,--quiet", "log level: ERROR");
|
||||
printf(HELP_FMT, "-c,--check_exist", "check file is exist or not");
|
||||
printf(HELP_FMT, "-t,--check_tagging", "check file is tagging or not");
|
||||
printf("samples:\n");
|
||||
printf(" dump meta: \n");
|
||||
printf("\tob_admin dump_backup -dfile:///home/admin/backup_info \n");
|
||||
@ -1683,9 +1689,10 @@ int ObAdminDumpBackupDataExecutor::check_exist(const char *data_path, const char
|
||||
bool exist = false;
|
||||
const char *uri_str = "uri";
|
||||
const char *is_exist_str = "is_exist";
|
||||
const char *is_tagging_str = "is_tagging";
|
||||
size_t data_path_len = strlen(data_path);
|
||||
char dir_data_path[common::OB_MAX_URI_LENGTH + 1] = {0};
|
||||
|
||||
bool tagging = false;
|
||||
if (data_path_len <= 0 || data_path[data_path_len - 1] == '/') {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(ERROR, "uri format not right", K(ret));
|
||||
@ -1702,6 +1709,15 @@ int ObAdminDumpBackupDataExecutor::check_exist(const char *data_path, const char
|
||||
if (exist) {
|
||||
PrintHelper::print_dump_line(uri_str, data_path);
|
||||
PrintHelper::print_dump_line(is_exist_str, "true");
|
||||
if (check_tagging_) {
|
||||
if (OB_FAIL(util.is_tagging(data_path, storage_info, tagging))) {
|
||||
STORAGE_LOG(WARN, "failed to check is_tagging", K(ret), K(data_path));
|
||||
} else if (tagging) {
|
||||
PrintHelper::print_dump_line(is_tagging_str, "true");
|
||||
} else {
|
||||
PrintHelper::print_dump_line(is_tagging_str, "false");
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(databuff_printf(dir_data_path, sizeof(dir_data_path), "%s/", data_path))) {
|
||||
STORAGE_LOG(WARN, "failed to printf file uri", K(ret), K(optarg));
|
||||
} else {
|
||||
@ -1716,9 +1732,12 @@ int ObAdminDumpBackupDataExecutor::check_exist(const char *data_path, const char
|
||||
PrintHelper::print_dump_line(uri_str, data_path);
|
||||
PrintHelper::print_dump_line(is_exist_str, "true");
|
||||
}
|
||||
PrintHelper::print_end_line();
|
||||
if (check_tagging_) {
|
||||
PrintHelper::print_dump_line(is_tagging_str, "false");
|
||||
}
|
||||
}
|
||||
}
|
||||
PrintHelper::print_end_line();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -179,6 +179,7 @@ private:
|
||||
int64_t offset_;
|
||||
int64_t data_length_;
|
||||
bool check_exist_;
|
||||
bool check_tagging_;
|
||||
};
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user