Merge branch 'develop' into MAX-59

This commit is contained in:
MassimilianoPinto
2014-06-24 18:21:09 +02:00
7 changed files with 523 additions and 5 deletions

Binary file not shown.

Binary file not shown.

View File

@ -48,6 +48,7 @@
* This fixes a bug with many reads from * This fixes a bug with many reads from
* backend * backend
* 07/05/2014 Mark Riddoch Addition of callback mechanism * 07/05/2014 Mark Riddoch Addition of callback mechanism
* 20/06/2014 Mark Riddoch Addition of dcb_clone
* *
* @endverbatim * @endverbatim
*/ */
@ -84,6 +85,9 @@ static bool dcb_set_state_nomutex(
dcb_state_t* old_state); dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason); static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
static DCB* dcb_get_next (DCB* dcb); static DCB* dcb_get_next (DCB* dcb);
static int dcb_null_write(DCB *dcb, GWBUF *buf);
static int dcb_null_close(DCB *dcb);
static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf);
DCB* dcb_get_zombies(void) DCB* dcb_get_zombies(void)
{ {
@ -136,6 +140,7 @@ DCB *rval;
rval->remote = NULL; rval->remote = NULL;
rval->user = NULL; rval->user = NULL;
rval->flags = 0;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
if (allDCBs == NULL) if (allDCBs == NULL)
@ -248,7 +253,39 @@ dcb_add_to_zombieslist(DCB *dcb)
spinlock_release(&zombiespin); spinlock_release(&zombiespin);
} }
/*
* Clone a DCB for internal use, mostly used for specialist filters
* to create dummy clients based on real clients.
*
* @param orig The DCB to clone
* @return A DCB that can be used as a client
*/
DCB *
dcb_clone(DCB *orig)
{
DCB *clone;
if ((clone = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL)
{
return NULL;
}
clone->fd = -1;
clone->flags |= DCBF_CLONE;
clone->state = orig->state;
clone->data = orig->data;
if (orig->remote)
clone->remote = strdup(orig->remote);
if (orig->user)
clone->user = strdup(orig->user);
clone->protocol = orig->protocol;
clone->func.write = dcb_null_write;
clone->func.close = dcb_null_close;
clone->func.auth = dcb_null_auth;
return clone;
}
/** /**
* Free a DCB and remove it from the chain of all DCBs * Free a DCB and remove it from the chain of all DCBs
@ -312,7 +349,7 @@ DCB_CALLBACK *cb;
if (dcb->protocol != NULL) if (dcb->protocol != NULL)
free(dcb->protocol); free(dcb->protocol);
if (dcb->data) if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0))
free(dcb->data); free(dcb->data);
if (dcb->remote) if (dcb->remote)
free(dcb->remote); free(dcb->remote);
@ -1190,6 +1227,8 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
if (dcb->flags & DCBF_CLONE)
dcb_printf(pdcb, "\t\tDCB is a clone.\n");
dcb = dcb->next; dcb = dcb->next;
} }
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
@ -1254,6 +1293,8 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb->stats.n_high_water); dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n",
dcb->stats.n_low_water); dcb->stats.n_low_water);
if (dcb->flags & DCBF_CLONE)
dcb_printf(pdcb, "\t\tDCB is a clone.\n");
} }
/** /**
@ -1284,7 +1325,7 @@ gw_dcb_state2string (int state) {
} }
/** /**
* A DCB based wrapper for printf. Allows formattign printing to * A DCB based wrapper for printf. Allows formatting printing to
* a descritor control block. * a descritor control block.
* *
* @param dcb Descriptor to write to * @param dcb Descriptor to write to
@ -1822,3 +1863,46 @@ void dcb_call_foreach (
return; return;
} }
/**
* Null protocol write routine used for cloned dcb's. It merely consumes
* buffers written on the cloned DCB.
*
* @params dcb The descriptor control block
* @params buf The buffer beign written
* @return Always returns a good write operation result
*/
static int
dcb_null_write(DCB *dcb, GWBUF *buf)
{
while (buf)
{
buf = gwbuf_consume(buf, GWBUF_LENGTH(buf));
}
return 1;
}
/**
* Null protocol close operation for use by cloned DCB's.
*
* @param dcb The DCB being closed.
*/
static int
dcb_null_close(DCB *dcb)
{
return 0;
}
/**
* Null protocol auth operation for use by cloned DCB's.
*
* @param dcb The DCB being closed.
* @param server The server to auth against
* @param session The user session
* @param buf The buffer with the new auth request
*/
static int
dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf)
{
return 0;
}

