From 74aa3638f931da2e5bef7cecaf10dd36072611d4 Mon Sep 17 00:00:00 2001 From: vraatikka Date: Fri, 20 Sep 2013 14:32:28 +0300 Subject: [PATCH] dcb.c dcb_final_free: Router session is not closed anymore when dcb is closed. Router session is shared among all dcbs and is closed and freed with session. dcb_connect: dcb's state must be switched fro DCB_STATE_ALLOC to DCB_STATE_DISCONNECTED before dcb_final_free can be called for it. dcb_close: poll_remove_dcb encapsulates dcb's state transition in the same way as poll_add_dcb. Removed state modification from dcb_close. Read return value of poll_remove_dcb and log accordingly. dcb_set state: remove dassert if dcb_set_state_nomutex returned false. False can be returned indicating that intented change didn't occur but the end state, for example, may be acceptable. Failures in state transitions are asserted in dcb_Set_state_nomutex. poll.c poll_add_dcb: dcb state is now set here to either DCB_STATE_LISTENING or to DCB_STATE_POLLING according to dcb's role. Failures in state setting and in epoll_ctl are detected, logged and handled. poll_remove_dcb: Failures in state setting and epoll_ctl are detected, logged, and handled. mysql_client_server_protocol.h Removed macros MYSQL_FAILED_AUTHENTICATION & MYSQL_SUCCESFUL_AUTHENTICATION as they were not necessary and used with constant values 0 and 1 depending on the case. Renamed variable 'conn' to 'protocol' in cases where it meant protocol. mysql_backend.c gw_read_backend_event: In case when there was nothing to read or read failed, backend dcb is closed because situation is assumed to be such that backend server closed its side of the socket. Removed macros MYSQL_FAILED/SUCCESFUL_AUTHENTICATION gw_create_backend_connection: Assigned protocol with fd which is connected to backend. backend_write_delayqueue: In case where dcb_write fails to write anything, close backend dcb to avoid it getting hanging. mysql_client.c gw_read_client_event: Read return value of routeQuery and if it isn't == 1, call mysql_send_custom_error with client dcb and set client's protocol state to MYSQL_IDLE instead of MYSQL_ROUTING. gw_MySQLAccept: Static reply counter was erroneously used as a criteria for jumping to return point where return value was constantly 'success' (=0). Replaced static reply counter with private. Fixed return value in different cases. mysql_common.c gw_receive_backend_auth: Changed to return boolean values indicating of success or failue. Used integers which were assigned to macroed values on the caller side. Added length check before accessing buffer. readconnroute.c Cut too long lines and removed statements with side effects. skygw_debug.h Added macro STRPROTOCOLSTATE(s) to produce string representation of a given protocol state. --- server/core/dcb.c | 113 +++---- server/core/poll.c | 170 ++++++---- .../include/mysql_client_server_protocol.h | 21 +- server/modules/protocol/mysql_backend.c | 302 ++++++++++-------- server/modules/protocol/mysql_client.c | 152 +++++++-- server/modules/protocol/mysql_common.c | 47 +-- server/modules/routing/readconnroute.c | 17 +- utils/skygw_debug.h | 11 + 8 files changed, 530 insertions(+), 303 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index d7262fd07..6eeb0b8ff 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -146,7 +146,8 @@ dcb_add_to_zombieslist(DCB *dcb) CHK_DCB(dcb); if (dcb->state != DCB_STATE_NOPOLLING) { - ss_dassert(dcb->state != DCB_STATE_POLLING); + ss_dassert(dcb->state != DCB_STATE_POLLING && + dcb->state != DCB_STATE_LISTENING); return; } /** @@ -236,35 +237,7 @@ void* rsession = NULL; } spinlock_release(&dcbspin); - /** - * Terminate router session. - */ if (dcb->session) { - service = dcb->session->service; - - if (service != NULL && - service->router != NULL && - dcb->session->router_session != NULL) - { - /** - * Protect call of closeSession. - */ - spinlock_acquire(&dcb->session->ses_lock); - rsession = dcb->session->router_session; - spinlock_release(&dcb->session->ses_lock); - if (rsession != NULL) { - service->router->closeSession( - service->router_instance, - rsession); - } else { - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [dcb_final_free] rsession was NULL in " - "dcb_close.", - pthread_self()); - } - } - /** * Terminate client session. */ @@ -283,7 +256,7 @@ void* rsession = NULL; } } - if (dcb->protocol) + if (dcb->protocol != NULL) free(dcb->protocol); if (dcb->data) free(dcb->data); @@ -379,7 +352,7 @@ bool succp = false; spinlock_release(&zombiespin); dcb = dcb_list; - + /** Close, and set DISCONNECTED victims */ while (dcb != NULL) { DCB* dcb_next = NULL; int rc = 0; @@ -447,6 +420,7 @@ int fd; } if ((funcs = (GWPROTOCOL *)load_module(protocol, MODULE_PROTOCOL)) == NULL) { + dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_final_free(dcb); skygw_log_write( LOGFILE_ERROR, @@ -553,17 +527,19 @@ int eno = 0; errno = 0; skygw_log_write( LOGFILE_ERROR, - "%lu [dcb_read] ioctl FIONREAD for fd %d failed. " + "%lu [dcb_read] ioctl FIONREAD for dcb %p fd %d " + "failed. " "errno %d, %s. dcb->state = %d", pthread_self(), + dcb, dcb->fd, - eno , + eno, strerror(eno), dcb->state); n = -1; goto return_n; } - + /** Nothing to read - leave */ if (b == 0) { goto return_n; } @@ -575,7 +551,18 @@ int eno = 0; * This is a fatal error which should cause shutdown. * vraa : todo shutdown if memory allocation fails. */ + skygw_log_write( + LOGFILE_ERROR, + "%lu [dcb_read] Failed to allocate read buffer " + "for dcb %p fd %d, due %d, %d.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)); + n = -1; + ss_dassert(buffer != NULL); goto return_n; } GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); @@ -583,15 +570,28 @@ int eno = 0; if (n <= 0) { + int eno = errno; + errno = 0; + + skygw_log_write( + LOGFILE_ERROR, + "%lu [dcb_read] Read failed, dcb %p fd %d, due " + "%d, %d.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)); gwbuf_free(buffer); goto return_n; } skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_read] Read %d bytes from fd %d", + "%lu [dcb_read] Read %d bytes from dcb %p fd %d", pthread_self(), n, + dcb, dcb->fd); /** Append read data to the gwbuf */ *head = gwbuf_append(*head, buffer); @@ -613,6 +613,7 @@ dcb_write(DCB *dcb, GWBUF *queue) int w, saved_errno = 0; spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) { /* @@ -696,9 +697,10 @@ int w, saved_errno = 0; } skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_write] Wrote %d Bytes to fd %d", + "%lu [dcb_write] Wrote %d Bytes to dcb %p fd %d", pthread_self(), w, + dcb, dcb->fd); } /* Buffer the balance of any data */ @@ -805,29 +807,32 @@ int saved_errno = 0; void dcb_close(DCB *dcb) { - dcb_state_t prev_state; - bool succp; - + bool succp; + int rc; CHK_DCB(dcb); /** - * Only the first call to dcb_close removes dcb from poll set. + * Stop dcb's listening and modify state accordingly. */ - spinlock_acquire(&dcb->dcb_initlock); - succp = dcb_set_state_nomutex(dcb, DCB_STATE_NOPOLLING, &prev_state); - - if (succp) { - poll_remove_dcb(dcb); - /* Set the bitmask of running polling threads */ - bitmask_copy(&dcb->memdata.bitmask, poll_bitmask()); + rc = poll_remove_dcb(dcb); + + if (rc == 0) { + skygw_log_write( + LOGFILE_TRACE, + "%lu [dcb_close] Removed dcb %p in state %s from " + "poll set.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); } else { - ss_info_dassert(!dcb_isclient(dcb) || - prev_state == DCB_STATE_NOPOLLING || - prev_state == DCB_STATE_ZOMBIE, - "Invalid state transition."); + skygw_log_write( + LOGFILE_TRACE, + "%lu [poll_remove_dcb] Removing dcb %p in state %s from " + "poll set failed.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); } - spinlock_release(&dcb->dcb_initlock); - if (dcb->state == DCB_STATE_NOPOLLING) { dcb_add_to_zombieslist(dcb); } @@ -1034,7 +1039,7 @@ bool dcb_set_state( CHK_DCB(dcb); spinlock_acquire(&dcb->dcb_initlock); succp = dcb_set_state_nomutex(dcb, new_state, &state); - ss_info_dassert(succp, "Failed to set new state for dcb"); + spinlock_release(&dcb->dcb_initlock); if (old_state != NULL) { diff --git a/server/core/poll.c b/server/core/poll.c index 2a1067252..54308f7df 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -90,29 +90,66 @@ poll_init() int poll_add_dcb(DCB *dcb) { - int rc; + int rc = -1; dcb_state_t old_state = DCB_STATE_UNDEFINED; + dcb_state_t new_state; struct epoll_event ev; + CHK_DCB(dcb); + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; ev.data.ptr = dcb; /** - * Service listeners have different state than - * DCBs serving client requests. + * Choose new state according to the role of dcb. */ - if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) { - dcb_set_state(dcb, DCB_STATE_LISTENING, &old_state); - } else if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) { - dcb_set_state(dcb, DCB_STATE_POLLING, &old_state); - } - - rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); - if (rc != 0) { - dcb_set_state(dcb, old_state, NULL); + if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) { + new_state = DCB_STATE_POLLING; + } else { + ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); + new_state = DCB_STATE_LISTENING; } + /** + * If dcb is in unexpected state, state change fails indicating that dcb + * is not polling anymore. + */ + if (dcb_set_state(dcb, new_state, &old_state)) { + rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); + ss_dassert(rc == 0); + } else { + skygw_log_write( + LOGFILE_ERROR, + "%lu [poll_add_dcb] Unable to set new state for dcb %p " + "in state %s. Adding to poll set failed.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); + goto return_rc; + } + + if (rc != 0) { + int eno = errno; + errno = 0; + skygw_log_write( + LOGFILE_ERROR, + "%lu [poll_add_dcb] Adding dcb %p to poll set failed. " + "epoll_ctl failed due %d, %s.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + eno, + strerror(eno)); + goto return_rc; + } else { + skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_add_dcb] Added dcb %p to " + "poll set.", + pthread_self(), + dcb); + } +return_rc: return rc; - } /** @@ -125,31 +162,60 @@ poll_add_dcb(DCB *dcb) int poll_remove_dcb(DCB *dcb) { - struct epoll_event ev; - int rc; + struct epoll_event ev; + int rc = -1; + dcb_state_t old_state = DCB_STATE_UNDEFINED; + dcb_state_t new_state = DCB_STATE_NOPOLLING; - rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); - if (rc == 0) { - skygw_log_write( - LOGFILE_TRACE, - "%lu [poll_remove_dcb] Removed dcb %p in state %s from " - "poll set.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state)); - } else { - int eno = errno; - errno = 0; - skygw_log_write( - LOGFILE_TRACE, - "%lu [poll_remove_dcb] Removing dcb %p in state %s from " - "poll set failed due %d %s.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - eno, - strerror(eno)); + CHK_DCB(dcb); + + /** + * Set state to NOPOLLING and remove dcb from poll set. + */ + if (dcb_set_state(dcb, new_state, &old_state)) { + rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); + + if (rc != 0) { + int eno = errno; + errno = 0; + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [poll_remove_dcb] epoll_ctl failed due %d, %s.", + pthread_self(), + eno, + strerror(eno)); + } + ss_dassert(rc == 0); } + /** + * This call was redundant, but the end result is correct. + */ + else if (old_state == new_state) + { + rc = 0; + goto return_rc; + } + /** + * State transition failed. This may be due some more serious error + * in how dcb is handled. + */ + else + { + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [poll_remove_dcb] Unable to set state %s for dcb %p " + "in state %s. Removing from poll set failed.", + pthread_self(), + STRDCBSTATE(new_state), + STRDCBSTATE(old_state)); + ss_dassert(false); + goto return_rc; + } + + /** Set bit for each maxscale thread */ + bitmask_copy(&dcb->memdata.bitmask, poll_bitmask()); + rc = 0; +return_rc: return rc; } @@ -198,7 +264,7 @@ poll_waitevents(void *arg) } #else if (!no_op) { - skygw_log_write(LOGFILE_TRACE, + skygw_log_write(LOGFILE_DEBUG, "%lu [poll_waitevents] MaxScale thread %d > " "epoll_wait <", pthread_self(), @@ -211,7 +277,7 @@ poll_waitevents(void *arg) { int eno = errno; errno = 0; - skygw_log_write(LOGFILE_TRACE, + skygw_log_write(LOGFILE_DEBUG, "%lu [poll_waitevents] epoll_wait returned " "%d, errno %d", pthread_self(), @@ -232,7 +298,7 @@ poll_waitevents(void *arg) if (nfds > 0) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu [poll_waitevents] epoll_wait found %d fds", pthread_self(), nfds); @@ -248,7 +314,7 @@ poll_waitevents(void *arg) #if defined(SS_DEBUG) if (dcb_fake_write_ev[dcb->fd] != 0) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "Added fake events %d to ev %d.", pthread_self(), @@ -265,8 +331,8 @@ poll_waitevents(void *arg) ss_dassert(dcb->state != DCB_STATE_FREED); ss_debug(spinlock_release(&dcb->dcb_initlock);) - skygw_log_write( - LOGFILE_TRACE, + skygw_log_write_flush( + LOGFILE_DEBUG, "%lu %d [poll_waitevents] event %d dcb %p", pthread_self(), thread_id, @@ -280,7 +346,7 @@ poll_waitevents(void *arg) if (eno == 0) { eno = dcb_fake_write_errno[dcb->fd]; skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "Added fake errno %d. %s", pthread_self(), @@ -292,7 +358,7 @@ poll_waitevents(void *arg) #endif if (eno != 0) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "EPOLLERR due %d, %s.", pthread_self(), @@ -321,13 +387,6 @@ poll_waitevents(void *arg) ss_info_dassert(!dcb->dcb_write_active, "Write already active"); dcb->dcb_write_active = TRUE; - skygw_log_write( - LOGFILE_TRACE, - "%lu %d [poll_waitevents] " - "Write in fd %d", - pthread_self(), - thread_id, - dcb->fd); atomic_add(&pollStats.n_write, 1); dcb->func.write_ready(dcb); dcb->dcb_write_active = FALSE; @@ -344,7 +403,7 @@ poll_waitevents(void *arg) if (dcb->state == DCB_STATE_LISTENING) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "Accept in fd %d", pthread_self(), @@ -352,15 +411,16 @@ poll_waitevents(void *arg) dcb->fd); atomic_add(&pollStats.n_accept, 1); dcb->func.accept(dcb); - } + } else { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " - "Read in fd %d", + "Read in dcb %p fd %d", pthread_self(), thread_id, + dcb, dcb->fd); atomic_add(&pollStats.n_read, 1); dcb->func.read(dcb); diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 3eb96f7e4..c82444349 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -132,9 +132,6 @@ typedef struct mysql_session { } MYSQL_session; -/* MySQL states for authentication reply */ -#define MYSQL_FAILED_AUTHENTICATION 1 -#define MYSQL_SUCCESFUL_AUTHENTICATION 0 /* Protocol packing macros. */ #define gw_mysql_set_byte2(__buffer, __int) do { \ @@ -228,9 +225,9 @@ typedef enum /* Basic mysql commands */ #define MYSQL_COM_CHANGE_USER 0x11 -#define MYSQL_COM_QUIT 0x1 -#define MYSQL_COM_INIT_DB 0x2 -#define MYSQL_COM_QUERY 0x3 +#define MYSQL_COM_QUIT 0x1 +#define MYSQL_COM_INIT_DB 0x2 +#define MYSQL_COM_QUERY 0x3 #define MYSQL_GET_COMMAND(payload) (payload[4]) #define MYSQL_GET_PACKET_NO(payload) (payload[3]) @@ -242,14 +239,14 @@ void gw_mysql_close(MySQLProtocol **ptr); MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd); MySQLProtocol *gw_mysql_init(MySQLProtocol *data); void gw_mysql_close(MySQLProtocol **ptr); -int gw_receive_backend_auth(MySQLProtocol *conn); -int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload); -int gw_read_backend_handshake(MySQLProtocol *conn); -int gw_send_authentication_to_backend( +bool gw_receive_backend_auth(MySQLProtocol *protocol); +int gw_decode_mysql_server_handshake(MySQLProtocol *protocol, uint8_t *payload); +int gw_read_backend_handshake(MySQLProtocol *protocol); +int gw_send_authentication_to_backend( char *dbname, char *user, uint8_t *passwd, - MySQLProtocol *conn); + MySQLProtocol *protocol); const char *gw_mysql_protocol_state2string(int state); int gw_do_connect_to_backend(char *host, int port, int* fd); int mysql_send_custom_error ( @@ -261,7 +258,7 @@ int gw_send_change_user_to_backend( char *dbname, char *user, uint8_t *passwd, - MySQLProtocol *conn); + MySQLProtocol *protocol); int gw_find_mysql_user_password_sha1( char *username, uint8_t *gateway_password, diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 7e62539b8..a07d216b8 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -156,11 +156,16 @@ static int gw_read_backend_event(DCB *dcb) { /** return only with complete session */ current_session = gw_get_shared_session_auth_info(dcb); ss_dassert(current_session != NULL); - - /* fprintf(stderr, ">>> backend EPOLLIN from %i, command %i,protocol - * state [%s]\n", dcb->fd, dcb->command, gw_mysql_protocol_state2string - * (backend_protocol->state)); - */ + + skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] Read dcb %p fd %d protocol " + "state %d, %s.", + pthread_self(), + dcb, + dcb->fd, + backend_protocol->state, + STRPROTOCOLSTATE(backend_protocol->state)); /* backend is connected: * @@ -176,30 +181,33 @@ static int gw_read_backend_event(DCB *dcb) { } else { // handshake decoded, send the auth credentials if (gw_send_authentication_to_backend( - current_session->db, - current_session->user, - current_session->client_sha1, - backend_protocol) != 0) + current_session->db, + current_session->user, + current_session->client_sha1, + backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; rc = 1; } else { - // next step is waiting server response with a new EPOLLIN event + /** + * next step is to wait server's response with + * a new EPOLLIN event + */ backend_protocol->state = MYSQL_AUTH_RECV; rc = 0; goto return_rc; } } } - /* * Now: * -- check the authentication reply from backend * OR * -- handle a previous handshake error */ - - if (backend_protocol->state == MYSQL_AUTH_RECV || backend_protocol->state == MYSQL_AUTH_FAILED) { + if (backend_protocol->state == MYSQL_AUTH_RECV || + backend_protocol->state == MYSQL_AUTH_FAILED) + { ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; @@ -211,98 +219,88 @@ static int gw_read_backend_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; - /* if a MYSQL_AUTH_FAILED is detected, don't read from backend and set rv */ - if (backend_protocol->state == MYSQL_AUTH_FAILED) { - rv = MYSQL_FAILED_AUTHENTICATION; + if (backend_protocol->state == MYSQL_AUTH_RECV) { + /** + * Read backed auth reply + */ + if (!gw_receive_backend_auth(backend_protocol)) { + rv = -1; + backend_protocol->state = MYSQL_AUTH_FAILED; + } + } + + if (backend_protocol->state == MYSQL_AUTH_FAILED) { + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_read_backend_event] " + "gw_receive_backend_auth failed. Fd %d, " + "user %s.", + pthread_self(), + dcb->fd, + current_session->user); + + + /* check the delayq before the reply */ + if (dcb->delayq) { + /* send an error to the client */ + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Connection to backend lost right now"); + } + /** + * Protect call of closeSession. + */ + spinlock_acquire(&session->ses_lock); + rsession = session->router_session; + session->router_session = NULL; + spinlock_release(&session->ses_lock); + + if (rsession != NULL) { + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] " + "Call closeSession for backend's " + "router client session.", + pthread_self()); + /* close router_session */ + router->closeSession(router_instance, rsession); + } else { + skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] " + "closeSession already called " + "for backend session.", + pthread_self()); + } + rc = 1; + goto return_rc; } else { - /* yes, do read backed auth reply */ - rv = gw_receive_backend_auth(backend_protocol); - } + ss_dassert(backend_protocol->state == MYSQL_AUTH_RECV); + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] " + "gw_receive_backend_auth succeed. Fd %d, " + "user %s.", + pthread_self(), + dcb->fd, + current_session->user); - switch (rv) { - case MYSQL_FAILED_AUTHENTICATION: - skygw_log_write_flush( - LOGFILE_ERROR, - "%lu [gw_read_backend_event] caught " - "MYSQL_FAILED_AUTHENTICATION from " - "gw_receive_backend_auth. Fd %d, " - "user %s.", - pthread_self(), - dcb->fd, - current_session->user); - - backend_protocol->state = MYSQL_AUTH_FAILED; - - /* check the delayq before the reply */ - if (dcb->delayq) { - /* send an error to the client */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Connection to backend lost right now"); - } - - /** - * Protect call of closeSession. - */ - spinlock_acquire(&session->ses_lock); - rsession = session->router_session; - session->router_session = NULL; - spinlock_release(&session->ses_lock); - - if (rsession != NULL) { - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()); - /* close the active session */ - router->closeSession(router_instance, - rsession); - } else { - skygw_log_write( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] " - "closeSession already called " - "for backend session.", - pthread_self()); - } + spinlock_acquire(&dcb->authlock); + backend_protocol->state = MYSQL_IDLE; + /* check the delay queue and flush the data */ + if(dcb->delayq) { + backend_write_delayqueue(dcb); + spinlock_release(&dcb->authlock); rc = 1; - goto return_rc; - - case MYSQL_SUCCESFUL_AUTHENTICATION: - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] caught " - "MYSQL_SUCCESFUL_AUTHENTICATION from " - "gw_receive_backend_auth. Fd %d, " - "user %s.", - pthread_self(), - dcb->fd, - current_session->user); - - spinlock_acquire(&dcb->authlock); - backend_protocol->state = MYSQL_IDLE; - /* check the delay queue and flush the data */ - if(dcb->delayq) { - backend_write_delayqueue(dcb); - spinlock_release(&dcb->authlock); - rc = 1; - goto return_rc; - } - spinlock_release(&dcb->authlock); - rc = 0; - goto return_rc; - - default: - /* no other authentication state here right - * now, so just return */ - rc = 0; - goto return_rc; - } /**< switch (rv) */ - } /**< if (backend_protocol->state == MYSQL_AUTH_RECV) */ + goto return_rc; + } + spinlock_release(&dcb->authlock); + rc = 0; + goto return_rc; + } /* MYSQL_AUTH_FAILED */ + } /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED */ /* reading MySQL command output from backend and writing to the client */ { @@ -317,10 +315,16 @@ static int gw_read_backend_event(DCB *dcb) { rc = dcb_read(dcb, &head); if (rc <= 0) { - rc = 1; + /** + * Backend generated EPOLLIN event and if there is + * nothing to read or backend failed, connection + * must be closed to avoid backend dcb from getting + * hanged. + */ + (dcb->func).close(dcb); + rc = 0; goto return_rc; } - router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; @@ -364,9 +368,10 @@ return_rc: * EPOLLOUT handler for the MySQL Backend protocol module. * * @param dcb The descriptor control block - * @return The number of bytes written + * @return 1 in success, 0 in case of failure, */ static int gw_write_backend_event(DCB *dcb) { + int rc; MySQLProtocol *backend_protocol = dcb->protocol; //fprintf(stderr, ">>> backend EPOLLOUT %i, protocol state [%s]\n", backend_protocol->fd, gw_mysql_protocol_state2string(backend_protocol->state)); @@ -383,9 +388,8 @@ static int gw_write_backend_event(DCB *dcb) { 1, 0, "Writing to backend failed"); - - return 0; - + rc = 0; + goto return_rc; } /** * vraa: what is the logic in this? @@ -393,11 +397,23 @@ static int gw_write_backend_event(DCB *dcb) { if (backend_protocol->state == MYSQL_PENDING_CONNECT) { backend_protocol->state = MYSQL_CONNECTED; // spinlock_release(&dcb->connectlock); - return 1; + rc = 1; + goto return_rc; } // spinlock_release(&dcb->connectlock); dcb_drain_writeq(dcb); - return 1; + rc = 1; +return_rc: + skygw_log_write( + LOGFILE_TRACE, + "%lu [gw_write_backend_event] " + "wrote to dcb %p fd %d, return %d", + pthread_self(), + dcb, + dcb->fd, + rc); + + return rc; } /* @@ -417,21 +433,27 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * Don't write to backend if backend_dcb is not in poll set anymore. */ if (dcb->state != DCB_STATE_POLLING) { - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Writing to backend failed"); - + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_MySQLWrite_backend] Write to backend failed. " + "Backend dcb is %s.", + pthread_self(), + STRDCBSTATE(dcb->state)); return 0; - } spinlock_acquire(&dcb->authlock); - /** * Now put the incoming data to the delay queue unless backend is connected with auth ok */ if (backend_protocol->state != MYSQL_IDLE) { + skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_MySQLWrite_backend] dcb %p fd %d protocol state %s.", + pthread_self(), + dcb, + dcb->fd, + STRPROTOCOLSTATE(backend_protocol->state)); + //fprintf(stderr, ">>> Writing in the backend %i delay queue: last dcb command %i, queue command %i, protocol state [%s]\n", dcb->fd, dcb->command, queue->command, gw_mysql_protocol_state2string(dcb->state)); backend_set_delayqueue(dcb, queue); @@ -454,7 +476,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * */ static int gw_error_backend_event(DCB *dcb) { - SESSION *session = dcb->session; + SESSION *session; void *rsession; ROUTER_OBJECT *router; ROUTER *router_instance; @@ -466,7 +488,7 @@ static int gw_error_backend_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; - + if (dcb->state != DCB_STATE_POLLING) { /** * if client is not available it needs to be handled in send @@ -487,29 +509,28 @@ static int gw_error_backend_event(DCB *dcb) { "Closed backend connection."); rc = 1; } - skygw_log_write_flush( LOGFILE_ERROR, "%lu [gw_error_backend_event] Some error occurred in backend. rc = %d", pthread_self(), rc); - + /* close the active session */ spinlock_acquire(&session->ses_lock); rsession = session->router_session; session->router_session = NULL; spinlock_release(&session->ses_lock); - + if (rsession != NULL) { - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] " - "Call closeSession for backend " - "session.", + skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [gw_read_backend_event] " + "Call closeSession for backend " + "session.", pthread_self()); - + router->closeSession(router_instance, rsession); } - + return rc; } @@ -557,6 +578,7 @@ static int gw_create_backend_connection( switch (rv) { case 0: ss_dassert(fd > 0); + protocol->fd = fd; protocol->state = MYSQL_CONNECTED; skygw_log_write( LOGFILE_TRACE, @@ -573,6 +595,7 @@ static int gw_create_backend_connection( case 1: ss_dassert(fd > 0); protocol->state = MYSQL_PENDING_CONNECT; + protocol->fd = fd; skygw_log_write( LOGFILE_TRACE, "%lu [gw_create_backend_connection] Connection " @@ -679,9 +702,24 @@ static int backend_write_delayqueue(DCB *dcb) spinlock_release(&dcb->delayqlock); rc = dcb_write(dcb, localq); - return rc; -} + if (rc == 0) { + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [backend_write_delayqueue] Some error occurred in " + "backend.", + pthread_self()); + + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Closed backend connection."); + dcb_close(dcb); + } + return rc; +} + static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWBUF *queue) { diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 884d3fca3..75811e68c 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -576,7 +576,7 @@ int gw_read_client_event(DCB* dcb) { //write to client mysql AUTH_OK packet, packet n. is 2 // start a new session, and connect to backends session = session_alloc(dcb->service, dcb); - + if (session != NULL) { CHK_SESSION(session); ss_dassert(session->state != SESSION_STATE_ALLOC); @@ -667,6 +667,14 @@ int gw_read_client_event(DCB* dcb) { * fprintf(stderr, "COM_QUIT received with * no connected backends from %i\n", dcb->fd); */ + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] Client read " + "COM_QUIT and rsession == NULL. Closing " + "client dcb %p.", + pthread_self(), + dcb); + (dcb->func).close(dcb); } else { /* Send a custom error as MySQL command reply */ @@ -683,6 +691,12 @@ int gw_read_client_event(DCB* dcb) { /* We can route the query */ /* COM_QUIT handling */ if (mysql_command == '\x01') { + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] Before routeQuery. " + "dcb %p.", + pthread_self(), + dcb); /** * fprintf(stderr, "COM_QUIT received from %i and * passed to backed\n", dcb->fd); @@ -690,6 +704,13 @@ int gw_read_client_event(DCB* dcb) { * fprintf(stderr, "<<< Routing the COM_QUIT ...\n"); */ router->routeQuery(router_instance, rsession, queue); + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] After routeQuery. " + "dcb %p.", + pthread_self(), + dcb); + /* close client connection */ (dcb->func).close(dcb); rc = 1; @@ -701,10 +722,18 @@ int gw_read_client_event(DCB* dcb) { /* writing in the backend buffer queue, via routeQuery */ //fprintf(stderr, "<<< Routing the Query ...\n"); - router->routeQuery(router_instance, - rsession, - queue); - protocol->state = MYSQL_WAITING_RESULT; + rc = router->routeQuery(router_instance, rsession, queue); + + if (rc == 1) { + protocol->state = MYSQL_WAITING_RESULT; + } else { + mysql_send_custom_error(dcb, + 1, + 0, + "Connection to backend lost."); + protocol->state = MYSQL_IDLE; + goto return_rc; + } } break; @@ -715,6 +744,14 @@ int gw_read_client_event(DCB* dcb) { rc = 0; return_rc: +#if defined(SS_DEBUG) + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif return rc; } @@ -766,6 +803,14 @@ int gw_write_client_event(DCB *dcb) } return_1: +#if defined(SS_DEBUG) + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif return 1; } @@ -867,20 +912,36 @@ int gw_MySQLListener( } +/** + * @node (write brief function description here) + * + * Parameters: + * @param listener - + * + * + * @return 0 in success, 1 in failure + * + * + * @details (write detailed description here) + * + */ int gw_MySQLAccept(DCB *listener) -{ +{ + int rc = 0; + DCB *client_dcb; + MySQLProtocol *protocol; + int c_sock; + struct sockaddr_in local; + socklen_t addrlen = sizeof(struct sockaddr_in); + int sendbuf = GW_BACKEND_SO_SNDBUF; + socklen_t optlen = sizeof(sendbuf); + int eno = 0; + int i = 0; + + CHK_DCB(listener); fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); while (1) { - int c_sock; - struct sockaddr_in local; - socklen_t addrlen = sizeof(struct sockaddr_in); - DCB *client_dcb; - MySQLProtocol *protocol; - int sendbuf = GW_BACKEND_SO_SNDBUF; - socklen_t optlen = sizeof(sendbuf); - int eno = 0; - static int i; retry_accept: // new connection from client @@ -895,6 +956,7 @@ int gw_MySQLAccept(DCB *listener) if (eno == EAGAIN || eno == EWOULDBLOCK) { + rc = 1; /* We have processed all incoming connections. */ break; } @@ -915,7 +977,8 @@ int gw_MySQLAccept(DCB *listener) if (i<10) { goto retry_accept; } - goto return_to_poll; + rc = 1; + goto return_rc; } else if (eno == EMFILE) { @@ -934,7 +997,8 @@ int gw_MySQLAccept(DCB *listener) if (i<10) { goto retry_accept; } - goto return_to_poll; + rc = 1; + goto return_rc; } else { @@ -982,12 +1046,16 @@ int gw_MySQLAccept(DCB *listener) ss_dassert(protocol != NULL); if (protocol == NULL) { + /** delete client_dcb */ + dcb_close(client_dcb); + skygw_log_write_flush( LOGFILE_ERROR, "%lu [gw_MySQLAccept] Failed to create " "protocol object for client connection.", pthread_self()); - return 1; + rc = 1; + goto return_rc; } client_dcb->protocol = protocol; // assign function poiters to "func" field @@ -1005,6 +1073,9 @@ int gw_MySQLAccept(DCB *listener) */ if (poll_add_dcb(client_dcb) == -1) { + /** delete client_dcb */ + dcb_close(client_dcb); + /** Previous state is recovered in poll_add_dcb. */ skygw_log_write_flush( LOGFILE_ERROR, @@ -1013,7 +1084,8 @@ int gw_MySQLAccept(DCB *listener) pthread_self(), client_dcb, client_dcb->fd); - return 1; + rc = 1; + goto return_rc; } else { @@ -1026,8 +1098,15 @@ int gw_MySQLAccept(DCB *listener) client_dcb->fd); } } /**< while 1 */ -return_to_poll: - return 0; +#if defined(SS_DEBUG) + if (rc == 0) { + CHK_DCB(client_dcb); + protocol = (MySQLProtocol *)client_dcb->protocol; + CHK_PROTOCOL(protocol); + } +#endif +return_rc: + return rc; } /* @@ -1036,16 +1115,34 @@ static int gw_error_client_event(DCB *dcb) { /** * should this be removed if we don't want to execute it ? * - //fprintf(stderr, "#### Handle error function gw_error_client_event, for [%i] is [%s]\n", dcb->fd, gw_dcb_state2string(dcb->state)); + //fprintf(stderr, "#### Handle error function gw_error_client_event, for [%i] is [%s]\n", dcb->fd, gw_state2string(dcb->state)); //dcb_close(dcb); */ - +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif return 1; } static int gw_client_close(DCB *dcb) { +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif + dcb_close(dcb); return 1; } @@ -1061,6 +1158,15 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif dcb_close(dcb); return 1; } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index c7f38c1f3..adb72381b 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -290,34 +290,41 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { return 0; } + /** * Receive the MySQL authentication packet from backend, packet # is 2 * * @param conn The MySQL protocol structure - * @return 0 for user authenticated or 1 for authentication failed + * @return true if authentication succeed, false otherwise */ -int gw_receive_backend_auth(MySQLProtocol *conn) { - int rv = 1; +bool gw_receive_backend_auth( + MySQLProtocol *protocol) +{ int n = -1; - GWBUF *head = NULL; - DCB *dcb = conn->owner_dcb; + GWBUF *head = NULL; + DCB *dcb = protocol->owner_dcb; uint8_t *ptr = NULL; + bool succp = false; - if ((n = dcb_read(dcb, &head)) != -1) { - if (head) { - ptr = GWBUF_DATA(head); - // check if the auth is SUCCESFUL - if (ptr[4] == '\x00') { - // Auth is OK - rv = 0; - } else { - rv = 1; - } - // consume all the data here - head = gwbuf_consume(head, gwbuf_length(head)); - } - } - return rv; + n = dcb_read(dcb, &head); + + if (n != -1 && + head != NULL && + GWBUF_LENGTH(head) >= 5) + { + ptr = GWBUF_DATA(head); + /** + * 5th byte is 0x0 if successful. + */ + if (ptr[4] == '\x00') { + succp = true; + } + /** + * Remove data from buffer. + */ + head = gwbuf_consume(head, gwbuf_length(head)); + } + return succp; } /** diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 8cfb165fa..21b09dfb1 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -263,10 +263,10 @@ int i, n; static void * newSession(ROUTER *instance, SESSION *session) { -ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; -ROUTER_CLIENT_SES *client_ses; -BACKEND *candidate = NULL; -int i; +ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; +ROUTER_CLIENT_SES *client_ses; +BACKEND *candidate = NULL; +int i; skygw_log_write_flush( LOGFILE_TRACE, @@ -277,8 +277,10 @@ int i; inst); - if ((client_ses = (ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES))) == NULL) { - return NULL; + client_ses = (ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES)); + + if (client_ses == NULL) { + return NULL; } /* * Find a backend server to connect to. This is the extent of the @@ -494,7 +496,7 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES *session = (ROUTER_CLIENT_SES *)router_session; uint8_t *payload = GWBUF_DATA(queue); -int mysql_command = -1; +int mysql_command; int rc; inst->stats.n_queries++; @@ -515,6 +517,7 @@ int rc; session->backend_dcb, queue); } + CHK_PROTOCOL(((MySQLProtocol*)session->backend_dcb->protocol)); skygw_log_write( LOGFILE_DEBUG, "%lu [readconnroute:routeQuery] Routed command %d to dcb %p " diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 9d8ed6909..0dff06c3a 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -158,6 +158,17 @@ typedef enum skygw_chk_t { ((s) == SESSION_STATE_LISTENER_STOPPED ? "SESSION_STATE_LISTENER_STOPPED" : \ "SESSION_STATE_UNKNOWN")))) +#define STRPROTOCOLSTATE(s) ((s) == MYSQL_ALLOC ? "MYSQL_ALLOC" : \ + ((s) == MYSQL_PENDING_CONNECT ? "MYSQL_PENDING_CONNECT" : \ + ((s) == MYSQL_CONNECTED ? "MYSQL_CONNECTED" : \ + ((s) == MYSQL_AUTH_SENT ? "MYSQL_AUTH_SENT" : \ + ((s) == MYSQL_AUTH_RECV ? "MYSQL_AUTH_RECV" : \ + ((s) == MYSQL_AUTH_FAILED ? "MYSQL_AUTH_FAILED" : \ + ((s) == MYSQL_IDLE ? "MYSQL_IDLE" : \ + ((s) == MYSQL_ROUTING ? "MYSQL_ROUTING" : \ + ((s) == MYSQL_WAITING_RESULT ? "MYSQL_WAITING_RESULT" : \ + ((s) == MYSQL_SESSION_CHANGE ? "MYSQL_SESSION_CHANGE" : \ + "UNKNOWN MYSQL STATE")))))))))) #define CHK_MLIST(l) { \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \