diff --git a/Documentation/filters/QLA Filter.pdf b/Documentation/filters/QLA Filter.pdf new file mode 100644 index 000000000..94818c426 Binary files /dev/null and b/Documentation/filters/QLA Filter.pdf differ diff --git a/Documentation/filters/Regex Filter.pdf b/Documentation/filters/Regex Filter.pdf new file mode 100644 index 000000000..f45fef043 Binary files /dev/null and b/Documentation/filters/Regex Filter.pdf differ diff --git a/Documentation/filters/Tee Filter.pdf b/Documentation/filters/Tee Filter.pdf new file mode 100644 index 000000000..f8de502ca Binary files /dev/null and b/Documentation/filters/Tee Filter.pdf differ diff --git a/Documentation/filters/Top Filter.pdf b/Documentation/filters/Top Filter.pdf new file mode 100644 index 000000000..a7cb2061d Binary files /dev/null and b/Documentation/filters/Top Filter.pdf differ diff --git a/client/Makefile b/client/Makefile index 4c1982e5d..216ef23ca 100644 --- a/client/Makefile +++ b/client/Makefile @@ -18,10 +18,19 @@ # Date Who Description # 13/06/14 Mark Riddoch Initial implementation of MaxScale # client program +# 18/06/14 Mark Riddoch Addition of conditional for histedit + +ifeq ($(wildcard /usr/include/histedit.h), ) +HISTLIB= +HISTFLAG= +else +HISTLIB=-ledit +HISTFLAG=-DHISTORY +endif CC=cc -CFLAGS=-c -Wall -g +CFLAGS=-c -Wall -g $(HISTFLAG) SRCS= maxadmin.c @@ -29,7 +38,7 @@ HDRS= OBJ=$(SRCS:.c=.o) -LIBS=-ledit +LIBS=$(HISTLIB) all: maxadmin diff --git a/client/maxadmin.c b/client/maxadmin.c index 00a75a872..79161110b 100644 --- a/client/maxadmin.c +++ b/client/maxadmin.c @@ -46,14 +46,18 @@ #include #include +#ifdef HISTORY #include +#endif static int connectMaxScale(char *hostname, char *port); static int setipaddress(struct in_addr *a, char *p); static int authMaxScale(int so, char *user, char *password); static int sendCommand(int so, char *cmd); static void DoSource(int so, char *cmd); +static void DoUsage(); +#ifdef HISTORY static char * prompt(EditLine *el __attribute__((__unused__))) { @@ -61,23 +65,35 @@ prompt(EditLine *el __attribute__((__unused__))) return prompt; } +#endif +/** + * The main for the maxadmin client + * + * @param argc Number of arguments + * @param argv The command line arguments + */ int main(int argc, char **argv) { -EditLine *el = NULL; int i, num, rv, fatal = 0; +#ifdef HISTORY char *buf; +EditLine *el = NULL; Tokenizer *tok; History *hist; HistEvent ev; const LineInfo *li; +#else +char buf[1024]; +#endif char *hostname = "localhost"; char *port = "6603"; char *user = "admin"; char *passwd = NULL; int so, cmdlen; char *cmd; +int argno = 0; cmd = malloc(1); *cmd = 0; @@ -137,14 +153,41 @@ char *cmd; fatal = 1; } break; + case '-': + { + char *word; + + word = &argv[i][2]; + if (strcmp(word, "help") == 0) + { + DoUsage(); + exit(0); + } + break; + } } } else { - cmdlen += strlen(argv[i]) + 1; - cmd = realloc(cmd, cmdlen); - strcat(cmd, argv[i]); - strcat(cmd, " "); + /* Arguments after the second argument are quoted + * to allow for quoted names on the command line + * to be passed on in quotes. + */ + if (argno++ > 1) + { + cmdlen += strlen(argv[i]) + 3; + cmd = realloc(cmd, cmdlen); + strcat(cmd, "\""); + strcat(cmd, argv[i]); + strcat(cmd, "\" "); + } + else + { + cmdlen += strlen(argv[i]) + 1; + cmd = realloc(cmd, cmdlen); + strcat(cmd, argv[i]); + strcat(cmd, " "); + } } } @@ -190,13 +233,16 @@ char *cmd; if (cmdlen > 1) { - cmd[cmdlen - 2] = '\0'; - sendCommand(so, cmd); + cmd[cmdlen - 2] = '\0'; /* Remove trailing space */ + if (access(cmd, R_OK) == 0) + DoSource(so, cmd); + else + sendCommand(so, cmd); exit(0); } (void) setlocale(LC_CTYPE, ""); - +#ifdef HISTORY hist = history_init(); /* Init the builtin history */ /* Remember 100 events */ history(hist, &ev, H_SETSIZE, 100); @@ -227,12 +273,19 @@ char *cmd; while ((buf = el_gets(el, &num)) != NULL && num != 0) { +#else + while (printf("MaxScale> ") && fgets(buf, 1024, stdin) != NULL) + { + num = strlen(buf); +#endif /* Strip trailing \n\r */ for (i = num - 1; buf[i] == '\r' || buf[i] == '\n'; i--) buf[i] = 0; +#ifdef HISTORY li = el_line(el); history(hist, &ev, H_ENTER, buf); +#endif if (!strcasecmp(buf, "quit")) { @@ -240,14 +293,25 @@ char *cmd; } else if (!strcasecmp(buf, "history")) { +#ifdef HISTORY for (rv = history(hist, &ev, H_LAST); rv != -1; rv = history(hist, &ev, H_PREV)) fprintf(stdout, "%4d %s\n", ev.num, ev.str); +#else + fprintf(stderr, "History not supported in this version.\n"); +#endif } else if (!strncasecmp(buf, "source", 6)) { - DoSource(so, buf); + char *ptr; + + /* Find the filename */ + ptr = &buf[strlen("source")]; + while (*ptr && isspace(*ptr)) + ptr++; + + DoSource(so, ptr); } else if (*buf) { @@ -255,13 +319,22 @@ char *cmd; } } +#ifdef HISTORY el_end(el); tok_end(tok); history_end(hist); +#endif close(so); return 0; } +/** + * Connect to the MaxScale server + * + * @param hostname The hostname to connect to + * @param port The port to use for the connection + * @return The connected socket or -1 on error + */ static int connectMaxScale(char *hostname, char *port) { @@ -289,7 +362,7 @@ int so; } -/* +/** * Set IP address in socket structure in_addr * * @param a Pointer to a struct in_addr into which the address is written @@ -343,6 +416,14 @@ setipaddress(struct in_addr *a, char *p) return 0; } +/** + * Perform authentication using the maxscaled protocol conventions + * + * @param so The socket connected to MaxScale + * @param user The username to authenticate + * @param password The password to authenticate with + * @return Non-zero of succesful authentication + */ static int authMaxScale(int so, char *user, char *password) { @@ -357,6 +438,14 @@ char buf[20]; return strncmp(buf, "FAILED", 6); } +/** + * Send a comamnd using the MaxScaled protocol, display the return data + * on standard output + * + * @param so The socket connect to MaxScale + * @param cmd The command to send + * @return 0 if the connection was closed + */ static int sendCommand(int so, char *cmd) { @@ -378,22 +467,23 @@ int i; return 1; } +/** + * Read a file of commands and send them to MaxScale + * + * @param so The socket connected to MaxScale + * @param file The filename + */ static void -DoSource(int so, char *buf) +DoSource(int so, char *file) { char *ptr, *pe; char line[132]; FILE *fp; - /* Find the filename */ - ptr = &buf[strlen("source")]; - while (*ptr && isspace(*ptr)) - ptr++; - - if ((fp = fopen(ptr, "r")) == NULL) + if ((fp = fopen(file, "r")) == NULL) { fprintf(stderr, "Unable to open command file '%s'.\n", - ptr); + file); return; } @@ -418,3 +508,24 @@ FILE *fp; fclose(fp); return; } + +/** + * Display the --help text. + */ +static void +DoUsage() +{ + printf("maxadmin: The MaxScale administrative and monitor client.\n\n"); + printf("Usage: maxadmin [-u user] [-p password] [-h hostname] [-P port] [ | ]\n\n"); + printf(" -u user The user name to use for the connection, default\n"); + printf(" is admin.\n"); + printf(" -p password The user password, if not given the password will\n"); + printf(" be prompted for interactively\n"); + printf(" -h hostname The maxscale host to connecto to. The default is\n"); + printf(" localhost\n"); + printf(" -P port The port to use for the connection, the default\n"); + printf(" port is 6603.\n"); + printf(" --help Print this help text.\n"); + printf("Any remaining arguments are treated as MaxScale commands or a file\n"); + printf("containing commands to execute.\n"); +} diff --git a/maxscale.spec b/maxscale.spec index 2423fe1f4..7d7b8634d 100644 --- a/maxscale.spec +++ b/maxscale.spec @@ -15,6 +15,9 @@ Prefix: / Group: Development/Tools #Requires: BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel MariaDB-devel MariaDB-server +%if 0%{?rhel} == 6 +BuildRequires: libedit-devel +%endif %description MaxScale diff --git a/server/core/buffer.c b/server/core/buffer.c index c71a5eb63..2708d32ce 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -31,6 +31,7 @@ * 10/06/13 Mark Riddoch Initial implementation * 11/07/13 Mark Riddoch Add reference count mechanism * 16/07/2013 Massimiliano Pinto Added command type to gwbuf struct + * 24/06/2014 Mark Riddoch Addition of gwbuf_trim * * @endverbatim */ @@ -295,6 +296,26 @@ int rval = 0; return rval; } +/** + * Trim bytes form the end of a GWBUF structure + * + * @param buf The buffer to trim + * @param nbytes The number of bytes to trim off + * @return The buffer chain + */ +GWBUF * +gwbuf_trim(GWBUF *buf, unsigned int n_bytes) +{ + if (GWBUF_LENGTH(buf) <= n_bytes) + { + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + return NULL; + } + buf->end -= n_bytes; + + return buf; +} + bool gwbuf_set_type( GWBUF* buf, gwbuf_type_t type) diff --git a/server/core/config.c b/server/core/config.c index 668cafdf4..8fd53318b 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -1090,7 +1090,7 @@ SERVER *server; s = strtok(NULL, ","); } } - if (filters) + if (filters && obj->element) serviceSetFilters(obj->element, filters); } else if (!strcmp(type, "listener")) diff --git a/server/core/dcb.c b/server/core/dcb.c index c7e6179b6..5f4c9fa15 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -48,6 +48,7 @@ * This fixes a bug with many reads from * backend * 07/05/2014 Mark Riddoch Addition of callback mechanism + * 20/06/2014 Mark Riddoch Addition of dcb_clone * * @endverbatim */ @@ -84,6 +85,9 @@ static bool dcb_set_state_nomutex( dcb_state_t* old_state); static void dcb_call_callback(DCB *dcb, DCB_REASON reason); static DCB* dcb_get_next (DCB* dcb); +static int dcb_null_write(DCB *dcb, GWBUF *buf); +static int dcb_null_close(DCB *dcb); +static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf); DCB* dcb_get_zombies(void) { @@ -134,6 +138,10 @@ DCB *rval; rval->next = NULL; rval->callbacks = NULL; + rval->remote = NULL; + rval->user = NULL; + rval->flags = 0; + spinlock_acquire(&dcbspin); if (allDCBs == NULL) allDCBs = rval; @@ -245,7 +253,39 @@ dcb_add_to_zombieslist(DCB *dcb) spinlock_release(&zombiespin); } +/* + * Clone a DCB for internal use, mostly used for specialist filters + * to create dummy clients based on real clients. + * + * @param orig The DCB to clone + * @return A DCB that can be used as a client + */ +DCB * +dcb_clone(DCB *orig) +{ +DCB *clone; + if ((clone = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) + { + return NULL; + } + + clone->fd = -1; + clone->flags |= DCBF_CLONE; + clone->state = orig->state; + clone->data = orig->data; + if (orig->remote) + clone->remote = strdup(orig->remote); + if (orig->user) + clone->user = strdup(orig->user); + clone->protocol = orig->protocol; + + clone->func.write = dcb_null_write; + clone->func.close = dcb_null_close; + clone->func.auth = dcb_null_auth; + + return clone; +} /** * Free a DCB and remove it from the chain of all DCBs @@ -308,12 +348,14 @@ DCB_CALLBACK *cb; } } - if (dcb->protocol != NULL) + if (dcb->protocol && ((dcb->flags & DCBF_CLONE) ==0)) free(dcb->protocol); - if (dcb->data) + if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0)) free(dcb->data); if (dcb->remote) free(dcb->remote); + if (dcb->user) + free(dcb->user); /* Clear write and read buffers */ if (dcb->delayq) { @@ -1119,6 +1161,8 @@ printDCB(DCB *dcb) printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); if (dcb->remote) printf("\tConnected to: %s\n", dcb->remote); + if (dcb->user) + printf("\tUsername to: %s\n", dcb->user); if (dcb->writeq) printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); printf("\tStatistics:\n"); @@ -1176,6 +1220,9 @@ DCB *dcb; if (dcb->remote) dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); + if (dcb->user) + dcb_printf(pdcb, "\tUsername: %s\n", + dcb->user); if (dcb->writeq) dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); @@ -1186,6 +1233,8 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + if (dcb->flags & DCBF_CLONE) + dcb_printf(pdcb, "\t\tDCB is a clone.\n"); dcb = dcb->next; } spinlock_release(&dcbspin); @@ -1250,6 +1299,8 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + if (dcb->flags & DCBF_CLONE) + dcb_printf(pdcb, "\t\tDCB is a clone.\n"); } /** @@ -1280,7 +1331,7 @@ gw_dcb_state2string (int state) { } /** - * A DCB based wrapper for printf. Allows formattign printing to + * A DCB based wrapper for printf. Allows formatting printing to * a descritor control block. * * @param dcb Descriptor to write to @@ -1817,4 +1868,47 @@ void dcb_call_foreach ( } return; } - + + +/** + * Null protocol write routine used for cloned dcb's. It merely consumes + * buffers written on the cloned DCB. + * + * @params dcb The descriptor control block + * @params buf The buffer beign written + * @return Always returns a good write operation result + */ +static int +dcb_null_write(DCB *dcb, GWBUF *buf) +{ + while (buf) + { + buf = gwbuf_consume(buf, GWBUF_LENGTH(buf)); + } + return 1; +} + +/** + * Null protocol close operation for use by cloned DCB's. + * + * @param dcb The DCB being closed. + */ +static int +dcb_null_close(DCB *dcb) +{ + return 0; +} + +/** + * Null protocol auth operation for use by cloned DCB's. + * + * @param dcb The DCB being closed. + * @param server The server to auth against + * @param session The user session + * @param buf The buffer with the new auth request + */ +static int +dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf) +{ + return 0; +} diff --git a/server/core/filter.c b/server/core/filter.c index 87049461a..baeea21d7 100644 --- a/server/core/filter.c +++ b/server/core/filter.c @@ -135,6 +135,19 @@ FILTER_DEF *filter; return filter; } +/** + * Check a parameter to see if it is a standard filter parameter + * + * @param name Parameter name to check + */ +int +filter_standard_parameter(char *name) +{ + if (strcmp(name, "type") == 0 || strcmp(name, "module") == 0) + return 1; + return 0; +} + /** * Print all filters to a DCB * @@ -209,13 +222,13 @@ int i; { dcb_printf(dcb, "Filters\n"); dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); - dcb_printf(dcb, "%-18s | %-15s | Options\n", + dcb_printf(dcb, "%-19s | %-15s | Options\n", "Filter", "Module"); dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); } while (ptr) { - dcb_printf(dcb, "%-18s | %-15s | ", + dcb_printf(dcb, "%-19s | %-15s | ", ptr->name, ptr->module); for (i = 0; ptr->options && ptr->options[i]; i++) dcb_printf(dcb, "%s ", ptr->options[i]); @@ -289,6 +302,17 @@ int i; spinlock_release(&filter->spin); } +/** + * Connect the downstream filter chain for a filter. + * + * This will create the filter instance, loading the filter module, and + * conenct the fitler into the downstream chain. + * + * @param filter The filter to add into the chain + * @param session The client session + * @param downstream The filter downstream of this filter + * @return The downstream component for the next filter + */ DOWNSTREAM * filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream) { @@ -318,3 +342,42 @@ DOWNSTREAM *me; return me; } + +/** + * Connect a filter in the up stream filter chain for a session + * + * Note, the filter will have been created when the downstream chian was + * previously setup. + * Note all filters require to be in the upstream chain, so this routine + * may skip a filter if it does not provide an upstream interface. + * + * @param filter The fitler to add to the chain + * @param fsession The filter session + * @param upstream The filter that should be upstream of this filter + * @return The upstream component for the next filter + */ +UPSTREAM * +filterUpstream(FILTER_DEF *filter, void *fsession, UPSTREAM *upstream) +{ +UPSTREAM *me; + + /* + * The the filter has no setUpstream entry point then is does + * not require to see results and can be left out of the chain. + */ + if (filter->obj->setUpstream == NULL) + return upstream; + + if (filter->obj->clientReply != NULL) + { + if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL) + { + return NULL; + } + me->instance = filter->filter; + me->session = fsession; + me->clientReply = (void *)(filter->obj->clientReply); + filter->obj->setUpstream(me->instance, me->session, upstream); + } + return me; +} diff --git a/server/core/modutil.c b/server/core/modutil.c index c6bc9bd00..79b9ebad0 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -57,9 +57,12 @@ unsigned char *ptr; * This routine is very simplistic and does not deal with SQL text * that spans multiple buffers. * + * The length returned is the complete length of the SQL, which may + * be larger than the amount of data in this packet. + * * @param buf The packet buffer * @param sql Pointer that is set to point at the SQL data - * @param length Length of the SQL data + * @param length Length of the SQL query data * @return True if the packet is a COM_QUERY packet */ int @@ -79,7 +82,54 @@ char *ptr; return 1; } +/** + * Extract the SQL portion of a COM_QUERY packet + * + * NB This sets *sql to point into the packet and does not + * allocate any new storage. The string pointed to by *sql is + * not NULL terminated. + * + * The number of bytes pointed to *sql is returned in *length + * + * The remaining number of bytes required for the complete query string + * are returned in *residual + * + * @param buf The packet buffer + * @param sql Pointer that is set to point at the SQL data + * @param length Length of the SQL query data pointed to by sql + * @param residual Any remain part of the query in future packets + * @return True if the packet is a COM_QUERY packet + */ +int +modutil_MySQL_Query(GWBUF *buf, char **sql, int *length, int *residual) +{ +char *ptr; + if (!modutil_is_SQL(buf)) + return 0; + ptr = GWBUF_DATA(buf); + *residual = *ptr++; + *residual += (*ptr++ << 8); + *residual += (*ptr++ << 8); + ptr += 2; // Skip sequence id and COM_QUERY byte + *residual = *residual - 1; + *length = GWBUF_LENGTH(buf) - 5; + *residual -= *length; + *sql = ptr; + return 1; +} + + + +/** + * Replace the contents of a GWBUF with the new SQL statement passed as a text string. + * The routine takes care of the modification needed to the MySQL packet, + * returning a GWBUF chian that cna be used to send the data to a MySQL server + * + * @param orig The original request in a GWBUF + * @param sql The SQL text to replace in the packet + * @return A newly formed GWBUF containing the MySQL packet. + */ GWBUF * modutil_replace_SQL(GWBUF *orig, char *sql) { diff --git a/server/core/session.c b/server/core/session.c index 797de90ca..80767f9ff 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -168,6 +168,10 @@ session_alloc(SERVICE *service, DCB *client_dcb) session->head.routeQuery = (void *)(service->router->routeQuery); + session->tail.instance = session; + session->tail.session = session; + session->tail.clientReply = session_reply; + if (service->n_filters > 0) { if (!session_setup_filters(session)) @@ -327,6 +331,12 @@ bool session_free( } if (session->n_filters) { + for (i = 0; i < session->n_filters; i++) + { + session->filters[i].filter->obj->closeSession( + session->filters[i].instance, + session->filters[i].session); + } for (i = 0; i < session->n_filters; i++) { session->filters[i].filter->obj->freeSession( @@ -628,6 +638,7 @@ session_setup_filters(SESSION *session) { SERVICE *service = session->service; DOWNSTREAM *head; +UPSTREAM *tail; int i; if ((session->filters = calloc(service->n_filters, @@ -658,9 +669,54 @@ int i; session->head = *head; } + for (i = 0; i < service->n_filters; i++) + { + if ((tail = filterUpstream(service->filters[i], + session->filters[i].session, + &session->tail)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Failed to create filter '%s' for service '%s'.\n", + service->filters[i]->name, + service->name))); + return 0; + } + session->tail = *tail; + } + return 1; } +/** + * Entry point for the final element int he upstream filter, i.e. the writing + * of the data to the client. + * + * @param instance The "instance" data + * @param session The session + * @param data The buffer chain to write + */ +int +session_reply(void *instance, void *session, GWBUF *data) +{ +SESSION *the_session = (SESSION *)session; + + return the_session->client->func.write(the_session->client, data); +} + +/** + * Return the client connection address or name + * + * @param session The session whose client address to return + */ +char * +session_get_remote(SESSION *session) +{ + if (session && session->client) + return session->client->remote; + return NULL; +} + bool session_route_query ( SESSION* ses, GWBUF* buf) @@ -686,4 +742,17 @@ bool session_route_query ( return_succp: return succp; } - + + +/** + * Return the username of the user connected to the client side of the + * session. + * + * @param session The session pointer. + * @return The user name or NULL if it can not be determined. + */ +char * +session_getUser(SESSION *session) +{ + return (session && session->client) ? session->client->user : NULL; +} diff --git a/server/include/buffer.h b/server/include/buffer.h index 7e48052f4..3c4c022e3 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -97,9 +97,9 @@ typedef struct gwbuf { #define GWBUF_EMPTY(b) ((b)->start == (b)->end) /*< Consume a number of bytes in the buffer */ -#define GWBUF_CONSUME(b, bytes) (b)->start += bytes +#define GWBUF_CONSUME(b, bytes) (b)->start += (bytes) -#define GWBUF_RTRIM(b, bytes) (b)->end -= bytes +#define GWBUF_RTRIM(b, bytes) (b)->end -= (bytes) #define GWBUF_TYPE(b) (b)->gwbuf_type /*< @@ -110,6 +110,7 @@ extern void gwbuf_free(GWBUF *buf); extern GWBUF *gwbuf_clone(GWBUF *buf); extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail); extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length); +extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int length); extern unsigned int gwbuf_length(GWBUF *head); extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type); diff --git a/server/include/dcb.h b/server/include/dcb.h index 99073c1db..816bc7c5b 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -207,7 +207,9 @@ typedef struct dcb { #endif int fd; /**< The descriptor */ dcb_state_t state; /**< Current descriptor state */ + int flags; /**< DCB flags */ char *remote; /**< Address of remote end */ + char *user; /**< User name for connection */ struct sockaddr_in ipv4; /**< remote end IPv4 address */ void *protocol; /**< The protocol specific state */ struct session *session; /**< The owning session */ @@ -269,6 +271,7 @@ int dcb_write(DCB *, GWBUF *); DCB *dcb_alloc(dcb_role_t); void dcb_free(DCB *); DCB *dcb_connect(struct server *, struct session *, const char *); +DCB *dcb_clone(DCB *); int dcb_read(DCB *, GWBUF **); int dcb_drain_writeq(DCB *); void dcb_close(DCB *); @@ -293,4 +296,8 @@ bool dcb_set_state( DCB* dcb, dcb_state_t new_state, dcb_state_t* old_state); + + +/* DCB flags values */ +#define DCBF_CLONE 0x0001 /* DCB is a clone */ #endif /* _DCB_H */ diff --git a/server/include/filter.h b/server/include/filter.h index 8ddcfb00d..568df291a 100644 --- a/server/include/filter.h +++ b/server/include/filter.h @@ -61,6 +61,7 @@ typedef struct { * filter pipline * routeQuery Called on each query that requires * routing + * clientReply * diagnostics Called to force the filter to print * diagnostic output * @@ -74,7 +75,9 @@ typedef struct filter_object { void (*closeSession)(FILTER *instance, void *fsession); void (*freeSession)(FILTER *instance, void *fsession); void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream); + void (*setUpstream)(FILTER *instance, void *fsession, UPSTREAM *downstream); int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue); + int (*clientReply)(FILTER *instance, void *fsession, GWBUF *queue); void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb); } FILTER_OBJECT; @@ -83,7 +86,7 @@ typedef struct filter_object { * is changed these values must be updated in line with the rules in the * file modinfo.h. */ -#define FILTER_VERSION {1, 0, 0} +#define FILTER_VERSION {1, 1, 0} /** * The definition of a filter form the configuration file. * This is basically the link between a plugin to load and the @@ -108,6 +111,8 @@ FILTER_DEF *filter_find(char *); void filterAddOption(FILTER_DEF *, char *); void filterAddParameter(FILTER_DEF *, char *, char *); DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *); +UPSTREAM *filterUpstream(FILTER_DEF *, void *, UPSTREAM *); +int filter_standard_parameter(char *); void dprintAllFilters(DCB *); void dprintFilter(DCB *, FILTER_DEF *); void dListFilters(DCB *); diff --git a/server/include/modutil.h b/server/include/modutil.h index 2092ddea5..00336f937 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -26,6 +26,7 @@ * * Date Who Description * 04/06/14 Mark Riddoch Initial implementation + * 24/06/14 Mark Riddoch Add modutil_MySQL_Query to enable multipacket queries * * @endverbatim */ @@ -33,5 +34,6 @@ extern int modutil_is_SQL(GWBUF *); extern int modutil_extract_SQL(GWBUF *, char **, int *); +extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *); extern GWBUF *modutil_replace_SQL(GWBUF *, char *); #endif diff --git a/server/include/session.h b/server/include/session.h index a2415c785..cbd43fe40 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -70,8 +70,8 @@ typedef enum { typedef struct { void *instance; void *session; - int (*routeQuery)(void *instance, - void *router_session, GWBUF *queue); + int (*routeQuery)(void *instance, void *session, + GWBUF *request); } DOWNSTREAM; /** @@ -81,8 +81,9 @@ typedef struct { typedef struct { void *instance; void *session; - int (*write)(void *, void *, GWBUF *); - int (*error)(void *); + int (*clientReply)(void *instance, + void *session, GWBUF *response); + int (*error)(void *instance, void *session, void *); } UPSTREAM; /** @@ -117,6 +118,7 @@ typedef struct session { int n_filters; /**< Number of filter sessions */ SESSION_FILTER *filters; /**< The filters in use within this session */ DOWNSTREAM head; /**< Head of the filter chain */ + UPSTREAM tail; /**< The tail of the filter chain */ struct session *next; /**< Linked list of all sessions */ int refcount; /**< Reference count on the session */ #if defined(SS_DEBUG) @@ -131,13 +133,24 @@ typedef struct session { * the incoming data to the first element in the pipeline of filters and * routers. */ -#define SESSION_ROUTE_QUERY(session, buf) \ - ((session)->head.routeQuery)((session)->head.instance, \ - (session)->head.session, (buf)) +#define SESSION_ROUTE_QUERY(sess, buf) \ + ((sess)->head.routeQuery)((sess)->head.instance, \ + (sess)->head.session, (buf)) +/** + * A convenience macro that can be used by the router modules to route + * the replies to the first element in the pipeline of filters and + * the protocol. + */ +#define SESSION_ROUTE_REPLY(sess, buf) \ + ((sess)->tail.clientReply)((sess)->tail.instance, \ + (sess)->tail.session, (buf)) SESSION *session_alloc(struct service *, struct dcb *); bool session_free(SESSION *); int session_isvalid(SESSION *); +int session_reply(void *inst, void *session, GWBUF *data); +char *session_get_remote(SESSION *); +char *session_getUser(SESSION *); void printAllSessions(); void printSession(SESSION *); void dprintAllSessions(struct dcb *); diff --git a/server/modules/filter/Makefile b/server/modules/filter/Makefile index 14b226b7d..b51a9e671 100644 --- a/server/modules/filter/Makefile +++ b/server/modules/filter/Makefile @@ -38,10 +38,14 @@ QLASRCS=qlafilter.c QLAOBJ=$(QLASRCS:.c=.o) REGEXSRCS=regexfilter.c REGEXOBJ=$(REGEXSRCS:.c=.o) -SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) +TOPNSRCS=topfilter.c +TOPNOBJ=$(TOPNSRCS:.c=.o) +TEESRCS=tee.c +TEEOBJ=$(TEESRCS:.c=.o) +SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) $(TOPNSRCS) $(TEESRCS) OBJ=$(SRCS:.c=.o) LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager -MODULES= libtestfilter.so libqlafilter.so libregexfilter.so +MODULES= libtestfilter.so libqlafilter.so libregexfilter.so libtopfilter.so libtee.so all: $(MODULES) @@ -55,6 +59,12 @@ libqlafilter.so: $(QLAOBJ) libregexfilter.so: $(REGEXOBJ) $(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@ +libtopfilter.so: $(TOPNOBJ) + $(CC) $(LDFLAGS) $(TOPNOBJ) $(LIBS) -o $@ + +libtee.so: $(TEEOBJ) + $(CC) $(LDFLAGS) $(TEEOBJ) $(LIBS) -o $@ + .c.o: $(CC) $(CFLAGS) $< -o $@ diff --git a/server/modules/filter/qlafilter.c b/server/modules/filter/qlafilter.c index 520f1e1a9..bc3e4622d 100644 --- a/server/modules/filter/qlafilter.c +++ b/server/modules/filter/qlafilter.c @@ -27,15 +27,26 @@ * A single option may be passed to the filter, this is the name of the * file to which the queries are logged. A serial number is appended to this * name in order that each session logs to a different file. + * + * Date Who Description + * 03/06/2014 Mark Riddoch Initial implementation + * 11/06/2014 Mark Riddoch Addition of source and match parameters + * 19/06/2014 Mark Riddoch Addition of user parameter + * */ #include #include #include #include #include -#include +#include +#include #include #include +#include +#include + +extern int lm_enabled_logfiles_bitmask; MODULE_INFO info = { MODULE_API_FILTER, @@ -44,7 +55,7 @@ MODULE_INFO info = { "A simple query logging filter" }; -static char *version_str = "V1.0.0"; +static char *version_str = "V1.1.1"; /* * The filter entry points @@ -64,7 +75,9 @@ static FILTER_OBJECT MyObject = { closeSession, freeSession, setDownstream, + NULL, // No Upstream requirement routeQuery, + NULL, // No client reply diagnostic, }; @@ -77,13 +90,19 @@ static FILTER_OBJECT MyObject = { * have a nique name. */ typedef struct { - int sessions; - char *filebase; + int sessions; /* The count of sessions */ + char *filebase; /* The filemane base */ + char *source; /* The source of the client connection */ + char *userName; /* The user name to filter on */ + char *match; /* Optional text to match against */ + regex_t re; /* Compiled regex text */ + char *nomatch; /* Optional text to match against for exclusion */ + regex_t nore; /* Compiled regex nomatch text */ } QLA_INSTANCE; /** * The session structure for this QLA filter. - * This stores the downstream filter information, such that the + * This stores the downstream filter information, such that the * filter is able to pass the query on to the next filter (or router) * in the chain. * @@ -92,7 +111,8 @@ typedef struct { typedef struct { DOWNSTREAM down; char *filename; - int fd; + FILE *fp; + int active; } QLA_SESSION; /** @@ -141,6 +161,7 @@ static FILTER * createInstance(char **options, FILTER_PARAMETER **params) { QLA_INSTANCE *my_instance; +int i; if ((my_instance = calloc(1, sizeof(QLA_INSTANCE))) != NULL) { @@ -148,7 +169,71 @@ QLA_INSTANCE *my_instance; my_instance->filebase = strdup(options[0]); else my_instance->filebase = strdup("qla"); + my_instance->source = NULL; + my_instance->userName = NULL; + my_instance->match = NULL; + my_instance->nomatch = NULL; + if (params) + { + for (i = 0; params[i]; i++) + { + if (!strcmp(params[i]->name, "match")) + { + my_instance->match = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "exclude")) + { + my_instance->nomatch = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->userName = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "filebase")) + { + if (my_instance->filebase) + free(my_instance->filebase); + my_instance->source = strdup(params[i]->value); + } + else if (!filter_standard_parameter(params[i]->name)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "qlafilter: Unexpected parameter '%s'.\n", + params[i]->name))); + } + } + } my_instance->sessions = 0; + if (my_instance->match && + regcomp(&my_instance->re, my_instance->match, REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "qlafilter: Invalid regular expression '%s'" + " for the match parameter.\n", + my_instance->match))); + free(my_instance->match); + free(my_instance->source); + free(my_instance->filebase); + free(my_instance); + return NULL; + } + if (my_instance->nomatch && + regcomp(&my_instance->nore, my_instance->nomatch, + REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "qlafilter: Invalid regular expression '%s'" + " for the nomatch paramter.\n", + my_instance->match))); + if (my_instance->match) + regfree(&my_instance->re); + free(my_instance->match); + free(my_instance->source); + free(my_instance->filebase); + free(my_instance); + return NULL; + } } return (FILTER *)my_instance; } @@ -167,6 +252,7 @@ newSession(FILTER *instance, SESSION *session) { QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; QLA_SESSION *my_session; +char *remote, *userName; if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL) { @@ -177,11 +263,22 @@ QLA_SESSION *my_session; free(my_session); return NULL; } + my_session->active = 1; + if (my_instance->source + && (remote = session_get_remote(session)) != NULL) + { + if (strcmp(remote, my_instance->source)) + my_session->active = 0; + } + userName = session_getUser(session); + if (my_instance->userName && userName && strcmp(userName, + my_instance->userName)) + my_session->active = 0; sprintf(my_session->filename, "%s.%d", my_instance->filebase, my_instance->sessions); my_instance->sessions++; - my_session->fd = open(my_session->filename, - O_WRONLY|O_CREAT|O_TRUNC, 0666); + if (my_session->active) + my_session->fp = fopen(my_session->filename, "w"); } return my_session; @@ -200,7 +297,8 @@ closeSession(FILTER *instance, void *session) { QLA_SESSION *my_session = (QLA_SESSION *)session; - close(my_session->fd); + if (my_session->active && my_session->fp) + fclose(my_session->fp); } /** @@ -248,22 +346,29 @@ QLA_SESSION *my_session = (QLA_SESSION *)session; static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { +QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; QLA_SESSION *my_session = (QLA_SESSION *)session; -char *ptr, t_buf[40]; +char *ptr; int length; struct tm t; struct timeval tv; - if (modutil_extract_SQL(queue, &ptr, &length)) + if (my_session->active && modutil_extract_SQL(queue, &ptr, &length)) { - gettimeofday(&tv, NULL); - localtime_r(&tv.tv_sec, &t); - sprintf(t_buf, "%02d:%02d:%02d.%-3d %d/%02d/%d, ", - t.tm_hour, t.tm_min, t.tm_sec, (int)(tv.tv_usec / 1000), - t.tm_mday, t.tm_mon + 1, 1900 + t.tm_year); - write(my_session->fd, t_buf, strlen(t_buf)); - write(my_session->fd, ptr, length); - write(my_session->fd, "\n", 1); + if ((my_instance->match == NULL || + regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && + (my_instance->nomatch == NULL || + regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) + { + gettimeofday(&tv, NULL); + localtime_r(&tv.tv_sec, &t); + fprintf(my_session->fp, + "%02d:%02d:%02d.%-3d %d/%02d/%d, ", + t.tm_hour, t.tm_min, t.tm_sec, (int)(tv.tv_usec / 1000), + t.tm_mday, t.tm_mon + 1, 1900 + t.tm_year); + fwrite(ptr, sizeof(char), length, my_session->fp); + fwrite("\n", sizeof(char), 1, my_session->fp); + } } /* Pass the query downstream */ diff --git a/server/modules/filter/regexfilter.c b/server/modules/filter/regexfilter.c index ad773c40c..69c00c930 100644 --- a/server/modules/filter/regexfilter.c +++ b/server/modules/filter/regexfilter.c @@ -19,9 +19,13 @@ #include #include #include +#include +#include #include #include +extern int lm_enabled_logfiles_bitmask; + /** * regexfilter.c - a very simple regular expression rewrite filter. * @@ -29,6 +33,12 @@ * Two parameters should be defined in the filter configuration * match= * replace= + * Two optional parameters + * source= + * user= + * + * Date Who Description + * 19/06/2014 Mark Riddoch Addition of source and user parameters */ MODULE_INFO info = { @@ -38,7 +48,7 @@ MODULE_INFO info = { "A query rewrite filter that uses regular expressions to rewite queries" }; -static char *version_str = "V1.0.0"; +static char *version_str = "V1.1.0"; static FILTER *createInstance(char **options, FILTER_PARAMETER **params); static void *newSession(FILTER *instance, SESSION *session); @@ -56,7 +66,9 @@ static FILTER_OBJECT MyObject = { closeSession, freeSession, setDownstream, + NULL, // No Upstream requirement routeQuery, + NULL, diagnostic, }; @@ -64,6 +76,8 @@ static FILTER_OBJECT MyObject = { * Instance structure */ typedef struct { + char *source; /* Source address to restrict matches */ + char *user; /* User name to restrict matches */ char *match; /* Regular expression to match */ char *replace; /* Replacement text */ regex_t re; /* Compiled regex text */ @@ -73,9 +87,10 @@ typedef struct { * The session structure for this regex filter */ typedef struct { - DOWNSTREAM down; - int no_change; - int replacements; + DOWNSTREAM down; /* The downstream filter */ + int no_change; /* No. of unchanged requests */ + int replacements; /* No. of changed requests */ + int active; /* Is filter active */ } REGEX_SESSION; /** @@ -124,20 +139,54 @@ static FILTER * createInstance(char **options, FILTER_PARAMETER **params) { REGEX_INSTANCE *my_instance; -int i; +int i, cflags = REG_ICASE; if ((my_instance = calloc(1, sizeof(REGEX_INSTANCE))) != NULL) { my_instance->match = NULL; my_instance->replace = NULL; - for (i = 0; params[i]; i++) + for (i = 0; params && params[i]; i++) { if (!strcmp(params[i]->name, "match")) my_instance->match = strdup(params[i]->value); - if (!strcmp(params[i]->name, "replace")) + else if (!strcmp(params[i]->name, "replace")) my_instance->replace = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "source")) + my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->user = strdup(params[i]->value); + else if (!filter_standard_parameter(params[i]->name)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "regexfilter: Unexpected parameter '%s'.\n", + params[i]->name))); + } } + + if (options) + { + for (i = 0; options[i]; i++) + { + if (!strcasecmp(options[i], "ignorecase")) + { + cflags |= REG_ICASE; + } + else if (!strcasecmp(options[i], "case")) + { + cflags &= ~REG_ICASE; + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "regexfilter: unsupported option '%s'.\n", + options[i]))); + } + } + } + if (my_instance->match == NULL || my_instance->replace == NULL) { return NULL; @@ -145,6 +194,9 @@ int i; if (regcomp(&my_instance->re, my_instance->match, REG_ICASE)) { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "regexfilter: Invalid regular expression '%s'.\n", + my_instance->match))); free(my_instance->match); free(my_instance->replace); free(my_instance); @@ -164,12 +216,27 @@ int i; static void * newSession(FILTER *instance, SESSION *session) { +REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance; REGEX_SESSION *my_session; +char *remote, *user; if ((my_session = calloc(1, sizeof(REGEX_SESSION))) != NULL) { my_session->no_change = 0; my_session->replacements = 0; + my_session->active = 1; + if (my_instance->source + && (remote = session_get_remote(session)) != NULL) + { + if (strcmp(remote, my_instance->source)) + my_session->active = 0; + } + + if (my_instance->user && (user = session_getUser(session)) + && strcmp(user, my_instance->user)) + { + my_session->active = 0; + } } return my_session; diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c new file mode 100644 index 000000000..211fbbd70 --- /dev/null +++ b/server/modules/filter/tee.c @@ -0,0 +1,445 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file tee.c A filter that splits the processing pipeline in two + * + * Conditionally duplicate requests and send the duplicates to another service + * within MaxScale. + * + * Parameters + * ========== + * + * service The service to send the duplicates to + * source The source address to match in order to duplicate (optional) + * match A regular expression to match in order to perform duplication + * of the request (optional) + * nomatch A regular expression to match in order to prevent duplication + * of the request (optional) + * user A user name to match against. If present only requests that + * originate from this user will be duplciated (optional) + * + * Revision History + * ================ + * + * Date Who Description + * 20/06/2014 Mark Riddoch Initial implementation + * 24/06/2014 Mark Riddoch Addition of support for multi-packet queries + * + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int lm_enabled_logfiles_bitmask; + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A tee piece in the filter plumbing" +}; + +static char *version_str = "V1.0.0"; + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + NULL, // No Upstream requirement + routeQuery, + NULL, // No client reply + diagnostic, +}; + +/** + * The instance structure for the TEE filter - this holds the configuration + * information for the filter. + */ +typedef struct { + SERVICE *service; /* The service to duplicate requests to */ + char *source; /* The source of the client connection */ + char *userName; /* The user name to filter on */ + char *match; /* Optional text to match against */ + regex_t re; /* Compiled regex text */ + char *nomatch; /* Optional text to match against for exclusion */ + regex_t nore; /* Compiled regex nomatch text */ +} TEE_INSTANCE; + +/** + * The session structure for this TEE filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. + */ +typedef struct { + DOWNSTREAM down; /* The downstream filter */ + int active; /* filter is active? */ + DCB *branch_dcb; /* Client DCB for "branch" service */ + SESSION *branch_session;/* The branch service session */ + int n_duped; /* Number of duplicated querise */ + int residual; /* Any outstanding SQL text */ +} TEE_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +TEE_INSTANCE *my_instance; +int i; + + if ((my_instance = calloc(1, sizeof(TEE_INSTANCE))) != NULL) + { + if (options) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "tee: The tee filter has been passed an option, " + "this filter does not support any options.\n"))); + } + my_instance->service = NULL; + my_instance->source = NULL; + my_instance->userName = NULL; + my_instance->match = NULL; + my_instance->nomatch = NULL; + if (params) + { + for (i = 0; params[i]; i++) + { + if (!strcmp(params[i]->name, "service")) + { + if ((my_instance->service = service_find(params[i]->value)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "tee: service '%s' " + "not found.\n", + params[i]->value))); + } + } + else if (!strcmp(params[i]->name, "match")) + { + my_instance->match = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "exclude")) + { + my_instance->nomatch = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->userName = strdup(params[i]->value); + else if (!filter_standard_parameter(params[i]->name)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "tee: Unexpected parameter '%s'.\n", + params[i]->name))); + } + } + } + if (my_instance->service == NULL) + { + free(my_instance->match); + free(my_instance->source); + free(my_instance); + return NULL; + } + if (my_instance->match && + regcomp(&my_instance->re, my_instance->match, REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "tee: Invalid regular expression '%s'" + " for the match parameter.\n", + my_instance->match))); + free(my_instance->match); + free(my_instance->source); + free(my_instance); + return NULL; + } + if (my_instance->nomatch && + regcomp(&my_instance->nore, my_instance->nomatch, + REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "tee: Invalid regular expression '%s'" + " for the nomatch paramter.\n", + my_instance->match))); + if (my_instance->match) + regfree(&my_instance->re); + free(my_instance->match); + free(my_instance->source); + free(my_instance); + return NULL; + } + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * Create the file to log to and open it. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; +TEE_SESSION *my_session; +char *remote, *userName; + + if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL) + { + my_session->active = 1; + my_session->residual = 0; + if (my_instance->source + && (remote = session_get_remote(session)) != NULL) + { + if (strcmp(remote, my_instance->source)) + my_session->active = 0; + } + userName = session_getUser(session); + if (my_instance->userName && userName && strcmp(userName, + my_instance->userName)) + my_session->active = 0; + if (my_session->active) + { + my_session->branch_dcb = dcb_clone(session->client); + my_session->branch_session = session_alloc(my_instance->service, my_session->branch_dcb); + } + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * In the case of the tee filter we need to close down the + * "branch" session. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +TEE_SESSION *my_session = (TEE_SESSION *)session; +ROUTER_OBJECT *router; +void *router_instance, *rsession; +SESSION *bsession; + + if (my_session->active) + { + bsession = my_session->branch_session; + router = bsession->service->router; + router_instance = bsession->service->router_instance; + rsession = bsession->router_session; + /** Close router session and all its connections */ + router->closeSession(router_instance, rsession); + dcb_free(my_session->branch_dcb); + /* No need to free the session, this is done as + * a side effect of closing the client DCB of the + * session. + */ + } +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ +TEE_SESSION *my_session = (TEE_SESSION *)session; + + free(session); + return; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +TEE_SESSION *my_session = (TEE_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query should normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * If my_session->residual is set then duplicate that many bytes + * and send them to the branch. + * + * If my_session->residual is zero then this must be a new request + * Extract the SQL text if possible, match against that text and forward + * the request. If the requets is not contained witin the packet we have + * then set my_session->residual to the number of outstanding bytes + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; +TEE_SESSION *my_session = (TEE_SESSION *)session; +char *ptr; +int length, rval, residual; +GWBUF *clone = NULL; + + if (my_session->residual) + { + clone = gwbuf_clone(queue); + if (my_session->residual < GWBUF_LENGTH(clone)) + GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual); + my_session->residual -= GWBUF_LENGTH(clone); + if (my_session->residual < 0) + my_session->residual = 0; + } + else if (my_session->active && + modutil_MySQL_Query(queue, &ptr, &length, &residual)) + { + if ((my_instance->match == NULL || + regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && + (my_instance->nomatch == NULL || + regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) + { + clone = gwbuf_clone(queue); + my_session->residual = residual; + } + } + + /* Pass the query downstream */ + rval = my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); + if (clone) + { + my_session->n_duped++; + SESSION_ROUTE_QUERY(my_session->branch_session, clone); + } + return rval; +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +TEE_SESSION *my_session = (TEE_SESSION *)fsession; + + if (my_session) + { + dcb_printf(dcb, "\t\tNo. of statements duplicated: %d.\n", + my_session->n_duped); + } +} diff --git a/server/modules/filter/testfilter.c b/server/modules/filter/testfilter.c index 270dbd1cb..ba05e535e 100644 --- a/server/modules/filter/testfilter.c +++ b/server/modules/filter/testfilter.c @@ -54,7 +54,9 @@ static FILTER_OBJECT MyObject = { closeSession, freeSession, setDownstream, + NULL, // No upstream requirement routeQuery, + NULL, diagnostic, }; diff --git a/server/modules/filter/topfilter.c b/server/modules/filter/topfilter.c new file mode 100644 index 000000000..cc13ed6c4 --- /dev/null +++ b/server/modules/filter/topfilter.c @@ -0,0 +1,549 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * TOPN Filter - Query Log All. A primitive query logging filter, simply + * used to verify the filter mechanism for downstream filters. All queries + * that are passed through the filter will be written to file. + * + * The filter makes no attempt to deal with query packets that do not fit + * in a single GWBUF. + * + * A single option may be passed to the filter, this is the name of the + * file to which the queries are logged. A serial number is appended to this + * name in order that each session logs to a different file. + * + * Date Who Description + * 18/06/2014 Mark Riddoch Addition of source and user filters + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int lm_enabled_logfiles_bitmask; + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A top N query logging filter" +}; + +static char *version_str = "V1.0.1"; + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + setUpstream, + routeQuery, + clientReply, + diagnostic, +}; + +/** + * A instance structure, the assumption is that the option passed + * to the filter is simply a base for the filename to which the queries + * are logged. + * + * To this base a session number is attached such that each session will + * have a unique name. + */ +typedef struct { + int sessions; /* Session count */ + int topN; /* Number of queries to store */ + char *filebase; /* Base of fielname to log into */ + char *source; /* The source of the client connection */ + char *user; /* A user name to filter on */ + char *match; /* Optional text to match against */ + regex_t re; /* Compiled regex text */ + char *exclude; /* Optional text to match against for exclusion */ + regex_t exre; /* Compiled regex nomatch text */ +} TOPN_INSTANCE; + +/** + * Structure to hold the Top N queries + */ +typedef struct topnq { + struct timeval duration; + char *sql; +} TOPNQ; + +/** + * The session structure for this TOPN filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. + */ +typedef struct { + DOWNSTREAM down; + UPSTREAM up; + int active; + char *clientHost; + char *userName; + char *filename; + int fd; + struct timeval start; + char *current; + TOPNQ **top; + int n_statements; + struct timeval total; + struct timeval connect; + struct timeval disconnect; +} TOPN_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +int i; +TOPN_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(TOPN_INSTANCE))) != NULL) + { + my_instance->topN = 10; + my_instance->match = NULL; + my_instance->exclude = NULL; + my_instance->source = NULL; + my_instance->user = NULL; + my_instance->filebase = strdup("top"); + for (i = 0; params && params[i]; i++) + { + if (!strcmp(params[i]->name, "count")) + my_instance->topN = atoi(params[i]->value); + else if (!strcmp(params[i]->name, "filebase")) + { + free(my_instance->filebase); + my_instance->filebase = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "match")) + { + my_instance->match = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "exclude")) + { + my_instance->exclude = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->user = strdup(params[i]->value); + else if (!filter_standard_parameter(params[i]->name)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "topfilter: Unexpected parameter '%s'.\n", + params[i]->name))); + } + } + if (options) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "topfilter: Options are not supported by this " + " filter. They will be ignored\n"))); + } + my_instance->sessions = 0; + if (my_instance->match && + regcomp(&my_instance->re, my_instance->match, REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "topfilter: Invalid regular expression '%s'" + " for the match parameter.\n", + my_instance->match))); + free(my_instance->match); + free(my_instance->source); + free(my_instance->user); + free(my_instance->filebase); + free(my_instance); + return NULL; + } + if (my_instance->exclude && + regcomp(&my_instance->exre, my_instance->exclude, + REG_ICASE)) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "qlafilter: Invalid regular expression '%s'" + " for the nomatch paramter.\n", + my_instance->match))); + regfree(&my_instance->re); + free(my_instance->match); + free(my_instance->source); + free(my_instance->user); + free(my_instance->filebase); + free(my_instance); + return NULL; + } + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * Create the file to log to and open it. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session; +int i; +char *remote, *user; + + if ((my_session = calloc(1, sizeof(TOPN_SESSION))) != NULL) + { + if ((my_session->filename = + (char *)malloc(strlen(my_instance->filebase) + 20)) + == NULL) + { + free(my_session); + return NULL; + } + sprintf(my_session->filename, "%s.%d", my_instance->filebase, + my_instance->sessions); + my_instance->sessions++; + my_session->top = (TOPNQ **)calloc(my_instance->topN + 1, + sizeof(TOPNQ *)); + for (i = 0; i < my_instance->topN; i++) + { + my_session->top[i] = (TOPNQ *)calloc(1, sizeof(TOPNQ)); + my_session->top[i]->sql = NULL; + } + my_session->n_statements = 0; + my_session->total.tv_sec = 0; + my_session->total.tv_usec = 0; + my_session->current = NULL; + if ((remote = session_get_remote(session)) != NULL) + my_session->clientHost = strdup(remote); + else + my_session->clientHost = NULL; + if ((user = session_getUser(session)) != NULL) + my_session->userName = strdup(user); + else + my_session->userName = NULL; + my_session->active = 1; + if (my_instance->source && strcmp(my_session->clientHost, + my_instance->source)) + my_session->active = 0; + if (my_instance->user && strcmp(my_session->userName, + my_instance->user)) + my_session->active = 0; + + sprintf(my_session->filename, "%s.%d", my_instance->filebase, + my_instance->sessions); + gettimeofday(&my_session->connect, NULL); + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * In the case of the TOPN filter we simple close the file descriptor. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session = (TOPN_SESSION *)session; +struct timeval diff; +int i; +FILE *fp; + + gettimeofday(&my_session->disconnect, NULL); + timersub((&my_session->disconnect), &(my_session->connect), &diff); + if ((fp = fopen(my_session->filename, "w")) != NULL) + { + fprintf(fp, "Top %d longest running queries in session.\n", + my_instance->topN); + fprintf(fp, "==========================================\n\n"); + fprintf(fp, "Time (sec) | Query\n"); + fprintf(fp, "-----------+-----------------------------------------------------------------\n"); + for (i = 0; i < my_instance->topN; i++) + { + if (my_session->top[i]->sql) + { + fprintf(fp, "%10.3f | %s\n", + (double)((my_session->top[i]->duration.tv_sec * 1000) + + (my_session->top[i]->duration.tv_usec / 1000)) / 1000, + my_session->top[i]->sql); + } + } + fprintf(fp, "-----------+-----------------------------------------------------------------\n"); + fprintf(fp, "\n\nSession started %s", + asctime(localtime(&my_session->connect.tv_sec))); + if (my_session->clientHost) + fprintf(fp, "Connection from %s\n", + my_session->clientHost); + if (my_session->userName) + fprintf(fp, "Username %s\n", + my_session->userName); + fprintf(fp, "\nTotal of %d statements executed.\n", + my_session->n_statements); + fprintf(fp, "Total statement execution time %5d.%d seconds\n", + (int)my_session->total.tv_sec, + (int)my_session->total.tv_usec / 1000); + fprintf(fp, "Average statement execution time %9.3f seconds\n", + (double)((my_session->total.tv_sec * 1000) + + (my_session->total.tv_usec / 1000)) + / (1000 * my_session->n_statements)); + fprintf(fp, "Total connection time %5d.%d seconds\n", + (int)diff.tv_sec, (int)diff.tv_usec / 1000); + fclose(fp); + } +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; + + free(my_session->filename); + free(session); + return; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * Set the upstream filter or session to which results will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param upstream The upstream filter or session. + */ +static void +setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)session; + + my_session->up = *upstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query should normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session = (TOPN_SESSION *)session; +char *ptr; +int length; + + if (my_session->active && modutil_extract_SQL(queue, &ptr, &length)) + { + if ((my_instance->match == NULL || + regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && + (my_instance->exclude == NULL || + regexec(&my_instance->exre,ptr,0,NULL, 0) != 0)) + { + my_session->n_statements++; + if (my_session->current) + free(my_session->current); + gettimeofday(&my_session->start, NULL); + my_session->current = strndup(ptr, length); + } + } + + /* Pass the query downstream */ + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +static int +cmp_topn(TOPNQ **a, TOPNQ **b) +{ + if ((*b)->duration.tv_sec == (*a)->duration.tv_sec) + return (*b)->duration.tv_usec - (*a)->duration.tv_usec; + return (*b)->duration.tv_sec - (*a)->duration.tv_sec; +} + +static int +clientReply(FILTER *instance, void *session, GWBUF *reply) +{ +TOPN_INSTANCE *my_instance = (TOPN_INSTANCE *)instance; +TOPN_SESSION *my_session = (TOPN_SESSION *)session; +struct timeval tv, diff; +int i, inserted; + + if (my_session->current) + { + gettimeofday(&tv, NULL); + timersub(&tv, &(my_session->start), &diff); + + timeradd(&(my_session->total), &diff, &(my_session->total)); + + inserted = 0; + for (i = 0; i < my_instance->topN; i++) + { + if (my_session->top[i]->sql == NULL) + { + my_session->top[i]->sql = my_session->current; + my_session->top[i]->duration = diff; + inserted = 1; + break; + } + } + + if (inserted == 0 && ((diff.tv_sec > my_session->top[my_instance->topN-1]->duration.tv_sec) || (diff.tv_sec == my_session->top[my_instance->topN-1]->duration.tv_sec && diff.tv_usec > my_session->top[my_instance->topN-1]->duration.tv_usec ))) + { + free(my_session->top[my_instance->topN-1]->sql); + my_session->top[my_instance->topN-1]->sql = my_session->current; + my_session->top[my_instance->topN-1]->duration = diff; + inserted = 1; + } + + if (inserted) + qsort(my_session->top, my_instance->topN, + sizeof(TOPNQ *), cmp_topn); + else + free(my_session->current); + my_session->current = NULL; + } + + /* Pass the result upstream */ + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +TOPN_SESSION *my_session = (TOPN_SESSION *)fsession; + + if (my_session) + { + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_session->filename); + } +} diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 2fdc6a97e..38625a6ed 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -548,7 +548,7 @@ static int conn_err_count; } mysql_free_result(result); - if (isslave == i) + if (isslave > 0 && isslave == i) isslave = 1; else isslave = 0; @@ -797,4 +797,4 @@ static bool mon_print_fail_status( succp = false; } return succp; -} \ No newline at end of file +} diff --git a/server/modules/protocol/maxscaled.c b/server/modules/protocol/maxscaled.c index f26a3d17a..0c7b4b744 100644 --- a/server/modules/protocol/maxscaled.c +++ b/server/modules/protocol/maxscaled.c @@ -87,8 +87,6 @@ static GWPROTOCOL MyObject = { NULL /**< Session */ }; -static void maxscaled_command(DCB *, unsigned char *cmd); - /** * Implementation of the mandatory version entry point * diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 047e43e5f..cdcdf2c05 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -341,6 +341,8 @@ static int gw_read_backend_event(DCB *dcb) { GWBUF* errbuf; bool succp; + /* try reload users' table for next connection */ + service_refresh_users(dcb->session->service); #if defined(SS_DEBUG) LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index f2942abcf..08c648d39 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -483,6 +483,11 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) { if (auth_token) free(auth_token); + if (auth_ret == 0) + { + dcb->user = strdup(client_data->user); + } + return auth_ret; } diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index e8e8ec7c0..b5ae23c1a 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -83,17 +83,17 @@ static int telnetd_listen(DCB *dcb, char *config); * The "module object" for the telnetd protocol module. */ static GWPROTOCOL MyObject = { - telnetd_read_event, /**< Read - EPOLLIN handler */ - telnetd_write, /**< Write - data from gateway */ - telnetd_write_event, /**< WriteReady - EPOLLOUT handler */ - telnetd_error, /**< Error - EPOLLERR handler */ - telnetd_hangup, /**< HangUp - EPOLLHUP handler */ - telnetd_accept, /**< Accept */ - NULL, /**< Connect */ - telnetd_close, /**< Close */ - telnetd_listen, /**< Create a listener */ - NULL, /**< Authentication */ - NULL /**< Session */ + telnetd_read_event, /**< Read - EPOLLIN handler */ + telnetd_write, /**< Write - data from gateway */ + telnetd_write_event, /**< WriteReady - EPOLLOUT handler */ + telnetd_error, /**< Error - EPOLLERR handler */ + telnetd_hangup, /**< HangUp - EPOLLHUP handler */ + telnetd_accept, /**< Accept */ + NULL, /**< Connect */ + telnetd_close, /**< Close */ + telnetd_listen, /**< Create a listener */ + NULL, /**< Authentication */ + NULL /**< Session */ }; static void telnetd_command(DCB *, unsigned char *cmd); diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index a36180b30..c38d41199 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -689,7 +689,7 @@ clientReply( ss_dassert(client != NULL); - client->func.write(client, queue); + SESSION_ROUTE_REPLY(backend_dcb->session, queue); } /** diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index ea6624dbf..d00d8d7df 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1415,7 +1415,7 @@ static void clientReply ( if (writebuf != NULL && client_dcb != NULL) { /** Write reply to client DCB */ - client_dcb->func.write(client_dcb, writebuf); + SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); bref_clear_state(bref, BREF_WAITING_RESULT); } /** Unlock router session */