【bugfix合入】zstd压缩库内存管理改造

Co-authored-by: zhjc1124 <zhjc1124@gmail.com>
This commit is contained in:
obdev 2024-03-22 09:52:27 +00:00 committed by ob-robot
parent d98a647a20
commit 8a7386cdbf
21 changed files with 77 additions and 476 deletions

View File

@ -228,15 +228,21 @@ public:
void purge_extra_cached_block(int keep) {
for(int i = MAX_ARENA_NUM - 1; i >= keep; i--) {
Arena& arena = arena_[i];
arena.ref(1);
Block* old_blk = arena.clear();
if (NULL != old_blk) {
int64_t old_pos = INT64_MAX;
if (old_blk->freeze(old_pos)) {
arena.ref(-1);
arena.sync();
if (old_blk->retire(old_pos)) {
destroy_block(old_blk);
}
} else {
arena.ref(-1);
}
} else {
arena.ref(-1);
}
}
}

View File

@ -14,6 +14,7 @@
#define OB_COMPRESSOR_UTIL_H_
#include "lib/ob_define.h"
#include "lib/utility/ob_template_utils.h"
#include "lib/allocator/ob_allocator.h"
namespace oceanbase
{
@ -72,4 +73,23 @@ const char *const compress_funcs[] =
} /* namespace common */
} /* namespace oceanbase */
using oceanbase::common::ObIAllocator;
static void *ob_zstd_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
}
static void ob_zstd_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
allocator->free(address);
}
}
#endif /* OB_COMPRESSOR_UTIL_H_ */

View File

@ -29,28 +29,6 @@ public:
ObCompressor() {}
virtual ~ObCompressor() {}
virtual int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
{
UNUSED(allocator);
return compress(src_buffer, src_data_size, dst_buffer, dst_buffer_size,
dst_data_size);
}
virtual int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
{
UNUSED(allocator);
return decompress(src_buffer, src_data_size, dst_buffer, dst_buffer_size,
dst_data_size);
}
virtual int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
@ -63,8 +41,6 @@ public:
int64_t &dst_data_size) = 0;
virtual int get_max_overflow_size(const int64_t src_data_size,
int64_t &max_overflow_size) const = 0;
virtual void reset_mem() {}
virtual const char *get_compressor_name() const = 0;
virtual ObCompressorType get_compressor_type() const = 0;
};

View File

