Files
openGauss-server/contrib/gms_compress/gms_compress.cpp
2024-11-26 14:44:59 +08:00

401 lines
14 KiB
C++

#include <cstdlib>
#include <zlib.h>
#include "postgres.h"
#include "knl/knl_variable.h"
#include "commands/extension.h"
#include "utils/builtins.h"
#include "utils/varbit.h"
#include "utils/memutils.h"
#include "gms_compress.h"
/* Module magic block for this extension. */
PG_MODULE_MAGIC;
/* Inclusive bounds of the compression quality accepted by this file. */
#define MIN_QUALITY 1
#define MAX_QUALITY 9
/* Inclusive bounds of the zero-based slot index checked before indexing the handle table. */
#define MIN_HANDLE 0
#define MAX_HANDLE 4
/* User-visible handle numbers are the internal slot index plus this offset. */
#define HANDLE_OFFSET 1
/* compress_level value that marks a slot opened by gms_lz_uncompress_open. */
#define UNCOMPRESS_LEVEL 0
/* SQL-callable entry points defined in this file. */
PG_FUNCTION_INFO_V1(gms_lz_compress);
PG_FUNCTION_INFO_V1(gms_lz_uncompress);
PG_FUNCTION_INFO_V1(gms_lz_compress_open);
PG_FUNCTION_INFO_V1(gms_lz_compress_close);
PG_FUNCTION_INFO_V1(gms_lz_compress_add);
PG_FUNCTION_INFO_V1(gms_lz_uncompress_open);
PG_FUNCTION_INFO_V1(gms_lz_uncompress_close);
PG_FUNCTION_INFO_V1(gms_lz_uncompress_extract);
PG_FUNCTION_INFO_V1(gms_isopen);
/* File-local helpers; their definitions follow the entry points. */
static void gzip_compress(void* src, Size src_len, void** dst, Size* dst_len, int quality);
static void gzip_uncompress(void* src, Size src_len, void** dst, Size* dst_len);
static void free_context(int handle);
static inline void Check_Invalid_Input(gms_compress_context* compress_cxt, int handle);
/*
 * Index of this extension's entry in
 * u_sess->attr.attr_common.extension_session_vars_array; written by
 * set_extension_index.
 */
static uint32 compress_index;
/*
 * Remember the index this file uses to address its entry in
 * u_sess->attr.attr_common.extension_session_vars_array.
 */
void set_extension_index(uint32 index)
{
    compress_index = index;
}
void init_session_vars(void) {
RepallocSessionVarsArrayIfNecessary();
gms_compress_context* psc =
(gms_compress_context*)MemoryContextAlloc(u_sess->self_mem_cxt, sizeof(gms_compress_context));
u_sess->attr.attr_common.extension_session_vars_array[compress_index] = psc;
for (int i =0;i < UTLCOMP_MAX_HANDLE;i++) {
psc->context[i].compress_level = -1;
psc->context[i].compressed_data = NULL;
psc->context[i].uncompressed_data = NULL;
psc->context[i].used = false;
}
}
/*
 * Return this session's handle table, creating it on first use.
 */
gms_compress_context* get_session_context()
{
    gms_compress_context *session_state =
        (gms_compress_context*)u_sess->attr.attr_common.extension_session_vars_array[compress_index];

    if (session_state == NULL) {
        init_session_vars();
        /* Re-read through the array: init_session_vars is what fills the entry. */
        session_state =
            (gms_compress_context*)u_sess->attr.attr_common.extension_session_vars_array[compress_index];
    }
    return session_state;
}
/*
 * One-shot compression.
 *
 * Argument 0 is the bytea to compress (NULL is rejected with an ERROR),
 * argument 1 is the compression quality, which gzip_compress validates.
 * Returns the gzip-compressed data as a bytea.
 */
Datum gms_lz_compress(PG_FUNCTION_ARGS)
{
    if (PG_ARGISNULL(0)) {
        ereport(ERROR, (errmsg("compressed data cannot be NULL")));
    }

    bytea *source = PG_GETARG_BYTEA_PP(0);
    int level = PG_GETARG_INT32(1);
    bytea *packed = NULL;
    Size packed_len = 0;

    gzip_compress(source, VARSIZE_ANY_EXHDR(source), (void**)&packed, &packed_len, level);

    /* gzip_compress sizes the buffer for the worst case; record the bytes actually produced. */
    SET_VARSIZE(packed, VARHDRSZ + packed_len);
    PG_RETURN_BYTEA_P(packed);
}
/*
 * One-shot decompression.
 *
 * Argument 0 is gzip-formatted bytea (NULL is rejected with an ERROR).
 * Returns the inflated data as a bytea.
 */
