Fix residual errors in object storage

This commit is contained in:
obdev
2024-05-08 05:32:14 +00:00
committed by ob-robot
parent 216d9ed8db
commit b89a9a7dde
5 changed files with 154 additions and 57 deletions

View File

@ -102,21 +102,38 @@ void ObCosWrapperHandle::reset()
delete_mode_ = ObIStorageUtil::DELETE;
}
int create_cos_handle(
ObCosMemAllocator &allocator,
const qcloud_cos::ObCosAccount &cos_account,
const bool check_md5,
qcloud_cos::ObCosWrapper::Handle *&handle)
{
int ret = OB_SUCCESS;
handle = nullptr;
qcloud_cos::OB_COS_customMem cos_mem = {ob_cos_malloc, ob_cos_free, &allocator};
if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(cos_mem, cos_account,
check_md5, &handle))) {
OB_LOG(WARN, "failed to create tmp cos handle", K(ret));
} else if (OB_ISNULL(handle)) {
ret = OB_COS_ERROR;
OB_LOG(WARN, "create tmp handle succeed, but returned handle is null", K(ret));
}
return ret;
}
int ObCosWrapperHandle::create_cos_handle(const bool check_md5)
{
int ret = OB_SUCCESS;
qcloud_cos::OB_COS_customMem cos_mem = {ob_cos_malloc, ob_cos_free, &allocator_};
if (OB_NOT_NULL(handle_)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "handle is not null", K(ret));
} else if (OB_FAIL(qcloud_cos::ObCosWrapper::create_cos_handle(cos_mem, cos_account_,
check_md5, &handle_))) {
} else if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
OB_LOG(WARN, "handle is not inited", K(ret));
} else if (OB_FAIL(common::create_cos_handle(allocator_, cos_account_, check_md5, handle_))) {
OB_LOG(WARN, "failed to create cos handle", K(ret));
} else if (OB_ISNULL(handle_)) {
ret = OB_COS_ERROR;
OB_LOG(WARN, "create handle succeed, but returned handle is null", K(ret));
}
return ret;
}

View File

