Initial upstream fitlering implementation.

Test filter with up and down stream filtering. Keeps top N queries
and prints a report of these queries on session close.
This commit is contained in:
Mark Riddoch 2014-06-09 21:13:28 +01:00
parent 9e164b83f0
commit 77e1426dbf
12 changed files with 519 additions and 14 deletions

View File

@ -314,3 +314,29 @@ DOWNSTREAM *me;
return me;
}
UPSTREAM *
filterUpstream(FILTER_DEF *filter, void *fsession, UPSTREAM *upstream)
{
UPSTREAM *me;
/*
* The the filter has no setUpstream entry point then is does
* not require to see results and can be left out of the chain.
*/
if (filter->obj->setUpstream == NULL)
return upstream;
if (filter->obj->clientReply != NULL)
{
if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL)
{
return NULL;
}
me->instance = filter->filter;
me->session = fsession;
me->clientReply = filter->obj->clientReply;
filter->obj->setUpstream(me->instance, me->session, upstream);
}
return me;
}

View File

@ -164,9 +164,12 @@ session_alloc(SERVICE *service, DCB *client_dcb)
*/
session->head.instance = service->router_instance;
session->head.session = session->router_session;
session->head.routeQuery = service->router->routeQuery;
session->tail.instance = session;
session->tail.session = session;
session->tail.clientReply = session_reply;
if (service->n_filters > 0)
{
if (!session_setup_filters(session))
@ -315,6 +318,12 @@ bool session_free(
}
if (session->n_filters)
{
for (i = 0; i < session->n_filters; i++)
{
session->filters[i].filter->obj->closeSession(
session->filters[i].instance,
session->filters[i].session);
}
for (i = 0; i < session->n_filters; i++)
{
session->filters[i].filter->obj->freeSession(
@ -610,6 +619,7 @@ session_setup_filters(SESSION *session)
{
SERVICE *service = session->service;
DOWNSTREAM *head;
UPSTREAM *tail;
int i;
if ((session->filters = calloc(service->n_filters,
@ -640,5 +650,28 @@ int i;
session->head = *head;
}
for (i = 0; i < service->n_filters; i++)
{
if ((tail = filterUpstream(service->filters[i],
session->filters[i].session,
&session->tail)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Failed to create filter '%s' for service '%s'.\n",
service->filters[i]->name,
service->name)));
return 0;
}
session->tail = *tail;
}
return 1;
}
int
session_reply(void *instance, void *session, GWBUF *data)
{
SESSION *the_session = (SESSION *)session;
return the_session->client->func.write(the_session->client, data);
}

View File

@ -61,6 +61,7 @@ typedef struct {
* filter pipline
* routeQuery Called on each query that requires
* routing
* clientReply
* diagnostics Called to force the filter to print
* diagnostic output
*
@ -74,7 +75,9 @@ typedef struct filter_object {
void (*closeSession)(FILTER *instance, void *fsession);
void (*freeSession)(FILTER *instance, void *fsession);
void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
void (*setUpstream)(FILTER *instance, void *fsession, UPSTREAM *downstream);
int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue);
int (*clientReply)(FILTER *instance, void *fsession, GWBUF *queue);
void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb);
} FILTER_OBJECT;
@ -83,7 +86,7 @@ typedef struct filter_object {
* is changed these values must be updated in line with the rules in the
* file modinfo.h.
*/
#define FILTER_VERSION {1, 0, 0}
#define FILTER_VERSION {1, 1, 0}
/**
* The definition of a filter form the configuration file.
* This is basically the link between a plugin to load and the

View File

@ -70,8 +70,8 @@ typedef enum {
typedef struct {
void *instance;
void *session;
int (*routeQuery)(void *instance,
void *router_session, GWBUF *queue);
int (*routeQuery)(void *instance, void *session,
GWBUF *request);
} DOWNSTREAM;
/**
@ -81,8 +81,9 @@ typedef struct {
typedef struct {
void *instance;
void *session;
int (*write)(void *, void *, GWBUF *);
int (*error)(void *);
int (*clientReply)(void *instance,
void *session, GWBUF *response);
int (*error)(void *instance, void *session, void *);
} UPSTREAM;
/**
@ -117,6 +118,7 @@ typedef struct session {
int n_filters; /**< Number of filter sessions */
SESSION_FILTER *filters; /**< The filters in use within this session */
DOWNSTREAM head; /**< Head of the filter chain */
UPSTREAM tail; /**< The tail of the filter chain */
struct session *next; /**< Linked list of all sessions */
int refcount; /**< Reference count on the session */
#if defined(SS_DEBUG)
@ -131,13 +133,22 @@ typedef struct session {
* the incoming data to the first element in the pipeline of filters and
* routers.
*/
#define SESSION_ROUTE_QUERY(session, buf) \
((session)->head.routeQuery)((session)->head.instance, \
(session)->head.session, (buf))
#define SESSION_ROUTE_QUERY(sess, buf) \
((sess)->head.routeQuery)((sess)->head.instance, \
(sess)->head.session, (buf))
/**
* A convenience macro that can be used by the router modules to route
* the replies to the first element in the pipeline of filters and
* the protocol.
*/
#define SESSION_ROUTE_REPLY(sess, buf) \
((sess)->tail.clientReply)((sess)->tail.instance, \
(sess)->tail.session, (buf))
SESSION *session_alloc(struct service *, struct dcb *);
bool session_free(SESSION *);
int session_isvalid(SESSION *);
int session_reply(void *inst, void *session, GWBUF *data);
void printAllSessions();
void printSession(SESSION *);
void dprintAllSessions(struct dcb *);

View File

@ -38,10 +38,12 @@ QLASRCS=qlafilter.c
QLAOBJ=$(QLASRCS:.c=.o)
REGEXSRCS=regexfilter.c
REGEXOBJ=$(REGEXSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS)
TOPNSRCS=topfilter.c
TOPNOBJ=$(TOPNSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS)
OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so
all: $(MODULES)
@ -55,6 +57,9 @@ libqlafilter.so: $(QLAOBJ)
libregexfilter.so: $(REGEXOBJ)
$(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@
libtopfilter.so: $(TOPNOBJ)
$(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@
.c.o:
$(CC) $(CFLAGS) $< -o $@

View File

@ -64,7 +64,9 @@ static FILTER_OBJECT MyObject = {
closeSession,
freeSession,
setDownstream,
NULL, // No Upstream requirement
routeQuery,
NULL, // No client reply
diagnostic,
};

View File

@ -56,7 +56,9 @@ static FILTER_OBJECT MyObject = {
closeSession,
freeSession,
setDownstream,
NULL, // No Upstream requirement
routeQuery,
NULL,
diagnostic,
};

View File

@ -54,7 +54,9 @@ static FILTER_OBJECT MyObject = {
closeSession,
freeSession,
setDownstream,
NULL, // No upstream requirement
routeQuery,
NULL,
diagnostic,
};

View File

@ -0,0 +1,421 @@
/*
* This file is distributed as part of MaxScale by SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* TOPN Filter - Query Log All. A primitive query logging filter, simply
* used to verify the filter mechanism for downstream filters. All queries
* that are passed through the filter will be written to file.
*
* The filter makes no attempt to deal with query packets that do not fit
* in a single GWBUF.
*
* A single option may be passed to the filter, this is the name of the
* file to which the queries are logged. A serial number is appended to this
* name in order that each session logs to a different file.
*/
#include <stdio.h>
#include <fcntl.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_ALPHA_RELEASE,
FILTER_VERSION,
"A top N query logging filter"
};
static char *version_str = "V1.0.0";
/*
* The filter entry points
*/
static FILTER *createInstance(char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
setUpstream,
routeQuery,
clientReply,
diagnostic,
};
/**
* A instance structure, the assumption is that the option passed
* to the filter is simply a base for the filename to which the queries
* are logged.
*
* To this base a session number is attached such that each session will
* have a nique name.
*/
typedef struct {
int sessions;
int topN;
char *filebase;
} TOPN_INSTANCE;
/**
* Structure to hold the Top N queries
*/
typedef struct topnq {
struct timeval duration;
char *sql;
} TOPNQ;
/**
* The session structure for this TOPN filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
typedef struct {
DOWNSTREAM down;
UPSTREAM up;
char *filename;
int fd;
struct timeval start;
char *current;
TOPNQ **top;
int n_statements;
struct timeval total;
} TOPN_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
int i;
TOPN_INSTANCE *my_instance;
if ((my_instance = calloc(1, sizeof(TOPN_INSTANCE))) != NULL)
{
for (i = 0; params[i]; i++)
{
if (!strcmp(params[i]->name, "count"))
my_instance->topN = atoi(params[i]->value);
if (!strcmp(params[i]->name, "filebase"))
my_instance->filebase = strdup(params[i]->value);
}
my_instance->sessions = 0;
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* Create the file to log to and open it.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session;
int i;
if ((my_session = calloc(1, sizeof(TOPN_SESSION))) != NULL)
{
if ((my_session->filename =
(char *)malloc(strlen(my_instance->filebase) + 20))
== NULL)
{
free(my_session);
return NULL;
}
sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions);
my_instance->sessions++;
my_session->top = (TOPNQ **)calloc(my_instance->topN + 1,
sizeof(TOPNQ *));
for (i = 0; i < my_instance->topN; i++)
{
my_session->top[i] = (TOPNQ *)calloc(1, sizeof(TOPNQ));
my_session->top[i]->sql = NULL;
}
my_session->n_statements = 0;
my_session->total.tv_sec = 0;
my_session->total.tv_usec = 0;
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
* In the case of the TOPN filter we simple close the file descriptor.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
int i;
FILE *fp;
if ((fp = fopen(my_session->filename, "w")) != NULL)
{
fprintf(fp, "Top %d longest running queries in session.\n",
my_instance->topN);
for (i = 0; i < my_instance->topN; i++)
{
if (my_session->top[i]->sql)
{
fprintf(fp, "%4d.%-3d, %s\n",
my_session->top[i]->duration.tv_sec,
my_session->top[i]->duration.tv_usec / 1000,
my_session->top[i]->sql);
}
}
fprintf(fp, "\n\nTotal of %d statements executed.\n",
my_session->n_statements);
fprintf(fp, "Total statement execution time %d.%d seconds\n",
my_session->total.tv_sec,
my_session->total.tv_usec / 1000);
fprintf(fp, "Average statement execution time %.3f.\n",
(double)((my_session->total.tv_sec * 1000)
+ (my_session->total.tv_usec / 1000))
/ (1000 * my_session->n_statements));
fclose(fp);
}
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
*/
static void
freeSession(FILTER *instance, void *session)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
free(my_session->filename);
free(session);
return;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
my_session->down = *downstream;
}
/**
* Set the upstream filter or session to which results will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param upstream The upstream filter or session.
*/
static void
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
my_session->up = *upstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
char *ptr;
int length;
if (modutil_extract_SQL(queue, &ptr, &length))
{
my_session->n_statements++;
if (my_session->current)
free(my_session->current);
gettimeofday(&my_session->start, NULL);
my_session->current = strndup(ptr, length);
}
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
static int
cmp_topn(TOPNQ **a, TOPNQ **b)
{
if ((*b)->duration.tv_sec == (*a)->duration.tv_sec)
return (*b)->duration.tv_usec - (*a)->duration.tv_usec;
return (*b)->duration.tv_sec - (*a)->duration.tv_sec;
}
static int
clientReply(FILTER *instance, void *session, GWBUF *reply)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
struct timeval tv, diff;
int i, inserted;
if (my_session->current)
{
gettimeofday(&tv, NULL);
timersub(&tv, &(my_session->start), &diff);
timeradd(&(my_session->total), &diff, &(my_session->total));
inserted = 0;
for (i = 0; i < my_instance->topN; i++)
{
if (my_session->top[i]->sql == NULL)
{
my_session->top[i]->sql = my_session->current;
my_session->top[i]->duration = diff;
inserted = 1;
break;
}
}
if (inserted == 0 && ((diff.tv_sec > my_session->top[my_instance->topN-1]->duration.tv_sec) || (diff.tv_sec == my_session->top[my_instance->topN-1]->duration.tv_sec && diff.tv_usec > my_session->top[my_instance->topN-1]->duration.tv_usec )))
{
free(my_session->top[my_instance->topN-1]->sql);
my_session->top[my_instance->topN-1]->sql = my_session->current;
my_session->top[my_instance->topN-1]->duration = diff;
inserted = 1;
}
if (inserted)
qsort(my_session->top, my_instance->topN,
sizeof(TOPNQ *), cmp_topn);
else
free(my_session->current);
my_session->current = NULL;
}
/* Pass the result upstream */
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session, reply);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)fsession;
if (my_session)
{
dcb_printf(dcb, "\t\tLogging to file %s.\n",
my_session->filename);
}
}

View File

@ -318,7 +318,7 @@ static int gw_read_backend_event(DCB *dcb) {
/* try reload users' table for next connection */
service_refresh_users(dcb->session->client->service);
service_refresh_users(dcb->session->service);
while (session->state != SESSION_STATE_ROUTER_READY &&
session->state != SESSION_STATE_STOPPING)

View File

@ -688,7 +688,7 @@ clientReply(
ss_dassert(client != NULL);
client->func.write(client, queue);
SESSION_ROUTE_REPLY(backend_dcb->session, queue);
}
/**

View File

@ -1318,7 +1318,7 @@ static void clientReply(
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,