Code clean up

This commit is contained in:
VilhoRaatikka
2014-06-17 16:15:19 +03:00
parent 49163a4c43
commit e7fa80a591
4 changed files with 179 additions and 115 deletions

View File

@ -1074,7 +1074,6 @@ dcb_close(DCB *dcb)
{ {
dcb->func.close(dcb); dcb->func.close(dcb);
} }
dcb_call_callback(dcb, DCB_REASON_CLOSE); dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) { if (rc == 0) {

View File

@ -257,6 +257,7 @@ static int gw_read_backend_event(DCB *dcb) {
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session;
if (backend_protocol->state == MYSQL_AUTH_RECV) { if (backend_protocol->state == MYSQL_AUTH_RECV) {
/*< /*<
@ -323,59 +324,48 @@ static int gw_read_backend_event(DCB *dcb) {
*/ */
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
spinlock_acquire(&dcb->delayqlock); spinlock_acquire(&dcb->delayqlock);
/*<
* vraa : errorHandle if (dcb->delayq != NULL)
* check the delayq before the reply {
*/
if (dcb->delayq != NULL) {
/* send an error to the client */
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Connection to backend lost.");
// consume all the delay queue
while ((dcb->delayq = gwbuf_consume( while ((dcb->delayq = gwbuf_consume(
dcb->delayq, dcb->delayq,
GWBUF_LENGTH(dcb->delayq))) != NULL); GWBUF_LENGTH(dcb->delayq))) != NULL);
} }
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
/** Whole session is being closed so return. */
if (session->state == SESSION_STATE_STOPPING)
{ {
goto return_rc; GWBUF* errbuf;
} bool succp;
/* try reload users' table for next connection */
service_refresh_users(dcb->session->client->service);
while (session->state != SESSION_STATE_ROUTER_READY && #if defined(SS_DEBUG)
session->state != SESSION_STATE_STOPPING) LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
"Authentication with backend failed. "
"Session will be closed.");
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_REPLY_CLIENT,
&succp);
ss_dassert(!succp);
if (session != NULL)
{ {
ss_dassert(
session->state == SESSION_STATE_READY ||
session->state ==
SESSION_STATE_ROUTER_READY ||
session->state == SESSION_STATE_STOPPING);
/**
* Session shouldn't be NULL at this point
* anymore. Just checking..
*/
if (session->client->session == NULL)
{
rc = 1;
goto return_rc;
}
usleep(1);
}
spinlock_acquire(&session->ses_lock); spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING; session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock); spinlock_release(&session->ses_lock);
/** }
* Start terminating the session dcb_close(dcb);
* by closing the client. }
*/
dcb_close(session->client);
rc = 1; rc = 1;
goto return_rc; goto return_rc;
} }
@ -436,6 +426,14 @@ static int gw_read_backend_event(DCB *dcb) {
* - go through all servers and select one according to * - go through all servers and select one according to
* the criteria that user specified in the beginning. * the criteria that user specified in the beginning.
*/ */
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling #2.")));
#endif
errbuf = mysql_create_custom_error( errbuf = mysql_create_custom_error(
1, 1,
0, 0,
@ -678,6 +676,13 @@ static int gw_error_backend_event(DCB *dcb)
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend error event handling.")));
#endif
errbuf = mysql_create_custom_error( errbuf = mysql_create_custom_error(
1, 1,
0, 0,
@ -826,6 +831,13 @@ gw_backend_hangup(DCB *dcb)
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup error handling.")));
#endif
errbuf = mysql_create_custom_error( errbuf = mysql_create_custom_error(
1, 1,
0, 0,
@ -840,6 +852,12 @@ gw_backend_hangup(DCB *dcb)
/** There are not required backends available, close session. */ /** There are not required backends available, close session. */
if (!succp) { if (!succp) {
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup -> closing session.")));
#endif
spinlock_acquire(&session->ses_lock); spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING; session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock); spinlock_release(&session->ses_lock);
@ -937,24 +955,51 @@ static int backend_write_delayqueue(DCB *dcb)
rc = dcb_write(dcb, localq); rc = dcb_write(dcb, localq);
} }
if (rc == 0) { if (rc == 0)
{
GWBUF* errbuf;
bool succp;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
int receive_rc = 0;
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : failed to write buffered data to back-end " "Backend write delayqueue error handling.")));
"server. Buffer was empty of back-end was disconnected " #endif
"during operation."))); errbuf = mysql_create_custom_error(
mysql_send_custom_error(
dcb->session->client,
1, 1,
0, 0,
"Failed to write buffered data to back-end server. " "Failed to write buffered data to back-end server. "
"Buffer was empty or back-end was disconnected during " "Buffer was empty or back-end was disconnected during "
"operation."); "operation. Session will be closed.");
dcb->session->state = SESSION_STATE_STOPPING; router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
if (!succp)
{
if (session != NULL)
{
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
dcb_close(dcb); dcb_close(dcb);
} }
}
return rc; return rc;
} }

View File

@ -522,12 +522,6 @@ int gw_read_client_event(
if (rc < 0) if (rc < 0)
{ {
if (dcb->session != NULL)
{
spinlock_acquire(&dcb->session->ses_lock);
dcb->session->state = SESSION_STATE_STOPPING;
spinlock_release(&dcb->session->ses_lock);
}
dcb_close(dcb); dcb_close(dcb);
} }
nbytes_read = gwbuf_length(read_buffer); nbytes_read = gwbuf_length(read_buffer);
@ -714,16 +708,16 @@ int gw_read_client_event(
* close router session and that closes * close router session and that closes
* backends * backends
*/ */
if (session != NULL)
{
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
dcb_close(dcb); dcb_close(dcb);
} }
else else
{ {
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client read error handling.")));
#endif
/* Send a custom error as MySQL command reply */ /* Send a custom error as MySQL command reply */
mysql_send_custom_error( mysql_send_custom_error(
dcb, dcb,
@ -731,7 +725,6 @@ int gw_read_client_event(
0, 0,
"Can't route query. Connection to " "Can't route query. Connection to "
"backend lost"); "backend lost");
protocol->state = MYSQL_IDLE;
} }
rc = 1; rc = 1;
/** Free buffer */ /** Free buffer */
@ -780,10 +773,6 @@ int gw_read_client_event(
/** /**
* Close router session which causes closing of backends. * Close router session which causes closing of backends.
*/ */
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
dcb_close(dcb); dcb_close(dcb);
} }
else else
@ -819,7 +808,11 @@ int gw_read_client_event(
1, 1,
0, 0,
"Write to backend failed. Session closed."); "Write to backend failed. Session closed.");
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client routing error handling.")));
#endif
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Routing the query failed. " "Error : Routing the query failed. "
@ -831,19 +824,11 @@ int gw_read_client_event(
dcb, dcb,
ERRACT_REPLY_CLIENT, ERRACT_REPLY_CLIENT,
&succp); &succp);
ss_dassert(!succp);
if (!succp)
{
if (session != NULL)
{
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
dcb_close(dcb); dcb_close(dcb);
} }
} }
}
goto return_rc; goto return_rc;
} /* MYSQL_IDLE */ } /* MYSQL_IDLE */
break; break;
@ -1307,12 +1292,11 @@ static int gw_error_client_event(
session = dcb->session; session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
if (session != NULL) #if defined(SS_DEBUG)
{ LOGIF(LE, (skygw_log_write_flush(
spinlock_acquire(&session->ses_lock); LOGFILE_ERROR,
session->state = SESSION_STATE_STOPPING; "Client error event handling.")));
spinlock_release(&session->ses_lock); #endif
}
dcb_close(dcb); dcb_close(dcb);
return 1; return 1;
} }
@ -1348,7 +1332,7 @@ gw_client_close(DCB *dcb)
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
/** Close router session and all its connections */
router->closeSession(router_instance, rsession); router->closeSession(router_instance, rsession);
} }
return 1; return 1;
@ -1372,12 +1356,11 @@ gw_client_hangup_event(DCB *dcb)
session = dcb->session; session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
if (session != NULL) #if defined(SS_DEBUG)
{ LOGIF(LE, (skygw_log_write_flush(
spinlock_acquire(&session->ses_lock); LOGFILE_ERROR,
session->state = SESSION_STATE_STOPPING; "Client hangup error handling.")));
spinlock_release(&session->ses_lock); #endif
}
dcb_close(dcb); dcb_close(dcb);
return 1; return 1;
} }

