diff --git a/Documentation/filters/Regex Filter.pdf b/Documentation/filters/Regex Filter.pdf index fb73468a4..f45fef043 100644 Binary files a/Documentation/filters/Regex Filter.pdf and b/Documentation/filters/Regex Filter.pdf differ diff --git a/Documentation/filters/Tee Filter.pdf b/Documentation/filters/Tee Filter.pdf new file mode 100644 index 000000000..f8de502ca Binary files /dev/null and b/Documentation/filters/Tee Filter.pdf differ diff --git a/Documentation/filters/Top Filter.pdf b/Documentation/filters/Top Filter.pdf index 03ff3d561..a7cb2061d 100644 Binary files a/Documentation/filters/Top Filter.pdf and b/Documentation/filters/Top Filter.pdf differ diff --git a/server/core/dcb.c b/server/core/dcb.c index 52585e9d7..f9f9eafbe 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -48,6 +48,7 @@ * This fixes a bug with many reads from * backend * 07/05/2014 Mark Riddoch Addition of callback mechanism + * 20/06/2014 Mark Riddoch Addition of dcb_clone * * @endverbatim */ @@ -84,6 +85,9 @@ static bool dcb_set_state_nomutex( dcb_state_t* old_state); static void dcb_call_callback(DCB *dcb, DCB_REASON reason); static DCB* dcb_get_next (DCB* dcb); +static int dcb_null_write(DCB *dcb, GWBUF *buf); +static int dcb_null_close(DCB *dcb); +static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf); DCB* dcb_get_zombies(void) { @@ -136,6 +140,7 @@ DCB *rval; rval->remote = NULL; rval->user = NULL; + rval->flags = 0; spinlock_acquire(&dcbspin); if (allDCBs == NULL) @@ -248,7 +253,39 @@ dcb_add_to_zombieslist(DCB *dcb) spinlock_release(&zombiespin); } +/* + * Clone a DCB for internal use, mostly used for specialist filters + * to create dummy clients based on real clients. + * + * @param orig The DCB to clone + * @return A DCB that can be used as a client + */ +DCB * +dcb_clone(DCB *orig) +{ +DCB *clone; + if ((clone = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) + { + return NULL; + } + + clone->fd = -1; + clone->flags |= DCBF_CLONE; + clone->state = orig->state; + clone->data = orig->data; + if (orig->remote) + clone->remote = strdup(orig->remote); + if (orig->user) + clone->user = strdup(orig->user); + clone->protocol = orig->protocol; + + clone->func.write = dcb_null_write; + clone->func.close = dcb_null_close; + clone->func.auth = dcb_null_auth; + + return clone; +} /** * Free a DCB and remove it from the chain of all DCBs @@ -312,7 +349,7 @@ DCB_CALLBACK *cb; if (dcb->protocol != NULL) free(dcb->protocol); - if (dcb->data) + if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0)) free(dcb->data); if (dcb->remote) free(dcb->remote); @@ -1190,6 +1227,8 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + if (dcb->flags & DCBF_CLONE) + dcb_printf(pdcb, "\t\tDCB is a clone.\n"); dcb = dcb->next; } spinlock_release(&dcbspin); @@ -1254,6 +1293,8 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + if (dcb->flags & DCBF_CLONE) + dcb_printf(pdcb, "\t\tDCB is a clone.\n"); } /** @@ -1284,7 +1325,7 @@ gw_dcb_state2string (int state) { } /** - * A DCB based wrapper for printf. Allows formattign printing to + * A DCB based wrapper for printf. Allows formatting printing to * a descritor control block. * * @param dcb Descriptor to write to @@ -1821,4 +1862,47 @@ void dcb_call_foreach ( } return; } - + + +/** + * Null protocol write routine used for cloned dcb's. It merely consumes + * buffers written on the cloned DCB. + * + * @params dcb The descriptor control block + * @params buf The buffer beign written + * @return Always returns a good write operation result + */ +static int +dcb_null_write(DCB *dcb, GWBUF *buf) +{ + while (buf) + { + buf = gwbuf_consume(buf, GWBUF_LENGTH(buf)); + } + return 1; +} + +/** + * Null protocol close operation for use by cloned DCB's. + * + * @param dcb The DCB being closed. + */ +static int +dcb_null_close(DCB *dcb) +{ + return 0; +} + +/** + * Null protocol auth operation for use by cloned DCB's. + * + * @param dcb The DCB being closed. + * @param server The server to auth against + * @param session The user session + * @param buf The buffer with the new auth request + */ +static int +dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf) +{ + return 0; +} diff --git a/server/include/dcb.h b/server/include/dcb.h index aa9b10e51..3a2cace5c 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -207,6 +207,7 @@ typedef struct dcb { #endif int fd; /**< The descriptor */ dcb_state_t state; /**< Current descriptor state */ + int flags; /**< DCB flags */ char *remote; /**< Address of remote end */ char *user; /**< User name for connection */ struct sockaddr_in ipv4; /**< remote end IPv4 address */ @@ -271,6 +272,7 @@ int dcb_write(DCB *, GWBUF *); DCB *dcb_alloc(dcb_role_t); void dcb_free(DCB *); DCB *dcb_connect(struct server *, struct session *, const char *); +DCB *dcb_clone(DCB *); int dcb_read(DCB *, GWBUF **); int dcb_drain_writeq(DCB *); void dcb_close(DCB *); @@ -295,4 +297,8 @@ bool dcb_set_state( DCB* dcb, dcb_state_t new_state, dcb_state_t* old_state); + + +/* DCB flags values */ +#define DCBF_CLONE 0x0001 /* DCB is a clone */ #endif /* _DCB_H */ diff --git a/server/modules/filter/Makefile b/server/modules/filter/Makefile index f7f6bb29a..b51a9e671 100644 --- a/server/modules/filter/Makefile +++ b/server/modules/filter/Makefile @@ -40,10 +40,12 @@ REGEXSRCS=regexfilter.c REGEXOBJ=$(REGEXSRCS:.c=.o) TOPNSRCS=topfilter.c TOPNOBJ=$(TOPNSRCS:.c=.o) -SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) +TEESRCS=tee.c +TEEOBJ=$(TEESRCS:.c=.o) +SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS) OBJ=$(SRCS:.c=.o) LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager -MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so +MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libtee.so all: $(MODULES) @@ -60,6 +62,9 @@ libregexfilter.so: $(REGEXOBJ) libtopfilter.so: $(TOPNOBJ) $(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@ +libtee.so: $(TEEOBJ) + $(CC) $(LDFLAGS) $(TEEOBJ) $(LIBS) -o $@ + .c.o: $(CC) $(CFLAGS) $< -o $@ diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c new file mode 100644 index 000000000..908117acf --- /dev/null +++ b/server/modules/filter/tee.c @@ -0,0 +1,423 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file tee.c A filter that splits the processing pipeline in two + * + * Conditionally duplicate requests and send the duplicates to another service + * within MaxScale. + * + * Parameters + * ========== + * + * service The service to send the duplicates to + * source The source address to match in order to duplicate (optional) + * match A regular expression to match in order to perform duplication + * of the request (optional) + * nomatch A regular expression to match in order to prevent duplication + * of the request (optional) + * user A user name to match against. If present only requests that + * originate from this user will be duplciated (optional) + * + * Revision History + * ================ + * + * Date Who Description + * 20/06/2014 Mark Riddoch Initial implementation + * + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int lm_enabled_logfiles_bitmask; + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A tee piece in the filter plumbing" +}; + +static char *version_str = "V1.0.0"; + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + NULL, // No Upstream requirement + routeQuery, + NULL, // No client reply + diagnostic, +}; + +/** + * The instance structure for the TEE filter - this holds the configuration + * information for the filter. + */ +typedef struct { + SERVICE *service; /* The service to duplicate requests to */ + char *source; /* The source of the client connection */ + char *userName; /* The user name to filter on */ + char *match; /* Optional text to match against */ + regex_t re; /* Compiled regex text */ + char *nomatch; /* Optional text to match against for exclusion */ + regex_t nore; /* Compiled regex nomatch text */ +} TEE_INSTANCE; + +/** + * The session structure for this TEE filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. + */ +typedef struct { + DOWNSTREAM down; + int active; + DCB *branch_dcb; + SESSION *branch_session; + int n_duped; +} TEE_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +TEE_INSTANCE *my_instance; +int i; + + if ((my_instance = calloc(1, sizeof(TEE_INSTANCE))) != NULL) + { + if (options) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "tee: The tee filter has been passed an option, " + "this filter does not support any options.\n"))); + } + my_instance->service = NULL; + my_instance->source = NULL; + my_instance->userName = NULL; + my_instance->match = NULL; + my_instance->nomatch = NULL; + if (params) + { + for (i = 0; params[i]; i++) + { + if (!strcmp(params[i]->name, "service")) + { + if ((my_instance->service = service_find(params[i]->value)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "tee: service '%s' " + "not found.\n", + params[i]->value))); + } + } + else if (!strcmp(params[i]->name, "match")) + { + my_instance->match = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "exclude")) + { + my_instance->nomatch = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->userName = strdup(params[i]->value); + else if (!filter_standard_parameter(params[i]->name)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "tee: Unexpected parameter '%s'.\n", + params[i]->name))); + } + } + } + if (my_instance->service == NULL) + { + free(my_instance->match); + free(my_instance->source); + free(my_instance); + return NULL; + } + if (my_instance->match && + regcomp(&my_instance->re, my_instance->match, REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "tee: Invalid regular expression '%s'" + " for the match parameter.\n", + my_instance->match))); + free(my_instance->match); + free(my_instance->source); + free(my_instance); + return NULL; + } + if (my_instance->nomatch && + regcomp(&my_instance->nore, my_instance->nomatch, + REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "tee: Invalid regular expression '%s'" + " for the nomatch paramter.\n", + my_instance->match))); + if (my_instance->match) + regfree(&my_instance->re); + free(my_instance->match); + free(my_instance->source); + free(my_instance); + return NULL; + } + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * Create the file to log to and open it. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; +TEE_SESSION *my_session; +char *remote, *userName; + + if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL) + { + my_session->active = 1; + if (my_instance->source + && (remote = session_get_remote(session)) != NULL) + { + if (strcmp(remote, my_instance->source)) + my_session->active = 0; + } + userName = session_getUser(session); + if (my_instance->userName && userName && strcmp(userName, + my_instance->userName)) + my_session->active = 0; + if (my_session->active) + { + my_session->branch_dcb = dcb_clone(session->client); + my_session->branch_session = session_alloc(my_instance->service, my_session->branch_dcb); + } + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * In the case of the tee filter we need to close down the + * "branch" session. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +TEE_SESSION *my_session = (TEE_SESSION *)session; +ROUTER_OBJECT *router; +void *router_instance, *rsession; +SESSION *bsession; + + if (my_session->active) + { + bsession = my_session->branch_session; + router = bsession->service->router; + router_instance = bsession->service->router_instance; + rsession = bsession->router_session; + /** Close router session and all its connections */ + router->closeSession(router_instance, rsession); + dcb_free(my_session->branch_dcb); + /* No need to free the session, this is done as + * a side effect of closign the client DCB of the + * session. + */ + } +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ +TEE_SESSION *my_session = (TEE_SESSION *)session; + + free(session); + return; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +TEE_SESSION *my_session = (TEE_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query should normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; +TEE_SESSION *my_session = (TEE_SESSION *)session; +char *ptr; +int length, rval; +GWBUF *clone = NULL; + + if (my_session->active && modutil_extract_SQL(queue, &ptr, &length)) + { + if ((my_instance->match == NULL || + regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && + (my_instance->nomatch == NULL || + regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) + { + clone = gwbuf_clone(queue); + } + } + + /* Pass the query downstream */ + rval = my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); + if (clone) + { + my_session->n_duped++; + SESSION_ROUTE_QUERY(my_session->branch_session, clone); + } + return rval; +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +TEE_SESSION *my_session = (TEE_SESSION *)fsession; + + if (my_session) + { + dcb_printf(dcb, "\t\tNo. of statements duplicated: %d.\n", + my_session->n_duped); + } +}