diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f8cf8105..38a98f78c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,6 +60,10 @@ include_directories(${PCRE2_INCLUDE_DIRS}) if(NOT MARIADB_CONNECTOR_FOUND) message(STATUS "Building MariaDB Connector-C from source.") include(cmake/BuildMariaDBConnector.cmake) +else() + # This is required as the core depends on the `connector-c` target + add_custom_target(connector-c) + message(STATUS "Using system Connector-C") endif() if(NOT JANSSON_FOUND) diff --git a/include/maxscale/mysql_binlog.h b/include/maxscale/mysql_binlog.h index cb0c3ae97..6ff0ab6b3 100644 --- a/include/maxscale/mysql_binlog.h +++ b/include/maxscale/mysql_binlog.h @@ -85,7 +85,7 @@ bool column_is_decimal(uint8_t type); bool fixed_string_is_enum(uint8_t type); /** Value unpacking */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm); size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, diff --git a/query_classifier/test/classify.c b/query_classifier/test/classify.c index 894672b50..533f42b65 100644 --- a/query_classifier/test/classify.c +++ b/query_classifier/test/classify.c @@ -11,7 +11,6 @@ * Public License. */ -#include #include #include #include diff --git a/server/core/maxscale/poll.h b/server/core/maxscale/poll.h index 4a70af0ad..dcf4a8736 100644 --- a/server/core/maxscale/poll.h +++ b/server/core/maxscale/poll.h @@ -57,7 +57,7 @@ void dShowThreads(DCB *dcb); void dShowEventQ(DCB *dcb); void dShowEventStats(DCB *dcb); -int poll_get_stat(POLL_STAT stat); +int64_t poll_get_stat(POLL_STAT stat); RESULTSET *eventTimesGetList(); void poll_send_message(enum poll_message msg, void *data); diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 4c5c38c20..951e6eace 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -25,6 +25,10 @@ #include #include +#include + +static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes); + /** * @brief Convert a table column type to a string * @@ -216,6 +220,35 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) dest->tm_year = *ptr; } +/** Base-10 logarithm values */ +int64_t log_10_values[] = +{ + 1, + 10, + 100, + 1000, + 10000, + 100000, + 1000000, + 10000000, + 100000000 +}; + +/** + * If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with + * extra precision, the packed length is shorter than 8 bytes. 
+ */
+size_t datetime_sizes[] =
+{
+    5, // DATETIME(0)
+    6, // DATETIME(1)
+    6, // DATETIME(2)
+    7, // DATETIME(3)
+    7, // DATETIME(4)
+    7, // DATETIME(5)
+    8  // DATETIME(6)
+};
+
 /**
  * @brief Unpack a DATETIME
  *
@@ -224,21 +257,52 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
  * @param val Value read from the binary log
  * @param dest Pointer where the unpacked value is stored
  */
-static void unpack_datetime(uint8_t *ptr, struct tm *dest)
+static void unpack_datetime(uint8_t *ptr, int length, struct tm *dest)
 {
-    uint64_t val = 0;
-    memcpy(&val, ptr, sizeof(val));
-    uint32_t second = val - ((val / 100) * 100);
-    val /= 100;
-    uint32_t minute = val - ((val / 100) * 100);
-    val /= 100;
-    uint32_t hour = val - ((val / 100) * 100);
-    val /= 100;
-    uint32_t day = val - ((val / 100) * 100);
-    val /= 100;
-    uint32_t month = val - ((val / 100) * 100);
-    val /= 100;
-    uint32_t year = val;
+    int64_t val = 0;
+    uint32_t second, minute, hour, day, month, year;
+
+    if (length == -1)
+    {
+        val = gw_mysql_get_byte8(ptr);
+        second = val - ((val / 100) * 100);
+        val /= 100;
+        minute = val - ((val / 100) * 100);
+        val /= 100;
+        hour = val - ((val / 100) * 100);
+        val /= 100;
+        day = val - ((val / 100) * 100);
+        val /= 100;
+        month = val - ((val / 100) * 100);
+        val /= 100;
+        year = val;
+    }
+    else
+    {
+        // TODO: Figure out why DATETIME(0) doesn't work like the others do
+        val = unpack_bytes(ptr, datetime_sizes[length]);
+        val *= log_10_values[6 - length];
+
+        if (val < 0)
+        {
+            val = -val;
+        }
+
+        int subsecond = val % 1000000;
+        val /= 1000000;
+
+        second = val % 60;
+        val /= 60;
+        minute = val % 60;
+        val /= 60;
+        hour = val % 24;
+        val /= 24;
+        day = val % 32;
+        val /= 32;
+        month = val % 13;
+        val /= 13;
+        year = val;
+    }
 
     memset(dest, 0, sizeof(struct tm));
     dest->tm_year = year - 1900;
@@ -391,14 +455,13 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
     return metadata[1];
 }
 
-
 /**
  * @brief Get the length of a temporal field
  * @param type Field type
  * @param decimals How many decimals the field has
  * @return Number of bytes the temporal value takes
  */
-static size_t temporal_field_size(uint8_t type, uint8_t decimals)
+static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length)
 {
     switch (type)
     {
@@ -413,7 +476,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
         return 3 + ((decimals + 1) / 2);
 
     case TABLE_COL_TYPE_DATETIME:
-        return 8;
+        return length < 0 || length > 6 ?
8 : datetime_sizes[length]; case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -441,7 +504,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) * @param val Extracted packed value * @param tm Pointer where the unpacked temporal value is stored */ -size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm) { switch (type) { @@ -450,7 +513,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru break; case TABLE_COL_TYPE_DATETIME: - unpack_datetime(ptr, tm); + unpack_datetime(ptr, length, tm); break; case TABLE_COL_TYPE_DATETIME2: @@ -474,7 +537,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru ss_dassert(false); break; } - return temporal_field_size(type, *metadata); + return temporal_field_size(type, *metadata, length); } void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm) diff --git a/server/core/poll.c b/server/core/poll.c index 45bafd917..bbc0ab076 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -1517,8 +1517,7 @@ dShowEventStats(DCB *pdcb) * @param stat The required statistic * @return The value of that statistic */ -int -poll_get_stat(POLL_STAT stat) +int64_t poll_get_stat(POLL_STAT stat) { switch (stat) { diff --git a/server/core/test/testfeedback.c b/server/core/test/testfeedback.c index 0375b530d..47463ee85 100644 --- a/server/core/test/testfeedback.c +++ b/server/core/test/testfeedback.c @@ -31,7 +31,6 @@ #undef NDEBUG #endif #define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1; -#include #include #include #include diff --git a/server/modules/filter/mqfilter/mqfilter.c b/server/modules/filter/mqfilter/mqfilter.c index 15b5a6cf2..076000bca 100644 --- a/server/modules/filter/mqfilter/mqfilter.c +++ b/server/modules/filter/mqfilter/mqfilter.c @@ -59,7 +59,6 @@ #define MXS_MODULE_NAME "mqfilter" -#include #include #include #include diff --git a/server/modules/protocol/examples/CMakeLists.txt b/server/modules/protocol/examples/CMakeLists.txt index 679099ef3..ec26bb0b0 100644 --- a/server/modules/protocol/examples/CMakeLists.txt +++ b/server/modules/protocol/examples/CMakeLists.txt @@ -2,4 +2,5 @@ install_script(cdc.py core) install_script(cdc_users.py core) install_script(cdc_last_transaction.py core) install_script(cdc_kafka_producer.py core) +install_script(cdc_schema.py core) install_file(cdc_schema.go core) diff --git a/server/modules/protocol/examples/cdc.py b/server/modules/protocol/examples/cdc.py index cfbb74c47..df5853d9f 100755 --- a/server/modules/protocol/examples/cdc.py +++ b/server/modules/protocol/examples/cdc.py @@ -12,52 +12,32 @@ # Public License. 
import time -import json -import re import sys import socket import hashlib import argparse -import subprocess import selectors import binascii import os -# Read data as JSON -def read_json(): - decoder = json.JSONDecoder() - rbuf = bytes() - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) +def read_data(): + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_READ) while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) try: + events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) buf = sock.recv(4096, socket.MSG_DONTWAIT) - rbuf += buf - while True: - rbuf = rbuf.lstrip() - data = decoder.raw_decode(rbuf.decode('utf_8')) - rbuf = rbuf[data[1]:] - print(json.dumps(data[0])) - except ValueError as err: - sys.stdout.flush() - pass - except Exception: + if len(buf) > 0: + os.write(sys.stdout.fileno(), buf) + sys.stdout.flush() + else: + raise Exception('Socket was closed') + + except BlockingIOError: break - -# Read data as Avro -def read_avro(): - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) - - while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) - try: - buf = sock.recv(4096, socket.MSG_DONTWAIT) - os.write(sys.stdout.fileno(), buf) - sys.stdout.flush() - except Exception: + except Exception as ex: + print(ex, file=sys.stderr) break parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve") @@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8') # Request a data stream sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())) -if opts.format == "JSON": - read_json() -elif opts.format == "AVRO": - read_avro() +read_data() diff --git a/server/modules/protocol/examples/cdc_schema.py b/server/modules/protocol/examples/cdc_schema.py new file mode 100755 index 000000000..8b22bfd20 --- /dev/null +++ b/server/modules/protocol/examples/cdc_schema.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2016 MariaDB Corporation Ab +# +# Use of this software is governed by the Business Source License included +# in the LICENSE.TXT file and at www.mariadb.com/bsl11. +# +# Change Date: 2019-07-01 +# +# On the date above, in accordance with the Business Source License, use +# of this software will be governed by version 2 or later of the General +# Public License. + +# +# This program requires the MySQL Connector/Python to work +# + +import mysql.connector as mysql +import json +import sys +import argparse + +parser = argparse.ArgumentParser(description = "CDC Schema Generator", conflict_handler="resolve", epilog="""This program generates CDC schema files for a specific table or all the tables in a database. 
The
+schema files need to be generated if the binary log files do not contain the
+CREATE TABLE events that define the table layout.""")
+parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
+parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="3306")
+parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
+parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
+parser.add_argument("DATABASE", help="Generate Avro schemas for this database")
+
+opts = parser.parse_args(sys.argv[1:])
+
+def parse_field(row):
+    res = dict()
+    name = row[1].lower().split('(')[0]
+
+    if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
+                "mediumtext", "longtext", "char", "varchar", "enum", "set"):
+        res["type"] = "string"
+    elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
+        res["type"] = "bytes"
+    elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
+        res["type"] = "int"
+    elif name in ("float",):
+        res["type"] = "float"
+    elif name in ("double", "decimal"):
+        res["type"] = "double"
+    elif name in ("null",):
+        res["type"] = "null"
+    elif name in ("long", "bigint"):
+        res["type"] = "long"
+    else:
+        res["type"] = "string"
+
+
+    res["name"] = row[0].lower()
+
+    return res
+
+try:
+    conn = mysql.connect(user=opts.user, password=opts.password, host=opts.host, port=opts.port)
+    cursor = conn.cursor()
+    cursor.execute("SHOW TABLES FROM {}".format(opts.DATABASE))
+
+    tables = []
+    for res in cursor:
+        tables.append(res[0])
+
+
+    for t in tables:
+        schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record", name="ChangeRecord", fields=[])
+        cursor.execute("DESCRIBE {}.{}".format(opts.DATABASE, t))
+
+        for res in cursor:
+            schema["fields"].append(parse_field(res))
+
+        dest = open("{}.{}.000001.avsc".format(opts.DATABASE, t), 'w')
+        dest.write(json.dumps(schema))
+        dest.close()
+
+    cursor.close()
+    conn.close()
+
+except Exception as e:
+    print(e)
+    exit(1)
+
diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c
index 5f9745435..f1f357d0b 100644
--- a/server/modules/routing/avrorouter/avro_file.c
+++ b/server/modules/routing/avrorouter/avro_file.c
@@ -475,6 +475,17 @@ void notify_all_clients(AVRO_INSTANCE *router)
     }
 }
 
+void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits)
+{
+    update_used_tables(router);
+    avro_flush_all_tables(router, AVROROUTER_FLUSH);
+    avro_save_conversion_state(router);
+    notify_all_clients(router);
+    *total_rows += router->row_count;
+    *total_commits += router->trx_count;
+    router->row_count = router->trx_count = 0;
+}
+
 /**
  * @brief Read all replication events from a binlog file.
* @@ -555,6 +566,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) } else { + do_checkpoint(router, &total_rows, &total_commits); + MXS_INFO("Processed %lu transactions and %lu row events.", total_commits, total_rows); if (rotate_seen) @@ -742,13 +755,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router) if (router->row_count >= router->row_target || router->trx_count >= router->trx_target) { - update_used_tables(router); - avro_flush_all_tables(router, AVROROUTER_SYNC); - avro_save_conversion_state(router); - notify_all_clients(router); - total_rows += router->row_count; - total_commits += router->trx_count; - router->row_count = router->trx_count = 0; + do_checkpoint(router, &total_rows, &total_commits); } } diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index 0b086cefb..bc4cffb87 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -162,6 +162,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr "table until a DDL statement for it is read.", table_ident); } + if (rval) + { + MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos); + } + return rval; } @@ -289,9 +294,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos); while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { + static uint64_t total_row_count = 1; + MXS_INFO("Row %lu", total_row_count++); + /** Add the current GTID and timestamp */ uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); @@ -507,9 +516,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr += (ncolumns + 7) / 8; ss_dassert(ptr < end); - for (long i = 0; i < map->columns && npresent < ncolumns; i++) + for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++) { - ss_dassert(create->columns == map->columns); ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); ss_dassert(rc == 0); @@ -518,6 +526,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { + MXS_INFO("[%ld] NULL", i); if (column_is_blob(map->column_types[i])) { uint8_t nullvalue = 0; @@ -547,17 +556,45 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("ENUM/SET values larger than 255 values aren't supported."); } avro_value_set_string(&field, strval); + MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes); ptr += bytes; ss_dassert(ptr < end); } else { - uint8_t bytes = *ptr; + /** + * The first byte in the metadata stores the real type of + * the string (ENUM and SET types are also stored as fixed + * length strings). + * + * The first two bits of the second byte contain the XOR'ed + * field length but as that information is not relevant for + * us, we just use this information to know whether to read + * one or two bytes for string length. 
+ */ + + uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8); + int bytes = 0; + uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300); + uint16_t field_length = (meta & 0xff) + extra_length; + + if (field_length > 255) + { + bytes = ptr[0] + (ptr[1] << 8); + ptr += 2; + } + else + { + bytes = *ptr++; + } + + MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes); + ss_dassert(bytes || *ptr == '\0'); char str[bytes + 1]; - memcpy(str, ptr + 1, bytes); + memcpy(str, ptr, bytes); str[bytes] = '\0'; avro_value_set_string(&field, str); - ptr += bytes + 1; + ptr += bytes; ss_dassert(ptr < end); } } @@ -577,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value MXS_WARNING("BIT is not currently supported, values are stored as 0."); } avro_value_set_int(&field, value); + MXS_INFO("[%ld] BIT", i); ptr += bytes; ss_dassert(ptr < end); } @@ -585,6 +623,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value double f_value = 0.0; ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); avro_value_set_double(&field, f_value); + MXS_INFO("[%ld] DOUBLE", i); ss_dassert(ptr < end); } else if (column_is_variable_string(map->column_types[i])) @@ -602,6 +641,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr++; } + MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz); char buf[sz + 1]; memcpy(buf, ptr, sz); buf[sz] = '\0'; @@ -615,6 +655,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; + MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len); if (len) { avro_value_set_bytes(&field, ptr, len); @@ -631,9 +672,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value { char buf[80]; struct tm tm; - ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm); + ptr += unpack_temporal_value(map->column_types[i], ptr, + &metadata[metadata_offset], + create->column_lengths[i], &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); avro_value_set_string(&field, buf); + MXS_INFO("[%ld] TEMPORAL: %s", i, buf); ss_dassert(ptr < end); } /** All numeric types (INT, LONG, FLOAT etc.) 
*/ diff --git a/server/modules/routing/avrorouter/avro_schema.c b/server/modules/routing/avrorouter/avro_schema.c index 4e9458074..a15547784 100644 --- a/server/modules/routing/avrorouter/avro_schema.c +++ b/server/modules/routing/avrorouter/avro_schema.c @@ -124,9 +124,11 @@ char* json_new_schema_from_table(TABLE_MAP *map) for (uint64_t i = 0; i < map->columns; i++) { - json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", - create->column_names[i], "type", - column_type_to_avro_type(map->column_types[i]))); + json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}", + "name", create->column_names[i], + "type", column_type_to_avro_type(map->column_types[i]), + "real_type", create->column_types[i], + "length", create->column_lengths[i])); } json_object_set_new(schema, "fields", array); char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); @@ -172,8 +174,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table) { int array_size = json_array_size(arr); table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size)); + table->column_types = (char**)MXS_MALLOC(sizeof(char*) * (array_size)); + table->column_lengths = (int*)MXS_MALLOC(sizeof(int) * (array_size)); - if (table->column_names) + if (table->column_names && table->column_types && table->column_lengths) { int columns = 0; rval = true; @@ -184,6 +188,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table) if (json_is_object(val)) { + json_t* value; + + if ((value = json_object_get(val, "real_type")) && json_is_string(value)) + { + table->column_types[columns] = MXS_STRDUP_A(json_string_value(value)); + } + else + { + table->column_types[columns] = MXS_STRDUP_A("unknown"); + MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field."); + } + + if ((value = json_object_get(val, "length")) && json_is_integer(value)) + { + table->column_lengths[columns] = json_integer_value(value); + } + else + { + table->column_lengths[columns] = -1; + MXS_WARNING("No \"length\" value defined. Treating as default length field."); + } + json_t *name = json_object_get(val, "name"); if (name && json_is_string(name)) { @@ -489,7 +515,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) dest[bytes] = '\0'; make_valid_avro_identifier(dest); - ptr = next_field_definition(ptr); } else { @@ -499,56 +524,98 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size) return ptr; } +int extract_type_length(const char* ptr, char *dest) +{ + /** Skip any leading whitespace */ + while (isspace(*ptr) || *ptr == '`') + { + ptr++; + } + + /** The field type definition starts here */ + const char *start = ptr; + + /** Skip characters until we either hit a whitespace character or the start + * of the length definition. 
*/ + while (!isspace(*ptr) && *ptr != '(') + { + ptr++; + } + + /** Store type */ + int typelen = ptr - start; + memcpy(dest, start, typelen); + dest[typelen] = '\0'; + + /** Skip whitespace */ + while (isspace(*ptr)) + { + ptr++; + } + + int rval = -1; // No length defined + + /** Start of length definition */ + if (*ptr == '(') + { + ptr++; + char *end; + int val = strtol(ptr, &end, 10); + + if (*end == ')') + { + rval = val; + } + } + + return rval; +} + +int count_columns(const char* ptr) +{ + int i = 2; + + while ((ptr = strchr(ptr, ','))) + { + ptr++; + i++; + } + + return i; +} + /** * Process a table definition into an array of column names * @param nameptr table definition * @return Number of processed columns or -1 on error */ -static int process_column_definition(const char *nameptr, char*** dest) +static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens) { - /** Process columns in groups of 8 */ - size_t chunks = 1; - const size_t chunk_size = 8; - int i = 0; - char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1)); - - if (names == NULL) - { - return -1; - } + int n = count_columns(nameptr); + *dest = MXS_MALLOC(sizeof(char*) * n); + *dest_types = MXS_MALLOC(sizeof(char*) * n); + *dest_lens = MXS_MALLOC(sizeof(int) * n); + char **names = *dest; + char **types = *dest_types; + int *lengths = *dest_lens; char colname[512]; + int i = 0; while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname)))) { - if (i >= chunks * chunk_size) - { - char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*)); - if (tmp == NULL) - { - for (int x = 0; x < i; x++) - { - MXS_FREE(names[x]); - } - MXS_FREE(names); - return -1; - } - names = tmp; - } + ss_dassert(i < n); + char type[100] = ""; + int len = extract_type_length(nameptr, type); + nameptr = next_field_definition(nameptr); + fix_reserved_word(colname); - if ((names[i++] = MXS_STRDUP(colname)) == NULL) - { - for (int x = 0; x < i; x++) - { - MXS_FREE(names[x]); - } - MXS_FREE(names); - return -1; - } + lengths[i] = len; + types[i] = MXS_STRDUP_A(type); + names[i] = MXS_STRDUP_A(colname); + i++; } - *dest = names; - return i; } @@ -601,7 +668,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) char database[MYSQL_DATABASE_MAXLEN + 1]; const char *db = event_db; - MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql); + MXS_INFO("Create table: %s", sql); if (!get_table_name(sql, table)) { @@ -621,8 +688,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) db = database; } + int* lengths = NULL; char **names = NULL; - int n_columns = process_column_definition(statement_sql, &names); + char **types = NULL; + int n_columns = process_column_definition(statement_sql, &names, &types, &lengths); ss_dassert(n_columns > 0); /** We have appear to have a valid CREATE TABLE statement */ @@ -634,6 +703,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db) rval->version = 1; rval->was_used = false; rval->column_names = names; + rval->column_lengths = lengths; + rval->column_types = types; rval->columns = n_columns; rval->database = MXS_STRDUP(db); rval->table = MXS_STRDUP(table); @@ -675,8 +746,11 @@ void table_create_free(TABLE_CREATE* value) for (uint64_t i = 0; i < value->columns; i++) { MXS_FREE(value->column_names[i]); + MXS_FREE(value->column_types[i]); } MXS_FREE(value->column_names); + MXS_FREE(value->column_types); + MXS_FREE(value->column_lengths); 
MXS_FREE(value->table); MXS_FREE(value->database); MXS_FREE(value); @@ -792,6 +866,26 @@ void make_avro_token(char* dest, const char* src, int length) memcpy(dest, src, length); dest[length] = '\0'; + fix_reserved_word(dest); +} + +int get_column_index(TABLE_CREATE *create, const char *tok) +{ + int idx = -1; + char safe_tok[strlen(tok) + 2]; + strcpy(safe_tok, tok); + fix_reserved_word(safe_tok); + + for (int x = 0; x < create->columns; x++) + { + if (strcasecmp(create->column_names[x], tok) == 0) + { + idx = x; + break; + } + } + + return idx; } bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) @@ -805,7 +899,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) if (tok) { - MXS_DEBUG("Altering table %.*s\n", len, tok); + MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql); def = tok + len; } @@ -844,27 +938,45 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) { tok = get_tok(tok + len, &len, end); - MXS_FREE(create->column_names[create->columns - 1]); - char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1); - ss_dassert(tmp); + int idx = get_column_index(create, tok); - if (tmp == NULL) + if (idx != -1) { - return false; + MXS_FREE(create->column_names[idx]); + for (int i = idx; i < (int)create->columns - 1; i++) + { + create->column_names[i] = create->column_names[i + 1]; + } + + char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1); + ss_dassert(tmp); + + if (tmp == NULL) + { + return false; + } + + create->column_names = tmp; + create->columns--; + updates++; } - create->column_names = tmp; - create->columns--; - updates++; tok = get_next_def(tok, end); len = 0; } else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len)) { tok = get_tok(tok + len, &len, end); - MXS_FREE(create->column_names[create->columns - 1]); - create->column_names[create->columns - 1] = strndup(tok, len); - updates++; + + int idx = get_column_index(create, tok); + + if (idx != -1) + { + MXS_FREE(create->column_names[idx]); + create->column_names[idx] = strndup(tok, len); + updates++; + } + tok = get_next_def(tok, end); len = 0; } @@ -975,7 +1087,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create) map->id = table_id; map->version = create->version; map->flags = flags; - ss_dassert(column_count == create->columns); map->columns = column_count; map->column_types = MXS_MALLOC(column_count); /** Allocate at least one byte for the metadata */ diff --git a/server/modules/routing/avrorouter/avrorouter.h b/server/modules/routing/avrorouter/avrorouter.h index aa33922a9..ec9f0f268 100644 --- a/server/modules/routing/avrorouter/avrorouter.h +++ b/server/modules/routing/avrorouter/avrorouter.h @@ -84,6 +84,23 @@ static const char *avro_event_type = "event_type"; static const char *avro_timestamp = "timestamp"; static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" }; +static inline bool is_reserved_word(const char* word) +{ + return strcasecmp(word, avro_domain) == 0 || + strcasecmp(word, avro_server_id) == 0 || + strcasecmp(word, avro_sequence) == 0 || + strcasecmp(word, avro_event_number) == 0 || + strcasecmp(word, avro_event_type) == 0 || + strcasecmp(word, avro_timestamp) == 0; +} + +static inline void fix_reserved_word(char *tok) +{ + if (is_reserved_word(tok)) + { + strcat(tok, "_"); + } +} /** How a binlog file is closed */ typedef enum avro_binlog_end @@ -111,6 +128,8 @@ typedef struct 
table_create { uint64_t columns; char **column_names; + char **column_types; + int* column_lengths; char *table; char *database; int version; /**< How many versions of this table have been used */ diff --git a/server/modules/routing/maxinfo/maxinfo_exec.c b/server/modules/routing/maxinfo/maxinfo_exec.c index 9d58a2f56..f4a37f967 100644 --- a/server/modules/routing/maxinfo/maxinfo_exec.c +++ b/server/modules/routing/maxinfo/maxinfo_exec.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -998,7 +999,7 @@ maxinfo_zombie_dcbs() /** * Interface to poll stats for reads */ -static int +static int64_t maxinfo_read_events() { return poll_get_stat(POLL_STAT_READ); @@ -1007,7 +1008,7 @@ maxinfo_read_events() /** * Interface to poll stats for writes */ -static int +static int64_t maxinfo_write_events() { return poll_get_stat(POLL_STAT_WRITE); @@ -1016,7 +1017,7 @@ maxinfo_write_events() /** * Interface to poll stats for errors */ -static int +static int64_t maxinfo_error_events() { return poll_get_stat(POLL_STAT_ERROR); @@ -1025,7 +1026,7 @@ maxinfo_error_events() /** * Interface to poll stats for hangup */ -static int +static int64_t maxinfo_hangup_events() { return poll_get_stat(POLL_STAT_HANGUP); @@ -1034,7 +1035,7 @@ maxinfo_hangup_events() /** * Interface to poll stats for accepts */ -static int +static int64_t maxinfo_accept_events() { return poll_get_stat(POLL_STAT_ACCEPT); @@ -1043,7 +1044,7 @@ maxinfo_accept_events() /** * Interface to poll stats for event queue length */ -static int +static int64_t maxinfo_event_queue_length() { return poll_get_stat(POLL_STAT_EVQ_LEN); @@ -1052,7 +1053,7 @@ maxinfo_event_queue_length() /** * Interface to poll stats for max event queue length */ -static int +static int64_t maxinfo_max_event_queue_length() { return poll_get_stat(POLL_STAT_EVQ_MAX); @@ -1061,7 +1062,7 @@ maxinfo_max_event_queue_length() /** * Interface to poll stats for max queue time */ -static int +static int64_t maxinfo_max_event_queue_time() { return poll_get_stat(POLL_STAT_MAX_QTIME); @@ -1070,7 +1071,7 @@ maxinfo_max_event_queue_time() /** * Interface to poll stats for max event execution time */ -static int +static int64_t maxinfo_max_event_exec_time() { return poll_get_stat(POLL_STAT_MAX_EXECTIME); @@ -1137,17 +1138,17 @@ status_row(RESULTSET *result, void *data) resultset_row_set(row, 0, status[context->index].name); switch (status[context->index].type) { - case VT_STRING: - resultset_row_set(row, 1, - (char *)(*status[context->index].func)()); - break; - case VT_INT: - snprintf(buf, 80, "%ld", - (long)(*status[context->index].func)()); - resultset_row_set(row, 1, buf); - break; - default: - ss_dassert(!true); + case VT_STRING: + resultset_row_set(row, 1, + (char *)(*status[context->index].func)()); + break; + case VT_INT: + snprintf(buf, 80, "%" PRId64, + (int64_t)(*status[context->index].func)()); + resultset_row_set(row, 1, buf); + break; + default: + ss_dassert(!true); } context->index++; return row; diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.c b/server/modules/routing/readwritesplit/rwsplit_mysql.c index fb8534089..7dd2f41d7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.c +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.c @@ -13,7 +13,6 @@ #include "readwritesplit.h" -#include #include #include #include diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index 9baf801d4..bc7811a41 100644 --- 
a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -13,7 +13,6 @@ #include "schemarouter.h" -#include #include #include #include diff --git a/server/modules/routing/schemarouter/sharding_common.h b/server/modules/routing/schemarouter/sharding_common.h index 5ad404c86..1ad65fe07 100644 --- a/server/modules/routing/schemarouter/sharding_common.h +++ b/server/modules/routing/schemarouter/sharding_common.h @@ -15,7 +15,6 @@ */ #include -#include #include #include #include
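
Note on the DATETIME handling in server/core/mysql_binlog.c above: the declared precision travels with the column as "length", datetime_sizes[] gives the packed size for each precision, and unpack_datetime() scales the value up to six fractional digits before splitting it into components. The standalone sketch below only illustrates that size lookup and the power-of-ten scaling; unpack_bytes_be(), datetime_sizes_example and the sample bytes are illustrative stand-ins, not the router's real helpers or a definitive description of the binlog encoding.

#include <stdint.h>
#include <stdio.h>

/* Packed size of DATETIME(N), as in the patch: 5 bytes for DATETIME(0),
 * up to 8 bytes for DATETIME(6). */
static const size_t datetime_sizes_example[] = { 5, 6, 6, 7, 7, 7, 8 };

/* Powers of ten used to scale an N-digit fraction up to microseconds. */
static const int64_t pow10_values[] = { 1, 10, 100, 1000, 10000, 100000, 1000000 };

/* Illustrative big-endian unpacker standing in for the router's unpack_bytes(). */
static uint64_t unpack_bytes_be(const uint8_t *ptr, size_t bytes)
{
    uint64_t val = 0;

    for (size_t i = 0; i < bytes; i++)
    {
        val = (val << 8) | ptr[i];
    }

    return val;
}

int main(void)
{
    /* A hypothetical DATETIME(3) value: 7 packed bytes, 3 fractional digits. */
    int precision = 3;
    uint8_t packed[] = { 0x99, 0x9e, 0x2e, 0xbb, 0x5a, 0x01, 0xc8 };

    uint64_t raw = unpack_bytes_be(packed, datetime_sizes_example[precision]);

    /* Scale to six fractional digits, like unpack_datetime() does with
     * log_10_values[6 - length], so the later modulo arithmetic stays the
     * same for every precision. */
    uint64_t normalised = raw * (uint64_t)pow10_values[6 - precision];

    printf("packed size: %zu bytes, normalised value: %llu\n",
           datetime_sizes_example[precision], (unsigned long long)normalised);
    return 0;
}

Normalising every precision to microseconds is what lets unpack_datetime() use one set of divisors regardless of how many bytes were read.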
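
Note on the fixed-length string handling added to process_row_event_data() in avrorouter/avro_rbr.c: the two metadata bytes yield the column's maximum length, and that decides whether the row data starts with a one-byte or a two-byte length prefix. The sketch below mirrors that decision in isolation; read_char_field() is a hypothetical helper and the metadata layout follows the patch's interpretation rather than an authoritative format description.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Illustrative helper (not part of the router): pick the length-prefix size
 * the same way the patched process_row_event_data() does, then copy the
 * string out. */
static const uint8_t *read_char_field(const uint8_t *ptr, const uint8_t *metadata,
                                      char *dest, size_t destsize)
{
    uint16_t meta = metadata[1] + (metadata[0] << 8);
    uint16_t extra_length = ((meta >> 4) & 0x300) ^ 0x300;
    uint16_t field_length = (meta & 0xff) + extra_length;
    uint16_t bytes;

    if (field_length > 255)
    {
        /* Columns that can hold more than 255 bytes use two length bytes */
        bytes = ptr[0] + (ptr[1] << 8);
        ptr += 2;
    }
    else
    {
        /* Short columns use a single length byte */
        bytes = *ptr++;
    }

    if (bytes >= destsize)
    {
        bytes = (uint16_t)(destsize - 1); /* Truncate for this illustration */
    }

    memcpy(dest, ptr, bytes);
    dest[bytes] = '\0';
    return ptr + bytes;
}

int main(void)
{
    /* Hypothetical CHAR(10) column: real type 0xfe and length 10 in the
     * metadata, so the field length stays under 256 and a single length
     * byte precedes the data. */
    uint8_t metadata[2] = { 0xfe, 10 };
    uint8_t row[] = { 5, 'h', 'e', 'l', 'l', 'o' };
    char buf[32];

    read_char_field(row, metadata, buf, sizeof(buf));
    printf("decoded string: %s\n", buf);
    return 0;
}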
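
Note on the reserved-word handling added to avrorouter.h: column names that collide with the avrorouter's own record fields (event_type, timestamp and the like) get a trailing underscore via fix_reserved_word(), so later lookups such as get_column_index() have to compare against the fixed-up name. The sketch below shows the idea in a self-contained form; only "event_type" and "timestamp" are visible as string values in the diff, so the remaining entries are assumptions.

#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>

/* Sketch of the reserved-word scheme from avrorouter.h. The strings other
 * than "event_type" and "timestamp" are assumed values for avro_domain,
 * avro_server_id, avro_sequence and avro_event_number. */
static const char *reserved_words[] =
{
    "domain", "server_id", "sequence", "event_number", "event_type", "timestamp"
};

static int is_reserved_word(const char *word)
{
    for (size_t i = 0; i < sizeof(reserved_words) / sizeof(reserved_words[0]); i++)
    {
        if (strcasecmp(word, reserved_words[i]) == 0)
        {
            return 1;
        }
    }

    return 0;
}

/* The caller must leave room for one extra character, as get_column_index()
 * does with its strlen(tok) + 2 buffer. */
static void fix_reserved_word(char *tok)
{
    if (is_reserved_word(tok))
    {
        strcat(tok, "_");
    }
}

int main(void)
{
    /* A user column called "timestamp" collides with the avrorouter's own
     * field, so it is stored and looked up as "timestamp_". */
    char column[32] = "timestamp";

    fix_reserved_word(column);
    printf("stored column name: %s\n", column);
    return 0;
}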