diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c index f902f52be..71f54432a 100644 --- a/server/modules/routing/avrorouter/avro_file.c +++ b/server/modules/routing/avrorouter/avro_file.c @@ -971,6 +971,8 @@ bool save_and_replace_table_create(AVRO_INSTANCE *router, TABLE_CREATE *created) { if (strcmp(key, table_ident) == 0) { + TABLE_MAP* map = hashtable_fetch(router->table_maps, key); + router->active_maps[map->id % MAX_MAPPED_TABLES] = NULL; hashtable_delete(router->table_maps, key); } } diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index 2df4a07c6..f394bad66 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -88,70 +88,51 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr { ss_dassert(create->columns > 0); TABLE_MAP *old = hashtable_fetch(router->table_maps, table_ident); + TABLE_MAP *map = table_map_alloc(ptr, ev_len, create); + MXS_ABORT_IF_NULL(map); // Fatal error at this point + char* json_schema = json_new_schema_from_table(map); - if (old == NULL || old->version != create->version) + if (json_schema) { - TABLE_MAP *map = table_map_alloc(ptr, ev_len, create); + char filepath[PATH_MAX + 1]; + snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro", + router->avrodir, table_ident, map->version); - if (map) + /** Close the file and open a new one */ + hashtable_delete(router->open_tables, table_ident); + AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size); + + if (avro_table) { - char* json_schema = json_new_schema_from_table(map); + bool notify = old != NULL; - if (json_schema) + if (old) { - char filepath[PATH_MAX + 1]; - snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro", - router->avrodir, table_ident, map->version); - - /** Close the file and open a new one */ - hashtable_delete(router->open_tables, table_ident); - AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size); - - if (avro_table) - { - bool notify = old != NULL; - - if (old) - { - router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL; - } - hashtable_delete(router->table_maps, table_ident); - hashtable_add(router->table_maps, (void*) table_ident, map); - hashtable_add(router->open_tables, table_ident, avro_table); - save_avro_schema(router->avrodir, json_schema, map); - router->active_maps[map->id % MAX_MAPPED_TABLES] = map; - MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); - rval = true; - - if (notify) - { - notify_all_clients(router); - } - } - else - { - MXS_ERROR("Failed to open new Avro file for writing."); - } - MXS_FREE(json_schema); + router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL; } - else + hashtable_delete(router->table_maps, table_ident); + hashtable_add(router->table_maps, (void*)table_ident, map); + hashtable_add(router->open_tables, table_ident, avro_table); + save_avro_schema(router->avrodir, json_schema, map); + router->active_maps[map->id % MAX_MAPPED_TABLES] = map; + ss_dassert(router->active_maps[id % MAX_MAPPED_TABLES] == map); + MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); + rval = true; + + if (notify) { - MXS_ERROR("Failed to create JSON schema."); + notify_all_clients(router); } } else { - MXS_ERROR("Failed to allocate new table map."); + MXS_ERROR("Failed to open new Avro file for writing."); } + MXS_FREE(json_schema); } else { - router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL; - table_map_remap(ptr, ev_len, old); - router->active_maps[old->id % MAX_MAPPED_TABLES] = old; - MXS_DEBUG("Table %s re-mapped to %lu", table_ident, old->id); - /** No changes in the schema */ - rval = true; + MXS_ERROR("Failed to create JSON schema."); } } else @@ -345,8 +326,9 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) } else { - MXS_ERROR("Row event and table map event have different column counts." - " Only full row image is currently supported."); + MXS_ERROR("Row event and table map event have different column " + "counts for table %s.%s, only full row image is currently " + "supported.", map->database, map->table); } } else diff --git a/server/modules/routing/avrorouter/avro_schema.c b/server/modules/routing/avrorouter/avro_schema.c index d546b56bc..9949c9ebe 100644 --- a/server/modules/routing/avrorouter/avro_schema.c +++ b/server/modules/routing/avrorouter/avro_schema.c @@ -1206,20 +1206,33 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (tok_eq(ptok, "add", plen) && tok_eq(tok, "column", len)) { tok = get_tok(tok + len, &len, end); - - create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1)); - create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1)); - create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1)); - char avro_token[len + 1]; make_avro_token(avro_token, tok, len); - char field_type[200] = ""; // Enough to hold all types - int field_length = extract_type_length(tok + len, field_type); - create->column_names[create->columns] = MXS_STRDUP_A(avro_token); - create->column_types[create->columns] = MXS_STRDUP_A(field_type); - create->column_lengths[create->columns] = field_length; - create->columns++; - updates++; + bool is_new = true; + + for (uint64_t i = 0; i < create->columns; i++) + { + if (strcmp(avro_token, create->column_names[i]) == 0) + { + is_new = false; + break; + } + } + + if (is_new) + { + create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1)); + create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1)); + create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1)); + + char field_type[200] = ""; // Enough to hold all types + int field_length = extract_type_length(tok + len, field_type); + create->column_names[create->columns] = MXS_STRDUP_A(avro_token); + create->column_types[create->columns] = MXS_STRDUP_A(field_type); + create->column_lengths[create->columns] = field_length; + create->columns++; + updates++; + } tok = get_next_def(tok, end); len = 0; } @@ -1425,20 +1438,3 @@ void table_map_free(TABLE_MAP *map) MXS_FREE(map); } } - -/** - * @brief Map a table to a different ID - * - * This updates the table ID that the @c TABLE_MAP object is assigned with - * - * @param ptr Pointer to the start of a table map event - * @param hdr_len Post-header length - * @param map Table map to remap - */ -void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map) -{ - uint64_t table_id = 0; - size_t id_size = hdr_len == 6 ? 4 : 6; - memcpy(&table_id, ptr, id_size); - map->id = table_id; -} diff --git a/server/modules/routing/avrorouter/avrorouter.h b/server/modules/routing/avrorouter/avrorouter.h index 1f91b31a2..9f4ebb1a5 100644 --- a/server/modules/routing/avrorouter/avrorouter.h +++ b/server/modules/routing/avrorouter/avrorouter.h @@ -319,7 +319,6 @@ extern char* json_new_schema_from_table(TABLE_MAP *map); extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map); extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); -extern void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map); enum avrorouter_file_op {