Removed the passing of epoll fd and isolated epoll functionality to
a signle file Addition of show epoll debug CLI command
This commit is contained in:
parent
c2b24884fd
commit
0fc2f9dda3
@ -24,11 +24,11 @@ CC=cc
|
||||
CFLAGS=-c -I/usr/include -I../include -Wall -g
|
||||
LDFLAGS=-rdynamic
|
||||
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \
|
||||
utils.c dcb.c load_utils.c session.c service.c server.c
|
||||
utils.c dcb.c load_utils.c session.c service.c server.c poll.c
|
||||
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
|
||||
../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \
|
||||
../include/session.h ../include/spinlock.h ../include/thread.h \
|
||||
../include/modules.h
|
||||
../include/modules.h ../include/poll.h
|
||||
OBJ=$(SRCS:.c=.o)
|
||||
LIBS=-lssl
|
||||
|
||||
|
@ -44,6 +44,7 @@
|
||||
#include <modules.h>
|
||||
#include <errno.h>
|
||||
#include <gw.h>
|
||||
#include <poll.h>
|
||||
|
||||
static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */
|
||||
static SPINLOCK *dcbspin = NULL;
|
||||
@ -130,7 +131,6 @@ connect_dcb(SERVER *server, SESSION *session, const char *protocol)
|
||||
{
|
||||
DCB *dcb;
|
||||
GWPROTOCOL *funcs;
|
||||
int epollfd = -1; // Need to work out how to get this
|
||||
|
||||
if ((dcb = alloc_dcb()) == NULL)
|
||||
{
|
||||
@ -144,11 +144,13 @@ int epollfd = -1; // Need to work out how to get this
|
||||
memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL));
|
||||
dcb->session = session;
|
||||
|
||||
if ((dcb->fd = dcb->func.connect(server, session, epollfd)) == -1)
|
||||
if ((dcb->fd = dcb->func.connect(server, session)) == -1)
|
||||
{
|
||||
free(dcb);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
poll_add_dcb(dcb);
|
||||
/*
|
||||
* We are now connected, the authentication etc will happen as
|
||||
* part of the EPOLLOUT event that will be received once the connection
|
||||
@ -342,9 +344,10 @@ int saved_errno = 0;
|
||||
* @param dcb The DCB to close
|
||||
*/
|
||||
void
|
||||
dcb_close(DCB *dcb, int efd)
|
||||
dcb_close(DCB *dcb)
|
||||
{
|
||||
close(dcb->fd);
|
||||
dcb->state = DCB_STATE_DISCONNECTED;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -259,7 +259,8 @@ dcb.o: dcb.c /usr/include/stdio.h /usr/include/features.h \
|
||||
/usr/include/bits/ioctl-types.h /usr/include/sys/ttydefaults.h \
|
||||
/usr/include/arpa/inet.h \
|
||||
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \
|
||||
../include/gateway_mysql.h ../include/mysql_protocol.h ../include/dcb.h
|
||||
../include/gateway_mysql.h ../include/mysql_protocol.h ../include/dcb.h \
|
||||
../include/poll.h
|
||||
load_utils.o: load_utils.c /usr/include/sys/param.h \
|
||||
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/limits.h \
|
||||
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/syslimits.h \
|
||||
@ -351,3 +352,24 @@ server.o: server.c /usr/include/stdio.h /usr/include/features.h \
|
||||
../include/server.h ../include/spinlock.h ../include/thread.h \
|
||||
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
|
||||
/usr/include/bits/setjmp.h ../include/dcb.h ../include/buffer.h
|
||||
poll.o: poll.c /usr/include/stdio.h /usr/include/features.h \
|
||||
/usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \
|
||||
/usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \
|
||||
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stddef.h \
|
||||
/usr/include/bits/types.h /usr/include/bits/typesizes.h \
|
||||
/usr/include/libio.h /usr/include/_G_config.h /usr/include/wchar.h \
|
||||
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdarg.h \
|
||||
/usr/include/bits/stdio_lim.h /usr/include/bits/sys_errlist.h \
|
||||
/usr/include/string.h /usr/include/xlocale.h /usr/include/unistd.h \
|
||||
/usr/include/bits/posix_opt.h /usr/include/bits/environments.h \
|
||||
/usr/include/bits/confname.h /usr/include/getopt.h \
|
||||
/usr/include/sys/epoll.h /usr/include/stdint.h /usr/include/bits/wchar.h \
|
||||
/usr/include/sys/types.h /usr/include/time.h /usr/include/endian.h \
|
||||
/usr/include/bits/endian.h /usr/include/bits/byteswap.h \
|
||||
/usr/include/sys/select.h /usr/include/bits/select.h \
|
||||
/usr/include/bits/sigset.h /usr/include/bits/time.h \
|
||||
/usr/include/sys/sysmacros.h /usr/include/bits/pthreadtypes.h \
|
||||
../include/poll.h ../include/dcb.h ../include/spinlock.h \
|
||||
../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
|
||||
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../include/buffer.h \
|
||||
../include/atomic.h
|
||||
|
144
core/gateway.c
144
core/gateway.c
@ -28,6 +28,7 @@
|
||||
* 12-06-2013 Mark Riddoch Add the -p option to set the
|
||||
* listening port
|
||||
* and bind addr is 0.0.0.0
|
||||
* 19/06/13 Mark Riddoch Extract the epoll functionality
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -38,9 +39,6 @@
|
||||
#include <dcb.h>
|
||||
#include <session.h>
|
||||
|
||||
// epoll fd, global!
|
||||
static int epollfd;
|
||||
|
||||
void myfree(void** pp) { free(*pp); *pp = NULL; }
|
||||
|
||||
/* basic signal handling */
|
||||
@ -68,8 +66,7 @@ static void signal_set (int sig, void (*handler)(int)) {
|
||||
}
|
||||
}
|
||||
|
||||
int handle_event_errors(DCB *dcb, int event) {
|
||||
struct epoll_event ed;
|
||||
int handle_event_errors(DCB *dcb) {
|
||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||
|
||||
fprintf(stderr, "#### Handle error function for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state));
|
||||
@ -94,8 +91,8 @@ int handle_event_errors(DCB *dcb, int event) {
|
||||
#endif
|
||||
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) {
|
||||
fprintf(stderr, "epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
if (poll_remove_dcb(dcb) == -1) {
|
||||
fprintf(stderr, "poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
@ -128,8 +125,7 @@ int handle_event_errors(DCB *dcb, int event) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
int handle_event_errors_backend(DCB *dcb, int event) {
|
||||
struct epoll_event ed;
|
||||
int handle_event_errors_backend(DCB *dcb) {
|
||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||
|
||||
fprintf(stderr, "#### Handle Backend error function for %i\n", dcb->fd);
|
||||
@ -149,8 +145,8 @@ int handle_event_errors_backend(DCB *dcb, int event) {
|
||||
#endif
|
||||
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) {
|
||||
fprintf(stderr, "Backend epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
if (poll_remove_dcb(dcb) == -1) {
|
||||
fprintf(stderr, "Backend poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
@ -171,8 +167,6 @@ main(int argc, char **argv)
|
||||
{
|
||||
int daemon_mode = 1;
|
||||
sigset_t sigset;
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
struct epoll_event ev;
|
||||
int nfds;
|
||||
int n;
|
||||
unsigned short port = 4406;
|
||||
@ -218,27 +212,28 @@ SERVER *server1, *server2, *server3;
|
||||
|
||||
load_module("testroute", "Router");
|
||||
|
||||
if (sigfillset(&sigset) != 0) {
|
||||
fprintf(stderr, "sigfillset() error %s\n", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
if (daemon_mode == 1)
|
||||
{
|
||||
if (sigfillset(&sigset) != 0) {
|
||||
fprintf(stderr, "sigfillset() error %s\n", strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (sigdelset(&sigset, SIGHUP) != 0) {
|
||||
fprintf(stderr, "sigdelset(SIGHUP) error %s\n", strerror(errno));
|
||||
}
|
||||
if (sigdelset(&sigset, SIGHUP) != 0) {
|
||||
fprintf(stderr, "sigdelset(SIGHUP) error %s\n", strerror(errno));
|
||||
}
|
||||
|
||||
if (sigdelset(&sigset, SIGTERM) != 0) {
|
||||
fprintf(stderr, "sigdelset(SIGTERM) error %s\n", strerror(errno));
|
||||
}
|
||||
if (sigdelset(&sigset, SIGTERM) != 0) {
|
||||
fprintf(stderr, "sigdelset(SIGTERM) error %s\n", strerror(errno));
|
||||
}
|
||||
|
||||
if (sigprocmask(SIG_SETMASK, &sigset, NULL) != 0) {
|
||||
fprintf(stderr, "sigprocmask() error %s\n", strerror(errno));
|
||||
}
|
||||
|
||||
signal_set(SIGHUP, sighup_handler);
|
||||
signal_set(SIGTERM, sigterm_handler);
|
||||
if (sigprocmask(SIG_SETMASK, &sigset, NULL) != 0) {
|
||||
fprintf(stderr, "sigprocmask() error %s\n", strerror(errno));
|
||||
}
|
||||
|
||||
signal_set(SIGHUP, sighup_handler);
|
||||
signal_set(SIGTERM, sigterm_handler);
|
||||
|
||||
if (daemon_mode == 1) {
|
||||
gw_daemonize();
|
||||
}
|
||||
|
||||
@ -246,91 +241,18 @@ SERVER *server1, *server2, *server3;
|
||||
|
||||
fprintf(stderr, ">> GATEWAY log is /dev/stderr\n");
|
||||
|
||||
epollfd = epoll_create(MAX_EVENTS);
|
||||
|
||||
if (epollfd == -1) {
|
||||
perror("epoll_create");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
poll_init();
|
||||
|
||||
/*
|
||||
* Start the service that was created above
|
||||
*/
|
||||
serviceStart(service1, epollfd);
|
||||
serviceStart(service2, epollfd);
|
||||
serviceStart(service1);
|
||||
serviceStart(service2);
|
||||
|
||||
fprintf(stderr, ">> GATEWAY epoll maxevents is %i\n", MAX_EVENTS);
|
||||
fprintf(stderr, ">> GATEWAY poll maxevents is %i\n", MAX_EVENTS);
|
||||
|
||||
// listen to MySQL protocol
|
||||
/*
|
||||
1. create socket
|
||||
2. set reuse
|
||||
3. set nonblock
|
||||
4. listen
|
||||
5. bind
|
||||
6. epoll add event
|
||||
*/
|
||||
// MySQLListener(epollfd, port);
|
||||
|
||||
// event loop for all the descriptors added via epoll_ctl
|
||||
while (1) {
|
||||
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
|
||||
//nfds = epoll_wait(epollfd, events, MAX_EVENTS, 1000);
|
||||
if (nfds == -1 && (errno != EINTR)) {
|
||||
perror("GW: epoll_pwait ERROR");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
fprintf(stderr, "wake from epoll_wait, n. %i events\n", nfds);
|
||||
#endif
|
||||
|
||||
for (n = 0; n < nfds; ++n) {
|
||||
DCB *dcb = (DCB *) (events[n].data.ptr);
|
||||
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
fprintf(stderr, "New event %i for socket %i is %i\n", n, dcb->fd, events[n].events);
|
||||
if (events[n].events & EPOLLIN)
|
||||
fprintf(stderr, "New event %i for socket %i is EPOLLIN\n", n, dcb->fd);
|
||||
if (events[n].events & EPOLLOUT)
|
||||
fprintf(stderr, "New event %i for socket %i is EPOLLOUT\n", n, dcb->fd);
|
||||
if (events[n].events & EPOLLPRI)
|
||||
fprintf(stderr, "New event %i for socket %i is EPOLLPRI\n", n, dcb->fd);
|
||||
|
||||
#endif
|
||||
if (events[n].events & (EPOLLERR | EPOLLHUP)) {
|
||||
//fprintf(stderr, "CALL the ERROR pointer\n");
|
||||
(dcb->func).error(dcb, events[n].events);
|
||||
//fprintf(stderr, "CALLED the ERROR pointer\n");
|
||||
|
||||
// go to next event
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[n].events & EPOLLIN) {
|
||||
// now checking the listening socket
|
||||
if (dcb->state == DCB_STATE_LISTENING) {
|
||||
(dcb->func).accept(dcb, epollfd);
|
||||
|
||||
} else {
|
||||
//fprintf(stderr, "CALL the READ pointer\n");
|
||||
(dcb->func).read(dcb, epollfd);
|
||||
//fprintf(stderr, "CALLED the READ pointer\n");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (events[n].events & EPOLLOUT) {
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
//fprintf(stderr, "CALL the WRITE pointer\n");
|
||||
(dcb->func).write_ready(dcb, epollfd);
|
||||
//fprintf(stderr, ">>> CALLED the WRITE pointer\n");
|
||||
}
|
||||
}
|
||||
|
||||
} // filedesc loop
|
||||
} // infinite loop
|
||||
|
||||
// End of while (1)
|
||||
while (1)
|
||||
{
|
||||
poll();
|
||||
}
|
||||
} // End of main
|
||||
|
@ -556,7 +556,7 @@ int gw_mysql_read_command(DCB *dcb) {
|
||||
if (packet_no < 0) {
|
||||
fprintf(stderr, "DCB [%i], EPOLLIN Protocol exiting from MYSQL_IDLE [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, packet_no, dcb->fd, protocol->scramble);
|
||||
|
||||
(dcb->func).error(dcb, -1);
|
||||
(dcb->func).error(dcb);
|
||||
|
||||
|
||||
fprintf(stderr, "closing fd [%i], from MYSQL_IDLE\n", dcb->fd);
|
||||
|
@ -208,9 +208,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
|
||||
if (b <= 0) {
|
||||
fprintf(stderr, "||| read_gwbuff called with 0 bytes, closing\n");
|
||||
if (dcb->session->backends) {
|
||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||
(dcb->session->backends->func).error(dcb->session->backends);
|
||||
}
|
||||
dcb->func.error(dcb, -1);
|
||||
dcb->func.error(dcb);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -220,9 +220,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
|
||||
/* Bad news, we have run out of memory */
|
||||
/* Error handling */
|
||||
if (dcb->session->backends) {
|
||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||
(dcb->session->backends->func).error(dcb->session->backends);
|
||||
}
|
||||
(dcb->func).error(dcb, -1);
|
||||
(dcb->func).error(dcb);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -235,9 +235,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
|
||||
} else {
|
||||
fprintf(stderr, "Client connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));;
|
||||
if (dcb->session->backends) {
|
||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||
(dcb->session->backends->func).error(dcb->session->backends);
|
||||
}
|
||||
(dcb->func).error(dcb, -1);
|
||||
(dcb->func).error(dcb);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
@ -246,9 +246,9 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
|
||||
// socket closed
|
||||
fprintf(stderr, "Client connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno));
|
||||
if (dcb->session->backends) {
|
||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||
(dcb->session->backends->func).error(dcb->session->backends);
|
||||
}
|
||||
(dcb->func).error(dcb, -1);
|
||||
(dcb->func).error(dcb);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -266,18 +266,16 @@ int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
|
||||
* Create a new MySQL backend connection.
|
||||
*
|
||||
* This routine performs the MySQL connection to the backend and fills the session->backends of the callier dcb
|
||||
* with the new allocatetd dcb and adds the new socket to the epoll set
|
||||
* with the new allocatetd dcb and adds the new socket to the poll set
|
||||
*
|
||||
* - backend dcb allocation
|
||||
* - MySQL session data fetch
|
||||
* - backend connection using data in MySQL session
|
||||
*
|
||||
* @param client_dcb The client DCB struct
|
||||
* @param efd The epoll set to add the new connection
|
||||
* @return 0 on Success or 1 on Failure.
|
||||
*/
|
||||
int create_backend_connection(DCB *client_dcb, int efd) {
|
||||
struct epoll_event ee;
|
||||
int create_backend_connection(DCB *client_dcb) {
|
||||
DCB *backend = NULL;
|
||||
MySQLProtocol *ptr_proto = NULL;
|
||||
MySQLProtocol *client_protocol = NULL;
|
||||
@ -304,14 +302,10 @@ int create_backend_connection(DCB *client_dcb, int efd) {
|
||||
backend->fd = -1;
|
||||
}
|
||||
|
||||
// edge triggering flag added
|
||||
ee.events = EPOLLIN | EPOLLET | EPOLLOUT;
|
||||
ee.data.ptr = backend;
|
||||
|
||||
// if connected, add it to the epoll
|
||||
// if connected, add it to the poll
|
||||
if (backend->fd > 0) {
|
||||
if (epoll_ctl(efd, EPOLL_CTL_ADD, backend->fd, &ee) == -1) {
|
||||
perror("epoll_ctl: backend sock");
|
||||
if (poll_add_dcb(backend) == -1) {
|
||||
perror("poll_ctl: backend sock");
|
||||
} else {
|
||||
fprintf(stderr, "--> Backend conn added, bk_fd [%i], scramble [%s], is session with client_fd [%i]\n", ptr_proto->fd, ptr_proto->scramble, client_dcb->fd);
|
||||
backend->state = DCB_STATE_POLLING;
|
||||
|
177
core/poll.c
Normal file
177
core/poll.c
Normal file
@ -0,0 +1,177 @@
|
||||
/*
|
||||
* This file is distributed as part of the SkySQL Gateway. It is free
|
||||
* software: you can redistribute it and/or modify it under the terms of the
|
||||
* GNU General Public License as published by the Free Software Foundation,
|
||||
* version 2.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
* details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Copyright SkySQL Ab 2013
|
||||
*/
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <poll.h>
|
||||
#include <dcb.h>
|
||||
#include <atomic.h>
|
||||
|
||||
/**
|
||||
* @file poll.c - Abraction of the epoll functionality
|
||||
*
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 19/06/13 Mark Riddoch Initial implementation
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
static int epoll_fd = -1; /**< The epoll file descriptor */
|
||||
|
||||
/**
|
||||
* The polling statistics
|
||||
*/
|
||||
static struct {
|
||||
int n_read; /**< Number of read events */
|
||||
int n_write; /**< Number of write events */
|
||||
int n_error; /**< Number of error events */
|
||||
int n_hup; /**< Number of hangup events */
|
||||
int n_accept; /**< Number of accept events */
|
||||
int n_polls; /**< Number of poll cycles */
|
||||
} pollStats;
|
||||
|
||||
|
||||
/**
|
||||
* Initialise the polling system we are using for the gateway.
|
||||
*
|
||||
* In this case we are using the Linux epoll mechanism
|
||||
*/
|
||||
void
|
||||
poll_init()
|
||||
{
|
||||
if (epoll_fd != -1)
|
||||
return;
|
||||
if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1)
|
||||
{
|
||||
perror("epoll_create");
|
||||
exit(-1);
|
||||
}
|
||||
memset(&pollStats, 0, sizeof(pollStats));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a DCB to the set of descriptors within the polling
|
||||
* environment.
|
||||
*
|
||||
* @param dcb The descriptor to add to the poll
|
||||
* @return -1 on error or 0 on success
|
||||
*/
|
||||
int
|
||||
poll_add_dcb(DCB *dcb)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
|
||||
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
|
||||
ev.data.ptr = dcb;
|
||||
|
||||
return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a descriptor from the set of descriptors within the
|
||||
* polling environment.
|
||||
*
|
||||
* @param dcb The descriptor to remove
|
||||
* @return -1 on error or 0 on success
|
||||
*/
|
||||
int
|
||||
poll_remove_dcb(DCB *dcb)
|
||||
{
|
||||
struct epoll_event ev;
|
||||
|
||||
return epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
|
||||
}
|
||||
|
||||
/**
|
||||
* The main polling loop
|
||||
*
|
||||
* This routine does the polling and despatches of IO events
|
||||
* to the DCB's
|
||||
*/
|
||||
void
|
||||
poll()
|
||||
{
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
int i, nfds;
|
||||
|
||||
while (1)
|
||||
{
|
||||
atomic_add(&pollStats.n_polls, 1);
|
||||
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1)) == -1)
|
||||
{
|
||||
}
|
||||
else
|
||||
{
|
||||
for (i = 0; i < nfds; i++)
|
||||
{
|
||||
DCB *dcb = (DCB *)events[i].data.ptr;
|
||||
__uint32_t ev = events[i].events;
|
||||
|
||||
if (ev & EPOLLERR)
|
||||
{
|
||||
atomic_add(&pollStats.n_error, 1);
|
||||
dcb->func.error(dcb);
|
||||
}
|
||||
if (ev & EPOLLHUP)
|
||||
{
|
||||
atomic_add(&pollStats.n_hup, 1);
|
||||
dcb->func.hangup(dcb);
|
||||
}
|
||||
if (ev & EPOLLIN)
|
||||
{
|
||||
if (dcb->state == DCB_STATE_LISTENING)
|
||||
{
|
||||
atomic_add(&pollStats.n_accept, 1);
|
||||
dcb->func.accept(dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
atomic_add(&pollStats.n_read, 1);
|
||||
dcb->func.read(dcb);
|
||||
}
|
||||
}
|
||||
if (ev & EPOLLOUT)
|
||||
{
|
||||
atomic_add(&pollStats.n_write, 1);
|
||||
dcb->func.write_ready(dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Debug routine to print the polling statistics
|
||||
*
|
||||
* @param dcb DCB to print to
|
||||
*/
|
||||
void
|
||||
dprintPollStats(DCB *dcb)
|
||||
{
|
||||
dcb_printf(dcb, "Number of epoll cycles: %d\n", pollStats.n_polls);
|
||||
dcb_printf(dcb, "Number of read events: %d\n", pollStats.n_read);
|
||||
dcb_printf(dcb, "Number of write events: %d\n", pollStats.n_write);
|
||||
dcb_printf(dcb, "Number of error events: %d\n", pollStats.n_error);
|
||||
dcb_printf(dcb, "Number of hangup events: %d\n", pollStats.n_hup);
|
||||
dcb_printf(dcb, "Number of accept events: %d\n", pollStats.n_accept);
|
||||
}
|
@ -86,11 +86,10 @@ SERVICE *service;
|
||||
* Also create the router_instance for the service.
|
||||
*
|
||||
* @param service The Service that should be started
|
||||
* @param efd The epoll descriptor
|
||||
* @return Returns the number of listeners created
|
||||
*/
|
||||
int
|
||||
serviceStart(SERVICE *service, int efd)
|
||||
serviceStart(SERVICE *service)
|
||||
{
|
||||
SERV_PROTOCOL *port;
|
||||
int listeners = 0;
|
||||
@ -114,7 +113,7 @@ GWPROTOCOL *funcs;
|
||||
memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL));
|
||||
port->listener->session = NULL;
|
||||
sprintf(config_bind, "0.0.0.0:%d", port->port);
|
||||
if (port->listener->func.listen(port->listener, efd, config_bind))
|
||||
if (port->listener->func.listen(port->listener, config_bind))
|
||||
listeners++;
|
||||
port->listener->session = session_alloc(service, port->listener);
|
||||
port->listener->session->state = SESSION_STATE_LISTENER;
|
||||
|
52
core/utils.c
52
core/utils.c
@ -54,7 +54,7 @@ char hex_lower[] = "0123456789abcdefghijklmnopqrstuvwxyz";
|
||||
//backend read event triggered by EPOLLIN
|
||||
//////////////////////////////////////////
|
||||
|
||||
int gw_read_backend_event(DCB *dcb, int epfd) {
|
||||
int gw_read_backend_event(DCB *dcb) {
|
||||
int n;
|
||||
MySQLProtocol *client_protocol = NULL;
|
||||
|
||||
@ -67,7 +67,6 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
|
||||
#endif
|
||||
|
||||
if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
|
||||
struct epoll_event new_event;
|
||||
int w;
|
||||
int b = -1;
|
||||
int tot_b = -1;
|
||||
@ -193,7 +192,7 @@ int w, saved_errno = 0;
|
||||
//backend write event triggered by EPOLLOUT
|
||||
//////////////////////////////////////////
|
||||
|
||||
int gw_write_backend_event(DCB *dcb, int epfd) {
|
||||
int gw_write_backend_event(DCB *dcb) {
|
||||
|
||||
//fprintf(stderr, ">>> gw_write_backend_event for %i\n", dcb->fd);
|
||||
|
||||
@ -309,9 +308,9 @@ int gw_route_read_event(DCB* dcb, int epfd) {
|
||||
fprintf(stderr, "COM_QUIT received\n");
|
||||
if (dcb->session->backends) {
|
||||
dcb->session->backends->func.write(dcb, queue);
|
||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||
(dcb->session->backends->func).error(dcb->session->backends);
|
||||
}
|
||||
(dcb->func).error(dcb, -1);
|
||||
(dcb->func).error(dcb);
|
||||
|
||||
return 1;
|
||||
}
|
||||
@ -339,10 +338,9 @@ int gw_route_read_event(DCB* dcb, int epfd) {
|
||||
///////////////////////////////////////////////
|
||||
// client write event triggered by EPOLLOUT
|
||||
//////////////////////////////////////////////
|
||||
int gw_handle_write_event(DCB *dcb, int epfd) {
|
||||
int gw_handle_write_event(DCB *dcb) {
|
||||
MySQLProtocol *protocol = NULL;
|
||||
int n;
|
||||
struct epoll_event new_event;
|
||||
|
||||
if (dcb == NULL) {
|
||||
fprintf(stderr, "DCB is NULL, return\n");
|
||||
@ -385,9 +383,9 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
||||
// still to implement
|
||||
mysql_send_auth_error(dcb, 2, 0, "Authorization failed");
|
||||
|
||||
dcb->func.error(dcb, -1);
|
||||
dcb->func.error(dcb);
|
||||
if (dcb->session->backends)
|
||||
dcb->session->backends->func.error(dcb->session->backends, -1);
|
||||
dcb->session->backends->func.error(dcb->session->backends);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -451,7 +449,6 @@ void MySQLListener(int epfd, char *config_bind) {
|
||||
char address[1024]="";
|
||||
int port=0;
|
||||
int one;
|
||||
struct epoll_event ev;
|
||||
|
||||
// this gateway, as default, will bind on port 4404 for localhost only
|
||||
(config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406");
|
||||
@ -511,15 +508,9 @@ void MySQLListener(int epfd, char *config_bind) {
|
||||
// assign l_so to dcb
|
||||
listener->fd = l_so;
|
||||
|
||||
// register events, don't add EPOLLET for now
|
||||
ev.events = EPOLLIN;
|
||||
|
||||
// set user data to dcb struct
|
||||
ev.data.ptr = listener;
|
||||
|
||||
// add listening socket to epoll structure
|
||||
if (epoll_ctl(epfd, EPOLL_CTL_ADD, l_so, &ev) == -1) {
|
||||
perror("epoll_ctl: listen_sock");
|
||||
// add listening socket to poll structure
|
||||
if (poll_add_dcb(listener) == -1) {
|
||||
perror("poll_add_dcb: listen_sock");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
@ -529,7 +520,7 @@ void MySQLListener(int epfd, char *config_bind) {
|
||||
}
|
||||
|
||||
|
||||
int MySQLAccept(DCB *listener, int efd) {
|
||||
int MySQLAccept(DCB *listener) {
|
||||
|
||||
fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd);
|
||||
|
||||
@ -538,7 +529,6 @@ int MySQLAccept(DCB *listener, int efd) {
|
||||
struct sockaddr_in local;
|
||||
socklen_t addrlen;
|
||||
addrlen = sizeof(local);
|
||||
struct epoll_event ee;
|
||||
DCB *client;
|
||||
DCB *backend;
|
||||
SESSION *session;
|
||||
@ -604,14 +594,10 @@ int MySQLAccept(DCB *listener, int efd) {
|
||||
backend->fd = -1;
|
||||
}
|
||||
|
||||
// edge triggering flag added
|
||||
ee.events = EPOLLIN | EPOLLET | EPOLLOUT;
|
||||
ee.data.ptr = backend;
|
||||
|
||||
// if connected, add it to the epoll
|
||||
// if connected, add it to the poll
|
||||
if (backend->fd > 0) {
|
||||
if (epoll_ctl(efd, EPOLL_CTL_ADD, backend->fd, &ee) == -1) {
|
||||
perror("epoll_ctl: backend sock");
|
||||
if (poll_add_dcb(backend) == -1) {
|
||||
perror("poll_add_dcb: backend sock");
|
||||
} else {
|
||||
//fprintf(stderr, "--> Backend conn added, bk_fd [%i], scramble [%s], is session with client_fd [%i]\n", ptr_proto->fd, ptr_proto->scramble, client->fd);
|
||||
backend->state = DCB_STATE_POLLING;
|
||||
@ -635,18 +621,14 @@ int MySQLAccept(DCB *listener, int efd) {
|
||||
(client->func).write = MySQLWrite;
|
||||
(client->func).write_ready = gw_handle_write_event;
|
||||
|
||||
// edge triggering flag added
|
||||
ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
|
||||
ee.data.ptr = client;
|
||||
|
||||
client->state = DCB_STATE_IDLE;
|
||||
|
||||
// event install
|
||||
if (epoll_ctl(efd, EPOLL_CTL_ADD, c_sock, &ee) == -1) {
|
||||
perror("epoll_ctl: conn_sock");
|
||||
if (poll_add_dcb(client) == -1) {
|
||||
perror("poll_add_dcb: conn_sock");
|
||||
exit(EXIT_FAILURE);
|
||||
} else {
|
||||
//fprintf(stderr, "Added fd %i to epoll, protocol state [%i]\n", c_sock , client->state);
|
||||
//fprintf(stderr, "Added fd %i to poll, protocol state [%i]\n", c_sock , client->state);
|
||||
client->state = DCB_STATE_POLLING;
|
||||
}
|
||||
|
||||
|
@ -65,15 +65,15 @@ struct dcb;
|
||||
* @see load_module
|
||||
*/
|
||||
typedef struct gw_protocol {
|
||||
int (*read)(struct dcb *, int);
|
||||
int (*read)(struct dcb *);
|
||||
int (*write)(struct dcb *, GWBUF *);
|
||||
int (*write_ready)(struct dcb *, int);
|
||||
int (*error)(struct dcb *, int);
|
||||
int (*hangup)(struct dcb *, int);
|
||||
int (*accept)(struct dcb *, int);
|
||||
int (*connect)(struct server *, struct session *, int);
|
||||
int (*close)(struct dcb *, int);
|
||||
int (*listen)(struct dcb *, int, char *);
|
||||
int (*write_ready)(struct dcb *);
|
||||
int (*error)(struct dcb *);
|
||||
int (*hangup)(struct dcb *);
|
||||
int (*accept)(struct dcb *);
|
||||
int (*connect)(struct server *, struct session *);
|
||||
int (*close)(struct dcb *);
|
||||
int (*listen)(struct dcb *, char *);
|
||||
} GWPROTOCOL;
|
||||
|
||||
/**
|
||||
@ -131,7 +131,7 @@ extern DCB *connect_dcb(struct server *, struct session *, const char *);
|
||||
extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */
|
||||
extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */
|
||||
extern int dcb_drain_writeq(DCB *); /* Generic write routine */
|
||||
extern void dcb_close(DCB *, int); /* Generic close functionality */
|
||||
extern void dcb_close(DCB *); /* Generic close functionality */
|
||||
extern void printAllDCBs(); /* Debug to print all DCB in the system */
|
||||
extern void printDCB(DCB *); /* Debug print routine */
|
||||
extern void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
||||
|
10
include/gw.h
10
include/gw.h
@ -51,10 +51,10 @@
|
||||
#include "dcb.h"
|
||||
|
||||
int do_read_dcb(DCB *dcb);
|
||||
int handle_event_errors(DCB *dcb, int event);
|
||||
int handle_event_errors_backend(DCB *dcb, int event);
|
||||
int handle_event_errors(DCB *dcb);
|
||||
int handle_event_errors_backend(DCB *dcb);
|
||||
void MySQLListener(int epfd, char *config_bind);
|
||||
int MySQLAccept(DCB *listener, int efd);
|
||||
int MySQLAccept(DCB *listener);
|
||||
int gw_mysql_do_authentication(DCB *dcb, GWBUF *);
|
||||
void gw_mysql_close(MySQLProtocol **ptr);
|
||||
char *gw_strend(register const char *s);
|
||||
@ -62,5 +62,5 @@ int do_read_dcb(DCB *dcb);
|
||||
int do_read_10(DCB *dcb, uint8_t *buffer);
|
||||
MySQLProtocol * gw_mysql_init(MySQLProtocol *ptr);
|
||||
int MySQLWrite(DCB *dcb, GWBUF *queue);
|
||||
int gw_write_backend_event(DCB *dcb, int epfd);
|
||||
int gw_read_backend_event(DCB *dcb, int epfd);
|
||||
int gw_write_backend_event(DCB *dcb);
|
||||
int gw_read_backend_event(DCB *dcb);
|
||||
|
40
include/poll.h
Normal file
40
include/poll.h
Normal file
@ -0,0 +1,40 @@
|
||||
#ifndef _POLL_H
|
||||
#define _POLL_H
|
||||
/*
|
||||
* This file is distributed as part of the SkySQL Gateway. It is free
|
||||
* software: you can redistribute it and/or modify it under the terms of the
|
||||
* GNU General Public License as published by the Free Software Foundation,
|
||||
* version 2.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
* details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* this program; if not, write to the Free Software Foundation, Inc., 51
|
||||
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Copyright SkySQL Ab 2013
|
||||
*/
|
||||
#include <dcb.h>
|
||||
|
||||
/**
|
||||
* @file poll.h The poll related functionality
|
||||
*
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 19/06/13 Mark Riddoch Initial implementation
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
#define MAX_EVENTS 1000
|
||||
|
||||
extern void poll_init();
|
||||
extern int poll_add_dcb(DCB *);
|
||||
extern int poll_remove_dcb(DCB *);
|
||||
extern void poll();
|
||||
extern void dprintPollStats(DCB *);
|
||||
#endif
|
@ -95,7 +95,7 @@ extern SERVICE *service_alloc(char *, char *);
|
||||
extern int service_free(SERVICE *);
|
||||
extern int serviceAddProtocol(SERVICE *, char *, unsigned short);
|
||||
extern void serviceAddBackend(SERVICE *, SERVER *);
|
||||
extern int serviceStart(SERVICE *, int);
|
||||
extern int serviceStart(SERVICE *);
|
||||
extern void printService(SERVICE *);
|
||||
extern void printAllServices();
|
||||
extern void dprintAllServices(DCB *);
|
||||
|
@ -32,7 +32,6 @@
|
||||
#include <stdint.h>
|
||||
#include <string.h>
|
||||
#include <openssl/sha.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <errno.h>
|
||||
#include <sys/socket.h>
|
||||
|
@ -30,10 +30,10 @@
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
|
||||
int gw_read_backend_event(DCB* dcb, int epfd);
|
||||
int gw_write_backend_event(DCB *dcb, int epfd);
|
||||
int gw_read_backend_event(DCB* dcb);
|
||||
int gw_write_backend_event(DCB *dcb);
|
||||
int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue);
|
||||
int gw_error_backend_event(DCB *dcb, int epfd, int event);
|
||||
int gw_error_backend_event(DCB *dcb);
|
||||
|
||||
static GWPROTOCOL MyObject = {
|
||||
gw_read_backend_event, /* Read - EPOLLIN handler */
|
||||
@ -86,7 +86,7 @@ GetModuleObject()
|
||||
//////////////////////////////////////////
|
||||
//backend read event triggered by EPOLLIN
|
||||
//////////////////////////////////////////
|
||||
int gw_read_backend_event(DCB *dcb, int epfd) {
|
||||
int gw_read_backend_event(DCB *dcb) {
|
||||
int n;
|
||||
MySQLProtocol *client_protocol = NULL;
|
||||
|
||||
@ -99,7 +99,6 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
|
||||
#endif
|
||||
|
||||
if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
|
||||
struct epoll_event new_event;
|
||||
int w;
|
||||
int b = -1;
|
||||
int tot_b = -1;
|
||||
@ -150,7 +149,7 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
|
||||
//////////////////////////////////////////
|
||||
//backend write event triggered by EPOLLOUT
|
||||
//////////////////////////////////////////
|
||||
int gw_write_backend_event(DCB *dcb, int epfd) {
|
||||
int gw_write_backend_event(DCB *dcb) {
|
||||
//fprintf(stderr, ">>> gw_write_backend_event for %i\n", dcb->fd);
|
||||
return 0;
|
||||
}
|
||||
@ -229,8 +228,7 @@ int w, saved_errno = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int gw_error_backend_event(DCB *dcb, int epfd, int event) {
|
||||
struct epoll_event ed;
|
||||
int gw_error_backend_event(DCB *dcb) {
|
||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||
|
||||
fprintf(stderr, "#### Handle Backend error function for %i\n", dcb->fd);
|
||||
@ -250,8 +248,8 @@ int gw_error_backend_event(DCB *dcb, int epfd, int event) {
|
||||
#endif
|
||||
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
if (epoll_ctl(epfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) {
|
||||
fprintf(stderr, "Backend epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
if (poll_remove_dcb(dcb) == -1) {
|
||||
fprintf(stderr, "Backend poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
|
@ -29,15 +29,16 @@
|
||||
*/
|
||||
|
||||
#include "mysql_client_server_protocol.h"
|
||||
#include "poll.h"
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
|
||||
static int gw_MySQLAccept(DCB *listener, int efd);
|
||||
static int gw_MySQLListener(DCB *listener, int epfd, char *config_bind);
|
||||
static int gw_read_client_event(DCB* dcb, int epfd);
|
||||
static int gw_write_client_event(DCB *dcb, int epfd);
|
||||
static int gw_MySQLAccept(DCB *listener);
|
||||
static int gw_MySQLListener(DCB *listener, char *config_bind);
|
||||
static int gw_read_client_event(DCB* dcb);
|
||||
static int gw_write_client_event(DCB *dcb);
|
||||
static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue);
|
||||
static int gw_error_client_event(DCB *dcb, int epfd, int event);
|
||||
static int gw_error_client_event(DCB *dcb);
|
||||
|
||||
static int gw_check_mysql_scramble_data(uint8_t *token, unsigned int token_len, uint8_t *scramble, unsigned int scramble_len, char *username, uint8_t *stage1_hash);
|
||||
static int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password, void *repository);
|
||||
@ -659,10 +660,9 @@ int w, saved_errno = 0;
|
||||
* Client read event triggered by EPOLLIN
|
||||
*
|
||||
* @param dcb Descriptor control block
|
||||
* @param epfd Epoll descriptor
|
||||
* @return TRUE on error
|
||||
*/
|
||||
int gw_read_client_event(DCB* dcb, int epfd) {
|
||||
int gw_read_client_event(DCB* dcb) {
|
||||
MySQLProtocol *protocol = NULL;
|
||||
uint8_t buffer[MAX_BUFFER_SIZE] = "";
|
||||
int n = 0;
|
||||
@ -769,9 +769,9 @@ int gw_read_client_event(DCB* dcb, int epfd) {
|
||||
fprintf(stderr, "COM_QUIT received\n");
|
||||
if (dcb->session->backends) {
|
||||
dcb->session->backends->func.write(dcb, queue);
|
||||
(dcb->session->backends->func).error(dcb->session->backends, epfd);
|
||||
(dcb->session->backends->func).error(dcb->session->backends);
|
||||
}
|
||||
(dcb->func).error(dcb, epfd);
|
||||
(dcb->func).error(dcb);
|
||||
|
||||
return 1;
|
||||
}
|
||||
@ -799,10 +799,9 @@ int gw_read_client_event(DCB* dcb, int epfd) {
|
||||
///////////////////////////////////////////////
|
||||
// client write event to Client triggered by EPOLLOUT
|
||||
//////////////////////////////////////////////
|
||||
int gw_write_client_event(DCB *dcb, int epfd) {
|
||||
int gw_write_client_event(DCB *dcb) {
|
||||
MySQLProtocol *protocol = NULL;
|
||||
int n;
|
||||
struct epoll_event new_event;
|
||||
|
||||
if (dcb == NULL) {
|
||||
fprintf(stderr, "DCB is NULL, return\n");
|
||||
@ -839,7 +838,7 @@ int gw_write_client_event(DCB *dcb, int epfd) {
|
||||
// create one backend connection
|
||||
// This is not working now, as the backend dcb functions are in the mysql_protocol.c
|
||||
// and it will loaded separately
|
||||
//gw_create_backend_connection(dcb, epfd);
|
||||
//gw_create_backend_connection(dcb);
|
||||
|
||||
protocol->state = MYSQL_IDLE;
|
||||
|
||||
@ -850,9 +849,9 @@ int gw_write_client_event(DCB *dcb, int epfd) {
|
||||
// still to implement
|
||||
mysql_send_auth_error(dcb, 2, 0, "Authorization failed");
|
||||
|
||||
dcb->func.error(dcb, epfd);
|
||||
dcb->func.error(dcb);
|
||||
if (dcb->session->backends)
|
||||
dcb->session->backends->func.error(dcb->session->backends, epfd);
|
||||
dcb->session->backends->func.error(dcb->session->backends);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -904,7 +903,7 @@ int gw_write_client_event(DCB *dcb, int epfd) {
|
||||
///
|
||||
// set listener for mysql protocol
|
||||
///
|
||||
int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) {
|
||||
int gw_MySQLListener(DCB *listener, char *config_bind) {
|
||||
int l_so;
|
||||
int fl;
|
||||
struct sockaddr_in serv_addr;
|
||||
@ -915,7 +914,6 @@ int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) {
|
||||
char address[1024]="";
|
||||
int port=0;
|
||||
int one;
|
||||
struct epoll_event ev;
|
||||
|
||||
// this gateway, as default, will bind on port 4404 for localhost only
|
||||
(config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406");
|
||||
@ -973,15 +971,9 @@ int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) {
|
||||
// assign l_so to dcb
|
||||
listener->fd = l_so;
|
||||
|
||||
// register events, don't add EPOLLET for now
|
||||
ev.events = EPOLLIN;
|
||||
|
||||
// set user data to dcb struct
|
||||
ev.data.ptr = listener;
|
||||
|
||||
// add listening socket to epoll structure
|
||||
if (epoll_ctl(epfd, EPOLL_CTL_ADD, l_so, &ev) == -1) {
|
||||
fprintf(stderr, ">>> epoll_ctl: can't add the listen_sock! Errno %i, %s\n", errno, strerror(errno));
|
||||
// add listening socket to poll structure
|
||||
if (poll_add_dcb(listener) == -1) {
|
||||
fprintf(stderr, ">>> poll_add_dcb: can't add the listen_sock! Errno %i, %s\n", errno, strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -993,7 +985,7 @@ int gw_MySQLListener(DCB *listener, int epfd, char *config_bind) {
|
||||
}
|
||||
|
||||
|
||||
int gw_MySQLAccept(DCB *listener, int efd) {
|
||||
int gw_MySQLAccept(DCB *listener) {
|
||||
|
||||
fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd);
|
||||
|
||||
@ -1002,7 +994,6 @@ int gw_MySQLAccept(DCB *listener, int efd) {
|
||||
struct sockaddr_in local;
|
||||
socklen_t addrlen;
|
||||
addrlen = sizeof(local);
|
||||
struct epoll_event ee;
|
||||
DCB *client;
|
||||
SESSION *session;
|
||||
MySQLProtocol *protocol;
|
||||
@ -1052,18 +1043,14 @@ int gw_MySQLAccept(DCB *listener, int efd) {
|
||||
// assign function poiters to "func" field
|
||||
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL));
|
||||
|
||||
// edge triggering flag added
|
||||
ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
|
||||
ee.data.ptr = client;
|
||||
|
||||
client->state = DCB_STATE_IDLE;
|
||||
|
||||
// event install
|
||||
if (epoll_ctl(efd, EPOLL_CTL_ADD, c_sock, &ee) == -1) {
|
||||
perror("epoll_ctl: conn_sock");
|
||||
if (poll_add_dcb(client) == -1) {
|
||||
perror("poll_add_dcb: conn_sock");
|
||||
exit(EXIT_FAILURE);
|
||||
} else {
|
||||
//fprintf(stderr, "Added fd %i to epoll, protocol state [%i]\n", c_sock , client->state);
|
||||
//fprintf(stderr, "Added fd %i to poll, protocol state [%i]\n", c_sock , client->state);
|
||||
client->state = DCB_STATE_POLLING;
|
||||
}
|
||||
|
||||
@ -1081,8 +1068,7 @@ int gw_MySQLAccept(DCB *listener, int efd) {
|
||||
|
||||
/*
|
||||
*/
|
||||
static int gw_error_client_event(DCB *dcb, int epfd, int event) {
|
||||
struct epoll_event ed;
|
||||
static int gw_error_client_event(DCB *dcb) {
|
||||
|
||||
|
||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||
@ -1107,8 +1093,8 @@ static int gw_error_client_event(DCB *dcb, int epfd, int event) {
|
||||
#endif
|
||||
|
||||
if (dcb->state != DCB_STATE_LISTENING) {
|
||||
if (epoll_ctl(epfd, EPOLL_CTL_DEL, dcb->fd, &ed) == -1) {
|
||||
fprintf(stderr, "***** epoll_ctl_del: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
if (poll_remove_dcb(dcb) == -1) {
|
||||
fprintf(stderr, "***** poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno));
|
||||
}
|
||||
|
||||
#ifdef GW_EVENT_DEBUG
|
||||
|
@ -32,10 +32,10 @@ static char *version_str = "V1.0.0";
|
||||
static MySQLProtocol *gw_mysql_init(MySQLProtocol *data);
|
||||
static void gw_mysql_close(MySQLProtocol **ptr);
|
||||
|
||||
extern gw_read_backend_event(DCB* dcb, int epfd);
|
||||
extern gw_write_backend_event(DCB *dcb, int epfd);
|
||||
extern gw_read_backend_event(DCB* dcb);
|
||||
extern gw_write_backend_event(DCB *dcb);
|
||||
extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue);
|
||||
extern int gw_error_backend_event(DCB *dcb, int epfd, int event);
|
||||
extern int gw_error_backend_event(DCB *dcb);
|
||||
|
||||
///////////////////////////////
|
||||
// Initialize mysql protocol struct
|
||||
@ -101,14 +101,13 @@ void gw_mysql_close(MySQLProtocol **ptr) {
|
||||
* Create a new MySQL backend connection.
|
||||
*
|
||||
* This routine performs the MySQL connection to the backend and fills the session->backends of the callier dcb
|
||||
* with the new allocatetd dcb and adds the new socket to the epoll set
|
||||
* with the new allocatetd dcb and adds the new socket to the poll set
|
||||
*
|
||||
* - backend dcb allocation
|
||||
* - MySQL session data fetch
|
||||
* - backend connection using data in MySQL session
|
||||
*
|
||||
* @param client_dcb The client DCB struct
|
||||
* @param epfd The epoll set to add the new connection
|
||||
* @return 0 on Success or 1 on Failure.
|
||||
*/
|
||||
|
||||
@ -116,8 +115,7 @@ void gw_mysql_close(MySQLProtocol **ptr) {
|
||||
* This function cannot work as it will be called from mysql_client.c but it needs function pointers from mysql_backend.c
|
||||
* They are modules loaded separately!!
|
||||
*
|
||||
int gw_create_backend_connection(DCB *client_dcb, int efd) {
|
||||
struct epoll_event ee;
|
||||
int gw_create_backend_connection(DCB *client_dcb) {
|
||||
DCB *backend = NULL;
|
||||
MySQLProtocol *ptr_proto = NULL;
|
||||
MySQLProtocol *client_protocol = NULL;
|
||||
|
@ -21,13 +21,13 @@
|
||||
#include <buffer.h>
|
||||
#include <service.h>
|
||||
#include <session.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <errno.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <router.h>
|
||||
#include <poll.h>
|
||||
|
||||
/**
|
||||
* @file telnetd.c - telnet daemon protocol module
|
||||
@ -52,14 +52,14 @@
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
|
||||
static int telnetd_read_event(DCB* dcb, int epfd);
|
||||
static int telnetd_write_event(DCB *dcb, int epfd);
|
||||
static int telnetd_read_event(DCB* dcb);
|
||||
static int telnetd_write_event(DCB *dcb);
|
||||
static int telnetd_write(DCB *dcb, GWBUF *queue);
|
||||
static int telnetd_error(DCB *dcb, int event);
|
||||
static int telnetd_hangup(DCB *dcb, int event);
|
||||
static int telnetd_accept(DCB *dcb, int event);
|
||||
static int telnetd_close(DCB *dcb, int event);
|
||||
static int telnetd_listen(DCB *dcb, int efd, char *config);
|
||||
static int telnetd_error(DCB *dcb);
|
||||
static int telnetd_hangup(DCB *dcb);
|
||||
static int telnetd_accept(DCB *dcb);
|
||||
static int telnetd_close(DCB *dcb);
|
||||
static int telnetd_listen(DCB *dcb, char *config);
|
||||
|
||||
/**
|
||||
* The "module object" for the telnetd protocol module.
|
||||
@ -76,6 +76,9 @@ static GWPROTOCOL MyObject = {
|
||||
telnetd_listen /**< Create a listener */
|
||||
};
|
||||
|
||||
static void
|
||||
telnetd_command(DCB *, char *cmd);
|
||||
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
@ -115,11 +118,10 @@ GetModuleObject()
|
||||
* Read event for EPOLLIN on the telnetd protocol module.
|
||||
*
|
||||
* @param dcb The descriptor control block
|
||||
* @param epfd The epoll descriptor
|
||||
* @return
|
||||
*/
|
||||
static int
|
||||
telnetd_read_event(DCB* dcb, int epfd)
|
||||
telnetd_read_event(DCB* dcb)
|
||||
{
|
||||
int n;
|
||||
GWBUF *head = NULL;
|
||||
@ -130,17 +132,20 @@ void *rsession = session->router_session;
|
||||
|
||||
if ((n = dcb_read(dcb, &head)) != -1)
|
||||
{
|
||||
dcb->state = DCB_STATE_PROCESSING;
|
||||
if (head)
|
||||
{
|
||||
char *ptr = GWBUF_DATA(head);
|
||||
ptr = GWBUF_DATA(head);
|
||||
if (*ptr == TELNET_IAC)
|
||||
{
|
||||
telnetd_command(dcb, ptr + 1);
|
||||
GWBUF_CONSUME(head, 2);
|
||||
}
|
||||
router->routeQuery(router_instance, rsession, head);
|
||||
}
|
||||
}
|
||||
dcb->state = DCB_STATE_POLLING;
|
||||
|
||||
return n;
|
||||
}
|
||||
@ -149,11 +154,10 @@ void *rsession = session->router_session;
|
||||
* EPOLLOUT handler for the telnetd protocol module.
|
||||
*
|
||||
* @param dcb The descriptor control block
|
||||
* @param epfd The epoll descriptor
|
||||
* @return
|
||||
*/
|
||||
static int
|
||||
telnetd_write_event(DCB *dcb, int epfd)
|
||||
telnetd_write_event(DCB *dcb)
|
||||
{
|
||||
return dcb_drain_writeq(dcb);
|
||||
}
|
||||
@ -177,10 +181,9 @@ telnetd_write(DCB *dcb, GWBUF *queue)
|
||||
* Handler for the EPOLLERR event.
|
||||
*
|
||||
* @param dcb The descriptor control block
|
||||
* @param event The epoll descriptor
|
||||
*/
|
||||
static int
|
||||
telnetd_error(DCB *dcb, int event)
|
||||
telnetd_error(DCB *dcb)
|
||||
{
|
||||
}
|
||||
|
||||
@ -188,10 +191,9 @@ telnetd_error(DCB *dcb, int event)
|
||||
* Handler for the EPOLLHUP event.
|
||||
*
|
||||
* @param dcb The descriptor control block
|
||||
* @param event The epoll descriptor
|
||||
*/
|
||||
static int
|
||||
telnetd_hangup(DCB *dcb, int event)
|
||||
telnetd_hangup(DCB *dcb)
|
||||
{
|
||||
}
|
||||
|
||||
@ -200,10 +202,9 @@ telnetd_hangup(DCB *dcb, int event)
|
||||
* socket for the protocol.
|
||||
*
|
||||
* @param dcb The descriptor control block
|
||||
* @param efd The epoll descriptor
|
||||
*/
|
||||
static int
|
||||
telnetd_accept(DCB *dcb, int efd)
|
||||
telnetd_accept(DCB *dcb)
|
||||
{
|
||||
int n_connect = 0;
|
||||
|
||||
@ -213,7 +214,6 @@ int n_connect = 0;
|
||||
struct sockaddr_in addr;
|
||||
socklen_t addrlen;
|
||||
DCB *client;
|
||||
struct epoll_event ee;
|
||||
|
||||
if ((so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen)) == -1)
|
||||
return n_connect;
|
||||
@ -225,17 +225,16 @@ int n_connect = 0;
|
||||
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL));
|
||||
client->session = session_alloc(dcb->session->service, client);
|
||||
|
||||
ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
|
||||
ee.data.ptr = client;
|
||||
client->state = DCB_STATE_IDLE;
|
||||
|
||||
if (epoll_ctl(efd, EPOLL_CTL_ADD, so, &ee) == -1)
|
||||
if (poll_add_dcb(client) == -1)
|
||||
{
|
||||
return n_connect;
|
||||
}
|
||||
n_connect++;
|
||||
|
||||
dcb_printf(client, "Gateway> ");
|
||||
client->state = DCB_STATE_POLLING;
|
||||
}
|
||||
}
|
||||
return n_connect;
|
||||
@ -246,27 +245,25 @@ int n_connect = 0;
|
||||
* explicitly close a connection.
|
||||
*
|
||||
* @param dcb The descriptor control block
|
||||
* @param efd The epoll descriptor
|
||||
*/
|
||||
|
||||
static int
|
||||
telnetd_close(DCB *dcb, int efd)
|
||||
telnetd_close(DCB *dcb)
|
||||
{
|
||||
dcb_close(dcb, efd);
|
||||
dcb_close(dcb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Telnet daemon listener entry point
|
||||
*
|
||||
* @param efd epoll File descriptor
|
||||
* @param listener The Listener DCB
|
||||
* @param config Configuration (ip:port)
|
||||
*/
|
||||
static int
|
||||
telnetd_listen(DCB *listener, int efd, char *config)
|
||||
telnetd_listen(DCB *listener, char *config)
|
||||
{
|
||||
struct sockaddr_in addr;
|
||||
char *port;
|
||||
struct epoll_event ev;
|
||||
int one = 1;
|
||||
short pnum;
|
||||
|
||||
@ -301,11 +298,24 @@ short pnum;
|
||||
listener->state = DCB_STATE_LISTENING;
|
||||
listen(listener->fd, SOMAXCONN);
|
||||
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.ptr = listener;
|
||||
if (epoll_ctl(efd, EPOLL_CTL_ADD, listener->fd, &ev) == -1)
|
||||
if (poll_add_dcb(listener) == -1)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Telnet command implementation
|
||||
*
|
||||
* Called for each command in the telnet stream.
|
||||
*
|
||||
* Currently we do no command execution
|
||||
*
|
||||
* @param dcb The client DCB
|
||||
* @param cmd The command stream
|
||||
*/
|
||||
static void
|
||||
telnetd_command(DCB *dcb, char *cmd)
|
||||
{
|
||||
}
|
||||
|
@ -38,6 +38,7 @@
|
||||
#include <atomic.h>
|
||||
#include <spinlock.h>
|
||||
#include <dcb.h>
|
||||
#include <poll.h>
|
||||
#include <debugcli.h>
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
@ -172,7 +173,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session;
|
||||
/*
|
||||
* Close the connection to the backend
|
||||
*/
|
||||
session->session->client->func.close(session->session->client, 0);
|
||||
session->session->client->func.close(session->session->client);
|
||||
|
||||
spinlock_acquire(&inst->lock);
|
||||
if (inst->sessions == session)
|
||||
@ -222,7 +223,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session;
|
||||
if (execute_cmd(session))
|
||||
dcb_printf(session->session->client, "Gateway> ");
|
||||
else
|
||||
session->session->client->func.close(session->session->client, 0);
|
||||
session->session->client->func.close(session->session->client);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
@ -236,6 +237,7 @@ static struct {
|
||||
{ "show servers", dprintAllServers },
|
||||
{ "show modules", dprintAllModules },
|
||||
{ "show dcbs", dprintAllDCBs },
|
||||
{ "show epoll", dprintPollStats },
|
||||
{ NULL, NULL }
|
||||
};
|
||||
|
||||
|
@ -237,7 +237,7 @@ CLIENT_SESSION *session = (CLIENT_SESSION *)router_session;
|
||||
/*
|
||||
* Close the connection to the backend
|
||||
*/
|
||||
session->dcb->func.close(session->dcb, 0);
|
||||
session->dcb->func.close(session->dcb);
|
||||
|
||||
atomic_add(&session->backend->count, -1);
|
||||
spinlock_acquire(&inst->lock);
|
||||
|
Loading…
x
Reference in New Issue
Block a user