Added new state to SESSION: SESSION_STATE_STOPPING, which is set in protocol module before calling closeSession (router). THe new state tells that session is closing and DCBs included may not be polling anymore.

Fixed some crash scenarios.
This commit is contained in:
VilhoRaatikka
2014-05-08 23:17:35 +03:00
parent a1361d9c9e
commit 8be4aba223
8 changed files with 216 additions and 113 deletions

View File

@ -217,15 +217,26 @@ getUsers(SERVICE *service, struct users *users)
*/ */
server = service->databases; server = service->databases;
dpwd = decryptPassword(service_passwd); dpwd = decryptPassword(service_passwd);
while (server != NULL && mysql_real_connect(con, while (server != NULL && (mysql_real_connect(con,
server->name, server->name,
service_user, service_user,
dpwd, dpwd,
NULL, NULL,
server->port, server->port,
NULL, NULL,
0) == NULL) 0) == NULL))
{ {
if (server == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to connect to %s:%d, \"%s\"",
server->name,
server->port,
mysql_error(con))));
mysql_close(con);
return -1;
}
server = server->nextdb; server = server->nextdb;
} }
free(dpwd); free(dpwd);

View File

@ -688,11 +688,20 @@ dcb_write(DCB *dcb, GWBUF *queue)
ss_dassert(queue != NULL); ss_dassert(queue != NULL);
/**
* SESSION_STATE_STOPPING means that one of the backends is closing
* the router session. Some backends may have not completed
* authentication yet and thus they have no information about router
* being closed. Session state is changed to SESSION_STATE_STOPPING
* before router's closeSession is called and that tells that DCB may
* still be writable.
*/
if (queue == NULL || if (queue == NULL ||
(dcb->state != DCB_STATE_ALLOC && (dcb->state != DCB_STATE_ALLOC &&
dcb->state != DCB_STATE_POLLING && dcb->state != DCB_STATE_POLLING &&
dcb->state != DCB_STATE_LISTENING && dcb->state != DCB_STATE_LISTENING &&
dcb->state != DCB_STATE_NOPOLLING)) dcb->state != DCB_STATE_NOPOLLING &&
dcb->session->state != SESSION_STATE_STOPPING))
{ {
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
@ -703,6 +712,7 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd))); dcb->fd)));
ss_dassert(false);
return 0; return 0;
} }

View File