Datum gms_lz_uncompress(PG_FUNCTION_ARGS)
{
    if (PG_ARGISNULL(0)) {
        ereport(ERROR, (errmsg("uncompressed data cannot be NULL")));
    }
    bytea *input_bytea = PG_GETARG_BYTEA_PP(0);
    Size src_len = VARSIZE_ANY_EXHDR(input_bytea);
    bytea *dst = NULL;
    Size dst_len = 0;
    /* Call gzip_uncompress function for uncompression */
    gzip_uncompress(input_bytea, src_len, (void**)&dst, &dst_len);
    /* Record the number of bytes inflate actually produced. */
    SET_VARSIZE(dst, VARHDRSZ + dst_len);
    /* dst is a bytea, so return it with the bytea macro like the other entry points. */
    PG_RETURN_BYTEA_P(dst);
}
/*
 * Open a compression handle.
 *
 * Argument 1 is the compression quality and must lie in
 * [MIN_QUALITY, MAX_QUALITY].  Argument 0 is not read here: the payload is
 * supplied later through gms_lz_compress_add, so it is no longer detoasted
 * into an unused local.
 *
 * Returns the handle number (slot index + HANDLE_OFFSET) of the first free
 * slot.  Raises an ERROR when the quality is out of range or when every slot
 * is already in use.
 */
Datum gms_lz_compress_open(PG_FUNCTION_ARGS)
{
    int quality = PG_GETARG_INT32(1);
    if (quality < MIN_QUALITY || quality > MAX_QUALITY) {
        ereport(ERROR, (errmsg("compression quality must be within the range of %d to %d", MIN_QUALITY, MAX_QUALITY)));
    }
    gms_compress_context *compress_cxt = get_session_context();
    for (int i = 0; i < UTLCOMP_MAX_HANDLE; i++) {
        if (!compress_cxt->context[i].used) {
            compress_cxt->context[i].compress_level = quality;
            compress_cxt->context[i].used = true;
            PG_RETURN_INT32(i + HANDLE_OFFSET);
        }
    }
    ereport(ERROR, (errmsg("no handle free, the maximum number of handles is %d", MAX_HANDLE + HANDLE_OFFSET)));
    /* Only here to give the function a return statement on every path. */
    PG_RETURN_VOID();
}
/*
 * Close a compression handle and return its accumulated data, gzip-compressed.
 *
 * Argument 0 is the handle number.  Raises an ERROR when the handle is out of
 * range, not in use, or was opened by gms_lz_uncompress_open.  Returns SQL
 * NULL (and releases the handle) when no data was ever added to it.
 */
Datum gms_lz_compress_close(PG_FUNCTION_ARGS)
{
    int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
    gms_compress_context *compress_cxt = get_session_context();
    Check_Invalid_Input(compress_cxt, handle);
    /*
     * Reject uncompress handles first.  Such a handle never owns
     * uncompressed_data, so testing for NULL data before this check would
     * silently release it and make this error unreachable.
     */
    if (compress_cxt->context[handle].compress_level == UNCOMPRESS_LEVEL) {
        ereport(ERROR, (errmsg("handle %d is a uncompressed handle", handle + HANDLE_OFFSET)));
    }
    if (compress_cxt->context[handle].uncompressed_data == NULL) {
        free_context(handle);
        PG_RETURN_NULL();
    }
    bytea *input_bytea = (bytea *)compress_cxt->context[handle].uncompressed_data;
    /* The accumulation buffer is built with a 4-byte varlena header. */
    Assert((char*)input_bytea + VARHDRSZ == VARDATA_ANY(input_bytea));
    Size src_len = VARSIZE_ANY_EXHDR(input_bytea);
    if (src_len > MaxAllocSize - VARHDRSZ) {
        free_context(handle);
        ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
    }
    bytea *dst = NULL;
    Size dst_len = 0;
    int quality = compress_cxt->context[handle].compress_level;
    /* Call gzip_compress function for compression */
    gzip_compress(input_bytea, src_len, (void**)&dst, &dst_len, quality);
    /* Record the number of bytes deflate actually produced. */
    SET_VARSIZE(dst, VARHDRSZ + dst_len);
    /* dst is a separate allocation, so the accumulation buffer can be released now. */
    free_context(handle);
    PG_RETURN_BYTEA_P(dst);
}
/*
 * Append a piece of data to an open compression handle.
 *
 * Argument 0 is the handle number, argument 1 the bytea to append.  The data
 * is accumulated uncompressed in a buffer allocated from u_sess->self_mem_cxt
 * and is compressed by gms_lz_compress_close.
 *
 * Raises an ERROR when the handle is out of range, not in use, or was opened
 * by gms_lz_uncompress_open, and when the accumulated size would exceed
 * MaxAllocSize.
 */
