From 564446e52fa061b828cdb2d75f6d714302cd25dc Mon Sep 17 00:00:00 2001 From: amory Date: Tue, 18 Apr 2023 14:13:56 +0800 Subject: [PATCH] [Refact](type system) refact serde for type system and pb serde impl (#18627) --- be/src/vec/CMakeLists.txt | 13 + .../aggregate_function_rpc.h | 19 +- be/src/vec/common/schema_util.cpp | 5 - be/src/vec/data_types/data_type.h | 4 + be/src/vec/data_types/data_type_array.h | 5 + be/src/vec/data_types/data_type_bitmap.h | 5 + be/src/vec/data_types/data_type_decimal.h | 4 + .../data_type_fixed_length_object.h | 4 + be/src/vec/data_types/data_type_hll.h | 3 + be/src/vec/data_types/data_type_jsonb.h | 3 + be/src/vec/data_types/data_type_map.h | 4 + be/src/vec/data_types/data_type_nothing.h | 3 + be/src/vec/data_types/data_type_nullable.h | 5 + be/src/vec/data_types/data_type_number_base.h | 4 + be/src/vec/data_types/data_type_object.h | 6 + .../vec/data_types/data_type_quantilestate.h | 6 + be/src/vec/data_types/data_type_string.h | 4 + be/src/vec/data_types/data_type_struct.h | 8 + be/src/vec/data_types/data_type_time.h | 3 + be/src/vec/data_types/data_type_time_v2.h | 3 + .../serde/data_type_array_serde.cpp | 24 + .../data_types/serde/data_type_array_serde.h | 39 ++ .../serde/data_type_bitmap_serde.cpp | 52 ++ .../data_types/serde/data_type_bitmap_serde.h | 30 ++ .../serde/data_type_decimal_serde.cpp | 22 + .../serde/data_type_decimal_serde.h | 75 +++ .../data_type_fixedlengthobject_serde.cpp | 22 + .../serde/data_type_fixedlengthobject_serde.h | 34 ++ .../data_types/serde/data_type_hll_serde.cpp | 52 ++ .../data_types/serde/data_type_hll_serde.h | 30 ++ .../data_types/serde/data_type_map_serde.cpp | 22 + .../data_types/serde/data_type_map_serde.h | 41 ++ .../serde/data_type_nullable_serde.cpp | 63 +++ .../serde/data_type_nullable_serde.h | 35 ++ .../serde/data_type_number_serde.cpp | 22 + .../data_types/serde/data_type_number_serde.h | 161 ++++++ .../serde/data_type_object_serde.cpp | 22 + 
.../data_types/serde/data_type_object_serde.h | 34 ++ .../serde/data_type_quantilestate_serde.cpp | 22 + .../serde/data_type_quantilestate_serde.h | 52 ++ .../vec/data_types/serde/data_type_serde.cpp | 24 + be/src/vec/data_types/serde/data_type_serde.h | 68 +++ .../serde/data_type_string_serde.cpp | 43 ++ .../data_types/serde/data_type_string_serde.h | 30 ++ .../serde/data_type_struct_serde.cpp | 22 + .../data_types/serde/data_type_struct_serde.h | 40 ++ be/src/vec/functions/function_rpc.cpp | 479 +----------------- be/src/vec/functions/function_rpc.h | 11 - be/test/CMakeLists.txt | 1 + .../data_types/serde/data_type_serde_test.cpp | 183 +++++++ 50 files changed, 1361 insertions(+), 505 deletions(-) create mode 100644 be/src/vec/data_types/serde/data_type_array_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_array_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_bitmap_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_bitmap_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_decimal_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_decimal_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_hll_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_hll_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_map_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_map_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_nullable_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_nullable_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_number_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_number_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_object_serde.cpp create mode 100644 
be/src/vec/data_types/serde/data_type_object_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_quantilestate_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_string_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_string_serde.h create mode 100644 be/src/vec/data_types/serde/data_type_struct_serde.cpp create mode 100644 be/src/vec/data_types/serde/data_type_struct_serde.h create mode 100644 be/test/vec/data_types/serde/data_type_serde_test.cpp diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 65ad90c762..ddc74e8ab5 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -82,6 +82,19 @@ set(VEC_FILES core/field.cpp core/sort_block.cpp core/materialize_block.cpp + data_types/serde/data_type_serde.cpp + data_types/serde/data_type_map_serde.cpp + data_types/serde/data_type_array_serde.cpp + data_types/serde/data_type_struct_serde.cpp + data_types/serde/data_type_number_serde.cpp + data_types/serde/data_type_string_serde.cpp + data_types/serde/data_type_decimal_serde.cpp + data_types/serde/data_type_object_serde.cpp + data_types/serde/data_type_fixedlengthobject_serde.cpp + data_types/serde/data_type_hll_serde.cpp + data_types/serde/data_type_bitmap_serde.cpp + data_types/serde/data_type_quantilestate_serde.cpp + data_types/serde/data_type_nullable_serde.cpp data_types/data_type.cpp data_types/data_type_array.cpp data_types/data_type_struct.cpp diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h b/be/src/vec/aggregate_functions/aggregate_function_rpc.h index 937c936d86..eabfdf2fbc 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h +++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h @@ -151,21 +151,10 @@ public: int end, const 
DataTypes& argument_types) { for (int i = 0; i < argument_types.size(); i++) { PValues* arg = request.add_args(); - if (auto* nullable = vectorized::check_and_get_column( - *columns[i])) { - auto data_col = nullable->get_nested_column_ptr(); - auto& null_col = nullable->get_null_map_column(); - auto data_type = std::reinterpret_pointer_cast( - argument_types[i]); - data_col->get_data_at(0); - RPCFnImpl::convert_nullable_col_to_pvalue( - data_col->convert_to_full_column_if_const(), data_type->get_nested_type(), - null_col, arg, start, end); - - } else { - RPCFnImpl::convert_col_to_pvalue( - columns[i]->convert_to_full_column_if_const(), argument_types[i], arg, - start, end); + auto data_type = argument_types[i]; + if (auto st = data_type->get_serde()->write_column_to_pb(*columns[i], *arg, start, end); + st != Status::OK()) { + return st; } } return Status::OK(); diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index 48e13019e2..4afca9097a 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -20,23 +20,18 @@ #include #include #include -#include #include -#include #include #include -#include "common/compiler_util.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService_types.h" #include "olap/rowset/rowset_writer_context.h" #include "runtime/client_cache.h" -#include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "util/thrift_rpc_helper.h" #include "vec/columns/column.h" -#include "vec/columns/columns_number.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nullable.h" diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index 724b8556d7..d05560bd5c 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -28,6 +28,7 @@ #include "vec/common/cow.h" #include "vec/common/string_buffer.hpp" #include "vec/core/types.h" +#include 
"vec/data_types/serde/data_type_serde.h" #include "vec/io/reader_buffer.h" namespace doris { @@ -75,6 +76,9 @@ public: virtual std::string to_string(const IColumn& column, size_t row_num) const; virtual Status from_string(ReadBuffer& rb, IColumn* column) const; + // get specific serializer or deserializer + virtual DataTypeSerDeSPtr get_serde() const = 0; + protected: virtual String do_get_name() const; diff --git a/be/src/vec/data_types/data_type_array.h b/be/src/vec/data_types/data_type_array.h index a01e96b460..47a23320c3 100644 --- a/be/src/vec/data_types/data_type_array.h +++ b/be/src/vec/data_types/data_type_array.h @@ -22,6 +22,7 @@ #include +#include "serde/data_type_array_serde.h" #include "vec/data_types/data_type.h" namespace doris::vectorized { @@ -89,6 +90,10 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; + + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(nested->get_serde()); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_bitmap.h b/be/src/vec/data_types/data_type_bitmap.h index bba593eb30..8946c2fec6 100644 --- a/be/src/vec/data_types/data_type_bitmap.h +++ b/be/src/vec/data_types/data_type_bitmap.h @@ -18,6 +18,7 @@ #pragma once #include +#include "serde/data_type_bitmap_serde.h" #include "util/bitmap_value.h" #include "vec/columns/column.h" #include "vec/columns/column_complex.h" @@ -87,6 +88,10 @@ public: static void serialize_as_stream(const BitmapValue& value, BufferWritable& buf); static void deserialize_as_stream(BitmapValue& value, BufferReadable& buf); + + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_decimal.h b/be/src/vec/data_types/data_type_decimal.h index 
ec207055a9..d4f9690c69 100644 --- a/be/src/vec/data_types/data_type_decimal.h +++ b/be/src/vec/data_types/data_type_decimal.h @@ -24,6 +24,7 @@ #include "common/config.h" #include "runtime/define_primitive_type.h" +#include "serde/data_type_decimal_serde.h" #include "vec/columns/column_decimal.h" #include "vec/common/arithmetic_overflow.h" #include "vec/common/typeid_cast.h" @@ -198,6 +199,9 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared>(); + }; /// Decimal specific diff --git a/be/src/vec/data_types/data_type_fixed_length_object.h b/be/src/vec/data_types/data_type_fixed_length_object.h index dec11b5959..65fc01f137 100644 --- a/be/src/vec/data_types/data_type_fixed_length_object.h +++ b/be/src/vec/data_types/data_type_fixed_length_object.h @@ -17,6 +17,7 @@ #pragma once +#include "serde/data_type_fixedlengthobject_serde.h" #include "vec/columns/column_fixed_length_object.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" @@ -61,6 +62,9 @@ public: bool is_categorial() const override { return is_value_represented_by_integer(); } bool can_be_inside_low_cardinality() const override { return false; } + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_hll.h b/be/src/vec/data_types/data_type_hll.h index 8b7e7bf1be..8d7d77a79d 100644 --- a/be/src/vec/data_types/data_type_hll.h +++ b/be/src/vec/data_types/data_type_hll.h @@ -17,6 +17,7 @@ #pragma once #include "olap/hll.h" +#include "serde/data_type_hll_serde.h" #include "vec/columns/column.h" #include "vec/columns/column_complex.h" #include "vec/core/types.h" @@ -82,6 +83,8 @@ public: static void serialize_as_stream(const 
HyperLogLog& value, BufferWritable& buf); static void deserialize_as_stream(HyperLogLog& value, BufferReadable& buf); + + DataTypeSerDeSPtr get_serde() const override { return std::make_shared(); }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_jsonb.h b/be/src/vec/data_types/data_type_jsonb.h index df57a5e562..c6460a2f36 100644 --- a/be/src/vec/data_types/data_type_jsonb.h +++ b/be/src/vec/data_types/data_type_jsonb.h @@ -66,6 +66,9 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(); + }; private: DataTypeString data_type_string; diff --git a/be/src/vec/data_types/data_type_map.h b/be/src/vec/data_types/data_type_map.h index 1e902961b5..db8dadff8f 100644 --- a/be/src/vec/data_types/data_type_map.h +++ b/be/src/vec/data_types/data_type_map.h @@ -21,6 +21,7 @@ #pragma once #include "gen_cpp/data.pb.h" +#include "serde/data_type_map_serde.h" #include "util/stack_util.h" #include "vec/columns/column_array.h" #include "vec/columns/column_map.h" @@ -79,6 +80,9 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(key_type->get_serde(), value_type->get_serde()); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_nothing.h b/be/src/vec/data_types/data_type_nothing.h index 2b150991f1..de27f62810 100644 --- a/be/src/vec/data_types/data_type_nothing.h +++ b/be/src/vec/data_types/data_type_nothing.h @@ -68,6 +68,9 @@ public: bool have_subtypes() const override { return 
false; } bool cannot_be_stored_in_tables() const override { return true; } + DataTypeSerDeSPtr get_serde() const override { + LOG(FATAL) << get_name() << " not support serde"; + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_nullable.h b/be/src/vec/data_types/data_type_nullable.h index c1c920e068..57d24c10b0 100644 --- a/be/src/vec/data_types/data_type_nullable.h +++ b/be/src/vec/data_types/data_type_nullable.h @@ -21,6 +21,7 @@ #pragma once #include "vec/data_types/data_type.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris::vectorized { @@ -98,6 +99,10 @@ public: const DataTypePtr& get_nested_type() const { return nested_data_type; } bool is_null_literal() const override { return nested_data_type->is_null_literal(); } + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(nested_data_type->get_serde()); + } + private: DataTypePtr nested_data_type; }; diff --git a/be/src/vec/data_types/data_type_number_base.h b/be/src/vec/data_types/data_type_number_base.h index da12e9a82c..4e9b3b6a37 100644 --- a/be/src/vec/data_types/data_type_number_base.h +++ b/be/src/vec/data_types/data_type_number_base.h @@ -22,6 +22,7 @@ #include +#include "serde/data_type_number_serde.h" #include "vec/columns/column_vector.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" @@ -119,6 +120,9 @@ public: Status from_string(ReadBuffer& rb, IColumn* column) const override; bool is_null_literal() const override { return _is_null_literal; } void set_null_literal(bool flag) { _is_null_literal = flag; } + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared>(); + }; private: bool _is_null_literal = false; diff --git a/be/src/vec/data_types/data_type_object.h b/be/src/vec/data_types/data_type_object.h index 99e1e5f3f9..202a1fd79f 100644 --- a/be/src/vec/data_types/data_type_object.h +++ b/be/src/vec/data_types/data_type_object.h @@ -22,6 +22,9 @@ #include #include #include + 
+#include "serde/data_type_object_serde.h" + namespace doris::vectorized { class DataTypeObject : public IDataType { private: @@ -54,5 +57,8 @@ public: [[noreturn]] Field get_default() const override { LOG(FATAL) << "Method getDefault() is not implemented for data type " << get_name(); } + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_quantilestate.h b/be/src/vec/data_types/data_type_quantilestate.h index 531f8c697f..9659e65822 100644 --- a/be/src/vec/data_types/data_type_quantilestate.h +++ b/be/src/vec/data_types/data_type_quantilestate.h @@ -16,11 +16,14 @@ // under the License. #pragma once + +#include "serde/data_type_quantilestate_serde.h" #include "util/quantile_state.h" #include "vec/columns/column.h" #include "vec/columns/column_complex.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" + namespace doris::vectorized { template class DataTypeQuantileState : public IDataType { @@ -82,6 +85,9 @@ public: static void serialize_as_stream(const QuantileState& value, BufferWritable& buf); static void deserialize_as_stream(QuantileState& value, BufferReadable& buf); + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared>(); + }; }; using DataTypeQuantileStateDouble = DataTypeQuantileState; diff --git a/be/src/vec/data_types/data_type_string.h b/be/src/vec/data_types/data_type_string.h index efd760187b..ea400b1438 100644 --- a/be/src/vec/data_types/data_type_string.h +++ b/be/src/vec/data_types/data_type_string.h @@ -22,6 +22,7 @@ #include +#include "serde/data_type_string_serde.h" #include "vec/data_types/data_type.h" namespace doris::vectorized { @@ -65,6 +66,9 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; + 
DataTypeSerDeSPtr get_serde() const override { + return std::make_shared(); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_struct.h b/be/src/vec/data_types/data_type_struct.h index 6de9f7d07e..65f2131ac7 100644 --- a/be/src/vec/data_types/data_type_struct.h +++ b/be/src/vec/data_types/data_type_struct.h @@ -24,6 +24,7 @@ #include #include "gen_cpp/data.pb.h" +#include "serde/data_type_struct_serde.h" #include "util/stack_util.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" @@ -99,6 +100,13 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; bool get_have_explicit_names() const { return have_explicit_names; } + DataTypeSerDeSPtr get_serde() const override { + DataTypeSerDeSPtrs ptrs; + for (auto iter = elems.begin(); iter < elems.end(); ++iter) { + ptrs.push_back((*iter)->get_serde()); + } + return std::make_shared(ptrs); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_time.h b/be/src/vec/data_types/data_type_time.h index 9c398842e4..64c06c073d 100644 --- a/be/src/vec/data_types/data_type_time.h +++ b/be/src/vec/data_types/data_type_time.h @@ -52,6 +52,9 @@ public: using PromotedType = DataTypeNumber>; return std::make_shared(); } + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared>(); + }; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_time_v2.h b/be/src/vec/data_types/data_type_time_v2.h index 97928a9b23..a60547b562 100644 --- a/be/src/vec/data_types/data_type_time_v2.h +++ b/be/src/vec/data_types/data_type_time_v2.h @@ -85,6 +85,9 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const 
override; + DataTypeSerDeSPtr get_serde() const override { + return std::make_shared>(); + }; MutableColumnPtr create_column() const override; diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp new file mode 100644 index 0000000000..46029ac5d2 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_array_serde.h" + +#include "vec/columns/column_array.cpp" +namespace doris { + +namespace vectorized {} +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_array_serde.h new file mode 100644 index 0000000000..d7b4e2fc62 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_array_serde.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeArraySerDe : public DataTypeSerDe { +public: + DataTypeArraySerDe(const DataTypeSerDeSPtr& _nested_serde) : nested_serde(_nested_serde) {} + + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override { + LOG(FATAL) << "Not support write array column to pb"; + } + Status read_column_from_pb(IColumn& column, const PValues& arg) const override { + LOG(FATAL) << "Not support read from pb to array"; + } + +private: + DataTypeSerDeSPtr nested_serde; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp new file mode 100644 index 0000000000..da0f2ed3ea --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_bitmap_serde.h" + +#include "vec/columns/column_complex.h" +#include "vec/data_types/data_type.h" + +namespace doris { + +namespace vectorized { +Status DataTypeBitMapSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + auto ptype = result.mutable_type(); + ptype->set_id(PGenericType::BITMAP); + auto& data_column = assert_cast(column); + int row_count = end - start; + result.mutable_bytes_value()->Reserve(row_count); + for (int row = start; row < end; ++row) { + auto& value = const_cast(data_column.get_element(row)); + std::string memory_buffer; + int bytesize = value.getSizeInBytes(); + memory_buffer.resize(bytesize); + value.write_to(const_cast(memory_buffer.data())); + result.add_bytes_value(memory_buffer); + } + return Status::OK(); +} +Status DataTypeBitMapSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& col = reinterpret_cast(column); + for (int i = 0; i < arg.bytes_value_size(); ++i) { + BitmapValue value(arg.bytes_value(i).data()); + col.insert_value(value); + } + return Status::OK(); +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h new file mode 100644 index 0000000000..d37575a16c --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeBitMapSerDe : public DataTypeSerDe { +public: + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp new file mode 100644 index 0000000000..6e6fa5f4d3 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_decimal_serde.h" +namespace doris { + +namespace vectorized {} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h b/be/src/vec/data_types/serde/data_type_decimal_serde.h new file mode 100644 index 0000000000..4cd5ee484f --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_serde.h" +#include "vec/columns/column_decimal.h" + +namespace doris { + +namespace vectorized { + +template +class DataTypeDecimalSerDe : public DataTypeSerDe { + static_assert(IsDecimalNumber); + +public: + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; +}; + +template +Status DataTypeDecimalSerDe::write_column_to_pb(const IColumn& column, PValues& result, + int start, int end) const { + int row_count = end - start; + const auto* col = check_and_get_column>(column); + auto ptype = result.mutable_type(); + if constexpr (std::is_same_v>) { + ptype->set_id(PGenericType::DECIMAL128); + } else if constexpr (std::is_same_v>) { + ptype->set_id(PGenericType::DECIMAL128I); + } else if constexpr (std::is_same_v>) { + ptype->set_id(PGenericType::INT32); + } else if constexpr (std::is_same_v>) { + ptype->set_id(PGenericType::INT64); + } else { + return Status::NotSupported("unknown ColumnType for writing to pb"); + } + result.mutable_bytes_value()->Reserve(row_count); + for (size_t row_num = start; row_num < end; ++row_num) { + StringRef single_data = col->get_data_at(row_num); + result.add_bytes_value(single_data.data, single_data.size); + } + return Status::OK(); +} + +template +Status DataTypeDecimalSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + if constexpr (std::is_same_v> || std::is_same_v> || + std::is_same_v> || std::is_same_v>) { + column.resize(arg.bytes_value_size()); + auto& data = reinterpret_cast&>(column).get_data(); + for (int i = 0; i < arg.bytes_value_size(); ++i) { + data[i] = *(int128_t*)(arg.bytes_value(i).c_str()); + } + return Status::OK(); + } + + return Status::NotSupported("unknown ColumnType for reading from pb"); +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp 
b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp new file mode 100644 index 0000000000..84a89ba3f2 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "data_type_fixedlengthobject_serde.h" + +namespace doris { + +namespace vectorized {} +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h new file mode 100644 index 0000000000..61244184ca --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeFixedLengthObjectSerDe : public DataTypeSerDe { +public: + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override { + LOG(FATAL) << "Not support write FixedLengthObject column to pb"; + } + Status read_column_from_pb(IColumn& column, const PValues& arg) const override { + LOG(FATAL) << "Not support read from pb to FixedLengthObject"; + }; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp new file mode 100644 index 0000000000..880818b830 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_hll_serde.h" + +#include "vec/columns/column_complex.h" + +namespace doris { + +namespace vectorized { + +Status DataTypeHLLSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + auto ptype = result.mutable_type(); + ptype->set_id(PGenericType::HLL); + auto& data_column = assert_cast(column); + int row_count = end - start; + result.mutable_bytes_value()->Reserve(row_count); + for (size_t row_num = start; row_num < end; ++row_num) { + auto& value = const_cast(data_column.get_element(row_num)); + std::string memory_buffer(value.max_serialized_size(), '0'); + value.serialize((uint8_t*)memory_buffer.data()); + result.add_bytes_value(memory_buffer.data(), memory_buffer.size()); + } + return Status::OK(); +} +Status DataTypeHLLSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& col = reinterpret_cast(column); + for (int i = 0; i < arg.bytes_value_size(); ++i) { + HyperLogLog value; + value.deserialize(Slice(arg.bytes_value(i))); + col.insert_value(value); + } + return Status::OK(); +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h b/be/src/vec/data_types/serde/data_type_hll_serde.h new file mode 100644 index 0000000000..6a84e394a2 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_hll_serde.h @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeHLLSerDe : public DataTypeSerDe { +public: + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp b/be/src/vec/data_types/serde/data_type_map_serde.cpp new file mode 100644 index 0000000000..4a26c9d0d1 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_map_serde.h" + +namespace doris { +namespace vectorized {} +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h b/be/src/vec/data_types/serde/data_type_map_serde.h new file mode 100644 index 0000000000..2b105fec61 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_map_serde.h @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeMapSerDe : public DataTypeSerDe { +public: + DataTypeMapSerDe(const DataTypeSerDeSPtr& _key_serde, const DataTypeSerDeSPtr& _value_serde) + : key_serde(_key_serde), value_serde(_value_serde) {} + + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override { + LOG(FATAL) << "Not support write map column to pb"; + } + Status read_column_from_pb(IColumn& column, const PValues& arg) const override { + LOG(FATAL) << "Not support read from pb to map"; + } + +private: + DataTypeSerDeSPtr key_serde; + DataTypeSerDeSPtr value_serde; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp new file mode 100644 index 0000000000..e43cf818f2 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_nullable_serde.h" + +#include "vec/columns/column_nullable.h" + +namespace doris { + +namespace vectorized { + +Status DataTypeNullableSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + int row_count = end - start; + auto& nullable_col = assert_cast(column); + auto& null_col = nullable_col.get_null_map_column(); + if (nullable_col.has_null(row_count)) { + result.set_has_null(true); + auto* null_map = result.mutable_null_map(); + null_map->Reserve(row_count); + const auto* col = check_and_get_column(null_col); + auto& data = col->get_data(); + null_map->Add(data.begin() + start, data.begin() + end); + } + return nested_serde->write_column_to_pb(nullable_col.get_nested_column(), result, start, end); +} + +// read from PValues to column +Status DataTypeNullableSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& col = reinterpret_cast(column); + auto& null_map_data = col.get_null_map_data(); + auto& nested = col.get_nested_column(); + if (Status st = nested_serde->read_column_from_pb(nested, arg); st != Status::OK()) { + return st; + } + null_map_data.resize(nested.size()); + if (arg.has_null()) { + for (int i = 0; i < arg.null_map_size(); ++i) { + null_map_data[i] = arg.null_map(i); + } + } else { + for (int i = 0; i < nested.size(); ++i) { + null_map_data[i] = false; + } + } + return Status::OK(); +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h b/be/src/vec/data_types/serde/data_type_nullable_serde.h new file mode 100644 index 0000000000..0c9666fbf8 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeNullableSerDe : public DataTypeSerDe { +public: + DataTypeNullableSerDe(const DataTypeSerDeSPtr& _nested_serde) : nested_serde(_nested_serde) {} + + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; + +private: + DataTypeSerDeSPtr nested_serde; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp new file mode 100644 index 0000000000..f6667e8930 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_number_serde.h" + +namespace doris { +namespace vectorized {} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h new file mode 100644 index 0000000000..c5d63fa1c5 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_number_serde.h @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_serde.h" +#include "vec/columns/column_vector.h" + +namespace doris { + +namespace vectorized { + +template +class DataTypeNumberSerDe : public DataTypeSerDe { + static_assert(IsNumber); + +public: + using ColumnType = ColumnVector; + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; +}; + +template +Status DataTypeNumberSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { + column.resize(arg.uint32_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.uint32_value_size(); ++i) { + data[i] = arg.uint32_value(i); + } + } else if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { + column.resize(arg.int32_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.int32_value_size(); ++i) { + data[i] = arg.int32_value(i); + } + } else if constexpr (std::is_same_v) { + column.resize(arg.uint64_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.uint64_value_size(); ++i) { + data[i] = arg.uint64_value(i); + } + } else if constexpr (std::is_same_v) { + column.resize(arg.int64_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.int64_value_size(); ++i) { + data[i] = arg.int64_value(i); + } + } else if constexpr (std::is_same_v) { + column.resize(arg.float_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.float_value_size(); ++i) { + data[i] = arg.float_value(i); + } + } else if constexpr (std::is_same_v) { + column.resize(arg.double_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.float_value_size(); ++i) { + data[i] = arg.double_value(i); + } + } else if constexpr 
(std::is_same_v) { + column.resize(arg.bytes_value_size()); + auto& data = reinterpret_cast(column).get_data(); + for (int i = 0; i < arg.bytes_value_size(); ++i) { + data[i] = *(int128_t*)(arg.bytes_value(i).c_str()); + } + } else { + return Status::NotSupported("unknown ColumnType for reading from pb"); + } + return Status::OK(); +} + +template +Status DataTypeNumberSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + int row_count = end - start; + auto ptype = result.mutable_type(); + const auto* col = check_and_get_column>(column); + if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::INT128); + result.mutable_bytes_value()->Reserve(row_count); + for (size_t row_num = start; row_num < end; ++row_num) { + StringRef single_data = col->get_data_at(row_num); + result.add_bytes_value(single_data.data, single_data.size); + } + return Status::OK(); + } + auto& data = col->get_data(); + if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::UINT8); + auto* values = result.mutable_uint32_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::UINT16); + auto* values = result.mutable_uint32_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::UINT32); + auto* values = result.mutable_uint32_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::UINT64); + auto* values = result.mutable_uint64_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::INT8); + auto* values = result.mutable_int32_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, 
data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::INT16); + auto* values = result.mutable_int32_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::INT32); + auto* values = result.mutable_int32_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::INT64); + auto* values = result.mutable_int64_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::FLOAT); + auto* values = result.mutable_float_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else if constexpr (std::is_same_v) { + ptype->set_id(PGenericType::DOUBLE); + auto* values = result.mutable_double_value(); + values->Reserve(row_count); + values->Add(data.begin() + start, data.begin() + end); + } else { + return Status::NotSupported("unknown ColumnType for writing to pb"); + } + return Status::OK(); +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp new file mode 100644 index 0000000000..273abae7a3 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_object_serde.h" +namespace doris { + +namespace vectorized {} +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h new file mode 100644 index 0000000000..bf2f77712a --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +class DataTypeObjectSerDe : public DataTypeSerDe { +public: + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override { + LOG(FATAL) << "Not support write object column to pb"; + } + Status read_column_from_pb(IColumn& column, const PValues& arg) const override { + LOG(FATAL) << "Not support read from pb to object"; + } +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp b/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp new file mode 100644 index 0000000000..6fee387bd0 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_quantilestate_serde.h" +namespace doris { + +namespace vectorized {} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h new file mode 100644 index 0000000000..8aa70a142a --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_serde.h" +namespace doris { + +namespace vectorized { + +template +class DataTypeQuantileStateSerDe : public DataTypeSerDe { +public: + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; +}; + +template +Status DataTypeQuantileStateSerDe::write_column_to_pb(const IColumn& column, PValues& result, + int start, int end) const { + result.mutable_bytes_value()->Reserve(end - start); + for (size_t row_num = start; row_num < end; ++row_num) { + StringRef data = column.get_data_at(row_num); + result.add_bytes_value(data.to_string()); + } + return Status::OK(); +} + +template +Status DataTypeQuantileStateSerDe::read_column_from_pb(IColumn& column, + const PValues& arg) const { + column.reserve(arg.bytes_value_size()); + for (int i = 0; i < arg.bytes_value_size(); ++i) { + column.insert_data(arg.bytes_value(i).c_str(), arg.bytes_value(i).size()); + } + return Status::OK(); +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_serde.cpp b/be/src/vec/data_types/serde/data_type_serde.cpp new file mode 100644 index 0000000000..4ca78837d3 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_serde.cpp @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "data_type_serde.h" + +namespace doris { +namespace vectorized { +DataTypeSerDe::DataTypeSerDe() = default; +DataTypeSerDe::~DataTypeSerDe() = default; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h new file mode 100644 index 0000000000..a9ade5befa --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include +#include + +#include "common/status.h" +#include "vec/columns/column.h" + +namespace doris { + +namespace vectorized { +// Deserialize means read from different file format or memory format, +// for example read from arrow, read from parquet. 
+// Serialize means write the column cell or the total column into another +// memory format or file format. +// Every data type should implement this interface. +// In the past, there are many switch cases in the code and we do not know +// how many cases or files we has to modify when we add a new type. And also +// it is very difficult to add a new read file format or write file format because +// the developer does not know how many datatypes has to deal. + +class DataTypeSerDe { +public: + DataTypeSerDe(); + virtual ~DataTypeSerDe(); + + // Protobuf serializer and deserializer + virtual Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const = 0; + + virtual Status read_column_from_pb(IColumn& column, const PValues& arg) const = 0; + + // JSONB serializer and deserializer + + // MySQL serializer and deserializer + + // Thrift serializer and deserializer + + // ORC serializer and deserializer + + // CSV serializer and deserializer + + // JSON serializer and deserializer + + // Arrow serializer and deserializer +}; + +using DataTypeSerDeSPtr = std::shared_ptr; +using DataTypeSerDeSPtrs = std::vector; + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp b/be/src/vec/data_types/serde/data_type_string_serde.cpp new file mode 100644 index 0000000000..83e96e1bed --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_string_serde.h" + +#include "vec/columns/column_string.h" + +namespace doris { +namespace vectorized { + +Status DataTypeStringSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + result.mutable_bytes_value()->Reserve(end - start); + for (size_t row_num = start; row_num < end; ++row_num) { + StringRef data = column.get_data_at(row_num); + result.add_string_value(data.to_string()); + } + return Status::OK(); +} +Status DataTypeStringSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& col = reinterpret_cast(column); + col.reserve(arg.string_value_size()); + for (int i = 0; i < arg.string_value_size(); ++i) { + column.insert_data(arg.string_value(i).c_str(), arg.string_value(i).size()); + } + return Status::OK(); +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h new file mode 100644 index 0000000000..1ddb1c113c --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_string_serde.h @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include "data_type_serde.h"
+namespace doris {
+
+namespace vectorized {
+
+class DataTypeStringSerDe : public DataTypeSerDe { // protobuf serde for string columns
+public:
+    Status write_column_to_pb(const IColumn& column, PValues& result, int start,
+                              int end) const override; // rows [start, end) -> string_value
+    Status read_column_from_pb(IColumn& column, const PValues& arg) const override; // appends string_value entries
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
new file mode 100644
index 0000000000..730f5a6980
--- /dev/null
+++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License. 
+ +#include "data_type_struct_serde.h" +namespace doris { + +namespace vectorized {} +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h b/be/src/vec/data_types/serde/data_type_struct_serde.h new file mode 100644 index 0000000000..c2907ae583 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_struct_serde.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once
+#include "data_type_serde.h"
+namespace doris {
+
+namespace vectorized {
+
+class DataTypeStructSerDe : public DataTypeSerDe { // protobuf serde placeholder for struct columns
+public:
+    DataTypeStructSerDe(const DataTypeSerDeSPtrs& _elemSerDeSPtrs)
+            : elemSerDeSPtrs(_elemSerDeSPtrs) {} // keeps a copy of the element serde list
+
+    Status write_column_to_pb(const IColumn& column, PValues& result, int start,
+                              int end) const override {
+        LOG(FATAL) << "Not support write struct column to pb"; // unsupported: fatal log, no Status returned
+    }
+    Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
+        LOG(FATAL) << "Not support read from pb to struct"; // unsupported: fatal log, no Status returned
+    }
+
+private:
+    DataTypeSerDeSPtrs elemSerDeSPtrs; // stored but not used by the two methods above
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index a093ac64cf..9a5f71b8c5 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -36,446 +36,6 @@ RPCFnImpl::RPCFnImpl(const TFunction& fn) : _fn(fn) { _fn.scalar_fn.symbol); } -void RPCFnImpl::convert_nullable_col_to_pvalue(const ColumnPtr& column, - const DataTypePtr& data_type, - const ColumnUInt8& null_col, PValues* arg, int start, - int end) { - int row_count = end - start; - if (column->has_null(row_count)) { - auto* null_map = arg->mutable_null_map(); - null_map->Reserve(row_count); - const auto* col = check_and_get_column(null_col); - auto& data = col->get_data(); - null_map->Add(data.begin() + start, data.begin() + end); - RPCFnImpl::convert_col_to_pvalue(column, data_type, arg, start, end); - } else { - RPCFnImpl::convert_col_to_pvalue(column, data_type, arg, start, end); - } -} - -template -void RPCFnImpl::convert_col_to_pvalue(const ColumnPtr& column, const DataTypePtr& data_type, - PValues* arg, int start, int end) { - int row_count = end - start; - PGenericType* ptype = arg->mutable_type(); - switch (data_type->get_type_id()) { - case TypeIndex::UInt8: { - ptype->set_id(PGenericType::UINT8); - auto* values = arg->mutable_bool_value(); - values->Reserve(row_count); - const 
auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::UInt16: { - ptype->set_id(PGenericType::UINT16); - auto* values = arg->mutable_uint32_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::UInt32: { - ptype->set_id(PGenericType::UINT32); - auto* values = arg->mutable_uint32_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::UInt64: { - ptype->set_id(PGenericType::UINT64); - auto* values = arg->mutable_uint64_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::UInt128: { - ptype->set_id(PGenericType::UINT128); - arg->mutable_bytes_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - if constexpr (nullable) { - if (column->is_null_at(row_num)) { - arg->add_bytes_value(nullptr); - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } - break; - } - case TypeIndex::Int8: { - ptype->set_id(PGenericType::INT8); - auto* values = arg->mutable_int32_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::Int16: { - ptype->set_id(PGenericType::INT16); - auto* values = arg->mutable_int32_value(); - values->Reserve(row_count); - const auto* col = 
check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::Int32: { - ptype->set_id(PGenericType::INT32); - auto* values = arg->mutable_int32_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::Int64: { - ptype->set_id(PGenericType::INT64); - auto* values = arg->mutable_int64_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::Int128: { - ptype->set_id(PGenericType::INT128); - arg->mutable_bytes_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - if constexpr (nullable) { - if (column->is_null_at(row_num)) { - arg->add_bytes_value(nullptr); - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } - break; - } - case TypeIndex::Float32: { - ptype->set_id(PGenericType::FLOAT); - auto* values = arg->mutable_float_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - - case TypeIndex::Float64: { - ptype->set_id(PGenericType::DOUBLE); - auto* values = arg->mutable_double_value(); - values->Reserve(row_count); - const auto* col = check_and_get_column(column); - auto& data = col->get_data(); - values->Add(data.begin() + start, data.begin() + end); - break; - } - case TypeIndex::String: { - ptype->set_id(PGenericType::STRING); - arg->mutable_bytes_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - if 
constexpr (nullable) { - if (column->is_null_at(row_num)) { - arg->add_string_value(nullptr); - } else { - StringRef data = column->get_data_at(row_num); - arg->add_string_value(data.to_string()); - } - } else { - StringRef data = column->get_data_at(row_num); - arg->add_string_value(data.to_string()); - } - } - break; - } - case TypeIndex::Date: { - ptype->set_id(PGenericType::DATE); - arg->mutable_datetime_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - PDateTime* date_time = arg->add_datetime_value(); - if constexpr (nullable) { - if (!column->is_null_at(row_num)) { - VecDateTimeValue v = - VecDateTimeValue::create_from_olap_date(column->get_int(row_num)); - date_time->set_day(v.day()); - date_time->set_month(v.month()); - date_time->set_year(v.year()); - } - } else { - VecDateTimeValue v = - VecDateTimeValue::create_from_olap_date(column->get_int(row_num)); - date_time->set_day(v.day()); - date_time->set_month(v.month()); - date_time->set_year(v.year()); - } - } - break; - } - case TypeIndex::DateTime: { - ptype->set_id(PGenericType::DATETIME); - arg->mutable_datetime_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - PDateTime* date_time = arg->add_datetime_value(); - if constexpr (nullable) { - if (!column->is_null_at(row_num)) { - VecDateTimeValue v = - VecDateTimeValue::create_from_olap_datetime(column->get_int(row_num)); - date_time->set_day(v.day()); - date_time->set_month(v.month()); - date_time->set_year(v.year()); - date_time->set_hour(v.hour()); - date_time->set_minute(v.minute()); - date_time->set_second(v.second()); - } - } else { - VecDateTimeValue v = - VecDateTimeValue::create_from_olap_datetime(column->get_int(row_num)); - date_time->set_day(v.day()); - date_time->set_month(v.month()); - date_time->set_year(v.year()); - date_time->set_hour(v.hour()); - date_time->set_minute(v.minute()); - date_time->set_second(v.second()); - } - } - break; - } - case 
TypeIndex::BitMap: { - ptype->set_id(PGenericType::BITMAP); - arg->mutable_bytes_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - if constexpr (nullable) { - if (column->is_null_at(row_num)) { - arg->add_bytes_value(nullptr); - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } - break; - } - case TypeIndex::HLL: { - ptype->set_id(PGenericType::HLL); - arg->mutable_bytes_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - if constexpr (nullable) { - if (column->is_null_at(row_num)) { - arg->add_bytes_value(nullptr); - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } - break; - } - case TypeIndex::QuantileState: { - ptype->set_id(PGenericType::QUANTILE_STATE); - arg->mutable_bytes_value()->Reserve(row_count); - for (size_t row_num = start; row_num < end; ++row_num) { - if constexpr (nullable) { - if (column->is_null_at(row_num)) { - arg->add_bytes_value(nullptr); - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } else { - StringRef data = column->get_data_at(row_num); - arg->add_bytes_value(data.data, data.size); - } - } - break; - } - default: - LOG(INFO) << "unknown type: " << data_type->get_name(); - ptype->set_id(PGenericType::UNKNOWN); - break; - } -} - -template -void RPCFnImpl::_convert_to_column(MutableColumnPtr& column, const PValues& result) { - switch (result.type().id()) { - case PGenericType::UINT8: { - column->reserve(result.uint32_value_size()); - column->resize(result.uint32_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < 
result.uint32_value_size(); ++i) { - data[i] = result.uint32_value(i); - } - break; - } - case PGenericType::UINT16: { - column->reserve(result.uint32_value_size()); - column->resize(result.uint32_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.uint32_value_size(); ++i) { - data[i] = result.uint32_value(i); - } - break; - } - case PGenericType::UINT32: { - column->reserve(result.uint32_value_size()); - column->resize(result.uint32_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.uint32_value_size(); ++i) { - data[i] = result.uint32_value(i); - } - break; - } - case PGenericType::UINT64: { - column->reserve(result.uint64_value_size()); - column->resize(result.uint64_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.uint64_value_size(); ++i) { - data[i] = result.uint64_value(i); - } - break; - } - case PGenericType::INT8: { - column->reserve(result.int32_value_size()); - column->resize(result.int32_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.int32_value_size(); ++i) { - data[i] = result.int32_value(i); - } - break; - } - case PGenericType::INT16: { - column->reserve(result.int32_value_size()); - column->resize(result.int32_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.int32_value_size(); ++i) { - data[i] = result.int32_value(i); - } - break; - } - case PGenericType::INT32: { - column->reserve(result.int32_value_size()); - column->resize(result.int32_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.int32_value_size(); ++i) { - data[i] = result.int32_value(i); - } - break; - } - case PGenericType::INT64: { - column->reserve(result.int64_value_size()); - column->resize(result.int64_value_size()); - auto& data = 
reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.int64_value_size(); ++i) { - data[i] = result.int64_value(i); - } - break; - } - case PGenericType::DATE: - case PGenericType::DATETIME: { - column->reserve(result.datetime_value_size()); - column->resize(result.datetime_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.datetime_value_size(); ++i) { - VecDateTimeValue v; - PDateTime pv = result.datetime_value(i); - v.set_time(pv.year(), pv.month(), pv.day(), pv.hour(), pv.minute(), pv.minute()); - data[i] = binary_cast(v); - } - break; - } - case PGenericType::FLOAT: { - column->reserve(result.float_value_size()); - column->resize(result.float_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.float_value_size(); ++i) { - data[i] = result.float_value(i); - } - break; - } - case PGenericType::DOUBLE: { - column->reserve(result.double_value_size()); - column->resize(result.double_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.double_value_size(); ++i) { - data[i] = result.double_value(i); - } - break; - } - case PGenericType::INT128: { - column->reserve(result.bytes_value_size()); - column->resize(result.bytes_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.bytes_value_size(); ++i) { - data[i] = *(int128_t*)(result.bytes_value(i).c_str()); - } - break; - } - case PGenericType::STRING: { - column->reserve(result.string_value_size()); - for (int i = 0; i < result.string_value_size(); ++i) { - column->insert_data(result.string_value(i).c_str(), result.string_value(i).size()); - } - break; - } - case PGenericType::DECIMAL128: { - column->reserve(result.bytes_value_size()); - column->resize(result.bytes_value_size()); - auto& data = reinterpret_cast(column.get())->get_data(); - for (int i = 0; i < result.bytes_value_size(); ++i) { - 
data[i] = *(int128_t*)(result.bytes_value(i).c_str()); - } - break; - } - case PGenericType::BITMAP: { - column->reserve(result.bytes_value_size()); - for (int i = 0; i < result.bytes_value_size(); ++i) { - column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size()); - } - break; - } - case PGenericType::HLL: { - column->reserve(result.bytes_value_size()); - for (int i = 0; i < result.bytes_value_size(); ++i) { - column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size()); - } - break; - } - case PGenericType::QUANTILE_STATE: { - column->reserve(result.bytes_value_size()); - for (int i = 0; i < result.bytes_value_size(); ++i) { - column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size()); - } - break; - } - default: { - LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString(); - break; - } - } -} - Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count) { PFunctionCallRequest request; @@ -508,45 +68,16 @@ void RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers& argum ColumnWithTypeAndName& column = block.get_by_position(col_idx); arg->set_has_null(column.column->has_null(row_count)); auto col = column.column->convert_to_full_column_if_const(); - if (auto* nullable = check_and_get_column(*col)) { - auto data_col = nullable->get_nested_column_ptr(); - auto& null_col = nullable->get_null_map_column(); - auto data_type = std::reinterpret_pointer_cast(column.type); - convert_nullable_col_to_pvalue(data_col->convert_to_full_column_if_const(), - data_type->get_nested_type(), null_col, arg, 0, - row_count); - } else { - convert_col_to_pvalue(col, column.type, arg, 0, row_count); - } + column.type->get_serde()->write_column_to_pb(*col, *arg, 0, row_count); } } void RPCFnImpl::_convert_to_block(Block& block, const PValues& result, size_t pos) { auto data_type = block.get_data_type(pos); - if 
(data_type->is_nullable()) { - auto null_type = std::reinterpret_pointer_cast(data_type); - auto data_col = null_type->get_nested_type()->create_column(); - _convert_to_column(data_col, result); - auto null_col = ColumnUInt8::create(data_col->size(), 0); - auto& null_map_data = null_col->get_data(); - null_col->reserve(data_col->size()); - null_col->resize(data_col->size()); - if (result.has_null()) { - for (int i = 0; i < data_col->size(); ++i) { - null_map_data[i] = result.null_map(i); - } - } else { - for (int i = 0; i < data_col->size(); ++i) { - null_map_data[i] = false; - } - } - block.replace_by_position(pos, - ColumnNullable::create(std::move(data_col), std::move(null_col))); - } else { - auto column = data_type->create_column(); - _convert_to_column(column, result); - block.replace_by_position(pos, std::move(column)); - } + auto col = data_type->create_column(); + auto serde = data_type->get_serde(); + serde->read_column_from_pb(*col, result); + block.replace_by_position(pos, std::move(col)); } FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types, diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h index 30f2dbaf8d..0c7fda85b4 100644 --- a/be/src/vec/functions/function_rpc.h +++ b/be/src/vec/functions/function_rpc.h @@ -31,22 +31,11 @@ public: const std::vector& arguments, size_t result, size_t input_rows_count); bool available() { return _client != nullptr; } - static void convert_nullable_col_to_pvalue(const vectorized::ColumnPtr& column, - const vectorized::DataTypePtr& data_type, - const vectorized::ColumnUInt8& null_col, - PValues* arg, int start, int end); - template - static void convert_col_to_pvalue(const vectorized::ColumnPtr& column, - const vectorized::DataTypePtr& data_type, PValues* arg, - int start, int end); - private: void _convert_block_to_proto(vectorized::Block& block, const vectorized::ColumnNumbers& arguments, size_t input_rows_count, PFunctionCallRequest* request); void 
_convert_to_block(vectorized::Block& block, const PValues& result, size_t pos); - template - void _convert_to_column(vectorized::MutableColumnPtr& column, const PValues& result); std::shared_ptr _client; std::string _function_name; diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 9f366c9d30..d543c48570 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -228,6 +228,7 @@ set(VEC_TEST_FILES vec/columns/column_decimal_test.cpp vec/columns/column_fixed_length_object_test.cpp vec/data_types/complex_type_test.cpp + vec/data_types/serde/data_type_serde_test.cpp vec/core/block_test.cpp vec/core/block_spill_test.cpp vec/core/column_array_test.cpp diff --git a/be/test/vec/data_types/serde/data_type_serde_test.cpp b/be/test/vec/data_types/serde/data_type_serde_test.cpp new file mode 100644 index 0000000000..9c2258c133 --- /dev/null +++ b/be/test/vec/data_types/serde/data_type_serde_test.cpp @@ -0,0 +1,183 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include +#include + +#include "vec/columns/column_complex.h" +#include "vec/columns/column_decimal.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_bitmap.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_hll.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_quantilestate.h" +#include "vec/data_types/data_type_string.h" + +namespace doris::vectorized { + +void column_to_pb(const DataTypePtr data_type, const IColumn& col, PValues* result) { + const DataTypeSerDeSPtr serde = data_type->get_serde(); + serde->write_column_to_pb(col, *result, 0, col.size()); +} + +void pb_to_column(const DataTypePtr data_type, PValues& result, IColumn& col) { + auto serde = data_type->get_serde(); + serde->read_column_from_pb(col, result); +} + +void check_pb_col(const DataTypePtr data_type, const IColumn& col) { + PValues pv = PValues(); + column_to_pb(data_type, col, &pv); + std::string s1 = pv.DebugString(); + + auto col1 = data_type->create_column(); + pb_to_column(data_type, pv, *col1); + PValues as_pv = PValues(); + column_to_pb(data_type, *col1, &as_pv); + + std::string s2 = as_pv.DebugString(); + EXPECT_EQ(s1, s2); +} + +void serialize_and_deserialize_pb_test() { + // int + { + auto vec = vectorized::ColumnVector::create(); + auto& data = vec->get_data(); + for (int i = 0; i < 1024; ++i) { + data.push_back(i); + } + vectorized::DataTypePtr data_type(std::make_shared()); + check_pb_col(data_type, *vec.get()); + } + // string + { + auto strcol = vectorized::ColumnString::create(); + for (int i = 0; i < 1024; ++i) { + std::string is = std::to_string(i); + strcol->insert_data(is.c_str(), is.size()); + } + vectorized::DataTypePtr data_type(std::make_shared()); + check_pb_col(data_type, *strcol.get()); + } + // decimal + { + vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true)); + auto 
decimal_column = decimal_data_type->create_column(); + auto& data = ((vectorized::ColumnDecimal>*) + decimal_column.get()) + ->get_data(); + for (int i = 0; i < 1024; ++i) { + __int128_t value = i * pow(10, 9) + i * pow(10, 8); + data.push_back(value); + } + check_pb_col(decimal_data_type, *decimal_column.get()); + } + // bitmap + { + vectorized::DataTypePtr bitmap_data_type(std::make_shared()); + auto bitmap_column = bitmap_data_type->create_column(); + std::vector& container = + ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data(); + for (int i = 0; i < 4; ++i) { + BitmapValue bv; + for (int j = 0; j <= i; ++j) { + bv.add(j); + } + container.push_back(bv); + } + check_pb_col(bitmap_data_type, *bitmap_column.get()); + } + // hll + { + vectorized::DataTypePtr hll_data_type(std::make_shared()); + auto hll_column = hll_data_type->create_column(); + std::vector& container = + ((vectorized::ColumnHLL*)hll_column.get())->get_data(); + for (int i = 0; i < 4; ++i) { + HyperLogLog hll; + hll.update(i); + container.push_back(hll); + } + check_pb_col(hll_data_type, *hll_column.get()); + } + // quantilestate + { + vectorized::DataTypePtr quantile_data_type( + std::make_shared()); + auto quantile_column = quantile_data_type->create_column(); + std::vector& container = + ((vectorized::ColumnQuantileStateDouble*)quantile_column.get())->get_data(); + const long max_rand = 1000000L; + double lower_bound = 0; + double upper_bound = 100; + srandom(time(nullptr)); + for (int i = 0; i < 1024; ++i) { + QuantileStateDouble q; + double random_double = + lower_bound + (upper_bound - lower_bound) * (random() % max_rand) / max_rand; + q.add_value(random_double); + container.push_back(q); + } + check_pb_col(quantile_data_type, *quantile_column.get()); + } + // nullable string + { + vectorized::DataTypePtr string_data_type(std::make_shared()); + vectorized::DataTypePtr nullable_data_type( + std::make_shared(string_data_type)); + auto nullable_column = 
nullable_data_type->create_column(); + ((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024); + check_pb_col(nullable_data_type, *nullable_column.get()); + } + // nullable decimal + { + vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true)); + vectorized::DataTypePtr nullable_data_type( + std::make_shared(decimal_data_type)); + auto nullable_column = nullable_data_type->create_column(); + ((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024); + check_pb_col(nullable_data_type, *nullable_column.get()); + } + // int with 1024 batch size + { + auto vec = vectorized::ColumnVector::create(); + auto& data = vec->get_data(); + for (int i = 0; i < 1024; ++i) { + data.push_back(i); + } + std::cout << vec->size() << std::endl; + vectorized::DataTypePtr data_type(std::make_shared()); + vectorized::DataTypePtr nullable_data_type( + std::make_shared(data_type)); + auto nullable_column = nullable_data_type->create_column(); + ((vectorized::ColumnNullable*)nullable_column.get()) + ->insert_range_from_not_nullable(*vec, 0, 1024); + check_pb_col(nullable_data_type, *nullable_column.get()); + } +} + +TEST(DataTypeSerDeTest, DataTypeScalaSerDeTest) { + serialize_and_deserialize_pb_test(); +} + +} // namespace doris::vectorized