View File

@ -207,6 +207,7 @@ typedef struct dcb {
#endif #endif
int fd; /**< The descriptor */ int fd; /**< The descriptor */
dcb_state_t state; /**< Current descriptor state */ dcb_state_t state; /**< Current descriptor state */
int flags; /**< DCB flags */
char *remote; /**< Address of remote end */ char *remote; /**< Address of remote end */
char *user; /**< User name for connection */ char *user; /**< User name for connection */
struct sockaddr_in ipv4; /**< remote end IPv4 address */ struct sockaddr_in ipv4; /**< remote end IPv4 address */
@ -271,6 +272,7 @@ int dcb_write(DCB *, GWBUF *);
DCB *dcb_alloc(dcb_role_t); DCB *dcb_alloc(dcb_role_t);
void dcb_free(DCB *); void dcb_free(DCB *);
DCB *dcb_connect(struct server *, struct session *, const char *); DCB *dcb_connect(struct server *, struct session *, const char *);
DCB *dcb_clone(DCB *);
int dcb_read(DCB *, GWBUF **); int dcb_read(DCB *, GWBUF **);
int dcb_drain_writeq(DCB *); int dcb_drain_writeq(DCB *);
void dcb_close(DCB *); void dcb_close(DCB *);
@ -295,4 +297,8 @@ bool dcb_set_state(
DCB* dcb, DCB* dcb,
dcb_state_t new_state, dcb_state_t new_state,
dcb_state_t* old_state); dcb_state_t* old_state);
/* DCB flags values */
#define DCBF_CLONE 0x0001 /* DCB is a clone */
#endif /* _DCB_H */ #endif /* _DCB_H */

View File

