diff --git a/server/core/dcb.c b/server/core/dcb.c index d7262fd07..6eeb0b8ff 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -146,7 +146,8 @@ dcb_add_to_zombieslist(DCB *dcb) CHK_DCB(dcb); if (dcb->state != DCB_STATE_NOPOLLING) { - ss_dassert(dcb->state != DCB_STATE_POLLING); + ss_dassert(dcb->state != DCB_STATE_POLLING && + dcb->state != DCB_STATE_LISTENING); return; } /** @@ -236,35 +237,7 @@ void* rsession = NULL; } spinlock_release(&dcbspin); - /** - * Terminate router session. - */ if (dcb->session) { - service = dcb->session->service; - - if (service != NULL && - service->router != NULL && - dcb->session->router_session != NULL) - { - /** - * Protect call of closeSession. - */ - spinlock_acquire(&dcb->session->ses_lock); - rsession = dcb->session->router_session; - spinlock_release(&dcb->session->ses_lock); - if (rsession != NULL) { - service->router->closeSession( - service->router_instance, - rsession); - } else { - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [dcb_final_free] rsession was NULL in " - "dcb_close.", - pthread_self()); - } - } - /** * Terminate client session. */ @@ -283,7 +256,7 @@ void* rsession = NULL; } } - if (dcb->protocol) + if (dcb->protocol != NULL) free(dcb->protocol); if (dcb->data) free(dcb->data); @@ -379,7 +352,7 @@ bool succp = false; spinlock_release(&zombiespin); dcb = dcb_list; - + /** Close, and set DISCONNECTED victims */ while (dcb != NULL) { DCB* dcb_next = NULL; int rc = 0; @@ -447,6 +420,7 @@ int fd; } if ((funcs = (GWPROTOCOL *)load_module(protocol, MODULE_PROTOCOL)) == NULL) { + dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_final_free(dcb); skygw_log_write( LOGFILE_ERROR, @@ -553,17 +527,19 @@ int eno = 0; errno = 0; skygw_log_write( LOGFILE_ERROR, - "%lu [dcb_read] ioctl FIONREAD for fd %d failed. " + "%lu [dcb_read] ioctl FIONREAD for dcb %p fd %d " + "failed. " "errno %d, %s. dcb->state = %d", pthread_self(), + dcb, dcb->fd, - eno , + eno, strerror(eno), dcb->state); n = -1; goto return_n; } - + /** Nothing to read - leave */ if (b == 0) { goto return_n; } @@ -575,7 +551,18 @@ int eno = 0; * This is a fatal error which should cause shutdown. * vraa : todo shutdown if memory allocation fails. */ + skygw_log_write( + LOGFILE_ERROR, + "%lu [dcb_read] Failed to allocate read buffer " + "for dcb %p fd %d, due %d, %d.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)); + n = -1; + ss_dassert(buffer != NULL); goto return_n; } GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); @@ -583,15 +570,28 @@ int eno = 0; if (n <= 0) { + int eno = errno; + errno = 0; + + skygw_log_write( + LOGFILE_ERROR, + "%lu [dcb_read] Read failed, dcb %p fd %d, due " + "%d, %d.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)); gwbuf_free(buffer); goto return_n; } skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_read] Read %d bytes from fd %d", + "%lu [dcb_read] Read %d bytes from dcb %p fd %d", pthread_self(), n, + dcb, dcb->fd); /** Append read data to the gwbuf */ *head = gwbuf_append(*head, buffer); @@ -613,6 +613,7 @@ dcb_write(DCB *dcb, GWBUF *queue) int w, saved_errno = 0; spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) { /* @@ -696,9 +697,10 @@ int w, saved_errno = 0; } skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_write] Wrote %d Bytes to fd %d", + "%lu [dcb_write] Wrote %d Bytes to dcb %p fd %d", pthread_self(), w, + dcb, dcb->fd); } /* Buffer the balance of any data */ @@ -805,29 +807,32 @@ int saved_errno = 0; void dcb_close(DCB *dcb) { - dcb_state_t prev_state; - bool succp; - + bool succp; + int rc; CHK_DCB(dcb); /** - * Only the first call to dcb_close removes dcb from poll set. + * Stop dcb's listening and modify state accordingly. */ - spinlock_acquire(&dcb->dcb_initlock); - succp = dcb_set_state_nomutex(dcb, DCB_STATE_NOPOLLING, &prev_state); - - if (succp) { - poll_remove_dcb(dcb); - /* Set the bitmask of running polling threads */ - bitmask_copy(&dcb->memdata.bitmask, poll_bitmask()); + rc = poll_remove_dcb(dcb); + + if (rc == 0) { + skygw_log_write( + LOGFILE_TRACE, + "%lu [dcb_close] Removed dcb %p in state %s from " + "poll set.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); } else { - ss_info_dassert(!dcb_isclient(dcb) || - prev_state == DCB_STATE_NOPOLLING || - prev_state == DCB_STATE_ZOMBIE, - "Invalid state transition."); + skygw_log_write( + LOGFILE_TRACE, + "%lu [poll_remove_dcb] Removing dcb %p in state %s from " + "poll set failed.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); } - spinlock_release(&dcb->dcb_initlock); - if (dcb->state == DCB_STATE_NOPOLLING) { dcb_add_to_zombieslist(dcb); } @@ -1034,7 +1039,7 @@ bool dcb_set_state( CHK_DCB(dcb); spinlock_acquire(&dcb->dcb_initlock); succp = dcb_set_state_nomutex(dcb, new_state, &state); - ss_info_dassert(succp, "Failed to set new state for dcb"); + spinlock_release(&dcb->dcb_initlock); if (old_state != NULL) { diff --git a/server/core/poll.c b/server/core/poll.c index 2a1067252..54308f7df 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -90,29 +90,66 @@ poll_init() int poll_add_dcb(DCB *dcb) { - int rc; + int rc = -1; dcb_state_t old_state = DCB_STATE_UNDEFINED; + dcb_state_t new_state; struct epoll_event ev; + CHK_DCB(dcb); + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; ev.data.ptr = dcb; /** - * Service listeners have different state than - * DCBs serving client requests. + * Choose new state according to the role of dcb. */ - if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) { - dcb_set_state(dcb, DCB_STATE_LISTENING, &old_state); - } else if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) { - dcb_set_state(dcb, DCB_STATE_POLLING, &old_state); - } - - rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); - if (rc != 0) { - dcb_set_state(dcb, old_state, NULL); + if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) { + new_state = DCB_STATE_POLLING; + } else { + ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); + new_state = DCB_STATE_LISTENING; } + /** + * If dcb is in unexpected state, state change fails indicating that dcb + * is not polling anymore. + */ + if (dcb_set_state(dcb, new_state, &old_state)) { + rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); + ss_dassert(rc == 0); + } else { + skygw_log_write( + LOGFILE_ERROR, + "%lu [poll_add_dcb] Unable to set new state for dcb %p " + "in state %s. Adding to poll set failed.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); + goto return_rc; + } + + if (rc != 0) { + int eno = errno; + errno = 0; + skygw_log_write( + LOGFILE_ERROR, + "%lu [poll_add_dcb] Adding dcb %p to poll set failed. " + "epoll_ctl failed due %d, %s.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + eno, + strerror(eno)); + goto return_rc; + } else { + skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_add_dcb] Added dcb %p to " + "poll set.", + pthread_self(), + dcb); + } +return_rc: return rc; - } /** @@ -125,31 +162,60 @@ poll_add_dcb(DCB *dcb) int poll_remove_dcb(DCB *dcb) { - struct epoll_event ev; - int rc; + struct epoll_event ev; + int rc = -1; + dcb_state_t old_state = DCB_STATE_UNDEFINED; + dcb_state_t new_state = DCB_STATE_NOPOLLING; - rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); - if (rc == 0) { - skygw_log_write( - LOGFILE_TRACE, - "%lu [poll_remove_dcb] Removed dcb %p in state %s from " - "poll set.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state)); - } else { - int eno = errno; - errno = 0; - skygw_log_write( - LOGFILE_TRACE, - "%lu [poll_remove_dcb] Removing dcb %p in state %s from " - "poll set failed due %d %s.", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - eno, - strerror(eno)); + CHK_DCB(dcb); + + /** + * Set state to NOPOLLING and remove dcb from poll set. + */ + if (dcb_set_state(dcb, new_state, &old_state)) { + rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); + + if (rc != 0) { + int eno = errno; + errno = 0; + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [poll_remove_dcb] epoll_ctl failed due %d, %s.", + pthread_self(), + eno, + strerror(eno)); + } + ss_dassert(rc == 0); } + /** + * This call was redundant, but the end result is correct. + */ + else if (old_state == new_state) + { + rc = 0; + goto return_rc; + } + /** + * State transition failed. This may be due some more serious error + * in how dcb is handled. + */ + else + { + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [poll_remove_dcb] Unable to set state %s for dcb %p " + "in state %s. Removing from poll set failed.", + pthread_self(), + STRDCBSTATE(new_state), + STRDCBSTATE(old_state)); + ss_dassert(false); + goto return_rc; + } + + /** Set bit for each maxscale thread */ + bitmask_copy(&dcb->memdata.bitmask, poll_bitmask()); + rc = 0; +return_rc: return rc; } @@ -198,7 +264,7 @@ poll_waitevents(void *arg) } #else if (!no_op) { - skygw_log_write(LOGFILE_TRACE, + skygw_log_write(LOGFILE_DEBUG, "%lu [poll_waitevents] MaxScale thread %d > " "epoll_wait <", pthread_self(), @@ -211,7 +277,7 @@ poll_waitevents(void *arg) { int eno = errno; errno = 0; - skygw_log_write(LOGFILE_TRACE, + skygw_log_write(LOGFILE_DEBUG, "%lu [poll_waitevents] epoll_wait returned " "%d, errno %d", pthread_self(), @@ -232,7 +298,7 @@ poll_waitevents(void *arg) if (nfds > 0) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu [poll_waitevents] epoll_wait found %d fds", pthread_self(), nfds); @@ -248,7 +314,7 @@ poll_waitevents(void *arg) #if defined(SS_DEBUG) if (dcb_fake_write_ev[dcb->fd] != 0) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "Added fake events %d to ev %d.", pthread_self(), @@ -265,8 +331,8 @@ poll_waitevents(void *arg) ss_dassert(dcb->state != DCB_STATE_FREED); ss_debug(spinlock_release(&dcb->dcb_initlock);) - skygw_log_write( - LOGFILE_TRACE, + skygw_log_write_flush( + LOGFILE_DEBUG, "%lu %d [poll_waitevents] event %d dcb %p", pthread_self(), thread_id, @@ -280,7 +346,7 @@ poll_waitevents(void *arg) if (eno == 0) { eno = dcb_fake_write_errno[dcb->fd]; skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "Added fake errno %d. %s", pthread_self(), @@ -292,7 +358,7 @@ poll_waitevents(void *arg) #endif if (eno != 0) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "EPOLLERR due %d, %s.", pthread_self(), @@ -321,13 +387,6 @@ poll_waitevents(void *arg) ss_info_dassert(!dcb->dcb_write_active, "Write already active"); dcb->dcb_write_active = TRUE; - skygw_log_write( - LOGFILE_TRACE, - "%lu %d [poll_waitevents] " - "Write in fd %d", - pthread_self(), - thread_id, - dcb->fd); atomic_add(&pollStats.n_write, 1); dcb->func.write_ready(dcb); dcb->dcb_write_active = FALSE; @@ -344,7 +403,7 @@ poll_waitevents(void *arg) if (dcb->state == DCB_STATE_LISTENING) { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " "Accept in fd %d", pthread_self(), @@ -352,15 +411,16 @@ poll_waitevents(void *arg) dcb->fd); atomic_add(&pollStats.n_accept, 1); dcb->func.accept(dcb); - } + } else { skygw_log_write( - LOGFILE_TRACE, + LOGFILE_DEBUG, "%lu %d [poll_waitevents] " - "Read in fd %d", + "Read in dcb %p fd %d", pthread_self(), thread_id, + dcb, dcb->fd); atomic_add(&pollStats.n_read, 1); dcb->func.read(dcb); diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 3eb96f7e4..c82444349 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -132,9 +132,6 @@ typedef struct mysql_session { } MYSQL_session; -/* MySQL states for authentication reply */ -#define MYSQL_FAILED_AUTHENTICATION 1 -#define MYSQL_SUCCESFUL_AUTHENTICATION 0 /* Protocol packing macros. */ #define gw_mysql_set_byte2(__buffer, __int) do { \ @@ -228,9 +225,9 @@ typedef enum /* Basic mysql commands */ #define MYSQL_COM_CHANGE_USER 0x11 -#define MYSQL_COM_QUIT 0x1 -#define MYSQL_COM_INIT_DB 0x2 -#define MYSQL_COM_QUERY 0x3 +#define MYSQL_COM_QUIT 0x1 +#define MYSQL_COM_INIT_DB 0x2 +#define MYSQL_COM_QUERY 0x3 #define MYSQL_GET_COMMAND(payload) (payload[4]) #define MYSQL_GET_PACKET_NO(payload) (payload[3]) @@ -242,14 +239,14 @@ void gw_mysql_close(MySQLProtocol **ptr); MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd); MySQLProtocol *gw_mysql_init(MySQLProtocol *data); void gw_mysql_close(MySQLProtocol **ptr); -int gw_receive_backend_auth(MySQLProtocol *conn); -int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload); -int gw_read_backend_handshake(MySQLProtocol *conn); -int gw_send_authentication_to_backend( +bool gw_receive_backend_auth(MySQLProtocol *protocol); +int gw_decode_mysql_server_handshake(MySQLProtocol *protocol, uint8_t *payload); +int gw_read_backend_handshake(MySQLProtocol *protocol); +int gw_send_authentication_to_backend( char *dbname, char *user, uint8_t *passwd, - MySQLProtocol *conn); + MySQLProtocol *protocol); const char *gw_mysql_protocol_state2string(int state); int gw_do_connect_to_backend(char *host, int port, int* fd); int mysql_send_custom_error ( @@ -261,7 +258,7 @@ int gw_send_change_user_to_backend( char *dbname, char *user, uint8_t *passwd, - MySQLProtocol *conn); + MySQLProtocol *protocol); int gw_find_mysql_user_password_sha1( char *username, uint8_t *gateway_password, diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 7e62539b8..a07d216b8 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -156,11 +156,16 @@ static int gw_read_backend_event(DCB *dcb) { /** return only with complete session */ current_session = gw_get_shared_session_auth_info(dcb); ss_dassert(current_session != NULL); - - /* fprintf(stderr, ">>> backend EPOLLIN from %i, command %i,protocol - * state [%s]\n", dcb->fd, dcb->command, gw_mysql_protocol_state2string - * (backend_protocol->state)); - */ + + skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] Read dcb %p fd %d protocol " + "state %d, %s.", + pthread_self(), + dcb, + dcb->fd, + backend_protocol->state, + STRPROTOCOLSTATE(backend_protocol->state)); /* backend is connected: * @@ -176,30 +181,33 @@ static int gw_read_backend_event(DCB *dcb) { } else { // handshake decoded, send the auth credentials if (gw_send_authentication_to_backend( - current_session->db, - current_session->user, - current_session->client_sha1, - backend_protocol) != 0) + current_session->db, + current_session->user, + current_session->client_sha1, + backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; rc = 1; } else { - // next step is waiting server response with a new EPOLLIN event + /** + * next step is to wait server's response with + * a new EPOLLIN event + */ backend_protocol->state = MYSQL_AUTH_RECV; rc = 0; goto return_rc; } } } - /* * Now: * -- check the authentication reply from backend * OR * -- handle a previous handshake error */ - - if (backend_protocol->state == MYSQL_AUTH_RECV || backend_protocol->state == MYSQL_AUTH_FAILED) { + if (backend_protocol->state == MYSQL_AUTH_RECV || + backend_protocol->state == MYSQL_AUTH_FAILED) + { ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; @@ -211,98 +219,88 @@ static int gw_read_backend_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; - /* if a MYSQL_AUTH_FAILED is detected, don't read from backend and set rv */ - if (backend_protocol->state == MYSQL_AUTH_FAILED) { - rv = MYSQL_FAILED_AUTHENTICATION; + if (backend_protocol->state == MYSQL_AUTH_RECV) { + /** + * Read backed auth reply + */ + if (!gw_receive_backend_auth(backend_protocol)) { + rv = -1; + backend_protocol->state = MYSQL_AUTH_FAILED; + } + } + + if (backend_protocol->state == MYSQL_AUTH_FAILED) { + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_read_backend_event] " + "gw_receive_backend_auth failed. Fd %d, " + "user %s.", + pthread_self(), + dcb->fd, + current_session->user); + + + /* check the delayq before the reply */ + if (dcb->delayq) { + /* send an error to the client */ + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Connection to backend lost right now"); + } + /** + * Protect call of closeSession. + */ + spinlock_acquire(&session->ses_lock); + rsession = session->router_session; + session->router_session = NULL; + spinlock_release(&session->ses_lock); + + if (rsession != NULL) { + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] " + "Call closeSession for backend's " + "router client session.", + pthread_self()); + /* close router_session */ + router->closeSession(router_instance, rsession); + } else { + skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] " + "closeSession already called " + "for backend session.", + pthread_self()); + } + rc = 1; + goto return_rc; } else { - /* yes, do read backed auth reply */ - rv = gw_receive_backend_auth(backend_protocol); - } + ss_dassert(backend_protocol->state == MYSQL_AUTH_RECV); + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] " + "gw_receive_backend_auth succeed. Fd %d, " + "user %s.", + pthread_self(), + dcb->fd, + current_session->user); - switch (rv) { - case MYSQL_FAILED_AUTHENTICATION: - skygw_log_write_flush( - LOGFILE_ERROR, - "%lu [gw_read_backend_event] caught " - "MYSQL_FAILED_AUTHENTICATION from " - "gw_receive_backend_auth. Fd %d, " - "user %s.", - pthread_self(), - dcb->fd, - current_session->user); - - backend_protocol->state = MYSQL_AUTH_FAILED; - - /* check the delayq before the reply */ - if (dcb->delayq) { - /* send an error to the client */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Connection to backend lost right now"); - } - - /** - * Protect call of closeSession. - */ - spinlock_acquire(&session->ses_lock); - rsession = session->router_session; - session->router_session = NULL; - spinlock_release(&session->ses_lock); - - if (rsession != NULL) { - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()); - /* close the active session */ - router->closeSession(router_instance, - rsession); - } else { - skygw_log_write( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] " - "closeSession already called " - "for backend session.", - pthread_self()); - } + spinlock_acquire(&dcb->authlock); + backend_protocol->state = MYSQL_IDLE; + /* check the delay queue and flush the data */ + if(dcb->delayq) { + backend_write_delayqueue(dcb); + spinlock_release(&dcb->authlock); rc = 1; - goto return_rc; - - case MYSQL_SUCCESFUL_AUTHENTICATION: - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] caught " - "MYSQL_SUCCESFUL_AUTHENTICATION from " - "gw_receive_backend_auth. Fd %d, " - "user %s.", - pthread_self(), - dcb->fd, - current_session->user); - - spinlock_acquire(&dcb->authlock); - backend_protocol->state = MYSQL_IDLE; - /* check the delay queue and flush the data */ - if(dcb->delayq) { - backend_write_delayqueue(dcb); - spinlock_release(&dcb->authlock); - rc = 1; - goto return_rc; - } - spinlock_release(&dcb->authlock); - rc = 0; - goto return_rc; - - default: - /* no other authentication state here right - * now, so just return */ - rc = 0; - goto return_rc; - } /**< switch (rv) */ - } /**< if (backend_protocol->state == MYSQL_AUTH_RECV) */ + goto return_rc; + } + spinlock_release(&dcb->authlock); + rc = 0; + goto return_rc; + } /* MYSQL_AUTH_FAILED */ + } /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED */ /* reading MySQL command output from backend and writing to the client */ { @@ -317,10 +315,16 @@ static int gw_read_backend_event(DCB *dcb) { rc = dcb_read(dcb, &head); if (rc <= 0) { - rc = 1; + /** + * Backend generated EPOLLIN event and if there is + * nothing to read or backend failed, connection + * must be closed to avoid backend dcb from getting + * hanged. + */ + (dcb->func).close(dcb); + rc = 0; goto return_rc; } - router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; @@ -364,9 +368,10 @@ return_rc: * EPOLLOUT handler for the MySQL Backend protocol module. * * @param dcb The descriptor control block - * @return The number of bytes written + * @return 1 in success, 0 in case of failure, */ static int gw_write_backend_event(DCB *dcb) { + int rc; MySQLProtocol *backend_protocol = dcb->protocol; //fprintf(stderr, ">>> backend EPOLLOUT %i, protocol state [%s]\n", backend_protocol->fd, gw_mysql_protocol_state2string(backend_protocol->state)); @@ -383,9 +388,8 @@ static int gw_write_backend_event(DCB *dcb) { 1, 0, "Writing to backend failed"); - - return 0; - + rc = 0; + goto return_rc; } /** * vraa: what is the logic in this? @@ -393,11 +397,23 @@ static int gw_write_backend_event(DCB *dcb) { if (backend_protocol->state == MYSQL_PENDING_CONNECT) { backend_protocol->state = MYSQL_CONNECTED; // spinlock_release(&dcb->connectlock); - return 1; + rc = 1; + goto return_rc; } // spinlock_release(&dcb->connectlock); dcb_drain_writeq(dcb); - return 1; + rc = 1; +return_rc: + skygw_log_write( + LOGFILE_TRACE, + "%lu [gw_write_backend_event] " + "wrote to dcb %p fd %d, return %d", + pthread_self(), + dcb, + dcb->fd, + rc); + + return rc; } /* @@ -417,21 +433,27 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * Don't write to backend if backend_dcb is not in poll set anymore. */ if (dcb->state != DCB_STATE_POLLING) { - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Writing to backend failed"); - + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_MySQLWrite_backend] Write to backend failed. " + "Backend dcb is %s.", + pthread_self(), + STRDCBSTATE(dcb->state)); return 0; - } spinlock_acquire(&dcb->authlock); - /** * Now put the incoming data to the delay queue unless backend is connected with auth ok */ if (backend_protocol->state != MYSQL_IDLE) { + skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_MySQLWrite_backend] dcb %p fd %d protocol state %s.", + pthread_self(), + dcb, + dcb->fd, + STRPROTOCOLSTATE(backend_protocol->state)); + //fprintf(stderr, ">>> Writing in the backend %i delay queue: last dcb command %i, queue command %i, protocol state [%s]\n", dcb->fd, dcb->command, queue->command, gw_mysql_protocol_state2string(dcb->state)); backend_set_delayqueue(dcb, queue); @@ -454,7 +476,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * */ static int gw_error_backend_event(DCB *dcb) { - SESSION *session = dcb->session; + SESSION *session; void *rsession; ROUTER_OBJECT *router; ROUTER *router_instance; @@ -466,7 +488,7 @@ static int gw_error_backend_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; - + if (dcb->state != DCB_STATE_POLLING) { /** * if client is not available it needs to be handled in send @@ -487,29 +509,28 @@ static int gw_error_backend_event(DCB *dcb) { "Closed backend connection."); rc = 1; } - skygw_log_write_flush( LOGFILE_ERROR, "%lu [gw_error_backend_event] Some error occurred in backend. rc = %d", pthread_self(), rc); - + /* close the active session */ spinlock_acquire(&session->ses_lock); rsession = session->router_session; session->router_session = NULL; spinlock_release(&session->ses_lock); - + if (rsession != NULL) { - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [gw_read_backend_event] " - "Call closeSession for backend " - "session.", + skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [gw_read_backend_event] " + "Call closeSession for backend " + "session.", pthread_self()); - + router->closeSession(router_instance, rsession); } - + return rc; } @@ -557,6 +578,7 @@ static int gw_create_backend_connection( switch (rv) { case 0: ss_dassert(fd > 0); + protocol->fd = fd; protocol->state = MYSQL_CONNECTED; skygw_log_write( LOGFILE_TRACE, @@ -573,6 +595,7 @@ static int gw_create_backend_connection( case 1: ss_dassert(fd > 0); protocol->state = MYSQL_PENDING_CONNECT; + protocol->fd = fd; skygw_log_write( LOGFILE_TRACE, "%lu [gw_create_backend_connection] Connection " @@ -679,9 +702,24 @@ static int backend_write_delayqueue(DCB *dcb) spinlock_release(&dcb->delayqlock); rc = dcb_write(dcb, localq); - return rc; -} + if (rc == 0) { + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [backend_write_delayqueue] Some error occurred in " + "backend.", + pthread_self()); + + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Closed backend connection."); + dcb_close(dcb); + } + return rc; +} + static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWBUF *queue) { diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 884d3fca3..75811e68c 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -576,7 +576,7 @@ int gw_read_client_event(DCB* dcb) { //write to client mysql AUTH_OK packet, packet n. is 2 // start a new session, and connect to backends session = session_alloc(dcb->service, dcb); - + if (session != NULL) { CHK_SESSION(session); ss_dassert(session->state != SESSION_STATE_ALLOC); @@ -667,6 +667,14 @@ int gw_read_client_event(DCB* dcb) { * fprintf(stderr, "COM_QUIT received with * no connected backends from %i\n", dcb->fd); */ + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] Client read " + "COM_QUIT and rsession == NULL. Closing " + "client dcb %p.", + pthread_self(), + dcb); + (dcb->func).close(dcb); } else { /* Send a custom error as MySQL command reply */ @@ -683,6 +691,12 @@ int gw_read_client_event(DCB* dcb) { /* We can route the query */ /* COM_QUIT handling */ if (mysql_command == '\x01') { + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] Before routeQuery. " + "dcb %p.", + pthread_self(), + dcb); /** * fprintf(stderr, "COM_QUIT received from %i and * passed to backed\n", dcb->fd); @@ -690,6 +704,13 @@ int gw_read_client_event(DCB* dcb) { * fprintf(stderr, "<<< Routing the COM_QUIT ...\n"); */ router->routeQuery(router_instance, rsession, queue); + skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] After routeQuery. " + "dcb %p.", + pthread_self(), + dcb); + /* close client connection */ (dcb->func).close(dcb); rc = 1; @@ -701,10 +722,18 @@ int gw_read_client_event(DCB* dcb) { /* writing in the backend buffer queue, via routeQuery */ //fprintf(stderr, "<<< Routing the Query ...\n"); - router->routeQuery(router_instance, - rsession, - queue); - protocol->state = MYSQL_WAITING_RESULT; + rc = router->routeQuery(router_instance, rsession, queue); + + if (rc == 1) { + protocol->state = MYSQL_WAITING_RESULT; + } else { + mysql_send_custom_error(dcb, + 1, + 0, + "Connection to backend lost."); + protocol->state = MYSQL_IDLE; + goto return_rc; + } } break; @@ -715,6 +744,14 @@ int gw_read_client_event(DCB* dcb) { rc = 0; return_rc: +#if defined(SS_DEBUG) + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif return rc; } @@ -766,6 +803,14 @@ int gw_write_client_event(DCB *dcb) } return_1: +#if defined(SS_DEBUG) + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif return 1; } @@ -867,20 +912,36 @@ int gw_MySQLListener( } +/** + * @node (write brief function description here) + * + * Parameters: + * @param listener - + * + * + * @return 0 in success, 1 in failure + * + * + * @details (write detailed description here) + * + */ int gw_MySQLAccept(DCB *listener) -{ +{ + int rc = 0; + DCB *client_dcb; + MySQLProtocol *protocol; + int c_sock; + struct sockaddr_in local; + socklen_t addrlen = sizeof(struct sockaddr_in); + int sendbuf = GW_BACKEND_SO_SNDBUF; + socklen_t optlen = sizeof(sendbuf); + int eno = 0; + int i = 0; + + CHK_DCB(listener); fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); while (1) { - int c_sock; - struct sockaddr_in local; - socklen_t addrlen = sizeof(struct sockaddr_in); - DCB *client_dcb; - MySQLProtocol *protocol; - int sendbuf = GW_BACKEND_SO_SNDBUF; - socklen_t optlen = sizeof(sendbuf); - int eno = 0; - static int i; retry_accept: // new connection from client @@ -895,6 +956,7 @@ int gw_MySQLAccept(DCB *listener) if (eno == EAGAIN || eno == EWOULDBLOCK) { + rc = 1; /* We have processed all incoming connections. */ break; } @@ -915,7 +977,8 @@ int gw_MySQLAccept(DCB *listener) if (i<10) { goto retry_accept; } - goto return_to_poll; + rc = 1; + goto return_rc; } else if (eno == EMFILE) { @@ -934,7 +997,8 @@ int gw_MySQLAccept(DCB *listener) if (i<10) { goto retry_accept; } - goto return_to_poll; + rc = 1; + goto return_rc; } else { @@ -982,12 +1046,16 @@ int gw_MySQLAccept(DCB *listener) ss_dassert(protocol != NULL); if (protocol == NULL) { + /** delete client_dcb */ + dcb_close(client_dcb); + skygw_log_write_flush( LOGFILE_ERROR, "%lu [gw_MySQLAccept] Failed to create " "protocol object for client connection.", pthread_self()); - return 1; + rc = 1; + goto return_rc; } client_dcb->protocol = protocol; // assign function poiters to "func" field @@ -1005,6 +1073,9 @@ int gw_MySQLAccept(DCB *listener) */ if (poll_add_dcb(client_dcb) == -1) { + /** delete client_dcb */ + dcb_close(client_dcb); + /** Previous state is recovered in poll_add_dcb. */ skygw_log_write_flush( LOGFILE_ERROR, @@ -1013,7 +1084,8 @@ int gw_MySQLAccept(DCB *listener) pthread_self(), client_dcb, client_dcb->fd); - return 1; + rc = 1; + goto return_rc; } else { @@ -1026,8 +1098,15 @@ int gw_MySQLAccept(DCB *listener) client_dcb->fd); } } /**< while 1 */ -return_to_poll: - return 0; +#if defined(SS_DEBUG) + if (rc == 0) { + CHK_DCB(client_dcb); + protocol = (MySQLProtocol *)client_dcb->protocol; + CHK_PROTOCOL(protocol); + } +#endif +return_rc: + return rc; } /* @@ -1036,16 +1115,34 @@ static int gw_error_client_event(DCB *dcb) { /** * should this be removed if we don't want to execute it ? * - //fprintf(stderr, "#### Handle error function gw_error_client_event, for [%i] is [%s]\n", dcb->fd, gw_dcb_state2string(dcb->state)); + //fprintf(stderr, "#### Handle error function gw_error_client_event, for [%i] is [%s]\n", dcb->fd, gw_state2string(dcb->state)); //dcb_close(dcb); */ - +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif return 1; } static int gw_client_close(DCB *dcb) { +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif + dcb_close(dcb); return 1; } @@ -1061,6 +1158,15 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + CHK_PROTOCOL(protocol); + } +#endif dcb_close(dcb); return 1; } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index c7f38c1f3..adb72381b 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -290,34 +290,41 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { return 0; } + /** * Receive the MySQL authentication packet from backend, packet # is 2 * * @param conn The MySQL protocol structure - * @return 0 for user authenticated or 1 for authentication failed + * @return true if authentication succeed, false otherwise */ -int gw_receive_backend_auth(MySQLProtocol *conn) { - int rv = 1; +bool gw_receive_backend_auth( + MySQLProtocol *protocol) +{ int n = -1; - GWBUF *head = NULL; - DCB *dcb = conn->owner_dcb; + GWBUF *head = NULL; + DCB *dcb = protocol->owner_dcb; uint8_t *ptr = NULL; + bool succp = false; - if ((n = dcb_read(dcb, &head)) != -1) { - if (head) { - ptr = GWBUF_DATA(head); - // check if the auth is SUCCESFUL - if (ptr[4] == '\x00') { - // Auth is OK - rv = 0; - } else { - rv = 1; - } - // consume all the data here - head = gwbuf_consume(head, gwbuf_length(head)); - } - } - return rv; + n = dcb_read(dcb, &head); + + if (n != -1 && + head != NULL && + GWBUF_LENGTH(head) >= 5) + { + ptr = GWBUF_DATA(head); + /** + * 5th byte is 0x0 if successful. + */ + if (ptr[4] == '\x00') { + succp = true; + } + /** + * Remove data from buffer. + */ + head = gwbuf_consume(head, gwbuf_length(head)); + } + return succp; } /** diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 8cfb165fa..21b09dfb1 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -263,10 +263,10 @@ int i, n; static void * newSession(ROUTER *instance, SESSION *session) { -ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; -ROUTER_CLIENT_SES *client_ses; -BACKEND *candidate = NULL; -int i; +ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; +ROUTER_CLIENT_SES *client_ses; +BACKEND *candidate = NULL; +int i; skygw_log_write_flush( LOGFILE_TRACE, @@ -277,8 +277,10 @@ int i; inst); - if ((client_ses = (ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES))) == NULL) { - return NULL; + client_ses = (ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES)); + + if (client_ses == NULL) { + return NULL; } /* * Find a backend server to connect to. This is the extent of the @@ -494,7 +496,7 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES *session = (ROUTER_CLIENT_SES *)router_session; uint8_t *payload = GWBUF_DATA(queue); -int mysql_command = -1; +int mysql_command; int rc; inst->stats.n_queries++; @@ -515,6 +517,7 @@ int rc; session->backend_dcb, queue); } + CHK_PROTOCOL(((MySQLProtocol*)session->backend_dcb->protocol)); skygw_log_write( LOGFILE_DEBUG, "%lu [readconnroute:routeQuery] Routed command %d to dcb %p " diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 9d8ed6909..0dff06c3a 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -158,6 +158,17 @@ typedef enum skygw_chk_t { ((s) == SESSION_STATE_LISTENER_STOPPED ? "SESSION_STATE_LISTENER_STOPPED" : \ "SESSION_STATE_UNKNOWN")))) +#define STRPROTOCOLSTATE(s) ((s) == MYSQL_ALLOC ? "MYSQL_ALLOC" : \ + ((s) == MYSQL_PENDING_CONNECT ? "MYSQL_PENDING_CONNECT" : \ + ((s) == MYSQL_CONNECTED ? "MYSQL_CONNECTED" : \ + ((s) == MYSQL_AUTH_SENT ? "MYSQL_AUTH_SENT" : \ + ((s) == MYSQL_AUTH_RECV ? "MYSQL_AUTH_RECV" : \ + ((s) == MYSQL_AUTH_FAILED ? "MYSQL_AUTH_FAILED" : \ + ((s) == MYSQL_IDLE ? "MYSQL_IDLE" : \ + ((s) == MYSQL_ROUTING ? "MYSQL_ROUTING" : \ + ((s) == MYSQL_WAITING_RESULT ? "MYSQL_WAITING_RESULT" : \ + ((s) == MYSQL_SESSION_CHANGE ? "MYSQL_SESSION_CHANGE" : \ + "UNKNOWN MYSQL STATE")))))))))) #define CHK_MLIST(l) { \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \