Addition of the service, server, serv_protocol and session structure needed to tie the elements together.

Improvements to the protocol module support

Documentation improvements

Addition of make install target
This commit is contained in:
Mark Riddoch 2013-06-18 16:59:01 +02:00
parent 4d5215e267
commit ec688e6222
26 changed files with 1255 additions and 93 deletions

View File

@ -19,6 +19,9 @@
# 14/06/13 Mark Riddoch Initial implementation
# 17/06/13 Mark Riddoch Addition of documentation and depend
# targets
# 18/06/13 Mark Riddoch Addition of install target
DEST=/usr/local/skysql
all:
(cd core; make)
@ -37,3 +40,8 @@ depend:
documentation:
doxygen doxygate
install:
(cd core; make DEST=$(DEST) install)
(cd modules/routing; make DEST=$(DEST) install)
(cd modules/protocol; make DEST=$(DEST) install)

25
README Normal file
View File

@ -0,0 +1,25 @@
/** \mainpage SkySQL Gateway
The SkySQL Gateway is an intelligent proxy that allows forwarding of
database statements to one or more database server user complex rules
and a semantic understanding of the database satements and the roles of
the various servers within the backend cluster of databases.
The Gateway is designed to provided load balancing and high avilability
functionality transparantly to the applications. In addition it provides
a highly scalable and flexibile architecture, with plugin components to
support differnt protocols and routing decissions.
The Gateway is implemented in C and makes entensive use of the
asynchronous I/O capabilities of the Linux operating system. The epoll
system is used to provide the event driven framework for the input and
output via sockets.
The protocols are implemented as external shared object modules which
can be loaded and runtime. These modules support a fixed interface,
communicating the entries points via a structure consisting of a set of
function pointers. This structured is called the "module object".
The code that routes the queries to the database servers is also loaed
as external shared objects and are referred to as routing modules.
*/

View File

@ -24,7 +24,7 @@ CC=cc
CFLAGS=-c -I/usr/include -I../include -Wall
LDFLAGS=-rdynamic
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \
utils.c dcb.c load_utils.c
utils.c dcb.c load_utils.c session.c service.c server.c
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \
../include/session.h ../include/spinlock.h ../include/thread.h \
@ -50,4 +50,7 @@ depend.mk: $(SRCS)
@rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: gateway
install -D $< $(DEST)
include depend.mk

View File

