diff --git a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.cpp b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.cpp index 5b86ad2fe4..967cfcb92f 100644 --- a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.cpp +++ b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.cpp @@ -23,6 +23,7 @@ #include "cos_auth.h" #include "cos_sys_util.h" #include "apr_errno.h" +#include "cos_crc64.h" #include "ob_cos_wrapper.h" @@ -38,6 +39,7 @@ constexpr int OB_INVALID_ARGUMENT = -4002; constexpr int OB_INIT_TWICE = -4005; constexpr int OB_ALLOCATE_MEMORY_FAILED = -4013; constexpr int OB_SIZE_OVERFLOW = -4019; +constexpr int OB_CHECKSUM_ERROR = -4103; constexpr int OB_BACKUP_FILE_NOT_EXIST = -9011; constexpr int OB_COS_ERROR = -9060; constexpr int OB_IO_LIMIT = -9061; @@ -814,6 +816,7 @@ int ObCosWrapper::pread( int64_t offset, char *buf, int64_t buf_size, + const bool is_range_read, int64_t &read_size) { int ret = OB_SUCCESS; @@ -849,42 +852,57 @@ int ObCosWrapper::pread( const char* const COS_RANGE_KEY = "Range"; char range_size[COS_RANGE_SIZE]; - // Cos read range of [10, 100] include the start offset 10 and the end offset 100. - // But what we except is [10, 100) which does not include the end. - // So we subtract 1 from the end. - int n = snprintf(range_size, COS_RANGE_SIZE, "bytes=%ld-%ld", offset, offset + buf_size - 1); - if (0 >= n || COS_RANGE_SIZE <= n) { - ret = OB_SIZE_OVERFLOW; - cos_warn_log("[COS]fail to format range size,n=%d, ret=%d\n", n, ret); - } else if (NULL == (headers = cos_table_make(ctx->mem_pool, 1))) { + if (NULL == (headers = cos_table_make(ctx->mem_pool, 1))) { ret = OB_ALLOCATE_MEMORY_FAILED; cos_warn_log("[COS]fail to make cos headers, ret=%d\n", ret); } else { - apr_table_set(headers, COS_RANGE_KEY, range_size); + if (is_range_read) { + // Cos read range of [10, 100] include the start offset 10 and the end offset 100. + // But what we except is [10, 100) which does not include the end. + // So we subtract 1 from the end. + int n = snprintf(range_size, COS_RANGE_SIZE, "bytes=%ld-%ld", offset, offset + buf_size - 1); + if (0 >= n || COS_RANGE_SIZE <= n) { + ret = OB_SIZE_OVERFLOW; + cos_warn_log("[COS]fail to format range size,n=%d, ret=%d\n", n, ret); + } else { + apr_table_set(headers, COS_RANGE_KEY, range_size); + } + } - if (NULL == (cos_ret = cos_get_object_to_buffer(ctx->options, &bucket, &object, headers, NULL, &buffer, &resp_headers)) || - !cos_status_is_ok(cos_ret)) { + if (OB_SUCCESS != ret) { + } else if (NULL == (cos_ret = cos_get_object_to_buffer(ctx->options, &bucket, &object, headers, NULL, &buffer, &resp_headers)) || + !cos_status_is_ok(cos_ret)) { convert_io_error(cos_ret, ret); cos_warn_log("[COS]fail to get object to buffer, ret=%d\n", ret); log_status(cos_ret); } else { + read_size = 0; int64_t size = 0; int64_t buf_pos = 0; cos_buf_t *content = NULL; + int64_t needed_size = -1; cos_list_for_each_entry(cos_buf_t, content, &buffer, node) { size = cos_buf_size(content); - if (buf_pos + size > buf_size) { + needed_size = size; + if (buf_size - buf_pos < size) { + needed_size = buf_size - buf_pos; + } + if (is_range_read && (buf_pos + size > buf_size)) { ret = OB_COS_ERROR; cos_warn_log("[COS]unexpected error, too much data returned, ret=%d, range_size=%s, buf_pos=%ld, size=%ld, req_id=%s.\n", ret, range_size, buf_pos, size, cos_ret->req_id); log_status(cos_ret); break; } else { // copy to buf - memcpy(buf + buf_pos, content->pos, (size_t)size); - buf_pos += size; - read_size += size; + memcpy(buf + buf_pos, content->pos, (size_t)needed_size); + buf_pos += needed_size; + read_size += needed_size; + + if (buf_pos >= buf_size) { + break; + } } - } + } // end cos_list_for_each_entry } } } @@ -900,62 +918,8 @@ int ObCosWrapper::get_object( int64_t buf_size, int64_t &read_size) { - int ret = OB_SUCCESS; - - CosContext *ctx = reinterpret_cast(h); - read_size = 0; - - if (NULL == h) { - ret = OB_INVALID_ARGUMENT; - cos_warn_log("[COS]cos handle is null, ret=%d\n", ret); - } else if (bucket_name.empty()) { - ret = OB_INVALID_ARGUMENT; - cos_warn_log("[COS]bucket name is null, ret=%d\n", ret); - } else if (object_name.empty()) { - ret = OB_INVALID_ARGUMENT; - cos_warn_log("[COS]object name is null, ret=%d\n", ret); - } else if (NULL == buf || 0 >= buf_size) { - ret = OB_INVALID_ARGUMENT; - cos_warn_log("[COS]buffer is null, ret=%d\n", ret); - } else { - cos_string_t bucket; - cos_string_t object; - cos_str_set(&bucket, bucket_name.data_); - cos_str_set(&object, object_name.data_); - cos_table_t *headers = NULL; - cos_table_t *resp_headers = NULL; - cos_status_t *cos_ret = NULL; - - cos_list_t buffer; - cos_list_init(&buffer); - - if (NULL == (cos_ret = cos_get_object_to_buffer(ctx->options, &bucket, &object, headers, NULL, &buffer, &resp_headers)) || - !cos_status_is_ok(cos_ret)) { - convert_io_error(cos_ret, ret); - cos_warn_log("[COS]fail to get object to buffer, ret=%d\n", ret); - log_status(cos_ret); - } else { - int64_t size = 0; - int64_t buf_pos = 0; - cos_buf_t *content = NULL; - cos_list_for_each_entry(cos_buf_t, content, &buffer, node) { - size = cos_buf_size(content); - if (buf_pos + size > buf_size) { - ret = OB_COS_ERROR; - cos_warn_log("[COS]unexpected error, too much data returned, ret=%d, buf_pos=%ld, size=%ld, buf_size=%ld, req_id=%s.\n", ret, buf_pos, size, buf_size, cos_ret->req_id); - log_status(cos_ret); - break; - } else { - // copy to buf - memcpy(buf + buf_pos, content->pos, (size_t)size); - buf_pos += size; - read_size += size; - } - } - } - } - - return ret; + return ObCosWrapper::pread(h, bucket_name, object_name, + 0, buf, buf_size, false/*is_range_read*/, read_size); } int ObCosWrapper::is_object_tagging( @@ -1444,7 +1408,8 @@ int ObCosWrapper::upload_part_from_buffer( const CosStringBuffer &upload_id_str, const int part_num, const char *buf, - const int64_t buf_size) + const int64_t buf_size, + uint64_t &total_crc) { int ret = OB_SUCCESS; CosContext *ctx = reinterpret_cast(h); @@ -1488,6 +1453,9 @@ int ObCosWrapper::upload_part_from_buffer( convert_io_error(cos_ret, ret); cos_warn_log("[COS]fail to upload part to cos, ret=%d, part_num=%d\n", ret, part_num); log_status(cos_ret); + } else { + // TODO @fangdan: supports parallel uploads + total_crc = cos_crc64(total_crc, (void *)buf, buf_size); } } } @@ -1498,7 +1466,8 @@ int ObCosWrapper::complete_multipart_upload( Handle *h, const CosStringBuffer &bucket_name, const CosStringBuffer &object_name, - const CosStringBuffer &upload_id_str) + const CosStringBuffer &upload_id_str, + const uint64_t total_crc) { int ret = OB_SUCCESS; CosContext *ctx = reinterpret_cast(h); @@ -1575,6 +1544,16 @@ int ObCosWrapper::complete_multipart_upload( convert_io_error(cos_ret, ret); cos_warn_log("[COS]fail to complete multipart upload, ret=%d\n", ret); log_status(cos_ret); + } else { + const char *expected_crc_str = (const char *)(apr_table_get(resp_headers, COS_HASH_CRC64_ECMA)); + if (NULL != expected_crc_str) { + uint64_t expected_crc = cos_atoui64(expected_crc_str); + if (expected_crc != total_crc) { + ret = OB_CHECKSUM_ERROR; + cos_warn_log("[COS]multipart object crc is not equal to returned crc, ret=%d, upload_id=%s, total_crc=%llu, expected_crc=%llu\n", + ret, upload_id_str.data_, total_crc, expected_crc); + } + } } } diff --git a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.h b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.h index b035bbe176..0472defcd2 100644 --- a/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.h +++ b/deps/oblib/src/lib/restore/cos/ob_cos_wrapper.h @@ -269,6 +269,7 @@ public: int64_t offset, char *buf, int64_t buf_size, + const bool is_range_read, int64_t &read_size); // Get whole object @@ -372,13 +373,15 @@ public: const CosStringBuffer &upload_id_str, const int part_num, /*the sequence number of this part, [1, 10000]*/ const char *buf, - const int64_t buf_size); + const int64_t buf_size, + uint64_t &total_crc); static int complete_multipart_upload( Handle *h, const CosStringBuffer &bucket_name, const CosStringBuffer &object_name, - const CosStringBuffer &upload_id_str); + const CosStringBuffer &upload_id_str, + const uint64_t total_crc); static int abort_multipart_upload( Handle *h, diff --git a/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp b/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp index 2685d3fa28..58d0419fbe 100644 --- a/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_cos_base.cpp @@ -786,13 +786,38 @@ int ObStorageCosReader::pread( ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "invalid argument", K(ret), KP(buf), K(buf_size), K(offset)); } else { - qcloud_cos::CosStringBuffer bucket_name = qcloud_cos::CosStringBuffer( - handle_.get_bucket_name().ptr(), handle_.get_bucket_name().length()); - qcloud_cos::CosStringBuffer object_name = qcloud_cos::CosStringBuffer( - handle_.get_object_name().ptr(), handle_.get_object_name().length()); - if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(handle_.get_ptr(), bucket_name, - object_name, offset, buf, buf_size, read_size))) { - OB_LOG(WARN, "fail to read object from cos", K(ret), KP(buf), K(buf_size), K(offset)); + // When is_range_read is true, it indicates that only a part of the data is read. + // When false, it indicates that the entire object is read + bool is_range_read = true; + int64_t get_data_size = buf_size; + if (has_meta_) { + if (file_length_ < offset) { + ret = OB_FILE_LENGTH_INVALID; + OB_LOG(WARN, "File lenth is invilid", K_(file_length), K(offset), + K(handle_.get_bucket_name()), K(handle_.get_object_name()), K(ret)); + } else { + get_data_size = MIN(buf_size, file_length_ - offset); + if (get_data_size == file_length_) { + // read entire object + is_range_read = false; + } + } + } + + if (OB_FAIL(ret)) { + } else if (get_data_size == 0) { + read_size = 0; + } else { + qcloud_cos::CosStringBuffer bucket_name = qcloud_cos::CosStringBuffer( + handle_.get_bucket_name().ptr(), handle_.get_bucket_name().length()); + qcloud_cos::CosStringBuffer object_name = qcloud_cos::CosStringBuffer( + handle_.get_object_name().ptr(), handle_.get_object_name().length()); + + if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(handle_.get_ptr(), bucket_name, + object_name, offset, buf, get_data_size, is_range_read, read_size))) { + OB_LOG(WARN, "fail to read object from cos", K(ret), K(is_range_read), + KP(buf), K(buf_size), K(offset), K(get_data_size), K_(has_meta)); + } } } return ret; @@ -1026,7 +1051,8 @@ ObStorageCosMultiPartWriter::ObStorageCosMultiPartWriter() base_buf_pos_(0), upload_id_(NULL), partnum_(0), - file_length_(-1) + file_length_(-1), + total_crc_(0) {} ObStorageCosMultiPartWriter::~ObStorageCosMultiPartWriter() @@ -1084,6 +1110,7 @@ int ObStorageCosMultiPartWriter::open(const ObString &uri, common::ObObjectStora is_opened_ = true; base_buf_pos_ = 0; file_length_ = 0; + total_crc_ = 0; } } return ret; @@ -1152,7 +1179,7 @@ int ObStorageCosMultiPartWriter::write_single_part() qcloud_cos::CosStringBuffer upload_id_str = qcloud_cos::CosStringBuffer( upload_id_, strlen(upload_id_)); if (OB_FAIL(qcloud_cos::ObCosWrapper::upload_part_from_buffer(handle_.get_ptr(), bucket_name, - object_name, upload_id_str, partnum_, base_buf_, base_buf_pos_))) { + object_name, upload_id_str, partnum_, base_buf_, base_buf_pos_, total_crc_))) { OB_LOG(WARN, "fail to upload part to cos", K(ret), KP_(upload_id)); if (OB_TMP_FAIL(cleanup())) { OB_LOG(WARN, "fail to abort multiupload", K(ret), K(tmp_ret), KP_(upload_id)); @@ -1192,7 +1219,7 @@ int ObStorageCosMultiPartWriter::close() upload_id_, strlen(upload_id_)); if (OB_FAIL(qcloud_cos::ObCosWrapper::complete_multipart_upload(handle_.get_ptr(), bucket_name, - object_name, upload_id_str))) { + object_name, upload_id_str, total_crc_))) { OB_LOG(WARN, "fail to complete multipart upload", K(ret), KP_(upload_id)); if (OB_TMP_FAIL(cleanup())) { OB_LOG(WARN, "fail to abort multiupload", K(ret), K(tmp_ret), KP_(upload_id)); diff --git a/deps/oblib/src/lib/restore/ob_storage_cos_base.h b/deps/oblib/src/lib/restore/ob_storage_cos_base.h index 5d2caff698..95753fdfea 100644 --- a/deps/oblib/src/lib/restore/ob_storage_cos_base.h +++ b/deps/oblib/src/lib/restore/ob_storage_cos_base.h @@ -191,6 +191,7 @@ private: char *upload_id_; int partnum_; int64_t file_length_; + uint64_t total_crc_; DISALLOW_COPY_AND_ASSIGN(ObStorageCosMultiPartWriter); }; diff --git a/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp b/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp index e3a5778d76..f2b1f88ea6 100644 --- a/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_oss_base.cpp @@ -17,6 +17,7 @@ #include "common/ob_string_buf.h" #include "apr_errno.h" #include "ob_storage.h" +#include "aos_crc64.h" #include "lib/hash/ob_hashset.h" #include "lib/utility/ob_tracepoint.h" @@ -643,7 +644,8 @@ ObStorageOssMultiPartWriter::ObStorageOssMultiPartWriter() object_(), partnum_(0), is_opened_(false), - file_length_(-1) + file_length_(-1), + total_crc_(0) { upload_id_.len = -1; upload_id_.data = NULL; @@ -697,6 +699,7 @@ int ObStorageOssMultiPartWriter::open(const ObString &uri, common::ObObjectStora is_opened_ = true; base_buf_pos_ = 0; file_length_ = 0; + total_crc_ = 0; } } } @@ -790,6 +793,9 @@ int ObStorageOssMultiPartWriter::write_single_part() K_(base_buf_pos), K_(bucket), K_(object), K(ret)); print_oss_info(resp_headers, aos_ret); cleanup(); + } else { + // TODO @fangdan: supports parallel uploads + total_crc_ = aos_crc64(total_crc_, base_buf_, base_buf_pos_); } bool is_slow = false; print_access_storage_log("oss upload one part ", object_, start_time, base_buf_pos_, &is_slow); @@ -894,6 +900,16 @@ int ObStorageOssMultiPartWriter::close() OB_LOG(WARN, "fail to complete multipart upload", K_(bucket), K_(object), K(ret)); print_oss_info(resp_headers, aos_ret); cleanup(); + } else { + const char *expected_crc_str = (const char *)(apr_table_get(resp_headers, OSS_HASH_CRC64_ECMA)); + if (OB_NOT_NULL(expected_crc_str)) { + uint64_t expected_crc = aos_atoui64(expected_crc_str); + if (OB_UNLIKELY(expected_crc != total_crc_)) { + ret = OB_CHECKSUM_ERROR; + OB_LOG(WARN, "multipart object crc is not equal to returned crc", + K(ret), K_(total_crc), K(expected_crc)); + } + } } } } @@ -1011,15 +1027,29 @@ int ObStorageOssReader::pread( ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "aos pool or oss option is NULL", K(aos_pool), K(oss_option), K(ret)); } else { - if (has_meta_ && file_length_ == offset) { + // When is_range_read is true, it indicates that only a part of the data is read. + // When false, it indicates that the entire object is read + bool is_range_read = true; + int64_t get_data_size = buf_size; + if (has_meta_) { + if (file_length_ < offset) { + ret = OB_FILE_LENGTH_INVALID; + OB_LOG(WARN, "File lenth is invilid", K_(file_length), + K(offset), K_(bucket), K_(object), K(ret)); + } else { + get_data_size = MIN(buf_size, file_length_ - offset); + if (get_data_size == file_length_) { + // read entire object + is_range_read = false; + } + } + } + + if (OB_FAIL(ret)) { + } else if (get_data_size == 0) { read_size = 0; - } else if (has_meta_ && file_length_ < offset) { - ret = OB_FILE_LENGTH_INVALID; - OB_LOG(WARN, "File lenth is invilid", K(file_length_), K(offset), - K(bucket_), K(object_), K(ret)); } else { const int64_t start_time = ObTimeUtility::current_time(); - int64_t get_data_size = buf_size; aos_string_t bucket; aos_string_t object; aos_str_set(&bucket, bucket_.ptr()); @@ -1037,49 +1067,61 @@ int ObStorageOssReader::pread( const char *const OSS_RANGE_KEY = "Range"; char range_size[OSS_RANGE_SIZE]; - //oss read size is [10, 100] include the 10 and 100 bytes - //we except is [10, 100) not include the end, so we subtraction 1 to the end - int n = snprintf(range_size, OSS_RANGE_SIZE, "bytes=%ld-%ld", offset, offset + get_data_size - 1); - if(n <=0 || n >= OSS_RANGE_SIZE) { - ret = OB_SIZE_OVERFLOW; - OB_LOG(WARN, "fail to get range size", K(n), K(OSS_RANGE_SIZE), K(ret)); - } else if (NULL == (headers = aos_table_make(aos_pool, AOS_TABLE_INIT_SIZE))) { + if (NULL == (headers = aos_table_make(aos_pool, AOS_TABLE_INIT_SIZE))) { ret = OB_OSS_ERROR; OB_LOG(WARN, "fail to make oss headers", K(ret)); } else { - apr_table_set(headers, OSS_RANGE_KEY, range_size); + if (is_range_read) { + // oss read size is [10, 100] include the 10 and 100 bytes + // we except is [10, 100) not include the end, so we subtraction 1 to the end + if (OB_FAIL(databuff_printf(range_size, OSS_RANGE_SIZE, "bytes=%ld-%ld", + offset, offset + get_data_size - 1))) { + OB_LOG(WARN, "fail to get range size", K(ret), + K(offset), K(get_data_size), K(OSS_RANGE_SIZE)); + } else { + apr_table_set(headers, OSS_RANGE_KEY, range_size); + } + } - if (NULL == (aos_ret = oss_get_object_to_buffer(oss_option, &bucket, &object, headers, params, + if (OB_FAIL(ret)) { + } else if (NULL == (aos_ret = oss_get_object_to_buffer(oss_option, &bucket, &object, headers, params, &buffer, &resp_headers)) || !aos_status_is_ok(aos_ret)) { - convert_io_error(aos_ret, ret); - OB_LOG(WARN, "fail to get object to buffer", K(bucket_), K(object_), K(ret)); + convert_io_error(aos_ret, ret); + OB_LOG(WARN, "fail to get object to buffer", K_(bucket), K_(object), K(ret)); print_oss_info(resp_headers, aos_ret); } else { //check date len aos_list_for_each_entry(aos_buf_t, content, &buffer, node) { len += aos_buf_size(content); } - if(len > get_data_size) { + // For appendable file, there may be data written in between the time when the reader is opened and pread is called. + // At this point, the actual size of the file could be larger than the determined file length at the time of opening. + // Therefore, if it's not a range read, the data read could exceed the expected amount to be read + if (is_range_read && len > get_data_size) { ret = OB_OSS_ERROR; - OB_LOG(WARN, "get data size error", K(get_data_size), K(len), K(bucket_), - K(object_), K(ret)); + OB_LOG(WARN, "get data size error", K(get_data_size), K(len), K_(bucket), + K_(object), K(ret), K_(has_meta), K(offset)); } else { //copy to buf + read_size = 0; + int64_t needed_size = -1; aos_list_for_each_entry(aos_buf_t, content, &buffer, node) { size = aos_buf_size(content); - if (buf_pos + size > get_data_size) { + needed_size = MIN(size, get_data_size - buf_pos); + if (is_range_read && (buf_pos + size > get_data_size)) { ret = OB_SIZE_OVERFLOW; OB_LOG(WARN, "the size is too long", K(buf_pos), K(size), K(get_data_size), K(ret)); break; } else { - memcpy(buf + buf_pos, content->pos, (size_t)size); - buf_pos += size; - } - } + memcpy(buf + buf_pos, content->pos, (size_t)needed_size); + buf_pos += needed_size; + read_size += needed_size; - if (OB_SUCC(ret)) { - read_size = len; - } + if (buf_pos >= get_data_size) { + break; + } + } + } // end aos_list_for_each_entry } } } diff --git a/deps/oblib/src/lib/restore/ob_storage_oss_base.h b/deps/oblib/src/lib/restore/ob_storage_oss_base.h index 53c8a74bc5..d5951514ad 100644 --- a/deps/oblib/src/lib/restore/ob_storage_oss_base.h +++ b/deps/oblib/src/lib/restore/ob_storage_oss_base.h @@ -186,6 +186,7 @@ private: int partnum_; bool is_opened_; int64_t file_length_; + uint64_t total_crc_; DISALLOW_COPY_AND_ASSIGN(ObStorageOssMultiPartWriter); }; diff --git a/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp b/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp index 6a617d85d5..586ad16ad9 100644 --- a/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_s3_base.cpp @@ -1161,14 +1161,19 @@ int ObStorageS3Reader::pread_(char *buf, } else { Aws::S3::Model::GetObjectRequest request; Aws::S3::Model::GetObjectOutcome outcome; + const bool check_crc = (default_s3_crc_algo == ObStorageCRCAlgorithm::OB_CRC32_ALGO); + if (check_crc) { + request.SetChecksumMode(Aws::S3::Model::ChecksumMode::ENABLED); + } char range_read[64] = { 0 }; request.WithBucket(bucket_.ptr()).WithKey(object_.ptr()); request.SetAdditionalCustomHeaderValue("Connection", "keep-alive"); + const int64_t get_data_size = has_meta_ ? MIN(buf_size, file_length_ - offset) : buf_size; if (OB_FAIL(databuff_printf(range_read, sizeof(range_read), - "bytes=%ld-%ld", offset, offset + buf_size - 1))) { - OB_LOG(WARN, "fail to set range to read", - K(ret), K_(bucket), K_(object), K(offset), K(buf_size)); + "bytes=%ld-%ld", offset, offset + get_data_size - 1))) { + OB_LOG(WARN, "fail to set range to read", K(ret), + K_(bucket), K_(object), K(offset), K(buf_size), K_(has_meta), K_(file_length)); } else if (FALSE_IT(request.SetRange(range_read))) { } else if (OB_FAIL(s3_client_->get_object(request, outcome))) { OB_LOG(WARN, "failed to get s3 object", K(ret), K(range_read)); @@ -1952,6 +1957,7 @@ int ObStorageS3MultiPartWriter::close_() } // list parts + const bool check_crc = (default_s3_crc_algo == ObStorageCRCAlgorithm::OB_CRC32_ALGO); Aws::S3::Model::CompletedMultipartUpload completedMultipartUpload; int64_t part_num = 0; if (OB_SUCC(ret)) { @@ -1972,6 +1978,9 @@ int ObStorageS3MultiPartWriter::close_() for (int64_t i = 0; OB_SUCC(ret) && i < parts.size(); i++) { Aws::S3::Model::CompletedPart tmp_part; tmp_part.WithPartNumber(parts[i].GetPartNumber()).WithETag(parts[i].GetETag()); + if (check_crc) { + tmp_part.SetChecksumCRC32(parts[i].GetChecksumCRC32()); + } completedMultipartUpload.AddParts(std::move(tmp_part)); } } @@ -1984,7 +1993,7 @@ int ObStorageS3MultiPartWriter::close_() Aws::S3::Model::CompleteMultipartUploadRequest request; request.WithBucket(bucket_.ptr()).WithKey(object_.ptr()); request.WithUploadId(upload_id_).WithMultipartUpload(completedMultipartUpload); - const bool check_crc = (default_s3_crc_algo == ObStorageCRCAlgorithm::OB_CRC32_ALGO); + Aws::String complete_crc; if (check_crc) { if (OB_ISNULL(sum_hash_)) { @@ -2070,6 +2079,7 @@ int ObStorageS3MultiPartWriter::write_single_part_() ret = OB_ERR_UNEXPECTED; OB_LOG(WARN, "sum_hash should not be null", K(ret)); } else { + // TODO @fangdan: supports parallel uploads Aws::Utils::Crypto::CRC32 part_hash; request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32); Aws::String part_str(base_buf_, base_buf_pos_); diff --git a/deps/oblib/src/lib/restore/ob_storage_s3_base.h b/deps/oblib/src/lib/restore/ob_storage_s3_base.h index c79c86db21..c0b13382d3 100644 --- a/deps/oblib/src/lib/restore/ob_storage_s3_base.h +++ b/deps/oblib/src/lib/restore/ob_storage_s3_base.h @@ -63,7 +63,7 @@ int init_s3_env(); void fin_s3_env(); // default s3 checksum algorithm -static ObStorageCRCAlgorithm default_s3_crc_algo = ObStorageCRCAlgorithm::OB_INVALID_CRC_ALGO; +static ObStorageCRCAlgorithm default_s3_crc_algo = ObStorageCRCAlgorithm::OB_CRC32_ALGO; // set checksum algorithm for writing object into s3 void set_s3_checksum_algorithm(const ObStorageCRCAlgorithm crc_algo); // get current checksum algorithm diff --git a/deps/oblib/unittest/lib/restore/test_object_storage.cpp b/deps/oblib/unittest/lib/restore/test_object_storage.cpp index 00562baeb0..57b3d0dc99 100644 --- a/deps/oblib/unittest/lib/restore/test_object_storage.cpp +++ b/deps/oblib/unittest/lib/restore/test_object_storage.cpp @@ -1240,6 +1240,66 @@ TEST_F(TestObjectStorage, test_cross_testing) } } +TEST_F(TestObjectStorage, test_read_single_file) +{ + int ret = OB_SUCCESS; + if (enable_test) { + ObStorageUtil util; + ASSERT_EQ(OB_SUCCESS, util.open(&info_base)); + const char *tmp_dir = "test_read_single_file"; + const int64_t ts = ObTimeUtility::current_time(); + ASSERT_EQ(OB_SUCCESS, databuff_printf(dir_uri, sizeof(dir_uri), "%s/%s/%s_%ld", + bucket, dir_name, tmp_dir, ts)); + + { + // normal + ObStorageWriter writer; + ObStorageReader reader; + ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_normal", dir_uri)); + ASSERT_EQ(OB_SUCCESS, writer.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, writer.write("123456", 6)); + ASSERT_EQ(OB_SUCCESS, writer.close()); + + int64_t read_size = -1; + char read_buf[100]; + memset(read_buf, 0, sizeof(read_buf)); + ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 100, 0, read_size)); + ASSERT_EQ(6, read_size); + ASSERT_EQ(0, strncmp(read_buf, "123456", 6)); + ASSERT_EQ(OB_SUCCESS, reader.close()); + + ASSERT_EQ(OB_SUCCESS, util.del_file(uri)); + } + + { + // appendable + ObStorageAppender appender; + ObStorageAdaptiveReader reader; + + const char *write_content = "0123456789"; + const int64_t first_content_len = 6; + ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_appendable", dir_uri)); + ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base)); + ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, first_content_len, 0)); + + int64_t read_size = -1; + char read_buf[100]; + memset(read_buf, 0, sizeof(read_buf)); + ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base)); + + ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, 1, first_content_len)); + + ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 100, 0, read_size)); + ASSERT_EQ(first_content_len, read_size); + ASSERT_EQ(0, strncmp(read_buf, write_content, first_content_len)); + ASSERT_EQ(OB_SUCCESS, reader.close()); + + ASSERT_EQ(OB_SUCCESS, util.del_file(uri)); + } + } +} + TEST_F(TestObjectStorage, test_multipart_write) { int ret = OB_SUCCESS; @@ -1265,6 +1325,7 @@ TEST_F(TestObjectStorage, test_multipart_write) ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_multipart", dir_uri)); ObStorageMultiPartWriter writer; + // ObStorageWriter writer; ASSERT_EQ(OB_SUCCESS, writer.open(uri, &info_base)); ASSERT_EQ(OB_SUCCESS, writer.write(write_buf, content_size)); ASSERT_EQ(content_size, writer.get_length());