Merge branch '2.3' into develop
@@ -56,8 +56,10 @@ static std::string do_query(MXS_MONITORED_SERVER* srv, const char* query)
 // Returns a numeric version similar to mysql_get_server_version
 int get_cs_version(MXS_MONITORED_SERVER* srv)
 {
+    // GCC 4.8 appears to have a broken std::regex_constants::ECMAScript that doesn't support brackets
+    std::regex re("Columnstore \\([0-9]*\\)[.]\\([0-9]*\\)[.]\\([0-9]*\\)-[0-9]*",
+                  std::regex_constants::basic);
     std::string result = do_query(srv, "SELECT @@version_comment");
-    std::regex re("Columnstore ([0-9]*)[.]([0-9]*)[.]([0-9]*)-[0-9]*");
     std::smatch match;
     int rval = 0;
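For reference, the pattern above pulls the three Columnstore version components out of the `SELECT @@version_comment` result. Below is a minimal Python sketch of the same extraction; the packing of the components into one integer (major*10000 + minor*100 + patch, following the mysql_get_server_version convention) and the sample version string are assumptions, since the rest of the C++ function is not part of this hunk.

    import re

    def get_cs_version(version_comment):
        # ECMAScript-style equivalent of the basic-syntax pattern in the hunk above
        match = re.search(r"Columnstore ([0-9]*)[.]([0-9]*)[.]([0-9]*)-[0-9]*", version_comment)
        if not match:
            return 0
        major, minor, patch = (int(g) for g in match.groups())
        # Assumed packing, mirroring mysql_get_server_version()
        return major * 10000 + minor * 100 + patch

    print(get_cs_version("Columnstore 1.2.3-1"))  # prints 10203 for this made-up value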
@@ -3,4 +3,5 @@ install_script(cdc_users.py core)
 install_script(cdc_last_transaction.py core)
 install_script(cdc_kafka_producer.py core)
 install_script(cdc_schema.py core)
+install_script(cdc_one_schema.py core)
 install_file(cdc_schema.go core)
server/modules/protocol/examples/cdc_one_schema.py (new executable file, 98 lines)
@@ -0,0 +1,98 @@
#!/usr/bin/env python

# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl11.
#
# Change Date: 2022-01-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.

import json
import sys
import argparse

parser = argparse.ArgumentParser(description = """

This program generates a schema file for a single table by reading a tab
separated list of field and type names from the standard input.

To use this script, pipe the output of the `mysql` command line into the
`cdc_one_schema.py` script:

mysql -ss -u <user> -p -h <host> -P <port> -e 'DESCRIBE `<database>`.`<table>`'|./cdc_one_schema.py <database> <table>

Replace the <user>, <host>, <port>, <database> and <table> with appropriate
values and run the command. Note that the `-ss` parameter is mandatory as that
will generate the tab separated output instead of the default pretty-printed
output.

An .avsc file named after the database and table name will be generated in the
current working directory. Copy this file to the location pointed by the
`avrodir` parameter of the avrorouter.

Alternatively, you can also copy the output of the `mysql` command to a file and
feed it into the script if you cannot execute the SQL command directly:

# On the database server
mysql -ss -u <user> -p -h <host> -P <port> -e 'DESCRIBE `<database>`.`<table>`' > schema.tsv
# On the MaxScale server
./cdc_one_schema.py <database> <table> < schema.tsv

""", formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("DATABASE", help="The database name where the table is located")
parser.add_argument("TABLE", help="The name of the table")

opts = parser.parse_args(sys.argv[1:])

def parse_field(row):
    res = dict()
    parts = row[1].lower().split('(')
    name = parts[0]

    res["real_type"] = name

    if len(parts) > 1 and name not in ["enum", "set", "decimal"]:
        res["length"] = int(parts[1].split(')')[0])
    else:
        res["length"] = -1

    type = "string"

    if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
                "mediumtext", "longtext", "char", "varchar", "enum", "set"):
        type = "string"
    elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
        type = "bytes"
    elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
        type = "int"
    elif name in ("float",):
        type = "float"
    elif name in ("double", "decimal"):
        type = "double"
    elif name in ("long", "bigint"):
        type = "long"

    res["type"] = ["null", type]

    res["name"] = row[0].lower()

    return res

try:
    schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record", name="ChangeRecord", fields=[])
    for line in sys.stdin:
        schema["fields"].append(parse_field(line.split('\t')))

    print("Creating: {}.{}.000001.avsc".format(opts.DATABASE, opts.TABLE))
    dest = open("{}.{}.000001.avsc".format(opts.DATABASE, opts.TABLE), 'w')
    dest.write(json.dumps(schema))
    dest.close()

except Exception as e:
    print(e)
    exit(1)
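To make the script's output concrete, the sketch below shows the JSON that cdc_one_schema.py would write to test.t1.000001.avsc for a hypothetical table `test`.`t1` with columns `id INT(11)` and `data VARCHAR(80)`. The table and column definitions are invented for illustration, and the script itself writes the document on a single line rather than pretty-printed.

    import json

    # Field entries as parse_field() above would build them for the two sample columns
    expected = {
        "namespace": "MaxScaleChangeDataSchema.avro",
        "type": "record",
        "name": "ChangeRecord",
        "fields": [
            {"real_type": "int", "length": 11, "type": ["null", "int"], "name": "id"},
            {"real_type": "varchar", "length": 80, "type": ["null", "string"], "name": "data"},
        ],
    }

    print(json.dumps(expected, indent=2))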
@@ -652,17 +652,30 @@ void avro_load_metadata_from_schemas(Avro* router)
    for (int i = files.gl_pathc - 1; i > -1; i--)
    {
        char* dbstart = strrchr(files.gl_pathv[i], '/');
        mxb_assert(dbstart);

        if (!dbstart)
        {
            continue;
        }

        dbstart++;

        char* tablestart = strchr(dbstart, '.');
        mxb_assert(tablestart);

        if (!tablestart)
        {
            continue;
        }

        snprintf(db, sizeof(db), "%.*s", (int)(tablestart - dbstart), dbstart);
        tablestart++;

        char* versionstart = strchr(tablestart, '.');
        mxb_assert(versionstart);

        if (!versionstart)
        {
            continue;
        }

        snprintf(table, sizeof(table), "%.*s", (int)(versionstart - tablestart), tablestart);
        versionstart++;
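The added checks above let the loader skip schema files whose names cannot be split into the expected <database>.<table>.<version>.avsc form (the same naming scheme cdc_one_schema.py uses) instead of doing pointer arithmetic on a NULL result in release builds. A minimal Python sketch of that filename parsing follows; the example path is invented and only illustrates the layout the C++ pointer arithmetic relies on.

    import os

    def parse_schema_filename(path):
        name = os.path.basename(path)               # strrchr(path, '/') + 1
        db, sep1, rest = name.partition('.')        # first '.' ends the database name
        table, sep2, version = rest.partition('.')  # next '.' ends the table name
        if not sep1 or not sep2:
            return None                             # malformed name: skip, like the continue above
        return db, table

    # Invented avrodir path, for illustration only
    print(parse_schema_filename("/var/lib/maxscale/avro/test.t1.000001.avsc"))  # ('test', 't1')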