Additional debugging info added to tee filter.

This commit is contained in:
Markus Makela
2015-01-14 21:13:52 +02:00
parent 5c210455fa
commit b635eb1493
2 changed files with 166 additions and 97 deletions

View File

@ -505,27 +505,32 @@ return_packetbuf:
int int
modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found) modutil_count_signal_packets(GWBUF *reply,int use_ok, int n_found)
{ {
unsigned char* ptr = (unsigned char*) reply->start; unsigned char* ptr = (unsigned char*) reply->start;
unsigned char* end = (unsigned char*) reply->end; unsigned char* end = (unsigned char*) reply->end;
int pktlen,pkt = 0; unsigned char* prev = ptr;
int pktlen,pkt = 0,found = n_found;
while(ptr < end)
while(ptr < end)
{ {
pktlen = gw_mysql_get_byte3(ptr) + 4;
pktlen = MYSQL_GET_PACKET_LEN(ptr) + 4;
if(PTR_IS_ERR(ptr) || (PTR_IS_EOF(ptr) && !use_ok) || (use_ok && PTR_IS_OK(ptr))) if( !found &&(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr)))
{ {
if(n_found) pkt++;
{ found++;
if(ptr + pktlen >= end)
pkt++;
}
else
{
pkt++;
}
} }
ptr += pktlen;
ptr += pktlen;
} }
return pkt;
if(found)
{
ptr -= pktlen;
if(PTR_IS_ERR(ptr) || PTR_IS_EOF(ptr))
pkt++;
}
return pkt;
} }
5D

View File

