Take new sharding implementation into use
The sharding implementation now uses a class to abstract the details of the shard. This allows for different design where each session makes a copy of the global shard map which is then used for the duration of the session. In addition to making the desing a bit clearer to understand, it also removes lock competition between threads. Due to the change to C++, the main entry points need to be wrapped in the exception-safety macros. The next step in the refactoring will be to use the router template. This will remove the need to manually define them.
This commit is contained in:
@ -15,109 +15,96 @@
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
|
||||
int hashkeyfun(const void* key)
|
||||
Shard::Shard():
|
||||
m_last_updated(time(NULL))
|
||||
{
|
||||
if (key == NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
int hash = 0, c = 0;
|
||||
const char* ptr = (const char*)key;
|
||||
while ((c = *ptr++))
|
||||
{
|
||||
hash = c + (hash << 6) + (hash << 16) - hash;
|
||||
}
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
int hashcmpfun(const void* v1, const void* v2)
|
||||
Shard::~Shard()
|
||||
{
|
||||
const char* i1 = (const char*) v1;
|
||||
const char* i2 = (const char*) v2;
|
||||
|
||||
return strcmp(i1, i2);
|
||||
}
|
||||
|
||||
void keyfreefun(void* data)
|
||||
bool Shard::add_location(string db, SERVER* target)
|
||||
{
|
||||
MXS_FREE(data);
|
||||
return m_map.insert(make_pair(db, target)).second;
|
||||
}
|
||||
|
||||
shard_map_t* shard_map_alloc()
|
||||
SERVER* Shard::get_location(string db)
|
||||
{
|
||||
shard_map_t *rval = (shard_map_t*) MXS_MALLOC(sizeof(shard_map_t));
|
||||
SERVER* rval = NULL;
|
||||
ServerMap::iterator iter = m_map.find(db);
|
||||
|
||||
if (rval)
|
||||
if (iter != m_map.end())
|
||||
{
|
||||
if ((rval->hash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)))
|
||||
{
|
||||
HASHCOPYFN kcopy = (HASHCOPYFN)strdup;
|
||||
hashtable_memory_fns(rval->hash, kcopy, kcopy, keyfreefun, keyfreefun);
|
||||
spinlock_init(&rval->lock);
|
||||
rval->last_updated = 0;
|
||||
rval->state = SHMAP_UNINIT;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_FREE(rval);
|
||||
rval = NULL;
|
||||
}
|
||||
rval = iter->second;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
enum shard_map_state shard_map_update_state(shard_map_t *self, double refresh_min_interval)
|
||||
bool Shard::stale(double max_interval) const
|
||||
{
|
||||
spinlock_acquire(&self->lock);
|
||||
double tdiff = difftime(time(NULL), self->last_updated);
|
||||
if (tdiff > refresh_min_interval)
|
||||
{
|
||||
self->state = SHMAP_STALE;
|
||||
}
|
||||
enum shard_map_state state = self->state;
|
||||
spinlock_release(&self->lock);
|
||||
return state;
|
||||
time_t now = time(NULL);
|
||||
|
||||
return difftime(now, m_last_updated) > max_interval;
|
||||
}
|
||||
|
||||
void replace_shard_map(shard_map_t **target, shard_map_t **source)
|
||||
bool Shard::empty() const
|
||||
{
|
||||
shard_map_t *tgt = *target;
|
||||
shard_map_t *src = *source;
|
||||
tgt->last_updated = src->last_updated;
|
||||
tgt->state = src->state;
|
||||
hashtable_free(tgt->hash);
|
||||
tgt->hash = src->hash;
|
||||
MXS_FREE(src);
|
||||
*source = NULL;
|
||||
return m_map.size() == 0;
|
||||
}
|
||||
|
||||
shard_map_t* get_latest_shard_map(shard_map_t *stored, shard_map_t *current)
|
||||
void Shard::get_content(ServerMap& dest)
|
||||
{
|
||||
shard_map_t *map = stored;
|
||||
|
||||
spinlock_acquire(&map->lock);
|
||||
|
||||
if (map->state == SHMAP_STALE)
|
||||
for (ServerMap::iterator it = m_map.begin(); it != m_map.end(); it++)
|
||||
{
|
||||
replace_shard_map(&map, ¤t);
|
||||
dest.insert(*it);
|
||||
}
|
||||
else if (map->state != SHMAP_READY)
|
||||
}
|
||||
|
||||
bool Shard::newer_than(const Shard& shard) const
|
||||
{
|
||||
return m_last_updated > shard.m_last_updated;
|
||||
}
|
||||
|
||||
ShardManager::ShardManager()
|
||||
{
|
||||
spinlock_init(&m_lock);
|
||||
}
|
||||
|
||||
ShardManager::~ShardManager()
|
||||
{
|
||||
}
|
||||
|
||||
Shard ShardManager::get_shard(string user, double max_interval)
|
||||
{
|
||||
SpinLockGuard guard(m_lock);
|
||||
|
||||
ShardMap::iterator iter = m_maps.find(user);
|
||||
|
||||
if (iter == m_maps.end() || iter->second.stale(max_interval))
|
||||
{
|
||||
MXS_WARNING("Shard map state is not ready but"
|
||||
"it is in use. Replacing it with a newer one.");
|
||||
replace_shard_map(&map, ¤t);
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Another thread has already updated the shard map for this user
|
||||
*/
|
||||
hashtable_free(current->hash);
|
||||
MXS_FREE(current);
|
||||
// No previous shard or a stale shard, construct a new one
|
||||
|
||||
if (iter != m_maps.end())
|
||||
{
|
||||
m_maps.erase(iter);
|
||||
}
|
||||
|
||||
return Shard();
|
||||
}
|
||||
|
||||
spinlock_release(&map->lock);
|
||||
// Found valid shard
|
||||
return iter->second;
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
void ShardManager::update_shard(Shard& shard, string user)
|
||||
{
|
||||
SpinLockGuard guard(m_lock);
|
||||
ShardMap::iterator iter = m_maps.find(user);
|
||||
|
||||
if (iter == m_maps.end() || shard.newer_than(iter->second))
|
||||
{
|
||||
m_maps[user] = shard;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user