View File

@ -610,6 +610,7 @@ static void* newSession(
} }
/** Copy backend pointers to router session. */ /** Copy backend pointers to router session. */
client_rses->rses_master_ref = master_ref; client_rses->rses_master_ref = master_ref;
ss_dassert(SERVER_IS_MASTER(master_ref->bref_backend->backend_server));
client_rses->rses_backend_ref = backend_ref; client_rses->rses_backend_ref = backend_ref;
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
@ -1040,8 +1041,6 @@ static int routeQuery(
{ {
ret = 1; ret = 1;
} }
ss_dassert(succp);
ss_dassert(ret == 1);
goto return_ret; goto return_ret;
} }
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
@ -1366,6 +1365,7 @@ static void clientReply(
{ {
/** Write reply to client DCB */ /** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf); client_dcb->func.write(client_dcb, writebuf);
bref_clear_state(backend_ref, BREF_WAITING_RESULT);
} }
lock_failed: lock_failed:
@ -1488,6 +1488,7 @@ static bool select_connect_backend_servers(
master_found = true; master_found = true;
master_connected = true; master_connected = true;
ss_dassert(SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server));
} }
/** New session or master failure case */ /** New session or master failure case */
else else
@ -1521,7 +1522,7 @@ static bool select_connect_backend_servers(
is_synced_master = false; is_synced_master = false;
} }
#if defined(EXTRA_DEBUGGING) #if defined(EXTRA_SS_DEBUG)
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
for (i=0; i<router_nservers; i++) for (i=0; i<router_nservers; i++)
@ -1529,18 +1530,23 @@ static bool select_connect_backend_servers(
BACKEND* b = backend_ref[i].bref_backend; BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"%s %d:%d", "master bref %p bref %p %d %s %d:%d",
*p_master_ref,
&backend_ref[i],
backend_ref[i].bref_state,
b->backend_server->name, b->backend_server->name,
b->backend_server->port, b->backend_server->port,
b->backend_conn_count))); b->backend_conn_count)));
} }
#endif #endif
ss_dassert(!master_connected ||
SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server));
/** /**
* Sort the pointer list to servers according to connection counts. As * Sort the pointer list to servers according to connection counts. As
* a consequence those backends having least connections are in the * a consequence those backends having least connections are in the
* beginning of the list. * beginning of the list.
*/ */
qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p); qsort(backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p);
if (LOG_IS_ENABLED(LOGFILE_TRACE)) if (LOG_IS_ENABLED(LOGFILE_TRACE))
{ {
@ -1600,7 +1606,6 @@ static bool select_connect_backend_servers(
b->backend_conn_count, b->backend_conn_count,
router->bitmask))); router->bitmask)));
if (SERVER_IS_RUNNING(b->backend_server) && if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) == ((b->backend_server->status & router->bitmask) ==
router->bitvalue)) router->bitvalue))
@ -1667,9 +1672,14 @@ static bool select_connect_backend_servers(
} }
} }
} }
else if (!master_connected && else if (SERVER_IS_MASTER(b->backend_server))
(SERVER_IS_MASTER(b->backend_server)))
{ {
*p_master_ref = &backend_ref[i];
if (master_connected)
{
continue;
}
master_found = true; master_found = true;
backend_ref[i].bref_dcb = dcb_connect( backend_ref[i].bref_dcb = dcb_connect(
@ -1691,7 +1701,7 @@ static bool select_connect_backend_servers(
BREF_NOT_USED); BREF_NOT_USED);
bref_set_state(&backend_ref[i], bref_set_state(&backend_ref[i],
BREF_IN_USE); BREF_IN_USE);
*p_master_ref = &backend_ref[i];
/** Increase backend connection counter */ /** Increase backend connection counter */
/** Increase backend connection counter */ /** Increase backend connection counter */
atomic_add(&b->backend_server->stats.n_current, 1); atomic_add(&b->backend_server->stats.n_current, 1);
@ -1713,6 +1723,26 @@ static bool select_connect_backend_servers(
} }
} /*< for */ } /*< for */
#if defined(EXTRA_SS_DEBUG)
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns after ordering:")));
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"master bref %p bref %p %d %s %d:%d",
*p_master_ref,
&backend_ref[i],
backend_ref[i].bref_state,
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count)));
}
ss_dassert(!master_connected ||
SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server));
#endif
/** /**
* Successful cases * Successful cases
*/ */
@ -2695,6 +2725,13 @@ static void handleError (
bref = get_bref_from_dcb(rses, backend_dcb); bref = get_bref_from_dcb(rses, backend_dcb);
/** failed DCB has already been replaced */
if (bref == NULL)
{
rses_end_locked_router_action(rses);
*succp = true;
return;
}
/** /**
* Error handler is already called for this DCB because * Error handler is already called for this DCB because
* it's not polling anymore. It can be assumed that * it's not polling anymore. It can be assumed that
@ -2714,9 +2751,9 @@ static void handleError (
DCB* client_dcb; DCB* client_dcb;
client_dcb = session->client; client_dcb = session->client;
client_dcb->func.write(client_dcb, errmsgbuf); client_dcb->func.write(client_dcb, errmsgbuf);
bref_clear_state(bref, BREF_WAITING_RESULT);
} }
bref_clear_state(bref, BREF_IN_USE); bref_clear_state(bref, BREF_IN_USE);
bref_clear_state(bref, BREF_WAITING_RESULT);
bref_set_state(bref, BREF_NOT_USED); bref_set_state(bref, BREF_NOT_USED);
bref_set_state(bref, BREF_CLOSED); bref_set_state(bref, BREF_CLOSED);
/** /**