diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index ddf5825c0..90590a3da 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -392,6 +392,20 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, return metadata[1]; } +/** + * If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with + * extra precision, the packed length is shorter than 8 bytes. + */ +size_t datetime_sizes[] = +{ + 5, // DATETIME(0) + 6, // DATETIME(1) + 6, // DATETIME(2) + 7, // DATETIME(3) + 7, // DATETIME(4) + 7, // DATETIME(5) + 8 // DATETIME(6) +}; /** * @brief Get the length of a temporal field @@ -399,7 +413,7 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, * @param decimals How many decimals the field has * @return Number of bytes the temporal value takes */ -static size_t temporal_field_size(uint8_t type, uint8_t decimals) +static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length) { switch (type) { @@ -414,7 +428,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) return 3 + ((decimals + 1) / 2); case TABLE_COL_TYPE_DATETIME: - return 8; + return length < 0 || length > 6 ? 8 : datetime_sizes[length]; case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -442,7 +456,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) * @param val Extracted packed value * @param tm Pointer where the unpacked temporal value is stored */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm) { switch (type) { @@ -475,7 +489,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru ss_dassert(false); break; } - return temporal_field_size(type, *metadata); + return temporal_field_size(type, *metadata, length); } void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm) diff --git a/server/include/mysql_binlog.h b/server/include/mysql_binlog.h index b1a81eaa2..ebe5574e5 100644 --- a/server/include/mysql_binlog.h +++ b/server/include/mysql_binlog.h @@ -83,7 +83,7 @@ bool column_is_decimal(uint8_t type); bool fixed_string_is_enum(uint8_t type); /** Value unpacking */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm); size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, diff --git a/server/modules/include/avrorouter.h b/server/modules/include/avrorouter.h index a233915b4..b621c3026 100644 --- a/server/modules/include/avrorouter.h +++ b/server/modules/include/avrorouter.h @@ -98,6 +98,8 @@ typedef struct table_create { uint64_t columns; char **column_names; + char **column_types; + int* column_lengths; char *table; char *database; int version; /**< How many versions of this table have been used */ diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index b5a96f5ac..bda08ea54 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -161,6 +161,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr "table until a DDL statement for it is read.", table_ident); } + if (rval) + { + MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos); + } + return rval; } @@ -288,9 +293,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos); while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { + static uint64_t total_row_count = 1; + MXS_INFO("Row %lu", total_row_count++); + /** Add the current GTID and timestamp */ uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); @@ -516,6 +525,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { + MXS_INFO("[%ld] NULL", i); if (column_is_blob(map->column_types[i])) { uint8_t nullvalue = 0; @@ -545,6 +555,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("ENUM/SET values larger than 255 values aren't supported."); } avro_value_set_string(&field, strval); + MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes); ptr += bytes; ss_dassert(ptr < end); } @@ -594,6 +605,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("BIT is not currently supported, values are stored as 0."); } avro_value_set_int(&field, value); + MXS_INFO("[%ld] BIT", i); ptr += bytes; ss_dassert(ptr < end); } @@ -602,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value double f_value = 0.0; ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); avro_value_set_double(&field, f_value); + MXS_INFO("[%ld] DOUBLE", i); ss_dassert(ptr < end); } else if (column_is_variable_string(map->column_types[i])) @@ -619,6 +632,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr++; } + MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz); char buf[sz + 1]; memcpy(buf, ptr, sz); buf[sz] = '\0'; @@ -632,6 +646,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; + MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len); if (len) { avro_value_set_bytes(&field, ptr, len); @@ -648,9 +663,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value { char buf[80]; struct tm tm; - ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm); + ptr += unpack_temporal_value(map->column_types[i], ptr, + &metadata[metadata_offset], + create->column_lengths[i], &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); avro_value_set_string(&field, buf); + MXS_INFO("[%ld] TEMPORAL: %s", i, buf); ss_dassert(ptr < end); } /** All numeric types (INT, LONG, FLOAT etc.) */ @@ -661,6 +679,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr += unpack_numeric_field(ptr, map->column_types[i], &metadata[metadata_offset], lval); set_numeric_field_value(&field, map->column_types[i], &metadata[metadata_offset], lval); + MXS_INFO("[%ld] NUMERIC: %ld", i, *((int64_t*)lval)); ss_dassert(ptr < end); } ss_dassert(metadata_offset <= map->column_metadata_size); diff --git a/server/modules/routing/avro/avro_schema.c b/server/modules/routing/avro/avro_schema.c index 8c339e9e9..13bb44bc1 100644 --- a/server/modules/routing/avro/avro_schema.c +++ b/server/modules/routing/avro/avro_schema.c @@ -491,7 +491,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) dest[bytes] = '\0'; make_valid_avro_identifier(dest); - ptr = next_field_definition(ptr); } else { @@ -501,62 +500,97 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) return ptr; } +int extract_type_length(const char* ptr, char *dest) +{ + /** Skip any leading whitespace */ + while (isspace(*ptr) || *ptr == '`') + { + ptr++; + } + + /** The field type definition starts here */ + const char *start = ptr; + + /** Skip characters until we either hit a whitespace character or the start + * of the length definition. */ + while (!isspace(*ptr) && *ptr != '(') + { + ptr++; + } + + /** Store type */ + int typelen = ptr - start; + memcpy(dest, start, typelen); + dest[typelen] = '\0'; + + /** Skip whitespace */ + while (isspace(*ptr)) + { + ptr++; + } + + int rval = -1; // No length defined + + /** Start of length definition */ + if (*ptr == '(') + { + ptr++; + char *end; + int val = strtol(ptr, &end, 10); + + if (*end == ')') + { + rval = val; + } + } + + return rval; +} + +int count_columns(const char* ptr) +{ + int i = 2; + + while ((ptr = strchr(ptr, ','))) + { + ptr++; + i++; + } + + return i; +} + /** * Process a table definition into an array of column names * @param nameptr table definition * @return Number of processed columns or -1 on error */ -static int process_column_definition(const char *nameptr, char*** dest) +static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens) { - /** Process columns in groups of 8 */ - size_t chunks = 1; - const size_t chunk_size = 8; - int i = 0; - char **names = malloc(sizeof(char*) * (chunks * chunk_size + 1)); - - if (names == NULL) - { - MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.", - sizeof(char*) * chunks); - return -1; - } + int n = count_columns(nameptr); + *dest = malloc(sizeof(char*) * n); + *dest_types = malloc(sizeof(char*) * n); + *dest_lens = malloc(sizeof(int) * n); + char **names = *dest; + char **types = *dest_types; + int *lengths = *dest_lens; char colname[512]; + int i = 0; while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname)))) { - if (i >= chunks * chunk_size) - { - char **tmp = realloc(names, (++chunks * chunk_size + 1) * sizeof(char*)); - if (tmp == NULL) - { - for (int x = 0; x < i; x++) - { - free(names[x]); - } - free(names); - MXS_ERROR("Memory allocation failed when trying allocate %ld bytes of memory.", - sizeof(char*) * chunks); - return -1; - } - names = tmp; - } + ss_dassert(i < n); + char type[100] = ""; + int len = extract_type_length(nameptr, type); + nameptr = next_field_definition(nameptr); - if ((names[i++] = strdup(colname)) == NULL) - { - for (int x = 0; x < i; x++) - { - free(names[x]); - } - free(names); - MXS_ERROR("Memory allocation failed when trying allocate %lu bytes " - "of memory.", strlen(colname)); - return -1; - } + lengths[i] = len; + types[i] = strdup(type); + names[i] = strdup(colname); + i++; } - *dest = names; - return i; } @@ -600,7 +634,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) char database[MYSQL_DATABASE_MAXLEN + 1]; const char *db = event_db; - MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql); + MXS_INFO("Create table: %s", sql); if (!get_table_name(sql, table)) { @@ -620,8 +654,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) db = database; } + int* lengths = NULL; char **names = NULL; - int n_columns = process_column_definition(statement_sql, &names); + char **types = NULL; + int n_columns = process_column_definition(statement_sql, &names, &types, &lengths); ss_dassert(n_columns > 0); /** We have appear to have a valid CREATE TABLE statement */ @@ -633,6 +669,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) rval->version = 1; rval->was_used = false; rval->column_names = names; + rval->column_lengths = lengths; + rval->column_types = types; rval->columns = n_columns; rval->database = strdup(db); rval->table = strdup(table); @@ -675,8 +713,11 @@ void* table_create_free(TABLE_CREATE* value) for (uint64_t i = 0; i < value->columns; i++) { free(value->column_names[i]); + free(value->column_types[i]); } free(value->column_names); + free(value->column_types); + free(value->column_lengths); free(value->table); free(value->database); free(value); @@ -822,7 +863,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (tok) { - MXS_DEBUG("Altering table %.*s\n", len, tok); + MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql); def = tok + len; }