Merge branch '2.0' into 2.1

This commit is contained in:
Markus Mäkelä
2017-05-10 09:12:42 +03:00
15 changed files with 198 additions and 83 deletions

View File

@ -60,6 +60,10 @@ include_directories(${PCRE2_INCLUDE_DIRS})
if(NOT MARIADB_CONNECTOR_FOUND) if(NOT MARIADB_CONNECTOR_FOUND)
message(STATUS "Building MariaDB Connector-C from source.") message(STATUS "Building MariaDB Connector-C from source.")
include(cmake/BuildMariaDBConnector.cmake) include(cmake/BuildMariaDBConnector.cmake)
else()
# This is required as the core depends on the `connector-c` target
add_custom_target(connector-c)
message(STATUS "Using system Connector-C")
endif() endif()
if(NOT JANSSON_FOUND) if(NOT JANSSON_FOUND)

View File

@ -11,7 +11,6 @@
* Public License. * Public License.
*/ */
#include <my_config.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>

View File

@ -57,7 +57,7 @@ void dShowThreads(DCB *dcb);
void dShowEventQ(DCB *dcb); void dShowEventQ(DCB *dcb);
void dShowEventStats(DCB *dcb); void dShowEventStats(DCB *dcb);
int poll_get_stat(POLL_STAT stat); int64_t poll_get_stat(POLL_STAT stat);
RESULTSET *eventTimesGetList(); RESULTSET *eventTimesGetList();
void poll_send_message(enum poll_message msg, void *data); void poll_send_message(enum poll_message msg, void *data);

View File

@ -1517,8 +1517,7 @@ dShowEventStats(DCB *pdcb)
* @param stat The required statistic * @param stat The required statistic
* @return The value of that statistic * @return The value of that statistic
*/ */
int int64_t poll_get_stat(POLL_STAT stat)
poll_get_stat(POLL_STAT stat)
{ {
switch (stat) switch (stat)
{ {

View File

@ -31,7 +31,6 @@
#undef NDEBUG #undef NDEBUG
#endif #endif
#define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1; #define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1;
#include <my_config.h>
#include <mysql.h> #include <mysql.h>
#include <stdio.h> #include <stdio.h>
#include <maxscale/notification.h> #include <maxscale/notification.h>

View File

@ -59,7 +59,6 @@
#define MXS_MODULE_NAME "mqfilter" #define MXS_MODULE_NAME "mqfilter"
#include <my_config.h>
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <maxscale/filter.h> #include <maxscale/filter.h>

View File

@ -2,4 +2,5 @@ install_script(cdc.py core)
install_script(cdc_users.py core) install_script(cdc_users.py core)
install_script(cdc_last_transaction.py core) install_script(cdc_last_transaction.py core)
install_script(cdc_kafka_producer.py core) install_script(cdc_kafka_producer.py core)
install_script(cdc_schema.py core)
install_file(cdc_schema.go core) install_file(cdc_schema.go core)

View File

@ -12,52 +12,32 @@
# Public License. # Public License.
import time import time
import json
import re
import sys import sys
import socket import socket
import hashlib import hashlib
import argparse import argparse
import subprocess
import selectors import selectors
import binascii import binascii
import os import os
# Read data as JSON def read_data():
def read_json(): sel = selectors.DefaultSelector()
decoder = json.JSONDecoder() sel.register(sock, selectors.EVENT_READ)
rbuf = bytes()
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True: while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try: try:
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
buf = sock.recv(4096, socket.MSG_DONTWAIT) buf = sock.recv(4096, socket.MSG_DONTWAIT)
rbuf += buf if len(buf) > 0:
while True: os.write(sys.stdout.fileno(), buf)
rbuf = rbuf.lstrip() sys.stdout.flush()
data = decoder.raw_decode(rbuf.decode('utf_8')) else:
rbuf = rbuf[data[1]:] raise Exception('Socket was closed')
print(json.dumps(data[0]))
except ValueError as err: except BlockingIOError:
sys.stdout.flush()
pass
except Exception:
break break
except Exception as ex:
# Read data as Avro print(ex, file=sys.stderr)
def read_avro():
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
buf = sock.recv(4096, socket.MSG_DONTWAIT)
os.write(sys.stdout.fileno(), buf)
sys.stdout.flush()
except Exception:
break break
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve") parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8')
# Request a data stream # Request a data stream
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode())) sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
if opts.format == "JSON": read_data()
read_json()
elif opts.format == "AVRO":
read_avro()

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl11.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
#
# This program requires the MySQL Connector/Python to work
#
import mysql.connector as mysql
import json
import sys
import argparse
parser = argparse.ArgumentParser(description = "CDC Schema Generator", conflict_handler="resolve", epilog="""This program generates CDC schema files for a specific table or all the tables in a database. The
schema files need to be generated if the binary log files do not contain the
CREATE TABLE events that define the table layout.""")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="3306")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("DATABASE", help="Generate Avro schemas for this database")
opts = parser.parse_args(sys.argv[1:])
def parse_field(row):
res = dict()
name = row[1].lower().split('(')[0]
if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
"mediumtext", "longtext", "char", "varchar", "enum", "set"):
res["type"] = "string"
elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
res["type"] = "bytes"
elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
res["type"] = "int"
elif name in ("float"):
res["type"] = "float"
elif name in ("double", "decimal"):
res["type"] = "double"
elif name in ("null"):
res["type"] = "null"
elif name in ("long", "bigint"):
res["type"] = "long"
else:
res["type"] = "string"
res["name"] = row[0].lower()
return res
try:
conn = mysql.connect(user=opts.user, password=opts.password, host=opts.host, port=opts.port)
cursor = conn.cursor()
cursor.execute("SHOW TABLES FROM {}".format(opts.DATABASE))
tables = []
for res in cursor:
tables.append(res[0])
for t in tables:
schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record", name="ChangeRecord", fields=[])
cursor.execute("DESCRIBE {}.{}".format(opts.DATABASE, t))
for res in cursor:
schema["fields"].append(parse_field(res))
dest = open("{}.{}.000001.avsc".format(opts.DATABASE, t), 'w')
dest.write(json.dumps(schema))
dest.close()
cursor.close()
conn.close()
except Exception as e:
print(e)
exit(1)

