Merge branch 'develop' into MXS-1209

This commit is contained in:
MassimilianoPinto
2017-05-15 10:39:07 +02:00
73 changed files with 4204 additions and 1039 deletions

View File

@ -66,7 +66,8 @@
/* The maximum size for query statements in a transaction (64MB) */
static size_t sql_size_limit = 64 * 1024 * 1024;
/* The size of the buffer for recording latency of individual statements */
static size_t latency_buf_size = 64 * 1024;
static const int default_sql_size = 4 * 1024;
#define DEFAULT_QUERY_DELIMITER "@@@"
@ -125,14 +126,17 @@ typedef struct
char *clientHost;
char *userName;
char* sql;
char* latency;
struct timeval start;
char *current;
int n_statements;
struct timeval total;
struct timeval current_start;
struct timeval last_statement_start;
bool query_end;
char *buf;
int sql_index;
int latency_index;
size_t max_sql_size;
} TPM_SESSION;
@ -310,10 +314,12 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
{
atomic_add(&my_instance->sessions, 1);
my_session->latency = (char*)MXS_CALLOC(latency_buf_size, sizeof(char));
my_session->max_sql_size = default_sql_size; // default max query size of 4k.
my_session->sql = (char*)MXS_CALLOC(my_session->max_sql_size, sizeof(char));
memset(my_session->sql, 0x00, my_session->max_sql_size);
my_session->sql_index = 0;
my_session->latency_index = 0;
my_session->n_statements = 0;
my_session->total.tv_sec = 0;
my_session->total.tv_usec = 0;
@ -384,6 +390,7 @@ freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session)
MXS_FREE(my_session->clientHost);
MXS_FREE(my_session->userName);
MXS_FREE(my_session->sql);
MXS_FREE(my_session->latency);
MXS_FREE(session);
return;
}
@ -508,6 +515,7 @@ routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
/* set new pointer for the buffer */
my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr));
}
gettimeofday(&my_session->last_statement_start, NULL);
}
}
}
@ -528,6 +536,28 @@ clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *reply)
struct timeval tv, diff;
int i, inserted;
/* records latency of the SQL statement. */
if (my_session->sql_index > 0)
{
gettimeofday(&tv, NULL);
timersub(&tv, &(my_session->last_statement_start), &diff);
/* get latency */
double millis = (diff.tv_sec * 1000 + diff.tv_usec / 1000.0);
int written = sprintf(my_session->latency + my_session->latency_index, "%.3f", millis);
my_session->latency_index += written;
if (!my_session->query_end)
{
written = sprintf(my_session->latency + my_session->latency_index, "%s", my_instance->query_delimiter);
my_session->latency_index += written;
}
if (my_session->latency_index > latency_buf_size)
{
MXS_ERROR("Latency buffer overflow.");
}
}
/* found 'commit' and sql statements exist. */
if (my_session->query_end && my_session->sql_index > 0)
{
@ -544,8 +574,8 @@ clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *reply)
/* print to log. */
if (my_instance->log_enabled)
{
/* this prints "timestamp | server_name | user_name | latency | sql_statements" */
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
/* this prints "timestamp | server_name | user_name | latency of entire transaction | latencies of individual statements | sql_statements" */
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s%s%s\n",
timestamp,
my_instance->delimiter,
reply->server->unique_name,
@ -554,10 +584,13 @@ clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *reply)
my_instance->delimiter,
millis,
my_instance->delimiter,
my_session->latency,
my_instance->delimiter,
my_session->sql);
}
my_session->sql_index = 0;
my_session->latency_index = 0;
}
/* Pass the result upstream */

View File

@ -5,3 +5,4 @@ install_module(MySQLCommon core)
add_subdirectory(MySQLBackend)
add_subdirectory(MySQLClient)
add_subdirectory(test)

View File

