[CP] Support specify customed allocator for zstd_compressor
This commit is contained in:
154
deps/oblib/unittest/lib/compress/test_compressor.cpp
vendored
154
deps/oblib/unittest/lib/compress/test_compressor.cpp
vendored
@ -17,7 +17,7 @@
|
||||
#include "lib/compress/ob_compressor_pool.h"
|
||||
#include "lib/alloc/alloc_func.h"
|
||||
#include "lib/ob_define.h"
|
||||
#include "lib/compress/zlib/zlib.h"
|
||||
#include "zlib.h"
|
||||
#include "lib/checksum/ob_crc64.h"
|
||||
#include "lib/coro/testing.h"
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
using namespace oceanbase::obsys;
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase::common::hash;
|
||||
using namespace oceanbase::common::zstd;
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -388,18 +389,6 @@ TEST(ObCompressorStress, compress_stable)
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
cmp_stress.set_thread_count(20);
|
||||
cmp_stress.start();
|
||||
for (int64_t i = 0; i < sleep_sec; ++i) {
|
||||
common::ObLabelItem item;
|
||||
lib::get_tenant_label_memory(common::OB_SERVER_TENANT_ID, ObModIds::OB_COMPRESSOR, item);
|
||||
COMMON_LOG(INFO, "MEMORY USED: ",
|
||||
K(item.hold_),
|
||||
K(item.used_),
|
||||
K(item.alloc_count_),
|
||||
K(item.free_count_),
|
||||
K(item.count_));
|
||||
ASSERT_TRUE(item.alloc_count_ < 1000);
|
||||
sleep(1);
|
||||
}
|
||||
cmp_stress.stop();
|
||||
cmp_stress.wait();
|
||||
cmp_stress.destroy();
|
||||
@ -545,58 +534,107 @@ TEST_F(ObCompressorTest, test_zlib_stream)
|
||||
ASSERT_NE(0, strcmp(data, decompress_buffer));
|
||||
}
|
||||
|
||||
TEST_F(ObCompressorTest, test_zlib_vs_ob)
|
||||
class MyAlloc : public ObIAllocator
|
||||
{
|
||||
static const uint32_t MAX_DATA_SIZE = 1<<18;//256k
|
||||
const uint32_t compress_step = 10;
|
||||
|
||||
unsigned char data[MAX_DATA_SIZE];
|
||||
uint64_t dst_data_size1 = 0;
|
||||
unsigned char compress_buffer1[static_cast<uint32_t>(MAX_DATA_SIZE * 1.1 + 14)];
|
||||
uint64_t dst_data_size2 = 0;
|
||||
unsigned char compress_buffer2[static_cast<uint32_t>(MAX_DATA_SIZE * 1.1 + 14)];
|
||||
uint64_t decompress_data_size = 0;
|
||||
unsigned char decompress_buffer[MAX_DATA_SIZE];
|
||||
|
||||
timeval start, end;
|
||||
ObRandom test_random;
|
||||
int64_t num = 0;
|
||||
for (uint32_t i = 0; i < MAX_DATA_SIZE; i++) {
|
||||
num = test_random.get(0, 255);
|
||||
data[i] = (unsigned char)(num);
|
||||
public:
|
||||
MyAlloc() : alloc_count_(0), free_count_(0) {}
|
||||
void *alloc(const int64_t sz)
|
||||
{
|
||||
alloc_count_++;
|
||||
return ob_malloc(sz, "test");
|
||||
}
|
||||
void *alloc(const int64_t size, const ObMemAttr &attr)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
void free(void *ptr)
|
||||
{
|
||||
free_count_++;
|
||||
ob_free(ptr);
|
||||
};
|
||||
int64_t alloc_count_;
|
||||
int64_t free_count_;
|
||||
};
|
||||
|
||||
for (uint32_t test_len = 1; test_len < MAX_DATA_SIZE; test_len += compress_step) {
|
||||
gettimeofday(&start, NULL);
|
||||
for (int64_t i = 0; i < 2; i++) {
|
||||
dst_data_size1 = static_cast<uint32_t>(test_len * 1.1 + 14);
|
||||
compress2(compress_buffer1, &dst_data_size1, reinterpret_cast<const Bytef*>(data), test_len, 0);
|
||||
}
|
||||
gettimeofday(&end, NULL);
|
||||
COMMON_LOG(INFO, "zlib cost", "usec", (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec, K(test_len));
|
||||
void test_zstd_family(ObCompressor &compressor)
|
||||
{
|
||||
MyAlloc alloc;
|
||||
ASSERT_EQ(alloc.alloc_count_, 0);
|
||||
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
|
||||
int64_t src_len = 2L<<20;
|
||||
char *src_buf = (char*)ob_malloc(src_len, "test");
|
||||
for (int i = 0; i < src_len; i++) {
|
||||
src_buf[i] = static_cast<char> ('a' + ObRandom::rand(0, 25));;
|
||||
}
|
||||
char *cmp_buf = (char*)ob_malloc(src_len, "test");
|
||||
memcpy(cmp_buf, src_buf, src_len);
|
||||
int64_t max_overflow_size = 0;
|
||||
int ret = compressor.get_max_overflow_size(src_len, max_overflow_size);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t dst_buf_len = src_len + max_overflow_size;
|
||||
char *dst_buf = (char*)ob_malloc(dst_buf_len, "test");
|
||||
int64_t dst_actual_len = 0;
|
||||
ret = compressor.compress(src_buf, src_len, dst_buf, dst_buf_len, dst_actual_len, &alloc);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t alloc_count_bak = alloc.alloc_count_;
|
||||
ASSERT_NE(alloc.alloc_count_, 0);
|
||||
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
|
||||
memset(src_buf, 0, src_len);
|
||||
int64_t src_actual_len = 0;
|
||||
ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len, &alloc);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(src_actual_len, src_len);
|
||||
ASSERT_EQ(0, memcmp(cmp_buf, src_buf, src_len));
|
||||
ASSERT_GT(alloc.alloc_count_, alloc_count_bak);
|
||||
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
|
||||
alloc_count_bak = alloc.alloc_count_;
|
||||
|
||||
gettimeofday(&start, NULL);
|
||||
for (int64_t i = 0; i < 2; i++) {
|
||||
dst_data_size2 = static_cast<uint32_t>(test_len * 1.1 + 14);
|
||||
zlib_compressor.fast_level0_compress(compress_buffer2, &dst_data_size2, reinterpret_cast<const Bytef*>(data), test_len);
|
||||
}
|
||||
gettimeofday(&end, NULL);
|
||||
COMMON_LOG(INFO, "ob cost", "usec", (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec);
|
||||
// decompress without allocator
|
||||
memset(src_buf, 0, src_len);
|
||||
src_actual_len = 0;
|
||||
ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(src_actual_len, src_len);
|
||||
ASSERT_EQ(0, memcmp(cmp_buf, src_buf, src_len));
|
||||
ASSERT_EQ(alloc.alloc_count_, alloc_count_bak);
|
||||
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
|
||||
}
|
||||
|
||||
ASSERT_EQ(dst_data_size1, dst_data_size2);
|
||||
ASSERT_EQ(0, memcmp(compress_buffer1, compress_buffer2, dst_data_size2));
|
||||
|
||||
memset(decompress_buffer, 0, sizeof(decompress_buffer));
|
||||
decompress_data_size = sizeof(decompress_buffer);
|
||||
int zlib_errno = uncompress(reinterpret_cast<Bytef*>(decompress_buffer),
|
||||
reinterpret_cast<uLongf*>(&decompress_data_size),
|
||||
reinterpret_cast<const Byte*>(compress_buffer1),
|
||||
static_cast<uLong>(dst_data_size1));
|
||||
ASSERT_EQ(zlib_errno, Z_OK);
|
||||
ASSERT_EQ(test_len, decompress_data_size);
|
||||
ASSERT_EQ(0, memcmp(decompress_buffer, data, decompress_data_size));
|
||||
TEST_F(ObCompressorTest, test_zstd_custom_alloc)
|
||||
{
|
||||
{
|
||||
oceanbase::zstd::ObZstdCompressor compressor;
|
||||
test_zstd_family(compressor);
|
||||
}
|
||||
{
|
||||
oceanbase::zstd_1_3_8::ObZstdCompressor_1_3_8 compressor;
|
||||
test_zstd_family(compressor);
|
||||
}
|
||||
{
|
||||
oceanbase::common::ObLZ4Compressor lz4_compressor;
|
||||
ObCompressor &compressor = lz4_compressor;
|
||||
MyAlloc alloc;
|
||||
ASSERT_EQ(alloc.alloc_count_, 0);
|
||||
ASSERT_EQ(alloc.free_count_, alloc.alloc_count_);
|
||||
int64_t src_len = 2L<<20;
|
||||
char *src_buf = (char*)ob_malloc(src_len, "test");
|
||||
int64_t max_overflow_size = 0;
|
||||
int ret = compressor.get_max_overflow_size(src_len, max_overflow_size);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t dst_buf_len = src_len + max_overflow_size;
|
||||
char *dst_buf = (char*)ob_malloc(dst_buf_len, "test");
|
||||
int64_t dst_actual_len = 0;
|
||||
ret = compressor.compress(src_buf, src_len, dst_buf, dst_buf_len, dst_actual_len, &alloc);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(alloc.alloc_count_, 0);
|
||||
int64_t src_actual_len = 0;
|
||||
ret = compressor.decompress(dst_buf, dst_actual_len, src_buf, src_len, src_actual_len, &alloc);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(alloc.alloc_count_, 0);
|
||||
ASSERT_EQ(src_actual_len, src_len);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user