Merge branch '2.1' into develop

This commit is contained in:
Markus Mäkelä 2017-05-17 13:48:31 +03:00
commit 322983a5f4
36 changed files with 645 additions and 245 deletions

View File

@ -60,6 +60,10 @@ include_directories(${PCRE2_INCLUDE_DIRS})
if(NOT MARIADB_CONNECTOR_FOUND)
message(STATUS "Building MariaDB Connector-C from source.")
include(cmake/BuildMariaDBConnector.cmake)
else()
# This is required as the core depends on the `connector-c` target
add_custom_target(connector-c)
message(STATUS "Using system Connector-C")
endif()
if(NOT JANSSON_FOUND)

View File

@ -1,8 +1,8 @@
#Database Firewall filter
# Database Firewall filter
## Overview
The database firewall filter is used to block queries that match a set of
The Database Firewall filter is used to block queries that match a set of
rules. It can be used to prevent harmful queries from reaching the backend
database instances or to limit access to the database based on a more flexible
set of rules compared to the traditional GRANT-based privilege system. Currently
@ -10,9 +10,9 @@ the filter does not support multi-statements.
## Configuration
The database firewall filter only requires minimal configuration in the
maxscale.cnf file. The actual rules of the database firewall filter are located
in a separate text file. The following is an example of a database firewall
The Database Firewall filter only requires minimal configuration in the
maxscale.cnf file. The actual rules of the Database Firewall filter are located
in a separate text file. The following is an example of a Database Firewall
filter configuration in maxscale.cnf.
```
@ -32,7 +32,7 @@ filters=DatabaseFirewall
### Filter Parameters
The database firewall filter has one mandatory parameter, `rules`.
The Database Firewall filter has one mandatory parameter, `rules`.
#### `rules`
@ -128,7 +128,7 @@ parameter (_allow_, _block_ or _ignore_).
### Mandatory rule parameters
The database firewall filter's rules expect a single mandatory parameter for a
The Database Firewall filter's rules expect a single mandatory parameter for a
rule. You can define multiple rules to cover situations where you would like to
apply multiple mandatory rules to a query.
@ -220,7 +220,7 @@ the network address. You can use the `%` character as the wildcard to enable
user name matching from any address or network matching for all users. After the
list of users and networks the keyword match is expected.
After this either the keyword `any` `all` or `strict_all` is expected. This
After this either the keyword `any`, `all` or `strict_all` is expected. This
defined how the rules are matched. If `any` is used when the first rule is
matched the query is considered as matched and the rest of the rules are
skipped. If instead the `all` keyword is used all rules must match for the query
@ -260,7 +260,7 @@ Shows the current statistics of the rules.
To prevent the excessive use of a database we want to set a limit on the rate of
queries. We only want to apply this limit to certain queries that cause unwanted
behavior. To achieve this we can use a regular expression.
behaviour. To achieve this we can use a regular expression.
First we define the limit on the rate of queries. The first parameter for the
rule sets the number of allowed queries to 10 queries and the second parameter

View File

@ -35,6 +35,8 @@ the _ssn_ would be masked, as in
...
```
## Security
Note that he masking filter alone is *not* sufficient for preventing
access to a particular column. As the masking filter works on the column
name alone a query like
@ -49,8 +51,11 @@ a sufficient number of times with different _ssn_ values, will, eventually,
reveal the social security number of all persons in the database.
For a secure solution, the masking filter *must* be combined with the
firewall filter to prevent the use of functions and the use of particular
columns in where-clauses.
firewall filter to prevent the use of functions using which the masking
can be bypassed.
In a future release, the combined use of the masking filter and the
database firewall filter will be simplified.
## Limitations

View File

@ -3,7 +3,7 @@
This filter was introduced in MariaDB MaxScale 2.1.
## Overview
The maxrows filter is capable of restricting the amount of rows that a SELECT,
The Maxrows filter is capable of restricting the amount of rows that a SELECT,
a prepared statement or stored procedure could return to the client application.
If a resultset from a backend server has more rows than the configured limit
@ -12,7 +12,7 @@ or the resultset size exceeds the configured size,
## Configuration
The maxrows filter is easy to configure and to add to any existing service.
The Maxrows filter is easy to configure and to add to any existing service.
```
[MaxRows]
@ -22,12 +22,12 @@ module=maxrows
[MaxRows Routing Service]
type=service
...
filters=maxrows
filters=MaxRows
```
### Filter Parameters
The maxrows filter has no mandatory parameters.
The Maxrows filter has no mandatory parameters.
Optional parameters are:
#### `max_resultset_rows`
@ -81,7 +81,7 @@ ERROR 1415 (0A000): Row limit/size exceeded for query: select * from test.t4
#### `debug`
An integer value, using which the level of debug logging made by the maxrows
An integer value, using which the level of debug logging made by the Maxrows
filter can be controlled. The value is actually a bitfield with different bits
denoting different logging.
@ -97,8 +97,8 @@ debug=2
## Example Configuration
Here is an example of filter configuration where the max number of returned
rows is 10000 and max allowed resultset size is 256KB
Here is an example of filter configuration where the maximum number of returned
rows is 10000 and maximum allowed resultset size is 256KB
```
[MaxRows]

View File

@ -6,7 +6,7 @@ The Query Log All (QLA) filter is a filter module for MariaDB MaxScale that is a
## Configuration
The configuration block for the QLA filter requires the minimal filter options in it's section within the maxscale.cnf file, stored in /etc/maxscale.cnf.
The configuration block for the QLA filter requires the minimal filter options in its section within the maxscale.cnf file, stored in /etc/maxscale.cnf.
```
[MyLogFilter]
type=filter
@ -31,7 +31,7 @@ The QLA filter accepts the following options.
case | Use case-sensitive matching
extended | Use extended regular expression syntax (ERE)
To use multiple filter options, list them in a comma-separated list. If no file settings are given, default will be used. Multiple file settings can be enabled simultaneously.
To use multiple filter options, list them in a comma-separated list. If no options are given, default will be used. Multiple options can be enabled simultaneously.
```
options=case,extended
@ -53,7 +53,7 @@ The basename of the output file created for each session. A session index is add
filebase=/tmp/SqlQueryLog
```
The filebase may also be set as the filter, the mechanism to set the filebase via the filter option is superseded by the parameter. If both are set the parameter setting will be used and the filter option ignored.
The filebase may also be set as the filter option, the mechanism to set the filebase via the filter option is superseded by the parameter. If both are set the parameter setting will be used and the filter option ignored.
### `match`
@ -99,8 +99,7 @@ user=john
### `log_type`
The type of log file to use. Parameter value is a comma separated list of the
following values. The default value is _session_.
The type of log file to use. The default value is _session_.
|Value | Description |
|--------|--------------------------------|
@ -108,7 +107,7 @@ following values. The default value is _session_.
|unified |Use one file for all sessions |
```
log_type=session, unified
log_type=session
```
### `log_data`

View File

