diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index c9dc6e2c32..d42358c5e7 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -575,7 +575,7 @@ FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) { Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { _opts = opts; - RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec)); if (config::enable_low_cardinality_optimize && _reader->encoding_info()->encoding() == DICT_ENCODING) { auto dict_encoding_type = _reader->get_dict_encoding_type(); @@ -860,8 +860,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) Slice page_body; PageFooterPB footer; _opts.type = DATA_PAGE; - RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, - _compress_codec.get())); + RETURN_IF_ERROR( + _reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, _compress_codec)); // parse data page RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(), _reader->encoding_info(), iter.page(), iter.page_index(), @@ -882,7 +882,7 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) _opts.type = INDEX_PAGE; RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(), &_dict_page_handle, &dict_data, &dict_footer, - _compress_codec.get())); + _compress_codec)); // ignore dict_footer.dict_page_footer().encoding() due to only // PLAIN_ENCODING is supported for dict page right now _dict_decoder = std::make_unique>( diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 92b6572e4e..33c7c418cd 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -329,7 +329,7 @@ private: ColumnReader* _reader; // iterator owned compress codec, should NOT be shared by threads, initialized in init() - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec; // 1. The _page represents current page. // 2. We define an operation is one seek and following read, diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index e49f0270e3..caf9dc0629 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() { } Status ScalarColumnWriter::init() { - RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec)); PageBuilder* page_builder = nullptr; @@ -407,7 +407,7 @@ Status ScalarColumnWriter::write_data() { PagePointer dict_pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _opts.compression_min_space_saving, _file_writer, + _compress_codec, _opts.compression_min_space_saving, _file_writer, {dict_body.slice()}, footer, &dict_pp)); dict_pp.to_proto(_opts.meta->mutable_dict_page()); } @@ -494,8 +494,8 @@ Status ScalarColumnWriter::finish_current_page() { } // trying to compress page body OwnedSlice compressed_body; - RETURN_IF_ERROR(PageIO::compress_page_body( - _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body)); + RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, + body, &compressed_body)); if (compressed_body.slice().empty()) { // page body is uncompressed page->data.emplace_back(std::move(encoded_values)); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 68fef318e5..45829b37d9 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -242,7 +242,7 @@ private: PageHead _pages; ordinal_t _first_rowid = 0; - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec; std::unique_ptr _ordinal_index_builder; std::unique_ptr _zone_map_index_builder; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 02f27aa6e6..31bab3a912 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -69,10 +69,10 @@ Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* IndexPageReader* reader) { Slice body; PageFooterPB footer; - std::unique_ptr local_compress_codec; - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec)); + BlockCompressionCodec* local_compress_codec; + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec)); RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE, - local_compress_codec.get(), false)); + local_compress_codec, false)); RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer())); _mem_size += body.get_size(); return Status::OK(); @@ -101,14 +101,14 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) { // there is not init() for IndexedColumnIterator, so do it here if (!_compress_codec) { - RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec)); } PageHandle handle; Slice body; PageFooterPB footer; - RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, - _compress_codec.get(), true)); + RETURN_IF_ERROR( + _reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec, true)); // parse data page // note that page_index is not used in IndexedColumnIterator, so we pass 0 PageDecoderOptions opts; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 15b4c175d5..a991e8d4a8 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -143,7 +143,7 @@ private: // next_batch() will read from this position ordinal_t _current_ordinal = 0; // iterator owned compress codec, should NOT be shared by threads, initialized before used - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec = nullptr; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index 2d2bdd620e..cfce5afbb8 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -73,7 +73,7 @@ Status IndexedColumnWriter::init() { } if (_options.compression != NO_COMPRESSION) { - RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec)); } return Status::OK(); } @@ -112,7 +112,7 @@ Status IndexedColumnWriter::_finish_current_data_page() { footer.mutable_data_page_footer()->set_nullmap_size(0); RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _options.compression_min_space_saving, _file_writer, + _compress_codec, _options.compression_min_space_saving, _file_writer, {page_body.slice()}, footer, &_last_data_page)); _num_data_pages++; @@ -160,7 +160,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM PagePointer pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _options.compression_min_space_saving, _file_writer, + _compress_codec, _options.compression_min_space_saving, _file_writer, {page_body.slice()}, page_footer, &pp)); meta->set_is_root_data_page(false); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h index 8be96d7b35..0c2df791f1 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h @@ -108,7 +108,7 @@ private: std::unique_ptr _value_index_builder; // encoder for value index's key const KeyCoder* _value_key_coder; - std::unique_ptr _compress_codec; + BlockCompressionCodec* _compress_codec; DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter); }; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 4388feb6dc..affb73d303 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -35,27 +35,19 @@ namespace segment_v2 { using strings::Substitute; -Status PageIO::compress_page_body(const BlockCompressionCodec* codec, double min_space_saving, +Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving, const std::vector& body, OwnedSlice* compressed_body) { size_t uncompressed_size = Slice::compute_total_size(body); - if (codec != nullptr && uncompressed_size > 0) { - size_t max_compressed_size = codec->max_compressed_len(uncompressed_size); - if (max_compressed_size) { - faststring buf; - buf.resize(max_compressed_size); - Slice compressed_slice(buf); - RETURN_IF_ERROR(codec->compress(body, &compressed_slice)); - buf.resize(compressed_slice.get_size()); - - double space_saving = 1.0 - static_cast(buf.size()) / uncompressed_size; - // return compressed body only when it saves more than min_space_saving - if (space_saving > 0 && space_saving >= min_space_saving) { - // shrink the buf to fit the len size to avoid taking - // up the memory of the size MAX_COMPRESSED_SIZE - buf.shrink_to_fit(); - *compressed_body = buf.build(); - return Status::OK(); - } + if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) { + faststring buf; + RETURN_IF_ERROR(codec->compress(body, uncompressed_size, &buf)); + double space_saving = 1.0 - static_cast(buf.size()) / uncompressed_size; + // return compressed body only when it saves more than min_space_saving + if (space_saving > 0 && space_saving >= min_space_saving) { + // shrink the buf to fit the len size to avoid taking + // up the memory of the size MAX_COMPRESSED_SIZE + *compressed_body = buf.build(); + return Status::OK(); } } // otherwise, do not compress diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 86cff00f89..d08f05c8ba 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -50,7 +50,7 @@ struct PageReadOptions { // location of the page PagePointer page_pointer; // decompressor for page body (null means page body is not compressed) - const BlockCompressionCodec* codec = nullptr; + BlockCompressionCodec* codec = nullptr; // used to collect IO metrics OlapReaderStatistics* stats = nullptr; // whether to verify page checksum @@ -89,7 +89,7 @@ public: // Compress `body' using `codec' into `compressed_body'. // The size of returned `compressed_body' is 0 when the body is not compressed, this // could happen when `codec' is null or space saving is less than `min_space_saving'. - static Status compress_page_body(const BlockCompressionCodec* codec, double min_space_saving, + static Status compress_page_body(BlockCompressionCodec* codec, double min_space_saving, const std::vector& body, OwnedSlice* compressed_body); // Encode page from `body' and `footer' and write to `file'. @@ -99,9 +99,8 @@ public: const PageFooterPB& footer, PagePointer* result); // Convenient function to compress page body and write page in one go. - static Status compress_and_write_page(const BlockCompressionCodec* codec, - double min_space_saving, io::FileWriter* writer, - const std::vector& body, + static Status compress_and_write_page(BlockCompressionCodec* codec, double min_space_saving, + io::FileWriter* writer, const std::vector& body, const PageFooterPB& footer, PagePointer* result) { DCHECK_EQ(footer.uncompressed_size(), Slice::compute_total_size(body)); OwnedSlice compressed_body; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index ce7a1171f8..300b603e83 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -221,4 +221,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b); using TabletSchemaSPtr = std::shared_ptr; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 4c58a7f86a..ac34cfc17d 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -28,47 +28,91 @@ #include #include "gutil/strings/substitute.h" +#include "util/defer_op.h" #include "util/faststring.h" namespace doris { using strings::Substitute; -Status BlockCompressionCodec::compress(const std::vector& inputs, Slice* output) const { +Status BlockCompressionCodec::compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) { faststring buf; // we compute total size to avoid more memory copy - size_t total_size = Slice::compute_total_size(inputs); - buf.reserve(total_size); + buf.reserve(uncompressed_size); for (auto& input : inputs) { buf.append(input.data, input.size); } return compress(buf, output); } +bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { + if (uncompressed_size > std::numeric_limits::max()) { + return true; + } + return false; +} + class Lz4BlockCompression : public BlockCompressionCodec { +private: + struct Context { + Context() : ctx(nullptr) {} + LZ4_stream_t* ctx; + faststring buffer; + }; + public: - static const Lz4BlockCompression* instance() { + static Lz4BlockCompression* instance() { static Lz4BlockCompression s_instance; return &s_instance; } - ~Lz4BlockCompression() override {} + ~Lz4BlockCompression() { + for (auto ctx : _ctx_pool) { + _delete_compression_ctx(ctx); + } + } - Status compress(const Slice& input, Slice* output) const override { - if (input.size > std::numeric_limits::max() || - output->size > std::numeric_limits::max()) { - return Status::InvalidArgument("LZ4 cannot handle data large than 2G"); + Status compress(const Slice& input, faststring* output) override { + Context* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; + Slice compressed_buf; + size_t max_len = max_compressed_len(input.size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + // reuse context buffer if max_len < MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast(context->buffer.data()); + compressed_buf.size = max_len; } - auto compressed_len = - LZ4_compress_default(input.data, output->data, input.size, output->size); + + size_t compressed_len = + LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data, + input.size, compressed_buf.size, ACCELARATION); if (compressed_len == 0) { + compress_failed = true; return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", - output->size); + compressed_buf.size); + } + output->resize(compressed_len); + if (max_len < MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), compressed_len); } - output->size = compressed_len; return Status::OK(); } - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -78,122 +122,261 @@ public: return Status::OK(); } - size_t max_compressed_len(size_t len) const override { - if (len > std::numeric_limits::max()) { - return 0; + size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } + +private: + // reuse LZ4 compress stream + Status _acquire_compression_ctx(Context** out) { + if (_ctx_pool.empty()) { + Context* context = new (std::nothrow) Context(); + if (context == nullptr) { + return Status::InvalidArgument("new LZ4 context error"); + } + context->ctx = LZ4_createStream(); + if (context->ctx == nullptr) { + delete context; + return Status::InvalidArgument("LZ4_createStream error"); + } + *out = context; + return Status::OK(); } - return LZ4_compressBound(len); + std::lock_guard l(_ctx_mutex); + *out = _ctx_pool.back(); + _ctx_pool.pop_back(); + return Status::OK(); } + void _release_compression_ctx(Context* context) { + DCHECK(context); + LZ4_resetStream(context->ctx); + std::lock_guard l(_ctx_mutex); + _ctx_pool.push_back(context); + } + void _delete_compression_ctx(Context* context) { + DCHECK(context); + LZ4_freeStream(context->ctx); + delete context; + } + +private: + mutable std::mutex _ctx_mutex; + mutable std::vector _ctx_pool; + static const int32_t ACCELARATION = 1; }; // Used for LZ4 frame format, decompress speed is two times faster than LZ4. class Lz4fBlockCompression : public BlockCompressionCodec { +private: + struct CContext { + CContext() : ctx(nullptr) {} + LZ4F_compressionContext_t ctx; + faststring buffer; + }; + struct DContext { + DContext() : ctx(nullptr) {} + LZ4F_decompressionContext_t ctx; + }; + public: - Status init() override { - auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION); - if (LZ4F_isError(ret1)) { - return Status::InvalidArgument(strings::Substitute( - "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1))); + static Lz4fBlockCompression* instance() { + static Lz4fBlockCompression s_instance; + return &s_instance; + } + ~Lz4fBlockCompression() { + for (auto ctx : _ctx_c_pool) { + _delete_compression_ctx(ctx); } - ctx_c_inited = true; - - auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION); - if (LZ4F_isError(ret2)) { - return Status::InvalidArgument(strings::Substitute( - "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2))); + for (auto ctx : _ctx_d_pool) { + _delete_decompression_ctx(ctx); } - ctx_d_inited = true; - - return Status::OK(); } - ~Lz4fBlockCompression() override { - if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c); - if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d); - } - - Status compress(const Slice& input, Slice* output) const override { + Status compress(const Slice& input, faststring* output) override { std::vector inputs {input}; - return compress(inputs, output); + return compress(inputs, input.size, output); } - Status compress(const std::vector& inputs, Slice* output) const override { - if (!ctx_c_inited) - return Status::InvalidArgument("LZ4F_createCompressionContext not sucess"); - - return _compress(ctx_c, inputs, output); + Status compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) override { + return _compress(inputs, uncompressed_size, output); } - Status decompress(const Slice& input, Slice* output) const override { - if (!ctx_d_inited) - return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess"); - - return _decompress(ctx_d, input, output); + Status decompress(const Slice& input, Slice* output) override { + return _decompress(input, output); } - size_t max_compressed_len(size_t len) const override { - if (len > std::numeric_limits::max()) { - return 0; - } + size_t max_compressed_len(size_t len) override { return std::max(LZ4F_compressBound(len, &_s_preferences), LZ4F_compressFrameBound(len, &_s_preferences)); } private: - Status _compress(LZ4F_compressionContext_t ctx, const std::vector& inputs, - Slice* output) const { - auto wbytes = LZ4F_compressBegin(ctx, output->data, output->size, &_s_preferences); + Status _compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) { + CContext* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; + Slice compressed_buf; + size_t max_len = max_compressed_len(uncompressed_size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + // reuse context buffer if max_len < MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast(context->buffer.data()); + compressed_buf.size = max_len; + } + + auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, + &_s_preferences); if (LZ4F_isError(wbytes)) { + compress_failed = true; return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", LZ4F_getErrorName(wbytes)); } size_t offset = wbytes; for (auto input : inputs) { - wbytes = LZ4F_compressUpdate(ctx, output->data + offset, output->size - offset, - input.data, input.size, nullptr); + wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, + compressed_buf.size - offset, input.data, input.size, + nullptr); if (LZ4F_isError(wbytes)) { + compress_failed = true; return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", LZ4F_getErrorName(wbytes)); } offset += wbytes; } - wbytes = LZ4F_compressEnd(ctx, output->data + offset, output->size - offset, nullptr); + wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, + compressed_buf.size - offset, nullptr); if (LZ4F_isError(wbytes)) { + compress_failed = true; return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", LZ4F_getErrorName(wbytes)); } offset += wbytes; - output->size = offset; + output->resize(offset); + if (max_len < MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), offset); + } + return Status::OK(); } - Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const { - // reset decompression context to avoid ERROR_maxBlockSize_invalid - LZ4F_resetDecompressionContext(ctx); + Status _decompress(const Slice& input, Slice* output) { + bool decompress_failed = false; + DContext* context = nullptr; + RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); + Defer defer {[&] { + if (decompress_failed) { + _delete_decompression_ctx(context); + } else { + _release_decompression_ctx(context); + } + }}; size_t input_size = input.size; - auto lres = - LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr); + auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data, + &input_size, nullptr); if (LZ4F_isError(lres)) { + decompress_failed = true; return Status::InvalidArgument("Fail to do LZ4F decompress, res={}", LZ4F_getErrorName(lres)); } else if (input_size != input.size) { + decompress_failed = true; return Status::InvalidArgument( strings::Substitute("Fail to do LZ4F decompress: trailing data left in " "compressed data, read=$0 vs given=$1", input_size, input.size)); } else if (lres != 0) { + decompress_failed = true; return Status::InvalidArgument( "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres); } return Status::OK(); } +private: + // acquire a compression ctx from pool, release while finish compress, + // delete if compression failed + Status _acquire_compression_ctx(CContext** out) { + if (_ctx_c_pool.empty()) { + CContext* context = new (std::nothrow) CContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new LZ4F CContext"); + } + auto res = LZ4F_createCompressionContext(&context->ctx, LZ4F_VERSION); + if (LZ4F_isError(res) != 0) { + return Status::InvalidArgument(strings::Substitute( + "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res))); + } + *out = context; + return Status::OK(); + } + std::lock_guard l(_ctx_c_mutex); + *out = _ctx_c_pool.back(); + _ctx_c_pool.pop_back(); + return Status::OK(); + } + void _release_compression_ctx(CContext* context) { + DCHECK(context); + std::lock_guard l(_ctx_c_mutex); + _ctx_c_pool.push_back(context); + } + void _delete_compression_ctx(CContext* context) { + DCHECK(context); + LZ4F_freeCompressionContext(context->ctx); + delete context; + } + + Status _acquire_decompression_ctx(DContext** out) { + std::lock_guard l(_ctx_d_mutex); + if (_ctx_d_pool.empty()) { + DContext* context = new (std::nothrow) DContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new LZ4F DContext"); + } + auto res = LZ4F_createDecompressionContext(&context->ctx, LZ4F_VERSION); + if (LZ4F_isError(res) != 0) { + return Status::InvalidArgument(strings::Substitute( + "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res))); + } + *out = context; + return Status::OK(); + } + *out = _ctx_d_pool.back(); + _ctx_d_pool.pop_back(); + return Status::OK(); + } + void _release_decompression_ctx(DContext* context) { + DCHECK(context); + // reset decompression context to avoid ERROR_maxBlockSize_invalid + LZ4F_resetDecompressionContext(context->ctx); + std::lock_guard l(_ctx_d_mutex); + _ctx_d_pool.push_back(context); + } + void _delete_decompression_ctx(DContext* context) { + DCHECK(context); + LZ4F_freeDecompressionContext(context->ctx); + delete context; + } + private: static LZ4F_preferences_t _s_preferences; - LZ4F_compressionContext_t ctx_c; - bool ctx_c_inited = false; - LZ4F_decompressionContext_t ctx_d; - bool ctx_d_inited = false; + + std::mutex _ctx_c_mutex; + // LZ4F_compressionContext_t is a pointer so no copy here + std::vector _ctx_c_pool; + + std::mutex _ctx_d_mutex; + std::vector _ctx_d_pool; }; LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { @@ -273,18 +456,23 @@ private: class SnappyBlockCompression : public BlockCompressionCodec { public: - static const SnappyBlockCompression* instance() { + static SnappyBlockCompression* instance() { static SnappyBlockCompression s_instance; return &s_instance; } ~SnappyBlockCompression() override {} - Status compress(const Slice& input, Slice* output) const override { - snappy::RawCompress(input.data, input.size, output->data, &output->size); + Status compress(const Slice& input, faststring* output) override { + size_t max_len = max_compressed_len(input.size); + output->resize(max_len); + Slice s(*output); + + snappy::RawCompress(input.data, input.size, s.data, &s.size); + output->resize(s.size); return Status::OK(); } - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { if (!snappy::RawUncompress(input.data, input.size, output->data)) { return Status::InvalidArgument("Fail to do Snappy decompress"); } @@ -293,35 +481,46 @@ public: return Status::OK(); } - Status compress(const std::vector& inputs, Slice* output) const override { + Status compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) override { + auto max_len = max_compressed_len(uncompressed_size); + output->resize(max_len); + SnappySlicesSource source(inputs); - snappy::UncheckedByteArraySink sink(output->data); - output->size = snappy::Compress(&source, &sink); + snappy::UncheckedByteArraySink sink(reinterpret_cast(output->data())); + output->resize(snappy::Compress(&source, &sink)); return Status::OK(); } - size_t max_compressed_len(size_t len) const override { - return snappy::MaxCompressedLength(len); - } + size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } }; class ZlibBlockCompression : public BlockCompressionCodec { public: - static const ZlibBlockCompression* instance() { + static ZlibBlockCompression* instance() { static ZlibBlockCompression s_instance; return &s_instance; } ~ZlibBlockCompression() {} - Status compress(const Slice& input, Slice* output) const override { - auto zres = ::compress((Bytef*)output->data, &output->size, (Bytef*)input.data, input.size); + Status compress(const Slice& input, faststring* output) override { + size_t max_len = max_compressed_len(input.size); + output->resize(max_len); + Slice s(*output); + + auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); if (zres != Z_OK) { return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres)); } + output->resize(s.size); return Status::OK(); } - Status compress(const std::vector& inputs, Slice* output) const override { + Status compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) override { + size_t max_len = max_compressed_len(uncompressed_size); + output->resize(max_len); + z_stream zstrm; zstrm.zalloc = Z_NULL; zstrm.zfree = Z_NULL; @@ -332,8 +531,8 @@ public: zError(zres), zres); } // we assume that output is e - zstrm.next_out = (Bytef*)output->data; - zstrm.avail_out = output->size; + zstrm.next_out = (Bytef*)output->data(); + zstrm.avail_out = output->size(); for (int i = 0; i < inputs.size(); ++i) { if (inputs[i].size == 0) { continue; @@ -349,7 +548,7 @@ public: } } - output->size = zstrm.total_out; + output->resize(zstrm.total_out); zres = deflateEnd(&zstrm); if (zres != Z_OK) { return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", @@ -358,7 +557,7 @@ public: return Status::OK(); } - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); @@ -368,7 +567,7 @@ public: return Status::OK(); } - size_t max_compressed_len(size_t len) const override { + size_t max_compressed_len(size_t len) override { // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block return len + 6 + 5 * ((len >> 14) + 1); } @@ -376,68 +575,82 @@ public: // for ZSTD compression and decompression, with BOTH fast and high compression ratio class ZstdBlockCompression : public BlockCompressionCodec { +private: + struct CContext { + CContext() : ctx(nullptr) {} + ZSTD_CCtx* ctx; + faststring buffer; + }; + struct DContext { + DContext() : ctx(nullptr) {} + ZSTD_DCtx* ctx; + }; + public: - // reenterable initialization for compress/decompress context - inline Status init() override { - if (!ctx_c) { - ctx_c = ZSTD_createCCtx(); - if (!ctx_c) { - return Status::InvalidArgument("Fail to ZSTD_createCCtx"); - } + static ZstdBlockCompression* instance() { + static ZstdBlockCompression s_instance; + return &s_instance; + } + ~ZstdBlockCompression() { + for (auto ctx : _ctx_c_pool) { + _delete_compression_ctx(ctx); } - - if (!ctx_d) { - ctx_d = ZSTD_createDCtx(); - if (!ctx_d) { - return Status::InvalidArgument("Fail to ZSTD_createDCtx"); - } + for (auto ctx : _ctx_d_pool) { + _delete_decompression_ctx(ctx); } - - return Status::OK(); } - ~ZstdBlockCompression() override { - if (ctx_c) ZSTD_freeCCtx(ctx_c); - if (ctx_d) ZSTD_freeDCtx(ctx_d); - } + size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); } - size_t max_compressed_len(size_t len) const override { - if (len > std::numeric_limits::max()) { - return 0; - } - return ZSTD_compressBound(len); - } - - Status compress(const Slice& input, Slice* output) const override { + Status compress(const Slice& input, faststring* output) override { std::vector inputs {input}; - return compress(inputs, output); + return compress(inputs, input.size, output); } // follow ZSTD official example // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c - Status compress(const std::vector& inputs, Slice* output) const override { - if (!ctx_c) return Status::InvalidArgument("compression context NOT initialized"); + Status compress(const std::vector& inputs, size_t uncompressed_size, + faststring* output) override { + CContext* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; - // reset ctx to start new compress session - auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only); - if (ZSTD_isError(ret)) { - return Status::InvalidArgument("ZSTD_CCtx_reset error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + size_t max_len = max_compressed_len(uncompressed_size); + Slice compressed_buf; + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + // reuse context buffer if max_len < MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast(context->buffer.data()); + compressed_buf.size = max_len; } + // set compression level to default 3 - ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT); + auto ret = + ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT); if (ZSTD_isError(ret)) { return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // set checksum flag to 1 - ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1); + ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); if (ZSTD_isError(ret)) { return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } - ZSTD_outBuffer out_buf = {output->data, output->size, 0}; + ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; for (size_t i = 0; i < inputs.size(); i++) { ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; @@ -448,15 +661,17 @@ public: bool finished = false; do { // do compress - auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf, mode); + auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); if (ZSTD_isError(ret)) { + compress_failed = true; return Status::InvalidArgument("ZSTD_compressStream2 error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // ret is ZSTD hint for needed output buffer size if (ret > 0 && out_buf.pos == out_buf.size) { + compress_failed = true; return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); } @@ -465,37 +680,44 @@ public: } // set compressed size for caller - output->size = out_buf.pos; + output->resize(out_buf.pos); + if (max_len < MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), out_buf.pos); + } return Status::OK(); } // follow ZSTD official example // https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c - Status decompress(const Slice& input, Slice* output) const override { - if (!ctx_d) return Status::InvalidArgument("decompression context NOT initialized"); - - // reset ctx to start a new decompress session - auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only); - if (ZSTD_isError(ret)) { - return Status::InvalidArgument("ZSTD_DCtx_reset error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); - } + Status decompress(const Slice& input, Slice* output) override { + DContext* context; + bool compress_failed = false; + RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); + Defer defer {[&] { + if (compress_failed) { + _delete_decompression_ctx(context); + } else { + _release_decompression_ctx(context); + } + }}; ZSTD_inBuffer in_buf = {input.data, input.size, 0}; ZSTD_outBuffer out_buf = {output->data, output->size, 0}; while (in_buf.pos < in_buf.size) { // do decompress - auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf); + auto ret = ZSTD_decompressStream(context->ctx, &out_buf, &in_buf); if (ZSTD_isError(ret)) { + compress_failed = true; return Status::InvalidArgument("ZSTD_decompressStream error: {}", ZSTD_getErrorString(ZSTD_getErrorCode(ret))); } // ret is ZSTD hint for needed output buffer size if (ret > 0 && out_buf.pos == out_buf.size) { + compress_failed = true; return Status::InvalidArgument("ZSTD_decompressStream output buffer full"); } } @@ -507,16 +729,87 @@ public: } private: - // will be reused by compress/decompress - ZSTD_CCtx* ctx_c = nullptr; - ZSTD_DCtx* ctx_d = nullptr; + Status _acquire_compression_ctx(CContext** out) { + if (_ctx_c_pool.empty()) { + CContext* context = new (std::nothrow) CContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new ZSTD CContext"); + } + //typedef LZ4F_cctx* LZ4F_compressionContext_t; + context->ctx = ZSTD_createCCtx(); + if (context->ctx == nullptr) { + return Status::InvalidArgument("Failed to create ZSTD compress ctx"); + } + *out = context; + return Status::OK(); + } + std::lock_guard l(_ctx_c_mutex); + *out = _ctx_c_pool.back(); + _ctx_c_pool.pop_back(); + return Status::OK(); + } + void _release_compression_ctx(CContext* context) { + DCHECK(context); + auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only); + DCHECK(!ZSTD_isError(ret)); + std::lock_guard l(_ctx_c_mutex); + _ctx_c_pool.push_back(context); + } + void _delete_compression_ctx(CContext* context) { + DCHECK(context); + ZSTD_freeCCtx(context->ctx); + delete context; + } + + Status _acquire_decompression_ctx(DContext** out) { + std::lock_guard l(_ctx_d_mutex); + if (_ctx_d_pool.empty()) { + DContext* context = new (std::nothrow) DContext(); + if (context == nullptr) { + return Status::InvalidArgument("failed to new ZSTD DContext"); + } + context->ctx = ZSTD_createDCtx(); + if (context->ctx == nullptr) { + return Status::InvalidArgument("Fail to init ZSTD decompress context"); + } + *out = context; + return Status::OK(); + } + *out = _ctx_d_pool.back(); + _ctx_d_pool.pop_back(); + return Status::OK(); + } + void _release_decompression_ctx(DContext* context) { + DCHECK(context); + // reset ctx to start a new decompress session + auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only); + DCHECK(!ZSTD_isError(ret)); + std::lock_guard l(_ctx_d_mutex); + _ctx_d_pool.push_back(context); + } + void _delete_decompression_ctx(DContext* context) { + DCHECK(context); + ZSTD_freeDCtx(context->ctx); + delete context; + } + +private: + mutable std::mutex _ctx_c_mutex; + mutable std::vector _ctx_c_pool; + + mutable std::mutex _ctx_d_mutex; + mutable std::vector _ctx_d_pool; }; class GzipBlockCompression final : public ZlibBlockCompression { public: + static GzipBlockCompression* instance() { + static GzipBlockCompression s_instance; + return &s_instance; + } ~GzipBlockCompression() override = default; - Status decompress(const Slice& input, Slice* output) const override { + Status decompress(const Slice& input, Slice* output) override { z_stream z_strm = {nullptr}; z_strm.zalloc = Z_NULL; z_strm.zfree = Z_NULL; @@ -548,7 +841,7 @@ public: return Status::OK(); } - size_t max_compressed_len(size_t len) const override { + size_t max_compressed_len(size_t len) override { z_stream zstrm; zstrm.zalloc = Z_NULL; zstrm.zfree = Z_NULL; @@ -586,74 +879,56 @@ private: }; Status get_block_compression_codec(segment_v2::CompressionTypePB type, - std::unique_ptr& codec) { - BlockCompressionCodec* ptr = nullptr; + BlockCompressionCodec** codec) { switch (type) { case segment_v2::CompressionTypePB::NO_COMPRESSION: - codec.reset(nullptr); - return Status::OK(); + *codec = nullptr; + break; case segment_v2::CompressionTypePB::SNAPPY: - ptr = new SnappyBlockCompression(); + *codec = SnappyBlockCompression::instance(); break; case segment_v2::CompressionTypePB::LZ4: - ptr = new Lz4BlockCompression(); + *codec = Lz4BlockCompression::instance(); break; case segment_v2::CompressionTypePB::LZ4F: - ptr = new Lz4fBlockCompression(); + *codec = Lz4fBlockCompression::instance(); break; case segment_v2::CompressionTypePB::ZLIB: - ptr = new ZlibBlockCompression(); + *codec = ZlibBlockCompression::instance(); break; case segment_v2::CompressionTypePB::ZSTD: - ptr = new ZstdBlockCompression(); + *codec = ZstdBlockCompression::instance(); break; default: return Status::NotFound("unknown compression type({})", type); } - if (!ptr) return Status::NotFound("Failed to create compression codec"); - - Status st = ptr->init(); - if (st.ok()) { - codec.reset(ptr); - } else { - delete ptr; - } - - return st; + return Status::OK(); } Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, - std::unique_ptr& codec) { - BlockCompressionCodec* ptr = nullptr; + BlockCompressionCodec** codec) { switch (parquet_codec) { case tparquet::CompressionCodec::UNCOMPRESSED: - codec.reset(nullptr); - return Status::OK(); + *codec = nullptr; + break; case tparquet::CompressionCodec::SNAPPY: - ptr = new SnappyBlockCompression(); + *codec = SnappyBlockCompression::instance(); break; case tparquet::CompressionCodec::LZ4: - ptr = new Lz4BlockCompression(); + *codec = Lz4BlockCompression::instance(); break; case tparquet::CompressionCodec::ZSTD: - ptr = new ZstdBlockCompression(); + *codec = ZstdBlockCompression::instance(); break; case tparquet::CompressionCodec::GZIP: - ptr = new GzipBlockCompression(); + *codec = GzipBlockCompression::instance(); break; default: - return Status::NotFound("unknown compression type({})", tparquet::to_string(parquet_codec)); + return Status::NotFound("unknown compression type({})", parquet_codec); } - Status st = ptr->init(); - if (st.ok()) { - codec.reset(ptr); - } else { - delete ptr; - } - - return st; + return Status::OK(); } } // namespace doris diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h index 9de8eb16ff..c2939452de 100644 --- a/be/src/util/block_compression.h +++ b/be/src/util/block_compression.h @@ -34,6 +34,10 @@ namespace doris { // // NOTICE!! BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads // + +// max compression reuse buffer size +// if max_compress_len is bigger than this, donot use faststring in context +const static int MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE = 1024 * 1024 * 8; class BlockCompressionCodec { public: virtual ~BlockCompressionCodec() {} @@ -44,20 +48,23 @@ public: // output should be preallocated, and its capacity must be large enough // for compressed input, which can be get through max_compressed_len function. // Size of compressed data will be set in output's size. - virtual Status compress(const Slice& input, Slice* output) const = 0; + virtual Status compress(const Slice& input, faststring* output) = 0; // Default implementation will merge input list into a big buffer and call // compress(Slice) to finish compression. If compression type support digesting // slice one by one, it should reimplement this function. - virtual Status compress(const std::vector& input, Slice* output) const; + virtual Status compress(const std::vector& input, size_t uncompressed_size, + faststring* output); // Decompress input data into output, output's capacity should be large enough // for decompressed data. // Size of decompressed data will be set in output's size. - virtual Status decompress(const Slice& input, Slice* output) const = 0; + virtual Status decompress(const Slice& input, Slice* output) = 0; // Returns an upper bound on the max compressed length. - virtual size_t max_compressed_len(size_t len) const = 0; + virtual size_t max_compressed_len(size_t len) = 0; + + virtual bool exceed_max_compress_len(size_t uncompressed_size); }; // Get a BlockCompressionCodec through type. @@ -69,9 +76,9 @@ public: // // Return not OK, if error happens. Status get_block_compression_codec(segment_v2::CompressionTypePB type, - std::unique_ptr& codec); + BlockCompressionCodec** codec); Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, - std::unique_ptr& codec); + BlockCompressionCodec** codec); } // namespace doris diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 7da0f0e525..7b751f134f 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -30,6 +30,7 @@ #include "runtime/tuple_row.h" #include "udf/udf.h" #include "util/block_compression.h" +#include "util/faststring.h" #include "util/simd/bits.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" @@ -76,8 +77,8 @@ Block::Block(const PBlock& pblock) { size_t compressed_size = pblock.column_values().size(); size_t uncompressed_size = 0; if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { - std::unique_ptr codec; - get_block_compression_codec(pblock.compression_type(), codec); + BlockCompressionCodec* codec; + get_block_compression_codec(pblock.compression_type(), &codec); uncompressed_size = pblock.uncompressed_size(); compression_scratch.resize(uncompressed_size); Slice decompressed_slice(compression_scratch); @@ -692,15 +693,8 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee return Status::OK(); } -Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, - segment_v2::CompressionTypePB compression_type, - bool allow_transfer_large_data) const { - std::string compression_scratch; - return serialize(pblock, &compression_scratch, uncompressed_bytes, compressed_bytes, - compression_type, allow_transfer_large_data); -} - -Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, size_t* uncompressed_bytes, +Status Block::serialize(PBlock* pblock, + /*std::string* compressed_buffer,*/ size_t* uncompressed_bytes, size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data) const { CHECK(config::block_data_version <= Block::max_data_version) @@ -720,10 +714,9 @@ Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, size_t* // serialize data values // when data type is HLL, content_uncompressed_size maybe larger than real size. - std::string* column_values = nullptr; + std::string column_values; try { - column_values = pblock->mutable_column_values(); - column_values->resize(content_uncompressed_size); + column_values.resize(content_uncompressed_size); } catch (...) { std::exception_ptr p = std::current_exception(); std::string msg = fmt::format( @@ -732,7 +725,7 @@ Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, size_t* LOG(WARNING) << msg; return Status::BufferAllocFailed(msg); } - char* buf = column_values->data(); + char* buf = column_values.data(); for (const auto& c : *this) { buf = c.type->serialize(*(c.column), buf, config::block_data_version); @@ -745,34 +738,19 @@ Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, size_t* pblock->set_compression_type(compression_type); pblock->set_uncompressed_size(content_uncompressed_size); - std::unique_ptr codec; - RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec)); + BlockCompressionCodec* codec; + RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); - size_t max_compressed_size = codec->max_compressed_len(content_uncompressed_size); - try { - // Try compressing the content to compressed_buffer, - // swap if compressed data is smaller - // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails - compressed_buffer->resize(max_compressed_size); - } catch (...) { - std::exception_ptr p = std::current_exception(); - std::string msg = - fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}", - max_compressed_size, p ? p.__cxa_exception_type()->name() : "null"); - LOG(WARNING) << msg; - return Status::BufferAllocFailed(msg); - } - - Slice compressed_slice(*compressed_buffer); - codec->compress(Slice(column_values->data(), content_uncompressed_size), &compressed_slice); - size_t compressed_size = compressed_slice.size; + faststring buf_compressed; + codec->compress(Slice(column_values.data(), content_uncompressed_size), &buf_compressed); + size_t compressed_size = buf_compressed.size(); if (LIKELY(compressed_size < content_uncompressed_size)) { - compressed_buffer->resize(compressed_size); - column_values->swap(*compressed_buffer); + pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); pblock->set_compressed(true); *compressed_bytes = compressed_size; } else { + pblock->set_column_values(std::move(column_values)); *compressed_bytes = content_uncompressed_size; } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index b9a147bcf7..c50960a74a 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -277,13 +277,6 @@ public: segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; - // serialize block to PBlock - // compressed_buffer reuse to avoid frequent allocation and deallocation, - // NOTE: compressed_buffer's data may be swapped with pblock->mutable_column_values - Status serialize(PBlock* pblock, std::string* compressed_buffer, size_t* uncompressed_bytes, - size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, - bool allow_transfer_large_data = false) const; - // serialize block to PRowbatch void serialize(RowBatch*, const RowDescriptor&); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index c8192f46b2..ed85e93f5c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -36,7 +36,7 @@ Status ColumnChunkReader::init() { size_t chunk_size = _metadata.total_compressed_size; _page_reader = std::make_unique(_stream_reader, start_offset, chunk_size); // get the block compression codec - RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec)); if (_metadata.__isset.dictionary_page_offset) { // seek to the directory page _page_reader->seek_to_page(_metadata.dictionary_page_offset); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 2b4487ff60..2c6b620cce 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -129,7 +129,7 @@ private: cctz::time_zone* _ctz; std::unique_ptr _page_reader = nullptr; - std::unique_ptr _block_compress_codec = nullptr; + BlockCompressionCodec* _block_compress_codec = nullptr; LevelDecoder _rep_level_decoder; LevelDecoder _def_level_decoder; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 03482ed3b6..fc01d8a632 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -95,7 +95,7 @@ Status VDataStreamSender::Channel::send_current_block(bool eos) { return send_local_block(eos); } auto block = _mutable_block->to_block(); - RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block, &_compressed_data_buffer)); + RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block)); block.clear_column_data(); _mutable_block->set_muatable_columns(block.mutate_columns()); RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos)); @@ -472,8 +472,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { RETURN_IF_ERROR(channel->send_local_block(block)); } } else { - RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, &_compressed_data_buffer, - _channels.size())); + RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size())); for (auto channel : _channels) { if (channel->is_local()) { RETURN_IF_ERROR(channel->send_local_block(block)); @@ -491,8 +490,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { if (current_channel->is_local()) { RETURN_IF_ERROR(current_channel->send_local_block(block)); } else { - RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block(), - &_compressed_data_buffer)); + RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block())); RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block())); current_channel->ch_roll_pb_block(); } @@ -582,18 +580,12 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { } Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_receivers) { - return serialize_block(src, dest, &_compressed_data_buffer, num_receivers); -} - -Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, std::string* compressed_buffer, - int num_receivers) { { SCOPED_TIMER(_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(dest, compressed_buffer, &uncompressed_bytes, - &compressed_bytes, _compression_type, - _transfer_large_data_by_brpc)); + RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, + _compression_type, _transfer_large_data_by_brpc)); COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); COUNTER_UPDATE(_compress_timer, src->get_compress_time()); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 16a55fa9af..e537749dd7 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -75,8 +75,6 @@ public: RuntimeState* state() { return _state; } Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1); - Status serialize_block(Block* src, PBlock* dest, std::string* compressed_buffer, - int num_receivers = 1); protected: void _roll_pb_block(); @@ -153,8 +151,6 @@ protected: // User can change this config at runtime, avoid it being modified during query or loading process. bool _transfer_large_data_by_brpc = false; - std::string _compressed_data_buffer; - segment_v2::CompressionTypePB _compression_type; }; @@ -311,7 +307,6 @@ private: PBlock* _ch_cur_pb_block = nullptr; PBlock _ch_pb_block1; PBlock _ch_pb_block2; - std::string _compressed_data_buffer; bool _enable_local_exchange = false; }; diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp index b1ca291f78..867ee65d20 100644 --- a/be/test/util/block_compression_test.cpp +++ b/be/test/util/block_compression_test.cpp @@ -21,6 +21,8 @@ #include +#include "util/faststring.h" + namespace doris { class BlockCompressionTest : public testing::Test { public: @@ -42,21 +44,19 @@ static std::string generate_str(size_t len) { } void test_single_slice(segment_v2::CompressionTypePB type) { - std::unique_ptr codec; - auto st = get_block_compression_codec(type, codec); + BlockCompressionCodec* codec; + auto st = get_block_compression_codec(type, &codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; for (auto size : test_sizes) { auto orig = generate_str(size); - size_t max_len = codec->max_compressed_len(size); - std::string compressed; - compressed.resize(max_len); + faststring compressed_str; { - Slice compressed_slice(compressed); - st = codec->compress(orig, &compressed_slice); + st = codec->compress(orig, &compressed_str); EXPECT_TRUE(st.ok()); + Slice compressed_slice(compressed_str); std::string uncompressed; uncompressed.resize(size); { @@ -86,13 +86,6 @@ void test_single_slice(segment_v2::CompressionTypePB type) { compressed_slice.size += 1; } } - // buffer not enough for compress - if (type != segment_v2::CompressionTypePB::SNAPPY && size > 0) { - Slice compressed_slice(compressed); - compressed_slice.size = 1; - st = codec->compress(orig, &compressed_slice); - EXPECT_FALSE(st.ok()); - } } } @@ -105,8 +98,8 @@ TEST_F(BlockCompressionTest, single) { } void test_multi_slices(segment_v2::CompressionTypePB type) { - std::unique_ptr codec; - auto st = get_block_compression_codec(type, codec); + BlockCompressionCodec* codec; + auto st = get_block_compression_codec(type, &codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; @@ -122,15 +115,12 @@ void test_multi_slices(segment_v2::CompressionTypePB type) { } size_t total_size = orig.size(); - size_t max_len = codec->max_compressed_len(total_size); - - std::string compressed; - compressed.resize(max_len); + faststring compressed; { - Slice compressed_slice(compressed); - st = codec->compress(orig_slices, &compressed_slice); + st = codec->compress(orig_slices, total_size, &compressed); EXPECT_TRUE(st.ok()); + Slice compressed_slice(compressed); std::string uncompressed; uncompressed.resize(total_size); // normal case @@ -142,14 +132,6 @@ void test_multi_slices(segment_v2::CompressionTypePB type) { EXPECT_STREQ(orig.c_str(), uncompressed.c_str()); } } - - // buffer not enough failed - if (type != segment_v2::CompressionTypePB::SNAPPY) { - Slice compressed_slice(compressed); - compressed_slice.size = 10; - st = codec->compress(orig, &compressed_slice); - EXPECT_FALSE(st.ok()); - } } TEST_F(BlockCompressionTest, multi) {