diff --git a/server/modules/include/shardrouter.h b/server/modules/include/shardrouter.h index 571670ace..83b3d1da0 100644 --- a/server/modules/include/shardrouter.h +++ b/server/modules/include/shardrouter.h @@ -173,6 +173,7 @@ struct router_client_session { bool rses_closed; /*< true when closeSession is called */ DCB* rses_client_dcb; DCB* dummy_dcb; /* DCB used to send the client write messages from the router itself */ + DCB* queue_dcb; /* DCB used to send queued queries to the router */ MYSQL_session* rses_mysql_session; /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; diff --git a/server/modules/routing/dbshard/shardrouter.c b/server/modules/routing/dbshard/shardrouter.c index 0784fdb16..6eee8e78a 100644 --- a/server/modules/routing/dbshard/shardrouter.c +++ b/server/modules/routing/dbshard/shardrouter.c @@ -480,6 +480,7 @@ filterReply(FILTER* instance, void *session, GWBUF *reply) sescmd_cursor_t* scur; GWBUF* tmp = NULL; + skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: filterReply mapped: %s",rses->hash_init ? "true" : "false"); if(!rses_begin_locked_router_action(rses)) { @@ -554,8 +555,9 @@ filterReply(FILTER* instance, void *session, GWBUF *reply) retblock: rses_end_locked_router_action(rses); if(tmp) - { - rv = routeQuery((ROUTER*) rses->router, rses, tmp); + { + poll_add_epollin_event_to_dcb(rses->queue_dcb,tmp); + //routeQuery((ROUTER*) rses->router, rses, tmp); } return rv; } @@ -566,7 +568,7 @@ retblock: * @param dcb The dummy DCB * @return 1 on success, 0 on failure */ -int fakeRead(DCB* dcb) +int fakeReply(DCB* dcb) { if(dcb->dcb_readqueue) { @@ -578,6 +580,27 @@ int fakeRead(DCB* dcb) } + +/** + * This function reads the DCB's readqueue and sends it as a query directly to the router. + * The function is used to route queued queries to the subservices when replies are received. + * @param dcb The dummy DCB + * @return 1 on success, 0 on failure + */ +int fakeQuery(DCB* dcb) +{ + if(dcb->dcb_readqueue) + { + GWBUF* tmp = dcb->dcb_readqueue; + void* rinst = dcb->session->service->router_instance; + void *rses = dcb->session->router_session; + + dcb->dcb_readqueue = NULL; + return dcb->session->service->router->routeQuery(rinst,rses,tmp); + } + return 1; +} + /** * Implementation of the mandatory version entry point * @@ -819,9 +842,15 @@ newSession( client_rses->rses_transaction_active = false; client_rses->session = session; client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); - client_rses->dummy_dcb->func.read = fakeRead; + client_rses->dummy_dcb->func.read = fakeReply; client_rses->dummy_dcb->state = DCB_STATE_POLLING; client_rses->dummy_dcb->session = session; + + client_rses->queue_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + client_rses->queue_dcb->func.read = fakeQuery; + client_rses->queue_dcb->state = DCB_STATE_POLLING; + client_rses->queue_dcb->session = session; + spinlock_init(&client_rses->rses_lock); client_rses->subservice = calloc(router->n_services, sizeof(SUBSERVICE*)); @@ -1000,6 +1029,10 @@ closeSession( } router_cli_ses->subservice[i]->state = SUBSVC_CLOSED; } + router_cli_ses->dummy_dcb->session = NULL; + router_cli_ses->queue_dcb->session = NULL; + dcb_close(router_cli_ses->dummy_dcb); + dcb_close(router_cli_ses->queue_dcb); /** Unlock */ rses_end_locked_router_action(router_cli_ses); @@ -1289,7 +1322,7 @@ routeQuery(ROUTER* instance, route_target_t route_target = TARGET_UNDEFINED; bool succp = false; char* tname = NULL; - + skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: routeQuery"); CHK_CLIENT_RSES(router_cli_ses); /** Dirty read for quick check if router is closed. */ @@ -1311,6 +1344,7 @@ routeQuery(ROUTER* instance, if(!router_cli_ses->hash_init) { + skygw_log_write(LOGFILE_TRACE,"shardrouter: got a query while mapping databases."); GWBUF* tmp = router_cli_ses->queue; while(tmp && tmp->next) @@ -1421,31 +1455,6 @@ routeQuery(ROUTER* instance, } } - - if(LOG_IS_ENABLED(LOGFILE_TRACE)) - { - uint8_t* packet = GWBUF_DATA(querybuf); - unsigned char ptype = packet[4]; - size_t len = MIN(GWBUF_LENGTH(querybuf), - MYSQL_GET_PACKET_LEN((unsigned char *) querybuf->start) - 1); - char* data = (char*) &packet[5]; - char* contentstr = strndup(data, len); - char* qtypestr = skygw_get_qtype_str(qtype); - - skygw_log_write(LOGFILE_TRACE, - "> Autocommit: %s, trx is %s, cmd: %s, type: %s, " - "stmt: %s%s %s", - (router_cli_ses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"), - (router_cli_ses->rses_transaction_active ? "[open]" : "[not open]"), - STRPACKETTYPE(ptype), - (qtypestr == NULL ? "N/A" : qtypestr), - contentstr, - (querybuf->hint == NULL ? "" : ", Hint:"), - (querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type))); - - free(contentstr); - free(qtypestr); - } /** * Find out whether the query should be routed to single server or to * all of them.