Datum gms_lz_compress_add(PG_FUNCTION_ARGS)
{
    int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
    gms_compress_context *compress_cxt = get_session_context();
    bytea *input_bytea = PG_GETARG_BYTEA_PP(1);
    Size src_len = VARSIZE_ANY_EXHDR(input_bytea);
    Check_Invalid_Input(compress_cxt, handle);
    if (compress_cxt->context[handle].compress_level == UNCOMPRESS_LEVEL) {
        ereport(ERROR, (errmsg("handle %d is a uncompressed handle", handle + HANDLE_OFFSET)));
    }
    if (src_len > MaxAllocSize - VARHDRSZ) {
        ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
    }
    /*
     * PG_GETARG_BYTEA_PP may hand back a varlena with a short header, so the
     * payload must be located with VARDATA_ANY; VARDATA would assume a 4-byte
     * header and read from the wrong offset.
     */
    const char *src_data = VARDATA_ANY(input_bytea);
    bytea *new_data = (bytea *)compress_cxt->context[handle].uncompressed_data;
    if (new_data == NULL) {
        /* First piece: start a fresh buffer with a 4-byte varlena header. */
        new_data = (bytea *)MemoryContextAlloc(u_sess->self_mem_cxt, src_len + VARHDRSZ);
        SET_VARSIZE(new_data, src_len + VARHDRSZ);
        Assert((char*)new_data + VARHDRSZ == VARDATA_ANY(new_data));
        errno_t rc = memcpy_s(VARDATA(new_data), src_len, src_data, src_len);
        securec_check(rc, "\0", "\0");
        compress_cxt->context[handle].uncompressed_data = new_data;
    } else {
        /* Later pieces: grow the buffer and copy behind the existing payload. */
        Size dst_len = VARSIZE_ANY_EXHDR(new_data);
        Size uncompressed_size = src_len + dst_len + VARHDRSZ;
        if (uncompressed_size > MaxAllocSize) {
            ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
        }
        new_data = (bytea *)repalloc(new_data, uncompressed_size);
        /*
         * Store the pointer before any step that can raise: repalloc may have
         * moved the chunk, and the slot must never keep the old address.
         */
        compress_cxt->context[handle].uncompressed_data = new_data;
        SET_VARSIZE(new_data, uncompressed_size);
        Assert((char*)new_data + VARHDRSZ == VARDATA_ANY(new_data));
        errno_t rc = memcpy_s(VARDATA(new_data) + dst_len, src_len, src_data, src_len);
        securec_check(rc, "\0", "\0");
    }
    PG_RETURN_VOID();
}
/*
 * Open a decompression handle for the gzip data in argument 0.
 *
 * The first free slot is tagged with UNCOMPRESS_LEVEL and remembers the
 * argument pointer as its compressed_data.  Returns the handle number
 * (slot index + HANDLE_OFFSET).  Raises an ERROR when argument 0 is NULL or
 * when no slot is free.
 *
 * NOTE(review): the slot keeps the pointer returned by PG_GETARG_BYTEA_PP
 * rather than a copy; whether that memory stays valid until a later
 * gms_lz_uncompress_extract call depends on the caller's memory context -
 * confirm.
 */
Datum gms_lz_uncompress_open(PG_FUNCTION_ARGS)
{
    if (PG_ARGISNULL(0)) {
        ereport(ERROR, (errmsg("uncompress_data can not be NULL")));
    }

    bytea *packed = PG_GETARG_BYTEA_PP(0);
    gms_compress_context *session_state = get_session_context();

    /* Locate the first slot that is not in use. */
    int slot = 0;
    while (slot < UTLCOMP_MAX_HANDLE && session_state->context[slot].used) {
        slot++;
    }

    if (slot >= UTLCOMP_MAX_HANDLE) {
        ereport(ERROR, (errmsg("no handle free, the maximum number of handles is %d", MAX_HANDLE + HANDLE_OFFSET)));
        PG_RETURN_VOID();
    }

    session_state->context[slot].compressed_data = packed;
    session_state->context[slot].compress_level = UNCOMPRESS_LEVEL;
    session_state->context[slot].used = true;
    PG_RETURN_INT32(slot + HANDLE_OFFSET);
}
/*
 * Close a decompression handle and return its slot to the free pool.
 *
 * Argument 0 is the handle number.  Raises an ERROR when the handle is out of
 * range, not in use, or was opened for compression.
 */
