[enhance](FileWriter)enhance s3 file writer bvar to avoid adding abort bytes (#20138)
* don't add each time upload or it would add aborted bytes * alloca memory
This commit is contained in:
@ -69,6 +69,7 @@ S3FileBufferPool::S3FileBufferPool() {
|
||||
DCHECK((config::s3_write_buffer_size >= 5 * 1024 * 1024) &&
|
||||
(config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
|
||||
LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
|
||||
_whole_mem_buffer = std::make_unique<char[]>(config::s3_write_buffer_whole_size);
|
||||
for (size_t i = 0; i < buf_num; i++) {
|
||||
Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
|
||||
static_cast<size_t>(config::s3_write_buffer_size)};
|
||||
|
||||
@ -46,6 +46,7 @@
|
||||
#include "io/fs/file_writer.h"
|
||||
#include "io/fs/path.h"
|
||||
#include "io/fs/s3_file_write_bufferpool.h"
|
||||
#include "util/defer_op.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/runtime_profile.h"
|
||||
|
||||
@ -87,11 +88,13 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
|
||||
}
|
||||
|
||||
S3FileWriter::~S3FileWriter() {
|
||||
if (!_closed) {
|
||||
if (!_closed || _failed) {
|
||||
// if we don't abort multi part upload, the uploaded part in object
|
||||
// store will not automatically reclaim itself, it would cost more money
|
||||
abort();
|
||||
_bytes_written = 0;
|
||||
}
|
||||
s3_bytes_written_total << _bytes_written;
|
||||
CHECK(_closed) << ", closed: " << _closed;
|
||||
// in case there are task which might run after this object is destroyed
|
||||
// for example, if the whole task failed and some task are still pending
|
||||
@ -163,7 +166,7 @@ Status S3FileWriter::close() {
|
||||
_wait_until_finish("close");
|
||||
return _st;
|
||||
}
|
||||
_closed = true;
|
||||
Defer defer {[&]() { _closed = true; }};
|
||||
if (_failed) {
|
||||
abort();
|
||||
return _st;
|
||||
@ -358,7 +361,7 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
|
||||
buf._on_failed(_st);
|
||||
LOG(WARNING) << _st;
|
||||
}
|
||||
s3_bytes_written_total << buf.get_size();
|
||||
_bytes_written += buf.get_size();
|
||||
s3_file_created_total << 1;
|
||||
}
|
||||
|
||||
|
||||
@ -108,7 +108,7 @@ private:
|
||||
|
||||
std::string _bucket;
|
||||
std::string _key;
|
||||
bool _closed = true;
|
||||
bool _closed = false;
|
||||
bool _aborted = false;
|
||||
|
||||
std::unique_ptr<int64_t> _upload_cost_ms;
|
||||
|
||||
Reference in New Issue
Block a user