Fix to MXS-430: https://mariadb.atlassian.net/browse/MXS-430
Added caching of the hashtables used to map databases to servers.
This commit is contained in:
@ -41,6 +41,9 @@
|
||||
/** Size of the hashtable used to store ignored databases */
|
||||
#define SCHEMAROUTER_HASHSIZE 100
|
||||
|
||||
/** Hashtable size for the per user shard maps */
|
||||
#define SCHEMAROUTER_USERHASH_SIZE 10
|
||||
|
||||
MODULE_INFO info = {
|
||||
MODULE_API_ROUTER,
|
||||
MODULE_BETA_RELEASE,
|
||||
@ -223,6 +226,7 @@ int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
GWBUF** wbuf);
|
||||
bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses);
|
||||
void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses);
|
||||
void synchronize_shard_map(ROUTER_CLIENT_SES *client);
|
||||
static int hashkeyfun(void* key)
|
||||
{
|
||||
if(key == NULL){
|
||||
@ -247,7 +251,39 @@ static int hashcmpfun(
|
||||
return strcmp(i1,i2);
|
||||
}
|
||||
|
||||
void* keyfreefun(void* data)
|
||||
{
|
||||
free(data);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a shard map and initialize it.
|
||||
* @return Pointer to new shard_map_t or NULL if memory allocation failed
|
||||
*/
|
||||
shard_map_t* create_shard_map()
|
||||
{
|
||||
shard_map_t *rval;
|
||||
|
||||
if ((rval = (shard_map_t*) malloc(sizeof(shard_map_t))))
|
||||
{
|
||||
if ((rval->hash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)))
|
||||
{
|
||||
HASHMEMORYFN kcopy = (HASHMEMORYFN)strdup;
|
||||
HASHMEMORYFN kfree = (HASHMEMORYFN)keyfreefun;
|
||||
hashtable_memory_fns(rval->hash, kcopy, NULL, kfree, NULL);
|
||||
spinlock_init(&rval->lock);
|
||||
rval->last_updated = 0;
|
||||
rval->state = SHMAP_UNINIT;
|
||||
}
|
||||
else
|
||||
{
|
||||
free(rval);
|
||||
rval = NULL;
|
||||
}
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a length encoded string into a C string.
|
||||
@ -363,6 +399,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t*
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
spinlock_acquire(&rses->shardmap->lock);
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
int payloadlen = gw_mysql_get_byte3(ptr);
|
||||
@ -371,7 +408,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t*
|
||||
|
||||
if (data)
|
||||
{
|
||||
if (hashtable_add(rses->dbhash, data, target))
|
||||
if (hashtable_add(rses->shardmap->hash, data, target))
|
||||
{
|
||||
skygw_log_write(LOGFILE_TRACE, "schemarouter: <%s, %s>", target, data);
|
||||
}
|
||||
@ -385,7 +422,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t*
|
||||
{
|
||||
duplicate_found = true;
|
||||
skygw_log_write(LE, "Error: Database '%s' found on servers '%s' and '%s' for user %s@%s.",
|
||||
data, target, hashtable_fetch(rses->dbhash, data),
|
||||
data, target, hashtable_fetch(rses->shardmap->hash, data),
|
||||
rses->rses_client_dcb->user,
|
||||
rses->rses_client_dcb->remote);
|
||||
}
|
||||
@ -394,6 +431,7 @@ showdb_response_t parse_showdb_response(ROUTER_CLIENT_SES* rses, backend_ref_t*
|
||||
}
|
||||
ptr += packetlen;
|
||||
}
|
||||
spinlock_release(&rses->shardmap->lock);
|
||||
|
||||
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) &&
|
||||
bref->n_mapping_eof == 1)
|
||||
@ -478,8 +516,8 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
|
||||
* @param buffer Query to inspect
|
||||
* @return Name of the backend or NULL if the query contains no known databases.
|
||||
*/
|
||||
char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype){
|
||||
HASHTABLE* ht = client->dbhash;
|
||||
char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client, GWBUF* buffer,skygw_query_type_t qtype)
|
||||
{
|
||||
int sz = 0,i,j;
|
||||
char** dbnms = NULL;
|
||||
char* rval = NULL,*query, *tmp = NULL;
|
||||
@ -491,6 +529,9 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
|
||||
|
||||
dbnms = skygw_get_database_names(buffer,&sz);
|
||||
|
||||
spinlock_acquire(&client->shardmap->lock);
|
||||
HASHTABLE* ht = client->shardmap->hash;
|
||||
|
||||
if(sz > 0){
|
||||
for(i = 0; i < sz; i++){
|
||||
char* name;
|
||||
@ -553,6 +594,7 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
|
||||
|
||||
|
||||
}
|
||||
spinlock_release(&client->shardmap->lock);
|
||||
return rval;
|
||||
}
|
||||
|
||||
@ -584,7 +626,8 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
|
||||
skygw_log_write(LOGFILE_TRACE,"schemarouter: Using active database '%s'",client->rses_mysql_session->db);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
spinlock_release(&client->shardmap->lock);
|
||||
return rval;
|
||||
}
|
||||
|
||||
@ -765,6 +808,17 @@ createInstance(SERVICE *service, char **options)
|
||||
(HASHMEMORYFN)free,
|
||||
NULL);
|
||||
|
||||
if ((router->shard_maps = hashtable_alloc(SCHEMAROUTER_USERHASH_SIZE, hashkeyfun, hashcmpfun)) == NULL)
|
||||
{
|
||||
skygw_log_write(LE, "Error: Memory allocation failed when allocating schemarouter database ignore list.");
|
||||
hashtable_free(router->ignored_dbs);
|
||||
free(router);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
hashtable_memory_fns(router->shard_maps,(HASHMEMORYFN)strdup,
|
||||
NULL, (HASHMEMORYFN)keyfreefun, NULL);
|
||||
|
||||
/** Add default system databases to ignore */
|
||||
hashtable_add(router->ignored_dbs,"mysql","");
|
||||
hashtable_add(router->ignored_dbs,"information_schema","");
|
||||
@ -1007,7 +1061,7 @@ static void* newSession(
|
||||
|
||||
memset(db,0,MYSQL_DATABASE_MAXLEN+1);
|
||||
|
||||
spinlock_acquire(&protocol->protocol_lock);
|
||||
spinlock_acquire(&session->ses_lock);
|
||||
|
||||
/* To enable connecting directly to a sharded database we first need
|
||||
* to disable it for the client DCB's protocol so that we can connect to them*/
|
||||
@ -1028,7 +1082,7 @@ static void* newSession(
|
||||
LOGIF(LT,(skygw_log_write(LT,"schemarouter: Client'%s' connecting with empty database.",data->user)));
|
||||
}
|
||||
|
||||
spinlock_release(&protocol->protocol_lock);
|
||||
spinlock_release(&session->ses_lock);
|
||||
|
||||
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||
|
||||
@ -1045,7 +1099,44 @@ static void* newSession(
|
||||
client_rses->router = router;
|
||||
client_rses->rses_mysql_session = (MYSQL_session*)session->data;
|
||||
client_rses->rses_client_dcb = (DCB*)session->client;
|
||||
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
shard_map_t *map = hashtable_fetch(router->shard_maps, session->client->user);
|
||||
enum shard_map_state state;
|
||||
|
||||
if (map)
|
||||
{
|
||||
spinlock_acquire(&map->lock);
|
||||
double tdiff = difftime(time(NULL), map->last_updated);
|
||||
if (tdiff > router->schemarouter_config.refresh_min_interval)
|
||||
{
|
||||
map->state = SHMAP_STALE;
|
||||
}
|
||||
state = map->state;
|
||||
spinlock_release(&map->lock);
|
||||
}
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
if (map == NULL || state != SHMAP_READY)
|
||||
{
|
||||
if ((map = create_shard_map()) == NULL)
|
||||
{
|
||||
skygw_log_write(LE, "Error: Failed to allocate enough memory to create"
|
||||
"new shard mapping. Session will be closed.");
|
||||
free(client_rses);
|
||||
return NULL;
|
||||
}
|
||||
client_rses->init = INIT_UNINT;
|
||||
}
|
||||
else
|
||||
{
|
||||
client_rses->init = INIT_READY;
|
||||
atomic_add(&router->stats.shmap_cache_hit, 1);
|
||||
}
|
||||
|
||||
client_rses->shardmap = map;
|
||||
client_rses->dcb_reply = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
|
||||
client_rses->dcb_reply->func.read = internalReply;
|
||||
client_rses->dcb_reply->state = DCB_STATE_POLLING;
|
||||
@ -1057,7 +1148,7 @@ static void* newSession(
|
||||
client_rses->dcb_route->state = DCB_STATE_POLLING;
|
||||
client_rses->dcb_route->session = session;
|
||||
client_rses->rses_config.last_refresh = time(NULL);
|
||||
client_rses->init = INIT_UNINT;
|
||||
|
||||
if(using_db)
|
||||
client_rses->init |= INIT_USE_DB;
|
||||
/**
|
||||
@ -1131,12 +1222,6 @@ static void* newSession(
|
||||
router_nservers,
|
||||
session,
|
||||
router);
|
||||
|
||||
client_rses->dbhash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun);
|
||||
hashtable_memory_fns(client_rses->dbhash,(HASHMEMORYFN)strdup,
|
||||
(HASHMEMORYFN)strdup,
|
||||
(HASHMEMORYFN)free,
|
||||
(HASHMEMORYFN)free);
|
||||
|
||||
rses_end_locked_router_action(client_rses);
|
||||
|
||||
@ -1366,7 +1451,6 @@ static void freeSession(
|
||||
* all the memory and other resources associated
|
||||
* to the client session.
|
||||
*/
|
||||
hashtable_free(router_cli_ses->dbhash);
|
||||
free(router_cli_ses->rses_backend_ref);
|
||||
free(router_cli_ses);
|
||||
return;
|
||||
@ -1744,8 +1828,6 @@ GWBUF*
|
||||
gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
||||
{
|
||||
GWBUF* rval = NULL;
|
||||
HASHTABLE* ht = client->dbhash;
|
||||
HASHITERATOR* iter = hashtable_iterator(ht);
|
||||
backend_ref_t *bref = client->rses_backend_ref;
|
||||
BACKEND** backends = router->servers;
|
||||
unsigned int coldef_len = 0;
|
||||
@ -1845,44 +1927,55 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
||||
int j = 0,ndbs = 0, bufsz = 10;
|
||||
char** dbs;
|
||||
|
||||
if((dbs = malloc(sizeof(char*)*bufsz)) == NULL)
|
||||
if((dbs = malloc(sizeof(char*) * bufsz)) == NULL)
|
||||
{
|
||||
gwbuf_free(rval);
|
||||
hashtable_iterator_free(iter);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while((value = (char*) hashtable_next(iter)))
|
||||
spinlock_acquire(&client->shardmap->lock);
|
||||
|
||||
if(client->shardmap->state == SHMAP_READY)
|
||||
{
|
||||
char* bend = hashtable_fetch(ht, value);
|
||||
HASHTABLE* ht = client->shardmap->hash;
|
||||
HASHITERATOR* iter = hashtable_iterator(ht);
|
||||
|
||||
for(i = 0; backends[i]; i++)
|
||||
while((value = (char*) hashtable_next(iter)))
|
||||
{
|
||||
if(strcmp(bref[i].bref_backend->backend_server->unique_name, bend) == 0 &&
|
||||
BREF_IS_IN_USE(&bref[i]) && !BREF_IS_CLOSED(&bref[i]))
|
||||
char* bend = hashtable_fetch(ht, value);
|
||||
for(i = 0; backends[i]; i++)
|
||||
{
|
||||
ndbs++;
|
||||
|
||||
if(ndbs >= bufsz)
|
||||
{
|
||||
bufsz += bufsz / 2;
|
||||
char** tmp = realloc(dbs,sizeof(char*)*bufsz);
|
||||
if(tmp == NULL)
|
||||
{
|
||||
gwbuf_free(rval);
|
||||
hashtable_iterator_free(iter);
|
||||
for(i=0;i<ndbs-1;i++)free(dbs[i]);
|
||||
free(dbs);
|
||||
return NULL;
|
||||
}
|
||||
dbs = tmp;
|
||||
}
|
||||
|
||||
dbs[j++] = strdup(value);
|
||||
if(strcmp(bref[i].bref_backend->backend_server->unique_name, bend) == 0 &&
|
||||
BREF_IS_IN_USE(&bref[i]) && !BREF_IS_CLOSED(&bref[i]))
|
||||
{
|
||||
ndbs++;
|
||||
if(ndbs >= bufsz)
|
||||
{
|
||||
bufsz += bufsz / 2;
|
||||
char** tmp = realloc(dbs,sizeof(char*) * bufsz);
|
||||
if(tmp == NULL)
|
||||
{
|
||||
gwbuf_free(rval);
|
||||
hashtable_iterator_free(iter);
|
||||
for (i = 0; i < ndbs - 1; i++)
|
||||
{
|
||||
free(dbs[i]);
|
||||
}
|
||||
free(dbs);
|
||||
spinlock_release(&client->shardmap->lock);
|
||||
return NULL;
|
||||
}
|
||||
dbs = tmp;
|
||||
}
|
||||
dbs[j++] = strdup(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
hashtable_iterator_free(iter);
|
||||
}
|
||||
|
||||
spinlock_release(&client->shardmap->lock);
|
||||
|
||||
qsort(&dbs[0],(size_t)ndbs,sizeof(char*),cmpfn);
|
||||
|
||||
for(j = 0;j<ndbs;j++)
|
||||
@ -1912,9 +2005,6 @@ gen_show_dbs_response(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
||||
GWBUF* last_packet = gwbuf_alloc(sizeof(eof));
|
||||
memcpy(last_packet->start, eof, sizeof(eof));
|
||||
rval = gwbuf_append(rval, last_packet);
|
||||
|
||||
rval = gwbuf_make_contiguous(rval);
|
||||
hashtable_iterator_free(iter);
|
||||
free(dbs);
|
||||
return rval;
|
||||
}
|
||||
@ -1987,9 +2077,15 @@ static int routeQuery(
|
||||
|
||||
}
|
||||
|
||||
if(router_cli_ses->init & INIT_MAPPING)
|
||||
/**
|
||||
* If the databases are still being mapped or if the client connected
|
||||
* with a default database but no database mapping was performed we need
|
||||
* to store the query. Once the databases have been mapped and/or the
|
||||
* default database is taken into use we can send the query forward.
|
||||
*/
|
||||
if(router_cli_ses->init & (INIT_MAPPING|INIT_USE_DB))
|
||||
{
|
||||
|
||||
int init_rval = 1;
|
||||
char* querystr = modutil_get_SQL(querybuf);
|
||||
skygw_log_write(LOGFILE_DEBUG|LOGFILE_TRACE,"schemarouter: Storing query for session %p: %s",
|
||||
router_cli_ses->rses_client_dcb->session,
|
||||
@ -2012,14 +2108,26 @@ static int routeQuery(
|
||||
ptr->next = querybuf;
|
||||
|
||||
}
|
||||
|
||||
if(router_cli_ses->init == (INIT_READY|INIT_USE_DB))
|
||||
{
|
||||
/**
|
||||
* This state is possible if a client connects with a default database
|
||||
* and the shard map was found from the router cache
|
||||
*/
|
||||
if (!handle_default_db(router_cli_ses))
|
||||
{
|
||||
init_rval = 0;
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return 1;
|
||||
return init_rval;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
packet_type = packet[4];
|
||||
|
||||
@ -2139,9 +2247,12 @@ static int routeQuery(
|
||||
if (packet_type == MYSQL_COM_INIT_DB ||
|
||||
op == QUERY_OP_CHANGE_DB)
|
||||
{
|
||||
if (!(change_successful = change_current_db(router_cli_ses->rses_mysql_session,
|
||||
router_cli_ses->dbhash,
|
||||
querybuf)))
|
||||
spinlock_acquire(&router_cli_ses->shardmap->lock);
|
||||
change_successful = change_current_db(router_cli_ses->rses_mysql_session,
|
||||
router_cli_ses->shardmap->hash,
|
||||
querybuf);
|
||||
spinlock_release(&router_cli_ses->shardmap->lock);
|
||||
if (!change_successful)
|
||||
{
|
||||
time_t now = time(NULL);
|
||||
if(router_cli_ses->rses_config.refresh_databases &&
|
||||
@ -2149,22 +2260,21 @@ static int routeQuery(
|
||||
router_cli_ses->rses_config.refresh_min_interval)
|
||||
{
|
||||
rses_begin_locked_router_action(router_cli_ses);
|
||||
|
||||
router_cli_ses->rses_config.last_refresh = now;
|
||||
router_cli_ses->queue = querybuf;
|
||||
hashtable_free(router_cli_ses->dbhash);
|
||||
if((router_cli_ses->dbhash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Hashtable allocation failed.");
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return 1;
|
||||
}
|
||||
hashtable_memory_fns(router_cli_ses->dbhash,(HASHMEMORYFN)strdup,
|
||||
(HASHMEMORYFN)strdup,
|
||||
(HASHMEMORYFN)free,
|
||||
(HASHMEMORYFN)free);
|
||||
gen_databaselist(inst,router_cli_ses);
|
||||
int rc_refresh = 1;
|
||||
|
||||
if((router_cli_ses->shardmap = create_shard_map()))
|
||||
{
|
||||
gen_databaselist(inst,router_cli_ses);
|
||||
}
|
||||
else
|
||||
{
|
||||
rc_refresh = 0;
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return 1;
|
||||
return rc_refresh;
|
||||
}
|
||||
extract_database(querybuf,db);
|
||||
snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db);
|
||||
@ -2207,8 +2317,11 @@ static int routeQuery(
|
||||
op == QUERY_OP_CHANGE_DB)
|
||||
{
|
||||
route_target = TARGET_UNDEFINED;
|
||||
tname = hashtable_fetch(router_cli_ses->dbhash,router_cli_ses->rses_mysql_session->db);
|
||||
|
||||
|
||||
spinlock_acquire(&router_cli_ses->shardmap->lock);
|
||||
tname = hashtable_fetch(router_cli_ses->shardmap->hash,router_cli_ses->rses_mysql_session->db);
|
||||
spinlock_release(&router_cli_ses->shardmap->lock);
|
||||
|
||||
if(tname)
|
||||
{
|
||||
skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB for database '%s' on server '%s'",
|
||||
@ -2556,6 +2669,8 @@ diagnostic(ROUTER *instance, DCB *dcb)
|
||||
dcb_printf(dcb,"Shortest session: %.2lf seconds\n",router->stats.ses_shortest);
|
||||
dcb_printf(dcb,"Average session length: %.2lf seconds\n",router->stats.ses_average);
|
||||
}
|
||||
dcb_printf(dcb,"Shard map cache hits: %d\n",router->stats.shmap_cache_hit);
|
||||
dcb_printf(dcb,"Shard map cache misses: %d\n",router->stats.shmap_cache_miss);
|
||||
dcb_printf(dcb,"\n");
|
||||
}
|
||||
|
||||
@ -2633,6 +2748,21 @@ static void clientReply(ROUTER* instance,
|
||||
|
||||
if (rc == 1)
|
||||
{
|
||||
spinlock_acquire(&router_cli_ses->shardmap->lock);
|
||||
|
||||
router_cli_ses->shardmap->state = SHMAP_READY;
|
||||
router_cli_ses->shardmap->last_updated = time(NULL);
|
||||
spinlock_release(&router_cli_ses->shardmap->lock);
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
|
||||
synchronize_shard_map(router_cli_ses);
|
||||
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the session is reconnecting with a database name
|
||||
* that is not in the hashtable. If the database is not found
|
||||
@ -2653,6 +2783,7 @@ static void clientReply(ROUTER* instance,
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
route_queued_query(router_cli_ses);
|
||||
}
|
||||
skygw_log_write_flush(LOGFILE_DEBUG,
|
||||
@ -2669,11 +2800,6 @@ static void clientReply(ROUTER* instance,
|
||||
return;
|
||||
}
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
route_queued_query(router_cli_ses);
|
||||
}
|
||||
|
||||
if (router_cli_ses->init & INIT_USE_DB)
|
||||
{
|
||||
skygw_log_write(LOGFILE_DEBUG, "schemarouter: Reply to USE '%s' received for session %p",
|
||||
@ -2682,6 +2808,12 @@ static void clientReply(ROUTER* instance,
|
||||
router_cli_ses->init &= ~INIT_USE_DB;
|
||||
strcpy(router_cli_ses->rses_mysql_session->db, router_cli_ses->connect_db);
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
route_queued_query(router_cli_ses);
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
if (writebuf)
|
||||
{
|
||||
@ -2690,6 +2822,14 @@ static void clientReply(ROUTER* instance,
|
||||
return;
|
||||
}
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
route_queued_query(router_cli_ses);
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return;
|
||||
}
|
||||
|
||||
CHK_BACKEND_REF(bref);
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
/**
|
||||
@ -4447,7 +4587,7 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data)
|
||||
RESULT_ROW* rval = NULL;
|
||||
|
||||
if((key = hashtable_next(sl->iter)) &&
|
||||
(value = hashtable_fetch(sl->rses->dbhash,key)))
|
||||
(value = hashtable_fetch(sl->rses->shardmap->hash,key)))
|
||||
{
|
||||
if((rval = resultset_make_row(sl->rset)))
|
||||
{
|
||||
@ -4465,23 +4605,32 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data)
|
||||
*/
|
||||
int process_show_shards(ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
HASHITERATOR* iter = hashtable_iterator(rses->dbhash);
|
||||
struct shard_list sl;
|
||||
int rval = 0;
|
||||
|
||||
sl.iter = iter;
|
||||
sl.rses = rses;
|
||||
if((sl.rset = resultset_create(shard_list_cb,&sl)) == NULL)
|
||||
spinlock_acquire(&rses->shardmap->lock);
|
||||
if(rses->shardmap->state == SHMAP_READY)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: Failed to create resultset.",__FUNCTION__);
|
||||
return -1;
|
||||
}
|
||||
HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash);
|
||||
struct shard_list sl;
|
||||
|
||||
resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
|
||||
resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
|
||||
resultset_stream_mysql(sl.rset,rses->rses_client_dcb);
|
||||
resultset_free(sl.rset);
|
||||
hashtable_iterator_free(iter);
|
||||
return 0;
|
||||
sl.iter = iter;
|
||||
sl.rses = rses;
|
||||
if ((sl.rset = resultset_create(shard_list_cb, &sl)) == NULL)
|
||||
{
|
||||
skygw_log_write(LE, "[%s] Error: Failed to create resultset.", __FUNCTION__);
|
||||
rval = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
resultset_add_column(sl.rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
|
||||
resultset_add_column(sl.rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
|
||||
resultset_stream_mysql(sl.rset, rses->rses_client_dcb);
|
||||
resultset_free(sl.rset);
|
||||
hashtable_iterator_free(iter);
|
||||
}
|
||||
}
|
||||
spinlock_release(&rses->shardmap->lock);
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -4514,10 +4663,53 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c
|
||||
*/
|
||||
bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses)
|
||||
{
|
||||
char* target;
|
||||
bool rval = false;
|
||||
char* target = NULL;
|
||||
|
||||
if ((target = hashtable_fetch(router_cli_ses->dbhash,
|
||||
router_cli_ses->connect_db)) == NULL)
|
||||
spinlock_acquire(&router_cli_ses->shardmap->lock);
|
||||
if(router_cli_ses->shardmap->state == SHMAP_READY)
|
||||
{
|
||||
target = hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db);
|
||||
}
|
||||
spinlock_release(&router_cli_ses->shardmap->lock);
|
||||
|
||||
if (target)
|
||||
{
|
||||
/* Send a COM_INIT_DB packet to the server with the right database
|
||||
* and set it as the client's active database */
|
||||
|
||||
unsigned int qlen = strlen(router_cli_ses->connect_db);
|
||||
GWBUF* buffer = gwbuf_alloc(qlen + 5);
|
||||
|
||||
if (buffer)
|
||||
{
|
||||
gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1);
|
||||
gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL);
|
||||
*((unsigned char*) buffer->start + 3) = 0x0;
|
||||
*((unsigned char*) buffer->start + 4) = 0x2;
|
||||
memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen);
|
||||
DCB* dcb = NULL;
|
||||
|
||||
if (get_shard_dcb(&dcb, router_cli_ses, target))
|
||||
{
|
||||
dcb->func.write(dcb, buffer);
|
||||
skygw_log_write(LOGFILE_DEBUG, "schemarouter: USE '%s' sent to %s for session %p",
|
||||
router_cli_ses->connect_db,
|
||||
target,
|
||||
router_cli_ses->rses_client_dcb->session);
|
||||
rval = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Couldn't find target DCB for '%s'.", target);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
skygw_log_write_flush(LOGFILE_ERROR, "Error : Buffer allocation failed.");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Unknown database, hang up on the client*/
|
||||
skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Connecting to a non-existent database '%s'",
|
||||
@ -4526,50 +4718,16 @@ bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses)
|
||||
sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db);
|
||||
if (router_cli_ses->rses_config.debug)
|
||||
{
|
||||
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", router_cli_ses->rses_client_dcb->session->ses_id);
|
||||
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)",
|
||||
router_cli_ses->rses_client_dcb->session->ses_id);
|
||||
}
|
||||
write_error_to_client(router_cli_ses->rses_client_dcb,
|
||||
SCHEMA_ERR_DBNOTFOUND,
|
||||
SCHEMA_ERRSTR_DBNOTFOUND,
|
||||
errmsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Send a COM_INIT_DB packet to the server with the right database
|
||||
* and set it as the client's active database */
|
||||
|
||||
unsigned int qlen;
|
||||
GWBUF* buffer;
|
||||
|
||||
qlen = strlen(router_cli_ses->connect_db);
|
||||
buffer = gwbuf_alloc(qlen + 5);
|
||||
if (buffer == NULL)
|
||||
{
|
||||
skygw_log_write_flush(LOGFILE_ERROR, "Error : Buffer allocation failed.");
|
||||
return false;
|
||||
}
|
||||
|
||||
gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1);
|
||||
gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL);
|
||||
*((unsigned char*) buffer->start + 3) = 0x0;
|
||||
*((unsigned char*) buffer->start + 4) = 0x2;
|
||||
memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen);
|
||||
DCB* dcb = NULL;
|
||||
|
||||
if (get_shard_dcb(&dcb, router_cli_ses, target))
|
||||
{
|
||||
dcb->func.write(dcb, buffer);
|
||||
skygw_log_write(LOGFILE_DEBUG, "schemarouter: USE '%s' sent to %s for session %p",
|
||||
router_cli_ses->connect_db,
|
||||
target,
|
||||
router_cli_ses->rses_client_dcb->session);
|
||||
}
|
||||
else
|
||||
{
|
||||
skygw_log_write_flush(LOGFILE_TRACE, "schemarouter: Couldn't find target DCB for '%s'.", target);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return rval;
|
||||
}
|
||||
|
||||
void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses)
|
||||
@ -4684,3 +4842,75 @@ int inspect_backend_mapping_states(ROUTER_CLIENT_SES *router_cli_ses,
|
||||
*wbuf = writebuf;
|
||||
return mapped ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replace a shard map with another one. This function copies the contents of
|
||||
* the source shard map to the target and frees the source memory.
|
||||
* @param target Target shard map to replace
|
||||
* @param source Source shard map to use
|
||||
*/
|
||||
void replace_shard_map(shard_map_t **target, shard_map_t **source)
|
||||
{
|
||||
shard_map_t *tgt = *target;
|
||||
shard_map_t *src = *source;
|
||||
tgt->last_updated = src->last_updated;
|
||||
tgt->state = src->state;
|
||||
hashtable_free(tgt->hash);
|
||||
tgt->hash = src->hash;
|
||||
free(src);
|
||||
*source = NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronize the router client session shard map with the global shard map for
|
||||
* this user.
|
||||
*
|
||||
* If the router doesn't have a shard map for this user then the current shard map
|
||||
* of the client session is added to the router. If the shard map in the router is
|
||||
* out of date, its contents are replaced with the contents of the current client
|
||||
* session. If the router has a usable shard map, the current shard map of the client
|
||||
* is discarded and the router's shard map is used.
|
||||
* @param client Router session
|
||||
*/
|
||||
void synchronize_shard_map(ROUTER_CLIENT_SES *client)
|
||||
{
|
||||
spinlock_acquire(&client->router->lock);
|
||||
|
||||
client->router->stats.shmap_cache_miss++;
|
||||
|
||||
shard_map_t *map = hashtable_fetch(client->router->shard_maps,
|
||||
client->rses_client_dcb->user);
|
||||
if (map)
|
||||
{
|
||||
spinlock_acquire(&map->lock);
|
||||
if (map->state == SHMAP_STALE)
|
||||
{
|
||||
replace_shard_map(&map, &client->shardmap);
|
||||
}
|
||||
else if (map->state != SHMAP_READY)
|
||||
{
|
||||
skygw_log_write(LE, "Warning: Shard map state is not ready but"
|
||||
"it is in use. Replacing it with a newer one.");
|
||||
replace_shard_map(&map, &client->shardmap);
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Another thread has already updated the shard map for this user
|
||||
*/
|
||||
hashtable_free(client->shardmap->hash);
|
||||
free(client->shardmap);
|
||||
}
|
||||
spinlock_release(&map->lock);
|
||||
client->shardmap = map;
|
||||
}
|
||||
else
|
||||
{
|
||||
hashtable_add(client->router->shard_maps,
|
||||
client->rses_client_dcb->user,
|
||||
client->shardmap);
|
||||
ss_dassert(hashtable_fetch(client->router->shard_maps,
|
||||
client->rses_client_dcb->user) == client->shardmap);
|
||||
}
|
||||
spinlock_release(&client->router->lock);
|
||||
}
|
||||
|
Reference in New Issue
Block a user