From 77e1426dbfa4c45170ce21cdeb33b4cd8b74543f Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Mon, 9 Jun 2014 21:13:28 +0100 Subject: [PATCH] Initial upstream fitlering implementation. Test filter with up and down stream filtering. Keeps top N queries and prints a report of these queries on session close. --- server/core/filter.c | 26 ++ server/core/session.c | 35 +- server/include/filter.h | 5 +- server/include/session.h | 25 +- server/modules/filter/Makefile | 9 +- server/modules/filter/qlafilter.c | 2 + server/modules/filter/regexfilter.c | 2 + server/modules/filter/testfilter.c | 2 + server/modules/filter/topfilter.c | 421 ++++++++++++++++++ server/modules/protocol/mysql_backend.c | 2 +- server/modules/routing/readconnroute.c | 2 +- .../routing/readwritesplit/readwritesplit.c | 2 +- 12 files changed, 519 insertions(+), 14 deletions(-) create mode 100644 server/modules/filter/topfilter.c diff --git a/server/core/filter.c b/server/core/filter.c index a3db72e3b..beab5abac 100644 --- a/server/core/filter.c +++ b/server/core/filter.c @@ -314,3 +314,29 @@ DOWNSTREAM *me; return me; } + +UPSTREAM * +filterUpstream(FILTER_DEF *filter, void *fsession, UPSTREAM *upstream) +{ +UPSTREAM *me; + + /* + * The the filter has no setUpstream entry point then is does + * not require to see results and can be left out of the chain. + */ + if (filter->obj->setUpstream == NULL) + return upstream; + + if (filter->obj->clientReply != NULL) + { + if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL) + { + return NULL; + } + me->instance = filter->filter; + me->session = fsession; + me->clientReply = filter->obj->clientReply; + filter->obj->setUpstream(me->instance, me->session, upstream); + } + return me; +} diff --git a/server/core/session.c b/server/core/session.c index cb392258c..2d6fe9396 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -164,9 +164,12 @@ session_alloc(SERVICE *service, DCB *client_dcb) */ session->head.instance = service->router_instance; session->head.session = session->router_session; - session->head.routeQuery = service->router->routeQuery; + session->tail.instance = session; + session->tail.session = session; + session->tail.clientReply = session_reply; + if (service->n_filters > 0) { if (!session_setup_filters(session)) @@ -315,6 +318,12 @@ bool session_free( } if (session->n_filters) { + for (i = 0; i < session->n_filters; i++) + { + session->filters[i].filter->obj->closeSession( + session->filters[i].instance, + session->filters[i].session); + } for (i = 0; i < session->n_filters; i++) { session->filters[i].filter->obj->freeSession( @@ -610,6 +619,7 @@ session_setup_filters(SESSION *session) { SERVICE *service = session->service; DOWNSTREAM *head; +UPSTREAM *tail; int i; if ((session->filters = calloc(service->n_filters, @@ -640,5 +650,28 @@ int i; session->head = *head; } + for (i = 0; i < service->n_filters; i++) + { + if ((tail = filterUpstream(service->filters[i], + session->filters[i].session, + &session->tail)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Failed to create filter '%s' for service '%s'.\n", + service->filters[i]->name, + service->name))); + return 0; + } + session->tail = *tail; + } + return 1; } + +int +session_reply(void *instance, void *session, GWBUF *data) +{ +SESSION *the_session = (SESSION *)session; + return the_session->client->func.write(the_session->client, data); +} diff --git a/server/include/filter.h b/server/include/filter.h index 8ddcfb00d..0da887d3a 100644 --- a/server/include/filter.h +++ b/server/include/filter.h @@ -61,6 +61,7 @@ typedef struct { * filter pipline * routeQuery Called on each query that requires * routing + * clientReply * diagnostics Called to force the filter to print * diagnostic output * @@ -74,7 +75,9 @@ typedef struct filter_object { void (*closeSession)(FILTER *instance, void *fsession); void (*freeSession)(FILTER *instance, void *fsession); void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream); + void (*setUpstream)(FILTER *instance, void *fsession, UPSTREAM *downstream); int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue); + int (*clientReply)(FILTER *instance, void *fsession, GWBUF *queue); void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb); } FILTER_OBJECT; @@ -83,7 +86,7 @@ typedef struct filter_object { * is changed these values must be updated in line with the rules in the * file modinfo.h. */ -#define FILTER_VERSION {1, 0, 0} +#define FILTER_VERSION {1, 1, 0} /** * The definition of a filter form the configuration file. * This is basically the link between a plugin to load and the diff --git a/server/include/session.h b/server/include/session.h index f99982802..1c827b095 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -70,8 +70,8 @@ typedef enum { typedef struct { void *instance; void *session; - int (*routeQuery)(void *instance, - void *router_session, GWBUF *queue); + int (*routeQuery)(void *instance, void *session, + GWBUF *request); } DOWNSTREAM; /** @@ -81,8 +81,9 @@ typedef struct { typedef struct { void *instance; void *session; - int (*write)(void *, void *, GWBUF *); - int (*error)(void *); + int (*clientReply)(void *instance, + void *session, GWBUF *response); + int (*error)(void *instance, void *session, void *); } UPSTREAM; /** @@ -117,6 +118,7 @@ typedef struct session { int n_filters; /**< Number of filter sessions */ SESSION_FILTER *filters; /**< The filters in use within this session */ DOWNSTREAM head; /**< Head of the filter chain */ + UPSTREAM tail; /**< The tail of the filter chain */ struct session *next; /**< Linked list of all sessions */ int refcount; /**< Reference count on the session */ #if defined(SS_DEBUG) @@ -131,13 +133,22 @@ typedef struct session { * the incoming data to the first element in the pipeline of filters and * routers. */ -#define SESSION_ROUTE_QUERY(session, buf) \ - ((session)->head.routeQuery)((session)->head.instance, \ - (session)->head.session, (buf)) +#define SESSION_ROUTE_QUERY(sess, buf) \ + ((sess)->head.routeQuery)((sess)->head.instance, \ + (sess)->head.session, (buf)) +/** + * A convenience macro that can be used by the router modules to route + * the replies to the first element in the pipeline of filters and + * the protocol. + */ +#define SESSION_ROUTE_REPLY(sess, buf) \ + ((sess)->tail.clientReply)((sess)->tail.instance, \ + (sess)->tail.session, (buf)) SESSION *session_alloc(struct service *, struct dcb *); bool session_free(SESSION *); int session_isvalid(SESSION *); +int session_reply(void *inst, void *session, GWBUF *data); void printAllSessions(); void printSession(SESSION *); void dprintAllSessions(struct dcb *); diff --git a/server/modules/filter/Makefile b/server/modules/filter/Makefile index 14b226b7d..f7f6bb29a 100644 --- a/server/modules/filter/Makefile +++ b/server/modules/filter/Makefile @@ -38,10 +38,12 @@ QLASRCS=qlafilter.c QLAOBJ=$(QLASRCS:.c=.o) REGEXSRCS=regexfilter.c REGEXOBJ=$(REGEXSRCS:.c=.o) -SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) +TOPNSRCS=topfilter.c +TOPNOBJ=$(TOPNSRCS:.c=.o) +SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) OBJ=$(SRCS:.c=.o) LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager -MODULES= libtestfilter.so libqlafilter.so libregexfilter.so +MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so all: $(MODULES) @@ -55,6 +57,9 @@ libqlafilter.so: $(QLAOBJ) libregexfilter.so: $(REGEXOBJ) $(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@ +libtopfilter.so: $(TOPNOBJ) + $(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@ + .c.o: $(CC) $(CFLAGS) $< -o $@ diff --git a/server/modules/filter/qlafilter.c b/server/modules/filter/qlafilter.c index 520f1e1a9..928d17869 100644 --- a/server/modules/filter/qlafilter.c +++ b/server/modules/filter/qlafilter.c @@ -64,7 +64,9 @@ static FILTER_OBJECT MyObject = { closeSession, freeSession, setDownstream, + NULL, // No Upstream requirement routeQuery, + NULL, // No client reply diagnostic, }; diff --git a/server/modules/filter/regexfilter.c b/server/modules/filter/regexfilter.c index ad773c40c..f7680a84f 100644 --- a/server/modules/filter/regexfilter.c +++ b/server/modules/filter/regexfilter.c @@ -56,7 +56,9 @@ static FILTER_OBJECT MyObject = { closeSession, freeSession, setDownstream, + NULL, // No Upstream requirement routeQuery, + NULL, diagnostic, }; diff --git a/server/modules/filter/testfilter.c b/server/modules/filter/testfilter.c index 270dbd1cb..ba05e535e 100644 --- a/server/modules/filter/testfilter.c +++ b/server/modules/filter/testfilter.c @@ -54,7 +54,9 @@ static FILTER_OBJECT MyObject = { closeSession, freeSession, setDownstream, + NULL, // No upstream requirement routeQuery, + NULL, diagnostic, }; diff --git a/server/modules/filter/topfilter.c b/server/modules/filter/topfilter.c new file mode 100644 index 000000000..0923122ec --- /dev/null +++ b/server/modules/filter/topfilter.c @@ -0,0 +1,421 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * TOPN Filter - Query Log All. A primitive query logging filter, simply + * used to verify the filter mechanism for downstream filters. All queries + * that are passed through the filter will be written to file. + * + * The filter makes no attempt to deal with query packets that do not fit + * in a single GWBUF. + * + * A single option may be passed to the filter, this is the name of the + * file to which the queries are logged. A serial number is appended to this + * name in order that each session logs to a different file. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A top N query logging filter" +}; + +static char *version_str = "V1.0.0"; + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + setUpstream, + routeQuery, + clientReply, + diagnostic, +}; + +/** + * A instance structure, the assumption is that the option passed + * to the filter is simply a base for the filename to which the queries + * are logged. + * + * To this base a session number is attached such that each session will + * have a nique name. + */ +typedef struct { + int sessions; + int topN; + char *filebase; +} TOPN_INSTANCE; + +/** + * Structure to hold the Top N queries + */ +typedef struct topnq { + struct timeval duration; + char *sql; +} TOPNQ; + +/** + * The session structure for this TOPN filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. + */ +typedef struct { + DOWNSTREAM down; + UPSTREAM up; + char *filename; + int fd; + struct timeval start; + char *current; + TOPNQ **top; + int n_statements; + struct timeval total; +} TOPN_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +int i; +TOPN_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(TOPN_INSTANCE))) != NULL) + { + for (i = 0; params[i]; i++) + { + if (!strcmp(params[i]->name, "count")) + my_instance->topN = atoi(params[i]->value); + if (!strcmp(params[i]->name, "filebase")) + my_instance->filebase = strdup(params[i]->value); + } + my_instance->sessions = 0; + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * Create the file to log to and open it. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session; +int i; + + if ((my_session = calloc(1, sizeof(TOPN_SESSION))) != NULL) + { + if ((my_session->filename = + (char *)malloc(strlen(my_instance->filebase) + 20)) + == NULL) + { + free(my_session); + return NULL; + } + sprintf(my_session->filename, "%s.%d", my_instance->filebase, + my_instance->sessions); + my_instance->sessions++; + my_session->top = (TOPNQ **)calloc(my_instance->topN + 1, + sizeof(TOPNQ *)); + for (i = 0; i < my_instance->topN; i++) + { + my_session->top[i] = (TOPNQ *)calloc(1, sizeof(TOPNQ)); + my_session->top[i]->sql = NULL; + } + my_session->n_statements = 0; + my_session->total.tv_sec = 0; + my_session->total.tv_usec = 0; + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * In the case of the TOPN filter we simple close the file descriptor. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session = (TOPN_SESSION *)session; +int i; +FILE *fp; + + if ((fp = fopen(my_session->filename, "w")) != NULL) + { + fprintf(fp, "Top %d longest running queries in session.\n", + my_instance->topN); + for (i = 0; i < my_instance->topN; i++) + { + if (my_session->top[i]->sql) + { + fprintf(fp, "%4d.%-3d, %s\n", + my_session->top[i]->duration.tv_sec, + my_session->top[i]->duration.tv_usec / 1000, + my_session->top[i]->sql); + } + } + fprintf(fp, "\n\nTotal of %d statements executed.\n", + my_session->n_statements); + fprintf(fp, "Total statement execution time %d.%d seconds\n", + my_session->total.tv_sec, + my_session->total.tv_usec / 1000); + fprintf(fp, "Average statement execution time %.3f.\n", + (double)((my_session->total.tv_sec * 1000) + + (my_session->total.tv_usec / 1000)) + / (1000 * my_session->n_statements)); + fclose(fp); + } +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; + + free(my_session->filename); + free(session); + return; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * Set the upstream filter or session to which results will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param upstream The upstream filter or session. + */ +static void +setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; + + my_session->up = *upstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query should normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; +char *ptr; +int length; + + if (modutil_extract_SQL(queue, &ptr, &length)) + { + my_session->n_statements++; + if (my_session->current) + free(my_session->current); + gettimeofday(&my_session->start, NULL); + my_session->current = strndup(ptr, length); + } + + /* Pass the query downstream */ + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +static int +cmp_topn(TOPNQ **a, TOPNQ **b) +{ + if ((*b)->duration.tv_sec == (*a)->duration.tv_sec) + return (*b)->duration.tv_usec - (*a)->duration.tv_usec; + return (*b)->duration.tv_sec - (*a)->duration.tv_sec; +} + +static int +clientReply(FILTER *instance, void *session, GWBUF *reply) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session = (TOPN_SESSION *)session; +struct timeval tv, diff; +int i, inserted; + + if (my_session->current) + { + gettimeofday(&tv, NULL); + timersub(&tv, &(my_session->start), &diff); + + timeradd(&(my_session->total), &diff, &(my_session->total)); + + inserted = 0; + for (i = 0; i < my_instance->topN; i++) + { + if (my_session->top[i]->sql == NULL) + { + my_session->top[i]->sql = my_session->current; + my_session->top[i]->duration = diff; + inserted = 1; + break; + } + } + + if (inserted == 0 && ((diff.tv_sec > my_session->top[my_instance->topN-1]->duration.tv_sec) || (diff.tv_sec == my_session->top[my_instance->topN-1]->duration.tv_sec && diff.tv_usec > my_session->top[my_instance->topN-1]->duration.tv_usec ))) + { + free(my_session->top[my_instance->topN-1]->sql); + my_session->top[my_instance->topN-1]->sql = my_session->current; + my_session->top[my_instance->topN-1]->duration = diff; + inserted = 1; + } + + if (inserted) + qsort(my_session->top, my_instance->topN, + sizeof(TOPNQ *), cmp_topn); + else + free(my_session->current); + my_session->current = NULL; + } + + /* Pass the result upstream */ + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)fsession; + + if (my_session) + { + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_session->filename); + } +} diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 9dfa6e637..08589438e 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -318,7 +318,7 @@ static int gw_read_backend_event(DCB *dcb) { /* try reload users' table for next connection */ - service_refresh_users(dcb->session->client->service); + service_refresh_users(dcb->session->service); while (session->state != SESSION_STATE_ROUTER_READY && session->state != SESSION_STATE_STOPPING) diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 0652f9f0c..d01e61599 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -688,7 +688,7 @@ clientReply( ss_dassert(client != NULL); - client->func.write(client, queue); + SESSION_ROUTE_REPLY(backend_dcb->session, queue); } /** diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 9fec7e1bd..67139bf02 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1318,7 +1318,7 @@ static void clientReply( if (writebuf != NULL && client_dcb != NULL) { /** Write reply to client DCB */ - client_dcb->func.write(client_dcb, writebuf); + SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE,