Detect session command replies with trailing data

When a session command was received, any trailing data was lost even
though an attempt to split is was made.

With this change, each session command reply will be routed individually
and any trailing data is routed separately.
This commit is contained in:
Markus Makela
2016-12-08 13:25:46 +02:00
parent fab7accef3
commit 0e50ecb525

View File

@ -76,7 +76,7 @@ static int backend_write_delayqueue(DCB *dcb, GWBUF *buffer);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static char *gw_backend_default_auth(); static char *gw_backend_default_auth();
static GWBUF* process_response_data(DCB* dcb, GWBUF* readbuf, int nbytes_to_process); static GWBUF* process_response_data(DCB* dcb, GWBUF** readbuf, int nbytes_to_process);
extern char* create_auth_failed_msg(GWBUF* readbuf, char* hostaddr, uint8_t* sha1); extern char* create_auth_failed_msg(GWBUF* readbuf, char* hostaddr, uint8_t* sha1);
static bool sescmd_response_complete(DCB* dcb); static bool sescmd_response_complete(DCB* dcb);
static void gw_reply_on_error(DCB *dcb, mxs_auth_state_t state); static void gw_reply_on_error(DCB *dcb, mxs_auth_state_t state);
@ -832,44 +832,58 @@ gw_read_and_write(DCB *dcb)
return rval; return rval;
} }
/** do
* If protocol has session command set, concatenate whole
* response into one buffer.
*/
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
{ {
read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer)); GWBUF *stmt = NULL;
/** /**
* Received incomplete response to session command. * If protocol has session command set, concatenate whole
* Store it to readqueue and return. * response into one buffer.
*/ */
if (!sescmd_response_complete(dcb)) if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
{ {
return 0; stmt = process_response_data(dcb, &read_buffer, gwbuf_length(read_buffer));
/**
* Received incomplete response to session command.
* Store it to readqueue and return.
*/
if (!sescmd_response_complete(dcb))
{
return 0;
}
if (!stmt)
{
MXS_ERROR("%lu [gw_read_backend_event] "
"Read buffer unexpectedly null, even though response "
"not marked as complete. User: %s",
pthread_self(), dcb->session->client_dcb->user);
return 0;
}
}
else if (rcap_type_required(capabilities, RCAP_TYPE_STMT_OUTPUT))
{
stmt = modutil_get_next_MySQL_packet(&read_buffer);
}
else
{
stmt = read_buffer;
read_buffer = NULL;
} }
if (!read_buffer) if (session_ok_to_route(dcb))
{ {
MXS_ERROR("%lu [gw_read_backend_event] " gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL);
"Read buffer unexpectedly null, even though response " session->service->router->clientReply(session->service->router_instance,
"not marked as complete. User: %s", session->router_session,
pthread_self(), dcb->session->client_dcb->user); stmt, dcb);
return 0; return_code = 1;
}
else /*< session is closing; replying to client isn't possible */
{
gwbuf_free(stmt);
} }
} }
while (read_buffer);
if (session_ok_to_route(dcb))
{
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
session->service->router->clientReply(session->service->router_instance,
session->router_session,
read_buffer, dcb);
return_code = 1;
}
else /*< session is closing; replying to client isn't possible */
{
gwbuf_free(read_buffer);
}
return return_code; return return_code;
} }
@ -1667,7 +1681,7 @@ retblock:
* @return GWBUF which includes complete MySQL packet * @return GWBUF which includes complete MySQL packet
*/ */
static GWBUF* process_response_data(DCB* dcb, static GWBUF* process_response_data(DCB* dcb,
GWBUF* readbuf, GWBUF** readbuf,
int nbytes_to_process) int nbytes_to_process)
{ {
int npackets_left = 0; /*< response's packet count */ int npackets_left = 0; /*< response's packet count */
@ -1685,7 +1699,7 @@ static GWBUF* process_response_data(DCB* dcb,
} }
/** All buffers processed here are sescmd responses */ /** All buffers processed here are sescmd responses */
gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE); gwbuf_set_type(*readbuf, GWBUF_TYPE_SESCMD_RESPONSE);
/** /**
* Now it is known how many packets there should be and how much * Now it is known how many packets there should be and how much
@ -1719,7 +1733,7 @@ static GWBUF* process_response_data(DCB* dcb,
* packet content. Fails if read buffer doesn't include * packet content. Fails if read buffer doesn't include
* enough data to read the packet length. * enough data to read the packet length.
*/ */
init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); init_response_status(*readbuf, srvcmd, &npackets_left, &nbytes_left);
} }
initial_packets = npackets_left; initial_packets = npackets_left;
@ -1735,7 +1749,7 @@ static GWBUF* process_response_data(DCB* dcb,
if (nbytes_to_process >= 5) if (nbytes_to_process >= 5)
{ {
/** discard source buffer */ /** discard source buffer */
readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf)); *readbuf = gwbuf_consume(*readbuf, GWBUF_LENGTH(*readbuf));
nbytes_left -= nbytes_to_process; nbytes_left -= nbytes_to_process;
} }
nbytes_to_process = 0; nbytes_to_process = 0;
@ -1747,8 +1761,8 @@ static GWBUF* process_response_data(DCB* dcb,
nbytes_to_process = 0; nbytes_to_process = 0;
ss_dassert(npackets_left > 0); ss_dassert(npackets_left > 0);
npackets_left -= 1; npackets_left -= 1;
outbuf = gwbuf_append(outbuf, readbuf); outbuf = gwbuf_append(outbuf, *readbuf);
readbuf = NULL; *readbuf = NULL;
} }
/** /**
* Buffer contains more data than we need. Split the complete packet and * Buffer contains more data than we need. Split the complete packet and
@ -1759,7 +1773,7 @@ static GWBUF* process_response_data(DCB* dcb,
ss_dassert(nbytes_left < nbytes_to_process); ss_dassert(nbytes_left < nbytes_to_process);
ss_dassert(nbytes_left > 0); ss_dassert(nbytes_left > 0);
ss_dassert(npackets_left > 0); ss_dassert(npackets_left > 0);
outbuf = gwbuf_append(outbuf, gwbuf_split(&readbuf, nbytes_left)); outbuf = gwbuf_append(outbuf, gwbuf_split(readbuf, nbytes_left));
nbytes_to_process -= nbytes_left; nbytes_to_process -= nbytes_left;
npackets_left -= 1; npackets_left -= 1;
nbytes_left = 0; nbytes_left = 0;
@ -1785,6 +1799,9 @@ static GWBUF* process_response_data(DCB* dcb,
/** Archive the command */ /** Archive the command */
protocol_archive_srv_command(p); protocol_archive_srv_command(p);
/** Ignore the rest of the response */
nbytes_to_process = 0;
} }
/** Read next packet */ /** Read next packet */
else else
@ -1795,7 +1812,7 @@ static GWBUF* process_response_data(DCB* dcb,
* three bytes left. If there is less than three * three bytes left. If there is less than three
* bytes in the buffer or it is NULL, we need to * bytes in the buffer or it is NULL, we need to
wait for more data from the backend server.*/ wait for more data from the backend server.*/
if (readbuf == NULL || gwbuf_length(readbuf) < 3) if (*readbuf == NULL || gwbuf_length(*readbuf) < 3)
{ {
MXS_DEBUG("%lu [%s] Read %d packets. Waiting for %d more " MXS_DEBUG("%lu [%s] Read %d packets. Waiting for %d more "
"packets for a total of %d packets.", "packets for a total of %d packets.",
@ -1812,7 +1829,7 @@ static GWBUF* process_response_data(DCB* dcb,
return NULL; return NULL;
} }
uint8_t packet_len[3]; uint8_t packet_len[3];
gwbuf_copy_data(readbuf, 0, 3, packet_len); gwbuf_copy_data(*readbuf, 0, 3, packet_len);
nbytes_left = gw_mysql_get_byte3(packet_len) + MYSQL_HEADER_LEN; nbytes_left = gw_mysql_get_byte3(packet_len) + MYSQL_HEADER_LEN;
/** Store new status to protocol structure */ /** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left); protocol_set_response_status(p, npackets_left, nbytes_left);