Add initial support for deflate compression in maxavro
Maxavro now supports reading records with the zlib deflate algorithm. With this change, each data block is read into memory in one IO operation. This allows the library to decompress the data block if necessary. The avrorouter does not yet use compression when writing the records.
This commit is contained in:
parent
1d8d526a01
commit
c47ef968f7
119
avro/maxavro.c
119
avro/maxavro.c
@ -14,7 +14,7 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdbool.h>
|
||||
#include "maxavro.h"
|
||||
#include "maxavro_internal.h"
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <errno.h>
|
||||
|
||||
@ -37,6 +37,39 @@
|
||||
* @return True if value was read successfully
|
||||
*/
|
||||
bool maxavro_read_integer(MAXAVRO_FILE* file, uint64_t *dest)
|
||||
{
|
||||
uint64_t rval = 0;
|
||||
uint8_t nread = 0;
|
||||
uint8_t byte;
|
||||
do
|
||||
{
|
||||
if (nread >= MAX_INTEGER_SIZE)
|
||||
{
|
||||
file->last_error = MAXAVRO_ERR_VALUE_OVERFLOW;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (file->buffer_ptr < file->buffer_end)
|
||||
{
|
||||
byte = *file->buffer_ptr;
|
||||
file->buffer_ptr++;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
rval |= (uint64_t)(byte & 0x7f) << (nread++ * 7);
|
||||
}
|
||||
while (more_bytes(byte));
|
||||
|
||||
if (dest)
|
||||
{
|
||||
*dest = avro_decode(rval);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool maxavro_read_integer_from_file(MAXAVRO_FILE* file, uint64_t *dest)
|
||||
{
|
||||
uint64_t rval = 0;
|
||||
uint8_t nread = 0;
|
||||
@ -110,23 +143,12 @@ char* maxavro_read_string(MAXAVRO_FILE* file)
|
||||
|
||||
if (maxavro_read_integer(file, &len))
|
||||
{
|
||||
key = malloc(len + 1);
|
||||
key = MXS_MALLOC(len + 1);
|
||||
if (key)
|
||||
{
|
||||
size_t nread = fread(key, 1, len, file->file);
|
||||
if (nread == len)
|
||||
{
|
||||
key[len] = '\0';
|
||||
}
|
||||
else
|
||||
{
|
||||
if (nread != 0)
|
||||
{
|
||||
file->last_error = MAXAVRO_ERR_IO;
|
||||
}
|
||||
free(key);
|
||||
key = NULL;
|
||||
}
|
||||
memcpy(key, file->buffer_ptr, len);
|
||||
key[len] = '\0';
|
||||
file->buffer_ptr += len;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -149,14 +171,8 @@ bool maxavro_skip_string(MAXAVRO_FILE* file)
|
||||
|
||||
if (maxavro_read_integer(file, &len))
|
||||
{
|
||||
if (fseek(file->file, len, SEEK_CUR) != 0)
|
||||
{
|
||||
file->last_error = MAXAVRO_ERR_IO;
|
||||
}
|
||||
else
|
||||
{
|
||||
return true;
|
||||
}
|
||||
file->buffer_ptr += len;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
@ -186,13 +202,16 @@ uint64_t avro_length_string(const char* str)
|
||||
*/
|
||||
bool maxavro_read_float(MAXAVRO_FILE* file, float *dest)
|
||||
{
|
||||
size_t nread = fread(dest, 1, sizeof(*dest), file->file);
|
||||
if (nread != sizeof(*dest) && nread != 0)
|
||||
bool rval = false;
|
||||
|
||||
if (file->buffer_ptr + sizeof(*dest) < file->buffer_end)
|
||||
{
|
||||
file->last_error = MAXAVRO_ERR_IO;
|
||||
return false;
|
||||
memcpy(dest, file->buffer_ptr, sizeof(*dest));
|
||||
file->buffer_ptr += sizeof(*dest);
|
||||
rval = true;
|
||||
}
|
||||
return nread == sizeof(*dest);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -217,13 +236,16 @@ uint64_t avro_length_float(float val)
|
||||
*/
|
||||
bool maxavro_read_double(MAXAVRO_FILE* file, double *dest)
|
||||
{
|
||||
size_t nread = fread(dest, 1, sizeof(*dest), file->file);
|
||||
if (nread != sizeof(*dest) && nread != 0)
|
||||
bool rval = false;
|
||||
|
||||
if (file->buffer_ptr + sizeof(*dest) < file->buffer_end)
|
||||
{
|
||||
file->last_error = MAXAVRO_ERR_IO;
|
||||
return false;
|
||||
memcpy(dest, file->buffer_ptr, sizeof(*dest));
|
||||
file->buffer_ptr += sizeof(*dest);
|
||||
rval = true;
|
||||
}
|
||||
return nread == sizeof(*dest);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -246,13 +268,12 @@ uint64_t avro_length_double(double val)
|
||||
* @return A read map or NULL if an error occurred. The return value needs to be
|
||||
* freed with maxavro_map_free().
|
||||
*/
|
||||
MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file)
|
||||
MAXAVRO_MAP* maxavro_read_map_from_file(MAXAVRO_FILE *file)
|
||||
{
|
||||
|
||||
MAXAVRO_MAP* rval = NULL;
|
||||
uint64_t blocks;
|
||||
|
||||
if (!maxavro_read_integer(file, &blocks))
|
||||
if (!maxavro_read_integer_from_file(file, &blocks))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
@ -262,23 +283,29 @@ MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file)
|
||||
for (long i = 0; i < blocks; i++)
|
||||
{
|
||||
MAXAVRO_MAP* val = calloc(1, sizeof(MAXAVRO_MAP));
|
||||
if (val && (val->key = maxavro_read_string(file)) && (val->value = maxavro_read_string(file)))
|
||||
uint64_t keylen;
|
||||
uint64_t valuelen;
|
||||
|
||||
if (val && maxavro_read_integer_from_file(file, &keylen) &&
|
||||
(val->key = MXS_MALLOC(keylen + 1)) &&
|
||||
fread(val->key, 1, keylen, file->file) == keylen &&
|
||||
maxavro_read_integer_from_file(file, &valuelen) &&
|
||||
(val->value = MXS_MALLOC(valuelen + 1)) &&
|
||||
fread(val->value, 1, valuelen, file->file) == valuelen)
|
||||
{
|
||||
val->key[keylen] = '\0';
|
||||
val->value[valuelen] = '\0';
|
||||
val->next = rval;
|
||||
rval = val;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (val == NULL)
|
||||
{
|
||||
file->last_error = MAXAVRO_ERR_MEMORY;
|
||||
}
|
||||
maxavro_map_free(val);
|
||||
maxavro_map_free(rval);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
if (!maxavro_read_integer(file, &blocks))
|
||||
if (!maxavro_read_integer_from_file(file, &blocks))
|
||||
{
|
||||
maxavro_map_free(rval);
|
||||
return NULL;
|
||||
@ -299,9 +326,9 @@ void maxavro_map_free(MAXAVRO_MAP *value)
|
||||
{
|
||||
MAXAVRO_MAP* tmp = value;
|
||||
value = value->next;
|
||||
free(tmp->key);
|
||||
free(tmp->value);
|
||||
free(tmp);
|
||||
MXS_FREE(tmp->key);
|
||||
MXS_FREE(tmp->value);
|
||||
MXS_FREE(tmp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,13 @@ typedef struct
|
||||
size_t num_fields;
|
||||
} MAXAVRO_SCHEMA;
|
||||
|
||||
enum maxavro_codec
|
||||
{
|
||||
MAXAVRO_CODEC_NULL,
|
||||
MAXAVRO_CODEC_DEFLATE,
|
||||
MAXAVRO_CODEC_SNAPPY, /**< Not yet implemented */
|
||||
};
|
||||
|
||||
enum maxavro_error
|
||||
{
|
||||
MAXAVRO_ERR_NONE,
|
||||
@ -68,14 +75,17 @@ typedef struct
|
||||
FILE* file;
|
||||
char* filename; /*< The filename */
|
||||
MAXAVRO_SCHEMA* schema;
|
||||
enum maxavro_codec codec;
|
||||
uint64_t blocks_read; /*< Total number of data blocks read */
|
||||
uint64_t records_read; /*< Total number of records read */
|
||||
uint64_t bytes_read; /*< Total number of bytes read */
|
||||
uint64_t records_in_block;
|
||||
uint64_t records_read_from_block;
|
||||
uint64_t bytes_read_from_block;
|
||||
uint64_t block_size; /*< Size of the block in bytes */
|
||||
|
||||
uint64_t buffer_size; /*< Size of the block in bytes */
|
||||
uint8_t *buffer; /**< The uncompressed data */
|
||||
uint8_t *buffer_end; /**< The byte after the end of the buffer*/
|
||||
uint8_t *buffer_ptr; /**< Pointer to @c buffer which is moved as records are read */
|
||||
/** The position @c ftell returns before the first record is read */
|
||||
long header_end_pos;
|
||||
long data_start_pos;
|
||||
@ -124,48 +134,24 @@ typedef struct avro_map_value
|
||||
int blocks; /*< Number of added key-value blocks */
|
||||
} MAXAVRO_MAP;
|
||||
|
||||
/** Data block generation */
|
||||
MAXAVRO_DATABLOCK* maxavro_datablock_allocate(MAXAVRO_FILE *file, size_t buffersize);
|
||||
void maxavro_datablock_free(MAXAVRO_DATABLOCK* block);
|
||||
bool maxavro_datablock_finalize(MAXAVRO_DATABLOCK* block);
|
||||
/** Opening and closing files */
|
||||
MAXAVRO_FILE* maxavro_file_open(const char* filename);
|
||||
void maxavro_file_close(MAXAVRO_FILE *file);
|
||||
|
||||
/** Adding values to a datablock. The caller must ensure that the inserted
|
||||
* values conform to the file schema and that the required amount of fields
|
||||
* is added before finalizing the block. */
|
||||
bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val);
|
||||
bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str);
|
||||
bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val);
|
||||
bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val);
|
||||
|
||||
/** Reading primitives */
|
||||
bool maxavro_read_integer(MAXAVRO_FILE *file, uint64_t *val);
|
||||
char* maxavro_read_string(MAXAVRO_FILE *file);
|
||||
bool maxavro_skip_string(MAXAVRO_FILE* file);
|
||||
bool maxavro_read_float(MAXAVRO_FILE *file, float *dest);
|
||||
bool maxavro_read_double(MAXAVRO_FILE *file, double *dest);
|
||||
|
||||
/** Reading complex types */
|
||||
MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file);
|
||||
void maxavro_map_free(MAXAVRO_MAP *value);
|
||||
|
||||
/** Reading and seeking records */
|
||||
/** Reading records */
|
||||
json_t* maxavro_record_read_json(MAXAVRO_FILE *file);
|
||||
GWBUF* maxavro_record_read_binary(MAXAVRO_FILE *file);
|
||||
|
||||
/** Navigation of the file */
|
||||
bool maxavro_record_seek(MAXAVRO_FILE *file, uint64_t offset);
|
||||
bool maxavro_record_set_pos(MAXAVRO_FILE *file, long pos);
|
||||
bool maxavro_next_block(MAXAVRO_FILE *file);
|
||||
|
||||
/** File operations */
|
||||
MAXAVRO_FILE* maxavro_file_open(const char* filename);
|
||||
void maxavro_file_close(MAXAVRO_FILE *file);
|
||||
/** Get binary format header */
|
||||
GWBUF* maxavro_file_binary_header(MAXAVRO_FILE *file);
|
||||
|
||||
/** File error functions */
|
||||
enum maxavro_error maxavro_get_error(MAXAVRO_FILE *file);
|
||||
const char* maxavro_get_error_string(MAXAVRO_FILE *file);
|
||||
|
||||
/** Schema creation */
|
||||
MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json);
|
||||
void maxavro_schema_free(MAXAVRO_SCHEMA* schema);
|
||||
|
||||
#endif
|
||||
|
@ -53,8 +53,8 @@ void maxavro_datablock_free(MAXAVRO_DATABLOCK* block)
|
||||
{
|
||||
if (block)
|
||||
{
|
||||
free(block->buffer);
|
||||
free(block);
|
||||
MXS_FREE(block->buffer);
|
||||
MXS_FREE(block);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,10 +11,11 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "maxavro.h"
|
||||
#include "maxavro_internal.h"
|
||||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <zlib.h>
|
||||
|
||||
static bool maxavro_read_sync(FILE *file, uint8_t* sync)
|
||||
{
|
||||
@ -65,7 +66,7 @@ bool maxavro_verify_block(MAXAVRO_FILE *file)
|
||||
if (memcmp(file->sync, sync, SYNC_MARKER_SIZE))
|
||||
{
|
||||
long pos = ftell(file->file);
|
||||
long expected = file->data_start_pos + file->block_size + SYNC_MARKER_SIZE;
|
||||
long expected = file->data_start_pos + file->buffer_size + SYNC_MARKER_SIZE;
|
||||
if (pos != expected)
|
||||
{
|
||||
MXS_ERROR("Sync marker mismatch due to wrong file offset. file is at %ld "
|
||||
@ -80,37 +81,126 @@ bool maxavro_verify_block(MAXAVRO_FILE *file)
|
||||
|
||||
/** Increment block count */
|
||||
file->blocks_read++;
|
||||
file->bytes_read += file->block_size;
|
||||
file->bytes_read += file->buffer_size;
|
||||
return true;
|
||||
}
|
||||
|
||||
static uint8_t* read_block_data(MAXAVRO_FILE* file, long deflate_size)
|
||||
{
|
||||
uint8_t *temp_buffer = MXS_MALLOC(deflate_size);
|
||||
uint8_t *buffer = NULL;
|
||||
|
||||
if (temp_buffer && fread(temp_buffer, 1, deflate_size, file->file) == deflate_size)
|
||||
{
|
||||
unsigned long inflate_size = 0;
|
||||
|
||||
switch (file->codec)
|
||||
{
|
||||
case MAXAVRO_CODEC_NULL:
|
||||
file->buffer_size = deflate_size;
|
||||
buffer = temp_buffer;
|
||||
temp_buffer = NULL;
|
||||
break;
|
||||
|
||||
case MAXAVRO_CODEC_DEFLATE:
|
||||
inflate_size = deflate_size * 2;
|
||||
|
||||
if ((buffer = MXS_MALLOC(inflate_size)))
|
||||
{
|
||||
z_stream stream;
|
||||
stream.avail_in = deflate_size;
|
||||
stream.next_in = temp_buffer;
|
||||
stream.avail_out = inflate_size;
|
||||
stream.next_out = buffer;
|
||||
stream.zalloc = 0;
|
||||
stream.zfree = 0;
|
||||
inflateInit2(&stream, -15);
|
||||
|
||||
int rc;
|
||||
|
||||
while((rc = inflate(&stream, Z_FINISH)) == Z_BUF_ERROR)
|
||||
{
|
||||
int increment = inflate_size;
|
||||
uint8_t *temp = MXS_REALLOC(buffer, inflate_size + increment);
|
||||
|
||||
if (temp)
|
||||
{
|
||||
buffer = temp;
|
||||
stream.avail_out += increment;
|
||||
stream.next_out = buffer + stream.total_out;
|
||||
inflate_size += increment;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (rc == Z_STREAM_END)
|
||||
{
|
||||
file->buffer_size = stream.total_out;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to inflate value: %s", zError(rc));
|
||||
MXS_FREE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
inflateEnd(&stream);
|
||||
}
|
||||
break;
|
||||
|
||||
case MAXAVRO_CODEC_SNAPPY:
|
||||
// TODO: implement snappy compression
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
MXS_FREE(temp_buffer);
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
bool maxavro_read_datablock_start(MAXAVRO_FILE* file)
|
||||
{
|
||||
/** The actual start of the binary block */
|
||||
file->block_start_pos = ftell(file->file);
|
||||
file->metadata_read = false;
|
||||
uint64_t records, bytes;
|
||||
bool rval = maxavro_read_integer(file, &records) && maxavro_read_integer(file, &bytes);
|
||||
bool rval = maxavro_read_integer_from_file(file, &records) &&
|
||||
maxavro_read_integer_from_file(file, &bytes);
|
||||
|
||||
if (rval)
|
||||
{
|
||||
rval = false;
|
||||
long pos = ftell(file->file);
|
||||
|
||||
if (pos == -1)
|
||||
{
|
||||
rval = false;
|
||||
char err[MXS_STRERROR_BUFLEN];
|
||||
MXS_ERROR("Failed to read datablock start: %d, %s", errno,
|
||||
strerror_r(errno, err, sizeof(err)));
|
||||
}
|
||||
else
|
||||
{
|
||||
file->block_size = bytes;
|
||||
file->records_in_block = records;
|
||||
file->records_read_from_block = 0;
|
||||
file->data_start_pos = pos;
|
||||
ss_dassert(file->data_start_pos > file->block_start_pos);
|
||||
file->metadata_read = true;
|
||||
MXS_FREE(file->buffer);
|
||||
file->buffer = read_block_data(file, bytes);
|
||||
|
||||
if (file->buffer)
|
||||
{
|
||||
file->buffer_end = file->buffer + file->buffer_size;
|
||||
file->buffer_ptr = file->buffer;
|
||||
file->records_in_block = records;
|
||||
file->records_read_from_block = 0;
|
||||
file->data_start_pos = pos;
|
||||
ss_dassert(file->data_start_pos > file->block_start_pos);
|
||||
file->metadata_read = true;
|
||||
rval = maxavro_verify_block(file);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (maxavro_get_error(file) != MAXAVRO_ERR_NONE)
|
||||
@ -131,7 +221,7 @@ bool maxavro_read_datablock_start(MAXAVRO_FILE* file)
|
||||
static char* read_schema(MAXAVRO_FILE* file)
|
||||
{
|
||||
char *rval = NULL;
|
||||
MAXAVRO_MAP* head = maxavro_map_read(file);
|
||||
MAXAVRO_MAP* head = maxavro_read_map_from_file(file);
|
||||
MAXAVRO_MAP* map = head;
|
||||
|
||||
while (map)
|
||||
@ -139,7 +229,25 @@ static char* read_schema(MAXAVRO_FILE* file)
|
||||
if (strcmp(map->key, "avro.schema") == 0)
|
||||
{
|
||||
rval = strdup(map->value);
|
||||
break;
|
||||
}
|
||||
if (strcmp(map->key, "avro.codec") == 0)
|
||||
{
|
||||
if (strcmp(map->value, "null") == 0)
|
||||
{
|
||||
file->codec = MAXAVRO_CODEC_NULL;
|
||||
}
|
||||
else if (strcmp(map->value, "deflate") == 0)
|
||||
{
|
||||
file->codec = MAXAVRO_CODEC_DEFLATE;
|
||||
}
|
||||
else if (strcmp(map->value, "snappy") == 0)
|
||||
{
|
||||
file->codec = MAXAVRO_CODEC_SNAPPY;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Unknown Avro codec: %s", map->value);
|
||||
}
|
||||
}
|
||||
map = map->next;
|
||||
}
|
||||
@ -216,7 +324,7 @@ MAXAVRO_FILE* maxavro_file_open(const char* filename)
|
||||
maxavro_schema_free(avrofile->schema);
|
||||
error = true;
|
||||
}
|
||||
free(schema);
|
||||
MXS_FREE(schema);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -231,8 +339,8 @@ MAXAVRO_FILE* maxavro_file_open(const char* filename)
|
||||
if (error)
|
||||
{
|
||||
fclose(file);
|
||||
free(avrofile);
|
||||
free(my_filename);
|
||||
MXS_FREE(avrofile);
|
||||
MXS_FREE(my_filename);
|
||||
avrofile = NULL;
|
||||
}
|
||||
|
||||
@ -284,9 +392,10 @@ void maxavro_file_close(MAXAVRO_FILE *file)
|
||||
if (file)
|
||||
{
|
||||
fclose(file->file);
|
||||
free(file->filename);
|
||||
MXS_FREE(file->buffer);
|
||||
MXS_FREE(file->filename);
|
||||
maxavro_schema_free(file->schema);
|
||||
free(file);
|
||||
MXS_FREE(file);
|
||||
}
|
||||
}
|
||||
|
||||
|
56
avro/maxavro_internal.h
Normal file
56
avro/maxavro_internal.h
Normal file
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "maxavro.h"
|
||||
#include <maxscale/alloc.h>
|
||||
|
||||
/**
|
||||
* Private header for maxavro
|
||||
*/
|
||||
|
||||
/** Reading primitives */
|
||||
bool maxavro_read_integer(MAXAVRO_FILE *file, uint64_t *val);
|
||||
char* maxavro_read_string(MAXAVRO_FILE *file);
|
||||
bool maxavro_skip_string(MAXAVRO_FILE* file);
|
||||
bool maxavro_read_float(MAXAVRO_FILE *file, float *dest);
|
||||
bool maxavro_read_double(MAXAVRO_FILE *file, double *dest);
|
||||
|
||||
/** Only used when opening the file */
|
||||
bool maxavro_read_integer_from_file(MAXAVRO_FILE *file, uint64_t *val);
|
||||
|
||||
/** Reading complex types */
|
||||
MAXAVRO_MAP* maxavro_read_map_from_file(MAXAVRO_FILE *file);
|
||||
void maxavro_map_free(MAXAVRO_MAP *value);
|
||||
|
||||
/**
|
||||
* The following functionality is not yet fully implemented
|
||||
*/
|
||||
|
||||
/** Schema creation */
|
||||
MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json);
|
||||
void maxavro_schema_free(MAXAVRO_SCHEMA* schema);
|
||||
|
||||
/** Data block generation */
|
||||
MAXAVRO_DATABLOCK* maxavro_datablock_allocate(MAXAVRO_FILE *file, size_t buffersize);
|
||||
void maxavro_datablock_free(MAXAVRO_DATABLOCK* block);
|
||||
bool maxavro_datablock_finalize(MAXAVRO_DATABLOCK* block);
|
||||
|
||||
/** Adding values to a datablock. The caller must ensure that the inserted
|
||||
* values conform to the file schema and that the required amount of fields
|
||||
* is added before finalizing the block. */
|
||||
bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val);
|
||||
bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str);
|
||||
bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val);
|
||||
bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val);
|
@ -12,7 +12,7 @@
|
||||
*/
|
||||
|
||||
#include <maxscale/cdefs.h>
|
||||
#include "maxavro.h"
|
||||
#include "maxavro_internal.h"
|
||||
#include <string.h>
|
||||
#include <maxscale/debug.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
@ -36,12 +36,11 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie
|
||||
switch (field->type)
|
||||
{
|
||||
case MAXAVRO_TYPE_BOOL:
|
||||
if (file->buffer_ptr < file->buffer_end)
|
||||
{
|
||||
int i = 0;
|
||||
if (fread(&i, 1, 1, file->file) == 1)
|
||||
{
|
||||
value = json_pack("b", i);
|
||||
}
|
||||
memcpy(&i, file->buffer_ptr++, 1);
|
||||
value = json_pack("b", i);
|
||||
}
|
||||
break;
|
||||
|
||||
@ -103,7 +102,7 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie
|
||||
if (str)
|
||||
{
|
||||
value = json_string(str);
|
||||
free(str);
|
||||
MXS_FREE(str);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -220,19 +219,7 @@ bool maxavro_next_block(MAXAVRO_FILE *file)
|
||||
{
|
||||
if (file->last_error == MAXAVRO_ERR_NONE)
|
||||
{
|
||||
if (file->records_read_from_block < file->records_in_block)
|
||||
{
|
||||
file->records_read += file->records_in_block - file->records_read_from_block;
|
||||
long curr_pos = ftell(file->file);
|
||||
long offset = (long) file->block_size - (curr_pos - file->data_start_pos);
|
||||
|
||||
if (offset > 0)
|
||||
{
|
||||
fseek(file->file, offset, SEEK_CUR);
|
||||
}
|
||||
}
|
||||
|
||||
return maxavro_verify_block(file) && maxavro_read_datablock_start(file);
|
||||
return maxavro_read_datablock_start(file);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -268,7 +255,7 @@ bool maxavro_record_seek(MAXAVRO_FILE *file, uint64_t offset)
|
||||
{
|
||||
/** Skip full blocks that don't have the position we want */
|
||||
offset -= file->records_in_block;
|
||||
fseek(file->file, file->block_size, SEEK_CUR);
|
||||
fseek(file->file, file->buffer_size, SEEK_CUR);
|
||||
maxavro_next_block(file);
|
||||
}
|
||||
|
||||
@ -321,7 +308,7 @@ GWBUF* maxavro_record_read_binary(MAXAVRO_FILE *file)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
long data_size = (file->data_start_pos - file->block_start_pos) + file->block_size;
|
||||
long data_size = (file->data_start_pos - file->block_start_pos) + file->buffer_size;
|
||||
ss_dassert(data_size > 0);
|
||||
rval = gwbuf_alloc(data_size + SYNC_MARKER_SIZE);
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "maxavro.h"
|
||||
#include "maxavro_internal.h"
|
||||
#include <jansson.h>
|
||||
#include <string.h>
|
||||
#include <maxscale/debug.h>
|
||||
@ -152,7 +152,7 @@ MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json)
|
||||
|
||||
for (int j = 0; j < i; j++)
|
||||
{
|
||||
free(rval->fields[j].name);
|
||||
MXS_FREE(rval->fields[j].name);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -174,7 +174,7 @@ MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json)
|
||||
|
||||
if (error)
|
||||
{
|
||||
free(rval);
|
||||
MXS_FREE(rval);
|
||||
rval = NULL;
|
||||
}
|
||||
}
|
||||
@ -190,7 +190,7 @@ static void maxavro_schema_field_free(MAXAVRO_SCHEMA_FIELD *field)
|
||||
{
|
||||
if (field)
|
||||
{
|
||||
free(field->name);
|
||||
MXS_FREE(field->name);
|
||||
if (field->type == MAXAVRO_TYPE_ENUM)
|
||||
{
|
||||
json_decref((json_t*)field->extra);
|
||||
@ -210,7 +210,7 @@ void maxavro_schema_free(MAXAVRO_SCHEMA* schema)
|
||||
{
|
||||
maxavro_schema_field_free(&schema->fields[i]);
|
||||
}
|
||||
free(schema->fields);
|
||||
free(schema);
|
||||
MXS_FREE(schema->fields);
|
||||
MXS_FREE(schema);
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdbool.h>
|
||||
#include "maxavro.h"
|
||||
#include "maxavro_internal.h"
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <errno.h>
|
||||
|
||||
|
@ -89,7 +89,7 @@ int check_file(const char* filename)
|
||||
if (verbose && !dump)
|
||||
{
|
||||
printf("Block %lu: %lu records, %lu bytes\n", file->blocks_read,
|
||||
file->records_in_block, file->block_size);
|
||||
file->records_in_block, file->buffer_size);
|
||||
}
|
||||
}
|
||||
while (num_rows != 0 && maxavro_next_block(file));
|
||||
|
@ -615,7 +615,7 @@ static bool stream_json(AVRO_CLIENT *client)
|
||||
set_current_gtid(client, row);
|
||||
json_decref(row);
|
||||
}
|
||||
bytes += file->block_size;
|
||||
bytes += file->buffer_size;
|
||||
}
|
||||
while (maxavro_next_block(file) && bytes < AVRO_DATA_BURST_SIZE);
|
||||
|
||||
@ -639,7 +639,7 @@ static bool stream_binary(AVRO_CLIENT *client)
|
||||
|
||||
while (rc > 0 && bytes < AVRO_DATA_BURST_SIZE)
|
||||
{
|
||||
bytes += file->block_size;
|
||||
bytes += file->buffer_size;
|
||||
if ((buffer = maxavro_record_read_binary(file)))
|
||||
{
|
||||
rc = dcb->func.write(dcb, buffer);
|
||||
|
Loading…
x
Reference in New Issue
Block a user