diff --git a/be/src/olap/rowset/segment_v2/frame_of_reference_page.h b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h new file mode 100644 index 0000000000..6b33ccf02b --- /dev/null +++ b/be/src/olap/rowset/segment_v2/frame_of_reference_page.h @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/rowset/segment_v2/page_builder.h" // for PageBuilder +#include "olap/rowset/segment_v2/page_decoder.h" // for PageDecoder +#include "olap/rowset/segment_v2/options.h" // for PageBuilderOptions/PageDecoderOptions +#include "olap/rowset/segment_v2/common.h" // for rowid_t +#include "util/frame_of_reference_coding.h" + +namespace doris { +namespace segment_v2 { + +// Encode page use frame-of-reference coding +template +class FrameOfReferencePageBuilder : public PageBuilder { +public: + explicit FrameOfReferencePageBuilder(const PageBuilderOptions& options) : + _options(options), + _count(0), + _finished(false) { + _encoder.reset(new ForEncoder(&_buf)); + } + + bool is_page_full() override { + return _encoder->len() >= _options.data_page_size; + } + + Status add(const uint8_t* vals, size_t* count) override { + DCHECK(!_finished); + auto new_vals = reinterpret_cast(vals); + _encoder->put_batch(new_vals, *count); + _count += *count; + return Status::OK(); + } + + Slice finish() override { + _finished = true; + _encoder->flush(); + return Slice(_buf.data(), _buf.size()); + } + + void reset() override { + _count = 0; + _finished = false; + _encoder->clear(); + } + + size_t count() const override { + return _count; + } + + uint64_t size() const override { + return _buf.size(); + } + + // this api will release the memory ownership of encoded data + // Note: + // release() should be called after finish + // reset() should be called after this function before reuse the builder + void release() override { + uint8_t* ret = _buf.release(); + (void)ret; + } + +private: + typedef typename TypeTraits::CppType CppType; + PageBuilderOptions _options; + size_t _count; + bool _finished; + std::unique_ptr> _encoder; + faststring _buf; +}; + +template +class FrameOfReferencePageDecoder : public PageDecoder { +public: + FrameOfReferencePageDecoder(Slice slice, const PageDecoderOptions& options) : + _parsed(false), + _data(slice), + _num_elements(0), + _cur_index(0){ + _decoder.reset(new ForDecoder((uint8_t*)_data.data, _data.size)); + } + + Status init() override { + CHECK(!_parsed); + bool result = _decoder->init(); + if (result) { + _num_elements = _decoder->count(); + _parsed = true; + return Status::OK(); + } else { + return Status::Corruption("The frame of reference page metadata maybe broken"); + } + } + + Status seek_to_position_in_page(size_t pos) override { + DCHECK(_parsed) << "Must call init() firstly"; + DCHECK_LE(pos, _num_elements) << "Tried to seek to " << pos << " which is > number of elements (" + << _num_elements << ") in the block!"; + // If the block is empty (e.g. the column is filled with nulls), there is no data to seek. + if (PREDICT_FALSE(_num_elements == 0)) { + return Status::OK(); + } + + int32_t skip_num = pos - _cur_index; + _decoder->skip(skip_num); + _cur_index = pos; + return Status::OK(); + } + + Status next_batch(size_t* n, ColumnBlockView* dst) override { + DCHECK(_parsed) << "Must call init() firstly"; + if (PREDICT_FALSE(*n == 0 || _cur_index >= _num_elements)) { + *n = 0; + return Status::OK(); + } + + size_t to_fetch = std::min(*n, static_cast(_num_elements - _cur_index)); + uint8_t* data_ptr = dst->data(); + _decoder->get_batch(reinterpret_cast(data_ptr), to_fetch); + _cur_index += to_fetch; + *n = to_fetch; + return Status::OK(); + } + + size_t count() const override { + return _num_elements; + } + + size_t current_index() const override { + return _cur_index; + } + +private: + typedef typename TypeTraits::CppType CppType; + + bool _parsed; + Slice _data; + uint32_t _num_elements; + size_t _cur_index; + std::unique_ptr> _decoder; +}; + +} // namespace segment_v2 +} // namespace doris \ No newline at end of file diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index c282b6e625..66e8a9238d 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -81,6 +81,7 @@ set(UTIL_FILES thrift_rpc_helper.cpp faststring.cc slice.cpp + frame_of_reference_coding.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/bit_stream_utils.h b/be/src/util/bit_stream_utils.h index 2b68c8426d..220c8cb4f1 100644 --- a/be/src/util/bit_stream_utils.h +++ b/be/src/util/bit_stream_utils.h @@ -49,8 +49,7 @@ class BitWriter { int bytes_written() const { return byte_offset_ + BitUtil::Ceil(bit_offset_, 8); } // Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit - // packed. num_bits must be <= 32. If 'v' is larger than 'num_bits' bits, the higher - // bits are ignored. + // packed. void PutValue(uint64_t v, int num_bits); // Writes v to the next aligned byte using num_bits. If T is larger than num_bits, the diff --git a/be/src/util/frame_of_reference_coding.cpp b/be/src/util/frame_of_reference_coding.cpp new file mode 100644 index 0000000000..a5a8436f0f --- /dev/null +++ b/be/src/util/frame_of_reference_coding.cpp @@ -0,0 +1,366 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/frame_of_reference_coding.h" + +#include +#include + +#include "util/bit_util.h" +#include "util/coding.h" + +namespace doris { + +static inline uint8_t bits(const uint64_t v) { + return v == 0 ? 0 : 64 - __builtin_clzll(v); +} + +template +const T* ForEncoder::copy_value(const T *p_data, size_t count) { + memcpy(&_buffered_values[_buffered_values_num], p_data, count * sizeof(T)); + _buffered_values_num += count; + p_data += count; + return p_data; +} + +template +void ForEncoder::put_batch(const T *in_data, size_t count) { + if (_buffered_values_num + count < FRAME_VALUE_NUM) { + copy_value(in_data, count); + _values_num += count; + return; + } + + // 1. padding one frame + size_t padding_num = FRAME_VALUE_NUM - _buffered_values_num; + in_data = copy_value(in_data, padding_num); + bit_packing_one_frame_value(_buffered_values); + + // 2. process frame by frame + size_t frame_size = (count - padding_num) / FRAME_VALUE_NUM; + for (size_t i = 0; i < frame_size; i ++) { + // directly encode value to the bit_writer, don't buffer the value + _buffered_values_num = FRAME_VALUE_NUM; + bit_packing_one_frame_value(in_data); + in_data += FRAME_VALUE_NUM; + } + + // 3. process remaining value + size_t remaining_num = (count - padding_num) % FRAME_VALUE_NUM; + if (remaining_num > 0) { + copy_value(in_data, remaining_num); + } + + _values_num += count; +} + +// todo(kks): improve this method by SIMD instructions + +// Use as few bit as possible to store a piece of integer data. +// param[in] input: the integer list need to pack +// param[in] in_num: the number integer need to pack +// param[in] bit_width: how many bit we use to store each integer data +// param[out] out: the packed result + +// For example: +// The input is int32 list: 1, 2, 4, 8 and bit_width is 4 +// The output will be: 0001 0010 0100 1000 +template +void ForEncoder::bit_pack(T *input, uint8_t in_num, int bit_width, uint8_t *output) { + if (in_num == 0 || bit_width == 0) { + return; + } + + T in_mask = 0; + int bit_index = 0; + *output = 0; + for (int i = 0; i < in_num; i++) { + in_mask = 1 << (bit_width - 1); + for (int k = 0; k < bit_width; k++) { + if (bit_index > 7) { + bit_index = 0; + output++; + *output = 0; + } + *output |= (((input[i] & in_mask) >> (bit_width - k - 1)) << (7 - bit_index)); + in_mask >>= 1; + bit_index++; + } + } +} + +template +void ForEncoder::bit_packing_one_frame_value(const T* input) { + T min = input[0]; + T max = input[0]; + bool is_ascending = true; + uint8_t bit_width = 0; + + for (uint8_t i = 1; i < _buffered_values_num; ++i) { + if (is_ascending) { + if (input[i] < input[i - 1]) { + is_ascending = false; + } else { + bit_width = std::max(bit_width, bits(input[i]- input[i - 1])) ; + } + } + + if (input[i] < min) { + min = input[i]; + continue; + } + + if (input[i] > max) { + max = input[i]; + } + } + + if (sizeof(T) == 8) { + put_fixed64_le(_buffer, min); + } else { + put_fixed32_le(_buffer, min); + } + + // improve for ascending order input, we could use fewer bit + T delta_values[FRAME_VALUE_NUM]; + u_int8_t order_flag = 0; + if (is_ascending) { + delta_values[0] = 0; + for (uint8_t i = 1; i < _buffered_values_num; ++i) { + delta_values[i] = input[i] - input[i - 1]; + } + order_flag = 1; + } else { + bit_width = bits(static_cast(max - min)); + for (uint8_t i = 0; i < _buffered_values_num; ++i) { + delta_values[i] = input[i] - min; + } + } + // 2 bit order_flag + 6 bit bit_width + uint8_t order_flag_and_bit_width = order_flag << 6 | bit_width; + _order_flag_and_bit_widths.push_back(order_flag_and_bit_width); + + uint32_t packing_len = BitUtil::Ceil(_buffered_values_num * bit_width, 8); + + _buffer->reserve(_buffer->size() + packing_len); + size_t origin_size = _buffer->size(); + _buffer->resize(_buffer->size() + packing_len); + bit_pack(delta_values, _buffered_values_num, bit_width, _buffer->data() + origin_size); + + _buffered_values_num = 0; +} + +template +uint32_t ForEncoder::flush() { + if (_buffered_values_num != 0) { + bit_packing_one_frame_value(_buffered_values); + } + + // write the footer: + // 1 order_flags and bit_widths + for (auto value: _order_flag_and_bit_widths) { + _buffer->append(&value, 1); + } + // 2 frame_value_num and values_num + uint8_t frame_value_num = FRAME_VALUE_NUM; + _buffer->append(&frame_value_num, 1); + put_fixed32_le(_buffer, _values_num); + + return _buffer->size(); +} + + +template +bool ForDecoder::init() { + // When row count is zero, the minimum footer size is 5: + // only has ValuesNum(4) + FrameValueNum(1) + if (_buffer_len < 5) { + return false; + } + + _frame_value_num = decode_fixed32_le(_buffer + _buffer_len - 5); + _values_num = decode_fixed32_le(_buffer + _buffer_len - 4); + _frame_count = _values_num / _frame_value_num + (_values_num % _frame_value_num != 0); + + size_t bit_width_offset = _buffer_len - 5 - _frame_count; + if (bit_width_offset < 0) { + return false; + } + + // read order_flags, bit_widths and compute frame_offsets + uint8_t mask = (1 << 6) - 1; + u_int32_t frame_start_offset = 0; + for (uint32_t i = 0; i < _frame_count; i++ ) { + uint32_t order_flag_and_bit_width = decode_fixed8(_buffer + bit_width_offset); + + uint8_t bit_width = order_flag_and_bit_width & mask; + uint8_t order_flag = order_flag_and_bit_width >> 6; + _bit_widths.push_back(bit_width); + _order_flags.push_back(order_flag); + + bit_width_offset += 1; + + _frame_offsets.push_back(frame_start_offset); + if (sizeof(T) == 8) { + frame_start_offset += bit_width * _frame_value_num / 8 + 8; + } else { + frame_start_offset += bit_width * _frame_value_num / 8 + 4; + } + } + + _out_buffer.reserve(_frame_value_num); + + return true; +} + +// todo(kks): improve this method by SIMD instructions +// The reverse of bit_pack method, get original integer data list from packed bits +// param[in] input: the packed bits need to unpack +// param[in] in_num: the integer number in packed bits +// param[in] bit_width: how many bit we used to store each integer data +// param[out] output: the original integer data list +template +void ForDecoder::bit_unpack(const uint8_t *input, uint8_t in_num, int bit_width, T *output) { + unsigned char in_mask = 0x80; + int bit_index = 0; + while (in_num > 0) { + *output = 0; + for (int i = 0; i < bit_width; i++) { + if (bit_index > 7) { + input++; + bit_index = 0; + } + *output |= ((T)((*input & (in_mask >> bit_index)) >> (7 - bit_index))) << (bit_width - i - 1); + bit_index++; + } + output++; + in_num--; + } +} + +template +void ForDecoder::decode_current_frame(T* output) { + _current_decoded_frame = _current_index / _frame_value_num; + uint8_t frame_value_num = _frame_value_num; + // compute last frame value num + if (_current_decoded_frame == _frame_count - 1 && _values_num % _frame_value_num != 0) { + frame_value_num = _values_num % _frame_value_num;; + } + + uint32_t base_offset = _frame_offsets[_current_decoded_frame]; + T min = 0; + uint32_t delta_offset = 0; + if (sizeof(T) == 8) { + min = decode_fixed64_le(_buffer + base_offset); + delta_offset = base_offset + 8; + } else { + min = decode_fixed32_le(_buffer + base_offset); + delta_offset = base_offset + 4; + } + + uint8_t bit_width = _bit_widths[_current_decoded_frame]; + bool is_ascending = _order_flags[_current_decoded_frame]; + + std::vector delta_values(_frame_value_num); + bit_unpack(_buffer + delta_offset, frame_value_num, bit_width, delta_values.data()); + if (is_ascending) { + T pre_value = min; + for (uint8_t i = 0; i < frame_value_num; i ++) { + T value = delta_values[i] + pre_value; + output[i] = value; + pre_value = value; + } + } else { + for (uint8_t i = 0; i < frame_value_num; i ++) { + output[i] = delta_values[i] + min; + } + } +} + +template +T* ForDecoder::copy_value(T* val, size_t count) { + memcpy(val, &_out_buffer[_current_index % _frame_value_num], sizeof(T) * count); + _current_index += count; + val += count; + return val; +} + +template +bool ForDecoder::get_batch(T* val, size_t count) { + if (_current_index + count > _values_num) { + return false; + } + + if (need_decode_frame()) { + decode_current_frame(_out_buffer.data()); + } + + if (_current_index + count < _frame_value_num * (_current_decoded_frame + 1)) { + copy_value(val, count); + return true; + } + + // 1. padding one frame + size_t padding_num = _frame_value_num * (_current_decoded_frame + 1) - _current_index; + val = copy_value(val, padding_num); + + // 2. process frame by frame + size_t frame_size = (count - padding_num) / _frame_value_num; + for (size_t i = 0; i < frame_size; i++) { + // directly decode value to the output, don't buffer the value + decode_current_frame(val); + _current_index += _frame_value_num; + val += _frame_value_num; + } + + // 3. process remaining value + size_t remaining_num = (count - padding_num) % _frame_value_num; + if (remaining_num > 0) { + decode_current_frame(_out_buffer.data()); + val = copy_value(val, remaining_num); + } + + return true; +} + +template +bool ForDecoder::skip(int32_t skip_num) { + if (_current_index + skip_num >= _values_num || _current_index + skip_num < 0) { + return false; + } + _current_index = _current_index + skip_num; + return true; +} + +template class ForEncoder; +template class ForEncoder; +template class ForEncoder; +template class ForEncoder; +template class ForEncoder; +template class ForEncoder; +template class ForEncoder; +template class ForEncoder; + +template class ForDecoder; +template class ForDecoder; +template class ForDecoder; +template class ForDecoder; +template class ForDecoder; +template class ForDecoder; +template class ForDecoder; +template class ForDecoder; +} diff --git a/be/src/util/frame_of_reference_coding.h b/be/src/util/frame_of_reference_coding.h new file mode 100644 index 0000000000..c48c50f97f --- /dev/null +++ b/be/src/util/frame_of_reference_coding.h @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_FRAME_OF_REFERENCE_CODING_H +#define DORIS_FRAME_OF_REFERENCE_CODING_H + + +#include +#include + +#include "util/bit_stream_utils.h" +#include "util/bit_stream_utils.inline.h" +#include "util/faststring.h" + +namespace doris { +// The implementation for frame-of-reference coding +// The detail of frame-of-reference coding, please refer to +// https://lemire.me/blog/2012/02/08/effective-compression-using-frame-of-reference-and-delta-coding/ +// and https://www.elastic.co/cn/blog/frame-of-reference-and-roaring-bitmaps +// +// The encoded data format is as follows: +// +// 1. Body: +// BitPackingFrame * FrameCount +// 2. Footer: +// (2 bit OrderFlag + 6 bit BitWidth) * FrameCount +// 8 bit FrameValueNum +// 32 bit ValuesNum +// +// The not ascending order BitPackingFrame format: +// MinValue, (Value - MinVale) * FrameValueNum +// +// The ascending order BitPackingFrame format: +// MinValue, (Value[i] - Value[i - 1]) * FrameValueNum +// +// The OrderFlag is 1 represents ascending order, 0 represents not ascending order +// The last frame value num maybe less than 128 +template +class ForEncoder { +public: + explicit ForEncoder(faststring* buffer): _buffer(buffer) {} + + void put(const T value) { + return put_batch(&value, 1); + } + + void put_batch(const T* value, size_t count); + + // Flushes any pending values to the underlying buffer. + // Returns the total number of bytes written + uint32_t flush(); + + // underlying buffer size + footer meta size. + // Note: should call this method before flush. + uint32_t len() { + return _buffer->size() + _order_flag_and_bit_widths.size() + 5; + } + + // Resets all the state in the encoder. + void clear() { + _values_num = 0; + _buffered_values_num = 0; + _buffer->clear(); + } + +private: + void bit_pack(T *input, uint8_t in_num, int bit_width, uint8_t *output); + + void bit_packing_one_frame_value(const T* input); + + const T* copy_value(const T* val, size_t count); + + uint32_t _values_num = 0; + uint8_t _buffered_values_num = 0; + static const uint8_t FRAME_VALUE_NUM = 128; + T _buffered_values[FRAME_VALUE_NUM]; + + faststring* _buffer; + std::vector _order_flag_and_bit_widths; +}; + +template +class ForDecoder { +public: + explicit ForDecoder(const uint8_t* in_buffer, size_t buffer_len) + :_buffer (in_buffer), + _buffer_len (buffer_len){} + + // read footer metadata + bool init(); + + bool get(T* val) { + return get_batch(val, 1); + } + + // Gets the next batch value. Returns false if there are no more. + bool get_batch(T* val, size_t count); + + // The skip_num is positive means move forwards + // The skip_num is negative means move backwards + bool skip(int32_t skip_num); + + uint32_t count() const { + return _values_num; + } + +private: + + void bit_unpack(const uint8_t *input, uint8_t in_num, int bit_width, T *output); + + void decode_current_frame(T* output); + + T* copy_value(T* val, size_t count); + + bool need_decode_frame() { + return !(_frame_value_num * _current_decoded_frame < _current_index + && _current_index < _frame_value_num * (_current_decoded_frame + 1)); + } + + uint8_t _frame_value_num = 0; + uint32_t _values_num = 0; + uint32_t _frame_count = 0; + uint32_t _current_index = 0; + uint32_t _current_decoded_frame = -1; + std::vector _out_buffer; + std::vector _frame_offsets; + std::vector _bit_widths; + std::vector _order_flags; + const uint8_t* _buffer; + size_t _buffer_len = 0; +}; +} + + +#endif //DORIS_FRAME_OF_REFERENCE_CODING_H diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 3f248e6c30..03deacac1b 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -57,6 +57,7 @@ ADD_BE_TEST(rowset/segment_v2/binary_dict_page_test) ADD_BE_TEST(rowset/segment_v2/segment_test) ADD_BE_TEST(rowset/segment_v2/column_zone_map_test) ADD_BE_TEST(rowset/segment_v2/row_ranges_test) +ADD_BE_TEST(rowset/segment_v2/frame_of_reference_page_test) ADD_BE_TEST(tablet_meta_manager_test) ADD_BE_TEST(tablet_mgr_test) ADD_BE_TEST(rowset/rowset_meta_manager_test) diff --git a/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp b/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp new file mode 100644 index 0000000000..756b0963b0 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/frame_of_reference_page_test.cpp @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "olap/rowset/segment_v2/options.h" +#include "olap/rowset/segment_v2/page_builder.h" +#include "olap/rowset/segment_v2/page_decoder.h" +#include "olap/rowset/segment_v2/frame_of_reference_page.h" +#include "util/arena.h" +#include "util/logging.h" + +using doris::segment_v2::PageBuilderOptions; +using doris::segment_v2::PageDecoderOptions; + +namespace doris { +class FrameOfReferencePageTest : public testing::Test { +public: + template + void copy_one(PageDecoderType* decoder, typename TypeTraits::CppType* ret) { + Arena arena; + uint8_t null_bitmap = 0; + ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, &arena); + ColumnBlockView column_block_view(&block); + + size_t n = 1; + decoder->next_batch(&n, &column_block_view); + ASSERT_EQ(1, n); + } + + template + void test_encode_decode_page_template(typename TypeTraits::CppType* src, + size_t size) { + typedef typename TypeTraits::CppType CppType; + PageBuilderOptions builder_options; + builder_options.data_page_size = 256 * 1024; + PageBuilderType for_page_builder(builder_options); + for_page_builder.add(reinterpret_cast(src), &size); + Slice s = for_page_builder.finish(); + ASSERT_EQ(size, for_page_builder.count()); + LOG(INFO) << "FrameOfReference Encoded size for 10k values: " << s.size + << ", original size:" << size * sizeof(CppType); + + PageDecoderOptions decoder_options; + PageDecoderType for_page_decoder(s, decoder_options); + Status status = for_page_decoder.init(); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(0, for_page_decoder.current_index()); + ASSERT_EQ(size, for_page_decoder.count()); + + Arena arena; + CppType* values = reinterpret_cast(arena.Allocate(size * sizeof(CppType))); + uint8_t* null_bitmap = reinterpret_cast(arena.Allocate(BitmapSize(size))); + ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, &arena); + ColumnBlockView column_block_view(&block); + size_t size_to_fetch = size; + status = for_page_decoder.next_batch(&size_to_fetch, &column_block_view); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(size, size_to_fetch); + + for (uint i = 0; i < size; i++) { + if (src[i] != values[i]) { + FAIL() << "Fail at index " << i << + " inserted=" << src[i] << " got=" << values[i]; + } + } + + // Test Seek within block by ordinal + for (int i = 0; i < 100; i++) { + int seek_off = random() % size; + for_page_decoder.seek_to_position_in_page(seek_off); + EXPECT_EQ((int32_t )(seek_off), for_page_decoder.current_index()); + CppType ret; + copy_one(&for_page_decoder, &ret); + EXPECT_EQ(values[seek_off], ret); + } + } +}; + +TEST_F(FrameOfReferencePageTest, TestInt32BlockEncoderRandom) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = random(); + } + + test_encode_decode_page_template, + segment_v2::FrameOfReferencePageDecoder >(ints.get(), size); +} + +TEST_F(FrameOfReferencePageTest, TestInt32BlockEncoderEqual) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = 12345; + } + + test_encode_decode_page_template, + segment_v2::FrameOfReferencePageDecoder >(ints.get(), size); +} + +TEST_F(FrameOfReferencePageTest, TestInt32BlockEncoderSequence) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = 12345 + i; + } + + test_encode_decode_page_template, + segment_v2::FrameOfReferencePageDecoder >(ints.get(), size); +} + +TEST_F(FrameOfReferencePageTest, TestInt64BlockEncoderSequence) { + const uint32_t size = 10000; + + std::unique_ptr ints(new int64_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = 21474836478 + i; + } + + test_encode_decode_page_template, + segment_v2::FrameOfReferencePageDecoder >(ints.get(), size); +} + +TEST_F(FrameOfReferencePageTest, TestInt32SequenceBlockEncoderSize) { + size_t size = 128; + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = i; + } + PageBuilderOptions builder_options; + builder_options.data_page_size = 256 * 1024; + segment_v2::FrameOfReferencePageBuilder page_builder(builder_options); + page_builder.add(reinterpret_cast(ints.get()), &size); + Slice s = page_builder.finish(); + // body: 4 bytes min value + 128 * 1 /8 packing value = 20 + // header: 1 + 1 + 4 = 6 + ASSERT_EQ(26, s.size); +} + +TEST_F(FrameOfReferencePageTest, TestInt32NormalBlockEncoderSize) { + size_t size = 128; + std::unique_ptr ints(new int32_t[size]); + for (int i = 0; i < size; i++) { + ints.get()[i] = 128 - i; + } + PageBuilderOptions builder_options; + builder_options.data_page_size = 256 * 1024; + segment_v2::FrameOfReferencePageBuilder page_builder(builder_options); + page_builder.add(reinterpret_cast(ints.get()), &size); + Slice s = page_builder.finish(); + // body: 4 bytes min value + 128 * 7 /8 packing value = 116 + // header: 1 + 1 + 4 = 6 + ASSERT_EQ(122, s.size); +} + +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index ff910ddfa2..c9cfb5eb0b 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -48,3 +48,5 @@ ADD_BE_TEST(block_compression_test) ADD_BE_TEST(arrow/arrow_row_block_test) ADD_BE_TEST(arrow/arrow_row_batch_test) ADD_BE_TEST(counter_cond_variable_test) +ADD_BE_TEST(frame_of_reference_coding_test) +ADD_BE_TEST(bit_stream_utils_test) diff --git a/be/test/util/bit_stream_utils_test.cpp b/be/test/util/bit_stream_utils_test.cpp new file mode 100644 index 0000000000..9d979b53d5 --- /dev/null +++ b/be/test/util/bit_stream_utils_test.cpp @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +// Must come before gtest.h. +#include +#include +#include + +#include "util/bit_stream_utils.h" +#include "util/bit_stream_utils.inline.h" +#include "util/bit_util.h" +#include "util/faststring.h" +#include "util/debug_util.h" + +using std::string; +using std::vector; + +namespace doris { + +const int kMaxWidth = 64; +class TestBitStreamUtil : public testing::Test {}; + +TEST(TestBitStreamUtil, TestBool) { + const int len_bytes = 2; + faststring buffer(len_bytes); + + BitWriter writer(&buffer); + + // Write alternating 0's and 1's + for (int i = 0; i < 8; ++i) { + writer.PutValue(i % 2, 1); + } + writer.Flush(); + EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); + + // Write 00110011 + for (int i = 0; i < 8; ++i) { + switch (i) { + case 0: + case 1: + case 4: + case 5: + writer.PutValue(0, 1); + break; + default: + writer.PutValue(1, 1); + break; + } + } + writer.Flush(); + + // Validate the exact bit value + EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); + EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0)); + + // Use the reader and validate + BitReader reader(buffer.data(), buffer.size()); + for (int i = 0; i < 8; ++i) { + bool val = false; + bool result = reader.GetValue(1, &val); + EXPECT_TRUE(result); + EXPECT_EQ(val, i % 2); + } + + for (int i = 0; i < 8; ++i) { + bool val = false; + bool result = reader.GetValue(1, &val); + EXPECT_TRUE(result); + switch (i) { + case 0: + case 1: + case 4: + case 5: + EXPECT_EQ(val, false); + break; + default: + EXPECT_EQ(val, true); + break; + } + } +} + +// Writes 'num_vals' values with width 'bit_width' and reads them back. +void TestBitArrayValues(int bit_width, int num_vals) { + const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8); + const uint64_t mod = bit_width == 64? 1 : 1LL << bit_width; + + faststring buffer(kTestLen); + BitWriter writer(&buffer); + for (int i = 0; i < num_vals; ++i) { + writer.PutValue(i % mod, bit_width); + } + writer.Flush(); + EXPECT_EQ(writer.bytes_written(), kTestLen); + + BitReader reader(buffer.data(), kTestLen); + for (int i = 0; i < num_vals; ++i) { + int64_t val = 0; + bool result = reader.GetValue(bit_width, &val); + EXPECT_TRUE(result); + EXPECT_EQ(val, i % mod); + } + EXPECT_EQ(reader.bytes_left(), 0); +} + +TEST(TestBitStreamUtil, TestValues) { + for (int width = 1; width <= kMaxWidth; ++width) { + TestBitArrayValues(width, 1); + TestBitArrayValues(width, 2); + // Don't write too many values + TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096); + TestBitArrayValues(width, 1024); + } +} + +// Test some mixed values +TEST(TestBitStreamUtil, TestMixed) { + const int kTestLenBits = 1024; + faststring buffer(kTestLenBits / 8); + bool parity = true; + + BitWriter writer(&buffer); + for (int i = 0; i < kTestLenBits; ++i) { + if (i % 2 == 0) { + writer.PutValue(parity, 1); + parity = !parity; + } else { + writer.PutValue(i, 10); + } + } + writer.Flush(); + + parity = true; + BitReader reader(buffer.data(), buffer.size()); + for (int i = 0; i < kTestLenBits; ++i) { + bool result; + if (i % 2 == 0) { + bool val = false; + result = reader.GetValue(1, &val); + EXPECT_EQ(val, parity); + parity = !parity; + } else { + int val; + result = reader.GetValue(10, &val); + EXPECT_EQ(val, i); + } + EXPECT_TRUE(result); + } +} + +TEST(TestBitStreamUtil, TestSeekToBit) { + faststring buffer(1); + + BitWriter writer(&buffer); + writer.PutValue(2019,32); + writer.PutValue(2020,32); + writer.PutValue(2021,32); + writer.Flush(); + + BitReader reader(buffer.data(), buffer.size()); + reader.SeekToBit(buffer.size() * 8 - 8 * 8); + uint32_t second_value; + reader.GetValue(32, &second_value); + ASSERT_EQ(second_value,2020); + + uint32_t third_value; + reader.GetValue(32, &third_value); + ASSERT_EQ(third_value, 2021); + + reader.SeekToBit(0); + uint32_t first_value; + reader.GetValue(32, &first_value); + ASSERT_EQ(first_value, 2019); +} + +TEST(TestBitStreamUtil, TestUint64) { + faststring buffer(1); + BitWriter writer(&buffer); + writer.PutValue(18446744073709551614U, 64); + writer.PutValue(18446744073709551613U, 64); + writer.PutValue(128, 32); + writer.PutValue(126, 16); + writer.Flush(); + + BitReader reader(buffer.data(), buffer.size()); + + uint64_t v1; + reader.GetValue(64, &v1); + ASSERT_EQ(v1, 18446744073709551614U); + + uint64_t v2; + reader.GetValue(64, &v2); + ASSERT_EQ(v2, 18446744073709551613U); + + uint64_t v3; + reader.GetValue(32, &v3); + ASSERT_EQ(v3, 128); + + uint64_t v4; + reader.GetValue(16, &v4); + ASSERT_EQ(v4, 126); +} +} // namespace doris + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/frame_of_reference_coding_test.cpp b/be/test/util/frame_of_reference_coding_test.cpp new file mode 100644 index 0000000000..2a7bca8b27 --- /dev/null +++ b/be/test/util/frame_of_reference_coding_test.cpp @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "util/frame_of_reference_coding.h" + +namespace doris { +class TestForCoding : public testing::Test { +public: + static void test_frame_of_reference_encode_decode(int32_t element_size) { + faststring buffer(1); + ForEncoder encoder(&buffer); + + std::vector data; + for (int32_t i = 0; i < element_size; ++i) { + data.push_back(i); + } + encoder.put_batch(data.data(), element_size); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + std::vector actual_result(element_size); + decoder.get_batch(actual_result.data(), element_size); + + ASSERT_EQ(data, actual_result); + } + + static void test_skip(int32_t skip_num) { + faststring buffer(1); + ForEncoder encoder(&buffer); + + std::vector input_data; + std::vector expect_result; + for (uint32_t i = 0; i < 256; ++i) { + input_data.push_back(i); + if (i >= skip_num) { + expect_result.push_back(i); + } + } + encoder.put_batch(input_data.data(), 256); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + decoder.skip(skip_num); + + std::vector actual_result(256 - skip_num); + decoder.get_batch(actual_result.data(), 256 - skip_num); + + ASSERT_EQ(expect_result, actual_result); + } +}; + + +TEST_F(TestForCoding, TestHalfFrame) { + test_frame_of_reference_encode_decode(64); +} + +TEST_F(TestForCoding, TestOneFrame) { + test_frame_of_reference_encode_decode(128); +} + +TEST_F(TestForCoding, TestTwoFrame) { + test_frame_of_reference_encode_decode(256); +} + +TEST_F(TestForCoding, TestTwoHlafFrame) { + test_frame_of_reference_encode_decode(320); +} + +TEST_F(TestForCoding, TestSkipZero) { + test_skip(0); +} + +TEST_F(TestForCoding, TestSkipHalfFrame) { + test_skip(64); +} + +TEST_F(TestForCoding, TestSkipOneFrame) { + test_skip(128); +} + +TEST_F(TestForCoding, TestInt64) { + faststring buffer(1); + ForEncoder encoder(&buffer); + + std::vector data; + for (int64_t i = 0; i < 320; ++i) { + data.push_back(i); + } + encoder.put_batch(data.data(), 320); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + std::vector actual_result(320); + decoder.get_batch(actual_result.data(), 320); + + ASSERT_EQ(data, actual_result); +} + +TEST_F(TestForCoding, TestOneMinValue) { + faststring buffer(1); + ForEncoder encoder(&buffer); + encoder.put(2019); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + int32_t actual_value; + decoder.get(&actual_value); + ASSERT_EQ(2019, actual_value); +} + +TEST_F(TestForCoding, TestZeroValue) { + faststring buffer(1); + ForEncoder encoder(&buffer); + encoder.flush(); + + ASSERT_EQ(buffer.length(), 4 + 1); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + int32_t actual_value; + bool result = decoder.get(&actual_value); + ASSERT_EQ(result, false); +} + +TEST_F(TestForCoding, TestBytesAlign) { + faststring buffer(1); + ForEncoder encoder(&buffer); + encoder.put(2019); + encoder.put(2020); + encoder.flush(); + + ForDecoder decoder(buffer.data(), buffer.length()); + decoder.init(); + + int32_t actual_value; + decoder.get(&actual_value); + ASSERT_EQ(2019, actual_value); + decoder.get(&actual_value); + ASSERT_EQ(2020, actual_value); +} + +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/be/test/util/rle_encoding_test.cpp b/be/test/util/rle_encoding_test.cpp index b40ef074f8..50c92c707d 100644 --- a/be/test/util/rle_encoding_test.cpp +++ b/be/test/util/rle_encoding_test.cpp @@ -43,143 +43,7 @@ namespace doris { const int kMaxWidth = 64; -class TestRle : public testing::Test { -public: - TestRle() { - } - - virtual ~TestRle() { - } -}; - -TEST(BitArray, TestBool) { - const int len_bytes = 2; - faststring buffer(len_bytes); - - BitWriter writer(&buffer); - - // Write alternating 0's and 1's - for (int i = 0; i < 8; ++i) { - writer.PutValue(i % 2, 1); - } - writer.Flush(); - EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); - - // Write 00110011 - for (int i = 0; i < 8; ++i) { - switch (i) { - case 0: - case 1: - case 4: - case 5: - writer.PutValue(0, 1); - break; - default: - writer.PutValue(1, 1); - break; - } - } - writer.Flush(); - - // Validate the exact bit value - EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); - EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0)); - - // Use the reader and validate - BitReader reader(buffer.data(), buffer.size()); - for (int i = 0; i < 8; ++i) { - bool val = false; - bool result = reader.GetValue(1, &val); - EXPECT_TRUE(result); - EXPECT_EQ(val, i % 2); - } - - for (int i = 0; i < 8; ++i) { - bool val = false; - bool result = reader.GetValue(1, &val); - EXPECT_TRUE(result); - switch (i) { - case 0: - case 1: - case 4: - case 5: - EXPECT_EQ(val, false); - break; - default: - EXPECT_EQ(val, true); - break; - } - } -} - -// Writes 'num_vals' values with width 'bit_width' and reads them back. -void TestBitArrayValues(int bit_width, int num_vals) { - const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8); - const uint64_t mod = bit_width == 64? 1 : 1LL << bit_width; - - faststring buffer(kTestLen); - BitWriter writer(&buffer); - for (int i = 0; i < num_vals; ++i) { - writer.PutValue(i % mod, bit_width); - } - writer.Flush(); - EXPECT_EQ(writer.bytes_written(), kTestLen); - - BitReader reader(buffer.data(), kTestLen); - for (int i = 0; i < num_vals; ++i) { - int64_t val = 0; - bool result = reader.GetValue(bit_width, &val); - EXPECT_TRUE(result); - EXPECT_EQ(val, i % mod); - } - EXPECT_EQ(reader.bytes_left(), 0); -} - -TEST(BitArray, TestValues) { - for (int width = 1; width <= kMaxWidth; ++width) { - TestBitArrayValues(width, 1); - TestBitArrayValues(width, 2); - // Don't write too many values - TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096); - TestBitArrayValues(width, 1024); - } -} - -// Test some mixed values -TEST(BitArray, TestMixed) { - const int kTestLenBits = 1024; - faststring buffer(kTestLenBits / 8); - bool parity = true; - - BitWriter writer(&buffer); - for (int i = 0; i < kTestLenBits; ++i) { - if (i % 2 == 0) { - writer.PutValue(parity, 1); - parity = !parity; - } else { - writer.PutValue(i, 10); - } - } - writer.Flush(); - - parity = true; - BitReader reader(buffer.data(), buffer.size()); - for (int i = 0; i < kTestLenBits; ++i) { - bool result; - if (i % 2 == 0) { - bool val = false; - result = reader.GetValue(1, &val); - EXPECT_EQ(val, parity); - parity = !parity; - } else { - int val; - result = reader.GetValue(10, &val); - EXPECT_EQ(val, i); - } - EXPECT_TRUE(result); - } -} - +class TestRle : public testing::Test {}; // Validates encoding of values by encoding and decoding them. If // expected_encoding != NULL, also validates that the encoded buffer is // exactly 'expected_encoding'. diff --git a/run-ut.sh b/run-ut.sh index 95a9456e65..349fe8fdbb 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -159,6 +159,8 @@ ${DORIS_TEST_BINARY_DIR}/util/block_compression_test ${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_block_test ${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_batch_test ${DORIS_TEST_BINARY_DIR}/util/counter_cond_variable_test +${DORIS_TEST_BINARY_DIR}/util/bit_stream_utils_test +${DORIS_TEST_BINARY_DIR}/util/frame_of_reference_coding_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test @@ -267,6 +269,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/page_compression_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_zone_map_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/row_ranges_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/frame_of_reference_page_test ${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test ${DORIS_TEST_BINARY_DIR}/olap/storage_types_test ${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test