Move shard map handling into a separate file

Moved the handling of shard maps into another file.
This commit is contained in:
Markus Mäkelä
2017-03-24 14:01:29 +02:00
parent a5fd53e9a0
commit 9e4e70a337
5 changed files with 217 additions and 156 deletions

View File

@ -32,9 +32,6 @@
#define DEFAULT_REFRESH_INTERVAL "300"
/** Size of the hashtable used to store ignored databases */
#define SCHEMAROUTER_HASHSIZE 100
/** Hashtable size for the per user shard maps */
#define SCHEMAROUTER_USERHASH_SIZE 10
@ -117,8 +114,6 @@ static SCHEMAROUTER* instances;
bool detect_show_shards(GWBUF* query);
int process_show_shards(SCHEMAROUTER_SESSION* rses);
static int hashkeyfun(const void* key);
static int hashcmpfun(const void *, const void *);
void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg);
int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses,
@ -128,60 +123,18 @@ bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses);
void route_queued_query(SCHEMAROUTER_SESSION *router_cli_ses);
void synchronize_shard_map(SCHEMAROUTER_SESSION *client);
static int hashkeyfun(const void* key)
bool check_server_status(SERVER_REF *servers, char* target)
{
if (key == NULL)
for (SERVER_REF *ref = servers; ref; ref = ref->next)
{
return 0;
}
int hash = 0, c = 0;
const char* ptr = (const char*)key;
while ((c = *ptr++))
{
hash = c + (hash << 6) + (hash << 16) - hash;
}
return hash;
}
static int hashcmpfun(const void* v1, const void* v2)
{
const char* i1 = (const char*) v1;
const char* i2 = (const char*) v2;
return strcmp(i1, i2);
}
void keyfreefun(void* data)
{
MXS_FREE(data);
}
/**
* Allocate a shard map and initialize it.
* @return Pointer to new shard_map_t or NULL if memory allocation failed
*/
shard_map_t* shard_map_alloc()
{
shard_map_t *rval = (shard_map_t*) MXS_MALLOC(sizeof(shard_map_t));
if (rval)
{
if ((rval->hash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)))
if (strcmp(ref->server->unique_name, target) == 0 &&
SERVER_IS_RUNNING(ref->server))
{
HASHCOPYFN kcopy = (HASHCOPYFN)strdup;
hashtable_memory_fns(rval->hash, kcopy, kcopy, keyfreefun, keyfreefun);
spinlock_init(&rval->lock);
rval->last_updated = 0;
rval->state = SHMAP_UNINIT;
}
else
{
MXS_FREE(rval);
rval = NULL;
return true;
}
}
return rval;
return false;
}
/**
@ -539,46 +492,6 @@ char* get_shard_target_name(SCHEMAROUTER* router,
return rval;
}
/**
* Check if the backend is still running. If the backend is not running the
* hashtable is updated with up-to-date values.
* @param router Router instance
* @param shard Shard to check
* @return True if the backend server is running
*/
bool check_shard_status(SCHEMAROUTER* router, char* shard)
{
for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next)
{
if (strcmp(ref->server->unique_name, shard) == 0 &&
SERVER_IS_RUNNING(ref->server))
{
return true;
}
}
return false;
}
/**
* Check if the shard map is out of date and update its state if necessary.
* @param router Router instance
* @param map Shard map to update
* @return Current state of the shard map
*/
enum shard_map_state shard_map_update_state(shard_map_t *self, SCHEMAROUTER* router)
{
spinlock_acquire(&self->lock);
double tdiff = difftime(time(NULL), self->last_updated);
if (tdiff > router->schemarouter_config.refresh_min_interval)
{
self->state = SHMAP_STALE;
}
enum shard_map_state state = self->state;
spinlock_release(&self->lock);
return state;
}
/**
* Provide the router with a pointer to a suitable backend dcb.
*
@ -2419,24 +2332,6 @@ int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses,
return mapped ? 1 : 0;
}
/**
* Replace a shard map with another one. This function copies the contents of
* the source shard map to the target and frees the source memory.
* @param target Target shard map to replace
* @param source Source shard map to use
*/
void replace_shard_map(shard_map_t **target, shard_map_t **source)
{
shard_map_t *tgt = *target;
shard_map_t *src = *source;
tgt->last_updated = src->last_updated;
tgt->state = src->state;
hashtable_free(tgt->hash);
tgt->hash = src->hash;
MXS_FREE(src);
*source = NULL;
}
/**
* Synchronize the router client session shard map with the global shard map for
* this user.
@ -2455,33 +2350,14 @@ void synchronize_shard_map(SCHEMAROUTER_SESSION *client)
client->router->stats.shmap_cache_miss++;
shard_map_t *map = (shard_map_t *)hashtable_fetch(client->router->shard_maps,
client->rses_client_dcb->user);
client->rses_client_dcb->user);
if (map)
{
spinlock_acquire(&map->lock);
if (map->state == SHMAP_STALE)
{
replace_shard_map(&map, &client->shardmap);
}
else if (map->state != SHMAP_READY)
{
MXS_WARNING("Shard map state is not ready but"
"it is in use. Replacing it with a newer one.");
replace_shard_map(&map, &client->shardmap);
}
else
{
/**
* Another thread has already updated the shard map for this user
*/
hashtable_free(client->shardmap->hash);
MXS_FREE(client->shardmap);
}
spinlock_release(&map->lock);
client->shardmap = map;
map = get_latest_shard_map(map, client->shardmap);
}
else
{
/** No previous map found */
hashtable_add(client->router->shard_maps,
client->rses_client_dcb->user,
client->shardmap);
@ -2869,7 +2745,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess
if (map)
{
state = shard_map_update_state(map, router);
state = shard_map_update_state(map, router->schemarouter_config.refresh_min_interval);
}
spinlock_release(&router->lock);
@ -3369,7 +3245,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
spinlock_acquire(&router_cli_ses->shardmap->lock);
if ((tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL)
{
bool shard_ok = check_shard_status(inst, tname);
bool shard_ok = check_server_status(inst->service->dbref, tname);
if (shard_ok)
{