MXS-827: Add connection keepalive

The readwritesplit now sends COM_PING queries to backend servers that have
been idle for too long. The option is configured with the
`connection_keepalive` parameter.
This commit is contained in:
Markus Mäkelä
2017-03-24 05:22:25 +02:00
parent e3bae59e1a
commit 710012ac5d
10 changed files with 125 additions and 14 deletions

View File

@ -39,6 +39,12 @@ for more details about this new parameter.
MaxScale now supports IPv6 connections on both the client and backend side as MaxScale now supports IPv6 connections on both the client and backend side as
well as being able to listen on IPv6 addresses. well as being able to listen on IPv6 addresses.
### ReadWriteSplit Connection Keepalive
The _readwritesplit_ module now implements a `connection_keepalive`
feature which allows sending of keepalive pings to idle connections. For
more details, read the [ReadWriteSplit documentation](../Routers/ReadWriteSplit.md).
## Bug fixes ## Bug fixes
[Here is a list of bugs fixed since the release of MaxScale 2.1.1.](https://jira.mariadb.org/issues/?jql=project%20%3D%20MXS%20AND%20issuetype%20%3D%20Bug%20AND%20resolution%20in%20(Fixed%2C%20Done)%20AND%20fixVersion%20%3D%202.1.2%20AND%20fixVersion%20NOT%20IN%20(2.1.1)) [Here is a list of bugs fixed since the release of MaxScale 2.1.1.](https://jira.mariadb.org/issues/?jql=project%20%3D%20MXS%20AND%20issuetype%20%3D%20Bug%20AND%20resolution%20in%20(Fixed%2C%20Done)%20AND%20fixVersion%20%3D%202.1.2%20AND%20fixVersion%20NOT%20IN%20(2.1.1))

View File

@ -65,6 +65,22 @@ disable_sescmd_history=true
master_failure_mode=fail_on_write master_failure_mode=fail_on_write
``` ```
### `connection_keepalive`
Send keepalive pings to backend servers. This feature was introduced in MaxScale
2.1.2 and is disabled by default.
The parameter value is the interval in seconds between each keepalive ping. A
keepalive ping will be sent to a backend server if the connection is idle and it
has not been used within `n` seconds where `n` is greater than or equal to the
value of _connection_keepalive_. The keepalive pings are only sent when the
client executes a query.
This functionality allows the readwritesplit module to keep all backend
connections alive even if they are not used. This is a common problem if the
backend servers have a low _wait_timeout_ value and the client connections live
for a long time.
## Router options ## Router options
**`router_options`** may include multiple **readwritesplit**-specific options. All the options are parameter-value pairs. All parameters listed in this section must be configured as a value in `router_options`. **`router_options`** may include multiple **readwritesplit**-specific options. All the options are parameter-value pairs. All parameters listed in this section must be configured as a value in `router_options`.

View File

@ -58,7 +58,8 @@ typedef enum
GWBUF_TYPE_SESCMD_RESPONSE = 0x08, GWBUF_TYPE_SESCMD_RESPONSE = 0x08,
GWBUF_TYPE_RESPONSE_END = 0x10, GWBUF_TYPE_RESPONSE_END = 0x10,
GWBUF_TYPE_SESCMD = 0x20, GWBUF_TYPE_SESCMD = 0x20,
GWBUF_TYPE_HTTP = 0x40 GWBUF_TYPE_HTTP = 0x40,
GWBUF_TYPE_IGNORABLE = 0x80
} gwbuf_type_t; } gwbuf_type_t;
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) #define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
@ -68,6 +69,7 @@ typedef enum
#define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE) #define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE)
#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END) #define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END)
#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD) #define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD)
#define GWBUF_IS_IGNORABLE(b) (b->gwbuf_type & GWBUF_TYPE_IGNORABLE)
/** /**
* A structure to encapsulate the data in a form that the data itself can be * A structure to encapsulate the data in a form that the data itself can be

View File

@ -76,6 +76,14 @@ mxs_pcre2_result_t modutil_mysql_wildcard_match(const char* pattern, const char*
*/ */
char* modutil_MySQL_bypass_whitespace(char* sql, size_t len); char* modutil_MySQL_bypass_whitespace(char* sql, size_t len);
/**
* @brief Write a COM_PING to a DCB and ignore the response
*
* @param dcb The backend DCB where the COM_PING is written
* @return True if command was successfully sent
*/
bool modutil_ignorable_ping(DCB *dcb);
/** Character and token searching functions */ /** Character and token searching functions */
char* strnchr_esc(char* ptr, char c, int len); char* strnchr_esc(char* ptr, char c, int len);
char* strnchr_esc_mysql(char* ptr, char c, int len); char* strnchr_esc_mysql(char* ptr, char c, int len);

