[Enhancement]Decode bitshuffle data before adding it into PageCache (#10036)

* [Enhancement]Decode bitshuffle data before add into PageCache

* Fix be ut failed
This commit is contained in:
Jerry Hu
2022-06-13 09:04:23 +08:00
committed by GitHub
parent 415b6b8086
commit 797f6e1472
11 changed files with 228 additions and 75 deletions

View File

@ -250,7 +250,7 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, vectorized::MutableColumnPtr
_bit_shuffle_ptr->_cur_index));
*n = max_fetch;
const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->_chunk.data);
const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0));
size_t start_index = _bit_shuffle_ptr->_cur_index;
dst->insert_many_dict_data(data_array, start_index, _dict_word_info, max_fetch,

View File

@ -239,6 +239,44 @@ private:
CppType _last_value;
};
static Status parse_bit_shuffle_header(const Slice& data, size_t& num_elements,
size_t& compressed_size, size_t& num_element_after_padding,
int& size_of_element) {
if (data.size < BITSHUFFLE_PAGE_HEADER_SIZE) {
std::stringstream ss;
ss << "file corruption: invalid data size:" << data.size
<< ", header size:" << BITSHUFFLE_PAGE_HEADER_SIZE;
return Status::InternalError(ss.str());
}
num_elements = decode_fixed32_le((const uint8_t*)&data[0]);
compressed_size = decode_fixed32_le((const uint8_t*)&data[4]);
num_element_after_padding = decode_fixed32_le((const uint8_t*)&data[8]);
if (num_element_after_padding != ALIGN_UP(num_elements, 8)) {
std::stringstream ss;
ss << "num of element information corrupted,"
<< " _num_element_after_padding:" << num_element_after_padding
<< ", _num_elements:" << num_elements;
return Status::InternalError(ss.str());
}
size_of_element = decode_fixed32_le((const uint8_t*)&data[12]);
switch (size_of_element) {
case 1:
case 2:
case 3:
case 4:
case 8:
case 12:
case 16:
break;
default:
std::stringstream ss;
ss << "invalid size_of_elem:" << size_of_element;
return Status::InternalError(ss.str());
}
return Status::OK();
}
template <FieldType Type>
class BitShufflePageDecoder : public PageDecoder {
public:
@ -247,50 +285,22 @@ public:
_options(options),
_parsed(false),
_num_elements(0),
_compressed_size(0),
_num_element_after_padding(0),
_size_of_element(0),
_cur_index(0) {}
~BitShufflePageDecoder() { ChunkAllocator::instance()->free(_chunk); }
Status init() override {
CHECK(!_parsed);
if (_data.size < BITSHUFFLE_PAGE_HEADER_SIZE) {
size_t unused;
RETURN_IF_ERROR(parse_bit_shuffle_header(_data, _num_elements, unused,
_num_element_after_padding, _size_of_element));
if (_data.size !=
_num_element_after_padding * _size_of_element + BITSHUFFLE_PAGE_HEADER_SIZE) {
std::stringstream ss;
ss << "file corruption: invalid data size:" << _data.size
<< ", header size:" << BITSHUFFLE_PAGE_HEADER_SIZE;
return Status::InternalError(ss.str());
}
_num_elements = decode_fixed32_le((const uint8_t*)&_data[0]);
_compressed_size = decode_fixed32_le((const uint8_t*)&_data[4]);
if (_compressed_size != _data.size) {
std::stringstream ss;
ss << "Size information unmatched, _compressed_size:" << _compressed_size
<< ", _num_elements:" << _num_elements << ", data size:" << _data.size;
return Status::InternalError(ss.str());
}
_num_element_after_padding = decode_fixed32_le((const uint8_t*)&_data[8]);
if (_num_element_after_padding != ALIGN_UP(_num_elements, 8)) {
std::stringstream ss;
ss << "num of element information corrupted,"
<< " _num_element_after_padding:" << _num_element_after_padding
<< ", _num_elements:" << _num_elements;
return Status::InternalError(ss.str());
}
_size_of_element = decode_fixed32_le((const uint8_t*)&_data[12]);
switch (_size_of_element) {
case 1:
case 2:
case 3:
case 4:
case 8:
case 12:
case 16:
break;
default:
std::stringstream ss;
ss << "invalid size_of_elem:" << _size_of_element;
ss << "Size information unmatched, _data.size:" << _data.size
<< ", _num_elements:" << _num_elements << ", expected size is "
<< _num_element_after_padding * _size_of_element + BITSHUFFLE_PAGE_HEADER_SIZE;
return Status::InternalError(ss.str());
}
@ -307,8 +317,6 @@ public:
<< ", SIZE_OF_TYPE:" << SIZE_OF_TYPE;
return Status::InternalError(ss.str());
}
RETURN_IF_ERROR(_decode());
_parsed = true;
return Status::OK();
}
@ -342,7 +350,7 @@ public:
// - left == _num_elements when not found (all values < target)
while (left < right) {
size_t mid = left + (right - left) / 2;
mid_value = &_chunk.data[mid * SIZE_OF_TYPE];
mid_value = get_data(mid);
if (TypeTraits<Type>::cmp(mid_value, value) < 0) {
left = mid + 1;
} else {
@ -352,7 +360,7 @@ public:
if (left >= _num_elements) {
return Status::NotFound("all value small than the value");
}
void* find_value = &_chunk.data[left * SIZE_OF_TYPE];
void* find_value = get_data(left);
if (TypeTraits<Type>::cmp(find_value, value) == 0) {
*exact_match = true;
} else {
@ -392,7 +400,7 @@ public:
size_t max_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index));
dst->insert_many_fix_len_data((char*)&_chunk.data[_cur_index * SIZE_OF_TYPE], max_fetch);
dst->insert_many_fix_len_data(get_data(_cur_index), max_fetch);
*n = max_fetch;
_cur_index += max_fetch;
@ -408,28 +416,13 @@ public:
size_t current_index() const override { return _cur_index; }
private:
void _copy_next_values(size_t n, void* data) {
memcpy(data, &_chunk.data[_cur_index * SIZE_OF_TYPE], n * SIZE_OF_TYPE);
char* get_data(size_t index) const {
return &_data.data[BITSHUFFLE_PAGE_HEADER_SIZE + index * SIZE_OF_TYPE];
}
Status _decode() {
if (_num_elements > 0) {
int64_t bytes;
if (!ChunkAllocator::instance()->allocate_align(
_num_element_after_padding * _size_of_element, &_chunk)) {
return Status::RuntimeError("Decoded Memory Alloc failed");
}
char* in = const_cast<char*>(&_data[BITSHUFFLE_PAGE_HEADER_SIZE]);
bytes = bitshuffle::decompress_lz4(in, _chunk.data, _num_element_after_padding,
_size_of_element, 0);
if (PREDICT_FALSE(bytes < 0)) {
// Ideally, this should not happen.
warn_with_bitshuffle_error(bytes);
return Status::RuntimeError("Unshuffle Process failed");
}
}
return Status::OK();
private:
void _copy_next_values(size_t n, void* data) {
memcpy(data, get_data(_cur_index), n * SIZE_OF_TYPE);
}
typedef typename TypeTraits<Type>::CppType CppType;
@ -440,12 +433,11 @@ private:
PageDecoderOptions _options;
bool _parsed;
size_t _num_elements;
size_t _compressed_size;
size_t _num_element_after_padding;
int _size_of_element;
size_t _cur_index;
Chunk _chunk;
friend class BinaryDictPageDecoder;
};

View File

@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/rowset/segment_v2/binary_dict_page.h"
#include "olap/rowset/segment_v2/bitshuffle_page.h"
#include "olap/rowset/segment_v2/encoding_info.h"
namespace doris {
namespace segment_v2 {
template <bool USED_IN_DICT_ENCODING>
struct BitShufflePagePreDecoder : public DataPagePreDecoder {
/**
* @brief Decode bitshuffle data
* The input should be data encoded by bitshuffle + lz4 or
* the input may be data of BinaryDictPage, if its encoding type is plain,
* it is no need to decode.
*
* @param page unique_ptr to hold page data, maybe be replaced by decoded data
* @param page_slice data to decode
* @param size_of_tail including size of footer and null map
* @return Status
*/
virtual Status decode(std::unique_ptr<char[]>* page, Slice* page_slice,
size_t size_of_tail) override {
size_t num_elements, compressed_size, num_element_after_padding;
int size_of_element;
size_t size_of_dict_header = 0;
Slice data(page_slice->data, page_slice->size - size_of_tail);
if constexpr (USED_IN_DICT_ENCODING) {
auto type = decode_fixed32_le((const uint8_t*)&data.data[0]);
if (static_cast<EncodingTypePB>(type) != EncodingTypePB::DICT_ENCODING) {
return Status::OK();
}
size_of_dict_header = BINARY_DICT_PAGE_HEADER_SIZE;
data.remove_prefix(4);
}
RETURN_IF_ERROR(parse_bit_shuffle_header(data, num_elements, compressed_size,
num_element_after_padding, size_of_element));
if (compressed_size != data.size) {
std::stringstream ss;
ss << "Size information unmatched, compressed_size:" << compressed_size
<< ", num_elements:" << num_elements << ", data size:" << data.size;
return Status::InternalError(ss.str());
}
Slice decoded_slice;
decoded_slice.size = size_of_dict_header + BITSHUFFLE_PAGE_HEADER_SIZE +
num_element_after_padding * size_of_element + size_of_tail;
std::unique_ptr<char[]> decoded_page(new char[decoded_slice.size]);
decoded_slice.data = decoded_page.get();
if constexpr (USED_IN_DICT_ENCODING) {
memcpy(decoded_slice.data, page_slice->data, size_of_dict_header);
}
memcpy(decoded_slice.data + size_of_dict_header, data.data, BITSHUFFLE_PAGE_HEADER_SIZE);
auto bytes = bitshuffle::decompress_lz4(
&data.data[BITSHUFFLE_PAGE_HEADER_SIZE],
decoded_slice.data + BITSHUFFLE_PAGE_HEADER_SIZE + size_of_dict_header,
num_element_after_padding, size_of_element, 0);
if (PREDICT_FALSE(bytes < 0)) {
// Ideally, this should not happen.
warn_with_bitshuffle_error(bytes);
return Status::RuntimeError("Unshuffle Process failed");
}
memcpy(decoded_slice.data + decoded_slice.size - size_of_tail,
page_slice->data + page_slice->size - size_of_tail, size_of_tail);
*page_slice = decoded_slice;
*page = std::move(decoded_page);
return Status::OK();
}
};
} // namespace segment_v2
} // namespace doris

View File

@ -155,6 +155,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
opts.use_page_cache = iter_opts.use_page_cache;
opts.kept_in_memory = _opts.kept_in_memory;
opts.type = iter_opts.type;
opts.encoding_info = _encoding_info;
return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
}

View File

@ -25,6 +25,7 @@
#include "olap/rowset/segment_v2/binary_plain_page.h"
#include "olap/rowset/segment_v2/binary_prefix_page.h"
#include "olap/rowset/segment_v2/bitshuffle_page.h"
#include "olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h"
#include "olap/rowset/segment_v2/frame_of_reference_page.h"
#include "olap/rowset/segment_v2/plain_page.h"
#include "olap/rowset/segment_v2/rle_page.h"
@ -312,7 +313,13 @@ EncodingInfo::EncodingInfo(TraitsClass traits)
: _create_builder_func(TraitsClass::create_page_builder),
_create_decoder_func(TraitsClass::create_page_decoder),
_type(TraitsClass::type),
_encoding(TraitsClass::encoding) {}
_encoding(TraitsClass::encoding) {
if (_encoding == BIT_SHUFFLE) {
_data_page_pre_decoder = std::make_unique<BitShufflePagePreDecoder<false>>();
} else if (_encoding == DICT_ENCODING) {
_data_page_pre_decoder = std::make_unique<BitShufflePagePreDecoder<true>>();
}
}
Status EncodingInfo::get(const TypeInfo* type_info, EncodingTypePB encoding_type,
const EncodingInfo** out) {

View File

@ -34,6 +34,14 @@ class PageDecoder;
struct PageBuilderOptions;
struct PageDecoderOptions;
// For better performance, some encodings (like BitShuffle) need to be decoded before being added to the PageCache.
class DataPagePreDecoder {
public:
virtual Status decode(std::unique_ptr<char[]>* page, Slice* page_slice,
size_t size_of_tail) = 0;
virtual ~DataPagePreDecoder() = default;
};
class EncodingInfo {
public:
// Get EncodingInfo for TypeInfo and EncodingTypePB
@ -54,6 +62,8 @@ public:
FieldType type() const { return _type; }
EncodingTypePB encoding() const { return _encoding; }
DataPagePreDecoder* get_data_page_pre_decoder() const { return _data_page_pre_decoder.get(); };
private:
friend class EncodingInfoResolver;
@ -69,6 +79,7 @@ private:
FieldType _type;
EncodingTypePB _encoding;
std::unique_ptr<DataPagePreDecoder> _data_page_pre_decoder = nullptr;
};
} // namespace segment_v2

View File

@ -91,6 +91,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
opts.use_page_cache = _use_page_cache;
opts.kept_in_memory = _kept_in_memory;
opts.type = type;
opts.encoding_info = _encoding_info;
return PageIO::read_and_decompress_page(opts, handle, body, footer);
}

View File

@ -195,6 +195,15 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
opts.stats->uncompressed_bytes_read += body_size;
}
if (opts.encoding_info) {
auto* pre_decoder = opts.encoding_info->get_data_page_pre_decoder();
if (pre_decoder) {
RETURN_IF_ERROR(pre_decoder->decode(
&page, &page_slice,
footer->data_page_footer().nullmap_size() + footer_size + 4));
}
}
*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
if (opts.use_page_cache && cache->is_cache_available(opts.type)) {
// insert this page into cache and return the cache handle

View File

@ -22,6 +22,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "gen_cpp/segment_v2.pb.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "util/slice.h"
@ -59,6 +60,8 @@ struct PageReadOptions {
// INDEX_PAGE including index_page, dict_page and short_key_page
PageTypePB type;
const EncodingInfo* encoding_info = nullptr;
void sanity_check() const {
CHECK_NOTNULL(rblock);
CHECK_NOTNULL(stats);

View File

@ -25,6 +25,7 @@
#include "common/logging.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/binary_plain_page.h"
#include "olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h"
#include "olap/rowset/segment_v2/page_builder.h"
#include "olap/rowset/segment_v2/page_decoder.h"
#include "olap/types.h"
@ -79,11 +80,17 @@ public:
// decode
PageDecoderOptions decoder_options;
BinaryDictPageDecoder page_decoder(s.slice(), decoder_options);
page_decoder.set_dict_decoder(dict_page_decoder.get(), dict_word_info);
BinaryDictPageDecoder page_decoder_(s.slice(), decoder_options);
status = page_decoder_.init();
EXPECT_FALSE(status.ok());
segment_v2::BitShufflePagePreDecoder<true> pre_decoder;
Slice page_slice = s.slice();
std::unique_ptr<char[]> auto_release;
pre_decoder.decode(&auto_release, &page_slice, 0);
BinaryDictPageDecoder page_decoder(page_slice, decoder_options);
status = page_decoder.init();
page_decoder.set_dict_decoder(dict_page_decoder.get(), dict_word_info);
EXPECT_TRUE(status.ok());
EXPECT_EQ(slices.size(), page_decoder.count());
@ -178,9 +185,16 @@ public:
// decode
PageDecoderOptions decoder_options;
BinaryDictPageDecoder page_decoder(results[slice_index].slice(), decoder_options);
status = page_decoder.init();
Slice page_slice = results[slice_index].slice();
BinaryDictPageDecoder page_decoder_(page_slice, decoder_options);
status = page_decoder_.init();
EXPECT_FALSE(status.ok());
segment_v2::BitShufflePagePreDecoder<true> pre_decoder;
std::unique_ptr<char[]> auto_release;
pre_decoder.decode(&auto_release, &page_slice, 0);
BinaryDictPageDecoder page_decoder(page_slice, decoder_options);
status = page_decoder.init();
page_decoder.set_dict_decoder(dict_page_decoder.get(), dict_word_info);
EXPECT_TRUE(status.ok());

View File

@ -21,6 +21,7 @@
#include <memory>
#include "olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h"
#include "olap/rowset/segment_v2/options.h"
#include "olap/rowset/segment_v2/page_builder.h"
#include "olap/rowset/segment_v2/page_decoder.h"
@ -71,8 +72,16 @@ public:
EXPECT_EQ(src[size - 1], last_value);
segment_v2::PageDecoderOptions decoder_options;
PageDecoderType page_decoder(s.slice(), decoder_options);
Status status = page_decoder.init();
PageDecoderType page_decoder_(s.slice(), decoder_options);
Status status = page_decoder_.init();
EXPECT_FALSE(status.ok());
segment_v2::BitShufflePagePreDecoder<false> pre_decoder;
Slice page_slice = s.slice();
std::unique_ptr<char[]> auto_release;
pre_decoder.decode(&auto_release, &page_slice, 0);
PageDecoderType page_decoder(page_slice, decoder_options);
status = page_decoder.init();
EXPECT_TRUE(status.ok());
EXPECT_EQ(0, page_decoder.current_index());
@ -121,8 +130,16 @@ public:
OwnedSlice s = page_builder.finish();
segment_v2::PageDecoderOptions decoder_options;
PageDecoderType page_decoder(s.slice(), decoder_options);
Status status = page_decoder.init();
PageDecoderType page_decoder_(s.slice(), decoder_options);
Status status = page_decoder_.init();
EXPECT_FALSE(status.ok());
segment_v2::BitShufflePagePreDecoder<false> pre_decoder;
Slice page_slice = s.slice();
std::unique_ptr<char[]> auto_release;
pre_decoder.decode(&auto_release, &page_slice, 0);
PageDecoderType page_decoder(page_slice, decoder_options);
status = page_decoder.init();
EXPECT_TRUE(status.ok());
EXPECT_EQ(0, page_decoder.current_index());