Merge branch 'develop' into binlog_server_waitdata_encryption

This commit is contained in:
MassimilianoPinto 2016-11-30 09:38:00 +01:00
commit 4ac6fe88c7
63 changed files with 3113 additions and 1639 deletions

View File

@ -91,6 +91,34 @@ If nothing is specified, the default _ttl_ value is 10.
ttl=60
```
#### `max_count`
The maximum number of items the cache may contain. If the limit has been
reached and a new item should be stored, then an older item will be evicted.
Note that if `cached_data` is `thread_specific` then this limit will be
applied to each cache _separately_.
```
max_count=1000
```
The default value is 0, which means no limit.
#### `max_size`
The maximum size - expressed in kibibytes - the cache may occupy. If the limit
has been reached and a new item should be stored, then some older item(s) will
be evicted to make space.
Note that the value of `max_size` must be at least as large as the value of
`max_resultset_size`.
Note that if `cached_data` is `thread_specific` then this limit will be
applied to each cache _separately_.
```
max_size=1000
```
The default value is 0, which means no limit.
#### `rules`
Specifies the path of the file where the caching rules are stored. A relative
@ -113,7 +141,12 @@ allowed values are:
on the other hand that the very same data may be fetched and stored
multiple times.
Default is `shared`.
```
cached_data=thread_specific
```
Default is `shared`. See `max_count` and `max_size` what implication changing
this setting to `thread_specific` has.
#### `debug`

View File

@ -120,6 +120,9 @@ is the name of the function. _ARGS_ is a function specific list of arguments.
Read [Module Commands](../Reference/Module-Commands.md) documentation for more details.
In the 2.1 release of MaxScale, the [_dbfwfilter_}(../Filters/Database-Firewall-Filter.md)
and [_avrorouter_](../Routers/Avrorouter.md) implement module commands.
### Amazon RDS Aurora monitor
The new [Aurora Monitor](../Monitors/Aurora-Monitor.md) module allows monitoring

View File

@ -142,6 +142,18 @@ data block. The default value is 1 transaction.
Controls the number of row events that are grouped into a single Avro
data block. The default value is 1000 row events.
## Module commands
Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands.
The avrorouter supports the following module commands.
### `avrorouter::convert SERVICE {start | stop}`
Start or stop the binary log to Avro conversion. The first parameter is the name
of the service to stop and the second parameter tells whether to start the
conversion process or to stop it.
# Files Created by the Avrorouter
The avrorouter creates two files in the location pointed by _avrodir_:

View File

@ -48,7 +48,6 @@
#include <maxscale/cdefs.h>
#include <maxscale/spinlock.h>
#include <maxscale/buffer.h>
#include <maxscale/listmanager.h>
#include <maxscale/gw_protocol.h>
#include <maxscale/gw_authenticator.h>
#include <maxscale/gw_ssl.h>
@ -216,7 +215,6 @@ typedef enum
*/
typedef struct dcb
{
LIST_ENTRY_FIELDS
skygw_chk_t dcb_chk_top;
bool dcb_errhandle_called; /*< this can be called only once */
bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */
@ -247,7 +245,7 @@ typedef struct dcb
SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */
GWBUF *delayq; /**< Delay Backend Write Data Queue */
GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */
SPINLOCK authlock; /**< Generic Authorization spinlock */
GWBUF *dcb_fakequeue; /**< Fake event queue for generated events */
DCBSTATS stats; /**< DCB related statistics */
unsigned int dcb_server_status; /*< the server role indicator from SERVER */
@ -277,17 +275,23 @@ typedef struct dcb
bool ssl_write_want_write; /*< Flag */
int dcb_port; /**< port of target server */
bool was_persistent; /**< Whether this DCB was in the persistent pool */
struct
{
int id; /**< The owning thread's ID */
struct dcb *next; /**< Next DCB in owning thread's list */
struct dcb *tail; /**< Last DCB in owning thread's list */
} thread;
skygw_chk_t dcb_chk_tail;
} DCB;
#define DCB_INIT {.dcb_chk_top = CHK_NUM_DCB, .dcb_initlock = SPINLOCK_INIT, \
.evq = DCBEVENTQ_INIT, .ipv4 = {0}, .func = {0}, .authfunc = {0}, \
.writeqlock = SPINLOCK_INIT, .delayqlock = SPINLOCK_INIT, \
.authlock = SPINLOCK_INIT, .stats = {0}, .memdata = DCBMM_INIT, \
.stats = {0}, .memdata = DCBMM_INIT, \
.cb_lock = SPINLOCK_INIT, .pollinlock = SPINLOCK_INIT, \
.fd = DCBFD_CLOSED, .stats = DCBSTATS_INIT, .ssl_state = SSL_HANDSHAKE_UNKNOWN, \
.state = DCB_STATE_ALLOC, .polloutlock = SPINLOCK_INIT, .dcb_chk_tail = CHK_NUM_DCB, \
.authenticator_data = NULL}
.authenticator_data = NULL, .thread = {0}}
/**
* The DCB usage filer used for returning DCB's in use for a certain reason
@ -314,10 +318,15 @@ typedef enum
#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL)
DCB *dcb_get_zombies(void);
/**
* @brief DCB system initialization function
*
* This function needs to be the first function call into this system.
*/
void dcb_global_init();
int dcb_write(DCB *, GWBUF *);
DCB *dcb_accept(DCB *listener, GWPROTOCOL *protocol_funcs);
bool dcb_pre_alloc(int number);
DCB *dcb_alloc(dcb_role_t, struct servlistener *);
void dcb_free(DCB *);
void dcb_free_all_memory(DCB *dcb);
@ -326,7 +335,24 @@ DCB *dcb_clone(DCB *);
int dcb_read(DCB *, GWBUF **, int);
int dcb_drain_writeq(DCB *);
void dcb_close(DCB *);
DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */
/**
* @brief Process zombie DCBs
*
* This should only be called from a polling thread in poll.c when no events
* are being processed.
*
* @param threadid Thread ID of the poll thread
*/
void dcb_process_zombies(int threadid);
/**
* Add a DCB to the owner's list
*
* @param dcb DCB to add
*/
void dcb_add_to_list(DCB *dcb);
void printAllDCBs(); /* Debug to print all DCB in the system */
void printDCB(DCB *); /* Debug print routine */
void dprintDCBList(DCB *); /* Debug print DCB list statistics */
@ -342,9 +368,7 @@ int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *
int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *);
int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
int dcb_persistent_clean_count(DCB *, bool); /* Clean persistent and return count */
void dcb_call_foreach (struct server* server, DCB_REASON reason);
int dcb_persistent_clean_count(DCB *, int, bool); /* Clean persistent and return count */
void dcb_hangup_foreach (struct server* server);
size_t dcb_get_session_id(DCB* dcb);
bool dcb_get_ses_log_info(DCB* dcb, size_t* sesid, int* enabled_logs);
@ -353,6 +377,19 @@ int dcb_accept_SSL(DCB* dcb);
int dcb_connect_SSL(DCB* dcb);
int dcb_listen(DCB *listener, const char *config, const char *protocol_name);
void dcb_append_readqueue(DCB *dcb, GWBUF *buffer);
void dcb_enable_session_timeouts();
void dcb_process_idle_sessions(int thr);
/**
* @brief Call a function for each connected DCB
*
* @param func Function to call. The function should return @c true to continue iteration
* and @c false to stop iteration earlier. The first parameter is a DCB and the second
* is the value of @c data that the user provided.
* @param data User provided data passed as the second parameter to @c func
* @return True if all DCBs were iterated, false if the callback returned false
*/
bool dcb_foreach(bool (*func)(DCB *, void *), void *data);
/**
* DCB flags values

View File

@ -243,7 +243,8 @@ typedef enum skygw_chk_t
#define STRDCBROLE(r) ((r) == DCB_ROLE_SERVICE_LISTENER ? "DCB_ROLE_SERVICE_LISTENER" : \
((r) == DCB_ROLE_CLIENT_HANDLER ? "DCB_ROLE_CLIENT_HANDLER" : \
((r) == DCB_ROLE_BACKEND_HANDLER ? "DCB_ROLE_BACKEND_HANDLER" : \
"UNKNOWN DCB ROLE")))
((r) == DCB_ROLE_INTERNAL ? "DCB_ROLE_INTERNAL" : \
"UNKNOWN DCB ROLE"))))
#define STRBETYPE(t) ((t) == BE_MASTER ? "BE_MASTER" : \
((t) == BE_SLAVE ? "BE_SLAVE" : \
@ -474,7 +475,6 @@ typedef enum skygw_chk_t
ss_info_dassert(d->dcb_chk_top == CHK_NUM_DCB && \
d->dcb_chk_tail == CHK_NUM_DCB, \
"Dcb under- or overflow"); \
CHK_MANAGED_LIST(d) \
}
#define CHK_PROTOCOL(p) { \
@ -487,7 +487,6 @@ typedef enum skygw_chk_t
ss_info_dassert(s->ses_chk_top == CHK_NUM_SESSION && \
s->ses_chk_tail == CHK_NUM_SESSION, \
"Session under- or overflow"); \
CHK_MANAGED_LIST(s) \
}
#define CHK_SERVER(s) { \

View File

@ -46,12 +46,16 @@ typedef enum
POLL_STAT_HANGUP,
POLL_STAT_ACCEPT,
POLL_STAT_EVQ_LEN,
POLL_STAT_EVQ_PENDING,
POLL_STAT_EVQ_MAX,
POLL_STAT_MAX_QTIME,
POLL_STAT_MAX_EXECTIME
} POLL_STAT;
enum poll_message
{
POLL_MSG_CLEAN_PERSISTENT = 0x01
};
extern void poll_init();
extern int poll_add_dcb(DCB *);
extern int poll_remove_dcb(DCB *);
@ -71,5 +75,6 @@ extern void poll_fake_event(DCB *dcb, enum EPOLL_EVENTS ev);
extern void poll_fake_hangup_event(DCB *dcb);
extern void poll_fake_write_event(DCB *dcb);
extern void poll_fake_read_event(DCB *dcb);
extern void poll_send_message(enum poll_message msg, void *data);
MXS_END_DECLS

View File

@ -36,6 +36,11 @@ typedef enum routing_capability
/**< The transaction state and autocommit mode of the session are tracked;
implies RCAP_TYPE_CONTIGUOUS_INPUT and RCAP_TYPE_STMT_INPUT. */
RCAP_TYPE_TRANSACTION_TRACKING = 0x0007, /* 0b0000000000000111 */
/**< Responses are delivered one per buffer. */
RCAP_TYPE_STMT_OUTPUT = 0x0010, /* 0b0000000000010000 */
/**< Each delivered buffer is contiguous; implies RCAP_TYPE_STMT_OUTPUT. */
RCAP_TYPE_CONTIGUOUS_OUTPUT = 0x0030, /* 0b0000000000110000 */
} routing_capability_t;
#define RCAP_TYPE_NONE 0

View File

@ -110,8 +110,7 @@ typedef struct server
int depth; /**< Replication level in the tree */
long slaves[MAX_NUM_SLAVES]; /**< Slaves of this node */
bool master_err_is_logged; /*< If node failed, this indicates whether it is logged */
DCB *persistent; /**< List of unused persistent connections to the server */
SPINLOCK persistlock; /**< Lock for adjusting the persistent connections list */
DCB **persistent; /**< List of unused persistent connections to the server */
long persistpoolmax; /**< Maximum size of persistent connections pool */
long persistmaxtime; /**< Maximum number of seconds connection can live */
int persistmax; /**< Maximum pool size actually achieved since startup */
@ -272,7 +271,7 @@ extern void serverAddMonUser(SERVER *, char *, char *);
extern void serverAddParameter(SERVER *, char *, char *);
extern char *serverGetParameter(SERVER *, char *);
extern void server_update_credentials(SERVER *, char *, char *);
extern DCB *server_get_persistent(SERVER *, char *, const char *);
extern DCB *server_get_persistent(SERVER *, char *, const char *, int);
extern void server_update_address(SERVER *, char *);
extern void server_update_port(SERVER *, unsigned short);
extern RESULTSET *serverGetList();

View File

@ -37,7 +37,6 @@
#include <time.h>
#include <maxscale/atomic.h>
#include <maxscale/buffer.h>
#include <maxscale/listmanager.h>
#include <maxscale/spinlock.h>
#include <maxscale/resultset.h>
#include <maxscale/log_manager.h>
@ -160,7 +159,6 @@ typedef enum
*/
typedef struct session
{
LIST_ENTRY_FIELDS
skygw_chk_t ses_chk_top;
SPINLOCK ses_lock;
session_state_t state; /*< Current descriptor state */
@ -185,13 +183,6 @@ typedef struct session
.stats = SESSION_STATS_INIT, .head = DOWNSTREAM_INIT, .tail = UPSTREAM_INIT, \
.state = SESSION_STATE_ALLOC, .ses_chk_tail = CHK_NUM_SESSION}
/** Whether to do session timeout checks */
extern bool check_timeouts;
/** When the next timeout check is done. This is compared to hkheartbeat in
* hk_heartbeat.h */
extern long next_timeout_check;
#define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client_dcb, type)
/**
@ -212,7 +203,6 @@ extern long next_timeout_check;
(sess)->tail.session, (buf))
SESSION *session_alloc(struct service *, struct dcb *);
bool session_pre_alloc(int number);
SESSION *session_set_dummy(struct dcb *);
bool session_free(SESSION *);
int session_isvalid(SESSION *);
@ -227,12 +217,9 @@ void dprintSession(struct dcb *, SESSION *);
void dListSessions(struct dcb *);
char *session_state(session_state_t);
bool session_link_dcb(SESSION *, struct dcb *);
SESSION* get_session_by_router_ses(void* rses);
void session_enable_log_priority(SESSION* ses, int priority);
void session_disable_log_priority(SESSION* ses, int priority);
RESULTSET *sessionGetList(SESSIONLISTFILTER);
void process_idle_sessions();
void enable_session_timeouts();
/**
* Get the transaction state of the session.

View File

@ -31,6 +31,15 @@ MXS_BEGIN_DECLS
typedef void* ts_stats_t;
/** Enum values for ts_stats_get */
enum ts_stats_type
{
TS_STATS_MAX, /**< Maximum value */
TS_STATS_MIX, /**< Minimum value */
TS_STATS_SUM, /**< Sum of all value */
TS_STATS_AVG /**< Average of all values */
};
/** stats_init should be called only once */
void ts_stats_init();
@ -39,7 +48,17 @@ void ts_stats_end();
ts_stats_t ts_stats_alloc();
void ts_stats_free(ts_stats_t stats);
int64_t ts_stats_sum(ts_stats_t stats);
/**
* @brief Get statistics
*
* @param stats Statistics to read
* @param type Type of statistics to get
* @return Statistics value
*
* @see enum ts_stats_type
*/
int64_t ts_stats_get(ts_stats_t stats, enum ts_stats_type type);
/**
* @brief Increment thread statistics by one
@ -61,8 +80,6 @@ ts_stats_increment(ts_stats_t stats, int thread_id)
* @param stats Statistics to set
* @param value Value to set to
* @param thread_id ID of thread
*
* @note Appears to be unused
*/
static void inline
ts_stats_set(ts_stats_t stats, int value, int thread_id)
@ -70,4 +87,44 @@ ts_stats_set(ts_stats_t stats, int value, int thread_id)
((int64_t*)stats)[thread_id] = value;
}
/**
* @brief Assign the maximum value to a statistics element
*
* This sets the value for the specified thread if the current value is smaller.
*
* @param stats Statistics to set
* @param value Value to set to
* @param thread_id ID of thread
*/
static void inline
ts_stats_set_max(ts_stats_t stats, int value, int thread_id)
{
int64_t *p = (int64_t*) stats;
if (value > p[thread_id])
{
p[thread_id] = value;
}
}
/**
* @brief Assign the minimum value to a statistics element
*
* This sets the value for the specified thread if the current value is larger.
*
* @param stats Statistics to set
* @param value Value to set to
* @param thread_id ID of thread
*/
static void inline
ts_stats_set_min(ts_stats_t stats, int value, int thread_id)
{
int64_t *p = (int64_t*) stats;
if (value < p[thread_id])
{
p[thread_id] = value;
}
}
MXS_END_DECLS

View File

@ -1,4 +1,4 @@
add_library(maxscale-common SHARED adminusers.c alloc.c authenticator.c atomic.c buffer.c config.c config_runtime.c dcb.c filter.c externcmd.c gwbitmask.c gwdirs.c hashtable.c hint.c housekeeper.c listmanager.c load_utils.c log_manager.cc maxscale_pcre2.c memlog.c misc.c mlist.c modutil.c monitor.c queuemanager.c query_classifier.c poll.c random_jkiss.c resultset.c secrets.c server.c service.c session.c spinlock.c thread.c users.c utils.c skygw_utils.cc statistics.c listener.c gw_ssl.c mysql_utils.c mysql_binlog.c modulecmd.c )
add_library(maxscale-common SHARED adminusers.c alloc.c authenticator.c atomic.c buffer.c config.c config_runtime.c dcb.c filter.c externcmd.c gwbitmask.c gwdirs.c hashtable.c hint.c housekeeper.c load_utils.c log_manager.cc maxscale_pcre2.c memlog.c misc.c mlist.c modutil.c monitor.c queuemanager.c query_classifier.c poll.c random_jkiss.c resultset.c secrets.c server.c service.c session.c spinlock.c thread.c users.c utils.c skygw_utils.cc statistics.c listener.c gw_ssl.c mysql_utils.c mysql_binlog.c modulecmd.c )
target_link_libraries(maxscale-common ${MARIADB_CONNECTOR_LIBRARIES} ${LZMA_LINK_FLAGS} ${PCRE2_LIBRARIES} ${CURL_LIBRARIES} ssl pthread crypt dl crypto inih z rt m stdc++)

View File

@ -702,10 +702,6 @@ config_load(const char *filename)
{
ss_dassert(!config_file);
/* Temporary - should use configuration values and test return value (bool) */
dcb_pre_alloc(1000);
session_pre_alloc(250);
global_defaults();
feedback_defaults();

File diff suppressed because it is too large Load Diff

View File

@ -1941,6 +1941,7 @@ int main(int argc, char **argv)
/* Init MaxScale poll system */
poll_init();
dcb_global_init();
/**
* Init mysql thread context for main thread as well. Needed when users
* are queried from backends.

File diff suppressed because it is too large Load Diff

View File

@ -87,15 +87,18 @@ SERVER* server_alloc(const char *name, const char *address, unsigned short port,
return NULL;
}
int nthr = config_threadcount();
SERVER *server = (SERVER *)MXS_CALLOC(1, sizeof(SERVER));
char *my_name = MXS_STRDUP(name);
char *my_protocol = MXS_STRDUP(protocol);
char *my_authenticator = MXS_STRDUP(authenticator);
DCB **persistent = MXS_CALLOC(nthr, sizeof(*persistent));
if (!server || !my_name || !my_protocol || !my_authenticator)
if (!server || !my_name || !my_protocol || !my_authenticator || !persistent)
{
MXS_FREE(server);
MXS_FREE(my_name);
MXS_FREE(persistent);
MXS_FREE(my_protocol);
MXS_FREE(my_authenticator);
return NULL;
@ -125,7 +128,7 @@ SERVER* server_alloc(const char *name, const char *address, unsigned short port,
server->parameters = NULL;
server->server_string = NULL;
spinlock_init(&server->lock);
server->persistent = NULL;
server->persistent = persistent;
server->persistmax = 0;
server->persistmaxtime = 0;
server->persistpoolmax = 0;
@ -133,7 +136,6 @@ SERVER* server_alloc(const char *name, const char *address, unsigned short port,
server->monpw[0] = '\0';
server->is_active = true;
server->charset = SERVER_DEFAULT_CHARSET;
spinlock_init(&server->persistlock);
spinlock_acquire(&server_spin);
server->next = allServers;
@ -183,7 +185,12 @@ server_free(SERVER *tofreeserver)
if (tofreeserver->persistent)
{
dcb_persistent_clean_count(tofreeserver->persistent, true);
int nthr = config_threadcount();
for (int i = 0; i < nthr; i++)
{
dcb_persistent_clean_count(tofreeserver->persistent[i], i, true);
}
}
MXS_FREE(tofreeserver);
return 1;
@ -197,17 +204,16 @@ server_free(SERVER *tofreeserver)
* @param protocol The name of the protocol needed for the connection
*/
DCB *
server_get_persistent(SERVER *server, char *user, const char *protocol)
server_get_persistent(SERVER *server, char *user, const char *protocol, int id)
{
DCB *dcb, *previous = NULL;
if (server->persistent
&& dcb_persistent_clean_count(server->persistent, false)
&& server->persistent
if (server->persistent[id]
&& dcb_persistent_clean_count(server->persistent[id], id, false)
&& server->persistent[id] // Check after cleaning
&& (server->status & SERVER_RUNNING))
{
spinlock_acquire(&server->persistlock);
dcb = server->persistent;
dcb = server->persistent[id];
while (dcb)
{
if (dcb->user
@ -219,7 +225,7 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
{
if (NULL == previous)
{
server->persistent = dcb->nextpersistent;
server->persistent[id] = dcb->nextpersistent;
}
else
{
@ -227,7 +233,6 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
}
MXS_FREE(dcb->user);
dcb->user = NULL;
spinlock_release(&server->persistlock);
atomic_add(&server->stats.n_persistent, -1);
atomic_add(&server->stats.n_current, 1);
return dcb;
@ -249,7 +254,6 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
previous = dcb;
dcb = dcb->nextpersistent;
}
spinlock_release(&server->persistlock);
}
return NULL;
}
@ -549,8 +553,8 @@ dprintServer(DCB *dcb, SERVER *server)
if (server->persistpoolmax)
{
dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent);
dcb_printf(dcb, "\tPersistent measured pool size: %d\n",
dcb_persistent_clean_count(server->persistent, false));
poll_send_message(POLL_MSG_CLEAN_PERSISTENT, server);
dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent);
dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax);
dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax);
dcb_printf(dcb, "\tPersistent max time (secs): %ld\n", server->persistmaxtime);
@ -595,19 +599,21 @@ void
dprintPersistentDCBs(DCB *pdcb, SERVER *server)
{
DCB *dcb;
int nthr = config_threadcount();
spinlock_acquire(&server->persistlock);
#if SPINLOCK_PROFILE
dcb_printf(pdcb, "DCB List Spinlock Statistics:\n");
spinlock_stats(&server->persistlock, spin_reporter, pdcb);
#endif
dcb = server->persistent;
while (dcb)
for (int i = 0; i < nthr; i++)
{
dprintOneDCB(pdcb, dcb);
dcb = dcb->nextpersistent;
#if SPINLOCK_PROFILE
dcb_printf(pdcb, "DCB List Spinlock Statistics:\n");
spinlock_stats(&server->persistlock, spin_reporter, pdcb);
#endif
dcb = server->persistent[i];
while (dcb)
{
dprintOneDCB(pdcb, dcb);
dcb = dcb->nextpersistent;
}
}
spinlock_release(&server->persistlock);
}
/**

View File

@ -1052,7 +1052,7 @@ serviceSetTimeout(SERVICE *service, int val)
* configured with a idle timeout. */
if ((service->conn_idle_timeout = val))
{
enable_session_timeouts();
dcb_enable_session_timeouts();
}
return 1;

View File

@ -34,7 +34,6 @@
#include <errno.h>
#include <maxscale/alloc.h>
#include <maxscale/session.h>
#include <maxscale/listmanager.h>
#include <maxscale/service.h>
#include <maxscale/router.h>
#include <maxscale/dcb.h>
@ -42,10 +41,7 @@
#include <maxscale/atomic.h>
#include <maxscale/log_manager.h>
#include <maxscale/housekeeper.h>
/* This list of all sessions */
LIST_CONFIG SESSIONlist =
{LIST_TYPE_RECYCLABLE, sizeof(SESSION), SPINLOCK_INIT};
#include <maxscale/poll.h>
/* A session with null values, used for initialization */
static SESSION session_initialized = SESSION_INIT;
@ -55,21 +51,12 @@ static int session_id;
static struct session session_dummy_struct;
/**
* These two are declared in session.h
*/
bool check_timeouts = false;
long next_timeout_check = 0;
static SPINLOCK timeout_lock = SPINLOCK_INIT;
static void session_initialize(void *session);
static int session_setup_filters(SESSION *session);
static void session_simple_free(SESSION *session, DCB *dcb);
static void session_add_to_all_list(SESSION *session);
static SESSION *session_find_free();
static void session_final_free(SESSION *session);
static list_entry_t *skip_maybe_to_next_non_listener(list_entry_t *current, SESSIONLISTFILTER filter);
/**
* @brief Initialize a session
@ -90,17 +77,6 @@ session_initialize(void *session)
*(SESSION *)session = session_initialized;
}
/*
* @brief Pre-allocate memory for a number of sessions
*
* @param The number of sessions to be pre-allocated
*/
bool
session_pre_alloc(int number)
{
return list_pre_alloc(&SESSIONlist, number, session_initialize);
}
/**
* Allocate a new session for a new client of the specified service.
*
@ -115,15 +91,14 @@ session_pre_alloc(int number)
SESSION *
session_alloc(SERVICE *service, DCB *client_dcb)
{
SESSION *session;
SESSION *session = (SESSION *)(MXS_MALLOC(sizeof(*session)));
session = (SESSION *)list_find_free(&SESSIONlist, session_initialize);
ss_info_dassert(session != NULL, "Allocating memory for session failed.");
if (NULL == session)
{
MXS_OOM();
return NULL;
}
session_initialize(session);
/** Assign a session id and increase */
session->ses_id = (size_t)atomic_add(&session_id, 1) + 1;
session->ses_is_child = (bool) DCB_IS_CLONE(client_dcb);
@ -228,7 +203,6 @@ session_alloc(SERVICE *service, DCB *client_dcb)
CHK_SESSION(session);
client_dcb->session = session;
session->entry_is_ready = true;
return SESSION_STATE_TO_BE_FREED == session->state ? NULL : session;
}
@ -246,8 +220,6 @@ session_set_dummy(DCB *client_dcb)
SESSION *session;
session = &session_dummy_struct;
session->list_entry_chk_top = CHK_NUM_MANAGED_LIST;
session->list_entry_chk_tail = CHK_NUM_MANAGED_LIST;
session->ses_chk_top = CHK_NUM_SESSION;
session->ses_chk_tail = CHK_NUM_SESSION;
session->ses_is_child = false;
@ -260,7 +232,6 @@ session_set_dummy(DCB *client_dcb)
session->state = SESSION_STATE_DUMMY;
session->refcount = 1;
session->ses_id = 0;
session->next = NULL;
client_dcb->session = session;
return session;
@ -318,6 +289,8 @@ session_link_dcb(SESSION *session, DCB *dcb)
}
atomic_add(&session->refcount, 1);
dcb->session = session;
/** Move this DCB under the same thread */
dcb->thread.id = session->client_dcb->thread.id;
spinlock_release(&session->ses_lock);
return true;
}
@ -442,8 +415,7 @@ session_free(SESSION *session)
static void
session_final_free(SESSION *session)
{
/* We never free the actual session, it is available for reuse*/
list_free_entry(&SESSIONlist, (list_entry_t *)session);
MXS_FREE(session);
}
/**
@ -455,20 +427,7 @@ session_final_free(SESSION *session)
int
session_isvalid(SESSION *session)
{
int rval = 0;
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
if ((SESSION *)current == session)
{
rval = 1;
list_terminate_iteration_early(&SESSIONlist, current);
break;
}
current = list_iterate(&SESSIONlist, current);
}
return rval;
return session != NULL;
}
/**
@ -486,8 +445,19 @@ printSession(SESSION *session)
printf("\tState: %s\n", session_state(session->state));
printf("\tService: %s (%p)\n", session->service->name, session->service);
printf("\tClient DCB: %p\n", session->client_dcb);
printf("\tConnected: %s",
printf("\tConnected: %s\n",
asctime_r(localtime_r(&session->stats.connect, &result), timebuf));
printf("\tRouter Session: %p\n", session->router_session);
}
bool printAllSessions_cb(DCB *dcb, void *data)
{
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{
printSession(dcb->session);
}
return true;
}
/**
@ -499,76 +469,7 @@ printSession(SESSION *session)
void
printAllSessions()
{
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
printSession((SESSION *)current);
current = list_iterate(&SESSIONlist, current);
}
}
/**
* Check sessions
*
* Designed to be called within a debugger session in order
* to display information regarding "interesting" sessions
*/
void
CheckSessions()
{
list_entry_t *current;
int noclients = 0;
int norouter = 0;
current = list_start_iteration(&SESSIONlist);
while (current)
{
SESSION *list_session = (SESSION *)current;
if (list_session->state != SESSION_STATE_LISTENER ||
list_session->state != SESSION_STATE_LISTENER_STOPPED)
{
if (list_session->client_dcb == NULL && list_session->refcount)
{
if (noclients == 0)
{
printf("Sessions without a client DCB.\n");
printf("==============================\n");
}
printSession(list_session);
noclients++;
}
}
current = list_iterate(&SESSIONlist, current);
}
if (noclients)
{
printf("%d Sessions have no clients\n", noclients);
}
current = list_start_iteration(&SESSIONlist);
while (current)
{
SESSION *list_session = (SESSION *)current;
if (list_session->state != SESSION_STATE_LISTENER ||
list_session->state != SESSION_STATE_LISTENER_STOPPED)
{
if (list_session->router_session == NULL && list_session->refcount)
{
if (norouter == 0)
{
printf("Sessions without a router session.\n");
printf("==================================\n");
}
printSession(list_session);
norouter++;
}
}
current = list_iterate(&SESSIONlist, current);
}
if (norouter)
{
printf("%d Sessions have no router session\n", norouter);
}
dcb_foreach(printAllSessions_cb, NULL);
}
/*
@ -579,7 +480,17 @@ CheckSessions()
void
dprintSessionList(DCB *pdcb)
{
dprintListStats(pdcb, &SESSIONlist, "All Sessions");
}
/** Callback for dprintAllSessions */
bool dprintAllSessions_cb(DCB *dcb, void *data)
{
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{
DCB *out_dcb = (DCB*)data;
dprintSession(out_dcb, dcb->session);
}
return true;
}
/**
@ -593,14 +504,8 @@ dprintSessionList(DCB *pdcb)
void
dprintAllSessions(DCB *dcb)
{
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
dprintSession(dcb, (SESSION *)current);
current = list_iterate(&SESSIONlist, current);
}
}
dcb_foreach(dprintAllSessions_cb, dcb);
}
/**
* Print a particular session to a DCB
@ -653,6 +558,22 @@ dprintSession(DCB *dcb, SESSION *print_session)
}
}
bool dListSessions_cb(DCB *dcb, void *data)
{
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{
DCB *out_dcb = (DCB*)data;
SESSION *session = dcb->session;
dcb_printf(out_dcb, "%-16p | %-15s | %-14s | %s\n", session,
session->client_dcb && session->client_dcb->remote ?
session->client_dcb->remote : "",
session->service && session->service->name ?
session->service->name : "",
session_state(session->state));
}
return true;
}
/**
* List all sessions in tabular form to a DCB
*
@ -664,32 +585,13 @@ dprintSession(DCB *dcb, SESSION *print_session)
void
dListSessions(DCB *dcb)
{
bool written_heading = false;
list_entry_t *current = list_start_iteration(&SESSIONlist);
if (current)
{
dcb_printf(dcb, "Sessions.\n");
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
dcb_printf(dcb, "Session | Client | Service | State\n");
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
written_heading = true;
}
while (current)
{
SESSION *list_session = (SESSION *)current;
dcb_printf(dcb, "%-16p | %-15s | %-14s | %s\n", list_session,
((list_session->client_dcb && list_session->client_dcb->remote)
? list_session->client_dcb->remote : ""),
(list_session->service && list_session->service->name ? list_session->service->name
: ""),
session_state(list_session->state));
current = list_iterate(&SESSIONlist, current);
}
if (written_heading)
{
dcb_printf(dcb,
"-----------------+-----------------+----------------+--------------------------\n\n");
}
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
dcb_printf(dcb, "Session | Client | Service | State\n");
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
dcb_foreach(dListSessions_cb, dcb);
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n\n");
}
/**
@ -726,28 +628,6 @@ session_state(session_state_t state)
}
}
/*
* @brief Find the session that relates to a given router session
*
* @param rses A router session
* @return The related session, or NULL if none
*/
SESSION* get_session_by_router_ses(void* rses)
{
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
if (((SESSION *)current)->router_session == rses)
{
list_terminate_iteration_early(&SESSIONlist, current);
return (SESSION *)current;
}
current = list_iterate(&SESSIONlist, current);
}
return NULL;
}
/**
* Create the filter chain for this session.
*
@ -893,59 +773,50 @@ session_getUser(SESSION *session)
return (session && session->client_dcb) ? session->client_dcb->user : NULL;
}
/**
* Enable the timing out of idle connections.
*
* This will prevent unnecessary acquisitions of the session spinlock if no
* service is configured with a session idle timeout.
*/
void enable_session_timeouts()
{
check_timeouts = true;
}
/**
* Close sessions that have been idle for too long.
*
* If the time since a session last sent data is greater than the set value in the
* service, it is disconnected. The connection timeout is disabled by default.
*/
void process_idle_sessions()
{
if (spinlock_acquire_nowait(&timeout_lock))
{
if (hkheartbeat >= next_timeout_check)
{
list_entry_t *current = list_start_iteration(&SESSIONlist);
/** Because the resolution of the timeout is one second, we only need to
* check for it once per second. One heartbeat is 100 milliseconds. */
next_timeout_check = hkheartbeat + 10;
while (current)
{
SESSION *all_session = (SESSION *)current;
if (all_session->service && all_session->client_dcb && all_session->client_dcb->state == DCB_STATE_POLLING &&
hkheartbeat - all_session->client_dcb->last_read > all_session->service->conn_idle_timeout * 10)
{
dcb_close(all_session->client_dcb);
}
current = list_iterate(&SESSIONlist, current);
}
}
spinlock_release(&timeout_lock);
}
}
/**
* Callback structure for the session list extraction
*/
typedef struct
{
int index;
int current;
SESSIONLISTFILTER filter;
RESULT_ROW *row;
RESULTSET *set;
} SESSIONFILTER;
bool dcb_iter_cb(DCB *dcb, void *data)
{
SESSIONFILTER *cbdata = (SESSIONFILTER*)data;
if (cbdata->current < cbdata->index)
{
if (cbdata->filter == SESSION_LIST_ALL ||
(cbdata->filter == SESSION_LIST_CONNECTION &&
(dcb->session->state != SESSION_STATE_LISTENER)))
{
cbdata->current++;
}
}
else
{
char buf[20];
SESSION *list_session = dcb->session;
cbdata->index++;
cbdata->row = resultset_make_row(cbdata->set);
snprintf(buf, sizeof(buf), "%p", list_session);
resultset_row_set(cbdata->row, 0, buf);
resultset_row_set(cbdata->row, 1, ((list_session->client_dcb && list_session->client_dcb->remote)
? list_session->client_dcb->remote : ""));
resultset_row_set(cbdata->row, 2, (list_session->service && list_session->service->name
? list_session->service->name : ""));
resultset_row_set(cbdata->row, 3, session_state(list_session->state));
return false;
}
return true;
}
/**
* Provide a row to the result set that defines the set of sessions
*
@ -956,74 +827,18 @@ typedef struct
static RESULT_ROW *
sessionRowCallback(RESULTSET *set, void *data)
{
SESSIONFILTER *cbdata = (SESSIONFILTER *)data;
int i = 0;
list_entry_t *current = list_start_iteration(&SESSIONlist);
SESSIONFILTER *cbdata = (SESSIONFILTER*)data;
RESULT_ROW *row = NULL;
/* Skip to the first non-listener if not showing listeners */
current = skip_maybe_to_next_non_listener(current, cbdata->filter);
dcb_foreach(dcb_iter_cb, cbdata);
while (i < cbdata->index && current)
if (cbdata->row)
{
if (cbdata->filter == SESSION_LIST_ALL ||
(cbdata->filter == SESSION_LIST_CONNECTION &&
((SESSION *)current)->state != SESSION_STATE_LISTENER))
{
i++;
}
current = list_iterate(&SESSIONlist, current);
row = cbdata->row;
cbdata->row = NULL;
}
/* Skip to the next non-listener if not showing listeners */
current = skip_maybe_to_next_non_listener(current, cbdata->filter);
if (NULL == current)
{
MXS_FREE(data);
return NULL;
}
else
{
char buf[20];
RESULT_ROW *row;
SESSION *list_session = (SESSION *)current;
cbdata->index++;
row = resultset_make_row(set);
snprintf(buf,19, "%p", list_session);
buf[19] = '\0';
resultset_row_set(row, 0, buf);
resultset_row_set(row, 1, ((list_session->client_dcb && list_session->client_dcb->remote)
? list_session->client_dcb->remote : ""));
resultset_row_set(row, 2, (list_session->service && list_session->service->name
? list_session->service->name : ""));
resultset_row_set(row, 3, session_state(list_session->state));
list_terminate_iteration_early(&SESSIONlist, current);
return row;
}
}
/*
* @brief Skip to the next non-listener session, if not showing listeners
*
* Based on a test of the filter that is the second parameter, along with the
* state of the sessions.
*
* @param current The session to start the possible skipping
* @param filter The filter the defines the operation
*
* @result The first session beyond those skipped, or the starting session;
* NULL if the list of sessions is exhausted.
*/
static list_entry_t *skip_maybe_to_next_non_listener(list_entry_t *current, SESSIONLISTFILTER filter)
{
/* Skip to the first non-listener if not showing listeners */
while (current && filter == SESSION_LIST_CONNECTION &&
((SESSION *)current)->state == SESSION_STATE_LISTENER)
{
current = list_iterate(&SESSIONlist, current);
}
return current;
return row;
}
/**
@ -1036,6 +851,7 @@ static list_entry_t *skip_maybe_to_next_non_listener(list_entry_t *current, SESS
* so we suppress the warning. In fact, the function call results in return
* of the set structure which includes a pointer to data
*/
/*lint -e429 */
RESULTSET *
sessionGetList(SESSIONLISTFILTER filter)
@ -1049,11 +865,16 @@ sessionGetList(SESSIONLISTFILTER filter)
}
data->index = 0;
data->filter = filter;
data->current = 0;
data->row = NULL;
if ((set = resultset_create(sessionRowCallback, data)) == NULL)
{
MXS_FREE(data);
return NULL;
}
data->set = set;
resultset_add_column(set, "Session", 16, COL_TYPE_VARCHAR);
resultset_add_column(set, "Client", 15, COL_TYPE_VARCHAR);
resultset_add_column(set, "Service", 15, COL_TYPE_VARCHAR);

View File

@ -91,3 +91,47 @@ int64_t ts_stats_sum(ts_stats_t stats)
}
return sum;
}
/**
* @brief Read the value of the statistics object
*
* Calculate
*
* @param stats Statistics to read
* @param type The statistics type
* @return Value of statistics
*/
int64_t ts_stats_get(ts_stats_t stats, enum ts_stats_type type)
{
ss_dassert(stats_initialized);
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
for (int i = 0; i < thread_count; i++)
{
int64_t value = ((int64_t*)stats)[i];
switch (type)
{
case TS_STATS_MAX:
if (value > best)
{
best = value;
}
break;
case TS_STATS_MIX:
if (value < best)
{
best = value;
}
break;
case TS_STATS_AVG:
case TS_STATS_SUM:
best += value;
break;
}
}
return type == TS_STATS_AVG ? best / thread_count : best;
}

View File

@ -67,7 +67,6 @@ test1()
ss_dfprintf(stderr, "\t..done\nMake clone DCB a zombie");
clone->state = DCB_STATE_NOPOLLING;
dcb_close(clone);
ss_info_dassert(dcb_get_zombies() == clone, "Clone DCB must be start of zombie list now");
ss_dfprintf(stderr, "\t..done\nProcess the zombies list");
dcb_process_zombies(0);
ss_dfprintf(stderr, "\t..done\nCheck clone no longer valid");

View File

@ -1,5 +1,20 @@
if (JANSSON_FOUND)
add_library(cache SHARED cache.cc cachefilter.cc cachemt.cc cachept.cc cachesimple.cc cachest.cc rules.cc sessioncache.cc storage.cc storagefactory.cc storagereal.cc)
add_library(cache SHARED
cache.cc
cachefilter.cc
cachemt.cc
cachept.cc
cachesimple.cc
cachest.cc
lrustorage.cc
lrustoragemt.cc
lrustoragest.cc
rules.cc
sessioncache.cc
storage.cc
storagefactory.cc
storagereal.cc
)
target_link_libraries(cache maxscale-common jansson)
set_target_properties(cache PROPERTIES VERSION "1.0.0")
set_target_properties(cache PROPERTIES LINK_FLAGS -Wl,-z,defs)

View File

@ -19,59 +19,39 @@
#include "storagefactory.h"
#include "storage.h"
Cache::Cache(const std::string& name,
Cache::Cache(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
SCacheRules sRules,
SStorageFactory sFactory)
: m_name(name)
, m_config(*pConfig)
, m_pRules(pRules)
, m_pFactory(pFactory)
, m_sRules(sRules)
, m_sFactory(sFactory)
{
}
Cache::~Cache()
{
cache_rules_free(m_pRules);
delete m_pFactory;
}
//static
bool Cache::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules)
CacheRules** ppRules,
StorageFactory** ppFactory)
{
CACHE_RULES* pRules = NULL;
CacheRules* pRules = NULL;
StorageFactory* pFactory = NULL;
if (config.rules)
{
pRules = cache_rules_load(config.rules, config.debug);
pRules = CacheRules::load(config.rules, config.debug);
}
else
{
pRules = cache_rules_create(config.debug);
pRules = CacheRules::create(config.debug);
}
if (pRules)
{
*ppRules = pRules;
}
else
{
MXS_ERROR("Could not create rules.");
}
return pRules != NULL;
}
//static
bool Cache::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
StorageFactory** ppFactory)
{
CACHE_RULES* pRules = NULL;
StorageFactory* pFactory = NULL;
if (Create(config, &pRules))
{
pFactory = StorageFactory::Open(config.storage);
@ -83,19 +63,23 @@ bool Cache::Create(const CACHE_CONFIG& config,
else
{
MXS_ERROR("Could not open storage factory '%s'.", config.storage);
cache_rules_free(pRules);
delete pRules;
}
}
else
{
MXS_ERROR("Could not create rules.");
}
return pFactory != NULL;
}
bool Cache::should_store(const char* zDefaultDb, const GWBUF* pQuery)
{
return cache_rules_should_store(m_pRules, zDefaultDb, pQuery);
return m_sRules->should_store(zDefaultDb, pQuery);
}
bool Cache::should_use(const SESSION* pSession)
{
return cache_rules_should_use(m_pRules, pSession);
return m_sRules->should_use(pSession);
}

View File

@ -14,6 +14,7 @@
#include <maxscale/cdefs.h>
#include <tr1/functional>
#include <tr1/memory>
#include <string>
#include <maxscale/buffer.h>
#include <maxscale/session.h>
@ -25,6 +26,9 @@ class SessionCache;
class Cache
{
public:
typedef std::tr1::shared_ptr<CacheRules> SCacheRules;
typedef std::tr1::shared_ptr<StorageFactory> SStorageFactory;
virtual ~Cache();
const CACHE_CONFIG& config() const { return m_config; }
@ -77,14 +81,11 @@ public:
protected:
Cache(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
SCacheRules sRules,
SStorageFactory sFactory);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
CacheRules** ppRules,
StorageFactory** ppFactory);
private:
@ -94,6 +95,6 @@ private:
protected:
const std::string m_name; // The name of the instance; the section name in the config.
const CACHE_CONFIG& m_config; // The configuration of the cache instance.
CACHE_RULES* m_pRules; // The rules of the cache instance.
StorageFactory* m_pFactory; // The storage factory.
SCacheRules m_sRules; // The rules of the cache instance.
SStorageFactory m_sFactory; // The storage factory.
};

View File

@ -55,35 +55,62 @@ typedef struct cache_key
char data[CACHE_KEY_MAXLEN];
} CACHE_KEY;
typedef enum cache_storage_capabilities
{
CACHE_STORAGE_CAP_NONE = 0x00,
CACHE_STORAGE_CAP_ST = 0x01, /*< Storage can optimize for single thread. */
CACHE_STORAGE_CAP_MT = 0x02, /*< Storage can handle multiple threads. */
CACHE_STORAGE_CAP_LRU = 0x04, /*< Storage capable of LRU eviction. */
CACHE_STORAGE_CAP_MAX_COUNT = 0x08, /*< Storage capable of capping number of entries.*/
CACHE_STORAGE_CAP_MAX_SIZE = 0x10, /*< Storage capable of capping size of cache.*/
} cache_storage_capabilities_t;
static inline bool cache_storage_has_cap(uint32_t capabilities, uint32_t mask)
{
return (capabilities & mask) == mask;
}
typedef struct cache_storage_api
{
/**
* Called immediately after the storage module has been loaded.
*
* @param capabilities On successful return, contains a bitmask of
* cache_storage_capabilities_t values.
* @return True if the initialization succeeded, false otherwise.
*/
bool (*initialize)();
bool (*initialize)(uint32_t* capabilities);
/**
* Creates an instance of cache storage. This function should, if necessary,
* create the actual storage, initialize it and prepare to put and get
* cache items.
*
* @param model Whether the storage will be used in a single thread or
* multi thread context. In the latter case the storage must
* perform thread synchronization as appropriate, in the former
* case it need not.
* @param name The name of the cache instance.
* @param ttl Time to live; number of seconds the value is valid.
* @param argc The number of elements in the argv array.
* @param argv Array of arguments, as passed in the `storage_options` parameter
* in the cache section in the MaxScale configuration file.
* @param model Whether the storage will be used in a single thread or
* multi thread context. In the latter case the storage must
* perform thread synchronization as appropriate, in the former
* case it need not.
* @param name The name of the cache instance.
* @param ttl Time to live; number of seconds the value is valid.
* @param max_count The maximum number of items the storage may store, before
* it should evict some items. Caller should specify 0, unless
* CACHE_STORAGE_CAP_MAX_COUNT is returned at initialization.
* @param max_count The maximum size of the storage may may occupy, before it
should evict some items. Caller should specify 0, unless
* CACHE_STORAGE_CAP_MAX_SIZE is returned at initialization.
* @param argc The number of elements in the argv array.
* @param argv Array of arguments, as passed in the `storage_options`
* parameter in the cache section in the MaxScale configuration
* file.
*
* @return A new cache instance, or NULL if the instance could not be
* created.
*/
CACHE_STORAGE* (*createInstance)(cache_thread_model_t model,
const char *name,
uint32_t ttl,
uint32_t max_count,
uint64_t max_size,
int argc, char* argv[]);
/**

View File

@ -36,6 +36,8 @@ static const CACHE_CONFIG DEFAULT_CONFIG =
NULL,
0,
CACHE_DEFAULT_TTL,
CACHE_DEFAULT_MAX_COUNT,
CACHE_DEFAULT_MAX_SIZE,
CACHE_DEFAULT_DEBUG,
CACHE_DEFAULT_THREAD_MODEL,
};
@ -333,24 +335,42 @@ static bool process_params(char **pzOptions, FILTER_PARAMETER **ppParams, CACHE_
if (strcmp(pParam->name, "max_resultset_rows") == 0)
{
int v = atoi(pParam->value);
char* end;
int32_t value = strtol(pParam->value, &end, 0);
if (v > 0)
if ((*end == 0) && (value >= 0))
{
config.max_resultset_rows = v;
if (value != 0)
{
config.max_resultset_rows = value;
}
else
{
config.max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
}
}
else
{
config.max_resultset_rows = CACHE_DEFAULT_MAX_RESULTSET_ROWS;
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than 0.", pParam->name);
error = true;
}
}
else if (strcmp(pParam->name, "max_resultset_size") == 0)
{
int v = atoi(pParam->value);
char* end;
int64_t value = strtoll(pParam->value, &end, 0);
if (v > 0)
if ((*end == 0) && (value >= 0))
{
config.max_resultset_size = v * 1024;
if (value != 0)
{
config.max_resultset_size = value * 1024;
}
else
{
config.max_resultset_size = CACHE_DEFAULT_MAX_RESULTSET_SIZE;
}
}
else
{
@ -452,6 +472,52 @@ static bool process_params(char **pzOptions, FILTER_PARAMETER **ppParams, CACHE_
error = true;
}
}
else if (strcmp(pParam->name, "max_count") == 0)
{
char* end;
int32_t value = strtoul(pParam->value, &end, 0);
if ((*end == 0) && (value >= 0))
{
if (value != 0)
{
config.max_count = value;
}
else
{
config.max_count = CACHE_DEFAULT_MAX_COUNT;
}
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than or equal to 0.", pParam->name);
error = true;
}
}
else if (strcmp(pParam->name, "max_size") == 0)
{
char* end;
int64_t value = strtoull(pParam->value, &end, 0);
if ((*end == 0) && (value >= 0))
{
if (value != 0)
{
config.max_size = value * 1024;
}
else
{
config.max_size = CACHE_DEFAULT_MAX_SIZE;
}
}
else
{
MXS_ERROR("The value of the configuration entry '%s' must "
"be an integer larger than or equal to 0.", pParam->name);
error = true;
}
}
else if (strcmp(pParam->name, "debug") == 0)
{
int v = atoi(pParam->value);
@ -492,6 +558,17 @@ static bool process_params(char **pzOptions, FILTER_PARAMETER **ppParams, CACHE_
}
}
if (!error)
{
if (config.max_size < config.max_resultset_size)
{
MXS_ERROR("The value of 'max_size' must be at least as larged as that "
"of 'max_resultset_size'.");
error = true;
}
}
if (error)
{
cache_config_finish(config);

View File

@ -39,13 +39,17 @@ class StorageFactory;
#define CACHE_DEBUG_MAX (CACHE_DEBUG_RULES | CACHE_DEBUG_USAGE | CACHE_DEBUG_DECISIONS)
// Count
#define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT_MAX
#define CACHE_DEFAULT_MAX_RESULTSET_ROWS UINT32_MAX
// Bytes
#define CACHE_DEFAULT_MAX_RESULTSET_SIZE 64 * 1024
// Seconds
#define CACHE_DEFAULT_TTL 10
// Integer value
#define CACHE_DEFAULT_DEBUG 0
// Positive integer
#define CACHE_DEFAULT_MAX_COUNT UINT32_MAX
// Positive integer
#define CACHE_DEFAULT_MAX_SIZE UINT64_MAX
// Thread model
#define CACHE_DEFAULT_THREAD_MODEL CACHE_THREAD_MODEL_MT
@ -59,6 +63,8 @@ typedef struct cache_config
char** storage_argv; /**< Cooked options for storage module. */
int storage_argc; /**< Number of cooked options. */
uint32_t ttl; /**< Time to live. */
uint32_t max_count; /**< Maximum number of entries in the cache.*/
uint64_t max_size; /**< Maximum size of the cache.*/
uint32_t debug; /**< Debug settings. */
cache_thread_model_t thread_model; /**< Thread model. */
} CACHE_CONFIG;
@ -100,6 +106,31 @@ struct hash<CACHE_KEY>
}
/**
* LockGuard is a RAII class whose constructor acquires a spinlock and
* destructor releases the same spinlock. To be used for locking a spinlock
* in an exceptionsafe manner for the duration of a scope.
*/
class LockGuard
{
public:
LockGuard(SPINLOCK* plock)
: lock_(*plock)
{
spinlock_acquire(&lock_);
}
~LockGuard()
{
spinlock_release(&lock_);
}
private:
LockGuard(const LockGuard&);
LockGuard& operator = (const LockGuard&);
SPINLOCK& lock_;
};
#define CPP_GUARD(statement)\
do { try { statement; } \
catch (const std::exception& x) { MXS_ERROR("Caught standard exception: %s", x.what()); }\

View File

@ -11,18 +11,23 @@
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "cachemt.h"
#include "storage.h"
#include "storagefactory.h"
CacheMT::CacheMT(const std::string& name,
using std::tr1::shared_ptr;
CacheMT::CacheMT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage)
: CacheSimple(name, pConfig, pRules, pFactory, pStorage)
SCacheRules sRules,
SStorageFactory sFactory,
Storage* pStorage)
: CacheSimple(name, pConfig, sRules, sFactory, pStorage)
{
spinlock_init(&m_lockPending);
MXS_NOTICE("Created multi threaded cache.");
}
CacheMT::~CacheMT()
@ -35,30 +40,15 @@ CacheMT* CacheMT::Create(const std::string& name, const CACHE_CONFIG* pConfig)
CacheMT* pCache = NULL;
CACHE_RULES* pRules = NULL;
CacheRules* pRules = NULL;
StorageFactory* pFactory = NULL;
if (CacheSimple::Create(*pConfig, &pRules, &pFactory))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
shared_ptr<CacheRules> sRules(pRules);
shared_ptr<StorageFactory> sFactory(pFactory);
return pCache;
}
// static
CacheMT* CacheMT::Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
ss_dassert(pFactory);
CacheMT* pCache = NULL;
CACHE_RULES* pRules = NULL;
if (CacheSimple::Create(*pConfig, &pRules))
{
pCache = Create(name, pConfig, pRules, pFactory);
pCache = Create(name, pConfig, sRules, sFactory);
}
return pCache;
@ -66,47 +56,48 @@ CacheMT* CacheMT::Create(const std::string& name, StorageFactory* pFactory, cons
bool CacheMT::must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
spinlock_acquire(&m_lockPending);
bool rv = CacheSimple::do_must_refresh(key, pSessionCache);
spinlock_release(&m_lockPending);
LockGuard guard(&m_lockPending);
return rv;
return do_must_refresh(key, pSessionCache);
}
void CacheMT::refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache)
{
spinlock_acquire(&m_lockPending);
CacheSimple::do_refreshed(key, pSessionCache);
spinlock_release(&m_lockPending);
LockGuard guard(&m_lockPending);
do_refreshed(key, pSessionCache);
}
// static
CacheMT* CacheMT::Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
SCacheRules sRules,
SStorageFactory sFactory)
{
CacheMT* pCache = NULL;
uint32_t ttl = pConfig->ttl;
uint32_t maxCount = pConfig->max_count;
uint32_t maxSize = pConfig->max_size;
int argc = pConfig->storage_argc;
char** argv = pConfig->storage_argv;
Storage* pStorage = pFactory->createStorage(CACHE_THREAD_MODEL_MT, name.c_str(), ttl, argc, argv);
Storage* pStorage = sFactory->createStorage(CACHE_THREAD_MODEL_MT, name.c_str(),
ttl, maxCount, maxSize,
argc, argv);
if (pStorage)
{
CPP_GUARD(pCache = new CacheMT(name,
pConfig,
pRules,
pFactory,
sRules,
sFactory,
pStorage));
if (!pCache)
{
delete pStorage;
cache_rules_free(pRules);
delete pFactory;
}
}

View File

@ -22,7 +22,6 @@ public:
~CacheMT();
static CacheMT* Create(const std::string& name, const CACHE_CONFIG* pConfig);
static CacheMT* Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig);
bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
@ -31,14 +30,14 @@ public:
private:
CacheMT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
SCacheRules sRules,
SStorageFactory sFactory,
Storage* pStorage);
static CacheMT* Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
SCacheRules sRules,
SStorageFactory sFactory);
private:
CacheMT(const CacheMT&);

View File

@ -11,6 +11,7 @@
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "cachept.h"
#include <maxscale/atomic.h>
#include <maxscale/platform.h>
@ -46,12 +47,13 @@ inline int thread_index()
CachePT::CachePT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
SCacheRules sRules,
SStorageFactory sFactory,
const Caches& caches)
: Cache(name, pConfig, pRules, pFactory)
: Cache(name, pConfig, sRules, sFactory)
, m_caches(caches)
{
MXS_NOTICE("Created cache per thread.");
}
CachePT::~CachePT()
@ -65,31 +67,15 @@ CachePT* CachePT::Create(const std::string& name, const CACHE_CONFIG* pConfig)
CachePT* pCache = NULL;
CACHE_RULES* pRules = NULL;
CacheRules* pRules = NULL;
StorageFactory* pFactory = NULL;
if (Cache::Create(*pConfig, &pRules, &pFactory))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
shared_ptr<CacheRules> sRules(pRules);
shared_ptr<StorageFactory> sFactory(pFactory);
return pCache;
}
// static
CachePT* CachePT::Create(const std::string& name,
StorageFactory* pFactory,
const CACHE_CONFIG* pConfig)
{
ss_dassert(pConfig);
CachePT* pCache = NULL;
CACHE_RULES* pRules = NULL;
if (Cache::Create(*pConfig, &pRules))
{
pCache = Create(name, pConfig, pRules, pFactory);
pCache = Create(name, pConfig, sRules, sFactory);
}
return pCache;
@ -128,8 +114,8 @@ cache_result_t CachePT::del_value(const CACHE_KEY& key)
// static
CachePT* CachePT::Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
SCacheRules sRules,
SStorageFactory sFactory)
{
CachePT* pCache = NULL;
@ -151,7 +137,7 @@ CachePT* CachePT::Create(const std::string& name,
CacheST* pCacheST = 0;
CPP_GUARD(pCacheST = CacheST::Create(namest, pFactory, pConfig));
CPP_GUARD(pCacheST = CacheST::Create(namest, sRules, sFactory, pConfig));
if (pCacheST)
{
@ -169,13 +155,11 @@ CachePT* CachePT::Create(const std::string& name,
if (!error)
{
pCache = new CachePT(name, pConfig, pRules, pFactory, caches);
pCache = new CachePT(name, pConfig, sRules, sFactory, caches);
}
}
catch (const std::exception&)
{
cache_rules_free(pRules);
delete pFactory;
}
return pCache;

View File

@ -23,7 +23,6 @@ public:
~CachePT();
static CachePT* Create(const std::string& name, const CACHE_CONFIG* pConfig);
static CachePT* Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig);
bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
@ -43,14 +42,14 @@ private:
CachePT(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
SCacheRules sRules,
SStorageFactory sFactory,
const Caches& caches);
static CachePT* Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
SCacheRules sRules,
SStorageFactory sFactory);
Cache& thread_cache();

View File

@ -11,16 +11,17 @@
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "cachesimple.h"
#include "storage.h"
#include "storagefactory.h"
CacheSimple::CacheSimple(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
SCacheRules sRules,
SStorageFactory sFactory,
Storage* pStorage)
: Cache(name, pConfig, pRules, pFactory)
: Cache(name, pConfig, sRules, sFactory)
, m_pStorage(pStorage)
{
}
@ -30,31 +31,14 @@ CacheSimple::~CacheSimple()
delete m_pStorage;
}
// static
bool CacheSimple::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules)
{
int rv = false;
CACHE_RULES* pRules = NULL;
if (Cache::Create(config, &pRules))
{
*ppRules = pRules;
}
return pRules != NULL;;
}
// static
bool CacheSimple::Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
CacheRules** ppRules,
StorageFactory** ppFactory)
{
int rv = false;
CACHE_RULES* pRules = NULL;
CacheRules* pRules = NULL;
StorageFactory* pFactory = NULL;
if (Cache::Create(config, &pRules, &pFactory))

View File

@ -35,15 +35,12 @@ public:
protected:
CacheSimple(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
SCacheRules sRules,
SStorageFactory sFactory,
Storage* pStorage);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules);
static bool Create(const CACHE_CONFIG& config,
CACHE_RULES** ppRules,
CacheRules** ppRules,
StorageFactory** ppFactory);

View File

@ -11,17 +11,21 @@
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "cachest.h"
#include "storage.h"
#include "storagefactory.h"
CacheST::CacheST(const std::string& name,
using std::tr1::shared_ptr;
CacheST::CacheST(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
Storage* pStorage)
: CacheSimple(name, pConfig, pRules, pFactory, pStorage)
SCacheRules sRules,
SStorageFactory sFactory,
Storage* pStorage)
: CacheSimple(name, pConfig, sRules, sFactory, pStorage)
{
MXS_NOTICE("Created single threaded cache.");
}
CacheST::~CacheST()
@ -34,33 +38,31 @@ CacheST* CacheST::Create(const std::string& name, const CACHE_CONFIG* pConfig)
CacheST* pCache = NULL;
CACHE_RULES* pRules = NULL;
CacheRules* pRules = NULL;
StorageFactory* pFactory = NULL;
if (CacheSimple::Create(*pConfig, &pRules, &pFactory))
{
pCache = Create(name, pConfig, pRules, pFactory);
shared_ptr<CacheRules> sRules(pRules);
shared_ptr<StorageFactory> sFactory(pFactory);
pCache = Create(name, pConfig, sRules, sFactory);
}
return pCache;
}
// static
CacheST* CacheST::Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig)
CacheST* CacheST::Create(const std::string& name,
SCacheRules sRules,
SStorageFactory sFactory,
const CACHE_CONFIG* pConfig)
{
ss_dassert(sRules.get());
ss_dassert(sFactory.get());
ss_dassert(pConfig);
ss_dassert(pFactory);
CacheST* pCache = NULL;
CACHE_RULES* pRules = NULL;
if (CacheSimple::Create(*pConfig, &pRules))
{
pCache = Create(name, pConfig, pRules, pFactory);
}
return pCache;
return Create(name, pConfig, sRules, sFactory);
}
bool CacheST::must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache)
@ -76,30 +78,33 @@ void CacheST::refreshed(const CACHE_KEY& key, const SessionCache* pSessionCache
// static
CacheST* CacheST::Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory)
SCacheRules sRules,
SStorageFactory sFactory)
{
CacheST* pCache = NULL;
uint32_t ttl = pConfig->ttl;
uint32_t maxCount = pConfig->max_count;
uint32_t maxSize = pConfig->max_size;
int argc = pConfig->storage_argc;
char** argv = pConfig->storage_argv;
Storage* pStorage = pFactory->createStorage(CACHE_THREAD_MODEL_ST, name.c_str(), ttl, argc, argv);
Storage* pStorage = sFactory->createStorage(CACHE_THREAD_MODEL_ST, name.c_str(),
ttl, maxCount, maxSize,
argc, argv);
if (pStorage)
{
CPP_GUARD(pCache = new CacheST(name,
pConfig,
pRules,
pFactory,
sRules,
sFactory,
pStorage));
if (!pCache)
{
delete pStorage;
cache_rules_free(pRules);
delete pFactory;
}
}

View File

@ -21,7 +21,10 @@ public:
~CacheST();
static CacheST* Create(const std::string& name, const CACHE_CONFIG* pConfig);
static CacheST* Create(const std::string& name, StorageFactory* pFactory, const CACHE_CONFIG* pConfig);
static CacheST* Create(const std::string& name,
SCacheRules sRules,
SStorageFactory sFactory,
const CACHE_CONFIG* pConfig);
bool must_refresh(const CACHE_KEY& key, const SessionCache* pSessionCache);
@ -30,14 +33,14 @@ public:
private:
CacheST(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory,
SCacheRules sRules,
SStorageFactory sFactory,
Storage* pStorage);
static CacheST* Create(const std::string& name,
const CACHE_CONFIG* pConfig,
CACHE_RULES* pRules,
StorageFactory* pFactory);
SCacheRules sRules,
SStorageFactory sFactory);
private:
CacheST(const CacheST&);
CacheST& operator = (const CacheST&);

View File

@ -0,0 +1,315 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "lrustorage.h"
LRUStorage::LRUStorage(Storage* pstorage, size_t max_count, size_t max_size)
: pstorage_(pstorage)
, max_count_(max_count)
, max_size_(max_size)
, count_(0)
, size_(0)
, phead_(NULL)
, ptail_(NULL)
{
}
LRUStorage::~LRUStorage()
{
}
cache_result_t LRUStorage::get_key(const char* zdefault_db,
const GWBUF* pquery,
CACHE_KEY* pkey)
{
return pstorage_->get_key(zdefault_db, pquery, pkey);
}
cache_result_t LRUStorage::do_get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppvalue)
{
NodesPerKey::iterator i = nodes_per_key_.find(key);
bool existed = (i != nodes_per_key_.end());
cache_result_t result = pstorage_->get_value(key, flags, ppvalue);
if (result == CACHE_RESULT_OK)
{
if (existed)
{
if (ptail_ == i->second)
{
ptail_ = i->second->prev();
}
phead_ = i->second->prepend(phead_);
}
else
{
MXS_ERROR("Item found in storage, but not in key mapping.");
}
}
return result;
}
cache_result_t LRUStorage::do_put_value(const CACHE_KEY& key,
const GWBUF* pvalue)
{
cache_result_t result = CACHE_RESULT_ERROR;
size_t value_size = GWBUF_LENGTH(pvalue);
size_t new_size = size_ + value_size;
Node* pnode = NULL;
NodesPerKey::iterator i = nodes_per_key_.find(key);
bool existed = (i != nodes_per_key_.end());
if (existed)
{
// TODO: Also in this case max_size_ needs to be honoured.
pnode = i->second;
}
else
{
if ((new_size > max_size_) || (count_ == max_count_))
{
if (new_size > max_size_)
{
MXS_NOTICE("New size %lu > max size %lu. Removing least recently used.",
new_size, max_size_);
pnode = free_lru(value_size);
}
else
{
ss_dassert(count_ == max_count_);
MXS_NOTICE("Max count %lu reached, removing least recently used.", max_count_);
pnode = free_lru();
}
}
else
{
pnode = new (std::nothrow) Node;
}
if (pnode)
{
try
{
std::pair<NodesPerKey::iterator, bool>
rv = nodes_per_key_.insert(std::make_pair(key, pnode));
ss_dassert(rv.second);
i = rv.first;
}
catch (const std::exception& x)
{
delete pnode;
pnode = NULL;
result = CACHE_RESULT_OUT_OF_RESOURCES;
}
}
}
if (pnode)
{
result = pstorage_->put_value(key, pvalue);
if (result == CACHE_RESULT_OK)
{
if (existed)
{
size_ -= pnode->size();
}
else
{
++count_;
}
pnode->reset(&i->first, value_size);
size_ += pnode->size();
if (ptail_ == pnode)
{
ptail_ = pnode->prev();
}
phead_ = pnode->prepend(phead_);
if (!ptail_)
{
ptail_ = phead_;
}
}
else if (!existed)
{
MXS_ERROR("Could not put a value to the storage.");
nodes_per_key_.erase(i);
delete pnode;
}
}
return result;
}
cache_result_t LRUStorage::do_del_value(const CACHE_KEY& key)
{
NodesPerKey::iterator i = nodes_per_key_.find(key);
cache_result_t result = pstorage_->del_value(key);
if (result == CACHE_RESULT_OK)
{
if (i == nodes_per_key_.end())
{
Node* pnode = i->second;
ss_dassert(size_ > pnode->size());
ss_dassert(count_ > 0);
size_ -= pnode->size();
--count_;
phead_ = pnode->remove();
delete pnode;
if (!phead_)
{
ptail_ = NULL;
}
nodes_per_key_.erase(i);
}
else
{
MXS_ERROR("Key was found from storage, but not from LRU register.");
}
}
return result;
}
/**
* Free the data associated with the least recently used node.
*
* @return The node itself, for reuse.
*/
LRUStorage::Node* LRUStorage::free_lru()
{
ss_dassert(ptail_);
Node* pnode = NULL;
if (free_node_data(ptail_))
{
pnode = ptail_;
ptail_ = ptail_->remove();
}
return pnode;
}
/**
* Free the data associated with sufficient number of least recently used nodes,
* to make the required space available.
*
* @return The last node whose data was freed, for reuse.
*/
LRUStorage::Node* LRUStorage::free_lru(size_t needed_space)
{
Node* pnode = NULL;
size_t freed_space = 0;
bool error = false;
while (!error && ptail_ && (freed_space < needed_space))
{
size_t size = ptail_->size();
if (free_node_data(ptail_))
{
freed_space += size;
pnode = ptail_;
ptail_ = ptail_->remove();
if (freed_space < needed_space)
{
delete pnode;
pnode = NULL;
}
}
else
{
error = true;
}
}
if (pnode)
{
pnode->reset();
}
return pnode;
}
/**
* Free the data associated with a node.
*
* @return True, if the data could be freed, false otherwise.
*/
bool LRUStorage::free_node_data(Node* pnode)
{
bool success = true;
const CACHE_KEY* pkey = pnode->key();
ss_dassert(pkey);
NodesPerKey::iterator i = nodes_per_key_.find(*pkey);
if (i == nodes_per_key_.end())
{
MXS_ERROR("Item in LRU list was not found in key mapping.");
}
cache_result_t result = pstorage_->del_value(*pkey);
switch (result)
{
case CACHE_RESULT_NOT_FOUND:
MXS_ERROR("Item in LRU list was not found in storage.");
case CACHE_RESULT_OK:
if (i != nodes_per_key_.end())
{
nodes_per_key_.erase(i);
}
ss_dassert(size_ >= pnode->size());
ss_dassert(count_ > 0);
size_ -= pnode->size();
count_ -= 1;
break;
default:
MXS_ERROR("Could not remove value from storage, cannot "
"remove from LRU list or key mapping either.");
success = false;
}
return success;
}

179
server/modules/filter/cache/lrustorage.h vendored Normal file
View File

@ -0,0 +1,179 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <tr1/unordered_map>
#include "storage.h"
#include "cachefilter.h"
class LRUStorage : public Storage
{
public:
~LRUStorage();
/**
* @see Storage::get_key
*/
cache_result_t get_key(const char* zDefaultDb,
const GWBUF* pQuery,
CACHE_KEY* pKey);
protected:
LRUStorage(Storage* pstorage, size_t max_count, size_t max_size);
/**
* Fetches the value from the underlying storage and, if found, moves the
* entry to the top of the LRU list.
*
* @see Storage::get_value
*/
cache_result_t do_get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppValue);
/**
* Stores the value to the underlying storage and, if successful, either
* places the entry at or moves the existing entry to the top of the LRU
* list.
*
* @see Storage::put_value
*/
cache_result_t do_put_value(const CACHE_KEY& key,
const GWBUF* pValue);
/**
* Deletes the value from the underlying storage and, if successful, removes
* the entry from the LRU list.
*
* @see Storage::del_value
*/
cache_result_t do_del_value(const CACHE_KEY& key);
private:
LRUStorage(const LRUStorage&);
LRUStorage& operator = (const LRUStorage&);
/**
* The Node class is used for maintaining LRU information.
*/
class Node
{
public:
Node()
: pkey_(NULL)
, size_(0)
, pnext_(NULL)
, pprev_(NULL)
{}
~Node()
{
if (pnext_)
{
pnext_->pprev_ = pprev_;
}
if (pprev_)
{
pprev_->pnext_ = pnext_;
}
}
const CACHE_KEY* key() const { return pkey_; }
size_t size() const { return size_; }
Node* next() const { return pnext_; }
Node* prev() const { return pprev_; }
/**
* Move the node before the node provided as argument.
*
* @param pnode The node in front of which this should be moved.
* @return This node.
*/
Node* prepend(Node* pnode)
{
if (pnode)
{
if (pprev_)
{
pprev_->pnext_ = pnext_;
}
if (pnext_)
{
pnext_->pprev_ = pprev_;
}
if (pnode->pprev_)
{
pnode->pprev_->pnext_ = this;
}
pprev_ = pnode->pprev_;
pnext_ = pnode;
pnode->pprev_ = this;
}
return this;
}
/**
* Remove this node from the list.
*
* @return The previous node if there is one, or the next node.
*/
Node* remove()
{
if (pprev_)
{
pprev_->pnext_ = pnext_;
}
if (pnext_)
{
pnext_->pprev_ = pprev_;
}
return pprev_ ? pprev_ : pnext_;
}
void reset(const CACHE_KEY* pkey = NULL, size_t size = 0)
{
pkey_ = pkey;
size_ = size;
}
private:
const CACHE_KEY* pkey_; /*< Points at the key stored in nodes_per_key_ below. */
size_t size_; /*< The size of the data referred to by pkey_. */
Node* pnext_; /*< The next node in the LRU list. */
Node* pprev_; /*< The previous node in the LRU list. */
};
Node* free_lru();
Node* free_lru(size_t space);
bool free_node_data(Node* pnode);
private:
typedef std::tr1::unordered_map<CACHE_KEY, Node*> NodesPerKey;
Storage* pstorage_; /*< The actual storage. */
size_t max_count_; /*< The maximum number of items in the LRU list, */
size_t max_size_; /*< The maximum size of all cached items. */
size_t count_; /*< The current count of cached items. */
size_t size_; /*< The current size of all cached items. */
NodesPerKey nodes_per_key_; /*< Mapping from cache keys to corresponding Node. */
Node* phead_; /*< The node at the LRU list. */
Node* ptail_; /*< The node at bottom of the LRU list.*/
};

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "lrustoragemt.h"
LRUStorageMT::LRUStorageMT(Storage* pstorage, size_t max_count, size_t max_size)
: LRUStorage(pstorage, max_count, max_size)
{
spinlock_init(&lock_);
MXS_NOTICE("Created multi threaded LRU storage.");
}
LRUStorageMT::~LRUStorageMT()
{
}
LRUStorageMT* LRUStorageMT::create(Storage* pstorage, size_t max_count, size_t max_size)
{
LRUStorageMT* plru_storage = NULL;
CPP_GUARD(plru_storage = new LRUStorageMT(pstorage, max_count, max_size));
return plru_storage;
}
cache_result_t LRUStorageMT::get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppvalue)
{
LockGuard guard(&lock_);
return do_get_value(key, flags, ppvalue);
}
cache_result_t LRUStorageMT::put_value(const CACHE_KEY& key,
const GWBUF* pvalue)
{
LockGuard guard(&lock_);
return do_put_value(key, pvalue);
}
cache_result_t LRUStorageMT::del_value(const CACHE_KEY& key)
{
LockGuard guard(&lock_);
return do_del_value(key);
}

View File

@ -0,0 +1,43 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <maxscale/spinlock.h>
#include "lrustorage.h"
class LRUStorageMT : public LRUStorage
{
public:
~LRUStorageMT();
static LRUStorageMT* create(Storage* pstorage, size_t max_count, size_t max_size);
cache_result_t get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppvalue);
cache_result_t put_value(const CACHE_KEY& key,
const GWBUF* pvalue);
cache_result_t del_value(const CACHE_KEY& key);
private:
LRUStorageMT(Storage* pstorage, size_t max_count, size_t max_size);
LRUStorageMT(const LRUStorageMT&);
LRUStorageMT& operator = (const LRUStorageMT&);
private:
SPINLOCK lock_;
};

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "cache"
#include "lrustoragest.h"
LRUStorageST::LRUStorageST(Storage* pstorage, size_t max_count, size_t max_size)
: LRUStorage(pstorage, max_count, max_size)
{
MXS_NOTICE("Created single threaded LRU storage.");
}
LRUStorageST::~LRUStorageST()
{
}
LRUStorageST* LRUStorageST::create(Storage* pstorage, size_t max_count, size_t max_size)
{
LRUStorageST* plru_storage = NULL;
CPP_GUARD(plru_storage = new LRUStorageST(pstorage, max_count, max_size));
return plru_storage;
}
cache_result_t LRUStorageST::get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppvalue)
{
return LRUStorage::do_get_value(key, flags, ppvalue);
}
cache_result_t LRUStorageST::put_value(const CACHE_KEY& key,
const GWBUF* pvalue)
{
return LRUStorage::do_put_value(key, pvalue);
}
cache_result_t LRUStorageST::del_value(const CACHE_KEY& key)
{
return LRUStorage::do_del_value(key);
}

View File

@ -0,0 +1,39 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include "lrustorage.h"
class LRUStorageST : public LRUStorage
{
public:
~LRUStorageST();
static LRUStorageST* create(Storage* pstorage, size_t max_count, size_t max_size);
cache_result_t get_value(const CACHE_KEY& key,
uint32_t flags,
GWBUF** ppValue);
cache_result_t put_value(const CACHE_KEY& key,
const GWBUF* pValue);
cache_result_t del_value(const CACHE_KEY& key);
private:
LRUStorageST(Storage* pstorage, size_t max_count, size_t max_size);
LRUStorageST(const LRUStorageST&);
LRUStorageST& operator = (const LRUStorageST&);
};

View File

@ -156,13 +156,6 @@ static mysql_account_kind_t mysql_to_pcre(char *pcre, const char *mysql, pcre_qu
* API begin
*/
/**
* Returns a string representation of a attribute.
*
* @param attribute An attribute type.
*
* @return Corresponding string, not to be freed.
*/
const char *cache_rule_attribute_to_string(cache_rule_attribute_t attribute)
{
switch (attribute)
@ -188,13 +181,6 @@ const char *cache_rule_attribute_to_string(cache_rule_attribute_t attribute)
}
}
/**
* Returns a string representation of an operator.
*
* @param op An operator.
*
* @return Corresponding string, not to be freed.
*/
const char *cache_rule_op_to_string(cache_rule_op_t op)
{
switch (op)
@ -217,13 +203,6 @@ const char *cache_rule_op_to_string(cache_rule_op_t op)
}
}
/**
* Create a default cache rules object.
*
* @param debug The debug level.
*
* @return The rules object or NULL is allocation fails.
*/
CACHE_RULES *cache_rules_create(uint32_t debug)
{
CACHE_RULES *rules = (CACHE_RULES*)MXS_CALLOC(1, sizeof(CACHE_RULES));
@ -236,14 +215,6 @@ CACHE_RULES *cache_rules_create(uint32_t debug)
return rules;
}
/**
* Loads the caching rules from a file and returns corresponding object.
*
* @param path The path of the file containing the rules.
* @param debug The debug level.
*
* @return The corresponding rules object, or NULL in case of error.
*/
CACHE_RULES *cache_rules_load(const char *path, uint32_t debug)
{
CACHE_RULES *rules = NULL;
@ -279,14 +250,6 @@ CACHE_RULES *cache_rules_load(const char *path, uint32_t debug)
return rules;
}
/**
* Parses the caching rules from a string and returns corresponding object.
*
* @param json String containing json.
* @param debug The debug level.
*
* @return The corresponding rules object, or NULL in case of error.
*/
CACHE_RULES *cache_rules_parse(const char *json, uint32_t debug)
{
CACHE_RULES *rules = NULL;
@ -308,13 +271,6 @@ CACHE_RULES *cache_rules_parse(const char *json, uint32_t debug)
return rules;
}
/**
* Frees the rules object.
*
* @param path The path of the file containing the rules.
*
* @return The corresponding rules object, or NULL in case of error.
*/
void cache_rules_free(CACHE_RULES *rules)
{
if (rules)
@ -325,15 +281,6 @@ void cache_rules_free(CACHE_RULES *rules)
}
}
/**
* Returns boolean indicating whether the result of the query should be stored.
*
* @param self The CACHE_RULES object.
* @param default_db The current default database, NULL if there is none.
* @param query The query, expected to contain a COM_QUERY.
*
* @return True, if the results should be stored.
*/
bool cache_rules_should_store(CACHE_RULES *self, const char *default_db, const GWBUF* query)
{
bool should_store = false;
@ -356,14 +303,6 @@ bool cache_rules_should_store(CACHE_RULES *self, const char *default_db, const G
return should_store;
}
/**
* Returns boolean indicating whether the cache should be used, that is consulted.
*
* @param self The CACHE_RULES object.
* @param session The current session.
*
* @return True, if the cache should be used.
*/
bool cache_rules_should_use(CACHE_RULES *self, const SESSION *session)
{
bool should_use = false;
@ -401,6 +340,57 @@ bool cache_rules_should_use(CACHE_RULES *self, const SESSION *session)
return should_use;
}
CacheRules::CacheRules(CACHE_RULES* prules)
: prules_(prules)
{
}
CacheRules::~CacheRules()
{
cache_rules_free(prules_);
}
// static
CacheRules* CacheRules::create(uint32_t debug)
{
CacheRules* pthis = NULL;
CACHE_RULES* prules = cache_rules_create(debug);
if (prules)
{
pthis = new (std::nothrow) CacheRules(prules);
}
return pthis;
}
// static
CacheRules* CacheRules::load(const char *zpath, uint32_t debug)
{
CacheRules* pthis = NULL;
CACHE_RULES* prules = cache_rules_load(zpath, debug);
if (prules)
{
pthis = new (std::nothrow) CacheRules(prules);
}
return pthis;
}
bool CacheRules::should_store(const char* zdefault_db, const GWBUF* pquery) const
{
return cache_rules_should_store(prules_, zdefault_db, pquery);
}
bool CacheRules::should_use(const SESSION* psession) const
{
return cache_rules_should_use(prules_, psession);
}
/*
* API end
*/

View File

@ -68,18 +68,140 @@ typedef struct cache_rules
CACHE_RULE *use_rules; // The rules for when to use data from the cache.
} CACHE_RULES;
/**
* Returns a string representation of a attribute.
*
* @param attribute An attribute type.
*
* @return Corresponding string, not to be freed.
*/
const char *cache_rule_attribute_to_string(cache_rule_attribute_t attribute);
/**
* Returns a string representation of an operator.
*
* @param op An operator.
*
* @return Corresponding string, not to be freed.
*/
const char *cache_rule_op_to_string(cache_rule_op_t op);
/**
* Create a default cache rules object.
*
* @param debug The debug level.
*
* @return The rules object or NULL is allocation fails.
*/
CACHE_RULES *cache_rules_create(uint32_t debug);
/**
* Frees the rules object.
*
* @param path The path of the file containing the rules.
*
* @return The corresponding rules object, or NULL in case of error.
*/
void cache_rules_free(CACHE_RULES *rules);
/**
* Loads the caching rules from a file and returns corresponding object.
*
* @param path The path of the file containing the rules.
* @param debug The debug level.
*
* @return The corresponding rules object, or NULL in case of error.
*/
CACHE_RULES *cache_rules_load(const char *path, uint32_t debug);
/**
* Parses the caching rules from a string and returns corresponding object.
*
* @param json String containing json.
* @param debug The debug level.
*
* @return The corresponding rules object, or NULL in case of error.
*/
CACHE_RULES *cache_rules_parse(const char *json, uint32_t debug);
/**
* Returns boolean indicating whether the result of the query should be stored.
*
* @param rules The CACHE_RULES object.
* @param default_db The current default database, NULL if there is none.
* @param query The query, expected to contain a COM_QUERY.
*
* @return True, if the results should be stored.
*/
bool cache_rules_should_store(CACHE_RULES *rules, const char *default_db, const GWBUF* query);
/**
* Returns boolean indicating whether the cache should be used, that is consulted.
*
* @param rules The CACHE_RULES object.
* @param session The current session.
*
* @return True, if the cache should be used.
*/
bool cache_rules_should_use(CACHE_RULES *rules, const SESSION *session);
MXS_END_DECLS
#if defined(__cplusplus)
class CacheRules
{
public:
~CacheRules();
/**
* Creates an empty rules object.
*
* @param debug The debug level.
*
* @return An empty rules object, or NULL in case of error.
*/
static CacheRules* create(uint32_t debug);
/**
* Loads the caching rules from a file and returns corresponding object.
*
* @param path The path of the file containing the rules.
* @param debug The debug level.
*
* @return The corresponding rules object, or NULL in case of error.
*/
static CacheRules* load(const char *zpath, uint32_t debug);
/**
* Returns boolean indicating whether the result of the query should be stored.
*
* @param zdefault_db The current default database, NULL if there is none.
* @param pquery The query, expected to contain a COM_QUERY.
*
* @return True, if the results should be stored.
*/
bool should_store(const char* zdefault_db, const GWBUF* pquery) const;
/**
* Returns boolean indicating whether the cache should be used, that is consulted.
*
* @param psession The current session.
*
* @return True, if the cache should be used.
*/
bool should_use(const SESSION* psession) const;
private:
CacheRules(CACHE_RULES* prules);
CacheRules(const CacheRules&);
CacheRules& operator = (const CacheRules&);
private:
CACHE_RULES* prules_;
};
#endif
#endif

View File

@ -1 +1,2 @@
add_subdirectory(storage_rocksdb)
add_subdirectory(storage_inmemory)

View File

@ -0,0 +1,10 @@
add_library(storage_inmemory SHARED
inmemorystorage.cc
inmemorystoragest.cc
inmemorystoragemt.cc
storage_inmemory.cc
)
target_link_libraries(storage_inmemory cache maxscale-common)
set_target_properties(storage_inmemory PROPERTIES VERSION "1.0.0")
set_target_properties(storage_inmemory PROPERTIES LINK_FLAGS -Wl,-z,defs)
install_module(storage_inmemory experimental)

View File

@ -0,0 +1,195 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "storage_inmemory"
#include "inmemorystorage.h"
#include <openssl/sha.h>
#include <algorithm>
#include <set>
#include <maxscale/alloc.h>
#include <maxscale/modutil.h>
#include <maxscale/query_classifier.h>
using std::set;
using std::string;
namespace
{
const size_t INMEMORY_KEY_LENGTH = 2 * SHA512_DIGEST_LENGTH;
#if INMEMORY_KEY_LENGTH > CACHE_KEY_MAXLEN
#error storage_inmemory key is too long.
#endif
}
InMemoryStorage::InMemoryStorage(const string& name,
uint32_t ttl)
: name_(name)
, ttl_(ttl)
{
}
InMemoryStorage::~InMemoryStorage()
{
}
cache_result_t InMemoryStorage::get_key(const char* zdefault_db, const GWBUF* pquery, CACHE_KEY* pkey)
{
ss_dassert(GWBUF_IS_CONTIGUOUS(pquery));
int n;
bool fullnames = true;
char** pztables = qc_get_table_names(const_cast<GWBUF*>(pquery), &n, fullnames);
set<string> dbs; // Elements in set are sorted.
for (int i = 0; i < n; ++i)
{
char *ztable = pztables[i];
char *zdot = strchr(ztable, '.');
if (zdot)
{
*zdot = 0;
dbs.insert(ztable);
}
else if (zdefault_db)
{
// If zdefault_db is NULL, then there will be a table for which we
// do not know the database. However, that will fail in the server,
// so nothing will be stored.
dbs.insert(zdefault_db);
}
MXS_FREE(ztable);
}
MXS_FREE(pztables);
// dbs now contain each accessed database in sorted order. Now copy them to a single string.
string tag;
for (set<string>::const_iterator i = dbs.begin(); i != dbs.end(); ++i)
{
tag.append(*i);
}
memset(pkey->data, 0, CACHE_KEY_MAXLEN);
const unsigned char* pdata;
// We store the databases in the first half of the key. That will ensure that
// identical queries targeting different default databases will not clash.
// This will also mean that entries related to the same databases will
// be placed near each other.
pdata = reinterpret_cast<const unsigned char*>(tag.data());
SHA512(pdata, tag.length(), reinterpret_cast<unsigned char*>(pkey->data));
char *psql;
int length;
modutil_extract_SQL(const_cast<GWBUF*>(pquery), &psql, &length);
// Then we store the query itself in the second half of the key.
pdata = reinterpret_cast<const unsigned char*>(psql);
SHA512(pdata, length, reinterpret_cast<unsigned char*>(pkey->data) + SHA512_DIGEST_LENGTH);
return CACHE_RESULT_OK;
}
cache_result_t InMemoryStorage::do_get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult)
{
cache_result_t result = CACHE_RESULT_NOT_FOUND;
Entries::iterator i = entries_.find(key);
if (i != entries_.end())
{
Entry& entry = i->second;
uint32_t now = time(NULL);
bool is_stale = (now - entry.time > ttl_);
if (!is_stale || ((flags & CACHE_FLAGS_INCLUDE_STALE) != 0))
{
size_t length = entry.value.size();
*ppresult = gwbuf_alloc(length);
if (*ppresult)
{
memcpy(GWBUF_DATA(*ppresult), entry.value.data(), length);
if (is_stale)
{
result = CACHE_RESULT_STALE;
}
else
{
result = CACHE_RESULT_OK;
}
}
else
{
result = CACHE_RESULT_OUT_OF_RESOURCES;
}
}
else
{
MXS_NOTICE("Cache item is stale, not using.");
}
}
return result;
}
cache_result_t InMemoryStorage::do_put_value(const CACHE_KEY& key, const GWBUF* pvalue)
{
ss_dassert(GWBUF_IS_CONTIGUOUS(pvalue));
size_t size = GWBUF_LENGTH(pvalue);
Entry& entry = entries_[key];
if (size < entry.value.capacity())
{
// If the needed value is less than what is currently stored,
// we shrink the buffer so as not to waste space.
Value value(size);
entry.value.swap(value);
}
else
{
entry.value.resize(size);
}
const uint8_t* pdata = GWBUF_DATA(pvalue);
copy(pdata, pdata + size, entry.value.begin());
entry.time = time(NULL);
return CACHE_RESULT_OK;
}
cache_result_t InMemoryStorage::do_del_value(const CACHE_KEY& key)
{
Entries::iterator i = entries_.find(key);
if (i != entries_.end())
{
entries_.erase(i);
}
return i != entries_.end() ? CACHE_RESULT_OK : CACHE_RESULT_NOT_FOUND;
}

View File

@ -0,0 +1,62 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <memory>
#include <string>
#include <vector>
#include <tr1/unordered_map>
#include "../../cachefilter.h"
class InMemoryStorage
{
public:
virtual ~InMemoryStorage();
cache_result_t get_key(const char* zdefault_db, const GWBUF* pquery, CACHE_KEY* pkey);
virtual cache_result_t get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult) = 0;
virtual cache_result_t put_value(const CACHE_KEY& key, const GWBUF* pvalue) = 0;
virtual cache_result_t del_value(const CACHE_KEY& key) = 0;
protected:
InMemoryStorage(const std::string& name, uint32_t ttl);
cache_result_t do_get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult);
cache_result_t do_put_value(const CACHE_KEY& key, const GWBUF* pvalue);
cache_result_t do_del_value(const CACHE_KEY& key);
private:
InMemoryStorage(const InMemoryStorage&);
InMemoryStorage& operator = (const InMemoryStorage&);
private:
typedef std::vector<uint8_t> Value;
struct Entry
{
Entry()
: time(0)
{}
uint32_t time;
Value value;
};
typedef std::tr1::unordered_map<CACHE_KEY, Entry> Entries;
std::string name_;
uint32_t ttl_;
Entries entries_;
};

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "storage_inmemory"
#include "inmemorystoragemt.h"
InMemoryStorageMT::InMemoryStorageMT(const std::string& name, uint32_t ttl)
: InMemoryStorage(name, ttl)
{
spinlock_init(&lock_);
}
InMemoryStorageMT::~InMemoryStorageMT()
{
}
// static
InMemoryStorageMT* InMemoryStorageMT::create(const std::string& name,
uint32_t ttl,
int argc, char* argv[])
{
return new InMemoryStorageMT(name, ttl);
}
cache_result_t InMemoryStorageMT::get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult)
{
LockGuard guard(&lock_);
return do_get_value(key, flags, ppresult);
}
cache_result_t InMemoryStorageMT::put_value(const CACHE_KEY& key, const GWBUF* pvalue)
{
LockGuard guard(&lock_);
return do_put_value(key, pvalue);
}
cache_result_t InMemoryStorageMT::del_value(const CACHE_KEY& key)
{
LockGuard guard(&lock_);
return do_del_value(key);
}

View File

@ -0,0 +1,39 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <maxscale/spinlock.h>
#include "inmemorystorage.h"
class InMemoryStorageMT : public InMemoryStorage
{
public:
~InMemoryStorageMT();
static InMemoryStorageMT* create(const std::string& name, uint32_t ttl, int argc, char* argv[]);
cache_result_t get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult);
cache_result_t put_value(const CACHE_KEY& key, const GWBUF* pvalue);
cache_result_t del_value(const CACHE_KEY& key);
private:
InMemoryStorageMT(const std::string& name, uint32_t ttl);
private:
InMemoryStorageMT(const InMemoryStorageMT&);
InMemoryStorageMT& operator = (const InMemoryStorageMT&);
private:
SPINLOCK lock_;
};

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "storage_inmemory"
#include "inmemorystoragest.h"
InMemoryStorageST::InMemoryStorageST(const std::string& name, uint32_t ttl)
: InMemoryStorage(name, ttl)
{
}
InMemoryStorageST::~InMemoryStorageST()
{
}
// static
InMemoryStorageST* InMemoryStorageST::create(const std::string& name,
uint32_t ttl,
int argc, char* argv[])
{
return new InMemoryStorageST(name, ttl);
}
cache_result_t InMemoryStorageST::get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult)
{
return do_get_value(key, flags, ppresult);
}
cache_result_t InMemoryStorageST::put_value(const CACHE_KEY& key, const GWBUF* pvalue)
{
return do_put_value(key, pvalue);
}
cache_result_t InMemoryStorageST::del_value(const CACHE_KEY& key)
{
return do_del_value(key);
}

View File

@ -0,0 +1,35 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include "inmemorystorage.h"
class InMemoryStorageST : public InMemoryStorage
{
public:
~InMemoryStorageST();
static InMemoryStorageST* create(const std::string& name, uint32_t ttl, int argc, char* argv[]);
cache_result_t get_value(const CACHE_KEY& key, uint32_t flags, GWBUF** ppresult);
cache_result_t put_value(const CACHE_KEY& key, const GWBUF* pvalue);
cache_result_t del_value(const CACHE_KEY& key);
private:
InMemoryStorageST(const std::string& name, uint32_t ttl);
private:
InMemoryStorageST(const InMemoryStorageST&);
InMemoryStorageST& operator = (const InMemoryStorageST&);
};

View File

@ -0,0 +1,233 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#define MXS_MODULE_NAME "storage_inmemory"
#include <inttypes.h>
#include "../../cache_storage_api.h"
#include "inmemorystoragest.h"
#include "inmemorystoragemt.h"
namespace
{
bool initialize(uint32_t* pcapabilities)
{
*pcapabilities = CACHE_STORAGE_CAP_ST;
*pcapabilities = CACHE_STORAGE_CAP_MT;
return true;
}
CACHE_STORAGE* createInstance(cache_thread_model_t model,
const char* zname,
uint32_t ttl,
uint32_t max_count,
uint64_t max_size,
int argc, char* argv[])
{
ss_dassert(zname);
CACHE_STORAGE* pStorage = 0;
if (max_count != 0)
{
MXS_WARNING("A maximum item count of %" PRIu32 " specified, although 'storage_inMemory' "
"does not enforce such a limit.", max_count);
}
if (max_size != 0)
{
MXS_WARNING("A maximum size of %" PRIu64 " specified, although 'storage_inMemory' "
"does not enforce such a limit.", max_size);
}
try
{
switch (model)
{
case CACHE_THREAD_MODEL_ST:
pStorage = reinterpret_cast<CACHE_STORAGE*>(InMemoryStorageST::create(zname, ttl, argc, argv));
break;
default:
MXS_ERROR("Unknown thread model %d, creating multi-thread aware storage.", (int)model);
case CACHE_THREAD_MODEL_MT:
pStorage = reinterpret_cast<CACHE_STORAGE*>(InMemoryStorageST::create(zname, ttl, argc, argv));
}
MXS_NOTICE("Storage module created.");
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return pStorage;
}
void freeInstance(CACHE_STORAGE* pinstance)
{
delete reinterpret_cast<InMemoryStorage*>(pinstance);
}
cache_result_t getKey(CACHE_STORAGE* pstorage,
const char* zdefault_db,
const GWBUF* pquery,
CACHE_KEY* pkey)
{
ss_dassert(pstorage);
// zdefault_db may be NULL.
ss_dassert(pquery);
ss_dassert(pkey);
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<InMemoryStorage*>(pstorage)->get_key(zdefault_db, pquery, pkey);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t getValue(CACHE_STORAGE* pstorage,
const CACHE_KEY* pkey,
uint32_t flags,
GWBUF** ppresult)
{
ss_dassert(pstorage);
ss_dassert(pkey);
ss_dassert(ppresult);
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<InMemoryStorage*>(pstorage)->get_value(*pkey, flags, ppresult);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t putValue(CACHE_STORAGE* pstorage,
const CACHE_KEY* pkey,
const GWBUF* pvalue)
{
ss_dassert(pstorage);
ss_dassert(pkey);
ss_dassert(pvalue);
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<InMemoryStorage*>(pstorage)->put_value(*pkey, pvalue);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
cache_result_t delValue(CACHE_STORAGE* pstorage,
const CACHE_KEY* pkey)
{
ss_dassert(pstorage);
ss_dassert(pkey);
cache_result_t result = CACHE_RESULT_ERROR;
try
{
result = reinterpret_cast<InMemoryStorage*>(pstorage)->del_value(*pkey);
}
catch (const std::bad_alloc&)
{
MXS_OOM();
}
catch (const std::exception& x)
{
MXS_ERROR("Standard exception caught: %s", x.what());
}
catch (...)
{
MXS_ERROR("Unknown exception caught.");
}
return result;
}
}
extern "C"
{
CACHE_STORAGE_API* CacheGetStorageAPI()
{
static CACHE_STORAGE_API api =
{
initialize,
createInstance,
freeInstance,
getKey,
getValue,
putValue,
delValue,
};
return &api;
}
}

View File

@ -11,6 +11,7 @@
* Public License.
*/
#define MXS_MODULE_NAME "storage_rocksdb"
#include "rocksdbinternals.h"
#include <rocksdb/env.h>
#include <util/coding.h>

View File

@ -11,6 +11,7 @@
* Public License.
*/
#define MXS_MODULE_NAME "storage_rocksdb"
#include "rocksdbstorage.h"
#include <openssl/sha.h>
#include <sys/stat.h>

View File

@ -11,30 +11,50 @@
* Public License.
*/
#define MXS_MODULE_NAME "storage_rocksdb"
#include "storage_rocksdb.h"
#include <inttypes.h>
#include "../../cache_storage_api.h"
#include "rocksdbstorage.h"
namespace
{
bool initialize()
bool initialize(uint32_t* pCapabilities)
{
*pCapabilities = CACHE_STORAGE_CAP_MT;
return RocksDBStorage::Initialize();
}
CACHE_STORAGE* createInstance(cache_thread_model_t, // Ignored, RocksDB always MT safe.
const char* zName,
uint32_t ttl,
uint32_t maxCount,
uint64_t maxSize,
int argc, char* argv[])
{
ss_dassert(zName);
CACHE_STORAGE* pStorage = 0;
if (maxCount != 0)
{
MXS_WARNING("A maximum item count of %" PRIu32 " specifed, although 'storage_rocksdb' "
"does not enforce such a limit.", maxCount);
}
if (maxSize != 0)
{
MXS_WARNING("A maximum size of %" PRIu64 " specified, although 'storage_rocksdb' "
"does not enforce such a limit.", maxSize);
}
try
{
pStorage = reinterpret_cast<CACHE_STORAGE*>(RocksDBStorage::Create(zName, ttl, argc, argv));
MXS_NOTICE("Storage module created.");
}
catch (const std::bad_alloc&)
{

View File

@ -20,13 +20,18 @@
#include <maxscale/gwdirs.h>
#include <maxscale/log_manager.h>
#include "cachefilter.h"
#include "lrustoragest.h"
#include "lrustoragemt.h"
#include "storagereal.h"
namespace
{
bool open_cache_storage(const char* zName, void** pHandle, CACHE_STORAGE_API** ppApi)
bool open_cache_storage(const char* zName,
void** pHandle,
CACHE_STORAGE_API** ppApi,
uint32_t* pCapabilities)
{
bool rv = false;
@ -45,7 +50,7 @@ bool open_cache_storage(const char* zName, void** pHandle, CACHE_STORAGE_API** p
if (pApi)
{
if ((pApi->initialize)())
if ((pApi->initialize)(pCapabilities))
{
*pHandle = handle;
*ppApi = pApi;
@ -96,9 +101,12 @@ void close_cache_storage(void* handle, CACHE_STORAGE_API* pApi)
}
StorageFactory::StorageFactory(void* handle, CACHE_STORAGE_API* pApi)
StorageFactory::StorageFactory(void* handle,
CACHE_STORAGE_API* pApi,
uint32_t capabilities)
: m_handle(handle)
, m_pApi(pApi)
, m_capabilities(capabilities)
{
ss_dassert(handle);
ss_dassert(pApi);
@ -118,10 +126,11 @@ StorageFactory* StorageFactory::Open(const char* zName)
void* handle;
CACHE_STORAGE_API* pApi;
uint32_t capabilities;
if (open_cache_storage(zName, &handle, &pApi))
if (open_cache_storage(zName, &handle, &pApi, &capabilities))
{
CPP_GUARD(pFactory = new StorageFactory(handle, pApi));
CPP_GUARD(pFactory = new StorageFactory(handle, pApi, capabilities));
if (!pFactory)
{
@ -135,19 +144,63 @@ StorageFactory* StorageFactory::Open(const char* zName)
Storage* StorageFactory::createStorage(cache_thread_model_t model,
const char* zName,
uint32_t ttl,
uint32_t maxCount,
uint64_t maxSize,
int argc, char* argv[])
{
ss_dassert(m_handle);
ss_dassert(m_pApi);
Storage* pStorage = 0;
CACHE_STORAGE* pRawStorage = m_pApi->createInstance(model, zName, ttl, argc, argv);
uint32_t mc = cache_storage_has_cap(m_capabilities, CACHE_STORAGE_CAP_MAX_COUNT) ? maxCount : 0;
uint64_t ms = cache_storage_has_cap(m_capabilities, CACHE_STORAGE_CAP_MAX_SIZE) ? maxSize : 0;
CACHE_STORAGE* pRawStorage = m_pApi->createInstance(model, zName, ttl, mc, ms, argc, argv);
if (pRawStorage)
{
CPP_GUARD(pStorage = new StorageReal(m_pApi, pRawStorage));
StorageReal* pStorageReal = NULL;
if (!pStorage)
CPP_GUARD(pStorageReal = new StorageReal(m_pApi, pRawStorage));
if (pStorageReal)
{
uint32_t mask = CACHE_STORAGE_CAP_MAX_COUNT | CACHE_STORAGE_CAP_MAX_SIZE;
if (!cache_storage_has_cap(m_capabilities, mask))
{
// Ok, so the cache cannot handle eviction. Let's decorate the
// real storage with a storage than can.
LRUStorage *pLruStorage = NULL;
if (model == CACHE_THREAD_MODEL_ST)
{
pLruStorage = LRUStorageST::create(pStorageReal, maxCount, maxSize);
}
else
{
ss_dassert(model == CACHE_THREAD_MODEL_MT);
pLruStorage = LRUStorageMT::create(pStorageReal, maxCount, maxSize);
}
if (pLruStorage)
{
pStorage = pLruStorage;
}
else
{
delete pStorageReal;
}
}
else
{
pStorage = pStorageReal;
}
}
else
{
m_pApi->freeInstance(pRawStorage);
}

View File

@ -29,17 +29,20 @@ public:
Storage* createStorage(cache_thread_model_t model,
const char* zName,
uint32_t ttl,
uint32_t max_count,
uint64_t max_size,
int argc, char* argv[]);
private:
StorageFactory(void* handle, CACHE_STORAGE_API* pApi);
StorageFactory(void* handle, CACHE_STORAGE_API* pApi, uint32_t capabilities);
StorageFactory(const StorageFactory&);
StorageFactory& operator = (const StorageFactory&);
private:
void* m_handle;
CACHE_STORAGE_API* m_pApi;
void* m_handle; /*< dl handle of storage. */
CACHE_STORAGE_API* m_pApi; /*< API of storage. */
uint32_t m_capabilities; /*< Capabilities of storage. */
};
#endif

View File

@ -213,8 +213,6 @@ int route_single_query(TEE_INSTANCE* my_instance,
int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer);
void create_orphan(SESSION* ses);
extern LIST_CONFIG SESSIONlist;
static void
orphan_free(void* data)
{
@ -299,7 +297,7 @@ orphan_free(void* data)
tmp->session->router_session);
tmp->session->state = SESSION_STATE_FREE;
list_free_entry(&SESSIONlist, (list_entry_t*)tmp->session);
MXS_FREE(tmp->session);
MXS_FREE(tmp);
}

View File

@ -599,10 +599,8 @@ gw_read_backend_event(DCB *dcb)
if (proto->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
{
/** Authentication completed successfully */
spinlock_acquire(&dcb->authlock);
GWBUF *localq = dcb->delayq;
dcb->delayq = NULL;
spinlock_release(&dcb->authlock);
if (localq)
{
@ -675,6 +673,42 @@ gw_reply_on_error(DCB *dcb, mxs_auth_state_t state)
gwbuf_free(errbuf);
}
/**
* @brief Check if a reply can be routed to the client
*
* @param Backend DCB
* @return True if session is ready for reply routing
*/
static inline bool session_ok_to_route(DCB *dcb)
{
bool rval = false;
if (dcb->session->state == SESSION_STATE_ROUTER_READY &&
dcb->session->client_dcb != NULL &&
dcb->session->client_dcb->state == DCB_STATE_POLLING &&
(dcb->session->router_session ||
service_get_capabilities(dcb->session->service) & RCAP_TYPE_NO_RSESSION))
{
MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol;
if (client_protocol)
{
CHK_PROTOCOL(client_protocol);
if (client_protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
{
rval = true;
}
}
else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL)
{
rval = true;
}
}
return rval;
}
/**
* @brief With authentication completed, read new data and write to backend
*
@ -688,7 +722,7 @@ gw_read_and_write(DCB *dcb)
GWBUF *read_buffer = NULL;
SESSION *session = dcb->session;
int nbytes_read;
int return_code;
int return_code = 0;
CHK_SESSION(session);
@ -721,50 +755,56 @@ gw_read_and_write(DCB *dcb)
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
return_code = 0;
goto return_rc;
return 0;
}
nbytes_read = gwbuf_length(read_buffer);
if (nbytes_read == 0)
{
ss_dassert(read_buffer == NULL);
goto return_rc;
return return_code;
}
else
{
ss_dassert(read_buffer != NULL);
}
if (nbytes_read < 3)
{
dcb->dcb_readqueue = read_buffer;
return_code = 0;
goto return_rc;
}
/** Ask what type of output the router/filter chain expects */
uint64_t capabilities = service_get_capabilities(session->service);
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_OUTPUT))
{
GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
/* Put any residue into the read queue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = read_buffer;
spinlock_release(&dcb->authlock);
if (tmp == NULL)
{
/** No complete packets */
return_code = 0;
goto return_rc;
return 0;
}
else
read_buffer = tmp;
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_OUTPUT))
{
read_buffer = tmp;
if ((tmp = gwbuf_make_contiguous(read_buffer)))
{
read_buffer = tmp;
}
else
{
/** Failed to make the buffer contiguous */
gwbuf_free(read_buffer);
poll_fake_hangup_event(dcb);
return 0;
}
}
}
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
spinlock_acquire(&dcb->authlock);
if (proto->ignore_reply)
{
@ -775,8 +815,6 @@ gw_read_and_write(DCB *dcb)
proto->ignore_reply = false;
gwbuf_free(read_buffer);
spinlock_release(&dcb->authlock);
int rval = 0;
if (result == MYSQL_REPLY_OK)
@ -794,8 +832,6 @@ gw_read_and_write(DCB *dcb)
return rval;
}
spinlock_release(&dcb->authlock);
/**
* If protocol has session command set, concatenate whole
* response into one buffer.
@ -809,64 +845,32 @@ gw_read_and_write(DCB *dcb)
*/
if (!sescmd_response_complete(dcb))
{
return_code = 0;
goto return_rc;
return 0;
}
if (!read_buffer)
{
MXS_NOTICE("%lu [gw_read_backend_event] "
MXS_ERROR("%lu [gw_read_backend_event] "
"Read buffer unexpectedly null, even though response "
"not marked as complete. User: %s",
pthread_self(), dcb->session->client_dcb->user);
return_code = 0;
goto return_rc;
return 0;
}
}
/**
* Check that session is operable, and that client DCB is
* still listening the socket for replies.
*/
if (dcb->session->state == SESSION_STATE_ROUTER_READY &&
dcb->session->client_dcb != NULL &&
dcb->session->client_dcb->state == DCB_STATE_POLLING &&
(session->router_session ||
service_get_capabilities(session->service) & RCAP_TYPE_NO_RSESSION))
if (session_ok_to_route(dcb))
{
MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol;
if (client_protocol != NULL)
{
CHK_PROTOCOL(client_protocol);
if (client_protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
{
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
session->service->router->clientReply(
session->service->router_instance,
session->router_session,
read_buffer,
dcb);
return_code = 1;
}
}
else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL)
{
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
session->service->router->clientReply(
session->service->router_instance,
session->router_session,
read_buffer,
dcb);
return_code = 1;
}
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
session->service->router->clientReply(session->service->router_instance,
session->router_session,
read_buffer, dcb);
return_code = 1;
}
else /*< session is closing; replying to client isn't possible */
{
gwbuf_free(read_buffer);
}
return_rc:
return return_code;
}
@ -886,13 +890,11 @@ static int gw_write_backend_event(DCB *dcb)
uint8_t* data = NULL;
bool com_quit = false;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
data = (uint8_t *) GWBUF_DATA(dcb->writeq);
com_quit = MYSQL_IS_COM_QUIT(data);
}
spinlock_release(&dcb->writeqlock);
if (data)
{
@ -947,7 +949,6 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
int rc = 0;
CHK_DCB(dcb);
spinlock_acquire(&dcb->authlock);
if (dcb->was_persistent && dcb->state == DCB_STATE_POLLING)
{
@ -967,8 +968,6 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
backend_protocol->ignore_reply = true;
backend_protocol->stored_query = queue;
spinlock_release(&dcb->authlock);
GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol);
return dcb_write(dcb, buf) ? 1 : 0;
}
@ -987,7 +986,6 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
*/
backend_protocol->stored_query = gwbuf_append(backend_protocol->stored_query, queue);
}
spinlock_release(&dcb->authlock);
return 1;
}
@ -1012,7 +1010,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
gwbuf_free(queue);
rc = 0;
spinlock_release(&dcb->authlock);
break;
case MXS_AUTH_STATE_COMPLETE:
@ -1027,7 +1025,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state));
spinlock_release(&dcb->authlock);
/**
* Statement type is used in readwrite split router.
* Command is *not* set for readconn router.
@ -1082,7 +1080,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* connected with auth ok
*/
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
rc = 1;
}
break;
@ -1807,9 +1805,9 @@ static GWBUF* process_response_data(DCB* dcb,
/** Store the already read data into the readqueue of the DCB
* and restore the response status to the initial number of packets */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(outbuf, dcb->dcb_readqueue);
spinlock_release(&dcb->authlock);
protocol_set_response_status(p, initial_packets, initial_bytes);
return NULL;
}

View File

@ -433,19 +433,19 @@ int gw_read_client_event(DCB* dcb)
* will be changed to MYSQL_IDLE (see below).
*
*/
case MXS_AUTH_STATE_MESSAGE_READ:
/* After this call read_buffer will point to freed data */
if (nbytes_read < 3 || (0 == max_bytes && nbytes_read <
(MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) ||
(0 != max_bytes && nbytes_read < max_bytes))
{
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = read_buffer;
spinlock_release(&dcb->authlock);
return 0;
}
return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
break;
case MXS_AUTH_STATE_MESSAGE_READ:
/* After this call read_buffer will point to freed data */
if (nbytes_read < 3 || (0 == max_bytes && nbytes_read <
(MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) ||
(0 != max_bytes && nbytes_read < max_bytes))
{
dcb->dcb_readqueue = read_buffer;
return 0;
}
return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
break;
/**
*
@ -861,9 +861,9 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
if (nbytes_read < 3 || nbytes_read <
(MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4))
{
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = read_buffer;
spinlock_release(&dcb->authlock);
return 0;
}
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
@ -904,9 +904,9 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
{
/* Must have been data left over */
/* Add incomplete mysql packet to read queue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
spinlock_release(&dcb->authlock);
}
}
else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION)))

View File

@ -1039,9 +1039,9 @@ bool read_complete_packet(DCB *dcb, GWBUF **readbuf)
if (localbuf)
{
/** Store any extra data in the DCB's readqueue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, localbuf);
spinlock_release(&dcb->authlock);
}
}
@ -1061,7 +1061,6 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
CHK_DCB(dcb);
CHK_SESSION(dcb->session);
spinlock_acquire(&dcb->session->ses_lock);
if (dcb->session->state != SESSION_STATE_ALLOC &&
dcb->session->state != SESSION_STATE_DUMMY)
@ -1076,7 +1075,7 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
pthread_self(), dcb->session->state);
rval = false;
}
spinlock_release(&dcb->session->ses_lock);
return rval;
}

View File

@ -92,8 +92,6 @@
#define ARG_TYPE_FILTER 9
#define ARG_TYPE_NUMERIC 10
extern LIST_CONFIG SESSIONlist;
/**
* The subcommand structure
*
@ -1522,6 +1520,8 @@ convert_arg(int mode, char *arg, int arg_type)
return 0;
}
static SPINLOCK debugcmd_lock = SPINLOCK_INIT;
/**
* We have a complete line from the user, lookup the commands and execute them
*
@ -1613,6 +1613,8 @@ execute_cmd(CLI_SESSION *cli)
argc = i - 2; /* The number of extra arguments to commands */
spinlock_acquire(&debugcmd_lock);
if (!strcasecmp(args[0], "help"))
{
if (args[1] == NULL || *args[1] == 0)
@ -1664,11 +1666,7 @@ execute_cmd(CLI_SESSION *cli)
}
found = 1;
}
else if (!strcasecmp(args[0], "quit"))
{
return 0;
}
else if (argc >= 0)
else if (strcasecmp(args[0], "quit") && argc >= 0)
{
for (i = 0; cmds[i].cmd; i++)
{
@ -1712,7 +1710,7 @@ execute_cmd(CLI_SESSION *cli)
if (arg_list[k] == 0)
{
dcb_printf(dcb, "Invalid argument: %s\n", args[k + 2]);
return 0;
break;
}
}
@ -1809,6 +1807,8 @@ execute_cmd(CLI_SESSION *cli)
"Command '%s' not known, type help for a list of available commands\n", args[0]);
}
spinlock_release(&debugcmd_lock);
memset(cli->cmdbuf, 0, CMDBUFLEN);
return 1;
@ -2115,6 +2115,31 @@ static bool get_log_action(const char* name, struct log_action_entry* entryp)
return found;
}
bool seslog_cb(DCB *dcb, void *data)
{
bool rval = true;
struct log_action_entry *entry = ((void**)data)[0];
size_t *id = ((void**)data)[1];
bool enable = (bool)((void**)data)[2];
SESSION *session = dcb->session;
if (session->ses_id == *id)
{
if (enable)
{
session_enable_log_priority(session, entry->priority);
}
else
{
session_disable_log_priority(session, entry->priority);
}
rval = false;
}
return rval;
}
/**
* Enables a log for a single session
* @param session The session in question
@ -2127,21 +2152,10 @@ static void enable_sess_log_action(DCB *dcb, char *arg1, char *arg2)
if (get_log_action(arg1, &entry))
{
size_t id = (size_t) strtol(arg2, 0, 0);
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
SESSION *session = (SESSION *)current;
if (session->ses_id == id)
{
session_enable_log_priority(session, entry.priority);
list_terminate_iteration_early(&SESSIONlist, current);
break;
}
current = list_iterate(&SESSIONlist, current);
}
size_t id = (size_t)strtol(arg2, NULL, 10);
void *data[] = {&entry, &id, (void*)true};
if (!current)
if (dcb_foreach(seslog_cb, data))
{
dcb_printf(dcb, "Session not found: %s.\n", arg2);
}
@ -2164,28 +2178,17 @@ static void disable_sess_log_action(DCB *dcb, char *arg1, char *arg2)
if (get_log_action(arg1, &entry))
{
size_t id = (size_t) strtol(arg2, 0, 0);
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
SESSION *session = (SESSION *)current;
if (session->ses_id == id)
{
session_disable_log_priority(session, entry.priority);
list_terminate_iteration_early(&SESSIONlist, current);
break;
}
current = list_iterate(&SESSIONlist, current);
}
size_t id = (size_t)strtol(arg2, NULL, 10);
void *data[] = {&entry, &id, (void*)false};
if (!current)
if (dcb_foreach(seslog_cb, data))
{
dcb_printf(dcb, "Session not found: %s.\n", arg2);
}
}
else
{
dcb_printf(dcb, "%s is not supported for disable log.\n", arg1);
dcb_printf(dcb, "%s is not supported for enable log.\n", arg1);
}
}
@ -2226,6 +2229,30 @@ static int string_to_priority(const char* name)
return result ? result->priority : -1;
}
bool sesprio_cb(DCB *dcb, void *data)
{
bool rval = true;
int *priority = ((void**)data)[0];
size_t *id = ((void**)data)[1];
bool enable = (bool)((void**)data)[2];
SESSION *session = dcb->session;
if (session->ses_id == *id)
{
if (enable)
{
session_enable_log_priority(session, *priority);
}
else
{
session_disable_log_priority(session, *priority);
}
rval = false;
}
return rval;
}
/**
* Enables a log priority for a single session
* @param session The session in question
@ -2238,21 +2265,10 @@ static void enable_sess_log_priority(DCB *dcb, char *arg1, char *arg2)
if (priority != -1)
{
size_t id = (size_t) strtol(arg2, 0, 0);
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
SESSION *session = (SESSION *)current;
if (session->ses_id == id)
{
session_enable_log_priority(session, priority);
list_terminate_iteration_early(&SESSIONlist, current);
break;
}
current = list_iterate(&SESSIONlist, current);
}
size_t id = (size_t) strtol(arg2, NULL, 10);
void *data[] = {&priority, &id, (void*)true};
if (!current)
if (dcb_foreach(sesprio_cb, data))
{
dcb_printf(dcb, "Session not found: %s.\n", arg2);
}
@ -2275,21 +2291,10 @@ static void disable_sess_log_priority(DCB *dcb, char *arg1, char *arg2)
if (priority != -1)
{
size_t id = (size_t) strtol(arg2, 0, 0);
list_entry_t *current = list_start_iteration(&SESSIONlist);
while (current)
{
SESSION *session = (SESSION *)current;
if (session->ses_id == id)
{
session_disable_log_priority(session, priority);
list_terminate_iteration_early(&SESSIONlist, current);
break;
}
current = list_iterate(&SESSIONlist, current);
}
size_t id = (size_t) strtol(arg2, NULL, 10);
void *data[] = {&priority, &id, (void*)false};
if (!current)
if (dcb_foreach(seslog_cb, data))
{
dcb_printf(dcb, "Session not found: %s.\n", arg2);
}

View File

@ -1044,15 +1044,6 @@ maxinfo_event_queue_length()
return poll_get_stat(POLL_STAT_EVQ_LEN);
}
/**
* Interface to poll stats for event pending queue length
*/
static int
maxinfo_event_pending_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_PENDING);
}
/**
* Interface to poll stats for max event queue length
*/
@ -1108,7 +1099,6 @@ static struct
{ "Error_events", VT_INT, (STATSFUNC)maxinfo_error_events },
{ "Accept_events", VT_INT, (STATSFUNC)maxinfo_accept_events },
{ "Event_queue_length", VT_INT, (STATSFUNC)maxinfo_event_queue_length },
{ "Pending_events", VT_INT, (STATSFUNC)maxinfo_event_pending_queue_length },
{ "Max_event_queue_length", VT_INT, (STATSFUNC)maxinfo_max_event_queue_length },
{ "Max_event_queue_time", VT_INT, (STATSFUNC)maxinfo_max_event_queue_time },
{ "Max_event_execution_time", VT_INT, (STATSFUNC)maxinfo_max_event_exec_time },

View File

@ -607,41 +607,6 @@ bool check_shard_status(ROUTER_INSTANCE* router, char* shard)
return false;
}
/**
* A fake DCB read function used to forward queued queries.
* @param dcb Internal DCB used by the router session
* @return Always 1
*/
int internalRoute(DCB* dcb)
{
if (dcb->dcb_readqueue && dcb->session)
{
GWBUF* tmp = dcb->dcb_readqueue;
void* rinst = dcb->session->service->router_instance;
void *rses = dcb->session->router_session;
dcb->dcb_readqueue = NULL;
return dcb->session->service->router->routeQuery(rinst, rses, tmp);
}
return 1;
}
/**
* A fake DCB read function used to forward replies to the client.
* @param dcb Internal DCB used by the router session
* @return Always 1
*/
int internalReply(DCB* dcb)
{
if (dcb->dcb_readqueue && dcb->session)
{
GWBUF* tmp = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
return SESSION_ROUTE_REPLY(dcb->session, tmp);
}
return 1;
}
/**
* Implementation of the mandatory version entry point
*
@ -955,16 +920,8 @@ static void* newSession(ROUTER* router_inst, SESSION* session)
}
client_rses->shardmap = map;
client_rses->dcb_reply = dcb_alloc(DCB_ROLE_INTERNAL, NULL);
client_rses->dcb_reply->func.read = internalReply;
client_rses->dcb_reply->state = DCB_STATE_POLLING;
client_rses->dcb_reply->session = session;
memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t));
client_rses->n_sescmd = 0;
client_rses->dcb_route = dcb_alloc(DCB_ROLE_INTERNAL, NULL);
client_rses->dcb_route->func.read = internalRoute;
client_rses->dcb_route->state = DCB_STATE_POLLING;
client_rses->dcb_route->session = session;
client_rses->rses_config.last_refresh = time(NULL);
if (using_db)
@ -1147,18 +1104,7 @@ static void closeSession(ROUTER* instance, void* router_session)
}
}
/* Close internal DCBs */
router_cli_ses->dcb_reply->session = NULL;
router_cli_ses->dcb_route->session = NULL;
dcb_close(router_cli_ses->dcb_reply);
dcb_close(router_cli_ses->dcb_route);
while (router_cli_ses->queue &&
(router_cli_ses->queue = gwbuf_consume(
router_cli_ses->queue, gwbuf_length(router_cli_ses->queue))))
{
;
}
gwbuf_free(router_cli_ses->queue);
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
@ -4153,7 +4099,7 @@ void route_queued_query(ROUTER_CLIENT_SES *router_cli_ses)
querystr);
MXS_FREE(querystr);
#endif
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route, tmp);
poll_add_epollin_event_to_dcb(router_cli_ses->rses_client_dcb, tmp);
}
/**