From e95b6cc0d94ac66a89325a42ed947ad631f8b38b Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Thu, 12 Jun 2014 19:02:47 +0300 Subject: [PATCH] dcb.c, gw_utils.c, mysql_server_protocol.h, mysql_client.c : Replaced gw_read_gwbuff with dcb_read in mysql_client.c:gw_read_client_event. rwsplit.sh, test_sescmd.sql : Added test case for session commands. --- server/core/dcb.c | 222 ++++++++++-------- server/core/gw_utils.c | 56 ----- .../include/mysql_client_server_protocol.h | 25 +- server/modules/protocol/mysql_client.c | 98 +++----- .../routing/readwritesplit/test/rwsplit.sh | 39 +++ .../readwritesplit/test/test_sescmd.sql | 4 + 6 files changed, 213 insertions(+), 231 deletions(-) create mode 100644 server/modules/routing/readwritesplit/test/test_sescmd.sql diff --git a/server/core/dcb.c b/server/core/dcb.c index bab2f87fe..be71d9677 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -595,26 +595,29 @@ int rc; * * @param dcb The DCB to read from * @param head Pointer to linked list to append data to - * @return -1 on error, otherwise the number of read bytes on the last. - * 0 is returned if no data available on the last iteration of while loop. + * @return -1 on error, otherwise the number of read bytes on the last + * iteration of while loop. 0 is returned if no data available. */ -int -dcb_read(DCB *dcb, GWBUF **head) +int dcb_read( + DCB *dcb, + GWBUF **head) { -GWBUF *buffer = NULL; -int b; -int rc; -int n = 0; -int eno = 0; - + GWBUF *buffer = NULL; + int b; + int rc; + int n ; + int nread = 0; + int eno = 0; + CHK_DCB(dcb); while (true) - { - int bufsize; - + { + int bufsize; + rc = ioctl(dcb->fd, FIONREAD, &b); - - if (rc == -1) { + + if (rc == -1) + { eno = errno; errno = 0; LOGIF(LE, (skygw_log_write_flush( @@ -629,19 +632,39 @@ int eno = 0; n = -1; goto return_n; } - /*< Nothing to read - leave */ - if (b == 0) { + + if (b == 0 && nread == 0) + { + /** Handle closed client socket */ + if (dcb_isclient(dcb)) + { + char c; + int l_errno = 0; + int r = -1; + + /* try to read 1 byte, without consuming the socket buffer */ + r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK); + l_errno = errno; + + if (r <= 0 && + l_errno != EAGAIN && + l_errno != EWOULDBLOCK) + { + n = -1; + goto return_n; + } + } n = 0; goto return_n; } bufsize = MIN(b, MAX_BUFFER_SIZE); - - if ((buffer = gwbuf_alloc(bufsize)) == NULL) - { + + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { /*< - * This is a fatal error which should cause shutdown. - * Todo shutdown if memory allocation fails. - */ + * This is a fatal error which should cause shutdown. + * Todo shutdown if memory allocation fails. + */ LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Failed to allocate read buffer " @@ -654,16 +677,17 @@ int eno = 0; n = -1; ss_dassert(buffer != NULL); goto return_n; - } - GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); - dcb->stats.n_reads++); - - if (n <= 0) - { + } + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); + dcb->stats.n_reads++); + + if (n <= 0) + { int eno = errno; errno = 0; - - if (eno != EAGAIN && eno != EWOULDBLOCK) { + + if (eno != 0 && eno != EAGAIN && eno != EWOULDBLOCK) + { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Read failed, dcb %p in state " @@ -674,18 +698,11 @@ int eno = 0; eno, strerror(eno)))); } - else - { - /*< - * If read would block it means that other thread - * has probably read the data. - */ - n = 0; - } - - gwbuf_free(buffer); + gwbuf_free(buffer); goto return_n; } + nread += n; + LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [dcb_read] Read %d bytes from dcb %p in state %s " @@ -695,14 +712,13 @@ int eno = 0; dcb, STRDCBSTATE(dcb->state), dcb->fd))); - /*< Append read data to the gwbuf */ - *head = gwbuf_append(*head, buffer); - } /*< while (true) */ + /*< Append read data to the gwbuf */ + *head = gwbuf_append(*head, buffer); + } /*< while (true) */ return_n: - return n; + return n; } - /** * General purpose routine to write to a DCB * @@ -712,7 +728,7 @@ return_n: int dcb_write(DCB *dcb, GWBUF *queue) { -int w, qlen; +int w; int saved_errno = 0; int below_water; @@ -761,26 +777,26 @@ int below_water; * not have a race condition on the event. */ if (queue) - qlen = gwbuf_length(queue); - else - qlen = 0; - atomic_add(&dcb->writeqlen, qlen); - dcb->writeq = gwbuf_append(dcb->writeq, queue); - dcb->stats.n_buffered++; - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [dcb_write] Append to writequeue. %d writes " - "buffered for dcb %p in state %s fd %d", - pthread_self(), - dcb->stats.n_buffered, - dcb, - STRDCBSTATE(dcb->state), - dcb->fd))); + { + int qlen; + + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); + dcb->writeq = gwbuf_append(dcb->writeq, queue); + dcb->stats.n_buffered++; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Append to writequeue. %d writes " + "buffered for dcb %p in state %s fd %d", + pthread_self(), + dcb->stats.n_buffered, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + } } else { - int len; - /* * Loop over the buffer chain that has been passed to us * from the reading side. @@ -789,6 +805,7 @@ int below_water; */ while (queue != NULL) { + int qlen; #if defined(SS_DEBUG) if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && dcb->session != NULL) @@ -806,13 +823,13 @@ int below_water; } } #endif /* SS_DEBUG */ - len = GWBUF_LENGTH(queue); + qlen = GWBUF_LENGTH(queue); GW_NOINTR_CALL( w = gw_write( #if defined(SS_DEBUG) dcb, #endif - dcb->fd, GWBUF_DATA(queue), len); + dcb->fd, GWBUF_DATA(queue), qlen); dcb->stats.n_writes++; ); @@ -823,37 +840,39 @@ int below_water; if (LOG_IS_ENABLED(LOGFILE_DEBUG)) { - if (saved_errno == EPIPE) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [dcb_write] Write to dcb " - "%p in state %s fd %d failed " - "due errno %d, %s", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - dcb->fd, - saved_errno, - strerror(saved_errno)))); + if (saved_errno == EPIPE) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due errno %d, %s", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror(saved_errno)))); } } + if (LOG_IS_ENABLED(LOGFILE_ERROR)) { if (saved_errno != EPIPE && saved_errno != EAGAIN && - saved_errno != EWOULDBLOCK) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Write to dcb %p in " - "state %s fd %d failed due " - "errno %d, %s", - dcb, - STRDCBSTATE(dcb->state), - dcb->fd, - saved_errno, - strerror(saved_errno)))); - } + saved_errno != EWOULDBLOCK) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write to dcb %p in " + "state %s fd %d failed due " + "errno %d, %s", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror(saved_errno)))); + } } break; } @@ -877,20 +896,15 @@ int below_water; * for suspended write. */ dcb->writeq = queue; - if (queue) + + if (queue) { + int qlen; + qlen = gwbuf_length(queue); - } - else - { - qlen = 0; - } - atomic_add(&dcb->writeqlen, qlen); - - if (queue != NULL) - { - dcb->stats.n_buffered++; - } + atomic_add(&dcb->writeqlen, qlen); + dcb->stats.n_buffered++; + } } /* if (dcb->writeq) */ if (saved_errno != 0 && @@ -1449,7 +1463,7 @@ static bool dcb_set_state_nomutex( } /*< switch (dcb->state) */ if (succp) { - LOGIF(LD, (skygw_log_write( + LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s", pthread_self(), diff --git a/server/core/gw_utils.c b/server/core/gw_utils.c index 0507e3d1c..1ace170e5 100644 --- a/server/core/gw_utils.c +++ b/server/core/gw_utils.c @@ -157,62 +157,6 @@ void gw_daemonize(void) { } } -///////////////////////////////////////////////// -// Read data from dcb and store it in the gwbuf -///////////////////////////////////////////////// -int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { - GWBUF *buffer = NULL; - int n = -1; - - if (b <= 0) { - ss_dassert(false); -#if 0 - dcb->func.close(dcb); -#endif - return 1; - } - - while (b > 0) { - int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE; - if ((buffer = gwbuf_alloc(bufsize)) == NULL) { - /* Bad news, we have run out of memory */ - /* Error handling */ - (dcb->func).close(dcb); - return 1; - } - - GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++); - - if (n < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - gwbuf_free(buffer); - return 1; - } else { - gwbuf_free(buffer); - (dcb->func).close(dcb); - return 1; - } - } - - if (n == 0) { - // socket closed - gwbuf_free(buffer); -#if 1 - (dcb->func).close(dcb); -#endif - return 1; - } - - // append read data to the gwbuf - *head = gwbuf_append(*head, buffer); - - // how many bytes left - b -= n; - } - - return 0; -} - /** * Parse the bind config data. This is passed in a string as address:port. * diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 838dd40e6..5f5d30735 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -236,9 +236,10 @@ typedef enum #define MYSQL_COM_INIT_DB 0x2 #define MYSQL_COM_QUERY 0x3 -#define MYSQL_GET_COMMAND(payload) (payload[4]) -#define MYSQL_GET_PACKET_NO(payload) (payload[3]) +#define MYSQL_GET_COMMAND(payload) (payload[4]) +#define MYSQL_GET_PACKET_NO(payload) (payload[3]) #define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload)) +#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5])) #endif @@ -255,8 +256,10 @@ int gw_send_authentication_to_backend( uint8_t *passwd, MySQLProtocol *protocol); const char *gw_mysql_protocol_state2string(int state); -int gw_do_connect_to_backend(char *host, int port, int* fd); -int mysql_send_com_quit(DCB* dcb, int packet_number); +int gw_do_connect_to_backend(char *host, int port, int* fd); +int mysql_send_com_quit(DCB* dcb, int packet_number, GWBUF* buf); +GWBUF* mysql_create_com_quit(GWBUF* bufparam, int packet_number); + int mysql_send_custom_error ( DCB *dcb, int packet_number, @@ -297,12 +300,12 @@ void gw_str_xor( const uint8_t *input1, const uint8_t *input2, unsigned int len); -char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len); -int gw_hex2bin(uint8_t *out, const char *in, unsigned int len); -int gw_generate_random_str(char *output, int len); -char *gw_strend(register const char *s); -int setnonblocking(int fd); -int setipaddress(struct in_addr *a, char *p); -int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); + +char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len); +int gw_hex2bin(uint8_t *out, const char *in, unsigned int len); +int gw_generate_random_str(char *output, int len); +char *gw_strend(register const char *s); +int setnonblocking(int fd); +int setipaddress(struct in_addr *a, char *p); GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf); diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index d99b23e18..9c6955f7d 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -504,75 +504,32 @@ gw_MySQLWrite_client(DCB *dcb, GWBUF *queue) * @param dcb Descriptor control block * @return 0 if succeed, 1 otherwise */ -int gw_read_client_event(DCB* dcb) { +int gw_read_client_event( + DCB* dcb) +{ SESSION *session = NULL; ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; MySQLProtocol *protocol = NULL; GWBUF *read_buffer = NULL; - int b = -1; int rc = 0; int nbytes_read = 0; CHK_DCB(dcb); protocol = DCB_PROTOCOL(dcb, MySQLProtocol); CHK_PROTOCOL(protocol); - /** - * Check how many bytes are readable in dcb->fd. - */ - if (ioctl(dcb->fd, FIONREAD, &b) != 0) { - int eno = errno; - errno = 0; - - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "%lu [gw_read_client_event] ioctl FIONREAD for fd " - "%d failed. errno %d, %s. dcb->state = %d", - pthread_self(), - dcb->fd, - eno, - strerror(eno), - dcb->state))); - rc = 1; - goto return_rc; + rc = dcb_read(dcb, &read_buffer); + + if (rc < 0) + { + dcb_close(dcb); } - - /* - * Handle the closed client socket. - */ - if (b == 0) { - char c; - int l_errno = 0; - int r = -1; - - rc = 0; - - /* try to read 1 byte, without consuming the socket buffer */ - r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK); - l_errno = errno; - - if (r <= 0) { - if ( (l_errno == EAGAIN) || (l_errno == EWOULDBLOCK)) { - goto return_rc; - } - - // close client socket and the session too - dcb->func.close(dcb); - } else { - // do nothing if reading 1 byte - } - - goto return_rc; - } - rc = gw_read_gwbuff(dcb, &read_buffer, b); - - if (rc != 0) { - goto return_rc; - } - nbytes_read = gwbuf_length(read_buffer); - ss_dassert(nbytes_read > 0); - + + if (nbytes_read == 0) + { + goto return_rc; + } /** * if read queue existed appent read to it. * if length of read buffer is less than 3 or less than mysql packet @@ -660,6 +617,14 @@ int gw_read_client_event(DCB* dcb) { else { protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] session " + "creation failed. fd %d, " + "state = MYSQL_AUTH_FAILED.", + protocol->owner_dcb->fd, + pthread_self()))); + /** Send ERR 1045 to client */ mysql_send_auth_error( dcb, @@ -676,6 +641,14 @@ int gw_read_client_event(DCB* dcb) { else { protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] after " + "gw_mysql_do_authentication, fd %d, " + "state = MYSQL_AUTH_FAILED.", + protocol->owner_dcb->fd, + pthread_self()))); + /** Send ERR 1045 to client */ mysql_send_auth_error( dcb, @@ -794,12 +767,17 @@ int gw_read_client_event(DCB* dcb) { } /** Route COM_QUIT to backend */ - if (mysql_command == '\x01') { + if (mysql_command == '\x01') + { #if defined(ERRHANDLE) /** - * Close router session and that closes - * backends. - * Closing backends includes sending COM_QUIT packets. + * Sends COM_QUIT packets since buffer is already + * created. A BREF_CLOSED flag is set so dcb_close won't + * send redundant COM_QUIT. + */ + SESSION_ROUTE_QUERY(session, read_buffer); + /** + * Close router session which causes closing of backends. */ dcb_close(dcb); #else diff --git a/server/modules/routing/readwritesplit/test/rwsplit.sh b/server/modules/routing/readwritesplit/test/rwsplit.sh index 6c5268d46..36199e384 100755 --- a/server/modules/routing/readwritesplit/test/rwsplit.sh +++ b/server/modules/routing/readwritesplit/test/rwsplit.sh @@ -191,3 +191,42 @@ else echo "$TINPUT PASSED">>$TLOG ; fi + +TINPUT=test_sescmd.sql +TRETVAL=2 +a=`$RUNCMD < ./$TINPUT` +if [ "$a" != "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi +a=`$RUNCMD < ./$TINPUT` +if [ "$a" != "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi +a=`$RUNCMD < ./$TINPUT` +if [ "$a" != "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi +a=`$RUNCMD < ./$TINPUT` +if [ "$a" != "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi +a=`$RUNCMD < ./$TINPUT` +if [ "$a" != "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi +a=`$RUNCMD < ./$TINPUT` +if [ "$a" != "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when $TRETVAL was expected">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi diff --git a/server/modules/routing/readwritesplit/test/test_sescmd.sql b/server/modules/routing/readwritesplit/test/test_sescmd.sql new file mode 100644 index 000000000..7bc6edcf9 --- /dev/null +++ b/server/modules/routing/readwritesplit/test/test_sescmd.sql @@ -0,0 +1,4 @@ +use test; +set autocommit=1; +use mysql; +select count(*) from user where user='maxuser'