From c47ef968f7d004e291df4ee12e1abd642ad1398a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 21 Feb 2017 13:03:38 +0200 Subject: [PATCH] Add initial support for deflate compression in maxavro Maxavro now supports reading records with the zlib deflate algorithm. With this change, each data block is read into memory in one IO operation. This allows the library to decompress the data block if necessary. The avrorouter does not yet use compression when writing the records. --- avro/maxavro.c | 119 ++++++++------ avro/maxavro.h | 52 +++---- avro/maxavro_datablock.c | 4 +- avro/maxavro_file.c | 145 +++++++++++++++--- avro/maxavro_internal.h | 56 +++++++ avro/maxavro_record.c | 29 +--- avro/maxavro_schema.c | 12 +- avro/maxavro_write.c | 2 +- avro/maxavrocheck.c | 2 +- .../modules/routing/avrorouter/avro_client.c | 4 +- 10 files changed, 295 insertions(+), 130 deletions(-) create mode 100644 avro/maxavro_internal.h diff --git a/avro/maxavro.c b/avro/maxavro.c index cf8f9ca51..9dee71363 100644 --- a/avro/maxavro.c +++ b/avro/maxavro.c @@ -14,7 +14,7 @@ #include #include #include -#include "maxavro.h" +#include "maxavro_internal.h" #include #include @@ -37,6 +37,39 @@ * @return True if value was read successfully */ bool maxavro_read_integer(MAXAVRO_FILE* file, uint64_t *dest) +{ + uint64_t rval = 0; + uint8_t nread = 0; + uint8_t byte; + do + { + if (nread >= MAX_INTEGER_SIZE) + { + file->last_error = MAXAVRO_ERR_VALUE_OVERFLOW; + return false; + } + + if (file->buffer_ptr < file->buffer_end) + { + byte = *file->buffer_ptr; + file->buffer_ptr++; + } + else + { + return false; + } + rval |= (uint64_t)(byte & 0x7f) << (nread++ * 7); + } + while (more_bytes(byte)); + + if (dest) + { + *dest = avro_decode(rval); + } + return true; +} + +bool maxavro_read_integer_from_file(MAXAVRO_FILE* file, uint64_t *dest) { uint64_t rval = 0; uint8_t nread = 0; @@ -110,23 +143,12 @@ char* maxavro_read_string(MAXAVRO_FILE* file) if (maxavro_read_integer(file, &len)) { - key = malloc(len + 1); + key = MXS_MALLOC(len + 1); if (key) { - size_t nread = fread(key, 1, len, file->file); - if (nread == len) - { - key[len] = '\0'; - } - else - { - if (nread != 0) - { - file->last_error = MAXAVRO_ERR_IO; - } - free(key); - key = NULL; - } + memcpy(key, file->buffer_ptr, len); + key[len] = '\0'; + file->buffer_ptr += len; } else { @@ -149,14 +171,8 @@ bool maxavro_skip_string(MAXAVRO_FILE* file) if (maxavro_read_integer(file, &len)) { - if (fseek(file->file, len, SEEK_CUR) != 0) - { - file->last_error = MAXAVRO_ERR_IO; - } - else - { - return true; - } + file->buffer_ptr += len; + return true; } return false; @@ -186,13 +202,16 @@ uint64_t avro_length_string(const char* str) */ bool maxavro_read_float(MAXAVRO_FILE* file, float *dest) { - size_t nread = fread(dest, 1, sizeof(*dest), file->file); - if (nread != sizeof(*dest) && nread != 0) + bool rval = false; + + if (file->buffer_ptr + sizeof(*dest) < file->buffer_end) { - file->last_error = MAXAVRO_ERR_IO; - return false; + memcpy(dest, file->buffer_ptr, sizeof(*dest)); + file->buffer_ptr += sizeof(*dest); + rval = true; } - return nread == sizeof(*dest); + + return rval; } /** @@ -217,13 +236,16 @@ uint64_t avro_length_float(float val) */ bool maxavro_read_double(MAXAVRO_FILE* file, double *dest) { - size_t nread = fread(dest, 1, sizeof(*dest), file->file); - if (nread != sizeof(*dest) && nread != 0) + bool rval = false; + + if (file->buffer_ptr + sizeof(*dest) < file->buffer_end) { - file->last_error = MAXAVRO_ERR_IO; - return false; + memcpy(dest, file->buffer_ptr, sizeof(*dest)); + file->buffer_ptr += sizeof(*dest); + rval = true; } - return nread == sizeof(*dest); + + return rval; } /** @@ -246,13 +268,12 @@ uint64_t avro_length_double(double val) * @return A read map or NULL if an error occurred. The return value needs to be * freed with maxavro_map_free(). */ -MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file) +MAXAVRO_MAP* maxavro_read_map_from_file(MAXAVRO_FILE *file) { - MAXAVRO_MAP* rval = NULL; uint64_t blocks; - if (!maxavro_read_integer(file, &blocks)) + if (!maxavro_read_integer_from_file(file, &blocks)) { return NULL; } @@ -262,23 +283,29 @@ MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file) for (long i = 0; i < blocks; i++) { MAXAVRO_MAP* val = calloc(1, sizeof(MAXAVRO_MAP)); - if (val && (val->key = maxavro_read_string(file)) && (val->value = maxavro_read_string(file))) + uint64_t keylen; + uint64_t valuelen; + + if (val && maxavro_read_integer_from_file(file, &keylen) && + (val->key = MXS_MALLOC(keylen + 1)) && + fread(val->key, 1, keylen, file->file) == keylen && + maxavro_read_integer_from_file(file, &valuelen) && + (val->value = MXS_MALLOC(valuelen + 1)) && + fread(val->value, 1, valuelen, file->file) == valuelen) { + val->key[keylen] = '\0'; + val->value[valuelen] = '\0'; val->next = rval; rval = val; } else { - if (val == NULL) - { - file->last_error = MAXAVRO_ERR_MEMORY; - } maxavro_map_free(val); maxavro_map_free(rval); return NULL; } } - if (!maxavro_read_integer(file, &blocks)) + if (!maxavro_read_integer_from_file(file, &blocks)) { maxavro_map_free(rval); return NULL; @@ -299,9 +326,9 @@ void maxavro_map_free(MAXAVRO_MAP *value) { MAXAVRO_MAP* tmp = value; value = value->next; - free(tmp->key); - free(tmp->value); - free(tmp); + MXS_FREE(tmp->key); + MXS_FREE(tmp->value); + MXS_FREE(tmp); } } diff --git a/avro/maxavro.h b/avro/maxavro.h index 83ea63ea9..00a9649b9 100644 --- a/avro/maxavro.h +++ b/avro/maxavro.h @@ -55,6 +55,13 @@ typedef struct size_t num_fields; } MAXAVRO_SCHEMA; +enum maxavro_codec +{ + MAXAVRO_CODEC_NULL, + MAXAVRO_CODEC_DEFLATE, + MAXAVRO_CODEC_SNAPPY, /**< Not yet implemented */ +}; + enum maxavro_error { MAXAVRO_ERR_NONE, @@ -68,14 +75,17 @@ typedef struct FILE* file; char* filename; /*< The filename */ MAXAVRO_SCHEMA* schema; + enum maxavro_codec codec; uint64_t blocks_read; /*< Total number of data blocks read */ uint64_t records_read; /*< Total number of records read */ uint64_t bytes_read; /*< Total number of bytes read */ uint64_t records_in_block; uint64_t records_read_from_block; uint64_t bytes_read_from_block; - uint64_t block_size; /*< Size of the block in bytes */ - + uint64_t buffer_size; /*< Size of the block in bytes */ + uint8_t *buffer; /**< The uncompressed data */ + uint8_t *buffer_end; /**< The byte after the end of the buffer*/ + uint8_t *buffer_ptr; /**< Pointer to @c buffer which is moved as records are read */ /** The position @c ftell returns before the first record is read */ long header_end_pos; long data_start_pos; @@ -124,48 +134,24 @@ typedef struct avro_map_value int blocks; /*< Number of added key-value blocks */ } MAXAVRO_MAP; -/** Data block generation */ -MAXAVRO_DATABLOCK* maxavro_datablock_allocate(MAXAVRO_FILE *file, size_t buffersize); -void maxavro_datablock_free(MAXAVRO_DATABLOCK* block); -bool maxavro_datablock_finalize(MAXAVRO_DATABLOCK* block); +/** Opening and closing files */ +MAXAVRO_FILE* maxavro_file_open(const char* filename); +void maxavro_file_close(MAXAVRO_FILE *file); -/** Adding values to a datablock. The caller must ensure that the inserted - * values conform to the file schema and that the required amount of fields - * is added before finalizing the block. */ -bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val); -bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str); -bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val); -bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val); - -/** Reading primitives */ -bool maxavro_read_integer(MAXAVRO_FILE *file, uint64_t *val); -char* maxavro_read_string(MAXAVRO_FILE *file); -bool maxavro_skip_string(MAXAVRO_FILE* file); -bool maxavro_read_float(MAXAVRO_FILE *file, float *dest); -bool maxavro_read_double(MAXAVRO_FILE *file, double *dest); - -/** Reading complex types */ -MAXAVRO_MAP* maxavro_map_read(MAXAVRO_FILE *file); -void maxavro_map_free(MAXAVRO_MAP *value); - -/** Reading and seeking records */ +/** Reading records */ json_t* maxavro_record_read_json(MAXAVRO_FILE *file); GWBUF* maxavro_record_read_binary(MAXAVRO_FILE *file); + +/** Navigation of the file */ bool maxavro_record_seek(MAXAVRO_FILE *file, uint64_t offset); bool maxavro_record_set_pos(MAXAVRO_FILE *file, long pos); bool maxavro_next_block(MAXAVRO_FILE *file); -/** File operations */ -MAXAVRO_FILE* maxavro_file_open(const char* filename); -void maxavro_file_close(MAXAVRO_FILE *file); +/** Get binary format header */ GWBUF* maxavro_file_binary_header(MAXAVRO_FILE *file); /** File error functions */ enum maxavro_error maxavro_get_error(MAXAVRO_FILE *file); const char* maxavro_get_error_string(MAXAVRO_FILE *file); -/** Schema creation */ -MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json); -void maxavro_schema_free(MAXAVRO_SCHEMA* schema); - #endif diff --git a/avro/maxavro_datablock.c b/avro/maxavro_datablock.c index 7ddd1d8d5..1b21d2615 100644 --- a/avro/maxavro_datablock.c +++ b/avro/maxavro_datablock.c @@ -53,8 +53,8 @@ void maxavro_datablock_free(MAXAVRO_DATABLOCK* block) { if (block) { - free(block->buffer); - free(block); + MXS_FREE(block->buffer); + MXS_FREE(block); } } diff --git a/avro/maxavro_file.c b/avro/maxavro_file.c index fc45d2046..aabb5ba0c 100644 --- a/avro/maxavro_file.c +++ b/avro/maxavro_file.c @@ -11,10 +11,11 @@ * Public License. */ -#include "maxavro.h" +#include "maxavro_internal.h" #include #include #include +#include static bool maxavro_read_sync(FILE *file, uint8_t* sync) { @@ -65,7 +66,7 @@ bool maxavro_verify_block(MAXAVRO_FILE *file) if (memcmp(file->sync, sync, SYNC_MARKER_SIZE)) { long pos = ftell(file->file); - long expected = file->data_start_pos + file->block_size + SYNC_MARKER_SIZE; + long expected = file->data_start_pos + file->buffer_size + SYNC_MARKER_SIZE; if (pos != expected) { MXS_ERROR("Sync marker mismatch due to wrong file offset. file is at %ld " @@ -80,37 +81,126 @@ bool maxavro_verify_block(MAXAVRO_FILE *file) /** Increment block count */ file->blocks_read++; - file->bytes_read += file->block_size; + file->bytes_read += file->buffer_size; return true; } +static uint8_t* read_block_data(MAXAVRO_FILE* file, long deflate_size) +{ + uint8_t *temp_buffer = MXS_MALLOC(deflate_size); + uint8_t *buffer = NULL; + + if (temp_buffer && fread(temp_buffer, 1, deflate_size, file->file) == deflate_size) + { + unsigned long inflate_size = 0; + + switch (file->codec) + { + case MAXAVRO_CODEC_NULL: + file->buffer_size = deflate_size; + buffer = temp_buffer; + temp_buffer = NULL; + break; + + case MAXAVRO_CODEC_DEFLATE: + inflate_size = deflate_size * 2; + + if ((buffer = MXS_MALLOC(inflate_size))) + { + z_stream stream; + stream.avail_in = deflate_size; + stream.next_in = temp_buffer; + stream.avail_out = inflate_size; + stream.next_out = buffer; + stream.zalloc = 0; + stream.zfree = 0; + inflateInit2(&stream, -15); + + int rc; + + while((rc = inflate(&stream, Z_FINISH)) == Z_BUF_ERROR) + { + int increment = inflate_size; + uint8_t *temp = MXS_REALLOC(buffer, inflate_size + increment); + + if (temp) + { + buffer = temp; + stream.avail_out += increment; + stream.next_out = buffer + stream.total_out; + inflate_size += increment; + } + else + { + break; + } + } + + if (rc == Z_STREAM_END) + { + file->buffer_size = stream.total_out; + } + else + { + MXS_ERROR("Failed to inflate value: %s", zError(rc)); + MXS_FREE(buffer); + buffer = NULL; + } + + inflateEnd(&stream); + } + break; + + case MAXAVRO_CODEC_SNAPPY: + // TODO: implement snappy compression + break; + + default: + break; + } + + MXS_FREE(temp_buffer); + } + + return buffer; +} + bool maxavro_read_datablock_start(MAXAVRO_FILE* file) { /** The actual start of the binary block */ file->block_start_pos = ftell(file->file); file->metadata_read = false; uint64_t records, bytes; - bool rval = maxavro_read_integer(file, &records) && maxavro_read_integer(file, &bytes); + bool rval = maxavro_read_integer_from_file(file, &records) && + maxavro_read_integer_from_file(file, &bytes); if (rval) { + rval = false; long pos = ftell(file->file); if (pos == -1) { - rval = false; char err[MXS_STRERROR_BUFLEN]; MXS_ERROR("Failed to read datablock start: %d, %s", errno, strerror_r(errno, err, sizeof(err))); } else { - file->block_size = bytes; - file->records_in_block = records; - file->records_read_from_block = 0; - file->data_start_pos = pos; - ss_dassert(file->data_start_pos > file->block_start_pos); - file->metadata_read = true; + MXS_FREE(file->buffer); + file->buffer = read_block_data(file, bytes); + + if (file->buffer) + { + file->buffer_end = file->buffer + file->buffer_size; + file->buffer_ptr = file->buffer; + file->records_in_block = records; + file->records_read_from_block = 0; + file->data_start_pos = pos; + ss_dassert(file->data_start_pos > file->block_start_pos); + file->metadata_read = true; + rval = maxavro_verify_block(file); + } } } else if (maxavro_get_error(file) != MAXAVRO_ERR_NONE) @@ -131,7 +221,7 @@ bool maxavro_read_datablock_start(MAXAVRO_FILE* file) static char* read_schema(MAXAVRO_FILE* file) { char *rval = NULL; - MAXAVRO_MAP* head = maxavro_map_read(file); + MAXAVRO_MAP* head = maxavro_read_map_from_file(file); MAXAVRO_MAP* map = head; while (map) @@ -139,7 +229,25 @@ static char* read_schema(MAXAVRO_FILE* file) if (strcmp(map->key, "avro.schema") == 0) { rval = strdup(map->value); - break; + } + if (strcmp(map->key, "avro.codec") == 0) + { + if (strcmp(map->value, "null") == 0) + { + file->codec = MAXAVRO_CODEC_NULL; + } + else if (strcmp(map->value, "deflate") == 0) + { + file->codec = MAXAVRO_CODEC_DEFLATE; + } + else if (strcmp(map->value, "snappy") == 0) + { + file->codec = MAXAVRO_CODEC_SNAPPY; + } + else + { + MXS_ERROR("Unknown Avro codec: %s", map->value); + } } map = map->next; } @@ -216,7 +324,7 @@ MAXAVRO_FILE* maxavro_file_open(const char* filename) maxavro_schema_free(avrofile->schema); error = true; } - free(schema); + MXS_FREE(schema); } else { @@ -231,8 +339,8 @@ MAXAVRO_FILE* maxavro_file_open(const char* filename) if (error) { fclose(file); - free(avrofile); - free(my_filename); + MXS_FREE(avrofile); + MXS_FREE(my_filename); avrofile = NULL; } @@ -284,9 +392,10 @@ void maxavro_file_close(MAXAVRO_FILE *file) if (file) { fclose(file->file); - free(file->filename); + MXS_FREE(file->buffer); + MXS_FREE(file->filename); maxavro_schema_free(file->schema); - free(file); + MXS_FREE(file); } } diff --git a/avro/maxavro_internal.h b/avro/maxavro_internal.h new file mode 100644 index 000000000..23c7b231d --- /dev/null +++ b/avro/maxavro_internal.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include "maxavro.h" +#include + +/** + * Private header for maxavro + */ + +/** Reading primitives */ +bool maxavro_read_integer(MAXAVRO_FILE *file, uint64_t *val); +char* maxavro_read_string(MAXAVRO_FILE *file); +bool maxavro_skip_string(MAXAVRO_FILE* file); +bool maxavro_read_float(MAXAVRO_FILE *file, float *dest); +bool maxavro_read_double(MAXAVRO_FILE *file, double *dest); + +/** Only used when opening the file */ +bool maxavro_read_integer_from_file(MAXAVRO_FILE *file, uint64_t *val); + +/** Reading complex types */ +MAXAVRO_MAP* maxavro_read_map_from_file(MAXAVRO_FILE *file); +void maxavro_map_free(MAXAVRO_MAP *value); + +/** + * The following functionality is not yet fully implemented + */ + +/** Schema creation */ +MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json); +void maxavro_schema_free(MAXAVRO_SCHEMA* schema); + +/** Data block generation */ +MAXAVRO_DATABLOCK* maxavro_datablock_allocate(MAXAVRO_FILE *file, size_t buffersize); +void maxavro_datablock_free(MAXAVRO_DATABLOCK* block); +bool maxavro_datablock_finalize(MAXAVRO_DATABLOCK* block); + +/** Adding values to a datablock. The caller must ensure that the inserted + * values conform to the file schema and that the required amount of fields + * is added before finalizing the block. */ +bool maxavro_datablock_add_integer(MAXAVRO_DATABLOCK *file, uint64_t val); +bool maxavro_datablock_add_string(MAXAVRO_DATABLOCK *file, const char* str); +bool maxavro_datablock_add_float(MAXAVRO_DATABLOCK *file, float val); +bool maxavro_datablock_add_double(MAXAVRO_DATABLOCK *file, double val); diff --git a/avro/maxavro_record.c b/avro/maxavro_record.c index fb5b6b5f3..586e3bcb8 100644 --- a/avro/maxavro_record.c +++ b/avro/maxavro_record.c @@ -12,7 +12,7 @@ */ #include -#include "maxavro.h" +#include "maxavro_internal.h" #include #include #include @@ -36,12 +36,11 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie switch (field->type) { case MAXAVRO_TYPE_BOOL: + if (file->buffer_ptr < file->buffer_end) { int i = 0; - if (fread(&i, 1, 1, file->file) == 1) - { - value = json_pack("b", i); - } + memcpy(&i, file->buffer_ptr++, 1); + value = json_pack("b", i); } break; @@ -103,7 +102,7 @@ static json_t* read_and_pack_value(MAXAVRO_FILE *file, MAXAVRO_SCHEMA_FIELD *fie if (str) { value = json_string(str); - free(str); + MXS_FREE(str); } } break; @@ -220,19 +219,7 @@ bool maxavro_next_block(MAXAVRO_FILE *file) { if (file->last_error == MAXAVRO_ERR_NONE) { - if (file->records_read_from_block < file->records_in_block) - { - file->records_read += file->records_in_block - file->records_read_from_block; - long curr_pos = ftell(file->file); - long offset = (long) file->block_size - (curr_pos - file->data_start_pos); - - if (offset > 0) - { - fseek(file->file, offset, SEEK_CUR); - } - } - - return maxavro_verify_block(file) && maxavro_read_datablock_start(file); + return maxavro_read_datablock_start(file); } return false; } @@ -268,7 +255,7 @@ bool maxavro_record_seek(MAXAVRO_FILE *file, uint64_t offset) { /** Skip full blocks that don't have the position we want */ offset -= file->records_in_block; - fseek(file->file, file->block_size, SEEK_CUR); + fseek(file->file, file->buffer_size, SEEK_CUR); maxavro_next_block(file); } @@ -321,7 +308,7 @@ GWBUF* maxavro_record_read_binary(MAXAVRO_FILE *file) return NULL; } - long data_size = (file->data_start_pos - file->block_start_pos) + file->block_size; + long data_size = (file->data_start_pos - file->block_start_pos) + file->buffer_size; ss_dassert(data_size > 0); rval = gwbuf_alloc(data_size + SYNC_MARKER_SIZE); diff --git a/avro/maxavro_schema.c b/avro/maxavro_schema.c index 1586b62ff..5bb6fa4a4 100644 --- a/avro/maxavro_schema.c +++ b/avro/maxavro_schema.c @@ -11,7 +11,7 @@ * Public License. */ -#include "maxavro.h" +#include "maxavro_internal.h" #include #include #include @@ -152,7 +152,7 @@ MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json) for (int j = 0; j < i; j++) { - free(rval->fields[j].name); + MXS_FREE(rval->fields[j].name); } break; } @@ -174,7 +174,7 @@ MAXAVRO_SCHEMA* maxavro_schema_alloc(const char* json) if (error) { - free(rval); + MXS_FREE(rval); rval = NULL; } } @@ -190,7 +190,7 @@ static void maxavro_schema_field_free(MAXAVRO_SCHEMA_FIELD *field) { if (field) { - free(field->name); + MXS_FREE(field->name); if (field->type == MAXAVRO_TYPE_ENUM) { json_decref((json_t*)field->extra); @@ -210,7 +210,7 @@ void maxavro_schema_free(MAXAVRO_SCHEMA* schema) { maxavro_schema_field_free(&schema->fields[i]); } - free(schema->fields); - free(schema); + MXS_FREE(schema->fields); + MXS_FREE(schema); } } diff --git a/avro/maxavro_write.c b/avro/maxavro_write.c index 9847915ec..1ea2abc4a 100644 --- a/avro/maxavro_write.c +++ b/avro/maxavro_write.c @@ -14,7 +14,7 @@ #include #include #include -#include "maxavro.h" +#include "maxavro_internal.h" #include #include diff --git a/avro/maxavrocheck.c b/avro/maxavrocheck.c index 3ddb94fad..191802515 100644 --- a/avro/maxavrocheck.c +++ b/avro/maxavrocheck.c @@ -89,7 +89,7 @@ int check_file(const char* filename) if (verbose && !dump) { printf("Block %lu: %lu records, %lu bytes\n", file->blocks_read, - file->records_in_block, file->block_size); + file->records_in_block, file->buffer_size); } } while (num_rows != 0 && maxavro_next_block(file)); diff --git a/server/modules/routing/avrorouter/avro_client.c b/server/modules/routing/avrorouter/avro_client.c index 9957524db..6bdc8490e 100644 --- a/server/modules/routing/avrorouter/avro_client.c +++ b/server/modules/routing/avrorouter/avro_client.c @@ -615,7 +615,7 @@ static bool stream_json(AVRO_CLIENT *client) set_current_gtid(client, row); json_decref(row); } - bytes += file->block_size; + bytes += file->buffer_size; } while (maxavro_next_block(file) && bytes < AVRO_DATA_BURST_SIZE); @@ -639,7 +639,7 @@ static bool stream_binary(AVRO_CLIENT *client) while (rc > 0 && bytes < AVRO_DATA_BURST_SIZE) { - bytes += file->block_size; + bytes += file->buffer_size; if ((buffer = maxavro_record_read_binary(file))) { rc = dcb->func.write(dcb, buffer);