diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 903e85e21a..66c28e36e2 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -94,6 +94,7 @@ add_library(Olap STATIC rowset/segment_v2/ordinal_page_index.cpp rowset/segment_v2/page_compression.cpp rowset/segment_v2/binary_dict_page.cpp + rowset/segment_v2/binary_prefix_page.cpp rowset/segment_v2/segment.cpp rowset/segment_v2/segment_iterator.cpp rowset/segment_v2/empty_segment_iterator.cpp diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index fff5a38947..f94c528e4b 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -60,6 +60,11 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { const Slice* src = reinterpret_cast(vals); size_t num_added = 0; uint32_t value_code = -1; + + if (_data_page_builder->count() == 0) { + _first_value.assign_copy(reinterpret_cast(src->get_data()), src->get_size()); + } + for (int i = 0; i < *count; ++i, ++src) { auto iter = _dictionary.find(*src); if (iter != _dictionary.end()) { @@ -145,6 +150,35 @@ Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { return Status::OK(); } +Status BinaryDictPageBuilder::get_first_value(void* value) const { + DCHECK(_finished); + if (_data_page_builder->count() == 0) { + return Status::NotFound("page is empty"); + } + if (_encoding_type != DICT_ENCODING) { + return _data_page_builder->get_first_value(value); + } + *reinterpret_cast(value) = Slice(_first_value); + return Status::OK(); +} + +Status BinaryDictPageBuilder::get_last_value(void* value) const { + DCHECK(_finished); + if (_data_page_builder->count() == 0) { + return Status::NotFound("page is empty"); + } + if (_encoding_type != DICT_ENCODING) { + return _data_page_builder->get_last_value(value); + } + uint32_t value_code; + 
RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code)); + // TODO _dict_items is cleared in get_dictionary_page, which could cause + // get_last_value to fail when it's called after get_dictionary_page. + // the solution is to read last value from _dict_builder instead of _dict_items + *reinterpret_cast(value) = _dict_items[value_code]; + return Status::OK(); +} + BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOptions& options) : _data(data), _options(options), diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.h b/be/src/olap/rowset/segment_v2/binary_dict_page.h index 2d12b07e7c..4fbe946c93 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.h +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.h @@ -68,6 +68,10 @@ public: Status get_dictionary_page(OwnedSlice* dictionary_page) override; + Status get_first_value(void* value) const override; + + Status get_last_value(void* value) const override; + private: PageBuilderOptions _options; bool _finished; @@ -90,6 +94,7 @@ private: MemTracker _tracker; MemPool _pool; faststring _buffer; + faststring _first_value; }; class BinaryDictPageDecoder : public PageDecoder { diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h index 8402a7f548..6aad282f36 100644 --- a/be/src/olap/rowset/segment_v2/binary_plain_page.h +++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h @@ -68,6 +68,7 @@ public: _offsets.push_back(offset); _buffer.append(src->data, src->size); + _last_value_size = src->size; _size_estimate += src->size; _size_estimate += sizeof(uint32_t); @@ -87,6 +88,10 @@ public: put_fixed32_le(&_buffer, _offset); } put_fixed32_le(&_buffer, _offsets.size()); + if (_offsets.size() > 0) { + _copy_value_at(0, &_first_value); + _copy_value_at(_offsets.size() - 1, &_last_value); + } return _buffer.build(); } @@ -97,6 +102,7 @@ public: _size_estimate = sizeof(uint32_t); _prepared_size = sizeof(uint32_t); _finished = 
false; + _last_value_size = 0; } size_t count() const override { @@ -107,12 +113,35 @@ public: return _size_estimate; } + Status get_first_value(void* value) const override { + DCHECK(_finished); + if (_offsets.size() == 0) { + return Status::NotFound("page is empty"); + } + *reinterpret_cast(value) = Slice(_first_value); + return Status::OK(); + } + Status get_last_value(void* value) const override { + DCHECK(_finished); + if (_offsets.size() == 0) { + return Status::NotFound("page is empty"); + } + *reinterpret_cast(value) = Slice(_last_value); + return Status::OK(); + } + void update_prepared_size(size_t added_size) { _prepared_size += added_size; _prepared_size += sizeof(uint32_t); } private: + void _copy_value_at(size_t idx, faststring* value) const { + size_t value_size = (idx < _offsets.size() - 1) ? + _offsets[idx + 1] - _offsets[idx] : _last_value_size; + value->assign_copy(&_buffer[_offsets[idx]], value_size); + } + faststring _buffer; size_t _size_estimate; size_t _prepared_size; @@ -120,6 +149,10 @@ private: std::vector _offsets; bool _finished; PageBuilderOptions _options; + // size of last added value + uint32_t _last_value_size = 0; + faststring _first_value; + faststring _last_value; }; diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp new file mode 100644 index 0000000000..c26602a04d --- /dev/null +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/binary_prefix_page.h" + +#include +#include +#include +#include +#include +#include + +#include "common/logging.h" +#include "gutil/strings/substitute.h" +#include "runtime/mem_pool.h" +#include "util/coding.h" +#include "util/faststring.h" +#include "util/slice.h" + +namespace doris { +namespace segment_v2 { + +using strings::Substitute; + +Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) { + DCHECK(!_finished); + if (*add_count == 0) { + return Status::OK(); + } + + const Slice* src = reinterpret_cast(vals); + if (_count == 0) { + _first_entry.assign_copy(reinterpret_cast(src->get_data()), src->get_size()); + } + + int i = 0; + for (; i < *add_count; ++i, ++src) { + if (is_page_full()) { + break; + } + const char* entry = src->data; + size_t entry_len = src->size; + int old_size = _buffer.size(); + + int share_len; + if (_count % RESTART_POINT_INTERVAL == 0) { + share_len = 0; + _restart_points_offset.push_back(old_size); + } else { + int max_share_len = std::min(_last_entry.size(), entry_len); + share_len = max_share_len; + for (int i = 0; i < max_share_len; ++i) { + if (entry[i] != _last_entry[i]) { + share_len = i; + break; + } + } + } + int non_share_len = entry_len - share_len; + + put_varint32(&_buffer, share_len); + put_varint32(&_buffer, non_share_len); + _buffer.append(entry + share_len, non_share_len); + + _last_entry.clear(); + _last_entry.append(entry, entry_len); + + ++_count; + } + *add_count = i; + return Status::OK(); +} + +OwnedSlice BinaryPrefixPageBuilder::finish() 
{ + DCHECK(!_finished); + _finished = true; + put_fixed32_le(&_buffer, (uint32_t)_count); + uint8_t restart_point_internal = RESTART_POINT_INTERVAL; + _buffer.append(&restart_point_internal, 1); + auto restart_point_size = _restart_points_offset.size(); + for (uint32_t i = 0; i < restart_point_size; ++i) { + put_fixed32_le(&_buffer, _restart_points_offset[i]); + } + put_fixed32_le(&_buffer, restart_point_size); + return _buffer.build(); +} + +const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, + uint32_t* shared, + uint32_t* non_shared) { + if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) { + return nullptr; + } + if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) { + return nullptr; + } + if (_footer_start - ptr < *non_shared) { + return nullptr; + } + return ptr; +} + +Status BinaryPrefixPageDecoder::_read_next_value() { + if (_cur_pos >= _num_values) { + return Status::NotFound("no more value to read"); + } + uint32_t shared_len; + uint32_t non_shared_len; + auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len); + if (data_ptr == nullptr) { + return Status::Corruption(Substitute("Failed to decode value at position $0", _cur_pos)); + } + _current_value.resize(shared_len); + _current_value.append(data_ptr, non_shared_len); + _next_ptr = data_ptr + non_shared_len; + return Status::OK(); +} + +Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) { + _cur_pos = restart_point_index * _restart_point_internal; + _next_ptr = _get_restart_point(restart_point_index); + return _read_next_value(); +} + +Status BinaryPrefixPageDecoder::init() { + _cur_pos = 0; + _next_ptr = reinterpret_cast(_data.get_data()); + + const uint8_t* end = _next_ptr + _data.get_size(); + _num_restarts = decode_fixed32_le(end - 4); + _restarts_ptr = end - (_num_restarts + 1) * 4; + _footer_start = _restarts_ptr - 4 - 1; + _num_values = decode_fixed32_le(_footer_start); 
+ _restart_point_internal = decode_fixed8(_footer_start + 4); + _parsed = true; + return _read_next_value(); +} + +Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) { + DCHECK(_parsed); + DCHECK_LE(pos, _num_values); + + // seeking past the last value is valid + if (pos == _num_values) { + _cur_pos = _num_values; + return Status::OK(); + } + + size_t restart_point_index = pos / _restart_point_internal; + RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index)); + while (_cur_pos < pos) { + _cur_pos++; + RETURN_IF_ERROR(_read_next_value()); + } + return Status::OK(); +} + +Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) { + DCHECK(_parsed); + Slice target = *reinterpret_cast(value); + + uint32_t left = 0; + uint32_t right = _num_restarts; + // find the first restart point >= target. after loop, + // - left == index of first restart point >= target when found + // - left == _num_restarts when not found (all restart points < target) + while (left < right) { + uint32_t mid = left + (right - left) / 2; + // read first entry at restart point `mid` + RETURN_IF_ERROR(_seek_to_restart_point(mid)); + Slice mid_entry(_current_value); + if (mid_entry.compare(target) < 0) { + left = mid + 1; + } else { + right = mid; + } + } + + // then linear search from the last restart point < target. + // when left == 0, all restart points >= target, so search from first one. + // otherwise search from the last restart point < target, which is left - 1 + uint32_t search_index = left > 0 ? 
left - 1 : 0; + RETURN_IF_ERROR(_seek_to_restart_point(search_index)); + while (true) { + int cmp = Slice(_current_value).compare(target); + if (cmp >= 0) { + *exact_match = cmp == 0; + return Status::OK(); + } + _cur_pos++; + RETURN_IF_ERROR(_read_next_value()); + } +} + +Status BinaryPrefixPageDecoder::_read_next_value_to_output(Slice prev, MemPool* mem_pool, Slice* output) { + if (_cur_pos >= _num_values) { + return Status::NotFound("no more value to read"); + } + uint32_t shared_len; + uint32_t non_shared_len; + auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len); + if (data_ptr == nullptr) { + return Status::Corruption(Substitute("Failed to decode value at position $0", _cur_pos)); + } + + output->size = shared_len + non_shared_len; + if (output->size > 0) { + output->data = (char*) mem_pool->allocate(output->size); + memcpy(output->data, prev.data, shared_len); + memcpy(output->data + shared_len, data_ptr, non_shared_len); + } + + _next_ptr = data_ptr + non_shared_len; + return Status::OK(); +} + +Status BinaryPrefixPageDecoder::_copy_current_to_output(MemPool* mem_pool, Slice* output) { + output->size = _current_value.size(); + if (output->size > 0) { + output->data = (char*) mem_pool->allocate(output->size); + if (output->data == nullptr) { + return Status::MemoryAllocFailed( + Substitute("failed to allocate $0 bytes", output->size)); + } + memcpy(output->data, _current_value.data(), output->size); + } + return Status::OK(); +} + +Status BinaryPrefixPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { + DCHECK(_parsed); + if (PREDICT_FALSE(*n == 0 || _cur_pos >= _num_values)) { + *n = 0; + return Status::OK(); + } + size_t i = 0; + size_t max_fetch = std::min(*n, static_cast(_num_values - _cur_pos)); + auto out = reinterpret_cast(dst->data()); + auto prev = out; + + // first copy the current value to output + RETURN_IF_ERROR(_copy_current_to_output(dst->pool(), out)); + i++; + out++; + + // read and copy remaining values + 
for (; i < max_fetch; ++i) { + _cur_pos++; + RETURN_IF_ERROR(_read_next_value_to_output(prev[i - 1], dst->pool(), out)); + out++; + } + + //must update _current_value + _current_value.clear(); + _current_value.assign_copy((uint8_t*)prev[i-1].data, prev[i-1].size); + + *n = max_fetch; + return Status::OK(); +} + +} // namespace segment_v2 +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.h b/be/src/olap/rowset/segment_v2/binary_prefix_page.h new file mode 100644 index 0000000000..f3a96ae77b --- /dev/null +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.h @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "util/slice.h" +#include "util/faststring.h" +#include "runtime/mem_pool.h" + +#include "olap/rowset/segment_v2/options.h" +#include "olap/rowset/segment_v2/page_builder.h" +#include "olap/rowset/segment_v2/page_decoder.h" + +namespace doris { +namespace segment_v2 { + +// prefix encoding for string dictionary +// +// BinaryPrefixPage := Entry^EntryNum, Trailer +// Entry := SharedPrefixLength(vint), UnsharedLength(vint), Byte^UnsharedLength +// Trailer := NumEntry(uint32_t), RESTART_POINT_INTERVAL(uint8_t) +// RestartPointStartOffset(uint32_t)^NumRestartPoints,NumRestartPoints(uint32_t) +class BinaryPrefixPageBuilder : public PageBuilder { +public: + BinaryPrefixPageBuilder(const PageBuilderOptions& options) : + _options(options) { + reset(); + } + + bool is_page_full() override { + return size() >= _options.data_page_size; + } + + Status add(const uint8_t* vals, size_t* add_count) override; + + OwnedSlice finish() override; + + void reset() override { + _restart_points_offset.clear(); + _last_entry.clear(); + _count = 0; + _buffer.clear(); + _finished = false; + } + + uint64_t size() const override { + if (_finished) { + return _buffer.size(); + } else { + return _buffer.size() + (_restart_points_offset.size() + 2) * sizeof(uint32_t); + } + } + + size_t count() const override { + return _count; + } + + Status get_first_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + *reinterpret_cast(value) = Slice(_first_entry); + return Status::OK(); + } + + Status get_last_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + *reinterpret_cast(value) = Slice(_last_entry); + return Status::OK(); + } + +private: + PageBuilderOptions _options; + std::vector _restart_points_offset; + faststring _first_entry; + faststring _last_entry; + size_t 
_count = 0; + bool _finished = false; + faststring _buffer; + // This is an empirical value, Kudu and LevelDB use this default value + static const uint8_t RESTART_POINT_INTERVAL = 16; +}; + +class BinaryPrefixPageDecoder : public PageDecoder { +public: + BinaryPrefixPageDecoder(Slice data, const PageDecoderOptions& options) : + _data(data), _parsed(false) { + } + + Status init() override; + + Status seek_to_position_in_page(size_t pos) override; + + Status seek_at_or_after_value(const void* value, bool* exact_match) override; + + Status next_batch(size_t* n, ColumnBlockView* dst) override; + + size_t count() const override { + DCHECK(_parsed); + return _num_values; + } + + size_t current_index() const override { + DCHECK(_parsed); + return _cur_pos; + } + +private: + // decode shared and non-shared entry length from `ptr`. + // return ptr past the parsed value on success. + // return nullptr on failure + const uint8_t* _decode_value_lengths(const uint8_t* ptr, uint32_t* shared, uint32_t* non_shared); + + + // return start pointer of the restart point at index `restart_point_index` + const uint8_t* _get_restart_point(size_t restart_point_index) const { + return reinterpret_cast(_data.get_data()) + + decode_fixed32_le(_restarts_ptr + restart_point_index * sizeof(uint32_t)); + } + + // read next value at `_cur_pos` and `_next_ptr` into `_current_value`. + // return OK and advance `_next_ptr` on success. `_cur_pos` is not modified. + // return NotFound when no more entry can be read. + // return other error status otherwise. + Status _read_next_value(); + + // seek to the first value at the given restart point + Status _seek_to_restart_point(size_t restart_point_index); + + // like _read_next_value, but directly copy next value to output, not _current_value + Status _read_next_value_to_output(Slice prev, MemPool* mem_pool, Slice* output); + + // copy `_current_value` into `output`. 
+ Status _copy_current_to_output(MemPool* mem_pool, Slice* output); + + Slice _data; + bool _parsed = false; + size_t _num_values = 0; + uint8_t _restart_point_internal = 0; + uint32_t _num_restarts = 0; + // pointer to _footer start + const uint8_t* _footer_start = nullptr; + // pointer to restart offsets array + const uint8_t* _restarts_ptr = nullptr; + // ordinal of the first value to return in next_batch() + uint32_t _cur_pos = 0; + // first value to return in next_batch() + faststring _current_value; + // pointer to the start of next value to read, advanced by `_read_next_value` + const uint8_t* _next_ptr = nullptr; +}; + +} // namespace segment_v2 +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h index 430445ed5c..80068f625c 100644 --- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h +++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h @@ -107,6 +107,10 @@ public: } OwnedSlice finish() override { + if (_count > 0) { + _first_value = cell(0); + _last_value = cell(_count - 1); + } return _finish(SIZE_OF_TYPE); } @@ -131,6 +135,23 @@ public: return _buffer.size(); } + Status get_first_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_first_value, SIZE_OF_TYPE); + return Status::OK(); + } + Status get_last_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_last_value, SIZE_OF_TYPE); + return Status::OK(); + } + private: OwnedSlice _finish(int final_size_of_type) { _data.resize(final_size_of_type * _count); @@ -186,6 +207,8 @@ private: bool _finished; faststring _data; faststring _buffer; + CppType _first_value; + CppType _last_value; }; template diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp index 
4edd500550..5dcd01ae42 100644 --- a/be/src/olap/rowset/segment_v2/encoding_info.cpp +++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp @@ -21,6 +21,7 @@ #include "olap/olap_common.h" #include "olap/rowset/segment_v2/binary_dict_page.h" #include "olap/rowset/segment_v2/binary_plain_page.h" +#include "olap/rowset/segment_v2/binary_prefix_page.h" #include "olap/rowset/segment_v2/bitshuffle_page.h" #include "olap/rowset/segment_v2/frame_of_reference_page.h" #include "olap/rowset/segment_v2/plain_page.h" @@ -116,6 +117,18 @@ struct TypeEncodingTraits +struct TypeEncodingTraits { + static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) { + *builder = new BinaryPrefixPageBuilder(opts); + return Status::OK(); + } + static Status create_page_decoder(const Slice& data, const PageDecoderOptions& opts, PageDecoder** decoder) { + *decoder = new BinaryPrefixPageDecoder(data, opts); + return Status::OK(); + } +}; + template struct EncodingTraits : TypeEncodingTraits::CppType> { static const FieldType type = field_type; @@ -184,8 +197,10 @@ EncodingInfoResolver::EncodingInfoResolver() { _add_map(); _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); + _add_map(); _add_map(); _add_map(); _add_map(); diff --git a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h index f6657d1450..3293931448 100644 --- a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h +++ b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h @@ -43,9 +43,16 @@ public: Status add(const uint8_t* vals, size_t* count) override { DCHECK(!_finished); + if (*count == 0) { + return Status::OK(); + } auto new_vals = reinterpret_cast(vals); + if (_count == 0) { + _first_val = *new_vals; + } _encoder->put_batch(new_vals, *count); _count += *count; + _last_val = new_vals[*count - 1]; return Status::OK(); } @@ -70,6 +77,22 @@ public: return _buf.size(); } + Status get_first_value(void* value) const override { 
+ if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_first_val, sizeof(CppType)); + return Status::OK(); + } + + Status get_last_value(void* value) const override { + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_last_val, sizeof(CppType)); + return Status::OK(); + } + private: typedef typename TypeTraits::CppType CppType; PageBuilderOptions _options; @@ -77,6 +100,8 @@ private: bool _finished; std::unique_ptr> _encoder; faststring _buf; + CppType _first_val; + CppType _last_val; }; template @@ -117,6 +142,16 @@ public: return Status::OK(); } + Status seek_at_or_after_value(const void* value, bool* exact_match) override { + DCHECK(_parsed) << "Must call init() firstly"; + bool found = _decoder->seek_at_or_after_value(value, exact_match); + if (!found) { + return Status::NotFound("not found"); + } + _cur_index = _decoder->current_index(); + return Status::OK(); + } + Status next_batch(size_t* n, ColumnBlockView* dst) override { DCHECK(_parsed) << "Must call init() firstly"; if (PREDICT_FALSE(*n == 0 || _cur_index >= _num_elements)) { diff --git a/be/src/olap/rowset/segment_v2/page_builder.h b/be/src/olap/rowset/segment_v2/page_builder.h index 82f54ba6aa..5280ef26a6 100644 --- a/be/src/olap/rowset/segment_v2/page_builder.h +++ b/be/src/olap/rowset/segment_v2/page_builder.h @@ -74,6 +74,15 @@ public: // Return the total bytes of pageBuilder that have been added to the page. virtual uint64_t size() const = 0; + // Return the first value in this page. + // This method could only be called between finish() and reset(). + // Status::NotFound if no values have been added. + virtual Status get_first_value(void* value) const = 0; + + // Return the last value in this page. + // This method could only be called between finish() and reset(). + // Status::NotFound if no values have been added. 
+ virtual Status get_last_value(void* value) const = 0; private: DISALLOW_COPY_AND_ASSIGN(PageBuilder); }; diff --git a/be/src/olap/rowset/segment_v2/plain_page.h b/be/src/olap/rowset/segment_v2/plain_page.h index 386ba941b4..548f0f1f76 100644 --- a/be/src/olap/rowset/segment_v2/plain_page.h +++ b/be/src/olap/rowset/segment_v2/plain_page.h @@ -59,6 +59,10 @@ public: OwnedSlice finish() override { encode_fixed32_le((uint8_t *) &_buffer[0], _count); + if (_count > 0) { + _first_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE], SIZE_OF_TYPE); + _last_value.assign_copy(&_buffer[PLAIN_PAGE_HEADER_SIZE + (_count - 1) * SIZE_OF_TYPE], SIZE_OF_TYPE); + } return _buffer.build(); } @@ -77,6 +81,22 @@ public: return _buffer.size(); } + Status get_first_value(void* value) const override { + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, _first_value.data(), SIZE_OF_TYPE); + return Status::OK(); + } + + Status get_last_value(void* value) const override { + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, _last_value.data(), SIZE_OF_TYPE); + return Status::OK(); + } + private: faststring _buffer; PageBuilderOptions _options; @@ -85,6 +105,8 @@ private: enum { SIZE_OF_TYPE = TypeTraits::size }; + faststring _first_value; + faststring _last_value; }; diff --git a/be/src/olap/rowset/segment_v2/rle_page.h b/be/src/olap/rowset/segment_v2/rle_page.h index 390a309e80..027ffff458 100644 --- a/be/src/olap/rowset/segment_v2/rle_page.h +++ b/be/src/olap/rowset/segment_v2/rle_page.h @@ -93,6 +93,12 @@ public: memcpy(&value, &new_vals[i], SIZE_OF_TYPE); _rle_encoder->Put(value); } + + if (_count == 0) { + memcpy(&_first_value, new_vals, SIZE_OF_TYPE); + } + memcpy(&_last_value, &new_vals[*count - 1], SIZE_OF_TYPE); + _count += *count; return Status::OK(); } @@ -121,6 +127,24 @@ public: return _rle_encoder->len(); } + Status get_first_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return 
Status::NotFound("page is empty"); + } + memcpy(value, &_first_value, SIZE_OF_TYPE); + return Status::OK(); + } + + Status get_last_value(void* value) const override { + DCHECK(_finished); + if (_count == 0) { + return Status::NotFound("page is empty"); + } + memcpy(value, &_last_value, SIZE_OF_TYPE); + return Status::OK(); + } + private: typedef typename TypeTraits::CppType CppType; enum { @@ -133,6 +157,8 @@ private: int _bit_width; RleEncoder* _rle_encoder; faststring _buf; + CppType _first_value; + CppType _last_value; }; template diff --git a/be/src/util/frame_of_reference_coding.cpp b/be/src/util/frame_of_reference_coding.cpp index a5a8436f0f..e702ee1c33 100644 --- a/be/src/util/frame_of_reference_coding.cpp +++ b/be/src/util/frame_of_reference_coding.cpp @@ -192,9 +192,10 @@ bool ForDecoder::init() { return false; } - _frame_value_num = decode_fixed32_le(_buffer + _buffer_len - 5); + _max_frame_size = decode_fixed8(_buffer + _buffer_len - 5); _values_num = decode_fixed32_le(_buffer + _buffer_len - 4); - _frame_count = _values_num / _frame_value_num + (_values_num % _frame_value_num != 0); + _frame_count = _values_num / _max_frame_size + (_values_num % _max_frame_size != 0); + _last_frame_size = _max_frame_size - (_max_frame_size * _frame_count - _values_num); size_t bit_width_offset = _buffer_len - 5 - _frame_count; if (bit_width_offset < 0) { @@ -216,13 +217,14 @@ bool ForDecoder::init() { _frame_offsets.push_back(frame_start_offset); if (sizeof(T) == 8) { - frame_start_offset += bit_width * _frame_value_num / 8 + 8; + frame_start_offset += bit_width * _max_frame_size / 8 + 8; } else { - frame_start_offset += bit_width * _frame_value_num / 8 + 4; + frame_start_offset += bit_width * _max_frame_size / 8 + 4; } } - _out_buffer.reserve(_frame_value_num); + _out_buffer.reserve(_max_frame_size); + _parsed = true; return true; } @@ -254,12 +256,12 @@ void ForDecoder::bit_unpack(const uint8_t *input, uint8_t in_num, int bit_wid template void 
ForDecoder::decode_current_frame(T* output) { - _current_decoded_frame = _current_index / _frame_value_num; - uint8_t frame_value_num = _frame_value_num; - // compute last frame value num - if (_current_decoded_frame == _frame_count - 1 && _values_num % _frame_value_num != 0) { - frame_value_num = _values_num % _frame_value_num;; + uint32_t frame_index = _current_index / _max_frame_size; + if (frame_index == _current_decoded_frame) { + return; // current frame already decoded } + _current_decoded_frame = frame_index; + uint8_t current_frame_size = frame_size(frame_index); uint32_t base_offset = _frame_offsets[_current_decoded_frame]; T min = 0; @@ -275,25 +277,37 @@ void ForDecoder::decode_current_frame(T* output) { uint8_t bit_width = _bit_widths[_current_decoded_frame]; bool is_ascending = _order_flags[_current_decoded_frame]; - std::vector delta_values(_frame_value_num); - bit_unpack(_buffer + delta_offset, frame_value_num, bit_width, delta_values.data()); + std::vector delta_values(current_frame_size); + bit_unpack(_buffer + delta_offset, current_frame_size, bit_width, delta_values.data()); if (is_ascending) { T pre_value = min; - for (uint8_t i = 0; i < frame_value_num; i ++) { + for (uint8_t i = 0; i < current_frame_size; i ++) { T value = delta_values[i] + pre_value; output[i] = value; pre_value = value; } } else { - for (uint8_t i = 0; i < frame_value_num; i ++) { + for (uint8_t i = 0; i < current_frame_size; i ++) { output[i] = delta_values[i] + min; } } } +template +T ForDecoder::decode_frame_min_value(uint32_t frame_index) { + uint32_t min_offset = _frame_offsets[frame_index]; + T min = 0; + if (sizeof(T) == 8) { + min = decode_fixed64_le(_buffer + min_offset); + } else { + min = decode_fixed32_le(_buffer + min_offset); + } + return min; +} + template T* ForDecoder::copy_value(T* val, size_t count) { - memcpy(val, &_out_buffer[_current_index % _frame_value_num], sizeof(T) * count); + memcpy(val, &_out_buffer[_current_index % _max_frame_size], sizeof(T) * 
count); _current_index += count; val += count; return val; @@ -305,30 +319,28 @@ bool ForDecoder::get_batch(T* val, size_t count) { return false; } - if (need_decode_frame()) { - decode_current_frame(_out_buffer.data()); - } + decode_current_frame(_out_buffer.data()); - if (_current_index + count < _frame_value_num * (_current_decoded_frame + 1)) { + if (_current_index + count < _max_frame_size * (_current_decoded_frame + 1)) { copy_value(val, count); return true; } // 1. padding one frame - size_t padding_num = _frame_value_num * (_current_decoded_frame + 1) - _current_index; + size_t padding_num = _max_frame_size * (_current_decoded_frame + 1) - _current_index; val = copy_value(val, padding_num); // 2. process frame by frame - size_t frame_size = (count - padding_num) / _frame_value_num; - for (size_t i = 0; i < frame_size; i++) { + size_t frame_count = (count - padding_num) / _max_frame_size; + for (size_t i = 0; i < frame_count; i++) { // directly decode value to the output, don't buffer the value decode_current_frame(val); - _current_index += _frame_value_num; - val += _frame_value_num; + _current_index += _max_frame_size; + val += _max_frame_size; } // 3. 
process remaining value - size_t remaining_num = (count - padding_num) % _frame_value_num; + size_t remaining_num = (count - padding_num) % _max_frame_size; if (remaining_num > 0) { decode_current_frame(_out_buffer.data()); val = copy_value(val, remaining_num); @@ -346,6 +358,69 @@ bool ForDecoder::skip(int32_t skip_num) { return true; } +template +uint32_t ForDecoder::seek_last_frame_before_value(T target) { + // first of all, find the first frame >= target + uint32_t left = 0; + uint32_t right = _frame_count; + while (left < right) { + uint32_t mid = left + (right - left) / 2; + T midValue = decode_frame_min_value(mid); + if (midValue < target) { + left = mid + 1; + } else { + right = mid; + } + } + // after loop, left is the first frame >= target + if (left == 0) { + // all frames are >= target, not found + return _frame_count; + } + // otherwise previous frame is the last frame < target + return left - 1; +} + +template +bool ForDecoder::seek_lower_bound_inside_frame(uint32_t frame_index, T target, bool* exact_match) { + _current_index = frame_index * _max_frame_size; + decode_current_frame(_out_buffer.data()); + auto end = _out_buffer.begin() + frame_size(frame_index); + auto pos = std::lower_bound(_out_buffer.begin(), end, target); + if (pos != end) { // found in this frame + uint32_t pos_in_frame = std::distance(_out_buffer.begin(), pos); + *exact_match = _out_buffer[pos_in_frame] == target; + _current_index += pos_in_frame; + return true; + } + return false; +} + + +template +bool ForDecoder::seek_at_or_after_value(const void* value, bool* exact_match) { + T target = *reinterpret_cast(value); + uint32_t frame_to_search = seek_last_frame_before_value(target); + if (frame_to_search == _frame_count) { + // all frames are >= target, the searched value must be the first value + _current_index = 0; + decode_current_frame(_out_buffer.data()); + *exact_match = _out_buffer[0] == target; + return true; + } + // binary search inside the last frame < target + bool 
found = seek_lower_bound_inside_frame(frame_to_search, target, exact_match); + // if not found, all values in the last frame are less than target. + // then the searched value must be the first value of the next frame. + if (!found && frame_to_search < _frame_count - 1) { + _current_index = (frame_to_search + 1) * _max_frame_size; + decode_current_frame(_out_buffer.data()); + *exact_match = _out_buffer[0] == target; + return true; + } + return found; +} + template class ForEncoder; template class ForEncoder; template class ForEncoder; diff --git a/be/src/util/frame_of_reference_coding.h b/be/src/util/frame_of_reference_coding.h index c48c50f97f..2b964f6cc6 100644 --- a/be/src/util/frame_of_reference_coding.h +++ b/be/src/util/frame_of_reference_coding.h @@ -98,7 +98,8 @@ class ForDecoder { public: explicit ForDecoder(const uint8_t* in_buffer, size_t buffer_len) :_buffer (in_buffer), - _buffer_len (buffer_len){} + _buffer_len (buffer_len), + _parsed(false){} // read footer metadata bool init(); @@ -114,6 +115,12 @@ public: // The skip_num is negative means move backwards bool skip(int32_t skip_num); + bool seek_at_or_after_value(const void* value, bool* exact_match); + + uint32_t current_index() const { + return _current_index; + } + uint32_t count() const { return _values_num; } @@ -122,26 +129,41 @@ private: void bit_unpack(const uint8_t *input, uint8_t in_num, int bit_width, T *output); + inline uint32_t frame_size(uint32_t frame_index) { + return (frame_index == _frame_count - 1) ? _last_frame_size : _max_frame_size; + } + void decode_current_frame(T* output); + T decode_frame_min_value(uint32_t frame_index); + + // Return index of the last frame which contains value < target. + // Return `_frame_count - 1` when all frames are < target. + // Return `_frame_count` when not found (all frames are >= target). + uint32_t seek_last_frame_before_value(T target); + + // Seek to the first value in frame that >= target. + // Return true when found and update exact_match. 
+ // Return false otherwise. + bool seek_lower_bound_inside_frame(uint32_t frame_index, T target, bool* exact_match); + T* copy_value(T* val, size_t count); - bool need_decode_frame() { - return !(_frame_value_num * _current_decoded_frame < _current_index - && _current_index < _frame_value_num * (_current_decoded_frame + 1)); - } + const uint8_t* _buffer = nullptr; + size_t _buffer_len = 0; + bool _parsed = false; - uint8_t _frame_value_num = 0; + uint8_t _max_frame_size = 0; + uint8_t _last_frame_size = 0; uint32_t _values_num = 0; uint32_t _frame_count = 0; - uint32_t _current_index = 0; - uint32_t _current_decoded_frame = -1; - std::vector _out_buffer; std::vector _frame_offsets; std::vector _bit_widths; std::vector _order_flags; - const uint8_t* _buffer; - size_t _buffer_len = 0; + + uint32_t _current_index = 0; + uint32_t _current_decoded_frame = -1; + std::vector _out_buffer; // store values of decoded frame }; } diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 41088c453d..67ea3ecc94 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -49,6 +49,7 @@ ADD_BE_TEST(aggregate_func_test) ADD_BE_TEST(rowset/segment_v2/bitshuffle_page_test) ADD_BE_TEST(rowset/segment_v2/plain_page_test) ADD_BE_TEST(rowset/segment_v2/binary_plain_page_test) +ADD_BE_TEST(rowset/segment_v2/binary_prefix_page_test) ADD_BE_TEST(rowset/segment_v2/column_reader_writer_test) ADD_BE_TEST(rowset/segment_v2/encoding_info_test) ADD_BE_TEST(rowset/segment_v2/page_compression_test) diff --git a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp index 16f2877650..1cb1eaa257 100644 --- a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp @@ -50,6 +50,14 @@ public: ASSERT_EQ(slices.size(), page_builder.count()); ASSERT_FALSE(page_builder.is_page_full()); + //check first value and last value + Slice first_value; + 
page_builder.get_first_value(&first_value); + ASSERT_EQ(slices[0], first_value); + Slice last_value; + page_builder.get_last_value(&last_value); + ASSERT_EQ(slices[count - 1], last_value); + // construct dict page OwnedSlice dict_slice; Status status = page_builder.get_dictionary_page(&dict_slice); diff --git a/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp index 1947f79161..a43ec35d64 100644 --- a/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/binary_plain_page_test.cpp @@ -54,6 +54,15 @@ public: Status ret = page_builder.add(reinterpret_cast(ptr), &count); OwnedSlice owned_slice = page_builder.finish(); + + //check first value and last value + Slice first_value; + page_builder.get_first_value(&first_value); + ASSERT_EQ(slices[0], first_value); + Slice last_value; + page_builder.get_last_value(&last_value); + ASSERT_EQ(slices[count - 1], last_value); + PageDecoderOptions decoder_options; PageDecoderType page_decoder(owned_slice.slice(), decoder_options); Status status = page_decoder.init(); diff --git a/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp new file mode 100644 index 0000000000..1a3f3d284a --- /dev/null +++ b/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include "common/logging.h" +#include "olap/rowset/segment_v2/page_builder.h" +#include "olap/rowset/segment_v2/page_decoder.h" +#include "olap/rowset/segment_v2/binary_prefix_page.h" +#include "olap/olap_common.h" +#include "olap/types.h" +#include "util/debug_util.h" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" + +namespace doris { +namespace segment_v2 { + +class BinaryPrefixPageTest : public testing::Test { + public: + void test_encode_and_decode() { + std::vector test_data; + for (int i = 1000; i < 1038; ++i) { + test_data.push_back(std::to_string(i)); + } + std::vector slices; + for (int i = 0; i < test_data.size(); ++i) { + Slice s(test_data[i]); + slices.emplace_back(s); + } + // encode + PageBuilderOptions options; + BinaryPrefixPageBuilder page_builder(options); + + size_t count = slices.size(); + const Slice *ptr = &slices[0]; + Status ret = page_builder.add(reinterpret_cast(ptr), &count); + + OwnedSlice dict_slice = page_builder.finish(); + ASSERT_EQ(slices.size(), page_builder.count()); + ASSERT_FALSE(page_builder.is_page_full()); + + //check first value and last value + Slice first_value; + page_builder.get_first_value(&first_value); + ASSERT_EQ(slices[0], first_value); + Slice last_value; + page_builder.get_last_value(&last_value); + ASSERT_EQ(slices[count - 1], last_value); + + PageDecoderOptions dict_decoder_options; + std::unique_ptr page_decoder( + new BinaryPrefixPageDecoder(dict_slice.slice(), dict_decoder_options)); + ret = page_decoder->init(); + ASSERT_TRUE(ret.ok()); + 
// because every slice is unique + ASSERT_EQ(slices.size(), page_decoder->count()); + + //check values + MemTracker tracker; + MemPool pool(&tracker); + TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_VARCHAR); + size_t size = slices.size(); + Slice* values = reinterpret_cast(pool.allocate(size * sizeof(Slice))); + ColumnBlock column_block(type_info, (uint8_t*)values, nullptr, size, &pool); + ColumnBlockView block_view(&column_block); + + ret = page_decoder->next_batch(&size, &block_view); + ASSERT_TRUE(ret.ok()); + ASSERT_EQ(slices.size(), size); + for (int i = 1000; i < 1038; ++i) { + ASSERT_EQ(std::to_string(i), values[i - 1000].to_string()); + } + + values = reinterpret_cast(pool.allocate(size * sizeof(Slice))); + ColumnBlock column_block2(type_info, (uint8_t*)values, nullptr, size, &pool); + ColumnBlockView block_view2(&column_block2); + ret = page_decoder->seek_to_position_in_page(15); + ASSERT_TRUE(ret.ok()); + + ret = page_decoder->next_batch(&size, &block_view2); + ASSERT_TRUE(ret.ok()); + ASSERT_EQ(23, size); + for (int i = 1015; i < 1038; ++i) { + ASSERT_EQ(std::to_string(i), values[i - 1015].to_string()); + } + + Slice v1 = Slice(std::to_string(1039)); + bool exact_match; + ret = page_decoder->seek_at_or_after_value(&v1, &exact_match); + ASSERT_TRUE(ret.is_not_found()); + + Slice v2 = Slice(std::to_string(1000)); + ret = page_decoder->seek_at_or_after_value(&v2, &exact_match); + ASSERT_TRUE(ret.ok()); + ASSERT_TRUE(exact_match); + + Slice v3 = Slice(std::to_string(1037)); + ret = page_decoder->seek_at_or_after_value(&v3, &exact_match); + ASSERT_TRUE(ret.ok()); + ASSERT_TRUE(exact_match); + + Slice v4 = Slice(std::to_string(100)); + ret = page_decoder->seek_at_or_after_value(&v4, &exact_match); + ASSERT_TRUE(ret.ok()); + ASSERT_TRUE(!exact_match); + } + + void test_encode_and_decode2() { + std::vector test_data; + test_data.push_back("ab"); + test_data.push_back("c"); + std::vector slices; + for (int i = 0; i < test_data.size(); ++i) { + Slice 
s(test_data[i]); + slices.emplace_back(s); + } + // encode + PageBuilderOptions options; + BinaryPrefixPageBuilder page_builder(options); + + size_t count = slices.size(); + const Slice *ptr = &slices[0]; + Status ret = page_builder.add(reinterpret_cast(ptr), &count); + + OwnedSlice dict_slice = page_builder.finish(); + + PageDecoderOptions dict_decoder_options; + std::unique_ptr page_decoder( + new BinaryPrefixPageDecoder(dict_slice.slice(), dict_decoder_options)); + ret = page_decoder->init(); + ASSERT_TRUE(ret.ok()); + + MemTracker tracker; + MemPool pool(&tracker); + TypeInfo* type_info = get_type_info(OLAP_FIELD_TYPE_VARCHAR); + size_t size = slices.size(); + Slice* values = reinterpret_cast(pool.allocate(size * sizeof(Slice))); + ColumnBlock column_block(type_info, (uint8_t*)values, nullptr, size, &pool); + ColumnBlockView block_view(&column_block); + + Slice slice("c"); + bool exact_match; + ret = page_decoder->seek_at_or_after_value(&slice, &exact_match); + ASSERT_TRUE(ret.ok()); + ASSERT_TRUE(exact_match); + } +}; + +TEST_F(BinaryPrefixPageTest, TestEncodeAndDecode) { + test_encode_and_decode(); +} + +TEST_F(BinaryPrefixPageTest, TestEncodeAndDecode2) { + test_encode_and_decode2(); +} + +} // namespace segment_v2 +} // namespace doris + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp b/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp index 202f57cc83..bb9bded9f8 100644 --- a/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/bitshuffle_page_test.cpp @@ -57,8 +57,14 @@ public: page_builder.add(reinterpret_cast(src), &size); OwnedSlice s = page_builder.finish(); - LOG(INFO) << "RLE Encoded size for 10k values: " << s.slice().size - << ", original size:" << size * sizeof(CppType); + + //check first value and last value + CppType first_value; + 
page_builder.get_first_value(&first_value); + ASSERT_EQ(src[0], first_value); + CppType last_value; + page_builder.get_last_value(&last_value); + ASSERT_EQ(src[size - 1], last_value); segment_v2::PageDecoderOptions decoder_options; PageDecoderType page_decoder(s.slice(), decoder_options); diff --git a/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp b/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp index 00a9803953..261e9f9b48 100644 --- a/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp @@ -159,6 +159,25 @@ TEST_F(FrameOfReferencePageTest, TestInt32SequenceBlockEncoderSize) { ASSERT_EQ(26, s.slice().size); } +TEST_F(FrameOfReferencePageTest, TestFirstLastValue) { + size_t size = 128; + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = i; + } + PageBuilderOptions builder_options; + builder_options.data_page_size = 256 * 1024; + segment_v2::FrameOfReferencePageBuilder page_builder(builder_options); + page_builder.add(reinterpret_cast(ints.get()), &size); + OwnedSlice s = page_builder.finish(); + int32_t first_value = -1; + page_builder.get_first_value(&first_value); + ASSERT_EQ(0, first_value); + int32_t last_value = 0; + page_builder.get_last_value(&last_value); + ASSERT_EQ(127, last_value); +} + TEST_F(FrameOfReferencePageTest, TestInt32NormalBlockEncoderSize) { size_t size = 128; std::unique_ptr ints(new int32_t[size]); diff --git a/be/test/olap/rowset/segment_v2/plain_page_test.cpp b/be/test/olap/rowset/segment_v2/plain_page_test.cpp index 9e13c967ae..a626f4f232 100644 --- a/be/test/olap/rowset/segment_v2/plain_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/plain_page_test.cpp @@ -69,6 +69,14 @@ public: page_builder.add(reinterpret_cast(src), &size); OwnedSlice s = page_builder.finish(); + //check first value and last value + CppType first_value; + page_builder.get_first_value(&first_value); + 
ASSERT_EQ(src[0], first_value); + CppType last_value; + page_builder.get_last_value(&last_value); + ASSERT_EQ(src[size - 1], last_value); + PageDecoderOptions decoder_options; PageDecoderType page_decoder(s.slice(), decoder_options); Status status = page_decoder.init(); diff --git a/be/test/olap/rowset/segment_v2/rle_page_test.cpp b/be/test/olap/rowset/segment_v2/rle_page_test.cpp index d548b6ba8c..bb7050b658 100644 --- a/be/test/olap/rowset/segment_v2/rle_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/rle_page_test.cpp @@ -58,8 +58,14 @@ public: rle_page_builder.add(reinterpret_cast(src), &size); OwnedSlice s = rle_page_builder.finish(); ASSERT_EQ(size, rle_page_builder.count()); - LOG(INFO) << "RLE Encoded size for 10k values: " << s.slice().size - << ", original size:" << size * sizeof(CppType); + + //check first value and last value + CppType first_value; + rle_page_builder.get_first_value(&first_value); + ASSERT_EQ(src[0], first_value); + CppType last_value; + rle_page_builder.get_last_value(&last_value); + ASSERT_EQ(src[size - 1], last_value); PageDecoderOptions decodeder_options; PageDecoderType rle_page_decoder(s.slice(), decodeder_options); diff --git a/be/test/util/frame_of_reference_coding_test.cpp b/be/test/util/frame_of_reference_coding_test.cpp index 2a7bca8b27..5ea82b7731 100644 --- a/be/test/util/frame_of_reference_coding_test.cpp +++ b/be/test/util/frame_of_reference_coding_test.cpp @@ -158,6 +158,91 @@ TEST_F(TestForCoding, TestBytesAlign) { ASSERT_EQ(2020, actual_value); } +TEST_F(TestForCoding, TestValueSeekSpecialCase) { + faststring buffer(1); + ForEncoder encoder(&buffer); + + std::vector data; + for (int64_t i = 0; i < 128; ++i) { + data.push_back(i); + } + + for (int64_t i = 300; i < 500; ++i) { + data.push_back(i); + } + + encoder.put_batch(data.data(), data.size()); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + + int64_t target = 160; + bool exact_match; + bool has_value = 
decoder.seek_at_or_after_value(&target, &exact_match); + ASSERT_EQ(has_value, true); + ASSERT_EQ(exact_match, false); + + int64_t next_value; + decoder.get(&next_value); + ASSERT_EQ(300, next_value); +} + +TEST_F(TestForCoding, TestValueSeek) { + faststring buffer(1); + ForEncoder encoder(&buffer); + + const int64_t SIZE = 320; + std::vector data; + for (int64_t i = 0; i < SIZE; ++i) { + data.push_back(i); + } + encoder.put_batch(data.data(), SIZE); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + + int64_t target = 160; + bool exact_match; + bool found = decoder.seek_at_or_after_value(&target, &exact_match); + ASSERT_EQ(found, true); + ASSERT_EQ(exact_match, true); + + int64_t actual_value; + decoder.get(&actual_value); + ASSERT_EQ(target, actual_value); + + target = -1; + found = decoder.seek_at_or_after_value(&target, &exact_match); + ASSERT_EQ(found, true); + ASSERT_EQ(exact_match, false); + + std::vector actual_result(SIZE); + decoder.get_batch(actual_result.data(), SIZE); + ASSERT_EQ(data, actual_result); + + target = 0; + found = decoder.seek_at_or_after_value(&target, &exact_match); + ASSERT_EQ(found, true); + ASSERT_EQ(exact_match, true); + + decoder.get_batch(actual_result.data(), SIZE); + ASSERT_EQ(data, actual_result); + + target = 319; + found = decoder.seek_at_or_after_value(&target, &exact_match); + ASSERT_EQ(found, true); + ASSERT_EQ(exact_match, true); + + decoder.get(&actual_value); + ASSERT_EQ(target, actual_value); + + target = 320; + found = decoder.seek_at_or_after_value(&target, &exact_match); + ASSERT_EQ(found, false); +} + } int main(int argc, char** argv) { diff --git a/run-ut.sh b/run-ut.sh index 0e9167d3d1..a1566c6b69 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -270,6 +270,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_plain_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_reader_writer_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/rle_page_test 
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_dict_page_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_prefix_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/page_compression_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_zone_map_test