[Feature](hive-writer) Implements s3 file committer. (#34307)

Backport #33937.
Qi Chen authored; committed by GitHub on 2024-04-29 19:56:49 +08:00
parent 1bfe0f0393
commit 7cb00a8e54
35 changed files with 988 additions and 196 deletions

be/src/io/file_factory.cpp

@@ -67,7 +67,8 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset,
std::unique_ptr<io::FileWriter>& file_writer) {
std::unique_ptr<io::FileWriter>& file_writer,
const io::FileWriterOptions* opts) {
switch (type) {
case TFileType::FILE_LOCAL: {
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &file_writer));
@@ -76,7 +77,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
case TFileType::FILE_BROKER: {
std::shared_ptr<io::BrokerFileSystem> fs;
RETURN_IF_ERROR(io::BrokerFileSystem::create(broker_addresses[0], properties, &fs));
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
break;
}
case TFileType::FILE_S3: {
@@ -87,7 +88,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf));
std::shared_ptr<io::S3FileSystem> fs;
RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &fs));
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
break;
}
case TFileType::FILE_HDFS: {
@@ -95,7 +96,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(
io::HdfsFileSystem::create(hdfs_params, "", hdfs_params.fs_name, nullptr, &fs));
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
break;
}
default:

be/src/io/file_factory.h

@@ -28,6 +28,7 @@
#include "common/factory_creator.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
namespace doris {
namespace io {
@@ -73,7 +74,8 @@ public:
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset,
std::unique_ptr<io::FileWriter>& file_writer);
std::unique_ptr<io::FileWriter>& file_writer,
const io::FileWriterOptions* opts = nullptr);
/// Create FileReader
static Status create_file_reader(const io::FileSystemProperties& system_properties,

be/src/io/fs/file_writer.h

@@ -31,6 +31,14 @@ class FileSystem;
// Only affects remote file writers
struct FileWriterOptions {
// The S3 committer starts the multipart uploads of all files on the BE side,
// and the uploads are then completed on the FE side.
// A file whose multipart upload has not been completed is not visible,
// so the atomicity of a single file is guaranteed; the atomicity of
// multiple files still cannot be guaranteed.
// Because Hive committers have best-effort semantics, this only shortens
// the window of inconsistency.
bool used_by_s3_committer = false;
bool write_file_cache = false;
bool is_cold_data = false;
bool sync_file_data = true; // Whether to flush data into the storage system
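
For callers, the whole opt-in is this new flag threaded through FileFactory::create_file_writer(). A minimal sketch of that usage, mirroring what VHivePartitionWriter::open() does further down in this diff; the helper name and its arguments are placeholders, not code from the PR:

#include <map>
#include <memory>
#include <string>

#include "common/status.h"
#include "io/file_factory.h"
#include "io/fs/file_writer.h"

namespace doris {

// Sketch only: open an S3 writer whose multipart upload is left open for the committer.
Status open_committed_s3_writer(ExecEnv* env, const std::map<std::string, std::string>& props,
                                const std::string& path,
                                std::unique_ptr<io::FileWriter>* writer) {
    io::FileWriterOptions opts = {.used_by_s3_committer = true};
    RETURN_IF_ERROR(FileFactory::create_file_writer(TFileType::FILE_S3, env,
                                                    /*broker_addresses=*/{}, props, path,
                                                    /*start_offset=*/0, *writer, &opts));
    // After close(), the S3FileWriter exposes bucket()/key()/upload_id()/completed_parts(),
    // which is exactly the state the committer needs to complete (or abort) the upload.
    return Status::OK();
}

} // namespace doris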

be/src/io/fs/s3_file_writer.cpp

@@ -89,7 +89,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr<S3FileSystem> fs,
_cache(nullptr),
_expiration_time(opts ? opts->file_cache_expiration : 0),
_is_cold_data(opts ? opts->is_cold_data : true),
_write_file_cache(opts ? opts->write_file_cache : false) {
_write_file_cache(opts ? opts->write_file_cache : false),
_used_by_s3_committer(opts ? opts->used_by_s3_committer : false) {
s3_file_writer_total << 1;
s3_file_being_written << 1;
@@ -203,10 +204,7 @@ Status S3FileWriter::close() {
if (_upload_id.empty()) {
if (_pending_buf != nullptr) {
// it might be one file less than 5MB, we do upload here
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
if (_bytes_appended == 0 && _create_empty_file) {
@@ -234,6 +232,13 @@ Status S3FileWriter::close() {
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
}
}
}
if (_pending_buf != nullptr) {
@@ -404,56 +409,61 @@ Status S3FileWriter::_complete() {
_wait_until_finish("PutObject");
return _st;
}
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
// Wait for the multipart uploads to finish.
_wait_until_finish("Complete");
DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, complete parts {}, cur part num {}, whole parts {}", _st,
_completed_parts.size(), _cur_part_num, _dump_completed_part());
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are in ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
{ _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); });
CompletedMultipartUpload completed_upload;
for (size_t i = 0; i < _completed_parts.size(); i++) {
if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
auto st = Status::InternalError(
"error status {}, part num not continous, expected num {}, actual num {}, "
"whole parts {}",
_st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part());
LOG(WARNING) << st;
_st = st;
return st;
if (!_used_by_s3_committer) { // The S3 committer completes the multipart uploads on the FE side.
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, complete parts {}, cur part num {}, whole parts {}", _st,
_completed_parts.size(), _cur_part_num, _dump_completed_part());
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are in ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
{ _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); });
CompletedMultipartUpload completed_upload;
for (size_t i = 0; i < _completed_parts.size(); i++) {
if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
auto st = Status::InternalError(
"error status {}, part num not continous, expected num {}, actual num {}, "
"whole parts {}",
_st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part());
LOG(WARNING) << st;
_st = st;
return st;
}
completed_upload.AddParts(*_completed_parts[i]);
}
completed_upload.AddParts(*_completed_parts[i]);
}
complete_request.WithMultipartUpload(completed_upload);
complete_request.WithMultipartUpload(completed_upload);
DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
auto s = Status::IOError(
"failed to create complete multi part upload (bucket={}, key={}): injected error",
_bucket, _path.native());
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
auto s = Status::IOError(
"failed to create complete multi part upload (bucket={}, key={}): injected "
"error",
_bucket, _path.native());
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(
complete_outcome.GetError(),
fmt::format("failed to complete multi part upload {}, upload_id={}, whole parts={}",
if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(
complete_outcome.GetError(),
fmt::format(
"failed to complete multi part upload {}, upload_id={}, whole parts={}",
_path.native(), _upload_id, _dump_completed_part()));
LOG(WARNING) << _st;
return _st;
LOG(WARNING) << _st;
return _st;
}
}
s3_file_created_total << 1;
return Status::OK();
@@ -466,12 +476,8 @@ Status S3FileWriter::finalize() {
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
if (_upload_id.empty()) {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
@@ -481,6 +487,24 @@ Status S3FileWriter::finalize() {
return _st;
}
Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
// If used_by_s3_committer is set, we always use multipart upload.
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
} else {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
}
return Status::OK();
}
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
DCHECK(!_closed) << "closed " << _closed;
Aws::S3::Model::PutObjectRequest request;

be/src/io/fs/s3_file_writer.h

@@ -51,12 +51,21 @@ public:
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& completed_parts() const {
return _completed_parts;
}
const std::string& key() const { return _key; }
const std::string& bucket() const { return _bucket; }
const std::string& upload_id() const { return _upload_id; }
private:
Status _abort();
[[nodiscard]] std::string _dump_completed_part() const;
void _wait_until_finish(std::string_view task_name);
Status _complete();
Status _create_multi_upload_request();
Status _set_upload_to_remote_less_than_buffer_size();
void _put_object(UploadFileBuffer& buf);
void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
@@ -85,6 +94,14 @@ private:
int64_t _expiration_time;
bool _is_cold_data;
bool _write_file_cache;
// The S3 committer starts the multipart uploads of all files on the BE side,
// and the uploads are then completed on the FE side.
// A file whose multipart upload has not been completed is not visible,
// so the atomicity of a single file is guaranteed; the atomicity of
// multiple files still cannot be guaranteed.
// Because Hive committers have best-effort semantics, this only shortens
// the window of inconsistency.
bool _used_by_s3_committer;
};
} // namespace io

be/src/vec/sink/writer/vhive_partition_writer.cpp

@@ -17,8 +17,11 @@
#include "vhive_partition_writer.h"
#include <aws/s3/model/CompletedPart.h>
#include "io/file_factory.h"
#include "io/fs/file_system.h"
#include "io/fs/s3_file_writer.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column_map.h"
#include "vec/core/materialize_block.h"
@@ -54,10 +57,11 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
_state = state;
std::vector<TNetworkAddress> broker_addresses;
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(FileFactory::create_file_writer(
_write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()), 0,
_file_writer));
fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()), 0, _file_writer,
&file_writer_options));
std::vector<std::string> column_names;
column_names.reserve(_columns.size());
@@ -191,12 +195,28 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
hive_partition_update.__set_name(_partition_name);
hive_partition_update.__set_update_mode(_update_mode);
THiveLocationParams location;
location.__set_write_path(_write_info.write_path);
location.__set_write_path(_write_info.original_write_path);
location.__set_target_path(_write_info.target_path);
hive_partition_update.__set_location(location);
hive_partition_update.__set_file_names({_get_target_file_name()});
hive_partition_update.__set_row_count(_row_count);
hive_partition_update.__set_file_size(_input_size_in_bytes);
if (_write_info.file_type == TFileType::FILE_S3) {
doris::io::S3FileWriter* s3_mpu_file_writer =
dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
TS3MPUPendingUpload s3_mpu_pending_upload;
s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());
std::map<int, std::string> etags;
for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
etags.insert({completed_part->GetPartNumber(), completed_part->GetETag()});
}
s3_mpu_pending_upload.__set_etags(etags);
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
}
return hive_partition_update;
}
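
The partition update above only records the pending multipart upload; per the comments in file_writer.h and s3_file_writer.h, the CompleteMultipartUpload call itself is issued on the FE side when the write is committed. A rough sketch of that final step, written with the same AWS C++ SDK types purely for illustration; the FE implementation is not part of this diff, and the function below is hypothetical (bucket, key, upload_id and etags map onto the TS3MPUPendingUpload fields built above):

#include <aws/s3/S3Client.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>

#include <map>
#include <string>

// Sketch only: complete one pending multipart upload recorded in a TS3MPUPendingUpload.
bool complete_pending_upload(Aws::S3::S3Client& client, const std::string& bucket,
                             const std::string& key, const std::string& upload_id,
                             const std::map<int, std::string>& etags) {
    Aws::S3::Model::CompletedMultipartUpload upload;
    for (const auto& [part_num, etag] : etags) {
        upload.AddParts(
                Aws::S3::Model::CompletedPart().WithPartNumber(part_num).WithETag(etag.c_str()));
    }
    Aws::S3::Model::CompleteMultipartUploadRequest request;
    request.WithBucket(bucket.c_str()).WithKey(key.c_str()).WithUploadId(upload_id.c_str());
    request.WithMultipartUpload(upload);
    // Only after this call succeeds does the object become visible in the bucket.
    return client.CompleteMultipartUpload(request).IsSuccess();
}

Until that completion succeeds the object never appears in the bucket, which is the single-file atomicity the new option documents; a failed write can instead discard the staged parts with AbortMultipartUpload using the same bucket, key, and upload id.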

be/src/vec/sink/writer/vhive_partition_writer.h

@@ -39,6 +39,7 @@ class VHivePartitionWriter {
public:
struct WriteInfo {
std::string write_path;
std::string original_write_path;
std::string target_path;
TFileType::type file_type;
};

be/src/vec/sink/writer/vhive_table_writer.cpp

@@ -256,26 +256,30 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (existing_table == false) { // new table
update_mode = TUpdateMode::NEW;
if (_partition_columns_input_index.empty()) { // new unpartitioned table
write_info = {write_location.write_path, write_location.target_path,
write_location.file_type};
write_info = {write_location.write_path, write_location.original_write_path,
write_location.target_path, write_location.file_type};
} else { // a new partition in a new partitioned table
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
}
} else { // a new partition in an existing partitioned table, or an existing unpartitioned table
if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
update_mode =
!hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
write_info = {write_location.write_path, write_location.target_path,
write_location.file_type};
write_info = {write_location.write_path, write_location.original_write_path,
write_location.target_path, write_location.file_type};
} else { // a new partition in an existing partitioned table
update_mode = TUpdateMode::NEW;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
}
// need to get schema from existing table ?
}
@@ -285,16 +289,21 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (!hive_table_sink.overwrite) {
update_mode = TUpdateMode::APPEND;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}", existing_partition->location.target_path);
write_info = {std::move(write_path), std::move(target_path),
existing_partition->location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), existing_partition->location.file_type};
file_format_type = existing_partition->file_format;
write_compress_type = hive_table_sink.compression_type;
} else {
update_mode = TUpdateMode::OVERWRITE;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path), write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
file_format_type = hive_table_sink.file_format;
write_compress_type = hive_table_sink.compression_type;
// need to get schema from existing table ?