MXS-1216: Fix crash on MariaDB 10.0 DATETIME(n)
When a MariaDB 10.0 DATETIME field with a custom length was defined, the field offsets weren't calculated properly. As there is no metadata for pre-10.1 DATETIME types with decimal precision, the metadata (i.e. decimal count) needs to be gathered from the CREATE TABLE statement. This information is then used to calculate the correct field length when the value is decoded. This change does not fix the incorrect interpretation of the old DATETIME value. The converted values are still garbled due to the fact that the value needs to be shifted out of the decimal format before it can be properly converted.
This commit is contained in:
@ -161,6 +161,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
|
||||
"table until a DDL statement for it is read.", table_ident);
|
||||
}
|
||||
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
@ -288,9 +293,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
* beforehand so we must continue processing them until we reach the end
|
||||
* of the event. */
|
||||
int rows = 0;
|
||||
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
|
||||
|
||||
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
|
||||
{
|
||||
static uint64_t total_row_count = 1;
|
||||
MXS_INFO("Row %lu", total_row_count++);
|
||||
|
||||
/** Add the current GTID and timestamp */
|
||||
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
||||
int event_type = get_event_type(hdr->event_type);
|
||||
@ -516,6 +525,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
npresent++;
|
||||
if (bit_is_set(null_bitmap, ncolumns, i))
|
||||
{
|
||||
MXS_INFO("[%ld] NULL", i);
|
||||
if (column_is_blob(map->column_types[i]))
|
||||
{
|
||||
uint8_t nullvalue = 0;
|
||||
@ -545,6 +555,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
MXS_WARNING("ENUM/SET values larger than 255 values aren't supported.");
|
||||
}
|
||||
avro_value_set_string(&field, strval);
|
||||
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
|
||||
ptr += bytes;
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
@ -594,6 +605,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
|
||||
}
|
||||
avro_value_set_int(&field, value);
|
||||
MXS_INFO("[%ld] BIT", i);
|
||||
ptr += bytes;
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
@ -602,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
double f_value = 0.0;
|
||||
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
|
||||
avro_value_set_double(&field, f_value);
|
||||
MXS_INFO("[%ld] DOUBLE", i);
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
else if (column_is_variable_string(map->column_types[i]))
|
||||
@ -619,6 +632,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
ptr++;
|
||||
}
|
||||
|
||||
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
|
||||
char buf[sz + 1];
|
||||
memcpy(buf, ptr, sz);
|
||||
buf[sz] = '\0';
|
||||
@ -632,6 +646,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
uint64_t len = 0;
|
||||
memcpy(&len, ptr, bytes);
|
||||
ptr += bytes;
|
||||
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
|
||||
if (len)
|
||||
{
|
||||
avro_value_set_bytes(&field, ptr, len);
|
||||
@ -648,9 +663,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
{
|
||||
char buf[80];
|
||||
struct tm tm;
|
||||
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
|
||||
ptr += unpack_temporal_value(map->column_types[i], ptr,
|
||||
&metadata[metadata_offset],
|
||||
create->column_lengths[i], &tm);
|
||||
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
|
||||
avro_value_set_string(&field, buf);
|
||||
MXS_INFO("[%ld] TEMPORAL: %s", i, buf);
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
/** All numeric types (INT, LONG, FLOAT etc.) */
|
||||
@ -661,6 +679,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
ptr += unpack_numeric_field(ptr, map->column_types[i],
|
||||
&metadata[metadata_offset], lval);
|
||||
set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval);
|
||||
MXS_INFO("[%ld] NUMERIC: %ld", i, *((int64_t*)lval));
|
||||
ss_dassert(ptr < end);
|
||||
}
|
||||
ss_dassert(metadata_offset <= map->column_metadata_size);
|
||||
|
||||
@ -491,7 +491,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
|
||||
dest[bytes] = '\0';
|
||||
|
||||
make_valid_avro_identifier(dest);
|
||||
ptr = next_field_definition(ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -501,62 +500,97 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
|
||||
return ptr;
|
||||
}
|
||||
|
||||
int extract_type_length(const char* ptr, char *dest)
|
||||
{
|
||||
/** Skip any leading whitespace */
|
||||
while (isspace(*ptr) || *ptr == '`')
|
||||
{
|
||||
ptr++;
|
||||
}
|
||||
|
||||
/** The field type definition starts here */
|
||||
const char *start = ptr;
|
||||
|
||||
/** Skip characters until we either hit a whitespace character or the start
|
||||
* of the length definition. */
|
||||
while (!isspace(*ptr) && *ptr != '(')
|
||||
{
|
||||
ptr++;
|
||||
}
|
||||
|
||||
/** Store type */
|
||||
int typelen = ptr - start;
|
||||
memcpy(dest, start, typelen);
|
||||
dest[typelen] = '\0';
|
||||
|
||||
/** Skip whitespace */
|
||||
while (isspace(*ptr))
|
||||
{
|
||||
ptr++;
|
||||
}
|
||||
|
||||
int rval = -1; // No length defined
|
||||
|
||||
/** Start of length definition */
|
||||
if (*ptr == '(')
|
||||
{
|
||||
ptr++;
|
||||
char *end;
|
||||
int val = strtol(ptr, &end, 10);
|
||||
|
||||
if (*end == ')')
|
||||
{
|
||||
rval = val;
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
int count_columns(const char* ptr)
|
||||
{
|
||||
int i = 2;
|
||||
|
||||
while ((ptr = strchr(ptr, ',')))
|
||||
{
|
||||
ptr++;
|
||||
i++;
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a table definition into an array of column names
|
||||
* @param nameptr table definition
|
||||
* @return Number of processed columns or -1 on error
|
||||
*/
|
||||
static int process_column_definition(const char *nameptr, char*** dest)
|
||||
static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens)
|
||||
{
|
||||
/** Process columns in groups of 8 */
|
||||
size_t chunks = 1;
|
||||
const size_t chunk_size = 8;
|
||||
int i = 0;
|
||||
char **names = malloc(sizeof(char*) * (chunks * chunk_size + 1));
|
||||
|
||||
if (names == NULL)
|
||||
{
|
||||
MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.",
|
||||
sizeof(char*) * chunks);
|
||||
return -1;
|
||||
}
|
||||
int n = count_columns(nameptr);
|
||||
*dest = malloc(sizeof(char*) * n);
|
||||
*dest_types = malloc(sizeof(char*) * n);
|
||||
*dest_lens = malloc(sizeof(int) * n);
|
||||
|
||||
char **names = *dest;
|
||||
char **types = *dest_types;
|
||||
int *lengths = *dest_lens;
|
||||
char colname[512];
|
||||
int i = 0;
|
||||
|
||||
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
|
||||
{
|
||||
if (i >= chunks * chunk_size)
|
||||
{
|
||||
char **tmp = realloc(names, (++chunks * chunk_size + 1) * sizeof(char*));
|
||||
if (tmp == NULL)
|
||||
{
|
||||
for (int x = 0; x < i; x++)
|
||||
{
|
||||
free(names[x]);
|
||||
}
|
||||
free(names);
|
||||
MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.",
|
||||
sizeof(char*) * chunks);
|
||||
return -1;
|
||||
}
|
||||
names = tmp;
|
||||
}
|
||||
ss_dassert(i < n);
|
||||
char type[100] = "";
|
||||
int len = extract_type_length(nameptr, type);
|
||||
nameptr = next_field_definition(nameptr);
|
||||
|
||||
if ((names[i++] = strdup(colname)) == NULL)
|
||||
{
|
||||
for (int x = 0; x < i; x++)
|
||||
{
|
||||
free(names[x]);
|
||||
}
|
||||
free(names);
|
||||
MXS_ERROR("Memory allocation failed when trying allocate %lu bytes "
|
||||
"of memory.", strlen(colname));
|
||||
return -1;
|
||||
}
|
||||
lengths[i] = len;
|
||||
types[i] = strdup(type);
|
||||
names[i] = strdup(colname);
|
||||
i++;
|
||||
}
|
||||
|
||||
*dest = names;
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
@ -600,7 +634,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
|
||||
char database[MYSQL_DATABASE_MAXLEN + 1];
|
||||
const char *db = event_db;
|
||||
|
||||
MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql);
|
||||
MXS_INFO("Create table: %s", sql);
|
||||
|
||||
if (!get_table_name(sql, table))
|
||||
{
|
||||
@ -620,8 +654,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
|
||||
db = database;
|
||||
}
|
||||
|
||||
int* lengths = NULL;
|
||||
char **names = NULL;
|
||||
int n_columns = process_column_definition(statement_sql, &names);
|
||||
char **types = NULL;
|
||||
int n_columns = process_column_definition(statement_sql, &names, &types, &lengths);
|
||||
ss_dassert(n_columns > 0);
|
||||
|
||||
/** We have appear to have a valid CREATE TABLE statement */
|
||||
@ -633,6 +669,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
|
||||
rval->version = 1;
|
||||
rval->was_used = false;
|
||||
rval->column_names = names;
|
||||
rval->column_lengths = lengths;
|
||||
rval->column_types = types;
|
||||
rval->columns = n_columns;
|
||||
rval->database = strdup(db);
|
||||
rval->table = strdup(table);
|
||||
@ -675,8 +713,11 @@ void* table_create_free(TABLE_CREATE* value)
|
||||
for (uint64_t i = 0; i < value->columns; i++)
|
||||
{
|
||||
free(value->column_names[i]);
|
||||
free(value->column_types[i]);
|
||||
}
|
||||
free(value->column_names);
|
||||
free(value->column_types);
|
||||
free(value->column_lengths);
|
||||
free(value->table);
|
||||
free(value->database);
|
||||
free(value);
|
||||
@ -822,7 +863,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
|
||||
|
||||
if (tok)
|
||||
{
|
||||
MXS_DEBUG("Altering table %.*s\n", len, tok);
|
||||
MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
|
||||
def = tok + len;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user