Merge branch 'develop' into MAX-11

Conflicts:
	server/include/server.h
	server/modules/monitor/mysql_mon.c
	server/modules/protocol/mysql_client.c
	server/modules/routing/readwritesplit/readwritesplit.c
	utils/skygw_debug.h
This commit is contained in:
VilhoRaatikka 2014-06-07 00:50:08 +03:00
commit ad744962b2
47 changed files with 2759 additions and 128 deletions

Binary file not shown.

View File

@ -1 +1 @@
0.6.0
0.7.0

View File

@ -33,6 +33,7 @@ all:
(cd modules/routing/readwritesplit; touch depend.mk ;make)
(cd modules/protocol; touch depend.mk ;make)
(cd modules/monitor; touch depend.mk ;make)
(cd modules/filter; touch depend.mk ;make)
cleantests:
$(MAKE) -C test cleantests
@ -49,12 +50,14 @@ clean:
(cd modules/routing; touch depend.mk ; make clean)
(cd modules/protocol; touch depend.mk ; make clean)
(cd modules/monitor; touch depend.mk ; make clean)
(cd modules/filter; touch depend.mk ; make clean)
depend:
(cd core; touch depend.mk ; make depend)
(cd modules/routing; touch depend.mk ; make depend)
(cd modules/protocol; touch depend.mk ; make depend)
(cd modules/monitor; touch depend.mk ; make depend)
(cd modules/filter; touch depend.mk ; make depend)
install:
@mkdir -p $(DEST)
@ -71,3 +74,4 @@ install:
(cd modules/routing; make DEST=$(DEST) install)
(cd modules/protocol; make DEST=$(DEST) install)
(cd modules/monitor; make DEST=$(DEST) install)
(cd modules/filter; make DEST=$(DEST) install)

View File

@ -20,6 +20,8 @@ threads=1
# user =<user name - must have slave replication and
# slave client privileges>
# passwd=<password of the above user, plain text currently>
# monitor_interval=<sampling interval in milliseconds,
# default value is 10000>
[MySQL Monitor]
type=monitor

View File

@ -33,6 +33,7 @@
# 29/06/13 Vilho Raatikka Reverted Query classifier changes because
# gateway needs mysql client lib, not qc.
# 24/07/13 Mark Ridoch Addition of encryption routines
# 30/05/14 Mark Ridoch Filter API added
include ../../build_gateway.inc
@ -56,14 +57,15 @@ LDFLAGS=-rdynamic -L$(LOGPATH) \
SRCS= atomic.c buffer.c spinlock.c gateway.c \
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \
monitor.c adminusers.c secrets.c
monitor.c adminusers.c secrets.c filter.c modutil.c
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/gw.h ../modules/include/mysql_client_server_protocol.h \
../include/session.h ../include/spinlock.h ../include/thread.h \
../include/modules.h ../include/poll.h ../include/config.h \
../include/users.h ../include/hashtable.h ../include/gwbitmask.h \
../include/adminusers.h ../include/version.h ../include/maxscale.h
../include/adminusers.h ../include/version.h ../include/maxscale.h \
../include/filter.h modutil.h
OBJ=$(SRCS:.c=.o)

View File

@ -31,6 +31,9 @@
* 11/03/14 Massimiliano Pinto Added Unix socket support
* 11/05/14 Massimiliano Pinto Added version_string support to service
* 19/05/14 Mark Riddoch Added unique names from section headers
* 29/05/14 Mark Riddoch Addition of filter definition
* 23/05/14 Massimiliano Pinto Added automatic set of maxscale-id: first listening ipv4_raw + port + pid
* 28/05/14 Massimiliano Pinto Added detect_replication_lag parameter
*
* @endverbatim
*/
@ -56,6 +59,7 @@ static char *config_get_value(CONFIG_PARAMETER *, const char *);
static int handle_global_item(const char *, const char *);
static void global_defaults();
static void check_config_objects(CONFIG_CONTEXT *context);
static int config_truth_value(char *str);
static char *config_file = NULL;
static GATEWAY_CONF gateway;
@ -231,7 +235,7 @@ int error_count = 0;
if (obj->element == NULL) /*< if module load failed */
{
LOGIF(LE, (skygw_log_write_flush(
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Reading configuration "
"for router service '%s' failed. "
@ -254,7 +258,7 @@ int error_count = 0;
"max_slave_connections");
if (enable_root_user)
serviceEnableRootUser(obj->element, atoi(enable_root_user));
serviceEnableRootUser(obj->element, config_truth_value(enable_root_user));
if (!auth)
auth = config_get_value(obj->parameters, "auth");
@ -359,6 +363,52 @@ int error_count = 0;
obj->object)));
}
}
else if (!strcmp(type, "filter"))
{
char *module = config_get_value(obj->parameters,
"module");
char *options = config_get_value(obj->parameters,
"options");
if (module)
{
obj->element = filter_alloc(obj->object, module);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Filter '%s' has no module "
"defined defined to load.",
obj->object)));
error_count++;
}
if (obj->element && options)
{
char *s = strtok(options, ",");
while (s)
{
filterAddOption(obj->element, s);
s = strtok(NULL, ",");
}
}
if (obj->element)
{
CONFIG_PARAMETER *params = obj->parameters;
while (params)
{
if (strcmp(params->name, "module")
&& strcmp(params->name,
"options"))
{
filterAddParameter(obj->element,
params->name,
params->value);
}
params = params->next;
}
}
}
obj = obj->next;
}
@ -376,7 +426,8 @@ int error_count = 0;
{
char *servers;
char *roptions;
char *filters = config_get_value(obj->parameters,
"filters");
servers = config_get_value(obj->parameters, "servers");
roptions = config_get_value(obj->parameters,
"router_options");
@ -418,6 +469,10 @@ int error_count = 0;
s = strtok(NULL, ",");
}
}
if (filters)
{
serviceSetFilters(obj->element, filters);
}
}
else if (!strcmp(type, "listener"))
{
@ -426,12 +481,19 @@ int error_count = 0;
char *port;
char *protocol;
char *socket;
struct sockaddr_in serv_addr;
service = config_get_value(obj->parameters, "service");
port = config_get_value(obj->parameters, "port");
address = config_get_value(obj->parameters, "address");
protocol = config_get_value(obj->parameters, "protocol");
socket = config_get_value(obj->parameters, "socket");
/* if id is not set, do it now */
if (gateway.id == 0) {
setipaddress(&serv_addr.sin_addr, (address == NULL) ? "0.0.0.0" : address);
gateway.id = (unsigned long) (serv_addr.sin_addr.s_addr + port + getpid());
}
if (service && socket && protocol) {
CONFIG_CONTEXT *ptr = context;
@ -494,18 +556,46 @@ int error_count = 0;
char *servers;
char *user;
char *passwd;
unsigned long interval = 0;
int replication_heartbeat = 0;
module = config_get_value(obj->parameters, "module");
servers = config_get_value(obj->parameters, "servers");
user = config_get_value(obj->parameters, "user");
passwd = config_get_value(obj->parameters, "passwd");
if (config_get_value(obj->parameters, "monitor_interval")) {
interval = strtoul(config_get_value(obj->parameters, "monitor_interval"), NULL, 10);
}
if (config_get_value(obj->parameters, "detect_replication_lag")) {
replication_heartbeat = atoi(config_get_value(obj->parameters, "detect_replication_lag"));
}
if (module)
{
obj->element = monitor_alloc(obj->object, module);
if (servers && obj->element)
{
char *s = strtok(servers, ",");
char *s;
/* if id is not set, compute it now with pid only */
if (gateway.id == 0) {
gateway.id = getpid();
}
/* add the maxscale-id to monitor data */
monitorSetId(obj->element, gateway.id);
/* set monitor interval */
if (interval > 0)
monitorSetInterval(obj->element, interval);
/* set replication heartbeat */
if(replication_heartbeat == 1)
monitorSetReplicationHeartbeat(obj->element, replication_heartbeat);
/* get the servers to monitor */
s = strtok(servers, ",");
while (s)
{
CONFIG_CONTEXT *obj1 = context;
@ -550,7 +640,8 @@ int error_count = 0;
error_count++;
}
}
else if (strcmp(type, "server") != 0)
else if (strcmp(type, "server") != 0
&& strcmp(type, "filter") != 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -747,6 +838,7 @@ global_defaults()
gateway.version_string = strdup(version_string);
else
gateway.version_string = NULL;
gateway.id=0;
}
/**
@ -959,10 +1051,12 @@ SERVER *server;
{
char *servers;
char *roptions;
char *filters;
servers = config_get_value(obj->parameters, "servers");
roptions = config_get_value(obj->parameters,
"router_options");
filters = config_get_value(obj->parameters, "filters");
if (servers && obj->element)
{
char *s = strtok(servers, ",");
@ -996,6 +1090,8 @@ SERVER *server;
s = strtok(NULL, ",");
}
}
if (filters)
serviceSetFilters(obj->element, filters);
}
else if (!strcmp(type, "listener"))
{
@ -1056,7 +1152,8 @@ SERVER *server;
}
}
else if (strcmp(type, "server") != 0 &&
strcmp(type, "monitor") != 0)
strcmp(type, "monitor") != 0 &&
strcmp(type, "filter") != 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -1080,6 +1177,7 @@ static char *service_params[] =
"enable_root_user",
"max_slave_connections",
"version_string",
"filters",
NULL
};
@ -1112,6 +1210,8 @@ static char *monitor_params[] =
"servers",
"user",
"passwd",
"monitor_interval",
"detect_replication_lag",
NULL
};
/**
@ -1213,3 +1313,24 @@ bool config_set_qualified_param(
return succp;
}
/**
* Used for boolean settings where values may be 1, yes or true
* to enable a setting or -, no, false to disable a setting.
*
* @param str String to convert to a boolean
* @return Truth value
*/
static int
config_truth_value(char *str)
{
if (strcasecmp(str, "true") == 0 || strcasecmp(str, "on") == 0)
{
return 1;
}
if (strcasecmp(str, "flase") == 0 || strcasecmp(str, "off") == 0)
{
return 0;
}
return atoi(str);
}

316
server/core/filter.c Normal file
View File

@ -0,0 +1,316 @@
/*
* This file is distributed as part of MaxScale from SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file filter.c - A representation of a filter within MaxScale.
*
* @verbatim
* Revision History
*
* Date Who Description
* 29/05/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <filter.h>
#include <session.h>
#include <modules.h>
#include <spinlock.h>
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
static SPINLOCK filter_spin = SPINLOCK_INIT;
static FILTER_DEF *allFilters = NULL;
/**
* Allocate a new filter within MaxScale
*
*
* @param name The filter name
* @param module The module to load
*
* @return The newly created filter or NULL if an error occured
*/
FILTER_DEF *
filter_alloc(char *name, char *module)
{
FILTER_DEF *filter;
if ((filter = (FILTER_DEF *)malloc(sizeof(FILTER_DEF))) == NULL)
return NULL;
filter->name = strdup(name);
filter->module = strdup(module);
filter->filter = NULL;
filter->options = NULL;
filter->obj = NULL;
filter->parameters = NULL;
spinlock_init(&filter->spin);
spinlock_acquire(&filter_spin);
filter->next = allFilters;
allFilters = filter;
spinlock_release(&filter_spin);
return filter;
}
/**
* Deallocate the specified filter
*
* @param server The service to deallocate
* @return Returns true if the server was freed
*/
void
filter_free(FILTER_DEF *filter)
{
FILTER_DEF *ptr;
/* First of all remove from the linked list */
spinlock_acquire(&filter_spin);
if (allFilters == filter)
{
allFilters = filter->next;
}
else
{
ptr = allFilters;
while (ptr && ptr->next != filter)
{
ptr = ptr->next;
}
if (ptr)
ptr->next = filter->next;
}
spinlock_release(&filter_spin);
/* Clean up session and free the memory */
free(filter->name);
free(filter->module);
free(filter);
}
/**
* Find an existing filter using the unique section name in
* configuration file
*
* @param name The filter name
* @return The server or NULL if not found
*/
FILTER_DEF *
filter_find(char *name)
{
FILTER_DEF *filter;
spinlock_acquire(&filter_spin);
filter = allFilters;
while (filter)
{
if (strcmp(filter->name, name) == 0)
break;
filter = filter->next;
}
spinlock_release(&filter_spin);
return filter;
}
/**
* Print all filters to a DCB
*
* Designed to be called within a debugger session in order
* to display all active filters within MaxScale
*/
void
dprintAllFilters(DCB *dcb)
{
FILTER_DEF *ptr;
int i;
spinlock_acquire(&filter_spin);
ptr = allFilters;
while (ptr)
{
dcb_printf(dcb, "Filter %p (%s)\n", ptr, ptr->name);
dcb_printf(dcb, "\tModule: %s\n", ptr->module);
if (ptr->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; ptr->options && ptr->options[i]; i++)
dcb_printf(dcb, "%s ", ptr->options[i]);
dcb_printf(dcb, "\n");
}
if (ptr->obj && ptr->filter)
ptr->obj->diagnostics(ptr->filter, NULL, dcb);
else
dcb_printf(dcb, "\tModule not loaded.\n");
ptr = ptr->next;
}
spinlock_release(&filter_spin);
}
/**
* Print filter details to a DCB
*
* Designed to be called within a debug CLI in order
* to display all active filters in MaxScale
*/
void
dprintFilter(DCB *dcb, FILTER_DEF *filter)
{
int i;
dcb_printf(dcb, "Filter %p (%s)\n", filter, filter->name);
dcb_printf(dcb, "\tModule: %s\n", filter->module);
if (filter->options)
{
dcb_printf(dcb, "\tOptions: ");
for (i = 0; filter->options && filter->options[i]; i++)
dcb_printf(dcb, "%s ", filter->options[i]);
dcb_printf(dcb, "\n");
}
if (filter->obj && filter->filter)
filter->obj->diagnostics(filter->filter, NULL, dcb);
}
/**
* List all filters in a tabular form to a DCB
*
*/
void
dListFilters(DCB *dcb)
{
FILTER_DEF *ptr;
int i;
spinlock_acquire(&filter_spin);
ptr = allFilters;
if (ptr)
{
dcb_printf(dcb, "%-18s | %-15s | Options\n",
"Filter", "Module");
dcb_printf(dcb, "-------------------------------------------------------------------------------\n");
}
while (ptr)
{
dcb_printf(dcb, "%-18s | %-15s | ",
ptr->name, ptr->module);
for (i = 0; ptr->options && ptr->options[i]; i++)
dcb_printf(dcb, "%s ", ptr->options[i]);
dcb_printf(dcb, "\n");
ptr = ptr->next;
}
spinlock_release(&filter_spin);
}
/**
* Add a router option to a service
*
* @param service The service to add the router option to
* @param option The option string
*/
void
filterAddOption(FILTER_DEF *filter, char *option)
{
int i;
spinlock_acquire(&filter->spin);
if (filter->options == NULL)
{
filter->options = (char **)calloc(2, sizeof(char *));
filter->options[0] = strdup(option);
filter->options[1] = NULL;
}
else
{
for (i = 0; filter->options[i]; i++)
;
filter->options = (char **)realloc(filter->options,
(i + 2) * sizeof(char *));
filter->options[i] = strdup(option);
filter->options[i+1] = NULL;
}
spinlock_release(&filter->spin);
}
/**
* Add a router parameter to a service
*
* @param service The service to add the router option to
* @param name The parameter name
* @param value The parameter value
*/
void
filterAddParameter(FILTER_DEF *filter, char *name, char *value)
{
int i;
spinlock_acquire(&filter->spin);
if (filter->parameters == NULL)
{
filter->parameters = (FILTER_PARAMETER **)calloc(2, sizeof(FILTER_PARAMETER *));
i = 0;
}
else
{
for (i = 0; filter->parameters[i]; i++)
;
filter->parameters = (FILTER_PARAMETER **)realloc(filter->parameters,
(i + 2) * sizeof(FILTER_PARAMETER *));
}
filter->parameters[i] = (FILTER_PARAMETER *)calloc(1, sizeof(FILTER_PARAMETER));
filter->parameters[i]->name = strdup(name);
filter->parameters[i]->value = strdup(value);
filter->parameters[i+1] = NULL;
spinlock_release(&filter->spin);
}
DOWNSTREAM *
filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream)
{
DOWNSTREAM *me;
if (filter->obj == NULL)
{
/* Filter not yet loaded */
if ((filter->obj = load_module(filter->module,
MODULE_FILTER)) == NULL)
{
return NULL;
}
}
if (filter->filter == NULL)
filter->filter = (filter->obj->createInstance)(filter->options,
filter->parameters);
if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL)
{
return NULL;
}
me->instance = filter->filter;
me->routeQuery = filter->obj->routeQuery;
me->session = filter->obj->newSession(me->instance, session);
filter->obj->setDownstream(me->instance, me->session, downstream);
return me;
}

