[Refactor](map) remove using column array in map to reduce offset column (#17330)

1. remove the intermediate array columns in map
2. add a dedicated offsets column in map
Aims to eliminate the duplicate offsets previously stored in both the key-array and the value-array on disk
This commit is contained in:
amory
2023-03-09 11:22:26 +08:00
committed by GitHub
parent 368e6a4f9c
commit 06dee69174
19 changed files with 659 additions and 326 deletions

View File

@ -102,27 +102,34 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
// map reader now has 3 sub readers for key(arr), value(arr), null(scala)
// map reader now has 3 sub readers for key, value, offsets(scalar), null(scala)
std::unique_ptr<ColumnReader> map_reader(
new ColumnReader(opts, meta, num_rows, file_reader));
std::unique_ptr<ColumnReader> key_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0), num_rows,
file_reader, &key_reader));
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0),
meta.children_columns(0).num_rows(), file_reader,
&key_reader));
std::unique_ptr<ColumnReader> val_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1), num_rows,
file_reader, &val_reader));
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1),
meta.children_columns(1).num_rows(), file_reader,
&val_reader));
std::unique_ptr<ColumnReader> offset_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2),
meta.children_columns(2).num_rows(), file_reader,
&offset_reader));
std::unique_ptr<ColumnReader> null_reader;
if (meta.is_nullable()) {
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2),
meta.children_columns(2).num_rows(),
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(3),
meta.children_columns(3).num_rows(),
file_reader, &null_reader));
}
map_reader->_sub_readers.resize(meta.children_columns_size());
map_reader->_sub_readers[0] = std::move(key_reader);
map_reader->_sub_readers[1] = std::move(val_reader);
map_reader->_sub_readers[2] = std::move(offset_reader);
if (meta.is_nullable()) {
map_reader->_sub_readers[2] = std::move(null_reader);
map_reader->_sub_readers[3] = std::move(null_reader);
}
*reader = std::move(map_reader);
return Status::OK();
@ -516,14 +523,14 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
ColumnIterator* offset_iterator = nullptr;
RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&offset_iterator));
OffsetFileColumnIterator* ofcIter = new OffsetFileColumnIterator(
reinterpret_cast<FileColumnIterator*>(offset_iterator));
ColumnIterator* null_iterator = nullptr;
if (is_nullable()) {
RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator));
}
*iterator = new ArrayFileColumnIterator(
this, reinterpret_cast<FileColumnIterator*>(offset_iterator), item_iterator,
null_iterator);
*iterator = new ArrayFileColumnIterator(this, ofcIter, item_iterator, null_iterator);
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
@ -531,11 +538,17 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&key_iterator));
ColumnIterator* val_iterator = nullptr;
RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&val_iterator));
ColumnIterator* offsets_iterator = nullptr;
RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&offsets_iterator));
OffsetFileColumnIterator* ofcIter = new OffsetFileColumnIterator(
reinterpret_cast<FileColumnIterator*>(offsets_iterator));
ColumnIterator* null_iterator = nullptr;
if (is_nullable()) {
RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator));
RETURN_IF_ERROR(_sub_readers[3]->new_iterator(&null_iterator));
}
*iterator = new MapFileColumnIterator(this, null_iterator, key_iterator, val_iterator);
*iterator = new MapFileColumnIterator(this, null_iterator, ofcIter, key_iterator,
val_iterator);
return Status::OK();
}
default:
@ -547,11 +560,14 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
///====================== MapFileColumnIterator ============================////
MapFileColumnIterator::MapFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
OffsetFileColumnIterator* offsets_iterator,
ColumnIterator* key_iterator,
ColumnIterator* val_iterator)
: _map_reader(reader) {
_key_iterator.reset(key_iterator);
_val_iterator.reset(val_iterator);
_offsets_iterator.reset(offsets_iterator);
if (_map_reader->is_nullable()) {
_null_iterator.reset(null_iterator);
}
@ -560,6 +576,7 @@ MapFileColumnIterator::MapFileColumnIterator(ColumnReader* reader, ColumnIterato
Status MapFileColumnIterator::init(const ColumnIteratorOptions& opts) {
RETURN_IF_ERROR(_key_iterator->init(opts));
RETURN_IF_ERROR(_val_iterator->init(opts));
RETURN_IF_ERROR(_offsets_iterator->init(opts));
if (_map_reader->is_nullable()) {
RETURN_IF_ERROR(_null_iterator->init(opts));
}
@ -567,11 +584,15 @@ Status MapFileColumnIterator::init(const ColumnIteratorOptions& opts) {
}
Status MapFileColumnIterator::seek_to_ordinal(ordinal_t ord) {
RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(ord));
RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(ord));
if (_map_reader->is_nullable()) {
RETURN_IF_ERROR(_null_iterator->seek_to_ordinal(ord));
}
RETURN_IF_ERROR(_offsets_iterator->seek_to_ordinal(ord));
// here to use offset info
ordinal_t offset = 0;
RETURN_IF_ERROR(_offsets_iterator->_peek_one_offset(&offset));
RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(offset));
RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(offset));
return Status::OK();
}
@ -580,13 +601,32 @@ Status MapFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr
const auto* column_map = vectorized::check_and_get_column<vectorized::ColumnMap>(
dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column()
: *dst);
size_t num_read = *n;
auto column_key_ptr = column_map->get_keys().assume_mutable();
auto column_val_ptr = column_map->get_values().assume_mutable();
RETURN_IF_ERROR(_key_iterator->next_batch(&num_read, column_key_ptr, has_null));
RETURN_IF_ERROR(_val_iterator->next_batch(&num_read, column_val_ptr, has_null));
auto column_offsets_ptr = column_map->get_offsets_column().assume_mutable();
bool offsets_has_null = false;
ssize_t start = column_offsets_ptr->size();
RETURN_IF_ERROR(_offsets_iterator->next_batch(n, column_offsets_ptr, &offsets_has_null));
if (*n == 0) {
return Status::OK();
}
auto& column_offsets =
static_cast<vectorized::ColumnArray::ColumnOffsets&>(*column_offsets_ptr);
RETURN_IF_ERROR(_offsets_iterator->_calculate_offsets(start, column_offsets));
size_t num_items =
column_offsets.get_data().back() - column_offsets.get_data()[start - 1]; // -1 is valid
auto key_ptr = column_map->get_keys().assume_mutable();
auto val_ptr = column_map->get_values().assume_mutable();
if (num_items > 0) {
size_t num_read = num_items;
bool key_has_null = false;
bool val_has_null = false;
RETURN_IF_ERROR(_key_iterator->next_batch(&num_read, key_ptr, &key_has_null));
RETURN_IF_ERROR(_val_iterator->next_batch(&num_read, val_ptr, &val_has_null));
DCHECK(num_read == num_items);
}
if (dst->is_nullable()) {
size_t num_read = *n;
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
bool null_signs_has_null = false;
@ -680,9 +720,51 @@ Status StructFileColumnIterator::read_by_rowids(const rowid_t* rowids, const siz
}
////////////////////////////////////////////////////////////////////////////////
Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) {
RETURN_IF_ERROR(_offset_iterator->init(opts));
return Status::OK();
}
Status OffsetFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
RETURN_IF_ERROR(_offset_iterator->next_batch(n, dst, has_null));
return Status::OK();
}
Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
if (_offset_iterator->get_current_page()->has_remaining()) {
PageDecoder* offset_page_decoder = _offset_iterator->get_current_page()->data_decoder;
vectorized::MutableColumnPtr offset_col = vectorized::ColumnUInt64::create();
size_t n = 1;
RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, offset_col)); // not null
DCHECK(offset_col->size() == 1);
*offset = offset_col->get_uint(0);
} else {
*offset = _offset_iterator->get_current_page()->next_array_item_ordinal;
}
return Status::OK();
}
Status OffsetFileColumnIterator::_calculate_offsets(
ssize_t start, vectorized::ColumnArray::ColumnOffsets& column_offsets) {
ordinal_t last_offset = 0;
RETURN_IF_ERROR(_peek_one_offset(&last_offset));
// calculate real offsets
auto& offsets_data = column_offsets.get_data();
ordinal_t first_offset = offsets_data[start - 1]; // -1 is valid
ordinal_t first_ord = offsets_data[start];
for (ssize_t i = start; i < offsets_data.size() - 1; ++i) {
offsets_data[i] = first_offset + (offsets_data[i + 1] - first_ord);
}
// last offset
offsets_data[offsets_data.size() - 1] = first_offset + (last_offset - first_ord);
return Status::OK();
}
////////////////////////////////////////////////////////////////////////////////
ArrayFileColumnIterator::ArrayFileColumnIterator(ColumnReader* reader,
FileColumnIterator* offset_reader,
OffsetFileColumnIterator* offset_reader,
ColumnIterator* item_iterator,
ColumnIterator* null_iterator)
: _array_reader(reader) {
@ -702,24 +784,10 @@ Status ArrayFileColumnIterator::init(const ColumnIteratorOptions& opts) {
return Status::OK();
}
Status ArrayFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
if (_offset_iterator->get_current_page()->has_remaining()) {
PageDecoder* offset_page_decoder = _offset_iterator->get_current_page()->data_decoder;
vectorized::MutableColumnPtr offset_col = vectorized::ColumnUInt64::create();
size_t n = 1;
RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, offset_col)); // not null
DCHECK(offset_col->size() == 1);
*offset = offset_col->get_uint(0);
} else {
*offset = _offset_iterator->get_current_page()->next_array_item_ordinal;
}
return Status::OK();
}
Status ArrayFileColumnIterator::_seek_by_offsets(ordinal_t ord) {
// using offsets info
ordinal_t offset = 0;
RETURN_IF_ERROR(_peek_one_offset(&offset));
RETURN_IF_ERROR(_offset_iterator->_peek_one_offset(&offset));
RETURN_IF_ERROR(_item_iterator->seek_to_ordinal(offset));
return Status::OK();
}
@ -732,23 +800,6 @@ Status ArrayFileColumnIterator::seek_to_ordinal(ordinal_t ord) {
return _seek_by_offsets(ord);
}
Status ArrayFileColumnIterator::_calculate_offsets(
ssize_t start, vectorized::ColumnArray::ColumnOffsets& column_offsets) {
ordinal_t last_offset = 0;
RETURN_IF_ERROR(_peek_one_offset(&last_offset));
// calculate real offsets
auto& offsets_data = column_offsets.get_data();
ordinal_t first_offset = offsets_data[start - 1]; // -1 is valid
ordinal_t first_ord = offsets_data[start];
for (ssize_t i = start; i < offsets_data.size() - 1; ++i) {
offsets_data[i] = first_offset + (offsets_data[i + 1] - first_ord);
}
// last offset
offsets_data[offsets_data.size() - 1] = first_offset + (last_offset - first_ord);
return Status::OK();
}
Status ArrayFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
const auto* column_array = vectorized::check_and_get_column<vectorized::ColumnArray>(
@ -764,7 +815,7 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnP
}
auto& column_offsets =
static_cast<vectorized::ColumnArray::ColumnOffsets&>(*column_offsets_ptr);
RETURN_IF_ERROR(_calculate_offsets(start, column_offsets));
RETURN_IF_ERROR(_offset_iterator->_calculate_offsets(start, column_offsets));
size_t num_items =
column_offsets.get_data().back() - column_offsets.get_data()[start - 1]; // -1 is valid
auto column_items_ptr = column_array->get_data().assume_mutable();

