[enhance](S3FileWriter) Add md5 check for small files not suitable for multipart upload (#22296)
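In short: files too small for the multipart path are uploaded with a single PutObject, and that path previously sent no checksum. This change attaches a Content-MD5 header there as well (the multipart UploadPart path already computed one), and replaces the hand-rolled WaitGroup with bthread::CountdownEvent. A minimal sketch of the MD5 step, using the same AWS SDK calls the diff uses; the helper name and the explicit rewind are my assumptions, not part of the patch:

#include <aws/core/utils/HashingUtils.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>

// Hypothetical helper: derive a Content-MD5 header value from a request body.
static Aws::String content_md5_of(Aws::IOStream& body) {
    // CalculateMD5 reads the stream and produces the 16-byte digest.
    Aws::Utils::ByteBuffer digest = Aws::Utils::HashingUtils::CalculateMD5(body);
    body.clear();                  // defensive: clear any EOF flag set while hashing
    body.seekg(0, std::ios::beg);  // defensive: rewind so SetBody() re-reads from byte 0
    // Content-MD5 is the base64 encoding of the raw digest, not of a hex string.
    return Aws::Utils::HashingUtils::Base64Encode(digest);
}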
@@ -71,7 +71,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     // get the size of the content already appended
     size_t get_size() const { return _size; }
     // get the underlying stream containing the buffered content
-    std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
+    const std::shared_ptr<std::iostream>& get_stream() const { return _stream_ptr; }
     // get file offset corresponding to the buffer
     size_t get_file_offset() const { return _offset; }
     // set the offset of the buffer
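A side note on the get_stream() change above: returning the shared_ptr by value costs an atomic refcount increment on every call, while the const-reference form is free when the caller only inspects the stream; callers that genuinely need ownership can still copy. Hypothetical caller code, not from the patch:

const auto& stream_ref = buf.get_stream();               // no refcount traffic; buf must outlive the use
std::shared_ptr<std::iostream> owned = buf.get_stream(); // taking an owning copy still works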
@@ -123,11 +123,19 @@ Status S3FileWriter::_create_multi_upload_request() {
             _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
 }
 
-void S3FileWriter::_wait_until_finish(std::string task_name) {
+void S3FileWriter::_wait_until_finish(std::string_view task_name) {
     auto msg =
             fmt::format("{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
-                        std::move(task_name), _bucket, _path.native(), _upload_id);
-    while (!_wait.wait()) {
+                        task_name, _bucket, _path.native(), _upload_id);
+    timespec current_time;
+    // We don't need high accuracy here, so we use time(nullptr)
+    // since it's the fastest way to get the current time in seconds
+    auto current_time_second = time(nullptr);
+    current_time.tv_sec = current_time_second + 300;
+    current_time.tv_nsec = 0;
+    // bthread::countdown_event::timed_wait() should use absolute time
+    while (0 != _countdown_event.timed_wait(current_time)) {
+        current_time.tv_sec += 300;
         LOG(WARNING) << msg;
     }
 }
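The subtlety in the new wait loop: bthread::CountdownEvent::timed_wait() takes an absolute timespec (a point in real time), not a duration, which is why the code arms the deadline at time(nullptr) + 300 and re-arms it after every timeout. If second granularity were ever too coarse, the deadline could be built from CLOCK_REALTIME instead; a sketch under that assumption, not part of the patch:

timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);            // bthread deadlines are absolute real time
deadline.tv_sec += 300;                              // five-minute absolute deadline
while (_countdown_event.timed_wait(deadline) != 0) { // non-zero return => timed out
    deadline.tv_sec += 300;                          // extend and keep waiting, as the patch does
    LOG(WARNING) << msg;
}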
@@ -184,7 +192,7 @@ Status S3FileWriter::close() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _wait.add();
+        _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -212,7 +220,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
             });
             _pending_buf->set_file_offset(_bytes_appended);
             // later we might need to wait for all prior tasks to be finished
-            _pending_buf->set_finish_upload([this]() { _wait.done(); });
+            _pending_buf->set_finish_upload([this]() { _countdown_event.signal(); });
             _pending_buf->set_is_cancel([this]() { return _failed.load(); });
             _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
                 VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
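These appendv() hunks show the buffer-callback protocol: each pending buffer carries hooks for completion, cancellation checks, and failure reporting, and the writer wires them per buffer before submitting. A schematic reduction with hypothetical names, for orientation only:

#include <functional>

// Not the real S3FileBuffer; just the shape of the hooks being set above.
struct PendingBufferHooks {
    std::function<void()> upload_remote;   // UploadPart for big data, PutObject for small
    std::function<void()> finish_upload;   // e.g. [this] { _countdown_event.signal(); }
    std::function<bool()> is_cancel;       // e.g. [this] { return _failed.load(); }
    std::function<void(Status)> on_failed; // record the error, log the part number
};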
@@ -241,7 +249,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                 RETURN_IF_ERROR(_create_multi_upload_request());
             }
             _cur_part_num++;
-            _wait.add();
+            _countdown_event.add_count();
             _pending_buf->submit();
             _pending_buf = nullptr;
         }
@@ -259,11 +267,9 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
     upload_request.WithBucket(_bucket).WithKey(_key).WithPartNumber(part_num).WithUploadId(
             _upload_id);
 
-    auto _stream_ptr = buf.get_stream();
-
     upload_request.SetBody(buf.get_stream());
 
-    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*_stream_ptr));
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
     upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
 
     upload_request.SetContentLength(buf.get_size());
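Why the header matters: when Content-MD5 is present, S3 recomputes the digest of the bytes it actually received and rejects the request with HTTP 400 (error code BadDigest) on a mismatch, so corruption in transit fails at upload time rather than surfacing later as bad data. The check applies to UploadPart, as here, and equally to PutObject, which is why the same CalculateMD5/Base64Encode pair shows up in _put_object further down.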
@@ -286,7 +292,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
 
     std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
     completed_part->SetPartNumber(part_num);
-    auto etag = upload_part_outcome.GetResult().GetETag();
+    const auto& etag = upload_part_outcome.GetResult().GetETag();
     // DCHECK(etag.empty());
     completed_part->SetETag(etag);
 
@@ -345,7 +351,7 @@ Status S3FileWriter::finalize() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _wait.add();
+        _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -357,6 +363,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
     DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
+    request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
     request.SetBody(buf.get_stream());
     request.SetContentLength(buf.get_size());
     request.SetContentType("application/octet-stream");
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <aws/core/utils/memory/stl/AWSStringStream.h>
+#include <bthread/countdown_event.h>
 
 #include <cstddef>
 #include <list>
@@ -57,48 +58,10 @@ public:
         return Status::NotSupported("not support");
     }
 
-    int64_t upload_cost_ms() const { return *_upload_cost_ms; }
+    [[nodiscard]] int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
-    class WaitGroup {
-    public:
-        WaitGroup() = default;
-
-        ~WaitGroup() = default;
-
-        WaitGroup(const WaitGroup&) = delete;
-        WaitGroup(WaitGroup&&) = delete;
-        void operator=(const WaitGroup&) = delete;
-        void operator=(WaitGroup&&) = delete;
-        // add one counter indicating one more concurrent worker
-        void add(int count = 1) { _count += count; }
-
-        // decrease the count when one concurrent worker has finished its work
-        void done() {
-            _count--;
-            if (_count.load() <= 0) {
-                _cv.notify_all();
-            }
-        }
-
-        // wait for all concurrent workers to finish their work and return true;
-        // returns false on timeout (the default timeout is 5 min)
-        bool wait(int64_t timeout_seconds = 300) {
-            if (_count.load() <= 0) {
-                return true;
-            }
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
-                         [this]() { return _count.load() <= 0; });
-            return _count.load() <= 0;
-        }
-
-    private:
-        std::mutex _lock;
-        std::condition_variable _cv;
-        std::atomic_int64_t _count {0};
-    };
-    void _wait_until_finish(std::string task_name);
+    void _wait_until_finish(std::string_view task_name);
     Status _complete();
     Status _create_multi_upload_request();
     void _put_object(S3FileBuffer& buf);
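Dropping the hand-rolled WaitGroup is arguably the more interesting part of this hunk. As written, done() decrements the atomic counter and calls notify_all() without holding _lock, so a waiter can evaluate the wait_for predicate just before the decrement and block just after the notification, missing the wakeup and stalling until the five-minute re-check. That reading is mine, not a motivation stated by the patch. A race-free variant would have to mutate and notify under the mutex, for example:

// Hypothetical fix for the removed WaitGroup, shown only to illustrate the
// lost-wakeup hazard; the patch sidesteps it entirely via bthread::CountdownEvent.
void done() {
    std::lock_guard<std::mutex> lck {_lock};
    if (--_count <= 0) {
        _cv.notify_all();
    }
}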
@@ -119,7 +82,8 @@ private:
     std::mutex _completed_lock;
     std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
 
-    WaitGroup _wait;
+    // **Attention** call add_count() before submitting buf to async thread pool
+    bthread::CountdownEvent _countdown_event {0};
 
     std::atomic_bool _failed = false;
     Status _st = Status::OK();
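The **Attention** comment encodes an ordering invariant worth spelling out: reserve the count before the buffer reaches the thread pool, because the worker may run and signal() immediately; with the opposite order the event could momentarily hit zero and release a waiter while work is still outstanding. Schematically (hypothetical pool API):

_countdown_event.add_count(); // 1. reserve the slot first
thread_pool->submit(buf);     // 2. then hand off; the worker may finish at once
// ... later, on the worker thread, exactly once per add_count():
_countdown_event.signal();    // 3. release the slot when the upload completes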