View File

@ -28,6 +28,7 @@
* 14/06/13 Mark Riddoch Updated to add call to ModuleInit if one is
* defined in the loaded module.
* Also updated to call fixed GetModuleObject
* 02/06/14 Mark Riddoch Addition of module info
*
* @endverbatim
*/
@ -38,6 +39,7 @@
#include <string.h>
#include <dlfcn.h>
#include <modules.h>
#include <modinfo.h>
#include <skygw_utils.h>
#include <log_manager.h>
@ -47,10 +49,11 @@ static MODULES *registered = NULL;
static MODULES *find_module(const char *module);
static void register_module(const char *module,
const char *type,
void *dlhandle,
char *version,
void *modobj);
const char *type,
void *dlhandle,
char *version,
void *modobj,
MODULE_INFO *info);
static void unregister_module(const char *module);
char* get_maxscale_home(void)
@ -76,12 +79,13 @@ char* get_maxscale_home(void)
void *
load_module(const char *module, const char *type)
{
char *home, *version;
char fname[MAXPATHLEN];
void *dlhandle, *sym;
char *(*ver)();
void *(*ep)(), *modobj;
MODULES *mod;
char *home, *version;
char fname[MAXPATHLEN];
void *dlhandle, *sym;
char *(*ver)();
void *(*ep)(), *modobj;
MODULES *mod;
MODULE_INFO *mod_info = NULL;
if ((mod = find_module(module)) == NULL)
{
@ -141,6 +145,57 @@ MODULES *mod;
ModuleInit();
}
if ((sym = dlsym(dlhandle, "info")) != NULL)
{
int fatal = 0;
mod_info = sym;
if (strcmp(type, MODULE_PROTOCOL) == 0
&& mod_info->modapi != MODULE_API_PROTOCOL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Module '%s' does not implement "
"the protocol API.\n",
module)));
fatal = 1;
}
if (strcmp(type, MODULE_ROUTER) == 0
&& mod_info->modapi != MODULE_API_ROUTER)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Module '%s' does not implement "
"the router API.\n",
module)));
fatal = 1;
}
if (strcmp(type, MODULE_MONITOR) == 0
&& mod_info->modapi != MODULE_API_MONITOR)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Module '%s' does not implement "
"the monitor API.\n",
module)));
fatal = 1;
}
if (strcmp(type, MODULE_FILTER) == 0
&& mod_info->modapi != MODULE_API_FILTER)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Module '%s' does not implement "
"the filter API.\n",
module)));
fatal = 1;
}
if (fatal)
{
dlclose(dlhandle);
return NULL;
}
}
if ((sym = dlsym(dlhandle, "GetModuleObject")) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
@ -161,7 +216,7 @@ MODULES *mod;
module,
version,
fname)));
register_module(module, type, dlhandle, version, modobj);
register_module(module, type, dlhandle, version, modobj, mod_info);
}
else
{
@ -227,7 +282,7 @@ MODULES *mod = registered;
* @param modobj The module object
*/
static void
register_module(const char *module, const char *type, void *dlhandle, char *version, void *modobj)
register_module(const char *module, const char *type, void *dlhandle, char *version, void *modobj, MODULE_INFO *mod_info)
{
MODULES *mod;
@ -239,6 +294,7 @@ MODULES *mod;
mod->version = strdup(version);
mod->modobj = modobj;
mod->next = registered;
mod->info = mod_info;
registered = mod;
}
@ -303,11 +359,25 @@ dprintAllModules(DCB *dcb)
{
MODULES *ptr = registered;
dcb_printf(dcb, "%-15s | %-11s | Version\n", "Module Name", "Module Type");
dcb_printf(dcb, "-----------------------------------------------------\n");
dcb_printf(dcb, "%-15s | %-11s | Version | API | Status\n", "Module Name", "Module Type");
dcb_printf(dcb, "--------------------------------------------------------------------------\n");
while (ptr)
{
dcb_printf(dcb, "%-15s | %-11s | %s\n", ptr->module, ptr->type, ptr->version);
dcb_printf(dcb, "%-15s | %-11s | %-7s ", ptr->module, ptr->type, ptr->version);
if (ptr->info)
dcb_printf(dcb, "| %d.%d.%d | %s",
ptr->info->api_version.major,
ptr->info->api_version.minor,
ptr->info->api_version.patch,
ptr->info->status == MODULE_IN_DEVELOPMENT
? "In Development"
: (ptr->info->status == MODULE_ALPHA_RELEASE
? "Alpha"
: (ptr->info->status == MODULE_BETA_RELEASE
? "Beta"
: (ptr->info->status == MODULE_GA
? "GA" : "Unknown"))));
dcb_printf(dcb, "\n");
ptr = ptr->next;
}
}

123
server/core/modutil.c Normal file
View File

@ -0,0 +1,123 @@
/*
* This file is distributed as part of MaxScale from SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file modutil.c - Implementation of useful routines for modules
*
* @verbatim
* Revision History
*
* Date Who Description
* 04/06/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <buffer.h>
#include <string.h>
/**
* Check if a GWBUF structure is a MySQL COM_QUERY packet
*
* @param buf Buffer to check
* @return True if GWBUF is a COM_QUERY packet
*/
int
modutil_is_SQL(GWBUF *buf)
{
unsigned char *ptr;
if (GWBUF_LENGTH(buf) < 5)
return 0;
ptr = GWBUF_DATA(buf);
return ptr[4] == 0x03; // COM_QUERY
}
/**
* Extract the SQL portion of a COM_QUERY packet
*
* NB This sets *sql to point into the packet and does not
* allocate any new storage. The string pointed to by *sql is
* not NULL terminated.
*
* This routine is very simplistic and does not deal with SQL text
* that spans multiple buffers.
*
* @param buf The packet buffer
* @param sql Pointer that is set to point at the SQL data
* @param length Length of the SQL data
* @return True if the packet is a COM_QUERY packet
*/
int
modutil_extract_SQL(GWBUF *buf, char **sql, int *length)
{
char *ptr;
if (!modutil_is_SQL(buf))
return 0;
ptr = GWBUF_DATA(buf);
*length = *ptr++;
*length += (*ptr++ << 8);
*length += (*ptr++ << 8);
ptr += 2; // Skip sequence id and COM_QUERY byte
*length = *length - 1;
*sql = ptr;
return 1;
}
GWBUF *
modutil_replace_SQL(GWBUF *orig, char *sql)
{
char *ptr;
int length, newlength;
GWBUF *addition;
if (!modutil_is_SQL(orig))
return NULL;
ptr = GWBUF_DATA(orig);
length = *ptr++;
length += (*ptr++ << 8);
length += (*ptr++ << 8);
ptr += 2; // Skip sequence id and COM_QUERY byte
newlength = strlen(sql);
if (length - 1 == newlength)
{
/* New SQL is the same length as old */
memcpy(ptr, sql, newlength);
}
else if (length - 1 > newlength)
{
/* New SQL is shorter */
memcpy(ptr, sql, newlength);
GWBUF_RTRIM(orig, (length - 1) - newlength);
}
else
{
memcpy(ptr, sql, length - 1);
addition = gwbuf_alloc(newlength - (length - 1));
memcpy(GWBUF_DATA(addition), &sql[length - 1], newlength - (length - 1));
ptr = GWBUF_DATA(orig);
*ptr++ = (newlength + 1) & 0xff;
*ptr++ = ((newlength + 1) >> 8) & 0xff;
*ptr++ = ((newlength + 1) >> 16) & 0xff;
orig->next = addition;
}
return orig;
}

View File

@ -22,8 +22,10 @@
* @verbatim
* Revision History
*
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 23/05/14 Massimiliano Pinto Addition of monitor_interval parameter
* and monitor id
*
* @endverbatim
*/
@ -220,3 +222,47 @@ MONITOR *ptr;
spinlock_release(&monLock);
return ptr;
}
/**
* Set the id of the monitor.
*
* @param mon The monitor instance
* @param id The id for the monitor
*/
void
monitorSetId(MONITOR *mon, unsigned long id)
{
if (mon->module->defaultId != NULL) {
mon->module->defaultId(mon->handle, id);
}
}
/**
* Set the monitor sampling interval.
*
* @param mon The monitor instance
* @param interval The sampling interval in milliseconds
*/
void
monitorSetInterval (MONITOR *mon, unsigned long interval)
{
if (mon->module->setInterval != NULL) {
mon->module->setInterval(mon->handle, interval);
}
}
/**
* Enable Replication Heartbeat support in monitor.
*
* @param mon The monitor instance
* @param interval The sampling interval in milliseconds
*/
void
monitorSetReplicationHeartbeat(MONITOR *mon, int replication_heartbeat)
{
if (mon->module->replicationHeartbeat != NULL) {
mon->module->replicationHeartbeat(mon->handle, replication_heartbeat);
}
}

