Moved the branch session reply state inspection to routeQuery instead of clientReply.

This commit is contained in:
Markus Makela
2014-12-31 07:19:12 +02:00
parent 073db0f1e2
commit d3e15afb2f

View File

@ -157,6 +157,8 @@ typedef struct {
int n_duped; /* Number of duplicated queries */
int n_rejected; /* Number of rejected queries */
int residual; /* Any outstanding SQL text */
unsigned char last_qtype;
char* last_query;
} TEE_SESSION;
static int packet_is_required(GWBUF *queue);
@ -604,7 +606,81 @@ GWBUF *clone = NULL;
{
clone = gwbuf_clone(queue);
}
}
}
if (my_session->branch_session && my_session->waiting)
{
DCB* dcb;
SESSION *bsession;
double duration = 0.0, timeout = 0.0;
struct timeval start, now, diff;
bool do_check;
timeout = REPLY_TIMEOUT_SECOND;
timeout += (double) ((REPLY_TIMEOUT_MILLISECOND + 1.0) / 1000.0);
timerclear (&diff);
gettimeofday (&start, NULL);
spinlock_acquire (&my_session->branch_dcb->authlock);
dcb = my_session->branch_dcb;
do_check = !DCB_REPLIED (dcb);
spinlock_release (&my_session->branch_dcb->authlock);
while (do_check)
{
gettimeofday (&now, NULL);
timersub (&now, &start, &diff);
if (diff.tv_usec > 0)
{
duration = (double) diff.tv_sec + ((double) diff.tv_usec / 1000000.0);
}
if (duration > timeout)
{
/**
* Branch session has failed,
* Close it and stop cloning queries.
*/
bsession = my_session->branch_session;
bsession->ses_is_child = false;
session_free (bsession);
if (clone)
{
gwbuf_free (clone);
clone = NULL;
}
my_session->branch_session = NULL;
skygw_log_write (LOGFILE_TRACE, "tee.c: Branch session not replying fast enough, closing session.");
skygw_log_write (LOGFILE_TRACE, "tee.c: Last command was: %x.",my_session->last_qtype);
if(my_session->last_qtype == 0x03)
{
skygw_log_write (LOGFILE_TRACE, "tee.c: Last query was: %s.",my_session->last_query);
}
break;
}
thread_millisleep(1);
spinlock_acquire (&dcb->authlock);
do_check = !DCB_REPLIED (dcb);
spinlock_release (&dcb->authlock);
}
if (duration > 0.0)
{
skygw_log_write_flush (LOGFILE_TRACE, "tee.c: Waited for %.4f seconds for branch session reply.", duration);
}
spinlock_acquire (&dcb->authlock);
dcb->flags &= ~DCBF_REPLIED;
spinlock_release (&dcb->authlock);
my_session->waiting = 0;
}
ss_dassert(my_session->waiting == 0)
/* Pass the query downstream */
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
@ -616,8 +692,15 @@ GWBUF *clone = NULL;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
my_session->waiting = 1;
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
my_session->last_qtype = *((unsigned char*)clone->start + 4);
if(my_session->last_qtype == 0x03)
{
if(my_session->last_query)
free(my_session->last_query);
ptr = modutil_get_SQL(clone);
my_session->last_query = strdup(ptr);
}
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
{
@ -654,68 +737,20 @@ GWBUF *clone = NULL;
* @param session The filter session
* @param reply The response data
*/
static int clientReply(FILTER* instance, void *session, GWBUF *reply)
static int
clientReply (FILTER* instance, void *session, GWBUF *reply)
{
TEE_SESSION *my_session = (TEE_SESSION *)session;
TEE_SESSION *my_session = (TEE_SESSION *) session;
DCB* dcb;
SESSION *bsession;
double duration = 0.0, timeout = 0.0;
struct timeval start,now,diff;
bool do_check;
timeout = REPLY_TIMEOUT_SECOND;
timeout += (double)((REPLY_TIMEOUT_MILLISECOND + 1.0) / 1000.0);
timerclear(&diff);
gettimeofday(&start, NULL);
if(my_session->branch_session && my_session->waiting)
{
spinlock_acquire(&my_session->branch_dcb->authlock);
dcb = my_session->branch_dcb;
do_check = !DCB_REPLIED(dcb);
spinlock_release(&my_session->branch_dcb->authlock);
while(do_check)
{
gettimeofday(&now, NULL);
timersub(&now,&start,&diff);
if(diff.tv_usec > 0)
{
duration = (double)diff.tv_sec + ((double)diff.tv_usec / 1000000.0);
}
if(duration > timeout)
{
/**
* Branch session has failed,
* Close it and return the query.
*/
bsession = my_session->branch_session;
bsession->ses_is_child = false;
session_free(bsession);
my_session->branch_session = NULL;
skygw_log_write(LOGFILE_TRACE,"tee.c: Branch session not replying fast enough, closing session.");
break;
}
spinlock_acquire(&dcb->authlock);
do_check = !DCB_REPLIED(dcb);
spinlock_release(&dcb->authlock);
}
if(duration > 0.0)
{
skygw_log_write_flush(LOGFILE_TRACE,"tee.c: Waited for %.4f seconds for branch session reply.",duration);
}
spinlock_acquire (&my_session->branch_dcb->authlock);
dcb = my_session->branch_dcb;
if(DCB_REPLIED (dcb))
my_session->waiting = 0;
}
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session, reply);
spinlock_release (&my_session->branch_dcb->authlock);
return my_session->up.clientReply (my_session->up.instance,
my_session->up.session, reply);
}
/**
* Diagnostics routine