View File

@ -380,10 +380,45 @@ public:
ordinal_t get_current_ordinal() const override { return 0; }
};
// This iterator make offset operation write once for
class OffsetFileColumnIterator final : public ColumnIterator {
public:
explicit OffsetFileColumnIterator(FileColumnIterator* offset_reader) {
_offset_iterator.reset(offset_reader);
}
~OffsetFileColumnIterator() override = default;
Status init(const ColumnIteratorOptions& opts) override;
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
ordinal_t get_current_ordinal() const override {
return _offset_iterator->get_current_ordinal();
}
Status seek_to_ordinal(ordinal_t ord) override {
RETURN_IF_ERROR(_offset_iterator->seek_to_ordinal(ord));
return Status::OK();
}
Status seek_to_first() override {
RETURN_IF_ERROR(_offset_iterator->seek_to_first());
return Status::OK();
}
Status _peek_one_offset(ordinal_t* offset);
Status _calculate_offsets(ssize_t start,
vectorized::ColumnArray::ColumnOffsets& column_offsets);
private:
std::unique_ptr<FileColumnIterator> _offset_iterator;
};
// This iterator is used to read map value column
class MapFileColumnIterator final : public ColumnIterator {
public:
explicit MapFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
OffsetFileColumnIterator* offsets_iterator,
ColumnIterator* key_iterator, ColumnIterator* val_iterator);
~MapFileColumnIterator() override = default;
@ -394,23 +429,28 @@ public:
Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;
Status seek_to_first() override {
RETURN_IF_ERROR(_key_iterator->seek_to_first());
RETURN_IF_ERROR(_val_iterator->seek_to_first());
RETURN_IF_ERROR(_null_iterator->seek_to_first());
RETURN_IF_ERROR(_offsets_iterator->seek_to_first());
if (_map_reader->is_nullable()) {
RETURN_IF_ERROR(_null_iterator->seek_to_first());
}
return Status::OK();
}
Status seek_to_ordinal(ordinal_t ord) override;
ordinal_t get_current_ordinal() const override { return _key_iterator->get_current_ordinal(); }
ordinal_t get_current_ordinal() const override {
return _offsets_iterator->get_current_ordinal();
}
private:
ColumnReader* _map_reader;
std::unique_ptr<ColumnIterator> _null_iterator;
std::unique_ptr<ColumnIterator> _key_iterator; // ArrayFileColumnIterator
std::unique_ptr<ColumnIterator> _val_iterator; // ArrayFileColumnIterator
std::unique_ptr<OffsetFileColumnIterator> _offsets_iterator; //OffsetFileIterator
std::unique_ptr<ColumnIterator> _key_iterator;
std::unique_ptr<ColumnIterator> _val_iterator;
};
class StructFileColumnIterator final : public ColumnIterator {
@ -451,7 +491,7 @@ private:
class ArrayFileColumnIterator final : public ColumnIterator {
public:
explicit ArrayFileColumnIterator(ColumnReader* reader, FileColumnIterator* offset_reader,
explicit ArrayFileColumnIterator(ColumnReader* reader, OffsetFileColumnIterator* offset_reader,
ColumnIterator* item_iterator, ColumnIterator* null_iterator);
~ArrayFileColumnIterator() override = default;
@ -480,14 +520,11 @@ public:
private:
ColumnReader* _array_reader;
std::unique_ptr<FileColumnIterator> _offset_iterator;
std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
std::unique_ptr<ColumnIterator> _null_iterator;
std::unique_ptr<ColumnIterator> _item_iterator;
Status _peek_one_offset(ordinal_t* offset);
Status _seek_by_offsets(ordinal_t ord);
Status _calculate_offsets(ssize_t start,
vectorized::ColumnArray::ColumnOffsets& column_offsets);
};
class RowIdColumnIterator : public ColumnIterator {

View File

@ -77,6 +77,7 @@ private:
RleEncoder<bool> _rle_encoder;
};
//Todo(Amory): here should according nullable and offset and need sub to simply this function
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
std::unique_ptr<Field> field(FieldFactory::create(*column));
@ -247,14 +248,67 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
DCHECK(column->get_subtype_count() == 2);
// create key & value writer
std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
for (int i = 0; i < 2; ++i) {
const TabletColumn& item_column = column->get_sub_column(i);
// create item writer
ColumnWriterOptions item_options;
item_options.meta = opts.meta->mutable_children_columns(i);
item_options.need_zone_map = false;
item_options.need_bloom_filter = item_column.is_bf_column();
item_options.need_bitmap_index = item_column.has_bitmap_index();
item_options.inverted_index = nullptr;
if (item_column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
if (item_options.need_bloom_filter) {
return Status::NotSupported("Do not support bloom filter for map type");
}
if (item_options.need_bitmap_index) {
return Status::NotSupported("Do not support bitmap index for map type");
}
}
std::unique_ptr<ColumnWriter> item_writer;
RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer,
&item_writer));
inner_writer_list.push_back(std::move(item_writer));
}
ScalarColumnWriter* null_writer = nullptr;
// create offset writer
FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
// Be Cautious: column unique id is used for column reader creation
ColumnWriterOptions length_options;
length_options.meta = opts.meta->add_children_columns();
length_options.meta->set_column_id(column->get_subtype_count() + 1);
length_options.meta->set_unique_id(column->get_subtype_count() + 1);
length_options.meta->set_type(length_type);
length_options.meta->set_is_nullable(false);
length_options.meta->set_length(
get_scalar_type_info<OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size());
length_options.meta->set_encoding(DEFAULT_ENCODING);
length_options.meta->set_compression(opts.meta->compression());
length_options.need_zone_map = false;
length_options.need_bloom_filter = false;
length_options.need_bitmap_index = false;
TabletColumn length_column = TabletColumn(
OLAP_FIELD_AGGREGATION_NONE, length_type, length_options.meta->is_nullable(),
length_options.meta->unique_id(), length_options.meta->length());
length_column.set_name("length");
length_column.set_index_length(-1); // no short key index
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
auto* length_writer =
new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer);
// create null writer
if (opts.meta->is_nullable()) {
FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
ColumnWriterOptions null_options;
null_options.meta = opts.meta->add_children_columns();
null_options.meta->set_column_id(3);
null_options.meta->set_unique_id(3);
null_options.meta->set_column_id(column->get_subtype_count() + 2);
null_options.meta->set_unique_id(column->get_subtype_count() + 2);
null_options.meta->set_type(null_type);
null_options.meta->set_is_nullable(false);
null_options.meta->set_length(
@ -276,45 +330,11 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
new ScalarColumnWriter(null_options, std::move(null_field), file_writer);
}
// create key & value writer
std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
for (int i = 0; i < 2; ++i) {
std::unique_ptr<ColumnWriter> inner_array_writer;
ColumnWriterOptions arr_opts;
TabletColumn array_column(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY);
array_column.set_index_length(-1);
arr_opts.meta = opts.meta->mutable_children_columns(i);
ColumnMetaPB* child_meta = arr_opts.meta->add_children_columns();
// inner column meta from actual opts meta
const TabletColumn& inner_column =
column->get_sub_column(i); // field_type is true key and value
array_column.add_sub_column(const_cast<TabletColumn&>(inner_column));
std::string col_name = i == 0 ? "map.keys" : "map.vals";
array_column.set_name(col_name);
child_meta->set_type(inner_column.type());
child_meta->set_length(inner_column.length());
child_meta->set_column_id(arr_opts.meta->column_id() + 1);
child_meta->set_unique_id(arr_opts.meta->unique_id() + 1);
child_meta->set_compression(arr_opts.meta->compression());
child_meta->set_encoding(arr_opts.meta->encoding());
child_meta->set_is_nullable(true);
// set array column meta
arr_opts.meta->set_type(OLAP_FIELD_TYPE_ARRAY);
arr_opts.meta->set_encoding(opts.meta->encoding());
arr_opts.meta->set_compression(opts.meta->compression());
arr_opts.need_zone_map = false;
// no need inner array's null map
arr_opts.meta->set_is_nullable(false);
RETURN_IF_ERROR(ColumnWriter::create(arr_opts, &array_column, file_writer,
&inner_array_writer));
inner_writer_list.push_back(std::move(inner_array_writer));
}
// create map writer
std::unique_ptr<ColumnWriter> sub_column_writer;
std::unique_ptr<ColumnWriter> writer_local = std::unique_ptr<ColumnWriter>(
new MapColumnWriter(opts, std::move(field), null_writer, inner_writer_list));
std::unique_ptr<ColumnWriter> writer_local =
std::unique_ptr<ColumnWriter>(new MapColumnWriter(
opts, std::move(field), null_writer, length_writer, inner_writer_list));
*writer = std::move(writer_local);
return Status::OK();
@ -952,10 +972,11 @@ Status ArrayColumnWriter::finish_current_page() {
/// ============================= MapColumnWriter =====================////
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer,
ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer,
std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
: ColumnWriter(std::move(field), opts.meta->is_nullable()), _opts(opts) {
CHECK_EQ(kv_writers.size(), 2);
_offsets_writer.reset(offset_writer);
if (is_nullable()) {
_null_writer.reset(null_writer);
}
@ -965,9 +986,13 @@ MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_pt
}
Status MapColumnWriter::init() {
RETURN_IF_ERROR(_offsets_writer->init());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->init());
}
// here register_flush_page_callback to call this.put_extra_info_in_page()
// when finish cur data page
_offsets_writer->register_flush_page_callback(this);
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->init());
}
@ -979,6 +1004,7 @@ uint64_t MapColumnWriter::estimate_buffer_size() {
for (auto& sub_writer : _kv_writers) {
estimate += sub_writer->estimate_buffer_size();
}
estimate += _offsets_writer->estimate_buffer_size();
if (is_nullable()) {
estimate += _null_writer->estimate_buffer_size();
}
@ -986,6 +1012,7 @@ uint64_t MapColumnWriter::estimate_buffer_size() {
}
Status MapColumnWriter::finish() {
RETURN_IF_ERROR(_offsets_writer->finish());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->finish());
}
@ -997,24 +1024,39 @@ Status MapColumnWriter::finish() {
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
RETURN_IF_ERROR(append_data(ptr, num_rows));
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
}
RETURN_IF_ERROR(append_data(ptr, num_rows));
return Status::OK();
}
// write key value data with offsets
Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
auto kv_ptr = reinterpret_cast<const uint64_t*>(*ptr);
// data_ptr contains
// [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_pr]
// which converted results from olap_map_convertor and later will use a structure to replace it
auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
// total number length
size_t element_cnt = size_t((unsigned long)(*data_ptr));
auto offset_data = *(data_ptr + 1);
const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
if (element_cnt == 0) {
return Status::OK();
}
for (size_t i = 0; i < 2; ++i) {
auto data = *(kv_ptr + i);
const uint8_t* val_ptr = (const uint8_t*)data;
RETURN_IF_ERROR(_kv_writers[i]->append_data(&val_ptr, num_rows));
auto data = *(data_ptr + 2 + i);
auto nested_null_map = *(data_ptr + 2 + 2 + i);
RETURN_IF_ERROR(_kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map),
reinterpret_cast<const void*>(data), element_cnt));
}
return Status::OK();
}
Status MapColumnWriter::write_data() {
RETURN_IF_ERROR(_offsets_writer->write_data());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_data());
}
@ -1025,11 +1067,14 @@ Status MapColumnWriter::write_data() {
}
Status MapColumnWriter::write_ordinal_index() {
RETURN_IF_ERROR(_offsets_writer->write_ordinal_index());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_ordinal_index());
}
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->write_ordinal_index());
if (sub_writer->get_next_rowid() != 0) {
RETURN_IF_ERROR(sub_writer->write_ordinal_index());
}
}
return Status::OK();
}
@ -1038,13 +1083,13 @@ Status MapColumnWriter::append_nulls(size_t num_rows) {
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
}
return write_null_column(num_rows, true);
}
const ordinal_t offset = _kv_writers[0]->get_next_rowid();
std::vector<vectorized::UInt8> offsets_data(num_rows, offset);
const uint8_t* offsets_ptr = offsets_data.data();
RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
Status MapColumnWriter::write_null_column(size_t num_rows, bool is_null) {
if (is_nullable()) {
uint8_t null_sign = is_null ? 1 : 0;
std::vector<vectorized::UInt8> null_signs(num_rows, null_sign);
std::vector<vectorized::UInt8> null_signs(num_rows, 1);
const uint8_t* null_sign_ptr = null_signs.data();
RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
}
@ -1052,12 +1097,12 @@ Status MapColumnWriter::write_null_column(size_t num_rows, bool is_null) {
}
Status MapColumnWriter::finish_current_page() {
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->finish_current_page());
}
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->finish_current_page());
}
return Status::NotSupported("map writer has no data, can not finish_current_page");
}
// write this value for column reader to read according offsets
Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
footer->set_next_array_item_ordinal(_kv_writers[0]->get_next_rowid());
return Status::OK();
}

