[improvement](compression) support LZ4_HC algorithm and parse LZ4_RAW (#22165)

This commit is contained in:
HHoflittlefish777
2023-07-26 18:23:39 +08:00
committed by GitHub
parent 4e57f45d8e
commit 9e16c69925
5 changed files with 125 additions and 0 deletions

View File

@ -21,6 +21,8 @@
#include <algorithm>
#include <cctype>
// IWYU pragma: no_include <bthread/errno.h>
#include <lz4/lz4hc.h>
#include <cerrno> // IWYU pragma: keep
#include <cstdlib>
#include <cstring>
@ -1044,6 +1046,9 @@ DEFINE_mInt64(auto_inc_low_water_level_mark_size_ratio, "3");
// number of threads that fetch auto-inc ranges from FE
DEFINE_mInt64(auto_inc_fetch_thread_num, "3");
// level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT
DEFINE_mInt64(LZ4_HC_compression_level, "9");
#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");

View File

@ -1075,6 +1075,9 @@ DECLARE_mInt64(auto_inc_low_water_level_mark_size_ratio);
// number of threads that fetch auto-inc ranges from FE
DECLARE_mInt64(auto_inc_fetch_thread_num);
// level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT
DECLARE_mInt64(LZ4_HC_compression_level);
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);

View File

@ -23,6 +23,7 @@
#include <limits.h>
#include <lz4/lz4.h>
#include <lz4/lz4frame.h>
#include <lz4/lz4hc.h>
#include <snappy/snappy-sinksource.h>
#include <snappy/snappy.h>
#include <stdint.h>
@ -37,6 +38,7 @@
#include <new>
#include <ostream>
#include "common/config.h"
#include "gutil/strings/substitute.h"
#include "util/defer_op.h"
#include "util/faststring.h"
@ -405,6 +407,114 @@ LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
0u,
{0u, 0u, 0u}};
class Lz4HCBlockCompression : public BlockCompressionCodec {
private:
struct Context {
Context() : ctx(nullptr) {}
LZ4_streamHC_t* ctx;
faststring buffer;
};
public:
static Lz4HCBlockCompression* instance() {
static Lz4HCBlockCompression s_instance;
return &s_instance;
}
~Lz4HCBlockCompression() {
for (auto ctx : _ctx_pool) {
_delete_compression_ctx(ctx);
}
}
Status compress(const Slice& input, faststring* output) override {
Context* context;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
bool compress_failed = false;
Defer defer {[&] {
if (compress_failed) {
_delete_compression_ctx(context);
} else {
_release_compression_ctx(context);
}
}};
Slice compressed_buf;
size_t max_len = max_compressed_len(input.size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len < MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer.resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
compressed_buf.size = max_len;
}
size_t compressed_len = LZ4_compress_HC_continue(
context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
if (compressed_len == 0) {
compress_failed = true;
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
compressed_buf.size);
}
output->resize(compressed_len);
if (max_len < MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
}
return Status::OK();
}
Status decompress(const Slice& input, Slice* output) override {
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size, output->size);
if (decompressed_len < 0) {
return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
}
size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
private:
Status _acquire_compression_ctx(Context** out) {
std::lock_guard<std::mutex> l(_ctx_mutex);
if (_ctx_pool.empty()) {
Context* context = new (std::nothrow) Context();
if (context == nullptr) {
return Status::InvalidArgument("new LZ4HC context error");
}
context->ctx = LZ4_createStreamHC();
if (context->ctx == nullptr) {
delete context;
return Status::InvalidArgument("LZ4_createStreamHC error");
}
*out = context;
return Status::OK();
}
*out = _ctx_pool.back();
_ctx_pool.pop_back();
return Status::OK();
}
void _release_compression_ctx(Context* context) {
DCHECK(context);
LZ4_resetStreamHC_fast(context->ctx, _compression_level);
std::lock_guard<std::mutex> l(_ctx_mutex);
_ctx_pool.push_back(context);
}
void _delete_compression_ctx(Context* context) {
DCHECK(context);
LZ4_freeStreamHC(context->ctx);
delete context;
}
private:
int64_t _compression_level = config::LZ4_HC_compression_level;
mutable std::mutex _ctx_mutex;
mutable std::vector<Context*> _ctx_pool;
};
class SnappySlicesSource : public snappy::Source {
public:
SnappySlicesSource(const std::vector<Slice>& slices)
@ -911,6 +1021,9 @@ Status get_block_compression_codec(segment_v2::CompressionTypePB type,
case segment_v2::CompressionTypePB::LZ4F:
*codec = Lz4fBlockCompression::instance();
break;
case segment_v2::CompressionTypePB::LZ4HC:
*codec = Lz4HCBlockCompression::instance();
break;
case segment_v2::CompressionTypePB::ZLIB:
*codec = ZlibBlockCompression::instance();
break;
@ -933,6 +1046,7 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
case tparquet::CompressionCodec::SNAPPY:
*codec = SnappyBlockCompression::instance();
break;
case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
case tparquet::CompressionCodec::LZ4:
*codec = Lz4BlockCompression::instance();
break;

View File

@ -98,6 +98,7 @@ TEST_F(BlockCompressionTest, single) {
test_single_slice(segment_v2::CompressionTypePB::ZLIB);
test_single_slice(segment_v2::CompressionTypePB::LZ4);
test_single_slice(segment_v2::CompressionTypePB::LZ4F);
test_single_slice(segment_v2::CompressionTypePB::LZ4HC);
test_single_slice(segment_v2::CompressionTypePB::ZSTD);
}
@ -143,6 +144,7 @@ TEST_F(BlockCompressionTest, multi) {
test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
test_multi_slices(segment_v2::CompressionTypePB::LZ4);
test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
test_multi_slices(segment_v2::CompressionTypePB::LZ4HC);
test_multi_slices(segment_v2::CompressionTypePB::ZSTD);
}

View File

@ -51,6 +51,7 @@ enum CompressionTypePB {
LZ4F = 5;
ZLIB = 6;
ZSTD = 7;
LZ4HC = 8;
}
enum PageTypePB {