@ -173,6 +173,9 @@ typedef struct {
int residual; /* Any outstanding SQL text */ int residual; /* Any outstanding SQL text */
GWBUF* tee_replybuf; /* Buffer for reply */ GWBUF* tee_replybuf; /* Buffer for reply */
SPINLOCK tee_lock; SPINLOCK tee_lock;
#ifdef SS_DEBUG
long d_id;
#endif
} TEE_SESSION; } TEE_SESSION;
typedef struct orphan_session_tt typedef struct orphan_session_tt
@ -181,6 +184,11 @@ typedef struct orphan_session_tt
struct orphan_session_tt* next; struct orphan_session_tt* next;
}orphan_session_t; }orphan_session_t;
#ifdef SS_DEBUG
static SPINLOCK debug_lock;
static long debug_id = 0;
#endif
static orphan_session_t* allOrphans = NULL; static orphan_session_t* allOrphans = NULL;
static SPINLOCK orphanLock; static SPINLOCK orphanLock;
@ -283,9 +291,7 @@ orphan_free(void* data)
while(finished) while(finished)
{ {
#ifdef SS_DEBUG
o_freed++; o_freed++;
#endif
tmp = finished; tmp = finished;
finished = finished->next; finished = finished->next;
@ -322,7 +328,10 @@ void
ModuleInit() ModuleInit()
{ {
spinlock_init(&orphanLock); spinlock_init(&orphanLock);
hktask_add("tee orphan cleanup",orphan_free,NULL,15); //hktask_add("tee orphan cleanup",orphan_free,NULL,15);
#ifdef SS_DEBUG
spinlock_init(&debug_lock);
#endif
} }
/** /**
@ -707,6 +716,8 @@ session_state_t state;
gwbuf_free(my_session->tee_replybuf); gwbuf_free(my_session->tee_replybuf);
free(session); free(session);
orphan_free(NULL);
return; return;
} }
/** /**
@ -824,6 +835,14 @@ unsigned char command = *((unsigned char*)queue->start + 4);
memset(my_session->eof,0,2*sizeof(int)); memset(my_session->eof,0,2*sizeof(int));
memset(my_session->waiting,1,2*sizeof(bool)); memset(my_session->waiting,1,2*sizeof(bool));
my_session->command = command; my_session->command = command;
#ifdef SS_DEBUG
spinlock_acquire(&debug_lock);
my_session->d_id = ++debug_id;
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] query command [%x]",
my_session->d_id,
my_session->command);
spinlock_release(&debug_lock);
#endif
rval = my_session->down.routeQuery(my_session->down.instance, rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, my_session->down.session,
queue); queue);
@ -873,93 +892,138 @@ unsigned char command = *((unsigned char*)queue->start + 4);
static int static int
clientReply (FILTER* instance, void *session, GWBUF *reply) clientReply (FILTER* instance, void *session, GWBUF *reply)
{ {
int rc, branch, eof; int rc, branch, eof;
TEE_SESSION *my_session = (TEE_SESSION *) session; TEE_SESSION *my_session = (TEE_SESSION *) session;
bool route = false;
spinlock_acquire(&my_session->tee_lock); spinlock_acquire(&my_session->tee_lock);
ss_dassert(my_session->active); ss_dassert(my_session->active);
branch = instance == NULL ? CHILD : PARENT; branch = instance == NULL ? CHILD : PARENT;
unsigned char *ptr = (unsigned char*)reply->start; unsigned char *ptr = (unsigned char*)reply->start;
if(my_session->replies[branch] == 0) if(my_session->replies[branch] == 0)
{ {
/* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet. /* Reply is in a single packet if it is an OK, ERR or LOCAL_INFILE packet.
* Otherwise the reply is a result set and the amount of packets is unknown. * Otherwise the reply is a result set and the amount of packets is unknown.
*/ */
if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) || if(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr) ||
PTR_IS_OK(ptr) || !my_session->multipacket ) PTR_IS_OK(ptr) || !my_session->multipacket )
{ {
my_session->waiting[branch] = false; my_session->waiting[branch] = false;
} }
#ifdef SS_DEBUG #ifdef SS_DEBUG
else else
{ {
ss_dassert(PTR_IS_RESULTSET(ptr)); ss_dassert(PTR_IS_RESULTSET(ptr));
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: Waiting for a result set from %s session.",branch == PARENT?"parent":"child"); skygw_log_write_flush(LOGFILE_DEBUG,"tee.c: [%d] Waiting for a result set from %s session.",
} my_session->d_id,
ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)|| branch == PARENT?"parent":"child");
PTR_IS_OK(ptr) || my_session->waiting[branch] || }
!my_session->multipacket); ss_dassert(PTR_IS_ERR(ptr) || PTR_IS_LOCAL_INFILE(ptr)||
PTR_IS_OK(ptr) || my_session->waiting[branch] ||
!my_session->multipacket);
#endif #endif
} }
if(my_session->waiting[branch]) if(my_session->waiting[branch])
{ {
eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0);
eof = modutil_count_signal_packets(reply,my_session->use_ok,my_session->eof[branch] > 0); my_session->eof[branch] += eof;
my_session->eof[branch] += eof; if(my_session->eof[branch] >= 2 ||
if(my_session->eof[branch] >= 2 || (my_session->command == 0x04 && my_session->eof[branch] > 0))
(my_session->command == 0x04 && my_session->eof[branch] > 0)) {
{ skygw_log_write_flush(LOGFILE_DEBUG,"tee.c [%d] %s received last EOF packet",
ss_dassert(my_session->eof[branch] < 3) my_session->d_id,
my_session->waiting[branch] = false; branch == PARENT?"parent":"child");
} ss_dassert(my_session->eof[branch] < 3)
} my_session->waiting[branch] = false;
}
}
if(branch == PARENT) if(branch == PARENT)
{ {
ss_dassert(my_session->tee_replybuf == NULL) ss_dassert(my_session->tee_replybuf == NULL)
my_session->tee_replybuf = reply; my_session->tee_replybuf = reply;
} }
else else
{ {
gwbuf_free(reply); gwbuf_free(reply);
} }
my_session->replies[branch]++; my_session->replies[branch]++;
rc = 1;
if(my_session->tee_replybuf != NULL &&
(my_session->branch_session == NULL || int min_eof = my_session->command != 0x04 ? 2 : 1;
my_session->waiting[PARENT] ||
(!my_session->waiting[CHILD] && !my_session->waiting[PARENT]))) if(my_session->tee_replybuf != NULL)
{
if(my_session->branch_session == NULL)
{
rc = 0;
gwbuf_free(my_session->tee_replybuf);
my_session->tee_replybuf = NULL;
skygw_log_write_flush(LOGFILE_ERROR,"Error : Tee child session was closed.");
}
if(my_session->multipacket)
{
if(my_session->waiting[PARENT])
{
route = true;
#ifdef SS_DEBUG
ss_dassert(my_session->replies[PARENT] < 2 ||
modutil_count_signal_packets(my_session->tee_replybuf,
my_session->use_ok,
my_session->eof[PARENT]) == 0);
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing partial response set.",my_session->d_id);
#endif
}
else if(my_session->eof[PARENT] == min_eof &&
my_session->eof[CHILD] == min_eof)
{
route = true;
#ifdef SS_DEBUG
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing final packet of response set.",my_session->d_id);
#endif
}
}
else if(!my_session->waiting[PARENT] &&
!my_session->waiting[CHILD])
{
#ifdef SS_DEBUG
skygw_log_write_flush(LOGFILE_DEBUG,"tee.c:[%d] Routing single packet response.",my_session->d_id);
#endif
route = true;
}
}
if(route)
{ {
#ifdef SS_DEBUG #ifdef SS_DEBUG
skygw_log_write_flush(LOGFILE_DEBUG, "tee.c: Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])" skygw_log_write_flush(LOGFILE_DEBUG, "tee.c:[%d] Routing buffer '%p' parent(waiting [%s] replies [%d] eof[%d])"
" child(waiting [%s] replies[%d] eof [%d])", " child(waiting [%s] replies[%d] eof [%d])",
my_session->tee_replybuf, my_session->d_id,
my_session->waiting[PARENT] ? "true":"false", my_session->tee_replybuf,
my_session->replies[PARENT], my_session->waiting[PARENT] ? "true":"false",
my_session->eof[PARENT], my_session->replies[PARENT],
my_session->waiting[CHILD]?"true":"false", my_session->eof[PARENT],
my_session->replies[CHILD], my_session->waiting[CHILD]?"true":"false",
my_session->eof[CHILD]); my_session->replies[CHILD],
my_session->eof[CHILD]);
#endif #endif
rc = my_session->up.clientReply ( rc = my_session->up.clientReply (my_session->up.instance,
my_session->up.instance, my_session->up.session,
my_session->up.session, my_session->tee_replybuf);
my_session->tee_replybuf); my_session->tee_replybuf = NULL;
my_session->tee_replybuf = NULL; }
}
else spinlock_release(&my_session->tee_lock);
{ return rc;
rc = 1;
}
spinlock_release(&my_session->tee_lock);
return rc;
} }
/** /**
* Diagnostics routine * Diagnostics routine