diff --git a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.cpp b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.cpp index 13706e8802..ca974d672c 100644 --- a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.cpp +++ b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.cpp @@ -102,21 +102,38 @@ void ObCosWrapperHandle::reset() delete_mode_ = ObIStorageUtil::DELETE; } +int create_cos_handle( + ObCosMemAllocator &allocator, + const qcloud_cos::ObCosAccount &cos_account, + const bool check_md5, + qcloud_cos::ObCosWrapper::Handle *&handle) +{ + int ret = OB_SUCCESS; + handle = nullptr; + qcloud_cos::OB_COS_customMem cos_mem = {ob_cos_malloc, ob_cos_free, &allocator}; + if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(cos_mem, cos_account, + check_md5, &handle))) { + OB_LOG(WARN, "failed to create tmp cos handle", K(ret)); + } else if (OB_ISNULL(handle)) { + ret = OB_COS_ERROR; + OB_LOG(WARN, "create tmp handle succeed, but returned handle is null", K(ret)); + } + + return ret; +} + int ObCosWrapperHandle::create_cos_handle(const bool check_md5) { int ret = OB_SUCCESS; - qcloud_cos::OB_COS_customMem cos_mem = {ob_cos_malloc, ob_cos_free, &allocator_}; if (OB_NOT_NULL(handle_)) { ret = OB_ERR_UNEXPECTED; OB_LOG(WARN, "handle is not null", K(ret)); - } else if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(cos_mem, cos_account_, - check_md5, &handle_))) { + } else if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + OB_LOG(WARN, "handle is not inited", K(ret)); + } else if (OB_FAIL(common::create_cos_handle(allocator_, cos_account_, check_md5, handle_))) { OB_LOG(WARN, "failed to create cos handle", K(ret)); - } else if (OB_ISNULL(handle_)) { - ret = OB_COS_ERROR; - OB_LOG(WARN, "create handle succeed, but returned handle is null", K(ret)); } - return ret; } diff --git a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.h b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.h index 5b6b955f27..fc06cf3cbb 100644 --- a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.h +++ b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper_handle.h @@ -39,6 +39,14 @@ private: ObArenaAllocator allocator_; }; +// Create a temporary cos_handle object, +// utilizing the 'allocator' to allocate the necessary memory. +int create_cos_handle( + ObCosMemAllocator &allocator, + const qcloud_cos::ObCosAccount &cos_account, + const bool check_md5, + qcloud_cos::ObCosWrapper::Handle *&handle); + class ObCosWrapperHandle { public: @@ -52,8 +60,9 @@ public: qcloud_cos::ObCosWrapper::Handle *get_ptr() { return handle_; } int build_bucket_and_object_name(const ObString &uri); - const ObString& get_bucket_name() const { return bucket_name_; } - const ObString& get_object_name() const { return object_name_; } + const ObString &get_bucket_name() const { return bucket_name_; } + const ObString &get_object_name() const { return object_name_; } + const qcloud_cos::ObCosAccount &get_cos_account() const { return cos_account_; } bool is_valid() const { return is_inited_ && handle_ != nullptr; } bool is_inited() const { return is_inited_; } diff --git a/deps/oblib/src/lib/restore/ob_storage.cpp b/deps/oblib/src/lib/restore/ob_storage.cpp index 028165ac5a..bbf58d0551 100644 --- a/deps/oblib/src/lib/restore/ob_storage.cpp +++ b/deps/oblib/src/lib/restore/ob_storage.cpp @@ -95,6 +95,14 @@ const char *get_storage_type_str(const ObStorageType &type) return str; } +bool is_storage_type_match(const common::ObString &uri, const ObStorageType &type) +{ + return (OB_STORAGE_OSS == type && uri.prefix_match(OB_OSS_PREFIX)) + || (OB_STORAGE_COS == type && uri.prefix_match(OB_COS_PREFIX)) + || (OB_STORAGE_S3 == type && uri.prefix_match(OB_S3_PREFIX)) + || (OB_STORAGE_FILE == type && uri.prefix_match(OB_FILE_PREFIX)); +} + bool is_io_error(const int result) { return OB_IO_ERROR == result || OB_OSS_ERROR == result || OB_COS_ERROR == result || OB_S3_ERROR == result; @@ -397,6 +405,10 @@ int ObStorageUtil::detect_storage_obj_meta( } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->head_object_meta(uri, obj_meta))) { STORAGE_LOG(WARN, "fail to head object meta", K(ret), K(uri)); } else if (obj_meta.is_exist_) { @@ -537,8 +549,10 @@ int ObStorageUtil::is_exist(const common::ObString &uri, const bool is_adaptive, #endif if (OB_FAIL(ret)) { //do nothing - } else if (OB_FAIL(validate_uri_type(uri))) { - OB_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(detect_storage_obj_meta(uri, is_adaptive, false/*need_fragment_meta*/, obj_meta))) { OB_LOG(WARN, "fail to detect storage obj type", K(ret), K(uri), K(is_adaptive)); } else { @@ -560,8 +574,10 @@ int ObStorageUtil::get_file_length(const common::ObString &uri, const bool is_ad ret = OB_E(EventTable::EN_BACKUP_IO_GET_FILE_LENGTH) OB_SUCCESS; #endif if (OB_FAIL(ret)) { - } else if (OB_FAIL(validate_uri_type(uri))) { - OB_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(detect_storage_obj_meta(uri, is_adaptive, true/*need_fragment_meta*/, obj_meta))) { OB_LOG(WARN, "fail to detect storage obj type", K(ret), K(uri), K(is_adaptive)); } else if (!obj_meta.is_exist_) { @@ -594,8 +610,10 @@ int ObStorageUtil::del_file(const common::ObString &uri, const bool is_adaptive) ret = OB_E(EventTable::EN_BACKUP_IO_BEFORE_DEL_FILE) OB_SUCCESS; #endif if (OB_FAIL(ret)) { - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(detect_storage_obj_meta(uri, is_adaptive, false/*need_fragment_meta*/, obj_meta))) { OB_LOG(WARN, "fail to detect storage obj type", K(ret), K(uri), K(is_adaptive)); } else if (obj_meta.is_simulate_append_type()) { @@ -646,8 +664,10 @@ int ObStorageUtil::del_appendable_file(const ObString &uri) } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else { DelAppendableObjectFragmentOp op(uri, *this); if (OB_FAIL(list_files(uri, false/*is_adaptive*/, op))) { @@ -672,8 +692,10 @@ int ObStorageUtil::list_files( } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) { OB_LOG(WARN, "fail to make uri end with '/'", K(ret), K(uri)); } else if (is_adaptive && OB_FAIL(list_adaptive_files(uri_buf, op))) { @@ -958,8 +980,10 @@ int ObStorageUtil::list_directories( } else if (OB_UNLIKELY(!is_init())) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "util is not inited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) { OB_LOG(WARN, "fail to make uri end with '/'", K(ret), K(uri)); } else if (OB_FAIL(util_->list_directories(uri_buf, op))) { @@ -988,8 +1012,10 @@ int ObStorageUtil::is_exist(const common::ObString &uri, bool &exist) } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->is_exist(uri, exist))) { STORAGE_LOG(WARN, "failed to check is exist", K(ret), K(uri)); } @@ -1014,8 +1040,10 @@ int ObStorageUtil::get_file_length(const common::ObString &uri, int64_t &file_le } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->get_file_length(uri, file_length))) { if (OB_BACKUP_FILE_NOT_EXIST == ret) { STORAGE_LOG(INFO, "cannot get file length for not exist file", K(ret), K(uri)); @@ -1053,8 +1081,10 @@ int ObStorageUtil::get_adaptive_file_length(const common::ObString &uri, int64_t } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else { if (OB_FAIL(read_seal_meta_if_needed(uri, obj_meta))) { OB_LOG(WARN, "fail to read seal meta if needed", K(ret), K(uri)); @@ -1101,8 +1131,10 @@ int ObStorageUtil::del_file(const common::ObString &uri) } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else { const int max_try_cnt = 5; int try_cnt = 0; @@ -1153,8 +1185,10 @@ int ObStorageUtil::mkdir(const common::ObString &uri) } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->mkdir(uri))) { STORAGE_LOG(WARN, "failed to mkdir", K(ret), K(uri)); } @@ -1184,8 +1218,10 @@ int ObStorageUtil::list_files(const common::ObString &uri, common::ObBaseDirEntr } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) { OB_LOG(WARN, "fail to make dir path end with '/'", K(ret), K(uri)); } else if (OB_FAIL(util_->list_files(uri_buf, op))) { @@ -1217,8 +1253,10 @@ int ObStorageUtil::write_single_file(const common::ObString &uri, const char *bu } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->write_single_file(uri, buf, size))) { STORAGE_LOG(WARN, "failed to write single file", K(ret), K(uri)); } else { @@ -1251,8 +1289,10 @@ int ObStorageUtil::del_dir(const common::ObString &uri) } else if (OB_UNLIKELY(!is_init())) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "util is not inited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->del_dir(uri))) { STORAGE_LOG(WARN, "failed to del_file", K(ret), K(uri)); } @@ -1277,8 +1317,10 @@ int ObStorageUtil::list_directories(const common::ObString &uri, common::ObBaseD } else if (OB_UNLIKELY(!is_init())) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "util is not inited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) { OB_LOG(WARN, "fail to make dir path end with '/'", K(ret), K(uri)); } else if (OB_FAIL(util_->list_directories(uri_buf, op))) { @@ -1301,8 +1343,10 @@ int ObStorageUtil::is_tagging(const common::ObString &uri, bool &is_tagging) } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { ret = OB_BACKUP_IO_PROHIBITED; STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - } else if (OB_FAIL(validate_uri_type(uri))) { - STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + } else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), K_(device_type)); } else if (OB_FAIL(util_->is_tagging(uri, is_tagging))) { STORAGE_LOG(WARN, "failed to check is tagging", K(ret), K(uri)); } @@ -1322,8 +1366,10 @@ int ObStorageUtil::del_unmerged_parts(const common::ObString &uri) // } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) { // ret = OB_BACKUP_IO_PROHIBITED; // STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri)); - // } else if (OB_FAIL(validate_uri_type(uri))) { - // STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri)); + // else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) { + // ret = OB_INVALID_BACKUP_DEST; + // STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + // K(ret), K(uri), K_(device_type)); // } else if (OB_FAIL(util_->del_unmerged_parts(uri))) { // STORAGE_LOG(WARN, "fail to del unmerged parts!", K(ret), K(uri)); // } @@ -1373,10 +1419,13 @@ int ObStorageReader::open(const common::ObString &uri, common::ObObjectStorageIn } else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info)); + } else if (FALSE_IT(type = storage_info->get_type())) { + } else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), KPC(storage_info), K(type)); } else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) { STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri)); - } else if (OB_FAIL(get_storage_type_from_path(uri, type))) { - STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri)); } else if (OB_STORAGE_OSS == type) { reader_ = &oss_reader_; } else if (OB_STORAGE_COS == type) { @@ -1508,10 +1557,13 @@ int ObStorageAdaptiveReader::open(const common::ObString &uri, } else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info)); + } else if (FALSE_IT(type = storage_info->get_type())) { + } else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), KPC(storage_info), K(type)); } else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) { STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri)); - } else if (OB_FAIL(get_storage_type_from_path(uri, type))) { - STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri)); } else if (OB_STORAGE_OSS == type) { reader_ = &oss_reader_; } else if (OB_STORAGE_COS == type) { @@ -1710,10 +1762,13 @@ int ObStorageWriter::open(const common::ObString &uri, common::ObObjectStorageIn } else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info)); + } else if (FALSE_IT(type = storage_info->get_type())) { + } else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), KPC(storage_info), K(type)); } else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) { STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri)); - } else if (OB_FAIL(get_storage_type_from_path(uri, type))) { - STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri)); } else if (OB_STORAGE_OSS == type) { writer_ = &oss_writer_; } else if (OB_STORAGE_COS == type) { @@ -1854,10 +1909,13 @@ int ObStorageAppender::open( } else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info)); + } else if (FALSE_IT(type_ = storage_info->get_type())) { + } else if (OB_UNLIKELY(!is_storage_type_match(uri, type_))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), KPC(storage_info), K_(type)); } else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) { STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri)); - } else if (OB_FAIL(get_storage_type_from_path(uri, type_))) { - STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri)); } else if (OB_STORAGE_OSS == type_ || OB_STORAGE_COS == type_ || OB_STORAGE_S3 == type_) { if (OB_FAIL(storage_info_.assign(*storage_info))) { STORAGE_LOG(WARN, "failed to copy storage info", K(ret)); @@ -2149,10 +2207,13 @@ int ObStorageMultiPartWriter::open( } else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info)); + } else if (FALSE_IT(type = storage_info->get_type())) { + } else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) { + ret = OB_INVALID_BACKUP_DEST; + STORAGE_LOG(WARN, "uri prefix does not match the expected device type", + K(ret), K(uri), KPC(storage_info), K(type)); } else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) { STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri)); - } else if (OB_FAIL(get_storage_type_from_path(uri, type))) { - STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri)); } else if (OB_STORAGE_OSS == type || OB_STORAGE_COS == type || OB_STORAGE_S3 == type) { if (OB_FAIL(storage_info_.assign(*storage_info))) { STORAGE_LOG(WARN, "failed to copy storage info", K(ret), KPC(storage_info)); diff --git a/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp b/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp index b0f20c7caf..41ecda36da 100644 --- a/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp @@ -781,6 +781,8 @@ int ObStorageCosReader::pread( int64_t &read_size) { int ret = OB_SUCCESS; + ObCosMemAllocator allocator; + qcloud_cos::ObCosWrapper::Handle *tmp_cos_handle = nullptr; ObExternalIOCounterGuard io_guard; if (!is_opened_) { @@ -789,6 +791,17 @@ int ObStorageCosReader::pread( } else if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0 || offset < 0)) { ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "invalid argument", K(ret), KP(buf), K(buf_size), K(offset)); + // The created cos_handle contains a memory allocator that is not thread-safe and + // cannot be used concurrently. As the allocator is not designed to handle concurrent calls, + // using it in parallel (such as calling reader.pread simultaneously from multiple threads) + // can lead to race conditions, undefined behavior, and potential crashes (core dumps). + // To maintain thread safety, a new temporary cos_handle should be created for each individual + // pread operation rather than reusing the same handle. This approach ensures that memory + // allocation is safely performed without conflicts across concurrent operations. + } else if (OB_FAIL(create_cos_handle( + allocator, handle_.get_cos_account(), + checksum_type_ == ObStorageChecksumType::OB_MD5_ALGO, tmp_cos_handle))) { + OB_LOG(WARN, "fail to create tmp cos handle", K(ret), K_(checksum_type)); } else { // When is_range_read is true, it indicates that only a part of the data is read. // When false, it indicates that the entire object is read @@ -817,7 +830,7 @@ int ObStorageCosReader::pread( qcloud_cos::CosStringBuffer object_name = qcloud_cos::CosStringBuffer( handle_.get_object_name().ptr(), handle_.get_object_name().length()); - if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(handle_.get_ptr(), bucket_name, + if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(tmp_cos_handle, bucket_name, object_name, offset, buf, get_data_size, is_range_read, read_size))) { OB_LOG(WARN, "fail to read object from cos", K(ret), K(is_range_read), KP(buf), K(buf_size), K(offset), K(get_data_size), K_(has_meta)); diff --git a/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp b/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp index beefdc81bc..598e1af7c8 100644 --- a/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp @@ -1246,9 +1246,6 @@ int ObStorageS3Reader::pread_(char *buf, } else { Aws::S3::Model::GetObjectRequest request; Aws::S3::Model::GetObjectOutcome outcome; - if (get_data_size == file_length_) { - request.SetChecksumMode(Aws::S3::Model::ChecksumMode::ENABLED); - } char range_read[64] = { 0 }; request.WithBucket(bucket_.ptr()).WithKey(object_.ptr());