diff --git a/server/modules/include/schemarouter.h b/server/modules/include/schemarouter.h index 16fb8640b..b988df8bd 100644 --- a/server/modules/include/schemarouter.h +++ b/server/modules/include/schemarouter.h @@ -56,6 +56,26 @@ typedef enum showdb_response SHOWDB_DUPLICATE_DATABASES, SHOWDB_FATAL_ERROR } showdb_response_t; + +enum shard_map_state +{ + SHMAP_UNINIT, /*< No databases have been added to this shard map */ + SHMAP_READY, /*< All available databases have been added */ + SHMAP_STALE /*< The shard map has old data or has not been updated recently */ +}; + +/** + * A map of the shards tied to a single user. + */ +typedef struct shard_map +{ + HASHTABLE *hash; /*< A hashtable of database names and the servers which + * have these databases. */ + SPINLOCK lock; + time_t last_updated; + enum shard_map_state state; /*< State of the shard map */ +}shard_map_t; + /** * The state of the backend server reference */ @@ -274,6 +294,8 @@ typedef struct { double ses_longest; /*< Longest session */ double ses_shortest; /*< Shortest session */ double ses_average; /*< Average session length */ + int shmap_cache_hit; /*< Shard map was found from the cache */ + int shmap_cache_miss;/*< No shard map found from the cache */ } ROUTER_STATS; /** @@ -299,7 +321,7 @@ struct router_client_session { bool rses_transaction_active; /*< Is a transaction active */ struct router_instance *router; /*< The router instance */ struct router_client_session* next; /*< List of router sessions */ - HASHTABLE* dbhash; /*< Database hash containing names of the databases mapped to the servers that contain them */ + shard_map_t* shardmap; /*< Database hash containing names of the databases mapped to the servers that contain them */ char connect_db[MYSQL_DATABASE_MAXLEN+1]; /*< Database the user was trying to connect to */ init_mask_t init; /*< Initialization state bitmask */ GWBUF* queue; /*< Query that was received before the session was ready */ @@ -318,6 +340,7 @@ struct router_client_session { * The per instance data for the router. */ typedef struct router_instance { + HASHTABLE* shard_maps; /*< Shard maps hashed by user name */ SERVICE* service; /*< Pointer to service */ ROUTER_CLIENT_SES* connections; /*< List of client connections */ SPINLOCK lock; /*< Lock for the instance data */ diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.c index 22330c24d..d2e076254 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.c @@ -41,6 +41,9 @@ /** Size of the hashtable used to store ignored databases */ #define SCHEMAROUTER_HASHSIZE 100 +/** Hashtable size for the per user shard maps */ +#define SCHEMAROUTER_USERHASH_SIZE 10 + MODULE_INFO info = { MODULE_API_ROUTER, MODULE_BETA_RELEASE, @@ -223,6 +226,7 @@ int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses, GWBUF** wbuf); bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses); void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses); +void synchronize_shard_map(ROUTER_CLIENT_SES *client); static int hashkeyfun(void* key) { if(key == NULL){ @@ -247,7 +251,39 @@ static int hashcmpfun( return strcmp(i1,i2); } +void* keyfreefun(void* data) +{ + free(data); + return NULL; +} +/** + * Allocate a shard map and initialize it. + * @return Pointer to new shard_map_t or NULL if memory allocation failed + */ +shard_map_t* create_shard_map() +{ + shard_map_t *rval; + + if ((rval = (shard_map_t*) malloc(sizeof(shard_map_t)))) + { + if ((rval->hash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun))) + { + HASHMEMORYFN kcopy = (HASHMEMORYFN)strdup; + HASHMEMORYFN kfree = (HASHMEMORYFN)keyfreefun; + hashtable_memory_fns(rval->hash, kcopy, NULL, kfree, NULL); + spinlock_init(&rval->lock); + rval->last_updated = 0; + rval->state = SHMAP_UNINIT; + } + else + { + free(rval); + rval = NULL; + } + } + return rval; +} /** * Convert a length encoded string into a C string. @@ -363,6 +399,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* ptr += gw_mysql_get_byte3(ptr) + 4; } + spinlock_acquire(&rses->shardmap->lock); while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) { int payloadlen = gw_mysql_get_byte3(ptr); @@ -371,7 +408,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* if (data) { - if (hashtable_add(rses->dbhash, data, target)) + if (hashtable_add(rses->shardmap->hash, data, target)) { skygw_log_write(LOGFILE_TRACE, "schemarouter: <%s, %s>", target, data); } @@ -385,7 +422,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* { duplicate_found = true; skygw_log_write(LE, "Error: Database '%s' found on servers '%s' and '%s' for user %s@%s.", - data, target, hashtable_fetch(rses->dbhash, data), + data, target, hashtable_fetch(rses->shardmap->hash, data), rses->rses_client_dcb->user, rses->rses_client_dcb->remote); } @@ -394,6 +431,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t* } ptr += packetlen; } + spinlock_release(&rses->shardmap->lock); if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && bref->n_mapping_eof == 1) @@ -478,8 +516,8 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session) * @param buffer Query to inspect * @return Name of the backend or NULL if the query contains no known databases. */ -char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype){ - HASHTABLE* ht = client->dbhash; +char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype) +{ int sz = 0,i,j; char** dbnms = NULL; char* rval = NULL,*query, *tmp = NULL; @@ -491,6 +529,9 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, dbnms = skygw_get_database_names(buffer,&sz); + spinlock_acquire(&client->shardmap->lock); + HASHTABLE* ht = client->shardmap->hash; + if(sz > 0){ for(i = 0; i < sz; i++){ char* name; @@ -553,6 +594,7 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, } + spinlock_release(&client->shardmap->lock); return rval; } @@ -584,7 +626,8 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, skygw_log_write(LOGFILE_TRACE,"schemarouter: Using active database '%s'",client->rses_mysql_session->db); } } - + + spinlock_release(&client->shardmap->lock); return rval; } @@ -765,6 +808,17 @@ createInstance(SERVICE *service, char **options) (HASHMEMORYFN)free, NULL); + if ((router->shard_maps = hashtable_alloc(SCHEMAROUTER_USERHASH_SIZE, hashkeyfun, hashcmpfun)) == NULL) + { + skygw_log_write(LE, "Error: Memory allocation failed when allocating schemarouter database ignore list."); + hashtable_free(router->ignored_dbs); + free(router); + return NULL; + } + + hashtable_memory_fns(router->shard_maps,(HASHMEMORYFN)strdup, + NULL, (HASHMEMORYFN)keyfreefun, NULL); + /** Add default system databases to ignore */ hashtable_add(router->ignored_dbs,"mysql",""); hashtable_add(router->ignored_dbs,"information_schema",""); @@ -1007,7 +1061,7 @@ static void* newSession( memset(db,0,MYSQL_DATABASE_MAXLEN+1); - spinlock_acquire(&protocol->protocol_lock); + spinlock_acquire(&session->ses_lock); /* To enable connecting directly to a sharded database we first need * to disable it for the client DCB's protocol so that we can connect to them*/ @@ -1028,7 +1082,7 @@ static void* newSession( LOGIF(LT,(skygw_log_write(LT,"schemarouter: Client'%s' connecting with empty database.",data->user))); } - spinlock_release(&protocol->protocol_lock); + spinlock_release(&session->ses_lock); client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); @@ -1045,7 +1099,44 @@ static void* newSession( client_rses->router = router; client_rses->rses_mysql_session = (MYSQL_session*)session->data; client_rses->rses_client_dcb = (DCB*)session->client; - + + spinlock_acquire(&router->lock); + + shard_map_t *map = hashtable_fetch(router->shard_maps, session->client->user); + enum shard_map_state state; + + if (map) + { + spinlock_acquire(&map->lock); + double tdiff = difftime(time(NULL), map->last_updated); + if (tdiff > router->schemarouter_config.refresh_min_interval) + { + map->state = SHMAP_STALE; + } + state = map->state; + spinlock_release(&map->lock); + } + + spinlock_release(&router->lock); + + if (map == NULL || state != SHMAP_READY) + { + if ((map = create_shard_map()) == NULL) + { + skygw_log_write(LE, "Error: Failed to allocate enough memory to create" + "new shard mapping. Session will be closed."); + free(client_rses); + return NULL; + } + client_rses->init = INIT_UNINT; + } + else + { + client_rses->init = INIT_READY; + atomic_add(&router->stats.shmap_cache_hit, 1); + } + + client_rses->shardmap = map; client_rses->dcb_reply = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); client_rses->dcb_reply->func.read = internalReply; client_rses->dcb_reply->state = DCB_STATE_POLLING; @@ -1057,7 +1148,7 @@ static void* newSession( client_rses->dcb_route->state = DCB_STATE_POLLING; client_rses->dcb_route->session = session; client_rses->rses_config.last_refresh = time(NULL); - client_rses->init = INIT_UNINT; + if(using_db) client_rses->init |= INIT_USE_DB; /** @@ -1131,12 +1222,6 @@ static void* newSession( router_nservers, session, router); - - client_rses->dbhash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun); - hashtable_memory_fns(client_rses->dbhash,(HASHMEMORYFN)strdup, - (HASHMEMORYFN)strdup, - (HASHMEMORYFN)free, - (HASHMEMORYFN)free); rses_end_locked_router_action(client_rses); @@ -1366,7 +1451,6 @@ static void freeSession( * all the memory and other resources associated * to the client session. */ - hashtable_free(router_cli_ses->dbhash); free(router_cli_ses->rses_backend_ref); free(router_cli_ses); return; @@ -1744,8 +1828,6 @@ GWBUF* gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client) { GWBUF* rval = NULL; - HASHTABLE* ht = client->dbhash; - HASHITERATOR* iter = hashtable_iterator(ht); backend_ref_t *bref = client->rses_backend_ref; BACKEND** backends = router->servers; unsigned int coldef_len = 0; @@ -1845,44 +1927,55 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client) int j = 0,ndbs = 0, bufsz = 10; char** dbs; - if((dbs = malloc(sizeof(char*)*bufsz)) == NULL) + if((dbs = malloc(sizeof(char*) * bufsz)) == NULL) { gwbuf_free(rval); - hashtable_iterator_free(iter); return NULL; } - while((value = (char*) hashtable_next(iter))) + spinlock_acquire(&client->shardmap->lock); + + if(client->shardmap->state == SHMAP_READY) { - char* bend = hashtable_fetch(ht, value); + HASHTABLE* ht = client->shardmap->hash; + HASHITERATOR* iter = hashtable_iterator(ht); - for(i = 0; backends[i]; i++) + while((value = (char*) hashtable_next(iter))) { - if(strcmp(bref[i].bref_backend->backend_server->unique_name, bend) == 0 && - BREF_IS_IN_USE(&bref[i]) && !BREF_IS_CLOSED(&bref[i])) + char* bend = hashtable_fetch(ht, value); + for(i = 0; backends[i]; i++) { - ndbs++; - - if(ndbs >= bufsz) - { - bufsz += bufsz / 2; - char** tmp = realloc(dbs,sizeof(char*)*bufsz); - if(tmp == NULL) - { - gwbuf_free(rval); - hashtable_iterator_free(iter); - for(i=0;ibackend_server->unique_name, bend) == 0 && + BREF_IS_IN_USE(&bref[i]) && !BREF_IS_CLOSED(&bref[i])) + { + ndbs++; + if(ndbs >= bufsz) + { + bufsz += bufsz / 2; + char** tmp = realloc(dbs,sizeof(char*) * bufsz); + if(tmp == NULL) + { + gwbuf_free(rval); + hashtable_iterator_free(iter); + for (i = 0; i < ndbs - 1; i++) + { + free(dbs[i]); + } + free(dbs); + spinlock_release(&client->shardmap->lock); + return NULL; + } + dbs = tmp; + } + dbs[j++] = strdup(value); + } } } + hashtable_iterator_free(iter); } + spinlock_release(&client->shardmap->lock); + qsort(&dbs[0],(size_t)ndbs,sizeof(char*),cmpfn); for(j = 0;jstart, eof, sizeof(eof)); rval = gwbuf_append(rval, last_packet); - - rval = gwbuf_make_contiguous(rval); - hashtable_iterator_free(iter); free(dbs); return rval; } @@ -1987,9 +2077,15 @@ static int routeQuery( } - if(router_cli_ses->init & INIT_MAPPING) + /** + * If the databases are still being mapped or if the client connected + * with a default database but no database mapping was performed we need + * to store the query. Once the databases have been mapped and/or the + * default database is taken into use we can send the query forward. + */ + if(router_cli_ses->init & (INIT_MAPPING|INIT_USE_DB)) { - + int init_rval = 1; char* querystr = modutil_get_SQL(querybuf); skygw_log_write(LOGFILE_DEBUG|LOGFILE_TRACE,"schemarouter: Storing query for session %p: %s", router_cli_ses->rses_client_dcb->session, @@ -2012,14 +2108,26 @@ static int routeQuery( ptr->next = querybuf; } + + if(router_cli_ses->init == (INIT_READY|INIT_USE_DB)) + { + /** + * This state is possible if a client connects with a default database + * and the shard map was found from the router cache + */ + if (!handle_default_db(router_cli_ses)) + { + init_rval = 0; + } + } rses_end_locked_router_action(router_cli_ses); - return 1; + return init_rval; } } rses_end_locked_router_action(router_cli_ses); - + packet = GWBUF_DATA(querybuf); packet_type = packet[4]; @@ -2139,9 +2247,12 @@ static int routeQuery( if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) { - if (!(change_successful = change_current_db(router_cli_ses->rses_mysql_session, - router_cli_ses->dbhash, - querybuf))) + spinlock_acquire(&router_cli_ses->shardmap->lock); + change_successful = change_current_db(router_cli_ses->rses_mysql_session, + router_cli_ses->shardmap->hash, + querybuf); + spinlock_release(&router_cli_ses->shardmap->lock); + if (!change_successful) { time_t now = time(NULL); if(router_cli_ses->rses_config.refresh_databases && @@ -2149,22 +2260,21 @@ static int routeQuery( router_cli_ses->rses_config.refresh_min_interval) { rses_begin_locked_router_action(router_cli_ses); + router_cli_ses->rses_config.last_refresh = now; router_cli_ses->queue = querybuf; - hashtable_free(router_cli_ses->dbhash); - if((router_cli_ses->dbhash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL) - { - skygw_log_write(LE,"Error: Hashtable allocation failed."); - rses_end_locked_router_action(router_cli_ses); - return 1; - } - hashtable_memory_fns(router_cli_ses->dbhash,(HASHMEMORYFN)strdup, - (HASHMEMORYFN)strdup, - (HASHMEMORYFN)free, - (HASHMEMORYFN)free); - gen_databaselist(inst,router_cli_ses); + int rc_refresh = 1; + + if((router_cli_ses->shardmap = create_shard_map())) + { + gen_databaselist(inst,router_cli_ses); + } + else + { + rc_refresh = 0; + } rses_end_locked_router_action(router_cli_ses); - return 1; + return rc_refresh; } extract_database(querybuf,db); snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db); @@ -2207,8 +2317,11 @@ static int routeQuery( op == QUERY_OP_CHANGE_DB) { route_target = TARGET_UNDEFINED; - tname = hashtable_fetch(router_cli_ses->dbhash,router_cli_ses->rses_mysql_session->db); - + + spinlock_acquire(&router_cli_ses->shardmap->lock); + tname = hashtable_fetch(router_cli_ses->shardmap->hash,router_cli_ses->rses_mysql_session->db); + spinlock_release(&router_cli_ses->shardmap->lock); + if(tname) { skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB for database '%s' on server '%s'", @@ -2556,6 +2669,8 @@ diagnostic(ROUTER *instance, DCB *dcb) dcb_printf(dcb,"Shortest session: %.2lf seconds\n",router->stats.ses_shortest); dcb_printf(dcb,"Average session length: %.2lf seconds\n",router->stats.ses_average); } + dcb_printf(dcb,"Shard map cache hits: %d\n",router->stats.shmap_cache_hit); + dcb_printf(dcb,"Shard map cache misses: %d\n",router->stats.shmap_cache_miss); dcb_printf(dcb,"\n"); } @@ -2633,6 +2748,21 @@ static void clientReply(ROUTER* instance, if (rc == 1) { + spinlock_acquire(&router_cli_ses->shardmap->lock); + + router_cli_ses->shardmap->state = SHMAP_READY; + router_cli_ses->shardmap->last_updated = time(NULL); + spinlock_release(&router_cli_ses->shardmap->lock); + + rses_end_locked_router_action(router_cli_ses); + + synchronize_shard_map(router_cli_ses); + + if (!rses_begin_locked_router_action(router_cli_ses)) + { + return; + } + /* * Check if the session is reconnecting with a database name * that is not in the hashtable. If the database is not found @@ -2653,6 +2783,7 @@ static void clientReply(ROUTER* instance, if (router_cli_ses->queue) { + ss_dassert(router_cli_ses->init == INIT_READY); route_queued_query(router_cli_ses); } skygw_log_write_flush(LOGFILE_DEBUG, @@ -2669,11 +2800,6 @@ static void clientReply(ROUTER* instance, return; } - if (router_cli_ses->queue) - { - route_queued_query(router_cli_ses); - } - if (router_cli_ses->init & INIT_USE_DB) { skygw_log_write(LOGFILE_DEBUG, "schemarouter: Reply to USE '%s' received for session %p", @@ -2682,6 +2808,12 @@ static void clientReply(ROUTER* instance, router_cli_ses->init &= ~INIT_USE_DB; strcpy(router_cli_ses->rses_mysql_session->db, router_cli_ses->connect_db); ss_dassert(router_cli_ses->init == INIT_READY); + + if (router_cli_ses->queue) + { + route_queued_query(router_cli_ses); + } + rses_end_locked_router_action(router_cli_ses); if (writebuf) { @@ -2690,6 +2822,14 @@ static void clientReply(ROUTER* instance, return; } + if (router_cli_ses->queue) + { + ss_dassert(router_cli_ses->init == INIT_READY); + route_queued_query(router_cli_ses); + rses_end_locked_router_action(router_cli_ses); + return; + } + CHK_BACKEND_REF(bref); scur = &bref->bref_sescmd_cur; /** @@ -4447,7 +4587,7 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) RESULT_ROW* rval = NULL; if((key = hashtable_next(sl->iter)) && - (value = hashtable_fetch(sl->rses->dbhash,key))) + (value = hashtable_fetch(sl->rses->shardmap->hash,key))) { if((rval = resultset_make_row(sl->rset))) { @@ -4465,23 +4605,32 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) */ int process_show_shards(ROUTER_CLIENT_SES* rses) { - HASHITERATOR* iter = hashtable_iterator(rses->dbhash); - struct shard_list sl; + int rval = 0; - sl.iter = iter; - sl.rses = rses; - if((sl.rset = resultset_create(shard_list_cb,&sl)) == NULL) + spinlock_acquire(&rses->shardmap->lock); + if(rses->shardmap->state == SHMAP_READY) { - skygw_log_write(LE,"[%s] Error: Failed to create resultset.",__FUNCTION__); - return -1; - } + HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash); + struct shard_list sl; - resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR); - resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR); - resultset_stream_mysql(sl.rset,rses->rses_client_dcb); - resultset_free(sl.rset); - hashtable_iterator_free(iter); - return 0; + sl.iter = iter; + sl.rses = rses; + if ((sl.rset = resultset_create(shard_list_cb, &sl)) == NULL) + { + skygw_log_write(LE, "[%s] Error: Failed to create resultset.", __FUNCTION__); + rval = -1; + } + else + { + resultset_add_column(sl.rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_add_column(sl.rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_stream_mysql(sl.rset, rses->rses_client_dcb); + resultset_free(sl.rset); + hashtable_iterator_free(iter); + } + } + spinlock_release(&rses->shardmap->lock); + return rval; } /** @@ -4514,10 +4663,53 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c */ bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses) { - char* target; + bool rval = false; + char* target = NULL; - if ((target = hashtable_fetch(router_cli_ses->dbhash, - router_cli_ses->connect_db)) == NULL) + spinlock_acquire(&router_cli_ses->shardmap->lock); + if(router_cli_ses->shardmap->state == SHMAP_READY) + { + target = hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db); + } + spinlock_release(&router_cli_ses->shardmap->lock); + + if (target) + { + /* Send a COM_INIT_DB packet to the server with the right database + * and set it as the client's active database */ + + unsigned int qlen = strlen(router_cli_ses->connect_db); + GWBUF* buffer = gwbuf_alloc(qlen + 5); + + if (buffer) + { + gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1); + gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); + *((unsigned char*) buffer->start + 3) = 0x0; + *((unsigned char*) buffer->start + 4) = 0x2; + memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen); + DCB* dcb = NULL; + + if (get_shard_dcb(&dcb, router_cli_ses, target)) + { + dcb->func.write(dcb, buffer); + skygw_log_write(LOGFILE_DEBUG, "schemarouter: USE '%s' sent to %s for session %p", + router_cli_ses->connect_db, + target, + router_cli_ses->rses_client_dcb->session); + rval = true; + } + else + { + skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Couldn't find target DCB for '%s'.", target); + } + } + else + { + skygw_log_write_flush(LOGFILE_ERROR, "Error : Buffer allocation failed."); + } + } + else { /** Unknown database, hang up on the client*/ skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Connecting to a non-existent database '%s'", @@ -4526,50 +4718,16 @@ bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses) sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db); if (router_cli_ses->rses_config.debug) { - sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", router_cli_ses->rses_client_dcb->session->ses_id); + sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", + router_cli_ses->rses_client_dcb->session->ses_id); } write_error_to_client(router_cli_ses->rses_client_dcb, SCHEMA_ERR_DBNOTFOUND, SCHEMA_ERRSTR_DBNOTFOUND, errmsg); - return false; } - /* Send a COM_INIT_DB packet to the server with the right database - * and set it as the client's active database */ - - unsigned int qlen; - GWBUF* buffer; - - qlen = strlen(router_cli_ses->connect_db); - buffer = gwbuf_alloc(qlen + 5); - if (buffer == NULL) - { - skygw_log_write_flush(LOGFILE_ERROR, "Error : Buffer allocation failed."); - return false; - } - - gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1); - gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); - *((unsigned char*) buffer->start + 3) = 0x0; - *((unsigned char*) buffer->start + 4) = 0x2; - memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen); - DCB* dcb = NULL; - - if (get_shard_dcb(&dcb, router_cli_ses, target)) - { - dcb->func.write(dcb, buffer); - skygw_log_write(LOGFILE_DEBUG, "schemarouter: USE '%s' sent to %s for session %p", - router_cli_ses->connect_db, - target, - router_cli_ses->rses_client_dcb->session); - } - else - { - skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Couldn't find target DCB for '%s'.", target); - return false; - } - return true; + return rval; } void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses) @@ -4684,3 +4842,75 @@ int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses, *wbuf = writebuf; return mapped ? 1 : 0; } + +/** + * Replace a shard map with another one. This function copies the contents of + * the source shard map to the target and frees the source memory. + * @param target Target shard map to replace + * @param source Source shard map to use + */ +void replace_shard_map(shard_map_t **target, shard_map_t **source) +{ + shard_map_t *tgt = *target; + shard_map_t *src = *source; + tgt->last_updated = src->last_updated; + tgt->state = src->state; + hashtable_free(tgt->hash); + tgt->hash = src->hash; + free(src); + *source = NULL; +} + +/** + * Synchronize the router client session shard map with the global shard map for + * this user. + * + * If the router doesn't have a shard map for this user then the current shard map + * of the client session is added to the router. If the shard map in the router is + * out of date, its contents are replaced with the contents of the current client + * session. If the router has a usable shard map, the current shard map of the client + * is discarded and the router's shard map is used. + * @param client Router session + */ +void synchronize_shard_map(ROUTER_CLIENT_SES *client) +{ + spinlock_acquire(&client->router->lock); + + client->router->stats.shmap_cache_miss++; + + shard_map_t *map = hashtable_fetch(client->router->shard_maps, + client->rses_client_dcb->user); + if (map) + { + spinlock_acquire(&map->lock); + if (map->state == SHMAP_STALE) + { + replace_shard_map(&map, &client->shardmap); + } + else if (map->state != SHMAP_READY) + { + skygw_log_write(LE, "Warning: Shard map state is not ready but" + "it is in use. Replacing it with a newer one."); + replace_shard_map(&map, &client->shardmap); + } + else + { + /** + * Another thread has already updated the shard map for this user + */ + hashtable_free(client->shardmap->hash); + free(client->shardmap); + } + spinlock_release(&map->lock); + client->shardmap = map; + } + else + { + hashtable_add(client->router->shard_maps, + client->rses_client_dcb->user, + client->shardmap); + ss_dassert(hashtable_fetch(client->router->shard_maps, + client->rses_client_dcb->user) == client->shardmap); + } + spinlock_release(&client->router->lock); +}