Cache: Take Cache class into use

Now the basic structure is in place:

- cachefilter.cc is the MaxScale filter interface.
- Cache is the actual cache class that will also handle LRU issues.
- SessionCache (sessioncache.cc) is the session specific cache class
  that using Cache acts as the cache for a particular session.
  If an item is stale, then one SessionCache will update it.
- StorageFactory is the component that is capable of loading a module
  providing storage facilities.
- Storage is the actual key/value store.
This commit is contained in:
Johan Wikman
2016-11-24 16:48:49 +02:00
parent 719cffd596
commit 14c8a5bb5f
6 changed files with 127 additions and 437 deletions

View File

@ -23,6 +23,42 @@
// are being fetches. // are being fetches.
#define CACHE_PENDING_ITEMS 50 #define CACHE_PENDING_ITEMS 50
static const CACHE_CONFIG DEFAULT_CONFIG =
{
CACHE_DEFAULT_MAX_RESULTSET_ROWS,
CACHE_DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
NULL,
NULL,
0,
CACHE_DEFAULT_TTL,
CACHE_DEFAULT_DEBUG
};
/**
* Hashes a cache key to an integer.
*
* @param key Pointer to cache key.
*
* @returns Corresponding integer hash.
*/
static int hash_of_key(const void* key)
{
int hash = 0;
const char* i = (const char*)key;
const char* end = i + CACHE_KEY_MAXLEN;
while (i < end)
{
int c = *i;
hash = c + (hash << 6) + (hash << 16) - hash;
++i;
}
return hash;
}
static int hashfn(const void* address) static int hashfn(const void* address)
{ {
@ -62,8 +98,7 @@ Cache* Cache::Create(const char* zName, char** pzOptions, FILTER_PARAMETER** ppP
{ {
Cache* pCache = NULL; Cache* pCache = NULL;
CACHE_CONFIG config; CACHE_CONFIG config = DEFAULT_CONFIG;
memset(&config, 0, sizeof(config));
if (process_params(pzOptions, ppParams, &config)) if (process_params(pzOptions, ppParams, &config))
{ {

View File

@ -65,6 +65,8 @@ public:
*/ */
void refreshed(const char* pKey, const SessionCache* pSessionCache); void refreshed(const char* pKey, const SessionCache* pSessionCache);
const CACHE_CONFIG& config() const { return m_config; }
cache_result_t getKey(const char* zDefaultDb, const GWBUF* pQuery, char* pKey); cache_result_t getKey(const char* zDefaultDb, const GWBUF* pQuery, char* pKey);
cache_result_t getValue(const char* pKey, uint32_t flags, GWBUF** ppValue); cache_result_t getValue(const char* pKey, uint32_t flags, GWBUF** ppValue);

View File

@ -12,27 +12,21 @@
*/ */
#define MXS_MODULE_NAME "cache" #define MXS_MODULE_NAME "cache"
#include "cachefilter.h"
#include <maxscale/alloc.h>
#include <maxscale/filter.h> #include <maxscale/filter.h>
#include <maxscale/gwdirs.h> #include "cache.h"
#include <maxscale/log_manager.h>
#include "rules.h"
#include "sessioncache.h" #include "sessioncache.h"
#include "storage.h"
#include "storagefactory.h"
static char VERSION_STRING[] = "V1.0.0"; static char VERSION_STRING[] = "V1.0.0";
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **); static FILTER* createInstance(const char* zName, char** pzOptions, FILTER_PARAMETER** ppParams);
static void *newSession(FILTER *instance, SESSION *session); static void* newSession(FILTER* pInstance, SESSION* pSession);
static void closeSession(FILTER *instance, void *sdata); static void closeSession(FILTER* pInstance, void* pSessionData);
static void freeSession(FILTER *instance, void *sdata); static void freeSession(FILTER* pInstance, void* pSessionData);
static void setDownstream(FILTER *instance, void *sdata, DOWNSTREAM *downstream); static void setDownstream(FILTER* pInstance, void* pSessionData, DOWNSTREAM* pDownstream);
static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *upstream); static void setUpstream(FILTER* pInstance, void* pSessionData, UPSTREAM* pUpstream);
static int routeQuery(FILTER *instance, void *sdata, GWBUF *queue); static int routeQuery(FILTER* pInstance, void* pSessionData, GWBUF* pPacket);
static int clientReply(FILTER *instance, void *sdata, GWBUF *queue); static int clientReply(FILTER* pInstance, void* pSessionData, GWBUF* pPacket);
static void diagnostics(FILTER *instance, void *sdata, DCB *dcb); static void diagnostics(FILTER* pInstance, void* pSessionData, DCB* pDcb);
static uint64_t getCapabilities(void); static uint64_t getCapabilities(void);
// //
@ -86,168 +80,38 @@ extern "C" FILTER_OBJECT *GetModuleObject()
}; };
// //
// Implementation // API Implementation
//
static const CACHE_CONFIG DEFAULT_CONFIG =
{
CACHE_DEFAULT_MAX_RESULTSET_ROWS,
CACHE_DEFAULT_MAX_RESULTSET_SIZE,
NULL,
NULL,
NULL,
NULL,
0,
CACHE_DEFAULT_TTL,
CACHE_DEFAULT_DEBUG
};
static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config);
/**
* Hashes a cache key to an integer.
*
* @param key Pointer to cache key.
*
* @returns Corresponding integer hash.
*/
int hash_of_key(const void* key)
{
int hash = 0;
const char* i = (const char*)key;
const char* end = i + CACHE_KEY_MAXLEN;
while (i < end)
{
int c = *i;
hash = c + (hash << 6) + (hash << 16) - hash;
++i;
}
return hash;
}
static int hashfn(const void* address)
{
// TODO: Hash the address; pointers are not evenly distributed.
return (long)address;
}
static int hashcmp(const void* address1, const void* address2)
{
return (long)address2 - (long)address1;
}
// Initial size of hashtable used for storing keys of queries that
// are being fetches.
#define CACHE_PENDING_ITEMS 50
//
// API BEGIN
// //
/** /**
* Create an instance of the cache filter for a particular service * Create an instance of the cache filter for a particular service
* within MaxScale. * within MaxScale.
* *
* @param name The name of the instance (as defined in the config file). * @param zName The name of the instance (as defined in the config file).
* @param options The options for this filter * @param pzOptions The options for this filter
* @param params The array of name/value pair parameters for the filter * @param ppparams The array of name/value pair parameters for the filter
* *
* @return The instance data for this new instance * @return The instance data for this new instance
*/ */
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **params) static FILTER *createInstance(const char* zName, char** pzOptions, FILTER_PARAMETER** ppParams)
{ {
CACHE_INSTANCE *cinstance = NULL; Cache* pCache = Cache::Create(zName, pzOptions, ppParams);
CACHE_CONFIG config = DEFAULT_CONFIG;
if (process_params(options, params, &config)) return reinterpret_cast<FILTER*>(pCache);
{
CACHE_RULES *rules = NULL;
if (config.rules)
{
rules = cache_rules_load(config.rules, config.debug);
}
else
{
rules = cache_rules_create(config.debug);
}
if (rules)
{
cinstance = (CACHE_INSTANCE*)MXS_CALLOC(1, sizeof(CACHE_INSTANCE));
HASHTABLE* pending = hashtable_alloc(CACHE_PENDING_ITEMS, hashfn, hashcmp);
if (cinstance && pending)
{
StorageFactory *factory = StorageFactory::Open(config.storage);
if (factory)
{
uint32_t ttl = config.ttl;
int argc = config.storage_argc;
char** argv = config.storage_argv;
Storage *storage = factory->createStorage(name, ttl, argc, argv);
if (storage)
{
cinstance->name = name;
cinstance->config = config;
cinstance->rules = rules;
cinstance->factory = factory;
cinstance->storage = storage;
cinstance->pending = pending;
MXS_NOTICE("Cache storage %s opened and initialized.", config.storage);
}
else
{
MXS_ERROR("Could not create storage instance for '%s'.", name);
cache_rules_free(rules);
delete factory;
MXS_FREE(cinstance);
hashtable_free(pending);
cinstance = NULL;
}
}
else
{
MXS_ERROR("Could not load cache storage module '%s'.", name);
cache_rules_free(rules);
MXS_FREE(cinstance);
cinstance = NULL;
}
}
else
{
MXS_FREE(cinstance);
if (pending)
{
hashtable_free(pending);
}
}
}
}
return (FILTER*)cinstance;
} }
/** /**
* Associate a new session with this instance of the filter. * Associate a new session with this instance of the filter.
* *
* @param instance The cache instance data * @param pInstance The cache instance data
* @param session The session itself * @param pSession The session itself
* *
* @return Session specific data for this session * @return Session specific data for this session
*/ */
static void *newSession(FILTER *instance, SESSION *session) static void *newSession(FILTER* pInstance, SESSION* pSession)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; Cache* pCache = reinterpret_cast<Cache*>(pInstance);
SessionCache* pSessionCache = SessionCache::Create(pCache, pSession);
SessionCache *pSessionCache = SessionCache::Create(cinstance, session);
return pSessionCache; return pSessionCache;
} }
@ -255,14 +119,12 @@ static void *newSession(FILTER *instance, SESSION *session)
/** /**
* A session has been closed. * A session has been closed.
* *
* @param instance The cache instance data * @param pInstance The cache instance data
* @param sdata The session data of the session being closed * @param pSessionData The session data of the session being closed
*/ */
static void closeSession(FILTER *instance, void *sdata) static void closeSession(FILTER* pInstance, void* pSessionData)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
pSessionCache->close(); pSessionCache->close();
} }
@ -270,14 +132,12 @@ static void closeSession(FILTER *instance, void *sdata)
/** /**
* Free the session data. * Free the session data.
* *
* @param instance The cache instance data * @param pInstance The cache instance data
* @param sdata The session data of the session being closed * @param pSessionData The session data of the session being closed
*/ */
static void freeSession(FILTER *instance, void *sdata) static void freeSession(FILTER* pInstance, void* pSessionData)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
delete pSessionCache; delete pSessionCache;
} }
@ -285,84 +145,74 @@ static void freeSession(FILTER *instance, void *sdata)
/** /**
* Set the downstream component for this filter. * Set the downstream component for this filter.
* *
* @param instance The cache instance data * @param pInstance The cache instance data
* @param sdata The session data of the session * @param pSessionData The session data of the session
* @param down The downstream filter or router * @param pDownstream The downstream filter or router
*/ */
static void setDownstream(FILTER *instance, void *sdata, DOWNSTREAM *down) static void setDownstream(FILTER* pInstance, void* pSessionData, DOWNSTREAM* pDownstream)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata); pSessionCache->setDownstream(pDownstream);
pSessionCache->setDownstream(down);
} }
/** /**
* Set the upstream component for this filter. * Set the upstream component for this filter.
* *
* @param instance The cache instance data * @param pInstance The cache instance data
* @param sdata The session data of the session * @param pSessionData The session data of the session
* @param up The upstream filter or router * @param pUpstream The upstream filter or router
*/ */
static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up) static void setUpstream(FILTER* pInstance, void* pSessionData, UPSTREAM* pUpstream)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata); pSessionCache->setUpstream(pUpstream);
pSessionCache->setUpstream(up);
} }
/** /**
* A request on its way to a backend is delivered to this function. * A request on its way to a backend is delivered to this function.
* *
* @param instance The filter instance data * @param pInstance The filter instance data
* @param sdata The filter session data * @param pSessionData The filter session data
* @param buffer Buffer containing an MySQL protocol packet. * @param pPacket Buffer containing an MySQL protocol packet.
*/ */
static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet) static int routeQuery(FILTER* pInstance, void* pSessionData, GWBUF* pPacket)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata); return pSessionCache->routeQuery(pPacket);
return pSessionCache->routeQuery(packet);
} }
/** /**
* A response on its way to the client is delivered to this function. * A response on its way to the client is delivered to this function.
* *
* @param instance The filter instance data * @param pInstance The filter instance data
* @param sdata The filter session data * @param pSessionData The filter session data
* @param queue The query data * @param pPacket The response data
*/ */
static int clientReply(FILTER *instance, void *sdata, GWBUF *data) static int clientReply(FILTER* pInstance, void* pSessionData, GWBUF* pPacket)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata); return pSessionCache->clientReply(pPacket);
return pSessionCache->clientReply(data);
} }
/** /**
* Diagnostics routine * Diagnostics routine
* *
* If csdata is NULL then print diagnostics on the instance as a whole, * If cpSessionData is NULL then print diagnostics on the instance as a whole,
* otherwise print diagnostics for the particular session. * otherwise print diagnostics for the particular session.
* *
* @param instance The filter instance * @param pInstance The filter instance
* @param fsession Filter session, may be NULL * @param pSessionData Filter session, may be NULL
* @param dcb The DCB for diagnostic output * @param pDcb The DCB for diagnostic output
*/ */
static void diagnostics(FILTER *instance, void *sdata, DCB *dcb) static void diagnostics(FILTER* pInstance, void* pSessionData, DCB* pDcb)
{ {
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; SessionCache* pSessionCache = static_cast<SessionCache*>(pSessionData);
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata); pSessionCache->diagnostics(pDcb);
pSessionCache->diagnostics(dcb);
} }
@ -375,166 +225,3 @@ static uint64_t getCapabilities(void)
{ {
return RCAP_TYPE_TRANSACTION_TRACKING; return RCAP_TYPE_TRANSACTION_TRACKING;
} }
//
// API END
//
/**
* Processes the cache params
*
* @param options Options as passed to the filter.
* @param params Parameters as passed to the filter.
* @param config Pointer to config instance where params will be stored.
*
* @return True if all parameters could be processed, false otherwise.
*/
static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config)
{
bool error = false;
for (int i = 0; params[i]; ++i)
{
const FILTER_PARAMETER *param = params[i];
if (strcmp(param->name, "max_resultset_rows") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config->max_resultset_rows = v;
}
else
{
config->max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
}
}
else if (strcmp(param->name, "max_resultset_size") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config->max_resultset_size = v * 1024;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "rules") == 0)
{
if (*param->value == '/')
{
config->rules = MXS_STRDUP(param->value);
}
else
{
const char *datadir = get_datadir();
size_t len = strlen(datadir) + 1 + strlen(param->value) + 1;
char *rules = (char*)MXS_MALLOC(len);
if (rules)
{
sprintf(rules, "%s/%s", datadir, param->value);
config->rules = rules;
}
}
if (!config->rules)
{
error = true;
}
}
else if (strcmp(param->name, "storage_options") == 0)
{
config->storage_options = MXS_STRDUP(param->value);
if (config->storage_options)
{
int argc = 1;
char *arg = config->storage_options;
while ((arg = strchr(config->storage_options, ',')))
{
++argc;
}
config->storage_argv = (char**) MXS_MALLOC((argc + 1) * sizeof(char*));
if (config->storage_argv)
{
config->storage_argc = argc;
int i = 0;
arg = config->storage_options;
config->storage_argv[i++] = arg;
while ((arg = strchr(config->storage_options, ',')))
{
*arg = 0;
++arg;
config->storage_argv[i++] = arg;
}
config->storage_argv[i] = NULL;
}
else
{
MXS_FREE(config->storage_options);
config->storage_options = NULL;
}
}
else
{
error = true;
}
}
else if (strcmp(param->name, "storage") == 0)
{
config->storage = param->value;
}
else if (strcmp(param->name, "ttl") == 0)
{
int v = atoi(param->value);
if (v > 0)
{
config->ttl = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", param->name);
error = true;
}
}
else if (strcmp(param->name, "debug") == 0)
{
int v = atoi(param->value);
if ((v >= CACHE_DEBUG_MIN) && (v <= CACHE_DEBUG_MAX))
{
config->debug = v;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be between %d and %d, inclusive.",
param->name, CACHE_DEBUG_MIN, CACHE_DEBUG_MAX);
error = true;
}
}
else if (!filter_standard_parameter(params[i]->name))
{
MXS_ERROR("Unknown configuration entry '%s'.", param->name);
error = true;
}
}
return !error;
}

