From 9e16c69925a4e1f49f2ac4169a717892dcefbf0d Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Wed, 26 Jul 2023 18:23:39 +0800 Subject: [PATCH] [improvement](compression) support LZ4_HC algorithm and parse LZ4_RAW (#22165) --- be/src/common/config.cpp | 5 ++ be/src/common/config.h | 3 + be/src/util/block_compression.cpp | 114 ++++++++++++++++++++++++ be/test/util/block_compression_test.cpp | 2 + gensrc/proto/segment_v2.proto | 1 + 5 files changed, 125 insertions(+) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index cb7f0f3698..7bea40cd92 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -21,6 +21,8 @@ #include #include // IWYU pragma: no_include +#include + #include // IWYU pragma: keep #include #include @@ -1044,6 +1046,9 @@ DEFINE_mInt64(auto_inc_low_water_level_mark_size_ratio, "3"); // number of threads that fetch auto-inc ranges from FE DEFINE_mInt64(auto_inc_fetch_thread_num, "3"); +// level of compression when using LZ4_HC, whose default value is LZ4HC_CLEVEL_DEFAULT +DEFINE_mInt64(LZ4_HC_compression_level, "9"); + #ifdef BE_TEST // test s3 DEFINE_String(test_s3_resource, "resource"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 63cb352618..80eed68775 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1075,6 +1075,9 @@ DECLARE_mInt64(auto_inc_low_water_level_mark_size_ratio); // number of threads that fetch auto-inc ranges from FE DECLARE_mInt64(auto_inc_fetch_thread_num); +// level of compression when using LZ4_HC, whose default value is LZ4HC_CLEVEL_DEFAULT +DECLARE_mInt64(LZ4_HC_compression_level); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 64aa3e83fd..fb4c963c11 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -23,6 +23,7 @@ #include #include #include +#include 
#include #include #include @@ -37,6 +38,7 @@ #include #include +#include "common/config.h" #include "gutil/strings/substitute.h" #include "util/defer_op.h" #include "util/faststring.h" @@ -405,6 +407,114 @@ LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { 0u, {0u, 0u, 0u}}; +class Lz4HCBlockCompression : public BlockCompressionCodec { +private: + struct Context { + Context() : ctx(nullptr) {} + LZ4_streamHC_t* ctx; + faststring buffer; + }; + +public: + static Lz4HCBlockCompression* instance() { + static Lz4HCBlockCompression s_instance; + return &s_instance; + } + ~Lz4HCBlockCompression() { + for (auto ctx : _ctx_pool) { + _delete_compression_ctx(ctx); + } + } + + Status compress(const Slice& input, faststring* output) override { + Context* context; + RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + bool compress_failed = false; + Defer defer {[&] { + if (compress_failed) { + _delete_compression_ctx(context); + } else { + _release_compression_ctx(context); + } + }}; + Slice compressed_buf; + size_t max_len = max_compressed_len(input.size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE + context->buffer.resize(max_len); + compressed_buf.data = reinterpret_cast(context->buffer.data()); + compressed_buf.size = max_len; + } + + size_t compressed_len = LZ4_compress_HC_continue( + context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size); + if (compressed_len == 0) { + compress_failed = true; + return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", + compressed_buf.size); + } + output->resize(compressed_len); + if (max_len < MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), compressed_len); + } + return Status::OK(); + } 
+ + Status decompress(const Slice& input, Slice* output) override { + auto decompressed_len = + LZ4_decompress_safe(input.data, output->data, input.size, output->size); + if (decompressed_len < 0) { + return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); + } + output->size = decompressed_len; + return Status::OK(); + } + + size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } + +private: + Status _acquire_compression_ctx(Context** out) { + std::lock_guard l(_ctx_mutex); + if (_ctx_pool.empty()) { + Context* context = new (std::nothrow) Context(); + if (context == nullptr) { + return Status::InvalidArgument("new LZ4HC context error"); + } + context->ctx = LZ4_createStreamHC(); + if (context->ctx == nullptr) { + delete context; + return Status::InvalidArgument("LZ4_createStreamHC error"); + } + *out = context; + return Status::OK(); + } + *out = _ctx_pool.back(); + _ctx_pool.pop_back(); + return Status::OK(); + } + void _release_compression_ctx(Context* context) { + DCHECK(context); + LZ4_resetStreamHC_fast(context->ctx, _compression_level); + std::lock_guard l(_ctx_mutex); + _ctx_pool.push_back(context); + } + void _delete_compression_ctx(Context* context) { + DCHECK(context); + LZ4_freeStreamHC(context->ctx); + delete context; + } + +private: + int64_t _compression_level = config::LZ4_HC_compression_level; + mutable std::mutex _ctx_mutex; + mutable std::vector _ctx_pool; +}; + class SnappySlicesSource : public snappy::Source { public: SnappySlicesSource(const std::vector& slices) @@ -911,6 +1021,9 @@ Status get_block_compression_codec(segment_v2::CompressionTypePB type, case segment_v2::CompressionTypePB::LZ4F: *codec = Lz4fBlockCompression::instance(); break; + case segment_v2::CompressionTypePB::LZ4HC: + *codec = Lz4HCBlockCompression::instance(); + break; case segment_v2::CompressionTypePB::ZLIB: *codec = ZlibBlockCompression::instance(); break; @@ -933,6 +1046,7 @@ Status 
get_block_compression_codec(tparquet::CompressionCodec::type parquet_code case tparquet::CompressionCodec::SNAPPY: *codec = SnappyBlockCompression::instance(); break; + case tparquet::CompressionCodec::LZ4_RAW: // we can use the LZ4 compression algorithm to parse LZ4_RAW case tparquet::CompressionCodec::LZ4: *codec = Lz4BlockCompression::instance(); break; diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp index 80328b2d38..f430a0274e 100644 --- a/be/test/util/block_compression_test.cpp +++ b/be/test/util/block_compression_test.cpp @@ -98,6 +98,7 @@ TEST_F(BlockCompressionTest, single) { test_single_slice(segment_v2::CompressionTypePB::ZLIB); test_single_slice(segment_v2::CompressionTypePB::LZ4); test_single_slice(segment_v2::CompressionTypePB::LZ4F); + test_single_slice(segment_v2::CompressionTypePB::LZ4HC); test_single_slice(segment_v2::CompressionTypePB::ZSTD); } @@ -143,6 +144,7 @@ TEST_F(BlockCompressionTest, multi) { test_multi_slices(segment_v2::CompressionTypePB::ZLIB); test_multi_slices(segment_v2::CompressionTypePB::LZ4); test_multi_slices(segment_v2::CompressionTypePB::LZ4F); + test_multi_slices(segment_v2::CompressionTypePB::LZ4HC); test_multi_slices(segment_v2::CompressionTypePB::ZSTD); } diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index d124433b65..9bf62846ec 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -51,6 +51,7 @@ enum CompressionTypePB { LZ4F = 5; ZLIB = 6; ZSTD = 7; + LZ4HC = 8; } enum PageTypePB {