From 58801c6ab089b4dfdca967b01d6f7bfb8567b2d2 Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Tue, 27 Aug 2019 11:30:00 +0800 Subject: [PATCH] Support converting RowBatch and RowBlockV2 to/from Arrow (#1699) --- be/src/exprs/slot_ref.cpp | 18 + be/src/exprs/slot_ref.h | 3 + be/src/olap/rowset/segment_v2/page_builder.h | 1 + .../util => src/runtime}/descriptor_helper.h | 0 be/src/runtime/descriptors.h | 1 + be/src/runtime/row_batch.h | 2 +- be/src/util/CMakeLists.txt | 3 + be/src/util/arrow/row_batch.cpp | 351 ++++++++++++++++++ be/src/util/arrow/row_batch.h | 74 ++++ be/src/util/arrow/row_block.cpp | 323 ++++++++++++++++ be/src/util/arrow/row_block.h | 68 ++++ be/src/util/arrow/utils.cpp | 49 +++ be/src/util/arrow/utils.h | 49 +++ be/src/util/slice.h | 2 - be/test/exec/tablet_info_test.cpp | 2 +- be/test/exec/tablet_sink_test.cpp | 2 +- be/test/olap/delta_writer_test.cpp | 2 +- be/test/runtime/tablet_writer_mgr_test.cpp | 2 +- be/test/util/CMakeLists.txt | 2 + be/test/util/arrow/arrow_row_batch_test.cpp | 97 +++++ be/test/util/arrow/arrow_row_block_test.cpp | 98 +++++ run-ut.sh | 2 + 22 files changed, 1144 insertions(+), 7 deletions(-) rename be/{test/util => src/runtime}/descriptor_helper.h (100%) create mode 100644 be/src/util/arrow/row_batch.cpp create mode 100644 be/src/util/arrow/row_batch.h create mode 100644 be/src/util/arrow/row_block.cpp create mode 100644 be/src/util/arrow/row_block.h create mode 100644 be/src/util/arrow/utils.cpp create mode 100644 be/src/util/arrow/utils.h create mode 100644 be/test/util/arrow/arrow_row_batch_test.cpp create mode 100644 be/test/util/arrow/arrow_row_block_test.cpp diff --git a/be/src/exprs/slot_ref.cpp b/be/src/exprs/slot_ref.cpp index dbc0386d35..c14a42db65 100644 --- a/be/src/exprs/slot_ref.cpp +++ b/be/src/exprs/slot_ref.cpp @@ -69,6 +69,24 @@ SlotRef::SlotRef(const TypeDescriptor& type, int offset) : _slot_id(-1) { } +Status SlotRef::prepare(const SlotDescriptor* slot_desc, + const RowDescriptor& row_desc) { + if 
(!slot_desc->is_materialized()) { + std::stringstream error; + error << "reference to non-materialized slot. slot_id: " << _slot_id; + return Status::InternalError(error.str()); + } + _tuple_idx = row_desc.get_tuple_idx(slot_desc->parent()); + if (_tuple_idx == RowDescriptor::INVALID_IDX) { + return Status::InternalError("can't support"); + } + _tuple_is_nullable = row_desc.tuple_is_nullable(_tuple_idx); + _slot_offset = slot_desc->tuple_offset(); + _null_indicator_offset = slot_desc->null_indicator_offset(); + _is_nullable = slot_desc->is_nullable(); + return Status::OK(); +} + Status SlotRef::prepare( RuntimeState* state, const RowDescriptor& row_desc, ExprContext* ctx) { DCHECK_EQ(_children.size(), 0); diff --git a/be/src/exprs/slot_ref.h b/be/src/exprs/slot_ref.h index acdecca947..91c708c1d7 100644 --- a/be/src/exprs/slot_ref.h +++ b/be/src/exprs/slot_ref.h @@ -42,6 +42,9 @@ public: // Used for testing. get_value will return tuple + offset interpreted as 'type' SlotRef(const TypeDescriptor& type, int offset); + Status prepare(const SlotDescriptor* slot_desc, + const RowDescriptor& row_desc); + virtual Status prepare( RuntimeState* state, const RowDescriptor& row_desc, ExprContext* ctx); static void* get_value(Expr* expr, TupleRow* row); diff --git a/be/src/olap/rowset/segment_v2/page_builder.h b/be/src/olap/rowset/segment_v2/page_builder.h index df53c35786..4cf2d931e5 100644 --- a/be/src/olap/rowset/segment_v2/page_builder.h +++ b/be/src/olap/rowset/segment_v2/page_builder.h @@ -20,6 +20,7 @@ #include #include +#include "gutil/macros.h" #include "util/slice.h" #include "common/status.h" #include "olap/rowset/segment_v2/common.h" diff --git a/be/test/util/descriptor_helper.h b/be/src/runtime/descriptor_helper.h similarity index 100% rename from be/test/util/descriptor_helper.h rename to be/src/runtime/descriptor_helper.h diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index e88cfb4478..3a7489fe9c 100644 --- a/be/src/runtime/descriptors.h 
+++ b/be/src/runtime/descriptors.h @@ -433,6 +433,7 @@ public: // standard copy c'tor, made explicit here RowDescriptor(const RowDescriptor& desc) : _tuple_desc_map(desc._tuple_desc_map), + _tuple_idx_nullable_map(desc._tuple_idx_nullable_map), _tuple_idx_map(desc._tuple_idx_map), _has_varlen_slots(desc._has_varlen_slots) { _num_null_slots = 0; diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index 93e562bec1..531c6ec830 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -179,7 +179,7 @@ public: // string data). int total_byte_size(); - TupleRow* get_row(int row_idx) { + TupleRow* get_row(int row_idx) const { DCHECK(_tuple_ptrs != NULL); DCHECK_GE(row_idx, 0); //DCHECK_LT(row_idx, _num_rows + (_has_in_flight_row ? 1 : 0)); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 68e3e0b747..4924a46814 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -22,6 +22,9 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/util") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/util") set(UTIL_FILES + arrow/row_batch.cpp + arrow/row_block.cpp + arrow/utils.cpp arena.cpp bfd_parser.cpp bitmap.cpp diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp new file mode 100644 index 0000000000..99c23f5295 --- /dev/null +++ b/be/src/util/arrow/row_batch.cpp @@ -0,0 +1,351 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/arrow/row_batch.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "exprs/slot_ref.h" +#include "gutil/strings/substitute.h" +#include "runtime/row_batch.h" +#include "runtime/descriptors.h" +#include "runtime/descriptor_helper.h" +#include "util/arrow/utils.h" + +namespace doris { + +using strings::Substitute; + +Status convert_to_arrow_type(const TypeDescriptor& type, + std::shared_ptr* result) { + switch (type.type) { + case TYPE_TINYINT: + *result = arrow::int8(); + break; + case TYPE_SMALLINT: + *result = arrow::int16(); + break; + case TYPE_INT: + *result = arrow::int32(); + break; + case TYPE_BIGINT: + *result = arrow::int64(); + break; + case TYPE_FLOAT: + *result = arrow::float32(); + break; + case TYPE_DOUBLE: + *result = arrow::float64(); + break; + default: + return Status::InvalidArgument(Substitute("Unknown primitive type($0)", type.type)); + } + return Status::OK(); +} + +Status convert_to_arrow_field(SlotDescriptor* desc, + std::shared_ptr* field) { + std::shared_ptr type; + RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type)); + *field = arrow::field(desc->col_name(), type, desc->is_nullable()); + return Status::OK(); +} + +Status convert_to_arrow_schema( + const RowDescriptor& row_desc, + std::shared_ptr* result) { + std::vector> fields; + for (auto tuple_desc : row_desc.tuple_descriptors()) { + for (auto desc : tuple_desc->slots()) { + std::shared_ptr field; + RETURN_IF_ERROR(convert_to_arrow_field(desc, &field)); + fields.push_back(field); + } + } + *result = 
arrow::schema(std::move(fields)); + return Status::OK(); +} + +Status convert_to_doris_type(const arrow::DataType& type, + TSlotDescriptorBuilder* builder) { + switch (type.id()) { + case arrow::Type::INT8: + builder->type(TYPE_TINYINT); + break; + case arrow::Type::INT16: + builder->type(TYPE_SMALLINT); + break; + case arrow::Type::INT32: + builder->type(TYPE_INT); + break; + case arrow::Type::INT64: + builder->type(TYPE_BIGINT); + break; + case arrow::Type::FLOAT: + builder->type(TYPE_FLOAT); + break; + case arrow::Type::DOUBLE: + builder->type(TYPE_DOUBLE); + break; + default: + return Status::InvalidArgument(Substitute("Unknown arrow type id($0)", type.id())); + } + return Status::OK(); +} + +Status convert_to_slot_desc(const arrow::Field& field, + int column_pos, + TSlotDescriptorBuilder* builder) { + RETURN_IF_ERROR(convert_to_doris_type(*field.type(), builder)); + builder->column_name(field.name()).nullable(field.nullable()).column_pos(column_pos); + return Status::OK(); +} + +Status convert_to_row_desc( + ObjectPool* pool, + const arrow::Schema& schema, + RowDescriptor** row_desc) { + TDescriptorTableBuilder builder; + TTupleDescriptorBuilder tuple_builder; + for (int i = 0; i < schema.num_fields(); ++i) { + auto field = schema.field(i); + TSlotDescriptorBuilder slot_builder; + RETURN_IF_ERROR(convert_to_slot_desc(*field, i, &slot_builder)); + tuple_builder.add_slot(slot_builder.build()); + } + tuple_builder.build(&builder); + DescriptorTbl* tbl = nullptr; + RETURN_IF_ERROR(DescriptorTbl::create(pool, builder.desc_tbl(), &tbl)); + auto tuple_desc = tbl->get_tuple_descriptor(0); + *row_desc = pool->add(new RowDescriptor(tuple_desc, false)); + return Status::OK(); +} + +// Convert RowBatch to an Arrow::Array +// We should keep this function to keep compatible with arrow's type visitor +// Now we inherit TypeVisitor to use default Visit implemention +class FromRowBatchConverter : public arrow::TypeVisitor { +public: + FromRowBatchConverter(const RowBatch& 
batch, + const std::shared_ptr& schema, + arrow::MemoryPool* pool) + : _batch(batch), + _schema(schema), + _pool(pool) { } + + ~FromRowBatchConverter() override { } + + // Use base class function + using arrow::TypeVisitor::Visit; + +#define PRIMITIVE_VISIT(TYPE) \ + arrow::Status Visit(const arrow::TYPE& type) override { \ + return _visit(type); \ + } + + PRIMITIVE_VISIT(Int8Type); + PRIMITIVE_VISIT(Int16Type); + PRIMITIVE_VISIT(Int32Type); + PRIMITIVE_VISIT(Int64Type); + PRIMITIVE_VISIT(FloatType); + PRIMITIVE_VISIT(DoubleType); + +#undef PRIMITIVE_VISIT + + Status convert(std::shared_ptr* out); + +private: + + template + typename std::enable_if::value, + arrow::Status>::type + _visit(const T& type) { + arrow::NumericBuilder builder(_pool); + + size_t num_rows = _batch.num_rows(); + builder.Reserve(num_rows); + for (size_t i = 0; i < num_rows; ++i) { + auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i)); + if (cell_ptr == nullptr) { + ARROW_RETURN_NOT_OK(builder.AppendNull()); + } else { + ARROW_RETURN_NOT_OK(builder.Append(*(typename T::c_type*)cell_ptr)); + } + } + + return builder.Finish(&_arrays[_cur_field_idx]); + } + +private: + const RowBatch& _batch; + const std::shared_ptr& _schema; + arrow::MemoryPool* _pool; + + size_t _cur_field_idx; + std::unique_ptr _cur_slot_ref; + + std::vector> _arrays; +}; + +Status FromRowBatchConverter::convert(std::shared_ptr* out) { + std::vector slot_descs; + for (auto tuple_desc : _batch.row_desc().tuple_descriptors()) { + for (auto desc : tuple_desc->slots()) { + slot_descs.push_back(desc); + } + } + size_t num_fields = _schema->num_fields(); + if (slot_descs.size() != num_fields) { + return Status::InvalidArgument("number fields not match"); + } + + _arrays.resize(num_fields); + + for (size_t idx = 0; idx < num_fields; ++idx) { + _cur_field_idx = idx; + _cur_slot_ref.reset(new SlotRef(slot_descs[idx])); + RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _batch.row_desc())); + auto arrow_st = 
arrow::VisitTypeInline(*_schema->field(idx)->type(), this); + if (!arrow_st.ok()) { + return to_status(arrow_st); + } + } + *out = arrow::RecordBatch::Make(_schema, _batch.num_rows(), std::move(_arrays)); + return Status::OK(); +} + +Status convert_to_arrow_batch( + const RowBatch& batch, + const std::shared_ptr& schema, + arrow::MemoryPool* pool, + std::shared_ptr* result) { + FromRowBatchConverter converter(batch, schema, pool); + return converter.convert(result); +} + +// Convert Arrow Array to RowBatch +class ToRowBatchConverter : public arrow::ArrayVisitor { +public: + using arrow::ArrayVisitor::Visit; + + ToRowBatchConverter(const arrow::RecordBatch& batch, + const RowDescriptor& row_desc, + MemTracker* tracker) + : _batch(batch), _row_desc(row_desc), _tracker(tracker) { } + +#define PRIMITIVE_VISIT(TYPE) \ + arrow::Status Visit(const arrow::TYPE& array) override { \ + return _visit(array); \ + } + + PRIMITIVE_VISIT(Int8Array); + PRIMITIVE_VISIT(Int16Array); + PRIMITIVE_VISIT(Int32Array); + PRIMITIVE_VISIT(Int64Array); + PRIMITIVE_VISIT(FloatArray); + PRIMITIVE_VISIT(DoubleArray); + +#undef PRIMITIVE_VISIT + + // Convert to a RowBatch + Status convert(std::shared_ptr* result); + +private: + + template + typename std::enable_if::value, + arrow::Status>::type + _visit(const T& array) { + auto raw_values = array.raw_values(); + for (size_t i = 0; i < array.length(); ++i) { + auto row = _output->get_row(i); + auto tuple = _cur_slot_ref->get_tuple(row); + if (array.IsValid(i)) { + tuple->set_not_null(_cur_slot_ref->null_indicator_offset()); + auto slot = _cur_slot_ref->get_slot(row); + *(typename T::TypeClass::c_type*)slot = raw_values[i]; + } else { + tuple->set_null(_cur_slot_ref->null_indicator_offset()); + } + } + return arrow::Status::OK(); + } + +private: + const arrow::RecordBatch& _batch; + const RowDescriptor& _row_desc; + MemTracker* _tracker; + + std::unique_ptr _cur_slot_ref; + std::shared_ptr _output; +}; + +Status ToRowBatchConverter:: 
convert(std::shared_ptr* result) { + std::vector slot_descs; + for (auto tuple_desc : _row_desc.tuple_descriptors()) { + for (auto desc : tuple_desc->slots()) { + slot_descs.push_back(desc); + } + } + size_t num_fields = slot_descs.size(); + if (num_fields != _batch.schema()->num_fields()) { + return Status::InvalidArgument("Schema not match"); + } + // TODO(zc): check if field type match + + size_t num_rows = _batch.num_rows(); + _output.reset(new RowBatch(_row_desc, num_rows, _tracker)); + _output->commit_rows(num_rows); + auto pool = _output->tuple_data_pool(); + for (size_t row_id = 0; row_id < num_rows; ++row_id) { + auto row = _output->get_row(row_id); + for (int tuple_id = 0; tuple_id < _row_desc.tuple_descriptors().size(); ++tuple_id) { + auto tuple_desc = _row_desc.tuple_descriptors()[tuple_id]; + auto tuple = pool->allocate(tuple_desc->byte_size()); + row->set_tuple(tuple_id, (Tuple*)tuple); + } + } + for (size_t idx = 0; idx < num_fields; ++idx) { + _cur_slot_ref.reset(new SlotRef(slot_descs[idx])); + RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _row_desc)); + auto arrow_st = arrow::VisitArrayInline(*_batch.column(idx), this); + if (!arrow_st.ok()) { + return to_status(arrow_st); + } + } + + *result = std::move(_output); + + return Status::OK(); +} + +Status convert_to_row_batch(const arrow::RecordBatch& batch, + const RowDescriptor& row_desc, + MemTracker* tracker, + std::shared_ptr* result) { + ToRowBatchConverter converter(batch, row_desc, tracker); + return converter.convert(result); +} + +} + diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h new file mode 100644 index 0000000000..23e80fb7ba --- /dev/null +++ b/be/src/util/arrow/row_batch.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" + +// This file converts Doris RowBatch to/from Arrow's RecordBatch. +// RowBatch is used by the Doris query engine to exchange data between +// execution nodes. + +namespace arrow { + +class MemoryPool; +class RecordBatch; +class Schema; + +} + +namespace doris { + +class MemTracker; +class ObjectPool; +class RowBatch; +class RowDescriptor; + +// Convert Doris RowDescriptor to Arrow Schema. +Status convert_to_arrow_schema( + const RowDescriptor& row_desc, + std::shared_ptr* result); + +// Convert an Arrow Schema to a Doris RowDescriptor which will be added to +// the input pool. +// Why we should +Status convert_to_row_desc( + ObjectPool* pool, + const arrow::Schema& schema, + RowDescriptor** row_desc); + +// Convert a Doris RowBatch to an Arrow RecordBatch. A valid Arrow Schema +// that matches RowBatch's schema must be given. Memory used by the resulting RecordBatch +// will be allocated from the input pool. +Status convert_to_arrow_batch( + const RowBatch& batch, + const std::shared_ptr& schema, + arrow::MemoryPool* pool, + std::shared_ptr* result); + +// Convert an Arrow RecordBatch to a Doris RowBatch. A valid RowDescriptor +// whose schema is the same as RecordBatch's should be given. Memory used +// by the resulting RowBatch will be tracked by tracker.
+Status convert_to_row_batch( + const arrow::RecordBatch& batch, + const RowDescriptor& row_desc, + MemTracker* tracker, + std::shared_ptr* result); + +} diff --git a/be/src/util/arrow/row_block.cpp b/be/src/util/arrow/row_block.cpp new file mode 100644 index 0000000000..b47e296aa1 --- /dev/null +++ b/be/src/util/arrow/row_block.cpp @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/arrow/row_block.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "gutil/strings/substitute.h" +#include "olap/column_block.h" +#include "olap/field.h" +#include "olap/olap_common.h" +#include "olap/tablet_schema.h" +#include "olap/row_block2.h" +#include "olap/schema.h" +#include "util/arrow/utils.h" + +namespace doris { + +using strings::Substitute; + +Status convert_to_arrow_type(FieldType type, std::shared_ptr *result) { + switch (type) { + case OLAP_FIELD_TYPE_TINYINT: + *result = arrow::int8(); + break; + case OLAP_FIELD_TYPE_SMALLINT: + *result = arrow::int16(); + break; + case OLAP_FIELD_TYPE_INT: + *result = arrow::int32(); + break; + case OLAP_FIELD_TYPE_BIGINT: + *result = arrow::int64(); + break; + case OLAP_FIELD_TYPE_FLOAT: + *result = arrow::float32(); + break; + case OLAP_FIELD_TYPE_DOUBLE: + *result = arrow::float64(); + break; + default: + return Status::InvalidArgument(Substitute("Unknown FieldType($0)", type)); + } + return Status::OK(); +} + +Status convert_to_arrow_field(uint32_t cid, + const Field* field, + std::shared_ptr* result) { + std::shared_ptr type; + RETURN_IF_ERROR(convert_to_arrow_type(field->type(), &type)); + *result = arrow::field(Substitute("Col$0", cid), type, field->is_nullable()); + return Status::OK(); +} + +Status convert_to_arrow_schema(const Schema& schema, + std::shared_ptr* result) { + std::vector> fields; + size_t num_fields = schema.num_column_ids(); + fields.resize(num_fields); + for (int i = 0; i < num_fields; ++i) { + auto cid = schema.column_ids()[i]; + RETURN_IF_ERROR(convert_to_arrow_field(cid, schema.column(cid), &fields[i])); + } + *result = arrow::schema(std::move(fields)); + return Status::OK(); +} + +Status convert_to_type_name(const arrow::DataType& type, + std::string* name) { + switch (type.id()) { + case arrow::Type::INT8: + *name = "TINYINT"; + break; + case arrow::Type::INT16: + *name = "SMALLINT"; + break; + case arrow::Type::INT32: + *name = 
"INT"; + break; + case arrow::Type::INT64: + *name = "BIGINT"; + break; + case arrow::Type::FLOAT: + *name = "FLOAT"; + break; + case arrow::Type::DOUBLE: + *name = "DOUBLE"; + break; + default: + return Status::InvalidArgument(Substitute("Unknown arrow type id($0)", type.id())); + } + return Status::OK(); +} + +Status convert_to_tablet_column(const arrow::Field& field, + int32_t cid, + TabletColumn* output) { + ColumnPB column_pb; + std::string type_name; + RETURN_IF_ERROR(convert_to_type_name(*field.type(), &type_name)); + + column_pb.set_unique_id(cid); + column_pb.set_name(field.name()); + column_pb.set_type(type_name); + column_pb.set_is_key(true); + column_pb.set_is_nullable(field.nullable()); + + output->init_from_pb(column_pb); + return Status::OK(); +} + +Status convert_to_doris_schema(const arrow::Schema& schema, + std::shared_ptr* result) { + auto num_fields = schema.num_fields(); + std::vector columns(num_fields); + std::vector col_ids(num_fields); + for (int i = 0; i < num_fields; ++i) { + RETURN_IF_ERROR(convert_to_tablet_column(*schema.field(i), i, &columns[i])); + col_ids[i] = i; + } + result->reset(new Schema(columns, col_ids)); + return Status::OK(); +} + +// Convert data in RowBlockV2 to an Arrow RecordBatch +// We should keep this function to keep compatible with arrow's type visitor +// Now we inherit TypeVisitor to use default Visit implemention +class FromRowBlockConverter : public arrow::TypeVisitor { +public: + FromRowBlockConverter(const RowBlockV2& block, + const std::shared_ptr& schema, + arrow::MemoryPool* pool) + : _block(block), + _schema(schema), + _pool(pool) { } + + ~FromRowBlockConverter() override { } + + // Use base class function + using arrow::TypeVisitor::Visit; + +#define PRIMITIVE_VISIT(TYPE) \ + arrow::Status Visit(const arrow::TYPE& type) override { \ + return _visit(type); \ + } + + PRIMITIVE_VISIT(Int8Type); + PRIMITIVE_VISIT(Int16Type); + PRIMITIVE_VISIT(Int32Type); + PRIMITIVE_VISIT(Int64Type); + 
PRIMITIVE_VISIT(FloatType); + PRIMITIVE_VISIT(DoubleType); + +#undef PRIMITIVE_VISIT + + Status convert(std::shared_ptr* out); + +private: + + template + typename std::enable_if::value, + arrow::Status>::type + _visit(const T& type) { + arrow::NumericBuilder builder(_pool); + size_t num_rows = _block.num_rows(); + builder.Reserve(num_rows); + + auto column_block = _block.column_block(_cur_field_idx); + for (size_t i = 0; i < num_rows; ++i) { + if (column_block.is_null(i)) { + ARROW_RETURN_NOT_OK(builder.AppendNull()); + } else { + auto cell_ptr = column_block.cell_ptr(i); + ARROW_RETURN_NOT_OK(builder.Append(*(typename T::c_type*)cell_ptr)); + } + } + return builder.Finish(&_arrays[_cur_field_idx]); + } + +private: + const RowBlockV2& _block; + std::shared_ptr _schema; + arrow::MemoryPool* _pool; + + size_t _cur_field_idx; + std::vector> _arrays; +}; + +Status FromRowBlockConverter::convert(std::shared_ptr* out) { + size_t num_fields = _schema->num_fields(); + if (num_fields != _block.schema()->num_column_ids()) { + return Status::InvalidArgument("Schema not match"); + } + + _arrays.resize(num_fields); + for (int idx = 0; idx < num_fields; ++idx) { + _cur_field_idx = idx; + auto arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this); + if (!arrow_st.ok()) { + return to_status(arrow_st); + } + } + *out = arrow::RecordBatch::Make(_schema, _block.num_rows(), std::move(_arrays)); + return Status::OK(); +} + +Status convert_to_arrow_batch(const RowBlockV2& block, + const std::shared_ptr& schema, + arrow::MemoryPool* pool, + std::shared_ptr* result) { + FromRowBlockConverter converter(block, schema, pool); + return converter.convert(result); +} + +// Convert Arrow RecordBatch to Doris RowBlockV2 +class ToRowBlockConverter : public arrow::ArrayVisitor { +public: + ToRowBlockConverter(const arrow::RecordBatch& batch, + const Schema& schema, + Arena* arena) + : _batch(batch), + _schema(schema), + _arena(arena) { } + + ~ToRowBlockConverter() override { } + + 
using arrow::ArrayVisitor::Visit; + +#define PRIMITIVE_VISIT(TYPE) \ + arrow::Status Visit(const arrow::TYPE& array) override { \ + return _visit(array); \ + } + + PRIMITIVE_VISIT(Int8Array); + PRIMITIVE_VISIT(Int16Array); + PRIMITIVE_VISIT(Int32Array); + PRIMITIVE_VISIT(Int64Array); + PRIMITIVE_VISIT(FloatArray); + PRIMITIVE_VISIT(DoubleArray); + +#undef PRIMITIVE_VISIT + + Status convert(std::shared_ptr* result); + +private: + template + typename std::enable_if::value, + arrow::Status>::type + _visit(const T& array) { + auto raw_values = array.raw_values(); + auto column_block = _output->column_block(_cur_field_idx); + for (size_t idx = 0; idx < array.length(); ++idx) { + if (array.IsValid(idx)) { + auto cell_ptr = column_block.mutable_cell_ptr(idx); + column_block.set_is_null(idx, false); + *(typename T::TypeClass::c_type*)cell_ptr = raw_values[idx]; + } else { + column_block.set_is_null(idx, true); + } + } + return arrow::Status::OK(); + } + +private: + const arrow::RecordBatch& _batch; + const Schema& _schema; + Arena* _arena; + + size_t _cur_field_idx; + + std::shared_ptr _output; +}; + +Status ToRowBlockConverter::convert(std::shared_ptr* result) { + size_t num_fields = _schema.num_column_ids(); + if (_batch.schema()->num_fields() != num_fields) { + return Status::InvalidArgument("Schema not match"); + } + + auto num_rows = _batch.num_rows(); + _output.reset(new RowBlockV2(_schema, num_rows, _arena)); + for (int idx = 0; idx < num_fields; ++idx) { + _cur_field_idx = idx; + auto arrow_st = arrow::VisitArrayInline(*_batch.column(idx), this); + if (!arrow_st.ok()) { + return to_status(arrow_st); + } + } + _output->resize(num_rows); + *result = std::move(_output); + return Status::OK(); +} + +Status convert_to_row_block(const arrow::RecordBatch& batch, + const Schema& schema, + Arena* arena, + std::shared_ptr* result) { + ToRowBlockConverter converter(batch, schema, arena); + return converter.convert(result); +} + +} diff --git a/be/src/util/arrow/row_block.h 
b/be/src/util/arrow/row_block.h new file mode 100644 index 0000000000..b797651a82 --- /dev/null +++ b/be/src/util/arrow/row_block.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" + +// Convert Doris RowBlockV2 to/from Arrow RecordBatch. +// RowBlockV2 is used in the Doris storage engine. + +namespace arrow { + +class Schema; +class MemoryPool; +class RecordBatch; + +} + +namespace doris { + +class Arena; +class RowBlockV2; +class Schema; + +// Convert Doris Schema to Arrow Schema. +Status convert_to_arrow_schema( + const Schema& row_desc, + std::shared_ptr* result); + +// Convert Arrow Schema to Doris Schema. +Status convert_to_doris_schema( + const arrow::Schema& schema, + std::shared_ptr* result); + +// Convert a Doris RowBlockV2 to an Arrow RecordBatch. A valid Arrow Schema +// that matches RowBlockV2's schema must be given. Memory used by the resulting RecordBatch +// will be allocated from the input pool. +Status convert_to_arrow_batch(const RowBlockV2& block, + const std::shared_ptr& schema, + arrow::MemoryPool* pool, + std::shared_ptr* result); + + +// Convert an Arrow RecordBatch to a Doris RowBlockV2.
Schema should match +// with RecordBatch's schema. Memory used by result RowBlockV2 will be +// allocated from arena. +Status convert_to_row_block(const arrow::RecordBatch& batch, + const Schema& schema, + Arena* arena, + std::shared_ptr* result); + +} diff --git a/be/src/util/arrow/utils.cpp b/be/src/util/arrow/utils.cpp new file mode 100644 index 0000000000..2217e350b8 --- /dev/null +++ b/be/src/util/arrow/utils.cpp @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/arrow/utils.h" + +#include +#include +#include +#include + +#include "gutil/strings/substitute.h" + +namespace doris { + +using strings::Substitute; + +Status to_status(const arrow::Status& status) { + if (status.ok()) { + return Status::OK(); + } else { + // TODO(zc): convert arrow status to doris status + return Status::InvalidArgument(status.ToString()); + } +} + +Status arrow_pretty_print(const arrow::RecordBatch& rb, std::ostream* os) { + return to_status(arrow::PrettyPrint(rb, 0, os)); +} + +Status arrow_pretty_print(const arrow::Array& arr, std::ostream* os) { + arrow::PrettyPrintOptions opts(4); + return to_status(arrow::PrettyPrint(arr, opts, os)); +} + +} diff --git a/be/src/util/arrow/utils.h b/be/src/util/arrow/utils.h new file mode 100644 index 0000000000..93c7c64665 --- /dev/null +++ b/be/src/util/arrow/utils.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" + +// This file contains some utilities to convert Doris internal +// data format to/from Apache Arrow format. When Doris needs +// to share data with others we can leverage Arrow's ability.
+// We can convert our internal data into Arrow's memory format +// and then convert to other formats, for example Parquet, because +// Arrow has already implemented these functions. What's more, +// Arrow supports multiple languages, so it will make it easy to +// handle Doris data in other languages. + +// Forward declaration of arrow class +namespace arrow { +class Array; +class RecordBatch; +class Status; +} + +namespace doris { + +// Pretty print an Arrow RecordBatch or Array. +Status arrow_pretty_print(const arrow::RecordBatch& rb, std::ostream* os); +Status arrow_pretty_print(const arrow::Array& rb, std::ostream* os); + +Status to_status(const arrow::Status& status); + +} diff --git a/be/src/util/slice.h b/be/src/util/slice.h index c5658117cd..05b5125eb8 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -26,8 +26,6 @@ #include #include -#include "olap/olap_define.h" - namespace doris { class faststring; diff --git a/be/test/exec/tablet_info_test.cpp b/be/test/exec/tablet_info_test.cpp index 3c60e1d506..acd29b40a4 100644 --- a/be/test/exec/tablet_info_test.cpp +++ b/be/test/exec/tablet_info_test.cpp @@ -19,7 +19,7 @@ #include -#include "util/descriptor_helper.h" +#include "runtime/descriptor_helper.h" #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index c1bf8c602b..843ef5c228 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -32,7 +32,7 @@ #include "service/brpc.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" -#include "util/descriptor_helper.h" +#include "runtime/descriptor_helper.h" namespace doris { namespace stream_load { diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index f52a4a0f94..435619e1bc 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -29,7 +29,7 @@ #include "olap/tablet.h"
#include "olap/utils.h" #include "runtime/tuple.h" -#include "util/descriptor_helper.h" +#include "runtime/descriptor_helper.h" #include "util/logging.h" #include "olap/options.h" #include "olap/tablet_meta_manager.h" diff --git a/be/test/runtime/tablet_writer_mgr_test.cpp b/be/test/runtime/tablet_writer_mgr_test.cpp index 1a1114e041..69fce8f919 100644 --- a/be/test/runtime/tablet_writer_mgr_test.cpp +++ b/be/test/runtime/tablet_writer_mgr_test.cpp @@ -29,7 +29,7 @@ #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" -#include "util/descriptor_helper.h" +#include "runtime/descriptor_helper.h" #include "util/thrift_util.h" #include "olap/delta_writer.h" diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 4d37abbc1c..4acc439100 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -44,3 +44,5 @@ ADD_BE_TEST(faststring_test) ADD_BE_TEST(rle_encoding_test) ADD_BE_TEST(tdigest_test) ADD_BE_TEST(block_compression_test) +ADD_BE_TEST(arrow/arrow_row_block_test) +ADD_BE_TEST(arrow/arrow_row_batch_test) diff --git a/be/test/util/arrow/arrow_row_batch_test.cpp b/be/test/util/arrow/arrow_row_batch_test.cpp new file mode 100644 index 0000000000..293808149c --- /dev/null +++ b/be/test/util/arrow/arrow_row_batch_test.cpp @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/arrow/row_batch.h" + +#include + +#include +#include + +#include "common/logging.h" + +#define ARROW_UTIL_LOGGING_H +#include +#include +#include +#include + +#include "common/object_pool.h" +#include "runtime/mem_tracker.h" +#include "runtime/row_batch.h" +#include "util/debug_util.h" + +namespace doris { + +class ArrowRowBatchTest : public testing::Test { +public: + ArrowRowBatchTest() { } + virtual ~ArrowRowBatchTest() { + } +}; + +std::string test_str() { + return R"( + { "c1": 1, "c2": 1.1 } + { "c1": 2, "c2": 2.2 } + { "c1": 3, "c2": 3.3 } + )"; +} + +TEST_F(ArrowRowBatchTest, PrettyPrint) { + auto json = test_str(); + std::shared_ptr buffer; + arrow::json::MakeBuffer(test_str(), &buffer); + arrow::json::ParseOptions parse_opts = arrow::json::ParseOptions::Defaults(); + parse_opts.explicit_schema = arrow::schema( + { + arrow::field("c1", arrow::int64()), + }); + + std::shared_ptr record_batch; + auto arrow_st = arrow::json::ParseOne(parse_opts, buffer, &record_batch); + ASSERT_TRUE(arrow_st.ok()); + + ObjectPool obj_pool; + RowDescriptor* row_desc; + auto doris_st = convert_to_row_desc(&obj_pool, *record_batch->schema(), &row_desc); + ASSERT_TRUE(doris_st.ok()); + MemTracker tracker; + std::shared_ptr row_batch; + doris_st = convert_to_row_batch(*record_batch, *row_desc, &tracker, &row_batch); + ASSERT_TRUE(doris_st.ok()); + + { + std::shared_ptr check_schema; + doris_st = convert_to_arrow_schema(*row_desc, &check_schema); + ASSERT_TRUE(doris_st.ok()); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + 
std::shared_ptr check_batch; + doris_st = convert_to_arrow_batch(*row_batch, check_schema, pool, &check_batch); + ASSERT_TRUE(doris_st.ok()); + ASSERT_EQ(3, check_batch->num_rows()); + ASSERT_TRUE(record_batch->Equals(*check_batch)); + } +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/arrow/arrow_row_block_test.cpp b/be/test/util/arrow/arrow_row_block_test.cpp new file mode 100644 index 0000000000..41702d04ec --- /dev/null +++ b/be/test/util/arrow/arrow_row_block_test.cpp @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/arrow/row_block.h" + +#include +#include + +#include "common/logging.h" + +#define ARROW_UTIL_LOGGING_H +#include +#include +#include +#include +#include +#include + +#include "olap/tablet_schema_helper.h" +#include "util/arena.h" +#include "olap/schema.h" +#include "olap/row_block2.h" + +namespace doris { + +class ArrowRowBlockTest : public testing::Test { +public: + ArrowRowBlockTest() { } + virtual ~ArrowRowBlockTest() { + } +}; + +std::string test_str() { + return R"( + { "c1": 1, "c2": 1.1 } + { "c1": 2, "c2": 2.2 } + { "c1": 3, "c2": 3.3 } + )"; +} + +TEST_F(ArrowRowBlockTest, Normal) { + auto json = test_str(); + std::shared_ptr buffer; + arrow::json::MakeBuffer(test_str(), &buffer); + arrow::json::ParseOptions parse_opts = arrow::json::ParseOptions::Defaults(); + parse_opts.explicit_schema = arrow::schema( + { + arrow::field("c1", arrow::int64()), + }); + + std::shared_ptr record_batch; + auto arrow_st = arrow::json::ParseOne(parse_opts, buffer, &record_batch); + ASSERT_TRUE(arrow_st.ok()); + + std::shared_ptr schema; + auto doris_st = convert_to_doris_schema(*record_batch->schema(), &schema); + ASSERT_TRUE(doris_st.ok()); + + Arena arena; + std::shared_ptr row_block; + doris_st = convert_to_row_block(*record_batch, *schema, &arena, &row_block); + ASSERT_TRUE(doris_st.ok()); + + { + std::shared_ptr check_schema; + doris_st = convert_to_arrow_schema(*schema, &check_schema); + ASSERT_TRUE(doris_st.ok()); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + std::shared_ptr check_batch; + doris_st = convert_to_arrow_batch(*row_block, check_schema, pool, &check_batch); + ASSERT_TRUE(doris_st.ok()); + ASSERT_EQ(3, check_batch->num_rows()); + ASSERT_TRUE(record_batch->Equals(*check_batch)); + } +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/run-ut.sh b/run-ut.sh index dd982e60a5..b34280042a 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -155,6 +155,8 @@ 
${DORIS_TEST_BINARY_DIR}/util/coding_test ${DORIS_TEST_BINARY_DIR}/util/faststring_test ${DORIS_TEST_BINARY_DIR}/util/tdigest_test ${DORIS_TEST_BINARY_DIR}/util/block_compression_test +${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_block_test +${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_batch_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test