Merge branch 'develop' into MAX-59

This commit is contained in:
MassimilianoPinto
2014-06-23 12:42:32 +02:00
22 changed files with 984 additions and 47 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -18,10 +18,19 @@
# Date Who Description # Date Who Description
# 13/06/14 Mark Riddoch Initial implementation of MaxScale # 13/06/14 Mark Riddoch Initial implementation of MaxScale
# client program # client program
# 18/06/14 Mark Riddoch Addition of conditional for histedit
ifeq ($(wildcard /usr/include/histedit.h), )
HISTLIB=
HISTFLAG=
else
HISTLIB=-ledit
HISTFLAG=-DHISTORY
endif
CC=cc CC=cc
CFLAGS=-c -Wall -g CFLAGS=-c -Wall -g $(HISTFLAG)
SRCS= maxadmin.c SRCS= maxadmin.c
@ -29,7 +38,7 @@ HDRS=
OBJ=$(SRCS:.c=.o) OBJ=$(SRCS:.c=.o)
LIBS=-ledit LIBS=$(HISTLIB)
all: maxadmin all: maxadmin

View File

@ -46,7 +46,9 @@
#include <locale.h> #include <locale.h>
#include <errno.h> #include <errno.h>
#ifdef HISTORY
#include <histedit.h> #include <histedit.h>
#endif
static int connectMaxScale(char *hostname, char *port); static int connectMaxScale(char *hostname, char *port);
static int setipaddress(struct in_addr *a, char *p); static int setipaddress(struct in_addr *a, char *p);
@ -54,6 +56,7 @@ static int authMaxScale(int so, char *user, char *password);
static int sendCommand(int so, char *cmd); static int sendCommand(int so, char *cmd);
static void DoSource(int so, char *cmd); static void DoSource(int so, char *cmd);
#ifdef HISTORY
static char * static char *
prompt(EditLine *el __attribute__((__unused__))) prompt(EditLine *el __attribute__((__unused__)))
{ {
@ -61,17 +64,22 @@ prompt(EditLine *el __attribute__((__unused__)))
return prompt; return prompt;
} }
#endif
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
EditLine *el = NULL;
int i, num, rv, fatal = 0; int i, num, rv, fatal = 0;
#ifdef HISTORY
char *buf; char *buf;
EditLine *el = NULL;
Tokenizer *tok; Tokenizer *tok;
History *hist; History *hist;
HistEvent ev; HistEvent ev;
const LineInfo *li; const LineInfo *li;
#else
char buf[1024];
#endif
char *hostname = "localhost"; char *hostname = "localhost";
char *port = "6603"; char *port = "6603";
char *user = "admin"; char *user = "admin";
@ -196,7 +204,7 @@ char *cmd;
} }
(void) setlocale(LC_CTYPE, ""); (void) setlocale(LC_CTYPE, "");
#ifdef HISTORY
hist = history_init(); /* Init the builtin history */ hist = history_init(); /* Init the builtin history */
/* Remember 100 events */ /* Remember 100 events */
history(hist, &ev, H_SETSIZE, 100); history(hist, &ev, H_SETSIZE, 100);
@ -227,12 +235,19 @@ char *cmd;
while ((buf = el_gets(el, &num)) != NULL && num != 0) while ((buf = el_gets(el, &num)) != NULL && num != 0)
{ {
#else
while (printf("MaxScale> ") && fgets(buf, 1024, stdin) != NULL)
{
num = strlen(buf);
#endif
/* Strip trailing \n\r */ /* Strip trailing \n\r */
for (i = num - 1; buf[i] == '\r' || buf[i] == '\n'; i--) for (i = num - 1; buf[i] == '\r' || buf[i] == '\n'; i--)
buf[i] = 0; buf[i] = 0;
#ifdef HISTORY
li = el_line(el); li = el_line(el);
history(hist, &ev, H_ENTER, buf); history(hist, &ev, H_ENTER, buf);
#endif
if (!strcasecmp(buf, "quit")) if (!strcasecmp(buf, "quit"))
{ {
@ -240,10 +255,14 @@ char *cmd;
} }
else if (!strcasecmp(buf, "history")) else if (!strcasecmp(buf, "history"))
{ {
#ifdef HISTORY
for (rv = history(hist, &ev, H_LAST); rv != -1; for (rv = history(hist, &ev, H_LAST); rv != -1;
rv = history(hist, &ev, H_PREV)) rv = history(hist, &ev, H_PREV))
fprintf(stdout, "%4d %s\n", fprintf(stdout, "%4d %s\n",
ev.num, ev.str); ev.num, ev.str);
#else
fprintf(stderr, "History not supported in this version.\n");
#endif
} }
else if (!strncasecmp(buf, "source", 6)) else if (!strncasecmp(buf, "source", 6))
{ {
@ -255,9 +274,11 @@ char *cmd;
} }
} }
#ifdef HISTORY
el_end(el); el_end(el);
tok_end(tok); tok_end(tok);
history_end(hist); history_end(hist);
#endif
close(so); close(so);
return 0; return 0;
} }

View File

@ -14,7 +14,10 @@ Source: %{name}-%{version}-%{release}.tar.gz
Prefix: / Prefix: /
Group: Development/Tools Group: Development/Tools
#Requires: #Requires:
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel libedit-devel MariaDB-devel MariaDB-server BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel MariaDB-devel MariaDB-server
%if 0%{?rhel} == 6
BuildRequires: libedit-devel
%endif
%description %description
MaxScale MaxScale

View File

