Properly close the branch session of the tee filter.

This commit is contained in:
Markus Makela
2015-08-28 19:49:03 +03:00
parent 5338318900
commit d29c5909a6

View File

@ -212,6 +212,7 @@ int route_single_query(TEE_INSTANCE* my_instance,
GWBUF* buffer, GWBUF* buffer,
GWBUF* clone); GWBUF* clone);
int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer); int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer);
void create_orphan(SESSION* ses);
static void static void
orphan_free(void* data) orphan_free(void* data)
@ -557,6 +558,7 @@ char *remote, *userName;
if ((ses = session_alloc(my_instance->service, dcb)) == NULL) if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
{ {
filter_free(dummy);
dcb_close(dcb); dcb_close(dcb);
freeSession(instance, (void *)my_session); freeSession(instance, (void *)my_session);
my_session = NULL; my_session = NULL;
@ -572,39 +574,27 @@ char *remote, *userName;
dummy->obj = GetModuleObject(); dummy->obj = GetModuleObject();
dummy->filter = NULL; dummy->filter = NULL;
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy;
if((dummy_upstream = filterUpstream( if(true || (dummy_upstream = filterUpstream(
dummy, my_session, &ses->tail)) == NULL) dummy, my_session, &ses->tail)) == NULL)
{ {
spinlock_acquire(&ses->ses_lock); filter_free(dummy);
ses->state = SESSION_STATE_STOPPING; closeSession(instance,(void*)my_session);
spinlock_release(&ses->ses_lock);
ses->service->router->closeSession(
ses->service->router_instance,
ses->router_session);
ses->client = NULL;
dcb->session = NULL;
session_free(ses);
dcb_close(dcb); dcb_close(dcb);
freeSession(instance, (void *) my_session); free(my_session);
my_session = NULL;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : tee: Allocating memory for" "Error : tee: Allocating memory for"
"dummy upstream failed." "dummy upstream failed."
" Terminating session."))); " Terminating session.")));
goto retblock; return NULL;
} }
ses->tail = *dummy_upstream; ses->tail = *dummy_upstream;
my_session->branch_session = ses;
my_session->branch_dcb = dcb;
my_session->dummy_filterdef = dummy;
MySQLProtocol* protocol = (MySQLProtocol*)session->client->protocol; MySQLProtocol* protocol = (MySQLProtocol*)session->client->protocol;
my_session->use_ok = protocol->client_capabilities & (1 << 6); my_session->use_ok = protocol->client_capabilities & (1 << 6);
free(dummy_upstream); free(dummy_upstream);
@ -712,19 +702,7 @@ skygw_log_write(LOGFILE_TRACE,"Tee free: %d", atomic_add(&debug_seq,1));
} }
else if(state == SESSION_STATE_STOPPING) else if(state == SESSION_STATE_STOPPING)
{ {
orphan_session_t* orphan; create_orphan(ses);
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Error : Failed to "
"allocate memory for orphan session struct, "
"child session might leak memory.");
}else{
orphan->session = ses;
spinlock_acquire(&orphanLock);
orphan->next = allOrphans;
allOrphans = orphan;
spinlock_release(&orphanLock);
}
} }
} }
if (my_session->dummy_filterdef) if (my_session->dummy_filterdef)
@ -1400,4 +1378,21 @@ int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer)
my_session->command = command; my_session->command = command;
return 1; return 1;
}
void create_orphan(SESSION* ses)
{
orphan_session_t* orphan;
if((orphan = malloc(sizeof(orphan_session_t))) == NULL)
{
skygw_log_write(LOGFILE_ERROR,"Error : Failed to "
"allocate memory for orphan session struct, "
"child session might leak memory.");
}else{
orphan->session = ses;
spinlock_acquire(&orphanLock);
orphan->next = allOrphans;
allOrphans = orphan;
spinlock_release(&orphanLock);
}
} }