View File

@ -27,6 +27,7 @@
* 17/05/14 Mark Riddoch Addition of unique_name
* 20/05/14 Massimiliano Pinto Addition of server_string
* 21/05/14 Massimiliano Pinto Addition of node_id
* 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields
*
* @endverbatim
*/
@ -73,6 +74,8 @@ SERVER *server;
server->unique_name = NULL;
server->server_string = NULL;
server->node_id = -1;
server->rlag = -1;
server->node_ts = 0;
spinlock_acquire(&server_spin);
server->next = allServers;
@ -247,6 +250,14 @@ char *stat;
if (ptr->server_string)
dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string);
dcb_printf(dcb, "\tNode Id: %d\n", ptr->node_id);
if (SERVER_IS_SLAVE(ptr)) {
if (ptr->rlag >= 0) {
dcb_printf(dcb, "\tSlave delay:\t\t%d\n", ptr->rlag);
}
}
if (ptr->node_ts > 0) {
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", ptr->node_ts);
}
dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current);
ptr = ptr->next;
@ -275,6 +286,14 @@ char *stat;
if (server->server_string)
dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string);
dcb_printf(dcb, "\tNode Id: %d\n", server->node_id);
if (SERVER_IS_SLAVE(server)) {
if (server->rlag >= 0) {
dcb_printf(dcb, "\tSlave delay:\t\t%d\n", server->rlag);
}
}
if (server->node_ts > 0) {
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", server->node_ts);
}
dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current);
}
@ -325,6 +344,8 @@ char *status = NULL;
if ((status = (char *)malloc(200)) == NULL)
return NULL;
status[0] = 0;
if (server->status & SERVER_MAINT)
strcat(status, "Maintenance, ");
if (server->status & SERVER_MASTER)
strcat(status, "Master, ");
if (server->status & SERVER_SLAVE)

View File

@ -30,6 +30,7 @@
* 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services)
* 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL
* 23/05/14 Mark Riddoch Addition of service validation call
* 29/05/14 Mark Riddoch Filter API implementation
*
* @endverbatim
*/
@ -46,6 +47,7 @@
#include <modules.h>
#include <dcb.h>
#include <users.h>
#include <filter.h>
#include <dbusers.h>
#include <poll.h>
#include <skygw_utils.h>
@ -110,6 +112,8 @@ SERVICE *service;
service->databases = NULL;
service->svc_config_param = NULL;
service->svc_config_version = 0;
service->filters = NULL;
service->n_filters = 0;
spinlock_init(&service->spin);
spinlock_init(&service->users_table_spin);
memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE));
@ -538,10 +542,13 @@ serviceClearRouterOptions(SERVICE *service)
int i;
spinlock_acquire(&service->spin);
for (i = 0; service->routerOptions[i]; i++)
free(service->routerOptions[i]);
free(service->routerOptions);
service->routerOptions = NULL;
if (service->routerOptions != NULL)
{
for (i = 0; service->routerOptions[i]; i++)
free(service->routerOptions[i]);
free(service->routerOptions);
service->routerOptions = NULL;
}
spinlock_release(&service->spin);
}
/**
@ -608,6 +615,63 @@ serviceEnableRootUser(SERVICE *service, int action)
return 1;
}
/**
* Trim whitespace from the from an rear of a string
*
* @param str String to trim
* @return Trimmed string, chanesg are done in situ
*/
static char *
trim(char *str)
{
char *ptr;
while (isspace(*str))
str++;
/* Point to last character of the string */
ptr = str + strlen(str) - 1;
while (ptr > str && isspace(*ptr))
*ptr-- = 0;
return str;
}
/**
* Set the filters used by the service
*
* @param service The service itself
* @param filters ASCII string of filters to use
*/
void
serviceSetFilters(SERVICE *service, char *filters)
{
FILTER_DEF **flist;
char *ptr, *brkt;
int n = 0;
flist = (FILTER_DEF *)malloc(sizeof(FILTER_DEF *));
ptr = strtok_r(filters, "|", &brkt);
while (ptr)
{
n++;
flist = (FILTER_DEF *)realloc(flist, (n + 1) * sizeof(FILTER_DEF *));
if ((flist[n-1] = filter_find(trim(ptr))) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Unable to find filter '%s' for service '%s'\n",
trim(ptr), service->name
)));
}
flist[n] = NULL;
ptr = strtok_r(NULL, "|", &brkt);
}
service->filters = flist;
service->n_filters = n;
}
/**
* Return a named service
*
@ -638,6 +702,7 @@ void
printService(SERVICE *service)
{
SERVER *ptr = service->databases;
int i;
printf("Service %p\n", service);
printf("\tService: %s\n", service->name);
@ -649,6 +714,16 @@ SERVER *ptr = service->databases;
printf("\t\t%s:%d Protocol: %s\n", ptr->name, ptr->port, ptr->protocol);
ptr = ptr->nextdb;
}
if (service->n_filters)
{
printf("\tFilter chain: ");
for (i = 0; i < service->n_filters; i++)
{
printf("%s %s ", service->filters[i]->name,
i + 1 < service->n_filters ? "|" : "");
}
printf("\n");
}
printf("\tUsers data: %p\n", service->users);
printf("\tTotal connections: %d\n", service->stats.n_sessions);
printf("\tCurrently connected: %d\n", service->stats.n_current);
@ -705,6 +780,7 @@ SERVICE *ptr;
void dprintService(DCB *dcb, SERVICE *service)
{
SERVER *server = service->databases;
int i;
dcb_printf(dcb, "Service %p\n", service);
dcb_printf(dcb, "\tService: %s\n", service->name);
@ -714,6 +790,16 @@ SERVER *server = service->databases;
service->router->diagnostics(service->router_instance, dcb);
dcb_printf(dcb, "\tStarted: %s",
asctime(localtime(&service->stats.started)));
if (service->n_filters)
{
dcb_printf(dcb, "\tFilter chain: ");
for (i = 0; i < service->n_filters; i++)
{
dcb_printf(dcb, "%s %s ", service->filters[i]->name,
i + 1 < service->n_filters ? "|" : "");
}
dcb_printf(dcb, "\n");
}
dcb_printf(dcb, "\tBackend databases\n");
while (server)
{
@ -996,4 +1082,4 @@ char* service_get_name(
SERVICE* svc)
{
return svc->name;
}
}

View File

@ -25,6 +25,7 @@
* Date Who Description
* 17/06/13 Mark Riddoch Initial implementation
* 02/09/13 Massimiliano Pinto Added session refcounter
* 29/05/14 Mark Riddoch Addition of filter mechanism
*
* @endverbatim
*/
@ -47,6 +48,9 @@ extern int lm_enabled_logfiles_bitmask;
static SPINLOCK session_spin = SPINLOCK_INIT;
static SESSION *allSessions = NULL;
static int session_setup_filters(SESSION *session);
/**
* Allocate a new session for a new client of the specified service.
*
@ -90,6 +94,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
spinlock_acquire(&session->ses_lock);
session->service = service;
session->client = client_dcb;
session->n_filters = 0;
memset(&session->stats, 0, sizeof(SESSION_STATS));
session->stats.connect = time(0);
session->state = SESSION_STATE_ALLOC;
@ -132,7 +137,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
/**
* Inform other threads that session is closing.
*/
session->state == SESSION_STATE_STOPPING;
session->state = SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
@ -147,7 +152,45 @@ session_alloc(SERVICE *service, DCB *client_dcb)
goto return_session;
}
/*
* Pending filter chain being setup set the head of the chain to
* be the router. As filters are inserted the current head will
* be pushed to the filter and the head updated.
*
* NB This dictates that filters are created starting at the end
* of the chain nearest the router working back to the client
* protocol end of the chain.
*/
session->head.instance = service->router_instance;
session->head.session = session->router_session;
session->head.routeQuery = service->router->routeQuery;
if (service->n_filters > 0)
{
if (!session_setup_filters(session))
{
/**
* Inform other threads that session is closing.
*/
session->state = SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
*/
session_free(session);
client_dcb->session = NULL;
session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to create %s session.",
service->name)));
goto return_session;
}
}
}
spinlock_acquire(&session_spin);
session->state = SESSION_STATE_ROUTER_READY;
session->next = allSessions;
@ -227,6 +270,7 @@ bool session_free(
bool succp = false;
SESSION *ptr;
int nlink;
int i;
CHK_SESSION(session);
@ -262,10 +306,23 @@ bool session_free(
/* Free router_session and session */
if (session->router_session) {
session->service->router->closeSession(
session->service->router_instance,
session->router_session);
session->service->router->freeSession(
session->service->router_instance,
session->router_session);
}
if (session->n_filters)
{
for (i = 0; i < session->n_filters; i++)
{
session->filters[i].filter->obj->freeSession(
session->filters[i].instance,
session->filters[i].session);
}
free(session->filters);
}
free(session);
succp = true;
@ -440,6 +497,8 @@ SESSION *ptr;
void
dprintSession(DCB *dcb, SESSION *ptr)
{
int i;
dcb_printf(dcb, "Session %p\n", ptr);
dcb_printf(dcb, "\tState: %s\n", session_state(ptr->state));
dcb_printf(dcb, "\tService: %s (%p)\n", ptr->service->name, ptr->service);
@ -447,6 +506,18 @@ dprintSession(DCB *dcb, SESSION *ptr)
if (ptr->client && ptr->client->remote)
dcb_printf(dcb, "\tClient Address: %s\n", ptr->client->remote);
dcb_printf(dcb, "\tConnected: %s", asctime(localtime(&ptr->stats.connect)));
if (ptr->n_filters)
{
for (i = 0; i < ptr->n_filters; i++)
{
dcb_printf(dcb, "\tFilter: %s\n",
ptr->filters[i].filter->name);
ptr->filters[i].filter->obj->diagnostics(
ptr->filters[i].instance,
ptr->filters[i].session,
dcb);
}
}
}
/**
@ -520,3 +591,54 @@ SESSION* get_session_by_router_ses(
}
return ses;
}
/**
* Create the filter chain for this session.
*
* Filters must be setup in reverse order, starting with the last
* filter in the chain and working back towards the client connection
* Each filter is passed the current session head of the filter chain
* this head becomes the destination for the filter. The newly created
* filter becomes the new head of the filter chain.
*
* @param session The session that requires the chain
* @return 0 if filter creation fails
*/
static int
session_setup_filters(SESSION *session)
{
SERVICE *service = session->service;
DOWNSTREAM *head;
int i;
if ((session->filters = calloc(service->n_filters,
sizeof(SESSION_FILTER))) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Insufficient memory to allocate session filter "
"tracking.\n")));
return 0;
}
session->n_filters = service->n_filters;
for (i = service->n_filters - 1; i >= 0; i--)
{
if ((head = filterApply(service->filters[i], session,
&session->head)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Failed to create filter '%s' for service '%s'.\n",
service->filters[i]->name,
service->name)));
return 0;
}
session->filters[i].filter = service->filters[i];
session->filters[i].session = head->session;
session->filters[i].instance = head->instance;
session->head = *head;
}
return 1;
}

