Merge branch 'develop' of github.com:mariadb-corporation/MaxScale into develop
This commit is contained in:
commit
ca21d27c68
@ -79,6 +79,7 @@ For more details, please refer to:
|
||||
the master. There is also limited capability for rejoining nodes.
|
||||
|
||||
For more details, please refer to:
|
||||
* [MariaDB MaxScale 2.2.21 Release Notes](Release-Notes/MaxScale-2.2.21-Release-Notes.md)
|
||||
* [MariaDB MaxScale 2.2.20 Release Notes](Release-Notes/MaxScale-2.2.20-Release-Notes.md)
|
||||
* [MariaDB MaxScale 2.2.19 Release Notes](Release-Notes/MaxScale-2.2.19-Release-Notes.md)
|
||||
* [MariaDB MaxScale 2.2.18 Release Notes](Release-Notes/MaxScale-2.2.18-Release-Notes.md)
|
||||
|
@ -15,6 +15,10 @@ GET /v1/sessions/:id
|
||||
Get a single session. _:id_ must be a valid session ID. The session ID is the
|
||||
same that is exposed to the client as the connection ID.
|
||||
|
||||
This endpoint also supports the `rdns=true` parameter, which instructs MaxScale to
|
||||
perform reverse DNS on the client IP address. As this requires communicating with
|
||||
an external server, the operation may be expensive.
|
||||
|
||||
#### Response
|
||||
|
||||
`Status: 200 OK`
|
||||
|
41
Documentation/Release-Notes/MaxScale-2.2.21-Release-Notes.md
Normal file
41
Documentation/Release-Notes/MaxScale-2.2.21-Release-Notes.md
Normal file
@ -0,0 +1,41 @@
|
||||
# MariaDB MaxScale 2.2.21 Release Notes -- 2019-05-08
|
||||
|
||||
Release 2.2.21 is a GA release.
|
||||
|
||||
This document describes the changes in release 2.2.21, when compared to the
|
||||
previous release in the same series.
|
||||
|
||||
For any problems you encounter, please consider submitting a bug
|
||||
report on [our Jira](https://jira.mariadb.org/projects/MXS).
|
||||
|
||||
## Bug fixes
|
||||
|
||||
* [MXS-2410](https://jira.mariadb.org/browse/MXS-2410) Hangup delivered to wrong DCB
|
||||
* [MXS-2366](https://jira.mariadb.org/browse/MXS-2366) Wrong tarball RPATH
|
||||
|
||||
## Changes to MariaDB-Monitor failover
|
||||
|
||||
Failover is no longer disabled permanently if it or any other cluster operation fails.
|
||||
The disabling is now only temporary and lasts for 'failcount' monitor iterations. Check
|
||||
[MariaDB-Monitor documentation](../Monitors/MariaDB-Monitor.md#limitations-and-requirements)
|
||||
for more information.
|
||||
|
||||
## Known Issues and Limitations
|
||||
|
||||
There are some limitations and known issues within this version of MaxScale.
|
||||
For more information, please refer to the [Limitations](../About/Limitations.md) document.
|
||||
|
||||
## Packaging
|
||||
|
||||
RPM and Debian packages are provided for supported the Linux distributions.
|
||||
|
||||
Packages can be downloaded [here](https://mariadb.com/downloads/mariadb-tx/maxscale).
|
||||
|
||||
## Source Code
|
||||
|
||||
The source code of MaxScale is tagged at GitHub with a tag, which is identical
|
||||
with the version of MaxScale. For instance, the tag of version X.Y.Z of MaxScale
|
||||
is `maxscale-X.Y.Z`. Further, the default branch is always the latest GA version
|
||||
of MaxScale.
|
||||
|
||||
The source code is available [here](https://github.com/mariadb-corporation/MaxScale).
|
@ -5,7 +5,7 @@
|
||||
|
||||
set(MAXSCALE_VERSION_MAJOR "2" CACHE STRING "Major version")
|
||||
set(MAXSCALE_VERSION_MINOR "2" CACHE STRING "Minor version")
|
||||
set(MAXSCALE_VERSION_PATCH "21" CACHE STRING "Patch version")
|
||||
set(MAXSCALE_VERSION_PATCH "22" CACHE STRING "Patch version")
|
||||
|
||||
# This should only be incremented if a package is rebuilt
|
||||
set(MAXSCALE_BUILD_NUMBER 1 CACHE STRING "Release number")
|
||||
|
@ -480,6 +480,42 @@ public:
|
||||
return &m_i;
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance the iterator
|
||||
*
|
||||
* This provides similar behavior to random access iterators with operator+= but does it in
|
||||
* non-constant time.
|
||||
*
|
||||
* @param i Number of steps to advance the iterator
|
||||
*/
|
||||
void advance(int i)
|
||||
{
|
||||
mxb_assert(m_i != m_end);
|
||||
mxb_assert(i >= 0);
|
||||
|
||||
while (m_i && m_i + i >= m_end)
|
||||
{
|
||||
i -= m_end - m_i;
|
||||
m_pBuffer = m_pBuffer->next;
|
||||
|
||||
if (m_pBuffer)
|
||||
{
|
||||
m_i = GWBUF_DATA(m_pBuffer);
|
||||
m_end = m_i + GWBUF_LENGTH(m_pBuffer);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_i = NULL;
|
||||
m_end = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (m_i)
|
||||
{
|
||||
m_i += i;
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
iterator_base(buf_type pBuffer = NULL)
|
||||
: m_pBuffer(pBuffer)
|
||||
|
@ -438,19 +438,19 @@ void session_put_ref(MXS_SESSION* session);
|
||||
*
|
||||
* @param session Session to convert
|
||||
* @param host Hostname of this server
|
||||
*
|
||||
* @param rdns Attempt reverse DNS on client ip address
|
||||
* @return New JSON object or NULL on error
|
||||
*/
|
||||
json_t* session_to_json(const MXS_SESSION* session, const char* host);
|
||||
json_t* session_to_json(const MXS_SESSION* session, const char* host, bool rdns);
|
||||
|
||||
/**
|
||||
* @brief Convert all sessions to JSON
|
||||
*
|
||||
* @param host Hostname of this server
|
||||
*
|
||||
* @param rdns Attempt reverse DNS on client ip addresses
|
||||
* @return A JSON array with all sessions
|
||||
*/
|
||||
json_t* session_list_to_json(const char* host);
|
||||
json_t* session_list_to_json(const char* host, bool rdns);
|
||||
|
||||
/**
|
||||
* Qualify the session for connection pooling
|
||||
|
@ -409,6 +409,16 @@ module.exports = function() {
|
||||
this.error = function(err) {
|
||||
return Promise.reject(colors.red('Error: ') + err)
|
||||
}
|
||||
|
||||
this.rDnsOption = {
|
||||
shortname: 'rdns',
|
||||
optionOn: 'rdns=true',
|
||||
definition : {
|
||||
describe: 'Reverse DNS on client IP. May slow MaxScale down.',
|
||||
type: 'bool',
|
||||
default: false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -115,9 +115,15 @@ exports.builder = function(yargs) {
|
||||
.command('sessions', 'List sessions', function(yargs) {
|
||||
return yargs.epilog('List all client sessions.')
|
||||
.usage('Usage: list sessions')
|
||||
.group([rDnsOption.shortname], 'Options:')
|
||||
.option(rDnsOption.shortname, rDnsOption.definition)
|
||||
}, function(argv) {
|
||||
maxctrl(argv, function(host) {
|
||||
return getCollection(host, 'sessions',[
|
||||
var resource = 'sessions'
|
||||
if (argv[this.rDnsOption.shortname]) {
|
||||
resource += '?' + this.rDnsOption.optionOn
|
||||
}
|
||||
return getCollection(host, resource,[
|
||||
{'Id': 'id'},
|
||||
{'User': 'attributes.user'},
|
||||
{'Host': 'attributes.remote'},
|
||||
|
@ -174,18 +174,30 @@ exports.builder = function(yargs) {
|
||||
'the session is connected and the `Connection IDs` ' +
|
||||
'field lists the IDs for those connections.')
|
||||
.usage('Usage: show session <session>')
|
||||
.group([rDnsOption.shortname], 'Options:')
|
||||
.option(rDnsOption.shortname, rDnsOption.definition)
|
||||
}, function(argv) {
|
||||
maxctrl(argv, function(host) {
|
||||
return getResource(host, 'sessions/' + argv.session, session_fields)
|
||||
var resource = 'sessions/' + argv.session
|
||||
if (argv[this.rDnsOption.shortname]) {
|
||||
resource += '?' + this.rDnsOption.optionOn
|
||||
}
|
||||
return getResource(host, resource, session_fields)
|
||||
})
|
||||
})
|
||||
.command('sessions', 'Show all sessions', function(yargs) {
|
||||
return yargs.epilog('Show detailed information about all sessions. ' +
|
||||
'See `help show session` for more details.')
|
||||
.usage('Usage: show sessions')
|
||||
.group([rDnsOption.shortname], 'Options:')
|
||||
.option(rDnsOption.shortname, rDnsOption.definition)
|
||||
}, function(argv) {
|
||||
maxctrl(argv, function(host) {
|
||||
return getCollectionAsResource(host, 'sessions/', session_fields)
|
||||
var resource = 'sessions/'
|
||||
if (argv[this.rDnsOption.shortname]) {
|
||||
resource += '?' + this.rDnsOption.optionOn
|
||||
}
|
||||
return getCollectionAsResource(host, resource, session_fields)
|
||||
})
|
||||
})
|
||||
.command('filter <filter>', 'Show filter', function(yargs) {
|
||||
|
@ -108,4 +108,14 @@ inline bool operator!=(const Host& l, const Host& r)
|
||||
{
|
||||
return !(l == r);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform reverse DNS on an IP address. This may involve network communication so can be slow.
|
||||
*
|
||||
* @param ip IP to convert to hostname
|
||||
* @param output Where to write the output. If operation fails, original IP is written.
|
||||
* @return True on success
|
||||
*/
|
||||
bool reverse_dns(const std::string& ip, std::string* output);
|
||||
|
||||
}
|
||||
|
@ -16,6 +16,8 @@
|
||||
#include <ostream>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <arpa/inet.h>
|
||||
#include <netdb.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -222,4 +224,52 @@ std::istream& operator>>(std::istream& is, Host& host)
|
||||
host = Host(input);
|
||||
return is;
|
||||
}
|
||||
|
||||
bool reverse_dns(const std::string& ip, std::string* output)
|
||||
{
|
||||
sockaddr_storage socket_address;
|
||||
memset(&socket_address, 0, sizeof(socket_address));
|
||||
socklen_t slen = 0;
|
||||
|
||||
if (is_valid_ipv4(ip))
|
||||
{
|
||||
// Casts between the different sockaddr-types should work.
|
||||
int family = AF_INET;
|
||||
auto sa_in = reinterpret_cast<sockaddr_in*>(&socket_address);
|
||||
if (inet_pton(family, ip.c_str(), &sa_in->sin_addr) == 1)
|
||||
{
|
||||
sa_in->sin_family = family;
|
||||
slen = sizeof(sockaddr_in);
|
||||
}
|
||||
}
|
||||
else if (is_valid_ipv6(ip))
|
||||
{
|
||||
int family = AF_INET6;
|
||||
auto sa_in6 = reinterpret_cast<sockaddr_in6*>(&socket_address);
|
||||
if (inet_pton(family, ip.c_str(), &sa_in6->sin6_addr) == 1)
|
||||
{
|
||||
sa_in6->sin6_family = family;
|
||||
slen = sizeof(sockaddr_in6);
|
||||
}
|
||||
}
|
||||
|
||||
bool success = false;
|
||||
if (slen > 0)
|
||||
{
|
||||
char host[NI_MAXHOST];
|
||||
auto sa = reinterpret_cast<sockaddr*>(&socket_address);
|
||||
if (getnameinfo(sa, slen, host, sizeof(host), nullptr, 0, NI_NAMEREQD) == 0)
|
||||
{
|
||||
*output = host;
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!success)
|
||||
{
|
||||
*output = ip;
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2368,7 +2368,7 @@ bool pid_is_maxscale(int pid)
|
||||
|
||||
if (file && std::getline(file, line))
|
||||
{
|
||||
if (line == "maxscale")
|
||||
if (line == "maxscale" && pid != getpid())
|
||||
{
|
||||
rval = true;
|
||||
}
|
||||
|
@ -136,6 +136,11 @@ public:
|
||||
return m_sQuery;
|
||||
}
|
||||
|
||||
timespec time_completed() const
|
||||
{
|
||||
return m_completed;
|
||||
}
|
||||
|
||||
void book_server_response(SERVER* pServer, bool final_response);
|
||||
void book_as_complete();
|
||||
void reset_server_bookkeeping();
|
||||
|
@ -178,6 +178,12 @@ bool Resource::requires_body() const
|
||||
namespace
|
||||
{
|
||||
|
||||
bool option_rdns_is_on(const HttpRequest& request)
|
||||
{
|
||||
return request.get_option("rdns") == "true";
|
||||
}
|
||||
|
||||
|
||||
static bool drop_path_part(std::string& path)
|
||||
{
|
||||
size_t pos = path.find_last_of('/');
|
||||
@ -622,7 +628,8 @@ HttpResponse cb_get_monitor(const HttpRequest& request)
|
||||
|
||||
HttpResponse cb_all_sessions(const HttpRequest& request)
|
||||
{
|
||||
return HttpResponse(MHD_HTTP_OK, session_list_to_json(request.host()));
|
||||
bool rdns = option_rdns_is_on(request);
|
||||
return HttpResponse(MHD_HTTP_OK, session_list_to_json(request.host(), rdns));
|
||||
}
|
||||
|
||||
HttpResponse cb_get_session(const HttpRequest& request)
|
||||
@ -632,7 +639,8 @@ HttpResponse cb_get_session(const HttpRequest& request)
|
||||
|
||||
if (session)
|
||||
{
|
||||
json_t* json = session_to_json(session, request.host());
|
||||
bool rdns = option_rdns_is_on(request);
|
||||
json_t* json = session_to_json(session, request.host(), rdns);
|
||||
session_put_ref(session);
|
||||
return HttpResponse(MHD_HTTP_OK, json);
|
||||
}
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <sstream>
|
||||
|
||||
#include <maxbase/atomic.hh>
|
||||
#include <maxbase/host.hh>
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/clock.h>
|
||||
#include <maxscale/dcb.hh>
|
||||
@ -714,7 +715,7 @@ uint64_t session_get_next_id()
|
||||
return mxb::atomic::add(&this_unit.next_session_id, 1, mxb::atomic::RELAXED);
|
||||
}
|
||||
|
||||
json_t* session_json_data(const Session* session, const char* host)
|
||||
json_t* session_json_data(const Session* session, const char* host, bool rdns)
|
||||
{
|
||||
json_t* data = json_object();
|
||||
|
||||
@ -761,7 +762,17 @@ json_t* session_json_data(const Session* session, const char* host)
|
||||
|
||||
if (session->client_dcb->remote)
|
||||
{
|
||||
json_object_set_new(attr, "remote", json_string(session->client_dcb->remote));
|
||||
string result_address;
|
||||
auto remote = session->client_dcb->remote;
|
||||
if (rdns)
|
||||
{
|
||||
maxbase::reverse_dns(remote, &result_address);
|
||||
}
|
||||
else
|
||||
{
|
||||
result_address = remote;
|
||||
}
|
||||
json_object_set_new(attr, "remote", json_string(result_address.c_str()));
|
||||
}
|
||||
|
||||
struct tm result;
|
||||
@ -798,18 +809,26 @@ json_t* session_json_data(const Session* session, const char* host)
|
||||
return data;
|
||||
}
|
||||
|
||||
json_t* session_to_json(const MXS_SESSION* session, const char* host)
|
||||
json_t* session_to_json(const MXS_SESSION* session, const char* host, bool rdns)
|
||||
{
|
||||
stringstream ss;
|
||||
ss << MXS_JSON_API_SESSIONS << session->ses_id;
|
||||
const Session* s = static_cast<const Session*>(session);
|
||||
return mxs_json_resource(host, ss.str().c_str(), session_json_data(s, host));
|
||||
return mxs_json_resource(host, ss.str().c_str(), session_json_data(s, host, rdns));
|
||||
}
|
||||
|
||||
struct SessionListData
|
||||
{
|
||||
json_t* json;
|
||||
const char* host;
|
||||
SessionListData(const char* host, bool rdns)
|
||||
: json(json_array())
|
||||
, host(host)
|
||||
, rdns(rdns)
|
||||
{
|
||||
}
|
||||
|
||||
json_t* json {nullptr};
|
||||
const char* host {nullptr};
|
||||
bool rdns {false};
|
||||
};
|
||||
|
||||
bool seslist_cb(DCB* dcb, void* data)
|
||||
@ -818,15 +837,15 @@ bool seslist_cb(DCB* dcb, void* data)
|
||||
{
|
||||
SessionListData* d = (SessionListData*)data;
|
||||
Session* session = static_cast<Session*>(dcb->session);
|
||||
json_array_append_new(d->json, session_json_data(session, d->host));
|
||||
json_array_append_new(d->json, session_json_data(session, d->host, d->rdns));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
json_t* session_list_to_json(const char* host)
|
||||
json_t* session_list_to_json(const char* host, bool rdns)
|
||||
{
|
||||
SessionListData data = {json_array(), host};
|
||||
SessionListData data(host, rdns);
|
||||
dcb_foreach(seslist_cb, &data);
|
||||
return mxs_json_resource(host, MXS_JSON_API_SESSIONS, data.json);
|
||||
}
|
||||
@ -1209,6 +1228,10 @@ void Session::dump_statements() const
|
||||
{
|
||||
const QueryInfo& info = *i;
|
||||
GWBUF* pBuffer = info.query().get();
|
||||
timespec ts = info.time_completed();
|
||||
struct tm *tm = localtime(&ts.tv_sec);
|
||||
char timestamp[20];
|
||||
strftime(timestamp, 20, "%Y-%m-%d %H:%M:%S", tm);
|
||||
|
||||
const char* pCmd;
|
||||
char* pStmt;
|
||||
@ -1219,14 +1242,14 @@ void Session::dump_statements() const
|
||||
{
|
||||
if (id != 0)
|
||||
{
|
||||
MXS_NOTICE("Stmt %d: %.*s", n, len, pStmt);
|
||||
MXS_NOTICE("Stmt %d(%s): %.*s", n, timestamp, len, pStmt);
|
||||
}
|
||||
else
|
||||
{
|
||||
// We are in a context where we do not have a current session, so we need to
|
||||
// log the session id ourselves.
|
||||
|
||||
MXS_NOTICE("(%" PRIu64 ") Stmt %d: %.*s", ses_id, n, len, pStmt);
|
||||
MXS_NOTICE("(%" PRIu64 ") Stmt %d(%s): %.*s", ses_id, n, timestamp, len, pStmt);
|
||||
}
|
||||
|
||||
if (deallocate)
|
||||
|
@ -277,37 +277,41 @@ void GaleraMonitor::update_server_status(MonitorServer* monitored_server)
|
||||
/* Node is in desync - lets take it offline */
|
||||
if (strcmp(row[0], "wsrep_desync") == 0)
|
||||
{
|
||||
if (strcasecmp(row[1],"YES") || strcasecmp(row[1],"ON") || strcasecmp(row[1],"1") || strcasecmp(row[1],"true"))
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
if (strcasecmp(row[1], "YES") == 0 || strcasecmp(row[1], "ON") == 0
|
||||
|| strcasecmp(row[1], "1") == 0 || strcasecmp(row[1], "true") == 0)
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Node rejects queries - lets take it offline */
|
||||
if (strcmp(row[0], "wsrep_reject_queries") == 0)
|
||||
{
|
||||
if (strcasecmp(row[1],"ALL") || strcasecmp(row[1],"ALL_KILL"))
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
if (strcasecmp(row[1], "ALL") == 0
|
||||
|| strcasecmp(row[1], "ALL_KILL") == 0)
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Node rejects queries - lets take it offline */
|
||||
if (strcmp(row[0], "wsrep_sst_donor_rejects_queries") == 0)
|
||||
{
|
||||
if (strcasecmp(row[1],"YES") || strcasecmp(row[1],"ON") || strcasecmp(row[1],"1") || strcasecmp(row[1],"true"))
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
if (strcasecmp(row[1], "YES") == 0 || strcasecmp(row[1], "ON") == 0
|
||||
|| strcasecmp(row[1], "1") == 0 || strcasecmp(row[1], "true") == 0)
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Node is not ready - lets take it offline */
|
||||
if (strcmp(row[0], "wsrep_ready") == 0)
|
||||
{
|
||||
if (strcasecmp(row[1],"NO") || strcasecmp(row[1],"OFF") || strcasecmp(row[1],"0") || strcasecmp(row[1],"false"))
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
if (strcasecmp(row[1], "NO") == 0 || strcasecmp(row[1], "OFF") == 0
|
||||
|| strcasecmp(row[1], "0") == 0 || strcasecmp(row[1], "false") == 0)
|
||||
{
|
||||
info.joined = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (strcmp(row[0], "wsrep_cluster_state_uuid") == 0 && row[1] && *row[1])
|
||||
|
@ -204,19 +204,19 @@ Iter skip_encoded_int(Iter it)
|
||||
switch (*it)
|
||||
{
|
||||
case 0xfc:
|
||||
std::advance(it, 3);
|
||||
it.advance(3);
|
||||
break;
|
||||
|
||||
case 0xfd:
|
||||
std::advance(it, 4);
|
||||
it.advance(4);
|
||||
break;
|
||||
|
||||
case 0xfe:
|
||||
std::advance(it, 9);
|
||||
it.advance(9);
|
||||
break;
|
||||
|
||||
default:
|
||||
std::advance(it, 1);
|
||||
++it;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -301,7 +301,8 @@ void RWBackend::process_packets(GWBUF* result)
|
||||
len |= (*it++) << 16;
|
||||
++it; // Skip the sequence
|
||||
mxb_assert(it != buffer.end());
|
||||
auto end = std::next(it, len);
|
||||
auto end = it;
|
||||
end.advance(len);
|
||||
uint8_t cmd = *it;
|
||||
|
||||
// Ignore the tail end of a large packet large packet. Only resultsets can generate packets this large
|
||||
|
@ -243,7 +243,7 @@ bool file_in_dir(const char* dir, const char* file)
|
||||
*/
|
||||
void AvroSession::queue_client_callback()
|
||||
{
|
||||
auto worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
|
||||
auto worker = static_cast<mxs::RoutingWorker*>(dcb->owner);
|
||||
worker->execute([this]() {
|
||||
client_callback();
|
||||
}, mxs::RoutingWorker::EXECUTE_QUEUED);
|
||||
|
@ -46,6 +46,7 @@
|
||||
#include <maxscale/service.hh>
|
||||
#include <maxscale/utils.h>
|
||||
#include <maxscale/version.h>
|
||||
#include <maxscale/routingworker.hh>
|
||||
|
||||
using std::string;
|
||||
using std::vector;
|
||||
@ -241,7 +242,7 @@ static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE* router,
|
||||
int len,
|
||||
uint8_t seqno);
|
||||
static bool blr_send_slave_heartbeat(void* inst);
|
||||
static int blr_slave_send_heartbeat(ROUTER_INSTANCE* router,
|
||||
static void blr_slave_send_heartbeat(ROUTER_INSTANCE* router,
|
||||
ROUTER_SLAVE* slave);
|
||||
static int blr_set_master_ssl(ROUTER_INSTANCE* router,
|
||||
const ChangeMasterConfig& config,
|
||||
@ -6204,13 +6205,11 @@ static bool blr_send_slave_heartbeat(void* inst)
|
||||
sptr->heartbeat,
|
||||
(unsigned long)sptr->lastReply);
|
||||
|
||||
if (blr_slave_send_heartbeat(router, sptr))
|
||||
{
|
||||
/* Set last event */
|
||||
sptr->lastEventReceived = HEARTBEAT_EVENT;
|
||||
/* Set last time */
|
||||
sptr->lastReply = t_now;
|
||||
}
|
||||
blr_slave_send_heartbeat(router, sptr);
|
||||
/* Set last event */
|
||||
sptr->lastEventReceived = HEARTBEAT_EVENT;
|
||||
/* Set last time */
|
||||
sptr->lastReply = t_now;
|
||||
}
|
||||
|
||||
sptr = sptr->next;
|
||||
@ -6228,7 +6227,7 @@ static bool blr_send_slave_heartbeat(void* inst)
|
||||
* @param slave The current slave connection
|
||||
* @return Number of bytes sent or 0 in case of failure
|
||||
*/
|
||||
static int blr_slave_send_heartbeat(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
static void send_heartbeat(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
{
|
||||
REP_HEADER hdr;
|
||||
GWBUF* h_event;
|
||||
@ -6255,10 +6254,7 @@ static int blr_slave_send_heartbeat(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave
|
||||
*
|
||||
* Total = 5 bytes + len
|
||||
*/
|
||||
if ((h_event = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + len)) == NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
h_event = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + len);
|
||||
|
||||
/* The OK/Err byte is part of payload */
|
||||
hdr.payload_len = len + 1;
|
||||
@ -6306,7 +6302,18 @@ static int blr_slave_send_heartbeat(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave
|
||||
}
|
||||
|
||||
/* Write the packet */
|
||||
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, h_event);
|
||||
mxs::RoutingWorker* worker = static_cast<mxs::RoutingWorker*>(slave->dcb->owner);
|
||||
worker->execute([slave, h_event]() {
|
||||
MXS_SESSION_ROUTE_REPLY(slave->dcb->session, h_event);
|
||||
}, mxs::RoutingWorker::EXECUTE_AUTO);
|
||||
}
|
||||
|
||||
static void blr_slave_send_heartbeat(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
{
|
||||
mxs::RoutingWorker* worker = static_cast<mxs::RoutingWorker*>(slave->dcb->owner);
|
||||
worker->execute([router, slave]() {
|
||||
send_heartbeat(router, slave);
|
||||
}, mxs::RoutingWorker::EXECUTE_AUTO);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -13,7 +13,9 @@
|
||||
|
||||
#include "rwsplitsession.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cmath>
|
||||
#include <vector>
|
||||
|
||||
#include <maxscale/modutil.hh>
|
||||
#include <maxscale/poll.hh>
|
||||
@ -569,81 +571,36 @@ void RWSplitSession::close_stale_connections()
|
||||
namespace
|
||||
{
|
||||
|
||||
// TODO: It is not OK that knowledge about Clustrix is embedded into RWS.
|
||||
// TODO: The capacity for recovery should be abstracted into SERVER, of
|
||||
// TODO: which there then would be backend specific concrete specializations.
|
||||
|
||||
const int CLUSTRIX_ERROR_CODE = 1;
|
||||
|
||||
// NOTE: Keep these alphabetically ordered!
|
||||
const char CLUSTRIX_ERROR_1[] = "[16389] Group change during GTM operation";
|
||||
|
||||
const struct ClustrixError
|
||||
{
|
||||
const void* message;
|
||||
int len;
|
||||
|
||||
bool operator == (const ClustrixError& rhs) const
|
||||
{
|
||||
return len == rhs.len && memcmp(message, rhs.message, len) == 0;
|
||||
}
|
||||
|
||||
bool operator < (const ClustrixError& rhs) const
|
||||
{
|
||||
int rv = memcmp(message, rhs.message, std::min(len, rhs.len));
|
||||
|
||||
if (rv == 0)
|
||||
{
|
||||
rv = len - rhs.len;
|
||||
}
|
||||
|
||||
return rv < 0 ? true : false;
|
||||
}
|
||||
} clustrix_errors[] =
|
||||
{
|
||||
{ CLUSTRIX_ERROR_1, sizeof(CLUSTRIX_ERROR_1) - 1 }
|
||||
};
|
||||
|
||||
const int nClustrix_errors = sizeof(clustrix_errors) / sizeof(clustrix_errors[0]);
|
||||
|
||||
bool is_manageable_clustrix_error(GWBUF* writebuf)
|
||||
bool is_transaction_rollback(GWBUF* writebuf)
|
||||
{
|
||||
bool rv = false;
|
||||
|
||||
if (MYSQL_IS_ERROR_PACKET(GWBUF_DATA(writebuf)))
|
||||
{
|
||||
uint8_t* pData = GWBUF_DATA(writebuf);
|
||||
uint8_t data[MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(pData)];
|
||||
mxs::Buffer buffer(writebuf);
|
||||
auto it = buffer.begin();
|
||||
it.advance(MYSQL_HEADER_LEN + 1 + 2 + 1);
|
||||
|
||||
if (!GWBUF_IS_CONTIGUOUS(writebuf))
|
||||
if (*it++ == '4' && *it == '0')
|
||||
{
|
||||
gwbuf_copy_data(writebuf, 0, sizeof(data), data);
|
||||
pData = data;
|
||||
}
|
||||
rv = true;
|
||||
|
||||
uint16_t code = MYSQL_GET_ERRCODE(pData);
|
||||
|
||||
if (code == CLUSTRIX_ERROR_CODE)
|
||||
{
|
||||
// May be a recoverable error.
|
||||
uint8_t* pMessage;
|
||||
uint16_t nMessage;
|
||||
extract_error_message(pData, &pMessage, &nMessage);
|
||||
|
||||
if (std::binary_search(clustrix_errors, clustrix_errors + nClustrix_errors,
|
||||
ClustrixError { pMessage, nMessage }))
|
||||
if (mxb_log_is_priority_enabled(LOG_INFO))
|
||||
{
|
||||
if (mxb_log_is_priority_enabled(LOG_INFO))
|
||||
{
|
||||
char message[nMessage + 1];
|
||||
memcpy(message, pMessage, nMessage);
|
||||
message[nMessage] = 0;
|
||||
// it now points at the second byte of the 5 byte long 'sql_state'.
|
||||
it.advance(4); // And now at the start of the human readable error message.
|
||||
|
||||
MXS_INFO("A recoverable Clustrix error: %s", message);
|
||||
}
|
||||
rv = true;
|
||||
auto end = buffer.begin();
|
||||
end.advance(MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf)));
|
||||
mxb_assert(end == buffer.end());
|
||||
|
||||
std::string message(it, end);
|
||||
|
||||
MXS_INFO("Transaction rollback, transaction can be retried: %s", message.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
buffer.release();
|
||||
}
|
||||
|
||||
return rv;
|
||||
@ -689,7 +646,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
|
||||
|
||||
backend->process_reply(writebuf);
|
||||
|
||||
if (m_config.transaction_replay && is_manageable_clustrix_error(writebuf))
|
||||
if (m_config.transaction_replay && is_transaction_rollback(writebuf))
|
||||
{
|
||||
// writebuf was an error that can be handled by replaying the transaction.
|
||||
m_expected_responses--;
|
||||
|
Loading…
x
Reference in New Issue
Block a user