Added some error handling to shardrouter.

The shardrouter now handles situations where the subservice sessions have failed and returns an error if an attempt to
query such a service is made.
This commit is contained in:
Markus Makela
2015-01-30 22:28:01 +02:00
parent 1348947faa
commit a69e1d5f01
2 changed files with 133 additions and 38 deletions

View File

@ -256,22 +256,6 @@ hashcmpfun(
return strcmp(i1, i2);
}
/*
static void*
hstrdup(void* fval)
{
char* str = (char*) fval;
return strdup(str);
}
static void*
hfree(void* fval)
{
free(fval);
return NULL;
}
*/
bool
parse_mapping_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
@ -301,6 +285,32 @@ parse_mapping_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
return rval;
}
bool subsvc_is_valid(SUBSERVICE* sub)
{
if(sub->session == NULL ||
sub->service->router == NULL)
{
return false;
}
spinlock_acquire(&sub->session->ses_lock);
session_state_t ses_state = sub->session->state;
spinlock_release(&sub->session->ses_lock);
spinlock_acquire(&sub->service->spin);
int svc_state = sub->service->state;
spinlock_release(&sub->service->spin);
if(ses_state == SESSION_STATE_ROUTER_READY &&
svc_state == SERVICE_STATE_STARTED)
{
return true;
}
return false;
}
int
gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
{
@ -323,10 +333,13 @@ gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
for(i = 0; i < session->n_subservice; i++)
{
clone = gwbuf_clone(buffer);
rval |= !SESSION_ROUTE_QUERY(session->subservice[i]->session,clone);
subsvc_set_state(session->subservice[i],SUBSVC_WAITING_RESULT|SUBSVC_QUERY_ACTIVE);
if(SUBSVC_IS_OK(session->subservice[i]))
{
clone = gwbuf_clone(buffer);
rval |= !SESSION_ROUTE_QUERY(session->subservice[i]->session,clone);
subsvc_set_state(session->subservice[i],SUBSVC_WAITING_RESULT|SUBSVC_QUERY_ACTIVE);
}
}
gwbuf_free(buffer);
@ -441,6 +454,7 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
int i,rv = 1;
bool mapped = true;
sescmd_cursor_t* scur;
GWBUF* tmp = NULL;
if(!rses_begin_locked_router_action(rses))
{
@ -448,7 +462,14 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
}
subsvc = get_subsvc_from_ses(rses,session);
subsvc_clear_state(subsvc,SUBSVC_WAITING_RESULT|SUBSVC_QUERY_ACTIVE);
if(SUBSVC_IS_WAITING(subsvc))
{
subsvc_clear_state(subsvc,SUBSVC_WAITING_RESULT);
}
subsvc_clear_state(subsvc,SUBSVC_QUERY_ACTIVE);
if(!rses->hash_init)
{
subsvc_set_state(subsvc,SUBSVC_MAPPED);
@ -456,7 +477,7 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
for(i = 0;i<rses->n_subservice;i++)
{
if(!SUBSVC_IS_MAPPED(rses->subservice[i]))
if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i]))
{
mapped = false;
break;
@ -468,7 +489,8 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
if(mapped)
{
rses->hash_init = true;
rses->hash_init = true;
tmp = rses->queue;
}
goto retblock;
@ -494,6 +516,10 @@ filterReply (FILTER* instance, void *session, GWBUF *reply)
retblock:
rses_end_locked_router_action(rses);
if(tmp)
{
routeQuery((ROUTER*)rses->router,rses,tmp);
}
return rv;
}
@ -779,24 +805,60 @@ newSession(
}
/* TODO: add NULL value checks */
client_rses->subservice[i] = subsvc;
subsvc->scur = calloc(1,sizeof(sescmd_cursor_t));
if(subsvc->scur == NULL)
{
subsvc_set_state(subsvc,SUBSVC_FAILED);
skygw_log_write_flush(LOGFILE_ERROR,"Error : Memory allocation failed in shardrouter.");
continue;
}
subsvc->scur->scmd_cur_rses = client_rses;
subsvc->scur->scmd_cur_ptr_property = client_rses->rses_properties;
subsvc->service = router->services[i];
subsvc->dcb =dcb_clone(client_rses->rses_client_dcb);
subsvc->session = session_alloc(subsvc->service,subsvc->dcb);
if(subsvc->session == NULL){
subsvc->state = SUBSVC_FAILED;
subsvc->dcb = dcb_clone(client_rses->rses_client_dcb);
if(subsvc->dcb == NULL){
subsvc_set_state(subsvc,SUBSVC_FAILED);
skygw_log_write_flush(LOGFILE_ERROR,"Error : Failed to clone client DCB in shardrouter.");
continue;
}
subsvc_set_state(subsvc,SUBSVC_OK);
subsvc->session = session_alloc(subsvc->service,subsvc->dcb);
if(subsvc->session == NULL){
dcb_close(subsvc->dcb);
subsvc->dcb = NULL;
subsvc_set_state(subsvc,SUBSVC_FAILED);
skygw_log_write_flush(LOGFILE_ERROR,"Error : Failed to create subsession for service %s in shardrouter.",subsvc->service->name);
continue;
}
dummy_filterdef = filter_alloc("tee_dummy","tee_dummy");
if(dummy_filterdef == NULL)
{
subsvc_set_state(subsvc,SUBSVC_FAILED);
skygw_log_write_flush(LOGFILE_ERROR,"Error : Failed to allocate filter definition in shardrouter.");
continue;
}
dummy_filterdef->obj = &dummyObject;
dummy_filterdef->filter = (FILTER*)client_rses;
dummy_upstream = filterUpstream(dummy_filterdef,subsvc->session,&subsvc->session->tail);
if(dummy_upstream == NULL)
{
subsvc_set_state(subsvc,SUBSVC_FAILED);
skygw_log_write_flush(LOGFILE_ERROR,"Error : Failed to set filterUpstream in shardrouter.");
continue;
}
subsvc->session->tail = *dummy_upstream;
client_rses->subservice[i] = subsvc;
subsvc_set_state(subsvc,SUBSVC_OK);
free(dummy_upstream);
}
@ -893,9 +955,12 @@ closeSession(
rtr = router_cli_ses->subservice[i]->service->router;
rinst = router_cli_ses->subservice[i]->service->router_instance;
ses = router_cli_ses->subservice[i]->session;
rses = ses->router_session;
ses->state = SESSION_STATE_STOPPING;
rtr->closeSession(rinst,rses);
if(ses != NULL)
{
rses = ses->router_session;
ses->state = SESSION_STATE_STOPPING;
rtr->closeSession(rinst,rses);
}
router_cli_ses->subservice[i]->state = SUBSVC_CLOSED;
}
@ -1197,6 +1262,11 @@ routeQuery(ROUTER* instance,
}
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
if(!router_cli_ses->hash_init)
{
router_cli_ses->queue = querybuf;
return 1;
}
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
@ -1497,7 +1567,7 @@ routeQuery(ROUTER* instance,
}
}
if(succp) /*< Have DCB of the target backend */
if(succp) /*< Have SUBSERVICE of the target service */
{
sescmd_cursor_t* scur;
scur = target_subsvc->scur;
@ -1532,8 +1602,13 @@ routeQuery(ROUTER* instance,
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query failed.")));
ret = 0;
}
}
else
{
ret = 0;
}
rses_end_locked_router_action(router_cli_ses);
retblock:
@ -2092,7 +2167,13 @@ execute_sescmd_in_backend(SUBSERVICE* subsvc)
sescmd_cursor_t* scur;
if(SUBSVC_IS_CLOSED(subsvc))
if(SUBSVC_IS_CLOSED(subsvc) || !SUBSVC_IS_OK(subsvc))
{
succp = false;
goto return_succp;
}
if(!subsvc_is_valid(subsvc))
{
succp = false;
goto return_succp;
@ -2336,7 +2417,7 @@ route_session_write(
i+1 >= router_cli_ses->n_subservice ? "<" : "")));
}
if(!SUBSVC_IS_CLOSED(subsvc))
if(!SUBSVC_IS_CLOSED(subsvc) && SUBSVC_IS_OK(subsvc))
{
rc = SESSION_ROUTE_QUERY(subsvc->session,gwbuf_clone(querybuf));
@ -2532,8 +2613,20 @@ get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target)
{
if(strcmp(session->subservice[i]->service->name,target) == 0)
{
*subsvc = session->subservice[i];
return true;
if (SUBSVC_IS_OK(session->subservice[i]))
{
if(subsvc_is_valid(session->subservice[i])){
*subsvc = session->subservice[i];
return true;
}
/**
* The service has failed
*/
subsvc_set_state(session->subservice[i],SUBSVC_FAILED);
}
}
}