View File

@ -93,6 +93,8 @@ typedef struct gwbuf {
/*< Consume a number of bytes in the buffer */
#define GWBUF_CONSUME(b, bytes) (b)->start += bytes
#define GWBUF_RTRIM(b, bytes) (b)->end -= bytes
#define GWBUF_TYPE(b) (b)->gwbuf_type
/*<
* Function prototypes for the API to maniplate the buffers

View File

@ -28,6 +28,7 @@
* Date Who Description
* 21/06/13 Mark Riddoch Initial implementation
* 07/05/14 Massimiliano Pinto Added version_string to global configuration
* 23/05/14 Massimiliano Pinto Added id to global configuration
*
* @endverbatim
*/
@ -78,6 +79,7 @@ typedef struct config_context {
typedef struct {
int n_threads; /**< Number of polling threads */
char *version_string; /**< The version string of embedded database library */
unsigned long id; /**< MaxScale ID */
} GATEWAY_CONF;
extern int config_load(char *);

View File

@ -19,6 +19,7 @@
*/
#include <spinlock.h>
#include <buffer.h>
#include <modinfo.h>
#include <gwbitmask.h>
#include <skygw_utils.h>
#include <netinet/in.h>
@ -95,6 +96,13 @@ typedef struct gw_protocol {
int (*session)(struct dcb *, void *);
} GWPROTOCOL;
/**
* The GWPROTOCOL version data. The following should be updated whenever
* the GWPROTOCOL structure is changed. See the rules defined in modinfo.h
* that define how these numbers should change.
*/
#define GWPROTOCOL_VERSION {1, 0, 0}
/**
* The statitics gathered on a descriptor control block
*/

114
server/include/filter.h Normal file
View File

@ -0,0 +1,114 @@
#ifndef _FILTER_H
#define _FILTER_H
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file filter.h - The filter interface mechanisms
*
* Revision History
*
* Date Who Description
* 27/05/2014 Mark Riddoch Initial implementation
*
*/
#include <dcb.h>
#include <session.h>
#include <buffer.h>
#include <stdint.h>
/**
* The FILTER handle points to module specific data, so the best we can do
* is to make it a void * externally.
*/
typedef void *FILTER;
/**
* The structure used to pass name, value pairs to the filter instances
*/
typedef struct {
char *name; /**< Name of the parameter */
char *value; /**< Value of the parameter */
} FILTER_PARAMETER;
/**
* @verbatim
* The "module object" structure for a query router module
*
* The entry points are:
* createInstance Called by the service to create a new
* instance of the filter
* newSession Called to create a new user session
* within the filter
* closeSession Called when a session is closed
* freeSession Called when a session is freed
* setDownstream Sets the downstream component of the
* filter pipline
* routeQuery Called on each query that requires
* routing
* diagnostics Called to force the filter to print
* diagnostic output
*
* @endverbatim
*
* @see load_module
*/
typedef struct filter_object {
FILTER *(*createInstance)(char **options, FILTER_PARAMETER **);
void *(*newSession)(FILTER *instance, SESSION *session);
void (*closeSession)(FILTER *instance, void *fsession);
void (*freeSession)(FILTER *instance, void *fsession);
void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue);
void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb);
} FILTER_OBJECT;
/**
* The filter API version. If the FILTER_OBJECT structure or the filter API
* is changed these values must be updated in line with the rules in the
* file modinfo.h.
*/
#define FILTER_VERSION {1, 0, 0}
/**
* The definition of a filter form the configuration file.
* This is basically the link between a plugin to load and the
* optons to pass to that plugin.
*/
typedef struct filter_def {
char *name; /*< The Filter name */
char *module; /*< The module to load */
char **options; /*< The options set for this filter */
FILTER_PARAMETER
**parameters; /*< The filter parameters */
FILTER filter;
FILTER_OBJECT *obj;
SPINLOCK spin;
struct filter_def
*next; /*< Next filter in the chain of all filters */
} FILTER_DEF;
FILTER_DEF *filter_alloc(char *, char *);
void filter_free(FILTER_DEF *);
FILTER_DEF *filter_find(char *);
void filterAddOption(FILTER_DEF *, char *);
void filterAddParameter(FILTER_DEF *, char *, char *);
DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *);
void dprintAllFilters(DCB *);
void dprintFilter(DCB *, FILTER_DEF *);
void dListFilters(DCB *);
#endif

85
server/include/modinfo.h Normal file
View File

@ -0,0 +1,85 @@
#ifndef _MODINFO_H
#define _MODINFO_H
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file modinfo.h The module information interface
*
* @verbatim
* Revision History
*
* Date Who Description
* 02/06/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
/**
* The status of the module. This gives some idea of the module
* maturity.
*/
typedef enum {
MODULE_IN_DEVELOPMENT = 0,
MODULE_ALPHA_RELEASE,
MODULE_BETA_RELEASE,
MODULE_GA
} MODULE_STATUS;
/**
* The API implemented by the module
*/
typedef enum {
MODULE_API_PROTOCOL = 0,
MODULE_API_ROUTER,
MODULE_API_MONITOR,
MODULE_API_FILTER,
MODULE_API_AUTHENTICATION
} MODULE_API;
/**
* The module version structure.
*
* The rules for changing these values are:
*
* Any change that affects an inexisting call in the API in question,
* making the new API no longer compatible with the old,
* must increment the major version.
*
* Any change that adds to the API, but does not alter the existing API
* calls, must increment the minor version.
*
* Any change that is purely cosmetic and does not affect the calling
* conventions of the API must increment only the patch version number.
*/
typedef struct {
int major;
int minor;
int patch;
} MODULE_VERSION;
/**
* The module information structure
*/
typedef struct {
MODULE_API modapi;
MODULE_STATUS status;
MODULE_VERSION api_version;
char *description;
} MODULE_INFO;
#endif

View File

