[CP] Support specify customed allocator for zstd_compressor

This commit is contained in:
obdev
2024-02-07 11:04:40 +00:00
committed by ob-robot
parent 9a22f3ea88
commit 61cdad9330
7 changed files with 202 additions and 92 deletions

View File

@ -20,6 +20,7 @@ namespace oceanbase
{
namespace common
{
class ObIAllocator;
class ObCompressor
{
public:
@ -28,6 +29,28 @@ public:
ObCompressor() {}
virtual ~ObCompressor() {}
virtual int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
{
UNUSED(allocator);
return compress(src_buffer, src_data_size, dst_buffer, dst_buffer_size,
dst_data_size);
}
virtual int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size,
ObIAllocator *allocator)
{
UNUSED(allocator);
return decompress(src_buffer, src_data_size, dst_buffer, dst_buffer_size,
dst_data_size);
}
virtual int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,

View File

@ -24,7 +24,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque);
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
@ -33,7 +33,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size)
static void ob_zstd_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque);
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
allocator->free(address);
}
}
@ -42,7 +42,7 @@ static void ob_zstd_free(void *opaque, void *address)
* ------------------------------ObZstdCtxAllocator---------------------
*/
ObZstdCtxAllocator::ObZstdCtxAllocator(int64_t tenant_id)
: allocator_(ObModIds::OB_COMPRESSOR, OB_MALLOC_BIG_BLOCK_SIZE,
: allocator_(ObModIds::OB_COMPRESSOR, ZSTD_ALLOCATOR_BLOCK_SIZE,
tenant_id)
{
}
@ -51,14 +51,14 @@ ObZstdCtxAllocator::~ObZstdCtxAllocator()
{
}
void* ObZstdCtxAllocator::alloc(size_t size)
void* ObZstdCtxAllocator::alloc(const int64_t size)
{
return allocator_.alloc(size);
}
void ObZstdCtxAllocator::free(void *addr)
void ObZstdCtxAllocator::free(void *ptr)
{
allocator_.free(addr);
allocator_.free(ptr);
}
void ObZstdCtxAllocator::reuse()
@ -78,13 +78,14 @@ int ObZstdCompressor::compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size)
int64_t &dst_data_size,
ObIAllocator *allocator)
{
int ret = OB_SUCCESS;
int64_t max_overflow_size = 0;
size_t compress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
dst_data_size = 0;
if (NULL == src_buffer
@ -120,12 +121,13 @@ int ObZstdCompressor::decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size)
int64_t &dst_data_size,
ObIAllocator *allocator)
{
int ret = OB_SUCCESS;
size_t decompress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
dst_data_size = 0;
if (NULL == src_buffer

View File

@ -23,8 +23,9 @@ namespace common
namespace zstd
{
class ObZstdCtxAllocator
class ObZstdCtxAllocator : public ObIAllocator
{
static constexpr int64_t ZSTD_ALLOCATOR_BLOCK_SIZE = (1LL << 20) - (17LL << 10);
public:
ObZstdCtxAllocator(int64_t tenant_id);
virtual ~ObZstdCtxAllocator();
@ -33,10 +34,11 @@ public:
thread_local ObZstdCtxAllocator allocator(ob_thread_tenant_id());
return allocator;
}
void *alloc(size_t size);
void free(void *addr);
void reuse();
void reset();
void *alloc(const int64_t size) override;
void *alloc(const int64_t size, const ObMemAttr &attr) override { return NULL; }
void free(void *ptr) override;
void reuse() override;
void reset() override;
private:
ObArenaAllocator allocator_;
};
@ -50,12 +52,32 @@ public:
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size);
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size);
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return compress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return decompress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
const char *get_compressor_name() const;
ObCompressorType get_compressor_type() const;
int get_max_overflow_size(const int64_t src_data_size,

View File

@ -25,7 +25,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size)
{
void *buf = NULL;
if (NULL != opaque) {
ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque);
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
buf = allocator->alloc(size);
}
return buf;
@ -34,7 +34,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size)
static void ob_zstd_free(void *opaque, void *address)
{
if (NULL != opaque) {
ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque);
ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque);
allocator->free(address);
}
}
@ -43,7 +43,7 @@ static void ob_zstd_free(void *opaque, void *address)
* ------------------------------ObZstdCtxAllocator---------------------
*/
ObZstdCtxAllocator::ObZstdCtxAllocator(int64_t tenant_id)
: allocator_(ObModIds::OB_COMPRESSOR, OB_MALLOC_BIG_BLOCK_SIZE,
: allocator_(ObModIds::OB_COMPRESSOR, ZSTD_ALLOCATOR_BLOCK_SIZE,
tenant_id)
{
}
@ -52,14 +52,14 @@ ObZstdCtxAllocator::~ObZstdCtxAllocator()
{
}
void* ObZstdCtxAllocator::alloc(size_t size)
void* ObZstdCtxAllocator::alloc(const int64_t size)
{
return allocator_.alloc(size);
}
void ObZstdCtxAllocator::free(void *addr)
void ObZstdCtxAllocator::free(void *ptr)
{
allocator_.free(addr);
allocator_.free(ptr);
}
void ObZstdCtxAllocator::reuse()
@ -79,13 +79,14 @@ int ObZstdCompressor_1_3_8::compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size)
int64_t &dst_data_size,
ObIAllocator *allocator)
{
int ret = OB_SUCCESS;
int64_t max_overflow_size = 0;
size_t compress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
dst_data_size = 0;
if (NULL == src_buffer
@ -121,12 +122,13 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size)
int64_t &dst_data_size,
ObIAllocator *allocator)
{
int ret = OB_SUCCESS;
size_t decompress_ret_size = 0;
ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance();
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator};
OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator};
dst_data_size = 0;
if (NULL == src_buffer

View File

@ -23,8 +23,9 @@ namespace common
namespace zstd_1_3_8
{
class ObZstdCtxAllocator
class ObZstdCtxAllocator : public ObIAllocator
{
static constexpr int64_t ZSTD_ALLOCATOR_BLOCK_SIZE = (1LL << 20) - (17LL << 10);
public:
ObZstdCtxAllocator(int64_t tenant_id);
virtual ~ObZstdCtxAllocator();
@ -33,10 +34,11 @@ public:
thread_local ObZstdCtxAllocator allocator(ob_thread_tenant_id());
return allocator;
}
void *alloc(size_t size);
void free(void *addr);
void reuse();
void reset();
void *alloc(const int64_t size) override;
void *alloc(const int64_t size, const ObMemAttr &attr) override { return NULL; }
void free(void *ptr) override;
void reuse() override;
void reset() override;
private:
ObArenaAllocator allocator_;
};
@ -50,12 +52,32 @@ public:
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size);
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size);
int64_t &dst_data_size,
ObIAllocator *allocator) override;
int compress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return compress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
int decompress(const char *src_buffer,
const int64_t src_data_size,
char *dst_buffer,
const int64_t dst_buffer_size,
int64_t &dst_data_size) override
{
return decompress(src_buffer, src_data_size, dst_buffer,
dst_buffer_size, dst_data_size, NULL);
}
const char *get_compressor_name() const;
ObCompressorType get_compressor_type() const;
int get_max_overflow_size(const int64_t src_data_size,