session.c:session_free:if session is child of another service (tee in this case), it is the parent which releases child's allocated memory back to the system. This now also includes the child router session.
dcb.h: Added DCB_IS_CLONE macro
tee.c:freeSession:if parent session triggered closing of tee, then child session may not be closed yet. In that case free the child session first and only then free child router session and release child session's memory back to system.
tee.c:routeQuery: only route if child session is ready for routing. Log if session is not ready for routing and set tee session inactive
mysql_client.c:gw_client_close:if DCB is cloned one don't close the protocol because they it is shared with the original DCB.
This commit is contained in:
VilhoRaatikka 2014-12-23 00:26:57 +02:00
parent 61fcc1e21e
commit f0d8ed0cf2
7 changed files with 98 additions and 57 deletions

View File

@ -310,7 +310,7 @@ DCB *clone;
return NULL;
}
clone->fd = DCBFD_CLONED;;
clone->fd = DCBFD_CLOSED;
clone->flags |= DCBF_CLONE;
clone->state = orig->state;
clone->data = orig->data;
@ -321,11 +321,10 @@ DCB *clone;
clone->protocol = orig->protocol;
clone->func.write = dcb_null_write;
#if 1
/**
* Close triggers closing of router session as well which is needed.
*/
clone->func.close = orig->func.close;
#else
clone->func.close = dcb_null_close;
#endif
clone->func.auth = dcb_null_auth;
return clone;

View File

@ -357,7 +357,6 @@ bool session_free(
int i;
CHK_SESSION(session);
/*<
* Remove one reference. If there are no references left,
* free session.
@ -389,14 +388,14 @@ bool session_free(
atomic_add(&session->service->stats.n_current, -1);
/**
* Free router_session and set it NULL
* If session is not child of some other session, free router_session.
* Otherwise let the parent free it.
*/
if (session->router_session)
if (!session->ses_is_child && session->router_session)
{
session->service->router->freeSession(
session->service->router_instance,
session->router_session);
session->router_session = NULL;
}
if (session->n_filters)
{

View File

@ -130,7 +130,6 @@ typedef struct {
#define GWPROTOCOL_VERSION {1, 0, 0}
#define DCBFD_CLOSED -1
#define DCBFD_CLONED -2
/**
* The statitics gathered on a descriptor control block
@ -332,4 +331,6 @@ bool dcb_get_ses_log_info(DCB* dcb, size_t* sesid, int* enabled_logs);
*/
#define DCBF_CLONE 0x0001 /*< DCB is a clone */
#define DCBF_HUNG 0x0002 /*< Hangup has been dispatched */
#define DCB_IS_CLONE(d) ((d)->flags & DCBF_CLONE)
#endif /* _DCB_H */

View File

@ -351,7 +351,7 @@ char *remote, *userName;
if ((dcb = dcb_clone(session->client)) == NULL)
{
freeSession(my_instance, (void *)my_session);
freeSession(instance, (void *)my_session);
my_session = NULL;
LOGIF(LE, (skygw_log_write_flush(
@ -364,7 +364,7 @@ char *remote, *userName;
if ((ses = session_alloc(my_instance->service, dcb)) == NULL)
{
dcb_close(dcb);
freeSession(my_instance, (void *)my_session);
freeSession(instance, (void *)my_session);
my_session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -438,12 +438,26 @@ freeSession(FILTER *instance, void *session)
TEE_SESSION *my_session = (TEE_SESSION *)session;
SESSION* ses = my_session->branch_session;
if (ses != NULL && ses->state == SESSION_STATE_TO_BE_FREED)
if (ses != NULL)
{
ses->state = SESSION_STATE_FREE;
free(ses);
if (ses->state == SESSION_STATE_ROUTER_READY)
{
session_free(ses);
}
if (ses->state == SESSION_STATE_TO_BE_FREED)
{
/** Free branch router session */
ses->service->router->freeSession(
ses->service->router_instance,
ses->router_session);
/** Free memory of branch client session */
ses->state = SESSION_STATE_FREE;
free(ses);
/** This indicates that branch session is not available anymore */
my_session->branch_session = NULL;
}
}
free(session);
return;
}
@ -490,46 +504,76 @@ char *ptr;
int length, rval, residual = 0;
GWBUF *clone = NULL;
if (my_session->residual)
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
clone = gwbuf_clone(queue);
if (my_session->residual < GWBUF_LENGTH(clone))
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
my_session->residual -= GWBUF_LENGTH(clone);
if (my_session->residual < 0)
my_session->residual = 0;
}
else if ( my_session->active && (ptr = modutil_get_SQL(queue)) != NULL)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
char *dummy;
modutil_MySQL_Query(queue, &dummy, &length, &residual);
clone = gwbuf_clone(queue);
my_session->residual = residual;
if (my_session->residual)
{
clone = gwbuf_clone(queue);
if (my_session->residual < GWBUF_LENGTH(clone))
{
GWBUF_RTRIM(clone, GWBUF_LENGTH(clone) - residual);
}
my_session->residual -= GWBUF_LENGTH(clone);
if (my_session->residual < 0)
{
my_session->residual = 0;
}
}
free(ptr);
}
else if (packet_is_required(queue))
{
clone = gwbuf_clone(queue);
}
else if (my_session->active && (ptr = modutil_get_SQL(queue)) != NULL)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore,ptr,0,NULL, 0) != 0))
{
char *dummy;
modutil_MySQL_Query(queue, &dummy, &length, &residual);
clone = gwbuf_clone(queue);
my_session->residual = residual;
}
free(ptr);
}
else if (packet_is_required(queue))
{
clone = gwbuf_clone(queue);
}
}
/* Pass the query downstream */
rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
my_session->down.session,
queue);
if (clone)
{
my_session->n_duped++;
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY)
{
SESSION_ROUTE_QUERY(my_session->branch_session, clone);
}
else
{
/** Close tee session */
my_session->active = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Closed tee filter session.")));
gwbuf_free(clone);
}
}
else
{
if (my_session->active)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Closed tee filter session.")));
my_session->active = 0;
}
my_session->n_rejected++;
}
return rval;

View File

@ -790,14 +790,14 @@ int log_no_master = 1;
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"Info: A Master Server is now available: %s:%i",
"Info : A Master Server is now available: %s:%i",
root_master->server->name,
root_master->server->port)));
}
} else {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: No Master can be determined. Last known was %s:%i",
"Error : No Master can be determined. Last known was %s:%i",
root_master->server->name,
root_master->server->port)));
}
@ -807,7 +807,7 @@ int log_no_master = 1;
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: No Master can be determined")));
"Error : No Master can be determined")));
log_no_master = 0;
}
}

View File

@ -1401,9 +1401,14 @@ gw_client_close(DCB *dcb)
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"%lu [gw_client_close]",
pthread_self())));
mysql_protocol_done(dcb);
/**
* Since only protocol pointer is copied from original DCB to clone in
* dcb_clone, only dcb_close for the original DCB closes protocol.
*/
if (!DCB_IS_CLONE(dcb))
{
mysql_protocol_done(dcb);
}
session = dcb->session;
/**
* session may be NULL if session_alloc failed.

View File

@ -1032,13 +1032,6 @@ static void freeSession(
router = (ROUTER_INSTANCE *)router_instance;
backend_ref = router_cli_ses->rses_backend_ref;
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (!BREF_IS_IN_USE((&backend_ref[i])))
{
continue;
}
}
spinlock_acquire(&router->lock);
if (router->connections == router_cli_ses) {