Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä 2018-01-19 11:47:45 +02:00
commit 998652bf24
21 changed files with 1568 additions and 426 deletions

View File

@ -18,11 +18,11 @@ then
ctest --output-on-failure || exit 1
fi
if [ $remove_strip == "yes" ] ; then
sudo rm -rf /usr/bin/strip
sudo touch /usr/bin/strip
sudo chmod a+x /usr/bin/strip
fi
# Never strip binaries
sudo rm -rf /usr/bin/strip
sudo touch /usr/bin/strip
sudo chmod a+x /usr/bin/strip
sudo make package
res=$?
if [ $res != 0 ] ; then

View File

@ -67,3 +67,16 @@ injected into the list of users.
```
authenticator_options=inject_service_user=false
```
### `lower_case_table_names`
Enable case-insensitive identifier matching for authentication. This parameter
is disabled by default.
The parameter functions exactly as the MariaDB Server system variable
[lower_case_table_names](https://mariadb.com/kb/en/library/server-system-variables/#lower_case_table_names).
This makes the authenticator's matching of database names case-insensitive by
converting all names to their lowercase form.
**Note:** The identifier names are converted using an ASCII-only function. This
means that non-ASCII characters will retain their case-sensitivity.

View File

@ -75,7 +75,7 @@ matching entry point is made.
The luafilter exposes three functions that can be called from the Lua script.
- `string lua_qc_get_type()`
- `string lua_qc_get_type_mask()`
- Returns the type of the current query being executed as a string. The values
are the string versions of the query types defined in _query_classifier.h_
@ -94,3 +94,46 @@ The luafilter exposes three functions that can be called from the Lua script.
- This function generates unique integers that can be used to distinguish
sessions from each other.
## Example Configuration and Script
Here is a minimal configuration entry for a luafilter definition.
```
[MyLuaFilter]
type=filter
module=luafilter
global_script=/path/to/script.lua
```
And here is a script that opens a file in `/tmp/` and logs output to it.
```
f = io.open("/tmp/test.log", "a+")
function createInstance()
f:write("createInstance\n")
end
function newSession(a, b)
f:write("newSession for: " .. a .. "@" .. b .. "\n")
end
function closeSession()
f:write("closeSession\n")
end
function routeQuery(string)
f:write("routeQuery: " .. string .. " -- type: " .. lua_qc_get_type_mask() .. " operation: " .. lua_qc_get_operation() .. "\n")
end
function clientReply()
f:write("clientReply\n")
end
function diagnostic()
f:write("diagnostics\n")
return "Hello from Lua!"
end
```

View File

@ -2,11 +2,13 @@
## Overview
The Query Log All (QLA) filter is a filter module for MariaDB MaxScale that is able to log all query content on a per client session basis. Logs are written in a csv format file that lists the time submitted and the SQL statement text.
The Query Log All (QLA) filter logs query content. Logs are written to a file in
CSV format. Log elements are configurable and include the time submitted and the
SQL statement text, among others.
## Configuration
The configuration block for the QLA filter requires the minimal filter options in its section within the maxscale.cnf file, stored in /etc/maxscale.cnf.
A minimal configuration is below.
```
[MyLogFilter]
type=filter
@ -31,72 +33,64 @@ The QLA filter accepts the following options.
case | Use case-sensitive matching
extended | Use extended regular expression syntax (ERE)
To use multiple filter options, list them in a comma-separated list. If no options are given, default will be used. Multiple options can be enabled simultaneously.
To use multiple filter options, list them in a comma-separated list. If no
options are given, default will be used. Multiple options can be enabled
simultaneously.
```
options=case,extended
```
**Note**: older the version of the QLA filter in 0.7 of MariaDB MaxScale used the `options`
to define the location of the log files. This functionality is not supported
anymore and the `filebase` parameter should be used instead.
**Note**: the version of the QLA filter in MariaDB MaxScale 0.7 and older used
the `options` value to define the location of the log files. This functionality
is not supported anymore and the `filebase` parameter should be used instead.
## Filter Parameters
The QLA filter has one mandatory parameter, `filebase`, and a number of optional parameters. These were introduced in the 1.0 release of MariaDB MaxScale.
The QLA filter has one mandatory parameter, `filebase`, and a number of optional
parameters. These were introduced in the 1.0 release of MariaDB MaxScale.
### `filebase`
The basename of the output file created for each session. A session index is added to the filename for each file written. This is a mandatory parameter.
The basename of the output file created for each session. A session index is
added to the filename for each written session file. For unified log files,
*.unified* is appended. This is a mandatory parameter.
```
filebase=/tmp/SqlQueryLog
```
The filebase may also be set as the filter option, the mechanism to set the filebase via the filter option is superseded by the parameter. If both are set the parameter setting will be used and the filter option ignored.
The filebase may also be set as the filter option. If both option and parameter
are set, the parameter setting will be used and the filter option ignored.
### `match`
### `match` and `exclude`
An optional parameter that can be used to limit the queries that will be logged by the QLA filter. The parameter value is a regular expression that is used to match against the SQL text. Only SQL statements that matches the text passed as the value of this parameter will be logged.
These optional parameters limit logging on a query level. The parameter values
are regular expressions which are matched against the SQL query text. Only SQL
statements that match the regular expression in *match* but do not match the
*exclude* expression are logged.
```
match=select.*from.*customer.*where
exclude=^insert
```
All regular expressions are evaluated with the option to ignore the case of the text, therefore a match option of select will match both select, SELECT and any form of the word with upper or lowercase characters.
*match* is checked before *exclude*. If *match* is empty, all queries are
considered matching. If *exclude* is empty, no query is excluded. If both are
empty, all queries are logged.
### `exclude`
### `user` and `source`
An optional parameter that can be used to limit the queries that will be logged by the QLA filter. The parameter value is a regular expression that is used to match against the SQL text. SQL statements that match the text passed as the value of this parameter will be excluded from the log output.
```
exclude=where
```
All regular expressions are evaluated with the option to ignore the case of the text, therefore an exclude option of select will exclude statements that contain both select, SELECT or any form of the word with upper or lowercase characters.
### `source`
The optional source parameter defines an address that is used to match against the address from which the client connection to MariaDB MaxScale originates. Only sessions that originate from this address will be logged.
```
source=127.0.0.1
```
### `user`
The optional user parameter defines a user name that is used to match against the user from which the client connection to MariaDB MaxScale originates. Only sessions that are connected using this username are logged.
These optional parameters limit logging on a session level. If `user` is
defined, only the sessions with a matching client username are logged. If
`source` is defined, only sessions with a matching client source address are
logged.
```
user=john
source=127.0.0.1
```
-----------------------------------------------------------
**The following parameters were added in MaxScale 2.1.0**
-----------------------------------------------------------
### `log_type`
The type of log file to use. The default value is _session_.
@ -110,11 +104,13 @@ The type of log file to use. The default value is _session_.
log_type=session
```
If both logs are required, define `log_type=session,unified`.
### `log_data`
Type of data to log in the log files. Parameter value is a comma separated list
of the following values. By default the _date_, _user_ and _query_ options are
enabled.
Type of data to log in the log files. The parameter value is a comma separated
list of the following elements. By default the _date_, _user_ and _query_
options are enabled.
| Value | Description |
| -------- |--------------------------------------------------|
@ -129,8 +125,9 @@ enabled.
log_data=date, user, query
```
If *reply_time* is enabled, the log entry is written when the first reply from server is received.
Otherwise, the entry is written when receiving query from client.
If *reply_time* is enabled, the log entry is written when the first reply from
the server is received. Otherwise, the entry is written when the query is
received from the client.
### `flush`
@ -142,17 +139,42 @@ flush=true
### `append`
Append new entries to log files instead of overwriting them. The default is false.
Append new entries to log files instead of overwriting them. The default is
false.
```
append=true
```
### `separator`
Default value is "," (a comma). Defines the separator string between elements of
a log entry. The value should be enclosed in quotes.
```
separator=" | "
```
### `newline_replacement`
Default value is " " (one space). SQL-queries may include line breaks, which, if
printed directly to the log, may break automatic parsing. This parameter defines
what should be written in the place of a newline sequence (\r, \n or \r\n). If
this is set as the empty string, then newlines are not replaced and printed as
is to the output. The value should be enclosed in quotes.
```
newline_replacement=" NL "
```
## Examples
### Example 1 - Query without primary key
Imagine you have observed an issue with a particular table and you want to determine if there are queries that are accessing that table but not using the primary key of the table. Let's assume the table name is PRODUCTS and the primary key is called PRODUCT_ID. Add a filter with the following definition:
Imagine you have observed an issue with a particular table and you want to
determine if there are queries that are accessing that table but not using the
primary key of the table. Let's assume the table name is PRODUCTS and the
primary key is called PRODUCT_ID. Add a filter with the following definition:
```
[ProductsSelectLogger]
@ -171,8 +193,10 @@ passwd=mypasswd
filters=ProductsSelectLogger
```
The result of then putting this filter into the service used by the application would be a log file of all select queries that mentioned the table but did not mention the PRODUCT_ID primary key in the predicates for the query.
Executing `SELECT * FROM PRODUCTS` would log the following into `/var/logs/qla/SelectProducts`:
The result of using this filter with the service used by the application would
be a log file of all select queries querying PRODUCTS without using the
PRODUCT_ID primary key in the predicates of the query. Executing `SELECT * FROM
PRODUCTS` would log the following into `/var/logs/qla/SelectProducts`:
```
07:12:56.324 7/01/2016, SELECT * FROM PRODUCTS
```

View File

@ -277,6 +277,18 @@ The backends must all use GTID-based replication, and the domain id should not
change during a switchover or failover. Master and slaves must have
well-behaving GTIDs with no extra events on slave servers.
Switchover requires that the cluster is "frozen" for the duration of the
operation. This means that no data modifying statements such as INSERT or UPDATE
are executed and the GTID position of the master server is stable. When
switchover begins, the monitor sets the global *read_only* flag on the old
master backend to stop any updates. *read_only* does not affect users with the
SUPER-privilege so any such user can issue writes during a switchover. These
writes have a high chance of breaking replication, because the write may not be
replicated to all slaves before they switch to the new master. To prevent this,
any users who commonly do updates should not have the SUPER-privilege. For even
more security, the only SUPER-user session during a switchover should be the
MaxScale monitor user.
### Configuration parameters
#### `auto_failover`

View File

@ -8,8 +8,8 @@
set(MARIADB_CONNECTOR_C_REPO "https://github.com/MariaDB/mariadb-connector-c.git"
CACHE STRING "MariaDB Connector-C Git repository")
# Release 2.3.3 (preliminary) of the Connector-C
set(MARIADB_CONNECTOR_C_TAG "v3.0.2"
# Connector-C tag to use
set(MARIADB_CONNECTOR_C_TAG "v3.0.3"
CACHE STRING "MariaDB Connector-C Git tag")
ExternalProject_Add(connector-c

View File

@ -123,8 +123,9 @@ mysqlmon_failover_rejoin_old_slave
mysqlmon_failover_rolling_master
mysqlmon_failover_rolling_restart_slaves
mysqlmon_failover_stress
mysqlmon_switchover_bad_master
mysqlmon_switchover
mysqlmon_switchover_bad_master
mysqlmon_switchover_stress
mxs1045
mxs1071_maxrows
mxs1110_16mb

View File

@ -290,8 +290,12 @@ add_test_executable(mysqlmon_failover_rejoin_old_slave.cpp mysqlmon_failover_rej
# MySQL Monitor rolling restart slaves
add_test_executable(mysqlmon_failover_rolling_restart_slaves.cpp mysqlmon_failover_rolling_restart_slaves mysqlmon_failover_rolling_restart_slaves LABELS mysqlmon REPL_BACKEND)
# MySQL Monitor failover stress
add_test_executable(mysqlmon_failover_stress.cpp mysqlmon_failover_stress mysqlmon_failover_stress LABELS mysqlmon REPL_BACKEND)
# MySQL Monitor switchover stress
add_test_executable(mysqlmon_switchover_stress.cpp mysqlmon_switchover_stress mysqlmon_switchover_stress LABELS mysqlmon REPL_BACKEND)
# Test monitor state change events when manually clearing server bits
add_test_executable(false_monitor_state_change.cpp false_monitor_state_change replication LABELS mysqlmon REPL_BACKEND)

View File

@ -0,0 +1,94 @@
[maxscale]
threads=###threads###
[MySQL-Monitor]
type=monitor
module=mysqlmon
servers=server1, server2, server3, server4
user=maxskysql
passwd=skysql
monitor_interval=1000
allow_cluster_recovery=true
detect_standalone_master=true
auto_failover=false
auto_rejoin=false
replication_user=repl
replication_password=repl
backend_connect_timeout=5
backend_read_timeout=5
backend_write_timeout=5
[RW-Split-Router]
type=service
router= readwritesplit
servers=server1, server2, server3, server4
user=maxskysql
passwd=skysql
[Read-Connection-Router-Slave]
type=service
router=readconnroute
router_options= slave
servers=server1, server2, server3, server4
user=maxskysql
passwd=skysql
[Read-Connection-Router-Master]
type=service
router=readconnroute
router_options=master
servers=server1, server2, server3, server4
user=maxskysql
passwd=skysql
[RW-Split-Listener]
type=listener
service=RW-Split-Router
protocol=MySQLClient
port=4006
[Read-Connection-Listener-Slave]
type=listener
service=Read-Connection-Router-Slave
protocol=MySQLClient
port=4009
[Read-Connection-Listener-Master]
type=listener
service=Read-Connection-Router-Master
protocol=MySQLClient
port=4008
[CLI]
type=service
router=cli
[CLI Listener]
type=listener
service=CLI
protocol=maxscaled
socket=default
[server1]
type=server
address=###node_server_IP_1###
port=###node_server_port_1###
protocol=MySQLBackend
[server2]
type=server
address=###node_server_IP_2###
port=###node_server_port_2###
protocol=MySQLBackend
[server3]
type=server
address=###node_server_IP_3###
port=###node_server_port_3###
protocol=MySQLBackend
[server4]
type=server
address=###node_server_IP_4###
port=###node_server_port_4###
protocol=MySQLBackend

View File

@ -26,7 +26,7 @@ int main(int argc, char** argv)
// Wait for the avrorouter to process the data
sleep(10);
test.check_log_err("Possible STATEMENT or MIXED", true);
test.check_log_err(0, "Possible STATEMENT or MIXED", true);
return test.global_result;
}

View File

@ -0,0 +1,649 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <iostream>
#include <iterator>
#include <string>
#include <sstream>
#include <thread>
#include <vector>
#include "testconnections.h"
#include "fail_switch_rejoin_common.cpp"
using namespace std;
// How often the monitor checks the server state.
// NOTE: Ensure this is identical with the value in the configuration file.
const time_t MONITOR_INTERVAL = 1;
// After how many seconds should the switchover operation surely have
// been performed. Not very critical.
const time_t SWITCHOVER_DURATION = 5;
// How long should we keep in running.
const time_t TEST_DURATION = 90;
const char* CLIENT_USER = "mysqlmon_switchover_stress";
const char* CLIENT_PASSWORD = "mysqlmon_switchover_stress";
#define CMESSAGE(msg) \
do {\
stringstream ss;\
ss << "client(" << m_id << ") : " << msg << "\n";\
cout << ss.str() << flush;\
} while (false)
#if !defined(NDEBUG)
#define ss_dassert(x) do { if (!(x)) { fprintf(stderr, "Assertion failed: %s", #x); abort(); } } while(false)
#define ss_debug(x) x
#else
#define ss_dassert(s)
#define ss_debug(x)
#endif
namespace
{
// Emulates application load during the switchover stress test.
//
// Each Client runs in a thread of its own (see start()/thread_main()) and
// keeps issuing SELECTs and UPDATEs against its private table test.t<id>
// through MaxScale until stop() raises the shared shutdown flag. Shared
// configuration and bookkeeping live in static members.
class Client
{
public:
    enum
    {
        DEFAULT_N_CLIENTS = 4,  // default number of client threads
        DEFAULT_N_ROWS = 100    // default number of rows per table
    };

    // Creates one table per client (test.t0 .. test.t<nClients-1>), fills
    // each with nRows rows and waits until the slaves have caught up.
    static void init(TestConnections& test, size_t nClients, size_t nRows)
    {
        s_nClients = nClients;
        s_nRows = nRows;

        if (create_tables(test))
        {
            if (insert_data(test))
            {
                cout << "\nSyncing slaves." << endl;
                test.repl->sync_slaves();
            }
        }
    }

    // Launches s_nClients worker threads, each running thread_main().
    static void start(bool verbose,
                      const char* zHost, int port, const char* zUser, const char* zPassword)
    {
        for (size_t i = 0; i < s_nClients; ++i)
        {
            s_threads.push_back(std::thread(&Client::thread_main,
                                            i, verbose, zHost, port, zUser, zPassword));
        }
    }

    // Raises the shutdown flag and joins all worker threads.
    static void stop()
    {
        s_shutdown = true;

        for (size_t i = 0; i < s_nClients; ++i)
        {
            s_threads[i].join();
        }
    }

private:
    // Seeds a per-client PRNG; the client id is mixed into the seed so
    // concurrently started clients do not share a random sequence.
    // NOTE(review): glibc documents that the random_data struct should be
    // zeroed before the first initstate_r() call; m_random_data is not
    // explicitly zero-initialized here - verify.
    Client(int id, bool verbose)
        : m_id(id)
        , m_verbose(verbose)
        , m_value(1)
    {
        ss_debug(int rv);

        unsigned int seed = (time(NULL) << m_id);

        ss_debug(rv =) initstate_r(seed, m_initstate, sizeof(m_initstate), &m_random_data);
        ss_dassert(rv == 0);

        ss_debug(rv=) srandom_r(seed, &m_random_data);
        ss_dassert(rv == 0);
    }

    enum action_t
    {
        ACTION_SELECT,
        ACTION_UPDATE
    };

    // Picks the next action for the worker loop.
    action_t action() const
    {
        double d = random_decimal_fraction();

        // 20% updates
        // 80% selects
        if (d <= 0.2)
        {
            return ACTION_UPDATE;
        }
        else
        {
            return ACTION_SELECT;
        }
    }

    // Executes one randomly chosen statement; returns false on failure.
    bool run(MYSQL* pConn)
    {
        bool rv = false;

        switch (action())
        {
        case ACTION_SELECT:
            rv = run_select(pConn);
            break;

        case ACTION_UPDATE:
            rv = run_update(pConn);
            break;

        default:
            ss_dassert(!true);
        }

        return rv;
    }

    // SELECTs a random row from this client's own table.
    bool run_select(MYSQL* pConn)
    {
        bool rv = true;

        string stmt("SELECT * FROM test.t");
        stmt += std::to_string(m_id);
        stmt += " WHERE id=";
        stmt += std::to_string(get_random_id());

        if (mysql_query(pConn, stmt.c_str()) == 0)
        {
            flush_response(pConn);
        }
        else
        {
            if (m_verbose)
            {
                CMESSAGE("\"" << stmt << "\" failed: " << mysql_error(pConn));
            }
            rv = false;
        }

        return rv;
    }

    // UPDATEs a random row of this client's table with a running counter.
    bool run_update(MYSQL* pConn)
    {
        bool rv = true;

        string stmt("UPDATE test.t");
        stmt += std::to_string(m_id);
        stmt += " SET id=";
        stmt += std::to_string(m_value);
        stmt += " WHERE id=";
        stmt += std::to_string(get_random_id());

        m_value = (m_value + 1) % s_nRows;

        if (mysql_query(pConn, stmt.c_str()) == 0)
        {
            flush_response(pConn);
        }
        else
        {
            if (m_verbose)
            {
                CMESSAGE("\"" << stmt << "\" failed: " << mysql_error(pConn));
            }
            rv = false;
        }

        return rv;
    }

    // Drains all pending result sets so the connection can be reused.
    static void flush_response(MYSQL* pConn)
    {
        do
        {
            MYSQL_RES* pRes = mysql_store_result(pConn);
            mysql_free_result(pRes);
        }
        while (mysql_next_result(pConn) == 0);
    }

    // Returns a random row id in [0, s_nRows].
    // NOTE(review): the upper assert permits id == s_nRows even though
    // insert_data() only creates ids 0..s_nRows-1; such a statement just
    // matches no rows.
    int get_random_id() const
    {
        int id = s_nRows * random_decimal_fraction();

        ss_dassert(id >= 0);
        ss_dassert(id <= s_nRows);

        return id;
    }

    // Returns a pseudo-random double in [0, 1] (random_r yields a value
    // in [0, RAND_MAX]).
    double random_decimal_fraction() const
    {
        int32_t r;

        ss_debug(int rv=) random_r(&m_random_data, &r);
        ss_dassert(rv == 0);

        return double(r) / RAND_MAX;
    }

    // Worker loop: connect, run statements until shutdown or an error,
    // then reconnect and repeat. The sleep between rounds throttles the
    // reconnect rate.
    void run(const char* zHost, int port, const char* zUser, const char* zPassword)
    {
        do
        {
            MYSQL* pMysql = mysql_init(NULL);

            if (pMysql)
            {
                unsigned int timeout = 5;
                mysql_options(pMysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
                mysql_options(pMysql, MYSQL_OPT_READ_TIMEOUT, &timeout);
                mysql_options(pMysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout);

                if (m_verbose)
                {
                    CMESSAGE("Connecting");
                }

                if (mysql_real_connect(pMysql, zHost, zUser, zPassword, "test", port, NULL, 0))
                {
                    if (m_verbose)
                    {
                        CMESSAGE("Connected.");
                    }

                    while (!s_shutdown && run(pMysql))
                    {
                        ;
                    }
                }
                else
                {
                    if (m_verbose)
                    {
                        CMESSAGE("mysql_real_connect() failed: " << mysql_error(pMysql));
                    }
                }

                if (m_verbose)
                {
                    CMESSAGE("Closing");
                }

                mysql_close(pMysql);
            }
            else
            {
                CMESSAGE("mysql_init() failed.");
            }

            // To prevent some backend from becoming overwhelmed.
            sleep(1);
        }
        while (!s_shutdown);
    }

    // Thread entry point: sets up per-thread MySQL library state and runs
    // one Client instance until shutdown.
    static void thread_main(int i, bool verbose,
                            const char* zHost, int port, const char* zUser, const char* zPassword)
    {
        if (mysql_thread_init() == 0)
        {
            Client client(i, verbose);

            client.run(zHost, port, zUser, zPassword);

            mysql_thread_end();
        }
        else
        {
            // Local m_id exists only so the CMESSAGE macro (which refers
            // to m_id) can be used outside a member function.
            int m_id = i;
            CMESSAGE("mysql_thread_init() failed.");
        }
    }

    // (Re-)creates one empty table per client. Returns true on success.
    static bool create_tables(TestConnections& test)
    {
        cout << "\nCreating tables." << endl;

        MYSQL* pConn = test.maxscales->conn_rwsplit[0];

        string drop_head("DROP TABLE IF EXISTS test.t");
        string create_head("CREATE TABLE test.t");
        string create_tail(" (id INT)");

        for (size_t i = 0; i < s_nClients; ++i)
        {
            string drop = drop_head + std::to_string(i);
            test.try_query(pConn, drop.c_str());

            string create = create_head + std::to_string(i) + create_tail;
            test.try_query(pConn, create.c_str());
        }

        return test.ok();
    }

    // Inserts rows with ids 0..s_nRows-1 into every client table.
    static bool insert_data(TestConnections& test)
    {
        cout << "\nInserting data." << endl;

        MYSQL* pConn = test.maxscales->conn_rwsplit[0];

        for (size_t i = 0; i < s_nClients; ++i)
        {
            string insert("insert into test.t");
            insert += std::to_string(i);
            insert += " values ";

            for (size_t j = 0; j < s_nRows; ++j)
            {
                insert += "(";
                insert += std::to_string(j);
                insert += ")";

                if (j < s_nRows - 1)
                {
                    insert += ", ";
                }
            }

            test.try_query(pConn, insert.c_str());
        }

        return test.ok();
    }

private:
    enum
    {
        INITSTATE_SIZE = 32
    };

    size_t m_id;                 // client index; also the table suffix
    bool m_verbose;              // log per-statement progress?
    size_t m_value;              // running value used by run_update()
    char m_initstate[INITSTATE_SIZE];          // PRNG state buffer
    mutable struct random_data m_random_data;  // glibc reentrant PRNG state
    static size_t s_nClients;    // number of client threads
    static size_t s_nRows;       // rows per table
    static bool s_shutdown;      // set by stop() to end the worker loops
    static std::vector<std::thread> s_threads; // the worker threads
};
// Out-of-class definitions for Client's static data members.
size_t Client::s_nClients;
size_t Client::s_nRows;
bool Client::s_shutdown;
std::vector<std::thread> Client::s_threads;
}
namespace
{
// Prints the current state of all servers using the maxadmin
// "list servers" command.
void list_servers(TestConnections& test)
{
    test.maxscales->execute_maxadmin_command_print(0, (char*)"list servers");
}
// Sleeps for `s` seconds one second at a time, emitting a progress dot
// after every second so long waits remain visible on the console.
// Always sleeps at least once, even if s <= 0.
void sleep(int s)
{
    cout << "Sleeping " << s << " times 1 second" << flush;

    do
    {
        ::sleep(1);
        cout << "." << flush;
    }
    while (--s > 0);

    cout << endl;
}
// Prints and validates the state of a single server (1-based `id`).
// Returns true iff the server is currently the master. A server that is
// merely "Running" (neither master nor slave) fails the test, and its
// Last_IO_Error is printed to help diagnose why replication stopped.
bool check_server_status(TestConnections& test, int id)
{
    bool is_master = false;

    Mariadb_nodes* pRepl = test.repl;

    string server = string("server") + std::to_string(id);
    StringSet statuses = test.get_server_status(server.c_str());

    std::ostream_iterator<string> oi(cout, " ");

    cout << server << ": ";
    std::copy(statuses.begin(), statuses.end(), oi);
    cout << " => ";

    if (statuses.count("Master"))
    {
        is_master = true;
        cout << "OK" << endl;
    }
    else if (statuses.count("Slave"))
    {
        cout << "OK" << endl;
    }
    else if (statuses.count("Running"))
    {
        // Running but neither master nor slave: replication is broken.
        // Ask the node directly why the slave thread stopped.
        MYSQL* pConn = pRepl->nodes[id - 1];

        char result[1024];
        if (find_field(pConn, "SHOW SLAVE STATUS", "Last_IO_Error", result) == 0)
        {
            cout << result << endl;
            test.assert(false, "Server is neither slave, nor master.");
        }
        else
        {
            cout << "?" << endl;
            test.assert(false, "Could not execute \"SHOW SLAVE STATUS\"");
        }
    }
    else
    {
        cout << "?" << endl;
        test.assert(false, "Unexpected server state for %s.", server.c_str());
    }

    return is_master;
}
// Checks all four servers and verifies that exactly one of them is the
// master (check_server_status() returns true only for the master, so the
// bool results sum to the master count). Fails the test otherwise.
// Fix: corrected the typo "Unpexpected" in the assertion message.
void check_server_statuses(TestConnections& test)
{
    int masters = 0;

    masters += check_server_status(test, 1);
    masters += check_server_status(test, 2);
    masters += check_server_status(test, 3);
    masters += check_server_status(test, 4);

    test.assert(masters == 1, "Unexpected number of masters: %d", masters);
}
// Finds the next server after `current_id` that is in the Slave state and
// hence a valid switchover target. Candidates are probed in the cyclic
// order 1 -> 2 -> 3 -> 4 -> 1. Returns the id of the found server, or -1
// if no other server currently is a slave.
int get_next_master_id(TestConnections& test, int current_id)
{
    int candidate = current_id;

    do
    {
        candidate = (candidate % 4) + 1;    // cycle within 1..4
        ss_dassert(candidate >= 1);
        ss_dassert(candidate <= 4);

        string name("server");
        name += std::to_string(candidate);

        StringSet states = test.get_server_status(name.c_str());
        if (states.count("Slave") != 0)
        {
            break;
        }
    }
    while (candidate != current_id);

    return (candidate != current_id) ? candidate : -1;
}
// Re-creates the MySQL account used by the client threads (drop, create,
// grant, flush). The account string uses '%%' because try_query()
// presumably treats its argument as a printf-style format string -
// verify against the test library before changing.
void create_client_user(TestConnections& test)
{
    const string account = string("'") + CLIENT_USER + "'@'%%'";

    // Drop a possibly pre-existing user.
    string stmt = "DROP USER IF EXISTS " + account;
    test.try_query(test.maxscales->conn_rwsplit[0], stmt.c_str());

    // Create the user.
    stmt = "CREATE USER " + account + " IDENTIFIED BY '";
    stmt += CLIENT_PASSWORD;
    stmt += "'";
    test.try_query(test.maxscales->conn_rwsplit[0], stmt.c_str());

    // Grant the privileges the workload needs.
    stmt = "GRANT SELECT, INSERT, UPDATE ON *.* TO " + account;
    test.try_query(test.maxscales->conn_rwsplit[0], stmt.c_str());

    test.try_query(test.maxscales->conn_rwsplit[0], "FLUSH PRIVILEGES");
}
// Asks the monitor, via maxadmin, to promote server<next_master_id> and
// demote server<current_master_id>, then waits a moment and prints the
// resulting server states.
void switchover(TestConnections& test, int next_master_id, int current_master_id)
{
    cout << "\nTrying to do manual switchover from server" << current_master_id
         << " to server" << next_master_id << endl;

    string command = "call command mysqlmon switchover MySQL-Monitor server"
        + std::to_string(next_master_id)
        + " server"
        + std::to_string(current_master_id);

    cout << "\nCommand: " << command << endl;
    test.maxscales->execute_maxadmin_command_print(0, (char*)command.c_str());

    sleep(1);

    list_servers(test);
}
// Runs the actual test: starts the client load, then repeatedly performs
// a manual switchover to the next available slave, each time verifying
// that the monitor reports the chosen server as the new master. Finally
// stops the clients, switches the master back to server1 and checks that
// every server ended up in a sane state.
// Fix: removed the unused local variable `n_threads`.
void run(TestConnections& test)
{
    cout << "\nConnecting to MaxScale." << endl;
    test.maxscales->connect_maxscale();

    create_client_user(test);

    Client::init(test, Client::DEFAULT_N_CLIENTS, Client::DEFAULT_N_ROWS);

    if (test.ok())
    {
        const char* zHost = test.maxscales->IP[0];
        int port = test.maxscales->rwsplit_port[0];
        const char* zUser = CLIENT_USER;
        const char* zPassword = CLIENT_PASSWORD;

        cout << "Connecting to " << zHost << ":" << port << " as " << zUser << ":" << zPassword << endl;
        cout << "Starting clients." << endl;
        Client::start(test.verbose, zHost, port, zUser, zPassword);

        time_t start = time(NULL);

        list_servers(test);

        int current_master_id = 1;

        // Keep switching the master around until the time budget runs out
        // or some check fails.
        while ((test.global_result == 0) && (time(NULL) - start < TEST_DURATION))
        {
            sleep(SWITCHOVER_DURATION);

            int next_master_id = get_next_master_id(test, current_master_id);

            if (next_master_id != -1)
            {
                switchover(test, next_master_id, current_master_id);

                current_master_id = next_master_id;

                sleep(SWITCHOVER_DURATION);

                // Verify that the switchover target actually became master.
                int master_id = get_master_server_id(test);

                if (master_id < 0)
                {
                    test.assert(false, "No master available after switchover.");
                }
                else if (master_id != current_master_id)
                {
                    test.assert(false,
                                "Master should have been server%d, but it was server%d.",
                                current_master_id, master_id);
                }
            }
            else
            {
                test.assert(false,
                            "Could not find any slave to switch to.");
            }
        }

        cout << "\nStopping clients.\n" << flush;
        Client::stop();

        // Ensure master is at server1. Shortens startup time for next test.
        if (current_master_id != 1)
        {
            switchover(test, 1, current_master_id);
        }

        test.repl->close_connections();
        test.repl->connect();

        check_server_statuses(test);
    }
}
// Test entry point. The exit code is the accumulated test result
// (0 on success).
int main(int argc, char* argv[])
{
    std::ios::sync_with_stdio(true);

    // Switchover requires well-behaving GTID-based replication on the
    // backends, so ask the framework to enforce it before setup.
    Mariadb_nodes::require_gtid(true);
    TestConnections test(argc, argv);

    run(test);

    return test.global_result;
}

View File

@ -1398,15 +1398,15 @@ int main(int argc, char **argv)
char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */
THREAD log_flush_thr;
char* tmp_path;
char* tmp_var;
int option_index;
int *syslog_enabled = &config_get_global_options()->syslog; /** Log to syslog */
int *maxlog_enabled = &config_get_global_options()->maxlog; /** Log with MaxScale */
int *log_to_shm = &config_get_global_options()->log_to_shm; /** Log to shared memory */
MXS_CONFIG* cnf = config_get_global_options();
ss_dassert(cnf);
int *syslog_enabled = &cnf->syslog; /** Log to syslog */
int *maxlog_enabled = &cnf->maxlog; /** Log with MaxScale */
int *log_to_shm = &cnf->log_to_shm; /** Log to shared memory */
ssize_t log_flush_timeout_ms = 0;
sigset_t sigpipe_mask;
sigset_t saved_mask;
bool config_check = false;
bool to_stdout = false;
void (*exitfunp[4])(void) = { mxs_log_finish, cleanup_process_datadir, write_footer, NULL };
int numlocks = 0;
@ -1415,7 +1415,6 @@ int main(int argc, char **argv)
const char* specified_user = NULL;
config_set_global_defaults();
MXS_CONFIG* cnf = config_get_global_options();
ss_dassert(cnf);
maxscale_reset_starttime();
@ -1719,7 +1718,7 @@ int main(int argc, char **argv)
goto return_main;
case 'c':
config_check = true;
cnf->config_check = true;
break;
case 'p':
@ -1753,7 +1752,7 @@ int main(int argc, char **argv)
goto return_main;
}
if (config_check)
if (cnf->config_check)
{
daemon_mode = false;
to_stdout = true;
@ -1918,7 +1917,7 @@ int main(int argc, char **argv)
{
bool succp;
if (mkdir(get_logdir(), 0777) != 0 && errno != EEXIST)
if (!cnf->config_check && mkdir(get_logdir(), 0777) != 0 && errno != EEXIST)
{
fprintf(stderr,
"Error: Cannot create log directory: %s\n",
@ -1965,19 +1964,23 @@ int main(int argc, char **argv)
MXS_NOTICE("Commit: %s", MAXSCALE_COMMIT);
#endif
/*
* Set the data directory. We use a unique directory name to avoid conflicts
* if multiple instances of MaxScale are being run on the same machine.
*/
if (create_datadir(get_datadir(), datadir))
if (!cnf->config_check)
{
set_process_datadir(datadir);
}
else
{
MXS_ERROR("Cannot create data directory '%s': %d %s\n",
datadir, errno, mxs_strerror(errno));
goto return_main;
/*
* Set the data directory. We use a unique directory name to avoid conflicts
* if multiple instances of MaxScale are being run on the same machine.
*/
if (create_datadir(get_datadir(), datadir))
{
set_process_datadir(datadir);
}
else
{
char errbuf[MXS_STRERROR_BUFLEN];
MXS_ERROR("Cannot create data directory '%s': %d %s\n",
datadir, errno, strerror_r(errno, errbuf, sizeof(errbuf)));
goto return_main;
}
}
if (!daemon_mode)
@ -2027,9 +2030,7 @@ int main(int argc, char **argv)
goto return_main;
}
cnf->config_check = config_check;
if (!config_check)
if (!cnf->config_check)
{
/** Check if a MaxScale process is already running */
if (pid_file_exists())
@ -2130,7 +2131,7 @@ int main(int argc, char **argv)
goto return_main;
}
if (config_check)
if (cnf->config_check)
{
MXS_NOTICE("Configuration was successfully verified.");
rc = MAXSCALE_SHUTDOWN;

View File

@ -184,7 +184,10 @@ int validate_mysql_user(MYSQL_AUTH* instance, DCB *dcb, MYSQL_session *session,
uint8_t *scramble, size_t scramble_len)
{
sqlite3 *handle = get_handle(instance);
size_t len = sizeof(mysqlauth_validate_user_query) + strlen(session->user) * 2 +
const char* validate_query = instance->lower_case_table_names ?
mysqlauth_validate_user_query_lower :
mysqlauth_validate_user_query;
size_t len = strlen(validate_query) + 1 + strlen(session->user) * 2 +
strlen(session->db) * 2 + MYSQL_HOST_MAXLEN + session->auth_token_len * 4 + 1;
char sql[len + 1];
int rval = MXS_AUTH_FAILED;
@ -196,7 +199,7 @@ int validate_mysql_user(MYSQL_AUTH* instance, DCB *dcb, MYSQL_session *session,
}
else
{
sprintf(sql, mysqlauth_validate_user_query, session->user, dcb->remote,
sprintf(sql, validate_query, session->user, dcb->remote,
dcb->remote, session->db, session->db);
}
@ -212,7 +215,7 @@ int validate_mysql_user(MYSQL_AUTH* instance, DCB *dcb, MYSQL_session *session,
if (!res.ok && strchr(dcb->remote, ':') && strchr(dcb->remote, '.'))
{
const char *ipv4 = strrchr(dcb->remote, ':') + 1;
sprintf(sql, mysqlauth_validate_user_query, session->user, ipv4, ipv4,
sprintf(sql, validate_query, session->user, ipv4, ipv4,
session->db, session->db);
if (sqlite3_exec(handle, sql, auth_cb, &res, &err) != SQLITE_OK)
@ -231,7 +234,7 @@ int validate_mysql_user(MYSQL_AUTH* instance, DCB *dcb, MYSQL_session *session,
char client_hostname[MYSQL_HOST_MAXLEN] = "";
get_hostname(dcb, client_hostname, sizeof(client_hostname) - 1);
sprintf(sql, mysqlauth_validate_user_query, session->user, client_hostname,
sprintf(sql, validate_query, session->user, client_hostname,
client_hostname, session->db, session->db);
if (sqlite3_exec(handle, sql, auth_cb, &res, &err) != SQLITE_OK)

View File

@ -179,6 +179,7 @@ static void* mysql_auth_init(char **options)
instance->inject_service_user = true;
instance->skip_auth = false;
instance->check_permissions = true;
instance->lower_case_table_names = false;
for (int i = 0; options[i]; i++)
{
@ -204,6 +205,10 @@ static void* mysql_auth_init(char **options)
{
instance->skip_auth = config_truth_value(value);
}
else if (strcmp(options[i], "lower_case_table_names") == 0)
{
instance->lower_case_table_names = config_truth_value(value);
}
else
{
MXS_ERROR("Unknown authenticator option: %s", options[i]);

View File

@ -66,6 +66,12 @@ static const char mysqlauth_validate_user_query[] =
" WHERE user = '%s' AND ( '%s' = host OR '%s' LIKE host) AND (anydb = '1' OR '%s' = '' OR '%s' LIKE db)"
" LIMIT 1";
/** Query that checks if there's a grant for the user being authenticated */
static const char mysqlauth_validate_user_query_lower[] =
"SELECT password FROM " MYSQLAUTH_USERS_TABLE_NAME
" WHERE user = '%s' AND ( '%s' = host OR '%s' LIKE host) AND (anydb = '1' OR '%s' = '' OR LOWER('%s') LIKE LOWER(db))"
" LIMIT 1";
/** Query that only checks if there's a matching user */
static const char mysqlauth_skip_auth_query[] =
"SELECT password FROM " MYSQLAUTH_USERS_TABLE_NAME
@ -111,6 +117,7 @@ typedef struct mysql_auth
bool inject_service_user; /**< Inject the service user into the list of users */
bool skip_auth; /**< Authentication will always be successful */
bool check_permissions;
bool lower_case_table_names; /**< Disable database case-sensitivity */
} MYSQL_AUTH;
/**

View File

@ -39,9 +39,9 @@ char VERSION_STRING[] = "V1.0.0";
bool masking_command_reload(const MODULECMD_ARG* pArgs, json_t** output)
{
ss_dassert(pArgs->argc == 1);
ss_dassert(MODULECMD_GET_TYPE(&pArgs->argv[1].type) == MODULECMD_ARG_FILTER);
ss_dassert(MODULECMD_GET_TYPE(&pArgs->argv[0].type) == MODULECMD_ARG_FILTER);
const MXS_FILTER_DEF* pFilterDef = pArgs->argv[1].value.filter;
const MXS_FILTER_DEF* pFilterDef = pArgs->argv[0].value.filter;
ss_dassert(pFilterDef);
MaskingFilter* pFilter = reinterpret_cast<MaskingFilter*>(filter_def_get_instance(pFilterDef));
@ -178,9 +178,17 @@ bool MaskingFilter::reload()
if (sRules.get())
{
MXS_NOTICE("Rules for masking filter '%s' were reloaded from '%s'.",
m_config.name().c_str(), m_config.rules().c_str());
m_sRules = sRules;
rval = true;
}
else
{
MXS_ERROR("Rules for masking filter '%s' could not be reloaded from '%s'.",
m_config.name().c_str(), m_config.rules().c_str());
}
return rval;
}

View File

@ -485,9 +485,11 @@ MaskingRules::MatchRule::MatchRule(const std::string& column,
const std::vector<SAccount>& applies_to,
const std::vector<SAccount>& exempted,
pcre2_code* regexp,
const std::string& value,
const std::string& fill)
: MaskingRules::Rule::Rule(column, table, database, applies_to, exempted)
, m_regexp(regexp)
, m_value(value)
, m_fill(fill)
{
}
@ -767,13 +769,15 @@ bool rule_get_values(json_t* pRule,
*
* @param pRule The Json rule doc
* @param pMatch The string buffer for 'match'value
* @param pValue The string buffer for 'value' value
* @param pFill The string buffer for 'fill' value
*
* @return True on success, false on errors
*/
bool rule_get_match_fill(json_t* pRule,
std::string *pMatch,
std::string* pFill)
bool rule_get_match_value_fill(json_t* pRule,
std::string *pMatch,
std::string* pValue,
std::string* pFill)
{
// Get the 'with' key from the rule
json_t* pWith = json_object_get(pRule, KEY_WITH);
@ -794,17 +798,22 @@ bool rule_get_match_fill(json_t* pRule,
// Get fill from 'with' object
json_t* pTheFill = rule_get_fill(pWith);
// Get value from 'with' object
json_t* pTheValue = json_object_get(pWith, KEY_VALUE);
// Get 'match' from 'replace' ojbect
json_t* pTheMatch = json_object_get(pKeyObj, KEY_MATCH);
// Check values
// Check values: 'match' and 'fill' are mandatory (if not provided, there will be
// a default 'fill'), while 'value' is optional, but if provided it must be a string.
if ((!pTheFill || !json_is_string(pTheFill)) ||
(pTheValue && !json_is_string(pTheValue)) ||
((!pTheMatch || !json_is_string(pTheMatch))))
{
MXS_ERROR("A masking '%s' rule has '%s' and/or '%s' "
MXS_ERROR("A masking '%s' rule has '%s', '%s' and/or '%s' "
"invalid Json strings.",
KEY_REPLACE,
KEY_MATCH,
KEY_VALUE,
KEY_FILL);
return false;
}
@ -814,6 +823,11 @@ bool rule_get_match_fill(json_t* pRule,
pFill->assign(json_string_value(pTheFill));
pMatch->assign(json_string_value(pTheMatch));
if (pTheValue)
{
pValue->assign(json_string_value(pTheValue));
}
return true;
}
}
@ -984,9 +998,10 @@ auto_ptr<MaskingRules::Rule> MaskingRules::MatchRule::create_from(json_t* pRule)
&table,
&database,
KEY_REPLACE) &&
rule_get_match_fill(pRule, // get match/fill
&match,
&fill))
rule_get_match_value_fill(pRule, // get match/value/fill
&match,
&value,
&fill))
{
if (!match.empty() && !fill.empty())
@ -1004,6 +1019,7 @@ auto_ptr<MaskingRules::Rule> MaskingRules::MatchRule::create_from(json_t* pRule)
applies_to,
exempted,
pCode,
value,
fill));
// Ownership of pCode has been moved to the MatchRule object.
@ -1162,7 +1178,14 @@ void MaskingRules::MatchRule::rewrite(LEncString& s) const
}
// Copy the fill string into substring
fill_buffer(m_fill.begin(), m_fill.end(), i, i + substring_len);
if (m_value.length() == substring_len)
{
std::copy(m_value.begin(), m_value.end(), i);
}
else
{
fill_buffer(m_fill.begin(), m_fill.end(), i, i + substring_len);
}
// Set offset to the end of Full Match substring or break
startoffset = ovector[1];
@ -1187,7 +1210,7 @@ void MaskingRules::ObfuscateRule::rewrite(LEncString& s) const
LEncString::iterator i = s.begin();
size_t c = *i + i_len;
for (LEncString::iterator i = s.begin(); i <= s.end(); i++)
for (LEncString::iterator i = s.begin(); i != s.end(); i++)
{
// ASCII 32 is first printable char
unsigned char d = abs((char)(*i ^ c)) + 32;

View File

@ -268,6 +268,7 @@ public:
const std::vector<SAccount>& applies_to,
const std::vector<SAccount>& exempted,
pcre2_code* regexp,
const std::string& value,
const std::string& fill);
~MatchRule();
@ -277,6 +278,10 @@ public:
return *m_regexp;
}
const std::string& value() const
{
return m_value;
}
const std::string& fill() const
{
return m_fill;
@ -301,6 +306,7 @@ public:
private:
pcre2_code* m_regexp;
std::string m_value;
std::string m_fill;
private:

View File

@ -107,9 +107,9 @@ typedef struct
char *unified_filename; /* Filename of the unified log file */
bool flush_writes; /* Flush log file after every write? */
bool append; /* Open files in append-mode? */
/* Avoid repeatedly printing some errors/warnings. */
bool write_warning_given;
char *query_newline; /* Character(s) used to replace a newline within a query */
char *separator; /* Character(s) used to separate elements */
bool write_warning_given; /* Avoid repeatedly printing some errors/warnings. */
} QLA_INSTANCE;
/**
@ -156,8 +156,8 @@ typedef struct
LOG_EVENT_DATA event_data; /* Information about the latest event, required if logging execution time. */
} QLA_SESSION;
static FILE* open_log_file(uint32_t, QLA_INSTANCE *, const char *);
static int write_log_entry(uint32_t, FILE*, QLA_INSTANCE*, QLA_SESSION*,
static FILE* open_log_file(QLA_INSTANCE *, uint32_t, const char *);
static int write_log_entry(FILE*, QLA_INSTANCE*, QLA_SESSION*, uint32_t,
const char*, const char*, size_t, int);
static bool cb_log(const MODULECMD_ARG *argv, json_t** output);
@ -197,6 +197,8 @@ static const char PARAM_LOG_TYPE[] = "log_type";
static const char PARAM_LOG_DATA[] = "log_data";
static const char PARAM_FLUSH[] = "flush";
static const char PARAM_APPEND[] = "append";
static const char PARAM_NEWLINE[] = "newline_replacement";
static const char PARAM_SEPARATOR[] = "separator";
MXS_BEGIN_DECLS
@ -300,6 +302,18 @@ MXS_MODULE* MXS_CREATE_MODULE()
MXS_MODULE_OPT_NONE,
log_data_values
},
{
PARAM_NEWLINE,
MXS_MODULE_PARAM_QUOTEDSTRING,
" ",
MXS_MODULE_OPT_NONE
},
{
PARAM_SEPARATOR,
MXS_MODULE_PARAM_QUOTEDSTRING,
",",
MXS_MODULE_OPT_NONE
},
{
PARAM_FLUSH,
MXS_MODULE_PARAM_BOOL,
@ -350,6 +364,8 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
my_instance->flush_writes = config_get_bool(params, PARAM_FLUSH);
my_instance->log_file_data_flags = config_get_enum(params, PARAM_LOG_DATA, log_data_values);
my_instance->log_mode_flags = config_get_enum(params, PARAM_LOG_TYPE, log_type_values);
my_instance->query_newline = config_copy_string(params, PARAM_NEWLINE);
my_instance->separator = config_copy_string(params, PARAM_SEPARATOR);
my_instance->match = config_copy_string(params, PARAM_MATCH);
my_instance->exclude = config_copy_string(params, PARAM_EXCLUDE);
@ -378,8 +394,7 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
{
snprintf(filename, namelen, "%s.unified", my_instance->filebase);
// Open the file. It is only closed at program exit
my_instance->unified_fp = open_log_file(my_instance->log_file_data_flags,
my_instance, filename);
my_instance->unified_fp = open_log_file(my_instance, my_instance->log_file_data_flags, filename);
if (my_instance->unified_fp == NULL)
{
@ -413,6 +428,8 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
MXS_FREE(my_instance->filebase);
MXS_FREE(my_instance->source);
MXS_FREE(my_instance->user_name);
MXS_FREE(my_instance->query_newline);
MXS_FREE(my_instance->separator);
MXS_FREE(my_instance);
my_instance = NULL;
}
@ -485,7 +502,7 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
{
uint32_t data_flags = (my_instance->log_file_data_flags &
~LOG_DATA_SESSION); // No point printing "Session"
my_session->fp = open_log_file(data_flags, my_instance, my_session->filename);
my_session->fp = open_log_file(my_instance, data_flags, my_session->filename);
if (my_session->fp == NULL)
{
@ -574,22 +591,21 @@ setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *ups
*
* @param my_instance Filter instance
* @param my_session Filter session
* @param date_string Date string
* @param query Query string, not 0-terminated
* @param querylen Query string length
* @param date_string Date string
* @param elapsed_ms Query execution time, in milliseconds
*/
void write_log_entries(QLA_INSTANCE* my_instance, QLA_SESSION* my_session,
const char* query, int querylen, const char* date_string, int elapsed_ms)
const char* date_string, const char* query, int querylen, int elapsed_ms)
{
bool write_error = false;
if (my_instance->log_mode_flags & CONFIG_FILE_SESSION)
{
// In this case there is no need to write the session
// number into the files.
uint32_t data_flags = (my_instance->log_file_data_flags &
~LOG_DATA_SESSION);
if (write_log_entry(data_flags, my_session->fp, my_instance, my_session,
uint32_t data_flags = (my_instance->log_file_data_flags & ~LOG_DATA_SESSION);
if (write_log_entry(my_session->fp, my_instance, my_session, data_flags,
date_string, query, querylen, elapsed_ms) < 0)
{
write_error = true;
@ -598,7 +614,7 @@ void write_log_entries(QLA_INSTANCE* my_instance, QLA_SESSION* my_session,
if (my_instance->log_mode_flags & CONFIG_FILE_UNIFIED)
{
uint32_t data_flags = my_instance->log_file_data_flags;
if (write_log_entry(data_flags, my_instance->unified_fp, my_instance, my_session,
if (write_log_entry(my_instance->unified_fp, my_instance, my_session, data_flags,
date_string, query, querylen, elapsed_ms) < 0)
{
write_error = true;
@ -667,7 +683,7 @@ routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
else
{
// If execution times are not logged, write the log entry now.
write_log_entries(my_instance, my_session, query, query_len, event.query_date, -1);
write_log_entries(my_instance, my_session, event.query_date, query, query_len, -1);
}
}
/* Pass the query downstream */
@ -703,7 +719,7 @@ clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
// Calculate elapsed time in milliseconds.
double elapsed_ms = 1E3 * (now.tv_sec - event.begin_time.tv_sec) +
(now.tv_nsec - event.begin_time.tv_nsec) / (double)1E6;
write_log_entries(my_instance, my_session, query, query_len, event.query_date,
write_log_entries(my_instance, my_session, event.query_date, query, query_len,
std::floor(elapsed_ms + 0.5));
clear(event);
}
@ -811,12 +827,13 @@ static uint64_t getCapabilities(MXS_FILTER* instance)
/**
* Open the log file and print a header if appropriate.
* @param data_flags Data save settings flags
*
* @param instance The filter instance
* @param data_flags Data save settings flags
* @param filename Target file path
* @return A valid file on success, null otherwise.
*/
static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const char *filename)
static FILE* open_log_file(QLA_INSTANCE *instance, uint32_t data_flags, const char *filename)
{
bool file_existed = false;
FILE *fp = NULL;
@ -843,68 +860,56 @@ static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const ch
}
}
}
if (fp && !file_existed)
if (fp && !file_existed && data_flags != 0)
{
// Print a header. Luckily, we know the header has limited length
const char SERVICE[] = "Service,";
const char SESSION[] = "Session,";
const char DATE[] = "Date,";
const char USERHOST[] = "User@Host,";
const char QUERY[] = "Query,";
const char REPLY_TIME[] = "Reply_time,";
const int headerlen = sizeof(SERVICE) + sizeof(SERVICE) + sizeof(DATE) +
sizeof(USERHOST) + sizeof(QUERY) + sizeof(REPLY_TIME);
// Print a header.
const char SERVICE[] = "Service";
const char SESSION[] = "Session";
const char DATE[] = "Date";
const char USERHOST[] = "User@Host";
const char QUERY[] = "Query";
const char REPLY_TIME[] = "Reply_time";
char print_str[headerlen];
memset(print_str, '\0', headerlen);
std::stringstream header;
const char* curr_sep = ""; // Use empty string as the first separator
const char* real_sep = instance->separator;
char *current_pos = print_str;
if (instance->log_file_data_flags & LOG_DATA_SERVICE)
if (data_flags & LOG_DATA_SERVICE)
{
strcat(current_pos, SERVICE);
current_pos += sizeof(SERVICE) - 1;
header << SERVICE;
curr_sep = real_sep;
}
if (instance->log_file_data_flags & LOG_DATA_SESSION)
if (data_flags & LOG_DATA_SESSION)
{
strcat(current_pos, SESSION);
current_pos += sizeof(SERVICE) - 1;
header << curr_sep << SESSION;
curr_sep = real_sep;
}
if (instance->log_file_data_flags & LOG_DATA_DATE)
if (data_flags & LOG_DATA_DATE)
{
strcat(current_pos, DATE);
current_pos += sizeof(DATE) - 1;
header << curr_sep << DATE;
curr_sep = real_sep;
}
if (instance->log_file_data_flags & LOG_DATA_USER)
if (data_flags & LOG_DATA_USER)
{
strcat(current_pos, USERHOST);
current_pos += sizeof(USERHOST) - 1;
header << curr_sep << USERHOST;
curr_sep = real_sep;
}
if (instance->log_file_data_flags & LOG_DATA_REPLY_TIME)
if (data_flags & LOG_DATA_REPLY_TIME)
{
strcat(current_pos, REPLY_TIME);
current_pos += sizeof(REPLY_TIME) - 1;
header << curr_sep << REPLY_TIME;
curr_sep = real_sep;
}
if (instance->log_file_data_flags & LOG_DATA_QUERY)
if (data_flags & LOG_DATA_QUERY)
{
strcat(current_pos, QUERY);
current_pos += sizeof(QUERY) - 1;
}
if (current_pos > print_str)
{
// Overwrite the last ','.
*(current_pos - 1) = '\n';
}
else
{
// Nothing to print
return fp;
header << curr_sep << QUERY;
}
header << '\n';
// Finally, write the log header.
int written = fprintf(fp, "%s", print_str);
int written = fprintf(fp, "%s", header.str().c_str());
if ((written <= 0) ||
((instance->flush_writes) && (fflush(fp) < 0)))
if ((written <= 0) || ((instance->flush_writes) && (fflush(fp) < 0)))
{
// Weird error, file opened but a write failed. Best to stop.
fclose(fp);
@ -915,151 +920,129 @@ static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const ch
return fp;
}
/**
 * Write an SQL string to the output stream, substituting every newline
 * sequence with a configured replacement string.
 *
 * A newline is any of "\r\n", "\r" or "\n"; a lone "\r\n" pair counts as a
 * single newline, not two.
 *
 * @param sql_string   Source SQL text, *not* null-terminated
 * @param sql_str_len  Length of @c sql_string in bytes
 * @param rep_newline  Replacement text printed in place of each newline
 * @param output       Target stream, must not be NULL
 */
static void print_string_replace_newlines(const char *sql_string,
                                          size_t sql_str_len, const char* rep_newline,
                                          std::stringstream* output)
{
    ss_dassert(output);
    size_t seg_start = 0; // Start of the current newline-free segment
    size_t pos = 0;

    while (pos < sql_str_len)
    {
        // Number of characters forming a newline at 'pos' (0 = none)
        size_t nl_len = 0;
        char c = sql_string[pos];

        if (c == '\n')
        {
            nl_len = 1;
        }
        else if (c == '\r')
        {
            // "\r\n" is consumed as one newline, a bare "\r" as another
            nl_len = (pos + 1 < sql_str_len && sql_string[pos + 1] == '\n') ? 2 : 1;
        }

        if (nl_len == 0)
        {
            pos++;
        }
        else
        {
            // Flush the text before the newline, then the replacement
            output->write(&sql_string[seg_start], pos - seg_start);
            *output << rep_newline;
            pos += nl_len;
            seg_start = pos;
        }
    }

    // Flush whatever remains after the last newline (if anything)
    if (seg_start < sql_str_len)
    {
        output->write(&sql_string[seg_start], sql_str_len - seg_start);
    }
}
/**
* Write an entry to the log file.
*
* @param data_flags Controls what to write
* @param logfile Target file
* @param instance Filter instance
* @param session Filter session
* @param data_flags Controls what to write
* @param time_string Date entry
* @param sql_string SQL-query, *not* NULL terminated
* @param sql_str_len Length of SQL-string
* @param elapsed_ms Query execution time, in milliseconds
* @return The number of characters written, or a negative value on failure
*/
static int write_log_entry(uint32_t data_flags, FILE *logfile, QLA_INSTANCE *instance,
QLA_SESSION *session, const char *time_string, const char *sql_string,
size_t sql_str_len, int elapsed_ms)
static int write_log_entry(FILE *logfile, QLA_INSTANCE *instance, QLA_SESSION *session, uint32_t data_flags,
const char *time_string, const char *sql_string, size_t sql_str_len,
int elapsed_ms)
{
ss_dassert(logfile != NULL);
size_t print_len = 0;
/**
* First calculate an upper limit for the total length. The strlen()-calls
* could be removed if the values would be saved into the instance or session
* or if we had some reasonable max lengths. (Apparently there are max lengths
* but they are much higher than what is typically needed.)
*/
if (data_flags == 0)
{
// Nothing to print
return 0;
}
/* Printing to the file in parts would likely cause garbled printing if several threads write
* simultaneously, so we have to first print to a string. */
std::stringstream output;
const char* curr_sep = ""; // Use empty string as the first separator
const char* real_sep = instance->separator;
// The numbers have some extra for delimiters.
const size_t integer_chars = 20; // Enough space for any integer type
if (data_flags & LOG_DATA_SERVICE)
{
print_len += strlen(session->service) + 1;
output << session->service;
curr_sep = real_sep;
}
if (data_flags & LOG_DATA_SESSION)
{
print_len += integer_chars;
output << curr_sep << session->ses_id;
curr_sep = real_sep;
}
if (data_flags & LOG_DATA_DATE)
{
print_len += QLA_DATE_BUFFER_SIZE + 1;
output << curr_sep << time_string;
curr_sep = real_sep;
}
if (data_flags & LOG_DATA_USER)
{
print_len += strlen(session->user) + strlen(session->remote) + 2;
output << curr_sep << session->user << "@" << session->remote;
curr_sep = real_sep;
}
if (data_flags & LOG_DATA_REPLY_TIME)
{
print_len += integer_chars;
output << curr_sep << elapsed_ms;
curr_sep = real_sep;
}
if (data_flags & LOG_DATA_QUERY)
{
print_len += sql_str_len + 1; // Can't use strlen, not null-terminated
}
if (print_len == 0)
{
return 0; // Nothing to print
}
/* Allocate space for a buffer. Printing to the file in parts would likely
cause garbled printing if several threads write simultaneously, so we
have to first print to a string. */
char *print_str = NULL;
if ((print_str = (char*)MXS_CALLOC(print_len, sizeof(char))) == NULL)
{
return -1;
}
bool error = false;
char *current_pos = print_str;
int rval = 0;
if (!error && (data_flags & LOG_DATA_SERVICE))
{
if ((rval = sprintf(current_pos, "%s,", session->service)) < 0)
output << curr_sep;
if (*instance->query_newline)
{
error = true;
print_string_replace_newlines(sql_string, sql_str_len, instance->query_newline, &output);
}
else
{
current_pos += rval;
// The newline replacement is an empty string so print the query as is
output.write(sql_string, sql_str_len); // non-null-terminated string
}
}
if (!error && (data_flags & LOG_DATA_SESSION))
{
if ((rval = sprintf(current_pos, "%lu,", session->ses_id)) < 0)
{
error = true;
}
else
{
current_pos += rval;
}
}
if (!error && (data_flags & LOG_DATA_DATE))
{
if ((rval = sprintf(current_pos, "%s,", time_string)) < 0)
{
error = true;
}
else
{
current_pos += rval;
}
}
if (!error && (data_flags & LOG_DATA_USER))
{
if ((rval = sprintf(current_pos, "%s@%s,", session->user, session->remote)) < 0)
{
error = true;
}
else
{
current_pos += rval;
}
}
if (!error && (data_flags & LOG_DATA_REPLY_TIME))
{
if ((rval = sprintf(current_pos, "%d,", elapsed_ms)) < 0)
{
error = true;
}
else
{
current_pos += rval;
}
}
if (!error && (data_flags & LOG_DATA_QUERY))
{
strncat(current_pos, sql_string, sql_str_len); // non-null-terminated string
current_pos += sql_str_len + 1; // +1 to move to the next char after
}
if (error || current_pos <= print_str)
{
MXS_FREE(print_str);
MXS_ERROR("qlafilter ('%s'): Failed to format log event.", instance->name);
return -1;
}
else
{
// Overwrite the last ','. The rest is already filled with 0.
*(current_pos - 1) = '\n';
}
output << "\n";
// Finally, write the log event.
int written = fprintf(logfile, "%s", print_str);
MXS_FREE(print_str);
int written = fprintf(logfile, "%s", output.str().c_str());
if ((!instance->flush_writes) || (written <= 0))
{

View File

@ -98,6 +98,7 @@ static void set_slave_heartbeat(MXS_MONITOR *, MXS_MONITORED_SERVER *);
static int add_slave_to_master(long *, int, long);
static bool isMySQLEvent(mxs_monitor_event_t event);
void check_maxscale_schema_replication(MXS_MONITOR *monitor);
static MySqlServerInfo* get_server_info(const MYSQL_MONITOR* handle, const MXS_MONITORED_SERVER* db);
static bool mon_process_failover(MYSQL_MONITOR*, uint32_t, bool*);
static bool do_failover(MYSQL_MONITOR* mon, json_t** output);
static bool do_switchover(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* current_master,
@ -107,9 +108,19 @@ static bool update_replication_settings(MXS_MONITORED_SERVER *database, MySqlSer
static bool query_one_row(MXS_MONITORED_SERVER *database, const char* query, unsigned int expected_cols,
StringVector* output);
static void read_server_variables(MXS_MONITORED_SERVER* database, MySqlServerInfo* serv_info);
static int check_and_join_cluster(MYSQL_MONITOR* mon);
static bool server_is_rejoin_suspect(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* server,
MySqlServerInfo* master_info);
static bool get_joinable_servers(MYSQL_MONITOR* mon, ServerVector* output);
static uint32_t do_rejoin(MYSQL_MONITOR* mon, const ServerVector& servers);
static bool join_cluster(MXS_MONITORED_SERVER* server, const char* change_cmd);
static void disable_setting(MYSQL_MONITOR* mon, const char* setting);
static bool cluster_can_be_joined(MYSQL_MONITOR* mon);
static bool can_replicate_from(MYSQL_MONITOR* mon,
MXS_MONITORED_SERVER* slave, MySqlServerInfo* slave_info,
MXS_MONITORED_SERVER* master, MySqlServerInfo* master_info);
static bool wait_cluster_stabilization(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_master,
const ServerVector& slaves, int seconds_remaining);
static string get_connection_errors(const ServerVector& servers);
static bool report_version_err = true;
static const char* hb_table_name = "maxscale_schema.replication_heartbeat";
@ -528,6 +539,118 @@ bool mysql_handle_failover(const MODULECMD_ARG* args, json_t** output)
return rv;
}
/**
 * Perform a user-activated rejoin of a single server to the cluster.
 *
 * The monitor is stopped for the duration of the operation (and restarted
 * afterwards only if this function was the one that stopped it). The target
 * server must be monitored by this monitor, must look like a rejoin
 * candidate and must be able to replicate from the current master before
 * the actual rejoin is attempted.
 *
 * @param mon           Cluster monitor
 * @param rejoin_server Server to join into the cluster
 * @param output        Json error output, written on failure
 * @return True if the rejoin was performed successfully
 */
bool mysql_rejoin(MXS_MONITOR* mon, SERVER* rejoin_server, json_t** output)
{
    MYSQL_MONITOR *handle = static_cast<MYSQL_MONITOR*>(mon->handle);
    // stop_monitor() returning true means the monitor was running and we
    // stopped it; false means it was already stopped.
    bool stopped = stop_monitor(mon);
    if (stopped)
    {
        MXS_NOTICE("Stopped monitor %s for the duration of rejoin.", mon->name);
    }
    else
    {
        MXS_NOTICE("Monitor %s already stopped, rejoin can proceed.", mon->name);
    }
    bool rval = false;
    if (cluster_can_be_joined(handle))
    {
        MXS_MONITORED_SERVER* mon_server = NULL;
        // Search for the MONITORED_SERVER. Could this be a general monitor function?
        for (MXS_MONITORED_SERVER* iterator = mon->monitored_servers;
             iterator != NULL && mon_server == NULL;
             iterator = iterator->next)
        {
            if (iterator->server == rejoin_server)
            {
                mon_server = iterator;
            }
        }
        if (mon_server)
        {
            // NOTE(review): cluster_can_be_joined() presumably guarantees
            // handle->master is non-NULL here — confirm against its definition.
            MXS_MONITORED_SERVER* master = handle->master;
            MySqlServerInfo* master_info = get_server_info(handle, master);
            MySqlServerInfo* server_info = get_server_info(handle, mon_server);
            // All three checks must pass: the server looks like it has fallen
            // out of the cluster, the master's gtid state is current, and the
            // server's gtid position is compatible with the master's.
            if (server_is_rejoin_suspect(handle, mon_server, master_info) &&
                update_gtids(handle, master, master_info) &&
                can_replicate_from(handle, mon_server, server_info, master, master_info))
            {
                ServerVector joinable_server;
                joinable_server.push_back(mon_server);
                // do_rejoin() returns the number of servers successfully
                // joined; we passed exactly one, so expect 1 on success.
                if (do_rejoin(handle, joinable_server) == 1)
                {
                    rval = true;
                    MXS_NOTICE("Rejoin performed.");
                }
                else
                {
                    PRINT_MXS_JSON_ERROR(output, "Rejoin attempted but failed.");
                }
            }
            else
            {
                PRINT_MXS_JSON_ERROR(output, "Server is not eligible for rejoin or eligibility could not be "
                                     "ascertained.");
            }
        }
        else
        {
            PRINT_MXS_JSON_ERROR(output, "The given server '%s' is not monitored by this monitor.",
                                 rejoin_server->unique_name);
        }
    }
    else
    {
        const char BAD_CLUSTER[] = "The server cluster of monitor '%s' is not in a state valid for joining. "
                                   "Either it has no master or its gtid domain is unknown.";
        PRINT_MXS_JSON_ERROR(output, BAD_CLUSTER, mon->name);
    }
    // Only restart the monitor if this function stopped it.
    if (stopped)
    {
        startMonitor(mon, mon->parameters);
    }
    return rval;
}
/**
* Command handler for 'rejoin'
*
* @param args Arguments given by user
* @param output Json error output
* @return True on success
*/
bool mysql_handle_rejoin(const MODULECMD_ARG* args, json_t** output)
{
ss_dassert(args->argc == 2);
ss_dassert(MODULECMD_GET_TYPE(&args->argv[0].type) == MODULECMD_ARG_MONITOR);
ss_dassert(MODULECMD_GET_TYPE(&args->argv[1].type) == MODULECMD_ARG_SERVER);
MXS_MONITOR* mon = args->argv[0].value.monitor;
SERVER* server = args->argv[1].value.server;
bool rv = false;
if (!config_get_global_options()->passive)
{
rv = mysql_rejoin(mon, server, output);
}
else
{
PRINT_MXS_JSON_ERROR(output, "Rejoin attempted but not performed, as MaxScale is in passive mode.");
}
return rv;
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -542,7 +665,7 @@ extern "C"
MXS_MODULE* MXS_CREATE_MODULE()
{
MXS_NOTICE("Initialise the MySQL Monitor module.");
const char ARG_MONITOR_DESC[] = "MySQL Monitor name (from configuration file)";
static const char ARG_MONITOR_DESC[] = "MySQL Monitor name (from configuration file)";
static modulecmd_arg_type_t switchover_argv[] =
{
{
@ -569,6 +692,19 @@ extern "C"
mysql_handle_failover, MXS_ARRAY_NELEMS(failover_argv),
failover_argv, "Perform master failover");
static modulecmd_arg_type_t rejoin_argv[] =
{
{
MODULECMD_ARG_MONITOR | MODULECMD_ARG_NAME_MATCHES_DOMAIN,
ARG_MONITOR_DESC
},
{ MODULECMD_ARG_SERVER, "Joining server" }
};
modulecmd_register_command(MXS_MODULE_NAME, "rejoin", MODULECMD_TYPE_ACTIVE,
mysql_handle_rejoin, MXS_ARRAY_NELEMS(rejoin_argv),
rejoin_argv, "Rejoin server to a cluster");
static MXS_MONITOR_OBJECT MyObject =
{
startMonitor,
@ -1257,11 +1393,11 @@ static bool do_show_slave_status(MYSQL_MONITOR* mon,
const char* last_io_error = mxs_mysql_get_value(result, row, "Last_IO_Error");
const char* last_sql_error = mxs_mysql_get_value(result, row, "Last_SQL_Error");
ss_dassert(beats && period && using_gtid && master_host && master_port &&
last_io_error && last_sql_error);
last_io_error && last_sql_error);
serv_info->slave_status.master_host = master_host;
serv_info->slave_status.master_port = atoi(master_port);
serv_info->slave_status.last_error = *last_io_error ? last_io_error :
(*last_sql_error ? last_sql_error : "");
(*last_sql_error ? last_sql_error : "");
int heartbeats = atoi(beats);
if (serv_info->slave_heartbeats < heartbeats)
@ -1276,7 +1412,8 @@ static bool do_show_slave_status(MYSQL_MONITOR* mon,
const char* gtid_io_pos = mxs_mysql_get_value(result, row, "Gtid_IO_Pos");
ss_dassert(gtid_io_pos);
serv_info->slave_status.gtid_io_pos = gtid_io_pos[0] != '\0' ?
Gtid(gtid_io_pos, mon->master_gtid_domain) : Gtid();
Gtid(gtid_io_pos, mon->master_gtid_domain) :
Gtid();
}
else
{
@ -2220,7 +2357,8 @@ monitorMain(void *arg)
{
monitor_clear_pending_status(root_master,
SERVER_SLAVE | SERVER_SLAVE_OF_EXTERNAL_MASTER);
server_clear_status_nolock(root_master->server, SERVER_SLAVE | SERVER_SLAVE_OF_EXTERNAL_MASTER);
server_clear_status_nolock(root_master->server,
SERVER_SLAVE | SERVER_SLAVE_OF_EXTERNAL_MASTER);
}
}
@ -2319,23 +2457,30 @@ monitorMain(void *arg)
// Do not auto-join servers on this monitor loop if a failover (or any other cluster modification)
// has been performed, as server states have not been updated yet. It will happen next iteration.
if (handle->auto_rejoin && !failover_performed &&
handle->master != NULL && SERVER_IS_MASTER(handle->master->server) &&
handle->master_gtid_domain >= 0)
if (handle->auto_rejoin && !failover_performed && cluster_can_be_joined(handle))
{
// Check if any servers should be autojoined to the cluster
int joins = check_and_join_cluster(handle);
if (joins < 0)
ServerVector joinable_servers;
if (get_joinable_servers(handle, &joinable_servers))
{
MXS_ERROR("A cluster join operation failed, disabling automatic rejoining. "
"To re-enable, manually set '%s' to 'true' for monitor '%s' via MaxAdmin or "
"the REST API.", CN_AUTO_REJOIN, mon->name);
handle->auto_rejoin = false;
disable_setting(handle, CN_AUTO_REJOIN);
uint32_t joins = do_rejoin(handle, joinable_servers);
if (joins > 0)
{
MXS_NOTICE("%d server(s) redirected or rejoined the cluster.", joins);
}
if (joins < joinable_servers.size())
{
MXS_ERROR("A cluster join operation failed, disabling automatic rejoining. "
"To re-enable, manually set '%s' to 'true' for monitor '%s' via MaxAdmin or "
"the REST API.", CN_AUTO_REJOIN, mon->name);
handle->auto_rejoin = false;
disable_setting(handle, CN_AUTO_REJOIN);
}
}
else if (joins > 0)
else
{
MXS_NOTICE("%d server(s) redirected or rejoined the cluster.", joins);
MXS_ERROR("Query error to master '%s' prevented a possible rejoin operation.",
handle->master->server->unique_name);
}
}
@ -2393,10 +2538,36 @@ getSlaveOfNodeId(MXS_MONITORED_SERVER *ptr, long node_id, slave_down_setting_t s
return NULL;
}
/**
 * Simple wrapper for mxs_mysql_query and mysql_num_rows
 *
 * @param database Database connection
 * @param query    Query to execute
 *
 * @return Number of rows or -1 on error
 */
static int get_row_count(MXS_MONITORED_SERVER *database, const char* query)
{
    if (mxs_mysql_query(database->con, query) != 0)
    {
        // Query failed
        return -1;
    }

    MYSQL_RES* res = mysql_store_result(database->con);
    if (res == NULL)
    {
        // No result set was produced (or fetching it failed)
        return -1;
    }

    int rows = mysql_num_rows(res);
    mysql_free_result(res);
    return rows;
}
/*******
* This function sets the replication heartbeat
* into the maxscale_schema.replication_heartbeat table in the current master.
* The inserted values will be seen from all slaves replication from this master.
* The inserted values will be seen from all slaves replicating from this master.
*
* @param handle The monitor handle
* @param database The number database server
@ -2417,42 +2588,25 @@ static void set_master_heartbeat(MYSQL_MONITOR *handle, MXS_MONITORED_SERVER *da
return;
}
/* check if the maxscale_schema database and replication_heartbeat table exist */
if (mxs_mysql_query(database->con, "SELECT table_name FROM information_schema.tables "
"WHERE table_schema = 'maxscale_schema' AND table_name = 'replication_heartbeat'"))
int n_db = get_row_count(database, "SELECT schema_name FROM information_schema.schemata "
"WHERE schema_name = 'maxscale_schema'");
int n_tbl = get_row_count(database, "SELECT table_name FROM information_schema.tables "
"WHERE table_schema = 'maxscale_schema' "
"AND table_name = 'replication_heartbeat'");
if (n_db == -1 || n_tbl == -1 ||
(n_db == 0 && mxs_mysql_query(database->con, "CREATE DATABASE maxscale_schema")) ||
(n_tbl == 0 && mxs_mysql_query(database->con, "CREATE TABLE IF NOT EXISTS "
"maxscale_schema.replication_heartbeat "
"(maxscale_id INT NOT NULL, "
"master_server_id INT NOT NULL, "
"master_timestamp INT UNSIGNED NOT NULL, "
"PRIMARY KEY ( master_server_id, maxscale_id ) )")))
{
MXS_ERROR( "Error checking for replication_heartbeat in Master server"
": %s", mysql_error(database->con));
MXS_ERROR("Error creating maxscale_schema.replication_heartbeat "
"table in Master server: %s", mysql_error(database->con));
database->server->rlag = MAX_RLAG_NOT_AVAILABLE;
}
result = mysql_store_result(database->con);
if (result == NULL)
{
returned_rows = 0;
}
else
{
returned_rows = mysql_num_rows(result);
mysql_free_result(result);
}
if (0 == returned_rows)
{
/* create repl_heartbeat table in maxscale_schema database */
if (mxs_mysql_query(database->con, "CREATE TABLE IF NOT EXISTS "
"maxscale_schema.replication_heartbeat "
"(maxscale_id INT NOT NULL, "
"master_server_id INT NOT NULL, "
"master_timestamp INT UNSIGNED NOT NULL, "
"PRIMARY KEY ( master_server_id, maxscale_id ) )"))
{
MXS_ERROR("Error creating maxscale_schema.replication_heartbeat "
"table in Master server: %s", mysql_error(database->con));
database->server->rlag = MAX_RLAG_NOT_AVAILABLE;
}
return;
}
/* auto purge old values after 48 hours*/
@ -3263,9 +3417,10 @@ MXS_MONITORED_SERVER* select_new_master(MYSQL_MONITOR* mon,
if (cand_io > master_io ||
// If io sequences are identical, the slave with more events processed wins.
(cand_io == master_io && (cand_processed > master_processed ||
// Finally, if binlog positions are identical, prefer a slave with
// log_slave_updates.
(cand_processed == master_processed && cand_updates && !master_updates))))
// Finally, if binlog positions are identical,
// prefer a slave with log_slave_updates.
(cand_processed == master_processed &&
cand_updates && !master_updates))))
{
select_this = true;
}
@ -3303,10 +3458,12 @@ MXS_MONITORED_SERVER* select_new_master(MYSQL_MONITOR* mon,
*
* @param mon The monitor
* @param new_master The new master
* @param seconds_remaining How much time left
* @param err_out Json error output
* @return True if relay log was processed within time limit, or false if time ran out or an error occurred.
*/
bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_master, json_t** err_out)
bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_master, int seconds_remaining,
json_t** err_out)
{
MySqlServerInfo* master_info = get_server_info(mon, new_master);
time_t begin = time(NULL);
@ -3315,7 +3472,7 @@ bool failover_wait_relay_log(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_maste
while (master_info->relay_log_events() > 0 &&
query_ok &&
io_pos_stable &&
difftime(time(NULL), begin) < mon->failover_timeout)
difftime(time(NULL), begin) < seconds_remaining)
{
MXS_INFO("Relay log of server '%s' not yet empty, waiting to clear %" PRId64 " events.",
new_master->server->unique_name, master_info->relay_log_events());
@ -3465,6 +3622,36 @@ int redirect_slaves(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* new_master, const
return successes;
}
/**
 * Print a redirect error to logs. If err_out exists, generate a combined error message by querying all
 * the server parameters for connection errors and append these errors to err_out.
 *
 * @param first_server If not NULL, this is the first server to query.
 * @param servers      Other servers to query for errors.
 * @param err_out      If not NULL, the error output object.
 */
void print_redirect_errors(MXS_MONITORED_SERVER* first_server, const ServerVector& servers,
                           json_t** err_out)
{
    // Individual server errors have already been printed to the log.
    // For JSON, gather the errors again.
    const char MSG[] = "Could not redirect any slaves to the new master.";
    // Use an explicit format string so a future '%' in MSG cannot be
    // misinterpreted by the printf-style logger.
    MXS_ERROR("%s", MSG);
    if (err_out)
    {
        ServerVector failed_slaves;
        if (first_server)
        {
            failed_slaves.push_back(first_server);
        }
        failed_slaves.insert(failed_slaves.end(),
                             servers.begin(), servers.end());
        string combined_error = get_connection_errors(failed_slaves);
        *err_out = mxs_json_error_append(*err_out,
                                         "%s Errors: %s.", MSG, combined_error.c_str());
    }
}
/**
* Performs failover for a simple topology (1 master, N slaves, no intermediate masters).
*
@ -3480,23 +3667,61 @@ static bool do_failover(MYSQL_MONITOR* mon, json_t** err_out)
PRINT_MXS_JSON_ERROR(err_out, "Cluster gtid domain is unknown. Cannot failover.");
return false;
}
// Total time limit on how long this operation may take. Checked and modified after significant steps are
// completed.
int seconds_remaining = mon->failover_timeout;
time_t start_time = time(NULL);
// Step 1: Select new master. Also populate a vector with all slaves not the selected master.
ServerVector slaves;
MXS_MONITORED_SERVER* new_master = select_new_master(mon, &slaves, err_out);
ServerVector redirectable_slaves;
MXS_MONITORED_SERVER* new_master = select_new_master(mon, &redirectable_slaves, err_out);
if (new_master == NULL)
{
return false;
}
time_t step1_time = time(NULL);
seconds_remaining -= difftime(step1_time, start_time);
bool rval = false;
// Step 2: Wait until relay log consumed.
if (failover_wait_relay_log(mon, new_master, err_out) &&
// Step 3: Stop and reset slave, set read-only to 0.
promote_new_master(new_master, err_out))
if (failover_wait_relay_log(mon, new_master, seconds_remaining, err_out))
{
// Step 4: Redirect slaves.
int redirects = redirect_slaves(mon, new_master, slaves);
rval = slaves.empty() ? true : redirects > 0;
time_t step2_time = time(NULL);
int seconds_step2 = difftime(step2_time, step1_time);
MXS_DEBUG("Failover: relay log processing took %d seconds.", seconds_step2);
seconds_remaining -= seconds_step2;
// Step 3: Stop and reset slave, set read-only to 0.
if (promote_new_master(new_master, err_out))
{
// Step 4: Redirect slaves.
ServerVector redirected_slaves;
int redirects = redirect_slaves(mon, new_master, redirectable_slaves, &redirected_slaves);
bool success = redirectable_slaves.empty() ? true : redirects > 0;
if (success)
{
time_t step4_time = time(NULL);
seconds_remaining -= difftime(step4_time, step2_time);
// Step 5: Finally, add an event to the new master to advance gtid and wait for the slaves
// to receive it. seconds_remaining can be 0 or less at this point. Even in such a case
// wait_cluster_stabilization() may succeed if replication is fast enough.
if (wait_cluster_stabilization(mon, new_master, redirected_slaves, seconds_remaining))
{
rval = true;
time_t step5_time = time(NULL);
int seconds_step5 = difftime(step5_time, step4_time);
seconds_remaining -= seconds_step5;
MXS_DEBUG("Failover: slave replication confirmation took %d seconds with "
"%d seconds to spare.", seconds_step5, seconds_remaining);
}
}
else
{
print_redirect_errors(NULL, redirectable_slaves, err_out);
}
}
}
return rval;
}
@ -3994,28 +4219,11 @@ static bool do_switchover(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* current_mast
{
redirected_slaves.push_back(demotion_target);
}
int redirects = redirect_slaves(mon, promotion_target, redirectable_slaves, &redirected_slaves);
int redirects = redirect_slaves(mon, promotion_target,
redirectable_slaves, &redirected_slaves);
bool success = redirectable_slaves.empty() ? start_ok : start_ok || redirects > 0;
if (success == false)
{
rval = false;
// This is a special case. Individual server errors have already been printed to the log.
// For JSON, gather the errors again.
const char MSG[] = "Could not redirect any slaves to the new master.";
MXS_ERROR(MSG);
if (err_out)
{
ServerVector failed_slaves;
failed_slaves.push_back(demotion_target);
failed_slaves.insert(failed_slaves.end(),
redirectable_slaves.begin(), redirectable_slaves.end());
string combined_error = get_connection_errors(failed_slaves);
*err_out = mxs_json_error_append(*err_out,
"%s Errors: %s.", MSG, combined_error.c_str());
}
}
else
if (success)
{
time_t step5_time = time(NULL);
seconds_remaining -= difftime(step5_time, step3_time);
@ -4033,6 +4241,10 @@ static bool do_switchover(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* current_mast
"%d seconds to spare.", seconds_step6, seconds_remaining);
}
}
else
{
print_redirect_errors(demotion_target, redirectable_slaves, err_out);
}
}
}
@ -4043,12 +4255,12 @@ static bool do_switchover(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* current_mast
if (mxs_mysql_query(demotion_target->con, QUERY_UNDO) == 0)
{
PRINT_MXS_JSON_ERROR(err_out, "read_only disabled on server %s.",
demotion_target->server->unique_name);
demotion_target->server->unique_name);
}
else
{
PRINT_MXS_JSON_ERROR(err_out, "Could not disable read_only on server %s: '%s'.",
demotion_target->server->unique_name, mysql_error(demotion_target->con));
demotion_target->server->unique_name, mysql_error(demotion_target->con));
}
}
}
@ -4128,102 +4340,145 @@ static bool can_replicate_from(MYSQL_MONITOR* mon,
}
/**
* Check cluster for servers not replicating from the current master and redirect/join them. If an error
* occurs, stop and return negative value.
* Checks if a server is a possible rejoin candidate. A true result from this function is not yet sufficient
* criteria and another call to can_replicate_from() should be made.
*
* @param mon Cluster monitor
* @return The number of servers successfully redirected. Negative on I/O-error.
* @param server Server to check.
* @param master_info Master server info
* @return True, if server is a rejoin suspect.
*/
static int check_and_join_cluster(MYSQL_MONITOR* mon)
static bool server_is_rejoin_suspect(MYSQL_MONITOR* mon, MXS_MONITORED_SERVER* server,
                                     MySqlServerInfo* master_info)
{
    // Masters and servers that are not running are never rejoin candidates.
    if (SERVER_IS_MASTER(server->server) || !SERVER_IS_RUNNING(server->server))
    {
        return false;
    }

    MySqlServerInfo* server_info = get_server_info(mon, server);
    SlaveStatusInfo* slave_status = &server_info->slave_status;

    // A running non-master with no slave connection at all is a suspect.
    if (server_info->n_slaves_configured == 0)
    {
        return true;
    }

    if (server_info->n_slaves_configured == 1)
    {
        MXS_MONITORED_SERVER* master = mon->master;
        // The slave connection is up, but it points at the wrong master.
        if (slave_status->slave_io_running &&
            slave_status->master_server_id != master_info->server_id)
        {
            return true;
        }
        // IO thread is down while the SQL thread runs, and the configured
        // master host/port do not match the current master.
        if (!slave_status->slave_io_running && slave_status->slave_sql_running &&
            (slave_status->master_host != master->server->name ||
             slave_status->master_port != master->server->port))
        {
            return true;
        }
    }

    return false;
}
/**
* Scan the servers in the cluster and add (re)joinable servers to an array.
*
* @param mon Cluster monitor
* @param output Array to save results to. Each element is a valid (re)joinable server according
* to latest data.
* @return False, if there were possible rejoinable servers but communications error to master server
* prevented final checks.
*/
static bool get_joinable_servers(MYSQL_MONITOR* mon, ServerVector* output)
{
ss_dassert(output);
MXS_MONITORED_SERVER* master = mon->master;
MySqlServerInfo *master_info = get_server_info(mon, master);
// Whether a join operation should be attempted or not depends on several criteria. Start with the ones
// easiest to test. Go though all slaves and construct a preliminary list.
ServerVector suspects;
std::vector<MySqlServerInfo*> suspect_infos;
for (MXS_MONITORED_SERVER* server = mon->monitor->monitored_servers;
server != NULL;
server = server->next)
{
if (!SERVER_IS_MASTER(server->server) && SERVER_IS_RUNNING(server->server))
if (server_is_rejoin_suspect(mon, server, master_info))
{
MySqlServerInfo* server_info = get_server_info(mon, server);
SlaveStatusInfo* slave_status = &server_info->slave_status;
bool is_suspect = false;
// Has no slave connection, yet is not a master.
if (server_info->n_slaves_configured == 0)
{
is_suspect = true;
}
// Or has existing slave connection ...
else if (server_info->n_slaves_configured == 1)
{
// which is connected to master but it's the wrong one
if (slave_status->slave_io_running &&
slave_status->master_server_id != master_info->server_id)
{
is_suspect = true;
}
// or is disconnected but master host or port is wrong
else if (!slave_status->slave_io_running && slave_status->slave_sql_running &&
(slave_status->master_host != master->server->name ||
slave_status->master_port != master->server->port))
{
is_suspect = true;
}
}
if (is_suspect)
{
suspects.push_back(server);
suspect_infos.push_back(server_info);
}
suspects.push_back(server);
}
}
int rval = 0;
// Update Gtid of master for better info.
bool comm_ok = true;
if (!suspects.empty())
{
// Update Gtid of master for better info.
if (!update_gtids(mon, master, master_info))
if (update_gtids(mon, master, master_info))
{
rval = -1;
}
string change_cmd = generate_change_master_cmd(mon, master);
for (size_t i = 0; i < suspects.size() && rval >= 0; i++)
{
MXS_MONITORED_SERVER* suspect = suspects[i];
MySqlServerInfo* suspect_info = suspect_infos[i];
if (can_replicate_from(mon, suspect, suspect_info, master, master_info))
for (size_t i = 0; i < suspects.size(); i++)
{
bool op_success = true;
const char* name = suspect->server->unique_name;
const char* master_name = master->server->unique_name;
if (suspect_info->n_slaves_configured == 0)
MXS_MONITORED_SERVER* suspect = suspects[i];
MySqlServerInfo* suspect_info = get_server_info(mon, suspect);
if (can_replicate_from(mon, suspect, suspect_info, master, master_info))
{
MXS_NOTICE("Directing standalone server '%s' to replicate from '%s'.", name, master_name);
op_success = join_cluster(suspect, change_cmd.c_str());
}
else
{
MXS_NOTICE("Server '%s' is replicating from a server other than '%s', "
"redirecting it to '%s'.", name, master_name, master_name);
op_success = redirect_one_slave(suspect, change_cmd.c_str());
}
if (op_success)
{
rval++;
}
else
{
rval = -1;
output->push_back(suspect);
}
}
}
else
{
comm_ok = false;
}
}
return rval;
return comm_ok;
}
/**
 * (Re)join given servers to the cluster. The servers in the array are assumed to be joinable.
 * Usually the list is created by get_joinable_servers().
 *
 * @param mon Cluster monitor
 * @param joinable_servers Which servers to rejoin
 * @return The number of servers successfully rejoined
 */
static uint32_t do_rejoin(MYSQL_MONITOR* mon, const ServerVector& joinable_servers)
{
    uint32_t servers_joined = 0;
    if (!joinable_servers.empty())
    {
        MXS_MONITORED_SERVER* master = mon->master;
        const char* master_name = master->server->unique_name;
        // The same CHANGE MASTER command applies to every joining server.
        string change_cmd = generate_change_master_cmd(mon, master);
        for (size_t i = 0; i < joinable_servers.size(); i++)
        {
            MXS_MONITORED_SERVER* joinable = joinable_servers[i];
            const char* name = joinable->server->unique_name;
            MySqlServerInfo* info = get_server_info(mon, joinable);
            bool joined;
            if (info->n_slaves_configured == 0)
            {
                // No slave connection configured: treat as a standalone server.
                MXS_NOTICE("Directing standalone server '%s' to replicate from '%s'.", name, master_name);
                joined = join_cluster(joinable, change_cmd.c_str());
            }
            else
            {
                // Existing slave connection points elsewhere; redirect it.
                MXS_NOTICE("Server '%s' is replicating from a server other than '%s', "
                           "redirecting it to '%s'.", name, master_name, master_name);
                joined = redirect_one_slave(joinable, change_cmd.c_str());
            }
            if (joined)
            {
                servers_joined++;
            }
        }
    }
    return servers_joined;
}
/**
@ -4265,3 +4520,14 @@ static void disable_setting(MYSQL_MONITOR* mon, const char* setting)
p.value = const_cast<char*>("false");
monitorAddParameters(mon->monitor, &p);
}
/**
 * Is the cluster a valid rejoin target
 *
 * @param mon Cluster monitor
 * @return True, if cluster can be joined
 */
static bool cluster_can_be_joined(MYSQL_MONITOR* mon)
{
    // Rejoin requires a known running master and a resolved gtid domain.
    MXS_MONITORED_SERVER* master = mon->master;
    return master != NULL && SERVER_IS_MASTER(master->server) && mon->master_gtid_domain >= 0;
}

View File

@ -1051,8 +1051,8 @@ static int
blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
GWBUF *pkt;
char file[40];
char position[40];
char file[BINLOG_FNAMELEN + 1];
char position[BINLOG_FNAMELEN + 1];
uint8_t *ptr;
int len, file_len;
@ -1069,10 +1069,10 @@ blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
BLR_TYPE_STRING, 40, 6);
blr_slave_send_eof(router, slave, 7);
sprintf(file, "%s", router->binlog_name);
snprintf(file, sizeof(file), "%s", router->binlog_name);
file_len = strlen(file);
sprintf(position, "%lu", router->binlog_position);
snprintf(position, sizeof(position), "%lu", router->binlog_position);
len = MYSQL_HEADER_LEN + 1 + file_len + strlen(position) + 1 + 3;
if ((pkt = gwbuf_alloc(len)) == NULL)