diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 179799192..373acf950 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -150,8 +150,11 @@ typedef struct { typedef struct { DOWNSTREAM down; /* The downstream filter */ UPSTREAM up; /* The upstream filter */ + UPSTREAM* dummy_upstream; int active; /* filter is active? */ int waiting; /* if the client is waiting for a reply */ + int replies; + int min_replies; DCB *branch_dcb; /* Client DCB for "branch" service */ SESSION *branch_session;/* The branch service session */ int n_duped; /* Number of duplicated queries */ @@ -292,7 +295,8 @@ int i; free(my_instance->source); free(my_instance); return NULL; - } + } + if (my_instance->match && regcomp(&my_instance->re, my_instance->match, REG_ICASE)) { @@ -421,9 +425,33 @@ char *remote, *userName; goto retblock; } + + FILTER_DEF* dummy = filter_alloc("tee_dummy","tee_dummy"); + + dummy->obj = GetModuleObject(); + dummy->filter = my_instance; + + /* + if(my_instance->service->n_filters == 0) + { + ses->n_filters = 1; + ses->filters = calloc(2,sizeof(SESSION_FILTER)); + ses->filters[0] = NULL; + + ses->head = filterApply(dummy, ses, + &ses->head); + } + */ + + + ses->tail = *filterUpstream (dummy,my_session,&ses->tail); + + my_session->min_replies = 2; + ss_dassert(ses->ses_is_child); my_session->branch_session = ses; my_session->branch_dcb = dcb; + my_session->dummy_upstream = dummy; } } retblock: @@ -507,6 +535,7 @@ SESSION* ses = my_session->branch_session; my_session->branch_session = NULL; } } + free(my_session->dummy_upstream); free(session); return; } @@ -610,7 +639,6 @@ GWBUF *clone = NULL; if (my_session->branch_session && my_session->waiting) { - DCB* dcb; SESSION *bsession; double duration = 0.0, timeout = 0.0; struct timeval start, now, diff; @@ -622,13 +650,8 @@ GWBUF *clone = NULL; timerclear (&diff); gettimeofday (&start, NULL); - - - spinlock_acquire (&my_session->branch_dcb->authlock); - dcb = my_session->branch_dcb; - do_check = !DCB_REPLIED (dcb); - spinlock_release (&my_session->branch_dcb->authlock); - + do_check = my_session->replies < my_session->min_replies; + while (do_check) { gettimeofday (&now, NULL); @@ -665,20 +688,14 @@ GWBUF *clone = NULL; break; } thread_millisleep(1); - spinlock_acquire (&dcb->authlock); - do_check = !DCB_REPLIED (dcb); - spinlock_release (&dcb->authlock); + do_check = my_session->replies < my_session->min_replies; } if (duration > 0.0) { skygw_log_write_flush (LOGFILE_TRACE, "tee.c: Waited for %.4f seconds for branch session reply.", duration); } - spinlock_acquire (&dcb->authlock); - dcb->flags &= ~DCBF_REPLIED; - spinlock_release (&dcb->authlock); my_session->waiting = 0; - } ss_dassert(my_session->waiting == 0) /* Pass the query downstream */ @@ -692,14 +709,7 @@ GWBUF *clone = NULL; if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY) { my_session->waiting = 1; - my_session->last_qtype = *((unsigned char*)clone->start + 4); - if(my_session->last_qtype == 0x03) - { - if(my_session->last_query) - free(my_session->last_query); - ptr = modutil_get_SQL(clone); - my_session->last_query = strdup(ptr); - } + my_session->replies = 0; SESSION_ROUTE_QUERY(my_session->branch_session, clone); } else @@ -741,16 +751,17 @@ static int clientReply (FILTER* instance, void *session, GWBUF *reply) { TEE_SESSION *my_session = (TEE_SESSION *) session; - DCB* dcb; - - spinlock_acquire (&my_session->branch_dcb->authlock); - dcb = my_session->branch_dcb; - if(DCB_REPLIED (dcb)) - my_session->waiting = 0; - spinlock_release (&my_session->branch_dcb->authlock); - return my_session->up.clientReply (my_session->up.instance, + my_session->replies++; + + if (my_session->branch_session == NULL || + my_session->replies < my_session->min_replies) + { + return my_session->up.clientReply (my_session->up.instance, my_session->up.session, reply); + } + + return 1; } /** * Diagnostics routine @@ -820,7 +831,6 @@ int i; int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service) { SERVICE* svc = service; - bool recurse = true; int i; if(ht == NULL)