Merge branch 'develop' into MAX-99
Conflicts: server/modules/routing/readwritesplit/readwritesplit.c
This commit is contained in:
@ -31,6 +31,7 @@
|
||||
* 10/06/13 Mark Riddoch Initial implementation
|
||||
* 11/07/13 Mark Riddoch Add reference count mechanism
|
||||
* 16/07/2013 Massimiliano Pinto Added command type to gwbuf struct
|
||||
* 24/06/2014 Mark Riddoch Addition of gwbuf_trim
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -295,6 +296,26 @@ int rval = 0;
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Trim bytes form the end of a GWBUF structure
|
||||
*
|
||||
* @param buf The buffer to trim
|
||||
* @param nbytes The number of bytes to trim off
|
||||
* @return The buffer chain
|
||||
*/
|
||||
GWBUF *
|
||||
gwbuf_trim(GWBUF *buf, unsigned int n_bytes)
|
||||
{
|
||||
if (GWBUF_LENGTH(buf) <= n_bytes)
|
||||
{
|
||||
gwbuf_consume(buf, GWBUF_LENGTH(buf));
|
||||
return NULL;
|
||||
}
|
||||
buf->end -= n_bytes;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
bool gwbuf_set_type(
|
||||
GWBUF* buf,
|
||||
gwbuf_type_t type)
|
||||
|
||||
@ -1090,7 +1090,7 @@ SERVER *server;
|
||||
s = strtok(NULL, ",");
|
||||
}
|
||||
}
|
||||
if (filters)
|
||||
if (filters && obj->element)
|
||||
serviceSetFilters(obj->element, filters);
|
||||
}
|
||||
else if (!strcmp(type, "listener"))
|
||||
|
||||
@ -48,6 +48,7 @@
|
||||
* This fixes a bug with many reads from
|
||||
* backend
|
||||
* 07/05/2014 Mark Riddoch Addition of callback mechanism
|
||||
* 20/06/2014 Mark Riddoch Addition of dcb_clone
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -84,6 +85,9 @@ static bool dcb_set_state_nomutex(
|
||||
dcb_state_t* old_state);
|
||||
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
|
||||
static DCB* dcb_get_next (DCB* dcb);
|
||||
static int dcb_null_write(DCB *dcb, GWBUF *buf);
|
||||
static int dcb_null_close(DCB *dcb);
|
||||
static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf);
|
||||
|
||||
DCB* dcb_get_zombies(void)
|
||||
{
|
||||
@ -134,6 +138,10 @@ DCB *rval;
|
||||
rval->next = NULL;
|
||||
rval->callbacks = NULL;
|
||||
|
||||
rval->remote = NULL;
|
||||
rval->user = NULL;
|
||||
rval->flags = 0;
|
||||
|
||||
spinlock_acquire(&dcbspin);
|
||||
if (allDCBs == NULL)
|
||||
allDCBs = rval;
|
||||
@ -245,7 +253,39 @@ dcb_add_to_zombieslist(DCB *dcb)
|
||||
spinlock_release(&zombiespin);
|
||||
}
|
||||
|
||||
/*
|
||||
* Clone a DCB for internal use, mostly used for specialist filters
|
||||
* to create dummy clients based on real clients.
|
||||
*
|
||||
* @param orig The DCB to clone
|
||||
* @return A DCB that can be used as a client
|
||||
*/
|
||||
DCB *
|
||||
dcb_clone(DCB *orig)
|
||||
{
|
||||
DCB *clone;
|
||||
|
||||
if ((clone = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
clone->fd = -1;
|
||||
clone->flags |= DCBF_CLONE;
|
||||
clone->state = orig->state;
|
||||
clone->data = orig->data;
|
||||
if (orig->remote)
|
||||
clone->remote = strdup(orig->remote);
|
||||
if (orig->user)
|
||||
clone->user = strdup(orig->user);
|
||||
clone->protocol = orig->protocol;
|
||||
|
||||
clone->func.write = dcb_null_write;
|
||||
clone->func.close = dcb_null_close;
|
||||
clone->func.auth = dcb_null_auth;
|
||||
|
||||
return clone;
|
||||
}
|
||||
|
||||
/**
|
||||
* Free a DCB and remove it from the chain of all DCBs
|
||||
@ -308,12 +348,14 @@ DCB_CALLBACK *cb;
|
||||
}
|
||||
}
|
||||
|
||||
if (dcb->protocol != NULL)
|
||||
if (dcb->protocol && ((dcb->flags & DCBF_CLONE) ==0))
|
||||
free(dcb->protocol);
|
||||
if (dcb->data)
|
||||
if (dcb->data && ((dcb->flags & DCBF_CLONE) ==0))
|
||||
free(dcb->data);
|
||||
if (dcb->remote)
|
||||
free(dcb->remote);
|
||||
if (dcb->user)
|
||||
free(dcb->user);
|
||||
|
||||
/* Clear write and read buffers */
|
||||
if (dcb->delayq) {
|
||||
@ -1119,6 +1161,8 @@ printDCB(DCB *dcb)
|
||||
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
|
||||
if (dcb->remote)
|
||||
printf("\tConnected to: %s\n", dcb->remote);
|
||||
if (dcb->user)
|
||||
printf("\tUsername to: %s\n", dcb->user);
|
||||
if (dcb->writeq)
|
||||
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
|
||||
printf("\tStatistics:\n");
|
||||
@ -1176,6 +1220,9 @@ DCB *dcb;
|
||||
if (dcb->remote)
|
||||
dcb_printf(pdcb, "\tConnected to: %s\n",
|
||||
dcb->remote);
|
||||
if (dcb->user)
|
||||
dcb_printf(pdcb, "\tUsername: %s\n",
|
||||
dcb->user);
|
||||
if (dcb->writeq)
|
||||
dcb_printf(pdcb, "\tQueued write data: %d\n",
|
||||
gwbuf_length(dcb->writeq));
|
||||
@ -1186,6 +1233,8 @@ DCB *dcb;
|
||||
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
|
||||
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
|
||||
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
|
||||
if (dcb->flags & DCBF_CLONE)
|
||||
dcb_printf(pdcb, "\t\tDCB is a clone.\n");
|
||||
dcb = dcb->next;
|
||||
}
|
||||
spinlock_release(&dcbspin);
|
||||
@ -1250,6 +1299,8 @@ dprintDCB(DCB *pdcb, DCB *dcb)
|
||||
dcb->stats.n_high_water);
|
||||
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n",
|
||||
dcb->stats.n_low_water);
|
||||
if (dcb->flags & DCBF_CLONE)
|
||||
dcb_printf(pdcb, "\t\tDCB is a clone.\n");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1280,7 +1331,7 @@ gw_dcb_state2string (int state) {
|
||||
}
|
||||
|
||||
/**
|
||||
* A DCB based wrapper for printf. Allows formattign printing to
|
||||
* A DCB based wrapper for printf. Allows formatting printing to
|
||||
* a descritor control block.
|
||||
*
|
||||
* @param dcb Descriptor to write to
|
||||
@ -1817,4 +1868,47 @@ void dcb_call_foreach (
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Null protocol write routine used for cloned dcb's. It merely consumes
|
||||
* buffers written on the cloned DCB.
|
||||
*
|
||||
* @params dcb The descriptor control block
|
||||
* @params buf The buffer beign written
|
||||
* @return Always returns a good write operation result
|
||||
*/
|
||||
static int
|
||||
dcb_null_write(DCB *dcb, GWBUF *buf)
|
||||
{
|
||||
while (buf)
|
||||
{
|
||||
buf = gwbuf_consume(buf, GWBUF_LENGTH(buf));
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Null protocol close operation for use by cloned DCB's.
|
||||
*
|
||||
* @param dcb The DCB being closed.
|
||||
*/
|
||||
static int
|
||||
dcb_null_close(DCB *dcb)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Null protocol auth operation for use by cloned DCB's.
|
||||
*
|
||||
* @param dcb The DCB being closed.
|
||||
* @param server The server to auth against
|
||||
* @param session The user session
|
||||
* @param buf The buffer with the new auth request
|
||||
*/
|
||||
static int
|
||||
dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -135,6 +135,19 @@ FILTER_DEF *filter;
|
||||
return filter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check a parameter to see if it is a standard filter parameter
|
||||
*
|
||||
* @param name Parameter name to check
|
||||
*/
|
||||
int
|
||||
filter_standard_parameter(char *name)
|
||||
{
|
||||
if (strcmp(name, "type") == 0 || strcmp(name, "module") == 0)
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Print all filters to a DCB
|
||||
*
|
||||
@ -209,13 +222,13 @@ int i;
|
||||
{
|
||||
dcb_printf(dcb, "Filters\n");
|
||||
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
|
||||
dcb_printf(dcb, "%-18s | %-15s | Options\n",
|
||||
dcb_printf(dcb, "%-19s | %-15s | Options\n",
|
||||
"Filter", "Module");
|
||||
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
|
||||
}
|
||||
while (ptr)
|
||||
{
|
||||
dcb_printf(dcb, "%-18s | %-15s | ",
|
||||
dcb_printf(dcb, "%-19s | %-15s | ",
|
||||
ptr->name, ptr->module);
|
||||
for (i = 0; ptr->options && ptr->options[i]; i++)
|
||||
dcb_printf(dcb, "%s ", ptr->options[i]);
|
||||
@ -289,6 +302,17 @@ int i;
|
||||
spinlock_release(&filter->spin);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect the downstream filter chain for a filter.
|
||||
*
|
||||
* This will create the filter instance, loading the filter module, and
|
||||
* conenct the fitler into the downstream chain.
|
||||
*
|
||||
* @param filter The filter to add into the chain
|
||||
* @param session The client session
|
||||
* @param downstream The filter downstream of this filter
|
||||
* @return The downstream component for the next filter
|
||||
*/
|
||||
DOWNSTREAM *
|
||||
filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
@ -318,3 +342,42 @@ DOWNSTREAM *me;
|
||||
|
||||
return me;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect a filter in the up stream filter chain for a session
|
||||
*
|
||||
* Note, the filter will have been created when the downstream chian was
|
||||
* previously setup.
|
||||
* Note all filters require to be in the upstream chain, so this routine
|
||||
* may skip a filter if it does not provide an upstream interface.
|
||||
*
|
||||
* @param filter The fitler to add to the chain
|
||||
* @param fsession The filter session
|
||||
* @param upstream The filter that should be upstream of this filter
|
||||
* @return The upstream component for the next filter
|
||||
*/
|
||||
UPSTREAM *
|
||||
filterUpstream(FILTER_DEF *filter, void *fsession, UPSTREAM *upstream)
|
||||
{
|
||||
UPSTREAM *me;
|
||||
|
||||
/*
|
||||
* The the filter has no setUpstream entry point then is does
|
||||
* not require to see results and can be left out of the chain.
|
||||
*/
|
||||
if (filter->obj->setUpstream == NULL)
|
||||
return upstream;
|
||||
|
||||
if (filter->obj->clientReply != NULL)
|
||||
{
|
||||
if ((me = (UPSTREAM *)calloc(1, sizeof(UPSTREAM))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
me->instance = filter->filter;
|
||||
me->session = fsession;
|
||||
me->clientReply = (void *)(filter->obj->clientReply);
|
||||
filter->obj->setUpstream(me->instance, me->session, upstream);
|
||||
}
|
||||
return me;
|
||||
}
|
||||
|
||||
@ -57,9 +57,12 @@ unsigned char *ptr;
|
||||
* This routine is very simplistic and does not deal with SQL text
|
||||
* that spans multiple buffers.
|
||||
*
|
||||
* The length returned is the complete length of the SQL, which may
|
||||
* be larger than the amount of data in this packet.
|
||||
*
|
||||
* @param buf The packet buffer
|
||||
* @param sql Pointer that is set to point at the SQL data
|
||||
* @param length Length of the SQL data
|
||||
* @param length Length of the SQL query data
|
||||
* @return True if the packet is a COM_QUERY packet
|
||||
*/
|
||||
int
|
||||
@ -79,7 +82,54 @@ char *ptr;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the SQL portion of a COM_QUERY packet
|
||||
*
|
||||
* NB This sets *sql to point into the packet and does not
|
||||
* allocate any new storage. The string pointed to by *sql is
|
||||
* not NULL terminated.
|
||||
*
|
||||
* The number of bytes pointed to *sql is returned in *length
|
||||
*
|
||||
* The remaining number of bytes required for the complete query string
|
||||
* are returned in *residual
|
||||
*
|
||||
* @param buf The packet buffer
|
||||
* @param sql Pointer that is set to point at the SQL data
|
||||
* @param length Length of the SQL query data pointed to by sql
|
||||
* @param residual Any remain part of the query in future packets
|
||||
* @return True if the packet is a COM_QUERY packet
|
||||
*/
|
||||
int
|
||||
modutil_MySQL_Query(GWBUF *buf, char **sql, int *length, int *residual)
|
||||
{
|
||||
char *ptr;
|
||||
|
||||
if (!modutil_is_SQL(buf))
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(buf);
|
||||
*residual = *ptr++;
|
||||
*residual += (*ptr++ << 8);
|
||||
*residual += (*ptr++ << 8);
|
||||
ptr += 2; // Skip sequence id and COM_QUERY byte
|
||||
*residual = *residual - 1;
|
||||
*length = GWBUF_LENGTH(buf) - 5;
|
||||
*residual -= *length;
|
||||
*sql = ptr;
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Replace the contents of a GWBUF with the new SQL statement passed as a text string.
|
||||
* The routine takes care of the modification needed to the MySQL packet,
|
||||
* returning a GWBUF chian that cna be used to send the data to a MySQL server
|
||||
*
|
||||
* @param orig The original request in a GWBUF
|
||||
* @param sql The SQL text to replace in the packet
|
||||
* @return A newly formed GWBUF containing the MySQL packet.
|
||||
*/
|
||||
GWBUF *
|
||||
modutil_replace_SQL(GWBUF *orig, char *sql)
|
||||
{
|
||||
|
||||
@ -168,6 +168,10 @@ session_alloc(SERVICE *service, DCB *client_dcb)
|
||||
|
||||
session->head.routeQuery = (void *)(service->router->routeQuery);
|
||||
|
||||
session->tail.instance = session;
|
||||
session->tail.session = session;
|
||||
session->tail.clientReply = session_reply;
|
||||
|
||||
if (service->n_filters > 0)
|
||||
{
|
||||
if (!session_setup_filters(session))
|
||||
@ -327,6 +331,12 @@ bool session_free(
|
||||
}
|
||||
if (session->n_filters)
|
||||
{
|
||||
for (i = 0; i < session->n_filters; i++)
|
||||
{
|
||||
session->filters[i].filter->obj->closeSession(
|
||||
session->filters[i].instance,
|
||||
session->filters[i].session);
|
||||
}
|
||||
for (i = 0; i < session->n_filters; i++)
|
||||
{
|
||||
session->filters[i].filter->obj->freeSession(
|
||||
@ -628,6 +638,7 @@ session_setup_filters(SESSION *session)
|
||||
{
|
||||
SERVICE *service = session->service;
|
||||
DOWNSTREAM *head;
|
||||
UPSTREAM *tail;
|
||||
int i;
|
||||
|
||||
if ((session->filters = calloc(service->n_filters,
|
||||
@ -658,9 +669,54 @@ int i;
|
||||
session->head = *head;
|
||||
}
|
||||
|
||||
for (i = 0; i < service->n_filters; i++)
|
||||
{
|
||||
if ((tail = filterUpstream(service->filters[i],
|
||||
session->filters[i].session,
|
||||
&session->tail)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Failed to create filter '%s' for service '%s'.\n",
|
||||
service->filters[i]->name,
|
||||
service->name)));
|
||||
return 0;
|
||||
}
|
||||
session->tail = *tail;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for the final element int he upstream filter, i.e. the writing
|
||||
* of the data to the client.
|
||||
*
|
||||
* @param instance The "instance" data
|
||||
* @param session The session
|
||||
* @param data The buffer chain to write
|
||||
*/
|
||||
int
|
||||
session_reply(void *instance, void *session, GWBUF *data)
|
||||
{
|
||||
SESSION *the_session = (SESSION *)session;
|
||||
|
||||
return the_session->client->func.write(the_session->client, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the client connection address or name
|
||||
*
|
||||
* @param session The session whose client address to return
|
||||
*/
|
||||
char *
|
||||
session_get_remote(SESSION *session)
|
||||
{
|
||||
if (session && session->client)
|
||||
return session->client->remote;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bool session_route_query (
|
||||
SESSION* ses,
|
||||
GWBUF* buf)
|
||||
@ -686,4 +742,17 @@ bool session_route_query (
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Return the username of the user connected to the client side of the
|
||||
* session.
|
||||
*
|
||||
* @param session The session pointer.
|
||||
* @return The user name or NULL if it can not be determined.
|
||||
*/
|
||||
char *
|
||||
session_getUser(SESSION *session)
|
||||
{
|
||||
return (session && session->client) ? session->client->user : NULL;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user