View File

@ -507,9 +507,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += (ncolumns + 7) / 8; ptr += (ncolumns + 7) / 8;
ss_dassert(ptr < end); ss_dassert(ptr < end);
for (long i = 0; i < map->columns && npresent < ncolumns; i++) for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++)
{ {
ss_dassert(create->columns == map->columns);
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0); ss_dassert(rc == 0);
@ -552,12 +551,31 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
} }
else else
{ {
uint8_t bytes = *ptr; /**
* The first byte in the metadata stores the real type of
* the string (ENUM and SET types are also stored as fixed
* length strings).
*
* The first two bits of the second byte contain the XOR'ed
* field length but as that information is not relevant for
* us, we just use this information to know whether to read
* one or two bytes for string length.
*/
uint8_t bytes = *ptr++;
int len = metadata[metadata_offset] +
(((metadata[metadata_offset + 1] >> 4) & 0x3) ^ 0x3);
if (len <= 255)
{
bytes += *ptr++ << 8;
}
char str[bytes + 1]; char str[bytes + 1];
memcpy(str, ptr + 1, bytes); memcpy(str, ptr, bytes);
str[bytes] = '\0'; str[bytes] = '\0';
avro_value_set_string(&field, str); avro_value_set_string(&field, str);
ptr += bytes + 1; ptr += bytes;
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
} }

View File

@ -794,6 +794,22 @@ void make_avro_token(char* dest, const char* src, int length)
dest[length] = '\0'; dest[length] = '\0';
} }
int get_column_index(TABLE_CREATE *create, const char *tok)
{
int idx = -1;
for (int x = 0; x < create->columns; x++)
{
if (strcasecmp(create->column_names[x], tok) == 0)
{
idx = x;
break;
}
}
return idx;
}
bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end) bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{ {
const char *tbl = strcasestr(sql, "table"), *def; const char *tbl = strcasestr(sql, "table"), *def;
@ -844,27 +860,45 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{ {
tok = get_tok(tok + len, &len, end); tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]); int idx = get_column_index(create, tok);
char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
if (tmp == NULL) if (idx != -1)
{ {
return false; free(create->column_names[idx]);
for (int i = idx; i < (int)create->columns - 1; i++)
{
create->column_names[i] = create->column_names[i + 1];
}
char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
if (tmp == NULL)
{
return false;
}
create->column_names = tmp;
create->columns--;
updates++;
} }
create->column_names = tmp;
create->columns--;
updates++;
tok = get_next_def(tok, end); tok = get_next_def(tok, end);
len = 0; len = 0;
} }
else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len)) else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len))
{ {
tok = get_tok(tok + len, &len, end); tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]);
create->column_names[create->columns - 1] = strndup(tok, len); int idx = get_column_index(create, tok);
updates++;
if (idx != -1)
{
free(create->column_names[idx]);
create->column_names[idx] = strndup(tok, len);
updates++;
}
tok = get_next_def(tok, end); tok = get_next_def(tok, end);
len = 0; len = 0;
} }
@ -975,7 +1009,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
map->id = table_id; map->id = table_id;
map->version = create->version; map->version = create->version;
map->flags = flags; map->flags = flags;
ss_dassert(column_count == create->columns);
map->columns = column_count; map->columns = column_count;
map->column_types = MXS_MALLOC(column_count); map->column_types = MXS_MALLOC(column_count);
/** Allocate at least one byte for the metadata */ /** Allocate at least one byte for the metadata */

View File

