Code changes based on review of 1e8afe5063134b7c56492b3777ae1248b1587ba4

This commit is contained in:
Markus Makela
2015-10-30 16:54:59 +02:00
parent d5c38b93f6
commit 50b0a9b71b
2 changed files with 71 additions and 38 deletions

View File

@ -261,7 +261,7 @@ void* keyfreefun(void* data)
* Allocate a shard map and initialize it.
* @return Pointer to new shard_map_t or NULL if memory allocation failed
*/
shard_map_t* create_shard_map()
shard_map_t* shard_map_alloc()
{
shard_map_t *rval;
@ -271,7 +271,7 @@ shard_map_t* create_shard_map()
{
HASHMEMORYFN kcopy = (HASHMEMORYFN)strdup;
HASHMEMORYFN kfree = (HASHMEMORYFN)keyfreefun;
hashtable_memory_fns(rval->hash, kcopy, NULL, kfree, NULL);
hashtable_memory_fns(rval->hash, kcopy, kcopy, kfree, kfree);
spinlock_init(&rval->lock);
rval->last_updated = 0;
rval->state = SHMAP_UNINIT;
@ -529,7 +529,6 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
dbnms = skygw_get_database_names(buffer,&sz);
spinlock_acquire(&client->shardmap->lock);
HASHTABLE* ht = client->shardmap->hash;
if(sz > 0){
@ -591,13 +590,10 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
{
rval = tmp;
has_dbs = true;
}
spinlock_release(&client->shardmap->lock);
return rval;
}
else
{
if(buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
for(i = 0; i < client->rses_nbackends; i++)
@ -625,9 +621,9 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: Using active database '%s'",client->current_db);
}
}
}
spinlock_release(&client->shardmap->lock);
return rval;
}
@ -1033,6 +1029,25 @@ retblock:
return (ROUTER *)router;
}
/**
* Check if the shard map is out of date and update its state if necessary.
* @param router Router instance
* @param map Shard map to update
* @return Current state of the shard map
*/
enum shard_map_state shard_map_update_state(ROUTER_INSTANCE* router, shard_map_t *map)
{
spinlock_acquire(&map->lock);
double tdiff = difftime(time(NULL), map->last_updated);
if (tdiff > router->schemarouter_config.refresh_min_interval)
{
map->state = SHMAP_STALE;
}
enum shard_map_state state = map->state;
spinlock_release(&map->lock);
return state;
}
/**
* Associate a new session with this instance of the router.
*
@ -1107,21 +1122,14 @@ static void* newSession(
if (map)
{
spinlock_acquire(&map->lock);
double tdiff = difftime(time(NULL), map->last_updated);
if (tdiff > router->schemarouter_config.refresh_min_interval)
{
map->state = SHMAP_STALE;
}
state = map->state;
spinlock_release(&map->lock);
state = shard_map_update_state(router, map);
}
spinlock_release(&router->lock);
if (map == NULL || state != SHMAP_READY)
{
if ((map = create_shard_map()) == NULL)
if ((map = shard_map_alloc()) == NULL)
{
skygw_log_write(LE, "Error: Failed to allocate enough memory to create"
"new shard mapping. Session will be closed.");
@ -2047,6 +2055,7 @@ static int routeQuery(
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char* tname = NULL;
char* targetserver = NULL;
GWBUF* querybuf = qbuf;
char db[MYSQL_DATABASE_MAXLEN + 1];
char errbuf[26+MYSQL_DATABASE_MAXLEN];
@ -2269,7 +2278,7 @@ static int routeQuery(
router_cli_ses->queue = querybuf;
int rc_refresh = 1;
if((router_cli_ses->shardmap = create_shard_map()))
if((router_cli_ses->shardmap = shard_map_alloc()))
{
gen_databaselist(inst,router_cli_ses);
}
@ -2324,27 +2333,37 @@ static int routeQuery(
spinlock_acquire(&router_cli_ses->shardmap->lock);
tname = hashtable_fetch(router_cli_ses->shardmap->hash,router_cli_ses->current_db);
spinlock_release(&router_cli_ses->shardmap->lock);
if(tname)
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB for database '%s' on server '%s'",
router_cli_ses->current_db,tname);
route_target = TARGET_NAMED_SERVER;
targetserver = strdup(tname);
}
else
{
skygw_log_write(LOGFILE_TRACE,"schemarouter: INIT_DB with unknown database");
}
spinlock_release(&router_cli_ses->shardmap->lock);
}
else if(route_target != TARGET_ALL &&
(tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype)) != NULL)
else if (route_target != TARGET_ALL)
{
/** If no database is found in the query and there is no active database
* or hints in the query we need to route the query to the first available
* server. This isn't ideal for monitoring server status but works if
* we just want the server to send an error back. */
spinlock_acquire(&router_cli_ses->shardmap->lock);
if ((tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype)) != NULL)
{
bool shard_ok = check_shard_status(inst,tname);
if(shard_ok)
{
route_target = TARGET_NAMED_SERVER;
targetserver = strdup(tname);
}
else
{
@ -2357,10 +2376,12 @@ static int routeQuery(
*/
}
}
spinlock_release(&router_cli_ses->shardmap->lock);
}
if(TARGET_IS_UNDEFINED(route_target))
{
spinlock_acquire(&router_cli_ses->shardmap->lock);
tname = get_shard_target_name(inst,router_cli_ses,querybuf,qtype);
if( (tname == NULL &&
@ -2380,6 +2401,10 @@ static int routeQuery(
}
else
{
if (tname)
{
targetserver = strdup(tname);
}
if(!change_successful)
{
/**
@ -2395,11 +2420,11 @@ static int routeQuery(
/** Something else went wrong, terminate connection */
ret = 0;
}
spinlock_release(&router_cli_ses->shardmap->lock);
goto retblock;
}
spinlock_release(&router_cli_ses->shardmap->lock);
}
if (TARGET_IS_ALL(route_target))
@ -2442,8 +2467,8 @@ static int routeQuery(
{
if(SERVER_IS_RUNNING(inst->servers[z]->backend_server))
{
tname = inst->servers[z]->backend_server->unique_name;
route_target = TARGET_NAMED_SERVER;
targetserver = strdup(inst->servers[z]->backend_server->unique_name);
break;
}
}
@ -2466,14 +2491,14 @@ static int routeQuery(
/**
* Query is routed to one of the backends
*/
if (TARGET_IS_NAMED_SERVER(route_target))
if (TARGET_IS_NAMED_SERVER(route_target) && targetserver != NULL)
{
/**
* Search backend server by name or replication lag.
* If it fails, then try to find valid slave or master.
*/
succp = get_shard_dcb(&target_dcb, router_cli_ses, tname);
succp = get_shard_dcb(&target_dcb, router_cli_ses, targetserver);
if (!succp)
{
@ -2482,7 +2507,7 @@ static int routeQuery(
"Was supposed to route to named server "
"%s but couldn't find the server in a "
"suitable state.",
tname)));
targetserver)));
}
}
@ -2539,7 +2564,7 @@ static int routeQuery(
}
rses_end_locked_router_action(router_cli_ses);
retblock:
free(targetserver);
gwbuf_free(querybuf);
return ret;
@ -4600,7 +4625,8 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
{
HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash);
struct shard_list sl;
if (iter)
{
sl.iter = iter;
sl.rses = rses;
if ((sl.rset = resultset_create(shard_list_cb, &sl)) == NULL)
@ -4617,6 +4643,13 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
hashtable_iterator_free(iter);
}
}
else
{
skygw_log_write(LE, "Error: hashtable_iterator creation failed. "
"This is caused by a memory allocation failure.");
rval = -1;
}
}
spinlock_release(&rses->shardmap->lock);
return rval;
}