@ -1090,7 +1090,7 @@ SERVER *server;
s = strtok(NULL, ","); s = strtok(NULL, ",");
} }
} }
if (filters) if (filters && obj->element)
serviceSetFilters(obj->element, filters); serviceSetFilters(obj->element, filters);
} }
else if (!strcmp(type, "listener")) else if (!strcmp(type, "listener"))

View File

@ -134,6 +134,9 @@ DCB *rval;
rval->next = NULL; rval->next = NULL;
rval->callbacks = NULL; rval->callbacks = NULL;
rval->remote = NULL;
rval->user = NULL;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
if (allDCBs == NULL) if (allDCBs == NULL)
allDCBs = rval; allDCBs = rval;
@ -313,6 +316,8 @@ DCB_CALLBACK *cb;
free(dcb->data); free(dcb->data);
if (dcb->remote) if (dcb->remote)
free(dcb->remote); free(dcb->remote);
if (dcb->user)
free(dcb->user);
/* Clear write and read buffers */ /* Clear write and read buffers */
if (dcb->delayq) { if (dcb->delayq) {
@ -1113,6 +1118,8 @@ printDCB(DCB *dcb)
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote) if (dcb->remote)
printf("\tConnected to: %s\n", dcb->remote); printf("\tConnected to: %s\n", dcb->remote);
if (dcb->user)
printf("\tUsername to: %s\n", dcb->user);
if (dcb->writeq) if (dcb->writeq)
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
printf("\tStatistics:\n"); printf("\tStatistics:\n");
@ -1170,6 +1177,9 @@ DCB *dcb;
if (dcb->remote) if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n", dcb_printf(pdcb, "\tConnected to: %s\n",
dcb->remote); dcb->remote);
if (dcb->user)
dcb_printf(pdcb, "\tUsername: %s\n",
dcb->user);
if (dcb->writeq) if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n", dcb_printf(pdcb, "\tQueued write data: %d\n",
gwbuf_length(dcb->writeq)); gwbuf_length(dcb->writeq));

View File

@ -135,6 +135,19 @@ FILTER_DEF *filter;
return filter; return filter;
} }
/**
* Check a parameter to see if it is a standard filter parameter
*
* @param name Parameter name to check
*/
int
filter_standard_parameter(char *name)
{
if (strcmp(name, "type") == 0 || strcmp(name, "module") == 0)
return 1;
return 0;
}
/** /**
* Print all filters to a DCB * Print all filters to a DCB
* *
@ -289,6 +302,17 @@ int i;
spinlock_release(&filter->spin); spinlock_release(&filter->spin);
} }
/**
* Connect the downstream filter chain for a filter.
*
* This will create the filter instance, loading the filter module, and
* conenct the fitler into the downstream chain.
*
* @param filter The filter to add into the chain
* @param session The client session
* @param downstream The filter downstream of this filter
* @return The downstream component for the next filter
*/
DOWNSTREAM * DOWNSTREAM *
filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream) filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream)
{ {
@ -318,3 +342,42 @@ DOWNSTREAM *me;
return me; return me;
} }
/**
* Connect a filter in the up stream filter chain for a session
*
* Note, the filter will have been created when the downstream chian was
* previously setup.
* Note all filters require to be in the upstream chain, so this routine
* may skip a filter if it does not provide an upstream interface.
*
* @param filter The fitler to add to the chain
* @param fsession The filter session
* @param upstream The filter that should be upstream of this filter
* @return The upstream component for the next filter
*/
UPSTREAM *
filterUpstream(FILTER_DEF *filter, void *fsession, UPSTREAM *upstream)
{
UPSTREAM *me;
/*
* The the filter has no setUpstream entry point then is does
* not require to see results and can be left out of the chain.
*/
if (filter->obj->setUpstream == NULL)
return upstream;
if (filter->obj->clientReply != NULL)
{
if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL)
{
return NULL;
}
me->instance = filter->filter;
me->session = fsession;
me->clientReply = filter->obj->clientReply;
filter->obj->setUpstream(me->instance, me->session, upstream);
}
return me;
}

View File

