From 0fc2f9dda30d18aedb1f56f857e402ea0642841b Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 19 Jun 2013 16:29:52 +0200 Subject: [PATCH] Removed the passing of epoll fd and isolated epoll functionality to a signle file Addition of show epoll debug CLI command --- core/Makefile | 4 +- core/dcb.c | 9 +- core/depend.mk | 24 ++- core/gateway.c | 144 ++++---------- core/gateway_mysql_protocol.c | 2 +- core/gw_utils.c | 32 ++-- core/poll.c | 177 ++++++++++++++++++ core/service.c | 5 +- core/utils.c | 52 ++--- include/dcb.h | 18 +- include/gw.h | 10 +- include/poll.h | 40 ++++ include/service.h | 2 +- .../include/mysql_client_server_protocol.h | 1 - modules/protocol/mysql_backend.c | 18 +- modules/protocol/mysql_client.c | 62 +++--- modules/protocol/mysql_common.c | 12 +- modules/protocol/telnetd.c | 72 ++++--- modules/routing/debugcli.c | 6 +- modules/routing/readconnroute.c | 2 +- 20 files changed, 412 insertions(+), 280 deletions(-) create mode 100644 core/poll.c create mode 100644 include/poll.h diff --git a/core/Makefile b/core/Makefile index 08fce4f48..0b9b7ace6 100644 --- a/core/Makefile +++ b/core/Makefile @@ -24,11 +24,11 @@ CC=cc CFLAGS=-c -I/usr/include -I../include -Wall -g LDFLAGS=-rdynamic SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \ - utils.c dcb.c load_utils.c session.c service.c server.c + utils.c dcb.c load_utils.c session.c service.c server.c poll.c HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \ ../include/session.h ../include/spinlock.h ../include/thread.h \ - ../include/modules.h + ../include/modules.h ../include/poll.h OBJ=$(SRCS:.c=.o) LIBS=-lssl diff --git a/core/dcb.c b/core/dcb.c index 172610c16..c696ba240 100644 --- a/core/dcb.c +++ b/core/dcb.c @@ -44,6 +44,7 @@ #include #include #include +#include static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */ static SPINLOCK *dcbspin = NULL; @@ -130,7 +131,6 @@ connect_dcb(SERVER *server, SESSION *session, const char *protocol) { DCB *dcb; GWPROTOCOL *funcs; -int epollfd = -1; // Need to work out how to get this if ((dcb = alloc_dcb()) == NULL) { @@ -144,11 +144,13 @@ int epollfd = -1; // Need to work out how to get this memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL)); dcb->session = session; - if ((dcb->fd = dcb->func.connect(server, session, epollfd)) == -1) + if ((dcb->fd = dcb->func.connect(server, session)) == -1) { free(dcb); return NULL; } + + poll_add_dcb(dcb); /* * We are now connected, the authentication etc will happen as * part of the EPOLLOUT event that will be received once the connection @@ -342,9 +344,10 @@ int saved_errno = 0; * @param dcb The DCB to close */ void -dcb_close(DCB *dcb, int efd) +dcb_close(DCB *dcb) { close(dcb->fd); + dcb->state = DCB_STATE_DISCONNECTED; } /** diff --git a/core/depend.mk b/core/depend.mk index b07b1c4dc..176bb4a42 100644 --- a/core/depend.mk +++ b/core/depend.mk @@ -259,7 +259,8 @@ dcb.o: dcb.c /usr/include/stdio.h /usr/include/features.h \ /usr/include/bits/ioctl-types.h /usr/include/sys/ttydefaults.h \ /usr/include/arpa/inet.h \ /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \ - ../include/gateway_mysql.h ../include/mysql_protocol.h ../include/dcb.h + ../include/gateway_mysql.h ../include/mysql_protocol.h ../include/dcb.h \ + ../include/poll.h load_utils.o: load_utils.c /usr/include/sys/param.h \ /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/limits.h \ /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/syslimits.h \ @@ -351,3 +352,24 @@ server.o: server.c /usr/include/stdio.h /usr/include/features.h \ ../include/server.h ../include/spinlock.h ../include/thread.h \ /usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \ /usr/include/bits/setjmp.h ../include/dcb.h ../include/buffer.h +poll.o: poll.c /usr/include/stdio.h /usr/include/features.h \ + /usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \ + /usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \ + /usr/include/bits/types.h /usr/include/bits/typesizes.h \ + /usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \ + /usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \ + /usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \ + /usr/include/string.h /usr/include/xlocale.h /usr/include/unistd.h \ + /usr/include/bits/posix_opt.h /usr/include/bits/environments.h \ + /usr/include/bits/confname.h /usr/include/getopt.h \ + /usr/include/sys/epoll.h /usr/include/stdint.h /usr/include/bits/wchar.h \ + /usr/include/sys/types.h /usr/include/time.h /usr/include/endian.h \ + /usr/include/bits/endian.h /usr/include/bits/byteswap.h \ + /usr/include/sys/select.h /usr/include/bits/select.h \ + /usr/include/bits/sigset.h /usr/include/bits/time.h \ + /usr/include/sys/sysmacros.h /usr/include/bits/pthreadtypes.h \ + ../include/poll.h ../include/dcb.h ../include/spinlock.h \ + ../include/thread.h /usr/include/pthread.h /usr/include/sched.h \ + /usr/include/bits/sched.h /usr/include/bits/setjmp.h ../include/buffer.h \ + ../include/atomic.h diff --git a/core/gateway.c b/core/gateway.c index 4684e28ea..5407d7281 100644 --- a/core/gateway.c +++ b/core/gateway.c @@ -28,6 +28,7 @@ * 12-06-2013 Mark Riddoch Add the -p option to set the * listening port * and bind addr is 0.0.0.0 + * 19/06/13 Mark Riddoch Extract the epoll functionality * * @endverbatim */ @@ -38,9 +39,6 @@ #include #include -// epoll fd, global! -static int epollfd; - void myfree(void** pp) { free(*pp); *pp = NULL; } /* basic signal handling */ @@ -68,8 +66,7 @@ static void signal_set (int sig, void (*handler)(int)) { } } -int handle_event_errors(DCB *dcb, int event) { - struct epoll_event ed; +int handle_event_errors(DCB *dcb) { MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); fprintf(stderr, "#### Handle error function for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state)); @@ -94,8 +91,8 @@ int handle_event_errors(DCB *dcb, int event) { #endif if (dcb->state != DCB_STATE_LISTENING) { - if (epoll_ctl(epollfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) { - fprintf(stderr, "epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); + if (poll_remove_dcb(dcb) == -1) { + fprintf(stderr, "poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); } #ifdef GW_EVENT_DEBUG @@ -128,8 +125,7 @@ int handle_event_errors(DCB *dcb, int event) { return 1; } -int handle_event_errors_backend(DCB *dcb, int event) { - struct epoll_event ed; +int handle_event_errors_backend(DCB *dcb) { MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); fprintf(stderr, "#### Handle Backend error function for %i\n", dcb->fd); @@ -149,8 +145,8 @@ int handle_event_errors_backend(DCB *dcb, int event) { #endif if (dcb->state != DCB_STATE_LISTENING) { - if (epoll_ctl(epollfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) { - fprintf(stderr, "Backend epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); + if (poll_remove_dcb(dcb) == -1) { + fprintf(stderr, "Backend poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); } #ifdef GW_EVENT_DEBUG @@ -171,8 +167,6 @@ main(int argc, char **argv) { int daemon_mode = 1; sigset_t sigset; -struct epoll_event events[MAX_EVENTS]; -struct epoll_event ev; int nfds; int n; unsigned short port = 4406; @@ -218,27 +212,28 @@ SERVER *server1, *server2, *server3; load_module("testroute", "Router"); - if (sigfillset(&sigset) != 0) { - fprintf(stderr, "sigfillset() error %s\n", strerror(errno)); - return 1; - } + if (daemon_mode == 1) + { + if (sigfillset(&sigset) != 0) { + fprintf(stderr, "sigfillset() error %s\n", strerror(errno)); + return 1; + } - if (sigdelset(&sigset, SIGHUP) != 0) { - fprintf(stderr, "sigdelset(SIGHUP) error %s\n", strerror(errno)); - } + if (sigdelset(&sigset, SIGHUP) != 0) { + fprintf(stderr, "sigdelset(SIGHUP) error %s\n", strerror(errno)); + } - if (sigdelset(&sigset, SIGTERM) != 0) { - fprintf(stderr, "sigdelset(SIGTERM) error %s\n", strerror(errno)); - } + if (sigdelset(&sigset, SIGTERM) != 0) { + fprintf(stderr, "sigdelset(SIGTERM) error %s\n", strerror(errno)); + } - if (sigprocmask(SIG_SETMASK, &sigset, NULL) != 0) { - fprintf(stderr, "sigprocmask() error %s\n", strerror(errno)); - } - - signal_set(SIGHUP, sighup_handler); - signal_set(SIGTERM, sigterm_handler); + if (sigprocmask(SIG_SETMASK, &sigset, NULL) != 0) { + fprintf(stderr, "sigprocmask() error %s\n", strerror(errno)); + } + + signal_set(SIGHUP, sighup_handler); + signal_set(SIGTERM, sigterm_handler); - if (daemon_mode == 1) { gw_daemonize(); } @@ -246,91 +241,18 @@ SERVER *server1, *server2, *server3; fprintf(stderr, ">> GATEWAY log is /dev/stderr\n"); - epollfd = epoll_create(MAX_EVENTS); - - if (epollfd == -1) { - perror("epoll_create"); - exit(EXIT_FAILURE); - } + poll_init(); /* * Start the service that was created above */ - serviceStart(service1, epollfd); - serviceStart(service2, epollfd); + serviceStart(service1); + serviceStart(service2); - fprintf(stderr, ">> GATEWAY epoll maxevents is %i\n", MAX_EVENTS); + fprintf(stderr, ">> GATEWAY poll maxevents is %i\n", MAX_EVENTS); - // listen to MySQL protocol - /* - 1. create socket - 2. set reuse - 3. set nonblock - 4. listen - 5. bind - 6. epoll add event - */ - // MySQLListener(epollfd, port); - - // event loop for all the descriptors added via epoll_ctl - while (1) { - nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1); - //nfds = epoll_wait(epollfd, events, MAX_EVENTS, 1000); - if (nfds == -1 && (errno != EINTR)) { - perror("GW: epoll_pwait ERROR"); - exit(EXIT_FAILURE); - } - -#ifdef GW_EVENT_DEBUG - fprintf(stderr, "wake from epoll_wait, n. %i events\n", nfds); -#endif - - for (n = 0; n < nfds; ++n) { - DCB *dcb = (DCB *) (events[n].data.ptr); - - -#ifdef GW_EVENT_DEBUG - fprintf(stderr, "New event %i for socket %i is %i\n", n, dcb->fd, events[n].events); - if (events[n].events & EPOLLIN) - fprintf(stderr, "New event %i for socket %i is EPOLLIN\n", n, dcb->fd); - if (events[n].events & EPOLLOUT) - fprintf(stderr, "New event %i for socket %i is EPOLLOUT\n", n, dcb->fd); - if (events[n].events & EPOLLPRI) - fprintf(stderr, "New event %i for socket %i is EPOLLPRI\n", n, dcb->fd); - -#endif - if (events[n].events & (EPOLLERR | EPOLLHUP)) { - //fprintf(stderr, "CALL the ERROR pointer\n"); - (dcb->func).error(dcb, events[n].events); - //fprintf(stderr, "CALLED the ERROR pointer\n"); - - // go to next event - continue; - } - - if (events[n].events & EPOLLIN) { - // now checking the listening socket - if (dcb->state == DCB_STATE_LISTENING) { - (dcb->func).accept(dcb, epollfd); - - } else { - //fprintf(stderr, "CALL the READ pointer\n"); - (dcb->func).read(dcb, epollfd); - //fprintf(stderr, "CALLED the READ pointer\n"); - } - } - - - if (events[n].events & EPOLLOUT) { - if (dcb->state != DCB_STATE_LISTENING) { - //fprintf(stderr, "CALL the WRITE pointer\n"); - (dcb->func).write_ready(dcb, epollfd); - //fprintf(stderr, ">>> CALLED the WRITE pointer\n"); - } - } - - } // filedesc loop - } // infinite loop - - // End of while (1) + while (1) + { + poll(); + } } // End of main diff --git a/core/gateway_mysql_protocol.c b/core/gateway_mysql_protocol.c index b7b9a937e..482363ff6 100644 --- a/core/gateway_mysql_protocol.c +++ b/core/gateway_mysql_protocol.c @@ -556,7 +556,7 @@ int gw_mysql_read_command(DCB *dcb) { if (packet_no < 0) { fprintf(stderr, "DCB [%i], EPOLLIN Protocol exiting from MYSQL_IDLE [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, packet_no, dcb->fd, protocol->scramble); - (dcb->func).error(dcb, -1); + (dcb->func).error(dcb); fprintf(stderr, "closing fd [%i], from MYSQL_IDLE\n", dcb->fd); diff --git a/core/gw_utils.c b/core/gw_utils.c index ed96fee19..ccb3d43b8 100644 --- a/core/gw_utils.c +++ b/core/gw_utils.c @@ -208,9 +208,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { if (b <= 0) { fprintf(stderr, "||| read_gwbuff called with 0 bytes, closing\n"); if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); + (dcb->session->backends->func).error(dcb->session->backends); } - dcb->func.error(dcb, -1); + dcb->func.error(dcb); return 1; } @@ -220,9 +220,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { /* Bad news, we have run out of memory */ /* Error handling */ if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); + (dcb->session->backends->func).error(dcb->session->backends); } - (dcb->func).error(dcb, -1); + (dcb->func).error(dcb); return 1; } @@ -235,9 +235,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { } else { fprintf(stderr, "Client connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));; if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); + (dcb->session->backends->func).error(dcb->session->backends); } - (dcb->func).error(dcb, -1); + (dcb->func).error(dcb); return 1; } } @@ -246,9 +246,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { // socket closed fprintf(stderr, "Client connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno)); if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); + (dcb->session->backends->func).error(dcb->session->backends); } - (dcb->func).error(dcb, -1); + (dcb->func).error(dcb); return 1; } @@ -266,18 +266,16 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { * Create a new MySQL backend connection. * * This routine performs the MySQL connection to the backend and fills the session->backends of the callier dcb - * with the new allocatetd dcb and adds the new socket to the epoll set + * with the new allocatetd dcb and adds the new socket to the poll set * * - backend dcb allocation * - MySQL session data fetch * - backend connection using data in MySQL session * * @param client_dcb The client DCB struct - * @param efd The epoll set to add the new connection * @return 0 on Success or 1 on Failure. */ -int create_backend_connection(DCB *client_dcb, int efd) { - struct epoll_event ee; +int create_backend_connection(DCB *client_dcb) { DCB *backend = NULL; MySQLProtocol *ptr_proto = NULL; MySQLProtocol *client_protocol = NULL; @@ -304,14 +302,10 @@ int create_backend_connection(DCB *client_dcb, int efd) { backend->fd = -1; } - // edge triggering flag added - ee.events = EPOLLIN | EPOLLET | EPOLLOUT; - ee.data.ptr = backend; - - // if connected, add it to the epoll + // if connected, add it to the poll if (backend->fd > 0) { - if (epoll_ctl(efd, EPOLL_CTL_ADD, backend->fd, &ee) == -1) { - perror("epoll_ctl: backend sock"); + if (poll_add_dcb(backend) == -1) { + perror("poll_ctl: backend sock"); } else { fprintf(stderr, "--> Backend conn added, bk_fd [%i], scramble [%s], is session with client_fd [%i]\n", ptr_proto->fd, ptr_proto->scramble, client_dcb->fd); backend->state = DCB_STATE_POLLING; diff --git a/core/poll.c b/core/poll.c new file mode 100644 index 000000000..42cc726bd --- /dev/null +++ b/core/poll.c @@ -0,0 +1,177 @@ +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2013 + */ +#include +#include +#include +#include +#include +#include +#include + +/** + * @file poll.c - Abraction of the epoll functionality + * + * @verbatim + * Revision History + * + * Date Who Description + * 19/06/13 Mark Riddoch Initial implementation + * + * @endverbatim + */ + +static int epoll_fd = -1; /**< The epoll file descriptor */ + +/** + * The polling statistics + */ +static struct { + int n_read; /**< Number of read events */ + int n_write; /**< Number of write events */ + int n_error; /**< Number of error events */ + int n_hup; /**< Number of hangup events */ + int n_accept; /**< Number of accept events */ + int n_polls; /**< Number of poll cycles */ +} pollStats; + + +/** + * Initialise the polling system we are using for the gateway. + * + * In this case we are using the Linux epoll mechanism + */ +void +poll_init() +{ + if (epoll_fd != -1) + return; + if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1) + { + perror("epoll_create"); + exit(-1); + } + memset(&pollStats, 0, sizeof(pollStats)); +} + +/** + * Add a DCB to the set of descriptors within the polling + * environment. + * + * @param dcb The descriptor to add to the poll + * @return -1 on error or 0 on success + */ +int +poll_add_dcb(DCB *dcb) +{ +struct epoll_event ev; + + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = dcb; + + return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); + +} + +/** + * Remove a descriptor from the set of descriptors within the + * polling environment. + * + * @param dcb The descriptor to remove + * @return -1 on error or 0 on success + */ +int +poll_remove_dcb(DCB *dcb) +{ +struct epoll_event ev; + + return epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); +} + +/** + * The main polling loop + * + * This routine does the polling and despatches of IO events + * to the DCB's + */ +void +poll() +{ +struct epoll_event events[MAX_EVENTS]; +int i, nfds; + + while (1) + { + atomic_add(&pollStats.n_polls, 1); + if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1)) == -1) + { + } + else + { + for (i = 0; i < nfds; i++) + { + DCB *dcb = (DCB *)events[i].data.ptr; + __uint32_t ev = events[i].events; + + if (ev & EPOLLERR) + { + atomic_add(&pollStats.n_error, 1); + dcb->func.error(dcb); + } + if (ev & EPOLLHUP) + { + atomic_add(&pollStats.n_hup, 1); + dcb->func.hangup(dcb); + } + if (ev & EPOLLIN) + { + if (dcb->state == DCB_STATE_LISTENING) + { + atomic_add(&pollStats.n_accept, 1); + dcb->func.accept(dcb); + } + else + { + atomic_add(&pollStats.n_read, 1); + dcb->func.read(dcb); + } + } + if (ev & EPOLLOUT) + { + atomic_add(&pollStats.n_write, 1); + dcb->func.write_ready(dcb); + } + } + } + } +} + +/** + * Debug routine to print the polling statistics + * + * @param dcb DCB to print to + */ +void +dprintPollStats(DCB *dcb) +{ + dcb_printf(dcb, "Number of epoll cycles: %d\n", pollStats.n_polls); + dcb_printf(dcb, "Number of read events: %d\n", pollStats.n_read); + dcb_printf(dcb, "Number of write events: %d\n", pollStats.n_write); + dcb_printf(dcb, "Number of error events: %d\n", pollStats.n_error); + dcb_printf(dcb, "Number of hangup events: %d\n", pollStats.n_hup); + dcb_printf(dcb, "Number of accept events: %d\n", pollStats.n_accept); +} diff --git a/core/service.c b/core/service.c index 8e652f628..015edcf3b 100644 --- a/core/service.c +++ b/core/service.c @@ -86,11 +86,10 @@ SERVICE *service; * Also create the router_instance for the service. * * @param service The Service that should be started - * @param efd The epoll descriptor * @return Returns the number of listeners created */ int -serviceStart(SERVICE *service, int efd) +serviceStart(SERVICE *service) { SERV_PROTOCOL *port; int listeners = 0; @@ -114,7 +113,7 @@ GWPROTOCOL *funcs; memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL)); port->listener->session = NULL; sprintf(config_bind, "0.0.0.0:%d", port->port); - if (port->listener->func.listen(port->listener, efd, config_bind)) + if (port->listener->func.listen(port->listener, config_bind)) listeners++; port->listener->session = session_alloc(service, port->listener); port->listener->session->state = SESSION_STATE_LISTENER; diff --git a/core/utils.c b/core/utils.c index 51b58d82b..f9c7773a7 100644 --- a/core/utils.c +++ b/core/utils.c @@ -54,7 +54,7 @@ char hex_lower[] = "0123456789abcdefghijklmnopqrstuvwxyz"; //backend read event triggered by EPOLLIN ////////////////////////////////////////// -int gw_read_backend_event(DCB *dcb, int epfd) { +int gw_read_backend_event(DCB *dcb) { int n; MySQLProtocol *client_protocol = NULL; @@ -67,7 +67,6 @@ int gw_read_backend_event(DCB *dcb, int epfd) { #endif if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) { - struct epoll_event new_event; int w; int b = -1; int tot_b = -1; @@ -193,7 +192,7 @@ int w, saved_errno = 0; //backend write event triggered by EPOLLOUT ////////////////////////////////////////// -int gw_write_backend_event(DCB *dcb, int epfd) { +int gw_write_backend_event(DCB *dcb) { //fprintf(stderr, ">>> gw_write_backend_event for %i\n", dcb->fd); @@ -309,9 +308,9 @@ int gw_route_read_event(DCB* dcb, int epfd) { fprintf(stderr, "COM_QUIT received\n"); if (dcb->session->backends) { dcb->session->backends->func.write(dcb, queue); - (dcb->session->backends->func).error(dcb->session->backends, -1); + (dcb->session->backends->func).error(dcb->session->backends); } - (dcb->func).error(dcb, -1); + (dcb->func).error(dcb); return 1; } @@ -339,10 +338,9 @@ int gw_route_read_event(DCB* dcb, int epfd) { /////////////////////////////////////////////// // client write event triggered by EPOLLOUT ////////////////////////////////////////////// -int gw_handle_write_event(DCB *dcb, int epfd) { +int gw_handle_write_event(DCB *dcb) { MySQLProtocol *protocol = NULL; int n; - struct epoll_event new_event; if (dcb == NULL) { fprintf(stderr, "DCB is NULL, return\n"); @@ -385,9 +383,9 @@ int gw_handle_write_event(DCB *dcb, int epfd) { // still to implement mysql_send_auth_error(dcb, 2, 0, "Authorization failed"); - dcb->func.error(dcb, -1); + dcb->func.error(dcb); if (dcb->session->backends) - dcb->session->backends->func.error(dcb->session->backends, -1); + dcb->session->backends->func.error(dcb->session->backends); return 0; } @@ -451,7 +449,6 @@ void MySQLListener(int epfd, char *config_bind) { char address[1024]=""; int port=0; int one; - struct epoll_event ev; // this gateway, as default, will bind on port 4404 for localhost only (config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406"); @@ -511,15 +508,9 @@ void MySQLListener(int epfd, char *config_bind) { // assign l_so to dcb listener->fd = l_so; - // register events, don't add EPOLLET for now - ev.events = EPOLLIN; - - // set user data to dcb struct - ev.data.ptr = listener; - - // add listening socket to epoll structure - if (epoll_ctl(epfd, EPOLL_CTL_ADD, l_so, &ev) == -1) { - perror("epoll_ctl: listen_sock"); + // add listening socket to poll structure + if (poll_add_dcb(listener) == -1) { + perror("poll_add_dcb: listen_sock"); exit(EXIT_FAILURE); } @@ -529,7 +520,7 @@ void MySQLListener(int epfd, char *config_bind) { } -int MySQLAccept(DCB *listener, int efd) { +int MySQLAccept(DCB *listener) { fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); @@ -538,7 +529,6 @@ int MySQLAccept(DCB *listener, int efd) { struct sockaddr_in local; socklen_t addrlen; addrlen = sizeof(local); - struct epoll_event ee; DCB *client; DCB *backend; SESSION *session; @@ -604,14 +594,10 @@ int MySQLAccept(DCB *listener, int efd) { backend->fd = -1; } - // edge triggering flag added - ee.events = EPOLLIN | EPOLLET | EPOLLOUT; - ee.data.ptr = backend; - - // if connected, add it to the epoll + // if connected, add it to the poll if (backend->fd > 0) { - if (epoll_ctl(efd, EPOLL_CTL_ADD, backend->fd, &ee) == -1) { - perror("epoll_ctl: backend sock"); + if (poll_add_dcb(backend) == -1) { + perror("poll_add_dcb: backend sock"); } else { //fprintf(stderr, "--> Backend conn added, bk_fd [%i], scramble [%s], is session with client_fd [%i]\n", ptr_proto->fd, ptr_proto->scramble, client->fd); backend->state = DCB_STATE_POLLING; @@ -635,18 +621,14 @@ int MySQLAccept(DCB *listener, int efd) { (client->func).write = MySQLWrite; (client->func).write_ready = gw_handle_write_event; - // edge triggering flag added - ee.events = EPOLLIN | EPOLLOUT | EPOLLET; - ee.data.ptr = client; - client->state = DCB_STATE_IDLE; // event install - if (epoll_ctl(efd, EPOLL_CTL_ADD, c_sock, &ee) == -1) { - perror("epoll_ctl: conn_sock"); + if (poll_add_dcb(client) == -1) { + perror("poll_add_dcb: conn_sock"); exit(EXIT_FAILURE); } else { - //fprintf(stderr, "Added fd %i to epoll, protocol state [%i]\n", c_sock , client->state); + //fprintf(stderr, "Added fd %i to poll, protocol state [%i]\n", c_sock , client->state); client->state = DCB_STATE_POLLING; } diff --git a/include/dcb.h b/include/dcb.h index 631570620..3f4d53db4 100644 --- a/include/dcb.h +++ b/include/dcb.h @@ -65,15 +65,15 @@ struct dcb; * @see load_module */ typedef struct gw_protocol { - int (*read)(struct dcb *, int); + int (*read)(struct dcb *); int (*write)(struct dcb *, GWBUF *); - int (*write_ready)(struct dcb *, int); - int (*error)(struct dcb *, int); - int (*hangup)(struct dcb *, int); - int (*accept)(struct dcb *, int); - int (*connect)(struct server *, struct session *, int); - int (*close)(struct dcb *, int); - int (*listen)(struct dcb *, int, char *); + int (*write_ready)(struct dcb *); + int (*error)(struct dcb *); + int (*hangup)(struct dcb *); + int (*accept)(struct dcb *); + int (*connect)(struct server *, struct session *); + int (*close)(struct dcb *); + int (*listen)(struct dcb *, char *); } GWPROTOCOL; /** @@ -131,7 +131,7 @@ extern DCB *connect_dcb(struct server *, struct session *, const char *); extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */ extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */ extern int dcb_drain_writeq(DCB *); /* Generic write routine */ -extern void dcb_close(DCB *, int); /* Generic close functionality */ +extern void dcb_close(DCB *); /* Generic close functionality */ extern void printAllDCBs(); /* Debug to print all DCB in the system */ extern void printDCB(DCB *); /* Debug print routine */ extern void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ diff --git a/include/gw.h b/include/gw.h index 929045dfd..04dd02a71 100644 --- a/include/gw.h +++ b/include/gw.h @@ -51,10 +51,10 @@ #include "dcb.h" int do_read_dcb(DCB *dcb); -int handle_event_errors(DCB *dcb, int event); -int handle_event_errors_backend(DCB *dcb, int event); +int handle_event_errors(DCB *dcb); +int handle_event_errors_backend(DCB *dcb); void MySQLListener(int epfd, char *config_bind); -int MySQLAccept(DCB *listener, int efd); +int MySQLAccept(DCB *listener); int gw_mysql_do_authentication(DCB *dcb, GWBUF *); void gw_mysql_close(MySQLProtocol **ptr); char *gw_strend(register const char *s); @@ -62,5 +62,5 @@ int do_read_dcb(DCB *dcb); int do_read_10(DCB *dcb, uint8_t *buffer); MySQLProtocol * gw_mysql_init(MySQLProtocol *ptr); int MySQLWrite(DCB *dcb, GWBUF *queue); -int gw_write_backend_event(DCB *dcb, int epfd); -int gw_read_backend_event(DCB *dcb, int epfd); +int gw_write_backend_event(DCB *dcb); +int gw_read_backend_event(DCB *dcb); diff --git a/include/poll.h b/include/poll.h new file mode 100644 index 000000000..12bf8efcd --- /dev/null +++ b/include/poll.h @@ -0,0 +1,40 @@ +#ifndef _POLL_H +#define _POLL_H +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2013 + */ +#include + +/** + * @file poll.h The poll related functionality + * + * @verbatim + * Revision History + * + * Date Who Description + * 19/06/13 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#define MAX_EVENTS 1000 + +extern void poll_init(); +extern int poll_add_dcb(DCB *); +extern int poll_remove_dcb(DCB *); +extern void poll(); +extern void dprintPollStats(DCB *); +#endif diff --git a/include/service.h b/include/service.h index d09f761a9..378ac5de0 100644 --- a/include/service.h +++ b/include/service.h @@ -95,7 +95,7 @@ extern SERVICE *service_alloc(char *, char *); extern int service_free(SERVICE *); extern int serviceAddProtocol(SERVICE *, char *, unsigned short); extern void serviceAddBackend(SERVICE *, SERVER *); -extern int serviceStart(SERVICE *, int); +extern int serviceStart(SERVICE *); extern void printService(SERVICE *); extern void printAllServices(); extern void dprintAllServices(DCB *); diff --git a/modules/include/mysql_client_server_protocol.h b/modules/include/mysql_client_server_protocol.h index b4967ee33..cca3533d2 100644 --- a/modules/include/mysql_client_server_protocol.h +++ b/modules/include/mysql_client_server_protocol.h @@ -32,7 +32,6 @@ #include #include #include -#include #include #include #include diff --git a/modules/protocol/mysql_backend.c b/modules/protocol/mysql_backend.c index 37dc6274f..b183e2ef7 100644 --- a/modules/protocol/mysql_backend.c +++ b/modules/protocol/mysql_backend.c @@ -30,10 +30,10 @@ static char *version_str = "V1.0.0"; -int gw_read_backend_event(DCB* dcb, int epfd); -int gw_write_backend_event(DCB *dcb, int epfd); +int gw_read_backend_event(DCB* dcb); +int gw_write_backend_event(DCB *dcb); int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue); -int gw_error_backend_event(DCB *dcb, int epfd, int event); +int gw_error_backend_event(DCB *dcb); static GWPROTOCOL MyObject = { gw_read_backend_event, /* Read - EPOLLIN handler */ @@ -86,7 +86,7 @@ GetModuleObject() ////////////////////////////////////////// //backend read event triggered by EPOLLIN ////////////////////////////////////////// -int gw_read_backend_event(DCB *dcb, int epfd) { +int gw_read_backend_event(DCB *dcb) { int n; MySQLProtocol *client_protocol = NULL; @@ -99,7 +99,6 @@ int gw_read_backend_event(DCB *dcb, int epfd) { #endif if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) { - struct epoll_event new_event; int w; int b = -1; int tot_b = -1; @@ -150,7 +149,7 @@ int gw_read_backend_event(DCB *dcb, int epfd) { ////////////////////////////////////////// //backend write event triggered by EPOLLOUT ////////////////////////////////////////// -int gw_write_backend_event(DCB *dcb, int epfd) { +int gw_write_backend_event(DCB *dcb) { //fprintf(stderr, ">>> gw_write_backend_event for %i\n", dcb->fd); return 0; } @@ -229,8 +228,7 @@ int w, saved_errno = 0; return 0; } -int gw_error_backend_event(DCB *dcb, int epfd, int event) { - struct epoll_event ed; +int gw_error_backend_event(DCB *dcb) { MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); fprintf(stderr, "#### Handle Backend error function for %i\n", dcb->fd); @@ -250,8 +248,8 @@ int gw_error_backend_event(DCB *dcb, int epfd, int event) { #endif if (dcb->state != DCB_STATE_LISTENING) { - if (epoll_ctl(epfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) { - fprintf(stderr, "Backend epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); + if (poll_remove_dcb(dcb) == -1) { + fprintf(stderr, "Backend poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); } #ifdef GW_EVENT_DEBUG diff --git a/modules/protocol/mysql_client.c b/modules/protocol/mysql_client.c index fb0991f8c..f431c6e4b 100644 --- a/modules/protocol/mysql_client.c +++ b/modules/protocol/mysql_client.c @@ -29,15 +29,16 @@ */ #include "mysql_client_server_protocol.h" +#include "poll.h" static char *version_str = "V1.0.0"; -static int gw_MySQLAccept(DCB *listener, int efd); -static int gw_MySQLListener(DCB *listener, int epfd, char *config_bind); -static int gw_read_client_event(DCB* dcb, int epfd); -static int gw_write_client_event(DCB *dcb, int epfd); +static int gw_MySQLAccept(DCB *listener); +static int gw_MySQLListener(DCB *listener, char *config_bind); +static int gw_read_client_event(DCB* dcb); +static int gw_write_client_event(DCB *dcb); static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue); -static int gw_error_client_event(DCB *dcb, int epfd, int event); +static int gw_error_client_event(DCB *dcb); static int gw_check_mysql_scramble_data(uint8_t *token, unsigned int token_len, uint8_t *scramble, unsigned int scramble_len, char *username, uint8_t *stage1_hash); static int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password, void *repository); @@ -659,10 +660,9 @@ int w, saved_errno = 0; * Client read event triggered by EPOLLIN * * @param dcb Descriptor control block - * @param epfd Epoll descriptor * @return TRUE on error */ -int gw_read_client_event(DCB* dcb, int epfd) { +int gw_read_client_event(DCB* dcb) { MySQLProtocol *protocol = NULL; uint8_t buffer[MAX_BUFFER_SIZE] = ""; int n = 0; @@ -769,9 +769,9 @@ int gw_read_client_event(DCB* dcb, int epfd) { fprintf(stderr, "COM_QUIT received\n"); if (dcb->session->backends) { dcb->session->backends->func.write(dcb, queue); - (dcb->session->backends->func).error(dcb->session->backends, epfd); + (dcb->session->backends->func).error(dcb->session->backends); } - (dcb->func).error(dcb, epfd); + (dcb->func).error(dcb); return 1; } @@ -799,10 +799,9 @@ int gw_read_client_event(DCB* dcb, int epfd) { /////////////////////////////////////////////// // client write event to Client triggered by EPOLLOUT ////////////////////////////////////////////// -int gw_write_client_event(DCB *dcb, int epfd) { +int gw_write_client_event(DCB *dcb) { MySQLProtocol *protocol = NULL; int n; - struct epoll_event new_event; if (dcb == NULL) { fprintf(stderr, "DCB is NULL, return\n"); @@ -839,7 +838,7 @@ int gw_write_client_event(DCB *dcb, int epfd) { // create one backend connection // This is not working now, as the backend dcb functions are in the mysql_protocol.c // and it will loaded separately - //gw_create_backend_connection(dcb, epfd); + //gw_create_backend_connection(dcb); protocol->state = MYSQL_IDLE; @@ -850,9 +849,9 @@ int gw_write_client_event(DCB *dcb, int epfd) { // still to implement mysql_send_auth_error(dcb, 2, 0, "Authorization failed"); - dcb->func.error(dcb, epfd); + dcb->func.error(dcb); if (dcb->session->backends) - dcb->session->backends->func.error(dcb->session->backends, epfd); + dcb->session->backends->func.error(dcb->session->backends); return 0; } @@ -904,7 +903,7 @@ int gw_write_client_event(DCB *dcb, int epfd) { /// // set listener for mysql protocol /// -int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) { +int gw_MySQLListener(DCB *listener, char *config_bind) { int l_so; int fl; struct sockaddr_in serv_addr; @@ -915,7 +914,6 @@ int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) { char address[1024]=""; int port=0; int one; - struct epoll_event ev; // this gateway, as default, will bind on port 4404 for localhost only (config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406"); @@ -973,15 +971,9 @@ int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) { // assign l_so to dcb listener->fd = l_so; - // register events, don't add EPOLLET for now - ev.events = EPOLLIN; - - // set user data to dcb struct - ev.data.ptr = listener; - - // add listening socket to epoll structure - if (epoll_ctl(epfd, EPOLL_CTL_ADD, l_so, &ev) == -1) { - fprintf(stderr, ">>> epoll_ctl: can't add the listen_sock! Errno %i, %s\n", errno, strerror(errno)); + // add listening socket to poll structure + if (poll_add_dcb(listener) == -1) { + fprintf(stderr, ">>> poll_add_dcb: can't add the listen_sock! Errno %i, %s\n", errno, strerror(errno)); return 1; } @@ -993,7 +985,7 @@ int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) { } -int gw_MySQLAccept(DCB *listener, int efd) { +int gw_MySQLAccept(DCB *listener) { fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); @@ -1002,7 +994,6 @@ int gw_MySQLAccept(DCB *listener, int efd) { struct sockaddr_in local; socklen_t addrlen; addrlen = sizeof(local); - struct epoll_event ee; DCB *client; SESSION *session; MySQLProtocol *protocol; @@ -1052,18 +1043,14 @@ int gw_MySQLAccept(DCB *listener, int efd) { // assign function poiters to "func" field memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL)); - // edge triggering flag added - ee.events = EPOLLIN | EPOLLOUT | EPOLLET; - ee.data.ptr = client; - client->state = DCB_STATE_IDLE; // event install - if (epoll_ctl(efd, EPOLL_CTL_ADD, c_sock, &ee) == -1) { - perror("epoll_ctl: conn_sock"); + if (poll_add_dcb(client) == -1) { + perror("poll_add_dcb: conn_sock"); exit(EXIT_FAILURE); } else { - //fprintf(stderr, "Added fd %i to epoll, protocol state [%i]\n", c_sock , client->state); + //fprintf(stderr, "Added fd %i to poll, protocol state [%i]\n", c_sock , client->state); client->state = DCB_STATE_POLLING; } @@ -1081,8 +1068,7 @@ int gw_MySQLAccept(DCB *listener, int efd) { /* */ -static int gw_error_client_event(DCB *dcb, int epfd, int event) { - struct epoll_event ed; +static int gw_error_client_event(DCB *dcb) { MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); @@ -1107,8 +1093,8 @@ static int gw_error_client_event(DCB *dcb, int epfd, int event) { #endif if (dcb->state != DCB_STATE_LISTENING) { - if (epoll_ctl(epfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) { - fprintf(stderr, "***** epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); + if (poll_remove_dcb(dcb) == -1) { + fprintf(stderr, "***** poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); } #ifdef GW_EVENT_DEBUG diff --git a/modules/protocol/mysql_common.c b/modules/protocol/mysql_common.c index 0b64e312a..5c7f8f60e 100644 --- a/modules/protocol/mysql_common.c +++ b/modules/protocol/mysql_common.c @@ -32,10 +32,10 @@ static char *version_str = "V1.0.0"; static MySQLProtocol *gw_mysql_init(MySQLProtocol *data); static void gw_mysql_close(MySQLProtocol **ptr); -extern gw_read_backend_event(DCB* dcb, int epfd); -extern gw_write_backend_event(DCB *dcb, int epfd); +extern gw_read_backend_event(DCB* dcb); +extern gw_write_backend_event(DCB *dcb); extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue); -extern int gw_error_backend_event(DCB *dcb, int epfd, int event); +extern int gw_error_backend_event(DCB *dcb); /////////////////////////////// // Initialize mysql protocol struct @@ -101,14 +101,13 @@ void gw_mysql_close(MySQLProtocol **ptr) { * Create a new MySQL backend connection. * * This routine performs the MySQL connection to the backend and fills the session->backends of the callier dcb - * with the new allocatetd dcb and adds the new socket to the epoll set + * with the new allocatetd dcb and adds the new socket to the poll set * * - backend dcb allocation * - MySQL session data fetch * - backend connection using data in MySQL session * * @param client_dcb The client DCB struct - * @param epfd The epoll set to add the new connection * @return 0 on Success or 1 on Failure. */ @@ -116,8 +115,7 @@ void gw_mysql_close(MySQLProtocol **ptr) { * This function cannot work as it will be called from mysql_client.c but it needs function pointers from mysql_backend.c * They are modules loaded separately!! * -int gw_create_backend_connection(DCB *client_dcb, int efd) { - struct epoll_event ee; +int gw_create_backend_connection(DCB *client_dcb) { DCB *backend = NULL; MySQLProtocol *ptr_proto = NULL; MySQLProtocol *client_protocol = NULL; diff --git a/modules/protocol/telnetd.c b/modules/protocol/telnetd.c index f0a215393..4ff80e9d0 100644 --- a/modules/protocol/telnetd.c +++ b/modules/protocol/telnetd.c @@ -21,13 +21,13 @@ #include #include #include -#include #include #include #include #include #include #include +#include /** * @file telnetd.c - telnet daemon protocol module @@ -52,14 +52,14 @@ static char *version_str = "V1.0.0"; -static int telnetd_read_event(DCB* dcb, int epfd); -static int telnetd_write_event(DCB *dcb, int epfd); +static int telnetd_read_event(DCB* dcb); +static int telnetd_write_event(DCB *dcb); static int telnetd_write(DCB *dcb, GWBUF *queue); -static int telnetd_error(DCB *dcb, int event); -static int telnetd_hangup(DCB *dcb, int event); -static int telnetd_accept(DCB *dcb, int event); -static int telnetd_close(DCB *dcb, int event); -static int telnetd_listen(DCB *dcb, int efd, char *config); +static int telnetd_error(DCB *dcb); +static int telnetd_hangup(DCB *dcb); +static int telnetd_accept(DCB *dcb); +static int telnetd_close(DCB *dcb); +static int telnetd_listen(DCB *dcb, char *config); /** * The "module object" for the telnetd protocol module. @@ -76,6 +76,9 @@ static GWPROTOCOL MyObject = { telnetd_listen /**< Create a listener */ }; +static void +telnetd_command(DCB *, char *cmd); + /** * Implementation of the mandatory version entry point * @@ -115,11 +118,10 @@ GetModuleObject() * Read event for EPOLLIN on the telnetd protocol module. * * @param dcb The descriptor control block - * @param epfd The epoll descriptor * @return */ static int -telnetd_read_event(DCB* dcb, int epfd) +telnetd_read_event(DCB* dcb) { int n; GWBUF *head = NULL; @@ -130,17 +132,20 @@ void *rsession = session->router_session; if ((n = dcb_read(dcb, &head)) != -1) { + dcb->state = DCB_STATE_PROCESSING; if (head) { char *ptr = GWBUF_DATA(head); ptr = GWBUF_DATA(head); if (*ptr == TELNET_IAC) { + telnetd_command(dcb, ptr + 1); GWBUF_CONSUME(head, 2); } router->routeQuery(router_instance, rsession, head); } } + dcb->state = DCB_STATE_POLLING; return n; } @@ -149,11 +154,10 @@ void *rsession = session->router_session; * EPOLLOUT handler for the telnetd protocol module. * * @param dcb The descriptor control block - * @param epfd The epoll descriptor * @return */ static int -telnetd_write_event(DCB *dcb, int epfd) +telnetd_write_event(DCB *dcb) { return dcb_drain_writeq(dcb); } @@ -177,10 +181,9 @@ telnetd_write(DCB *dcb, GWBUF *queue) * Handler for the EPOLLERR event. * * @param dcb The descriptor control block - * @param event The epoll descriptor */ static int -telnetd_error(DCB *dcb, int event) +telnetd_error(DCB *dcb) { } @@ -188,10 +191,9 @@ telnetd_error(DCB *dcb, int event) * Handler for the EPOLLHUP event. * * @param dcb The descriptor control block - * @param event The epoll descriptor */ static int -telnetd_hangup(DCB *dcb, int event) +telnetd_hangup(DCB *dcb) { } @@ -200,10 +202,9 @@ telnetd_hangup(DCB *dcb, int event) * socket for the protocol. * * @param dcb The descriptor control block - * @param efd The epoll descriptor */ static int -telnetd_accept(DCB *dcb, int efd) +telnetd_accept(DCB *dcb) { int n_connect = 0; @@ -213,7 +214,6 @@ int n_connect = 0; struct sockaddr_in addr; socklen_t addrlen; DCB *client; - struct epoll_event ee; if ((so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen)) == -1) return n_connect; @@ -225,17 +225,16 @@ int n_connect = 0; memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL)); client->session = session_alloc(dcb->session->service, client); - ee.events = EPOLLIN | EPOLLOUT | EPOLLET; - ee.data.ptr = client; client->state = DCB_STATE_IDLE; - if (epoll_ctl(efd, EPOLL_CTL_ADD, so, &ee) == -1) + if (poll_add_dcb(client) == -1) { return n_connect; } n_connect++; dcb_printf(client, "Gateway> "); + client->state = DCB_STATE_POLLING; } } return n_connect; @@ -246,27 +245,25 @@ int n_connect = 0; * explicitly close a connection. * * @param dcb The descriptor control block - * @param efd The epoll descriptor */ static int -telnetd_close(DCB *dcb, int efd) +telnetd_close(DCB *dcb) { - dcb_close(dcb, efd); + dcb_close(dcb); } /** * Telnet daemon listener entry point * - * @param efd epoll File descriptor + * @param listener The Listener DCB * @param config Configuration (ip:port) */ static int -telnetd_listen(DCB *listener, int efd, char *config) +telnetd_listen(DCB *listener, char *config) { struct sockaddr_in addr; char *port; -struct epoll_event ev; int one = 1; short pnum; @@ -301,11 +298,24 @@ short pnum; listener->state = DCB_STATE_LISTENING; listen(listener->fd, SOMAXCONN); - ev.events = EPOLLIN; - ev.data.ptr = listener; - if (epoll_ctl(efd, EPOLL_CTL_ADD, listener->fd, &ev) == -1) + if (poll_add_dcb(listener) == -1) { return 0; } return 1; } + +/** + * Telnet command implementation + * + * Called for each command in the telnet stream. + * + * Currently we do no command execution + * + * @param dcb The client DCB + * @param cmd The command stream + */ +static void +telnetd_command(DCB *dcb, char *cmd) +{ +} diff --git a/modules/routing/debugcli.c b/modules/routing/debugcli.c index 54153fa05..efca7f00b 100644 --- a/modules/routing/debugcli.c +++ b/modules/routing/debugcli.c @@ -38,6 +38,7 @@ #include #include #include +#include #include static char *version_str = "V1.0.0"; @@ -172,7 +173,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session; /* * Close the connection to the backend */ - session->session->client->func.close(session->session->client, 0); + session->session->client->func.close(session->session->client); spinlock_acquire(&inst->lock); if (inst->sessions == session) @@ -222,7 +223,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session; if (execute_cmd(session)) dcb_printf(session->session->client, "Gateway> "); else - session->session->client->func.close(session->session->client, 0); + session->session->client->func.close(session->session->client); } return 1; } @@ -236,6 +237,7 @@ static struct { { "show servers", dprintAllServers }, { "show modules", dprintAllModules }, { "show dcbs", dprintAllDCBs }, + { "show epoll", dprintPollStats }, { NULL, NULL } }; diff --git a/modules/routing/readconnroute.c b/modules/routing/readconnroute.c index 6cf4c0104..863adca7c 100644 --- a/modules/routing/readconnroute.c +++ b/modules/routing/readconnroute.c @@ -237,7 +237,7 @@ CLIENT_SESSION *session = (CLIENT_SESSION *)router_session; /* * Close the connection to the backend */ - session->dcb->func.close(session->dcb, 0); + session->dcb->func.close(session->dcb); atomic_add(&session->backend->count, -1); spinlock_acquire(&inst->lock);