Merge branch 'release-1.0GA' of https://github.com/mariadb-corporation/MaxScale into release-1.0GA
Conflicts: server/modules/filter/tee.c
This commit is contained in:
commit
a3d5367d55
@ -154,16 +154,16 @@ typedef struct {
|
||||
FILTER_DEF* dummy_filterdef;
|
||||
int active; /* filter is active? */
|
||||
int waiting; /* if the client is waiting for a reply */
|
||||
int replies;
|
||||
int min_replies;
|
||||
int replies; /* Number of queries received */
|
||||
int min_replies; /* Minimum number of replies to receive
|
||||
* before forwarding the packet to the client*/
|
||||
DCB *branch_dcb; /* Client DCB for "branch" service */
|
||||
SESSION *branch_session;/* The branch service session */
|
||||
int n_duped; /* Number of duplicated queries */
|
||||
int n_rejected; /* Number of rejected queries */
|
||||
int residual; /* Any outstanding SQL text */
|
||||
unsigned char last_qtype;
|
||||
char* last_query;
|
||||
GWBUF* tee_replybuf; /* Buffer for reply */
|
||||
SPINLOCK tee_lock;
|
||||
} TEE_SESSION;
|
||||
|
||||
typedef struct orphan_session_tt
|
||||
@ -382,7 +382,7 @@ char *remote, *userName;
|
||||
{
|
||||
my_session->active = 1;
|
||||
my_session->residual = 0;
|
||||
|
||||
spinlock_init(&my_session->tee_lock);
|
||||
if (my_instance->source &&
|
||||
(remote = session_get_remote(session)) != NULL)
|
||||
{
|
||||
@ -460,7 +460,7 @@ char *remote, *userName;
|
||||
ss_dassert(ses->ses_is_child);
|
||||
|
||||
dummy->obj = GetModuleObject();
|
||||
dummy->filter = my_instance;
|
||||
dummy->filter = NULL;
|
||||
|
||||
|
||||
if((dummy_upstream = filterUpstream(
|
||||
@ -654,7 +654,7 @@ SESSION* ses = my_session->branch_session;
|
||||
|
||||
#ifdef SS_DEBUG
|
||||
if(o_stopping + o_ready > 0)
|
||||
skygw_log_write(LOGFILE_TRACE,"tee.c: %d orphans in "
|
||||
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans in "
|
||||
"SESSION_STATE_STOPPING, %d orphans in "
|
||||
"SESSION_STATE_ROUTER_READY. ",o_stopping,o_ready);
|
||||
#endif
|
||||
@ -662,7 +662,7 @@ SESSION* ses = my_session->branch_session;
|
||||
while(finished)
|
||||
{
|
||||
#ifdef SS_DEBUG
|
||||
skygw_log_write(LOGFILE_TRACE,"tee.c: %d orphans freed.",++o_freed);
|
||||
skygw_log_write(LOGFILE_DEBUG,"tee.c: %d orphans freed.",++o_freed);
|
||||
#endif
|
||||
tmp = finished;
|
||||
finished = finished->next;
|
||||
@ -679,7 +679,6 @@ SESSION* ses = my_session->branch_session;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
@ -691,10 +690,10 @@ SESSION* ses = my_session->branch_session;
|
||||
static void
|
||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
|
||||
my_session->down = *downstream;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
my_session->down = *downstream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
@ -703,10 +702,10 @@ TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
* @param session The filter session
|
||||
* @param downstream The downstream filter or router.
|
||||
*/
|
||||
static void
|
||||
static void
|
||||
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
|
||||
{
|
||||
TEE_SESSION *my_session = (TEE_SESSION *)session;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
my_session->up = *upstream;
|
||||
}
|
||||
|
||||
@ -776,18 +775,17 @@ GWBUF *clone = NULL;
|
||||
}
|
||||
}
|
||||
/* Pass the query downstream */
|
||||
rval = my_session->down.routeQuery(
|
||||
my_session->down.instance,
|
||||
my_session->down.session,
|
||||
queue);
|
||||
|
||||
my_session->replies = 0;
|
||||
rval = my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session,
|
||||
queue);
|
||||
if (clone)
|
||||
{
|
||||
my_session->n_duped++;
|
||||
|
||||
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
my_session->replies = 0;
|
||||
my_session->last_qtype = *((unsigned char*)queue->start + 4);
|
||||
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
|
||||
}
|
||||
else
|
||||
@ -796,7 +794,7 @@ GWBUF *clone = NULL;
|
||||
my_session->active = 0;
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Closed tee filter session.")));
|
||||
"Closed tee filter session.")));
|
||||
gwbuf_free(clone);
|
||||
}
|
||||
}
|
||||
@ -831,29 +829,24 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
int rc;
|
||||
TEE_SESSION *my_session = (TEE_SESSION *) session;
|
||||
|
||||
spinlock_acquire(&my_session->tee_lock);
|
||||
|
||||
ss_dassert(my_session->active);
|
||||
my_session->replies++;
|
||||
|
||||
if (my_session->tee_replybuf == NULL)
|
||||
|
||||
if (my_session->tee_replybuf == NULL &&
|
||||
instance != NULL)
|
||||
{
|
||||
my_session->tee_replybuf = reply;
|
||||
}
|
||||
else
|
||||
{
|
||||
if(my_session->last_qtype == 0x03 &&
|
||||
*(unsigned char*)(reply->start + 4) != 0xff
|
||||
&& *(unsigned char*)(my_session->tee_replybuf->start + 4) == 0xff)
|
||||
{
|
||||
gwbuf_free(my_session->tee_replybuf);
|
||||
my_session->tee_replybuf = reply;
|
||||
}
|
||||
else
|
||||
{
|
||||
gwbuf_free(reply);
|
||||
}
|
||||
}
|
||||
|
||||
if (my_session->branch_session == NULL ||
|
||||
my_session->replies >= my_session->min_replies)
|
||||
if((my_session->branch_session == NULL ||
|
||||
my_session->replies >= my_session->min_replies) &&
|
||||
my_session->tee_replybuf != NULL)
|
||||
{
|
||||
rc = my_session->up.clientReply (
|
||||
my_session->up.instance,
|
||||
@ -865,7 +858,9 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
|
||||
else
|
||||
{
|
||||
rc = 1;
|
||||
}
|
||||
}
|
||||
|
||||
spinlock_release(&my_session->tee_lock);
|
||||
return rc;
|
||||
}
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user