diff --git a/deps/oblib/src/lib/compress/ob_compressor.h b/deps/oblib/src/lib/compress/ob_compressor.h index 28fa885cfe..36d22ae34c 100644 --- a/deps/oblib/src/lib/compress/ob_compressor.h +++ b/deps/oblib/src/lib/compress/ob_compressor.h @@ -20,6 +20,7 @@ namespace oceanbase { namespace common { +class ObIAllocator; class ObCompressor { public: @@ -28,6 +29,28 @@ public: ObCompressor() {} virtual ~ObCompressor() {} + virtual int compress(const char *src_buffer, + const int64_t src_data_size, + char *dst_buffer, + const int64_t dst_buffer_size, + int64_t &dst_data_size, + ObIAllocator *allocator) + { + UNUSED(allocator); + return compress(src_buffer, src_data_size, dst_buffer, dst_buffer_size, + dst_data_size); + } + virtual int decompress(const char *src_buffer, + const int64_t src_data_size, + char *dst_buffer, + const int64_t dst_buffer_size, + int64_t &dst_data_size, + ObIAllocator *allocator) + { + UNUSED(allocator); + return decompress(src_buffer, src_data_size, dst_buffer, dst_buffer_size, + dst_data_size); + } virtual int compress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp index 0a95a725cf..4eb3d1a0a8 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.cpp @@ -24,7 +24,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size) { void *buf = NULL; if (NULL != opaque) { - ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque); + ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque); buf = allocator->alloc(size); } return buf; @@ -33,7 +33,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size) static void ob_zstd_free(void *opaque, void *address) { if (NULL != opaque) { - ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque); + ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque); allocator->free(address); } } @@ -42,7 +42,7 
@@ static void ob_zstd_free(void *opaque, void *address) * ------------------------------ObZstdCtxAllocator--------------------- */ ObZstdCtxAllocator::ObZstdCtxAllocator(int64_t tenant_id) - : allocator_(ObModIds::OB_COMPRESSOR, OB_MALLOC_BIG_BLOCK_SIZE, + : allocator_(ObModIds::OB_COMPRESSOR, ZSTD_ALLOCATOR_BLOCK_SIZE, tenant_id) { } @@ -51,14 +51,14 @@ ObZstdCtxAllocator::~ObZstdCtxAllocator() { } -void* ObZstdCtxAllocator::alloc(size_t size) +void* ObZstdCtxAllocator::alloc(const int64_t size) { return allocator_.alloc(size); } -void ObZstdCtxAllocator::free(void *addr) +void ObZstdCtxAllocator::free(void *ptr) { - allocator_.free(addr); + allocator_.free(ptr); } void ObZstdCtxAllocator::reuse() @@ -78,13 +78,14 @@ int ObZstdCompressor::compress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size) + int64_t &dst_data_size, + ObIAllocator *allocator) { int ret = OB_SUCCESS; int64_t max_overflow_size = 0; size_t compress_ret_size = 0; ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -120,12 +121,13 @@ int ObZstdCompressor::decompress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size) + int64_t &dst_data_size, + ObIAllocator *allocator) { int ret = OB_SUCCESS; size_t decompress_ret_size = 0; ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -178,4 +180,3 @@ int ObZstdCompressor::get_max_overflow_size(const int64_t 
src_data_size, } return ret; } - diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h index 77361b3627..8832446ebe 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_compressor.h @@ -23,8 +23,9 @@ namespace common namespace zstd { -class ObZstdCtxAllocator +class ObZstdCtxAllocator : public ObIAllocator { +static constexpr int64_t ZSTD_ALLOCATOR_BLOCK_SIZE = (1LL << 20) - (17LL << 10); public: ObZstdCtxAllocator(int64_t tenant_id); virtual ~ObZstdCtxAllocator(); @@ -33,10 +34,11 @@ public: thread_local ObZstdCtxAllocator allocator(ob_thread_tenant_id()); return allocator; } - void *alloc(size_t size); - void free(void *addr); - void reuse(); - void reset(); + void *alloc(const int64_t size) override; + void *alloc(const int64_t size, const ObMemAttr &attr) override { return NULL; } + void free(void *ptr) override; + void reuse() override; + void reset() override; private: ObArenaAllocator allocator_; }; @@ -50,12 +52,32 @@ public: const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size); + int64_t &dst_data_size, + ObIAllocator *allocator) override; int decompress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size); + int64_t &dst_data_size, + ObIAllocator *allocator) override; + int compress(const char *src_buffer, + const int64_t src_data_size, + char *dst_buffer, + const int64_t dst_buffer_size, + int64_t &dst_data_size) override + { + return compress(src_buffer, src_data_size, dst_buffer, + dst_buffer_size, dst_data_size, NULL); + } + int decompress(const char *src_buffer, + const int64_t src_data_size, + char *dst_buffer, + const int64_t dst_buffer_size, + int64_t &dst_data_size) override + { + return decompress(src_buffer, src_data_size, dst_buffer, + dst_buffer_size, dst_data_size, NULL); + } const char 
*get_compressor_name() const; ObCompressorType get_compressor_type() const; int get_max_overflow_size(const int64_t src_data_size, diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp index ff9a048dd3..b5db8e0b63 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.cpp @@ -25,7 +25,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size) { void *buf = NULL; if (NULL != opaque) { - ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque); + ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque); buf = allocator->alloc(size); } return buf; @@ -34,7 +34,7 @@ static void *ob_zstd_malloc(void *opaque, size_t size) static void ob_zstd_free(void *opaque, void *address) { if (NULL != opaque) { - ObZstdCtxAllocator *allocator = reinterpret_cast<ObZstdCtxAllocator*> (opaque); + ObIAllocator *allocator = reinterpret_cast<ObIAllocator*> (opaque); allocator->free(address); } } @@ -43,7 +43,7 @@ static void ob_zstd_free(void *opaque, void *address) * ------------------------------ObZstdCtxAllocator--------------------- */ ObZstdCtxAllocator::ObZstdCtxAllocator(int64_t tenant_id) - : allocator_(ObModIds::OB_COMPRESSOR, OB_MALLOC_BIG_BLOCK_SIZE, + : allocator_(ObModIds::OB_COMPRESSOR, ZSTD_ALLOCATOR_BLOCK_SIZE, tenant_id) { } @@ -52,14 +52,14 @@ ObZstdCtxAllocator::~ObZstdCtxAllocator() { } -void* ObZstdCtxAllocator::alloc(size_t size) +void* ObZstdCtxAllocator::alloc(const int64_t size) { return allocator_.alloc(size); } -void ObZstdCtxAllocator::free(void *addr) +void ObZstdCtxAllocator::free(void *ptr) { - allocator_.free(addr); + allocator_.free(ptr); } void ObZstdCtxAllocator::reuse() @@ -79,13 +79,14 @@ int ObZstdCompressor_1_3_8::compress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size) + int64_t &dst_data_size, + ObIAllocator *allocator) 
{ int ret = OB_SUCCESS; int64_t max_overflow_size = 0; size_t compress_ret_size = 0; ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -121,12 +122,13 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size) + int64_t &dst_data_size, + ObIAllocator *allocator) { int ret = OB_SUCCESS; size_t decompress_ret_size = 0; ObZstdCtxAllocator &zstd_allocator = ObZstdCtxAllocator::get_thread_local_instance(); - OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, &zstd_allocator}; + OB_ZSTD_customMem zstd_mem = {ob_zstd_malloc, ob_zstd_free, allocator ?: &zstd_allocator}; dst_data_size = 0; if (NULL == src_buffer @@ -147,7 +149,7 @@ int ObZstdCompressor_1_3_8::decompress(const char *src_buffer, } else { dst_data_size = decompress_ret_size; } - + zstd_allocator.reuse(); return ret; } @@ -179,4 +181,3 @@ int ObZstdCompressor_1_3_8::get_max_overflow_size(const int64_t src_data_size, } return ret; } - diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h index 0c6be709d3..af4e1b647b 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_compressor_1_3_8.h @@ -23,8 +23,9 @@ namespace common namespace zstd_1_3_8 { -class ObZstdCtxAllocator +class ObZstdCtxAllocator : public ObIAllocator { +static constexpr int64_t ZSTD_ALLOCATOR_BLOCK_SIZE = (1LL << 20) - (17LL << 10); public: ObZstdCtxAllocator(int64_t tenant_id); virtual ~ObZstdCtxAllocator(); @@ -33,10 +34,11 @@ public: thread_local ObZstdCtxAllocator allocator(ob_thread_tenant_id()); return 
allocator; } - void *alloc(size_t size); - void free(void *addr); - void reuse(); - void reset(); + void *alloc(const int64_t size) override; + void *alloc(const int64_t size, const ObMemAttr &attr) override { return NULL; } + void free(void *ptr) override; + void reuse() override; + void reset() override; private: ObArenaAllocator allocator_; }; @@ -50,12 +52,32 @@ public: const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size); + int64_t &dst_data_size, + ObIAllocator *allocator) override; int decompress(const char *src_buffer, const int64_t src_data_size, char *dst_buffer, const int64_t dst_buffer_size, - int64_t &dst_data_size); + int64_t &dst_data_size, + ObIAllocator *allocator) override; + int compress(const char *src_buffer, + const int64_t src_data_size, + char *dst_buffer, + const int64_t dst_buffer_size, + int64_t &dst_data_size) override + { + return compress(src_buffer, src_data_size, dst_buffer, + dst_buffer_size, dst_data_size, NULL); + } + int decompress(const char *src_buffer, + const int64_t src_data_size, + char *dst_buffer, + const int64_t dst_buffer_size, + int64_t &dst_data_size) override + { + return decompress(src_buffer, src_data_size, dst_buffer, + dst_buffer_size, dst_data_size, NULL); + } const char *get_compressor_name() const; ObCompressorType get_compressor_type() const; int get_max_overflow_size(const int64_t src_data_size, diff --git a/deps/oblib/unittest/lib/CMakeLists.txt b/deps/oblib/unittest/lib/CMakeLists.txt index 09b7f8c481..1367ae7f08 100644 --- a/deps/oblib/unittest/lib/CMakeLists.txt +++ b/deps/oblib/unittest/lib/CMakeLists.txt @@ -119,6 +119,7 @@ oblib_addtest(wait_event/test_wait_event.cpp) oblib_addtest(utility/test_fast_convert.cpp) oblib_addtest(utility/test_defer.cpp) oblib_addtest(hash/test_ob_ref_mgr.cpp) +oblib_addtest(compress/test_compressor.cpp) ##codec oblib_addtest(codec/test_composite.cpp) diff --git a/deps/oblib/unittest/lib/compress/test_compressor.cpp 
b/deps/oblib/unittest/lib/compress/test_compressor.cpp index 1dc57cf3dd..dd54b0099b 100644 --- a/deps/oblib/unittest/lib/compress/test_compressor.cpp +++ b/deps/oblib/unittest/lib/compress/test_compressor.cpp @@ -17,7 +17,7 @@ #include "lib/compress/ob_compressor_pool.h" #include "lib/alloc/alloc_func.h" #include "lib/ob_define.h" -#include "lib/compress/zlib/zlib.h" +#include "zlib.h" #include "lib/checksum/ob_crc64.h" #include "lib/coro/testing.h" @@ -25,6 +25,7 @@ using namespace oceanbase::obsys; using namespace oceanbase::common; using namespace oceanbase::common::hash; +using namespace oceanbase::common::zstd; namespace oceanbase { @@ -388,18 +389,6 @@ TEST(ObCompressorStress, compress_stable) ASSERT_EQ(OB_SUCCESS, ret); cmp_stress.set_thread_count(20); cmp_stress.start(); - for (int64_t i = 0; i < sleep_sec; ++i) { - common::ObLabelItem item; - lib::get_tenant_label_memory(common::OB_SERVER_TENANT_ID, ObModIds::OB_COMPRESSOR, item); - COMMON_LOG(INFO, "MEMORY USED: ", - K(item.hold_), - K(item.used_), - K(item.alloc_count_), - K(item.free_count_), - K(item.count_)); - ASSERT_TRUE(item.alloc_count_ < 1000); - sleep(1); - } cmp_stress.stop(); cmp_stress.wait(); cmp_stress.destroy(); @@ -545,58 +534,107 @@ TEST_F(ObCompressorTest, test_zlib_stream) ASSERT_NE(0, strcmp(data, decompress_buffer)); } -TEST_F(ObCompressorTest, test_zlib_vs_ob) +class MyAlloc : public ObIAllocator { - static const uint32_t MAX_DATA_SIZE = 1<<18;//256k - const uint32_t compress_step = 10; - - unsigned char data[MAX_DATA_SIZE]; - uint64_t dst_data_size1 = 0; - unsigned char compress_buffer1[static_cast(MAX_DATA_SIZE * 1.1 + 14)]; - uint64_t dst_data_size2 = 0; - unsigned char compress_buffer2[static_cast(MAX_DATA_SIZE * 1.1 + 14)]; - uint64_t decompress_data_size = 0; - unsigned char decompress_buffer[MAX_DATA_SIZE]; - - timeval start, end; - ObRandom test_random; - int64_t num = 0; - for (uint32_t i = 0; i < MAX_DATA_SIZE; i++) { - num = test_random.get(0, 255); - data[i] = (unsigned 
char)(num); +public: + MyAlloc() : alloc_count_(0), free_count_(0) {} + void *alloc(const int64_t sz) + { + alloc_count_++; + return ob_malloc(sz, "test"); } + void *alloc(const int64_t size, const ObMemAttr &attr) + { + return NULL; + } + void free(void *ptr) + { + free_count_++; + ob_free(ptr); + }; + int64_t alloc_count_; + int64_t free_count_; +}; - for (uint32_t test_len = 1; test_len < MAX_DATA_SIZE; test_len += compress_step) { - gettimeofday(&start, NULL); - for (int64_t i = 0; i < 2; i++) { - dst_data_size1 = static_cast(test_len * 1.1 + 14); - compress2(compress_buffer1, &dst_data_size1, reinterpret_cast(data), test_len, 0); - } - gettimeofday(&end, NULL); - COMMON_LOG(INFO, "zlib cost", "usec", (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec, K(test_len)); +void test_zstd_family(ObCompressor &compressor) +{ + MyAlloc alloc; + ASSERT_EQ(alloc.alloc_count_, 0); + ASSERT_EQ(alloc.free_count_, alloc.alloc_count_); + int64_t src_len = 2L<<20; + char *src_buf = (char*)ob_malloc(src_len, "test"); + for (int i = 0; i < src_len; i++) { + src_buf[i] = static_cast<char> ('a' + ObRandom::rand(0, 25));; + } + char *cmp_buf = (char*)ob_malloc(src_len, "test"); + memcpy(cmp_buf, src_buf, src_len); + int64_t max_overflow_size = 0; + int ret = compressor.get_max_overflow_size(src_len, max_overflow_size); + ASSERT_EQ(OB_SUCCESS, ret); + int64_t dst_buf_len = src_len + max_overflow_size; + char *dst_buf = (char*)ob_malloc(dst_buf_len, "test"); + int64_t dst_actual_len = 0; + ret = compressor.compress(src_buf, src_len, dst_buf, dst_buf_len, dst_actual_len, &alloc); + ASSERT_EQ(OB_SUCCESS, ret); + int64_t alloc_count_bak = alloc.alloc_count_; + ASSERT_NE(alloc.alloc_count_, 0); + ASSERT_EQ(alloc.free_count_, alloc.alloc_count_); + memset(src_buf, 0, src_len); + int64_t src_actual_len = 0; + ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len, &alloc); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(src_actual_len, src_len); + 
ASSERT_EQ(0, memcmp(cmp_buf, src_buf, src_len)); + ASSERT_GT(alloc.alloc_count_, alloc_count_bak); + ASSERT_EQ(alloc.free_count_, alloc.alloc_count_); + alloc_count_bak = alloc.alloc_count_; - gettimeofday(&start, NULL); - for (int64_t i = 0; i < 2; i++) { - dst_data_size2 = static_cast(test_len * 1.1 + 14); - zlib_compressor.fast_level0_compress(compress_buffer2, &dst_data_size2, reinterpret_cast(data), test_len); - } - gettimeofday(&end, NULL); - COMMON_LOG(INFO, "ob cost", "usec", (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec); + // decompress without allocator + memset(src_buf, 0, src_len); + src_actual_len = 0; + ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(src_actual_len, src_len); + ASSERT_EQ(0, memcmp(cmp_buf, src_buf, src_len)); + ASSERT_EQ(alloc.alloc_count_, alloc_count_bak); + ASSERT_EQ(alloc.free_count_, alloc.alloc_count_); +} - ASSERT_EQ(dst_data_size1, dst_data_size2); - ASSERT_EQ(0, memcmp(compress_buffer1, compress_buffer2, dst_data_size2)); - - memset(decompress_buffer, 0, sizeof(decompress_buffer)); - decompress_data_size = sizeof(decompress_buffer); - int zlib_errno = uncompress(reinterpret_cast(decompress_buffer), - reinterpret_cast(&decompress_data_size), - reinterpret_cast(compress_buffer1), - static_cast(dst_data_size1)); - ASSERT_EQ(zlib_errno, Z_OK); - ASSERT_EQ(test_len, decompress_data_size); - ASSERT_EQ(0, memcmp(decompress_buffer, data, decompress_data_size)); +TEST_F(ObCompressorTest, test_zstd_custom_alloc) +{ + { + oceanbase::zstd::ObZstdCompressor compressor; + test_zstd_family(compressor); + } + { + oceanbase::zstd_1_3_8::ObZstdCompressor_1_3_8 compressor; + test_zstd_family(compressor); + } + { + oceanbase::common::ObLZ4Compressor lz4_compressor; + ObCompressor &compressor = lz4_compressor; + MyAlloc alloc; + ASSERT_EQ(alloc.alloc_count_, 0); + ASSERT_EQ(alloc.free_count_, alloc.alloc_count_); + int64_t src_len = 2L<<20; + 
char *src_buf = (char*)ob_malloc(src_len, "test"); + int64_t max_overflow_size = 0; + int ret = compressor.get_max_overflow_size(src_len, max_overflow_size); + ASSERT_EQ(OB_SUCCESS, ret); + int64_t dst_buf_len = src_len + max_overflow_size; + char *dst_buf = (char*)ob_malloc(dst_buf_len, "test"); + int64_t dst_actual_len = 0; + ret = compressor.compress(src_buf, src_len, dst_buf, dst_buf_len, dst_actual_len, &alloc); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(alloc.alloc_count_, 0); + int64_t src_actual_len = 0; + ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len, &alloc); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(alloc.alloc_count_, 0); + ASSERT_EQ(src_actual_len, src_len); } } + } }