@ -17,17 +17,20 @@ namespace oceanbase
namespace common
{
ObCompressorPool::ObCompressorPool()
:none_compressor(),
:allocator_(SET_USE_500(ObMemAttr(OB_SERVER_TENANT_ID, "Compressor")), OB_MALLOC_BIG_BLOCK_SIZE),
none_compressor(),
lz4_compressor(),
lz4_compressor_1_9_1(),
snappy_compressor(),
zlib_compressor(),
zstd_compressor_1_3_8(),
zstd_compressor(allocator_),
zstd_compressor_1_3_8(allocator_),
zlib_lite_compressor(),
lz4_stream_compressor(),
zstd_stream_compressor(),
zstd_stream_compressor_1_3_8()
zstd_stream_compressor(allocator_),
zstd_stream_compressor_1_3_8(allocator_)
{
allocator_.set_nway(32);
}
ObCompressorPool &ObCompressorPool::get_instance()
{

View File

@ -15,6 +15,7 @@
#include "lib/compress/ob_compressor.h"
#include "lib/compress/ob_stream_compressor.h"
#include "lib/allocator/ob_vslice_alloc.h"
#include "none/ob_none_compressor.h"
#include "lz4/ob_lz4_compressor.h"
#include "snappy/ob_snappy_compressor.h"
@ -62,6 +63,7 @@ private:
ObCompressorPool();
virtual ~ObCompressorPool() {}
ObVSliceAlloc allocator_;
ObNoneCompressor none_compressor;
ObLZ4Compressor lz4_compressor;
ObLZ4Compressor191 lz4_compressor_1_9_1;

View File

@ -12,6 +12,7 @@
#include "ob_zstd_compressor.h"
#include "lib/ob_errno.h"
#include "lib/rc/context.h"
#include "lib/thread_local/ob_tsi_factory.h"
#include "ob_zstd_wrapper.h"
@ -20,57 +21,6 @@ using namespace common;
using namespace zstd;
static void *ob_zstd_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
}
static void ob_zstd_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
allocator->free(address);
}
}
/**
* ------------------------------ObZstdCtxAllocator---------------------
*/
ObZstdCtxAllocator::ObZstdCtxAllocator(int64_t tenant_id)
: allocator_(ObModIds::OB_COMPRESSOR, ZSTD_ALLOCATOR_BLOCK_SIZE,
tenant_id)
{
}
ObZstdCtxAllocator::~ObZstdCtxAllocator()
{
}
void* ObZstdCtxAllocator::alloc(const int64_t size)
{
return allocator_.alloc(size);
}
void ObZstdCtxAllocator::free(void *ptr)
{
allocator_.free(ptr);
}
void ObZstdCtxAllocator::reuse()
{
allocator_.reuse();
}
void ObZstdCtxAllocator::reset()
{
allocator_.reset();
}
/**
* ----------------------------ObZstdCompressor---------------------------
*/
@ -78,14 +28,12 @@ int ObZstdCompressor::compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
int64_t &dst_data_size)
{
int ret = OB_SUCCESS;
int64_t max_overflow_size = 0;
size_t compress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
dst_data_size = 0;
if (NULL == src_buffer
@ -113,7 +61,6 @@ int ObZstdCompressor::compress(const char *src_buffer,
dst_data_size = compress_ret_size;
}
zstd_allocator.reuse();
return ret;
}
@ -121,13 +68,11 @@ int ObZstdCompressor::decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
int64_t &dst_data_size)
{
int ret = OB_SUCCESS;
size_t decompress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
dst_data_size = 0;
if (NULL == src_buffer
@ -148,16 +93,9 @@ int ObZstdCompressor::decompress(const char *src_buffer,
} else {
dst_data_size = decompress_ret_size;
}
zstd_allocator.reuse();
return ret;
}
void ObZstdCompressor::reset_mem()
{
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
zstd_allocator.reset();
}
const char *ObZstdCompressor::get_compressor_name() const
{
return all_compressor_name[ObCompressorType::ZSTD_COMPRESSOR];

View File

@ -13,7 +13,6 @@
#ifndef OCEANBASE_COMMON_COMPRESS_ZSTD_COMPRESSOR_
#define OCEANBASE_COMMON_COMPRESS_ZSTD_COMPRESSOR_
#include "lib/compress/ob_compressor.h"
#include "lib/allocator/page_arena.h"
namespace oceanbase
{
@ -23,66 +22,28 @@ namespace common
namespace zstd
{
class ObZstdCtxAllocator : public ObIAllocator
{
static constexpr int64_t ZSTD_ALLOCATOR_BLOCK_SIZE = (1LL << 20) - (17LL << 10);
public:
ObZstdCtxAllocator(int64_t tenant_id);
virtual ~ObZstdCtxAllocator();
static ObZstdCtxAllocator &get_thread_local_instance()
{
thread_local ObZstdCtxAllocator allocator(ob_thread_tenant_id());
return allocator;
}
void *alloc(const int64_t size) override;
void *alloc(const int64_t size, const ObMemAttr &attr) override { return NULL; }
void free(void *ptr) override;
void reuse() override;
void reset() override;
private:
ObArenaAllocator allocator_;
};
class ObZstdCompressor : public ObCompressor
{
public:
explicit ObZstdCompressor() {}
explicit ObZstdCompressor(ObIAllocator &allocator)
: allocator_(allocator) {}
virtual ~ObZstdCompressor() {}
int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int64_t &dst_data_size) override;
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return compress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return decompress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
int64_t &dst_data_size) override;
const char *get_compressor_name() const;
ObCompressorType get_compressor_type() const;
int get_max_overflow_size(const int64_t src_data_size,
int64_t &max_overflow_size) const;
void reset_mem();
private:
ObIAllocator &allocator_;
};
} // namespace zstd

View File

@ -20,47 +20,6 @@ using namespace oceanbase;
using namespace common;
using namespace zstd;
static void *ob_zstd_stream_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObZstdStreamCtxAllocator *allocator = reinterpret_cast<ObZstdStreamCtxAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
}
static void ob_zstd_stream_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObZstdStreamCtxAllocator *allocator = reinterpret_cast<ObZstdStreamCtxAllocator*> (opaque);
allocator->free(address);
}
}
/**
* ------------------------------ObZstdStreamCtxAllocator---------------------
*/
ObZstdStreamCtxAllocator::ObZstdStreamCtxAllocator()
: allocator_(ObModIds::OB_STREAM_COMPRESSOR, OB_SERVER_TENANT_ID)
{
}
ObZstdStreamCtxAllocator::~ObZstdStreamCtxAllocator()
{
}
void* ObZstdStreamCtxAllocator::alloc(size_t size)
{
return allocator_.alloc(size);
}
void ObZstdStreamCtxAllocator::free(void *addr)
{
allocator_.free(addr);
}
/**
* ------------------------------ObZstdStreamCompressor---------------------
*/
@ -79,8 +38,7 @@ int ObZstdStreamCompressor::create_compress_ctx(void *&ctx)
int ret = OB_SUCCESS;
ctx = NULL;
ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
if (OB_FAIL(ObZstdWrapper::create_cctx(zstd_mem, ctx))) {
LIB_LOG(WARN, "failed to create cctx", K(ret));
}
@ -146,8 +104,7 @@ int ObZstdStreamCompressor::stream_compress(void *ctx, const char *src, const in
int ObZstdStreamCompressor::create_decompress_ctx(void *&ctx)
{
int ret = OB_SUCCESS;
ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
ctx = NULL;
if (OB_FAIL(ObZstdWrapper::create_dctx(zstd_mem, ctx))) {

View File

@ -14,7 +14,6 @@
#define OCEANBASE_COMMON_STREAM_COMPRESS_ZSTD_COMPRESSOR_
#include "lib/compress/ob_stream_compressor.h"
#include "lib/allocator/page_arena.h"
namespace oceanbase
{
@ -23,26 +22,11 @@ namespace common
namespace zstd
{
class ObZstdStreamCtxAllocator
{
public:
ObZstdStreamCtxAllocator();
virtual ~ObZstdStreamCtxAllocator();
static ObZstdStreamCtxAllocator &get_thread_local_instance()
{
thread_local ObZstdStreamCtxAllocator allocator;
return allocator;
}
void *alloc(size_t size);
void free(void *addr);
private:
ModulePageAllocator allocator_;
};
class ObZstdStreamCompressor : public ObStreamCompressor
{
public:
explicit ObZstdStreamCompressor() {}
explicit ObZstdStreamCompressor(ObIAllocator &allocator)
: allocator_(allocator) {}
virtual ~ObZstdStreamCompressor() {}
const char *get_compressor_name() const;
@ -63,7 +47,8 @@ public:
int get_compress_bound_size(const int64_t src_size, int64_t &bound_size) const;
int insert_uncompressed_block(void *dctx, const void *block, const int64_t block_size);
private:
ObIAllocator &allocator_;
};
} // namespace zstd
} //namespace common

View File

@ -13,6 +13,7 @@
#include "ob_zstd_compressor_1_3_8.h"
#include "lib/ob_errno.h"
#include "lib/rc/context.h"
#include "lib/thread_local/ob_tsi_factory.h"
#include "ob_zstd_wrapper.h"
@ -21,57 +22,6 @@ using namespace common;
using namespace zstd_1_3_8;
static void *ob_zstd_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
}
static void ob_zstd_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
allocator->free(address);
}
}
/**
* ------------------------------ObZstdCtxAllocator---------------------
*/
ObZstdCtxAllocator::ObZstdCtxAllocator(int64_t tenant_id)
: allocator_(ObModIds::OB_COMPRESSOR, ZSTD_ALLOCATOR_BLOCK_SIZE,
tenant_id)
{
}
ObZstdCtxAllocator::~ObZstdCtxAllocator()
{
}
void* ObZstdCtxAllocator::alloc(const int64_t size)
{
return allocator_.alloc(size);
}
void ObZstdCtxAllocator::free(void *ptr)
{
allocator_.free(ptr);
}
void ObZstdCtxAllocator::reuse()
{
allocator_.reuse();
}
void ObZstdCtxAllocator::reset()
{
allocator_.reset();
}
/**
* ----------------------------ObZstdCompressor---------------------------
*/
@ -79,14 +29,12 @@ int ObZstdCompressor_1_3_8::compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
int64_t &dst_data_size)
{
int ret = OB_SUCCESS;
int64_t max_overflow_size = 0;
size_t compress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
dst_data_size = 0;
if (NULL == src_buffer
@ -114,7 +62,6 @@ int ObZstdCompressor_1_3_8::compress(const char *src_buffer,
dst_data_size = compress_ret_size;
}
zstd_allocator.reuse();
return ret;
}
@ -122,13 +69,11 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
int64_t &dst_data_size)
{
int ret = OB_SUCCESS;
size_t decompress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
dst_data_size = 0;
if (NULL == src_buffer
@ -150,16 +95,9 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer,
dst_data_size = decompress_ret_size;
}
zstd_allocator.reuse();
return ret;
}
void ObZstdCompressor_1_3_8::reset_mem()
{
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
zstd_allocator.reset();
}
const char *ObZstdCompressor_1_3_8::get_compressor_name() const
{
return all_compressor_name[ObCompressorType::ZSTD_1_3_8_COMPRESSOR];

View File

@ -13,7 +13,6 @@
#ifndef OCEANBASE_COMMON_COMPRESS_ZSTD_1_3_8_COMPRESSOR_
#define OCEANBASE_COMMON_COMPRESS_ZSTD_1_3_8_COMPRESSOR_
#include "lib/compress/ob_compressor.h"
#include "lib/allocator/page_arena.h"
namespace oceanbase
{
@ -23,66 +22,28 @@ namespace common
namespace zstd_1_3_8
{
class ObZstdCtxAllocator : public ObIAllocator
{
static constexpr int64_t ZSTD_ALLOCATOR_BLOCK_SIZE = (1LL << 20) - (17LL << 10);
public:
ObZstdCtxAllocator(int64_t tenant_id);
virtual ~ObZstdCtxAllocator();
static ObZstdCtxAllocator &get_thread_local_instance()
{
thread_local ObZstdCtxAllocator allocator(ob_thread_tenant_id());
return allocator;
}
void *alloc(const int64_t size) override;
void *alloc(const int64_t size, const ObMemAttr &attr) override { return NULL; }
void free(void *ptr) override;
void reuse() override;
void reset() override;
private:
ObArenaAllocator allocator_;
};
class __attribute__((visibility ("default"))) ObZstdCompressor_1_3_8 : public ObCompressor
{
public:
explicit ObZstdCompressor_1_3_8() {}
explicit ObZstdCompressor_1_3_8(ObIAllocator &allocator)
: allocator_(allocator) {}
virtual ~ObZstdCompressor_1_3_8() {}
int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int64_t &dst_data_size) override;
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return compress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return decompress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
int64_t &dst_data_size) override;
const char *get_compressor_name() const;
ObCompressorType get_compressor_type() const;
int get_max_overflow_size(const int64_t src_data_size,
int64_t &max_overflow_size) const;
void reset_mem();
private:
ObIAllocator &allocator_;
};
} // namespace zstd_1_3_8

