Merge branch '2.3' into develop
This commit is contained in:
commit
44d1b821c3
@ -50,6 +50,7 @@ find_package(ASAN)
|
||||
find_package(TSAN)
|
||||
find_package(CURL)
|
||||
find_package(PAM)
|
||||
find_package(UBSAN)
|
||||
|
||||
# Build PCRE2 so we always know the version
|
||||
# Read BuildPCRE2 for details about how to add pcre2 as a dependency to a target
|
||||
@ -155,6 +156,9 @@ if (WITH_ASAN AND ASAN_FOUND)
|
||||
elseif (WITH_TSAN AND TSAN_FOUND)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=thread")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread")
|
||||
elseif (WITH_UBSAN AND UBSAN_FOUND)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=undefined")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined")
|
||||
endif()
|
||||
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99 -D_GNU_SOURCE=1 ${FLAGS}")
|
||||
|
@ -92,7 +92,7 @@ generating the Unix user and password should be enough.
|
||||
|
||||
## Implementation details and limitations
|
||||
|
||||
The PAM general authentication scheme is difficult for a proxy such as MaxScale.
|
||||
The general PAM authentication scheme is difficult for a proxy such as MaxScale.
|
||||
An application using the PAM interface needs to define a *conversation function*
|
||||
to allow the OS PAM modules to communicate with the client, possibly exchanging
|
||||
multiple messages. This works when a client logs in to a normal server, but not
|
||||
@ -101,25 +101,22 @@ MaxScale to successfully log into the servers, the messages and answers need to
|
||||
be predefined. This requirement denies the use of more exotic schemes such as
|
||||
one-time passwords or two-factor authentication.
|
||||
|
||||
The current version of the MaxScale PAM authentication module only supports a
|
||||
The MaxScale PAM authentication module only supports a
|
||||
simple password exchange. On the client side, the authentication begins with
|
||||
MaxScale sending an AuthSwitchRequest packet. In addition to the command, the
|
||||
packet contains the client plugin name `dialog`, a message type byte `4` and the
|
||||
message `Password: `. In the next packet, the client should send the password,
|
||||
which MaxScale will forward to the PAM API running on the local machine. If the
|
||||
password is correct, an OK packet is sent to the client. No additional
|
||||
PAM-related messaging is allowed, as this would indicate a more complicated
|
||||
authentication scheme.
|
||||
password is correct, an OK packet is sent to the client. If the local PAM API asks
|
||||
for additional credentials as is typical in two-factor authentication schemes,
|
||||
authentication fails. Informational messages such as password expiration
|
||||
notifications are allowed. These are simply printed to the log.
|
||||
|
||||
On the backend side, MaxScale expects the servers to act as MaxScale did towards
|
||||
the client. The servers should send an AuthSwitchRequest packet as defined
|
||||
above, MaxScale responds with the password received by the client authenticator
|
||||
and finally backend replies with OK.
|
||||
|
||||
## SSL support
|
||||
|
||||
PAM Authenticator supports SSL connections from client to MaxScale, but not from
|
||||
MaxScale to backends.
|
||||
and finally backend replies with OK. Informational messages from backends are
|
||||
only printed to the info-log.
|
||||
|
||||
## Building the module
|
||||
|
||||
|
@ -1364,7 +1364,7 @@ server.
|
||||
serversize=10
|
||||
```
|
||||
|
||||
The service would then have the parameter `weightby=serversize`. If there are 4
|
||||
The service would then have the parameter `weightby=serv_weight`. If there are 4
|
||||
servers defined in the service (serverA, serverB, serverC and serverD) with the
|
||||
serversize set as shown in the table below, the connections would balanced using
|
||||
the percentages in this table.
|
||||
|
@ -360,10 +360,11 @@ following:
|
||||
|
||||
1. Prepare the old master for demotion:
|
||||
1. Stop any external replication.
|
||||
2. Enable the *read\_only*-flag to stop writes.
|
||||
3. Disable scheduled server events (if event handling is on).
|
||||
4. Run the commands in `demotion_sql_file`.
|
||||
5. Flush the binary log (FLUSH LOGS) so that all events are on disk.
|
||||
2. Kill connections from super-users since *read\_only* does not affect them.
|
||||
3. Enable the *read\_only*-flag to stop writes.
|
||||
4. Disable scheduled server events (if event handling is on).
|
||||
5. Run the commands in `demotion_sql_file`.
|
||||
6. Flush the binary log (FLUSH LOGS) so that all events are on disk.
|
||||
2. Wait for the new master to catch up with the old master.
|
||||
3. Promote new master and redirect slaves as in failover steps 3 and 4. Also
|
||||
redirect the demoted old master.
|
||||
|
14
cmake/FindUBSAN.cmake
Normal file
14
cmake/FindUBSAN.cmake
Normal file
@ -0,0 +1,14 @@
|
||||
# Find UBSan libraries
|
||||
#
|
||||
# The following variables are set:
|
||||
# UBSAN_FOUND - If UBSan was found
|
||||
# UBSAN_LIBRARIES - Path to the libubsan library
|
||||
|
||||
find_library(UBSAN_LIBRARIES NAMES libubsan.so.0 libubsan.so.1 libubsan.so.2 libubsan.so.3 libubsan.so.4)
|
||||
|
||||
if (UBSAN_LIBRARIES)
|
||||
message(STATUS "Found UBSan libraries: ${UBSAN_LIBRARIES}")
|
||||
set(UBSAN_FOUND TRUE CACHE INTERNAL "")
|
||||
else()
|
||||
message(STATUS "Could not find UBSan")
|
||||
endif()
|
@ -735,6 +735,7 @@ public:
|
||||
Buffer(GWBUF* pBuffer)
|
||||
: m_pBuffer(pBuffer)
|
||||
{
|
||||
mxb_assert(pBuffer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -305,11 +305,10 @@ typedef enum
|
||||
MXS_COM_RESET_CONNECTION = 31,
|
||||
MXS_COM_STMT_BULK_EXECUTE = 0xfa,
|
||||
MXS_COM_MULTI = 0xfe,
|
||||
MXS_COM_END
|
||||
MXS_COM_END,
|
||||
MXS_COM_UNDEFINED = -1
|
||||
} mxs_mysql_cmd_t;
|
||||
|
||||
static const mxs_mysql_cmd_t MXS_COM_UNDEFINED = (mxs_mysql_cmd_t) -1;
|
||||
|
||||
/**
|
||||
* A GWBUF property with this name will contain the latest GTID in string form.
|
||||
* This information is only available in OK packets.
|
||||
@ -378,6 +377,7 @@ static inline uint32_t MYSQL_GET_PAYLOAD_LEN(const uint8_t* header)
|
||||
|
||||
static inline uint32_t MYSQL_GET_PACKET_LEN(const GWBUF* buffer)
|
||||
{
|
||||
mxb_assert(buffer);
|
||||
return MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
|
||||
}
|
||||
|
||||
@ -628,6 +628,7 @@ void mxs_mysql_set_current_db(MXS_SESSION* session, const char* db);
|
||||
*/
|
||||
static inline uint8_t mxs_mysql_get_command(GWBUF* buffer)
|
||||
{
|
||||
mxb_assert(buffer);
|
||||
if (GWBUF_LENGTH(buffer) > MYSQL_HEADER_LEN)
|
||||
{
|
||||
return GWBUF_DATA(buffer)[4];
|
||||
@ -651,6 +652,7 @@ static inline uint8_t mxs_mysql_get_command(GWBUF* buffer)
|
||||
*/
|
||||
static inline uint32_t mxs_mysql_get_packet_len(GWBUF* buffer)
|
||||
{
|
||||
mxb_assert(buffer);
|
||||
// The first three bytes of the packet header contain its length
|
||||
uint8_t buf[3];
|
||||
gwbuf_copy_data(buffer, 0, 3, buf);
|
||||
|
@ -199,6 +199,9 @@ add_test_executable(maxctrl_basic.cpp maxctrl_basic maxctrl_basic LABELS maxctrl
|
||||
# MXS-2167: Monitors should be able to use extra_port
|
||||
add_test_executable(mxs2167_extra_port.cpp mxs2167_extra_port mxs2167_extra_port LABELS REPL_BACKEND)
|
||||
|
||||
# Test KILL QUERY functionality
|
||||
add_test_executable(kill_query.cpp kill_query replication LABELS REPL_BACKEND)
|
||||
|
||||
############################################
|
||||
# BEGIN: Tests that require GTID #
|
||||
############################################
|
||||
@ -964,6 +967,12 @@ add_test_executable(pam_authentication.cpp pam_authentication pam_authentication
|
||||
# MXS-2350: On-demand connection creation
|
||||
add_test_executable(mxs2350_lazy_connect.cpp mxs2350_lazy_connect mxs2350_lazy_connect LABELS REPL_BACKEND)
|
||||
|
||||
# MXS-2520: Allow master reconnection on reads
|
||||
add_test_executable(mxs2520_master_read_reconnect.cpp mxs2520_master_read_reconnect mxs2520_master_read_reconnect LABELS REPL_BACKEND readwritesplit)
|
||||
|
||||
# MXS-2464: Crash in route_stored_query with ReadWriteSplit
|
||||
add_test_executable(mxs2464_sescmd_reconnect.cpp mxs2464_sescmd_reconnect mxs2464_sescmd_reconnect LABELS REPL_BACKEND readwritesplit)
|
||||
|
||||
############################################
|
||||
# BEGIN: binlogrouter and avrorouter tests #
|
||||
############################################
|
||||
|
@ -0,0 +1,29 @@
|
||||
[maxscale]
|
||||
threads=###threads###
|
||||
|
||||
[MySQL-Monitor]
|
||||
type=monitor
|
||||
module=mysqlmon
|
||||
servers=###server_line###
|
||||
user=maxskysql
|
||||
password=skysql
|
||||
monitor_interval=1000
|
||||
backend_read_timeout=1
|
||||
backend_connect_timeout=1
|
||||
|
||||
[RW-Split-Router]
|
||||
type=service
|
||||
router=readwritesplit
|
||||
servers=###server_line###
|
||||
user=maxskysql
|
||||
password=skysql
|
||||
delayed_retry_timeout=1
|
||||
transaction_replay=true
|
||||
|
||||
[RW-Split-Listener]
|
||||
type=listener
|
||||
service=RW-Split-Router
|
||||
protocol=MySQLClient
|
||||
port=4006
|
||||
|
||||
###server###
|
@ -0,0 +1,30 @@
|
||||
[maxscale]
|
||||
threads=###threads###
|
||||
log_info=1
|
||||
|
||||
[MySQL-Monitor]
|
||||
type=monitor
|
||||
module=mysqlmon
|
||||
servers=server1
|
||||
user=maxskysql
|
||||
password=skysql
|
||||
monitor_interval=1000
|
||||
backend_read_timeout=1
|
||||
backend_connect_timeout=1
|
||||
|
||||
[RW-Split-Router]
|
||||
type=service
|
||||
router=readwritesplit
|
||||
servers=server1
|
||||
user=maxskysql
|
||||
password=skysql
|
||||
delayed_retry_timeout=45
|
||||
transaction_replay=true
|
||||
|
||||
[RW-Split-Listener]
|
||||
type=listener
|
||||
service=RW-Split-Router
|
||||
protocol=MySQLClient
|
||||
port=4006
|
||||
|
||||
###server###
|
50
maxscale-system-test/kill_query.cpp
Normal file
50
maxscale-system-test/kill_query.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Test KILL QUERY functionality
|
||||
*/
|
||||
|
||||
#include "testconnections.h"
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
TestConnections test(argc, argv);
|
||||
|
||||
auto conn = test.maxscales->rwsplit();
|
||||
conn.connect();
|
||||
conn.query("CREATE OR REPLACE TABLE test.t1 (id LONGTEXT)");
|
||||
|
||||
for (int x = 0; x < 10; x++)
|
||||
{
|
||||
conn.query("INSERT INTO test.t1 VALUES (REPEAT('a', 5000000))");
|
||||
}
|
||||
|
||||
for (int i = 0; i < 3; i++)
|
||||
{
|
||||
auto a = test.maxscales->rwsplit();
|
||||
auto b = test.maxscales->rwsplit();
|
||||
test.expect(a.connect() && b.connect(), "Connections should work");
|
||||
|
||||
auto id = a.thread_id();
|
||||
|
||||
test.set_timeout(15);
|
||||
std::thread thr(
|
||||
[&]() {
|
||||
// The ALTER should take longer than 15 seconds to complete so that a KILL is required to
|
||||
// interrupt it.
|
||||
test.expect(!a.query("ALTER TABLE test.t1 FORCE"), "ALTER should fail");
|
||||
|
||||
const char* expected = "Query execution was interrupted";
|
||||
test.expect(strstr(a.error(), expected),
|
||||
"Alter should fail with '%s' but it failed with '%s'",
|
||||
expected, a.error());
|
||||
});
|
||||
|
||||
test.expect(b.query("KILL QUERY " + std::to_string(id)), "KILL QUERY failed: %s", b.error());
|
||||
thr.join();
|
||||
|
||||
test.stop_timeout();
|
||||
}
|
||||
|
||||
conn.query("DROP TABLE test.t1");
|
||||
|
||||
return test.global_result;
|
||||
}
|
@ -348,6 +348,11 @@ public:
|
||||
m_pw = pw;
|
||||
}
|
||||
|
||||
uint32_t thread_id() const
|
||||
{
|
||||
return mysql_thread_id(m_conn);
|
||||
}
|
||||
|
||||
private:
|
||||
std::string m_host;
|
||||
int m_port;
|
||||
|
35
maxscale-system-test/mxs2464_sescmd_reconnect.cpp
Normal file
35
maxscale-system-test/mxs2464_sescmd_reconnect.cpp
Normal file
@ -0,0 +1,35 @@
|
||||
/**
|
||||
* MXS-2464: Crash in route_stored_query with ReadWriteSplit
|
||||
* https://jira.mariadb.org/browse/MXS-2464
|
||||
*/
|
||||
|
||||
#include "testconnections.h"
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
TestConnections test(argc, argv);
|
||||
|
||||
test.maxscales->connect();
|
||||
std::thread thr([&]() {
|
||||
sleep(5);
|
||||
test.tprintf("block node 0");
|
||||
test.repl->block_node(0);
|
||||
test.tprintf("wait for monitor");
|
||||
test.maxscales->wait_for_monitor(2);
|
||||
test.tprintf("unblock node 0");
|
||||
test.repl->unblock_node(0);
|
||||
});
|
||||
|
||||
constexpr const char* query = "SET @a = (SELECT SLEEP(10))";
|
||||
test.set_timeout(60);
|
||||
test.tprintf("%s", query);
|
||||
test.try_query(test.maxscales->conn_rwsplit[0], query);
|
||||
test.stop_timeout();
|
||||
|
||||
test.tprintf("disconnect");
|
||||
test.maxscales->disconnect();
|
||||
test.tprintf("join");
|
||||
thr.join();
|
||||
|
||||
return test.global_result;
|
||||
}
|
34
maxscale-system-test/mxs2520_master_read_reconnect.cpp
Normal file
34
maxscale-system-test/mxs2520_master_read_reconnect.cpp
Normal file
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* MXS-2520: Allow master reconnection on reads
|
||||
* https://jira.mariadb.org/browse/MXS-2520
|
||||
*/
|
||||
|
||||
#include "testconnections.h"
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
TestConnections test(argc, argv);
|
||||
|
||||
test.maxscales->connect();
|
||||
std::thread thr([&]() {
|
||||
sleep(5);
|
||||
test.tprintf("block node 0");
|
||||
test.repl->block_node(0);
|
||||
test.tprintf("wait for monitor");
|
||||
test.maxscales->wait_for_monitor(2);
|
||||
test.tprintf("unblock node 0");
|
||||
test.repl->unblock_node(0);
|
||||
});
|
||||
|
||||
test.set_timeout(60);
|
||||
test.tprintf("SELECT SLEEP(10)");
|
||||
test.try_query(test.maxscales->conn_rwsplit[0], "SELECT SLEEP(10)");
|
||||
test.stop_timeout();
|
||||
|
||||
test.tprintf("disconnect");
|
||||
test.maxscales->disconnect();
|
||||
test.tprintf("join");
|
||||
thr.join();
|
||||
|
||||
return test.global_result;
|
||||
}
|
@ -322,11 +322,11 @@ int Nodes::read_basic_env()
|
||||
{
|
||||
// reading IPs
|
||||
sprintf(env_name, "%s_%03d_network", prefix, i);
|
||||
IP[i] = get_nc_item((char*) env_name);
|
||||
IP[i] = strdup(get_nc_item(env_name).c_str());
|
||||
|
||||
// reading private IPs
|
||||
sprintf(env_name, "%s_%03d_private_ip", prefix, i);
|
||||
IP_private[i] = get_nc_item((char*) env_name);
|
||||
IP_private[i] = strdup(get_nc_item(env_name).c_str());
|
||||
if (IP_private[i] == NULL)
|
||||
{
|
||||
IP_private[i] = IP[i];
|
||||
@ -335,7 +335,7 @@ int Nodes::read_basic_env()
|
||||
|
||||
// reading IPv6
|
||||
sprintf(env_name, "%s_%03d_network6", prefix, i);
|
||||
IP6[i] = get_nc_item((char*) env_name);
|
||||
IP6[i] = strdup(get_nc_item(env_name).c_str());
|
||||
if (IP6[i] == NULL)
|
||||
{
|
||||
IP6[i] = IP[i];
|
||||
@ -344,11 +344,11 @@ int Nodes::read_basic_env()
|
||||
|
||||
//reading sshkey
|
||||
sprintf(env_name, "%s_%03d_keyfile", prefix, i);
|
||||
sshkey[i] = get_nc_item((char*) env_name);
|
||||
sshkey[i] = strdup(get_nc_item(env_name).c_str());
|
||||
|
||||
|
||||
sprintf(env_name, "%s_%03d_whoami", prefix, i);
|
||||
access_user[i] = get_nc_item((char*) env_name);
|
||||
access_user[i] = strdup(get_nc_item(env_name).c_str());
|
||||
if (access_user[i] == NULL)
|
||||
{
|
||||
access_user[i] = (char *) "vagrant";
|
||||
@ -369,7 +369,7 @@ int Nodes::read_basic_env()
|
||||
}
|
||||
|
||||
sprintf(env_name, "%s_%03d_hostname", prefix, i);
|
||||
hostname[i] = get_nc_item((char*) env_name);
|
||||
hostname[i] = strdup(get_nc_item(env_name).c_str());
|
||||
if (hostname[i] == NULL)
|
||||
{
|
||||
hostname[i] = IP[i];
|
||||
@ -396,13 +396,14 @@ const char* Nodes::ip(int i) const
|
||||
return use_ipv6 ? IP6[i] : IP[i];
|
||||
}
|
||||
|
||||
char * Nodes::get_nc_item(char * item_name)
|
||||
std::string Nodes::get_nc_item(const char* item_name)
|
||||
{
|
||||
size_t start = network_config.find(item_name);
|
||||
if (start == std::string::npos)
|
||||
{
|
||||
return NULL;
|
||||
return "";
|
||||
}
|
||||
|
||||
size_t end = network_config.find("\n", start);
|
||||
size_t equal = network_config.find("=", start);
|
||||
if (end == std::string::npos)
|
||||
@ -411,14 +412,14 @@ char * Nodes::get_nc_item(char * item_name)
|
||||
}
|
||||
if (equal == std::string::npos)
|
||||
{
|
||||
return NULL;
|
||||
return "";
|
||||
}
|
||||
|
||||
char * cstr = new char [end - equal + 1];
|
||||
strcpy(cstr, network_config.substr(equal + 1, end - equal - 1).c_str());
|
||||
setenv(item_name, cstr, 1);
|
||||
std::string str = network_config.substr(equal + 1, end - equal - 1);
|
||||
|
||||
return (cstr);
|
||||
setenv(item_name, str.c_str(), 1);
|
||||
|
||||
return str;
|
||||
}
|
||||
|
||||
int Nodes::get_N()
|
||||
|
@ -181,9 +181,9 @@ public:
|
||||
/**
|
||||
* @brief get_nc_item Find variable in the MDBCI network_config file
|
||||
* @param item_name Name of the variable
|
||||
* @return value of variable
|
||||
* @return value of variable or empty value if not found
|
||||
*/
|
||||
char *get_nc_item(char * item_name);
|
||||
std::string get_nc_item(const char* item_name);
|
||||
|
||||
/**
|
||||
* @brief get_N Calculate the number of nodes discribed in the _netoek_config file
|
||||
|
@ -74,6 +74,8 @@ target_link_libraries(maxscale-common
|
||||
|
||||
if(WITH_ASAN AND ASAN_FOUND)
|
||||
target_link_libraries(maxscale-common ${ASAN_LIBRARIES})
|
||||
elseif(WITH_UBSAN AND UBSAN_FOUND)
|
||||
target_link_libraries(maxscale-common ${UBSAN_LIBRARIES})
|
||||
endif()
|
||||
|
||||
find_library(HAVE_LIBDL NAMES dl)
|
||||
|
@ -53,6 +53,7 @@ void Backend::close(close_type type)
|
||||
{
|
||||
m_closed = true;
|
||||
m_closed_at = time(NULL);
|
||||
m_session_commands.clear();
|
||||
|
||||
if (in_use())
|
||||
{
|
||||
|
@ -623,6 +623,10 @@ static bool mysql_auth_set_client_data(MYSQL_session* client_data,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
client_data->correct_authenticator = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -305,7 +305,7 @@ class Rule;
|
||||
typedef std::shared_ptr<Rule> SRule;
|
||||
|
||||
/** Helper function for strdup'ing in printf style */
|
||||
char* create_error(const char* format, ...);
|
||||
char* create_error(const char* format, ...) __attribute__ ((nonnull));
|
||||
|
||||
/**
|
||||
* Check if a rule matches
|
||||
|
@ -229,6 +229,53 @@ std::string get_version_string(SERVICE* service)
|
||||
return rval;
|
||||
}
|
||||
|
||||
uint8_t get_charset(SERVER_REF* servers)
|
||||
{
|
||||
uint8_t rval = 0;
|
||||
|
||||
for (SERVER_REF* s = servers; s; s = s->next)
|
||||
{
|
||||
if (server_ref_is_active(s))
|
||||
{
|
||||
if (s->server->is_master())
|
||||
{
|
||||
// Master found, stop searching
|
||||
rval = s->server->charset;
|
||||
break;
|
||||
}
|
||||
else if (s->server->is_slave() || (s->server->is_running() && rval == 0))
|
||||
{
|
||||
// Slaves precede Running servers
|
||||
rval = s->server->charset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rval == 0)
|
||||
{
|
||||
// Charset 8 is latin1, the server default
|
||||
rval = 8;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool supports_extended_caps(SERVER_REF* servers)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
for (SERVER_REF* s = servers; s; s = s->next)
|
||||
{
|
||||
if (s->active && s->server->is_active && s->server->version().total >= 100200)
|
||||
{
|
||||
rval = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check whether a DCB requires SSL.
|
||||
*
|
||||
@ -280,25 +327,13 @@ int MySQLSendHandshake(DCB* dcb)
|
||||
uint8_t mysql_plugin_data[13] = "";
|
||||
uint8_t mysql_server_capabilities_one[2];
|
||||
uint8_t mysql_server_capabilities_two[2];
|
||||
uint8_t mysql_server_language = 8;
|
||||
uint8_t mysql_server_language = get_charset(dcb->service->dbref);
|
||||
uint8_t mysql_server_status[2];
|
||||
uint8_t mysql_scramble_len = 21;
|
||||
uint8_t mysql_filler_ten[10] = {};
|
||||
/* uint8_t mysql_last_byte = 0x00; not needed */
|
||||
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = "";
|
||||
|
||||
bool is_maria = false;
|
||||
|
||||
if (dcb->service->dbref)
|
||||
{
|
||||
mysql_server_language = dcb->service->dbref->server->charset;
|
||||
|
||||
if (dcb->service->dbref->server->version().total >= 100200)
|
||||
{
|
||||
/** The backend servers support the extended capabilities */
|
||||
is_maria = true;
|
||||
}
|
||||
}
|
||||
bool is_maria = supports_extended_caps(dcb->service->dbref);
|
||||
|
||||
MySQLProtocol* protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||
GWBUF* buf;
|
||||
@ -2062,11 +2097,12 @@ spec_com_res_t handle_query_kill(DCB* dcb,
|
||||
querybuf[copied_len] = '\0';
|
||||
kill_type_t kt = KT_CONNECTION;
|
||||
uint64_t thread_id = 0;
|
||||
rval = RES_END;
|
||||
std::string user;
|
||||
|
||||
if (parse_kill_query(querybuf, &thread_id, &kt, &user))
|
||||
{
|
||||
rval = RES_END;
|
||||
|
||||
if (thread_id > 0)
|
||||
{
|
||||
mxs_mysql_execute_kill(dcb->session, thread_id, kt);
|
||||
|
@ -282,16 +282,20 @@ RWBackend* RWSplitSession::get_slave_backend(int max_rlag)
|
||||
auto counts = get_slave_counts(m_raw_backends, m_current_master);
|
||||
int best_priority {INT_MAX};
|
||||
auto current_rank = get_current_rank();
|
||||
// Slaves can be taken into use if we need more slave connections
|
||||
bool need_slaves = counts.second < m_router->max_slave_count();
|
||||
|
||||
// Create a list of backends valid for read operations
|
||||
for (auto& backend : m_raw_backends)
|
||||
{
|
||||
bool can_take_slave_into_use = !backend->in_use() && can_recover_servers()
|
||||
&& backend->can_connect() && counts.second < m_router->max_slave_count()
|
||||
&& (backend->is_slave() || backend->is_master());
|
||||
|
||||
// We can take the current master back into use even for reads
|
||||
bool my_master = backend == m_current_master;
|
||||
bool can_take_into_use = !backend->in_use() && can_recover_servers() && backend->can_connect();
|
||||
bool master_or_slave = backend->is_master() || backend->is_slave();
|
||||
bool is_usable = backend->in_use() || can_take_slave_into_use;
|
||||
|
||||
// The server is usable if it's already in use or it can be taken into use and we need either more
|
||||
// slaves or a master.
|
||||
bool is_usable = backend->in_use() || (can_take_into_use && (need_slaves || my_master));
|
||||
bool rlag_ok = rpl_lag_is_ok(backend, max_rlag);
|
||||
int priority = get_backend_priority(backend, m_config.master_accept_reads);
|
||||
auto rank = backend->server()->rank();
|
||||
|
@ -155,6 +155,7 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
|
||||
m_query_queue.emplace_back(querybuf);
|
||||
querybuf = NULL;
|
||||
rval = 1;
|
||||
mxb_assert(m_expected_responses != 0);
|
||||
|
||||
if (m_expected_responses == 0 && !route_stored_query())
|
||||
{
|
||||
@ -806,6 +807,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
|
||||
// Backend is still in use and has more session commands to execute
|
||||
if (backend->execute_session_command() && backend->is_waiting_result())
|
||||
{
|
||||
MXS_INFO("%lu session commands left on '%s'", backend->session_command_count(), backend->name());
|
||||
m_expected_responses++;
|
||||
}
|
||||
}
|
||||
@ -948,6 +950,44 @@ bool RWSplitSession::start_trx_replay()
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool RWSplitSession::retry_master_query(RWBackend* backend)
|
||||
{
|
||||
bool can_continue = false;
|
||||
|
||||
if (backend->has_session_commands())
|
||||
{
|
||||
// Try to route the session command again. If the master is not available, the response will be
|
||||
// returned from one of the slaves.
|
||||
|
||||
mxb_assert(!m_current_query.get());
|
||||
mxb_assert(!m_sescmd_list.empty());
|
||||
mxb_assert(m_sescmd_count >= 2);
|
||||
MXS_INFO("Retrying session command due to master failure: %s",
|
||||
backend->next_session_command()->to_string().c_str());
|
||||
|
||||
// Before routing it, pop the failed session command off the list and decrement the number of
|
||||
// executed session commands. This "overwrites" the existing command and prevents history duplication.
|
||||
m_sescmd_list.pop_back();
|
||||
--m_sescmd_count;
|
||||
|
||||
retry_query(backend->next_session_command()->deep_copy_buffer());
|
||||
can_continue = true;
|
||||
}
|
||||
else if (m_current_query.get())
|
||||
{
|
||||
retry_query(m_current_query.release());
|
||||
can_continue = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// This should never happen
|
||||
mxb_assert_message(!true, "m_current_query is empty and no session commands being executed");
|
||||
MXS_ERROR("Current query unexpectedly empty when trying to retry query on master");
|
||||
}
|
||||
|
||||
return can_continue;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Router error handling routine
|
||||
*
|
||||
@ -1014,8 +1054,7 @@ void RWSplitSession::handleError(GWBUF* errmsgbuf,
|
||||
|
||||
if (can_retry_query())
|
||||
{
|
||||
can_continue = true;
|
||||
retry_query(m_current_query.release());
|
||||
can_continue = retry_master_query(backend);
|
||||
}
|
||||
else if (m_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
||||
{
|
||||
@ -1151,6 +1190,7 @@ bool RWSplitSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg
|
||||
|
||||
if (stored && m_config.retry_failed_reads)
|
||||
{
|
||||
mxb_assert(m_expected_responses == 0);
|
||||
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
|
||||
retry_query(stored, 0);
|
||||
}
|
||||
|
@ -186,6 +186,7 @@ private:
|
||||
int get_max_replication_lag();
|
||||
mxs::RWBackend* get_backend_from_dcb(DCB* dcb);
|
||||
|
||||
bool retry_master_query(mxs::RWBackend* backend);
|
||||
void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg);
|
||||
bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg);
|
||||
void manage_transactions(mxs::RWBackend* backend, GWBUF* writebuf);
|
||||
|
Loading…
x
Reference in New Issue
Block a user