Add workaround for null value handling in Avro C API

The Avro C API fails to write bytes of size zero. A workaround is to write
a single zero byte for each NULL field of type bytes.

Also added an option to configure the Avro block size in case very large
records are written.
This commit is contained in:
Markus Mäkelä 2017-03-04 10:08:52 +02:00
parent 09df0acb00
commit f2fc9b9d9f
5 changed files with 38 additions and 9 deletions

View File

@ -142,6 +142,11 @@ data block. The default value is 1 transaction.
Controls the number of row events that are grouped into a single Avro
data block. The default value is 1000 row events.
#### `block_size`
The Avro data block size in bytes. The default is 16 kilobytes. Increase this
value if individual events in the binary logs are very large.
# Files Created by the Avrorouter
The avrorouter creates two files in the location pointed by _avrodir_:

View File

@ -261,6 +261,7 @@ typedef struct avro_instance
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */
uint64_t block_size; /**< Avro datablock size */
struct avro_instance *next;
} AVRO_INSTANCE;
@ -278,7 +279,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
extern void avro_close_binlog(int fd);
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size);
extern void* avro_table_free(AVRO_TABLE *table);
extern void avro_flush_all_tables(AVRO_INSTANCE *router);
extern char* json_new_schema_from_table(TABLE_MAP *map);

View File

@ -332,6 +332,7 @@ createInstance(SERVICE *service, char **options)
inst->trx_count = 0;
inst->row_target = AVRO_DEFAULT_BLOCK_ROW_COUNT;
inst->trx_target = AVRO_DEFAULT_BLOCK_TRX_COUNT;
inst->block_size = 0;
int first_file = 1;
bool err = false;
@ -402,6 +403,10 @@ createInstance(SERVICE *service, char **options)
{
first_file = MAX(1, atoi(value));
}
else if (strcmp(options[i], "block_size") == 0)
{
inst->block_size = atoi(value);
}
else
{
MXS_WARNING("[avrorouter] Unknown router option: '%s'", options[i]);

View File

@ -105,7 +105,7 @@ void avro_close_binlog(int fd)
* @param filepath Path to the created file
* @param json_schema The schema of the table in JSON format
*/
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size)
{
AVRO_TABLE *table = calloc(1, sizeof(AVRO_TABLE));
if (table)
@ -126,7 +126,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
}
else
{
rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file);
rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, &table->avro_file, "null", block_size);
}
if (rc)

View File

@ -104,7 +104,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
/** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size);
if (avro_table)
{
@ -296,7 +296,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record);
if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
/** Update rows events have the before and after images of the
* affected rows so we'll process them as another record with
@ -305,7 +309,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record);
if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
}
rows++;
@ -501,14 +509,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
{
ss_dassert(create->columns == map->columns);
avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0);
if (bit_is_set(columns_present, ncolumns, i))
{
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
avro_value_set_null(&field);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
else
{
avro_value_set_null(&field);
}
}
else if (column_is_fixed_string(map->column_types[i]))
{
@ -604,7 +621,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
}
else
{
avro_value_set_null(&field);
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
ss_dassert(ptr < end);
}