[FEAT MERGE]Added object storage COS as the backup destination

This commit is contained in:
WenJinyu
2023-08-21 07:10:29 +00:00
committed by ob-robot
parent 26d0ec3d60
commit 4144cf1751
45 changed files with 5000 additions and 687 deletions

View File

@ -19,7 +19,8 @@ namespace common
const char *OB_STORAGE_ACCESS_TYPES_STR[] = {"reader", "overwriter", "appender", "random_write"};
ObObjectDevice::ObObjectDevice() : oss_account_(), base_info_(NULL), is_started_(false), lock_(common::ObLatchIds::OBJECT_DEVICE_LOCK)
ObObjectDevice::ObObjectDevice()
: storage_info_(), is_started_(false), lock_(common::ObLatchIds::OBJECT_DEVICE_LOCK)
{
auto attr = SET_USE_500("ObjectDevice");
reader_ctx_pool_.set_attr(attr);
@ -51,7 +52,7 @@ void ObObjectDevice::destroy()
ObObjectDevice::~ObObjectDevice()
{
destroy();
OB_LOG(INFO, "destory the device!", KCSTRING(storage_info_));
OB_LOG(INFO, "destory the device!", KCSTRING(storage_info_str_));
}
/*the app logical use call ObBackupIoAdapter::get_and_init_device*/
@ -70,29 +71,19 @@ int ObObjectDevice::start(const ObIODOpts &opts)
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "fail to start device, args wrong !", KCSTRING(opts.opts_[0].key_), K(ret));
} else {
ObString storage_info(opts.opts_[0].value_.value_str);
if (OB_STORAGE_OSS == device_type_) {
if (OB_FAIL(oss_account_.parse_oss_arg(storage_info))) {
OB_LOG(WARN, "fail to init oss base", K(storage_info) ,K(ret));
}
base_info_ = (void*)&oss_account_;
} else if (OB_STORAGE_FILE == device_type_) {
//do nothing
base_info_ = NULL;
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "Invalid object device type!", K(ret), K(device_type_), KCSTRING(opts.opts_[0].value_.value_str));
if (OB_FAIL(storage_info_.set(device_type_, opts.opts_[0].value_.value_str))) {
OB_LOG(WARN, "failed to build storage_info");
}
if (OB_SUCCESS != ret) {
//mem resource will be free with device destroy
} else if (OB_FAIL(util_.open(base_info_, device_type_))) {
} else if (OB_FAIL(util_.open(&storage_info_))) {
OB_LOG(WARN, "fail to open the util!", K(ret), KCSTRING(opts.opts_[0].value_.value_str));
} else if (OB_FAIL(fd_mng_.init())) {
OB_LOG(WARN, "fail to init fd manager!", K(ret));
} else {
is_started_ = true;
if (OB_FAIL(databuff_printf(storage_info_, OB_MAX_URI_LENGTH, "%s", opts.opts_[0].value_.value_str))) {
if (OB_FAIL(databuff_printf(storage_info_str_, OB_MAX_URI_LENGTH, "%s", opts.opts_[0].value_.value_str))) {
OB_LOG(WARN, "fail to copy str to storage info", K(ret));
}
}
@ -161,7 +152,7 @@ int ObObjectDevice::open_for_reader(const char *pathname, void*& ctx)
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc mem for object device reader! ", K(ret));
} else {
if (OB_FAIL(reader->open(pathname, base_info_))) {
if (OB_FAIL(reader->open(pathname, &storage_info_))) {
OB_LOG(WARN, "fail to open for read!", K(ret));
} else {
ctx = (void*)reader;
@ -184,7 +175,7 @@ int ObObjectDevice::open_for_overwriter(const char *pathname, void*& ctx)
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to alloc mem for object device reader! ", K(ret));
} else {
if (OB_FAIL(overwriter->open(pathname, base_info_))) {
if (OB_FAIL(overwriter->open(pathname, &storage_info_))) {
OB_LOG(WARN, "fail to open for overwrite!", K(ret));
} else {
ctx = (void*)overwriter;
@ -202,16 +193,10 @@ int ObObjectDevice::open_for_appender(const char *pathname, ObIODOpts *opts, voi
int ret = OB_SUCCESS;
ObStorageAppender *appender = NULL;
ObOptValue opt_value;
const char* append_strategy = NULL;
int64_t append_version= -1;
const char* open_mode = NULL;
get_opt_value(opts, "AppendStrategy", append_strategy);
get_opt_value(opts, "AppendVersion", append_version);
get_opt_value(opts, "OpenMode", open_mode);
ObStorageAppender::AppenderParam param;
StorageOpenMode mode = StorageOpenMode::CREATE_OPEN_LOCK;
ObAppendStrategy strategy = ObAppendStrategy::OB_APPEND_USE_OVERRITE;
appender = appender_ctx_pool_.alloc();
if (OB_ISNULL(appender)) {
@ -232,29 +217,9 @@ int ObObjectDevice::open_for_appender(const char *pathname, ObIODOpts *opts, voi
OB_LOG(WARN, "Invalid open mode!", KCSTRING(open_mode), K(ret));
}
if (NULL == append_strategy || 0 == STRCMP(append_strategy, "OB_APPEND_USE_OVERRITE")) {
//just keep the default value
} else if (0 == STRCMP(append_strategy, "OB_APPEND_USE_SLICE_PUT")) {
strategy = ObAppendStrategy::OB_APPEND_USE_SLICE_PUT;
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "Invalid append strategy!", KCSTRING(append_strategy), K(ret));
}
param.strategy_ = strategy;
param.version_param_.version_ = append_version;
if (-1 != append_version) {
param.version_param_.open_object_version_ = true;
if (param.version_param_.version_ <= 0) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "Invalid strategy version!", K(param.version_param_.version_), K(ret));
}
} else {
param.version_param_.open_object_version_ = false;
}
if (OB_SUCCESS == ret) {
appender->set_open_mode(mode);
if (OB_FAIL(appender->open(pathname, base_info_, param))){
if (OB_FAIL(appender->open(pathname, &storage_info_))){
OB_LOG(WARN, "fail to open the appender!", K(ret));
} else {
ctx = appender;
@ -558,43 +523,11 @@ int ObObjectDevice::pwrite(const ObIOFd &fd, const int64_t offset, const int64_t
return ret;
}
int ObObjectDevice::ob_get_bucket_lifecycle(ObIODOpts &opts)
{
int ret = OB_SUCCESS;
const char* path = NULL;
bool is_set_lifecycle = false;
if (opts.opt_cnt_ != 2) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "ObGetBucketLifecycle need 3 element at least!", K(opts.opt_cnt_), K(ret));
} else if (0 != STRCMP(opts.opts_[1].key_, "path") || NULL == opts.opts_[1].value_.value_str) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "ObGetBucketLifecycle, the first para is wrong!", KCSTRING(opts.opts_[1].key_), K(ret));
} else {
path = opts.opts_[1].value_.value_str;
if (OB_FAIL(util_.check_backup_dest_lifecycle(ObString(path), is_set_lifecycle))) {
OB_LOG(WARN, "ObGetBucketLifecycle, the second para is wrong!", K(ret));
} else {
opts.opts_[0].set("get_bucket_lifecycle", is_set_lifecycle);
}
}
return ret;
}
//the first opt is config name & the return value
int ObObjectDevice::get_config(ObIODOpts &opts)
{
int ret = OB_SUCCESS;
if (opts.opt_cnt_ < 1) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "opts need 1 element at least!", K(opts.opt_cnt_), K(ret));
} else if (0 == STRCMP(opts.opts_[0].key_, "get_bucket_lifecycle")) {
if (OB_FAIL(ob_get_bucket_lifecycle(opts))) {
OB_LOG(WARN, "fail to get bucket life cycle!", K(ret));
}
} else {
ret = OB_NOT_SUPPORTED;
OB_LOG(WARN, "the para is wrong, this interface only support get_bucket_lifecycle now!", K(device_type_), K(ret));
}
UNUSED(opts);
return ret;
}