@ -2,7 +2,7 @@
## Overview
The regex filter is a filter module for MariaDB MaxScale that is able to rewrite query content using regular expression matches and text substitution. It uses the PCRE2 syntax which differs from the POSIX regular expressions used in MariaDB MaxScale versions prior to 1.3.0.
The Regex filter is a filter module for MariaDB MaxScale that is able to rewrite query content using regular expression matches and text substitution. It uses the PCRE2 syntax which differs from the POSIX regular expressions used in MariaDB MaxScale versions prior to 1.3.0.
For all details about the PCRE2 syntax, please read the [PCRE2 syntax documentation](http://www.pcre.org/current/doc/html/pcre2syntax.html).
@ -10,7 +10,7 @@ Please note that the PCRE2 library uses a different syntax to refer to capture g
## Configuration
The configuration block for the Regex filter requires the minimal filter options in its section within the maxscale.cnf file, stored in /etc/maxscale.cnf.
The configuration block for the Regex filter requires the minimal filter options in its section within the maxscale.cnf file, stored in /etc/maxscale.cnf.
```
[MyRegexFilter]
@ -30,7 +30,7 @@ filters=MyRegexfilter
## Filter Options
The regex filter accepts the options ignorecase or case. These define if the pattern text should take the case of the string it is matching against into consideration or not.
The Regex filter accepts the options ignorecase or case. These define if the pattern text should take the case of the string it is matching against into consideration or not.
## Filter Parameters

View File

@ -104,7 +104,7 @@ user=john
You have an order system and believe the updates of the PRODUCTS table is causing some performance issues for the rest of your application. You would like to know which of the many updates in your application is causing the issue.
Add a filter with the following definition;
Add a filter with the following definition:
```
[ProductsUpdateTop20]
@ -120,9 +120,9 @@ Note the exclude entry, this is to prevent updates to the PRODUCTS_STOCK table f
### Example 2 - One Application Server is Slow
One of your applications servers is slower than the rest, you believe it is related to database access but you not not sure what is taking the time.
One of your applications servers is slower than the rest, you believe it is related to database access but you are not sure what is taking the time.
Add a filter with the following definition;
Add a filter with the following definition:
```
[SlowAppServer]

View File

@ -31,7 +31,7 @@ You can install the packages with the following commands.
```
sudo yum install git gcc gcc-c++ ncurses-devel bison flex glibc-devel cmake \
libgcc perl make libtool openssl openssl-devel libcurl-devel pcre-devel \
tcl tcl-devel systemtap-sdt-devel libuuid libuuid-devel sqlite3 sqlite3-devel \
tcl tcl-devel systemtap-sdt-devel libuuid libuuid-devel sqlite sqlite-devel \
libmicrohttpd-devel
```

View File

@ -74,7 +74,7 @@ same node for writes.
If the `root_node_as_master` option is disabled for galeramon, the node with the
lowest index will always be chosen as the master. If it is enabled, only the
node with a a _wsrep_local_index_ value of 0 can be chosed as the master.
node with a a _wsrep_local_index_ value of 0 can be chosen as the master.
### `set_donor_nodes`
@ -107,7 +107,7 @@ set_donor_nodes=true
If the `use_priority` option is set and a server is configured with the
`priority=<int>` parameter, galeramon will use that as the basis on which the
master node is chosen. This requires the `disable_master_role_setting` to be
undefined or disabled. The server with the lowest positive value in _priority_
undefined or disabled. The server with the lowest positive value of _priority_
will be chosen as the master node when a replacement Galera node is promoted to
a master server inside MaxScale.
@ -115,7 +115,7 @@ Nodes with a non-positive value (_priority_ <= 0) will never be chosen as the ma
you to mark some servers as permanent slaves by assigning a non-positive value
into _priority_.
Here is an example with two servers.
Here is an example.
```
[node-1]
@ -147,9 +147,9 @@ In this example `node-1` is always used as the master if available. If `node-1`
is not available, then the next node with the highest priority rank is used. In
this case it would be `node-3`. If both `node-1` and `node-3` were down, then
`node-2` would be used. Because `node-4` has a value of 0 in _priority_, it will
never be the master. Nodes without priority are considered as having the lowest
priority rank and will be used only if all nodes with priority ranks are not
available.
never be the master. Nodes without _priority_ parameter are considered as
having the lowest priority rank and will be used only if all nodes
with _priority_ parameter are not available.
With priority ranks you can control the order in which MaxScale chooses the
master node. This will allow for a controlled failure and replacement of nodes.

View File

@ -35,9 +35,9 @@ These are optional parameters specific to the MySQL Monitor.
### `detect_replication_lag`
A truth value which controls if replication lag between the master and the
A boolean value which controls if replication lag between the master and the
slaves is monitored. This allows the routers to route read queries to only
slaves that are up to date. Default value for this parameter is false.
slaves that are up to date. Default value for this parameter is _false_.
To detect the replication lag, MaxScale uses the _maxscale_schema.replication_heartbeat_
table. This table is created on the master server and it is updated at every heartbeat
@ -87,7 +87,8 @@ detect_stale_slave=true
### `mysql51_replication`
Enable support for MySQL 5.1 replication monitoring. This is needed if a MySQL server older than 5.5 is used as a slave in replication.
Enable support for MySQL 5.1 replication monitoring. This is needed if a MySQL
server older than 5.5 is used as a slave in replication.
```
mysql51_replication=true
@ -112,7 +113,7 @@ the master status.
By setting the servers into read-only mode, the user can control which
server receive the master status. To do this:
- Enable `@@read_only` on all servers (preferrably through the configuration file)
- Enable `@@read_only` on all servers (preferably through the configuration file)
- Manually disable `@@read_only` on the server which should be the master
This functionality is similar to the [Multi-Master Monitor](MM-Monitor.md)
@ -146,7 +147,7 @@ This mode in mysqlmon is completely passive in the sense that it does not modify
the cluster or any of the servers in it. It only labels the last remaining
server in a cluster as the master server.
Before a server is labeled as a standalone master, the following conditions must
Before a server is labelled as a standalone master, the following conditions must
have been met:
- Previous attempts to connect to other servers in the cluster have failed,
@ -173,7 +174,7 @@ been set up.
### `failcount`
Number of failures that must occur on all failed servers before a standalone
server is labeled as a master. The default value is 5 failures.
server is labelled as a master. The default value is 5 failures.
The monitor will attempt to contact all servers once per monitoring cycle. When
`detect_standalone_master` is enabled, all of the failed servers must fail
@ -181,7 +182,7 @@ _failcount_ number of connection attempts before the last server is labeled as
the master.
The formula for calculating the actual number of milliseconds before the server
is labeled as the master is `monitor_interval * failcount`.
is labelled as the master is `monitor_interval * failcount`.
### `allow_cluster_recovery`
@ -190,7 +191,7 @@ takes a boolean parameter is enabled by default. This parameter requires that
`detect_standalone_master` is set to true. In MaxScale 2.1.0, this parameter was
called `failover_recovery`.
When this parameter is disabled, if the last remaining server is labeled as the
When this parameter is disabled, if the last remaining server is labelled as the
master, the monitor will set all of the failed servers into maintenance
mode. When this option is enabled, the failed servers are allowed to rejoin the
cluster.
@ -228,7 +229,8 @@ starting MaxScale.
## Example 1 - Monitor script
Here is an example shell script which sends an email to an admin when a server goes down.
Here is an example shell script which sends an email to an admin@my.org
when a server goes down.
```
#!/usr/bin/env bash

View File

@ -18,7 +18,7 @@ report at [Jira](https://jira.mariadb.org).
MaxScale 2.1 has not been extended to understand all new features that
MariaDB 10.2 introduces. Please see
[Support for 10.2](About/Support-for-10.2.md)
[Support for 10.2](../About/Support-for-10.2.md)
for details.
## Changed Features
@ -42,8 +42,10 @@ for details.
[Here is a list of bugs fixed since the release of MaxScale 2.1.2.](https://jira.mariadb.org/browse/MXS-1212?jql=project%20%3D%20MXS%20AND%20issuetype%20%3D%20Bug%20AND%20resolution%20in%20(Fixed%2C%20Done)%20AND%20fixVersion%20%3D%202.1.3)
* [MXS-1244](https://jira.mariadb.org/browse/MXS-1244) MySQL monitor "detect_replication_lag=true" doesn't work with "mysql51_replication=true"
* [MXS-1227](https://jira.mariadb.org/browse/MXS-1227) Nagios Plugins broken by change in output of "show monitors" in 2.1
* [MXS-1221](https://jira.mariadb.org/browse/MXS-1221) Nagios plugin scripts does not process -S option properly
* [MXS-1213](https://jira.mariadb.org/browse/MXS-1213) Improve documentation of dynamic configuration changes
* [MXS-1212](https://jira.mariadb.org/browse/MXS-1212) Excessive execution time when maxrows limit has been reached
* [MXS-1202](https://jira.mariadb.org/browse/MXS-1202) maxadmin "show service" counters overflow
* [MXS-1200](https://jira.mariadb.org/browse/MXS-1200) config file lines limited to ~1024 chars
@ -64,6 +66,6 @@ Packages can be downloaded [here](https://mariadb.com/resources/downloads).
The source code of MaxScale is tagged at GitHub with a tag, which is identical
with the version of MaxScale. For instance, the tag of version X.Y.Z of MaxScale
is X.Y.Z. Further, *master* always refers to the latest released non-beta version.
is X.Y.Z.
The source code is available [here](https://github.com/mariadb-corporation/MaxScale).

View File

@ -273,5 +273,12 @@ To build the avrorouter from source, you will need the [Avro C](https://avro.apa
library, liblzma, [the Jansson library](http://www.digip.org/jansson/) and sqlite3 development headers. When
configuring MaxScale with CMake, you will need to add `-DBUILD_CDC=Y` to build the CDC module set.
The Avro C library needs to be build with position independent code enabled. You can do this by
adding the following flags to the CMake invocation when configuring the Avro C library.
```
-DCMAKE_C_FLAGS=-fPIC -DCMAKE_CXX_FLAGS=-fPIC
```
For more details about building MaxScale from source, please refer to the
[Building MaxScale from Source Code](../Getting-Started/Building-MaxScale-from-Source-Code.md) document.

View File

@ -29,7 +29,8 @@ binlog_format=row
binlog_row_image=full
```
_You can find out more about replication formats from the [MariaDB Knowledge Base](https://mariadb.com/kb/en/mariadb/binary-log-formats/)_
_You can find out more about replication formats from the
[MariaDB Knowledge Base](https://mariadb.com/kb/en/mariadb/binary-log-formats/)_
## Configuring MaxScale
@ -77,7 +78,8 @@ You can see that the `source` parameter in the _avro-service_ points to the
_replication-service_ we defined before. This service will be the data source
for the avrorouter. The _filestem_ is the prefix in the binlog files and the
additional _avrodir_ router_option is where the converted Avro files are stored.
For more information on the avrorouter options, read the [Avrorouter Documentation](../Routers/Avrorouter.md).
For more information on the avrorouter options, read the
[Avrorouter Documentation](../Routers/Avrorouter.md).
After the services were defined, we added the listeners for the
_replication-service_ and the _avro-service_. The _CDC_ protocol is a new
@ -103,29 +105,29 @@ the following format:
```
{
"Namespace": "MaxScaleChangeDataSchema.avro",
"Type": "record",
"Name": "ChangeRecord",
"Fields":
"namespace": "MaxScaleChangeDataSchema.avro",
"type": "record",
"name": "ChangeRecord",
"fields":
[
{
"Name": "name",
"Type": "string"
"name": "name",
"type": "string"
},
{
"Name":"address",
"Type":"string"
"name":"address",
"type":"string"
},
{
"Name":"age",
"Type":"int"
"name":"age",
"type":"int"
}
]
}
```
The avrorouter uses the schema file to identify the columns, their names and
what type they are. The Name fiels contains the name of the column and the Type
what type they are. The _name_ field contains the name of the column and the _type_
contains the Avro type. Read the [Avro specification](https://avro.apache.org/docs/1.8.1/spec.html)
for details on the layout of the schema files.

View File

@ -130,13 +130,16 @@ servers=dbbubble1,dbbubble2,dbbubble3,dbbubble4,dbbubble5
user=maxscale
passwd=6628C50E07CCE1F0392EDEEB9D1203F3
```
The table you wish to store in Cassandra in called HighScore and will contain the same columns in both the MariaDB table and the Cassandra table. The first step is to install a MariaDB instance with the Cassandra storage engine to act as a bridge server between the relational database and Cassandra. In this bridge server add a table definition for the HighScore table with the engine type set to cassandra. Add this server into the MariaDB MaxScale configuration and create a service that will connect to this server.
The table you wish to store in Cassandra in called HighScore and will contain the same columns in both the MariaDB table and the Cassandra table. The first step is to install a MariaDB instance with the Cassandra storage engine to act as a bridge server between the relational database and Cassandra. In this bridge server add a table definition for the HighScore table with the engine type set to Cassandra.
See [Cassandra Storage Engine Overview]( https://mariadb.com/kb/en/mariadb/cassandra-storage-engine-overview/) for details.
Add this server into the MariaDB MaxScale configuration and create a service that will connect to this server.
```
[CassandraDB]
type=server
address=192.168.4.28
port=3306
protocol=MySQLBackend
[Cassandra]
type=service
router=readconnrouter

View File

@ -28,7 +28,8 @@ set up replication between the two. The only thing we need to do is to create th
users we will use for monitoring and authentication.
The process of creating monitoring and authentication users for MariaDB MaxScale is described
in the Creating Database Users section of the [MariaDB MaxScale Tutorial](MaxScale-Tutorial.md).
in the Creating Database Users section of the
[MariaDB MaxScale Tutorial](MaxScale-Tutorial.md#creating-database-users).
## Setting up RabbitMQ server
@ -301,7 +302,7 @@ router=cli
type=listener
service=MaxAdmin Service
protocol=maxscaled
port=6603
socket=default
```
## Testing the setup
@ -317,11 +318,11 @@ sudo systemctl start maxscale
We can see the state of the two servers with MaxAdmin:
```
maxadmin list servers
sudo maxadmin list servers
Servers.
-------------------+-----------------+-------+-------------+--------------------
Server | Address | Port | Connections | Status
Server | Address | Port | Connections | Status
-------------------+-----------------+-------+-------------+--------------------
production-1 | 192.168.0.200 | 3306 | 0 | Running
archive-1 | 192.168.0.201 | 3000 | 0 | Running

View File

@ -166,27 +166,6 @@ typedef enum skygw_chk_t
((n) == LOG_DEBUG ? "LOG_DEBUG" : \
"Unknown log priority"))))))))
#define STRPACKETTYPE(p) ((p) == MYSQL_COM_INIT_DB ? "COM_INIT_DB" : \
((p) == MYSQL_COM_CREATE_DB ? "COM_CREATE_DB" : \
((p) == MYSQL_COM_DROP_DB ? "COM_DROP_DB" : \
((p) == MYSQL_COM_REFRESH ? "COM_REFRESH" : \
((p) == MYSQL_COM_DEBUG ? "COM_DEBUG" : \
((p) == MYSQL_COM_PING ? "COM_PING" : \
((p) == MYSQL_COM_CHANGE_USER ? "COM_CHANGE_USER" : \
((p) == MYSQL_COM_QUERY ? "COM_QUERY" : \
((p) == MYSQL_COM_SHUTDOWN ? "COM_SHUTDOWN" : \
((p) == MYSQL_COM_PROCESS_INFO ? "COM_PROCESS_INFO" : \
((p) == MYSQL_COM_CONNECT ? "COM_CONNECT" : \
((p) == MYSQL_COM_PROCESS_KILL ? "COM_PROCESS_KILL" : \
((p) == MYSQL_COM_TIME ? "COM_TIME" : \
((p) == MYSQL_COM_DELAYED_INSERT ? "COM_DELAYED_INSERT" : \
((p) == MYSQL_COM_DAEMON ? "COM_DAEMON" : \
((p) == MYSQL_COM_QUIT ? "COM_QUIT" : \
((p) == MYSQL_COM_STMT_PREPARE ? "MYSQL_COM_STMT_PREPARE" : \
((p) == MYSQL_COM_STMT_EXECUTE ? "MYSQL_COM_STMT_EXECUTE" : \
((p) == MYSQL_COM_SET_OPTION ? "MYSQL_COM_SET_OPTION" : \
"UNKNOWN MYSQL PACKET TYPE")))))))))))))))))))
#define STRDCBSTATE(s) ((s) == DCB_STATE_ALLOC ? "DCB_STATE_ALLOC" : \
((s) == DCB_STATE_POLLING ? "DCB_STATE_POLLING" : \
((s) == DCB_STATE_LISTENING ? "DCB_STATE_LISTENING" : \

View File

@ -103,4 +103,7 @@ bool is_mysql_statement_end(const char* start, int len);
bool is_mysql_sp_end(const char* start, int len);
char* modutil_get_canonical(GWBUF* querybuf);
// TODO: Move modutil out of the core
const char* STRPACKETTYPE(int p);
MXS_END_DECLS

View File

@ -85,7 +85,7 @@ bool column_is_decimal(uint8_t type);
bool fixed_string_is_enum(uint8_t type);
/** Value unpacking */
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm);
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, int length, struct tm *tm);
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest);
size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val);
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,

View File

@ -11,7 +11,6 @@
* Public License.
*/
#include <my_config.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

View File

@ -215,6 +215,7 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener)
dcb_initialize(newdcb);
newdcb->dcb_role = role;
newdcb->listener = listener;
newdcb->last_read = hkheartbeat;
return newdcb;
}
@ -616,6 +617,7 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
MXS_DEBUG("Reusing a persistent connection, dcb %p", dcb);
dcb->persistentstart = 0;
dcb->was_persistent = true;
dcb->last_read = hkheartbeat;
return dcb;
}
else
@ -3016,14 +3018,22 @@ void dcb_process_idle_sessions(int thr)
{
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{
MXS_SESSION *session = dcb->session;
ss_dassert(dcb->listener);
SERVICE *service = dcb->listener->service;
if (session->service && session->client_dcb &&
session->client_dcb->state == DCB_STATE_POLLING &&
session->service->conn_idle_timeout &&
hkheartbeat - session->client_dcb->last_read > session->service->conn_idle_timeout * 10)
if (service->conn_idle_timeout && dcb->state == DCB_STATE_POLLING)
{
poll_fake_hangup_event(dcb);
int64_t idle = hkheartbeat - dcb->last_read;
int64_t timeout = service->conn_idle_timeout * 10;
if (idle > timeout)
{
MXS_WARNING("Timing out '%s'@%s, idle for %.1f seconds",
dcb->user ? dcb->user : "<unknown>",
dcb->remote ? dcb->remote : "<unknown>",
(float)idle / 10.f);
poll_fake_hangup_event(dcb);
}
}
}
}

View File

@ -58,7 +58,7 @@ void dShowThreads(DCB *dcb);
void dShowEventQ(DCB *dcb);
void dShowEventStats(DCB *dcb);
int poll_get_stat(POLL_STAT stat);
int64_t poll_get_stat(POLL_STAT stat);
RESULTSET *eventTimesGetList();
MXS_END_DECLS

View File

@ -33,6 +33,7 @@
#include <maxscale/alloc.h>
#include <maxscale/poll.h>
#include <maxscale/modutil.h>
#include <maxscale/platform.h>
/** These are used when converting MySQL wildcards to regular expressions */
static SPINLOCK re_lock = SPINLOCK_INIT;
@ -1308,3 +1309,79 @@ bool modutil_ignorable_ping(DCB *dcb)
return rval;
}
const char format_str[] = "COM_UNKNOWN(%02x)";
// The message always fits inside the buffer
thread_local char unknow_type[sizeof(format_str)] = "";
const char* STRPACKETTYPE(int p)
{
switch (p)
{
case MYSQL_COM_SLEEP:
return "MYSQL_COM_SLEEP";
case MYSQL_COM_QUIT:
return "MYSQL_COM_QUIT";
case MYSQL_COM_INIT_DB:
return "MYSQL_COM_INIT_DB";
case MYSQL_COM_QUERY:
return "MYSQL_COM_QUERY";
case MYSQL_COM_FIELD_LIST:
return "MYSQL_COM_FIELD_LIST";
case MYSQL_COM_CREATE_DB:
return "MYSQL_COM_CREATE_DB";
case MYSQL_COM_DROP_DB:
return "MYSQL_COM_DROP_DB";
case MYSQL_COM_REFRESH:
return "MYSQL_COM_REFRESH";
case MYSQL_COM_SHUTDOWN:
return "MYSQL_COM_SHUTDOWN";
case MYSQL_COM_STATISTICS:
return "MYSQL_COM_STATISTICS";
case MYSQL_COM_PROCESS_INFO:
return "MYSQL_COM_PROCESS_INFO";
case MYSQL_COM_CONNECT:
return "MYSQL_COM_CONNECT";
case MYSQL_COM_PROCESS_KILL:
return "MYSQL_COM_PROCESS_KILL";
case MYSQL_COM_DEBUG:
return "MYSQL_COM_DEBUG";
case MYSQL_COM_PING:
return "MYSQL_COM_PING";
case MYSQL_COM_TIME:
return "MYSQL_COM_TIME";
case MYSQL_COM_DELAYED_INSERT:
return "MYSQL_COM_DELAYED_INSERT";
case MYSQL_COM_CHANGE_USER:
return "MYSQL_COM_CHANGE_USER";
case MYSQL_COM_BINLOG_DUMP:
return "MYSQL_COM_BINLOG_DUMP";
case MYSQL_COM_TABLE_DUMP:
return "MYSQL_COM_TABLE_DUMP";
case MYSQL_COM_CONNECT_OUT:
return "MYSQL_COM_CONNECT_OUT";
case MYSQL_COM_REGISTER_SLAVE:
return "MYSQL_COM_REGISTER_SLAVE";
case MYSQL_COM_STMT_PREPARE:
return "MYSQL_COM_STMT_PREPARE";
case MYSQL_COM_STMT_EXECUTE:
return "MYSQL_COM_STMT_EXECUTE";
case MYSQL_COM_STMT_SEND_LONG_DATA:
return "MYSQL_COM_STMT_SEND_LONG_DATA";
case MYSQL_COM_STMT_CLOSE:
return "MYSQL_COM_STMT_CLOSE";
case MYSQL_COM_STMT_RESET:
return "MYSQL_COM_STMT_RESET";
case MYSQL_COM_SET_OPTION:
return "MYSQL_COM_SET_OPTION";
case MYSQL_COM_STMT_FETCH:
return "MYSQL_COM_STMT_FETCH";
case MYSQL_COM_DAEMON:
return "MYSQL_COM_DAEMON";
}
snprintf(unknow_type, sizeof(unknow_type), format_str, p);
return unknow_type;
}

View File

@ -25,6 +25,10 @@
#include <strings.h>
#include <math.h>
#include <maxscale/protocol/mysql.h>
static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes);
/**
* @brief Convert a table column type to a string
*
@ -216,6 +220,35 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
dest->tm_year = *ptr;
}
/** Base-10 logarithm values */
int64_t log_10_values[] =
{
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000
};
/**
* If the TABLE_COL_TYPE_DATETIME type field is declared as a datetime with
* extra precision, the packed length is shorter than 8 bytes.
*/
size_t datetime_sizes[] =
{
5, // DATETIME(0)
6, // DATETIME(1)
6, // DATETIME(2)
7, // DATETIME(3)
7, // DATETIME(4)
7, // DATETIME(5)
8 // DATETIME(6)
};
/**
* @brief Unpack a DATETIME
*
@ -224,21 +257,52 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
* @param val Value read from the binary log
* @param dest Pointer where the unpacked value is stored
*/
static void unpack_datetime(uint8_t *ptr, struct tm *dest)
static void unpack_datetime(uint8_t *ptr, int length, struct tm *dest)
{
uint64_t val = 0;
memcpy(&val, ptr, sizeof(val));
uint32_t second = val - ((val / 100) * 100);
val /= 100;
uint32_t minute = val - ((val / 100) * 100);
val /= 100;
uint32_t hour = val - ((val / 100) * 100);
val /= 100;
uint32_t day = val - ((val / 100) * 100);
val /= 100;
uint32_t month = val - ((val / 100) * 100);
val /= 100;
uint32_t year = val;
int64_t val = 0;
uint32_t second, minute, hour, day, month, year;
if (length == -1)
{
val = gw_mysql_get_byte8(ptr);
second = val - ((val / 100) * 100);
val /= 100;
minute = val - ((val / 100) * 100);
val /= 100;
hour = val - ((val / 100) * 100);
val /= 100;
day = val - ((val / 100) * 100);
val /= 100;
month = val - ((val / 100) * 100);
val /= 100;
year = val;
}
else
{
// TODO: Figure out why DATETIME(0) doesn't work like it others do
val = unpack_bytes(ptr, datetime_sizes[length]);
val *= log_10_values[6 - length];
if (val < 0)
{
val = -val;
}
int subsecond = val % 1000000;
val /= 1000000;
second = val % 60;
val /= 60;
minute = val % 60;
val /= 60;
hour = val % 24;
val /= 24;
day = val % 32;
val /= 32;
month = val % 13;
val /= 13;
year = val;
}
memset(dest, 0, sizeof(struct tm));
dest->tm_year = year - 1900;
@ -391,14 +455,13 @@ size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
return metadata[1];
}
/**
* @brief Get the length of a temporal field
* @param type Field type
* @param decimals How many decimals the field has
* @return Number of bytes the temporal value takes
*/
static size_t temporal_field_size(uint8_t type, uint8_t decimals)
static size_t temporal_field_size(uint8_t type, uint8_t decimals, int length)
{
switch (type)
{
@ -413,7 +476,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
return 3 + ((decimals + 1) / 2);
case TABLE_COL_TYPE_DATETIME:
return 8;
return length < 0 || length > 6 ? 8 : datetime_sizes[length];
case TABLE_COL_TYPE_TIMESTAMP:
return 4;
@ -441,7 +504,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
* @param val Extracted packed value
* @param tm Pointer where the unpacked temporal value is stored
*/
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm)
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, int length, struct tm *tm)
{
switch (type)
{
@ -450,7 +513,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
break;
case TABLE_COL_TYPE_DATETIME:
unpack_datetime(ptr, tm);
unpack_datetime(ptr, length, tm);
break;
case TABLE_COL_TYPE_DATETIME2:
@ -474,7 +537,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
ss_dassert(false);
break;
}
return temporal_field_size(type, *metadata);
return temporal_field_size(type, *metadata, length);
}
void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm)

View File

@ -314,7 +314,7 @@ dShowEventStats(DCB *pdcb)
* @param what The required statistic
* @return The value of that statistic
*/
int
int64_t
poll_get_stat(POLL_STAT what)
{
return Worker::get_one_statistic(what);

View File

@ -31,7 +31,6 @@
#undef NDEBUG
#endif
#define FAILTEST(s) printf("TEST FAILED: " s "\n");return 1;
#include <my_config.h>
#include <mysql.h>
#include <stdio.h>
#include <maxscale/notification.h>

View File

@ -140,7 +140,6 @@ MaskingFilterSession* MaskingFilter::newSession(MXS_SESSION* pSession)
// static
void MaskingFilter::diagnostics(DCB* pDcb)
{
dcb_printf(pDcb, "Hello, World!\n");
}
// static

View File

@ -59,7 +59,6 @@
#define MXS_MODULE_NAME "mqfilter"
#include <my_config.h>
#include <stdio.h>
#include <fcntl.h>
#include <maxscale/filter.h>

View File

@ -2,4 +2,5 @@ install_script(cdc.py core)
install_script(cdc_users.py core)
install_script(cdc_last_transaction.py core)
install_script(cdc_kafka_producer.py core)
install_script(cdc_schema.py core)
install_file(cdc_schema.go core)

View File

@ -12,52 +12,32 @@
# Public License.
import time
import json
import re
import sys
import socket
import hashlib
import argparse
import subprocess
import selectors
import binascii
import os
# Read data as JSON
def read_json():
decoder = json.JSONDecoder()
rbuf = bytes()
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
def read_data():
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ)
while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
buf = sock.recv(4096, socket.MSG_DONTWAIT)
rbuf += buf
while True:
rbuf = rbuf.lstrip()
data = decoder.raw_decode(rbuf.decode('utf_8'))
rbuf = rbuf[data[1]:]
print(json.dumps(data[0]))
except ValueError as err:
sys.stdout.flush()
pass
except Exception:
if len(buf) > 0:
os.write(sys.stdout.fileno(), buf)
sys.stdout.flush()
else:
raise Exception('Socket was closed')
except BlockingIOError:
break
# Read data as Avro
def read_avro():
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
buf = sock.recv(4096, socket.MSG_DONTWAIT)
os.write(sys.stdout.fileno(), buf)
sys.stdout.flush()
except Exception:
except Exception as ex:
print(ex, file=sys.stderr)
break
parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
@ -91,7 +71,4 @@ response = str(sock.recv(1024)).encode('utf_8')
# Request a data stream
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))
if opts.format == "JSON":
read_json()
elif opts.format == "AVRO":
read_avro()
read_data()

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl11.
#
# Change Date: 2019-07-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
#
# This program requires the MySQL Connector/Python to work
#
import mysql.connector as mysql
import json
import sys
import argparse
parser = argparse.ArgumentParser(description = "CDC Schema Generator", conflict_handler="resolve", epilog="""This program generates CDC schema files for a specific table or all the tables in a database. The
schema files need to be generated if the binary log files do not contain the
CREATE TABLE events that define the table layout.""")
parser.add_argument("-h", "--host", dest="host", help="Network address where the connection is made", default="localhost")
parser.add_argument("-P", "--port", dest="port", help="Port where the connection is made", default="3306")
parser.add_argument("-u", "--user", dest="user", help="Username used when connecting", default="")
parser.add_argument("-p", "--password", dest="password", help="Password used when connecting", default="")
parser.add_argument("DATABASE", help="Generate Avro schemas for this database")
opts = parser.parse_args(sys.argv[1:])
def parse_field(row):
res = dict()
name = row[1].lower().split('(')[0]
if name in ("date", "datetime", "time", "timestamp", "year", "tinytext", "text",
"mediumtext", "longtext", "char", "varchar", "enum", "set"):
res["type"] = "string"
elif name in ("tinyblob", "blob", "mediumblob", "longblob", "binary", "varbinary"):
res["type"] = "bytes"
elif name in ("int", "smallint", "mediumint", "integer", "tinyint", "short", "bit"):
res["type"] = "int"
elif name in ("float"):
res["type"] = "float"
elif name in ("double", "decimal"):
res["type"] = "double"
elif name in ("null"):
res["type"] = "null"
elif name in ("long", "bigint"):
res["type"] = "long"
else:
res["type"] = "string"
res["name"] = row[0].lower()
return res
try:
conn = mysql.connect(user=opts.user, password=opts.password, host=opts.host, port=opts.port)
cursor = conn.cursor()
cursor.execute("SHOW TABLES FROM {}".format(opts.DATABASE))
tables = []
for res in cursor:
tables.append(res[0])
for t in tables:
schema = dict(namespace="MaxScaleChangeDataSchema.avro", type="record", name="ChangeRecord", fields=[])
cursor.execute("DESCRIBE {}.{}".format(opts.DATABASE, t))
for res in cursor:
schema["fields"].append(parse_field(res))
dest = open("{}.{}.000001.avsc".format(opts.DATABASE, t), 'w')
dest.write(json.dumps(schema))
dest.close()
cursor.close()
conn.close()
except Exception as e:
print(e)
exit(1)

View File

@ -473,6 +473,17 @@ void notify_all_clients(AVRO_INSTANCE *router)
}
}
void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_FLUSH);
avro_save_conversion_state(router);
notify_all_clients(router);
*total_rows += router->row_count;
*total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
}
/**
* @brief Read all replication events from a binlog file.
*
@ -552,6 +563,8 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
}
else
{
do_checkpoint(router, &total_rows, &total_commits);
MXS_INFO("Processed %lu transactions and %lu row events.",
total_commits, total_rows);
if (rotate_seen)
@ -739,13 +752,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_SYNC);
avro_save_conversion_state(router);
notify_all_clients(router);
total_rows += router->row_count;
total_commits += router->trx_count;
router->row_count = router->trx_count = 0;
do_checkpoint(router, &total_rows, &total_commits);
}
}

View File

@ -180,6 +180,11 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
"table until a DDL statement for it is read.", table_ident);
}
if (rval)
{
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
}
return rval;
}
@ -307,9 +312,13 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end
* of the event. */
int rows = 0;
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos);
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{
static uint64_t total_row_count = 1;
MXS_INFO("Row %lu", total_row_count++);
/** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type);
@ -525,9 +534,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr += (ncolumns + 7) / 8;
ss_dassert(ptr < end);
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
for (long i = 0; i < map->columns && i < create->columns && npresent < ncolumns; i++)
{
ss_dassert(create->columns == map->columns);
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0);
@ -536,6 +544,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
MXS_INFO("[%ld] NULL", i);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
@ -565,17 +574,45 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("ENUM/SET values larger than 255 values aren't supported.");
}
avro_value_set_string(&field, strval);
MXS_INFO("[%ld] ENUM: %lu bytes", i, bytes);
ptr += bytes;
ss_dassert(ptr < end);
}
else
{
uint8_t bytes = *ptr;
/**
* The first byte in the metadata stores the real type of
* the string (ENUM and SET types are also stored as fixed
* length strings).
*
* The first two bits of the second byte contain the XOR'ed
* field length but as that information is not relevant for
* us, we just use this information to know whether to read
* one or two bytes for string length.
*/
uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8);
int bytes = 0;
uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300);
uint16_t field_length = (meta & 0xff) + extra_length;
if (field_length > 255)
{
bytes = ptr[0] + (ptr[1] << 8);
ptr += 2;
}
else
{
bytes = *ptr++;
}
MXS_INFO("[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
ss_dassert(bytes || *ptr == '\0');
char str[bytes + 1];
memcpy(str, ptr + 1, bytes);
memcpy(str, ptr, bytes);
str[bytes] = '\0';
avro_value_set_string(&field, str);
ptr += bytes + 1;
ptr += bytes;
ss_dassert(ptr < end);
}
}
@ -595,6 +632,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
}
avro_value_set_int(&field, value);
MXS_INFO("[%ld] BIT", i);
ptr += bytes;
ss_dassert(ptr < end);
}
@ -603,6 +641,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
double f_value = 0.0;
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
avro_value_set_double(&field, f_value);
MXS_INFO("[%ld] DOUBLE", i);
ss_dassert(ptr < end);
}
else if (column_is_variable_string(map->column_types[i]))
@ -620,6 +659,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
ptr++;
}
MXS_INFO("[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
char buf[sz + 1];
memcpy(buf, ptr, sz);
buf[sz] = '\0';
@ -633,6 +673,7 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0;
memcpy(&len, ptr, bytes);
ptr += bytes;
MXS_INFO("[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
if (len)
{
avro_value_set_bytes(&field, ptr, len);
@ -649,9 +690,12 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
{
char buf[80];
struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr, &metadata[metadata_offset], &tm);
ptr += unpack_temporal_value(map->column_types[i], ptr,
&metadata[metadata_offset],
create->column_lengths[i], &tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
avro_value_set_string(&field, buf);
MXS_INFO("[%ld] TEMPORAL: %s", i, buf);
ss_dassert(ptr < end);
}
/** All numeric types (INT, LONG, FLOAT etc.) */

View File

@ -124,9 +124,11 @@ char* json_new_schema_from_table(TABLE_MAP *map)
for (uint64_t i = 0; i < map->columns; i++)
{
json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
create->column_names[i], "type",
column_type_to_avro_type(map->column_types[i])));
json_array_append(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
"name", create->column_names[i],
"type", column_type_to_avro_type(map->column_types[i]),
"real_type", create->column_types[i],
"length", create->column_lengths[i]));
}
json_object_set_new(schema, "fields", array);
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
@ -172,8 +174,10 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
{
int array_size = json_array_size(arr);
table->column_names = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_types = (char**)MXS_MALLOC(sizeof(char*) * (array_size));
table->column_lengths = (int*)MXS_MALLOC(sizeof(int) * (array_size));
if (table->column_names)
if (table->column_names && table->column_types && table->column_lengths)
{
int columns = 0;
rval = true;
@ -184,6 +188,28 @@ bool json_extract_field_names(const char* filename, TABLE_CREATE *table)
if (json_is_object(val))
{
json_t* value;
if ((value = json_object_get(val, "real_type")) && json_is_string(value))
{
table->column_types[columns] = MXS_STRDUP_A(json_string_value(value));
}
else
{
table->column_types[columns] = MXS_STRDUP_A("unknown");
MXS_WARNING("No \"real_type\" value defined. Treating as unknown type field.");
}
if ((value = json_object_get(val, "length")) && json_is_integer(value))
{
table->column_lengths[columns] = json_integer_value(value);
}
else
{
table->column_lengths[columns] = -1;
MXS_WARNING("No \"length\" value defined. Treating as default length field.");
}
json_t *name = json_object_get(val, "name");
if (name && json_is_string(name))
{
@ -489,7 +515,6 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
dest[bytes] = '\0';
make_valid_avro_identifier(dest);
ptr = next_field_definition(ptr);
}
else
{
@ -499,56 +524,98 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
return ptr;
}
int extract_type_length(const char* ptr, char *dest)
{
/** Skip any leading whitespace */
while (isspace(*ptr) || *ptr == '`')
{
ptr++;
}
/** The field type definition starts here */
const char *start = ptr;
/** Skip characters until we either hit a whitespace character or the start
* of the length definition. */
while (!isspace(*ptr) && *ptr != '(')
{
ptr++;
}
/** Store type */
int typelen = ptr - start;
memcpy(dest, start, typelen);
dest[typelen] = '\0';
/** Skip whitespace */
while (isspace(*ptr))
{
ptr++;
}
int rval = -1; // No length defined
/** Start of length definition */
if (*ptr == '(')
{
ptr++;
char *end;
int val = strtol(ptr, &end, 10);
if (*end == ')')
{
rval = val;
}
}
return rval;
}
int count_columns(const char* ptr)
{
int i = 2;
while ((ptr = strchr(ptr, ',')))
{
ptr++;
i++;
}
return i;
}
/**
* Process a table definition into an array of column names
* @param nameptr table definition
* @return Number of processed columns or -1 on error
*/
static int process_column_definition(const char *nameptr, char*** dest)
static int process_column_definition(const char *nameptr, char*** dest, char*** dest_types, int** dest_lens)
{
/** Process columns in groups of 8 */
size_t chunks = 1;
const size_t chunk_size = 8;
int i = 0;
char **names = MXS_MALLOC(sizeof(char*) * (chunks * chunk_size + 1));
if (names == NULL)
{
return -1;
}
int n = count_columns(nameptr);
*dest = MXS_MALLOC(sizeof(char*) * n);
*dest_types = MXS_MALLOC(sizeof(char*) * n);
*dest_lens = MXS_MALLOC(sizeof(int) * n);
char **names = *dest;
char **types = *dest_types;
int *lengths = *dest_lens;
char colname[512];
int i = 0;
while ((nameptr = extract_field_name(nameptr, colname, sizeof(colname))))
{
if (i >= chunks * chunk_size)
{
char **tmp = MXS_REALLOC(names, (++chunks * chunk_size + 1) * sizeof(char*));
if (tmp == NULL)
{
for (int x = 0; x < i; x++)
{
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
names = tmp;
}
ss_dassert(i < n);
char type[100] = "";
int len = extract_type_length(nameptr, type);
nameptr = next_field_definition(nameptr);
fix_reserved_word(colname);
if ((names[i++] = MXS_STRDUP(colname)) == NULL)
{
for (int x = 0; x < i; x++)
{
MXS_FREE(names[x]);
}
MXS_FREE(names);
return -1;
}
lengths[i] = len;
types[i] = MXS_STRDUP_A(type);
names[i] = MXS_STRDUP_A(colname);
i++;
}
*dest = names;
return i;
}
@ -601,7 +668,7 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
char database[MYSQL_DATABASE_MAXLEN + 1];
const char *db = event_db;
MXS_DEBUG("Create table statement: %.*s", stmt_len, statement_sql);
MXS_INFO("Create table: %s", sql);
if (!get_table_name(sql, table))
{
@ -621,8 +688,10 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
db = database;
}
int* lengths = NULL;
char **names = NULL;
int n_columns = process_column_definition(statement_sql, &names);
char **types = NULL;
int n_columns = process_column_definition(statement_sql, &names, &types, &lengths);
ss_dassert(n_columns > 0);
/** We have appear to have a valid CREATE TABLE statement */
@ -634,6 +703,8 @@ TABLE_CREATE* table_create_alloc(const char* sql, const char* event_db)
rval->version = 1;
rval->was_used = false;
rval->column_names = names;
rval->column_lengths = lengths;
rval->column_types = types;
rval->columns = n_columns;
rval->database = MXS_STRDUP(db);
rval->table = MXS_STRDUP(table);
@ -675,8 +746,11 @@ void table_create_free(TABLE_CREATE* value)
for (uint64_t i = 0; i < value->columns; i++)
{
MXS_FREE(value->column_names[i]);
MXS_FREE(value->column_types[i]);
}
MXS_FREE(value->column_names);
MXS_FREE(value->column_types);
MXS_FREE(value->column_lengths);
MXS_FREE(value->table);
MXS_FREE(value->database);
MXS_FREE(value);
@ -792,6 +866,26 @@ void make_avro_token(char* dest, const char* src, int length)
memcpy(dest, src, length);
dest[length] = '\0';
fix_reserved_word(dest);
}
int get_column_index(TABLE_CREATE *create, const char *tok)
{
int idx = -1;
char safe_tok[strlen(tok) + 2];
strcpy(safe_tok, tok);
fix_reserved_word(safe_tok);
for (int x = 0; x < create->columns; x++)
{
if (strcasecmp(create->column_names[x], tok) == 0)
{
idx = x;
break;
}
}
return idx;
}
bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
@ -805,7 +899,7 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
if (tok)
{
MXS_DEBUG("Altering table %.*s\n", len, tok);
MXS_INFO("Alter table '%.*s'; %.*s\n", len, tok, (int)(end - sql), sql);
def = tok + len;
}
@ -844,27 +938,45 @@ bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end)
{
tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]);
char ** tmp = MXS_REALLOC(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
int idx = get_column_index(create, tok);
if (tmp == NULL)
if (idx != -1)
{
return false;
MXS_FREE(create->column_names[idx]);
for (int i = idx; i < (int)create->columns - 1; i++)
{
create->column_names[i] = create->column_names[i + 1];
}
char ** tmp = realloc(create->column_names, sizeof(char*) * create->columns - 1);
ss_dassert(tmp);
if (tmp == NULL)
{
return false;
}
create->column_names = tmp;
create->columns--;
updates++;
}
create->column_names = tmp;
create->columns--;
updates++;
tok = get_next_def(tok, end);
len = 0;
}
else if (tok_eq(ptok, "change", plen) && tok_eq(tok, "column", len))
{
tok = get_tok(tok + len, &len, end);
MXS_FREE(create->column_names[create->columns - 1]);
create->column_names[create->columns - 1] = strndup(tok, len);
updates++;
int idx = get_column_index(create, tok);
if (idx != -1)
{
MXS_FREE(create->column_names[idx]);
create->column_names[idx] = strndup(tok, len);
updates++;
}
tok = get_next_def(tok, end);
len = 0;
}
@ -975,7 +1087,6 @@ TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create)
map->id = table_id;
map->version = create->version;
map->flags = flags;
ss_dassert(column_count == create->columns);
map->columns = column_count;
map->column_types = MXS_MALLOC(column_count);
/** Allocate at least one byte for the metadata */

View File

@ -84,6 +84,23 @@ static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp";
static char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
static inline bool is_reserved_word(const char* word)
{
return strcasecmp(word, avro_domain) == 0 ||
strcasecmp(word, avro_server_id) == 0 ||
strcasecmp(word, avro_sequence) == 0 ||
strcasecmp(word, avro_event_number) == 0 ||
strcasecmp(word, avro_event_type) == 0 ||
strcasecmp(word, avro_timestamp) == 0;
}
static inline void fix_reserved_word(char *tok)
{
if (is_reserved_word(tok))
{
strcat(tok, "_");
}
}
/** How a binlog file is closed */
typedef enum avro_binlog_end
@ -111,6 +128,8 @@ typedef struct table_create
{
uint64_t columns;
char **column_names;
char **column_types;
int* column_lengths;
char *table;
char *database;
int version; /**< How many versions of this table have been used */

View File

@ -29,6 +29,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
@ -998,7 +999,7 @@ maxinfo_zombie_dcbs()
/**
* Interface to poll stats for reads
*/
static int
static int64_t
maxinfo_read_events()
{
return poll_get_stat(POLL_STAT_READ);
@ -1007,7 +1008,7 @@ maxinfo_read_events()
/**
* Interface to poll stats for writes
*/
static int
static int64_t
maxinfo_write_events()
{
return poll_get_stat(POLL_STAT_WRITE);
@ -1016,7 +1017,7 @@ maxinfo_write_events()
/**
* Interface to poll stats for errors
*/
static int
static int64_t
maxinfo_error_events()
{
return poll_get_stat(POLL_STAT_ERROR);
@ -1025,7 +1026,7 @@ maxinfo_error_events()
/**
* Interface to poll stats for hangup
*/
static int
static int64_t
maxinfo_hangup_events()
{
return poll_get_stat(POLL_STAT_HANGUP);
@ -1034,7 +1035,7 @@ maxinfo_hangup_events()
/**
* Interface to poll stats for accepts
*/
static int
static int64_t
maxinfo_accept_events()
{
return poll_get_stat(POLL_STAT_ACCEPT);
@ -1043,7 +1044,7 @@ maxinfo_accept_events()
/**
* Interface to poll stats for event queue length
*/
static int
static int64_t
maxinfo_event_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_LEN);
@ -1052,7 +1053,7 @@ maxinfo_event_queue_length()
/**
* Interface to poll stats for max event queue length
*/
static int
static int64_t
maxinfo_max_event_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_MAX);
@ -1061,7 +1062,7 @@ maxinfo_max_event_queue_length()
/**
* Interface to poll stats for max queue time
*/
static int
static int64_t
maxinfo_max_event_queue_time()
{
return poll_get_stat(POLL_STAT_MAX_QTIME);
@ -1070,7 +1071,7 @@ maxinfo_max_event_queue_time()
/**
* Interface to poll stats for max event execution time
*/
static int
static int64_t
maxinfo_max_event_exec_time()
{
return poll_get_stat(POLL_STAT_MAX_EXECTIME);
@ -1142,8 +1143,8 @@ status_row(RESULTSET *result, void *data)
(char *)(*status[context->index].func)());
break;
case VT_INT:
snprintf(buf, 80, "%ld",
(long)(*status[context->index].func)());
snprintf(buf, 80, "%" PRId64,
(int64_t)(*status[context->index].func)());
resultset_row_set(row, 1, buf);
break;
default:

View File

@ -13,7 +13,6 @@
#include "readwritesplit.h"
#include <my_config.h>
#include <stdio.h>
#include <strings.h>
#include <string.h>
@ -162,12 +161,12 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
{
if (rses->load_data_state == LOAD_DATA_INACTIVE)
{
unsigned char command = MYSQL_GET_COMMAND(GWBUF_DATA(querybuf));
char *qtypestr = qc_typemask_to_string(qtype);
char *sql;
uint8_t *packet = GWBUF_DATA(querybuf);
unsigned char command = packet[4];
int len = 0;
char* sql;
modutil_extract_SQL(querybuf, &sql, &len);
char *qtypestr = qc_typemask_to_string(qtype);
if (len > RWSPLIT_TRACE_MSG_LEN)
{
@ -181,9 +180,9 @@ log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t
const char *hint = querybuf->hint == NULL ? "" : ", Hint:";
const char *hint_type = querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type);
MXS_INFO("> Autocommit: %s, trx is %s, cmd: %s, type: %s, stmt: %.*s%s %s",
autocommit, transaction, STRPACKETTYPE(command), querytype, len,
sql, hint, hint_type);
MXS_INFO("> Autocommit: %s, trx is %s, cmd: (0x%02x) %s, type: %s, stmt: %.*s%s %s",
autocommit, transaction, command, STRPACKETTYPE(command),
querytype, len, sql, hint, hint_type);
MXS_FREE(qtypestr);
}