@ -18,6 +18,7 @@
* Copyright SkySQL Ab 2013
*/
#include <dcb.h>
#include <modinfo.h>
/**
* @file modules.h Utilities for loading modules
@ -30,6 +31,7 @@
* Date Who Description
* 13/06/13 Mark Riddoch Initial implementation
* 08/07/13 Mark Riddoch Addition of monitor modules
* 29/05/14 Mark Riddoch Addition of filter modules
* @endverbatim
*/
@ -39,6 +41,8 @@ typedef struct modules {
char *version; /**< Module version */
void *handle; /**< The handle returned by dlopen */
void *modobj; /**< The module "object" this is the set of entry points */
MODULE_INFO
*info; /**< The module information */
struct modules
*next; /**< Next module in the linked list */
} MODULES;
@ -49,6 +53,7 @@ typedef struct modules {
#define MODULE_PROTOCOL "Protocol" /**< A protocol module type */
#define MODULE_ROUTER "Router" /**< A router module type */
#define MODULE_MONITOR "Monitor" /**< A database monitor module type */
#define MODULE_FILTER "Filter" /**< A filter module type */
extern void *load_module(const char *module, const char *type);

37
server/include/modutil.h Normal file
View File

@ -0,0 +1,37 @@
#ifndef _MODUTIL_H
#define _MODUTIL_H
/*
* This file is distributed as part of MaxScale from SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file modutil.h A set of useful routines for module writers
*
* @verbatim
* Revision History
*
* Date Who Description
* 04/06/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <buffer.h>
extern int modutil_is_SQL(GWBUF *);
extern int modutil_extract_SQL(GWBUF *, char **, int *);
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
#endif

View File

@ -26,10 +26,11 @@
* @verbatim
* Revision History
*
* Date Who Description
* 07/07/13 Mark Riddoch Initial implementation
* 25/07/13 Mark Riddoch Addition of diagnotics
* 23/05/14 Mark Riddoch Addition of routine to find monitors by name
* Date Who Description
* 07/07/13 Mark Riddoch Initial implementation
* 25/07/13 Mark Riddoch Addition of diagnotics
* 23/05/14 Mark Riddoch Addition of routine to find monitors by name
* 23/05/14 Massimiliano Pinto Addition of defaultId and setInterval
*
* @endverbatim
*/
@ -66,8 +67,17 @@ typedef struct {
void (*unregisterServer)(void *, SERVER *);
void (*defaultUser)(void *, char *, char *);
void (*diagnostics)(DCB *, void *);
void (*setInterval)(void *, unsigned long);
void (*defaultId)(void *, unsigned long);
void (*replicationHeartbeat)(void *, int);
} MONITOR_OBJECT;
/**
* The monitor API version number. Any change to the monitor module API
* must change these versions usign the rules defined in modinfo.h
*/
#define MONITOR_VERSION {1, 0, 0}
/**
* Representation of the running monitor.
*/
@ -87,4 +97,7 @@ extern void monitorStop(MONITOR *);
extern void monitorStart(MONITOR *);
extern void monitorStopAll();
extern void monitorShowAll(DCB *);
extern void monitorSetId(MONITOR *, unsigned long);
extern void monitorSetInterval (MONITOR *, unsigned long);
extern void monitorSetReplicationHeartbeat(MONITOR *, int);
#endif

View File

@ -84,6 +84,13 @@ typedef struct router_object {
uint8_t (*getCapabilities)(ROUTER *instance, void* router_session);
} ROUTER_OBJECT;
/**
* The router module API version. Any change that changes the router API
* must update these versions numbers in accordance with the rules in
* modinfo.h.
*/
#define ROUTER_VERSION { 1, 0, 0 }
typedef enum router_capability_t {
RCAP_TYPE_UNDEFINED = 0,
RCAP_TYPE_STMT_INPUT = (1 << 0),

View File

@ -34,6 +34,8 @@
* 18/05/14 Mark Riddoch Addition of unique_name field
* 20/05/14 Massimiliano Pinto Addition of server_string field
* 20/05/14 Massimiliano Pinto Addition of node_id field
* 23/05/14 Massimiliano Pinto Addition of rlag and node_ts fields
* 03/06/14 Mark Riddoch Addition of maintainance mode
*
* @endverbatim
*/
@ -66,6 +68,8 @@ typedef struct server {
struct server *nextdb; /**< Next server in list attached to a service */
char *server_string; /**< Server version string, i.e. MySQL server version */
long node_id; /**< Node id, server_id for M/S or local_index for Galera */
int rlag; /**< Replication Lag for Master / Slave replication */
unsigned long node_ts; /**< Last timestamp set from M/S monitor module */
} SERVER;
/**
@ -77,14 +81,13 @@ typedef struct server {
#define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */
#define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. can handle reads */
#define SERVER_JOINED 0x0008 /**<< The server is joined in a Galera cluster */
#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */
#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */
/**
* Is the server running - the macro returns true if the server is marked as running
* regardless of it's state as a master or slave
*/
#define SERVER_IS_RUNNING(server) ((server)->status & SERVER_RUNNING)
#define SERVER_IS_RUNNING(server) (((server)->status & (SERVER_RUNNING|SERVER_MAINT)) == SERVER_RUNNING)
/**
* Is the server marked as down - the macro returns true if the server is beleived
* to be inoperable.
@ -95,26 +98,25 @@ typedef struct server {
* in order for the macro to return true
*/
#define SERVER_IS_MASTER(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_MASTER))
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_MASTER))
/**
* Is the server a slave? The server must be both running and marked as a slave
* in order for the macro to return true
*/
#define SERVER_IS_SLAVE(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_SLAVE))
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_SLAVE))
/**
* Is the server joined Galera node? The server must be running and joined.
*/
#define SERVER_IS_JOINED(server) \
(((server)->status & (SERVER_RUNNING|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED))
(((server)->status & (SERVER_RUNNING|SERVER_JOINED|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_JOINED))
/**
* Is the server in maintenance mode.
*/
#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT)
* Is the server in maintenance mode.
*/
#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT)
extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *);
extern SERVER *server_find_by_unique_name(char *);
@ -129,6 +131,5 @@ extern void server_set_status(SERVER *, int);
extern void server_clear_status(SERVER *, int);
extern void serverAddMonUser(SERVER *, char *, char *);
extern void server_update(SERVER *, char *, char *, char *);
void server_set_unique_name(SERVER *server, char *name);
extern void server_set_unique_name(SERVER *, char *);
#endif

View File

@ -22,6 +22,7 @@
#include <spinlock.h>
#include <dcb.h>
#include <server.h>
#include <filter.h>
#include "config.h"
/**
@ -39,7 +40,9 @@
* 23/06/13 Mark Riddoch Added service user and users
* 06/02/14 Massimiliano Pinto Added service flag for root user access
* 25/02/14 Massimiliano Pinto Added service refresh limit feature
* 07/05/14 Massimiliano Pinto Added version_string field to service struct
* 07/05/14 Massimiliano Pinto Added version_string field to service
* struct
* 29/05/14 Mark Riddoch Filter API mechanism
*
* @endverbatim
*/
@ -73,9 +76,10 @@ typedef struct {
} SERVICE_STATS;
/**
* The service user structure holds the information that is needed for this service to
* allow the gateway to login to the backend database and extact information such as
* the user table or other database status or configuration data.
* The service user structure holds the information that is needed
for this service to allow the gateway to login to the backend
database and extact information such as the user table or other
database status or configuration data.
*/
typedef struct {
char *name; /**< The user name to use to extract information */
@ -83,8 +87,8 @@ typedef struct {
} SERVICE_USER;
/**
* The service refresh rate hols the counter and last load timet for this service to
* load users data from the backend database
* The service refresh rate holds the counter and last load time_t
for this service to load users data from the backend database
*/
typedef struct {
int nloads;
@ -99,30 +103,33 @@ typedef struct {
* to the service.
*/
typedef struct service {
char *name; /**< The service name */
int state; /**< The service state */
SERV_PROTOCOL *ports; /**< Linked list of ports and protocols
* that this service will listen on.
*/
char *routerModule; /**< Name of router module to use */
char **routerOptions; /**< Router specific option strings */
char *name; /**< The service name */
int state; /**< The service state */
SERV_PROTOCOL *ports; /**< Linked list of ports and protocols
* that this service will listen on.
*/
char *routerModule; /**< Name of router module to use */
char **routerOptions;/**< Router specific option strings */
struct router_object
*router; /**< The router we are using */
*router; /**< The router we are using */
void *router_instance;
/**< The router instance for this service */
char *version_string; /** version string for this service listeners */
struct server *databases; /**< The set of servers in the backend */
SERVICE_USER credentials; /**< The cedentials of the service user */
SPINLOCK spin; /**< The service spinlock */
SERVICE_STATS stats; /**< The service statistics */
struct users *users; /**< The user data for this service */
int enable_root; /**< Allow root user access */
CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */
int svc_config_version; /*< Version number of configuration */
/**< The router instance for this service */
char *version_string;/** version string for this service listeners */
struct server *databases; /**< The set of servers in the backend */
SERVICE_USER credentials; /**< The cedentials of the service user */
SPINLOCK spin; /**< The service spinlock */
SERVICE_STATS stats; /**< The service statistics */
struct users *users; /**< The user data for this service */
int enable_root; /**< Allow root user access */
CONFIG_PARAMETER*
svc_config_param; /*< list of config params and values */
int svc_config_version; /*< Version number of configuration */
SPINLOCK
users_table_spin; /**< The spinlock for users data refresh */
SERVICE_REFRESH_RATE
rate_limit; /**< The refresh rate limit for users table */
FILTER_DEF **filters; /**< Ordered list of filters */
int n_filters; /**< Number of filters */
struct service *next; /**< The next service in the linked list */
} SERVICE;

View File

@ -31,16 +31,20 @@
* 01-07-2013 Massimiliano Pinto Removed backends pointer
* from struct session
* 02-09-2013 Massimiliano Pinto Added session ref counter
* 29-05-2014 Mark Riddoch Support for filter mechanism
* added
*
* @endverbatim
*/
#include <time.h>
#include <atomic.h>
#include <buffer.h>
#include <spinlock.h>
#include <skygw_utils.h>
struct dcb;
struct service;
struct filter_def;
/**
* The session statistics structure
@ -59,6 +63,39 @@ typedef enum {
SESSION_STATE_FREE /*< for all sessions */
} session_state_t;
/**
* The downstream element in the filter chain. This may refer to
* another filter or to a router.
*/
typedef struct {
void *instance;
void *session;
int (*routeQuery)(void *instance,
void *router_session, GWBUF *queue);
} DOWNSTREAM;
/**
* The upstream element in the filter chain. This may refer to
* another filter or to the protocol implementation.
*/
typedef struct {
void *instance;
void *session;
int (*write)(void *, void *, GWBUF *);
int (*error)(void *);
} UPSTREAM;
/**
* Structure used to track the filter instances and sessions of the filters
* that are in use within a session.
*/
typedef struct {
struct filter_def
*filter;
void *instance;
void *session;
} SESSION_FILTER;
/**
* The session status block
*
@ -77,6 +114,9 @@ typedef struct session {
void *router_session;/**< The router instance data */
SESSION_STATS stats; /**< Session statistics */
struct service *service; /**< The service this session is using */
int n_filters; /**< Number of filter sessions */
SESSION_FILTER *filters; /**< The filters in use within this session */
DOWNSTREAM head; /**< Head of the filter chain */
struct session *next; /**< Linked list of all sessions */
int refcount; /**< Reference count on the session */
#if defined(SS_DEBUG)
@ -86,6 +126,15 @@ typedef struct session {
#define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type)
/**
* A convenience macro that can be used by the protocol modules to route
* the incoming data to the first element in the pipeline of filters and
* routers.
*/
#define SESSION_ROUTE_QUERY(session, buf) \
((session)->head.routeQuery)((session)->head.instance, \
(session)->head.session, (buf))
SESSION *session_alloc(struct service *, struct dcb *);
bool session_free(SESSION *);
int session_isvalid(SESSION *);

View File

@ -0,0 +1,86 @@
# This file is distributed as part of MaxScale form SkySQL. It is free
# software: you can redistribute it and/or modify it under the terms of the
# GNU General Public License as published by the Free Software Foundation,
# version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright SkySQL Ab 2014
#
# Revision History
# Date Who Description
# 29/05/14 Mark Riddoch Initial module development
include ../../../build_gateway.inc
LOGPATH := $(ROOT_PATH)/log_manager
UTILSPATH := $(ROOT_PATH)/utils
CC=cc
CFLAGS=-c -fPIC -I/usr/include -I../include -I../../include -I$(LOGPATH) \
-I$(UTILSPATH) -Wall -g
include ../../../makefile.inc
LDFLAGS=-shared -L$(LOGPATH) -Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH)
TESTSRCS=testfilter.c
TESTOBJ=$(TESTSRCS:.c=.o)
QLASRCS=qlafilter.c
QLAOBJ=$(QLASRCS:.c=.o)
REGEXSRCS=regexfilter.c
REGEXOBJ=$(REGEXSRCS:.c=.o)
SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS)
OBJ=$(SRCS:.c=.o)
LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager
MODULES= libtestfilter.so libqlafilter.so libregexfilter.so
all: $(MODULES)
libtestfilter.so: $(TESTOBJ)
$(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@
libqlafilter.so: $(QLAOBJ)
$(CC) $(LDFLAGS) $(QLAOBJ) $(LIBS) -o $@
libregexfilter.so: $(REGEXOBJ)
$(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@
.c.o:
$(CC) $(CFLAGS) $< -o $@
clean:
rm -f $(OBJ) $(MODULES)
tags:
ctags $(SRCS) $(HDRS)
depend:
@rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: $(MODULES)
install -D $(MODULES) $(DEST)/MaxScale/modules
cleantests:
$(MAKE) -C test cleantests
buildtests:
$(MAKE) -C test DEBUG=Y buildtests
runtests:
$(MAKE) -C test runtests
testall:
$(MAKE) -C test testall
include depend.mk

View File

@ -0,0 +1,295 @@
/*
* This file is distributed as part of MaxScale by SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* QLA Filter - Query Log All. A primitive query logging filter, simply
* used to verify the filter mechanism for downstream filters. All queries
* that are passed through the filter will be written to file.
*
* The filter makes no attempt to deal with query packets that do not fit
* in a single GWBUF.
*
* A single option may be passed to the filter, this is the name of the
* file to which the queries are logged. A serial number is appended to this
* name in order that each session logs to a different file.
*/
#include <stdio.h>
#include <fcntl.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_ALPHA_RELEASE,
FILTER_VERSION,
"A simple query logging filter"
};
static char *version_str = "V1.0.0";
/*
* The filter entry points
*/
static FILTER *createInstance(char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
routeQuery,
diagnostic,
};
/**
* A instance structure, the assumption is that the option passed
* to the filter is simply a base for the filename to which the queries
* are logged.
*
* To this base a session number is attached such that each session will
* have a nique name.
*/
typedef struct {
int sessions;
char *filebase;
} QLA_INSTANCE;
/**
* The session structure for this QLA filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
typedef struct {
DOWNSTREAM down;
char *filename;
int fd;
} QLA_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
QLA_INSTANCE *my_instance;
if ((my_instance = calloc(1, sizeof(QLA_INSTANCE))) != NULL)
{
if (options)
my_instance->filebase = strdup(options[0]);
else
my_instance->filebase = strdup("qla");
my_instance->sessions = 0;
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* Create the file to log to and open it.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance;
QLA_SESSION *my_session;
if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL)
{
if ((my_session->filename =
(char *)malloc(strlen(my_instance->filebase) + 20))
== NULL)
{
free(my_session);
return NULL;
}
sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions);
my_instance->sessions++;
my_session->fd = open(my_session->filename,
O_WRONLY|O_CREAT|O_TRUNC, 0666);
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
* In the case of the QLA filter we simple close the file descriptor.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
QLA_SESSION *my_session = (QLA_SESSION *)session;
close(my_session->fd);
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
*/
static void
freeSession(FILTER *instance, void *session)
{
QLA_SESSION *my_session = (QLA_SESSION *)session;
free(my_session->filename);
free(session);
return;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
QLA_SESSION *my_session = (QLA_SESSION *)session;
my_session->down = *downstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
QLA_SESSION *my_session = (QLA_SESSION *)session;
char *ptr, t_buf[40];
int length;
struct tm t;
struct timeval tv;
if (modutil_extract_SQL(queue, &ptr, &length))
{
gettimeofday(&tv, NULL);
localtime_r(&tv.tv_sec, &t);
sprintf(t_buf, "%02d:%02d:%02d.%-3d %d/%02d/%d, ",
t.tm_hour, t.tm_min, t.tm_sec, (int)(tv.tv_usec / 1000),
t.tm_mday, t.tm_mon + 1, 1900 + t.tm_year);
write(my_session->fd, t_buf, strlen(t_buf));
write(my_session->fd, ptr, length);
write(my_session->fd, "\n", 1);
}
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
QLA_SESSION *my_session = (QLA_SESSION *)fsession;
if (my_session)
{
dcb_printf(dcb, "\t\tLogging to file %s.\n",
my_session->filename);
}
}

View File

@ -0,0 +1,355 @@
/*
* This file is distributed as part of MaxScale by SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
#include <stdio.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <string.h>
#include <regex.h>
/**
* regexfilter.c - a very simple regular expression rewrite filter.
*
* A simple regular expression query rewrite filter.
* Two parameters should be defined in the filter configuration
* match=<regular expression>
* replace=<replacement text>
*/
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_ALPHA_RELEASE,
FILTER_VERSION,
"A query rewrite filter that uses regular expressions to rewite queries"
};
static char *version_str = "V1.0.0";
static FILTER *createInstance(char **options, FILTER_PARAMETER **params);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static char *regex_replace(char *sql, int length, regex_t *re, char *replace);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
routeQuery,
diagnostic,
};
/**
* Instance structure
*/
typedef struct {
char *match; /* Regular expression to match */
char *replace; /* Replacement text */
regex_t re; /* Compiled regex text */
} REGEX_INSTANCE;
/**
* The session structure for this regex filter
*/
typedef struct {
DOWNSTREAM down;
int no_change;
int replacements;
} REGEX_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
REGEX_INSTANCE *my_instance;
int i;
if ((my_instance = calloc(1, sizeof(REGEX_INSTANCE))) != NULL)
{
my_instance->match = NULL;
my_instance->replace = NULL;
for (i = 0; params[i]; i++)
{
if (!strcmp(params[i]->name, "match"))
my_instance->match = strdup(params[i]->value);
if (!strcmp(params[i]->name, "replace"))
my_instance->replace = strdup(params[i]->value);
}
if (my_instance->match == NULL || my_instance->replace == NULL)
{
return NULL;
}
if (regcomp(&my_instance->re, my_instance->match, REG_ICASE))
{
free(my_instance->match);
free(my_instance->replace);
free(my_instance);
return NULL;
}
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
REGEX_SESSION *my_session;
if ((my_session = calloc(1, sizeof(REGEX_SESSION))) != NULL)
{
my_session->no_change = 0;
my_session->replacements = 0;
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
}
/**
* Free the memory associated with this filter session.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
freeSession(FILTER *instance, void *session)
{
free(session);
return;
}
/**
* Set the downstream component for this filter.
*
* @param instance The filter instance data
* @param session The session being closed
* @param downstream The downstream filter or router
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
REGEX_SESSION *my_session = (REGEX_SESSION *)session;
my_session->down = *downstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query shoudl normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance;
REGEX_SESSION *my_session = (REGEX_SESSION *)session;
char *sql, *newsql;
int length;
if (modutil_is_SQL(queue))
{
modutil_extract_SQL(queue, &sql, &length);
newsql = regex_replace(sql, length, &my_instance->re,
my_instance->replace);
if (newsql)
{
queue = modutil_replace_SQL(queue, newsql);
free(newsql);
my_session->replacements++;
}
else
my_session->no_change++;
}
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance;
REGEX_SESSION *my_session = (REGEX_SESSION *)fsession;
dcb_printf(dcb, "\t\tSearch and replace: s/%s/%s/\n",
my_instance->match, my_instance->replace);
if (my_session)
{
dcb_printf(dcb, "\t\tNo. of queries unaltered by filter: %d\n",
my_session->no_change);
dcb_printf(dcb, "\t\tNo. of queries altered by filter: %d\n",
my_session->replacements);
}
}
/**
* Perform a regular expression match and subsititution on the SQL
*
* @param sql The original SQL text
* @param length The length of the SQL text
* @param re The compiled regular expression
* @param replace The replacement text
* @return The replaced text or NULL if no replacement was done.
*/
static char *
regex_replace(char *sql, int length, regex_t *re, char *replace)
{
char *orig, *result, *ptr;
int i, res_size, res_length, rep_length;
int last_match;
regmatch_t match[10];
orig = strndup(sql, length);
if (regexec(re, orig, 10, match, 0))
{
free(orig);
return NULL;
}
res_size = 2 * length;
result = (char *)malloc(res_size);
res_length = 0;
rep_length = strlen(replace);
last_match = 0;
for (i = 0; i < 10; i++)
{
if (match[i].rm_so != -1)
{
ptr = &result[res_length];
if (last_match < match[i].rm_so)
{
int to_copy = match[i].rm_so - last_match;
if (last_match + to_copy > res_size)
{
res_size = last_match + to_copy + length;
result = (char *)realloc(result, res_size);
}
memcpy(ptr, &sql[last_match], to_copy);
res_length += to_copy;
}
last_match = match[i].rm_eo;
if (res_length + rep_length > res_size)
{
res_size += rep_length;
result = (char *)realloc(result, res_size);
}
ptr = &result[res_length];
memcpy(ptr, replace, rep_length);
res_length += rep_length;
}
}
if (last_match < length)
{
int to_copy = length - last_match;
if (last_match + to_copy > res_size)
{
res_size = last_match + to_copy + 1;
result = (char *)realloc(result, res_size);
}
ptr = &result[res_length];
memcpy(ptr, &sql[last_match], to_copy);
res_length += to_copy;
}
result[res_length] = 0;
return result;
}

View File

@ -0,0 +1,234 @@
/*
* This file is distributed as part of MaxScale by SkySQL. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
#include <stdio.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
/**
* testfilter.c - a very simple test filter.
*
* This filter is a very simple example used to test the filter API,
* it merely counts the number of statements that flow through the
* filter pipeline.
*
* Reporting is done via the diagnostics print routine.
*/
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_ALPHA_RELEASE,
FILTER_VERSION,
"A simple query counting filter"
};
static char *version_str = "V1.0.0";
static FILTER *createInstance(char **options, FILTER_PARAMETER **params);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
routeQuery,
diagnostic,
};
/**
* A dummy instance structure
*/
typedef struct {
int sessions;
} TEST_INSTANCE;
/**
* A dummy session structure for this test filter
*/
typedef struct {
DOWNSTREAM down;
int count;
} TEST_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(char **options, FILTER_PARAMETER **params)
{
TEST_INSTANCE *my_instance;
if ((my_instance = calloc(1, sizeof(TEST_INSTANCE))) != NULL)
my_instance->sessions = 0;
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance;
TEST_SESSION *my_session;
if ((my_session = calloc(1, sizeof(TEST_SESSION))) != NULL)
{
my_instance->sessions++;
my_session->count = 0;
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
}
/**
* Free the memory associated with this filter session.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
freeSession(FILTER *instance, void *session)
{
free(session);
return;
}
/**
* Set the downstream component for this filter.
*
* @param instance The filter instance data
* @param session The session being closed
* @param downstream The downstream filter or router
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
TEST_SESSION *my_session = (TEST_SESSION *)session;
my_session->down = *downstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query shoudl normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TEST_SESSION *my_session = (TEST_SESSION *)session;
if (modutil_is_SQL(queue))
my_session->count++;
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance;
TEST_SESSION *my_session = (TEST_SESSION *)fsession;
if (my_session)
dcb_printf(dcb, "\t\tNo. of queries routed by filter: %d\n",
my_session->count);
else
dcb_printf(dcb, "\t\tNo. of sessions created: %d\n",
my_instance->sessions);
}

View File

@ -236,7 +236,6 @@ typedef struct router_instance {
} ROUTER_INSTANCE;
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : \
(SERVER_IS_JOINED((b)->backend_server) ? BE_JOINED : BE_UNDEFINED)));
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
#endif /*< _RWSPLITROUTER_H */

View File

@ -26,6 +26,9 @@
* 22/07/13 Mark Riddoch Initial implementation
* 21/05/14 Massimiliano Pinto Monitor sets a master server
* that has the lowest value of wsrep_local_index
* 23/05/14 Massimiliano Pinto Added 1 configuration option (setInterval).
* Interval is printed in diagnostics.
* 03/06/14 Mark Riddoch Add support for maintenance mode
*
* @endverbatim
*/
@ -42,12 +45,20 @@
#include <log_manager.h>
#include <secrets.h>
#include <dcb.h>
#include <modinfo.h>
extern int lm_enabled_logfiles_bitmask;
static void monitorMain(void *);
static char *version_str = "V1.1.0";
static char *version_str = "V1.2.0";
MODULE_INFO info = {
MODULE_API_MONITOR,
MODULE_ALPHA_RELEASE,
MONITOR_VERSION,
"A Galera cluster monitor"
};
static void *startMonitor(void *);
static void stopMonitor(void *);
@ -55,8 +66,9 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *);
static void defaultUsers(void *, char *, char *);
static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics };
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL };
/**
* Implementation of the mandatory version entry point
@ -121,6 +133,8 @@ MYSQL_MONITOR *handle;
handle->shutdown = 0;
handle->defaultUser = NULL;
handle->defaultPasswd = NULL;
handle->id = MONITOR_DEFAULT_ID;
handle->interval = MONITOR_INTERVAL;
spinlock_init(&handle->lock);
}
handle->tid = (THREAD)thread_start(monitorMain, handle);
@ -236,7 +250,10 @@ char *sep;
dcb_printf(dcb, "\tMonitor stopped\n");
break;
}
dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval);
dcb_printf(dcb, "\tMonitored servers: ");
db = handle->databases;
sep = "";
while (db)
@ -293,13 +310,29 @@ char *server_string;
if (uname == NULL)
return;
/* Don't even probe server flagged as in maintenance */
if (SERVER_IN_MAINT(database->server))
return;
if (database->con == NULL || mysql_ping(database->con) != 0)
{
char *dpwd = decryptPassword(passwd);
int rc;
int read_timeout = 1;
database->con = mysql_init(NULL);
rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if (mysql_real_connect(database->con, database->server->name,
uname, dpwd, NULL, database->server->port, NULL, 0) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
server_clear_status(database->server, SERVER_RUNNING);
database->server->node_id = -1;
free(dpwd);
@ -395,11 +428,12 @@ long master_id;
while (ptr)
{
unsigned int prev_status = ptr->server->status;
monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd);
/* set master_id to the lowest value of ptr->server->node_id */
if (ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) {
if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) {
if (ptr->server->node_id < master_id && master_id >= 0) {
master_id = ptr->server->node_id;
} else {
@ -407,11 +441,21 @@ long master_id;
master_id = ptr->server->node_id;
}
}
} else {
} else if (!SERVER_IN_MAINT(ptr->server)) {
/* clear M/S status */
server_clear_status(ptr->server, SERVER_SLAVE);
server_clear_status(ptr->server, SERVER_MASTER);
}
if (ptr->server->status != prev_status ||
SERVER_IS_DOWN(ptr->server))
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
}
ptr = ptr->next;
}
@ -420,7 +464,7 @@ long master_id;
/* this server loop sets Master and Slave roles */
while (ptr)
{
if (ptr->server->node_id >= 0 && master_id >= 0) {
if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && master_id >= 0) {
/* set the Master role */
if (SERVER_IS_JOINED(ptr->server) && (ptr->server->node_id == master_id)) {
server_set_status(ptr->server, SERVER_MASTER);
@ -434,6 +478,19 @@ long master_id;
ptr = ptr->next;
}
thread_millisleep(MONITOR_INTERVAL);
thread_millisleep(handle->interval);
}
}
/**
* Set the monitor sampling interval.
*
* @param arg The handle allocated by startMonitor
* @param interval The interval to set in monitor struct, in milliseconds
*/
static void
setInterval(void *arg, unsigned long interval)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long));
}