View File

@ -21,47 +21,6 @@ using namespace oceanbase;
using namespace common;
using namespace zstd_1_3_8;
static void *ob_zstd_stream_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObZstdStreamCtxAllocator *allocator = reinterpret_cast<ObZstdStreamCtxAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
}
static void ob_zstd_stream_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObZstdStreamCtxAllocator *allocator = reinterpret_cast<ObZstdStreamCtxAllocator*> (opaque);
allocator->free(address);
}
}
/**
* ------------------------------ObZstdStreamCtxAllocator---------------------
*/
ObZstdStreamCtxAllocator::ObZstdStreamCtxAllocator()
: allocator_(ObModIds::OB_STREAM_COMPRESSOR, OB_SERVER_TENANT_ID)
{
}
ObZstdStreamCtxAllocator::~ObZstdStreamCtxAllocator()
{
}
void* ObZstdStreamCtxAllocator::alloc(size_t size)
{
return allocator_.alloc(size);
}
void ObZstdStreamCtxAllocator::free(void *addr)
{
allocator_.free(addr);
}
/**
* ------------------------------ObZstdStreamCompressor---------------------
*/
@ -81,8 +40,7 @@ int ObZstdStreamCompressor_1_3_8::create_compress_ctx(void *&ctx)
int ret = OB_SUCCESS;
ctx = NULL;
ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
if (OB_FAIL(ObZstdWrapper::create_cctx(zstd_mem, ctx))) {
LIB_LOG(WARN, "failed to create cctx", K(ret));
}
@ -148,8 +106,7 @@ int ObZstdStreamCompressor_1_3_8::stream_compress(void *ctx, const char *src, co
int ObZstdStreamCompressor_1_3_8::create_decompress_ctx(void *&ctx)
{
int ret = OB_SUCCESS;
ObZstdStreamCtxAllocator &zstd_allocator = ObZstdStreamCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_stream_malloc, ob_zstd_stream_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &allocator_};
ctx = NULL;
if (OB_FAIL(ObZstdWrapper::create_dctx(zstd_mem, ctx))) {

View File

@ -23,26 +23,11 @@ namespace common
namespace zstd_1_3_8
{
class ObZstdStreamCtxAllocator
{
public:
ObZstdStreamCtxAllocator();
virtual ~ObZstdStreamCtxAllocator();
static ObZstdStreamCtxAllocator &get_thread_local_instance()
{
thread_local ObZstdStreamCtxAllocator allocator;
return allocator;
}
void *alloc(size_t size);
void free(void *addr);
private:
ModulePageAllocator allocator_;
};
class ObZstdStreamCompressor_1_3_8 : public ObStreamCompressor
{
public:
explicit ObZstdStreamCompressor_1_3_8() {}
explicit ObZstdStreamCompressor_1_3_8(ObIAllocator &allocator)
: allocator_(allocator) {}
virtual ~ObZstdStreamCompressor_1_3_8() {}
inline const char *get_compressor_name() const;
@ -63,7 +48,8 @@ public:
int get_compress_bound_size(const int64_t src_size, int64_t &bound_size) const;
int insert_uncompressed_block(void *dctx, const void *block, const int64_t block_size);
private:
ObIAllocator &allocator_;
};
} // namespace zstd_1_3_8
} //namespace common

View File

@ -17,7 +17,7 @@
#include "lib/profile/ob_trace_id.h"
#include "lib/utility/ob_print_utils.h"
#include "lib/checksum/ob_crc64.h"
#include "lib/compress/ob_compressor_pool.h"
#include "lib/compress/ob_compressor.h"
#include "rpc/obrpc/ob_rpc_time.h"
#include "rpc/ob_packet.h"
#include "common/errsim_module/ob_errsim_module_type.h"

View File

@ -132,3 +132,4 @@ oblib_addtest(codec/test_fast_delta.cpp)
oblib_addtest(codec/test_bitpacking.cpp)
oblib_addtest(codec/test_codec_performance.cpp)
oblib_addtest(codec/test_bitpacking_performance.cpp)
oblib_addtest(compress/test_compressor_pool.cpp)

View File

@ -199,6 +199,7 @@ void TestCompressorStress::run1()
class ObCompressorTest : public testing::Test
{
public:
ObCompressorTest() : zstd_compressor(alloc) {}
static void SetUpTestCase()
{
memset(const_cast<char *>(compress_buffer), '\0', 100);
@ -215,6 +216,7 @@ public:
static char decompress_buffer[1000];
static int64_t buffer_size;
static int64_t dst_data_size;
ObMalloc alloc;
ObNoneCompressor none_compressor;
ObLZ4Compressor lz4_compressor;
ObSnappyCompressor snappy_compressor;
@ -383,7 +385,8 @@ TEST(ObCompressorStress, compress_stable)
int ret = OB_SUCCESS;
const int64_t sleep_sec = 1;
TestCompressorStress cmp_stress;
ObZstdCompressor zstd_compressor;
ObMalloc alloc;
ObZstdCompressor zstd_compressor(alloc);
ret = cmp_stress.init(30000, 100000, &zstd_compressor);
ASSERT_EQ(OB_SUCCESS, ret);
@ -556,85 +559,6 @@ public:
int64_t free_count_;
};
void test_zstd_family(ObCompressor &compressor)
{
MyAlloc alloc;
ASSERT_EQ(alloc.alloc_count_, 0);
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
int64_t src_len = 2L<<20;
char *src_buf = (char*)ob_malloc(src_len, "test");
for (int i = 0; i < src_len; i++) {
src_buf[i] = static_cast<char> ('a' + ObRandom::rand(0, 25));;
}
char *cmp_buf = (char*)ob_malloc(src_len, "test");
memcpy(cmp_buf, src_buf, src_len);
int64_t max_overflow_size = 0;
int ret = compressor.get_max_overflow_size(src_len, max_overflow_size);
ASSERT_EQ(OB_SUCCESS, ret);
int64_t dst_buf_len = src_len + max_overflow_size;
char *dst_buf = (char*)ob_malloc(dst_buf_len, "test");
int64_t dst_actual_len = 0;
ret = compressor.compress(src_buf, src_len, dst_buf, dst_buf_len, dst_actual_len, &alloc);
ASSERT_EQ(OB_SUCCESS, ret);
int64_t alloc_count_bak = alloc.alloc_count_;
ASSERT_NE(alloc.alloc_count_, 0);
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
memset(src_buf, 0, src_len);
int64_t src_actual_len = 0;
ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len, &alloc);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(src_actual_len, src_len);
ASSERT_EQ(0, memcmp(cmp_buf, src_buf, src_len));
ASSERT_GT(alloc.alloc_count_, alloc_count_bak);
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
alloc_count_bak = alloc.alloc_count_;
// decompress without allocator
memset(src_buf, 0, src_len);
src_actual_len = 0;
ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(src_actual_len, src_len);
ASSERT_EQ(0, memcmp(cmp_buf, src_buf, src_len));
ASSERT_EQ(alloc.alloc_count_, alloc_count_bak);
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
}
TEST_F(ObCompressorTest, test_zstd_custom_alloc)
{
{
oceanbase::zstd::ObZstdCompressor compressor;
test_zstd_family(compressor);
}
{
oceanbase::zstd_1_3_8::ObZstdCompressor_1_3_8 compressor;
test_zstd_family(compressor);
}
{
oceanbase::common::ObLZ4Compressor lz4_compressor;
ObCompressor &compressor = lz4_compressor;
MyAlloc alloc;
ASSERT_EQ(alloc.alloc_count_, 0);
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
int64_t src_len = 2L<<20;
char *src_buf = (char*)ob_malloc(src_len, "test");
int64_t max_overflow_size = 0;
int ret = compressor.get_max_overflow_size(src_len, max_overflow_size);
ASSERT_EQ(OB_SUCCESS, ret);
int64_t dst_buf_len = src_len + max_overflow_size;
char *dst_buf = (char*)ob_malloc(dst_buf_len, "test");
int64_t dst_actual_len = 0;
ret = compressor.compress(src_buf, src_len, dst_buf, dst_buf_len, dst_actual_len, &alloc);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(alloc.alloc_count_, 0);
int64_t src_actual_len = 0;
ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len, &alloc);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(alloc.alloc_count_, 0);
ASSERT_EQ(src_actual_len, src_len);
}
}
}
}

