diff --git a/server/modules/include/shardrouter.h b/server/modules/include/shardrouter.h index f47e074a4..dfd83d520 100644 --- a/server/modules/include/shardrouter.h +++ b/server/modules/include/shardrouter.h @@ -159,7 +159,20 @@ typedef struct subservice_t{ int state; int n_res_waiting; bool mapped; -}SUBSERVICE; +}SUBSERVICE; + +/** + * Bitmask values for the router session's initialization. These values are used + * to prevent responses from internal commands being forwarded to the client. + */ +typedef enum shard_init_mask +{ + INIT_READY = 0x0, + INIT_MAPPING = 0x1, + INIT_USE_DB = 0x02, + INIT_UNINT = 0x04 + +} shard_init_mask_t; /** * The client session structure used within this router. @@ -172,8 +185,8 @@ struct router_client_session { int rses_versno; /*< even = no active update, else odd. not used 4/14 */ bool rses_closed; /*< true when closeSession is called */ DCB* rses_client_dcb; - DCB* dummy_dcb; /* DCB used to send the client write messages from the router itself */ - DCB* queue_dcb; /* DCB used to send queued queries to the router */ + DCB* replydcb; /* DCB used to send the client write messages from the router itself */ + DCB* routedcb; /* DCB used to send queued queries to the router */ MYSQL_session* rses_mysql_session; /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; @@ -190,6 +203,8 @@ struct router_client_session { bool hash_init; SESSION* session; GWBUF* queue; + char connect_db[MYSQL_DATABASE_MAXLEN+1]; /*< Database the user was trying to connect to */ + shard_init_mask_t init; /*< Initialization state bitmask */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif diff --git a/server/modules/routing/schemarouter/shardrouter.c b/server/modules/routing/schemarouter/shardrouter.c index c4a9e88ed..5d76ca9ad 100644 --- a/server/modules/routing/schemarouter/shardrouter.c +++ b/server/modules/routing/schemarouter/shardrouter.c @@ -119,7 +119,6 @@ static route_target_t get_shard_route_target( static uint8_t getCapabilities(ROUTER* inst, void* router_session); -//bool parse_db_ignore_list(ROUTER_INSTANCE* router, char* param); static void subsvc_clear_state(SUBSERVICE* svc,subsvc_state_t state); static void subsvc_set_state(SUBSERVICE* svc,subsvc_state_t state); static bool get_shard_subsvc(SUBSERVICE** subsvc,ROUTER_CLIENT_SES* session,char* target); @@ -414,8 +413,14 @@ bool subsvc_is_valid(SUBSERVICE* sub) return false; } +/** + * Map the databases of all subservices. + * @param inst router instance + * @param session router session + * @return 0 on success, 1 on error + */ int -gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) +gen_subsvc_dblist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) { const char* query = "SHOW DATABASES;"; GWBUF *buffer, *clone; @@ -475,7 +480,6 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* if(sz > 0) { - has_dbs = true; for(i = 0; i < sz; i++) { @@ -489,9 +493,8 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* else { skygw_log_write(LOGFILE_TRACE,"shardrouter: Query targets database '%s' on server '%s",dbnms[i],rval); + has_dbs = true; } - for(j = i; j < sz; j++) free(dbnms[j]); - break; } free(dbnms[i]); } @@ -554,6 +557,10 @@ get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* */ rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db); + if(rval) + { + skygw_log_write(LOGFILE_TRACE,"shardrouter: Using active database '%s'",client->rses_mysql_session->db); + } } return rval; @@ -612,64 +619,160 @@ filterReply(FILTER* instance, void *session, GWBUF *reply) ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) instance; SUBSERVICE* subsvc; int i, rv = 1; - bool mapped = true; sescmd_cursor_t* scur; GWBUF* tmp = NULL; - skygw_log_write_flush(LOGFILE_TRACE,"shardrouter: filterReply mapped: %s",rses->hash_init ? "true" : "false"); - if(!rses_begin_locked_router_action(rses)) { + tmp = reply; + while((tmp = gwbuf_consume(tmp,gwbuf_length(tmp)))); return 0; } subsvc = get_subsvc_from_ses(rses, session); - if(SUBSVC_IS_WAITING(subsvc)) + if(rses->init & INIT_MAPPING) { - subsvc_clear_state(subsvc, SUBSVC_WAITING_RESULT); + bool mapped = true, logged = false; + int i; + + for(i = 0; i < rses->n_subservice; i++) + { + + if(subsvc->session == rses->subservice[i]->session && + !SUBSVC_IS_MAPPED(rses->subservice[i])) + { + rses->subservice[i]->state |= SUBSVC_MAPPED; + parse_mapping_response(rses, + rses->subservice[i]->service->name, + reply); + + } + + if(SUBSVC_IS_OK(rses->subservice[i]) && + !SUBSVC_IS_MAPPED(rses->subservice[i])) + { + mapped = false; + if(!logged) + { +/* + skygw_log_write(LOGFILE_DEBUG,"schemarouter: Still waiting for reply to SHOW DATABASES from %s for session %p", + bkrf[i].bref_backend->backend_server->unique_name, + rses->rses_client_dcb->session); +*/ + logged = true; + } + } + } + + if(mapped) + { + /* + * Check if the session is reconnecting with a database name + * that is not in the hashtable. If the database is not found + * then close the session. + */ + + rses->init &= ~INIT_MAPPING; + + if(rses->init & INIT_USE_DB) + { + char* target; + + if((target = hashtable_fetch(rses->dbhash, + rses->connect_db)) == NULL) + { + skygw_log_write_flush(LOGFILE_TRACE,"schemarouter: Connecting to a non-existent database '%s'", + rses->connect_db); + rses->rses_closed = true; + if(rses->queue) + { + while((rses->queue = gwbuf_consume( + rses->queue,gwbuf_length(rses->queue)))); + } + rses_end_locked_router_action(rses); + goto retblock; + } + + /* Send a COM_INIT_DB packet to the server with the right database + * and set it as the client's active database */ + + unsigned int qlen; + GWBUF* buffer; + + qlen = strlen(rses->connect_db); + buffer = gwbuf_alloc(qlen + 5); + if(buffer == NULL) + { + skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed."); + rses->rses_closed = true; + if(rses->queue) + gwbuf_free(rses->queue); + goto retblock; + } + + gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1); + gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL); + *((unsigned char*)buffer->start + 3) = 0x0; + *((unsigned char*)buffer->start + 4) = 0x2; + memcpy(buffer->start+5,rses->connect_db,qlen); + DCB* dcb = NULL; + + SESSION_ROUTE_QUERY(subsvc->session,buffer); + + goto retblock; + } + + if(rses->queue) + { + GWBUF* tmp = rses->queue; + rses->queue = rses->queue->next; + tmp->next = NULL; + char* querystr = modutil_get_SQL(tmp); + skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s", + rses->rses_client_dcb->session, + querystr); + poll_add_epollin_event_to_dcb(rses->routedcb,tmp); + free(querystr); + + } + skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.", + rses); + } + + goto retblock; } - subsvc_clear_state(subsvc, SUBSVC_QUERY_ACTIVE); - - if(!rses->hash_init) - { - subsvc_set_state(subsvc, SUBSVC_MAPPED); - parse_mapping_response(rses, subsvc->service->name, reply); - - for(i = 0; i < rses->n_subservice; i++) - { - if(SUBSVC_IS_OK(rses->subservice[i]) && !SUBSVC_IS_MAPPED(rses->subservice[i])) - { - mapped = false; - break; - } - - } - - gwbuf_free(reply); - - if(mapped) - { - rses->hash_init = true; - if(rses->queue) - { - tmp = rses->queue; - rses->queue = rses->queue->next; - } - } - - goto retblock; - } - - - if(rses->queue) { - tmp = rses->queue; - rses->queue = rses->queue->next; + GWBUF* tmp = rses->queue; + rses->queue = rses->queue->next; + tmp->next = NULL; + char* querystr = modutil_get_SQL(tmp); + skygw_log_write(LOGFILE_DEBUG,"schemarouter: Sending queued buffer for session %p: %s", + rses->rses_client_dcb->session, + querystr); + poll_add_epollin_event_to_dcb(rses->routedcb,tmp); + free(querystr); + tmp = NULL; } + if(rses->init & INIT_USE_DB) + { + skygw_log_write(LOGFILE_DEBUG,"schemarouter: Reply to USE '%s' received for session %p", + rses->connect_db, + rses->rses_client_dcb->session); + rses->init &= ~INIT_USE_DB; + strcpy(rses->rses_mysql_session->db,rses->connect_db); + ss_dassert(rses->init == INIT_READY); + if(reply) + { + tmp = reply; + while((tmp = gwbuf_consume(tmp,gwbuf_length(tmp)))); + tmp = NULL; + } + goto retblock; + } scur = subsvc->scur; @@ -688,12 +791,8 @@ filterReply(FILTER* instance, void *session, GWBUF *reply) rv = SESSION_ROUTE_REPLY(rses->session, reply); -retblock: - rses_end_locked_router_action(rses); - if(tmp) - { - poll_add_epollin_event_to_dcb(rses->queue_dcb,tmp); - } + retblock: + rses_end_locked_router_action(rses); return rv; } @@ -876,6 +975,14 @@ createInstance(SERVICE *service, char **options) tok = strtok(services, ","); */ + if(options == NULL) + { + free(router); + skygw_log_write(LOGFILE_ERROR, "Error : No 'subservice' router option found. Shardrouter requires at least %d " + "configured services listed in the 'subservices' router option to work.", min_nsvc); + return NULL; + } + while(options[i]) { if(sz <= i) @@ -896,6 +1003,13 @@ createInstance(SERVICE *service, char **options) } res_svc[i] = service_find(options[i]); + if(res_svc[i] == NULL) + { + free(res_svc); + free(router); + skygw_log_write(LOGFILE_ERROR, "Error : No service named '%s' found.", options[i]); + return NULL; + } i++; } /* @@ -907,7 +1021,7 @@ createInstance(SERVICE *service, char **options) if(i < min_nsvc) { - skygw_log_write(LOGFILE_ERROR, "Error : Not enough services. Shardrouter requires at least %d " + skygw_log_write(LOGFILE_ERROR, "Error : Not enough parameters for 'subservice' router option. Shardrouter requires at least %d " "configured services to work.", min_nsvc); free(router->services); free(router); @@ -980,15 +1094,15 @@ newSession( client_rses->rses_autocommit_enabled = true; client_rses->rses_transaction_active = false; client_rses->session = session; - client_rses->dummy_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); - client_rses->dummy_dcb->func.read = fakeReply; - client_rses->dummy_dcb->state = DCB_STATE_POLLING; - client_rses->dummy_dcb->session = session; + client_rses->replydcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + client_rses->replydcb->func.read = fakeReply; + client_rses->replydcb->state = DCB_STATE_POLLING; + client_rses->replydcb->session = session; - client_rses->queue_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); - client_rses->queue_dcb->func.read = fakeQuery; - client_rses->queue_dcb->state = DCB_STATE_POLLING; - client_rses->queue_dcb->session = session; + client_rses->routedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + client_rses->routedcb->func.read = fakeQuery; + client_rses->routedcb->state = DCB_STATE_POLLING; + client_rses->routedcb->session = session; spinlock_init(&client_rses->rses_lock); @@ -1168,10 +1282,10 @@ closeSession( } router_cli_ses->subservice[i]->state = SUBSVC_CLOSED; } - router_cli_ses->dummy_dcb->session = NULL; - router_cli_ses->queue_dcb->session = NULL; - dcb_close(router_cli_ses->dummy_dcb); - dcb_close(router_cli_ses->queue_dcb); + router_cli_ses->replydcb->session = NULL; + router_cli_ses->routedcb->session = NULL; + dcb_close(router_cli_ses->replydcb); + dcb_close(router_cli_ses->routedcb); /** Unlock */ rses_end_locked_router_action(router_cli_ses); @@ -1480,32 +1594,46 @@ routeQuery(ROUTER* instance, ret = 0; goto retblock; } - - if(!router_cli_ses->hash_init) - { - gen_tablelist(inst, router_cli_ses); - - skygw_log_write(LOGFILE_TRACE,"shardrouter: got a query while mapping databases."); - GWBUF* tmp = router_cli_ses->queue; - - while(tmp && tmp->next) + if(!(rses_is_closed = router_cli_ses->rses_closed)) { - tmp = tmp->next; + if(router_cli_ses->init & INIT_UNINT) + { + /* Generate database list */ + gen_subsvc_dblist(inst,router_cli_ses); + + } + + if(router_cli_ses->init & INIT_MAPPING) + { + + char* querystr = modutil_get_SQL(querybuf); + skygw_log_write(LOGFILE_DEBUG,"shardrouter: Storing query for session %p: %s", + router_cli_ses->rses_client_dcb->session, + querystr); + free(querystr); + gwbuf_make_contiguous(querybuf); + GWBUF* ptr = router_cli_ses->queue; + + while(ptr && ptr->next) + { + ptr = ptr->next; + } + + if(ptr == NULL) + { + router_cli_ses->queue = querybuf; + } + else + { + ptr->next = querybuf; + + } + rses_end_locked_router_action(router_cli_ses); + return 1; + } + } - - if(tmp == NULL) - { - router_cli_ses->queue = querybuf; - } - else - { - tmp->next = querybuf; - } - - rses_end_locked_router_action(router_cli_ses); - return 1; - } - + rses_end_locked_router_action(router_cli_ses); packet = GWBUF_DATA(querybuf); @@ -1608,7 +1736,7 @@ routeQuery(ROUTER* instance, */ GWBUF* dbres = gen_show_dbs_response(inst,router_cli_ses); - poll_add_epollin_event_to_dcb(router_cli_ses->dummy_dcb,dbres); + poll_add_epollin_event_to_dcb(router_cli_ses->replydcb,dbres); ret = 1; goto retblock; } @@ -2982,7 +3110,7 @@ reply_error: * Create an incoming event for randomly selected backend DCB which * will then be notified and replied 'back' to the client. */ - poll_add_epollin_event_to_dcb(rses->dummy_dcb, + poll_add_epollin_event_to_dcb(rses->replydcb, gwbuf_clone(errbuf)); gwbuf_free(errbuf); }