From 8be4aba2230e0c56b53bfb4c94fe5fbeb5b7b925 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Thu, 8 May 2014 23:17:35 +0300 Subject: [PATCH] Added new state to SESSION: SESSION_STATE_STOPPING, which is set in protocol module before calling closeSession (router). THe new state tells that session is closing and DCBs included may not be polling anymore. Fixed some crash scenarios. --- server/core/dbusers.c | 15 ++- server/core/dcb.c | 12 +- server/core/session.c | 19 +++ server/include/session.h | 4 +- server/modules/protocol/mysql_backend.c | 56 +++++--- server/modules/protocol/mysql_client.c | 96 +++----------- server/modules/protocol/mysql_common.c | 2 +- .../routing/readwritesplit/readwritesplit.c | 125 ++++++++++++++++-- 8 files changed, 216 insertions(+), 113 deletions(-) diff --git a/server/core/dbusers.c b/server/core/dbusers.c index e5289a3a0..b89f831de 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -217,15 +217,26 @@ getUsers(SERVICE *service, struct users *users) */ server = service->databases; dpwd = decryptPassword(service_passwd); - while (server != NULL && mysql_real_connect(con, + while (server != NULL && (mysql_real_connect(con, server->name, service_user, dpwd, NULL, server->port, NULL, - 0) == NULL) + 0) == NULL)) { + if (server == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to connect to %s:%d, \"%s\"", + server->name, + server->port, + mysql_error(con)))); + mysql_close(con); + return -1; + } server = server->nextdb; } free(dpwd); diff --git a/server/core/dcb.c b/server/core/dcb.c index 88703a86e..c1030faf2 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -688,11 +688,20 @@ dcb_write(DCB *dcb, GWBUF *queue) ss_dassert(queue != NULL); + /** + * SESSION_STATE_STOPPING means that one of the backends is closing + * the router session. Some backends may have not completed + * authentication yet and thus they have no information about router + * being closed. Session state is changed to SESSION_STATE_STOPPING + * before router's closeSession is called and that tells that DCB may + * still be writable. + */ if (queue == NULL || (dcb->state != DCB_STATE_ALLOC && dcb->state != DCB_STATE_POLLING && dcb->state != DCB_STATE_LISTENING && - dcb->state != DCB_STATE_NOPOLLING)) + dcb->state != DCB_STATE_NOPOLLING && + dcb->session->state != SESSION_STATE_STOPPING)) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -703,6 +712,7 @@ dcb_write(DCB *dcb, GWBUF *queue) dcb, STRDCBSTATE(dcb->state), dcb->fd))); + ss_dassert(false); return 0; } diff --git a/server/core/session.c b/server/core/session.c index fd880939e..bab835140 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -129,6 +129,10 @@ session_alloc(SERVICE *service, DCB *client_dcb) session); if (session->router_session == NULL) { + /** + * Inform other threads that session is closing. + */ + session->state == SESSION_STATE_STOPPING; /*< * Decrease refcount, set dcb's session pointer NULL * and set session pointer to NULL. @@ -439,3 +443,18 @@ session_state(int state) return "Invalid State"; } } + +SESSION* get_session_by_router_ses( + void* rses) +{ + SESSION* ses = allSessions; + + while (ses->router_session != rses && ses->next != NULL) + ses = ses->next; + + if (ses->router_session != rses) + { + ses = NULL; + } + return ses; +} \ No newline at end of file diff --git a/server/include/session.h b/server/include/session.h index 6cafa160d..e301fa1b3 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -53,6 +53,7 @@ typedef enum { SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */ + SESSION_STATE_STOPPING, /*< router is being closed */ SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_FREE /*< for all sessions */ @@ -93,4 +94,5 @@ void dprintAllSessions(struct dcb *); void dprintSession(struct dcb *, SESSION *); char *session_state(int); bool session_link_dcb(SESSION *, struct dcb *); -#endif +SESSION* get_session_by_router_ses(void* rses); +#endif \ No newline at end of file diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index acc2dacdd..d5fc4b04c 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -306,12 +306,14 @@ static int gw_read_backend_event(DCB *dcb) { /* try reload users' table for next connection */ service_refresh_users(dcb->session->client->service); - while (session->state != SESSION_STATE_ROUTER_READY) + while (session->state != SESSION_STATE_ROUTER_READY && + session->state != SESSION_STATE_STOPPING) { ss_dassert( session->state == SESSION_STATE_READY || session->state == - SESSION_STATE_ROUTER_READY); + SESSION_STATE_ROUTER_READY || + session->state == SESSION_STATE_STOPPING); /** * Session shouldn't be NULL at this point * anymore. Just checking.. @@ -323,6 +325,15 @@ static int gw_read_backend_event(DCB *dcb) { } usleep(1); } + + if (session->state == SESSION_STATE_STOPPING) + { + goto return_with_lock; + } + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + /** * rsession shouldn't be NULL since session * state indicates that it was initialized @@ -357,8 +368,7 @@ static int gw_read_backend_event(DCB *dcb) { /* check the delay queue and flush the data */ if (dcb->delayq) { - backend_write_delayqueue(dcb); - rc = 1; + rc = backend_write_delayqueue(dcb); goto return_with_lock; } } @@ -567,9 +577,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) snprintf(str, len+1, "%s", startpoint); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Error : Routing query \"%s\" failed due to " - "authentication failure.", - str))); + "Error : Authentication to backend failed."))); /** Consume query buffer */ while ((queue = gwbuf_consume( queue, @@ -667,6 +675,10 @@ static int gw_error_backend_event(DCB *dcb) { if (session->state == SESSION_STATE_ROUTER_READY) { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + rsession = session->router_session; /*< * rsession should never be NULL here. @@ -847,34 +859,36 @@ static int backend_write_delayqueue(DCB *dcb) spinlock_acquire(&dcb->delayqlock); + if (dcb->delayq == NULL) + { + spinlock_release(&dcb->delayqlock); + rc = 1; + } + else + { localq = dcb->delayq; dcb->delayq = NULL; - spinlock_release(&dcb->delayqlock); rc = dcb_write(dcb, localq); + } if (rc == 0) { - /*< vraa : errorHandle */ - /** - * This error can be muted because it is often due - * unexpected dcb state which means that concurrent thread - * already wrote the queue and closed dcb. - */ -#if 0 LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "%lu [backend_write_delayqueue] Some error occurred in " - "backend.", - pthread_self()))); -#endif + "Error : failed to write buffered data to back-end " + "server. Buffer was empty of back-end was disconnected " + "during operation."))); + mysql_send_custom_error( dcb->session->client, 1, 0, - "Unable to write to backend server. Connection was " - "closed."); + "Failed to write buffered data to back-end server. " + "Buffer was empty or back-end was disconnected during " + "operation."); dcb_close(dcb); } + return rc; } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 1e394d7d3..67b645094 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -540,7 +540,7 @@ int gw_read_client_event(DCB* dcb) { goto return_rc; } - // close client socket and the sessioA too + // close client socket and the session too dcb->func.close(dcb); } else { // do nothing if reading 1 byte @@ -731,12 +731,8 @@ int gw_read_client_event(DCB* dcb) { "backend. Close client dcb %p", pthread_self(), dcb))); - - /** close client connection */ - (dcb->func).close(dcb); - /** close backends connection */ - router->closeSession(router_instance, rsession); - rc = 1; + /** close client connection, closes router session too */ + rc = dcb->func.close(dcb); } else { @@ -1217,37 +1213,16 @@ return_rc: return rc; } -static int gw_error_client_event(DCB *dcb) { - SESSION* session; - ROUTER_OBJECT* router; - void* router_instance; - void* rsession; - -#if defined(SS_DEBUG) - MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; - if (dcb->state == DCB_STATE_POLLING || - dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE) - { - CHK_PROTOCOL(protocol); - } -#endif - - session = dcb->session; - - /** - * session may be NULL if session_alloc failed. - * In that case router session was not created. - */ - if (session != NULL) { - CHK_SESSION(session); - router = session->service->router; - router_instance = session->service->router_instance; - rsession = session->router_session; - router->closeSession(router_instance, rsession); - } - dcb_close(dcb); - return 1; +static int gw_error_client_event( + DCB* dcb) +{ + int rc; + + CHK_DCB(dcb); + + rc = dcb->func.close(dcb); + + return rc; } static int @@ -1274,6 +1249,10 @@ gw_client_close(DCB *dcb) */ if (session != NULL) { CHK_SESSION(session); + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; @@ -1296,43 +1275,12 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { - SESSION* session; - ROUTER_OBJECT* router; - void* router_instance; - void* rsession; - int rc = 1; - - #if defined(SS_DEBUG) - MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; - if (dcb->state == DCB_STATE_POLLING || - dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE) - { - CHK_PROTOCOL(protocol); - } -#endif - CHK_DCB(dcb); + int rc; - if (dcb->state != DCB_STATE_POLLING) { - goto return_rc; - } - - session = dcb->session; - /** - * session may be NULL if session_alloc failed. - * In that case router session was not created. - */ - if (session != NULL) { - CHK_SESSION(session); - router = session->service->router; - router_instance = session->service->router_instance; - rsession = session->router_session; - router->closeSession(router_instance, rsession); - } - - dcb_close(dcb); -return_rc: - return rc; + CHK_DCB(dcb); + rc = dcb->func.close(dcb); + + return rc; } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 354bb4e03..81911a9b2 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -605,7 +605,7 @@ int gw_do_connect_to_backend( LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error: Establishing connection to backend server " - "%s:%d failed. Socket creation failed due " + "%s:%d failed.\n\t\t Socket creation failed due " "%d, %s.", host, port, diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 965f1af01..02bdcc903 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -503,6 +503,7 @@ static void* newSession( { /** log this */ free(client_rses); + free(backend_ref); client_rses = NULL; goto return_rses; } @@ -609,7 +610,7 @@ static void closeSession( backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - CHK_CLIENT_RSES(router_cli_ses); + CHK_CLIENT_RSES(router_cli_ses); backend_ref = router_cli_ses->rses_backend_ref; /** @@ -618,8 +619,17 @@ static void closeSession( if (!router_cli_ses->rses_closed && rses_begin_locked_router_action(router_cli_ses)) { - DCB* dcbs[router_cli_ses->rses_nbackends]; int i = 0; + /** + * session must be moved to SESSION_STATE_STOPPING state before + * router session is closed. + */ +#if defined(SS_DEBUG) + SESSION* ses = get_session_by_router_ses((void*)router_cli_ses); + + ss_dassert(ses != NULL); + ss_dassert(ses->state == SESSION_STATE_STOPPING); +#endif /** * This sets router closed. Nobody is allowed to use router @@ -629,17 +639,18 @@ static void closeSession( for (i=0; irses_nbackends; i++) { + DCB* dcb = backend_ref[i].bref_dcb; + /** decrease server current connection counters */ atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); - + /** Close those which had been connected */ - if (backend_ref[i].bref_dcb != NULL) + if (dcb != NULL) { - CHK_DCB(backend_ref[i].bref_dcb); - dcbs[i] = backend_ref[i].bref_dcb; + CHK_DCB(dcb); backend_ref[i].bref_dcb = (DCB *)0xdeadbeef; /*< prevent new uses of DCB */ - dcbs[i]->func.close(dcbs[i]); + dcb->func.close(dcb); } } /** Unlock */ @@ -743,6 +754,29 @@ static bool get_dcb( succp = true; } } + + if (!succp) + { + backend_ref = rses->rses_master_ref; + + if (backend_ref->bref_dcb != NULL) + { + *p_dcb = backend_ref->bref_dcb; + succp = true; + + ss_dassert( + SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) && + smallest_nconn == -1); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Warning : No slaves connected nor " + "available. Choosing master %s:%d " + "instead.", + backend_ref->bref_backend->backend_server->name, + backend_ref->bref_backend->backend_server->port))); + } + } ss_dassert(succp); } else if (btype == BE_MASTER || BE_JOINED) @@ -962,16 +996,29 @@ static int routeQuery( if (succp) { + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_slave, 1); } - ss_dassert(ret == 1); + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + querystr))); + } + rses_end_locked_router_action(router_cli_ses); } ss_dassert(succp); goto return_ret; - } - else + } + else { bool succp = true; @@ -998,10 +1045,17 @@ static int routeQuery( } if (succp) { + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_master, 1); } + rses_end_locked_router_action(router_cli_ses); } ss_dassert(succp); ss_dassert(ret == 1); @@ -1316,6 +1370,18 @@ static bool select_connect_backend_servers( is_synced_master = false; } + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); + + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_conn_count))); + } /** * Sort the pointer list to servers according to connection counts. As * a consequence those backends having least connections are in the @@ -1323,6 +1389,19 @@ static bool select_connect_backend_servers( */ qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), bref_cmp); + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns after ordering:"))); + + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_conn_count))); + + } /** * Choose at least 1+1 (master and slave) and at most 1+max_nslaves * servers from the sorted list. First master found is selected. @@ -1366,6 +1445,12 @@ static bool select_connect_backend_servers( } else { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with slave %s:%d", + b->backend_server->name, + b->backend_server->port))); /* handle connect error */ } } @@ -1389,6 +1474,13 @@ static bool select_connect_backend_servers( } else { + succp = false; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with master %s:%d", + b->backend_server->name, + b->backend_server->port))); /* handle connect error */ } } @@ -1929,8 +2021,7 @@ static bool execute_sescmd_in_backend( dcb->session, sescmd_cursor_clone_querybuf(scur)); break; - - case COM_QUIT: + case COM_QUERY: case COM_INIT_DB: default: @@ -2153,7 +2244,14 @@ static bool route_session_write( int rc; succp = true; - + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + succp = false; + goto return_succp; + } + for (i=0; irses_nbackends; i++) { DCB* dcb = backend_ref[i].bref_dcb; @@ -2168,6 +2266,7 @@ static bool route_session_write( } } } + rses_end_locked_router_action(router_cli_ses); gwbuf_free(querybuf); goto return_succp; }