View File

@ -30,6 +30,9 @@
* diagnostic interface
* 20/05/14 Massimiliano Pinto Addition of support for MariadDB multimaster replication setup.
* New server field version_string is updated.
* 28/05/14 Massimiliano Pinto Added set Id and configuration options (setInverval)
* Parameters are now printed in diagnostics
* 03/06/14 Mark Ridoch Add support for maintenance mode
*
* @endverbatim
*/
@ -46,12 +49,20 @@
#include <log_manager.h>
#include <secrets.h>
#include <dcb.h>
#include <modinfo.h>
extern int lm_enabled_logfiles_bitmask;
static void monitorMain(void *);
static char *version_str = "V1.1.0";
static char *version_str = "V1.2.0";
MODULE_INFO info = {
MODULE_API_MONITOR,
MODULE_ALPHA_RELEASE,
MONITOR_VERSION,
"A MySQL Master/Slave replication monitor"
};
static void *startMonitor(void *);
static void stopMonitor(void *);
@ -59,8 +70,11 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *);
static void defaultUser(void *, char *, char *);
static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long);
static void defaultId(void *, unsigned long);
static void replicationHeartbeat(void *, int);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics };
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat };
/**
* Implementation of the mandatory version entry point
@ -126,6 +140,8 @@ MYSQL_MONITOR *handle;
handle->shutdown = 0;
handle->defaultUser = NULL;
handle->defaultPasswd = NULL;
handle->id = MONITOR_DEFAULT_ID;
handle->interval = MONITOR_INTERVAL;
spinlock_init(&handle->lock);
}
handle->tid = (THREAD)thread_start(monitorMain, handle);
@ -261,7 +277,12 @@ char *sep;
dcb_printf(dcb, "\tMonitor stopped\n");
break;
}
dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval);
dcb_printf(dcb,"\tMaxScale MonitorId:\t%lu\n", handle->id);
dcb_printf(dcb,"\tReplication lag:\t%s\n", (handle->replicationHeartbeat == 1) ? "enabled" : "disabled");
dcb_printf(dcb, "\tMonitored servers: ");
db = handle->databases;
sep = "";
while (db)
@ -280,13 +301,13 @@ char *sep;
/**
* Monitor an individual server
*
* @param handle The MySQL Monitor object
* @param database The database to probe
* @param defaultUser Default username for the monitor
* @param defaultPasswd Default password for the monitor
*/
static void
monitorDatabase(MONITOR_SERVERS *database, char *defaultUser, char *defaultPasswd)
monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database)
{
<<<<<<< HEAD
MYSQL_ROW row;
MYSQL_RES *result;
int num_fields;
@ -298,6 +319,21 @@ static int conn_err_count;
static int modval = 10;
if (database->server->monuser != NULL)
=======
MYSQL_ROW row;
MYSQL_RES *result;
int num_fields;
int ismaster = 0, isslave = 0;
char *uname = handle->defaultUser, *passwd = handle->defaultPasswd;
unsigned long int server_version = 0;
char *server_string;
unsigned long id = handle->id;
int replication_heartbeat = handle->replicationHeartbeat;
static int conn_err_count;
static int modval = 10;
if (database->server->monuser != NULL)
>>>>>>> develop
{
uname = database->server->monuser;
passwd = database->server->monpw;
@ -306,13 +342,24 @@ static int modval = 10;
if (uname == NULL)
return;
<<<<<<< HEAD
=======
/* Don't probe servers in maintenance mode */
if (SERVER_IN_MAINT(database->server))
return;
>>>>>>> develop
if (database->con == NULL || mysql_ping(database->con) != 0)
{
char *dpwd = decryptPassword(passwd);
int rc;
int read_timeout = 1;
<<<<<<< HEAD
database->con = mysql_init(NULL);
=======
database->con = mysql_init(NULL);
>>>>>>> develop
rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if (mysql_real_connect(database->con,
@ -324,6 +371,7 @@ static int modval = 10;
NULL,
0) == NULL)
{
<<<<<<< HEAD
if (conn_err_count%modval == 0)
{
LOGIF(LE, (skygw_log_write_flush(
@ -340,6 +388,16 @@ static int modval = 10;
{
conn_err_count += 1;
}
=======
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
>>>>>>> develop
free(dpwd);
server_clear_status(database->server, SERVER_RUNNING);
@ -386,15 +444,112 @@ static int modval = 10;
{
/* Log lack of permission */
}
}
else if ((result = mysql_store_result(database->con)) != NULL)
{
database->server->rlag = -1;
} else if ((result = mysql_store_result(database->con)) != NULL) {
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
ismaster = 1;
}
mysql_free_result(result);
if (ismaster && replication_heartbeat == 1) {
time_t heartbeat;
time_t purge_time;
char heartbeat_insert_query[128]="";
char heartbeat_purge_query[128]="";
handle->master_id = database->server->node_id;
/* create the maxscale_schema database */
if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS maxscale_schema")) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error creating maxscale_schema database in Master server"
": %s", mysql_error(database->con))));
database->server->rlag = -1;
}
/* create repl_heartbeat table in maxscale_schema database */
if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS "
"maxscale_schema.replication_heartbeat "
"(maxscale_id INT NOT NULL, "
"master_server_id INT NOT NULL, "
"master_timestamp INT UNSIGNED NOT NULL, "
"PRIMARY KEY ( master_server_id, maxscale_id ) ) "
"ENGINE=MYISAM DEFAULT CHARSET=latin1")) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error creating maxscale_schema.replication_heartbeat table in Master server"
": %s", mysql_error(database->con))));
database->server->rlag = -1;
}
/* auto purge old values after 48 hours*/
purge_time = time(0) - (3600 * 48);
sprintf(heartbeat_purge_query, "DELETE FROM maxscale_schema.replication_heartbeat WHERE master_timestamp < %lu", purge_time);
if (mysql_query(database->con, heartbeat_purge_query)) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error deleting from maxscale_schema.replication_heartbeat table: [%s], %s",
heartbeat_purge_query,
mysql_error(database->con))));
}
heartbeat = time(0);
/* set node_ts for master as time(0) */
database->server->node_ts = heartbeat;
sprintf(heartbeat_insert_query, "UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = %i AND maxscale_id = %lu", heartbeat, handle->master_id, id);
/* Try to insert MaxScale timestamp into master */
if (mysql_query(database->con, heartbeat_insert_query)) {
database->server->rlag = -1;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error updating maxscale_schema.replication_heartbeat table: [%s], %s",
heartbeat_insert_query,
mysql_error(database->con))));
} else {
if (mysql_affected_rows(database->con) == 0) {
heartbeat = time(0);
sprintf(heartbeat_insert_query, "REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %i, %lu, %lu)", handle->master_id, id, heartbeat);
if (mysql_query(database->con, heartbeat_insert_query)) {
database->server->rlag = -1;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error inserting into maxscale_schema.replication_heartbeat table: [%s], %s",
heartbeat_insert_query,
mysql_error(database->con))));
} else {
/* Set replication lag to 0 for the master */
database->server->rlag = 0;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"[mysql_mon]: heartbeat table inserted data for %s:%i", database->server->name, database->server->port)));
}
} else {
/* Set replication lag as 0 for the master */
database->server->rlag = 0;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"[mysql_mon]: heartbeat table updated for %s:%i", database->server->name, database->server->port)));
}
}
}
}
/* Check if the Slave_SQL_Running and Slave_IO_Running status is
@ -439,6 +594,80 @@ static int modval = 10;
}
}
/* Get the master_timestamp value from maxscale_schema.replication_heartbeat table */
if (isslave && replication_heartbeat == 1) {
time_t heartbeat;
char select_heartbeat_query[256] = "";
sprintf(select_heartbeat_query, "SELECT master_timestamp "
"FROM maxscale_schema.replication_heartbeat "
"WHERE maxscale_id = %lu AND master_server_id = %i",
id, handle->master_id);
/* if there is a master then send the query to the slave with master_id*/
if (handle->master_id >= 0 && (mysql_query(database->con, select_heartbeat_query) == 0
&& (result = mysql_store_result(database->con)) != NULL)) {
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result))) {
int rlag = -1;
time_t slave_read;
heartbeat = time(0);
slave_read = strtoul(row[0], NULL, 10);
if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 && slave_read == 0)) {
slave_read = 0;
}
if (slave_read) {
/* set the replication lag */
rlag = heartbeat - slave_read;
}
/* set this node_ts as master_timestamp read from replication_heartbeat table */
database->server->node_ts = slave_read;
if (rlag >= 0) {
/* store rlag only if greater than monitor sampling interval */
database->server->rlag = (rlag > (handle->interval / 1000)) ? rlag : 0;
} else {
database->server->rlag = -1;
}
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"[mysql_mon]: replication heartbeat: "
"server %s:%i is %i seconds behind master",
database->server->name,
database->server->port,
database->server->rlag)));
}
mysql_free_result(result);
} else {
database->server->rlag = -1;
database->server->node_ts = 0;
if (handle->master_id < 0) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: error: replication heartbeat: "
"master_server_id NOT available for %s:%i",
database->server->name,
database->server->port)));
} else {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: error: replication heartbeat: "
"failed selecting from hearthbeat table of %s:%i : [%s], %s",
database->server->name,
database->server->port,
select_heartbeat_query,
mysql_error(database->con))));
}
}
}
if (ismaster)
{
server_set_status(database->server, SERVER_MASTER);
@ -492,11 +721,18 @@ static int modval = 10;
{
unsigned int prev_status = ptr->server->status;
<<<<<<< HEAD
monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd);
if (ptr->server->status != prev_status ||
(SERVER_IS_DOWN(ptr->server) &&
err_count%modval == 0))
=======
monitorDatabase(handle, ptr);
if (ptr->server->status != prev_status ||
SERVER_IS_DOWN(ptr->server))
>>>>>>> develop
{
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
@ -504,6 +740,7 @@ static int modval = 10;
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
<<<<<<< HEAD
err_count = 0;
modval += 1;
}
@ -511,8 +748,51 @@ static int modval = 10;
{
err_count += 1;
}
=======
}
>>>>>>> develop
ptr = ptr->next;
}
thread_millisleep(10000);
thread_millisleep(handle->interval);
}
}
/**
* Set the default id to use in the monitor.
*
* @param arg The handle allocated by startMonitor
* @param id The id to set in monitor struct
*/
static void
defaultId(void *arg, unsigned long id)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->id, &id, sizeof(unsigned long));
}
/**
* Set the monitor sampling interval.
*
* @param arg The handle allocated by startMonitor
* @param interval The interval to set in monitor struct, in milliseconds
*/
static void
setInterval(void *arg, unsigned long interval)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long));
}
/**
* Enable/Disable the MySQL Replication hearbeat, detecting slave lag behind master.
*
* @param arg The handle allocated by startMonitor
* @param replicationHeartbeat To enable it 1, disable it with 0
*/
static void
replicationHeartbeat(void *arg, int replicationHeartbeat)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int));
}