View File

@ -374,7 +374,7 @@ private:
class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
public:
explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer,
ScalarColumnWriter* null_writer, ScalarColumnWriter* offsets_writer,
std::vector<std::unique_ptr<ColumnWriter>>& _kv_writers);
~MapColumnWriter() override = default;
@ -414,14 +414,15 @@ public:
}
// according key writer to get next rowid
ordinal_t get_next_rowid() const override { return _kv_writers[0]->get_next_rowid(); }
ordinal_t get_next_rowid() const override { return _offsets_writer->get_next_rowid(); }
private:
Status write_null_column(size_t num_rows, bool is_null);
Status put_extra_info_in_page(DataPageFooterPB* header) override;
std::vector<std::unique_ptr<ColumnWriter>> _kv_writers;
// we need null writer to make sure a row is null or not
std::unique_ptr<ScalarColumnWriter> _null_writer;
std::unique_ptr<ScalarColumnWriter> _offsets_writer;
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
ColumnWriterOptions _opts;
};

View File

@ -195,10 +195,9 @@ vectorized::IColumn::MutablePtr Schema::get_predicate_column_ptr(const Field& fi
}
case OLAP_FIELD_TYPE_MAP:
ptr = doris::vectorized::ColumnMap::create(
doris::vectorized::ColumnArray::create(
get_predicate_column_ptr(*field.get_sub_field(0), true)),
doris::vectorized::ColumnArray::create(
get_predicate_column_ptr(*field.get_sub_field(1), true)));
get_predicate_column_ptr(*field.get_sub_field(0)),
get_predicate_column_ptr(*field.get_sub_field(1)),
doris::vectorized::ColumnArray::ColumnOffsets::create());
break;
default:
LOG(FATAL) << "Unexpected type when choosing predicate column, type=" << field.type();

