Add frame_of_reference page (#1818)

This commit is contained in:
kangkaisen
2019-09-28 01:10:29 +08:00
committed by ZHAO Chun
parent e67b398916
commit 0c22d8fa08
12 changed files with 1263 additions and 139 deletions

View File

@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/rowset/segment_v2/page_builder.h" // for PageBuilder
#include "olap/rowset/segment_v2/page_decoder.h" // for PageDecoder
#include "olap/rowset/segment_v2/options.h" // for PageBuilderOptions/PageDecoderOptions
#include "olap/rowset/segment_v2/common.h" // for rowid_t
#include "util/frame_of_reference_coding.h"
namespace doris {
namespace segment_v2 {
// Encode page use frame-of-reference coding
template<FieldType Type>
class FrameOfReferencePageBuilder : public PageBuilder {
public:
explicit FrameOfReferencePageBuilder(const PageBuilderOptions& options) :
_options(options),
_count(0),
_finished(false) {
_encoder.reset(new ForEncoder<CppType>(&_buf));
}
bool is_page_full() override {
return _encoder->len() >= _options.data_page_size;
}
Status add(const uint8_t* vals, size_t* count) override {
DCHECK(!_finished);
auto new_vals = reinterpret_cast<const CppType*>(vals);
_encoder->put_batch(new_vals, *count);
_count += *count;
return Status::OK();
}
Slice finish() override {
_finished = true;
_encoder->flush();
return Slice(_buf.data(), _buf.size());
}
void reset() override {
_count = 0;
_finished = false;
_encoder->clear();
}
size_t count() const override {
return _count;
}
uint64_t size() const override {
return _buf.size();
}
// this api will release the memory ownership of encoded data
// Note:
// release() should be called after finish
// reset() should be called after this function before reuse the builder
void release() override {
uint8_t* ret = _buf.release();
(void)ret;
}
private:
typedef typename TypeTraits<Type>::CppType CppType;
PageBuilderOptions _options;
size_t _count;
bool _finished;
std::unique_ptr<ForEncoder<CppType>> _encoder;
faststring _buf;
};
template<FieldType Type>
class FrameOfReferencePageDecoder : public PageDecoder {
public:
FrameOfReferencePageDecoder(Slice slice, const PageDecoderOptions& options) :
_parsed(false),
_data(slice),
_num_elements(0),
_cur_index(0){
_decoder.reset(new ForDecoder<CppType>((uint8_t*)_data.data, _data.size));
}
Status init() override {
CHECK(!_parsed);
bool result = _decoder->init();
if (result) {
_num_elements = _decoder->count();
_parsed = true;
return Status::OK();
} else {
return Status::Corruption("The frame of reference page metadata maybe broken");
}
}
Status seek_to_position_in_page(size_t pos) override {
DCHECK(_parsed) << "Must call init() firstly";
DCHECK_LE(pos, _num_elements) << "Tried to seek to " << pos << " which is > number of elements ("
<< _num_elements << ") in the block!";
// If the block is empty (e.g. the column is filled with nulls), there is no data to seek.
if (PREDICT_FALSE(_num_elements == 0)) {
return Status::OK();
}
int32_t skip_num = pos - _cur_index;
_decoder->skip(skip_num);
_cur_index = pos;
return Status::OK();
}
Status next_batch(size_t* n, ColumnBlockView* dst) override {
DCHECK(_parsed) << "Must call init() firstly";
if (PREDICT_FALSE(*n == 0 || _cur_index >= _num_elements)) {
*n = 0;
return Status::OK();
}
size_t to_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index));
uint8_t* data_ptr = dst->data();
_decoder->get_batch(reinterpret_cast<CppType*>(data_ptr), to_fetch);
_cur_index += to_fetch;
*n = to_fetch;
return Status::OK();
}
size_t count() const override {
return _num_elements;
}
size_t current_index() const override {
return _cur_index;
}
private:
typedef typename TypeTraits<Type>::CppType CppType;
bool _parsed;
Slice _data;
uint32_t _num_elements;
size_t _cur_index;
std::unique_ptr<ForDecoder<CppType>> _decoder;
};
} // namespace segment_v2
} // namespace doris

View File

@ -81,6 +81,7 @@ set(UTIL_FILES
thrift_rpc_helper.cpp
faststring.cc
slice.cpp
frame_of_reference_coding.cpp
)
if (WITH_MYSQL)

View File

