Addition of routing module diagnostics and options

readconnroute now support the options slave and master to restrict the set of
servers it will connect to
This commit is contained in:
Mark Riddoch 2013-06-26 14:04:56 +02:00
parent 031b6e4978
commit c3f70c863e
11 changed files with 202 additions and 62 deletions

View File

@ -175,6 +175,7 @@ CONFIG_CONTEXT *obj;
else if (!strcmp(type, "service"))
{
char *servers = config_get_value(obj->parameters, "servers");
char *roptions = config_get_value(obj->parameters, "router_options");
if (servers)
{
char *s = strtok(servers, ",");
@ -190,6 +191,15 @@ CONFIG_CONTEXT *obj;
s = strtok(NULL, ",");
}
}
if (roptions)
{
char *s = strtok(roptions, ",");
while (s)
{
serviceAddRouterOption(obj->element, s);
s = strtok(NULL, ",");
}
}
}
else if (!strcmp(type, "listener"))
{

View File

@ -18,8 +18,7 @@
*/
/**
*
* gw_utils.c - A set if utility functions useful within the context
* @file gw_utils.c - A set if utility functions useful within the context
* of the gateway.
*
* Revision History
@ -36,10 +35,14 @@
#include <dcb.h>
#include <session.h>
///
// set ip address in sockect struct
///
void setipaddress(struct in_addr *a, char *p) {
/**
* Set IP address in socket structure in_addr
*
* @param a Pointer to a struct in_addr into which the address is written
* @param p The hostname to lookup
*/
void
setipaddress(struct in_addr *a, char *p) {
struct hostent *h = gethostbyname(p);
if (h == NULL) {
if ((a->s_addr = inet_addr(p)) == -1) {
@ -50,6 +53,10 @@ void setipaddress(struct in_addr *a, char *p) {
}
}
/**
* Daemonize the process by forking and putting the process into the
* background.
*/
void gw_daemonize(void) {
pid_t pid;

View File

@ -74,6 +74,7 @@ SERVICE *service;
service->credentials.name = NULL;
service->credentials.authdata = NULL;
service->users = users_alloc();
service->routerOptions = NULL;
spinlock_acquire(&service_spin);
service->next = allServices;
@ -102,7 +103,8 @@ int listeners = 0;
char config_bind[40];
GWPROTOCOL *funcs;
service->router_instance = service->router->createInstance(service);
service->router_instance = service->router->createInstance(service,
service->routerOptions);
port = service->ports;
while (port)
@ -288,8 +290,8 @@ SERV_PROTOCOL *proto;
/**
* Add a backend database server to a service
*
* @param service
* @param server
* @param service The service to add the server to
* @param server The server to add
*/
void
serviceAddBackend(SERVICE *service, SERVER *server)
@ -300,6 +302,36 @@ serviceAddBackend(SERVICE *service, SERVER *server)
spinlock_release(&service->spin);
}
/**
* Add a router option to a service
*
* @param service The service to add the router option to
* @param option The option string
*/
void
serviceAddRouterOption(SERVICE *service, char *option)
{
int i;
spinlock_acquire(&service->spin);
if (service->routerOptions == NULL)
{
service->routerOptions = (char **)calloc(2, sizeof(char *));
service->routerOptions[0] = strdup(option);
service->routerOptions[1] = NULL;
}
else
{
for (i = 0; service->routerOptions[i]; i++)
;
service->routerOptions = (char **)realloc(service->routerOptions,
(i + 2) * sizeof(char *));
service->routerOptions[i] = strdup(option);
service->routerOptions[i+1] = NULL;
}
spinlock_release(&service->spin);
}
/**
* Set the service user that is used to log in to the backebd servers
* associated with this service.
@ -410,6 +442,8 @@ SERVICE *ptr;
dcb_printf(dcb, "\tService: %s\n", ptr->name);
dcb_printf(dcb, "\tRouter: %s (%p)\n", ptr->routerModule,
ptr->router);
if (ptr->router)
ptr->router->diagnostics(ptr->router_instance, dcb);
dcb_printf(dcb, "\tStarted: %s",
asctime(localtime(&ptr->stats.started)));
dcb_printf(dcb, "\tBackend databases\n");

View File

@ -8,6 +8,7 @@ threads=1
[Test Service]
type=service
router=readconnroute
router_options=slave
servers=server1,server2,server3
user=massi
auth=massi

View File

@ -49,19 +49,19 @@
#include "mysql_protocol.h"
#include "dcb.h"
void gw_daemonize(void);
int do_read_dcb(DCB *dcb);
int handle_event_errors(DCB *dcb);
int handle_event_errors_backend(DCB *dcb);
void MySQLListener(int epfd, char *config_bind);
int MySQLAccept(DCB *listener);
int gw_mysql_do_authentication(DCB *dcb, GWBUF *);
void gw_mysql_close(MySQLProtocol **ptr);
char *gw_strend(register const char *s);
int do_read_dcb(DCB *dcb);
int do_read_10(DCB *dcb, uint8_t *buffer);
MySQLProtocol * gw_mysql_init(MySQLProtocol *ptr);
int MySQLWrite(DCB *dcb, GWBUF *queue);
int gw_write_backend_event(DCB *dcb);
int gw_read_backend_event(DCB *dcb);
int setnonblocking(int fd);
extern void gw_daemonize(void);
extern int do_read_dcb(DCB *dcb);
extern int handle_event_errors(DCB *dcb);
extern int handle_event_errors_backend(DCB *dcb);
extern void MySQLListener(int epfd, char *config_bind);
extern int MySQLAccept(DCB *listener);
extern int gw_mysql_do_authentication(DCB *dcb, GWBUF *);
extern void gw_mysql_close(MySQLProtocol **ptr);
extern char *gw_strend(register const char *s);
extern int do_read_dcb(DCB *dcb);
extern int do_read_10(DCB *dcb, uint8_t *buffer);
extern MySQLProtocol * gw_mysql_init(MySQLProtocol *ptr);
extern int MySQLWrite(DCB *dcb, GWBUF *queue);
extern int gw_write_backend_event(DCB *dcb);
extern int gw_read_backend_event(DCB *dcb);
extern int setnonblocking(int fd);

View File

@ -25,14 +25,16 @@
*
* Date Who Description
* 14/06/13 Mark Riddoch Initial implementation
* 26/06/13 Mark Riddoch Addition of router options
* and the diagnostic entry point
*
*/
#include <service.h>
#include <session.h>
#include <buffer.h>
/*
* the ROUTER handle points to module specific data, so the best we can do
/**
* The ROUTER handle points to module specific data, so the best we can do
* is to make it a void * externally.
*/
typedef void *ROUTER;
@ -50,14 +52,17 @@ typedef void *ROUTER;
* closeSession Called when a session is closed
* routeQuery Called on each query that requires
* routing
* diagnostics Called to force the router to print
* diagnostic output
* @endverbatim
*
* @see load_module
*/
typedef struct router_object {
ROUTER *(*createInstance)(SERVICE *service);
ROUTER *(*createInstance)(SERVICE *service, char **options);
void *(*newSession)(ROUTER *instance, SESSION *session);
void (*closeSession)(ROUTER *instance, void *router_session);
int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue);
void (*diagnostics)(ROUTER *instance, DCB *dcb);
} ROUTER_OBJECT;
#endif

View File

@ -91,6 +91,7 @@ typedef struct service {
* that this service will listen on.
*/
char *routerModule; /**< Name of router module to use */
char **routerOptions;/**< Router specific option strings */
struct router_object
*router; /**< The router we are using */
void *router_instance;
@ -110,6 +111,7 @@ extern SERVICE *service_alloc(char *, char *);
extern int service_free(SERVICE *);
extern int serviceAddProtocol(SERVICE *, char *, unsigned short);
extern void serviceAddBackend(SERVICE *, SERVER *);
extern void serviceAddRouterOption(SERVICE *, char *);
extern int serviceStart(SERVICE *);
extern int serviceStartAll();
extern int serviceStop(SERVICE *);

View File

@ -59,6 +59,8 @@ typedef struct instance {
CLIENT_SESSION *connections; /**< Link list of all the client connections */
SPINLOCK lock; /**< Spinlock for the instance data */
BACKEND **servers; /**< The set of backend servers for this instance */
unsigned int bitmask; /**< Bitmask to apply to server->status */
unsigned int bitvalue; /**< Required value of server->status */
struct instance *next;
} INSTANCE;
#endif

View File

@ -33,12 +33,14 @@ mysql_client.o: mysql_client.c ../include/mysql_client_server_protocol.h \
/usr/include/arpa/inet.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \
/usr/include/fcntl.h /usr/include/bits/fcntl.h /usr/include/bits/stat.h \
../../include/service.h ../../include/spinlock.h ../../include/thread.h \
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../../include/dcb.h ../../include/buffer.h \
../../include/server.h ../../include/router.h ../../include/session.h \
../../include/poll.h ../../include/users.h ../../include/hashtable.h \
../../include/atomic.h
/usr/include/unistd.h /usr/include/bits/posix_opt.h \
/usr/include/bits/environments.h /usr/include/bits/confname.h \
/usr/include/getopt.h ../../include/service.h ../../include/spinlock.h \
../../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../../include/dcb.h \
../../include/buffer.h ../../include/server.h ../../include/router.h \
../../include/session.h ../../include/poll.h ../../include/users.h \
../../include/hashtable.h ../../include/atomic.h
mysql_common.o: mysql_common.c ../include/mysql_client_server_protocol.h \
/usr/include/stdio.h /usr/include/features.h /usr/include/sys/cdefs.h \
/usr/include/bits/wordsize.h /usr/include/gnu/stubs.h \
@ -74,12 +76,14 @@ mysql_common.o: mysql_common.c ../include/mysql_client_server_protocol.h \
/usr/include/arpa/inet.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \
/usr/include/fcntl.h /usr/include/bits/fcntl.h /usr/include/bits/stat.h \
../../include/service.h ../../include/spinlock.h ../../include/thread.h \
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../../include/dcb.h ../../include/buffer.h \
../../include/server.h ../../include/router.h ../../include/session.h \
../../include/poll.h ../../include/users.h ../../include/hashtable.h \
../../include/atomic.h
/usr/include/unistd.h /usr/include/bits/posix_opt.h \
/usr/include/bits/environments.h /usr/include/bits/confname.h \
/usr/include/getopt.h ../../include/service.h ../../include/spinlock.h \
../../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../../include/dcb.h \
../../include/buffer.h ../../include/server.h ../../include/router.h \
../../include/session.h ../../include/poll.h ../../include/users.h \
../../include/hashtable.h ../../include/atomic.h
mysql_backend.o: mysql_backend.c \
../include/mysql_client_server_protocol.h /usr/include/stdio.h \
/usr/include/features.h /usr/include/sys/cdefs.h \
@ -116,12 +120,14 @@ mysql_backend.o: mysql_backend.c \
/usr/include/arpa/inet.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \
/usr/include/fcntl.h /usr/include/bits/fcntl.h /usr/include/bits/stat.h \
../../include/service.h ../../include/spinlock.h ../../include/thread.h \
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../../include/dcb.h ../../include/buffer.h \
../../include/server.h ../../include/router.h ../../include/session.h \
../../include/poll.h ../../include/users.h ../../include/hashtable.h \
../../include/atomic.h
/usr/include/unistd.h /usr/include/bits/posix_opt.h \
/usr/include/bits/environments.h /usr/include/bits/confname.h \
/usr/include/getopt.h ../../include/service.h ../../include/spinlock.h \
../../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../../include/dcb.h \
../../include/buffer.h ../../include/server.h ../../include/router.h \
../../include/session.h ../../include/poll.h ../../include/users.h \
../../include/hashtable.h ../../include/atomic.h
mysql_common.o: mysql_common.c ../include/mysql_client_server_protocol.h \
/usr/include/stdio.h /usr/include/features.h /usr/include/sys/cdefs.h \
/usr/include/bits/wordsize.h /usr/include/gnu/stubs.h \
@ -157,12 +163,14 @@ mysql_common.o: mysql_common.c ../include/mysql_client_server_protocol.h \
/usr/include/arpa/inet.h \
/usr/lib/gcc/x86_64-redhat-linux/4.4.6/include/stdbool.h \
/usr/include/fcntl.h /usr/include/bits/fcntl.h /usr/include/bits/stat.h \
../../include/service.h ../../include/spinlock.h ../../include/thread.h \
/usr/include/pthread.h /usr/include/sched.h /usr/include/bits/sched.h \
/usr/include/bits/setjmp.h ../../include/dcb.h ../../include/buffer.h \
../../include/server.h ../../include/router.h ../../include/session.h \
../../include/poll.h ../../include/users.h ../../include/hashtable.h \
../../include/atomic.h
/usr/include/unistd.h /usr/include/bits/posix_opt.h \
/usr/include/bits/environments.h /usr/include/bits/confname.h \
/usr/include/getopt.h ../../include/service.h ../../include/spinlock.h \
../../include/thread.h /usr/include/pthread.h /usr/include/sched.h \
/usr/include/bits/sched.h /usr/include/bits/setjmp.h ../../include/dcb.h \
../../include/buffer.h ../../include/server.h ../../include/router.h \
../../include/session.h ../../include/poll.h ../../include/users.h \
../../include/hashtable.h ../../include/atomic.h
telnetd.o: telnetd.c /usr/include/stdio.h /usr/include/features.h \
/usr/include/sys/cdefs.h /usr/include/bits/wordsize.h \
/usr/include/gnu/stubs.h /usr/include/gnu/stubs-64.h \

View File

@ -41,16 +41,17 @@
#include <poll.h>
#include <debugcli.h>
static char *version_str = "V1.0.0";
static char *version_str = "V1.0.1";
/* The router entry points */
static ROUTER *createInstance(SERVICE *service);
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static int execute(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
/** The module object definition */
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, execute };
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, execute, diagnostics };
extern int execute_cmd(CLI_SESSION *cli);
@ -104,7 +105,7 @@ GetModuleObject()
* @return The instance data for this new instance
*/
static ROUTER *
createInstance(SERVICE *service)
createInstance(SERVICE *service, char **options)
{
CLI_INSTANCE *inst;
@ -225,3 +226,15 @@ CLI_SESSION *session = (CLI_SESSION *)router_session;
}
return 1;
}
/**
* Display router diagnostics
*
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static void
diagnostics(ROUTER *instance, DCB *dcb)
{
return; /* Nothing to do currently */
}

