Simplified tee filter packet handling logic

Inverting the default action of not routing packets to always routing packets
makes the code simpler to understand. Removing some of the not so useful debug
logging also makes the code more readable and easier to understand.
This commit is contained in:
Markus Makela 2016-03-07 14:14:20 +02:00
parent 7aab966573
commit 3973ba36be

View File

@ -449,6 +449,7 @@ createInstance(char **options, FILTER_PARAMETER **params)
" for the match parameter.",
my_instance->match);
free(my_instance->match);
free(my_instance->nomatch);
free(my_instance->source);
free(my_instance);
return NULL;
@ -458,10 +459,13 @@ createInstance(char **options, FILTER_PARAMETER **params)
{
MXS_ERROR("tee: Invalid regular expression '%s'"
" for the nomatch paramter.\n",
my_instance->match);
my_instance->nomatch);
if (my_instance->match)
{
regfree(&my_instance->re);
free(my_instance->match);
free(my_instance->match);
}
free(my_instance->nomatch);
free(my_instance->source);
free(my_instance);
return NULL;
@ -931,40 +935,23 @@ uint16_t get_response_flags(uint8_t* datastart, bool ok_packet)
static int
clientReply(FILTER* instance, void *session, GWBUF *reply)
{
int rc, branch, eof;
int rc = 1, branch, eof;
TEE_SESSION *my_session = (TEE_SESSION *) session;
bool route = false, mpkt;
bool route = true;
GWBUF *complete = NULL;
unsigned char *ptr;
uint16_t flags = 0;
int min_eof = my_session->command != 0x04 ? 2 : 1;
int more_results = 0;
#ifdef SS_DEBUG
int prev_debug_seq = atomic_add(&debug_seq, 1);
ptr = (unsigned char*) reply->start;
MXS_INFO("Tee clientReply [%s] [%s] [%s]: %d",
instance ? "parent" : "child",
my_session->active ? "open" : "closed",
PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET",
prev_debug_seq);
#endif
spinlock_acquire(&my_session->tee_lock);
int min_eof = my_session->command != 0x04 ? 2 : 1;
if (!my_session->active)
{
MXS_INFO("Tee: Failed to return reply, session is closed");
spinlock_release(&my_session->tee_lock);
MXS_INFO("Tee: Failed to return reply, session is already closed");
gwbuf_free(reply);
rc = 0;
if (my_session->waiting[PARENT])
{
GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1, "0000", "Session closed.");
my_session->waiting[PARENT] = false;
my_session->up.clientReply(my_session->up.instance,
my_session->up.session,
errbuf);
}
goto retblock;
return 0;
}
branch = instance == NULL ? CHILD : PARENT;
@ -975,22 +962,14 @@ clientReply(FILTER* instance, void *session, GWBUF *reply)
if (complete == NULL)
{
spinlock_release(&my_session->tee_lock);
/** Incomplete packet */
MXS_DEBUG("tee.c: Incomplete packet, "
"waiting for a complete packet before forwarding.");
rc = 1;
goto retblock;
return 1;
}
complete = gwbuf_make_contiguous(complete);
if (my_session->tee_partials[branch] &&
GWBUF_EMPTY(my_session->tee_partials[branch]))
{
gwbuf_free(my_session->tee_partials[branch]);
my_session->tee_partials[branch] = NULL;
}
ptr = (unsigned char*) complete->start;
if (my_session->replies[branch] == 0)
@ -1010,38 +989,29 @@ clientReply(FILTER* instance, void *session, GWBUF *reply)
more_results = (flags & 0x08) && my_session->client_multistatement;
if (more_results)
{
MXS_INFO("Tee: [%s] waiting for more results.", branch == PARENT ? "PARENT" : "CHILD");
MXS_INFO("Tee: [%s] waiting for more results.",
branch == PARENT ? "PARENT" : "CHILD");
}
}
}
#ifdef SS_DEBUG
else
{
MXS_DEBUG("tee.c: [%ld] Waiting for a result set from %s session.",
my_session->d_id,
branch == PARENT ? "parent" : "child");
}
#endif
}
if (my_session->waiting[branch])
{
eof = modutil_count_signal_packets(complete, my_session->use_ok, my_session->eof[branch] > 0, &more_results);
more_results &= my_session->client_multistatement;
my_session->eof[branch] += eof;
if (my_session->eof[branch] >= min_eof)
{
#ifdef SS_DEBUG
MXS_DEBUG("tee.c [%ld] %s received last EOF packet",
my_session->d_id,
branch == PARENT ? "parent" : "child");
#endif
my_session->waiting[branch] = more_results;
if (more_results)
if (more_results && my_session->client_multistatement)
{
my_session->waiting[branch] = true;
my_session->eof[branch] = 0;
}
else
{
my_session->waiting[branch] = false;
}
}
}
@ -1055,45 +1025,13 @@ clientReply(FILTER* instance, void *session, GWBUF *reply)
}
my_session->replies[branch]++;
rc = 1;
mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD];
if (my_session->tee_replybuf != NULL)
if (my_session->tee_replybuf == NULL ||
(!my_session->waiting[PARENT] && my_session->waiting[CHILD]) ||
((my_session->multipacket[PARENT] || my_session->multipacket[CHILD]) &&
(my_session->eof[PARENT] < min_eof || my_session->eof[CHILD] < min_eof)))
{
if (my_session->branch_session == NULL)
{
rc = 0;
gwbuf_free(my_session->tee_replybuf);
my_session->tee_replybuf = NULL;
MXS_ERROR("Tee child session was closed.");
}
if (mpkt)
{
if (my_session->waiting[PARENT])
{
route = true;
}
else if (my_session->eof[PARENT] >= min_eof &&
my_session->eof[CHILD] >= min_eof)
{
route = true;
#ifdef SS_DEBUG
MXS_DEBUG("tee.c:[%ld] Routing final packet of response set.", my_session->d_id);
#endif
}
}
else if (!my_session->waiting[PARENT] &&
!my_session->waiting[CHILD])
{
#ifdef SS_DEBUG
MXS_DEBUG("tee.c:[%ld] Routing single packet response.", my_session->d_id);
#endif
route = true;
}
route = false;
}
if (route)
@ -1124,15 +1062,12 @@ clientReply(FILTER* instance, void *session, GWBUF *reply)
GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue);
GWBUF* clone = clone_query(my_session->instance, my_session, buffer);
reset_session_state(my_session, buffer);
route_single_query(my_session->instance, my_session, buffer, clone);
spinlock_release(&my_session->tee_lock);
MXS_INFO("tee: routing queued query");
return route_single_query(my_session->instance, my_session, buffer, clone);
}
retblock:
spinlock_release(&my_session->tee_lock);
return rc;
}