@ -49,8 +49,7 @@ class BitWriter {
int bytes_written() const { return byte_offset_ + BitUtil::Ceil(bit_offset_, 8); }
// Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit
// packed. num_bits must be <= 32. If 'v' is larger than 'num_bits' bits, the higher
// bits are ignored.
// packed.
void PutValue(uint64_t v, int num_bits);
// Writes v to the next aligned byte using num_bits. If T is larger than num_bits, the

View File

@ -0,0 +1,366 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/frame_of_reference_coding.h"
#include <algorithm>
#include <cstring>
#include "util/bit_util.h"
#include "util/coding.h"
namespace doris {
static inline uint8_t bits(const uint64_t v) {
return v == 0 ? 0 : 64 - __builtin_clzll(v);
}
template<typename T>
const T* ForEncoder<T>::copy_value(const T *p_data, size_t count) {
memcpy(&_buffered_values[_buffered_values_num], p_data, count * sizeof(T));
_buffered_values_num += count;
p_data += count;
return p_data;
}
template<typename T>
void ForEncoder<T>::put_batch(const T *in_data, size_t count) {
if (_buffered_values_num + count < FRAME_VALUE_NUM) {
copy_value(in_data, count);
_values_num += count;
return;
}
// 1. padding one frame
size_t padding_num = FRAME_VALUE_NUM - _buffered_values_num;
in_data = copy_value(in_data, padding_num);
bit_packing_one_frame_value(_buffered_values);
// 2. process frame by frame
size_t frame_size = (count - padding_num) / FRAME_VALUE_NUM;
for (size_t i = 0; i < frame_size; i ++) {
// directly encode value to the bit_writer, don't buffer the value
_buffered_values_num = FRAME_VALUE_NUM;
bit_packing_one_frame_value(in_data);
in_data += FRAME_VALUE_NUM;
}
// 3. process remaining value
size_t remaining_num = (count - padding_num) % FRAME_VALUE_NUM;
if (remaining_num > 0) {
copy_value(in_data, remaining_num);
}
_values_num += count;
}
// todo(kks): improve this method by SIMD instructions
// Use as few bit as possible to store a piece of integer data.
// param[in] input: the integer list need to pack
// param[in] in_num: the number integer need to pack
// param[in] bit_width: how many bit we use to store each integer data
// param[out] out: the packed result
// For example:
// The input is int32 list: 1, 2, 4, 8 and bit_width is 4
// The output will be: 0001 0010 0100 1000
template<typename T>
void ForEncoder<T>::bit_pack(T *input, uint8_t in_num, int bit_width, uint8_t *output) {
if (in_num == 0 || bit_width == 0) {
return;
}
T in_mask = 0;
int bit_index = 0;
*output = 0;
for (int i = 0; i < in_num; i++) {
in_mask = 1 << (bit_width - 1);
for (int k = 0; k < bit_width; k++) {
if (bit_index > 7) {
bit_index = 0;
output++;
*output = 0;
}
*output |= (((input[i] & in_mask) >> (bit_width - k - 1)) << (7 - bit_index));
in_mask >>= 1;
bit_index++;
}
}
}
template<typename T>
void ForEncoder<T>::bit_packing_one_frame_value(const T* input) {
T min = input[0];
T max = input[0];
bool is_ascending = true;
uint8_t bit_width = 0;
for (uint8_t i = 1; i < _buffered_values_num; ++i) {
if (is_ascending) {
if (input[i] < input[i - 1]) {
is_ascending = false;
} else {
bit_width = std::max(bit_width, bits(input[i]- input[i - 1])) ;
}
}
if (input[i] < min) {
min = input[i];
continue;
}
if (input[i] > max) {
max = input[i];
}
}
if (sizeof(T) == 8) {
put_fixed64_le(_buffer, min);
} else {
put_fixed32_le(_buffer, min);
}
// improve for ascending order input, we could use fewer bit
T delta_values[FRAME_VALUE_NUM];
u_int8_t order_flag = 0;
if (is_ascending) {
delta_values[0] = 0;
for (uint8_t i = 1; i < _buffered_values_num; ++i) {
delta_values[i] = input[i] - input[i - 1];
}
order_flag = 1;
} else {
bit_width = bits(static_cast<T>(max - min));
for (uint8_t i = 0; i < _buffered_values_num; ++i) {
delta_values[i] = input[i] - min;
}
}
// 2 bit order_flag + 6 bit bit_width
uint8_t order_flag_and_bit_width = order_flag << 6 | bit_width;
_order_flag_and_bit_widths.push_back(order_flag_and_bit_width);
uint32_t packing_len = BitUtil::Ceil(_buffered_values_num * bit_width, 8);
_buffer->reserve(_buffer->size() + packing_len);
size_t origin_size = _buffer->size();
_buffer->resize(_buffer->size() + packing_len);
bit_pack(delta_values, _buffered_values_num, bit_width, _buffer->data() + origin_size);
_buffered_values_num = 0;
}
template<typename T>
uint32_t ForEncoder<T>::flush() {
if (_buffered_values_num != 0) {
bit_packing_one_frame_value(_buffered_values);
}
// write the footer:
// 1 order_flags and bit_widths
for (auto value: _order_flag_and_bit_widths) {
_buffer->append(&value, 1);
}
// 2 frame_value_num and values_num
uint8_t frame_value_num = FRAME_VALUE_NUM;
_buffer->append(&frame_value_num, 1);
put_fixed32_le(_buffer, _values_num);
return _buffer->size();
}
template<typename T>
bool ForDecoder<T>::init() {
// When row count is zero, the minimum footer size is 5:
// only has ValuesNum(4) + FrameValueNum(1)
if (_buffer_len < 5) {
return false;
}
_frame_value_num = decode_fixed32_le(_buffer + _buffer_len - 5);
_values_num = decode_fixed32_le(_buffer + _buffer_len - 4);
_frame_count = _values_num / _frame_value_num + (_values_num % _frame_value_num != 0);
size_t bit_width_offset = _buffer_len - 5 - _frame_count;
if (bit_width_offset < 0) {
return false;
}
// read order_flags, bit_widths and compute frame_offsets
uint8_t mask = (1 << 6) - 1;
u_int32_t frame_start_offset = 0;
for (uint32_t i = 0; i < _frame_count; i++ ) {
uint32_t order_flag_and_bit_width = decode_fixed8(_buffer + bit_width_offset);
uint8_t bit_width = order_flag_and_bit_width & mask;
uint8_t order_flag = order_flag_and_bit_width >> 6;
_bit_widths.push_back(bit_width);
_order_flags.push_back(order_flag);
bit_width_offset += 1;
_frame_offsets.push_back(frame_start_offset);
if (sizeof(T) == 8) {
frame_start_offset += bit_width * _frame_value_num / 8 + 8;
} else {
frame_start_offset += bit_width * _frame_value_num / 8 + 4;
}
}
_out_buffer.reserve(_frame_value_num);
return true;
}
// todo(kks): improve this method by SIMD instructions
// The reverse of bit_pack method, get original integer data list from packed bits
// param[in] input: the packed bits need to unpack
// param[in] in_num: the integer number in packed bits
// param[in] bit_width: how many bit we used to store each integer data
// param[out] output: the original integer data list
template<typename T>
void ForDecoder<T>::bit_unpack(const uint8_t *input, uint8_t in_num, int bit_width, T *output) {
unsigned char in_mask = 0x80;
int bit_index = 0;
while (in_num > 0) {
*output = 0;
for (int i = 0; i < bit_width; i++) {
if (bit_index > 7) {
input++;
bit_index = 0;
}
*output |= ((T)((*input & (in_mask >> bit_index)) >> (7 - bit_index))) << (bit_width - i - 1);
bit_index++;
}
output++;
in_num--;
}
}
template<typename T>
void ForDecoder<T>::decode_current_frame(T* output) {
_current_decoded_frame = _current_index / _frame_value_num;
uint8_t frame_value_num = _frame_value_num;
// compute last frame value num
if (_current_decoded_frame == _frame_count - 1 && _values_num % _frame_value_num != 0) {
frame_value_num = _values_num % _frame_value_num;;
}
uint32_t base_offset = _frame_offsets[_current_decoded_frame];
T min = 0;
uint32_t delta_offset = 0;
if (sizeof(T) == 8) {
min = decode_fixed64_le(_buffer + base_offset);
delta_offset = base_offset + 8;
} else {
min = decode_fixed32_le(_buffer + base_offset);
delta_offset = base_offset + 4;
}
uint8_t bit_width = _bit_widths[_current_decoded_frame];
bool is_ascending = _order_flags[_current_decoded_frame];
std::vector<T> delta_values(_frame_value_num);
bit_unpack(_buffer + delta_offset, frame_value_num, bit_width, delta_values.data());
if (is_ascending) {
T pre_value = min;
for (uint8_t i = 0; i < frame_value_num; i ++) {
T value = delta_values[i] + pre_value;
output[i] = value;
pre_value = value;
}
} else {
for (uint8_t i = 0; i < frame_value_num; i ++) {
output[i] = delta_values[i] + min;
}
}
}
template<typename T>
T* ForDecoder<T>::copy_value(T* val, size_t count) {
memcpy(val, &_out_buffer[_current_index % _frame_value_num], sizeof(T) * count);
_current_index += count;
val += count;
return val;
}
template<typename T>
bool ForDecoder<T>::get_batch(T* val, size_t count) {
if (_current_index + count > _values_num) {
return false;
}
if (need_decode_frame()) {
decode_current_frame(_out_buffer.data());
}
if (_current_index + count < _frame_value_num * (_current_decoded_frame + 1)) {
copy_value(val, count);
return true;
}
// 1. padding one frame
size_t padding_num = _frame_value_num * (_current_decoded_frame + 1) - _current_index;
val = copy_value(val, padding_num);
// 2. process frame by frame
size_t frame_size = (count - padding_num) / _frame_value_num;
for (size_t i = 0; i < frame_size; i++) {
// directly decode value to the output, don't buffer the value
decode_current_frame(val);
_current_index += _frame_value_num;
val += _frame_value_num;
}
// 3. process remaining value
size_t remaining_num = (count - padding_num) % _frame_value_num;
if (remaining_num > 0) {
decode_current_frame(_out_buffer.data());
val = copy_value(val, remaining_num);
}
return true;
}
template<typename T>
bool ForDecoder<T>::skip(int32_t skip_num) {
if (_current_index + skip_num >= _values_num || _current_index + skip_num < 0) {
return false;
}
_current_index = _current_index + skip_num;
return true;
}
template class ForEncoder<int8_t>;
template class ForEncoder<int16_t>;
template class ForEncoder<int32_t>;
template class ForEncoder<int64_t>;
template class ForEncoder<uint8_t>;
template class ForEncoder<uint16_t>;
template class ForEncoder<uint32_t>;
template class ForEncoder<uint64_t>;
template class ForDecoder<int8_t>;
template class ForDecoder<int16_t>;
template class ForDecoder<int32_t>;
template class ForDecoder<int64_t>;
template class ForDecoder<uint8_t>;
template class ForDecoder<uint16_t>;
template class ForDecoder<uint32_t>;
template class ForDecoder<uint64_t>;
}

View File

@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_FRAME_OF_REFERENCE_CODING_H
#define DORIS_FRAME_OF_REFERENCE_CODING_H
#include <cstdlib>
#include <iostream>
#include "util/bit_stream_utils.h"
#include "util/bit_stream_utils.inline.h"
#include "util/faststring.h"
namespace doris {
// The implementation for frame-of-reference coding
// The detail of frame-of-reference coding, please refer to
// https://lemire.me/blog/2012/02/08/effective-compression-using-frame-of-reference-and-delta-coding/
// and https://www.elastic.co/cn/blog/frame-of-reference-and-roaring-bitmaps
//
// The encoded data format is as follows:
//
// 1. Body:
// BitPackingFrame * FrameCount
// 2. Footer:
// (2 bit OrderFlag + 6 bit BitWidth) * FrameCount
// 8 bit FrameValueNum
// 32 bit ValuesNum
//
// The not ascending order BitPackingFrame format:
// MinValue, (Value - MinVale) * FrameValueNum
//
// The ascending order BitPackingFrame format:
// MinValue, (Value[i] - Value[i - 1]) * FrameValueNum
//
// The OrderFlag is 1 represents ascending order, 0 represents not ascending order
// The last frame value num maybe less than 128
template<typename T>
class ForEncoder {
public:
explicit ForEncoder(faststring* buffer): _buffer(buffer) {}
void put(const T value) {
return put_batch(&value, 1);
}
void put_batch(const T* value, size_t count);
// Flushes any pending values to the underlying buffer.
// Returns the total number of bytes written
uint32_t flush();
// underlying buffer size + footer meta size.
// Note: should call this method before flush.
uint32_t len() {
return _buffer->size() + _order_flag_and_bit_widths.size() + 5;
}
// Resets all the state in the encoder.
void clear() {
_values_num = 0;
_buffered_values_num = 0;
_buffer->clear();
}
private:
void bit_pack(T *input, uint8_t in_num, int bit_width, uint8_t *output);
void bit_packing_one_frame_value(const T* input);
const T* copy_value(const T* val, size_t count);
uint32_t _values_num = 0;
uint8_t _buffered_values_num = 0;
static const uint8_t FRAME_VALUE_NUM = 128;
T _buffered_values[FRAME_VALUE_NUM];
faststring* _buffer;
std::vector<uint8_t> _order_flag_and_bit_widths;
};
template<typename T>
class ForDecoder {
public:
explicit ForDecoder(const uint8_t* in_buffer, size_t buffer_len)
:_buffer (in_buffer),
_buffer_len (buffer_len){}
// read footer metadata
bool init();
bool get(T* val) {
return get_batch(val, 1);
}
// Gets the next batch value. Returns false if there are no more.
bool get_batch(T* val, size_t count);
// The skip_num is positive means move forwards
// The skip_num is negative means move backwards
bool skip(int32_t skip_num);
uint32_t count() const {
return _values_num;
}
private:
void bit_unpack(const uint8_t *input, uint8_t in_num, int bit_width, T *output);
void decode_current_frame(T* output);
T* copy_value(T* val, size_t count);
bool need_decode_frame() {
return !(_frame_value_num * _current_decoded_frame < _current_index
&& _current_index < _frame_value_num * (_current_decoded_frame + 1));
}
uint8_t _frame_value_num = 0;
uint32_t _values_num = 0;
uint32_t _frame_count = 0;
uint32_t _current_index = 0;
uint32_t _current_decoded_frame = -1;
std::vector<T> _out_buffer;
std::vector<uint32_t> _frame_offsets;
std::vector<uint8_t> _bit_widths;
std::vector<uint8_t> _order_flags;
const uint8_t* _buffer;
size_t _buffer_len = 0;
};
}
#endif //DORIS_FRAME_OF_REFERENCE_CODING_H

View File

@ -57,6 +57,7 @@ ADD_BE_TEST(rowset/segment_v2/binary_dict_page_test)
ADD_BE_TEST(rowset/segment_v2/segment_test)
ADD_BE_TEST(rowset/segment_v2/column_zone_map_test)
ADD_BE_TEST(rowset/segment_v2/row_ranges_test)
ADD_BE_TEST(rowset/segment_v2/frame_of_reference_page_test)
ADD_BE_TEST(tablet_meta_manager_test)
ADD_BE_TEST(tablet_mgr_test)
ADD_BE_TEST(rowset/rowset_meta_manager_test)

View File

@ -0,0 +1,180 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include "olap/rowset/segment_v2/options.h"
#include "olap/rowset/segment_v2/page_builder.h"
#include "olap/rowset/segment_v2/page_decoder.h"
#include "olap/rowset/segment_v2/frame_of_reference_page.h"
#include "util/arena.h"
#include "util/logging.h"
using doris::segment_v2::PageBuilderOptions;
using doris::segment_v2::PageDecoderOptions;
namespace doris {
class FrameOfReferencePageTest : public testing::Test {
public:
template<FieldType type, class PageDecoderType>
void copy_one(PageDecoderType* decoder, typename TypeTraits<type>::CppType* ret) {
Arena arena;
uint8_t null_bitmap = 0;
ColumnBlock block(get_type_info(type), (uint8_t*)ret, &null_bitmap, &arena);
ColumnBlockView column_block_view(&block);
size_t n = 1;
decoder->next_batch(&n, &column_block_view);
ASSERT_EQ(1, n);
}
template <FieldType Type, class PageBuilderType, class PageDecoderType>
void test_encode_decode_page_template(typename TypeTraits<Type>::CppType* src,
size_t size) {
typedef typename TypeTraits<Type>::CppType CppType;
PageBuilderOptions builder_options;
builder_options.data_page_size = 256 * 1024;
PageBuilderType for_page_builder(builder_options);
for_page_builder.add(reinterpret_cast<const uint8_t *>(src), &size);
Slice s = for_page_builder.finish();
ASSERT_EQ(size, for_page_builder.count());
LOG(INFO) << "FrameOfReference Encoded size for 10k values: " << s.size
<< ", original size:" << size * sizeof(CppType);
PageDecoderOptions decoder_options;
PageDecoderType for_page_decoder(s, decoder_options);
Status status = for_page_decoder.init();
ASSERT_TRUE(status.ok());
ASSERT_EQ(0, for_page_decoder.current_index());
ASSERT_EQ(size, for_page_decoder.count());
Arena arena;
CppType* values = reinterpret_cast<CppType*>(arena.Allocate(size * sizeof(CppType)));
uint8_t* null_bitmap = reinterpret_cast<uint8_t*>(arena.Allocate(BitmapSize(size)));
ColumnBlock block(get_type_info(Type), (uint8_t*)values, null_bitmap, &arena);
ColumnBlockView column_block_view(&block);
size_t size_to_fetch = size;
status = for_page_decoder.next_batch(&size_to_fetch, &column_block_view);
ASSERT_TRUE(status.ok());
ASSERT_EQ(size, size_to_fetch);
for (uint i = 0; i < size; i++) {
if (src[i] != values[i]) {
FAIL() << "Fail at index " << i <<
" inserted=" << src[i] << " got=" << values[i];
}
}
// Test Seek within block by ordinal
for (int i = 0; i < 100; i++) {
int seek_off = random() % size;
for_page_decoder.seek_to_position_in_page(seek_off);
EXPECT_EQ((int32_t )(seek_off), for_page_decoder.current_index());
CppType ret;
copy_one<Type, PageDecoderType>(&for_page_decoder, &ret);
EXPECT_EQ(values[seek_off], ret);
}
}
};
TEST_F(FrameOfReferencePageTest, TestInt32BlockEncoderRandom) {
const uint32_t size = 10000;
std::unique_ptr<int32_t[]> ints(new int32_t[size]);
for (int i = 0; i < size; i++) {
ints.get()[i] = random();
}
test_encode_decode_page_template<OLAP_FIELD_TYPE_INT, segment_v2::FrameOfReferencePageBuilder<OLAP_FIELD_TYPE_INT>,
segment_v2::FrameOfReferencePageDecoder<OLAP_FIELD_TYPE_INT> >(ints.get(), size);
}
TEST_F(FrameOfReferencePageTest, TestInt32BlockEncoderEqual) {
const uint32_t size = 10000;
std::unique_ptr<int32_t[]> ints(new int32_t[size]);
for (int i = 0; i < size; i++) {
ints.get()[i] = 12345;
}
test_encode_decode_page_template<OLAP_FIELD_TYPE_INT, segment_v2::FrameOfReferencePageBuilder<OLAP_FIELD_TYPE_INT>,
segment_v2::FrameOfReferencePageDecoder<OLAP_FIELD_TYPE_INT> >(ints.get(), size);
}
TEST_F(FrameOfReferencePageTest, TestInt32BlockEncoderSequence) {
const uint32_t size = 10000;
std::unique_ptr<int32_t[]> ints(new int32_t[size]);
for (int i = 0; i < size; i++) {
ints.get()[i] = 12345 + i;
}
test_encode_decode_page_template<OLAP_FIELD_TYPE_INT, segment_v2::FrameOfReferencePageBuilder<OLAP_FIELD_TYPE_INT>,
segment_v2::FrameOfReferencePageDecoder<OLAP_FIELD_TYPE_INT> >(ints.get(), size);
}
TEST_F(FrameOfReferencePageTest, TestInt64BlockEncoderSequence) {
const uint32_t size = 10000;
std::unique_ptr<int64_t[]> ints(new int64_t[size]);
for (int i = 0; i < size; i++) {
ints.get()[i] = 21474836478 + i;
}
test_encode_decode_page_template<OLAP_FIELD_TYPE_BIGINT, segment_v2::FrameOfReferencePageBuilder<OLAP_FIELD_TYPE_BIGINT>,
segment_v2::FrameOfReferencePageDecoder<OLAP_FIELD_TYPE_BIGINT> >(ints.get(), size);
}
TEST_F(FrameOfReferencePageTest, TestInt32SequenceBlockEncoderSize) {
size_t size = 128;
std::unique_ptr<int32_t[]> ints(new int32_t[size]);
for (int i = 0; i < size; i++) {
ints.get()[i] = i;
}
PageBuilderOptions builder_options;
builder_options.data_page_size = 256 * 1024;
segment_v2::FrameOfReferencePageBuilder<OLAP_FIELD_TYPE_INT> page_builder(builder_options);
page_builder.add(reinterpret_cast<const uint8_t *>(ints.get()), &size);
Slice s = page_builder.finish();
// body: 4 bytes min value + 128 * 1 /8 packing value = 20
// header: 1 + 1 + 4 = 6
ASSERT_EQ(26, s.size);
}
TEST_F(FrameOfReferencePageTest, TestInt32NormalBlockEncoderSize) {
size_t size = 128;
std::unique_ptr<int32_t[]> ints(new int32_t[size]);
for (int i = 0; i < size; i++) {
ints.get()[i] = 128 - i;
}
PageBuilderOptions builder_options;
builder_options.data_page_size = 256 * 1024;
segment_v2::FrameOfReferencePageBuilder<OLAP_FIELD_TYPE_INT> page_builder(builder_options);
page_builder.add(reinterpret_cast<const uint8_t *>(ints.get()), &size);
Slice s = page_builder.finish();
// body: 4 bytes min value + 128 * 7 /8 packing value = 116
// header: 1 + 1 + 4 = 6
ASSERT_EQ(122, s.size);
}
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -48,3 +48,5 @@ ADD_BE_TEST(block_compression_test)
ADD_BE_TEST(arrow/arrow_row_block_test)
ADD_BE_TEST(arrow/arrow_row_batch_test)
ADD_BE_TEST(counter_cond_variable_test)
ADD_BE_TEST(frame_of_reference_coding_test)
ADD_BE_TEST(bit_stream_utils_test)

View File

@ -0,0 +1,231 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ostream>
#include <string>
#include <vector>
#include <limits>
// Must come before gtest.h.
#include <boost/utility/binary.hpp>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "util/bit_stream_utils.h"
#include "util/bit_stream_utils.inline.h"
#include "util/bit_util.h"
#include "util/faststring.h"
#include "util/debug_util.h"
using std::string;
using std::vector;
namespace doris {
const int kMaxWidth = 64;
class TestBitStreamUtil : public testing::Test {};
TEST(TestBitStreamUtil, TestBool) {
const int len_bytes = 2;
faststring buffer(len_bytes);
BitWriter writer(&buffer);
// Write alternating 0's and 1's
for (int i = 0; i < 8; ++i) {
writer.PutValue(i % 2, 1);
}
writer.Flush();
EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
// Write 00110011
for (int i = 0; i < 8; ++i) {
switch (i) {
case 0:
case 1:
case 4:
case 5:
writer.PutValue(0, 1);
break;
default:
writer.PutValue(1, 1);
break;
}
}
writer.Flush();
// Validate the exact bit value
EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
// Use the reader and validate
BitReader reader(buffer.data(), buffer.size());
for (int i = 0; i < 8; ++i) {
bool val = false;
bool result = reader.GetValue(1, &val);
EXPECT_TRUE(result);
EXPECT_EQ(val, i % 2);
}
for (int i = 0; i < 8; ++i) {
bool val = false;
bool result = reader.GetValue(1, &val);
EXPECT_TRUE(result);
switch (i) {
case 0:
case 1:
case 4:
case 5:
EXPECT_EQ(val, false);
break;
default:
EXPECT_EQ(val, true);
break;
}
}
}
// Writes 'num_vals' values with width 'bit_width' and reads them back.
void TestBitArrayValues(int bit_width, int num_vals) {
const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8);
const uint64_t mod = bit_width == 64? 1 : 1LL << bit_width;
faststring buffer(kTestLen);
BitWriter writer(&buffer);
for (int i = 0; i < num_vals; ++i) {
writer.PutValue(i % mod, bit_width);
}
writer.Flush();
EXPECT_EQ(writer.bytes_written(), kTestLen);
BitReader reader(buffer.data(), kTestLen);
for (int i = 0; i < num_vals; ++i) {
int64_t val = 0;
bool result = reader.GetValue(bit_width, &val);
EXPECT_TRUE(result);
EXPECT_EQ(val, i % mod);
}
EXPECT_EQ(reader.bytes_left(), 0);
}
TEST(TestBitStreamUtil, TestValues) {
for (int width = 1; width <= kMaxWidth; ++width) {
TestBitArrayValues(width, 1);
TestBitArrayValues(width, 2);
// Don't write too many values
TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
TestBitArrayValues(width, 1024);
}
}
// Test some mixed values
TEST(TestBitStreamUtil, TestMixed) {
const int kTestLenBits = 1024;
faststring buffer(kTestLenBits / 8);
bool parity = true;
BitWriter writer(&buffer);
for (int i = 0; i < kTestLenBits; ++i) {
if (i % 2 == 0) {
writer.PutValue(parity, 1);
parity = !parity;
} else {
writer.PutValue(i, 10);
}
}
writer.Flush();
parity = true;
BitReader reader(buffer.data(), buffer.size());
for (int i = 0; i < kTestLenBits; ++i) {
bool result;
if (i % 2 == 0) {
bool val = false;
result = reader.GetValue(1, &val);
EXPECT_EQ(val, parity);
parity = !parity;
} else {
int val;
result = reader.GetValue(10, &val);
EXPECT_EQ(val, i);
}
EXPECT_TRUE(result);
}
}
TEST(TestBitStreamUtil, TestSeekToBit) {
faststring buffer(1);
BitWriter writer(&buffer);
writer.PutValue(2019,32);
writer.PutValue(2020,32);
writer.PutValue(2021,32);
writer.Flush();
BitReader reader(buffer.data(), buffer.size());
reader.SeekToBit(buffer.size() * 8 - 8 * 8);
uint32_t second_value;
reader.GetValue(32, &second_value);
ASSERT_EQ(second_value,2020);
uint32_t third_value;
reader.GetValue(32, &third_value);
ASSERT_EQ(third_value, 2021);
reader.SeekToBit(0);
uint32_t first_value;
reader.GetValue(32, &first_value);
ASSERT_EQ(first_value, 2019);
}
TEST(TestBitStreamUtil, TestUint64) {
faststring buffer(1);
BitWriter writer(&buffer);
writer.PutValue(18446744073709551614U, 64);
writer.PutValue(18446744073709551613U, 64);
writer.PutValue(128, 32);
writer.PutValue(126, 16);
writer.Flush();
BitReader reader(buffer.data(), buffer.size());
uint64_t v1;
reader.GetValue(64, &v1);
ASSERT_EQ(v1, 18446744073709551614U);
uint64_t v2;
reader.GetValue(64, &v2);
ASSERT_EQ(v2, 18446744073709551613U);
uint64_t v3;
reader.GetValue(32, &v3);
ASSERT_EQ(v3, 128);
uint64_t v4;
reader.GetValue(16, &v4);
ASSERT_EQ(v4, 126);
}
} // namespace doris
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include "util/frame_of_reference_coding.h"
namespace doris {
class TestForCoding : public testing::Test {
public:
static void test_frame_of_reference_encode_decode(int32_t element_size) {
faststring buffer(1);
ForEncoder<int32_t> encoder(&buffer);
std::vector<int32_t> data;
for (int32_t i = 0; i < element_size; ++i) {
data.push_back(i);
}
encoder.put_batch(data.data(), element_size);
encoder.flush();
ForDecoder<int32_t> decoder(buffer.data(), buffer.length());
decoder.init();
std::vector<int32_t> actual_result(element_size);
decoder.get_batch(actual_result.data(), element_size);
ASSERT_EQ(data, actual_result);
}
static void test_skip(int32_t skip_num) {
faststring buffer(1);
ForEncoder<uint32_t> encoder(&buffer);
std::vector<uint32_t> input_data;
std::vector<uint32_t> expect_result;
for (uint32_t i = 0; i < 256; ++i) {
input_data.push_back(i);
if (i >= skip_num) {
expect_result.push_back(i);
}
}
encoder.put_batch(input_data.data(), 256);
encoder.flush();
ForDecoder<uint32_t> decoder(buffer.data(), buffer.length());
decoder.init();
decoder.skip(skip_num);
std::vector<uint32_t> actual_result(256 - skip_num);
decoder.get_batch(actual_result.data(), 256 - skip_num);
ASSERT_EQ(expect_result, actual_result);
}
};
TEST_F(TestForCoding, TestHalfFrame) {
test_frame_of_reference_encode_decode(64);
}
TEST_F(TestForCoding, TestOneFrame) {
test_frame_of_reference_encode_decode(128);
}
TEST_F(TestForCoding, TestTwoFrame) {
test_frame_of_reference_encode_decode(256);
}
TEST_F(TestForCoding, TestTwoHlafFrame) {
test_frame_of_reference_encode_decode(320);
}
TEST_F(TestForCoding, TestSkipZero) {
test_skip(0);
}
TEST_F(TestForCoding, TestSkipHalfFrame) {
test_skip(64);
}
TEST_F(TestForCoding, TestSkipOneFrame) {
test_skip(128);
}
TEST_F(TestForCoding, TestInt64) {
faststring buffer(1);
ForEncoder<int64_t> encoder(&buffer);
std::vector<int64_t> data;
for (int64_t i = 0; i < 320; ++i) {
data.push_back(i);
}
encoder.put_batch(data.data(), 320);
encoder.flush();
ForDecoder<int64_t> decoder(buffer.data(), buffer.length());
decoder.init();
std::vector<int64_t> actual_result(320);
decoder.get_batch(actual_result.data(), 320);
ASSERT_EQ(data, actual_result);
}
TEST_F(TestForCoding, TestOneMinValue) {
faststring buffer(1);
ForEncoder<int32_t> encoder(&buffer);
encoder.put(2019);
encoder.flush();
ForDecoder<int32_t> decoder(buffer.data(), buffer.length());
decoder.init();
int32_t actual_value;
decoder.get(&actual_value);
ASSERT_EQ(2019, actual_value);
}
TEST_F(TestForCoding, TestZeroValue) {
faststring buffer(1);
ForEncoder<int32_t> encoder(&buffer);
encoder.flush();
ASSERT_EQ(buffer.length(), 4 + 1);
ForDecoder<int32_t> decoder(buffer.data(), buffer.length());
decoder.init();
int32_t actual_value;
bool result = decoder.get(&actual_value);
ASSERT_EQ(result, false);
}
TEST_F(TestForCoding, TestBytesAlign) {
faststring buffer(1);
ForEncoder<int32_t> encoder(&buffer);
encoder.put(2019);
encoder.put(2020);
encoder.flush();
ForDecoder<int32_t> decoder(buffer.data(), buffer.length());
decoder.init();
int32_t actual_value;
decoder.get(&actual_value);
ASSERT_EQ(2019, actual_value);
decoder.get(&actual_value);
ASSERT_EQ(2020, actual_value);
}
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -43,143 +43,7 @@ namespace doris {
const int kMaxWidth = 64;
class TestRle : public testing::Test {
public:
TestRle() {
}
virtual ~TestRle() {
}
};
TEST(BitArray, TestBool) {
const int len_bytes = 2;
faststring buffer(len_bytes);
BitWriter writer(&buffer);
// Write alternating 0's and 1's
for (int i = 0; i < 8; ++i) {
writer.PutValue(i % 2, 1);
}
writer.Flush();
EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
// Write 00110011
for (int i = 0; i < 8; ++i) {
switch (i) {
case 0:
case 1:
case 4:
case 5:
writer.PutValue(0, 1);
break;
default:
writer.PutValue(1, 1);
break;
}
}
writer.Flush();
// Validate the exact bit value
EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
// Use the reader and validate
BitReader reader(buffer.data(), buffer.size());
for (int i = 0; i < 8; ++i) {
bool val = false;
bool result = reader.GetValue(1, &val);
EXPECT_TRUE(result);
EXPECT_EQ(val, i % 2);
}
for (int i = 0; i < 8; ++i) {
bool val = false;
bool result = reader.GetValue(1, &val);
EXPECT_TRUE(result);
switch (i) {
case 0:
case 1:
case 4:
case 5:
EXPECT_EQ(val, false);
break;
default:
EXPECT_EQ(val, true);
break;
}
}
}
// Writes 'num_vals' values with width 'bit_width' and reads them back.
void TestBitArrayValues(int bit_width, int num_vals) {
const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8);
const uint64_t mod = bit_width == 64? 1 : 1LL << bit_width;
faststring buffer(kTestLen);
BitWriter writer(&buffer);
for (int i = 0; i < num_vals; ++i) {
writer.PutValue(i % mod, bit_width);
}
writer.Flush();
EXPECT_EQ(writer.bytes_written(), kTestLen);
BitReader reader(buffer.data(), kTestLen);
for (int i = 0; i < num_vals; ++i) {
int64_t val = 0;
bool result = reader.GetValue(bit_width, &val);
EXPECT_TRUE(result);
EXPECT_EQ(val, i % mod);
}
EXPECT_EQ(reader.bytes_left(), 0);
}
TEST(BitArray, TestValues) {
for (int width = 1; width <= kMaxWidth; ++width) {
TestBitArrayValues(width, 1);
TestBitArrayValues(width, 2);
// Don't write too many values
TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
TestBitArrayValues(width, 1024);
}
}
// Test some mixed values
TEST(BitArray, TestMixed) {
const int kTestLenBits = 1024;
faststring buffer(kTestLenBits / 8);
bool parity = true;
BitWriter writer(&buffer);
for (int i = 0; i < kTestLenBits; ++i) {
if (i % 2 == 0) {
writer.PutValue(parity, 1);
parity = !parity;
} else {
writer.PutValue(i, 10);
}
}
writer.Flush();
parity = true;
BitReader reader(buffer.data(), buffer.size());
for (int i = 0; i < kTestLenBits; ++i) {
bool result;
if (i % 2 == 0) {
bool val = false;
result = reader.GetValue(1, &val);
EXPECT_EQ(val, parity);
parity = !parity;
} else {
int val;
result = reader.GetValue(10, &val);
EXPECT_EQ(val, i);
}
EXPECT_TRUE(result);
}
}
class TestRle : public testing::Test {};
// Validates encoding of values by encoding and decoding them. If
// expected_encoding != NULL, also validates that the encoded buffer is
// exactly 'expected_encoding'.

View File

@ -159,6 +159,8 @@ ${DORIS_TEST_BINARY_DIR}/util/block_compression_test
${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_block_test
${DORIS_TEST_BINARY_DIR}/util/arrow/arrow_row_batch_test
${DORIS_TEST_BINARY_DIR}/util/counter_cond_variable_test
${DORIS_TEST_BINARY_DIR}/util/bit_stream_utils_test
${DORIS_TEST_BINARY_DIR}/util/frame_of_reference_coding_test
# Running common Unittest
${DORIS_TEST_BINARY_DIR}/common/resource_tls_test
@ -267,6 +269,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/page_compression_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_zone_map_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/row_ranges_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/frame_of_reference_page_test
${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test
${DORIS_TEST_BINARY_DIR}/olap/storage_types_test
${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test