diff --git a/Makefile b/Makefile index 03e0586b4..0775d9bd9 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,9 @@ # 14/06/13 Mark Riddoch Initial implementation # 17/06/13 Mark Riddoch Addition of documentation and depend # targets +# 18/06/13 Mark Riddoch Addition of install target + +DEST=/usr/local/skysql all: (cd core; make) @@ -37,3 +40,8 @@ depend: documentation: doxygen doxygate + +install: + (cd core; make DEST=$(DEST) install) + (cd modules/routing; make DEST=$(DEST) install) + (cd modules/protocol; make DEST=$(DEST) install) diff --git a/README b/README new file mode 100644 index 000000000..a793f276d --- /dev/null +++ b/README @@ -0,0 +1,25 @@ +/** \mainpage SkySQL Gateway + +The SkySQL Gateway is an intelligent proxy that allows forwarding of +database statements to one or more database server user complex rules +and a semantic understanding of the database satements and the roles of +the various servers within the backend cluster of databases. + +The Gateway is designed to provided load balancing and high avilability +functionality transparantly to the applications. In addition it provides +a highly scalable and flexibile architecture, with plugin components to +support differnt protocols and routing decissions. + +The Gateway is implemented in C and makes entensive use of the +asynchronous I/O capabilities of the Linux operating system. The epoll +system is used to provide the event driven framework for the input and +output via sockets. + +The protocols are implemented as external shared object modules which +can be loaded and runtime. These modules support a fixed interface, +communicating the entries points via a structure consisting of a set of +function pointers. This structured is called the "module object". + +The code that routes the queries to the database servers is also loaed +as external shared objects and are referred to as routing modules. +*/ diff --git a/core/Makefile b/core/Makefile index afaf6b63e..249da8fc8 100644 --- a/core/Makefile +++ b/core/Makefile @@ -24,7 +24,7 @@ CC=cc CFLAGS=-c -I/usr/include -I../include -Wall LDFLAGS=-rdynamic SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \ - utils.c dcb.c load_utils.c + utils.c dcb.c load_utils.c session.c service.c server.c HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \ ../include/session.h ../include/spinlock.h ../include/thread.h \ @@ -50,4 +50,7 @@ depend.mk: $(SRCS) @rm -f depend.mk cc -M $(CFLAGS) $(SRCS) > depend.mk +install: gateway + install -D $< $(DEST) + include depend.mk diff --git a/core/dcb.c b/core/dcb.c index 9b75d2a53..ced12def3 100644 --- a/core/dcb.c +++ b/core/dcb.c @@ -42,6 +42,8 @@ #include #include #include +#include +#include static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */ static SPINLOCK *dcbspin = NULL; @@ -121,7 +123,7 @@ free_dcb(DCB *dcb) * * @param server The server to connect to * @param session The session this connection is being made for - * @param protcol The protocol module to use + * @param protocol The protocol module to use */ DCB * connect_dcb(SERVER *server, SESSION *session, const char *protocol) @@ -134,7 +136,7 @@ int epollfd = -1; // Need to work out how to get this { return NULL; } - if ((funcs = (GWPROTOCOL *)load_module(protocol, "Protocol")) == NULL) + if ((funcs = (GWPROTOCOL *)load_module(protocol, MODULE_PROTOCOL)) == NULL) { free(dcb); return NULL; @@ -155,6 +157,184 @@ int epollfd = -1; // Need to work out how to get this return dcb; } + +/** + * General purpose read routine to read data from a socket in the + * Descriptor Control Block and append it to a linked list of buffers. + * The list may be empty, in which case *head == NULL + * + * @param dcb The DCB to read from + * @param head Pointer to linked list to append data to + * @return The numebr of bytes read or -1 on fatal error + */ +int +dcb_read(DCB *dcb, GWBUF **head) +{ +GWBUF *buffer = NULL; +int b, n = 0; + + ioctl(dcb->fd, FIONREAD, &b); + while (b > 0) + { + int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE; + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { + return n ? n : -1; + } + + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++); + + if (n < 0) + { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + { + return n; + } + else + { + return n ? n : -1; + } + } + else if (n == 0) + { + return n; + } + + // append read data to the gwbuf + *head = gwbuf_append(*head, buffer); + + /* Re issue the ioctl as the amount of data buffered may have changed */ + ioctl(dcb->fd, FIONREAD, &b); + } + + return n; +} + +/** + * General purpose routine to write to a DCB + * + * @param dcb The DCB of the client + * @param queue Queue of buffers to write + */ +int +dcb_write(DCB *dcb, GWBUF *queue) +{ +int w, saved_errno = 0; + + spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) + { + /* + * We have some queued data, so add our data to + * the write queue and return. + * The assumption is that there will be an EPOLLOUT + * event to drain what is already queued. We are protected + * by the spinlock, which will also be acquired by the + * the routine that drains the queue data, so we should + * not have a race condition on the event. + */ + dcb->writeq = gwbuf_append(dcb->writeq, queue); + dcb->stats.n_buffered++; + } + else + { + int len; + + /* + * Loop over the buffer chain that has been passed to us + * from the reading side. + * Send as much of the data in that chain as possible and + * add any balance to the write queue. + */ + while (queue != NULL) + { + len = GWBUF_LENGTH(queue); + GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); dcb->stats.n_writes++); + saved_errno = errno; + if (w < 0) + { + break; + } + + /* + * Pull the number of bytes we have written from + * queue with have. + */ + queue = gwbuf_consume(queue, w); + if (w < len) + { + /* We didn't write all the data */ + } + } + /* Buffer the balance of any data */ + dcb->writeq = queue; + if (queue) + { + dcb->stats.n_buffered++; + } + } + spinlock_release(&dcb->writeqlock); + + if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) + { + /* We had a real write failure that we must deal with */ + return 0; + } + + return 1; +} + +/** + * Drain the write queue of a DCB. THis is called as part of the EPOLLOUT handling + * of a socket and will try to send any buffered data from the write queue + * up until the point the write would block. + * + * @param dcb DCB to drain the write queue of + * @return The number of bytes written + */ +int +dcb_drain_writeq(DCB *dcb) +{ +int n = 0; +int w; +int saved_errno = 0; + + spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) + { + int len; + + /* + * Loop over the buffer chain in the pending writeq + * Send as much of the data in that chain as possible and + * leave any balance on the write queue. + */ + while (dcb->writeq != NULL) + { + len = GWBUF_LENGTH(dcb->writeq); + GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(dcb->writeq), len);); + saved_errno = errno; + if (w < 0) + { + break; + } + + /* + * Pull the number of bytes we have written from + * queue with have. + */ + dcb->writeq = gwbuf_consume(dcb->writeq, w); + if (w < len) + { + /* We didn't write all the data */ + } + n += w; + } + } + spinlock_release(&dcb->writeqlock); + return n; +} + /** * Diagnostic to print a DCB * diff --git a/core/depend.mk b/core/depend.mk index cc4d77dad..a355d6520 100644 --- a/core/depend.mk +++ b/core/depend.mk @@ -67,7 +67,8 @@ gateway.o: gateway.c ../include/gw.h /usr/include/stdio.h \ ../include/gateway_mysql.h ../include/dcb.h ../include/spinlock.h \ ../include/thread.h /usr/include/pthread.h /usr/include/sched.h \ /usr/include/bits/sched.h /usr/include/bits/setjmp.h ../include/buffer.h \ - ../include/mysql_protocol.h ../include/dcb.h ../include/session.h + ../include/mysql_protocol.h ../include/dcb.h ../include/service.h \ + ../include/server.h ../include/session.h gateway_mysql_protocol.o: gateway_mysql_protocol.c ../include/gw.h \ /usr/include/stdio.h /usr/include/features.h /usr/include/sys/cdefs.h \ /usr/include/bits/wordsize.h /usr/include/gnu/stubs.h \ @@ -231,7 +232,34 @@ dcb.o: dcb.c /usr/include/stdio.h /usr/include/features.h \ ../include/spinlock.h ../include/thread.h /usr/include/pthread.h \ /usr/include/sched.h /usr/include/bits/sched.h \ /usr/include/bits/setjmp.h ../include/buffer.h ../include/server.h \ - ../include/session.h ../include/modules.h + ../include/session.h ../include/modules.h /usr/include/errno.h \ + /usr/include/bits/errno.h /usr/include/linux/errno.h \ + /usr/include/asm/errno.h /usr/include/asm-generic/errno.h \ + /usr/include/asm-generic/errno-base.h ../include/gw.h \ + /usr/include/ctype.h /usr/include/netdb.h /usr/include/netinet/in.h \ + /usr/include/stdint.h /usr/include/bits/wchar.h \ + /usr/include/sys/socket.h /usr/include/sys/uio.h /usr/include/bits/uio.h \ + /usr/include/bits/socket.h /usr/include/bits/sockaddr.h \ + /usr/include/asm/socket.h /usr/include/asm-generic/socket.h \ + /usr/include/asm/sockios.h /usr/include/asm-generic/sockios.h \ + /usr/include/bits/in.h /usr/include/rpc/netdb.h \ + /usr/include/bits/netdb.h /usr/include/fcntl.h /usr/include/bits/fcntl.h \ + /usr/include/bits/stat.h /usr/include/unistd.h \ + /usr/include/bits/posix_opt.h /usr/include/bits/environments.h \ + /usr/include/bits/confname.h /usr/include/getopt.h /usr/include/syslog.h \ + /usr/include/sys/syslog.h /usr/include/bits/syslog-path.h \ + /usr/include/pwd.h /usr/include/sys/epoll.h /usr/include/signal.h \ + /usr/include/bits/signum.h /usr/include/bits/siginfo.h \ + /usr/include/bits/sigaction.h /usr/include/bits/sigcontext.h \ + /usr/include/bits/sigstack.h /usr/include/sys/ucontext.h \ + /usr/include/bits/sigthread.h /usr/include/sys/ioctl.h \ + /usr/include/bits/ioctls.h /usr/include/asm/ioctls.h \ + /usr/include/asm-generic/ioctls.h /usr/include/linux/ioctl.h \ + /usr/include/asm/ioctl.h /usr/include/asm-generic/ioctl.h \ + /usr/include/bits/ioctl-types.h /usr/include/sys/ttydefaults.h \ + /usr/include/arpa/inet.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \ + ../include/gateway_mysql.h ../include/mysql_protocol.h ../include/dcb.h load_utils.o: load_utils.c /usr/include/sys/param.h \ /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/limits.h \ /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/syslimits.h \ @@ -258,3 +286,65 @@ load_utils.o: load_utils.c /usr/include/sys/param.h \ /usr/include/bits/environments.h /usr/include/bits/confname.h \ /usr/include/getopt.h /usr/include/string.h /usr/include/xlocale.h \ /usr/include/dlfcn.h /usr/include/bits/dlfcn.h ../include/modules.h +session.o: session.c /usr/include/stdio.h /usr/include/features.h \ + /usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \ + /usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \ + /usr/include/bits/types.h /usr/include/bits/typesizes.h \ + /usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \ + /usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \ + /usr/include/stdlib.h /usr/include/bits/waitflags.h \ + /usr/include/bits/waitstatus.h /usr/include/endian.h \ + /usr/include/bits/endian.h /usr/include/bits/byteswap.h \ + /usr/include/sys/types.h /usr/include/time.h /usr/include/sys/select.h \ + /usr/include/bits/select.h /usr/include/bits/sigset.h \ + /usr/include/bits/time.h /usr/include/sys/sysmacros.h \ + /usr/include/bits/pthreadtypes.h /usr/include/alloca.h \ + /usr/include/unistd.h /usr/include/bits/posix_opt.h \ + /usr/include/bits/environments.h /usr/include/bits/confname.h \ + /usr/include/getopt.h /usr/include/string.h /usr/include/xlocale.h \ + ../include/session.h ../include/service.h ../include/spinlock.h \ + ../include/thread.h /usr/include/pthread.h /usr/include/sched.h \ + /usr/include/bits/sched.h /usr/include/bits/setjmp.h ../include/dcb.h \ + ../include/buffer.h ../include/server.h ../include/router.h \ + ../include/atomic.h +service.o: service.c /usr/include/stdio.h /usr/include/features.h \ + /usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \ + /usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \ + /usr/include/bits/types.h /usr/include/bits/typesizes.h \ + /usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \ + /usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \ + /usr/include/stdlib.h /usr/include/bits/waitflags.h \ + /usr/include/bits/waitstatus.h /usr/include/endian.h \ + /usr/include/bits/endian.h /usr/include/bits/byteswap.h \ + /usr/include/sys/types.h /usr/include/time.h /usr/include/sys/select.h \ + /usr/include/bits/select.h /usr/include/bits/sigset.h \ + /usr/include/bits/time.h /usr/include/sys/sysmacros.h \ + /usr/include/bits/pthreadtypes.h /usr/include/alloca.h \ + /usr/include/string.h /usr/include/xlocale.h ../include/session.h \ + ../include/service.h ../include/spinlock.h ../include/thread.h \ + /usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \ + /usr/include/bits/setjmp.h ../include/dcb.h ../include/buffer.h \ + ../include/server.h ../include/router.h ../include/modules.h +server.o: server.c /usr/include/stdio.h /usr/include/features.h \ + /usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \ + /usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \ + /usr/include/bits/types.h /usr/include/bits/typesizes.h \ + /usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \ + /usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \ + /usr/include/stdlib.h /usr/include/bits/waitflags.h \ + /usr/include/bits/waitstatus.h /usr/include/endian.h \ + /usr/include/bits/endian.h /usr/include/bits/byteswap.h \ + /usr/include/sys/types.h /usr/include/time.h /usr/include/sys/select.h \ + /usr/include/bits/select.h /usr/include/bits/sigset.h \ + /usr/include/bits/time.h /usr/include/sys/sysmacros.h \ + /usr/include/bits/pthreadtypes.h /usr/include/alloca.h \ + /usr/include/string.h /usr/include/xlocale.h ../include/session.h \ + ../include/server.h ../include/spinlock.h ../include/thread.h \ + /usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \ + /usr/include/bits/setjmp.h diff --git a/core/gateway.c b/core/gateway.c index b1002a503..1a251bd53 100644 --- a/core/gateway.c +++ b/core/gateway.c @@ -18,7 +18,7 @@ */ /** - * @file Gateway.c - The gateway entry point. + * @file gateway.c - The gateway entry point. * * @verbatim * Revision History @@ -33,6 +33,8 @@ */ #include +#include +#include #include #include @@ -164,23 +166,44 @@ int handle_event_errors_backend(DCB *dcb, int event) { } // main function -int main(int argc, char **argv) { - int daemon_mode = 1; - sigset_t sigset; - struct epoll_event events[MAX_EVENTS]; - struct epoll_event ev; - int nfds; - int n; - char *port = NULL; +int +main(int argc, char **argv) +{ +int daemon_mode = 1; +sigset_t sigset; +struct epoll_event events[MAX_EVENTS]; +struct epoll_event ev; +int nfds; +int n; +unsigned short port = 4406; +SERVICE *service; +SERVER *server1, *server2, *server3; for (n = 0; n < argc; n++) { if (strncmp(argv[n], "-p", 2) == 0) { - port = &argv[n][2]; + port = atoi(&argv[n][2]); } } + /* + * Build the services etc. This would normally be done by the + * configuration, however in lieu of that being available we + * will build a static configuration here + */ + + if ((service = service_alloc("Test Service", "readconnroute")) == NULL) + exit(1); + serviceAddProtocol(service, "MySQLClient", port); + + server1 = server_alloc("127.0.0.1", "MySQLBackend", 3306); + server2 = server_alloc("127.0.0.1", "MySQLBackend", 3307); + server3 = server_alloc("127.0.0.1", "MySQLBackend", 3308); + serviceAddBackend(service, server1); + serviceAddBackend(service, server2); + serviceAddBackend(service, server3); + fprintf(stderr, "(C) SkySQL Ab 2013\n"); load_module("testroute", "Router"); @@ -220,6 +243,11 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } + /* + * Start the service that was created above + */ + serviceStart(service, epollfd); + fprintf(stderr, ">> GATEWAY epoll maxevents is %i\n", MAX_EVENTS); // listen to MySQL protocol diff --git a/core/gw_utils.c b/core/gw_utils.c index ca44d4533..ed96fee19 100644 --- a/core/gw_utils.c +++ b/core/gw_utils.c @@ -273,7 +273,7 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { * - backend connection using data in MySQL session * * @param client_dcb The client DCB struct - * @param epfd The epoll set to add the new connection + * @param efd The epoll set to add the new connection * @return 0 on Success or 1 on Failure. */ int create_backend_connection(DCB *client_dcb, int efd) { diff --git a/core/load_utils.c b/core/load_utils.c index 0eead68c9..16ecbfe9b 100644 --- a/core/load_utils.c +++ b/core/load_utils.c @@ -52,7 +52,6 @@ static void unregister_module(const char *module); * * @param module Name of the module to load * @param type Type of module, used purely for registration - * @param entry Routine to call to extract entry points * @return The module specific entry point structure or NULL */ void * diff --git a/core/server.c b/core/server.c new file mode 100644 index 000000000..b41e50cbc --- /dev/null +++ b/core/server.c @@ -0,0 +1,141 @@ +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2013 + */ + +/** + * @file server.c - A representation of a backend server within the gateway. + * + * @verbatim + * Revision History + * + * Date Who Description + * 18/06/13 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include + +static SPINLOCK server_spin = SPINLOCK_INIT; +static SERVER *allServers = NULL; + +/** + * Allocate a new server withn the gateway + * + * + * @param servname The server name + * @param protocol The protocol to use to connect to the server + * @param port The port to connect to + * + * @return The newly created server or NULL if an error occured + */ +SERVER * +server_alloc(char *servname, char *protocol, unsigned short port) +{ +SERVER *server; + + if ((server = (SERVER *)malloc(sizeof(SERVER))) == NULL) + return NULL; + server->name = strdup(servname); + server->protocol = strdup(protocol); + server->port = port; + memset(&server->stats, sizeof(SERVER_STATS), 0); + server->nextdb = NULL; + + spinlock_acquire(&server_spin); + server->next = allServers; + allServers = server; + spinlock_release(&server_spin); + + return server; +} + + +/** + * Deallocate the specified server + * + * @param server The service to deallocate + * @return Returns true if the server was freed + */ +int +server_free(SERVER *server) +{ +SERVER *ptr; + + /* First of all remove from the linked list */ + spinlock_acquire(&server_spin); + if (allServers == server) + { + allServers = server->next; + } + else + { + ptr = allServers; + while (ptr && ptr->next != server) + { + ptr = ptr->next; + } + if (ptr) + ptr->next = server->next; + } + spinlock_release(&server_spin); + + /* Clean up session and free the memory */ + free(server->name); + free(server->protocol); + free(server); + return 1; +} + +/** + * Print details of an individual server + * + * @param server Server to print + */ +void +printServer(SERVER *server) +{ + printf("Server %p\n", server); + printf("\tServer: %s\n", server->name); + printf("\tProtocol: %s\n", server->protocol); + printf("\tPort: %d\n", server->port); +} + +/** + * Print all servers + * + * Designed to be called within a debugger session in order + * to display all active servers within the gateway + */ +void +printAllServers() +{ +SERVER *ptr; + + spinlock_acquire(&server_spin); + ptr = allServers; + while (ptr) + { + printServer(ptr); + ptr = ptr->next; + } + spinlock_release(&server_spin); +} diff --git a/core/service.c b/core/service.c new file mode 100644 index 000000000..ec50ab118 --- /dev/null +++ b/core/service.c @@ -0,0 +1,249 @@ +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2013 + */ + +/** + * @file service.c - A representation of the service within the gateway. + * + * @verbatim + * Revision History + * + * Date Who Description + * 18/06/13 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static SPINLOCK service_spin = SPINLOCK_INIT; +static SERVICE *allServices = NULL; + +/** + * Allocate a new service for the gateway to support + * + * + * @param servname The service name + * @param router Name of the router module this service uses + * + * @return The newly created service or NULL if an error occured + */ +SERVICE * +service_alloc(char *servname, char *router) +{ +SERVICE *service; + + if ((service = (SERVICE *)malloc(sizeof(SERVICE))) == NULL) + return NULL; + if ((service->router = load_module(router, MODULE_ROUTER)) == NULL) + { + free(service); + return NULL; + } + service->name = strdup(servname); + service->routerModule = strdup(router); + memset(&service->stats, sizeof(SERVICE_STATS), 0); + service->ports = NULL; + service->stats.started = time(0); + service->state = SERVICE_STATE_ALLOC; + + spinlock_acquire(&service_spin); + service->next = allServices; + allServices = service; + spinlock_release(&service_spin); + + return service; +} + +/** + * Start a service + * + * This function loads the protocol modules for each port on which the + * service listens and starts the listener on that port + * + * Also create the router_instance for the service. + * + * @param service The Service that should be started + * @param efd The epoll descriptor + * @return Returns the number of listeners created + */ +int +serviceStart(SERVICE *service, int efd) +{ +SERV_PROTOCOL *port; +int listeners = 0; +char config_bind[40]; +GWPROTOCOL *funcs; + + port = service->ports; + while (port) + { + if ((port->listener = alloc_dcb()) == NULL) + { + break; + } + if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL) + { + free(port->listener); + port->listener = NULL; + } + memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL)); + port->listener->session = NULL; + sprintf(config_bind, "0.0.0.0:%d", port->port); + if (port->listener->func.listen(efd, config_bind)) + listeners++; + } + port->listener->session = session_alloc(service, port->listener); + port->listener->session->state = SESSION_STATE_LISTENER; + if (listeners) + service->stats.started = time(0); + + service->router_instance = service->router->createInstance(service); + return listeners; +} + +/** + * Deallocate the specified service + * + * @param service The service to deallocate + * @return Returns true if the service was freed + */ +int +service_free(SERVICE *service) +{ +SERVICE *ptr; + + if (service->stats.n_current) + return 0; + /* First of all remove from the linked list */ + spinlock_acquire(&service_spin); + if (allServices == service) + { + allServices = service->next; + } + else + { + ptr = allServices; + while (ptr && ptr->next != service) + { + ptr = ptr->next; + } + if (ptr) + ptr->next = service->next; + } + spinlock_release(&service_spin); + + /* Clean up session and free the memory */ + free(service->name); + free(service->routerModule); + free(service); + return 1; +} + +/** + * Add a protocol/port pair to the service + * + * @param service The service + * @param protocol The name of the protocol module + * @param port The port to listen on + * @return TRUE if the protocol/port could be added + */ +int +serviceAddProtocol(SERVICE *service, char *protocol, unsigned short port) +{ +SERV_PROTOCOL *proto; + + if ((proto = (SERV_PROTOCOL *)malloc(sizeof(SERV_PROTOCOL))) == NULL) + { + return 0; + } + proto->protocol = strdup(protocol); + proto->port = port; + spinlock_acquire(&service->spin); + proto->next = service->ports; + service->ports = proto; + spinlock_release(&service->spin); + + return 1; +} + +/** + * Add a backend database server to a service + * + * @param service + * @param server + */ +void +serviceAddBackend(SERVICE *service, SERVER *server) +{ + spinlock_acquire(&service->spin); + server->nextdb = service->databases; + service->databases = server; + spinlock_release(&service->spin); +} + +/** + * Print details of an individual service + * + * @param service Service to print + */ +void +printService(SERVICE *service) +{ +SERVER *ptr = service->databases; + + printf("Service %p\n", service); + printf("\tService: %s\n", service->name); + printf("\tRouter: %s (%p)\n", service->routerModule, service->router); + printf("\tStarted: %s", asctime(localtime(&service->stats.started))); + printf("\tBackend databases\n"); + while (ptr) + { + printf("\t\t%s:%d %s\n", ptr->name, ptr->port, ptr->protocol); + ptr = ptr->next; + } + printf("\tTotal connections: %d\n", service->stats.n_sessions); + printf("\tCurrently connected: %d\n", service->stats.n_current); +} + +/** + * Print all services + * + * Designed to be called within a debugger session in order + * to display all active services within the gateway + */ +void +printAllServices() +{ +SERVICE *ptr; + + spinlock_acquire(&service_spin); + ptr = allServices; + while (ptr) + { + printService(ptr); + ptr = ptr->next; + } + spinlock_release(&service_spin); +} diff --git a/core/session.c b/core/session.c new file mode 100644 index 000000000..b340dab57 --- /dev/null +++ b/core/session.c @@ -0,0 +1,148 @@ +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2013 + */ + +/** + * @file session.c - A representation of the session within the gateway. + * + * @verbatim + * Revision History + * + * Date Who Description + * 17/06/13 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static SPINLOCK session_spin = SPINLOCK_INIT; +static SESSION *allSessions = NULL; + +/** + * Allocate a new session for a new client of the specified service. + * + * Create the link to the router session by calling the newSession + * entry point of the router using the router instance of the + * service this session is part of. + * + * @param service The service this connection was established by + * @param client The client side DCB + * @return The newly created session or NULL if an error occured + */ +SESSION * +session_alloc(SERVICE *service, DCB *client) +{ +SESSION *session; + + if ((session = (SESSION *)malloc(sizeof(SESSION))) == NULL) + return NULL; + session->service = service; + session->client = client; + memset(&session->stats, sizeof(SESSION_STATS), 0); + session->stats.connect = time(0); + session->state = SESSION_STATE_ALLOC; + client->session = session; + + session->router_session = service->router->newSession(service->router_instance, session); + + spinlock_acquire(&session_spin); + session->next = allSessions; + allSessions = session; + spinlock_release(&session_spin); + + atomic_add(&service->stats.n_sessions, 1); + atomic_add(&service->stats.n_current, 1); + + return session; +} + +/** + * Deallocate the specified session + * + * @param session The session to deallocate + */ +void +session_free(SESSION *session) +{ +SESSION *ptr; + + /* First of all remove from the linked list */ + spinlock_acquire(&session_spin); + if (allSessions == session) + { + allSessions = session->next; + } + else + { + ptr = allSessions; + while (ptr && ptr->next != session) + { + ptr = ptr->next; + } + if (ptr) + ptr->next = session->next; + } + spinlock_release(&session_spin); + atomic_add(&session->service->stats.n_current, -1); + + /* Clean up session and free the memory */ + free(session); +} + +/** + * Print details of an individual session + * + * @param session Session to print + */ +void +printSession(SESSION *session) +{ + printf("Session %p\n", session); + printf("\tService: %s (%p)\n", session->service->name, session->service); + printf("\tClient DCB: %p\n", session->client); + printf("\tConnected: %s", asctime(localtime(&session->stats.connect))); +} + +/** + * Print all sessions + * + * Designed to be called within a debugger session in order + * to display all active sessions within the gateway + */ +void +printAllSessions() +{ +SESSION *ptr; + + spinlock_acquire(&session_spin); + ptr = allSessions; + while (ptr) + { + printSession(ptr); + ptr = ptr->next; + } + spinlock_release(&session_spin); +} diff --git a/doxygate b/doxygate index 2058c232b..50f4f81e4 100644 --- a/doxygate +++ b/doxygate @@ -568,7 +568,7 @@ WARN_LOGFILE = # directories like "/usr/src/myproject". Separate the files or directories # with spaces. -INPUT = core modules include +INPUT = README core modules include # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is diff --git a/include/buffer.h b/include/buffer.h index d85bc96fe..2b555bc00 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -39,6 +39,15 @@ * * @endverbatim */ + +/** + * The buffer structure used by the descriptor control blocks. + * + * Linked lists of buffers are created as data is read from a descriptor + * or written to a descriptor. The use of linked lists of buffers with + * flexible data pointers is designed to minimise the need for data to + * be copied within the gateway. + */ typedef struct gwbuf { struct gwbuf *next; /**< Next buffer in a linked chain of buffers */ void *start; /**< Start of the valid data */ @@ -49,10 +58,18 @@ typedef struct gwbuf { /* * Macros to access the data in the buffers */ -#define GWBUF_DATA(b) ((b)->start) -#define GWBUF_LENGTH(b) ((b)->end - (b)->start) -#define GWBUF_EMPTY(b) ((b)->start == (b)->end) -#define GWBUF_CONSUME(b, bytes) (b)->start += bytes +#define GWBUF_DATA(b) ((b)->start) /**< First valid, uncomsumed + * byte in the buffer + */ +#define GWBUF_LENGTH(b) ((b)->end - (b)->start) /**< Number of bytes in the + * individual buffer + */ +#define GWBUF_EMPTY(b) ((b)->start == (b)->end) /**< True if all bytes in the + * buffer have been consumed + */ +#define GWBUF_CONSUME(b, bytes) (b)->start += bytes /**< Consume a number of bytes + * in the buffer + */ /* * Function prototypes for the API to maniplate the buffers diff --git a/include/dcb.h b/include/dcb.h index 41d305941..86dab59d2 100644 --- a/include/dcb.h +++ b/include/dcb.h @@ -36,6 +36,7 @@ struct server; * 01/06/13 Mark Riddoch Initial implementation * 11/06/13 Mark Riddoch Updated GWPROTOCOL structure with new * entry points + * 18/06/13 Mark Riddoch Addition of the listener entry point * * @endverbatim */ @@ -56,7 +57,12 @@ struct dcb; * connect Create a connection to the specified server * for the session pased in * close Gateway close entry point for the socket + * listen Create a listener for the protocol * @endverbatim + * + * This forms the "module object" for protocol modules within the gateway. + * + * @see load_module */ typedef struct gw_protocol { int (*read)(struct dcb *, int); @@ -67,16 +73,29 @@ typedef struct gw_protocol { int (*accept)(struct dcb *, int); int (*connect)(struct server *, struct session *, int); int (*close)(struct dcb *, int); + int (*listen)(int, char *); } GWPROTOCOL; +/** + * The statitics gathered on a descriptor control block + */ typedef struct dcbstats { int n_reads; /**< Number of reads on this descriptor */ int n_writes; /**< Number of writes on this descriptor */ int n_accepts; /**< Number of accepts on this descriptor */ int n_buffered; /**< Number of buffered writes */ } DCBSTATS; -/* + +/** * Descriptor Control Block + * + * A wrapper for a network descriptor within the gateway, it contains all the + * state information necessary to allow for the implementation of the asynchronous + * operation of the potocol and gateway functions. It also provides links to the service + * and session data that is required to route the information within the gateway. + * + * It is important to hold the state information here such that any thread within the + * gateway may be selected to execute the required actions when a network event occurs. */ typedef struct dcb { int fd; /**< The descriptor */ @@ -94,13 +113,13 @@ typedef struct dcb { } DCB; /* DCB states */ -#define DCB_STATE_ALLOC 0 /* Memory allocated but not populated */ -#define DCB_STATE_IDLE 1 /* Not yet in the poll mask */ -#define DCB_STATE_POLLING 2 /* Waiting in the poll loop */ -#define DCB_STATE_PROCESSING 4 /* Processing an event */ -#define DCB_STATE_LISTENING 5 /* The DCB is for a listening socket */ -#define DCB_STATE_DISCONNECTED 6 /* The socket is now closed */ -#define DCB_STATE_FREED 7 /* Memory freed */ +#define DCB_STATE_ALLOC 0 /**< Memory allocated but not populated */ +#define DCB_STATE_IDLE 1 /**< Not yet in the poll mask */ +#define DCB_STATE_POLLING 2 /**< Waiting in the poll loop */ +#define DCB_STATE_PROCESSING 4 /**< Processing an event */ +#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */ +#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */ +#define DCB_STATE_FREED 7 /**< Memory freed */ /* A few useful macros */ #define DCB_SESSION(x) (x)->session @@ -109,6 +128,9 @@ typedef struct dcb { extern DCB *alloc_dcb(); /* Allocate a DCB */ extern void free_dcb(DCB *); /* Free a DCB */ extern DCB *connect_dcb(struct server *, struct session *, const char *); +extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */ +extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */ +extern int dcb_drain_writeq(DCB *); /* Generic write routine */ extern void printAllDCBs(); /* Debug to print all DCB in the system */ extern void printDCB(DCB *); /* Debug print routine */ extern const char *gw_dcb_state2string(int); /* DCB state to string */ diff --git a/include/modules.h b/include/modules.h index d4a4195ff..e7c42287d 100644 --- a/include/modules.h +++ b/include/modules.h @@ -41,6 +41,13 @@ typedef struct modules { *next; /**< Next module in the linked list */ } MODULES; +/** + * Module types + */ +#define MODULE_PROTOCOL "Protocol" /**< A protocol module type */ +#define MODULE_ROUTER "Router" /**< A router module type */ + + extern void *load_module(const char *module, const char *type); extern void unload_module(const char *module); extern void printModules(); diff --git a/include/router.h b/include/router.h index 95786d365..6d3d3ff86 100644 --- a/include/router.h +++ b/include/router.h @@ -51,8 +51,10 @@ typedef void *ROUTER; * routeQuery Called on each query that requires * routing * @endverbatim + * + * @see load_module */ -typedef struct { +typedef struct router_object { ROUTER *(*createInstance)(SERVICE *service); void *(*newSession)(ROUTER *instance, SESSION *session); void (*closeSession)(ROUTER *instance, void *router_session); diff --git a/include/server.h b/include/server.h index d9e64c195..38c838207 100644 --- a/include/server.h +++ b/include/server.h @@ -19,7 +19,7 @@ */ /** - * @file server.h + * @file service.h * * The server level definitions within the gateway * @@ -31,10 +31,32 @@ * * @endverbatim */ + +/** + * The server statistics structure + * + */ +typedef struct { + int n_connections; /**< Number of connections */ +} SERVER_STATS; + +/** + * The SERVER structure defines a backend server. Each server has a name + * or IP address for the server, a port that the server listens on and + * the name of a protocol module that is loaded to implement the protocol + * between the gateway and the server. + */ typedef struct server { char *name; /**< Server name/IP address*/ - int port; /**< Port to listen on */ + unsigned short port; /**< Port to listen on */ char *protocol; /**< Protocol module to use */ + SERVER_STATS stats; /**< The server statistics */ struct server *next; /**< Next server */ + struct server *nextdb; /**< Next server in lsit attached to a service */ } SERVER; + +extern SERVER *server_alloc(char *, char *, unsigned short); +extern int server_free(SERVER *); +extern void printServer(SERVER *); +extern void printAllServers(); #endif diff --git a/include/service.h b/include/service.h index 89f3b5ea4..4d9b0ba33 100644 --- a/include/service.h +++ b/include/service.h @@ -18,8 +18,13 @@ * Copyright SkySQL Ab 2013 */ +#include +#include +#include +#include + /** - * @file sevice.h + * @file service.h * * The service level definitions within the gateway * @@ -28,27 +33,69 @@ * * Date Who Description * 14/06/13 Mark Riddoch Initial implementation + * 18/06/13 Mark Riddoch Addition of statistics and function + * prototypes * * @endverbatim */ struct server; struct router; +struct router_object; +/** + * The servprotocol structure is used to link a service to the protocols that + * are used to support that service. It defines the name of the protocol module + * that should be loaded to support the client connection and the port that the + * protocol should use to listen for incoming client connections. + */ typedef struct servprotocol { char *protocol; /**< Protocol module to load */ - short port; /**< Port to listen on */ + unsigned short port; /**< Port to listen on */ + DCB *listener; /**< The DCB for the listener */ struct servprotocol *next; /**< Next service protocol */ } SERV_PROTOCOL; +/** + * The service statistics structure + */ +typedef struct { + time_t started; /**< The time when the service was started */ + int n_sessions; /**< Number of sessions created on service since start */ + int n_current; /**< Current number of sessions */ +} SERVICE_STATS; +/** + * Defines a service within the gateway. + * + * A service is a combination of a set of backend servers, a routing mechanism + * and a set of client side protocol/port pairs used to listen for new connections + * to the service. + */ typedef struct service { char *name; /**< The service name */ - SERV_PROTOCOL *servers; /**< Linked list of ports and protocols + int state; /**< The service state */ + SERV_PROTOCOL *ports; /**< Linked list of ports and protocols * that this service will listen on. */ char *routerModule; /**< Name of router module to use */ - struct router *router; /**< The router we are using */ + struct router_object + *router; /**< The router we are using */ + void *router_instance; + /**< The router instance for this service */ struct server *databases; /**< The set of servers in the backend */ + SPINLOCK spin; /**< The service spinlock */ + SERVICE_STATS stats; /**< The service statistics */ + struct service *next; /**< The next service in the linked list */ } SERVICE; +#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */ +#define SERVICE_STATE_STARTED 2 /**< The service has been started */ + +extern SERVICE *service_alloc(char *, char *); +extern int service_free(SERVICE *); +extern int serviceAddProtocol(SERVICE *, char *, unsigned short); +extern void serviceAddBackend(SERVICE *, SERVER *); +extern int serviceStart(SERVICE *, int); +extern void printService(SERVICE *); +extern void printAllServices(); #endif diff --git a/include/session.h b/include/session.h index 2c0d8492b..37258c2f8 100644 --- a/include/session.h +++ b/include/session.h @@ -30,21 +30,44 @@ * for session specific data * @endverbatim */ +#include struct dcb; +struct service; -/* +/** + * The session statistics structure + */ +typedef struct { + time_t connect; /**< Time when the session was started */ +} SESSION_STATS; + +/** * The session status block + * + * A session status block is created for each user (client) connection + * to the database, it links the descriptors, routing implementation + * and originating service together for the client session. */ typedef struct session { int state; /**< Current descriptor state */ struct dcb *client; /**< The client connection */ struct dcb *backends; /**< The set of backend servers */ void *data; /**< The session data */ + void *router_session;/**< The router insatnce data */ + SESSION_STATS stats; /**< Session statistics */ + struct service *service; /**< The service this session is using */ + struct session *next; /**< Linked list of all sessions */ } SESSION; #define SESSION_STATE_ALLOC 0 #define SESSION_STATE_READY 1 +#define SESSION_STATE_LISTENER 2 #define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type) + +extern SESSION *session_alloc(struct service *, struct dcb *); +extern void session_free(SESSION *); +extern void printAllSessions(); +extern void printSession(SESSION *); #endif diff --git a/include/spinlock.h b/include/spinlock.h index 91c737a6f..66706d3fd 100644 --- a/include/spinlock.h +++ b/include/spinlock.h @@ -46,6 +46,12 @@ typedef struct spinlock { #define FALSE (1 == 0) #endif +#if DEBUG +#define SPINLOCK_INIT { 0, 0, 0, NULL } +#else +#define SPINLOCK_INIT { 0 } +#endif + extern void spinlock_init(SPINLOCK *lock); extern void spinlock_acquire(SPINLOCK *lock); extern int spinlock_acquire_nowait(SPINLOCK *lock); diff --git a/modules/protocol/Makefile b/modules/protocol/Makefile index dc5e33b86..e434f4a35 100644 --- a/modules/protocol/Makefile +++ b/modules/protocol/Makefile @@ -29,8 +29,9 @@ MYSQLBACKENDOBJ=$(MYSQLBACKENDSRCS:.c=.o) SRCS=$(MYSQLCLIENTSRCS) $(MYSQLBACKENDSRCS) OBJ=$(SRCS:.c=.o) LIBS= +MODULES=libMySQLClient.so libMySQLBackend.so -all: libMySQLClient.so libMySQLBackend.so +all: $(MODULES) libMySQLClient.so: $(MYSQLCLIENTOBJ) $(CC) $(LDFLAGS) $(MYSQLCLIENTOBJ) $(LIBS) -o $@ @@ -46,3 +47,6 @@ clean: tags: ctags $(SRCS) $(HDRS) + +install: $(MODULES) + install -D $< $(DEST)/gateway/modules diff --git a/modules/protocol/mysql_client.c b/modules/protocol/mysql_client.c index 1f5835b93..8eea7dd61 100644 --- a/modules/protocol/mysql_client.c +++ b/modules/protocol/mysql_client.c @@ -16,7 +16,9 @@ * Copyright SkySQL Ab 2013 */ -/* +/** + * @file mysql_client.c + * * MySQL Protocol module for handling the protocol between the gateway * and the client. * @@ -59,7 +61,7 @@ static GWPROTOCOL MyObject = { gw_MySQLListener /* Listen */ }; -/* +/** * Implementation of the mandatory version entry point * * @return version string of the module @@ -70,7 +72,7 @@ version() return version_str; } -/* +/** * The module initialisation routine, called when the module * is first loaded. */ @@ -80,7 +82,7 @@ ModuleInit() fprintf(stderr, "Initial MySQL Client Protcol module.\n"); } -/* +/** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the * "module object", this is a structure with the set of @@ -94,7 +96,7 @@ GetModuleObject() return &MyObject; } -/* +/** * mysql_send_ok * * Send a MySQL protocol OK message to the dcb (client) @@ -175,16 +177,16 @@ mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mys return sizeof(mysql_packet_header) + mysql_payload_size; } -/* +/** * mysql_send_auth_error * - * Send a MySQL protocol ERR message, for gatewayauthentication error to the dcb + * Send a MySQL protocol ERR message, for gateway authentication error to the dcb * * @param dcb Descriptor Control Block for the connection to which the OK is sent * @param packet_number * @param in_affected_rows * @param mysql_message - * @return packet lenght + * @return packet length * */ int @@ -259,11 +261,11 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const return sizeof(mysql_packet_header) + mysql_payload_size; } -/* +/** * MySQLSendHandshake * - * @param dcb The descriptor control block to use for sendign the handshake request - * @return + * @param dcb The descriptor control block to use for sending the handshake request + * @return The packet length sent */ int MySQLSendHandshake(DCB* dcb) @@ -287,6 +289,7 @@ MySQLSendHandshake(DCB* dcb) uint8_t mysql_filler_ten[10]; uint8_t mysql_last_byte = 0x00; char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1]=""; + MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); GWBUF *buf; @@ -406,7 +409,7 @@ MySQLSendHandshake(DCB* dcb) return sizeof(mysql_packet_header) + mysql_payload_size; } -/* +/** * gw_mysql_do_authentication * * Performs the MySQL protocol 4.1 authentication, using data in GWBUF *queue @@ -415,7 +418,7 @@ MySQLSendHandshake(DCB* dcb) * client_capabilitiesa are copied into the dcb->protocol * * @param dcb Descriptor Control Block of the client - * @param The GWBUF with data from client + * @param queue The GWBUF with data from client * @return 0 for Authentication ok, !=1 for failed autht * */ @@ -578,7 +581,7 @@ static int gw_check_mysql_scramble_data(uint8_t *token, unsigned int token_len, return memcmp(password, check_hash, SHA_DIGEST_LENGTH); } -/* +/** * Write function for client DCB: writes data from Gateway to Client * * @param dcb The DCB of the client @@ -652,9 +655,13 @@ int w, saved_errno = 0; return 0; } -////////////////////////////////////////// -//client read event triggered by EPOLLIN -////////////////////////////////////////// +/** + * Client read event triggered by EPOLLIN + * + * @param dcb Descriptor control block + * @param epfd Epoll descriptor + * @return TRUE on error + */ int gw_read_client_event(DCB* dcb, int epfd) { MySQLProtocol *protocol = NULL; uint8_t buffer[MAX_BUFFER_SIZE] = ""; @@ -762,9 +769,9 @@ int gw_read_client_event(DCB* dcb, int epfd) { fprintf(stderr, "COM_QUIT received\n"); if (dcb->session->backends) { dcb->session->backends->func.write(dcb, queue); - (dcb->session->backends->func).error(dcb->session->backends, epfd, -1); + (dcb->session->backends->func).error(dcb->session->backends, epfd); } - (dcb->func).error(dcb, epfd, -1); + (dcb->func).error(dcb, epfd); return 1; } @@ -843,9 +850,9 @@ int gw_write_client_event(DCB *dcb, int epfd) { // still to implement mysql_send_auth_error(dcb, 2, 0, "Authorization failed"); - dcb->func.error(dcb, epfd, -1); + dcb->func.error(dcb, epfd); if (dcb->session->backends) - dcb->session->backends->func.error(dcb->session->backends, epfd, -1); + dcb->session->backends->func.error(dcb->session->backends, epfd); return 0; } @@ -1030,28 +1037,24 @@ int gw_MySQLAccept(DCB *listener, int efd) { setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); setnonblocking(c_sock); - client = (DCB *) calloc(1, sizeof(DCB)); - session = (SESSION *) calloc(1, sizeof(SESSION)); - protocol = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol)); - + client = alloc_dcb(); client->fd = c_sock; - client->state = DCB_STATE_ALLOC; + + session = session_alloc(listener->session->service, client); client->session = session; + + protocol = (MySQLProtocol *) calloc(1, sizeof(MySQLProtocol)); client->protocol = (void *)protocol; - session->state = SESSION_STATE_ALLOC; - session->client = client; - session->backends = NULL; protocol->state = MYSQL_ALLOC; protocol->descriptor = client; protocol->fd = c_sock; + session->backends = NULL; + // assign function poiters to "func" field - (client->func).error = gw_error_client_event; - (client->func).read = gw_read_client_event; - (client->func).write = gw_MySQLWrite_client; - (client->func).write_ready = gw_write_client_event; + memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL)); // edge triggering flag added ee.events = EPOLLIN | EPOLLOUT | EPOLLET; @@ -1084,6 +1087,8 @@ int gw_MySQLAccept(DCB *listener, int efd) { */ static int gw_error_client_event(DCB *dcb, int epfd, int event) { struct epoll_event ed; + + MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); fprintf(stderr, "#### Handle error function for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state)); diff --git a/modules/protocol/telnetd.c b/modules/protocol/telnetd.c index 754ffd5fa..d4da11535 100644 --- a/modules/protocol/telnetd.c +++ b/modules/protocol/telnetd.c @@ -18,9 +18,26 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include /** - * @file telnetd - telnet daemon protocol module + * @file telnetd.c - telnet daemon protocol module + * + * The telnetd protocol module is intended as a mechanism to allow connections + * into the gateway for the purpsoe of accessing debugging information within + * the gateway rather than a protocol to be used to send queries to backend + * databases. + * + * In the first instance it is intended to allow a debug connection to access + * internal data structures, however it may also be used to manage the + * configuration of the gateway. * * @verbatim * Revision History @@ -39,6 +56,7 @@ static int telnetd_error(DCB *dcb, int event); static int telnetd_hangup(DCB *dcb, int event); static int telnetd_accept(DCB *dcb, int event); static int telnetd_close(DCB *dcb, int event); +static int telnetd_listen(DCB *dcb, int event); /** * The "module object" for the telnetd protocol module. @@ -51,7 +69,8 @@ static GWPROTOCOL MyObject = { telnetd_hangup, /**< HangUp - EPOLLHUP handler */ telnetd_accept, /**< Accept */ NULL, /**< Connect */ - telnetd_close /**< Close */ + telnetd_close, /**< Close */ + telnetd_listen /**< Create a listener */ }; /** @@ -72,7 +91,7 @@ version() void ModuleInit() { - fprintf(stderr, "Initialise Telnetd Protcol module.\n"); + fprintf(stderr, "Initialise Telnetd Protocol module.\n"); } /** @@ -99,6 +118,19 @@ GetModuleObject() static int telnetd_read_event(DCB* dcb, int epfd) { +int n; +GWBUF *head = NULL; +SESSION *session = dcb->session; +ROUTER_OBJECT *router = session->service->router; +ROUTER *router_instanc = session->service->router_instance; +void *rsession = session->router_session; + + if ((n = dcb_read(dcb, &head)) != -1) + { + router->routeQuery(router_instance, rsession, head); + } + + return n; } /** @@ -111,6 +143,7 @@ telnetd_read_event(DCB* dcb, int epfd) static int telnetd_write_event(DCB *dcb, int epfd) { + int dcb_drain_writeq(dcb); } /** @@ -125,13 +158,14 @@ telnetd_write_event(DCB *dcb, int epfd) static int telnetd_write(DCB *dcb, GWBUF *queue) { + return dcb_write(dcb, queue); } /** * Handler for the EPOLLERR event. * * @param dcb The descriptor control block - * @param epfd The epoll descriptor + * @param event The epoll descriptor */ static int telnetd_error(DCB *dcb, int event) @@ -142,7 +176,7 @@ telnetd_error(DCB *dcb, int event) * Handler for the EPOLLHUP event. * * @param dcb The descriptor control block - * @param epfd The epoll descriptor + * @param event The epoll descriptor */ static int telnetd_hangup(DCB *dcb, int event) @@ -154,11 +188,45 @@ telnetd_hangup(DCB *dcb, int event) * socket for the protocol. * * @param dcb The descriptor control block - * @param epfd The epoll descriptor + * @param event The epoll descriptor */ static int telnetd_accept(DCB *dcb, int event) { +int n_connect = 0; + + while (1) + { + int so; + struct sockaddr_in addr; + socklen_t addrlen; + DCB *client; + struct epoll_event ee; + + if ((so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen)) == -1) + return n_connect; + else + { + atomic_add(&dcb->stats.n_accepts, 1); + client = alloc_dcb(); + client->fd = so; + memcpy(&client->func, MyObject, sizeof(GWPROTOCOL)); + client->session = session_alloc(listener->service, client); + + ee.events = EPOLLIN | EPOLLOUT | EPOLLET; + ee.data.ptr = client; + client->state = DCB_STATE_IDLE; + + if (epoll_ctl(efd, EPOLL_CTL_ADD, c_sock, &ee) == -1) + { + return n_connect; + } + n_connect++; + + dcb_printf(client, "Gateway> "); + } + } + return n_connect; } /** @@ -166,7 +234,7 @@ telnetd_accept(DCB *dcb, int event) * explicitly close a connection. * * @param dcb The descriptor control block - * @param epfd The epoll descriptor + * @param event The epoll descriptor */ static int @@ -174,4 +242,61 @@ telnetd_close(DCB *dcb, int event) { } +/** + * Telnet daemon listener entry point + * + * @param efd epoll File descriptor + * @param config Configuration (ip:port) + */ +static int +telnetd_listen(int efd, char *config) +{ +DCB *listener; +struct sockaddr_in addr; +char *port; +struct epoll_event ev; +int one; + if ((listener = dcb_alloc()) == NULL) + { + return 0; + } + + memcpy(&listener->func, MyObject, sizeof(GWPROTOCOL)); + + port = strrchr(config, ':'); + if (port) + port++; + else + port = "4442"; + + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(port); + + if ((listener->fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + return 0; + } + + // socket options + setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); + // set NONBLOCKING mode + setnonblocking(listener->fd); + bind address and port + if (bind(listener->fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) + { + return 0; + } + + listener->state = DCB_STATE_LISTENING; + listen(listener->fd, SOMAXCONN); + + ev.events = EPOLLIN; + ev.data.ptr = listener; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, l_so, &ev) == -1) + { + return 0; + } + return 1; +} diff --git a/modules/routing/Makefile b/modules/routing/Makefile index 80dbf6134..320417de7 100644 --- a/modules/routing/Makefile +++ b/modules/routing/Makefile @@ -28,8 +28,9 @@ READCONOBJ=$(READCONSRCS:.c=.o) SRCS=$(TESTSRCS) $(READCONSRCS) OBJ=$(SRCS:.c=.o) LIBS=-lssl +MODULES=libtestroute.so libreadconnroute.so -all: libtestroute.so libreadconnroute.so +all: $(MODULES) libtestroute.so: $(TESTOBJ) $(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@ @@ -52,4 +53,7 @@ depend.mk: $(SRCS) @rm -f depend.mk cc -M $(CFLAGS) $(SRCS) > depend.mk +install: $(MODULES) + install -D $< $(DEST)/gateway/modules + include depend.mk diff --git a/modules/routing/depend.mk b/modules/routing/depend.mk index 84764e1b4..a93efe0dd 100644 --- a/modules/routing/depend.mk +++ b/modules/routing/depend.mk @@ -6,8 +6,14 @@ testroute.o: testroute.c /usr/include/stdio.h /usr/include/features.h \ /usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \ /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \ /usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \ - ../../include/router.h ../../include/service.h ../../include/session.h \ - ../../include/buffer.h + ../../include/router.h ../../include/service.h /usr/include/time.h \ + /usr/include/bits/time.h /usr/include/xlocale.h ../../include/spinlock.h \ + ../../include/thread.h /usr/include/pthread.h /usr/include/endian.h \ + /usr/include/bits/endian.h /usr/include/bits/byteswap.h \ + /usr/include/sched.h /usr/include/bits/sched.h \ + /usr/include/bits/pthreadtypes.h /usr/include/bits/setjmp.h \ + ../../include/dcb.h ../../include/buffer.h ../../include/server.h \ + ../../include/session.h readconnroute.o: readconnroute.c /usr/include/stdio.h \ /usr/include/features.h /usr/include/sys/cdefs.h \ /usr/include/bits/wordsize.h /usr/include/gnu/stubs.h \ @@ -25,8 +31,8 @@ readconnroute.o: readconnroute.c /usr/include/stdio.h \ /usr/include/bits/time.h /usr/include/sys/sysmacros.h \ /usr/include/bits/pthreadtypes.h /usr/include/alloca.h \ /usr/include/string.h /usr/include/xlocale.h ../../include/service.h \ + ../../include/spinlock.h ../../include/thread.h /usr/include/pthread.h \ + /usr/include/sched.h /usr/include/bits/sched.h \ + /usr/include/bits/setjmp.h ../../include/dcb.h ../../include/buffer.h \ ../../include/server.h ../../include/router.h ../../include/session.h \ - ../../include/buffer.h ../../include/atomic.h ../../include/spinlock.h \ - ../../include/thread.h /usr/include/pthread.h /usr/include/sched.h \ - /usr/include/bits/sched.h /usr/include/bits/setjmp.h \ - ../include/readconnection.h ../../include/dcb.h + ../../include/atomic.h ../include/readconnection.h diff --git a/modules/routing/readconnroute.c b/modules/routing/readconnroute.c index eb4d9e034..95b6d7019 100644 --- a/modules/routing/readconnroute.c +++ b/modules/routing/readconnroute.c @@ -127,7 +127,7 @@ int i, n; * that we can maintain a count of the number of connections to each * backend server. */ - for (server = service->databases, n = 0; server; server = server->next) + for (server = service->databases, n = 0; server; server = server->nextdb) n++; inst->servers = (BACKEND **)calloc(n, sizeof(BACKEND *)); @@ -137,7 +137,7 @@ int i, n; return NULL; } - for (server = service->databases, n = 0; server; server = server->next) + for (server = service->databases, n = 0; server; server = server->nextdb) { if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) { @@ -224,8 +224,8 @@ int i; * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. * - * @param instance The router instance data - * @param session The session being closed + * @param instance The router instance data + * @param router_session The session being closed */ static void closeSession(ROUTER *instance, void *router_session) @@ -265,9 +265,10 @@ CLIENT_SESSION *session = (CLIENT_SESSION *)router_session; * This is simply a case of sending it to the connection that was * chosen when we started the client session. * - * @param instance The router instance - * @param session The router session returned from the newSession call - * @param queue The queue of data buffers to route + * @param instance The router instance + * @param router_session The router session returned from the newSession call + * @param queue The queue of data buffers to route + * @return The number of bytes sent */ static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)