Update to bug 685 fix: http://bugs.mariadb.com/show_bug.cgi?id=685
Fixed results sets not being processed as a set of multiple packets.
This commit is contained in:
@ -61,6 +61,7 @@
|
|||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
#include <mysql_client_server_protocol.h>
|
#include <mysql_client_server_protocol.h>
|
||||||
|
#include <housekeeper.h>
|
||||||
|
|
||||||
#define MYSQL_COM_QUIT 0x01
|
#define MYSQL_COM_QUIT 0x01
|
||||||
#define MYSQL_COM_INITDB 0x02
|
#define MYSQL_COM_INITDB 0x02
|
||||||
@ -76,6 +77,10 @@
|
|||||||
#define REPLY_TIMEOUT_MILLISECOND 1
|
#define REPLY_TIMEOUT_MILLISECOND 1
|
||||||
#define PARENT 0
|
#define PARENT 0
|
||||||
#define CHILD 1
|
#define CHILD 1
|
||||||
|
|
||||||
|
#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01)
|
||||||
|
#define PTR_IS_EOF(b) (b[4] == 0xfe)
|
||||||
|
|
||||||
static unsigned char required_packets[] = {
|
static unsigned char required_packets[] = {
|
||||||
MYSQL_COM_QUIT,
|
MYSQL_COM_QUIT,
|
||||||
MYSQL_COM_INITDB,
|
MYSQL_COM_INITDB,
|
||||||
@ -155,10 +160,9 @@ typedef struct {
|
|||||||
|
|
||||||
FILTER_DEF* dummy_filterdef;
|
FILTER_DEF* dummy_filterdef;
|
||||||
int active; /* filter is active? */
|
int active; /* filter is active? */
|
||||||
int waiting; /* if the client is waiting for a reply */
|
bool waiting[2]; /* if the client is waiting for a reply */
|
||||||
|
int eof[2];
|
||||||
int replies[2]; /* Number of queries received */
|
int replies[2]; /* Number of queries received */
|
||||||
int min_replies; /* Minimum number of replies to receive
|
|
||||||
* before forwarding the packet to the client*/
|
|
||||||
DCB *branch_dcb; /* Client DCB for "branch" service */
|
DCB *branch_dcb; /* Client DCB for "branch" service */
|
||||||
SESSION *branch_session;/* The branch service session */
|
SESSION *branch_session;/* The branch service session */
|
||||||
int n_duped; /* Number of duplicated queries */
|
int n_duped; /* Number of duplicated queries */
|
||||||
@ -166,7 +170,6 @@ typedef struct {
|
|||||||
int residual; /* Any outstanding SQL text */
|
int residual; /* Any outstanding SQL text */
|
||||||
GWBUF* tee_replybuf; /* Buffer for reply */
|
GWBUF* tee_replybuf; /* Buffer for reply */
|
||||||
SPINLOCK tee_lock;
|
SPINLOCK tee_lock;
|
||||||
long bytes_left[2];
|
|
||||||
} TEE_SESSION;
|
} TEE_SESSION;
|
||||||
|
|
||||||
typedef struct orphan_session_tt
|
typedef struct orphan_session_tt
|
||||||
@ -205,6 +208,83 @@ static int hcfn(
|
|||||||
return strcmp(i1,i2);
|
return strcmp(i1,i2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
orphan_free(void* data)
|
||||||
|
{
|
||||||
|
spinlock_acquire(&orphanLock);
|
||||||
|
orphan_session_t *ptr = allOrphans, *finished = NULL, *tmp = NULL;
|
||||||
|
#ifdef SS_DEBUG
|
||||||
|
int o_stopping = 0, o_ready = 0, o_freed = 0;
|
||||||
|
#endif
|
||||||
|
while(ptr)
|
||||||
|
{
|
||||||
|
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
|
||||||
|
{
|
||||||
|
if(ptr == allOrphans)
|
||||||
|
{
|
||||||
|
tmp = ptr;
|
||||||
|
allOrphans = ptr->next;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tmp = allOrphans;
|
||||||
|
while(tmp && tmp->next != ptr)
|
||||||
|
tmp = tmp->next;
|
||||||
|
if(tmp)
|
||||||
|
{
|
||||||
|
tmp->next = ptr->next;
|
||||||
|
tmp = ptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#ifdef SS_DEBUG
|
||||||
|
else if(ptr->session->state == SESSION_STATE_STOPPING)
|
||||||
|
{
|
||||||
|
o_stopping++;
|
||||||
|
}
|
||||||
|
else if(ptr->session->state == SESSION_STATE_ROUTER_READY)
|
||||||
|
{
|
||||||
|
o_ready++;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
ptr = ptr->next;
|
||||||
|
if(tmp)
|
||||||
|
{
|
||||||
|
tmp->next = finished;
|
||||||
|
finished = tmp;
|
||||||
|
tmp = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spinlock_release(&orphanLock);
|
||||||
|
|
||||||
|
#ifdef SS_DEBUG
|
||||||
|
if(o_stopping + o_ready > 0)
|
||||||
|
skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans in "
|
||||||
|
"SESSION_STATE_STOPPING, %d orphans in "
|
||||||
|
"SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
while(finished)
|
||||||
|
{
|
||||||
|
o_freed++;
|
||||||
|
tmp = finished;
|
||||||
|
finished = finished->next;
|
||||||
|
|
||||||
|
tmp->session->service->router->freeSession(
|
||||||
|
tmp->session->service->router_instance,
|
||||||
|
tmp->session->router_session);
|
||||||
|
|
||||||
|
tmp->session->state = SESSION_STATE_FREE;
|
||||||
|
free(tmp->session);
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef SS_DEBUG
|
||||||
|
skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans freed.", o_freed);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of the mandatory version entry point
|
* Implementation of the mandatory version entry point
|
||||||
*
|
*
|
||||||
@ -224,6 +304,7 @@ void
|
|||||||
ModuleInit()
|
ModuleInit()
|
||||||
{
|
{
|
||||||
spinlock_init(&orphanLock);
|
spinlock_init(&orphanLock);
|
||||||
|
hktask_add("tee orphan cleanup",orphan_free,NULL,15);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -493,7 +574,6 @@ char *remote, *userName;
|
|||||||
}
|
}
|
||||||
|
|
||||||
ses->tail = *dummy_upstream;
|
ses->tail = *dummy_upstream;
|
||||||
my_session->min_replies = 2;
|
|
||||||
my_session->branch_session = ses;
|
my_session->branch_session = ses;
|
||||||
my_session->branch_dcb = dcb;
|
my_session->branch_dcb = dcb;
|
||||||
my_session->dummy_filterdef = dummy;
|
my_session->dummy_filterdef = dummy;
|
||||||
@ -608,78 +688,6 @@ SESSION* ses = my_session->branch_session;
|
|||||||
}
|
}
|
||||||
free(session);
|
free(session);
|
||||||
|
|
||||||
spinlock_acquire(&orphanLock);
|
|
||||||
orphan_session_t *ptr = allOrphans, *finished = NULL,*tmp = NULL;
|
|
||||||
#ifdef SS_DEBUG
|
|
||||||
int o_stopping = 0, o_ready = 0,o_freed = 0;
|
|
||||||
#endif
|
|
||||||
while(ptr)
|
|
||||||
{
|
|
||||||
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
|
|
||||||
{
|
|
||||||
if(ptr == allOrphans)
|
|
||||||
{
|
|
||||||
tmp = ptr;
|
|
||||||
allOrphans = ptr->next;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
tmp = allOrphans;
|
|
||||||
while(tmp && tmp->next != ptr)
|
|
||||||
tmp = tmp->next;
|
|
||||||
if(tmp)
|
|
||||||
{
|
|
||||||
tmp->next = ptr->next;
|
|
||||||
tmp = ptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#ifdef SS_DEBUG
|
|
||||||
else if(ptr->session->state == SESSION_STATE_STOPPING)
|
|
||||||
{
|
|
||||||
o_stopping++;
|
|
||||||
}
|
|
||||||
else if(ptr->session->state == SESSION_STATE_ROUTER_READY)
|
|
||||||
{
|
|
||||||
o_ready++;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
ptr = ptr->next;
|
|
||||||
if(tmp)
|
|
||||||
{
|
|
||||||
tmp->next = finished;
|
|
||||||
finished = tmp;
|
|
||||||
tmp = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
spinlock_release(&orphanLock);
|
|
||||||
|
|
||||||
#ifdef SS_DEBUG
|
|
||||||
if(o_stopping + o_ready > 0)
|
|
||||||
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans in "
|
|
||||||
"SESSION_STATE_STOPPING, %d orphans in "
|
|
||||||
"SESSION_STATE_ROUTER_READY. ",o_stopping,o_ready);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
while(finished)
|
|
||||||
{
|
|
||||||
#ifdef SS_DEBUG
|
|
||||||
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans freed.",++o_freed);
|
|
||||||
#endif
|
|
||||||
tmp = finished;
|
|
||||||
finished = finished->next;
|
|
||||||
|
|
||||||
tmp->session->service->router->freeSession(
|
|
||||||
tmp->session->service->router_instance,
|
|
||||||
tmp->session->router_session);
|
|
||||||
|
|
||||||
tmp->session->state = SESSION_STATE_FREE;
|
|
||||||
free(tmp->session);
|
|
||||||
free(tmp);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -779,9 +787,10 @@ GWBUF *clone = NULL;
|
|||||||
|
|
||||||
ss_dassert(my_session->tee_replybuf == NULL);
|
ss_dassert(my_session->tee_replybuf == NULL);
|
||||||
|
|
||||||
memset(my_session->bytes_left,0,2*sizeof(long));
|
|
||||||
memset(my_session->replies,0,2*sizeof(int));
|
memset(my_session->replies,0,2*sizeof(int));
|
||||||
ss_dassert(my_session->tee_replybuf == NULL);
|
memset(my_session->eof,0,2*sizeof(int));
|
||||||
|
memset(my_session->waiting,0,2*sizeof(bool));
|
||||||
|
|
||||||
rval = my_session->down.routeQuery(my_session->down.instance,
|
rval = my_session->down.routeQuery(my_session->down.instance,
|
||||||
my_session->down.session,
|
my_session->down.session,
|
||||||
queue);
|
queue);
|
||||||
@ -818,7 +827,38 @@ GWBUF *clone = NULL;
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define BOTH_REPLIED(s) (s->replies[PARENT] > 0 && s->replies[CHILD] > 0)
|
/**
|
||||||
|
* Scans the GWBUF for EOF packets. If two packets for this session have been found
|
||||||
|
* from either the parent or the child branch, mark the response set from that branch as over.
|
||||||
|
* @param session The Tee filter session
|
||||||
|
* @param branch Parent or child branch
|
||||||
|
* @param reply Buffer to scan
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply)
|
||||||
|
{
|
||||||
|
unsigned char* ptr = (unsigned char*) reply->start;
|
||||||
|
unsigned char* end = (unsigned char*) reply->end;
|
||||||
|
int pktlen = 0;
|
||||||
|
|
||||||
|
while(ptr < end)
|
||||||
|
{
|
||||||
|
pktlen = gw_mysql_get_byte3(ptr) + 4;
|
||||||
|
if(PTR_IS_EOF(ptr))
|
||||||
|
{
|
||||||
|
session->eof[branch]++;
|
||||||
|
|
||||||
|
if(session->eof[branch] == 2)
|
||||||
|
{
|
||||||
|
session->waiting[branch] = false;
|
||||||
|
session->eof[branch] = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ptr += pktlen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The clientReply entry point. This is passed the response buffer
|
* The clientReply entry point. This is passed the response buffer
|
||||||
@ -833,46 +873,46 @@ GWBUF *clone = NULL;
|
|||||||
static int
|
static int
|
||||||
clientReply (FILTER* instance, void *session, GWBUF *reply)
|
clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||||
{
|
{
|
||||||
int rc, replies, branch;
|
int rc, branch;
|
||||||
long n_bytes_missing, len;
|
|
||||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||||
|
|
||||||
spinlock_acquire(&my_session->tee_lock);
|
spinlock_acquire(&my_session->tee_lock);
|
||||||
|
|
||||||
ss_dassert(my_session->active);
|
ss_dassert(my_session->active);
|
||||||
|
|
||||||
len = gwbuf_length(reply);
|
|
||||||
branch = instance == NULL ? CHILD : PARENT;
|
branch = instance == NULL ? CHILD : PARENT;
|
||||||
|
unsigned char *ptr = (unsigned char*)reply->start;
|
||||||
|
|
||||||
if(my_session->replies[branch] == 0)
|
if(my_session->replies[branch] == 0)
|
||||||
{
|
{
|
||||||
long pklen = MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(reply)) + 4;
|
if(PTR_IS_RESULTSET(ptr))
|
||||||
my_session->bytes_left[branch] = pklen;
|
{
|
||||||
|
my_session->waiting[branch] = true;
|
||||||
|
my_session->eof[branch] = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
my_session->bytes_left[branch] -= len;
|
|
||||||
my_session->replies[branch]++;
|
|
||||||
|
|
||||||
n_bytes_missing = MAX(my_session->bytes_left[PARENT],0) +
|
if(my_session->waiting[branch])
|
||||||
MAX(my_session->bytes_left[CHILD],0);
|
{
|
||||||
|
scan_resultset(my_session,branch,reply);
|
||||||
replies = my_session->replies[PARENT] + my_session->replies[CHILD];
|
}
|
||||||
|
|
||||||
if(branch == PARENT)
|
if(branch == PARENT)
|
||||||
{
|
{
|
||||||
my_session->tee_replybuf = my_session->tee_replybuf == NULL ?
|
ss_dassert(my_session->tee_replybuf == NULL)
|
||||||
reply : gwbuf_append(my_session->tee_replybuf,reply);
|
my_session->tee_replybuf = reply;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
gwbuf_free(reply);
|
gwbuf_free(reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
my_session->replies[branch]++;
|
||||||
|
|
||||||
if(my_session->tee_replybuf != NULL &&
|
if(my_session->tee_replybuf != NULL &&
|
||||||
(my_session->branch_session == NULL ||
|
(my_session->branch_session == NULL ||
|
||||||
(n_bytes_missing == 0 && BOTH_REPLIED(my_session)) ||
|
my_session->waiting[PARENT] ||
|
||||||
my_session->bytes_left[PARENT] > 0))
|
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT])))
|
||||||
{
|
{
|
||||||
rc = my_session->up.clientReply (
|
rc = my_session->up.clientReply (
|
||||||
my_session->up.instance,
|
my_session->up.instance,
|
||||||
|
|||||||
Reference in New Issue
Block a user