[feature](stream load) (step one) Add arrow data type for stream load (#26709)

By using the Arrow data format, we can reduce the amount of data transferred during stream load and improve data import performance.
wuwenchi
2023-12-06 23:29:46 +08:00
committed by GitHub
parent 2ca66ff61c
commit 54d062ddee
15 changed files with 452 additions and 13 deletions
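
For orientation before the diffs: a minimal client-side sketch (not part of this commit) of producing the Arrow IPC stream bytes that the new FORMAT_ARROW path consumes. The schema, column name, and values are illustrative assumptions; only the IPC stream framing matters.

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>

// Build one record batch and serialize it with the Arrow IPC *stream* format,
// which is what the server-side reader added below expects on the load pipe.
arrow::Result<std::shared_ptr<arrow::Buffer>> make_ipc_stream() {
    arrow::Int64Builder builder;
    ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> ids, builder.Finish());
    auto schema = arrow::schema({arrow::field("id", arrow::int64())});
    auto batch = arrow::RecordBatch::Make(schema, ids->length(), {ids});
    ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
    ARROW_ASSIGN_OR_RAISE(auto writer, arrow::ipc::MakeStreamWriter(sink, schema));
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
    ARROW_RETURN_NOT_OK(writer->Close());
    return sink->Finish(); // body of the stream load request when the format is "arrow"
}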

View File

@@ -69,6 +69,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co
*format_type = TFileFormatType::FORMAT_ORC;
} else if (iequal(format_str, "WAL")) {
*format_type = TFileFormatType::FORMAT_WAL;
} else if (iequal(format_str, "ARROW")) {
*format_type = TFileFormatType::FORMAT_ARROW;
}
return;
}
@@ -85,6 +87,7 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) {
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_JSON:
case TFileFormatType::FORMAT_WAL:
case TFileFormatType::FORMAT_ARROW:
return true;
default:
return false;

View File

@@ -19,11 +19,19 @@
#include <arrow/builder.h>
#include <chrono> // IWYU pragma: keep
#include <type_traits>
#include "vec/columns/column_const.h"
#include "vec/io/io_helper.h"
enum {
DIVISOR_FOR_SECOND = 1,
DIVISOR_FOR_MILLI = 1000,
DIVISOR_FOR_MICRO = 1000000,
DIVISOR_FOR_NANO = 1000000000
};
namespace doris {
namespace vectorized {
static const int64_t timestamp_threshold = -2177481943;
@@ -110,6 +118,52 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, const
}
}
void DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
const arrow::Array* arrow_array, int start,
int end, const cctz::time_zone& ctz) const {
auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data();
int64_t divisor = 1;
if (arrow_array->type()->id() == arrow::Type::TIMESTAMP) {
auto concrete_array = dynamic_cast<const arrow::TimestampArray*>(arrow_array);
const auto type = std::static_pointer_cast<arrow::TimestampType>(arrow_array->type());
switch (type->unit()) {
case arrow::TimeUnit::type::SECOND: {
divisor = DIVISOR_FOR_SECOND;
break;
}
case arrow::TimeUnit::type::MILLI: {
divisor = DIVISOR_FOR_MILLI;
break;
}
case arrow::TimeUnit::type::MICRO: {
divisor = DIVISOR_FOR_MICRO;
break;
}
case arrow::TimeUnit::type::NANO: {
divisor = DIVISOR_FOR_NANO;
break;
}
default: {
LOG(WARNING) << "not support convert to datetimev2 from time_unit:" << type->unit();
return;
}
}
for (size_t value_i = start; value_i < end; ++value_i) {
auto utc_epoch = static_cast<UInt64>(concrete_array->Value(value_i));
DateV2Value<DateTimeV2ValueType> v;
// whole seconds
v.from_unixtime(utc_epoch / divisor, ctz);
// remaining sub-second part
v.set_microsecond(utc_epoch % divisor);
col_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v));
}
} else {
LOG(WARNING) << "not support convert to datetimev2 from arrow type:"
<< arrow_array->type()->id();
}
}
template <bool is_binary_format>
Status DataTypeDateTimeV2SerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
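
A standalone sketch of the divisor arithmetic in read_column_from_arrow above, with an assumed MICRO-precision input. from_unixtime receives the whole seconds and set_microsecond receives the remainder; note the remainder is denominated in microseconds only for the MICRO unit, so MILLI and NANO sources would need rescaling before the set_microsecond call.

#include <cassert>
#include <cstdint>

int main() {
    const int64_t DIVISOR_FOR_MICRO = 1000000;
    // 2023-12-06 15:29:46.123456 UTC in microseconds since the Unix epoch
    int64_t utc_epoch = 1701876586123456;
    int64_t seconds = utc_epoch / DIVISOR_FOR_MICRO; // fed to from_unixtime
    int64_t micros = utc_epoch % DIVISOR_FOR_MICRO;  // fed to set_microsecond
    assert(seconds == 1701876586 && micros == 123456);
    return 0;
}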

View File

@@ -63,10 +63,7 @@ public:
arrow::ArrayBuilder* array_builder, int start,
int end) const override;
void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
- int end, const cctz::time_zone& ctz) const override {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " + column.get_name());
- }
+ int end, const cctz::time_zone& ctz) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
int row_idx, bool col_const) const override;

View File

@@ -101,7 +101,7 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::A
int start, int end,
const cctz::time_zone& ctz) const {
auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data();
- auto concrete_array = dynamic_cast<const arrow::Date64Array*>(arrow_array);
+ auto concrete_array = dynamic_cast<const arrow::Date32Array*>(arrow_array);
int64_t divisor = 1;
int64_t multiplier = 1;
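
The one-line Date32Array fix above matters because Arrow's two date types differ in both width and unit; a self-contained sketch with an assumed example date:

#include <cassert>
#include <cstdint>

int main() {
    // Arrow Date32 stores days since 1970-01-01 as int32, while Date64
    // stores milliseconds since the epoch as int64, so casting a
    // Date32Array through arrow::Date64Array reads the wrong width and unit.
    int32_t as_date32 = 19697;                     // 2023-12-06 as a day count
    int64_t as_date64 = int64_t(19697) * 86400000; // the same day in milliseconds
    assert(as_date64 / 86400000 == as_date32);
    return 0;
}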

View File

@@ -167,6 +167,8 @@ void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
static_cast<const arrow::DecimalType*>(arrow_array->type().get());
const auto arrow_scale = arrow_decimal_type->scale();
auto& column_data = static_cast<ColumnDecimal<T>&>(column).get_data();
// Decimal<Int128> for decimalv2
// Decimal<Int128I> for decimalv3
if constexpr (std::is_same_v<T, Decimal<Int128>>) {
// TODO check precision
for (size_t value_i = start; value_i < end; ++value_i) {
@@ -190,13 +192,13 @@ void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
}
column_data.emplace_back(value);
}
- } else if constexpr (std::is_same_v<T, Decimal64> || std::is_same_v<T, Decimal32>) {
+ } else if constexpr (std::is_same_v<T, Decimal128I> || std::is_same_v<T, Decimal64> ||
+ std::is_same_v<T, Decimal32>) {
for (size_t value_i = start; value_i < end; ++value_i) {
column_data.emplace_back(*reinterpret_cast<const T*>(concrete_array->Value(value_i)));
}
} else {
- throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
- "read_column_from_arrow with type " + column.get_name());
+ LOG(WARNING) << "Unsupported conversion to decimal from " << column.get_name();
}
}
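
A minimal sketch of the reinterpret-based read path extended above, assuming (as the loop does) that Decimal32/Decimal64/Decimal128I are thin wrappers over plain integers with an implied scale, so the little-endian bytes that arrow's Value(i) points at can be reinterpreted directly:

#include <cassert>
#include <cstdint>
#include <cstring>

int main() {
    // What a fixed-size DecimalArray::Value(i) points at: the unscaled
    // integer representation of the decimal as raw little-endian bytes.
    int64_t unscaled = 1234567; // i.e. 12345.67 at scale 2
    alignas(int64_t) uint8_t raw[sizeof(unscaled)];
    std::memcpy(raw, &unscaled, sizeof(unscaled));
    // The loop above does the equivalent of this per value (Decimal64 case).
    auto value = *reinterpret_cast<const int64_t*>(raw);
    assert(value == unscaled);
    return 0;
}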

View File

@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow_pip_input_stream.h"
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/io/buffered.h"
#include "arrow/io/stdio.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "common/logging.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/wal_manager.h"
#include "runtime/runtime_state.h"
namespace doris::vectorized {
ArrowPipInputStream::ArrowPipInputStream(io::FileReaderSPtr file_reader)
: _file_reader(file_reader), _pos(0), _begin(true), _read_buf(new uint8_t[4]) {
set_mode(arrow::io::FileMode::READ);
}
arrow::Status ArrowPipInputStream::Close() {
return arrow::Status::OK();
}
bool ArrowPipInputStream::closed() const {
return false;
}
arrow::Result<int64_t> ArrowPipInputStream::Tell() const {
return _pos;
}
Status ArrowPipInputStream::HasNext(bool* get) {
// 1. Arrow's serialization prefixes each message with 4 bytes that encode the length of
// the data that follows, so at a message boundary there are always at least 4 bytes to read.
// 2. If there is no next batch (the data has already been fully transmitted),
// `_file_reader->read_at` returns a buffer with a read length of 0, and
// `RecordBatchStreamReader::Open` would report an error if handed that empty buffer.
Slice file_slice(_read_buf, 4);
size_t read_length = 0;
RETURN_IF_ERROR(_file_reader->read_at(0, file_slice, &read_length, NULL));
if (read_length == 0) {
*get = false;
} else {
*get = true;
}
return Status::OK();
}
arrow::Result<int64_t> ArrowPipInputStream::Read(int64_t nbytes, void* out) {
// RecordBatchStreamReader::Open creates a new reader that streams a batch of arrow data.
// But the first four bytes of that batch were already consumed by HasNext, so they are copied back here.
uint8_t* out_ptr = (uint8_t*)out;
if (_begin) {
memmove(out_ptr, _read_buf, 4);
out_ptr += 4;
nbytes -= 4;
}
Slice file_slice(out_ptr, nbytes);
size_t read_length = 0;
Status status = _file_reader->read_at(0, file_slice, &read_length, NULL);
if (UNLIKELY(!status.ok())) {
return arrow::Status::IOError("Failed to read data from pipe");
}
if (_begin) {
read_length += 4;
_begin = false;
}
return (int64_t)read_length;
}
arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowPipInputStream::Read(int64_t nbytes) {
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false));
buffer->ZeroPadding();
return buffer;
}
} // namespace doris::vectorized
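
For reference, a hedged sketch of the framing HasNext relies on: in the Arrow IPC stream format (0.15 and later), each encapsulated message begins with a 4-byte continuation marker 0xFFFFFFFF followed by a 4-byte little-endian metadata length, so a 4-byte peek at a message boundary is always possible, and a zero-length read means the pipe is drained. The helper below is an illustration, not code from this commit.

#include <cstddef>
#include <cstdint>
#include <cstring>

// Interprets the 4 peeked bytes; read_length == 0 is the drained-pipe case
// that HasNext maps to *get = false.
bool could_have_next(const uint8_t* peek, size_t read_length) {
    if (read_length < 4) return false;
    uint32_t word;
    std::memcpy(&word, peek, sizeof(word));
    // A zero word is the legacy end-of-stream marker; anything else is either
    // the 0xFFFFFFFF continuation marker (format >= 0.15) or a legacy length.
    return word != 0;
}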

View File

@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "arrow/io/interfaces.h"
#include "io/file_factory.h"
namespace doris {
namespace io {
class FileSystem;
struct IOContext;
} // namespace io
namespace vectorized {
class ArrowPipInputStream : public arrow::io::InputStream {
ENABLE_FACTORY_CREATOR(ArrowPipInputStream);
public:
ArrowPipInputStream(io::FileReaderSPtr file_reader);
~ArrowPipInputStream() override { delete[] _read_buf; } // free the 4-byte peek buffer allocated in the constructor
arrow::Status Close() override;
bool closed() const override;
arrow::Result<int64_t> Tell() const override;
arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
Status HasNext(bool* get);
private:
io::FileReaderSPtr _file_reader;
int64_t _pos;
bool _begin;
uint8_t* _read_buf;
};
} // namespace vectorized
} // namespace doris

View File

@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow_stream_reader.h"
#include "arrow/array.h"
#include "arrow/io/buffered.h"
#include "arrow/io/stdio.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow_pip_input_stream.h"
#include "common/logging.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/wal_manager.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/utils/arrow_column_to_doris_column.h"
namespace doris {
class RuntimeProfile;
} // namespace doris
namespace doris::vectorized {
ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx)
: _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) {
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}
ArrowStreamReader::~ArrowStreamReader() = default;
Status ArrowStreamReader::init_reader() {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
_pip_stream = ArrowPipInputStream::create_unique(_file_reader);
return Status::OK();
}
Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
bool has_next = false;
RETURN_IF_ERROR(_pip_stream->HasNext(&has_next));
if (!has_next) {
*read_rows = 0;
*eof = true;
return Status::OK();
}
// create a reader to read data
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> tRet =
arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(),
arrow::ipc::IpcReadOptions::Defaults());
if (!tRet.ok()) {
LOG(WARNING) << "failed to open stream reader: " << tRet.status().message();
return Status::InternalError("failed to open stream reader: {}", tRet.status().message());
}
auto reader = std::move(tRet).ValueUnsafe();
// get arrow data from reader
arrow::Result<arrow::RecordBatchVector> tRet2 = reader->ToRecordBatches();
if (!tRet2.ok()) {
LOG(WARNING) << "failed to read batch: " << tRet2.status().message();
return Status::InternalError("failed to read batch: {}", tRet2.status().message());
}
std::vector<std::shared_ptr<arrow::RecordBatch>> out_batches = std::move(tRet2).ValueUnsafe();
// convert arrow batch to block
auto columns = block->mutate_columns();
int batch_size = out_batches.size();
for (int i = 0; i < batch_size; i++) {
arrow::RecordBatch& batch = *out_batches[i];
int num_rows = batch.num_rows();
int num_columns = batch.num_columns();
for (int c = 0; c < num_columns; ++c) {
arrow::Array* column = batch.column(c).get();
std::string column_name = batch.schema()->field(c)->name();
vectorized::ColumnWithTypeAndName& column_with_name = block->get_by_name(column_name);
column_with_name.type->get_serde()->read_column_from_arrow(
column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz);
}
*read_rows += batch.num_rows();
}
*eof = (*read_rows == 0);
return Status::OK();
}
Status ArrowStreamReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
for (auto& slot : _file_slot_descs) {
name_to_type->emplace(slot->col_name(), slot->type());
}
return Status::OK();
}
} // namespace doris::vectorized
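
A hypothetical round-trip check for the read side, driving the same RecordBatchStreamReader::Open and ToRecordBatches calls as get_next_block but over an in-memory buffer; the helper name is an assumption.

#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

// Feed IPC stream bytes (e.g. from the client-side sketch near the top of
// this commit) back through the APIs used by get_next_block.
arrow::Result<int64_t> count_rows(std::shared_ptr<arrow::Buffer> ipc_bytes) {
    auto source = std::make_shared<arrow::io::BufferReader>(ipc_bytes);
    ARROW_ASSIGN_OR_RAISE(auto reader,
                          arrow::ipc::RecordBatchStreamReader::Open(source));
    ARROW_ASSIGN_OR_RAISE(auto batches, reader->ToRecordBatches());
    int64_t rows = 0;
    for (const auto& batch : batches) {
        // get_next_block resolves each arrow field to a block column by name,
        // so field names must match the target table's column names.
        rows += batch->num_rows();
    }
    return rows;
}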

View File

@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "arrow_pip_input_stream.h"
#include "cctz/time_zone.h"
#include "common/status.h"
#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "util/slice.h"
#include "vec/data_types/data_type.h"
#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "vec/exec/format/generic_reader.h"
namespace doris {
namespace io {
class FileSystem;
struct IOContext;
} // namespace io
namespace vectorized {
struct ScannerCounter;
class Block;
class ArrowStreamReader : public GenericReader {
ENABLE_FACTORY_CREATOR(ArrowStreamReader);
public:
ArrowStreamReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx);
~ArrowStreamReader() override;
Status init_reader();
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
private:
RuntimeState* _state;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
io::FileReaderSPtr _file_reader;
std::unique_ptr<doris::vectorized::ArrowPipInputStream> _pip_stream;
cctz::time_zone _ctzz;
};
} // namespace vectorized
} // namespace doris

View File

@@ -53,6 +53,7 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/exec/format/arrow/arrow_stream_reader.h"
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
@@ -448,7 +449,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
auto return_type = slot_desc->get_data_type_ptr();
// remove nullable here, let the get_function decide whether nullable
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
- remove_nullable(return_type)->get_type_id());
+ remove_nullable(return_type)->get_type_as_type_descriptor());
ColumnsWithTypeAndName arguments {
arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
auto func_cast =
@@ -873,6 +874,12 @@ Status VFileScanner::_get_next_reader() {
init_status = ((WalReader*)(_cur_reader.get()))->init_reader();
break;
}
case TFileFormatType::FORMAT_ARROW: {
_cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
range, _file_slot_descs, _io_ctx.get());
init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
break;
}
default:
return Status::InternalError("Not supported file format: {}", _params->format_type);
}

View File

@@ -357,6 +357,9 @@ public class DataDescription implements InsertStmt.DataDesc {
case FORMAT_WAL:
this.fileFormat = "wal";
break;
case FORMAT_ARROW:
this.fileFormat = "arrow";
break;
default:
this.fileFormat = "unknown";
break;
@@ -1124,6 +1127,7 @@
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_WAL)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ARROW)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) {
throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
}

View File

@@ -33,6 +33,7 @@ public class FileFormatConstants {
public static final String FORMAT_JSON = "json";
public static final String FORMAT_AVRO = "avro";
public static final String FORMAT_WAL = "wal";
public static final String FORMAT_ARROW = "arrow";
public static final String PROP_FORMAT = "format";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";

View File

@@ -564,8 +564,10 @@ public class Util {
// TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
|| lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else if (lowerFileFormat.equals("wal")) {
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_WAL)) {
return TFileFormatType.FORMAT_WAL;
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) {
return TFileFormatType.FORMAT_ARROW;
} else {
return TFileFormatType.FORMAT_UNKNOWN;
}

View File

@@ -344,10 +344,12 @@ public class Load {
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
columnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
}
HashMap<String, Type> colToType = new HashMap<>();
// check default value and auto-increment column
for (Column column : tbl.getBaseSchema()) {
String columnName = column.getName();
colToType.put(columnName, column.getType());
if (columnExprMap.containsKey(columnName)) {
continue;
}
@@ -427,9 +429,15 @@
exprsByName.put(realColName, expr);
} else {
SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
- // columns default be varchar type
- slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
- slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+ if (formatType == TFileFormatType.FORMAT_ARROW) {
+ slotDesc.setColumn(new Column(realColName, colToType.get(realColName)));
+ } else {
+ // columns default to varchar type
+ slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+ slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
+ }
// ISSUE A: src slot should be nullable even if the column is not nullable.
// because src slot is what we read from file, not represent to real column value.
// If column is not nullable, error will be thrown when filling the dest slot,

View File

@@ -120,6 +120,7 @@ enum TFileFormatType {
FORMAT_CSV_LZ4BLOCK,
FORMAT_CSV_SNAPPYBLOCK,
FORMAT_WAL,
FORMAT_ARROW
}
// In previous versions, the data compression format and file format were stored together, as TFileFormatType,