Added the missing detection of partial packets in the buffers.
This commit is contained in:
Markus Makela
2015-01-15 13:13:09 +02:00
parent 85c84c9e71
commit 88a26f03ab
3 changed files with 121 additions and 43 deletions

View File

@ -494,42 +494,92 @@ return_packetbuf:
return packetbuf; return packetbuf;
} }
/**
* Parse the buffer and split complete packets into individual buffers.
* Any partial packets are left in the old buffer.
* @param p_readbuf Buffer to split
* @return Head of the chain of complete packets
*/
GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf)
{
GWBUF *buff = NULL, *packet = NULL;
while((packet = modutil_get_next_MySQL_packet(p_readbuf)) != NULL)
{
buff = gwbuf_append(buff,packet);
}
return buff;
}
/** /**
* Count the number of EOF, OK or ERR packets in the buffer. * Count the number of EOF, OK or ERR packets in the buffer. Only complete
* packets are inspected and the buffer is assumed to only contain whole packets.
* If partial packets are in the buffer, they are ingnored. The caller must handle the
* detection of partial packets in buffers.
* @param reply Buffer to use * @param reply Buffer to use
* @param use_ok Whether the DEPRECATE_EOF flag is set * @param use_ok Whether the DEPRECATE_EOF flag is set
* @param n_found If there were previous packets found * @param n_found If there were previous packets found
* @return Number of EOF packets * @return Number of EOF packets
*/ */
int int
modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found)
{ {
unsigned char* ptr = (unsigned char*) reply->start; unsigned char* ptr = (unsigned char*) reply->start;
unsigned char* end = (unsigned char*) reply->end; unsigned char* end = (unsigned char*) reply->end;
unsigned char* prev = ptr; unsigned char* prev = ptr;
int pktlen,pkt = 0,found = n_found; int pktlen, eof = 0, err = 0, found = n_found;
int errlen = 0, eoflen = 0;
int iserr = 0, iseof = 0;
while(ptr < end) while(ptr < end)
{ {
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if( !found &&(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr))) if((iserr = PTR_IS_ERR(ptr)) || (iseof = PTR_IS_EOF(ptr)))
{ {
pkt++; if(iserr)
found++; {
err++;
errlen = pktlen;
}
else if(iseof)
{
eof++;
eoflen = pktlen;
}
} }
if((ptr + pktlen) > end)
{
ptr = prev;
break;
}
prev = ptr;
ptr += pktlen; ptr += pktlen;
} }
if(found)
/*
* If there were new EOF/ERR packets found, make sure that they are the last
* packet in the buffer.
*/
if((eof || err) && n_found)
{ {
ptr -= pktlen; if(err)
if(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr)) {
pkt++; ptr -= errlen;
if(!PTR_IS_ERR(ptr))
err = 0;
}
else
{
ptr -= eoflen;
if(!PTR_IS_EOF(ptr))
eof = 0;
}
} }
return pkt; return(eof + err);
} }

View File