@ -129,6 +129,10 @@ session_alloc(SERVICE *service, DCB *client_dcb)
session); session);
if (session->router_session == NULL) { if (session->router_session == NULL) {
/**
* Inform other threads that session is closing.
*/
session->state == SESSION_STATE_STOPPING;
/*< /*<
* Decrease refcount, set dcb's session pointer NULL * Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL. * and set session pointer to NULL.
@ -439,3 +443,18 @@ session_state(int state)
return "Invalid State"; return "Invalid State";
} }
} }
SESSION* get_session_by_router_ses(
void* rses)
{
SESSION* ses = allSessions;
while (ses->router_session != rses && ses->next != NULL)
ses = ses->next;
if (ses->router_session != rses)
{
ses = NULL;
}
return ses;
}

View File

@ -53,6 +53,7 @@ typedef enum {
SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_ALLOC, /*< for all sessions */
SESSION_STATE_READY, /*< for router session */ SESSION_STATE_READY, /*< for router session */
SESSION_STATE_ROUTER_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */
SESSION_STATE_STOPPING, /*< router is being closed */
SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER, /*< for listener session */
SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */
SESSION_STATE_FREE /*< for all sessions */ SESSION_STATE_FREE /*< for all sessions */
@ -93,4 +94,5 @@ void dprintAllSessions(struct dcb *);
void dprintSession(struct dcb *, SESSION *); void dprintSession(struct dcb *, SESSION *);
char *session_state(int); char *session_state(int);
bool session_link_dcb(SESSION *, struct dcb *); bool session_link_dcb(SESSION *, struct dcb *);
#endif SESSION* get_session_by_router_ses(void* rses);
#endif

View File

@ -306,12 +306,14 @@ static int gw_read_backend_event(DCB *dcb) {
/* try reload users' table for next connection */ /* try reload users' table for next connection */
service_refresh_users(dcb->session->client->service); service_refresh_users(dcb->session->client->service);
while (session->state != SESSION_STATE_ROUTER_READY) while (session->state != SESSION_STATE_ROUTER_READY &&
session->state != SESSION_STATE_STOPPING)
{ {
ss_dassert( ss_dassert(
session->state == SESSION_STATE_READY || session->state == SESSION_STATE_READY ||
session->state == session->state ==
SESSION_STATE_ROUTER_READY); SESSION_STATE_ROUTER_READY ||
session->state == SESSION_STATE_STOPPING);
/** /**
* Session shouldn't be NULL at this point * Session shouldn't be NULL at this point
* anymore. Just checking.. * anymore. Just checking..
@ -323,6 +325,15 @@ static int gw_read_backend_event(DCB *dcb) {
} }
usleep(1); usleep(1);
} }
if (session->state == SESSION_STATE_STOPPING)
{
goto return_with_lock;
}
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
/** /**
* rsession shouldn't be NULL since session * rsession shouldn't be NULL since session
* state indicates that it was initialized * state indicates that it was initialized
@ -357,8 +368,7 @@ static int gw_read_backend_event(DCB *dcb) {
/* check the delay queue and flush the data */ /* check the delay queue and flush the data */
if (dcb->delayq) if (dcb->delayq)
{ {
backend_write_delayqueue(dcb); rc = backend_write_delayqueue(dcb);
rc = 1;
goto return_with_lock; goto return_with_lock;
} }
} }
@ -567,9 +577,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
snprintf(str, len+1, "%s", startpoint); snprintf(str, len+1, "%s", startpoint);
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Routing query \"%s\" failed due to " "Error : Authentication to backend failed.")));
"authentication failure.",
str)));
/** Consume query buffer */ /** Consume query buffer */
while ((queue = gwbuf_consume( while ((queue = gwbuf_consume(
queue, queue,
@ -667,6 +675,10 @@ static int gw_error_backend_event(DCB *dcb) {
if (session->state == SESSION_STATE_ROUTER_READY) if (session->state == SESSION_STATE_ROUTER_READY)
{ {
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
rsession = session->router_session; rsession = session->router_session;
/*< /*<
* rsession should never be NULL here. * rsession should never be NULL here.
@ -847,34 +859,36 @@ static int backend_write_delayqueue(DCB *dcb)
spinlock_acquire(&dcb->delayqlock); spinlock_acquire(&dcb->delayqlock);
if (dcb->delayq == NULL)
{
spinlock_release(&dcb->delayqlock);
rc = 1;
}
else
{
localq = dcb->delayq; localq = dcb->delayq;
dcb->delayq = NULL; dcb->delayq = NULL;
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq); rc = dcb_write(dcb, localq);
}
if (rc == 0) { if (rc == 0) {
/*< vraa : errorHandle */
/**
* This error can be muted because it is often due
* unexpected dcb state which means that concurrent thread
* already wrote the queue and closed dcb.
*/
#if 0
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [backend_write_delayqueue] Some error occurred in " "Error : failed to write buffered data to back-end "
"backend.", "server. Buffer was empty of back-end was disconnected "
pthread_self()))); "during operation.")));
#endif
mysql_send_custom_error( mysql_send_custom_error(
dcb->session->client, dcb->session->client,
1, 1,
0, 0,
"Unable to write to backend server. Connection was " "Failed to write buffered data to back-end server. "
"closed."); "Buffer was empty or back-end was disconnected during "
"operation.");
dcb_close(dcb); dcb_close(dcb);
} }
return rc; return rc;
} }

View File

