Reformat mascaled.c

This commit is contained in:
Johan Wikman
2016-01-12 14:20:11 +02:00
parent 018b87d304
commit 1b94c5b519

View File

@ -38,11 +38,12 @@
#include <modinfo.h> #include <modinfo.h>
#include <maxscaled.h> #include <maxscaled.h>
MODULE_INFO info = { MODULE_INFO info =
MODULE_API_PROTOCOL, {
MODULE_GA, MODULE_API_PROTOCOL,
GWPROTOCOL_VERSION, MODULE_GA,
"A maxscale protocol for the administration interface" GWPROTOCOL_VERSION,
"A maxscale protocol for the administration interface"
}; };
/** /**
@ -51,8 +52,8 @@ MODULE_INFO info = {
* *
* @verbatim * @verbatim
* Revision History * Revision History
* Date Who Description * Date Who Description
* 13/06/2014 Mark Riddoch Initial implementation * 13/06/2014 Mark Riddoch Initial implementation
* 07/07/15 Martin Brampton Correct failure handling * 07/07/15 Martin Brampton Correct failure handling
* *
* @endverbatim * @endverbatim
@ -72,37 +73,36 @@ static int maxscaled_listen(DCB *dcb, char *config);
/** /**
* The "module object" for the maxscaled protocol module. * The "module object" for the maxscaled protocol module.
*/ */
static GWPROTOCOL MyObject = { static GWPROTOCOL MyObject =
maxscaled_read_event, /**< Read - EPOLLIN handler */ {
maxscaled_write, /**< Write - data from gateway */ maxscaled_read_event, /**< Read - EPOLLIN handler */
maxscaled_write_event, /**< WriteReady - EPOLLOUT handler */ maxscaled_write, /**< Write - data from gateway */
maxscaled_error, /**< Error - EPOLLERR handler */ maxscaled_write_event, /**< WriteReady - EPOLLOUT handler */
maxscaled_hangup, /**< HangUp - EPOLLHUP handler */ maxscaled_error, /**< Error - EPOLLERR handler */
maxscaled_accept, /**< Accept */ maxscaled_hangup, /**< HangUp - EPOLLHUP handler */
NULL, /**< Connect */ maxscaled_accept, /**< Accept */
maxscaled_close, /**< Close */ NULL, /**< Connect */
maxscaled_listen, /**< Create a listener */ maxscaled_close, /**< Close */
NULL, /**< Authentication */ maxscaled_listen, /**< Create a listener */
NULL /**< Session */ NULL, /**< Authentication */
}; NULL /**< Session */
};
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
* *
* @return version string of the module * @return version string of the module
*/ */
char * char* version()
version()
{ {
return version_str; return version_str;
} }
/** /**
* The module initialisation routine, called when the module * The module initialisation routine, called when the module
* is first loaded. * is first loaded.
*/ */
void void ModuleInit()
ModuleInit()
{ {
MXS_INFO("Initialise MaxScaled Protocol module.");; MXS_INFO("Initialise MaxScaled Protocol module.");;
} }
@ -115,83 +115,85 @@ ModuleInit()
* *
* @return The module object * @return The module object
*/ */
GWPROTOCOL * GWPROTOCOL* GetModuleObject()
GetModuleObject()
{ {
return &MyObject; return &MyObject;
} }
/** /**
* Read event for EPOLLIN on the maxscaled protocol module. * Read event for EPOLLIN on the maxscaled protocol module.
* *
* @param dcb The descriptor control block * @param dcb The descriptor control block
* @return * @return
*/ */
static int static int maxscaled_read_event(DCB* dcb)
maxscaled_read_event(DCB* dcb)
{ {
int n; int n;
GWBUF *head = NULL; GWBUF *head = NULL;
SESSION *session = dcb->session; SESSION *session = dcb->session;
MAXSCALED *maxscaled = (MAXSCALED *)dcb->protocol; MAXSCALED *maxscaled = (MAXSCALED *)dcb->protocol;
char *password; char *password;
if ((n = dcb_read(dcb, &head, 0)) != -1) if ((n = dcb_read(dcb, &head, 0)) != -1)
{ {
if (head)
if (head) {
{ if (GWBUF_LENGTH(head))
if (GWBUF_LENGTH(head)) {
{ switch (maxscaled->state)
switch (maxscaled->state) {
{ case MAXSCALED_STATE_LOGIN:
case MAXSCALED_STATE_LOGIN: maxscaled->username = strndup(GWBUF_DATA(head), GWBUF_LENGTH(head));
maxscaled->username = strndup(GWBUF_DATA(head), GWBUF_LENGTH(head)); maxscaled->state = MAXSCALED_STATE_PASSWD;
maxscaled->state = MAXSCALED_STATE_PASSWD; dcb_printf(dcb, "PASSWORD");
dcb_printf(dcb, "PASSWORD"); while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL);
while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL); break;
break; case MAXSCALED_STATE_PASSWD:
case MAXSCALED_STATE_PASSWD: password = strndup(GWBUF_DATA(head), GWBUF_LENGTH(head));
password = strndup(GWBUF_DATA(head), GWBUF_LENGTH(head)); if (admin_verify(maxscaled->username, password))
if (admin_verify(maxscaled->username, password)) {
{ dcb_printf(dcb, "OK----");
dcb_printf(dcb, "OK----"); maxscaled->state = MAXSCALED_STATE_DATA;
maxscaled->state = MAXSCALED_STATE_DATA; }
} else
else {
{ dcb_printf(dcb, "FAILED");
dcb_printf(dcb, "FAILED"); maxscaled->state = MAXSCALED_STATE_LOGIN;
maxscaled->state = MAXSCALED_STATE_LOGIN; }
} while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL)
while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL); {
free(password); ;
break; }
case MAXSCALED_STATE_DATA: free(password);
SESSION_ROUTE_QUERY(session, head); break;
dcb_printf(dcb, "OK"); case MAXSCALED_STATE_DATA:
break; SESSION_ROUTE_QUERY(session, head);
} dcb_printf(dcb, "OK");
} break;
else }
{ }
// Force the free of the buffer header else
while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL); {
} // Force the free of the buffer header
} while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL)
} {
return n; ;
}
}
}
}
return n;
} }
/** /**
* EPOLLOUT handler for the maxscaled protocol module. * EPOLLOUT handler for the maxscaled protocol module.
* *
* @param dcb The descriptor control block * @param dcb The descriptor control block
* @return * @return
*/ */
static int static int maxscaled_write_event(DCB *dcb)
maxscaled_write_event(DCB *dcb)
{ {
return dcb_drain_writeq(dcb); return dcb_drain_writeq(dcb);
} }
/** /**
@ -200,184 +202,184 @@ maxscaled_write_event(DCB *dcb)
* Writes the content of the buffer queue to the socket * Writes the content of the buffer queue to the socket
* observing the non-blocking principles of MaxScale. * observing the non-blocking principles of MaxScale.
* *
* @param dcb Descriptor Control Block for the socket * @param dcb Descriptor Control Block for the socket
* @param queue Linked list of buffes to write * @param queue Linked list of buffes to write
*/ */
static int static int maxscaled_write(DCB *dcb, GWBUF *queue)
maxscaled_write(DCB *dcb, GWBUF *queue)
{ {
int rc; int rc;
rc = dcb_write(dcb, queue); rc = dcb_write(dcb, queue);
return rc; return rc;
} }
/** /**
* Handler for the EPOLLERR event. * Handler for the EPOLLERR event.
* *
* @param dcb The descriptor control block * @param dcb The descriptor control block
*/ */
static int static int maxscaled_error(DCB *dcb)
maxscaled_error(DCB *dcb)
{ {
return 0; return 0;
} }
/** /**
* Handler for the EPOLLHUP event. * Handler for the EPOLLHUP event.
* *
* @param dcb The descriptor control block * @param dcb The descriptor control block
*/ */
static int static int maxscaled_hangup(DCB *dcb)
maxscaled_hangup(DCB *dcb)
{ {
dcb_close(dcb); dcb_close(dcb);
return 0; return 0;
} }
/** /**
* Handler for the EPOLLIN event when the DCB refers to the listening * Handler for the EPOLLIN event when the DCB refers to the listening
* socket for the protocol. * socket for the protocol.
* *
* @param dcb The descriptor control block * @param dcb The descriptor control block
* @return The number of new connections created * @return The number of new connections created
*/ */
static int static int maxscaled_accept(DCB *dcb)
maxscaled_accept(DCB *dcb)
{ {
int n_connect = 0; int n_connect = 0;
while (1) while (1)
{ {
int so; int so;
struct sockaddr_in addr; struct sockaddr_in addr;
socklen_t addrlen = sizeof(struct sockaddr); socklen_t addrlen = sizeof(struct sockaddr);
DCB *client_dcb; DCB *client_dcb;
MAXSCALED *maxscaled_pr = NULL; MAXSCALED *maxscaled_pr = NULL;
so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen); so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen);
if (so == -1)
return n_connect;
else
{
atomic_add(&dcb->stats.n_accepts, 1);
client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
if (client_dcb == NULL)
{
close(so);
return n_connect;
}
client_dcb->fd = so;
client_dcb->remote = strdup(inet_ntoa(addr.sin_addr));
memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL));
if ((maxscaled_pr = (MAXSCALED *)malloc(sizeof(MAXSCALED))) == NULL)
{
client_dcb->protocol = NULL;
close(so);
dcb_close(client_dcb);
return n_connect;
}
maxscaled_pr->username = NULL;
spinlock_init(&maxscaled_pr->lock);
client_dcb->protocol = (void *)maxscaled_pr;
client_dcb->session = if (so == -1)
session_alloc(dcb->session->service, client_dcb); {
return n_connect;
}
else
{
atomic_add(&dcb->stats.n_accepts, 1);
client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
if (client_dcb == NULL)
{
close(so);
return n_connect;
}
client_dcb->fd = so;
client_dcb->remote = strdup(inet_ntoa(addr.sin_addr));
memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL));
if ((maxscaled_pr = (MAXSCALED *)malloc(sizeof(MAXSCALED))) == NULL)
{
client_dcb->protocol = NULL;
close(so);
dcb_close(client_dcb);
return n_connect;
}
maxscaled_pr->username = NULL;
spinlock_init(&maxscaled_pr->lock);
client_dcb->protocol = (void *)maxscaled_pr;
if (NULL == client_dcb->session || poll_add_dcb(client_dcb)) client_dcb->session = session_alloc(dcb->session->service, client_dcb);
{
dcb_close(dcb); if (NULL == client_dcb->session || poll_add_dcb(client_dcb))
return n_connect; {
} dcb_close(dcb);
n_connect++; return n_connect;
maxscaled_pr->state = MAXSCALED_STATE_LOGIN; }
dcb_printf(client_dcb, "USER"); n_connect++;
} maxscaled_pr->state = MAXSCALED_STATE_LOGIN;
} dcb_printf(client_dcb, "USER");
return n_connect; }
}
return n_connect;
} }
/** /**
* The close handler for the descriptor. Called by the gateway to * The close handler for the descriptor. Called by the gateway to
* explicitly close a connection. * explicitly close a connection.
* *
* @param dcb The descriptor control block * @param dcb The descriptor control block
*/ */
static int static int maxscaled_close(DCB *dcb)
maxscaled_close(DCB *dcb)
{ {
MAXSCALED *maxscaled = dcb->protocol; MAXSCALED *maxscaled = dcb->protocol;
if (!maxscaled) if (!maxscaled)
return 0; {
return 0;
}
spinlock_acquire(&maxscaled->lock); spinlock_acquire(&maxscaled->lock);
if (maxscaled->username) if (maxscaled->username)
{ {
free(maxscaled->username); free(maxscaled->username);
maxscaled->username = NULL; maxscaled->username = NULL;
} }
spinlock_release(&maxscaled->lock); spinlock_release(&maxscaled->lock);
return 0; return 0;
} }
/** /**
* Maxscale daemon listener entry point * Maxscale daemon listener entry point
* *
* @param listener The Listener DCB * @param listener The Listener DCB
* @param config Configuration (ip:port) * @param config Configuration (ip:port)
*/ */
static int static int maxscaled_listen(DCB *listener, char *config)
maxscaled_listen(DCB *listener, char *config)
{ {
struct sockaddr_in addr; struct sockaddr_in addr;
int one = 1; int one = 1;
int rc; int rc;
memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL));
if (!parse_bindconfig(config, 6033, &addr)) if (!parse_bindconfig(config, 6033, &addr))
return 0; {
return 0;
}
if ((listener->fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
return 0;
}
if ((listener->fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) // socket options
{ if (setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)))
return 0; {
} MXS_ERROR("Unable to set SO_REUSEADDR on maxscale listener.");
}
// set NONBLOCKING mode
setnonblocking(listener->fd);
// bind address and port
if (bind(listener->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
return 0;
}
// socket options rc = listen(listener->fd, SOMAXCONN);
if (setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)))
{
MXS_ERROR("Unable to set SO_REUSEADDR on maxscale listener.");
}
// set NONBLOCKING mode
setnonblocking(listener->fd);
// bind address and port
if (bind(listener->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
return 0;
}
rc = listen(listener->fd, SOMAXCONN); if (rc == 0)
{
if (rc == 0) { MXS_NOTICE("Listening maxscale connections at %s", config);
MXS_NOTICE("Listening maxscale connections at %s", config); }
} else { else
int eno = errno; {
errno = 0; int eno = errno;
char errbuf[STRERROR_BUFLEN]; errno = 0;
MXS_ERROR("Failed to start listening for maxscale admin connections " char errbuf[STRERROR_BUFLEN];
"due error %d, %s", MXS_ERROR("Failed to start listening for maxscale admin connections "
eno, strerror_r(eno, errbuf, sizeof(errbuf))); "due error %d, %s",
return 0; eno, strerror_r(eno, errbuf, sizeof(errbuf)));
} return 0;
}
if (poll_add_dcb(listener) == -1)
if (poll_add_dcb(listener) == -1) {
{ return 0;
return 0; }
} return 1;
return 1;
} }