Tee filter now supports longer packets and only sends the last GWBUF if both the parent and the child session have send all their bytes.
This commit is contained in:
Markus Makela
2015-01-09 12:08:13 +02:00
parent 8ce9cd1cbd
commit 759b0c2ccb

View File

@ -60,6 +60,7 @@
#include <dcb.h> #include <dcb.h>
#include <sys/time.h> #include <sys/time.h>
#include <poll.h> #include <poll.h>
#include <mysql_client_server_protocol.h>
#define MYSQL_COM_QUIT 0x01 #define MYSQL_COM_QUIT 0x01
#define MYSQL_COM_INITDB 0x02 #define MYSQL_COM_INITDB 0x02
@ -73,7 +74,8 @@
#define REPLY_TIMEOUT_SECOND 5 #define REPLY_TIMEOUT_SECOND 5
#define REPLY_TIMEOUT_MILLISECOND 1 #define REPLY_TIMEOUT_MILLISECOND 1
#define PARENT 0
#define CHILD 1
static unsigned char required_packets[] = { static unsigned char required_packets[] = {
MYSQL_COM_QUIT, MYSQL_COM_QUIT,
MYSQL_COM_INITDB, MYSQL_COM_INITDB,
@ -154,7 +156,7 @@ typedef struct {
FILTER_DEF* dummy_filterdef; FILTER_DEF* dummy_filterdef;
int active; /* filter is active? */ int active; /* filter is active? */
int waiting; /* if the client is waiting for a reply */ int waiting; /* if the client is waiting for a reply */
int replies; /* Number of queries received */ int replies[2]; /* Number of queries received */
int min_replies; /* Minimum number of replies to receive int min_replies; /* Minimum number of replies to receive
* before forwarding the packet to the client*/ * before forwarding the packet to the client*/
DCB *branch_dcb; /* Client DCB for "branch" service */ DCB *branch_dcb; /* Client DCB for "branch" service */
@ -164,6 +166,7 @@ typedef struct {
int residual; /* Any outstanding SQL text */ int residual; /* Any outstanding SQL text */
GWBUF* tee_replybuf; /* Buffer for reply */ GWBUF* tee_replybuf; /* Buffer for reply */
SPINLOCK tee_lock; SPINLOCK tee_lock;
long bytes_left[2];
} TEE_SESSION; } TEE_SESSION;
typedef struct orphan_session_tt typedef struct orphan_session_tt
@ -761,8 +764,6 @@ GWBUF *clone = NULL;
(my_instance->nomatch == NULL || (my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{ {
char *dummy;
length = modutil_MySQL_query_len(queue, &residual); length = modutil_MySQL_query_len(queue, &residual);
clone = gwbuf_clone_all(queue); clone = gwbuf_clone_all(queue);
my_session->residual = residual; my_session->residual = residual;
@ -776,7 +777,11 @@ GWBUF *clone = NULL;
} }
/* Pass the query downstream */ /* Pass the query downstream */
my_session->replies = 0; ss_dassert(my_session->tee_replybuf == NULL);
memset(my_session->bytes_left,0,2*sizeof(long));
memset(my_session->replies,0,2*sizeof(int));
ss_dassert(my_session->tee_replybuf == NULL);
rval = my_session->down.routeQuery(my_session->down.instance, rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, my_session->down.session,
queue); queue);
@ -809,9 +814,11 @@ GWBUF *clone = NULL;
} }
my_session->n_rejected++; my_session->n_rejected++;
} }
return rval; return rval;
} }
#define BOTH_REPLIED(s) (s->replies[PARENT] > 0 && s->replies[CHILD] > 0)
/** /**
* The clientReply entry point. This is passed the response buffer * The clientReply entry point. This is passed the response buffer
@ -826,33 +833,51 @@ GWBUF *clone = NULL;
static int static int
clientReply (FILTER* instance, void *session, GWBUF *reply) clientReply (FILTER* instance, void *session, GWBUF *reply)
{ {
int rc; int rc, replies, branch;
long n_bytes_missing, len;
TEE_SESSION *my_session = (TEE_SESSION *) session; TEE_SESSION *my_session = (TEE_SESSION *) session;
spinlock_acquire(&my_session->tee_lock); spinlock_acquire(&my_session->tee_lock);
ss_dassert(my_session->active); ss_dassert(my_session->active);
my_session->replies++;
if (my_session->tee_replybuf == NULL && len = gwbuf_length(reply);
instance != NULL) branch = instance == NULL ? CHILD : PARENT;
if(my_session->replies[branch] == 0)
{ {
my_session->tee_replybuf = reply; long pklen = MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(reply)) + 4;
my_session->bytes_left[branch] = pklen;
}
my_session->bytes_left[branch] -= len;
my_session->replies[branch]++;
n_bytes_missing = MAX(my_session->bytes_left[PARENT],0) +
MAX(my_session->bytes_left[CHILD],0);
replies = my_session->replies[PARENT] + my_session->replies[CHILD];
if(branch == PARENT)
{
my_session->tee_replybuf = my_session->tee_replybuf == NULL ?
reply : gwbuf_append(my_session->tee_replybuf,reply);
} }
else else
{ {
gwbuf_free(reply); gwbuf_free(reply);
} }
if((my_session->branch_session == NULL ||
my_session->replies >= my_session->min_replies) &&
my_session->tee_replybuf != NULL) if(my_session->tee_replybuf != NULL &&
(my_session->branch_session == NULL ||
(n_bytes_missing == 0 && BOTH_REPLIED(my_session)) ||
my_session->bytes_left[PARENT] > 0))
{ {
rc = my_session->up.clientReply ( rc = my_session->up.clientReply (
my_session->up.instance, my_session->up.instance,
my_session->up.session, my_session->up.session,
my_session->tee_replybuf); my_session->tee_replybuf);
my_session->replies = 0;
my_session->tee_replybuf = NULL; my_session->tee_replybuf = NULL;
} }
else else