Support for multipacket statements
Remove double free in cloned DCB's
This commit is contained in:
@ -31,6 +31,7 @@
|
|||||||
* 10/06/13 Mark Riddoch Initial implementation
|
* 10/06/13 Mark Riddoch Initial implementation
|
||||||
* 11/07/13 Mark Riddoch Add reference count mechanism
|
* 11/07/13 Mark Riddoch Add reference count mechanism
|
||||||
* 16/07/2013 Massimiliano Pinto Added command type to gwbuf struct
|
* 16/07/2013 Massimiliano Pinto Added command type to gwbuf struct
|
||||||
|
* 24/06/2014 Mark Riddoch Addition of gwbuf_trim
|
||||||
*
|
*
|
||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
@ -297,6 +298,26 @@ int rval = 0;
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trim bytes form the end of a GWBUF structure
|
||||||
|
*
|
||||||
|
* @param buf The buffer to trim
|
||||||
|
* @param nbytes The number of bytes to trim off
|
||||||
|
* @return The buffer chain
|
||||||
|
*/
|
||||||
|
GWBUF *
|
||||||
|
gwbuf_trim(GWBUF *buf, unsigned int n_bytes)
|
||||||
|
{
|
||||||
|
if (GWBUF_LENGTH(buf) <= n_bytes)
|
||||||
|
{
|
||||||
|
gwbuf_consume(buf, GWBUF_LENGTH(buf));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
buf->end -= n_bytes;
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
bool gwbuf_set_type(
|
bool gwbuf_set_type(
|
||||||
GWBUF* buf,
|
GWBUF* buf,
|
||||||
gwbuf_type_t type)
|
gwbuf_type_t type)
|
||||||
|
@ -347,7 +347,7 @@ DCB_CALLBACK *cb;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dcb->protocol != NULL)
|
if (dcb->protocol && ((dcb->flags & DCBF_CLONE) ==0))
|
||||||
free(dcb->protocol);
|
free(dcb->protocol);
|
||||||
if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0))
|
if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0))
|
||||||
free(dcb->data);
|
free(dcb->data);
|
||||||
|
@ -57,9 +57,12 @@ unsigned char *ptr;
|
|||||||
* This routine is very simplistic and does not deal with SQL text
|
* This routine is very simplistic and does not deal with SQL text
|
||||||
* that spans multiple buffers.
|
* that spans multiple buffers.
|
||||||
*
|
*
|
||||||
|
* The length returned is the complete length of the SQL, which may
|
||||||
|
* be larger than the amount of data in this packet.
|
||||||
|
*
|
||||||
* @param buf The packet buffer
|
* @param buf The packet buffer
|
||||||
* @param sql Pointer that is set to point at the SQL data
|
* @param sql Pointer that is set to point at the SQL data
|
||||||
* @param length Length of the SQL data
|
* @param length Length of the SQL query data
|
||||||
* @return True if the packet is a COM_QUERY packet
|
* @return True if the packet is a COM_QUERY packet
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
@ -79,7 +82,54 @@ char *ptr;
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the SQL portion of a COM_QUERY packet
|
||||||
|
*
|
||||||
|
* NB This sets *sql to point into the packet and does not
|
||||||
|
* allocate any new storage. The string pointed to by *sql is
|
||||||
|
* not NULL terminated.
|
||||||
|
*
|
||||||
|
* The number of bytes pointed to *sql is returned in *length
|
||||||
|
*
|
||||||
|
* The remaining number of bytes required for the complete query string
|
||||||
|
* are returned in *residual
|
||||||
|
*
|
||||||
|
* @param buf The packet buffer
|
||||||
|
* @param sql Pointer that is set to point at the SQL data
|
||||||
|
* @param length Length of the SQL query data pointed to by sql
|
||||||
|
* @param residual Any remain part of the query in future packets
|
||||||
|
* @return True if the packet is a COM_QUERY packet
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
modutil_MySQL_Query(GWBUF *buf, char **sql, int *length, int *residual)
|
||||||
|
{
|
||||||
|
char *ptr;
|
||||||
|
|
||||||
|
if (!modutil_is_SQL(buf))
|
||||||
|
return 0;
|
||||||
|
ptr = GWBUF_DATA(buf);
|
||||||
|
*residual = *ptr++;
|
||||||
|
*residual += (*ptr++ << 8);
|
||||||
|
*residual += (*ptr++ << 8);
|
||||||
|
ptr += 2; // Skip sequence id and COM_QUERY byte
|
||||||
|
*residual = *residual - 1;
|
||||||
|
*length = GWBUF_LENGTH(buf) - 5;
|
||||||
|
*residual -= *length;
|
||||||
|
*sql = ptr;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replace the contents of a GWBUF with the new SQL statement passed as a text string.
|
||||||
|
* The routine takes care of the modification needed to the MySQL packet,
|
||||||
|
* returning a GWBUF chian that cna be used to send the data to a MySQL server
|
||||||
|
*
|
||||||
|
* @param orig The original request in a GWBUF
|
||||||
|
* @param sql The SQL text to replace in the packet
|
||||||
|
* @return A newly formed GWBUF containing the MySQL packet.
|
||||||
|
*/
|
||||||
GWBUF *
|
GWBUF *
|
||||||
modutil_replace_SQL(GWBUF *orig, char *sql)
|
modutil_replace_SQL(GWBUF *orig, char *sql)
|
||||||
{
|
{
|
||||||
|
@ -94,9 +94,9 @@ typedef struct gwbuf {
|
|||||||
#define GWBUF_EMPTY(b) ((b)->start == (b)->end)
|
#define GWBUF_EMPTY(b) ((b)->start == (b)->end)
|
||||||
|
|
||||||
/*< Consume a number of bytes in the buffer */
|
/*< Consume a number of bytes in the buffer */
|
||||||
#define GWBUF_CONSUME(b, bytes) (b)->start += bytes
|
#define GWBUF_CONSUME(b, bytes) (b)->start += (bytes)
|
||||||
|
|
||||||
#define GWBUF_RTRIM(b, bytes) (b)->end -= bytes
|
#define GWBUF_RTRIM(b, bytes) (b)->end -= (bytes)
|
||||||
|
|
||||||
#define GWBUF_TYPE(b) (b)->gwbuf_type
|
#define GWBUF_TYPE(b) (b)->gwbuf_type
|
||||||
/*<
|
/*<
|
||||||
@ -107,6 +107,7 @@ extern void gwbuf_free(GWBUF *buf);
|
|||||||
extern GWBUF *gwbuf_clone(GWBUF *buf);
|
extern GWBUF *gwbuf_clone(GWBUF *buf);
|
||||||
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);
|
extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail);
|
||||||
extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length);
|
extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length);
|
||||||
|
extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int length);
|
||||||
extern unsigned int gwbuf_length(GWBUF *head);
|
extern unsigned int gwbuf_length(GWBUF *head);
|
||||||
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
|
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
|
||||||
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
|
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
*
|
*
|
||||||
* Date Who Description
|
* Date Who Description
|
||||||
* 04/06/14 Mark Riddoch Initial implementation
|
* 04/06/14 Mark Riddoch Initial implementation
|
||||||
|
* 24/06/14 Mark Riddoch Add modutil_MySQL_Query to enable multipacket queries
|
||||||
*
|
*
|
||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
@ -33,5 +34,6 @@
|
|||||||
|
|
||||||
extern int modutil_is_SQL(GWBUF *);
|
extern int modutil_is_SQL(GWBUF *);
|
||||||
extern int modutil_extract_SQL(GWBUF *, char **, int *);
|
extern int modutil_extract_SQL(GWBUF *, char **, int *);
|
||||||
|
extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *);
|
||||||
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
|
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
|
||||||
#endif
|
#endif
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
*
|
*
|
||||||
* Date Who Description
|
* Date Who Description
|
||||||
* 20/06/2014 Mark Riddoch Initial implementation
|
* 20/06/2014 Mark Riddoch Initial implementation
|
||||||
|
* 24/06/2014 Mark Riddoch Addition of support for multi-packet queries
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
@ -113,11 +114,12 @@ typedef struct {
|
|||||||
* It also holds the file descriptor to which queries are written.
|
* It also holds the file descriptor to which queries are written.
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
DOWNSTREAM down;
|
DOWNSTREAM down; /* The downstream filter */
|
||||||
int active;
|
int active; /* filter is active? */
|
||||||
DCB *branch_dcb;
|
DCB *branch_dcb; /* Client DCB for "branch" service */
|
||||||
SESSION *branch_session;
|
SESSION *branch_session;/* The branch service session */
|
||||||
int n_duped;
|
int n_duped; /* Number of duplicated querise */
|
||||||
|
int residual; /* Any outstanding SQL text */
|
||||||
} TEE_SESSION;
|
} TEE_SESSION;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -274,6 +276,7 @@ char *remote, *userName;
|
|||||||
if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL)
|
if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL)
|
||||||
{
|
{
|
||||||
my_session->active = 1;
|
my_session->active = 1;
|
||||||
|
my_session->residual = 0;
|
||||||
if (my_instance->source
|
if (my_instance->source
|
||||||
&& (remote = session_get_remote(session)) != NULL)
|
&& (remote = session_get_remote(session)) != NULL)
|
||||||
{
|
{
|
||||||
@ -321,7 +324,7 @@ SESSION *bsession;
|
|||||||
router->closeSession(router_instance, rsession);
|
router->closeSession(router_instance, rsession);
|
||||||
dcb_free(my_session->branch_dcb);
|
dcb_free(my_session->branch_dcb);
|
||||||
/* No need to free the session, this is done as
|
/* No need to free the session, this is done as
|
||||||
* a side effect of closign the client DCB of the
|
* a side effect of closing the client DCB of the
|
||||||
* session.
|
* session.
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
@ -364,6 +367,14 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
|
|||||||
* query should normally be passed to the downstream component
|
* query should normally be passed to the downstream component
|
||||||
* (filter or router) in the filter chain.
|
* (filter or router) in the filter chain.
|
||||||
*
|
*
|
||||||
|
* If my_session->residual is set then duplicate that many bytes
|
||||||
|
* and send them to the branch.
|
||||||
|
*
|
||||||
|
* If my_session->residual is zero then this must be a new request
|
||||||
|
* Extract the SQL text if possible, match against that text and forward
|
||||||
|
* the request. If the requets is not contained witin the packet we have
|
||||||
|
* then set my_session->residual to the number of outstanding bytes
|
||||||
|
*
|
||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The filter session
|
* @param session The filter session
|
||||||
* @param queue The query data
|
* @param queue The query data
|
||||||
@ -374,10 +385,20 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance;
|
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance;
|
||||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||||
char *ptr;
|
char *ptr;
|
||||||
int length, rval;
|
int length, rval, residual;
|
||||||
GWBUF *clone = NULL;
|
GWBUF *clone = NULL;
|
||||||
|
|
||||||
if (my_session->active && modutil_extract_SQL(queue, &ptr, &length))
|
if (my_session->residual)
|
||||||
|
{
|
||||||
|
clone = gwbuf_clone(queue);
|
||||||
|
if (my_session->residual < GWBUF_LENGTH(clone))
|
||||||
|
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
|
||||||
|
my_session->residual -= GWBUF_LENGTH(clone);
|
||||||
|
if (my_session->residual < 0)
|
||||||
|
my_session->residual = 0;
|
||||||
|
}
|
||||||
|
else if (my_session->active &&
|
||||||
|
modutil_MySQL_Query(queue, &ptr, &length, &residual))
|
||||||
{
|
{
|
||||||
if ((my_instance->match == NULL ||
|
if ((my_instance->match == NULL ||
|
||||||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
|
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
|
||||||
@ -385,6 +406,7 @@ GWBUF *clone = NULL;
|
|||||||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
|
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
|
||||||
{
|
{
|
||||||
clone = gwbuf_clone(queue);
|
clone = gwbuf_clone(queue);
|
||||||
|
my_session->residual = residual;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user