First working filters implementaton.

Only downstream filters are supported currently, i.e. no result
set filtering can be done.

A crude QLA (Query Log All) filter is included as a test harness only
This commit is contained in:
Mark Riddoch
2014-05-30 16:45:39 +01:00
parent 7bca4e383f
commit 8d55be4b23
16 changed files with 1158 additions and 51 deletions

View File

@ -33,6 +33,7 @@
# 29/06/13 Vilho Raatikka Reverted Query classifier changes because
# gateway needs mysql client lib, not qc.
# 24/07/13 Mark Ridoch Addition of encryption routines
# 30/05/14 Mark Ridoch Filter API added
include ../../build_gateway.inc
@ -56,14 +57,15 @@ LDFLAGS=-rdynamic -L$(LOGPATH) \
SRCS= atomic.c buffer.c spinlock.c gateway.c \
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \
monitor.c adminusers.c secrets.c
monitor.c adminusers.c secrets.c filter.c
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/gw.h ../modules/include/mysql_client_server_protocol.h \
../include/session.h ../include/spinlock.h ../include/thread.h \
../include/modules.h ../include/poll.h ../include/config.h \
../include/users.h ../include/hashtable.h ../include/gwbitmask.h \
../include/adminusers.h ../include/version.h ../include/maxscale.h
../include/adminusers.h ../include/version.h ../include/maxscale.h \
../include/filter.h
OBJ=$(SRCS:.c=.o)

View File

@ -31,6 +31,7 @@
* 11/03/14 Massimiliano Pinto Added Unix socket support
* 11/05/14 Massimiliano Pinto Added version_string support to service
* 19/05/14 Mark Riddoch Added unique names from section headers
* 29/05/14 Mark Riddoch Addition of filter definition
*
* @endverbatim
*/
@ -215,6 +216,8 @@ int error_count = 0;
{
char *router = config_get_value(obj->parameters,
"router");
char *filters = config_get_value(obj->parameters,
"filters");
if (router)
{
char* max_slave_conn_str;
@ -301,6 +304,9 @@ int error_count = 0;
param->value)));
}
}
if (filters)
serviceSetFilters(obj->element,
filters);
}
else
{
@ -359,6 +365,36 @@ int error_count = 0;
obj->object)));
}
}
else if (!strcmp(type, "filter"))
{
char *module = config_get_value(obj->parameters,
"module");
char *options = config_get_value(obj->parameters,
"options");
if (module)
{
obj->element = filter_alloc(obj->object, module);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Filter '%s' has no module "
"defined defined to load.",
obj->object)));
error_count++;
}
if (obj->element)
{
char *s = strtok(options, ",");
while (s)
{
filterAddOption(obj->element, s);
s = strtok(NULL, ",");
}
}
}
obj = obj->next;
}
@ -550,7 +586,8 @@ int error_count = 0;
error_count++;
}
}
else if (strcmp(type, "server") != 0)
else if (strcmp(type, "server") != 0
&& strcmp(type, "filter") != 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -959,10 +996,12 @@ SERVER *server;
{
char *servers;
char *roptions;
char *filters;
servers = config_get_value(obj->parameters, "servers");
roptions = config_get_value(obj->parameters,
"router_options");
filters = config_get_value(obj->parameters, "filters");
if (servers && obj->element)
{
char *s = strtok(servers, ",");
@ -996,6 +1035,8 @@ SERVER *server;
s = strtok(NULL, ",");
}
}
if (filters)
serviceSetFilters(obj->element, filters);
}
else if (!strcmp(type, "listener"))
{
@ -1080,6 +1121,7 @@ static char *service_params[] =
"enable_root_user",
"max_slave_connections",
"version_string",
"filters",
NULL
};

279
server/core/filter.c Normal file
View File

