Merge branch 'bug_685_fix' into develop

This commit is contained in:
Markus Makela
2015-01-09 19:43:49 +02:00

View File

@ -60,6 +60,7 @@
#include <dcb.h>
#include <sys/time.h>
#include <poll.h>
#include <mysql_client_server_protocol.h>
#define MYSQL_COM_QUIT 0x01
#define MYSQL_COM_INITDB 0x02
@ -73,7 +74,8 @@
#define REPLY_TIMEOUT_SECOND 5
#define REPLY_TIMEOUT_MILLISECOND 1
#define PARENT 0
#define CHILD 1
static unsigned char required_packets[] = {
MYSQL_COM_QUIT,
MYSQL_COM_INITDB,
@ -154,7 +156,7 @@ typedef struct {
FILTER_DEF* dummy_filterdef;
int active; /* filter is active? */
int waiting; /* if the client is waiting for a reply */
int replies; /* Number of queries received */
int replies[2]; /* Number of queries received */
int min_replies; /* Minimum number of replies to receive
* before forwarding the packet to the client*/
DCB *branch_dcb; /* Client DCB for "branch" service */
@ -164,6 +166,7 @@ typedef struct {
int residual; /* Any outstanding SQL text */
GWBUF* tee_replybuf; /* Buffer for reply */
SPINLOCK tee_lock;
long bytes_left[2];
} TEE_SESSION;
typedef struct orphan_session_tt
@ -761,8 +764,6 @@ GWBUF *clone = NULL;
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
char *dummy;
length = modutil_MySQL_query_len(queue, &residual);
clone = gwbuf_clone_all(queue);
my_session->residual = residual;
@ -775,8 +776,12 @@ GWBUF *clone = NULL;
}
}
/* Pass the query downstream */
my_session->replies = 0;
ss_dassert(my_session->tee_replybuf == NULL);
memset(my_session->bytes_left,0,2*sizeof(long));
memset(my_session->replies,0,2*sizeof(int));
ss_dassert(my_session->tee_replybuf == NULL);
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
queue);
@ -809,9 +814,11 @@ GWBUF *clone = NULL;
}
my_session->n_rejected++;
}
return rval;
}
#define BOTH_REPLIED(s) (s->replies[PARENT] > 0 && s->replies[CHILD] > 0)
/**
* The clientReply entry point. This is passed the response buffer
@ -826,33 +833,51 @@ GWBUF *clone = NULL;
static int
clientReply (FILTER* instance, void *session, GWBUF *reply)
{
int rc;
int rc, replies, branch;
long n_bytes_missing, len;
TEE_SESSION *my_session = (TEE_SESSION *) session;
spinlock_acquire(&my_session->tee_lock);
ss_dassert(my_session->active);
my_session->replies++;
if (my_session->tee_replybuf == NULL &&
instance != NULL)
{
my_session->tee_replybuf = reply;
}
else
{
gwbuf_free(reply);
}
if((my_session->branch_session == NULL ||
my_session->replies >= my_session->min_replies) &&
my_session->tee_replybuf != NULL)
len = gwbuf_length(reply);
branch = instance == NULL ? CHILD : PARENT;
if(my_session->replies[branch] == 0)
{
long pklen = MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(reply)) + 4;
my_session->bytes_left[branch] = pklen;
}
my_session->bytes_left[branch] -= len;
my_session->replies[branch]++;
n_bytes_missing = MAX(my_session->bytes_left[PARENT],0) +
MAX(my_session->bytes_left[CHILD],0);
replies = my_session->replies[PARENT] + my_session->replies[CHILD];
if(branch == PARENT)
{
my_session->tee_replybuf = my_session->tee_replybuf == NULL ?
reply : gwbuf_append(my_session->tee_replybuf,reply);
}
else
{
gwbuf_free(reply);
}
if(my_session->tee_replybuf != NULL &&
(my_session->branch_session == NULL ||
(n_bytes_missing == 0 && BOTH_REPLIED(my_session)) ||
my_session->bytes_left[PARENT] > 0))
{
rc = my_session->up.clientReply (
my_session->up.instance,
my_session->up.session,
my_session->tee_replybuf);
my_session->replies = 0;
my_session->tee_replybuf = NULL;
}
else