@ -39,6 +39,14 @@ private:
ObArenaAllocator allocator_;
};
// Create a temporary cos_handle object,
// utilizing the 'allocator' to allocate the necessary memory.
int create_cos_handle(
ObCosMemAllocator &allocator,
const qcloud_cos::ObCosAccount &cos_account,
const bool check_md5,
qcloud_cos::ObCosWrapper::Handle *&handle);
class ObCosWrapperHandle
{
public:
@ -52,8 +60,9 @@ public:
qcloud_cos::ObCosWrapper::Handle *get_ptr() { return handle_; }
int build_bucket_and_object_name(const ObString &uri);
const ObString& get_bucket_name() const { return bucket_name_; }
const ObString& get_object_name() const { return object_name_; }
const ObString &get_bucket_name() const { return bucket_name_; }
const ObString &get_object_name() const { return object_name_; }
const qcloud_cos::ObCosAccount &get_cos_account() const { return cos_account_; }
bool is_valid() const { return is_inited_ && handle_ != nullptr; }
bool is_inited() const { return is_inited_; }

View File

@ -95,6 +95,14 @@ const char *get_storage_type_str(const ObStorageType &type)
return str;
}
bool is_storage_type_match(const common::ObString &uri, const ObStorageType &type)
{
return (OB_STORAGE_OSS == type && uri.prefix_match(OB_OSS_PREFIX))
|| (OB_STORAGE_COS == type && uri.prefix_match(OB_COS_PREFIX))
|| (OB_STORAGE_S3 == type && uri.prefix_match(OB_S3_PREFIX))
|| (OB_STORAGE_FILE == type && uri.prefix_match(OB_FILE_PREFIX));
}
bool is_io_error(const int result)
{
return OB_IO_ERROR == result || OB_OSS_ERROR == result || OB_COS_ERROR == result || OB_S3_ERROR == result;
@ -397,6 +405,10 @@ int ObStorageUtil::detect_storage_obj_meta(
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->head_object_meta(uri, obj_meta))) {
STORAGE_LOG(WARN, "fail to head object meta", K(ret), K(uri));
} else if (obj_meta.is_exist_) {
@ -537,8 +549,10 @@ int ObStorageUtil::is_exist(const common::ObString &uri, const bool is_adaptive,
#endif
if (OB_FAIL(ret)) {
//do nothing
} else if (OB_FAIL(validate_uri_type(uri))) {
OB_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(detect_storage_obj_meta(uri, is_adaptive, false/*need_fragment_meta*/, obj_meta))) {
OB_LOG(WARN, "fail to detect storage obj type", K(ret), K(uri), K(is_adaptive));
} else {
@ -560,8 +574,10 @@ int ObStorageUtil::get_file_length(const common::ObString &uri, const bool is_ad
ret = OB_E(EventTable::EN_BACKUP_IO_GET_FILE_LENGTH) OB_SUCCESS;
#endif
if (OB_FAIL(ret)) {
} else if (OB_FAIL(validate_uri_type(uri))) {
OB_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(detect_storage_obj_meta(uri, is_adaptive, true/*need_fragment_meta*/, obj_meta))) {
OB_LOG(WARN, "fail to detect storage obj type", K(ret), K(uri), K(is_adaptive));
} else if (!obj_meta.is_exist_) {
@ -594,8 +610,10 @@ int ObStorageUtil::del_file(const common::ObString &uri, const bool is_adaptive)
ret = OB_E(EventTable::EN_BACKUP_IO_BEFORE_DEL_FILE) OB_SUCCESS;
#endif
if (OB_FAIL(ret)) {
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(detect_storage_obj_meta(uri, is_adaptive, false/*need_fragment_meta*/, obj_meta))) {
OB_LOG(WARN, "fail to detect storage obj type", K(ret), K(uri), K(is_adaptive));
} else if (obj_meta.is_simulate_append_type()) {
@ -646,8 +664,10 @@ int ObStorageUtil::del_appendable_file(const ObString &uri)
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else {
DelAppendableObjectFragmentOp op(uri, *this);
if (OB_FAIL(list_files(uri, false/*is_adaptive*/, op))) {
@ -672,8 +692,10 @@ int ObStorageUtil::list_files(
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) {
OB_LOG(WARN, "fail to make uri end with '/'", K(ret), K(uri));
} else if (is_adaptive && OB_FAIL(list_adaptive_files(uri_buf, op))) {
@ -958,8 +980,10 @@ int ObStorageUtil::list_directories(
} else if (OB_UNLIKELY(!is_init())) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "util is not inited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) {
OB_LOG(WARN, "fail to make uri end with '/'", K(ret), K(uri));
} else if (OB_FAIL(util_->list_directories(uri_buf, op))) {
@ -988,8 +1012,10 @@ int ObStorageUtil::is_exist(const common::ObString &uri, bool &exist)
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->is_exist(uri, exist))) {
STORAGE_LOG(WARN, "failed to check is exist", K(ret), K(uri));
}
@ -1014,8 +1040,10 @@ int ObStorageUtil::get_file_length(const common::ObString &uri, int64_t &file_le
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->get_file_length(uri, file_length))) {
if (OB_BACKUP_FILE_NOT_EXIST == ret) {
STORAGE_LOG(INFO, "cannot get file length for not exist file", K(ret), K(uri));
@ -1053,8 +1081,10 @@ int ObStorageUtil::get_adaptive_file_length(const common::ObString &uri, int64_t
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else {
if (OB_FAIL(read_seal_meta_if_needed(uri, obj_meta))) {
OB_LOG(WARN, "fail to read seal meta if needed", K(ret), K(uri));
@ -1101,8 +1131,10 @@ int ObStorageUtil::del_file(const common::ObString &uri)
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else {
const int max_try_cnt = 5;
int try_cnt = 0;
@ -1153,8 +1185,10 @@ int ObStorageUtil::mkdir(const common::ObString &uri)
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->mkdir(uri))) {
STORAGE_LOG(WARN, "failed to mkdir", K(ret), K(uri));
}
@ -1184,8 +1218,10 @@ int ObStorageUtil::list_files(const common::ObString &uri, common::ObBaseDirEntr
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) {
OB_LOG(WARN, "fail to make dir path end with '/'", K(ret), K(uri));
} else if (OB_FAIL(util_->list_files(uri_buf, op))) {
@ -1217,8 +1253,10 @@ int ObStorageUtil::write_single_file(const common::ObString &uri, const char *bu
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->write_single_file(uri, buf, size))) {
STORAGE_LOG(WARN, "failed to write single file", K(ret), K(uri));
} else {
@ -1251,8 +1289,10 @@ int ObStorageUtil::del_dir(const common::ObString &uri)
} else if (OB_UNLIKELY(!is_init())) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "util is not inited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->del_dir(uri))) {
STORAGE_LOG(WARN, "failed to del_file", K(ret), K(uri));
}
@ -1277,8 +1317,10 @@ int ObStorageUtil::list_directories(const common::ObString &uri, common::ObBaseD
} else if (OB_UNLIKELY(!is_init())) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "util is not inited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(build_full_dir_path(uri.ptr(), uri_buf, sizeof(uri_buf)))) {
OB_LOG(WARN, "fail to make dir path end with '/'", K(ret), K(uri));
} else if (OB_FAIL(util_->list_directories(uri_buf, op))) {
@ -1301,8 +1343,10 @@ int ObStorageUtil::is_tagging(const common::ObString &uri, bool &is_tagging)
} else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
ret = OB_BACKUP_IO_PROHIBITED;
STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
} else if (OB_FAIL(validate_uri_type(uri))) {
STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
} else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), K_(device_type));
} else if (OB_FAIL(util_->is_tagging(uri, is_tagging))) {
STORAGE_LOG(WARN, "failed to check is tagging", K(ret), K(uri));
}
@ -1322,8 +1366,10 @@ int ObStorageUtil::del_unmerged_parts(const common::ObString &uri)
// } else if (ObStorageGlobalIns::get_instance().is_io_prohibited()) {
// ret = OB_BACKUP_IO_PROHIBITED;
// STORAGE_LOG(WARN, "current observer backup io is prohibited", K(ret), K(uri));
// } else if (OB_FAIL(validate_uri_type(uri))) {
// STORAGE_LOG(WARN, "fail to validate uri!", K(ret), K(uri));
// else if (OB_UNLIKELY(!is_storage_type_match(uri, device_type_))) {
// ret = OB_INVALID_BACKUP_DEST;
// STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
// K(ret), K(uri), K_(device_type));
// } else if (OB_FAIL(util_->del_unmerged_parts(uri))) {
// STORAGE_LOG(WARN, "fail to del unmerged parts!", K(ret), K(uri));
// }
@ -1373,10 +1419,13 @@ int ObStorageReader::open(const common::ObString &uri, common::ObObjectStorageIn
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info));
} else if (FALSE_IT(type = storage_info->get_type())) {
} else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), KPC(storage_info), K(type));
} else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) {
STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri));
} else if (OB_FAIL(get_storage_type_from_path(uri, type))) {
STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri));
} else if (OB_STORAGE_OSS == type) {
reader_ = &oss_reader_;
} else if (OB_STORAGE_COS == type) {
@ -1508,10 +1557,13 @@ int ObStorageAdaptiveReader::open(const common::ObString &uri,
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info));
} else if (FALSE_IT(type = storage_info->get_type())) {
} else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), KPC(storage_info), K(type));
} else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) {
STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri));
} else if (OB_FAIL(get_storage_type_from_path(uri, type))) {
STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri));
} else if (OB_STORAGE_OSS == type) {
reader_ = &oss_reader_;
} else if (OB_STORAGE_COS == type) {
@ -1710,10 +1762,13 @@ int ObStorageWriter::open(const common::ObString &uri, common::ObObjectStorageIn
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info));
} else if (FALSE_IT(type = storage_info->get_type())) {
} else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), KPC(storage_info), K(type));
} else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) {
STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri));
} else if (OB_FAIL(get_storage_type_from_path(uri, type))) {
STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri));
} else if (OB_STORAGE_OSS == type) {
writer_ = &oss_writer_;
} else if (OB_STORAGE_COS == type) {
@ -1854,10 +1909,13 @@ int ObStorageAppender::open(
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info));
} else if (FALSE_IT(type_ = storage_info->get_type())) {
} else if (OB_UNLIKELY(!is_storage_type_match(uri, type_))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), KPC(storage_info), K_(type));
} else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) {
STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri));
} else if (OB_FAIL(get_storage_type_from_path(uri, type_))) {
STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri));
} else if (OB_STORAGE_OSS == type_ || OB_STORAGE_COS == type_ || OB_STORAGE_S3 == type_) {
if (OB_FAIL(storage_info_.assign(*storage_info))) {
STORAGE_LOG(WARN, "failed to copy storage info", K(ret));
@ -2149,10 +2207,13 @@ int ObStorageMultiPartWriter::open(
} else if (OB_ISNULL(storage_info) || OB_UNLIKELY(uri.empty() || !storage_info->is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid arguments", K(ret), K(uri), KPC(storage_info));
} else if (FALSE_IT(type = storage_info->get_type())) {
} else if (OB_UNLIKELY(!is_storage_type_match(uri, type))) {
ret = OB_INVALID_BACKUP_DEST;
STORAGE_LOG(WARN, "uri prefix does not match the expected device type",
K(ret), K(uri), KPC(storage_info), K(type));
} else if (OB_FAIL(databuff_printf(uri_, sizeof(uri_), "%.*s", uri.length(), uri.ptr()))) {
STORAGE_LOG(WARN, "failed to fill uri", K(ret), K(uri));
} else if (OB_FAIL(get_storage_type_from_path(uri, type))) {
STORAGE_LOG(WARN, "failed to get type", K(ret), K(uri));
} else if (OB_STORAGE_OSS == type || OB_STORAGE_COS == type || OB_STORAGE_S3 == type) {
if (OB_FAIL(storage_info_.assign(*storage_info))) {
STORAGE_LOG(WARN, "failed to copy storage info", K(ret), KPC(storage_info));

View File

@ -781,6 +781,8 @@ int ObStorageCosReader::pread(
int64_t &read_size)
{
int ret = OB_SUCCESS;
ObCosMemAllocator allocator;
qcloud_cos::ObCosWrapper::Handle *tmp_cos_handle = nullptr;
ObExternalIOCounterGuard io_guard;
if (!is_opened_) {
@ -789,6 +791,17 @@ int ObStorageCosReader::pread(
} else if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0 || offset < 0)) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", K(ret), KP(buf), K(buf_size), K(offset));
// The created cos_handle contains a memory allocator that is not thread-safe and
// cannot be used concurrently. As the allocator is not designed to handle concurrent calls,
// using it in parallel (such as calling reader.pread simultaneously from multiple threads)
// can lead to race conditions, undefined behavior, and potential crashes (core dumps).
// To maintain thread safety, a new temporary cos_handle should be created for each individual
// pread operation rather than reusing the same handle. This approach ensures that memory
// allocation is safely performed without conflicts across concurrent operations.
} else if (OB_FAIL(create_cos_handle(
allocator, handle_.get_cos_account(),
checksum_type_ == ObStorageChecksumType::OB_MD5_ALGO, tmp_cos_handle))) {
OB_LOG(WARN, "fail to create tmp cos handle", K(ret), K_(checksum_type));
} else {
// When is_range_read is true, it indicates that only a part of the data is read.
// When false, it indicates that the entire object is read
@ -817,7 +830,7 @@ int ObStorageCosReader::pread(
qcloud_cos::CosStringBuffer object_name = qcloud_cos::CosStringBuffer(
handle_.get_object_name().ptr(), handle_.get_object_name().length());
if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(handle_.get_ptr(), bucket_name,
if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(tmp_cos_handle, bucket_name,
object_name, offset, buf, get_data_size, is_range_read, read_size))) {
OB_LOG(WARN, "fail to read object from cos", K(ret), K(is_range_read),
KP(buf), K(buf_size), K(offset), K(get_data_size), K_(has_meta));

View File

@ -1246,9 +1246,6 @@ int ObStorageS3Reader::pread_(char *buf,
} else {
Aws::S3::Model::GetObjectRequest request;
Aws::S3::Model::GetObjectOutcome outcome;
if (get_data_size == file_length_) {
request.SetChecksumMode(Aws::S3::Model::ChecksumMode::ENABLED);
}
char range_read[64] = { 0 };
request.WithBucket(bucket_.ptr()).WithKey(object_.ptr());