Fix tee filter

With the addition of filter capabilities, the tee filter should work with
all sorts of routers that require at most the RCAP_TYPE_CONTIGUOUS_INPUT
capability.

Due to a recent discovery of the server's capability to process multiple
requests, the filter can safely send data from one service to another
without waiting for the earlier replies.

This also fixes a minor problem with the cloning of DCBs where the backend
DCBs could end up in the wrong thread's pool.
This commit is contained in:
Markus Mäkelä
2016-12-14 13:03:39 +02:00
parent 1b7a0a80ee
commit f9a7edc7d2
2 changed files with 37 additions and 384 deletions

View File

@ -295,6 +295,7 @@ dcb_clone(DCB *orig)
clonedcb->ssl_state = orig->ssl_state;
clonedcb->remote = remote;
clonedcb->user = user;
clonedcb->thread.id = orig->thread.id;
clonedcb->protocol = orig->protocol;
clonedcb->func.write = dcb_null_write;

View File

@ -548,9 +548,6 @@ newSession(FILTER *instance, SESSION *session)
{
DCB* dcb;
SESSION* ses;
FILTER_DEF* dummy;
UPSTREAM* dummy_upstream;
if ((dcb = dcb_clone(session->client_dcb)) == NULL)
{
freeSession(instance, (void *) my_session);
@ -562,23 +559,8 @@ newSession(FILTER *instance, SESSION *session)
goto retblock;
}
if ((dummy = filter_alloc("tee_dummy", "tee_dummy")) == NULL)
{
dcb_close(dcb);
freeSession(instance, (void *) my_session);
my_session = NULL;
MXS_ERROR("tee: Allocating memory for "
"dummy filter definition failed."
" Terminating session.");
goto retblock;
}
if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
{
filter_free(dummy);
dcb_close(dcb);
freeSession(instance, (void *) my_session);
my_session = NULL;
@ -590,29 +572,8 @@ newSession(FILTER *instance, SESSION *session)
ss_dassert(ses->ses_is_child);
dummy->obj = GetModuleObject();
dummy->filter = NULL;
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy;
if ((dummy_upstream = filter_upstream(dummy, my_session, &ses->tail)) == NULL)
{
filter_free(dummy);
closeSession(instance, (void*) my_session);
dcb_close(dcb);
MXS_FREE(my_session);
MXS_ERROR("tee: Allocating memory for"
"dummy upstream failed."
" Terminating session.");
return NULL;
}
ses->tail = *dummy_upstream;
MySQLProtocol* protocol = (MySQLProtocol*) session->client_dcb->protocol;
my_session->use_ok = protocol->client_capabilities & (1 << 6);
MXS_FREE(dummy_upstream);
}
}
retblock:
@ -788,143 +749,9 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance;
TEE_SESSION *my_session = (TEE_SESSION *) session;
char *ptr;
int rval;
GWBUF *buffer = NULL, *clone = NULL;
unsigned char command = gwbuf_length(queue) >= 5 ?
*((unsigned char*) queue->start + 4) : 1;
GWBUF *clone = clone_query(my_instance, my_session, queue);
#ifdef SS_DEBUG
int prev_debug_seq = atomic_add(&debug_seq, 1);
MXS_INFO("Tee routeQuery: %d : %s",
prev_debug_seq,
((char*) queue->start + 5));
#endif
spinlock_acquire(&my_session->tee_lock);
if (!my_session->active)
{
MXS_INFO("Tee: Received a reply when the session was closed.");
gwbuf_free(queue);
spinlock_release(&my_session->tee_lock);
return 0;
}
if (my_session->queue)
{
my_session->queue = gwbuf_append(my_session->queue, queue);
buffer = modutil_get_next_MySQL_packet(&my_session->queue);
}
else
{
buffer = modutil_get_next_MySQL_packet(&queue);
my_session->queue = queue;
}
if (buffer == NULL)
{
spinlock_release(&my_session->tee_lock);
return 1;
}
clone = clone_query(my_instance, my_session, buffer);
spinlock_release(&my_session->tee_lock);
/* Reset session state */
if (!reset_session_state(my_session, buffer))
{
return 0;
}
/** Route query downstream */
spinlock_acquire(&my_session->tee_lock);
rval = route_single_query(my_instance, my_session, buffer, clone);
spinlock_release(&my_session->tee_lock);
return rval;
}
int count_replies(GWBUF* buffer)
{
unsigned char* ptr = (unsigned char*) buffer->start;
unsigned char* end = (unsigned char*) buffer->end;
int pktlen, eof = 0;
int replies = 0;
while (ptr < end)
{
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if (PTR_IS_OK(ptr) || PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr))
{
replies++;
ptr += pktlen;
}
else
{
while (ptr < end && eof < 2)
{
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if (PTR_IS_EOF(ptr) || PTR_IS_ERR(ptr))
{
eof++;
}
ptr += pktlen;
}
if (eof == 2)
{
replies++;
}
eof = 0;
}
}
return replies;
}
int lenenc_length(uint8_t* ptr)
{
uint8_t val = *ptr;
if (val < 251)
{
return 1;
}
else if (val == 0xfc)
{
return 3;
}
else if (val == 0xfd)
{
return 4;
}
else
{
return 9;
}
}
uint16_t get_response_flags(uint8_t* datastart, bool ok_packet)
{
uint8_t* ptr = datastart;
uint16_t rval = 0;
int pktlen = gw_mysql_get_byte3(ptr);
ptr += 4;
if (ok_packet)
{
ptr += lenenc_length(ptr);
ptr += lenenc_length(ptr);
memcpy(&rval, ptr, sizeof(uint8_t) * 2);
}
else
{
/** This is an EOF packet*/
ptr += 2;
memcpy(&rval, ptr, sizeof(uint8_t) * 2);
}
return rval;
return route_single_query(my_instance, my_session, queue, clone);
}
/**
@ -942,138 +769,10 @@ clientReply(FILTER* instance, void *session, GWBUF *reply)
{
int rc = 1, branch, eof;
TEE_SESSION *my_session = (TEE_SESSION *) session;
bool route = true;
GWBUF *complete = NULL;
unsigned char *ptr;
uint16_t flags = 0;
int more_results = 0;
spinlock_acquire(&my_session->tee_lock);
int min_eof = my_session->command != 0x04 ? 2 : 1;
if (!my_session->active)
{
spinlock_release(&my_session->tee_lock);
MXS_INFO("Tee: Failed to return reply, session is already closed");
gwbuf_free(reply);
return 0;
}
branch = instance == NULL ? CHILD : PARENT;
my_session->tee_partials[branch] = gwbuf_append(my_session->tee_partials[branch], reply);
my_session->tee_partials[branch] = gwbuf_make_contiguous(my_session->tee_partials[branch]);
complete = modutil_get_complete_packets(&my_session->tee_partials[branch]);
if (complete == NULL)
{
spinlock_release(&my_session->tee_lock);
/** Incomplete packet */
MXS_DEBUG("tee.c: Incomplete packet, "
"waiting for a complete packet before forwarding.");
return 1;
}
complete = gwbuf_make_contiguous(complete);
ptr = (unsigned char*) complete->start;
if (my_session->replies[branch] == 0)
{
MXS_INFO("Tee: First reply to a query for [%s].", branch == PARENT ? "PARENT" : "CHILD");
/* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet.
* Otherwise the reply is a result set and the amount of packets is unknown.
*/
if (PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) ||
PTR_IS_OK(ptr) || !my_session->multipacket[branch])
{
my_session->waiting[branch] = false;
my_session->multipacket[branch] = false;
if (PTR_IS_OK(ptr))
{
flags = get_response_flags(ptr, true);
more_results = (flags & 0x08) && my_session->client_multistatement;
if (more_results)
{
MXS_INFO("Tee: [%s] waiting for more results.",
branch == PARENT ? "PARENT" : "CHILD");
}
}
}
}
if (my_session->waiting[branch])
{
eof = modutil_count_signal_packets(complete, my_session->use_ok, my_session->eof[branch] > 0, &more_results);
my_session->eof[branch] += eof;
if (my_session->eof[branch] >= min_eof)
{
if (more_results && my_session->client_multistatement)
{
my_session->waiting[branch] = true;
my_session->eof[branch] = 0;
}
else
{
my_session->waiting[branch] = false;
}
}
}
if (branch == PARENT)
{
my_session->tee_replybuf = gwbuf_append(my_session->tee_replybuf, complete);
}
else
{
gwbuf_free(complete);
}
my_session->replies[branch]++;
if (my_session->tee_replybuf == NULL ||
(!my_session->waiting[PARENT] && my_session->waiting[CHILD]) ||
((my_session->multipacket[PARENT] || my_session->multipacket[CHILD]) &&
(my_session->eof[PARENT] < min_eof || my_session->eof[CHILD] < min_eof)))
{
route = false;
}
if (route)
{
#ifdef SS_DEBUG
MXS_DEBUG("tee.c:[%ld] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])"
" child(waiting [%s] replies[%d] eof [%d])",
my_session->d_id,
my_session->tee_replybuf,
my_session->waiting[PARENT] ? "true" : "false",
my_session->replies[PARENT],
my_session->eof[PARENT],
my_session->waiting[CHILD] ? "true" : "false",
my_session->replies[CHILD],
my_session->eof[CHILD]);
#endif
rc = my_session->up.clientReply(my_session->up.instance,
my_session->up.session,
my_session->tee_replybuf);
my_session->tee_replybuf = NULL;
}
if (my_session->queue &&
!my_session->waiting[PARENT] &&
!my_session->waiting[CHILD])
{
GWBUF* buffer = modutil_get_next_MySQL_packet(&my_session->queue);
GWBUF* clone = clone_query(my_session->instance, my_session, buffer);
reset_session_state(my_session, buffer);
spinlock_release(&my_session->tee_lock);
MXS_INFO("tee: routing queued query");
return route_single_query(my_session->instance, my_session, buffer, clone);
}
spinlock_release(&my_session->tee_lock);
return rc;
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session,
reply);
}
/**
@ -1211,64 +910,29 @@ int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* service)
return false;
}
int internal_route(DCB* dcb)
{
GWBUF* buffer = dcb->dcb_readqueue;
/** This was set in the newSession function*/
TEE_SESSION* session = dcb->data;
return routeQuery((FILTER*) session->instance, session, buffer);
}
/**
*
* @param my_instance
* @param my_session
* @param buffer
* @return
*/
GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer)
{
GWBUF* clone = NULL;
int residual = 0;
char* ptr;
if (my_session->branch_session &&
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
if ((!my_instance->match && !my_instance->nomatch) || packet_is_required(buffer))
{
if (my_session->residual)
{
clone = gwbuf_clone_all(buffer);
clone = gwbuf_clone_all(buffer);
}
else
{
char *ptr = modutil_get_SQL(buffer);
if (my_session->residual < GWBUF_LENGTH(clone))
{
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
}
my_session->residual -= GWBUF_LENGTH(clone);
if (my_session->residual < 0)
{
my_session->residual = 0;
}
}
else if (my_session->active && (ptr = modutil_get_SQL(buffer)) != NULL)
if (ptr)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore, ptr, 0, NULL, 0) != 0))
if ((my_instance->match && regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) ||
(my_instance->nomatch && regexec(&my_instance->nore, ptr, 0, NULL, 0) != 0))
{
clone = gwbuf_clone_all(buffer);
my_session->residual = residual;
}
MXS_FREE(ptr);
}
else if (packet_is_required(buffer))
{
clone = gwbuf_clone_all(buffer);
}
}
return clone;
}
@ -1285,42 +949,30 @@ GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* bu
int route_single_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer, GWBUF* clone)
{
int rval = 0;
if (!my_session->active ||
my_session->branch_session == NULL ||
my_session->branch_session->state != SESSION_STATE_ROUTER_READY)
{
rval = 0;
my_session->active = 0;
return rval;
}
if (clone == NULL)
if (my_session->active && my_session->branch_session &&
my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
/** We won't be expecting any response from the child branch */
my_session->waiting[CHILD] = false;
my_session->multipacket[CHILD] = false;
my_session->eof[CHILD] = 2;
my_session->n_rejected++;
}
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
buffer);
if (clone)
{
my_session->n_duped++;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
buffer);
if (clone)
{
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
{
/** Close tee session */
my_session->active = 0;
rval = 0;
MXS_INFO("Closed tee filter session: Child session in invalid state.");
gwbuf_free(clone);
my_session->n_duped++;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
{
/** Close tee session */
my_session->active = 0;
rval = 0;
MXS_INFO("Closed tee filter session: Child session in invalid state.");
gwbuf_free(clone);
}
}
}