From bc4e0e97f2473df488a6a00d3806d7bde7ee380b Mon Sep 17 00:00:00 2001
From: AlexYue
Date: Fri, 26 May 2023 09:14:38 +0800
Subject: [PATCH] [enhance](S3FileWriter) abort when the S3 file writer quits
 abnormally, and optimize the S3 buffer pool (#19944)

1. Reduce the cost of S3FileBufferPool's constructor: instead of allocating
   one std::string per buffer, allocate one whole memory buffer once and
   carve it into fixed-size Slices.
2. Before this PR, if an S3 file writer returned an error from append or
   close, the caller would not call abort, which led to a confusing DCHECK
   failure in the destructor.
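To make change 1 concrete: the pool used to call reserve_buffer() on every
S3FileBuffer in its constructor, each of which resized its own std::string;
now it makes a single allocation up front and hands out fixed-size views of
it. A minimal standalone sketch of that scheme (SlabPool and its members are
illustrative names, not the Doris API):

    #include <cstddef>
    #include <list>
    #include <memory>

    // a borrowed view into the pool's single allocation; owns nothing
    struct Slice {
        char* data = nullptr;
        size_t size = 0;
    };

    class SlabPool {
    public:
        SlabPool(size_t whole_size, size_t buf_size)
                : _whole_mem_buffer(std::make_unique<char[]>(whole_size)) {
            // one allocation, carved into buf_size-sized slices: the ctor cost
            // is one new[] instead of one std::string resize per buffer
            for (size_t off = 0; off + buf_size <= whole_size; off += buf_size) {
                _free_raw_buffers.push_back(Slice {_whole_mem_buffer.get() + off, buf_size});
            }
        }

    private:
        std::unique_ptr<char[]> _whole_mem_buffer; // owns all buffer memory
        std::list<Slice> _free_raw_buffers;        // free views, cheap to pass around
    };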
---
 be/src/io/fs/s3_file_write_bufferpool.cpp | 37 +++++++++++------------
 be/src/io/fs/s3_file_write_bufferpool.h   | 22 +++++++-------
 be/src/io/fs/s3_file_writer.cpp           | 36 +++++++++++++---------
 be/src/io/fs/s3_file_writer.h             |  2 +-
 4 files changed, 50 insertions(+), 47 deletions(-)

diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index 56437c2a22..423068f387 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -28,11 +28,12 @@
 namespace doris {
 namespace io {
 
 void S3FileBuffer::on_finished() {
-    if (_buf == nullptr) {
+    if (_buf.empty()) {
         return;
     }
     reset();
-    S3FileBufferPool::GetInstance()->reclaim(shared_from_this());
+    S3FileBufferPool::GetInstance()->reclaim(_buf);
+    _buf.clear();
 }
 
 // when there is memory preserved, directly write data to buf
@@ -42,8 +43,8 @@ void S3FileBuffer::append_data(const Slice& data) {
     Defer defer {[&] { _size += data.get_size(); }};
     while (true) {
         // if buf is not empty, it means there is memory preserved for this buf
-        if (_buf != nullptr) {
-            memcpy(_buf->data() + _size, data.get_data(), data.get_size());
+        if (!_buf.empty()) {
+            memcpy(_buf.data + _size, data.get_data(), data.get_size());
             break;
         } else {
             // wait allocate buffer pool
@@ -54,8 +55,8 @@ void S3FileBuffer::append_data(const Slice& data) {
 }
 
 void S3FileBuffer::submit() {
-    if (LIKELY(_buf != nullptr)) {
-        _stream_ptr = std::make_shared<StringViewStream>(_buf->data(), _size);
+    if (LIKELY(!_buf.empty())) {
+        _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
     }
 
     ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
@@ -69,37 +70,33 @@ S3FileBufferPool::S3FileBufferPool() {
            (config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
     LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
     for (size_t i = 0; i < buf_num; i++) {
-        auto buf = std::make_shared<S3FileBuffer>();
-        buf->reserve_buffer();
-        _free_buffers.emplace_back(std::move(buf));
+        Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
+                 static_cast<size_t>(config::s3_write_buffer_size)};
+        _free_raw_buffers.emplace_back(s);
     }
 }
 
 std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf;
+    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
     // if need reserve then we must ensure return buf with memory preserved
     if (reserve) {
         {
             std::unique_lock lck {_lock};
-            _cv.wait(lck, [this]() { return !_free_buffers.empty(); });
-            buf = std::move(_free_buffers.front());
-            _free_buffers.pop_front();
+            _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
+            buf->reserve_buffer(_free_raw_buffers.front());
+            _free_raw_buffers.pop_front();
         }
         return buf;
     }
     // try to get one memory reserved buffer
     {
         std::unique_lock lck {_lock};
-        if (!_free_buffers.empty()) {
-            buf = std::move(_free_buffers.front());
-            _free_buffers.pop_front();
+        if (!_free_raw_buffers.empty()) {
+            buf->reserve_buffer(_free_raw_buffers.front());
+            _free_raw_buffers.pop_front();
         }
     }
-    if (buf != nullptr) {
-        return buf;
-    }
     // if there is no free buffer and no need to reserve memory, we could return one empty buffer
-    buf = std::make_shared<S3FileBuffer>();
     // if the buf has no memory reserved, it would try to write the data to file cache first
     // or it would try to rob buffer from other S3FileBuffer
     return buf;

diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index b7b1b9f261..b69964b48e 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -33,7 +33,7 @@
 namespace doris {
 namespace io {
 
-// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer 3. decouple reserved memory and Callbacks
+// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
 struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;
 
@@ -41,14 +41,13 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     ~S3FileBuffer() = default;
 
     void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
-        _buf = std::move(other->_buf);
-        other->_buf = nullptr;
+        _buf = other->_buf;
+        // we should clear the other buffer's memory in case it would be reclaimed twice
+        // when calling on_finished
+        other->_buf.clear();
     }
 
-    void reserve_buffer() {
-        _buf = std::make_unique<std::string>();
-        _buf->resize(config::s3_write_buffer_size);
-    }
+    void reserve_buffer(Slice s) { _buf = s; }
 
     // append data into the memory buffer inside or into the file cache
     // if the buffer has no memory buffer
@@ -109,7 +108,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     size_t _size;
     std::shared_ptr<std::iostream> _stream_ptr;
     // only served as one reserved buffer
-    std::unique_ptr<std::string> _buf;
+    Slice _buf;
     size_t _append_offset {0};
 };
 
@@ -123,9 +122,9 @@ public:
         return &_pool;
     }
 
-    void reclaim(std::shared_ptr<S3FileBuffer> buf) {
+    void reclaim(Slice buf) {
         std::unique_lock lck {_lock};
-        _free_buffers.emplace_front(std::move(buf));
+        _free_raw_buffers.emplace_front(buf);
         _cv.notify_all();
     }
 
@@ -134,7 +133,8 @@ public:
 private:
     std::mutex _lock;
     std::condition_variable _cv;
-    std::list<std::shared_ptr<S3FileBuffer>> _free_buffers;
+    std::unique_ptr<char[]> _whole_mem_buffer;
+    std::list<Slice> _free_raw_buffers;
 };
 } // namespace io
 } // namespace doris
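Before the writer changes below, a hedged sketch of the allocate/reclaim
handshake the pool above implements; the container and names are simplified
stand-ins, only the locking pattern mirrors the diff:

    #include <condition_variable>
    #include <list>
    #include <mutex>

    template <typename SliceT>
    class FreeList {
    public:
        // blocking take, used when the caller must get reserved memory:
        // sleeps until reclaim() puts a slice back
        SliceT take_blocking() {
            std::unique_lock lck {_lock};
            _cv.wait(lck, [this]() { return !_free.empty(); });
            SliceT s = _free.front();
            _free.pop_front();
            return s;
        }

        // non-blocking take: returns a default (empty) slice when drained
        SliceT try_take() {
            std::unique_lock lck {_lock};
            if (_free.empty()) {
                return SliceT {};
            }
            SliceT s = _free.front();
            _free.pop_front();
            return s;
        }

        // hand a slice back and wake the waiters in take_blocking()
        void reclaim(SliceT s) {
            {
                std::unique_lock lck {_lock};
                _free.emplace_front(s);
            }
            _cv.notify_all();
        }

    private:
        std::mutex _lock;
        std::condition_variable _cv;
        std::list<SliceT> _free;
    };

Returning an empty slice from the non-blocking path is what lets
allocate(reserve=false) hand out a buffer without reserved memory, which then
falls back to the file cache or robs another buffer, as the comments in the
diff describe.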
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index e6eab1a21a..d31a144543 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -87,11 +87,15 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3
 }
 
 S3FileWriter::~S3FileWriter() {
-    if (_opened) {
-        close();
+    if (!_closed) {
+        // if we don't abort the multipart upload, the parts already uploaded to
+        // the object store will not reclaim themselves, which costs more money
+        abort();
     }
-    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
+    CHECK(_closed) << "closed: " << _closed;
     // in case there are tasks which might run after this object is destroyed
+    // for example, if the whole task failed and some tasks are still pending
+    // in the thread pool
     _wait_until_finish("dtor");
     s3_file_being_written << -1;
 }
@@ -121,9 +125,10 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
 }
 
 Status S3FileWriter::abort() {
+    // make all pending work quit early
     _failed = true;
-    if (_closed || !_opened) {
-        _wait_until_finish("Abort");
+    _closed = true;
+    if (_aborted) {
         return Status::OK();
     }
     // we need to reclaim the memory
@@ -137,7 +142,6 @@ Status S3FileWriter::abort() {
     }
     VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
     _wait_until_finish("Abort");
-    _closed = true;
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
     auto outcome = _client->AbortMultipartUpload(request);
@@ -147,6 +151,7 @@
         LOG(INFO) << "Abort multipart upload successfully"
                   << "bucket=" << _bucket << ", key=" << _path.native()
                   << ", upload_id=" << _upload_id;
+        _aborted = true;
         return Status::OK();
     }
     return Status::IOError("failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}",
@@ -154,11 +159,17 @@
 }
 
 Status S3FileWriter::close() {
-    if (_closed || _failed) {
+    if (_closed) {
         _wait_until_finish("close");
         return _st;
     }
+    _closed = true;
+    if (_failed) {
+        abort();
+        return _st;
+    }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
+    // the file might be smaller than 5MB; in that case we do the upload here
     if (_pending_buf != nullptr) {
         if (_upload_id.empty()) {
             _pending_buf->set_upload_remote_callback(
@@ -169,18 +180,11 @@
         _pending_buf = nullptr;
     }
     RETURN_IF_ERROR(_complete());
-    _closed = true;
 
     return Status::OK();
 }
 
 Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
-    // lazy open
-    if (!_opened) {
-        VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
-        _closed = false;
-        _opened = true;
-    }
     DCHECK(!_closed);
     size_t buffer_size = config::s3_write_buffer_size;
     SCOPED_RAW_TIMER(_upload_cost_ms.get());
@@ -339,7 +343,7 @@ Status S3FileWriter::finalize() {
 }
 
 void S3FileWriter::_put_object(S3FileBuffer& buf) {
-    DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
+    DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
     request.SetBody(buf.get_stream());
@@ -351,6 +355,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
                                response.GetError().GetExceptionName(),
                                response.GetError().GetMessage(),
                                static_cast<int>(response.GetError().GetResponseCode()));
+        buf._on_failed(_st);
+        LOG(WARNING) << _st;
     }
     s3_bytes_written_total << buf.get_size();
     s3_file_created_total << 1;

diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 8f04fbc0c3..26b77c42f8 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -109,7 +109,7 @@ private:
     std::string _bucket;
     std::string _key;
     bool _closed = true;
-    bool _opened = false;
+    bool _aborted = false;
 
     std::unique_ptr<int64_t> _upload_cost_ms;
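Taken together, the writer-side changes replace the _opened/_closed pair with
a small state machine around _closed, _failed, and _aborted. A minimal sketch
of that contract (WriterSketch and MockStatus are illustrative, not the Doris
classes; the real methods also wait for in-flight tasks and talk to S3):

    struct MockStatus {
        bool ok = true;
    };

    class WriterSketch {
    public:
        ~WriterSketch() {
            if (!_closed) {
                abort(); // safety net: never leak a pending multipart upload
            }
        }

        MockStatus abort() {
            _failed = true; // make pending background work quit early
            _closed = true;
            if (_aborted) {
                return MockStatus {}; // already aborted, nothing to do
            }
            // ... the AbortMultipartUpload request would be sent here ...
            _aborted = true;
            return MockStatus {};
        }

        MockStatus close() {
            if (_closed) {
                return MockStatus {}; // double close is a no-op
            }
            _closed = true;
            if (_failed) {
                abort(); // an earlier append failed: clean up, don't complete
                return MockStatus {false};
            }
            // ... flush the pending buffer and complete the upload here ...
            return MockStatus {};
        }

    private:
        bool _closed = false;
        bool _failed = false;
        bool _aborted = false;
    };

The property the destructor's CHECK relies on is that every path out of the
writer ends with _closed == true: a successful close(), a close() on a failed
writer (which now aborts), an explicit abort(), or the destructor's abort()
as a last resort.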