@ -42,6 +42,8 @@
#include <server.h>
#include <session.h>
#include <modules.h>
#include <errno.h>
#include <gw.h>
static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */
static SPINLOCK *dcbspin = NULL;
@ -121,7 +123,7 @@ free_dcb(DCB *dcb)
*
* @param server The server to connect to
* @param session The session this connection is being made for
* @param protcol The protocol module to use
* @param protocol The protocol module to use
*/
DCB *
connect_dcb(SERVER *server, SESSION *session, const char *protocol)
@ -134,7 +136,7 @@ int epollfd = -1; // Need to work out how to get this
{
return NULL;
}
if ((funcs = (GWPROTOCOL *)load_module(protocol, "Protocol")) == NULL)
if ((funcs = (GWPROTOCOL *)load_module(protocol, MODULE_PROTOCOL)) == NULL)
{
free(dcb);
return NULL;
@ -155,6 +157,184 @@ int epollfd = -1; // Need to work out how to get this
return dcb;
}
/**
* General purpose read routine to read data from a socket in the
* Descriptor Control Block and append it to a linked list of buffers.
* The list may be empty, in which case *head == NULL
*
* @param dcb The DCB to read from
* @param head Pointer to linked list to append data to
* @return The numebr of bytes read or -1 on fatal error
*/
int
dcb_read(DCB *dcb, GWBUF **head)
{
GWBUF *buffer = NULL;
int b, n = 0;
ioctl(dcb->fd, FIONREAD, &b);
while (b > 0)
{
int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE;
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{
return n ? n : -1;
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
if (n < 0)
{
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
return n;
}
else
{
return n ? n : -1;
}
}
else if (n == 0)
{
return n;
}
// append read data to the gwbuf
*head = gwbuf_append(*head, buffer);
/* Re issue the ioctl as the amount of data buffered may have changed */
ioctl(dcb->fd, FIONREAD, &b);
}
return n;
}
/**
* General purpose routine to write to a DCB
*
* @param dcb The DCB of the client
* @param queue Queue of buffers to write
*/
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w, saved_errno = 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
/*
* We have some queued data, so add our data to
* the write queue and return.
* The assumption is that there will be an EPOLLOUT
* event to drain what is already queued. We are protected
* by the spinlock, which will also be acquired by the
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
}
else
{
int len;
/*
* Loop over the buffer chain that has been passed to us
* from the reading side.
* Send as much of the data in that chain as possible and
* add any balance to the write queue.
*/
while (queue != NULL)
{
len = GWBUF_LENGTH(queue);
GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); dcb->stats.n_writes++);
saved_errno = errno;
if (w < 0)
{
break;
}
/*
* Pull the number of bytes we have written from
* queue with have.
*/
queue = gwbuf_consume(queue, w);
if (w < len)
{
/* We didn't write all the data */
}
}
/* Buffer the balance of any data */
dcb->writeq = queue;
if (queue)
{
dcb->stats.n_buffered++;
}
}
spinlock_release(&dcb->writeqlock);
if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK))
{
/* We had a real write failure that we must deal with */
return 0;
}
return 1;
}
/**
* Drain the write queue of a DCB. THis is called as part of the EPOLLOUT handling
* of a socket and will try to send any buffered data from the write queue
* up until the point the write would block.
*
* @param dcb DCB to drain the write queue of
* @return The number of bytes written
*/
int
dcb_drain_writeq(DCB *dcb)
{
int n = 0;
int w;
int saved_errno = 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
int len;
/*
* Loop over the buffer chain in the pending writeq
* Send as much of the data in that chain as possible and
* leave any balance on the write queue.
*/
while (dcb->writeq != NULL)
{
len = GWBUF_LENGTH(dcb->writeq);
GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(dcb->writeq), len););
saved_errno = errno;
if (w < 0)
{
break;
}
/*
* Pull the number of bytes we have written from
* queue with have.
*/
dcb->writeq = gwbuf_consume(dcb->writeq, w);
if (w < len)
{
/* We didn't write all the data */
}
n += w;
}
}
spinlock_release(&dcb->writeqlock);
return n;
}
/**
* Diagnostic to print a DCB
*

View File

@ -67,7 +67,8 @@ gateway.o: gateway.c ../include/gw.h /usr/include/stdio.h \
../include/gateway_mysql.h ../include/dcb.h ../include/spinlock.h \
../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../include/buffer.h \
../include/mysql_protocol.h ../include/dcb.h ../include/session.h
../include/mysql_protocol.h ../include/dcb.h ../include/service.h \
../include/server.h ../include/session.h
gateway_mysql_protocol.o: gateway_mysql_protocol.c ../include/gw.h \
/usr/include/stdio.h /usr/include/features.h /usr/include/sys/cdefs.h \
/usr/include/bits/wordsize.h /usr/include/gnu/stubs.h \
@ -231,7 +232,34 @@ dcb.o: dcb.c /usr/include/stdio.h /usr/include/features.h \
../include/spinlock.h ../include/thread.h /usr/include/pthread.h \
/usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../include/buffer.h ../include/server.h \
../include/session.h ../include/modules.h
../include/session.h ../include/modules.h /usr/include/errno.h \
/usr/include/bits/errno.h /usr/include/linux/errno.h \
/usr/include/asm/errno.h /usr/include/asm-generic/errno.h \
/usr/include/asm-generic/errno-base.h ../include/gw.h \
/usr/include/ctype.h /usr/include/netdb.h /usr/include/netinet/in.h \
/usr/include/stdint.h /usr/include/bits/wchar.h \
/usr/include/sys/socket.h /usr/include/sys/uio.h /usr/include/bits/uio.h \
/usr/include/bits/socket.h /usr/include/bits/sockaddr.h \
/usr/include/asm/socket.h /usr/include/asm-generic/socket.h \
/usr/include/asm/sockios.h /usr/include/asm-generic/sockios.h \
/usr/include/bits/in.h /usr/include/rpc/netdb.h \
/usr/include/bits/netdb.h /usr/include/fcntl.h /usr/include/bits/fcntl.h \
/usr/include/bits/stat.h /usr/include/unistd.h \
/usr/include/bits/posix_opt.h /usr/include/bits/environments.h \
/usr/include/bits/confname.h /usr/include/getopt.h /usr/include/syslog.h \
/usr/include/sys/syslog.h /usr/include/bits/syslog-path.h \
/usr/include/pwd.h /usr/include/sys/epoll.h /usr/include/signal.h \
/usr/include/bits/signum.h /usr/include/bits/siginfo.h \
/usr/include/bits/sigaction.h /usr/include/bits/sigcontext.h \
/usr/include/bits/sigstack.h /usr/include/sys/ucontext.h \
/usr/include/bits/sigthread.h /usr/include/sys/ioctl.h \
/usr/include/bits/ioctls.h /usr/include/asm/ioctls.h \
/usr/include/asm-generic/ioctls.h /usr/include/linux/ioctl.h \
/usr/include/asm/ioctl.h /usr/include/asm-generic/ioctl.h \
/usr/include/bits/ioctl-types.h /usr/include/sys/ttydefaults.h \
/usr/include/arpa/inet.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \
../include/gateway_mysql.h ../include/mysql_protocol.h ../include/dcb.h
load_utils.o: load_utils.c /usr/include/sys/param.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/limits.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/syslimits.h \
@ -258,3 +286,65 @@ load_utils.o: load_utils.c /usr/include/sys/param.h \
/usr/include/bits/environments.h /usr/include/bits/confname.h \
/usr/include/getopt.h /usr/include/string.h /usr/include/xlocale.h \
/usr/include/dlfcn.h /usr/include/bits/dlfcn.h ../include/modules.h
session.o: session.c /usr/include/stdio.h /usr/include/features.h \
/usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \
/usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \
/usr/include/bits/types.h /usr/include/bits/typesizes.h \
/usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \
/usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \
/usr/include/stdlib.h /usr/include/bits/waitflags.h \
/usr/include/bits/waitstatus.h /usr/include/endian.h \
/usr/include/bits/endian.h /usr/include/bits/byteswap.h \
/usr/include/sys/types.h /usr/include/time.h /usr/include/sys/select.h \
/usr/include/bits/select.h /usr/include/bits/sigset.h \
/usr/include/bits/time.h /usr/include/sys/sysmacros.h \
/usr/include/bits/pthreadtypes.h /usr/include/alloca.h \
/usr/include/unistd.h /usr/include/bits/posix_opt.h \
/usr/include/bits/environments.h /usr/include/bits/confname.h \
/usr/include/getopt.h /usr/include/string.h /usr/include/xlocale.h \
../include/session.h ../include/service.h ../include/spinlock.h \
../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../include/dcb.h \
../include/buffer.h ../include/server.h ../include/router.h \
../include/atomic.h
service.o: service.c /usr/include/stdio.h /usr/include/features.h \
/usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \
/usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \
/usr/include/bits/types.h /usr/include/bits/typesizes.h \
/usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \
/usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \
/usr/include/stdlib.h /usr/include/bits/waitflags.h \
/usr/include/bits/waitstatus.h /usr/include/endian.h \
/usr/include/bits/endian.h /usr/include/bits/byteswap.h \
/usr/include/sys/types.h /usr/include/time.h /usr/include/sys/select.h \
/usr/include/bits/select.h /usr/include/bits/sigset.h \
/usr/include/bits/time.h /usr/include/sys/sysmacros.h \
/usr/include/bits/pthreadtypes.h /usr/include/alloca.h \
/usr/include/string.h /usr/include/xlocale.h ../include/session.h \
../include/service.h ../include/spinlock.h ../include/thread.h \
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../include/dcb.h ../include/buffer.h \
../include/server.h ../include/router.h ../include/modules.h
server.o: server.c /usr/include/stdio.h /usr/include/features.h \
/usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \
/usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \
/usr/include/bits/types.h /usr/include/bits/typesizes.h \
/usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \
/usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \
/usr/include/stdlib.h /usr/include/bits/waitflags.h \
/usr/include/bits/waitstatus.h /usr/include/endian.h \
/usr/include/bits/endian.h /usr/include/bits/byteswap.h \
/usr/include/sys/types.h /usr/include/time.h /usr/include/sys/select.h \
/usr/include/bits/select.h /usr/include/bits/sigset.h \
/usr/include/bits/time.h /usr/include/sys/sysmacros.h \
/usr/include/bits/pthreadtypes.h /usr/include/alloca.h \
/usr/include/string.h /usr/include/xlocale.h ../include/session.h \
../include/server.h ../include/spinlock.h ../include/thread.h \
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h

View File

@ -18,7 +18,7 @@
*/
/**
* @file Gateway.c - The gateway entry point.
* @file gateway.c - The gateway entry point.
*
* @verbatim
* Revision History
@ -33,6 +33,8 @@
*/
#include <gw.h>
#include <service.h>
#include <server.h>
#include <dcb.h>
#include <session.h>
@ -164,23 +166,44 @@ int handle_event_errors_backend(DCB *dcb, int event) {
}
// main function
int main(int argc, char **argv) {
int daemon_mode = 1;
sigset_t sigset;
struct epoll_event events[MAX_EVENTS];
struct epoll_event ev;
int nfds;
int n;
char *port = NULL;
int
main(int argc, char **argv)
{
int daemon_mode = 1;
sigset_t sigset;
struct epoll_event events[MAX_EVENTS];
struct epoll_event ev;
int nfds;
int n;
unsigned short port = 4406;
SERVICE *service;
SERVER *server1, *server2, *server3;
for (n = 0; n < argc; n++)
{
if (strncmp(argv[n], "-p", 2) == 0)
{
port = &argv[n][2];
port = atoi(&argv[n][2]);
}
}
/*
* Build the services etc. This would normally be done by the
* configuration, however in lieu of that being available we
* will build a static configuration here
*/
if ((service = service_alloc("Test Service", "readconnroute")) == NULL)
exit(1);
serviceAddProtocol(service, "MySQLClient", port);
server1 = server_alloc("127.0.0.1", "MySQLBackend", 3306);
server2 = server_alloc("127.0.0.1", "MySQLBackend", 3307);
server3 = server_alloc("127.0.0.1", "MySQLBackend", 3308);
serviceAddBackend(service, server1);
serviceAddBackend(service, server2);
serviceAddBackend(service, server3);
fprintf(stderr, "(C) SkySQL Ab 2013\n");
load_module("testroute", "Router");
@ -220,6 +243,11 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
/*
* Start the service that was created above
*/
serviceStart(service, epollfd);
fprintf(stderr, ">> GATEWAY epoll maxevents is %i\n", MAX_EVENTS);
// listen to MySQL protocol

View File

@ -273,7 +273,7 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
* - backend connection using data in MySQL session
*
* @param client_dcb The client DCB struct
* @param epfd The epoll set to add the new connection
* @param efd The epoll set to add the new connection
* @return 0 on Success or 1 on Failure.
*/
int create_backend_connection(DCB *client_dcb, int efd) {

View File

@ -52,7 +52,6 @@ static void unregister_module(const char *module);
*
* @param module Name of the module to load
* @param type Type of module, used purely for registration
* @param entry Routine to call to extract entry points
* @return The module specific entry point structure or NULL
*/
void *

141
core/server.c Normal file
View File

@ -0,0 +1,141 @@
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
/**
* @file server.c - A representation of a backend server within the gateway.
*
* @verbatim
* Revision History
*
* Date Who Description
* 18/06/13 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <session.h>
#include <server.h>
#include <spinlock.h>
static SPINLOCK server_spin = SPINLOCK_INIT;
static SERVER *allServers = NULL;
/**
* Allocate a new server withn the gateway
*
*
* @param servname The server name
* @param protocol The protocol to use to connect to the server
* @param port The port to connect to
*
* @return The newly created server or NULL if an error occured
*/
SERVER *
server_alloc(char *servname, char *protocol, unsigned short port)
{
SERVER *server;
if ((server = (SERVER *)malloc(sizeof(SERVER))) == NULL)
return NULL;
server->name = strdup(servname);
server->protocol = strdup(protocol);
server->port = port;
memset(&server->stats, sizeof(SERVER_STATS), 0);
server->nextdb = NULL;
spinlock_acquire(&server_spin);
server->next = allServers;
allServers = server;
spinlock_release(&server_spin);
return server;
}
/**
* Deallocate the specified server
*
* @param server The service to deallocate
* @return Returns true if the server was freed
*/
int
server_free(SERVER *server)
{
SERVER *ptr;
/* First of all remove from the linked list */
spinlock_acquire(&server_spin);
if (allServers == server)
{
allServers = server->next;
}
else
{
ptr = allServers;
while (ptr && ptr->next != server)
{
ptr = ptr->next;
}
if (ptr)
ptr->next = server->next;
}
spinlock_release(&server_spin);
/* Clean up session and free the memory */
free(server->name);
free(server->protocol);
free(server);
return 1;
}
/**
* Print details of an individual server
*
* @param server Server to print
*/
void
printServer(SERVER *server)
{
printf("Server %p\n", server);
printf("\tServer: %s\n", server->name);
printf("\tProtocol: %s\n", server->protocol);
printf("\tPort: %d\n", server->port);
}
/**
* Print all servers
*
* Designed to be called within a debugger session in order
* to display all active servers within the gateway
*/
void
printAllServers()
{
SERVER *ptr;
spinlock_acquire(&server_spin);
ptr = allServers;
while (ptr)
{
printServer(ptr);
ptr = ptr->next;
}
spinlock_release(&server_spin);
}

249
core/service.c Normal file
View File

@ -0,0 +1,249 @@
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
/**
* @file service.c - A representation of the service within the gateway.
*
* @verbatim
* Revision History
*
* Date Who Description
* 18/06/13 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <session.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <spinlock.h>
#include <modules.h>
static SPINLOCK service_spin = SPINLOCK_INIT;
static SERVICE *allServices = NULL;
/**
* Allocate a new service for the gateway to support
*
*
* @param servname The service name
* @param router Name of the router module this service uses
*
* @return The newly created service or NULL if an error occured
*/
SERVICE *
service_alloc(char *servname, char *router)
{
SERVICE *service;
if ((service = (SERVICE *)malloc(sizeof(SERVICE))) == NULL)
return NULL;
if ((service->router = load_module(router, MODULE_ROUTER)) == NULL)
{
free(service);
return NULL;
}
service->name = strdup(servname);
service->routerModule = strdup(router);
memset(&service->stats, sizeof(SERVICE_STATS), 0);
service->ports = NULL;
service->stats.started = time(0);
service->state = SERVICE_STATE_ALLOC;
spinlock_acquire(&service_spin);
service->next = allServices;
allServices = service;
spinlock_release(&service_spin);
return service;
}
/**
* Start a service
*
* This function loads the protocol modules for each port on which the
* service listens and starts the listener on that port
*
* Also create the router_instance for the service.
*
* @param service The Service that should be started
* @param efd The epoll descriptor
* @return Returns the number of listeners created
*/
int
serviceStart(SERVICE *service, int efd)
{
SERV_PROTOCOL *port;
int listeners = 0;
char config_bind[40];
GWPROTOCOL *funcs;
port = service->ports;
while (port)
{
if ((port->listener = alloc_dcb()) == NULL)
{
break;
}
if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL)
{
free(port->listener);
port->listener = NULL;
}
memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL));
port->listener->session = NULL;
sprintf(config_bind, "0.0.0.0:%d", port->port);
if (port->listener->func.listen(efd, config_bind))
listeners++;
}
port->listener->session = session_alloc(service, port->listener);
port->listener->session->state = SESSION_STATE_LISTENER;
if (listeners)
service->stats.started = time(0);
service->router_instance = service->router->createInstance(service);
return listeners;
}
/**
* Deallocate the specified service
*
* @param service The service to deallocate
* @return Returns true if the service was freed
*/
int
service_free(SERVICE *service)
{
SERVICE *ptr;
if (service->stats.n_current)
return 0;
/* First of all remove from the linked list */
spinlock_acquire(&service_spin);
if (allServices == service)
{
allServices = service->next;
}
else
{
ptr = allServices;
while (ptr && ptr->next != service)
{
ptr = ptr->next;
}
if (ptr)
ptr->next = service->next;
}
spinlock_release(&service_spin);
/* Clean up session and free the memory */
free(service->name);
free(service->routerModule);
free(service);
return 1;
}
/**
* Add a protocol/port pair to the service
*
* @param service The service
* @param protocol The name of the protocol module
* @param port The port to listen on
* @return TRUE if the protocol/port could be added
*/
int
serviceAddProtocol(SERVICE *service, char *protocol, unsigned short port)
{
SERV_PROTOCOL *proto;
if ((proto = (SERV_PROTOCOL *)malloc(sizeof(SERV_PROTOCOL))) == NULL)
{
return 0;
}
proto->protocol = strdup(protocol);
proto->port = port;
spinlock_acquire(&service->spin);
proto->next = service->ports;
service->ports = proto;
spinlock_release(&service->spin);
return 1;
}
/**
* Add a backend database server to a service
*
* @param service
* @param server
*/
void
serviceAddBackend(SERVICE *service, SERVER *server)
{
spinlock_acquire(&service->spin);
server->nextdb = service->databases;
service->databases = server;
spinlock_release(&service->spin);
}
/**
* Print details of an individual service
*
* @param service Service to print
*/
void
printService(SERVICE *service)
{
SERVER *ptr = service->databases;
printf("Service %p\n", service);
printf("\tService: %s\n", service->name);
printf("\tRouter: %s (%p)\n", service->routerModule, service->router);
printf("\tStarted: %s", asctime(localtime(&service->stats.started)));
printf("\tBackend databases\n");
while (ptr)
{
printf("\t\t%s:%d %s\n", ptr->name, ptr->port, ptr->protocol);
ptr = ptr->next;
}
printf("\tTotal connections: %d\n", service->stats.n_sessions);
printf("\tCurrently connected: %d\n", service->stats.n_current);
}
/**
* Print all services
*
* Designed to be called within a debugger session in order
* to display all active services within the gateway
*/
void
printAllServices()
{
SERVICE *ptr;
spinlock_acquire(&service_spin);
ptr = allServices;
while (ptr)
{
printService(ptr);
ptr = ptr->next;
}
spinlock_release(&service_spin);
}

