[Optimize](row store) optimize serialization and deserialization (#19691)

1. Get each column's DataTypeSerDe in advance to avoid creating a temporary DataTypeSerDe for every column on each iteration
2. Iterating over the original row once is enough for deserialization, by introducing a map that records the index of each column's unique id
This commit is contained in:
lihangyu
2023-05-18 16:22:38 +08:00
committed by GitHub
parent 294599ee45
commit fd4fa5c64e
12 changed files with 145 additions and 67 deletions

View File

@ -53,6 +53,7 @@
#include "vec/common/string_ref.h"
#include "vec/core/block.h" // Block
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/jsonb/serialize.h"
namespace doris {
@ -120,7 +121,8 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
return Status::InternalError(cntl.ErrorText());
}
}
vectorized::DataTypeSerDeSPtrs serdes;
std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
auto merge_function = [&](const PMultiGetResponse& resp) {
Status st(resp.status());
if (!st.ok()) {
@ -136,10 +138,16 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
if (output_block->is_empty_column()) {
*output_block = vectorized::Block(_fetch_option.desc->slots(), 1);
}
if (serdes.empty() && col_uid_to_idx.empty()) {
serdes = vectorized::create_data_type_serdes(_fetch_option.desc->slots());
for (int i = 0; i < _fetch_option.desc->slots().size(); ++i) {
col_uid_to_idx[_fetch_option.desc->slots()[i]->col_unique_id()] = i;
}
}
for (int i = 0; i < resp.binary_row_data_size(); ++i) {
vectorized::JsonbSerializeUtil::jsonb_to_block(
*_fetch_option.desc, resp.binary_row_data(i).data(),
resp.binary_row_data(i).size(), *output_block);
serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(),
col_uid_to_idx, *output_block);
}
return Status::OK();
}

View File

@ -51,6 +51,7 @@
#include "vec/common/assert_cast.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/json/path_in_data.h"
#include "vec/jsonb/serialize.h"
@ -500,8 +501,10 @@ void MemTable::serialize_block_to_row_column(vectorized::Block& block) {
.assume_mutable()
.get());
row_store_column->clear();
vectorized::DataTypeSerDeSPtrs serdes =
vectorized::create_data_type_serdes(block.get_data_types());
vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
_num_columns);
_tablet_schema->num_columns(), serdes);
VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id
<< ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
<< watch.elapsed_time() / 1000;

View File

@ -339,8 +339,10 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
.assume_mutable()
.get());
row_store_column->clear();
vectorized::DataTypeSerDeSPtrs serdes =
vectorized::create_data_type_serdes(block.get_data_types());
vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
_tablet_schema->num_columns());
_tablet_schema->num_columns(), serdes);
VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id
<< ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
<< watch.elapsed_time() / 1000;

View File

@ -116,6 +116,7 @@
#include "vec/common/string_ref.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/jsonb/serialize.h"
namespace doris {
@ -2429,7 +2430,17 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr));
assert(column_ptr->size() == rowids.size());
auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get());
vectorized::JsonbSerializeUtil::jsonb_to_block(tablet_schema, cids, *string_column, block);
vectorized::DataTypeSerDeSPtrs serdes;
serdes.resize(cids.size());
std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
for (int i = 0; i < cids.size(); ++i) {
const TabletColumn& column = tablet_schema->column(cids[i]);
vectorized::DataTypePtr type =
vectorized::DataTypeFactory::instance().create_data_type(column);
col_uid_to_idx[column.unique_id()] = i;
serdes[i] = type->get_serde();
}
vectorized::JsonbSerializeUtil::jsonb_to_block(serdes, *string_column, col_uid_to_idx, block);
return Status::OK();
}

View File

@ -40,7 +40,7 @@
namespace doris {
namespace vectorized {
class Block;
}
} // namespace vectorized
struct OlapTableIndexSchema;
class TColumn;

View File

