Changed the way replies are handled.

Tee filter now receives two client replies, one from the parent service and
one from the child service. These replies are checked before cloning any more queries
into the child service. If the reply isn't sent fast enough, the child session is closed.
This commit is contained in:
Markus Makela
2014-12-31 12:58:37 +02:00
parent d3e15afb2f
commit 5ab329f888

View File

@ -150,8 +150,11 @@ typedef struct {
typedef struct {
DOWNSTREAM down; /* The downstream filter */
UPSTREAM up; /* The upstream filter */
UPSTREAM* dummy_upstream;
int active; /* filter is active? */
int waiting; /* if the client is waiting for a reply */
int replies;
int min_replies;
DCB *branch_dcb; /* Client DCB for "branch" service */
SESSION *branch_session;/* The branch service session */
int n_duped; /* Number of duplicated queries */
@ -292,7 +295,8 @@ int i;
free(my_instance->source);
free(my_instance);
return NULL;
}
}
if (my_instance->match &&
regcomp(&my_instance->re, my_instance->match, REG_ICASE))
{
@ -421,9 +425,33 @@ char *remote, *userName;
goto retblock;
}
FILTER_DEF* dummy = filter_alloc("tee_dummy","tee_dummy");
dummy->obj = GetModuleObject();
dummy->filter = my_instance;
/*
if(my_instance->service->n_filters == 0)
{
ses->n_filters = 1;
ses->filters = calloc(2,sizeof(SESSION_FILTER));
ses->filters[0] = NULL;
ses->head = filterApply(dummy, ses,
&ses->head);
}
*/
ses->tail = *filterUpstream (dummy,my_session,&ses->tail);
my_session->min_replies = 2;
ss_dassert(ses->ses_is_child);
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->dummy_upstream = dummy;
}
}
retblock:
@ -507,6 +535,7 @@ SESSION* ses = my_session->branch_session;
my_session->branch_session = NULL;
}
}
free(my_session->dummy_upstream);
free(session);
return;
}
@ -610,7 +639,6 @@ GWBUF *clone = NULL;
if (my_session->branch_session && my_session->waiting)
{
DCB* dcb;
SESSION *bsession;
double duration = 0.0, timeout = 0.0;
struct timeval start, now, diff;
@ -622,13 +650,8 @@ GWBUF *clone = NULL;
timerclear (&diff);
gettimeofday (&start, NULL);
spinlock_acquire (&my_session->branch_dcb->authlock);
dcb = my_session->branch_dcb;
do_check = !DCB_REPLIED (dcb);
spinlock_release (&my_session->branch_dcb->authlock);
do_check = my_session->replies < my_session->min_replies;
while (do_check)
{
gettimeofday (&now, NULL);
@ -665,20 +688,14 @@ GWBUF *clone = NULL;
break;
}
thread_millisleep(1);
spinlock_acquire (&dcb->authlock);
do_check = !DCB_REPLIED (dcb);
spinlock_release (&dcb->authlock);
do_check = my_session->replies < my_session->min_replies;
}
if (duration > 0.0)
{
skygw_log_write_flush (LOGFILE_TRACE, "tee.c: Waited for %.4f seconds for branch session reply.", duration);
}
spinlock_acquire (&dcb->authlock);
dcb->flags &= ~DCBF_REPLIED;
spinlock_release (&dcb->authlock);
my_session->waiting = 0;
}
ss_dassert(my_session->waiting == 0)
/* Pass the query downstream */
@ -692,14 +709,7 @@ GWBUF *clone = NULL;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
my_session->waiting = 1;
my_session->last_qtype = *((unsigned char*)clone->start + 4);
if(my_session->last_qtype == 0x03)
{
if(my_session->last_query)
free(my_session->last_query);
ptr = modutil_get_SQL(clone);
my_session->last_query = strdup(ptr);
}
my_session->replies = 0;
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
@ -741,16 +751,17 @@ static int
clientReply (FILTER* instance, void *session, GWBUF *reply)
{
TEE_SESSION *my_session = (TEE_SESSION *) session;
DCB* dcb;
spinlock_acquire (&my_session->branch_dcb->authlock);
dcb = my_session->branch_dcb;
if(DCB_REPLIED (dcb))
my_session->waiting = 0;
spinlock_release (&my_session->branch_dcb->authlock);
return my_session->up.clientReply (my_session->up.instance,
my_session->replies++;
if (my_session->branch_session == NULL ||
my_session->replies < my_session->min_replies)
{
return my_session->up.clientReply (my_session->up.instance,
my_session->up.session, reply);
}
return 1;
}
/**
* Diagnostics routine
@ -820,7 +831,6 @@ int i;
int detect_loops(TEE_INSTANCE *instance,HASHTABLE* ht, SERVICE* service)
{
SERVICE* svc = service;
bool recurse = true;
int i;
if(ht == NULL)