MXS-852: Handle one-way session commands

Session commands that will not return a response can be completed
immediately. This requires some special code in the readwritesplit Backend
class implementation as well as a small addition to the Backend class
itself.

Since not all commands expect a response from the server, the queued query
routing function needs some adjustment. The routing of queued queries
should be attempted until a command which expects a response is found or
the queue is empty.

By properly handling these types of session commands, the router can
enable the execution of COM_STMT_CLOSE and COM_STMT_RESET on all
servers. This will prevent resource leakages in the server and allow
proper handling of COM_STMT type command.
This commit is contained in:
Markus Mäkelä
2017-06-22 13:16:12 +03:00
parent 5fc30740b7
commit 52b7fb9340
8 changed files with 46 additions and 28 deletions

View File

@ -547,4 +547,13 @@ bool mxs_mysql_extract_ps_response(GWBUF* buffer, MXS_PS_RESPONSE* out);
*/ */
uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer); uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer);
/**
* @brief Determine if a packet contains a one way message
*
* @param cmd Command to inspect
*
* @return True if a response is expected from the server
*/
bool mxs_mysql_command_will_respond(uint8_t cmd);
MXS_END_DECLS MXS_END_DECLS

View File

@ -93,7 +93,9 @@ bool Backend::execute_session_command()
{ {
case MYSQL_COM_QUIT: case MYSQL_COM_QUIT:
case MYSQL_COM_STMT_CLOSE: case MYSQL_COM_STMT_CLOSE:
/** These commands do not generate responses */
rval = write(buffer, NO_RESPONSE); rval = write(buffer, NO_RESPONSE);
complete_session_command();
break; break;
case MYSQL_COM_CHANGE_USER: case MYSQL_COM_CHANGE_USER:

View File

@ -1649,3 +1649,10 @@ uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer)
return rval; return rval;
} }
bool mxs_mysql_command_will_respond(uint8_t cmd)
{
return cmd != MYSQL_COM_STMT_SEND_LONG_DATA &&
cmd != MYSQL_COM_QUIT &&
cmd != MYSQL_COM_STMT_CLOSE;
}

View File

@ -547,7 +547,10 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses)
{ {
bool rval = true; bool rval = true;
if (rses->query_queue) /** Loop over the stored statements as long as the routeQuery call doesn't
* append more data to the queue. If it appends data to the queue, we need
* to wait for a response before attempting another reroute */
while (rses->query_queue)
{ {
GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue); GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue);
query_queue = gwbuf_make_contiguous(query_queue); query_queue = gwbuf_make_contiguous(query_queue);
@ -574,9 +577,18 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses)
gwbuf_free(query_queue); gwbuf_free(query_queue);
} }
ss_dassert(rses->query_queue == NULL); if (rses->query_queue == NULL)
{
/** Query successfully routed and no responses are expected */
rses->query_queue = temp_storage; rses->query_queue = temp_storage;
} }
else
{
/** Routing was stopped, we need to wait for a response before retrying */
rses->query_queue = gwbuf_append(temp_storage, rses->query_queue);
break;
}
}
return rval; return rval;
} }

View File

@ -195,9 +195,10 @@ public:
bool execute_session_command() bool execute_session_command()
{ {
bool expect_response = mxs_mysql_command_will_respond(next_session_command()->get_command());
bool rval = mxs::Backend::execute_session_command(); bool rval = mxs::Backend::execute_session_command();
if (rval) if (rval && expect_response)
{ {
set_reply_state(REPLY_STATE_START); set_reply_state(REPLY_STATE_START);
} }

View File

@ -45,7 +45,6 @@ bool handle_target_is_all(route_target_t route_target,
GWBUF *querybuf, int packet_type, uint32_t qtype); GWBUF *querybuf, int packet_type, uint32_t qtype);
uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet); uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype); void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint32_t qtype);
bool command_will_respond(uint8_t packet_type);
bool is_packet_a_query(int packet_type); bool is_packet_a_query(int packet_type);
bool send_readonly_error(DCB *dcb); bool send_readonly_error(DCB *dcb);

View File

@ -104,27 +104,6 @@ is_packet_a_query(int packet_type)
return (packet_type == MYSQL_COM_QUERY); return (packet_type == MYSQL_COM_QUERY);
} }
/*
* This looks MySQL specific
*/
/**
* @brief Determine if a packet contains a one way message
*
* Packet type tells us this, but in a DB specific way. This function is
* provided so that code that is not DB specific can find out whether a packet
* contains a one way messsage. Clearly, to be effective different functions must be
* called for different DB types.
*
* @param packet_type Type of packet (integer)
* @return bool indicating whether packet contains a one way message
*/
bool command_will_respond(uint8_t packet_type)
{
return packet_type != MYSQL_COM_STMT_SEND_LONG_DATA &&
packet_type != MYSQL_COM_QUIT &&
packet_type != MYSQL_COM_STMT_CLOSE;
}
/* /*
* This one is problematic because it is MySQL specific, but also router * This one is problematic because it is MySQL specific, but also router
* specific. * specific.

View File

@ -270,7 +270,7 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
/** The SessionCommand takes ownership of the buffer */ /** The SessionCommand takes ownership of the buffer */
uint64_t id = rses->sescmd_count++; uint64_t id = rses->sescmd_count++;
mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id)); mxs::SSessionCommand sescmd(new mxs::SessionCommand(querybuf, id));
bool expecting_response = command_will_respond(command); bool expecting_response = mxs_mysql_command_will_respond(command);
int nsucc = 0; int nsucc = 0;
uint64_t lowest_pos = id; uint64_t lowest_pos = id;
@ -349,6 +349,13 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
if (nsucc) if (nsucc)
{ {
rses->sent_sescmd = id; rses->sent_sescmd = id;
if (!expecting_response)
{
/** The command doesn't generate a response so we increment the
* completed session command count */
rses->recv_sescmd++;
}
} }
return nsucc; return nsucc;
@ -552,7 +559,9 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses, uint8_t command,
target = TARGET_MASTER; target = TARGET_MASTER;
} }
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) || else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT)) qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
command == MYSQL_COM_STMT_CLOSE ||
command == MYSQL_COM_STMT_RESET)
{ {
target = TARGET_ALL; target = TARGET_ALL;
} }