148
core/session.c Normal file
View File

@ -0,0 +1,148 @@
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
/**
* @file session.c - A representation of the session within the gateway.
*
* @verbatim
* Revision History
*
* Date Who Description
* 17/06/13 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <session.h>
#include <service.h>
#include <router.h>
#include <dcb.h>
#include <spinlock.h>
#include <atomic.h>
static SPINLOCK session_spin = SPINLOCK_INIT;
static SESSION *allSessions = NULL;
/**
* Allocate a new session for a new client of the specified service.
*
* Create the link to the router session by calling the newSession
* entry point of the router using the router instance of the
* service this session is part of.
*
* @param service The service this connection was established by
* @param client The client side DCB
* @return The newly created session or NULL if an error occured
*/
SESSION *
session_alloc(SERVICE *service, DCB *client)
{
SESSION *session;
if ((session = (SESSION *)malloc(sizeof(SESSION))) == NULL)
return NULL;
session->service = service;
session->client = client;
memset(&session->stats, sizeof(SESSION_STATS), 0);
session->stats.connect = time(0);
session->state = SESSION_STATE_ALLOC;
client->session = session;
session->router_session = service->router->newSession(service->router_instance, session);
spinlock_acquire(&session_spin);
session->next = allSessions;
allSessions = session;
spinlock_release(&session_spin);
atomic_add(&service->stats.n_sessions, 1);
atomic_add(&service->stats.n_current, 1);
return session;
}
/**
* Deallocate the specified session
*
* @param session The session to deallocate
*/
void
session_free(SESSION *session)
{
SESSION *ptr;
/* First of all remove from the linked list */
spinlock_acquire(&session_spin);
if (allSessions == session)
{
allSessions = session->next;
}
else
{
ptr = allSessions;
while (ptr && ptr->next != session)
{
ptr = ptr->next;
}
if (ptr)
ptr->next = session->next;
}
spinlock_release(&session_spin);
atomic_add(&session->service->stats.n_current, -1);
/* Clean up session and free the memory */
free(session);
}
/**
* Print details of an individual session
*
* @param session Session to print
*/
void
printSession(SESSION *session)
{
printf("Session %p\n", session);
printf("\tService: %s (%p)\n", session->service->name, session->service);
printf("\tClient DCB: %p\n", session->client);
printf("\tConnected: %s", asctime(localtime(&session->stats.connect)));
}
/**
* Print all sessions
*
* Designed to be called within a debugger session in order
* to display all active sessions within the gateway
*/
void
printAllSessions()
{
SESSION *ptr;
spinlock_acquire(&session_spin);
ptr = allSessions;
while (ptr)
{
printSession(ptr);
ptr = ptr->next;
}
spinlock_release(&session_spin);
}

