Merge branch '2.1' into develop

This commit is contained in:
Johan Wikman
2017-03-03 13:37:14 +02:00
56 changed files with 2355 additions and 630 deletions

View File

@ -30,7 +30,7 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create,
avro_value_t *record, uint8_t *ptr,
uint8_t *columns_present);
uint8_t *columns_present, uint8_t *end);
void notify_all_clients(AVRO_INSTANCE *router);
void add_used_table(AVRO_INSTANCE* router, const char* table);
@ -309,9 +309,10 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{
/** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size;
int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record);
/** Update rows events have the before and after images of the
@ -320,7 +321,7 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
if (event_type == UPDATE_EVENT)
{
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record);
}
@ -497,7 +498,7 @@ int get_metadata_len(uint8_t type)
* @return Pointer to the first byte after the current row event
*/
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value_t *record,
uint8_t *ptr, uint8_t *columns_present)
uint8_t *ptr, uint8_t *columns_present, uint8_t *end)
{
int npresent = 0;
avro_value_t field;
@ -507,10 +508,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
/** BIT type values use the extra bits in the row event header */
int extra_bits = (((ncolumns + 7) / 8) * 8) - ncolumns;
ss_dassert(ptr < end);
/** Store the null value bitmap */
uint8_t *null_bitmap = ptr;
ptr += (ncolumns + 7) / 8;
ss_dassert(ptr < end);
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
{
@ -544,6 +547,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
}
avro_value_set_string(&field, strval);
ptr += bytes;
ss_dassert(ptr < end);
}
else
{
@ -553,6 +557,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
str[bytes] = '\0';
avro_value_set_string(&field, str);
ptr += bytes + 1;
ss_dassert(ptr < end);
}
}
else if (column_is_bit(map->column_types[i]))
@ -572,21 +577,36 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
}
avro_value_set_int(&field, value);
ptr += bytes;
ss_dassert(ptr < end);
}
else if (column_is_decimal(map->column_types[i]))
{
double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value);
ss_dassert(ptr < end);
}
else if (column_is_variable_string(map->column_types[i]))
{
size_t sz;
char *str = mxs_lestr_consume(&ptr, &sz);
int bytes = metadata[metadata_offset] | metadata[metadata_offset + 1] << 8;
if (bytes > 255)
{
sz = gw_mysql_get_byte2(ptr);
ptr += 2;
}
else
{
sz = *ptr;
ptr++;
}
char buf[sz + 1];
memcpy(buf, str, sz);
memcpy(buf, ptr, sz);
buf[sz] = '\0';
ptr += sz;
avro_value_set_string(&field, buf);
ss_dassert(ptr < end);
}
else if (column_is_blob(map->column_types[i]))
{
@ -596,6 +616,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += bytes;
avro_value_set_bytes(&field, ptr, len);
ptr += len;
ss_dassert(ptr < end);
}
else if (column_is_temporal(map->column_types[i]))
{
@ -604,6 +625,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
ss_dassert(ptr < end);
}
/** All numeric types (INT, LONG, FLOAT etc.) */
else
@ -613,6 +635,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += unpack_numeric_field(ptr, map->column_types[i],
&metadata[metadata_offset], lval);
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
ss_dassert(ptr < end);
}
ss_dassert(metadata_offset <= map->column_metadata_size);
metadata_offset += get_metadata_len(map->column_types[i]);