From 54d062ddeeea50a74a760ec3020de4a0a99656eb Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 6 Dec 2023 23:29:46 +0800 Subject: [PATCH] [feature](stream load) (step one)Add arrow data type for stream load (#26709) By using the Arrow data format, we can reduce the streamload of data transferred and improve the data import performance --- be/src/util/load_util.cpp | 3 + .../serde/data_type_datetimev2_serde.cpp | 54 ++++++++ .../serde/data_type_datetimev2_serde.h | 5 +- .../serde/data_type_datev2_serde.cpp | 2 +- .../serde/data_type_decimal_serde.cpp | 8 +- .../format/arrow/arrow_pip_input_stream.cpp | 101 +++++++++++++++ .../format/arrow/arrow_pip_input_stream.h | 66 ++++++++++ .../exec/format/arrow/arrow_stream_reader.cpp | 117 ++++++++++++++++++ .../exec/format/arrow/arrow_stream_reader.h | 76 ++++++++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 9 +- .../doris/analysis/DataDescription.java | 4 + .../common/util/FileFormatConstants.java | 1 + .../org/apache/doris/common/util/Util.java | 4 +- .../main/java/org/apache/doris/load/Load.java | 14 ++- gensrc/thrift/PlanNodes.thrift | 1 + 15 files changed, 452 insertions(+), 13 deletions(-) create mode 100644 be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp create mode 100644 be/src/vec/exec/format/arrow/arrow_pip_input_stream.h create mode 100644 be/src/vec/exec/format/arrow/arrow_stream_reader.cpp create mode 100644 be/src/vec/exec/format/arrow/arrow_stream_reader.h diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index 1a0dff2f90..789038823f 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -69,6 +69,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *format_type = TFileFormatType::FORMAT_ORC; } else if (iequal(format_str, "WAL")) { *format_type = TFileFormatType::FORMAT_WAL; + } else if (iequal(format_str, "ARROW")) { + *format_type = TFileFormatType::FORMAT_ARROW; } return; } @@ -85,6 +87,7 @@ bool 
LoadUtil::is_format_support_streaming(TFileFormatType::type format) { case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_JSON: case TFileFormatType::FORMAT_WAL: + case TFileFormatType::FORMAT_ARROW: return true; default: return false; diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp index d0839d2c9b..8fcf9c3c94 100644 --- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp @@ -19,11 +19,19 @@ #include +#include // IWYU pragma: keep #include #include "vec/columns/column_const.h" #include "vec/io/io_helper.h" +enum { + DIVISOR_FOR_SECOND = 1, + DIVISOR_FOR_MILLI = 1000, + DIVISOR_FOR_MICRO = 1000000, + DIVISOR_FOR_NANO = 1000000000 +}; + namespace doris { namespace vectorized { static const int64_t timestamp_threshold = -2177481943; @@ -110,6 +118,52 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, const } } +void DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column, + const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const { + auto& col_data = static_cast&>(column).get_data(); + int64_t divisor = 1; + if (arrow_array->type()->id() == arrow::Type::TIMESTAMP) { + auto concrete_array = dynamic_cast(arrow_array); + const auto type = std::static_pointer_cast(arrow_array->type()); + switch (type->unit()) { + case arrow::TimeUnit::type::SECOND: { + divisor = DIVISOR_FOR_SECOND; + break; + } + case arrow::TimeUnit::type::MILLI: { + divisor = DIVISOR_FOR_MILLI; + break; + } + case arrow::TimeUnit::type::MICRO: { + divisor = DIVISOR_FOR_MICRO; + break; + } + case arrow::TimeUnit::type::NANO: { + divisor = DIVISOR_FOR_NANO; + break; + } + default: { + LOG(WARNING) << "not support convert to datetimev2 from time_unit:" << type->unit(); + return; + } + } + for (size_t value_i = start; value_i < end; ++value_i) { + auto utc_epoch = 
static_cast(concrete_array->Value(value_i)); + + DateV2Value v; + // convert second + v.from_unixtime(utc_epoch / divisor, ctz); + // the remainder is still in the source time unit; rescale it to microseconds + v.set_microsecond((utc_epoch % divisor) * DIVISOR_FOR_MICRO / divisor); + col_data.emplace_back(binary_cast, UInt64>(v)); + } + } else { + LOG(WARNING) << "not support convert to datetimev2 from arrow type:" + << arrow_array->type()->id(); + } +} + template Status DataTypeDateTimeV2SerDe::_write_column_to_mysql(const IColumn& column, MysqlRowBuffer& result, diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h index c88f10a52a..a6bb8d4554 100644 --- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h @@ -63,10 +63,7 @@ public: arrow::ArrayBuilder* array_builder, int start, int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, - int end, const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "read_column_from_arrow with type " + column.get_name()); - } + int end, const cctz::time_zone& ctz) const override; Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer& row_buffer, int row_idx, bool col_const) const override; diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp index f0462d5697..788be2fd00 100644 --- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp @@ -101,7 +101,7 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::A int start, int end, const cctz::time_zone& ctz) const { auto& col_data = static_cast&>(column).get_data(); - auto concrete_array = dynamic_cast(arrow_array); + auto concrete_array = dynamic_cast(arrow_array); int64_t divisor = 1; int64_t multiplier = 1; diff --git
a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp index 1f2263b882..65b504474f 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp @@ -167,6 +167,8 @@ void DataTypeDecimalSerDe::read_column_from_arrow(IColumn& column, static_cast(arrow_array->type().get()); const auto arrow_scale = arrow_decimal_type->scale(); auto& column_data = static_cast&>(column).get_data(); + // Decimal for decimalv2 + // Decimal for deicmalv3 if constexpr (std::is_same_v>) { // TODO check precision for (size_t value_i = start; value_i < end; ++value_i) { @@ -190,13 +192,13 @@ void DataTypeDecimalSerDe::read_column_from_arrow(IColumn& column, } column_data.emplace_back(value); } - } else if constexpr (std::is_same_v || std::is_same_v) { + } else if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { for (size_t value_i = start; value_i < end; ++value_i) { column_data.emplace_back(*reinterpret_cast(concrete_array->Value(value_i))); } } else { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "read_column_from_arrow with type " + column.get_name()); + LOG(WARNING) << "Unsuppoted convertion to decimal from " << column.get_name(); } } diff --git a/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp b/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp new file mode 100644 index 0000000000..1c9edb84a4 --- /dev/null +++ b/be/src/vec/exec/format/arrow/arrow_pip_input_stream.cpp @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow_pip_input_stream.h" + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/io/buffered.h" +#include "arrow/io/stdio.h" +#include "arrow/ipc/options.h" +#include "arrow/ipc/reader.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "common/logging.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/wal_manager.h" +#include "runtime/runtime_state.h" + +namespace doris::vectorized { + +ArrowPipInputStream::ArrowPipInputStream(io::FileReaderSPtr file_reader) + : _file_reader(file_reader), _pos(0), _begin(true), _read_buf(new uint8_t[4]) { + set_mode(arrow::io::FileMode::READ); +} + +arrow::Status ArrowPipInputStream::Close() { + return arrow::Status::OK(); +} + +bool ArrowPipInputStream::closed() const { + return false; +} + +arrow::Result ArrowPipInputStream::Tell() const { + return _pos; +} + +Status ArrowPipInputStream::HasNext(bool* get) { + // 1. Arrow's serialization uses a 4-byte data to specify the length of the data that follows, + // so there must be 4-byte data here. + // 2. 
If it is not determined whether there is a next batch of data (the data has already been transmitted), + // then the `_file_reader->read_at` will return a buff with a read length of 0, + // and the `RecordBatchStreamReader::Open` function will directly report an error when it gets this buff + Slice file_slice(_read_buf, 4); + size_t read_length = 0; + RETURN_IF_ERROR(_file_reader->read_at(0, file_slice, &read_length, nullptr)); + if (read_length == 0) { + *get = false; + } else { + *get = true; + } + return Status::OK(); +} + +arrow::Result ArrowPipInputStream::Read(int64_t nbytes, void* out) { + // RecordBatchStreamReader::Open will create a new reader that will stream a batch of arrow data. + // But the first four bytes of this batch of data were taken by the HasNext function, so they need to be copied back here. + uint8_t* out_ptr = static_cast<uint8_t*>(out); + if (_begin) { + memmove(out_ptr, _read_buf, 4); + out_ptr += 4; + nbytes -= 4; + } + + Slice file_slice(out_ptr, nbytes); + size_t read_length = 0; + Status status = _file_reader->read_at(0, file_slice, &read_length, nullptr); + if (UNLIKELY(!status.ok())) { + return arrow::Status::IOError("Error to read data from pip: ", status.to_string()); + } + + if (_begin) { + read_length += 4; + _begin = false; + } + return static_cast<int64_t>(read_length); +} + +arrow::Result> ArrowPipInputStream::Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); + ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false)); + buffer->ZeroPadding(); + return buffer; +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/arrow/arrow_pip_input_stream.h b/be/src/vec/exec/format/arrow/arrow_pip_input_stream.h new file mode 100644 index 0000000000..fef4cf1090 --- /dev/null +++ b/be/src/vec/exec/format/arrow/arrow_pip_input_stream.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/io/interfaces.h" +#include "io/file_factory.h" + +namespace doris { + +namespace io { +class FileSystem; +struct IOContext; +} // namespace io + +namespace vectorized { + +class ArrowPipInputStream : public arrow::io::InputStream { + ENABLE_FACTORY_CREATOR(ArrowPipInputStream); + +public: + ArrowPipInputStream(io::FileReaderSPtr file_reader); + ~ArrowPipInputStream() override { delete[] _read_buf; } // the constructor allocates _read_buf with new[] + + arrow::Status Close() override; + bool closed() const override; + + arrow::Result Tell() const override; + + arrow::Result Read(int64_t nbytes, void* out) override; + + arrow::Result> Read(int64_t nbytes) override; + + Status HasNext(bool* get); + +private: + io::FileReaderSPtr _file_reader; + int64_t _pos; + bool _begin; + uint8_t* _read_buf; +}; + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp new file mode 100644 index 0000000000..29ee451e3a --- /dev/null +++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow_stream_reader.h" + +#include "arrow/array.h" +#include "arrow/io/buffered.h" +#include "arrow/io/stdio.h" +#include "arrow/ipc/options.h" +#include "arrow/ipc/reader.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow_pip_input_stream.h" +#include "common/logging.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/wal_manager.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "vec/utils/arrow_column_to_doris_column.h" + +namespace doris { +class RuntimeProfile; +} // namespace doris + +namespace doris::vectorized { + +ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profile, + ScannerCounter* counter, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + const std::vector& file_slot_descs, + io::IOContext* io_ctx) + : _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) { + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz); +} + +ArrowStreamReader::~ArrowStreamReader() = default; + +Status ArrowStreamReader::init_reader() { + RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false)); + _pip_stream = ArrowPipInputStream::create_unique(_file_reader); + return 
Status::OK(); +} + +Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + *read_rows = 0; // reset up front: the conversion loop below accumulates with += + bool has_next = false; + RETURN_IF_ERROR(_pip_stream->HasNext(&has_next)); + if (!has_next) { + *eof = true; + return Status::OK(); + } + + // create a reader to read data + arrow::Result> tRet = + arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(), + arrow::ipc::IpcReadOptions::Defaults()); + if (!tRet.ok()) { + LOG(WARNING) << "failed to open stream reader: " << tRet.status().message(); + return Status::InternalError("failed to open stream reader: {}", tRet.status().message()); + } + auto reader = std::move(tRet).ValueUnsafe(); + + // get arrow data from reader + arrow::Result tRet2 = reader->ToRecordBatches(); + if (!tRet2.ok()) { + LOG(WARNING) << "failed to read batch: " << tRet2.status().message(); + return Status::InternalError("failed to read batch: {}", tRet2.status().message()); + } + std::vector> out_batches = std::move(tRet2).ValueUnsafe(); + + // convert arrow batch to block + auto columns = block->mutate_columns(); + int batch_size = out_batches.size(); + for (int i = 0; i < batch_size; i++) { + arrow::RecordBatch& batch = *out_batches[i]; + int num_rows = batch.num_rows(); + int num_columns = batch.num_columns(); + for (int c = 0; c < num_columns; ++c) { + arrow::Array* column = batch.column(c).get(); + + std::string column_name = batch.schema()->field(c)->name(); + + vectorized::ColumnWithTypeAndName& column_with_name = block->get_by_name(column_name); + + column_with_name.type->get_serde()->read_column_from_arrow( + column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz); + } + *read_rows += batch.num_rows(); + } + + *eof = (*read_rows == 0); + return Status::OK(); +} + +Status ArrowStreamReader::get_columns(std::unordered_map* name_to_type, + std::unordered_set* missing_cols) { + for (auto& slot : _file_slot_descs) { + name_to_type->emplace(slot->col_name(), slot->type()); + } + return
Status::OK(); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.h b/be/src/vec/exec/format/arrow/arrow_stream_reader.h new file mode 100644 index 0000000000..eb0acca4ae --- /dev/null +++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow_pip_input_stream.h" +#include "cctz/time_zone.h" +#include "common/status.h" +#include "io/file_factory.h" +#include "io/fs/file_reader_writer_fwd.h" +#include "util/slice.h" +#include "vec/data_types/data_type.h" +#include "vec/exec/format/file_reader/new_plain_text_line_reader.h" +#include "vec/exec/format/generic_reader.h" + +namespace doris { + +namespace io { +class FileSystem; +struct IOContext; +} // namespace io + +namespace vectorized { + +struct ScannerCounter; +class Block; + +class ArrowStreamReader : public GenericReader { + ENABLE_FACTORY_CREATOR(ArrowStreamReader); + +public: + ArrowStreamReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + const std::vector& file_slot_descs, io::IOContext* io_ctx); + + ~ArrowStreamReader() override; + + Status init_reader(); + + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map* name_to_type, + std::unordered_set* missing_cols) override; + +private: + RuntimeState* _state; + const TFileRangeDesc& _range; + const std::vector& _file_slot_descs; + io::FileReaderSPtr _file_reader; + std::unique_ptr _pip_stream; + cctz::time_zone _ctzz; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 80d6518e72..6bee42bfe8 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -53,6 +53,7 @@ #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" +#include "vec/exec/format/arrow/arrow_stream_reader.h" #include "vec/exec/format/avro/avro_jni_reader.h" #include "vec/exec/format/csv/csv_reader.h" #include "vec/exec/format/json/new_json_reader.h" @@ 
-448,7 +449,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) { auto return_type = slot_desc->get_data_type_ptr(); // remove nullable here, let the get_function decide whether nullable auto data_type = vectorized::DataTypeFactory::instance().create_data_type( - remove_nullable(return_type)->get_type_id()); + remove_nullable(return_type)->get_type_as_type_descriptor()); ColumnsWithTypeAndName arguments { arg, {data_type->create_column(), data_type, slot_desc->col_name()}}; auto func_cast = @@ -873,6 +874,12 @@ Status VFileScanner::_get_next_reader() { init_status = ((WalReader*)(_cur_reader.get()))->init_reader(); break; } + case TFileFormatType::FORMAT_ARROW: { + _cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params, + range, _file_slot_descs, _io_ctx.get()); + init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader(); + break; + } default: return Status::InternalError("Not supported file format: {}", _params->format_type); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 22e9dca39c..dab0859fb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -357,6 +357,9 @@ public class DataDescription implements InsertStmt.DataDesc { case FORMAT_WAL: this.fileFormat = "wal"; break; + case FORMAT_ARROW: + this.fileFormat = "arrow"; + break; default: this.fileFormat = "unknown"; break; @@ -1124,6 +1127,7 @@ public class DataDescription implements InsertStmt.DataDesc { && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC) && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON) && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_WAL) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ARROW) && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) { 
throw new AnalysisException("File Format Type " + fileFormat + " is invalid."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java index 7d60222d29..e86c06fcb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java @@ -33,6 +33,7 @@ public class FileFormatConstants { public static final String FORMAT_JSON = "json"; public static final String FORMAT_AVRO = "avro"; public static final String FORMAT_WAL = "wal"; + public static final String FORMAT_ARROW = "arrow"; public static final String PROP_FORMAT = "format"; public static final String PROP_COLUMN_SEPARATOR = "column_separator"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 3b0676118f..58de5133c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -564,8 +564,10 @@ public class Util { // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. 
|| lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) { return TFileFormatType.FORMAT_CSV_PLAIN; - } else if (lowerFileFormat.equals("wal")) { + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_WAL)) { return TFileFormatType.FORMAT_WAL; + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) { + return TFileFormatType.FORMAT_ARROW; } else { return TFileFormatType.FORMAT_UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index ea50266539..714bd4b6f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -344,10 +344,12 @@ public class Load { for (ImportColumnDesc importColumnDesc : copiedColumnExprs) { columnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr()); } + HashMap colToType = new HashMap<>(); // check default value and auto-increment column for (Column column : tbl.getBaseSchema()) { String columnName = column.getName(); + colToType.put(columnName, column.getType()); if (columnExprMap.containsKey(columnName)) { continue; } @@ -427,9 +429,15 @@ public class Load { exprsByName.put(realColName, expr); } else { SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc); - // columns default be varchar type - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); - slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); + + if (formatType == TFileFormatType.FORMAT_ARROW) { + slotDesc.setColumn(new Column(realColName, colToType.get(realColName))); + } else { + // columns default be varchar type + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); + } + // ISSUE A: src slot should be nullable even if the column is not nullable. // because src slot is what we read from file, not represent to real column value. 
// If column is not nullable, error will be thrown when filling the dest slot, diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index b78bd90b4e..9e020abdbd 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -120,6 +120,7 @@ enum TFileFormatType { FORMAT_CSV_LZ4BLOCK, FORMAT_CSV_SNAPPYBLOCK, FORMAT_WAL, + FORMAT_ARROW } // In previous versions, the data compression format and file format were stored together, as TFileFormatType,