diff --git a/server/core/buffer.c b/server/core/buffer.c index db6da8bec..dd5e4bde3 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -31,6 +31,7 @@ * 10/06/13 Mark Riddoch Initial implementation * 11/07/13 Mark Riddoch Add reference count mechanism * 16/07/2013 Massimiliano Pinto Added command type to gwbuf struct + * 24/06/2014 Mark Riddoch Addition of gwbuf_trim * * @endverbatim */ @@ -297,6 +298,26 @@ int rval = 0; return rval; } +/** + * Trim bytes form the end of a GWBUF structure + * + * @param buf The buffer to trim + * @param nbytes The number of bytes to trim off + * @return The buffer chain + */ +GWBUF * +gwbuf_trim(GWBUF *buf, unsigned int n_bytes) +{ + if (GWBUF_LENGTH(buf) <= n_bytes) + { + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + return NULL; + } + buf->end -= n_bytes; + + return buf; +} + bool gwbuf_set_type( GWBUF* buf, gwbuf_type_t type) diff --git a/server/core/dcb.c b/server/core/dcb.c index f9f9eafbe..4a83dc3b0 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -347,7 +347,7 @@ DCB_CALLBACK *cb; } } - if (dcb->protocol != NULL) + if (dcb->protocol && ((dcb->flags & DCBF_CLONE) ==0)) free(dcb->protocol); if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0)) free(dcb->data); diff --git a/server/core/modutil.c b/server/core/modutil.c index c6bc9bd00..79b9ebad0 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -57,9 +57,12 @@ unsigned char *ptr; * This routine is very simplistic and does not deal with SQL text * that spans multiple buffers. * + * The length returned is the complete length of the SQL, which may + * be larger than the amount of data in this packet. + * * @param buf The packet buffer * @param sql Pointer that is set to point at the SQL data - * @param length Length of the SQL data + * @param length Length of the SQL query data * @return True if the packet is a COM_QUERY packet */ int @@ -79,7 +82,54 @@ char *ptr; return 1; } +/** + * Extract the SQL portion of a COM_QUERY packet + * + * NB This sets *sql to point into the packet and does not + * allocate any new storage. The string pointed to by *sql is + * not NULL terminated. + * + * The number of bytes pointed to *sql is returned in *length + * + * The remaining number of bytes required for the complete query string + * are returned in *residual + * + * @param buf The packet buffer + * @param sql Pointer that is set to point at the SQL data + * @param length Length of the SQL query data pointed to by sql + * @param residual Any remain part of the query in future packets + * @return True if the packet is a COM_QUERY packet + */ +int +modutil_MySQL_Query(GWBUF *buf, char **sql, int *length, int *residual) +{ +char *ptr; + if (!modutil_is_SQL(buf)) + return 0; + ptr = GWBUF_DATA(buf); + *residual = *ptr++; + *residual += (*ptr++ << 8); + *residual += (*ptr++ << 8); + ptr += 2; // Skip sequence id and COM_QUERY byte + *residual = *residual - 1; + *length = GWBUF_LENGTH(buf) - 5; + *residual -= *length; + *sql = ptr; + return 1; +} + + + +/** + * Replace the contents of a GWBUF with the new SQL statement passed as a text string. + * The routine takes care of the modification needed to the MySQL packet, + * returning a GWBUF chian that cna be used to send the data to a MySQL server + * + * @param orig The original request in a GWBUF + * @param sql The SQL text to replace in the packet + * @return A newly formed GWBUF containing the MySQL packet. + */ GWBUF * modutil_replace_SQL(GWBUF *orig, char *sql) { diff --git a/server/include/buffer.h b/server/include/buffer.h index 66c56322b..4070a4539 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -94,9 +94,9 @@ typedef struct gwbuf { #define GWBUF_EMPTY(b) ((b)->start == (b)->end) /*< Consume a number of bytes in the buffer */ -#define GWBUF_CONSUME(b, bytes) (b)->start += bytes +#define GWBUF_CONSUME(b, bytes) (b)->start += (bytes) -#define GWBUF_RTRIM(b, bytes) (b)->end -= bytes +#define GWBUF_RTRIM(b, bytes) (b)->end -= (bytes) #define GWBUF_TYPE(b) (b)->gwbuf_type /*< @@ -107,6 +107,7 @@ extern void gwbuf_free(GWBUF *buf); extern GWBUF *gwbuf_clone(GWBUF *buf); extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail); extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length); +extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int length); extern unsigned int gwbuf_length(GWBUF *head); extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type); diff --git a/server/include/modutil.h b/server/include/modutil.h index 2092ddea5..00336f937 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -26,6 +26,7 @@ * * Date Who Description * 04/06/14 Mark Riddoch Initial implementation + * 24/06/14 Mark Riddoch Add modutil_MySQL_Query to enable multipacket queries * * @endverbatim */ @@ -33,5 +34,6 @@ extern int modutil_is_SQL(GWBUF *); extern int modutil_extract_SQL(GWBUF *, char **, int *); +extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *); extern GWBUF *modutil_replace_SQL(GWBUF *, char *); #endif diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 908117acf..211fbbd70 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -39,6 +39,7 @@ * * Date Who Description * 20/06/2014 Mark Riddoch Initial implementation + * 24/06/2014 Mark Riddoch Addition of support for multi-packet queries * */ #include @@ -113,11 +114,12 @@ typedef struct { * It also holds the file descriptor to which queries are written. */ typedef struct { - DOWNSTREAM down; - int active; - DCB *branch_dcb; - SESSION *branch_session; - int n_duped; + DOWNSTREAM down; /* The downstream filter */ + int active; /* filter is active? */ + DCB *branch_dcb; /* Client DCB for "branch" service */ + SESSION *branch_session;/* The branch service session */ + int n_duped; /* Number of duplicated querise */ + int residual; /* Any outstanding SQL text */ } TEE_SESSION; /** @@ -274,6 +276,7 @@ char *remote, *userName; if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL) { my_session->active = 1; + my_session->residual = 0; if (my_instance->source && (remote = session_get_remote(session)) != NULL) { @@ -321,7 +324,7 @@ SESSION *bsession; router->closeSession(router_instance, rsession); dcb_free(my_session->branch_dcb); /* No need to free the session, this is done as - * a side effect of closign the client DCB of the + * a side effect of closing the client DCB of the * session. */ } @@ -364,6 +367,14 @@ TEE_SESSION *my_session = (TEE_SESSION *)session; * query should normally be passed to the downstream component * (filter or router) in the filter chain. * + * If my_session->residual is set then duplicate that many bytes + * and send them to the branch. + * + * If my_session->residual is zero then this must be a new request + * Extract the SQL text if possible, match against that text and forward + * the request. If the requets is not contained witin the packet we have + * then set my_session->residual to the number of outstanding bytes + * * @param instance The filter instance data * @param session The filter session * @param queue The query data @@ -374,10 +385,20 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; TEE_SESSION *my_session = (TEE_SESSION *)session; char *ptr; -int length, rval; +int length, rval, residual; GWBUF *clone = NULL; - if (my_session->active && modutil_extract_SQL(queue, &ptr, &length)) + if (my_session->residual) + { + clone = gwbuf_clone(queue); + if (my_session->residual < GWBUF_LENGTH(clone)) + GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual); + my_session->residual -= GWBUF_LENGTH(clone); + if (my_session->residual < 0) + my_session->residual = 0; + } + else if (my_session->active && + modutil_MySQL_Query(queue, &ptr, &length, &residual)) { if ((my_instance->match == NULL || regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && @@ -385,6 +406,7 @@ GWBUF *clone = NULL; regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) { clone = gwbuf_clone(queue); + my_session->residual = residual; } }