Cache: Take SessionCache into use
This commit is contained in:
parent
b5022e1deb
commit
5a6c4b5970
765
server/modules/filter/cache/cachefilter.cc
vendored
765
server/modules/filter/cache/cachefilter.cc
vendored
@ -16,14 +16,9 @@
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/filter.h>
|
||||
#include <maxscale/gwdirs.h>
|
||||
#include <maxscale/hashtable.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/modinfo.h>
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/mysql_utils.h>
|
||||
#include <maxscale/query_classifier.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
#include "rules.h"
|
||||
#include "sessioncache.h"
|
||||
#include "storage.h"
|
||||
#include "storagefactory.h"
|
||||
|
||||
@ -107,57 +102,7 @@ static const CACHE_CONFIG DEFAULT_CONFIG =
|
||||
CACHE_DEFAULT_DEBUG
|
||||
};
|
||||
|
||||
typedef enum cache_session_state
|
||||
{
|
||||
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
|
||||
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
|
||||
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
||||
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
||||
CACHE_EXPECTING_USE_RESPONSE, // A "USE DB" was issued.
|
||||
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
||||
} cache_session_state_t;
|
||||
|
||||
typedef struct cache_response_state
|
||||
{
|
||||
GWBUF* data; /**< Response data, possibly incomplete. */
|
||||
size_t n_totalfields; /**< The number of fields a resultset contains. */
|
||||
size_t n_fields; /**< How many fields we have received, <= n_totalfields. */
|
||||
size_t n_rows; /**< How many rows we have received. */
|
||||
size_t offset; /**< Where we are in the response buffer. */
|
||||
} CACHE_RESPONSE_STATE;
|
||||
|
||||
static void cache_response_state_reset(CACHE_RESPONSE_STATE *state);
|
||||
|
||||
typedef struct cache_session_data
|
||||
{
|
||||
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
||||
Storage *storage; /**< The storage to be used with this session data. */
|
||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||
UPSTREAM up; /**< The next filter or equivalent. */
|
||||
CACHE_RESPONSE_STATE res; /**< The response state. */
|
||||
SESSION *session; /**< The session this data is associated with. */
|
||||
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||
char *default_db; /**< The default database. */
|
||||
char *use_db; /**< Pending default database. Needs server response. */
|
||||
cache_session_state_t state; /**< What state is the session in, what data is expected. */
|
||||
bool refreshing; /**< Whether the session is updating a stale cache entry. */
|
||||
} CACHE_SESSION_DATA;
|
||||
|
||||
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance, SESSION *session);
|
||||
static void cache_session_data_free(CACHE_SESSION_DATA *data);
|
||||
|
||||
static int handle_expecting_fields(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_expecting_response(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata);
|
||||
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
|
||||
static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONFIG* config);
|
||||
static cache_result_t get_cached_response(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
|
||||
|
||||
static int send_upstream(CACHE_SESSION_DATA *csdata);
|
||||
|
||||
static void store_result(CACHE_SESSION_DATA *csdata);
|
||||
|
||||
/**
|
||||
* Hashes a cache key to an integer.
|
||||
@ -194,17 +139,10 @@ static int hashcmp(const void* address1, const void* address2)
|
||||
return (long)address2 - (long)address1;
|
||||
}
|
||||
|
||||
#define DUMMY_VALUE (void*)0xdeadbeef
|
||||
|
||||
// Initial size of hashtable used for storing keys of queries that
|
||||
// are being fetches.
|
||||
#define CACHE_PENDING_ITEMS 50
|
||||
|
||||
static inline bool log_decisions(const CACHE_SESSION_DATA* csdata)
|
||||
{
|
||||
return csdata->instance->config.debug & CACHE_DEBUG_DECISIONS ? true : false;
|
||||
}
|
||||
|
||||
//
|
||||
// API BEGIN
|
||||
//
|
||||
@ -308,9 +246,10 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
||||
static void *newSession(FILTER *instance, SESSION *session)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = cache_session_data_create(cinstance, session);
|
||||
|
||||
return csdata;
|
||||
SessionCache *pSessionCache = SessionCache::Create(cinstance, session);
|
||||
|
||||
return pSessionCache;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -322,7 +261,10 @@ static void *newSession(FILTER *instance, SESSION *session)
|
||||
static void closeSession(FILTER *instance, void *sdata)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
pSessionCache->close();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -334,9 +276,10 @@ static void closeSession(FILTER *instance, void *sdata)
|
||||
static void freeSession(FILTER *instance, void *sdata)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
cache_session_data_free(csdata);
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
delete pSessionCache;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -349,9 +292,10 @@ static void freeSession(FILTER *instance, void *sdata)
|
||||
static void setDownstream(FILTER *instance, void *sdata, DOWNSTREAM *down)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
csdata->down = *down;
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
pSessionCache->setDownstream(down);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -364,9 +308,10 @@ static void setDownstream(FILTER *instance, void *sdata, DOWNSTREAM *down)
|
||||
static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
csdata->up = *up;
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
pSessionCache->setUpstream(up);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -379,172 +324,10 @@ static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up)
|
||||
static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
uint8_t *data = (uint8_t*)GWBUF_DATA(packet);
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
// All of these should be guaranteed by RCAP_TYPE_TRANSACTION_TRACKING
|
||||
ss_dassert(GWBUF_IS_CONTIGUOUS(packet));
|
||||
ss_dassert(GWBUF_LENGTH(packet) >= MYSQL_HEADER_LEN + 1);
|
||||
ss_dassert(MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN == GWBUF_LENGTH(packet));
|
||||
|
||||
bool fetch_from_server = true;
|
||||
|
||||
cache_response_state_reset(&csdata->res);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
|
||||
int rv;
|
||||
|
||||
switch ((int)MYSQL_GET_COMMAND(data))
|
||||
{
|
||||
case MYSQL_COM_INIT_DB:
|
||||
{
|
||||
ss_dassert(!csdata->use_db);
|
||||
size_t len = MYSQL_GET_PACKET_LEN(data) - 1; // Remove the command byte.
|
||||
csdata->use_db = (char*)MXS_MALLOC(len + 1);
|
||||
|
||||
if (csdata->use_db)
|
||||
{
|
||||
memcpy(csdata->use_db, data + MYSQL_HEADER_LEN + 1, len);
|
||||
csdata->use_db[len] = 0;
|
||||
csdata->state = CACHE_EXPECTING_USE_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Memory allocation failed. We need to remove the default database to
|
||||
// prevent incorrect cache entries, since we won't know what the
|
||||
// default db is. But we only need to do that if "USE <db>" really
|
||||
// succeeds. The right thing will happen by itself in
|
||||
// handle_expecting_use_response(); if OK is returned, default_db will
|
||||
// become NULL, if ERR, default_db will not be changed.
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
{
|
||||
// We do not care whether the query was fully parsed or not.
|
||||
// If a query cannot be fully parsed, the worst thing that can
|
||||
// happen is that caching is not used, even though it would be
|
||||
// possible.
|
||||
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
||||
{
|
||||
SESSION *session = csdata->session;
|
||||
|
||||
if ((session_is_autocommit(session) && !session_trx_is_active(session)) ||
|
||||
session_trx_is_read_only(session))
|
||||
{
|
||||
if (cache_rules_should_store(cinstance->rules, csdata->default_db, packet))
|
||||
{
|
||||
if (cache_rules_should_use(cinstance->rules, csdata->session))
|
||||
{
|
||||
GWBUF *response;
|
||||
cache_result_t result = get_cached_response(csdata, packet, &response);
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case CACHE_RESULT_STALE:
|
||||
{
|
||||
// The value was found, but it was stale. Now we need to
|
||||
// figure out whether somebody else is already fetching it.
|
||||
|
||||
long key = hash_of_key(csdata->key);
|
||||
|
||||
spinlock_acquire(&cinstance->pending_lock);
|
||||
// TODO: Remove the internal locking of hashtable. The internal
|
||||
// TODO: locking is no good if you need transactional behaviour.
|
||||
// TODO: Now we lock twice.
|
||||
void *value = hashtable_fetch(cinstance->pending, (void*)key);
|
||||
if (!value)
|
||||
{
|
||||
// It's not being fetched, so we make a note that we are.
|
||||
hashtable_add(cinstance->pending, (void*)key, DUMMY_VALUE);
|
||||
}
|
||||
spinlock_release(&cinstance->pending_lock);
|
||||
|
||||
if (!value)
|
||||
{
|
||||
// We were the first ones who hit the stale item. It's
|
||||
// our responsibility now to fetch it.
|
||||
if (log_decisions(csdata))
|
||||
{
|
||||
MXS_NOTICE("Cache data is stale, fetching fresh from server.");
|
||||
}
|
||||
csdata->refreshing = true;
|
||||
fetch_from_server = true;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Somebody is already fetching the new value. So, let's
|
||||
// use the stale value. No point in hitting the server twice.
|
||||
if (log_decisions(csdata))
|
||||
{
|
||||
MXS_NOTICE("Cache data is stale but returning it, fresh "
|
||||
"data is being fetched already.");
|
||||
}
|
||||
fetch_from_server = false;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case CACHE_RESULT_OK:
|
||||
if (log_decisions(csdata))
|
||||
{
|
||||
MXS_NOTICE("Using fresh data from cache.");
|
||||
}
|
||||
fetch_from_server = false;
|
||||
break;
|
||||
|
||||
default:
|
||||
fetch_from_server = true;
|
||||
}
|
||||
|
||||
if (fetch_from_server)
|
||||
{
|
||||
csdata->state = CACHE_EXPECTING_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||
gwbuf_free(packet);
|
||||
DCB *dcb = csdata->session->client_dcb;
|
||||
|
||||
// TODO: This is not ok. Any filters before this filter, will not
|
||||
// TODO: see this data.
|
||||
rv = dcb->func.write(dcb, response);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (csdata->instance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("autocommit = %s and transaction state %s => Not using or "
|
||||
"storing to cache.",
|
||||
session_is_autocommit(csdata->session) ? "ON" : "OFF",
|
||||
session_trx_state_to_string(session_get_trx_state(csdata->session)));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fetch_from_server)
|
||||
{
|
||||
rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet);
|
||||
}
|
||||
|
||||
return rv;
|
||||
return pSessionCache->routeQuery(packet);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -557,70 +340,10 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packet)
|
||||
static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
int rv;
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
if (csdata->res.data)
|
||||
{
|
||||
gwbuf_append(csdata->res.data, data);
|
||||
}
|
||||
else
|
||||
{
|
||||
csdata->res.data = data;
|
||||
}
|
||||
|
||||
if (csdata->state != CACHE_IGNORING_RESPONSE)
|
||||
{
|
||||
if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size)
|
||||
{
|
||||
if (csdata->instance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("Current size %uB of resultset, at least as much "
|
||||
"as maximum allowed size %uKiB. Not caching.",
|
||||
gwbuf_length(csdata->res.data),
|
||||
csdata->instance->config.max_resultset_size / 1024);
|
||||
}
|
||||
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
}
|
||||
|
||||
switch (csdata->state)
|
||||
{
|
||||
case CACHE_EXPECTING_FIELDS:
|
||||
rv = handle_expecting_fields(csdata);
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_NOTHING:
|
||||
rv = handle_expecting_nothing(csdata);
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_RESPONSE:
|
||||
rv = handle_expecting_response(csdata);
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_ROWS:
|
||||
rv = handle_expecting_rows(csdata);
|
||||
break;
|
||||
|
||||
case CACHE_EXPECTING_USE_RESPONSE:
|
||||
rv = handle_expecting_use_response(csdata);
|
||||
break;
|
||||
|
||||
case CACHE_IGNORING_RESPONSE:
|
||||
rv = handle_ignoring_response(csdata);
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Internal cache logic broken, unexpected state: %d", csdata->state);
|
||||
ss_dassert(!true);
|
||||
rv = send_upstream(csdata);
|
||||
cache_response_state_reset(&csdata->res);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
|
||||
return rv;
|
||||
return pSessionCache->clientReply(data);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -636,9 +359,10 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
|
||||
static void diagnostics(FILTER *instance, void *sdata, DCB *dcb)
|
||||
{
|
||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||
|
||||
dcb_printf(dcb, "Hello World from Cache!\n");
|
||||
SessionCache* pSessionCache = static_cast<SessionCache*>(sdata);
|
||||
|
||||
pSessionCache->diagnostics(dcb);
|
||||
}
|
||||
|
||||
|
||||
@ -656,356 +380,6 @@ static uint64_t getCapabilities(void)
|
||||
// API END
|
||||
//
|
||||
|
||||
/**
|
||||
* Reset cache response state
|
||||
*
|
||||
* @param state Pointer to object.
|
||||
*/
|
||||
static void cache_response_state_reset(CACHE_RESPONSE_STATE *state)
|
||||
{
|
||||
state->data = NULL;
|
||||
state->n_totalfields = 0;
|
||||
state->n_fields = 0;
|
||||
state->n_rows = 0;
|
||||
state->offset = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create cache session data
|
||||
*
|
||||
* @param instance The cache instance this data is associated with.
|
||||
*
|
||||
* @return Session data or NULL if creation fails.
|
||||
*/
|
||||
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance,
|
||||
SESSION* session)
|
||||
{
|
||||
CACHE_SESSION_DATA *data = (CACHE_SESSION_DATA*)MXS_CALLOC(1, sizeof(CACHE_SESSION_DATA));
|
||||
|
||||
if (data)
|
||||
{
|
||||
char *default_db = NULL;
|
||||
|
||||
ss_dassert(session->client_dcb);
|
||||
ss_dassert(session->client_dcb->data);
|
||||
MYSQL_session *mysql_session = (MYSQL_session*)session->client_dcb->data;
|
||||
|
||||
if (mysql_session->db[0] != 0)
|
||||
{
|
||||
default_db = MXS_STRDUP(mysql_session->db);
|
||||
}
|
||||
|
||||
if ((mysql_session->db[0] == 0) || default_db)
|
||||
{
|
||||
data->instance = instance;
|
||||
data->storage = instance->storage;
|
||||
data->session = session;
|
||||
data->state = CACHE_EXPECTING_NOTHING;
|
||||
data->default_db = default_db;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_FREE(data);
|
||||
data = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Free cache session data.
|
||||
*
|
||||
* @param A cache session data previously allocated using session_data_create().
|
||||
*/
|
||||
static void cache_session_data_free(CACHE_SESSION_DATA* data)
|
||||
{
|
||||
if (data)
|
||||
{
|
||||
// In normal circumstances, only data->default_db may be non-NULL at
|
||||
// this point. However, if the authentication with the backend fails
|
||||
// and the session is closed, data->use_db may be non-NULL.
|
||||
MXS_FREE(data->use_db);
|
||||
MXS_FREE(data->default_db);
|
||||
MXS_FREE(data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when resultset field information is handled.
|
||||
*
|
||||
* @param csdata The cache session data.
|
||||
*/
|
||||
static int handle_expecting_fields(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == CACHE_EXPECTING_FIELDS);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
bool insufficient = false;
|
||||
|
||||
size_t buflen = gwbuf_length(csdata->res.data);
|
||||
|
||||
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||
{
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
||||
|
||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
|
||||
|
||||
if (csdata->res.offset + packetlen <= buflen)
|
||||
{
|
||||
// We have at least one complete packet.
|
||||
int command = (int)MYSQL_GET_COMMAND(header);
|
||||
|
||||
switch (command)
|
||||
{
|
||||
case 0xfe: // EOF, the one after the fields.
|
||||
csdata->res.offset += packetlen;
|
||||
csdata->state = CACHE_EXPECTING_ROWS;
|
||||
rv = handle_expecting_rows(csdata);
|
||||
break;
|
||||
|
||||
default: // Field information.
|
||||
csdata->res.offset += packetlen;
|
||||
++csdata->res.n_fields;
|
||||
ss_dassert(csdata->res.n_fields <= csdata->res.n_totalfields);
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need more data
|
||||
insufficient = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when data is received (even if nothing is expected) from the server.
|
||||
*
|
||||
* @param csdata The cache session data.
|
||||
*/
|
||||
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == CACHE_EXPECTING_NOTHING);
|
||||
ss_dassert(csdata->res.data);
|
||||
MXS_ERROR("Received data from the backend althoug we were expecting nothing.");
|
||||
ss_dassert(!true);
|
||||
|
||||
return send_upstream(csdata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a response is received from the server.
|
||||
*
|
||||
* @param csdata The cache session data.
|
||||
*/
|
||||
static int handle_expecting_response(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == CACHE_EXPECTING_RESPONSE);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
size_t buflen = gwbuf_length(csdata->res.data);
|
||||
|
||||
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
|
||||
{
|
||||
// Reserve enough space to accomodate for the largest length encoded integer,
|
||||
// which is type field + 8 bytes.
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
|
||||
gwbuf_copy_data(csdata->res.data, 0, MYSQL_HEADER_LEN + 1, header);
|
||||
|
||||
switch ((int)MYSQL_GET_COMMAND(header))
|
||||
{
|
||||
case 0x00: // OK
|
||||
case 0xff: // ERR
|
||||
store_result(csdata);
|
||||
|
||||
rv = send_upstream(csdata);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
break;
|
||||
|
||||
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
|
||||
rv = send_upstream(csdata);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
break;
|
||||
|
||||
default:
|
||||
if (csdata->res.n_totalfields != 0)
|
||||
{
|
||||
// We've seen the header and have figured out how many fields there are.
|
||||
csdata->state = CACHE_EXPECTING_FIELDS;
|
||||
rv = handle_expecting_fields(csdata);
|
||||
}
|
||||
else
|
||||
{
|
||||
// leint_bytes() returns the length of the int type field + the size of the
|
||||
// integer.
|
||||
size_t n_bytes = leint_bytes(&header[4]);
|
||||
|
||||
if (MYSQL_HEADER_LEN + n_bytes <= buflen)
|
||||
{
|
||||
// Now we can figure out how many fields there are, but first we
|
||||
// need to copy some more data.
|
||||
gwbuf_copy_data(csdata->res.data,
|
||||
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
|
||||
|
||||
csdata->res.n_totalfields = leint_value(&header[4]);
|
||||
csdata->res.offset = MYSQL_HEADER_LEN + n_bytes;
|
||||
|
||||
csdata->state = CACHE_EXPECTING_FIELDS;
|
||||
rv = handle_expecting_fields(csdata);
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need more data. We will be called again, when data is available.
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when resultset rows are handled.
|
||||
*
|
||||
* @param csdata The cache session data.
|
||||
*/
|
||||
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == CACHE_EXPECTING_ROWS);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
bool insufficient = false;
|
||||
|
||||
size_t buflen = gwbuf_length(csdata->res.data);
|
||||
|
||||
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||
{
|
||||
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
||||
|
||||
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
|
||||
|
||||
if (csdata->res.offset + packetlen <= buflen)
|
||||
{
|
||||
// We have at least one complete packet.
|
||||
int command = (int)MYSQL_GET_COMMAND(header);
|
||||
|
||||
switch (command)
|
||||
{
|
||||
case 0xfe: // EOF, the one after the rows.
|
||||
csdata->res.offset += packetlen;
|
||||
ss_dassert(csdata->res.offset == buflen);
|
||||
|
||||
store_result(csdata);
|
||||
|
||||
rv = send_upstream(csdata);
|
||||
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||
break;
|
||||
|
||||
case 0xfb: // NULL
|
||||
default: // length-encoded-string
|
||||
csdata->res.offset += packetlen;
|
||||
++csdata->res.n_rows;
|
||||
|
||||
if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows)
|
||||
{
|
||||
if (csdata->instance->config.debug & CACHE_DEBUG_DECISIONS)
|
||||
{
|
||||
MXS_NOTICE("Max rows %lu reached, not caching result.", csdata->res.n_rows);
|
||||
}
|
||||
rv = send_upstream(csdata);
|
||||
csdata->res.offset = buflen; // To abort the loop.
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need more data
|
||||
insufficient = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a response to a "USE db" is received from the server.
|
||||
*
|
||||
* @param csdata The cache session data.
|
||||
*/
|
||||
static int handle_expecting_use_response(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == CACHE_EXPECTING_USE_RESPONSE);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
int rv = 1;
|
||||
|
||||
size_t buflen = gwbuf_length(csdata->res.data);
|
||||
|
||||
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
|
||||
{
|
||||
uint8_t command;
|
||||
|
||||
gwbuf_copy_data(csdata->res.data, MYSQL_HEADER_LEN, 1, &command);
|
||||
|
||||
switch (command)
|
||||
{
|
||||
case 0x00: // OK
|
||||
// In case csdata->use_db could not be allocated in routeQuery(), we will
|
||||
// in fact reset the default db here. That's ok as it will prevent broken
|
||||
// entries in the cache.
|
||||
MXS_FREE(csdata->default_db);
|
||||
csdata->default_db = csdata->use_db;
|
||||
csdata->use_db = NULL;
|
||||
break;
|
||||
|
||||
case 0xff: // ERR
|
||||
MXS_FREE(csdata->use_db);
|
||||
csdata->use_db = NULL;
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("\"USE %s\" received unexpected server response %d.",
|
||||
csdata->use_db ? csdata->use_db : "<db>", command);
|
||||
MXS_FREE(csdata->default_db);
|
||||
MXS_FREE(csdata->use_db);
|
||||
csdata->default_db = NULL;
|
||||
csdata->use_db = NULL;
|
||||
}
|
||||
|
||||
rv = send_upstream(csdata);
|
||||
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when all data from the server is ignored.
|
||||
*
|
||||
* @param csdata The cache session data.
|
||||
*/
|
||||
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->state == CACHE_IGNORING_RESPONSE);
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
return send_upstream(csdata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes the cache params
|
||||
*
|
||||
@ -1164,94 +538,3 @@ static bool process_params(char **options, FILTER_PARAMETER **params, CACHE_CONF
|
||||
|
||||
return !error;
|
||||
}
|
||||
|
||||
/**
|
||||
* Route a query via the cache.
|
||||
*
|
||||
* @param csdata Session data
|
||||
* @param key A SELECT packet.
|
||||
* @param value The result.
|
||||
* @return True if the query was satisfied from the query.
|
||||
*/
|
||||
static cache_result_t get_cached_response(CACHE_SESSION_DATA *csdata,
|
||||
const GWBUF *query,
|
||||
GWBUF **value)
|
||||
{
|
||||
cache_result_t result = csdata->storage->getKey(csdata->default_db, query, csdata->key);
|
||||
|
||||
if (result == CACHE_RESULT_OK)
|
||||
{
|
||||
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
|
||||
|
||||
result = csdata->storage->getValue(csdata->key, flags, value);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not create cache key.");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data upstream.
|
||||
*
|
||||
* @param csdata Session data
|
||||
*
|
||||
* @return Whatever the upstream returns.
|
||||
*/
|
||||
static int send_upstream(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->res.data != NULL);
|
||||
|
||||
int rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, csdata->res.data);
|
||||
csdata->res.data = NULL;
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the data.
|
||||
*
|
||||
* @param csdata Session data
|
||||
*/
|
||||
static void store_result(CACHE_SESSION_DATA *csdata)
|
||||
{
|
||||
ss_dassert(csdata->res.data);
|
||||
|
||||
GWBUF *data = gwbuf_make_contiguous(csdata->res.data);
|
||||
|
||||
if (data)
|
||||
{
|
||||
csdata->res.data = data;
|
||||
|
||||
cache_result_t result = csdata->storage->putValue(csdata->key, csdata->res.data);
|
||||
|
||||
if (result != CACHE_RESULT_OK)
|
||||
{
|
||||
MXS_ERROR("Could not store cache item, deleting it.");
|
||||
|
||||
result = csdata->storage->delValue(csdata->key);
|
||||
|
||||
if ((result != CACHE_RESULT_OK) || (result != CACHE_RESULT_NOT_FOUND))
|
||||
{
|
||||
MXS_ERROR("Could not delete cache item.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (csdata->refreshing)
|
||||
{
|
||||
long key = hash_of_key(csdata->key);
|
||||
|
||||
CACHE_INSTANCE *instance = csdata->instance;
|
||||
|
||||
spinlock_acquire(&instance->pending_lock);
|
||||
ss_dassert(hashtable_fetch(instance->pending, (void*)key) == DUMMY_VALUE);
|
||||
ss_debug(int n =) hashtable_delete(instance->pending, (void*)key);
|
||||
ss_dassert(n == 1);
|
||||
spinlock_release(&instance->pending_lock);
|
||||
|
||||
csdata->refreshing = false;
|
||||
}
|
||||
}
|
||||
|
1
server/modules/filter/cache/cachefilter.h
vendored
1
server/modules/filter/cache/cachefilter.h
vendored
@ -17,6 +17,7 @@
|
||||
#include <maxscale/cdefs.h>
|
||||
#include <limits.h>
|
||||
#include <maxscale/hashtable.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
#include "rules.h"
|
||||
|
||||
class Storage;
|
||||
|
1
server/modules/filter/cache/storage.cc
vendored
1
server/modules/filter/cache/storage.cc
vendored
@ -11,6 +11,7 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#define MXS_MODULE_NAME "cache"
|
||||
#include "storage.h"
|
||||
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#define MXS_MODULE_NAME "cache"
|
||||
#include "storagefactory.h"
|
||||
#include <dlfcn.h>
|
||||
#include <sys/param.h>
|
||||
|
Loading…
x
Reference in New Issue
Block a user