From b635eb1493506c290cc8617c8dbd40eafb63e428 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Wed, 14 Jan 2015 21:13:52 +0200 Subject: [PATCH] Additional debugging info added to tee filter. --- server/core/modutil.c | 43 +++---- server/modules/filter/tee.c | 220 +++++++++++++++++++++++------------- 2 files changed, 166 insertions(+), 97 deletions(-) diff --git a/server/core/modutil.c b/server/core/modutil.c index fd7fb3b68..7bb57ef2c 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -505,27 +505,32 @@ return_packetbuf: int modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) { - unsigned char* ptr = (unsigned char*) reply->start; - unsigned char* end = (unsigned char*) reply->end; - int pktlen,pkt = 0; - - while(ptr < end) + unsigned char* ptr = (unsigned char*) reply->start; + unsigned char* end = (unsigned char*) reply->end; + unsigned char* prev = ptr; + int pktlen,pkt = 0,found = n_found; + + while(ptr < end) { - pktlen = gw_mysql_get_byte3(ptr) + 4; + + pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4; - if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr))) - { - if(n_found) - { - if(ptr + pktlen >= end) - pkt++; - } - else - { - pkt++; - } + if( !found &&(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr))) + { + pkt++; + found++; } - ptr += pktlen; + + ptr += pktlen; } - return pkt; + + if(found) + { + ptr -= pktlen; + if(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr)) + pkt++; + } + + return pkt; } +5D diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 639ad7a51..bb89f99c8 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -173,6 +173,9 @@ typedef struct { int residual; /* Any outstanding SQL text */ GWBUF* tee_replybuf; /* Buffer for reply */ SPINLOCK tee_lock; +#ifdef SS_DEBUG + long d_id; +#endif } TEE_SESSION; typedef struct orphan_session_tt @@ -181,6 +184,11 @@ typedef struct orphan_session_tt struct orphan_session_tt* next; }orphan_session_t; +#ifdef SS_DEBUG +static SPINLOCK debug_lock; +static long debug_id = 0; +#endif + static orphan_session_t* allOrphans = NULL; static SPINLOCK orphanLock; @@ -283,9 +291,7 @@ orphan_free(void* data) while(finished) { -#ifdef SS_DEBUG o_freed++; -#endif tmp = finished; finished = finished->next; @@ -322,7 +328,10 @@ void ModuleInit() { spinlock_init(&orphanLock); - hktask_add("tee orphan cleanup",orphan_free,NULL,15); + //hktask_add("tee orphan cleanup",orphan_free,NULL,15); +#ifdef SS_DEBUG + spinlock_init(&debug_lock); +#endif } /** @@ -707,6 +716,8 @@ session_state_t state; gwbuf_free(my_session->tee_replybuf); free(session); + orphan_free(NULL); + return; } /** @@ -824,6 +835,14 @@ unsigned char command = *((unsigned char*)queue->start + 4); memset(my_session->eof,0,2*sizeof(int)); memset(my_session->waiting,1,2*sizeof(bool)); my_session->command = command; +#ifdef SS_DEBUG + spinlock_acquire(&debug_lock); + my_session->d_id = ++debug_id; + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] query command [%x]", + my_session->d_id, + my_session->command); + spinlock_release(&debug_lock); +#endif rval = my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); @@ -873,93 +892,138 @@ unsigned char command = *((unsigned char*)queue->start + 4); static int clientReply (FILTER* instance, void *session, GWBUF *reply) { - int rc, branch, eof; - TEE_SESSION *my_session = (TEE_SESSION *) session; + int rc, branch, eof; + TEE_SESSION *my_session = (TEE_SESSION *) session; + bool route = false; - spinlock_acquire(&my_session->tee_lock); + spinlock_acquire(&my_session->tee_lock); - ss_dassert(my_session->active); + ss_dassert(my_session->active); - branch = instance == NULL ? CHILD : PARENT; - unsigned char *ptr = (unsigned char*)reply->start; + branch = instance == NULL ? CHILD : PARENT; + unsigned char *ptr = (unsigned char*)reply->start; - if(my_session->replies[branch] == 0) - { - /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. - * Otherwise the reply is a result set and the amount of packets is unknown. - */ - if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || - PTR_IS_OK(ptr) || !my_session->multipacket ) - { - my_session->waiting[branch] = false; - } + if(my_session->replies[branch] == 0) + { + /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. + * Otherwise the reply is a result set and the amount of packets is unknown. + */ + if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || + PTR_IS_OK(ptr) || !my_session->multipacket ) + { + my_session->waiting[branch] = false; + } #ifdef SS_DEBUG - else - { - ss_dassert(PTR_IS_RESULTSET(ptr)); - skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child"); - } - ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| - PTR_IS_OK(ptr) || my_session->waiting[branch] || - !my_session->multipacket); + else + { + ss_dassert(PTR_IS_RESULTSET(ptr)); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.", + my_session->d_id, + branch == PARENT?"parent":"child"); + } + ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| + PTR_IS_OK(ptr) || my_session->waiting[branch] || + !my_session->multipacket); #endif - } + } - if(my_session->waiting[branch]) - { - - eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); - my_session->eof[branch] += eof; - if(my_session->eof[branch] >= 2 || - (my_session->command == 0x04 && my_session->eof[branch] > 0)) - { - ss_dassert(my_session->eof[branch] < 3) - my_session->waiting[branch] = false; - } - } + if(my_session->waiting[branch]) + { + eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); + my_session->eof[branch] += eof; + if(my_session->eof[branch] >= 2 || + (my_session->command == 0x04 && my_session->eof[branch] > 0)) + { + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet", + my_session->d_id, + branch == PARENT?"parent":"child"); + ss_dassert(my_session->eof[branch] < 3) + my_session->waiting[branch] = false; + } + } - if(branch == PARENT) - { - ss_dassert(my_session->tee_replybuf == NULL) - my_session->tee_replybuf = reply; - } - else - { - gwbuf_free(reply); - } + if(branch == PARENT) + { + ss_dassert(my_session->tee_replybuf == NULL) + my_session->tee_replybuf = reply; + } + else + { + gwbuf_free(reply); + } - my_session->replies[branch]++; - - if(my_session->tee_replybuf != NULL && - (my_session->branch_session == NULL || - my_session->waiting[PARENT] || - (!my_session->waiting[CHILD] && !my_session->waiting[PARENT]))) + my_session->replies[branch]++; + rc = 1; + + int min_eof = my_session->command != 0x04 ? 2 : 1; + + if(my_session->tee_replybuf != NULL) + { + + if(my_session->branch_session == NULL) + { + rc = 0; + gwbuf_free(my_session->tee_replybuf); + my_session->tee_replybuf = NULL; + skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed."); + } + + if(my_session->multipacket) + { + + if(my_session->waiting[PARENT]) + { + route = true; +#ifdef SS_DEBUG + ss_dassert(my_session->replies[PARENT] < 2 || + modutil_count_signal_packets(my_session->tee_replybuf, + my_session->use_ok, + my_session->eof[PARENT]) == 0); + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing partial response set.",my_session->d_id); +#endif + } + else if(my_session->eof[PARENT] == min_eof && + my_session->eof[CHILD] == min_eof) + { + route = true; +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing final packet of response set.",my_session->d_id); +#endif + } + } + else if(!my_session->waiting[PARENT] && + !my_session->waiting[CHILD]) + { +#ifdef SS_DEBUG + skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing single packet response.",my_session->d_id); +#endif + route = true; + } + } + + if(route) { #ifdef SS_DEBUG - skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" - " child(waiting [%s] replies[%d] eof [%d])", - my_session->tee_replybuf, - my_session->waiting[PARENT] ? "true":"false", - my_session->replies[PARENT], - my_session->eof[PARENT], - my_session->waiting[CHILD]?"true":"false", - my_session->replies[CHILD], - my_session->eof[CHILD]); + skygw_log_write_flush(LOGFILE_DEBUG, "tee.c:[%d] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" + " child(waiting [%s] replies[%d] eof [%d])", + my_session->d_id, + my_session->tee_replybuf, + my_session->waiting[PARENT] ? "true":"false", + my_session->replies[PARENT], + my_session->eof[PARENT], + my_session->waiting[CHILD]?"true":"false", + my_session->replies[CHILD], + my_session->eof[CHILD]); #endif - - rc = my_session->up.clientReply ( - my_session->up.instance, - my_session->up.session, - my_session->tee_replybuf); - my_session->tee_replybuf = NULL; - } - else - { - rc = 1; - } - - spinlock_release(&my_session->tee_lock); - return rc; + + rc = my_session->up.clientReply (my_session->up.instance, + my_session->up.session, + my_session->tee_replybuf); + my_session->tee_replybuf = NULL; + } + + spinlock_release(&my_session->tee_lock); + return rc; } /** * Diagnostics routine