[feature](outfile) support parquet writer (#12492)

Gabriel
2022-09-15 11:09:12 +08:00
committed by GitHub
parent 22a8d35999
commit fc4298e85e
6 changed files with 777 additions and 23 deletions
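The changes below wire a new VParquetWriterWrapper into VFileResultWriter. A rough sketch of the lifecycle this commit adds (assuming the members already set up in _create_file_writer()/append_block(); not a verbatim excerpt):

    std::unique_ptr<VParquetWriterWrapper> writer(new VParquetWriterWrapper(
            _file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->file_properties,
            _file_opts->schema, _output_object_data));
    RETURN_IF_ERROR(writer->init());        // parse schema + properties, open the parquet file writer
    RETURN_IF_ERROR(writer->write(block));  // append one vectorized Block as buffered row groups
    writer->close();                        // flush the last row group and close the output stream
    int64_t bytes = writer->written_len();  // bytes written, fed into the result counters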

View File

@@ -219,6 +219,7 @@ set(VEC_FILES
runtime/vdata_stream_mgr.cpp
runtime/vfile_result_writer.cpp
runtime/vpartition_info.cpp
runtime/vparquet_writer.cpp
utils/arrow_column_to_doris_column.cpp
runtime/vsorted_run_merger.cpp
exec/file_arrow_scanner.cpp

View File

@@ -54,7 +54,8 @@ VFileResultWriter::VFileResultWriter(
_parent_profile(parent_profile),
_sinker(sinker),
_output_block(output_block),
_output_row_descriptor(output_row_descriptor) {
_output_row_descriptor(output_row_descriptor),
_vparquet_writer(nullptr) {
_output_object_data = output_object_data;
}
@@ -117,7 +118,10 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
// just use file writer is enough
break;
case TFileFormatType::FORMAT_PARQUET:
return Status::NotSupported("Parquet Writer is not supported yet!");
_vparquet_writer.reset(new VParquetWriterWrapper(
_file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->file_properties,
_file_opts->schema, _output_object_data));
RETURN_IF_ERROR(_vparquet_writer->init());
break;
default:
return Status::InternalError("unsupported file format: {}", _file_opts->file_format);
@@ -183,18 +187,18 @@ Status VFileResultWriter::append_block(Block& block) {
}
RETURN_IF_ERROR(write_csv_header());
SCOPED_TIMER(_append_row_batch_timer);
if (_parquet_writer != nullptr) {
return Status::NotSupported("Parquet Writer is not supported yet!");
Status status = Status::OK();
// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
auto output_block =
VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, block, status);
auto num_rows = output_block.rows();
if (UNLIKELY(num_rows == 0)) {
return status;
}
if (_vparquet_writer) {
_write_parquet_file(output_block);
} else {
Status status = Status::OK();
// Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
// failed, just return the error status
auto output_block = VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
block, status);
auto num_rows = output_block.rows();
if (UNLIKELY(num_rows == 0)) {
return status;
}
RETURN_IF_ERROR(_write_csv_file(output_block));
}
@@ -202,6 +206,12 @@ Status VFileResultWriter::append_block(Block& block) {
return Status::OK();
}
Status VFileResultWriter::_write_parquet_file(const Block& block) {
RETURN_IF_ERROR(_vparquet_writer->write(block));
// split file if exceed limit
return _create_new_file_if_exceed_size();
}
Status VFileResultWriter::_write_csv_file(const Block& block) {
for (size_t i = 0; i < block.rows(); i++) {
for (size_t col_id = 0; col_id < block.columns(); col_id++) {
@@ -397,8 +407,11 @@ Status VFileResultWriter::_create_new_file_if_exceed_size() {
}
Status VFileResultWriter::_close_file_writer(bool done) {
if (_parquet_writer != nullptr) {
return Status::NotSupported("Parquet Writer is not supported yet!");
if (_vparquet_writer) {
_vparquet_writer->close();
_current_written_bytes = _vparquet_writer->written_len();
COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
_vparquet_writer.reset(nullptr);
} else if (_file_writer_impl) {
_file_writer_impl->close();
}

View File

@@ -19,6 +19,7 @@
#include "io/file_writer.h"
#include "runtime/file_result_writer.h"
#include "vec/runtime/vparquet_writer.h"
#include "vec/sink/vresult_sink.h"
namespace doris {
@@ -51,6 +52,7 @@ public:
Status write_csv_header();
private:
Status _write_parquet_file(const Block& block);
Status _write_csv_file(const Block& block);
// if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
@@ -84,9 +86,7 @@ private:
// If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
// If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
std::unique_ptr<FileWriter> _file_writer_impl;
// parquet file writer
ParquetWriterWrapper* _parquet_writer = nullptr;
std::unique_ptr<doris::FileWriter> _file_writer_impl;
// Used to buffer the export data of plain text
// TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling
// file writer's write() for every single row.
@@ -122,6 +122,8 @@ private:
bool _is_result_sent = false;
bool _header_sent = false;
RowDescriptor _output_row_descriptor;
// parquet file writer
std::unique_ptr<VParquetWriterWrapper> _vparquet_writer;
};
} // namespace vectorized
} // namespace doris

View File

@@ -0,0 +1,654 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/runtime/vparquet_writer.h"
#include <arrow/array.h>
#include <arrow/status.h>
#include <time.h>
#include "io/file_writer.h"
#include "util/mysql_global.h"
#include "util/types.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/functions/function_helpers.h"
namespace doris::vectorized {
VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer,
const std::vector<VExprContext*>& output_vexpr_ctxs,
const std::map<std::string, std::string>& properties,
const std::vector<std::vector<std::string>>& schema,
bool output_object_data)
: _output_vexpr_ctxs(output_vexpr_ctxs),
_str_schema(schema),
_cur_written_rows(0),
_rg_writer(nullptr),
_output_object_data(output_object_data) {
_outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
parse_properties(properties);
}
void VParquetWriterWrapper::parse_properties(
const std::map<std::string, std::string>& propertie_map) {
parquet::WriterProperties::Builder builder;
for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
std::string property_name = it->first;
std::string property_value = it->second;
if (property_name == "compression") {
// UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
if (property_value == "snappy") {
builder.compression(parquet::Compression::SNAPPY);
} else if (property_value == "gzip") {
builder.compression(parquet::Compression::GZIP);
} else if (property_value == "brotli") {
builder.compression(parquet::Compression::BROTLI);
} else if (property_value == "zstd") {
builder.compression(parquet::Compression::ZSTD);
} else if (property_value == "lz4") {
builder.compression(parquet::Compression::LZ4);
} else if (property_value == "lzo") {
builder.compression(parquet::Compression::LZO);
} else if (property_value == "bz2") {
builder.compression(parquet::Compression::BZ2);
} else {
builder.compression(parquet::Compression::UNCOMPRESSED);
}
} else if (property_name == "disable_dictionary") {
if (property_value == "true") {
builder.enable_dictionary();
} else {
builder.disable_dictionary();
}
} else if (property_name == "version") {
if (property_value == "v1") {
builder.version(parquet::ParquetVersion::PARQUET_1_0);
} else {
builder.version(parquet::ParquetVersion::PARQUET_2_LATEST);
}
}
}
_properties = builder.build();
}
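// Illustrative sketch, not part of the committed file: the kind of map
// parse_properties() above consumes. Keys and values here are hypothetical;
// unknown keys are ignored and unrecognized compression values fall back to
// UNCOMPRESSED.
//
//     std::map<std::string, std::string> properties = {
//             {"compression", "snappy"},  // -> parquet::Compression::SNAPPY
//             {"version", "v2"},          // -> parquet::ParquetVersion::PARQUET_2_LATEST
//     };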
Status VParquetWriterWrapper::parse_schema(const std::vector<std::vector<std::string>>& schema) {
parquet::schema::NodeVector fields;
for (auto column = schema.begin(); column != schema.end(); column++) {
std::string repetition_type = (*column)[0];
parquet::Repetition::type parquet_repetition_type = parquet::Repetition::REQUIRED;
if (repetition_type.find("required") != std::string::npos) {
parquet_repetition_type = parquet::Repetition::REQUIRED;
} else if (repetition_type.find("repeated") != std::string::npos) {
parquet_repetition_type = parquet::Repetition::REPEATED;
} else if (repetition_type.find("optional") != std::string::npos) {
parquet_repetition_type = parquet::Repetition::OPTIONAL;
} else {
parquet_repetition_type = parquet::Repetition::UNDEFINED;
}
std::string data_type = (*column)[1];
parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
if (data_type == "boolean") {
parquet_data_type = parquet::Type::BOOLEAN;
} else if (data_type.find("int32") != std::string::npos) {
parquet_data_type = parquet::Type::INT32;
} else if (data_type.find("int64") != std::string::npos) {
parquet_data_type = parquet::Type::INT64;
} else if (data_type.find("int96") != std::string::npos) {
parquet_data_type = parquet::Type::INT96;
} else if (data_type.find("float") != std::string::npos) {
parquet_data_type = parquet::Type::FLOAT;
} else if (data_type.find("double") != std::string::npos) {
parquet_data_type = parquet::Type::DOUBLE;
} else if (data_type.find("byte_array") != std::string::npos) {
parquet_data_type = parquet::Type::BYTE_ARRAY;
} else if (data_type.find("fixed_len_byte_array") != std::string::npos) {
parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
} else {
parquet_data_type = parquet::Type::UNDEFINED;
}
std::string column_name = (*column)[2];
fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, parquet_repetition_type,
parquet::LogicalType::None(),
parquet_data_type));
_schema = std::static_pointer_cast<parquet::schema::GroupNode>(
parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
}
return Status::OK();
}
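// Illustrative sketch, not part of the committed file: parse_schema() above
// expects one {repetition, physical_type, column_name} triple per output
// column (the column names here are made up):
//
//     std::vector<std::vector<std::string>> schema = {
//             {"required", "int32", "siteid"},
//             {"optional", "byte_array", "city"},
//     };
//
// which becomes a parquet GroupNode named "schema" holding one PrimitiveNode
// per column.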
Status VParquetWriterWrapper::init() {
RETURN_IF_ERROR(parse_schema(_str_schema));
RETURN_IF_ERROR(init_parquet_writer());
RETURN_IF_ERROR(validate_schema());
return Status::OK();
}
Status VParquetWriterWrapper::validate_schema() {
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
switch (_output_vexpr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN: {
if (_str_schema[i][1] != "boolean") {
return Status::InvalidArgument(
"project field type is boolean, "
"but the definition type of column {} is {}",
_str_schema[i][2], _str_schema[i][1]);
}
break;
}
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT: {
if (_str_schema[i][1] != "int32") {
return Status::InvalidArgument(
"project field type is {}, should use int32,"
" but the definition type of column {} is {}",
_output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
_str_schema[i][1]);
}
break;
}
case TYPE_LARGEINT: {
return Status::InvalidArgument("do not support large int type.");
}
case TYPE_FLOAT: {
if (_str_schema[i][1] != "float") {
return Status::InvalidArgument(
"project field type is float, "
"but the definition type of column {} is {}",
_str_schema[i][2], _str_schema[i][1]);
}
break;
}
case TYPE_DOUBLE: {
if (_str_schema[i][1] != "double") {
return Status::InvalidArgument(
"project field type is double, "
"but the definition type of column {} is {}",
_str_schema[i][2], _str_schema[i][1]);
}
break;
}
case TYPE_BIGINT:
case TYPE_DATETIME:
case TYPE_DATE:
case TYPE_DATEV2:
case TYPE_DATETIMEV2: {
if (_str_schema[i][1] != "int64") {
return Status::InvalidArgument(
"project field type is {}, should use int64, "
"but the definition type of column {} is {}",
_output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
_str_schema[i][1]);
}
break;
}
case TYPE_HLL:
case TYPE_OBJECT: {
if (!_output_object_data) {
return Status::InvalidArgument(
"Invalid expression type: {}",
_output_vexpr_ctxs[i]->root()->type().debug_string());
}
[[fallthrough]];
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING:
case TYPE_DECIMALV2:
case TYPE_DECIMAL32:
case TYPE_DECIMAL64:
case TYPE_DECIMAL128: {
if (_str_schema[i][1] != "byte_array") {
return Status::InvalidArgument(
"project field type is {}, should use byte_array, "
"but the definition type of column {} is {}",
_output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
_str_schema[i][1]);
}
break;
}
default: {
return Status::InvalidArgument("Invalid expression type: {}",
_output_vexpr_ctxs[i]->root()->type().debug_string());
}
}
}
return Status::OK();
}
#define RETURN_WRONG_TYPE \
return Status::InvalidArgument("Invalid column type: {}", raw_column->get_name());
#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE) \
parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i)); \
__int128 default_value = 0; \
if (null_map != nullptr) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
col_writer->WriteBatch(1, nullptr, nullptr, \
(*null_map)[row_id] != 0 \
? reinterpret_cast<const NATIVE_TYPE*>(&default_value) \
: reinterpret_cast<const NATIVE_TYPE*>( \
assert_cast<const COLUMN_TYPE&>(*col) \
.get_data_at(row_id) \
.data)); \
} \
} else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
col_writer->WriteBatch( \
sz, nullptr, nullptr, \
reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data())); \
} else { \
RETURN_WRONG_TYPE \
}
#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE) \
parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
parquet::ByteArrayWriter* col_writer = \
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \
parquet::ByteArray value; \
auto decimal_type = \
check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \
DCHECK(decimal_type); \
if (null_map != nullptr) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if ((*null_map)[row_id] != 0) { \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
} else { \
auto s = decimal_type->to_string(*col, row_id); \
value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
value.len = s.size(); \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
} \
} \
} else { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
auto s = decimal_type->to_string(*col, row_id); \
value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
value.len = s.size(); \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
} \
}
#define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE) \
parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
parquet::ByteArrayWriter* col_writer = \
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \
if (null_map != nullptr) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if ((*null_map)[row_id] != 0) { \
parquet::ByteArray value; \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
} else { \
const auto& tmp = col->get_data_at(row_id); \
parquet::ByteArray value; \
value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \
value.len = tmp.size; \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
} \
} \
} else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
const auto& tmp = not_nullable_column->get_data_at(row_id); \
parquet::ByteArray value; \
value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \
value.len = tmp.size; \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
} \
} else { \
RETURN_WRONG_TYPE \
}
Status VParquetWriterWrapper::write(const Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
size_t sz = block.rows();
try {
for (size_t i = 0; i < block.columns(); i++) {
auto& raw_column = block.get_by_position(i).column;
const auto col = raw_column->is_nullable()
? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_nested_column_ptr()
.get()
: block.get_by_position(i).column.get();
auto null_map =
raw_column->is_nullable() && reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_null_map_column_ptr()
->has_null()
? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_null_map_column_ptr()
: nullptr;
auto& type = block.get_by_position(i).type;
switch (_output_vexpr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN: {
DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool)
break;
}
case TYPE_BIGINT: {
DISPATCH_PARQUET_NUMERIC_WRITER(Int64Writer, ColumnVector<Int64>, int64_t)
break;
}
case TYPE_LARGEINT: {
return Status::InvalidArgument("do not support large int type.");
}
case TYPE_FLOAT: {
DISPATCH_PARQUET_NUMERIC_WRITER(FloatWriter, ColumnVector<Float32>, float_t)
break;
}
case TYPE_DOUBLE: {
DISPATCH_PARQUET_NUMERIC_WRITER(DoubleWriter, ColumnVector<Float64>, double_t)
break;
}
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int32Writer* col_writer =
static_cast<parquet::Int32Writer*>(rgWriter->column(i));
int32_t default_int32 = 0;
if (null_map != nullptr) {
if (const auto* nested_column =
check_and_get_column<const ColumnVector<Int32>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
col_writer->WriteBatch(
1, nullptr, nullptr,
(*null_map)[row_id] != 0
? &default_int32
: reinterpret_cast<const int32_t*>(
nested_column->get_data_at(row_id).data));
}
} else if (const auto* int16_column =
check_and_get_column<const ColumnVector<Int16>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int16_column->get_data()[row_id];
col_writer->WriteBatch(
1, nullptr, nullptr,
(*null_map)[row_id] != 0
? &default_int32
: reinterpret_cast<const int32_t*>(&tmp));
}
} else if (const auto* int8_column =
check_and_get_column<const ColumnVector<Int8>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int8_column->get_data()[row_id];
col_writer->WriteBatch(
1, nullptr, nullptr,
(*null_map)[row_id] != 0
? &default_int32
: reinterpret_cast<const int32_t*>(&tmp));
}
} else {
RETURN_WRONG_TYPE
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<Int32>>(col)) {
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int32_t*>(
not_nullable_column->get_data().data()));
} else if (const auto& int16_column =
check_and_get_column<const ColumnVector<Int16>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int16_column->get_data()[row_id];
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int32_t*>(&tmp));
}
} else if (const auto& int8_column =
check_and_get_column<const ColumnVector<Int8>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int8_column->get_data()[row_id];
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int32_t*>(&tmp));
}
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_DATETIME:
case TYPE_DATE: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
int64_t default_int64 = 0;
if (null_map != nullptr) {
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
} else {
const auto tmp = binary_cast<Int64, VecDateTimeValue>(
assert_cast<const ColumnVector<Int64>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int64_t*>(&tmp));
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<Int64>>(col)) {
std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
res[row_id] = binary_cast<Int64, VecDateTimeValue>(
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
}
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_DATEV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
int64_t default_int64 = 0;
if (null_map != nullptr) {
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
} else {
uint64_t tmp = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
assert_cast<const ColumnVector<UInt32>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int64_t*>(&tmp));
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<UInt32>>(col)) {
std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
res[row_id] = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
}
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_DATETIMEV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
int64_t default_int64 = 0;
if (null_map != nullptr) {
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
} else {
uint64_t tmp = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
assert_cast<const ColumnVector<UInt64>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int64_t*>(&tmp));
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<UInt64>>(col)) {
std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
res[row_id] = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
}
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_OBJECT: {
DISPATCH_PARQUET_COMPLEX_WRITER(ColumnBitmap)
break;
}
case TYPE_HLL: {
DISPATCH_PARQUET_COMPLEX_WRITER(ColumnHLL)
break;
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
DISPATCH_PARQUET_COMPLEX_WRITER(ColumnString)
break;
}
case TYPE_DECIMALV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::ByteArrayWriter* col_writer =
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
parquet::ByteArray value;
if (null_map != nullptr) {
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &value);
} else {
const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(
col->get_data_at(row_id).data)
->value);
char decimal_buffer[MAX_DECIMAL_WIDTH];
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
col_writer->WriteBatch(1, nullptr, nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnDecimal128>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const DecimalV2Value decimal_val(
reinterpret_cast<const PackedInt128*>(
not_nullable_column->get_data_at(row_id).data)
->value);
char decimal_buffer[MAX_DECIMAL_WIDTH];
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
col_writer->WriteBatch(1, nullptr, nullptr, &value);
}
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_DECIMAL32: {
DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
break;
}
case TYPE_DECIMAL64: {
DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
break;
}
case TYPE_DECIMAL128: {
DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128)
break;
}
default: {
return Status::InvalidArgument(
"Invalid expression type: {}",
_output_vexpr_ctxs[i]->root()->type().debug_string());
}
}
}
} catch (const std::exception& e) {
LOG(WARNING) << "Parquet write error: " << e.what();
return Status::InternalError(e.what());
}
_cur_written_rows += sz;
return Status::OK();
}
Status VParquetWriterWrapper::init_parquet_writer() {
_writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
if (_writer == nullptr) {
return Status::InternalError("Failed to create file writer");
}
return Status::OK();
}
parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() {
if (_rg_writer == nullptr) {
_rg_writer = _writer->AppendBufferedRowGroup();
}
if (_cur_written_rows > _max_row_per_group) {
_rg_writer->Close();
_rg_writer = _writer->AppendBufferedRowGroup();
_cur_written_rows = 0;
}
return _rg_writer;
}
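// Illustrative trace, not part of the committed file: write() bumps
// _cur_written_rows once per Block, so with _max_row_per_group = 10 and
// 4-row blocks, get_rg_writer() keeps blocks 1-3 (12 rows) in the first
// buffered row group and starts a second one when block 4 arrives.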
int64_t VParquetWriterWrapper::written_len() {
return _outstream->get_written_len();
}
void VParquetWriterWrapper::close() {
try {
if (_rg_writer != nullptr) {
_rg_writer->Close();
_rg_writer = nullptr;
}
_writer->Close();
arrow::Status st = _outstream->Close();
if (!st.ok()) {
LOG(WARNING) << "close parquet file error: " << st.ToString();
}
} catch (const std::exception& e) {
_rg_writer = nullptr;
LOG(WARNING) << "Parquet writer close error: " << e.what();
}
}
VParquetWriterWrapper::~VParquetWriterWrapper() {}
} // namespace doris::vectorized

View File

@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <arrow/api.h>
#include <arrow/buffer.h>
#include <arrow/io/api.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <stdint.h>
#include <map>
#include <string>
#include "common/status.h"
#include "exec/parquet_writer.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
class FileWriter;
// a wrapper of parquet output stream
class VParquetWriterWrapper {
public:
VParquetWriterWrapper(doris::FileWriter* file_writer,
const std::vector<VExprContext*>& output_vexpr_ctxs,
const std::map<std::string, std::string>& properties,
const std::vector<std::vector<std::string>>& schema,
bool output_object_data);
virtual ~VParquetWriterWrapper();
Status init();
Status validate_schema();
Status write(const Block& block);
Status init_parquet_writer();
void close();
void parse_properties(const std::map<std::string, std::string>& propertie_map);
Status parse_schema(const std::vector<std::vector<std::string>>& schema);
parquet::RowGroupWriter* get_rg_writer();
int64_t written_len();
private:
std::shared_ptr<ParquetOutputStream> _outstream;
std::shared_ptr<parquet::WriterProperties> _properties;
std::shared_ptr<parquet::schema::GroupNode> _schema;
std::unique_ptr<parquet::ParquetFileWriter> _writer;
const std::vector<VExprContext*>& _output_vexpr_ctxs;
std::vector<std::vector<std::string>> _str_schema;
int64_t _cur_written_rows = 0;
parquet::RowGroupWriter* _rg_writer;
const int64_t _max_row_per_group = 10;
bool _output_object_data;
};
} // namespace doris::vectorized

View File

@@ -233,9 +233,8 @@ public class OutFileClause {
case TINYINT:
case SMALLINT:
case INT:
case DATEV2:
if (!type.equals("int32")) {
throw new AnalysisException("project field type is TINYINT/SMALLINT/INT/DATEV2,"
throw new AnalysisException("project field type is TINYINT/SMALLINT/INT,"
+ "should use int32, " + "but the definition type of column " + i + " is " + type);
}
break;
@@ -243,9 +242,10 @@ public class OutFileClause {
case DATE:
case DATETIME:
case DATETIMEV2:
case DATEV2:
if (!type.equals("int64")) {
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME, should use int64, "
+ "but the definition type of column " + i + " is " + type);
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME/DATEV2/DATETIMEV2,"
+ "should use int64, but the definition type of column " + i + " is " + type);
}
break;
case FLOAT:
@@ -305,13 +305,13 @@ public class OutFileClause {
case TINYINT:
case SMALLINT:
case INT:
case DATEV2:
column.add("int32");
break;
case BIGINT:
case DATE:
case DATETIME:
case DATETIMEV2:
case DATEV2:
column.add("int64");
break;
case FLOAT: