Prevent stale item from being fetched more than once
If a cached entry becomes stale, then if no extra measures are taken then every thread hitting that item while it is being fetched from the server will also refresh it, even though it obviously is sufficient that one does it. Now the knowledge that the item is being refreshed is recorded, so that all other clients are simply returned the stale item. That way, we'll hit the server once per stale item.
This commit is contained in:
parent
f4e3ca2c87
commit
2ecd5f3340
202
server/modules/filter/cache/cache.c
vendored
202
server/modules/filter/cache/cache.c
vendored
@ -16,11 +16,13 @@
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/filter.h>
|
||||
#include <maxscale/gwdirs.h>
|
||||
#include <maxscale/hashtable.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/modinfo.h>
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
#include <maxscale/query_classifier.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
#include "rules.h"
|
||||
#include "storage.h"
|
||||
|
||||
@ -119,11 +121,13 @@ static const CACHE_CONFIG DEFAULT_CONFIG =
|
||||
|
||||
typedef struct cache_instance
|
||||
{
|
||||
const char *name;
|
||||
CACHE_CONFIG config;
|
||||
CACHE_RULES *rules;
|
||||
CACHE_STORAGE_MODULE *module;
|
||||
CACHE_STORAGE *storage;
|
||||
const char *name; // The name of the instance; the section name in the config.
|
||||
CACHE_CONFIG config; // The configuration of the cache instance.
|
||||
CACHE_RULES *rules; // The rules of the cache instance.
|
||||
CACHE_STORAGE_MODULE *module; // The storage module.
|
||||
CACHE_STORAGE *storage; // The storage API.
|
||||
HASHTABLE *pending; // Pending items; being fetched from the backend.
|
||||
SPINLOCK pending_lock; // Lock used for protecting 'pending'.
|
||||
} CACHE_INSTANCE;
|
||||
|
||||
typedef enum cache_session_state
|
||||
@ -149,17 +153,18 @@ static void cache_response_state_reset(CACHE_RESPONSE_STATE *state);
|
||||
|
||||
typedef struct cache_session_data
|
||||
{
|
||||
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
||||
CACHE_STORAGE_API *api; /**< The storage API to be used. */
|
||||
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||
UPSTREAM up; /**< The next filter or equivalent. */
|
||||
CACHE_RESPONSE_STATE res; /**< The response state. */
|
||||
SESSION *session; /**< The session this data is associated with. */
|
||||
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||
char *default_db; /**< The default database. */
|
||||
char *use_db; /**< Pending default database. Needs server response. */
|
||||
cache_session_state_t state;
|
||||
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
||||
CACHE_STORAGE_API *api; /**< The storage API to be used. */
|
||||
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||
UPSTREAM up; /**< The next filter or equivalent. */
|
||||
CACHE_RESPONSE_STATE res; /**< The response state. */
|
||||
SESSION *session; /**< The session this data is associated with. */
|
||||
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||
char *default_db; /**< The default database. */
|
||||
char *use_db; /**< Pending default database. Needs server response. */
|
||||
cache_session_state_t state; /**< What state is the session in, what data is expected. */
|
||||
bool refreshing; /**< Whether the session is updating a stale cache entry. */
|
||||
} CACHE_SESSION_DATA;
|
||||
|
||||
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance, SESSION *session);
|
||||
@ -172,12 +177,58 @@ static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
|
||||
static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config);
|
||||
static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
|
||||
static cache_result_t get_cached_response(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
|
||||
|
||||
static int send_upstream(CACHE_SESSION_DATA *csdata);
|
||||
|
||||
static void store_result(CACHE_SESSION_DATA *csdata);
|
||||
|
||||
/**
|
||||
* Hashes a cache key to an integer.
|
||||
*
|
||||
* @param key Pointer to cache key.
|
||||
*
|
||||
* @returns Corresponding integer hash.
|
||||
*/
|
||||
static int hash_of_key(const void* key)
|
||||
{
|
||||
int hash = 0;
|
||||
|
||||
const char* i = (const char*)key;
|
||||
const char* end = i + CACHE_KEY_MAXLEN;
|
||||
|
||||
while (i < end)
|
||||
{
|
||||
int c = *i;
|
||||
hash = c + (hash << 6) + (hash << 16) - hash;
|
||||
++i;
|
||||
}
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
static int hashfn(const void* address)
|
||||
{
|
||||
// TODO: Hash the address; pointers are not evenly distributed.
|
||||
return (long)address;
|
||||
}
|
||||
|
||||
static int hashcmp(const void* address1, const void* address2)
|
||||
{
|
||||
return (long)address2 - (long)address1;
|
||||
}
|
||||
|
||||
#define DUMMY_VALUE (void*)0xdeadbeef
|
||||
|
||||
// Initial size of hashtable used for storing keys of queries that
|
||||
// are being fetches.
|
||||
#define CACHE_PENDING_ITEMS 50
|
||||
|
||||
static inline bool log_decisions(const CACHE_SESSION_DATA* csdata)
|
||||
{
|
||||
return csdata->instance->config.debug & CACHE_DEBUG_DECISIONS ? true : false;
|
||||
}
|
||||
|
||||
//
|
||||
// API BEGIN
|
||||
//
|
||||
@ -212,7 +263,10 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
||||
|
||||
if (rules)
|
||||
{
|
||||
if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL)
|
||||
cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE));
|
||||
HASHTABLE* pending = hashtable_alloc(CACHE_PENDING_ITEMS, hashfn, hashcmp);
|
||||
|
||||
if (cinstance && pending)
|
||||
{
|
||||
CACHE_STORAGE_MODULE *module = cache_storage_open(config.storage);
|
||||
|
||||
@ -231,6 +285,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
||||
cinstance->rules = rules;
|
||||
cinstance->module = module;
|
||||
cinstance->storage = storage;
|
||||
cinstance->pending = pending;
|
||||
|
||||
MXS_NOTICE("Cache storage %s opened and initialized.", config.storage);
|
||||
}
|
||||
@ -240,6 +295,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
||||
cache_rules_free(rules);
|
||||
cache_storage_close(module);
|
||||
MXS_FREE(cinstance);
|
||||
hashtable_free(pending);
|
||||
cinstance = NULL;
|
||||
}
|
||||
}
|
||||
@ -251,6 +307,14 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
||||
cinstance = NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_FREE(cinstance);
|
||||
if (pending)
|
||||
{
|
||||
hashtable_free(pending);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -348,7 +412,7 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
||||
ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1);
|
||||
ss_dassert(MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
||||
|
||||
bool use_default = true;
|
||||
bool fetch_from_server = true;
|
||||
|
||||
cache_response_state_reset(&csdata->res);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
@ -398,26 +462,81 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
||||
{
|
||||
if (cache_rules_should_use(cinstance->rules, csdata->session))
|
||||
{
|
||||
GWBUF *result;
|
||||
use_default = !route_using_cache(csdata, packet, &result);
|
||||
GWBUF *response;
|
||||
cache_result_t result = get_cached_response(csdata, packet, &response);
|
||||
|
||||
if (use_default)
|
||||
switch (result)
|
||||
{
|
||||
case CACHE_RESULT_STALE:
|
||||
{
|
||||
// The value was found, but it was stale. Now we need to
|
||||
// figure out whether somebody else is already fetching it.
|
||||
|
||||
long key = hash_of_key(csdata->key);
|
||||
|
||||
spinlock_acquire(&cinstance->pending_lock);
|
||||
// TODO: Remove the internal locking of hashtable. The internal
|
||||
// TODO: locking is no good if you need transactional behaviour.
|
||||
// TODO: Now we lock twice.
|
||||
void *value = hashtable_fetch(cinstance->pending, (void*)key);
|
||||
if (!value)
|
||||
{
|
||||
// It's not being fetched, so we make a note that we are.
|
||||
hashtable_add(cinstance->pending, (void*)key, DUMMY_VALUE);
|
||||
}
|
||||
spinlock_release(&cinstance->pending_lock);
|
||||
|
||||
if (!value)
|
||||
{
|
||||
// We were the first ones who hit the stale item. It's
|
||||
// our responsibility now to fetch it.
|
||||
if (log_decisions(csdata))
|
||||
{
|
||||
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
|
||||
}
|
||||
csdata->refreshing = true;
|
||||
fetch_from_server = true;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Somebody is already fetching the new value. So, let's
|
||||
// use the stale value. No point in hitting the server twice.
|
||||
if (log_decisions(csdata))
|
||||
{
|
||||
MXS_NOTICE("Cache data is stale but returning it, fresh "
|
||||
"data is being fetched already.");
|
||||
}
|
||||
fetch_from_server = false;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case CACHE_RESULT_OK:
|
||||
if (log_decisions(csdata))
|
||||
{
|
||||
MXS_NOTICE("Using fresh data from cache.");
|
||||
}
|
||||
fetch_from_server = false;
|
||||
break;
|
||||
|
||||
default:
|
||||
fetch_from_server = true;
|
||||
}
|
||||
|
||||
if (fetch_from_server)
|
||||
{
|
||||
csdata->state = CACHE_EXPECTING_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||
if (csdata->instance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("Using data from cache.");
|
||||
}
|
||||
gwbuf_free(packet);
|
||||
DCB *dcb = csdata->session->client_dcb;
|
||||
|
||||
// TODO: This is not ok. Any filters before this filter, will not
|
||||
// TODO: see this data.
|
||||
rv = dcb->func.write(dcb, result);
|
||||
rv = dcb->func.write(dcb, response);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -444,7 +563,7 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
||||
}
|
||||
}
|
||||
|
||||
if (use_default)
|
||||
if (fetch_from_server)
|
||||
{
|
||||
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
||||
}
|
||||
@ -1079,22 +1198,24 @@ static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONF
|
||||
* @param value The result.
|
||||
* @return True if the query was satisfied from the query.
|
||||
*/
|
||||
static bool route_using_cache(CACHE_SESSION_DATA *csdata,
|
||||
const GWBUF *query,
|
||||
GWBUF **value)
|
||||
static cache_result_t get_cached_response(CACHE_SESSION_DATA *csdata,
|
||||
const GWBUF *query,
|
||||
GWBUF **value)
|
||||
{
|
||||
cache_result_t result = csdata->api->getKey(csdata->storage, csdata->default_db, query, csdata->key);
|
||||
|
||||
if (result == CACHE_RESULT_OK)
|
||||
{
|
||||
result = csdata->api->getValue(csdata->storage, csdata->key, CACHE_FLAGS_NONE, value);
|
||||
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
|
||||
|
||||
result = csdata->api->getValue(csdata->storage, csdata->key, flags, value);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not create cache key.");
|
||||
}
|
||||
|
||||
return result == CACHE_RESULT_OK;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1138,4 +1259,19 @@ static void store_result(CACHE_SESSION_DATA *csdata)
|
||||
MXS_ERROR("Could not store cache item.");
|
||||
}
|
||||
}
|
||||
|
||||
if (csdata->refreshing)
|
||||
{
|
||||
long key = hash_of_key(csdata->key);
|
||||
|
||||
CACHE_INSTANCE *instance = csdata->instance;
|
||||
|
||||
spinlock_acquire(&instance->pending_lock);
|
||||
ss_dassert(hashtable_fetch(instance->pending, (void*)key) == DUMMY_VALUE);
|
||||
ss_debug(int n =) hashtable_delete(instance->pending, (void*)key);
|
||||
ss_dassert(n == 1);
|
||||
spinlock_release(&instance->pending_lock);
|
||||
|
||||
csdata->refreshing = false;
|
||||
}
|
||||
}
|
||||
|
2
server/modules/filter/cache/cache.h
vendored
2
server/modules/filter/cache/cache.h
vendored
@ -29,7 +29,7 @@ MXS_BEGIN_DECLS
|
||||
#define CACHE_DEBUG_RULES (CACHE_DEBUG_MATCHING | CACHE_DEBUG_NON_MATCHING)
|
||||
#define CACHE_DEBUG_USAGE (CACHE_DEBUG_USE | CACHE_DEBUG_NON_USE)
|
||||
#define CACHE_DEBUG_MIN CACHE_DEBUG_NONE
|
||||
#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE)
|
||||
#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE | CACHE_DEBUG_DECISIONS)
|
||||
|
||||
// Count
|
||||
#define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX
|
||||
|
Loading…
x
Reference in New Issue
Block a user