Supports CRC checksum verification for object storage
This commit is contained in:
parent 65c6511fc9, commit c7b8a188c3
deps/oblib/src/lib/restore/cos/ob_cos_wrapper.cpp (vendored, 127 changed lines)
@@ -23,6 +23,7 @@
 #include "cos_auth.h"
 #include "cos_sys_util.h"
 #include "apr_errno.h"
+#include "cos_crc64.h"

 #include "ob_cos_wrapper.h"

@@ -38,6 +39,7 @@ constexpr int OB_INVALID_ARGUMENT = -4002;
 constexpr int OB_INIT_TWICE = -4005;
 constexpr int OB_ALLOCATE_MEMORY_FAILED = -4013;
 constexpr int OB_SIZE_OVERFLOW = -4019;
+constexpr int OB_CHECKSUM_ERROR = -4103;
 constexpr int OB_BACKUP_FILE_NOT_EXIST = -9011;
 constexpr int OB_COS_ERROR = -9060;
 constexpr int OB_IO_LIMIT = -9061;
@@ -814,6 +816,7 @@ int ObCosWrapper::pread(
     int64_t offset,
     char *buf,
     int64_t buf_size,
+    const bool is_range_read,
     int64_t &read_size)
 {
   int ret = OB_SUCCESS;
@@ -849,42 +852,57 @@ int ObCosWrapper::pread(
       const char* const COS_RANGE_KEY = "Range";

       char range_size[COS_RANGE_SIZE];
-      // Cos read range of [10, 100] include the start offset 10 and the end offset 100.
-      // But what we except is [10, 100) which does not include the end.
-      // So we subtract 1 from the end.
-      int n = snprintf(range_size, COS_RANGE_SIZE, "bytes=%ld-%ld", offset, offset + buf_size - 1);
-      if (0 >= n || COS_RANGE_SIZE <= n) {
-        ret = OB_SIZE_OVERFLOW;
-        cos_warn_log("[COS]fail to format range size,n=%d, ret=%d\n", n, ret);
-      } else if (NULL == (headers = cos_table_make(ctx->mem_pool, 1))) {
+      if (NULL == (headers = cos_table_make(ctx->mem_pool, 1))) {
         ret = OB_ALLOCATE_MEMORY_FAILED;
         cos_warn_log("[COS]fail to make cos headers, ret=%d\n", ret);
       } else {
-        apr_table_set(headers, COS_RANGE_KEY, range_size);
+        if (is_range_read) {
+          // A Cos read range of [10, 100] includes both the start offset 10 and the end offset 100.
+          // But what we expect is [10, 100), which does not include the end.
+          // So we subtract 1 from the end.
+          int n = snprintf(range_size, COS_RANGE_SIZE, "bytes=%ld-%ld", offset, offset + buf_size - 1);
+          if (0 >= n || COS_RANGE_SIZE <= n) {
+            ret = OB_SIZE_OVERFLOW;
+            cos_warn_log("[COS]fail to format range size, n=%d, ret=%d\n", n, ret);
+          } else {
+            apr_table_set(headers, COS_RANGE_KEY, range_size);
+          }
+        }

-        if (NULL == (cos_ret = cos_get_object_to_buffer(ctx->options, &bucket, &object, headers, NULL, &buffer, &resp_headers)) ||
-            !cos_status_is_ok(cos_ret)) {
+        if (OB_SUCCESS != ret) {
+        } else if (NULL == (cos_ret = cos_get_object_to_buffer(ctx->options, &bucket, &object, headers, NULL, &buffer, &resp_headers)) ||
+            !cos_status_is_ok(cos_ret)) {
           convert_io_error(cos_ret, ret);
           cos_warn_log("[COS]fail to get object to buffer, ret=%d\n", ret);
           log_status(cos_ret);
         } else {
+          read_size = 0;
           int64_t size = 0;
           int64_t buf_pos = 0;
           cos_buf_t *content = NULL;
+          int64_t needed_size = -1;
           cos_list_for_each_entry(cos_buf_t, content, &buffer, node) {
             size = cos_buf_size(content);
-            if (buf_pos + size > buf_size) {
+            needed_size = size;
+            if (buf_size - buf_pos < size) {
+              needed_size = buf_size - buf_pos;
+            }
+            if (is_range_read && (buf_pos + size > buf_size)) {
               ret = OB_COS_ERROR;
               cos_warn_log("[COS]unexpected error, too much data returned, ret=%d, range_size=%s, buf_pos=%ld, size=%ld, req_id=%s.\n", ret, range_size, buf_pos, size, cos_ret->req_id);
               log_status(cos_ret);
               break;
             } else {
               // copy to buf
-              memcpy(buf + buf_pos, content->pos, (size_t)size);
-              buf_pos += size;
-              read_size += size;
+              memcpy(buf + buf_pos, content->pos, (size_t)needed_size);
+              buf_pos += needed_size;
+              read_size += needed_size;
+
+              if (buf_pos >= buf_size) {
+                break;
+              }
             }
-          }
+          } // end cos_list_for_each_entry
         }
       }
     }
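A note on the clamp introduced above: when is_range_read is false the whole object is fetched without a Range header, and an appendable object may have grown between the time its length was recorded and the GET, so the response can carry more bytes than buf_size. The sketch below is an illustrative reduction of that copy loop, not the committed code; std::vector<std::string> stands in for the SDK's cos_buf_t chain, and copy_clamped is an invented name.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

// Copy a chain of response buffers into a fixed-size output, clamping at
// buf_size instead of erroring out; trailing bytes (data appended after the
// length was recorded) are simply dropped. Returns the bytes copied.
int64_t copy_clamped(const std::vector<std::string> &chunks, char *buf, int64_t buf_size)
{
  int64_t buf_pos = 0;
  for (const std::string &chunk : chunks) {
    const int64_t size = static_cast<int64_t>(chunk.size());
    const int64_t needed_size = std::min(size, buf_size - buf_pos);  // clamp to remaining space
    std::memcpy(buf + buf_pos, chunk.data(), static_cast<size_t>(needed_size));
    buf_pos += needed_size;
    if (buf_pos >= buf_size) {
      break;  // buffer full; anything left is data we did not ask for
    }
  }
  return buf_pos;  // plays the role of read_size
}

int main()
{
  const std::vector<std::string> chunks = {"hello ", "worl", "d!!"};  // 13 bytes returned
  char buf[10];
  const int64_t n = copy_clamped(chunks, buf, sizeof(buf));
  std::printf("read %ld bytes: %.*s\n", (long)n, (int)n, buf);  // read 10 bytes: hello worl
  return 0;
}
```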
@@ -900,62 +918,8 @@ int ObCosWrapper::get_object(
     int64_t buf_size,
     int64_t &read_size)
 {
-  int ret = OB_SUCCESS;
-
-  CosContext *ctx = reinterpret_cast<CosContext *>(h);
-  read_size = 0;
-
-  if (NULL == h) {
-    ret = OB_INVALID_ARGUMENT;
-    cos_warn_log("[COS]cos handle is null, ret=%d\n", ret);
-  } else if (bucket_name.empty()) {
-    ret = OB_INVALID_ARGUMENT;
-    cos_warn_log("[COS]bucket name is null, ret=%d\n", ret);
-  } else if (object_name.empty()) {
-    ret = OB_INVALID_ARGUMENT;
-    cos_warn_log("[COS]object name is null, ret=%d\n", ret);
-  } else if (NULL == buf || 0 >= buf_size) {
-    ret = OB_INVALID_ARGUMENT;
-    cos_warn_log("[COS]buffer is null, ret=%d\n", ret);
-  } else {
-    cos_string_t bucket;
-    cos_string_t object;
-    cos_str_set(&bucket, bucket_name.data_);
-    cos_str_set(&object, object_name.data_);
-    cos_table_t *headers = NULL;
-    cos_table_t *resp_headers = NULL;
-    cos_status_t *cos_ret = NULL;
-
-    cos_list_t buffer;
-    cos_list_init(&buffer);
-
-    if (NULL == (cos_ret = cos_get_object_to_buffer(ctx->options, &bucket, &object, headers, NULL, &buffer, &resp_headers)) ||
-        !cos_status_is_ok(cos_ret)) {
-      convert_io_error(cos_ret, ret);
-      cos_warn_log("[COS]fail to get object to buffer, ret=%d\n", ret);
-      log_status(cos_ret);
-    } else {
-      int64_t size = 0;
-      int64_t buf_pos = 0;
-      cos_buf_t *content = NULL;
-      cos_list_for_each_entry(cos_buf_t, content, &buffer, node) {
-        size = cos_buf_size(content);
-        if (buf_pos + size > buf_size) {
-          ret = OB_COS_ERROR;
-          cos_warn_log("[COS]unexpected error, too much data returned, ret=%d, buf_pos=%ld, size=%ld, buf_size=%ld, req_id=%s.\n", ret, buf_pos, size, buf_size, cos_ret->req_id);
-          log_status(cos_ret);
-          break;
-        } else {
-          // copy to buf
-          memcpy(buf + buf_pos, content->pos, (size_t)size);
-          buf_pos += size;
-          read_size += size;
-        }
-      }
-    }
-  }
-
-  return ret;
+  return ObCosWrapper::pread(h, bucket_name, object_name,
+      0, buf, buf_size, false/*is_range_read*/, read_size);
 }

 int ObCosWrapper::is_object_tagging(
@@ -1444,7 +1408,8 @@ int ObCosWrapper::upload_part_from_buffer(
     const CosStringBuffer &upload_id_str,
     const int part_num,
     const char *buf,
-    const int64_t buf_size)
+    const int64_t buf_size,
+    uint64_t &total_crc)
 {
   int ret = OB_SUCCESS;
   CosContext *ctx = reinterpret_cast<CosContext *>(h);
@@ -1488,6 +1453,9 @@ int ObCosWrapper::upload_part_from_buffer(
       convert_io_error(cos_ret, ret);
       cos_warn_log("[COS]fail to upload part to cos, ret=%d, part_num=%d\n", ret, part_num);
       log_status(cos_ret);
+    } else {
+      // TODO @fangdan: support parallel uploads
+      total_crc = cos_crc64(total_crc, (void *)buf, buf_size);
     }
   }
 }
@@ -1498,7 +1466,8 @@ int ObCosWrapper::complete_multipart_upload(
     Handle *h,
     const CosStringBuffer &bucket_name,
     const CosStringBuffer &object_name,
-    const CosStringBuffer &upload_id_str)
+    const CosStringBuffer &upload_id_str,
+    const uint64_t total_crc)
 {
   int ret = OB_SUCCESS;
   CosContext *ctx = reinterpret_cast<CosContext *>(h);
@@ -1575,6 +1544,16 @@ int ObCosWrapper::complete_multipart_upload(
     convert_io_error(cos_ret, ret);
     cos_warn_log("[COS]fail to complete multipart upload, ret=%d\n", ret);
     log_status(cos_ret);
+  } else {
+    const char *expected_crc_str = (const char *)(apr_table_get(resp_headers, COS_HASH_CRC64_ECMA));
+    if (NULL != expected_crc_str) {
+      uint64_t expected_crc = cos_atoui64(expected_crc_str);
+      if (expected_crc != total_crc) {
+        ret = OB_CHECKSUM_ERROR;
+        cos_warn_log("[COS]multipart object crc is not equal to returned crc, ret=%d, upload_id=%s, total_crc=%llu, expected_crc=%llu\n",
+            ret, upload_id_str.data_, total_crc, expected_crc);
+      }
+    }
   }
 }

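The COS and OSS paths share the same verification pattern: each uploaded part is folded into a running CRC-64/ECMA (cos_crc64 / aos_crc64), and completing the multipart upload compares that local value against the crc64ecma response header, failing with OB_CHECKSUM_ERROR on mismatch. Below is a minimal self-contained sketch of that accumulate-then-compare flow; crc64_ecma is a bitwise stand-in for the SDK routines (CRC-64/XZ parameters), and the part boundaries are invented for the demo. Chained calls compose into the whole-object CRC only because parts are hashed in upload order, which is why the TODOs above gate parallel uploads.

```cpp
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Bitwise CRC-64 with the parameters COS/OSS document for their crc64ecma
// headers (ECMA-182 polynomial, reflected, init/xorout all-ones). Stand-in
// for cos_crc64()/aos_crc64(), which are table-driven in the real SDKs.
uint64_t crc64_ecma(uint64_t crc, const void *data, size_t len)
{
  const uint8_t *p = static_cast<const uint8_t *>(data);
  crc = ~crc;  // this invert and the final one cancel across calls, so chaining works
  while (len-- > 0) {
    crc ^= *p++;
    for (int k = 0; k < 8; ++k) {
      crc = (crc & 1) ? (crc >> 1) ^ 0xC96C5795D7870F42ULL : (crc >> 1);
    }
  }
  return ~crc;
}

int main()
{
  const char object[] = "part-one|part-two|part-three";
  const size_t total = sizeof(object) - 1;

  // What the writer does: fold each uploaded part into a running total_crc.
  uint64_t total_crc = 0;
  total_crc = crc64_ecma(total_crc, object, 9);                // part 1
  total_crc = crc64_ecma(total_crc, object + 9, 9);            // part 2
  total_crc = crc64_ecma(total_crc, object + 18, total - 18);  // part 3

  // What the server reports back: the CRC of the whole object.
  const uint64_t expected_crc = crc64_ecma(0, object, total);

  // complete_multipart_upload() now fails with OB_CHECKSUM_ERROR on mismatch.
  std::printf("total=%" PRIu64 " expected=%" PRIu64 " match=%d\n",
              total_crc, expected_crc, total_crc == expected_crc);
  return total_crc == expected_crc ? 0 : 1;
}
```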
@@ -269,6 +269,7 @@ public:
       int64_t offset,
       char *buf,
       int64_t buf_size,
+      const bool is_range_read,
       int64_t &read_size);

   // Get whole object
@@ -372,13 +373,15 @@ public:
       const CosStringBuffer &upload_id_str,
       const int part_num, /*the sequence number of this part, [1, 10000]*/
       const char *buf,
-      const int64_t buf_size);
+      const int64_t buf_size,
+      uint64_t &total_crc);

   static int complete_multipart_upload(
       Handle *h,
       const CosStringBuffer &bucket_name,
       const CosStringBuffer &object_name,
-      const CosStringBuffer &upload_id_str);
+      const CosStringBuffer &upload_id_str,
+      const uint64_t total_crc);

   static int abort_multipart_upload(
       Handle *h,
@@ -786,13 +786,38 @@ int ObStorageCosReader::pread(
     ret = OB_INVALID_ARGUMENT;
     OB_LOG(WARN, "invalid argument", K(ret), KP(buf), K(buf_size), K(offset));
   } else {
-    qcloud_cos::CosStringBuffer bucket_name = qcloud_cos::CosStringBuffer(
-        handle_.get_bucket_name().ptr(), handle_.get_bucket_name().length());
-    qcloud_cos::CosStringBuffer object_name = qcloud_cos::CosStringBuffer(
-        handle_.get_object_name().ptr(), handle_.get_object_name().length());
-    if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(handle_.get_ptr(), bucket_name,
-        object_name, offset, buf, buf_size, read_size))) {
-      OB_LOG(WARN, "fail to read object from cos", K(ret), KP(buf), K(buf_size), K(offset));
+    // When is_range_read is true, it indicates that only a part of the data is read.
+    // When false, it indicates that the entire object is read.
+    bool is_range_read = true;
+    int64_t get_data_size = buf_size;
+    if (has_meta_) {
+      if (file_length_ < offset) {
+        ret = OB_FILE_LENGTH_INVALID;
+        OB_LOG(WARN, "file length is invalid", K_(file_length), K(offset),
+            K(handle_.get_bucket_name()), K(handle_.get_object_name()), K(ret));
+      } else {
+        get_data_size = MIN(buf_size, file_length_ - offset);
+        if (get_data_size == file_length_) {
+          // read entire object
+          is_range_read = false;
+        }
+      }
+    }
+
+    if (OB_FAIL(ret)) {
+    } else if (get_data_size == 0) {
+      read_size = 0;
+    } else {
+      qcloud_cos::CosStringBuffer bucket_name = qcloud_cos::CosStringBuffer(
+          handle_.get_bucket_name().ptr(), handle_.get_bucket_name().length());
+      qcloud_cos::CosStringBuffer object_name = qcloud_cos::CosStringBuffer(
+          handle_.get_object_name().ptr(), handle_.get_object_name().length());
+
+      if (OB_FAIL(qcloud_cos::ObCosWrapper::pread(handle_.get_ptr(), bucket_name,
+          object_name, offset, buf, get_data_size, is_range_read, read_size))) {
+        OB_LOG(WARN, "fail to read object from cos", K(ret), K(is_range_read),
+            KP(buf), K(buf_size), K(offset), K(get_data_size), K_(has_meta));
+      }
     }
   }
   return ret;
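Stripped of the COS plumbing, the new read-path decision reduces to a few lines; the OSS reader below makes the same choice. A hedged distillation follows (ReadPlan and plan_read are names invented for the sketch): clamp the request to the known file length, and switch to a whole-object GET when the clamped request covers the entire file.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

struct ReadPlan {
  bool is_range_read;     // false => fetch the whole object, no Range header
  int64_t get_data_size;  // bytes that will actually be requested
};

// Assumes the caller has already rejected file_length < offset
// (that case returns OB_FILE_LENGTH_INVALID in the committed code).
ReadPlan plan_read(bool has_meta, int64_t file_length, int64_t offset, int64_t buf_size)
{
  ReadPlan p{true, buf_size};
  if (has_meta) {
    p.get_data_size = std::min(buf_size, file_length - offset);
    if (p.get_data_size == file_length) {
      p.is_range_read = false;  // the clamped request spans the whole object
    }
  }
  return p;
}

int main()
{
  const ReadPlan a = plan_read(true, 100, 0, 200);   // covers the file: whole-object GET
  const ReadPlan b = plan_read(true, 100, 50, 200);  // tail read: ranged, clamped to 50
  std::printf("a: range=%d size=%ld\n", a.is_range_read, (long)a.get_data_size);
  std::printf("b: range=%d size=%ld\n", b.is_range_read, (long)b.get_data_size);
  return 0;
}
```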
@@ -1026,7 +1051,8 @@ ObStorageCosMultiPartWriter::ObStorageCosMultiPartWriter()
     base_buf_pos_(0),
     upload_id_(NULL),
     partnum_(0),
-    file_length_(-1)
+    file_length_(-1),
+    total_crc_(0)
 {}

 ObStorageCosMultiPartWriter::~ObStorageCosMultiPartWriter()
@@ -1084,6 +1110,7 @@ int ObStorageCosMultiPartWriter::open(const ObString &uri, common::ObObjectStora
     is_opened_ = true;
     base_buf_pos_ = 0;
     file_length_ = 0;
+    total_crc_ = 0;
   }
 }
 return ret;
@@ -1152,7 +1179,7 @@ int ObStorageCosMultiPartWriter::write_single_part()
   qcloud_cos::CosStringBuffer upload_id_str = qcloud_cos::CosStringBuffer(
       upload_id_, strlen(upload_id_));
   if (OB_FAIL(qcloud_cos::ObCosWrapper::upload_part_from_buffer(handle_.get_ptr(), bucket_name,
-      object_name, upload_id_str, partnum_, base_buf_, base_buf_pos_))) {
+      object_name, upload_id_str, partnum_, base_buf_, base_buf_pos_, total_crc_))) {
     OB_LOG(WARN, "fail to upload part to cos", K(ret), KP_(upload_id));
     if (OB_TMP_FAIL(cleanup())) {
       OB_LOG(WARN, "fail to abort multiupload", K(ret), K(tmp_ret), KP_(upload_id));
@@ -1192,7 +1219,7 @@ int ObStorageCosMultiPartWriter::close()
       upload_id_, strlen(upload_id_));

   if (OB_FAIL(qcloud_cos::ObCosWrapper::complete_multipart_upload(handle_.get_ptr(), bucket_name,
-      object_name, upload_id_str))) {
+      object_name, upload_id_str, total_crc_))) {
     OB_LOG(WARN, "fail to complete multipart upload", K(ret), KP_(upload_id));
     if (OB_TMP_FAIL(cleanup())) {
       OB_LOG(WARN, "fail to abort multiupload", K(ret), K(tmp_ret), KP_(upload_id));
@@ -191,6 +191,7 @@ private:
   char *upload_id_;
   int partnum_;
   int64_t file_length_;
+  uint64_t total_crc_;

   DISALLOW_COPY_AND_ASSIGN(ObStorageCosMultiPartWriter);
 };
deps/oblib/src/lib/restore/ob_storage_oss_base.cpp (vendored, 100 changed lines)
@@ -17,6 +17,7 @@
 #include "common/ob_string_buf.h"
 #include "apr_errno.h"
 #include "ob_storage.h"
+#include "aos_crc64.h"
 #include "lib/hash/ob_hashset.h"
 #include "lib/utility/ob_tracepoint.h"

@@ -643,7 +644,8 @@ ObStorageOssMultiPartWriter::ObStorageOssMultiPartWriter()
     object_(),
     partnum_(0),
     is_opened_(false),
-    file_length_(-1)
+    file_length_(-1),
+    total_crc_(0)
 {
   upload_id_.len = -1;
   upload_id_.data = NULL;
@@ -697,6 +699,7 @@ int ObStorageOssMultiPartWriter::open(const ObString &uri, common::ObObjectStora
     is_opened_ = true;
     base_buf_pos_ = 0;
     file_length_ = 0;
+    total_crc_ = 0;
     }
   }
 }
@@ -790,6 +793,9 @@ int ObStorageOssMultiPartWriter::write_single_part()
         K_(base_buf_pos), K_(bucket), K_(object), K(ret));
     print_oss_info(resp_headers, aos_ret);
     cleanup();
+  } else {
+    // TODO @fangdan: support parallel uploads
+    total_crc_ = aos_crc64(total_crc_, base_buf_, base_buf_pos_);
   }
   bool is_slow = false;
   print_access_storage_log("oss upload one part ", object_, start_time, base_buf_pos_, &is_slow);
@@ -894,6 +900,16 @@ int ObStorageOssMultiPartWriter::close()
     OB_LOG(WARN, "fail to complete multipart upload", K_(bucket), K_(object), K(ret));
     print_oss_info(resp_headers, aos_ret);
     cleanup();
+  } else {
+    const char *expected_crc_str = (const char *)(apr_table_get(resp_headers, OSS_HASH_CRC64_ECMA));
+    if (OB_NOT_NULL(expected_crc_str)) {
+      uint64_t expected_crc = aos_atoui64(expected_crc_str);
+      if (OB_UNLIKELY(expected_crc != total_crc_)) {
+        ret = OB_CHECKSUM_ERROR;
+        OB_LOG(WARN, "multipart object crc is not equal to returned crc",
+            K(ret), K_(total_crc), K(expected_crc));
+      }
+    }
   }
 }
}
@@ -1011,15 +1027,29 @@ int ObStorageOssReader::pread(
     ret = OB_INVALID_ARGUMENT;
     OB_LOG(WARN, "aos pool or oss option is NULL", K(aos_pool), K(oss_option), K(ret));
   } else {
-    if (has_meta_ && file_length_ == offset) {
+    // When is_range_read is true, it indicates that only a part of the data is read.
+    // When false, it indicates that the entire object is read.
+    bool is_range_read = true;
+    int64_t get_data_size = buf_size;
+    if (has_meta_) {
+      if (file_length_ < offset) {
+        ret = OB_FILE_LENGTH_INVALID;
+        OB_LOG(WARN, "file length is invalid", K_(file_length),
+            K(offset), K_(bucket), K_(object), K(ret));
+      } else {
+        get_data_size = MIN(buf_size, file_length_ - offset);
+        if (get_data_size == file_length_) {
+          // read entire object
+          is_range_read = false;
+        }
+      }
+    }
+
+    if (OB_FAIL(ret)) {
+    } else if (get_data_size == 0) {
       read_size = 0;
-    } else if (has_meta_ && file_length_ < offset) {
-      ret = OB_FILE_LENGTH_INVALID;
-      OB_LOG(WARN, "File lenth is invilid", K(file_length_), K(offset),
-          K(bucket_), K(object_), K(ret));
     } else {
       const int64_t start_time = ObTimeUtility::current_time();
-      int64_t get_data_size = buf_size;
       aos_string_t bucket;
       aos_string_t object;
       aos_str_set(&bucket, bucket_.ptr());
@@ -1037,49 +1067,61 @@ int ObStorageOssReader::pread(
       const char *const OSS_RANGE_KEY = "Range";

       char range_size[OSS_RANGE_SIZE];
-      //oss read size is [10, 100] include the 10 and 100 bytes
-      //we except is [10, 100) not include the end, so we subtraction 1 to the end
-      int n = snprintf(range_size, OSS_RANGE_SIZE, "bytes=%ld-%ld", offset, offset + get_data_size - 1);
-      if(n <=0 || n >= OSS_RANGE_SIZE) {
-        ret = OB_SIZE_OVERFLOW;
-        OB_LOG(WARN, "fail to get range size", K(n), K(OSS_RANGE_SIZE), K(ret));
-      } else if (NULL == (headers = aos_table_make(aos_pool, AOS_TABLE_INIT_SIZE))) {
+      if (NULL == (headers = aos_table_make(aos_pool, AOS_TABLE_INIT_SIZE))) {
         ret = OB_OSS_ERROR;
         OB_LOG(WARN, "fail to make oss headers", K(ret));
       } else {
-        apr_table_set(headers, OSS_RANGE_KEY, range_size);
+        if (is_range_read) {
+          // An oss read range of [10, 100] includes both the 10th and the 100th bytes;
+          // we expect [10, 100), which does not include the end, so we subtract 1 from the end.
+          if (OB_FAIL(databuff_printf(range_size, OSS_RANGE_SIZE, "bytes=%ld-%ld",
+              offset, offset + get_data_size - 1))) {
+            OB_LOG(WARN, "fail to get range size", K(ret),
+                K(offset), K(get_data_size), K(OSS_RANGE_SIZE));
+          } else {
+            apr_table_set(headers, OSS_RANGE_KEY, range_size);
+          }
+        }

-        if (NULL == (aos_ret = oss_get_object_to_buffer(oss_option, &bucket, &object, headers, params,
+        if (OB_FAIL(ret)) {
+        } else if (NULL == (aos_ret = oss_get_object_to_buffer(oss_option, &bucket, &object, headers, params,
             &buffer, &resp_headers)) || !aos_status_is_ok(aos_ret)) {
-          convert_io_error(aos_ret, ret);
-          OB_LOG(WARN, "fail to get object to buffer", K(bucket_), K(object_), K(ret));
+          convert_io_error(aos_ret, ret);
+          OB_LOG(WARN, "fail to get object to buffer", K_(bucket), K_(object), K(ret));
          print_oss_info(resp_headers, aos_ret);
         } else {
           // check data len
           aos_list_for_each_entry(aos_buf_t, content, &buffer, node) {
             len += aos_buf_size(content);
           }
-          if(len > get_data_size) {
+          // For an appendable file, there may be data written in between the time when the reader is opened and pread is called.
+          // At this point, the actual size of the file could be larger than the file length determined at the time of opening.
+          // Therefore, if it is not a range read, the data read could exceed the expected amount to be read.
+          if (is_range_read && len > get_data_size) {
             ret = OB_OSS_ERROR;
-            OB_LOG(WARN, "get data size error", K(get_data_size), K(len), K(bucket_),
-                K(object_), K(ret));
+            OB_LOG(WARN, "get data size error", K(get_data_size), K(len), K_(bucket),
+                K_(object), K(ret), K_(has_meta), K(offset));
           } else {
             // copy to buf
+            read_size = 0;
+            int64_t needed_size = -1;
             aos_list_for_each_entry(aos_buf_t, content, &buffer, node) {
               size = aos_buf_size(content);
-              if (buf_pos + size > get_data_size) {
+              needed_size = MIN(size, get_data_size - buf_pos);
+              if (is_range_read && (buf_pos + size > get_data_size)) {
                 ret = OB_SIZE_OVERFLOW;
                 OB_LOG(WARN, "the size is too long", K(buf_pos), K(size), K(get_data_size), K(ret));
                 break;
               } else {
-                memcpy(buf + buf_pos, content->pos, (size_t)size);
-                buf_pos += size;
+                memcpy(buf + buf_pos, content->pos, (size_t)needed_size);
+                buf_pos += needed_size;
+                read_size += needed_size;
+
+                if (buf_pos >= get_data_size) {
+                  break;
+                }
               }
-            }
-
-            if (OB_SUCC(ret)) {
-              read_size = len;
-            }
+            } // end aos_list_for_each_entry
           }
         }
       }

@@ -186,6 +186,7 @@ private:
   int partnum_;
   bool is_opened_;
   int64_t file_length_;
+  uint64_t total_crc_;

   DISALLOW_COPY_AND_ASSIGN(ObStorageOssMultiPartWriter);
 };
@@ -1161,14 +1161,19 @@ int ObStorageS3Reader::pread_(char *buf,
   } else {
     Aws::S3::Model::GetObjectRequest request;
     Aws::S3::Model::GetObjectOutcome outcome;
+    const bool check_crc = (default_s3_crc_algo == ObStorageCRCAlgorithm::OB_CRC32_ALGO);
+    if (check_crc) {
+      request.SetChecksumMode(Aws::S3::Model::ChecksumMode::ENABLED);
+    }
     char range_read[64] = { 0 };
     request.WithBucket(bucket_.ptr()).WithKey(object_.ptr());
     request.SetAdditionalCustomHeaderValue("Connection", "keep-alive");
+    const int64_t get_data_size = has_meta_ ? MIN(buf_size, file_length_ - offset) : buf_size;

     if (OB_FAIL(databuff_printf(range_read, sizeof(range_read),
-        "bytes=%ld-%ld", offset, offset + buf_size - 1))) {
-      OB_LOG(WARN, "fail to set range to read",
-          K(ret), K_(bucket), K_(object), K(offset), K(buf_size));
+        "bytes=%ld-%ld", offset, offset + get_data_size - 1))) {
+      OB_LOG(WARN, "fail to set range to read", K(ret),
+          K_(bucket), K_(object), K(offset), K(buf_size), K_(has_meta), K_(file_length));
     } else if (FALSE_IT(request.SetRange(range_read))) {
     } else if (OB_FAIL(s3_client_->get_object(request, outcome))) {
       OB_LOG(WARN, "failed to get s3 object", K(ret), K(range_read));
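Unlike the COS/OSS paths, the S3 path delegates verification to aws-sdk-cpp: once ChecksumMode::ENABLED is set on the GetObjectRequest, the SDK validates the downloaded body against the checksum S3 returns and fails the outcome on mismatch. A minimal sketch of that call shape is below; the bucket, key, and range values are placeholders.

```cpp
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ChecksumMode.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <iostream>

int main()
{
  Aws::SDKOptions options;
  Aws::InitAPI(options);
  {
    Aws::S3::S3Client client;  // credentials/region come from the default provider chain
    Aws::S3::Model::GetObjectRequest request;
    request.WithBucket("my-bucket").WithKey("my-object");  // placeholders
    // Ask S3 to return the stored checksum; the SDK then verifies the
    // downloaded bytes against it.
    request.SetChecksumMode(Aws::S3::Model::ChecksumMode::ENABLED);
    request.SetRange("bytes=0-1023");  // first 1 KiB; the end offset is inclusive
    auto outcome = client.GetObject(request);
    if (outcome.IsSuccess()) {
      std::cout << "read " << outcome.GetResult().GetContentLength() << " bytes\n";
    } else {
      std::cerr << outcome.GetError().GetMessage() << "\n";
    }
  }
  Aws::ShutdownAPI(options);
  return 0;
}
```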
@@ -1952,6 +1957,7 @@ int ObStorageS3MultiPartWriter::close_()
   }

   // list parts
+  const bool check_crc = (default_s3_crc_algo == ObStorageCRCAlgorithm::OB_CRC32_ALGO);
   Aws::S3::Model::CompletedMultipartUpload completedMultipartUpload;
   int64_t part_num = 0;
   if (OB_SUCC(ret)) {
@@ -1972,6 +1978,9 @@ int ObStorageS3MultiPartWriter::close_()
       for (int64_t i = 0; OB_SUCC(ret) && i < parts.size(); i++) {
         Aws::S3::Model::CompletedPart tmp_part;
         tmp_part.WithPartNumber(parts[i].GetPartNumber()).WithETag(parts[i].GetETag());
+        if (check_crc) {
+          tmp_part.SetChecksumCRC32(parts[i].GetChecksumCRC32());
+        }
         completedMultipartUpload.AddParts(std::move(tmp_part));
       }
     }
@@ -1984,7 +1993,7 @@ int ObStorageS3MultiPartWriter::close_()
     Aws::S3::Model::CompleteMultipartUploadRequest request;
     request.WithBucket(bucket_.ptr()).WithKey(object_.ptr());
     request.WithUploadId(upload_id_).WithMultipartUpload(completedMultipartUpload);
-    const bool check_crc = (default_s3_crc_algo == ObStorageCRCAlgorithm::OB_CRC32_ALGO);

     Aws::String complete_crc;
     if (check_crc) {
       if (OB_ISNULL(sum_hash_)) {
@@ -2070,6 +2079,7 @@ int ObStorageS3MultiPartWriter::write_single_part_()
     ret = OB_ERR_UNEXPECTED;
     OB_LOG(WARN, "sum_hash should not be null", K(ret));
   } else {
+    // TODO @fangdan: support parallel uploads
     Aws::Utils::Crypto::CRC32 part_hash;
     request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32);
     Aws::String part_str(base_buf_, base_buf_pos_);
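On the upload side, setting ChecksumAlgorithm::CRC32 on a part request makes the SDK compute and send x-amz-checksum-crc32 with the part, so S3 can reject a corrupted part; the per-part checksums echoed in the responses are what close_() later forwards via CompletedPart::SetChecksumCRC32. A hedged sketch under the assumption that the SDK computes the checksum automatically when the algorithm is set; bucket, key, upload id, and the payload are placeholders.

```cpp
#include <aws/core/Aws.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ChecksumAlgorithm.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <iostream>

int main()
{
  Aws::SDKOptions options;
  Aws::InitAPI(options);
  {
    Aws::S3::S3Client client;
    Aws::S3::Model::UploadPartRequest request;
    request.WithBucket("my-bucket").WithKey("my-object")     // placeholders
           .WithUploadId("my-upload-id").WithPartNumber(1);  // from CreateMultipartUpload
    // The SDK computes the part's CRC32 and sends it as x-amz-checksum-crc32;
    // S3 recomputes it server-side and rejects the part on mismatch.
    request.SetChecksumAlgorithm(Aws::S3::Model::ChecksumAlgorithm::CRC32);
    auto body = Aws::MakeShared<Aws::StringStream>("part-body");
    *body << "part payload bytes";
    request.SetBody(body);
    auto outcome = client.UploadPart(request);
    if (outcome.IsSuccess()) {
      // Echoed checksum, later attached to the matching CompletedPart.
      std::cout << "crc32: " << outcome.GetResult().GetChecksumCRC32() << "\n";
    }
  }
  Aws::ShutdownAPI(options);
  return 0;
}
```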
@@ -63,7 +63,7 @@ int init_s3_env();
 void fin_s3_env();

 // default s3 checksum algorithm
-static ObStorageCRCAlgorithm default_s3_crc_algo = ObStorageCRCAlgorithm::OB_INVALID_CRC_ALGO;
+static ObStorageCRCAlgorithm default_s3_crc_algo = ObStorageCRCAlgorithm::OB_CRC32_ALGO;
 // set checksum algorithm for writing object into s3
 void set_s3_checksum_algorithm(const ObStorageCRCAlgorithm crc_algo);
 // get current checksum algorithm
@@ -1240,6 +1240,66 @@ TEST_F(TestObjectStorage, test_cross_testing)
   }
 }

+TEST_F(TestObjectStorage, test_read_single_file)
+{
+  int ret = OB_SUCCESS;
+  if (enable_test) {
+    ObStorageUtil util;
+    ASSERT_EQ(OB_SUCCESS, util.open(&info_base));
+    const char *tmp_dir = "test_read_single_file";
+    const int64_t ts = ObTimeUtility::current_time();
+    ASSERT_EQ(OB_SUCCESS, databuff_printf(dir_uri, sizeof(dir_uri), "%s/%s/%s_%ld",
+        bucket, dir_name, tmp_dir, ts));
+
+    {
+      // normal
+      ObStorageWriter writer;
+      ObStorageReader reader;
+      ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_normal", dir_uri));
+      ASSERT_EQ(OB_SUCCESS, writer.open(uri, &info_base));
+      ASSERT_EQ(OB_SUCCESS, writer.write("123456", 6));
+      ASSERT_EQ(OB_SUCCESS, writer.close());
+
+      int64_t read_size = -1;
+      char read_buf[100];
+      memset(read_buf, 0, sizeof(read_buf));
+      ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base));
+      ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 100, 0, read_size));
+      ASSERT_EQ(6, read_size);
+      ASSERT_EQ(0, strncmp(read_buf, "123456", 6));
+      ASSERT_EQ(OB_SUCCESS, reader.close());
+
+      ASSERT_EQ(OB_SUCCESS, util.del_file(uri));
+    }
+
+    {
+      // appendable
+      ObStorageAppender appender;
+      ObStorageAdaptiveReader reader;
+
+      const char *write_content = "0123456789";
+      const int64_t first_content_len = 6;
+      ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_appendable", dir_uri));
+      ASSERT_EQ(OB_SUCCESS, appender.open(uri, &info_base));
+      ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, first_content_len, 0));
+
+      int64_t read_size = -1;
+      char read_buf[100];
+      memset(read_buf, 0, sizeof(read_buf));
+      ASSERT_EQ(OB_SUCCESS, reader.open(uri, &info_base));
+
+      ASSERT_EQ(OB_SUCCESS, appender.pwrite(write_content, 1, first_content_len));
+
+      ASSERT_EQ(OB_SUCCESS, reader.pread(read_buf, 100, 0, read_size));
+      ASSERT_EQ(first_content_len, read_size);
+      ASSERT_EQ(0, strncmp(read_buf, write_content, first_content_len));
+      ASSERT_EQ(OB_SUCCESS, reader.close());
+
+      ASSERT_EQ(OB_SUCCESS, util.del_file(uri));
+    }
+  }
+}
+
 TEST_F(TestObjectStorage, test_multipart_write)
 {
   int ret = OB_SUCCESS;
@@ -1265,6 +1325,7 @@ TEST_F(TestObjectStorage, test_multipart_write)

   ASSERT_EQ(OB_SUCCESS, databuff_printf(uri, sizeof(uri), "%s/test_multipart", dir_uri));
   ObStorageMultiPartWriter writer;
+  // ObStorageWriter writer;
   ASSERT_EQ(OB_SUCCESS, writer.open(uri, &info_base));
   ASSERT_EQ(OB_SUCCESS, writer.write(write_buf, content_size));
   ASSERT_EQ(content_size, writer.get_length());