Addition of session filter tracking in order to allow the show session
command to call the diagnostic entries points of the filters in the session. Slight improvements to the two example filters and a fix to the trim routine.
This commit is contained in:
parent
8d55be4b23
commit
857ae25570
@ -626,7 +626,8 @@ char *ptr;
|
||||
while (isspace(*str))
|
||||
str++;
|
||||
|
||||
ptr = str + strlen(str);
|
||||
/* Point to last character of the string */
|
||||
ptr = str + strlen(str) - 1;
|
||||
while (ptr > str && isspace(*ptr))
|
||||
*ptr-- = 0;
|
||||
|
||||
|
@ -94,6 +94,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
|
||||
spinlock_acquire(&session->ses_lock);
|
||||
session->service = service;
|
||||
session->client = client_dcb;
|
||||
session->n_filters = 0;
|
||||
memset(&session->stats, 0, sizeof(SESSION_STATS));
|
||||
session->stats.connect = time(0);
|
||||
session->state = SESSION_STATE_ALLOC;
|
||||
@ -269,6 +270,7 @@ bool session_free(
|
||||
bool succp = false;
|
||||
SESSION *ptr;
|
||||
int nlink;
|
||||
int i;
|
||||
|
||||
CHK_SESSION(session);
|
||||
|
||||
@ -304,10 +306,23 @@ bool session_free(
|
||||
|
||||
/* Free router_session and session */
|
||||
if (session->router_session) {
|
||||
session->service->router->closeSession(
|
||||
session->service->router_instance,
|
||||
session->router_session);
|
||||
session->service->router->freeSession(
|
||||
session->service->router_instance,
|
||||
session->router_session);
|
||||
}
|
||||
if (session->n_filters)
|
||||
{
|
||||
for (i = 0; i < session->n_filters; i++)
|
||||
{
|
||||
session->filters[i].filter->obj->freeSession(
|
||||
session->filters[i].instance,
|
||||
session->filters[i].session);
|
||||
}
|
||||
free(session->filters);
|
||||
}
|
||||
free(session);
|
||||
succp = true;
|
||||
|
||||
@ -482,7 +497,7 @@ SESSION *ptr;
|
||||
void
|
||||
dprintSession(DCB *dcb, SESSION *ptr)
|
||||
{
|
||||
DOWNSTREAM *dptr;
|
||||
int i;
|
||||
|
||||
dcb_printf(dcb, "Session %p\n", ptr);
|
||||
dcb_printf(dcb, "\tState: %s\n", session_state(ptr->state));
|
||||
@ -491,6 +506,18 @@ DOWNSTREAM *dptr;
|
||||
if (ptr->client && ptr->client->remote)
|
||||
dcb_printf(dcb, "\tClient Address: %s\n", ptr->client->remote);
|
||||
dcb_printf(dcb, "\tConnected: %s", asctime(localtime(&ptr->stats.connect)));
|
||||
if (ptr->n_filters)
|
||||
{
|
||||
for (i = 0; i < ptr->n_filters; i++)
|
||||
{
|
||||
dcb_printf(dcb, "\tFilter: %s\n",
|
||||
ptr->filters[i].filter->name);
|
||||
ptr->filters[i].filter->obj->diagnostics(
|
||||
ptr->filters[i].instance,
|
||||
ptr->filters[i].session,
|
||||
dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -585,6 +612,16 @@ SERVICE *service = session->service;
|
||||
DOWNSTREAM *head;
|
||||
int i;
|
||||
|
||||
if ((session->filters = calloc(service->n_filters,
|
||||
sizeof(SESSION_FILTER))) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Insufficient memory to allocate session filter "
|
||||
"tracking.\n")));
|
||||
return 0;
|
||||
}
|
||||
session->n_filters = service->n_filters;
|
||||
for (i = service->n_filters - 1; i >= 0; i--)
|
||||
{
|
||||
if ((head = filterApply(service->filters[i], session,
|
||||
@ -597,6 +634,9 @@ int i;
|
||||
service->name)));
|
||||
return 0;
|
||||
}
|
||||
session->filters[i].filter = service->filters[i];
|
||||
session->filters[i].session = head->session;
|
||||
session->filters[i].instance = head->instance;
|
||||
session->head = *head;
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,7 @@
|
||||
|
||||
struct dcb;
|
||||
struct service;
|
||||
struct filter_def;
|
||||
|
||||
/**
|
||||
* The session statistics structure
|
||||
@ -69,9 +70,20 @@ typedef enum {
|
||||
typedef struct {
|
||||
void *instance;
|
||||
void *session;
|
||||
int (*routeQuery)(void *instance, void *router_session, GWBUF *queue);
|
||||
int (*routeQuery)(void *instance,
|
||||
void *router_session, GWBUF *queue);
|
||||
} DOWNSTREAM;
|
||||
|
||||
/**
|
||||
* Structure used to track the filter instances and sessions of the filters
|
||||
* that are in use within a session.
|
||||
*/
|
||||
typedef struct {
|
||||
struct filter_def
|
||||
*filter;
|
||||
void *instance;
|
||||
void *session;
|
||||
} SESSION_FILTER;
|
||||
|
||||
/**
|
||||
* The session status block
|
||||
@ -91,6 +103,8 @@ typedef struct session {
|
||||
void *router_session;/**< The router instance data */
|
||||
SESSION_STATS stats; /**< Session statistics */
|
||||
struct service *service; /**< The service this session is using */
|
||||
int n_filters; /**< Number of filter sessions */
|
||||
SESSION_FILTER *filters; /**< The filters in use within this session */
|
||||
DOWNSTREAM head; /**< Head of the filter chain */
|
||||
struct session *next; /**< Linked list of all sessions */
|
||||
int refcount; /**< Reference count on the session */
|
||||
|
@ -15,12 +15,29 @@
|
||||
*
|
||||
* Copyright SkySQL Ab 2014
|
||||
*/
|
||||
|
||||
/**
|
||||
* QLA Filter - Query Log All. A primitive query logging filter, simply
|
||||
* used to verify the filter mechanism for downstream filters. All queries
|
||||
* that are passed through the filter will be written to file.
|
||||
*
|
||||
* The filter makes no attempt to deal with query packets that do not fit
|
||||
* in a single GWBUF.
|
||||
*
|
||||
* A single option may be passed to the filter, this is the name of the
|
||||
* file to which the queries are logged. A serial number is appended to this
|
||||
* name in order that each session logs to a different file.
|
||||
*/
|
||||
#include <stdio.h>
|
||||
#include <fcntl.h>
|
||||
#include <filter.h>
|
||||
#include <string.h>
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
|
||||
/*
|
||||
* The filter entry points
|
||||
*/
|
||||
static FILTER *createInstance(char **options);
|
||||
static void *newSession(FILTER *instance, SESSION *session);
|
||||
static void closeSession(FILTER *instance, void *session);
|
||||
@ -41,7 +58,12 @@ static FILTER_OBJECT MyObject = {
|
||||
};
|
||||
|
||||
/**
|
||||
* A dummy instance structure
|
||||
* A instance structure, the assumption is that the option passed
|
||||
* to the filter is simply a base for the filename to which the queries
|
||||
* are logged.
|
||||
*
|
||||
* To this base a session number is attached such that each session will
|
||||
* have a nique name.
|
||||
*/
|
||||
typedef struct {
|
||||
int sessions;
|
||||
@ -49,11 +71,16 @@ typedef struct {
|
||||
} QLA_INSTANCE;
|
||||
|
||||
/**
|
||||
* A dummy session structure for this test filter
|
||||
* The session structure for this QLA filter.
|
||||
* This stores the downstream filter information, such that the
|
||||
* filter is able to pass the query on to the next filter (or router)
|
||||
* in the chain.
|
||||
*
|
||||
* It also holds the file descriptor to which queries are written.
|
||||
*/
|
||||
typedef struct {
|
||||
DOWNSTREAM down;
|
||||
int mysession;
|
||||
char *filename;
|
||||
int fd;
|
||||
} QLA_SESSION;
|
||||
|
||||
@ -116,7 +143,9 @@ QLA_INSTANCE *my_instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Associate a new session with this instance of the router.
|
||||
* Associate a new session with this instance of the filter.
|
||||
*
|
||||
* Create the file to log to and open it.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session itself
|
||||
@ -127,23 +156,30 @@ newSession(FILTER *instance, SESSION *session)
|
||||
{
|
||||
QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance;
|
||||
QLA_SESSION *my_session;
|
||||
char filename[100];
|
||||
|
||||
if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL)
|
||||
{
|
||||
sprintf(filename, "%s.%d", my_instance->filebase,
|
||||
if ((my_session->filename =
|
||||
(char *)malloc(strlen(my_instance->filebase) + 20))
|
||||
== NULL)
|
||||
{
|
||||
free(my_session);
|
||||
return NULL;
|
||||
}
|
||||
sprintf(my_session->filename, "%s.%d", my_instance->filebase,
|
||||
my_instance->sessions);
|
||||
printf("Open file %s\n", filename);
|
||||
my_instance->sessions++;
|
||||
my_session->fd = open(filename, O_WRONLY|O_CREAT, 0666);
|
||||
my_session->fd = open(my_session->filename,
|
||||
O_WRONLY|O_CREAT, 0666);
|
||||
}
|
||||
|
||||
return my_session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a session with the router, this is the mechanism
|
||||
* by which a router may cleanup data structure etc.
|
||||
* Close a session with the filter, this is the mechanism
|
||||
* by which a filter may cleanup data structure etc.
|
||||
* In the case of the QLA filter we simple close the file descriptor.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session being closed
|
||||
@ -151,17 +187,35 @@ printf("Open file %s\n", filename);
|
||||
static void
|
||||
closeSession(FILTER *instance, void *session)
|
||||
{
|
||||
QLA_SESSION *my_session;
|
||||
QLA_SESSION *my_session = (QLA_SESSION *)session;
|
||||
|
||||
close(my_session->fd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Free the memory associated with the session
|
||||
*
|
||||
* @param instance The filter instance
|
||||
* @param session The filter session
|
||||
*/
|
||||
static void
|
||||
freeSession(FILTER *instance, void *session)
|
||||
{
|
||||
QLA_SESSION *my_session = (QLA_SESSION *)session;
|
||||
|
||||
free(my_session->filename);
|
||||
free(session);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param downstream The downstream filter or router.
|
||||
*/
|
||||
static void
|
||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
@ -170,6 +224,16 @@ QLA_SESSION *my_session = (QLA_SESSION *)session;
|
||||
my_session->down = *downstream;
|
||||
}
|
||||
|
||||
/**
|
||||
* The routeQuery entry point. This is passed the query buffer
|
||||
* to which the filter should be applied. Once applied the
|
||||
* query shoudl normally be passed to the downstream component
|
||||
* (filter or router) in the filter chain.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param queue The query data
|
||||
*/
|
||||
static int
|
||||
routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
{
|
||||
@ -177,16 +241,19 @@ QLA_SESSION *my_session = (QLA_SESSION *)session;
|
||||
unsigned char *ptr;
|
||||
unsigned int length;
|
||||
|
||||
/* Find the text of the query and write to the file */
|
||||
ptr = GWBUF_DATA(queue);
|
||||
length = *ptr++;
|
||||
length += (*ptr++ << 8);
|
||||
length += (*ptr++ << 8);
|
||||
ptr++; // Skip sequence id
|
||||
if (*ptr++ == 0x03) // COM_QUERY
|
||||
if (*ptr++ == 0x03 && my_session->fd != -1) // COM_QUERY
|
||||
{
|
||||
write(my_session->fd, ptr, length - 1);
|
||||
write(my_session->fd, "\n", 1);
|
||||
}
|
||||
|
||||
/* Pass the query downstream */
|
||||
return my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session, queue);
|
||||
}
|
||||
@ -205,7 +272,12 @@ unsigned int length;
|
||||
static void
|
||||
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||
{
|
||||
QLA_INSTANCE *my_instance = instance;
|
||||
QLA_SESSION *my_session = fsession;
|
||||
QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance;
|
||||
QLA_SESSION *my_session = (QLA_SESSION *)fsession;
|
||||
|
||||
if (my_session)
|
||||
{
|
||||
dcb_printf(dcb, "\t\tLogging to file %s.\n",
|
||||
my_session->filename);
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,16 @@
|
||||
#include <stdio.h>
|
||||
#include <filter.h>
|
||||
|
||||
/**
|
||||
* testfilter.c - a very simple test filter.
|
||||
*
|
||||
* This filter is a very simple example used to test the filter API,
|
||||
* it merely counts the number of statements that flow through the
|
||||
* filter pipeline.
|
||||
*
|
||||
* Reporting is done via the diagnostics print routine.
|
||||
*/
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
|
||||
static FILTER *createInstance(char **options);
|
||||
@ -103,11 +113,11 @@ TEST_INSTANCE *my_instance;
|
||||
|
||||
if ((my_instance = calloc(1, sizeof(TEST_INSTANCE))) != NULL)
|
||||
my_instance->sessions = 0;
|
||||
return my_instance;
|
||||
return (FILTER *)my_instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Associate a new session with this instance of the router.
|
||||
* Associate a new session with this instance of the filter.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session itself
|
||||
@ -129,8 +139,8 @@ TEST_SESSION *my_session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a session with the router, this is the mechanism
|
||||
* by which a router may cleanup data structure etc.
|
||||
* Close a session with the filter, this is the mechanism
|
||||
* by which a filter may cleanup data structure etc.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session being closed
|
||||
@ -140,12 +150,26 @@ closeSession(FILTER *instance, void *session)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Free the memory associated with this filter session.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session being closed
|
||||
*/
|
||||
static void
|
||||
freeSession(FILTER *instance, void *session)
|
||||
{
|
||||
free(session);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream component for this filter.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session being closed
|
||||
* @param downstream The downstream filter or router
|
||||
*/
|
||||
static void
|
||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
@ -154,6 +178,16 @@ TEST_SESSION *my_session = (TEST_SESSION *)session;
|
||||
my_session->down = *downstream;
|
||||
}
|
||||
|
||||
/**
|
||||
* The routeQuery entry point. This is passed the query buffer
|
||||
* to which the filter should be applied. Once applied the
|
||||
* query shoudl normally be passed to the downstream component
|
||||
* (filter or router) in the filter chain.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param queue The query data
|
||||
*/
|
||||
static int
|
||||
routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
{
|
||||
@ -178,13 +212,13 @@ TEST_SESSION *my_session = (TEST_SESSION *)session;
|
||||
static void
|
||||
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||
{
|
||||
TEST_INSTANCE *my_instance = instance;
|
||||
TEST_SESSION *my_session = fsession;
|
||||
TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance;
|
||||
TEST_SESSION *my_session = (TEST_SESSION *)fsession;
|
||||
|
||||
if (my_session)
|
||||
dcb_printf(dcb, "No. of queries routed by filter: %d\n",
|
||||
dcb_printf(dcb, "\t\tNo. of queries routed by filter: %d\n",
|
||||
my_session->count);
|
||||
else
|
||||
dcb_printf(dcb, "No. of sessions created: %d\n",
|
||||
dcb_printf(dcb, "\t\tNo. of sessions created: %d\n",
|
||||
my_instance->sessions);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user