diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index b75f968800..28adba4ec2 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -53,6 +53,7 @@ #include "vec/common/string_ref.h" #include "vec/core/block.h" // Block #include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/serde/data_type_serde.h" #include "vec/jsonb/serialize.h" namespace doris { @@ -120,7 +121,8 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, return Status::InternalError(cntl.ErrorText()); } } - + vectorized::DataTypeSerDeSPtrs serdes; + std::unordered_map col_uid_to_idx; auto merge_function = [&](const PMultiGetResponse& resp) { Status st(resp.status()); if (!st.ok()) { @@ -136,10 +138,16 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, if (output_block->is_empty_column()) { *output_block = vectorized::Block(_fetch_option.desc->slots(), 1); } + if (serdes.empty() && col_uid_to_idx.empty()) { + serdes = vectorized::create_data_type_serdes(_fetch_option.desc->slots()); + for (int i = 0; i < _fetch_option.desc->slots().size(); ++i) { + col_uid_to_idx[_fetch_option.desc->slots()[i]->col_unique_id()] = i; + } + } for (int i = 0; i < resp.binary_row_data_size(); ++i) { vectorized::JsonbSerializeUtil::jsonb_to_block( - *_fetch_option.desc, resp.binary_row_data(i).data(), - resp.binary_row_data(i).size(), *output_block); + serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(), + col_uid_to_idx, *output_block); } return Status::OK(); } diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 627cd327af..c2d8d6f43e 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -51,6 +51,7 @@ #include "vec/common/assert_cast.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type.h" +#include "vec/data_types/serde/data_type_serde.h" #include "vec/json/path_in_data.h" #include "vec/jsonb/serialize.h" @@ -500,8 +501,10 @@ void MemTable::serialize_block_to_row_column(vectorized::Block& block) { .assume_mutable() .get()); row_store_column->clear(); + vectorized::DataTypeSerDeSPtrs serdes = + vectorized::create_data_type_serdes(block.get_data_types()); vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column, - _num_columns); + _tablet_schema->num_columns(), serdes); VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)" << watch.elapsed_time() / 1000; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index bcf685a046..1f63937a40 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -339,8 +339,10 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) { .assume_mutable() .get()); row_store_column->clear(); + vectorized::DataTypeSerDeSPtrs serdes = + vectorized::create_data_type_serdes(block.get_data_types()); vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column, - _tablet_schema->num_columns()); + _tablet_schema->num_columns(), serdes); VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)" << watch.elapsed_time() / 1000; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 680686d850..25b1c18b1e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -116,6 +116,7 @@ #include "vec/common/string_ref.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/serde/data_type_serde.h" #include "vec/jsonb/serialize.h" namespace doris { @@ -2429,7 +2430,17 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); assert(column_ptr->size() == rowids.size()); auto string_column = static_cast(column_ptr.get()); - vectorized::JsonbSerializeUtil::jsonb_to_block(tablet_schema, cids, *string_column, block); + vectorized::DataTypeSerDeSPtrs serdes; + serdes.resize(cids.size()); + std::unordered_map col_uid_to_idx; + for (int i = 0; i < cids.size(); ++i) { + const TabletColumn& column = tablet_schema->column(cids[i]); + vectorized::DataTypePtr type = + vectorized::DataTypeFactory::instance().create_data_type(column); + col_uid_to_idx[column.unique_id()] = i; + serdes[i] = type->get_serde(); + } + vectorized::JsonbSerializeUtil::jsonb_to_block(serdes, *string_column, col_uid_to_idx, block); return Status::OK(); } diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 2ee7122a6b..85c58c02c8 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -40,7 +40,7 @@ namespace doris { namespace vectorized { class Block; -} +} // namespace vectorized struct OlapTableIndexSchema; class TColumn; diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index ab2ae7519e..1f982e6937 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -23,8 +23,10 @@ #include #include +#include #include +#include "gutil/integral_types.h" #include "olap/lru_cache.h" #include "olap/olap_tuple.h" #include "olap/row_cursor.h" @@ -35,6 +37,7 @@ #include "util/key_util.h" #include "util/runtime_profile.h" #include "util/thrift_util.h" +#include "vec/data_types/serde/data_type_serde.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/jsonb/serialize.h" @@ -65,6 +68,10 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vectorslots()); + for (int i = 0; i < tuple_desc()->slots().size(); ++i) { + _col_uid_to_idx[tuple_desc()->slots()[i]->col_unique_id()] = i; + } return Status::OK(); } @@ -279,8 +286,10 @@ Status PointQueryExecutor::_lookup_row_data() { for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { if (_row_read_ctxs[i]._cached_row_data.valid()) { vectorized::JsonbSerializeUtil::jsonb_to_block( - *_reusable->tuple_desc(), _row_read_ctxs[i]._cached_row_data.data().data, - _row_read_ctxs[i]._cached_row_data.data().size, *_result_block); + _reusable->get_data_type_serdes(), + _row_read_ctxs[i]._cached_row_data.data().data, + _row_read_ctxs[i]._cached_row_data.data().size, _reusable->get_col_uid_to_idx(), + *_result_block); continue; } if (!_row_read_ctxs[i]._row_location.has_value()) { @@ -293,8 +302,9 @@ Status PointQueryExecutor::_lookup_row_data() { _profile_metrics.read_stats, value, !config::disable_storage_row_cache /*whether write row cache*/)); // serilize value to block, currently only jsonb row formt - vectorized::JsonbSerializeUtil::jsonb_to_block(*_reusable->tuple_desc(), value.data(), - value.size(), *_result_block); + vectorized::JsonbSerializeUtil::jsonb_to_block( + _reusable->get_data_type_serdes(), value.data(), value.size(), + _reusable->get_col_uid_to_idx(), *_result_block); } return Status::OK(); } diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index bc8e770049..fa27bfcc54 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,7 @@ #include "util/runtime_profile.h" #include "util/slice.h" #include "vec/core/block.h" +#include "vec/data_types/serde/data_type_serde.h" namespace doris { @@ -76,6 +78,12 @@ public: std::unique_ptr get_block(); + const vectorized::DataTypeSerDeSPtrs& get_data_type_serdes() const { return _data_type_serdes; } + + const std::unordered_map& get_col_uid_to_idx() const { + return _col_uid_to_idx; + } + // do not touch block after returned void return_block(std::unique_ptr& block); @@ -92,6 +100,8 @@ private: std::vector> _block_pool; std::vector _output_exprs_ctxs; int64_t _create_timestamp = 0; + vectorized::DataTypeSerDeSPtrs _data_type_serdes; + std::unordered_map _col_uid_to_idx; }; // RowCache is a LRU cache for row store diff --git a/be/src/vec/data_types/serde/data_type_serde.cpp b/be/src/vec/data_types/serde/data_type_serde.cpp index 4ca78837d3..6a49ffe3da 100644 --- a/be/src/vec/data_types/serde/data_type_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_serde.cpp @@ -16,9 +16,30 @@ // under the License. #include "data_type_serde.h" +#include "runtime/descriptors.h" +#include "vec/data_types/data_type.h" + namespace doris { namespace vectorized { DataTypeSerDe::DataTypeSerDe() = default; DataTypeSerDe::~DataTypeSerDe() = default; + +DataTypeSerDeSPtrs create_data_type_serdes(const DataTypes& types) { + DataTypeSerDeSPtrs serdes; + serdes.reserve(types.size()); + for (const DataTypePtr& type : types) { + serdes.push_back(type->get_serde()); + } + return serdes; +} + +DataTypeSerDeSPtrs create_data_type_serdes(const std::vector& slots) { + DataTypeSerDeSPtrs serdes; + serdes.reserve(slots.size()); + for (const SlotDescriptor* slot : slots) { + serdes.push_back(slot->get_data_type_ptr()->get_serde()); + } + return serdes; +} } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index 2d196fcb9e..d48fe8751a 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -39,10 +39,12 @@ class time_zone; namespace doris { class PValues; class JsonbValue; +class SlotDescriptor; namespace vectorized { class IColumn; class Arena; +class IDataType; // Deserialize means read from different file format or memory format, // for example read from arrow, read from parquet. // Serialize means write the column cell or the total column into another @@ -99,5 +101,9 @@ inline void checkArrowStatus(const arrow::Status& status, const std::string& col using DataTypeSerDeSPtr = std::shared_ptr; using DataTypeSerDeSPtrs = std::vector; +DataTypeSerDeSPtrs create_data_type_serdes( + const std::vector>& types); +DataTypeSerDeSPtrs create_data_type_serdes(const std::vector& slots); + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/jsonb/serialize.cpp b/be/src/vec/jsonb/serialize.cpp index c8b2eff0f4..c7b98095bb 100644 --- a/be/src/vec/jsonb/serialize.cpp +++ b/be/src/vec/jsonb/serialize.cpp @@ -37,13 +37,15 @@ #include "vec/common/arena.h" #include "vec/common/string_ref.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/core/columns_with_type_and_name.h" #include "vec/data_types/data_type.h" #include "vec/data_types/serde/data_type_serde.h" namespace doris::vectorized { void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block& block, - ColumnString& dst, int num_cols) { + ColumnString& dst, int num_cols, + const DataTypeSerDeSPtrs& serdes) { auto num_rows = block.rows(); Arena pool; assert(num_cols <= block.columns()); @@ -57,8 +59,8 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block& // ignore dst row store column continue; } - block.get_data_type(j)->get_serde()->write_one_cell_to_jsonb( - *column, jsonb_writer, &pool, tablet_column.unique_id(), i); + serdes[j]->write_one_cell_to_jsonb(*column, jsonb_writer, &pool, + tablet_column.unique_id(), i); } jsonb_writer.writeEndObject(); dst.insert_data(jsonb_writer.getOutput()->getBuffer(), jsonb_writer.getOutput()->getSize()); @@ -66,56 +68,44 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block& } // batch rows -void JsonbSerializeUtil::jsonb_to_block(const TupleDescriptor& desc, - const ColumnString& jsonb_column, Block& dst) { +void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, + const ColumnString& jsonb_column, + const std::unordered_map& col_id_to_idx, + Block& dst) { for (int i = 0; i < jsonb_column.size(); ++i) { StringRef jsonb_data = jsonb_column.get_data_at(i); - jsonb_to_block(desc, jsonb_data.data, jsonb_data.size, dst); + jsonb_to_block(serdes, jsonb_data.data, jsonb_data.size, col_id_to_idx, dst); } } // single row -void JsonbSerializeUtil::jsonb_to_block(const TupleDescriptor& desc, const char* data, size_t size, +void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data, + size_t size, + const std::unordered_map& col_id_to_idx, Block& dst) { auto pdoc = JsonbDocument::createDocument(data, size); JsonbDocument& doc = *pdoc; - for (int j = 0; j < desc.slots().size(); ++j) { - SlotDescriptor* slot = desc.slots()[j]; - JsonbValue* slot_value = doc->find(slot->col_unique_id()); - MutableColumnPtr dst_column = dst.get_by_position(j).column->assume_mutable(); - if (!slot_value || slot_value->isNull()) { - // null or not exist - dst_column->insert_default(); - continue; + size_t num_rows = dst.rows(); + size_t filled_columns = 0; + for (auto it = doc->begin(); it != doc->end(); ++it) { + auto col_it = col_id_to_idx.find(it->getKeyId()); + if (col_it != col_id_to_idx.end()) { + MutableColumnPtr dst_column = + dst.get_by_position(col_it->second).column->assume_mutable(); + serdes[col_it->second]->read_one_cell_from_jsonb(*dst_column, it->value()); + ++filled_columns; } - dst.get_data_type(j)->get_serde()->read_one_cell_from_jsonb(*dst_column, slot_value); } -} - -void JsonbSerializeUtil::jsonb_to_block(TabletSchemaSPtr schema, - const std::vector& col_ids, - const ColumnString& jsonb_column, Block& dst) { - for (int i = 0; i < jsonb_column.size(); ++i) { - StringRef jsonb_data = jsonb_column.get_data_at(i); - jsonb_to_block(schema, col_ids, jsonb_data.data, jsonb_data.size, dst); - } -} - -void JsonbSerializeUtil::jsonb_to_block(TabletSchemaSPtr schema, - const std::vector& col_ids, const char* data, - size_t size, Block& dst) { - auto pdoc = JsonbDocument::createDocument(data, size); - JsonbDocument& doc = *pdoc; - for (int j = 0; j < col_ids.size(); ++j) { - auto column = schema->column(col_ids[j]); - JsonbValue* slot_value = doc->find(column.unique_id()); - MutableColumnPtr dst_column = dst.get_by_position(j).column->assume_mutable(); - if (!slot_value || slot_value->isNull()) { - // null or not exist - dst_column->insert_default(); - continue; + if (filled_columns < dst.columns()) { + // fill missing slot + for (auto& column_type_name : dst) { + MutableColumnPtr col = column_type_name.column->assume_mutable(); + if (col->size() < num_rows + 1) { + DCHECK(col->size() == num_rows); + col->insert_default(); + } + DCHECK(col->size() == num_rows + 1); } - dst.get_data_type(j)->get_serde()->read_one_cell_from_jsonb(*dst_column, slot_value); } } diff --git a/be/src/vec/jsonb/serialize.h b/be/src/vec/jsonb/serialize.h index 4aa9eef0f8..725dbc0707 100644 --- a/be/src/vec/jsonb/serialize.h +++ b/be/src/vec/jsonb/serialize.h @@ -18,6 +18,8 @@ #pragma once #include +#include + #include "olap/tablet_schema.h" #include "runtime/descriptors.h" #include "vec/columns/column_string.h" @@ -37,20 +39,14 @@ namespace doris::vectorized { class JsonbSerializeUtil { public: static void block_to_jsonb(const TabletSchema& schema, const Block& block, ColumnString& dst, - int num_cols); + int num_cols, const DataTypeSerDeSPtrs& serdes); // batch rows - static void jsonb_to_block(const TupleDescriptor& desc, const ColumnString& jsonb_column, + static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const ColumnString& jsonb_column, + const std::unordered_map& col_id_to_idx, Block& dst); // single row - static void jsonb_to_block(const TupleDescriptor& desc, const char* data, size_t size, + static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data, size_t size, + const std::unordered_map& col_id_to_idx, Block& dst); - - static void jsonb_to_block(TabletSchemaSPtr schema, const std::vector& col_ids, - const ColumnString& jsonb_column, Block& dst); - - static void jsonb_to_block(TabletSchemaSPtr schema, const std::vector& col_ids, - const char* data, size_t size, Block& dst); - - static PrimitiveType get_primity_type(FieldType type); }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/test/vec/jsonb/serialize_test.cpp b/be/test/vec/jsonb/serialize_test.cpp index 93a408fb30..5a43f58a0f 100644 --- a/be/test/vec/jsonb/serialize_test.cpp +++ b/be/test/vec/jsonb/serialize_test.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -59,6 +60,7 @@ #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_string.h" #include "vec/data_types/data_type_time_v2.h" +#include "vec/data_types/serde/data_type_serde.h" #include "vec/runtime/vdatetime_value.h" namespace doris::vectorized { @@ -126,12 +128,15 @@ TEST(BlockSerializeTest, Array) { MutableColumnPtr col = ColumnString::create(); // serialize JsonbSerializeUtil::block_to_jsonb(schema, block, static_cast(*col.get()), - block.columns()); + block.columns(), + create_data_type_serdes(block.get_data_types())); // deserialize TupleDescriptor read_desc(PTupleDescriptor(), true); // slot1 TSlotDescriptor tslot1; tslot1.__set_colName("k1"); + tslot1.nullIndicatorBit = -1; + tslot1.nullIndicatorByte = 0; TypeDescriptor type_desc(TYPE_ARRAY); type_desc.children.push_back(TypeDescriptor(TYPE_INT)); type_desc.contains_nulls.push_back(true); @@ -143,6 +148,8 @@ TEST(BlockSerializeTest, Array) { // slot2 TSlotDescriptor tslot2; tslot2.__set_colName("k2"); + tslot2.nullIndicatorBit = -1; + tslot2.nullIndicatorByte = 0; TypeDescriptor type_desc2(TYPE_ARRAY); type_desc2.children.push_back(TypeDescriptor(TYPE_STRING)); type_desc2.contains_nulls.push_back(true); @@ -152,7 +159,15 @@ TEST(BlockSerializeTest, Array) { read_desc.add_slot(slot2); Block new_block = block.clone_empty(); - JsonbSerializeUtil::jsonb_to_block(read_desc, static_cast(*col.get()), + std::unordered_map col_uid_to_idx; + for (int i = 0; i < read_desc.slots().size(); ++i) { + col_uid_to_idx[read_desc.slots()[i]->col_unique_id()] = i; + std::cout << "uid " << read_desc.slots()[i]->col_unique_id() << ":" << i << std::endl; + } + std::cout << block.dump_data() << std::endl; + std::cout << new_block.dump_data() << std::endl; + JsonbSerializeUtil::jsonb_to_block(create_data_type_serdes(read_desc.slots()), + static_cast(*col.get()), col_uid_to_idx, new_block); std::cout << block.dump_data() << std::endl; std::cout << new_block.dump_data() << std::endl; @@ -302,7 +317,8 @@ TEST(BlockSerializeTest, JsonbBlock) { MutableColumnPtr col = ColumnString::create(); // serialize JsonbSerializeUtil::block_to_jsonb(schema, block, static_cast(*col.get()), - block.columns()); + block.columns(), + create_data_type_serdes(block.get_data_types())); // deserialize TupleDescriptor read_desc(PTupleDescriptor(), true); for (auto t : cols) { @@ -322,7 +338,12 @@ TEST(BlockSerializeTest, JsonbBlock) { read_desc.add_slot(slot); } Block new_block = block.clone_empty(); - JsonbSerializeUtil::jsonb_to_block(read_desc, static_cast(*col.get()), + std::unordered_map col_uid_to_idx; + for (int i = 0; i < read_desc.slots().size(); ++i) { + col_uid_to_idx[read_desc.slots()[i]->col_unique_id()] = i; + } + JsonbSerializeUtil::jsonb_to_block(create_data_type_serdes(block.get_data_types()), + static_cast(*col.get()), col_uid_to_idx, new_block); std::cout << block.dump_data() << std::endl; std::cout << new_block.dump_data() << std::endl;