MXS-1047: Backport to 2.0

Backported commits 0e50ecb525003b87e8199708008e4a18606c5e54 and
570e12942ba3d2363ec7098dcf1d35c6b68667a1 to 2.0.
This commit is contained in:
Markus Makela
2016-12-09 11:13:27 +02:00
parent 0fab454e66
commit d4d40c0b9b
2 changed files with 42 additions and 42 deletions

View File

@ -79,7 +79,7 @@ static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static char *gw_backend_default_auth(); static char *gw_backend_default_auth();
static GWBUF* process_response_data(DCB* dcb, GWBUF* readbuf, int nbytes_to_process); static GWBUF* process_response_data(DCB* dcb, GWBUF** readbuf, int nbytes_to_process);
extern char* create_auth_failed_msg(GWBUF* readbuf, char* hostaddr, uint8_t* sha1); extern char* create_auth_failed_msg(GWBUF* readbuf, char* hostaddr, uint8_t* sha1);
static bool sescmd_response_complete(DCB* dcb); static bool sescmd_response_complete(DCB* dcb);
static int gw_read_reply_or_error(DCB *dcb, MYSQL_session local_session); static int gw_read_reply_or_error(DCB *dcb, MYSQL_session local_session);
@ -1091,24 +1091,31 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session)
} }
} }
do
{
GWBUF *stmt = NULL;
/** /**
* If protocol has session command set, concatenate whole * If protocol has session command set, concatenate whole
* response into one buffer. * response into one buffer.
*/ */
if (protocol_get_srv_command((MySQLProtocol *) dcb->protocol, false) != MYSQL_COM_UNDEFINED) if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
{ {
read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer)); stmt = process_response_data(dcb, &read_buffer, gwbuf_length(read_buffer));
/** /**
* Received incomplete response to session command. * Received incomplete response to session command.
* Store it to readqueue and return. * Store it to readqueue and return.
*/ */
if (!sescmd_response_complete(dcb)) if (!sescmd_response_complete(dcb))
{ {
stmt = gwbuf_append(stmt, read_buffer);
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(stmt, dcb->dcb_readqueue);
spinlock_release(&dcb->authlock);
return_code = 0; return_code = 0;
goto return_rc; goto return_rc;
} }
if (!read_buffer) if (!stmt)
{ {
MXS_NOTICE("%lu [gw_read_backend_event] " MXS_NOTICE("%lu [gw_read_backend_event] "
"Read buffer unexpectedly null, even though response " "Read buffer unexpectedly null, even though response "
@ -1119,6 +1126,12 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session)
goto return_rc; goto return_rc;
} }
} }
else
{
stmt = read_buffer;
read_buffer = NULL;
}
/** /**
* Check that session is operable, and that client DCB is * Check that session is operable, and that client DCB is
* still listening the socket for replies. * still listening the socket for replies.
@ -1127,7 +1140,7 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session)
dcb->session->client_dcb != NULL && dcb->session->client_dcb != NULL &&
dcb->session->client_dcb->state == DCB_STATE_POLLING && dcb->session->client_dcb->state == DCB_STATE_POLLING &&
(session->router_session || (session->router_session ||
session->service->router->getCapabilities() & (int)RCAP_TYPE_NO_RSESSION)) session->service->router->getCapabilities() & (int)RCAP_TYPE_NO_RSESSION))
{ {
MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol; MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol;
if (client_protocol != NULL) if (client_protocol != NULL)
@ -1136,30 +1149,29 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session)
if (client_protocol->protocol_auth_state == MYSQL_IDLE) if (client_protocol->protocol_auth_state == MYSQL_IDLE)
{ {
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL);
session->service->router->clientReply( session->service->router->clientReply(session->service->router_instance,
session->service->router_instance, session->router_session,
session->router_session, stmt, dcb);
read_buffer,
dcb);
return_code = 1; return_code = 1;
} }
} }
else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL) else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL)
{ {
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL);
session->service->router->clientReply( session->service->router->clientReply(session->service->router_instance,
session->service->router_instance, session->router_session,
session->router_session, stmt, dcb);
read_buffer, dcb);
return_code = 1; return_code = 1;
} }
} }
else /*< session is closing; replying to client isn't possible */ else /*< session is closing; replying to client isn't possible */
{ {
gwbuf_free(read_buffer); gwbuf_free(stmt);
} }
}
while (read_buffer);
return_rc: return_rc:
return return_code; return return_code;
@ -1928,7 +1940,7 @@ retblock:
* @return GWBUF which includes complete MySQL packet * @return GWBUF which includes complete MySQL packet
*/ */
static GWBUF* process_response_data(DCB* dcb, static GWBUF* process_response_data(DCB* dcb,
GWBUF* readbuf, GWBUF** readbuf,
int nbytes_to_process) int nbytes_to_process)
{ {
int npackets_left = 0; /*< response's packet count */ int npackets_left = 0; /*< response's packet count */
@ -1946,7 +1958,7 @@ static GWBUF* process_response_data(DCB* dcb,
} }
/** All buffers processed here are sescmd responses */ /** All buffers processed here are sescmd responses */
gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE); gwbuf_set_type(*readbuf, GWBUF_TYPE_SESCMD_RESPONSE);
/** /**
* Now it is known how many packets there should be and how much * Now it is known how many packets there should be and how much
@ -1980,7 +1992,7 @@ static GWBUF* process_response_data(DCB* dcb,
* packet content. Fails if read buffer doesn't include * packet content. Fails if read buffer doesn't include
* enough data to read the packet length. * enough data to read the packet length.
*/ */
init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); init_response_status(*readbuf, srvcmd, &npackets_left, &nbytes_left);
} }
initial_packets = npackets_left; initial_packets = npackets_left;
@ -1996,7 +2008,7 @@ static GWBUF* process_response_data(DCB* dcb,
if (nbytes_to_process >= 5) if (nbytes_to_process >= 5)
{ {
/** discard source buffer */ /** discard source buffer */
readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf)); *readbuf = gwbuf_consume(*readbuf, GWBUF_LENGTH(*readbuf));
nbytes_left -= nbytes_to_process; nbytes_left -= nbytes_to_process;
} }
nbytes_to_process = 0; nbytes_to_process = 0;
@ -2008,8 +2020,8 @@ static GWBUF* process_response_data(DCB* dcb,
nbytes_to_process = 0; nbytes_to_process = 0;
ss_dassert(npackets_left > 0); ss_dassert(npackets_left > 0);
npackets_left -= 1; npackets_left -= 1;
outbuf = gwbuf_append(outbuf, readbuf); outbuf = gwbuf_append(outbuf, *readbuf);
readbuf = NULL; *readbuf = NULL;
} }
/** /**
* Buffer contains more data than we need. Split the complete packet and * Buffer contains more data than we need. Split the complete packet and
@ -2020,7 +2032,7 @@ static GWBUF* process_response_data(DCB* dcb,
ss_dassert(nbytes_left < nbytes_to_process); ss_dassert(nbytes_left < nbytes_to_process);
ss_dassert(nbytes_left > 0); ss_dassert(nbytes_left > 0);
ss_dassert(npackets_left > 0); ss_dassert(npackets_left > 0);
outbuf = gwbuf_append(outbuf, gwbuf_split(&readbuf, nbytes_left)); outbuf = gwbuf_append(outbuf, gwbuf_split(readbuf, nbytes_left));
nbytes_to_process -= nbytes_left; nbytes_to_process -= nbytes_left;
npackets_left -= 1; npackets_left -= 1;
nbytes_left = 0; nbytes_left = 0;
@ -2056,7 +2068,7 @@ static GWBUF* process_response_data(DCB* dcb,
* three bytes left. If there is less than three * three bytes left. If there is less than three
* bytes in the buffer or it is NULL, we need to * bytes in the buffer or it is NULL, we need to
wait for more data from the backend server.*/ wait for more data from the backend server.*/
if (readbuf == NULL || gwbuf_length(readbuf) < 3) if (*readbuf == NULL || gwbuf_length(*readbuf) < 3)
{ {
MXS_DEBUG("%lu [%s] Read %d packets. Waiting for %d more " MXS_DEBUG("%lu [%s] Read %d packets. Waiting for %d more "
"packets for a total of %d packets.", "packets for a total of %d packets.",
@ -2073,7 +2085,7 @@ static GWBUF* process_response_data(DCB* dcb,
return NULL; return NULL;
} }
uint8_t packet_len[3]; uint8_t packet_len[3];
gwbuf_copy_data(readbuf, 0, 3, packet_len); gwbuf_copy_data(*readbuf, 0, 3, packet_len);
nbytes_left = gw_mysql_get_byte3(packet_len) + MYSQL_HEADER_LEN; nbytes_left = gw_mysql_get_byte3(packet_len) + MYSQL_HEADER_LEN;
/** Store new status to protocol structure */ /** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left); protocol_set_response_status(p, npackets_left, nbytes_left);

View File

@ -2421,23 +2421,11 @@ static bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bref->bref_backend->backend_server->port); bref->bref_backend->backend_server->port);
/** /**
* Store current stmt if execution of previous session command * Store current stmt if execution of previous session command
* haven't completed yet. * hasn't completed yet.
*
* !!! Note that according to MySQL protocol
* there can only be one such non-sescmd stmt at the time.
* It is possible that bref->bref_pending_cmd includes a pending
* command if rwsplit is parent or child for another router,
* which runs all the same commands.
*
* If the assertion below traps, pending queries are treated
* somehow wrong, or client is sending more queries before
* previous is received.
*/ */
if (sescmd_cursor_is_active(scur)) if (sescmd_cursor_is_active(scur) && bref != rses->rses_master_ref)
{ {
ss_dassert(bref->bref_pending_cmd == NULL); bref->bref_pending_cmd = gwbuf_append(bref->bref_pending_cmd, gwbuf_clone(querybuf));
bref->bref_pending_cmd = gwbuf_clone(querybuf);
rses_end_locked_router_action(rses); rses_end_locked_router_action(rses);
goto retblock; goto retblock;
} }
@ -4247,7 +4235,7 @@ static bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
* Otherwise, cursor will execute pending commands * Otherwise, cursor will execute pending commands
* when it completes with previous commands. * when it completes with previous commands.
*/ */
if (sescmd_cursor_is_active(scur)) if (sescmd_cursor_is_active(scur) && &backend_ref[i] != router_cli_ses->rses_master_ref)
{ {
nsucc += 1; nsucc += 1;
MXS_INFO("Backend %s:%d already executing sescmd.", MXS_INFO("Backend %s:%d already executing sescmd.",