Make active table map array dynamic
By storing the table maps in a std::unordered_map, the storage of mapped tables is made dynamic. This also makes the management of mapped tables a lot more robust.
This commit is contained in:
@ -488,7 +488,6 @@ createInstance(SERVICE *service, char **options)
|
|||||||
inst->gtid.seq = 0;
|
inst->gtid.seq = 0;
|
||||||
inst->gtid.server_id = 0;
|
inst->gtid.server_id = 0;
|
||||||
inst->gtid.timestamp = 0;
|
inst->gtid.timestamp = 0;
|
||||||
memset(&inst->active_maps, 0, sizeof(inst->active_maps));
|
|
||||||
bool err = false;
|
bool err = false;
|
||||||
|
|
||||||
if (param)
|
if (param)
|
||||||
|
|||||||
@ -932,22 +932,18 @@ bool is_alter_table_statement(Avro *router, char* ptr, size_t len)
|
|||||||
*/
|
*/
|
||||||
bool save_and_replace_table_create(Avro *router, TABLE_CREATE *created)
|
bool save_and_replace_table_create(Avro *router, TABLE_CREATE *created)
|
||||||
{
|
{
|
||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
std::string table_ident = created->database + "." + created->table;
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", created->database.c_str(), created->table.c_str());
|
|
||||||
|
|
||||||
auto it = router->created_tables.find(table_ident);
|
auto it = router->created_tables.find(table_ident);
|
||||||
|
|
||||||
if (it != router->created_tables.end())
|
if (it != router->created_tables.end())
|
||||||
{
|
{
|
||||||
for (auto a = router->table_maps.begin(); a != router->table_maps.end(); a++)
|
auto tm_it = router->table_maps.find(table_ident);
|
||||||
{
|
|
||||||
if (a->first == table_ident)
|
|
||||||
{
|
|
||||||
router->active_maps[a->second->id % MAX_MAPPED_TABLES] = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
router->table_maps.erase(table_ident);
|
if (tm_it != router->table_maps.end())
|
||||||
|
{
|
||||||
|
router->active_maps.erase(tm_it->second->id);
|
||||||
|
router->table_maps.erase(tm_it);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
router->created_tables[table_ident] = STableCreate(created);
|
router->created_tables[table_ident] = STableCreate(created);
|
||||||
|
|||||||
@ -140,14 +140,14 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
|
|
||||||
if (notify)
|
if (notify)
|
||||||
{
|
{
|
||||||
router->active_maps[old->second->id % MAX_MAPPED_TABLES] = NULL;
|
router->active_maps.erase(old->second->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
router->table_maps[table_ident] = map;
|
router->table_maps[table_ident] = map;
|
||||||
router->open_tables[table_ident] = avro_table;
|
router->open_tables[table_ident] = avro_table;
|
||||||
save_avro_schema(router->avrodir, json_schema, map.get());
|
save_avro_schema(router->avrodir, json_schema, map.get());
|
||||||
router->active_maps[map->id % MAX_MAPPED_TABLES] = map.get();
|
router->active_maps[map->id] = map;
|
||||||
ss_dassert(router->active_maps[id % MAX_MAPPED_TABLES] == map.get());
|
ss_dassert(router->active_maps[id] == map);
|
||||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||||
rval = true;
|
rval = true;
|
||||||
|
|
||||||
@ -287,12 +287,13 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
ptr += coldata_size;
|
ptr += coldata_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** There should always be a table map event prior to a row event.
|
// There should always be a table map event prior to a row event.
|
||||||
* TODO: Make the active_maps dynamic */
|
|
||||||
TABLE_MAP *map = router->active_maps[table_id % MAX_MAPPED_TABLES];
|
|
||||||
|
|
||||||
if (map)
|
auto it = router->active_maps.find(table_id);
|
||||||
|
|
||||||
|
if (it != router->active_maps.end())
|
||||||
{
|
{
|
||||||
|
TABLE_MAP* map = it->second.get();
|
||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
||||||
SAvroTable table;
|
SAvroTable table;
|
||||||
|
|||||||
@ -253,8 +253,9 @@ typedef std::tr1::shared_ptr<AVRO_TABLE> SAvroTable;
|
|||||||
typedef std::tr1::shared_ptr<TABLE_MAP> STableMap;
|
typedef std::tr1::shared_ptr<TABLE_MAP> STableMap;
|
||||||
|
|
||||||
typedef std::tr1::unordered_map<std::string, STableCreate> CreatedTables;
|
typedef std::tr1::unordered_map<std::string, STableCreate> CreatedTables;
|
||||||
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
typedef std::tr1::unordered_map<std::string, SAvroTable> AvroTables;
|
||||||
typedef std::tr1::unordered_map<std::string, STableMap> MappedTables;
|
typedef std::tr1::unordered_map<std::string, STableMap> MappedTables;
|
||||||
|
typedef std::tr1::unordered_map<uint64_t, STableMap> ActiveMaps;
|
||||||
|
|
||||||
struct Avro
|
struct Avro
|
||||||
{
|
{
|
||||||
@ -276,7 +277,7 @@ struct Avro
|
|||||||
uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END];
|
uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END];
|
||||||
uint8_t binlog_checksum;
|
uint8_t binlog_checksum;
|
||||||
gtid_pos_t gtid;
|
gtid_pos_t gtid;
|
||||||
TABLE_MAP* active_maps[MAX_MAPPED_TABLES];
|
ActiveMaps active_maps;
|
||||||
MappedTables table_maps;
|
MappedTables table_maps;
|
||||||
AvroTables open_tables;
|
AvroTables open_tables;
|
||||||
CreatedTables created_tables;
|
CreatedTables created_tables;
|
||||||
|
|||||||
Reference in New Issue
Block a user