diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index aac99511d..23bf2a2b4 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -79,7 +79,7 @@ static int backend_write_delayqueue(DCB *dcb); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static char *gw_backend_default_auth(); -static GWBUF* process_response_data(DCB* dcb, GWBUF* readbuf, int nbytes_to_process); +static GWBUF* process_response_data(DCB* dcb, GWBUF** readbuf, int nbytes_to_process); extern char* create_auth_failed_msg(GWBUF* readbuf, char* hostaddr, uint8_t* sha1); static bool sescmd_response_complete(DCB* dcb); static int gw_read_reply_or_error(DCB *dcb, MYSQL_session local_session); @@ -1091,24 +1091,31 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session) } } + do + { + GWBUF *stmt = NULL; /** * If protocol has session command set, concatenate whole * response into one buffer. */ - if (protocol_get_srv_command((MySQLProtocol *) dcb->protocol, false) != MYSQL_COM_UNDEFINED) + if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED) { - read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer)); + stmt = process_response_data(dcb, &read_buffer, gwbuf_length(read_buffer)); /** * Received incomplete response to session command. * Store it to readqueue and return. */ if (!sescmd_response_complete(dcb)) { + stmt = gwbuf_append(stmt, read_buffer); + spinlock_acquire(&dcb->authlock); + dcb->dcb_readqueue = gwbuf_append(stmt, dcb->dcb_readqueue); + spinlock_release(&dcb->authlock); return_code = 0; goto return_rc; } - if (!read_buffer) + if (!stmt) { MXS_NOTICE("%lu [gw_read_backend_event] " "Read buffer unexpectedly null, even though response " @@ -1119,6 +1126,12 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session) goto return_rc; } } + else + { + stmt = read_buffer; + read_buffer = NULL; + } + /** * Check that session is operable, and that client DCB is * still listening the socket for replies. @@ -1127,7 +1140,7 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session) dcb->session->client_dcb != NULL && dcb->session->client_dcb->state == DCB_STATE_POLLING && (session->router_session || - session->service->router->getCapabilities() & (int)RCAP_TYPE_NO_RSESSION)) + session->service->router->getCapabilities() & (int)RCAP_TYPE_NO_RSESSION)) { MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol; if (client_protocol != NULL) @@ -1136,30 +1149,29 @@ gw_read_and_write(DCB *dcb, MYSQL_session local_session) if (client_protocol->protocol_auth_state == MYSQL_IDLE) { - gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL); - session->service->router->clientReply( - session->service->router_instance, - session->router_session, - read_buffer, - dcb); + session->service->router->clientReply(session->service->router_instance, + session->router_session, + stmt, dcb); return_code = 1; } } else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL) { - gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); - session->service->router->clientReply( - session->service->router_instance, - session->router_session, - read_buffer, dcb); + gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL); + session->service->router->clientReply(session->service->router_instance, + session->router_session, + stmt, dcb); return_code = 1; } } else /*< session is closing; replying to client isn't possible */ { - gwbuf_free(read_buffer); + gwbuf_free(stmt); } + } + while (read_buffer); return_rc: return return_code; @@ -1928,7 +1940,7 @@ retblock: * @return GWBUF which includes complete MySQL packet */ static GWBUF* process_response_data(DCB* dcb, - GWBUF* readbuf, + GWBUF** readbuf, int nbytes_to_process) { int npackets_left = 0; /*< response's packet count */ @@ -1946,7 +1958,7 @@ static GWBUF* process_response_data(DCB* dcb, } /** All buffers processed here are sescmd responses */ - gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE); + gwbuf_set_type(*readbuf, GWBUF_TYPE_SESCMD_RESPONSE); /** * Now it is known how many packets there should be and how much @@ -1980,7 +1992,7 @@ static GWBUF* process_response_data(DCB* dcb, * packet content. Fails if read buffer doesn't include * enough data to read the packet length. */ - init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); + init_response_status(*readbuf, srvcmd, &npackets_left, &nbytes_left); } initial_packets = npackets_left; @@ -1996,7 +2008,7 @@ static GWBUF* process_response_data(DCB* dcb, if (nbytes_to_process >= 5) { /** discard source buffer */ - readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf)); + *readbuf = gwbuf_consume(*readbuf, GWBUF_LENGTH(*readbuf)); nbytes_left -= nbytes_to_process; } nbytes_to_process = 0; @@ -2008,8 +2020,8 @@ static GWBUF* process_response_data(DCB* dcb, nbytes_to_process = 0; ss_dassert(npackets_left > 0); npackets_left -= 1; - outbuf = gwbuf_append(outbuf, readbuf); - readbuf = NULL; + outbuf = gwbuf_append(outbuf, *readbuf); + *readbuf = NULL; } /** * Buffer contains more data than we need. Split the complete packet and @@ -2020,7 +2032,7 @@ static GWBUF* process_response_data(DCB* dcb, ss_dassert(nbytes_left < nbytes_to_process); ss_dassert(nbytes_left > 0); ss_dassert(npackets_left > 0); - outbuf = gwbuf_append(outbuf, gwbuf_split(&readbuf, nbytes_left)); + outbuf = gwbuf_append(outbuf, gwbuf_split(readbuf, nbytes_left)); nbytes_to_process -= nbytes_left; npackets_left -= 1; nbytes_left = 0; @@ -2056,7 +2068,7 @@ static GWBUF* process_response_data(DCB* dcb, * three bytes left. If there is less than three * bytes in the buffer or it is NULL, we need to wait for more data from the backend server.*/ - if (readbuf == NULL || gwbuf_length(readbuf) < 3) + if (*readbuf == NULL || gwbuf_length(*readbuf) < 3) { MXS_DEBUG("%lu [%s] Read %d packets. Waiting for %d more " "packets for a total of %d packets.", @@ -2073,7 +2085,7 @@ static GWBUF* process_response_data(DCB* dcb, return NULL; } uint8_t packet_len[3]; - gwbuf_copy_data(readbuf, 0, 3, packet_len); + gwbuf_copy_data(*readbuf, 0, 3, packet_len); nbytes_left = gw_mysql_get_byte3(packet_len) + MYSQL_HEADER_LEN; /** Store new status to protocol structure */ protocol_set_response_status(p, npackets_left, nbytes_left); diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 0320b60dd..353c40b4e 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -2421,23 +2421,11 @@ static bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, bref->bref_backend->backend_server->port); /** * Store current stmt if execution of previous session command - * haven't completed yet. - * - * !!! Note that according to MySQL protocol - * there can only be one such non-sescmd stmt at the time. - * It is possible that bref->bref_pending_cmd includes a pending - * command if rwsplit is parent or child for another router, - * which runs all the same commands. - * - * If the assertion below traps, pending queries are treated - * somehow wrong, or client is sending more queries before - * previous is received. + * hasn't completed yet. */ - if (sescmd_cursor_is_active(scur)) + if (sescmd_cursor_is_active(scur) && bref != rses->rses_master_ref) { - ss_dassert(bref->bref_pending_cmd == NULL); - bref->bref_pending_cmd = gwbuf_clone(querybuf); - + bref->bref_pending_cmd = gwbuf_append(bref->bref_pending_cmd, gwbuf_clone(querybuf)); rses_end_locked_router_action(rses); goto retblock; } @@ -4247,7 +4235,7 @@ static bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses, * Otherwise, cursor will execute pending commands * when it completes with previous commands. */ - if (sescmd_cursor_is_active(scur)) + if (sescmd_cursor_is_active(scur) && &backend_ref[i] != router_cli_ses->rses_master_ref) { nsucc += 1; MXS_INFO("Backend %s:%d already executing sescmd.",