diff --git a/server/core/service.c b/server/core/service.c index d528881b1..c1b54c0af 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -626,7 +626,8 @@ char *ptr; while (isspace(*str)) str++; - ptr = str + strlen(str); + /* Point to last character of the string */ + ptr = str + strlen(str) - 1; while (ptr > str && isspace(*ptr)) *ptr-- = 0; diff --git a/server/core/session.c b/server/core/session.c index 0d3cb6105..cb392258c 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -94,6 +94,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) spinlock_acquire(&session->ses_lock); session->service = service; session->client = client_dcb; + session->n_filters = 0; memset(&session->stats, 0, sizeof(SESSION_STATS)); session->stats.connect = time(0); session->state = SESSION_STATE_ALLOC; @@ -269,6 +270,7 @@ bool session_free( bool succp = false; SESSION *ptr; int nlink; + int i; CHK_SESSION(session); @@ -304,10 +306,23 @@ bool session_free( /* Free router_session and session */ if (session->router_session) { + session->service->router->closeSession( + session->service->router_instance, + session->router_session); session->service->router->freeSession( session->service->router_instance, session->router_session); } + if (session->n_filters) + { + for (i = 0; i < session->n_filters; i++) + { + session->filters[i].filter->obj->freeSession( + session->filters[i].instance, + session->filters[i].session); + } + free(session->filters); + } free(session); succp = true; @@ -482,7 +497,7 @@ SESSION *ptr; void dprintSession(DCB *dcb, SESSION *ptr) { -DOWNSTREAM *dptr; +int i; dcb_printf(dcb, "Session %p\n", ptr); dcb_printf(dcb, "\tState: %s\n", session_state(ptr->state)); @@ -491,6 +506,18 @@ DOWNSTREAM *dptr; if (ptr->client && ptr->client->remote) dcb_printf(dcb, "\tClient Address: %s\n", ptr->client->remote); dcb_printf(dcb, "\tConnected: %s", asctime(localtime(&ptr->stats.connect))); + if (ptr->n_filters) + { + for (i = 0; i < ptr->n_filters; i++) + { + dcb_printf(dcb, "\tFilter: %s\n", + ptr->filters[i].filter->name); + ptr->filters[i].filter->obj->diagnostics( + ptr->filters[i].instance, + ptr->filters[i].session, + dcb); + } + } } /** @@ -585,6 +612,16 @@ SERVICE *service = session->service; DOWNSTREAM *head; int i; + if ((session->filters = calloc(service->n_filters, + sizeof(SESSION_FILTER))) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Insufficient memory to allocate session filter " + "tracking.\n"))); + return 0; + } + session->n_filters = service->n_filters; for (i = service->n_filters - 1; i >= 0; i--) { if ((head = filterApply(service->filters[i], session, @@ -597,6 +634,9 @@ int i; service->name))); return 0; } + session->filters[i].filter = service->filters[i]; + session->filters[i].session = head->session; + session->filters[i].instance = head->instance; session->head = *head; } diff --git a/server/include/session.h b/server/include/session.h index 6918c5343..6838dc1f1 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -44,6 +44,7 @@ struct dcb; struct service; +struct filter_def; /** * The session statistics structure @@ -69,9 +70,20 @@ typedef enum { typedef struct { void *instance; void *session; - int (*routeQuery)(void *instance, void *router_session, GWBUF *queue); + int (*routeQuery)(void *instance, + void *router_session, GWBUF *queue); } DOWNSTREAM; +/** + * Structure used to track the filter instances and sessions of the filters + * that are in use within a session. + */ +typedef struct { + struct filter_def + *filter; + void *instance; + void *session; +} SESSION_FILTER; /** * The session status block @@ -91,6 +103,8 @@ typedef struct session { void *router_session;/**< The router instance data */ SESSION_STATS stats; /**< Session statistics */ struct service *service; /**< The service this session is using */ + int n_filters; /**< Number of filter sessions */ + SESSION_FILTER *filters; /**< The filters in use within this session */ DOWNSTREAM head; /**< Head of the filter chain */ struct session *next; /**< Linked list of all sessions */ int refcount; /**< Reference count on the session */ diff --git a/server/modules/filter/qlafilter.c b/server/modules/filter/qlafilter.c index c47b8aeb2..b1155ce2a 100644 --- a/server/modules/filter/qlafilter.c +++ b/server/modules/filter/qlafilter.c @@ -15,12 +15,29 @@ * * Copyright SkySQL Ab 2014 */ + +/** + * QLA Filter - Query Log All. A primitive query logging filter, simply + * used to verify the filter mechanism for downstream filters. All queries + * that are passed through the filter will be written to file. + * + * The filter makes no attempt to deal with query packets that do not fit + * in a single GWBUF. + * + * A single option may be passed to the filter, this is the name of the + * file to which the queries are logged. A serial number is appended to this + * name in order that each session logs to a different file. + */ #include #include #include +#include static char *version_str = "V1.0.0"; +/* + * The filter entry points + */ static FILTER *createInstance(char **options); static void *newSession(FILTER *instance, SESSION *session); static void closeSession(FILTER *instance, void *session); @@ -41,7 +58,12 @@ static FILTER_OBJECT MyObject = { }; /** - * A dummy instance structure + * A instance structure, the assumption is that the option passed + * to the filter is simply a base for the filename to which the queries + * are logged. + * + * To this base a session number is attached such that each session will + * have a nique name. */ typedef struct { int sessions; @@ -49,11 +71,16 @@ typedef struct { } QLA_INSTANCE; /** - * A dummy session structure for this test filter + * The session structure for this QLA filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. */ typedef struct { DOWNSTREAM down; - int mysession; + char *filename; int fd; } QLA_SESSION; @@ -116,7 +143,9 @@ QLA_INSTANCE *my_instance; } /** - * Associate a new session with this instance of the router. + * Associate a new session with this instance of the filter. + * + * Create the file to log to and open it. * * @param instance The filter instance data * @param session The session itself @@ -127,23 +156,30 @@ newSession(FILTER *instance, SESSION *session) { QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; QLA_SESSION *my_session; -char filename[100]; if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL) { - sprintf(filename, "%s.%d", my_instance->filebase, + if ((my_session->filename = + (char *)malloc(strlen(my_instance->filebase) + 20)) + == NULL) + { + free(my_session); + return NULL; + } + sprintf(my_session->filename, "%s.%d", my_instance->filebase, my_instance->sessions); -printf("Open file %s\n", filename); my_instance->sessions++; - my_session->fd = open(filename, O_WRONLY|O_CREAT, 0666); + my_session->fd = open(my_session->filename, + O_WRONLY|O_CREAT, 0666); } return my_session; } /** - * Close a session with the router, this is the mechanism - * by which a router may cleanup data structure etc. + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * In the case of the QLA filter we simple close the file descriptor. * * @param instance The filter instance data * @param session The session being closed @@ -151,17 +187,35 @@ printf("Open file %s\n", filename); static void closeSession(FILTER *instance, void *session) { -QLA_SESSION *my_session; +QLA_SESSION *my_session = (QLA_SESSION *)session; close(my_session->fd); } +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ static void freeSession(FILTER *instance, void *session) { +QLA_SESSION *my_session = (QLA_SESSION *)session; + + free(my_session->filename); + free(session); return; } +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ static void setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) { @@ -170,6 +224,16 @@ QLA_SESSION *my_session = (QLA_SESSION *)session; my_session->down = *downstream; } +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query shoudl normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { @@ -177,16 +241,19 @@ QLA_SESSION *my_session = (QLA_SESSION *)session; unsigned char *ptr; unsigned int length; + /* Find the text of the query and write to the file */ ptr = GWBUF_DATA(queue); length = *ptr++; length += (*ptr++ << 8); length += (*ptr++ << 8); ptr++; // Skip sequence id - if (*ptr++ == 0x03) // COM_QUERY + if (*ptr++ == 0x03 && my_session->fd != -1) // COM_QUERY { write(my_session->fd, ptr, length - 1); write(my_session->fd, "\n", 1); } + + /* Pass the query downstream */ return my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); } @@ -205,7 +272,12 @@ unsigned int length; static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { -QLA_INSTANCE *my_instance = instance; -QLA_SESSION *my_session = fsession; +QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; +QLA_SESSION *my_session = (QLA_SESSION *)fsession; + if (my_session) + { + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_session->filename); + } } diff --git a/server/modules/filter/testfilter.c b/server/modules/filter/testfilter.c index 9e477cb5b..c32dcbd27 100644 --- a/server/modules/filter/testfilter.c +++ b/server/modules/filter/testfilter.c @@ -18,6 +18,16 @@ #include #include +/** + * testfilter.c - a very simple test filter. + * + * This filter is a very simple example used to test the filter API, + * it merely counts the number of statements that flow through the + * filter pipeline. + * + * Reporting is done via the diagnostics print routine. + */ + static char *version_str = "V1.0.0"; static FILTER *createInstance(char **options); @@ -103,11 +113,11 @@ TEST_INSTANCE *my_instance; if ((my_instance = calloc(1, sizeof(TEST_INSTANCE))) != NULL) my_instance->sessions = 0; - return my_instance; + return (FILTER *)my_instance; } /** - * Associate a new session with this instance of the router. + * Associate a new session with this instance of the filter. * * @param instance The filter instance data * @param session The session itself @@ -129,8 +139,8 @@ TEST_SESSION *my_session; } /** - * Close a session with the router, this is the mechanism - * by which a router may cleanup data structure etc. + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. * * @param instance The filter instance data * @param session The session being closed @@ -140,12 +150,26 @@ closeSession(FILTER *instance, void *session) { } +/** + * Free the memory associated with this filter session. + * + * @param instance The filter instance data + * @param session The session being closed + */ static void freeSession(FILTER *instance, void *session) { + free(session); return; } +/** + * Set the downstream component for this filter. + * + * @param instance The filter instance data + * @param session The session being closed + * @param downstream The downstream filter or router + */ static void setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) { @@ -154,6 +178,16 @@ TEST_SESSION *my_session = (TEST_SESSION *)session; my_session->down = *downstream; } +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query shoudl normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { @@ -178,13 +212,13 @@ TEST_SESSION *my_session = (TEST_SESSION *)session; static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { -TEST_INSTANCE *my_instance = instance; -TEST_SESSION *my_session = fsession; +TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance; +TEST_SESSION *my_session = (TEST_SESSION *)fsession; if (my_session) - dcb_printf(dcb, "No. of queries routed by filter: %d\n", + dcb_printf(dcb, "\t\tNo. of queries routed by filter: %d\n", my_session->count); else - dcb_printf(dcb, "No. of sessions created: %d\n", + dcb_printf(dcb, "\t\tNo. of sessions created: %d\n", my_instance->sessions); }