@ -23,8 +23,10 @@
#include <gen_cpp/internal_service.pb.h>
#include <stdlib.h>
#include <unordered_map>
#include <vector>
#include "gutil/integral_types.h"
#include "olap/lru_cache.h"
#include "olap/olap_tuple.h"
#include "olap/row_cursor.h"
@ -35,6 +37,7 @@
#include "util/key_util.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/jsonb/serialize.h"
@ -65,6 +68,10 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExp
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc));
_create_timestamp = butil::gettimeofday_ms();
_data_type_serdes = vectorized::create_data_type_serdes(tuple_desc()->slots());
for (int i = 0; i < tuple_desc()->slots().size(); ++i) {
_col_uid_to_idx[tuple_desc()->slots()[i]->col_unique_id()] = i;
}
return Status::OK();
}
@ -279,8 +286,10 @@ Status PointQueryExecutor::_lookup_row_data() {
for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
if (_row_read_ctxs[i]._cached_row_data.valid()) {
vectorized::JsonbSerializeUtil::jsonb_to_block(
*_reusable->tuple_desc(), _row_read_ctxs[i]._cached_row_data.data().data,
_row_read_ctxs[i]._cached_row_data.data().size, *_result_block);
_reusable->get_data_type_serdes(),
_row_read_ctxs[i]._cached_row_data.data().data,
_row_read_ctxs[i]._cached_row_data.data().size, _reusable->get_col_uid_to_idx(),
*_result_block);
continue;
}
if (!_row_read_ctxs[i]._row_location.has_value()) {
@ -293,8 +302,9 @@ Status PointQueryExecutor::_lookup_row_data() {
_profile_metrics.read_stats, value,
!config::disable_storage_row_cache /*whether write row cache*/));
// serialize value to block, currently only jsonb row format
vectorized::JsonbSerializeUtil::jsonb_to_block(*_reusable->tuple_desc(), value.data(),
value.size(), *_result_block);
vectorized::JsonbSerializeUtil::jsonb_to_block(
_reusable->get_data_type_serdes(), value.data(), value.size(),
_reusable->get_col_uid_to_idx(), *_result_block);
}
return Status::OK();
}

View File

@ -31,6 +31,7 @@
#include <optional>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
@ -49,6 +50,7 @@
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "vec/core/block.h"
#include "vec/data_types/serde/data_type_serde.h"
namespace doris {
@ -76,6 +78,12 @@ public:
std::unique_ptr<vectorized::Block> get_block();
// Read-only access to the `_data_type_serdes` member; returned by const
// reference, so no copy of the vector is made.
const vectorized::DataTypeSerDeSPtrs& get_data_type_serdes() const {
    return _data_type_serdes;
}
// Read-only access to the `_col_uid_to_idx` member (a uint32_t -> uint32_t map);
// returned by const reference, so no copy of the map is made.
const std::unordered_map<uint32_t, uint32_t>& get_col_uid_to_idx() const { return _col_uid_to_idx; }
// do not touch block after returned
void return_block(std::unique_ptr<vectorized::Block>& block);
@ -92,6 +100,8 @@ private:
std::vector<std::unique_ptr<vectorized::Block>> _block_pool;
std::vector<vectorized::VExprContext*> _output_exprs_ctxs;
int64_t _create_timestamp = 0;
vectorized::DataTypeSerDeSPtrs _data_type_serdes;
std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx;
};
// RowCache is a LRU cache for row store

View File

@ -16,9 +16,30 @@
// under the License.
#include "data_type_serde.h"
#include "runtime/descriptors.h"
#include "vec/data_types/data_type.h"
namespace doris {
namespace vectorized {
DataTypeSerDe::DataTypeSerDe() = default;
DataTypeSerDe::~DataTypeSerDe() = default;
// Collects the serde of every data type in `types`.
// The returned vector has one entry per input type, in the same order, each
// obtained by calling get_serde() on the corresponding type.
DataTypeSerDeSPtrs create_data_type_serdes(const DataTypes& types) {
    DataTypeSerDeSPtrs result;
    // Single allocation up front: the final size is known.
    result.reserve(types.size());
    for (auto it = types.begin(); it != types.end(); ++it) {
        result.push_back((*it)->get_serde());
    }
    return result;
}
// Collects the serde for every slot descriptor in `slots`.
// Entry i of the result is the serde of the data type reported by slots[i]
// (via get_data_type_ptr()), so the result is index-aligned with `slots`.
DataTypeSerDeSPtrs create_data_type_serdes(const std::vector<SlotDescriptor*>& slots) {
    DataTypeSerDeSPtrs result;
    // Single allocation up front: the final size is known.
    result.reserve(slots.size());
    for (auto it = slots.begin(); it != slots.end(); ++it) {
        const SlotDescriptor* current = *it;
        result.push_back(current->get_data_type_ptr()->get_serde());
    }
    return result;
}
} // namespace vectorized
} // namespace doris

View File

