diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 8f0156e32..af56660db 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -61,6 +61,7 @@ #include #include #include +#include #define MYSQL_COM_QUIT 0x01 #define MYSQL_COM_INITDB 0x02 @@ -76,6 +77,10 @@ #define REPLY_TIMEOUT_MILLISECOND 1 #define PARENT 0 #define CHILD 1 + +#define PTR_IS_RESULTSET(b) (b[0] == 0x01 && b[1] == 0x0 && b[2] == 0x0 && b[3] == 0x01) +#define PTR_IS_EOF(b) (b[4] == 0xfe) + static unsigned char required_packets[] = { MYSQL_COM_QUIT, MYSQL_COM_INITDB, @@ -155,10 +160,9 @@ typedef struct { FILTER_DEF* dummy_filterdef; int active; /* filter is active? */ - int waiting; /* if the client is waiting for a reply */ + bool waiting[2]; /* if the client is waiting for a reply */ + int eof[2]; int replies[2]; /* Number of queries received */ - int min_replies; /* Minimum number of replies to receive - * before forwarding the packet to the client*/ DCB *branch_dcb; /* Client DCB for "branch" service */ SESSION *branch_session;/* The branch service session */ int n_duped; /* Number of duplicated queries */ @@ -166,7 +170,6 @@ typedef struct { int residual; /* Any outstanding SQL text */ GWBUF* tee_replybuf; /* Buffer for reply */ SPINLOCK tee_lock; - long bytes_left[2]; } TEE_SESSION; typedef struct orphan_session_tt @@ -205,6 +208,83 @@ static int hcfn( return strcmp(i1,i2); } +static void +orphan_free(void* data) +{ + spinlock_acquire(&orphanLock); + orphan_session_t *ptr = allOrphans, *finished = NULL, *tmp = NULL; +#ifdef SS_DEBUG + int o_stopping = 0, o_ready = 0, o_freed = 0; +#endif + while(ptr) + { + if(ptr->session->state == SESSION_STATE_TO_BE_FREED) + { + if(ptr == allOrphans) + { + tmp = ptr; + allOrphans = ptr->next; + } + else + { + tmp = allOrphans; + while(tmp && tmp->next != ptr) + tmp = tmp->next; + if(tmp) + { + tmp->next = ptr->next; + tmp = ptr; + } + } + } +#ifdef SS_DEBUG + else if(ptr->session->state == SESSION_STATE_STOPPING) + { + o_stopping++; + } + else if(ptr->session->state == SESSION_STATE_ROUTER_READY) + { + o_ready++; + } +#endif + ptr = ptr->next; + if(tmp) + { + tmp->next = finished; + finished = tmp; + tmp = NULL; + } + } + + spinlock_release(&orphanLock); + +#ifdef SS_DEBUG + if(o_stopping + o_ready > 0) + skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans in " + "SESSION_STATE_STOPPING, %d orphans in " + "SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready); +#endif + + while(finished) + { + o_freed++; + tmp = finished; + finished = finished->next; + + tmp->session->service->router->freeSession( + tmp->session->service->router_instance, + tmp->session->router_session); + + tmp->session->state = SESSION_STATE_FREE; + free(tmp->session); + free(tmp); + } + +#ifdef SS_DEBUG + skygw_log_write(LOGFILE_DEBUG, "tee.c: %d orphans freed.", o_freed); +#endif +} + /** * Implementation of the mandatory version entry point * @@ -224,6 +304,7 @@ void ModuleInit() { spinlock_init(&orphanLock); + hktask_add("tee orphan cleanup",orphan_free,NULL,15); } /** @@ -493,7 +574,6 @@ char *remote, *userName; } ses->tail = *dummy_upstream; - my_session->min_replies = 2; my_session->branch_session = ses; my_session->branch_dcb = dcb; my_session->dummy_filterdef = dummy; @@ -608,78 +688,6 @@ SESSION* ses = my_session->branch_session; } free(session); - spinlock_acquire(&orphanLock); - orphan_session_t *ptr = allOrphans, *finished = NULL,*tmp = NULL; -#ifdef SS_DEBUG - int o_stopping = 0, o_ready = 0,o_freed = 0; -#endif - while(ptr) - { - if(ptr->session->state == SESSION_STATE_TO_BE_FREED) - { - if(ptr == allOrphans) - { - tmp = ptr; - allOrphans = ptr->next; - } - else - { - tmp = allOrphans; - while(tmp && tmp->next != ptr) - tmp = tmp->next; - if(tmp) - { - tmp->next = ptr->next; - tmp = ptr; - } - } - } -#ifdef SS_DEBUG - else if(ptr->session->state == SESSION_STATE_STOPPING) - { - o_stopping++; - } - else if(ptr->session->state == SESSION_STATE_ROUTER_READY) - { - o_ready++; - } -#endif - ptr = ptr->next; - if(tmp) - { - tmp->next = finished; - finished = tmp; - tmp = NULL; - } - } - - spinlock_release(&orphanLock); - -#ifdef SS_DEBUG - if(o_stopping + o_ready > 0) - skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans in " - "SESSION_STATE_STOPPING, %d orphans in " - "SESSION_STATE_ROUTER_READY. ",o_stopping,o_ready); -#endif - - while(finished) - { -#ifdef SS_DEBUG - skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans freed.",++o_freed); -#endif - tmp = finished; - finished = finished->next; - - tmp->session->service->router->freeSession( - tmp->session->service->router_instance, - tmp->session->router_session); - - tmp->session->state = SESSION_STATE_FREE; - free(tmp->session); - free(tmp); - } - - return; } /** @@ -779,10 +787,11 @@ GWBUF *clone = NULL; ss_dassert(my_session->tee_replybuf == NULL); - memset(my_session->bytes_left,0,2*sizeof(long)); memset(my_session->replies,0,2*sizeof(int)); - ss_dassert(my_session->tee_replybuf == NULL); - rval = my_session->down.routeQuery(my_session->down.instance, + memset(my_session->eof,0,2*sizeof(int)); + memset(my_session->waiting,0,2*sizeof(bool)); + + rval = my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); if (clone) @@ -818,7 +827,38 @@ GWBUF *clone = NULL; return rval; } -#define BOTH_REPLIED(s) (s->replies[PARENT] > 0 && s->replies[CHILD] > 0) +/** + * Scans the GWBUF for EOF packets. If two packets for this session have been found + * from either the parent or the child branch, mark the response set from that branch as over. + * @param session The Tee filter session + * @param branch Parent or child branch + * @param reply Buffer to scan + */ +void +scan_resultset(TEE_SESSION *session, int branch, GWBUF *reply) +{ + unsigned char* ptr = (unsigned char*) reply->start; + unsigned char* end = (unsigned char*) reply->end; + int pktlen = 0; + + while(ptr < end) + { + pktlen = gw_mysql_get_byte3(ptr) + 4; + if(PTR_IS_EOF(ptr)) + { + session->eof[branch]++; + + if(session->eof[branch] == 2) + { + session->waiting[branch] = false; + session->eof[branch] = 0; + return; + } + } + + ptr += pktlen; + } +} /** * The clientReply entry point. This is passed the response buffer @@ -833,46 +873,46 @@ GWBUF *clone = NULL; static int clientReply (FILTER* instance, void *session, GWBUF *reply) { - int rc, replies, branch; - long n_bytes_missing, len; + int rc, branch; TEE_SESSION *my_session = (TEE_SESSION *) session; spinlock_acquire(&my_session->tee_lock); ss_dassert(my_session->active); - - len = gwbuf_length(reply); + branch = instance == NULL ? CHILD : PARENT; + unsigned char *ptr = (unsigned char*)reply->start; if(my_session->replies[branch] == 0) { - long pklen = MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(reply)) + 4; - my_session->bytes_left[branch] = pklen; + if(PTR_IS_RESULTSET(ptr)) + { + my_session->waiting[branch] = true; + my_session->eof[branch] = 0; + } } - my_session->bytes_left[branch] -= len; - my_session->replies[branch]++; - n_bytes_missing = MAX(my_session->bytes_left[PARENT],0) + - MAX(my_session->bytes_left[CHILD],0); - - replies = my_session->replies[PARENT] + my_session->replies[CHILD]; + if(my_session->waiting[branch]) + { + scan_resultset(my_session,branch,reply); + } if(branch == PARENT) { - my_session->tee_replybuf = my_session->tee_replybuf == NULL ? - reply : gwbuf_append(my_session->tee_replybuf,reply); + ss_dassert(my_session->tee_replybuf == NULL) + my_session->tee_replybuf = reply; } else { gwbuf_free(reply); } + my_session->replies[branch]++; - - if(my_session->tee_replybuf != NULL && + if(my_session->tee_replybuf != NULL && (my_session->branch_session == NULL || - (n_bytes_missing == 0 && BOTH_REPLIED(my_session)) || - my_session->bytes_left[PARENT] > 0)) + my_session->waiting[PARENT] || + (!my_session->waiting[CHILD] && !my_session->waiting[PARENT]))) { rc = my_session->up.clientReply ( my_session->up.instance,