@ -48,6 +48,7 @@ extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
extern char *modutil_get_query(GWBUF* buf); extern char *modutil_get_query(GWBUF* buf);
extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *);
GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf); GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf);
GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf);
int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing); int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing);
GWBUF *modutil_create_mysql_err_msg( GWBUF *modutil_create_mysql_err_msg(

View File

@ -161,7 +161,7 @@ typedef struct {
FILTER_DEF* dummy_filterdef; FILTER_DEF* dummy_filterdef;
int active; /* filter is active? */ int active; /* filter is active? */
bool use_ok; bool use_ok;
bool multipacket; bool multipacket[2];
unsigned char command; unsigned char command;
bool waiting[2]; /* if the client is waiting for a reply */ bool waiting[2]; /* if the client is waiting for a reply */
int eof[2]; int eof[2];
@ -172,6 +172,7 @@ typedef struct {
int n_rejected; /* Number of rejected queries */ int n_rejected; /* Number of rejected queries */
int residual; /* Any outstanding SQL text */ int residual; /* Any outstanding SQL text */
GWBUF* tee_replybuf; /* Buffer for reply */ GWBUF* tee_replybuf; /* Buffer for reply */
GWBUF* tee_partials[2];
SPINLOCK tee_lock; SPINLOCK tee_lock;
#ifdef SS_DEBUG #ifdef SS_DEBUG
long d_id; long d_id;
@ -826,10 +827,10 @@ unsigned char command = *((unsigned char*)queue->start + 4);
case 0x17: case 0x17:
case 0x04: case 0x04:
case 0x0a: case 0x0a:
my_session->multipacket = true; memset(my_session->multipacket,(char)true,2*sizeof(bool));
break; break;
default: default:
my_session->multipacket = false; memset(my_session->multipacket,(char)false,2*sizeof(bool));
break; break;
} }
@ -840,9 +841,16 @@ unsigned char command = *((unsigned char*)queue->start + 4);
#ifdef SS_DEBUG #ifdef SS_DEBUG
spinlock_acquire(&debug_lock); spinlock_acquire(&debug_lock);
my_session->d_id = ++debug_id; my_session->d_id = ++debug_id;
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] query command [%x]", skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] command [%x]",
my_session->d_id, my_session->d_id,
my_session->command); my_session->command);
if(command == 0x03)
{
char* tmpstr = modutil_get_SQL(queue);
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c query: '%s'",
tmpstr);
free(tmpstr);
}
spinlock_release(&debug_lock); spinlock_release(&debug_lock);
#endif #endif
rval = my_session->down.routeQuery(my_session->down.instance, rval = my_session->down.routeQuery(my_session->down.instance,
@ -896,14 +904,30 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
{ {
int rc, branch, eof; int rc, branch, eof;
TEE_SESSION *my_session = (TEE_SESSION *) session; TEE_SESSION *my_session = (TEE_SESSION *) session;
bool route = false; bool route = false,mpkt;
GWBUF *complete = NULL;
unsigned char *ptr;
int min_eof = my_session->command != 0x04 ? 2 : 1;
spinlock_acquire(&my_session->tee_lock); spinlock_acquire(&my_session->tee_lock);
ss_dassert(my_session->active); ss_dassert(my_session->active);
branch = instance == NULL ? CHILD : PARENT; branch = instance == NULL ? CHILD : PARENT;
unsigned char *ptr = (unsigned char*)reply->start;
my_session->tee_partials[branch] = gwbuf_append(my_session->tee_partials[branch], reply);
my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]);
complete = modutil_get_complete_packets(&my_session->tee_partials[branch]);
complete = gwbuf_make_contiguous(complete);
if(my_session->tee_partials[branch] &&
GWBUF_EMPTY(my_session->tee_partials[branch]))
{
gwbuf_free(my_session->tee_partials[branch]);
my_session->tee_partials[branch] = NULL;
}
ptr = (unsigned char*) complete->start;
if(my_session->replies[branch] == 0) if(my_session->replies[branch] == 0)
{ {
@ -911,9 +935,10 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
* Otherwise the reply is a result set and the amount of packets is unknown. * Otherwise the reply is a result set and the amount of packets is unknown.
*/ */
if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) ||
PTR_IS_OK(ptr) || !my_session->multipacket ) PTR_IS_OK(ptr) || !my_session->multipacket[branch] )
{ {
my_session->waiting[branch] = false; my_session->waiting[branch] = false;
my_session->multipacket[branch] = false;
} }
#ifdef SS_DEBUG #ifdef SS_DEBUG
else else
@ -931,10 +956,10 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
if(my_session->waiting[branch]) if(my_session->waiting[branch])
{ {
eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0);
eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0);
my_session->eof[branch] += eof; my_session->eof[branch] += eof;
if(my_session->eof[branch] >= 2 || if(my_session->eof[branch] >= min_eof)
(my_session->command == 0x04 && my_session->eof[branch] > 0))
{ {
#ifdef SS_DEBUG #ifdef SS_DEBUG
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet", skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet",
@ -948,18 +973,20 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
if(branch == PARENT) if(branch == PARENT)
{ {
ss_dassert(my_session->tee_replybuf == NULL) ss_dassert(my_session->tee_replybuf == NULL);
my_session->tee_replybuf = reply; my_session->tee_replybuf = complete;
} }
else else
{ {
gwbuf_free(reply); if(complete)
gwbuf_free(complete);
} }
my_session->replies[branch]++; my_session->replies[branch]++;
rc = 1; rc = 1;
mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD];
int min_eof = my_session->command != 0x04 ? 2 : 1;
if(my_session->tee_replybuf != NULL) if(my_session->tee_replybuf != NULL)
{ {
@ -972,7 +999,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed."); skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed.");
} }
if(my_session->multipacket) if(mpkt)
{ {
if(my_session->waiting[PARENT]) if(my_session->waiting[PARENT])