Added new member to TEE_SESSION structure: TEE_SESSION->tee_replybuf where reply message is stored until replied to client.

Temporarily commented out waiting section and left to be removed after further testing.
This commit is contained in:
VilhoRaatikka
2014-12-31 14:42:14 +02:00
parent 5ab329f888
commit d1bc68c7d0

View File

@ -162,6 +162,7 @@ typedef struct {
int residual; /* Any outstanding SQL text */ int residual; /* Any outstanding SQL text */
unsigned char last_qtype; unsigned char last_qtype;
char* last_query; char* last_query;
GWBUF* tee_replybuf; /* Buffer for reply */
} TEE_SESSION; } TEE_SESSION;
static int packet_is_required(GWBUF *queue); static int packet_is_required(GWBUF *queue);
@ -353,18 +354,18 @@ char *remote, *userName;
goto retblock; goto retblock;
} }
HASHTABLE* ht = hashtable_alloc(100,hkfn,hcfn); HASHTABLE* ht = hashtable_alloc(100,hkfn,hcfn);
bool is_loop = detect_loops(my_instance,ht,session->service); bool is_loop = detect_loops(my_instance,ht,session->service);
hashtable_free(ht); hashtable_free(ht);
if(is_loop) if(is_loop)
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error : %s: Recursive use of tee filter in service.", "Error : %s: Recursive use of tee filter in service.",
session->service->name))); session->service->name)));
my_session = NULL; my_session = NULL;
goto retblock; goto retblock;
} }
if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL) if ((my_session = calloc(1, sizeof(TEE_SESSION))) != NULL)
{ {
@ -636,7 +637,7 @@ GWBUF *clone = NULL;
clone = gwbuf_clone(queue); clone = gwbuf_clone(queue);
} }
} }
#if 0
if (my_session->branch_session && my_session->waiting) if (my_session->branch_session && my_session->waiting)
{ {
SESSION *bsession; SESSION *bsession;
@ -698,6 +699,7 @@ GWBUF *clone = NULL;
my_session->waiting = 0; my_session->waiting = 0;
} }
ss_dassert(my_session->waiting == 0) ss_dassert(my_session->waiting == 0)
#endif
/* Pass the query downstream */ /* Pass the query downstream */
rval = my_session->down.routeQuery(my_session->down.instance, rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, my_session->down.session,
@ -750,18 +752,35 @@ GWBUF *clone = NULL;
static int static int
clientReply (FILTER* instance, void *session, GWBUF *reply) clientReply (FILTER* instance, void *session, GWBUF *reply)
{ {
TEE_SESSION *my_session = (TEE_SESSION *) session; int rc;
TEE_SESSION *my_session = (TEE_SESSION *) session;
my_session->replies++; my_session->replies++;
if (my_session->branch_session == NULL || if (my_session->tee_replybuf == NULL)
my_session->replies < my_session->min_replies) {
{ my_session->tee_replybuf = reply;
return my_session->up.clientReply (my_session->up.instance, }
my_session->up.session, reply); else
} {
gwbuf_free(reply);
return 1; }
if (my_session->branch_session == NULL ||
my_session->replies >= my_session->min_replies)
{
rc = my_session->up.clientReply (
my_session->up.instance,
my_session->up.session,
my_session->tee_replybuf);
my_session->replies = 0;
my_session->tee_replybuf = NULL;
}
else
{
rc = 1;
}
return rc;
} }
/** /**
* Diagnostics routine * Diagnostics routine