@ -14,6 +14,11 @@
#define MXS_MODULE_NAME "MySQLClient"
#include <inttypes.h>
#include <limits.h>
#include <netinet/tcp.h>
#include <sys/stat.h>
#include <maxscale/protocol.h>
#include <maxscale/alloc.h>
#include <maxscale/log_manager.h>
@ -21,14 +26,30 @@
#include <maxscale/ssl.h>
#include <maxscale/poll.h>
#include <maxscale/modinfo.h>
#include <sys/stat.h>
#include <maxscale/modutil.h>
#include <netinet/tcp.h>
#include <maxscale/query_classifier.h>
#include <maxscale/authenticator.h>
#include <maxscale/session.h>
#include <maxscale/worker.h>
/** Return type of process_special_commands() */
typedef enum spec_com_res_t
{
RES_CONTINUE, // No special command detected, proceed as normal.
RES_END, // Query handling completed, do not send to filters/router.
RES_MORE_DATA // Possible special command, but not enough data to be sure. Must
// wait for more data.
} spec_com_res_t;
/* Type of the kill-command sent by client. */
typedef enum kill_type
{
KT_CONNECTION,
KT_QUERY
} kill_type_t;
const char WORD_KILL[] = "KILL";
static int process_init(void);
static void process_finish(void);
static int thread_init(void);
@ -52,11 +73,10 @@ static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities);
static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_read);
static void gw_process_one_new_client(DCB *client_dcb);
static bool process_special_commands(DCB* client_dcb, GWBUF *read_buffer, int nbytes_read);
/*
* The "module object" for the mysqld client protocol module.
*/
static spec_com_res_t process_special_commands(DCB *client_dcb, GWBUF *read_buffer, int nbytes_read);
static spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len);
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out);
/**
* The module entry point routine. It is this routine that
@ -241,9 +261,10 @@ int MySQLSendHandshake(DCB* dcb)
memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags));
}
// Get the equivalent of the server process id.
protocol->tid = session_get_next_id();
gw_mysql_set_byte4(mysql_thread_id_num, protocol->tid);
// Get the equivalent of the server thread id.
protocol->thread_id = session_get_next_id();
// Send only the low 32bits in the handshake.
gw_mysql_set_byte4(mysql_thread_id_num, (uint32_t)(protocol->thread_id));
memcpy(mysql_scramble_buf, server_scramble, 8);
memcpy(mysql_plugin_data, server_scramble + 8, 12);
@ -668,7 +689,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
* normal data handling function instead of this one.
*/
MXS_SESSION *session =
session_alloc_with_id(dcb->service, dcb, protocol->tid);
session_alloc_with_id(dcb->service, dcb, protocol->thread_id);
if (session != NULL)
{
@ -907,12 +928,29 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
}
}
if (!process_special_commands(dcb, read_buffer, nbytes_read))
spec_com_res_t res = process_special_commands(dcb, read_buffer, nbytes_read);
int rval = 1;
switch (res)
{
return 0;
}
case RES_MORE_DATA:
dcb->dcb_readqueue = read_buffer;
rval = 0;
break;
return gw_read_finish_processing(dcb, read_buffer, capabilities);
case RES_END:
// Do not send this packet for routing
gwbuf_free(read_buffer);
rval = 0;
break;
case RES_CONTINUE:
rval = gw_read_finish_processing(dcb, read_buffer, capabilities);
break;
default:
ss_dassert(!true);
}
return rval;
}
/**
@ -1483,14 +1521,23 @@ static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_rea
/**
* Some SQL commands/queries need to be detected and handled by the protocol
* and MaxScale instead of being routed forward as is.
*
* @param dcb Client dcb
* @param read_buffer the current read buffer
* @param nbytes_read How many bytes were read
* @return true if read buffer should be sent forward to routing, false if more
* data is required or processing is complete
* @return see @c spec_com_res_t
*/
static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_read)
static spec_com_res_t process_special_commands(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
{
spec_com_res_t rval = RES_CONTINUE;
bool is_complete = false;
unsigned int packet_len =
MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
if (gwbuf_length(read_buffer) == packet_len)
{
is_complete = true;
}
/**
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
*
@ -1515,32 +1562,252 @@ static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_re
/**
* Handle COM_PROCESS_KILL
*/
else if ((proto->current_command == MYSQL_COM_PROCESS_KILL))
else if (proto->current_command == MYSQL_COM_PROCESS_KILL)
{
/* Make sure we have a complete SQL packet before trying to read the
* process id. If not, try again next time. */
unsigned int expected_len =
MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
if (gwbuf_length(read_buffer) < expected_len)
if (!is_complete)
{
dcb->dcb_readqueue = read_buffer;
return false;
rval = RES_MORE_DATA;
}
else
{
uint8_t bytes[4];
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes)
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), bytes)
== sizeof(bytes))
{
uint64_t process_id = gw_mysql_get_byte4(bytes);
// Do not send this packet for routing
gwbuf_free(read_buffer);
session_broadcast_kill_command(dcb->session, process_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
return false;
rval = RES_END;
}
}
}
return true;
}
else if (proto->current_command == MYSQL_COM_QUERY)
{
/* Limits on the length of the queries in which "KILL" is searched for. Reducing
* LONGEST_KILL will reduce overhead but also limit the range of accepted queries. */
const int SHORTEST_KILL = sizeof("KILL 1") - 1;
const int LONGEST_KILL = sizeof("KILL CONNECTION 12345678901234567890 ;");
/* Is length within limits for a kill-type query? */
if (packet_len >= (MYSQL_HEADER_LEN + 1 + SHORTEST_KILL) &&
packet_len <= (MYSQL_HEADER_LEN + 1 + LONGEST_KILL))
{
rval = handle_query_kill(dcb, read_buffer, rval, is_complete, packet_len);
}
}
return rval;
}
/**
* Handle text version of KILL [CONNECTION | QUERY] <process_id>. Only detects
* commands in the beginning of the packet and with no comments.
* Increased parsing would slow down the handling of every single query.
*
* @param dcb Client dcb
* @param read_buffer Input buffer
* @param current Latest value of rval in calling function
* @param is_complete Is read_buffer a complete sql packet
* @param packet_len Read from sql header
* @return Updated (or old) value of rval
*/
spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len)
{
spec_com_res_t rval = current;
/* First, we need to detect the text "KILL" (ignorecase) in the start
* of the packet. Copy just enough characters. */
const size_t KILL_BEGIN_LEN = sizeof(WORD_KILL) - 1;
char startbuf[KILL_BEGIN_LEN]; // Not 0-terminated, careful...
size_t copied_len = gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1,
KILL_BEGIN_LEN, (uint8_t*)startbuf);
if (is_complete)
{
if (strncasecmp(WORD_KILL, startbuf, KILL_BEGIN_LEN) == 0)
{
/* Good chance that the query is a KILL-query. Copy the entire
* buffer and process. */
size_t buffer_len = packet_len - (MYSQL_HEADER_LEN + 1);
char querybuf[buffer_len + 1]; // 0-terminated
copied_len = gwbuf_copy_data(read_buffer,
MYSQL_HEADER_LEN + 1,
buffer_len,
(uint8_t*)querybuf);
querybuf[copied_len] = '\0';
kill_type_t kt = KT_CONNECTION;
uint64_t thread_id = 0;
bool parsed = parse_kill_query(querybuf, &thread_id, &kt);
if (parsed && (thread_id > 0)) // MaxScale session counter starts at 1
{
switch (kt)
{
case KT_CONNECTION:
session_broadcast_kill_command(dcb->session, thread_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
rval = RES_END;
break;
case KT_QUERY:
// TODO: Implement this
MXS_WARNING("Received 'KILL QUERY %" PRIu64 "' from "
"the client. This feature is not supported.", thread_id);
mysql_send_custom_error(dcb, 1, 0, "'KILL QUERY <thread_id>' "
"is not supported.");
rval = RES_END;
break;
default:
ss_dassert(!true);
}
}
}
}
else
{
/* Look at the start of the query and see if it might contain "KILL" */
if (strncasecmp(WORD_KILL, startbuf, copied_len) == 0)
{
rval = RES_MORE_DATA;
}
}
return rval;
}
/**
* Parse a "KILL [CONNECTION | QUERY] <process_id>" query. Will modify
* the argument string even if unsuccessful.
*
* @param query Query string to parse
* @paran thread_id_out Thread id output
* @param kt_out Kill command type output
* @return true on success, false on error
*/
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out)
{
const char WORD_CONNECTION[] = "CONNECTION";
const char WORD_QUERY[] = "QUERY";
const char DELIM[] = " \n\t";
kill_type_t kill_type = KT_CONNECTION;
unsigned long long int thread_id = 0;
enum kill_parse_state_t
{
KILL,
CONN_QUERY,
ID,
SEMICOLON,
DONE
} state = KILL;
char *saveptr = NULL;
bool error = false;
char *token = strtok_r(query, DELIM, &saveptr);
while (token && !error)
{
bool get_next = false;
switch (state)
{
case KILL:
if (strncasecmp(token, WORD_KILL, sizeof(WORD_KILL) - 1) == 0)
{
state = CONN_QUERY;
get_next = true;
}
else
{
error = true;
}
break;
case CONN_QUERY:
if (strncasecmp(token, WORD_QUERY, sizeof(WORD_QUERY) - 1) == 0)
{
kill_type = KT_QUERY;
get_next = true;
}
else if (strncasecmp(token, WORD_CONNECTION, sizeof(WORD_CONNECTION) - 1) == 0)
{
get_next = true;
}
/* Move to next state regardless of comparison result. The current
* part is optional and the process id may already be in the token. */
state = ID;
break;
case ID:
{
/* strtoull() accepts negative numbers, so check for '-' here */
if (*token == '-')
{
error = true;
break;
}
char *endptr_id = NULL;
thread_id = strtoull(token, &endptr_id, 0);
if ((thread_id == ULLONG_MAX) && (errno == ERANGE))
{
error = true;
errno = 0;
}
else if (endptr_id == token)
{
error = true; // No digits were read
}
else if (*endptr_id == '\0') // Can be real end or written by strtok
{
state = SEMICOLON; // In case we have space before ;
get_next = true;
}
else if (*endptr_id == ';')
{
token = endptr_id;
state = SEMICOLON;
}
else
{
error = true;
}
}
break;
case SEMICOLON:
{
if (strncmp(token, ";", 1) == 0)
{
state = DONE;
get_next = true;
}
else
{
error = true;
}
}
break;
default:
error = true;
break;
}
if (get_next)
{
token = strtok_r(NULL, DELIM, &saveptr);
}
}
if (error || (state != DONE && state != SEMICOLON))
{
return false;
}
else
{
*thread_id_out = thread_id;
*kt_out = kill_type;
return true;
}
}

