From 3973ba36be15c22d678dca37c2952fd5ded1b00b Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Mon, 7 Mar 2016 14:14:20 +0200 Subject: [PATCH] Simplified tee filter packet handling logic Inverting the default action of not routing packets to always routing packets makes the code simpler to understand. Removing some of the not so useful debug logging also makes the code more readable and easier to understand. --- server/modules/filter/tee.c | 123 +++++++++--------------------------- 1 file changed, 29 insertions(+), 94 deletions(-) diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 85d16d395..9988ff692 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -449,6 +449,7 @@ createInstance(char **options, FILTER_PARAMETER **params) " for the match parameter.", my_instance->match); free(my_instance->match); + free(my_instance->nomatch); free(my_instance->source); free(my_instance); return NULL; @@ -458,10 +459,13 @@ createInstance(char **options, FILTER_PARAMETER **params) { MXS_ERROR("tee: Invalid regular expression '%s'" " for the nomatch paramter.\n", - my_instance->match); + my_instance->nomatch); if (my_instance->match) + { regfree(&my_instance->re); - free(my_instance->match); + free(my_instance->match); + } + free(my_instance->nomatch); free(my_instance->source); free(my_instance); return NULL; @@ -931,40 +935,23 @@ uint16_t get_response_flags(uint8_t* datastart, bool ok_packet) static int clientReply(FILTER* instance, void *session, GWBUF *reply) { - int rc, branch, eof; + int rc = 1, branch, eof; TEE_SESSION *my_session = (TEE_SESSION *) session; - bool route = false, mpkt; + bool route = true; GWBUF *complete = NULL; unsigned char *ptr; uint16_t flags = 0; - int min_eof = my_session->command != 0x04 ? 2 : 1; int more_results = 0; -#ifdef SS_DEBUG - int prev_debug_seq = atomic_add(&debug_seq, 1); - ptr = (unsigned char*) reply->start; - MXS_INFO("Tee clientReply [%s] [%s] [%s]: %d", - instance ? "parent" : "child", - my_session->active ? "open" : "closed", - PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET", - prev_debug_seq); -#endif spinlock_acquire(&my_session->tee_lock); + int min_eof = my_session->command != 0x04 ? 2 : 1; if (!my_session->active) { - MXS_INFO("Tee: Failed to return reply, session is closed"); + spinlock_release(&my_session->tee_lock); + MXS_INFO("Tee: Failed to return reply, session is already closed"); gwbuf_free(reply); - rc = 0; - if (my_session->waiting[PARENT]) - { - GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1, "0000", "Session closed."); - my_session->waiting[PARENT] = false; - my_session->up.clientReply(my_session->up.instance, - my_session->up.session, - errbuf); - } - goto retblock; + return 0; } branch = instance == NULL ? CHILD : PARENT; @@ -975,22 +962,14 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) if (complete == NULL) { + spinlock_release(&my_session->tee_lock); /** Incomplete packet */ MXS_DEBUG("tee.c: Incomplete packet, " "waiting for a complete packet before forwarding."); - rc = 1; - goto retblock; + return 1; } complete = gwbuf_make_contiguous(complete); - - if (my_session->tee_partials[branch] && - GWBUF_EMPTY(my_session->tee_partials[branch])) - { - gwbuf_free(my_session->tee_partials[branch]); - my_session->tee_partials[branch] = NULL; - } - ptr = (unsigned char*) complete->start; if (my_session->replies[branch] == 0) @@ -1010,38 +989,29 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) more_results = (flags & 0x08) && my_session->client_multistatement; if (more_results) { - MXS_INFO("Tee: [%s] waiting for more results.", branch == PARENT ? "PARENT" : "CHILD"); + MXS_INFO("Tee: [%s] waiting for more results.", + branch == PARENT ? "PARENT" : "CHILD"); } } } -#ifdef SS_DEBUG - else - { - MXS_DEBUG("tee.c: [%ld] Waiting for a result set from %s session.", - my_session->d_id, - branch == PARENT ? "parent" : "child"); - } -#endif } if (my_session->waiting[branch]) { eof = modutil_count_signal_packets(complete, my_session->use_ok, my_session->eof[branch] > 0, &more_results); - more_results &= my_session->client_multistatement; my_session->eof[branch] += eof; if (my_session->eof[branch] >= min_eof) { -#ifdef SS_DEBUG - MXS_DEBUG("tee.c [%ld] %s received last EOF packet", - my_session->d_id, - branch == PARENT ? "parent" : "child"); -#endif - my_session->waiting[branch] = more_results; - if (more_results) + if (more_results && my_session->client_multistatement) { + my_session->waiting[branch] = true; my_session->eof[branch] = 0; } + else + { + my_session->waiting[branch] = false; + } } } @@ -1055,45 +1025,13 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) } my_session->replies[branch]++; - rc = 1; - mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; - if (my_session->tee_replybuf != NULL) + if (my_session->tee_replybuf == NULL || + (!my_session->waiting[PARENT] && my_session->waiting[CHILD]) || + ((my_session->multipacket[PARENT] || my_session->multipacket[CHILD]) && + (my_session->eof[PARENT] < min_eof || my_session->eof[CHILD] < min_eof))) { - - if (my_session->branch_session == NULL) - { - rc = 0; - gwbuf_free(my_session->tee_replybuf); - my_session->tee_replybuf = NULL; - MXS_ERROR("Tee child session was closed."); - } - - if (mpkt) - { - - if (my_session->waiting[PARENT]) - { - route = true; - - } - else if (my_session->eof[PARENT] >= min_eof && - my_session->eof[CHILD] >= min_eof) - { - route = true; -#ifdef SS_DEBUG - MXS_DEBUG("tee.c:[%ld] Routing final packet of response set.", my_session->d_id); -#endif - } - } - else if (!my_session->waiting[PARENT] && - !my_session->waiting[CHILD]) - { -#ifdef SS_DEBUG - MXS_DEBUG("tee.c:[%ld] Routing single packet response.", my_session->d_id); -#endif - route = true; - } + route = false; } if (route) @@ -1124,15 +1062,12 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue); GWBUF* clone = clone_query(my_session->instance, my_session, buffer); reset_session_state(my_session, buffer); - route_single_query(my_session->instance, my_session, buffer, clone); + spinlock_release(&my_session->tee_lock); MXS_INFO("tee: routing queued query"); - + return route_single_query(my_session->instance, my_session, buffer, clone); } -retblock: - spinlock_release(&my_session->tee_lock); - return rc; }