MXS-1621: Detect TABLE_MAP ↔ TABLE_CREATE column count mismatch
If the TABLE_MAP and TABLE_CREATE have different column counts, an error is logged and the row events are skipped.
This commit is contained in:
@ -649,6 +649,7 @@ size_t unpack_numeric_field(uint8_t *src, uint8_t type, uint8_t *metadata, uint8
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ss_dassert(size > 0);
|
||||||
memcpy(dest, src, size);
|
memcpy(dest, src, size);
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
@ -210,6 +210,7 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
{
|
{
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
uint8_t *start = ptr;
|
uint8_t *start = ptr;
|
||||||
|
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
||||||
uint8_t table_id_size = router->event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
|
uint8_t table_id_size = router->event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
|
||||||
uint64_t table_id = 0;
|
uint64_t table_id = 0;
|
||||||
|
|
||||||
@ -274,8 +275,9 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
||||||
AVRO_TABLE* table = hashtable_fetch(router->open_tables, table_ident);
|
AVRO_TABLE* table = hashtable_fetch(router->open_tables, table_ident);
|
||||||
TABLE_CREATE* create = map->table_create;
|
TABLE_CREATE* create = map->table_create;
|
||||||
|
ss_dassert(hashtable_fetch(router->created_tables, table_ident) == create);
|
||||||
|
|
||||||
if (table && create && ncolumns == map->columns)
|
if (table && create && ncolumns == map->columns && create->columns == map->columns)
|
||||||
{
|
{
|
||||||
avro_value_t record;
|
avro_value_t record;
|
||||||
avro_generic_value_new(table->avro_writer_iface, &record);
|
avro_generic_value_new(table->avro_writer_iface, &record);
|
||||||
@ -292,7 +294,6 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
MXS_INFO("Row %lu", total_row_count++);
|
MXS_INFO("Row %lu", total_row_count++);
|
||||||
|
|
||||||
/** Add the current GTID and timestamp */
|
/** Add the current GTID and timestamp */
|
||||||
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
|
||||||
int event_type = get_event_type(hdr->event_type);
|
int event_type = get_event_type(hdr->event_type);
|
||||||
prepare_record(router, hdr, event_type, &record);
|
prepare_record(router, hdr, event_type, &record);
|
||||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||||
@ -334,6 +335,12 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
"binary logs or the stored schema was not correct.",
|
"binary logs or the stored schema was not correct.",
|
||||||
map->database, map->table);
|
map->database, map->table);
|
||||||
}
|
}
|
||||||
|
else if (ncolumns == map->columns && create->columns != map->columns)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Table map event has a different column "
|
||||||
|
"count for table %s.%s than the CREATE TABLE statement.",
|
||||||
|
map->database, map->table);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Row event and table map event have different column "
|
MXS_ERROR("Row event and table map event have different column "
|
||||||
|
Reference in New Issue
Block a user