MXS-1216: Fix crash on MariaDB 10.0 DATETIME(n)

When a MariaDB 10.0 DATETIME field with a custom length was defined, the
field offsets weren't calculated properly.

As there is no metadata for pre-10.1 DATETIME types with decimal
precision, the metadata (i.e. decimal count) needs to be gathered from the
CREATE TABLE statement. This information is then used to calculate the
correct field length when the value is decoded.

This change does not fix the incorrect interpretation of the old DATETIME
value. The converted values are still garbled due to the fact that the
value needs to be shifted out of the decimal format before it can be
properly converted.
This commit is contained in:
Markus Mäkelä 2017-05-12 11:22:04 +03:00
parent 898bc3444e
commit c988735a03
5 changed files with 127 additions and 52 deletions

View File

@ -392,6 +392,20 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
return metadata[1];
}
/**
* If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with
* extra precision, the packed length is shorter than 8 bytes.
*/
size_t datetime_sizes[] =
{
5, // DATETIME(0)
6, // DATETIME(1)
6, // DATETIME(2)
7, // DATETIME(3)
7, // DATETIME(4)
7, // DATETIME(5)
8 // DATETIME(6)
};
/**
* @brief Get the length of a temporal field
@ -399,7 +413,7 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
* @param decimals How many decimals the field has
* @return Number of bytes the temporal value takes
*/
static size_t temporal_field_size(uint8_t type, uint8_t decimals)
static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length)
{
switch (type)
{
@ -414,7 +428,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
return 3 + ((decimals + 1) / 2);
case TABLE_COL_TYPE_DATETIME:
return 8;
return length < 0 || length > 6 ? 8 : datetime_sizes[length];
case TABLE_COL_TYPE_TIMESTAMP:
return 4;
@ -442,7 +456,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
* @param val Extracted packed value
* @param tm Pointer where the unpacked temporal value is stored
*/
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm)
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm)
{
switch (type)
{
@ -475,7 +489,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
ss_dassert(false);
break;
}
return temporal_field_size(type, *metadata);
return temporal_field_size(type, *metadata, length);
}
void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm)

View File

@ -83,7 +83,7 @@ bool column_is_decimal(uint8_t type);
bool fixed_string_is_enum(uint8_t type);
/** Value unpacking */
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm);
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm);
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest);
size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val);
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,

View File

