Fixed orphaned branch sessions leaking memory.
tee.c: Created a static, module-wide list of orphaned branch sessions which holds the branch sessions that can't be freed at the time the parent session is being freed. This list is processed every time a a tee filter session is freeing its memory.
This commit is contained in:
@ -166,6 +166,15 @@ typedef struct {
|
|||||||
GWBUF* tee_replybuf; /* Buffer for reply */
|
GWBUF* tee_replybuf; /* Buffer for reply */
|
||||||
} TEE_SESSION;
|
} TEE_SESSION;
|
||||||
|
|
||||||
|
typedef struct orphan_session_tt
|
||||||
|
{
|
||||||
|
SESSION* session;
|
||||||
|
struct orphan_session_tt* next;
|
||||||
|
}orphan_session_t;
|
||||||
|
|
||||||
|
static orphan_session_t* allOrphans = NULL;
|
||||||
|
|
||||||
|
static SPINLOCK orphanLock;
|
||||||
static int packet_is_required(GWBUF *queue);
|
static int packet_is_required(GWBUF *queue);
|
||||||
static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session);
|
static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session);
|
||||||
|
|
||||||
@ -211,6 +220,7 @@ version()
|
|||||||
void
|
void
|
||||||
ModuleInit()
|
ModuleInit()
|
||||||
{
|
{
|
||||||
|
spinlock_init(&orphanLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -402,6 +412,8 @@ char *remote, *userName;
|
|||||||
{
|
{
|
||||||
DCB* dcb;
|
DCB* dcb;
|
||||||
SESSION* ses;
|
SESSION* ses;
|
||||||
|
FILTER_DEF* dummy;
|
||||||
|
UPSTREAM* dummy_upstream;
|
||||||
|
|
||||||
if ((dcb = dcb_clone(session->client)) == NULL)
|
if ((dcb = dcb_clone(session->client)) == NULL)
|
||||||
{
|
{
|
||||||
@ -415,6 +427,23 @@ char *remote, *userName;
|
|||||||
|
|
||||||
goto retblock;
|
goto retblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if((dummy = filter_alloc("tee_dummy","tee_dummy")) == NULL)
|
||||||
|
{
|
||||||
|
dcb_close(dcb);
|
||||||
|
freeSession(instance, (void *)my_session);
|
||||||
|
my_session = NULL;
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : tee: Allocating memory for "
|
||||||
|
"dummy filter definition failed."
|
||||||
|
" Terminating session.")));
|
||||||
|
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
|
if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
|
||||||
{
|
{
|
||||||
dcb_close(dcb);
|
dcb_close(dcb);
|
||||||
@ -427,23 +456,45 @@ char *remote, *userName;
|
|||||||
|
|
||||||
goto retblock;
|
goto retblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
ss_dassert(ses->ses_is_child);
|
ss_dassert(ses->ses_is_child);
|
||||||
|
|
||||||
FILTER_DEF* dummy;
|
|
||||||
UPSTREAM* dummy_upstream;
|
|
||||||
|
|
||||||
dummy = filter_alloc("tee_dummy","tee_dummy");
|
|
||||||
dummy->obj = GetModuleObject();
|
dummy->obj = GetModuleObject();
|
||||||
dummy->filter = my_instance;
|
dummy->filter = my_instance;
|
||||||
|
|
||||||
dummy_upstream = filterUpstream (dummy,my_session,&ses->tail);
|
|
||||||
ses->tail = *dummy_upstream;
|
|
||||||
free(dummy_upstream);
|
|
||||||
|
|
||||||
|
if((dummy_upstream = filterUpstream(
|
||||||
|
dummy, my_session, &ses->tail)) == NULL)
|
||||||
|
{
|
||||||
|
spinlock_acquire(&ses->ses_lock);
|
||||||
|
ses->state = SESSION_STATE_STOPPING;
|
||||||
|
spinlock_release(&ses->ses_lock);
|
||||||
|
|
||||||
|
ses->service->router->closeSession(
|
||||||
|
ses->service->router_instance,
|
||||||
|
ses->router_session);
|
||||||
|
|
||||||
|
ses->client = NULL;
|
||||||
|
dcb->session = NULL;
|
||||||
|
session_free(ses);
|
||||||
|
dcb_close(dcb);
|
||||||
|
freeSession(instance, (void *) my_session);
|
||||||
|
my_session = NULL;
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : tee: Allocating memory for"
|
||||||
|
"dummy upstream failed."
|
||||||
|
" Terminating session.")));
|
||||||
|
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
ses->tail = *dummy_upstream;
|
||||||
my_session->min_replies = 2;
|
my_session->min_replies = 2;
|
||||||
my_session->branch_session = ses;
|
my_session->branch_session = ses;
|
||||||
my_session->branch_dcb = dcb;
|
my_session->branch_dcb = dcb;
|
||||||
my_session->dummy_filterdef = dummy;
|
my_session->dummy_filterdef = dummy;
|
||||||
|
free(dummy_upstream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
retblock:
|
retblock:
|
||||||
@ -526,12 +577,77 @@ SESSION* ses = my_session->branch_session;
|
|||||||
/** This indicates that branch session is not available anymore */
|
/** This indicates that branch session is not available anymore */
|
||||||
my_session->branch_session = NULL;
|
my_session->branch_session = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(ses->state == SESSION_STATE_STOPPING)
|
||||||
|
{
|
||||||
|
orphan_session_t* orphan;
|
||||||
|
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
|
||||||
|
{
|
||||||
|
skygw_log_write(LOGFILE_ERROR,"Error : Failed to "
|
||||||
|
"allocate memory for orphan session struct, "
|
||||||
|
"child session might leak memory.");
|
||||||
|
}else{
|
||||||
|
orphan->session = ses;
|
||||||
|
spinlock_acquire(&orphanLock);
|
||||||
|
orphan->next = allOrphans;
|
||||||
|
allOrphans = orphan;
|
||||||
|
spinlock_release(&orphanLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (my_session->dummy_filterdef)
|
if (my_session->dummy_filterdef)
|
||||||
{
|
{
|
||||||
filter_free(my_session->dummy_filterdef);
|
filter_free(my_session->dummy_filterdef);
|
||||||
}
|
}
|
||||||
free(session);
|
free(session);
|
||||||
|
|
||||||
|
spinlock_acquire(&orphanLock);
|
||||||
|
orphan_session_t *ptr = allOrphans, *finished = NULL,*tmp = NULL;
|
||||||
|
|
||||||
|
while(ptr)
|
||||||
|
{
|
||||||
|
if(ptr->session->state == SESSION_STATE_TO_BE_FREED)
|
||||||
|
{
|
||||||
|
if(ptr == allOrphans)
|
||||||
|
{
|
||||||
|
tmp = ptr;
|
||||||
|
allOrphans = ptr->next;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tmp = allOrphans;
|
||||||
|
while(tmp && tmp->next != ptr)
|
||||||
|
tmp = tmp->next;
|
||||||
|
tmp->next = ptr->next;
|
||||||
|
tmp = ptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ptr = ptr->next;
|
||||||
|
if(tmp)
|
||||||
|
{
|
||||||
|
tmp->next = finished;
|
||||||
|
finished = tmp;
|
||||||
|
tmp = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spinlock_release(&orphanLock);
|
||||||
|
|
||||||
|
while(finished)
|
||||||
|
{
|
||||||
|
tmp = finished;
|
||||||
|
finished = finished->next;
|
||||||
|
|
||||||
|
tmp->session->service->router->freeSession(
|
||||||
|
tmp->session->service->router_instance,
|
||||||
|
tmp->session->router_session);
|
||||||
|
|
||||||
|
tmp->session->state = SESSION_STATE_FREE;
|
||||||
|
free(tmp->session);
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user