Fixed a deadlock when the same thread tried to route a reply and a query at the same time.

This commit is contained in:
Markus Makela
2015-02-01 21:36:54 +02:00
parent d9ab0261b9
commit b6cd0916a0
2 changed files with 40 additions and 30 deletions

View File

@ -173,6 +173,7 @@ struct router_client_session {
bool rses_closed; /*< true when closeSession is called */ bool rses_closed; /*< true when closeSession is called */
DCB* rses_client_dcb; DCB* rses_client_dcb;
DCB* dummy_dcb; /* DCB used to send the client write messages from the router itself */ DCB* dummy_dcb; /* DCB used to send the client write messages from the router itself */
DCB* queue_dcb; /* DCB used to send queued queries to the router */
MYSQL_session* rses_mysql_session; MYSQL_session* rses_mysql_session;
/** Properties listed by their type */ /** Properties listed by their type */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];

View File

@ -480,6 +480,7 @@ filterReply(FILTER* instance, void *session, GWBUF *reply)
sescmd_cursor_t* scur; sescmd_cursor_t* scur;
GWBUF* tmp = NULL; GWBUF* tmp = NULL;
skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: filterReply mapped: %s",rses->hash_init ? "true" : "false");
if(!rses_begin_locked_router_action(rses)) if(!rses_begin_locked_router_action(rses))
{ {
@ -555,7 +556,8 @@ retblock:
rses_end_locked_router_action(rses); rses_end_locked_router_action(rses);
if(tmp) if(tmp)
{ {
rv = routeQuery((ROUTER*) rses->router, rses, tmp); poll_add_epollin_event_to_dcb(rses->queue_dcb,tmp);
//routeQuery((ROUTER*) rses->router, rses, tmp);
} }
return rv; return rv;
} }
@ -566,7 +568,7 @@ retblock:
* @param dcb The dummy DCB * @param dcb The dummy DCB
* @return 1 on success, 0 on failure * @return 1 on success, 0 on failure
*/ */
int fakeRead(DCB* dcb) int fakeReply(DCB* dcb)
{ {
if(dcb->dcb_readqueue) if(dcb->dcb_readqueue)
{ {
@ -578,6 +580,27 @@ int fakeRead(DCB* dcb)
} }
/**
* This function reads the DCB's readqueue and sends it as a query directly to the router.
* The function is used to route queued queries to the subservices when replies are received.
* @param dcb The dummy DCB
* @return 1 on success, 0 on failure
*/
int fakeQuery(DCB* dcb)
{
if(dcb->dcb_readqueue)
{
GWBUF* tmp = dcb->dcb_readqueue;
void* rinst = dcb->session->service->router_instance;
void *rses = dcb->session->router_session;
dcb->dcb_readqueue = NULL;
return dcb->session->service->router->routeQuery(rinst,rses,tmp);
}
return 1;
}
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
* *
@ -819,9 +842,15 @@ newSession(
client_rses->rses_transaction_active = false; client_rses->rses_transaction_active = false;
client_rses->session = session; client_rses->session = session;
client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dummy_dcb->func.read = fakeRead; client_rses->dummy_dcb->func.read = fakeReply;
client_rses->dummy_dcb->state = DCB_STATE_POLLING; client_rses->dummy_dcb->state = DCB_STATE_POLLING;
client_rses->dummy_dcb->session = session; client_rses->dummy_dcb->session = session;
client_rses->queue_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->queue_dcb->func.read = fakeQuery;
client_rses->queue_dcb->state = DCB_STATE_POLLING;
client_rses->queue_dcb->session = session;
spinlock_init(&client_rses->rses_lock); spinlock_init(&client_rses->rses_lock);
client_rses->subservice = calloc(router->n_services, sizeof(SUBSERVICE*)); client_rses->subservice = calloc(router->n_services, sizeof(SUBSERVICE*));
@ -1000,6 +1029,10 @@ closeSession(
} }
router_cli_ses->subservice[i]->state = SUBSVC_CLOSED; router_cli_ses->subservice[i]->state = SUBSVC_CLOSED;
} }
router_cli_ses->dummy_dcb->session = NULL;
router_cli_ses->queue_dcb->session = NULL;
dcb_close(router_cli_ses->dummy_dcb);
dcb_close(router_cli_ses->queue_dcb);
/** Unlock */ /** Unlock */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1289,7 +1322,7 @@ routeQuery(ROUTER* instance,
route_target_t route_target = TARGET_UNDEFINED; route_target_t route_target = TARGET_UNDEFINED;
bool succp = false; bool succp = false;
char* tname = NULL; char* tname = NULL;
skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: routeQuery");
CHK_CLIENT_RSES(router_cli_ses); CHK_CLIENT_RSES(router_cli_ses);
/** Dirty read for quick check if router is closed. */ /** Dirty read for quick check if router is closed. */
@ -1311,6 +1344,7 @@ routeQuery(ROUTER* instance,
if(!router_cli_ses->hash_init) if(!router_cli_ses->hash_init)
{ {
skygw_log_write(LOGFILE_TRACE,"shardrouter: got a query while mapping databases.");
GWBUF* tmp = router_cli_ses->queue; GWBUF* tmp = router_cli_ses->queue;
while(tmp && tmp->next) while(tmp && tmp->next)
@ -1421,31 +1455,6 @@ routeQuery(ROUTER* instance,
} }
} }
if(LOG_IS_ENABLED(LOGFILE_TRACE))
{
uint8_t* packet = GWBUF_DATA(querybuf);
unsigned char ptype = packet[4];
size_t len = MIN(GWBUF_LENGTH(querybuf),
MYSQL_GET_PACKET_LEN((unsigned char *) querybuf->start) - 1);
char* data = (char*) &packet[5];
char* contentstr = strndup(data, len);
char* qtypestr = skygw_get_qtype_str(qtype);
skygw_log_write(LOGFILE_TRACE,
"> Autocommit: %s, trx is %s, cmd: %s, type: %s, "
"stmt: %s%s %s",
(router_cli_ses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"),
(router_cli_ses->rses_transaction_active ? "[open]" : "[not open]"),
STRPACKETTYPE(ptype),
(qtypestr == NULL ? "N/A" : qtypestr),
contentstr,
(querybuf->hint == NULL ? "" : ", Hint:"),
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)));
free(contentstr);
free(qtypestr);
}
/** /**
* Find out whether the query should be routed to single server or to * Find out whether the query should be routed to single server or to
* all of them. * all of them.