diff --git a/server/modules/filter/cache/cache.c b/server/modules/filter/cache/cache.c index c633d6661..0a5da258b 100644 --- a/server/modules/filter/cache/cache.c +++ b/server/modules/filter/cache/cache.c @@ -16,11 +16,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include "rules.h" #include "storage.h" @@ -119,11 +121,13 @@ static const CACHE_CONFIG DEFAULT_CONFIG = typedef struct cache_instance { - const char *name; - CACHE_CONFIG config; - CACHE_RULES *rules; - CACHE_STORAGE_MODULE *module; - CACHE_STORAGE *storage; + const char *name; // The name of the instance; the section name in the config. + CACHE_CONFIG config; // The configuration of the cache instance. + CACHE_RULES *rules; // The rules of the cache instance. + CACHE_STORAGE_MODULE *module; // The storage module. + CACHE_STORAGE *storage; // The storage API. + HASHTABLE *pending; // Pending items; being fetched from the backend. + SPINLOCK pending_lock; // Lock used for protecting 'pending'. } CACHE_INSTANCE; typedef enum cache_session_state @@ -149,17 +153,18 @@ static void cache_response_state_reset(CACHE_RESPONSE_STATE *state); typedef struct cache_session_data { - CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */ - CACHE_STORAGE_API *api; /**< The storage API to be used. */ - CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ - DOWNSTREAM down; /**< The previous filter or equivalent. */ - UPSTREAM up; /**< The next filter or equivalent. */ - CACHE_RESPONSE_STATE res; /**< The response state. */ - SESSION *session; /**< The session this data is associated with. */ - char key[CACHE_KEY_MAXLEN]; /**< Key storage. */ - char *default_db; /**< The default database. */ - char *use_db; /**< Pending default database. Needs server response. */ - cache_session_state_t state; + CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. 
*/ + CACHE_STORAGE_API *api; /**< The storage API to be used. */ + CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ + DOWNSTREAM down; /**< The previous filter or equivalent. */ + UPSTREAM up; /**< The next filter or equivalent. */ + CACHE_RESPONSE_STATE res; /**< The response state. */ + SESSION *session; /**< The session this data is associated with. */ + char key[CACHE_KEY_MAXLEN]; /**< Key storage. */ + char *default_db; /**< The default database. */ + char *use_db; /**< Pending default database. Needs server response. */ + cache_session_state_t state; /**< What state is the session in, what data is expected. */ + bool refreshing; /**< Whether the session is updating a stale cache entry. */ } CACHE_SESSION_DATA; static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance, SESSION *session); @@ -172,12 +177,58 @@ static int handle_expecting_rows(CACHE_SESSION_DATA *csdata); static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata); static int handle_ignoring_response(CACHE_SESSION_DATA *csdata); static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config); -static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value); +static cache_result_t get_cached_response(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value); static int send_upstream(CACHE_SESSION_DATA *csdata); static void store_result(CACHE_SESSION_DATA *csdata); +/** + * Hashes a cache key to an integer. + * + * @param key Pointer to cache key. + * + * @returns Corresponding integer hash. + */ +static int hash_of_key(const void* key) +{ + int hash = 0; + + const char* i = (const char*)key; + const char* end = i + CACHE_KEY_MAXLEN; + + while (i < end) + { + int c = *i; + hash = c + (hash << 6) + (hash << 16) - hash; + ++i; + } + + return hash; +} + +static int hashfn(const void* address) +{ + // TODO: Hash the address; pointers are not evenly distributed. 
+ return (long)address; +} + +static int hashcmp(const void* address1, const void* address2) +{ + return (long)address2 - (long)address1; +} + +#define DUMMY_VALUE (void*)0xdeadbeef + +// Initial size of hashtable used for storing keys of queries that +// are being fetched. +#define CACHE_PENDING_ITEMS 50 + +static inline bool log_decisions(const CACHE_SESSION_DATA* csdata) +{ + return csdata->instance->config.debug & CACHE_DEBUG_DECISIONS ? true : false; +} + // // API BEGIN // @@ -212,7 +263,10 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER if (rules) { - if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL) + cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE)); + HASHTABLE* pending = hashtable_alloc(CACHE_PENDING_ITEMS, hashfn, hashcmp); + + if (cinstance && pending) { CACHE_STORAGE_MODULE *module = cache_storage_open(config.storage); @@ -231,6 +285,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER cinstance->rules = rules; cinstance->module = module; cinstance->storage = storage; + cinstance->pending = pending; MXS_NOTICE("Cache storage %s opened and initialized.", config.storage); } @@ -240,6 +295,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER cache_rules_free(rules); cache_storage_close(module); MXS_FREE(cinstance); + hashtable_free(pending); cinstance = NULL; } } @@ -251,6 +307,14 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER cinstance = NULL; } } + else + { + MXS_FREE(cinstance); + if (pending) + { + hashtable_free(pending); + } + } } } @@ -348,7 +412,7 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet) ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1); ss_dassert(MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN == GWBUF_LENGTH(packet)); - bool use_default = true; + bool fetch_from_server = true; cache_response_state_reset(&csdata->res); csdata->state = CACHE_IGNORING_RESPONSE; 
@@ -398,26 +462,81 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet) { if (cache_rules_should_use(cinstance->rules, csdata->session)) { - GWBUF *result; - use_default = !route_using_cache(csdata, packet, &result); + GWBUF *response; + cache_result_t result = get_cached_response(csdata, packet, &response); - if (use_default) + switch (result) + { + case CACHE_RESULT_STALE: + { + // The value was found, but it was stale. Now we need to + // figure out whether somebody else is already fetching it. + + long key = hash_of_key(csdata->key); + + spinlock_acquire(&cinstance->pending_lock); + // TODO: Remove the internal locking of hashtable. The internal + // TODO: locking is no good if you need transactional behaviour. + // TODO: Now we lock twice. + void *value = hashtable_fetch(cinstance->pending, (void*)key); + if (!value) + { + // It's not being fetched, so we make a note that we are. + hashtable_add(cinstance->pending, (void*)key, DUMMY_VALUE); + } + spinlock_release(&cinstance->pending_lock); + + if (!value) + { + // We were the first ones who hit the stale item. It's + // our responsibility now to fetch it. + if (log_decisions(csdata)) + { + MXS_NOTICE("Cache data is stale, fetching fresh from server."); + } + csdata->refreshing = true; + fetch_from_server = true; + break; + } + else + { + // Somebody is already fetching the new value. So, let's + // use the stale value. No point in hitting the server twice. 
+ if (log_decisions(csdata)) + { + MXS_NOTICE("Cache data is stale but returning it, fresh " + "data is being fetched already."); + } + fetch_from_server = false; + } + } + break; + + case CACHE_RESULT_OK: + if (log_decisions(csdata)) + { + MXS_NOTICE("Using fresh data from cache."); + } + fetch_from_server = false; + break; + + default: + fetch_from_server = true; + } + + if (fetch_from_server) { csdata->state = CACHE_EXPECTING_RESPONSE; } else { csdata->state = CACHE_EXPECTING_NOTHING; - if (csdata->instance->config.debug & CACHE_DEBUG_DECISIONS) - { - MXS_NOTICE("Using data from cache."); - } gwbuf_free(packet); DCB *dcb = csdata->session->client_dcb; // TODO: This is not ok. Any filters before this filter, will not // TODO: see this data. - rv = dcb->func.write(dcb, result); + rv = dcb->func.write(dcb, response); } } } @@ -444,7 +563,7 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet) } } - if (use_default) + if (fetch_from_server) { rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet); } @@ -1079,22 +1198,24 @@ static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONF * @param value The result. * @return True if the query was satisfied from the query. 
*/ -static bool route_using_cache(CACHE_SESSION_DATA *csdata, - const GWBUF *query, - GWBUF **value) +static cache_result_t get_cached_response(CACHE_SESSION_DATA *csdata, + const GWBUF *query, + GWBUF **value) { cache_result_t result = csdata->api->getKey(csdata->storage, csdata->default_db, query, csdata->key); if (result == CACHE_RESULT_OK) { - result = csdata->api->getValue(csdata->storage, csdata->key, CACHE_FLAGS_NONE, value); + uint32_t flags = CACHE_FLAGS_INCLUDE_STALE; + + result = csdata->api->getValue(csdata->storage, csdata->key, flags, value); } else { MXS_ERROR("Could not create cache key."); } - return result == CACHE_RESULT_OK; + return result; } /** @@ -1138,4 +1259,19 @@ static void store_result(CACHE_SESSION_DATA *csdata) MXS_ERROR("Could not store cache item."); } } + + if (csdata->refreshing) + { + long key = hash_of_key(csdata->key); + + CACHE_INSTANCE *instance = csdata->instance; + + spinlock_acquire(&instance->pending_lock); + ss_dassert(hashtable_fetch(instance->pending, (void*)key) == DUMMY_VALUE); + ss_debug(int n =) hashtable_delete(instance->pending, (void*)key); + ss_dassert(n == 1); + spinlock_release(&instance->pending_lock); + + csdata->refreshing = false; + } } diff --git a/server/modules/filter/cache/cache.h b/server/modules/filter/cache/cache.h index 5b725f147..0b03cdeb5 100644 --- a/server/modules/filter/cache/cache.h +++ b/server/modules/filter/cache/cache.h @@ -29,7 +29,7 @@ MXS_BEGIN_DECLS #define CACHE_DEBUG_RULES (CACHE_DEBUG_MATCHING | CACHE_DEBUG_NON_MATCHING) #define CACHE_DEBUG_USAGE (CACHE_DEBUG_USE | CACHE_DEBUG_NON_USE) #define CACHE_DEBUG_MIN CACHE_DEBUG_NONE -#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE) +#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE | CACHE_DEBUG_DECISIONS) // Count #define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX