From 869fe2bc5dc7a2a995c491b3f3af57489fa24daf Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 8 Oct 2022 20:56:32 +0800 Subject: [PATCH] [Improvement](outfile) Support ORC format in outfile (#13019) --- be/src/runtime/file_result_writer.h | 4 + be/src/vec/CMakeLists.txt | 1 + be/src/vec/runtime/vfile_result_writer.cpp | 30 +- be/src/vec/runtime/vfile_result_writer.h | 13 +- be/src/vec/runtime/vorc_writer.cpp | 481 ++++++++++++++++++ be/src/vec/runtime/vorc_writer.h | 91 ++++ be/src/vec/runtime/vparquet_writer.cpp | 7 +- be/src/vec/runtime/vparquet_writer.h | 38 +- .../apache/doris/analysis/OutFileClause.java | 204 ++++++++ gensrc/thrift/DataSinks.thrift | 1 + 10 files changed, 838 insertions(+), 32 deletions(-) create mode 100644 be/src/vec/runtime/vorc_writer.cpp create mode 100644 be/src/vec/runtime/vorc_writer.h diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h index 42e279c08e..4e346f3af5 100644 --- a/be/src/runtime/file_result_writer.h +++ b/be/src/runtime/file_result_writer.h @@ -53,6 +53,7 @@ struct ResultFileOptions { //But in order to consider the compatibility when upgrading, so add a bool to check //Now the code version is 1.1.2, so when the version is after 1.2, could remove this code. bool is_refactor_before_flag = false; + std::string orc_schema; ResultFileOptions(const TResultFileSinkOptions& t_opt) { file_path = t_opt.file_path; @@ -93,6 +94,9 @@ struct ResultFileOptions { if (t_opt.__isset.parquet_version) { parquet_version = t_opt.parquet_version; } + if (t_opt.__isset.orc_schema) { + orc_schema = t_opt.orc_schema; + } } }; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index c69cdeb01d..87900dcd0b 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -225,6 +225,7 @@ set(VEC_FILES runtime/vfile_result_writer.cpp runtime/vpartition_info.cpp runtime/vparquet_writer.cpp + runtime/vorc_writer.cpp utils/arrow_column_to_doris_column.cpp runtime/vsorted_run_merger.cpp exec/file_arrow_scanner.cpp diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp index 4ae0e79dab..973f79de27 100644 --- a/be/src/vec/runtime/vfile_result_writer.cpp +++ b/be/src/vec/runtime/vfile_result_writer.cpp @@ -36,6 +36,7 @@ #include "vec/core/block.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" +#include "vec/runtime/vorc_writer.h" namespace doris::vectorized { const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; @@ -55,7 +56,7 @@ VFileResultWriter::VFileResultWriter( _sinker(sinker), _output_block(output_block), _output_row_descriptor(output_row_descriptor), - _vparquet_writer(nullptr) { + _vfile_writer(nullptr) { _output_object_data = output_object_data; } @@ -118,11 +119,16 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { // just use file writer is enough break; case TFileFormatType::FORMAT_PARQUET: - _vparquet_writer.reset(new VParquetWriterWrapper( + _vfile_writer.reset(new VParquetWriterWrapper( _file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->parquet_schemas, _file_opts->parquet_commpression_type, _file_opts->parquert_disable_dictionary, _file_opts->parquet_version, _output_object_data)); - RETURN_IF_ERROR(_vparquet_writer->init_parquet_writer()); + RETURN_IF_ERROR(_vfile_writer->prepare()); + break; + case TFileFormatType::FORMAT_ORC: + _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), _output_vexpr_ctxs, + _file_opts->orc_schema, _output_object_data)); + 
RETURN_IF_ERROR(_vfile_writer->prepare()); break; default: return Status::InternalError("unsupported file format: {}", _file_opts->file_format); @@ -177,6 +183,8 @@ std::string VFileResultWriter::_file_format_to_name() { return "csv"; case TFileFormatType::FORMAT_PARQUET: return "parquet"; + case TFileFormatType::FORMAT_ORC: + return "orc"; default: return "unknown"; } @@ -197,8 +205,8 @@ Status VFileResultWriter::append_block(Block& block) { if (UNLIKELY(num_rows == 0)) { return status; } - if (_vparquet_writer) { - _write_parquet_file(output_block); + if (_vfile_writer) { + _write_file(output_block); } else { RETURN_IF_ERROR(_write_csv_file(output_block)); } @@ -207,10 +215,10 @@ Status VFileResultWriter::append_block(Block& block) { return Status::OK(); } -Status VFileResultWriter::_write_parquet_file(const Block& block) { - RETURN_IF_ERROR(_vparquet_writer->write(block)); +Status VFileResultWriter::_write_file(const Block& block) { + RETURN_IF_ERROR(_vfile_writer->write(block)); // split file if exceed limit - _current_written_bytes = _vparquet_writer->written_len(); + _current_written_bytes = _vfile_writer->written_len(); return _create_new_file_if_exceed_size(); } @@ -415,10 +423,10 @@ Status VFileResultWriter::_create_new_file_if_exceed_size() { } Status VFileResultWriter::_close_file_writer(bool done) { - if (_vparquet_writer) { - _vparquet_writer->close(); + if (_vfile_writer) { + _vfile_writer->close(); COUNTER_UPDATE(_written_data_bytes, _current_written_bytes); - _vparquet_writer.reset(nullptr); + _vfile_writer.reset(nullptr); } else if (_file_writer_impl) { _file_writer_impl->close(); } diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h index f1f28495ff..277fe07789 100644 --- a/be/src/vec/runtime/vfile_result_writer.h +++ b/be/src/vec/runtime/vfile_result_writer.h @@ -22,9 +22,9 @@ #include "vec/runtime/vparquet_writer.h" #include "vec/sink/vresult_sink.h" -namespace doris { +namespace doris::vectorized { +class VFileWriterWrapper; -namespace vectorized { // write result to file class VFileResultWriter final : public VResultWriter { public: @@ -52,7 +52,7 @@ public: Status write_csv_header(); private: - Status _write_parquet_file(const Block& block); + Status _write_file(const Block& block); Status _write_csv_file(const Block& block); // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer @@ -122,8 +122,7 @@ private: bool _is_result_sent = false; bool _header_sent = false; RowDescriptor _output_row_descriptor; - // parquet file writer - std::unique_ptr _vparquet_writer; + // parquet/orc file writer + std::unique_ptr _vfile_writer; }; -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vorc_writer.cpp b/be/src/vec/runtime/vorc_writer.cpp new file mode 100644 index 0000000000..2ac738c799 --- /dev/null +++ b/be/src/vec/runtime/vorc_writer.cpp @@ -0,0 +1,481 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/runtime/vorc_writer.h" + +#include "io/file_writer.h" +#include "vec/columns/column_complex.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/exprs/vexpr.h" +#include "vec/functions/function_helpers.h" + +namespace doris::vectorized { +VOrcOutputStream::VOrcOutputStream(doris::FileWriter* file_writer) + : _file_writer(file_writer), _cur_pos(0), _written_len(0), _name("VOrcOutputStream") {} + +VOrcOutputStream::~VOrcOutputStream() { + if (!_is_closed) { + close(); + } +} + +void VOrcOutputStream::close() { + if (!_is_closed) { + Status st = _file_writer->close(); + if (!st.ok()) { + LOG(WARNING) << "close orc output stream failed: " << st.get_error_msg(); + } + _is_closed = true; + } +} + +void VOrcOutputStream::write(const void* data, size_t length) { + if (!_is_closed) { + size_t written_len = 0; + Status st = _file_writer->write(static_cast(data), length, &written_len); + if (!st.ok()) { + LOG(WARNING) << "Write to ORC file failed: " << st.get_error_msg(); + return; + } + _cur_pos += written_len; + _written_len += written_len; + } +} + +void VOrcOutputStream::set_written_len(int64_t written_len) { + _written_len = written_len; +} + +VOrcWriterWrapper::VOrcWriterWrapper(doris::FileWriter* file_writer, + const std::vector& output_vexpr_ctxs, + const std::string& schema, bool output_object_data) + : VFileWriterWrapper(output_vexpr_ctxs, output_object_data), + _file_writer(file_writer), + _write_options(new orc::WriterOptions()), + _schema_str(schema) {} + +Status VOrcWriterWrapper::prepare() { + try { + _schema = orc::Type::buildTypeFromString(_schema_str); + } catch (const std::exception& e) { + return Status::InternalError("Orc build schema from \"{}\" failed: {}", _schema_str, + e.what()); + } + _output_stream = std::unique_ptr(new VOrcOutputStream(_file_writer)); + _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options); + if (_writer == nullptr) { + return Status::InternalError("Failed to create file writer"); + } + return Status::OK(); +} + +std::unique_ptr VOrcWriterWrapper::_create_row_batch(size_t sz) { + return _writer->createRowBatch(sz); +} + +int64_t VOrcWriterWrapper::written_len() { + return _output_stream->getLength(); +} + +void VOrcWriterWrapper::close() { + if (_writer != nullptr) { + _writer->close(); + } +} + +#define RETURN_WRONG_TYPE \ + return Status::InvalidArgument("Invalid column type: {}", raw_column->get_name()); + +#define WRITE_SINGLE_ELEMENTS_INTO_BATCH(VECTOR_BATCH, COLUMN) \ + VECTOR_BATCH* cur_batch = dynamic_cast(root->fields[i]); \ + if (null_map != nullptr) { \ + cur_batch->hasNulls = true; \ + auto& null_data = assert_cast(*null_map).get_data(); \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + if (null_data[row_id] != 0) { \ + cur_batch->notNull[row_id] = 0; \ + } else { \ + cur_batch->notNull[row_id] = 1; \ + cur_batch->data[row_id] = assert_cast(*col).get_data()[row_id]; \ + } \ + } \ + } else if (const 
auto& not_null_column = check_and_get_column(col)) { \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + cur_batch->data[row_id] = not_null_column->get_data()[row_id]; \ + } \ + } else { \ + RETURN_WRONG_TYPE \ + } + +#define WRITE_CONTINUOUS_ELEMENTS_INTO_BATCH(VECTOR_BATCH, COLUMN, NATIVE_TYPE) \ + VECTOR_BATCH* cur_batch = dynamic_cast(root->fields[i]); \ + if (null_map != nullptr) { \ + cur_batch->hasNulls = true; \ + auto& null_data = assert_cast(*null_map).get_data(); \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + if (null_data[row_id] != 0) { \ + cur_batch->notNull[row_id] = 0; \ + } else { \ + cur_batch->notNull[row_id] = 1; \ + cur_batch->data[row_id] = assert_cast(*col).get_data()[row_id]; \ + } \ + } \ + } else if (const auto& not_null_column = check_and_get_column(col)) { \ + memcpy(cur_batch->data.data(), not_null_column->get_data().data(), \ + sz * sizeof(NATIVE_TYPE)); \ + } else { \ + RETURN_WRONG_TYPE \ + } + +#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO) \ + orc::StringVectorBatch* cur_batch = dynamic_cast(root->fields[i]); \ + size_t offset = 0; \ + if (null_map != nullptr) { \ + cur_batch->hasNulls = true; \ + auto& null_data = assert_cast(*null_map).get_data(); \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + if (null_data[row_id] != 0) { \ + cur_batch->notNull[row_id] = 0; \ + } else { \ + cur_batch->notNull[row_id] = 1; \ + int len = binary_cast( \ + assert_cast&>(*col).get_data()[row_id]) \ + .to_buffer(buffer.ptr); \ + while (buffer.len < offset + len) { \ + char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE); \ + memcpy(new_ptr, buffer.ptr, buffer.len); \ + free(buffer.ptr); \ + buffer.ptr = new_ptr; \ + buffer.len = buffer.len + BUFFER_UNIT_SIZE; \ + } \ + cur_batch->length[row_id] = len; \ + offset += len; \ + } \ + } \ + offset = 0; \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + if (null_data[row_id] != 0) { \ + cur_batch->notNull[row_id] = 0; \ + } else { \ + cur_batch->data[row_id] = buffer.ptr + offset; \ + offset += cur_batch->length[row_id]; \ + } \ + } \ + } else if (const auto& not_null_column = \ + check_and_get_column>(col)) { \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + int len = binary_cast(not_null_column->get_data()[row_id]) \ + .to_buffer(buffer.ptr); \ + while (buffer.len < offset + len) { \ + char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE); \ + memcpy(new_ptr, buffer.ptr, buffer.len); \ + free(buffer.ptr); \ + buffer.ptr = new_ptr; \ + buffer.len = buffer.len + BUFFER_UNIT_SIZE; \ + } \ + cur_batch->length[row_id] = len; \ + offset += len; \ + } \ + offset = 0; \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + cur_batch->data[row_id] = buffer.ptr + offset; \ + offset += cur_batch->length[row_id]; \ + } \ + } else { \ + RETURN_WRONG_TYPE \ + } + +#define WRITE_DECIMAL_INTO_BATCH(VECTOR_BATCH, COLUMN) \ + VECTOR_BATCH* cur_batch = dynamic_cast(root->fields[i]); \ + if (null_map != nullptr) { \ + cur_batch->hasNulls = true; \ + auto& null_data = assert_cast(*null_map).get_data(); \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + if (null_data[row_id] != 0) { \ + cur_batch->notNull[row_id] = 0; \ + } else { \ + cur_batch->notNull[row_id] = 1; \ + cur_batch->values[row_id] = assert_cast(*col).get_data()[row_id]; \ + } \ + } \ + } else if (const auto& not_null_column = check_and_get_column(col)) { \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + cur_batch->values[row_id] = not_null_column->get_data()[row_id]; \ + } \ + } else { \ + RETURN_WRONG_TYPE \ + } + 
+#define WRITE_COMPLEX_TYPE_INTO_BATCH(VECTOR_BATCH, COLUMN) \ + VECTOR_BATCH* cur_batch = dynamic_cast(root->fields[i]); \ + if (null_map != nullptr) { \ + cur_batch->hasNulls = true; \ + auto& null_data = assert_cast(*null_map).get_data(); \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + if (null_data[row_id] != 0) { \ + cur_batch->notNull[row_id] = 0; \ + } else { \ + cur_batch->notNull[row_id] = 1; \ + const auto& ele = col->get_data_at(row_id); \ + cur_batch->data[row_id] = const_cast(ele.data); \ + cur_batch->length[row_id] = ele.size; \ + } \ + } \ + } else if (const auto& not_null_column = check_and_get_column(col)) { \ + for (size_t row_id = 0; row_id < sz; row_id++) { \ + const auto& ele = not_null_column->get_data_at(row_id); \ + cur_batch->data[row_id] = const_cast(ele.data); \ + cur_batch->length[row_id] = ele.size; \ + } \ + } else { \ + RETURN_WRONG_TYPE \ + } + +#define SET_NUM_ELEMENTS cur_batch->numElements = sz; + +Status VOrcWriterWrapper::write(const Block& block) { + if (block.rows() == 0) { + return Status::OK(); + } + + // Buffer used by date type + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); + StringValue buffer(ptr, BUFFER_UNIT_SIZE); + + size_t sz = block.rows(); + auto row_batch = _create_row_batch(sz); + orc::StructVectorBatch* root = dynamic_cast(row_batch.get()); + try { + for (size_t i = 0; i < block.columns(); i++) { + auto& raw_column = block.get_by_position(i).column; + auto nullable = raw_column->is_nullable(); + const auto col = nullable ? reinterpret_cast( + block.get_by_position(i).column.get()) + ->get_nested_column_ptr() + .get() + : block.get_by_position(i).column.get(); + auto null_map = nullable && reinterpret_cast( + block.get_by_position(i).column.get()) + ->has_null() + ? reinterpret_cast( + block.get_by_position(i).column.get()) + ->get_null_map_column_ptr() + : nullptr; + switch (_output_vexpr_ctxs[i]->root()->type().type) { + case TYPE_BOOLEAN: { + WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector) + SET_NUM_ELEMENTS + break; + } + case TYPE_TINYINT: { + WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector) + SET_NUM_ELEMENTS + break; + } + case TYPE_SMALLINT: { + WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector) + SET_NUM_ELEMENTS + break; + } + case TYPE_INT: { + WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector) + SET_NUM_ELEMENTS + break; + } + case TYPE_BIGINT: { + WRITE_CONTINUOUS_ELEMENTS_INTO_BATCH(orc::LongVectorBatch, ColumnVector, + Int64) + SET_NUM_ELEMENTS + break; + } + case TYPE_LARGEINT: { + return Status::InvalidArgument("do not support large int type."); + } + case TYPE_FLOAT: { + WRITE_SINGLE_ELEMENTS_INTO_BATCH(orc::DoubleVectorBatch, ColumnVector) + SET_NUM_ELEMENTS + break; + } + case TYPE_DOUBLE: { + WRITE_CONTINUOUS_ELEMENTS_INTO_BATCH(orc::DoubleVectorBatch, ColumnVector, + Float64) + SET_NUM_ELEMENTS + break; + } + case TYPE_DATETIME: + case TYPE_DATE: { + WRITE_DATE_STRING_INTO_BATCH(Int64, VecDateTimeValue) + SET_NUM_ELEMENTS + break; + } + case TYPE_DATEV2: { + WRITE_DATE_STRING_INTO_BATCH(UInt32, DateV2Value) + SET_NUM_ELEMENTS + break; + } + case TYPE_DATETIMEV2: { + orc::StringVectorBatch* cur_batch = + dynamic_cast(root->fields[i]); + size_t offset = 0; + if (null_map != nullptr) { + cur_batch->hasNulls = true; + auto& null_data = assert_cast(*null_map).get_data(); + for (size_t row_id = 0; row_id < sz; row_id++) { + if (null_data[row_id] != 0) { + cur_batch->notNull[row_id] = 0; + } else { + cur_batch->notNull[row_id] = 1; + int 
output_scale = _output_vexpr_ctxs[i]->root()->type().scale; + int len = binary_cast>( + assert_cast&>(*col) + .get_data()[row_id]) + .to_buffer(buffer.ptr, output_scale); + while (buffer.len < offset + len) { + char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE); + memcpy(new_ptr, buffer.ptr, buffer.len); + free(buffer.ptr); + buffer.ptr = new_ptr; + buffer.len = buffer.len + BUFFER_UNIT_SIZE; + } + cur_batch->length[row_id] = len; + offset += len; + } + } + offset = 0; + for (size_t row_id = 0; row_id < sz; row_id++) { + if (null_data[row_id] != 0) { + cur_batch->notNull[row_id] = 0; + } else { + cur_batch->data[row_id] = buffer.ptr + offset; + offset += cur_batch->length[row_id]; + } + } + } else if (const auto& not_null_column = + check_and_get_column>(col)) { + for (size_t row_id = 0; row_id < sz; row_id++) { + int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; + int len = binary_cast>( + not_null_column->get_data()[row_id]) + .to_buffer(buffer.ptr, output_scale); + while (buffer.len < offset + len) { + char* new_ptr = (char*)malloc(buffer.len + BUFFER_UNIT_SIZE); + memcpy(new_ptr, buffer.ptr, buffer.len); + free(buffer.ptr); + buffer.ptr = new_ptr; + buffer.len = buffer.len + BUFFER_UNIT_SIZE; + } + cur_batch->length[row_id] = len; + offset += len; + } + offset = 0; + for (size_t row_id = 0; row_id < sz; row_id++) { + cur_batch->data[row_id] = buffer.ptr + offset; + offset += cur_batch->length[row_id]; + } + } else { + RETURN_WRONG_TYPE + } + SET_NUM_ELEMENTS + break; + } + case TYPE_OBJECT: { + if (_output_object_data) { + WRITE_COMPLEX_TYPE_INTO_BATCH(orc::StringVectorBatch, ColumnBitmap) + SET_NUM_ELEMENTS + } else { + RETURN_WRONG_TYPE + } + break; + } + case TYPE_HLL: { + if (_output_object_data) { + WRITE_COMPLEX_TYPE_INTO_BATCH(orc::StringVectorBatch, ColumnHLL) + SET_NUM_ELEMENTS + } else { + RETURN_WRONG_TYPE + } + break; + } + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + WRITE_COMPLEX_TYPE_INTO_BATCH(orc::StringVectorBatch, ColumnString) + SET_NUM_ELEMENTS + break; + } + case TYPE_DECIMAL32: { + WRITE_DECIMAL_INTO_BATCH(orc::Decimal64VectorBatch, ColumnDecimal32) + SET_NUM_ELEMENTS + break; + } + case TYPE_DECIMAL64: { + WRITE_DECIMAL_INTO_BATCH(orc::Decimal64VectorBatch, ColumnDecimal64) + SET_NUM_ELEMENTS + break; + } + case TYPE_DECIMALV2: + case TYPE_DECIMAL128: { + orc::Decimal128VectorBatch* cur_batch = + dynamic_cast(root->fields[i]); + if (null_map != nullptr) { + cur_batch->hasNulls = true; + auto& null_data = assert_cast(*null_map).get_data(); + for (size_t row_id = 0; row_id < sz; row_id++) { + if (null_data[row_id] != 0) { + cur_batch->notNull[row_id] = 0; + } else { + cur_batch->notNull[row_id] = 1; + auto& v = assert_cast(*col).get_data()[row_id]; + orc::Int128 value(v >> 64, (uint64_t)v); + cur_batch->values[row_id] += value; + } + } + } else if (const auto& not_null_column = + check_and_get_column(col)) { + memcpy(reinterpret_cast(cur_batch->values.data()), + not_null_column->get_data().data(), sz * sizeof(Int128)); + } else { + RETURN_WRONG_TYPE + } + SET_NUM_ELEMENTS + break; + } + default: { + return Status::InvalidArgument( + "Invalid expression type: {}", + _output_vexpr_ctxs[i]->root()->type().debug_string()); + } + } + } + } catch (const std::exception& e) { + LOG(WARNING) << "Parquet write error: " << e.what(); + return Status::InternalError(e.what()); + } + root->numElements = sz; + + _writer->add(*row_batch); + _cur_written_rows += sz; + + free(buffer.ptr); + return Status::OK(); +} + +} // namespace 
doris::vectorized
diff --git a/be/src/vec/runtime/vorc_writer.h b/be/src/vec/runtime/vorc_writer.h
new file mode 100644
index 0000000000..284e8b7887
--- /dev/null
+++ b/be/src/vec/runtime/vorc_writer.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <orc/OrcFile.hh>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "io/file_writer.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/runtime/vfile_result_writer.h"
+
+namespace doris::vectorized {
+class FileWriter;
+
+class VOrcOutputStream : public orc::OutputStream {
+public:
+    VOrcOutputStream(doris::FileWriter* file_writer);
+
+    ~VOrcOutputStream() override;
+
+    uint64_t getLength() const override { return _written_len; }
+
+    uint64_t getNaturalWriteSize() const override { return 128 * 1024; }
+
+    void write(const void* buf, size_t length) override;
+
+    const std::string& getName() const override { return _name; }
+
+    void close() override;
+
+    void set_written_len(int64_t written_len);
+
+private:
+    doris::FileWriter* _file_writer; // not owned
+    int64_t _cur_pos = 0;            // current write position
+    bool _is_closed = false;
+    int64_t _written_len = 0;
+    const std::string _name;
+};
+
+// a wrapper of the ORC file writer
+class VOrcWriterWrapper final : public VFileWriterWrapper {
+public:
+    VOrcWriterWrapper(doris::FileWriter* file_writer,
+                      const std::vector<VExprContext*>& output_vexpr_ctxs,
+                      const std::string& schema, bool output_object_data);
+
+    ~VOrcWriterWrapper() = default;
+
+    Status prepare() override;
+
+    Status write(const Block& block) override;
+
+    void close() override;
+
+    int64_t written_len() override;
+
+private:
+    std::unique_ptr<orc::ColumnVectorBatch> _create_row_batch(size_t sz);
+
+    doris::FileWriter* _file_writer;
+    std::unique_ptr<VOrcOutputStream> _output_stream;
+    std::unique_ptr<orc::WriterOptions> _write_options;
+    const std::string& _schema_str;
+    std::unique_ptr<orc::Type> _schema;
+    std::unique_ptr<orc::Writer> _writer;
+
+    static constexpr size_t BUFFER_UNIT_SIZE = 4096;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
index 9737fba854..67d94d8f14 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -43,10 +43,7 @@ VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer,
                                              const bool& parquet_disable_dictionary,
                                              const TParquetVersion::type& parquet_version,
                                              bool output_object_data)
-        : _output_vexpr_ctxs(output_vexpr_ctxs),
-          _cur_written_rows(0),
-          _rg_writer(nullptr),
-          _output_object_data(output_object_data) {
+        : VFileWriterWrapper(output_vexpr_ctxs, output_object_data), _rg_writer(nullptr) {
     _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
     parse_properties(compression_type, parquet_disable_dictionary,
parquet_version); parse_schema(parquet_schemas); @@ -480,7 +477,7 @@ Status VParquetWriterWrapper::write(const Block& block) { return Status::OK(); } -Status VParquetWriterWrapper::init_parquet_writer() { +Status VParquetWriterWrapper::prepare() { _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties); if (_writer == nullptr) { return Status::InternalError("Failed to create file writer"); diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h index 34806dd9b5..2c5d0e102b 100644 --- a/be/src/vec/runtime/vparquet_writer.h +++ b/be/src/vec/runtime/vparquet_writer.h @@ -37,12 +37,35 @@ #include "exec/parquet_writer.h" #include "vec/core/block.h" #include "vec/exprs/vexpr_context.h" +#include "vec/runtime/vfile_result_writer.h" namespace doris::vectorized { -class FileWriter; + +class VFileWriterWrapper { +public: + VFileWriterWrapper(const std::vector& output_vexpr_ctxs, bool output_object_data) + : _output_vexpr_ctxs(output_vexpr_ctxs), + _cur_written_rows(0), + _output_object_data(output_object_data) {} + + virtual ~VFileWriterWrapper() = default; + + virtual Status prepare() = 0; + + virtual Status write(const Block& block) = 0; + + virtual void close() = 0; + + virtual int64_t written_len() = 0; + +protected: + const std::vector& _output_vexpr_ctxs; + int64_t _cur_written_rows; + bool _output_object_data; +}; // a wrapper of parquet output stream -class VParquetWriterWrapper { +class VParquetWriterWrapper final : public VFileWriterWrapper { public: VParquetWriterWrapper(doris::FileWriter* file_writer, const std::vector& output_vexpr_ctxs, @@ -53,13 +76,13 @@ public: ~VParquetWriterWrapper() = default; - Status init_parquet_writer(); + Status prepare() override; - Status write(const Block& block); + Status write(const Block& block) override; - void close(); + void close() override; - int64_t written_len(); + int64_t written_len() override; private: parquet::RowGroupWriter* get_rg_writer(); @@ -75,11 +98,8 @@ private: std::shared_ptr _properties; std::shared_ptr _schema; std::unique_ptr _writer; - const std::vector& _output_vexpr_ctxs; - int64_t _cur_written_rows = 0; parquet::RowGroupWriter* _rg_writer; const int64_t _max_row_per_group = 10; - bool _output_object_data; }; } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index d51e3e3b6f..d7b59f675f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -20,11 +20,13 @@ package org.apache.doris.analysis; import org.apache.doris.backup.HdfsStorage; import org.apache.doris.backup.S3Storage; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.ParseUtil; @@ -64,6 +66,7 @@ public class OutFileClause { public static final Map PARQUET_DATA_TYPE_MAP = Maps.newHashMap(); public static final Map PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap(); public static final Map PARQUET_VERSION_MAP = Maps.newHashMap(); + public static final Set 
ORC_DATA_TYPE = Sets.newHashSet(); static { RESULT_COL_NAMES.add("FileNumber"); @@ -100,6 +103,15 @@ public class OutFileClause { PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0); PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST); + + ORC_DATA_TYPE.add("bigint"); + ORC_DATA_TYPE.add("boolean"); + ORC_DATA_TYPE.add("double"); + ORC_DATA_TYPE.add("float"); + ORC_DATA_TYPE.add("int"); + ORC_DATA_TYPE.add("smallint"); + ORC_DATA_TYPE.add("string"); + ORC_DATA_TYPE.add("tinyint"); } public static final String LOCAL_FILE_PREFIX = "file:///"; @@ -114,6 +126,7 @@ public class OutFileClause { private static final String PROP_MAX_FILE_SIZE = "max_file_size"; private static final String PROP_SUCCESS_FILE_NAME = "success_file_name"; private static final String PARQUET_PROP_PREFIX = "parquet."; + private static final String ORC_PROP_PREFIX = "orc."; private static final String SCHEMA = "schema"; private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB @@ -138,6 +151,8 @@ public class OutFileClause { private List parquetSchemas = new ArrayList<>(); + private List> orcSchemas = new ArrayList<>(); + private boolean isAnalyzed = false; private String headerType = ""; @@ -207,6 +222,9 @@ public class OutFileClause { case "parquet": fileFormatType = TFileFormatType.FORMAT_PARQUET; break; + case "orc": + fileFormatType = TFileFormatType.FORMAT_ORC; + break; case "csv_with_names": headerType = FeConstants.csv_with_names; fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; @@ -230,6 +248,146 @@ public class OutFileClause { if (isParquetFormat()) { analyzeForParquetFormat(resultExprs, colLabels); + } else if (isOrcFormat()) { + analyzeForOrcFormat(resultExprs, colLabels); + } + } + + private void genOrcSchema(List resultExprs, List colLabels) throws AnalysisException { + Preconditions.checkState(this.parquetSchemas.isEmpty()); + for (int i = 0; i < resultExprs.size(); ++i) { + Expr expr = resultExprs.get(i); + String type = ""; + switch (expr.getType().getPrimitiveType()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case STRING: + type = expr.getType().getPrimitiveType().toString().toLowerCase(); + break; + case HLL: + case BITMAP: + if (!(ConnectContext.get() != null && ConnectContext.get() + .getSessionVariable().isReturnObjectDataAsBinary())) { + break; + } + type = "string"; + break; + case DATE: + case DATETIME: + case DATETIMEV2: + case DATEV2: + case CHAR: + case VARCHAR: + type = "string"; + break; + case DECIMALV2: + if (!expr.getType().isWildcardDecimal()) { + type = String.format("decimal(%d, %d)", ScalarType.MAX_DECIMAL128_PRECISION, + ((ScalarType) expr.getType()).decimalScale()); + } else { + throw new AnalysisException("currently ORC writer do not support WildcardDecimal!"); + } + break; + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + if (!expr.getType().isWildcardDecimal()) { + type = String.format("decimal(%d, %d)", ((ScalarType) expr.getType()).getPrecision(), + ((ScalarType) expr.getType()).decimalScale()); + } else { + throw new AnalysisException("currently ORC writer do not support WildcardDecimal!"); + } + break; + default: + throw new AnalysisException("currently orc do not support column type: " + + expr.getType().getPrimitiveType()); + } + orcSchemas.add(Pair.of(colLabels.get(i), type)); + } + } + + private String serializeOrcSchema() { + StringBuilder sb = new StringBuilder(); + sb.append("struct<"); + this.orcSchemas.forEach(pair -> sb.append(pair.first + 
":" + pair.second + ",")); + if (!this.orcSchemas.isEmpty()) { + return sb.substring(0, sb.length() - 1) + ">"; + } else { + return sb.toString() + ">"; + } + } + + private void analyzeForOrcFormat(List resultExprs, List colLabels) throws AnalysisException { + if (this.orcSchemas.isEmpty()) { + genOrcSchema(resultExprs, colLabels); + } + // check schema number + if (resultExprs.size() != this.orcSchemas.size()) { + throw new AnalysisException("Orc schema number does not equal to select item number"); + } + // check type + for (int i = 0; i < this.orcSchemas.size(); ++i) { + Pair schema = this.orcSchemas.get(i); + Type resultType = resultExprs.get(i).getType(); + switch (resultType.getPrimitiveType()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case STRING: + if (!schema.second.equals(resultType.getPrimitiveType().toString().toLowerCase())) { + throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString() + + ", should use " + resultType.getPrimitiveType().toString() + "," + + " but the type of column " + i + " is " + schema.second); + } + break; + case DATE: + case DATETIME: + case DATETIMEV2: + case DATEV2: + case CHAR: + case VARCHAR: + if (!schema.second.equals("string")) { + throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString() + + ", should use string, but the definition type of column " + i + " is " + + schema.second); + } + break; + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + case DECIMALV2: + if (!schema.second.startsWith("decimal")) { + throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString() + + ", should use string, but the definition type of column " + i + " is " + + schema.second); + } + break; + case HLL: + case BITMAP: + if (ConnectContext.get() != null && ConnectContext.get() + .getSessionVariable().isReturnObjectDataAsBinary()) { + if (!schema.second.equals("string")) { + throw new AnalysisException("project field type is HLL/BITMAP, should use string, " + + "but the definition type of column " + i + " is " + schema.second); + } + } else { + throw new AnalysisException("Orc format does not support column type: " + + resultType.getPrimitiveType()); + } + break; + default: + throw new AnalysisException("Orc format does not support column type: " + + resultType.getPrimitiveType()); + } } } @@ -438,6 +596,10 @@ public class OutFileClause { getParquetProperties(processedPropKeys); } + if (this.fileFormatType == TFileFormatType.FORMAT_ORC) { + getOrcProperties(processedPropKeys); + } + if (processedPropKeys.size() != properties.size()) { LOG.debug("{} vs {}", processedPropKeys, properties); throw new AnalysisException("Unknown properties: " + properties.keySet().stream() @@ -575,6 +737,41 @@ public class OutFileClause { processedPropKeys.add(SCHEMA); } + private void getOrcProperties(Set processedPropKeys) throws AnalysisException { + // check schema. 
if schema is not set, Doris will gen schema by select items
+        String schema = properties.get(SCHEMA);
+        if (schema == null) {
+            return;
+        }
+        if (schema.isEmpty()) {
+            throw new AnalysisException("Orc schema property should not be empty");
+        }
+        schema = schema.replace(" ", "");
+        schema = schema.toLowerCase();
+        String[] schemas = schema.split(";");
+        for (String item : schemas) {
+            String[] properties = item.split(",");
+            if (properties.length != 2) {
+                throw new AnalysisException("must only contains type and column name");
+            }
+            if (!ORC_DATA_TYPE.contains(properties[1]) && !properties[1].startsWith("decimal")) {
+                throw new AnalysisException("data type is not supported:" + properties[1]);
+            } else if (!ORC_DATA_TYPE.contains(properties[1]) && properties[1].startsWith("decimal")) {
+                String errorMsg = "Format of decimal type must be decimal(%d,%d)";
+                String precisionAndScale = properties[1].substring("decimal".length()).trim();
+                if (!precisionAndScale.startsWith("(") || !precisionAndScale.endsWith(")")) {
+                    throw new AnalysisException(errorMsg);
+                }
+                String[] str = precisionAndScale.substring(1, precisionAndScale.length() - 1).split(",");
+                if (str.length != 2) {
+                    throw new AnalysisException(errorMsg);
+                }
+            }
+            orcSchemas.add(Pair.of(properties[0], properties[1]));
+        }
+        processedPropKeys.add(SCHEMA);
+    }
+
     private boolean isCsvFormat() {
         return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2
                 || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE
@@ -589,6 +786,10 @@ public class OutFileClause {
         return fileFormatType == TFileFormatType.FORMAT_PARQUET;
     }
 
+    private boolean isOrcFormat() {
+        return fileFormatType == TFileFormatType.FORMAT_ORC;
+    }
+
     @Override
     public OutFileClause clone() {
         return new OutFileClause(this);
@@ -635,6 +836,9 @@ public class OutFileClause {
             sinkOptions.setParquetVersion(parquetVersion);
             sinkOptions.setParquetSchemas(parquetSchemas);
         }
+        if (isOrcFormat()) {
+            sinkOptions.setOrcSchema(serializeOrcSchema());
+        }
         return sinkOptions;
     }
 }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 01fe5f2b6e..6a5a6e1944 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -100,6 +100,7 @@ struct TResultFileSinkOptions {
     12: optional TParquetCompressionType parquet_compression_type
     13: optional bool parquet_disable_dictionary
    14: optional TParquetVersion parquet_version
+    15: optional string orc_schema
 }
 
 struct TMemoryScratchSink {
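
For reference, a minimal sketch of how the VFileWriterWrapper interface introduced above is driven for ORC output. Class and method names come from this patch; the helper function name, the schema string, and the flag values are illustrative, and error handling is reduced to RETURN_IF_ERROR.

// Hypothetical usage sketch: depends on Doris-internal headers added by this patch
// and is not a standalone program.
#include "vec/runtime/vorc_writer.h"

doris::Status write_block_as_orc(doris::FileWriter* file_writer,
                                 const std::vector<doris::vectorized::VExprContext*>& output_vexpr_ctxs,
                                 const doris::vectorized::Block& block) {
    using namespace doris::vectorized;
    // Schema string in the form the FE produces via OutFileClause.serializeOrcSchema();
    // the concrete columns here are illustrative.
    const std::string schema = "struct<k1:bigint,k2:string>";
    std::unique_ptr<VFileWriterWrapper> writer(new VOrcWriterWrapper(
            file_writer, output_vexpr_ctxs, schema, /*output_object_data=*/false));
    RETURN_IF_ERROR(writer->prepare());      // builds the orc::Type and orc::Writer from the schema
    RETURN_IF_ERROR(writer->write(block));   // converts Block columns into an ORC row batch
    int64_t written = writer->written_len(); // VFileResultWriter uses this to split output files
    (void)written;
    writer->close();
    return doris::Status::OK();
}

In the patch itself this sequencing lives in VFileResultWriter::_create_file_writer and VFileResultWriter::_write_file, which treat the Parquet and ORC writers uniformly through the VFileWriterWrapper base class.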