[Improvement](outfile) Support ORC format in outfile (#13019)

Gabriel
2022-10-08 20:56:32 +08:00
committed by GitHub
parent 344377beb7
commit 869fe2bc5d
10 changed files with 838 additions and 32 deletions

View File

@ -53,6 +53,7 @@ struct ResultFileOptions {
//But to keep compatibility when upgrading, add a bool flag to check.
//The current code version is 1.1.2, so this flag can be removed after version 1.2.
bool is_refactor_before_flag = false;
std::string orc_schema;
ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
@ -93,6 +94,9 @@ struct ResultFileOptions {
if (t_opt.__isset.parquet_version) {
parquet_version = t_opt.parquet_version;
}
if (t_opt.__isset.orc_schema) {
orc_schema = t_opt.orc_schema;
}
}
};

View File

@ -225,6 +225,7 @@ set(VEC_FILES
runtime/vfile_result_writer.cpp
runtime/vpartition_info.cpp
runtime/vparquet_writer.cpp
runtime/vorc_writer.cpp
utils/arrow_column_to_doris_column.cpp
runtime/vsorted_run_merger.cpp
exec/file_arrow_scanner.cpp

View File

@ -36,6 +36,7 @@
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vorc_writer.h"
namespace doris::vectorized {
const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
@ -55,7 +56,7 @@ VFileResultWriter::VFileResultWriter(
_sinker(sinker),
_output_block(output_block),
_output_row_descriptor(output_row_descriptor),
_vparquet_writer(nullptr) {
_vfile_writer(nullptr) {
_output_object_data = output_object_data;
}
@ -118,11 +119,16 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
// just using the file writer is enough
break;
case TFileFormatType::FORMAT_PARQUET:
_vparquet_writer.reset(new VParquetWriterWrapper(
_vfile_writer.reset(new VParquetWriterWrapper(
_file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->parquet_schemas,
_file_opts->parquet_commpression_type, _file_opts->parquert_disable_dictionary,
_file_opts->parquet_version, _output_object_data));
RETURN_IF_ERROR(_vparquet_writer->init_parquet_writer());
RETURN_IF_ERROR(_vfile_writer->prepare());
break;
case TFileFormatType::FORMAT_ORC:
_vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), _output_vexpr_ctxs,
_file_opts->orc_schema, _output_object_data));
RETURN_IF_ERROR(_vfile_writer->prepare());
break;
default:
return Status::InternalError("unsupported file format: {}", _file_opts->file_format);
@ -177,6 +183,8 @@ std::string VFileResultWriter::_file_format_to_name() {
return "csv";
case TFileFormatType::FORMAT_PARQUET:
return "parquet";
case TFileFormatType::FORMAT_ORC:
return "orc";
default:
return "unknown";
}
@ -197,8 +205,8 @@ Status VFileResultWriter::append_block(Block& block) {
if (UNLIKELY(num_rows == 0)) {
return status;
}
if (_vparquet_writer) {
_write_parquet_file(output_block);
if (_vfile_writer) {
_write_file(output_block);
} else {
RETURN_IF_ERROR(_write_csv_file(output_block));
}
@ -207,10 +215,10 @@ Status VFileResultWriter::append_block(Block& block) {
return Status::OK();
}
Status VFileResultWriter::_write_parquet_file(const Block& block) {
RETURN_IF_ERROR(_vparquet_writer->write(block));
Status VFileResultWriter::_write_file(const Block& block) {
RETURN_IF_ERROR(_vfile_writer->write(block));
// split file if exceed limit
_current_written_bytes = _vparquet_writer->written_len();
_current_written_bytes = _vfile_writer->written_len();
return _create_new_file_if_exceed_size();
}
@ -415,10 +423,10 @@ Status VFileResultWriter::_create_new_file_if_exceed_size() {
}
Status VFileResultWriter::_close_file_writer(bool done) {
if (_vparquet_writer) {
_vparquet_writer->close();
if (_vfile_writer) {
_vfile_writer->close();
COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
_vparquet_writer.reset(nullptr);
_vfile_writer.reset(nullptr);
} else if (_file_writer_impl) {
_file_writer_impl->close();
}
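
The refactor above lets VFileResultWriter drive parquet and ORC output through one polymorphic interface. A minimal sketch of that lifecycle, assuming the Doris Status/RETURN_IF_ERROR helpers and the VFileWriterWrapper interface introduced in this commit; the helper function itself is hypothetical and not part of the change:

// Hypothetical helper: drives any format-specific writer through the
// VFileWriterWrapper interface (prepare -> write -> written_len -> close).
Status write_all_blocks(VFileWriterWrapper* writer, const std::vector<Block>& blocks,
                        int64_t max_file_size_bytes) {
    RETURN_IF_ERROR(writer->prepare());          // build the schema / open the underlying writer
    for (const auto& block : blocks) {
        RETURN_IF_ERROR(writer->write(block));   // append one vectorized block
        if (writer->written_len() > max_file_size_bytes) {
            break;                               // the real code rolls over to a new file here
        }
    }
    writer->close();                             // flush and close the output stream
    return Status::OK();
}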

View File

@ -22,9 +22,9 @@
#include "vec/runtime/vparquet_writer.h"
#include "vec/sink/vresult_sink.h"
namespace doris {
namespace doris::vectorized {
class VFileWriterWrapper;
namespace vectorized {
// write result to file
class VFileResultWriter final : public VResultWriter {
public:
@ -52,7 +52,7 @@ public:
Status write_csv_header();
private:
Status _write_parquet_file(const Block& block);
Status _write_file(const Block& block);
Status _write_csv_file(const Block& block);
// if the buffer exceeds the limit, write the data buffered in _plain_text_outstream via file_writer
@ -122,8 +122,7 @@ private:
bool _is_result_sent = false;
bool _header_sent = false;
RowDescriptor _output_row_descriptor;
// parquet file writer
std::unique_ptr<VParquetWriterWrapper> _vparquet_writer;
// parquet/orc file writer
std::unique_ptr<VFileWriterWrapper> _vfile_writer;
};
} // namespace vectorized
} // namespace doris
} // namespace doris::vectorized

View File

@ -0,0 +1,481 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/runtime/vorc_writer.h"
#include "io/file_writer.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/functions/function_helpers.h"
namespace doris::vectorized {
VOrcOutputStream::VOrcOutputStream(doris::FileWriter* file_writer)
: _file_writer(file_writer), _cur_pos(0), _written_len(0), _name("VOrcOutputStream") {}
VOrcOutputStream::~VOrcOutputStream() {
if (!_is_closed) {
close();
}
}
void VOrcOutputStream::close() {
if (!_is_closed) {
Status st = _file_writer->close();
if (!st.ok()) {
LOG(WARNING) << "close orc output stream failed: " << st.get_error_msg();
}
_is_closed = true;
}
}
void VOrcOutputStream::write(const void* data, size_t length) {
if (!_is_closed) {
size_t written_len = 0;
Status st = _file_writer->write(static_cast<const uint8_t*>(data), length, &written_len);
if (!st.ok()) {
LOG(WARNING) << "Write to ORC file failed: " << st.get_error_msg();
return;
}
_cur_pos += written_len;
_written_len += written_len;
}
}
void VOrcOutputStream::set_written_len(int64_t written_len) {
_written_len = written_len;
}
VOrcWriterWrapper::VOrcWriterWrapper(doris::FileWriter* file_writer,
const std::vector<VExprContext*>& output_vexpr_ctxs,
const std::string& schema, bool output_object_data)
: VFileWriterWrapper(output_vexpr_ctxs, output_object_data),
_file_writer(file_writer),
_write_options(new orc::WriterOptions()),
_schema_str(schema) {}
Status VOrcWriterWrapper::prepare() {
try {
_schema = orc::Type::buildTypeFromString(_schema_str);
} catch (const std::exception& e) {
return Status::InternalError("Orc build schema from \"{}\" failed: {}", _schema_str,
e.what());
}
_output_stream = std::unique_ptr<VOrcOutputStream>(new VOrcOutputStream(_file_writer));
_writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
if (_writer == nullptr) {
return Status::InternalError("Failed to create file writer");
}
return Status::OK();
}
std::unique_ptr<orc::ColumnVectorBatch> VOrcWriterWrapper::_create_row_batch(size_t sz) {
return _writer->createRowBatch(sz);
}
int64_t VOrcWriterWrapper::written_len() {
return _output_stream->getLength();
}
void VOrcWriterWrapper::close() {
if (_writer != nullptr) {
_writer->close();
}
}
#define RETURN_WRONG_TYPE \
return Status::InvalidArgument("Invalid column type: {}", raw_column->get_name());
#define WRITE_SINGLE_ELEMENTS_INTO_BATCH(VECTOR_BATCH, COLUMN) \
VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if (null_data[row_id] != 0) { \
cur_batch->notNull[row_id] = 0; \
} else { \
cur_batch->notNull[row_id] = 1; \
cur_batch->data[row_id] = assert_cast<const COLUMN&>(*col).get_data()[row_id]; \
} \
} \
} else if (const auto& not_null_column = check_and_get_column<const COLUMN>(col)) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
cur_batch->data[row_id] = not_null_column->get_data()[row_id]; \
} \
} else { \
RETURN_WRONG_TYPE \
}
#define WRITE_CONTINUOUS_ELEMENTS_INTO_BATCH(VECTOR_BATCH, COLUMN, NATIVE_TYPE) \
VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if (null_data[row_id] != 0) { \
cur_batch->notNull[row_id] = 0; \
} else { \
cur_batch->notNull[row_id] = 1; \
cur_batch->data[row_id] = assert_cast<const COLUMN&>(*col).get_data()[row_id]; \
} \
} \
} else if (const auto& not_null_column = check_and_get_column<const COLUMN>(col)) { \
memcpy(cur_batch->data.data(), not_null_column->get_data().data(), \
sz * sizeof(NATIVE_TYPE)); \
} else { \
RETURN_WRONG_TYPE \
}
#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO) \
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
size_t offset = 0; \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if (null_data[row_id] != 0) { \
cur_batch->notNull[row_id] = 0; \
} else { \
cur_batch->notNull[row_id] = 1; \
int len = binary_cast<FROM, TO>( \
assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
.to_buffer(buffer.ptr); \
while (buffer.len < offset + len) { \
char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.ptr, buffer.len); \
free(buffer.ptr); \
buffer.ptr = new_ptr; \
buffer.len = buffer.len + BUFFER_UNIT_SIZE; \
} \
cur_batch->length[row_id] = len; \
offset += len; \
} \
} \
offset = 0; \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if (null_data[row_id] != 0) { \
cur_batch->notNull[row_id] = 0; \
} else { \
cur_batch->data[row_id] = buffer.ptr + offset; \
offset += cur_batch->length[row_id]; \
} \
} \
} else if (const auto& not_null_column = \
check_and_get_column<const ColumnVector<FROM>>(col)) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
.to_buffer(buffer.ptr); \
while (buffer.len < offset + len) { \
char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE); \
memcpy(new_ptr, buffer.ptr, buffer.len); \
free(buffer.ptr); \
buffer.ptr = new_ptr; \
buffer.len = buffer.len + BUFFER_UNIT_SIZE; \
} \
cur_batch->length[row_id] = len; \
offset += len; \
} \
offset = 0; \
for (size_t row_id = 0; row_id < sz; row_id++) { \
cur_batch->data[row_id] = buffer.ptr + offset; \
offset += cur_batch->length[row_id]; \
} \
} else { \
RETURN_WRONG_TYPE \
}
#define WRITE_DECIMAL_INTO_BATCH(VECTOR_BATCH, COLUMN) \
VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if (null_data[row_id] != 0) { \
cur_batch->notNull[row_id] = 0; \
} else { \
cur_batch->notNull[row_id] = 1; \
cur_batch->values[row_id] = assert_cast<const COLUMN&>(*col).get_data()[row_id]; \
} \
} \
} else if (const auto& not_null_column = check_and_get_column<const COLUMN>(col)) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
cur_batch->values[row_id] = not_null_column->get_data()[row_id]; \
} \
} else { \
RETURN_WRONG_TYPE \
}
#define WRITE_COMPLEX_TYPE_INTO_BATCH(VECTOR_BATCH, COLUMN) \
VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
if (null_map != nullptr) { \
cur_batch->hasNulls = true; \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if (null_data[row_id] != 0) { \
cur_batch->notNull[row_id] = 0; \
} else { \
cur_batch->notNull[row_id] = 1; \
const auto& ele = col->get_data_at(row_id); \
cur_batch->data[row_id] = const_cast<char*>(ele.data); \
cur_batch->length[row_id] = ele.size; \
} \
} \
} else if (const auto& not_null_column = check_and_get_column<const COLUMN>(col)) { \
for (size_t row_id = 0; row_id < sz; row_id++) { \
const auto& ele = not_null_column->get_data_at(row_id); \
cur_batch->data[row_id] = const_cast<char*>(ele.data); \
cur_batch->length[row_id] = ele.size; \
} \
} else { \
RETURN_WRONG_TYPE \
}
#define SET_NUM_ELEMENTS cur_batch->numElements = sz;
Status VOrcWriterWrapper::write(const Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
// Buffer used by date type
char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
StringValue buffer(ptr, BUFFER_UNIT_SIZE);
size_t sz = block.rows();
auto row_batch = _create_row_batch(sz);
orc::StructVectorBatch* root = dynamic_cast<orc::StructVectorBatch*>(row_batch.get());
try {
for (size_t i = 0; i < block.columns(); i++) {
auto& raw_column = block.get_by_position(i).column;
auto nullable = raw_column->is_nullable();
const auto col = nullable ? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_nested_column_ptr()
.get()
: block.get_by_position(i).column.get();
auto null_map = nullable && reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->has_null()
? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_null_map_column_ptr()
: nullptr;
switch (_output_vexpr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN: {
WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector<UInt8>)
SET_NUM_ELEMENTS
break;
}
case TYPE_TINYINT: {
WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector<Int8>)
SET_NUM_ELEMENTS
break;
}
case TYPE_SMALLINT: {
WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector<Int16>)
SET_NUM_ELEMENTS
break;
}
case TYPE_INT: {
WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector<Int32>)
SET_NUM_ELEMENTS
break;
}
case TYPE_BIGINT: {
WRITE_CONTINUOUS_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector<Int64>,
Int64)
SET_NUM_ELEMENTS
break;
}
case TYPE_LARGEINT: {
return Status::InvalidArgument("do not support large int type.");
}
case TYPE_FLOAT: {
WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::DoubleVectorBatch, ColumnVector<Float32>)
SET_NUM_ELEMENTS
break;
}
case TYPE_DOUBLE: {
WRITE_CONTINUOUS_ELEMENTS_INTO_BATCH(orc::DoubleVectorBatch, ColumnVector<Float64>,
Float64)
SET_NUM_ELEMENTS
break;
}
case TYPE_DATETIME:
case TYPE_DATE: {
WRITE_DATE_STRING_INTO_BATCH(Int64, VecDateTimeValue)
SET_NUM_ELEMENTS
break;
}
case TYPE_DATEV2: {
WRITE_DATE_STRING_INTO_BATCH(UInt32, DateV2Value<DateV2ValueType>)
SET_NUM_ELEMENTS
break;
}
case TYPE_DATETIMEV2: {
orc::StringVectorBatch* cur_batch =
dynamic_cast<orc::StringVectorBatch*>(root->fields[i]);
size_t offset = 0;
if (null_map != nullptr) {
cur_batch->hasNulls = true;
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
if (null_data[row_id] != 0) {
cur_batch->notNull[row_id] = 0;
} else {
cur_batch->notNull[row_id] = 1;
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
int len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
assert_cast<const ColumnVector<UInt64>&>(*col)
.get_data()[row_id])
.to_buffer(buffer.ptr, output_scale);
while (buffer.len < offset + len) {
char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE);
memcpy(new_ptr, buffer.ptr, buffer.len);
free(buffer.ptr);
buffer.ptr = new_ptr;
buffer.len = buffer.len + BUFFER_UNIT_SIZE;
}
cur_batch->length[row_id] = len;
offset += len;
}
}
offset = 0;
for (size_t row_id = 0; row_id < sz; row_id++) {
if (null_data[row_id] != 0) {
cur_batch->notNull[row_id] = 0;
} else {
cur_batch->data[row_id] = buffer.ptr + offset;
offset += cur_batch->length[row_id];
}
}
} else if (const auto& not_null_column =
check_and_get_column<const ColumnVector<UInt64>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
int len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
not_null_column->get_data()[row_id])
.to_buffer(buffer.ptr, output_scale);
while (buffer.len < offset + len) {
char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE);
memcpy(new_ptr, buffer.ptr, buffer.len);
free(buffer.ptr);
buffer.ptr = new_ptr;
buffer.len = buffer.len + BUFFER_UNIT_SIZE;
}
cur_batch->length[row_id] = len;
offset += len;
}
offset = 0;
for (size_t row_id = 0; row_id < sz; row_id++) {
cur_batch->data[row_id] = buffer.ptr + offset;
offset += cur_batch->length[row_id];
}
} else {
RETURN_WRONG_TYPE
}
SET_NUM_ELEMENTS
break;
}
case TYPE_OBJECT: {
if (_output_object_data) {
WRITE_COMPLEX_TYPE_INTO_BATCH(orc::StringVectorBatch, ColumnBitmap)
SET_NUM_ELEMENTS
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_HLL: {
if (_output_object_data) {
WRITE_COMPLEX_TYPE_INTO_BATCH(orc::StringVectorBatch, ColumnHLL)
SET_NUM_ELEMENTS
} else {
RETURN_WRONG_TYPE
}
break;
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
WRITE_COMPLEX_TYPE_INTO_BATCH(orc::StringVectorBatch, ColumnString)
SET_NUM_ELEMENTS
break;
}
case TYPE_DECIMAL32: {
WRITE_DECIMAL_INTO_BATCH(orc::Decimal64VectorBatch, ColumnDecimal32)
SET_NUM_ELEMENTS
break;
}
case TYPE_DECIMAL64: {
WRITE_DECIMAL_INTO_BATCH(orc::Decimal64VectorBatch, ColumnDecimal64)
SET_NUM_ELEMENTS
break;
}
case TYPE_DECIMALV2:
case TYPE_DECIMAL128: {
orc::Decimal128VectorBatch* cur_batch =
dynamic_cast<orc::Decimal128VectorBatch*>(root->fields[i]);
if (null_map != nullptr) {
cur_batch->hasNulls = true;
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
if (null_data[row_id] != 0) {
cur_batch->notNull[row_id] = 0;
} else {
cur_batch->notNull[row_id] = 1;
auto& v = assert_cast<const ColumnDecimal128&>(*col).get_data()[row_id];
orc::Int128 value(v >> 64, (uint64_t)v);
cur_batch->values[row_id] += value;
}
}
} else if (const auto& not_null_column =
check_and_get_column<const ColumnDecimal128>(col)) {
memcpy(reinterpret_cast<void*>(cur_batch->values.data()),
not_null_column->get_data().data(), sz * sizeof(Int128));
} else {
RETURN_WRONG_TYPE
}
SET_NUM_ELEMENTS
break;
}
default: {
return Status::InvalidArgument(
"Invalid expression type: {}",
_output_vexpr_ctxs[i]->root()->type().debug_string());
}
}
}
} catch (const std::exception& e) {
LOG(WARNING) << "Parquet write error: " << e.what();
return Status::InternalError(e.what());
}
root->numElements = sz;
_writer->add(*row_batch);
_cur_written_rows += sz;
free(buffer.ptr);
return Status::OK();
}
} // namespace doris::vectorized
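
The wrapper above is built directly on the Apache ORC C++ writer API. For reference, a standalone sketch of that API, assuming only the orc library; it writes to a local file via orc::writeLocalFile rather than the VOrcOutputStream/doris::FileWriter pair, and uses the same kind of schema string the FE serializes:

#include <orc/OrcFile.hh>

#include <memory>
#include <string>
#include <vector>

int main() {
    // Schema string in the same form serializeOrcSchema() produces on the FE side.
    std::unique_ptr<orc::Type> schema =
            orc::Type::buildTypeFromString("struct<id:bigint,name:string>");

    std::unique_ptr<orc::OutputStream> out = orc::writeLocalFile("/tmp/example.orc");
    orc::WriterOptions options;
    std::unique_ptr<orc::Writer> writer = orc::createWriter(*schema, out.get(), options);

    const uint64_t rows = 3;
    std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(rows);
    auto* root = dynamic_cast<orc::StructVectorBatch*>(batch.get());
    auto* ids = dynamic_cast<orc::LongVectorBatch*>(root->fields[0]);
    auto* names = dynamic_cast<orc::StringVectorBatch*>(root->fields[1]);

    std::vector<std::string> values = {"a", "bb", "ccc"};
    for (uint64_t i = 0; i < rows; ++i) {
        ids->data[i] = static_cast<int64_t>(i);
        // StringVectorBatch only stores pointers; the caller keeps the data alive.
        names->data[i] = const_cast<char*>(values[i].data());
        names->length[i] = static_cast<int64_t>(values[i].size());
    }
    ids->numElements = rows;
    names->numElements = rows;
    root->numElements = rows;

    writer->add(*batch);   // append the batch
    writer->close();       // flush the footer and close the output stream
    return 0;
}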

View File

@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <map>
#include <orc/OrcFile.hh>
#include <string>
#include "common/status.h"
#include "io/file_writer.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vfile_result_writer.h"
namespace doris::vectorized {
class FileWriter;
class VOrcOutputStream : public orc::OutputStream {
public:
VOrcOutputStream(doris::FileWriter* file_writer);
~VOrcOutputStream() override;
uint64_t getLength() const override { return _written_len; }
uint64_t getNaturalWriteSize() const override { return 128 * 1024; }
void write(const void* buf, size_t length) override;
const std::string& getName() const override { return _name; }
void close() override;
void set_written_len(int64_t written_len);
private:
doris::FileWriter* _file_writer; // not owned
int64_t _cur_pos = 0; // current write position
bool _is_closed = false;
int64_t _written_len = 0;
const std::string _name;
};
// a wrapper of the ORC file writer
class VOrcWriterWrapper final : public VFileWriterWrapper {
public:
VOrcWriterWrapper(doris::FileWriter* file_writer,
const std::vector<VExprContext*>& output_vexpr_ctxs,
const std::string& schema, bool output_object_data);
~VOrcWriterWrapper() = default;
Status prepare() override;
Status write(const Block& block) override;
void close() override;
int64_t written_len() override;
private:
std::unique_ptr<orc::ColumnVectorBatch> _create_row_batch(size_t sz);
doris::FileWriter* _file_writer;
std::unique_ptr<orc::OutputStream> _output_stream;
std::unique_ptr<orc::WriterOptions> _write_options;
const std::string& _schema_str;
std::unique_ptr<orc::Type> _schema;
std::unique_ptr<orc::Writer> _writer;
static constexpr size_t BUFFER_UNIT_SIZE = 4096;
};
} // namespace doris::vectorized
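
VOrcOutputStream adapts doris::FileWriter to the orc::OutputStream contract. As a point of comparison, a minimal in-memory implementation of the same contract (illustration only, not part of this commit), assuming only the orc headers:

#include <orc/OrcFile.hh>

#include <string>

// Collects everything the ORC writer emits into a std::string instead of a file,
// implementing the same five virtuals VOrcOutputStream overrides.
class StringOutputStream : public orc::OutputStream {
public:
    uint64_t getLength() const override { return _data.size(); }
    uint64_t getNaturalWriteSize() const override { return 128 * 1024; }
    void write(const void* buf, size_t length) override {
        _data.append(static_cast<const char*>(buf), length);
    }
    const std::string& getName() const override { return _name; }
    void close() override {}

    const std::string& data() const { return _data; }

private:
    std::string _data;
    const std::string _name = "StringOutputStream";
};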

View File

@ -43,10 +43,7 @@ VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer,
const bool& parquet_disable_dictionary,
const TParquetVersion::type& parquet_version,
bool output_object_data)
: _output_vexpr_ctxs(output_vexpr_ctxs),
_cur_written_rows(0),
_rg_writer(nullptr),
_output_object_data(output_object_data) {
: VFileWriterWrapper(output_vexpr_ctxs, output_object_data), _rg_writer(nullptr) {
_outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
parse_properties(compression_type, parquet_disable_dictionary, parquet_version);
parse_schema(parquet_schemas);
@ -480,7 +477,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
return Status::OK();
}
Status VParquetWriterWrapper::init_parquet_writer() {
Status VParquetWriterWrapper::prepare() {
_writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
if (_writer == nullptr) {
return Status::InternalError("Failed to create file writer");

View File

@ -37,12 +37,35 @@
#include "exec/parquet_writer.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vfile_result_writer.h"
namespace doris::vectorized {
class FileWriter;
class VFileWriterWrapper {
public:
VFileWriterWrapper(const std::vector<VExprContext*>& output_vexpr_ctxs, bool output_object_data)
: _output_vexpr_ctxs(output_vexpr_ctxs),
_cur_written_rows(0),
_output_object_data(output_object_data) {}
virtual ~VFileWriterWrapper() = default;
virtual Status prepare() = 0;
virtual Status write(const Block& block) = 0;
virtual void close() = 0;
virtual int64_t written_len() = 0;
protected:
const std::vector<VExprContext*>& _output_vexpr_ctxs;
int64_t _cur_written_rows;
bool _output_object_data;
};
// a wrapper of the parquet file writer
class VParquetWriterWrapper {
class VParquetWriterWrapper final : public VFileWriterWrapper {
public:
VParquetWriterWrapper(doris::FileWriter* file_writer,
const std::vector<VExprContext*>& output_vexpr_ctxs,
@ -53,13 +76,13 @@ public:
~VParquetWriterWrapper() = default;
Status init_parquet_writer();
Status prepare() override;
Status write(const Block& block);
Status write(const Block& block) override;
void close();
void close() override;
int64_t written_len();
int64_t written_len() override;
private:
parquet::RowGroupWriter* get_rg_writer();
@ -75,11 +98,8 @@ private:
std::shared_ptr<parquet::WriterProperties> _properties;
std::shared_ptr<parquet::schema::GroupNode> _schema;
std::unique_ptr<parquet::ParquetFileWriter> _writer;
const std::vector<VExprContext*>& _output_vexpr_ctxs;
int64_t _cur_written_rows = 0;
parquet::RowGroupWriter* _rg_writer;
const int64_t _max_row_per_group = 10;
bool _output_object_data;
};
} // namespace doris::vectorized

View File

@ -20,11 +20,13 @@ package org.apache.doris.analysis;
import org.apache.doris.backup.HdfsStorage;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.ParseUtil;
@ -64,6 +66,7 @@ public class OutFileClause {
public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
static {
RESULT_COL_NAMES.add("FileNumber");
@ -100,6 +103,15 @@ public class OutFileClause {
PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
ORC_DATA_TYPE.add("bigint");
ORC_DATA_TYPE.add("boolean");
ORC_DATA_TYPE.add("double");
ORC_DATA_TYPE.add("float");
ORC_DATA_TYPE.add("int");
ORC_DATA_TYPE.add("smallint");
ORC_DATA_TYPE.add("string");
ORC_DATA_TYPE.add("tinyint");
}
public static final String LOCAL_FILE_PREFIX = "file:///";
@ -114,6 +126,7 @@ public class OutFileClause {
private static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
private static final String PARQUET_PROP_PREFIX = "parquet.";
private static final String ORC_PROP_PREFIX = "orc.";
private static final String SCHEMA = "schema";
private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
@ -138,6 +151,8 @@ public class OutFileClause {
private List<TParquetSchema> parquetSchemas = new ArrayList<>();
private List<Pair<String, String>> orcSchemas = new ArrayList<>();
private boolean isAnalyzed = false;
private String headerType = "";
@ -207,6 +222,9 @@ public class OutFileClause {
case "parquet":
fileFormatType = TFileFormatType.FORMAT_PARQUET;
break;
case "orc":
fileFormatType = TFileFormatType.FORMAT_ORC;
break;
case "csv_with_names":
headerType = FeConstants.csv_with_names;
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
@ -230,6 +248,146 @@ public class OutFileClause {
if (isParquetFormat()) {
analyzeForParquetFormat(resultExprs, colLabels);
} else if (isOrcFormat()) {
analyzeForOrcFormat(resultExprs, colLabels);
}
}
private void genOrcSchema(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
Preconditions.checkState(this.orcSchemas.isEmpty());
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
String type = "";
switch (expr.getType().getPrimitiveType()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case FLOAT:
case DOUBLE:
case STRING:
type = expr.getType().getPrimitiveType().toString().toLowerCase();
break;
case HLL:
case BITMAP:
if (!(ConnectContext.get() != null && ConnectContext.get()
.getSessionVariable().isReturnObjectDataAsBinary())) {
break;
}
type = "string";
break;
case DATE:
case DATETIME:
case DATETIMEV2:
case DATEV2:
case CHAR:
case VARCHAR:
type = "string";
break;
case DECIMALV2:
if (!expr.getType().isWildcardDecimal()) {
type = String.format("decimal(%d, %d)", ScalarType.MAX_DECIMAL128_PRECISION,
((ScalarType) expr.getType()).decimalScale());
} else {
throw new AnalysisException("currently ORC writer do not support WildcardDecimal!");
}
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
if (!expr.getType().isWildcardDecimal()) {
type = String.format("decimal(%d, %d)", ((ScalarType) expr.getType()).getPrecision(),
((ScalarType) expr.getType()).decimalScale());
} else {
throw new AnalysisException("currently ORC writer do not support WildcardDecimal!");
}
break;
default:
throw new AnalysisException("currently orc do not support column type: "
+ expr.getType().getPrimitiveType());
}
orcSchemas.add(Pair.of(colLabels.get(i), type));
}
}
private String serializeOrcSchema() {
StringBuilder sb = new StringBuilder();
sb.append("struct<");
this.orcSchemas.forEach(pair -> sb.append(pair.first + ":" + pair.second + ","));
if (!this.orcSchemas.isEmpty()) {
return sb.substring(0, sb.length() - 1) + ">";
} else {
return sb.toString() + ">";
}
}
private void analyzeForOrcFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
if (this.orcSchemas.isEmpty()) {
genOrcSchema(resultExprs, colLabels);
}
// check schema number
if (resultExprs.size() != this.orcSchemas.size()) {
throw new AnalysisException("Orc schema number does not equal to select item number");
}
// check type
for (int i = 0; i < this.orcSchemas.size(); ++i) {
Pair<String, String> schema = this.orcSchemas.get(i);
Type resultType = resultExprs.get(i).getType();
switch (resultType.getPrimitiveType()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case FLOAT:
case DOUBLE:
case STRING:
if (!schema.second.equals(resultType.getPrimitiveType().toString().toLowerCase())) {
throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString()
+ ", should use " + resultType.getPrimitiveType().toString() + ","
+ " but the type of column " + i + " is " + schema.second);
}
break;
case DATE:
case DATETIME:
case DATETIMEV2:
case DATEV2:
case CHAR:
case VARCHAR:
if (!schema.second.equals("string")) {
throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString()
+ ", should use string, but the definition type of column " + i + " is "
+ schema.second);
}
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
case DECIMALV2:
if (!schema.second.startsWith("decimal")) {
throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString()
+ ", should use string, but the definition type of column " + i + " is "
+ schema.second);
}
break;
case HLL:
case BITMAP:
if (ConnectContext.get() != null && ConnectContext.get()
.getSessionVariable().isReturnObjectDataAsBinary()) {
if (!schema.second.equals("string")) {
throw new AnalysisException("project field type is HLL/BITMAP, should use string, "
+ "but the definition type of column " + i + " is " + schema.second);
}
} else {
throw new AnalysisException("Orc format does not support column type: "
+ resultType.getPrimitiveType());
}
break;
default:
throw new AnalysisException("Orc format does not support column type: "
+ resultType.getPrimitiveType());
}
}
}
@ -438,6 +596,10 @@ public class OutFileClause {
getParquetProperties(processedPropKeys);
}
if (this.fileFormatType == TFileFormatType.FORMAT_ORC) {
getOrcProperties(processedPropKeys);
}
if (processedPropKeys.size() != properties.size()) {
LOG.debug("{} vs {}", processedPropKeys, properties);
throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
@ -575,6 +737,41 @@ public class OutFileClause {
processedPropKeys.add(SCHEMA);
}
private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisException {
// check schema. if schema is not set, Doris will generate the schema from the select items
String schema = properties.get(SCHEMA);
if (schema == null) {
return;
}
if (schema.isEmpty()) {
throw new AnalysisException("Orc schema property should not be empty");
}
schema = schema.replace(" ", "");
schema = schema.toLowerCase();
String[] schemas = schema.split(";");
for (String item : schemas) {
String[] properties = item.split(",");
if (properties.length != 2) {
throw new AnalysisException("must only contains type and column name");
}
if (!ORC_DATA_TYPE.contains(properties[1]) && !properties[1].startsWith("decimal")) {
throw new AnalysisException("data type is not supported:" + properties[1]);
} else if (!ORC_DATA_TYPE.contains(properties[1]) && properties[1].startsWith("decimal")) {
String errorMsg = "Format of decimal type must be decimal(%d,%d)";
String precisionAndScale = properties[1].substring(0, "decimal".length()).trim();
if (!precisionAndScale.startsWith("(") || !precisionAndScale.endsWith(")")) {
throw new AnalysisException(errorMsg);
}
String[] str = precisionAndScale.substring(1, precisionAndScale.length() - 1).split(",");
if (str.length != 2) {
throw new AnalysisException(errorMsg);
}
}
orcSchemas.add(Pair.of(properties[0], properties[1]));
}
processedPropKeys.add(SCHEMA);
}
private boolean isCsvFormat() {
return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
|| fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
@ -589,6 +786,10 @@ public class OutFileClause {
return fileFormatType == TFileFormatType.FORMAT_PARQUET;
}
private boolean isOrcFormat() {
return fileFormatType == TFileFormatType.FORMAT_ORC;
}
@Override
public OutFileClause clone() {
return new OutFileClause(this);
@ -635,6 +836,9 @@ public class OutFileClause {
sinkOptions.setParquetVersion(parquetVersion);
sinkOptions.setParquetSchemas(parquetSchemas);
}
if (isOrcFormat()) {
sinkOptions.setOrcSchema(serializeOrcSchema());
}
return sinkOptions;
}
}

View File

@ -100,6 +100,7 @@ struct TResultFileSinkOptions {
12: optional TParquetCompressionType parquet_compression_type
13: optional bool parquet_disable_dictionary
14: optional TParquetVersion parquet_version
15: optional string orc_schema
}
struct TMemoryScratchSink {