Added checks that the query was cloned in clientReply before waiting for a reply.

This commit is contained in:
Markus Makela
2014-12-30 23:05:13 +02:00
parent 9a77509e6a
commit 073db0f1e2
2 changed files with 30 additions and 41 deletions

View File

@ -59,6 +59,7 @@
#include <router.h> #include <router.h>
#include <dcb.h> #include <dcb.h>
#include <sys/time.h> #include <sys/time.h>
#include <poll.h>
#define MYSQL_COM_QUIT 0x01 #define MYSQL_COM_QUIT 0x01
#define MYSQL_COM_INITDB 0x02 #define MYSQL_COM_INITDB 0x02
@ -70,8 +71,8 @@
#define MYSQL_COM_STMT_CLOSE 0x19 #define MYSQL_COM_STMT_CLOSE 0x19
#define MYSQL_COM_STMT_RESET 0x1a #define MYSQL_COM_STMT_RESET 0x1a
#define REPLY_TIMEOUT_SECOND 2 #define REPLY_TIMEOUT_SECOND 5
#define REPLY_TIMEOUT_MILLISECOND 500 #define REPLY_TIMEOUT_MILLISECOND 1
static unsigned char required_packets[] = { static unsigned char required_packets[] = {
MYSQL_COM_QUIT, MYSQL_COM_QUIT,
@ -150,6 +151,7 @@ typedef struct {
DOWNSTREAM down; /* The downstream filter */ DOWNSTREAM down; /* The downstream filter */
UPSTREAM up; /* The upstream filter */ UPSTREAM up; /* The upstream filter */
int active; /* filter is active? */ int active; /* filter is active? */
int waiting; /* if the client is waiting for a reply */
DCB *branch_dcb; /* Client DCB for "branch" service */ DCB *branch_dcb; /* Client DCB for "branch" service */
SESSION *branch_session;/* The branch service session */ SESSION *branch_session;/* The branch service session */
int n_duped; /* Number of duplicated queries */ int n_duped; /* Number of duplicated queries */
@ -184,7 +186,6 @@ static int hcfn(
return strcmp(i1,i2); return strcmp(i1,i2);
} }
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
* *
@ -467,6 +468,7 @@ SESSION *bsession;
* a side effect of closing the client DCB of the * a side effect of closing the client DCB of the
* session. * session.
*/ */
my_session->active = 0; my_session->active = 0;
} }
} }
@ -613,7 +615,9 @@ GWBUF *clone = NULL;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY) if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{ {
my_session->waiting = 1;
SESSION_ROUTE_QUERY(my_session->branch_session, clone); SESSION_ROUTE_QUERY(my_session->branch_session, clone);
} }
else else
{ {
@ -655,25 +659,25 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply)
TEE_SESSION *my_session = (TEE_SESSION *)session; TEE_SESSION *my_session = (TEE_SESSION *)session;
DCB* dcb; DCB* dcb;
SESSION *bsession; SESSION *bsession;
ROUTER_OBJECT *router;
void *router_instance, *rsession;
double duration = 0.0, timeout = 0.0; double duration = 0.0, timeout = 0.0;
struct timeval start,now,diff,limit; struct timeval start,now,diff;
bool do_check;
limit.tv_sec = REPLY_TIMEOUT_SECOND;
limit.tv_usec = REPLY_TIMEOUT_MILLISECOND*1000;
timeout = REPLY_TIMEOUT_SECOND; timeout = REPLY_TIMEOUT_SECOND;
timeout += (double)((REPLY_TIMEOUT_MILLISECOND + 1.0) / 1000.0); timeout += (double)((REPLY_TIMEOUT_MILLISECOND + 1.0) / 1000.0);
timerclear(&diff); timerclear(&diff);
gettimeofday(&start, NULL); gettimeofday(&start, NULL);
if(my_session->branch_session) if(my_session->branch_session && my_session->waiting)
{ {
dcb = my_session->branch_session->client; spinlock_acquire(&my_session->branch_dcb->authlock);
dcb = my_session->branch_dcb;
do_check = !DCB_REPLIED(dcb);
spinlock_release(&my_session->branch_dcb->authlock);
while(!DCB_REPLIED(dcb)) while(do_check)
{ {
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
timersub(&now,&start,&diff); timersub(&now,&start,&diff);
@ -690,39 +694,24 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply)
*/ */
bsession = my_session->branch_session; bsession = my_session->branch_session;
bsession->ses_is_child = false;
if (bsession) session_free(bsession);
{
CHK_SESSION(bsession);
spinlock_acquire(&bsession->ses_lock);
if (bsession->state != SESSION_STATE_STOPPING)
{
bsession->state = SESSION_STATE_STOPPING;
}
router = bsession->service->router;
router_instance = bsession->service->router_instance;
rsession = bsession->router_session;
spinlock_release(&bsession->ses_lock);
/** Close router session and all its connections */
router->closeSession(router_instance, rsession);
my_session->branch_session = NULL; my_session->branch_session = NULL;
skygw_log_write(LOGFILE_TRACE,"tee.c: Branch session not replying fast enough, closing session.");
skygw_log_write(LOGFILE_TRACE,"tee.c: Branch session not replying fast enough, closing session.",duration);
}
break; break;
} }
thread_millisleep(1);
spinlock_acquire(&dcb->authlock);
do_check = !DCB_REPLIED(dcb);
spinlock_release(&dcb->authlock);
} }
if(duration > 0.0) if(duration > 0.0)
{ {
skygw_log_write(LOGFILE_TRACE,"tee.c: Waited for %.4f seconds for branch session reply.",duration); skygw_log_write_flush(LOGFILE_TRACE,"tee.c: Waited for %.4f seconds for branch session reply.",duration);
} }
my_session->waiting = 0;
} }
return my_session->up.clientReply(my_session->up.instance, return my_session->up.clientReply(my_session->up.instance,