From 22849f7c902ffbded39e3ff9c6f08dc0641bd776 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Fri, 6 Mar 2015 22:40:10 +0200 Subject: [PATCH] Added support for multiple result sets. --- server/core/modutil.c | 6 ++- server/include/modutil.h | 3 +- server/modules/filter/tee.c | 88 ++++++++++++++++++++++++------------- 3 files changed, 64 insertions(+), 33 deletions(-) diff --git a/server/core/modutil.c b/server/core/modutil.c index cf62b542c..6362d70a0 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -558,7 +558,7 @@ GWBUF* modutil_get_complete_packets(GWBUF** p_readbuf) * @return Number of EOF packets */ int -modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) +modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found, int* more) { unsigned char* ptr = (unsigned char*) reply->start; unsigned char* end = (unsigned char*) reply->end; @@ -566,6 +566,7 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) int pktlen, eof = 0, err = 0; int errlen = 0, eoflen = 0; int iserr = 0, iseof = 0; + bool moreresults = false; while(ptr < end) { @@ -587,6 +588,7 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) if((ptr + pktlen) > end || (eof + n_found) >= 2) { + moreresults = PTR_EOF_MORE_RESULTS(ptr); ptr = prev; break; } @@ -616,6 +618,8 @@ modutil_count_signal_packets(GWBUF *reply, int use_ok, int n_found) } } + *more = moreresults; + return(eof + err); } diff --git a/server/include/modutil.h b/server/include/modutil.h index a465d00ab..dc7ee2937 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -40,6 +40,7 @@ #define PTR_IS_OK(b) (b[4] == 0x00) #define PTR_IS_ERR(b) (b[4] == 0xff) #define PTR_IS_LOCAL_INFILE(b) (b[4] == 0xfb) +#define PTR_EOF_MORE_RESULTS(b) ((PTR_IS_EOF(b) && ptr[7] & 0x08)) extern int modutil_is_SQL(GWBUF *); extern int modutil_is_SQL_prepare(GWBUF *); @@ -64,5 +65,5 @@ GWBUF *modutil_create_mysql_err_msg( const char *statemsg, const char *msg); -int modutil_count_signal_packets(GWBUF*,int,int); +int modutil_count_signal_packets(GWBUF*,int,int,int*); #endif diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index e0a6e51fe..236549c43 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -164,6 +164,7 @@ typedef struct { FILTER_DEF* dummy_filterdef; int active; /* filter is active? */ bool use_ok; + int client_multistatement; bool multipacket[2]; unsigned char command; bool waiting[2]; /* if the client is waiting for a reply */ @@ -512,6 +513,7 @@ char *remote, *userName; my_session->tee_replybuf = NULL; my_session->client_dcb = session->client; my_session->instance = my_instance; + my_session->client_multistatement = false; spinlock_init(&my_session->tee_lock); if (my_instance->source && @@ -877,6 +879,8 @@ if(!my_session->active) switch(command) { + case 0x1b: + my_session->client_multistatement = *((unsigned char*) queue->start + 5); case 0x03: case 0x16: case 0x17: @@ -989,6 +993,43 @@ int count_replies(GWBUF* buffer) return replies; } +int lenenc_length(uint8_t* ptr) +{ + char val = *ptr; + if(val < 251) + return 1; + else if(val == 0xfc) + return 3; + else if(val == 0xfd) + return 4; + else + return 9; +} + +uint16_t get_response_flags(uint8_t* datastart, bool ok_packet) +{ + uint8_t* ptr = datastart; + uint16_t rval = 0; + int pktlen = gw_mysql_get_byte3(ptr); + + ptr += 4; + + if(ok_packet) + { + ptr += lenenc_length(ptr); + ptr += lenenc_length(ptr); + memcpy(&rval,ptr,sizeof(uint8_t)*2); + } + else + { + /** This is an EOF packet*/ + ptr += 2; + memcpy(&rval,ptr,sizeof(uint8_t)*2); + } + + return rval; +} + /** * The clientReply entry point. This is passed the response buffer * to which the filter should be applied. Once processed the @@ -1007,7 +1048,9 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) bool route = false,mpkt; GWBUF *complete = NULL; unsigned char *ptr; + uint16_t flags = 0; int min_eof = my_session->command != 0x04 ? 2 : 1; + int more_results = 0; #ifdef SS_DEBUG ptr = (unsigned char*) reply->start; skygw_log_write(LOGFILE_TRACE,"Tee clientReply [%s] [%s] [%s]: %d", @@ -1059,29 +1102,28 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) { my_session->waiting[branch] = false; my_session->multipacket[branch] = false; + if(PTR_IS_OK(ptr)) + { + flags = get_response_flags(ptr,true); + more_results = (flags & 0x08) && my_session->client_multistatement; + } } #ifdef SS_DEBUG else { - //ss_dassert(PTR_IS_RESULTSET(ptr)); skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.", my_session->d_id, branch == PARENT?"parent":"child"); } -/* - ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| - PTR_IS_OK(ptr) || my_session->waiting[branch] || - !my_session->multipacket); -*/ #endif } if(my_session->waiting[branch]) { - - eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0); - + eof = modutil_count_signal_packets(complete,my_session->use_ok,my_session->eof[branch] > 0,&more_results); + more_results &= my_session->client_multistatement; my_session->eof[branch] += eof; + if(my_session->eof[branch] >= min_eof) { #ifdef SS_DEBUG @@ -1089,43 +1131,27 @@ clientReply (FILTER* instance, void *session, GWBUF *reply) my_session->d_id, branch == PARENT?"parent":"child"); #endif - //ss_dassert(my_session->eof[branch] < 3) - my_session->waiting[branch] = false; + my_session->waiting[branch] = more_results; + if(more_results) + { + my_session->eof[branch] = 0; + } } } - - int reply_packets = count_replies(complete); - - /** COM_SET_OPTION returns a single EOF or ERR packet*/ - if(my_session->command == 0x1b && - reply_packets == 0 && - PTR_IS_EOF(ptr)) - { - reply_packets = 1; - } - - my_session->reply_packets[branch] += reply_packets; - if(branch == PARENT) { - //ss_dassert(my_session->tee_replybuf == NULL); my_session->tee_replybuf = gwbuf_append(my_session->tee_replybuf,complete); } else { if(complete) - gwbuf_free(complete); + gwbuf_free(complete); } my_session->replies[branch]++; rc = 1; mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; - - if(my_session->reply_packets[branch] < my_session->statements) - { - my_session->waiting[branch] = true; - } if(my_session->tee_replybuf != NULL) {