Merge branch '2.1-merge-2.0' into 2.1

This commit is contained in:
Markus Mäkelä
2017-05-15 10:36:05 +03:00
19 changed files with 464 additions and 156 deletions

View File

@ -60,6 +60,10 @@ include_directories(${PCRE2_INCLUDE_DIRS})
if(NOT MARIADB_CONNECTOR_FOUND) if(NOT MARIADB_CONNECTOR_FOUND)
message(STATUS "Building MariaDB Connector-C from source.") message(STATUS "Building MariaDB Connector-C from source.")
include(cmake/BuildMariaDBConnector.cmake) include(cmake/BuildMariaDBConnector.cmake)
else()
# This is required as the core depends on the `connector-c` target
add_custom_target(connector-c)
message(STATUS "Using system Connector-C")
endif() endif()
if(NOT JANSSON_FOUND) if(NOT JANSSON_FOUND)

View File

@ -85,7 +85,7 @@ bool column_is_decimal(uint8_t type);
bool fixed_string_is_enum(uint8_t type); bool fixed_string_is_enum(uint8_t type);
/** Value unpacking */ /** Value unpacking */
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm);
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest);
size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val);
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,

View File

@ -11,7 +11,6 @@
* Public License. * Public License.
*/ */
#include <my_config.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>

View File

@ -57,7 +57,7 @@ void dShowThreads(DCB *dcb);
void dShowEventQ(DCB *dcb); void dShowEventQ(DCB *dcb);
void dShowEventStats(DCB *dcb); void dShowEventStats(DCB *dcb);
int poll_get_stat(POLL_STAT stat); int64_t poll_get_stat(POLL_STAT stat);
RESULTSET *eventTimesGetList(); RESULTSET *eventTimesGetList();
void poll_send_message(enum poll_message msg, void *data); void poll_send_message(enum poll_message msg, void *data);

View File

