Merge branch 'develop' into binlog_server_waitdata_encryption

This commit is contained in:
MassimilianoPinto
2016-11-29 17:38:04 +01:00
40 changed files with 1561 additions and 551 deletions

View File

@ -100,6 +100,21 @@ path is interpreted relative to the _data directory_ of MariaDB MaxScale.
rules=/path/to/rules-file
```
#### `cached_data`
An enumeration option specifying how data is shared between threads. The
allowed values are:
* `shared`: The cached data is shared between threads. On the one hand
it implies that there will be synchronization between threads, on
the other hand that all threads will use data fetched by any thread.
* `thread_specific`: The cached data is specific to a thread. On the
one hand it implies that no synchonization is needed between threads,
on the other hand that the very same data may be fetched and stored
multiple times.
Default is `shared`.
#### `debug`
An integer value, using which the level of debug logging made by the cache

View File

@ -157,3 +157,23 @@ bool runtime_create_listener(SERVICE *service, const char *name, const char *add
* @return True if the listener was successfully destroyed
*/
bool runtime_destroy_listener(SERVICE *service, const char *name);
/**
* @brief Create a new monitor
*
* @param name Name of the monitor
* @param module Monitor module
* @return True if new monitor was created and persisted
*/
bool runtime_create_monitor(const char *name, const char *module);
/**
* @brief Destroy a monitor
*
* Monitors are not removed from the runtime configuration but they are stopped.
* Destroyed monitor are removed after a restart.
*
* @param monitor Monitor to destroy
* @return True if monitor was destroyed
*/
bool runtime_destroy_monitor(MONITOR *monitor);

View File

@ -196,6 +196,7 @@ struct monitor
* two times the option value.
*/
MONITOR_OBJECT *module; /**< The "monitor object" */
char *module_name; /**< Name of the monitor module */
void *handle; /**< Handle returned from startMonitor */
size_t interval; /**< The monitor interval */
struct monitor *next; /**< Next monitor in the linked list */
@ -243,7 +244,7 @@ void mon_log_state_change(MONITOR_SERVERS *ptr);
void mon_hangup_failed_servers(MONITOR *monitor);
/**
* @brief Serialize a monitor to a file
* @brief Serialize the servers of a monitor to a file
*
* This partially converts @c monitor into an INI format file. Only the servers
* of the monitor are serialized. This allows the monitor to keep monitoring
@ -258,6 +259,16 @@ void mon_hangup_failed_servers(MONITOR *monitor);
*/
bool monitor_serialize_servers(const MONITOR *monitor);
/**
* @brief Serialize a monitor to a file
*
* This converts the static configuration of the monitor into an INI format file.
*
* @param monitor Monitor to serialize
* @return True if serialization was successful
*/
bool monitor_serialize(const MONITOR *monitor);
/**
* Check if a monitor uses @c servers
* @param server Server that is queried

View File

@ -225,6 +225,15 @@ bool serviceLaunchListener(SERVICE *service, SERV_LISTENER *port);
*/
bool serviceStopListener(SERVICE *service, const char *name);
/**
* @brief Restart a stopped listener
*
* @param service Service where the listener is linked
* @param name Name of the listener
* @return True if listener was restarted
*/
bool serviceStartListener(SERVICE *service, const char *name);
/**
* Utility functions
*/

View File

@ -280,6 +280,11 @@ bool runtime_alter_server(SERVER *server, char *key, char *value)
valid = false;
}
if (valid)
{
server_serialize(server);
}
spinlock_release(&crt_lock);
return valid;
}
@ -357,6 +362,11 @@ bool runtime_alter_monitor(MONITOR *monitor, char *key, char *value)
}
}
if (valid)
{
monitor_serialize(monitor);
}
spinlock_release(&crt_lock);
return valid;
}
@ -480,3 +490,58 @@ bool runtime_destroy_listener(SERVICE *service, const char *name)
spinlock_release(&crt_lock);
return rval;
}
bool runtime_create_monitor(const char *name, const char *module)
{
spinlock_acquire(&crt_lock);
bool rval = false;
MONITOR *monitor = monitor_alloc((char*)name, (char*)module);
if (monitor && monitor_serialize(monitor))
{
rval = true;
}
spinlock_release(&crt_lock);
return rval;
}
bool runtime_destroy_monitor(MONITOR *monitor)
{
bool rval = false;
char filename[PATH_MAX];
snprintf(filename, sizeof(filename), "%s/%s.cnf", get_config_persistdir(), monitor->name);
spinlock_acquire(&crt_lock);
if (unlink(filename) == -1)
{
if (errno != ENOENT)
{
char err[MXS_STRERROR_BUFLEN];
MXS_ERROR("Failed to remove persisted monitor configuration '%s': %d, %s",
filename, errno, strerror_r(errno, err, sizeof(err)));
}
else
{
rval = false;
MXS_WARNING("Monitor '%s' was not created at runtime. Remove the "
"monitor manually from the correct configuration file.",
monitor->name);
}
}
else
{
rval = true;
}
if (rval)
{
monitorStop(monitor);
MXS_NOTICE("Destroyed monitor '%s'. The monitor will be removed "
"after the next restart of MaxScale.", monitor->name);
}
spinlock_release(&crt_lock);
return rval;
}

View File

@ -136,10 +136,11 @@ static MODULECMD* command_create(const char *identifier, const char *domain,
MODULECMDFN entry_point, int argc,
modulecmd_arg_type_t* argv)
{
ss_dassert((argc && argv) || (argc == 0 && argv == NULL));
MODULECMD *rval = MXS_MALLOC(sizeof(*rval));
char *id = MXS_STRDUP(identifier);
char *dm = MXS_STRDUP(domain);
modulecmd_arg_type_t *types = MXS_MALLOC(sizeof(*types) * argc);
modulecmd_arg_type_t *types = MXS_MALLOC(sizeof(*types) * (argc ? argc : 1));
if (rval && id && dm && types)
{
@ -154,6 +155,13 @@ static MODULECMD* command_create(const char *identifier, const char *domain,
types[i] = argv[i];
}
if (argc == 0)
{
/** The command requires no arguments */
types[0].type = MODULECMD_ARG_NONE;
types[0].description = "";
}
rval->func = entry_point;
rval->identifier = id;
rval->domain = dm;

View File

@ -73,13 +73,15 @@ MONITOR *
monitor_alloc(char *name, char *module)
{
name = MXS_STRDUP(name);
char *my_module = MXS_STRDUP(module);
MONITOR *mon = (MONITOR *)MXS_MALLOC(sizeof(MONITOR));
if (!name || !mon)
if (!name || !mon || !my_module)
{
MXS_FREE(name);
MXS_FREE(mon);
MXS_FREE(my_module);
return NULL;
}
@ -92,6 +94,7 @@ monitor_alloc(char *name, char *module)
}
mon->state = MONITOR_STATE_ALLOC;
mon->name = name;
mon->module_name = module;
mon->handle = NULL;
mon->databases = NULL;
*mon->password = '\0';
@ -144,6 +147,7 @@ monitor_free(MONITOR *mon)
free_config_parameter(mon->parameters);
monitor_server_free_all(mon->databases);
MXS_FREE(mon->name);
MXS_FREE(mon->module_name);
MXS_FREE(mon);
}
@ -1261,7 +1265,7 @@ bool monitor_server_in_use(const SERVER *server)
* @param filename Filename where configuration is written
* @return True on success, false on error
*/
static bool create_monitor_config(const MONITOR *monitor, const char *filename)
static bool create_monitor_server_config(const MONITOR *monitor, const char *filename)
{
int file = open(filename, O_EXCL | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
@ -1300,7 +1304,75 @@ static bool create_monitor_config(const MONITOR *monitor, const char *filename)
return true;
}
static bool create_monitor_config(const MONITOR *monitor, const char *filename)
{
int file = open(filename, O_EXCL | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (file == -1)
{
char errbuf[MXS_STRERROR_BUFLEN];
MXS_ERROR("Failed to open file '%s' when serializing monitor '%s': %d, %s",
filename, monitor->name, errno, strerror_r(errno, errbuf, sizeof(errbuf)));
return false;
}
/**
* Only additional parameters are added to the configuration. This prevents
* duplication or addition of parameters that don't support it.
*
* TODO: Check for return values on all of the dprintf calls
*/
dprintf(file, "[%s]\n", monitor->name);
dprintf(file, "module=%s\n", monitor->module_name);
dprintf(file, "user=%s\n", monitor->user);
dprintf(file, "password=%s\n", monitor->password);
dprintf(file, "monitor_interval=%lu\n", monitor->interval);
dprintf(file, "backend_connect_timeout=%d\n", monitor->connect_timeout);
dprintf(file, "backend_write_timeout=%d\n", monitor->write_timeout);
dprintf(file, "backend_read_timeout=%d\n", monitor->read_timeout);
close(file);
return true;
}
bool monitor_serialize_servers(const MONITOR *monitor)
{
bool rval = false;
char filename[PATH_MAX];
snprintf(filename, sizeof(filename), "%s/%s.cnf.tmp", get_config_persistdir(),
monitor->name);
if (unlink(filename) == -1 && errno != ENOENT)
{
char err[MXS_STRERROR_BUFLEN];
MXS_ERROR("Failed to remove temporary monitor configuration at '%s': %d, %s",
filename, errno, strerror_r(errno, err, sizeof(err)));
}
else if (create_monitor_server_config(monitor, filename))
{
char final_filename[PATH_MAX];
strcpy(final_filename, filename);
char *dot = strrchr(final_filename, '.');
ss_dassert(dot);
*dot = '\0';
if (rename(filename, final_filename) == 0)
{
rval = true;
}
else
{
char err[MXS_STRERROR_BUFLEN];
MXS_ERROR("Failed to rename temporary monitor configuration at '%s': %d, %s",
filename, errno, strerror_r(errno, err, sizeof(err)));
}
}
return rval;
}
bool monitor_serialize(const MONITOR *monitor)
{
bool rval = false;
char filename[PATH_MAX];

View File

@ -520,6 +520,31 @@ bool serviceStopListener(SERVICE *service, const char *name)
return rval;
}
bool serviceStartListener(SERVICE *service, const char *name)
{
bool rval = false;
spinlock_acquire(&service->spin);
for (SERV_LISTENER *port = service->ports; port; port = port->next)
{
if (strcmp(port->name, name) == 0)
{
if (port->listener && port->listener->session->state == SESSION_STATE_LISTENER_STOPPED &&
poll_add_dcb(port->listener) == 0)
{
port->listener->session->state = SESSION_STATE_LISTENER;
rval = true;
}
break;
}
}
spinlock_release(&service->spin);
return rval;
}
int service_launch_all()
{
SERVICE *ptr;
@ -1422,18 +1447,20 @@ dListListeners(DCB *dcb)
if (service)
{
dcb_printf(dcb, "Listeners.\n");
dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n");
dcb_printf(dcb, "%-20s | %-18s | %-15s | Port | State\n",
"Service Name", "Protocol Module", "Address");
dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n");
dcb_printf(dcb, "---------------------+---------------------+"
"--------------------+-----------------+-------+--------\n");
dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | Port | State\n",
"Name", "Service Name", "Protocol Module", "Address");
dcb_printf(dcb, "---------------------+---------------------+"
"--------------------+-----------------+-------+--------\n");
}
while (service)
{
lptr = service->ports;
while (lptr)
{
dcb_printf(dcb, "%-20s | %-18s | %-15s | %5d | %s\n",
service->name, lptr->protocol,
dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n",
lptr->name, service->name, lptr->protocol,
(lptr && lptr->address) ? lptr->address : "*",
lptr->port,
(!lptr->listener ||
@ -1447,7 +1474,8 @@ dListListeners(DCB *dcb)
}
if (allServices)
{
dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n\n");
dcb_printf(dcb, "---------------------+---------------------+"
"--------------------+-----------------+-------+--------\n\n");
}
spinlock_release(&service_spin);
}

View File

@ -1,5 +1,5 @@
if (JANSSON_FOUND)
add_library(cache SHARED cache.cc cachefilter.cc cachemt.cc rules.cc sessioncache.cc storage.cc storagefactory.cc)
add_library(cache SHARED cache.cc cachefilter.cc cachemt.cc cachept.cc cachesimple.cc cachest.cc rules.cc sessioncache.cc storage.cc storagefactory.cc storagereal.cc)
target_link_libraries(cache maxscale-common jansson)
set_target_properties(cache PROPERTIES VERSION "1.0.0")
set_target_properties(cache PROPERTIES LINK_FLAGS -Wl,-z,defs)

View File

@ -19,77 +19,28 @@
#include "storagefactory.h"
#include "storage.h"
// Initial size of hashtable used for storing keys of queries that
// are being fetches.
#define CACHE_PENDING_ITEMS 50
/**
* Hashes a cache key to an integer.
*
* @param key Pointer to cache key.
*
* @returns Corresponding integer hash.
*/
static int hash_of_key(const void* key)
{
int hash = 0;
const char* i = (const char*)key;
const char* end = i + CACHE_KEY_MAXLEN;
while (i < end)
{
int c = *i;
hash = c + (hash << 6) + (hash << 16) - hash;
++i;
}
return hash;
}
static int hashfn(const void* address)
{
// TODO: Hash the address; pointers are not evenly distributed.
return (long)address;
}
static int hashcmp(const void* address1, const void* address2)
{
return (long)address2 - (long)address1;
}
Cache::Cache(const char* zName,
CACHE_CONFIG& config,
Cache::Cache(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage,
HASHTABLE* pPending)
: m_zName(zName)
, m_config(config)
StorageFactory* pFactory)
: m_name(name)
, m_config(*pConfig)
, m_pRules(pRules)
, m_pFactory(pFactory)
, m_pStorage(pStorage)
, m_pPending(pPending)
{
cache_config_reset(config);
}
Cache::~Cache()
{
// TODO: Free everything.
ss_dassert(false);
cache_rules_free(m_pRules);
delete m_pFactory;
}
//static
bool Cache::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
StorageFactory** ppFactory,
HASHTABLE** ppPending)
CACHE_RULES** ppRules)
{
CACHE_RULES* pRules = NULL;
HASHTABLE* pPending = NULL;
StorageFactory* pFactory = NULL;
if (config.rules)
{
@ -101,128 +52,50 @@ bool Cache::Create(const CACHE_CONFIG& config,
}
if (pRules)
{
pPending = hashtable_alloc(CACHE_PENDING_ITEMS, hashfn, hashcmp);
if (pPending)
{
pFactory = StorageFactory::Open(config.storage);
if (!pFactory)
{
MXS_ERROR("Could not open storage factory '%s'.", config.storage);
}
}
}
bool rv = (pRules && pPending && pFactory);
if (rv)
{
*ppRules = pRules;
*ppPending = pPending;
*ppFactory = pFactory;
}
else
{
cache_rules_free(pRules);
hashtable_free(pPending);
delete pFactory;
MXS_ERROR("Could not create rules.");
}
return rv;
return pRules != NULL;
}
bool Cache::shouldStore(const char* zDefaultDb, const GWBUF* pQuery)
//static
bool Cache::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
StorageFactory** ppFactory)
{
CACHE_RULES* pRules = NULL;
StorageFactory* pFactory = NULL;
if (Create(config, &pRules))
{
pFactory = StorageFactory::Open(config.storage);
if (pFactory)
{
*ppFactory = pFactory;
*ppRules = pRules;
}
else
{
MXS_ERROR("Could not open storage factory '%s'.", config.storage);
cache_rules_free(pRules);
}
}
return pFactory != NULL;
}
bool Cache::should_store(const char* zDefaultDb, const GWBUF* pQuery)
{
return cache_rules_should_store(m_pRules, zDefaultDb, pQuery);
}
bool Cache::shouldUse(const SESSION* pSession)
bool Cache::should_use(const SESSION* pSession)
{
return cache_rules_should_use(m_pRules, pSession);
}
bool Cache::mustRefresh(const char* pKey, const SessionCache* pSessionCache)
{
long key = hash_of_key(pKey);
spinlock_acquire(&m_lockPending);
// TODO: Remove the internal locking of hashtable. The internal
// TODO: locking is no good if you need transactional behaviour.
// TODO: Now we lock twice.
void *pValue = hashtable_fetch(m_pPending, (void*)pKey);
if (!pValue)
{
// It's not being fetched, so we make a note that we are.
hashtable_add(m_pPending, (void*)pKey, (void*)pSessionCache);
}
spinlock_release(&m_lockPending);
return pValue == NULL;
}
void Cache::refreshed(const char* pKey, const SessionCache* pSessionCache)
{
long key = hash_of_key(pKey);
spinlock_acquire(&m_lockPending);
ss_dassert(hashtable_fetch(m_pPending, (void*)pKey) == pSessionCache);
ss_debug(int n =) hashtable_delete(m_pPending, (void*)pKey);
ss_dassert(n == 1);
spinlock_release(&m_lockPending);
}
cache_result_t Cache::getKey(const char* zDefaultDb,
const GWBUF* pQuery,
char* pKey)
{
return m_pStorage->getKey(zDefaultDb, pQuery, pKey);
}
cache_result_t Cache::getValue(const char* pKey,
uint32_t flags,
GWBUF** ppValue)
{
return m_pStorage->getValue(pKey, flags, ppValue);
}
cache_result_t Cache::putValue(const char* pKey,
const GWBUF* pValue)
{
return m_pStorage->putValue(pKey, pValue);
}
cache_result_t Cache::delValue(const char* pKey)
{
return m_pStorage->delValue(pKey);
}
// protected
long Cache::hashOfKey(const char* pKey)
{
return hash_of_key(pKey);
}
// protected
bool Cache::mustRefresh(long key, const SessionCache* pSessionCache)
{
void *pValue = hashtable_fetch(m_pPending, (void*)key);
if (!pValue)
{
// It's not being fetched, so we make a note that we are.
hashtable_add(m_pPending, (void*)key, (void*)pSessionCache);
}
return !pValue;
}
// protected
void Cache::refreshed(long key, const SessionCache* pSessionCache)
{
ss_dassert(hashtable_fetch(m_pPending, (void*)key) == pSessionCache);
ss_debug(int n =) hashtable_delete(m_pPending, (void*)key);
ss_dassert(n == 1);
}

View File

@ -13,8 +13,9 @@
*/
#include <maxscale/cdefs.h>
#include <tr1/functional>
#include <string>
#include <maxscale/buffer.h>
#include <maxscale/filter.h>
#include <maxscale/session.h>
#include "cachefilter.h"
#include "cache_storage_api.h"
@ -24,7 +25,9 @@ class SessionCache;
class Cache
{
public:
~Cache();
virtual ~Cache();
const CACHE_CONFIG& config() const { return m_config; }
/**
* Returns whether the results of a particular query should be stored.
@ -34,7 +37,7 @@ public:
*
* @return True of the result should be cached.
*/
bool shouldStore(const char* zDefaultDb, const GWBUF* pQuery);
bool should_store(const char* zDefaultDb, const GWBUF* pQuery);
/**
* Returns whether cached results should be used.
@ -43,65 +46,54 @@ public:
*
* @return True of cached results should be used.
*/
bool shouldUse(const SESSION* pSession);
bool should_use(const SESSION* pSession);
/**
* Specifies whether a particular SessioCache should refresh the data.
*
* @param pKey The hashed key for a query.
* @param key The hashed key for a query.
* @param pSessionCache The session cache asking.
*
* @return True, if the session cache should refresh the data.
*/
virtual bool mustRefresh(const char* pKey, const SessionCache* pSessionCache);
virtual bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache) = 0;
/**
* To inform the cache that a particular item has been updated upon request.
*
* @param pKey The hashed key for a query.
* @param key The hashed key for a query.
* @param pSessionCache The session cache informing.
*/
virtual void refreshed(const char* pKey, const SessionCache* pSessionCache);
virtual void refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache) = 0;
const CACHE_CONFIG& config() const { return m_config; }
virtual cache_result_t get_key(const char* zDefaultDb, const GWBUF* pQuery, CACHE_KEY* pKey) = 0;
cache_result_t getKey(const char* zDefaultDb, const GWBUF* pQuery, char* pKey);
virtual cache_result_t get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppValue) = 0;
cache_result_t getValue(const char* pKey, uint32_t flags, GWBUF** ppValue);
virtual cache_result_t put_value(const CACHE_KEY& key, const GWBUF* pValue) = 0;
cache_result_t putValue(const char* pKey, const GWBUF* pValue);
cache_result_t delValue(const char* pKey);
virtual cache_result_t del_value(const CACHE_KEY& key) = 0;
protected:
Cache(const char* zName,
CACHE_CONFIG& config,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage,
HASHTABLE* pPending);
Cache(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
StorageFactory** ppFactory,
HASHTABLE** ppPending);
long hashOfKey(const char* pKey);
bool mustRefresh(long key, const SessionCache* pSessionCache);
void refreshed(long key, const SessionCache* pSessionCache);
StorageFactory** ppFactory);
private:
Cache(const Cache&);
Cache& operator = (const Cache&);
protected:
const char* m_zName; // The name of the instance; the section name in the config.
CACHE_CONFIG m_config; // The configuration of the cache instance.
CACHE_RULES* m_pRules; // The rules of the cache instance.
StorageFactory* m_pFactory; // The storage factory.
Storage* m_pStorage; // The storage instance to use.
HASHTABLE* m_pPending; // Pending items; being fetched from the backend.
SPINLOCK m_lockPending; // Lock used for protecting 'pending'.
const std::string m_name; // The name of the instance; the section name in the config.
const CACHE_CONFIG& m_config; // The configuration of the cache instance.
CACHE_RULES* m_pRules; // The rules of the cache instance.
StorageFactory* m_pFactory; // The storage factory.
};

View File

@ -37,6 +37,12 @@ typedef enum cache_flags
CACHE_FLAGS_INCLUDE_STALE = 0x01,
} cache_flags_t;
typedef enum cache_thread_model
{
CACHE_THREAD_MODEL_ST,
CACHE_THREAD_MODEL_MT
} cache_thread_model_t;
typedef void* CACHE_STORAGE;
enum
@ -44,6 +50,11 @@ enum
CACHE_KEY_MAXLEN = 128
};
typedef struct cache_key
{
char data[CACHE_KEY_MAXLEN];
} CACHE_KEY;
typedef struct cache_storage_api
{
/**
@ -58,6 +69,10 @@ typedef struct cache_storage_api
* create the actual storage, initialize it and prepare to put and get
* cache items.
*
* @param model Whether the storage will be used in a single thread or
* multi thread context. In the latter case the storage must
* perform thread synchronization as appropriate, in the former
* case it need not.
* @param name The name of the cache instance.
* @param ttl Time to live; number of seconds the value is valid.
* @param argc The number of elements in the argv array.
@ -66,7 +81,8 @@ typedef struct cache_storage_api
* @return A new cache instance, or NULL if the instance could not be
* created.
*/
CACHE_STORAGE* (*createInstance)(const char *name,
CACHE_STORAGE* (*createInstance)(cache_thread_model_t model,
const char *name,
uint32_t ttl,
int argc, char* argv[]);
@ -82,19 +98,19 @@ typedef struct cache_storage_api
*
* @param storage Pointer to a CACHE_STORAGE.
* @param query An SQL query. Must be one contiguous buffer.
* @param key Pointer to array of CACHE_KEY_MAXLEN size where
* the key will be written.
* @param key Pointer to key.
*
* @return CACHE_RESULT_OK if a key was created, otherwise some error code.
*/
cache_result_t (*getKey)(CACHE_STORAGE* storage,
const char* default_db,
const GWBUF* query,
char* key);
CACHE_KEY* key);
/**
* Get a value from the cache.
*
* @param storage Pointer to a CACHE_STORAGE.
* @param key A key generated with getKey.
* @param key A key generated with get_key.
* @param flags Mask of cache_flags_t values.
* @param result Pointer to variable that after a successful return will
* point to a GWBUF.
@ -105,7 +121,7 @@ typedef struct cache_storage_api
* the ttl was reached), or some other error code.
*/
cache_result_t (*getValue)(CACHE_STORAGE* storage,
const char* key,
const CACHE_KEY* key,
uint32_t flags,
GWBUF** result);
@ -113,7 +129,7 @@ typedef struct cache_storage_api
* Put a value to the cache.
*
* @param storage Pointer to a CACHE_STORAGE.
* @param key A key generated with getKey.
* @param key A key generated with get_key.
* @param value Pointer to GWBUF containing the value to be stored.
* Must be one contiguous buffer.
* @return CACHE_RESULT_OK if item was successfully put,
@ -121,19 +137,19 @@ typedef struct cache_storage_api
* some resource having become exhausted, or some other error code.
*/
cache_result_t (*putValue)(CACHE_STORAGE* storage,
const char* key,
const CACHE_KEY* key,
const GWBUF* value);
/**
* Delete a value from the cache.
*
* @param storage Pointer to a CACHE_STORAGE.
* @param key A key generated with getKey.
* @param key A key generated with get_key.
* @return CACHE_RESULT_OK if item was successfully deleted. Note that
* CACHE_RESULT_OK may be returned also if the entry was not present.
*/
cache_result_t (*delValue)(CACHE_STORAGE* storage,
const char* key);
const CACHE_KEY* key);
} CACHE_STORAGE_API;
#define CACHE_STORAGE_ENTRY_POINT "CacheGetStorageAPI"

View File

@ -14,12 +14,16 @@
#define MXS_MODULE_NAME "cache"
#include "cachefilter.h"
#include <exception>
#include <new>
#include <maxscale/alloc.h>
#include <maxscale/filter.h>
#include <maxscale/gwdirs.h>
#include "cachemt.h"
#include "cachept.h"
#include "sessioncache.h"
using std::string;
static char VERSION_STRING[] = "V1.0.0";
static const CACHE_CONFIG DEFAULT_CONFIG =
@ -32,9 +36,32 @@ static const CACHE_CONFIG DEFAULT_CONFIG =
NULL,
0,
CACHE_DEFAULT_TTL,
CACHE_DEFAULT_DEBUG
CACHE_DEFAULT_DEBUG,
CACHE_DEFAULT_THREAD_MODEL,
};
typedef struct cache_filter
{
cache_filter()
: config(DEFAULT_CONFIG)
, pCache(NULL)
{
}
~cache_filter()
{
delete pCache;
cache_config_finish(config);
}
CACHE_CONFIG config;
Cache* pCache;
private:
cache_filter(const cache_filter&);
cache_filter& operator = (const cache_filter&);
} CACHE_FILTER;
static FILTER* createInstance(const char* zName, char** pzOptions, FILTER_PARAMETER** ppParams);
static void* newSession(FILTER* pInstance, SESSION* pSession);
static void closeSession(FILTER* pInstance, void* pSessionData);
@ -114,20 +141,38 @@ extern "C" FILTER_OBJECT *GetModuleObject()
*/
static FILTER *createInstance(const char* zName, char** pzOptions, FILTER_PARAMETER** ppParams)
{
Cache* pCache = NULL;
CACHE_CONFIG config = DEFAULT_CONFIG;
CACHE_FILTER* pFilter = new (std::nothrow) CACHE_FILTER;
if (process_params(pzOptions, ppParams, config))
if (pFilter)
{
CPP_GUARD(pCache = CacheMT::Create(zName, config));
if (!pCache)
if (process_params(pzOptions, ppParams, pFilter->config))
{
cache_config_finish(config);
switch (pFilter->config.thread_model)
{
case CACHE_THREAD_MODEL_MT:
MXS_NOTICE("Creating shared cache.");
CPP_GUARD(pFilter->pCache = CacheMT::Create(zName, &pFilter->config));
break;
case CACHE_THREAD_MODEL_ST:
MXS_NOTICE("Creating thread specific cache.");
CPP_GUARD(pFilter->pCache = CachePT::Create(zName, &pFilter->config));
break;
default:
ss_dassert(!true);
}
}
if (!pFilter->pCache)
{
cache_config_finish(pFilter->config);
delete pFilter;
pFilter = NULL;
}
}
return reinterpret_cast<FILTER*>(pCache);
return reinterpret_cast<FILTER*>(pFilter);
}
/**
@ -140,7 +185,8 @@ static FILTER *createInstance(const char* zName, char** pzOptions, FILTER_PARAME
*/
static void *newSession(FILTER* pInstance, SESSION* pSession)
{
Cache* pCache = reinterpret_cast<Cache*>(pInstance);
CACHE_FILTER *pFilter = reinterpret_cast<CACHE_FILTER*>(pInstance);
Cache* pCache = pFilter->pCache;
SessionCache* pSessionCache = NULL;
CPP_GUARD(pSessionCache = SessionCache::Create(pCache, pSession));
@ -422,6 +468,23 @@ static bool process_params(char **pzOptions, FILTER_PARAMETER **ppParams, CACHE_
error = true;
}
}
else if (strcmp(pParam->name, "cached_data") == 0)
{
if (strcmp(pParam->value, "shared") == 0)
{
config.thread_model = CACHE_THREAD_MODEL_MT;
}
else if (strcmp(pParam->value, "thread_specific") == 0)
{
config.thread_model = CACHE_THREAD_MODEL_ST;
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be either 'shared' or 'thread_specific'.", pParam->name);
error = true;
}
}
else if (!filter_standard_parameter(pParam->name))
{
MXS_ERROR("Unknown configuration entry '%s'.", pParam->name);
@ -447,11 +510,7 @@ void cache_config_finish(CACHE_CONFIG& config)
MXS_FREE(config.rules);
MXS_FREE(config.storage);
MXS_FREE(config.storage_options);
for (int i = 0; i < config.storage_argc; ++i)
{
MXS_FREE(config.storage_argv[i]);
}
MXS_FREE(config.storage_argv); // The items need not be freed, they point into storage_options.
config.max_resultset_rows = 0;
config.max_resultset_size = 0;
@ -487,3 +546,59 @@ void cache_config_reset(CACHE_CONFIG& config)
{
memset(&config, 0, sizeof(config));
}
/**
* Hashes a CACHE_KEY to size_t.
*
* @param key The key to be hashed.
*
* @return The corresponding hash.
*/
size_t cache_key_hash(const CACHE_KEY& key)
{
size_t hash = 0;
const char* i = key.data;
const char* end = i + CACHE_KEY_MAXLEN;
while (i < end)
{
int c = *i;
hash = c + (hash << 6) + (hash << 16) - hash;
++i;
}
return hash;
}
/**
* Are two CACHE_KEYs equal.
*
* @param lhs One cache key.
* @param rhs Another cache key.
*
* @return True, if the keys are equal.
*/
bool cache_key_equal_to(const CACHE_KEY& lhs, const CACHE_KEY& rhs)
{
return memcmp(lhs.data, rhs.data, CACHE_KEY_MAXLEN) == 0;
}
std::string cache_key_to_string(const CACHE_KEY& key)
{
string s;
for (int i = 0; i < CACHE_KEY_MAXLEN; ++i)
{
char c = key.data[i];
if (!isprint(c))
{
c = '.';
}
s += c;
}
return s;
}

View File

@ -16,9 +16,12 @@
#include <maxscale/cdefs.h>
#include <limits.h>
#include <exception>
#include <tr1/functional>
#include <maxscale/hashtable.h>
#include <maxscale/spinlock.h>
#include "rules.h"
#include "cache_storage_api.h"
class Storage;
class StorageFactory;
@ -43,24 +46,60 @@ class StorageFactory;
#define CACHE_DEFAULT_TTL 10
// Integer value
#define CACHE_DEFAULT_DEBUG 0
// Thread model
#define CACHE_DEFAULT_THREAD_MODEL CACHE_THREAD_MODEL_MT
typedef struct cache_config
{
uint32_t max_resultset_rows;
uint32_t max_resultset_size;
char* rules;
char* storage;
char* storage_options;
char** storage_argv;
int storage_argc;
uint32_t ttl;
uint32_t debug;
uint32_t max_resultset_rows; /**< The maximum number of rows of a resultset for it to be cached. */
uint32_t max_resultset_size; /**< The maximum size of a resultset for it to be cached. */
char* rules; /**< Name of rules file. */
char* storage; /**< Name of storage module. */
char* storage_options; /**< Raw options for storage module. */
char** storage_argv; /**< Cooked options for storage module. */
int storage_argc; /**< Number of cooked options. */
uint32_t ttl; /**< Time to live. */
uint32_t debug; /**< Debug settings. */
cache_thread_model_t thread_model; /**< Thread model. */
} CACHE_CONFIG;
void cache_config_finish(CACHE_CONFIG& config);
void cache_config_free(CACHE_CONFIG* pConfig);
void cache_config_reset(CACHE_CONFIG& config);
size_t cache_key_hash(const CACHE_KEY& key);
bool cache_key_equal_to(const CACHE_KEY& lhs, const CACHE_KEY& rhs);
std::string cache_key_to_string(const CACHE_KEY& key);
namespace std
{
template<>
struct equal_to<CACHE_KEY>
{
bool operator()(const CACHE_KEY& lhs, const CACHE_KEY& rhs) const
{
return cache_key_equal_to(lhs, rhs);
}
};
namespace tr1
{
template<>
struct hash<CACHE_KEY>
{
size_t operator()(const CACHE_KEY& key) const
{
return cache_key_hash(key);
}
};
}
}
#define CPP_GUARD(statement)\
do { try { statement; } \
catch (const std::exception& x) { MXS_ERROR("Caught standard exception: %s", x.what()); }\

View File

@ -12,18 +12,15 @@
*/
#include "cachemt.h"
#include <new>
#include <maxscale/spinlock.h>
#include "storage.h"
#include "storagefactory.h"
CacheMT::CacheMT(const char* zName,
CACHE_CONFIG& config,
CacheMT::CacheMT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage,
HASHTABLE* pPending)
: Cache(zName, config, pRules, pFactory, pStorage, pPending)
Storage* pStorage)
: CacheSimple(name, pConfig, pRules, pFactory, pStorage)
{
spinlock_init(&m_lockPending);
}
@ -32,60 +29,86 @@ CacheMT::~CacheMT()
{
}
CacheMT* CacheMT::Create(const char* zName, CACHE_CONFIG& config)
CacheMT* CacheMT::Create(const std::string& name, const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
CacheMT* pCache = NULL;
CACHE_RULES* pRules = NULL;
HASHTABLE* pPending = NULL;
StorageFactory* pFactory = NULL;
if (Cache::Create(config, &pRules, &pFactory, &pPending))
if (CacheSimple::Create(*pConfig, &pRules, &pFactory))
{
uint32_t ttl = config.ttl;
int argc = config.storage_argc;
char** argv = config.storage_argv;
Storage* pStorage = pFactory->createStorage(zName, ttl, argc, argv);
if (pStorage)
{
CPP_GUARD(pCache = new CacheMT(zName,
config,
pRules,
pFactory,
pStorage,
pPending));
if (!pCache)
{
cache_rules_free(pRules);
hashtable_free(pPending);
delete pStorage;
delete pFactory;
}
}
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
}
bool CacheMT::mustRefresh(const char* pKey, const SessionCache* pSessionCache)
// static
CacheMT* CacheMT::Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig)
{
long key = hashOfKey(pKey);
ss_dassert(pConfig);
ss_dassert(pFactory);
CacheMT* pCache = NULL;
CACHE_RULES* pRules = NULL;
if (CacheSimple::Create(*pConfig, &pRules))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
}
bool CacheMT::must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
spinlock_acquire(&m_lockPending);
bool rv = Cache::mustRefresh(key, pSessionCache);
bool rv = CacheSimple::do_must_refresh(key, pSessionCache);
spinlock_release(&m_lockPending);
return rv;
}
void CacheMT::refreshed(const char* pKey, const SessionCache* pSessionCache)
void CacheMT::refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
long key = hashOfKey(pKey);
spinlock_acquire(&m_lockPending);
Cache::refreshed(key, pSessionCache);
CacheSimple::do_refreshed(key, pSessionCache);
spinlock_release(&m_lockPending);
}
// static
CacheMT* CacheMT::Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
{
CacheMT* pCache = NULL;
uint32_t ttl = pConfig->ttl;
int argc = pConfig->storage_argc;
char** argv = pConfig->storage_argv;
Storage* pStorage = pFactory->createStorage(CACHE_THREAD_MODEL_MT, name.c_str(), ttl, argc, argv);
if (pStorage)
{
CPP_GUARD(pCache = new CacheMT(name,
pConfig,
pRules,
pFactory,
pStorage));
if (!pCache)
{
delete pStorage;
cache_rules_free(pRules);
delete pFactory;
}
}
return pCache;
}

View File

@ -13,26 +13,32 @@
*/
#include <maxscale/cdefs.h>
#include "cache.h"
#include <maxscale/spinlock.h>
#include "cachesimple.h"
class CacheMT : public Cache
class CacheMT : public CacheSimple
{
public:
~CacheMT();
static CacheMT* Create(const char* zName, CACHE_CONFIG& config);
static CacheMT* Create(const std::string& name, const CACHE_CONFIG* pConfig);
static CacheMT* Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig);
bool mustRefresh(const char* pKey, const SessionCache* pSessionCache);
bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
void refreshed(const char* pKey, const SessionCache* pSessionCache);
void refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache);
private:
CacheMT(const char* zName,
CACHE_CONFIG& config,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage,
HASHTABLE* pPending);
CacheMT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage);
static CacheMT* Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
private:
CacheMT(const CacheMT&);

189
server/modules/filter/cache/cachept.cc vendored Normal file
View File

@ -0,0 +1,189 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "cachept.h"
#include <maxscale/atomic.h>
#include <maxscale/platform.h>
#include "cachest.h"
#include "storagefactory.h"
using std::tr1::shared_ptr;
using std::string;
namespace
{
int u_current_thread_id = 0;
thread_local int u_thread_id = -1;
/**
* Get the thread index of the current thread.
*
* @return The index of the current thread.
*/
inline int thread_index()
{
// A value of -1 indicates that the value has not been initialized,
if (u_thread_id == -1)
{
u_thread_id = atomic_add(&u_current_thread_id, 1);
}
return u_thread_id;
}
}
CachePT::CachePT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
const Caches& caches)
: Cache(name, pConfig, pRules, pFactory)
, m_caches(caches)
{
}
CachePT::~CachePT()
{
}
// static
CachePT* CachePT::Create(const std::string& name, const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
CachePT* pCache = NULL;
CACHE_RULES* pRules = NULL;
StorageFactory* pFactory = NULL;
if (Cache::Create(*pConfig, &pRules, &pFactory))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
}
// static
CachePT* CachePT::Create(const std::string& name,
StorageFactory* pFactory,
const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
CachePT* pCache = NULL;
CACHE_RULES* pRules = NULL;
if (Cache::Create(*pConfig, &pRules))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
}
bool CachePT::must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
return thread_cache().must_refresh(key, pSessionCache);
}
void CachePT::refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
thread_cache().refreshed(key, pSessionCache);
}
cache_result_t CachePT::get_key(const char* zDefaultDb, const GWBUF* pQuery, CACHE_KEY* pKey)
{
return thread_cache().get_key(zDefaultDb, pQuery, pKey);
}
cache_result_t CachePT::get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppValue)
{
return thread_cache().get_value(key, flags, ppValue);
}
cache_result_t CachePT::put_value(const CACHE_KEY& key, const GWBUF* pValue)
{
return thread_cache().put_value(key, pValue);
}
cache_result_t CachePT::del_value(const CACHE_KEY& key)
{
return thread_cache().del_value(key);
}
// static
CachePT* CachePT::Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
{
CachePT* pCache = NULL;
try
{
int n_threads = config_threadcount();
Caches caches;
bool error = false;
int i = 0;
while (!error && (i < n_threads))
{
char suffix[6]; // Enough for 99999 threads
sprintf(suffix, "%d", i);
string namest(name + "-" + suffix);
CacheST* pCacheST = 0;
CPP_GUARD(pCacheST = CacheST::Create(namest, pFactory, pConfig));
if (pCacheST)
{
shared_ptr<Cache> sCache(pCacheST);
caches.push_back(sCache);
}
else
{
error = true;
}
++i;
}
if (!error)
{
pCache = new CachePT(name, pConfig, pRules, pFactory, caches);
}
}
catch (const std::exception&)
{
cache_rules_free(pRules);
delete pFactory;
}
return pCache;
}
Cache& CachePT::thread_cache()
{
int i = thread_index();
ss_dassert(i < (int)m_caches.size());
return *m_caches[i].get();
}

63
server/modules/filter/cache/cachept.h vendored Normal file
View File

@ -0,0 +1,63 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <tr1/memory>
#include <vector>
#include "cache.h"
class CachePT : public Cache
{
public:
~CachePT();
static CachePT* Create(const std::string& name, const CACHE_CONFIG* pConfig);
static CachePT* Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig);
bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
void refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache);
cache_result_t get_key(const char* zDefaultDb, const GWBUF* pQuery, CACHE_KEY* pKey);
cache_result_t get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppValue);
cache_result_t put_value(const CACHE_KEY& key, const GWBUF* pValue);
cache_result_t del_value(const CACHE_KEY& key);
private:
typedef std::tr1::shared_ptr<Cache> SCache;
typedef std::vector<SCache> Caches;
CachePT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
const Caches& caches);
static CachePT* Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
Cache& thread_cache();
private:
CachePT(const Cache&);
CachePT& operator = (const CachePT&);
private:
Caches m_caches;
};

View File

@ -0,0 +1,123 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "cachesimple.h"
#include "storage.h"
#include "storagefactory.h"
CacheSimple::CacheSimple(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage)
: Cache(name, pConfig, pRules, pFactory)
, m_pStorage(pStorage)
{
}
CacheSimple::~CacheSimple()
{
delete m_pStorage;
}
// static
bool CacheSimple::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules)
{
int rv = false;
CACHE_RULES* pRules = NULL;
if (Cache::Create(config, &pRules))
{
*ppRules = pRules;
}
return pRules != NULL;;
}
// static
bool CacheSimple::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
StorageFactory** ppFactory)
{
int rv = false;
CACHE_RULES* pRules = NULL;
StorageFactory* pFactory = NULL;
if (Cache::Create(config, &pRules, &pFactory))
{
*ppRules = pRules;
*ppFactory = pFactory;
}
return pRules != NULL;
}
cache_result_t CacheSimple::get_key(const char* zDefaultDb,
const GWBUF* pQuery,
CACHE_KEY* pKey)
{
return m_pStorage->get_key(zDefaultDb, pQuery, pKey);
}
cache_result_t CacheSimple::get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppValue)
{
return m_pStorage->get_value(key, flags, ppValue);
}
cache_result_t CacheSimple::put_value(const CACHE_KEY& key,
const GWBUF* pValue)
{
return m_pStorage->put_value(key, pValue);
}
cache_result_t CacheSimple::del_value(const CACHE_KEY& key)
{
return m_pStorage->del_value(key);
}
// protected
bool CacheSimple::do_must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
bool rv = false;
Pending::iterator i = m_pending.find(key);
if (i == m_pending.end())
{
try
{
m_pending.insert(std::make_pair(key, pSessionCache));
rv = true;
}
catch (const std::exception& x)
{
rv = false;
}
}
return rv;
}
// protected
void CacheSimple::do_refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
Pending::iterator i = m_pending.find(key);
ss_dassert(i != m_pending.end());
ss_dassert(i->second == pSessionCache);
m_pending.erase(i);
}

View File

@ -0,0 +1,63 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <tr1/unordered_map>
#include <maxscale/hashtable.h>
#include "cache.h"
class Storage;
class CacheSimple : public Cache
{
public:
~CacheSimple();
cache_result_t get_key(const char* zDefaultDb, const GWBUF* pQuery, CACHE_KEY* pKey);
cache_result_t get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppValue);
cache_result_t put_value(const CACHE_KEY& key, const GWBUF* pValue);
cache_result_t del_value(const CACHE_KEY& key);
protected:
CacheSimple(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
StorageFactory** ppFactory);
bool do_must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
void do_refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache);
private:
CacheSimple(const Cache&);
CacheSimple& operator = (const CacheSimple&);
protected:
typedef std::tr1::unordered_map<CACHE_KEY, const SessionCache*> Pending;
Pending m_pending; // Pending items; being fetched from the backend.
Storage* m_pStorage; // The storage instance to use.
};

107
server/modules/filter/cache/cachest.cc vendored Normal file
View File

@ -0,0 +1,107 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "cachest.h"
#include "storage.h"
#include "storagefactory.h"
CacheST::CacheST(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage)
: CacheSimple(name, pConfig, pRules, pFactory, pStorage)
{
}
CacheST::~CacheST()
{
}
CacheST* CacheST::Create(const std::string& name, const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
CacheST* pCache = NULL;
CACHE_RULES* pRules = NULL;
StorageFactory* pFactory = NULL;
if (CacheSimple::Create(*pConfig, &pRules, &pFactory))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
}
// static
CacheST* CacheST::Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
ss_dassert(pFactory);
CacheST* pCache = NULL;
CACHE_RULES* pRules = NULL;
if (CacheSimple::Create(*pConfig, &pRules))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
}
bool CacheST::must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
return CacheSimple::do_must_refresh(key, pSessionCache);
}
void CacheST::refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
CacheSimple::do_refreshed(key, pSessionCache);
}
// static
CacheST* CacheST::Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
{
CacheST* pCache = NULL;
uint32_t ttl = pConfig->ttl;
int argc = pConfig->storage_argc;
char** argv = pConfig->storage_argv;
Storage* pStorage = pFactory->createStorage(CACHE_THREAD_MODEL_ST, name.c_str(), ttl, argc, argv);
if (pStorage)
{
CPP_GUARD(pCache = new CacheST(name,
pConfig,
pRules,
pFactory,
pStorage));
if (!pCache)
{
delete pStorage;
cache_rules_free(pRules);
delete pFactory;
}
}
return pCache;
}

44
server/modules/filter/cache/cachest.h vendored Normal file
View File

@ -0,0 +1,44 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include "cachesimple.h"
class CacheST : public CacheSimple
{
public:
~CacheST();
static CacheST* Create(const std::string& name, const CACHE_CONFIG* pConfig);
static CacheST* Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig);
bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
void refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache);
private:
CacheST(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage);
static CacheST* Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
private:
CacheST(const CacheST&);
CacheST& operator = (const CacheST&);
};

View File

@ -29,7 +29,7 @@ SessionCache::SessionCache(Cache* pCache, SESSION* pSession, char* zDefaultDb)
{
memset(&m_down, 0, sizeof(m_down));
memset(&m_up, 0, sizeof(m_up));
memset(m_key, 0, CACHE_KEY_MAXLEN);
memset(m_key.data, 0, CACHE_KEY_MAXLEN);
reset_response_state();
}
@ -138,9 +138,9 @@ int SessionCache::routeQuery(GWBUF* pPacket)
if ((session_is_autocommit(session) && !session_trx_is_active(session)) ||
session_trx_is_read_only(session))
{
if (m_pCache->shouldStore(m_zDefaultDb, pPacket))
if (m_pCache->should_store(m_zDefaultDb, pPacket))
{
if (m_pCache->shouldUse(m_pSession))
if (m_pCache->should_use(m_pSession))
{
GWBUF* pResponse;
cache_result_t result = get_cached_response(pPacket, &pResponse);
@ -152,7 +152,7 @@ int SessionCache::routeQuery(GWBUF* pPacket)
// The value was found, but it was stale. Now we need to
// figure out whether somebody else is already fetching it.
if (m_pCache->mustRefresh(m_key, this))
if (m_pCache->must_refresh(m_key, this))
{
// We were the first ones who hit the stale item. It's
// our responsibility now to fetch it.
@ -607,13 +607,13 @@ void SessionCache::reset_response_state()
*/
cache_result_t SessionCache::get_cached_response(const GWBUF *pQuery, GWBUF **ppResponse)
{
cache_result_t result = m_pCache->getKey(m_zDefaultDb, pQuery, m_key);
cache_result_t result = m_pCache->get_key(m_zDefaultDb, pQuery, &m_key);
if (result == CACHE_RESULT_OK)
{
uint32_t flags = CACHE_FLAGS_INCLUDE_STALE;
result = m_pCache->getValue(m_key, flags, ppResponse);
result = m_pCache->get_value(m_key, flags, ppResponse);
}
else
{
@ -638,13 +638,13 @@ void SessionCache::store_result()
{
m_res.pData = pData;
cache_result_t result = m_pCache->putValue(m_key, m_res.pData);
cache_result_t result = m_pCache->put_value(m_key, m_res.pData);
if (result != CACHE_RESULT_OK)
{
MXS_ERROR("Could not store cache item, deleting it.");
result = m_pCache->delValue(m_key);
result = m_pCache->del_value(m_key);
if ((result != CACHE_RESULT_OK) || (result != CACHE_RESULT_NOT_FOUND))
{

View File

@ -128,15 +128,15 @@ private:
SessionCache& operator = (const SessionCache&);
private:
cache_session_state_t m_state; /**< What state is the session in, what data is expected. */
Cache* m_pCache; /**< The cache instance the session is associated with. */
SESSION* m_pSession; /**< The session this data is associated with. */
DOWNSTREAM m_down; /**< The previous filter or equivalent. */
UPSTREAM m_up; /**< The next filter or equivalent. */
CACHE_RESPONSE_STATE m_res; /**< The response state. */
char m_key[CACHE_KEY_MAXLEN]; /**< Key storage. */
char* m_zDefaultDb; /**< The default database. */
char* m_zUseDb; /**< Pending default database. Needs server response. */
bool m_refreshing; /**< Whether the session is updating a stale cache entry. */
cache_session_state_t m_state; /**< What state is the session in, what data is expected. */
Cache* m_pCache; /**< The cache instance the session is associated with. */
SESSION* m_pSession; /**< The session this data is associated with. */
DOWNSTREAM m_down; /**< The previous filter or equivalent. */
UPSTREAM m_up; /**< The next filter or equivalent. */
CACHE_RESPONSE_STATE m_res; /**< The response state. */
CACHE_KEY m_key; /**< Key storage. */
char* m_zDefaultDb; /**< The default database. */
char* m_zUseDb; /**< Pending default database. Needs server response. */
bool m_refreshing; /**< Whether the session is updating a stale cache entry. */
};

View File

@ -15,39 +15,10 @@
#include "storage.h"
Storage::Storage(CACHE_STORAGE_API* pApi, CACHE_STORAGE* pStorage)
: m_pApi(pApi)
, m_pStorage(pStorage)
Storage::Storage()
{
ss_dassert(m_pApi);
ss_dassert(m_pStorage);
}
Storage::~Storage()
{
}
cache_result_t Storage::getKey(const char* zDefaultDb,
const GWBUF* pQuery,
char* pKey)
{
return m_pApi->getKey(m_pStorage, zDefaultDb, pQuery, pKey);
}
cache_result_t Storage::getValue(const char* pKey,
uint32_t flags,
GWBUF** ppValue)
{
return m_pApi->getValue(m_pStorage, pKey, flags, ppValue);
}
cache_result_t Storage::putValue(const char* pKey,
const GWBUF* pValue)
{
return m_pApi->putValue(m_pStorage, pKey, pValue);
}
cache_result_t Storage::delValue(const char* pKey)
{
return m_pApi->delValue(m_pStorage, pKey);
}

View File

@ -1,6 +1,4 @@
#pragma once
#ifndef _MAXSCALE_FILTER_CACHE_STORAGE_H
#define _MAXSCALE_FILTER_CACHE_STORAGE_H
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
@ -20,32 +18,24 @@
class Storage
{
public:
~Storage();
virtual ~Storage();
cache_result_t getKey(const char* zDefaultDb,
const GWBUF* pQuery,
char* pKey);
virtual cache_result_t get_key(const char* zDefaultDb,
const GWBUF* pQuery,
CACHE_KEY* pKey) = 0;
cache_result_t getValue(const char* pKey,
uint32_t flags,
GWBUF** ppValue);
virtual cache_result_t get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppValue) = 0;
cache_result_t putValue(const char* pKey,
const GWBUF* pValue);
virtual cache_result_t put_value(const CACHE_KEY& key,
const GWBUF* pValue) = 0;
cache_result_t delValue(const char* pKey);
virtual cache_result_t del_value(const CACHE_KEY& key) = 0;
private:
friend class StorageFactory;
Storage(CACHE_STORAGE_API* pApi, CACHE_STORAGE* pStorage);
protected:
Storage();
Storage(const Storage&);
Storage& operator = (const Storage&);
private:
CACHE_STORAGE_API* m_pApi;
CACHE_STORAGE* m_pStorage;
};
#endif

View File

@ -345,7 +345,7 @@ RocksDBStorage* RocksDBStorage::Create(const string& storageDirectory, const cha
return pStorage;
}
cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuery, char* pKey)
cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuery, CACHE_KEY* pKey)
{
ss_dassert(GWBUF_IS_CONTIGUOUS(pQuery));
@ -380,7 +380,7 @@ cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuer
string tag;
for_each(dbs.begin(), dbs.end(), [&tag](const string& db) { tag.append(db); });
memset(pKey, 0, CACHE_KEY_MAXLEN);
memset(pKey->data, 0, CACHE_KEY_MAXLEN);
const unsigned char* pData;
@ -389,7 +389,7 @@ cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuer
// This will also mean that entries related to the same databases will
// be placed near each other.
pData = reinterpret_cast<const unsigned char*>(tag.data());
SHA512(pData, tag.length(), reinterpret_cast<unsigned char*>(pKey));
SHA512(pData, tag.length(), reinterpret_cast<unsigned char*>(pKey->data));
char *pSql;
int length;
@ -398,16 +398,16 @@ cache_result_t RocksDBStorage::getKey(const char* zDefaultDB, const GWBUF* pQuer
// Then we store the query itself in the second half of the key.
pData = reinterpret_cast<const unsigned char*>(pSql);
SHA512(pData, length, reinterpret_cast<unsigned char*>(pKey) + SHA512_DIGEST_LENGTH);
SHA512(pData, length, reinterpret_cast<unsigned char*>(pKey->data) + SHA512_DIGEST_LENGTH);
return CACHE_RESULT_OK;
}
cache_result_t RocksDBStorage::getValue(const char* pKey, uint32_t flags, GWBUF** ppResult)
cache_result_t RocksDBStorage::getValue(const CACHE_KEY* pKey, uint32_t flags, GWBUF** ppResult)
{
// Use the root DB so that we get the value *with* the timestamp at the end.
rocksdb::DB* pDb = m_sDb->GetRootDB();
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
rocksdb::Slice key(pKey->data, ROCKSDB_KEY_LENGTH);
string value;
rocksdb::Status status = pDb->Get(rocksdb::ReadOptions(), key, &value);
@ -465,11 +465,11 @@ cache_result_t RocksDBStorage::getValue(const char* pKey, uint32_t flags, GWBUF*
return result;
}
cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
cache_result_t RocksDBStorage::putValue(const CACHE_KEY* pKey, const GWBUF* pValue)
{
ss_dassert(GWBUF_IS_CONTIGUOUS(pValue));
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
rocksdb::Slice key(pKey->data, ROCKSDB_KEY_LENGTH);
rocksdb::Slice value((char*)GWBUF_DATA(pValue), GWBUF_LENGTH(pValue));
rocksdb::Status status = m_sDb->Put(writeOptions(), key, value);
@ -477,11 +477,11 @@ cache_result_t RocksDBStorage::putValue(const char* pKey, const GWBUF* pValue)
return status.ok() ? CACHE_RESULT_OK : CACHE_RESULT_ERROR;
}
cache_result_t RocksDBStorage::delValue(const char* pKey)
cache_result_t RocksDBStorage::delValue(const CACHE_KEY* pKey)
{
ss_dassert(pKey);
rocksdb::Slice key(pKey, ROCKSDB_KEY_LENGTH);
rocksdb::Slice key(pKey->data, ROCKSDB_KEY_LENGTH);
rocksdb::Status status = m_sDb->Delete(writeOptions(), key);

View File

@ -29,10 +29,10 @@ public:
static RocksDBStorage* Create(const char* zName, uint32_t ttl, int argc, char* argv[]);
~RocksDBStorage();
cache_result_t getKey(const char* zDefaultDB, const GWBUF* pQuery, char* pKey);
cache_result_t getValue(const char* pKey, uint32_t flags, GWBUF** ppResult);
cache_result_t putValue(const char* pKey, const GWBUF* pValue);
cache_result_t delValue(const char* pKey);
cache_result_t getKey(const char* zDefaultDB, const GWBUF* pQuery, CACHE_KEY* pKey);
cache_result_t getValue(const CACHE_KEY* pKey, uint32_t flags, GWBUF** ppResult);
cache_result_t putValue(const CACHE_KEY* pKey, const GWBUF* pValue);
cache_result_t delValue(const CACHE_KEY* pKey);
private:
RocksDBStorage(std::unique_ptr<rocksdb::DBWithTTL>& sDb,

View File

@ -23,7 +23,10 @@ bool initialize()
return RocksDBStorage::Initialize();
}
CACHE_STORAGE* createInstance(const char* zName, uint32_t ttl, int argc, char* argv[])
CACHE_STORAGE* createInstance(cache_thread_model_t, // Ignored, RocksDB always MT safe.
const char* zName,
uint32_t ttl,
int argc, char* argv[])
{
ss_dassert(zName);
@ -57,7 +60,7 @@ void freeInstance(CACHE_STORAGE* pInstance)
cache_result_t getKey(CACHE_STORAGE* pStorage,
const char* zDefaultDB,
const GWBUF* pQuery,
char* pKey)
CACHE_KEY* pKey)
{
ss_dassert(pStorage);
// zDefaultDB may be NULL.
@ -87,7 +90,7 @@ cache_result_t getKey(CACHE_STORAGE* pStorage,
}
cache_result_t getValue(CACHE_STORAGE* pStorage,
const char* pKey,
const CACHE_KEY* pKey,
uint32_t flags,
GWBUF** ppResult)
{
@ -118,7 +121,7 @@ cache_result_t getValue(CACHE_STORAGE* pStorage,
}
cache_result_t putValue(CACHE_STORAGE* pStorage,
const char* pKey,
const CACHE_KEY* pKey,
const GWBUF* pValue)
{
ss_dassert(pStorage);
@ -148,7 +151,7 @@ cache_result_t putValue(CACHE_STORAGE* pStorage,
}
cache_result_t delValue(CACHE_STORAGE* pStorage,
const char* pKey)
const CACHE_KEY* pKey)
{
ss_dassert(pStorage);
ss_dassert(pKey);

View File

@ -19,7 +19,8 @@
#include <maxscale/alloc.h>
#include <maxscale/gwdirs.h>
#include <maxscale/log_manager.h>
#include "storage.h"
#include "cachefilter.h"
#include "storagereal.h"
namespace
@ -120,7 +121,7 @@ StorageFactory* StorageFactory::Open(const char* zName)
if (open_cache_storage(zName, &handle, &pApi))
{
pFactory = new (std::nothrow) StorageFactory(handle, pApi);
CPP_GUARD(pFactory = new StorageFactory(handle, pApi));
if (!pFactory)
{
@ -131,7 +132,8 @@ StorageFactory* StorageFactory::Open(const char* zName)
return pFactory;
}
Storage* StorageFactory::createStorage(const char* zName,
Storage* StorageFactory::createStorage(cache_thread_model_t model,
const char* zName,
uint32_t ttl,
int argc, char* argv[])
{
@ -139,11 +141,11 @@ Storage* StorageFactory::createStorage(const char* zName,
ss_dassert(m_pApi);
Storage* pStorage = 0;
CACHE_STORAGE* pRawStorage = m_pApi->createInstance(zName, ttl, argc, argv);
CACHE_STORAGE* pRawStorage = m_pApi->createInstance(model, zName, ttl, argc, argv);
if (pRawStorage)
{
pStorage = new (std::nothrow) Storage(m_pApi, pRawStorage);
CPP_GUARD(pStorage = new StorageReal(m_pApi, pRawStorage));
if (!pStorage)
{

View File

@ -26,7 +26,8 @@ public:
static StorageFactory* Open(const char* zName);
Storage* createStorage(const char* zName,
Storage* createStorage(cache_thread_model_t model,
const char* zName,
uint32_t ttl,
int argc, char* argv[]);

View File

@ -0,0 +1,53 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "storagereal.h"
StorageReal::StorageReal(CACHE_STORAGE_API* pApi, CACHE_STORAGE* pStorage)
: m_pApi(pApi)
, m_pStorage(pStorage)
{
ss_dassert(m_pApi);
ss_dassert(m_pStorage);
}
StorageReal::~StorageReal()
{
}
cache_result_t StorageReal::get_key(const char* zDefaultDb,
const GWBUF* pQuery,
CACHE_KEY* pKey)
{
return m_pApi->getKey(m_pStorage, zDefaultDb, pQuery, pKey);
}
cache_result_t StorageReal::get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppValue)
{
return m_pApi->getValue(m_pStorage, &key, flags, ppValue);
}
cache_result_t StorageReal::put_value(const CACHE_KEY& key,
const GWBUF* pValue)
{
return m_pApi->putValue(m_pStorage, &key, pValue);
}
cache_result_t StorageReal::del_value(const CACHE_KEY& key)
{
return m_pApi->delValue(m_pStorage, &key);
}

View File

@ -0,0 +1,47 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include "storage.h"
class StorageReal : public Storage
{
public:
~StorageReal();
cache_result_t get_key(const char* zDefaultDb,
const GWBUF* pQuery,
CACHE_KEY* pKey);
cache_result_t get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppValue);
cache_result_t put_value(const CACHE_KEY& key,
const GWBUF* pValue);
cache_result_t del_value(const CACHE_KEY& key);
private:
friend class StorageFactory;
StorageReal(CACHE_STORAGE_API* pApi, CACHE_STORAGE* pStorage);
StorageReal(const StorageReal&);
StorageReal& operator = (const StorageReal&);
private:
CACHE_STORAGE_API* m_pApi;
CACHE_STORAGE* m_pStorage;
};

View File

@ -207,7 +207,6 @@ typedef struct queryspeed_t
int limit; /*< Maximum number of queries */
long id; /*< Unique id of the rule */
bool active; /*< If the rule has been triggered */
struct queryspeed_t* next; /*< Next node in the list */
} QUERYSPEED;
/**
@ -287,6 +286,7 @@ typedef struct
{
SESSION* session; /*< Client session structure */
char* errmsg; /*< Rule specific error message */
QUERYSPEED* query_speed; /*< How fast the user has executed queries */
DOWNSTREAM down; /*< Next object in the downstream chain */
UPSTREAM up; /*< Next object in the upstream chain */
} FW_SESSION;
@ -1627,10 +1627,8 @@ static void
freeSession(FILTER *instance, void *session)
{
FW_SESSION *my_session = (FW_SESSION *) session;
if (my_session->errmsg)
{
MXS_FREE(my_session->errmsg);
}
MXS_FREE(my_session->errmsg);
MXS_FREE(my_session->query_speed);
MXS_FREE(my_session);
}
@ -1814,6 +1812,74 @@ static char* create_parse_error(FW_INSTANCE* my_instance,
return msg;
}
bool handle_throttle_rule(FW_SESSION* my_session, RULE_BOOK *rulebook, char **msg)
{
bool matches = false;
QUERYSPEED* rule_qs = (QUERYSPEED*)rulebook->rule->data;
QUERYSPEED* queryspeed = my_session->query_speed;
time_t time_now = time(NULL);
char emsg[512];
if (queryspeed == NULL)
{
/**No match found*/
queryspeed = (QUERYSPEED*)MXS_CALLOC(1, sizeof(QUERYSPEED));
MXS_ABORT_IF_NULL(queryspeed);
queryspeed->period = rule_qs->period;
queryspeed->cooldown = rule_qs->cooldown;
queryspeed->limit = rule_qs->limit;
my_session->query_speed = queryspeed;
}
if (queryspeed->active)
{
if (difftime(time_now, queryspeed->triggered) < queryspeed->cooldown)
{
double blocked_for = queryspeed->cooldown - difftime(time_now, queryspeed->triggered);
sprintf(emsg, "Queries denied for %f seconds", blocked_for);
*msg = MXS_STRDUP_A(emsg);
matches = true;
MXS_INFO("dbfwfilter: rule '%s': user denied for %f seconds",
rulebook->rule->name, blocked_for);
}
else
{
queryspeed->active = false;
queryspeed->count = 0;
}
}
else
{
if (queryspeed->count >= queryspeed->limit)
{
MXS_INFO("dbfwfilter: rule '%s': query limit triggered (%d queries in %d seconds), "
"denying queries from user for %d seconds.", rulebook->rule->name,
queryspeed->limit, queryspeed->period, queryspeed->cooldown);
queryspeed->triggered = time_now;
queryspeed->active = true;
matches = true;
double blocked_for = queryspeed->cooldown - difftime(time_now, queryspeed->triggered);
sprintf(emsg, "Queries denied for %f seconds", blocked_for);
*msg = MXS_STRDUP_A(emsg);
}
else if (queryspeed->count > 0 &&
difftime(time_now, queryspeed->first_query) <= queryspeed->period)
{
queryspeed->count++;
}
else
{
queryspeed->first_query = time_now;
queryspeed->count = 1;
}
}
return matches;
}
/**
* Check if a query matches a single rule
* @param my_instance Fwfilter instance
@ -1830,19 +1896,12 @@ bool rule_matches(FW_INSTANCE* my_instance,
RULE_BOOK *rulebook,
char* query)
{
char *ptr, *msg = NULL;
char *msg = NULL;
char emsg[512];
unsigned char* memptr = (unsigned char*) queue->start;
bool is_sql, is_real, matches;
qc_query_op_t optype = QUERY_OP_UNDEFINED;
QUERYSPEED* queryspeed = NULL;
QUERYSPEED* rule_qs = NULL;
time_t time_now;
struct tm tm_now;
time(&time_now);
localtime_r(&time_now, &tm_now);
bool matches = false;
bool is_sql = modutil_is_SQL(queue) || modutil_is_SQL_prepare(queue);
bool is_real = false;
matches = false;
is_sql = modutil_is_SQL(queue) || modutil_is_SQL_prepare(queue);
@ -1885,10 +1944,6 @@ bool rule_matches(FW_INSTANCE* my_instance,
}
}
}
else
{
is_real = false;
}
if (rulebook->rule->on_queries == QUERY_OP_UNDEFINED ||
rulebook->rule->on_queries & optype ||
@ -1898,6 +1953,7 @@ bool rule_matches(FW_INSTANCE* my_instance,
switch (rulebook->rule->type)
{
case RT_UNDEFINED:
ss_dassert(false);
MXS_ERROR("Undefined rule type found.");
break;
@ -1920,7 +1976,6 @@ bool rule_matches(FW_INSTANCE* my_instance,
{
msg = MXS_STRDUP_A("Permission denied, query matched regular expression.");
MXS_INFO("dbfwfilter: rule '%s': regex matched on query", rulebook->rule->name);
goto queryresolved;
}
}
else
@ -1932,14 +1987,9 @@ bool rule_matches(FW_INSTANCE* my_instance,
break;
case RT_PERMISSION:
{
matches = true;
msg = MXS_STRDUP_A("Permission denied at this time.");
char buffer[32]; // asctime documentation requires 26
asctime_r(&tm_now, buffer);
MXS_INFO("dbfwfilter: rule '%s': query denied at: %s", rulebook->rule->name, buffer);
goto queryresolved;
}
MXS_INFO("dbfwfilter: rule '%s': query denied at this time.", rulebook->rule->name);
break;
case RT_COLUMN:
@ -1964,7 +2014,7 @@ bool rule_matches(FW_INSTANCE* my_instance,
MXS_INFO("dbfwfilter: rule '%s': query targets forbidden column: %s",
rulebook->rule->name, strln->value);
msg = MXS_STRDUP_A(emsg);
goto queryresolved;
break;
}
strln = strln->next;
}
@ -1989,98 +2039,13 @@ bool rule_matches(FW_INSTANCE* my_instance,
msg = MXS_STRDUP_A("Usage of wildcard denied.");
MXS_INFO("dbfwfilter: rule '%s': query contains a wildcard.",
rulebook->rule->name);
goto queryresolved;
}
}
}
break;
case RT_THROTTLE:
/**
* Check if this is the first time this rule is matched and if so, allocate
* and initialize a new QUERYSPEED struct for this session.
*/
spinlock_acquire(&my_instance->lock);
rule_qs = (QUERYSPEED*) rulebook->rule->data;
spinlock_release(&my_instance->lock);
spinlock_acquire(&user->lock);
queryspeed = user->qs_limit;
spinlock_release(&user->lock);
while (queryspeed)
{
if (queryspeed->id == rule_qs->id)
{
break;
}
queryspeed = queryspeed->next;
}
if (queryspeed == NULL)
{
/**No match found*/
queryspeed = (QUERYSPEED*) MXS_CALLOC(1, sizeof(QUERYSPEED));
MXS_ABORT_IF_NULL(queryspeed);
queryspeed->period = rule_qs->period;
queryspeed->cooldown = rule_qs->cooldown;
queryspeed->limit = rule_qs->limit;
queryspeed->id = rule_qs->id;
queryspeed->next = user->qs_limit;
user->qs_limit = queryspeed;
}
if (queryspeed->active)
{
if (difftime(time_now, queryspeed->triggered) < queryspeed->cooldown)
{
double blocked_for =
queryspeed->cooldown - difftime(time_now, queryspeed->triggered);
sprintf(emsg, "Queries denied for %f seconds", blocked_for);
MXS_INFO("dbfwfilter: rule '%s': user denied for %f seconds",
rulebook->rule->name, blocked_for);
msg = MXS_STRDUP_A(emsg);
matches = true;
}
else
{
queryspeed->active = false;
queryspeed->count = 0;
}
}
else
{
if (queryspeed->count >= queryspeed->limit)
{
queryspeed->triggered = time_now;
matches = true;
queryspeed->active = true;
MXS_INFO("dbfwfilter: rule '%s': query limit triggered (%d queries in %d seconds), "
"denying queries from user for %d seconds.",
rulebook->rule->name,
queryspeed->limit,
queryspeed->period,
queryspeed->cooldown);
double blocked_for =
queryspeed->cooldown - difftime(time_now, queryspeed->triggered);
sprintf(emsg, "Queries denied for %f seconds", blocked_for);
msg = MXS_STRDUP_A(emsg);
}
else if (queryspeed->count > 0 &&
difftime(time_now, queryspeed->first_query) <= queryspeed->period)
{
queryspeed->count++;
}
else
{
queryspeed->first_query = time_now;
queryspeed->count = 1;
}
}
matches = handle_throttle_rule(my_session, rulebook, &msg);
break;
case RT_CLAUSE:

View File

@ -50,6 +50,7 @@
#include <binlog_common.h>
#include <avro/errors.h>
#include <maxscale/alloc.h>
#include <maxscale/modulecmd.h>
#ifndef BINLOG_NAMEFMT
#define BINLOG_NAMEFMT "%s.%06d"
@ -93,6 +94,7 @@ bool avro_save_conversion_state(AVRO_INSTANCE *router);
static void stats_func(void *);
void avro_index_file(AVRO_INSTANCE *router, const char* path);
void avro_update_index(AVRO_INSTANCE* router);
static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start);
/** The module object definition */
static ROUTER_OBJECT MyObject =
@ -123,6 +125,26 @@ version()
return version_str;
}
bool avro_handle_convert(const MODULECMD_ARG *args)
{
bool rval = false;
if (strcmp(args->argv[1].value.string, "start") == 0 &&
conversion_task_ctl(args->argv[0].value.service->router_instance, true))
{
MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name);
rval = true;
}
else if (strcmp(args->argv[1].value.string, "stop") == 0 &&
conversion_task_ctl(args->argv[0].value.service->router_instance, false))
{
MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name);
rval = true;
}
return rval;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
@ -133,6 +155,13 @@ ModuleInit()
MXS_NOTICE("Initialized avrorouter module %s.\n", version_str);
spinlock_init(&instlock);
instances = NULL;
modulecmd_arg_type_t args[] =
{
{ MODULECMD_ARG_SERVICE, "The avrorouter service" },
{ MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" }
};
modulecmd_register_command("avrorouter", "convert", avro_handle_convert, 2, args);
}
/**
@ -224,22 +253,29 @@ bool create_tables(sqlite3* handle)
return true;
}
static bool add_conversion_task(AVRO_INSTANCE *inst)
static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start)
{
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
if (inst->service->svc_do_shutdown)
bool rval = false;
if (!inst->service->svc_do_shutdown)
{
MXS_INFO("AVRO converter task is not added due to MaxScale shutdown");
return false;
char tasknm[strlen(avro_task_name) + strlen(inst->service->name) + 2];
snprintf(tasknm, sizeof(tasknm), "%s-%s", inst->service->name, avro_task_name);
/** Remove old task and create a new one */
hktask_remove(tasknm);
if (!start || hktask_add(tasknm, converter_func, inst, inst->task_delay))
{
rval = true;
}
else
{
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
}
}
MXS_INFO("Setting task for converter_func");
if (hktask_oneshot(tasknm, converter_func, inst, inst->task_delay) == 0)
{
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
return false;
}
return true;
return rval;
}
/**
@ -566,7 +602,7 @@ createInstance(SERVICE *service, char **options)
*/
/* Start the scan, read, convert AVRO task */
add_conversion_task(inst);
conversion_task_ctl(inst, true);
MXS_INFO("AVRO: current MySQL binlog file is %s, pos is %lu\n",
inst->binlog_name, inst->current_pos);
@ -1047,7 +1083,7 @@ void converter_func(void* data)
if (binlog_end == AVRO_LAST_FILE)
{
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
if (add_conversion_task(router))
if (conversion_task_ctl(router, true))
{
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
" more data is written before continuing. Next check in %d seconds.",

View File

@ -423,6 +423,19 @@ static void shutdown_server()
static void shutdown_service(DCB *dcb, SERVICE *service);
static void shutdown_monitor(DCB *dcb, MONITOR *monitor);
static void
shutdown_listener(DCB *dcb, SERVICE *service, const char *name)
{
if (serviceStopListener(service, name))
{
dcb_printf(dcb, "Stopped listener '%s'\n", name);
}
else
{
dcb_printf(dcb, "Failed to stop listener '%s'\n", name);
}
}
/**
* The subcommands of the shutdown command
*/
@ -452,6 +465,14 @@ struct subcommand shutdownoptions[] =
"E.g. shutdown service \"Sales Database\"",
{ARG_TYPE_SERVICE, 0, 0}
},
{
"listener",
2, 2,
shutdown_listener,
"Stop a listener",
"E.g. shutdown listener \"RW Service\" \"RW Listener\"",
{ARG_TYPE_SERVICE, ARG_TYPE_STRING}
},
{
EMPTY_OPTION
}
@ -487,6 +508,20 @@ struct subcommand syncoptions[] =
static void restart_service(DCB *dcb, SERVICE *service);
static void restart_monitor(DCB *dcb, MONITOR *monitor);
static void
restart_listener(DCB *dcb, SERVICE *service, const char *name)
{
if (serviceStartListener(service, name))
{
dcb_printf(dcb, "Restarted listener '%s'\n", name);
}
else
{
dcb_printf(dcb, "Failed to restart listener '%s'\n", name);
}
}
/**
* The subcommands of the restart command
*/
@ -504,6 +539,12 @@ struct subcommand restartoptions[] =
"E.g. restart service \"Sales Database\"",
{ARG_TYPE_SERVICE, 0, 0}
},
{
"listener", 2, 2, restart_listener,
"Restart a listener",
"E.g. restart listener \"RW Service\" \"RW Listener\"",
{ARG_TYPE_SERVICE, ARG_TYPE_STRING}
},
{ EMPTY_OPTION }
};

View File

@ -359,29 +359,35 @@ execute(ROUTER *rinstance, void *router_session, GWBUF *queue)
return 1;
}
// We have a complete request in a signle buffer
int rc = 1;
// We have a complete request in a single buffer
if (modutil_MySQL_Query(queue, &sql, &len, &residual))
{
sql = strndup(sql, len);
int rc = maxinfo_execute_query(instance, session, sql);
rc = maxinfo_execute_query(instance, session, sql);
MXS_FREE(sql);
return rc;
}
else
{
switch (MYSQL_COMMAND(queue))
{
case COM_PING:
return maxinfo_ping(instance, session, queue);
rc = maxinfo_ping(instance, session, queue);
break;
case COM_STATISTICS:
return maxinfo_statistics(instance, session, queue);
rc = maxinfo_statistics(instance, session, queue);
break;
case COM_QUIT:
break;
default:
MXS_ERROR("maxinfo: Unexpected MySQL command 0x%x",
MYSQL_COMMAND(queue));
break;
}
}
return 1;
// MaxInfo doesn't route the data forward so it should be freed.
gwbuf_free(queue);
return rc;
}
/**
@ -652,6 +658,7 @@ maxinfo_execute_query(INFO_INSTANCE *instance, INFO_SESSION *session, char *sql)
else
{
maxinfo_execute(session->dcb, tree);
maxinfo_free_tree(tree);
}
return 1;
}

View File

@ -138,6 +138,7 @@ typedef enum
extern MAXINFO_TREE *maxinfo_parse(char *, PARSE_ERROR *);
extern void maxinfo_free_tree(MAXINFO_TREE *);
extern void maxinfo_execute(DCB *, MAXINFO_TREE *);
extern void maxinfo_send_error(DCB *, int, char *);
extern void maxinfo_send_parse_error(DCB *, char *, PARSE_ERROR);

View File

@ -880,21 +880,27 @@ static void
exec_show_variables(DCB *dcb, MAXINFO_TREE *filter)
{
RESULTSET *result;
VARCONTEXT context;
VARCONTEXT *context;
if ((context = MXS_MALLOC(sizeof(VARCONTEXT))) == NULL)
{
return;
}
if (filter)
{
context.like = filter->value;
context->like = filter->value;
}
else
{
context.like = NULL;
context->like = NULL;
}
context.index = 0;
context->index = 0;
if ((result = resultset_create(variable_row, &context)) == NULL)
if ((result = resultset_create(variable_row, context)) == NULL)
{
maxinfo_send_error(dcb, 0, "No resources available");
MXS_FREE(context);
return;
}
resultset_add_column(result, "Variable_name", 40, COL_TYPE_VARCHAR);
@ -1166,21 +1172,27 @@ static void
exec_show_status(DCB *dcb, MAXINFO_TREE *filter)
{
RESULTSET *result;
VARCONTEXT context;
VARCONTEXT *context;
if ((context = MXS_MALLOC(sizeof(VARCONTEXT))) == NULL)
{
return;
}
if (filter)
{
context.like = filter->value;
context->like = filter->value;
}
else
{
context.like = NULL;
context->like = NULL;
}
context.index = 0;
context->index = 0;
if ((result = resultset_create(status_row, &context)) == NULL)
if ((result = resultset_create(status_row, context)) == NULL)
{
maxinfo_send_error(dcb, 0, "No resources available");
MXS_FREE(context);
return;
}
resultset_add_column(result, "Variable_name", 40, COL_TYPE_VARCHAR);

View File

@ -43,7 +43,7 @@
#include <maxscale/log_manager.h>
static MAXINFO_TREE *make_tree_node(MAXINFO_OPERATOR, char *, MAXINFO_TREE *, MAXINFO_TREE *);
static void free_tree(MAXINFO_TREE *);
void maxinfo_free_tree(MAXINFO_TREE *); // This function is needed by maxinfo.c
static char *fetch_token(char *, int *, char **);
static MAXINFO_TREE *parse_column_list(char **sql);
static MAXINFO_TREE *parse_table_name(char **sql);
@ -95,13 +95,13 @@ maxinfo_parse(char *sql, PARSE_ERROR *parse_error)
{
// Expected expression
*parse_error = PARSE_EXPECTED_LIKE;
free_tree(tree);
maxinfo_free_tree(tree);
return NULL;
}
}
// Malformed show
MXS_FREE(text);
free_tree(tree);
maxinfo_free_tree(tree);
*parse_error = PARSE_MALFORMED_SHOW;
return NULL;
#if 0
@ -132,7 +132,7 @@ maxinfo_parse(char *sql, PARSE_ERROR *parse_error)
{
/** Unknown token after SHUTDOWN MONITOR|SERVICE */
*parse_error = PARSE_SYNTAX_ERROR;
free_tree(tree);
maxinfo_free_tree(tree);
return NULL;
}
return tree;
@ -146,7 +146,7 @@ maxinfo_parse(char *sql, PARSE_ERROR *parse_error)
{
/** Missing token for RESTART MONITOR|SERVICE */
*parse_error = PARSE_SYNTAX_ERROR;
free_tree(tree);
maxinfo_free_tree(tree);
return NULL;
}
tree->right = make_tree_node(MAXOP_LITERAL, text, NULL, NULL);
@ -156,7 +156,7 @@ maxinfo_parse(char *sql, PARSE_ERROR *parse_error)
/** Unknown token after RESTART MONITOR|SERVICE */
*parse_error = PARSE_SYNTAX_ERROR;
MXS_FREE(text);
free_tree(tree);
maxinfo_free_tree(tree);
return NULL;
}
return tree;
@ -278,20 +278,20 @@ make_tree_node(MAXINFO_OPERATOR op, char *value, MAXINFO_TREE *left, MAXINFO_TRE
}
/**
* Recusrsively free the storage associated with a parse tree
* Recursively free the storage associated with a parse tree
*
* @param tree The parse tree to free
*/
static void
free_tree(MAXINFO_TREE *tree)
void
maxinfo_free_tree(MAXINFO_TREE *tree)
{
if (tree->left)
{
free_tree(tree->left);
maxinfo_free_tree(tree->left);
}
if (tree->right)
{
free_tree(tree->right);
maxinfo_free_tree(tree->right);
}
if (tree->value)
{
@ -431,7 +431,7 @@ MAXINFO_TREE* maxinfo_parse_literals(MAXINFO_TREE *tree, int min_args, char *ptr
(node->right = make_tree_node(MAXOP_LITERAL, text, NULL, NULL)) == NULL)
{
*parse_error = PARSE_SYNTAX_ERROR;
free_tree(tree);
maxinfo_free_tree(tree);
if (ptr)
{
MXS_FREE(text);