View File

@ -305,7 +305,7 @@ typedef struct
uint32_t extra_capabilities; /*< MariaDB 10.2 capabilities */ uint32_t extra_capabilities; /*< MariaDB 10.2 capabilities */
unsigned long tid; /*< MySQL Thread ID, in handshake */ unsigned long tid; /*< MySQL Thread ID, in handshake */
unsigned int charset; /*< MySQL character set at connect time */ unsigned int charset; /*< MySQL character set at connect time */
bool ignore_reply; /*< If the reply should be discarded */ int ignore_replies; /*< How many replies should be discarded */
GWBUF* stored_query; /*< Temporarily stored queries */ GWBUF* stored_query; /*< Temporarily stored queries */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t protocol_chk_tail; skygw_chk_t protocol_chk_tail;

View File

@ -1321,3 +1321,26 @@ char* modutil_MySQL_bypass_whitespace(char* sql, size_t len)
return i; return i;
} }
bool modutil_ignorable_ping(DCB *dcb)
{
static uint8_t com_ping_packet[] =
{
0x01, 0x00, 0x00, 0x00, 0x0e
};
GWBUF *buf = gwbuf_alloc_and_load(sizeof(com_ping_packet), com_ping_packet);
bool rval = false;
if (buf)
{
gwbuf_set_type(buf, GWBUF_TYPE_IGNORABLE);
if (dcb->func.write(dcb, buf))
{
rval = true;
}
}
return rval;
}

View File

@ -707,17 +707,21 @@ gw_read_and_write(DCB *dcb)
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol; MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
if (proto->ignore_reply) if (proto->ignore_replies > 0)
{ {
/**
/** The reply to a COM_CHANGE_USER is in packet */ * The reply to an ignorable command is in the packet. Extract the
GWBUF *query = proto->stored_query; * response type and discard the response.
uint8_t result = *((uint8_t*)GWBUF_DATA(read_buffer) + 4); */
proto->stored_query = NULL; uint8_t result;
proto->ignore_reply = false; gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN, 1, &result);
proto->ignore_replies--;
ss_dassert(proto->ignore_replies >= 0);
gwbuf_free(read_buffer); gwbuf_free(read_buffer);
int rval = 0; int rval = 0;
GWBUF *query = proto->stored_query;
proto->stored_query = NULL;
if (result == MYSQL_REPLY_OK) if (result == MYSQL_REPLY_OK)
{ {
@ -725,8 +729,11 @@ gw_read_and_write(DCB *dcb)
} }
else if (query) else if (query)
{ {
/** The COM_CHANGE USER failed, generate a fake hangup event to /**
* close the DCB and send an error to the client. */ * The ignorable command failed when we had a queued query from the
* client. Generate a fake hangup event to close the DCB and send
* an error to the client.
*/
gwbuf_free(query); gwbuf_free(query);
poll_fake_hangup_event(dcb); poll_fake_hangup_event(dcb);
} }
@ -884,13 +891,14 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
gwbuf_free(backend_protocol->stored_query); gwbuf_free(backend_protocol->stored_query);
} }
dcb->was_persistent = false; dcb->was_persistent = false;
backend_protocol->ignore_reply = true; backend_protocol->ignore_replies++;
ss_dassert(backend_protocol->ignore_replies > 0);
backend_protocol->stored_query = queue; backend_protocol->stored_query = queue;
GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol); GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol);
return dcb_write(dcb, buf) ? 1 : 0; return dcb_write(dcb, buf) ? 1 : 0;
} }
else if (backend_protocol->ignore_reply) else if (backend_protocol->ignore_replies > 0)
{ {
if (MYSQL_IS_COM_QUIT((uint8_t*)GWBUF_DATA(queue))) if (MYSQL_IS_COM_QUIT((uint8_t*)GWBUF_DATA(queue)))
{ {
@ -974,6 +982,13 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
} }
else else
{ {
if (GWBUF_IS_IGNORABLE(queue))
{
/** The response to this command should be ignored */
backend_protocol->ignore_replies++;
ss_dassert(backend_protocol->ignore_replies > 0);
}
/** Write to backend */ /** Write to backend */
rc = dcb_write(dcb, queue); rc = dcb_write(dcb, queue);
} }

View File

@ -196,6 +196,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
{"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"}, {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"},
{"strict_multi_stmt", MXS_MODULE_PARAM_BOOL, "true"}, {"strict_multi_stmt", MXS_MODULE_PARAM_BOOL, "true"},
{"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"}, {"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"},
{"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"},
{MXS_END_MODULE_PARAMS} {MXS_END_MODULE_PARAMS}
} }
}; };
@ -278,6 +279,7 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history"); router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history");
router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history"); router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history");
router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads"); router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads");
router->rwsplit_config.connection_keepalive = config_get_integer(params, "connection_keepalive");
if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) || if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) ||
(options && !rwsplit_process_router_options(router, options))) (options && !rwsplit_process_router_options(router, options)))

