MXS-1536: Always recreate TABLE_MAP objects
When a table map event is read after an alter table, the old TABLE_MAP object contains old information. Due to this, as well as the added benefit of making the code easier to read, the recycling of TABLE_MAP objects was removed. In practice, there were no benefits to re-mapping the tables to a different ID.
This commit is contained in:
@ -971,6 +971,8 @@ bool save_and_replace_table_create(AVRO_INSTANCE *router, TABLE_CREATE *created)
|
|||||||
{
|
{
|
||||||
if (strcmp(key, table_ident) == 0)
|
if (strcmp(key, table_ident) == 0)
|
||||||
{
|
{
|
||||||
|
TABLE_MAP* map = hashtable_fetch(router->table_maps, key);
|
||||||
|
router->active_maps[map->id % MAX_MAPPED_TABLES] = NULL;
|
||||||
hashtable_delete(router->table_maps, key);
|
hashtable_delete(router->table_maps, key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,70 +88,51 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
|
|||||||
{
|
{
|
||||||
ss_dassert(create->columns > 0);
|
ss_dassert(create->columns > 0);
|
||||||
TABLE_MAP *old = hashtable_fetch(router->table_maps, table_ident);
|
TABLE_MAP *old = hashtable_fetch(router->table_maps, table_ident);
|
||||||
|
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
|
||||||
|
MXS_ABORT_IF_NULL(map); // Fatal error at this point
|
||||||
|
char* json_schema = json_new_schema_from_table(map);
|
||||||
|
|
||||||
if (old == NULL || old->version != create->version)
|
if (json_schema)
|
||||||
{
|
{
|
||||||
TABLE_MAP *map = table_map_alloc(ptr, ev_len, create);
|
char filepath[PATH_MAX + 1];
|
||||||
|
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
|
||||||
|
router->avrodir, table_ident, map->version);
|
||||||
|
|
||||||
if (map)
|
/** Close the file and open a new one */
|
||||||
|
hashtable_delete(router->open_tables, table_ident);
|
||||||
|
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size);
|
||||||
|
|
||||||
|
if (avro_table)
|
||||||
{
|
{
|
||||||
char* json_schema = json_new_schema_from_table(map);
|
bool notify = old != NULL;
|
||||||
|
|
||||||
if (json_schema)
|
if (old)
|
||||||
{
|
{
|
||||||
char filepath[PATH_MAX + 1];
|
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
|
||||||
snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro",
|
|
||||||
router->avrodir, table_ident, map->version);
|
|
||||||
|
|
||||||
/** Close the file and open a new one */
|
|
||||||
hashtable_delete(router->open_tables, table_ident);
|
|
||||||
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size);
|
|
||||||
|
|
||||||
if (avro_table)
|
|
||||||
{
|
|
||||||
bool notify = old != NULL;
|
|
||||||
|
|
||||||
if (old)
|
|
||||||
{
|
|
||||||
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
|
|
||||||
}
|
|
||||||
hashtable_delete(router->table_maps, table_ident);
|
|
||||||
hashtable_add(router->table_maps, (void*) table_ident, map);
|
|
||||||
hashtable_add(router->open_tables, table_ident, avro_table);
|
|
||||||
save_avro_schema(router->avrodir, json_schema, map);
|
|
||||||
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
|
|
||||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
|
||||||
rval = true;
|
|
||||||
|
|
||||||
if (notify)
|
|
||||||
{
|
|
||||||
notify_all_clients(router);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Failed to open new Avro file for writing.");
|
|
||||||
}
|
|
||||||
MXS_FREE(json_schema);
|
|
||||||
}
|
}
|
||||||
else
|
hashtable_delete(router->table_maps, table_ident);
|
||||||
|
hashtable_add(router->table_maps, (void*)table_ident, map);
|
||||||
|
hashtable_add(router->open_tables, table_ident, avro_table);
|
||||||
|
save_avro_schema(router->avrodir, json_schema, map);
|
||||||
|
router->active_maps[map->id % MAX_MAPPED_TABLES] = map;
|
||||||
|
ss_dassert(router->active_maps[id % MAX_MAPPED_TABLES] == map);
|
||||||
|
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||||
|
rval = true;
|
||||||
|
|
||||||
|
if (notify)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to create JSON schema.");
|
notify_all_clients(router);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to allocate new table map.");
|
MXS_ERROR("Failed to open new Avro file for writing.");
|
||||||
}
|
}
|
||||||
|
MXS_FREE(json_schema);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
router->active_maps[old->id % MAX_MAPPED_TABLES] = NULL;
|
MXS_ERROR("Failed to create JSON schema.");
|
||||||
table_map_remap(ptr, ev_len, old);
|
|
||||||
router->active_maps[old->id % MAX_MAPPED_TABLES] = old;
|
|
||||||
MXS_DEBUG("Table %s re-mapped to %lu", table_ident, old->id);
|
|
||||||
/** No changes in the schema */
|
|
||||||
rval = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -345,8 +326,9 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Row event and table map event have different column counts."
|
MXS_ERROR("Row event and table map event have different column "
|
||||||
" Only full row image is currently supported.");
|
"counts for table %s.%s, only full row image is currently "
|
||||||
|
"supported.", map->database, map->table);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1206,20 +1206,33 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
|||||||
if (tok_eq(ptok, "add", plen) && tok_eq(tok, "column", len))
|
if (tok_eq(ptok, "add", plen) && tok_eq(tok, "column", len))
|
||||||
{
|
{
|
||||||
tok = get_tok(tok + len, &len, end);
|
tok = get_tok(tok + len, &len, end);
|
||||||
|
|
||||||
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1));
|
|
||||||
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1));
|
|
||||||
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1));
|
|
||||||
|
|
||||||
char avro_token[len + 1];
|
char avro_token[len + 1];
|
||||||
make_avro_token(avro_token, tok, len);
|
make_avro_token(avro_token, tok, len);
|
||||||
char field_type[200] = ""; // Enough to hold all types
|
bool is_new = true;
|
||||||
int field_length = extract_type_length(tok + len, field_type);
|
|
||||||
create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
|
for (uint64_t i = 0; i < create->columns; i++)
|
||||||
create->column_types[create->columns] = MXS_STRDUP_A(field_type);
|
{
|
||||||
create->column_lengths[create->columns] = field_length;
|
if (strcmp(avro_token, create->column_names[i]) == 0)
|
||||||
create->columns++;
|
{
|
||||||
updates++;
|
is_new = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_new)
|
||||||
|
{
|
||||||
|
create->column_names = MXS_REALLOC(create->column_names, sizeof(char*) * (create->columns + 1));
|
||||||
|
create->column_types = MXS_REALLOC(create->column_types, sizeof(char*) * (create->columns + 1));
|
||||||
|
create->column_lengths = MXS_REALLOC(create->column_lengths, sizeof(int) * (create->columns + 1));
|
||||||
|
|
||||||
|
char field_type[200] = ""; // Enough to hold all types
|
||||||
|
int field_length = extract_type_length(tok + len, field_type);
|
||||||
|
create->column_names[create->columns] = MXS_STRDUP_A(avro_token);
|
||||||
|
create->column_types[create->columns] = MXS_STRDUP_A(field_type);
|
||||||
|
create->column_lengths[create->columns] = field_length;
|
||||||
|
create->columns++;
|
||||||
|
updates++;
|
||||||
|
}
|
||||||
tok = get_next_def(tok, end);
|
tok = get_next_def(tok, end);
|
||||||
len = 0;
|
len = 0;
|
||||||
}
|
}
|
||||||
@ -1425,20 +1438,3 @@ void table_map_free(TABLE_MAP *map)
|
|||||||
MXS_FREE(map);
|
MXS_FREE(map);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Map a table to a different ID
|
|
||||||
*
|
|
||||||
* This updates the table ID that the @c TABLE_MAP object is assigned with
|
|
||||||
*
|
|
||||||
* @param ptr Pointer to the start of a table map event
|
|
||||||
* @param hdr_len Post-header length
|
|
||||||
* @param map Table map to remap
|
|
||||||
*/
|
|
||||||
void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map)
|
|
||||||
{
|
|
||||||
uint64_t table_id = 0;
|
|
||||||
size_t id_size = hdr_len == 6 ? 4 : 6;
|
|
||||||
memcpy(&table_id, ptr, id_size);
|
|
||||||
map->id = table_id;
|
|
||||||
}
|
|
||||||
|
@ -319,7 +319,6 @@ extern char* json_new_schema_from_table(TABLE_MAP *map);
|
|||||||
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
||||||
extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
extern void table_map_remap(uint8_t *ptr, uint8_t hdr_len, TABLE_MAP *map);
|
|
||||||
|
|
||||||
enum avrorouter_file_op
|
enum avrorouter_file_op
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user