From b136d80e1a72f712a6ea0f103187e77254d623c2 Mon Sep 17 00:00:00 2001 From: yixiutt <102007456+yixiutt@users.noreply.github.com> Date: Thu, 15 Sep 2022 10:59:46 +0800 Subject: [PATCH] [enhancement](compress) reuse compression ctx and buffer (#12573) Reuse compression ctx and buffer. Use a global instance for each compression algorithm, and a thread-safe pool to reuse compression contexts and buffers. The pool size equals the maximum number of threads compressing in parallel, so it will not grow too large. Tests show this feature improves data import and compaction performance by about 5%. Co-authored-by: yixiutt --- .../olap/rowset/segment_v2/column_reader.cpp | 8 +- be/src/olap/rowset/segment_v2/column_reader.h | 2 +- .../olap/rowset/segment_v2/column_writer.cpp | 8 +- be/src/olap/rowset/segment_v2/column_writer.h | 2 +- .../segment_v2/indexed_column_reader.cpp | 12 +- .../rowset/segment_v2/indexed_column_reader.h | 2 +- .../segment_v2/indexed_column_writer.cpp | 6 +- .../rowset/segment_v2/indexed_column_writer.h | 2 +- be/src/olap/rowset/segment_v2/page_io.cpp | 30 +- be/src/olap/rowset/segment_v2/page_io.h | 9 +- be/src/olap/tablet_schema.h | 2 +- be/src/util/block_compression.cpp | 633 +++++++++++++----- be/src/util/block_compression.h | 19 +- be/src/vec/core/block.cpp | 52 +- be/src/vec/core/block.h | 7 - .../parquet/vparquet_column_chunk_reader.cpp | 2 +- .../parquet/vparquet_column_chunk_reader.h | 2 +- be/src/vec/sink/vdata_stream_sender.cpp | 18 +- be/src/vec/sink/vdata_stream_sender.h | 5 - be/test/util/block_compression_test.cpp | 42 +- 20 files changed, 538 insertions(+), 325 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index c9dc6e2c32..d42358c5e7 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -575,7 +575,7 @@ FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) { Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { _opts = opts; - RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec)); if (config::enable_low_cardinality_optimize && _reader->encoding_info()->encoding() == DICT_ENCODING) { auto dict_encoding_type = _reader->get_dict_encoding_type(); @@ -860,8 +860,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) Slice page_body; PageFooterPB footer; _opts.type = DATA_PAGE; - RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, - _compress_codec.get())); + RETURN_IF_ERROR( + _reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, _compress_codec)); // parse data page RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(), _reader->encoding_info(), iter.page(), iter.page_index(), @@ -882,7 +882,7 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) _opts.type = INDEX_PAGE; RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(), &_dict_page_handle, &dict_data, &dict_footer, - _compress_codec.get())); + _compress_codec)); // ignore dict_footer.dict_page_footer().encoding() due to only // PLAIN_ENCODING is supported for dict page right now _dict_decoder = std::make_unique>( diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 92b6572e4e..33c7c418cd 100644 --- 
a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -329,7 +329,7 @@ private: ColumnReader* _reader; // iterator owned compress codec, should NOT be shared by threads, initialized in init() - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec; // 1. The _page represents current page. // 2. We define an operation is one seek and following read, diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index e49f0270e3..caf9dc0629 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() { } Status ScalarColumnWriter::init() { - RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec)); PageBuilder* page_builder = nullptr; @@ -407,7 +407,7 @@ Status ScalarColumnWriter::write_data() { PagePointer dict_pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _opts.compression_min_space_saving, _file_writer, + _compress_codec, _opts.compression_min_space_saving, _file_writer, {dict_body.slice()}, footer, &dict_pp)); dict_pp.to_proto(_opts.meta->mutable_dict_page()); } @@ -494,8 +494,8 @@ Status ScalarColumnWriter::finish_current_page() { } // trying to compress page body OwnedSlice compressed_body; - RETURN_IF_ERROR(PageIO::compress_page_body( - _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body)); + RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, + body, &compressed_body)); if (compressed_body.slice().empty()) { // page body is uncompressed page->data.emplace_back(std::move(encoded_values)); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 68fef318e5..45829b37d9 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -242,7 +242,7 @@ private: PageHead _pages; ordinal_t _first_rowid = 0; - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec; std::unique_ptr _ordinal_index_builder; std::unique_ptr _zone_map_index_builder; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 02f27aa6e6..31bab3a912 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -69,10 +69,10 @@ Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* IndexPageReader* reader) { Slice body; PageFooterPB footer; - std::unique_ptr local_compress_codec; - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec)); + BlockCompressionCodec* local_compress_codec; + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec)); RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE, - local_compress_codec.get(), false)); + local_compress_codec, false)); RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer())); _mem_size += body.get_size(); return Status::OK(); @@ -101,14 +101,14 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) { // there is not init() for 
IndexedColumnIterator, so do it here if (!_compress_codec) { - RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec)); } PageHandle handle; Slice body; PageFooterPB footer; - RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, - _compress_codec.get(), true)); + RETURN_IF_ERROR( + _reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec, true)); // parse data page // note that page_index is not used in IndexedColumnIterator, so we pass 0 PageDecoderOptions opts; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 15b4c175d5..a991e8d4a8 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -143,7 +143,7 @@ private: // next_batch() will read from this position ordinal_t _current_ordinal = 0; // iterator owned compress codec, should NOT be shared by threads, initialized before used - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec = nullptr; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index 2d2bdd620e..cfce5afbb8 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -73,7 +73,7 @@ Status IndexedColumnWriter::init() { } if (_options.compression != NO_COMPRESSION) { - RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec)); } return Status::OK(); } @@ -112,7 +112,7 @@ Status IndexedColumnWriter::_finish_current_data_page() { footer.mutable_data_page_footer()->set_nullmap_size(0); RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _options.compression_min_space_saving, _file_writer, + _compress_codec, _options.compression_min_space_saving, _file_writer, {page_body.slice()}, footer, &_last_data_page)); _num_data_pages++; @@ -160,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM PagePointer pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _options.compression_min_space_saving, _file_writer, + _compress_codec, _options.compression_min_space_saving, _file_writer, {page_body.slice()}, page_footer, &pp)); meta->set_is_root_data_page(false); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h index 8be96d7b35..0c2df791f1 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h @@ -108,7 +108,7 @@ private: std::unique_ptr _value_index_builder; // encoder for value index's key const KeyCoder* _value_key_coder; - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec; DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter); }; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 4388feb6dc..affb73d303 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -35,27 +35,19 @@ namespace segment_v2 { using strings::Substitute; -Status PageIO::compress_page_body(const BlockCompressionCodec* codec, double min_space_saving, 
+Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving, const std::vector<Slice>& body, OwnedSlice* compressed_body) { size_t uncompressed_size = Slice::compute_total_size(body); - if (codec != nullptr && uncompressed_size > 0) { - size_t max_compressed_size = codec->max_compressed_len(uncompressed_size); - if (max_compressed_size) { - faststring buf; - buf.resize(max_compressed_size); - Slice compressed_slice(buf); - RETURN_IF_ERROR(codec->compress(body, &compressed_slice)); - buf.resize(compressed_slice.get_size()); - - double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size; - // return compressed body only when it saves more than min_space_saving - if (space_saving > 0 && space_saving >= min_space_saving) { - // shrink the buf to fit the len size to avoid taking - // up the memory of the size MAX_COMPRESSED_SIZE - buf.shrink_to_fit(); - *compressed_body = buf.build(); - return Status::OK(); - } + if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) { + faststring buf; + RETURN_IF_ERROR(codec->compress(body, uncompressed_size, &buf)); + double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size; + // return compressed body only when it saves more than min_space_saving + if (space_saving > 0 && space_saving >= min_space_saving) { + *compressed_body = buf.build(); + return Status::OK(); } } // otherwise, do not compress diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 86cff00f89..d08f05c8ba 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -50,7 +50,7 @@ struct PageReadOptions { // location of the page PagePointer page_pointer; // decompressor for page body (null means page body is not compressed) - const BlockCompressionCodec* codec = nullptr; + BlockCompressionCodec* codec = nullptr; // used to collect IO metrics OlapReaderStatistics* stats = nullptr; // whether to verify page checksum @@ -89,7 +89,7 @@ public: // Compress `body' using `codec' into `compressed_body'. // The size of returned `compressed_body' is 0 when the body is not compressed, this // could happen when `codec' is null or space saving is less than `min_space_saving'. - static Status compress_page_body(const BlockCompressionCodec* codec, double min_space_saving, + static Status compress_page_body(BlockCompressionCodec* codec, double min_space_saving, const std::vector<Slice>& body, OwnedSlice* compressed_body); // Encode page from `body' and `footer' and write to `file'. @@ -99,9 +99,8 @@ public: const PageFooterPB& footer, PagePointer* result);
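// A worked example of the min_space_saving rule enforced by compress_page_body
// (illustrative numbers): with min_space_saving = 0.1 and a 1000-byte page body,
//   compressed to 900 bytes -> space_saving = 1.0 - 900.0 / 1000 = 0.10 >= 0.1 -> keep the compressed body
//   compressed to 950 bytes -> space_saving = 0.05 < 0.1 -> write the page uncompressed
// Convenient function to compress page body and write page in one go. 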
- static Status compress_and_write_page(const BlockCompressionCodec* codec, - double min_space_saving, io::FileWriter* writer, - const std::vector<Slice>& body, + static Status compress_and_write_page(BlockCompressionCodec* codec, double min_space_saving, + io::FileWriter* writer, const std::vector<Slice>& body, const PageFooterPB& footer, PagePointer* result) { DCHECK_EQ(footer.uncompressed_size(), Slice::compute_total_size(body)); OwnedSlice compressed_body; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index ce7a1171f8..300b603e83 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -221,4 +221,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b); using TabletSchemaSPtr = std::shared_ptr<TabletSchema>; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 4c58a7f86a..ac34cfc17d 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -28,47 +28,91 @@ #include #include "gutil/strings/substitute.h" +#include "util/defer_op.h" #include "util/faststring.h" namespace doris { using strings::Substitute; -Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, Slice* output) const { +Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size, + faststring* output) { faststring buf; // we compute total size to avoid more memory copy - size_t total_size = Slice::compute_total_size(inputs); - buf.reserve(total_size); + buf.reserve(uncompressed_size); for (auto& input : inputs) { buf.append(input.data, input.size); } return compress(buf, output); } +bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { + if (uncompressed_size > std::numeric_limits<int32_t>::max()) { + return true; + } + return false; +} + class Lz4BlockCompression : public BlockCompressionCodec { +private: + struct Context { + Context() : ctx(nullptr) {} + LZ4_stream_t* ctx; + faststring buffer; + }; + public: - static const Lz4BlockCompression* instance() { + static Lz4BlockCompression* instance() { static Lz4BlockCompression s_instance; return &s_instance; } - ~Lz4BlockCompression() override {} + ~Lz4BlockCompression() { + for (auto ctx : _ctx_pool) { + _delete_compression_ctx(ctx); + } + } - Status compress(const Slice& input, Slice* output) const override { - if (input.size > std::numeric_limits<int32_t>::max() || - output->size > std::numeric_limits<int32_t>::max()) { - return Status::InvalidArgument("LZ4 cannot handle data large than 2G"); + Status compress(const Slice& input, faststring* output) override { + Context* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; + Slice compressed_buf; + size_t max_len = max_compressed_len(input.size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast<char*>(output->data()); + compressed_buf.size = max_len; + } else { + // reuse the context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast<char*>(context->buffer.data()); + compressed_buf.size = max_len; } - auto compressed_len = - LZ4_compress_default(input.data, output->data, input.size, output->size); + + size_t compressed_len = + LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data, + input.size, compressed_buf.size, ACCELERATION); if (compressed_len == 0) { + compress_failed = true; return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", - output->size); + compressed_buf.size); + } + output->resize(compressed_len); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len); } - output->size = compressed_len; return Status::OK(); } - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -78,122 +122,261 @@ public: return Status::OK(); } - size_t max_compressed_len(size_t len) const override { - if (len > std::numeric_limits<int32_t>::max()) { - return 0; + size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } + +private: + // reuse LZ4 compress stream + Status _acquire_compression_ctx(Context** out) { + std::lock_guard<std::mutex> l(_ctx_mutex); + if (_ctx_pool.empty()) { + Context* context = new (std::nothrow) Context(); + if (context == nullptr) { + return Status::InvalidArgument("new LZ4 context error"); + } + context->ctx = LZ4_createStream(); + if (context->ctx == nullptr) { + delete context; + return Status::InvalidArgument("LZ4_createStream error"); + } + *out = context; + return Status::OK(); } - return LZ4_compressBound(len); + *out = _ctx_pool.back(); + _ctx_pool.pop_back(); + return Status::OK(); } + void _release_compression_ctx(Context* context) { + DCHECK(context); + LZ4_resetStream(context->ctx); + std::lock_guard<std::mutex> l(_ctx_mutex); + _ctx_pool.push_back(context); + } + void _delete_compression_ctx(Context* context) { + DCHECK(context); + LZ4_freeStream(context->ctx); + delete context; + } + +private: + mutable std::mutex _ctx_mutex; + mutable std::vector<Context*> _ctx_pool; + static const int32_t ACCELERATION = 1; };
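// A minimal caller-side sketch of the pooled-codec pattern implemented in this
// file (illustrative; `src` and `src_len` are placeholders for caller data):
//
//   BlockCompressionCodec* codec = nullptr;
//   RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::LZ4, &codec));
//   // codec points at a shared global instance; callers must not delete it
//   faststring compressed;
//   RETURN_IF_ERROR(codec->compress(Slice(src, src_len), &compressed));

// Used for LZ4 frame format, decompress speed is two times faster than LZ4. 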
class Lz4fBlockCompression : public BlockCompressionCodec { +private: + struct CContext { + CContext() : ctx(nullptr) {} + LZ4F_compressionContext_t ctx; + faststring buffer; + }; + struct DContext { + DContext() : ctx(nullptr) {} + LZ4F_decompressionContext_t ctx; + }; + public: - Status init() override { - auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION); - if (LZ4F_isError(ret1)) { - return Status::InvalidArgument(strings::Substitute( - "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1))); + static Lz4fBlockCompression* instance() { + static Lz4fBlockCompression s_instance; + return &s_instance; + } + ~Lz4fBlockCompression() { + for (auto ctx : _ctx_c_pool) { + _delete_compression_ctx(ctx); } - ctx_c_inited = true; - - auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION); - if (LZ4F_isError(ret2)) { - return Status::InvalidArgument(strings::Substitute( - "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2))); + for (auto ctx : _ctx_d_pool) { + _delete_decompression_ctx(ctx); } - ctx_d_inited = true; - - return Status::OK(); } - ~Lz4fBlockCompression() override { - if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c); - if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d); - } - - Status compress(const Slice& input, Slice* output) const override { + Status compress(const Slice& input, faststring* output) override { std::vector<Slice> inputs {input}; - return compress(inputs, output); + return compress(inputs, input.size, output); } - Status compress(const std::vector<Slice>& inputs, Slice* output) const override { - if (!ctx_c_inited) - return Status::InvalidArgument("LZ4F_createCompressionContext not sucess"); - - return _compress(ctx_c, inputs, output); + Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, + faststring* output) override { + return _compress(inputs, uncompressed_size, output); } - Status decompress(const Slice& input, Slice* output) const override { - if (!ctx_d_inited) - return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess"); - - return _decompress(ctx_d, input, output); + Status decompress(const Slice& input, Slice* output) override { + return _decompress(input, output); } - size_t max_compressed_len(size_t len) const override { - if (len > std::numeric_limits<int32_t>::max()) { - return 0; - } + size_t max_compressed_len(size_t len) override { return std::max(LZ4F_compressBound(len, &_s_preferences), LZ4F_compressFrameBound(len, &_s_preferences)); } private: - Status _compress(LZ4F_compressionContext_t ctx, const std::vector<Slice>& inputs, - Slice* output) const { - auto wbytes = LZ4F_compressBegin(ctx, output->data, output->size, &_s_preferences); + Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size, + faststring* output) { + CContext* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; + Slice compressed_buf; + size_t max_len = max_compressed_len(uncompressed_size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast<char*>(output->data()); + compressed_buf.size = max_len; + } else { + // reuse the context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast<char*>(context->buffer.data()); + compressed_buf.size = max_len; + } + + auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, + &_s_preferences); if (LZ4F_isError(wbytes)) { + compress_failed = true; return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", LZ4F_getErrorName(wbytes)); } size_t offset = wbytes; for (auto input : inputs) { - wbytes = LZ4F_compressUpdate(ctx, output->data + offset, output->size - offset, - input.data, input.size, nullptr); + wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, + compressed_buf.size - offset, input.data, input.size, + nullptr); if (LZ4F_isError(wbytes)) { + compress_failed = true; return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", LZ4F_getErrorName(wbytes)); } offset += wbytes; } - wbytes = LZ4F_compressEnd(ctx, output->data + offset, output->size - offset, nullptr); + wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, + compressed_buf.size - offset, nullptr); if (LZ4F_isError(wbytes)) { + compress_failed = true; return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", LZ4F_getErrorName(wbytes)); } offset += wbytes; - output->size = offset; + output->resize(offset); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset); + } + return Status::OK(); } - Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const { - // reset decompression context to avoid ERROR_maxBlockSize_invalid - LZ4F_resetDecompressionContext(ctx); + Status _decompress(const Slice& input, Slice* output) { + bool decompress_failed = false; + DContext* context = nullptr; + RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); + Defer defer {[&] { + if (decompress_failed) { + _delete_decompression_ctx(context); + } else { + _release_decompression_ctx(context); + } + }}; size_t input_size = input.size; - auto lres = - LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr); + auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data, + &input_size, nullptr); if (LZ4F_isError(lres)) { + decompress_failed = true; return Status::InvalidArgument("Fail to do LZ4F decompress, res={}", LZ4F_getErrorName(lres)); } else if (input_size != input.size) { + decompress_failed = true; return Status::InvalidArgument( strings::Substitute("Fail to do LZ4F decompress: trailing data left in " "compressed data, read=$0 vs given=$1", input_size, input.size)); } else if (lres != 0) { + decompress_failed = true; return Status::InvalidArgument( "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres); } return Status::OK(); } +private: + // acquire a compression ctx from the pool, release it when compression finishes, + // and delete it if compression failed + Status _acquire_compression_ctx(CContext** out) { + std::lock_guard<std::mutex> l(_ctx_c_mutex); + if (_ctx_c_pool.empty()) { + CContext* context = new (std::nothrow) CContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new LZ4F CContext"); + } + auto res = LZ4F_createCompressionContext(&context->ctx, LZ4F_VERSION); + if (LZ4F_isError(res) != 0) { + delete context; + return Status::InvalidArgument(strings::Substitute( + "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res))); + } + *out = context; + return Status::OK(); + } + *out = _ctx_c_pool.back(); + _ctx_c_pool.pop_back(); + return Status::OK(); + } + void _release_compression_ctx(CContext* context) { + DCHECK(context); + std::lock_guard<std::mutex> l(_ctx_c_mutex); + _ctx_c_pool.push_back(context); + } + void _delete_compression_ctx(CContext* context) { + DCHECK(context); + LZ4F_freeCompressionContext(context->ctx); + delete context; + } + + Status _acquire_decompression_ctx(DContext** out) { + std::lock_guard<std::mutex> l(_ctx_d_mutex); + if (_ctx_d_pool.empty()) { + DContext* context = new (std::nothrow) DContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new LZ4F DContext"); + } + auto res = LZ4F_createDecompressionContext(&context->ctx, LZ4F_VERSION); + if (LZ4F_isError(res) != 0) { + delete context; + return Status::InvalidArgument(strings::Substitute( + "LZ4F_createDecompressionContext error, res=$0", LZ4F_getErrorName(res))); + } + *out = context; + return Status::OK(); + } + *out = _ctx_d_pool.back(); + _ctx_d_pool.pop_back(); + return Status::OK(); + } + void _release_decompression_ctx(DContext* context) { + DCHECK(context); + // reset decompression context to avoid ERROR_maxBlockSize_invalid + LZ4F_resetDecompressionContext(context->ctx); + std::lock_guard<std::mutex> l(_ctx_d_mutex); + _ctx_d_pool.push_back(context); + } + void _delete_decompression_ctx(DContext* context) { + DCHECK(context); + LZ4F_freeDecompressionContext(context->ctx); + delete context; + } + private: static LZ4F_preferences_t _s_preferences; - LZ4F_compressionContext_t ctx_c; - bool ctx_c_inited = false; - LZ4F_decompressionContext_t ctx_d; - bool ctx_d_inited = false; + + std::mutex _ctx_c_mutex; + // LZ4F_compressionContext_t is a pointer so no copy here + std::vector<CContext*> _ctx_c_pool; + + std::mutex _ctx_d_mutex; + std::vector<DContext*> _ctx_d_pool; }; LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { @@ -273,18 +456,23 @@ private: class SnappyBlockCompression : public BlockCompressionCodec { public: - static const SnappyBlockCompression* instance() { + static SnappyBlockCompression* instance() { static SnappyBlockCompression s_instance; return &s_instance; } ~SnappyBlockCompression() override {} - Status compress(const Slice& input, Slice* output) const override { - snappy::RawCompress(input.data, input.size, output->data, &output->size); + Status compress(const Slice& input, faststring* output) override { + size_t max_len = max_compressed_len(input.size); + output->resize(max_len); + Slice s(*output); + + snappy::RawCompress(input.data, input.size, s.data, &s.size); + output->resize(s.size); return Status::OK(); } - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { if (!snappy::RawUncompress(input.data, input.size, output->data)) { return Status::InvalidArgument("Fail to do Snappy decompress"); } @@ -293,35 +481,46 @@ public: return Status::OK(); } - Status compress(const std::vector<Slice>& inputs, Slice* output) const override { + Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, + faststring* output) override { + auto max_len = max_compressed_len(uncompressed_size); + output->resize(max_len); + SnappySlicesSource source(inputs); - snappy::UncheckedByteArraySink sink(output->data); - output->size = snappy::Compress(&source, &sink); + snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data())); + output->resize(snappy::Compress(&source, &sink)); return Status::OK(); } - size_t max_compressed_len(size_t len) const override { - return snappy::MaxCompressedLength(len); - } + size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } }; class ZlibBlockCompression : public BlockCompressionCodec { public: - static const ZlibBlockCompression* 
instance() { + static ZlibBlockCompression* instance() { static ZlibBlockCompression s_instance; return &s_instance; } ~ZlibBlockCompression() {} - Status compress(const Slice& input, Slice* output) const override { - auto zres = ::compress((Bytef*)output->data, &output->size, (Bytef*)input.data, input.size); + Status compress(const Slice& input, faststring* output) override { + size_t max_len = max_compressed_len(input.size); + output->resize(max_len); + Slice s(*output); + + auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); if (zres != Z_OK) { return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres)); } + output->resize(s.size); return Status::OK(); } - Status compress(const std::vector& inputs, Slice* output) const override { + Status compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) override { + size_t max_len = max_compressed_len(uncompressed_size); + output->resize(max_len); + z_stream zstrm; zstrm.zalloc = Z_NULL; zstrm.zfree = Z_NULL; @@ -332,8 +531,8 @@ public: zError(zres), zres); } // we assume that output is e - zstrm.next_out = (Bytef*)output->data; - zstrm.avail_out = output->size; + zstrm.next_out = (Bytef*)output->data(); + zstrm.avail_out = output->size(); for (int i = 0; i < inputs.size(); ++i) { if (inputs[i].size == 0) { continue; @@ -349,7 +548,7 @@ public: } } - output->size = zstrm.total_out; + output->resize(zstrm.total_out); zres = deflateEnd(&zstrm); if (zres != Z_OK) { return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", @@ -358,7 +557,7 @@ public: return Status::OK(); } - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); @@ -368,7 +567,7 @@ public: return Status::OK(); } - size_t max_compressed_len(size_t len) const override { + size_t max_compressed_len(size_t len) override { // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block return len + 6 + 5 * ((len >> 14) + 1); } @@ -376,68 +575,82 @@ public: // for ZSTD compression and decompression, with BOTH fast and high compression ratio class ZstdBlockCompression : public BlockCompressionCodec { +private: + struct CContext { + CContext() : ctx(nullptr) {} + ZSTD_CCtx* ctx; + faststring buffer; + }; + struct DContext { + DContext() : ctx(nullptr) {} + ZSTD_DCtx* ctx; + }; + public: - // reenterable initialization for compress/decompress context - inline Status init() override { - if (!ctx_c) { - ctx_c = ZSTD_createCCtx(); - if (!ctx_c) { - return Status::InvalidArgument("Fail to ZSTD_createCCtx"); - } + static ZstdBlockCompression* instance() { + static ZstdBlockCompression s_instance; + return &s_instance; + } + ~ZstdBlockCompression() { + for (auto ctx : _ctx_c_pool) { + _delete_compression_ctx(ctx); } - - if (!ctx_d) { - ctx_d = ZSTD_createDCtx(); - if (!ctx_d) { - return Status::InvalidArgument("Fail to ZSTD_createDCtx"); - } + for (auto ctx : _ctx_d_pool) { + _delete_decompression_ctx(ctx); } - - return Status::OK(); } - ~ZstdBlockCompression() override { - if (ctx_c) ZSTD_freeCCtx(ctx_c); - if (ctx_d) ZSTD_freeDCtx(ctx_d); - } + size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); } - size_t max_compressed_len(size_t len) const override { - if (len > std::numeric_limits::max()) { - return 0; - } - return 
ZSTD_compressBound(len); - } - - Status compress(const Slice& input, Slice* output) const override { + Status compress(const Slice& input, faststring* output) override { std::vector<Slice> inputs {input}; - return compress(inputs, output); + return compress(inputs, input.size, output); } // follow ZSTD official example // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c - Status compress(const std::vector<Slice>& inputs, Slice* output) const override { - if (!ctx_c) return Status::InvalidArgument("compression context NOT initialized"); + Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, + faststring* output) override { + CContext* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; - // reset ctx to start new compress session - auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only); - if (ZSTD_isError(ret)) { - return Status::InvalidArgument("ZSTD_CCtx_reset error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + size_t max_len = max_compressed_len(uncompressed_size); + Slice compressed_buf; + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast<char*>(output->data()); + compressed_buf.size = max_len; + } else { + // reuse the context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast<char*>(context->buffer.data()); + compressed_buf.size = max_len; } + // set compression level to default 3 - ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT); + auto ret = + ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT); if (ZSTD_isError(ret)) { return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // set checksum flag to 1 - ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1); + ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); if (ZSTD_isError(ret)) { return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } - ZSTD_outBuffer out_buf = {output->data, output->size, 0}; + ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; for (size_t i = 0; i < inputs.size(); i++) { ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; @@ -448,15 +661,17 @@ public: bool finished = false; do { // do compress - auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf, mode); + auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); if (ZSTD_isError(ret)) { + compress_failed = true; return Status::InvalidArgument("ZSTD_compressStream2 error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // ret is ZSTD hint for needed output buffer size if (ret > 0 && out_buf.pos == out_buf.size) { + compress_failed = true; return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); } @@ -465,37 +680,44 @@ } // set compressed size for caller - output->size = out_buf.pos; + output->resize(out_buf.pos); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos); } return Status::OK(); } // follow ZSTD official example // https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c - Status decompress(const Slice& input, Slice* output) const override { - if (!ctx_d) return Status::InvalidArgument("decompression context NOT initialized"); - - // reset ctx to start a new decompress session - auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only); - if (ZSTD_isError(ret)) { - return Status::InvalidArgument("ZSTD_DCtx_reset error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); - } + Status decompress(const Slice& input, Slice* output) override { + DContext* context; + bool decompress_failed = false; + RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); + Defer defer {[&] { + if (decompress_failed) { + _delete_decompression_ctx(context); + } else { + _release_decompression_ctx(context); + } + }}; ZSTD_inBuffer in_buf = {input.data, input.size, 0}; ZSTD_outBuffer out_buf = {output->data, output->size, 0}; while (in_buf.pos < in_buf.size) { // do decompress - auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf); + auto ret = ZSTD_decompressStream(context->ctx, &out_buf, &in_buf); if (ZSTD_isError(ret)) { + decompress_failed = true; return Status::InvalidArgument("ZSTD_decompressStream error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // ret is ZSTD hint for needed output buffer size if (ret > 0 && out_buf.pos == out_buf.size) { + decompress_failed = true; return Status::InvalidArgument("ZSTD_decompressStream output buffer full"); } } @@ -507,16 +729,87 @@ } private: - // will be reused by compress/decompress - ZSTD_CCtx* ctx_c = nullptr; - ZSTD_DCtx* ctx_d = nullptr; + Status _acquire_compression_ctx(CContext** out) { + std::lock_guard<std::mutex> l(_ctx_c_mutex); + if (_ctx_c_pool.empty()) { + CContext* context = new (std::nothrow) CContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new ZSTD CContext"); + } + context->ctx = ZSTD_createCCtx(); + if (context->ctx == nullptr) { + delete context; + return Status::InvalidArgument("Failed to create ZSTD compress ctx"); + } + *out = context; + return Status::OK(); + } + *out = _ctx_c_pool.back(); + _ctx_c_pool.pop_back(); + return Status::OK(); + } + void _release_compression_ctx(CContext* context) { + DCHECK(context); + auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only); + DCHECK(!ZSTD_isError(ret)); + std::lock_guard<std::mutex> l(_ctx_c_mutex); + _ctx_c_pool.push_back(context); + } + void _delete_compression_ctx(CContext* context) { + DCHECK(context); + ZSTD_freeCCtx(context->ctx); + delete context; + } + + Status _acquire_decompression_ctx(DContext** out) { + std::lock_guard<std::mutex> l(_ctx_d_mutex); + if (_ctx_d_pool.empty()) { + DContext* context = new (std::nothrow) DContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new ZSTD DContext"); + } + context->ctx = ZSTD_createDCtx(); + if (context->ctx == nullptr) { + delete context; + return Status::InvalidArgument("Fail to init ZSTD decompress context"); + } + *out = context; + return Status::OK(); + } + *out = _ctx_d_pool.back(); + _ctx_d_pool.pop_back(); + return Status::OK(); + } + void _release_decompression_ctx(DContext* context) { + DCHECK(context); + // reset ctx to start a new decompress session + auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only); + DCHECK(!ZSTD_isError(ret)); + std::lock_guard<std::mutex> l(_ctx_d_mutex); + _ctx_d_pool.push_back(context); + } + void _delete_decompression_ctx(DContext* context) { + DCHECK(context); + ZSTD_freeDCtx(context->ctx); + delete context; + } + +private: + 
mutable std::mutex _ctx_c_mutex; + mutable std::vector _ctx_c_pool; + + mutable std::mutex _ctx_d_mutex; + mutable std::vector _ctx_d_pool; }; class GzipBlockCompression final : public ZlibBlockCompression { public: + static GzipBlockCompression* instance() { + static GzipBlockCompression s_instance; + return &s_instance; + } ~GzipBlockCompression() override = default; - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { z_stream z_strm = {nullptr}; z_strm.zalloc = Z_NULL; z_strm.zfree = Z_NULL; @@ -548,7 +841,7 @@ public: return Status::OK(); } - size_t max_compressed_len(size_t len) const override { + size_t max_compressed_len(size_t len) override { z_stream zstrm; zstrm.zalloc = Z_NULL; zstrm.zfree = Z_NULL; @@ -586,74 +879,56 @@ private: }; Status get_block_compression_codec(segment_v2::CompressionTypePB type, - std::unique_ptr& codec) { - BlockCompressionCodec* ptr = nullptr; + BlockCompressionCodec** codec) { switch (type) { case segment_v2::CompressionTypePB::NO_COMPRESSION: - codec.reset(nullptr); - return Status::OK(); + *codec = nullptr; + break; case segment_v2::CompressionTypePB::SNAPPY: - ptr = new SnappyBlockCompression(); + *codec = SnappyBlockCompression::instance(); break; case segment_v2::CompressionTypePB::LZ4: - ptr = new Lz4BlockCompression(); + *codec = Lz4BlockCompression::instance(); break; case segment_v2::CompressionTypePB::LZ4F: - ptr = new Lz4fBlockCompression(); + *codec = Lz4fBlockCompression::instance(); break; case segment_v2::CompressionTypePB::ZLIB: - ptr = new ZlibBlockCompression(); + *codec = ZlibBlockCompression::instance(); break; case segment_v2::CompressionTypePB::ZSTD: - ptr = new ZstdBlockCompression(); + *codec = ZstdBlockCompression::instance(); break; default: return Status::NotFound("unknown compression type({})", type); } - if (!ptr) return Status::NotFound("Failed to create compression codec"); - - Status st = ptr->init(); - if (st.ok()) { - codec.reset(ptr); - } else { - delete ptr; - } - - return st; + return Status::OK(); } Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, - std::unique_ptr& codec) { - BlockCompressionCodec* ptr = nullptr; + BlockCompressionCodec** codec) { switch (parquet_codec) { case tparquet::CompressionCodec::UNCOMPRESSED: - codec.reset(nullptr); - return Status::OK(); + *codec = nullptr; + break; case tparquet::CompressionCodec::SNAPPY: - ptr = new SnappyBlockCompression(); + *codec = SnappyBlockCompression::instance(); break; case tparquet::CompressionCodec::LZ4: - ptr = new Lz4BlockCompression(); + *codec = Lz4BlockCompression::instance(); break; case tparquet::CompressionCodec::ZSTD: - ptr = new ZstdBlockCompression(); + *codec = ZstdBlockCompression::instance(); break; case tparquet::CompressionCodec::GZIP: - ptr = new GzipBlockCompression(); + *codec = GzipBlockCompression::instance(); break; default: - return Status::NotFound("unknown compression type({})", tparquet::to_string(parquet_codec)); + return Status::NotFound("unknown compression type({})", parquet_codec); } - Status st = ptr->init(); - if (st.ok()) { - codec.reset(ptr); - } else { - delete ptr; - } - - return st; + return Status::OK(); } } // namespace doris diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h index 9de8eb16ff..c2939452de 100644 --- a/be/src/util/block_compression.h +++ b/be/src/util/block_compression.h @@ -34,6 +34,10 @@ namespace doris { // // NOTICE!! 
BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads // + +// max compression reuse buffer size +// if max_compress_len is bigger than this, do not use the faststring in context +const static int MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE = 1024 * 1024 * 8; class BlockCompressionCodec { public: virtual ~BlockCompressionCodec() {} @@ -44,20 +48,23 @@ public: // output should be preallocated, and its capacity must be large enough // for compressed input, which can be get through max_compressed_len function. // Size of compressed data will be set in output's size. - virtual Status compress(const Slice& input, Slice* output) const = 0; + virtual Status compress(const Slice& input, faststring* output) = 0; // Default implementation will merge input list into a big buffer and call // compress(Slice) to finish compression. If compression type support digesting // slice one by one, it should reimplement this function. - virtual Status compress(const std::vector<Slice>& input, Slice* output) const; + virtual Status compress(const std::vector<Slice>& input, size_t uncompressed_size, + faststring* output); // Decompress input data into output, output's capacity should be large enough // for decompressed data. // Size of decompressed data will be set in output's size. - virtual Status decompress(const Slice& input, Slice* output) const = 0; + virtual Status decompress(const Slice& input, Slice* output) = 0; // Returns an upper bound on the max compressed length. - virtual size_t max_compressed_len(size_t len) const = 0; + virtual size_t max_compressed_len(size_t len) = 0; + + virtual bool exceed_max_compress_len(size_t uncompressed_size); }; // Get a BlockCompressionCodec through type. @@ -69,9 +76,9 @@ // // Return not OK, if error happens. Status get_block_compression_codec(segment_v2::CompressionTypePB type, - std::unique_ptr<BlockCompressionCodec>& codec); + BlockCompressionCodec** codec); Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, - std::unique_ptr<BlockCompressionCodec>& codec); + BlockCompressionCodec** codec); } // namespace doris diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 7da0f0e525..7b751f134f 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -30,6 +30,7 @@ #include "runtime/tuple_row.h" #include "udf/udf.h" #include "util/block_compression.h" +#include "util/faststring.h" #include "util/simd/bits.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" @@ -76,8 +77,8 @@ Block::Block(const PBlock& pblock) { size_t compressed_size = pblock.column_values().size(); size_t uncompressed_size = 0; if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { - std::unique_ptr<BlockCompressionCodec> codec; - get_block_compression_codec(pblock.compression_type(), codec); + BlockCompressionCodec* codec; + get_block_compression_codec(pblock.compression_type(), &codec); uncompressed_size = pblock.uncompressed_size(); compression_scratch.resize(uncompressed_size); Slice decompressed_slice(compression_scratch); @@ -692,15 +693,8 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee return Status::OK(); } -Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, - segment_v2::CompressionTypePB compression_type, - bool allow_transfer_large_data) const { - std::string compression_scratch; - return serialize(pblock, &compression_scratch, uncompressed_bytes, compressed_bytes, - compression_type, allow_transfer_large_data); -} - -Status Block::serialize(PBlock* pblock, std::string* 
compressed_buffer, size_t* uncompressed_bytes, +Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data) const { CHECK(config::block_data_version <= Block::max_data_version) @@ -720,10 +714,9 @@ Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, size_t* // serialize data values // when data type is HLL, content_uncompressed_size maybe larger than real size. - std::string* column_values = nullptr; + std::string column_values; try { - column_values = pblock->mutable_column_values(); - column_values->resize(content_uncompressed_size); + column_values.resize(content_uncompressed_size); } catch (...) { std::exception_ptr p = std::current_exception(); std::string msg = fmt::format( @@ -732,7 +725,7 @@ Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, size_t* LOG(WARNING) << msg; return Status::BufferAllocFailed(msg); } - char* buf = column_values->data(); + char* buf = column_values.data(); for (const auto& c : *this) { buf = c.type->serialize(*(c.column), buf, config::block_data_version); @@ -745,34 +738,19 @@ pblock->set_compression_type(compression_type); pblock->set_uncompressed_size(content_uncompressed_size); - std::unique_ptr<BlockCompressionCodec> codec; - RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec)); + BlockCompressionCodec* codec; + RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); - size_t max_compressed_size = codec->max_compressed_len(content_uncompressed_size); - try { - // Try compressing the content to compressed_buffer, - // swap if compressed data is smaller - // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails - compressed_buffer->resize(max_compressed_size); - } catch (...) { - std::exception_ptr p = std::current_exception(); - std::string msg = - fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}", - max_compressed_size, p ? p.__cxa_exception_type()->name() : "null"); - LOG(WARNING) << msg; - return Status::BufferAllocFailed(msg); - } - - Slice compressed_slice(*compressed_buffer); - codec->compress(Slice(column_values->data(), content_uncompressed_size), &compressed_slice); - size_t compressed_size = compressed_slice.size; + faststring buf_compressed; + RETURN_IF_ERROR(codec->compress(Slice(column_values.data(), content_uncompressed_size), + &buf_compressed)); + size_t compressed_size = buf_compressed.size(); if (LIKELY(compressed_size < content_uncompressed_size)) { - compressed_buffer->resize(compressed_size); - column_values->swap(*compressed_buffer); + pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); pblock->set_compressed(true); *compressed_bytes = compressed_size; } else { + pblock->set_column_values(std::move(column_values)); *compressed_bytes = content_uncompressed_size; } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index b9a147bcf7..c50960a74a 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -277,13 +277,6 @@ public: segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; - // serialize block to PBlock - // compressed_buffer reuse to avoid frequent allocation and deallocation, - // NOTE: compressed_buffer's data may be swapped with pblock->mutable_column_values - Status serialize(PBlock* pblock, std::string* compressed_buffer, size_t* uncompressed_bytes, - size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, - bool allow_transfer_large_data = false) const; - // serialize block to PRowbatch void serialize(RowBatch*, const RowDescriptor&); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index c8192f46b2..ed85e93f5c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -36,7 +36,7 @@ Status ColumnChunkReader::init() { size_t chunk_size = _metadata.total_compressed_size; _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size); // get the block compression codec - RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec)); if (_metadata.__isset.dictionary_page_offset) { // seek to the directory page _page_reader->seek_to_page(_metadata.dictionary_page_offset); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 2b4487ff60..2c6b620cce 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -129,7 +129,7 @@ private: cctz::time_zone* _ctz; std::unique_ptr<PageReader> _page_reader = nullptr; - std::unique_ptr<BlockCompressionCodec> _block_compress_codec = nullptr; + BlockCompressionCodec* _block_compress_codec = nullptr; LevelDecoder _rep_level_decoder; LevelDecoder _def_level_decoder; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 03482ed3b6..fc01d8a632 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -95,7 +95,7 @@ Status VDataStreamSender::Channel::send_current_block(bool eos) { return send_local_block(eos); } auto block = _mutable_block->to_block(); - RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block, 
&_compressed_data_buffer)); + RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block)); block.clear_column_data(); _mutable_block->set_muatable_columns(block.mutate_columns()); RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos)); @@ -472,8 +472,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { RETURN_IF_ERROR(channel->send_local_block(block)); } } else { - RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, &_compressed_data_buffer, - _channels.size())); + RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size())); for (auto channel : _channels) { if (channel->is_local()) { RETURN_IF_ERROR(channel->send_local_block(block)); @@ -491,8 +490,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { if (current_channel->is_local()) { RETURN_IF_ERROR(current_channel->send_local_block(block)); } else { - RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block(), - &_compressed_data_buffer)); + RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block())); RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block())); current_channel->ch_roll_pb_block(); } @@ -582,18 +580,12 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { } Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_receivers) { - return serialize_block(src, dest, &_compressed_data_buffer, num_receivers); -} - -Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, std::string* compressed_buffer, - int num_receivers) { { SCOPED_TIMER(_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(dest, compressed_buffer, &uncompressed_bytes, - &compressed_bytes, _compression_type, - _transfer_large_data_by_brpc)); + RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, + _compression_type, _transfer_large_data_by_brpc)); COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); COUNTER_UPDATE(_compress_timer, src->get_compress_time()); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 16a55fa9af..e537749dd7 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -75,8 +75,6 @@ public: RuntimeState* state() { return _state; } Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1); - Status serialize_block(Block* src, PBlock* dest, std::string* compressed_buffer, - int num_receivers = 1); protected: void _roll_pb_block(); @@ -153,8 +151,6 @@ protected: // User can change this config at runtime, avoid it being modified during query or loading process. 
bool _transfer_large_data_by_brpc = false; - std::string _compressed_data_buffer; - segment_v2::CompressionTypePB _compression_type; }; @@ -311,7 +307,6 @@ private: PBlock* _ch_cur_pb_block = nullptr; PBlock _ch_pb_block1; PBlock _ch_pb_block2; - std::string _compressed_data_buffer; bool _enable_local_exchange = false; }; diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp index b1ca291f78..867ee65d20 100644 --- a/be/test/util/block_compression_test.cpp +++ b/be/test/util/block_compression_test.cpp @@ -21,6 +21,8 @@ #include +#include "util/faststring.h" + namespace doris { class BlockCompressionTest : public testing::Test { public: @@ -42,21 +44,19 @@ static std::string generate_str(size_t len) { } void test_single_slice(segment_v2::CompressionTypePB type) { - std::unique_ptr codec; - auto st = get_block_compression_codec(type, codec); + BlockCompressionCodec* codec; + auto st = get_block_compression_codec(type, &codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; for (auto size : test_sizes) { auto orig = generate_str(size); - size_t max_len = codec->max_compressed_len(size); - std::string compressed; - compressed.resize(max_len); + faststring compressed_str; { - Slice compressed_slice(compressed); - st = codec->compress(orig, &compressed_slice); + st = codec->compress(orig, &compressed_str); EXPECT_TRUE(st.ok()); + Slice compressed_slice(compressed_str); std::string uncompressed; uncompressed.resize(size); { @@ -86,13 +86,6 @@ void test_single_slice(segment_v2::CompressionTypePB type) { compressed_slice.size += 1; } } - // buffer not enough for compress - if (type != segment_v2::CompressionTypePB::SNAPPY && size > 0) { - Slice compressed_slice(compressed); - compressed_slice.size = 1; - st = codec->compress(orig, &compressed_slice); - EXPECT_FALSE(st.ok()); - } } } @@ -105,8 +98,8 @@ TEST_F(BlockCompressionTest, single) { } void test_multi_slices(segment_v2::CompressionTypePB type) { - std::unique_ptr codec; - auto st = get_block_compression_codec(type, codec); + BlockCompressionCodec* codec; + auto st = get_block_compression_codec(type, &codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; @@ -122,15 +115,12 @@ void test_multi_slices(segment_v2::CompressionTypePB type) { } size_t total_size = orig.size(); - size_t max_len = codec->max_compressed_len(total_size); - - std::string compressed; - compressed.resize(max_len); + faststring compressed; { - Slice compressed_slice(compressed); - st = codec->compress(orig_slices, &compressed_slice); + st = codec->compress(orig_slices, total_size, &compressed); EXPECT_TRUE(st.ok()); + Slice compressed_slice(compressed); std::string uncompressed; uncompressed.resize(total_size); // normal case @@ -142,14 +132,6 @@ void test_multi_slices(segment_v2::CompressionTypePB type) { EXPECT_STREQ(orig.c_str(), uncompressed.c_str()); } } - - // buffer not enough failed - if (type != segment_v2::CompressionTypePB::SNAPPY) { - Slice compressed_slice(compressed); - compressed_slice.size = 10; - st = codec->compress(orig, &compressed_slice); - EXPECT_FALSE(st.ok()); - } } TEST_F(BlockCompressionTest, multi) {
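    // A round-trip sketch of the reworked faststring-based interface, mirroring
    // test_single_slice above (LZ4 chosen for illustration; `orig` is any test payload):
    //
    //   BlockCompressionCodec* codec = nullptr;
    //   EXPECT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::LZ4, &codec).ok());
    //   std::string orig = generate_str(1000);
    //   faststring compressed;
    //   EXPECT_TRUE(codec->compress(orig, &compressed).ok());
    //   std::string uncompressed(orig.size(), '\0');
    //   Slice uncompressed_slice(uncompressed);
    //   EXPECT_TRUE(codec->decompress(Slice(compressed), &uncompressed_slice).ok());
    //   EXPECT_STREQ(orig.c_str(), uncompressed.c_str());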