@ -0,0 +1,279 @@
/*
* This file is distributed as part of MaxScale from SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file filter.c - A representation of a filter within MaxScale.
*
* @verbatim
* Revision History
*
* Date Who Description
* 29/05/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <filter.h>
#include <session.h>
#include <modules.h>
#include <spinlock.h>
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
static SPINLOCK filter_spin = SPINLOCK_INIT;
static FILTER_DEF *allFilters = NULL;
/**
* Allocate a new filter within MaxScale
*
*
* @param name The filter name
* @param module The module to load
*
* @return The newly created filter or NULL if an error occured
*/
FILTER_DEF *
filter_alloc(char *name, char *module)
{
FILTER_DEF *filter;
if ((filter = (FILTER_DEF *)malloc(sizeof(FILTER_DEF))) == NULL)
return NULL;
filter->name = strdup(name);
filter->module = strdup(module);
spinlock_init(&filter->spin);
spinlock_acquire(&filter_spin);
filter->next = allFilters;
allFilters = filter;
spinlock_release(&filter_spin);
return filter;
}
/**
* Deallocate the specified filter
*
* @param server The service to deallocate
* @return Returns true if the server was freed
*/
void
filter_free(FILTER_DEF *filter)
{
FILTER_DEF *ptr;
/* First of all remove from the linked list */
spinlock_acquire(&filter_spin);
if (allFilters == filter)
{
allFilters = filter->next;
}
else
{
ptr = allFilters;
while (ptr && ptr->next != filter)
{
ptr = ptr->next;
}
if (ptr)
ptr->next = filter->next;
}
spinlock_release(&filter_spin);
/* Clean up session and free the memory */
free(filter->name);
free(filter->module);
free(filter);
}
/**
* Find an existing filter using the unique section name in
* configuration file
*
* @param name The filter name
* @return The server or NULL if not found
*/
FILTER_DEF *
filter_find(char *name)
{
FILTER_DEF *filter;
spinlock_acquire(&filter_spin);
filter = allFilters;
while (filter)
{
if (strcmp(filter->name, name) == 0)
break;
filter = filter->next;
}
spinlock_release(&filter_spin);
return filter;
}
/**
* Print all filters to a DCB
*
* Designed to be called within a debugger session in order
* to display all active filters within MaxScale
*/
void
dprintAllFilters(DCB *dcb)
{
FILTER_DEF *ptr;
int i;
spinlock_acquire(&filter_spin);
ptr = allFilters;
while (ptr)
{
dcb_printf(dcb, "Filter %p (%s)\n", ptr, ptr->name);
dcb_printf(dcb, "\tModule: %s\n", ptr->module);
if (ptr->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; ptr->options && ptr->options[i]; i++)
dcb_printf(dcb, "%s ", ptr->options[i]);
dcb_printf(dcb, "\n");
}
if (ptr->obj && ptr->filter)
ptr->obj->diagnostics(ptr->filter, NULL, dcb);
ptr = ptr->next;
}
spinlock_release(&filter_spin);
}
/**
* Print filter details to a DCB
*
* Designed to be called within a debug CLI in order
* to display all active filters in MaxScale
*/
void
dprintFilter(DCB *dcb, FILTER_DEF *filter)
{
int i;
dcb_printf(dcb, "Filter %p (%s)\n", filter, filter->name);
dcb_printf(dcb, "\tModule: %s\n", filter->module);
if (filter->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; filter->options && filter->options[i]; i++)
dcb_printf(dcb, "%s ", filter->options[i]);
dcb_printf(dcb, "\n");
}
if (filter->obj && filter->filter)
filter->obj->diagnostics(filter->filter, NULL, dcb);
}
/**
* List all filters in a tabular form to a DCB
*
*/
void
dListFilters(DCB *dcb)
{
FILTER_DEF *ptr;
int i;
spinlock_acquire(&filter_spin);
ptr = allFilters;
if (ptr)
{
dcb_printf(dcb, "%-18s | %-15s | Options\n",
"Filter", "Address", "Status");
dcb_printf(dcb, "-------------------------------------------------------------------------------\n");
}
while (ptr)
{
dcb_printf(dcb, "%-18s | %-15s | ",
ptr->name, ptr->module);
for (i = 0; ptr->options && ptr->options[i]; i++)
dcb_printf(dcb, "%s ", ptr->options[i]);
dcb_printf(dcb, "\n");
ptr = ptr->next;
}
spinlock_release(&filter_spin);
}
/**
* Add a router option to a service
*
* @param service The service to add the router option to
* @param option The option string
*/
void
filterAddOption(FILTER_DEF *filter, char *option)
{
int i;
spinlock_acquire(&filter->spin);
if (filter->options == NULL)
{
filter->options = (char **)calloc(2, sizeof(char *));
filter->options[0] = strdup(option);
filter->options[1] = NULL;
}
else
{
for (i = 0; filter->options[i]; i++)
;
filter->options = (char **)realloc(filter->options,
(i + 2) * sizeof(char *));
filter->options[i] = strdup(option);
filter->options[i+1] = NULL;
}
spinlock_release(&filter->spin);
}
DOWNSTREAM *
filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream)
{
DOWNSTREAM *me;
if (filter->obj == NULL)
{
/* Filter not yet loaded */
if ((filter->obj = load_module(filter->module,
MODULE_FILTER)) == NULL)
{
return NULL;
}
}
if (filter->filter == NULL)
filter->filter = (filter->obj->createInstance)(filter->options);
if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL)
{
return NULL;
}
me->instance = filter->filter;
me->routeQuery = filter->obj->routeQuery;
me->session = filter->obj->newSession(me->instance, session);
filter->obj->setDownstream(me->instance, me->session, downstream);
return me;
}

