From d8d3e72534ddac68b371d88d42fe24ffe0a8123f Mon Sep 17 00:00:00 2001
From: AlexYue
Date: Thu, 10 Aug 2023 18:11:07 +0800
Subject: [PATCH] [enhance](S3FileWriter) Add md5 check for small files not
 suitable for multipart upload (#22296)

---
 be/src/io/fs/s3_file_write_bufferpool.h |  2 +-
 be/src/io/fs/s3_file_writer.cpp         | 30 ++++++++++------
 be/src/io/fs/s3_file_writer.h           | 46 +++----------------
 3 files changed, 25 insertions(+), 53 deletions(-)

diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 55fa53df42..f87a78289f 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -71,7 +71,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     // get the size of the content already appended
     size_t get_size() const { return _size; }
     // get the underlying stream containing the buffered content
-    std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
+    const std::shared_ptr<std::iostream>& get_stream() const { return _stream_ptr; }
     // get file offset corresponding to the buffer
     size_t get_file_offset() const { return _offset; }
     // set the offset of the buffer
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 6b7bd6ad8e..519470ebae 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -123,11 +123,19 @@ Status S3FileWriter::_create_multi_upload_request() {
             _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
 }
 
-void S3FileWriter::_wait_until_finish(std::string task_name) {
+void S3FileWriter::_wait_until_finish(std::string_view task_name) {
     auto msg = fmt::format(
             "{} multipart upload has already taken 5 min, bucket={}, key={}, upload_id={}",
-            std::move(task_name), _bucket, _path.native(), _upload_id);
-    while (!_wait.wait()) {
+            task_name, _bucket, _path.native(), _upload_id);
+    timespec current_time;
+    // We don't need high accuracy here, so we use time(nullptr)
+    // since it's the fastest way to get the current time (in seconds)
+    auto current_time_second = time(nullptr);
+    current_time.tv_sec = current_time_second + 300;
+    current_time.tv_nsec = 0;
+    // bthread::CountdownEvent::timed_wait() takes an absolute deadline
+    while (0 != _countdown_event.timed_wait(current_time)) {
+        current_time.tv_sec += 300;
         LOG(WARNING) << msg;
     }
 }
@@ -184,7 +192,7 @@ Status S3FileWriter::close() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _wait.add();
+        _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -212,7 +220,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
             });
             _pending_buf->set_file_offset(_bytes_appended);
             // later we might need to wait for all prior tasks to finish
-            _pending_buf->set_finish_upload([this]() { _wait.done(); });
+            _pending_buf->set_finish_upload([this]() { _countdown_event.signal(); });
             _pending_buf->set_is_cancel([this]() { return _failed.load(); });
             _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
                 VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
@@ -241,7 +249,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                 RETURN_IF_ERROR(_create_multi_upload_request());
             }
             _cur_part_num++;
-            _wait.add();
+            _countdown_event.add_count();
             _pending_buf->submit();
             _pending_buf = nullptr;
         }
@@ -259,11 +267,9 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
     upload_request.WithBucket(_bucket).WithKey(_key).WithPartNumber(part_num).WithUploadId(
            _upload_id);
 
-    auto _stream_ptr = buf.get_stream();
     upload_request.SetBody(buf.get_stream());
-
-    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*_stream_ptr));
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
     upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
 
     upload_request.SetContentLength(buf.get_size());
@@ -286,7 +292,7 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
     std::unique_ptr<CompletedPart> completed_part = std::make_unique<CompletedPart>();
     completed_part->SetPartNumber(part_num);
-    auto etag = upload_part_outcome.GetResult().GetETag();
+    const auto& etag = upload_part_outcome.GetResult().GetETag();
     // DCHECK(etag.empty());
     completed_part->SetETag(etag);
@@ -345,7 +351,7 @@ Status S3FileWriter::finalize() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _wait.add();
+        _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
@@ -357,6 +363,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
     DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*buf.get_stream()));
+    request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
     request.SetBody(buf.get_stream());
     request.SetContentLength(buf.get_size());
     request.SetContentType("application/octet-stream");
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index d8956da88a..2c139242ed 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <bthread/countdown_event.h>
 #include <condition_variable>
 #include <mutex>
@@ -57,48 +58,10 @@ public:
         return Status::NotSupported("not support");
     }
 
-    int64_t upload_cost_ms() const { return *_upload_cost_ms; }
+    [[nodiscard]] int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
-    class WaitGroup {
-    public:
-        WaitGroup() = default;
-
-        ~WaitGroup() = default;
-
-        WaitGroup(const WaitGroup&) = delete;
-        WaitGroup(WaitGroup&&) = delete;
-        void operator=(const WaitGroup&) = delete;
-        void operator=(WaitGroup&&) = delete;
-        // add one counter indicating one more concurrent worker
-        void add(int count = 1) { _count += count; }
-
-        // decrease count when one concurrent worker has finished its work
-        void done() {
-            _count--;
-            if (_count.load() <= 0) {
-                _cv.notify_all();
-            }
-        }
-
-        // wait for all concurrent workers to finish their work and return true;
-        // returns false on timeout, the default timeout being 5 min
-        bool wait(int64_t timeout_seconds = 300) {
-            if (_count.load() <= 0) {
-                return true;
-            }
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
-                         [this]() { return _count.load() <= 0; });
-            return _count.load() <= 0;
-        }
-
-    private:
-        std::mutex _lock;
-        std::condition_variable _cv;
-        std::atomic_int64_t _count {0};
-    };
-
-    void _wait_until_finish(std::string task_name);
+    void _wait_until_finish(std::string_view task_name);
     Status _complete();
     Status _create_multi_upload_request();
     void _put_object(S3FileBuffer& buf);
@@ -119,7 +82,8 @@ private:
     std::mutex _completed_lock;
     std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
 
-    WaitGroup _wait;
+    // **Attention** call add_count() before submitting buf to the async thread pool
+    bthread::CountdownEvent _countdown_event {0};
 
    std::atomic_bool _failed = false;
    Status _st = Status::OK();
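
A note on the concurrency change above: the removed WaitGroup blocked on a
std::condition_variable, while bthread::CountdownEvent, when waited on from a
bthread, parks only that bthread rather than the underlying worker pthread.
Below is a minimal standalone sketch (not Doris code; assumes brpc's bthread
library is available) of the add_count()/signal()/timed_wait() pattern this
patch adopts. Unlike WaitGroup::wait(), which took a relative timeout,
timed_wait() takes an absolute timespec deadline and returns 0 once the count
has reached zero.

    #include <bthread/countdown_event.h>

    #include <cstdio>
    #include <ctime>
    #include <thread>

    int main() {
        bthread::CountdownEvent done(0); // starts at 0, mirroring _countdown_event {0}

        done.add_count();                // always add BEFORE handing the work off
        std::thread worker([&done]() { done.signal(); }); // worker signals on completion

        // timed_wait() takes an absolute deadline, not a relative timeout.
        timespec deadline {};
        deadline.tv_sec = time(nullptr) + 300; // five minutes from now
        deadline.tv_nsec = 0;
        while (done.timed_wait(deadline) != 0) { // 0 means the count reached zero
            deadline.tv_sec += 300;              // push the deadline out and warn again
            fprintf(stderr, "upload still running after 5 min\n");
        }
        worker.join();
        return 0;
    }

The ordering demanded by the **Attention** comment is what keeps the count
correct: if the buffer were submitted first, its callback could signal()
before add_count() runs, and a concurrent _wait_until_finish() could wake up
while work is still in flight.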
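
On the md5 check itself: the two lines added to _put_object() make small
files, which go through a single PutObject rather than a multipart upload,
carry a Content-MD5 header as the multipart path already did. S3 then
recomputes the digest server-side and rejects a body corrupted in transit
with a BadDigest error instead of silently storing it. A standalone sketch of
the same AWS SDK calls (bucket and key are placeholders, not from this patch):

    #include <aws/core/Aws.h>
    #include <aws/core/utils/HashingUtils.h>
    #include <aws/core/utils/memory/stl/AWSStringStream.h>
    #include <aws/s3/S3Client.h>
    #include <aws/s3/model/PutObjectRequest.h>

    #include <iostream>

    int main() {
        Aws::SDKOptions options;
        Aws::InitAPI(options);
        {
            auto body = Aws::MakeShared<Aws::StringStream>("md5-demo");
            *body << "small file content";

            Aws::S3::Model::PutObjectRequest request;
            request.WithBucket("my-bucket").WithKey("my-key"); // placeholders
            // Hash the body and attach it as Content-MD5 (base64 of the raw digest).
            Aws::Utils::ByteBuffer md5 = Aws::Utils::HashingUtils::CalculateMD5(*body);
            request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(md5));
            request.SetBody(body);
            request.SetContentLength(static_cast<long long>(body->str().size()));

            Aws::S3::S3Client client;
            auto outcome = client.PutObject(request);
            if (!outcome.IsSuccess()) {
                // A corrupted body surfaces here with error code "BadDigest".
                std::cerr << outcome.GetError().GetMessage() << std::endl;
            }
        }
        Aws::ShutdownAPI(options);
        return 0;
    }

HashingUtils::CalculateMD5 reads the whole stream but restores its read
position afterwards, which is why the patch can hash *buf.get_stream() and
still hand the same stream to SetBody().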