From 88a26f03abd200e9a1f9dd1a87d9b2a19dc5c195 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Thu, 15 Jan 2015 13:13:09 +0200 Subject: [PATCH] Fix to bug 685: http://bugs.mariadb.com/show_bug.cgi?id=685 Added the missing detection of partial packets in the buffers. --- server/core/modutil.c | 98 ++++++++++++++++++++++++++++--------- server/include/modutil.h | 1 + server/modules/filter/tee.c | 65 +++++++++++++++++------- 3 files changed, 121 insertions(+), 43 deletions(-) diff --git a/server/core/modutil.c b/server/core/modutil.c index 7f6ef1e27..33c302e7f 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -494,42 +494,92 @@ return_packetbuf: return packetbuf; } +/** + * Parse the buffer and split complete packets into individual buffers. + * Any partial packets are left in the old buffer. + * @param p_readbuf Buffer to split + * @return Head of the chain of complete packets + */ +GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf) +{ + GWBUF *buff = NULL, *packet = NULL; + + while((packet = modutil_get_next_MySQL_packet(p_readbuf)) != NULL) + { + buff = gwbuf_append(buff,packet); + } + + return buff; +} /** - * Count the number of EOF, OK or ERR packets in the buffer. + * Count the number of EOF, OK or ERR packets in the buffer. Only complete + * packets are inspected and the buffer is assumed to only contain whole packets. + * If partial packets are in the buffer, they are ingnored. The caller must handle the + * detection of partial packets in buffers. * @param reply Buffer to use * @param use_ok Whether the DEPRECATE_EOF flag is set * @param n_found If there were previous packets found * @return Number of EOF packets */ int -modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) +modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) { - unsigned char* ptr = (unsigned char*) reply->start; - unsigned char* end = (unsigned char*) reply->end; - unsigned char* prev = ptr; - int pktlen,pkt = 0,found = n_found; - - while(ptr < end) + unsigned char* ptr = (unsigned char*) reply->start; + unsigned char* end = (unsigned char*) reply->end; + unsigned char* prev = ptr; + int pktlen, eof = 0, err = 0, found = n_found; + int errlen = 0, eoflen = 0; + int iserr = 0, iseof = 0; + while(ptr < end) { - - pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; - - if( !found &&(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr))) - { - pkt++; - found++; + + pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; + + if((iserr = PTR_IS_ERR(ptr)) || (iseof = PTR_IS_EOF(ptr))) + { + if(iserr) + { + err++; + errlen = pktlen; + } + else if(iseof) + { + eof++; + eoflen = pktlen; + } } - - ptr += pktlen; + + if((ptr + pktlen) > end) + { + ptr = prev; + break; + } + + prev = ptr; + ptr += pktlen; } - if(found) + + /* + * If there were new EOF/ERR packets found, make sure that they are the last + * packet in the buffer. + */ + if((eof || err) && n_found) { - ptr -= pktlen; - if(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr)) - pkt++; + if(err) + { + ptr -= errlen; + if(!PTR_IS_ERR(ptr)) + err = 0; + } + else + { + ptr -= eoflen; + if(!PTR_IS_EOF(ptr)) + eof = 0; + } } - - return pkt; -} \ No newline at end of file + + return(eof + err); +} diff --git a/server/include/modutil.h b/server/include/modutil.h index 2ccab523c..e287b080e 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -48,6 +48,7 @@ extern GWBUF *modutil_replace_SQL(GWBUF *, char *); extern char *modutil_get_query(GWBUF* buf); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf); +GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf); int modutil_MySQL_query_len(GWBUF* buf, int* nbytes_missing); GWBUF *modutil_create_mysql_err_msg( diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 87bc8c3c2..17331e21a 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -161,7 +161,7 @@ typedef struct { FILTER_DEF* dummy_filterdef; int active; /* filter is active? */ bool use_ok; - bool multipacket; + bool multipacket[2]; unsigned char command; bool waiting[2]; /* if the client is waiting for a reply */ int eof[2]; @@ -172,6 +172,7 @@ typedef struct { int n_rejected; /* Number of rejected queries */ int residual; /* Any outstanding SQL text */ GWBUF* tee_replybuf; /* Buffer for reply */ + GWBUF* tee_partials[2]; SPINLOCK tee_lock; #ifdef SS_DEBUG long d_id; @@ -826,10 +827,10 @@ unsigned char command = *((unsigned char*)queue->start + 4); case 0x17: case 0x04: case 0x0a: - my_session->multipacket = true; + memset(my_session->multipacket,(char)true,2*sizeof(bool)); break; default: - my_session->multipacket = false; + memset(my_session->multipacket,(char)false,2*sizeof(bool)); break; } @@ -840,9 +841,16 @@ unsigned char command = *((unsigned char*)queue->start + 4); #ifdef SS_DEBUG spinlock_acquire(&debug_lock); my_session->d_id = ++debug_id; - skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] query command [%x]", + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] command [%x]", my_session->d_id, my_session->command); + if(command == 0x03) + { + char* tmpstr = modutil_get_SQL(queue); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c query: '%s'", + tmpstr); + free(tmpstr); + } spinlock_release(&debug_lock); #endif rval = my_session->down.routeQuery(my_session->down.instance, @@ -896,24 +904,41 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) { int rc, branch, eof; TEE_SESSION *my_session = (TEE_SESSION *) session; - bool route = false; - + bool route = false,mpkt; + GWBUF *complete = NULL; + unsigned char *ptr; + int min_eof = my_session->command != 0x04 ? 2 : 1; + spinlock_acquire(&my_session->tee_lock); ss_dassert(my_session->active); branch = instance == NULL ? CHILD : PARENT; - unsigned char *ptr = (unsigned char*)reply->start; - if(my_session->replies[branch] == 0) + my_session->tee_partials[branch] = gwbuf_append(my_session->tee_partials[branch], reply); + my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]); + complete = modutil_get_complete_packets(&my_session->tee_partials[branch]); + complete = gwbuf_make_contiguous(complete); + + if(my_session->tee_partials[branch] && + GWBUF_EMPTY(my_session->tee_partials[branch])) + { + gwbuf_free(my_session->tee_partials[branch]); + my_session->tee_partials[branch] = NULL; + } + + ptr = (unsigned char*) complete->start; + + if(my_session->replies[branch] == 0) { /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. * Otherwise the reply is a result set and the amount of packets is unknown. */ if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || - PTR_IS_OK(ptr) || !my_session->multipacket ) + PTR_IS_OK(ptr) || !my_session->multipacket[branch] ) { my_session->waiting[branch] = false; + my_session->multipacket[branch] = false; } #ifdef SS_DEBUG else @@ -931,35 +956,37 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) if(my_session->waiting[branch]) { - eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); + + eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0); my_session->eof[branch] += eof; - if(my_session->eof[branch] >= 2 || - (my_session->command == 0x04 && my_session->eof[branch] > 0)) + if(my_session->eof[branch] >= min_eof) { #ifdef SS_DEBUG skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet", my_session->d_id, branch == PARENT?"parent":"child"); #endif - ss_dassert(my_session->eof[branch] < 3) + ss_dassert(my_session->eof[branch] < 3) my_session->waiting[branch] = false; } } if(branch == PARENT) { - ss_dassert(my_session->tee_replybuf == NULL) - my_session->tee_replybuf = reply; + ss_dassert(my_session->tee_replybuf == NULL); + my_session->tee_replybuf = complete; } else { - gwbuf_free(reply); + if(complete) + gwbuf_free(complete); } my_session->replies[branch]++; rc = 1; - - int min_eof = my_session->command != 0x04 ? 2 : 1; + mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; + + if(my_session->tee_replybuf != NULL) { @@ -972,7 +999,7 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed."); } - if(my_session->multipacket) + if(mpkt) { if(my_session->waiting[PARENT])