From f61c56228c0755d34891498b7b19765292cb6a34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 29 May 2018 00:41:53 +0300 Subject: [PATCH] MXS-1881: Refactor TABLE_MAP initialization Changed TABLE_MAP to use STL containers and types. The initialization is now done in the constructor. Removed unnecessary linkage between TABLE_MAP and TABLE_CREATE. --- server/modules/routing/avrorouter/avro_rbr.cc | 37 +++++------ .../modules/routing/avrorouter/avro_schema.cc | 61 +++++-------------- .../modules/routing/avrorouter/avrorouter.hh | 45 ++++++++------ 3 files changed, 59 insertions(+), 84 deletions(-) diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index 4e058ce25..c000437c5 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -113,15 +113,14 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) auto old = it->second; if (old->id == map->id && old->version == map->version && - strcmp(old->table, map->table) == 0 && - strcmp(old->database, map->database) == 0) + old->table == map->table && old->database == map->database) { // We can reuse the table map object return true; } } - char* json_schema = json_new_schema_from_table(map.get()); + char* json_schema = json_new_schema_from_table(map, create->second); if (json_schema) { @@ -145,7 +144,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) router->table_maps[table_ident] = map; router->open_tables[table_ident] = avro_table; - save_avro_schema(router->avrodir.c_str(), json_schema, map.get()); + save_avro_schema(router->avrodir.c_str(), json_schema, map, create->second); router->active_maps[map->id] = map; ss_dassert(router->active_maps[id] == map); MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); @@ -295,7 +294,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) { TABLE_MAP* map = it->second.get(); char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; - snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table); + snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str()); SAvroTable table; auto it = router->open_tables.find(table_ident); @@ -304,9 +303,10 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) table = it->second; } - TABLE_CREATE* create = map->table_create; + auto create = router->created_tables.find(table_ident); - if (table && create && ncolumns == map->columns && create->columns.size() == map->columns) + if (table && create != router->created_tables.end() && + ncolumns == map->columns() && create->second->columns.size() == map->columns()) { avro_value_t record; avro_generic_value_new(table->avro_writer_iface, &record); @@ -325,7 +325,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) /** Add the current GTID and timestamp */ int event_type = get_event_type(hdr->event_type); prepare_record(router, hdr, event_type, &record); - ptr = process_row_event_data(map, create, &record, ptr, col_present, end); + ptr = process_row_event_data(map, create->second.get(), &record, ptr, col_present, end); if (avro_file_writer_append_value(table->avro_file, &record)) { MXS_ERROR("Failed to write value at position %ld: %s", @@ -338,7 +338,8 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) if (event_type == UPDATE_EVENT) { prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record); - ptr = process_row_event_data(map, create, &record, ptr, col_present, end); + ptr = process_row_event_data(map, create->second.get(), &record, ptr, col_present, end); + if (avro_file_writer_append_value(table->avro_file, &record)) { MXS_ERROR("Failed to write value at position %ld: %s", @@ -356,25 +357,25 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) else if (!table) { MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier" - " errors for more details.", map->database, map->table); + " errors for more details.", map->database.c_str(), map->table.c_str()); } - else if (create == NULL) + else if (create == router->created_tables.end()) { MXS_ERROR("Create table statement for %s.%s was not found from the " "binary logs or the stored schema was not correct.", - map->database, map->table); + map->database.c_str(), map->table.c_str()); } - else if (ncolumns == map->columns && create->columns.size() != map->columns) + else if (ncolumns == map->columns() && create->second->columns.size() != map->columns()) { MXS_ERROR("Table map event has a different column count for table " "%s.%s than the CREATE TABLE statement. Possible " - "unsupported DDL detected.", map->database, map->table); + "unsupported DDL detected.", map->database.c_str(), map->table.c_str()); } else { MXS_ERROR("Row event and table map event have different column " "counts for table %s.%s, only full row image is currently " - "supported.", map->database, map->table); + "supported.", map->database.c_str(), map->table.c_str()); } } else @@ -560,8 +561,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value { int npresent = 0; avro_value_t field; - long ncolumns = map->columns; - uint8_t *metadata = map->column_metadata; + long ncolumns = map->columns(); + uint8_t *metadata = &map->column_metadata[0]; size_t metadata_offset = 0; /** BIT type values use the extra bits in the row event header */ @@ -741,7 +742,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i])); check_overflow(ptr <= end); } - ss_dassert(metadata_offset <= map->column_metadata_size); + ss_dassert(metadata_offset <= map->column_metadata.size()); metadata_offset += get_metadata_len(map->column_types[i]); } else diff --git a/server/modules/routing/avrorouter/avro_schema.cc b/server/modules/routing/avrorouter/avro_schema.cc index 4ada06657..c78295a32 100644 --- a/server/modules/routing/avrorouter/avro_schema.cc +++ b/server/modules/routing/avrorouter/avro_schema.cc @@ -82,15 +82,14 @@ static const char* column_type_to_avro_type(uint8_t type) * @param create The TABLE_CREATE for this table * @return New schema or NULL if an error occurred */ -char* json_new_schema_from_table(TABLE_MAP *map) +char* json_new_schema_from_table(const STableMap& map, const STableCreate& create) { - TABLE_CREATE *create = map->table_create; - if (map->version != create->version) { MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and " - "the table definition version is %d.", map->database, map->table, - map->version, create->version); + "the table definition version is %d.", map->database.c_str(), + map->table.c_str(), map->version, create->version); + ss_dassert(!true); // Should not happen return NULL; } @@ -123,7 +122,7 @@ char* json_new_schema_from_table(TABLE_MAP *map) json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type, "type", event_types)); - for (uint64_t i = 0; i < map->columns && i < create->columns.size(); i++) + for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++) { json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", "name", create->columns[i].name.c_str(), @@ -250,21 +249,21 @@ bool json_extract_field_names(const char* filename, std::vector& columns * @param schema Schema in JSON format * @param map Table map that @p schema represents */ -void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map) +void save_avro_schema(const char *path, const char* schema, STableMap& map, STableCreate& create) { char filepath[PATH_MAX]; - snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, map->database, - map->table, map->version); + snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path, + map->database.c_str(), map->table.c_str(), map->version); if (access(filepath, F_OK) != 0) { - if (!map->table_create->was_used) + if (!create->was_used) { FILE *file = fopen(filepath, "wb"); if (file) { fprintf(file, "%s\n", schema); - map->table_create->was_used = true; + create->was_used = true; fclose(file); } } @@ -1472,40 +1471,10 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create) uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size); uint8_t *nullmap = ptr; size_t nullmap_size = (column_count + 7) / 8; - TABLE_MAP *map = new (std::nothrow)TABLE_MAP; - if (map) - { - map->id = table_id; - map->version = create->version; - map->flags = flags; - map->columns = column_count; - map->column_types = static_cast(MXS_MALLOC(column_count)); - /** Allocate at least one byte for the metadata */ - map->column_metadata = static_cast(MXS_CALLOC(1, metadata_size + 1)); - map->column_metadata_size = metadata_size; - map->null_bitmap = static_cast(MXS_MALLOC(nullmap_size)); - map->database = MXS_STRDUP(schema_name); - map->table = MXS_STRDUP(table_name); - map->table_create = create; - if (map->column_types && map->database && map->table && - map->column_metadata && map->null_bitmap) - { - memcpy(map->column_types, column_types, column_count); - memcpy(map->null_bitmap, nullmap, nullmap_size); - memcpy(map->column_metadata, metadata, metadata_size); - } - else - { - MXS_FREE(map->null_bitmap); - MXS_FREE(map->column_metadata); - MXS_FREE(map->column_types); - MXS_FREE(map->database); - MXS_FREE(map->table); - MXS_FREE(map); - map = NULL; - } - } - - return map; + Bytes cols(column_types, column_types + column_count); + Bytes nulls(nullmap, nullmap + nullmap_size); + Bytes meta(metadata, metadata + metadata_size); + return new (std::nothrow)TABLE_MAP(schema_name, table_name, table_id, create->version, + std::move(cols), std::move(nulls), std::move(meta)); } diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 0c2b00bb5..26ab94041 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -156,33 +156,38 @@ struct TABLE_CREATE bool was_used; /**< Has this schema been persisted to disk */ }; +typedef std::vector Bytes; + /** A representation of a table map event read from a binary log. A table map * maps a table to a unique ID which can be used to match row events to table map * events. The table map event tells us how the table is laid out and gives us * some meta information on the columns. */ struct TABLE_MAP { - ~TABLE_MAP() + TABLE_MAP(const std::string& db, const std::string& table, uint64_t id, + int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata): + database(db), + table(table), + id(id), + version(version), + column_types(cols), + null_bitmap(nulls), + column_metadata(metadata) { - MXS_FREE(column_types); - MXS_FREE(column_metadata); - MXS_FREE(null_bitmap); - MXS_FREE(database); - MXS_FREE(table); } - uint64_t id; - uint64_t columns; - uint16_t flags; - uint8_t* column_types; - uint8_t* null_bitmap; - uint8_t* column_metadata; - size_t column_metadata_size; - TABLE_CREATE* table_create; /*< The definition of the table */ - int version; - char version_string[TABLE_MAP_VERSION_DIGITS + 1]; - char* table; - char* database; + uint64_t columns() const + { + return column_types.size(); + } + + std::string database; + std::string table; + uint64_t id; + int version; + Bytes column_types; + Bytes null_bitmap; + Bytes column_metadata; }; struct AVRO_TABLE @@ -363,8 +368,8 @@ extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(Avro *router); extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, size_t block_size); -extern char* json_new_schema_from_table(TABLE_MAP *map); -extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map); +extern char* json_new_schema_from_table(const STableMap& map, const STableCreate& create); +extern void save_avro_schema(const char *path, const char* schema, STableMap& map, STableCreate& create); extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);