401 lines
14 KiB
C++
401 lines
14 KiB
C++
#include <cstdlib>
|
|
#include <zlib.h>
|
|
#include "postgres.h"
|
|
#include "knl/knl_variable.h"
|
|
#include "commands/extension.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/varbit.h"
|
|
#include "utils/memutils.h"
|
|
|
|
#include "gms_compress.h"
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
#define MIN_QUALITY 1
|
|
#define MAX_QUALITY 9
|
|
#define MIN_HANDLE 0
|
|
#define MAX_HANDLE 4
|
|
#define HANDLE_OFFSET 1
|
|
#define UNCOMPRESS_LEVEL 0
|
|
|
|
PG_FUNCTION_INFO_V1(gms_lz_compress);
|
|
PG_FUNCTION_INFO_V1(gms_lz_uncompress);
|
|
PG_FUNCTION_INFO_V1(gms_lz_compress_open);
|
|
PG_FUNCTION_INFO_V1(gms_lz_compress_close);
|
|
PG_FUNCTION_INFO_V1(gms_lz_compress_add);
|
|
PG_FUNCTION_INFO_V1(gms_lz_uncompress_open);
|
|
PG_FUNCTION_INFO_V1(gms_lz_uncompress_close);
|
|
PG_FUNCTION_INFO_V1(gms_lz_uncompress_extract);
|
|
PG_FUNCTION_INFO_V1(gms_isopen);
|
|
|
|
static void gzip_compress(void* src, Size src_len, void** dst, Size* dst_len, int quality);
|
|
static void gzip_uncompress(void* src, Size src_len, void** dst, Size* dst_len);
|
|
static void free_context(int handle);
|
|
static inline void Check_Invalid_Input(gms_compress_context* compress_cxt, int handle);
|
|
|
|
static uint32 compress_index;
|
|
|
|
/*
 * Record which entry of the per-session extension variable array belongs to
 * this module; init_session_vars() and get_session_context() index with it.
 */
void set_extension_index(uint32 index) { compress_index = index; }
|
|
|
|
void init_session_vars(void) {
|
|
RepallocSessionVarsArrayIfNecessary();
|
|
gms_compress_context* psc =
|
|
(gms_compress_context*)MemoryContextAlloc(u_sess->self_mem_cxt, sizeof(gms_compress_context));
|
|
u_sess->attr.attr_common.extension_session_vars_array[compress_index] = psc;
|
|
|
|
for (int i =0;i < UTLCOMP_MAX_HANDLE;i++) {
|
|
psc->context[i].compress_level = -1;
|
|
psc->context[i].compressed_data = NULL;
|
|
psc->context[i].uncompressed_data = NULL;
|
|
psc->context[i].used = false;
|
|
}
|
|
}
|
|
|
|
/*
 * Return this session's handle table, creating it on first use.
 *
 * The array slot is read again after init_session_vars() rather than cached,
 * since init presumably may reallocate the session-variable array
 * (RepallocSessionVarsArrayIfNecessary).
 */
gms_compress_context* get_session_context()
{
    if (u_sess->attr.attr_common.extension_session_vars_array[compress_index] != NULL) {
        return (gms_compress_context*)u_sess->attr.attr_common.extension_session_vars_array[compress_index];
    }

    init_session_vars();
    return (gms_compress_context*)u_sess->attr.attr_common.extension_session_vars_array[compress_index];
}
|
|
|
|
/*
 * gms_lz_compress(src bytea, quality int) -> bytea
 *
 * One-shot gzip compression of src. The quality range is validated inside
 * gzip_compress(). Raises ERROR when src is NULL.
 */
