[fix](s3FileWriter) don't use bthread countdown event to sync #19534
Unfortunately, BthreadCountDownEvent cannot serve as a synchronization primitive in this scenario, where all the workers are pthreads. BthreadCountDownEvent::timed_wait is designed for bthreads, so using it here leads to confusing synchronization problems such as heap-buffer use-after-free.
This commit is contained in:
@ -38,7 +38,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
|
||||
using Callback = std::function<void()>;
|
||||
|
||||
S3FileBuffer() = default;
|
||||
~S3FileBuffer() = default;
|
||||
~S3FileBuffer() { on_finished(); }
|
||||
|
||||
void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
|
||||
_buf = std::move(other->_buf);
|
||||
|
||||
@ -106,7 +106,7 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
|
||||
auto msg =
|
||||
fmt::format("{} multipart upload already takes 5 min, bucket={}, key={}, upload_id={}",
|
||||
std::move(task_name), _bucket, _path.native(), _upload_id);
|
||||
while (_count.timed_wait({5 * 60, 0}) < 0) {
|
||||
while (!_wait.wait()) {
|
||||
LOG(WARNING) << msg;
|
||||
}
|
||||
}
|
||||
@ -114,6 +114,7 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
|
||||
Status S3FileWriter::abort() {
|
||||
_failed = true;
|
||||
if (_closed || !_opened) {
|
||||
_wait_until_finish("Abort");
|
||||
return Status::OK();
|
||||
}
|
||||
// we need to reclaim the memory
|
||||
@ -126,8 +127,8 @@ Status S3FileWriter::abort() {
|
||||
return Status::OK();
|
||||
}
|
||||
VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
|
||||
_closed = true;
|
||||
_wait_until_finish("Abort");
|
||||
_closed = true;
|
||||
AbortMultipartUploadRequest request;
|
||||
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
|
||||
auto outcome = _client->AbortMultipartUpload(request);
|
||||
@ -144,17 +145,22 @@ Status S3FileWriter::abort() {
|
||||
}
|
||||
|
||||
Status S3FileWriter::close() {
|
||||
if (_closed) {
|
||||
return Status::OK();
|
||||
if (_closed || _failed) {
|
||||
_wait_until_finish("close");
|
||||
return _st;
|
||||
}
|
||||
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
|
||||
_closed = true;
|
||||
if (_pending_buf != nullptr) {
|
||||
_count.add_count();
|
||||
if (_upload_id.empty()) {
|
||||
_pending_buf->set_upload_remote_callback(
|
||||
[this, buf = _pending_buf]() { _put_object(*buf); });
|
||||
}
|
||||
_wait.add();
|
||||
_pending_buf->submit();
|
||||
_pending_buf = nullptr;
|
||||
}
|
||||
RETURN_IF_ERROR(_complete());
|
||||
_closed = true;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
@ -172,6 +178,9 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
|
||||
for (size_t i = 0; i < data_cnt; i++) {
|
||||
size_t data_size = data[i].get_size();
|
||||
for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += data_size_to_append) {
|
||||
if (_failed) {
|
||||
return _st;
|
||||
}
|
||||
if (!_pending_buf) {
|
||||
_pending_buf = S3FileBufferPool::GetInstance()->allocate();
|
||||
// capture part num by value along with the value of the shared ptr
|
||||
@ -181,14 +190,14 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
|
||||
});
|
||||
_pending_buf->set_file_offset(_bytes_appended);
|
||||
// later we might need to wait all prior tasks to be finished
|
||||
_pending_buf->set_finish_upload([this]() { _count.signal(); });
|
||||
_pending_buf->set_finish_upload([this]() { _wait.done(); });
|
||||
_pending_buf->set_is_cancel([this]() { return _failed.load(); });
|
||||
_pending_buf->set_on_failed([this, part_num = _cur_part_num](Status st) {
|
||||
VLOG_NOTICE << "failed at key: " << _key << ", load part " << part_num
|
||||
<< ", st " << st.to_string();
|
||||
std::unique_lock<std::mutex> _lck {_completed_lock};
|
||||
_failed = true;
|
||||
this->_st = std::move(st);
|
||||
_failed = true;
|
||||
});
|
||||
}
|
||||
// we need to make sure all parts except the last one to be 5MB or more
|
||||
@ -210,7 +219,7 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
|
||||
RETURN_IF_ERROR(_create_multi_upload_request());
|
||||
}
|
||||
_cur_part_num++;
|
||||
_count.add_count();
|
||||
_wait.add();
|
||||
_pending_buf->submit();
|
||||
_pending_buf = nullptr;
|
||||
}
|
||||
@ -310,15 +319,16 @@ Status S3FileWriter::finalize() {
|
||||
_pending_buf->set_upload_remote_callback(
|
||||
[this, buf = _pending_buf]() { _put_object(*buf); });
|
||||
}
|
||||
_count.add_count();
|
||||
_wait.add();
|
||||
_pending_buf->submit();
|
||||
_pending_buf = nullptr;
|
||||
}
|
||||
return Status::OK();
|
||||
_wait_until_finish("finalize");
|
||||
return _st;
|
||||
}
|
||||
|
||||
void S3FileWriter::_put_object(S3FileBuffer& buf) {
|
||||
DCHECK(!_closed && _opened);
|
||||
DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
|
||||
Aws::S3::Model::PutObjectRequest request;
|
||||
request.WithBucket(_bucket).WithKey(_key);
|
||||
request.SetBody(buf.get_stream());
|
||||
|
||||
@ -18,7 +18,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <aws/core/utils/memory/stl/AWSStringStream.h>
|
||||
#include <bthread/countdown_event.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <list>
|
||||
@ -63,6 +62,44 @@ public:
|
||||
int64_t upload_cost_ms() const { return *_upload_cost_ms; }
|
||||
|
||||
private:
|
||||
// Lightweight pthread-compatible wait group (akin to Go's sync.WaitGroup).
// Replaces bthread::CountdownEvent, which only synchronizes bthread workers
// and misbehaves (e.g. use-after-free) when the waiters are plain pthreads.
class WaitGroup {
public:
    WaitGroup() = default;

    ~WaitGroup() = default;

    WaitGroup(const WaitGroup&) = delete;
    WaitGroup(WaitGroup&&) = delete;
    void operator=(const WaitGroup&) = delete;
    void operator=(WaitGroup&&) = delete;

    // Add `count` more concurrent workers whose completion must be awaited.
    void add(int count = 1) { _count += count; }

    // Mark one concurrent worker as finished; wakes all waiters once the
    // outstanding count drops to zero (or below).
    void done() {
        // Single fetch_sub instead of decrement-then-reload: one atomic op
        // decides whether this call is the one that released the last worker.
        if (_count.fetch_sub(1) - 1 <= 0) {
            // Acquire the mutex before notifying. Notifying without it races
            // with a waiter that has already evaluated the predicate (false)
            // under the lock but has not yet blocked in wait_for — the wakeup
            // would be lost and the caller would stall until the next timeout.
            std::lock_guard<std::mutex> lck {_lock};
            _cv.notify_all();
        }
    }

    // Wait until every added worker has called done() and return true.
    // Returns false if `timeout_seconds` (default 5 min) elapsed first.
    bool wait(int64_t timeout_seconds = 300) {
        // Fast path: nothing outstanding, no need to touch the mutex.
        if (_count.load() <= 0) {
            return true;
        }
        std::unique_lock<std::mutex> lck {_lock};
        _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
                     [this]() { return _count.load() <= 0; });
        return _count.load() <= 0;
    }

private:
    std::mutex _lock;
    std::condition_variable _cv;
    std::atomic_int64_t _count {0};
};
|
||||
void _wait_until_finish(std::string task_name);
|
||||
Status _complete();
|
||||
Status _create_multi_upload_request();
|
||||
@ -85,7 +122,7 @@ private:
|
||||
std::mutex _completed_lock;
|
||||
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
|
||||
|
||||
bthread::CountdownEvent _count;
|
||||
WaitGroup _wait;
|
||||
|
||||
std::atomic_bool _failed = false;
|
||||
Status _st = Status::OK();
|
||||
|
||||
Reference in New Issue
Block a user