View File

@ -1410,7 +1410,6 @@ gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload)
uint8_t *server_version_end = NULL;
uint16_t mysql_server_capabilities_one = 0;
uint16_t mysql_server_capabilities_two = 0;
unsigned long tid = 0;
uint8_t scramble_data_1[GW_SCRAMBLE_LENGTH_323] = "";
uint8_t scramble_data_2[GW_MYSQL_SCRAMBLE_SIZE - GW_SCRAMBLE_LENGTH_323] = "";
uint8_t capab_ptr[4] = "";
@ -1433,8 +1432,10 @@ gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload)
payload = server_version_end + 1;
// get ThreadID: 4 bytes
tid = gw_mysql_get_byte4(payload);
memcpy(&conn->tid, &tid, 4);
uint32_t tid = gw_mysql_get_byte4(payload);
/* TODO: Correct value of thread id could be queried later from backend if
* there is any worry it might be larger than 32bit allows. */
conn->thread_id = tid;
payload += 4;

View File

@ -0,0 +1,4 @@
add_executable(test_parse_kill test_parse_kill.c)
target_link_libraries(test_parse_kill maxscale-common MySQLCommon)
add_test(test_parse_kill test_parse_kill)

View File

@ -0,0 +1,89 @@
#include <maxscale/cdefs.h>
#include "../MySQLClient/mysql_client.c"
int test_one_query(char *query, bool should_succeed, uint64_t expected_tid,
kill_type_t expected_kt)
{
char *query_copy = MXS_STRDUP_A(query);
uint64_t result_tid = 1111111;
kill_type_t result_kt = KT_QUERY;
/* If the parse fails, these should remain unchanged */
if (!should_succeed)
{
result_tid = expected_tid;
result_kt = expected_kt;
}
bool success = parse_kill_query(query_copy, &result_tid, &result_kt);
MXS_FREE(query_copy);
if ((success == should_succeed) && (result_tid == expected_tid) &&
(result_kt == expected_kt))
{
return 0;
}
else
{
printf("Result wrong on query: '%s'.\n", query);
if (success != should_succeed)
{
printf("Expected success '%d', got '%d'.\n", should_succeed, success);
}
if (result_tid != expected_tid)
{
printf("Expected thread id '%" PRIu64 "', got '%" PRIu64 "'.\n",
expected_tid, result_tid);
}
if (result_kt != expected_kt)
{
printf("Expected kill type '%u', got '%u'.\n",
expected_kt, result_kt);
}
printf("\n");
return 1;
}
}
typedef struct test_t
{
char *query;
bool should_succeed;
uint64_t correct_id;
kill_type_t correct_kt;
} test_t;
int main(int argc, char **argv)
{
test_t tests[] =
{
{" kill ConNectioN 123 ", true, 123, KT_CONNECTION},
{"kIlL coNNectioN 987654321 ;", true, 987654321, KT_CONNECTION},
{" Ki5L CoNNectioN 987654321 ", false, 0, KT_CONNECTION},
{"1", false, 0, KT_CONNECTION},
{"kILL 1", true, 1, KT_CONNECTION},
{"\n\t kill \nQueRy 456", true, 456, KT_QUERY},
{" A kill 1; ", false, 0, KT_CONNECTION},
{" kill connection 1A", false, 0, KT_CONNECTION},
{" kill connection 1 A ", false, 0, KT_CONNECTION},
{"kill query 7 ; select * ", false, 0, KT_CONNECTION},
{
"KIll query \t \n \t 12345678901234567890 \n \t ",
true, 12345678901234567890ULL, KT_QUERY
},
{"KIll query \t \n \t 21 \n \t ", true, 21, KT_QUERY},
{"KIll \t \n \t -6 \n \t ", false, 0, KT_CONNECTION},
{"KIll 12345678901234567890123456 \n \t ", false, 0, KT_CONNECTION},
{"kill ;", false, 0, KT_QUERY}
};
int result = 0;
int arr_size = sizeof(tests) / sizeof(test_t);
for (int i = 0; i < arr_size; i++)
{
char *query = tests[i].query;
bool should_succeed = tests[i].should_succeed;
uint64_t expected_tid = tests[i].correct_id;
kill_type_t expected_kt = tests[i].correct_kt;
result += test_one_query(query, should_succeed, expected_tid, expected_kt);
}
return result;
}

View File

@ -129,8 +129,8 @@ int main(int argc, char **argv)
}
inst->service = service;
inst->user = service->credentials.name;
inst->password = service->credentials.authdata;
inst->user = MXS_STRDUP_A(service->credentials.name);
inst->password = MXS_STRDUP_A(service->credentials.authdata);
MXS_NOTICE("testbinlog v1.0");

View File

@ -363,7 +363,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
if (m_config->debug)
{
sprintf(errbuf + strlen(errbuf),
" ([%" PRIu32 "]: DB change failed)",
" ([%" PRIu64 "]: DB change failed)",
m_client->session->ses_id);
}
@ -991,7 +991,7 @@ bool SchemaRouterSession::handle_default_db()
sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str());
if (m_config->debug)
{
sprintf(errmsg + strlen(errmsg), " ([%" PRIu32 "]: DB not found on connect)",
sprintf(errmsg + strlen(errmsg), " ([%" PRIu64 "]: DB not found on connect)",
m_client->session->ses_id);
}
write_error_to_client(m_client,