Tee filter now only returns the reply from the parent service.
This commit is contained in:
@ -154,15 +154,14 @@ typedef struct {
|
||||
FILTER_DEF* dummy_filterdef;
|
||||
int active; /* filter is active? */
|
||||
int waiting; /* if the client is waiting for a reply */
|
||||
int replies;
|
||||
int min_replies;
|
||||
int replies; /* Number of queries received */
|
||||
int min_replies; /* Minimum number of replies to receive
|
||||
* before forwarding the packet to the client*/
|
||||
DCB *branch_dcb; /* Client DCB for "branch" service */
|
||||
SESSION *branch_session;/* The branch service session */
|
||||
int n_duped; /* Number of duplicated queries */
|
||||
int n_rejected; /* Number of rejected queries */
|
||||
int residual; /* Any outstanding SQL text */
|
||||
unsigned char last_qtype;
|
||||
char* last_query;
|
||||
GWBUF* tee_replybuf; /* Buffer for reply */
|
||||
} TEE_SESSION;
|
||||
|
||||
@ -460,7 +459,7 @@ char *remote, *userName;
|
||||
ss_dassert(ses->ses_is_child);
|
||||
|
||||
dummy->obj = GetModuleObject();
|
||||
dummy->filter = my_instance;
|
||||
dummy->filter = NULL;
|
||||
|
||||
|
||||
if((dummy_upstream = filterUpstream(
|
||||
@ -654,7 +653,7 @@ SESSION* ses = my_session->branch_session;
|
||||
|
||||
#ifdef SS_DEBUG
|
||||
if(o_stopping + o_ready > 0)
|
||||
skygw_log_write(LOGFILE_TRACE,"tee.c: %d orphans in "
|
||||
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans in "
|
||||
"SESSION_STATE_STOPPING, %d orphans in "
|
||||
"SESSION_STATE_ROUTER_READY. ",o_stopping,o_ready);
|
||||
#endif
|
||||
@ -662,7 +661,7 @@ SESSION* ses = my_session->branch_session;
|
||||
while(finished)
|
||||
{
|
||||
#ifdef SS_DEBUG
|
||||
skygw_log_write(LOGFILE_TRACE,"tee.c: %d orphans freed.",++o_freed);
|
||||
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans freed.",++o_freed);
|
||||
#endif
|
||||
tmp = finished;
|
||||
finished = finished->next;
|
||||
@ -679,7 +678,6 @@ SESSION* ses = my_session->branch_session;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
@ -691,10 +689,10 @@ SESSION* ses = my_session->branch_session;
|
||||
static void
|
||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
|
||||
my_session->down = *downstream;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
my_session->down = *downstream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
@ -703,10 +701,10 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
* @param session The filter session
|
||||
* @param downstream The downstream filter or router.
|
||||
*/
|
||||
static void
|
||||
static void
|
||||
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
my_session->up = *upstream;
|
||||
}
|
||||
|
||||
@ -777,6 +775,8 @@ GWBUF *clone = NULL;
|
||||
}
|
||||
}
|
||||
/* Pass the query downstream */
|
||||
|
||||
my_session->replies = 0;
|
||||
rval = my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session,
|
||||
queue);
|
||||
@ -786,8 +786,6 @@ GWBUF *clone = NULL;
|
||||
|
||||
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
my_session->replies = 0;
|
||||
my_session->last_qtype = *((unsigned char*)queue->start + 4);
|
||||
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
|
||||
}
|
||||
else
|
||||
@ -831,29 +829,22 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
int rc;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
|
||||
ss_dassert(my_session->active);
|
||||
my_session->replies++;
|
||||
|
||||
if (my_session->tee_replybuf == NULL)
|
||||
|
||||
if (my_session->tee_replybuf == NULL &&
|
||||
instance != NULL)
|
||||
{
|
||||
my_session->tee_replybuf = reply;
|
||||
}
|
||||
else
|
||||
{
|
||||
if(my_session->last_qtype == 0x03 &&
|
||||
*(unsigned char*)(reply->start + 4) != 0xff
|
||||
&& *(unsigned char*)(my_session->tee_replybuf->start + 4) == 0xff)
|
||||
{
|
||||
gwbuf_free(my_session->tee_replybuf);
|
||||
my_session->tee_replybuf = reply;
|
||||
}
|
||||
else
|
||||
{
|
||||
gwbuf_free(reply);
|
||||
}
|
||||
}
|
||||
|
||||
if (my_session->branch_session == NULL ||
|
||||
my_session->replies >= my_session->min_replies)
|
||||
if((my_session->branch_session == NULL ||
|
||||
my_session->replies >= my_session->min_replies) &&
|
||||
my_session->tee_replybuf != NULL)
|
||||
{
|
||||
rc = my_session->up.clientReply (
|
||||
my_session->up.instance,
|
||||
|
Reference in New Issue
Block a user