[fix](decompress)(review) context leaked in failure path (#33622) (#35364)

* [fix](decompress)(review) context leaked in failure path

* [fix](decompress)(review) context leaked in failure path review fix

Co-authored-by: Vallish Pai <vallishpai@gmail.com>
This commit is contained in:
Yongqiang YANG
2024-05-24 17:40:13 +08:00
committed by GitHub
parent 88e2753e40
commit 41f29cf4cd

View File

@ -44,6 +44,7 @@
#include <ostream>
#include "common/config.h"
#include "common/factory_creator.h"
#include "exec/decompressor.h"
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"
@ -90,10 +91,18 @@ bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
class Lz4BlockCompression : public BlockCompressionCodec {
private:
struct Context {
class Context {
ENABLE_FACTORY_CREATOR(Context);
public:
Context() : ctx(nullptr) {}
LZ4_stream_t* ctx;
faststring buffer;
~Context() {
if (ctx) {
LZ4_freeStream(ctx);
}
}
};
public:
@ -101,11 +110,7 @@ public:
static Lz4BlockCompression s_instance;
return &s_instance;
}
~Lz4BlockCompression() {
for (auto ctx : _ctx_pool) {
_delete_compression_ctx(ctx);
}
}
~Lz4BlockCompression() { _ctx_pool.clear(); }
Status compress(const Slice& input, faststring* output) override {
if (input.size > INT_MAX) {
@ -115,14 +120,12 @@ public:
input.size);
}
Context* context;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
std::unique_ptr<Context> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
if (compress_failed) {
_delete_compression_ctx(context);
} else {
_release_compression_ctx(context);
if (!compress_failed) {
_release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
@ -168,40 +171,34 @@ public:
private:
// reuse LZ4 compress stream
Status _acquire_compression_ctx(Context** out) {
Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
std::lock_guard<std::mutex> l(_ctx_mutex);
if (_ctx_pool.empty()) {
Context* context = new (std::nothrow) Context();
if (context == nullptr) {
std::unique_ptr<Context> localCtx = Context::create_unique();
if (localCtx.get() == nullptr) {
return Status::InvalidArgument("new LZ4 context error");
}
context->ctx = LZ4_createStream();
if (context->ctx == nullptr) {
delete context;
localCtx->ctx = LZ4_createStream();
if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("LZ4_createStream error");
}
*out = context;
out = std::move(localCtx);
return Status::OK();
}
*out = _ctx_pool.back();
out = std::move(_ctx_pool.back());
_ctx_pool.pop_back();
return Status::OK();
}
void _release_compression_ctx(Context* context) {
void _release_compression_ctx(std::unique_ptr<Context> context) {
DCHECK(context);
LZ4_resetStream(context->ctx);
std::lock_guard<std::mutex> l(_ctx_mutex);
_ctx_pool.push_back(context);
}
void _delete_compression_ctx(Context* context) {
DCHECK(context);
LZ4_freeStream(context->ctx);
delete context;
_ctx_pool.push_back(std::move(context));
}
private:
mutable std::mutex _ctx_mutex;
mutable std::vector<Context*> _ctx_pool;
mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
static const int32_t ACCELARATION = 1;
};
@ -233,14 +230,30 @@ private:
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
private:
struct CContext {
class CContext {
ENABLE_FACTORY_CREATOR(CContext);
public:
CContext() : ctx(nullptr) {}
LZ4F_compressionContext_t ctx;
faststring buffer;
~CContext() {
if (ctx) {
LZ4F_freeCompressionContext(ctx);
}
}
};
struct DContext {
class DContext {
ENABLE_FACTORY_CREATOR(DContext);
public:
DContext() : ctx(nullptr) {}
LZ4F_decompressionContext_t ctx;
~DContext() {
if (ctx) {
LZ4F_freeDecompressionContext(ctx);
}
}
};
public:
@ -249,12 +262,8 @@ public:
return &s_instance;
}
~Lz4fBlockCompression() {
for (auto ctx : _ctx_c_pool) {
_delete_compression_ctx(ctx);
}
for (auto ctx : _ctx_d_pool) {
_delete_decompression_ctx(ctx);
}
_ctx_c_pool.clear();
_ctx_d_pool.clear();
}
Status compress(const Slice& input, faststring* output) override {
@ -279,14 +288,12 @@ public:
private:
Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) {
CContext* context = nullptr;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
if (compress_failed) {
_delete_compression_ctx(context);
} else {
_release_compression_ctx(context);
if (!compress_failed) {
_release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
@ -340,13 +347,11 @@ private:
Status _decompress(const Slice& input, Slice* output) {
bool decompress_failed = false;
DContext* context = nullptr;
RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
std::unique_ptr<DContext> context;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Defer defer {[&] {
if (decompress_failed) {
_delete_decompression_ctx(context);
} else {
_release_decompression_ctx(context);
if (!decompress_failed) {
_release_decompression_ctx(std::move(context));
}
}};
size_t input_size = input.size;
@ -373,66 +378,56 @@ private:
private:
// acquire a compression ctx from pool, release while finish compress,
// delete if compression failed
Status _acquire_compression_ctx(CContext** out) {
Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
std::lock_guard<std::mutex> l(_ctx_c_mutex);
if (_ctx_c_pool.empty()) {
CContext* context = new (std::nothrow) CContext();
if (context == nullptr) {
std::unique_ptr<CContext> localCtx = CContext::create_unique();
if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new LZ4F CContext");
}
auto res = LZ4F_createCompressionContext(&context->ctx, LZ4F_VERSION);
auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
if (LZ4F_isError(res) != 0) {
return Status::InvalidArgument(strings::Substitute(
"LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
}
*out = context;
out = std::move(localCtx);
return Status::OK();
}
*out = _ctx_c_pool.back();
out = std::move(_ctx_c_pool.back());
_ctx_c_pool.pop_back();
return Status::OK();
}
void _release_compression_ctx(CContext* context) {
void _release_compression_ctx(std::unique_ptr<CContext> context) {
DCHECK(context);
std::lock_guard<std::mutex> l(_ctx_c_mutex);
_ctx_c_pool.push_back(context);
}
void _delete_compression_ctx(CContext* context) {
DCHECK(context);
LZ4F_freeCompressionContext(context->ctx);
delete context;
_ctx_c_pool.push_back(std::move(context));
}
Status _acquire_decompression_ctx(DContext** out) {
Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
std::lock_guard<std::mutex> l(_ctx_d_mutex);
if (_ctx_d_pool.empty()) {
DContext* context = new (std::nothrow) DContext();
if (context == nullptr) {
std::unique_ptr<DContext> localCtx = DContext::create_unique();
if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new LZ4F DContext");
}
auto res = LZ4F_createDecompressionContext(&context->ctx, LZ4F_VERSION);
auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
if (LZ4F_isError(res) != 0) {
return Status::InvalidArgument(strings::Substitute(
"LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
}
*out = context;
out = std::move(localCtx);
return Status::OK();
}
*out = _ctx_d_pool.back();
out = std::move(_ctx_d_pool.back());
_ctx_d_pool.pop_back();
return Status::OK();
}
void _release_decompression_ctx(DContext* context) {
void _release_decompression_ctx(std::unique_ptr<DContext> context) {
DCHECK(context);
// reset decompression context to avoid ERROR_maxBlockSize_invalid
LZ4F_resetDecompressionContext(context->ctx);
std::lock_guard<std::mutex> l(_ctx_d_mutex);
_ctx_d_pool.push_back(context);
}
void _delete_decompression_ctx(DContext* context) {
DCHECK(context);
LZ4F_freeDecompressionContext(context->ctx);
delete context;
_ctx_d_pool.push_back(std::move(context));
}
private:
@ -440,10 +435,10 @@ private:
std::mutex _ctx_c_mutex;
// LZ4F_compressionContext_t is a pointer so no copy here
std::vector<CContext*> _ctx_c_pool;
std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
std::mutex _ctx_d_mutex;
std::vector<DContext*> _ctx_d_pool;
std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@ -456,10 +451,18 @@ LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
class Lz4HCBlockCompression : public BlockCompressionCodec {
private:
struct Context {
class Context {
ENABLE_FACTORY_CREATOR(Context);
public:
Context() : ctx(nullptr) {}
LZ4_streamHC_t* ctx;
faststring buffer;
~Context() {
if (ctx) {
LZ4_freeStreamHC(ctx);
}
}
};
public:
@ -467,21 +470,15 @@ public:
static Lz4HCBlockCompression s_instance;
return &s_instance;
}
~Lz4HCBlockCompression() {
for (auto ctx : _ctx_pool) {
_delete_compression_ctx(ctx);
}
}
~Lz4HCBlockCompression() { _ctx_pool.clear(); }
Status compress(const Slice& input, faststring* output) override {
Context* context;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
std::unique_ptr<Context> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
if (compress_failed) {
_delete_compression_ctx(context);
} else {
_release_compression_ctx(context);
if (!compress_failed) {
_release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
@ -525,41 +522,35 @@ public:
size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
private:
Status _acquire_compression_ctx(Context** out) {
Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
std::lock_guard<std::mutex> l(_ctx_mutex);
if (_ctx_pool.empty()) {
Context* context = new (std::nothrow) Context();
if (context == nullptr) {
std::unique_ptr<Context> localCtx = Context::create_unique();
if (localCtx.get() == nullptr) {
return Status::InvalidArgument("new LZ4HC context error");
}
context->ctx = LZ4_createStreamHC();
if (context->ctx == nullptr) {
delete context;
localCtx->ctx = LZ4_createStreamHC();
if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("LZ4_createStreamHC error");
}
*out = context;
out = std::move(localCtx);
return Status::OK();
}
*out = _ctx_pool.back();
out = std::move(_ctx_pool.back());
_ctx_pool.pop_back();
return Status::OK();
}
void _release_compression_ctx(Context* context) {
void _release_compression_ctx(std::unique_ptr<Context> context) {
DCHECK(context);
LZ4_resetStreamHC_fast(context->ctx, _compression_level);
std::lock_guard<std::mutex> l(_ctx_mutex);
_ctx_pool.push_back(context);
}
void _delete_compression_ctx(Context* context) {
DCHECK(context);
LZ4_freeStreamHC(context->ctx);
delete context;
_ctx_pool.push_back(std::move(context));
}
private:
int64_t _compression_level = config::LZ4_HC_compression_level;
mutable std::mutex _ctx_mutex;
mutable std::vector<Context*> _ctx_pool;
mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
};
class SnappySlicesSource : public snappy::Source {
@ -751,14 +742,30 @@ public:
// for ZSTD compression and decompression, with BOTH fast and high compression ratio
class ZstdBlockCompression : public BlockCompressionCodec {
private:
struct CContext {
class CContext {
ENABLE_FACTORY_CREATOR(CContext);
public:
CContext() : ctx(nullptr) {}
ZSTD_CCtx* ctx;
faststring buffer;
~CContext() {
if (ctx) {
ZSTD_freeCCtx(ctx);
}
}
};
struct DContext {
class DContext {
ENABLE_FACTORY_CREATOR(DContext);
public:
DContext() : ctx(nullptr) {}
ZSTD_DCtx* ctx;
~DContext() {
if (ctx) {
ZSTD_freeDCtx(ctx);
}
}
};
public:
@ -767,12 +774,8 @@ public:
return &s_instance;
}
~ZstdBlockCompression() {
for (auto ctx : _ctx_c_pool) {
_delete_compression_ctx(ctx);
}
for (auto ctx : _ctx_d_pool) {
_delete_decompression_ctx(ctx);
}
_ctx_c_pool.clear();
_ctx_d_pool.clear();
}
size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }
@ -786,14 +789,12 @@ public:
// https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
CContext* context;
RETURN_IF_ERROR(_acquire_compression_ctx(&context));
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
if (compress_failed) {
_delete_compression_ctx(context);
} else {
_release_compression_ctx(context);
if (!compress_failed) {
_release_compression_ctx(std::move(context));
}
}};
@ -864,14 +865,12 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
DContext* context;
std::unique_ptr<DContext> context;
bool decompress_failed = false;
RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Defer defer {[&] {
if (decompress_failed) {
_delete_decompression_ctx(context);
} else {
_release_decompression_ctx(context);
if (!decompress_failed) {
_release_decompression_ctx(std::move(context));
}
}};
@ -890,76 +889,66 @@ public:
}
private:
Status _acquire_compression_ctx(CContext** out) {
Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
std::lock_guard<std::mutex> l(_ctx_c_mutex);
if (_ctx_c_pool.empty()) {
CContext* context = new (std::nothrow) CContext();
if (context == nullptr) {
std::unique_ptr<CContext> localCtx = CContext::create_unique();
if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new ZSTD CContext");
}
//typedef LZ4F_cctx* LZ4F_compressionContext_t;
context->ctx = ZSTD_createCCtx();
if (context->ctx == nullptr) {
localCtx->ctx = ZSTD_createCCtx();
if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("Failed to create ZSTD compress ctx");
}
*out = context;
out = std::move(localCtx);
return Status::OK();
}
*out = _ctx_c_pool.back();
out = std::move(_ctx_c_pool.back());
_ctx_c_pool.pop_back();
return Status::OK();
}
void _release_compression_ctx(CContext* context) {
void _release_compression_ctx(std::unique_ptr<CContext> context) {
DCHECK(context);
auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
DCHECK(!ZSTD_isError(ret));
std::lock_guard<std::mutex> l(_ctx_c_mutex);
_ctx_c_pool.push_back(context);
}
void _delete_compression_ctx(CContext* context) {
DCHECK(context);
ZSTD_freeCCtx(context->ctx);
delete context;
_ctx_c_pool.push_back(std::move(context));
}
Status _acquire_decompression_ctx(DContext** out) {
Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
std::lock_guard<std::mutex> l(_ctx_d_mutex);
if (_ctx_d_pool.empty()) {
DContext* context = new (std::nothrow) DContext();
if (context == nullptr) {
std::unique_ptr<DContext> localCtx = DContext::create_unique();
if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new ZSTD DContext");
}
context->ctx = ZSTD_createDCtx();
if (context->ctx == nullptr) {
localCtx->ctx = ZSTD_createDCtx();
if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("Fail to init ZSTD decompress context");
}
*out = context;
out = std::move(localCtx);
return Status::OK();
}
*out = _ctx_d_pool.back();
out = std::move(_ctx_d_pool.back());
_ctx_d_pool.pop_back();
return Status::OK();
}
void _release_decompression_ctx(DContext* context) {
void _release_decompression_ctx(std::unique_ptr<DContext> context) {
DCHECK(context);
// reset ctx to start a new decompress session
auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
DCHECK(!ZSTD_isError(ret));
std::lock_guard<std::mutex> l(_ctx_d_mutex);
_ctx_d_pool.push_back(context);
}
void _delete_decompression_ctx(DContext* context) {
DCHECK(context);
ZSTD_freeDCtx(context->ctx);
delete context;
_ctx_d_pool.push_back(std::move(context));
}
private:
mutable std::mutex _ctx_c_mutex;
mutable std::vector<CContext*> _ctx_c_pool;
mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
mutable std::mutex _ctx_d_mutex;
mutable std::vector<DContext*> _ctx_d_pool;
mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
class GzipBlockCompression : public ZlibBlockCompression {