@ -25,6 +25,10 @@
#include <strings.h> #include <strings.h>
#include <math.h> #include <math.h>
#include <maxscale/protocol/mysql.h>
static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes);
/** /**
* @brief Convert a table column type to a string * @brief Convert a table column type to a string
* *
@ -216,6 +220,35 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
dest->tm_year = *ptr; dest->tm_year = *ptr;
} }
/** Base-10 logarithm values */
int64_t log_10_values[] =
{
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000
};
/**
* If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with
* extra precision, the packed length is shorter than 8 bytes.
*/
size_t datetime_sizes[] =
{
5, // DATETIME(0)
6, // DATETIME(1)
6, // DATETIME(2)
7, // DATETIME(3)
7, // DATETIME(4)
7, // DATETIME(5)
8 // DATETIME(6)
};
/** /**
* @brief Unpack a DATETIME * @brief Unpack a DATETIME
* *
@ -224,21 +257,52 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
* @param val Value read from the binary log * @param val Value read from the binary log
* @param dest Pointer where the unpacked value is stored * @param dest Pointer where the unpacked value is stored
*/ */
static void unpack_datetime(uint8_t *ptr, struct tm *dest) static void unpack_datetime(uint8_t *ptr, int length, struct tm *dest)
{ {
uint64_t val = 0; int64_t val = 0;
memcpy(&val, ptr, sizeof(val)); uint32_t second, minute, hour, day, month, year;
uint32_t second = val - ((val / 100) * 100);
if (length == -1)
{
val = gw_mysql_get_byte8(ptr);
second = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t minute = val - ((val / 100) * 100); minute = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t hour = val - ((val / 100) * 100); hour = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t day = val - ((val / 100) * 100); day = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t month = val - ((val / 100) * 100); month = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t year = val; year = val;
}
else
{
// TODO: Figure out why DATETIME(0) doesn't work like it others do
val = unpack_bytes(ptr, datetime_sizes[length]);
val *= log_10_values[6 - length];
if (val < 0)
{
val = -val;
}
int subsecond = val % 1000000;
val /= 1000000;
second = val % 60;
val /= 60;
minute = val % 60;
val /= 60;
hour = val % 24;
val /= 24;
day = val % 32;
val /= 32;
month = val % 13;
val /= 13;
year = val;
}
memset(dest, 0, sizeof(struct tm)); memset(dest, 0, sizeof(struct tm));
dest->tm_year = year - 1900; dest->tm_year = year - 1900;
@ -391,14 +455,13 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
return metadata[1]; return metadata[1];
} }
/** /**
* @brief Get the length of a temporal field * @brief Get the length of a temporal field
* @param type Field type * @param type Field type
* @param decimals How many decimals the field has * @param decimals How many decimals the field has
* @return Number of bytes the temporal value takes * @return Number of bytes the temporal value takes
*/ */
static size_t temporal_field_size(uint8_t type, uint8_t decimals) static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length)
{ {
switch (type) switch (type)
{ {
@ -413,7 +476,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
return 3 + ((decimals + 1) / 2); return 3 + ((decimals + 1) / 2);
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
return 8; return length < 0 || length > 6 ? 8 : datetime_sizes[length];
case TABLE_COL_TYPE_TIMESTAMP: case TABLE_COL_TYPE_TIMESTAMP:
return 4; return 4;
@ -441,7 +504,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
* @param val Extracted packed value * @param val Extracted packed value
* @param tm Pointer where the unpacked temporal value is stored * @param tm Pointer where the unpacked temporal value is stored
*/ */
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm)
{ {
switch (type) switch (type)
{ {
@ -450,7 +513,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
break; break;
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
unpack_datetime(ptr, tm); unpack_datetime(ptr, length, tm);
break; break;
case TABLE_COL_TYPE_DATETIME2: case TABLE_COL_TYPE_DATETIME2:
@ -474,7 +537,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
ss_dassert(false); ss_dassert(false);
break; break;
} }
return temporal_field_size(type, *metadata); return temporal_field_size(type, *metadata, length);
} }
void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm) void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm)

View File

@ -1517,8 +1517,7 @@ dShowEventStats(DCB *pdcb)
* @param stat The required statistic * @param stat The required statistic
* @return The value of that statistic * @return The value of that statistic
*/ */
int int64_t poll_get_stat(POLL_STAT stat)
poll_get_stat(POLL_STAT stat)
{ {
switch (stat) switch (stat)
{ {

View File

@ -31,7 +31,6 @@
#undef NDEBUG #undef NDEBUG
#endif #endif
#define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1; #define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1;
#include <my_config.h>
#include <mysql.h> #include <mysql.h>
#include <stdio.h> #include <stdio.h>
#include <maxscale/notification.h> #include <maxscale/notification.h>

View File

@ -59,7 +59,6 @@
#define MXS_MODULE_NAME "mqfilter" #define MXS_MODULE_NAME "mqfilter"
#include <my_config.h>
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <maxscale/filter.h> #include <maxscale/filter.h>

View File

@ -2,4 +2,5 @@ install_script(cdc.py core)
install_script(cdc_users.py core) install_script(cdc_users.py core)
install_script(cdc_last_transaction.py core) install_script(cdc_last_transaction.py core)
install_script(cdc_kafka_producer.py core) install_script(cdc_kafka_producer.py core)
install_script(cdc_schema.py core)
install_file(cdc_schema.go core) install_file(cdc_schema.go core)

View File

@ -12,52 +12,32 @@
# Public License. # Public License.
import time import time
import json
import re
import sys import sys
import socket import socket
import hashlib import hashlib
import argparse import argparse
import subprocess
import selectors import selectors
import binascii import binascii
import os import os
# Read data as JSON def read_data():
def read_json(): sel = selectors.DefaultSelector()
decoder = json.JSONDecoder() sel.register(sock, selectors.EVENT_READ)
rbuf = bytes()
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True: while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
buf = sock.recv(4096, socket.MSG_DONTWAIT)
rbuf += buf
while True:
rbuf = rbuf.lstrip()
data = decoder.raw_decode(rbuf.decode('utf_8'))
rbuf = rbuf[data[1]:]
print(json.dumps(data[0]))
except ValueError as err:
sys.stdout.flush()
pass
except Exception:
break
# Read data as Avro
def read_avro():
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try: try:
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
buf = sock.recv(4096, socket.MSG_DONTWAIT) buf = sock.recv(4096, socket.MSG_DONTWAIT)
if len(buf) > 0:
os.write(sys.stdout.fileno(), buf) os.write(sys.stdout.fileno(), buf)
sys.stdout.flush() sys.stdout.flush()
except Exception: else:
raise Exception('Socket was closed')
except BlockingIOError:
break
except Exception as ex:
print(ex, file=sys.stderr)
break break
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve") parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8')
# Request a data stream # Request a data stream
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())) sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
if opts.format == "JSON": read_data()
read_json()
elif opts.format == "AVRO":
read_avro()

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl11.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
#
# This program requires the MySQL Connector/Python to work
#
import mysql.connector as mysql
import json
import sys
import argparse
parser = argparse.ArgumentParser(description = "CDC Schema Generator", conflict_handler="resolve", epilog="""This program generates CDC schema files for a specific table or all the tables in a database. The
schema files need to be generated if the binary log files do not contain the
CREATE TABLE events that define the table layout.""")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="3306")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("DATABASE", help="Generate Avro schemas for this database")
opts = parser.parse_args(sys.argv[1:])
def parse_field(row):
res = dict()
name = row[1].lower().split('(')[0]
if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
"mediumtext", "longtext", "char", "varchar", "enum", "set"):
res["type"] = "string"
elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
res["type"] = "bytes"
elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
res["type"] = "int"
elif name in ("float"):
res["type"] = "float"
elif name in ("double", "decimal"):
res["type"] = "double"
elif name in ("null"):
res["type"] = "null"
elif name in ("long", "bigint"):
res["type"] = "long"
else:
res["type"] = "string"
res["name"] = row[0].lower()
return res
try:
conn = mysql.connect(user=opts.user, password=opts.password, host=opts.host, port=opts.port)
cursor = conn.cursor()
cursor.execute("SHOW TABLES FROM {}".format(opts.DATABASE))
tables = []
for res in cursor:
tables.append(res[0])
for t in tables:
schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record", name="ChangeRecord", fields=[])
cursor.execute("DESCRIBE {}.{}".format(opts.DATABASE, t))
for res in cursor:
schema["fields"].append(parse_field(res))
dest = open("{}.{}.000001.avsc".format(opts.DATABASE, t), 'w')
dest.write(json.dumps(schema))
dest.close()
cursor.close()
conn.close()
except Exception as e:
print(e)
exit(1)

View File

@ -475,6 +475,17 @@ void notify_all_clients(AVRO_INSTANCE *router)
} }
} }
void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_FLUSH);
avro_save_conversion_state(router);
notify_all_clients(router);
*total_rows += router->row_count;
*total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
}
/** /**
* @brief Read all replication events from a binlog file. * @brief Read all replication events from a binlog file.
* *
@ -555,6 +566,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
} }
else else
{ {
do_checkpoint(router, &total_rows, &total_commits);
MXS_INFO("Processed %lu transactions and %lu row events.", MXS_INFO("Processed %lu transactions and %lu row events.",
total_commits, total_rows); total_commits, total_rows);
if (rotate_seen) if (rotate_seen)
@ -742,13 +755,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
if (router->row_count >= router->row_target || if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target) router->trx_count >= router->trx_target)
{ {
update_used_tables(router); do_checkpoint(router, &total_rows, &total_commits);
avro_flush_all_tables(router, AVROROUTER_SYNC);
avro_save_conversion_state(router);
notify_all_clients(router);
total_rows += router->row_count;
total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
} }
} }

View File

@ -162,6 +162,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
"table until a DDL statement for it is read.", table_ident); "table until a DDL statement for it is read.", table_ident);
} }
if (rval)
{
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
}
return rval; return rval;
} }
@ -289,9 +294,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end * beforehand so we must continue processing them until we reach the end
* of the event. */ * of the event. */
int rows = 0; int rows = 0;
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{ {
static uint64_t total_row_count = 1;
MXS_INFO("Row %lu", total_row_count++);
/** Add the current GTID and timestamp */ /** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type); int event_type = get_event_type(hdr->event_type);
@ -507,9 +516,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += (ncolumns + 7) / 8; ptr += (ncolumns + 7) / 8;
ss_dassert(ptr < end); ss_dassert(ptr < end);
for (long i = 0; i < map->columns && npresent < ncolumns; i++) for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++)
{ {
ss_dassert(create->columns == map->columns);
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0); ss_dassert(rc == 0);
@ -518,6 +526,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
npresent++; npresent++;
if (bit_is_set(null_bitmap, ncolumns, i)) if (bit_is_set(null_bitmap, ncolumns, i))
{ {
MXS_INFO("[%ld] NULL", i);
if (column_is_blob(map->column_types[i])) if (column_is_blob(map->column_types[i]))
{ {
uint8_t nullvalue = 0; uint8_t nullvalue = 0;
@ -547,17 +556,45 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("ENUM/SET values larger than 255 values aren't supported."); MXS_WARNING("ENUM/SET values larger than 255 values aren't supported.");
} }
avro_value_set_string(&field, strval); avro_value_set_string(&field, strval);
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes; ptr += bytes;
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
else else
{ {
uint8_t bytes = *ptr; /**
* The first byte in the metadata stores the real type of
* the string (ENUM and SET types are also stored as fixed
* length strings).
*
* The first two bits of the second byte contain the XOR'ed
* field length but as that information is not relevant for
* us, we just use this information to know whether to read
* one or two bytes for string length.
*/
uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8);
int bytes = 0;
uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300);
uint16_t field_length = (meta & 0xff) + extra_length;
if (field_length > 255)
{
bytes = ptr[0] + (ptr[1] << 8);
ptr += 2;
}
else
{
bytes = *ptr++;
}
MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
ss_dassert(bytes || *ptr == '\0');
char str[bytes + 1]; char str[bytes + 1];
memcpy(str, ptr + 1, bytes); memcpy(str, ptr, bytes);
str[bytes] = '\0'; str[bytes] = '\0';
avro_value_set_string(&field, str); avro_value_set_string(&field, str);
ptr += bytes + 1; ptr += bytes;
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
} }
@ -577,6 +614,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("BIT is not currently supported, values are stored as 0."); MXS_WARNING("BIT is not currently supported, values are stored as 0.");
} }
avro_value_set_int(&field, value); avro_value_set_int(&field, value);
MXS_INFO("[%ld] BIT", i);
ptr += bytes; ptr += bytes;
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
@ -585,6 +623,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
double f_value = 0.0; double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value); avro_value_set_double(&field, f_value);
MXS_INFO("[%ld] DOUBLE", i);
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
else if (column_is_variable_string(map->column_types[i])) else if (column_is_variable_string(map->column_types[i]))
@ -602,6 +641,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr++; ptr++;
} }
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
char buf[sz + 1]; char buf[sz + 1];
memcpy(buf, ptr, sz); memcpy(buf, ptr, sz);
buf[sz] = '\0'; buf[sz] = '\0';
@ -615,6 +655,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0; uint64_t len = 0;
memcpy(&len, ptr, bytes); memcpy(&len, ptr, bytes);
ptr += bytes; ptr += bytes;
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len) if (len)
{ {
avro_value_set_bytes(&field, ptr, len); avro_value_set_bytes(&field, ptr, len);
@ -631,9 +672,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
{ {
char buf[80]; char buf[80];
struct tm tm; struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm); ptr += unpack_temporal_value(map->column_types[i], ptr,
&metadata[metadata_offset],
create->column_lengths[i], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf); avro_value_set_string(&field, buf);
MXS_INFO("[%ld] TEMPORAL: %s", i, buf);
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
/** All numeric types (INT, LONG, FLOAT etc.) */ /** All numeric types (INT, LONG, FLOAT etc.) */

View File

@ -124,9 +124,11 @@ char* json_new_schema_from_table(TABLE_MAP *map)
for (uint64_t i = 0; i < map->columns; i++) for (uint64_t i = 0; i < map->columns; i++)
{ {
json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name", json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
create->column_names[i], "type", "name", create->column_names[i],
column_type_to_avro_type(map->column_types[i]))); "type", column_type_to_avro_type(map->column_types[i]),
"real_type", create->column_types[i],
"length", create->column_lengths[i]));
} }
json_object_set_new(schema, "fields", array); json_object_set_new(schema, "fields", array);
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER); char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
@ -172,8 +174,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
{ {
int array_size = json_array_size(arr); int array_size = json_array_size(arr);
table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size)); table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_types = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_lengths = (int*)MXS_MALLOC(sizeof(int) * (array_size));
if (table->column_names) if (table->column_names && table->column_types && table->column_lengths)
{ {
int columns = 0; int columns = 0;
rval = true; rval = true;
@ -184,6 +188,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
if (json_is_object(val)) if (json_is_object(val))
{ {
json_t* value;
if ((value = json_object_get(val, "real_type")) && json_is_string(value))
{
table->column_types[columns] = MXS_STRDUP_A(json_string_value(value));
}
else
{
table->column_types[columns] = MXS_STRDUP_A("unknown");
MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field.");
}
if ((value = json_object_get(val, "length")) && json_is_integer(value))
{
table->column_lengths[columns] = json_integer_value(value);
}
else
{
table->column_lengths[columns] = -1;
MXS_WARNING("No \"length\" value defined. Treating as default length field.");
}
json_t *name = json_object_get(val, "name"); json_t *name = json_object_get(val, "name");
if (name && json_is_string(name)) if (name && json_is_string(name))
{ {
@ -489,7 +515,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
dest[bytes] = '\0'; dest[bytes] = '\0';
make_valid_avro_identifier(dest); make_valid_avro_identifier(dest);
ptr = next_field_definition(ptr);
} }
else else
{ {
@ -499,55 +524,97 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
return ptr; return ptr;
} }
int extract_type_length(const char* ptr, char *dest)
{
/** Skip any leading whitespace */
while (isspace(*ptr) || *ptr == '`')
{
ptr++;
}
/** The field type definition starts here */
const char *start = ptr;
/** Skip characters until we either hit a whitespace character or the start
* of the length definition. */
while (!isspace(*ptr) && *ptr != '(')
{
ptr++;
}
/** Store type */
int typelen = ptr - start;
memcpy(dest, start, typelen);
dest[typelen] = '\0';
/** Skip whitespace */
while (isspace(*ptr))
{
ptr++;
}
int rval = -1; // No length defined
/** Start of length definition */
if (*ptr == '(')
{
ptr++;
char *end;
int val = strtol(ptr, &end, 10);
if (*end == ')')
{
rval = val;
}
}
return rval;
}
int count_columns(const char* ptr)
{
int i = 2;
while ((ptr = strchr(ptr, ',')))
{
ptr++;
i++;
}
return i;
}
/** /**
* Process a table definition into an array of column names * Process a table definition into an array of column names
* @param nameptr table definition * @param nameptr table definition
* @return Number of processed columns or -1 on error * @return Number of processed columns or -1 on error
*/ */
static int process_column_definition(const char *nameptr, char*** dest) static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens)
{ {
/** Process columns in groups of 8 */ int n = count_columns(nameptr);
size_t chunks = 1; *dest = MXS_MALLOC(sizeof(char*) * n);
const size_t chunk_size = 8; *dest_types = MXS_MALLOC(sizeof(char*) * n);
int i = 0; *dest_lens = MXS_MALLOC(sizeof(int) * n);
char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1));
if (names == NULL)
{
return -1;
}
char **names = *dest;
char **types = *dest_types;
int *lengths = *dest_lens;
char colname[512]; char colname[512];
int i = 0;
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname)))) while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
{ {
if (i >= chunks * chunk_size) ss_dassert(i < n);
{ char type[100] = "";
char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*)); int len = extract_type_length(nameptr, type);
if (tmp == NULL) nameptr = next_field_definition(nameptr);
{ fix_reserved_word(colname);
for (int x = 0; x < i; x++)
{
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
names = tmp;
}
if ((names[i++] = MXS_STRDUP(colname)) == NULL) lengths[i] = len;
{ types[i] = MXS_STRDUP_A(type);
for (int x = 0; x < i; x++) names[i] = MXS_STRDUP_A(colname);
{ i++;
MXS_FREE(names[x]);
} }
MXS_FREE(names);
return -1;
}
}
*dest = names;
return i; return i;
} }
@ -601,7 +668,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
char database[MYSQL_DATABASE_MAXLEN + 1]; char database[MYSQL_DATABASE_MAXLEN + 1];
const char *db = event_db; const char *db = event_db;
MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql); MXS_INFO("Create table: %s", sql);
if (!get_table_name(sql, table)) if (!get_table_name(sql, table))
{ {
@ -621,8 +688,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
db = database; db = database;
} }
int* lengths = NULL;
char **names = NULL; char **names = NULL;
int n_columns = process_column_definition(statement_sql, &names); char **types = NULL;
int n_columns = process_column_definition(statement_sql, &names, &types, &lengths);
ss_dassert(n_columns > 0); ss_dassert(n_columns > 0);
/** We have appear to have a valid CREATE TABLE statement */ /** We have appear to have a valid CREATE TABLE statement */
@ -634,6 +703,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
rval->version = 1; rval->version = 1;
rval->was_used = false; rval->was_used = false;
rval->column_names = names; rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = n_columns; rval->columns = n_columns;
rval->database = MXS_STRDUP(db); rval->database = MXS_STRDUP(db);
rval->table = MXS_STRDUP(table); rval->table = MXS_STRDUP(table);
@ -675,8 +746,11 @@ void table_create_free(TABLE_CREATE* value)
for (uint64_t i = 0; i < value->columns; i++) for (uint64_t i = 0; i < value->columns; i++)
{ {
MXS_FREE(value->column_names[i]); MXS_FREE(value->column_names[i]);
MXS_FREE(value->column_types[i]);
} }
MXS_FREE(value->column_names); MXS_FREE(value->column_names);
MXS_FREE(value->column_types);
MXS_FREE(value->column_lengths);
MXS_FREE(value->table); MXS_FREE(value->table);
MXS_FREE(value->database); MXS_FREE(value->database);
MXS_FREE(value); MXS_FREE(value);
@ -792,6 +866,26 @@ void make_avro_token(char* dest, const char* src, int length)
memcpy(dest, src, length); memcpy(dest, src, length);
dest[length] = '\0'; dest[length] = '\0';
fix_reserved_word(dest);
}
int get_column_index(TABLE_CREATE *create, const char *tok)
{
int idx = -1;
char safe_tok[strlen(tok) + 2];
strcpy(safe_tok, tok);
fix_reserved_word(safe_tok);
for (int x = 0; x < create->columns; x++)
{
if (strcasecmp(create->column_names[x], tok) == 0)
{
idx = x;
break;
}
}
return idx;
} }
bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
@ -805,7 +899,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (tok) if (tok)
{ {
MXS_DEBUG("Altering table %.*s\n", len, tok); MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
def = tok + len; def = tok + len;
} }
@ -844,8 +938,17 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{ {
tok = get_tok(tok + len, &len, end); tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]); int idx = get_column_index(create, tok);
char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
if (idx != -1)
{
MXS_FREE(create->column_names[idx]);
for (int i = idx; i < (int)create->columns - 1; i++)
{
create->column_names[i] = create->column_names[i + 1];
}
char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp); ss_dassert(tmp);
if (tmp == NULL) if (tmp == NULL)
@ -856,15 +959,24 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
create->column_names = tmp; create->column_names = tmp;
create->columns--; create->columns--;
updates++; updates++;
}
tok = get_next_def(tok, end); tok = get_next_def(tok, end);
len = 0; len = 0;
} }
else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len)) else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len))
{ {
tok = get_tok(tok + len, &len, end); tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]);
create->column_names[create->columns - 1] = strndup(tok, len); int idx = get_column_index(create, tok);
if (idx != -1)
{
MXS_FREE(create->column_names[idx]);
create->column_names[idx] = strndup(tok, len);
updates++; updates++;
}
tok = get_next_def(tok, end); tok = get_next_def(tok, end);
len = 0; len = 0;
} }
@ -975,7 +1087,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
map->id = table_id; map->id = table_id;
map->version = create->version; map->version = create->version;
map->flags = flags; map->flags = flags;
ss_dassert(column_count == create->columns);
map->columns = column_count; map->columns = column_count;
map->column_types = MXS_MALLOC(column_count); map->column_types = MXS_MALLOC(column_count);
/** Allocate at least one byte for the metadata */ /** Allocate at least one byte for the metadata */

