Merge branch '2.1' into 2.2
This commit is contained in:
@ -229,6 +229,7 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
{
|
||||
bool rval = false;
|
||||
uint8_t *start = ptr;
|
||||
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
||||
uint8_t table_id_size = router->event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
|
||||
uint64_t table_id = 0;
|
||||
|
||||
@ -293,8 +294,9 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
||||
AVRO_TABLE* table = hashtable_fetch(router->open_tables, table_ident);
|
||||
TABLE_CREATE* create = map->table_create;
|
||||
ss_dassert(hashtable_fetch(router->created_tables, table_ident) == create);
|
||||
|
||||
if (table && create && ncolumns == map->columns)
|
||||
if (table && create && ncolumns == map->columns && create->columns == map->columns)
|
||||
{
|
||||
avro_value_t record;
|
||||
avro_generic_value_new(table->avro_writer_iface, &record);
|
||||
@ -305,13 +307,12 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
int rows = 0;
|
||||
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
|
||||
|
||||
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
|
||||
while (ptr < end)
|
||||
{
|
||||
static uint64_t total_row_count = 1;
|
||||
MXS_INFO("Row %lu", total_row_count++);
|
||||
|
||||
/** Add the current GTID and timestamp */
|
||||
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
||||
int event_type = get_event_type(hdr->event_type);
|
||||
prepare_record(router, hdr, event_type, &record);
|
||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||
@ -353,6 +354,12 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
"binary logs or the stored schema was not correct.",
|
||||
map->database, map->table);
|
||||
}
|
||||
else if (ncolumns == map->columns && create->columns != map->columns)
|
||||
{
|
||||
MXS_ERROR("Table map event has a different column count for table "
|
||||
"%s.%s than the CREATE TABLE statement. Possible "
|
||||
"unsupported DDL detected.", map->database, map->table);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Row event and table map event have different column "
|
||||
@ -526,7 +533,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
{
|
||||
int npresent = 0;
|
||||
avro_value_t field;
|
||||
long ncolumns = MXS_MIN(map->columns, create->columns);
|
||||
long ncolumns = map->columns;
|
||||
uint8_t *metadata = map->column_metadata;
|
||||
size_t metadata_offset = 0;
|
||||
|
||||
@ -576,7 +583,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
avro_value_set_string(&field, strval);
|
||||
sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
|
||||
ptr += bytes;
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -612,7 +619,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
str[bytes] = '\0';
|
||||
avro_value_set_string(&field, str);
|
||||
ptr += bytes;
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
}
|
||||
else if (column_is_bit(map->column_types[i]))
|
||||
@ -631,7 +638,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
avro_value_set_int(&field, value);
|
||||
sprintf(trace[i], "[%ld] BIT", i);
|
||||
ptr += bytes;
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
else if (column_is_decimal(map->column_types[i]))
|
||||
{
|
||||
@ -639,7 +646,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
|
||||
avro_value_set_double(&field, f_value);
|
||||
sprintf(trace[i], "[%ld] DECIMAL", i);
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
else if (column_is_variable_string(map->column_types[i]))
|
||||
{
|
||||
@ -662,7 +669,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
buf[sz] = '\0';
|
||||
ptr += sz;
|
||||
avro_value_set_string(&field, buf);
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
else if (column_is_blob(map->column_types[i]))
|
||||
{
|
||||
@ -681,7 +688,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
uint8_t nullvalue = 0;
|
||||
avro_value_set_bytes(&field, &nullvalue, 1);
|
||||
}
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
else if (column_is_temporal(map->column_types[i]))
|
||||
{
|
||||
@ -693,7 +700,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
|
||||
avro_value_set_string(&field, buf);
|
||||
sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
/** All numeric types (INT, LONG, FLOAT etc.) */
|
||||
else
|
||||
@ -704,7 +711,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
&metadata[metadata_offset], lval);
|
||||
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
|
||||
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
|
||||
check_overflow(ptr < end);
|
||||
check_overflow(ptr <= end);
|
||||
}
|
||||
ss_dassert(metadata_offset <= map->column_metadata_size);
|
||||
metadata_offset += get_metadata_len(map->column_types[i]);
|
||||
|
||||
Reference in New Issue
Block a user