@ -39,10 +39,12 @@ class time_zone;
namespace doris {
class PValues;
class JsonbValue;
class SlotDescriptor;
namespace vectorized {
class IColumn;
class Arena;
class IDataType;
// Deserialize means read from different file format or memory format,
// for example read from arrow, read from parquet.
// Serialize means write the column cell or the total column into another
@ -99,5 +101,9 @@ inline void checkArrowStatus(const arrow::Status& status, const std::string& col
using DataTypeSerDeSPtr = std::shared_ptr<DataTypeSerDe>;
using DataTypeSerDeSPtrs = std::vector<DataTypeSerDeSPtr>;
DataTypeSerDeSPtrs create_data_type_serdes(
const std::vector<std::shared_ptr<const IDataType>>& types);
DataTypeSerDeSPtrs create_data_type_serdes(const std::vector<SlotDescriptor*>& slots);
} // namespace vectorized
} // namespace doris

View File

@ -37,13 +37,15 @@
#include "vec/common/arena.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/serde/data_type_serde.h"
namespace doris::vectorized {
void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block& block,
ColumnString& dst, int num_cols) {
ColumnString& dst, int num_cols,
const DataTypeSerDeSPtrs& serdes) {
auto num_rows = block.rows();
Arena pool;
assert(num_cols <= block.columns());
@ -57,8 +59,8 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block&
// ignore dst row store column
continue;
}
block.get_data_type(j)->get_serde()->write_one_cell_to_jsonb(
*column, jsonb_writer, &pool, tablet_column.unique_id(), i);
serdes[j]->write_one_cell_to_jsonb(*column, jsonb_writer, &pool,
tablet_column.unique_id(), i);
}
jsonb_writer.writeEndObject();
dst.insert_data(jsonb_writer.getOutput()->getBuffer(), jsonb_writer.getOutput()->getSize());
@ -66,56 +68,44 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block&
}
// batch rows
void JsonbSerializeUtil::jsonb_to_block(const TupleDescriptor& desc,
const ColumnString& jsonb_column, Block& dst) {
void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes,
const ColumnString& jsonb_column,
const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx,
Block& dst) {
for (int i = 0; i < jsonb_column.size(); ++i) {
StringRef jsonb_data = jsonb_column.get_data_at(i);
jsonb_to_block(desc, jsonb_data.data, jsonb_data.size, dst);
jsonb_to_block(serdes, jsonb_data.data, jsonb_data.size, col_id_to_idx, dst);
}
}
// single row
void JsonbSerializeUtil::jsonb_to_block(const TupleDescriptor& desc, const char* data, size_t size,
void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data,
size_t size,
const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx,
Block& dst) {
auto pdoc = JsonbDocument::createDocument(data, size);
JsonbDocument& doc = *pdoc;
for (int j = 0; j < desc.slots().size(); ++j) {
SlotDescriptor* slot = desc.slots()[j];
JsonbValue* slot_value = doc->find(slot->col_unique_id());
MutableColumnPtr dst_column = dst.get_by_position(j).column->assume_mutable();
if (!slot_value || slot_value->isNull()) {
// null or not exist
dst_column->insert_default();
continue;
size_t num_rows = dst.rows();
size_t filled_columns = 0;
for (auto it = doc->begin(); it != doc->end(); ++it) {
auto col_it = col_id_to_idx.find(it->getKeyId());
if (col_it != col_id_to_idx.end()) {
MutableColumnPtr dst_column =
dst.get_by_position(col_it->second).column->assume_mutable();
serdes[col_it->second]->read_one_cell_from_jsonb(*dst_column, it->value());
++filled_columns;
}
dst.get_data_type(j)->get_serde()->read_one_cell_from_jsonb(*dst_column, slot_value);
}
}
void JsonbSerializeUtil::jsonb_to_block(TabletSchemaSPtr schema,
const std::vector<uint32_t>& col_ids,
const ColumnString& jsonb_column, Block& dst) {
for (int i = 0; i < jsonb_column.size(); ++i) {
StringRef jsonb_data = jsonb_column.get_data_at(i);
jsonb_to_block(schema, col_ids, jsonb_data.data, jsonb_data.size, dst);
}
}
void JsonbSerializeUtil::jsonb_to_block(TabletSchemaSPtr schema,
const std::vector<uint32_t>& col_ids, const char* data,
size_t size, Block& dst) {
auto pdoc = JsonbDocument::createDocument(data, size);
JsonbDocument& doc = *pdoc;
for (int j = 0; j < col_ids.size(); ++j) {
auto column = schema->column(col_ids[j]);
JsonbValue* slot_value = doc->find(column.unique_id());
MutableColumnPtr dst_column = dst.get_by_position(j).column->assume_mutable();
if (!slot_value || slot_value->isNull()) {
// null or not exist
dst_column->insert_default();
continue;
if (filled_columns < dst.columns()) {
// fill missing slot
for (auto& column_type_name : dst) {
MutableColumnPtr col = column_type_name.column->assume_mutable();
if (col->size() < num_rows + 1) {
DCHECK(col->size() == num_rows);
col->insert_default();
}
DCHECK(col->size() == num_rows + 1);
}
dst.get_data_type(j)->get_serde()->read_one_cell_from_jsonb(*dst_column, slot_value);
}
}

View File

@ -18,6 +18,8 @@
#pragma once
#include <stddef.h>
#include <unordered_map>
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "vec/columns/column_string.h"
@ -37,20 +39,14 @@ namespace doris::vectorized {
class JsonbSerializeUtil {
public:
static void block_to_jsonb(const TabletSchema& schema, const Block& block, ColumnString& dst,
int num_cols);
int num_cols, const DataTypeSerDeSPtrs& serdes);
// batch rows
static void jsonb_to_block(const TupleDescriptor& desc, const ColumnString& jsonb_column,
static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const ColumnString& jsonb_column,
const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx,
Block& dst);
// single row
static void jsonb_to_block(const TupleDescriptor& desc, const char* data, size_t size,
static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data, size_t size,
const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx,
Block& dst);
static void jsonb_to_block(TabletSchemaSPtr schema, const std::vector<uint32_t>& col_ids,
const ColumnString& jsonb_column, Block& dst);
static void jsonb_to_block(TabletSchemaSPtr schema, const std::vector<uint32_t>& col_ids,
const char* data, size_t size, Block& dst);
static PrimitiveType get_primity_type(FieldType type);
};
} // namespace doris::vectorized

View File

@ -26,6 +26,7 @@
#include <memory>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
@ -59,6 +60,7 @@
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_time_v2.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::vectorized {
@ -126,12 +128,15 @@ TEST(BlockSerializeTest, Array) {
MutableColumnPtr col = ColumnString::create();
// serialize
JsonbSerializeUtil::block_to_jsonb(schema, block, static_cast<ColumnString&>(*col.get()),
block.columns());
block.columns(),
create_data_type_serdes(block.get_data_types()));
// deserialize
TupleDescriptor read_desc(PTupleDescriptor(), true);
// slot1
TSlotDescriptor tslot1;
tslot1.__set_colName("k1");
tslot1.nullIndicatorBit = -1;
tslot1.nullIndicatorByte = 0;
TypeDescriptor type_desc(TYPE_ARRAY);
type_desc.children.push_back(TypeDescriptor(TYPE_INT));
type_desc.contains_nulls.push_back(true);
@ -143,6 +148,8 @@ TEST(BlockSerializeTest, Array) {
// slot2
TSlotDescriptor tslot2;
tslot2.__set_colName("k2");
tslot2.nullIndicatorBit = -1;
tslot2.nullIndicatorByte = 0;
TypeDescriptor type_desc2(TYPE_ARRAY);
type_desc2.children.push_back(TypeDescriptor(TYPE_STRING));
type_desc2.contains_nulls.push_back(true);
@ -152,7 +159,15 @@ TEST(BlockSerializeTest, Array) {
read_desc.add_slot(slot2);
Block new_block = block.clone_empty();
JsonbSerializeUtil::jsonb_to_block(read_desc, static_cast<ColumnString&>(*col.get()),
std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
for (int i = 0; i < read_desc.slots().size(); ++i) {
col_uid_to_idx[read_desc.slots()[i]->col_unique_id()] = i;
std::cout << "uid " << read_desc.slots()[i]->col_unique_id() << ":" << i << std::endl;
}
std::cout << block.dump_data() << std::endl;
std::cout << new_block.dump_data() << std::endl;
JsonbSerializeUtil::jsonb_to_block(create_data_type_serdes(read_desc.slots()),
static_cast<ColumnString&>(*col.get()), col_uid_to_idx,
new_block);
std::cout << block.dump_data() << std::endl;
std::cout << new_block.dump_data() << std::endl;
@ -302,7 +317,8 @@ TEST(BlockSerializeTest, JsonbBlock) {
MutableColumnPtr col = ColumnString::create();
// serialize
JsonbSerializeUtil::block_to_jsonb(schema, block, static_cast<ColumnString&>(*col.get()),
block.columns());
block.columns(),
create_data_type_serdes(block.get_data_types()));
// deserialize
TupleDescriptor read_desc(PTupleDescriptor(), true);
for (auto t : cols) {
@ -322,7 +338,12 @@ TEST(BlockSerializeTest, JsonbBlock) {
read_desc.add_slot(slot);
}
Block new_block = block.clone_empty();
JsonbSerializeUtil::jsonb_to_block(read_desc, static_cast<const ColumnString&>(*col.get()),
std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
for (int i = 0; i < read_desc.slots().size(); ++i) {
col_uid_to_idx[read_desc.slots()[i]->col_unique_id()] = i;
}
JsonbSerializeUtil::jsonb_to_block(create_data_type_serdes(block.get_data_types()),
static_cast<const ColumnString&>(*col.get()), col_uid_to_idx,
new_block);
std::cout << block.dump_data() << std::endl;
std::cout << new_block.dump_data() << std::endl;