diff --git a/server/core/dcb.c b/server/core/dcb.c index c1030faf2..9850fb710 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -302,12 +302,17 @@ dcb_final_free(DCB *dcb) if (dcb->remote) free(dcb->remote); - /* Consume dcb->delayq buffer */ + /* Clear write and read buffers */ if (dcb->delayq) { GWBUF *queue = dcb->delayq; while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); } + if (dcb->dcb_readqueue) + { + GWBUF* queue = dcb->dcb_readqueue; + while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); + } bitmask_free(&dcb->memdata.bitmask); simple_mutex_done(&dcb->dcb_read_lock); simple_mutex_done(&dcb->dcb_write_lock); diff --git a/server/include/dcb.h b/server/include/dcb.h index 9ca60e004..b56a8be5f 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -176,6 +176,7 @@ typedef struct dcb { GWBUF *writeq; /**< Write Data Queue */ SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ GWBUF *delayq; /**< Delay Backend Write Data Queue */ + GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */ SPINLOCK authlock; /**< Generic Authorization spinlock */ DCBSTATS stats; /**< DCB related statistics */ diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 3ad4502eb..41bcf0416 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -304,4 +304,5 @@ char *gw_strend(register const char *s); int setnonblocking(int fd); int setipaddress(struct in_addr *a, char *p); int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); -GWBUF* gw_MySQL_get_next_stmt(GWBUF** p_readbuf); +GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf); + diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index bea8f82aa..a4eecf4d5 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -192,6 +192,7 @@ struct router_client_session { int rses_capabilities; /*< input type, for example */ bool rses_autocommit_enabled; bool rses_transaction_active; + uint64_t rses_id; /*< ID for router client session */ struct router_client_session* next; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 71146a29c..bcd94e423 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -33,10 +33,9 @@ * If current user is authenticated the new users' table will replace the old one * 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string representation * 11/03/2014 Massimiliano Pinto Added: Unix socket support - * 07/05/2014 Massimiliano Pinto Added: specific version string in server handshake + * 07/05/2014 Massimiliano Pinto Added: specific version string in server handshake * */ - #include #include #include @@ -257,7 +256,7 @@ MySQLSendHandshake(DCB* dcb) memcpy(mysql_plugin_data, server_scramble + 8, 12); - mysql_payload_size = sizeof(mysql_protocol_version) + (len_version_string + 1) + sizeof(mysql_thread_id) + 8 + sizeof(mysql_filler) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) + sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) + sizeof(mysql_filler_ten) + 12 + sizeof(mysql_last_byte) + strlen("mysql_native_password") + sizeof(mysql_last_byte); + mysql_payload_size = sizeof(mysql_protocol_version) + (len_version_string + 1) + sizeof(mysql_thread_id) + 8 + sizeof(mysql_filler) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) + sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) + sizeof(mysql_filler_ten) + 12 + sizeof(mysql_last_byte) + strlen("mysql_native_password") + sizeof(mysql_last_byte); // allocate memory for packet header + payload if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL) @@ -282,9 +281,9 @@ MySQLSendHandshake(DCB* dcb) memcpy(mysql_handshake_payload, &mysql_protocol_version, sizeof(mysql_protocol_version)); mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_protocol_version); - // write server version plus 0 filler - strcpy((char *)mysql_handshake_payload, version_string); - mysql_handshake_payload = mysql_handshake_payload + len_version_string; + // write server version plus 0 filler + strcpy((char *)mysql_handshake_payload, version_string); + mysql_handshake_payload = mysql_handshake_payload + len_version_string; *mysql_handshake_payload = 0x00; @@ -507,9 +506,10 @@ int gw_read_client_event(DCB* dcb) { ROUTER *router_instance = NULL; void *rsession = NULL; MySQLProtocol *protocol = NULL; + GWBUF *read_buffer = NULL; int b = -1; int rc = 0; - + int nbytes_read = 0; CHK_DCB(dcb); protocol = DCB_PROTOCOL(dcb, MySQLProtocol); CHK_PROTOCOL(protocol); @@ -536,7 +536,6 @@ int gw_read_client_event(DCB* dcb) { /* * Handle the closed client socket. */ - if (b == 0) { char c; int l_errno = 0; @@ -561,41 +560,80 @@ int gw_read_client_event(DCB* dcb) { goto return_rc; } + rc = gw_read_gwbuff(dcb, &read_buffer, b); + + if (rc != 0) { + goto return_rc; + } + + nbytes_read = gwbuf_length(read_buffer); + ss_dassert(nbytes_read > 0); + + /** + * if read queue existed appent read to it. + * if length of read buffer is less than 3 or less than mysql packet + * then return. + * else copy mysql packets to separate buffers from read buffer and + * continue. + * else + * if read queue didn't exist, length of read is less than 3 or less + * than mysql packet then + * create read queue and append to it and return. + * if length read is less than mysql packet length append to read queue + * append to it and return. + * else (complete packet was read) continue. + */ + if (dcb->dcb_readqueue) + { + uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); + + read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + nbytes_read = gwbuf_length(read_buffer); + + if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)) + { + rc = 0; + goto return_rc; + } + else + { + /** + * There is at least one complete mysql packet read + */ + read_buffer = dcb->dcb_readqueue; + dcb->dcb_readqueue = NULL; + } + } + else + { + uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); + size_t packetlen = MYSQL_GET_PACKET_LEN(data)+4; + if (nbytes_read < 3 || nbytes_read < packetlen) + { + gwbuf_append(dcb->dcb_readqueue, read_buffer); + rc = 0; + goto return_rc; + } + } + + /** + * Now there should be at least one complete mysql packet in read_buffer. + */ switch (protocol->state) { + case MYSQL_AUTH_SENT: /* * Read all the data that is available into a chain of buffers */ { - int len = -1; - GWBUF *queue = NULL; - GWBUF *gw_buffer = NULL; int auth_val = -1; - ////////////////////////////////////////////////////// - // read and handle errors & close, or return if busy - // note: if b == 0 error handling is not - // triggered, just return - // without closing - ////////////////////////////////////////////////////// - rc = gw_read_gwbuff(dcb, &gw_buffer, b); - - if (rc != 0) { - goto return_rc; - } - - // example with consume, assuming one buffer only ... - queue = gw_buffer; - len = GWBUF_LENGTH(queue); - - ss_dassert(len > 0); - - auth_val = gw_mysql_do_authentication(dcb, queue); - + + auth_val = gw_mysql_do_authentication(dcb, read_buffer); // Data handled withot the dcb->func.write // so consume it now // be sure to consume it all - queue = gwbuf_consume(queue, len); + read_buffer = gwbuf_consume(read_buffer, nbytes_read); if (auth_val == 0) { @@ -638,9 +676,7 @@ int gw_read_client_event(DCB* dcb) { * Read all the data that is available into a chain of buffers */ { - int len = -1; uint8_t cap = 0; - GWBUF *read_buffer = NULL; uint8_t *ptr_buff = NULL; int mysql_command = -1; bool stmt_input; /*< router input type */ @@ -655,22 +691,14 @@ int gw_read_client_event(DCB* dcb) { session->service->router_instance; rsession = session->router_session; } - - ////////////////////////////////////////////////////// - // read and handle errors & close, or return if busy - ////////////////////////////////////////////////////// - rc = gw_read_gwbuff(dcb, &read_buffer, b); - - if (rc != 0) { - goto return_rc; - } + /* Now, we are assuming in the first buffer there is * the information form mysql command */ - len = GWBUF_LENGTH(read_buffer); ptr_buff = GWBUF_DATA(read_buffer); /* get mysql commang at fifth byte */ if (ptr_buff) { + ss_dassert(nbytes_read >= 5); mysql_command = ptr_buff[4]; } /** @@ -701,7 +729,7 @@ int gw_read_client_event(DCB* dcb) { } rc = 1; /** Free buffer */ - read_buffer = gwbuf_consume(read_buffer, len); + read_buffer = gwbuf_consume(read_buffer, nbytes_read); goto return_rc; } /** Ask what type of input the router expects */ @@ -758,7 +786,12 @@ int gw_read_client_event(DCB* dcb) { rc = route_by_statement(router_instance, router, rsession, - read_buffer); + read_buffer); + if (read_buffer != NULL) + { + /** add incomplete mysql packet to read queue */ + gwbuf_append(dcb->dcb_readqueue, read_buffer); + } } else { @@ -1228,11 +1261,11 @@ return_rc: static int gw_error_client_event( DCB* dcb) -{ + { int rc; - + CHK_DCB(dcb); - + rc = dcb->func.close(dcb); return rc; @@ -1289,11 +1322,11 @@ static int gw_client_hangup_event(DCB *dcb) { int rc; - + CHK_DCB(dcb); rc = dcb->func.close(dcb); - return rc; + return rc; } @@ -1301,6 +1334,10 @@ gw_client_hangup_event(DCB *dcb) * Detect if buffer includes partial mysql packet or multiple packets. * Store partial packet to pendingqueue. Send complete packets one by one * to router. + * + * It is assumed readbuf includes at least one complete packet. + * Return 1 in success. If the last packet is incomplete return success but + * leave incomplete packet to readbuf. */ static int route_by_statement( ROUTER* router_instance, @@ -1309,26 +1346,26 @@ static int route_by_statement( GWBUF* readbuf) { int rc = -1; - DCB* master_dcb; - GWBUF* stmtbuf; - uint8_t* payload; - static size_t len; + GWBUF* packetbuf; do { - stmtbuf = gw_MySQL_get_next_stmt(&readbuf); - ss_dassert(stmtbuf != NULL); - CHK_GWBUF(stmtbuf); - - payload = (uint8_t *)GWBUF_DATA(stmtbuf); - /** - * If message is longer than read data, suspend routing and - * add statement buffer to wait queue. - */ - rc = router->routeQuery(router_instance, rsession, stmtbuf); + packetbuf = gw_MySQL_get_next_packet(&readbuf); + + if (packetbuf != NULL) + { + CHK_GWBUF(packetbuf); + rc = router->routeQuery(router_instance, rsession, packetbuf); + } + else + { + rc = 1; + goto return_rc; + } } while (readbuf != NULL); +return_rc: return rc; } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 81911a9b2..b82b10583 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1279,54 +1279,86 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const /** - * Remove the first mysql statement from buffer. Return pointer to the removed - * statement or NULL if buffer is empty. - * - * Clone buf, calculate the length of included mysql stmt, and point the - * statement with cloned buffer. Move the start pointer of buf accordingly - * so that it only cover the remaining buffer. + * Buffer contains at least one of the following: + * complete [complete] [partial] mysql packet * + * return pointer to gwbuf containing a complete packet or + * NULL if no complete packet was found. */ -GWBUF* gw_MySQL_get_next_stmt( +GWBUF* gw_MySQL_get_next_packet( GWBUF** p_readbuf) { - GWBUF* stmtbuf; - size_t buflen; - size_t strlen; - uint8_t* packet; + GWBUF* packetbuf; + GWBUF* readbuf; + size_t buflen; + size_t packetlen; + size_t totalbuflen; + uint8_t* data; - if (*p_readbuf == NULL) + readbuf = *p_readbuf; + + if (readbuf == NULL) { - stmtbuf = NULL; - goto return_stmtbuf; + packetbuf = NULL; + goto return_packetbuf; } - CHK_GWBUF(*p_readbuf); + CHK_GWBUF(readbuf); - if (GWBUF_EMPTY(*p_readbuf)) + if (GWBUF_EMPTY(readbuf)) { - stmtbuf = NULL; - goto return_stmtbuf; + packetbuf = NULL; + goto return_packetbuf; } - buflen = GWBUF_LENGTH((*p_readbuf)); - packet = GWBUF_DATA((*p_readbuf)); - strlen = MYSQL_GET_PACKET_LEN(packet); + + buflen = GWBUF_LENGTH((readbuf)); + totalbuflen = gwbuf_length(readbuf); + data = (uint8_t *)GWBUF_DATA((readbuf)); + packetlen = MYSQL_GET_PACKET_LEN(data)+4; - if (strlen+4 == buflen) + /** packet is incomplete */ + if (packetlen > totalbuflen) { - stmtbuf = *p_readbuf; - *p_readbuf = NULL; - goto return_stmtbuf; + packetbuf = NULL; + goto return_packetbuf; } - /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ - if (strlen-1 > buflen-5) + + if (packetlen == buflen) { - stmtbuf = NULL; - goto return_stmtbuf; + packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen); + *p_readbuf = gwbuf_consume(readbuf, packetlen); + goto return_packetbuf; + } + /** + * Packet spans multiple buffers. + * Allocate buffer for complete packet + * copy packet parts into it and consume copied bytes + */ + else if (packetlen > buflen) + { + size_t nbytes_copied = 0; + uint8_t* target; + + packetbuf = gwbuf_alloc(packetlen); + target = GWBUF_DATA(packetbuf); + + while (nbytes_copied < packetlen) + { + uint8_t* src = GWBUF_DATA(readbuf); + size_t buflen = GWBUF_LENGTH(readbuf); + + memcpy(target+nbytes_copied, src, buflen); + *p_readbuf = gwbuf_consume(readbuf, buflen); + nbytes_copied += buflen; + } + ss_dassert(nbytes_copied == packetlen); + } + else + { + packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen); + *p_readbuf = gwbuf_consume(readbuf, packetlen); } - stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); - *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); -return_stmtbuf: - return stmtbuf; +return_packetbuf: + return packetbuf; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 93d339a65..a203a9865 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -411,6 +411,7 @@ static void* newSession( int conf_max_nslaves; /*< value from configuration file */ int i; const int min_nservers = 1; /*< hard-coded for now */ + static uint64_t router_client_ses_seq; /*< ID for client session */ client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); @@ -443,6 +444,9 @@ static void* newSession( } /** Copy config struct from router instance */ client_rses->rses_config = router->rwsplit_config; + /** Create ID for the new client (router_client_ses) session */ + client_rses->rses_id = router_client_ses_seq += 1; + spinlock_release(&router->lock); /** * Set defaults to session variables. @@ -926,13 +930,14 @@ static int routeQuery( default: break; } /**< switch by packet type */ - +#if 0 LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "String\t\"%s\"", querystr == NULL ? "(empty)" : querystr))); LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)))); +#endif #if defined(AUTOCOMMIT_OPT) if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) && !router_cli_ses->rses_autocommit_enabled) || @@ -1005,7 +1010,9 @@ static int routeQuery( LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "Read-only query, routing to Slave."))); + "[%s.%d]\tRead-only query, routing to Slave.", + inst->service->name, + router_cli_ses->rses_id))); ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE);