From 41f29cf4cdbf64c1cc14892ebb88d20c2dca5c0b Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Fri, 24 May 2024 17:40:13 +0800 Subject: [PATCH] [fix](decompress)(review) context leaked in failure path (#33622) (#35364) * [fix](decompress)(review) context leaked in failure path * [fix](decompress)(review) context leaked in failure path review fix Co-authored-by: Vallish Pai --- be/src/util/block_compression.cpp | 297 ++++++++++++++---------------- 1 file changed, 143 insertions(+), 154 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index b5e800a7d0..332a581f3e 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -44,6 +44,7 @@ #include #include "common/config.h" +#include "common/factory_creator.h" #include "exec/decompressor.h" #include "gutil/endian.h" #include "gutil/strings/substitute.h" @@ -90,10 +91,18 @@ bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { class Lz4BlockCompression : public BlockCompressionCodec { private: - struct Context { + class Context { + ENABLE_FACTORY_CREATOR(Context); + + public: Context() : ctx(nullptr) {} LZ4_stream_t* ctx; faststring buffer; + ~Context() { + if (ctx) { + LZ4_freeStream(ctx); + } + } }; public: @@ -101,11 +110,7 @@ public: static Lz4BlockCompression s_instance; return &s_instance; } - ~Lz4BlockCompression() { - for (auto ctx : _ctx_pool) { - _delete_compression_ctx(ctx); - } - } + ~Lz4BlockCompression() { _ctx_pool.clear(); } Status compress(const Slice& input, faststring* output) override { if (input.size > INT_MAX) { @@ -115,14 +120,12 @@ public: input.size); } - Context* context; - RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + std::unique_ptr context; + RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; Defer defer {[&] { - if (compress_failed) { - _delete_compression_ctx(context); - } else { - _release_compression_ctx(context); + if (!compress_failed) { + _release_compression_ctx(std::move(context)); } }}; Slice compressed_buf; @@ -168,40 +171,34 @@ public: private: // reuse LZ4 compress stream - Status _acquire_compression_ctx(Context** out) { + Status _acquire_compression_ctx(std::unique_ptr& out) { std::lock_guard l(_ctx_mutex); if (_ctx_pool.empty()) { - Context* context = new (std::nothrow) Context(); - if (context == nullptr) { + std::unique_ptr localCtx = Context::create_unique(); + if (localCtx.get() == nullptr) { return Status::InvalidArgument("new LZ4 context error"); } - context->ctx = LZ4_createStream(); - if (context->ctx == nullptr) { - delete context; + localCtx->ctx = LZ4_createStream(); + if (localCtx->ctx == nullptr) { return Status::InvalidArgument("LZ4_createStream error"); } - *out = context; + out = std::move(localCtx); return Status::OK(); } - *out = _ctx_pool.back(); + out = std::move(_ctx_pool.back()); _ctx_pool.pop_back(); return Status::OK(); } - void _release_compression_ctx(Context* context) { + void _release_compression_ctx(std::unique_ptr context) { DCHECK(context); LZ4_resetStream(context->ctx); std::lock_guard l(_ctx_mutex); - _ctx_pool.push_back(context); - } - void _delete_compression_ctx(Context* context) { - DCHECK(context); - LZ4_freeStream(context->ctx); - delete context; + _ctx_pool.push_back(std::move(context)); } private: mutable std::mutex _ctx_mutex; - mutable std::vector _ctx_pool; + mutable std::vector> _ctx_pool; static const int32_t ACCELARATION = 1; }; @@ -233,14 +230,30 @@ private: // Used for LZ4 frame format, decompress speed is two times faster than LZ4. class Lz4fBlockCompression : public BlockCompressionCodec { private: - struct CContext { + class CContext { + ENABLE_FACTORY_CREATOR(CContext); + + public: CContext() : ctx(nullptr) {} LZ4F_compressionContext_t ctx; faststring buffer; + ~CContext() { + if (ctx) { + LZ4F_freeCompressionContext(ctx); + } + } }; - struct DContext { + class DContext { + ENABLE_FACTORY_CREATOR(DContext); + + public: DContext() : ctx(nullptr) {} LZ4F_decompressionContext_t ctx; + ~DContext() { + if (ctx) { + LZ4F_freeDecompressionContext(ctx); + } + } }; public: @@ -249,12 +262,8 @@ public: return &s_instance; } ~Lz4fBlockCompression() { - for (auto ctx : _ctx_c_pool) { - _delete_compression_ctx(ctx); - } - for (auto ctx : _ctx_d_pool) { - _delete_decompression_ctx(ctx); - } + _ctx_c_pool.clear(); + _ctx_d_pool.clear(); } Status compress(const Slice& input, faststring* output) override { @@ -279,14 +288,12 @@ public: private: Status _compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) { - CContext* context = nullptr; - RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + std::unique_ptr context; + RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; Defer defer {[&] { - if (compress_failed) { - _delete_compression_ctx(context); - } else { - _release_compression_ctx(context); + if (!compress_failed) { + _release_compression_ctx(std::move(context)); } }}; Slice compressed_buf; @@ -340,13 +347,11 @@ private: Status _decompress(const Slice& input, Slice* output) { bool decompress_failed = false; - DContext* context = nullptr; - RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); + std::unique_ptr context; + RETURN_IF_ERROR(_acquire_decompression_ctx(context)); Defer defer {[&] { - if (decompress_failed) { - _delete_decompression_ctx(context); - } else { - _release_decompression_ctx(context); + if (!decompress_failed) { + _release_decompression_ctx(std::move(context)); } }}; size_t input_size = input.size; @@ -373,66 +378,56 @@ private: private: // acquire a compression ctx from pool, release while finish compress, // delete if compression failed - Status _acquire_compression_ctx(CContext** out) { + Status _acquire_compression_ctx(std::unique_ptr& out) { std::lock_guard l(_ctx_c_mutex); if (_ctx_c_pool.empty()) { - CContext* context = new (std::nothrow) CContext(); - if (context == nullptr) { + std::unique_ptr localCtx = CContext::create_unique(); + if (localCtx.get() == nullptr) { return Status::InvalidArgument("failed to new LZ4F CContext"); } - auto res = LZ4F_createCompressionContext(&context->ctx, LZ4F_VERSION); + auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION); if (LZ4F_isError(res) != 0) { return Status::InvalidArgument(strings::Substitute( "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res))); } - *out = context; + out = std::move(localCtx); return Status::OK(); } - *out = _ctx_c_pool.back(); + out = std::move(_ctx_c_pool.back()); _ctx_c_pool.pop_back(); return Status::OK(); } - void _release_compression_ctx(CContext* context) { + void _release_compression_ctx(std::unique_ptr context) { DCHECK(context); std::lock_guard l(_ctx_c_mutex); - _ctx_c_pool.push_back(context); - } - void _delete_compression_ctx(CContext* context) { - DCHECK(context); - LZ4F_freeCompressionContext(context->ctx); - delete context; + _ctx_c_pool.push_back(std::move(context)); } - Status _acquire_decompression_ctx(DContext** out) { + Status _acquire_decompression_ctx(std::unique_ptr& out) { std::lock_guard l(_ctx_d_mutex); if (_ctx_d_pool.empty()) { - DContext* context = new (std::nothrow) DContext(); - if (context == nullptr) { + std::unique_ptr localCtx = DContext::create_unique(); + if (localCtx.get() == nullptr) { return Status::InvalidArgument("failed to new LZ4F DContext"); } - auto res = LZ4F_createDecompressionContext(&context->ctx, LZ4F_VERSION); + auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION); if (LZ4F_isError(res) != 0) { return Status::InvalidArgument(strings::Substitute( "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res))); } - *out = context; + out = std::move(localCtx); return Status::OK(); } - *out = _ctx_d_pool.back(); + out = std::move(_ctx_d_pool.back()); _ctx_d_pool.pop_back(); return Status::OK(); } - void _release_decompression_ctx(DContext* context) { + void _release_decompression_ctx(std::unique_ptr context) { DCHECK(context); // reset decompression context to avoid ERROR_maxBlockSize_invalid LZ4F_resetDecompressionContext(context->ctx); std::lock_guard l(_ctx_d_mutex); - _ctx_d_pool.push_back(context); - } - void _delete_decompression_ctx(DContext* context) { - DCHECK(context); - LZ4F_freeDecompressionContext(context->ctx); - delete context; + _ctx_d_pool.push_back(std::move(context)); } private: @@ -440,10 +435,10 @@ private: std::mutex _ctx_c_mutex; // LZ4F_compressionContext_t is a pointer so no copy here - std::vector _ctx_c_pool; + std::vector> _ctx_c_pool; std::mutex _ctx_d_mutex; - std::vector _ctx_d_pool; + std::vector> _ctx_d_pool; }; LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { @@ -456,10 +451,18 @@ LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { class Lz4HCBlockCompression : public BlockCompressionCodec { private: - struct Context { + class Context { + ENABLE_FACTORY_CREATOR(Context); + + public: Context() : ctx(nullptr) {} LZ4_streamHC_t* ctx; faststring buffer; + ~Context() { + if (ctx) { + LZ4_freeStreamHC(ctx); + } + } }; public: @@ -467,21 +470,15 @@ public: static Lz4HCBlockCompression s_instance; return &s_instance; } - ~Lz4HCBlockCompression() { - for (auto ctx : _ctx_pool) { - _delete_compression_ctx(ctx); - } - } + ~Lz4HCBlockCompression() { _ctx_pool.clear(); } Status compress(const Slice& input, faststring* output) override { - Context* context; - RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + std::unique_ptr context; + RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; Defer defer {[&] { - if (compress_failed) { - _delete_compression_ctx(context); - } else { - _release_compression_ctx(context); + if (!compress_failed) { + _release_compression_ctx(std::move(context)); } }}; Slice compressed_buf; @@ -525,41 +522,35 @@ public: size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } private: - Status _acquire_compression_ctx(Context** out) { + Status _acquire_compression_ctx(std::unique_ptr& out) { std::lock_guard l(_ctx_mutex); if (_ctx_pool.empty()) { - Context* context = new (std::nothrow) Context(); - if (context == nullptr) { + std::unique_ptr localCtx = Context::create_unique(); + if (localCtx.get() == nullptr) { return Status::InvalidArgument("new LZ4HC context error"); } - context->ctx = LZ4_createStreamHC(); - if (context->ctx == nullptr) { - delete context; + localCtx->ctx = LZ4_createStreamHC(); + if (localCtx->ctx == nullptr) { return Status::InvalidArgument("LZ4_createStreamHC error"); } - *out = context; + out = std::move(localCtx); return Status::OK(); } - *out = _ctx_pool.back(); + out = std::move(_ctx_pool.back()); _ctx_pool.pop_back(); return Status::OK(); } - void _release_compression_ctx(Context* context) { + void _release_compression_ctx(std::unique_ptr context) { DCHECK(context); LZ4_resetStreamHC_fast(context->ctx, _compression_level); std::lock_guard l(_ctx_mutex); - _ctx_pool.push_back(context); - } - void _delete_compression_ctx(Context* context) { - DCHECK(context); - LZ4_freeStreamHC(context->ctx); - delete context; + _ctx_pool.push_back(std::move(context)); } private: int64_t _compression_level = config::LZ4_HC_compression_level; mutable std::mutex _ctx_mutex; - mutable std::vector _ctx_pool; + mutable std::vector> _ctx_pool; }; class SnappySlicesSource : public snappy::Source { @@ -751,14 +742,30 @@ public: // for ZSTD compression and decompression, with BOTH fast and high compression ratio class ZstdBlockCompression : public BlockCompressionCodec { private: - struct CContext { + class CContext { + ENABLE_FACTORY_CREATOR(CContext); + + public: CContext() : ctx(nullptr) {} ZSTD_CCtx* ctx; faststring buffer; + ~CContext() { + if (ctx) { + ZSTD_freeCCtx(ctx); + } + } }; - struct DContext { + class DContext { + ENABLE_FACTORY_CREATOR(DContext); + + public: DContext() : ctx(nullptr) {} ZSTD_DCtx* ctx; + ~DContext() { + if (ctx) { + ZSTD_freeDCtx(ctx); + } + } }; public: @@ -767,12 +774,8 @@ public: return &s_instance; } ~ZstdBlockCompression() { - for (auto ctx : _ctx_c_pool) { - _delete_compression_ctx(ctx); - } - for (auto ctx : _ctx_d_pool) { - _delete_decompression_ctx(ctx); - } + _ctx_c_pool.clear(); + _ctx_d_pool.clear(); } size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); } @@ -786,14 +789,12 @@ public: // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { - CContext* context; - RETURN_IF_ERROR(_acquire_compression_ctx(&context)); + std::unique_ptr context; + RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; Defer defer {[&] { - if (compress_failed) { - _delete_compression_ctx(context); - } else { - _release_compression_ctx(context); + if (!compress_failed) { + _release_compression_ctx(std::move(context)); } }}; @@ -864,14 +865,12 @@ public: } Status decompress(const Slice& input, Slice* output) override { - DContext* context; + std::unique_ptr context; bool decompress_failed = false; - RETURN_IF_ERROR(_acquire_decompression_ctx(&context)); + RETURN_IF_ERROR(_acquire_decompression_ctx(context)); Defer defer {[&] { - if (decompress_failed) { - _delete_decompression_ctx(context); - } else { - _release_decompression_ctx(context); + if (!decompress_failed) { + _release_decompression_ctx(std::move(context)); } }}; @@ -890,76 +889,66 @@ public: } private: - Status _acquire_compression_ctx(CContext** out) { + Status _acquire_compression_ctx(std::unique_ptr& out) { std::lock_guard l(_ctx_c_mutex); if (_ctx_c_pool.empty()) { - CContext* context = new (std::nothrow) CContext(); - if (context == nullptr) { + std::unique_ptr localCtx = CContext::create_unique(); + if (localCtx.get() == nullptr) { return Status::InvalidArgument("failed to new ZSTD CContext"); } //typedef LZ4F_cctx* LZ4F_compressionContext_t; - context->ctx = ZSTD_createCCtx(); - if (context->ctx == nullptr) { + localCtx->ctx = ZSTD_createCCtx(); + if (localCtx->ctx == nullptr) { return Status::InvalidArgument("Failed to create ZSTD compress ctx"); } - *out = context; + out = std::move(localCtx); return Status::OK(); } - *out = _ctx_c_pool.back(); + out = std::move(_ctx_c_pool.back()); _ctx_c_pool.pop_back(); return Status::OK(); } - void _release_compression_ctx(CContext* context) { + void _release_compression_ctx(std::unique_ptr context) { DCHECK(context); auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only); DCHECK(!ZSTD_isError(ret)); std::lock_guard l(_ctx_c_mutex); - _ctx_c_pool.push_back(context); - } - void _delete_compression_ctx(CContext* context) { - DCHECK(context); - ZSTD_freeCCtx(context->ctx); - delete context; + _ctx_c_pool.push_back(std::move(context)); } - Status _acquire_decompression_ctx(DContext** out) { + Status _acquire_decompression_ctx(std::unique_ptr& out) { std::lock_guard l(_ctx_d_mutex); if (_ctx_d_pool.empty()) { - DContext* context = new (std::nothrow) DContext(); - if (context == nullptr) { + std::unique_ptr localCtx = DContext::create_unique(); + if (localCtx.get() == nullptr) { return Status::InvalidArgument("failed to new ZSTD DContext"); } - context->ctx = ZSTD_createDCtx(); - if (context->ctx == nullptr) { + localCtx->ctx = ZSTD_createDCtx(); + if (localCtx->ctx == nullptr) { return Status::InvalidArgument("Fail to init ZSTD decompress context"); } - *out = context; + out = std::move(localCtx); return Status::OK(); } - *out = _ctx_d_pool.back(); + out = std::move(_ctx_d_pool.back()); _ctx_d_pool.pop_back(); return Status::OK(); } - void _release_decompression_ctx(DContext* context) { + void _release_decompression_ctx(std::unique_ptr context) { DCHECK(context); // reset ctx to start a new decompress session auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only); DCHECK(!ZSTD_isError(ret)); std::lock_guard l(_ctx_d_mutex); - _ctx_d_pool.push_back(context); - } - void _delete_decompression_ctx(DContext* context) { - DCHECK(context); - ZSTD_freeDCtx(context->ctx); - delete context; + _ctx_d_pool.push_back(std::move(context)); } private: mutable std::mutex _ctx_c_mutex; - mutable std::vector _ctx_c_pool; + mutable std::vector> _ctx_c_pool; mutable std::mutex _ctx_d_mutex; - mutable std::vector _ctx_d_pool; + mutable std::vector> _ctx_d_pool; }; class GzipBlockCompression : public ZlibBlockCompression {