Formatted readconnroute

Formatted readconnroute files according to the style guide.
This commit is contained in:
Markus Makela
2015-12-01 12:49:43 +02:00
parent 7119ed817d
commit 91eba965fc
2 changed files with 617 additions and 592 deletions

View File

@ -37,7 +37,8 @@
* connections to. This provides the storage for routing module specific data * connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers. * that is required for each of the backend servers.
*/ */
typedef struct backend { typedef struct backend
{
SERVER *server; /*< The server itself */ SERVER *server; /*< The server itself */
int current_connection_count; /*< Number of connections to the server */ int current_connection_count; /*< Number of connections to the server */
int weight; /*< Desired routing weight */ int weight; /*< Desired routing weight */
@ -46,7 +47,8 @@ typedef struct backend {
/** /**
* The client session structure used within this router. * The client session structure used within this router.
*/ */
typedef struct router_client_session { typedef struct router_client_session
{
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t rses_chk_top; skygw_chk_t rses_chk_top;
#endif #endif
@ -65,16 +67,17 @@ typedef struct router_client_session {
/** /**
* The statistics for this router instance * The statistics for this router instance
*/ */
typedef struct { typedef struct
{
int n_sessions; /*< Number sessions created */ int n_sessions; /*< Number sessions created */
int n_queries; /*< Number of queries forwarded */ int n_queries; /*< Number of queries forwarded */
} ROUTER_STATS; } ROUTER_STATS;
/** /**
* The per instance data for the router. * The per instance data for the router.
*/ */
typedef struct router_instance { typedef struct router_instance
{
SERVICE *service; /*< Pointer to the service using this router */ SERVICE *service; /*< Pointer to the service using this router */
ROUTER_CLIENT_SES *connections; /*< Link list of all the client connections */ ROUTER_CLIENT_SES *connections; /*< Link list of all the client connections */
SPINLOCK lock; /*< Spinlock for the instance data */ SPINLOCK lock; /*< Spinlock for the instance data */

View File

@ -96,7 +96,8 @@
#include "modutil.h" #include "modutil.h"
MODULE_INFO info = { MODULE_INFO info =
{
MODULE_API_ROUTER, MODULE_API_ROUTER,
MODULE_GA, MODULE_GA,
ROUTER_VERSION, ROUTER_VERSION,
@ -112,23 +113,16 @@ static void closeSession(ROUTER *instance, void *router_session);
static void freeSession(ROUTER *instance, void *router_session); static void freeSession(ROUTER *instance, void *router_session);
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb); static void diagnostics(ROUTER *instance, DCB *dcb);
static void clientReply( static void clientReply(ROUTER *instance, void *router_session, GWBUF *queue,
ROUTER *instance,
void *router_session,
GWBUF *queue,
DCB *backend_dcb); DCB *backend_dcb);
static void handleError( static void handleError(ROUTER *instance, void *router_session, GWBUF *errbuf,
ROUTER *instance, DCB *problem_dcb, error_action_t action, bool *succp);
void *router_session, static int getCapabilities();
GWBUF *errbuf,
DCB *problem_dcb,
error_action_t action,
bool *succp);
static int getCapabilities ();
/** The module object definition */ /** The module object definition */
static ROUTER_OBJECT MyObject = { static ROUTER_OBJECT MyObject =
{
createInstance, createInstance,
newSession, newSession,
closeSession, closeSession,
@ -140,16 +134,12 @@ static ROUTER_OBJECT MyObject = {
getCapabilities getCapabilities
}; };
static bool rses_begin_locked_router_action( static bool rses_begin_locked_router_action(ROUTER_CLIENT_SES* rses);
ROUTER_CLIENT_SES* rses);
static void rses_end_locked_router_action( static void rses_end_locked_router_action(ROUTER_CLIENT_SES* rses);
ROUTER_CLIENT_SES* rses);
static BACKEND *get_root_master( static BACKEND *get_root_master(BACKEND **servers);
BACKEND **servers); static int handle_state_switch(DCB* dcb, DCB_REASON reason, void * routersession);
static int handle_state_switch(
DCB* dcb,DCB_REASON reason, void * routersession);
static SPINLOCK instlock; static SPINLOCK instlock;
static ROUTER_INSTANCE *instances; static ROUTER_INSTANCE *instances;
@ -202,14 +192,15 @@ GetModuleObject()
static ROUTER * static ROUTER *
createInstance(SERVICE *service, char **options) createInstance(SERVICE *service, char **options)
{ {
ROUTER_INSTANCE *inst; ROUTER_INSTANCE *inst;
SERVER *server; SERVER *server;
SERVER_REF *sref; SERVER_REF *sref;
int i, n; int i, n;
BACKEND *backend; BACKEND *backend;
char *weightby; char *weightby;
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL)
{
return NULL; return NULL;
} }
@ -224,7 +215,7 @@ char *weightby;
for (sref = service->dbref, n = 0; sref; sref = sref->next) for (sref = service->dbref, n = 0; sref; sref = sref->next)
n++; n++;
inst->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); inst->servers = (BACKEND **) calloc(n + 1, sizeof(BACKEND *));
if (!inst->servers) if (!inst->servers)
{ {
free(inst); free(inst);
@ -236,7 +227,9 @@ char *weightby;
if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL)
{ {
for (i = 0; i < n; i++) for (i = 0; i < n; i++)
{
free(inst->servers[i]); free(inst->servers[i]);
}
free(inst->servers); free(inst->servers);
free(inst); free(inst);
return NULL; return NULL;
@ -325,12 +318,12 @@ char *weightby;
{ {
if (!strcasecmp(options[i], "master")) if (!strcasecmp(options[i], "master"))
{ {
inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE); inst->bitmask |= (SERVER_MASTER | SERVER_SLAVE);
inst->bitvalue |= SERVER_MASTER; inst->bitvalue |= SERVER_MASTER;
} }
else if (!strcasecmp(options[i], "slave")) else if (!strcasecmp(options[i], "slave"))
{ {
inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE); inst->bitmask |= (SERVER_MASTER | SERVER_SLAVE);
inst->bitvalue |= SERVER_SLAVE; inst->bitvalue |= SERVER_SLAVE;
} }
else if (!strcasecmp(options[i], "running")) else if (!strcasecmp(options[i], "running"))
@ -358,7 +351,7 @@ char *weightby;
} }
} }
} }
if(inst->bitmask == 0 && inst->bitvalue == 0) if (inst->bitmask == 0 && inst->bitvalue == 0)
{ {
/** No parameters given, use RUNNING as a valid server */ /** No parameters given, use RUNNING as a valid server */
inst->bitmask |= (SERVER_RUNNING); inst->bitmask |= (SERVER_RUNNING);
@ -374,7 +367,7 @@ char *weightby;
instances = inst; instances = inst;
spinlock_release(&instlock); spinlock_release(&instlock);
return (ROUTER *)inst; return(ROUTER *) inst;
} }
/** /**
@ -387,11 +380,11 @@ char *weightby;
static void * static void *
newSession(ROUTER *instance, SESSION *session) newSession(ROUTER *instance, SESSION *session)
{ {
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
ROUTER_CLIENT_SES *client_rses; ROUTER_CLIENT_SES *client_rses;
BACKEND *candidate = NULL; BACKEND *candidate = NULL;
int i; int i;
BACKEND *master_host = NULL; BACKEND *master_host = NULL;
MXS_DEBUG("%lu [newSession] new router session with session " MXS_DEBUG("%lu [newSession] new router session with session "
"%p, and inst %p.", "%p, and inst %p.",
@ -400,9 +393,10 @@ BACKEND *master_host = NULL;
inst); inst);
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); client_rses = (ROUTER_CLIENT_SES *) calloc(1, sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL) { if (client_rses == NULL)
{
return NULL; return NULL;
} }
@ -434,8 +428,10 @@ BACKEND *master_host = NULL;
* become the new candidate. This has the effect of spreading the * become the new candidate. This has the effect of spreading the
* connections over different servers during periods of very low load. * connections over different servers during periods of very low load.
*/ */
for (i = 0; inst->servers[i]; i++) { for (i = 0; inst->servers[i]; i++)
if(inst->servers[i]) { {
if (inst->servers[i])
{
MXS_DEBUG("%lu [newSession] Examine server in port %d with " MXS_DEBUG("%lu [newSession] Examine server in port %d with "
"%d connections. Status is %s, " "%d connections. Status is %s, "
"inst->bitvalue is %d", "inst->bitvalue is %d",
@ -447,18 +443,24 @@ BACKEND *master_host = NULL;
} }
if (SERVER_IN_MAINT(inst->servers[i]->server)) if (SERVER_IN_MAINT(inst->servers[i]->server))
{
continue; continue;
}
if (inst->servers[i]->weight == 0) if (inst->servers[i]->weight == 0)
{
continue; continue;
}
/* Check server status bits against bitvalue from router_options */ /* Check server status bits against bitvalue from router_options */
if (inst->servers[i] && if (inst->servers[i] &&
SERVER_IS_RUNNING(inst->servers[i]->server) && SERVER_IS_RUNNING(inst->servers[i]->server) &&
(inst->servers[i]->server->status & inst->bitmask & inst->bitvalue)) (inst->servers[i]->server->status & inst->bitmask & inst->bitvalue))
{ {
if (master_host) { if (master_host)
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE)) { {
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE))
{
/* skip root Master here, as it could also be slave of an external server /* skip root Master here, as it could also be slave of an external server
* that is not in the configuration. * that is not in the configuration.
* Intermediate masters (Relay Servers) are also slave and will be selected * Intermediate masters (Relay Servers) are also slave and will be selected
@ -467,7 +469,8 @@ BACKEND *master_host = NULL;
continue; continue;
} }
if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER)) { if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER))
{
/* If option is "master" return only the root Master as there /* If option is "master" return only the root Master as there
* could be intermediate masters (Relay Servers) * could be intermediate masters (Relay Servers)
* and they must not be selected. * and they must not be selected.
@ -476,12 +479,15 @@ BACKEND *master_host = NULL;
candidate = master_host; candidate = master_host;
break; break;
} }
} else { }
else
{
/* master_host is NULL, no master server. /* master_host is NULL, no master server.
* If requested router_option is 'master' * If requested router_option is 'master'
* candidate wll be NULL. * candidate wll be NULL.
*/ */
if (inst->bitvalue & SERVER_MASTER) { if (inst->bitvalue & SERVER_MASTER)
{
candidate = NULL; candidate = NULL;
break; break;
} }
@ -522,10 +528,14 @@ BACKEND *master_host = NULL;
* With router_option=slave a master_host could be set, so route traffic there. * With router_option=slave a master_host could be set, so route traffic there.
* Otherwise, just clean up and return NULL * Otherwise, just clean up and return NULL
*/ */
if (!candidate) { if (!candidate)
if (master_host) { {
if (master_host)
{
candidate = master_host; candidate = master_host;
} else { }
else
{
MXS_ERROR("Failed to create new routing session. " MXS_ERROR("Failed to create new routing session. "
"Couldn't find eligible candidate server. Freeing " "Couldn't find eligible candidate server. Freeing "
"allocated resources."); "allocated resources.");
@ -582,7 +592,8 @@ BACKEND *master_host = NULL;
"Connections : %d", "Connections : %d",
candidate->server->unique_name, candidate->server->unique_name,
candidate->current_connection_count); candidate->current_connection_count);
return (void *)client_rses;
return(void *) client_rses;
} }
/** /**
@ -602,13 +613,12 @@ BACKEND *master_host = NULL;
* @details (write detailed description here) * @details (write detailed description here)
* *
*/ */
static void freeSession( static void freeSession(ROUTER* router_instance,
ROUTER* router_instance,
void* router_client_ses) void* router_client_ses)
{ {
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_instance; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *) router_instance;
ROUTER_CLIENT_SES* router_cli_ses = ROUTER_CLIENT_SES* router_cli_ses =
(ROUTER_CLIENT_SES *)router_client_ses; (ROUTER_CLIENT_SES *) router_client_ses;
int prev_val; int prev_val;
prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1); prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1);
@ -616,16 +626,21 @@ static void freeSession(
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
if (router->connections == router_cli_ses) { if (router->connections == router_cli_ses)
{
router->connections = router_cli_ses->next; router->connections = router_cli_ses->next;
} else { }
else
{
ROUTER_CLIENT_SES *ptr = router->connections; ROUTER_CLIENT_SES *ptr = router->connections;
while (ptr != NULL && ptr->next != router_cli_ses) { while (ptr != NULL && ptr->next != router_cli_ses)
{
ptr = ptr->next; ptr = ptr->next;
} }
if (ptr != NULL) { if (ptr != NULL)
{
ptr->next = router_cli_ses->next; ptr->next = router_cli_ses->next;
} }
} }
@ -637,12 +652,11 @@ static void freeSession(
router_cli_ses, router_cli_ses,
router, router,
router_cli_ses->backend->server->port, router_cli_ses->backend->server->port,
prev_val-1); prev_val - 1);
free(router_cli_ses); free(router_cli_ses);
} }
/** /**
* Close a session with the router, this is the mechanism * Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc. * by which a router may cleanup data structure etc.
@ -653,8 +667,8 @@ static void freeSession(
static void static void
closeSession(ROUTER *instance, void *router_session) closeSession(ROUTER *instance, void *router_session)
{ {
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
DCB* backend_dcb; DCB* backend_dcb;
CHK_CLIENT_RSES(router_cli_ses); CHK_CLIENT_RSES(router_cli_ses);
/** /**
@ -673,7 +687,8 @@ DCB* backend_dcb;
/** /**
* Close the backend server connection * Close the backend server connection
*/ */
if (backend_dcb != NULL) { if (backend_dcb != NULL)
{
CHK_DCB(backend_dcb); CHK_DCB(backend_dcb);
dcb_close(backend_dcb); dcb_close(backend_dcb);
} }
@ -693,8 +708,8 @@ DCB* backend_dcb;
static int static int
routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
{ {
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *) instance;
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
uint8_t *payload = GWBUF_DATA(queue); uint8_t *payload = GWBUF_DATA(queue);
int mysql_command; int mysql_command;
int rc; int rc;
@ -729,21 +744,19 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
{ {
MXS_ERROR("Failed to route MySQL command %d to backend " MXS_ERROR("Failed to route MySQL command %d to backend "
"server.%s", "server.%s",
mysql_command,rses_is_closed ? " Session is closed." : ""); mysql_command, rses_is_closed ? " Session is closed." : "");
rc = 0; rc = 0;
while((queue = GWBUF_CONSUME_ALL(queue)) != NULL); while ((queue = GWBUF_CONSUME_ALL(queue)) != NULL);
goto return_rc; goto return_rc;
} }
char* trc = NULL; char* trc = NULL;
switch(mysql_command) { switch (mysql_command)
{
case MYSQL_COM_CHANGE_USER: case MYSQL_COM_CHANGE_USER:
rc = backend_dcb->func.auth( rc = backend_dcb->func.auth(backend_dcb, NULL, backend_dcb->session,
backend_dcb,
NULL,
backend_dcb->session,
queue); queue);
break; break;
case MYSQL_COM_QUERY: case MYSQL_COM_QUERY:
@ -759,10 +772,12 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
MXS_INFO("Routed [%s] to '%s'%s%s", MXS_INFO("Routed [%s] to '%s'%s%s",
STRPACKETTYPE(mysql_command), STRPACKETTYPE(mysql_command),
backend_dcb->server->unique_name, backend_dcb->server->unique_name,
trc?": ":".", trc ? ": " : ".",
trc?trc:""); trc ? trc : "");
free(trc); free(trc);
return_rc: return_rc:
return rc; return rc;
} }
@ -775,11 +790,11 @@ return_rc:
static void static void
diagnostics(ROUTER *router, DCB *dcb) diagnostics(ROUTER *router, DCB *dcb)
{ {
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *) router;
ROUTER_CLIENT_SES *session; ROUTER_CLIENT_SES *session;
int i = 0; int i = 0;
BACKEND *backend; BACKEND *backend;
char *weightby; char *weightby;
spinlock_acquire(&router_inst->lock); spinlock_acquire(&router_inst->lock);
session = router_inst->connections; session = router_inst->connections;
@ -808,7 +823,7 @@ char *weightby;
backend = router_inst->servers[i]; backend = router_inst->servers[i];
dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n", dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n",
backend->server->unique_name, backend->server->unique_name,
(float)backend->weight / 10, (float) backend->weight / 10,
backend->current_connection_count); backend->current_connection_count);
} }
@ -861,7 +876,7 @@ static void handleError(
DCB *client_dcb; DCB *client_dcb;
SESSION *session = problem_dcb->session; SESSION *session = problem_dcb->session;
session_state_t sesstate; session_state_t sesstate;
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *) router_session;
/** Don't handle same error twice on same DCB */ /** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called) if (problem_dcb->dcb_errhandle_called)
@ -904,6 +919,7 @@ static void handleError(
} }
/** to be inline'd */ /** to be inline'd */
/** /**
* @node Acquires lock to router client session if it is not closed. * @node Acquires lock to router client session if it is not closed.
* *
@ -926,11 +942,13 @@ static bool rses_begin_locked_router_action(
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
if (rses->rses_closed) { if (rses->rses_closed)
{
goto return_succp; goto return_succp;
} }
spinlock_acquire(&rses->rses_lock); spinlock_acquire(&rses->rses_lock);
if (rses->rses_closed) { if (rses->rses_closed)
{
spinlock_release(&rses->rses_lock); spinlock_release(&rses->rses_lock);
goto return_succp; goto return_succp;
} }
@ -941,6 +959,7 @@ return_succp:
} }
/** to be inline'd */ /** to be inline'd */
/** /**
* @node Releases router client session lock. * @node Releases router client session lock.
* *
@ -961,7 +980,6 @@ static void rses_end_locked_router_action(
spinlock_release(&rses->rses_lock); spinlock_release(&rses->rses_lock);
} }
static int getCapabilities() static int getCapabilities()
{ {
return RCAP_TYPE_PACKET_INPUT; return RCAP_TYPE_PACKET_INPUT;
@ -980,31 +998,35 @@ static int getCapabilities()
* *
*/ */
static BACKEND *get_root_master(BACKEND **servers) { static BACKEND *get_root_master(BACKEND **servers)
{
int i = 0; int i = 0;
BACKEND *master_host = NULL; BACKEND *master_host = NULL;
for (i = 0; servers[i]; i++) { for (i = 0; servers[i]; i++)
if (servers[i] && (servers[i]->server->status & (SERVER_MASTER|SERVER_MAINT)) == SERVER_MASTER) { {
if (master_host && servers[i]->server->depth < master_host->server->depth) { if (servers[i] && (servers[i]->server->status & (SERVER_MASTER | SERVER_MAINT)) == SERVER_MASTER)
master_host = servers[i]; {
} else { if (master_host && servers[i]->server->depth < master_host->server->depth)
if (master_host == NULL) { {
master_host = servers[i]; master_host = servers[i];
} }
else if (master_host == NULL)
{
master_host = servers[i];
} }
} }
} }
return master_host; return master_host;
} }
static int handle_state_switch(DCB* dcb,DCB_REASON reason, void * routersession) static int handle_state_switch(DCB* dcb, DCB_REASON reason, void * routersession)
{ {
ss_dassert(dcb != NULL); ss_dassert(dcb != NULL);
SESSION* session = dcb->session; SESSION* session = dcb->session;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*)routersession; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*) routersession;
SERVICE* service = session->service; SERVICE* service = session->service;
ROUTER* router = (ROUTER *)service->router; ROUTER* router = (ROUTER *) service->router;
if (NULL == dcb->session->router_session && DCB_REASON_ERROR != reason) if (NULL == dcb->session->router_session && DCB_REASON_ERROR != reason)
{ {
@ -1014,7 +1036,7 @@ static int handle_state_switch(DCB* dcb,DCB_REASON reason, void * routersession)
*/ */
return 0; return 0;
} }
switch(reason) switch (reason)
{ {
case DCB_REASON_CLOSE: case DCB_REASON_CLOSE:
dcb->func.close(dcb); dcb->func.close(dcb);