Merge branch '2.0' into 2.1-merge-2.0

This commit is contained in:
Markus Mäkelä
2017-05-14 10:25:12 +03:00
6 changed files with 296 additions and 102 deletions

View File

@ -85,7 +85,7 @@ bool column_is_decimal(uint8_t type);
bool fixed_string_is_enum(uint8_t type); bool fixed_string_is_enum(uint8_t type);
/** Value unpacking */ /** Value unpacking */
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm);
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest);
size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val);
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,

View File

@ -25,6 +25,10 @@
#include <strings.h> #include <strings.h>
#include <math.h> #include <math.h>
#include <maxscale/protocol/mysql.h>
static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes);
/** /**
* @brief Convert a table column type to a string * @brief Convert a table column type to a string
* *
@ -216,6 +220,35 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
dest->tm_year = *ptr; dest->tm_year = *ptr;
} }
/** Base-10 logarithm values */
int64_t log_10_values[] =
{
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000
};
/**
* If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with
* extra precision, the packed length is shorter than 8 bytes.
*/
size_t datetime_sizes[] =
{
5, // DATETIME(0)
6, // DATETIME(1)
6, // DATETIME(2)
7, // DATETIME(3)
7, // DATETIME(4)
7, // DATETIME(5)
8 // DATETIME(6)
};
/** /**
* @brief Unpack a DATETIME * @brief Unpack a DATETIME
* *
@ -224,21 +257,52 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
* @param val Value read from the binary log * @param val Value read from the binary log
* @param dest Pointer where the unpacked value is stored * @param dest Pointer where the unpacked value is stored
*/ */
static void unpack_datetime(uint8_t *ptr, struct tm *dest) static void unpack_datetime(uint8_t *ptr, int length, struct tm *dest)
{ {
uint64_t val = 0; int64_t val = 0;
memcpy(&val, ptr, sizeof(val)); uint32_t second, minute, hour, day, month, year;
uint32_t second = val - ((val / 100) * 100);
val /= 100; if (length == -1)
uint32_t minute = val - ((val / 100) * 100); {
val /= 100; val = gw_mysql_get_byte8(ptr);
uint32_t hour = val - ((val / 100) * 100); second = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t day = val - ((val / 100) * 100); minute = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t month = val - ((val / 100) * 100); hour = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t year = val; day = val - ((val / 100) * 100);
val /= 100;
month = val - ((val / 100) * 100);
val /= 100;
year = val;
}
else
{
// TODO: Figure out why DATETIME(0) doesn't work like it others do
val = unpack_bytes(ptr, datetime_sizes[length]);
val *= log_10_values[6 - length];
if (val < 0)
{
val = -val;
}
int subsecond = val % 1000000;
val /= 1000000;
second = val % 60;
val /= 60;
minute = val % 60;
val /= 60;
hour = val % 24;
val /= 24;
day = val % 32;
val /= 32;
month = val % 13;
val /= 13;
year = val;
}
memset(dest, 0, sizeof(struct tm)); memset(dest, 0, sizeof(struct tm));
dest->tm_year = year - 1900; dest->tm_year = year - 1900;
@ -391,14 +455,13 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
return metadata[1]; return metadata[1];
} }
/** /**
* @brief Get the length of a temporal field * @brief Get the length of a temporal field
* @param type Field type * @param type Field type
* @param decimals How many decimals the field has * @param decimals How many decimals the field has
* @return Number of bytes the temporal value takes * @return Number of bytes the temporal value takes
*/ */
static size_t temporal_field_size(uint8_t type, uint8_t decimals) static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length)
{ {
switch (type) switch (type)
{ {
@ -412,8 +475,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
case TABLE_COL_TYPE_TIME2: case TABLE_COL_TYPE_TIME2:
return 3 + ((decimals + 1) / 2); return 3 + ((decimals + 1) / 2);
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
return 8; return length < 0 || length > 6 ? 8 : datetime_sizes[length];
case TABLE_COL_TYPE_TIMESTAMP: case TABLE_COL_TYPE_TIMESTAMP:
return 4; return 4;
@ -441,40 +504,40 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
* @param val Extracted packed value * @param val Extracted packed value
* @param tm Pointer where the unpacked temporal value is stored * @param tm Pointer where the unpacked temporal value is stored
*/ */
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm)
{ {
switch (type) switch (type)
{ {
case TABLE_COL_TYPE_YEAR: case TABLE_COL_TYPE_YEAR:
unpack_year(ptr, tm); unpack_year(ptr, tm);
break; break;
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
unpack_datetime(ptr, tm); unpack_datetime(ptr, length, tm);
break; break;
case TABLE_COL_TYPE_DATETIME2: case TABLE_COL_TYPE_DATETIME2:
unpack_datetime2(ptr, *metadata, tm); unpack_datetime2(ptr, *metadata, tm);
break; break;
case TABLE_COL_TYPE_TIME: case TABLE_COL_TYPE_TIME:
unpack_time(ptr, tm); unpack_time(ptr, tm);
break; break;
case TABLE_COL_TYPE_DATE: case TABLE_COL_TYPE_DATE:
unpack_date(ptr, tm); unpack_date(ptr, tm);
break; break;
case TABLE_COL_TYPE_TIMESTAMP: case TABLE_COL_TYPE_TIMESTAMP:
case TABLE_COL_TYPE_TIMESTAMP2: case TABLE_COL_TYPE_TIMESTAMP2:
unpack_timestamp(ptr, *metadata, tm); unpack_timestamp(ptr, *metadata, tm);
break; break;
default: default:
ss_dassert(false); ss_dassert(false);
break; break;
} }
return temporal_field_size(type, *metadata); return temporal_field_size(type, *metadata, length);
} }
void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm) void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm)

View File

@ -475,6 +475,17 @@ void notify_all_clients(AVRO_INSTANCE *router)
} }
} }
void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_FLUSH);
avro_save_conversion_state(router);
notify_all_clients(router);
*total_rows += router->row_count;
*total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
}
/** /**
* @brief Read all replication events from a binlog file. * @brief Read all replication events from a binlog file.
* *
@ -555,6 +566,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
} }
else else
{ {
do_checkpoint(router, &total_rows, &total_commits);
MXS_INFO("Processed %lu transactions and %lu row events.", MXS_INFO("Processed %lu transactions and %lu row events.",
total_commits, total_rows); total_commits, total_rows);
if (rotate_seen) if (rotate_seen)
@ -742,13 +755,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
if (router->row_count >= router->row_target || if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target) router->trx_count >= router->trx_target)
{ {
update_used_tables(router); do_checkpoint(router, &total_rows, &total_commits);
avro_flush_all_tables(router, AVROROUTER_SYNC);
avro_save_conversion_state(router);
notify_all_clients(router);
total_rows += router->row_count;
total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
} }
} }

View File

@ -162,6 +162,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
"table until a DDL statement for it is read.", table_ident); "table until a DDL statement for it is read.", table_ident);
} }
if (rval)
{
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
}
return rval; return rval;
} }
@ -289,9 +294,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end * beforehand so we must continue processing them until we reach the end
* of the event. */ * of the event. */
int rows = 0; int rows = 0;
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{ {
static uint64_t total_row_count = 1;
MXS_INFO("Row %lu", total_row_count++);
/** Add the current GTID and timestamp */ /** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type); int event_type = get_event_type(hdr->event_type);
@ -517,6 +526,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
npresent++; npresent++;
if (bit_is_set(null_bitmap, ncolumns, i)) if (bit_is_set(null_bitmap, ncolumns, i))
{ {
MXS_INFO("[%ld] NULL", i);
if (column_is_blob(map->column_types[i])) if (column_is_blob(map->column_types[i]))
{ {
uint8_t nullvalue = 0; uint8_t nullvalue = 0;
@ -546,6 +556,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("ENUM/SET values larger than 255 values aren't supported."); MXS_WARNING("ENUM/SET values larger than 255 values aren't supported.");
} }
avro_value_set_string(&field, strval); avro_value_set_string(&field, strval);
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes; ptr += bytes;
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
@ -562,15 +573,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
* one or two bytes for string length. * one or two bytes for string length.
*/ */
uint8_t bytes = *ptr++; uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8);
int len = metadata[metadata_offset] + int bytes = 0;
(((metadata[metadata_offset + 1] >> 4) & 0x3) ^ 0x3); uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300);
uint16_t field_length = (meta & 0xff) + extra_length;
if (len <= 255) if (field_length > 255)
{ {
bytes += *ptr++ << 8; bytes = ptr[0] + (ptr[1] << 8);
ptr += 2;
}
else
{
bytes = *ptr++;
} }
MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
ss_dassert(bytes || *ptr == '\0');
char str[bytes + 1]; char str[bytes + 1];
memcpy(str, ptr, bytes); memcpy(str, ptr, bytes);
str[bytes] = '\0'; str[bytes] = '\0';
@ -595,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("BIT is not currently supported, values are stored as 0."); MXS_WARNING("BIT is not currently supported, values are stored as 0.");
} }
avro_value_set_int(&field, value); avro_value_set_int(&field, value);
MXS_INFO("[%ld] BIT", i);
ptr += bytes; ptr += bytes;
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
@ -603,6 +623,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
double f_value = 0.0; double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value); avro_value_set_double(&field, f_value);
MXS_INFO("[%ld] DOUBLE", i);
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
else if (column_is_variable_string(map->column_types[i])) else if (column_is_variable_string(map->column_types[i]))
@ -620,6 +641,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr++; ptr++;
} }
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
char buf[sz + 1]; char buf[sz + 1];
memcpy(buf, ptr, sz); memcpy(buf, ptr, sz);
buf[sz] = '\0'; buf[sz] = '\0';
@ -633,6 +655,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0; uint64_t len = 0;
memcpy(&len, ptr, bytes); memcpy(&len, ptr, bytes);
ptr += bytes; ptr += bytes;
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len) if (len)
{ {
avro_value_set_bytes(&field, ptr, len); avro_value_set_bytes(&field, ptr, len);
@ -649,9 +672,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
{ {
char buf[80]; char buf[80];
struct tm tm; struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm); ptr += unpack_temporal_value(map->column_types[i], ptr,
&metadata[metadata_offset],
create->column_lengths[i], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf); avro_value_set_string(&field, buf);
MXS_INFO("[%ld] TEMPORAL: %s", i, buf);
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
/** All numeric types (INT, LONG, FLOAT etc.) */ /** All numeric types (INT, LONG, FLOAT etc.) */
@ -662,6 +688,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += unpack_numeric_field(ptr, map->column_types[i], ptr += unpack_numeric_field(ptr, map->column_types[i],
&metadata[metadata_offset], lval); &metadata[metadata_offset], lval);
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval); set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
MXS_INFO("[%ld] NUMERIC: %ld", i, *((int64_t*)lval));
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
ss_dassert(metadata_offset <= map->column_metadata_size); ss_dassert(metadata_offset <= map->column_metadata_size);

View File

@ -124,9 +124,11 @@ char* json_new_schema_from_table(TABLE_MAP *map)
for (uint64_t i = 0; i < map->columns; i++) for (uint64_t i = 0; i < map->columns; i++)
{ {
json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
create->column_names[i], "type", "name", create->column_names[i],
column_type_to_avro_type(map->column_types[i]))); "type", column_type_to_avro_type(map->column_types[i]),
"real_type", create->column_types[i],
"length", create->column_lengths[i]));
} }
json_object_set_new(schema, "fields", array); json_object_set_new(schema, "fields", array);
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
@ -172,8 +174,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
{ {
int array_size = json_array_size(arr); int array_size = json_array_size(arr);
table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size)); table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_types = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_lengths = (int*)MXS_MALLOC(sizeof(int) * (array_size));
if (table->column_names) if (table->column_names && table->column_types && table->column_lengths)
{ {
int columns = 0; int columns = 0;
rval = true; rval = true;
@ -184,6 +188,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
if (json_is_object(val)) if (json_is_object(val))
{ {
json_t* value;
if ((value = json_object_get(val, "real_type")) && json_is_string(value))
{
table->column_types[columns] = MXS_STRDUP_A(json_string_value(value));
}
else
{
table->column_types[columns] = MXS_STRDUP_A("unknown");
MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field.");
}
if ((value = json_object_get(val, "length")) && json_is_integer(value))
{
table->column_lengths[columns] = json_integer_value(value);
}
else
{
table->column_lengths[columns] = -1;
MXS_WARNING("No \"length\" value defined. Treating as default length field.");
}
json_t *name = json_object_get(val, "name"); json_t *name = json_object_get(val, "name");
if (name && json_is_string(name)) if (name && json_is_string(name))
{ {
@ -489,7 +515,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
dest[bytes] = '\0'; dest[bytes] = '\0';
make_valid_avro_identifier(dest); make_valid_avro_identifier(dest);
ptr = next_field_definition(ptr);
} }
else else
{ {
@ -499,56 +524,98 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
return ptr; return ptr;
} }
int extract_type_length(const char* ptr, char *dest)
{
/** Skip any leading whitespace */
while (isspace(*ptr) || *ptr == '`')
{
ptr++;
}
/** The field type definition starts here */
const char *start = ptr;
/** Skip characters until we either hit a whitespace character or the start
* of the length definition. */
while (!isspace(*ptr) && *ptr != '(')
{
ptr++;
}
/** Store type */
int typelen = ptr - start;
memcpy(dest, start, typelen);
dest[typelen] = '\0';
/** Skip whitespace */
while (isspace(*ptr))
{
ptr++;
}
int rval = -1; // No length defined
/** Start of length definition */
if (*ptr == '(')
{
ptr++;
char *end;
int val = strtol(ptr, &end, 10);
if (*end == ')')
{
rval = val;
}
}
return rval;
}
int count_columns(const char* ptr)
{
int i = 2;
while ((ptr = strchr(ptr, ',')))
{
ptr++;
i++;
}
return i;
}
/** /**
* Process a table definition into an array of column names * Process a table definition into an array of column names
* @param nameptr table definition * @param nameptr table definition
* @return Number of processed columns or -1 on error * @return Number of processed columns or -1 on error
*/ */
static int process_column_definition(const char *nameptr, char*** dest) static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens)
{ {
/** Process columns in groups of 8 */ int n = count_columns(nameptr);
size_t chunks = 1; *dest = MXS_MALLOC(sizeof(char*) * n);
const size_t chunk_size = 8; *dest_types = MXS_MALLOC(sizeof(char*) * n);
int i = 0; *dest_lens = MXS_MALLOC(sizeof(int) * n);
char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1));
if (names == NULL)
{
return -1;
}
char **names = *dest;
char **types = *dest_types;
int *lengths = *dest_lens;
char colname[512]; char colname[512];
int i = 0;
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname)))) while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
{ {
if (i >= chunks * chunk_size) ss_dassert(i < n);
{ char type[100] = "";
char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*)); int len = extract_type_length(nameptr, type);
if (tmp == NULL) nameptr = next_field_definition(nameptr);
{ fix_reserved_word(colname);
for (int x = 0; x < i; x++)
{
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
names = tmp;
}
if ((names[i++] = MXS_STRDUP(colname)) == NULL) lengths[i] = len;
{ types[i] = MXS_STRDUP_A(type);
for (int x = 0; x < i; x++) names[i] = MXS_STRDUP_A(colname);
{ i++;
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
} }
*dest = names;
return i; return i;
} }
@ -601,7 +668,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
char database[MYSQL_DATABASE_MAXLEN + 1]; char database[MYSQL_DATABASE_MAXLEN + 1];
const char *db = event_db; const char *db = event_db;
MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql); MXS_INFO("Create table: %s", sql);
if (!get_table_name(sql, table)) if (!get_table_name(sql, table))
{ {
@ -621,8 +688,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
db = database; db = database;
} }
int* lengths = NULL;
char **names = NULL; char **names = NULL;
int n_columns = process_column_definition(statement_sql, &names); char **types = NULL;
int n_columns = process_column_definition(statement_sql, &names, &types, &lengths);
ss_dassert(n_columns > 0); ss_dassert(n_columns > 0);
/** We have appear to have a valid CREATE TABLE statement */ /** We have appear to have a valid CREATE TABLE statement */
@ -634,6 +703,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
rval->version = 1; rval->version = 1;
rval->was_used = false; rval->was_used = false;
rval->column_names = names; rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = n_columns; rval->columns = n_columns;
rval->database = MXS_STRDUP(db); rval->database = MXS_STRDUP(db);
rval->table = MXS_STRDUP(table); rval->table = MXS_STRDUP(table);
@ -675,8 +746,11 @@ void table_create_free(TABLE_CREATE* value)
for (uint64_t i = 0; i < value->columns; i++) for (uint64_t i = 0; i < value->columns; i++)
{ {
MXS_FREE(value->column_names[i]); MXS_FREE(value->column_names[i]);
MXS_FREE(value->column_types[i]);
} }
MXS_FREE(value->column_names); MXS_FREE(value->column_names);
MXS_FREE(value->column_types);
MXS_FREE(value->column_lengths);
MXS_FREE(value->table); MXS_FREE(value->table);
MXS_FREE(value->database); MXS_FREE(value->database);
MXS_FREE(value); MXS_FREE(value);
@ -792,11 +866,15 @@ void make_avro_token(char* dest, const char* src, int length)
memcpy(dest, src, length); memcpy(dest, src, length);
dest[length] = '\0'; dest[length] = '\0';
fix_reserved_word(dest);
} }
int get_column_index(TABLE_CREATE *create, const char *tok) int get_column_index(TABLE_CREATE *create, const char *tok)
{ {
int idx = -1; int idx = -1;
char safe_tok[strlen(tok) + 2];
strcpy(safe_tok, tok);
fix_reserved_word(safe_tok);
for (int x = 0; x < create->columns; x++) for (int x = 0; x < create->columns; x++)
{ {
@ -821,7 +899,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (tok) if (tok)
{ {
MXS_DEBUG("Altering table %.*s\n", len, tok); MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
def = tok + len; def = tok + len;
} }
@ -864,7 +942,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (idx != -1) if (idx != -1)
{ {
free(create->column_names[idx]); MXS_FREE(create->column_names[idx]);
for (int i = idx; i < (int)create->columns - 1; i++) for (int i = idx; i < (int)create->columns - 1; i++)
{ {
create->column_names[i] = create->column_names[i + 1]; create->column_names[i] = create->column_names[i + 1];
@ -894,7 +972,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (idx != -1) if (idx != -1)
{ {
free(create->column_names[idx]); MXS_FREE(create->column_names[idx]);
create->column_names[idx] = strndup(tok, len); create->column_names[idx] = strndup(tok, len);
updates++; updates++;
} }

View File

@ -84,6 +84,23 @@ static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp"; static const char *avro_timestamp = "timestamp";
static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" }; static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
static inline bool is_reserved_word(const char* word)
{
return strcasecmp(word, avro_domain) == 0 ||
strcasecmp(word, avro_server_id) == 0 ||
strcasecmp(word, avro_sequence) == 0 ||
strcasecmp(word, avro_event_number) == 0 ||
strcasecmp(word, avro_event_type) == 0 ||
strcasecmp(word, avro_timestamp) == 0;
}
static inline void fix_reserved_word(char *tok)
{
if (is_reserved_word(tok))
{
strcat(tok, "_");
}
}
/** How a binlog file is closed */ /** How a binlog file is closed */
typedef enum avro_binlog_end typedef enum avro_binlog_end
@ -111,6 +128,8 @@ typedef struct table_create
{ {
uint64_t columns; uint64_t columns;
char **column_names; char **column_names;
char **column_types;
int* column_lengths;
char *table; char *table;
char *database; char *database;
int version; /**< How many versions of this table have been used */ int version; /**< How many versions of this table have been used */