View File

@ -30,6 +30,7 @@
* 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services)
* 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL
* 23/05/14 Mark Riddoch Addition of service validation call
* 29/05/14 Mark Riddoch Filter API implementation
*
* @endverbatim
*/
@ -46,6 +47,7 @@
#include <modules.h>
#include <dcb.h>
#include <users.h>
#include <filter.h>
#include <dbusers.h>
#include <poll.h>
#include <skygw_utils.h>
@ -110,6 +112,8 @@ SERVICE *service;
service->databases = NULL;
service->svc_config_param = NULL;
service->svc_config_version = 0;
service->filters = NULL;
service->n_filters = 0;
spinlock_init(&service->spin);
spinlock_init(&service->users_table_spin);
memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE));
@ -608,6 +612,62 @@ serviceEnableRootUser(SERVICE *service, int action)
return 1;
}
/**
* Trim whitespace from the from an rear of a string
*
* @param str String to trim
* @return Trimmed string, chanesg are done in situ
*/
static char *
trim(char *str)
{
char *ptr;
while (isspace(*str))
str++;
ptr = str + strlen(str);
while (ptr > str && isspace(*ptr))
*ptr-- = 0;
return str;
}
/**
* Set the filters used by the service
*
* @param service The service itself
* @param filters ASCII string of filters to use
*/
void
serviceSetFilters(SERVICE *service, char *filters)
{
FILTER_DEF **flist;
char *ptr, *brkt;
int n = 0;
flist = (FILTER_DEF *)malloc(sizeof(FILTER_DEF *));
ptr = strtok_r(filters, "|", &brkt);
while (ptr)
{
n++;
flist = (FILTER_DEF *)realloc(flist, (n + 1) * sizeof(FILTER_DEF *));
if ((flist[n-1] = filter_find(trim(ptr))) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Unable to find filter '%s' for service '%s'\n",
trim(ptr), service->name
)));
}
flist[n] = NULL;
ptr = strtok_r(NULL, "|", &brkt);
}
service->filters = flist;
service->n_filters = n;
}
/**
* Return a named service
*
@ -638,6 +698,7 @@ void
printService(SERVICE *service)
{
SERVER *ptr = service->databases;
int i;
printf("Service %p\n", service);
printf("\tService: %s\n", service->name);
@ -649,6 +710,16 @@ SERVER *ptr = service->databases;
printf("\t\t%s:%d Protocol: %s\n", ptr->name, ptr->port, ptr->protocol);
ptr = ptr->nextdb;
}
if (service->n_filters)
{
printf("\tFilter chain: ");
for (i = 0; i < service->n_filters; i++)
{
printf("%s %s ", service->filters[i]->name,
i + 1 < service->n_filters ? "|" : "");
}
printf("\n");
}
printf("\tUsers data: %p\n", service->users);
printf("\tTotal connections: %d\n", service->stats.n_sessions);
printf("\tCurrently connected: %d\n", service->stats.n_current);
@ -705,6 +776,7 @@ SERVICE *ptr;
void dprintService(DCB *dcb, SERVICE *service)
{
SERVER *server = service->databases;
int i;
dcb_printf(dcb, "Service %p\n", service);
dcb_printf(dcb, "\tService: %s\n", service->name);
@ -714,6 +786,16 @@ SERVER *server = service->databases;
service->router->diagnostics(service->router_instance, dcb);
dcb_printf(dcb, "\tStarted: %s",
asctime(localtime(&service->stats.started)));
if (service->n_filters)
{
dcb_printf(dcb, "\tFilter chain: ");
for (i = 0; i < service->n_filters; i++)
{
dcb_printf(dcb, "%s %s ", service->filters[i]->name,
i + 1 < service->n_filters ? "|" : "");
}
dcb_printf(dcb, "\n");
}
dcb_printf(dcb, "\tBackend databases\n");
while (server)
{

View File

@ -25,6 +25,7 @@
* Date Who Description
* 17/06/13 Mark Riddoch Initial implementation
* 02/09/13 Massimiliano Pinto Added session refcounter
* 29/05/14 Mark Riddoch Addition of filter mechanism
*
* @endverbatim
*/
@ -47,6 +48,9 @@ extern int lm_enabled_logfiles_bitmask;
static SPINLOCK session_spin = SPINLOCK_INIT;
static SESSION *allSessions = NULL;
static int session_setup_filters(SESSION *session);
/**
* Allocate a new session for a new client of the specified service.
*
@ -132,7 +136,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
/**
* Inform other threads that session is closing.
*/
session->state == SESSION_STATE_STOPPING;
session->state = SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
@ -147,7 +151,45 @@ session_alloc(SERVICE *service, DCB *client_dcb)
goto return_session;
}
/*
* Pending filter chain being setup set the head of the chain to
* be the router. As filters are inserted the current head will
* be pushed to the filter and the head updated.
*
* NB This dictates that filters are created starting at the end
* of the chain nearest the router working back to the client
* protocol end of the chain.
*/
session->head.instance = service->router_instance;
session->head.session = session->router_session;
session->head.routeQuery = service->router->routeQuery;
if (service->n_filters > 0)
{
if (!session_setup_filters(session))
{
/**
* Inform other threads that session is closing.
*/
session->state = SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
*/
session_free(session);
client_dcb->session = NULL;
session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to create %s session.",
service->name)));
goto return_session;
}
}
}
spinlock_acquire(&session_spin);
session->state = SESSION_STATE_ROUTER_READY;
session->next = allSessions;
@ -440,6 +482,8 @@ SESSION *ptr;
void
dprintSession(DCB *dcb, SESSION *ptr)
{
DOWNSTREAM *dptr;
dcb_printf(dcb, "Session %p\n", ptr);
dcb_printf(dcb, "\tState: %s\n", session_state(ptr->state));
dcb_printf(dcb, "\tService: %s (%p)\n", ptr->service->name, ptr->service);
@ -520,3 +564,41 @@ SESSION* get_session_by_router_ses(
}
return ses;
}
/**
* Create the filter chain for this session.
*
* Filters must be setup in reverse order, starting with the last
* filter in the chain and working back towards the client connection
* Each filter is passed the current session head of the filter chain
* this head becomes the destination for the filter. The newly created
* filter becomes the new head of the filter chain.
*
* @param session The session that requires the chain
* @return 0 if filter creation fails
*/
static int
session_setup_filters(SESSION *session)
{
SERVICE *service = session->service;
DOWNSTREAM *head;
int i;
for (i = service->n_filters - 1; i >= 0; i--)
{
if ((head = filterApply(service->filters[i], session,
&session->head)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Failed to create filter '%s' for service '%s'.\n",
service->filters[i]->name,
service->name)));
return 0;
}
session->head = *head;
}
return 1;
}