View File

@ -38,14 +38,6 @@ extern const int LOGICAL_ERROR;
extern const int TOO_LARGE_ARRAY_SIZE;
} // namespace ErrorCodes
/** Obtaining array as Field can be slow for large arrays and consume vast amount of memory.
* Just don't allow to do it.
* You can increase the limit if the following query:
* SELECT range(10000000)
* will take less than 500ms on your machine.
*/
static constexpr size_t max_array_size_as_field = 1000000;
template <typename T>
ColumnPtr ColumnArray::index_impl(const PaddedPODArray<T>& indexes, size_t limit) const {
assert(limit <= indexes.size());

View File

@ -29,6 +29,13 @@
namespace doris::vectorized {
/** Obtaining array as Field can be slow for large arrays and consume vast amount of memory.
* Just don't allow to do it.
* You can increase the limit if the following query:
* SELECT range(10000000)
* will take less than 500ms on your machine.
*/
static constexpr size_t max_array_size_as_field = 1000000;
/** A column of array values.
* In memory, it is represented as one column of a nested type, whose size is equal to the sum of the sizes of all arrays,
* and as an array of offsets in it, which allows you to get each element.

View File

@ -25,51 +25,91 @@ namespace doris::vectorized {
/** A column of map values.
*/
std::string ColumnMap::get_name() const {
return "Map(" + keys->get_name() + ", " + values->get_name() + ")";
return "Map(" + keys_column->get_name() + ", " + values_column->get_name() + ")";
}
ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values)
: keys(std::move(keys)), values(std::move(values)) {
check_size();
}
ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, MutableColumnPtr&& offsets)
: keys_column(std::move(keys)),
values_column(std::move(values)),
offsets_column(std::move(offsets)) {
const COffsets* offsets_concrete = typeid_cast<const COffsets*>(offsets_column.get());
ColumnArray::Offsets64& ColumnMap::get_offsets() const {
const ColumnArray& column_keys = assert_cast<const ColumnArray&>(get_keys());
// todo . did here check size ?
return const_cast<Offsets64&>(column_keys.get_offsets());
}
if (!offsets_concrete) {
LOG(FATAL) << "offsets_column must be a ColumnUInt64";
}
void ColumnMap::check_size() const {
const auto* key_array = typeid_cast<const ColumnArray*>(keys.get());
const auto* value_array = typeid_cast<const ColumnArray*>(values.get());
CHECK(key_array) << "ColumnMap keys can be created only from array";
CHECK(value_array) << "ColumnMap values can be created only from array";
CHECK_EQ(get_keys_ptr()->size(), get_values_ptr()->size());
if (!offsets_concrete->empty() && keys && values) {
auto last_offset = offsets_concrete->get_data().back();
/// This will also prevent possible overflow in offset.
if (keys_column->size() != last_offset) {
LOG(FATAL) << "offsets_column has data inconsistent with key_column";
}
if (values_column->size() != last_offset) {
LOG(FATAL) << "offsets_column has data inconsistent with value_column";
}
}
}
// todo. here to resize every row map
MutableColumnPtr ColumnMap::clone_resized(size_t to_size) const {
auto res = ColumnMap::create(keys->clone_resized(to_size), values->clone_resized(to_size));
auto res = ColumnMap::create(get_keys().clone_empty(), get_values().clone_empty(),
COffsets::create());
if (to_size == 0) {
return res;
}
size_t from_size = size();
if (to_size <= from_size) {
res->get_offsets().assign(get_offsets().begin(), get_offsets().begin() + to_size);
res->get_keys().insert_range_from(get_keys(), 0, get_offsets()[to_size - 1]);
res->get_values().insert_range_from(get_values(), 0, get_offsets()[to_size - 1]);
} else {
/// Copy column and append empty arrays for extra elements.
Offset64 offset = 0;
if (from_size > 0) {
res->get_offsets().assign(get_offsets().begin(), get_offsets().end());
res->get_keys().insert_range_from(get_keys(), 0, get_keys().size());
res->get_values().insert_range_from(get_values(), 0, get_values().size());
offset = get_offsets().back();
}
res->get_offsets().resize(to_size);
for (size_t i = from_size; i < to_size; ++i) {
res->get_offsets()[i] = offset;
}
}
return res;
}
// to support field functions
Field ColumnMap::operator[](size_t n) const {
// Map is FieldVector , see in field.h
Map res(2);
keys->get(n, res[0]);
values->get(n, res[1]);
// Map is FieldVector; keys and values are now kept separately — see field.h
Map m(2);
size_t start_offset = offset_at(n);
size_t element_size = size_at(n);
return res;
if (element_size > max_array_size_as_field) {
LOG(FATAL) << "element size " << start_offset
<< " is too large to be manipulated as single map field,"
<< "maximum size " << max_array_size_as_field;
}
Array k(element_size), v(element_size);
for (size_t i = 0; i < element_size; ++i) {
k[i] = get_keys()[start_offset + i];
v[i] = get_values()[start_offset + i];
}
m.push_back(k);
m.push_back(v);
return m;
}
// here to compare to below
void ColumnMap::get(size_t n, Field& res) const {
Map map(2);
keys->get(n, map[0]);
values->get(n, map[1]);
res = map;
res = operator[](n);
}
StringRef ColumnMap::get_data_at(size_t n) const {
@ -83,34 +123,41 @@ void ColumnMap::insert_data(const char*, size_t) {
void ColumnMap::insert(const Field& x) {
const auto& map = doris::vectorized::get<const Map&>(x);
CHECK_EQ(map.size(), 2);
keys->insert(map[0]);
values->insert(map[1]);
const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
const auto& v_f = doris::vectorized::get<const Array&>(map[1]);
size_t element_size = k_f.size();
for (size_t i = 0; i < element_size; ++i) {
keys_column->insert(k_f[i]);
values_column->insert(v_f[i]);
}
get_offsets().push_back(get_offsets().back() + element_size);
}
void ColumnMap::insert_default() {
keys->insert_default();
values->insert_default();
auto last_offset = get_offsets().back();
get_offsets().push_back(last_offset);
}
void ColumnMap::pop_back(size_t n) {
keys->pop_back(n);
values->pop_back(n);
}
auto& offsets_data = get_offsets();
DCHECK(n <= offsets_data.size());
size_t elems_size = offsets_data.back() - offset_at(offsets_data.size() - n);
StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const {
StringRef res(begin, 0);
auto keys_ref = keys->serialize_value_into_arena(n, arena, begin);
res.data = keys_ref.data - res.size;
res.size += keys_ref.size;
auto value_ref = values->serialize_value_into_arena(n, arena, begin);
res.data = value_ref.data - res.size;
res.size += value_ref.size;
DCHECK_EQ(keys_column->size(), values_column->size());
if (elems_size) {
keys_column->pop_back(elems_size);
values_column->pop_back(elems_size);
}
return res;
offsets_data.resize_assume_reserved(offsets_data.size() - n);
}
void ColumnMap::insert_from(const IColumn& src_, size_t n) {
const ColumnMap& src = assert_cast<const ColumnMap&>(src_);
size_t size = src.size_at(n);
size_t offset = src.offset_at(n);
if ((!get_keys().is_nullable() && src.get_keys().is_nullable()) ||
(!get_values().is_nullable() && src.get_values().is_nullable())) {
@ -119,9 +166,11 @@ void ColumnMap::insert_from(const IColumn& src_, size_t n) {
(get_values().is_nullable() && !src.get_values().is_nullable())) {
DCHECK(false);
} else {
keys->insert_from(*assert_cast<const ColumnMap&>(src_).keys, n);
values->insert_from(*assert_cast<const ColumnMap&>(src_).values, n);
keys_column->insert_range_from(src.get_keys(), offset, size);
values_column->insert_range_from(src.get_values(), offset, size);
}
get_offsets().push_back(get_offsets().back() + size);
}
void ColumnMap::insert_indices_from(const IColumn& src, const int* indices_begin,
@ -135,71 +184,195 @@ void ColumnMap::insert_indices_from(const IColumn& src, const int* indices_begin
}
}
const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
pos = keys->deserialize_and_insert_from_arena(pos);
pos = values->deserialize_and_insert_from_arena(pos);
StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const {
size_t array_size = size_at(n);
size_t offset = offset_at(n);
char* pos = arena.alloc_continue(sizeof(array_size), begin);
memcpy(pos, &array_size, sizeof(array_size));
StringRef res(pos, sizeof(array_size));
for (size_t i = 0; i < array_size; ++i) {
auto value_ref = get_keys().serialize_value_into_arena(offset + i, arena, begin);
res.data = value_ref.data - res.size;
res.size += value_ref.size;
}
for (size_t i = 0; i < array_size; ++i) {
auto value_ref = get_values().serialize_value_into_arena(offset + i, arena, begin);
res.data = value_ref.data - res.size;
res.size += value_ref.size;
}
return res;
}
const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
size_t array_size = unaligned_load<size_t>(pos);
pos += 2 * sizeof(array_size);
for (size_t i = 0; i < array_size; ++i) {
pos = get_keys().deserialize_and_insert_from_arena(pos);
}
for (size_t i = 0; i < array_size; ++i) {
pos = get_values().deserialize_and_insert_from_arena(pos);
}
get_offsets().push_back(get_offsets().back() + array_size);
return pos;
}
void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
keys->update_hash_with_value(n, hash);
values->update_hash_with_value(n, hash);
size_t array_size = size_at(n);
size_t offset = offset_at(n);
for (size_t i = 0; i < array_size; ++i) {
get_keys().update_hash_with_value(offset + i, hash);
get_values().update_hash_with_value(offset + i, hash);
}
}
void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, length);
values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, start, length);
if (length == 0) {
return;
}
const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
if (start + length > src_concrete.size()) {
LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from method. [start("
<< std::to_string(start) << ") + length(" << std::to_string(length)
<< ") > offsets.size(" << std::to_string(src_concrete.size()) << ")]";
}
size_t nested_offset = src_concrete.offset_at(start);
size_t nested_length = src_concrete.get_offsets()[start + length - 1] - nested_offset;
keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, nested_length);
values_column->insert_range_from(src_concrete.get_values(), nested_offset, nested_length);
auto& cur_offsets = get_offsets();
const auto& src_offsets = src_concrete.get_offsets();
if (start == 0 && cur_offsets.empty()) {
cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
} else {
size_t old_size = cur_offsets.size();
// -1 is ok, because PaddedPODArray pads zeros on the left.
size_t prev_max_offset = cur_offsets.back();
cur_offsets.resize(old_size + length);
for (size_t i = 0; i < length; ++i) {
cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset + prev_max_offset;
}
}
}
ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) const {
return ColumnMap::create(keys->filter(filt, result_size_hint),
values->filter(filt, result_size_hint));
auto k_arr =
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->filter(filt, result_size_hint);
auto v_arr =
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable())
->filter(filt, result_size_hint);
return ColumnMap::create(assert_cast<const ColumnArray&>(*k_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*v_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*k_arr).get_offsets_ptr());
}
size_t ColumnMap::filter(const Filter& filter) {
const auto key_result_size = keys->filter(filter);
const auto value_result_size = values->filter(filter);
CHECK_EQ(key_result_size, value_result_size);
return value_result_size;
MutableColumnPtr copied_off = offsets_column->clone_empty();
copied_off->insert_range_from(*offsets_column, 0, offsets_column->size());
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->filter(filter);
ColumnArray::create(values_column->assume_mutable(), copied_off->assume_mutable())
->filter(filter);
return get_offsets().size();
}
Status ColumnMap::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) {
auto to = reinterpret_cast<vectorized::ColumnMap*>(col_ptr);
auto& array_keys = assert_cast<vectorized::ColumnArray&>(*keys);
array_keys.filter_by_selector(sel, sel_size, &to->get_keys());
auto& to_offsets = to->get_offsets();
auto& array_values = assert_cast<vectorized::ColumnArray&>(*values);
array_values.filter_by_selector(sel, sel_size, &to->get_values());
size_t element_size = 0;
size_t max_offset = 0;
for (size_t i = 0; i < sel_size; ++i) {
element_size += size_at(sel[i]);
max_offset = std::max(max_offset, offset_at(sel[i]));
}
if (max_offset > std::numeric_limits<uint16_t>::max()) {
return Status::IOError("map elements too large than uint16_t::max");
}
to_offsets.reserve(to_offsets.size() + sel_size);
auto nested_sel = std::make_unique<uint16_t[]>(element_size);
size_t nested_sel_size = 0;
for (size_t i = 0; i < sel_size; ++i) {
auto row_off = offset_at(sel[i]);
auto row_size = size_at(sel[i]);
to_offsets.push_back(to_offsets.back() + row_size);
for (auto j = 0; j < row_size; ++j) {
nested_sel[nested_sel_size++] = row_off + j;
}
}
if (nested_sel_size > 0) {
keys_column->filter_by_selector(nested_sel.get(), nested_sel_size, &to->get_keys());
values_column->filter_by_selector(nested_sel.get(), nested_sel_size, &to->get_values());
}
return Status::OK();
}
ColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
return ColumnMap::create(keys->permute(perm, limit), values->permute(perm, limit));
// Make a temp column array
auto k_arr =
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->permute(perm, limit);
auto v_arr =
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable())
->permute(perm, limit);
return ColumnMap::create(assert_cast<const ColumnArray&>(*k_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*v_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*k_arr).get_offsets_ptr());
}
ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
return ColumnMap::create(keys->replicate(offsets), values->replicate(offsets));
// Make a temp column array for reusing its replicate function
auto k_arr =
ColumnArray::create(keys_column->assume_mutable(), offsets_column->assume_mutable())
->replicate(offsets);
auto v_arr =
ColumnArray::create(values_column->assume_mutable(), offsets_column->assume_mutable())
->replicate(offsets);
auto res = ColumnMap::create(assert_cast<const ColumnArray&>(*k_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*v_arr).get_data_ptr(),
assert_cast<const ColumnArray&>(*k_arr).get_offsets_ptr());
return res;
}
void ColumnMap::reserve(size_t n) {
get_keys().reserve(n);
get_values().reserve(n);
get_offsets().reserve(n);
keys_column->reserve(n);
values_column->reserve(n);
}
size_t ColumnMap::byte_size() const {
return get_keys().byte_size() + get_values().byte_size();
return keys_column->byte_size() + values_column->byte_size() + offsets_column->byte_size();
;
}
size_t ColumnMap::allocated_bytes() const {
return get_keys().allocated_bytes() + get_values().allocated_bytes();
return keys_column->allocated_bytes() + values_column->allocated_bytes() +
get_offsets().allocated_bytes();
}
void ColumnMap::protect() {
get_keys().protect();
get_values().protect();
offsets_column->protect();
keys_column->protect();
values_column->protect();
}
} // namespace doris::vectorized