Datum gms_lz_uncompress_close(PG_FUNCTION_ARGS)
{
    gms_compress_context *session_state = get_session_context();
    int slot = PG_GETARG_INT32(0) - HANDLE_OFFSET;

    Check_Invalid_Input(session_state, slot);

    bool is_uncompress_handle = (session_state->context[slot].compress_level == UNCOMPRESS_LEVEL);
    if (!is_uncompress_handle) {
        ereport(ERROR, (errmsg("handle %d is a compressed handle", slot + HANDLE_OFFSET)));
    }

    free_context(slot);
    PG_RETURN_VOID();
}
/*
 * Decompress the data attached to an open decompression handle.
 *
 * Argument 0 is the handle number.  Returns the inflated data as a bytea.
 * The handle's compressed_data is cleared before inflating, so a second
 * extract on the same handle fails with "no compressed data found".
 *
 * Raises an ERROR when the handle is out of range, not in use, was opened for
 * compression, or no longer holds compressed data.
 */
Datum gms_lz_uncompress_extract(PG_FUNCTION_ARGS)
{
    int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
    gms_compress_context *compress_cxt = get_session_context();
    Check_Invalid_Input(compress_cxt, handle);
    /*
     * Check the handle type first.  A compression handle never has
     * compressed_data, so testing for NULL data before this check would make
     * this error unreachable and report the wrong cause.
     */
    if (compress_cxt->context[handle].compress_level != UNCOMPRESS_LEVEL) {
        ereport(ERROR, (errmsg("handle %d is a compressed handle", handle + HANDLE_OFFSET)));
    }
    if (compress_cxt->context[handle].compressed_data == NULL) {
        ereport(ERROR, (errmsg("no compressed data found in handle %d", handle + HANDLE_OFFSET)));
    }
    bytea *src = (bytea *)compress_cxt->context[handle].compressed_data;
    Size src_len = VARSIZE_ANY_EXHDR(src);
    Size dst_len = 0;
    bytea *dst = NULL;
    /* The slot gives up its reference before inflating; the handle itself stays open. */
    compress_cxt->context[handle].compressed_data = NULL;
    gzip_uncompress(src, src_len, (void**)&dst, &dst_len);
    /* Record the number of bytes inflate actually produced. */
    SET_VARSIZE(dst, VARHDRSZ + dst_len);
    PG_RETURN_BYTEA_P(dst);
}
/*
 * Report whether a handle is currently open.
 *
 * Argument 0 is the handle number.  Returns false for a handle outside the
 * valid range instead of raising an error, otherwise the slot's used flag.
 */
