diff --git a/server/modules/include/dbshard.h b/server/modules/include/dbshard.h index 2f6816187..705297886 100644 --- a/server/modules/include/dbshard.h +++ b/server/modules/include/dbshard.h @@ -51,6 +51,14 @@ typedef enum prep_stmt_state { #endif /*< PREP_STMT_CACHING */ +typedef enum init_state +{ + INIT_READY = 0x0, + INIT_MAPPING = 0x1, + INIT_USE_DB = 0x02 + +} init_state_t; + typedef enum bref_state { BREF_IN_USE = 0x01, BREF_WAITING_RESULT = 0x02, /*< for session commands only */ @@ -251,8 +259,11 @@ struct router_client_session { struct router_instance *router; /*< The router instance */ struct router_client_session* next; HASHTABLE* dbhash; - bool hash_init; + char connect_db[MYSQL_DATABASE_MAXLEN+1]; + init_state_t init; GWBUF* queue; + DCB* dcb_route; + DCB* dcb_reply; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif diff --git a/server/modules/routing/dbshard/dbshard.c b/server/modules/routing/dbshard/dbshard.c index d34c7f992..5b9636e32 100644 --- a/server/modules/routing/dbshard/dbshard.c +++ b/server/modules/routing/dbshard/dbshard.c @@ -300,7 +300,7 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) int i,rval; unsigned int len; - session->hash_init = false; + session->init |= INIT_MAPPING; len = strlen(query); buffer = gwbuf_alloc(len + 4); @@ -350,7 +350,16 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, for(i = 0; i < sz; i++){ if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){ + + if(strcmp(dbnms[i],"information_schema") == 0) + { + has_dbs = false; + rval = NULL; + } + else + { skygw_log_write(LOGFILE_TRACE,"dbshard: Query targets database '%s' on server '%s",dbnms[i],rval); + } for(j = i;j < sz;j++) free(dbnms[j]); break; } @@ -484,6 +493,31 @@ char** tokenize_string(char* str) return list; } +int internalRoute(DCB* dcb) +{ + if(dcb->dcb_readqueue) + { + GWBUF* tmp = dcb->dcb_readqueue; + void* rinst = dcb->session->service->router_instance; + void *rses = dcb->session->router_session; + + dcb->dcb_readqueue = NULL; + return dcb->session->service->router->routeQuery(rinst,rses,tmp); + } + return 1; +} + +int internalReply(DCB* dcb) +{ + if(dcb->dcb_readqueue) + { + GWBUF* tmp = dcb->dcb_readqueue; + dcb->dcb_readqueue = NULL; + return SESSION_ROUTE_REPLY(dcb->session, tmp); + } + return 1; +} + /** * Implementation of the mandatory version entry point * @@ -713,7 +747,27 @@ static void* newSession( bool succp; int router_nservers = 0; /*< # of servers in total */ int i; - + char db[MYSQL_DATABASE_MAXLEN+1]; + MySQLProtocol* protocol = session->client->protocol; + MYSQL_session* data = session->data; + bool using_db = false; + + memset(db,0,MYSQL_DATABASE_MAXLEN+1); + + spinlock_acquire(&protocol->protocol_lock); + + /* To enable connecting directly to a sharded database we first need + * to disable it for the client DCB's protocol so that we can connect to them*/ + if(protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB) + { + protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB; + strncpy(db,data->db,MYSQL_DATABASE_MAXLEN+1); + memset(data->db,0,MYSQL_DATABASE_MAXLEN+1); + using_db = true; + } + + spinlock_release(&protocol->protocol_lock); + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) @@ -729,13 +783,18 @@ static void* newSession( client_rses->router = router; client_rses->rses_mysql_session = (MYSQL_session*)session->data; client_rses->rses_client_dcb = (DCB*)session->client; - /** - * If service config has been changed, reload config from service to - * router instance first. - */ - spinlock_acquire(&router->lock); - - spinlock_release(&router->lock); + + client_rses->dcb_reply = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + client_rses->dcb_reply->func.read = internalReply; + client_rses->dcb_reply->state = DCB_STATE_POLLING; + client_rses->dcb_reply->session = session; + + client_rses->dcb_route = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + client_rses->dcb_route->func.read = internalRoute; + client_rses->dcb_route->state = DCB_STATE_POLLING; + client_rses->dcb_route->session = session; + if(using_db) + client_rses->init |= INIT_USE_DB; /** * Set defaults to session variables. */ @@ -838,9 +897,18 @@ static void* newSession( goto return_rses; } + + if(db[0] != 0x0) + { + /* Store the database the client is connecting to */ + strncpy(client_rses->connect_db,db,MYSQL_DATABASE_MAXLEN+1); + } + /* Generate database list */ gen_databaselist(router,client_rses); rses_end_locked_router_action(client_rses); + + /** * Version is bigger than zero once initialized. */ @@ -943,6 +1011,13 @@ static void closeSession( atomic_add(&bref->bref_backend->backend_conn_count, -1); } } + + /* Close internal DCBs */ + router_cli_ses->dcb_reply->session = NULL; + router_cli_ses->dcb_route->session = NULL; + dcb_close(router_cli_ses->dcb_reply); + dcb_close(router_cli_ses->dcb_route); + /** Unlock */ rses_end_locked_router_action(router_cli_ses); } @@ -1556,7 +1631,7 @@ static int routeQuery( route_target_t route_target = TARGET_UNDEFINED; bool succp = false; char* tname = NULL; - int i; + CHK_CLIENT_RSES(router_cli_ses); @@ -1567,9 +1642,19 @@ static int routeQuery( } ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); - if(!rses_is_closed && !router_cli_ses->hash_init) + if(!rses_is_closed && router_cli_ses->init != INIT_READY) { - router_cli_ses->queue = querybuf; + gwbuf_make_contiguous(querybuf); + + GWBUF* ptr = router_cli_ses->queue; + + while(ptr && ptr->next) + ptr = ptr->next; + + if(ptr == NULL) + router_cli_ses->queue = querybuf; + else + ptr->next = querybuf; return 1; } packet = GWBUF_DATA(querybuf); @@ -1694,31 +1779,11 @@ static int routeQuery( /** * Generate custom response that contains all the databases */ - - backend_ref_t* backend = NULL; - DCB* backend_dcb = NULL; - - for(i = 0;i < router_cli_ses->rses_nbackends;i++) - { - if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server)) - { - backend = &router_cli_ses->rses_backend_ref[i]; - backend_dcb = backend->bref_dcb; - break; - } - } - - if(backend) - { - GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses); - poll_add_epollin_event_to_dcb(backend_dcb,fake); - ret = 1; - } - else - { - ret = 0; - } - + + GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses); + poll_add_epollin_event_to_dcb(router_cli_ses->dcb_reply,fake); + ret = 1; + goto retblock; } @@ -1862,7 +1927,6 @@ static int routeQuery( if(TARGET_IS_ANY(route_target)) { - /**No valid backends alive*/ skygw_log_write(LOGFILE_TRACE,"dbshard: No backends are running"); rses_end_locked_router_action(router_cli_ses); @@ -2156,7 +2220,7 @@ static void clientReply ( } #endif - if(!router_cli_ses->hash_init) + if(router_cli_ses->init & INIT_MAPPING) { bool mapped = true; int i; @@ -2165,7 +2229,7 @@ static void clientReply ( for(i = 0; i < router_cli_ses->rses_nbackends; i++) { - if(bref->bref_dcb == bkrf[i].bref_dcb) + if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) { router_cli_ses->rses_backend_ref[i].bref_mapped = true; parse_showdb_response(router_cli_ses, @@ -2183,8 +2247,7 @@ static void clientReply ( } } - gwbuf_free(writebuf); - rses_end_locked_router_action(router_cli_ses); + gwbuf_free(writebuf); if(mapped) { @@ -2192,28 +2255,99 @@ static void clientReply ( * Check if the session is reconnecting with a database name * that is not in the hashtable. If the database is not found * then close the session. - */ - if(router_cli_ses->rses_mysql_session->db[0] != '\0' && - hashtable_fetch(router_cli_ses->dbhash, - router_cli_ses->rses_mysql_session->db) == NULL) + * */ + router_cli_ses->init &= ~INIT_MAPPING; + + if(router_cli_ses->init & INIT_USE_DB) { - router_cli_ses->rses_closed = true; + char* target; + + if((target = hashtable_fetch(router_cli_ses->dbhash, + router_cli_ses->connect_db)) == NULL) + { + skygw_log_write_flush(LOGFILE_TRACE,"dbshard: Connecting to a non-existent database '%s'", + router_cli_ses->connect_db); + router_cli_ses->rses_closed = true; + if(router_cli_ses->queue) + gwbuf_free(router_cli_ses->queue); + rses_end_locked_router_action(router_cli_ses); + return; + } + + /* Send a COM_INIT_DB packet to the server with the right database + * and set it as the client's active database */ + + unsigned int qlen; + GWBUF* buffer; + + qlen = strlen(router_cli_ses->connect_db); + buffer = gwbuf_alloc(qlen + 5); + if(buffer == NULL) + { + skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed."); + router_cli_ses->rses_closed = true; + if(router_cli_ses->queue) + gwbuf_free(router_cli_ses->queue); + rses_end_locked_router_action(router_cli_ses); + return; + } + + gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1); + gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL); + *((unsigned char*)buffer->start + 3) = 0x0; + *((unsigned char*)buffer->start + 4) = 0x2; + memcpy(buffer->start+5,router_cli_ses->connect_db,qlen); + DCB* dcb = NULL; + + if(get_shard_dcb(&dcb,router_cli_ses,target)) + { + dcb->func.write(dcb,buffer); + } + else + { + skygw_log_write_flush(LOGFILE_TRACE,"dbshard: Couldn't find target DCB for '%s'.",target); + router_cli_ses->rses_closed = true; + if(router_cli_ses->queue) + gwbuf_free(router_cli_ses->queue); + } + + rses_end_locked_router_action(router_cli_ses); + return; } - router_cli_ses->hash_init = true; if(router_cli_ses->queue) { - routeQuery(instance,router_session,router_cli_ses->queue); - router_cli_ses->queue = NULL; + GWBUF* tmp = router_cli_ses->queue; + router_cli_ses->queue = router_cli_ses->queue->next; + tmp->next = NULL; + poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp); } skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.", router_cli_ses); - } + } + + rses_end_locked_router_action(router_cli_ses); return; } + if(router_cli_ses->queue) + { + GWBUF* tmp = router_cli_ses->queue; + router_cli_ses->queue = router_cli_ses->queue->next; + tmp->next = NULL; + poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp); + } + + if(router_cli_ses->init & INIT_USE_DB) + { + router_cli_ses->init &= ~INIT_USE_DB; + strcpy(router_cli_ses->rses_mysql_session->db,router_cli_ses->connect_db); + rses_end_locked_router_action(router_cli_ses); + return; + } + CHK_BACKEND_REF(bref); scur = &bref->bref_sescmd_cur; /** @@ -3701,7 +3835,7 @@ static bool handle_error_new_connection( goto return_succp; } - rses->hash_init = false; + rses->init |= INIT_MAPPING; for(i = 0;irses_nbackends;i++) { @@ -4009,7 +4143,7 @@ reply_error: skygw_log_write_flush(LOGFILE_ERROR,"Error : All backend connections are down."); return false; } - poll_add_epollin_event_to_dcb(rses->rses_backend_ref->bref_dcb, + poll_add_epollin_event_to_dcb(rses->dcb_reply, gwbuf_clone(errbuf)); gwbuf_free(errbuf); }