From bb364f531bb48cc0f55c3372c020782be9b7ab56 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Fri, 16 May 2014 17:01:10 +0200 Subject: [PATCH] Bug #425, http://bugs.skysql.com/show_bug.cgi?id=425 mysqlk_client.c now assumes that mysql packets may arrive in separate pieces. gw_read_client_event reads client data, and if packet is incomplete it is added to new dcb->dcb_readqueue. When next packet is combined with data in readqueue. mysql_common.c:gw_MySQL_get_next_packet can now return mysql packet from read buffer although it would span multiple gwbuf buffers. --- server/core/dcb.c | 8 +- server/include/dcb.h | 2 + .../include/mysql_client_server_protocol.h | 3 +- server/modules/protocol/mysql_client.c | 151 +++++++++++------- server/modules/protocol/mysql_common.c | 98 ++++++++---- .../routing/readwritesplit/readwritesplit.c | 30 +--- 6 files changed, 180 insertions(+), 112 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 88703a86e..bfc815dfe 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -117,6 +117,7 @@ DCB *rval; spinlock_init(&rval->dcb_initlock); spinlock_init(&rval->writeqlock); spinlock_init(&rval->delayqlock); + spinlock_init(&rval->dcb_readqlock); spinlock_init(&rval->authlock); rval->fd = -1; memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics @@ -302,12 +303,17 @@ dcb_final_free(DCB *dcb) if (dcb->remote) free(dcb->remote); - /* Consume dcb->delayq buffer */ + /* Clear write and read buffers */ if (dcb->delayq) { GWBUF *queue = dcb->delayq; while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); } + if (dcb->dcb_readqueue) + { + GWBUF* queue = dcb->dcb_readqueue; + while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); + } bitmask_free(&dcb->memdata.bitmask); simple_mutex_done(&dcb->dcb_read_lock); simple_mutex_done(&dcb->dcb_write_lock); diff --git a/server/include/dcb.h b/server/include/dcb.h index 9ca60e004..f0c4cff31 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -176,6 +176,8 @@ typedef struct dcb { GWBUF *writeq; /**< Write Data Queue */ SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ GWBUF *delayq; /**< Delay Backend Write Data Queue */ + GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */ + SPINLOCK dcb_readqlock; /**< read/write access protection to read queue */ SPINLOCK authlock; /**< Generic Authorization spinlock */ DCBSTATS stats; /**< DCB related statistics */ diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 3ad4502eb..41bcf0416 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -304,4 +304,5 @@ char *gw_strend(register const char *s); int setnonblocking(int fd); int setipaddress(struct in_addr *a, char *p); int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); -GWBUF* gw_MySQL_get_next_stmt(GWBUF** p_readbuf); +GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf); + diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 5e4c9bb21..574c5d426 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -507,9 +507,11 @@ int gw_read_client_event(DCB* dcb) { ROUTER *router_instance = NULL; void *rsession = NULL; MySQLProtocol *protocol = NULL; + GWBUF *read_buffer = NULL; int b = -1; int rc = 0; - + int nbytes_read = 0; + CHK_DCB(dcb); protocol = DCB_PROTOCOL(dcb, MySQLProtocol); CHK_PROTOCOL(protocol); @@ -536,7 +538,6 @@ int gw_read_client_event(DCB* dcb) { /* * Handle the closed client socket. */ - if (b == 0) { char c; int l_errno = 0; @@ -562,40 +563,83 @@ int gw_read_client_event(DCB* dcb) { goto return_rc; } + rc = gw_read_gwbuff(dcb, &read_buffer, b); + + if (rc != 0) { + goto return_rc; + } + + nbytes_read = gwbuf_length(read_buffer); + ss_dassert(nbytes_read > 0); + + /** + * if read queue existed appent read to it. + * if length of read buffer is less than 3 or less than mysql packet + * then return. + * else copy mysql packets to separate buffers from read buffer and + * continue. + * else + * if read queue didn't exist, length of read is less than 3 or less + * than mysql packet then + * create read queue and append to it and return. + * if length read is less than mysql packet length append to read queue + * append to it and return. + * else (complete packet was read) continue. + */ + if (dcb->dcb_readqueue) + { + uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); + + read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + nbytes_read = gwbuf_length(read_buffer); + + if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)) + { + rc = 0; + goto return_rc; + } + else + { + /** + * There is at least one complete mysql packet read + */ + read_buffer = dcb->dcb_readqueue; + dcb->dcb_readqueue = NULL; + } + } + else + { + uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); + size_t packetlen = MYSQL_GET_PACKET_LEN(data)+4; + + if (nbytes_read < 3 || nbytes_read < packetlen) + { + gwbuf_append(dcb->dcb_readqueue, read_buffer); + rc = 0; + goto return_rc; + } + } + ss_dassert(nbytes_read >= MYSQL_GET_PACKET_LEN((uint8_t *)GWBUF_DATA(read_buffer))+4); + ss_dassert(nbytes_read == MYSQL_GET_PACKET_LEN((uint8_t *)GWBUF_DATA(read_buffer))+4 || + dcb->dcb_readqueue != NULL); + + /** + * Now there should be at least one complete mysql packet in read_buffer. + */ switch (protocol->state) { + case MYSQL_AUTH_SENT: /* * Read all the data that is available into a chain of buffers */ { - int len = -1; - GWBUF *queue = NULL; - GWBUF *gw_buffer = NULL; int auth_val = -1; - ////////////////////////////////////////////////////// - // read and handle errors & close, or return if busy - // note: if b == 0 error handling is not - // triggered, just return - // without closing - ////////////////////////////////////////////////////// - rc = gw_read_gwbuff(dcb, &gw_buffer, b); - - if (rc != 0) { - goto return_rc; - } - - // example with consume, assuming one buffer only ... - queue = gw_buffer; - len = GWBUF_LENGTH(queue); - - ss_dassert(len > 0); - - auth_val = gw_mysql_do_authentication(dcb, queue); - + + auth_val = gw_mysql_do_authentication(dcb, read_buffer); // Data handled withot the dcb->func.write // so consume it now // be sure to consume it all - queue = gwbuf_consume(queue, len); + read_buffer = gwbuf_consume(read_buffer, nbytes_read); if (auth_val == 0) { @@ -638,9 +682,7 @@ int gw_read_client_event(DCB* dcb) { * Read all the data that is available into a chain of buffers */ { - int len = -1; uint8_t cap = 0; - GWBUF *read_buffer = NULL; uint8_t *ptr_buff = NULL; int mysql_command = -1; bool stmt_input; /*< router input type */ @@ -655,22 +697,14 @@ int gw_read_client_event(DCB* dcb) { session->service->router_instance; rsession = session->router_session; } - - ////////////////////////////////////////////////////// - // read and handle errors & close, or return if busy - ////////////////////////////////////////////////////// - rc = gw_read_gwbuff(dcb, &read_buffer, b); - - if (rc != 0) { - goto return_rc; - } + /* Now, we are assuming in the first buffer there is * the information form mysql command */ - len = GWBUF_LENGTH(read_buffer); ptr_buff = GWBUF_DATA(read_buffer); /* get mysql commang at fifth byte */ if (ptr_buff) { + ss_dassert(nbytes_read >= 5); mysql_command = ptr_buff[4]; } /** @@ -701,7 +735,7 @@ int gw_read_client_event(DCB* dcb) { } rc = 1; /** Free buffer */ - read_buffer = gwbuf_consume(read_buffer, len); + read_buffer = gwbuf_consume(read_buffer, nbytes_read); goto return_rc; } /** Ask what type of input the router expects */ @@ -762,7 +796,12 @@ int gw_read_client_event(DCB* dcb) { rc = route_by_statement(router_instance, router, rsession, - read_buffer); + read_buffer); + if (read_buffer != NULL) + { + /** add incomplete mysql packet to read queue */ + gwbuf_append(dcb->dcb_readqueue, read_buffer); + } } else { @@ -1353,6 +1392,10 @@ return_rc: * Detect if buffer includes partial mysql packet or multiple packets. * Store partial packet to pendingqueue. Send complete packets one by one * to router. + * + * It is assumed readbuf includes at least one complete packet. + * Return 1 in success. If the last packet is incomplete return success but + * leave incomplete packet to readbuf. */ static int route_by_statement( ROUTER* router_instance, @@ -1361,26 +1404,26 @@ static int route_by_statement( GWBUF* readbuf) { int rc = -1; - DCB* master_dcb; - GWBUF* stmtbuf; - uint8_t* payload; - static size_t len; + GWBUF* packetbuf; do { - stmtbuf = gw_MySQL_get_next_stmt(&readbuf); - ss_dassert(stmtbuf != NULL); - CHK_GWBUF(stmtbuf); - - payload = (uint8_t *)GWBUF_DATA(stmtbuf); - /** - * If message is longer than read data, suspend routing and - * add statement buffer to wait queue. - */ - rc = router->routeQuery(router_instance, rsession, stmtbuf); + packetbuf = gw_MySQL_get_next_packet(&readbuf); + + if (packetbuf != NULL) + { + CHK_GWBUF(packetbuf); + rc = router->routeQuery(router_instance, rsession, packetbuf); + } + else + { + rc = 1; + goto return_rc; + } } while (readbuf != NULL); +return_rc: return rc; } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 354bb4e03..7a017d041 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1279,54 +1279,86 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const /** - * Remove the first mysql statement from buffer. Return pointer to the removed - * statement or NULL if buffer is empty. - * - * Clone buf, calculate the length of included mysql stmt, and point the - * statement with cloned buffer. Move the start pointer of buf accordingly - * so that it only cover the remaining buffer. + * Buffer contains at least one of the following: + * complete [complete] [partial] mysql packet * + * return pointer to gwbuf containing a complete packet or + * NULL if no complete packet was found. */ -GWBUF* gw_MySQL_get_next_stmt( +GWBUF* gw_MySQL_get_next_packet( GWBUF** p_readbuf) { - GWBUF* stmtbuf; - size_t buflen; - size_t strlen; - uint8_t* packet; + GWBUF* packetbuf; + GWBUF* readbuf; + size_t buflen; + size_t packetlen; + size_t totalbuflen; + uint8_t* data; - if (*p_readbuf == NULL) + readbuf = *p_readbuf; + + if (readbuf == NULL) { - stmtbuf = NULL; - goto return_stmtbuf; + packetbuf = NULL; + goto return_packetbuf; } - CHK_GWBUF(*p_readbuf); + CHK_GWBUF(readbuf); - if (GWBUF_EMPTY(*p_readbuf)) + if (GWBUF_EMPTY(readbuf)) { - stmtbuf = NULL; - goto return_stmtbuf; + packetbuf = NULL; + goto return_packetbuf; } - buflen = GWBUF_LENGTH((*p_readbuf)); - packet = GWBUF_DATA((*p_readbuf)); - strlen = MYSQL_GET_PACKET_LEN(packet); + + buflen = GWBUF_LENGTH((readbuf)); + totalbuflen = gwbuf_length(readbuf); + data = (uint8_t *)GWBUF_DATA((readbuf)); + packetlen = MYSQL_GET_PACKET_LEN(data)+4; - if (strlen+4 == buflen) + /** packet is incomplete */ + if (packetlen > totalbuflen) { - stmtbuf = *p_readbuf; - *p_readbuf = NULL; - goto return_stmtbuf; + packetbuf = NULL; + goto return_packetbuf; } - /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ - if (strlen-1 > buflen-5) + + if (packetlen == buflen) { - stmtbuf = NULL; - goto return_stmtbuf; + packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen); + *p_readbuf = gwbuf_consume(readbuf, packetlen); + goto return_packetbuf; + } + /** + * Packet spans multiple buffers. + * Allocate buffer for complete packet + * copy packet parts into it and consume copied bytes + */ + else if (packetlen > buflen) + { + size_t nbytes_copied = 0; + uint8_t* target; + + packetbuf = gwbuf_alloc(packetlen); + target = GWBUF_DATA(packetbuf); + + while (nbytes_copied < packetlen) + { + uint8_t* src = GWBUF_DATA(readbuf); + size_t buflen = GWBUF_LENGTH(readbuf); + + memcpy(target+nbytes_copied, src, buflen); + *p_readbuf = gwbuf_consume(readbuf, buflen); + nbytes_copied += buflen; + } + ss_dassert(nbytes_copied == packetlen); + } + else + { + packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen); + *p_readbuf = gwbuf_consume(readbuf, packetlen); } - stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); - *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); -return_stmtbuf: - return stmtbuf; +return_packetbuf: + return packetbuf; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index dcea42d19..614c5c3e8 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -547,19 +547,12 @@ static void freeSession( } /** - * The main routing entry, this is called with every packet that is - * received and has to be forwarded to the backend database. + * It is assumed that whole query comes in a single gwbuf instead of linked list. + * * - * The routeQuery will make the routing decision based on the contents - * of the instance, session and the query itself in the queue. The - * data in the queue may not represent a complete query, it represents - * the data that has been received. The query router itself is responsible - * for buffering the partial query, a later call to the query router will - * contain the remainder, or part thereof of the query. - * - * @param instance The query router instance - * @param session The session associated with the client - * @param queue Gateway buffer queue with the packets received + * @param instance The router instance + * @param router_session The session associated with the client + * @param querybuf Gateway buffer queue with the packets received * * @return The number of queries forwarded */ @@ -580,12 +573,7 @@ static int routeQuery( ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; bool rses_is_closed; - rses_property_t* prop; size_t len; - /** if false everything goes to master and session commands to slave too */ - static bool autocommit_enabled = true; - /** if true everything goes to master and session commands to slave too */ - static bool transaction_active = false; CHK_CLIENT_RSES(router_cli_ses); @@ -737,9 +725,7 @@ static int routeQuery( } else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !router_cli_ses->rses_transaction_active) - { - bool succp; - + { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Read-only query, routing to Slave."))); @@ -751,9 +737,7 @@ static int routeQuery( goto return_ret; } else - { - bool succp = true; - + { if (LOG_IS_ENABLED(LOGFILE_TRACE)) { if (router_cli_ses->rses_transaction_active) /*< all to master */