Merge branch 'release-1.0GA' of https://github.com/mariadb-corporation/MaxScale into release-1.0GA

This commit is contained in:
VilhoRaatikka 2015-01-02 09:54:29 +02:00
commit 38b16ec2d0
2 changed files with 178 additions and 13 deletions

View File

@ -716,6 +716,15 @@ session_state(int state)
return "Listener Session";
case SESSION_STATE_LISTENER_STOPPED:
return "Stopped Listener Session";
#ifdef SS_DEBUG
case SESSION_STATE_STOPPING:
return "Stopping session";
case SESSION_STATE_TO_BE_FREED:
return "Session to be freed";
case SESSION_STATE_FREE:
return "Freed session";
#endif
default:
return "Invalid State";
}

View File

@ -166,6 +166,15 @@ typedef struct {
GWBUF* tee_replybuf; /* Buffer for reply */
} TEE_SESSION;
typedef struct orphan_session_tt
{
SESSION* session;
struct orphan_session_tt* next;
}orphan_session_t;
static orphan_session_t* allOrphans = NULL;
static SPINLOCK orphanLock;
static int packet_is_required(GWBUF *queue);
static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session);
@ -211,6 +220,7 @@ version()
void
ModuleInit()
{
spinlock_init(&orphanLock);
}
/**
@ -402,7 +412,9 @@ char *remote, *userName;
{
DCB* dcb;
SESSION* ses;
FILTER_DEF* dummy;
UPSTREAM* dummy_upstream;
if ((dcb = dcb_clone(session->client)) == NULL)
{
freeSession(instance, (void *)my_session);
@ -415,6 +427,23 @@ char *remote, *userName;
goto retblock;
}
if((dummy = filter_alloc("tee_dummy","tee_dummy")) == NULL)
{
dcb_close(dcb);
freeSession(instance, (void *)my_session);
my_session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : tee: Allocating memory for "
"dummy filter definition failed."
" Terminating session.")));
goto retblock;
}
if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
{
dcb_close(dcb);
@ -427,23 +456,45 @@ char *remote, *userName;
goto retblock;
}
ss_dassert(ses->ses_is_child);
FILTER_DEF* dummy;
UPSTREAM* dummy_upstream;
dummy = filter_alloc("tee_dummy","tee_dummy");
dummy->obj = GetModuleObject();
dummy->filter = my_instance;
dummy_upstream = filterUpstream (dummy,my_session,&ses->tail);
ses->tail = *dummy_upstream;
free(dummy_upstream);
if((dummy_upstream = filterUpstream(
dummy, my_session, &ses->tail)) == NULL)
{
spinlock_acquire(&ses->ses_lock);
ses->state = SESSION_STATE_STOPPING;
spinlock_release(&ses->ses_lock);
ses->service->router->closeSession(
ses->service->router_instance,
ses->router_session);
ses->client = NULL;
dcb->session = NULL;
session_free(ses);
dcb_close(dcb);
freeSession(instance, (void *) my_session);
my_session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : tee: Allocating memory for"
"dummy upstream failed."
" Terminating session.")));
goto retblock;
}
ses->tail = *dummy_upstream;
my_session->min_replies = 2;
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy;
free(dummy_upstream);
}
}
retblock:
@ -526,12 +577,106 @@ SESSION* ses = my_session->branch_session;
/** This indicates that branch session is not available anymore */
my_session->branch_session = NULL;
}
else if(ses->state == SESSION_STATE_STOPPING)
{
orphan_session_t* orphan;
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Error : Failed to "
"allocate memory for orphan session struct, "
"child session might leak memory.");
}else{
orphan->session = ses;
spinlock_acquire(&orphanLock);
orphan->next = allOrphans;
allOrphans = orphan;
spinlock_release(&orphanLock);
}
if(ses->refcount == 0)
{
ss_dassert(ses->refcount == 0 && ses->client == NULL);
ses->state = SESSION_STATE_TO_BE_FREED;
}
}
}
if (my_session->dummy_filterdef)
{
filter_free(my_session->dummy_filterdef);
}
free(session);
spinlock_acquire(&orphanLock);
orphan_session_t *ptr = allOrphans, *finished = NULL,*tmp = NULL;
#ifdef SS_DEBUG
int o_stopping = 0, o_ready = 0,o_freed = 0;
#endif
while(ptr)
{
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
{
if(ptr == allOrphans)
{
tmp = ptr;
allOrphans = ptr->next;
}
else
{
tmp = allOrphans;
while(tmp && tmp->next != ptr)
tmp = tmp->next;
if(tmp)
{
tmp->next = ptr->next;
tmp = ptr;
}
}
}
#ifdef SS_DEBUG
else if(ptr->session->state == SESSION_STATE_STOPPING)
{
o_stopping++;
}
else if(ptr->session->state == SESSION_STATE_ROUTER_READY)
{
o_ready++;
}
#endif
ptr = ptr->next;
if(tmp)
{
tmp->next = finished;
finished = tmp;
tmp = NULL;
}
}
spinlock_release(&orphanLock);
#ifdef SS_DEBUG
if(o_stopping + o_ready > 0)
skygw_log_write(LOGFILE_TRACE,"tee.c: %d orphans in "
"SESSION_STATE_STOPPING, %d orphans in "
"SESSION_STATE_ROUTER_READY. ",o_stopping,o_ready);
#endif
while(finished)
{
#ifdef SS_DEBUG
skygw_log_write(LOGFILE_TRACE,"tee.c: %d orphans freed.",++o_freed);
#endif
tmp = finished;
finished = finished->next;
tmp->session->service->router->freeSession(
tmp->session->service->router_instance,
tmp->session->router_session);
tmp->session->state = SESSION_STATE_FREE;
free(tmp->session);
free(tmp);
}
return;
}
@ -642,6 +787,7 @@ GWBUF *clone = NULL;
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
my_session->replies = 0;
my_session->last_qtype = *((unsigned char*)queue->start + 4);
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
@ -693,12 +839,22 @@ clientReply (FILTER* instance, void *session, GWBUF *reply)
}
else
{
gwbuf_free(reply);
if(my_session->last_qtype == 0x03 &&
*(unsigned char*)(reply->start + 4) != 0xff
&& *(unsigned char*)(my_session->tee_replybuf->start + 4) == 0xff)
{
gwbuf_free(my_session->tee_replybuf);
my_session->tee_replybuf = reply;
}
else
{
gwbuf_free(reply);
}
}
if (my_session->branch_session == NULL ||
my_session->replies >= my_session->min_replies)
{
{
rc = my_session->up.clientReply (
my_session->up.instance,
my_session->up.session,