From 157ec5757aeeadcafc2dac0e985e1ebfc9371d21 Mon Sep 17 00:00:00 2001
From: AlexYue
Date: Fri, 12 May 2023 09:19:57 +0800
Subject: [PATCH] [fix](s3FileWriter) don't use bthread countdown event to sync #19534

Unfortunately, BthreadCountDownEvent cannot serve as a sync primitive in
this scenario, where all the workers are pthreads.
BthreadCountDownEvent::timed_wait is meant for bthreads, so using it here
leads to confusing synchronization problems such as heap-buffer
use-after-free.
---
 be/src/io/fs/s3_file_write_bufferpool.h |  2 +-
 be/src/io/fs/s3_file_writer.cpp         | 34 ++++++++++++--------
 be/src/io/fs/s3_file_writer.h           | 41 +++++++++++++++++++++++--
 3 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 7ece0f5618..b1190cc78a 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -38,7 +38,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;
 
     S3FileBuffer() = default;
-    ~S3FileBuffer() = default;
+    ~S3FileBuffer() { on_finished(); }
 
     void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
         _buf = std::move(other->_buf);
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index dc6f84fa1c..4fe6cf8ba6 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -106,7 +106,7 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
     auto msg = fmt::format(
             "{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
             std::move(task_name), _bucket, _path.native(), _upload_id);
-    while (_count.timed_wait({5 * 60, 0}) < 0) {
+    while (!_wait.wait()) {
         LOG(WARNING) << msg;
     }
 }
@@ -114,6 +114,7 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
 Status S3FileWriter::abort() {
     _failed = true;
     if (_closed || !_opened) {
+        _wait_until_finish("Abort");
         return Status::OK();
     }
     // we need to reclaim the memory
@@ -126,8 +127,8 @@ Status S3FileWriter::abort() {
         return Status::OK();
     }
     VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
-    _closed = true;
     _wait_until_finish("Abort");
+    _closed = true;
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
     auto outcome = _client->AbortMultipartUpload(request);
@@ -144,17 +145,22 @@ Status S3FileWriter::abort() {
 }
 
 Status S3FileWriter::close() {
-    if (_closed) {
-        return Status::OK();
+    if (_closed || _failed) {
+        _wait_until_finish("close");
+        return _st;
     }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
-    _closed = true;
     if (_pending_buf != nullptr) {
-        _count.add_count();
+        if (_upload_id.empty()) {
+            _pending_buf->set_upload_remote_callback(
+                    [this, buf = _pending_buf]() { _put_object(*buf); });
+        }
+        _wait.add();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
     RETURN_IF_ERROR(_complete());
+    _closed = true;
     return Status::OK();
 }
 
@@ -172,6 +178,9 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
     for (size_t i = 0; i < data_cnt; i++) {
         size_t data_size = data[i].get_size();
         for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
+            if (_failed) {
+                return _st;
+            }
             if (!_pending_buf) {
                 _pending_buf = S3FileBufferPool::GetInstance()->allocate();
                 // capture part num by value along with the value of the shared ptr
@@ -181,14 +190,14 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                         });
                 _pending_buf->set_file_offset(_bytes_appended);
                 // later we might need to wait all prior tasks to be finished
-                _pending_buf->set_finish_upload([this]() { _count.signal(); });
+                _pending_buf->set_finish_upload([this]() { _wait.done(); });
                 _pending_buf->set_is_cancel([this]() { return _failed.load(); });
                 _pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
                     VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
                                 << ", st " << st.to_string();
                     std::unique_lock _lck {_completed_lock};
-                    _failed = true;
                     this->_st = std::move(st);
+                    _failed = true;
                 });
             }
             // we need to make sure all parts except the last one to be 5MB or more
@@ -210,7 +219,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                 RETURN_IF_ERROR(_create_multi_upload_request());
             }
             _cur_part_num++;
-            _count.add_count();
+            _wait.add();
             _pending_buf->submit();
             _pending_buf = nullptr;
         }
@@ -310,15 +319,16 @@ Status S3FileWriter::finalize() {
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
         }
-        _count.add_count();
+        _wait.add();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
-    return Status::OK();
+    _wait_until_finish("finalize");
+    return _st;
 }
 
 void S3FileWriter::_put_object(S3FileBuffer& buf) {
-    DCHECK(!_closed && _opened);
+    DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
     request.SetBody(buf.get_stream());
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index cb34477f54..8f04fbc0c3 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -18,7 +18,6 @@
 #pragma once
 
 #include
-#include <bthread/countdown_event.h>
 
 #include
 #include
@@ -63,6 +62,44 @@ public:
     int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
+    class WaitGroup {
+    public:
+        WaitGroup() = default;
+
+        ~WaitGroup() = default;
+
+        WaitGroup(const WaitGroup&) = delete;
+        WaitGroup(WaitGroup&&) = delete;
+        void operator=(const WaitGroup&) = delete;
+        void operator=(WaitGroup&&) = delete;
+        // add one counter indicating one more concurrent worker
+        void add(int count = 1) { _count += count; }
+
+        // decrease count if one concurrent worker finished it's work
+        void done() {
+            _count--;
+            if (_count.load() <= 0) {
+                _cv.notify_all();
+            }
+        }
+
+        // wait for all concurrent workers finish their work and return true
+        // would return false if timeout, default timeout would be 5min
+        bool wait(int64_t timeout_seconds = 300) {
+            if (_count.load() <= 0) {
+                return true;
+            }
+            std::unique_lock lck {_lock};
+            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
+                         [this]() { return _count.load() <= 0; });
+            return _count.load() <= 0;
+        }
+
+    private:
+        std::mutex _lock;
+        std::condition_variable _cv;
+        std::atomic_int64_t _count {0};
+    };
     void _wait_until_finish(std::string task_name);
     Status _complete();
     Status _create_multi_upload_request();
@@ -85,7 +122,7 @@ private:
     std::mutex _completed_lock;
     std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
 
-    bthread::CountdownEvent _count;
+    WaitGroup _wait;
     std::atomic_bool _failed = false;
     Status _st = Status::OK();
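
For reference, below is a minimal, self-contained sketch (not part of the patch) of the pthread-friendly WaitGroup pattern the patch switches to: add() is called before each buffer is submitted, done() runs in the upload-finish callback, and a wait() loop with a timeout plays the role of _wait_until_finish(), logging instead of blocking forever. The worker threads, sleeps, and main() scaffolding are illustrative assumptions, not Doris code, and this simplified WaitGroup holds the lock around notify_all(), which the class in the patch does not.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class WaitGroup {
public:
    // register `count` more in-flight workers before launching them
    void add(int count = 1) { _count += count; }

    // called by a worker when it finishes; wakes the waiter once the
    // counter reaches zero
    void done() {
        if (--_count <= 0) {
            std::lock_guard<std::mutex> lck(_lock);
            _cv.notify_all();
        }
    }

    // block until every registered worker has called done(), or until the
    // timeout expires; returns true when all workers finished in time
    bool wait(int64_t timeout_seconds = 300) {
        std::unique_lock<std::mutex> lck(_lock);
        _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
                     [this]() { return _count.load() <= 0; });
        return _count.load() <= 0;
    }

private:
    std::mutex _lock;
    std::condition_variable _cv;
    std::atomic<int64_t> _count{0};
};

int main() {
    WaitGroup wg;
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
        wg.add();  // mirrors _wait.add() before _pending_buf->submit()
        workers.emplace_back([&wg, i]() {
            // pretend to upload one part
            std::this_thread::sleep_for(std::chrono::milliseconds(50 * (i + 1)));
            wg.done();  // mirrors the set_finish_upload callback
        });
    }
    // mirrors _wait_until_finish(): keep waiting (and warning) until every
    // in-flight upload reports completion
    while (!wg.wait(/*timeout_seconds=*/1)) {
        std::cerr << "still waiting for in-flight uploads\n";
    }
    for (auto& t : workers) {
        t.join();
    }
    return 0;
}

Because every thread involved is a plain pthread, a std::condition_variable wait is safe here, whereas bthread::CountdownEvent::timed_wait assumes it is parked on a bthread worker.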