View File

@ -84,6 +84,23 @@ static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp"; static const char *avro_timestamp = "timestamp";
static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" }; static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
static inline bool is_reserved_word(const char* word)
{
return strcasecmp(word, avro_domain) == 0 ||
strcasecmp(word, avro_server_id) == 0 ||
strcasecmp(word, avro_sequence) == 0 ||
strcasecmp(word, avro_event_number) == 0 ||
strcasecmp(word, avro_event_type) == 0 ||
strcasecmp(word, avro_timestamp) == 0;
}
static inline void fix_reserved_word(char *tok)
{
if (is_reserved_word(tok))
{
strcat(tok, "_");
}
}
/** How a binlog file is closed */ /** How a binlog file is closed */
typedef enum avro_binlog_end typedef enum avro_binlog_end
@ -111,6 +128,8 @@ typedef struct table_create
{ {
uint64_t columns; uint64_t columns;
char **column_names; char **column_names;
char **column_types;
int* column_lengths;
char *table; char *table;
char *database; char *database;
int version; /**< How many versions of this table have been used */ int version; /**< How many versions of this table have been used */

View File

@ -29,6 +29,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <inttypes.h>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/atomic.h> #include <maxscale/atomic.h>
@ -998,7 +999,7 @@ maxinfo_zombie_dcbs()
/** /**
* Interface to poll stats for reads * Interface to poll stats for reads
*/ */
static int static int64_t
maxinfo_read_events() maxinfo_read_events()
{ {
return poll_get_stat(POLL_STAT_READ); return poll_get_stat(POLL_STAT_READ);
@ -1007,7 +1008,7 @@ maxinfo_read_events()
/** /**
* Interface to poll stats for writes * Interface to poll stats for writes
*/ */
static int static int64_t
maxinfo_write_events() maxinfo_write_events()
{ {
return poll_get_stat(POLL_STAT_WRITE); return poll_get_stat(POLL_STAT_WRITE);
@ -1016,7 +1017,7 @@ maxinfo_write_events()
/** /**
* Interface to poll stats for errors * Interface to poll stats for errors
*/ */
static int static int64_t
maxinfo_error_events() maxinfo_error_events()
{ {
return poll_get_stat(POLL_STAT_ERROR); return poll_get_stat(POLL_STAT_ERROR);
@ -1025,7 +1026,7 @@ maxinfo_error_events()
/** /**
* Interface to poll stats for hangup * Interface to poll stats for hangup
*/ */
static int static int64_t
maxinfo_hangup_events() maxinfo_hangup_events()
{ {
return poll_get_stat(POLL_STAT_HANGUP); return poll_get_stat(POLL_STAT_HANGUP);
@ -1034,7 +1035,7 @@ maxinfo_hangup_events()
/** /**
* Interface to poll stats for accepts * Interface to poll stats for accepts
*/ */
static int static int64_t
maxinfo_accept_events() maxinfo_accept_events()
{ {
return poll_get_stat(POLL_STAT_ACCEPT); return poll_get_stat(POLL_STAT_ACCEPT);
@ -1043,7 +1044,7 @@ maxinfo_accept_events()
/** /**
* Interface to poll stats for event queue length * Interface to poll stats for event queue length
*/ */
static int static int64_t
maxinfo_event_queue_length() maxinfo_event_queue_length()
{ {
return poll_get_stat(POLL_STAT_EVQ_LEN); return poll_get_stat(POLL_STAT_EVQ_LEN);
@ -1052,7 +1053,7 @@ maxinfo_event_queue_length()
/** /**
* Interface to poll stats for max event queue length * Interface to poll stats for max event queue length
*/ */
static int static int64_t
maxinfo_max_event_queue_length() maxinfo_max_event_queue_length()
{ {
return poll_get_stat(POLL_STAT_EVQ_MAX); return poll_get_stat(POLL_STAT_EVQ_MAX);
@ -1061,7 +1062,7 @@ maxinfo_max_event_queue_length()
/** /**
* Interface to poll stats for max queue time * Interface to poll stats for max queue time
*/ */
static int static int64_t
maxinfo_max_event_queue_time() maxinfo_max_event_queue_time()
{ {
return poll_get_stat(POLL_STAT_MAX_QTIME); return poll_get_stat(POLL_STAT_MAX_QTIME);
@ -1070,7 +1071,7 @@ maxinfo_max_event_queue_time()
/** /**
* Interface to poll stats for max event execution time * Interface to poll stats for max event execution time
*/ */
static int static int64_t
maxinfo_max_event_exec_time() maxinfo_max_event_exec_time()
{ {
return poll_get_stat(POLL_STAT_MAX_EXECTIME); return poll_get_stat(POLL_STAT_MAX_EXECTIME);
@ -1142,8 +1143,8 @@ status_row(RESULTSET *result, void *data)
(char *)(*status[context->index].func)()); (char *)(*status[context->index].func)());
break; break;
case VT_INT: case VT_INT:
snprintf(buf, 80, "%ld", snprintf(buf, 80, "%" PRId64,
(long)(*status[context->index].func)()); (int64_t)(*status[context->index].func)());
resultset_row_set(row, 1, buf); resultset_row_set(row, 1, buf);
break; break;
default: default:

View File

@ -13,7 +13,6 @@
#include "readwritesplit.h" #include "readwritesplit.h"
#include <my_config.h>
#include <stdio.h> #include <stdio.h>
#include <strings.h> #include <strings.h>
#include <string.h> #include <string.h>

View File

@ -13,7 +13,6 @@
#include "schemarouter.h" #include "schemarouter.h"
#include <my_config.h>
#include <stdio.h> #include <stdio.h>
#include <strings.h> #include <strings.h>
#include <string.h> #include <string.h>

View File

@ -15,7 +15,6 @@
*/ */
#include <maxscale/cdefs.h> #include <maxscale/cdefs.h>
#include <my_config.h>
#include <poll.h> #include <poll.h>
#include <maxscale/buffer.h> #include <maxscale/buffer.h>
#include <maxscale/modutil.h> #include <maxscale/modutil.h>