@ -167,6 +167,10 @@ session_alloc(SERVICE *service, DCB *client_dcb)
session->head.routeQuery = (void *)(service->router->routeQuery); session->head.routeQuery = (void *)(service->router->routeQuery);
session->tail.instance = session;
session->tail.session = session;
session->tail.clientReply = session_reply;
if (service->n_filters > 0) if (service->n_filters > 0)
{ {
if (!session_setup_filters(session)) if (!session_setup_filters(session))
@ -315,6 +319,12 @@ bool session_free(
} }
if (session->n_filters) if (session->n_filters)
{ {
for (i = 0; i < session->n_filters; i++)
{
session->filters[i].filter->obj->closeSession(
session->filters[i].instance,
session->filters[i].session);
}
for (i = 0; i < session->n_filters; i++) for (i = 0; i < session->n_filters; i++)
{ {
session->filters[i].filter->obj->freeSession( session->filters[i].filter->obj->freeSession(
@ -616,6 +626,7 @@ session_setup_filters(SESSION *session)
{ {
SERVICE *service = session->service; SERVICE *service = session->service;
DOWNSTREAM *head; DOWNSTREAM *head;
UPSTREAM *tail;
int i; int i;
if ((session->filters = calloc(service->n_filters, if ((session->filters = calloc(service->n_filters,
@ -646,9 +657,54 @@ int i;
session->head = *head; session->head = *head;
} }
for (i = 0; i < service->n_filters; i++)
{
if ((tail = filterUpstream(service->filters[i],
session->filters[i].session,
&session->tail)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Failed to create filter '%s' for service '%s'.\n",
service->filters[i]->name,
service->name)));
return 0;
}
session->tail = *tail;
}
return 1; return 1;
} }
/**
* Entry point for the final element int he upstream filter, i.e. the writing
* of the data to the client.
*
* @param instance The "instance" data
* @param session The session
* @param data The buffer chain to write
*/
int
session_reply(void *instance, void *session, GWBUF *data)
{
SESSION *the_session = (SESSION *)session;
return the_session->client->func.write(the_session->client, data);
}
/**
* Return the client connection address or name
*
* @param session The session whose client address to return
*/
char *
session_get_remote(SESSION *session)
{
if (session && session->client)
return session->client->remote;
return NULL;
}
bool session_route_query ( bool session_route_query (
SESSION* ses, SESSION* ses,
GWBUF* buf) GWBUF* buf)
@ -675,3 +731,16 @@ return_succp:
return succp; return succp;
} }
/**
* Return the username of the user connected to the client side of the
* session.
*
* @param session The session pointer.
* @return The user name or NULL if it can not be determined.
*/
char *
session_getUser(SESSION *session)
{
return (session && session->client) ? session->client->user : NULL;
}

View File

@ -208,6 +208,7 @@ typedef struct dcb {
int fd; /**< The descriptor */ int fd; /**< The descriptor */
dcb_state_t state; /**< Current descriptor state */ dcb_state_t state; /**< Current descriptor state */
char *remote; /**< Address of remote end */ char *remote; /**< Address of remote end */
char *user; /**< User name for connection */
struct sockaddr_in ipv4; /**< remote end IPv4 address */ struct sockaddr_in ipv4; /**< remote end IPv4 address */
void *protocol; /**< The protocol specific state */ void *protocol; /**< The protocol specific state */
struct session *session; /**< The owning session */ struct session *session; /**< The owning session */

View File

@ -61,6 +61,7 @@ typedef struct {
* filter pipline * filter pipline
* routeQuery Called on each query that requires * routeQuery Called on each query that requires
* routing * routing
* clientReply
* diagnostics Called to force the filter to print * diagnostics Called to force the filter to print
* diagnostic output * diagnostic output
* *
@ -74,7 +75,9 @@ typedef struct filter_object {
void (*closeSession)(FILTER *instance, void *fsession); void (*closeSession)(FILTER *instance, void *fsession);
void (*freeSession)(FILTER *instance, void *fsession); void (*freeSession)(FILTER *instance, void *fsession);
void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream); void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
void (*setUpstream)(FILTER *instance, void *fsession, UPSTREAM *downstream);
int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue); int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue);
int (*clientReply)(FILTER *instance, void *fsession, GWBUF *queue);
void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb); void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb);
} FILTER_OBJECT; } FILTER_OBJECT;
@ -83,7 +86,7 @@ typedef struct filter_object {
* is changed these values must be updated in line with the rules in the * is changed these values must be updated in line with the rules in the
* file modinfo.h. * file modinfo.h.
*/ */
#define FILTER_VERSION {1, 0, 0} #define FILTER_VERSION {1, 1, 0}
/** /**
* The definition of a filter form the configuration file. * The definition of a filter form the configuration file.
* This is basically the link between a plugin to load and the * This is basically the link between a plugin to load and the
@ -108,6 +111,8 @@ FILTER_DEF *filter_find(char *);
void filterAddOption(FILTER_DEF *, char *); void filterAddOption(FILTER_DEF *, char *);
void filterAddParameter(FILTER_DEF *, char *, char *); void filterAddParameter(FILTER_DEF *, char *, char *);
DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *); DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *);
UPSTREAM *filterUpstream(FILTER_DEF *, void *, UPSTREAM *);
int filter_standard_parameter(char *);
void dprintAllFilters(DCB *); void dprintAllFilters(DCB *);
void dprintFilter(DCB *, FILTER_DEF *); void dprintFilter(DCB *, FILTER_DEF *);
void dListFilters(DCB *); void dListFilters(DCB *);

View File

@ -70,8 +70,8 @@ typedef enum {
typedef struct { typedef struct {
void *instance; void *instance;
void *session; void *session;
int (*routeQuery)(void *instance, int (*routeQuery)(void *instance, void *session,
void *router_session, GWBUF *queue); GWBUF *request);
} DOWNSTREAM; } DOWNSTREAM;
/** /**
@ -81,8 +81,9 @@ typedef struct {
typedef struct { typedef struct {
void *instance; void *instance;
void *session; void *session;
int (*write)(void *, void *, GWBUF *); int (*clientReply)(void *instance,
int (*error)(void *); void *session, GWBUF *response);
int (*error)(void *instance, void *session, void *);
} UPSTREAM; } UPSTREAM;
/** /**
@ -117,6 +118,7 @@ typedef struct session {
int n_filters; /**< Number of filter sessions */ int n_filters; /**< Number of filter sessions */
SESSION_FILTER *filters; /**< The filters in use within this session */ SESSION_FILTER *filters; /**< The filters in use within this session */
DOWNSTREAM head; /**< Head of the filter chain */ DOWNSTREAM head; /**< Head of the filter chain */
UPSTREAM tail; /**< The tail of the filter chain */
struct session *next; /**< Linked list of all sessions */ struct session *next; /**< Linked list of all sessions */
int refcount; /**< Reference count on the session */ int refcount; /**< Reference count on the session */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
@ -131,13 +133,24 @@ typedef struct session {
* the incoming data to the first element in the pipeline of filters and * the incoming data to the first element in the pipeline of filters and
* routers. * routers.
*/ */
#define SESSION_ROUTE_QUERY(session, buf) \ #define SESSION_ROUTE_QUERY(sess, buf) \
((session)->head.routeQuery)((session)->head.instance, \ ((sess)->head.routeQuery)((sess)->head.instance, \
(session)->head.session, (buf)) (sess)->head.session, (buf))
/**
* A convenience macro that can be used by the router modules to route
* the replies to the first element in the pipeline of filters and
* the protocol.
*/
#define SESSION_ROUTE_REPLY(sess, buf) \
((sess)->tail.clientReply)((sess)->tail.instance, \
(sess)->tail.session, (buf))
SESSION *session_alloc(struct service *, struct dcb *); SESSION *session_alloc(struct service *, struct dcb *);
bool session_free(SESSION *); bool session_free(SESSION *);
int session_isvalid(SESSION *); int session_isvalid(SESSION *);
int session_reply(void *inst, void *session, GWBUF *data);
char *session_get_remote(SESSION *);
char *session_getUser(SESSION *);
void printAllSessions(); void printAllSessions();
void printSession(SESSION *); void printSession(SESSION *);
void dprintAllSessions(struct dcb *); void dprintAllSessions(struct dcb *);

View File

@ -38,10 +38,12 @@ QLASRCS=qlafilter.c
QLAOBJ=$(QLASRCS:.c=.o) QLAOBJ=$(QLASRCS:.c=.o)
REGEXSRCS=regexfilter.c REGEXSRCS=regexfilter.c
REGEXOBJ=$(REGEXSRCS:.c=.o) REGEXOBJ=$(REGEXSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) TOPNSRCS=topfilter.c
TOPNOBJ=$(TOPNSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS)
OBJ=$(SRCS:.c=.o) OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so
all: $(MODULES) all: $(MODULES)
@ -55,6 +57,9 @@ libqlafilter.so: $(QLAOBJ)
libregexfilter.so: $(REGEXOBJ) libregexfilter.so: $(REGEXOBJ)
$(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@ $(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@
libtopfilter.so: $(TOPNOBJ)
$(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@
.c.o: .c.o:
$(CC) $(CFLAGS) $< -o $@ $(CC) $(CFLAGS) $< -o $@

View File

@ -27,15 +27,26 @@
* A single option may be passed to the filter, this is the name of the * A single option may be passed to the filter, this is the name of the
* file to which the queries are logged. A serial number is appended to this * file to which the queries are logged. A serial number is appended to this
* name in order that each session logs to a different file. * name in order that each session logs to a different file.
*
* Date Who Description
* 03/06/2014 Mark Riddoch Initial implementation
* 11/06/2014 Mark Riddoch Addition of source and match parameters
* 19/06/2014 Mark Riddoch Addition of user parameter
*
*/ */
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <filter.h> #include <filter.h>
#include <modinfo.h> #include <modinfo.h>
#include <modutil.h> #include <modutil.h>
#include <string.h> #include <skygw_utils.h>
#include <log_manager.h>
#include <time.h> #include <time.h>
#include <sys/time.h> #include <sys/time.h>
#include <regex.h>
#include <string.h>
extern int lm_enabled_logfiles_bitmask;
MODULE_INFO info = { MODULE_INFO info = {
MODULE_API_FILTER, MODULE_API_FILTER,
@ -44,7 +55,7 @@ MODULE_INFO info = {
"A simple query logging filter" "A simple query logging filter"
}; };
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.1";
/* /*
* The filter entry points * The filter entry points
@ -64,7 +75,9 @@ static FILTER_OBJECT MyObject = {
closeSession, closeSession,
freeSession, freeSession,
setDownstream, setDownstream,
NULL, // No Upstream requirement
routeQuery, routeQuery,
NULL, // No client reply
diagnostic, diagnostic,
}; };
@ -77,8 +90,14 @@ static FILTER_OBJECT MyObject = {
* have a nique name. * have a nique name.
*/ */
typedef struct { typedef struct {
int sessions; int sessions; /* The count of sessions */
char *filebase; char *filebase; /* The filemane base */
char *source; /* The source of the client connection */
char *userName; /* The user name to filter on */
char *match; /* Optional text to match against */
regex_t re; /* Compiled regex text */
char *nomatch; /* Optional text to match against for exclusion */
regex_t nore; /* Compiled regex nomatch text */
} QLA_INSTANCE; } QLA_INSTANCE;
/** /**
@ -92,7 +111,8 @@ typedef struct {
typedef struct { typedef struct {
DOWNSTREAM down; DOWNSTREAM down;
char *filename; char *filename;
int fd; FILE *fp;
int active;
} QLA_SESSION; } QLA_SESSION;
/** /**
@ -141,6 +161,7 @@ static FILTER *
createInstance(char **options, FILTER_PARAMETER **params) createInstance(char **options, FILTER_PARAMETER **params)
{ {
QLA_INSTANCE *my_instance; QLA_INSTANCE *my_instance;
int i;
if ((my_instance = calloc(1, sizeof(QLA_INSTANCE))) != NULL) if ((my_instance = calloc(1, sizeof(QLA_INSTANCE))) != NULL)
{ {
@ -148,7 +169,71 @@ QLA_INSTANCE *my_instance;
my_instance->filebase = strdup(options[0]); my_instance->filebase = strdup(options[0]);
else else
my_instance->filebase = strdup("qla"); my_instance->filebase = strdup("qla");
my_instance->source = NULL;
my_instance->userName = NULL;
my_instance->match = NULL;
my_instance->nomatch = NULL;
if (params)
{
for (i = 0; params[i]; i++)
{
if (!strcmp(params[i]->name, "match"))
{
my_instance->match = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "exclude"))
{
my_instance->nomatch = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "source"))
my_instance->source = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "user"))
my_instance->userName = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "filebase"))
{
if (my_instance->filebase)
free(my_instance->filebase);
my_instance->source = strdup(params[i]->value);
}
else if (!filter_standard_parameter(params[i]->name))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"qlafilter: Unexpected parameter '%s'.\n",
params[i]->name)));
}
}
}
my_instance->sessions = 0; my_instance->sessions = 0;
if (my_instance->match &&
regcomp(&my_instance->re, my_instance->match, REG_ICASE))
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"qlafilter: Invalid regular expression '%s'"
" for the match parameter.\n",
my_instance->match)));
free(my_instance->match);
free(my_instance->source);
free(my_instance->filebase);
free(my_instance);
return NULL;
}
if (my_instance->nomatch &&
regcomp(&my_instance->nore, my_instance->nomatch,
REG_ICASE))
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"qlafilter: Invalid regular expression '%s'"
" for the nomatch paramter.\n",
my_instance->match)));
if (my_instance->match)
regfree(&my_instance->re);
free(my_instance->match);
free(my_instance->source);
free(my_instance->filebase);
free(my_instance);
return NULL;
}
} }
return (FILTER *)my_instance; return (FILTER *)my_instance;
} }
@ -167,6 +252,7 @@ newSession(FILTER *instance, SESSION *session)
{ {
QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance;
QLA_SESSION *my_session; QLA_SESSION *my_session;
char *remote, *userName;
if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL) if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL)
{ {
@ -177,11 +263,22 @@ QLA_SESSION *my_session;
free(my_session); free(my_session);
return NULL; return NULL;
} }
my_session->active = 1;
if (my_instance->source
&& (remote = session_get_remote(session)) != NULL)
{
if (strcmp(remote, my_instance->source))
my_session->active = 0;
}
userName = session_getUser(session);
if (my_instance->userName && userName && strcmp(userName,
my_instance->userName))
my_session->active = 0;
sprintf(my_session->filename, "%s.%d", my_instance->filebase, sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions); my_instance->sessions);
my_instance->sessions++; my_instance->sessions++;
my_session->fd = open(my_session->filename, if (my_session->active)
O_WRONLY|O_CREAT|O_TRUNC, 0666); my_session->fp = fopen(my_session->filename, "w");
} }
return my_session; return my_session;
@ -200,7 +297,8 @@ closeSession(FILTER *instance, void *session)
{ {
QLA_SESSION *my_session = (QLA_SESSION *)session; QLA_SESSION *my_session = (QLA_SESSION *)session;
close(my_session->fd); if (my_session->active && my_session->fp)
fclose(my_session->fp);
} }
/** /**
@ -248,22 +346,29 @@ QLA_SESSION *my_session = (QLA_SESSION *)session;
static int static int
routeQuery(FILTER *instance, void *session, GWBUF *queue) routeQuery(FILTER *instance, void *session, GWBUF *queue)
{ {
QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance;
QLA_SESSION *my_session = (QLA_SESSION *)session; QLA_SESSION *my_session = (QLA_SESSION *)session;
char *ptr, t_buf[40]; char *ptr;
int length; int length;
struct tm t; struct tm t;
struct timeval tv; struct timeval tv;
if (modutil_extract_SQL(queue, &ptr, &length)) if (my_session->active && modutil_extract_SQL(queue, &ptr, &length))
{ {
gettimeofday(&tv, NULL); if ((my_instance->match == NULL ||
localtime_r(&tv.tv_sec, &t); regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
sprintf(t_buf, "%02d:%02d:%02d.%-3d %d/%02d/%d, ", (my_instance->nomatch == NULL ||
t.tm_hour, t.tm_min, t.tm_sec, (int)(tv.tv_usec / 1000), regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
t.tm_mday, t.tm_mon + 1, 1900 + t.tm_year); {
write(my_session->fd, t_buf, strlen(t_buf)); gettimeofday(&tv, NULL);
write(my_session->fd, ptr, length); localtime_r(&tv.tv_sec, &t);
write(my_session->fd, "\n", 1); fprintf(my_session->fp,
"%02d:%02d:%02d.%-3d %d/%02d/%d, ",
t.tm_hour, t.tm_min, t.tm_sec, (int)(tv.tv_usec / 1000),
t.tm_mday, t.tm_mon + 1, 1900 + t.tm_year);
fwrite(ptr, sizeof(char), length, my_session->fp);
fwrite("\n", sizeof(char), 1, my_session->fp);
}
} }
/* Pass the query downstream */ /* Pass the query downstream */

View File

@ -19,9 +19,13 @@
#include <filter.h> #include <filter.h>
#include <modinfo.h> #include <modinfo.h>
#include <modutil.h> #include <modutil.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <string.h> #include <string.h>
#include <regex.h> #include <regex.h>
extern int lm_enabled_logfiles_bitmask;
/** /**
* regexfilter.c - a very simple regular expression rewrite filter. * regexfilter.c - a very simple regular expression rewrite filter.
* *
@ -29,6 +33,12 @@
* Two parameters should be defined in the filter configuration * Two parameters should be defined in the filter configuration
* match=<regular expression> * match=<regular expression>
* replace=<replacement text> * replace=<replacement text>
* Two optional parameters
* source=<source address to limit filter>
* user=<username to limit filter>
*
* Date Who Description
* 19/06/2014 Mark Riddoch Addition of source and user parameters
*/ */
MODULE_INFO info = { MODULE_INFO info = {
@ -38,7 +48,7 @@ MODULE_INFO info = {
"A query rewrite filter that uses regular expressions to rewite queries" "A query rewrite filter that uses regular expressions to rewite queries"
}; };
static char *version_str = "V1.0.0"; static char *version_str = "V1.1.0";
static FILTER *createInstance(char **options, FILTER_PARAMETER **params); static FILTER *createInstance(char **options, FILTER_PARAMETER **params);
static void *newSession(FILTER *instance, SESSION *session); static void *newSession(FILTER *instance, SESSION *session);
@ -56,7 +66,9 @@ static FILTER_OBJECT MyObject = {
closeSession, closeSession,
freeSession, freeSession,
setDownstream, setDownstream,
NULL, // No Upstream requirement
routeQuery, routeQuery,
NULL,
diagnostic, diagnostic,
}; };
@ -64,6 +76,8 @@ static FILTER_OBJECT MyObject = {
* Instance structure * Instance structure
*/ */
typedef struct { typedef struct {
char *source; /* Source address to restrict matches */
char *user; /* User name to restrict matches */
char *match; /* Regular expression to match */ char *match; /* Regular expression to match */
char *replace; /* Replacement text */ char *replace; /* Replacement text */
regex_t re; /* Compiled regex text */ regex_t re; /* Compiled regex text */
@ -73,9 +87,10 @@ typedef struct {
* The session structure for this regex filter * The session structure for this regex filter
*/ */
typedef struct { typedef struct {
DOWNSTREAM down; DOWNSTREAM down; /* The downstream filter */
int no_change; int no_change; /* No. of unchanged requests */
int replacements; int replacements; /* No. of changed requests */
int active; /* Is filter active */
} REGEX_SESSION; } REGEX_SESSION;
/** /**
@ -124,20 +139,54 @@ static FILTER *
createInstance(char **options, FILTER_PARAMETER **params) createInstance(char **options, FILTER_PARAMETER **params)
{ {
REGEX_INSTANCE *my_instance; REGEX_INSTANCE *my_instance;
int i; int i, cflags = REG_ICASE;
if ((my_instance = calloc(1, sizeof(REGEX_INSTANCE))) != NULL) if ((my_instance = calloc(1, sizeof(REGEX_INSTANCE))) != NULL)
{ {
my_instance->match = NULL; my_instance->match = NULL;
my_instance->replace = NULL; my_instance->replace = NULL;
for (i = 0; params[i]; i++) for (i = 0; params && params[i]; i++)
{ {
if (!strcmp(params[i]->name, "match")) if (!strcmp(params[i]->name, "match"))
my_instance->match = strdup(params[i]->value); my_instance->match = strdup(params[i]->value);
if (!strcmp(params[i]->name, "replace")) else if (!strcmp(params[i]->name, "replace"))
my_instance->replace = strdup(params[i]->value); my_instance->replace = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "source"))
my_instance->source = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "user"))
my_instance->user = strdup(params[i]->value);
else if (!filter_standard_parameter(params[i]->name))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"regexfilter: Unexpected parameter '%s'.\n",
params[i]->name)));
}
} }
if (options)
{
for (i = 0; options[i]; i++)
{
if (!strcasecmp(options[i], "ignorecase"))
{
cflags |= REG_ICASE;
}
else if (!strcasecmp(options[i], "case"))
{
cflags &= ~REG_ICASE;
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"regexfilter: unsupported option '%s'.\n",
options[i])));
}
}
}
if (my_instance->match == NULL || my_instance->replace == NULL) if (my_instance->match == NULL || my_instance->replace == NULL)
{ {
return NULL; return NULL;
@ -145,6 +194,9 @@ int i;
if (regcomp(&my_instance->re, my_instance->match, REG_ICASE)) if (regcomp(&my_instance->re, my_instance->match, REG_ICASE))
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"regexfilter: Invalid regular expression '%s'.\n",
my_instance->match)));
free(my_instance->match); free(my_instance->match);
free(my_instance->replace); free(my_instance->replace);
free(my_instance); free(my_instance);
@ -164,12 +216,27 @@ int i;
static void * static void *
newSession(FILTER *instance, SESSION *session) newSession(FILTER *instance, SESSION *session)
{ {
REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance;
REGEX_SESSION *my_session; REGEX_SESSION *my_session;
char *remote, *user;
if ((my_session = calloc(1, sizeof(REGEX_SESSION))) != NULL) if ((my_session = calloc(1, sizeof(REGEX_SESSION))) != NULL)
{ {
my_session->no_change = 0; my_session->no_change = 0;
my_session->replacements = 0; my_session->replacements = 0;
my_session->active = 1;
if (my_instance->source
&& (remote = session_get_remote(session)) != NULL)
{
if (strcmp(remote, my_instance->source))
my_session->active = 0;
}
if (my_instance->user && (user = session_getUser(session))
&& strcmp(user, my_instance->user))
{
my_session->active = 0;
}
} }
return my_session; return my_session;

View File

@ -54,7 +54,9 @@ static FILTER_OBJECT MyObject = {
closeSession, closeSession,
freeSession, freeSession,
setDownstream, setDownstream,
NULL, // No upstream requirement
routeQuery, routeQuery,
NULL,
diagnostic, diagnostic,
}; };

View File

@ -0,0 +1,549 @@
/*
* This file is distributed as part of MaxScale by SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* TOPN Filter - Query Log All. A primitive query logging filter, simply
* used to verify the filter mechanism for downstream filters. All queries
* that are passed through the filter will be written to file.
*
* The filter makes no attempt to deal with query packets that do not fit
* in a single GWBUF.
*
* A single option may be passed to the filter, this is the name of the
* file to which the queries are logged. A serial number is appended to this
* name in order that each session logs to a different file.
*
* Date Who Description
* 18/06/2014 Mark Riddoch Addition of source and user filters
*/
#include <stdio.h>
#include <fcntl.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <regex.h>
extern int lm_enabled_logfiles_bitmask;
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_ALPHA_RELEASE,
FILTER_VERSION,
"A top N query logging filter"
};
static char *version_str = "V1.0.1";
/*
* The filter entry points
*/
static FILTER *createInstance(char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
setUpstream,
routeQuery,
clientReply,
diagnostic,
};
/**
* A instance structure, the assumption is that the option passed
* to the filter is simply a base for the filename to which the queries
* are logged.
*
* To this base a session number is attached such that each session will
* have a unique name.
*/
typedef struct {
int sessions; /* Session count */
int topN; /* Number of queries to store */
char *filebase; /* Base of fielname to log into */
char *source; /* The source of the client connection */
char *user; /* A user name to filter on */
char *match; /* Optional text to match against */
regex_t re; /* Compiled regex text */
char *exclude; /* Optional text to match against for exclusion */
regex_t exre; /* Compiled regex nomatch text */
} TOPN_INSTANCE;
/**
* Structure to hold the Top N queries
*/
typedef struct topnq {
struct timeval duration;
char *sql;
} TOPNQ;
/**
* The session structure for this TOPN filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
typedef struct {
DOWNSTREAM down;
UPSTREAM up;
int active;
char *clientHost;
char *userName;
char *filename;
int fd;
struct timeval start;
char *current;
TOPNQ **top;
int n_statements;
struct timeval total;
struct timeval connect;
struct timeval disconnect;
} TOPN_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
int i;
TOPN_INSTANCE *my_instance;
if ((my_instance = calloc(1, sizeof(TOPN_INSTANCE))) != NULL)
{
my_instance->topN = 10;
my_instance->match = NULL;
my_instance->exclude = NULL;
my_instance->source = NULL;
my_instance->user = NULL;
my_instance->filebase = strdup("top");
for (i = 0; params && params[i]; i++)
{
if (!strcmp(params[i]->name, "count"))
my_instance->topN = atoi(params[i]->value);
else if (!strcmp(params[i]->name, "filebase"))
{
free(my_instance->filebase);
my_instance->filebase = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "match"))
{
my_instance->match = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "exclude"))
{
my_instance->exclude = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "source"))
my_instance->source = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "user"))
my_instance->user = strdup(params[i]->value);
else if (!filter_standard_parameter(params[i]->name))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"topfilter: Unexpected parameter '%s'.\n",
params[i]->name)));
}
}
if (options)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"topfilter: Options are not supported by this "
" filter. They will be ignored\n")));
}
my_instance->sessions = 0;
if (my_instance->match &&
regcomp(&my_instance->re, my_instance->match, REG_ICASE))
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"topfilter: Invalid regular expression '%s'"
" for the match parameter.\n",
my_instance->match)));
free(my_instance->match);
free(my_instance->source);
free(my_instance->user);
free(my_instance->filebase);
free(my_instance);
return NULL;
}
if (my_instance->exclude &&
regcomp(&my_instance->exre, my_instance->exclude,
REG_ICASE))
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"qlafilter: Invalid regular expression '%s'"
" for the nomatch paramter.\n",
my_instance->match)));
regfree(&my_instance->re);
free(my_instance->match);
free(my_instance->source);
free(my_instance->user);
free(my_instance->filebase);
free(my_instance);
return NULL;
}
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* Create the file to log to and open it.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session;
int i;
char *remote, *user;
if ((my_session = calloc(1, sizeof(TOPN_SESSION))) != NULL)
{
if ((my_session->filename =
(char *)malloc(strlen(my_instance->filebase) + 20))
== NULL)
{
free(my_session);
return NULL;
}
sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions);
my_instance->sessions++;
my_session->top = (TOPNQ **)calloc(my_instance->topN + 1,
sizeof(TOPNQ *));
for (i = 0; i < my_instance->topN; i++)
{
my_session->top[i] = (TOPNQ *)calloc(1, sizeof(TOPNQ));
my_session->top[i]->sql = NULL;
}
my_session->n_statements = 0;
my_session->total.tv_sec = 0;
my_session->total.tv_usec = 0;
my_session->current = NULL;
if ((remote = session_get_remote(session)) != NULL)
my_session->clientHost = strdup(remote);
else
my_session->clientHost = NULL;
if ((user = session_getUser(session)) != NULL)
my_session->userName = strdup(user);
else
my_session->userName = NULL;
my_session->active = 1;
if (my_instance->source && strcmp(my_session->clientHost,
my_instance->source))
my_session->active = 0;
if (my_instance->user && strcmp(my_session->userName,
my_instance->user))
my_session->active = 0;
sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions);
gettimeofday(&my_session->connect, NULL);
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
* In the case of the TOPN filter we simple close the file descriptor.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
struct timeval diff;
int i;
FILE *fp;
gettimeofday(&my_session->disconnect, NULL);
timersub((&my_session->disconnect), &(my_session->connect), &diff);
if ((fp = fopen(my_session->filename, "w")) != NULL)
{
fprintf(fp, "Top %d longest running queries in session.\n",
my_instance->topN);
fprintf(fp, "==========================================\n\n");
fprintf(fp, "Time (sec) | Query\n");
fprintf(fp, "-----------+-----------------------------------------------------------------\n");
for (i = 0; i < my_instance->topN; i++)
{
if (my_session->top[i]->sql)
{
fprintf(fp, "%10.3f | %s\n",
(double)((my_session->top[i]->duration.tv_sec * 1000)
+ (my_session->top[i]->duration.tv_usec / 1000)) / 1000,
my_session->top[i]->sql);
}
}
fprintf(fp, "-----------+-----------------------------------------------------------------\n");
fprintf(fp, "\n\nSession started %s",
asctime(localtime(&my_session->connect.tv_sec)));
if (my_session->clientHost)
fprintf(fp, "Connection from %s\n",
my_session->clientHost);
if (my_session->userName)
fprintf(fp, "Username %s\n",
my_session->userName);
fprintf(fp, "\nTotal of %d statements executed.\n",
my_session->n_statements);
fprintf(fp, "Total statement execution time %5d.%d seconds\n",
(int)my_session->total.tv_sec,
(int)my_session->total.tv_usec / 1000);
fprintf(fp, "Average statement execution time %9.3f seconds\n",
(double)((my_session->total.tv_sec * 1000)
+ (my_session->total.tv_usec / 1000))
/ (1000 * my_session->n_statements));
fprintf(fp, "Total connection time %5d.%d seconds\n",
(int)diff.tv_sec, (int)diff.tv_usec / 1000);
fclose(fp);
}
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
*/
static void
freeSession(FILTER *instance, void *session)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
free(my_session->filename);
free(session);
return;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
my_session->down = *downstream;
}
/**
* Set the upstream filter or session to which results will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param upstream The upstream filter or session.
*/
static void
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
my_session->up = *upstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
char *ptr;
int length;
if (my_session->active && modutil_extract_SQL(queue, &ptr, &length))
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->exclude == NULL ||
regexec(&my_instance->exre,ptr,0,NULL, 0) != 0))
{
my_session->n_statements++;
if (my_session->current)
free(my_session->current);
gettimeofday(&my_session->start, NULL);
my_session->current = strndup(ptr, length);
}
}
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
static int
cmp_topn(TOPNQ **a, TOPNQ **b)
{
if ((*b)->duration.tv_sec == (*a)->duration.tv_sec)
return (*b)->duration.tv_usec - (*a)->duration.tv_usec;
return (*b)->duration.tv_sec - (*a)->duration.tv_sec;
}
static int
clientReply(FILTER *instance, void *session, GWBUF *reply)
{
TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance;
TOPN_SESSION *my_session = (TOPN_SESSION *)session;
struct timeval tv, diff;
int i, inserted;
if (my_session->current)
{
gettimeofday(&tv, NULL);
timersub(&tv, &(my_session->start), &diff);
timeradd(&(my_session->total), &diff, &(my_session->total));
inserted = 0;
for (i = 0; i < my_instance->topN; i++)
{
if (my_session->top[i]->sql == NULL)
{
my_session->top[i]->sql = my_session->current;
my_session->top[i]->duration = diff;
inserted = 1;
break;
}
}
if (inserted == 0 && ((diff.tv_sec > my_session->top[my_instance->topN-1]->duration.tv_sec) || (diff.tv_sec == my_session->top[my_instance->topN-1]->duration.tv_sec && diff.tv_usec > my_session->top[my_instance->topN-1]->duration.tv_usec )))
{
free(my_session->top[my_instance->topN-1]->sql);
my_session->top[my_instance->topN-1]->sql = my_session->current;
my_session->top[my_instance->topN-1]->duration = diff;
inserted = 1;
}
if (inserted)
qsort(my_session->top, my_instance->topN,
sizeof(TOPNQ *), cmp_topn);
else
free(my_session->current);
my_session->current = NULL;
}
/* Pass the result upstream */
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session, reply);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
TOPN_SESSION *my_session = (TOPN_SESSION *)fsession;
if (my_session)
{
dcb_printf(dcb, "\t\tLogging to file %s.\n",
my_session->filename);
}
}

View File

@ -337,6 +337,8 @@ static int gw_read_backend_event(DCB *dcb) {
GWBUF* errbuf; GWBUF* errbuf;
bool succp; bool succp;
/* try reload users' table for next connection */
service_refresh_users(dcb->session->service);
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,

View File

@ -483,6 +483,11 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
if (auth_token) if (auth_token)
free(auth_token); free(auth_token);
if (auth_ret == 0)
{
dcb->user = strdup(client_data->user);
}
return auth_ret; return auth_ret;
} }

View File

@ -689,7 +689,7 @@ clientReply(
ss_dassert(client != NULL); ss_dassert(client != NULL);
client->func.write(client, queue); SESSION_ROUTE_REPLY(backend_dcb->session, queue);
} }
/** /**

View File

@ -1364,7 +1364,15 @@ static void clientReply(
if (writebuf != NULL && client_dcb != NULL) if (writebuf != NULL && client_dcb != NULL)
{ {
/** Write reply to client DCB */ /** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf); SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [clientReply:rwsplit] client dcb %p, "
"backend dcb %p. End of normal reply.",
pthread_self(),
client_dcb,
backend_dcb)));
bref_clear_state(backend_ref, BREF_WAITING_RESULT); bref_clear_state(backend_ref, BREF_WAITING_RESULT);
} }