From f2fc9b9d9f32596364f1eda639e9f837d8a9f050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 4 Mar 2017 10:08:52 +0200 Subject: [PATCH] Add workaround for null value handling in Avro C API The Avro C API fails to write bytes of size zero. A workaround is to write a single zero byte for each NULL field of type bytes. Also added an option to configure the Avro block size in case very large records are written. --- Documentation/Routers/Avrorouter.md | 5 +++++ server/modules/include/avrorouter.h | 3 ++- server/modules/routing/avro/avro.c | 5 +++++ server/modules/routing/avro/avro_file.c | 4 ++-- server/modules/routing/avro/avro_rbr.c | 30 ++++++++++++++++++++----- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/Documentation/Routers/Avrorouter.md b/Documentation/Routers/Avrorouter.md index f9e376ab7..ad58d3105 100644 --- a/Documentation/Routers/Avrorouter.md +++ b/Documentation/Routers/Avrorouter.md @@ -142,6 +142,11 @@ data block. The default value is 1 transaction. Controls the number of row events that are grouped into a single Avro data block. The default value is 1000 row events. +#### `block_size` + +The Avro data block size in bytes. The default is 16 kilobytes. Increase this +value if individual events in the binary logs are very large. + # Files Created by the Avrorouter The avrorouter creates two files in the location pointed by _avrodir_: diff --git a/server/modules/include/avrorouter.h b/server/modules/include/avrorouter.h index b853ca57e..a233915b4 100644 --- a/server/modules/include/avrorouter.h +++ b/server/modules/include/avrorouter.h @@ -261,6 +261,7 @@ typedef struct avro_instance uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ + uint64_t block_size; /**< Avro datablock size */ struct avro_instance *next; } AVRO_INSTANCE; @@ -278,7 +279,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8 extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); -extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema); +extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size); extern void* avro_table_free(AVRO_TABLE *table); extern void avro_flush_all_tables(AVRO_INSTANCE *router); extern char* json_new_schema_from_table(TABLE_MAP *map); diff --git a/server/modules/routing/avro/avro.c b/server/modules/routing/avro/avro.c index 8a7cfd65b..3306b4052 100644 --- a/server/modules/routing/avro/avro.c +++ b/server/modules/routing/avro/avro.c @@ -332,6 +332,7 @@ createInstance(SERVICE *service, char **options) inst->trx_count = 0; inst->row_target = AVRO_DEFAULT_BLOCK_ROW_COUNT; inst->trx_target = AVRO_DEFAULT_BLOCK_TRX_COUNT; + inst->block_size = 0; int first_file = 1; bool err = false; @@ -402,6 +403,10 @@ createInstance(SERVICE *service, char **options) { first_file = MAX(1, atoi(value)); } + else if (strcmp(options[i], "block_size") == 0) + { + inst->block_size = atoi(value); + } else { MXS_WARNING("[avrorouter] Unknown router option: '%s'", options[i]); diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index cfa9e66cd..41ef968f3 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -105,7 +105,7 @@ void avro_close_binlog(int fd) * @param filepath Path to the created file * @param json_schema The schema of the table in JSON format */ -AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) +AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size) { AVRO_TABLE *table = calloc(1, sizeof(AVRO_TABLE)); if (table) @@ -126,7 +126,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) } else { - rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file); + rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, &table->avro_file, "null", block_size); } if (rc) diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index 7390a6dc0..feb4058ff 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -104,7 +104,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr /** Close the file and open a new one */ hashtable_delete(router->open_tables, table_ident); - AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema); + AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size); if (avro_table) { @@ -296,7 +296,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) int event_type = get_event_type(hdr->event_type); prepare_record(router, hdr, event_type, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); - avro_file_writer_append_value(table->avro_file, &record); + if (avro_file_writer_append_value(table->avro_file, &record)) + { + MXS_ERROR("Failed to write value at position %ld: %s", + router->current_pos, avro_strerror()); + } /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with @@ -305,7 +309,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) { prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); - avro_file_writer_append_value(table->avro_file, &record); + if (avro_file_writer_append_value(table->avro_file, &record)) + { + MXS_ERROR("Failed to write value at position %ld: %s", + router->current_pos, avro_strerror()); + } } rows++; @@ -501,14 +509,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value for (long i = 0; i < map->columns && npresent < ncolumns; i++) { ss_dassert(create->columns == map->columns); - avro_value_get_by_name(record, create->column_names[i], &field, NULL); + ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); + ss_dassert(rc == 0); if (bit_is_set(columns_present, ncolumns, i)) { npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { - avro_value_set_null(&field); + if (column_is_blob(map->column_types[i])) + { + uint8_t nullvalue = 0; + avro_value_set_bytes(&field, &nullvalue, 1); + } + else + { + avro_value_set_null(&field); + } } else if (column_is_fixed_string(map->column_types[i])) { @@ -604,7 +621,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value } else { - avro_value_set_null(&field); + uint8_t nullvalue = 0; + avro_value_set_bytes(&field, &nullvalue, 1); } ss_dassert(ptr < end); }