Datum gms_lz_compress(PG_FUNCTION_ARGS)
{
    if (PG_ARGISNULL(0)) {
        ereport(ERROR, (errmsg("compressed data cannot be NULL")));
    }

    bytea *source = PG_GETARG_BYTEA_PP(0);
    int level = PG_GETARG_INT32(1);
    void *compressed = NULL;
    Size compressed_len = 0;

    gzip_compress(source, VARSIZE_ANY_EXHDR(source), &compressed, &compressed_len, level);

    /* gzip_compress leaves the header at the buffer capacity; shrink to the real output. */
    bytea *result = (bytea *)compressed;
    SET_VARSIZE(result, VARHDRSZ + compressed_len);
    PG_RETURN_BYTEA_P(result);
}
|
|
|
|
/*
 * gms_lz_uncompress(src bytea) -> bytea
 *
 * One-shot decompression of gzip-format data such as gms_lz_compress
 * produces. Raises ERROR when src is NULL, or (from gzip_uncompress) when the
 * data is too short, lacks the gzip magic bytes, or fails to inflate.
 */
Datum gms_lz_uncompress(PG_FUNCTION_ARGS)
{
    if (PG_ARGISNULL(0)) {
        ereport(ERROR, (errmsg("uncompressed data cannot be NULL")));
    }

    bytea *input_bytea = PG_GETARG_BYTEA_PP(0);
    Size src_len = VARSIZE_ANY_EXHDR(input_bytea);
    bytea *dst = NULL;
    Size dst_len = 0;

    gzip_uncompress(input_bytea, src_len, (void**)&dst, &dst_len);

    SET_VARSIZE(dst, VARHDRSZ + dst_len);
    /* The result is binary data, so return it as bytea (not through the text macro). */
    PG_RETURN_BYTEA_P(dst);
}
|
|
|
|
/* Open a handle and initialize it */
|
|
Datum gms_lz_compress_open(PG_FUNCTION_ARGS)
|
|
{
|
|
bytea *input_bytea = PG_GETARG_BYTEA_PP(0);
|
|
int quality = PG_GETARG_INT32(1);
|
|
|
|
if (quality < MIN_QUALITY || quality > MAX_QUALITY) {
|
|
ereport(ERROR, (errmsg("compression quality must be within the range of %d to %d", MIN_QUALITY, MAX_QUALITY)));
|
|
}
|
|
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
for(int i = 0;i < UTLCOMP_MAX_HANDLE;i++) {
|
|
if (!compress_cxt->context[i].used) {
|
|
compress_cxt->context[i].compress_level = quality;
|
|
compress_cxt->context[i].used = true;
|
|
PG_RETURN_INT32(i + HANDLE_OFFSET);
|
|
}
|
|
}
|
|
|
|
ereport(ERROR, (errmsg("no handle free, the maximum number of handles is %d", MAX_HANDLE + HANDLE_OFFSET)));
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
/* Close a handle and compress the data back */
|
|
Datum gms_lz_compress_close(PG_FUNCTION_ARGS)
|
|
{
|
|
int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
|
|
Check_Invalid_Input(compress_cxt, handle);
|
|
|
|
if (compress_cxt->context[handle].uncompressed_data == NULL) {
|
|
free_context(handle);
|
|
PG_RETURN_NULL();
|
|
}
|
|
|
|
if (compress_cxt->context[handle].compress_level == UNCOMPRESS_LEVEL) {
|
|
ereport(ERROR, (errmsg("handle %d is a uncompressed handle", handle + HANDLE_OFFSET)));
|
|
}
|
|
|
|
bytea *input_bytea = (bytea *)compress_cxt->context[handle].uncompressed_data;
|
|
Assert((char*)input_bytea + VARHDRSZ == VARDATA_ANY(input_bytea));
|
|
Size src_len = VARSIZE_ANY_EXHDR(input_bytea);
|
|
if (src_len > MaxAllocSize - VARHDRSZ) {
|
|
free_context(handle);
|
|
ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
|
|
}
|
|
|
|
bytea *dst = NULL;
|
|
Size dst_len = 0;
|
|
int quality = compress_cxt->context[handle].compress_level;
|
|
/* Call gzip_compress function for compression */
|
|
gzip_compress(input_bytea, src_len, (void**)&dst, &dst_len, quality);
|
|
|
|
SET_VARSIZE(dst, VARHDRSZ + dst_len);
|
|
|
|
free_context(handle);
|
|
PG_RETURN_BYTEA_P(dst);
|
|
}
|
|
|
|
/* Open a handle and store data into it */
|
|
Datum gms_lz_compress_add(PG_FUNCTION_ARGS)
|
|
{
|
|
int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
bytea *input_bytea = PG_GETARG_BYTEA_PP(1);
|
|
Size src_len = VARSIZE_ANY_EXHDR(input_bytea);
|
|
|
|
Check_Invalid_Input(compress_cxt, handle);
|
|
|
|
if (compress_cxt->context[handle].compress_level == UNCOMPRESS_LEVEL) {
|
|
ereport(ERROR, (errmsg("handle %d is a uncompressed handle", handle + HANDLE_OFFSET)));
|
|
}
|
|
|
|
if (src_len > MaxAllocSize - VARHDRSZ) {
|
|
ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
|
|
}
|
|
|
|
bytea *new_data = (bytea *)compress_cxt->context[handle].uncompressed_data;
|
|
if (new_data == NULL) {
|
|
new_data = (bytea *)MemoryContextAlloc(u_sess->self_mem_cxt, src_len + VARHDRSZ);
|
|
SET_VARSIZE(new_data, src_len + VARHDRSZ);
|
|
Assert((char*)new_data + VARHDRSZ == VARDATA_ANY(new_data));
|
|
errno_t rc = memcpy_s(VARDATA(new_data), src_len, VARDATA(input_bytea), src_len);
|
|
securec_check(rc, "\0", "\0");
|
|
compress_cxt->context[handle].uncompressed_data = new_data;
|
|
} else {
|
|
Size dst_len = VARSIZE_ANY_EXHDR(new_data);
|
|
Size uncompressed_size = src_len + dst_len + VARHDRSZ;
|
|
if (uncompressed_size > MaxAllocSize) {
|
|
ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
|
|
}
|
|
new_data = (bytea *)repalloc(new_data, uncompressed_size);
|
|
SET_VARSIZE(new_data, uncompressed_size);
|
|
Assert((char*)new_data + VARHDRSZ == VARDATA_ANY(new_data));
|
|
errno_t rc = memcpy_s(VARDATA(new_data) + dst_len, src_len, VARDATA(input_bytea), src_len);
|
|
securec_check(rc, "\0", "\0");
|
|
compress_cxt->context[handle].uncompressed_data = new_data;
|
|
}
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
/* Open a handle and initialize it */
|
|
Datum gms_lz_uncompress_open(PG_FUNCTION_ARGS)
|
|
{
|
|
if (PG_ARGISNULL(0)) {
|
|
ereport(ERROR, (errmsg("uncompress_data can not be NULL")));
|
|
}
|
|
|
|
bytea *input_bytea = PG_GETARG_BYTEA_PP(0);
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
|
|
for(int i = 0;i < UTLCOMP_MAX_HANDLE;i++) {
|
|
if (!compress_cxt->context[i].used) {
|
|
compress_cxt->context[i].compressed_data = input_bytea;
|
|
compress_cxt->context[i].compress_level = UNCOMPRESS_LEVEL;
|
|
compress_cxt->context[i].used = true;
|
|
|
|
PG_RETURN_INT32(i + HANDLE_OFFSET);
|
|
}
|
|
}
|
|
|
|
ereport(ERROR, (errmsg("no handle free, the maximum number of handles is %d", MAX_HANDLE + HANDLE_OFFSET)));
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
/* Close and free the handle */
|
|
Datum gms_lz_uncompress_close(PG_FUNCTION_ARGS)
|
|
{
|
|
int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
|
|
Check_Invalid_Input(compress_cxt, handle);
|
|
|
|
if (compress_cxt->context[handle].compress_level != UNCOMPRESS_LEVEL) {
|
|
ereport(ERROR, (errmsg("handle %d is a compressed handle", handle + HANDLE_OFFSET)));
|
|
}
|
|
|
|
free_context(handle);
|
|
PG_RETURN_VOID();
|
|
}
|
|
|
|
/* Open a handle and uncompress the data back */
|
|
Datum gms_lz_uncompress_extract(PG_FUNCTION_ARGS)
|
|
{
|
|
int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
|
|
Check_Invalid_Input(compress_cxt, handle);
|
|
|
|
if (compress_cxt->context[handle].compressed_data == NULL) {
|
|
ereport(ERROR, (errmsg("no compressed data found in handle %d", handle + HANDLE_OFFSET)));
|
|
}
|
|
|
|
if (compress_cxt->context[handle].compress_level != UNCOMPRESS_LEVEL) {
|
|
ereport(ERROR, (errmsg("handle %d is a compressed handle", handle + HANDLE_OFFSET)));
|
|
}
|
|
|
|
bytea *src = (bytea *)compress_cxt->context[handle].compressed_data;
|
|
Size src_len = VARSIZE_ANY_EXHDR(src);
|
|
Size dst_len = 0;
|
|
bytea *dst = NULL;
|
|
|
|
compress_cxt->context[handle].compressed_data = NULL;
|
|
gzip_uncompress(src, src_len, (void**)&dst, &dst_len);
|
|
|
|
SET_VARSIZE(dst, VARHDRSZ + dst_len);
|
|
|
|
PG_RETURN_BYTEA_P(dst);
|
|
}
|
|
|
|
/* Check if a handle has been opened */
|
|
Datum gms_isopen(PG_FUNCTION_ARGS)
|
|
{
|
|
int handle = PG_GETARG_INT32(0) - HANDLE_OFFSET;
|
|
if (handle < MIN_HANDLE || handle > MAX_HANDLE) {
|
|
return false;
|
|
}
|
|
|
|
return get_session_context()->context[handle].used;
|
|
}
|
|
|
|
/* Using the zlib library and Lemoel Xiv algorithm to implement compression functionality */
|
|
static void gzip_compress(void* src, Size src_len, void** dst, Size* dst_len, int quality)
|
|
{
|
|
/* The compression quality is limited between 1 and 9 */
|
|
if (quality < MIN_QUALITY || quality > MAX_QUALITY) {
|
|
ereport(ERROR, (errmsg("compression quality must be within the range of %d to %d", MIN_QUALITY, MAX_QUALITY)));
|
|
}
|
|
|
|
bytea *input_bytea = (bytea*)src;
|
|
Size compressed_size = compressBound(src_len) + GZIP_COMPRESS_EXTRA_LENGTH;
|
|
bytea *result = (bytea*)palloc(VARHDRSZ + compressed_size);
|
|
SET_VARSIZE(result, VARHDRSZ + compressed_size);
|
|
|
|
z_stream c_stream;
|
|
c_stream.zalloc = NULL;
|
|
c_stream.zfree = NULL;
|
|
c_stream.opaque = NULL;
|
|
// MAX_WBITS + 16 for gzip
|
|
if (deflateInit2(&c_stream, quality, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
|
|
pfree_ext(result);
|
|
ereport(ERROR, (errmsg("zlib compression initialization failed")));
|
|
}
|
|
|
|
c_stream.avail_out = compressed_size; // output size
|
|
c_stream.next_out = (Bytef*)VARDATA_ANY(result); // output buffer
|
|
c_stream.avail_in = src_len; // input size
|
|
c_stream.next_in = (Bytef*)VARDATA_ANY(input_bytea); // input buffer
|
|
|
|
if (deflate(&c_stream, Z_FINISH) != Z_STREAM_END) {
|
|
pfree_ext(result);
|
|
ereport(ERROR, (errmsg("zlib compression failed")));
|
|
}
|
|
|
|
if (deflateEnd(&c_stream) != Z_OK) {
|
|
pfree_ext(result);
|
|
ereport(ERROR, (errmsg("zlib cleaning up compression stream failed")));
|
|
}
|
|
*dst_len = c_stream.total_out;
|
|
*dst = result;
|
|
}
|
|
|
|
/* Using the zlib library and Lemoel Xiv algorithm to implement uncompression functionality */
|
|
static void gzip_uncompress(void* src, Size src_len, void** dst, Size* dst_len)
|
|
{
|
|
bytea *input_bytea = (bytea*)src;
|
|
if (src_len < GZIP_MIN_LENGTH) {
|
|
ereport(ERROR, (errmsg("too small, minimum length of gzip format is %d bytes", GZIP_MIN_LENGTH)));
|
|
}
|
|
unsigned char *gzip_content = (unsigned char*)VARDATA_ANY(input_bytea);
|
|
if (gzip_content[0] != GZIP_HEADER_1 || gzip_content[1] != GZIP_HEADER_2) {
|
|
ereport(ERROR, (errmsg("data corrupt, invalid compressed data head")));
|
|
}
|
|
uint4 uncompressed_size = *(uint4*)(gzip_content + src_len - sizeof(int));
|
|
#ifdef WORDS_BIGENDIAN
|
|
uncompressed_size = BSWAP32(uncompressed_size);
|
|
#endif
|
|
if (uncompressed_size > MaxAllocSize - VARHDRSZ) {
|
|
ereport(ERROR, (errmsg("data too long, data size cannot exceed 1GB")));
|
|
}
|
|
bytea *result = (bytea*)palloc(VARHDRSZ + uncompressed_size);
|
|
SET_VARSIZE(result, VARHDRSZ + uncompressed_size);
|
|
|
|
z_stream d_stream = { 0 };
|
|
d_stream.zalloc = NULL;
|
|
d_stream.zfree = NULL;
|
|
d_stream.opaque = NULL;
|
|
d_stream.next_in = (Bytef*)VARDATA_ANY(input_bytea);
|
|
d_stream.avail_in = src_len;
|
|
d_stream.avail_out = VARSIZE_ANY_EXHDR(result);
|
|
d_stream.next_out = (Bytef*)VARDATA_ANY(result);
|
|
// MAX_WBITS + 16 for gzip
|
|
if (inflateInit2(&d_stream, 16 + MAX_WBITS) != Z_OK) {
|
|
pfree_ext(result);
|
|
ereport(ERROR, (errmsg("zlib uncompression initialization failed")));
|
|
}
|
|
if (inflate(&d_stream, Z_FINISH) != Z_STREAM_END) {
|
|
pfree_ext(result);
|
|
ereport(ERROR, (errmsg("zlib uncompression failed")));
|
|
}
|
|
|
|
if (inflateEnd(&d_stream) != Z_OK) {
|
|
pfree_ext(result);
|
|
ereport(ERROR, (errmsg("zlib cleaning up uncompression stream failed")));
|
|
}
|
|
*dst_len = d_stream.total_out;
|
|
*dst = result;
|
|
return;
|
|
}
|
|
|
|
/**
|
|
* Because compressed_data is the data source address and
|
|
* uncompressed_data is the newly created data address,
|
|
* it is necessary to point compressed_data to null and release uncompressed_data
|
|
*/
|
|
static void free_context(int handle)
|
|
{
|
|
if (handle < MIN_HANDLE || handle > MAX_HANDLE)
|
|
{
|
|
return;
|
|
}
|
|
gms_compress_context *compress_cxt = get_session_context();
|
|
compress_cxt->context[handle].compressed_data =NULL;
|
|
pfree_ext(compress_cxt->context[handle].uncompressed_data);
|
|
compress_cxt->context[handle].compress_level = -1;
|
|
compress_cxt->context[handle].used = false;
|
|
return;
|
|
}
|
|
|
|
/*
 * Raise ERROR unless `handle` (a 0-based slot index) lies within
 * MIN_HANDLE..MAX_HANDLE and its slot is in use. Messages report the
 * user-visible, 1-based handle numbers.
 */
static inline void Check_Invalid_Input(gms_compress_context* compress_cxt, int handle)
{
    if (handle < MIN_HANDLE || handle > MAX_HANDLE) {
        ereport(ERROR, (errmsg("invalid handle, it must be within the range of %d to %d",
            MIN_HANDLE + HANDLE_OFFSET, MAX_HANDLE + HANDLE_OFFSET)));
    }

    if (!compress_cxt->context[handle].used) {
        ereport(ERROR, (errmsg("handle %d is not in use", handle + HANDLE_OFFSET)));
    }
}
|