@ -98,6 +98,8 @@ typedef struct table_create
{
uint64_t columns;
char **column_names;
char **column_types;
int* column_lengths;
char *table;
char *database;
int version; /**< How many versions of this table have been used */

View File

@ -161,6 +161,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
"table until a DDL statement for it is read.", table_ident);
}
if (rval)
{
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
}
return rval;
}
@ -288,9 +293,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end
* of the event. */
int rows = 0;
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{
static uint64_t total_row_count = 1;
MXS_INFO("Row %lu", total_row_count++);
/** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type);
@ -516,6 +525,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
MXS_INFO("[%ld] NULL", i);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
@ -545,6 +555,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("ENUM/SET values larger than 255 values aren't supported.");
}
avro_value_set_string(&field, strval);
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes;
ss_dassert(ptr < end);
}
@ -594,6 +605,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, value);
MXS_INFO("[%ld] BIT", i);
ptr += bytes;
ss_dassert(ptr < end);
}
@ -602,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value);
MXS_INFO("[%ld] DOUBLE", i);
ss_dassert(ptr < end);
}
else if (column_is_variable_string(map->column_types[i]))
@ -619,6 +632,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr++;
}
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
char buf[sz + 1];
memcpy(buf, ptr, sz);
buf[sz] = '\0';
@ -632,6 +646,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0;
memcpy(&len, ptr, bytes);
ptr += bytes;
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len)
{
avro_value_set_bytes(&field, ptr, len);
@ -648,9 +663,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
{
char buf[80];
struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
ptr += unpack_temporal_value(map->column_types[i], ptr,
&metadata[metadata_offset],
create->column_lengths[i], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
MXS_INFO("[%ld] TEMPORAL: %s", i, buf);
ss_dassert(ptr < end);
}
/** All numeric types (INT, LONG, FLOAT etc.) */

View File

@ -491,7 +491,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
dest[bytes] = '\0';
make_valid_avro_identifier(dest);
ptr = next_field_definition(ptr);
}
else
{
@ -501,62 +500,97 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
return ptr;
}
int extract_type_length(const char* ptr, char *dest)
{
/** Skip any leading whitespace */
while (isspace(*ptr) || *ptr == '`')
{
ptr++;
}
/** The field type definition starts here */
const char *start = ptr;
/** Skip characters until we either hit a whitespace character or the start
* of the length definition. */
while (!isspace(*ptr) && *ptr != '(')
{
ptr++;
}
/** Store type */
int typelen = ptr - start;
memcpy(dest, start, typelen);
dest[typelen] = '\0';
/** Skip whitespace */
while (isspace(*ptr))
{
ptr++;
}
int rval = -1; // No length defined
/** Start of length definition */
if (*ptr == '(')
{
ptr++;
char *end;
int val = strtol(ptr, &end, 10);
if (*end == ')')
{
rval = val;
}
}
return rval;
}
int count_columns(const char* ptr)
{
int i = 2;
while ((ptr = strchr(ptr, ',')))
{
ptr++;
i++;
}
return i;
}
/**
* Process a table definition into an array of column names
* @param nameptr table definition
* @return Number of processed columns or -1 on error
*/
static int process_column_definition(const char *nameptr, char*** dest)
static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens)
{
/** Process columns in groups of 8 */
size_t chunks = 1;
const size_t chunk_size = 8;
int i = 0;
char **names = malloc(sizeof(char*) * (chunks * chunk_size + 1));
if (names == NULL)
{
MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.",
sizeof(char*) * chunks);
return -1;
}
int n = count_columns(nameptr);
*dest = malloc(sizeof(char*) * n);
*dest_types = malloc(sizeof(char*) * n);
*dest_lens = malloc(sizeof(int) * n);
char **names = *dest;
char **types = *dest_types;
int *lengths = *dest_lens;
char colname[512];
int i = 0;
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
{
if (i >= chunks * chunk_size)
{
char **tmp = realloc(names, (++chunks * chunk_size + 1) * sizeof(char*));
if (tmp == NULL)
{
for (int x = 0; x < i; x++)
{
free(names[x]);
}
free(names);
MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.",
sizeof(char*) * chunks);
return -1;
}
names = tmp;
}
ss_dassert(i < n);
char type[100] = "";
int len = extract_type_length(nameptr, type);
nameptr = next_field_definition(nameptr);
if ((names[i++] = strdup(colname)) == NULL)
{
for (int x = 0; x < i; x++)
{
free(names[x]);
}
free(names);
MXS_ERROR("Memory allocation failed when trying allocate %lu bytes "
"of memory.", strlen(colname));
return -1;
}
lengths[i] = len;
types[i] = strdup(type);
names[i] = strdup(colname);
i++;
}
*dest = names;
return i;
}
@ -600,7 +634,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
char database[MYSQL_DATABASE_MAXLEN + 1];
const char *db = event_db;
MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql);
MXS_INFO("Create table: %s", sql);
if (!get_table_name(sql, table))
{
@ -620,8 +654,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
db = database;
}
int* lengths = NULL;
char **names = NULL;
int n_columns = process_column_definition(statement_sql, &names);
char **types = NULL;
int n_columns = process_column_definition(statement_sql, &names, &types, &lengths);
ss_dassert(n_columns > 0);
/** We have appear to have a valid CREATE TABLE statement */
@ -633,6 +669,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
rval->version = 1;
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = n_columns;
rval->database = strdup(db);
rval->table = strdup(table);
@ -675,8 +713,11 @@ void* table_create_free(TABLE_CREATE* value)
for (uint64_t i = 0; i < value->columns; i++)
{
free(value->column_names[i]);
free(value->column_types[i]);
}
free(value->column_names);
free(value->column_types);
free(value->column_lengths);
free(value->table);
free(value->database);
free(value);
@ -822,7 +863,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (tok)
{
MXS_DEBUG("Altering table %.*s\n", len, tok);
MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
def = tok + len;
}