From 8d55be4b232df60b0aaaa5dbf91d80171a4b4692 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 30 May 2014 16:45:39 +0100 Subject: [PATCH] First working filters implementaton. Only downstream filters are supported currently, i.e. no result set filtering can be done. A crude QLA (Query Log All) filter is included as a test harness only --- server/Makefile | 4 + server/core/Makefile | 6 +- server/core/config.c | 44 +++- server/core/filter.c | 279 +++++++++++++++++++++++++ server/core/service.c | 82 ++++++++ server/core/session.c | 84 +++++++- server/include/filter.h | 96 +++++++++ server/include/modules.h | 2 + server/include/service.h | 55 ++--- server/include/session.h | 24 +++ server/modules/filter/Makefile | 83 ++++++++ server/modules/filter/qlafilter.c | 211 +++++++++++++++++++ server/modules/filter/testfilter.c | 190 +++++++++++++++++ server/modules/protocol/mysql_client.c | 25 +-- server/modules/protocol/telnetd.c | 5 +- server/modules/routing/debugcmd.c | 19 ++ 16 files changed, 1158 insertions(+), 51 deletions(-) create mode 100644 server/core/filter.c create mode 100644 server/include/filter.h create mode 100644 server/modules/filter/Makefile create mode 100644 server/modules/filter/qlafilter.c create mode 100644 server/modules/filter/testfilter.c diff --git a/server/Makefile b/server/Makefile index 2ac7ed2be..d6aba988e 100644 --- a/server/Makefile +++ b/server/Makefile @@ -33,6 +33,7 @@ all: (cd modules/routing/readwritesplit; touch depend.mk ;make) (cd modules/protocol; touch depend.mk ;make) (cd modules/monitor; touch depend.mk ;make) + (cd modules/filter; touch depend.mk ;make) cleantests: $(MAKE) -C test cleantests @@ -49,12 +50,14 @@ clean: (cd modules/routing; touch depend.mk ; make clean) (cd modules/protocol; touch depend.mk ; make clean) (cd modules/monitor; touch depend.mk ; make clean) + (cd modules/filter; touch depend.mk ; make clean) depend: (cd core; touch depend.mk ; make depend) (cd modules/routing; touch depend.mk ; make depend) (cd modules/protocol; touch depend.mk ; make depend) (cd modules/monitor; touch depend.mk ; make depend) + (cd modules/filter; touch depend.mk ; make depend) install: @mkdir -p $(DEST) @@ -71,3 +74,4 @@ install: (cd modules/routing; make DEST=$(DEST) install) (cd modules/protocol; make DEST=$(DEST) install) (cd modules/monitor; make DEST=$(DEST) install) + (cd modules/filter; make DEST=$(DEST) install) diff --git a/server/core/Makefile b/server/core/Makefile index bd29fbae8..b2722f94d 100644 --- a/server/core/Makefile +++ b/server/core/Makefile @@ -33,6 +33,7 @@ # 29/06/13 Vilho Raatikka Reverted Query classifier changes because # gateway needs mysql client lib, not qc. # 24/07/13 Mark Ridoch Addition of encryption routines +# 30/05/14 Mark Ridoch Filter API added include ../../build_gateway.inc @@ -56,14 +57,15 @@ LDFLAGS=-rdynamic -L$(LOGPATH) \ SRCS= atomic.c buffer.c spinlock.c gateway.c \ gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \ poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \ - monitor.c adminusers.c secrets.c + monitor.c adminusers.c secrets.c filter.c HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/gw.h ../modules/include/mysql_client_server_protocol.h \ ../include/session.h ../include/spinlock.h ../include/thread.h \ ../include/modules.h ../include/poll.h ../include/config.h \ ../include/users.h ../include/hashtable.h ../include/gwbitmask.h \ - ../include/adminusers.h ../include/version.h ../include/maxscale.h + ../include/adminusers.h ../include/version.h ../include/maxscale.h \ + ../include/filter.h OBJ=$(SRCS:.c=.o) diff --git a/server/core/config.c b/server/core/config.c index be38314ab..1950f4cac 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -31,6 +31,7 @@ * 11/03/14 Massimiliano Pinto Added Unix socket support * 11/05/14 Massimiliano Pinto Added version_string support to service * 19/05/14 Mark Riddoch Added unique names from section headers + * 29/05/14 Mark Riddoch Addition of filter definition * * @endverbatim */ @@ -215,6 +216,8 @@ int error_count = 0; { char *router = config_get_value(obj->parameters, "router"); + char *filters = config_get_value(obj->parameters, + "filters"); if (router) { char* max_slave_conn_str; @@ -301,6 +304,9 @@ int error_count = 0; param->value))); } } + if (filters) + serviceSetFilters(obj->element, + filters); } else { @@ -359,6 +365,36 @@ int error_count = 0; obj->object))); } } + else if (!strcmp(type, "filter")) + { + char *module = config_get_value(obj->parameters, + "module"); + char *options = config_get_value(obj->parameters, + "options"); + + if (module) + { + obj->element = filter_alloc(obj->object, module); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Filter '%s' has no module " + "defined defined to load.", + obj->object))); + error_count++; + } + if (obj->element) + { + char *s = strtok(options, ","); + while (s) + { + filterAddOption(obj->element, s); + s = strtok(NULL, ","); + } + } + } obj = obj->next; } @@ -550,7 +586,8 @@ int error_count = 0; error_count++; } } - else if (strcmp(type, "server") != 0) + else if (strcmp(type, "server") != 0 + && strcmp(type, "filter") != 0) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -959,10 +996,12 @@ SERVER *server; { char *servers; char *roptions; + char *filters; servers = config_get_value(obj->parameters, "servers"); roptions = config_get_value(obj->parameters, "router_options"); + filters = config_get_value(obj->parameters, "filters"); if (servers && obj->element) { char *s = strtok(servers, ","); @@ -996,6 +1035,8 @@ SERVER *server; s = strtok(NULL, ","); } } + if (filters) + serviceSetFilters(obj->element, filters); } else if (!strcmp(type, "listener")) { @@ -1080,6 +1121,7 @@ static char *service_params[] = "enable_root_user", "max_slave_connections", "version_string", + "filters", NULL }; diff --git a/server/core/filter.c b/server/core/filter.c new file mode 100644 index 000000000..4394477ab --- /dev/null +++ b/server/core/filter.c @@ -0,0 +1,279 @@ +/* + * This file is distributed as part of MaxScale from SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file filter.c - A representation of a filter within MaxScale. + * + * @verbatim + * Revision History + * + * Date Who Description + * 29/05/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int lm_enabled_logfiles_bitmask; + +static SPINLOCK filter_spin = SPINLOCK_INIT; +static FILTER_DEF *allFilters = NULL; + +/** + * Allocate a new filter within MaxScale + * + * + * @param name The filter name + * @param module The module to load + * + * @return The newly created filter or NULL if an error occured + */ +FILTER_DEF * +filter_alloc(char *name, char *module) +{ +FILTER_DEF *filter; + + if ((filter = (FILTER_DEF *)malloc(sizeof(FILTER_DEF))) == NULL) + return NULL; + filter->name = strdup(name); + filter->module = strdup(module); + + spinlock_init(&filter->spin); + + spinlock_acquire(&filter_spin); + filter->next = allFilters; + allFilters = filter; + spinlock_release(&filter_spin); + + return filter; +} + + +/** + * Deallocate the specified filter + * + * @param server The service to deallocate + * @return Returns true if the server was freed + */ +void +filter_free(FILTER_DEF *filter) +{ +FILTER_DEF *ptr; + + /* First of all remove from the linked list */ + spinlock_acquire(&filter_spin); + if (allFilters == filter) + { + allFilters = filter->next; + } + else + { + ptr = allFilters; + while (ptr && ptr->next != filter) + { + ptr = ptr->next; + } + if (ptr) + ptr->next = filter->next; + } + spinlock_release(&filter_spin); + + /* Clean up session and free the memory */ + free(filter->name); + free(filter->module); + free(filter); +} + +/** + * Find an existing filter using the unique section name in + * configuration file + * + * @param name The filter name + * @return The server or NULL if not found + */ +FILTER_DEF * +filter_find(char *name) +{ +FILTER_DEF *filter; + + spinlock_acquire(&filter_spin); + filter = allFilters; + while (filter) + { + if (strcmp(filter->name, name) == 0) + break; + filter = filter->next; + } + spinlock_release(&filter_spin); + return filter; +} + +/** + * Print all filters to a DCB + * + * Designed to be called within a debugger session in order + * to display all active filters within MaxScale + */ +void +dprintAllFilters(DCB *dcb) +{ +FILTER_DEF *ptr; +int i; + + spinlock_acquire(&filter_spin); + ptr = allFilters; + while (ptr) + { + dcb_printf(dcb, "Filter %p (%s)\n", ptr, ptr->name); + dcb_printf(dcb, "\tModule: %s\n", ptr->module); + if (ptr->options) + { + dcb_printf(dcb, "\tOptions: "); + for (i = 0; ptr->options && ptr->options[i]; i++) + dcb_printf(dcb, "%s ", ptr->options[i]); + dcb_printf(dcb, "\n"); + } + if (ptr->obj && ptr->filter) + ptr->obj->diagnostics(ptr->filter, NULL, dcb); + ptr = ptr->next; + } + spinlock_release(&filter_spin); +} + +/** + * Print filter details to a DCB + * + * Designed to be called within a debug CLI in order + * to display all active filters in MaxScale + */ +void +dprintFilter(DCB *dcb, FILTER_DEF *filter) +{ +int i; + + dcb_printf(dcb, "Filter %p (%s)\n", filter, filter->name); + dcb_printf(dcb, "\tModule: %s\n", filter->module); + if (filter->options) + { + dcb_printf(dcb, "\tOptions: "); + for (i = 0; filter->options && filter->options[i]; i++) + dcb_printf(dcb, "%s ", filter->options[i]); + dcb_printf(dcb, "\n"); + } + if (filter->obj && filter->filter) + filter->obj->diagnostics(filter->filter, NULL, dcb); +} + +/** + * List all filters in a tabular form to a DCB + * + */ +void +dListFilters(DCB *dcb) +{ +FILTER_DEF *ptr; +int i; + + spinlock_acquire(&filter_spin); + ptr = allFilters; + if (ptr) + { + dcb_printf(dcb, "%-18s | %-15s | Options\n", + "Filter", "Address", "Status"); + dcb_printf(dcb, "-------------------------------------------------------------------------------\n"); + } + while (ptr) + { + dcb_printf(dcb, "%-18s | %-15s | ", + ptr->name, ptr->module); + for (i = 0; ptr->options && ptr->options[i]; i++) + dcb_printf(dcb, "%s ", ptr->options[i]); + dcb_printf(dcb, "\n"); + ptr = ptr->next; + } + spinlock_release(&filter_spin); +} + + + +/** + * Add a router option to a service + * + * @param service The service to add the router option to + * @param option The option string + */ +void +filterAddOption(FILTER_DEF *filter, char *option) +{ +int i; + + spinlock_acquire(&filter->spin); + if (filter->options == NULL) + { + filter->options = (char **)calloc(2, sizeof(char *)); + filter->options[0] = strdup(option); + filter->options[1] = NULL; + } + else + { + for (i = 0; filter->options[i]; i++) + ; + filter->options = (char **)realloc(filter->options, + (i + 2) * sizeof(char *)); + filter->options[i] = strdup(option); + filter->options[i+1] = NULL; + } + spinlock_release(&filter->spin); +} + +DOWNSTREAM * +filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream) +{ +DOWNSTREAM *me; + + if (filter->obj == NULL) + { + /* Filter not yet loaded */ + if ((filter->obj = load_module(filter->module, + MODULE_FILTER)) == NULL) + { + return NULL; + } + } + if (filter->filter == NULL) + filter->filter = (filter->obj->createInstance)(filter->options); + if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL) + { + return NULL; + } + me->instance = filter->filter; + me->routeQuery = filter->obj->routeQuery; + me->session = filter->obj->newSession(me->instance, session); + + filter->obj->setDownstream(me->instance, me->session, downstream); + + return me; +} diff --git a/server/core/service.c b/server/core/service.c index cb7bea457..d528881b1 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -30,6 +30,7 @@ * 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services) * 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL * 23/05/14 Mark Riddoch Addition of service validation call + * 29/05/14 Mark Riddoch Filter API implementation * * @endverbatim */ @@ -46,6 +47,7 @@ #include #include #include +#include #include #include #include @@ -110,6 +112,8 @@ SERVICE *service; service->databases = NULL; service->svc_config_param = NULL; service->svc_config_version = 0; + service->filters = NULL; + service->n_filters = 0; spinlock_init(&service->spin); spinlock_init(&service->users_table_spin); memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE)); @@ -608,6 +612,62 @@ serviceEnableRootUser(SERVICE *service, int action) return 1; } +/** + * Trim whitespace from the from an rear of a string + * + * @param str String to trim + * @return Trimmed string, chanesg are done in situ + */ +static char * +trim(char *str) +{ +char *ptr; + + while (isspace(*str)) + str++; + + ptr = str + strlen(str); + while (ptr > str && isspace(*ptr)) + *ptr-- = 0; + + return str; +} + +/** + * Set the filters used by the service + * + * @param service The service itself + * @param filters ASCII string of filters to use + */ +void +serviceSetFilters(SERVICE *service, char *filters) +{ +FILTER_DEF **flist; +char *ptr, *brkt; +int n = 0; + + flist = (FILTER_DEF *)malloc(sizeof(FILTER_DEF *)); + ptr = strtok_r(filters, "|", &brkt); + while (ptr) + { + n++; + flist = (FILTER_DEF *)realloc(flist, (n + 1) * sizeof(FILTER_DEF *)); + if ((flist[n-1] = filter_find(trim(ptr))) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Unable to find filter '%s' for service '%s'\n", + trim(ptr), service->name + ))); + } + flist[n] = NULL; + ptr = strtok_r(NULL, "|", &brkt); + } + + service->filters = flist; + service->n_filters = n; +} + /** * Return a named service * @@ -638,6 +698,7 @@ void printService(SERVICE *service) { SERVER *ptr = service->databases; +int i; printf("Service %p\n", service); printf("\tService: %s\n", service->name); @@ -649,6 +710,16 @@ SERVER *ptr = service->databases; printf("\t\t%s:%d Protocol: %s\n", ptr->name, ptr->port, ptr->protocol); ptr = ptr->nextdb; } + if (service->n_filters) + { + printf("\tFilter chain: "); + for (i = 0; i < service->n_filters; i++) + { + printf("%s %s ", service->filters[i]->name, + i + 1 < service->n_filters ? "|" : ""); + } + printf("\n"); + } printf("\tUsers data: %p\n", service->users); printf("\tTotal connections: %d\n", service->stats.n_sessions); printf("\tCurrently connected: %d\n", service->stats.n_current); @@ -705,6 +776,7 @@ SERVICE *ptr; void dprintService(DCB *dcb, SERVICE *service) { SERVER *server = service->databases; +int i; dcb_printf(dcb, "Service %p\n", service); dcb_printf(dcb, "\tService: %s\n", service->name); @@ -714,6 +786,16 @@ SERVER *server = service->databases; service->router->diagnostics(service->router_instance, dcb); dcb_printf(dcb, "\tStarted: %s", asctime(localtime(&service->stats.started))); + if (service->n_filters) + { + dcb_printf(dcb, "\tFilter chain: "); + for (i = 0; i < service->n_filters; i++) + { + dcb_printf(dcb, "%s %s ", service->filters[i]->name, + i + 1 < service->n_filters ? "|" : ""); + } + dcb_printf(dcb, "\n"); + } dcb_printf(dcb, "\tBackend databases\n"); while (server) { diff --git a/server/core/session.c b/server/core/session.c index 8f59b85d3..0d3cb6105 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -25,6 +25,7 @@ * Date Who Description * 17/06/13 Mark Riddoch Initial implementation * 02/09/13 Massimiliano Pinto Added session refcounter + * 29/05/14 Mark Riddoch Addition of filter mechanism * * @endverbatim */ @@ -47,6 +48,9 @@ extern int lm_enabled_logfiles_bitmask; static SPINLOCK session_spin = SPINLOCK_INIT; static SESSION *allSessions = NULL; + +static int session_setup_filters(SESSION *session); + /** * Allocate a new session for a new client of the specified service. * @@ -132,7 +136,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) /** * Inform other threads that session is closing. */ - session->state == SESSION_STATE_STOPPING; + session->state = SESSION_STATE_STOPPING; /*< * Decrease refcount, set dcb's session pointer NULL * and set session pointer to NULL. @@ -147,7 +151,45 @@ session_alloc(SERVICE *service, DCB *client_dcb) goto return_session; } + + /* + * Pending filter chain being setup set the head of the chain to + * be the router. As filters are inserted the current head will + * be pushed to the filter and the head updated. + * + * NB This dictates that filters are created starting at the end + * of the chain nearest the router working back to the client + * protocol end of the chain. + */ + session->head.instance = service->router_instance; + session->head.session = session->router_session; + + session->head.routeQuery = service->router->routeQuery; + + if (service->n_filters > 0) + { + if (!session_setup_filters(session)) + { + /** + * Inform other threads that session is closing. + */ + session->state = SESSION_STATE_STOPPING; + /*< + * Decrease refcount, set dcb's session pointer NULL + * and set session pointer to NULL. + */ + session_free(session); + client_dcb->session = NULL; + session = NULL; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to create %s session.", + service->name))); + goto return_session; + } + } } + spinlock_acquire(&session_spin); session->state = SESSION_STATE_ROUTER_READY; session->next = allSessions; @@ -440,6 +482,8 @@ SESSION *ptr; void dprintSession(DCB *dcb, SESSION *ptr) { +DOWNSTREAM *dptr; + dcb_printf(dcb, "Session %p\n", ptr); dcb_printf(dcb, "\tState: %s\n", session_state(ptr->state)); dcb_printf(dcb, "\tService: %s (%p)\n", ptr->service->name, ptr->service); @@ -520,3 +564,41 @@ SESSION* get_session_by_router_ses( } return ses; } + + +/** + * Create the filter chain for this session. + * + * Filters must be setup in reverse order, starting with the last + * filter in the chain and working back towards the client connection + * Each filter is passed the current session head of the filter chain + * this head becomes the destination for the filter. The newly created + * filter becomes the new head of the filter chain. + * + * @param session The session that requires the chain + * @return 0 if filter creation fails + */ +static int +session_setup_filters(SESSION *session) +{ +SERVICE *service = session->service; +DOWNSTREAM *head; +int i; + + for (i = service->n_filters - 1; i >= 0; i--) + { + if ((head = filterApply(service->filters[i], session, + &session->head)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Failed to create filter '%s' for service '%s'.\n", + service->filters[i]->name, + service->name))); + return 0; + } + session->head = *head; + } + + return 1; +} diff --git a/server/include/filter.h b/server/include/filter.h new file mode 100644 index 000000000..3b4ec27b8 --- /dev/null +++ b/server/include/filter.h @@ -0,0 +1,96 @@ +#ifndef _FILTER_H +#define _FILTER_H +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file filter.h - The filter interface mechanisms + * + * Revision History + * + * Date Who Description + * 27/05/2014 Mark Riddoch Initial implementation + * + */ +#include +#include +#include +#include + +/** + * The FILTER handle points to module specific data, so the best we can do + * is to make it a void * externally. + */ +typedef void *FILTER; +/** + * @verbatim + * The "module object" structure for a query router module + * + * The entry points are: + * createInstance Called by the service to create a new + * instance of the filter + * newSession Called to create a new user session + * within the filter + * closeSession Called when a session is closed + * freeSession Called when a session is freed + * setDownstream Sets the downstream component of the + * filter pipline + * routeQuery Called on each query that requires + * routing + * diagnostics Called to force the filter to print + * diagnostic output + * + * @endverbatim + * + * @see load_module + */ +typedef struct filter_object { + FILTER *(*createInstance)(char **options); + void *(*newSession)(FILTER *instance, SESSION *session); + void (*closeSession)(FILTER *instance, void *fsession); + void (*freeSession)(FILTER *instance, void *fsession); + void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream); + int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue); + void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb); +} FILTER_OBJECT; + +/** + * The definition of a filter form the configuration file. + * This is basically the link between a plugin to load and the + * optons to pass to that plugin. + */ +typedef struct filter_def { + char *name; /*< The Filter name */ + char *module; /*< The module to load */ + char **options; /*< The options set for this filter */ + FILTER filter; + FILTER_OBJECT *obj; + SPINLOCK spin; + struct filter_def + *next; /*< Next filter in the chain of all filters */ +} FILTER_DEF; + +FILTER_DEF *filter_alloc(char *, char *); +void filter_free(FILTER_DEF *); +FILTER_DEF *filter_find(char *); +void filterAddOption(FILTER_DEF *, char *); +DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *); +void dprintAllFilters(DCB *); +void dprintFilter(DCB *, FILTER_DEF *); +void dListFilters(DCB *); +#endif diff --git a/server/include/modules.h b/server/include/modules.h index c90cf45a1..b6e7a00eb 100644 --- a/server/include/modules.h +++ b/server/include/modules.h @@ -30,6 +30,7 @@ * Date Who Description * 13/06/13 Mark Riddoch Initial implementation * 08/07/13 Mark Riddoch Addition of monitor modules + * 29/05/14 Mark Riddoch Addition of filter modules * @endverbatim */ @@ -49,6 +50,7 @@ typedef struct modules { #define MODULE_PROTOCOL "Protocol" /**< A protocol module type */ #define MODULE_ROUTER "Router" /**< A router module type */ #define MODULE_MONITOR "Monitor" /**< A database monitor module type */ +#define MODULE_FILTER "Filter" /**< A filter module type */ extern void *load_module(const char *module, const char *type); diff --git a/server/include/service.h b/server/include/service.h index 28ae8d3f3..40023332b 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "config.h" /** @@ -39,7 +40,9 @@ * 23/06/13 Mark Riddoch Added service user and users * 06/02/14 Massimiliano Pinto Added service flag for root user access * 25/02/14 Massimiliano Pinto Added service refresh limit feature - * 07/05/14 Massimiliano Pinto Added version_string field to service struct + * 07/05/14 Massimiliano Pinto Added version_string field to service + * struct + * 29/05/14 Mark Riddoch Filter API mechanism * * @endverbatim */ @@ -73,9 +76,10 @@ typedef struct { } SERVICE_STATS; /** - * The service user structure holds the information that is needed for this service to - * allow the gateway to login to the backend database and extact information such as - * the user table or other database status or configuration data. + * The service user structure holds the information that is needed + for this service to allow the gateway to login to the backend + database and extact information such as the user table or other + database status or configuration data. */ typedef struct { char *name; /**< The user name to use to extract information */ @@ -83,8 +87,8 @@ typedef struct { } SERVICE_USER; /** - * The service refresh rate hols the counter and last load timet for this service to - * load users data from the backend database + * The service refresh rate holds the counter and last load time_t + for this service to load users data from the backend database */ typedef struct { int nloads; @@ -99,30 +103,33 @@ typedef struct { * to the service. */ typedef struct service { - char *name; /**< The service name */ - int state; /**< The service state */ - SERV_PROTOCOL *ports; /**< Linked list of ports and protocols - * that this service will listen on. - */ - char *routerModule; /**< Name of router module to use */ - char **routerOptions; /**< Router specific option strings */ + char *name; /**< The service name */ + int state; /**< The service state */ + SERV_PROTOCOL *ports; /**< Linked list of ports and protocols + * that this service will listen on. + */ + char *routerModule; /**< Name of router module to use */ + char **routerOptions;/**< Router specific option strings */ struct router_object - *router; /**< The router we are using */ + *router; /**< The router we are using */ void *router_instance; - /**< The router instance for this service */ - char *version_string; /** version string for this service listeners */ - struct server *databases; /**< The set of servers in the backend */ - SERVICE_USER credentials; /**< The cedentials of the service user */ - SPINLOCK spin; /**< The service spinlock */ - SERVICE_STATS stats; /**< The service statistics */ - struct users *users; /**< The user data for this service */ - int enable_root; /**< Allow root user access */ - CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */ - int svc_config_version; /*< Version number of configuration */ + /**< The router instance for this service */ + char *version_string;/** version string for this service listeners */ + struct server *databases; /**< The set of servers in the backend */ + SERVICE_USER credentials; /**< The cedentials of the service user */ + SPINLOCK spin; /**< The service spinlock */ + SERVICE_STATS stats; /**< The service statistics */ + struct users *users; /**< The user data for this service */ + int enable_root; /**< Allow root user access */ + CONFIG_PARAMETER* + svc_config_param; /*< list of config params and values */ + int svc_config_version; /*< Version number of configuration */ SPINLOCK users_table_spin; /**< The spinlock for users data refresh */ SERVICE_REFRESH_RATE rate_limit; /**< The refresh rate limit for users table */ + FILTER_DEF **filters; /**< Ordered list of filters */ + int n_filters; /**< Number of filters */ struct service *next; /**< The next service in the linked list */ } SERVICE; diff --git a/server/include/session.h b/server/include/session.h index 3f6955ae3..6918c5343 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -31,11 +31,14 @@ * 01-07-2013 Massimiliano Pinto Removed backends pointer * from struct session * 02-09-2013 Massimiliano Pinto Added session ref counter + * 29-05-2014 Mark Riddoch Support for filter mechanism + * added * * @endverbatim */ #include #include +#include #include #include @@ -59,6 +62,17 @@ typedef enum { SESSION_STATE_FREE /*< for all sessions */ } session_state_t; +/** + * The downstream element in the filter chain. This may refer to + * another filter or to a router. + */ +typedef struct { + void *instance; + void *session; + int (*routeQuery)(void *instance, void *router_session, GWBUF *queue); +} DOWNSTREAM; + + /** * The session status block * @@ -77,6 +91,7 @@ typedef struct session { void *router_session;/**< The router instance data */ SESSION_STATS stats; /**< Session statistics */ struct service *service; /**< The service this session is using */ + DOWNSTREAM head; /**< Head of the filter chain */ struct session *next; /**< Linked list of all sessions */ int refcount; /**< Reference count on the session */ #if defined(SS_DEBUG) @@ -86,6 +101,15 @@ typedef struct session { #define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type) +/** + * A convenience macro that can be used by the protocol modules to route + * the incoming data to the first element in the pipeline of filters and + * routers. + */ +#define SESSION_ROUTE_QUERY(session, buf) \ + ((session)->head.routeQuery)((session)->head.instance, \ + (session)->head.session, (buf)) + SESSION *session_alloc(struct service *, struct dcb *); bool session_free(SESSION *); int session_isvalid(SESSION *); diff --git a/server/modules/filter/Makefile b/server/modules/filter/Makefile new file mode 100644 index 000000000..273ca44d3 --- /dev/null +++ b/server/modules/filter/Makefile @@ -0,0 +1,83 @@ +# This file is distributed as part of MaxScale form SkySQL. It is free +# software: you can redistribute it and/or modify it under the terms of the +# GNU General Public License as published by the Free Software Foundation, +# version 2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 51 +# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright SkySQL Ab 2014 +# +# Revision History +# Date Who Description +# 29/05/14 Mark Riddoch Initial module development + +include ../../../build_gateway.inc + +LOGPATH := $(ROOT_PATH)/log_manager +UTILSPATH := $(ROOT_PATH)/utils + +CC=cc +CFLAGS=-c -fPIC -I/usr/include -I../include -I../../include -I$(LOGPATH) \ + -I$(UTILSPATH) -Wall -g + +include ../../../makefile.inc + +LDFLAGS=-shared -L$(LOGPATH) -Wl,-rpath,$(DEST)/lib \ + -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) + +TESTSRCS=testfilter.c +TESTOBJ=$(TESTSRCS:.c=.o) +QLASRCS=qlafilter.c +QLAOBJ=$(QLASRCS:.c=.o) +SRCS=$(TESTSRCS) +OBJ=$(SRCS:.c=.o) +LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager +MODULES= libtestfilter.so libqlafilter.so + + +all: $(MODULES) + +libtestfilter.so: $(TESTOBJ) + $(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@ + +libqlafilter.so: $(QLAOBJ) + $(CC) $(LDFLAGS) $(QLAOBJ) $(LIBS) -o $@ + +.c.o: + $(CC) $(CFLAGS) $< -o $@ + +clean: + rm -f $(OBJ) $(MODULES) + +tags: + ctags $(SRCS) $(HDRS) + +depend: + @rm -f depend.mk + cc -M $(CFLAGS) $(SRCS) > depend.mk + +install: $(MODULES) + install -D $(MODULES) $(DEST)/MaxScale/modules + +cleantests: + $(MAKE) -C test cleantests + +buildtests: + $(MAKE) -C readwritesplit/test DEBUG=Y buildtests + $(MAKE) -C test DEBUG=Y buildtests + +runtests: + $(MAKE) -C test runtests + $(MAKE) -C readwritesplit runtests + +testall: + $(MAKE) -C test testall + +include depend.mk diff --git a/server/modules/filter/qlafilter.c b/server/modules/filter/qlafilter.c new file mode 100644 index 000000000..c47b8aeb2 --- /dev/null +++ b/server/modules/filter/qlafilter.c @@ -0,0 +1,211 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ +#include +#include +#include + +static char *version_str = "V1.0.0"; + +static FILTER *createInstance(char **options); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + routeQuery, + diagnostic, +}; + +/** + * A dummy instance structure + */ +typedef struct { + int sessions; + char *filebase; +} QLA_INSTANCE; + +/** + * A dummy session structure for this test filter + */ +typedef struct { + DOWNSTREAM down; + int mysession; + int fd; +} QLA_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options) +{ +QLA_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(QLA_INSTANCE))) != NULL) + { + if (options) + my_instance->filebase = strdup(options[0]); + else + my_instance->filebase = strdup("qla"); + my_instance->sessions = 0; + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the router. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; +QLA_SESSION *my_session; +char filename[100]; + + if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL) + { + sprintf(filename, "%s.%d", my_instance->filebase, + my_instance->sessions); +printf("Open file %s\n", filename); + my_instance->sessions++; + my_session->fd = open(filename, O_WRONLY|O_CREAT, 0666); + } + + return my_session; +} + +/** + * Close a session with the router, this is the mechanism + * by which a router may cleanup data structure etc. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +QLA_SESSION *my_session; + + close(my_session->fd); +} + +static void +freeSession(FILTER *instance, void *session) +{ + return; +} + +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +QLA_SESSION *my_session = (QLA_SESSION *)session; + + my_session->down = *downstream; +} + +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +QLA_SESSION *my_session = (QLA_SESSION *)session; +unsigned char *ptr; +unsigned int length; + + ptr = GWBUF_DATA(queue); + length = *ptr++; + length += (*ptr++ << 8); + length += (*ptr++ << 8); + ptr++; // Skip sequence id + if (*ptr++ == 0x03) // COM_QUERY + { + write(my_session->fd, ptr, length - 1); + write(my_session->fd, "\n", 1); + } + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +QLA_INSTANCE *my_instance = instance; +QLA_SESSION *my_session = fsession; + +} diff --git a/server/modules/filter/testfilter.c b/server/modules/filter/testfilter.c new file mode 100644 index 000000000..9e477cb5b --- /dev/null +++ b/server/modules/filter/testfilter.c @@ -0,0 +1,190 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ +#include +#include + +static char *version_str = "V1.0.0"; + +static FILTER *createInstance(char **options); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + routeQuery, + diagnostic, +}; + +/** + * A dummy instance structure + */ +typedef struct { + int sessions; +} TEST_INSTANCE; + +/** + * A dummy session structure for this test filter + */ +typedef struct { + DOWNSTREAM down; + int count; +} TEST_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options) +{ +TEST_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(TEST_INSTANCE))) != NULL) + my_instance->sessions = 0; + return my_instance; +} + +/** + * Associate a new session with this instance of the router. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance; +TEST_SESSION *my_session; + + if ((my_session = calloc(1, sizeof(TEST_SESSION))) != NULL) + { + my_instance->sessions++; + my_session->count = 0; + } + + return my_session; +} + +/** + * Close a session with the router, this is the mechanism + * by which a router may cleanup data structure etc. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +} + +static void +freeSession(FILTER *instance, void *session) +{ + return; +} + +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +TEST_SESSION *my_session = (TEST_SESSION *)session; + + my_session->down = *downstream; +} + +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +TEST_SESSION *my_session = (TEST_SESSION *)session; + + my_session->count++; + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +TEST_INSTANCE *my_instance = instance; +TEST_SESSION *my_session = fsession; + + if (my_session) + dcb_printf(dcb, "No. of queries routed by filter: %d\n", + my_session->count); + else + dcb_printf(dcb, "No. of sessions created: %d\n", + my_instance->sessions); +} diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index bcd94e423..941fd867d 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -57,11 +57,7 @@ static int gw_client_hangup_event(DCB *dcb); int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); int MySQLSendHandshake(DCB* dcb); static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue); -static int route_by_statement( - ROUTER* router_instance, - ROUTER_OBJECT* router, - void* rsession, - GWBUF* read_buf); +static int route_by_statement(SESSION *, GWBUF *); /* * The "module object" for the mysqld client protocol module. @@ -765,7 +761,7 @@ int gw_read_client_event(DCB* dcb) { /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { - router->routeQuery(router_instance, rsession, read_buffer); + SESSION_ROUTE_QUERY(session, read_buffer); LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [gw_read_client_event] Routed COM_QUIT to " @@ -783,10 +779,7 @@ int gw_read_client_event(DCB* dcb) { * Feed each statement completely and separately * to router. */ - rc = route_by_statement(router_instance, - router, - rsession, - read_buffer); + rc = route_by_statement(session, read_buffer); if (read_buffer != NULL) { /** add incomplete mysql packet to read queue */ @@ -796,9 +789,7 @@ int gw_read_client_event(DCB* dcb) { else { /** Feed whole packet to router */ - rc = router->routeQuery(router_instance, - rsession, - read_buffer); + rc = SESSION_ROUTE_QUERY(session, read_buffer); } /** succeed */ @@ -1339,11 +1330,7 @@ gw_client_hangup_event(DCB *dcb) * Return 1 in success. If the last packet is incomplete return success but * leave incomplete packet to readbuf. */ -static int route_by_statement( - ROUTER* router_instance, - ROUTER_OBJECT* router, - void* rsession, - GWBUF* readbuf) +static int route_by_statement(SESSION *session, GWBUF *readbuf) { int rc = -1; GWBUF* packetbuf; @@ -1355,7 +1342,7 @@ static int route_by_statement( if (packetbuf != NULL) { CHK_GWBUF(packetbuf); - rc = router->routeQuery(router_instance, rsession, packetbuf); + rc = SESSION_ROUTE_QUERY(session, packetbuf); } else { diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index f7d6c1815..b6be3e049 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -140,9 +140,6 @@ telnetd_read_event(DCB* dcb) int n; GWBUF *head = NULL; SESSION *session = dcb->session; -ROUTER_OBJECT *router = session->service->router; -ROUTER *router_instance = session->service->router_instance; -void *rsession = session->router_session; TELNETD *telnetd = (TELNETD *)dcb->protocol; char *password, *t; @@ -196,7 +193,7 @@ char *password, *t; free(password); break; case TELNETD_STATE_DATA: - router->routeQuery(router_instance, rsession, head); + SESSION_ROUTE_QUERY(session, head); break; } } diff --git a/server/modules/routing/debugcmd.c b/server/modules/routing/debugcmd.c index e7720cb30..3973efa3d 100644 --- a/server/modules/routing/debugcmd.c +++ b/server/modules/routing/debugcmd.c @@ -40,6 +40,7 @@ * 20/05/14 Mark Riddoch Added ability to give server and service names rather * than simply addresses * 23/05/14 Mark Riddoch Added support for developer and user modes + * 29/05/14 Mark Riddoch Add Filter support * * @endverbatim */ @@ -50,6 +51,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +79,7 @@ #define ARG_TYPE_SESSION 6 #define ARG_TYPE_DCB 7 #define ARG_TYPE_MONITOR 8 +#define ARG_TYPE_FILTER 9 /** * The subcommand structure @@ -113,6 +116,14 @@ struct subcommand showoptions[] = { "Show the poll statistics", "Show the poll statistics", {0, 0, 0} }, + { "filter", 0, dprintFilter, + "Show details of a filter, called with a filter name", + "Show details of a filter, called with the address of a filter", + {ARG_TYPE_FILTER, 0, 0} }, + { "filters", 0, dprintAllFilters, + "Show all filters", + "Show all filters", + {0, 0, 0} }, { "modules", 0, dprintAllModules, "Show all currently loaded modules", "Show all currently loaded modules", @@ -157,6 +168,10 @@ struct subcommand showoptions[] = { * The subcommands of the list command */ struct subcommand listoptions[] = { + { "filters", 0, dListFilters, + "List all the filters defined within MaxScale", + "List all the filters defined within MaxScale", + {0, 0, 0} }, { "listeners", 0, dListListeners, "List all the listeners defined within MaxScale", "List all the listeners defined within MaxScale", @@ -493,6 +508,10 @@ SERVICE *service; if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) rval = (unsigned long)monitor_find(arg); return rval; + case ARG_TYPE_FILTER: + if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) + rval = (unsigned long)filter_find(arg); + return rval; } return 0; }