From 91dae8a5b685ca9054dafe3ca7b161e5d0ac4afc Mon Sep 17 00:00:00 2001 From: amory Date: Mon, 29 May 2023 16:53:33 +0800 Subject: [PATCH] [FIX](mysql_writer) fix mysql output binary object works (#20154) * fix struct_export out data * fix mysql writer output with binary true --- .../serde/data_type_array_serde.cpp | 15 ++++---- .../data_types/serde/data_type_array_serde.h | 18 ++++++---- .../serde/data_type_bitmap_serde.cpp | 17 +++++++-- .../data_types/serde/data_type_bitmap_serde.h | 18 ++++++---- .../serde/data_type_date64_serde.cpp | 5 +-- .../data_types/serde/data_type_date64_serde.h | 18 ++++++---- .../serde/data_type_datetimev2_serde.cpp | 5 +-- .../serde/data_type_datetimev2_serde.h | 18 ++++++---- .../serde/data_type_datev2_serde.cpp | 5 +-- .../data_types/serde/data_type_datev2_serde.h | 18 ++++++---- .../serde/data_type_decimal_serde.h | 23 +++++++----- .../serde/data_type_fixedlengthobject_serde.h | 10 +++--- .../data_types/serde/data_type_hll_serde.cpp | 17 +++++++-- .../data_types/serde/data_type_hll_serde.h | 18 ++++++---- .../data_types/serde/data_type_jsonb_serde.h | 18 ++++++---- .../data_types/serde/data_type_map_serde.cpp | 21 ++++++----- .../data_types/serde/data_type_map_serde.h | 18 ++++++---- .../serde/data_type_nullable_serde.cpp | 8 +++-- .../serde/data_type_nullable_serde.h | 18 ++++++---- .../data_types/serde/data_type_number_serde.h | 23 +++++++----- .../data_types/serde/data_type_object_serde.h | 10 +++--- .../serde/data_type_quantilestate_serde.h | 36 +++++++++++++------ be/src/vec/data_types/serde/data_type_serde.h | 4 +-- .../serde/data_type_string_serde.cpp | 5 +-- .../data_types/serde/data_type_string_serde.h | 18 ++++++---- .../serde/data_type_struct_serde.cpp | 13 +++---- .../data_types/serde/data_type_struct_serde.h | 18 ++++++---- .../data_types/serde/data_type_time_serde.h | 17 +++++---- be/src/vec/sink/vmysql_result_writer.cpp | 4 +-- 29 files changed, 276 insertions(+), 160 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index 2c48dea576..2d224cfeac 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -86,8 +86,9 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar template Status DataTypeArraySerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& column_array = assert_cast(column); auto& offsets = column_array.get_offsets(); @@ -109,12 +110,14 @@ Status DataTypeArraySerDe::_write_column_to_mysql( } else { if (is_nested_string) { buf_ret = result[row_idx].push_string("\"", 1); - RETURN_IF_ERROR(nested_serde->write_column_to_mysql(data, result, row_idx, j, - j + 1, col_const)); + RETURN_IF_ERROR(nested_serde->write_column_to_mysql( + data, return_object_data_as_binary, result, row_idx, j, j + 1, + col_const)); buf_ret = result[row_idx].push_string("\"", 1); } else { - RETURN_IF_ERROR(nested_serde->write_column_to_mysql(data, result, row_idx, j, - j + 1, col_const)); + RETURN_IF_ERROR(nested_serde->write_column_to_mysql( + data, return_object_data_as_binary, result, row_idx, j, j + 1, + col_const)); } } } diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_array_serde.h index e46f2c16d1..97c4625360 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.h +++ b/be/src/vec/data_types/serde/data_type_array_serde.h @@ -56,19 +56,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index 1c41e44aa7..231175ff24 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -84,14 +84,25 @@ void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV template Status DataTypeBitMapSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { + auto& data_column = assert_cast(column); int buf_ret = 0; for (ssize_t i = start; i < end; ++i) { if (0 != buf_ret) { return Status::InternalError("pack mysql buffer failed."); } - buf_ret = result[row_idx].push_null(); + if (return_object_data_as_binary) { + const auto col_index = index_check_const(i, col_const); + BitmapValue bitmapValue = data_column.get_element(col_index); + size_t size = bitmapValue.getSizeInBytes(); + std::unique_ptr buf = std::make_unique(size); + bitmapValue.write_to(buf.get()); + buf_ret = result[row_idx].push_string(buf.get(), size); + } else { + buf_ret = result[row_idx].push_null(); + } ++row_idx; } return Status::OK(); diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h index 93eede68cf..378a1bb8e9 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -50,20 +50,24 @@ public: int end, const cctz::time_zone& ctz) const override { LOG(FATAL) << "Not support read bitmap column from arrow"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: // Bitmap is binary data which is not shown by mysql. template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index 02e44f04d1..a35c78aa24 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -111,8 +111,9 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::A template Status DataTypeDate64SerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { auto& data = assert_cast&>(column).get_data(); int buf_ret = 0; for (ssize_t i = start; i < end; ++i) { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.h b/be/src/vec/data_types/serde/data_type_date64_serde.h index a05913cd17..205d3f8009 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.h +++ b/be/src/vec/data_types/serde/data_type_date64_serde.h @@ -47,19 +47,23 @@ class DataTypeDate64SerDe : public DataTypeNumberSerDe { int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp index 72231c023d..6f227969f9 100644 --- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp @@ -48,8 +48,9 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, const } template Status DataTypeDateTimeV2SerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { auto& data = assert_cast&>(column).get_data(); int buf_ret = 0; for (ssize_t i = start; i < end; ++i) { diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h index 3727696089..c92120480a 100644 --- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.h +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h @@ -51,19 +51,23 @@ public: int end, const cctz::time_zone& ctz) const override { LOG(FATAL) << "not support read arrow array to uint64 column"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; int scale; diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp index 312654a6a8..ea75fdc641 100644 --- a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp @@ -67,8 +67,9 @@ void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::A } template Status DataTypeDateV2SerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { auto& data = assert_cast&>(column).get_data(); int buf_ret = 0; for (ssize_t i = start; i < end; ++i) { diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.h b/be/src/vec/data_types/serde/data_type_datev2_serde.h index 3faa4df7a2..934c00e6ee 100644 --- a/be/src/vec/data_types/serde/data_type_datev2_serde.h +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h @@ -47,19 +47,23 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe { int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h b/be/src/vec/data_types/serde/data_type_decimal_serde.h index 92339da1df..a082490e51 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.h +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h @@ -63,19 +63,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; @@ -85,8 +89,9 @@ private: template template Status DataTypeDecimalSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& data = assert_cast&>(column).get_data(); for (int i = start; i < end; ++i) { diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h index 6dabdce9ed..457a013622 100644 --- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h +++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h @@ -60,12 +60,14 @@ public: int end, const cctz::time_zone& ctz) const override { LOG(FATAL) << "Not support read FixedLengthObject column from arrow"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { LOG(FATAL) << "Not support write object column to mysql"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { LOG(FATAL) << "Not support write object column to mysql"; } }; diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index 5e223a5d36..aadb6e2289 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -103,14 +103,25 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const UInt8* template Status DataTypeHLLSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { + auto& data_column = assert_cast(column); int buf_ret = 0; for (ssize_t i = start; i < end; ++i) { if (0 != buf_ret) { return Status::InternalError("pack mysql buffer failed."); } - buf_ret = result[row_idx].push_null(); + if (return_object_data_as_binary) { + const auto col_index = index_check_const(i, col_const); + HyperLogLog hyperLogLog = data_column.get_element(col_index); + size_t size = hyperLogLog.max_serialized_size(); + std::unique_ptr buf = std::make_unique(size); + hyperLogLog.serialize((uint8*)buf.get()); + buf_ret = result[row_idx].push_string(buf.get(), size); + } else { + buf_ret = result[row_idx].push_null(); + } ++row_idx; } return Status::OK(); diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h b/be/src/vec/data_types/serde/data_type_hll_serde.h index dbf1b3455f..c921e55e5f 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.h +++ b/be/src/vec/data_types/serde/data_type_hll_serde.h @@ -48,20 +48,24 @@ public: int end, const cctz::time_zone& ctz) const override { LOG(FATAL) << "Not support read hll column from arrow"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: // Hll is binary data which is not shown by mysql. template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h b/be/src/vec/data_types/serde/data_type_jsonb_serde.h index 3443201722..2eb52db5e1 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h @@ -34,18 +34,22 @@ namespace vectorized { class Arena; class DataTypeJsonbSerDe : public DataTypeStringSerDe { - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_jsonb_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_jsonb_column_to_mysql(column, return_object_data_as_binary, result, row_idx, + start, end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_jsonb_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_jsonb_column_to_mysql(column, return_object_data_as_binary, result, row_idx, + start, end, col_const); } private: template - Status _write_jsonb_column_to_mysql(const IColumn& column, + Status _write_jsonb_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const { int buf_ret = 0; diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp b/be/src/vec/data_types/serde/data_type_map_serde.cpp index 4cb4fdc039..5b450081b3 100644 --- a/be/src/vec/data_types/serde/data_type_map_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp @@ -56,8 +56,9 @@ void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const arrow::Arra } template Status DataTypeMapSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& map_column = assert_cast(column); const IColumn& nested_keys_column = map_column.get_keys(); @@ -82,12 +83,14 @@ Status DataTypeMapSerDe::_write_column_to_mysql( } else { if (is_key_string) { buf_ret = result[row_idx].push_string("\"", 1); - RETURN_IF_ERROR(key_serde->write_column_to_mysql(nested_keys_column, result, - row_idx, j, j + 1, col_const)); + RETURN_IF_ERROR(key_serde->write_column_to_mysql( + nested_keys_column, return_object_data_as_binary, result, row_idx, j, + j + 1, col_const)); buf_ret = result[row_idx].push_string("\"", 1); } else { - RETURN_IF_ERROR(key_serde->write_column_to_mysql(nested_keys_column, result, - row_idx, j, j + 1, col_const)); + RETURN_IF_ERROR(key_serde->write_column_to_mysql( + nested_keys_column, return_object_data_as_binary, result, row_idx, j, + j + 1, col_const)); } } buf_ret = result[row_idx].push_string(":", 1); @@ -97,11 +100,13 @@ Status DataTypeMapSerDe::_write_column_to_mysql( if (is_val_string) { buf_ret = result[row_idx].push_string("\"", 1); RETURN_IF_ERROR(value_serde->write_column_to_mysql( - nested_values_column, result, row_idx, j, j + 1, col_const)); + nested_values_column, return_object_data_as_binary, result, row_idx, j, + j + 1, col_const)); buf_ret = result[row_idx].push_string("\"", 1); } else { RETURN_IF_ERROR(value_serde->write_column_to_mysql( - nested_values_column, result, row_idx, j, j + 1, col_const)); + nested_values_column, return_object_data_as_binary, result, row_idx, j, + j + 1, col_const)); } } } diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h b/be/src/vec/data_types/serde/data_type_map_serde.h index d601519598..21e2c4dacc 100644 --- a/be/src/vec/data_types/serde/data_type_map_serde.h +++ b/be/src/vec/data_types/serde/data_type_map_serde.h @@ -55,19 +55,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp index 1902720148..736e1b5f56 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp @@ -134,8 +134,9 @@ void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const arrow: template Status DataTypeNullableSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& col = static_cast(column); auto& nested_col = col.get_nested_column(); @@ -148,7 +149,8 @@ Status DataTypeNullableSerDe::_write_column_to_mysql( buf_ret = result[row_idx].push_null(); } else { RETURN_IF_ERROR(nested_serde->write_column_to_mysql( - nested_col, result, row_idx, col_index, col_index + 1, col_const)); + nested_col, return_object_data_as_binary, result, row_idx, col_index, + col_index + 1, col_const)); } ++row_idx; } diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h b/be/src/vec/data_types/serde/data_type_nullable_serde.h index 8b753478d7..6f05d002a2 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.h +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h @@ -48,19 +48,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h index 8c0a16fa8b..e301281a6e 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.h +++ b/be/src/vec/data_types/serde/data_type_number_serde.h @@ -68,19 +68,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; @@ -88,8 +92,9 @@ private: template template Status DataTypeNumberSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& data = assert_cast(column).get_data(); for (auto i = start; i < end; ++i) { diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index 2648cdc582..919764b4c7 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -61,12 +61,14 @@ public: int end, const cctz::time_zone& ctz) const override { LOG(FATAL) << "Not support read object column from arrow"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { LOG(FATAL) << "Not support write object column to mysql"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { LOG(FATAL) << "Not support write object column to mysql"; } }; diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index fa6f554c20..ac1a5ea500 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -57,19 +57,23 @@ public: int end, const cctz::time_zone& ctz) const override { LOG(FATAL) << "Not support read " << column.get_name() << " from arrow"; } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; @@ -123,14 +127,26 @@ void DataTypeQuantileStateSerDe::read_one_cell_from_jsonb(IColumn& column, template template Status DataTypeQuantileStateSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { + auto& data_column = reinterpret_cast&>(column); int buf_ret = 0; for (ssize_t i = start; i < end; ++i) { if (0 != buf_ret) { return Status::InternalError("pack mysql buffer failed."); } - buf_ret = result[row_idx].push_null(); + if (return_object_data_as_binary) { + const auto col_index = index_check_const(i, col_const); + auto& quantile_value = + const_cast&>(data_column.get_element(col_index)); + size_t size = quantile_value.get_serialized_size(); + std::unique_ptr buf = std::make_unique(size); + quantile_value.serialize((uint8_t*)buf.get()); + buf_ret = result[row_idx].push_string(buf.get(), size); + } else { + buf_ret = result[row_idx].push_null(); + } ++row_idx; } return Status::OK(); diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index 8072da3032..6910ac202e 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -74,11 +74,11 @@ public: virtual void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const = 0; // MySQL serializer and deserializer - virtual Status write_column_to_mysql(const IColumn& column, + virtual Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const = 0; - virtual Status write_column_to_mysql(const IColumn& column, + virtual Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int start, int row_idx, int end, bool col_const) const = 0; // Thrift serializer and deserializer diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp b/be/src/vec/data_types/serde/data_type_string_serde.cpp index e12aa45ad5..c9bd6427a2 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp @@ -125,8 +125,9 @@ void DataTypeStringSerDe::read_column_from_arrow(IColumn& column, const arrow::A } template Status DataTypeStringSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& col = assert_cast(column); for (ssize_t i = start; i < end; ++i) { diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h index 24167d1448..a69c416fa5 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.h +++ b/be/src/vec/data_types/serde/data_type_string_serde.h @@ -46,19 +46,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; }; diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp b/be/src/vec/data_types/serde/data_type_struct_serde.cpp index 0840cd96d3..3646b9a7ef 100644 --- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp @@ -58,8 +58,9 @@ void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const arrow::A } template Status DataTypeStructSerDe::_write_column_to_mysql( - const IColumn& column, std::vector>& result, int row_idx, - int start, int end, bool col_const) const { + const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, int end, + bool col_const) const { int buf_ret = 0; auto& col = assert_cast(column); for (ssize_t i = start; i < end; ++i) { @@ -81,13 +82,13 @@ Status DataTypeStructSerDe::_write_column_to_mysql( if (remove_nullable(col.get_column_ptr(j))->is_column_string()) { buf_ret = result[row_idx].push_string("\"", 1); RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql( - col.get_column(j), result, row_idx, col_index, col_index + 1, - col_const)); + col.get_column(j), return_object_data_as_binary, result, row_idx, + col_index, col_index + 1, col_const)); buf_ret = result[row_idx].push_string("\"", 1); } else { RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql( - col.get_column(j), result, row_idx, col_index, col_index + 1, - col_const)); + col.get_column(j), return_object_data_as_binary, result, row_idx, + col_index, col_index + 1, col_const)); } } begin = false; diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h b/be/src/vec/data_types/serde/data_type_struct_serde.h index 519c251773..b89f3eedb1 100644 --- a/be/src/vec/data_types/serde/data_type_struct_serde.h +++ b/be/src/vec/data_types/serde/data_type_struct_serde.h @@ -56,19 +56,23 @@ public: int end) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override; - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_column_to_mysql(column, return_object_data_as_binary, result, row_idx, start, + end, col_const); } private: template - Status _write_column_to_mysql(const IColumn& column, + Status _write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const; diff --git a/be/src/vec/data_types/serde/data_type_time_serde.h b/be/src/vec/data_types/serde/data_type_time_serde.h index d274a34571..78fc962466 100644 --- a/be/src/vec/data_types/serde/data_type_time_serde.h +++ b/be/src/vec/data_types/serde/data_type_time_serde.h @@ -31,18 +31,23 @@ namespace vectorized { class Arena; class DataTypeTimeSerDe : public DataTypeNumberSerDe { - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_date_time_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_date_time_column_to_mysql(column, return_object_data_as_binary, result, + row_idx, start, end, col_const); } - Status write_column_to_mysql(const IColumn& column, std::vector>& result, - int row_idx, int start, int end, bool col_const) const override { - return _write_date_time_column_to_mysql(column, result, row_idx, start, end, col_const); + Status write_column_to_mysql(const IColumn& column, bool return_object_data_as_binary, + std::vector>& result, int row_idx, int start, + int end, bool col_const) const override { + return _write_date_time_column_to_mysql(column, return_object_data_as_binary, result, + row_idx, start, end, col_const); } private: template Status _write_date_time_column_to_mysql(const IColumn& column, + bool return_object_data_as_binary, std::vector>& result, int row_idx, int start, int end, bool col_const) const { int buf_ret = 0; diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index b001a10526..6e3e34bcc6 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -624,8 +624,8 @@ Status VMysqlResultWriter::append_block(Block& input_block) { << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i, block.get_by_position(i).column->size()); - RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(*column_ptr, rows_buffer, 0, 0, - num_rows, col_const)); + RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql( + *column_ptr, output_object_data(), rows_buffer, 0, 0, num_rows, col_const)); if (!status) { LOG(WARNING) << "convert row to mysql result failed. block_struct="