View File

@ -31,6 +31,13 @@
* When two servers have the same number of current connections the one with
* the least number of connections since startup will be used.
*
* The router may also have options associated to it that will limit the
* choice of backend server. Currently two options are supported, the "master"
* option will cause the router to only connect to servers marked as masters
* and the "slave" option will limit connections to routers that are marked
* as slaves. If neither option is specified the router will connect to either
* masters or slaves.
*
* @verbatim
* Revision History
*
@ -40,6 +47,7 @@
* 26/06/13 Mark Riddoch Use server with least connections since
* startup if the number of current
* connections is the same for two servers
* Addition of master and slave options
*
* @endverbatim
*/
@ -55,16 +63,17 @@
#include <dcb.h>
#include <spinlock.h>
static char *version_str = "V1.0.0";
static char *version_str = "V1.0.1";
/* The router entry points */
static ROUTER *createInstance(SERVICE *service);
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
/** The module object definition */
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery };
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics };
static SPINLOCK instlock;
static INSTANCE *instances;
@ -116,7 +125,7 @@ GetModuleObject()
* @return The instance data for this new instance
*/
static ROUTER *
createInstance(SERVICE *service)
createInstance(SERVICE *service, char **options)
{
INSTANCE *inst;
SERVER *server;
@ -160,6 +169,28 @@ int i, n;
}
inst->servers[n] = NULL;
/*
* Process the options
*/
inst->bitmask = 0;
inst->bitvalue = 0;
if (options)
{
for (i = 0; options[i]; i++)
{
if (!strcasecmp(options[i], "master"))
{
inst->bitmask |= SERVER_MASTER;
inst->bitvalue |= SERVER_MASTER;
}
else if (!strcasecmp(options[i], "slave"))
{
inst->bitmask |= SERVER_MASTER;
inst->bitvalue &= ~SERVER_MASTER;
}
}
}
/*
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
@ -201,7 +232,8 @@ int i;
/* First find a running server to set as our initial candidate server */
for (i = 0; inst->servers[i]; i++)
{
if (inst->servers[i] && SERVER_IS_RUNNING(inst->servers[i]->server))
if (inst->servers[i] && SERVER_IS_RUNNING(inst->servers[i]->server)
&& (inst->servers[i]->server->status & inst->bitmask) == inst->bitvalue)
{
candidate = inst->servers[i];
break;
@ -222,7 +254,8 @@ int i;
*/
for (i = 1; inst->servers[i]; i++)
{
if (inst->servers[i] && SERVER_IS_RUNNING(inst->servers[i]->server))
if (inst->servers[i] && SERVER_IS_RUNNING(inst->servers[i]->server)
&& (inst->servers[i]->server->status & inst->bitmask) == inst->bitvalue)
{
if (inst->servers[i]->count < candidate->count)
{
@ -324,3 +357,28 @@ CLIENT_SESSION *session = (CLIENT_SESSION *)router_session;
return session->dcb->func.write(session->dcb, queue);
}
/**
* Display router diagnostics
*
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static void
diagnostics(ROUTER *instance, DCB *dcb)
{
INSTANCE *inst = (INSTANCE *)instance;
CLIENT_SESSION *session;
int i = 0;
spinlock_acquire(&inst->lock);
session = inst->connections;
while (session)
{
i++;
session = session->next;
}
spinlock_release(&inst->lock);
dcb_printf(dcb, "Number of router sessions: %d\n", i);
}