View File

@ -275,6 +275,8 @@ typedef struct rwsplit_config_st
enum failure_mode master_failure_mode; /**< Master server failure handling mode. enum failure_mode master_failure_mode; /**< Master server failure handling mode.
* @see enum failure_mode */ * @see enum failure_mode */
bool retry_failed_reads; /**< Retry failed reads on other servers */ bool retry_failed_reads; /**< Retry failed reads on other servers */
int connection_keepalive; /**< Send pings to servers that have
* been idle for too long */
} rwsplit_config_t; } rwsplit_config_t;
#if defined(PREP_STMT_CACHING) #if defined(PREP_STMT_CACHING)

View File

@ -19,9 +19,11 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h> #include <stdint.h>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/router.h> #include <maxscale/router.h>
#include <maxscale/modutil.h>
#include "rwsplit_internal.h" #include "rwsplit_internal.h"
/** /**
* @file rwsplit_route_stmt.c The functions that support the routing of * @file rwsplit_route_stmt.c The functions that support the routing of
* queries to back end servers. All the functions in this module are internal * queries to back end servers. All the functions in this module are internal
@ -44,6 +46,35 @@ static backend_ref_t *check_candidate_bref(backend_ref_t *cand,
select_criteria_t sc); select_criteria_t sc);
static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses); static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses);
void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB *target_dcb)
{
ss_dassert(target_dcb);
ss_debug(int nserv = 0);
for (int i = 0; i < rses->rses_nbackends; i++)
{
/** Each heartbeat is 1/10th of a second */
int keepalive = inst->rwsplit_config.connection_keepalive * 10;
backend_ref_t *bref = &rses->rses_backend_ref[i];
if (bref->bref_dcb != target_dcb && BREF_IS_IN_USE(bref) &&
!BREF_IS_WAITING_RESULT(bref))
{
ss_debug(nserv++);
int diff = hkheartbeat - bref->bref_dcb->last_read;
if (diff > keepalive)
{
MXS_INFO("Pinging %s, idle for %d seconds",
bref->bref_dcb->server->unique_name, diff / 10);
modutil_ignorable_ping(bref->bref_dcb);
}
}
}
ss_dassert(nserv < rses->rses_nbackends);
}
/** /**
* Routing function. Find out query type, backend type, and target DCB(s). * Routing function. Find out query type, backend type, and target DCB(s).
* Then route query to found target(s). * Then route query to found target(s).
@ -148,6 +179,12 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
} }
} }
if (succp && inst->rwsplit_config.connection_keepalive &&
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
{
handle_connection_keepalive(inst, rses, target_dcb);
}
return succp; return succp;
} /* route_single_stmt */ } /* route_single_stmt */