First version of read write split router. Memory is leaked but it seems that its happening in connection handling somewhere.

This commit is contained in:
vraatikka
2013-07-10 12:31:52 +03:00
parent 7e1840b609
commit 8221e75fa7
4 changed files with 629 additions and 427 deletions

View File

@ -17,6 +17,7 @@
*/
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <router.h>
#include <readwritesplit.h>
@ -350,9 +351,45 @@ static void* newSession(
* @param instance The router instance data
* @param session The session being closed
*/
static void
closeSession(ROUTER *instance, void *session)
static void closeSession(
ROUTER* instance,
void* router_session)
{
INSTANCE* inst = (INSTANCE *)instance;
CLIENT_SESSION* session = (CLIENT_SESSION *)router_session;
/**
* Close the connection to the backend servers
*/
session->slaveconn->func.close(session->slaveconn);
session->masterconn->func.close(session->masterconn);
atomic_add(&session->slave->count, -1);
atomic_add(&session->master->count, -1);
atomic_add(&session->slave->server->stats.n_current, -1);
atomic_add(&session->master->server->stats.n_current, -1);
spinlock_acquire(&inst->lock);
if (inst->connections == session) {
inst->connections = session->next;
} else {
CLIENT_SESSION* ptr = inst->connections;
while (ptr && ptr->next != session) {
ptr = ptr->next;
}
if (ptr) {
ptr->next = session->next;
}
}
spinlock_release(&inst->lock);
/*
* We are no longer in the linked list, free
* all the memory and other resources associated
* to the client session.
*/
free(session);
}
@ -378,11 +415,12 @@ static int routeQuery(
void* router_session,
GWBUF* queue)
{
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
char* querystr = NULL;
char* startpos;
char* querystr;
size_t len;
unsigned char packet_type;
int ret = 0;
INSTANCE* inst = (INSTANCE *)instance;
CLIENT_SESSION* session = (CLIENT_SESSION *)router_session;
@ -393,15 +431,6 @@ static int routeQuery(
len = (unsigned char)queue->data[0];
len += 255*(unsigned char)queue->data[1];
len += 255*255*((unsigned char)queue->data[2]);
querystr = (char *)malloc(len);
snprintf(querystr, len, "%s", startpos);
skygw_log_write(NULL,
LOGFILE_TRACE,
"Packet type %s. %s : %d.",
STRPACKETTYPE(packet_type),
__FILE__,
__LINE__);
switch(packet_type) {
case COM_INIT_DB: /**< 2 */
@ -415,6 +444,9 @@ static int routeQuery(
break;
case COM_QUERY:
querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1);
memset(&querystr[len-1], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0);
break;
@ -429,47 +461,59 @@ static int routeQuery(
case COM_DAEMON: /**< 1d ? */
break;
}
skygw_log_write(NULL, LOGFILE_TRACE, "String\t\"%s\"", querystr);
skygw_log_write(NULL,
LOGFILE_TRACE,
"Packet type\t%s",
STRPACKETTYPE(packet_type));
switch (qtype) {
case QUERY_TYPE_WRITE:
skygw_log_write(NULL,
LOGFILE_MESSAGE,
"Query type : %s, routing to Master.",
LOGFILE_TRACE,
"Query type\t%s, routing to Master.",
STRQTYPE(qtype));
return session->masterconn->func.write(session->masterconn, queue);
ret = session->masterconn->func.write(session->masterconn, queue);
goto return_ret;
break;
case QUERY_TYPE_READ:
skygw_log_write(NULL,
LOGFILE_MESSAGE,
"Query type : %s, routing to Slave.",
LOGFILE_TRACE,
"Query type\t%s, routing to Slave.",
STRQTYPE(qtype));
return session->slaveconn->func.write(session->slaveconn, queue);
ret = session->slaveconn->func.write(session->slaveconn, queue);
goto return_ret;
break;
case QUERY_TYPE_SESSION_WRITE:
skygw_log_write(NULL,
LOGFILE_MESSAGE,
"Query type : %s, routing to Master.",
LOGFILE_TRACE,
"Query type\t%s, routing to Master.",
STRQTYPE(qtype));
/**
* TODO! Connection to all servers must be established, and
* the command must be executed in them.
*/
return session->masterconn->func.write(session->masterconn, queue);
ret = session->masterconn->func.write(session->masterconn, queue);
goto return_ret;
break;
default:
skygw_log_write(NULL,
LOGFILE_MESSAGE,
"Query type : %s, routing to Master.",
LOGFILE_TRACE,
"Query type\t%s, routing to Master.",
STRQTYPE(qtype));
/** Is this really ok? */
return session->masterconn->func.write(session->masterconn, queue);
ret = session->masterconn->func.write(session->masterconn, queue);
goto return_ret;
break;
}
return 0;
return_ret:
free(querystr);
return ret;
}
/**