[feature](S3FileWriter) Reduce network RTT for files where multipart upload is not applicable (#19135)

For files smaller than 5MB we don't need multipart upload, which takes at least three network round trips (CreateMultipartUpload, UploadPart, CompleteMultipartUpload).
Instead we can call PutObject, which uploads the object in a single request.
AlexYue
2023-05-06 14:46:18 +08:00
committed by GitHub
parent ff6e0d3943
commit 83040c8f25
2 changed files with 75 additions and 69 deletions
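
For context on the "at least three" figure: a multipart upload needs one CreateMultipartUpload, at least one UploadPart, and one CompleteMultipartUpload, so even a single-part object pays three round trips. A back-of-the-envelope sketch (illustrative arithmetic only, not SDK code):

    // Requests needed per upload path.
    int multipart_requests(int parts) {
        return 1        // CreateMultipartUpload
               + parts  // one UploadPart per part
               + 1;     // CompleteMultipartUpload
    }
    int put_object_requests() { return 1; }
    // multipart_requests(1) == 3 vs put_object_requests() == 1:
    // the PutObject path saves two round trips for sub-5MB objects.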

be/src/io/fs/s3_file_writer.cpp

@@ -32,8 +32,10 @@
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CreateMultipartUploadResult.h>
+ #include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/UploadPartResult.h>
#include <fmt/core.h>
#include <glog/logging.h>
#include <sstream>
@@ -73,7 +75,7 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
: FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, std::move(fs)),
_bucket(s3_conf.bucket),
_key(std::move(path)),
- _upload_cost_ms(std::make_shared<int64_t>()),
+ _upload_cost_ms(std::make_unique<int64_t>()),
_client(std::move(client)) {}
S3FileWriter::~S3FileWriter() {
@@ -83,36 +85,47 @@ S3FileWriter::~S3FileWriter() {
CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
}
- Status S3FileWriter::open() {
- VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+ Status S3FileWriter::_create_multi_upload_request() {
CreateMultipartUploadRequest create_request;
create_request.WithBucket(_bucket).WithKey(_key);
create_request.SetContentType("text/plain");
create_request.SetContentType("application/octet-stream");
auto outcome = _client->CreateMultipartUpload(create_request);
if (outcome.IsSuccess()) {
_upload_id = outcome.GetResult().GetUploadId();
- _closed = false;
- _opened = true;
return Status::OK();
}
return Status::IOError("failed to create multipart upload(bucket={}, key={}, upload_id={}): {}",
_bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
}
+ void S3FileWriter::_wait_until_finish(std::string task_name) {
+ auto msg =
+ fmt::format("{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
+ std::move(task_name), _bucket, _path.native(), _upload_id);
+ while (_count.timed_wait({5 * 60, 0}) < 0) {
+ LOG(WARNING) << msg;
+ }
+ }
Status S3FileWriter::abort() {
_failed = true;
if (_closed || !_opened) {
return Status::OK();
}
// we need to reclaim the memory
if (_pending_buf) {
_pending_buf->on_finished();
_pending_buf = nullptr;
}
+ // an empty upload id means the multipart upload was never created
+ if (_upload_id.empty()) {
+ return Status::OK();
+ }
VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
_closed = true;
- while (!_wait.wait()) {
- LOG(WARNING) << "Abort multipart upload already takes 5 min"
- << "bucket=" << _bucket << ", key=" << _path.native()
- << ", upload_id=" << _upload_id;
- }
+ _wait_until_finish("Abort");
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
auto outcome = _client->AbortMultipartUpload(request);
@@ -135,7 +148,7 @@ Status S3FileWriter::close() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
_closed = true;
if (_pending_buf != nullptr) {
- _wait.add();
+ _count.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
@@ -147,7 +160,9 @@ Status S3FileWriter::close() {
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
// lazy open
if (!_opened) {
- RETURN_IF_ERROR(open());
+ VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+ _closed = false;
+ _opened = true;
}
DCHECK(!_closed);
size_t buffer_size = config::s3_write_buffer_size;
@@ -164,7 +179,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
});
_pending_buf->set_file_offset(_bytes_appended);
// later we might need to wait for all prior tasks to finish
- _pending_buf->set_finish_upload([this]() { _wait.done(); });
+ _pending_buf->set_finish_upload([this]() { _count.signal(); });
_pending_buf->set_is_cancel([this]() { return _failed.load(); });
_pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
@@ -187,8 +202,13 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
// satisfy that the size is larger than or equal to 5MB
// _complete() would handle the first situation
if (_pending_buf->get_size() == buffer_size) {
+ // only create the multipart upload request when the data spans more
+ // than one memory buffer
+ if (_cur_part_num == 1) {
+ RETURN_IF_ERROR(_create_multi_upload_request());
+ }
_cur_part_num++;
- _wait.add();
+ _count.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
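
A note on the invariant this hunk establishes (a sketch of the reasoning; names as in the diff): CreateMultipartUpload is deferred until the first buffer fills, i.e. until the data provably exceeds one s3_write_buffer_size. That gives finalize() a simple test:

    // _upload_id.empty()   => no buffer ever filled (total < 5MB)
    //                      => upload everything with one _put_object()
    // !_upload_id.empty()  => multipart upload already started
    //                      => remaining parts go through UploadPart and
    //                         _complete() issues CompleteMultipartUpload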
@@ -214,6 +234,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
upload_request.SetContentLength(buf.get_size());
+ upload_request.SetContentType("application/octet-stream");
auto upload_part_callable = _client->UploadPartCallable(upload_request);
@@ -228,37 +249,36 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
return;
}
- std::shared_ptr<CompletedPart> completed_part = std::make_shared<CompletedPart>();
+ std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
completed_part->SetPartNumber(part_num);
auto etag = upload_part_outcome.GetResult().GetETag();
// DCHECK(etag.empty());
completed_part->SetETag(etag);
std::unique_lock<std::mutex> lck {_completed_lock};
- _completed_parts.emplace_back(completed_part);
+ _completed_parts.emplace_back(std::move(completed_part));
_bytes_written += buf.get_size();
}
- // TODO(AlexYue): if the whole size is less than 5MB, we can use just call put object method
- // to reduce the network IO num to just one time
Status S3FileWriter::_complete() {
SCOPED_RAW_TIMER(_upload_cost_ms.get());
if (_failed) {
return _st;
}
+ // an empty upload id means there was no multipart upload
+ if (_upload_id.empty()) {
+ _wait_until_finish("PutObject");
+ return _st;
+ }
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
- while (!_wait.wait()) {
- LOG(WARNING) << "Complete multipart upload already takes 5 min"
- << "bucket=" << _bucket << ", key=" << _path.native()
- << ", upload_id=" << _upload_id;
- }
+ _wait_until_finish("Complete");
// make sure _completed_parts are in ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
CompletedMultipartUpload completed_upload;
- for (std::shared_ptr<CompletedPart> part : _completed_parts) {
+ for (auto& part : _completed_parts) {
completed_upload.AddParts(*part);
}
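
The sort above matters because each part is uploaded through an async UploadPartCallable and may finish out of order, while S3 requires CompleteMultipartUpload to list parts in ascending PartNumber order. A self-contained illustration (hypothetical Part type standing in for CompletedPart):

    #include <algorithm>
    #include <string>
    #include <vector>

    struct Part { int number; std::string etag; };

    int main() {
        // Parts recorded in completion order, not part order.
        std::vector<Part> done{{3, "c"}, {1, "a"}, {2, "b"}};
        std::sort(done.begin(), done.end(),
                  [](const Part& l, const Part& r) { return l.number < r.number; });
        // done is now ordered 1, 2, 3 -- the order S3 expects.
    }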
@@ -281,12 +301,34 @@ Status S3FileWriter::finalize() {
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
- _wait.add();
+ // if the whole file is smaller than 5MB we can skip multipart and
+ // upload it with a single PutObject call to reduce the network IO
+ if (_upload_id.empty()) {
+ _pending_buf->set_upload_remote_callback(
+ [this, buf = _pending_buf]() { _put_object(*buf); });
+ }
+ _count.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
return Status::OK();
}
+ void S3FileWriter::_put_object(S3FileBuffer& buf) {
+ DCHECK(!_closed && _opened);
+ Aws::S3::Model::PutObjectRequest request;
+ request.WithBucket(_bucket).WithKey(_key);
+ request.SetBody(buf.get_stream());
+ request.SetContentLength(buf.get_size());
+ request.SetContentType("application/octet-stream");
+ auto response = _client->PutObject(request);
+ if (!response.IsSuccess()) {
+ _st = Status::InternalError("Error: [{}:{}, responseCode:{}]",
+ response.GetError().GetExceptionName(),
+ response.GetError().GetMessage(),
+ static_cast<int>(response.GetError().GetResponseCode()));
+ }
+ }
} // namespace io
} // namespace doris
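
Taken together, the .cpp changes give this request flow (condensed sketch, not verbatim code):

    // total size < 5MB (no buffer ever fills):
    //   appendv()   -> data buffered in memory, no network traffic
    //   finalize()  -> pending buffer routed to _put_object()
    //                  => 1 request: PutObject
    //
    // total size >= 5MB (a buffer fills inside appendv()):
    //   appendv()   -> first full buffer: _create_multi_upload_request(),
    //                  then one async UploadPart per full buffer
    //   _complete() -> CompleteMultipartUpload after _wait_until_finish()
    //                  => 3 or more requests, as before this commit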

be/src/io/fs/s3_file_writer.h

@@ -18,6 +18,7 @@
#pragma once
#include <aws/core/utils/memory/stl/AWSStringStream.h>
+ #include <bthread/countdown_event.h>
#include <cstddef>
#include <list>
@@ -48,8 +49,6 @@ public:
FileSystemSPtr fs);
~S3FileWriter() override;
- Status open();
Status close() override;
Status abort() override;
@@ -64,45 +63,10 @@ public:
int64_t upload_cost_ms() const { return *_upload_cost_ms; }
private:
- class WaitGroup {
- public:
- WaitGroup() = default;
- ~WaitGroup() = default;
- WaitGroup(const WaitGroup&) = delete;
- WaitGroup(WaitGroup&&) = delete;
- void operator=(const WaitGroup&) = delete;
- void operator=(WaitGroup&&) = delete;
- // add one counter indicating one more concurrent worker
- void add(int count = 1) { _count += count; }
- // decrease count if one concurrent worker finished it's work
- void done() {
- _count--;
- if (_count.load() <= 0) {
- _cv.notify_all();
- }
- }
- // wait for all concurrent workers finish their work and return true
- // would return false if timeout, default timeout would be 5min
- bool wait(int64_t timeout_seconds = 300) {
- if (_count.load() <= 0) {
- return true;
- }
- std::unique_lock<std::mutex> lck {_lock};
- _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
- [this]() { return _count.load() <= 0; });
- return _count.load() <= 0;
- }
- private:
- std::mutex _lock;
- std::condition_variable _cv;
- std::atomic_int64_t _count {0};
- };
+ void _wait_until_finish(std::string task_name);
Status _complete();
+ Status _create_multi_upload_request();
+ void _put_object(S3FileBuffer& buf);
void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
std::string _bucket;
@@ -110,7 +74,7 @@ private:
bool _closed = true;
bool _opened = false;
- std::shared_ptr<int64_t> _upload_cost_ms;
+ std::unique_ptr<int64_t> _upload_cost_ms;
std::shared_ptr<Aws::S3::S3Client> _client;
std::string _upload_id;
@@ -119,9 +83,9 @@ private:
// Current Part Num for CompletedPart
int _cur_part_num = 1;
std::mutex _completed_lock;
- std::vector<std::shared_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
+ std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
- WaitGroup _wait;
+ bthread::CountdownEvent _count;
std::atomic_bool _failed = false;
Status _st = Status::OK();
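
The header-side change swaps the hand-rolled WaitGroup for bthread::CountdownEvent; both implement the same in-flight-part counting, but the bthread primitive suspends the calling bthread rather than blocking an OS thread. For reference, a minimal standard-C++ analogue of the contract the writer relies on (a sketch, not the bthread API):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // add_count() before each submitted part, signal() from the part's
    // completion callback, timed_wait() until the count drains or the
    // timeout fires -- a false return lets callers log and keep waiting,
    // as _wait_until_finish() does every 5 minutes.
    class Counter {
    public:
        void add_count() { std::lock_guard<std::mutex> l(_m); ++_n; }
        void signal() {
            std::lock_guard<std::mutex> l(_m);
            if (--_n <= 0) _cv.notify_all();
        }
        bool timed_wait(std::chrono::seconds timeout) {
            std::unique_lock<std::mutex> l(_m);
            return _cv.wait_for(l, timeout, [this] { return _n <= 0; });
        }
    private:
        std::mutex _m;
        std::condition_variable _cv;
        int _n = 0;
    };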