cache: Process response to COM_QUERY
When a query has been sent to a backend, the response is now processed to the extent that the cache is capable of figuring out how many rows are being returned, so that the cache setting `max_resultset_rows` can be processed. The code is now also written in such a manner that it should be insensitive to how a package has been split up into a chain of GWBUFs.
This commit is contained in:
@ -87,15 +87,26 @@ The setting can be changed to `any`, provided fully qualified names
|
|||||||
are always used or if the names of tables in different databases are
|
are always used or if the names of tables in different databases are
|
||||||
different.
|
different.
|
||||||
|
|
||||||
|
#### `max_resultset_rows`
|
||||||
|
|
||||||
|
Specifies the maximum number of rows a resultset can have in order to be
|
||||||
|
stored in the cache. A resultset larger than this, will not be stored.
|
||||||
|
```
|
||||||
|
max_resultset_rows=1000
|
||||||
|
```
|
||||||
|
Zero or a negative value is interpreted as no limitation.
|
||||||
|
|
||||||
|
The default value is `-1`.
|
||||||
|
|
||||||
#### `max_resultset_size`
|
#### `max_resultset_size`
|
||||||
|
|
||||||
Specifies the maximum size a resultset can have, measured in kibibytes,
|
Specifies the maximum size a resultset can have, measured in kibibytes,
|
||||||
in order to be stored in the cache. A resultset larger than this, will
|
in order to be stored in the cache. A resultset larger than this, will
|
||||||
not be stored.
|
not be stored.
|
||||||
```
|
```
|
||||||
max_resultset_size=64
|
max_resultset_size=128
|
||||||
```
|
```
|
||||||
The default value is TBD.
|
The default value is 64.
|
||||||
|
|
||||||
#### `ttl`
|
#### `ttl`
|
||||||
|
|
||||||
|
|||||||
534
server/modules/filter/cache/cache.c
vendored
534
server/modules/filter/cache/cache.c
vendored
@ -17,23 +17,13 @@
|
|||||||
#include <log_manager.h>
|
#include <log_manager.h>
|
||||||
#include <modinfo.h>
|
#include <modinfo.h>
|
||||||
#include <modutil.h>
|
#include <modutil.h>
|
||||||
|
#include <mysql_utils.h>
|
||||||
#include <query_classifier.h>
|
#include <query_classifier.h>
|
||||||
|
#include "cache.h"
|
||||||
#include "storage.h"
|
#include "storage.h"
|
||||||
|
|
||||||
static char VERSION_STRING[] = "V1.0.0";
|
static char VERSION_STRING[] = "V1.0.0";
|
||||||
|
|
||||||
typedef enum cache_references
|
|
||||||
{
|
|
||||||
CACHE_REFERENCES_ANY,
|
|
||||||
CACHE_REFERENCES_QUALIFIED
|
|
||||||
} cache_references_t;
|
|
||||||
|
|
||||||
#define DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
|
|
||||||
// Bytes
|
|
||||||
#define DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
|
|
||||||
// Seconds
|
|
||||||
#define DEFAULT_TTL 10
|
|
||||||
|
|
||||||
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
|
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
|
||||||
static void *newSession(FILTER *instance, SESSION *session);
|
static void *newSession(FILTER *instance, SESSION *session);
|
||||||
static void closeSession(FILTER *instance, void *sdata);
|
static void closeSession(FILTER *instance, void *sdata);
|
||||||
@ -101,12 +91,23 @@ FILTER_OBJECT *GetModuleObject()
|
|||||||
typedef struct cache_config
|
typedef struct cache_config
|
||||||
{
|
{
|
||||||
cache_references_t allowed_references;
|
cache_references_t allowed_references;
|
||||||
|
uint32_t max_resultset_rows;
|
||||||
uint32_t max_resultset_size;
|
uint32_t max_resultset_size;
|
||||||
const char *storage;
|
const char *storage;
|
||||||
const char *storage_args;
|
const char *storage_args;
|
||||||
uint32_t ttl;
|
uint32_t ttl;
|
||||||
} CACHE_CONFIG;
|
} CACHE_CONFIG;
|
||||||
|
|
||||||
|
static const CACHE_CONFIG DEFAULT_CONFIG =
|
||||||
|
{
|
||||||
|
CACHE_DEFAULT_ALLOWED_REFERENCES,
|
||||||
|
CACHE_DEFAULT_MAX_RESULTSET_ROWS,
|
||||||
|
CACHE_DEFAULT_MAX_RESULTSET_SIZE,
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
CACHE_DEFAULT_TTL
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct cache_instance
|
typedef struct cache_instance
|
||||||
{
|
{
|
||||||
const char *name;
|
const char *name;
|
||||||
@ -115,31 +116,59 @@ typedef struct cache_instance
|
|||||||
CACHE_STORAGE *storage;
|
CACHE_STORAGE *storage;
|
||||||
} CACHE_INSTANCE;
|
} CACHE_INSTANCE;
|
||||||
|
|
||||||
static const CACHE_CONFIG DEFAULT_CONFIG =
|
typedef enum cache_session_state
|
||||||
{
|
{
|
||||||
DEFAULT_ALLOWED_REFERENCES,
|
CACHE_EXPECTING_RESPONSE, // A select has been sent, and we are waiting for the response.
|
||||||
DEFAULT_MAX_RESULTSET_SIZE,
|
CACHE_EXPECTING_FIELDS, // A select has been sent, and we want more fields.
|
||||||
NULL,
|
CACHE_EXPECTING_ROWS, // A select has been sent, and we want more rows.
|
||||||
NULL,
|
CACHE_EXPECTING_NOTHING, // We are not expecting anything from the server.
|
||||||
DEFAULT_TTL
|
CACHE_IGNORING_RESPONSE, // We are not interested in the data received from the server.
|
||||||
};
|
} cache_session_state_t;
|
||||||
|
|
||||||
|
typedef struct cache_request_state
|
||||||
|
{
|
||||||
|
GWBUF* data; /**< Request data, possibly incomplete. */
|
||||||
|
} CACHE_REQUEST_STATE;
|
||||||
|
|
||||||
|
typedef struct cache_response_state
|
||||||
|
{
|
||||||
|
GWBUF* data; /**< Response data, possibly incomplete. */
|
||||||
|
size_t n_totalfields; /**< The number of fields a resultset contains. */
|
||||||
|
size_t n_fields; /**< How many fields we have received, <= n_totalfields. */
|
||||||
|
size_t n_rows; /**< How many rows we have received. */
|
||||||
|
size_t offset; /**< Where we are in the response buffer. */
|
||||||
|
} CACHE_RESPONSE_STATE;
|
||||||
|
|
||||||
|
static void cache_response_state_reset(CACHE_RESPONSE_STATE *state);
|
||||||
|
|
||||||
typedef struct cache_session_data
|
typedef struct cache_session_data
|
||||||
{
|
{
|
||||||
|
CACHE_INSTANCE *instance; /**< The cache instance the session is associated with. */
|
||||||
CACHE_STORAGE_API *api; /**< The storage API to be used. */
|
CACHE_STORAGE_API *api; /**< The storage API to be used. */
|
||||||
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
CACHE_STORAGE *storage; /**< The storage to be used with this session data. */
|
||||||
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
DOWNSTREAM down; /**< The previous filter or equivalent. */
|
||||||
UPSTREAM up; /**< The next filter or equivalent. */
|
UPSTREAM up; /**< The next filter or equivalent. */
|
||||||
GWBUF *packets; /**< A possible incomplete packet. */
|
CACHE_REQUEST_STATE req; /**< The request state. */
|
||||||
|
CACHE_RESPONSE_STATE res; /**< The response state. */
|
||||||
SESSION *session; /**< The session this data is associated with. */
|
SESSION *session; /**< The session this data is associated with. */
|
||||||
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
char key[CACHE_KEY_MAXLEN]; /**< Key storage. */
|
||||||
char *used_key; /**< A key if one is ued. */
|
cache_session_state_t state;
|
||||||
} CACHE_SESSION_DATA;
|
} CACHE_SESSION_DATA;
|
||||||
|
|
||||||
static bool route_using_cache(CACHE_INSTANCE *instance,
|
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance, SESSION *session);
|
||||||
CACHE_SESSION_DATA *sdata,
|
static void cache_session_data_free(CACHE_SESSION_DATA *data);
|
||||||
const GWBUF *key,
|
|
||||||
GWBUF **value);
|
static int handle_expecting_fields(CACHE_SESSION_DATA *csdata);
|
||||||
|
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata);
|
||||||
|
static int handle_expecting_response(CACHE_SESSION_DATA *csdata);
|
||||||
|
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata);
|
||||||
|
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata);
|
||||||
|
|
||||||
|
static bool route_using_cache(CACHE_SESSION_DATA *sdata, const GWBUF *key, GWBUF **value);
|
||||||
|
|
||||||
|
static int send_upstream(CACHE_SESSION_DATA *csdata);
|
||||||
|
|
||||||
|
static void store_result(CACHE_SESSION_DATA *csdata);
|
||||||
|
|
||||||
//
|
//
|
||||||
// API BEGIN
|
// API BEGIN
|
||||||
@ -181,13 +210,26 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
|||||||
error = true;
|
error = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (strcmp(param->name, "max_resultset_rows") == 0)
|
||||||
|
{
|
||||||
|
int v = atoi(param->value);
|
||||||
|
|
||||||
|
if (v > 0)
|
||||||
|
{
|
||||||
|
config.max_resultset_rows = v;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
config.max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
|
||||||
|
}
|
||||||
|
}
|
||||||
else if (strcmp(param->name, "max_resultset_size") == 0)
|
else if (strcmp(param->name, "max_resultset_size") == 0)
|
||||||
{
|
{
|
||||||
int v = atoi(param->value);
|
int v = atoi(param->value);
|
||||||
|
|
||||||
if (v > 0)
|
if (v > 0)
|
||||||
{
|
{
|
||||||
config.max_resultset_size = v;
|
config.max_resultset_size = v * 1024;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -282,14 +324,7 @@ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER
|
|||||||
static void *newSession(FILTER *instance, SESSION *session)
|
static void *newSession(FILTER *instance, SESSION *session)
|
||||||
{
|
{
|
||||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)MXS_CALLOC(1, sizeof(CACHE_SESSION_DATA));
|
CACHE_SESSION_DATA *csdata = cache_session_data_create(cinstance, session);
|
||||||
|
|
||||||
if (csdata)
|
|
||||||
{
|
|
||||||
csdata->api = cinstance->module->api;
|
|
||||||
csdata->storage = cinstance->storage;
|
|
||||||
csdata->session = session;
|
|
||||||
}
|
|
||||||
|
|
||||||
return csdata;
|
return csdata;
|
||||||
}
|
}
|
||||||
@ -317,7 +352,7 @@ static void freeSession(FILTER *instance, void *sdata)
|
|||||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||||
|
|
||||||
MXS_FREE(csdata);
|
cache_session_data_free(csdata);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -357,21 +392,21 @@ static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up)
|
|||||||
* @param sdata The filter session data
|
* @param sdata The filter session data
|
||||||
* @param packets The query data
|
* @param packets The query data
|
||||||
*/
|
*/
|
||||||
static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
|
static int routeQuery(FILTER *instance, void *sdata, GWBUF *data)
|
||||||
{
|
{
|
||||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||||
|
|
||||||
if (csdata->packets)
|
if (csdata->req.data)
|
||||||
{
|
{
|
||||||
gwbuf_append(csdata->packets, packets);
|
gwbuf_append(csdata->req.data, data);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
csdata->packets = packets;
|
csdata->req.data = data;
|
||||||
}
|
}
|
||||||
|
|
||||||
GWBUF *packet = modutil_get_next_MySQL_packet(&csdata->packets);
|
GWBUF *packet = modutil_get_next_MySQL_packet(&csdata->req.data);
|
||||||
|
|
||||||
int rv;
|
int rv;
|
||||||
|
|
||||||
@ -379,6 +414,9 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
|
|||||||
{
|
{
|
||||||
bool use_default = true;
|
bool use_default = true;
|
||||||
|
|
||||||
|
cache_response_state_reset(&csdata->res);
|
||||||
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
|
|
||||||
// TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5.
|
// TODO: This returns the wrong result if GWBUF_LENGTH(packet) is < 5.
|
||||||
if (modutil_is_SQL(packet))
|
if (modutil_is_SQL(packet))
|
||||||
{
|
{
|
||||||
@ -392,10 +430,15 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
|
|||||||
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
if (qc_get_operation(packet) == QUERY_OP_SELECT)
|
||||||
{
|
{
|
||||||
GWBUF *result;
|
GWBUF *result;
|
||||||
use_default = !route_using_cache(cinstance, csdata, packet, &result);
|
use_default = !route_using_cache(csdata, packet, &result);
|
||||||
|
|
||||||
if (!use_default)
|
if (use_default)
|
||||||
{
|
{
|
||||||
|
csdata->state = CACHE_EXPECTING_RESPONSE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||||
C_DEBUG("Using data from cache.");
|
C_DEBUG("Using data from cache.");
|
||||||
gwbuf_free(packet);
|
gwbuf_free(packet);
|
||||||
DCB *dcb = csdata->session->client_dcb;
|
DCB *dcb = csdata->session->client_dcb;
|
||||||
@ -429,33 +472,66 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets)
|
|||||||
* @param sdata The filter session data
|
* @param sdata The filter session data
|
||||||
* @param queue The query data
|
* @param queue The query data
|
||||||
*/
|
*/
|
||||||
static int clientReply(FILTER *instance, void *sdata, GWBUF *queue)
|
static int clientReply(FILTER *instance, void *sdata, GWBUF *data)
|
||||||
{
|
{
|
||||||
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance;
|
||||||
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata;
|
||||||
|
|
||||||
// TODO: queue can be put to the cache only if it is a complete
|
int rv;
|
||||||
// TODO: response. If it isn't, then we need to stash it and wait
|
|
||||||
// TODO: we get a complete response.
|
|
||||||
// TODO: Since we will know from the first queue how big the
|
|
||||||
// TODO: entire response will be, this is also where we can decide
|
|
||||||
// TODO: that something is too large to cache. If it is, an existing
|
|
||||||
// TODO: item must be deleted.
|
|
||||||
|
|
||||||
if (csdata->used_key)
|
if (csdata->res.data)
|
||||||
{
|
{
|
||||||
C_DEBUG("Key available, storing result.");
|
gwbuf_append(csdata->res.data, data);
|
||||||
|
}
|
||||||
cache_result_t result = csdata->api->putValue(csdata->storage, csdata->used_key, queue);
|
else
|
||||||
csdata->used_key = NULL;
|
|
||||||
|
|
||||||
if (result != CACHE_RESULT_OK)
|
|
||||||
{
|
{
|
||||||
MXS_ERROR("Could not store cache item.");
|
csdata->res.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (csdata->state != CACHE_IGNORING_RESPONSE)
|
||||||
|
{
|
||||||
|
if (gwbuf_length(csdata->res.data) > csdata->instance->config.max_resultset_size)
|
||||||
|
{
|
||||||
|
C_DEBUG("Current size %uB of resultset, at least as much "
|
||||||
|
"as maximum allowed size %uKiB. Not caching.",
|
||||||
|
gwbuf_length(csdata->res.data),
|
||||||
|
csdata->instance->config.max_resultset_size / 1024);
|
||||||
|
|
||||||
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return csdata->up.clientReply(csdata->up.instance, csdata->up.session, queue);
|
switch (csdata->state)
|
||||||
|
{
|
||||||
|
case CACHE_EXPECTING_FIELDS:
|
||||||
|
rv = handle_expecting_fields(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case CACHE_EXPECTING_NOTHING:
|
||||||
|
rv = handle_expecting_nothing(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case CACHE_EXPECTING_RESPONSE:
|
||||||
|
rv = handle_expecting_response(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case CACHE_EXPECTING_ROWS:
|
||||||
|
rv = handle_expecting_rows(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case CACHE_IGNORING_RESPONSE:
|
||||||
|
rv = handle_ignoring_response(csdata);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
MXS_ERROR("Internal cache logic broken, unexpected state: %d", csdata->state);
|
||||||
|
ss_dassert(!true);
|
||||||
|
rv = send_upstream(csdata);
|
||||||
|
cache_response_state_reset(&csdata->res);
|
||||||
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -481,46 +557,340 @@ static void diagnostics(FILTER *instance, void *sdata, DCB *dcb)
|
|||||||
//
|
//
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Route a query via the cache.
|
* Reset cache response state
|
||||||
*
|
*
|
||||||
* @param instance The filter instance.
|
* @param state Pointer to object.
|
||||||
* @param sdata Session data
|
|
||||||
* @param key A SELECT packet.
|
|
||||||
* @param value The result.
|
|
||||||
* @return True if the query was satisfied from the query.
|
|
||||||
*/
|
*/
|
||||||
static bool route_using_cache(CACHE_INSTANCE *instance,
|
static void cache_response_state_reset(CACHE_RESPONSE_STATE *state)
|
||||||
CACHE_SESSION_DATA *csdata,
|
|
||||||
const GWBUF *query,
|
|
||||||
GWBUF **value)
|
|
||||||
{
|
{
|
||||||
// TODO: This works *only* if only one request/response is handled at a time.
|
state->data = NULL;
|
||||||
// TODO: Is that the case, or is it not?
|
state->n_totalfields = 0;
|
||||||
|
state->n_fields = 0;
|
||||||
|
state->n_rows = 0;
|
||||||
|
state->offset = 0;
|
||||||
|
}
|
||||||
|
|
||||||
cache_result_t result = csdata->api->getKey(csdata->storage, query, csdata->key);
|
/**
|
||||||
|
* Create cache session data
|
||||||
|
*
|
||||||
|
* @param instance The cache instance this data is associated with.
|
||||||
|
*
|
||||||
|
* @return Session data or NULL if creation fails.
|
||||||
|
*/
|
||||||
|
static CACHE_SESSION_DATA *cache_session_data_create(CACHE_INSTANCE *instance,
|
||||||
|
SESSION* session)
|
||||||
|
{
|
||||||
|
CACHE_SESSION_DATA *data = (CACHE_SESSION_DATA*)MXS_CALLOC(1, sizeof(CACHE_SESSION_DATA));
|
||||||
|
|
||||||
if (result == CACHE_RESULT_OK)
|
if (data)
|
||||||
{
|
{
|
||||||
result = csdata->api->getValue(csdata->storage, csdata->key, value);
|
data->instance = instance;
|
||||||
|
data->api = instance->module->api;
|
||||||
|
data->storage = instance->storage;
|
||||||
|
data->session = session;
|
||||||
|
data->state = CACHE_EXPECTING_NOTHING;
|
||||||
|
}
|
||||||
|
|
||||||
switch (result)
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free cache session data.
|
||||||
|
*
|
||||||
|
* @param A cache session data previously allocated using session_data_create().
|
||||||
|
*/
|
||||||
|
static void cache_session_data_free(CACHE_SESSION_DATA* data)
|
||||||
|
{
|
||||||
|
if (data)
|
||||||
{
|
{
|
||||||
case CACHE_RESULT_OK:
|
MXS_FREE(data);
|
||||||
csdata->used_key = NULL;
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when resultset field information is handled.
|
||||||
|
*
|
||||||
|
* @param csdata The cache session data.
|
||||||
|
*/
|
||||||
|
static int handle_expecting_fields(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == CACHE_EXPECTING_FIELDS);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
int rv = 1;
|
||||||
|
|
||||||
|
bool insufficient = false;
|
||||||
|
|
||||||
|
size_t buflen = gwbuf_length(csdata->res.data);
|
||||||
|
|
||||||
|
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||||
|
{
|
||||||
|
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||||
|
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
||||||
|
|
||||||
|
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
|
||||||
|
|
||||||
|
if (csdata->res.offset + packetlen <= buflen)
|
||||||
|
{
|
||||||
|
// We have at least one complete packet.
|
||||||
|
int command = (int)MYSQL_GET_COMMAND(header);
|
||||||
|
|
||||||
|
switch (command)
|
||||||
|
{
|
||||||
|
case 0xfe: // EOF, the one after the fields.
|
||||||
|
csdata->res.offset += packetlen;
|
||||||
|
csdata->state = CACHE_EXPECTING_ROWS;
|
||||||
|
rv = handle_expecting_rows(csdata);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default: // Field information.
|
||||||
MXS_ERROR("Could not get value from cache storage.");
|
csdata->res.offset += packetlen;
|
||||||
case CACHE_RESULT_NOT_FOUND:
|
++csdata->res.n_fields;
|
||||||
csdata->used_key = csdata->key;
|
ss_dassert(csdata->res.n_fields <= csdata->res.n_totalfields);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
// We need more data
|
||||||
|
insufficient = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when data is received (even if nothing is expected) from the server.
|
||||||
|
*
|
||||||
|
* @param csdata The cache session data.
|
||||||
|
*/
|
||||||
|
static int handle_expecting_nothing(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == CACHE_EXPECTING_NOTHING);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
MXS_ERROR("Received data from the backend althoug we were expecting nothing.");
|
||||||
|
ss_dassert(!true);
|
||||||
|
|
||||||
|
return send_upstream(csdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when a response is received from the server.
|
||||||
|
*
|
||||||
|
* @param csdata The cache session data.
|
||||||
|
*/
|
||||||
|
static int handle_expecting_response(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == CACHE_EXPECTING_RESPONSE);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
int rv = 1;
|
||||||
|
|
||||||
|
size_t buflen = gwbuf_length(csdata->res.data);
|
||||||
|
|
||||||
|
if (buflen >= MYSQL_HEADER_LEN + 1) // We need the command byte.
|
||||||
|
{
|
||||||
|
// Reserve enough space to accomodate for the largest length encoded integer,
|
||||||
|
// which is type field + 8 bytes.
|
||||||
|
uint8_t header[MYSQL_HEADER_LEN + 1 + 8];
|
||||||
|
gwbuf_copy_data(csdata->res.data, 0, MYSQL_HEADER_LEN + 1, header);
|
||||||
|
|
||||||
|
switch ((int)MYSQL_GET_COMMAND(header))
|
||||||
|
{
|
||||||
|
case 0x00: // OK
|
||||||
|
case 0xff: // ERR
|
||||||
|
C_DEBUG("OK or ERR");
|
||||||
|
store_result(csdata);
|
||||||
|
|
||||||
|
rv = send_upstream(csdata);
|
||||||
|
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 0xfb: // GET_MORE_CLIENT_DATA/SEND_MORE_CLIENT_DATA
|
||||||
|
C_DEBUG("GET_MORE_CLIENT_DATA");
|
||||||
|
rv = send_upstream(csdata);
|
||||||
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
C_DEBUG("RESULTSET");
|
||||||
|
|
||||||
|
if (csdata->res.n_totalfields != 0)
|
||||||
|
{
|
||||||
|
// We've seen the header and have figured out how many fields there are.
|
||||||
|
csdata->state = CACHE_EXPECTING_FIELDS;
|
||||||
|
rv = handle_expecting_fields(csdata);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// leint_bytes() returns the length of the int type field + the size of the
|
||||||
|
// integer.
|
||||||
|
size_t n_bytes = leint_bytes(&header[4]);
|
||||||
|
|
||||||
|
if (MYSQL_HEADER_LEN + n_bytes <= buflen)
|
||||||
|
{
|
||||||
|
// Now we can figure out how many fields there are, but first we
|
||||||
|
// need to copy some more data.
|
||||||
|
gwbuf_copy_data(csdata->res.data,
|
||||||
|
MYSQL_HEADER_LEN + 1, n_bytes - 1, &header[MYSQL_HEADER_LEN + 1]);
|
||||||
|
|
||||||
|
csdata->res.n_totalfields = leint_value(&header[4]);
|
||||||
|
csdata->res.offset = MYSQL_HEADER_LEN + n_bytes;
|
||||||
|
|
||||||
|
csdata->state = CACHE_EXPECTING_FIELDS;
|
||||||
|
rv = handle_expecting_fields(csdata);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// We need more data. We will be called again, when data is available.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when resultset rows are handled.
|
||||||
|
*
|
||||||
|
* @param csdata The cache session data.
|
||||||
|
*/
|
||||||
|
static int handle_expecting_rows(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == CACHE_EXPECTING_ROWS);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
int rv = 1;
|
||||||
|
|
||||||
|
bool insufficient = false;
|
||||||
|
|
||||||
|
size_t buflen = gwbuf_length(csdata->res.data);
|
||||||
|
|
||||||
|
while (!insufficient && (buflen - csdata->res.offset >= MYSQL_HEADER_LEN))
|
||||||
|
{
|
||||||
|
uint8_t header[MYSQL_HEADER_LEN + 1];
|
||||||
|
gwbuf_copy_data(csdata->res.data, csdata->res.offset, MYSQL_HEADER_LEN + 1, header);
|
||||||
|
|
||||||
|
size_t packetlen = MYSQL_HEADER_LEN + MYSQL_GET_PACKET_LEN(header);
|
||||||
|
|
||||||
|
if (csdata->res.offset + packetlen <= buflen)
|
||||||
|
{
|
||||||
|
// We have at least one complete packet.
|
||||||
|
int command = (int)MYSQL_GET_COMMAND(header);
|
||||||
|
|
||||||
|
switch (command)
|
||||||
|
{
|
||||||
|
case 0xfe: // EOF, the one after the rows.
|
||||||
|
csdata->res.offset += packetlen;
|
||||||
|
ss_dassert(csdata->res.offset == buflen);
|
||||||
|
|
||||||
|
store_result(csdata);
|
||||||
|
|
||||||
|
rv = send_upstream(csdata);
|
||||||
|
csdata->state = CACHE_EXPECTING_NOTHING;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 0xfb: // NULL
|
||||||
|
default: // length-encoded-string
|
||||||
|
csdata->res.offset += packetlen;
|
||||||
|
++csdata->res.n_rows;
|
||||||
|
|
||||||
|
if (csdata->res.n_rows > csdata->instance->config.max_resultset_rows)
|
||||||
|
{
|
||||||
|
C_DEBUG("Max rows %lu reached, not caching result.", csdata->res.n_rows);
|
||||||
|
rv = send_upstream(csdata);
|
||||||
|
csdata->res.offset = buflen; // To abort the loop.
|
||||||
|
csdata->state = CACHE_IGNORING_RESPONSE;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// We need more data
|
||||||
|
insufficient = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when all data from the server is ignored.
|
||||||
|
*
|
||||||
|
* @param csdata The cache session data.
|
||||||
|
*/
|
||||||
|
static int handle_ignoring_response(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->state == CACHE_IGNORING_RESPONSE);
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
return send_upstream(csdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Route a query via the cache.
|
||||||
|
*
|
||||||
|
* @param csdata Session data
|
||||||
|
* @param key A SELECT packet.
|
||||||
|
* @param value The result.
|
||||||
|
* @return True if the query was satisfied from the query.
|
||||||
|
*/
|
||||||
|
static bool route_using_cache(CACHE_SESSION_DATA *csdata,
|
||||||
|
const GWBUF *query,
|
||||||
|
GWBUF **value)
|
||||||
|
{
|
||||||
|
cache_result_t result = csdata->api->getKey(csdata->storage, query, csdata->key);
|
||||||
|
|
||||||
|
if (result == CACHE_RESULT_OK)
|
||||||
|
{
|
||||||
|
result = csdata->api->getValue(csdata->storage, csdata->key, value);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Could not create cache key.");
|
MXS_ERROR("Could not create cache key.");
|
||||||
csdata->used_key = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result == CACHE_RESULT_OK;
|
return result == CACHE_RESULT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send data upstream.
|
||||||
|
*
|
||||||
|
* @param csdata Session data
|
||||||
|
*
|
||||||
|
* @return Whatever the upstream returns.
|
||||||
|
*/
|
||||||
|
static int send_upstream(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->res.data != NULL);
|
||||||
|
|
||||||
|
int rv = csdata->up.clientReply(csdata->up.instance, csdata->up.session, csdata->res.data);
|
||||||
|
csdata->res.data = NULL;
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the data.
|
||||||
|
*
|
||||||
|
* @param csdata Session data
|
||||||
|
*/
|
||||||
|
static void store_result(CACHE_SESSION_DATA *csdata)
|
||||||
|
{
|
||||||
|
ss_dassert(csdata->res.data);
|
||||||
|
|
||||||
|
csdata->res.data = gwbuf_make_contiguous(csdata->res.data);
|
||||||
|
|
||||||
|
cache_result_t result = csdata->api->putValue(csdata->storage,
|
||||||
|
csdata->key,
|
||||||
|
csdata->res.data);
|
||||||
|
|
||||||
|
if (result != CACHE_RESULT_OK)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Could not store cache item.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
33
server/modules/filter/cache/cache.h
vendored
Normal file
33
server/modules/filter/cache/cache.h
vendored
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
#ifndef CACHE_H
|
||||||
|
#define CACHE_H
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
|
||||||
|
*
|
||||||
|
* Change Date: 2019-07-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <limits.h>
|
||||||
|
|
||||||
|
|
||||||
|
typedef enum cache_references
|
||||||
|
{
|
||||||
|
CACHE_REFERENCES_ANY, // select * from tbl;
|
||||||
|
CACHE_REFERENCES_QUALIFIED // select * from db.tbl;
|
||||||
|
} cache_references_t;
|
||||||
|
|
||||||
|
#define CACHE_DEFAULT_ALLOWED_REFERENCES CACHE_REFERENCES_QUALIFIED
|
||||||
|
// Count
|
||||||
|
#define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX
|
||||||
|
// Bytes
|
||||||
|
#define CACHE_DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
|
||||||
|
// Seconds
|
||||||
|
#define CACHE_DEFAULT_TTL 10
|
||||||
|
|
||||||
|
#endif
|
||||||
Reference in New Issue
Block a user