diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f8cf8105..38a98f78c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,6 +60,10 @@ include_directories(${PCRE2_INCLUDE_DIRS}) if(NOT MARIADB_CONNECTOR_FOUND) message(STATUS "Building MariaDB Connector-C from source.") include(cmake/BuildMariaDBConnector.cmake) +else() + # This is required as the core depends on the `connector-c` target + add_custom_target(connector-c) + message(STATUS "Using system Connector-C") endif() if(NOT JANSSON_FOUND) diff --git a/query_classifier/test/classify.c b/query_classifier/test/classify.c index 894672b50..533f42b65 100644 --- a/query_classifier/test/classify.c +++ b/query_classifier/test/classify.c @@ -11,7 +11,6 @@ * Public License. */ -#include #include #include #include diff --git a/server/core/maxscale/poll.h b/server/core/maxscale/poll.h index 4a70af0ad..dcf4a8736 100644 --- a/server/core/maxscale/poll.h +++ b/server/core/maxscale/poll.h @@ -57,7 +57,7 @@ void dShowThreads(DCB *dcb); void dShowEventQ(DCB *dcb); void dShowEventStats(DCB *dcb); -int poll_get_stat(POLL_STAT stat); +int64_t poll_get_stat(POLL_STAT stat); RESULTSET *eventTimesGetList(); void poll_send_message(enum poll_message msg, void *data); diff --git a/server/core/poll.c b/server/core/poll.c index 45bafd917..bbc0ab076 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -1517,8 +1517,7 @@ dShowEventStats(DCB *pdcb) * @param stat The required statistic * @return The value of that statistic */ -int -poll_get_stat(POLL_STAT stat) +int64_t poll_get_stat(POLL_STAT stat) { switch (stat) { diff --git a/server/core/test/testfeedback.c b/server/core/test/testfeedback.c index 0375b530d..47463ee85 100644 --- a/server/core/test/testfeedback.c +++ b/server/core/test/testfeedback.c @@ -31,7 +31,6 @@ #undef NDEBUG #endif #define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1; -#include #include #include #include diff --git a/server/modules/filter/mqfilter/mqfilter.c 
b/server/modules/filter/mqfilter/mqfilter.c index 15b5a6cf2..076000bca 100644 --- a/server/modules/filter/mqfilter/mqfilter.c +++ b/server/modules/filter/mqfilter/mqfilter.c @@ -59,7 +59,6 @@ #define MXS_MODULE_NAME "mqfilter" -#include #include #include #include diff --git a/server/modules/protocol/examples/CMakeLists.txt b/server/modules/protocol/examples/CMakeLists.txt index 679099ef3..ec26bb0b0 100644 --- a/server/modules/protocol/examples/CMakeLists.txt +++ b/server/modules/protocol/examples/CMakeLists.txt @@ -2,4 +2,5 @@ install_script(cdc.py core) install_script(cdc_users.py core) install_script(cdc_last_transaction.py core) install_script(cdc_kafka_producer.py core) +install_script(cdc_schema.py core) install_file(cdc_schema.go core) diff --git a/server/modules/protocol/examples/cdc.py b/server/modules/protocol/examples/cdc.py index cfbb74c47..df5853d9f 100755 --- a/server/modules/protocol/examples/cdc.py +++ b/server/modules/protocol/examples/cdc.py @@ -12,52 +12,32 @@ # Public License. 
import time -import json -import re import sys import socket import hashlib import argparse -import subprocess import selectors import binascii import os -# Read data as JSON -def read_json(): - decoder = json.JSONDecoder() - rbuf = bytes() - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) +def read_data(): + sel = selectors.DefaultSelector() + sel.register(sock, selectors.EVENT_READ) while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) try: + events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) buf = sock.recv(4096, socket.MSG_DONTWAIT) - rbuf += buf - while True: - rbuf = rbuf.lstrip() - data = decoder.raw_decode(rbuf.decode('utf_8')) - rbuf = rbuf[data[1]:] - print(json.dumps(data[0])) - except ValueError as err: - sys.stdout.flush() - pass - except Exception: + if len(buf) > 0: + os.write(sys.stdout.fileno(), buf) + sys.stdout.flush() + else: + raise Exception('Socket was closed') + + except BlockingIOError: break - -# Read data as Avro -def read_avro(): - ep = selectors.EpollSelector() - ep.register(sock, selectors.EVENT_READ) - - while True: - pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None) - try: - buf = sock.recv(4096, socket.MSG_DONTWAIT) - os.write(sys.stdout.fileno(), buf) - sys.stdout.flush() - except Exception: + except Exception as ex: + print(ex, file=sys.stderr) break parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve") @@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8') # Request a data stream sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())) -if opts.format == "JSON": - read_json() -elif opts.format == "AVRO": - read_avro() +read_data() diff --git a/server/modules/protocol/examples/cdc_schema.py b/server/modules/protocol/examples/cdc_schema.py new file mode 100755 index 
000000000..8b22bfd20 --- /dev/null +++ b/server/modules/protocol/examples/cdc_schema.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2016 MariaDB Corporation Ab +# +# Use of this software is governed by the Business Source License included +# in the LICENSE.TXT file and at www.mariadb.com/bsl11. +# +# Change Date: 2019-07-01 +# +# On the date above, in accordance with the Business Source License, use +# of this software will be governed by version 2 or later of the General +# Public License. + +# +# This program requires the MySQL Connector/Python to work +# + +import mysql.connector as mysql +import json +import sys +import argparse + +parser = argparse.ArgumentParser(description = "CDC Schema Generator", conflict_handler="resolve", epilog="""This program generates CDC schema files for a specific table or all the tables in a database. The +schema files need to be generated if the binary log files do not contain the +CREATE TABLE events that define the table layout.""") +parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost") +parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="3306") +parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="") +parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="") +parser.add_argument("DATABASE", help="Generate Avro schemas for this database") + +opts = parser.parse_args(sys.argv[1:]) + +def parse_field(row): + res = dict() + name = row[1].lower().split('(')[0] + + if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text", + "mediumtext", "longtext", "char", "varchar", "enum", "set"): + res["type"] = "string" + elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"): + res["type"] = "bytes" + elif name in ("int", "smallint", "mediumint", "integer", "tinyint", 
"short", "bit"): +        res["type"] = "int" +    elif name in ("float",): +        res["type"] = "float" +    elif name in ("double", "decimal"): +        res["type"] = "double" +    elif name in ("null",): +        res["type"] = "null" +    elif name in ("long", "bigint"): +        res["type"] = "long" +    else: +        res["type"] = "string" + + +    res["name"] = row[0].lower() + +    return res + +try: +    conn = mysql.connect(user=opts.user, password=opts.password, host=opts.host, port=opts.port) +    cursor = conn.cursor() +    cursor.execute("SHOW TABLES FROM {}".format(opts.DATABASE)) + +    tables = [] +    for res in cursor: +        tables.append(res[0]) + + +    for t in tables: +        schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record", name="ChangeRecord", fields=[]) +        cursor.execute("DESCRIBE {}.{}".format(opts.DATABASE, t)) + +        for res in cursor: +            schema["fields"].append(parse_field(res)) + +        dest = open("{}.{}.000001.avsc".format(opts.DATABASE, t), 'w') +        dest.write(json.dumps(schema)) +        dest.close() + +    cursor.close() +    conn.close() + +except Exception as e: +    print(e) +    sys.exit(1) + diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index 0b086cefb..c3aaa6e0b 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -507,9 +507,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value ptr += (ncolumns + 7) / 8; ss_dassert(ptr < end); - for (long i = 0; i < map->columns && npresent < ncolumns; i++) + for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++) { - ss_dassert(create->columns == map->columns); ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); ss_dassert(rc == 0); @@ -552,12 +551,31 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value } else { - uint8_t bytes = *ptr; + /** + * The first byte in the metadata stores the real type of + * the string (ENUM and SET types are also 
stored as fixed + * length strings). + * + * The first two bits of the second byte contain the XOR'ed + * field length but as that information is not relevant for + * us, we just use this information to know whether to read + * one or two bytes for string length. + */ + + uint16_t bytes = *ptr++; + int len = metadata[metadata_offset + 1] + + ((((metadata[metadata_offset] >> 4) & 0x3) ^ 0x3) << 8); + + if (len > 255) + { + bytes += *ptr++ << 8; + } + char str[bytes + 1]; - memcpy(str, ptr + 1, bytes); + memcpy(str, ptr, bytes); str[bytes] = '\0'; avro_value_set_string(&field, str); - ptr += bytes + 1; + ptr += bytes; ss_dassert(ptr < end); } } diff --git a/server/modules/routing/avrorouter/avro_schema.c b/server/modules/routing/avrorouter/avro_schema.c index 4e9458074..22e17dc48 100644 --- a/server/modules/routing/avrorouter/avro_schema.c +++ b/server/modules/routing/avrorouter/avro_schema.c @@ -794,6 +794,22 @@ void make_avro_token(char* dest, const char* src, int length) dest[length] = '\0'; } +int get_column_index(TABLE_CREATE *create, const char *tok) +{ + int idx = -1; + + for (int x = 0; x < create->columns; x++) + { + if (strcasecmp(create->column_names[x], tok) == 0) + { + idx = x; + break; + } + } + + return idx; +} + bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) { const char *tbl = strcasestr(sql, "table"), *def; @@ -844,27 +860,45 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) { tok = get_tok(tok + len, &len, end); - MXS_FREE(create->column_names[create->columns - 1]); - char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1); - ss_dassert(tmp); + int idx = get_column_index(create, tok); - if (tmp == NULL) + if (idx != -1) { - return false; + free(create->column_names[idx]); + for (int i = idx; i < (int)create->columns - 1; i++) + { + create->column_names[i] = create->column_names[i + 1]; + } + + char ** tmp = realloc(create->column_names, sizeof(char*) * 
create->columns - 1); + ss_dassert(tmp); + + if (tmp == NULL) + { + return false; + } + + create->column_names = tmp; + create->columns--; + updates++; } - create->column_names = tmp; - create->columns--; - updates++; tok = get_next_def(tok, end); len = 0; } else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len)) { tok = get_tok(tok + len, &len, end); - MXS_FREE(create->column_names[create->columns - 1]); - create->column_names[create->columns - 1] = strndup(tok, len); - updates++; + + int idx = get_column_index(create, tok); + + if (idx != -1) + { + free(create->column_names[idx]); + create->column_names[idx] = strndup(tok, len); + updates++; + } + tok = get_next_def(tok, end); len = 0; } @@ -975,7 +1009,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create) map->id = table_id; map->version = create->version; map->flags = flags; - ss_dassert(column_count == create->columns); map->columns = column_count; map->column_types = MXS_MALLOC(column_count); /** Allocate at least one byte for the metadata */ diff --git a/server/modules/routing/maxinfo/maxinfo_exec.c b/server/modules/routing/maxinfo/maxinfo_exec.c index 9d58a2f56..f4a37f967 100644 --- a/server/modules/routing/maxinfo/maxinfo_exec.c +++ b/server/modules/routing/maxinfo/maxinfo_exec.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -998,7 +999,7 @@ maxinfo_zombie_dcbs() /** * Interface to poll stats for reads */ -static int +static int64_t maxinfo_read_events() { return poll_get_stat(POLL_STAT_READ); @@ -1007,7 +1008,7 @@ maxinfo_read_events() /** * Interface to poll stats for writes */ -static int +static int64_t maxinfo_write_events() { return poll_get_stat(POLL_STAT_WRITE); @@ -1016,7 +1017,7 @@ maxinfo_write_events() /** * Interface to poll stats for errors */ -static int +static int64_t maxinfo_error_events() { return poll_get_stat(POLL_STAT_ERROR); @@ -1025,7 +1026,7 @@ maxinfo_error_events() /** * Interface to poll stats for hangup 
*/ -static int +static int64_t maxinfo_hangup_events() { return poll_get_stat(POLL_STAT_HANGUP); @@ -1034,7 +1035,7 @@ maxinfo_hangup_events() /** * Interface to poll stats for accepts */ -static int +static int64_t maxinfo_accept_events() { return poll_get_stat(POLL_STAT_ACCEPT); @@ -1043,7 +1044,7 @@ maxinfo_accept_events() /** * Interface to poll stats for event queue length */ -static int +static int64_t maxinfo_event_queue_length() { return poll_get_stat(POLL_STAT_EVQ_LEN); @@ -1052,7 +1053,7 @@ maxinfo_event_queue_length() /** * Interface to poll stats for max event queue length */ -static int +static int64_t maxinfo_max_event_queue_length() { return poll_get_stat(POLL_STAT_EVQ_MAX); @@ -1061,7 +1062,7 @@ maxinfo_max_event_queue_length() /** * Interface to poll stats for max queue time */ -static int +static int64_t maxinfo_max_event_queue_time() { return poll_get_stat(POLL_STAT_MAX_QTIME); @@ -1070,7 +1071,7 @@ maxinfo_max_event_queue_time() /** * Interface to poll stats for max event execution time */ -static int +static int64_t maxinfo_max_event_exec_time() { return poll_get_stat(POLL_STAT_MAX_EXECTIME); @@ -1137,17 +1138,17 @@ status_row(RESULTSET *result, void *data) resultset_row_set(row, 0, status[context->index].name); switch (status[context->index].type) { - case VT_STRING: - resultset_row_set(row, 1, - (char *)(*status[context->index].func)()); - break; - case VT_INT: - snprintf(buf, 80, "%ld", - (long)(*status[context->index].func)()); - resultset_row_set(row, 1, buf); - break; - default: - ss_dassert(!true); + case VT_STRING: + resultset_row_set(row, 1, + (char *)(*status[context->index].func)()); + break; + case VT_INT: + snprintf(buf, 80, "%" PRId64, + (int64_t)(*status[context->index].func)()); + resultset_row_set(row, 1, buf); + break; + default: + ss_dassert(!true); } context->index++; return row; diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.c b/server/modules/routing/readwritesplit/rwsplit_mysql.c index 
fb8534089..7dd2f41d7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.c +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.c @@ -13,7 +13,6 @@ #include "readwritesplit.h" -#include #include #include #include diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index 9baf801d4..bc7811a41 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -13,7 +13,6 @@ #include "schemarouter.h" -#include #include #include #include diff --git a/server/modules/routing/schemarouter/sharding_common.h b/server/modules/routing/schemarouter/sharding_common.h index 5ad404c86..1ad65fe07 100644 --- a/server/modules/routing/schemarouter/sharding_common.h +++ b/server/modules/routing/schemarouter/sharding_common.h @@ -15,7 +15,6 @@ */ #include -#include #include #include #include