View File

@ -30,6 +30,7 @@
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 26/05/14 Massimiliano Pinto Default values for MONITOR_INTERVAL
* 28/05/14 Massimiliano Pinto Addition of new fields in MYSQL_MONITOR struct
*
* @endverbatim
*/
@ -55,6 +56,10 @@ typedef struct {
int status; /**< Monitor status */
char *defaultUser; /**< Default username for monitoring */
char *defaultPasswd; /**< Default password for monitoring */
unsigned long interval; /**< Monitor sampling interval */
unsigned long id; /**< Monitor ID */
int replicationHeartbeat; /**< Monitor flag for MySQL replication heartbeat */
int master_id; /**< Master server-id for MySQL Master/Slave replication */
MONITOR_SERVERS *databases; /**< Linked list of servers to monitor */
} MYSQL_MONITOR;
@ -63,5 +68,6 @@ typedef struct {
#define MONITOR_STOPPED 3
#define MONITOR_INTERVAL 10000 // in milliseconds
#define MONITOR_DEFAULT_ID 1UL // unsigned long value
#endif

View File

@ -39,6 +39,14 @@
#include <httpd.h>
#include <gw.h>
#include <modinfo.h>
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_IN_DEVELOPMENT,
GWPROTOCOL_VERSION,
"An experimental HTTPD implementation for use in admnistration"
};
#define ISspace(x) isspace((int)(x))
#define HTTP_SERVER_STRING "Gateway(c) v.1.0.0"