@ -40,10 +40,12 @@ REGEXSRCS=regexfilter.c
REGEXOBJ=$(REGEXSRCS:.c=.o) REGEXOBJ=$(REGEXSRCS:.c=.o)
TOPNSRCS=topfilter.c TOPNSRCS=topfilter.c
TOPNOBJ=$(TOPNSRCS:.c=.o) TOPNOBJ=$(TOPNSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) TEESRCS=tee.c
TEEOBJ=$(TEESRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS)
OBJ=$(SRCS:.c=.o) OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libtee.so
all: $(MODULES) all: $(MODULES)
@ -60,6 +62,9 @@ libregexfilter.so: $(REGEXOBJ)
libtopfilter.so: $(TOPNOBJ) libtopfilter.so: $(TOPNOBJ)
$(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@ $(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@
libtee.so: $(TEEOBJ)
$(CC) $(LDFLAGS) $(TEEOBJ) $(LIBS) -o $@
.c.o: .c.o:
$(CC) $(CFLAGS) $< -o $@ $(CC) $(CFLAGS) $< -o $@

423
server/modules/filter/tee.c Normal file
View File

@ -0,0 +1,423 @@
/*
* This file is distributed as part of MaxScale by SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file tee.c A filter that splits the processing pipeline in two
*
* Conditionally duplicate requests and send the duplicates to another service
* within MaxScale.
*
* Parameters
* ==========
*
* service The service to send the duplicates to
* source The source address to match in order to duplicate (optional)
* match A regular expression to match in order to perform duplication
* of the request (optional)
* nomatch A regular expression to match in order to prevent duplication
* of the request (optional)
* user A user name to match against. If present only requests that
* originate from this user will be duplciated (optional)
*
* Revision History
* ================
*
* Date Who Description
* 20/06/2014 Mark Riddoch Initial implementation
*
*/
#include <stdio.h>
#include <fcntl.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <sys/time.h>
#include <regex.h>
#include <string.h>
#include <service.h>
#include <router.h>
#include <dcb.h>
extern int lm_enabled_logfiles_bitmask;
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_ALPHA_RELEASE,
FILTER_VERSION,
"A tee piece in the filter plumbing"
};
static char *version_str = "V1.0.0";
/*
* The filter entry points
*/
static FILTER *createInstance(char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
NULL, // No Upstream requirement
routeQuery,
NULL, // No client reply
diagnostic,
};
/**
* The instance structure for the TEE filter - this holds the configuration
* information for the filter.
*/
typedef struct {
SERVICE *service; /* The service to duplicate requests to */
char *source; /* The source of the client connection */
char *userName; /* The user name to filter on */
char *match; /* Optional text to match against */
regex_t re; /* Compiled regex text */
char *nomatch; /* Optional text to match against for exclusion */
regex_t nore; /* Compiled regex nomatch text */
} TEE_INSTANCE;
/**
* The session structure for this TEE filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
typedef struct {
DOWNSTREAM down;
int active;
DCB *branch_dcb;
SESSION *branch_session;
int n_duped;
} TEE_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
TEE_INSTANCE *my_instance;
int i;
if ((my_instance = calloc(1, sizeof(TEE_INSTANCE))) != NULL)
{
if (options)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"tee: The tee filter has been passed an option, "
"this filter does not support any options.\n")));
}
my_instance->service = NULL;
my_instance->source = NULL;
my_instance->userName = NULL;
my_instance->match = NULL;
my_instance->nomatch = NULL;
if (params)
{
for (i = 0; params[i]; i++)
{
if (!strcmp(params[i]->name, "service"))
{
if ((my_instance->service = service_find(params[i]->value)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"tee: service '%s' "
"not found.\n",
params[i]->value)));
}
}
else if (!strcmp(params[i]->name, "match"))
{
my_instance->match = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "exclude"))
{
my_instance->nomatch = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "source"))
my_instance->source = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "user"))
my_instance->userName = strdup(params[i]->value);
else if (!filter_standard_parameter(params[i]->name))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"tee: Unexpected parameter '%s'.\n",
params[i]->name)));
}
}
}
if (my_instance->service == NULL)
{
free(my_instance->match);
free(my_instance->source);
free(my_instance);
return NULL;
}
if (my_instance->match &&
regcomp(&my_instance->re, my_instance->match, REG_ICASE))
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"tee: Invalid regular expression '%s'"
" for the match parameter.\n",
my_instance->match)));
free(my_instance->match);
free(my_instance->source);
free(my_instance);
return NULL;
}
if (my_instance->nomatch &&
regcomp(&my_instance->nore, my_instance->nomatch,
REG_ICASE))
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"tee: Invalid regular expression '%s'"
" for the nomatch paramter.\n",
my_instance->match)));
if (my_instance->match)
regfree(&my_instance->re);
free(my_instance->match);
free(my_instance->source);
free(my_instance);
return NULL;
}
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* Create the file to log to and open it.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance;
TEE_SESSION *my_session;
char *remote, *userName;
if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL)
{
my_session->active = 1;
if (my_instance->source
&& (remote = session_get_remote(session)) != NULL)
{
if (strcmp(remote, my_instance->source))
my_session->active = 0;
}
userName = session_getUser(session);
if (my_instance->userName && userName && strcmp(userName,
my_instance->userName))
my_session->active = 0;
if (my_session->active)
{
my_session->branch_dcb = dcb_clone(session->client);
my_session->branch_session = session_alloc(my_instance->service, my_session->branch_dcb);
}
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
* In the case of the tee filter we need to close down the
* "branch" session.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
TEE_SESSION *my_session = (TEE_SESSION *)session;
ROUTER_OBJECT *router;
void *router_instance, *rsession;
SESSION *bsession;
if (my_session->active)
{
bsession = my_session->branch_session;
router = bsession->service->router;
router_instance = bsession->service->router_instance;
rsession = bsession->router_session;
/** Close router session and all its connections */
router->closeSession(router_instance, rsession);
dcb_free(my_session->branch_dcb);
/* No need to free the session, this is done as
* a side effect of closign the client DCB of the
* session.
*/
}
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
*/
static void
freeSession(FILTER *instance, void *session)
{
TEE_SESSION *my_session = (TEE_SESSION *)session;
free(session);
return;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
TEE_SESSION *my_session = (TEE_SESSION *)session;
my_session->down = *downstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance;
TEE_SESSION *my_session = (TEE_SESSION *)session;
char *ptr;
int length, rval;
GWBUF *clone = NULL;
if (my_session->active && modutil_extract_SQL(queue, &ptr, &length))
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
clone = gwbuf_clone(queue);
}
}
/* Pass the query downstream */
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
if (clone)
{
my_session->n_duped++;
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
return rval;
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
TEE_SESSION *my_session = (TEE_SESSION *)fsession;
if (my_session)
{
dcb_printf(dcb, "\t\tNo. of statements duplicated: %d.\n",
my_session->n_duped);
}
}