[bug](s3) Fix object data is overwritten by empty object (#32258)
This commit is contained in:
@ -89,7 +89,7 @@ Status localfs_error(const std::error_code& ec, std::string_view msg) {
|
||||
} else if (ec == std::errc::permission_denied) {
|
||||
return Status::Error<PERMISSION_DENIED, false>(msg);
|
||||
} else {
|
||||
return Status::Error<doris::INTERNAL_ERROR, false>("{}: {}", msg, ec.message());
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("{}: {}", msg, ec.message());
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,8 +106,8 @@ Status localfs_error(int posix_errno, std::string_view msg) {
|
||||
case EACCES:
|
||||
return Status::Error<PERMISSION_DENIED, false>(msg);
|
||||
default:
|
||||
return Status::Error<doris::INTERNAL_ERROR, false>("{}: {}", msg,
|
||||
std::strerror(posix_errno));
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("{}: {}", msg,
|
||||
std::strerror(posix_errno));
|
||||
}
|
||||
}
|
||||
|
||||
@ -122,7 +122,7 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) {
|
||||
err.GetExceptionName(), err.GetMessage(),
|
||||
err.GetErrorType());
|
||||
default:
|
||||
return Status::Error<doris::INTERNAL_ERROR, false>(
|
||||
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
|
||||
"{}: {} {} code={} type={}", msg, err.GetExceptionName(), err.GetMessage(),
|
||||
err.GetResponseCode(), err.GetErrorType());
|
||||
}
|
||||
|
||||
@ -107,9 +107,8 @@ S3FileWriter::~S3FileWriter() {
|
||||
// if we don't abort multi part upload, the uploaded part in object
|
||||
// store will not automatically reclaim itself, it would cost more money
|
||||
static_cast<void>(_abort());
|
||||
_bytes_written = 0;
|
||||
}
|
||||
s3_bytes_written_total << _bytes_written;
|
||||
s3_bytes_written_total << _bytes_appended;
|
||||
s3_file_being_written << -1;
|
||||
}
|
||||
|
||||
@ -207,8 +206,10 @@ Status S3FileWriter::close() {
|
||||
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
|
||||
DCHECK(buf != nullptr);
|
||||
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
|
||||
} else if (_create_empty_file) {
|
||||
// if there is no pending buffer, we need to create an empty file
|
||||
}
|
||||
|
||||
if (_bytes_appended == 0 && _create_empty_file) {
|
||||
// No data written, but need to create an empty file
|
||||
auto builder = FileBufferBuilder();
|
||||
builder.set_type(BufferType::UPLOAD)
|
||||
.set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); })
|
||||
@ -390,7 +391,6 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
|
||||
|
||||
std::unique_lock<std::mutex> lck {_completed_lock};
|
||||
_completed_parts.emplace_back(std::move(completed_part));
|
||||
_bytes_written += buf.get_size();
|
||||
}
|
||||
|
||||
Status S3FileWriter::_complete() {
|
||||
@ -501,7 +501,6 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
|
||||
buf.set_status(_st);
|
||||
return;
|
||||
}
|
||||
_bytes_written += buf.get_size();
|
||||
s3_file_created_total << 1;
|
||||
}
|
||||
|
||||
|
||||
@ -79,7 +79,6 @@ private:
|
||||
|
||||
std::atomic_bool _failed = false;
|
||||
Status _st;
|
||||
size_t _bytes_written = 0;
|
||||
|
||||
std::shared_ptr<FileBuffer> _pending_buf;
|
||||
int64_t _expiration_time;
|
||||
|
||||
Reference in New Issue
Block a user