diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 85d16d395..9988ff692 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -449,6 +449,7 @@ createInstance(char **options, FILTER_PARAMETER **params) " for the match parameter.", my_instance->match); free(my_instance->match); + free(my_instance->nomatch); free(my_instance->source); free(my_instance); return NULL; @@ -458,10 +459,13 @@ createInstance(char **options, FILTER_PARAMETER **params) { MXS_ERROR("tee: Invalid regular expression '%s'" " for the nomatch paramter.\n", - my_instance->match); + my_instance->nomatch); if (my_instance->match) + { regfree(&my_instance->re); - free(my_instance->match); + free(my_instance->match); + } + free(my_instance->nomatch); free(my_instance->source); free(my_instance); return NULL; @@ -931,40 +935,23 @@ uint16_t get_response_flags(uint8_t* datastart, bool ok_packet) static int clientReply(FILTER* instance, void *session, GWBUF *reply) { - int rc, branch, eof; + int rc = 1, branch, eof; TEE_SESSION *my_session = (TEE_SESSION *) session; - bool route = false, mpkt; + bool route = true; GWBUF *complete = NULL; unsigned char *ptr; uint16_t flags = 0; - int min_eof = my_session->command != 0x04 ? 2 : 1; int more_results = 0; -#ifdef SS_DEBUG - int prev_debug_seq = atomic_add(&debug_seq, 1); - ptr = (unsigned char*) reply->start; - MXS_INFO("Tee clientReply [%s] [%s] [%s]: %d", - instance ? "parent" : "child", - my_session->active ? "open" : "closed", - PTR_IS_ERR(ptr) ? "ERR" : PTR_IS_OK(ptr) ? "OK" : "RSET", - prev_debug_seq); -#endif spinlock_acquire(&my_session->tee_lock); + int min_eof = my_session->command != 0x04 ? 2 : 1; if (!my_session->active) { - MXS_INFO("Tee: Failed to return reply, session is closed"); + spinlock_release(&my_session->tee_lock); + MXS_INFO("Tee: Failed to return reply, session is already closed"); gwbuf_free(reply); - rc = 0; - if (my_session->waiting[PARENT]) - { - GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1, "0000", "Session closed."); - my_session->waiting[PARENT] = false; - my_session->up.clientReply(my_session->up.instance, - my_session->up.session, - errbuf); - } - goto retblock; + return 0; } branch = instance == NULL ? CHILD : PARENT; @@ -975,22 +962,14 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) if (complete == NULL) { + spinlock_release(&my_session->tee_lock); /** Incomplete packet */ MXS_DEBUG("tee.c: Incomplete packet, " "waiting for a complete packet before forwarding."); - rc = 1; - goto retblock; + return 1; } complete = gwbuf_make_contiguous(complete); - - if (my_session->tee_partials[branch] && - GWBUF_EMPTY(my_session->tee_partials[branch])) - { - gwbuf_free(my_session->tee_partials[branch]); - my_session->tee_partials[branch] = NULL; - } - ptr = (unsigned char*) complete->start; if (my_session->replies[branch] == 0) @@ -1010,38 +989,29 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) more_results = (flags & 0x08) && my_session->client_multistatement; if (more_results) { - MXS_INFO("Tee: [%s] waiting for more results.", branch == PARENT ? "PARENT" : "CHILD"); + MXS_INFO("Tee: [%s] waiting for more results.", + branch == PARENT ? "PARENT" : "CHILD"); } } } -#ifdef SS_DEBUG - else - { - MXS_DEBUG("tee.c: [%ld] Waiting for a result set from %s session.", - my_session->d_id, - branch == PARENT ? "parent" : "child"); - } -#endif } if (my_session->waiting[branch]) { eof = modutil_count_signal_packets(complete, my_session->use_ok, my_session->eof[branch] > 0, &more_results); - more_results &= my_session->client_multistatement; my_session->eof[branch] += eof; if (my_session->eof[branch] >= min_eof) { -#ifdef SS_DEBUG - MXS_DEBUG("tee.c [%ld] %s received last EOF packet", - my_session->d_id, - branch == PARENT ? "parent" : "child"); -#endif - my_session->waiting[branch] = more_results; - if (more_results) + if (more_results && my_session->client_multistatement) { + my_session->waiting[branch] = true; my_session->eof[branch] = 0; } + else + { + my_session->waiting[branch] = false; + } } } @@ -1055,45 +1025,13 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) } my_session->replies[branch]++; - rc = 1; - mpkt = my_session->multipacket[PARENT] || my_session->multipacket[CHILD]; - if (my_session->tee_replybuf != NULL) + if (my_session->tee_replybuf == NULL || + (!my_session->waiting[PARENT] && my_session->waiting[CHILD]) || + ((my_session->multipacket[PARENT] || my_session->multipacket[CHILD]) && + (my_session->eof[PARENT] < min_eof || my_session->eof[CHILD] < min_eof))) { - - if (my_session->branch_session == NULL) - { - rc = 0; - gwbuf_free(my_session->tee_replybuf); - my_session->tee_replybuf = NULL; - MXS_ERROR("Tee child session was closed."); - } - - if (mpkt) - { - - if (my_session->waiting[PARENT]) - { - route = true; - - } - else if (my_session->eof[PARENT] >= min_eof && - my_session->eof[CHILD] >= min_eof) - { - route = true; -#ifdef SS_DEBUG - MXS_DEBUG("tee.c:[%ld] Routing final packet of response set.", my_session->d_id); -#endif - } - } - else if (!my_session->waiting[PARENT] && - !my_session->waiting[CHILD]) - { -#ifdef SS_DEBUG - MXS_DEBUG("tee.c:[%ld] Routing single packet response.", my_session->d_id); -#endif - route = true; - } + route = false; } if (route) @@ -1124,15 +1062,12 @@ clientReply(FILTER* instance, void *session, GWBUF *reply) GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue); GWBUF* clone = clone_query(my_session->instance, my_session, buffer); reset_session_state(my_session, buffer); - route_single_query(my_session->instance, my_session, buffer, clone); + spinlock_release(&my_session->tee_lock); MXS_INFO("tee: routing queued query"); - + return route_single_query(my_session->instance, my_session, buffer, clone); } -retblock: - spinlock_release(&my_session->tee_lock); - return rc; }