Reindented server/core/filter.c

This commit is contained in:
Johan Wikman 2015-11-30 12:51:24 +02:00
parent bd94d8967a
commit 20c4a0aa67
2 changed files with 294 additions and 264 deletions

View File

@ -22,8 +22,8 @@
* @verbatim
* Revision History
*
* Date Who Description
* 29/05/14 Mark Riddoch Initial implementation
* Date Who Description
* 29/05/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
@ -38,116 +38,124 @@
#include <skygw_utils.h>
#include <log_manager.h>
static SPINLOCK filter_spin = SPINLOCK_INIT; /**< Protects the list of all filters */
static FILTER_DEF *allFilters = NULL; /**< The list of all filters */
static SPINLOCK filter_spin = SPINLOCK_INIT; /**< Protects the list of all filters */
static FILTER_DEF *allFilters = NULL; /**< The list of all filters */
/**
* Allocate a new filter within MaxScale
*
*
* @param name The filter name
* @param module The module to load
* @param name The filter name
* @param module The module to load
*
* @return The newly created filter or NULL if an error occured
* @return The newly created filter or NULL if an error occured
*/
FILTER_DEF *
filter_alloc(char *name, char *module)
{
FILTER_DEF *filter;
FILTER_DEF *filter;
if ((filter = (FILTER_DEF *)malloc(sizeof(FILTER_DEF))) == NULL)
return NULL;
filter->name = strdup(name);
filter->module = strdup(module);
filter->filter = NULL;
filter->options = NULL;
filter->obj = NULL;
filter->parameters = NULL;
if ((filter = (FILTER_DEF *)malloc(sizeof(FILTER_DEF))) == NULL)
{
return NULL;
}
filter->name = strdup(name);
filter->module = strdup(module);
filter->filter = NULL;
filter->options = NULL;
filter->obj = NULL;
filter->parameters = NULL;
spinlock_init(&filter->spin);
spinlock_init(&filter->spin);
spinlock_acquire(&filter_spin);
filter->next = allFilters;
allFilters = filter;
spinlock_release(&filter_spin);
spinlock_acquire(&filter_spin);
filter->next = allFilters;
allFilters = filter;
spinlock_release(&filter_spin);
return filter;
return filter;
}
/**
* Deallocate the specified filter
*
* @param filter The filter to deallocate
* @return Returns true if the server was freed
* @param filter The filter to deallocate
* @return Returns true if the server was freed
*/
void
filter_free(FILTER_DEF *filter)
{
FILTER_DEF *ptr;
FILTER_DEF *ptr;
if (filter)
{
/* First of all remove from the linked list */
spinlock_acquire(&filter_spin);
if (allFilters == filter)
{
allFilters = filter->next;
}
else
{
ptr = allFilters;
while (ptr && ptr->next != filter)
{
ptr = ptr->next;
}
if (ptr)
ptr->next = filter->next;
}
spinlock_release(&filter_spin);
if (filter)
{
/* First of all remove from the linked list */
spinlock_acquire(&filter_spin);
if (allFilters == filter)
{
allFilters = filter->next;
}
else
{
ptr = allFilters;
while (ptr && ptr->next != filter)
{
ptr = ptr->next;
}
if (ptr)
{
ptr->next = filter->next;
}
}
spinlock_release(&filter_spin);
/* Clean up session and free the memory */
free(filter->name);
free(filter->module);
free(filter);
}
/* Clean up session and free the memory */
free(filter->name);
free(filter->module);
free(filter);
}
}
/**
* Find an existing filter using the unique section name in
* configuration file
*
* @param name The filter name
* @return The server or NULL if not found
* @param name The filter name
* @return The server or NULL if not found
*/
FILTER_DEF *
filter_find(char *name)
{
FILTER_DEF *filter;
FILTER_DEF *filter;
spinlock_acquire(&filter_spin);
filter = allFilters;
while (filter)
{
if (strcmp(filter->name, name) == 0)
break;
filter = filter->next;
}
spinlock_release(&filter_spin);
return filter;
spinlock_acquire(&filter_spin);
filter = allFilters;
while (filter)
{
if (strcmp(filter->name, name) == 0)
{
break;
}
filter = filter->next;
}
spinlock_release(&filter_spin);
return filter;
}
/**
* Check a parameter to see if it is a standard filter parameter
*
* @param name Parameter name to check
* @param name Parameter name to check
*/
int
filter_standard_parameter(char *name)
{
if (strcmp(name, "type") == 0 || strcmp(name, "module") == 0)
return 1;
return 0;
if (strcmp(name, "type") == 0 || strcmp(name, "module") == 0)
{
return 1;
}
return 0;
}
/**
@ -159,29 +167,35 @@ filter_standard_parameter(char *name)
void
dprintAllFilters(DCB *dcb)
{
FILTER_DEF *ptr;
int i;
FILTER_DEF *ptr;
int i;
spinlock_acquire(&filter_spin);
ptr = allFilters;
while (ptr)
{
dcb_printf(dcb, "Filter %p (%s)\n", ptr, ptr->name);
dcb_printf(dcb, "\tModule: %s\n", ptr->module);
if (ptr->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; ptr->options && ptr->options[i]; i++)
dcb_printf(dcb, "%s ", ptr->options[i]);
dcb_printf(dcb, "\n");
}
if (ptr->obj && ptr->filter)
ptr->obj->diagnostics(ptr->filter, NULL, dcb);
else
dcb_printf(dcb, "\tModule not loaded.\n");
ptr = ptr->next;
}
spinlock_release(&filter_spin);
spinlock_acquire(&filter_spin);
ptr = allFilters;
while (ptr)
{
dcb_printf(dcb, "Filter %p (%s)\n", ptr, ptr->name);
dcb_printf(dcb, "\tModule: %s\n", ptr->module);
if (ptr->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; ptr->options && ptr->options[i]; i++)
{
dcb_printf(dcb, "%s ", ptr->options[i]);
}
dcb_printf(dcb, "\n");
}
if (ptr->obj && ptr->filter)
{
ptr->obj->diagnostics(ptr->filter, NULL, dcb);
}
else
{
dcb_printf(dcb, "\tModule not loaded.\n");
}
ptr = ptr->next;
}
spinlock_release(&filter_spin);
}
/**
@ -193,19 +207,23 @@ int i;
void
dprintFilter(DCB *dcb, FILTER_DEF *filter)
{
int i;
int i;
dcb_printf(dcb, "Filter %p (%s)\n", filter, filter->name);
dcb_printf(dcb, "\tModule: %s\n", filter->module);
if (filter->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; filter->options && filter->options[i]; i++)
dcb_printf(dcb, "%s ", filter->options[i]);
dcb_printf(dcb, "\n");
}
if (filter->obj && filter->filter)
filter->obj->diagnostics(filter->filter, NULL, dcb);
dcb_printf(dcb, "Filter %p (%s)\n", filter, filter->name);
dcb_printf(dcb, "\tModule: %s\n", filter->module);
if (filter->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; filter->options && filter->options[i]; i++)
{
dcb_printf(dcb, "%s ", filter->options[i]);
}
dcb_printf(dcb, "\n");
}
if (filter->obj && filter->filter)
{
filter->obj->diagnostics(filter->filter, NULL, dcb);
}
}
/**
@ -215,93 +233,101 @@ int i;
void
dListFilters(DCB *dcb)
{
FILTER_DEF *ptr;
int i;
FILTER_DEF *ptr;
int i;
spinlock_acquire(&filter_spin);
ptr = allFilters;
if (ptr)
{
dcb_printf(dcb, "Filters\n");
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
dcb_printf(dcb, "%-19s | %-15s | Options\n",
"Filter", "Module");
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
}
while (ptr)
{
dcb_printf(dcb, "%-19s | %-15s | ",
ptr->name, ptr->module);
for (i = 0; ptr->options && ptr->options[i]; i++)
dcb_printf(dcb, "%s ", ptr->options[i]);
dcb_printf(dcb, "\n");
ptr = ptr->next;
}
if (allFilters)
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n\n");
spinlock_release(&filter_spin);
spinlock_acquire(&filter_spin);
ptr = allFilters;
if (ptr)
{
dcb_printf(dcb, "Filters\n");
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
dcb_printf(dcb, "%-19s | %-15s | Options\n",
"Filter", "Module");
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
}
while (ptr)
{
dcb_printf(dcb, "%-19s | %-15s | ",
ptr->name, ptr->module);
for (i = 0; ptr->options && ptr->options[i]; i++)
{
dcb_printf(dcb, "%s ", ptr->options[i]);
}
dcb_printf(dcb, "\n");
ptr = ptr->next;
}
if (allFilters)
{
dcb_printf(dcb,
"--------------------+-----------------+----------------------------------------\n\n");
}
spinlock_release(&filter_spin);
}
/**
* Add a router option to a service
*
* @param filter The filter to add the option to
* @param option The option string
* @param filter The filter to add the option to
* @param option The option string
*/
void
filterAddOption(FILTER_DEF *filter, char *option)
{
int i;
int i;
spinlock_acquire(&filter->spin);
if (filter->options == NULL)
spinlock_acquire(&filter->spin);
if (filter->options == NULL)
{
filter->options = (char **)calloc(2, sizeof(char *));
filter->options[0] = strdup(option);
filter->options[1] = NULL;
}
else
{
for (i = 0; filter->options[i]; i++)
{
filter->options = (char **)calloc(2, sizeof(char *));
filter->options[0] = strdup(option);
filter->options[1] = NULL;
;
}
else
{
for (i = 0; filter->options[i]; i++)
;
filter->options = (char **)realloc(filter->options,
(i + 2) * sizeof(char *));
filter->options[i] = strdup(option);
filter->options[i+1] = NULL;
}
spinlock_release(&filter->spin);
filter->options = (char **)realloc(filter->options, (i + 2) * sizeof(char *));
filter->options[i] = strdup(option);
filter->options[i+1] = NULL;
}
spinlock_release(&filter->spin);
}
/**
* Add a router parameter to a service
*
* @param filter The filter to add the parameter to
* @param name The parameter name
* @param value The parameter value
* @param filter The filter to add the parameter to
* @param name The parameter name
* @param value The parameter value
*/
void
filterAddParameter(FILTER_DEF *filter, char *name, char *value)
{
int i;
int i;
spinlock_acquire(&filter->spin);
if (filter->parameters == NULL)
spinlock_acquire(&filter->spin);
if (filter->parameters == NULL)
{
filter->parameters = (FILTER_PARAMETER **)calloc(2, sizeof(FILTER_PARAMETER *));
i = 0;
}
else
{
for (i = 0; filter->parameters[i]; i++)
{
filter->parameters = (FILTER_PARAMETER **)calloc(2, sizeof(FILTER_PARAMETER *));
i = 0;
;
}
else
{
for (i = 0; filter->parameters[i]; i++)
;
filter->parameters = (FILTER_PARAMETER **)realloc(filter->parameters,
(i + 2) * sizeof(FILTER_PARAMETER *));
}
filter->parameters[i] = (FILTER_PARAMETER *)calloc(1, sizeof(FILTER_PARAMETER));
filter->parameters[i]->name = strdup(name);
filter->parameters[i]->value = strdup(value);
filter->parameters[i+1] = NULL;
spinlock_release(&filter->spin);
filter->parameters = (FILTER_PARAMETER **)realloc(filter->parameters,
(i + 2) * sizeof(FILTER_PARAMETER *));
}
filter->parameters[i] = (FILTER_PARAMETER *)calloc(1, sizeof(FILTER_PARAMETER));
filter->parameters[i]->name = strdup(name);
filter->parameters[i]->value = strdup(value);
filter->parameters[i+1] = NULL;
spinlock_release(&filter->spin);
}
/**
@ -344,38 +370,38 @@ bool filter_load(FILTER_DEF* filter)
* This will create the filter instance, loading the filter module, and
* conenct the fitler into the downstream chain.
*
* @param filter The filter to add into the chain
* @param session The client session
* @param downstream The filter downstream of this filter
* @return The downstream component for the next filter or NULL
* if the filter could not be created
* @param filter The filter to add into the chain
* @param session The client session
* @param downstream The filter downstream of this filter
* @return The downstream component for the next filter or NULL
* if the filter could not be created
*/
DOWNSTREAM *
filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream)
{
DOWNSTREAM *me;
DOWNSTREAM *me;
if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL)
{
char errbuf[STRERROR_BUFLEN];
MXS_ERROR("Memory allocation for filter session failed "
"due to %d,%s.",
errno,
strerror_r(errno, errbuf, sizeof(errbuf)));
return NULL;
}
me->instance = filter->filter;
me->routeQuery = (void *)(filter->obj->routeQuery);
if ((me->session=filter->obj->newSession(me->instance, session)) == NULL)
{
free(me);
return NULL;
}
filter->obj->setDownstream(me->instance, me->session, downstream);
return me;
if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL)
{
char errbuf[STRERROR_BUFLEN];
MXS_ERROR("Memory allocation for filter session failed "
"due to %d,%s.",
errno,
strerror_r(errno, errbuf, sizeof(errbuf)));
return NULL;
}
me->instance = filter->filter;
me->routeQuery = (void *)(filter->obj->routeQuery);
if ((me->session=filter->obj->newSession(me->instance, session)) == NULL)
{
free(me);
return NULL;
}
filter->obj->setDownstream(me->instance, me->session, downstream);
return me;
}
/**
@ -386,33 +412,35 @@ filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream)
* Note all filters require to be in the upstream chain, so this routine
* may skip a filter if it does not provide an upstream interface.
*
* @param filter The fitler to add to the chain
* @param fsession The filter session
* @param upstream The filter that should be upstream of this filter
* @return The upstream component for the next filter
* @param filter The fitler to add to the chain
* @param fsession The filter session
* @param upstream The filter that should be upstream of this filter
* @return The upstream component for the next filter
*/
UPSTREAM *
filterUpstream(FILTER_DEF *filter, void *fsession, UPSTREAM *upstream)
{
UPSTREAM *me = NULL;
UPSTREAM *me = NULL;
/*
* The the filter has no setUpstream entry point then is does
* not require to see results and can be left out of the chain.
*/
if (filter->obj->setUpstream == NULL)
return upstream;
/*
* The the filter has no setUpstream entry point then is does
* not require to see results and can be left out of the chain.
*/
if (filter->obj->setUpstream == NULL)
{
return upstream;
}
if (filter->obj->clientReply != NULL)
{
if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL)
{
return NULL;
}
me->instance = filter->filter;
me->session = fsession;
me->clientReply = (void *)(filter->obj->clientReply);
filter->obj->setUpstream(me->instance, me->session, upstream);
}
return me;
if (filter->obj->clientReply != NULL)
{
if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL)
{
return NULL;
}
me->instance = filter->filter;
me->session = fsession;
me->clientReply = (void *)(filter->obj->clientReply);
filter->obj->setUpstream(me->instance, me->session, upstream);
}
return me;
}

