From c988735a03bdc923e2303e1c7094be3229b4f535 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 11:22:04 +0300 Subject: [PATCH 1/6] MXS-1216: Fix crash on MariaDB 10.0 DATETIME(n) When a MariaDB 10.0 DATETIME field with a custom length was defined, the field offsets weren't calculated properly. As there is no metadata for pre-10.1 DATETIME types with decimal precision, the metadata (i.e. decimal count) needs to be gathered from the CREATE TABLE statement. This information is then used to calculate the correct field length when the value is decoded. This change does not fix the incorrect interpretation of the old DATETIME value. The converted values are still garbled due to the fact that the value needs to be shifted out of the decimal format before it can be properly converted. --- server/core/mysql_binlog.c | 22 +++- server/include/mysql_binlog.h | 2 +- server/modules/include/avrorouter.h | 2 + server/modules/routing/avro/avro_rbr.c | 20 +++- server/modules/routing/avro/avro_schema.c | 133 ++++++++++++++-------- 5 files changed, 127 insertions(+), 52 deletions(-) diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index ddf5825c0..90590a3da 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -392,6 +392,20 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, return metadata[1]; } +/** + * If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with + * extra precision, the packed length is shorter than 8 bytes. + */ +size_t datetime_sizes[] = +{ + 5, // DATETIME(0) + 6, // DATETIME(1) + 6, // DATETIME(2) + 7, // DATETIME(3) + 7, // DATETIME(4) + 7, // DATETIME(5) + 8 // DATETIME(6) +}; /** * @brief Get the length of a temporal field @@ -399,7 +413,7 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, * @param decimals How many decimals the field has * @return Number of bytes the temporal value takes */ -static size_t temporal_field_size(uint8_t type, uint8_t decimals) +static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length) { switch (type) { @@ -414,7 +428,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) return 3 + ((decimals + 1) / 2); case TABLE_COL_TYPE_DATETIME: - return 8; + return length < 0 || length > 6 ? 8 : datetime_sizes[length]; case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -442,7 +456,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) * @param val Extracted packed value * @param tm Pointer where the unpacked temporal value is stored */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm) { switch (type) { @@ -475,7 +489,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru ss_dassert(false); break; } - return temporal_field_size(type, *metadata); + return temporal_field_size(type, *metadata, length); } void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm) diff --git a/server/include/mysql_binlog.h b/server/include/mysql_binlog.h index b1a81eaa2..ebe5574e5 100644 --- a/server/include/mysql_binlog.h +++ b/server/include/mysql_binlog.h @@ -83,7 +83,7 @@ bool column_is_decimal(uint8_t type); bool fixed_string_is_enum(uint8_t type); /** Value unpacking */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm); size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, diff --git a/server/modules/include/avrorouter.h b/server/modules/include/avrorouter.h index a233915b4..b621c3026 100644 --- a/server/modules/include/avrorouter.h +++ b/server/modules/include/avrorouter.h @@ -98,6 +98,8 @@ typedef struct table_create { uint64_t columns; char **column_names; + char **column_types; + int* column_lengths; char *table; char *database; int version; /**< How many versions of this table have been used */ diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index b5a96f5ac..65c2c03d3 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -161,6 +161,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr "table until a DDL statement for it is read.", table_ident); } + if (rval) + { + MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos); + } + return rval; } @@ -288,9 +293,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos); while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { + static uint64_t total_row_count = 1; + MXS_INFO("Row %lu", total_row_count++); + /** Add the current GTID and timestamp */ uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); @@ -516,6 +525,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { + MXS_INFO("[%ld] NULL", i); if (column_is_blob(map->column_types[i])) { uint8_t nullvalue = 0; @@ -545,6 +555,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("ENUM/SET values larger than 255 values aren't supported."); } avro_value_set_string(&field, strval); + MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes); ptr += bytes; ss_dassert(ptr < end); } @@ -594,6 +605,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("BIT is not currently supported, values are stored as 0."); } avro_value_set_int(&field, value); + MXS_INFO("[%ld] BIT", i); ptr += bytes; ss_dassert(ptr < end); } @@ -602,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value double f_value = 0.0; ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); avro_value_set_double(&field, f_value); + MXS_INFO("[%ld] DOUBLE", i); ss_dassert(ptr < end); } else if (column_is_variable_string(map->column_types[i])) @@ -619,6 +632,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr++; } + MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz); char buf[sz + 1]; memcpy(buf, ptr, sz); buf[sz] = '\0'; @@ -632,6 +646,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; + MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len); if (len) { avro_value_set_bytes(&field, ptr, len); @@ -648,9 +663,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value { char buf[80]; struct tm tm; - ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm); + ptr += unpack_temporal_value(map->column_types[i], ptr, + &metadata[metadata_offset], + create->column_lengths[i], &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); avro_value_set_string(&field, buf); + MXS_INFO("[%ld] TEMPORAL: %s", i, buf); ss_dassert(ptr < end); } /** All numeric types (INT, LONG, FLOAT etc.) */ diff --git a/server/modules/routing/avro/avro_schema.c b/server/modules/routing/avro/avro_schema.c index 8c339e9e9..13bb44bc1 100644 --- a/server/modules/routing/avro/avro_schema.c +++ b/server/modules/routing/avro/avro_schema.c @@ -491,7 +491,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) dest[bytes] = '\0'; make_valid_avro_identifier(dest); - ptr = next_field_definition(ptr); } else { @@ -501,62 +500,97 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) return ptr; } +int extract_type_length(const char* ptr, char *dest) +{ + /** Skip any leading whitespace */ + while (isspace(*ptr) || *ptr == '`') + { + ptr++; + } + + /** The field type definition starts here */ + const char *start = ptr; + + /** Skip characters until we either hit a whitespace character or the start + * of the length definition. */ + while (!isspace(*ptr) && *ptr != '(') + { + ptr++; + } + + /** Store type */ + int typelen = ptr - start; + memcpy(dest, start, typelen); + dest[typelen] = '\0'; + + /** Skip whitespace */ + while (isspace(*ptr)) + { + ptr++; + } + + int rval = -1; // No length defined + + /** Start of length definition */ + if (*ptr == '(') + { + ptr++; + char *end; + int val = strtol(ptr, &end, 10); + + if (*end == ')') + { + rval = val; + } + } + + return rval; +} + +int count_columns(const char* ptr) +{ + int i = 2; + + while ((ptr = strchr(ptr, ','))) + { + ptr++; + i++; + } + + return i; +} + /** * Process a table definition into an array of column names * @param nameptr table definition * @return Number of processed columns or -1 on error */ -static int process_column_definition(const char *nameptr, char*** dest) +static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens) { - /** Process columns in groups of 8 */ - size_t chunks = 1; - const size_t chunk_size = 8; - int i = 0; - char **names = malloc(sizeof(char*) * (chunks * chunk_size + 1)); - - if (names == NULL) - { - MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.", - sizeof(char*) * chunks); - return -1; - } + int n = count_columns(nameptr); + *dest = malloc(sizeof(char*) * n); + *dest_types = malloc(sizeof(char*) * n); + *dest_lens = malloc(sizeof(int) * n); + char **names = *dest; + char **types = *dest_types; + int *lengths = *dest_lens; char colname[512]; + int i = 0; while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname)))) { - if (i >= chunks * chunk_size) - { - char **tmp = realloc(names, (++chunks * chunk_size + 1) * sizeof(char*)); - if (tmp == NULL) - { - for (int x = 0; x < i; x++) - { - free(names[x]); - } - free(names); - MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.", - sizeof(char*) * chunks); - return -1; - } - names = tmp; - } + ss_dassert(i < n); + char type[100] = ""; + int len = extract_type_length(nameptr, type); + nameptr = next_field_definition(nameptr); - if ((names[i++] = strdup(colname)) == NULL) - { - for (int x = 0; x < i; x++) - { - free(names[x]); - } - free(names); - MXS_ERROR("Memory allocation failed when trying allocate %lu bytes " - "of memory.", strlen(colname)); - return -1; - } + lengths[i] = len; + types[i] = strdup(type); + names[i] = strdup(colname); + i++; } - *dest = names; - return i; } @@ -600,7 +634,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) char database[MYSQL_DATABASE_MAXLEN + 1]; const char *db = event_db; - MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql); + MXS_INFO("Create table: %s", sql); if (!get_table_name(sql, table)) { @@ -620,8 +654,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) db = database; } + int* lengths = NULL; char **names = NULL; - int n_columns = process_column_definition(statement_sql, &names); + char **types = NULL; + int n_columns = process_column_definition(statement_sql, &names, &types, &lengths); ss_dassert(n_columns > 0); /** We have appear to have a valid CREATE TABLE statement */ @@ -633,6 +669,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) rval->version = 1; rval->was_used = false; rval->column_names = names; + rval->column_lengths = lengths; + rval->column_types = types; rval->columns = n_columns; rval->database = strdup(db); rval->table = strdup(table); @@ -675,8 +713,11 @@ void* table_create_free(TABLE_CREATE* value) for (uint64_t i = 0; i < value->columns; i++) { free(value->column_names[i]); + free(value->column_types[i]); } free(value->column_names); + free(value->column_types); + free(value->column_lengths); free(value->table); free(value->database); free(value); @@ -822,7 +863,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (tok) { - MXS_DEBUG("Altering table %.*s\n", len, tok); + MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql); def = tok + len; } From 926930e2411a5a3a4d39f43f0a89603de0522e15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 11:25:48 +0300 Subject: [PATCH 2/6] MXS-1216: Correct CHAR(n) handling The field length was wrongly compared to less than 255 for two byte field lengths. In addition to that, the metadata was interpreted in the wrong way. --- server/modules/routing/avro/avro_rbr.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index 65c2c03d3..5d684d159 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -572,15 +572,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value * one or two bytes for string length. */ - uint8_t bytes = *ptr++; - int len = metadata[metadata_offset] + - (((metadata[metadata_offset + 1] >> 4) & 0x3) ^ 0x3); + uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8); + int bytes = 0; + uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300); + uint16_t field_length = (meta & 0xff) + extra_length; - if (len <= 255) + if (field_length > 255) { - bytes += *ptr++ << 8; + bytes = ptr[0] + (ptr[1] << 8); + ptr += 2; + } + else + { + bytes = *ptr++; } + MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes); + ss_dassert(bytes || *ptr == '\0'); char str[bytes + 1]; memcpy(str, ptr, bytes); str[bytes] = '\0'; From 8a288110a9cd790c85778925e46816504ea6eba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 11:28:07 +0300 Subject: [PATCH 3/6] Rename conflicting Avro fields When a user defined field conflicts with an internal MaxScale field, the field is suffixed with an underscore. --- server/modules/include/avrorouter.h | 17 +++++++++++++++++ server/modules/routing/avro/avro_schema.c | 5 +++++ 2 files changed, 22 insertions(+) diff --git a/server/modules/include/avrorouter.h b/server/modules/include/avrorouter.h index b621c3026..eeb02f6b6 100644 --- a/server/modules/include/avrorouter.h +++ b/server/modules/include/avrorouter.h @@ -71,6 +71,23 @@ static const char *avro_event_type = "event_type"; static const char *avro_timestamp = "timestamp"; static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" }; +static inline bool is_reserved_word(const char* word) +{ + return strcasecmp(word, avro_domain) == 0 || + strcasecmp(word, avro_server_id) == 0 || + strcasecmp(word, avro_sequence) == 0 || + strcasecmp(word, avro_event_number) == 0 || + strcasecmp(word, avro_event_type) == 0 || + strcasecmp(word, avro_timestamp) == 0; +} + +static inline void fix_reserved_word(char *tok) +{ + if (is_reserved_word(tok)) + { + strcat(tok, "_"); + } +} /** How a binlog file is closed */ typedef enum avro_binlog_end diff --git a/server/modules/routing/avro/avro_schema.c b/server/modules/routing/avro/avro_schema.c index 13bb44bc1..468c37c6a 100644 --- a/server/modules/routing/avro/avro_schema.c +++ b/server/modules/routing/avro/avro_schema.c @@ -584,6 +584,7 @@ static int process_column_definition(const char *nameptr, char*** dest, char*** char type[100] = ""; int len = extract_type_length(nameptr, type); nameptr = next_field_definition(nameptr); + fix_reserved_word(colname); lengths[i] = len; types[i] = strdup(type); @@ -834,11 +835,15 @@ void make_avro_token(char* dest, const char* src, int length) memcpy(dest, src, length); dest[length] = '\0'; + fix_reserved_word(dest); } int get_column_index(TABLE_CREATE *create, const char *tok) { int idx = -1; + char safe_tok[strlen(tok) + 2]; + strcpy(safe_tok, tok); + fix_reserved_word(safe_tok); for (int x = 0; x < create->columns; x++) { From 6b6a7fa4a1b7ae555f9a58d476e5ad4b215fe0ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 11:30:19 +0300 Subject: [PATCH 4/6] Do checkpoint processing at end of binlog When the binlog has been read, it needs to be treated as if the transaction or row limit has been hit. This will cause all tables to be flushed to disk before the files are indexed. --- server/modules/routing/avro/avro_file.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/modules/routing/avro/avro_file.c b/server/modules/routing/avro/avro_file.c index 4d9819f0d..fc8834cff 100644 --- a/server/modules/routing/avro/avro_file.c +++ b/server/modules/routing/avro/avro_file.c @@ -460,6 +460,17 @@ void notify_all_clients(AVRO_INSTANCE *router) } } +void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits) +{ + update_used_tables(router); + avro_flush_all_tables(router); + avro_save_conversion_state(router); + notify_all_clients(router); + *total_rows += router->row_count; + *total_commits += router->trx_count; + router->row_count = router->trx_count = 0; +} + /** * @brief Read all replication events from a binlog file. * @@ -541,6 +552,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) } else { + do_checkpoint(router, &total_rows, &total_commits); + MXS_INFO("Processed %lu transactions and %lu row events.", total_commits, total_rows); if (rotate_seen) @@ -734,13 +747,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) if (router->row_count >= router->row_target || router->trx_count >= router->trx_target) { - update_used_tables(router); - avro_flush_all_tables(router); - avro_save_conversion_state(router); - notify_all_clients(router); - total_rows += router->row_count; - total_commits += router->trx_count; - router->row_count = router->trx_count = 0; + do_checkpoint(router, &total_rows, &total_commits); } } From a12d19591efc3e977b54288b79498b77ccc5c2a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 16:47:20 +0300 Subject: [PATCH 5/6] MXS-1216: Store field real type and length in Avro schema The avro schema allows custom properties to be defined for the schema fields. The avrorouter stored extra information about the table into the schema for later use. Currently, this information is only generated by the avrorouter itself. Further improvements to the schema generator scripts need to be done. --- server/modules/routing/avro/avro_schema.c | 34 ++++++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/server/modules/routing/avro/avro_schema.c b/server/modules/routing/avro/avro_schema.c index 468c37c6a..444b571a4 100644 --- a/server/modules/routing/avro/avro_schema.c +++ b/server/modules/routing/avro/avro_schema.c @@ -126,9 +126,11 @@ char* json_new_schema_from_table(TABLE_MAP *map) for (uint64_t i = 0; i < map->columns; i++) { - json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - create->column_names[i], "type", - column_type_to_avro_type(map->column_types[i]))); + json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", + "name", create->column_names[i], + "type", column_type_to_avro_type(map->column_types[i]), + "real_type", create->column_types[i], + "length", create->column_lengths[i])); } json_object_set_new(schema, "fields", array); char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); @@ -174,8 +176,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table) { int array_size = json_array_size(arr); table->column_names = (char**)malloc(sizeof(char*) * (array_size)); + table->column_types = (char**)malloc(sizeof(char*) * (array_size)); + table->column_lengths = (int*)malloc(sizeof(int) * (array_size)); - if (table->column_names) + if (table->column_names && table->column_types && table->column_lengths) { int columns = 0; rval = true; @@ -186,6 +190,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table) if (json_is_object(val)) { + json_t* value; + + if ((value = json_object_get(val, "real_type")) && json_is_string(value)) + { + table->column_types[columns] = strdup(json_string_value(value)); + } + else + { + table->column_types[columns] = strdup("unknown"); + MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field."); + } + + if ((value = json_object_get(val, "length")) && json_is_integer(value)) + { + table->column_lengths[columns] = json_integer_value(value); + } + else + { + table->column_lengths[columns] = -1; + MXS_WARNING("No \"length\" value defined. Treating as default length field."); + } + json_t *name = json_object_get(val, "name"); if (name && json_is_string(name)) { From 5a0d2c54bd564688af44695067953ac16a09ee85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 12 May 2017 16:49:15 +0300 Subject: [PATCH 6/6] MXS-1216: Fix DATETIME(n) value interpretation The DATETIME(n) values generated by a MariaDB 10.0 server were not interpreted correctly as the wrong algorithm was used to extract the values. DATETIME(0) values still do not work properly and they require further debugging and changes to the code. --- server/core/mysql_binlog.c | 109 +++++++++++++++++++++++++++---------- 1 file changed, 79 insertions(+), 30 deletions(-) diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 90590a3da..93de0e3aa 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -26,6 +26,10 @@ #include #include +#include "mysql_client_server_protocol.h" + +static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes); + /** * @brief Convert a table column type to a string * @@ -217,6 +221,35 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) dest->tm_year = *ptr; } +/** Base-10 logarithm values */ +int64_t log_10_values[] = +{ + 1, + 10, + 100, + 1000, + 10000, + 100000, + 1000000, + 10000000, + 100000000 +}; + +/** + * If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with + * extra precision, the packed length is shorter than 8 bytes. + */ +size_t datetime_sizes[] = +{ + 5, // DATETIME(0) + 6, // DATETIME(1) + 6, // DATETIME(2) + 7, // DATETIME(3) + 7, // DATETIME(4) + 7, // DATETIME(5) + 8 // DATETIME(6) +}; + /** * @brief Unpack a DATETIME * @@ -225,21 +258,52 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) * @param val Value read from the binary log * @param dest Pointer where the unpacked value is stored */ -static void unpack_datetime(uint8_t *ptr, struct tm *dest) +static void unpack_datetime(uint8_t *ptr, int length, struct tm *dest) { - uint64_t val = 0; - memcpy(&val, ptr, sizeof(val)); - uint32_t second = val - ((val / 100) * 100); - val /= 100; - uint32_t minute = val - ((val / 100) * 100); - val /= 100; - uint32_t hour = val - ((val / 100) * 100); - val /= 100; - uint32_t day = val - ((val / 100) * 100); - val /= 100; - uint32_t month = val - ((val / 100) * 100); - val /= 100; - uint32_t year = val; + int64_t val = 0; + uint32_t second, minute, hour, day, month, year; + + if (length == -1) + { + val = gw_mysql_get_byte8(ptr); + second = val - ((val / 100) * 100); + val /= 100; + minute = val - ((val / 100) * 100); + val /= 100; + hour = val - ((val / 100) * 100); + val /= 100; + day = val - ((val / 100) * 100); + val /= 100; + month = val - ((val / 100) * 100); + val /= 100; + year = val; + } + else + { + // TODO: Figure out why DATETIME(0) doesn't work like it others do + val = unpack_bytes(ptr, datetime_sizes[length]); + val *= log_10_values[6 - length]; + + if (val < 0) + { + val = -val; + } + + int subsecond = val % 1000000; + val /= 1000000; + + second = val % 60; + val /= 60; + minute = val % 60; + val /= 60; + hour = val % 24; + val /= 24; + day = val % 32; + val /= 32; + month = val % 13; + val /= 13; + year = val; + } memset(dest, 0, sizeof(struct tm)); dest->tm_year = year - 1900; @@ -392,21 +456,6 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, return metadata[1]; } -/** - * If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with - * extra precision, the packed length is shorter than 8 bytes. - */ -size_t datetime_sizes[] = -{ - 5, // DATETIME(0) - 6, // DATETIME(1) - 6, // DATETIME(2) - 7, // DATETIME(3) - 7, // DATETIME(4) - 7, // DATETIME(5) - 8 // DATETIME(6) -}; - /** * @brief Get the length of a temporal field * @param type Field type @@ -465,7 +514,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int break; case TABLE_COL_TYPE_DATETIME: - unpack_datetime(ptr, tm); + unpack_datetime(ptr, length, tm); break; case TABLE_COL_TYPE_DATETIME2: