[bugfix]fix column reader compress codec unsafe problem (#9741)
by moving codec from shared reader to unshared iterator
This commit is contained in:
@ -104,7 +104,6 @@ Status ColumnReader::init() {
|
||||
strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
|
||||
}
|
||||
RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info));
|
||||
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
|
||||
|
||||
for (int i = 0; i < _meta.indexes_size(); i++) {
|
||||
auto& index_meta = _meta.indexes(i);
|
||||
@ -144,12 +143,13 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
|
||||
}
|
||||
|
||||
Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
|
||||
PageHandle* handle, Slice* page_body, PageFooterPB* footer) {
|
||||
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
|
||||
BlockCompressionCodec* codec) {
|
||||
iter_opts.sanity_check();
|
||||
PageReadOptions opts;
|
||||
opts.rblock = iter_opts.rblock;
|
||||
opts.page_pointer = pp;
|
||||
opts.codec = _compress_codec.get();
|
||||
opts.codec = codec;
|
||||
opts.stats = iter_opts.stats;
|
||||
opts.verify_checksum = _opts.verify_checksum;
|
||||
opts.use_page_cache = iter_opts.use_page_cache;
|
||||
@ -465,6 +465,12 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool
|
||||
|
||||
// Creates an iterator bound to `reader`. Only the pointer is stored here;
// the iterator's compression codec is created later, in init().
FileColumnIterator::FileColumnIterator(ColumnReader* reader)
        : _reader(reader) {}
|
||||
|
||||
// Stores the iterator options and creates this iterator's own compression
// codec from the reader's compression type. The codec lives on the iterator
// rather than on the shared ColumnReader because BlockCompressionCodec is not
// thread safe and must not be shared by threads.
//
// Returns the status of get_block_compression_codec(): OK when the codec was
// obtained, otherwise the error it reported.
Status FileColumnIterator::init(const ColumnIteratorOptions& opts) {
    _opts = opts;
    // Nothing else follows the codec lookup, so its status is the result.
    return get_block_compression_codec(_reader->get_compression(), _compress_codec);
}
|
||||
|
||||
// Out-of-line defaulted destructor. Members, including the iterator-owned
// _compress_codec unique_ptr, are released by their own destructors.
FileColumnIterator::~FileColumnIterator() = default;
|
||||
|
||||
Status FileColumnIterator::seek_to_first() {
|
||||
@ -653,7 +659,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
|
||||
Slice page_body;
|
||||
PageFooterPB footer;
|
||||
_opts.type = DATA_PAGE;
|
||||
RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer));
|
||||
RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer,
|
||||
_compress_codec.get()));
|
||||
// parse data page
|
||||
RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(),
|
||||
_reader->encoding_info(), iter.page(), iter.page_index(),
|
||||
@ -673,7 +680,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
|
||||
PageFooterPB dict_footer;
|
||||
_opts.type = INDEX_PAGE;
|
||||
RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(),
|
||||
&_dict_page_handle, &dict_data, &dict_footer));
|
||||
&_dict_page_handle, &dict_data, &dict_footer,
|
||||
_compress_codec.get()));
|
||||
// ignore dict_footer.dict_page_footer().encoding() due to only
|
||||
// PLAIN_ENCODING is supported for dict page right now
|
||||
_dict_decoder = std::make_unique<BinaryPlainPageDecoder>(dict_data);
|
||||
|
||||
@ -103,7 +103,8 @@ public:
|
||||
|
||||
// read a page from file into a page handle
|
||||
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
|
||||
PageHandle* handle, Slice* page_body, PageFooterPB* footer);
|
||||
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
|
||||
BlockCompressionCodec* codec);
|
||||
|
||||
// Reports the nullable flag recorded in this column's metadata (_meta).
bool is_nullable() const {
    return _meta.is_nullable();
}
|
||||
|
||||
@ -131,6 +132,8 @@ public:
|
||||
|
||||
// True when the column holds no rows, i.e. _num_rows is zero.
bool is_empty() const {
    return _num_rows == 0;
}
|
||||
|
||||
// Returns the compression type recorded in the column metadata (_meta).
// FileColumnIterator::init() uses this to create its own, unshared codec.
CompressionTypePB get_compression() const {
    return _meta.compression();
}
|
||||
|
||||
private:
|
||||
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
|
||||
FilePathDesc path_desc);
|
||||
@ -175,7 +178,6 @@ private:
|
||||
TypeInfoPtr(nullptr, nullptr); // initialized in init(), may be changed by subclasses.
|
||||
const EncodingInfo* _encoding_info =
|
||||
nullptr; // initialized in init(), used for create PageDecoder
|
||||
std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()
|
||||
|
||||
// meta for various column indexes (null if the index is absent)
|
||||
const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
|
||||
@ -253,6 +255,8 @@ public:
|
||||
explicit FileColumnIterator(ColumnReader* reader);
|
||||
~FileColumnIterator() override;
|
||||
|
||||
Status init(const ColumnIteratorOptions& opts) override;
|
||||
|
||||
Status seek_to_first() override;
|
||||
|
||||
Status seek_to_ordinal(ordinal_t ord) override;
|
||||
@ -285,6 +289,9 @@ private:
|
||||
private:
|
||||
ColumnReader* _reader;
|
||||
|
||||
// iterator owned compress codec, should NOT be shared by threads, initialized in init()
|
||||
std::unique_ptr<BlockCompressionCodec> _compress_codec;
|
||||
|
||||
// 1. The _page represents current page.
|
||||
// 2. We define an operation is one seek and following read,
|
||||
// If new seek is issued, the _page will be reset.
|
||||
|
||||
@ -37,7 +37,6 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
|
||||
strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
|
||||
}
|
||||
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
|
||||
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
|
||||
_value_key_coder = get_key_coder(_type_info->type());
|
||||
|
||||
std::unique_ptr<fs::ReadableBlock> rblock;
|
||||
@ -72,18 +71,21 @@ Status IndexedColumnReader::load_index_page(fs::ReadableBlock* rblock, const Pag
|
||||
PageHandle* handle, IndexPageReader* reader) {
|
||||
Slice body;
|
||||
PageFooterPB footer;
|
||||
RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE));
|
||||
std::unique_ptr<BlockCompressionCodec> local_compress_codec;
|
||||
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec));
|
||||
RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
|
||||
local_compress_codec.get()));
|
||||
RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePointer& pp,
|
||||
PageHandle* handle, Slice* body, PageFooterPB* footer,
|
||||
PageTypePB type) const {
|
||||
PageTypePB type, BlockCompressionCodec* codec) const {
|
||||
PageReadOptions opts;
|
||||
opts.rblock = rblock;
|
||||
opts.page_pointer = pp;
|
||||
opts.codec = _compress_codec.get();
|
||||
opts.codec = codec;
|
||||
OlapReaderStatistics tmp_stats;
|
||||
opts.stats = &tmp_stats;
|
||||
opts.use_page_cache = _use_page_cache;
|
||||
@ -96,10 +98,15 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
|
||||
// there is no init() for IndexedColumnIterator, so create the codec lazily here
|
||||
if (!_compress_codec.get())
|
||||
RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec));
|
||||
|
||||
PageHandle handle;
|
||||
Slice body;
|
||||
PageFooterPB footer;
|
||||
RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE));
|
||||
RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE,
|
||||
_compress_codec.get()));
|
||||
// parse data page
|
||||
// note that page_index is not used in IndexedColumnIterator, so we pass 0
|
||||
return ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
|
||||
|
||||
@ -52,7 +52,8 @@ public:
|
||||
|
||||
// read a page specified by `pp' from `file' into `handle'
|
||||
Status read_page(fs::ReadableBlock* rblock, const PagePointer& pp, PageHandle* handle,
|
||||
Slice* body, PageFooterPB* footer, PageTypePB type) const;
|
||||
Slice* body, PageFooterPB* footer, PageTypePB type,
|
||||
BlockCompressionCodec* codec) const;
|
||||
|
||||
// Returns the stored value count (_num_values).
int64_t num_values() const {
    return _num_values;
}
|
||||
// Returns the EncodingInfo pointer held in _encoding_info. The member
// defaults to nullptr and is filled in by load() via EncodingInfo::get().
const EncodingInfo* encoding_info() const {
    return _encoding_info;
}
|
||||
@ -60,6 +61,8 @@ public:
|
||||
// True when the column metadata (_meta) carries ordinal index meta.
bool support_ordinal_seek() const {
    return _meta.has_ordinal_index_meta();
}
|
||||
// True when the column metadata (_meta) carries value index meta.
bool support_value_seek() const {
    return _meta.has_value_index_meta();
}
|
||||
|
||||
// Returns the compression type recorded in the indexed column's metadata
// (_meta). IndexedColumnIterator::_read_data_page() uses this to create its
// own, unshared codec.
CompressionTypePB get_compression() const {
    return _meta.compression();
}
|
||||
|
||||
private:
|
||||
Status load_index_page(fs::ReadableBlock* rblock, const PagePointerPB& pp, PageHandle* handle,
|
||||
IndexPageReader* reader);
|
||||
@ -84,7 +87,6 @@ private:
|
||||
|
||||
const TypeInfo* _type_info = nullptr;
|
||||
const EncodingInfo* _encoding_info = nullptr;
|
||||
std::unique_ptr<BlockCompressionCodec> _compress_codec;
|
||||
const KeyCoder* _value_key_coder = nullptr;
|
||||
};
|
||||
|
||||
@ -145,6 +147,8 @@ private:
|
||||
ordinal_t _current_ordinal = 0;
|
||||
// open file handle
|
||||
std::unique_ptr<fs::ReadableBlock> _rblock;
|
||||
// iterator-owned compress codec, should NOT be shared by threads, initialized before use
|
||||
std::unique_ptr<BlockCompressionCodec> _compress_codec;
|
||||
};
|
||||
|
||||
} // namespace segment_v2
|
||||
|
||||
@ -30,6 +30,9 @@ namespace doris {
|
||||
// This class is only used to compress a block of data, which means all data
|
||||
// should be given when calling compress or decompress. This class doesn't handle
|
||||
// stream compression.
|
||||
//
|
||||
// NOTICE!! BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads
|
||||
//
|
||||
class BlockCompressionCodec {
|
||||
public:
|
||||
virtual ~BlockCompressionCodec() {}
|
||||
@ -59,7 +62,9 @@ public:
|
||||
// Get a BlockCompressionCodec through type.
|
||||
// Return Status::OK if a valid codec is found. If codec is null, it means it is
|
||||
// NO_COMPRESSION. If codec is not null, user can use it to compress/decompress
|
||||
// data. And client doesn't have to release the codec.
|
||||
// data.
|
||||
//
|
||||
// NOTICE!! BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads
|
||||
//
|
||||
// Return not OK, if error happens.
|
||||
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
|
||||
|
||||
Reference in New Issue
Block a user