View File

@ -568,7 +568,7 @@ WARN_LOGFILE =
# directories like "/usr/src/myproject". Separate the files or directories
# with spaces.
INPUT = core modules include
INPUT = README core modules include
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is

View File

@ -39,6 +39,15 @@
*
* @endverbatim
*/
/**
* The buffer structure used by the descriptor control blocks.
*
* Linked lists of buffers are created as data is read from a descriptor
* or written to a descriptor. The use of linked lists of buffers with
* flexible data pointers is designed to minimise the need for data to
* be copied within the gateway.
*/
typedef struct gwbuf {
struct gwbuf *next; /**< Next buffer in a linked chain of buffers */
void *start; /**< Start of the valid data */
@ -49,10 +58,18 @@ typedef struct gwbuf {
/*
* Macros to access the data in the buffers
*/
#define GWBUF_DATA(b) ((b)->start)
#define GWBUF_LENGTH(b) ((b)->end - (b)->start)
#define GWBUF_EMPTY(b) ((b)->start == (b)->end)
#define GWBUF_CONSUME(b, bytes) (b)->start += bytes
#define GWBUF_DATA(b) ((b)->start) /**< First valid, uncomsumed
* byte in the buffer
*/
#define GWBUF_LENGTH(b) ((b)->end - (b)->start) /**< Number of bytes in the
* individual buffer
*/
#define GWBUF_EMPTY(b) ((b)->start == (b)->end) /**< True if all bytes in the
* buffer have been consumed
*/
#define GWBUF_CONSUME(b, bytes) (b)->start += bytes /**< Consume a number of bytes
* in the buffer
*/
/*
* Function prototypes for the API to maniplate the buffers

View File

@ -36,6 +36,7 @@ struct server;
* 01/06/13 Mark Riddoch Initial implementation
* 11/06/13 Mark Riddoch Updated GWPROTOCOL structure with new
* entry points
* 18/06/13 Mark Riddoch Addition of the listener entry point
*
* @endverbatim
*/
@ -56,7 +57,12 @@ struct dcb;
* connect Create a connection to the specified server
* for the session pased in
* close Gateway close entry point for the socket
* listen Create a listener for the protocol
* @endverbatim
*
* This forms the "module object" for protocol modules within the gateway.
*
* @see load_module
*/
typedef struct gw_protocol {
int (*read)(struct dcb *, int);
@ -67,16 +73,29 @@ typedef struct gw_protocol {
int (*accept)(struct dcb *, int);
int (*connect)(struct server *, struct session *, int);
int (*close)(struct dcb *, int);
int (*listen)(int, char *);
} GWPROTOCOL;
/**
* The statitics gathered on a descriptor control block
*/
typedef struct dcbstats {
int n_reads; /**< Number of reads on this descriptor */
int n_writes; /**< Number of writes on this descriptor */
int n_accepts; /**< Number of accepts on this descriptor */
int n_buffered; /**< Number of buffered writes */
} DCBSTATS;
/*
/**
* Descriptor Control Block
*
* A wrapper for a network descriptor within the gateway, it contains all the
* state information necessary to allow for the implementation of the asynchronous
* operation of the potocol and gateway functions. It also provides links to the service
* and session data that is required to route the information within the gateway.
*
* It is important to hold the state information here such that any thread within the
* gateway may be selected to execute the required actions when a network event occurs.
*/
typedef struct dcb {
int fd; /**< The descriptor */
@ -94,13 +113,13 @@ typedef struct dcb {
} DCB;
/* DCB states */
#define DCB_STATE_ALLOC 0 /* Memory allocated but not populated */
#define DCB_STATE_IDLE 1 /* Not yet in the poll mask */
#define DCB_STATE_POLLING 2 /* Waiting in the poll loop */
#define DCB_STATE_PROCESSING 4 /* Processing an event */
#define DCB_STATE_LISTENING 5 /* The DCB is for a listening socket */
#define DCB_STATE_DISCONNECTED 6 /* The socket is now closed */
#define DCB_STATE_FREED 7 /* Memory freed */
#define DCB_STATE_ALLOC 0 /**< Memory allocated but not populated */
#define DCB_STATE_IDLE 1 /**< Not yet in the poll mask */
#define DCB_STATE_POLLING 2 /**< Waiting in the poll loop */
#define DCB_STATE_PROCESSING 4 /**< Processing an event */
#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */
#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */
#define DCB_STATE_FREED 7 /**< Memory freed */
/* A few useful macros */
#define DCB_SESSION(x) (x)->session
@ -109,6 +128,9 @@ typedef struct dcb {
extern DCB *alloc_dcb(); /* Allocate a DCB */
extern void free_dcb(DCB *); /* Free a DCB */
extern DCB *connect_dcb(struct server *, struct session *, const char *);
extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */
extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */
extern int dcb_drain_writeq(DCB *); /* Generic write routine */
extern void printAllDCBs(); /* Debug to print all DCB in the system */
extern void printDCB(DCB *); /* Debug print routine */
extern const char *gw_dcb_state2string(int); /* DCB state to string */

View File

@ -41,6 +41,13 @@ typedef struct modules {
*next; /**< Next module in the linked list */
} MODULES;
/**
* Module types
*/
#define MODULE_PROTOCOL "Protocol" /**< A protocol module type */
#define MODULE_ROUTER "Router" /**< A router module type */
extern void *load_module(const char *module, const char *type);
extern void unload_module(const char *module);
extern void printModules();

View File

@ -51,8 +51,10 @@ typedef void *ROUTER;
* routeQuery Called on each query that requires
* routing
* @endverbatim
*
* @see load_module
*/
typedef struct {
typedef struct router_object {
ROUTER *(*createInstance)(SERVICE *service);
void *(*newSession)(ROUTER *instance, SESSION *session);
void (*closeSession)(ROUTER *instance, void *router_session);

View File

@ -19,7 +19,7 @@
*/
/**
* @file server.h
* @file service.h
*
* The server level definitions within the gateway
*
@ -31,10 +31,32 @@
*
* @endverbatim
*/
/**
* The server statistics structure
*
*/
typedef struct {
int n_connections; /**< Number of connections */
} SERVER_STATS;
/**
* The SERVER structure defines a backend server. Each server has a name
* or IP address for the server, a port that the server listens on and
* the name of a protocol module that is loaded to implement the protocol
* between the gateway and the server.
*/
typedef struct server {
char *name; /**< Server name/IP address*/
int port; /**< Port to listen on */
unsigned short port; /**< Port to listen on */
char *protocol; /**< Protocol module to use */
SERVER_STATS stats; /**< The server statistics */
struct server *next; /**< Next server */
struct server *nextdb; /**< Next server in lsit attached to a service */
} SERVER;
extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *);
extern void printServer(SERVER *);
extern void printAllServers();
#endif

View File

@ -18,8 +18,13 @@
* Copyright SkySQL Ab 2013
*/
#include <time.h>
#include <spinlock.h>
#include <dcb.h>
#include <server.h>
/**
* @file sevice.h
* @file service.h
*
* The service level definitions within the gateway
*
@ -28,27 +33,69 @@
*
* Date Who Description
* 14/06/13 Mark Riddoch Initial implementation
* 18/06/13 Mark Riddoch Addition of statistics and function
* prototypes
*
* @endverbatim
*/
struct server;
struct router;
struct router_object;
/**
* The servprotocol structure is used to link a service to the protocols that
* are used to support that service. It defines the name of the protocol module
* that should be loaded to support the client connection and the port that the
* protocol should use to listen for incoming client connections.
*/
typedef struct servprotocol {
char *protocol; /**< Protocol module to load */
short port; /**< Port to listen on */
unsigned short port; /**< Port to listen on */
DCB *listener; /**< The DCB for the listener */
struct servprotocol
*next; /**< Next service protocol */
} SERV_PROTOCOL;
/**
* The service statistics structure
*/
typedef struct {
time_t started; /**< The time when the service was started */
int n_sessions; /**< Number of sessions created on service since start */
int n_current; /**< Current number of sessions */
} SERVICE_STATS;
/**
* Defines a service within the gateway.
*
* A service is a combination of a set of backend servers, a routing mechanism
* and a set of client side protocol/port pairs used to listen for new connections
* to the service.
*/
typedef struct service {
char *name; /**< The service name */
SERV_PROTOCOL *servers; /**< Linked list of ports and protocols
int state; /**< The service state */
SERV_PROTOCOL *ports; /**< Linked list of ports and protocols
* that this service will listen on.
*/
char *routerModule; /**< Name of router module to use */
struct router *router; /**< The router we are using */
struct router_object
*router; /**< The router we are using */
void *router_instance;
/**< The router instance for this service */
struct server *databases; /**< The set of servers in the backend */
SPINLOCK spin; /**< The service spinlock */
SERVICE_STATS stats; /**< The service statistics */
struct service *next; /**< The next service in the linked list */
} SERVICE;
#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */
#define SERVICE_STATE_STARTED 2 /**< The service has been started */
extern SERVICE *service_alloc(char *, char *);
extern int service_free(SERVICE *);
extern int serviceAddProtocol(SERVICE *, char *, unsigned short);
extern void serviceAddBackend(SERVICE *, SERVER *);
extern int serviceStart(SERVICE *, int);
extern void printService(SERVICE *);
extern void printAllServices();
#endif

View File

@ -30,21 +30,44 @@
* for session specific data
* @endverbatim
*/
#include <time.h>
struct dcb;
struct service;
/*
/**
* The session statistics structure
*/
typedef struct {
time_t connect; /**< Time when the session was started */
} SESSION_STATS;
/**
* The session status block
*
* A session status block is created for each user (client) connection
* to the database, it links the descriptors, routing implementation
* and originating service together for the client session.
*/
typedef struct session {
int state; /**< Current descriptor state */
struct dcb *client; /**< The client connection */
struct dcb *backends; /**< The set of backend servers */
void *data; /**< The session data */
void *router_session;/**< The router insatnce data */
SESSION_STATS stats; /**< Session statistics */
struct service *service; /**< The service this session is using */
struct session *next; /**< Linked list of all sessions */
} SESSION;
#define SESSION_STATE_ALLOC 0
#define SESSION_STATE_READY 1
#define SESSION_STATE_LISTENER 2
#define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type)
extern SESSION *session_alloc(struct service *, struct dcb *);
extern void session_free(SESSION *);
extern void printAllSessions();
extern void printSession(SESSION *);
#endif

View File

@ -46,6 +46,12 @@ typedef struct spinlock {
#define FALSE (1 == 0)
#endif
#if DEBUG
#define SPINLOCK_INIT { 0, 0, 0, NULL }
#else
#define SPINLOCK_INIT { 0 }
#endif
extern void spinlock_init(SPINLOCK *lock);
extern void spinlock_acquire(SPINLOCK *lock);
extern int spinlock_acquire_nowait(SPINLOCK *lock);

View File

@ -29,8 +29,9 @@ MYSQLBACKENDOBJ=$(MYSQLBACKENDSRCS:.c=.o)
SRCS=$(MYSQLCLIENTSRCS) $(MYSQLBACKENDSRCS)
OBJ=$(SRCS:.c=.o)
LIBS=
MODULES=libMySQLClient.so libMySQLBackend.so
all: libMySQLClient.so libMySQLBackend.so
all: $(MODULES)
libMySQLClient.so: $(MYSQLCLIENTOBJ)
$(CC) $(LDFLAGS) $(MYSQLCLIENTOBJ) $(LIBS) -o $@
@ -46,3 +47,6 @@ clean:
tags:
ctags $(SRCS) $(HDRS)
install: $(MODULES)
install -D $< $(DEST)/gateway/modules

View File

@ -16,7 +16,9 @@
* Copyright SkySQL Ab 2013
*/
/*
/**
* @file mysql_client.c
*
* MySQL Protocol module for handling the protocol between the gateway
* and the client.
*
@ -59,7 +61,7 @@ static GWPROTOCOL MyObject = {
gw_MySQLListener /* Listen */
};
/*
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
@ -70,7 +72,7 @@ version()
return version_str;
}
/*
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
@ -80,7 +82,7 @@ ModuleInit()
fprintf(stderr, "Initial MySQL Client Protcol module.\n");
}
/*
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
@ -94,7 +96,7 @@ GetModuleObject()
return &MyObject;
}
/*
/**
* mysql_send_ok
*
* Send a MySQL protocol OK message to the dcb (client)
@ -175,16 +177,16 @@ mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mys
return sizeof(mysql_packet_header) + mysql_payload_size;
}
/*
/**
* mysql_send_auth_error
*
* Send a MySQL protocol ERR message, for gatewayauthentication error to the dcb
* Send a MySQL protocol ERR message, for gateway authentication error to the dcb
*
* @param dcb Descriptor Control Block for the connection to which the OK is sent
* @param packet_number
* @param in_affected_rows
* @param mysql_message
* @return packet lenght
* @return packet length
*
*/
int
@ -259,11 +261,11 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const
return sizeof(mysql_packet_header) + mysql_payload_size;
}
/*
/**
* MySQLSendHandshake
*
* @param dcb The descriptor control block to use for sendign the handshake request
* @return
* @param dcb The descriptor control block to use for sending the handshake request
* @return The packet length sent
*/
int
MySQLSendHandshake(DCB* dcb)
@ -287,6 +289,7 @@ MySQLSendHandshake(DCB* dcb)
uint8_t mysql_filler_ten[10];
uint8_t mysql_last_byte = 0x00;
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1]="";
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
GWBUF *buf;
@ -406,7 +409,7 @@ MySQLSendHandshake(DCB* dcb)
return sizeof(mysql_packet_header) + mysql_payload_size;
}
/*
/**
* gw_mysql_do_authentication
*
* Performs the MySQL protocol 4.1 authentication, using data in GWBUF *queue
@ -415,7 +418,7 @@ MySQLSendHandshake(DCB* dcb)
* client_capabilitiesa are copied into the dcb->protocol
*
* @param dcb Descriptor Control Block of the client
* @param The GWBUF with data from client
* @param queue The GWBUF with data from client
* @return 0 for Authentication ok, !=1 for failed autht
*
*/
@ -578,7 +581,7 @@ static int gw_check_mysql_scramble_data(uint8_t *token, unsigned int token_len,
return memcmp(password, check_hash, SHA_DIGEST_LENGTH);
}
/*
/**
* Write function for client DCB: writes data from Gateway to Client
*
* @param dcb The DCB of the client
@ -652,9 +655,13 @@ int w, saved_errno = 0;
return 0;
}
//////////////////////////////////////////
//client read event triggered by EPOLLIN
//////////////////////////////////////////
/**
* Client read event triggered by EPOLLIN
*
* @param dcb Descriptor control block
* @param epfd Epoll descriptor
* @return TRUE on error
*/
int gw_read_client_event(DCB* dcb, int epfd) {
MySQLProtocol *protocol = NULL;
uint8_t buffer[MAX_BUFFER_SIZE] = "";
@ -762,9 +769,9 @@ int gw_read_client_event(DCB* dcb, int epfd) {
fprintf(stderr, "COM_QUIT received\n");
if (dcb->session->backends) {
dcb->session->backends->func.write(dcb, queue);
(dcb->session->backends->func).error(dcb->session->backends, epfd, -1);
(dcb->session->backends->func).error(dcb->session->backends, epfd);
}
(dcb->func).error(dcb, epfd, -1);
(dcb->func).error(dcb, epfd);
return 1;
}
@ -843,9 +850,9 @@ int gw_write_client_event(DCB *dcb, int epfd) {
// still to implement
mysql_send_auth_error(dcb, 2, 0, "Authorization failed");
dcb->func.error(dcb, epfd, -1);
dcb->func.error(dcb, epfd);
if (dcb->session->backends)
dcb->session->backends->func.error(dcb->session->backends, epfd, -1);
dcb->session->backends->func.error(dcb->session->backends, epfd);
return 0;
}
@ -1030,28 +1037,24 @@ int gw_MySQLAccept(DCB *listener, int efd) {
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
setnonblocking(c_sock);
client = (DCB *) calloc(1, sizeof(DCB));
session = (SESSION *) calloc(1, sizeof(SESSION));
protocol = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol));
client = alloc_dcb();
client->fd = c_sock;
client->state = DCB_STATE_ALLOC;
session = session_alloc(listener->session->service, client);
client->session = session;
protocol = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol));
client->protocol = (void *)protocol;
session->state = SESSION_STATE_ALLOC;
session->client = client;
session->backends = NULL;
protocol->state = MYSQL_ALLOC;
protocol->descriptor = client;
protocol->fd = c_sock;
session->backends = NULL;
// assign function poiters to "func" field
(client->func).error = gw_error_client_event;
(client->func).read = gw_read_client_event;
(client->func).write = gw_MySQLWrite_client;
(client->func).write_ready = gw_write_client_event;
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL));
// edge triggering flag added
ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
@ -1084,6 +1087,8 @@ int gw_MySQLAccept(DCB *listener, int efd) {
*/
static int gw_error_client_event(DCB *dcb, int epfd, int event) {
struct epoll_event ed;
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
fprintf(stderr, "#### Handle error function for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state));

View File

@ -18,9 +18,26 @@
#include <stdio.h>
#include <dcb.h>
#include <buffer.h>
#include <service.h>
#include <session.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
/**
* @file telnetd - telnet daemon protocol module
* @file telnetd.c - telnet daemon protocol module
*
* The telnetd protocol module is intended as a mechanism to allow connections
* into the gateway for the purpsoe of accessing debugging information within
* the gateway rather than a protocol to be used to send queries to backend
* databases.
*
* In the first instance it is intended to allow a debug connection to access
* internal data structures, however it may also be used to manage the
* configuration of the gateway.
*
* @verbatim
* Revision History
@ -39,6 +56,7 @@ static int telnetd_error(DCB *dcb, int event);
static int telnetd_hangup(DCB *dcb, int event);
static int telnetd_accept(DCB *dcb, int event);
static int telnetd_close(DCB *dcb, int event);
static int telnetd_listen(DCB *dcb, int event);
/**
* The "module object" for the telnetd protocol module.
@ -51,7 +69,8 @@ static GWPROTOCOL MyObject = {
telnetd_hangup, /**< HangUp - EPOLLHUP handler */
telnetd_accept, /**< Accept */
NULL, /**< Connect */
telnetd_close /**< Close */
telnetd_close, /**< Close */
telnetd_listen /**< Create a listener */
};
/**
@ -72,7 +91,7 @@ version()
void
ModuleInit()
{
fprintf(stderr, "Initialise Telnetd Protcol module.\n");
fprintf(stderr, "Initialise Telnetd Protocol module.\n");
}
/**
@ -99,6 +118,19 @@ GetModuleObject()
static int
telnetd_read_event(DCB* dcb, int epfd)
{
int n;
GWBUF *head = NULL;
SESSION *session = dcb->session;
ROUTER_OBJECT *router = session->service->router;
ROUTER *router_instanc = session->service->router_instance;
void *rsession = session->router_session;
if ((n = dcb_read(dcb, &head)) != -1)
{
router->routeQuery(router_instance, rsession, head);
}
return n;
}
/**
@ -111,6 +143,7 @@ telnetd_read_event(DCB* dcb, int epfd)
static int
telnetd_write_event(DCB *dcb, int epfd)
{
int dcb_drain_writeq(dcb);
}
/**
@ -125,13 +158,14 @@ telnetd_write_event(DCB *dcb, int epfd)
static int
telnetd_write(DCB *dcb, GWBUF *queue)
{
return dcb_write(dcb, queue);
}
/**
* Handler for the EPOLLERR event.
*
* @param dcb The descriptor control block
* @param epfd The epoll descriptor
* @param event The epoll descriptor
*/
static int
telnetd_error(DCB *dcb, int event)
@ -142,7 +176,7 @@ telnetd_error(DCB *dcb, int event)
* Handler for the EPOLLHUP event.
*
* @param dcb The descriptor control block
* @param epfd The epoll descriptor
* @param event The epoll descriptor
*/
static int
telnetd_hangup(DCB *dcb, int event)
@ -154,11 +188,45 @@ telnetd_hangup(DCB *dcb, int event)
* socket for the protocol.
*
* @param dcb The descriptor control block
* @param epfd The epoll descriptor
* @param event The epoll descriptor
*/
static int
telnetd_accept(DCB *dcb, int event)
{
int n_connect = 0;
while (1)
{
int so;
struct sockaddr_in addr;
socklen_t addrlen;
DCB *client;
struct epoll_event ee;
if ((so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen)) == -1)
return n_connect;
else
{
atomic_add(&dcb->stats.n_accepts, 1);
client = alloc_dcb();
client->fd = so;
memcpy(&client->func, MyObject, sizeof(GWPROTOCOL));
client->session = session_alloc(listener->service, client);
ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
ee.data.ptr = client;
client->state = DCB_STATE_IDLE;
if (epoll_ctl(efd, EPOLL_CTL_ADD, c_sock, &ee) == -1)
{
return n_connect;
}
n_connect++;
dcb_printf(client, "Gateway> ");
}
}
return n_connect;
}
/**
@ -166,7 +234,7 @@ telnetd_accept(DCB *dcb, int event)
* explicitly close a connection.
*
* @param dcb The descriptor control block
* @param epfd The epoll descriptor
* @param event The epoll descriptor
*/
static int
@ -174,4 +242,61 @@ telnetd_close(DCB *dcb, int event)
{
}
/**
* Telnet daemon listener entry point
*
* @param efd epoll File descriptor
* @param config Configuration (ip:port)
*/
static int
telnetd_listen(int efd, char *config)
{
DCB *listener;
struct sockaddr_in addr;
char *port;
struct epoll_event ev;
int one;
if ((listener = dcb_alloc()) == NULL)
{
return 0;
}
memcpy(&listener->func, MyObject, sizeof(GWPROTOCOL));
port = strrchr(config, ':');
if (port)
port++;
else
port = "4442";
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port);
if ((listener->fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
return 0;
}
// socket options
setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
// set NONBLOCKING mode
setnonblocking(listener->fd);
bind address and port
if (bind(listener->fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
{
return 0;
}
listener->state = DCB_STATE_LISTENING;
listen(listener->fd, SOMAXCONN);
ev.events = EPOLLIN;
ev.data.ptr = listener;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, l_so, &ev) == -1)
{
return 0;
}
return 1;
}

View File

@ -28,8 +28,9 @@ READCONOBJ=$(READCONSRCS:.c=.o)
SRCS=$(TESTSRCS) $(READCONSRCS)
OBJ=$(SRCS:.c=.o)
LIBS=-lssl
MODULES=libtestroute.so libreadconnroute.so
all: libtestroute.so libreadconnroute.so
all: $(MODULES)
libtestroute.so: $(TESTOBJ)
$(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@
@ -52,4 +53,7 @@ depend.mk: $(SRCS)
@rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: $(MODULES)
install -D $< $(DEST)/gateway/modules
include depend.mk

View File

@ -6,8 +6,14 @@ testroute.o: testroute.c /usr/include/stdio.h /usr/include/features.h \
/usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \
/usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \
../../include/router.h ../../include/service.h ../../include/session.h \
../../include/buffer.h
../../include/router.h ../../include/service.h /usr/include/time.h \
/usr/include/bits/time.h /usr/include/xlocale.h ../../include/spinlock.h \
../../include/thread.h /usr/include/pthread.h /usr/include/endian.h \
/usr/include/bits/endian.h /usr/include/bits/byteswap.h \
/usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/pthreadtypes.h /usr/include/bits/setjmp.h \
../../include/dcb.h ../../include/buffer.h ../../include/server.h \
../../include/session.h
readconnroute.o: readconnroute.c /usr/include/stdio.h \
/usr/include/features.h /usr/include/sys/cdefs.h \
/usr/include/bits/wordsize.h /usr/include/gnu/stubs.h \
@ -25,8 +31,8 @@ readconnroute.o: readconnroute.c /usr/include/stdio.h \
/usr/include/bits/time.h /usr/include/sys/sysmacros.h \
/usr/include/bits/pthreadtypes.h /usr/include/alloca.h \
/usr/include/string.h /usr/include/xlocale.h ../../include/service.h \
../../include/spinlock.h ../../include/thread.h /usr/include/pthread.h \
/usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../../include/dcb.h ../../include/buffer.h \
../../include/server.h ../../include/router.h ../../include/session.h \
../../include/buffer.h ../../include/atomic.h ../../include/spinlock.h \
../../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h \
../include/readconnection.h ../../include/dcb.h
../../include/atomic.h ../include/readconnection.h

View File

@ -127,7 +127,7 @@ int i, n;
* that we can maintain a count of the number of connections to each
* backend server.
*/
for (server = service->databases, n = 0; server; server = server->next)
for (server = service->databases, n = 0; server; server = server->nextdb)
n++;
inst->servers = (BACKEND **)calloc(n, sizeof(BACKEND *));
@ -137,7 +137,7 @@ int i, n;
return NULL;
}
for (server = service->databases, n = 0; server; server = server->next)
for (server = service->databases, n = 0; server; server = server->nextdb)
{
if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL)
{
@ -224,8 +224,8 @@ int i;
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
*
* @param instance The router instance data
* @param session The session being closed
* @param instance The router instance data
* @param router_session The session being closed
*/
static void
closeSession(ROUTER *instance, void *router_session)
@ -265,9 +265,10 @@ CLIENT_SESSION *session = (CLIENT_SESSION *)router_session;
* This is simply a case of sending it to the connection that was
* chosen when we started the client session.
*
* @param instance The router instance
* @param session The router session returned from the newSession call
* @param queue The queue of data buffers to route
* @param instance The router instance
* @param router_session The router session returned from the newSession call
* @param queue The queue of data buffers to route
* @return The number of bytes sent
*/
static int
routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)