View File

@ -23,8 +23,8 @@
*
* Revision History
*
* Date Who Description
* 27/05/2014 Mark Riddoch Initial implementation
* Date Who Description
* 27/05/2014 Mark Riddoch Initial implementation
*
*/
#include <dcb.h>
@ -41,9 +41,10 @@ typedef void *FILTER;
/**
* The structure used to pass name, value pairs to the filter instances
*/
typedef struct {
char *name; /**< Name of the parameter */
char *value; /**< Value of the parameter */
typedef struct
{
char *name; /**< Name of the parameter */
char *value; /**< Value of the parameter */
} FILTER_PARAMETER;
/**
@ -51,34 +52,35 @@ typedef struct {
* The "module object" structure for a query router module
*
* The entry points are:
* createInstance Called by the service to create a new
* instance of the filter
* newSession Called to create a new user session
* within the filter
* closeSession Called when a session is closed
* freeSession Called when a session is freed
* setDownstream Sets the downstream component of the
* filter pipline
* routeQuery Called on each query that requires
* routing
* clientReply Called for each reply packet
* diagnostics Called to force the filter to print
* diagnostic output
* createInstance Called by the service to create a new
* instance of the filter
* newSession Called to create a new user session
* within the filter
* closeSession Called when a session is closed
* freeSession Called when a session is freed
* setDownstream Sets the downstream component of the
* filter pipline
* routeQuery Called on each query that requires
* routing
* clientReply Called for each reply packet
* diagnostics Called to force the filter to print
* diagnostic output
*
* @endverbatim
*
* @see load_module
*/
typedef struct filter_object {
FILTER *(*createInstance)(char **options, FILTER_PARAMETER **);
void *(*newSession)(FILTER *instance, SESSION *session);
void (*closeSession)(FILTER *instance, void *fsession);
void (*freeSession)(FILTER *instance, void *fsession);
void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
void (*setUpstream)(FILTER *instance, void *fsession, UPSTREAM *downstream);
int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue);
int (*clientReply)(FILTER *instance, void *fsession, GWBUF *queue);
void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb);
typedef struct filter_object
{
FILTER *(*createInstance)(char **options, FILTER_PARAMETER **);
void *(*newSession)(FILTER *instance, SESSION *session);
void (*closeSession)(FILTER *instance, void *fsession);
void (*freeSession)(FILTER *instance, void *fsession);
void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
void (*setUpstream)(FILTER *instance, void *fsession, UPSTREAM *downstream);
int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue);
int (*clientReply)(FILTER *instance, void *fsession, GWBUF *queue);
void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb);
} FILTER_OBJECT;
/**
@ -86,35 +88,35 @@ typedef struct filter_object {
* is changed these values must be updated in line with the rules in the
* file modinfo.h.
*/
#define FILTER_VERSION {1, 1, 0}
#define FILTER_VERSION {1, 1, 0}
/**
* The definition of a filter from the configuration file.
* This is basically the link between a plugin to load and the
* optons to pass to that plugin.
*/
typedef struct filter_def {
char *name; /**< The Filter name */
char *module; /**< The module to load */
char **options; /**< The options set for this filter */
FILTER_PARAMETER
**parameters; /**< The filter parameters */
FILTER filter; /**< The runtime filter */
FILTER_OBJECT *obj; /**< The "MODULE_OBJECT" for the filter */
SPINLOCK spin; /**< Spinlock to protect the filter definition */
struct filter_def
*next; /**< Next filter in the chain of all filters */
typedef struct filter_def
{
char *name; /**< The Filter name */
char *module; /**< The module to load */
char **options; /**< The options set for this filter */
FILTER_PARAMETER **parameters; /**< The filter parameters */
FILTER filter; /**< The runtime filter */
FILTER_OBJECT *obj; /**< The "MODULE_OBJECT" for the filter */
SPINLOCK spin; /**< Spinlock to protect the filter definition */
struct filter_def *next; /**< Next filter in the chain of all filters */
} FILTER_DEF;
FILTER_DEF *filter_alloc(char *, char *);
void filter_free(FILTER_DEF *);
bool filter_load(FILTER_DEF* filter);
FILTER_DEF *filter_find(char *);
void filterAddOption(FILTER_DEF *, char *);
void filterAddParameter(FILTER_DEF *, char *, char *);
DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *);
UPSTREAM *filterUpstream(FILTER_DEF *, void *, UPSTREAM *);
int filter_standard_parameter(char *);
void dprintAllFilters(DCB *);
void dprintFilter(DCB *, FILTER_DEF *);
void dListFilters(DCB *);
FILTER_DEF *filter_alloc(char *, char *);
void filter_free(FILTER_DEF *);
bool filter_load(FILTER_DEF* filter);
FILTER_DEF *filter_find(char *);
void filterAddOption(FILTER_DEF *, char *);
void filterAddParameter(FILTER_DEF *, char *, char *);
DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *);
UPSTREAM *filterUpstream(FILTER_DEF *, void *, UPSTREAM *);
int filter_standard_parameter(char *);
void dprintAllFilters(DCB *);
void dprintFilter(DCB *, FILTER_DEF *);
void dListFilters(DCB *);
#endif