Datum gms_isopen(PG_FUNCTION_ARGS)
{
    int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
    if (handle < MIN_HANDLE || handle > MAX_HANDLE) {
        PG_RETURN_BOOL(false);
    }
    /* Go through the fmgr macro so the bool is converted to a Datum explicitly. */
    PG_RETURN_BOOL(get_session_context()->context[handle].used);
}
/* Using the zlib library and Lemoel Xiv algorithm to implement compression functionality */
static void gzip_compress(void* src, Size src_len, void** dst, Size* dst_len, int quality)
{
/* The compression quality is limited between 1 and 9 */
if (quality < MIN_QUALITY || quality > MAX_QUALITY) {
ereport(ERROR, (errmsg("compression quality must be within the range of %d to %d", MIN_QUALITY, MAX_QUALITY)));
}
bytea *input_bytea = (bytea*)src;
Size compressed_size = compressBound(src_len) + GZIP_COMPRESS_EXTRA_LENGTH;
bytea *result = (bytea*)palloc(VARHDRSZ + compressed_size);
SET_VARSIZE(result, VARHDRSZ + compressed_size);
z_stream c_stream;
c_stream.zalloc = NULL;
c_stream.zfree = NULL;
c_stream.opaque = NULL;
// MAX_WBITS + 16 for gzip
if (deflateInit2(&c_stream, quality, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
pfree_ext(result);
ereport(ERROR, (errmsg("zlib compression initialization failed")));
}
c_stream.avail_out = compressed_size; // output size
c_stream.next_out = (Bytef*)VARDATA_ANY(result); // output buffer
c_stream.avail_in = src_len; // input size
c_stream.next_in = (Bytef*)VARDATA_ANY(input_bytea); // input buffer
if (deflate(&c_stream, Z_FINISH) != Z_STREAM_END) {
pfree_ext(result);
ereport(ERROR, (errmsg("zlib compression failed")));
}
if (deflateEnd(&c_stream) != Z_OK) {
pfree_ext(result);
ereport(ERROR, (errmsg("zlib cleaning up compression stream failed")));
}
*dst_len = c_stream.total_out;
*dst = result;
}
/* Using the zlib library and Lemoel Xiv algorithm to implement uncompression functionality */
static void gzip_uncompress(void* src, Size src_len, void** dst, Size* dst_len)
{
bytea *input_bytea = (bytea*)src;
if (src_len < GZIP_MIN_LENGTH) {
ereport(ERROR, (errmsg("too small, minimum length of gzip format is %d bytes", GZIP_MIN_LENGTH)));
}
unsigned char *gzip_content = (unsigned char*)VARDATA_ANY(input_bytea);
if (gzip_content[0] != GZIP_HEADER_1 || gzip_content[1] != GZIP_HEADER_2) {
ereport(ERROR, (errmsg("data corrupt, invalid compressed data head")));
}
uint4 uncompressed_size = *(uint4*)(gzip_content + src_len - sizeof(int));
#ifdef WORDS_BIGENDIAN
uncompressed_size = BSWAP32(uncompressed_size);
#endif
if (uncompressed_size > MaxAllocSize - VARHDRSZ) {
ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
}
bytea *result = (bytea*)palloc(VARHDRSZ + uncompressed_size);
SET_VARSIZE(result, VARHDRSZ + uncompressed_size);
z_stream d_stream = { 0 };
d_stream.zalloc = NULL;
d_stream.zfree = NULL;
d_stream.opaque = NULL;
d_stream.next_in = (Bytef*)VARDATA_ANY(input_bytea);
d_stream.avail_in = src_len;
d_stream.avail_out = VARSIZE_ANY_EXHDR(result);
d_stream.next_out = (Bytef*)VARDATA_ANY(result);
// MAX_WBITS + 16 for gzip
if (inflateInit2(&d_stream, 16 + MAX_WBITS) != Z_OK) {
pfree_ext(result);
ereport(ERROR, (errmsg("zlib uncompression initialization failed")));
}
if (inflate(&d_stream, Z_FINISH) != Z_STREAM_END) {
pfree_ext(result);
ereport(ERROR, (errmsg("zlib uncompression failed")));
}
if (inflateEnd(&d_stream) != Z_OK) {
pfree_ext(result);
ereport(ERROR, (errmsg("zlib cleaning up uncompression stream failed")));
}
*dst_len = d_stream.total_out;
*dst = result;
return;
}
/**
* Because compressed_data is the data source address and
* uncompressed_data is the newly created data address,
* it is necessary to point compressed_data to null and release uncompressed_data
*/
static void free_context(int handle)
{
if (handle < MIN_HANDLE || handle > MAX_HANDLE)
{
return;
}
gms_compress_context *compress_cxt = get_session_context();
compress_cxt->context[handle].compressed_data =NULL;
pfree_ext(compress_cxt->context[handle].uncompressed_data);
compress_cxt->context[handle].compress_level = -1;
compress_cxt->context[handle].used = false;
return;
}
/*
 * Validate a zero-based slot index against the session's handle table.
 *
 * Raises an ERROR when the index lies outside [MIN_HANDLE, MAX_HANDLE] or
 * when the slot is not currently in use; the messages report the
 * user-visible handle number (index + HANDLE_OFFSET).
 */
static inline void Check_Invalid_Input(gms_compress_context* compress_cxt, int handle)
{
    bool in_range = (handle >= MIN_HANDLE && handle <= MAX_HANDLE);
    if (!in_range) {
        ereport(ERROR, (errmsg("invalid handle, it be within the range of %d to %d",
            MIN_HANDLE + HANDLE_OFFSET, MAX_HANDLE + HANDLE_OFFSET)));
    }

    /* Only index the table once the range check has passed. */
    bool is_open = compress_cxt->context[handle].used;
    if (!is_open) {
        ereport(ERROR, (errmsg("handle %d is not be used", handle + HANDLE_OFFSET)));
    }
}