From dadc0d6a9dd58216f878cb3520404e000c755379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 4 Mar 2017 00:26:51 +0200 Subject: [PATCH 1/3] Fix DATETIME and BLOB processing The old DATETIME format wasn't processed properly which caused a corruption of following events. A BLOB type value could be non-NULL but still have no data. In this case, the value should be stored as a null Avro value. --- server/core/mysql_binlog.c | 22 ++++++++++++++++------ server/modules/routing/avro/avro_rbr.c | 15 +++++++++++---- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 5e0539679..ddf5825c0 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -98,6 +98,7 @@ const char* column_type_to_string(uint8_t type) case TABLE_COL_TYPE_GEOMETRY: return "GEOMETRY"; default: + ss_dassert(false); break; } return "UNKNOWN"; @@ -216,7 +217,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) dest->tm_year = *ptr; } -#ifdef USE_OLD_DATETIME /** * @brief Unpack a DATETIME * @@ -225,8 +225,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) * @param val Value read from the binary log * @param dest Pointer where the unpacked value is stored */ -static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) +static void unpack_datetime(uint8_t *ptr, struct tm *dest) { + uint64_t val = 0; + memcpy(&val, ptr, sizeof(val)); uint32_t second = val - ((val / 100) * 100); val /= 100; uint32_t minute = val - ((val / 100) * 100); @@ -241,13 +243,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) memset(dest, 0, sizeof(struct tm)); dest->tm_year = year - 1900; - dest->tm_mon = month; + dest->tm_mon = month - 1; dest->tm_mday = day; dest->tm_hour = hour; dest->tm_min = minute; dest->tm_sec = second; } -#endif /** * Unpack a 5 byte reverse byte order value @@ -413,6 +414,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) return 3 + ((decimals + 1) / 2); case TABLE_COL_TYPE_DATETIME: + return 8; + case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -448,8 +451,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru break; case TABLE_COL_TYPE_DATETIME: - // This is not used with MariaDB RBR - //unpack_datetime(ptr, *metadata, tm); + unpack_datetime(ptr, tm); break; case TABLE_COL_TYPE_DATETIME2: @@ -468,6 +470,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru case TABLE_COL_TYPE_TIMESTAMP2: unpack_timestamp(ptr, *metadata, tm); break; + + default: + ss_dassert(false); + break; } return temporal_field_size(type, *metadata); } @@ -597,6 +603,10 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes) ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); break; + + default: + ss_dassert(false); + break; } return val; diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index 3ac3cf8d5..7390a6dc0 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -271,7 +271,6 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) /** There should always be a table map event prior to a row event. * TODO: Make the active_maps dynamic */ TABLE_MAP *map = router->active_maps[table_id % sizeof(router->active_maps)]; - ss_dassert(map); if (map) { @@ -289,10 +288,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { /** Add the current GTID and timestamp */ - uint8_t *end = ptr + hdr->event_size; + uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); prepare_record(router, hdr, event_type, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); @@ -597,8 +597,15 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; - avro_value_set_bytes(&field, ptr, len); - ptr += len; + if (len) + { + avro_value_set_bytes(&field, ptr, len); + ptr += len; + } + else + { + avro_value_set_null(&field); + } ss_dassert(ptr < end); } else if (column_is_temporal(map->column_types[i])) From 09df0acb008a8e1e3d1451b3b5294a45c9dae520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 4 Mar 2017 00:31:07 +0200 Subject: [PATCH 2/3] Fix binlog rotation detection The rotations of binlogs weren't detected as the file names weren't compared. Moved the indexing of the binlogs to the end of the binlog processing. This way the files can be flushed multiple times before they are indexed. --- server/modules/routing/avro/avro.c | 8 +++++++- server/modules/routing/avro/avro_file.c | 3 --- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/server/modules/routing/avro/avro.c b/server/modules/routing/avro/avro.c index 398549817..8a7cfd65b 100644 --- a/server/modules/routing/avro/avro.c +++ b/server/modules/routing/avro/avro.c @@ -1000,14 +1000,20 @@ void converter_func(void* data) while (ok && binlog_end == AVRO_OK) { uint64_t start_pos = router->current_pos; + char binlog_name[BINLOG_FNAMELEN + 1]; + strcpy(binlog_name, router->binlog_name); + if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd)) { binlog_end = avro_read_all_events(router); - if (router->current_pos != start_pos) + if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0) { /** We processed some data, reset the conversion task delay */ router->task_delay = 1; + + /** Update the GTID index */ + avro_update_index(router); } avro_close_binlog(router->binlog_fd); diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index c1d65b1b0..cfa9e66cd 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -867,9 +867,6 @@ void avro_flush_all_tables(AVRO_INSTANCE *router) } hashtable_iterator_free(iter); } - - /** Update the GTID index */ - avro_update_index(router); } /** From f2fc9b9d9f32596364f1eda639e9f837d8a9f050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 4 Mar 2017 10:08:52 +0200 Subject: [PATCH 3/3] Add workaround for null value handling in Avro C API The Avro C API fails to write bytes of size zero. A workaround is to write a single zero byte for each NULL field of type bytes. Also added an option to configure the Avro block size in case very large records are written. --- Documentation/Routers/Avrorouter.md | 5 +++++ server/modules/include/avrorouter.h | 3 ++- server/modules/routing/avro/avro.c | 5 +++++ server/modules/routing/avro/avro_file.c | 4 ++-- server/modules/routing/avro/avro_rbr.c | 30 ++++++++++++++++++++----- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/Documentation/Routers/Avrorouter.md b/Documentation/Routers/Avrorouter.md index f9e376ab7..ad58d3105 100644 --- a/Documentation/Routers/Avrorouter.md +++ b/Documentation/Routers/Avrorouter.md @@ -142,6 +142,11 @@ data block. The default value is 1 transaction. Controls the number of row events that are grouped into a single Avro data block. The default value is 1000 row events. +#### `block_size` + +The Avro data block size in bytes. The default is 16 kilobytes. Increase this +value if individual events in the binary logs are very large. + # Files Created by the Avrorouter The avrorouter creates two files in the location pointed by _avrodir_: diff --git a/server/modules/include/avrorouter.h b/server/modules/include/avrorouter.h index b853ca57e..a233915b4 100644 --- a/server/modules/include/avrorouter.h +++ b/server/modules/include/avrorouter.h @@ -261,6 +261,7 @@ typedef struct avro_instance uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ + uint64_t block_size; /**< Avro datablock size */ struct avro_instance *next; } AVRO_INSTANCE; @@ -278,7 +279,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8 extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); -extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema); +extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size); extern void* avro_table_free(AVRO_TABLE *table); extern void avro_flush_all_tables(AVRO_INSTANCE *router); extern char* json_new_schema_from_table(TABLE_MAP *map); diff --git a/server/modules/routing/avro/avro.c b/server/modules/routing/avro/avro.c index 8a7cfd65b..3306b4052 100644 --- a/server/modules/routing/avro/avro.c +++ b/server/modules/routing/avro/avro.c @@ -332,6 +332,7 @@ createInstance(SERVICE *service, char **options) inst->trx_count = 0; inst->row_target = AVRO_DEFAULT_BLOCK_ROW_COUNT; inst->trx_target = AVRO_DEFAULT_BLOCK_TRX_COUNT; + inst->block_size = 0; int first_file = 1; bool err = false; @@ -402,6 +403,10 @@ createInstance(SERVICE *service, char **options) { first_file = MAX(1, atoi(value)); } + else if (strcmp(options[i], "block_size") == 0) + { + inst->block_size = atoi(value); + } else { MXS_WARNING("[avrorouter] Unknown router option: '%s'", options[i]); diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index cfa9e66cd..41ef968f3 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -105,7 +105,7 @@ void avro_close_binlog(int fd) * @param filepath Path to the created file * @param json_schema The schema of the table in JSON format */ -AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) +AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size) { AVRO_TABLE *table = calloc(1, sizeof(AVRO_TABLE)); if (table) @@ -126,7 +126,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) } else { - rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file); + rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, &table->avro_file, "null", block_size); } if (rc) diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index 7390a6dc0..feb4058ff 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -104,7 +104,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr /** Close the file and open a new one */ hashtable_delete(router->open_tables, table_ident); - AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema); + AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size); if (avro_table) { @@ -296,7 +296,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) int event_type = get_event_type(hdr->event_type); prepare_record(router, hdr, event_type, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); - avro_file_writer_append_value(table->avro_file, &record); + if (avro_file_writer_append_value(table->avro_file, &record)) + { + MXS_ERROR("Failed to write value at position %ld: %s", + router->current_pos, avro_strerror()); + } /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with @@ -305,7 +309,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) { prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); - avro_file_writer_append_value(table->avro_file, &record); + if (avro_file_writer_append_value(table->avro_file, &record)) + { + MXS_ERROR("Failed to write value at position %ld: %s", + router->current_pos, avro_strerror()); + } } rows++; @@ -501,14 +509,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value for (long i = 0; i < map->columns && npresent < ncolumns; i++) { ss_dassert(create->columns == map->columns); - avro_value_get_by_name(record, create->column_names[i], &field, NULL); + ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); + ss_dassert(rc == 0); if (bit_is_set(columns_present, ncolumns, i)) { npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { - avro_value_set_null(&field); + if (column_is_blob(map->column_types[i])) + { + uint8_t nullvalue = 0; + avro_value_set_bytes(&field, &nullvalue, 1); + } + else + { + avro_value_set_null(&field); + } } else if (column_is_fixed_string(map->column_types[i])) { @@ -604,7 +621,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value } else { - avro_value_set_null(&field); + uint8_t nullvalue = 0; + avro_value_set_bytes(&field, &nullvalue, 1); } ss_dassert(ptr < end); }