From 6d040a33afa292e2c2e75239d77f82b77fcc3cfa Mon Sep 17 00:00:00 2001 From: kangpinghuang Date: Sat, 24 Aug 2019 00:57:30 +0800 Subject: [PATCH] Add zone map page(#1390) (#1633) --- be/src/olap/CMakeLists.txt | 1 + be/src/olap/field.h | 15 +- be/src/olap/iterators.h | 9 +- .../rowset/segment_v2/binary_plain_page.h | 15 +- .../olap/rowset/segment_v2/column_reader.cpp | 68 +++- be/src/olap/rowset/segment_v2/column_reader.h | 19 +- .../olap/rowset/segment_v2/column_writer.cpp | 41 ++- be/src/olap/rowset/segment_v2/column_writer.h | 8 +- .../rowset/segment_v2/column_zone_map.cpp | 107 ++++++ .../olap/rowset/segment_v2/column_zone_map.h | 100 ++++++ .../rowset/segment_v2/ordinal_page_index.cpp | 5 +- .../rowset/segment_v2/ordinal_page_index.h | 33 +- be/src/olap/rowset/segment_v2/row_ranges.h | 284 ++++++++++++++++ be/src/olap/rowset/segment_v2/segment.cpp | 2 +- .../rowset/segment_v2/segment_iterator.cpp | 96 +++++- .../olap/rowset/segment_v2/segment_iterator.h | 10 + .../olap/rowset/segment_v2/segment_writer.cpp | 12 + .../olap/rowset/segment_v2/segment_writer.h | 1 + be/src/olap/stream_index_common.cpp | 9 +- be/src/olap/types.cpp | 2 +- be/src/olap/types.h | 12 +- be/src/olap/wrapper_field.cpp | 14 +- be/src/olap/wrapper_field.h | 16 + be/src/util/doris_metrics.cpp | 18 + be/src/util/doris_metrics.h | 5 + be/test/olap/CMakeLists.txt | 2 + .../segment_v2/column_reader_writer_test.cpp | 11 +- .../segment_v2/column_zone_map_test.cpp | 125 +++++++ .../segment_v2/ordinal_page_index_test.cpp | 8 +- .../rowset/segment_v2/row_ranges_test.cpp | 113 +++++++ .../olap/rowset/segment_v2/segment_test.cpp | 113 ++++++- gensrc/proto/segment_v2.proto | 317 +++++++++--------- run-ut.sh | 2 + 33 files changed, 1366 insertions(+), 227 deletions(-) create mode 100644 be/src/olap/rowset/segment_v2/column_zone_map.cpp create mode 100644 be/src/olap/rowset/segment_v2/column_zone_map.h create mode 100644 be/src/olap/rowset/segment_v2/row_ranges.h create mode 100644 be/test/olap/rowset/segment_v2/column_zone_map_test.cpp create mode 100644 be/test/olap/rowset/segment_v2/row_ranges_test.cpp diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 9148a25925..aba1a3817c 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -93,6 +93,7 @@ add_library(Olap STATIC rowset/segment_v2/segment.cpp rowset/segment_v2/segment_iterator.cpp rowset/segment_v2/segment_writer.cpp + rowset/segment_v2/column_zone_map.cpp rowset_factory.cpp task/engine_batch_load_task.cpp task/engine_checksum_task.cpp diff --git a/be/src/olap/field.h b/be/src/olap/field.h index c2aded1212..ccc5cbdae5 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -45,13 +45,7 @@ public: } static Field* create_by_type(const FieldType& type) { - // TODO(zc): To be compatible with old version, we should return nullptr for - // CHAR, VARCHAR, HLL. Because ColumnStatistics depend on this function return nullptr - if (type == OLAP_FIELD_TYPE_CHAR || - type == OLAP_FIELD_TYPE_VARCHAR || - type == OLAP_FIELD_TYPE_HLL) { - return nullptr; - } + // create by type return new Field(type); } @@ -189,7 +183,7 @@ public: if (is_null) { return; } - _type_info->copy_with_arena(dst->mutable_cell_ptr(), src.cell_ptr(), arena); + _type_info->deep_copy_with_arena(dst->mutable_cell_ptr(), src.cell_ptr(), arena); } // This function will initialize destination with source. @@ -210,6 +204,11 @@ public: _type_info->shallow_copy(dst, src); } + // copy filed content from src to dest without nullbyte + inline void deep_copy_content(char* dest, const char* src, Arena* arena) const { + _type_info->deep_copy_with_arena(dest, src, arena); + } + // Copy srouce content to destination in index format. template void to_index(DstCellType* dst, const SrcCellType& src) const; diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index bbb2ce6290..25ceb2d140 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -26,6 +26,7 @@ namespace doris { class RowCursor; class RowBlockV2; class Schema; +class Conditions; struct StorageReadOptions { // lower_bound defines the smallest key at which iterator will @@ -35,7 +36,7 @@ struct StorageReadOptions { // If include_lower_bound is true, data equal with lower_bound will // be read - bool include_lower_bound; + bool include_lower_bound = false; // upper_bound defines the extend upto which the iterator can return // data. @@ -43,7 +44,11 @@ struct StorageReadOptions { // If include_upper_bound is true, data equal with upper_bound will // be read - bool include_upper_bound; + bool include_upper_bound = false; + + // reader's column predicates + // used by zone map/bloom filter/secondary index to prune data + std::shared_ptr conditions; }; // Used to read data in RowBlockV2 one by one diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index dd069df378..882cf49c08 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -50,18 +50,19 @@ public: } bool is_page_full() override { - return _size_estimate > _options.data_page_size - || _prepared_size > _options.data_page_size; + // data_page_size is 0, do not limit the page size + return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size + || _prepared_size > _options.data_page_size); } - Status add(const uint8_t *vals, size_t *count) override { + Status add(const uint8_t* vals, size_t* count) override { DCHECK(!_finished); DCHECK_GT(*count, 0); size_t i = 0; // If the page is full, should stop adding more items. while (!is_page_full() && i < *count) { - const Slice *src = reinterpret_cast(vals); + auto src = reinterpret_cast(vals); size_t offset = _buffer.size(); _offsets.push_back(offset); _buffer.append(src->data, src->size); @@ -110,7 +111,7 @@ public: // release() should be called after finish // reset() should be called after this function before reuse the builder void release() override { - uint8_t *ret = _buffer.release(); + uint8_t* ret = _buffer.release(); _buffer.reserve(_options.data_page_size); (void) ret; } @@ -135,6 +136,8 @@ private: class BinaryPlainPageDecoder : public PageDecoder { public: + BinaryPlainPageDecoder(Slice data) : BinaryPlainPageDecoder(data, PageDecoderOptions()) { } + BinaryPlainPageDecoder(Slice data, const PageDecoderOptions& options) : _data(data), _options(options), _parsed(false), @@ -187,7 +190,7 @@ public: return Status::OK(); } - Status next_batch(size_t *n, ColumnBlockView *dst) override { + Status next_batch(size_t* n, ColumnBlockView* dst) override { DCHECK(_parsed); if (PREDICT_FALSE(*n == 0 || _cur_idx >= _num_elems)) { *n = 0; diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index a314a94007..bc28f731cf 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -72,9 +72,11 @@ struct ParsedPage { ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, + uint64_t num_rows, RandomAccessFile* file) : _opts(opts), _meta(meta), + _num_rows(num_rows), _file(file) { } @@ -93,6 +95,8 @@ Status ColumnReader::init() { RETURN_IF_ERROR(_init_ordinal_index()); + RETURN_IF_ERROR(_init_column_zone_map()); + return Status::OK(); } @@ -156,18 +160,75 @@ Status ColumnReader::read_page(const PagePointer& pp, PageHandle* handle) { return Status::OK(); } +void ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column, RowRanges* row_ranges) { + std::vector page_indexes; + _get_filtered_pages(cond_column, &page_indexes); + _calculate_row_ranges(page_indexes, row_ranges); +} + +void ColumnReader::_get_filtered_pages(CondColumn* cond_column, std::vector* page_indexes) { + FieldType type = _type_info->type(); + const std::vector& zone_maps = _column_zone_map->get_column_zone_map(); + int32_t page_size = _column_zone_map->num_pages(); + std::unique_ptr min_value(WrapperField::create_by_type(type)); + std::unique_ptr max_value(WrapperField::create_by_type(type)); + for (int32_t i = 0; i < page_size; ++i) { + // min value and max value are valid if exisst_none_null is true + if (zone_maps[i].has_not_null()) { + min_value->from_string(zone_maps[i].min()); + max_value->from_string(zone_maps[i].max()); + } + // for compatible original Cond eval logic + // TODO(hkp): optimize OlapCond + if (zone_maps[i].has_null()) { + // for compatible, if exist null, original logic treat null as min + min_value->set_null(); + if (!zone_maps[i].has_not_null()) { + // for compatible OlapCond's 'is not null' + max_value->set_null(); + } + } + if (cond_column->eval({min_value.get(), max_value.get()})) { + page_indexes->push_back(i); + } + } +} + +void ColumnReader::_calculate_row_ranges(const std::vector& page_indexes, RowRanges* row_ranges) { + for (auto i : page_indexes) { + rowid_t page_first_id = _ordinal_index->get_first_row_id(i); + rowid_t page_last_id = _ordinal_index->get_last_row_id(i); + RowRanges page_row_ranges(RowRanges::create_single(page_first_id, page_last_id + 1)); + RowRanges::ranges_union(*row_ranges, page_row_ranges, row_ranges); + } +} + // initial ordinal index Status ColumnReader::_init_ordinal_index() { PagePointer pp = _meta.ordinal_index_page(); PageHandle ph; RETURN_IF_ERROR(read_page(pp, &ph)); - _ordinal_index.reset(new OrdinalPageIndex(ph.data())); + _ordinal_index.reset(new OrdinalPageIndex(ph.data(), _num_rows)); RETURN_IF_ERROR(_ordinal_index->load()); return Status::OK(); } +// initialize column zone map +Status ColumnReader::_init_column_zone_map() { + if (_meta.has_zone_map_page()) { + PagePointer pp = _meta.zone_map_page(); + PageHandle ph; + RETURN_IF_ERROR(read_page(pp, &ph)); + _column_zone_map.reset(new ColumnZoneMap(ph.data())); + RETURN_IF_ERROR(_column_zone_map->load()); + } else { + _column_zone_map.reset(nullptr); + } + return Status::OK(); +} + Status ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter) { *iter = _ordinal_index->begin(); if (!iter->valid()) { @@ -206,12 +267,11 @@ Status FileColumnIterator::seek_to_ordinal(rowid_t rid) { if (_page != nullptr && _page->contains(rid)) { // current page contains this row, we just } else { - // we need to seek to + // we need to seek to RETURN_IF_ERROR(_reader->seek_at_or_before(rid, &_page_iter)); _page.reset(new ParsedPage()); RETURN_IF_ERROR(_read_page(_page_iter, _page.get())); } - _seek_to_pos_in_page(_page.get(), rid - _page->first_rowid); _current_rowid = rid; return Status::OK(); @@ -270,7 +330,6 @@ Status FileColumnIterator::next_batch(size_t* n, ColumnBlock* dst) { while (nrows_to_read > 0) { bool is_null = false; size_t this_run = _page->null_decoder.GetNextRun(&is_null, nrows_to_read); - // we use num_rows only for CHECK size_t num_rows = this_run; if (!is_null) { @@ -294,7 +353,6 @@ Status FileColumnIterator::next_batch(size_t* n, ColumnBlock* dst) { column_view.set_null_bits(nrows_to_read, false); } - // set null bits to _page->offset_in_page += nrows_to_read; column_view.advance(nrows_to_read); _current_rowid += nrows_to_read; diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 7d721c7f67..d3f25553a9 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -23,8 +23,11 @@ #include "common/status.h" // for Status #include "gen_cpp/segment_v2.pb.h" // for ColumnMetaPB +#include "olap/olap_cond.h" // for CondColumn #include "olap/rowset/segment_v2/common.h" // for rowid_t #include "olap/rowset/segment_v2/ordinal_page_index.h" // for OrdinalPageIndexIterator +#include "olap/rowset/segment_v2/column_zone_map.h" // for ColumnZoneMap +#include "olap/rowset/segment_v2/row_ranges.h" // for RowRanges namespace doris { @@ -55,7 +58,8 @@ struct ColumnReaderOptions { // This will cache data shared by all reader class ColumnReader { public: - ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, RandomAccessFile* file); + ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, + uint64_t num_rows, RandomAccessFile* file); ~ColumnReader(); Status init(); @@ -74,15 +78,25 @@ public: const EncodingInfo* encoding_info() const { return _encoding_info; } const TypeInfo* type_info() const { return _type_info; } + bool has_zone_map() { return _meta.has_zone_map_page(); } + void get_row_ranges_by_zone_map(CondColumn* cond_column, RowRanges* row_ranges); + private: Status _init_ordinal_index(); + Status _init_column_zone_map(); + + void _get_filtered_pages(CondColumn* cond_column, std::vector* page_indexes); + + void _calculate_row_ranges(const std::vector& page_indexes, RowRanges* row_ranges); + private: // input param ColumnReaderOptions _opts; // we need colun data to parse column data. // use shared_ptr here is to make things simple ColumnMetaPB _meta; + uint64_t _num_rows; RandomAccessFile* _file = nullptr; const TypeInfo* _type_info = nullptr; @@ -91,6 +105,9 @@ private: // get page pointer from index std::unique_ptr _ordinal_index; + + // column zone map info + std::unique_ptr _column_zone_map; }; // Base iterator to read one column data diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 066e824e24..95ac006e99 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -105,17 +105,23 @@ Status ColumnWriter::init() { } _page_builder.reset(page_builder); // create ordinal builder - _ordinal_index_builer.reset(new OrdinalPageIndexBuilder()); + _ordinal_index_builder.reset(new OrdinalPageIndexBuilder()); // create null bitmap builder if (_is_nullable) { _null_bitmap_builder.reset(new NullBitmapBuilder()); } + if (_opts.need_zone_map) { + _column_zone_map_builder.reset(new ColumnZoneMapBuilder(_type_info)); + } return Status::OK(); } Status ColumnWriter::append_nulls(size_t num_rows) { _null_bitmap_builder->add_run(true, num_rows); _next_rowid += num_rows; + if (_opts.need_zone_map) { + RETURN_IF_ERROR(_column_zone_map_builder->add(nullptr, 1)); + } return Status::OK(); } @@ -131,6 +137,9 @@ Status ColumnWriter::_append_data(const uint8_t** ptr, size_t num_rows) { while (remaining > 0) { size_t num_written = remaining; RETURN_IF_ERROR(_page_builder->add(*ptr, &num_written)); + if (_opts.need_zone_map) { + RETURN_IF_ERROR(_column_zone_map_builder->add(*ptr, num_written)); + } bool is_page_full = (num_written < remaining); remaining -= num_written; @@ -142,8 +151,6 @@ Status ColumnWriter::_append_data(const uint8_t** ptr, size_t num_rows) { _null_bitmap_builder->add_run(false, num_written); } - // TODO(zc): update statistics for this page - if (is_page_full) { RETURN_IF_ERROR(_finish_current_page()); } @@ -161,6 +168,9 @@ Status ColumnWriter::append_nullable( if (is_null) { _null_bitmap_builder->add_run(true, this_run); _next_rowid += this_run; + if (_opts.need_zone_map) { + RETURN_IF_ERROR(_column_zone_map_builder->add(nullptr, 1)); + } } else { RETURN_IF_ERROR(_append_data(&ptr, this_run)); } @@ -178,24 +188,33 @@ Status ColumnWriter::write_data() { RETURN_IF_ERROR(_write_data_page(page)); page = page->next; } - // write ordinal index - // auto slice = _ordinal_index_builer->finish(); - // file->append return Status::OK(); } Status ColumnWriter::write_ordinal_index() { - Slice data = _ordinal_index_builer->finish(); + Slice data = _ordinal_index_builder->finish(); std::vector slices{data}; return _write_physical_page(&slices, &_ordinal_index_pp); } +Status ColumnWriter::write_zone_map() { + if (_opts.need_zone_map) { + Slice data = _column_zone_map_builder->finish(); + std::vector slices{data}; + return _write_physical_page(&slices, &_zone_map_pp); + } + return Status::OK(); +} + void ColumnWriter::write_meta(ColumnMetaPB* meta) { meta->set_type(_type_info->type()); meta->set_encoding(_opts.encoding_type); meta->set_compression(_opts.compression_type); meta->set_is_nullable(_is_nullable); _ordinal_index_pp.to_proto(meta->mutable_ordinal_index_page()); + if (_opts.need_zone_map) { + _zone_map_pp.to_proto(meta->mutable_zone_map_page()); + } } // write a page into file and update ordinal index @@ -219,8 +238,7 @@ Status ColumnWriter::_write_data_page(Page* page) { PagePointer pp; RETURN_IF_ERROR(_write_physical_page(&origin_data, &pp)); - - _ordinal_index_builer->append_entry(page->first_rowid, pp); + _ordinal_index_builder->append_entry(page->first_rowid, pp); return Status::OK(); } @@ -237,7 +255,7 @@ Status ColumnWriter::_write_physical_page(std::vector* origin_data, PageP output_data = &compressed_data; } - // always compute checksum + // checksum uint8_t checksum_buf[sizeof(uint32_t)]; uint32_t checksum = HashUtil::crc_hash(*output_data, 0); encode_fixed32_le(checksum_buf, checksum); @@ -285,6 +303,9 @@ Status ColumnWriter::_finish_current_page() { _last_first_rowid = _next_rowid; _push_back_page(page); + if (_opts.need_zone_map) { + RETURN_IF_ERROR(_column_zone_map_builder->flush()); + } return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index f98e38743d..b32f93ef03 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -25,6 +25,7 @@ #include "util/slice.h" // for slice #include "olap/rowset/segment_v2/common.h" // for rowid_t #include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer +#include "olap/rowset/segment_v2/column_zone_map.h" // for ColumnZoneMapBuilder namespace doris { @@ -41,6 +42,7 @@ struct ColumnWriterOptions { // store compressed page only when space saving is above the threshold. // space saving = 1 - compressed_size / uncompressed_size double compression_min_space_saving = 0.1; + bool need_zone_map = false; }; class EncodingInfo; @@ -91,6 +93,7 @@ public: // write all data into file Status write_data(); Status write_ordinal_index(); + Status write_zone_map(); void write_meta(ColumnMetaPB* meta); private: @@ -130,6 +133,7 @@ private: Status _write_data_page(Page* page); Status _write_physical_page(std::vector* origin_data, PagePointer* pp); + private: ColumnWriterOptions _opts; const TypeInfo* _type_info = nullptr; @@ -146,9 +150,11 @@ private: std::unique_ptr _page_builder; std::unique_ptr _null_bitmap_builder; - std::unique_ptr _ordinal_index_builer; + std::unique_ptr _ordinal_index_builder; + std::unique_ptr _column_zone_map_builder; PagePointer _ordinal_index_pp; + PagePointer _zone_map_pp; uint64_t _written_size = 0; }; diff --git a/be/src/olap/rowset/segment_v2/column_zone_map.cpp b/be/src/olap/rowset/segment_v2/column_zone_map.cpp new file mode 100644 index 0000000000..50e901aa64 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/column_zone_map.cpp @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/column_zone_map.h" + +#include "olap/olap_define.h" + +namespace doris { + +namespace segment_v2 { + +ColumnZoneMapBuilder::ColumnZoneMapBuilder(const TypeInfo* type_info) : _type_info(type_info) { + PageBuilderOptions options; + options.data_page_size = 0; + _page_builder.reset(new BinaryPlainPageBuilder(options)); + _field.reset(Field::create_by_type(_type_info->type())); + _max_string_value = _arena.Allocate(OLAP_STRING_MAX_LENGTH); + _zone_map.min_value = _arena.Allocate(_type_info->size()); + _zone_map.max_value = _arena.Allocate(_type_info->size()); + _reset_zone_map(); +} + +Status ColumnZoneMapBuilder::add(const uint8_t *vals, size_t count) { + if (vals != nullptr) { + for (int i = 0; i < count; ++i) { + if (_field->compare(_zone_map.min_value, (char *)vals) > 0) { + _field->deep_copy_content(_zone_map.min_value, (const char *)vals, &_arena); + } + if (_field->compare(_zone_map.max_value, (char *)vals) < 0) { + _field->deep_copy_content(_zone_map.max_value, (const char *)vals, &_arena); + } + vals += _type_info->size(); + if (!_zone_map.has_not_null) { + _zone_map.has_not_null = true; + } + } + } + else { + if (!_zone_map.has_null) { + _zone_map.has_null = true; + } + } + return Status::OK(); +} + +Status ColumnZoneMapBuilder::flush() { + ZoneMapPB page_zone_map; + page_zone_map.set_min(_field->to_string(_zone_map.min_value)); + page_zone_map.set_max(_field->to_string(_zone_map.max_value)); + page_zone_map.set_has_null(_zone_map.has_null); + page_zone_map.set_has_not_null(_zone_map.has_not_null); + std::string serialized_zone_map; + bool ret = page_zone_map.SerializeToString(&serialized_zone_map); + if (!ret) { + return Status::InternalError("serialize zone map failed"); + } + Slice data(serialized_zone_map.data(), serialized_zone_map.size()); + size_t num = 1; + RETURN_IF_ERROR(_page_builder->add((const uint8_t *)&data, &num)); + // reset the variables + // we should allocate max varchar length and set to max for min value + _reset_zone_map(); + return Status::OK(); +} + +void ColumnZoneMapBuilder::_reset_zone_map() { + // we should allocate max varchar length and set to max for min value + Slice *min_slice = (Slice *)_zone_map.min_value; + min_slice->data = _max_string_value; + min_slice->size = OLAP_STRING_MAX_LENGTH; + _field->set_to_max(_zone_map.min_value); + _field->set_to_min(_zone_map.max_value); + _zone_map.has_null = false; + _zone_map.has_not_null = false; +} + +Status ColumnZoneMap::load() { + BinaryPlainPageDecoder page_decoder(_data); + RETURN_IF_ERROR(page_decoder.init()); + _num_pages = page_decoder.count(); + _page_zone_maps.resize(_num_pages); + for (int i = 0; i < _num_pages; ++i) { + Slice data = page_decoder.string_at_index(i); + bool ret = _page_zone_maps[i].ParseFromString(std::string(data.data, data.size)); + if (!ret) { + return Status::Corruption("parse zone map failed"); + } + } + return Status::OK(); +} + +} // namespace segment_v2 +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/column_zone_map.h b/be/src/olap/rowset/segment_v2/column_zone_map.h new file mode 100644 index 0000000000..760e887ba6 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/column_zone_map.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "util/slice.h" +#include "olap/field.h" +#include "gen_cpp/segment_v2.pb.h" +#include "olap/rowset/segment_v2/binary_plain_page.h" + +namespace doris { + +namespace segment_v2 { + +struct ZoneMap { + // min value of zone + char* min_value = nullptr; + // max value of zone + char* max_value = nullptr; + + // if both has_null and has_not_null is false, means no rows. + // if has_null is true and has_not_null is false, means all rows is null. + // if has_null is false and has_not_null is true, means all rows is not null. + // if has_null is true and has_not_null is true, means some rows is null and others are not. + // has_null means whether zone has null value + bool has_null = false; + // has_not_null means whether zone has none-null value + bool has_not_null = false; +}; + +// This class encode column pages' zone map. +// The binary is encoded by BinaryPlainPageBuilder +class ColumnZoneMapBuilder { +public: + ColumnZoneMapBuilder(const TypeInfo* type_info); + + Status add(const uint8_t* vals, size_t count); + + Status flush(); + + Slice finish() { + return _page_builder->finish(); + } + +private: + void _reset_zone_map(); + +private: + const TypeInfo* _type_info; + std::unique_ptr _page_builder; + std::unique_ptr _field; + // memory will be managed by arena + ZoneMap _zone_map; + char* _max_string_value; + Arena _arena; +}; + +// ColumnZoneMap +class ColumnZoneMap { +public: + ColumnZoneMap(const Slice& data) : _data(data), _num_pages(0) { } + + Status load(); + + const std::vector& get_column_zone_map() const { + return _page_zone_maps; + } + + int32_t num_pages() const { + return _num_pages; + } + +private: + Slice _data; + + // valid after load + int32_t _num_pages; + std::vector _page_zone_maps; +}; + +} // namespace segment_v2 +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index e8388a4459..3ab23b63bc 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -33,7 +33,8 @@ Status OrdinalPageIndex::load() { _num_pages = decode_fixed32_le(ptr); ptr += 4; - _rowids = new rowid_t[_num_pages]; + // add a additional rowid for row id compute convenience + _rowids = new rowid_t[_num_pages + 1]; _pages = new PagePointer[_num_pages]; for (int i = 0; i < _num_pages; ++i) { ptr = decode_varint32_ptr(ptr, limit, &_rowids[i]); @@ -45,6 +46,8 @@ Status OrdinalPageIndex::load() { return Status::InternalError("Data corruption"); } } + // set the additional last row id as number of rows + _rowids[_num_pages] = _num_rows; return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.h b/be/src/olap/rowset/segment_v2/ordinal_page_index.h index efe21fdd7a..b1c991fdbb 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.h +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.h @@ -33,7 +33,7 @@ namespace segment_v2 { // the binary format is like that // Header | Content // Header: -// number of elements (4 Bytes) +// number of pages (4 Bytes) // Content: // array of index_pair // index_pair: @@ -43,7 +43,7 @@ class OrdinalPageIndexBuilder { public: OrdinalPageIndexBuilder() : _num_pages(0) { _buffer.reserve(4 * 1024); - // reserve space for number of elements + // reserve space for number of pages _buffer.resize(4); } @@ -56,7 +56,7 @@ public: } Slice finish() { - // encoded number of elements + // encoded number of pages encode_fixed32_le((uint8_t*)_buffer.data(), _num_pages); return Slice(_buffer); } @@ -75,6 +75,7 @@ public: inline bool valid() const; inline void next(); inline rowid_t rowid() const; + inline int32_t cur_idx() const; inline const PagePointer& page() const; private: OrdinalPageIndex* _index; @@ -84,8 +85,8 @@ private: // Page index class OrdinalPageIndex { public: - OrdinalPageIndex(const Slice& data) - : _data(data), _num_pages(0), _rowids(nullptr), _pages(nullptr) { + OrdinalPageIndex(const Slice& data, uint64_t num_rows) + : _data(data), _num_rows(num_rows), _num_pages(0), _rowids(nullptr), _pages(nullptr) { } ~OrdinalPageIndex(); @@ -98,17 +99,33 @@ public: OrdinalPageIndexIterator end() { return OrdinalPageIndexIterator(this, _num_pages); } + rowid_t get_first_row_id(int page_index) const { + return _rowids[page_index]; + } + + rowid_t get_last_row_id(int page_index) const { + // because add additional number of rows as the last rowid + // so just return next_page_first_id - 1 + int next_page_index = page_index + 1; + return get_first_row_id(next_page_index) - 1; + } + + int32_t num_pages() const { + return _num_pages; + } private: - uint32_t _header_size() const { return 4; } + uint32_t _header_size() const { return 8; } private: friend OrdinalPageIndexIterator; Slice _data; + uint64_t _num_rows; // valid after laod int32_t _num_pages; + // the last row id is additional, set to number of rows rowid_t* _rowids; PagePointer* _pages; }; @@ -126,6 +143,10 @@ inline rowid_t OrdinalPageIndexIterator::rowid() const { return _index->_rowids[_cur_idx]; } +int32_t OrdinalPageIndexIterator::cur_idx() const { + return _cur_idx; +} + inline const PagePointer& OrdinalPageIndexIterator::page() const { return _index->_pages[_cur_idx]; } diff --git a/be/src/olap/rowset/segment_v2/row_ranges.h b/be/src/olap/rowset/segment_v2/row_ranges.h new file mode 100644 index 0000000000..a0664e86cb --- /dev/null +++ b/be/src/olap/rowset/segment_v2/row_ranges.h @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/logging.h" +#include "olap/rowset/segment_v2/common.h" +#include "gutil/strings/substitute.h" + +namespace doris { +namespace segment_v2 { + +// RowRange stands for range[From, To), From is inclusive, +// To is exclusive. It is used for row id range calculation. +class RowRange { +public: + // Returns true if two ranges are overlapped or false. + // The union range will be returned through range. + static bool range_union(const RowRange& left, const RowRange& right, RowRange* range) { + if (left._from <= right._from) { + if (left._to >= right._from) { + range->_from = left._from; + range->_to = std::max(left._to, right._to); + return true; + } + } else if (right._to >= left._from) { + range->_from = right._from; + range->_to = std::max(left._to, right._to); + return true; + } + // return a invalid range + range->_from = 0; + range->_to = 0; + return false; + } + + // Returns true if the two ranges are overlapped or false. + // The intersection of the two ranges is returned through range. + static bool range_intersection(const RowRange& left, const RowRange& right, RowRange* range) { + if (left._from <= right._from) { + if (left._to > right._from) { + range->_from = right._from; + range->_to = std::min(left._to, right._to); + return true; + } + } else if (right._to > left._from) { + range->_from = left._from; + range->_to = std::min(left._to, right._to); + return true; + } + // return a invalid range + range->_from = 0; + range->_to = 0; + return false; + } + + RowRange() : _from(0), _to(0) { } + + // Creates a range of [from, to) (from inclusive and to exclusive; empty ranges are invalid) + RowRange(int64_t from, int64_t to) : _from(from), _to(to) { } + + bool is_valid() const { + return _from < _to; + } + + size_t count() const { + return _to - _from; + } + + bool is_before(const RowRange& other) const { + return _to <= other._from; + } + + bool is_after(const RowRange& other) const { + return _from >= other._to; + } + + int64_t from() const { + return _from; + } + + int64_t to() const { + return _to; + } + + std::string to_string() const { + return strings::Substitute("[$0-$1)", _from, _to); + } + +private: + int64_t _from; + int64_t _to; +}; + +class RowRanges { +public: + RowRanges() : _count(0) { } + + // Creates a new RowRanges object with the single range [0, row_count). + static RowRanges create_single(uint64_t row_count) { + RowRanges ranges; + ranges.add(RowRange(0, row_count)); + return ranges; + } + + // Creates a new RowRanges object with the single range [from, to). + static RowRanges create_single(int64_t from, int64_t to) { + DCHECK(from <= to); + RowRanges ranges; + ranges.add(RowRange(from, to)); + return ranges; + } + + + // Calculates the union of the two specified RowRanges object. The union of two range is calculated if there are + // elements between them. Otherwise, the two disjunct ranges are stored separately. + // For example: + // [113, 241) ∪ [221, 340) = [113, 340) + // [113, 230) ∪ [230, 340) = [113, 340] + // while + // [113, 230) ∪ [231, 340) = [113, 230), [231, 340) + static void ranges_union(const RowRanges& left, const RowRanges& right, RowRanges* result) { + RowRanges tmp_range; + auto it1 = left._ranges.begin(); + auto it2 = right._ranges.begin(); + // merge and add + while (it1 != left._ranges.end() && it2 != right._ranges.end()) { + if (it1->is_after(*it2)) { + tmp_range.add(*it2); + ++it2; + } else { + tmp_range.add(*it1); + ++it1; + } + } + while (it1 != left._ranges.end()) { + tmp_range.add(*it1); + ++it1; + } + while (it2 != right._ranges.end()) { + tmp_range.add(*it2); + ++it2; + } + *result = std::move(tmp_range); + } + + + // Calculates the intersection of the two specified RowRanges object. Two ranges intersect if they have common + // elements otherwise the result is empty. + // For example: + // [113, 241) ∩ [221, 340) = [221, 241) + // while + // [113, 230) ∩ [230, 340) = + // + // The result RowRanges object will contain all the row indexes there were contained in both of the specified objects + static void ranges_intersection(const RowRanges& left, const RowRanges& right, RowRanges* result) { + RowRanges tmp_range; + int right_index = 0; + for (auto it1 = left._ranges.begin(); it1 != left._ranges.end(); ++it1) { + const RowRange& range1 = *it1; + for (int i = right_index; i < right._ranges.size(); ++i) { + const RowRange& range2 = right._ranges[i]; + if (range1.is_before(range2)) { + break; + } else if (range1.is_after(range2)) { + right_index = i + 1; + continue; + } + RowRange merge_range; + bool ret = RowRange::range_intersection(range1, range2, &merge_range); + DCHECK(ret); + tmp_range.add(merge_range); + } + } + *result = std::move(tmp_range); + } + + size_t count() { + return _count; + } + + bool is_empty() { + return _count == 0; + } + + bool contain(rowid_t from, rowid_t to) { + // binary search + RowRange tmp_range = RowRange(from, to); + int32_t start = 0; + int32_t end = _ranges.size(); + while (start <= end) { + int32_t mid = (start + end) / 2; + if (_ranges[mid].is_before(tmp_range)) { + start = mid; + } else if (_ranges[mid].is_after(tmp_range)) { + end = mid - 1; + } else { + return true; + } + } + return false; + } + + int64_t from() { + DCHECK(!is_empty()); + return _ranges[0].from(); + } + + int64_t to() { + DCHECK(!is_empty()); + return _ranges[_ranges.size() - 1].to(); + } + + size_t range_size() { + return _ranges.size(); + } + + int64_t get_range_from(size_t range_index) { + return _ranges[range_index].from(); + } + + int64_t get_range_to(size_t range_index) { + return _ranges[range_index].to(); + } + + size_t get_range_count(size_t range_index) { + return _ranges[range_index].count(); + } + + std::string to_string() { + std::string result; + for (auto range : _ranges) { + result += range.to_string() + " "; + } + return result; + } + +private: + // Adds a range to the end of the list of ranges. It maintains the disjunct ascending order(*) of the ranges by + // trying to union the specified range to the last ranges in the list. The specified range shall be larger(*) than + // the last one or might be overlapped with some of the last ones. + void add(const RowRange& range) { + RowRange range_to_add = range; + for (int i = _ranges.size() - 1; i >= 0; --i) { + const RowRange last = _ranges[i]; + DCHECK(!last.is_after(range)); + RowRange u; + bool ret = RowRange::range_union(last, range_to_add, &u); + if (!ret) { + // range do not intersect with the last + break; + } + range_to_add = u; + _ranges.erase(_ranges.begin() + i); + _count -= last.count(); + } + _ranges.emplace_back(range_to_add); + _count += range_to_add.count(); + } + +private: + std::vector _ranges; + size_t _count; +}; + +} // namespace segment_v2 +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 63c6117058..a42e2c5513 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -163,7 +163,7 @@ Status Segment::_initial_column_readers() { ColumnReaderOptions opts; std::unique_ptr reader( - new ColumnReader(opts, _footer.columns(iter->second), _input_file.get())); + new ColumnReader(opts, _footer.columns(iter->second), _footer.num_rows(), _input_file.get())); RETURN_IF_ERROR(reader->init()); _column_readers[ordinal] = reader.release(); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 8ad70a1d52..fe1bcbe6c0 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -20,6 +20,7 @@ #include #include "gutil/strings/substitute.h" +#include "util/doris_metrics.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/rowset/segment_v2/column_reader.h" #include "olap/row_block2.h" @@ -35,7 +36,9 @@ SegmentIterator::SegmentIterator(std::shared_ptr segment, const Schema& schema) : _segment(std::move(segment)), _schema(schema), - _column_iterators(_schema.num_columns(), nullptr) { + _cur_range_id(0), + _column_iterators(_schema.num_columns(), nullptr), + _cur_rowid(0) { } SegmentIterator::~SegmentIterator() { @@ -45,16 +48,26 @@ SegmentIterator::~SegmentIterator() { } Status SegmentIterator::init(const StorageReadOptions& opts) { + DorisMetrics::segment_read_total.increment(1); _opts = opts; RETURN_IF_ERROR(_init_short_key_range()); + RETURN_IF_ERROR(_init_row_ranges()); + if (!_row_ranges.is_empty()) { + _cur_range_id = 0; + _cur_rowid = _row_ranges.get_range_from(_cur_range_id); + } RETURN_IF_ERROR(_init_column_iterators()); + return Status::OK(); } // This function will use input key bounds to get a row range. Status SegmentIterator::_init_short_key_range() { + DorisMetrics::segment_row_total.increment(num_rows()); _lower_rowid = 0; _upper_rowid = num_rows(); + // initial short key row ranges: [0, num_rows()) + _row_ranges = RowRanges::create_single(_lower_rowid, _upper_rowid); // fast path for empty segment if (_upper_rowid == 0) { @@ -79,6 +92,9 @@ Status SegmentIterator::_init_short_key_range() { RETURN_IF_ERROR(_lookup_ordinal( *_opts.lower_bound, _opts.include_lower_bound, _upper_rowid, &_lower_rowid)); } + // seeked short key row ranges: [_lower_rowid, _upper_rowid) + _row_ranges = RowRanges::create_single(_lower_rowid, _upper_rowid); + DorisMetrics::segment_rows_by_short_key.increment(_upper_rowid - _lower_rowid); return Status::OK(); } @@ -114,8 +130,45 @@ Status SegmentIterator::_prepare_seek() { return Status::OK(); } +Status SegmentIterator::_init_row_ranges() { + if (_lower_rowid == _upper_rowid) { + // no data just return; + return Status::OK(); + } + + if (_opts.conditions != nullptr) { + RowRanges zone_map_row_ranges; + RETURN_IF_ERROR(_get_row_ranges_from_zone_map(&zone_map_row_ranges)); + RowRanges::ranges_intersection(_row_ranges, zone_map_row_ranges, &_row_ranges); + // TODO(hkp): get row ranges from bloom filter and secondary index + } + + // TODO(hkp): calculate filter rate to decide whether to + // use zone map/bloom filter/secondary index or not. + return Status::OK(); +} + +Status SegmentIterator::_get_row_ranges_from_zone_map(RowRanges* zone_map_row_ranges) { + RowRanges origin_row_ranges = RowRanges::create_single(num_rows()); + for (auto& column_condition : _opts.conditions->columns()) { + int32_t column_id = column_condition.first; + // get row ranges from zone map + if (!_segment->_column_readers[column_id]->has_zone_map()) { + // there is no zone map for this column + continue; + } + // get row ranges by zone map of this column + RowRanges column_zone_map_row_ranges; + _segment->_column_readers[column_id]->get_row_ranges_by_zone_map(column_condition.second, &column_zone_map_row_ranges); + // intersection different columns's row ranges to get final row ranges by zone map + RowRanges::ranges_intersection(origin_row_ranges, column_zone_map_row_ranges, &origin_row_ranges); + } + *zone_map_row_ranges = std::move(origin_row_ranges); + DorisMetrics::segment_rows_read_by_zone_map.increment(zone_map_row_ranges->count()); + return Status::OK(); +} + Status SegmentIterator::_init_column_iterators() { - _cur_rowid = _lower_rowid; if (_cur_rowid >= num_rows()) { return Status::OK(); } @@ -226,7 +279,7 @@ Status SegmentIterator::_next_batch(RowBlockV2* block, size_t* rows_read) { size_t first_read = 0; for (int i = 0; i < block->schema()->column_ids().size(); ++i) { auto cid = block->schema()->column_ids()[i]; - size_t num_rows = has_read ? first_read : block->num_rows(); + size_t num_rows = has_read ? first_read : *rows_read; auto column_block = block->column_block(i); RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&num_rows, &column_block)); if (!has_read) { @@ -244,13 +297,38 @@ Status SegmentIterator::_next_batch(RowBlockV2* block, size_t* rows_read) { } Status SegmentIterator::next_batch(RowBlockV2* block) { - size_t rows_to_read = std::min((rowid_t)block->capacity(), _upper_rowid - _cur_rowid); - block->resize(rows_to_read); - if (rows_to_read == 0) { - return Status::OK(); + if (_row_ranges.is_empty() || _cur_rowid >= _row_ranges.to()) { + block->resize(0); + return Status::EndOfFile("no more data in segment"); + } + size_t rows_to_read = block->capacity(); + while (rows_to_read > 0) { + if (_cur_rowid >= _row_ranges.get_range_to(_cur_range_id)) { + // current row range is read over, + if (_cur_range_id >= _row_ranges.range_size() - 1) { + // there is no more row range + break; + } + // step to next row range + ++_cur_range_id; + _cur_rowid = _row_ranges.get_range_from(_cur_range_id); + if (_row_ranges.get_range_count(_cur_range_id) == 0) { + // current row range is empty, just skip seek + continue; + } + for (auto cid : block->schema()->column_ids()) { + RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_cur_rowid)); + } + } + size_t to_read_in_range = std::min(rows_to_read, size_t(_row_ranges.get_range_to(_cur_range_id) - _cur_rowid)); + RETURN_IF_ERROR(_next_batch(block, &to_read_in_range)); + _cur_rowid += to_read_in_range; + rows_to_read -= to_read_in_range; + } + block->resize(block->capacity() - rows_to_read); + if (block->num_rows() == 0) { + return Status::EndOfFile("no more data in segment"); } - RETURN_IF_ERROR(_next_batch(block, &rows_to_read)); - _cur_rowid += rows_to_read; return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 46a1e696b3..0b2494c055 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -25,6 +25,10 @@ #include "olap/rowset/segment_v2/segment.h" #include "olap/iterators.h" #include "olap/schema.h" +#include "olap/rowset/segment_v2/row_ranges.h" +#include "olap/rowset/segment_v2/column_zone_map.h" +#include "olap/rowset/segment_v2/ordinal_page_index.h" +#include "olap/olap_cond.h" #include "util/arena.h" namespace doris { @@ -47,6 +51,8 @@ public: private: Status _init_short_key_range(); Status _prepare_seek(); + Status _init_row_ranges(); + Status _get_row_ranges_from_zone_map(RowRanges* zone_map_row_ranges); Status _init_column_iterators(); Status _create_column_iterator(uint32_t cid, ColumnIterator** iter); @@ -65,6 +71,10 @@ private: StorageReadOptions _opts; + // row ranges to scan + size_t _cur_range_id; + RowRanges _row_ranges; + // Only used when init is called, help to finish seek_and_peek. // Data will be saved in this batch std::unique_ptr _seek_schema; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 79069f8b34..203acd685c 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -65,6 +65,10 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec) { ColumnWriterOptions opts; opts.compression_type = segment_v2::CompressionTypePB::LZ4F; + // now we create zone map for key columns + if (column.is_key()) { + opts.need_zone_map = true; + } std::unique_ptr writer(new ColumnWriter(opts, type_info, is_nullable, _output_file.get())); RETURN_IF_ERROR(writer->init()); _column_writers.push_back(writer.release()); @@ -104,6 +108,7 @@ Status SegmentWriter::finalize(uint32_t* segment_file_size) { RETURN_IF_ERROR(_write_raw_data({k_segment_magic})); RETURN_IF_ERROR(_write_data()); RETURN_IF_ERROR(_write_ordinal_index()); + RETURN_IF_ERROR(_write_zone_map()); RETURN_IF_ERROR(_write_short_key_index()); RETURN_IF_ERROR(_write_footer()); return Status::OK(); @@ -125,6 +130,13 @@ Status SegmentWriter::_write_ordinal_index() { return Status::OK(); } +Status SegmentWriter::_write_zone_map() { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->write_zone_map()); + } + return Status::OK(); +} + Status SegmentWriter::_write_short_key_index() { std::vector slices; // TODO(zc): we should get segment_size diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 464a215370..9b05ce3f7b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -65,6 +65,7 @@ public: private: Status _write_data(); Status _write_ordinal_index(); + Status _write_zone_map(); Status _write_short_key_index(); Status _write_footer(); Status _write_raw_data(const std::vector& slices); diff --git a/be/src/olap/stream_index_common.cpp b/be/src/olap/stream_index_common.cpp index 61aa806e63..242944a6b1 100755 --- a/be/src/olap/stream_index_common.cpp +++ b/be/src/olap/stream_index_common.cpp @@ -38,14 +38,15 @@ ColumnStatistics::~ColumnStatistics() { OLAPStatus ColumnStatistics::init(const FieldType& type, bool null_supported) { SAFE_DELETE(_minimum); SAFE_DELETE(_maximum); - // 当数据类型为 String和varchar或是未知类型时,实际上不会有统计信息。 - _minimum = WrapperField::create_by_type(type); - _maximum = WrapperField::create_by_type(type); _null_supported = null_supported; - if (NULL == _minimum || NULL == _maximum) { + if (type == OLAP_FIELD_TYPE_CHAR + || type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_HLL) { _ignored = true; } else { + // 当数据类型为 String和varchar或是未知类型时,实际上不会有统计信息。 + _minimum = WrapperField::create_by_type(type); + _maximum = WrapperField::create_by_type(type); _ignored = false; reset(); } diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp index 41adba80b9..87ef27b643 100644 --- a/be/src/olap/types.cpp +++ b/be/src/olap/types.cpp @@ -25,7 +25,7 @@ TypeInfo::TypeInfo(TypeTraitsClass t) _cmp(TypeTraitsClass::cmp), _shallow_copy(TypeTraitsClass::shallow_copy), _deep_copy(TypeTraitsClass::deep_copy), - _copy_with_arena(TypeTraitsClass::copy_with_arena), + _deep_copy_with_arena(TypeTraitsClass::deep_copy_with_arena), _direct_copy(TypeTraitsClass::direct_copy), _from_string(TypeTraitsClass::from_string), _to_string(TypeTraitsClass::to_string), diff --git a/be/src/olap/types.h b/be/src/olap/types.h index 9d81fbbcb0..d85f365fb8 100644 --- a/be/src/olap/types.h +++ b/be/src/olap/types.h @@ -56,8 +56,8 @@ public: _deep_copy(dest, src, mem_pool); } - inline void copy_with_arena(void* dest, const void* src, Arena* arena) const { - _copy_with_arena(dest, src, arena); + inline void deep_copy_with_arena(void* dest, const void* src, Arena* arena) const { + _deep_copy_with_arena(dest, src, arena); } inline void direct_copy(void* dest, const void* src) const { @@ -83,7 +83,7 @@ private: void (*_shallow_copy)(void* dest, const void* src); void (*_deep_copy)(void* dest, const void* src, MemPool* mem_pool); - void (*_copy_with_arena)(void* dest, const void* src, Arena* arena); + void (*_deep_copy_with_arena)(void* dest, const void* src, Arena* arena); void (*_direct_copy)(void* dest, const void* src); OLAPStatus (*_from_string)(void* buf, const std::string& scan_key); @@ -193,7 +193,7 @@ struct BaseFieldtypeTraits : public CppTypeTraits { *reinterpret_cast(dest) = *reinterpret_cast(src); } - static inline void copy_with_arena(void* dest, const void* src, Arena* arena) { + static inline void deep_copy_with_arena(void* dest, const void* src, Arena* arena) { *reinterpret_cast(dest) = *reinterpret_cast(src); } @@ -343,7 +343,7 @@ struct FieldTypeTraits : public BaseFieldtypeTraits
    (dest) = *reinterpret_cast(src); } - static void copy_with_arena(void* dest, const void* src, Arena* arena) { + static void deep_copy_with_arena(void* dest, const void* src, Arena* arena) { *reinterpret_cast(dest) = *reinterpret_cast(src); } static void direct_copy(void* dest, const void* src) { @@ -538,7 +538,7 @@ struct FieldTypeTraits : public BaseFieldtypeTraitsdata, r_slice->data, r_slice->size); l_slice->size = r_slice->size; } - static void copy_with_arena(void* dest, const void* src, Arena* arena) { + static void deep_copy_with_arena(void* dest, const void* src, Arena* arena) { auto l_slice = reinterpret_cast(dest); auto r_slice = reinterpret_cast(src); l_slice->data = reinterpret_cast(arena->Allocate(r_slice->size)); diff --git a/be/src/olap/wrapper_field.cpp b/be/src/olap/wrapper_field.cpp index b137bd2a5f..a1b251d174 100644 --- a/be/src/olap/wrapper_field.cpp +++ b/be/src/olap/wrapper_field.cpp @@ -19,6 +19,8 @@ namespace doris { +const size_t DEFAULT_STRING_LENGTH = 50; + WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) { bool is_string_type = (column.type() == OLAP_FIELD_TYPE_CHAR || column.type() == OLAP_FIELD_TYPE_VARCHAR || column.type() == OLAP_FIELD_TYPE_HLL); @@ -37,6 +39,9 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) { if (column.type() == OLAP_FIELD_TYPE_CHAR) { variable_len = std::max(len, (uint32_t)(column.length())); } else if (column.type() == OLAP_FIELD_TYPE_VARCHAR || column.type() == OLAP_FIELD_TYPE_HLL) { + // column.length is the serialized varchar length + // the first sizeof(StringLengthType) bytes is the length of varchar + // variable_len is the real length of varchar variable_len = std::max(len, static_cast(column.length() - sizeof(StringLengthType))); } else { @@ -60,18 +65,19 @@ WrapperField* WrapperField::create_by_type(const FieldType& type) { } WrapperField::WrapperField(Field* rep, size_t variable_len, bool is_string_type) - : _rep(rep), _is_string_type(is_string_type) { + : _rep(rep), _is_string_type(is_string_type), _var_length(0) { size_t fixed_len = _rep->size(); - _length = fixed_len + variable_len + 1; + _length = fixed_len + 1; _field_buf = new char[_length]; memset(_field_buf, 0, _length); _owned_buf = _field_buf; char* buf = _field_buf + 1; if (_is_string_type) { + size_t _var_length = variable_len > 0 ? variable_len : DEFAULT_STRING_LENGTH; Slice* slice = reinterpret_cast(buf); - slice->size = variable_len; - slice->data = buf + fixed_len; + slice->size = _var_length; + slice->data = _arena.Allocate(_var_length); } } diff --git a/be/src/olap/wrapper_field.h b/be/src/olap/wrapper_field.h index ea41363160..1806bf86ee 100644 --- a/be/src/olap/wrapper_field.h +++ b/be/src/olap/wrapper_field.h @@ -40,13 +40,22 @@ public: // 将内部的value转成string输出 // 没有考虑实现的性能,仅供DEBUG使用 + // do not include the null flag std::string to_string() const { return _rep->to_string(_field_buf + 1); } // 从传入的字符串反序列化field的值 // 参数必须是一个\0结尾的字符串 + // do not include the null flag OLAPStatus from_string(const std::string& value_string) { + if (_is_string_type) { + if (value_string.size() > _var_length) { + Slice* slice = reinterpret_cast(cell_ptr()); + slice->size = value_string.size(); + slice->data = _arena.Allocate(slice->size); + } + } return _rep->from_string(_field_buf + 1, value_string); } @@ -87,6 +96,11 @@ public: _rep->direct_copy(this, *field); } + void copy(const char* value) { + set_is_null(false); + _rep->deep_copy_content((char*)cell_ptr(), value, &_arena); + } + private: Field* _rep = nullptr; @@ -96,6 +110,8 @@ private: //include fixed and variable length and null bytes size_t _length; + size_t _var_length; + Arena _arena; }; } diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 7efdad8dee..8e71f42c36 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -87,6 +87,11 @@ IntCounter DorisMetrics::meta_write_request_duration_us; IntCounter DorisMetrics::meta_read_request_total; IntCounter DorisMetrics::meta_read_request_duration_us; +IntCounter DorisMetrics::segment_read_total; +IntCounter DorisMetrics::segment_row_total; +IntCounter DorisMetrics::segment_rows_by_short_key; +IntCounter DorisMetrics::segment_rows_read_by_zone_map; + IntCounter DorisMetrics::txn_begin_request_total; IntCounter DorisMetrics::txn_commit_request_total; IntCounter DorisMetrics::txn_rollback_request_total; @@ -219,6 +224,19 @@ void DorisMetrics::initialize( "meta_request_duration", MetricLabels().add("type", "read"), &meta_read_request_duration_us); + _metrics->register_metric( + "segment_read", MetricLabels().add("type", "segment_total_read_times"), + &segment_read_total); + _metrics->register_metric( + "segment_read", MetricLabels().add("type", "segment_total_row_num"), + &segment_row_total); + _metrics->register_metric( + "segment_read", MetricLabels().add("type", "segment_rows_by_short_key"), + &segment_rows_by_short_key); + _metrics->register_metric( + "segment_read", MetricLabels().add("type", "segment_rows_read_by_zone_map"), + &segment_rows_read_by_zone_map); + _metrics->register_metric( "txn_request", MetricLabels().add("type", "begin"), &txn_begin_request_total); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index c9ba75b2c0..7e0f3bdb4b 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -107,6 +107,11 @@ public: static IntCounter meta_read_request_total; static IntCounter meta_read_request_duration_us; + static IntCounter segment_read_total; + static IntCounter segment_row_total; + static IntCounter segment_rows_by_short_key; + static IntCounter segment_rows_read_by_zone_map; + static IntCounter txn_begin_request_total; static IntCounter txn_commit_request_total; static IntCounter txn_rollback_request_total; diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 3e9470a276..2b3056ed39 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -55,6 +55,8 @@ ADD_BE_TEST(rowset/segment_v2/ordinal_page_index_test) ADD_BE_TEST(rowset/segment_v2/rle_page_test) ADD_BE_TEST(rowset/segment_v2/binary_dict_page_test) ADD_BE_TEST(rowset/segment_v2/segment_test) +ADD_BE_TEST(rowset/segment_v2/column_zone_map_test) +ADD_BE_TEST(rowset/segment_v2/row_ranges_test) ADD_BE_TEST(tablet_meta_manager_test) ADD_BE_TEST(tablet_mgr_test) ADD_BE_TEST(rowset/rowset_meta_manager_test) diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp index 95de39702f..cfdc90500b 100644 --- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp @@ -59,6 +59,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s ColumnWriterOptions writer_opts; writer_opts.encoding_type = encoding; writer_opts.compression_type = segment_v2::CompressionTypePB::LZ4F; + writer_opts.need_zone_map = true; ColumnWriter writer(writer_opts, type_info, true, wfile.get()); st = writer.init(); @@ -76,8 +77,11 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s ASSERT_TRUE(st.ok()); st = writer.write_ordinal_index(); ASSERT_TRUE(st.ok()); + st = writer.write_zone_map(); + ASSERT_TRUE(st.ok()); writer.write_meta(&meta); + ASSERT_TRUE(meta.has_zone_map_page()); // close the file wfile.reset(); @@ -90,11 +94,13 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s ASSERT_TRUE(st.ok()); ColumnReaderOptions reader_opts; - ColumnReader reader(reader_opts, meta, rfile.get()); + ColumnReader reader(reader_opts, meta, num_rows, rfile.get()); st = reader.init(); ASSERT_TRUE(st.ok()); + ASSERT_EQ(reader._ordinal_index->num_pages(), reader._column_zone_map->get_column_zone_map().size()); + ColumnIterator* iter = nullptr; st = reader.new_iterator(&iter); ASSERT_TRUE(st.ok()); @@ -175,11 +181,12 @@ TEST_F(ColumnReaderWriterTest, test_nullable) { test_nullable_data(val, is_null, num_uint8_rows / 16, "null_largeint_bs"); float* float_vals = new float[num_uint8_rows]; - test_nullable_data((uint8_t*)float_vals, is_null, num_uint8_rows, "null_float_bs"); for (int i = 0; i < num_uint8_rows; ++i) { float_vals[i] = i; is_null[i] = ((i % 16) == 0); } + test_nullable_data((uint8_t*)float_vals, is_null, num_uint8_rows, "null_float_bs"); + double* double_vals = new double[num_uint8_rows]; for (int i = 0; i < num_uint8_rows; ++i) { double_vals[i] = i; diff --git a/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp b/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp new file mode 100644 index 0000000000..b30157fe42 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "olap/rowset/segment_v2/column_zone_map.h" + +namespace doris { +namespace segment_v2 { + +class ColumnZoneMapTest : public testing::Test { +public: + void test_string(FieldType type) { + TypeInfo *type_info = get_type_info(OLAP_FIELD_TYPE_CHAR); + ColumnZoneMapBuilder builder(type_info); + std::vector values1 = {"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"}; + for (auto value : values1) { + builder.add((const uint8_t*)&value, 1); + } + builder.flush(); + std::vector values2 = {"aaaaa", "bbbbb", "ccccc", "ddddd", "eeeee", "fffff"}; + for (auto value : values2) { + builder.add((const uint8_t*)&value, 1); + } + builder.add(nullptr, 1); + builder.flush(); + for (int i = 0; i < 6; ++i) { + builder.add(nullptr, 1); + } + builder.flush(); + Slice zone_map_page = builder.finish(); + ColumnZoneMap column_zone_map(zone_map_page); + Status status = column_zone_map.load(); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(3, column_zone_map.num_pages()); + const std::vector& zone_maps = column_zone_map.get_column_zone_map(); + ASSERT_EQ(3, zone_maps.size()); + ASSERT_EQ("aaaa", zone_maps[0].min()); + ASSERT_EQ("ffff", zone_maps[0].max()); + ASSERT_EQ(false, zone_maps[0].has_null()); + ASSERT_EQ(true, zone_maps[0].has_not_null()); + + ASSERT_EQ("aaaaa", zone_maps[1].min()); + ASSERT_EQ("fffff", zone_maps[1].max()); + ASSERT_EQ(true, zone_maps[1].has_null()); + ASSERT_EQ(true, zone_maps[1].has_not_null()); + + ASSERT_EQ(true, zone_maps[2].has_null()); + ASSERT_EQ(false, zone_maps[2].has_not_null()); + } +}; + +// Test for int +TEST_F(ColumnZoneMapTest, NormalTestIntPage) { + TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_INT); + ColumnZoneMapBuilder builder(type_info); + std::vector values1 = {1, 10, 11, 20, 21, 22}; + for (auto value : values1) { + builder.add((const uint8_t*)&value, 1); + } + builder.flush(); + std::vector values2 = {2, 12, 31, 23, 21, 22}; + for (auto value : values2) { + builder.add((const uint8_t*)&value, 1); + } + builder.add(nullptr, 1); + builder.flush(); + for (int i = 0; i < 6; ++i) { + builder.add(nullptr, 1); + } + builder.flush(); + Slice zone_map_page = builder.finish(); + ColumnZoneMap column_zone_map(zone_map_page); + Status status = column_zone_map.load(); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(3, column_zone_map.num_pages()); + const std::vector& zone_maps = column_zone_map.get_column_zone_map(); + ASSERT_EQ(3, zone_maps.size()); + + ASSERT_EQ(std::to_string(1), zone_maps[0].min()); + ASSERT_EQ(std::to_string(22), zone_maps[0].max()); + ASSERT_EQ(false, zone_maps[0].has_null()); + ASSERT_EQ(true, zone_maps[0].has_not_null()); + + ASSERT_EQ(std::to_string(2), zone_maps[1].min()); + ASSERT_EQ(std::to_string(31), zone_maps[1].max()); + ASSERT_EQ(true, zone_maps[1].has_null()); + ASSERT_EQ(true, zone_maps[1].has_not_null()); + + ASSERT_EQ(true, zone_maps[2].has_null()); + ASSERT_EQ(false, zone_maps[2].has_not_null()); +} + +// Test for string +TEST_F(ColumnZoneMapTest, NormalTestVarcharPage) { + test_string(OLAP_FIELD_TYPE_VARCHAR); +} + +// Test for string +TEST_F(ColumnZoneMapTest, NormalTestCharPage) { + test_string(OLAP_FIELD_TYPE_CHAR); +} + +} +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp index f2789621a8..ca663aee85 100644 --- a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp @@ -47,9 +47,13 @@ TEST_F(OrdinalPageIndexTest, normal) { auto slice = builder.finish(); LOG(INFO) << "index block's size=" << slice.size; - OrdinalPageIndex index(slice); + OrdinalPageIndex index(slice, 16 * 1024 * 4096 + 1); auto st = index.load(); ASSERT_TRUE(st.ok()); + ASSERT_EQ(1, index.get_first_row_id(0)); + ASSERT_EQ(4096, index.get_last_row_id(0)); + ASSERT_EQ((16 * 1024 - 1) * 4096 + 1, index.get_first_row_id(16 * 1024 - 1)); + ASSERT_EQ(16 * 1024 * 4096, index.get_last_row_id(16 * 1024 - 1)); PagePointer page; { @@ -90,7 +94,7 @@ TEST_F(OrdinalPageIndexTest, corrupt) { encode_fixed32_le((uint8_t*)str.data(), 1); Slice slice(str); - OrdinalPageIndex index(slice); + OrdinalPageIndex index(slice, 10); auto st = index.load(); ASSERT_FALSE(st.ok()); } diff --git a/be/test/olap/rowset/segment_v2/row_ranges_test.cpp b/be/test/olap/rowset/segment_v2/row_ranges_test.cpp new file mode 100644 index 0000000000..ce9faf245f --- /dev/null +++ b/be/test/olap/rowset/segment_v2/row_ranges_test.cpp @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "olap/rowset/segment_v2/row_ranges.h" + +namespace doris { +namespace segment_v2 { + +class RowRangesTest : public testing::Test { +public: + virtual ~RowRangesTest() { } +}; + +// Test for int +TEST_F(RowRangesTest, TestRange) { + RowRange range1(10, 20); + RowRange range2(15, 25); + RowRange range3(30, 40); + ASSERT_TRUE(range1.is_valid()); + ASSERT_EQ(10, range1.from()); + ASSERT_EQ(20, range1.to()); + ASSERT_EQ(10, range1.count()); + ASSERT_TRUE(range1.is_before(range3)); + ASSERT_FALSE(range1.is_after(range2)); + ASSERT_TRUE(range3.is_after(range1)); + RowRange tmp; + RowRange::range_intersection(range1, range2, &tmp); + ASSERT_TRUE(tmp.is_valid()); + ASSERT_EQ(5, tmp.count()); + ASSERT_TRUE(tmp.is_valid()); + RowRange tmp2; + RowRange::range_intersection(range1, range3, &tmp2); + ASSERT_FALSE(tmp2.is_valid()); + RowRange tmp3; + RowRange::range_union(range1, range3, &tmp3); + ASSERT_FALSE(tmp3.is_valid()); + RowRange range4(0, 0); + ASSERT_FALSE(range4.is_valid()); + RowRange range5(20, 25); + RowRange tmp4; + ASSERT_FALSE(RowRange::range_intersection(range1, range5, &tmp4)); + ASSERT_TRUE(RowRange::range_union(range1, range5, &tmp4)); + ASSERT_EQ(15, tmp4.count()); + ASSERT_EQ(10, tmp4.from()); + ASSERT_EQ(25, tmp4.to()); +} + +TEST_F(RowRangesTest, TestRowRanges) { + RowRanges row_ranges; + RowRanges row_ranges1 = RowRanges::create_single(10, 20); + RowRanges row_ranges2 = RowRanges::create_single(20, 30); + RowRanges row_ranges3 = RowRanges::create_single(15, 30); + RowRanges row_ranges4 = RowRanges::create_single(40, 50); + + RowRanges row_ranges_merge; + RowRanges::ranges_intersection(row_ranges1, row_ranges2, &row_ranges_merge); + ASSERT_EQ(0, row_ranges_merge.count()); + ASSERT_TRUE(row_ranges_merge.is_empty()); + + RowRanges row_ranges_merge2; + RowRanges::ranges_intersection(row_ranges1, row_ranges3, &row_ranges_merge2); + ASSERT_EQ(5, row_ranges_merge2.count()); + ASSERT_FALSE(row_ranges_merge2.is_empty()); + ASSERT_TRUE(row_ranges_merge2.contain(16, 19)); + ASSERT_EQ(15, row_ranges_merge2.from()); + ASSERT_EQ(20, row_ranges_merge2.to()); + ASSERT_EQ(15, row_ranges_merge2.get_range_from(0)); + ASSERT_EQ(20, row_ranges_merge2.get_range_to(0)); + ASSERT_EQ(5, row_ranges_merge2.get_range_count(0)); + + RowRanges row_ranges_merge3; + RowRanges::ranges_intersection(row_ranges1, row_ranges4, &row_ranges_merge3); + ASSERT_EQ(0, row_ranges_merge3.count()); + ASSERT_TRUE(row_ranges_merge3.is_empty()); + + RowRanges row_ranges_union; + RowRanges::ranges_union(row_ranges1, row_ranges2, &row_ranges_union); + ASSERT_EQ(20, row_ranges_union.count()); + RowRanges::ranges_union(row_ranges_union, row_ranges4, &row_ranges_union); + ASSERT_EQ(30, row_ranges_union.count()); + ASSERT_FALSE(row_ranges_union.is_empty()); + ASSERT_TRUE(row_ranges_union.contain(16, 19)); + ASSERT_EQ(10, row_ranges_union.from()); + ASSERT_EQ(50, row_ranges_union.to()); + ASSERT_EQ(10, row_ranges_union.get_range_from(0)); + ASSERT_EQ(30, row_ranges_union.get_range_to(0)); + ASSERT_EQ(20, row_ranges_union.get_range_count(0)); +} + +} +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index d2f11c567e..6f955efd79 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -204,7 +204,7 @@ TEST_F(SegmentReaderWriterTest, normal) { Arena arena; RowBlockV2 block(schema, 100, &arena); st = iter->next_batch(&block); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.is_end_of_file()); ASSERT_EQ(0, block.num_rows()); } // test seek, key (-2, -1) @@ -243,10 +243,118 @@ TEST_F(SegmentReaderWriterTest, normal) { Arena arena; RowBlockV2 block(schema, 100, &arena); st = iter->next_batch(&block); - ASSERT_TRUE(st.ok()); + ASSERT_TRUE(st.is_end_of_file()); ASSERT_EQ(0, block.num_rows()); } } + FileUtils::remove_all(dname); +} + +TEST_F(SegmentReaderWriterTest, TestZoneMap) { + size_t num_rows_per_block = 10; + + std::shared_ptr tablet_schema(new TabletSchema()); + tablet_schema->_num_columns = 4; + tablet_schema->_num_key_columns = 3; + tablet_schema->_num_short_key_columns = 2; + tablet_schema->_num_rows_per_row_block = num_rows_per_block; + tablet_schema->_cols.push_back(create_int_key(1)); + tablet_schema->_cols.push_back(create_int_key(2)); + tablet_schema->_cols.push_back(create_int_key(3)); + tablet_schema->_cols.push_back(create_int_value(4)); + + // segment write + std::string dname = "./ut_dir/segment_test"; + FileUtils::create_dir(dname); + + SegmentWriterOptions opts; + opts.num_rows_per_block = num_rows_per_block; + + std::string fname = dname + "/int_case2"; + SegmentWriter writer(fname, 0, tablet_schema.get(), opts); + auto st = writer.init(10); + ASSERT_TRUE(st.ok()); + + RowCursor row; + auto olap_st = row.init(*tablet_schema); + ASSERT_EQ(OLAP_SUCCESS, olap_st); + + // 0, 1, 2, 3 + // 10, 11, 12, 13 + // 20, 21, 22, 23 + // + // 64k int will generate 4 pages + for (int i = 0; i < 64 * 1024; ++i) { + for (int j = 0; j < 4; ++j) { + auto cell = row.cell(j); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = i * 10 + j; + } + writer.append_row(row); + } + + uint32_t file_size = 0; + st = writer.finalize(&file_size); + ASSERT_TRUE(st.ok()); + + // reader with condition + { + std::shared_ptr segment(new Segment(fname, 0, tablet_schema, num_rows_per_block)); + st = segment->open(); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(64 * 1024, segment->num_rows()); + Schema schema(*tablet_schema); + // scan all rows + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + TCondition condition; + condition.__set_column_name("2"); + condition.__set_condition_op("<"); + std::vector vals = {"100"}; + condition.__set_condition_values(vals); + std::shared_ptr conditions(new Conditions()); + conditions->set_tablet_schema(tablet_schema.get()); + conditions->append_condition(condition); + read_opts.conditions = conditions; + st = iter->init(read_opts); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 1024, &arena); + + // only first page will be read because of zone map + int left = 16 * 1024; + + int rowid = 0; + while (left > 0) { + int rows_read = left > 1024 ? 1024 : left; + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(rows_read, block.num_rows()); + left -= rows_read; + + for (int j = 0; j < block.schema()->column_ids().size(); ++j) { + auto cid = block.schema()->column_ids()[j]; + auto column_block = block.column_block(j); + for (int i = 0; i < rows_read; ++i) { + int rid = rowid + i; + ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i)); + ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)) << "rid:" << rid << ", i:" << i; + } + } + rowid += rows_read; + } + ASSERT_EQ(16 * 1024, rowid); + st = iter->next_batch(&block); + ASSERT_TRUE(st.is_end_of_file()); + ASSERT_EQ(0, block.num_rows()); + } + } + FileUtils::remove_all(dname); } } @@ -256,4 +364,3 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } - diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index a9fc6c58b6..305377a0b2 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -1,155 +1,162 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// Define file format struct, like data header, index header. - -syntax="proto2"; - -package doris.segment_v2; - -message ColumnSchemaPB { - optional uint32 column_id = 1; - optional string type = 2; - optional string aggregation = 3; - optional uint32 length = 4; - optional bool is_key = 5; - optional string default_value = 6; - optional uint32 precision = 9 [default = 27]; - optional uint32 frac = 10 [default = 9]; - optional bool is_nullable = 11 [default=false]; - optional bool is_bf_column = 15 [default=false]; // is bloom filter indexed column - optional bool is_bitmap_column = 16 [default=false]; -} - -// page position info -message PagePointerPB { - required uint64 offset = 1; // offset in segment file - required uint32 size = 2; // size of page in byte -} - -message MetadataPairPB { - optional string key = 1; - optional bytes value = 2; -} - -enum EncodingTypePB { - UNKNOWN_ENCODING = 0; - DEFAULT_ENCODING = 1; - PLAIN_ENCODING = 2; - PREFIX_ENCODING = 3; - RLE = 4; - DICT_ENCODING = 5; - BIT_SHUFFLE = 6; -} - -enum CompressionTypePB { - UNKNOWN_COMPRESSION = 0; - DEFAULT_COMPRESSION = 1; - NO_COMPRESSION = 2; - SNAPPY = 3; - LZ4 = 4; - LZ4F = 5; - ZLIB = 6; - ZSTD = 7; -} - -message ZoneMapPB { - optional bytes min = 1; - optional bytes max = 2; - optional bool null_flag = 3; -} - -message ColumnMetaPB { - // column id in table schema - optional uint32 column_id = 1; - // unique column id - optional uint32 unique_id = 2; - // this field is FieldType's value - optional int32 type = 3; - optional EncodingTypePB encoding = 4; - // compress type for column - optional CompressionTypePB compression = 5; - // if this column can be nullable - optional bool is_nullable = 6; - // ordinal index page - optional PagePointerPB ordinal_index_page = 7; - - // // dictionary page for DICT_ENCODING - // optional PagePointerPB dict_page = 2; - - // // bloom filter pages for bloom filter column - // repeated PagePointerPB bloom_filter_pages = 3; - - // optional PagePointerPB page_zonemap_page = 5; // page zonemap info of column - - // optional PagePointerPB bitmap_index_page = 6; // bitmap index page - - // // data footprint of column after encoding and compress - // optional uint64 data_footprint = 7; - // // index footprint of column after encoding and compress - // optional uint64 index_footprint = 8; - // // raw column data footprint - // optional uint64 raw_data_footprint = 9; - - // optional ZoneMapPB column_zonemap = 11; // column zonemap info - // repeated MetadataPairPB column_meta_datas = 12; -} - -message FileFooterPB { - optional uint32 version = 1 [default = 1]; // file version - repeated ColumnSchemaPB schema = 2; // tablet schema - optional uint64 num_values = 3; // number of values - optional uint64 index_footprint = 4; // total idnex footprint of all columns - optional uint64 data_footprint = 5; // total data footprint of all columns - optional uint64 raw_data_footprint = 6; // raw data footprint - - optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns - repeated MetadataPairPB file_meta_datas = 8; // meta data of file - optional PagePointerPB key_index_page = 9; // short key index page -} - -message ShortKeyFooterPB { - // How many index item in this index. - optional uint32 num_items = 1; - // The total bytes occupied by the index key - optional uint32 key_bytes = 2; - // The total bytes occupied by the key offsets - optional uint32 offset_bytes = 3; - // Segment id which this index is belong to - optional uint32 segment_id = 4; - // number rows in each block - optional uint32 num_rows_per_block = 5; - // How many rows in this segment - optional uint32 num_segment_rows = 6; - // Total bytes for this segment - optional uint32 segment_bytes = 7; -} - -message SegmentFooterPB { - optional uint32 version = 1 [default = 1]; // file version - repeated ColumnMetaPB columns = 2; // tablet schema - optional uint64 num_rows = 3; // number of values - optional uint64 index_footprint = 4; // total idnex footprint of all columns - optional uint64 data_footprint = 5; // total data footprint of all columns - optional uint64 raw_data_footprint = 6; // raw data footprint - - optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns - repeated MetadataPairPB file_meta_datas = 8; // meta data of file - - // Short key index's page - optional PagePointerPB short_key_index_page = 9; -} - +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Define file format struct, like data header, index header. + +syntax="proto2"; + +package doris.segment_v2; + +message ColumnSchemaPB { + optional uint32 column_id = 1; + optional string type = 2; + optional string aggregation = 3; + optional uint32 length = 4; + optional bool is_key = 5; + optional string default_value = 6; + optional uint32 precision = 9 [default = 27]; + optional uint32 frac = 10 [default = 9]; + optional bool is_nullable = 11 [default=false]; + optional bool is_bf_column = 15 [default=false]; // is bloom filter indexed column + optional bool is_bitmap_column = 16 [default=false]; +} + +// page position info +message PagePointerPB { + required uint64 offset = 1; // offset in segment file + required uint32 size = 2; // size of page in byte +} + +message MetadataPairPB { + optional string key = 1; + optional bytes value = 2; +} + +enum EncodingTypePB { + UNKNOWN_ENCODING = 0; + DEFAULT_ENCODING = 1; + PLAIN_ENCODING = 2; + PREFIX_ENCODING = 3; + RLE = 4; + DICT_ENCODING = 5; + BIT_SHUFFLE = 6; +} + +enum CompressionTypePB { + UNKNOWN_COMPRESSION = 0; + DEFAULT_COMPRESSION = 1; + NO_COMPRESSION = 2; + SNAPPY = 3; + LZ4 = 4; + LZ4F = 5; + ZLIB = 6; + ZSTD = 7; +} + +message ZoneMapPB { + // minimum not-null value, invalid when all values are null(has_not_null==false) + optional bytes min = 1; + // maximum not-null value, invalid when all values are null (has_not_null==false) + optional bytes max = 2; + // whether the zone has null value + optional bool has_null = 3; + // whether the zone has not-null value + optional bool has_not_null = 4; +} + +message ColumnMetaPB { + // column id in table schema + optional uint32 column_id = 1; + // unique column id + optional uint32 unique_id = 2; + // this field is FieldType's value + optional int32 type = 3; + optional EncodingTypePB encoding = 4; + // compress type for column + optional CompressionTypePB compression = 5; + // if this column can be nullable + optional bool is_nullable = 6; + // ordinal index page + optional PagePointerPB ordinal_index_page = 7; + // zone map page + optional PagePointerPB zone_map_page = 8; + + // // dictionary page for DICT_ENCODING + // optional PagePointerPB dict_page = 2; + + // // bloom filter pages for bloom filter column + // repeated PagePointerPB bloom_filter_pages = 3; + + // optional PagePointerPB page_zonemap_page = 5; // page zonemap info of column + + // optional PagePointerPB bitmap_index_page = 6; // bitmap index page + + // // data footprint of column after encoding and compress + // optional uint64 data_footprint = 7; + // // index footprint of column after encoding and compress + // optional uint64 index_footprint = 8; + // // raw column data footprint + // optional uint64 raw_data_footprint = 9; + + // optional ZoneMapPB column_zonemap = 11; // column zonemap info + // repeated MetadataPairPB column_meta_datas = 12; +} + +message FileFooterPB { + optional uint32 version = 1 [default = 1]; // file version + repeated ColumnSchemaPB schema = 2; // tablet schema + optional uint64 num_values = 3; // number of values + optional uint64 index_footprint = 4; // total idnex footprint of all columns + optional uint64 data_footprint = 5; // total data footprint of all columns + optional uint64 raw_data_footprint = 6; // raw data footprint + + optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns + repeated MetadataPairPB file_meta_datas = 8; // meta data of file + optional PagePointerPB key_index_page = 9; // short key index page +} + +message ShortKeyFooterPB { + // How many index item in this index. + optional uint32 num_items = 1; + // The total bytes occupied by the index key + optional uint32 key_bytes = 2; + // The total bytes occupied by the key offsets + optional uint32 offset_bytes = 3; + // Segment id which this index is belong to + optional uint32 segment_id = 4; + // number rows in each block + optional uint32 num_rows_per_block = 5; + // How many rows in this segment + optional uint32 num_segment_rows = 6; + // Total bytes for this segment + optional uint32 segment_bytes = 7; +} + +message SegmentFooterPB { + optional uint32 version = 1 [default = 1]; // file version + repeated ColumnMetaPB columns = 2; // tablet schema + optional uint64 num_rows = 3; // number of values + optional uint64 index_footprint = 4; // total idnex footprint of all columns + optional uint64 data_footprint = 5; // total data footprint of all columns + optional uint64 raw_data_footprint = 6; // raw data footprint + + optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns + repeated MetadataPairPB file_meta_datas = 8; // meta data of file + + // Short key index's page + optional PagePointerPB short_key_index_page = 9; +} + diff --git a/run-ut.sh b/run-ut.sh index 10c9fd291a..1e18c2c736 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -254,6 +254,8 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/rle_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_dict_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/page_compression_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_zone_map_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/row_ranges_test ${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test ${DORIS_TEST_BINARY_DIR}/olap/storage_types_test ${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test