@ -540,7 +540,7 @@ int gw_read_client_event(DCB* dcb) {
goto return_rc; goto return_rc;
} }
// close client socket and the sessioA too // close client socket and the session too
dcb->func.close(dcb); dcb->func.close(dcb);
} else { } else {
// do nothing if reading 1 byte // do nothing if reading 1 byte
@ -731,12 +731,8 @@ int gw_read_client_event(DCB* dcb) {
"backend. Close client dcb %p", "backend. Close client dcb %p",
pthread_self(), pthread_self(),
dcb))); dcb)));
/** close client connection, closes router session too */
/** close client connection */ rc = dcb->func.close(dcb);
(dcb->func).close(dcb);
/** close backends connection */
router->closeSession(router_instance, rsession);
rc = 1;
} }
else else
{ {
@ -1217,37 +1213,16 @@ return_rc:
return rc; return rc;
} }
static int gw_error_client_event(DCB *dcb) { static int gw_error_client_event(
SESSION* session; DCB* dcb)
ROUTER_OBJECT* router; {
void* router_instance; int rc;
void* rsession;
CHK_DCB(dcb);
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; rc = dcb->func.close(dcb);
if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING || return rc;
dcb->state == DCB_STATE_ZOMBIE)
{
CHK_PROTOCOL(protocol);
}
#endif
session = dcb->session;
/**
* session may be NULL if session_alloc failed.
* In that case router session was not created.
*/
if (session != NULL) {
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
dcb_close(dcb);
return 1;
} }
static int static int
@ -1274,6 +1249,10 @@ gw_client_close(DCB *dcb)
*/ */
if (session != NULL) { if (session != NULL) {
CHK_SESSION(session); CHK_SESSION(session);
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
@ -1296,43 +1275,12 @@ gw_client_close(DCB *dcb)
static int static int
gw_client_hangup_event(DCB *dcb) gw_client_hangup_event(DCB *dcb)
{ {
SESSION* session; int rc;
ROUTER_OBJECT* router;
void* router_instance;
void* rsession;
int rc = 1;
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol;
if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE)
{
CHK_PROTOCOL(protocol);
}
#endif
CHK_DCB(dcb);
if (dcb->state != DCB_STATE_POLLING) { CHK_DCB(dcb);
goto return_rc; rc = dcb->func.close(dcb);
}
return rc;
session = dcb->session;
/**
* session may be NULL if session_alloc failed.
* In that case router session was not created.
*/
if (session != NULL) {
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
dcb_close(dcb);
return_rc:
return rc;
} }

View File

@ -605,7 +605,7 @@ int gw_do_connect_to_backend(
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error: Establishing connection to backend server " "Error: Establishing connection to backend server "
"%s:%d failed. Socket creation failed due " "%s:%d failed.\n\t\t Socket creation failed due "
"%d, %s.", "%d, %s.",
host, host,
port, port,

View File

@ -503,6 +503,7 @@ static void* newSession(
{ {
/** log this */ /** log this */
free(client_rses); free(client_rses);
free(backend_ref);
client_rses = NULL; client_rses = NULL;
goto return_rses; goto return_rses;
} }
@ -609,7 +610,7 @@ static void closeSession(
backend_ref_t* backend_ref; backend_ref_t* backend_ref;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session; router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses); CHK_CLIENT_RSES(router_cli_ses);
backend_ref = router_cli_ses->rses_backend_ref; backend_ref = router_cli_ses->rses_backend_ref;
/** /**
@ -618,8 +619,17 @@ static void closeSession(
if (!router_cli_ses->rses_closed && if (!router_cli_ses->rses_closed &&
rses_begin_locked_router_action(router_cli_ses)) rses_begin_locked_router_action(router_cli_ses))
{ {
DCB* dcbs[router_cli_ses->rses_nbackends];
int i = 0; int i = 0;
/**
* session must be moved to SESSION_STATE_STOPPING state before
* router session is closed.
*/
#if defined(SS_DEBUG)
SESSION* ses = get_session_by_router_ses((void*)router_cli_ses);
ss_dassert(ses != NULL);
ss_dassert(ses->state == SESSION_STATE_STOPPING);
#endif
/** /**
* This sets router closed. Nobody is allowed to use router * This sets router closed. Nobody is allowed to use router
@ -629,17 +639,18 @@ static void closeSession(
for (i=0; i<router_cli_ses->rses_nbackends; i++) for (i=0; i<router_cli_ses->rses_nbackends; i++)
{ {
DCB* dcb = backend_ref[i].bref_dcb;
/** decrease server current connection counters */ /** decrease server current connection counters */
atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1);
/** Close those which had been connected */ /** Close those which had been connected */
if (backend_ref[i].bref_dcb != NULL) if (dcb != NULL)
{ {
CHK_DCB(backend_ref[i].bref_dcb); CHK_DCB(dcb);
dcbs[i] = backend_ref[i].bref_dcb;
backend_ref[i].bref_dcb = backend_ref[i].bref_dcb =
(DCB *)0xdeadbeef; /*< prevent new uses of DCB */ (DCB *)0xdeadbeef; /*< prevent new uses of DCB */
dcbs[i]->func.close(dcbs[i]); dcb->func.close(dcb);
} }
} }
/** Unlock */ /** Unlock */
@ -743,6 +754,29 @@ static bool get_dcb(
succp = true; succp = true;
} }
} }
if (!succp)
{
backend_ref = rses->rses_master_ref;
if (backend_ref->bref_dcb != NULL)
{
*p_dcb = backend_ref->bref_dcb;
succp = true;
ss_dassert(
SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) &&
smallest_nconn == -1);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : No slaves connected nor "
"available. Choosing master %s:%d "
"instead.",
backend_ref->bref_backend->backend_server->name,
backend_ref->bref_backend->backend_server->port)));
}
}
ss_dassert(succp); ss_dassert(succp);
} }
else if (btype == BE_MASTER || BE_JOINED) else if (btype == BE_MASTER || BE_JOINED)
@ -962,16 +996,29 @@ static int routeQuery(
if (succp) if (succp)
{ {
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
{ {
atomic_add(&inst->stats.n_slave, 1); atomic_add(&inst->stats.n_slave, 1);
} }
ss_dassert(ret == 1); else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
querystr)));
}
rses_end_locked_router_action(router_cli_ses);
} }
ss_dassert(succp); ss_dassert(succp);
goto return_ret; goto return_ret;
} }
else else
{ {
bool succp = true; bool succp = true;
@ -998,10 +1045,17 @@ static int routeQuery(
} }
if (succp) if (succp)
{ {
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
{ {
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
} }
rses_end_locked_router_action(router_cli_ses);
} }
ss_dassert(succp); ss_dassert(succp);
ss_dassert(ret == 1); ss_dassert(ret == 1);
@ -1316,6 +1370,18 @@ static bool select_connect_backend_servers(
is_synced_master = false; is_synced_master = false;
} }
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count)));
}
/** /**
* Sort the pointer list to servers according to connection counts. As * Sort the pointer list to servers according to connection counts. As
* a consequence those backends having least connections are in the * a consequence those backends having least connections are in the
@ -1323,6 +1389,19 @@ static bool select_connect_backend_servers(
*/ */
qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), bref_cmp); qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), bref_cmp);
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns after ordering:")));
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count)));
}
/** /**
* Choose at least 1+1 (master and slave) and at most 1+max_nslaves * Choose at least 1+1 (master and slave) and at most 1+max_nslaves
* servers from the sorted list. First master found is selected. * servers from the sorted list. First master found is selected.
@ -1366,6 +1445,12 @@ static bool select_connect_backend_servers(
} }
else else
{ {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with slave %s:%d",
b->backend_server->name,
b->backend_server->port)));
/* handle connect error */ /* handle connect error */
} }
} }
@ -1389,6 +1474,13 @@ static bool select_connect_backend_servers(
} }
else else
{ {
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with master %s:%d",
b->backend_server->name,
b->backend_server->port)));
/* handle connect error */ /* handle connect error */
} }
} }
@ -1929,8 +2021,7 @@ static bool execute_sescmd_in_backend(
dcb->session, dcb->session,
sescmd_cursor_clone_querybuf(scur)); sescmd_cursor_clone_querybuf(scur));
break; break;
case COM_QUIT:
case COM_QUERY: case COM_QUERY:
case COM_INIT_DB: case COM_INIT_DB:
default: default:
@ -2153,7 +2244,14 @@ static bool route_session_write(
int rc; int rc;
succp = true; succp = true;
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
succp = false;
goto return_succp;
}
for (i=0; i<router_cli_ses->rses_nbackends; i++) for (i=0; i<router_cli_ses->rses_nbackends; i++)
{ {
DCB* dcb = backend_ref[i].bref_dcb; DCB* dcb = backend_ref[i].bref_dcb;
@ -2168,6 +2266,7 @@ static bool route_session_write(
} }
} }
} }
rses_end_locked_router_action(router_cli_ses);
gwbuf_free(querybuf); gwbuf_free(querybuf);
goto return_succp; goto return_succp;
} }