View File

@ -140,7 +140,6 @@ public:
void free_log_io_flashback_task(palf::LogIOFlashbackTask *ptr);
palf::LogIOPurgeThrottlingTask *alloc_log_io_purge_throttling_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_purge_throttling_task(palf::LogIOPurgeThrottlingTask *ptr);
private:
uint64_t tenant_id_ CACHE_ALIGNED;
int64_t total_limit_;

View File

@ -1172,7 +1172,7 @@ int ObOptStatSqlService::fill_table_stat(common::sqlclient::ObMySQLResult &resul
} else if (OB_LIKELY(obj_type.is_double())) {
EXTRACT_DOUBLE_FIELD_TO_CLASS_MYSQL(result, avg_row_size, stat, int64_t);
} else {
EXTRACT_INT_FIELD_TO_CLASS_MYSQL(result, avg_row_size, stat, int64_t);
EXTRACT_INT_FIELD_TO_CLASS_MYSQL(result, avg_row_size, stat, int64_t);
}
}
EXTRACT_INT_FIELD_TO_CLASS_MYSQL(result, macro_block_num, stat, int64_t);
@ -1483,9 +1483,6 @@ int ObOptStatSqlService::get_compressed_llc_bitmap(ObIAllocator &allocator,
comp_buf = const_cast<char*>(bitmap_buf);
comp_size = bitmap_size;
}
if (compressor != nullptr) {
compressor->reset_mem();
}
}
return ret;
}
@ -1527,8 +1524,6 @@ int ObOptStatSqlService::get_decompressed_llc_bitmap(ObIAllocator &allocator,
LOG_WARN("decompress bitmap buffer failed.",
KP(comp_buf), K(comp_size), KP(bitmap_buf),
K(max_bitmap_size), K(bitmap_size), K(ret));
} else {
compressor->reset_mem();
}
return ret;
}

View File

@ -38,7 +38,6 @@ void ObChunkBlockCompressor::reset()
{
compressor_type_ = NONE_COMPRESSOR;
if (compressor_ != nullptr) {
compressor_->reset_mem();
compressor_ = nullptr;
}
}
@ -125,4 +124,4 @@ int ObChunkBlockCompressor::decompress(const char *in, const int64_t in_size,
}
}
}
}

View File

@ -52,9 +52,6 @@ ObMicroBlockCompressor::ObMicroBlockCompressor()
ObMicroBlockCompressor::~ObMicroBlockCompressor()
{
if (compressor_ != nullptr) {
compressor_->reset_mem();
}
}
void ObMicroBlockCompressor::reset()
@ -62,7 +59,6 @@ void ObMicroBlockCompressor::reset()
is_none_ = false;
micro_block_size_ = 0;
if (compressor_ != nullptr) {
compressor_->reset_mem();
compressor_ = nullptr;
}
comp_buf_.reuse();

View File

@ -55,9 +55,6 @@ ObMacroBlockReader::~ObMacroBlockReader()
ob_free(encryption_);
encryption_ = nullptr;
}
if (nullptr != compressor_) {
compressor_->reset_mem();
}
}
#ifdef OB_BUILD_TDE_SECURITY