[Feature][Enhancement](hive-writer) Add hive-writer runtime profiles, change output file names (#33245)
Issue Number: #31442 - Add hive-writer runtime profiles. - Change output file names to `${query_id}${uuid}-${index}.${compression}.${format}`. e.g. `"d8735c6fa444a6d-acd392981e510c2b_34fbdcbb-b2e1-4f2c-b68c-a384238954a9-0.snappy.parquet"`. For the same partition writer, when the file size exceeds `hive_sink_max_file_size`, the currently written file will be closed and a new file will be generated, in which ${index} in the new file name will be incremented, while the rest will be the same .
This commit is contained in:
@ -1268,7 +1268,7 @@ DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_thresh
|
||||
DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);
|
||||
|
||||
/** Hive sink configurations **/
|
||||
DECLARE_mInt64(hive_sink_max_file_size); // 1GB
|
||||
DECLARE_mInt64(hive_sink_max_file_size);
|
||||
|
||||
// Number of open tries, default 1 means only try to open once.
|
||||
// Retry the Open num_retries time waiting 100 milliseconds between retries.
|
||||
|
||||
@ -32,8 +32,8 @@ VHivePartitionWriter::VHivePartitionWriter(
|
||||
const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
|
||||
const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
|
||||
const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
|
||||
WriteInfo write_info, std::string file_name, TFileFormatType::type file_format_type,
|
||||
TFileCompressType::type hive_compress_type,
|
||||
WriteInfo write_info, std::string file_name, int file_name_index,
|
||||
TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type,
|
||||
const std::map<std::string, std::string>& hadoop_conf)
|
||||
: _partition_name(std::move(partition_name)),
|
||||
_update_mode(update_mode),
|
||||
@ -43,6 +43,7 @@ VHivePartitionWriter::VHivePartitionWriter(
|
||||
_columns(columns),
|
||||
_write_info(std::move(write_info)),
|
||||
_file_name(std::move(file_name)),
|
||||
_file_name_index(file_name_index),
|
||||
_file_format_type(file_format_type),
|
||||
_hive_compress_type(hive_compress_type),
|
||||
_hadoop_conf(hadoop_conf)
|
||||
@ -55,12 +56,14 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
|
||||
std::vector<TNetworkAddress> broker_addresses;
|
||||
RETURN_IF_ERROR(FileFactory::create_file_writer(
|
||||
_write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
|
||||
fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer));
|
||||
fmt::format("{}/{}", _write_info.write_path, _file_name, _file_name_index,
|
||||
_get_file_extension(_file_format_type, _hive_compress_type)),
|
||||
0, _file_writer));
|
||||
|
||||
std::vector<std::string> column_names;
|
||||
column_names.reserve(_columns.size());
|
||||
for (int i = 0; i < _columns.size(); i++) {
|
||||
column_names.push_back(_columns[i].name);
|
||||
column_names.emplace_back(_columns[i].name);
|
||||
}
|
||||
|
||||
switch (_file_format_type) {
|
||||
@ -192,11 +195,53 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
|
||||
location.__set_write_path(_write_info.write_path);
|
||||
location.__set_target_path(_write_info.target_path);
|
||||
hive_partition_update.__set_location(location);
|
||||
hive_partition_update.__set_file_names({_file_name});
|
||||
hive_partition_update.__set_file_names(
|
||||
{fmt::format("{}-{}{}", _file_name, _file_name_index,
|
||||
_get_file_extension(_file_format_type, _hive_compress_type))});
|
||||
hive_partition_update.__set_row_count(_row_count);
|
||||
hive_partition_update.__set_file_size(_input_size_in_bytes);
|
||||
return hive_partition_update;
|
||||
}
|
||||
|
||||
std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type file_format_type,
|
||||
TFileCompressType::type write_compress_type) {
|
||||
std::string compress_name;
|
||||
switch (write_compress_type) {
|
||||
case TFileCompressType::SNAPPYBLOCK: {
|
||||
compress_name = ".snappy";
|
||||
break;
|
||||
}
|
||||
case TFileCompressType::ZLIB: {
|
||||
compress_name = ".zlib";
|
||||
break;
|
||||
}
|
||||
case TFileCompressType::ZSTD: {
|
||||
compress_name = ".zstd";
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
compress_name = "";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
std::string file_format_name;
|
||||
switch (file_format_type) {
|
||||
case TFileFormatType::FORMAT_PARQUET: {
|
||||
file_format_name = ".parquet";
|
||||
break;
|
||||
}
|
||||
case TFileFormatType::FORMAT_ORC: {
|
||||
file_format_name = ".orc";
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
file_format_name = "";
|
||||
break;
|
||||
}
|
||||
}
|
||||
return fmt::format("{}{}", compress_name, file_format_name);
|
||||
}
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
@ -48,7 +48,8 @@ public:
|
||||
const VExprContextSPtrs& write_output_expr_ctxs,
|
||||
const std::set<size_t>& non_write_columns_indices,
|
||||
const std::vector<THiveColumn>& columns, WriteInfo write_info,
|
||||
const std::string file_name, TFileFormatType::type file_format_type,
|
||||
std::string file_name, int file_name_index,
|
||||
TFileFormatType::type file_format_type,
|
||||
TFileCompressType::type hive_compress_type,
|
||||
const std::map<std::string, std::string>& hadoop_conf);
|
||||
|
||||
@ -60,6 +61,10 @@ public:
|
||||
|
||||
Status close(const Status& status);
|
||||
|
||||
inline const std::string& file_name() const { return _file_name; }
|
||||
|
||||
inline int file_name_index() const { return _file_name_index; }
|
||||
|
||||
inline size_t written_len() { return _file_format_transformer->written_len(); }
|
||||
|
||||
private:
|
||||
@ -69,6 +74,9 @@ private:
|
||||
|
||||
THivePartitionUpdate _build_partition_update();
|
||||
|
||||
std::string _get_file_extension(TFileFormatType::type file_format_type,
|
||||
TFileCompressType::type write_compress_type);
|
||||
|
||||
std::string _path;
|
||||
|
||||
std::string _partition_name;
|
||||
@ -85,6 +93,7 @@ private:
|
||||
const std::vector<THiveColumn>& _columns;
|
||||
WriteInfo _write_info;
|
||||
std::string _file_name;
|
||||
int _file_name_index;
|
||||
TFileFormatType::type _file_format_type;
|
||||
TFileCompressType::type _hive_compress_type;
|
||||
const std::map<std::string, std::string>& _hadoop_conf;
|
||||
|
||||
@ -42,6 +42,19 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
|
||||
_state = state;
|
||||
_profile = profile;
|
||||
|
||||
// add all counter
|
||||
_written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT);
|
||||
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
|
||||
_partition_writers_dispatch_timer =
|
||||
ADD_CHILD_TIMER(_profile, "PartitionsDispatchTime", "SendDataTime");
|
||||
_partition_writers_write_timer =
|
||||
ADD_CHILD_TIMER(_profile, "PartitionsWriteTime", "SendDataTime");
|
||||
_partition_writers_count = ADD_COUNTER(_profile, "PartitionsWriteCount", TUnit::UNIT);
|
||||
_open_timer = ADD_TIMER(_profile, "OpenTime");
|
||||
_close_timer = ADD_TIMER(_profile, "CloseTime");
|
||||
_write_file_counter = ADD_COUNTER(_profile, "WriteFileCount", TUnit::UNIT);
|
||||
|
||||
SCOPED_TIMER(_open_timer);
|
||||
for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
|
||||
switch (_t_sink.hive_table_sink.columns[i].column_type) {
|
||||
case THiveColumnType::PARTITION_KEY: {
|
||||
@ -68,94 +81,120 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
|
||||
}
|
||||
|
||||
Status VHiveTableWriter::write(vectorized::Block& block) {
|
||||
SCOPED_RAW_TIMER(&_send_data_ns);
|
||||
std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
|
||||
|
||||
_row_count += block.rows();
|
||||
auto& hive_table_sink = _t_sink.hive_table_sink;
|
||||
|
||||
if (_partition_columns_input_index.empty()) {
|
||||
auto writer_iter = _partitions_to_writers.find("");
|
||||
if (writer_iter == _partitions_to_writers.end()) {
|
||||
try {
|
||||
std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1);
|
||||
_partitions_to_writers.insert({"", writer});
|
||||
RETURN_IF_ERROR(writer->open(_state, _profile));
|
||||
RETURN_IF_ERROR(writer->write(block));
|
||||
} catch (doris::Exception& e) {
|
||||
return e.to_status();
|
||||
}
|
||||
return Status::OK();
|
||||
} else {
|
||||
std::shared_ptr<VHivePartitionWriter> writer;
|
||||
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
|
||||
static_cast<void>(writer_iter->second->close(Status::OK()));
|
||||
_partitions_to_writers.erase(writer_iter);
|
||||
std::shared_ptr<VHivePartitionWriter> writer;
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
|
||||
auto writer_iter = _partitions_to_writers.find("");
|
||||
if (writer_iter == _partitions_to_writers.end()) {
|
||||
try {
|
||||
writer = _create_partition_writer(block, -1);
|
||||
_partitions_to_writers.insert({"", writer});
|
||||
RETURN_IF_ERROR(writer->open(_state, _profile));
|
||||
} catch (doris::Exception& e) {
|
||||
return e.to_status();
|
||||
}
|
||||
_partitions_to_writers.insert({"", writer});
|
||||
RETURN_IF_ERROR(writer->open(_state, _profile));
|
||||
} else {
|
||||
writer = writer_iter->second;
|
||||
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
|
||||
std::string file_name(writer_iter->second->file_name());
|
||||
int file_name_index = writer_iter->second->file_name_index();
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_close_ns);
|
||||
static_cast<void>(writer_iter->second->close(Status::OK()));
|
||||
}
|
||||
_partitions_to_writers.erase(writer_iter);
|
||||
try {
|
||||
writer = _create_partition_writer(block, -1, &file_name,
|
||||
file_name_index + 1);
|
||||
} catch (doris::Exception& e) {
|
||||
return e.to_status();
|
||||
}
|
||||
_partitions_to_writers.insert({"", writer});
|
||||
RETURN_IF_ERROR(writer->open(_state, _profile));
|
||||
} else {
|
||||
writer = writer_iter->second;
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(writer->write(block));
|
||||
return Status::OK();
|
||||
}
|
||||
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
|
||||
RETURN_IF_ERROR(writer->write(block));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
for (int i = 0; i < block.rows(); ++i) {
|
||||
std::vector<std::string> partition_values;
|
||||
try {
|
||||
partition_values = _create_partition_values(block, i);
|
||||
} catch (doris::Exception& e) {
|
||||
return e.to_status();
|
||||
}
|
||||
std::string partition_name = VHiveUtils::make_partition_name(
|
||||
hive_table_sink.columns, _partition_columns_input_index, partition_values);
|
||||
|
||||
auto create_and_open_writer =
|
||||
[&](const std::string& partition_name, int position,
|
||||
std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
|
||||
for (int i = 0; i < block.rows(); ++i) {
|
||||
std::vector<std::string> partition_values;
|
||||
try {
|
||||
auto writer = _create_partition_writer(block, position);
|
||||
RETURN_IF_ERROR(writer->open(_state, _profile));
|
||||
IColumn::Filter filter(block.rows(), 0);
|
||||
filter[position] = 1;
|
||||
writer_positions.insert({writer, std::move(filter)});
|
||||
_partitions_to_writers.insert({partition_name, writer});
|
||||
writer_ptr = writer;
|
||||
partition_values = _create_partition_values(block, i);
|
||||
} catch (doris::Exception& e) {
|
||||
return e.to_status();
|
||||
}
|
||||
return Status::OK();
|
||||
};
|
||||
std::string partition_name = VHiveUtils::make_partition_name(
|
||||
hive_table_sink.columns, _partition_columns_input_index, partition_values);
|
||||
|
||||
auto writer_iter = _partitions_to_writers.find(partition_name);
|
||||
if (writer_iter == _partitions_to_writers.end()) {
|
||||
std::shared_ptr<VHivePartitionWriter> writer;
|
||||
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
|
||||
} else {
|
||||
std::shared_ptr<VHivePartitionWriter> writer;
|
||||
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
|
||||
static_cast<void>(writer_iter->second->close(Status::OK()));
|
||||
writer_positions.erase(writer_iter->second);
|
||||
_partitions_to_writers.erase(writer_iter);
|
||||
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
|
||||
auto create_and_open_writer =
|
||||
[&](const std::string& partition_name, int position,
|
||||
const std::string* file_name, int file_name_index,
|
||||
std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
|
||||
try {
|
||||
auto writer =
|
||||
_create_partition_writer(block, position, file_name, file_name_index);
|
||||
RETURN_IF_ERROR(writer->open(_state, _profile));
|
||||
IColumn::Filter filter(block.rows(), 0);
|
||||
filter[position] = 1;
|
||||
writer_positions.insert({writer, std::move(filter)});
|
||||
_partitions_to_writers.insert({partition_name, writer});
|
||||
writer_ptr = writer;
|
||||
} catch (doris::Exception& e) {
|
||||
return e.to_status();
|
||||
}
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
auto writer_iter = _partitions_to_writers.find(partition_name);
|
||||
if (writer_iter == _partitions_to_writers.end()) {
|
||||
std::shared_ptr<VHivePartitionWriter> writer;
|
||||
if (_partitions_to_writers.size() + 1 >
|
||||
config::table_sink_partition_write_max_partition_nums_per_writer) {
|
||||
return Status::InternalError(
|
||||
"Too many open partitions {}",
|
||||
config::table_sink_partition_write_max_partition_nums_per_writer);
|
||||
}
|
||||
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
|
||||
} else {
|
||||
writer = writer_iter->second;
|
||||
}
|
||||
auto writer_pos_iter = writer_positions.find(writer);
|
||||
if (writer_pos_iter == writer_positions.end()) {
|
||||
IColumn::Filter filter(block.rows(), 0);
|
||||
filter[i] = 1;
|
||||
writer_positions.insert({writer, std::move(filter)});
|
||||
} else {
|
||||
writer_pos_iter->second[i] = 1;
|
||||
std::shared_ptr<VHivePartitionWriter> writer;
|
||||
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
|
||||
std::string file_name(writer_iter->second->file_name());
|
||||
int file_name_index = writer_iter->second->file_name_index();
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_close_ns);
|
||||
static_cast<void>(writer_iter->second->close(Status::OK()));
|
||||
}
|
||||
writer_positions.erase(writer_iter->second);
|
||||
_partitions_to_writers.erase(writer_iter);
|
||||
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
|
||||
file_name_index + 1, writer));
|
||||
} else {
|
||||
writer = writer_iter->second;
|
||||
}
|
||||
auto writer_pos_iter = writer_positions.find(writer);
|
||||
if (writer_pos_iter == writer_positions.end()) {
|
||||
IColumn::Filter filter(block.rows(), 0);
|
||||
filter[i] = 1;
|
||||
writer_positions.insert({writer, std::move(filter)});
|
||||
} else {
|
||||
writer_pos_iter->second[i] = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
|
||||
for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
|
||||
RETURN_IF_ERROR(it->first->write(block, &it->second));
|
||||
}
|
||||
@ -163,19 +202,34 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
|
||||
}
|
||||
|
||||
Status VHiveTableWriter::close(Status status) {
|
||||
for (const auto& pair : _partitions_to_writers) {
|
||||
Status st = pair.second->close(status);
|
||||
if (st != Status::OK()) {
|
||||
LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
|
||||
continue;
|
||||
int64_t partitions_to_writers_size = _partitions_to_writers.size();
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_close_ns);
|
||||
for (const auto& pair : _partitions_to_writers) {
|
||||
Status st = pair.second->close(status);
|
||||
if (st != Status::OK()) {
|
||||
LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
_partitions_to_writers.clear();
|
||||
}
|
||||
if (status.ok()) {
|
||||
SCOPED_TIMER(_profile->total_time_counter());
|
||||
|
||||
COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
|
||||
COUNTER_SET(_send_data_timer, _send_data_ns);
|
||||
COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns);
|
||||
COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns);
|
||||
COUNTER_SET(_partition_writers_count, partitions_to_writers_size);
|
||||
COUNTER_SET(_close_timer, _close_ns);
|
||||
COUNTER_SET(_write_file_counter, _write_file_count);
|
||||
}
|
||||
_partitions_to_writers.clear();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
|
||||
vectorized::Block& block, int position) {
|
||||
vectorized::Block& block, int position, const std::string* file_name, int file_name_index) {
|
||||
auto& hive_table_sink = _t_sink.hive_table_sink;
|
||||
std::vector<std::string> partition_values;
|
||||
std::string partition_name;
|
||||
@ -247,13 +301,12 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
|
||||
}
|
||||
}
|
||||
|
||||
_write_file_count++;
|
||||
return std::make_shared<VHivePartitionWriter>(
|
||||
_t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
|
||||
_write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
|
||||
std::move(write_info),
|
||||
fmt::format("{}{}", _compute_file_name(),
|
||||
_get_file_extension(file_format_type, write_compress_type)),
|
||||
file_format_type, write_compress_type, hive_table_sink.hadoop_config);
|
||||
std::move(write_info), (file_name == nullptr) ? _compute_file_name() : *file_name,
|
||||
file_name_index, file_format_type, write_compress_type, hive_table_sink.hadoop_config);
|
||||
}
|
||||
|
||||
std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block,
|
||||
@ -397,46 +450,6 @@ std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_des
|
||||
}
|
||||
}
|
||||
|
||||
std::string VHiveTableWriter::_get_file_extension(TFileFormatType::type file_format_type,
|
||||
TFileCompressType::type write_compress_type) {
|
||||
std::string compress_name;
|
||||
switch (write_compress_type) {
|
||||
case TFileCompressType::SNAPPYBLOCK: {
|
||||
compress_name = ".snappy";
|
||||
break;
|
||||
}
|
||||
case TFileCompressType::ZLIB: {
|
||||
compress_name = ".zlib";
|
||||
break;
|
||||
}
|
||||
case TFileCompressType::ZSTD: {
|
||||
compress_name = ".zstd";
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
compress_name = "";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
std::string file_format_name;
|
||||
switch (file_format_type) {
|
||||
case TFileFormatType::FORMAT_PARQUET: {
|
||||
file_format_name = ".parquet";
|
||||
break;
|
||||
}
|
||||
case TFileFormatType::FORMAT_ORC: {
|
||||
file_format_name = ".orc";
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
file_format_name = "";
|
||||
break;
|
||||
}
|
||||
}
|
||||
return fmt::format("{}{}", compress_name, file_format_name);
|
||||
}
|
||||
|
||||
std::string VHiveTableWriter::_compute_file_name() {
|
||||
boost::uuids::uuid uuid = boost::uuids::random_generator()();
|
||||
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
|
||||
#include <gen_cpp/DataSinks_types.h>
|
||||
|
||||
#include "util/runtime_profile.h"
|
||||
#include "vec/exprs/vexpr_fwd.h"
|
||||
#include "vec/sink/writer/async_result_writer.h"
|
||||
|
||||
@ -50,17 +51,15 @@ public:
|
||||
Status close(Status) override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block,
|
||||
int position);
|
||||
std::shared_ptr<VHivePartitionWriter> _create_partition_writer(
|
||||
vectorized::Block& block, int position, const std::string* file_name = nullptr,
|
||||
int file_name_index = 0);
|
||||
|
||||
std::vector<std::string> _create_partition_values(vectorized::Block& block, int position);
|
||||
|
||||
std::string _to_partition_value(const TypeDescriptor& type_desc,
|
||||
const ColumnWithTypeAndName& partition_column, int position);
|
||||
|
||||
std::string _get_file_extension(TFileFormatType::type file_format_type,
|
||||
TFileCompressType::type write_compress_type);
|
||||
|
||||
std::string _compute_file_name();
|
||||
|
||||
// Currently it is a copy, maybe it is better to use move semantics to eliminate it.
|
||||
@ -72,6 +71,24 @@ private:
|
||||
std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
|
||||
|
||||
VExprContextSPtrs _write_output_vexpr_ctxs;
|
||||
|
||||
size_t _row_count = 0;
|
||||
|
||||
// profile counters
|
||||
int64_t _send_data_ns = 0;
|
||||
int64_t _partition_writers_dispatch_ns = 0;
|
||||
int64_t _partition_writers_write_ns = 0;
|
||||
int64_t _close_ns = 0;
|
||||
int64_t _write_file_count = 0;
|
||||
|
||||
RuntimeProfile::Counter* _written_rows_counter = nullptr;
|
||||
RuntimeProfile::Counter* _send_data_timer = nullptr;
|
||||
RuntimeProfile::Counter* _partition_writers_dispatch_timer = nullptr;
|
||||
RuntimeProfile::Counter* _partition_writers_write_timer = nullptr;
|
||||
RuntimeProfile::Counter* _partition_writers_count = nullptr;
|
||||
RuntimeProfile::Counter* _open_timer = nullptr;
|
||||
RuntimeProfile::Counter* _close_timer = nullptr;
|
||||
RuntimeProfile::Counter* _write_file_counter = nullptr;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user