dcb.c:dcb_write accept also dcb state DCB_STATE_NOPOLLING since it only means that dcb has been removed from epoll set but it is still possible to write to it. Bug #384 http://bugs.skysql.com/show_bug.cgi?id=384 session.h:added new state for SESSION, SESSION_STATE_ROUTER_READY which follows SESSION_STATE_READY. The difference is that ROUTER_READY is set only after router session is successfully created while READY means that session still lacks router. session.c:set SESSION_STATE_ROUTER_READY when router is created. mysql_backend.c:gw_read_backend_event, added SESSION_STATE_ROUTER_READY check before router session is closed. Changed chec kso that it doesn't block in infinite loop (although it shouldn't be possible anyway). mysql_backend.c:gw_error_backend_event, added similar check before session is closed.
This commit is contained in:
@ -585,9 +585,10 @@ static skygw_query_type_t resolve_query_type(
|
|||||||
LOGIF(LD, (skygw_log_write(
|
LOGIF(LD, (skygw_log_write(
|
||||||
LOGFILE_DEBUG,
|
LOGFILE_DEBUG,
|
||||||
"%lu [resolve_query_type] "
|
"%lu [resolve_query_type] "
|
||||||
"Unknown functype. Something "
|
"Unknown functype %d. Something "
|
||||||
"has gone wrong.",
|
"has gone wrong.",
|
||||||
pthread_self())));
|
pthread_self(),
|
||||||
|
ftype)));
|
||||||
break;
|
break;
|
||||||
} /**< switch */
|
} /**< switch */
|
||||||
/**< Set new query type */
|
/**< Set new query type */
|
||||||
|
@ -659,7 +659,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
|
|||||||
if (queue == NULL ||
|
if (queue == NULL ||
|
||||||
(dcb->state != DCB_STATE_ALLOC &&
|
(dcb->state != DCB_STATE_ALLOC &&
|
||||||
dcb->state != DCB_STATE_POLLING &&
|
dcb->state != DCB_STATE_POLLING &&
|
||||||
dcb->state != DCB_STATE_LISTENING))
|
dcb->state != DCB_STATE_LISTENING &&
|
||||||
|
dcb->state != DCB_STATE_NOPOLLING))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
|
|||||||
session->refcount = 1;
|
session->refcount = 1;
|
||||||
/*<
|
/*<
|
||||||
* This indicates that session is ready to be shared with backend
|
* This indicates that session is ready to be shared with backend
|
||||||
* DCBs.
|
* DCBs. Note that this doesn't mean that router is initialized yet!
|
||||||
*/
|
*/
|
||||||
session->state = SESSION_STATE_READY;
|
session->state = SESSION_STATE_READY;
|
||||||
|
|
||||||
@ -145,6 +145,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
spinlock_acquire(&session_spin);
|
spinlock_acquire(&session_spin);
|
||||||
|
session->state = SESSION_STATE_ROUTER_READY;
|
||||||
session->next = allSessions;
|
session->next = allSessions;
|
||||||
allSessions = session;
|
allSessions = session;
|
||||||
spinlock_release(&session_spin);
|
spinlock_release(&session_spin);
|
||||||
|
@ -50,11 +50,12 @@ typedef struct {
|
|||||||
} SESSION_STATS;
|
} SESSION_STATS;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SESSION_STATE_ALLOC,
|
SESSION_STATE_ALLOC, /*< for all sessions */
|
||||||
SESSION_STATE_READY,
|
SESSION_STATE_READY, /*< for router session */
|
||||||
SESSION_STATE_LISTENER,
|
SESSION_STATE_ROUTER_READY, /*< for router session */
|
||||||
SESSION_STATE_LISTENER_STOPPED,
|
SESSION_STATE_LISTENER, /*< for listener session */
|
||||||
SESSION_STATE_FREE
|
SESSION_STATE_LISTENER_STOPPED, /*< for listener session */
|
||||||
|
SESSION_STATE_FREE /*< for all sessions */
|
||||||
} session_state_t;
|
} session_state_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -299,17 +299,31 @@ static int gw_read_backend_event(DCB *dcb) {
|
|||||||
dcb->delayq,
|
dcb->delayq,
|
||||||
gwbuf_length(dcb->delayq));
|
gwbuf_length(dcb->delayq));
|
||||||
}
|
}
|
||||||
ss_dassert(session->state == SESSION_READY);
|
|
||||||
/**
|
while (session->state != SESSION_STATE_ROUTER_READY)
|
||||||
* vraa : errorHandle
|
|
||||||
* rsession may be NULL if session is being created
|
|
||||||
* in parallel by another thread.
|
|
||||||
*/
|
|
||||||
while(session->router_session == NULL)
|
|
||||||
{
|
{
|
||||||
|
ss_dassert(
|
||||||
|
session->state == SESSION_STATE_READY ||
|
||||||
|
session->state ==
|
||||||
|
SESSION_STATE_ROUTER_READY);
|
||||||
|
/**
|
||||||
|
* Session shouldn't be NULL at this point
|
||||||
|
* anymore. Just checking..
|
||||||
|
*/
|
||||||
|
if (session->client->session == NULL)
|
||||||
|
{
|
||||||
|
rc = 1;
|
||||||
|
goto return_with_lock;
|
||||||
|
}
|
||||||
usleep(1);
|
usleep(1);
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* rsession shouldn't be NULL since session
|
||||||
|
* state indicates that it was initialized
|
||||||
|
* successfully.
|
||||||
|
*/
|
||||||
rsession = session->router_session;
|
rsession = session->router_session;
|
||||||
|
ss_dassert(rsession != NULL);
|
||||||
|
|
||||||
LOGIF(LD, (skygw_log_write_flush(
|
LOGIF(LD, (skygw_log_write_flush(
|
||||||
LOGFILE_DEBUG,
|
LOGFILE_DEBUG,
|
||||||
@ -596,17 +610,19 @@ static int gw_error_backend_event(DCB *dcb) {
|
|||||||
rc = 1;
|
rc = 1;
|
||||||
}
|
}
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_DEBUG,
|
||||||
"%lu [gw_error_backend_event] Some error occurred in backend. "
|
"%lu [gw_error_backend_event] Some error occurred in backend. "
|
||||||
"rc = %d",
|
"rc = %d",
|
||||||
pthread_self(),
|
pthread_self(),
|
||||||
rc)));
|
rc)));
|
||||||
|
|
||||||
|
if (session->state == SESSION_STATE_ROUTER_READY)
|
||||||
|
{
|
||||||
rsession = session->router_session;
|
rsession = session->router_session;
|
||||||
ss_dassert(rsession != NULL);
|
|
||||||
/*<
|
/*<
|
||||||
* vraa : errorHandle
|
|
||||||
* rsession should never be NULL here.
|
* rsession should never be NULL here.
|
||||||
*/
|
*/
|
||||||
|
ss_dassert(rsession != NULL);
|
||||||
LOGIF(LD, (skygw_log_write_flush(
|
LOGIF(LD, (skygw_log_write_flush(
|
||||||
LOGFILE_DEBUG,
|
LOGFILE_DEBUG,
|
||||||
"%lu [gw_error_backend_event] "
|
"%lu [gw_error_backend_event] "
|
||||||
@ -615,7 +631,7 @@ static int gw_error_backend_event(DCB *dcb) {
|
|||||||
pthread_self())));
|
pthread_self())));
|
||||||
|
|
||||||
router->closeSession(router_instance, rsession);
|
router->closeSession(router_instance, rsession);
|
||||||
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,7 +315,6 @@ static void* newSession(
|
|||||||
client_rses->master_dcb = dcb_connect(be_master->backend_server,
|
client_rses->master_dcb = dcb_connect(be_master->backend_server,
|
||||||
session,
|
session,
|
||||||
be_master->backend_server->protocol);
|
be_master->backend_server->protocol);
|
||||||
|
|
||||||
if (client_rses->master_dcb == NULL)
|
if (client_rses->master_dcb == NULL)
|
||||||
{
|
{
|
||||||
/** Close slave connection first. */
|
/** Close slave connection first. */
|
||||||
@ -543,7 +542,7 @@ static int routeQuery(
|
|||||||
|
|
||||||
if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL))
|
if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL))
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error: Failed to route %s:%s:\"%s\" to backend server. "
|
"Error: Failed to route %s:%s:\"%s\" to backend server. "
|
||||||
"%s.",
|
"%s.",
|
||||||
|
Reference in New Issue
Block a user