[enhance](S3FileWriter) abort when S3 file writer quits abnormally and optimize S3 buffer pool (#19944)
1. Reduce the construction cost of the S3 buffer pool.
2. Before this PR, if an S3 file writer returned an error from append or close, the caller would never call abort, which resulted in a confusing DCHECK failure like the one in the attached picture (screenshot omitted).
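For context, the failure mode and the intended contract can be shown with a minimal, self-contained sketch. The Writer below is a hypothetical stand-in, not the actual S3FileWriter API; the point is only that a writer which fails in append/close must end up aborted, with the destructor as a safety net, instead of tripping a DCHECK in a close() whose preconditions it no longer satisfies.

#include <iostream>
#include <string>

// Hypothetical stand-in for an S3 writer; illustrative only.
struct Writer {
    bool closed = false;
    bool failed = false;

    bool append(const std::string&) {
        failed = true; // simulate an I/O error during upload
        return false;
    }
    bool close() {
        if (closed) return !failed;
        closed = true;
        if (failed) {
            abort_upload(); // a failed writer is aborted, not completed
            return false;
        }
        return true;
    }
    void abort_upload() {
        // stands in for AbortMultipartUpload: frees parts already uploaded,
        // which otherwise keep accruing object-store charges
        closed = true;
        std::cout << "multipart upload aborted\n";
    }
    ~Writer() {
        if (!closed) abort_upload(); // safety net, like the new ~S3FileWriter()
    }
};

int main() {
    Writer w;
    if (!w.append("payload")) return 1; // dtor aborts; no DCHECK crash
    return w.close() ? 0 : 1;
}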
@@ -28,11 +28,12 @@
 namespace doris {
 namespace io {
 void S3FileBuffer::on_finished() {
-    if (_buf == nullptr) {
+    if (_buf.empty()) {
         return;
     }
     reset();
-    S3FileBufferPool::GetInstance()->reclaim(shared_from_this());
+    S3FileBufferPool::GetInstance()->reclaim(_buf);
+    _buf.clear();
 }

 // when there is memory preserved, directly write data to buf
@@ -42,8 +43,8 @@ void S3FileBuffer::append_data(const Slice& data) {
     Defer defer {[&] { _size += data.get_size(); }};
     while (true) {
         // if buf is not empty, it means there is memory preserved for this buf
-        if (_buf != nullptr) {
-            memcpy(_buf->data() + _size, data.get_data(), data.get_size());
+        if (!_buf.empty()) {
+            memcpy(_buf.data + _size, data.get_data(), data.get_size());
             break;
         } else {
             // wait allocate buffer pool
@@ -54,8 +55,8 @@ void S3FileBuffer::append_data(const Slice& data) {
 }

 void S3FileBuffer::submit() {
-    if (LIKELY(_buf != nullptr)) {
-        _stream_ptr = std::make_shared<StringViewStream>(_buf->data(), _size);
+    if (LIKELY(!_buf.empty())) {
+        _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
     }

     ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
@@ -69,37 +70,33 @@ S3FileBufferPool::S3FileBufferPool() {
             (config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
     LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
     for (size_t i = 0; i < buf_num; i++) {
-        auto buf = std::make_shared<S3FileBuffer>();
-        buf->reserve_buffer();
-        _free_buffers.emplace_back(std::move(buf));
+        Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
+                 static_cast<size_t>(config::s3_write_buffer_size)};
+        _free_raw_buffers.emplace_back(s);
     }
 }

 std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf;
+    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
     // if need reserve then we must ensure return buf with memory preserved
     if (reserve) {
         {
             std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait(lck, [this]() { return !_free_buffers.empty(); });
-            buf = std::move(_free_buffers.front());
-            _free_buffers.pop_front();
+            _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
+            buf->reserve_buffer(_free_raw_buffers.front());
+            _free_raw_buffers.pop_front();
         }
         return buf;
     }
     // try to get one memory reserved buffer
     {
         std::unique_lock<std::mutex> lck {_lock};
-        if (!_free_buffers.empty()) {
-            buf = std::move(_free_buffers.front());
-            _free_buffers.pop_front();
+        if (!_free_raw_buffers.empty()) {
+            buf->reserve_buffer(_free_raw_buffers.front());
+            _free_raw_buffers.pop_front();
         }
     }
-    if (buf != nullptr) {
-        return buf;
-    }
-    // if there is no free buffer and no need to reserve memory, we could return one empty buffer
-    buf = std::make_shared<S3FileBuffer>();
     // if the buf has no memory reserved, it would try to write the data to file cache first
     // or it would try to rob buffer from other S3FileBuffer
     return buf;
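The constructor change above is a slab-allocation pattern: instead of default-constructing buf_num S3FileBuffer objects and resizing a std::string for each, the pool carves one pre-allocated slab (_whole_mem_buffer) into fixed-size Slices up front. A self-contained sketch of the idea, with illustrative names rather than the Doris API:

#include <cstddef>
#include <list>
#include <memory>

// A borrowed view into the slab; mirrors the role Slice plays above.
struct SliceView {
    char* data;
    size_t size;
};

class SlabPool {
public:
    SlabPool(size_t buf_size, size_t buf_num)
            : _slab(new char[buf_size * buf_num]) {
        // One allocation total; carving is pointer arithmetic, so the
        // constructor cost no longer grows with per-buffer allocations.
        for (size_t i = 0; i < buf_num; ++i) {
            _free.push_back({_slab.get() + i * buf_size, buf_size});
        }
    }
    // Returns an empty view when exhausted; the real pool instead blocks
    // on a condition variable when a reserved buffer is required.
    SliceView take() {
        if (_free.empty()) return {nullptr, 0};
        SliceView s = _free.front();
        _free.pop_front();
        return s;
    }
    void give_back(SliceView s) { _free.push_front(s); }

private:
    std::unique_ptr<char[]> _slab; // backing memory, freed once at shutdown
    std::list<SliceView> _free;    // fixed-size views into the slab
};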
@@ -33,7 +33,7 @@
 namespace doris {
 namespace io {

-// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer 3. decouple reserved memory and Callbacks
+// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
 struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;

@@ -41,14 +41,13 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     ~S3FileBuffer() = default;

     void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
-        _buf = std::move(other->_buf);
-        other->_buf = nullptr;
+        _buf = other->_buf;
+        // we should clear other's memory buffer in case it would be reclaimed twice
+        // when calling on_finished
+        other->_buf.clear();
     }

-    void reserve_buffer() {
-        _buf = std::make_unique<std::string>();
-        _buf->resize(config::s3_write_buffer_size);
-    }
+    void reserve_buffer(Slice s) { _buf = s; }

     // append data into the memory buffer inside or into the file cache
     // if the buffer has no memory buffer
@@ -109,7 +108,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     size_t _size;
     std::shared_ptr<std::iostream> _stream_ptr;
     // only served as one reserved buffer
-    std::unique_ptr<std::string> _buf;
+    Slice _buf;
     size_t _append_offset {0};
 };

@@ -123,9 +122,9 @@ public:
         return &_pool;
     }

-    void reclaim(std::shared_ptr<S3FileBuffer> buf) {
+    void reclaim(Slice buf) {
         std::unique_lock<std::mutex> lck {_lock};
-        _free_buffers.emplace_front(std::move(buf));
+        _free_raw_buffers.emplace_front(buf);
         _cv.notify_all();
     }

@@ -134,7 +133,8 @@ public:
 private:
     std::mutex _lock;
     std::condition_variable _cv;
-    std::list<std::shared_ptr<S3FileBuffer>> _free_buffers;
+    std::unique_ptr<char[]> _whole_mem_buffer;
+    std::list<Slice> _free_raw_buffers;
 };
 } // namespace io
 } // namespace doris
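reclaim() and the blocking branch of allocate() in the header above form a classic condition-variable hand-off over the Slice free list: writers that require reserved memory sleep until some buffer finishes its upload and returns its Slice. A generic sketch of that hand-off (illustrative, not the Doris API):

#include <condition_variable>
#include <list>
#include <mutex>

template <typename T>
class BlockingFreeList {
public:
    // Called by allocate(reserve = true): sleeps until an item is free.
    T take_blocking() {
        std::unique_lock<std::mutex> lck(_lock);
        _cv.wait(lck, [this] { return !_free.empty(); });
        T item = _free.front();
        _free.pop_front();
        return item;
    }
    // Called once an upload completes and its buffer is returned.
    void reclaim(T item) {
        std::unique_lock<std::mutex> lck(_lock);
        _free.push_front(item);
        // notify_all so every blocked writer rechecks the predicate;
        // pushing and notifying under the same mutex avoids lost wakeups.
        _cv.notify_all();
    }

private:
    std::mutex _lock;
    std::condition_variable _cv;
    std::list<T> _free;
};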
@@ -87,11 +87,15 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
 }

 S3FileWriter::~S3FileWriter() {
-    if (_opened) {
-        close();
+    if (!_closed) {
+        // if we don't abort multi part upload, the uploaded part in object
+        // store will not automatically reclaim itself, it would cost more money
+        abort();
     }
-    CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
+    CHECK(_closed) << ", closed: " << _closed;
+    // in case there are task which might run after this object is destroyed
+    // for example, if the whole task failed and some task are still pending
+    // in threadpool
     _wait_until_finish("dtor");
     s3_file_being_written << -1;
 }
@@ -121,9 +125,10 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
 }

 Status S3FileWriter::abort() {
     // make all pending work early quits
     _failed = true;
-    if (_closed || !_opened) {
+    _wait_until_finish("Abort");
+    _closed = true;
+    if (_aborted) {
         return Status::OK();
     }
     // we need to reclaim the memory
@@ -137,7 +142,6 @@ Status S3FileWriter::abort() {
     }
     VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
-    _wait_until_finish("Abort");
-    _closed = true;
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
     auto outcome = _client->AbortMultipartUpload(request);
@@ -147,6 +151,7 @@ Status S3FileWriter::abort() {
         LOG(INFO) << "Abort multipart upload successfully"
                   << "bucket=" << _bucket << ", key=" << _path.native()
                   << ", upload_id=" << _upload_id;
+        _aborted = true;
         return Status::OK();
     }
     return Status::IOError("failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}",
@@ -154,11 +159,17 @@ Status S3FileWriter::abort() {
 }

 Status S3FileWriter::close() {
-    if (_closed || _failed) {
+    if (_closed) {
         _wait_until_finish("close");
         return _st;
     }
+    _closed = true;
+    if (_failed) {
+        abort();
+        return _st;
+    }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
     // it might be one file less than 5MB, we do upload here
     if (_pending_buf != nullptr) {
         if (_upload_id.empty()) {
             _pending_buf->set_upload_remote_callback(
@@ -169,18 +180,11 @@ Status S3FileWriter::close() {
         _pending_buf = nullptr;
     }
     RETURN_IF_ERROR(_complete());
-    _closed = true;

     return Status::OK();
 }

 Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
-    // lazy open
-    if (!_opened) {
-        VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
-        _closed = false;
-        _opened = true;
-    }
     DCHECK(!_closed);
     size_t buffer_size = config::s3_write_buffer_size;
     SCOPED_RAW_TIMER(_upload_cost_ms.get());
@@ -339,7 +343,7 @@ Status S3FileWriter::finalize() {
 }

 void S3FileWriter::_put_object(S3FileBuffer& buf) {
-    DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
+    DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
     request.SetBody(buf.get_stream());
@@ -351,6 +355,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
                          response.GetError().GetExceptionName(),
                          response.GetError().GetMessage(),
                          static_cast<int>(response.GetError().GetResponseCode()));
+        buf._on_failed(_st);
+        LOG(WARNING) << _st;
     }
     s3_bytes_written_total << buf.get_size();
     s3_file_created_total << 1;
@@ -109,7 +109,7 @@ private:
     std::string _bucket;
     std::string _key;
     bool _closed = true;
-    bool _opened = false;
+    bool _aborted = false;

     std::unique_ptr<int64_t> _upload_cost_ms;
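Taken together, the writer hunks replace the old _opened/_closed bookkeeping with a small state machine: close() is idempotent and diverts failed writers to abort(), while abort() makes pending work quit early and is itself idempotent via _aborted. A simplified sketch of that flow, omitting the real task-waiting and AWS calls:

#include <atomic>

// Illustrative state flags mirroring _failed/_closed/_aborted above.
struct UploadState {
    std::atomic<bool> failed {false};
    std::atomic<bool> closed {false};
    std::atomic<bool> aborted {false};
};

void do_abort(UploadState& s) {
    s.failed = true;       // make pending tasks quit early
    s.closed = true;
    if (s.aborted) return; // abort() stays idempotent
    // ... AbortMultipartUpload would happen here ...
    s.aborted = true;
}

bool do_close(UploadState& s) {
    if (s.closed) return true; // repeated close() is a no-op
    s.closed = true;
    if (s.failed) {
        do_abort(s); // a failed writer must not complete the upload
        return false;
    }
    // ... CompleteMultipartUpload would happen here ...
    return true;
}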