View File

@ -57,17 +57,4 @@ typedef struct cache_config
uint32_t debug; uint32_t debug;
} CACHE_CONFIG; } CACHE_CONFIG;
typedef struct cache_instance
{
const char *name; // The name of the instance; the section name in the config.
CACHE_CONFIG config; // The configuration of the cache instance.
CACHE_RULES *rules; // The rules of the cache instance.
StorageFactory *factory; // The storage factory.
Storage *storage; // The storage instance to use.
HASHTABLE *pending; // Pending items; being fetched from the backend.
SPINLOCK pending_lock; // Lock used for protecting 'pending'.
} CACHE_INSTANCE;
int hash_of_key(const void* key);
#endif #endif

View File

@ -11,6 +11,7 @@
* Public License. * Public License.
*/ */
#define MXS_MODULE_NAME "cache"
#include "sessioncache.h" #include "sessioncache.h"
#include <new> #include <new>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
@ -18,13 +19,10 @@
#include <maxscale/mysql_utils.h> #include <maxscale/mysql_utils.h>
#include "storage.h" #include "storage.h"
#define DUMMY_VALUE (void*)0xdeadbeef SessionCache::SessionCache(Cache* pCache, SESSION* pSession, char* zDefaultDb)
SessionCache::SessionCache(CACHE_INSTANCE* pInstance, SESSION* pSession, char* zDefaultDb)
: m_state(CACHE_EXPECTING_NOTHING) : m_state(CACHE_EXPECTING_NOTHING)
, m_pInstance(pInstance) , m_pCache(pCache)
, m_pSession(pSession) , m_pSession(pSession)
, m_pStorage(pInstance->storage)
, m_zDefaultDb(zDefaultDb) , m_zDefaultDb(zDefaultDb)
, m_zUseDb(NULL) , m_zUseDb(NULL)
, m_refreshing(false) , m_refreshing(false)
@ -43,7 +41,7 @@ SessionCache::~SessionCache()
} }
//static //static
SessionCache* SessionCache::Create(CACHE_INSTANCE* pInstance, SESSION* pSession) SessionCache* SessionCache::Create(Cache* pCache, SESSION* pSession)
{ {
SessionCache* pSessionCache = NULL; SessionCache* pSessionCache = NULL;
@ -60,7 +58,7 @@ SessionCache* SessionCache::Create(CACHE_INSTANCE* pInstance, SESSION* pSession)
if ((pMysqlSession->db[0] == 0) || zDefaultDb) if ((pMysqlSession->db[0] == 0) || zDefaultDb)
{ {
pSessionCache = new (std::nothrow) SessionCache(pInstance, pSession, zDefaultDb); pSessionCache = new (std::nothrow) SessionCache(pCache, pSession, zDefaultDb);
if (!pSessionCache) if (!pSessionCache)
{ {
@ -140,9 +138,9 @@ int SessionCache::routeQuery(GWBUF* pPacket)
if ((session_is_autocommit(session) && !session_trx_is_active(session)) || if ((session_is_autocommit(session) && !session_trx_is_active(session)) ||
session_trx_is_read_only(session)) session_trx_is_read_only(session))
{ {
if (cache_rules_should_store(m_pInstance->rules, m_zDefaultDb, pPacket)) if (m_pCache->shouldStore(m_zDefaultDb, pPacket))
{ {
if (cache_rules_should_use(m_pInstance->rules, m_pSession)) if (m_pCache->shouldUse(m_pSession))
{ {
GWBUF* pResponse; GWBUF* pResponse;
cache_result_t result = get_cached_response(pPacket, &pResponse); cache_result_t result = get_cached_response(pPacket, &pResponse);
@ -154,21 +152,7 @@ int SessionCache::routeQuery(GWBUF* pPacket)
// The value was found, but it was stale. Now we need to // The value was found, but it was stale. Now we need to
// figure out whether somebody else is already fetching it. // figure out whether somebody else is already fetching it.
long key = hash_of_key(m_key); if (m_pCache->mustRefresh(m_key, this))
spinlock_acquire(&m_pInstance->pending_lock);
// TODO: Remove the internal locking of hashtable. The internal
// TODO: locking is no good if you need transactional behaviour.
// TODO: Now we lock twice.
void *value = hashtable_fetch(m_pInstance->pending, (void*)key);
if (!value)
{
// It's not being fetched, so we make a note that we are.
hashtable_add(m_pInstance->pending, (void*)key, DUMMY_VALUE);
}
spinlock_release(&m_pInstance->pending_lock);
if (!value)
{ {
// We were the first ones who hit the stale item. It's // We were the first ones who hit the stale item. It's
// our responsibility now to fetch it. // our responsibility now to fetch it.
@ -229,7 +213,7 @@ int SessionCache::routeQuery(GWBUF* pPacket)
} }
else else
{ {
if (m_pInstance->config.debug & CACHE_DEBUG_DECISIONS) if (log_decisions())
{ {
MXS_NOTICE("autocommit = %s and transaction state %s => Not using or " MXS_NOTICE("autocommit = %s and transaction state %s => Not using or "
"storing to cache.", "storing to cache.",
@ -268,14 +252,14 @@ int SessionCache::clientReply(GWBUF* pData)
if (m_state != CACHE_IGNORING_RESPONSE) if (m_state != CACHE_IGNORING_RESPONSE)
{ {
if (gwbuf_length(m_res.pData) > m_pInstance->config.max_resultset_size) if (gwbuf_length(m_res.pData) > m_pCache->config().max_resultset_size)
{ {
if (m_pInstance->config.debug & CACHE_DEBUG_DECISIONS) if (log_decisions())
{ {
MXS_NOTICE("Current size %uB of resultset, at least as much " MXS_NOTICE("Current size %uB of resultset, at least as much "
"as maximum allowed size %uKiB. Not caching.", "as maximum allowed size %uKiB. Not caching.",
gwbuf_length(m_res.pData), gwbuf_length(m_res.pData),
m_pInstance->config.max_resultset_size / 1024); m_pCache->config().max_resultset_size / 1024);
} }
m_state = CACHE_IGNORING_RESPONSE; m_state = CACHE_IGNORING_RESPONSE;
@ -503,9 +487,9 @@ int SessionCache::handle_expecting_rows()
m_res.offset += packetlen; m_res.offset += packetlen;
++m_res.nRows; ++m_res.nRows;
if (m_res.nRows > m_pInstance->config.max_resultset_rows) if (m_res.nRows > m_pCache->config().max_resultset_rows)
{ {
if (m_pInstance->config.debug & CACHE_DEBUG_DECISIONS) if (log_decisions())
{ {
MXS_NOTICE("Max rows %lu reached, not caching result.", m_res.nRows); MXS_NOTICE("Max rows %lu reached, not caching result.", m_res.nRows);
} }
@ -623,13 +607,13 @@ void SessionCache::reset_response_state()
*/ */
cache_result_t SessionCache::get_cached_response(const GWBUF *pQuery, GWBUF **ppResponse) cache_result_t SessionCache::get_cached_response(const GWBUF *pQuery, GWBUF **ppResponse)
{ {
cache_result_t result = m_pStorage->getKey(m_zDefaultDb, pQuery, m_key); cache_result_t result = m_pCache->getKey(m_zDefaultDb, pQuery, m_key);
if (result == CACHE_RESULT_OK) if (result == CACHE_RESULT_OK)
{ {
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE; uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
result = m_pStorage->getValue(m_key, flags, ppResponse); result = m_pCache->getValue(m_key, flags, ppResponse);
} }
else else
{ {
@ -654,13 +638,13 @@ void SessionCache::store_result()
{ {
m_res.pData = pData; m_res.pData = pData;
cache_result_t result = m_pStorage->putValue(m_key, m_res.pData); cache_result_t result = m_pCache->putValue(m_key, m_res.pData);
if (result != CACHE_RESULT_OK) if (result != CACHE_RESULT_OK)
{ {
MXS_ERROR("Could not store cache item, deleting it."); MXS_ERROR("Could not store cache item, deleting it.");
result = m_pStorage->delValue(m_key); result = m_pCache->delValue(m_key);
if ((result != CACHE_RESULT_OK) || (result != CACHE_RESULT_NOT_FOUND)) if ((result != CACHE_RESULT_OK) || (result != CACHE_RESULT_NOT_FOUND))
{ {
@ -671,14 +655,7 @@ void SessionCache::store_result()
if (m_refreshing) if (m_refreshing)
{ {
long key = hash_of_key(m_key); m_pCache->refreshed(m_key, this);
spinlock_acquire(&m_pInstance->pending_lock);
ss_dassert(hashtable_fetch(m_pInstance->pending, (void*)key) == DUMMY_VALUE);
ss_debug(int n =) hashtable_delete(m_pInstance->pending, (void*)key);
ss_dassert(n == 1);
spinlock_release(&m_pInstance->pending_lock);
m_refreshing = false; m_refreshing = false;
} }
} }

View File

@ -15,9 +15,12 @@
#include <maxscale/cdefs.h> #include <maxscale/cdefs.h>
#include <maxscale/buffer.h> #include <maxscale/buffer.h>
#include <maxscale/filter.h> #include <maxscale/filter.h>
#include "cache.h"
#include "cachefilter.h" #include "cachefilter.h"
#include "cache_storage_api.h" #include "cache_storage_api.h"
class Cache;
class SessionCache class SessionCache
{ {
public: public:
@ -48,7 +51,7 @@ public:
/** /**
* Creates a SessionCache instance. * Creates a SessionCache instance.
* *
* @param pInstance Pointer to the cache instance to which this session cache * @param pCache Pointer to the cache instance to which this session cache
* belongs. Must remain valid for the lifetime of the SessionCache * belongs. Must remain valid for the lifetime of the SessionCache
* instance being created. * instance being created.
* @param pSession Pointer to the session this session cache instance is * @param pSession Pointer to the session this session cache instance is
@ -57,7 +60,7 @@ public:
* *
* @return A new instance or NULL if memory allocation fails. * @return A new instance or NULL if memory allocation fails.
*/ */
static SessionCache* Create(CACHE_INSTANCE* pInstance, SESSION* pSession); static SessionCache* Create(Cache* pCache, SESSION* pSession);
/** /**
* The session has been closed. * The session has been closed.
@ -113,22 +116,21 @@ private:
bool log_decisions() const bool log_decisions() const
{ {
return m_pInstance->config.debug & CACHE_DEBUG_DECISIONS ? true : false; return m_pCache->config().debug & CACHE_DEBUG_DECISIONS ? true : false;
} }
void store_result(); void store_result();
private: private:
SessionCache(CACHE_INSTANCE* pInstance, SESSION* pSession, char* zDefaultDb); SessionCache(Cache* pCache, SESSION* pSession, char* zDefaultDb);
SessionCache(const SessionCache&); SessionCache(const SessionCache&);
SessionCache& operator = (const SessionCache&); SessionCache& operator = (const SessionCache&);
private: private:
cache_session_state_t m_state; /**< What state is the session in, what data is expected. */ cache_session_state_t m_state; /**< What state is the session in, what data is expected. */
CACHE_INSTANCE* m_pInstance; /**< The cache instance the session is associated with. */ Cache* m_pCache; /**< The cache instance the session is associated with. */
SESSION* m_pSession; /**< The session this data is associated with. */ SESSION* m_pSession; /**< The session this data is associated with. */
Storage* m_pStorage; /**< The storage to be used with this session data. */
DOWNSTREAM m_down; /**< The previous filter or equivalent. */ DOWNSTREAM m_down; /**< The previous filter or equivalent. */
UPSTREAM m_up; /**< The next filter or equivalent. */ UPSTREAM m_up; /**< The next filter or equivalent. */
CACHE_RESPONSE_STATE m_res; /**< The response state. */ CACHE_RESPONSE_STATE m_res; /**< The response state. */