diff --git a/Documentation/Filters/Cache.md b/Documentation/Filters/Cache.md new file mode 100644 index 000000000..902483e15 --- /dev/null +++ b/Documentation/Filters/Cache.md @@ -0,0 +1,113 @@ +# Cache + +## Overview +The cache filter is capable of caching the result of SELECTs, so that subsequent identical +SELECTs are served directly by MaxScale, without being routed to any server. + +## Configuration + +The cache filter is straightforward to configure and simple to add to any +existing service. + +``` +[Cache] +type=filter +module=cache +ttl=5 +storage=... +storage_args=... + +[Cached Routing Service] +type=service +... +filters=Cache +``` + +Each configured cache filter uses a storage of its own. That is, if there +are two services, each configured with a specific cache filter, then, +even if queries target the very same servers, the cached data will not +be shared. + +Two services can use the same cache filter, but then either the services +should use the very same servers _or_ a completely different set of servers, +where the used table names are different. Otherwise there can be unintended +sharing. + + +### Filter Parameters + +The cache filter has one mandatory parameter - `storage` - and a few +optional ones. + +#### `storage` + +The name of the module that provides the storage for the cache. That +module will be loaded and provided with the value of `storage_args` as +argument. For instance: +``` +storage=storage_rocksdb +``` + +#### `storage_args` + +A comma-separated list of arguments to be provided to the storage module, +specified in `storage`, when it is loaded. Note that the needed arguments +depend upon the specific module. For instance, +``` +storage_args=path=/usr/maxscale/cache/rocksdb +``` + +#### `allowed_references` + +Specifies whether any or only fully qualified references are allowed in +queries stored to the cache.
+``` +allowed_references=[fully-qualified|any] +``` +The default is `fully-qualified`, which means that only queries where +the database name is included in the table name are subject to caching. +``` +select col from db.tbl; +``` +If `any` is specified, then also queries where the table name is not +fully qualified are subject to caching. +``` +select col from tbl; +``` +Care should be exercised before this setting is changed, because, for +instance, the following is likely to produce unexpected results. +``` +use db1; +select col from tbl; +... +use db2; +select col from tbl; +``` +The setting can be changed to `any`, provided fully qualified names +are always used or if the names of tables in different databases are +different. + +#### `maximum_resultset_size` + +Specifies the maximum size a resultset can have, measured in kibibytes, +in order to be stored in the cache. A resultset larger than this will +not be stored. +``` +maximum_resultset_size=64 +``` +The default value is TBD. + +#### `ttl` + +_Time to live_; the amount of time - in seconds - the cached result is used +before it is refreshed from the server. + +If nothing is specified, the default _ttl_ value is 10. + +``` +ttl=60 +``` + +# Storage + +## Storage RocksDB diff --git a/server/modules/filter/cache/CMakeLists.txt b/server/modules/filter/cache/CMakeLists.txt index 05d9e742c..19499cc58 100644 --- a/server/modules/filter/cache/CMakeLists.txt +++ b/server/modules/filter/cache/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(cache SHARED cache.c) +add_library(cache SHARED cache.c storage.c) target_link_libraries(cache maxscale-common) set_target_properties(cache PROPERTIES VERSION "1.0.0") install_module(cache experimental) diff --git a/server/modules/filter/cache/cache.c b/server/modules/filter/cache/cache.c index 1826ed28d..2678e0eba 100644 --- a/server/modules/filter/cache/cache.c +++ b/server/modules/filter/cache/cache.c @@ -11,12 +11,19 @@ * Public License.
*/ -#include -#include +#define MXS_MODULE_NAME "cache" #include +#include +#include +#include +#include +#include +#include "storage.h" static char VERSION_STRING[] = "V1.0.0"; +static const int DEFAULT_TTL = 10; + static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **); static void *newSession(FILTER *instance, SESSION *session); static void closeSession(FILTER *instance, void *sdata); @@ -27,6 +34,8 @@ static int routeQuery(FILTER *instance, void *sdata, GWBUF *queue); static int clientReply(FILTER *instance, void *sdata, GWBUF *queue); static void diagnostics(FILTER *instance, void *sdata, DCB *dcb); +#define C_DEBUG(format, ...) MXS_LOG_MESSAGE(LOG_NOTICE, format, ##__VA_ARGS__) + // // Global symbols of the Module // @@ -81,14 +90,35 @@ FILTER_OBJECT *GetModuleObject() typedef struct cache_instance { + const char *name; + const char *storage_name; + const char *storage_args; + uint32_t ttl; // Time to live in seconds. + CACHE_STORAGE_MODULE *module; + CACHE_STORAGE *storage; } CACHE_INSTANCE; typedef struct cache_session_data { - DOWNSTREAM down; - UPSTREAM up; + CACHE_STORAGE_API *api; /**< The storage API to be used. */ + CACHE_STORAGE *storage; /**< The storage to be used with this session data. */ + DOWNSTREAM down; /**< The downstream component; routeQuery forwards queries to it. */ + UPSTREAM up; /**< The upstream component; clientReply passes replies to it. */ + GWBUF *packets; /**< Buffered data that does not yet form a complete packet. */ + SESSION *session; /**< The session this data is associated with. */ + char key[CACHE_KEY_MAXLEN]; /**< Key storage. */ + char *used_key; /**< Points to key when the next reply should be stored under it, otherwise NULL. */ } CACHE_SESSION_DATA; +static bool route_using_cache(CACHE_INSTANCE *instance, + CACHE_SESSION_DATA *sdata, + const GWBUF *key, + GWBUF **value); + +// +// API BEGIN +// + /** * Create an instance of the cache filter for a particular service * within MaxScale.
@@ -101,10 +131,88 @@ typedef struct cache_session_data */ static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **params) { - CACHE_INSTANCE *cinstance; + const char *storage_name = 0; + const char *storage_args = 0; + uint32_t ttl = DEFAULT_TTL; - if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL) + bool error = false; + + for (int i = 0; params[i]; ++i) { + const FILTER_PARAMETER *param = params[i]; + + if (strcmp(param->name, "storage_name") == 0) + { + storage_name = param->value; + } + else if (strcmp(param->name, "storage_args") == 0) + { + storage_args = param->value; + } + else if (strcmp(param->name, "ttl") == 0) + { + int v = atoi(param->value); + + if (v > 0) + { + ttl = v; + } + else + { + MXS_ERROR("The value of the configuration entry 'ttl' must " + "be an integer larger than 0."); + error = true; + } + } + else if (!filter_standard_parameter(params[i]->name)) + { + MXS_ERROR("Unknown configuration entry '%s'.", param->name); + error = true; + } + } + + CACHE_INSTANCE *cinstance = NULL; + + if (!error) + { + if ((cinstance = MXS_CALLOC(1, sizeof(CACHE_INSTANCE))) != NULL) + { + CACHE_STORAGE_MODULE *module = cache_storage_open(storage_name); + + if (module) + { + CACHE_STORAGE *storage = module->api->createInstance(name, ttl, 0, NULL); + + if (storage) + { + cinstance->name = name; + cinstance->storage_name = storage_name; + cinstance->storage_args = storage_args; + cinstance->ttl = ttl; + cinstance->module = module; + cinstance->storage = storage; + + MXS_NOTICE("Cache storage %s opened and initialized.", storage_name); + } + else + { + MXS_ERROR("Could not create storage instance for %s.", name); + cache_storage_close(module); + MXS_FREE(cinstance); + cinstance = NULL; + } + } + else + { + MXS_ERROR("Could not load cache storage module %s.", name); + MXS_FREE(cinstance); + cinstance = NULL; + } + } + } + else + { + cinstance = NULL; } return (FILTER*)cinstance; @@ -125,6 +233,9 @@ static void 
*newSession(FILTER *instance, SESSION *session) if (csdata) { + csdata->api = cinstance->module->api; + csdata->storage = cinstance->storage; + csdata->session = session; } return csdata; @@ -191,14 +302,97 @@ static void setUpstream(FILTER *instance, void *sdata, UPSTREAM *up) * * @param instance The filter instance data * @param sdata The filter session data - * @param queue The query data + * @param packets The query data */ -static int routeQuery(FILTER *instance, void *sdata, GWBUF *queue) +static int routeQuery(FILTER *instance, void *sdata, GWBUF *packets) { CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata; - return csdata->down.routeQuery(csdata->down.instance, csdata->down.session, queue); + if (csdata->packets) + { + C_DEBUG("Old packets exist."); + gwbuf_append(csdata->packets, packets); + } + else + { + C_DEBUG("NO old packets exist."); + csdata->packets = packets; + } + + packets = modutil_get_complete_packets(&csdata->packets); + + int rv = 1; /* same value as the "more data needed" branch; keeps rv defined if the loop below extracts no packet */ + + if (packets) + { + C_DEBUG("At least one complete packet exist."); + GWBUF *packet; + + // TODO: Is it really possible to get more that one packet + // TODO: is this loop? If so, can those packets be sent + // TODO: after one and other, or do we need to wait for + // TODO: a replies? If there are more complete packets + // TODO: than one, then either CACHE_SESSION_DATA::key + // TODO: needs to be a queue + + // TODO: modutil_get_next_MySQL_packet *copies* the data. + while ((packet = modutil_get_next_MySQL_packet(&packets))) + { + C_DEBUG("Processing packet."); + bool use_default = true; + + if (modutil_is_SQL(packet)) + { + C_DEBUG("Is SQL."); + // We do not care whether the query was fully parsed or not. + // If a query cannot be fully parsed, the worst thing that can + // happen is that caching is not used, even though it would be + // possible.
+ + if (qc_get_operation(packet) == QUERY_OP_SELECT) + { + C_DEBUG("Is a SELECT"); + + GWBUF *result = NULL; + use_default = !route_using_cache(cinstance, csdata, packet, &result); + + if (!use_default) + { + C_DEBUG("Using data from cache."); + gwbuf_free(packet); + DCB *dcb = csdata->session->client_dcb; + + // TODO: This is not ok. Any filters before this filter, will not + // TODO: see this data. + rv = dcb->func.write(dcb, result); + } + } + else + { + C_DEBUG("Is NOT a SELECT"); + } + } + else + { + C_DEBUG("Is NOT SQL."); + } + + if (use_default) + { + C_DEBUG("Using default processing."); + rv = csdata->down.routeQuery(csdata->down.instance, csdata->down.session, packet); + } + } + } + else + { + C_DEBUG("Not even one complete packet exist; more data needed."); + // Ok, we need more data before we can do something. + rv = 1; + } + + return rv; } /** @@ -213,6 +407,27 @@ static int clientReply(FILTER *instance, void *sdata, GWBUF *queue) CACHE_INSTANCE *cinstance = (CACHE_INSTANCE*)instance; CACHE_SESSION_DATA *csdata = (CACHE_SESSION_DATA*)sdata; + // TODO: queue can be put to the cache only if it is a complete + // TODO: response. If it isn't, then we need to stash it and wait + // TODO: we get a complete response. + // TODO: Since we will know from the first queue how big the + // TODO: entire response will be, this is also where we can decide + // TODO: that something is too large to cache. If it is, an existing + // TODO: item must be deleted.
+ + if (csdata->used_key) + { + C_DEBUG("Key available, storing result."); + + cache_result_t result = csdata->api->putValue(csdata->storage, csdata->used_key, queue); + csdata->used_key = NULL; + + if (result != CACHE_RESULT_OK) + { + MXS_ERROR("Could not store cache item."); + } + } + return csdata->up.clientReply(csdata->up.instance, csdata->up.session, queue); } @@ -233,3 +448,52 @@ static void diagnostics(FILTER *instance, void *sdata, DCB *dcb) dcb_printf(dcb, "Hello World from Cache!\n"); } + +// +// API END +// + +/** + * Route a query via the cache. + * + * @param instance The filter instance. + * @param csdata Session data; used_key is updated to tell clientReply whether to store the reply. + * @param query A SELECT packet, used for generating the cache key. + * @param value Receives the cached result when true is returned. + * @return True if the query was satisfied from the cache. + */ +static bool route_using_cache(CACHE_INSTANCE *instance, + CACHE_SESSION_DATA *csdata, + const GWBUF *query, + GWBUF **value) +{ + // TODO: This works *only* if only one request/response is handled at a time. + // TODO: Is that the case, or is it not?
+ + cache_result_t result = csdata->api->getKey(csdata->storage, query, csdata->key); + + if (result == CACHE_RESULT_OK) + { + result = csdata->api->getValue(csdata->storage, csdata->key, value); + + switch (result) + { + case CACHE_RESULT_OK: + csdata->used_key = NULL; + break; + + default: + MXS_ERROR("Could not get value from cache storage."); /* no break: falls through to the not-found handling */ + case CACHE_RESULT_NOT_FOUND: + csdata->used_key = csdata->key; + break; + } + } + else + { + MXS_ERROR("Could not create cache key."); + csdata->used_key = NULL; + } + + return result == CACHE_RESULT_OK; +} diff --git a/server/modules/filter/cache/cache_storage_api.h b/server/modules/filter/cache/cache_storage_api.h new file mode 100644 index 000000000..f6c1b6618 --- /dev/null +++ b/server/modules/filter/cache/cache_storage_api.h @@ -0,0 +1,120 @@ +#ifndef _MAXSCALE_FILTER_CACHE_CACHE_H +#define _MAXSCALE_FILTER_CACHE_CACHE_H +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include +#include + +EXTERN_C_BLOCK_BEGIN + +typedef enum cache_result +{ + CACHE_RESULT_OK, + CACHE_RESULT_NOT_FOUND, + CACHE_RESULT_OUT_OF_RESOURCES, + CACHE_RESULT_ERROR +} cache_result_t; + +typedef void* CACHE_STORAGE; + +enum +{ + CACHE_KEY_MAXLEN = 128 +}; + +typedef struct cache_storage_api +{ + /** + * Called immediately after the storage module has been loaded. + * + * @return True if the initialization succeeded, false otherwise. + */ + bool (*initialize)(); + + /** + * Creates an instance of cache storage. This function should, if necessary, + * create the actual storage, initialize it and prepare to put and get + * cache items.
+ * + * @param name The name of the cache instance. + * @param ttl Time to live; number of seconds the value is valid. + * @param argc The number of elements in the argv array. + * @param argv Array of arguments, as passed in the `storage_args` parameter + * in the cache section in the MaxScale configuration file. + * @return A new cache instance, or NULL if the instance could not be + * created. + */ + CACHE_STORAGE* (*createInstance)(const char *name, + uint32_t ttl, + int argc, char* argv[]); + + /** + * Frees a CACHE_STORAGE instance earlier created with createInstance. + * + * @param instance The CACHE_STORAGE instance to be freed. + */ + void (*freeInstance)(CACHE_STORAGE* instance); + + /** + * Creates a key for the query held in a GWBUF. + * + * @param storage Pointer to a CACHE_STORAGE. + * @param query An SQL query. Must be one contiguous buffer. + * @param key Pointer to array of CACHE_KEY_MAXLEN size where + * the key will be written. + * @return CACHE_RESULT_OK if a key was created, otherwise some error code. + */ + cache_result_t (*getKey)(CACHE_STORAGE* storage, + const GWBUF* query, + char* key); + /** + * Get a value from the cache. + * + * @param storage Pointer to a CACHE_STORAGE. + * @param key A key generated with getKey. + * @param result Pointer to variable that after a successful return will + * point to a GWBUF. + * @return CACHE_RESULT_OK if item was found, + * CACHE_RESULT_NOT_FOUND if item was not found (which may be because + * the ttl was reached), or some other error code. + */ + cache_result_t (*getValue)(CACHE_STORAGE* storage, + const char* key, + GWBUF** result); + + /** + * Put a value to the cache. + * + * @param storage Pointer to a CACHE_STORAGE. + * @param key A key generated with getKey. + * @param value Pointer to GWBUF containing the value to be stored. + * Must be one contiguous buffer.
+ * @return CACHE_RESULT_OK if item was successfully put, + * CACHE_RESULT_OUT_OF_RESOURCES if item could not be put, due to + * some resource having become exhausted, or some other error code. + */ + cache_result_t (*putValue)(CACHE_STORAGE* storage, + const char* key, + const GWBUF* value); +} CACHE_STORAGE_API; + +#define CACHE_STORAGE_ENTRY_POINT "CacheGetStorageAPI" /* symbol that cache_storage_open() looks up with dlsym() */ +typedef CACHE_STORAGE_API* (*CacheGetStorageAPIFN)(); + +EXTERN_C_BLOCK_END + +#endif diff --git a/server/modules/filter/cache/storage.c b/server/modules/filter/cache/storage.c new file mode 100644 index 000000000..7aa51b82e --- /dev/null +++ b/server/modules/filter/cache/storage.c @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License.
+ */ + +#include "storage.h" +#include +#include +#include +#include +#include + +CACHE_STORAGE_MODULE* cache_storage_open(const char *name) +{ + CACHE_STORAGE_MODULE* module = (CACHE_STORAGE_MODULE*)MXS_CALLOC(1, sizeof(CACHE_STORAGE_MODULE)); + + if (module) + { + char path[MAXPATHLEN + 1]; + snprintf(path, sizeof(path), "%s/lib%s.so", get_libdir(), name); /* bounded: an over-long name can no longer overflow path */ + + void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL); + + if (handle) + { + module->handle = handle; + + void *f = dlsym(module->handle, CACHE_STORAGE_ENTRY_POINT); + + if (f) + { + module->api = ((CacheGetStorageAPIFN)f)(); + + if (module->api) + { + if (!(module->api->initialize)()) + { + MXS_ERROR("Initialization of %s failed.", path); + + (void)dlclose(module->handle); + MXS_FREE(module); + module = NULL; + } + } + else + { + MXS_ERROR("Could not obtain API object from %s.", name); + + (void)dlclose(module->handle); + MXS_FREE(module); + module = NULL; + } + } + else + { + const char* s = dlerror(); + MXS_ERROR("Could not look up symbol %s from %s: %s", + CACHE_STORAGE_ENTRY_POINT, name, s ? s : ""); + cache_storage_close(module); /* dlclose()s the handle as well as freeing module; the handle used to leak here */ + module = NULL; + } + } + else + { + const char* s = dlerror(); + MXS_ERROR("Could not load %s: %s", name, s ? s : ""); + MXS_FREE(module); + module = NULL; + } + } + + return module; +} + + +void cache_storage_close(CACHE_STORAGE_MODULE *module) +{ + if (module) + { + if (dlclose(module->handle) != 0) + { + const char *s = dlerror(); + MXS_ERROR("Could not close module: %s", s ? s : ""); + } + + MXS_FREE(module); + } +} diff --git a/server/modules/filter/cache/storage.h b/server/modules/filter/cache/storage.h new file mode 100644 index 000000000..3132938f3 --- /dev/null +++ b/server/modules/filter/cache/storage.h @@ -0,0 +1,27 @@ +#ifndef _MAXSCALE_FILTER_CACHE_STORAGE_H +#define _MAXSCALE_FILTER_CACHE_STORAGE_H +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl.
+ * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "cache_storage_api.h" + +typedef struct cache_storage_module_t +{ + void* handle; /* Handle of the loaded shared object, as returned by dlopen(). */ + CACHE_STORAGE_API* api; /* API object returned by the module's CACHE_STORAGE_ENTRY_POINT function. */ +} CACHE_STORAGE_MODULE; + +CACHE_STORAGE_MODULE* cache_storage_open(const char *name); /* Loads lib<name>.so from the library directory and initializes it; returns NULL on failure. */ +void cache_storage_close(CACHE_STORAGE_MODULE *module); /* Unloads the shared object and frees module; a NULL module is ignored. */ + +#endif