diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index 56437c2a22..423068f387 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -28,11 +28,12 @@ namespace doris {
 namespace io {
 
 void S3FileBuffer::on_finished() {
-    if (_buf == nullptr) {
+    if (_buf.empty()) {
         return;
     }
     reset();
-    S3FileBufferPool::GetInstance()->reclaim(shared_from_this());
+    S3FileBufferPool::GetInstance()->reclaim(_buf);
+    _buf.clear();
 }
 
 // when there is memory preserved, directly write data to buf
@@ -42,8 +43,8 @@ void S3FileBuffer::append_data(const Slice& data) {
     Defer defer {[&] { _size += data.get_size(); }};
     while (true) {
         // if buf is not empty, it means there is memory preserved for this buf
-        if (_buf != nullptr) {
-            memcpy(_buf->data() + _size, data.get_data(), data.get_size());
+        if (!_buf.empty()) {
+            memcpy(_buf.data + _size, data.get_data(), data.get_size());
             break;
         } else {
             // wait allocate buffer pool
@@ -54,8 +55,8 @@ void S3FileBuffer::append_data(const Slice& data) {
 }
 
 void S3FileBuffer::submit() {
-    if (LIKELY(_buf != nullptr)) {
-        _stream_ptr = std::make_shared<StringViewStream>(_buf->data(), _size);
+    if (LIKELY(!_buf.empty())) {
+        _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
     }
 
     ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
@@ -69,37 +70,33 @@ S3FileBufferPool::S3FileBufferPool() {
            (config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
     LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
     for (size_t i = 0; i < buf_num; i++) {
-        auto buf = std::make_shared<S3FileBuffer>();
-        buf->reserve_buffer();
-        _free_buffers.emplace_back(std::move(buf));
+        Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
+                 static_cast<size_t>(config::s3_write_buffer_size)};
+        _free_raw_buffers.emplace_back(s);
     }
 }
 
 std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf;
+    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
     // if need reserve then we must ensure return buf with memory preserved
     if (reserve) {
         {
             std::unique_lock lck {_lock};
-            _cv.wait(lck, [this]() { return !_free_buffers.empty(); });
-            buf = std::move(_free_buffers.front());
-            _free_buffers.pop_front();
+            _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
+            buf->reserve_buffer(_free_raw_buffers.front());
+            _free_raw_buffers.pop_front();
         }
         return buf;
     }
     // try to get one memory reserved buffer
     {
         std::unique_lock lck {_lock};
-        if (!_free_buffers.empty()) {
-            buf = std::move(_free_buffers.front());
-            _free_buffers.pop_front();
+        if (!_free_raw_buffers.empty()) {
+            buf->reserve_buffer(_free_raw_buffers.front());
+            _free_raw_buffers.pop_front();
         }
     }
-    if (buf != nullptr) {
-        return buf;
-    }
     // if there is no free buffer and no need to reserve memory, we could return one empty buffer
-    buf = std::make_shared<S3FileBuffer>();
     // if the buf has no memory reserved, it would try to write the data to file cache first
     // or it would try to rob buffer from other S3FileBuffer
     return buf;
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index b7b1b9f261..b69964b48e 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -33,7 +33,7 @@ namespace doris {
 namespace io {
 
-// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer 3. decouple reserved memory and Callbacks
+// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
 struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;
 
@@ -41,14 +41,13 @@
     ~S3FileBuffer() = default;
 
     void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
-        _buf = std::move(other->_buf);
-        other->_buf = nullptr;
+        _buf = other->_buf;
+        // we should clear other's memory buffer in case it would be reclaimed twice
+        // when calling on_finished
+        other->_buf.clear();
     }
 
-    void reserve_buffer() {
-        _buf = std::make_unique<std::string>();
-        _buf->resize(config::s3_write_buffer_size);
-    }
+    void reserve_buffer(Slice s) { _buf = s; }
 
     // apend data into the memory buffer inside or into the file cache
     // if the buffer has no memory buffer
@@ -109,7 +108,7 @@
     size_t _size;
     std::shared_ptr<std::iostream> _stream_ptr;
     // only served as one reserved buffer
-    std::unique_ptr<std::string> _buf;
+    Slice _buf;
     size_t _append_offset {0};
 };
 
@@ -123,9 +122,9 @@ public:
         return &_pool;
     }
 
-    void reclaim(std::shared_ptr<S3FileBuffer> buf) {
+    void reclaim(Slice buf) {
         std::unique_lock lck {_lock};
-        _free_buffers.emplace_front(std::move(buf));
+        _free_raw_buffers.emplace_front(buf);
         _cv.notify_all();
     }
 
@@ -134,7 +133,8 @@ public:
 private:
     std::mutex _lock;
     std::condition_variable _cv;
-    std::list<std::shared_ptr<S3FileBuffer>> _free_buffers;
+    std::unique_ptr<char[]> _whole_mem_buffer;
+    std::list<Slice> _free_raw_buffers;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index e6eab1a21a..d31a144543 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -87,11 +87,15 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr client, const S3
 }
 
 S3FileWriter::~S3FileWriter() {
-    if (_opened) {
-        close();
+    if (!_closed) {
+        // if we don't abort the multipart upload, the uploaded parts in the
+        // object store will not be reclaimed automatically, which would cost more money
+        abort();
     }
-    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
+    CHECK(_closed) << "closed: " << _closed;
     // in case there are task which might run after this object is destroyed
+    // for example, if the whole task failed and some tasks are still pending
+    // in the threadpool
     _wait_until_finish("dtor");
     s3_file_being_written << -1;
 }
@@ -121,9 +125,10 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
 }
 
 Status S3FileWriter::abort() {
+    // make all pending work quit early
     _failed = true;
-    if (_closed || !_opened) {
-        _wait_until_finish("Abort");
+    _closed = true;
+    if (_aborted) {
         return Status::OK();
     }
     // we need to reclaim the memory
@@ -137,7 +142,6 @@
     }
     VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
     _wait_until_finish("Abort");
-    _closed = true;
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
     auto outcome = _client->AbortMultipartUpload(request);
@@ -147,6 +151,7 @@
         LOG(INFO) << "Abort multipart upload successfully"
                   << "bucket=" << _bucket << ", key=" << _path.native()
                   << ", upload_id=" << _upload_id;
+        _aborted = true;
         return Status::OK();
     }
     return Status::IOError("failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}",
@@ -154,11 +159,17 @@
 }
 
 Status S3FileWriter::close() {
-    if (_closed || _failed) {
+    if (_closed) {
         _wait_until_finish("close");
         return _st;
     }
+    _closed = true;
+    if (_failed) {
+        abort();
+        return _st;
+    }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
"S3FileWriter::close, path: " << _path.native(); + // it might be one file less than 5MB, we do upload here if (_pending_buf != nullptr) { if (_upload_id.empty()) { _pending_buf->set_upload_remote_callback( @@ -169,18 +180,11 @@ Status S3FileWriter::close() { _pending_buf = nullptr; } RETURN_IF_ERROR(_complete()); - _closed = true; return Status::OK(); } Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { - // lazy open - if (!_opened) { - VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native(); - _closed = false; - _opened = true; - } DCHECK(!_closed); size_t buffer_size = config::s3_write_buffer_size; SCOPED_RAW_TIMER(_upload_cost_ms.get()); @@ -339,7 +343,7 @@ Status S3FileWriter::finalize() { } void S3FileWriter::_put_object(S3FileBuffer& buf) { - DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened; + DCHECK(!_closed) << "closed " << _closed; Aws::S3::Model::PutObjectRequest request; request.WithBucket(_bucket).WithKey(_key); request.SetBody(buf.get_stream()); @@ -351,6 +355,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) { response.GetError().GetExceptionName(), response.GetError().GetMessage(), static_cast(response.GetError().GetResponseCode())); + buf._on_failed(_st); + LOG(WARNING) << _st; } s3_bytes_written_total << buf.get_size(); s3_file_created_total << 1; diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index 8f04fbc0c3..26b77c42f8 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -109,7 +109,7 @@ private: std::string _bucket; std::string _key; bool _closed = true; - bool _opened = false; + bool _aborted = false; std::unique_ptr _upload_cost_ms;