Formatted tee filter

Tee filter formatted according to the style guide.
This commit is contained in:
Markus Makela
2015-11-18 14:18:00 +02:00
parent e24504c427
commit a7c0952e66

View File

@ -45,6 +45,7 @@
* *
* @endverbatim * @endverbatim
*/ */
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <filter.h> #include <filter.h>
@ -83,7 +84,8 @@
static int debug_seq = 0; static int debug_seq = 0;
#endif #endif
static unsigned char required_packets[] = { static unsigned char required_packets[] =
{
MYSQL_COM_QUIT, MYSQL_COM_QUIT,
MYSQL_COM_INITDB, MYSQL_COM_INITDB,
MYSQL_COM_FIELD_LIST, MYSQL_COM_FIELD_LIST,
@ -94,9 +96,11 @@ static unsigned char required_packets[] = {
MYSQL_COM_STMT_CLOSE, MYSQL_COM_STMT_CLOSE,
MYSQL_COM_STMT_RESET, MYSQL_COM_STMT_RESET,
MYSQL_COM_CONNECT, MYSQL_COM_CONNECT,
0 }; 0
};
MODULE_INFO info = { MODULE_INFO info =
{
MODULE_API_FILTER, MODULE_API_FILTER,
MODULE_GA, MODULE_GA,
FILTER_VERSION, FILTER_VERSION,
@ -118,7 +122,8 @@ static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = { static FILTER_OBJECT MyObject =
{
createInstance, createInstance,
newSession, newSession,
closeSession, closeSession,
@ -134,7 +139,8 @@ static FILTER_OBJECT MyObject = {
* The instance structure for the TEE filter - this holds the configuration * The instance structure for the TEE filter - this holds the configuration
* information for the filter. * information for the filter.
*/ */
typedef struct { typedef struct
{
SERVICE *service; /* The service to duplicate requests to */ SERVICE *service; /* The service to duplicate requests to */
char *source; /* The source of the client connection */ char *source; /* The source of the client connection */
char *userName; /* The user name to filter on */ char *userName; /* The user name to filter on */
@ -152,10 +158,10 @@ typedef struct {
* *
* It also holds the file descriptor to which queries are written. * It also holds the file descriptor to which queries are written.
*/ */
typedef struct { typedef struct
{
DOWNSTREAM down; /* The downstream filter */ DOWNSTREAM down; /* The downstream filter */
UPSTREAM up; /* The upstream filter */ UPSTREAM up; /* The upstream filter */
FILTER_DEF* dummy_filterdef; FILTER_DEF* dummy_filterdef;
int active; /* filter is active? */ int active; /* filter is active? */
bool use_ok; bool use_ok;
@ -167,7 +173,7 @@ typedef struct {
int replies[2]; /* Number of queries received */ int replies[2]; /* Number of queries received */
int reply_packets[2]; /* Number of OK, ERR, LOCAL_INFILE_REQUEST or RESULT_SET packets received */ int reply_packets[2]; /* Number of OK, ERR, LOCAL_INFILE_REQUEST or RESULT_SET packets received */
DCB *branch_dcb; /* Client DCB for "branch" service */ DCB *branch_dcb; /* Client DCB for "branch" service */
SESSION *branch_session;/* The branch service session */ SESSION *branch_session; /* The branch service session */
TEE_INSTANCE *instance; TEE_INSTANCE *instance;
int n_duped; /* Number of duplicated queries */ int n_duped; /* Number of duplicated queries */
int n_rejected; /* Number of rejected queries */ int n_rejected; /* Number of rejected queries */
@ -188,7 +194,7 @@ typedef struct orphan_session_tt
SESSION* session; /*< The child branch session whose parent was freed before SESSION* session; /*< The child branch session whose parent was freed before
* the child session was in a suitable state. */ * the child session was in a suitable state. */
struct orphan_session_tt* next; struct orphan_session_tt* next;
}orphan_session_t; } orphan_session_t;
#ifdef SS_DEBUG #ifdef SS_DEBUG
static SPINLOCK debug_lock; static SPINLOCK debug_lock;
@ -217,13 +223,11 @@ orphan_free(void* data)
#ifdef SS_DEBUG #ifdef SS_DEBUG
int o_stopping = 0, o_ready = 0, o_freed = 0; int o_stopping = 0, o_ready = 0, o_freed = 0;
#endif #endif
while(ptr) while (ptr)
{ {
if(ptr->session->state == SESSION_STATE_TO_BE_FREED) if (ptr->session->state == SESSION_STATE_TO_BE_FREED)
{ {
if (ptr == allOrphans)
if(ptr == allOrphans)
{ {
tmp = ptr; tmp = ptr;
allOrphans = ptr->next; allOrphans = ptr->next;
@ -231,38 +235,39 @@ orphan_free(void* data)
else else
{ {
tmp = allOrphans; tmp = allOrphans;
while(tmp && tmp->next != ptr) while (tmp && tmp->next != ptr)
{
tmp = tmp->next; tmp = tmp->next;
if(tmp) }
if (tmp)
{ {
tmp->next = ptr->next; tmp->next = ptr->next;
tmp = ptr; tmp = ptr;
} }
} }
} }
/* /*
* The session has been unlinked from all the DCBs and it is ready to be freed. * The session has been unlinked from all the DCBs and it is ready to be freed.
*/ */
if(ptr->session->state == SESSION_STATE_STOPPING && if (ptr->session->state == SESSION_STATE_STOPPING &&
ptr->session->refcount == 0 && ptr->session->client == NULL) ptr->session->refcount == 0 && ptr->session->client == NULL)
{ {
ptr->session->state = SESSION_STATE_TO_BE_FREED; ptr->session->state = SESSION_STATE_TO_BE_FREED;
} }
#ifdef SS_DEBUG #ifdef SS_DEBUG
else if(ptr->session->state == SESSION_STATE_STOPPING) else if (ptr->session->state == SESSION_STATE_STOPPING)
{ {
o_stopping++; o_stopping++;
} }
else if(ptr->session->state == SESSION_STATE_ROUTER_READY) else if (ptr->session->state == SESSION_STATE_ROUTER_READY)
{ {
o_ready++; o_ready++;
} }
#endif #endif
ptr = ptr->next; ptr = ptr->next;
if(tmp) if (tmp)
{ {
tmp->next = finished; tmp->next = finished;
finished = tmp; finished = tmp;
@ -273,13 +278,15 @@ orphan_free(void* data)
spinlock_release(&orphanLock); spinlock_release(&orphanLock);
#ifdef SS_DEBUG #ifdef SS_DEBUG
if(o_stopping + o_ready > 0) if (o_stopping + o_ready > 0)
{
MXS_DEBUG("tee.c: %d orphans in " MXS_DEBUG("tee.c: %d orphans in "
"SESSION_STATE_STOPPING, %d orphans in " "SESSION_STATE_STOPPING, %d orphans in "
"SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready); "SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready);
}
#endif #endif
while(finished) while (finished)
{ {
#ifdef SS_DEBUG #ifdef SS_DEBUG
o_freed++; o_freed++;
@ -351,8 +358,8 @@ GetModuleObject()
static FILTER * static FILTER *
createInstance(char **options, FILTER_PARAMETER **params) createInstance(char **options, FILTER_PARAMETER **params)
{ {
TEE_INSTANCE *my_instance; TEE_INSTANCE *my_instance;
int i; int i;
if ((my_instance = calloc(1, sizeof(TEE_INSTANCE))) != NULL) if ((my_instance = calloc(1, sizeof(TEE_INSTANCE))) != NULL)
{ {
@ -387,9 +394,13 @@ int i;
my_instance->nomatch = strdup(params[i]->value); my_instance->nomatch = strdup(params[i]->value);
} }
else if (!strcmp(params[i]->name, "source")) else if (!strcmp(params[i]->name, "source"))
{
my_instance->source = strdup(params[i]->value); my_instance->source = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "user")) else if (!strcmp(params[i]->name, "user"))
{
my_instance->userName = strdup(params[i]->value); my_instance->userName = strdup(params[i]->value);
}
else if (!filter_standard_parameter(params[i]->name)) else if (!filter_standard_parameter(params[i]->name))
{ {
MXS_ERROR("tee: Unexpected parameter '%s'.", MXS_ERROR("tee: Unexpected parameter '%s'.",
@ -431,7 +442,7 @@ int i;
return NULL; return NULL;
} }
} }
return (FILTER *)my_instance; return(FILTER *) my_instance;
} }
/** /**
@ -446,9 +457,9 @@ int i;
static void * static void *
newSession(FILTER *instance, SESSION *session) newSession(FILTER *instance, SESSION *session)
{ {
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance;
TEE_SESSION *my_session; TEE_SESSION *my_session;
char *remote, *userName; char *remote, *userName;
if (strcmp(my_instance->service->name, session->service->name) == 0) if (strcmp(my_instance->service->name, session->service->name) == 0)
{ {
@ -458,11 +469,11 @@ char *remote, *userName;
goto retblock; goto retblock;
} }
HASHTABLE* ht = hashtable_alloc(100,simple_str_hash,strcmp); HASHTABLE* ht = hashtable_alloc(100, simple_str_hash, strcmp);
bool is_loop = detect_loops(my_instance,ht,session->service); bool is_loop = detect_loops(my_instance, ht, session->service);
hashtable_free(ht); hashtable_free(ht);
if(is_loop) if (is_loop)
{ {
MXS_ERROR("%s: Recursive use of tee filter in service.", MXS_ERROR("%s: Recursive use of tee filter in service.",
session->service->name); session->service->name);
@ -510,7 +521,7 @@ char *remote, *userName;
if ((dcb = dcb_clone(session->client)) == NULL) if ((dcb = dcb_clone(session->client)) == NULL)
{ {
freeSession(instance, (void *)my_session); freeSession(instance, (void *) my_session);
my_session = NULL; my_session = NULL;
MXS_ERROR("Creating client DCB for Tee " MXS_ERROR("Creating client DCB for Tee "
@ -519,10 +530,10 @@ char *remote, *userName;
goto retblock; goto retblock;
} }
if((dummy = filter_alloc("tee_dummy","tee_dummy")) == NULL) if ((dummy = filter_alloc("tee_dummy", "tee_dummy")) == NULL)
{ {
dcb_close(dcb); dcb_close(dcb);
freeSession(instance, (void *)my_session); freeSession(instance, (void *) my_session);
my_session = NULL; my_session = NULL;
MXS_ERROR("tee: Allocating memory for " MXS_ERROR("tee: Allocating memory for "
"dummy filter definition failed." "dummy filter definition failed."
@ -537,7 +548,7 @@ char *remote, *userName;
{ {
filter_free(dummy); filter_free(dummy);
dcb_close(dcb); dcb_close(dcb);
freeSession(instance, (void *)my_session); freeSession(instance, (void *) my_session);
my_session = NULL; my_session = NULL;
MXS_ERROR("Creating client session for Tee " MXS_ERROR("Creating client session for Tee "
"filter failed. Terminating session."); "filter failed. Terminating session.");
@ -553,11 +564,11 @@ char *remote, *userName;
my_session->branch_dcb = dcb; my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy; my_session->dummy_filterdef = dummy;
if((dummy_upstream = filterUpstream( if ((dummy_upstream = filterUpstream(
dummy, my_session, &ses->tail)) == NULL) dummy, my_session, &ses->tail)) == NULL)
{ {
filter_free(dummy); filter_free(dummy);
closeSession(instance,(void*)my_session); closeSession(instance, (void*) my_session);
dcb_close(dcb); dcb_close(dcb);
free(my_session); free(my_session);
MXS_ERROR("tee: Allocating memory for" MXS_ERROR("tee: Allocating memory for"
@ -568,7 +579,7 @@ char *remote, *userName;
} }
ses->tail = *dummy_upstream; ses->tail = *dummy_upstream;
MySQLProtocol* protocol = (MySQLProtocol*)session->client->protocol; MySQLProtocol* protocol = (MySQLProtocol*) session->client->protocol;
my_session->use_ok = protocol->client_capabilities & (1 << 6); my_session->use_ok = protocol->client_capabilities & (1 << 6);
free(dummy_upstream); free(dummy_upstream);
} }
@ -589,12 +600,12 @@ retblock:
static void static void
closeSession(FILTER *instance, void *session) closeSession(FILTER *instance, void *session)
{ {
TEE_SESSION *my_session = (TEE_SESSION *)session; TEE_SESSION *my_session = (TEE_SESSION *) session;
ROUTER_OBJECT *router; ROUTER_OBJECT *router;
void *router_instance, *rsession; void *router_instance, *rsession;
SESSION *bsession; SESSION *bsession;
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_INFO("Tee close: %d", atomic_add(&debug_seq,1)); MXS_INFO("Tee close: %d", atomic_add(&debug_seq, 1));
#endif #endif
if (my_session->active) if (my_session->active)
{ {
@ -621,15 +632,15 @@ MXS_INFO("Tee close: %d", atomic_add(&debug_seq,1));
* session. * session.
*/ */
if(my_session->waiting[PARENT]) if (my_session->waiting[PARENT])
{ {
if(my_session->command != 0x01 && if (my_session->command != 0x01 &&
my_session->client_dcb && my_session->client_dcb &&
my_session->client_dcb->state == DCB_STATE_POLLING) my_session->client_dcb->state == DCB_STATE_POLLING)
{ {
MXS_INFO("Tee session closed mid-query."); MXS_INFO("Tee session closed mid-query.");
GWBUF* errbuf = modutil_create_mysql_err_msg(1,0,1,"00000","Session closed."); GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1, "00000", "Session closed.");
my_session->client_dcb->func.write(my_session->client_dcb,errbuf); my_session->client_dcb->func.write(my_session->client_dcb, errbuf);
} }
} }
@ -647,11 +658,11 @@ MXS_INFO("Tee close: %d", atomic_add(&debug_seq,1));
static void static void
freeSession(FILTER *instance, void *session) freeSession(FILTER *instance, void *session)
{ {
TEE_SESSION *my_session = (TEE_SESSION *)session; TEE_SESSION *my_session = (TEE_SESSION *) session;
SESSION* ses = my_session->branch_session; SESSION* ses = my_session->branch_session;
session_state_t state; session_state_t state;
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_INFO("Tee free: %d", atomic_add(&debug_seq,1)); MXS_INFO("Tee free: %d", atomic_add(&debug_seq, 1));
#endif #endif
if (ses != NULL) if (ses != NULL)
{ {
@ -673,7 +684,7 @@ MXS_INFO("Tee free: %d", atomic_add(&debug_seq,1));
/** This indicates that branch session is not available anymore */ /** This indicates that branch session is not available anymore */
my_session->branch_session = NULL; my_session->branch_session = NULL;
} }
else if(state == SESSION_STATE_STOPPING) else if (state == SESSION_STATE_STOPPING)
{ {
create_orphan(ses); create_orphan(ses);
} }
@ -682,14 +693,17 @@ MXS_INFO("Tee free: %d", atomic_add(&debug_seq,1));
{ {
filter_free(my_session->dummy_filterdef); filter_free(my_session->dummy_filterdef);
} }
if(my_session->tee_replybuf) if (my_session->tee_replybuf)
{
gwbuf_free(my_session->tee_replybuf); gwbuf_free(my_session->tee_replybuf);
}
free(session); free(session);
orphan_free(NULL); orphan_free(NULL);
return; return;
} }
/** /**
* Set the downstream filter or router to which queries will be * Set the downstream filter or router to which queries will be
* passed from this filter. * passed from this filter.
@ -741,25 +755,25 @@ setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
static int static int
routeQuery(FILTER *instance, void *session, GWBUF *queue) routeQuery(FILTER *instance, void *session, GWBUF *queue)
{ {
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance;
TEE_SESSION *my_session = (TEE_SESSION *)session; TEE_SESSION *my_session = (TEE_SESSION *) session;
char *ptr; char *ptr;
int rval; int rval;
GWBUF *buffer = NULL, *clone = NULL; GWBUF *buffer = NULL, *clone = NULL;
unsigned char command = gwbuf_length(queue) >= 5 ? unsigned char command = gwbuf_length(queue) >= 5 ?
*((unsigned char*)queue->start + 4) : 1; *((unsigned char*) queue->start + 4) : 1;
#ifdef SS_DEBUG #ifdef SS_DEBUG
int prev_debug_seq = atomic_add(&debug_seq,1); int prev_debug_seq = atomic_add(&debug_seq, 1);
MXS_INFO("Tee routeQuery: %d : %s", MXS_INFO("Tee routeQuery: %d : %s",
prev_debug_seq, prev_debug_seq,
((char*)queue->start + 5)); ((char*) queue->start + 5));
#endif #endif
spinlock_acquire(&my_session->tee_lock); spinlock_acquire(&my_session->tee_lock);
if(!my_session->active) if (!my_session->active)
{ {
MXS_INFO("Tee: Received a reply when the session was closed."); MXS_INFO("Tee: Received a reply when the session was closed.");
gwbuf_free(queue); gwbuf_free(queue);
@ -767,9 +781,9 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
return 0; return 0;
} }
if(my_session->queue) if (my_session->queue)
{ {
my_session->queue = gwbuf_append(my_session->queue,queue); my_session->queue = gwbuf_append(my_session->queue, queue);
buffer = modutil_get_next_MySQL_packet(&my_session->queue); buffer = modutil_get_next_MySQL_packet(&my_session->queue);
} }
else else
@ -778,22 +792,24 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
my_session->queue = queue; my_session->queue = queue;
} }
if(buffer == NULL) if (buffer == NULL)
{ {
spinlock_release(&my_session->tee_lock); spinlock_release(&my_session->tee_lock);
return 1; return 1;
} }
clone = clone_query(my_instance, my_session,buffer); clone = clone_query(my_instance, my_session, buffer);
spinlock_release(&my_session->tee_lock); spinlock_release(&my_session->tee_lock);
/* Reset session state */ /* Reset session state */
if(!reset_session_state(my_session,buffer)) if (!reset_session_state(my_session, buffer))
{
return 0; return 0;
}
/** Route query downstream */ /** Route query downstream */
spinlock_acquire(&my_session->tee_lock); spinlock_acquire(&my_session->tee_lock);
rval = route_single_query(my_instance,my_session,buffer,clone); rval = route_single_query(my_instance, my_session, buffer, clone);
spinlock_release(&my_session->tee_lock); spinlock_release(&my_session->tee_lock);
return rval; return rval;
@ -801,27 +817,30 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
int count_replies(GWBUF* buffer) int count_replies(GWBUF* buffer)
{ {
unsigned char* ptr = (unsigned char*)buffer->start; unsigned char* ptr = (unsigned char*) buffer->start;
unsigned char* end = (unsigned char*) buffer->end; unsigned char* end = (unsigned char*) buffer->end;
int pktlen, eof = 0; int pktlen, eof = 0;
int replies = 0; int replies = 0;
while(ptr < end) while (ptr < end)
{ {
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if(PTR_IS_OK(ptr) || PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)) if (PTR_IS_OK(ptr) || PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr))
{ {
replies++; replies++;
ptr += pktlen; ptr += pktlen;
} }
else else
{ {
while(ptr < end && eof < 2) while (ptr < end && eof < 2)
{ {
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if(PTR_IS_EOF(ptr) || PTR_IS_ERR(ptr)) eof++; if (PTR_IS_EOF(ptr) || PTR_IS_ERR(ptr)) eof++;
ptr += pktlen; ptr += pktlen;
} }
if(eof == 2) replies++; if (eof == 2)
{
replies++;
}
eof = 0; eof = 0;
} }
} }
@ -832,14 +851,22 @@ int count_replies(GWBUF* buffer)
int lenenc_length(uint8_t* ptr) int lenenc_length(uint8_t* ptr)
{ {
char val = *ptr; char val = *ptr;
if(val < 251) if (val < 251)
{
return 1; return 1;
else if(val == 0xfc) }
else if (val == 0xfc)
{
return 3; return 3;
else if(val == 0xfd) }
else if (val == 0xfd)
{
return 4; return 4;
}
else else
{
return 9; return 9;
}
} }
uint16_t get_response_flags(uint8_t* datastart, bool ok_packet) uint16_t get_response_flags(uint8_t* datastart, bool ok_packet)
@ -850,17 +877,17 @@ uint16_t get_response_flags(uint8_t* datastart, bool ok_packet)
ptr += 4; ptr += 4;
if(ok_packet) if (ok_packet)
{ {
ptr += lenenc_length(ptr); ptr += lenenc_length(ptr);
ptr += lenenc_length(ptr); ptr += lenenc_length(ptr);
memcpy(&rval,ptr,sizeof(uint8_t)*2); memcpy(&rval, ptr, sizeof(uint8_t)*2);
} }
else else
{ {
/** This is an EOF packet*/ /** This is an EOF packet*/
ptr += 2; ptr += 2;
memcpy(&rval,ptr,sizeof(uint8_t)*2); memcpy(&rval, ptr, sizeof(uint8_t)*2);
} }
return rval; return rval;
@ -877,21 +904,21 @@ uint16_t get_response_flags(uint8_t* datastart, bool ok_packet)
* @param reply The response data * @param reply The response data
*/ */
static int static int
clientReply (FILTER* instance, void *session, GWBUF *reply) clientReply(FILTER* instance, void *session, GWBUF *reply)
{ {
int rc, branch, eof; int rc, branch, eof;
TEE_SESSION *my_session = (TEE_SESSION *) session; TEE_SESSION *my_session = (TEE_SESSION *) session;
bool route = false,mpkt; bool route = false, mpkt;
GWBUF *complete = NULL; GWBUF *complete = NULL;
unsigned char *ptr; unsigned char *ptr;
uint16_t flags = 0; uint16_t flags = 0;
int min_eof = my_session->command != 0x04 ? 2 : 1; int min_eof = my_session->command != 0x04 ? 2 : 1;
int more_results = 0; int more_results = 0;
#ifdef SS_DEBUG #ifdef SS_DEBUG
int prev_debug_seq = atomic_add(&debug_seq,1); int prev_debug_seq = atomic_add(&debug_seq, 1);
ptr = (unsigned char*) reply->start; ptr = (unsigned char*) reply->start;
MXS_INFO("Tee clientReply [%s] [%s] [%s]: %d", MXS_INFO("Tee clientReply [%s] [%s] [%s]: %d",
instance ? "parent":"child", instance ? "parent" : "child",
my_session->active ? "open" : "closed", my_session->active ? "open" : "closed",
PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET", PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET",
prev_debug_seq); prev_debug_seq);
@ -899,16 +926,16 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
spinlock_acquire(&my_session->tee_lock); spinlock_acquire(&my_session->tee_lock);
if(!my_session->active) if (!my_session->active)
{ {
MXS_INFO("Tee: Failed to return reply, session is closed"); MXS_INFO("Tee: Failed to return reply, session is closed");
gwbuf_free(reply); gwbuf_free(reply);
rc = 0; rc = 0;
if(my_session->waiting[PARENT]) if (my_session->waiting[PARENT])
{ {
GWBUF* errbuf = modutil_create_mysql_err_msg(1,0,1,"0000","Session closed."); GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1, "0000", "Session closed.");
my_session->waiting[PARENT] = false; my_session->waiting[PARENT] = false;
my_session->up.clientReply (my_session->up.instance, my_session->up.clientReply(my_session->up.instance,
my_session->up.session, my_session->up.session,
errbuf); errbuf);
} }
@ -921,7 +948,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]); my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]);
complete = modutil_get_complete_packets(&my_session->tee_partials[branch]); complete = modutil_get_complete_packets(&my_session->tee_partials[branch]);
if(complete == NULL) if (complete == NULL)
{ {
/** Incomplete packet */ /** Incomplete packet */
MXS_DEBUG("tee.c: Incomplete packet, " MXS_DEBUG("tee.c: Incomplete packet, "
@ -932,7 +959,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
complete = gwbuf_make_contiguous(complete); complete = gwbuf_make_contiguous(complete);
if(my_session->tee_partials[branch] && if (my_session->tee_partials[branch] &&
GWBUF_EMPTY(my_session->tee_partials[branch])) GWBUF_EMPTY(my_session->tee_partials[branch]))
{ {
gwbuf_free(my_session->tee_partials[branch]); gwbuf_free(my_session->tee_partials[branch]);
@ -941,24 +968,24 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
ptr = (unsigned char*) complete->start; ptr = (unsigned char*) complete->start;
if(my_session->replies[branch] == 0) if (my_session->replies[branch] == 0)
{ {
MXS_INFO("Tee: First reply to a query for [%s].",branch == PARENT ? "PARENT":"CHILD"); MXS_INFO("Tee: First reply to a query for [%s].", branch == PARENT ? "PARENT" : "CHILD");
/* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet.
* Otherwise the reply is a result set and the amount of packets is unknown. * Otherwise the reply is a result set and the amount of packets is unknown.
*/ */
if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || if (PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) ||
PTR_IS_OK(ptr) || !my_session->multipacket[branch] ) PTR_IS_OK(ptr) || !my_session->multipacket[branch])
{ {
my_session->waiting[branch] = false; my_session->waiting[branch] = false;
my_session->multipacket[branch] = false; my_session->multipacket[branch] = false;
if(PTR_IS_OK(ptr)) if (PTR_IS_OK(ptr))
{ {
flags = get_response_flags(ptr,true); flags = get_response_flags(ptr, true);
more_results = (flags & 0x08) && my_session->client_multistatement; more_results = (flags & 0x08) && my_session->client_multistatement;
if(more_results) if (more_results)
{ {
MXS_INFO("Tee: [%s] waiting for more results.",branch == PARENT ? "PARENT":"CHILD"); MXS_INFO("Tee: [%s] waiting for more results.", branch == PARENT ? "PARENT" : "CHILD");
} }
} }
} }
@ -967,35 +994,35 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
{ {
MXS_DEBUG("tee.c: [%ld] Waiting for a result set from %s session.", MXS_DEBUG("tee.c: [%ld] Waiting for a result set from %s session.",
my_session->d_id, my_session->d_id,
branch == PARENT?"parent":"child"); branch == PARENT ? "parent" : "child");
} }
#endif #endif
} }
if(my_session->waiting[branch]) if (my_session->waiting[branch])
{ {
eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0,&more_results); eof = modutil_count_signal_packets(complete, my_session->use_ok, my_session->eof[branch] > 0, &more_results);
more_results &= my_session->client_multistatement; more_results &= my_session->client_multistatement;
my_session->eof[branch] += eof; my_session->eof[branch] += eof;
if(my_session->eof[branch] >= min_eof) if (my_session->eof[branch] >= min_eof)
{ {
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_DEBUG("tee.c [%ld] %s received last EOF packet", MXS_DEBUG("tee.c [%ld] %s received last EOF packet",
my_session->d_id, my_session->d_id,
branch == PARENT?"parent":"child"); branch == PARENT ? "parent" : "child");
#endif #endif
my_session->waiting[branch] = more_results; my_session->waiting[branch] = more_results;
if(more_results) if (more_results)
{ {
my_session->eof[branch] = 0; my_session->eof[branch] = 0;
} }
} }
} }
if(branch == PARENT) if (branch == PARENT)
{ {
my_session->tee_replybuf = gwbuf_append(my_session->tee_replybuf,complete); my_session->tee_replybuf = gwbuf_append(my_session->tee_replybuf, complete);
} }
else else
{ {
@ -1006,10 +1033,10 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
rc = 1; rc = 1;
mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD];
if(my_session->tee_replybuf != NULL) if (my_session->tee_replybuf != NULL)
{ {
if(my_session->branch_session == NULL) if (my_session->branch_session == NULL)
{ {
rc = 0; rc = 0;
gwbuf_free(my_session->tee_replybuf); gwbuf_free(my_session->tee_replybuf);
@ -1017,67 +1044,67 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
MXS_ERROR("Tee child session was closed."); MXS_ERROR("Tee child session was closed.");
} }
if(mpkt) if (mpkt)
{ {
if(my_session->waiting[PARENT]) if (my_session->waiting[PARENT])
{ {
route = true; route = true;
} }
else if(my_session->eof[PARENT] >= min_eof && else if (my_session->eof[PARENT] >= min_eof &&
my_session->eof[CHILD] >= min_eof) my_session->eof[CHILD] >= min_eof)
{ {
route = true; route = true;
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_DEBUG("tee.c:[%ld] Routing final packet of response set.",my_session->d_id); MXS_DEBUG("tee.c:[%ld] Routing final packet of response set.", my_session->d_id);
#endif #endif
} }
} }
else if(!my_session->waiting[PARENT] && else if (!my_session->waiting[PARENT] &&
!my_session->waiting[CHILD]) !my_session->waiting[CHILD])
{ {
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_DEBUG("tee.c:[%ld] Routing single packet response.",my_session->d_id); MXS_DEBUG("tee.c:[%ld] Routing single packet response.", my_session->d_id);
#endif #endif
route = true; route = true;
} }
} }
if(route) if (route)
{ {
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_DEBUG("tee.c:[%ld] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" MXS_DEBUG("tee.c:[%ld] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])"
" child(waiting [%s] replies[%d] eof [%d])", " child(waiting [%s] replies[%d] eof [%d])",
my_session->d_id, my_session->d_id,
my_session->tee_replybuf, my_session->tee_replybuf,
my_session->waiting[PARENT] ? "true":"false", my_session->waiting[PARENT] ? "true" : "false",
my_session->replies[PARENT], my_session->replies[PARENT],
my_session->eof[PARENT], my_session->eof[PARENT],
my_session->waiting[CHILD]?"true":"false", my_session->waiting[CHILD] ? "true" : "false",
my_session->replies[CHILD], my_session->replies[CHILD],
my_session->eof[CHILD]); my_session->eof[CHILD]);
#endif #endif
rc = my_session->up.clientReply (my_session->up.instance, rc = my_session->up.clientReply(my_session->up.instance,
my_session->up.session, my_session->up.session,
my_session->tee_replybuf); my_session->tee_replybuf);
my_session->tee_replybuf = NULL; my_session->tee_replybuf = NULL;
} }
if(my_session->queue && if (my_session->queue &&
!my_session->waiting[PARENT] && !my_session->waiting[PARENT] &&
!my_session->waiting[CHILD]) !my_session->waiting[CHILD])
{ {
GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue); GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue);
GWBUF* clone = clone_query(my_session->instance,my_session,buffer); GWBUF* clone = clone_query(my_session->instance, my_session, buffer);
reset_session_state(my_session,buffer); reset_session_state(my_session, buffer);
route_single_query(my_session->instance,my_session,buffer,clone); route_single_query(my_session->instance, my_session, buffer, clone);
MXS_INFO("tee: routing queued query"); MXS_INFO("tee: routing queued query");
} }
retblock: retblock:
spinlock_release(&my_session->tee_lock); spinlock_release(&my_session->tee_lock);
@ -1098,23 +1125,31 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
static void static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb) diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{ {
TEE_INSTANCE *my_instance = (TEE_INSTANCE *)instance; TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance;
TEE_SESSION *my_session = (TEE_SESSION *)fsession; TEE_SESSION *my_session = (TEE_SESSION *) fsession;
if (my_instance->source) if (my_instance->source)
{
dcb_printf(dcb, "\t\tLimit to connections from %s\n", dcb_printf(dcb, "\t\tLimit to connections from %s\n",
my_instance->source); my_instance->source);
}
dcb_printf(dcb, "\t\tDuplicate statements to service %s\n", dcb_printf(dcb, "\t\tDuplicate statements to service %s\n",
my_instance->service->name); my_instance->service->name);
if (my_instance->userName) if (my_instance->userName)
{
dcb_printf(dcb, "\t\tLimit to user %s\n", dcb_printf(dcb, "\t\tLimit to user %s\n",
my_instance->userName); my_instance->userName);
}
if (my_instance->match) if (my_instance->match)
{
dcb_printf(dcb, "\t\tInclude queries that match %s\n", dcb_printf(dcb, "\t\tInclude queries that match %s\n",
my_instance->match); my_instance->match);
}
if (my_instance->nomatch) if (my_instance->nomatch)
{
dcb_printf(dcb, "\t\tExclude queries that match %s\n", dcb_printf(dcb, "\t\tExclude queries that match %s\n",
my_instance->nomatch); my_instance->nomatch);
}
if (my_session) if (my_session)
{ {
dcb_printf(dcb, "\t\tNo. of statements duplicated: %d.\n", dcb_printf(dcb, "\t\tNo. of statements duplicated: %d.\n",
@ -1135,46 +1170,52 @@ TEE_SESSION *my_session = (TEE_SESSION *)fsession;
static int static int
packet_is_required(GWBUF *queue) packet_is_required(GWBUF *queue)
{ {
uint8_t *ptr; uint8_t *ptr;
int i; int i;
ptr = GWBUF_DATA(queue); ptr = GWBUF_DATA(queue);
if (GWBUF_LENGTH(queue) > 4) if (GWBUF_LENGTH(queue) > 4)
{
for (i = 0; required_packets[i]; i++) for (i = 0; required_packets[i]; i++)
{
if (ptr[4] == required_packets[i]) if (ptr[4] == required_packets[i])
{
return 1; return 1;
}
}
}
return 0; return 0;
} }
/** /**
* Detects possible loops in the query cloning chain. * Detects possible loops in the query cloning chain.
*/ */
int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service) int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* service)
{ {
SERVICE* svc = service; SERVICE* svc = service;
int i; int i;
if(ht == NULL) if (ht == NULL)
{ {
return -1; return -1;
} }
if(hashtable_add(ht,(void*)service->name,(void*)true) == 0) if (hashtable_add(ht, (void*) service->name, (void*) true) == 0)
{ {
return true; return true;
} }
for(i = 0;i<svc->n_filters;i++) for (i = 0; i < svc->n_filters; i++)
{ {
if(strcmp(svc->filters[i]->module,"tee") == 0) if (strcmp(svc->filters[i]->module, "tee") == 0)
{ {
/* /*
* Found a Tee filter, recurse down its path * Found a Tee filter, recurse down its path
* if the service name isn't already in the hashtable. * if the service name isn't already in the hashtable.
*/ */
TEE_INSTANCE* ninst = (TEE_INSTANCE*)svc->filters[i]->filter; TEE_INSTANCE* ninst = (TEE_INSTANCE*) svc->filters[i]->filter;
if(ninst == NULL) if (ninst == NULL)
{ {
/** /**
* This tee instance hasn't been initialized yet and full * This tee instance hasn't been initialized yet and full
@ -1184,7 +1225,7 @@ int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service)
} }
SERVICE* tgt = ninst->service; SERVICE* tgt = ninst->service;
if(detect_loops((TEE_INSTANCE*)svc->filters[i]->filter,ht,tgt)) if (detect_loops((TEE_INSTANCE*) svc->filters[i]->filter, ht, tgt))
{ {
return true; return true;
} }
@ -1202,7 +1243,7 @@ int internal_route(DCB* dcb)
/** This was set in the newSession function*/ /** This was set in the newSession function*/
TEE_SESSION* session = dcb->data; TEE_SESSION* session = dcb->data;
return routeQuery((FILTER*)session->instance,session,buffer); return routeQuery((FILTER*) session->instance, session, buffer);
} }
/** /**
@ -1241,7 +1282,7 @@ GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* bu
if ((my_instance->match == NULL || if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL || (my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) regexec(&my_instance->nore, ptr, 0, NULL, 0) != 0))
{ {
clone = gwbuf_clone_all(buffer); clone = gwbuf_clone_all(buffer);
my_session->residual = residual; my_session->residual = residual;
@ -1269,7 +1310,7 @@ GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* bu
int route_single_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer, GWBUF* clone) int route_single_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer, GWBUF* clone)
{ {
int rval = 0; int rval = 0;
if(!my_session->active || if (!my_session->active ||
my_session->branch_session == NULL || my_session->branch_session == NULL ||
my_session->branch_session->state != SESSION_STATE_ROUTER_READY) my_session->branch_session->state != SESSION_STATE_ROUTER_READY)
{ {
@ -1318,33 +1359,35 @@ int route_single_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF
*/ */
int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer) int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer)
{ {
if(gwbuf_length(buffer) < 5) if (gwbuf_length(buffer) < 5)
{
return 0; return 0;
}
unsigned char command = *((unsigned char*)buffer->start + 4); unsigned char command = *((unsigned char*) buffer->start + 4);
switch(command) switch (command)
{ {
case 0x1b: case 0x1b:
my_session->client_multistatement = *((unsigned char*) buffer->start + 5); my_session->client_multistatement = *((unsigned char*) buffer->start + 5);
MXS_INFO("tee: client %s multistatements", MXS_INFO("tee: client %s multistatements",
my_session->client_multistatement ? "enabled":"disabled"); my_session->client_multistatement ? "enabled" : "disabled");
case 0x03: case 0x03:
case 0x16: case 0x16:
case 0x17: case 0x17:
case 0x04: case 0x04:
case 0x0a: case 0x0a:
memset(my_session->multipacket,(char)true,2*sizeof(bool)); memset(my_session->multipacket, (char) true, 2 * sizeof(bool));
break; break;
default: default:
memset(my_session->multipacket,(char)false,2*sizeof(bool)); memset(my_session->multipacket, (char) false, 2 * sizeof(bool));
break; break;
} }
memset(my_session->replies,0,2*sizeof(int)); memset(my_session->replies, 0, 2 * sizeof(int));
memset(my_session->reply_packets,0,2*sizeof(int)); memset(my_session->reply_packets, 0, 2 * sizeof(int));
memset(my_session->eof,0,2*sizeof(int)); memset(my_session->eof, 0, 2 * sizeof(int));
memset(my_session->waiting,1,2*sizeof(bool)); memset(my_session->waiting, 1, 2 * sizeof(bool));
my_session->command = command; my_session->command = command;
return 1; return 1;
@ -1353,12 +1396,14 @@ int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer)
void create_orphan(SESSION* ses) void create_orphan(SESSION* ses)
{ {
orphan_session_t* orphan; orphan_session_t* orphan;
if((orphan = malloc(sizeof(orphan_session_t))) == NULL) if ((orphan = malloc(sizeof(orphan_session_t))) == NULL)
{ {
MXS_ERROR("Failed to " MXS_ERROR("Failed to "
"allocate memory for orphan session struct, " "allocate memory for orphan session struct, "
"child session might leak memory."); "child session might leak memory.");
}else{ }
else
{
orphan->session = ses; orphan->session = ses;
spinlock_acquire(&orphanLock); spinlock_acquire(&orphanLock);
orphan->next = allOrphans; orphan->next = allOrphans;