diff --git a/include/maxscale/mysql_binlog.h b/include/maxscale/mysql_binlog.h index cb0c3ae97..6ff0ab6b3 100644 --- a/include/maxscale/mysql_binlog.h +++ b/include/maxscale/mysql_binlog.h @@ -85,7 +85,7 @@ bool column_is_decimal(uint8_t type); bool fixed_string_is_enum(uint8_t type); /** Value unpacking */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm); size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 4c5c38c20..0d9aeb987 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -25,6 +25,10 @@ #include #include +#include + +static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes); + /** * @brief Convert a table column type to a string * @@ -216,6 +220,35 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) dest->tm_year = *ptr; } +/** Base-10 logarithm values */ +int64_t log_10_values[] = +{ + 1, + 10, + 100, + 1000, + 10000, + 100000, + 1000000, + 10000000, + 100000000 +}; + +/** + * If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with + * extra precision, the packed length is shorter than 8 bytes. + */ +size_t datetime_sizes[] = +{ + 5, // DATETIME(0) + 6, // DATETIME(1) + 6, // DATETIME(2) + 7, // DATETIME(3) + 7, // DATETIME(4) + 7, // DATETIME(5) + 8 // DATETIME(6) +}; + /** * @brief Unpack a DATETIME * @@ -224,21 +257,52 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) * @param val Value read from the binary log * @param dest Pointer where the unpacked value is stored */ -static void unpack_datetime(uint8_t *ptr, struct tm *dest) +static void unpack_datetime(uint8_t *ptr, int length, struct tm *dest) { - uint64_t val = 0; - memcpy(&val, ptr, sizeof(val)); - uint32_t second = val - ((val / 100) * 100); - val /= 100; - uint32_t minute = val - ((val / 100) * 100); - val /= 100; - uint32_t hour = val - ((val / 100) * 100); - val /= 100; - uint32_t day = val - ((val / 100) * 100); - val /= 100; - uint32_t month = val - ((val / 100) * 100); - val /= 100; - uint32_t year = val; + int64_t val = 0; + uint32_t second, minute, hour, day, month, year; + + if (length == -1) + { + val = gw_mysql_get_byte8(ptr); + second = val - ((val / 100) * 100); + val /= 100; + minute = val - ((val / 100) * 100); + val /= 100; + hour = val - ((val / 100) * 100); + val /= 100; + day = val - ((val / 100) * 100); + val /= 100; + month = val - ((val / 100) * 100); + val /= 100; + year = val; + } + else + { + // TODO: Figure out why DATETIME(0) doesn't work like it others do + val = unpack_bytes(ptr, datetime_sizes[length]); + val *= log_10_values[6 - length]; + + if (val < 0) + { + val = -val; + } + + int subsecond = val % 1000000; + val /= 1000000; + + second = val % 60; + val /= 60; + minute = val % 60; + val /= 60; + hour = val % 24; + val /= 24; + day = val % 32; + val /= 32; + month = val % 13; + val /= 13; + year = val; + } memset(dest, 0, sizeof(struct tm)); dest->tm_year = year - 1900; @@ -391,14 +455,13 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, return metadata[1]; } - /** * @brief Get the length of a temporal field * @param type Field type * @param decimals How many decimals the field has * @return Number of bytes the temporal value takes */ -static size_t temporal_field_size(uint8_t type, uint8_t decimals) +static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length) { switch (type) { @@ -412,8 +475,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) case TABLE_COL_TYPE_TIME2: return 3 + ((decimals + 1) / 2); - case TABLE_COL_TYPE_DATETIME: - return 8; + case TABLE_COL_TYPE_DATETIME: + return length < 0 || length > 6 ? 8 : datetime_sizes[length]; case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -441,40 +504,40 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) * @param val Extracted packed value * @param tm Pointer where the unpacked temporal value is stored */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm) { switch (type) { - case TABLE_COL_TYPE_YEAR: - unpack_year(ptr, tm); - break; + case TABLE_COL_TYPE_YEAR: + unpack_year(ptr, tm); + break; - case TABLE_COL_TYPE_DATETIME: - unpack_datetime(ptr, tm); - break; + case TABLE_COL_TYPE_DATETIME: + unpack_datetime(ptr, length, tm); + break; - case TABLE_COL_TYPE_DATETIME2: - unpack_datetime2(ptr, *metadata, tm); - break; + case TABLE_COL_TYPE_DATETIME2: + unpack_datetime2(ptr, *metadata, tm); + break; - case TABLE_COL_TYPE_TIME: - unpack_time(ptr, tm); - break; + case TABLE_COL_TYPE_TIME: + unpack_time(ptr, tm); + break; - case TABLE_COL_TYPE_DATE: - unpack_date(ptr, tm); - break; + case TABLE_COL_TYPE_DATE: + unpack_date(ptr, tm); + break; - case TABLE_COL_TYPE_TIMESTAMP: - case TABLE_COL_TYPE_TIMESTAMP2: - unpack_timestamp(ptr, *metadata, tm); - break; + case TABLE_COL_TYPE_TIMESTAMP: + case TABLE_COL_TYPE_TIMESTAMP2: + unpack_timestamp(ptr, *metadata, tm); + break; - default: - ss_dassert(false); - break; + default: + ss_dassert(false); + break; } - return temporal_field_size(type, *metadata); + return temporal_field_size(type, *metadata, length); } void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm) diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c index 5f9745435..f1f357d0b 100644 --- a/server/modules/routing/avrorouter/avro_file.c +++ b/server/modules/routing/avrorouter/avro_file.c @@ -475,6 +475,17 @@ void notify_all_clients(AVRO_INSTANCE *router) } } +void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits) +{ + update_used_tables(router); + avro_flush_all_tables(router, AVROROUTER_FLUSH); + avro_save_conversion_state(router); + notify_all_clients(router); + *total_rows += router->row_count; + *total_commits += router->trx_count; + router->row_count = router->trx_count = 0; +} + /** * @brief Read all replication events from a binlog file. * @@ -555,6 +566,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) } else { + do_checkpoint(router, &total_rows, &total_commits); + MXS_INFO("Processed %lu transactions and %lu row events.", total_commits, total_rows); if (rotate_seen) @@ -742,13 +755,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) if (router->row_count >= router->row_target || router->trx_count >= router->trx_target) { - update_used_tables(router); - avro_flush_all_tables(router, AVROROUTER_SYNC); - avro_save_conversion_state(router); - notify_all_clients(router); - total_rows += router->row_count; - total_commits += router->trx_count; - router->row_count = router->trx_count = 0; + do_checkpoint(router, &total_rows, &total_commits); } } diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index c3aaa6e0b..e374ca463 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -162,6 +162,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr "table until a DDL statement for it is read.", table_ident); } + if (rval) + { + MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos); + } + return rval; } @@ -289,9 +294,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos); while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { + static uint64_t total_row_count = 1; + MXS_INFO("Row %lu", total_row_count++); + /** Add the current GTID and timestamp */ uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); @@ -517,6 +526,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { + MXS_INFO("[%ld] NULL", i); if (column_is_blob(map->column_types[i])) { uint8_t nullvalue = 0; @@ -546,6 +556,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("ENUM/SET values larger than 255 values aren't supported."); } avro_value_set_string(&field, strval); + MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes); ptr += bytes; ss_dassert(ptr < end); } @@ -562,15 +573,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value * one or two bytes for string length. */ - uint8_t bytes = *ptr++; - int len = metadata[metadata_offset] + - (((metadata[metadata_offset + 1] >> 4) & 0x3) ^ 0x3); + uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8); + int bytes = 0; + uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300); + uint16_t field_length = (meta & 0xff) + extra_length; - if (len <= 255) + if (field_length > 255) { - bytes += *ptr++ << 8; + bytes = ptr[0] + (ptr[1] << 8); + ptr += 2; + } + else + { + bytes = *ptr++; } + MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes); + ss_dassert(bytes || *ptr == '\0'); char str[bytes + 1]; memcpy(str, ptr, bytes); str[bytes] = '\0'; @@ -595,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("BIT is not currently supported, values are stored as 0."); } avro_value_set_int(&field, value); + MXS_INFO("[%ld] BIT", i); ptr += bytes; ss_dassert(ptr < end); } @@ -603,6 +623,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value double f_value = 0.0; ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); avro_value_set_double(&field, f_value); + MXS_INFO("[%ld] DOUBLE", i); ss_dassert(ptr < end); } else if (column_is_variable_string(map->column_types[i])) @@ -620,6 +641,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr++; } + MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz); char buf[sz + 1]; memcpy(buf, ptr, sz); buf[sz] = '\0'; @@ -633,6 +655,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; + MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len); if (len) { avro_value_set_bytes(&field, ptr, len); @@ -649,9 +672,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value { char buf[80]; struct tm tm; - ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm); + ptr += unpack_temporal_value(map->column_types[i], ptr, + &metadata[metadata_offset], + create->column_lengths[i], &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); avro_value_set_string(&field, buf); + MXS_INFO("[%ld] TEMPORAL: %s", i, buf); ss_dassert(ptr < end); } /** All numeric types (INT, LONG, FLOAT etc.) */ @@ -662,6 +688,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr += unpack_numeric_field(ptr, map->column_types[i], &metadata[metadata_offset], lval); set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval); + MXS_INFO("[%ld] NUMERIC: %ld", i, *((int64_t*)lval)); ss_dassert(ptr < end); } ss_dassert(metadata_offset <= map->column_metadata_size); diff --git a/server/modules/routing/avrorouter/avro_schema.c b/server/modules/routing/avrorouter/avro_schema.c index 22e17dc48..a15547784 100644 --- a/server/modules/routing/avrorouter/avro_schema.c +++ b/server/modules/routing/avrorouter/avro_schema.c @@ -124,9 +124,11 @@ char* json_new_schema_from_table(TABLE_MAP *map) for (uint64_t i = 0; i < map->columns; i++) { - json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - create->column_names[i], "type", - column_type_to_avro_type(map->column_types[i]))); + json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", + "name", create->column_names[i], + "type", column_type_to_avro_type(map->column_types[i]), + "real_type", create->column_types[i], + "length", create->column_lengths[i])); } json_object_set_new(schema, "fields", array); char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); @@ -172,8 +174,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table) { int array_size = json_array_size(arr); table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size)); + table->column_types = (char**)MXS_MALLOC(sizeof(char*) * (array_size)); + table->column_lengths = (int*)MXS_MALLOC(sizeof(int) * (array_size)); - if (table->column_names) + if (table->column_names && table->column_types && table->column_lengths) { int columns = 0; rval = true; @@ -184,6 +188,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table) if (json_is_object(val)) { + json_t* value; + + if ((value = json_object_get(val, "real_type")) && json_is_string(value)) + { + table->column_types[columns] = MXS_STRDUP_A(json_string_value(value)); + } + else + { + table->column_types[columns] = MXS_STRDUP_A("unknown"); + MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field."); + } + + if ((value = json_object_get(val, "length")) && json_is_integer(value)) + { + table->column_lengths[columns] = json_integer_value(value); + } + else + { + table->column_lengths[columns] = -1; + MXS_WARNING("No \"length\" value defined. Treating as default length field."); + } + json_t *name = json_object_get(val, "name"); if (name && json_is_string(name)) { @@ -489,7 +515,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) dest[bytes] = '\0'; make_valid_avro_identifier(dest); - ptr = next_field_definition(ptr); } else { @@ -499,56 +524,98 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) return ptr; } +int extract_type_length(const char* ptr, char *dest) +{ + /** Skip any leading whitespace */ + while (isspace(*ptr) || *ptr == '`') + { + ptr++; + } + + /** The field type definition starts here */ + const char *start = ptr; + + /** Skip characters until we either hit a whitespace character or the start + * of the length definition. */ + while (!isspace(*ptr) && *ptr != '(') + { + ptr++; + } + + /** Store type */ + int typelen = ptr - start; + memcpy(dest, start, typelen); + dest[typelen] = '\0'; + + /** Skip whitespace */ + while (isspace(*ptr)) + { + ptr++; + } + + int rval = -1; // No length defined + + /** Start of length definition */ + if (*ptr == '(') + { + ptr++; + char *end; + int val = strtol(ptr, &end, 10); + + if (*end == ')') + { + rval = val; + } + } + + return rval; +} + +int count_columns(const char* ptr) +{ + int i = 2; + + while ((ptr = strchr(ptr, ','))) + { + ptr++; + i++; + } + + return i; +} + /** * Process a table definition into an array of column names * @param nameptr table definition * @return Number of processed columns or -1 on error */ -static int process_column_definition(const char *nameptr, char*** dest) +static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens) { - /** Process columns in groups of 8 */ - size_t chunks = 1; - const size_t chunk_size = 8; - int i = 0; - char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1)); - - if (names == NULL) - { - return -1; - } + int n = count_columns(nameptr); + *dest = MXS_MALLOC(sizeof(char*) * n); + *dest_types = MXS_MALLOC(sizeof(char*) * n); + *dest_lens = MXS_MALLOC(sizeof(int) * n); + char **names = *dest; + char **types = *dest_types; + int *lengths = *dest_lens; char colname[512]; + int i = 0; while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname)))) { - if (i >= chunks * chunk_size) - { - char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*)); - if (tmp == NULL) - { - for (int x = 0; x < i; x++) - { - MXS_FREE(names[x]); - } - MXS_FREE(names); - return -1; - } - names = tmp; - } + ss_dassert(i < n); + char type[100] = ""; + int len = extract_type_length(nameptr, type); + nameptr = next_field_definition(nameptr); + fix_reserved_word(colname); - if ((names[i++] = MXS_STRDUP(colname)) == NULL) - { - for (int x = 0; x < i; x++) - { - MXS_FREE(names[x]); - } - MXS_FREE(names); - return -1; - } + lengths[i] = len; + types[i] = MXS_STRDUP_A(type); + names[i] = MXS_STRDUP_A(colname); + i++; } - *dest = names; - return i; } @@ -601,7 +668,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) char database[MYSQL_DATABASE_MAXLEN + 1]; const char *db = event_db; - MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql); + MXS_INFO("Create table: %s", sql); if (!get_table_name(sql, table)) { @@ -621,8 +688,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) db = database; } + int* lengths = NULL; char **names = NULL; - int n_columns = process_column_definition(statement_sql, &names); + char **types = NULL; + int n_columns = process_column_definition(statement_sql, &names, &types, &lengths); ss_dassert(n_columns > 0); /** We have appear to have a valid CREATE TABLE statement */ @@ -634,6 +703,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) rval->version = 1; rval->was_used = false; rval->column_names = names; + rval->column_lengths = lengths; + rval->column_types = types; rval->columns = n_columns; rval->database = MXS_STRDUP(db); rval->table = MXS_STRDUP(table); @@ -675,8 +746,11 @@ void table_create_free(TABLE_CREATE* value) for (uint64_t i = 0; i < value->columns; i++) { MXS_FREE(value->column_names[i]); + MXS_FREE(value->column_types[i]); } MXS_FREE(value->column_names); + MXS_FREE(value->column_types); + MXS_FREE(value->column_lengths); MXS_FREE(value->table); MXS_FREE(value->database); MXS_FREE(value); @@ -792,11 +866,15 @@ void make_avro_token(char* dest, const char* src, int length) memcpy(dest, src, length); dest[length] = '\0'; + fix_reserved_word(dest); } int get_column_index(TABLE_CREATE *create, const char *tok) { int idx = -1; + char safe_tok[strlen(tok) + 2]; + strcpy(safe_tok, tok); + fix_reserved_word(safe_tok); for (int x = 0; x < create->columns; x++) { @@ -821,7 +899,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (tok) { - MXS_DEBUG("Altering table %.*s\n", len, tok); + MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql); def = tok + len; } @@ -864,7 +942,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (idx != -1) { - free(create->column_names[idx]); + MXS_FREE(create->column_names[idx]); for (int i = idx; i < (int)create->columns - 1; i++) { create->column_names[i] = create->column_names[i + 1]; @@ -894,7 +972,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (idx != -1) { - free(create->column_names[idx]); + MXS_FREE(create->column_names[idx]); create->column_names[idx] = strndup(tok, len); updates++; } diff --git a/server/modules/routing/avrorouter/avrorouter.h b/server/modules/routing/avrorouter/avrorouter.h index aa33922a9..ec9f0f268 100644 --- a/server/modules/routing/avrorouter/avrorouter.h +++ b/server/modules/routing/avrorouter/avrorouter.h @@ -84,6 +84,23 @@ static const char *avro_event_type = "event_type"; static const char *avro_timestamp = "timestamp"; static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" }; +static inline bool is_reserved_word(const char* word) +{ + return strcasecmp(word, avro_domain) == 0 || + strcasecmp(word, avro_server_id) == 0 || + strcasecmp(word, avro_sequence) == 0 || + strcasecmp(word, avro_event_number) == 0 || + strcasecmp(word, avro_event_type) == 0 || + strcasecmp(word, avro_timestamp) == 0; +} + +static inline void fix_reserved_word(char *tok) +{ + if (is_reserved_word(tok)) + { + strcat(tok, "_"); + } +} /** How a binlog file is closed */ typedef enum avro_binlog_end @@ -111,6 +128,8 @@ typedef struct table_create { uint64_t columns; char **column_names; + char **column_types; + int* column_lengths; char *table; char *database; int version; /**< How many versions of this table have been used */