View File

@ -37,9 +37,11 @@ public:
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnMap>;
using COffsets = ColumnArray::ColumnOffsets;
static Ptr create(const ColumnPtr& keys, const ColumnPtr& values) {
return ColumnMap::create(keys->assume_mutable(), values->assume_mutable());
static Ptr create(const ColumnPtr& keys, const ColumnPtr& values, const ColumnPtr& offsets) {
return ColumnMap::create(keys->assume_mutable(), values->assume_mutable(),
offsets->assume_mutable());
}
template <typename... Args,
@ -53,19 +55,21 @@ public:
TypeIndex get_data_type() const override { return TypeIndex::Map; }
void for_each_subcolumn(ColumnCallback callback) override {
callback(keys);
callback(values);
callback(keys_column);
callback(values_column);
callback(offsets_column);
}
void clear() override {
keys->clear();
values->clear();
keys_column->clear();
values_column->clear();
offsets_column->clear();
}
MutableColumnPtr clone_resized(size_t size) const override;
bool can_be_inside_nullable() const override { return true; }
size_t size() const override { return keys->size(); }
Field operator[](size_t n) const override;
void get(size_t n, Field& res) const override;
StringRef get_data_at(size_t n) const override;
@ -116,38 +120,51 @@ public:
void replace_column_data_default(size_t self_row = 0) override {
LOG(FATAL) << "replace_column_data_default not implemented";
}
void check_size() const;
ColumnArray::Offsets64& get_offsets() const;
ColumnArray::Offsets64& ALWAYS_INLINE get_offsets() {
return assert_cast<COffsets&>(*offsets_column).get_data();
}
const ColumnArray::Offsets64& ALWAYS_INLINE get_offsets() const {
return assert_cast<const COffsets&>(*offsets_column).get_data();
}
IColumn& get_offsets_column() { return *offsets_column; }
const IColumn& get_offsets_column() const { return *offsets_column; }
const ColumnPtr& get_offsets_ptr() const { return offsets_column; }
ColumnPtr& get_offsets_ptr() { return offsets_column; }
size_t size() const override { return get_offsets().size(); }
void reserve(size_t n) override;
size_t byte_size() const override;
size_t allocated_bytes() const override;
void protect() override;
/******************** keys and values ***************/
const ColumnPtr& get_keys_ptr() const { return keys; }
ColumnPtr& get_keys_ptr() { return keys; }
const ColumnPtr& get_keys_ptr() const { return keys_column; }
ColumnPtr& get_keys_ptr() { return keys_column; }
const IColumn& get_keys() const { return *keys; }
IColumn& get_keys() { return *keys; }
const IColumn& get_keys() const { return *keys_column; }
IColumn& get_keys() { return *keys_column; }
const ColumnPtr& get_values_ptr() const { return values; }
ColumnPtr& get_values_ptr() { return values; }
const ColumnPtr& get_values_ptr() const { return values_column; }
ColumnPtr& get_values_ptr() { return values_column; }
const IColumn& get_values() const { return *values; }
IColumn& get_values() { return *values; }
const IColumn& get_values() const { return *values_column; }
IColumn& get_values() { return *values_column; }
private:
friend class COWHelper<IColumn, ColumnMap>;
WrappedPtr keys; // nullable
WrappedPtr values; // nullable
WrappedPtr keys_column; // nullable
WrappedPtr values_column; // nullable
WrappedPtr offsets_column; // offset
size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 1]; }
size_t ALWAYS_INLINE size_at(ssize_t i) const {
return get_offsets()[i] - get_offsets()[i - 1];
}
ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values);
ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, MutableColumnPtr&& offsets);
ColumnMap(const ColumnMap&) = default;
};