@ -29,6 +29,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <inttypes.h>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/atomic.h> #include <maxscale/atomic.h>
@ -998,7 +999,7 @@ maxinfo_zombie_dcbs()
/** /**
* Interface to poll stats for reads * Interface to poll stats for reads
*/ */
static int static int64_t
maxinfo_read_events() maxinfo_read_events()
{ {
return poll_get_stat(POLL_STAT_READ); return poll_get_stat(POLL_STAT_READ);
@ -1007,7 +1008,7 @@ maxinfo_read_events()
/** /**
* Interface to poll stats for writes * Interface to poll stats for writes
*/ */
static int static int64_t
maxinfo_write_events() maxinfo_write_events()
{ {
return poll_get_stat(POLL_STAT_WRITE); return poll_get_stat(POLL_STAT_WRITE);
@ -1016,7 +1017,7 @@ maxinfo_write_events()
/** /**
* Interface to poll stats for errors * Interface to poll stats for errors
*/ */
static int static int64_t
maxinfo_error_events() maxinfo_error_events()
{ {
return poll_get_stat(POLL_STAT_ERROR); return poll_get_stat(POLL_STAT_ERROR);
@ -1025,7 +1026,7 @@ maxinfo_error_events()
/** /**
* Interface to poll stats for hangup * Interface to poll stats for hangup
*/ */
static int static int64_t
maxinfo_hangup_events() maxinfo_hangup_events()
{ {
return poll_get_stat(POLL_STAT_HANGUP); return poll_get_stat(POLL_STAT_HANGUP);
@ -1034,7 +1035,7 @@ maxinfo_hangup_events()
/** /**
* Interface to poll stats for accepts * Interface to poll stats for accepts
*/ */
static int static int64_t
maxinfo_accept_events() maxinfo_accept_events()
{ {
return poll_get_stat(POLL_STAT_ACCEPT); return poll_get_stat(POLL_STAT_ACCEPT);
@ -1043,7 +1044,7 @@ maxinfo_accept_events()
/** /**
* Interface to poll stats for event queue length * Interface to poll stats for event queue length
*/ */
static int static int64_t
maxinfo_event_queue_length() maxinfo_event_queue_length()
{ {
return poll_get_stat(POLL_STAT_EVQ_LEN); return poll_get_stat(POLL_STAT_EVQ_LEN);
@ -1052,7 +1053,7 @@ maxinfo_event_queue_length()
/** /**
* Interface to poll stats for max event queue length * Interface to poll stats for max event queue length
*/ */
static int static int64_t
maxinfo_max_event_queue_length() maxinfo_max_event_queue_length()
{ {
return poll_get_stat(POLL_STAT_EVQ_MAX); return poll_get_stat(POLL_STAT_EVQ_MAX);
@ -1061,7 +1062,7 @@ maxinfo_max_event_queue_length()
/** /**
* Interface to poll stats for max queue time * Interface to poll stats for max queue time
*/ */
static int static int64_t
maxinfo_max_event_queue_time() maxinfo_max_event_queue_time()
{ {
return poll_get_stat(POLL_STAT_MAX_QTIME); return poll_get_stat(POLL_STAT_MAX_QTIME);
@ -1070,7 +1071,7 @@ maxinfo_max_event_queue_time()
/** /**
* Interface to poll stats for max event execution time * Interface to poll stats for max event execution time
*/ */
static int static int64_t
maxinfo_max_event_exec_time() maxinfo_max_event_exec_time()
{ {
return poll_get_stat(POLL_STAT_MAX_EXECTIME); return poll_get_stat(POLL_STAT_MAX_EXECTIME);
@ -1137,17 +1138,17 @@ status_row(RESULTSET *result, void *data)
resultset_row_set(row, 0, status[context->index].name); resultset_row_set(row, 0, status[context->index].name);
switch (status[context->index].type) switch (status[context->index].type)
{ {
case VT_STRING: case VT_STRING:
resultset_row_set(row, 1, resultset_row_set(row, 1,
(char *)(*status[context->index].func)()); (char *)(*status[context->index].func)());
break; break;
case VT_INT: case VT_INT:
snprintf(buf, 80, "%ld", snprintf(buf, 80, "%" PRId64,
(long)(*status[context->index].func)()); (int64_t)(*status[context->index].func)());
resultset_row_set(row, 1, buf); resultset_row_set(row, 1, buf);
break; break;
default: default:
ss_dassert(!true); ss_dassert(!true);
} }
context->index++; context->index++;
return row; return row;

View File

@ -13,7 +13,6 @@
#include "readwritesplit.h" #include "readwritesplit.h"
#include <my_config.h>
#include <stdio.h> #include <stdio.h>
#include <strings.h> #include <strings.h>
#include <string.h> #include <string.h>

View File

@ -13,7 +13,6 @@
#include "schemarouter.h" #include "schemarouter.h"
#include <my_config.h>
#include <stdio.h> #include <stdio.h>
#include <strings.h> #include <strings.h>
#include <string.h> #include <string.h>

View File

@ -15,7 +15,6 @@
*/ */
#include <maxscale/cdefs.h> #include <maxscale/cdefs.h>
#include <my_config.h>
#include <poll.h> #include <poll.h>
#include <maxscale/buffer.h> #include <maxscale/buffer.h>
#include <maxscale/modutil.h> #include <maxscale/modutil.h>