Cleanup slavelag.c

Untabification, indentation, Allman, etc.
This commit is contained in:
Johan Wikman
2016-03-04 08:42:34 +02:00
parent 1a2bd8b234
commit 6b4ffc8506

View File

@ -27,10 +27,10 @@
#include <regex.h> #include <regex.h>
/** /**
* @file slavelag.c - a very simple filter designed to send queries to the * @file slavelag.c - a very simple filter designed to send queries to the
* master server after data modification has occurred. This is done to prevent * master server after data modification has occurred. This is done to prevent
* replication lag affecting the outcome of a select query. * replication lag affecting the outcome of a select query.
* *
* @verbatim * @verbatim
* *
* Two optional parameters that define the behavior after a data modifying query * Two optional parameters that define the behavior after a data modifying query
@ -41,71 +41,80 @@
* match=<regex> Regex for matching * match=<regex> Regex for matching
* ignore=<regex> Regex for ignoring * ignore=<regex> Regex for ignoring
* *
* The filter also has two options: @c case, which makes the regex case-sensitive, and @c ignorecase, which does the opposite. * The filter also has two options:
* Date Who Description * @c case, which makes the regex case-sensitive, and
* 03/03/2015 Markus Mäkelä Written for demonstrative purposes * @c ignorecase, which does the opposite.
*
* Date Who Description
* 03/03/2015 Markus Mäkelä Written for demonstrative purposes
* @endverbatim * @endverbatim
*/ */
MODULE_INFO info = { MODULE_INFO info =
MODULE_API_FILTER, {
MODULE_GA, MODULE_API_FILTER,
FILTER_VERSION, MODULE_GA,
"A routing hint filter that send queries to the master after data modification" FILTER_VERSION,
"A routing hint filter that send queries to the master after data modification"
}; };
static char *version_str = "V1.1.0"; static char *version_str = "V1.1.0";
static FILTER *createInstance(char **options, FILTER_PARAMETER **params); static FILTER *createInstance(char **options, FILTER_PARAMETER **params);
static void *newSession(FILTER *instance, SESSION *session); static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session); static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session); static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = { static FILTER_OBJECT MyObject =
{
createInstance, createInstance,
newSession, newSession,
closeSession, closeSession,
freeSession, freeSession,
setDownstream, setDownstream,
NULL, // No Upstream requirement NULL, // No Upstream requirement
routeQuery, routeQuery,
NULL, NULL,
diagnostic, diagnostic,
}; };
struct LAGSTATS{ typedef struct lagstats
int n_add_count; /*< No. of statements diverted based on count */ {
int n_add_time; /*< No. of statements diverted based on time */ int n_add_count; /*< No. of statements diverted based on count */
int n_modified; /*< No. of statements not diverted */ int n_add_time; /*< No. of statements diverted based on time */
}; int n_modified; /*< No. of statements not diverted */
} LAGSTATS;
/** /**
* Instance structure * Instance structure
*/ */
typedef struct { typedef struct
char *match; /* Regular expression to match */ {
char *nomatch; /* Regular expression to ignore */ char *match; /* Regular expression to match */
int time; /*< The number of seconds to wait before routing queries char *nomatch; /* Regular expression to ignore */
* to slave servers after a data modification operation int time; /*< The number of seconds to wait before routing queries
* is done. */ * to slave servers after a data modification operation
int count; /*< Number of hints to add after each operation * is done. */
* that modifies data. */ int count; /*< Number of hints to add after each operation
struct LAGSTATS stats; * that modifies data. */
regex_t re; /* Compiled regex text of match */ LAGSTATS stats;
regex_t nore; /* Compiled regex text of ignore */ regex_t re; /* Compiled regex text of match */
regex_t nore; /* Compiled regex text of ignore */
} LAG_INSTANCE; } LAG_INSTANCE;
/** /**
* The session structure for this filter * The session structure for this filter
*/ */
typedef struct { typedef struct
DOWNSTREAM down; /*< The downstream filter */ {
int hints_left; /*< Number of hints left to add to queries*/ DOWNSTREAM down; /*< The downstream filter */
time_t last_modification; /*< Time of the last modifying operation */ int hints_left; /*< Number of hints left to add to queries*/
int active; /*< Is filter active */ time_t last_modification; /*< Time of the last modifying operation */
int active; /*< Is filter active */
} LAG_SESSION; } LAG_SESSION;
/** /**
@ -116,7 +125,7 @@ typedef struct {
char * char *
version() version()
{ {
return version_str; return version_str;
} }
/** /**
@ -139,122 +148,130 @@ ModuleInit()
FILTER_OBJECT * FILTER_OBJECT *
GetModuleObject() GetModuleObject()
{ {
return &MyObject; return &MyObject;
} }
/** /**
* Create an instance of the filter for a particular service * Create an instance of the filter for a particular service
* within MaxScale. * within MaxScale.
* *
* @param options The options for this filter * @param options The options for this filter
* @param params The array of name/value pair parameters for the filter * @param params The array of name/value pair parameters for the filter
* *
* @return The instance data for this new instance * @return The instance data for this new instance
*/ */
static FILTER * static FILTER *
createInstance(char **options, FILTER_PARAMETER **params) createInstance(char **options, FILTER_PARAMETER **params)
{ {
LAG_INSTANCE *my_instance; LAG_INSTANCE *my_instance;
int i,cflags = 0; int i;
int cflags = 0;
if ((my_instance = calloc(1, sizeof(LAG_INSTANCE))) != NULL) if ((my_instance = calloc(1, sizeof(LAG_INSTANCE))) != NULL)
{ {
my_instance->count = 0; my_instance->count = 0;
my_instance->time = 0; my_instance->time = 0;
my_instance->stats.n_add_count = 0; my_instance->stats.n_add_count = 0;
my_instance->stats.n_add_time = 0; my_instance->stats.n_add_time = 0;
my_instance->stats.n_modified = 0; my_instance->stats.n_modified = 0;
my_instance->match = NULL; my_instance->match = NULL;
my_instance->nomatch = NULL; my_instance->nomatch = NULL;
for (i = 0; params && params[i]; i++) for (i = 0; params && params[i]; i++)
{ {
if (!strcmp(params[i]->name, "count")) if (!strcmp(params[i]->name, "count"))
my_instance->count = atoi(params[i]->value); {
else if (!strcmp(params[i]->name, "time")) my_instance->count = atoi(params[i]->value);
my_instance->time = atoi(params[i]->value); }
else if (!strcmp(params[i]->name, "match")) else if (!strcmp(params[i]->name, "time"))
my_instance->match = strdup(params[i]->value); {
else if (!strcmp(params[i]->name, "ignore")) my_instance->time = atoi(params[i]->value);
my_instance->nomatch = strdup(params[i]->value); }
else else if (!strcmp(params[i]->name, "match"))
{ {
MXS_ERROR("lagfilter: Unexpected parameter '%s'.\n", my_instance->match = strdup(params[i]->value);
params[i]->name); }
} else if (!strcmp(params[i]->name, "ignore"))
} {
my_instance->nomatch = strdup(params[i]->value);
}
else
{
MXS_ERROR("lagfilter: Unexpected parameter '%s'.\n", params[i]->name);
}
}
if (options) if (options)
{ {
for (i = 0; options[i]; i++) for (i = 0; options[i]; i++)
{ {
if (!strcasecmp(options[i], "ignorecase")) if (!strcasecmp(options[i], "ignorecase"))
{ {
cflags |= REG_ICASE; cflags |= REG_ICASE;
} }
else if (!strcasecmp(options[i], "case")) else if (!strcasecmp(options[i], "case"))
{ {
cflags &= ~REG_ICASE; cflags &= ~REG_ICASE;
} }
else else
{ {
MXS_ERROR("lagfilter: unsupported option '%s'.", MXS_ERROR("lagfilter: unsupported option '%s'.", options[i]);
options[i]); }
} }
} }
}
if(my_instance->match) if (my_instance->match)
{ {
if(regcomp(&my_instance->re,my_instance->match,cflags)) if (regcomp(&my_instance->re, my_instance->match, cflags))
{ {
MXS_ERROR("lagfilter: Failed to compile regex '%s'.", MXS_ERROR("lagfilter: Failed to compile regex '%s'.", my_instance->match);
my_instance->match); }
} }
}
if(my_instance->nomatch) if (my_instance->nomatch)
{ {
if(regcomp(&my_instance->nore,my_instance->nomatch,cflags)) if (regcomp(&my_instance->nore,my_instance->nomatch,cflags))
{ {
MXS_ERROR("lagfilter: Failed to compile regex '%s'.", MXS_ERROR("lagfilter: Failed to compile regex '%s'.", my_instance->nomatch);
my_instance->nomatch); }
} }
} }
}
return (FILTER *)my_instance; return (FILTER *)my_instance;
} }
/** /**
* Associate a new session with this instance of the filter. * Associate a new session with this instance of the filter.
* *
* @param instance The filter instance data * @param instance The filter instance data
* @param session The session itself * @param session The session itself
*
* @return Session specific data for this session * @return Session specific data for this session
*/ */
static void * static void *
newSession(FILTER *instance, SESSION *session) newSession(FILTER *instance, SESSION *session)
{ {
LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance; LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance;
LAG_SESSION *my_session; LAG_SESSION *my_session;
if ((my_session = malloc(sizeof(LAG_SESSION))) != NULL) if ((my_session = malloc(sizeof(LAG_SESSION))) != NULL)
{ {
my_session->active = 1; my_session->active = 1;
my_session->hints_left = 0; my_session->hints_left = 0;
my_session->last_modification = 0; my_session->last_modification = 0;
} }
return my_session; return my_session;
} }
/** /**
* Close a session with the filter, this is the mechanism * Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc. * by which a filter may cleanup data structure etc.
* *
* @param instance The filter instance data * @param instance The filter instance data
* @param session The session being closed * @param session The session being closed
*/ */
static void static void
closeSession(FILTER *instance, void *session) closeSession(FILTER *instance, void *session)
{ {
} }
@ -262,29 +279,28 @@ closeSession(FILTER *instance, void *session)
/** /**
* Free the memory associated with this filter session. * Free the memory associated with this filter session.
* *
* @param instance The filter instance data * @param instance The filter instance data
* @param session The session being closed * @param session The session being closed
*/ */
static void static void
freeSession(FILTER *instance, void *session) freeSession(FILTER *instance, void *session)
{ {
free(session); free(session);
return;
} }
/** /**
* Set the downstream component for this filter. * Set the downstream component for this filter.
* *
* @param instance The filter instance data * @param instance The filter instance data
* @param session The session being closed * @param session The session being closed
* @param downstream The downstream filter or router * @param downstream The downstream filter or router
*/ */
static void static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{ {
LAG_SESSION *my_session = (LAG_SESSION *)session; LAG_SESSION *my_session = (LAG_SESSION *)session;
my_session->down = *downstream; my_session->down = *downstream;
} }
/** /**
@ -297,60 +313,60 @@ LAG_SESSION *my_session = (LAG_SESSION *)session;
* filter definition matches the SQL text then add the hint * filter definition matches the SQL text then add the hint
* "Route to named server" with the name defined in the server parameter * "Route to named server" with the name defined in the server parameter
* *
* @param instance The filter instance data * @param instance The filter instance data
* @param session The filter session * @param session The filter session
* @param queue The query data * @param queue The query data
*/ */
static int static int
routeQuery(FILTER *instance, void *session, GWBUF *queue) routeQuery(FILTER *instance, void *session, GWBUF *queue)
{ {
LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance; LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance;
LAG_SESSION *my_session = (LAG_SESSION *)session; LAG_SESSION *my_session = (LAG_SESSION *)session;
char *sql; char *sql;
time_t now = time(NULL); time_t now = time(NULL);
if (modutil_is_SQL(queue)) if (modutil_is_SQL(queue))
{ {
if (queue->next != NULL) if (queue->next != NULL)
{ {
queue = gwbuf_make_contiguous(queue); queue = gwbuf_make_contiguous(queue);
} }
if(query_classifier_get_operation(queue) & (QUERY_OP_DELETE|QUERY_OP_INSERT|QUERY_OP_UPDATE)) if (query_classifier_get_operation(queue) & (QUERY_OP_DELETE | QUERY_OP_INSERT | QUERY_OP_UPDATE))
{ {
if((sql = modutil_get_SQL(queue)) != NULL) if ((sql = modutil_get_SQL(queue)) != NULL)
{ {
if(my_instance->nomatch == NULL||(my_instance->nomatch && regexec(&my_instance->nore,sql,0,NULL,0) != 0)) if (my_instance->nomatch == NULL ||
{ (my_instance->nomatch && regexec(&my_instance->nore, sql, 0, NULL, 0) != 0))
if(my_instance->match == NULL|| {
(my_instance->match && regexec(&my_instance->re,sql,0,NULL,0) == 0)) if (my_instance->match == NULL||
{ (my_instance->match && regexec(&my_instance->re,sql,0,NULL,0) == 0))
my_session->hints_left = my_instance->count; {
my_session->last_modification = now; my_session->hints_left = my_instance->count;
my_instance->stats.n_modified++; my_session->last_modification = now;
} my_instance->stats.n_modified++;
} }
free(sql); }
}
} free(sql);
else if(my_session->hints_left > 0) }
{ }
queue->hint = hint_create_route(queue->hint, else if (my_session->hints_left > 0)
HINT_ROUTE_TO_MASTER, {
NULL); queue->hint = hint_create_route(queue->hint, HINT_ROUTE_TO_MASTER, NULL);
my_session->hints_left--; my_session->hints_left--;
my_instance->stats.n_add_count++; my_instance->stats.n_add_count++;
} }
else if(difftime(now,my_session->last_modification) < my_instance->time) else if (difftime(now,my_session->last_modification) < my_instance->time)
{ {
queue->hint = hint_create_route(queue->hint, queue->hint = hint_create_route(queue->hint, HINT_ROUTE_TO_MASTER, NULL);
HINT_ROUTE_TO_MASTER, my_instance->stats.n_add_time++;
NULL); }
my_instance->stats.n_add_time++; }
}
} return my_session->down.routeQuery(my_session->down.instance,
return my_session->down.routeQuery(my_session->down.instance, my_session->down.session,
my_session->down.session, queue); queue);
} }
/** /**
@ -360,26 +376,20 @@ time_t now = time(NULL);
* instance as a whole, otherwise print diagnostics for the * instance as a whole, otherwise print diagnostics for the
* particular session. * particular session.
* *
* @param instance The filter instance * @param instance The filter instance
* @param fsession Filter session, may be NULL * @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output * @param dcb The DCB for diagnostic output
*/ */
static void static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb) diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{ {
LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance; LAG_INSTANCE *my_instance = (LAG_INSTANCE *)instance;
LAG_SESSION *my_session = (LAG_SESSION *)fsession; LAG_SESSION *my_session = (LAG_SESSION *)fsession;
dcb_printf(dcb, "Configuration:\n\tCount: %d\n",
my_instance->count);
dcb_printf(dcb, "\tTime: %d seconds\n\n",
my_instance->time);
dcb_printf(dcb, "Statistics:\n");
dcb_printf(dcb, "\tNo. of data modifications: %d\n",
my_instance->stats.n_modified);
dcb_printf(dcb, "\tNo. of hints added based on count: %d\n",
my_instance->stats.n_add_count);
dcb_printf(dcb, "\tNo. of hints added based on time: %d\n",
my_instance->stats.n_add_time);
dcb_printf(dcb, "Configuration:\n\tCount: %d\n", my_instance->count);
dcb_printf(dcb, "\tTime: %d seconds\n\n", my_instance->time);
dcb_printf(dcb, "Statistics:\n");
dcb_printf(dcb, "\tNo. of data modifications: %d\n", my_instance->stats.n_modified);
dcb_printf(dcb, "\tNo. of hints added based on count: %d\n", my_instance->stats.n_add_count);
dcb_printf(dcb, "\tNo. of hints added based on time: %d\n", my_instance->stats.n_add_time);
} }