View File

@ -363,9 +363,8 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
case PGenericType::MAP:
DCHECK(pcolumn.children_size() == 2);
// here to check pcolumn is list?
nested = std::make_shared<vectorized::DataTypeMap>(
create_data_type(pcolumn.children(0).children(0)),
create_data_type(pcolumn.children(1).children(0)));
nested = std::make_shared<vectorized::DataTypeMap>(create_data_type(pcolumn.children(0)),
create_data_type(pcolumn.children(1)));
break;
case PGenericType::STRUCT: {
size_t col_size = pcolumn.children_size();

View File

@ -21,12 +21,9 @@
namespace doris::vectorized {
DataTypeMap::DataTypeMap(const DataTypePtr& keys_, const DataTypePtr& values_) {
key_type = make_nullable(keys_);
value_type = make_nullable(values_);
keys = std::make_shared<DataTypeArray>(key_type);
values = std::make_shared<DataTypeArray>(value_type);
DataTypeMap::DataTypeMap(const DataTypePtr& key_type_, const DataTypePtr& value_type_) {
key_type = key_type_;
value_type = value_type_;
}
std::string DataTypeMap::to_string(const IColumn& column, size_t row_num) const {
@ -36,11 +33,8 @@ std::string DataTypeMap::to_string(const IColumn& column, size_t row_num) const
size_t offset = offsets[row_num - 1];
size_t next_offset = offsets[row_num];
auto& keys_arr = assert_cast<const ColumnArray&>(map_column.get_keys());
auto& values_arr = assert_cast<const ColumnArray&>(map_column.get_values());
const IColumn& nested_keys_column = keys_arr.get_data();
const IColumn& nested_values_column = values_arr.get_data();
const IColumn& nested_keys_column = map_column.get_keys();
const IColumn& nested_values_column = map_column.get_values();
std::string str;
str += "{";
@ -51,7 +45,7 @@ std::string DataTypeMap::to_string(const IColumn& column, size_t row_num) const
if (nested_keys_column.is_null_at(i)) {
str += "null";
} else if (WhichDataType(remove_nullable(key_type)).is_string_or_fixed_string()) {
str += "'" + key_type->to_string(nested_keys_column, i) + "'";
str += "\"" + key_type->to_string(nested_keys_column, i) + "\"";
} else {
str += key_type->to_string(nested_keys_column, i);
}
@ -59,7 +53,7 @@ std::string DataTypeMap::to_string(const IColumn& column, size_t row_num) const
if (nested_values_column.is_null_at(i)) {
str += "null";
} else if (WhichDataType(remove_nullable(value_type)).is_string_or_fixed_string()) {
str += "'" + value_type->to_string(nested_values_column, i) + "'";
str += "\"" + value_type->to_string(nested_values_column, i) + "\"";
} else {
str += value_type->to_string(nested_values_column, i);
}
@ -172,14 +166,10 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
// {"aaa": 1, "bbb": 20}, need to handle key slot and value slot to make key column arr and value arr
// skip "{"
++rb.position();
auto& keys_arr = reinterpret_cast<ColumnArray&>(map_column->get_keys());
ColumnArray::Offsets64& key_off = keys_arr.get_offsets();
auto& values_arr = reinterpret_cast<ColumnArray&>(map_column->get_values());
ColumnArray::Offsets64& val_off = values_arr.get_offsets();
IColumn& nested_key_column = keys_arr.get_data();
ColumnArray::Offsets64& map_off = map_column->get_offsets();
IColumn& nested_key_column = map_column->get_keys();
DCHECK(nested_key_column.is_nullable());
IColumn& nested_val_column = values_arr.get_data();
IColumn& nested_val_column = map_column->get_values();
DCHECK(nested_val_column.is_nullable());
size_t element_num = 0;
@ -187,13 +177,18 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
StringRef key_element(rb.position(), rb.count());
bool has_quota = false;
if (!next_slot_from_string(rb, key_element, has_quota)) {
// Roll back the current row: element_num items have already been inserted into it.
map_column->get_keys().pop_back(element_num);
map_column->get_values().pop_back(element_num);
return Status::InvalidArgument("Cannot read map key from text '{}'",
key_element.to_string());
}
if (!is_empty_null_element(key_element, &nested_key_column, has_quota)) {
ReadBuffer krb(const_cast<char*>(key_element.data), key_element.size);
if (auto st = key_type->from_string(krb, &nested_key_column); !st.ok()) {
map_column->pop_back(element_num);
// Roll back the current row: element_num items have already been inserted into it.
map_column->get_keys().pop_back(element_num);
map_column->get_values().pop_back(element_num);
return st;
}
}
@ -201,34 +196,38 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
has_quota = false;
StringRef value_element(rb.position(), rb.count());
if (!next_slot_from_string(rb, value_element, has_quota)) {
// +1 just because key column already put succeed , but element_num not refresh here
map_column->get_keys().pop_back(element_num + 1);
map_column->get_values().pop_back(element_num);
return Status::InvalidArgument("Cannot read map value from text '{}'",
value_element.to_string());
}
if (!is_empty_null_element(value_element, &nested_val_column, has_quota)) {
ReadBuffer vrb(const_cast<char*>(value_element.data), value_element.size);
if (auto st = value_type->from_string(vrb, &nested_val_column); !st.ok()) {
map_column->pop_back(element_num);
map_column->get_keys().pop_back(element_num + 1);
map_column->get_values().pop_back(element_num);
return st;
}
}
++element_num;
}
key_off.push_back(key_off.back() + element_num);
val_off.push_back(val_off.back() + element_num);
map_off.push_back(map_off.back() + element_num);
}
return Status::OK();
}
MutableColumnPtr DataTypeMap::create_column() const {
return ColumnMap::create(keys->create_column(), values->create_column());
return ColumnMap::create(key_type->create_column(), value_type->create_column(),
ColumnArray::ColumnOffsets::create());
}
void DataTypeMap::to_pb_column_meta(PColumnMeta* col_meta) const {
IDataType::to_pb_column_meta(col_meta);
auto key_children = col_meta->add_children();
auto value_children = col_meta->add_children();
keys->to_pb_column_meta(key_children);
values->to_pb_column_meta(value_children);
key_type->to_pb_column_meta(key_children);
value_type->to_pb_column_meta(value_children);
}
bool DataTypeMap::equals(const IDataType& rhs) const {
@ -238,11 +237,11 @@ bool DataTypeMap::equals(const IDataType& rhs) const {
const DataTypeMap& rhs_map = static_cast<const DataTypeMap&>(rhs);
if (!keys->equals(*rhs_map.keys)) {
if (!key_type->equals(*rhs_map.key_type)) {
return false;
}
if (!values->equals(*rhs_map.values)) {
if (!value_type->equals(*rhs_map.value_type)) {
return false;
}
@ -253,8 +252,10 @@ int64_t DataTypeMap::get_uncompressed_serialized_bytes(const IColumn& column,
int data_version) const {
auto ptr = column.convert_to_full_column_if_const();
const auto& data_column = assert_cast<const ColumnMap&>(*ptr.get());
return get_keys()->get_uncompressed_serialized_bytes(data_column.get_keys(), data_version) +
get_values()->get_uncompressed_serialized_bytes(data_column.get_values(), data_version);
return sizeof(ColumnArray::Offset64) * (column.size() + 1) +
get_key_type()->get_uncompressed_serialized_bytes(data_column.get_keys(), data_version) +
get_value_type()->get_uncompressed_serialized_bytes(data_column.get_values(),
data_version);
}
// serialize to binary
@ -262,15 +263,32 @@ char* DataTypeMap::serialize(const IColumn& column, char* buf, int data_version)
auto ptr = column.convert_to_full_column_if_const();
const auto& map_column = assert_cast<const ColumnMap&>(*ptr.get());
buf = get_keys()->serialize(map_column.get_keys(), buf, data_version);
return get_values()->serialize(map_column.get_values(), buf, data_version);
// row num
*reinterpret_cast<ColumnArray::Offset64*>(buf) = column.size();
buf += sizeof(ColumnArray::Offset64);
// offsets
memcpy(buf, map_column.get_offsets().data(), column.size() * sizeof(ColumnArray::Offset64));
buf += column.size() * sizeof(ColumnArray::Offset64);
// key value
buf = get_key_type()->serialize(map_column.get_keys(), buf, data_version);
return get_value_type()->serialize(map_column.get_values(), buf, data_version);
}
const char* DataTypeMap::deserialize(const char* buf, IColumn* column, int data_version) const {
const auto* map_column = assert_cast<const ColumnMap*>(column);
buf = get_keys()->deserialize(buf, map_column->get_keys_ptr()->assume_mutable(), data_version);
return get_values()->deserialize(buf, map_column->get_values_ptr()->assume_mutable(),
data_version);
auto* map_column = assert_cast<ColumnMap*>(column);
auto& map_offsets = map_column->get_offsets();
// row num
ColumnArray::Offset64 row_num = *reinterpret_cast<const ColumnArray::Offset64*>(buf);
buf += sizeof(ColumnArray::Offset64);
// offsets
map_offsets.resize(row_num);
memcpy(map_offsets.data(), buf, sizeof(ColumnArray::Offset64) * row_num);
buf += sizeof(ColumnArray::Offset64) * row_num;
// key value
buf = get_key_type()->deserialize(buf, map_column->get_keys_ptr()->assume_mutable(),
data_version);
return get_value_type()->deserialize(buf, map_column->get_values_ptr()->assume_mutable(),
data_version);
}
} // namespace doris::vectorized

View File

@ -31,21 +31,16 @@
namespace doris::vectorized {
/** Map data type.
*
 * Holds only the key and value element types.
 * If only one type is given, the key type defaults to "String".
*/
class DataTypeMap final : public IDataType {
private:
DataTypePtr key_type;
DataTypePtr value_type;
DataTypePtr keys; // array
DataTypePtr values; // array
public:
static constexpr bool is_parametric = true;
DataTypeMap(const DataTypePtr& keys_, const DataTypePtr& values_);
DataTypeMap(const DataTypePtr& key_type_, const DataTypePtr& value_type_);
TypeIndex get_type_id() const override { return TypeIndex::Map; }
std::string do_get_name() const override {
@ -67,9 +62,6 @@ public:
return true;
}
const DataTypePtr& get_keys() const { return keys; }
const DataTypePtr& get_values() const { return values; }
const DataTypePtr& get_key_type() const { return key_type; }
const DataTypePtr& get_value_type() const { return value_type; }

View File

@ -97,12 +97,7 @@ public:
private:
//=========================== map element===========================//
ColumnPtr _get_mapped_idx(const ColumnArray& key_column,
const ColumnWithTypeAndName& argument) {
return _mapped_key(key_column, argument);
}
ColumnPtr _mapped_key(const ColumnArray& column, const ColumnWithTypeAndName& argument) {
ColumnPtr _get_mapped_idx(const ColumnArray& column, const ColumnWithTypeAndName& argument) {
auto right_column = argument.column->convert_to_full_column_if_const();
const ColumnArray::Offsets64& offsets = column.get_offsets();
ColumnPtr nested_ptr = nullptr;
@ -236,25 +231,28 @@ private:
const UInt8* src_null_map, UInt8* dst_null_map) {
auto left_column = arguments[0].column->convert_to_full_column_if_const();
DataTypePtr val_type =
reinterpret_cast<const DataTypeMap&>(*arguments[0].type).get_values();
reinterpret_cast<const DataTypeMap&>(*arguments[0].type).get_value_type();
const auto& map_column = reinterpret_cast<const ColumnMap&>(*left_column);
const ColumnArray& column_keys = assert_cast<const ColumnArray&>(map_column.get_keys());
// create column array to find keys
auto key_arr = ColumnArray::create(map_column.get_keys_ptr(), map_column.get_offsets_ptr());
auto val_arr =
ColumnArray::create(map_column.get_values_ptr(), map_column.get_offsets_ptr());
const auto& offsets = column_keys.get_offsets();
const auto& offsets = map_column.get_offsets();
const size_t rows = offsets.size();
if (rows <= 0) {
return nullptr;
}
ColumnPtr matched_indices = _get_mapped_idx(column_keys, arguments[1]);
ColumnPtr matched_indices = _get_mapped_idx(*key_arr, arguments[1]);
if (!matched_indices) {
return nullptr;
}
DataTypePtr indices_type(std::make_shared<vectorized::DataTypeInt8>());
ColumnWithTypeAndName indices(matched_indices, indices_type, "indices");
ColumnWithTypeAndName data(map_column.get_values_ptr(), val_type, "value");
ColumnWithTypeAndName data(val_arr, val_type, "value");
ColumnsWithTypeAndName args = {data, indices};
return _execute_non_nullable(args, input_rows_count, src_null_map, dst_null_map);
}

View File

@ -136,10 +136,8 @@ OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
const auto& key_column = column.get_sub_column(0);
const auto& value_column = column.get_sub_column(1);
return std::make_unique<OlapColumnDataConvertorMap>(
std::make_unique<OlapColumnDataConvertorArray>(
create_olap_column_data_convertor(key_column)),
std::make_unique<OlapColumnDataConvertorArray>(
create_olap_column_data_convertor(value_column)));
create_olap_column_data_convertor(key_column),
create_olap_column_data_convertor(value_column));
}
default: {
DCHECK(false) << "Invalid type in RowBlockV2:" << column.type();
@ -810,30 +808,38 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
const ColumnMap* column_map, const DataTypeMap* data_type_map) {
ColumnPtr key_data = column_map->get_keys_ptr();
ColumnPtr value_data = column_map->get_values_ptr();
if (column_map->get_keys().is_nullable()) {
const auto& key_nullable_column =
assert_cast<const ColumnNullable&>(column_map->get_keys());
key_data = key_nullable_column.get_nested_column_ptr();
}
if (column_map->get_values().is_nullable()) {
const auto& val_nullable_column =
assert_cast<const ColumnNullable&>(column_map->get_values());
value_data = val_nullable_column.get_nested_column_ptr();
}
// offsets data
auto& offsets = column_map->get_offsets();
// make first offset
auto offsets_col = ColumnArray::ColumnOffsets::create();
ColumnWithTypeAndName key_typed_column = {key_data, remove_nullable(data_type_map->get_keys()),
"map.key"};
_key_convertor->set_source_column(key_typed_column, _row_pos, _num_rows);
// In memory the map column offsets are laid out as [3, 6, 9]; on disk they should be [0, 3, 6, 9]
_offsets.reserve(offsets.size() + 1);
_offsets.push_back(_row_pos); // _offsets start with current map offsets
_offsets.insert_assume_reserved(offsets.begin(), offsets.end());
int64_t start_index = _row_pos - 1;
int64_t end_index = _row_pos + _num_rows - 1;
auto start = offsets[start_index];
auto size = offsets[end_index] - start;
ColumnWithTypeAndName key_typed_column = {key_data, data_type_map->get_key_type(), "map.key"};
_key_convertor->set_source_column(key_typed_column, start, size);
_key_convertor->convert_to_olap();
ColumnWithTypeAndName value_typed_column = {
value_data, remove_nullable(data_type_map->get_values()), "map.value"};
_value_convertor->set_source_column(value_typed_column, _row_pos, _num_rows);
ColumnWithTypeAndName value_typed_column = {value_data, data_type_map->get_value_type(),
"map.value"};
_value_convertor->set_source_column(value_typed_column, start, size);
_value_convertor->convert_to_olap();
_results[0] = _key_convertor->get_data();
_results[1] = _value_convertor->get_data();
// TODO(Amory): put this value into MapValue
_results[0] = (void*)size;
_results[1] = _offsets.data();
_results[2] = _key_convertor->get_data();
_results[3] = _value_convertor->get_data();
_results[4] = _key_convertor->get_nullmap();
_results[5] = _value_convertor->get_nullmap();
return Status::OK();
}

View File

@ -408,7 +408,7 @@ private:
OlapColumnDataConvertorBaseUPtr value_convertor)
: _key_convertor(std::move(key_convertor)),
_value_convertor(std::move(value_convertor)) {
_results.resize(2);
_results.resize(6); // size + offset + k_data + v_data + k_nullmap + v_nullmap
}
Status convert_to_olap() override;
@ -422,6 +422,7 @@ private:
OlapColumnDataConvertorBaseUPtr _key_convertor;
OlapColumnDataConvertorBaseUPtr _value_convertor;
std::vector<const void*> _results;
PaddedPODArray<UInt64> _offsets;
}; //OlapColumnDataConvertorMap
private:

View File

@ -1,4 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 {1:'a', 2:'doris'}
1 {1:"a", 2:"doris"}
2 {}

View File

@ -2,7 +2,6 @@
-- !select --
1 \N
2 {}
3 {' 33,amory ':2, ' bet ':20, ' cler ':26}
4 {'k3':23, null:20, 'k4':null}
3 {" 33,amory ":2, " bet ":20, " cler ":26}
4 {"k3":23, null:20, "k4":null}
5 {null:null}

View File

@ -1,8 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
1 {' amory ':6, 'happy':38}
6 {'amory':6, 'is':38, 'cl':0}
1 {" amory ":6, "happy":38}
6 {"amory":6, "is":38, "cl":0}
-- !select --
100 {1:'1', 2:'2', 3:'3'} {32767:'32767', 32768:'32768', 32769:'32769'} [65534, 65535, 65536] {2022-07-13:1} {2022-07-13 12:30:00:'2022-07-13 12:30:00'} {0.33:33, 0.67:67}
100 {1:"1", 2:"2", 3:"3"} {32767:"32767", 32768:"32768", 32769:"32769"} [65534, 65535, 65536] {2022-07-13:1} {2022-07-13 12:30:00:"2022-07-13 12:30:00"} {0.33:33, 0.67:67}

View File

@ -1,19 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_all --
1 \N
2 {' 11amory ':23, 'beat':20, ' clever ':66}
3 {'k1':31, 'k2':300}
2 {" 11amory ":23, "beat":20, " clever ":66}
3 {"k1":31, "k2":300}
4 {}
5 \N
6 {'k1':41, 'k2':400}
7 {' 33,amory ':2, ' bet ':20, ' cler ':26}
6 {"k1":41, "k2":400}
7 {" 33,amory ":2, " bet ":20, " cler ":26}
8 {}
9 {' 1,amy ':2, ' k2 ':90, ' k7 ':33}
9 {" 1,amy ":2, " k2 ":90, " k7 ":33}
10 {}
11 \N
12 {'k3':23, null:20, 'k4':null}
13 {'null':1}
15 {'':2, 'k2':0}
12 {"k3":23, null:20, "k4":null}
13 {"null":1}
15 {"":2, "k2":0}
16 {null:null}
-- !select_m --
@ -49,4 +49,3 @@
-- !select_m5 --
1 100 200 \N