View File

@ -43,6 +43,14 @@
* 27/09/2013 Massimiliano Pinto Changed in gw_read_backend_event the check for dcb_read(), now is if rc < 0
*
*/
#include <modinfo.h>
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_ALPHA_RELEASE,
GWPROTOCOL_VERSION,
"The MySQL to backend server protocol"
};
extern int lm_enabled_logfiles_bitmask;
@ -1016,4 +1024,4 @@ static int gw_session(DCB *backend_dcb, void *data) {
return 1;
}
*/
*/

View File

@ -40,6 +40,14 @@
#include <log_manager.h>
#include <mysql_client_server_protocol.h>
#include <gw.h>
#include <modinfo.h>
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_ALPHA_RELEASE,
GWPROTOCOL_VERSION,
"The client to MaxScale MySQL protocol implementation"
};
extern int lm_enabled_logfiles_bitmask;
@ -57,11 +65,7 @@ static int gw_client_hangup_event(DCB *dcb);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb);
static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue);
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* read_buf);
static int route_by_statement(SESSION *, GWBUF *);
/*
* The "module object" for the mysqld client protocol module.
@ -791,6 +795,7 @@ int gw_read_client_event(DCB* dcb) {
/** Route COM_QUIT to backend */
if (mysql_command == '\x01') {
<<<<<<< HEAD
#if defined(ERRHANDLE)
/**
* Close router session and that closes
@ -799,7 +804,11 @@ int gw_read_client_event(DCB* dcb) {
*/
dcb_close(dcb);
#else
router->routeQuery(router_instance, rsession, read_buffer);
SESSION_ROUTE_QUERY(session, read_buffer);
// router->routeQuery(router_instance, rsession, read_buffer);
=======
SESSION_ROUTE_QUERY(session, read_buffer);
>>>>>>> develop
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] Routed COM_QUIT to "
@ -818,10 +827,7 @@ int gw_read_client_event(DCB* dcb) {
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(router_instance,
router,
rsession,
read_buffer);
rc = route_by_statement(session, read_buffer);
if (read_buffer != NULL)
{
/** add incomplete mysql packet to read queue */
@ -831,9 +837,7 @@ int gw_read_client_event(DCB* dcb) {
else
{
/** Feed whole packet to router */
rc = router->routeQuery(router_instance,
rsession,
read_buffer);
rc = SESSION_ROUTE_QUERY(session, read_buffer);
}
/** succeed */
@ -1436,11 +1440,7 @@ gw_client_hangup_event(DCB *dcb)
* Return 1 in success. If the last packet is incomplete return success but
* leave incomplete packet to readbuf.
*/
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* readbuf)
static int route_by_statement(SESSION *session, GWBUF *readbuf)
{
int rc = -1;
GWBUF* packetbuf;
@ -1452,7 +1452,7 @@ static int route_by_statement(
if (packetbuf != NULL)
{
CHK_GWBUF(packetbuf);
rc = router->routeQuery(router_instance, rsession, packetbuf);
rc = SESSION_ROUTE_QUERY(session, packetbuf);
}
else
{

View File

@ -36,6 +36,14 @@
#include <adminusers.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <modinfo.h>
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_ALPHA_RELEASE,
GWPROTOCOL_VERSION,
"A telnet deamon protocol for simple administration interface"
};
extern int lm_enabled_logfiles_bitmask;
@ -140,9 +148,6 @@ telnetd_read_event(DCB* dcb)
int n;
GWBUF *head = NULL;
SESSION *session = dcb->session;
ROUTER_OBJECT *router = session->service->router;
ROUTER *router_instance = session->service->router_instance;
void *rsession = session->router_session;
TELNETD *telnetd = (TELNETD *)dcb->protocol;
char *password, *t;
@ -196,7 +201,7 @@ char *password, *t;
free(password);
break;
case TELNETD_STATE_DATA:
router->routeQuery(router_instance, rsession, head);
SESSION_ROUTE_QUERY(session, head);
break;
}
}

View File

@ -35,6 +35,7 @@
#include <session.h>
#include <router.h>
#include <modules.h>
#include <modinfo.h>
#include <atomic.h>
#include <spinlock.h>
#include <dcb.h>
@ -43,6 +44,14 @@
#include <skygw_utils.h>
#include <log_manager.h>
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_ALPHA_RELEASE,
ROUTER_VERSION,
"The debug user interface"
};
extern int lm_enabled_logfiles_bitmask;
static char *version_str = "V1.1.1";

View File

@ -40,6 +40,7 @@
* 20/05/14 Mark Riddoch Added ability to give server and service names rather
* than simply addresses
* 23/05/14 Mark Riddoch Added support for developer and user modes
* 29/05/14 Mark Riddoch Add Filter support
*
* @endverbatim
*/
@ -50,6 +51,7 @@
#include <service.h>
#include <session.h>
#include <router.h>
#include <filter.h>
#include <modules.h>
#include <atomic.h>
#include <server.h>
@ -77,6 +79,7 @@
#define ARG_TYPE_SESSION 6
#define ARG_TYPE_DCB 7
#define ARG_TYPE_MONITOR 8
#define ARG_TYPE_FILTER 9
/**
* The subcommand structure
@ -113,6 +116,14 @@ struct subcommand showoptions[] = {
"Show the poll statistics",
"Show the poll statistics",
{0, 0, 0} },
{ "filter", 0, dprintFilter,
"Show details of a filter, called with a filter name",
"Show details of a filter, called with the address of a filter",
{ARG_TYPE_FILTER, 0, 0} },
{ "filters", 0, dprintAllFilters,
"Show all filters",
"Show all filters",
{0, 0, 0} },
{ "modules", 0, dprintAllModules,
"Show all currently loaded modules",
"Show all currently loaded modules",
@ -157,6 +168,10 @@ struct subcommand showoptions[] = {
* The subcommands of the list command
*/
struct subcommand listoptions[] = {
{ "filters", 0, dListFilters,
"List all the filters defined within MaxScale",
"List all the filters defined within MaxScale",
{0, 0, 0} },
{ "listeners", 0, dListListeners,
"List all the listeners defined within MaxScale",
"List all the listeners defined within MaxScale",
@ -493,6 +508,10 @@ SERVICE *service;
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)monitor_find(arg);
return rval;
case ARG_TYPE_FILTER:
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)filter_find(arg);
return rval;
}
return 0;
}
@ -516,7 +535,7 @@ execute_cmd(CLI_SESSION *cli)
{
DCB *dcb = cli->session->client;
int argc, i, j, found = 0;
char *args[MAXARGS];
char *args[MAXARGS + 1];
unsigned long arg1, arg2, arg3;
int in_quotes = 0, escape_next = 0;
char *ptr, *lptr;
@ -754,11 +773,13 @@ static struct {
char *str;
unsigned int bit;
} ServerBits[] = {
{ "running", SERVER_RUNNING },
{ "master", SERVER_MASTER },
{ "slave", SERVER_SLAVE },
{ "synced", SERVER_JOINED },
{ NULL, 0 }
{ "running", SERVER_RUNNING },
{ "master", SERVER_MASTER },
{ "slave", SERVER_SLAVE },
{ "synced", SERVER_JOINED },
{ "maintenance", SERVER_MAINT },
{ "maint", SERVER_MAINT },
{ NULL, 0 }
};
/**
* Map the server status bit

View File

@ -79,6 +79,7 @@
#include <readconnection.h>
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
#include <skygw_types.h>
#include <skygw_utils.h>
@ -88,6 +89,13 @@
extern int lm_enabled_logfiles_bitmask;
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_ALPHA_RELEASE,
ROUTER_VERSION,
"A connection based router to load balance based on connections"
};
static char *version_str = "V1.0.2";
/* The router entry points */
@ -345,6 +353,9 @@ int master_host = -1;
inst->bitmask)));
}
if (SERVER_IN_MAINT(inst->servers[i]->server))
continue;
/*
* If router_options=slave, get the running master
* It will be used if there are no running slaves at all

View File

@ -30,10 +30,19 @@
#include <query_classifier.h>
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_ALPHA_RELEASE,
ROUTER_VERSION,
"A Read/Write splitting router for enhancement read scalability"
};
#if defined(SS_DEBUG)
# include <mysql_client_server_protocol.h>
#endif
extern int lm_enabled_logfiles_bitmask;
/**
@ -829,8 +838,7 @@ static bool get_dcb(
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_state == BREF_IN_USE &&
(SERVER_IS_MASTER(b->backend_server) ||
SERVER_IS_JOINED(b->backend_server)))
(SERVER_IS_MASTER(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
succp = true;
@ -1544,8 +1552,7 @@ static bool select_connect_backend_servers(
}
}
else if (!master_connected &&
(SERVER_IS_MASTER(b->backend_server) ||
SERVER_IS_JOINED(b->backend_server)))
(SERVER_IS_MASTER(b->backend_server)))
{
master_found = true;
@ -1659,8 +1666,7 @@ static bool select_connect_backend_servers(
"Selected %s in \t%s:%d",
(btype == BE_MASTER ? "master" :
(btype == BE_SLAVE ? "slave" :
(btype == BE_JOINED ? "galera node" :
"unknown node type"))),
"unknown node type")),
b->backend_server->name,
b->backend_server->port)));
}

View File

@ -17,9 +17,17 @@
*/
#include <stdio.h>
#include <router.h>
#include <modinfo.h>
static char *version_str = "V1.0.0";
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_IN_DEVELOPMENT,
ROUTER_VERSION,
"A test router - not for use in real systems"
};
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session);
@ -145,4 +153,4 @@ static uint8_t getCapabilities(
void* router_session)
{
return 0;
}
}

View File

@ -234,7 +234,7 @@ typedef enum skygw_chk_t {
((SERVER_IS_RUNNING(s) && SERVER_IS_JOINED(s)) ? "RUNNING JOINED" : \
((SERVER_IS_RUNNING(s) && SERVER_IN_MAINT(s)) ? "RUNNING MAINTENANCE" : \
(SERVER_IS_RUNNING(s) ? "RUNNING (only)" : "NO STATUS")))))
#define CHK_MLIST(l) { \
ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \
l->mlist_chk_tail == CHK_NUM_MLIST), \