KILL [CONNECTION | QUERY] support, part3

The text-version of "KILL CONNECTION" command is now supported. To keep the
overhead low, only minimal parsing is done on the query. The query
needs to start in the beginning of the mysql-packet, have no comments
and have limited whitespace as the total length of the query is limited.
Both "KILL 123" and "KILL CONNECTION 123" are accepted.

"KILL QUERY 123" is also accepted but not acted on, as it requires larger
changes.
This commit is contained in:
Esa Korhonen
2017-05-04 17:51:10 +03:00
parent aebe839990
commit a0cd067a03

View File

@ -14,6 +14,11 @@
#define MXS_MODULE_NAME "MySQLClient"
#include <inttypes.h>
#include <limits.h>
#include <netinet/tcp.h>
#include <sys/stat.h>
#include <maxscale/protocol.h>
#include <maxscale/alloc.h>
#include <maxscale/log_manager.h>
@ -21,14 +26,33 @@
#include <maxscale/ssl.h>
#include <maxscale/poll.h>
#include <maxscale/modinfo.h>
#include <sys/stat.h>
#include <maxscale/modutil.h>
#include <netinet/tcp.h>
#include <maxscale/query_classifier.h>
#include <maxscale/authenticator.h>
#include <maxscale/session.h>
#include <maxscale/worker.h>
/** Return type of process_special_commands() */
typedef enum spec_com_res_t
{
RES_CONTINUE, // No special command detected, proceed as normal.
RES_END, // Query handling completed, do not send to filters/router.
RES_MORE_DATA // Possible special command, but not enough data to be sure. Must
// wait for more data.
} spec_com_res_t;
/* Type of the kill-command sent by client. */
typedef enum kill_type
{
KT_CONNECTION,
KT_QUERY
} kill_type_t;
/* Limits on the length of the queries in which "KILL" is searched for. Reducing
* LONGEST_KILL will reduce overhead but also limit the range of accepted queries. */
const int SHORTEST_KILL = sizeof("KILL 1") - 1;
const int LONGEST_KILL = sizeof("KILL CONNECTION 12345678901234567890 ;");
static int process_init(void);
static void process_finish(void);
static int thread_init(void);
@ -52,11 +76,10 @@ static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities);
static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_read);
static void gw_process_one_new_client(DCB *client_dcb);
static bool process_special_commands(DCB* client_dcb, GWBUF *read_buffer, int nbytes_read);
/*
* The "module object" for the mysqld client protocol module.
*/
static spec_com_res_t process_special_commands(DCB *client_dcb, GWBUF *read_buffer, int nbytes_read);
static spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len);
static uint64_t parse_kill_query(char *query, kill_type_t *kt_out);
/**
* The module entry point routine. It is this routine that
@ -908,12 +931,29 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
}
}
if (!process_special_commands(dcb, read_buffer, nbytes_read))
spec_com_res_t res = process_special_commands(dcb, read_buffer, nbytes_read);
int rval = 1;
switch (res)
{
return 0;
}
case RES_MORE_DATA:
dcb->dcb_readqueue = read_buffer;
rval = 0;
break;
return gw_read_finish_processing(dcb, read_buffer, capabilities);
case RES_END:
// Do not send this packet for routing
gwbuf_free(read_buffer);
rval = 0;
break;
case RES_CONTINUE:
rval = gw_read_finish_processing(dcb, read_buffer, capabilities);
break;
default:
ss_dassert(!true);
}
return rval;
}
/**
@ -1484,14 +1524,23 @@ static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_rea
/**
* Some SQL commands/queries need to be detected and handled by the protocol
* and MaxScale instead of being routed forward as is.
*
* @param dcb Client dcb
* @param read_buffer the current read buffer
* @param nbytes_read How many bytes were read
* @return true if read buffer should be sent forward to routing, false if more
* data is required or processing is complete
* @return see @c spec_com_res_t
*/
static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_read)
static spec_com_res_t process_special_commands(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
{
spec_com_res_t rval = RES_CONTINUE;
bool is_complete = false;
unsigned int packet_len =
MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
if (gwbuf_length(read_buffer) == packet_len)
{
is_complete = true;
}
/**
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
*
@ -1516,32 +1565,226 @@ static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_re
/**
* Handle COM_PROCESS_KILL
*/
else if ((proto->current_command == MYSQL_COM_PROCESS_KILL))
else if (proto->current_command == MYSQL_COM_PROCESS_KILL)
{
/* Make sure we have a complete SQL packet before trying to read the
* process id. If not, try again next time. */
unsigned int expected_len =
MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
if (gwbuf_length(read_buffer) < expected_len)
if (!is_complete)
{
dcb->dcb_readqueue = read_buffer;
return false;
rval = RES_MORE_DATA;
}
else
{
uint8_t bytes[4];
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes)
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), bytes)
== sizeof(bytes))
{
uint64_t process_id = gw_mysql_get_byte4(bytes);
// Do not send this packet for routing
gwbuf_free(read_buffer);
session_broadcast_kill_command(dcb->session, process_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
return false;
rval = RES_END;
}
}
}
return true;
}
else if (proto->current_command == MYSQL_COM_QUERY)
{
/* Is length within limits for a kill-type query? */
if (packet_len >= (MYSQL_HEADER_LEN + 1 + SHORTEST_KILL) &&
packet_len <= (MYSQL_HEADER_LEN + 1 + LONGEST_KILL))
{
rval = handle_query_kill(dcb, read_buffer, rval, is_complete, packet_len);
}
}
return rval;
}
/**
* Handle text version of KILL [CONNECTION | QUERY] <process_id>. Only detects
* commands in the beginning of the packet and with no comments.
* Increased parsing would slow down the handling of every single query.
*
* @param dcb Client dcb
* @param read_buffer Input buffer
* @param current Latest value of rval in calling function
* @param is_complete Is read_buffer a complete sql packet
* @param packet_len Read from sql header
* @return Updated (or old) value of rval
*/
spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len)
{
spec_com_res_t rval = current;
/* First, we need to detect the text "KILL" (ignorecase) in the start
* of the packet. Copy just enough characters. */
const char KILL_BEGIN[] = "KILL";
const size_t KILL_BEGIN_LEN = sizeof(KILL_BEGIN) - 1;
char startbuf[KILL_BEGIN_LEN]; // Not 0-terminated, careful...
size_t copied_len = gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1,
KILL_BEGIN_LEN, (uint8_t*)startbuf);
if (is_complete)
{
if (strncasecmp(KILL_BEGIN, startbuf, KILL_BEGIN_LEN) == 0)
{
/* Good chance that the query is a KILL-query. Copy the entire
* buffer (skip the "KILL ") and process. */
size_t buffer_len = packet_len - (MYSQL_HEADER_LEN + 1) - KILL_BEGIN_LEN;
char querybuf[buffer_len + 1]; // 0-terminated
copied_len = gwbuf_copy_data(read_buffer,
MYSQL_HEADER_LEN + 1 + KILL_BEGIN_LEN,
buffer_len,
(uint8_t*)querybuf);
querybuf[copied_len] = '\0';
kill_type_t kt = KT_CONNECTION;
uint64_t thread_id = parse_kill_query(querybuf, &kt);
if (thread_id)
{
switch (kt)
{
case KT_CONNECTION:
session_broadcast_kill_command(dcb->session, thread_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
rval = RES_END;
break;
case KT_QUERY:
// TODO: Implement this
MXS_WARNING("Received 'KILL QUERY %" PRIu64 "' from "
"the client. This feature is not supported.", thread_id);
mysql_send_custom_error(dcb, 1, 0, "'KILL QUERY <thread_id>' "
"is not supported.");
rval = RES_END;
break;
default:
ss_dassert(!true);
}
}
}
}
else
{
/* Look at the start of the query and see if it might contain "KILL" */
if (strncasecmp(KILL_BEGIN, startbuf, copied_len) == 0)
{
rval = RES_MORE_DATA;
}
}
return rval;
}
/**
* Parse and process a "KILL [CONNECTION | QUERY] <process_id>" query. Will modify
* the argument string even if not successful.
*
* @param query The query string
* @param kt_out The kill command type output
* @return Zero on error, a valid ID otherwise
*/
static uint64_t parse_kill_query(char *query, kill_type_t *kt_out)
{
const char WORD_CONNECTION[] = "CONNECTION";
const char WORD_QUERY[] = "QUERY";
const char DELIM[] = " \n\t";
kill_type_t kill_type = KT_CONNECTION;
unsigned long long int thread_id = 0;
enum kill_parse_state_t
{
CONN_QUERY,
ID,
SEMICOLON,
DONE
} state = CONN_QUERY;
char *saveptr = NULL;
char *token = strtok_r(query, DELIM, &saveptr);
bool error = false;
while (token && !error && state != DONE)
{
bool get_next = false;
switch (state)
{
case CONN_QUERY:
{
if (strncasecmp(token, WORD_QUERY, sizeof(WORD_QUERY) - 1) == 0)
{
kill_type = KT_QUERY;
get_next = true;
}
else if (strncasecmp(token, WORD_CONNECTION, sizeof(WORD_CONNECTION) - 1) == 0)
{
get_next = true;
}
/* Move to next state regardless of comparison result. The current
* part is optional and the process id may already be in the token. */
state = ID;
}
break;
case ID:
{
char *endptr_id;
thread_id = strtoull(token, &endptr_id, 0);
/* Zero is an error value, also MaxScale session id:s start at 1. */
if (thread_id == 0 || (thread_id == ULLONG_MAX && errno == ERANGE))
{
error = true;
errno = 0;
}
else if (*endptr_id == '\0') // Can be real end or written by strtok
{
state = SEMICOLON; // In case we have space before ;
get_next = true;
}
else if (*endptr_id == ';')
{
token = endptr_id;
state = SEMICOLON;
}
else
{
error = true;
}
}
break;
case SEMICOLON:
{
if (strncmp(token, ";", 1) == 0)
{
state = DONE;
}
else
{
error = true;
}
}
break;
default:
{
error = true;
}
break;
}
if (get_next)
{
token = strtok_r(NULL, DELIM, &saveptr);
}
}
if (error || (state != DONE && state != SEMICOLON))
{
return 0;
}
else
{
*kt_out = kill_type;
return thread_id;
}
}