From bbc9dcc9a3d6978db47903e0977f855471af22b3 Mon Sep 17 00:00:00 2001 From: vraatikka Date: Thu, 12 Sep 2013 22:17:11 +0300 Subject: [PATCH] poll.c: added maxscale thread id to log session.c: Replaced free(session->router_sesision) with call to freeSession callback users.c: Removed reference to uninitialized variable. router.h: Added freeSession callback to function block. mysql_backend.c: try to ensure that client dcb is still listening in epoll_wait when writing reply to it. mysql_common.c: assert debug build is mysql_protocol_init is called with dcb == NULL readconnroute.c, readwritesplit.c, debugcli.c and testroute.c : Added freeSession to function block and an inmplementation of it. --- server/core/poll.c | 35 +++-- server/core/session.c | 4 +- server/core/users.c | 1 - server/include/poll.h | 2 +- server/include/router.h | 1 + server/modules/protocol/mysql_backend.c | 95 +++++++++---- server/modules/routing/debugcli.c | 18 ++- server/modules/routing/readconnroute.c | 125 ++++++++++++------ .../routing/readwritesplit/readwritesplit.c | 23 +++- server/modules/routing/testroute.c | 18 ++- 10 files changed, 227 insertions(+), 95 deletions(-) diff --git a/server/core/poll.c b/server/core/poll.c index 4d94a91a1..2e37df3c8 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -199,8 +199,10 @@ poll_waitevents(void *arg) #else if (!no_op) { skygw_log_write(LOGFILE_TRACE, - "%lu [poll_waitevents] > epoll_wait <", - pthread_self()); + "%lu [poll_waitevents] MaxScale thread %d > " + "epoll_wait <", + pthread_self(), + thread_id); no_op = TRUE; } simple_mutex_lock(&epoll_wait_mutex, TRUE); @@ -251,27 +253,21 @@ poll_waitevents(void *arg) skygw_log_write( LOGFILE_TRACE, - "%lu [poll_waitevents] event %d", + "%lu %d [poll_waitevents] event %d dcb %p", pthread_self(), - ev); + thread_id, + ev, + dcb); if (ev & EPOLLERR) { atomic_add(&pollStats.n_error, 1); dcb->func.error(dcb); - - if (DCB_ISZOMBIE(dcb)) { - continue; - } - } + } if (ev & EPOLLHUP) { atomic_add(&pollStats.n_hup, 1); dcb->func.hangup(dcb); - - if (DCB_ISZOMBIE(dcb)) { - continue; - } } if (ev & EPOLLOUT) { @@ -282,9 +278,10 @@ poll_waitevents(void *arg) dcb->dcb_write_active = TRUE; skygw_log_write( LOGFILE_TRACE, - "%lu [poll_waitevents] " - "Write in fd %d", + "%lu %d [poll_waitevents] " + "Write in fd %d", pthread_self(), + thread_id, dcb->fd); atomic_add(&pollStats.n_write, 1); dcb->func.write_ready(dcb); @@ -303,9 +300,10 @@ poll_waitevents(void *arg) { skygw_log_write( LOGFILE_TRACE, - "%lu [poll_waitevents] " + "%lu %d [poll_waitevents] " "Accept in fd %d", pthread_self(), + thread_id, dcb->fd); atomic_add(&pollStats.n_accept, 1); dcb->func.accept(dcb); @@ -314,9 +312,10 @@ poll_waitevents(void *arg) { skygw_log_write( LOGFILE_TRACE, - "%lu [poll_waitevents] " - "Read in fd %d", + "%lu %d [poll_waitevents] " + "Read in fd %d", pthread_self(), + thread_id, dcb->fd); atomic_add(&pollStats.n_read, 1); dcb->func.read(dcb); diff --git a/server/core/session.c b/server/core/session.c index e27ab282c..04021475c 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -205,7 +205,9 @@ bool session_free( /* Free router_session and session */ if (session->router_session) { - free(session->router_session); + session->service->router->freeSession( + session->service->router_instance, + session->router_session); } free(session); succp = true; diff --git a/server/core/users.c b/server/core/users.c index 23dfc1ef2..bcb894763 100644 --- a/server/core/users.c +++ b/server/core/users.c @@ -114,7 +114,6 @@ int del; atomic_add(&users->stats.n_deletes, 1); if (users->stats.n_entries == 1) { - atomic_add(&users->stats.n_entries, del * -1); return 0; } del = hashtable_delete(users->data, user); diff --git a/server/include/poll.h b/server/include/poll.h index bd5b1762b..e19be9c94 100644 --- a/server/include/poll.h +++ b/server/include/poll.h @@ -32,7 +32,7 @@ * @endverbatim */ #define MAX_EVENTS 1000 -#define EPOLL_TIMEOUT 1000 /**< The epoll timeout we use (milliseconds) */ +#define EPOLL_TIMEOUT 1000 /**< The epoll timeout in milliseconds */ extern void poll_init(); extern int poll_add_dcb(DCB *); diff --git a/server/include/router.h b/server/include/router.h index fdeb6a27e..208b2f5df 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -66,6 +66,7 @@ typedef struct router_object { ROUTER *(*createInstance)(SERVICE *service, char **options); void *(*newSession)(ROUTER *instance, SESSION *session); void (*closeSession)(ROUTER *instance, void *router_session); + void (*freeSession)(ROUTER *instance, void *router_session); int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue); void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 4076f2e5e..abdecedde 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -152,9 +152,6 @@ static int gw_read_backend_event(DCB *dcb) { ss_info_dassert(dcb->session != NULL, "Backend dcb doesn't have session"); - ss_info_dassert(dcb->session->client != NULL, - "Session's client dcb pointer is NULL"); - client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol); backend_protocol = (MySQLProtocol *) dcb->protocol; /** return only with complete session */ @@ -225,11 +222,6 @@ static int gw_read_backend_event(DCB *dcb) { current_session->user); backend_protocol->state = MYSQL_AUTH_FAILED; -#if 0 - /** vraa : this traps easily. Why? */ - ss_dassert(backend_protocol->state != - MYSQL_AUTH_FAILED); -#endif /* send an error to the client */ mysql_send_custom_error( dcb->session->client, @@ -327,26 +319,30 @@ static int gw_read_backend_event(DCB *dcb) { */ /** - * If dcb->session->client is freed already it may be NULL, and - * protocol can't be read. However, then it wouldn't be possible - * that there was anything to write to client in that case. - * Should this be protected somehow, anyway? + * If dcb->session->client is freed already it may be NULL. */ - client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol); - CHK_PROTOCOL(client_protocol); - - if (client_protocol != NULL && - (client_protocol->state == MYSQL_WAITING_RESULT || - client_protocol->state == MYSQL_IDLE)) - { - router->clientReply(router_instance, rsession, head, dcb); - rc = 1; + if (dcb->session->client != NULL) { + client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol); } - goto return_rc; - } - rc = 0; + + if (client_protocol != NULL) { + CHK_PROTOCOL(client_protocol); + + if (client_protocol->state == MYSQL_WAITING_RESULT || + client_protocol->state == MYSQL_IDLE) + { + router->clientReply(router_instance, + rsession, + head, + dcb); + rc = 1; + } + goto return_rc; + } + } + return_rc: - return rc; + return rc; } /* @@ -361,6 +357,19 @@ static int gw_write_backend_event(DCB *dcb) { //fprintf(stderr, ">>> backend EPOLLOUT %i, protocol state [%s]\n", backend_protocol->fd, gw_mysql_protocol_state2string(backend_protocol->state)); // spinlock_acquire(&dcb->connectlock); + /** + * Don't write to backend if backend_dcb is not in poll set anymore. + */ + if (dcb->state != DCB_STATE_POLLING) { + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Writing to backend failed"); + + return 0; + + } /** * vraa: what is the logic in this? */ @@ -386,6 +395,19 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) { MySQLProtocol *backend_protocol = dcb->protocol; + /** + * Don't write to backend if backend_dcb is not in poll set anymore. + */ + if (dcb->state != DCB_STATE_POLLING) { + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Writing to backend failed"); + + return 0; + + } spinlock_acquire(&dcb->authlock); /** @@ -414,11 +436,30 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * */ static int gw_error_backend_event(DCB *dcb) { - +/* fprintf(stderr, ">>> Handle Backend error function for %i\n", dcb->fd); +*/ + if (dcb->state != DCB_STATE_POLLING) { + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Writing to backend failed."); + + return 0; + } + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_error_backend_event] Some error occurred in backend.", + pthread_self()); + mysql_send_custom_error( + dcb->session->client, + 1, + 0, + "Closed backend connection."); dcb_close(dcb); - + return 1; } diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index fa605b23a..f168474f1 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -49,11 +49,20 @@ static char *version_str = "V1.0.1"; static ROUTER *createInstance(SERVICE *service, char **options); static void *newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *router_session); +static void freeSession(ROUTER *instance, void *router_session); static int execute(ROUTER *instance, void *router_session, GWBUF *queue); static void diagnostics(ROUTER *instance, DCB *dcb); /** The module object definition */ -static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, execute, diagnostics, NULL }; +static ROUTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + execute, + diagnostics, + NULL +}; extern int execute_cmd(CLI_SESSION *cli); @@ -199,6 +208,13 @@ CLI_SESSION *session = (CLI_SESSION *)router_session; */ } +static void freeSession( + ROUTER* router_instance, + void* router_client_session) +{ + return; +} + /** * We have data from the client, we must route it to the backend. * This is simply a case of sending it to the connection that was diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 68f3eddec..be246ed0a 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -88,6 +88,7 @@ static char *version_str = "V1.0.2"; static ROUTER *createInstance(SERVICE *service, char **options); static void *newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *router_session); +static void freeSession(ROUTER *instance, void *router_session); static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); static void diagnostics(ROUTER *instance, DCB *dcb); static void clientReply( @@ -101,6 +102,7 @@ static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, + freeSession, routeQuery, diagnostics, clientReply @@ -393,6 +395,69 @@ int i; return (void *)client_ses; } +/** + * @node Unlink from backend server, unlink from router's connection list, + * and free memory of a router client session. + * + * Parameters: + * @param router - + * + * + * @param router_cli_ses - + * + * + * @return void + * + * + * @details (write detailed description here) + * + */ +static void freeSession( + ROUTER* router_instance, + void* router_client_ses) +{ + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_instance; + ROUTER_CLIENT_SES* router_cli_ses = + (ROUTER_CLIENT_SES *)router_client_ses; + int prev_val; + + prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1); + ss_dassert(prev_val > 0); + + atomic_add(&router_cli_ses->backend->server->stats.n_current, -1); + spinlock_acquire(&router->lock); + + if (router->connections == router_cli_ses) { + router->connections = router_cli_ses->next; + } else { + ROUTER_CLIENT_SES *ptr = router->connections; + + while (ptr != NULL && ptr->next != router_cli_ses) { + ptr = ptr->next; + } + + if (ptr != NULL) { + ptr->next = router_cli_ses->next; + } + } + spinlock_release(&router->lock); + + skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [freeSession] Unlinked router_client_session %p from " + "router %p and form server on port %d. Connections : %d " + "session %p.", + pthread_self(), + router_cli_ses, + router, + router_cli_ses->backend->server->port, + prev_val-1, + router_cli_ses->backend_dcb->session); + + free(router_cli_ses); +} + + /** * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. @@ -410,34 +475,7 @@ bool succp = false; /* * Close the connection to the backend */ - skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [closeSession] closing session with " - "router_session " - "%p, and inst %p.", - pthread_self(), - router_ses, - router_inst); router_ses->backend_dcb->func.close(router_ses->backend_dcb); - atomic_add(&router_ses->backend->current_connection_count, -1); - atomic_add(&router_ses->backend->server->stats.n_current, -1); - spinlock_acquire(&router_inst->lock); - - if (router_inst->connections == router_ses) - router_inst->connections = router_ses->next; - else - { - ROUTER_CLIENT_SES *ptr = router_inst->connections; - while (ptr && ptr->next != router_ses) - ptr = ptr->next; - if (ptr) - ptr->next = router_ses->next; - } - spinlock_release(&router_inst->lock); - /** - * Router session is freed in session.c:session_close, when session who - * owns it, is freed. - */ } /** @@ -457,23 +495,34 @@ ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES *session = (ROUTER_CLIENT_SES *)router_session; uint8_t *payload = GWBUF_DATA(queue); int mysql_command = -1; +int rc; inst->stats.n_queries++; mysql_command = MYSQL_GET_COMMAND(payload); switch(mysql_command) { - case MYSQL_COM_CHANGE_USER: - return session->backend_dcb->func.auth( - session->backend_dcb, - NULL, - session->backend_dcb->session, - queue); - default: - return session->backend_dcb->func.write( - session->backend_dcb, - queue); + case MYSQL_COM_CHANGE_USER: + rc = session->backend_dcb->func.auth( + session->backend_dcb, + NULL, + session->backend_dcb->session, + queue); + default: + rc = session->backend_dcb->func.write( + session->backend_dcb, + queue); } + skygw_log_write( + LOGFILE_DEBUG, + "%lu [readconnroute:routeQuery] Routed command %d to dcb %p " + "with return value %d.", + pthread_self(), + mysql_command, + session->backend_dcb, + rc); + + return rc; } /** @@ -530,4 +579,4 @@ clientReply( client->func.write(client, queue); } -/// + diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index b0ea41a00..b54924963 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -55,18 +55,21 @@ static char *version_str = "V1.0.2"; static ROUTER* createInstance(SERVICE *service, char **options); static void* newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *session); +static void freeSession(ROUTER *instance, void *session); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static void diagnostic(ROUTER *instance, DCB *dcb); static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); -static ROUTER_OBJECT MyObject = -{ createInstance, - newSession, - closeSession, - routeQuery, - diagnostic, - clientReply }; +static ROUTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + routeQuery, + diagnostic, + clientReply +}; static SPINLOCK instlock; static INSTANCE* instances; @@ -365,6 +368,12 @@ static void closeSession( free(session); } +static void freeSession( + ROUTER* router_instance, + void* router_client_session) +{ + return; +} /** * The main routing entry, this is called with every packet that is diff --git a/server/modules/routing/testroute.c b/server/modules/routing/testroute.c index 21324ba68..e4e9fb9e2 100644 --- a/server/modules/routing/testroute.c +++ b/server/modules/routing/testroute.c @@ -23,10 +23,20 @@ static char *version_str = "V1.0.0"; static ROUTER *createInstance(SERVICE *service, char **options); static void *newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *session); +static void freeSession(ROUTER *instance, void *session); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static void diagnostic(ROUTER *instance, DCB *dcb); -static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostic, NULL }; + +static ROUTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + routeQuery, + diagnostic, + NULL +}; /** * Implementation of the mandatory version entry point @@ -104,6 +114,12 @@ closeSession(ROUTER *instance, void *session) { } +static void freeSession( + ROUTER* router_instance, + void* router_client_session) +{ + return; +} static int routeQuery(ROUTER *instance, void *session, GWBUF *queue)