diff --git a/Documentation/Debug And Diagnostic Support.pdf b/Documentation/Debug And Diagnostic Support.pdf index 8f473c5fe..d7518bbee 100644 Binary files a/Documentation/Debug And Diagnostic Support.pdf and b/Documentation/Debug And Diagnostic Support.pdf differ diff --git a/Documentation/Max Scale 0.6 Release Notes.pdf b/Documentation/Max Scale 0.6 Release Notes.pdf deleted file mode 100644 index 418572497..000000000 Binary files a/Documentation/Max Scale 0.6 Release Notes.pdf and /dev/null differ diff --git a/Documentation/MaxScale 0.7 Release Notes.pdf b/Documentation/MaxScale 0.7 Release Notes.pdf new file mode 100644 index 000000000..d40d2d974 Binary files /dev/null and b/Documentation/MaxScale 0.7 Release Notes.pdf differ diff --git a/Documentation/MaxScale Configuration And Usage Scenarios.pdf b/Documentation/MaxScale Configuration And Usage Scenarios.pdf index 1dc35b055..ec8a813f8 100644 Binary files a/Documentation/MaxScale Configuration And Usage Scenarios.pdf and b/Documentation/MaxScale Configuration And Usage Scenarios.pdf differ diff --git a/Documentation/MaxScale 0.6 Release Notes.pdf b/Documentation/history/MaxScale 0.6 Release Notes.pdf similarity index 100% rename from Documentation/MaxScale 0.6 Release Notes.pdf rename to Documentation/history/MaxScale 0.6 Release Notes.pdf diff --git a/VERSION b/VERSION index a918a2aa1..faef31a43 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.0 +0.7.0 diff --git a/server/Makefile b/server/Makefile index 2ac7ed2be..d6aba988e 100644 --- a/server/Makefile +++ b/server/Makefile @@ -33,6 +33,7 @@ all: (cd modules/routing/readwritesplit; touch depend.mk ;make) (cd modules/protocol; touch depend.mk ;make) (cd modules/monitor; touch depend.mk ;make) + (cd modules/filter; touch depend.mk ;make) cleantests: $(MAKE) -C test cleantests @@ -49,12 +50,14 @@ clean: (cd modules/routing; touch depend.mk ; make clean) (cd modules/protocol; touch depend.mk ; make clean) (cd modules/monitor; touch depend.mk ; make clean) + (cd modules/filter; touch depend.mk ; make clean) depend: (cd core; touch depend.mk ; make depend) (cd modules/routing; touch depend.mk ; make depend) (cd modules/protocol; touch depend.mk ; make depend) (cd modules/monitor; touch depend.mk ; make depend) + (cd modules/filter; touch depend.mk ; make depend) install: @mkdir -p $(DEST) @@ -71,3 +74,4 @@ install: (cd modules/routing; make DEST=$(DEST) install) (cd modules/protocol; make DEST=$(DEST) install) (cd modules/monitor; make DEST=$(DEST) install) + (cd modules/filter; make DEST=$(DEST) install) diff --git a/server/MaxScale_template.cnf b/server/MaxScale_template.cnf index ee7eb6e30..94981afbc 100644 --- a/server/MaxScale_template.cnf +++ b/server/MaxScale_template.cnf @@ -20,6 +20,8 @@ threads=1 # user = # passwd= +# monitor_interval= [MySQL Monitor] type=monitor diff --git a/server/core/Makefile b/server/core/Makefile index bd29fbae8..0a8de1a4a 100644 --- a/server/core/Makefile +++ b/server/core/Makefile @@ -33,6 +33,7 @@ # 29/06/13 Vilho Raatikka Reverted Query classifier changes because # gateway needs mysql client lib, not qc. # 24/07/13 Mark Ridoch Addition of encryption routines +# 30/05/14 Mark Ridoch Filter API added include ../../build_gateway.inc @@ -56,14 +57,15 @@ LDFLAGS=-rdynamic -L$(LOGPATH) \ SRCS= atomic.c buffer.c spinlock.c gateway.c \ gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \ poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \ - monitor.c adminusers.c secrets.c + monitor.c adminusers.c secrets.c filter.c modutil.c HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/gw.h ../modules/include/mysql_client_server_protocol.h \ ../include/session.h ../include/spinlock.h ../include/thread.h \ ../include/modules.h ../include/poll.h ../include/config.h \ ../include/users.h ../include/hashtable.h ../include/gwbitmask.h \ - ../include/adminusers.h ../include/version.h ../include/maxscale.h + ../include/adminusers.h ../include/version.h ../include/maxscale.h \ + ../include/filter.h modutil.h OBJ=$(SRCS:.c=.o) diff --git a/server/core/config.c b/server/core/config.c index be38314ab..988e263fd 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -31,6 +31,9 @@ * 11/03/14 Massimiliano Pinto Added Unix socket support * 11/05/14 Massimiliano Pinto Added version_string support to service * 19/05/14 Mark Riddoch Added unique names from section headers + * 29/05/14 Mark Riddoch Addition of filter definition + * 23/05/14 Massimiliano Pinto Added automatic set of maxscale-id: first listening ipv4_raw + port + pid + * 28/05/14 Massimiliano Pinto Added detect_replication_lag parameter * * @endverbatim */ @@ -56,6 +59,7 @@ static char *config_get_value(CONFIG_PARAMETER *, const char *); static int handle_global_item(const char *, const char *); static void global_defaults(); static void check_config_objects(CONFIG_CONTEXT *context); +static int config_truth_value(char *str); static char *config_file = NULL; static GATEWAY_CONF gateway; @@ -231,7 +235,7 @@ int error_count = 0; if (obj->element == NULL) /*< if module load failed */ { - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Reading configuration " "for router service '%s' failed. " @@ -254,7 +258,7 @@ int error_count = 0; "max_slave_connections"); if (enable_root_user) - serviceEnableRootUser(obj->element, atoi(enable_root_user)); + serviceEnableRootUser(obj->element, config_truth_value(enable_root_user)); if (!auth) auth = config_get_value(obj->parameters, "auth"); @@ -359,6 +363,52 @@ int error_count = 0; obj->object))); } } + else if (!strcmp(type, "filter")) + { + char *module = config_get_value(obj->parameters, + "module"); + char *options = config_get_value(obj->parameters, + "options"); + + if (module) + { + obj->element = filter_alloc(obj->object, module); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Filter '%s' has no module " + "defined defined to load.", + obj->object))); + error_count++; + } + if (obj->element && options) + { + char *s = strtok(options, ","); + while (s) + { + filterAddOption(obj->element, s); + s = strtok(NULL, ","); + } + } + if (obj->element) + { + CONFIG_PARAMETER *params = obj->parameters; + while (params) + { + if (strcmp(params->name, "module") + && strcmp(params->name, + "options")) + { + filterAddParameter(obj->element, + params->name, + params->value); + } + params = params->next; + } + } + } obj = obj->next; } @@ -376,7 +426,8 @@ int error_count = 0; { char *servers; char *roptions; - + char *filters = config_get_value(obj->parameters, + "filters"); servers = config_get_value(obj->parameters, "servers"); roptions = config_get_value(obj->parameters, "router_options"); @@ -418,6 +469,10 @@ int error_count = 0; s = strtok(NULL, ","); } } + if (filters) + { + serviceSetFilters(obj->element, filters); + } } else if (!strcmp(type, "listener")) { @@ -426,12 +481,19 @@ int error_count = 0; char *port; char *protocol; char *socket; + struct sockaddr_in serv_addr; service = config_get_value(obj->parameters, "service"); port = config_get_value(obj->parameters, "port"); address = config_get_value(obj->parameters, "address"); protocol = config_get_value(obj->parameters, "protocol"); socket = config_get_value(obj->parameters, "socket"); + + /* if id is not set, do it now */ + if (gateway.id == 0) { + setipaddress(&serv_addr.sin_addr, (address == NULL) ? "0.0.0.0" : address); + gateway.id = (unsigned long) (serv_addr.sin_addr.s_addr + port + getpid()); + } if (service && socket && protocol) { CONFIG_CONTEXT *ptr = context; @@ -494,18 +556,46 @@ int error_count = 0; char *servers; char *user; char *passwd; + unsigned long interval = 0; + int replication_heartbeat = 0; module = config_get_value(obj->parameters, "module"); servers = config_get_value(obj->parameters, "servers"); user = config_get_value(obj->parameters, "user"); passwd = config_get_value(obj->parameters, "passwd"); + if (config_get_value(obj->parameters, "monitor_interval")) { + interval = strtoul(config_get_value(obj->parameters, "monitor_interval"), NULL, 10); + } + + if (config_get_value(obj->parameters, "detect_replication_lag")) { + replication_heartbeat = atoi(config_get_value(obj->parameters, "detect_replication_lag")); + } if (module) { obj->element = monitor_alloc(obj->object, module); if (servers && obj->element) { - char *s = strtok(servers, ","); + char *s; + + /* if id is not set, compute it now with pid only */ + if (gateway.id == 0) { + gateway.id = getpid(); + } + + /* add the maxscale-id to monitor data */ + monitorSetId(obj->element, gateway.id); + + /* set monitor interval */ + if (interval > 0) + monitorSetInterval(obj->element, interval); + + /* set replication heartbeat */ + if(replication_heartbeat == 1) + monitorSetReplicationHeartbeat(obj->element, replication_heartbeat); + + /* get the servers to monitor */ + s = strtok(servers, ","); while (s) { CONFIG_CONTEXT *obj1 = context; @@ -550,7 +640,8 @@ int error_count = 0; error_count++; } } - else if (strcmp(type, "server") != 0) + else if (strcmp(type, "server") != 0 + && strcmp(type, "filter") != 0) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -747,6 +838,7 @@ global_defaults() gateway.version_string = strdup(version_string); else gateway.version_string = NULL; + gateway.id=0; } /** @@ -959,10 +1051,12 @@ SERVER *server; { char *servers; char *roptions; + char *filters; servers = config_get_value(obj->parameters, "servers"); roptions = config_get_value(obj->parameters, "router_options"); + filters = config_get_value(obj->parameters, "filters"); if (servers && obj->element) { char *s = strtok(servers, ","); @@ -996,6 +1090,8 @@ SERVER *server; s = strtok(NULL, ","); } } + if (filters) + serviceSetFilters(obj->element, filters); } else if (!strcmp(type, "listener")) { @@ -1056,7 +1152,8 @@ SERVER *server; } } else if (strcmp(type, "server") != 0 && - strcmp(type, "monitor") != 0) + strcmp(type, "monitor") != 0 && + strcmp(type, "filter") != 0) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -1080,6 +1177,7 @@ static char *service_params[] = "enable_root_user", "max_slave_connections", "version_string", + "filters", NULL }; @@ -1112,6 +1210,8 @@ static char *monitor_params[] = "servers", "user", "passwd", + "monitor_interval", + "detect_replication_lag", NULL }; /** @@ -1213,3 +1313,24 @@ bool config_set_qualified_param( return succp; } +/** + * Used for boolean settings where values may be 1, yes or true + * to enable a setting or -, no, false to disable a setting. + * + * @param str String to convert to a boolean + * @return Truth value + */ +static int +config_truth_value(char *str) +{ + if (strcasecmp(str, "true") == 0 || strcasecmp(str, "on") == 0) + { + return 1; + } + if (strcasecmp(str, "flase") == 0 || strcasecmp(str, "off") == 0) + { + return 0; + } + return atoi(str); +} + diff --git a/server/core/filter.c b/server/core/filter.c new file mode 100644 index 000000000..a3db72e3b --- /dev/null +++ b/server/core/filter.c @@ -0,0 +1,316 @@ +/* + * This file is distributed as part of MaxScale from SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file filter.c - A representation of a filter within MaxScale. + * + * @verbatim + * Revision History + * + * Date Who Description + * 29/05/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int lm_enabled_logfiles_bitmask; + +static SPINLOCK filter_spin = SPINLOCK_INIT; +static FILTER_DEF *allFilters = NULL; + +/** + * Allocate a new filter within MaxScale + * + * + * @param name The filter name + * @param module The module to load + * + * @return The newly created filter or NULL if an error occured + */ +FILTER_DEF * +filter_alloc(char *name, char *module) +{ +FILTER_DEF *filter; + + if ((filter = (FILTER_DEF *)malloc(sizeof(FILTER_DEF))) == NULL) + return NULL; + filter->name = strdup(name); + filter->module = strdup(module); + filter->filter = NULL; + filter->options = NULL; + filter->obj = NULL; + filter->parameters = NULL; + + spinlock_init(&filter->spin); + + spinlock_acquire(&filter_spin); + filter->next = allFilters; + allFilters = filter; + spinlock_release(&filter_spin); + + return filter; +} + + +/** + * Deallocate the specified filter + * + * @param server The service to deallocate + * @return Returns true if the server was freed + */ +void +filter_free(FILTER_DEF *filter) +{ +FILTER_DEF *ptr; + + /* First of all remove from the linked list */ + spinlock_acquire(&filter_spin); + if (allFilters == filter) + { + allFilters = filter->next; + } + else + { + ptr = allFilters; + while (ptr && ptr->next != filter) + { + ptr = ptr->next; + } + if (ptr) + ptr->next = filter->next; + } + spinlock_release(&filter_spin); + + /* Clean up session and free the memory */ + free(filter->name); + free(filter->module); + free(filter); +} + +/** + * Find an existing filter using the unique section name in + * configuration file + * + * @param name The filter name + * @return The server or NULL if not found + */ +FILTER_DEF * +filter_find(char *name) +{ +FILTER_DEF *filter; + + spinlock_acquire(&filter_spin); + filter = allFilters; + while (filter) + { + if (strcmp(filter->name, name) == 0) + break; + filter = filter->next; + } + spinlock_release(&filter_spin); + return filter; +} + +/** + * Print all filters to a DCB + * + * Designed to be called within a debugger session in order + * to display all active filters within MaxScale + */ +void +dprintAllFilters(DCB *dcb) +{ +FILTER_DEF *ptr; +int i; + + spinlock_acquire(&filter_spin); + ptr = allFilters; + while (ptr) + { + dcb_printf(dcb, "Filter %p (%s)\n", ptr, ptr->name); + dcb_printf(dcb, "\tModule: %s\n", ptr->module); + if (ptr->options) + { + dcb_printf(dcb, "\tOptions: "); + for (i = 0; ptr->options && ptr->options[i]; i++) + dcb_printf(dcb, "%s ", ptr->options[i]); + dcb_printf(dcb, "\n"); + } + if (ptr->obj && ptr->filter) + ptr->obj->diagnostics(ptr->filter, NULL, dcb); + else + dcb_printf(dcb, "\tModule not loaded.\n"); + ptr = ptr->next; + } + spinlock_release(&filter_spin); +} + +/** + * Print filter details to a DCB + * + * Designed to be called within a debug CLI in order + * to display all active filters in MaxScale + */ +void +dprintFilter(DCB *dcb, FILTER_DEF *filter) +{ +int i; + + dcb_printf(dcb, "Filter %p (%s)\n", filter, filter->name); + dcb_printf(dcb, "\tModule: %s\n", filter->module); + if (filter->options) + { + dcb_printf(dcb, "\tOptions: "); + for (i = 0; filter->options && filter->options[i]; i++) + dcb_printf(dcb, "%s ", filter->options[i]); + dcb_printf(dcb, "\n"); + } + if (filter->obj && filter->filter) + filter->obj->diagnostics(filter->filter, NULL, dcb); +} + +/** + * List all filters in a tabular form to a DCB + * + */ +void +dListFilters(DCB *dcb) +{ +FILTER_DEF *ptr; +int i; + + spinlock_acquire(&filter_spin); + ptr = allFilters; + if (ptr) + { + dcb_printf(dcb, "%-18s | %-15s | Options\n", + "Filter", "Module"); + dcb_printf(dcb, "-------------------------------------------------------------------------------\n"); + } + while (ptr) + { + dcb_printf(dcb, "%-18s | %-15s | ", + ptr->name, ptr->module); + for (i = 0; ptr->options && ptr->options[i]; i++) + dcb_printf(dcb, "%s ", ptr->options[i]); + dcb_printf(dcb, "\n"); + ptr = ptr->next; + } + spinlock_release(&filter_spin); +} + +/** + * Add a router option to a service + * + * @param service The service to add the router option to + * @param option The option string + */ +void +filterAddOption(FILTER_DEF *filter, char *option) +{ +int i; + + spinlock_acquire(&filter->spin); + if (filter->options == NULL) + { + filter->options = (char **)calloc(2, sizeof(char *)); + filter->options[0] = strdup(option); + filter->options[1] = NULL; + } + else + { + for (i = 0; filter->options[i]; i++) + ; + filter->options = (char **)realloc(filter->options, + (i + 2) * sizeof(char *)); + filter->options[i] = strdup(option); + filter->options[i+1] = NULL; + } + spinlock_release(&filter->spin); +} + +/** + * Add a router parameter to a service + * + * @param service The service to add the router option to + * @param name The parameter name + * @param value The parameter value + */ +void +filterAddParameter(FILTER_DEF *filter, char *name, char *value) +{ +int i; + + spinlock_acquire(&filter->spin); + if (filter->parameters == NULL) + { + filter->parameters = (FILTER_PARAMETER **)calloc(2, sizeof(FILTER_PARAMETER *)); + i = 0; + } + else + { + for (i = 0; filter->parameters[i]; i++) + ; + filter->parameters = (FILTER_PARAMETER **)realloc(filter->parameters, + (i + 2) * sizeof(FILTER_PARAMETER *)); + } + filter->parameters[i] = (FILTER_PARAMETER *)calloc(1, sizeof(FILTER_PARAMETER)); + filter->parameters[i]->name = strdup(name); + filter->parameters[i]->value = strdup(value); + filter->parameters[i+1] = NULL; + spinlock_release(&filter->spin); +} + +DOWNSTREAM * +filterApply(FILTER_DEF *filter, SESSION *session, DOWNSTREAM *downstream) +{ +DOWNSTREAM *me; + + if (filter->obj == NULL) + { + /* Filter not yet loaded */ + if ((filter->obj = load_module(filter->module, + MODULE_FILTER)) == NULL) + { + return NULL; + } + } + if (filter->filter == NULL) + filter->filter = (filter->obj->createInstance)(filter->options, + filter->parameters); + if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL) + { + return NULL; + } + me->instance = filter->filter; + me->routeQuery = filter->obj->routeQuery; + me->session = filter->obj->newSession(me->instance, session); + + filter->obj->setDownstream(me->instance, me->session, downstream); + + return me; +} diff --git a/server/core/load_utils.c b/server/core/load_utils.c index ba5507018..3ace0f91f 100644 --- a/server/core/load_utils.c +++ b/server/core/load_utils.c @@ -28,6 +28,7 @@ * 14/06/13 Mark Riddoch Updated to add call to ModuleInit if one is * defined in the loaded module. * Also updated to call fixed GetModuleObject + * 02/06/14 Mark Riddoch Addition of module info * * @endverbatim */ @@ -38,6 +39,7 @@ #include #include #include +#include #include #include @@ -47,10 +49,11 @@ static MODULES *registered = NULL; static MODULES *find_module(const char *module); static void register_module(const char *module, - const char *type, - void *dlhandle, - char *version, - void *modobj); + const char *type, + void *dlhandle, + char *version, + void *modobj, + MODULE_INFO *info); static void unregister_module(const char *module); char* get_maxscale_home(void) @@ -76,12 +79,13 @@ char* get_maxscale_home(void) void * load_module(const char *module, const char *type) { -char *home, *version; -char fname[MAXPATHLEN]; -void *dlhandle, *sym; -char *(*ver)(); -void *(*ep)(), *modobj; -MODULES *mod; +char *home, *version; +char fname[MAXPATHLEN]; +void *dlhandle, *sym; +char *(*ver)(); +void *(*ep)(), *modobj; +MODULES *mod; +MODULE_INFO *mod_info = NULL; if ((mod = find_module(module)) == NULL) { @@ -141,6 +145,57 @@ MODULES *mod; ModuleInit(); } + if ((sym = dlsym(dlhandle, "info")) != NULL) + { + int fatal = 0; + mod_info = sym; + if (strcmp(type, MODULE_PROTOCOL) == 0 + && mod_info->modapi != MODULE_API_PROTOCOL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Module '%s' does not implement " + "the protocol API.\n", + module))); + fatal = 1; + } + if (strcmp(type, MODULE_ROUTER) == 0 + && mod_info->modapi != MODULE_API_ROUTER) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Module '%s' does not implement " + "the router API.\n", + module))); + fatal = 1; + } + if (strcmp(type, MODULE_MONITOR) == 0 + && mod_info->modapi != MODULE_API_MONITOR) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Module '%s' does not implement " + "the monitor API.\n", + module))); + fatal = 1; + } + if (strcmp(type, MODULE_FILTER) == 0 + && mod_info->modapi != MODULE_API_FILTER) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Module '%s' does not implement " + "the filter API.\n", + module))); + fatal = 1; + } + if (fatal) + { + dlclose(dlhandle); + return NULL; + } + } + if ((sym = dlsym(dlhandle, "GetModuleObject")) == NULL) { LOGIF(LE, (skygw_log_write_flush( @@ -161,7 +216,7 @@ MODULES *mod; module, version, fname))); - register_module(module, type, dlhandle, version, modobj); + register_module(module, type, dlhandle, version, modobj, mod_info); } else { @@ -227,7 +282,7 @@ MODULES *mod = registered; * @param modobj The module object */ static void -register_module(const char *module, const char *type, void *dlhandle, char *version, void *modobj) +register_module(const char *module, const char *type, void *dlhandle, char *version, void *modobj, MODULE_INFO *mod_info) { MODULES *mod; @@ -239,6 +294,7 @@ MODULES *mod; mod->version = strdup(version); mod->modobj = modobj; mod->next = registered; + mod->info = mod_info; registered = mod; } @@ -303,11 +359,25 @@ dprintAllModules(DCB *dcb) { MODULES *ptr = registered; - dcb_printf(dcb, "%-15s | %-11s | Version\n", "Module Name", "Module Type"); - dcb_printf(dcb, "-----------------------------------------------------\n"); + dcb_printf(dcb, "%-15s | %-11s | Version | API | Status\n", "Module Name", "Module Type"); + dcb_printf(dcb, "--------------------------------------------------------------------------\n"); while (ptr) { - dcb_printf(dcb, "%-15s | %-11s | %s\n", ptr->module, ptr->type, ptr->version); + dcb_printf(dcb, "%-15s | %-11s | %-7s ", ptr->module, ptr->type, ptr->version); + if (ptr->info) + dcb_printf(dcb, "| %d.%d.%d | %s", + ptr->info->api_version.major, + ptr->info->api_version.minor, + ptr->info->api_version.patch, + ptr->info->status == MODULE_IN_DEVELOPMENT + ? "In Development" + : (ptr->info->status == MODULE_ALPHA_RELEASE + ? "Alpha" + : (ptr->info->status == MODULE_BETA_RELEASE + ? "Beta" + : (ptr->info->status == MODULE_GA + ? "GA" : "Unknown")))); + dcb_printf(dcb, "\n"); ptr = ptr->next; } } diff --git a/server/core/modutil.c b/server/core/modutil.c new file mode 100644 index 000000000..c6bc9bd00 --- /dev/null +++ b/server/core/modutil.c @@ -0,0 +1,123 @@ +/* + * This file is distributed as part of MaxScale from SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file modutil.c - Implementation of useful routines for modules + * + * @verbatim + * Revision History + * + * Date Who Description + * 04/06/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include + +/** + * Check if a GWBUF structure is a MySQL COM_QUERY packet + * + * @param buf Buffer to check + * @return True if GWBUF is a COM_QUERY packet + */ +int +modutil_is_SQL(GWBUF *buf) +{ +unsigned char *ptr; + + if (GWBUF_LENGTH(buf) < 5) + return 0; + ptr = GWBUF_DATA(buf); + return ptr[4] == 0x03; // COM_QUERY +} + +/** + * Extract the SQL portion of a COM_QUERY packet + * + * NB This sets *sql to point into the packet and does not + * allocate any new storage. The string pointed to by *sql is + * not NULL terminated. + * + * This routine is very simplistic and does not deal with SQL text + * that spans multiple buffers. + * + * @param buf The packet buffer + * @param sql Pointer that is set to point at the SQL data + * @param length Length of the SQL data + * @return True if the packet is a COM_QUERY packet + */ +int +modutil_extract_SQL(GWBUF *buf, char **sql, int *length) +{ +char *ptr; + + if (!modutil_is_SQL(buf)) + return 0; + ptr = GWBUF_DATA(buf); + *length = *ptr++; + *length += (*ptr++ << 8); + *length += (*ptr++ << 8); + ptr += 2; // Skip sequence id and COM_QUERY byte + *length = *length - 1; + *sql = ptr; + return 1; +} + + +GWBUF * +modutil_replace_SQL(GWBUF *orig, char *sql) +{ +char *ptr; +int length, newlength; +GWBUF *addition; + + if (!modutil_is_SQL(orig)) + return NULL; + ptr = GWBUF_DATA(orig); + length = *ptr++; + length += (*ptr++ << 8); + length += (*ptr++ << 8); + ptr += 2; // Skip sequence id and COM_QUERY byte + + newlength = strlen(sql); + if (length - 1 == newlength) + { + /* New SQL is the same length as old */ + memcpy(ptr, sql, newlength); + } + else if (length - 1 > newlength) + { + /* New SQL is shorter */ + memcpy(ptr, sql, newlength); + GWBUF_RTRIM(orig, (length - 1) - newlength); + } + else + { + memcpy(ptr, sql, length - 1); + addition = gwbuf_alloc(newlength - (length - 1)); + memcpy(GWBUF_DATA(addition), &sql[length - 1], newlength - (length - 1)); + ptr = GWBUF_DATA(orig); + *ptr++ = (newlength + 1) & 0xff; + *ptr++ = ((newlength + 1) >> 8) & 0xff; + *ptr++ = ((newlength + 1) >> 16) & 0xff; + orig->next = addition; + } + + return orig; +} diff --git a/server/core/monitor.c b/server/core/monitor.c index cee2f2d9e..2fca9bcac 100644 --- a/server/core/monitor.c +++ b/server/core/monitor.c @@ -22,8 +22,10 @@ * @verbatim * Revision History * - * Date Who Description - * 08/07/13 Mark Riddoch Initial implementation + * Date Who Description + * 08/07/13 Mark Riddoch Initial implementation + * 23/05/14 Massimiliano Pinto Addition of monitor_interval parameter + * and monitor id * * @endverbatim */ @@ -220,3 +222,47 @@ MONITOR *ptr; spinlock_release(&monLock); return ptr; } + + +/** + * Set the id of the monitor. + * + * @param mon The monitor instance + * @param id The id for the monitor + */ + +void +monitorSetId(MONITOR *mon, unsigned long id) +{ + if (mon->module->defaultId != NULL) { + mon->module->defaultId(mon->handle, id); + } +} + +/** + * Set the monitor sampling interval. + * + * @param mon The monitor instance + * @param interval The sampling interval in milliseconds + */ +void +monitorSetInterval (MONITOR *mon, unsigned long interval) +{ + if (mon->module->setInterval != NULL) { + mon->module->setInterval(mon->handle, interval); + } +} + +/** + * Enable Replication Heartbeat support in monitor. + * + * @param mon The monitor instance + * @param interval The sampling interval in milliseconds + */ +void +monitorSetReplicationHeartbeat(MONITOR *mon, int replication_heartbeat) +{ + if (mon->module->replicationHeartbeat != NULL) { + mon->module->replicationHeartbeat(mon->handle, replication_heartbeat); + } +} diff --git a/server/core/server.c b/server/core/server.c index d80c9372a..42a60caea 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -27,6 +27,7 @@ * 17/05/14 Mark Riddoch Addition of unique_name * 20/05/14 Massimiliano Pinto Addition of server_string * 21/05/14 Massimiliano Pinto Addition of node_id + * 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields * * @endverbatim */ @@ -73,6 +74,8 @@ SERVER *server; server->unique_name = NULL; server->server_string = NULL; server->node_id = -1; + server->rlag = -1; + server->node_ts = 0; spinlock_acquire(&server_spin); server->next = allServers; @@ -247,6 +250,14 @@ char *stat; if (ptr->server_string) dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string); dcb_printf(dcb, "\tNode Id: %d\n", ptr->node_id); + if (SERVER_IS_SLAVE(ptr)) { + if (ptr->rlag >= 0) { + dcb_printf(dcb, "\tSlave delay:\t\t%d\n", ptr->rlag); + } + } + if (ptr->node_ts > 0) { + dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", ptr->node_ts); + } dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current); ptr = ptr->next; @@ -275,6 +286,14 @@ char *stat; if (server->server_string) dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string); dcb_printf(dcb, "\tNode Id: %d\n", server->node_id); + if (SERVER_IS_SLAVE(server)) { + if (server->rlag >= 0) { + dcb_printf(dcb, "\tSlave delay:\t\t%d\n", server->rlag); + } + } + if (server->node_ts > 0) { + dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", server->node_ts); + } dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current); } @@ -325,6 +344,8 @@ char *status = NULL; if ((status = (char *)malloc(200)) == NULL) return NULL; status[0] = 0; + if (server->status & SERVER_MAINT) + strcat(status, "Maintenance, "); if (server->status & SERVER_MASTER) strcat(status, "Master, "); if (server->status & SERVER_SLAVE) diff --git a/server/core/service.c b/server/core/service.c index ea177f097..b07454b72 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -30,6 +30,7 @@ * 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services) * 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL * 23/05/14 Mark Riddoch Addition of service validation call + * 29/05/14 Mark Riddoch Filter API implementation * * @endverbatim */ @@ -46,6 +47,7 @@ #include #include #include +#include #include #include #include @@ -110,6 +112,8 @@ SERVICE *service; service->databases = NULL; service->svc_config_param = NULL; service->svc_config_version = 0; + service->filters = NULL; + service->n_filters = 0; spinlock_init(&service->spin); spinlock_init(&service->users_table_spin); memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE)); @@ -538,10 +542,13 @@ serviceClearRouterOptions(SERVICE *service) int i; spinlock_acquire(&service->spin); - for (i = 0; service->routerOptions[i]; i++) - free(service->routerOptions[i]); - free(service->routerOptions); - service->routerOptions = NULL; + if (service->routerOptions != NULL) + { + for (i = 0; service->routerOptions[i]; i++) + free(service->routerOptions[i]); + free(service->routerOptions); + service->routerOptions = NULL; + } spinlock_release(&service->spin); } /** @@ -608,6 +615,63 @@ serviceEnableRootUser(SERVICE *service, int action) return 1; } +/** + * Trim whitespace from the from an rear of a string + * + * @param str String to trim + * @return Trimmed string, chanesg are done in situ + */ +static char * +trim(char *str) +{ +char *ptr; + + while (isspace(*str)) + str++; + + /* Point to last character of the string */ + ptr = str + strlen(str) - 1; + while (ptr > str && isspace(*ptr)) + *ptr-- = 0; + + return str; +} + +/** + * Set the filters used by the service + * + * @param service The service itself + * @param filters ASCII string of filters to use + */ +void +serviceSetFilters(SERVICE *service, char *filters) +{ +FILTER_DEF **flist; +char *ptr, *brkt; +int n = 0; + + flist = (FILTER_DEF *)malloc(sizeof(FILTER_DEF *)); + ptr = strtok_r(filters, "|", &brkt); + while (ptr) + { + n++; + flist = (FILTER_DEF *)realloc(flist, (n + 1) * sizeof(FILTER_DEF *)); + if ((flist[n-1] = filter_find(trim(ptr))) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Unable to find filter '%s' for service '%s'\n", + trim(ptr), service->name + ))); + } + flist[n] = NULL; + ptr = strtok_r(NULL, "|", &brkt); + } + + service->filters = flist; + service->n_filters = n; +} + /** * Return a named service * @@ -638,6 +702,7 @@ void printService(SERVICE *service) { SERVER *ptr = service->databases; +int i; printf("Service %p\n", service); printf("\tService: %s\n", service->name); @@ -649,6 +714,16 @@ SERVER *ptr = service->databases; printf("\t\t%s:%d Protocol: %s\n", ptr->name, ptr->port, ptr->protocol); ptr = ptr->nextdb; } + if (service->n_filters) + { + printf("\tFilter chain: "); + for (i = 0; i < service->n_filters; i++) + { + printf("%s %s ", service->filters[i]->name, + i + 1 < service->n_filters ? "|" : ""); + } + printf("\n"); + } printf("\tUsers data: %p\n", service->users); printf("\tTotal connections: %d\n", service->stats.n_sessions); printf("\tCurrently connected: %d\n", service->stats.n_current); @@ -705,6 +780,7 @@ SERVICE *ptr; void dprintService(DCB *dcb, SERVICE *service) { SERVER *server = service->databases; +int i; dcb_printf(dcb, "Service %p\n", service); dcb_printf(dcb, "\tService: %s\n", service->name); @@ -714,6 +790,16 @@ SERVER *server = service->databases; service->router->diagnostics(service->router_instance, dcb); dcb_printf(dcb, "\tStarted: %s", asctime(localtime(&service->stats.started))); + if (service->n_filters) + { + dcb_printf(dcb, "\tFilter chain: "); + for (i = 0; i < service->n_filters; i++) + { + dcb_printf(dcb, "%s %s ", service->filters[i]->name, + i + 1 < service->n_filters ? "|" : ""); + } + dcb_printf(dcb, "\n"); + } dcb_printf(dcb, "\tBackend databases\n"); while (server) { @@ -996,4 +1082,4 @@ char* service_get_name( SERVICE* svc) { return svc->name; -} \ No newline at end of file +} diff --git a/server/core/session.c b/server/core/session.c index 8f59b85d3..cb392258c 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -25,6 +25,7 @@ * Date Who Description * 17/06/13 Mark Riddoch Initial implementation * 02/09/13 Massimiliano Pinto Added session refcounter + * 29/05/14 Mark Riddoch Addition of filter mechanism * * @endverbatim */ @@ -47,6 +48,9 @@ extern int lm_enabled_logfiles_bitmask; static SPINLOCK session_spin = SPINLOCK_INIT; static SESSION *allSessions = NULL; + +static int session_setup_filters(SESSION *session); + /** * Allocate a new session for a new client of the specified service. * @@ -90,6 +94,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) spinlock_acquire(&session->ses_lock); session->service = service; session->client = client_dcb; + session->n_filters = 0; memset(&session->stats, 0, sizeof(SESSION_STATS)); session->stats.connect = time(0); session->state = SESSION_STATE_ALLOC; @@ -132,7 +137,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) /** * Inform other threads that session is closing. */ - session->state == SESSION_STATE_STOPPING; + session->state = SESSION_STATE_STOPPING; /*< * Decrease refcount, set dcb's session pointer NULL * and set session pointer to NULL. @@ -147,7 +152,45 @@ session_alloc(SERVICE *service, DCB *client_dcb) goto return_session; } + + /* + * Pending filter chain being setup set the head of the chain to + * be the router. As filters are inserted the current head will + * be pushed to the filter and the head updated. + * + * NB This dictates that filters are created starting at the end + * of the chain nearest the router working back to the client + * protocol end of the chain. + */ + session->head.instance = service->router_instance; + session->head.session = session->router_session; + + session->head.routeQuery = service->router->routeQuery; + + if (service->n_filters > 0) + { + if (!session_setup_filters(session)) + { + /** + * Inform other threads that session is closing. + */ + session->state = SESSION_STATE_STOPPING; + /*< + * Decrease refcount, set dcb's session pointer NULL + * and set session pointer to NULL. + */ + session_free(session); + client_dcb->session = NULL; + session = NULL; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to create %s session.", + service->name))); + goto return_session; + } + } } + spinlock_acquire(&session_spin); session->state = SESSION_STATE_ROUTER_READY; session->next = allSessions; @@ -227,6 +270,7 @@ bool session_free( bool succp = false; SESSION *ptr; int nlink; + int i; CHK_SESSION(session); @@ -262,10 +306,23 @@ bool session_free( /* Free router_session and session */ if (session->router_session) { + session->service->router->closeSession( + session->service->router_instance, + session->router_session); session->service->router->freeSession( session->service->router_instance, session->router_session); } + if (session->n_filters) + { + for (i = 0; i < session->n_filters; i++) + { + session->filters[i].filter->obj->freeSession( + session->filters[i].instance, + session->filters[i].session); + } + free(session->filters); + } free(session); succp = true; @@ -440,6 +497,8 @@ SESSION *ptr; void dprintSession(DCB *dcb, SESSION *ptr) { +int i; + dcb_printf(dcb, "Session %p\n", ptr); dcb_printf(dcb, "\tState: %s\n", session_state(ptr->state)); dcb_printf(dcb, "\tService: %s (%p)\n", ptr->service->name, ptr->service); @@ -447,6 +506,18 @@ dprintSession(DCB *dcb, SESSION *ptr) if (ptr->client && ptr->client->remote) dcb_printf(dcb, "\tClient Address: %s\n", ptr->client->remote); dcb_printf(dcb, "\tConnected: %s", asctime(localtime(&ptr->stats.connect))); + if (ptr->n_filters) + { + for (i = 0; i < ptr->n_filters; i++) + { + dcb_printf(dcb, "\tFilter: %s\n", + ptr->filters[i].filter->name); + ptr->filters[i].filter->obj->diagnostics( + ptr->filters[i].instance, + ptr->filters[i].session, + dcb); + } + } } /** @@ -520,3 +591,54 @@ SESSION* get_session_by_router_ses( } return ses; } + + +/** + * Create the filter chain for this session. + * + * Filters must be setup in reverse order, starting with the last + * filter in the chain and working back towards the client connection + * Each filter is passed the current session head of the filter chain + * this head becomes the destination for the filter. The newly created + * filter becomes the new head of the filter chain. + * + * @param session The session that requires the chain + * @return 0 if filter creation fails + */ +static int +session_setup_filters(SESSION *session) +{ +SERVICE *service = session->service; +DOWNSTREAM *head; +int i; + + if ((session->filters = calloc(service->n_filters, + sizeof(SESSION_FILTER))) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Insufficient memory to allocate session filter " + "tracking.\n"))); + return 0; + } + session->n_filters = service->n_filters; + for (i = service->n_filters - 1; i >= 0; i--) + { + if ((head = filterApply(service->filters[i], session, + &session->head)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Failed to create filter '%s' for service '%s'.\n", + service->filters[i]->name, + service->name))); + return 0; + } + session->filters[i].filter = service->filters[i]; + session->filters[i].session = head->session; + session->filters[i].instance = head->instance; + session->head = *head; + } + + return 1; +} diff --git a/server/include/buffer.h b/server/include/buffer.h index 53c81a4a6..9651031b2 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -93,6 +93,8 @@ typedef struct gwbuf { /*< Consume a number of bytes in the buffer */ #define GWBUF_CONSUME(b, bytes) (b)->start += bytes +#define GWBUF_RTRIM(b, bytes) (b)->end -= bytes + #define GWBUF_TYPE(b) (b)->gwbuf_type /*< * Function prototypes for the API to maniplate the buffers diff --git a/server/include/config.h b/server/include/config.h index 9331739f6..dc94e3ad9 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -28,6 +28,7 @@ * Date Who Description * 21/06/13 Mark Riddoch Initial implementation * 07/05/14 Massimiliano Pinto Added version_string to global configuration + * 23/05/14 Massimiliano Pinto Added id to global configuration * * @endverbatim */ @@ -78,6 +79,7 @@ typedef struct config_context { typedef struct { int n_threads; /**< Number of polling threads */ char *version_string; /**< The version string of embedded database library */ + unsigned long id; /**< MaxScale ID */ } GATEWAY_CONF; extern int config_load(char *); diff --git a/server/include/dcb.h b/server/include/dcb.h index ec0bc5b46..08c383014 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -19,6 +19,7 @@ */ #include #include +#include #include #include #include @@ -95,6 +96,13 @@ typedef struct gw_protocol { int (*session)(struct dcb *, void *); } GWPROTOCOL; +/** + * The GWPROTOCOL version data. The following should be updated whenever + * the GWPROTOCOL structure is changed. See the rules defined in modinfo.h + * that define how these numbers should change. + */ +#define GWPROTOCOL_VERSION {1, 0, 0} + /** * The statitics gathered on a descriptor control block */ diff --git a/server/include/filter.h b/server/include/filter.h new file mode 100644 index 000000000..8ddcfb00d --- /dev/null +++ b/server/include/filter.h @@ -0,0 +1,114 @@ +#ifndef _FILTER_H +#define _FILTER_H +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file filter.h - The filter interface mechanisms + * + * Revision History + * + * Date Who Description + * 27/05/2014 Mark Riddoch Initial implementation + * + */ +#include +#include +#include +#include + +/** + * The FILTER handle points to module specific data, so the best we can do + * is to make it a void * externally. + */ +typedef void *FILTER; + +/** + * The structure used to pass name, value pairs to the filter instances + */ +typedef struct { + char *name; /**< Name of the parameter */ + char *value; /**< Value of the parameter */ +} FILTER_PARAMETER; + +/** + * @verbatim + * The "module object" structure for a query router module + * + * The entry points are: + * createInstance Called by the service to create a new + * instance of the filter + * newSession Called to create a new user session + * within the filter + * closeSession Called when a session is closed + * freeSession Called when a session is freed + * setDownstream Sets the downstream component of the + * filter pipline + * routeQuery Called on each query that requires + * routing + * diagnostics Called to force the filter to print + * diagnostic output + * + * @endverbatim + * + * @see load_module + */ +typedef struct filter_object { + FILTER *(*createInstance)(char **options, FILTER_PARAMETER **); + void *(*newSession)(FILTER *instance, SESSION *session); + void (*closeSession)(FILTER *instance, void *fsession); + void (*freeSession)(FILTER *instance, void *fsession); + void (*setDownstream)(FILTER *instance, void *fsession, DOWNSTREAM *downstream); + int (*routeQuery)(FILTER *instance, void *fsession, GWBUF *queue); + void (*diagnostics)(FILTER *instance, void *fsession, DCB *dcb); +} FILTER_OBJECT; + +/** + * The filter API version. If the FILTER_OBJECT structure or the filter API + * is changed these values must be updated in line with the rules in the + * file modinfo.h. + */ +#define FILTER_VERSION {1, 0, 0} +/** + * The definition of a filter form the configuration file. + * This is basically the link between a plugin to load and the + * optons to pass to that plugin. + */ +typedef struct filter_def { + char *name; /*< The Filter name */ + char *module; /*< The module to load */ + char **options; /*< The options set for this filter */ + FILTER_PARAMETER + **parameters; /*< The filter parameters */ + FILTER filter; + FILTER_OBJECT *obj; + SPINLOCK spin; + struct filter_def + *next; /*< Next filter in the chain of all filters */ +} FILTER_DEF; + +FILTER_DEF *filter_alloc(char *, char *); +void filter_free(FILTER_DEF *); +FILTER_DEF *filter_find(char *); +void filterAddOption(FILTER_DEF *, char *); +void filterAddParameter(FILTER_DEF *, char *, char *); +DOWNSTREAM *filterApply(FILTER_DEF *, SESSION *, DOWNSTREAM *); +void dprintAllFilters(DCB *); +void dprintFilter(DCB *, FILTER_DEF *); +void dListFilters(DCB *); +#endif diff --git a/server/include/modinfo.h b/server/include/modinfo.h new file mode 100644 index 000000000..c3c1e64da --- /dev/null +++ b/server/include/modinfo.h @@ -0,0 +1,85 @@ +#ifndef _MODINFO_H +#define _MODINFO_H +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file modinfo.h The module information interface + * + * @verbatim + * Revision History + * + * Date Who Description + * 02/06/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ + +/** + * The status of the module. This gives some idea of the module + * maturity. + */ +typedef enum { + MODULE_IN_DEVELOPMENT = 0, + MODULE_ALPHA_RELEASE, + MODULE_BETA_RELEASE, + MODULE_GA +} MODULE_STATUS; + +/** + * The API implemented by the module + */ +typedef enum { + MODULE_API_PROTOCOL = 0, + MODULE_API_ROUTER, + MODULE_API_MONITOR, + MODULE_API_FILTER, + MODULE_API_AUTHENTICATION +} MODULE_API; + +/** + * The module version structure. + * + * The rules for changing these values are: + * + * Any change that affects an inexisting call in the API in question, + * making the new API no longer compatible with the old, + * must increment the major version. + * + * Any change that adds to the API, but does not alter the existing API + * calls, must increment the minor version. + * + * Any change that is purely cosmetic and does not affect the calling + * conventions of the API must increment only the patch version number. + */ +typedef struct { + int major; + int minor; + int patch; +} MODULE_VERSION; + +/** + * The module information structure + */ +typedef struct { + MODULE_API modapi; + MODULE_STATUS status; + MODULE_VERSION api_version; + char *description; +} MODULE_INFO; +#endif diff --git a/server/include/modules.h b/server/include/modules.h index c90cf45a1..199e3a24b 100644 --- a/server/include/modules.h +++ b/server/include/modules.h @@ -18,6 +18,7 @@ * Copyright SkySQL Ab 2013 */ #include +#include /** * @file modules.h Utilities for loading modules @@ -30,6 +31,7 @@ * Date Who Description * 13/06/13 Mark Riddoch Initial implementation * 08/07/13 Mark Riddoch Addition of monitor modules + * 29/05/14 Mark Riddoch Addition of filter modules * @endverbatim */ @@ -39,6 +41,8 @@ typedef struct modules { char *version; /**< Module version */ void *handle; /**< The handle returned by dlopen */ void *modobj; /**< The module "object" this is the set of entry points */ + MODULE_INFO + *info; /**< The module information */ struct modules *next; /**< Next module in the linked list */ } MODULES; @@ -49,6 +53,7 @@ typedef struct modules { #define MODULE_PROTOCOL "Protocol" /**< A protocol module type */ #define MODULE_ROUTER "Router" /**< A router module type */ #define MODULE_MONITOR "Monitor" /**< A database monitor module type */ +#define MODULE_FILTER "Filter" /**< A filter module type */ extern void *load_module(const char *module, const char *type); diff --git a/server/include/modutil.h b/server/include/modutil.h new file mode 100644 index 000000000..2092ddea5 --- /dev/null +++ b/server/include/modutil.h @@ -0,0 +1,37 @@ +#ifndef _MODUTIL_H +#define _MODUTIL_H +/* + * This file is distributed as part of MaxScale from SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file modutil.h A set of useful routines for module writers + * + * @verbatim + * Revision History + * + * Date Who Description + * 04/06/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include + +extern int modutil_is_SQL(GWBUF *); +extern int modutil_extract_SQL(GWBUF *, char **, int *); +extern GWBUF *modutil_replace_SQL(GWBUF *, char *); +#endif diff --git a/server/include/monitor.h b/server/include/monitor.h index 6444fecd5..861c1c070 100644 --- a/server/include/monitor.h +++ b/server/include/monitor.h @@ -26,10 +26,11 @@ * @verbatim * Revision History * - * Date Who Description - * 07/07/13 Mark Riddoch Initial implementation - * 25/07/13 Mark Riddoch Addition of diagnotics - * 23/05/14 Mark Riddoch Addition of routine to find monitors by name + * Date Who Description + * 07/07/13 Mark Riddoch Initial implementation + * 25/07/13 Mark Riddoch Addition of diagnotics + * 23/05/14 Mark Riddoch Addition of routine to find monitors by name + * 23/05/14 Massimiliano Pinto Addition of defaultId and setInterval * * @endverbatim */ @@ -66,8 +67,17 @@ typedef struct { void (*unregisterServer)(void *, SERVER *); void (*defaultUser)(void *, char *, char *); void (*diagnostics)(DCB *, void *); + void (*setInterval)(void *, unsigned long); + void (*defaultId)(void *, unsigned long); + void (*replicationHeartbeat)(void *, int); } MONITOR_OBJECT; +/** + * The monitor API version number. Any change to the monitor module API + * must change these versions usign the rules defined in modinfo.h + */ +#define MONITOR_VERSION {1, 0, 0} + /** * Representation of the running monitor. */ @@ -87,4 +97,7 @@ extern void monitorStop(MONITOR *); extern void monitorStart(MONITOR *); extern void monitorStopAll(); extern void monitorShowAll(DCB *); +extern void monitorSetId(MONITOR *, unsigned long); +extern void monitorSetInterval (MONITOR *, unsigned long); +extern void monitorSetReplicationHeartbeat(MONITOR *, int); #endif diff --git a/server/include/router.h b/server/include/router.h index 0c467e671..ded9428a0 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -84,6 +84,13 @@ typedef struct router_object { uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); } ROUTER_OBJECT; +/** + * The router module API version. Any change that changes the router API + * must update these versions numbers in accordance with the rules in + * modinfo.h. + */ +#define ROUTER_VERSION { 1, 0, 0 } + typedef enum router_capability_t { RCAP_TYPE_UNDEFINED = 0, RCAP_TYPE_STMT_INPUT = (1 << 0), diff --git a/server/include/server.h b/server/include/server.h index e36108481..b15453c18 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -34,6 +34,8 @@ * 18/05/14 Mark Riddoch Addition of unique_name field * 20/05/14 Massimiliano Pinto Addition of server_string field * 20/05/14 Massimiliano Pinto Addition of node_id field + * 23/05/14 Massimiliano Pinto Addition of rlag and node_ts fields + * 03/06/14 Mark Riddoch Addition of maintainance mode * * @endverbatim */ @@ -66,6 +68,8 @@ typedef struct server { struct server *nextdb; /**< Next server in list attached to a service */ char *server_string; /**< Server version string, i.e. MySQL server version */ long node_id; /**< Node id, server_id for M/S or local_index for Galera */ + int rlag; /**< Replication Lag for Master / Slave replication */ + unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ } SERVER; /** @@ -77,14 +81,13 @@ typedef struct server { #define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */ #define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. can handle reads */ #define SERVER_JOINED 0x0008 /**<< The server is joined in a Galera cluster */ -#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */ - +#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */ /** * Is the server running - the macro returns true if the server is marked as running * regardless of it's state as a master or slave */ -#define SERVER_IS_RUNNING(server) ((server)->status & SERVER_RUNNING) +#define SERVER_IS_RUNNING(server) (((server)->status & (SERVER_RUNNING|SERVER_MAINT)) == SERVER_RUNNING) /** * Is the server marked as down - the macro returns true if the server is beleived * to be inoperable. @@ -95,26 +98,25 @@ typedef struct server { * in order for the macro to return true */ #define SERVER_IS_MASTER(server) \ - (((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_MASTER)) + (((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_MASTER)) /** * Is the server a slave? The server must be both running and marked as a slave * in order for the macro to return true */ #define SERVER_IS_SLAVE(server) \ - (((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) == (SERVER_RUNNING|SERVER_SLAVE)) + (((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_SLAVE)) /** * Is the server joined Galera node? The server must be running and joined. */ #define SERVER_IS_JOINED(server) \ - (((server)->status & (SERVER_RUNNING|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED)) + (((server)->status & (SERVER_RUNNING|SERVER_JOINED|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_JOINED)) /** - * Is the server in maintenance mode. - */ -#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) - - + * Is the server in maintenance mode. + */ +#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) + extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); extern SERVER *server_find_by_unique_name(char *); @@ -129,6 +131,5 @@ extern void server_set_status(SERVER *, int); extern void server_clear_status(SERVER *, int); extern void serverAddMonUser(SERVER *, char *, char *); extern void server_update(SERVER *, char *, char *, char *); -void server_set_unique_name(SERVER *server, char *name); - +extern void server_set_unique_name(SERVER *, char *); #endif diff --git a/server/include/service.h b/server/include/service.h index a2f700ad5..4df43613d 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "config.h" /** @@ -39,7 +40,9 @@ * 23/06/13 Mark Riddoch Added service user and users * 06/02/14 Massimiliano Pinto Added service flag for root user access * 25/02/14 Massimiliano Pinto Added service refresh limit feature - * 07/05/14 Massimiliano Pinto Added version_string field to service struct + * 07/05/14 Massimiliano Pinto Added version_string field to service + * struct + * 29/05/14 Mark Riddoch Filter API mechanism * * @endverbatim */ @@ -73,9 +76,10 @@ typedef struct { } SERVICE_STATS; /** - * The service user structure holds the information that is needed for this service to - * allow the gateway to login to the backend database and extact information such as - * the user table or other database status or configuration data. + * The service user structure holds the information that is needed + for this service to allow the gateway to login to the backend + database and extact information such as the user table or other + database status or configuration data. */ typedef struct { char *name; /**< The user name to use to extract information */ @@ -83,8 +87,8 @@ typedef struct { } SERVICE_USER; /** - * The service refresh rate hols the counter and last load timet for this service to - * load users data from the backend database + * The service refresh rate holds the counter and last load time_t + for this service to load users data from the backend database */ typedef struct { int nloads; @@ -99,30 +103,33 @@ typedef struct { * to the service. */ typedef struct service { - char *name; /**< The service name */ - int state; /**< The service state */ - SERV_PROTOCOL *ports; /**< Linked list of ports and protocols - * that this service will listen on. - */ - char *routerModule; /**< Name of router module to use */ - char **routerOptions; /**< Router specific option strings */ + char *name; /**< The service name */ + int state; /**< The service state */ + SERV_PROTOCOL *ports; /**< Linked list of ports and protocols + * that this service will listen on. + */ + char *routerModule; /**< Name of router module to use */ + char **routerOptions;/**< Router specific option strings */ struct router_object - *router; /**< The router we are using */ + *router; /**< The router we are using */ void *router_instance; - /**< The router instance for this service */ - char *version_string; /** version string for this service listeners */ - struct server *databases; /**< The set of servers in the backend */ - SERVICE_USER credentials; /**< The cedentials of the service user */ - SPINLOCK spin; /**< The service spinlock */ - SERVICE_STATS stats; /**< The service statistics */ - struct users *users; /**< The user data for this service */ - int enable_root; /**< Allow root user access */ - CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */ - int svc_config_version; /*< Version number of configuration */ + /**< The router instance for this service */ + char *version_string;/** version string for this service listeners */ + struct server *databases; /**< The set of servers in the backend */ + SERVICE_USER credentials; /**< The cedentials of the service user */ + SPINLOCK spin; /**< The service spinlock */ + SERVICE_STATS stats; /**< The service statistics */ + struct users *users; /**< The user data for this service */ + int enable_root; /**< Allow root user access */ + CONFIG_PARAMETER* + svc_config_param; /*< list of config params and values */ + int svc_config_version; /*< Version number of configuration */ SPINLOCK users_table_spin; /**< The spinlock for users data refresh */ SERVICE_REFRESH_RATE rate_limit; /**< The refresh rate limit for users table */ + FILTER_DEF **filters; /**< Ordered list of filters */ + int n_filters; /**< Number of filters */ struct service *next; /**< The next service in the linked list */ } SERVICE; diff --git a/server/include/session.h b/server/include/session.h index 3f6955ae3..f99982802 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -31,16 +31,20 @@ * 01-07-2013 Massimiliano Pinto Removed backends pointer * from struct session * 02-09-2013 Massimiliano Pinto Added session ref counter + * 29-05-2014 Mark Riddoch Support for filter mechanism + * added * * @endverbatim */ #include #include +#include #include #include struct dcb; struct service; +struct filter_def; /** * The session statistics structure @@ -59,6 +63,39 @@ typedef enum { SESSION_STATE_FREE /*< for all sessions */ } session_state_t; +/** + * The downstream element in the filter chain. This may refer to + * another filter or to a router. + */ +typedef struct { + void *instance; + void *session; + int (*routeQuery)(void *instance, + void *router_session, GWBUF *queue); +} DOWNSTREAM; + +/** + * The upstream element in the filter chain. This may refer to + * another filter or to the protocol implementation. + */ +typedef struct { + void *instance; + void *session; + int (*write)(void *, void *, GWBUF *); + int (*error)(void *); +} UPSTREAM; + +/** + * Structure used to track the filter instances and sessions of the filters + * that are in use within a session. + */ +typedef struct { + struct filter_def + *filter; + void *instance; + void *session; +} SESSION_FILTER; + /** * The session status block * @@ -77,6 +114,9 @@ typedef struct session { void *router_session;/**< The router instance data */ SESSION_STATS stats; /**< Session statistics */ struct service *service; /**< The service this session is using */ + int n_filters; /**< Number of filter sessions */ + SESSION_FILTER *filters; /**< The filters in use within this session */ + DOWNSTREAM head; /**< Head of the filter chain */ struct session *next; /**< Linked list of all sessions */ int refcount; /**< Reference count on the session */ #if defined(SS_DEBUG) @@ -86,6 +126,15 @@ typedef struct session { #define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type) +/** + * A convenience macro that can be used by the protocol modules to route + * the incoming data to the first element in the pipeline of filters and + * routers. + */ +#define SESSION_ROUTE_QUERY(session, buf) \ + ((session)->head.routeQuery)((session)->head.instance, \ + (session)->head.session, (buf)) + SESSION *session_alloc(struct service *, struct dcb *); bool session_free(SESSION *); int session_isvalid(SESSION *); diff --git a/server/modules/filter/Makefile b/server/modules/filter/Makefile new file mode 100644 index 000000000..14b226b7d --- /dev/null +++ b/server/modules/filter/Makefile @@ -0,0 +1,86 @@ +# This file is distributed as part of MaxScale form SkySQL. It is free +# software: you can redistribute it and/or modify it under the terms of the +# GNU General Public License as published by the Free Software Foundation, +# version 2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 51 +# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright SkySQL Ab 2014 +# +# Revision History +# Date Who Description +# 29/05/14 Mark Riddoch Initial module development + +include ../../../build_gateway.inc + +LOGPATH := $(ROOT_PATH)/log_manager +UTILSPATH := $(ROOT_PATH)/utils + +CC=cc +CFLAGS=-c -fPIC -I/usr/include -I../include -I../../include -I$(LOGPATH) \ + -I$(UTILSPATH) -Wall -g + +include ../../../makefile.inc + +LDFLAGS=-shared -L$(LOGPATH) -Wl,-rpath,$(DEST)/lib \ + -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) + +TESTSRCS=testfilter.c +TESTOBJ=$(TESTSRCS:.c=.o) +QLASRCS=qlafilter.c +QLAOBJ=$(QLASRCS:.c=.o) +REGEXSRCS=regexfilter.c +REGEXOBJ=$(REGEXSRCS:.c=.o) +SRCS=$(TESTSRCS) $(QLASRCS) $(REGEXSRCS) +OBJ=$(SRCS:.c=.o) +LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager +MODULES= libtestfilter.so libqlafilter.so libregexfilter.so + + +all: $(MODULES) + +libtestfilter.so: $(TESTOBJ) + $(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@ + +libqlafilter.so: $(QLAOBJ) + $(CC) $(LDFLAGS) $(QLAOBJ) $(LIBS) -o $@ + +libregexfilter.so: $(REGEXOBJ) + $(CC) $(LDFLAGS) $(REGEXOBJ) $(LIBS) -o $@ + +.c.o: + $(CC) $(CFLAGS) $< -o $@ + +clean: + rm -f $(OBJ) $(MODULES) + +tags: + ctags $(SRCS) $(HDRS) + +depend: + @rm -f depend.mk + cc -M $(CFLAGS) $(SRCS) > depend.mk + +install: $(MODULES) + install -D $(MODULES) $(DEST)/MaxScale/modules + +cleantests: + $(MAKE) -C test cleantests + +buildtests: + $(MAKE) -C test DEBUG=Y buildtests + +runtests: + $(MAKE) -C test runtests + +testall: + $(MAKE) -C test testall + +include depend.mk diff --git a/server/modules/filter/qlafilter.c b/server/modules/filter/qlafilter.c new file mode 100644 index 000000000..520f1e1a9 --- /dev/null +++ b/server/modules/filter/qlafilter.c @@ -0,0 +1,295 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * QLA Filter - Query Log All. A primitive query logging filter, simply + * used to verify the filter mechanism for downstream filters. All queries + * that are passed through the filter will be written to file. + * + * The filter makes no attempt to deal with query packets that do not fit + * in a single GWBUF. + * + * A single option may be passed to the filter, this is the name of the + * file to which the queries are logged. A serial number is appended to this + * name in order that each session logs to a different file. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A simple query logging filter" +}; + +static char *version_str = "V1.0.0"; + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + routeQuery, + diagnostic, +}; + +/** + * A instance structure, the assumption is that the option passed + * to the filter is simply a base for the filename to which the queries + * are logged. + * + * To this base a session number is attached such that each session will + * have a nique name. + */ +typedef struct { + int sessions; + char *filebase; +} QLA_INSTANCE; + +/** + * The session structure for this QLA filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. + */ +typedef struct { + DOWNSTREAM down; + char *filename; + int fd; +} QLA_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +QLA_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(QLA_INSTANCE))) != NULL) + { + if (options) + my_instance->filebase = strdup(options[0]); + else + my_instance->filebase = strdup("qla"); + my_instance->sessions = 0; + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * Create the file to log to and open it. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +QLA_INSTANCE *my_instance = (QLA_INSTANCE *)instance; +QLA_SESSION *my_session; + + if ((my_session = calloc(1, sizeof(QLA_SESSION))) != NULL) + { + if ((my_session->filename = + (char *)malloc(strlen(my_instance->filebase) + 20)) + == NULL) + { + free(my_session); + return NULL; + } + sprintf(my_session->filename, "%s.%d", my_instance->filebase, + my_instance->sessions); + my_instance->sessions++; + my_session->fd = open(my_session->filename, + O_WRONLY|O_CREAT|O_TRUNC, 0666); + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * In the case of the QLA filter we simple close the file descriptor. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +QLA_SESSION *my_session = (QLA_SESSION *)session; + + close(my_session->fd); +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ +QLA_SESSION *my_session = (QLA_SESSION *)session; + + free(my_session->filename); + free(session); + return; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +QLA_SESSION *my_session = (QLA_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query should normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +QLA_SESSION *my_session = (QLA_SESSION *)session; +char *ptr, t_buf[40]; +int length; +struct tm t; +struct timeval tv; + + if (modutil_extract_SQL(queue, &ptr, &length)) + { + gettimeofday(&tv, NULL); + localtime_r(&tv.tv_sec, &t); + sprintf(t_buf, "%02d:%02d:%02d.%-3d %d/%02d/%d, ", + t.tm_hour, t.tm_min, t.tm_sec, (int)(tv.tv_usec / 1000), + t.tm_mday, t.tm_mon + 1, 1900 + t.tm_year); + write(my_session->fd, t_buf, strlen(t_buf)); + write(my_session->fd, ptr, length); + write(my_session->fd, "\n", 1); + } + + /* Pass the query downstream */ + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +QLA_SESSION *my_session = (QLA_SESSION *)fsession; + + if (my_session) + { + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_session->filename); + } +} diff --git a/server/modules/filter/regexfilter.c b/server/modules/filter/regexfilter.c new file mode 100644 index 000000000..ad773c40c --- /dev/null +++ b/server/modules/filter/regexfilter.c @@ -0,0 +1,355 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ +#include +#include +#include +#include +#include +#include + +/** + * regexfilter.c - a very simple regular expression rewrite filter. + * + * A simple regular expression query rewrite filter. + * Two parameters should be defined in the filter configuration + * match= + * replace= + */ + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A query rewrite filter that uses regular expressions to rewite queries" +}; + +static char *version_str = "V1.0.0"; + +static FILTER *createInstance(char **options, FILTER_PARAMETER **params); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + +static char *regex_replace(char *sql, int length, regex_t *re, char *replace); + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + routeQuery, + diagnostic, +}; + +/** + * Instance structure + */ +typedef struct { + char *match; /* Regular expression to match */ + char *replace; /* Replacement text */ + regex_t re; /* Compiled regex text */ +} REGEX_INSTANCE; + +/** + * The session structure for this regex filter + */ +typedef struct { + DOWNSTREAM down; + int no_change; + int replacements; +} REGEX_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +REGEX_INSTANCE *my_instance; +int i; + + if ((my_instance = calloc(1, sizeof(REGEX_INSTANCE))) != NULL) + { + my_instance->match = NULL; + my_instance->replace = NULL; + + for (i = 0; params[i]; i++) + { + if (!strcmp(params[i]->name, "match")) + my_instance->match = strdup(params[i]->value); + if (!strcmp(params[i]->name, "replace")) + my_instance->replace = strdup(params[i]->value); + } + if (my_instance->match == NULL || my_instance->replace == NULL) + { + return NULL; + } + + if (regcomp(&my_instance->re, my_instance->match, REG_ICASE)) + { + free(my_instance->match); + free(my_instance->replace); + free(my_instance); + return NULL; + } + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +REGEX_SESSION *my_session; + + if ((my_session = calloc(1, sizeof(REGEX_SESSION))) != NULL) + { + my_session->no_change = 0; + my_session->replacements = 0; + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +} + +/** + * Free the memory associated with this filter session. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +freeSession(FILTER *instance, void *session) +{ + free(session); + return; +} + +/** + * Set the downstream component for this filter. + * + * @param instance The filter instance data + * @param session The session being closed + * @param downstream The downstream filter or router + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +REGEX_SESSION *my_session = (REGEX_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query shoudl normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance; +REGEX_SESSION *my_session = (REGEX_SESSION *)session; +char *sql, *newsql; +int length; + + if (modutil_is_SQL(queue)) + { + modutil_extract_SQL(queue, &sql, &length); + newsql = regex_replace(sql, length, &my_instance->re, + my_instance->replace); + if (newsql) + { + queue = modutil_replace_SQL(queue, newsql); + free(newsql); + my_session->replacements++; + } + else + my_session->no_change++; + + } + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance; +REGEX_SESSION *my_session = (REGEX_SESSION *)fsession; + + dcb_printf(dcb, "\t\tSearch and replace: s/%s/%s/\n", + my_instance->match, my_instance->replace); + if (my_session) + { + dcb_printf(dcb, "\t\tNo. of queries unaltered by filter: %d\n", + my_session->no_change); + dcb_printf(dcb, "\t\tNo. of queries altered by filter: %d\n", + my_session->replacements); + } +} + +/** + * Perform a regular expression match and subsititution on the SQL + * + * @param sql The original SQL text + * @param length The length of the SQL text + * @param re The compiled regular expression + * @param replace The replacement text + * @return The replaced text or NULL if no replacement was done. + */ +static char * +regex_replace(char *sql, int length, regex_t *re, char *replace) +{ +char *orig, *result, *ptr; +int i, res_size, res_length, rep_length; +int last_match; +regmatch_t match[10]; + + orig = strndup(sql, length); + if (regexec(re, orig, 10, match, 0)) + { + free(orig); + return NULL; + } + res_size = 2 * length; + result = (char *)malloc(res_size); + res_length = 0; + rep_length = strlen(replace); + last_match = 0; + + for (i = 0; i < 10; i++) + { + if (match[i].rm_so != -1) + { + ptr = &result[res_length]; + if (last_match < match[i].rm_so) + { + int to_copy = match[i].rm_so - last_match; + if (last_match + to_copy > res_size) + { + res_size = last_match + to_copy + length; + result = (char *)realloc(result, res_size); + } + memcpy(ptr, &sql[last_match], to_copy); + res_length += to_copy; + } + last_match = match[i].rm_eo; + if (res_length + rep_length > res_size) + { + res_size += rep_length; + result = (char *)realloc(result, res_size); + } + ptr = &result[res_length]; + memcpy(ptr, replace, rep_length); + res_length += rep_length; + } + } + + if (last_match < length) + { + int to_copy = length - last_match; + if (last_match + to_copy > res_size) + { + res_size = last_match + to_copy + 1; + result = (char *)realloc(result, res_size); + } + ptr = &result[res_length]; + memcpy(ptr, &sql[last_match], to_copy); + res_length += to_copy; + } + result[res_length] = 0; + + return result; +} diff --git a/server/modules/filter/testfilter.c b/server/modules/filter/testfilter.c new file mode 100644 index 000000000..270dbd1cb --- /dev/null +++ b/server/modules/filter/testfilter.c @@ -0,0 +1,234 @@ +/* + * This file is distributed as part of MaxScale by SkySQL. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ +#include +#include +#include +#include + +/** + * testfilter.c - a very simple test filter. + * + * This filter is a very simple example used to test the filter API, + * it merely counts the number of statements that flow through the + * filter pipeline. + * + * Reporting is done via the diagnostics print routine. + */ + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A simple query counting filter" +}; + +static char *version_str = "V1.0.0"; + +static FILTER *createInstance(char **options, FILTER_PARAMETER **params); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + routeQuery, + diagnostic, +}; + +/** + * A dummy instance structure + */ +typedef struct { + int sessions; +} TEST_INSTANCE; + +/** + * A dummy session structure for this test filter + */ +typedef struct { + DOWNSTREAM down; + int count; +} TEST_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +TEST_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(TEST_INSTANCE))) != NULL) + my_instance->sessions = 0; + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance; +TEST_SESSION *my_session; + + if ((my_session = calloc(1, sizeof(TEST_SESSION))) != NULL) + { + my_instance->sessions++; + my_session->count = 0; + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ +} + +/** + * Free the memory associated with this filter session. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +freeSession(FILTER *instance, void *session) +{ + free(session); + return; +} + +/** + * Set the downstream component for this filter. + * + * @param instance The filter instance data + * @param session The session being closed + * @param downstream The downstream filter or router + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +TEST_SESSION *my_session = (TEST_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query shoudl normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +TEST_SESSION *my_session = (TEST_SESSION *)session; + + if (modutil_is_SQL(queue)) + my_session->count++; + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +TEST_INSTANCE *my_instance = (TEST_INSTANCE *)instance; +TEST_SESSION *my_session = (TEST_SESSION *)fsession; + + if (my_session) + dcb_printf(dcb, "\t\tNo. of queries routed by filter: %d\n", + my_session->count); + else + dcb_printf(dcb, "\t\tNo. of sessions created: %d\n", + my_instance->sessions); +} diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 07435f951..ed47d309d 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -236,7 +236,6 @@ typedef struct router_instance { } ROUTER_INSTANCE; #define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ - (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : \ - (SERVER_IS_JOINED((b)->backend_server) ? BE_JOINED : BE_UNDEFINED))); + (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED)); #endif /*< _RWSPLITROUTER_H */ diff --git a/server/modules/monitor/galera_mon.c b/server/modules/monitor/galera_mon.c index 96b891d58..a9f242756 100644 --- a/server/modules/monitor/galera_mon.c +++ b/server/modules/monitor/galera_mon.c @@ -26,6 +26,9 @@ * 22/07/13 Mark Riddoch Initial implementation * 21/05/14 Massimiliano Pinto Monitor sets a master server * that has the lowest value of wsrep_local_index + * 23/05/14 Massimiliano Pinto Added 1 configuration option (setInterval). + * Interval is printed in diagnostics. + * 03/06/14 Mark Riddoch Add support for maintenance mode * * @endverbatim */ @@ -42,12 +45,20 @@ #include #include #include +#include extern int lm_enabled_logfiles_bitmask; static void monitorMain(void *); -static char *version_str = "V1.1.0"; +static char *version_str = "V1.2.0"; + +MODULE_INFO info = { + MODULE_API_MONITOR, + MODULE_ALPHA_RELEASE, + MONITOR_VERSION, + "A Galera cluster monitor" +}; static void *startMonitor(void *); static void stopMonitor(void *); @@ -55,8 +66,9 @@ static void registerServer(void *, SERVER *); static void unregisterServer(void *, SERVER *); static void defaultUsers(void *, char *, char *); static void diagnostics(DCB *, void *); +static void setInterval(void *, unsigned long); -static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics }; +static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL }; /** * Implementation of the mandatory version entry point @@ -121,6 +133,8 @@ MYSQL_MONITOR *handle; handle->shutdown = 0; handle->defaultUser = NULL; handle->defaultPasswd = NULL; + handle->id = MONITOR_DEFAULT_ID; + handle->interval = MONITOR_INTERVAL; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); @@ -236,7 +250,10 @@ char *sep; dcb_printf(dcb, "\tMonitor stopped\n"); break; } + + dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval); dcb_printf(dcb, "\tMonitored servers: "); + db = handle->databases; sep = ""; while (db) @@ -293,13 +310,29 @@ char *server_string; if (uname == NULL) return; + /* Don't even probe server flagged as in maintenance */ + if (SERVER_IN_MAINT(database->server)) + return; + if (database->con == NULL || mysql_ping(database->con) != 0) { char *dpwd = decryptPassword(passwd); + int rc; + int read_timeout = 1; + database->con = mysql_init(NULL); + rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); + if (mysql_real_connect(database->con, database->server->name, uname, dpwd, NULL, database->server->port, NULL, 0) == NULL) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Monitor was unable to connect to " + "server %s:%d : \"%s\"", + database->server->name, + database->server->port, + mysql_error(database->con)))); server_clear_status(database->server, SERVER_RUNNING); database->server->node_id = -1; free(dpwd); @@ -395,11 +428,12 @@ long master_id; while (ptr) { + unsigned int prev_status = ptr->server->status; monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd); /* set master_id to the lowest value of ptr->server->node_id */ - if (ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) { + if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) { if (ptr->server->node_id < master_id && master_id >= 0) { master_id = ptr->server->node_id; } else { @@ -407,11 +441,21 @@ long master_id; master_id = ptr->server->node_id; } } - } else { + } else if (!SERVER_IN_MAINT(ptr->server)) { /* clear M/S status */ server_clear_status(ptr->server, SERVER_SLAVE); server_clear_status(ptr->server, SERVER_MASTER); } + if (ptr->server->status != prev_status || + SERVER_IS_DOWN(ptr->server)) + { + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "Backend server %s:%d state : %s", + ptr->server->name, + ptr->server->port, + STRSRVSTATUS(ptr->server)))); + } ptr = ptr->next; } @@ -420,7 +464,7 @@ long master_id; /* this server loop sets Master and Slave roles */ while (ptr) { - if (ptr->server->node_id >= 0 && master_id >= 0) { + if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && master_id >= 0) { /* set the Master role */ if (SERVER_IS_JOINED(ptr->server) && (ptr->server->node_id == master_id)) { server_set_status(ptr->server, SERVER_MASTER); @@ -434,6 +478,19 @@ long master_id; ptr = ptr->next; } - thread_millisleep(MONITOR_INTERVAL); + thread_millisleep(handle->interval); } } + +/** + * Set the monitor sampling interval. + * + * @param arg The handle allocated by startMonitor + * @param interval The interval to set in monitor struct, in milliseconds + */ +static void +setInterval(void *arg, unsigned long interval) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->interval, &interval, sizeof(unsigned long)); +} diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 9cee3aec2..03d7f5cb8 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -30,6 +30,9 @@ * diagnostic interface * 20/05/14 Massimiliano Pinto Addition of support for MariadDB multimaster replication setup. * New server field version_string is updated. + * 28/05/14 Massimiliano Pinto Added set Id and configuration options (setInverval) + * Parameters are now printed in diagnostics + * 03/06/14 Mark Ridoch Add support for maintenance mode * * @endverbatim */ @@ -46,12 +49,20 @@ #include #include #include +#include extern int lm_enabled_logfiles_bitmask; static void monitorMain(void *); -static char *version_str = "V1.1.0"; +static char *version_str = "V1.2.0"; + +MODULE_INFO info = { + MODULE_API_MONITOR, + MODULE_ALPHA_RELEASE, + MONITOR_VERSION, + "A MySQL Master/Slave replication monitor" +}; static void *startMonitor(void *); static void stopMonitor(void *); @@ -59,8 +70,11 @@ static void registerServer(void *, SERVER *); static void unregisterServer(void *, SERVER *); static void defaultUser(void *, char *, char *); static void diagnostics(DCB *, void *); +static void setInterval(void *, unsigned long); +static void defaultId(void *, unsigned long); +static void replicationHeartbeat(void *, int); -static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics }; +static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; /** * Implementation of the mandatory version entry point @@ -126,6 +140,8 @@ MYSQL_MONITOR *handle; handle->shutdown = 0; handle->defaultUser = NULL; handle->defaultPasswd = NULL; + handle->id = MONITOR_DEFAULT_ID; + handle->interval = MONITOR_INTERVAL; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); @@ -261,7 +277,12 @@ char *sep; dcb_printf(dcb, "\tMonitor stopped\n"); break; } + + dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval); + dcb_printf(dcb,"\tMaxScale MonitorId:\t%lu\n", handle->id); + dcb_printf(dcb,"\tReplication lag:\t%s\n", (handle->replicationHeartbeat == 1) ? "enabled" : "disabled"); dcb_printf(dcb, "\tMonitored servers: "); + db = handle->databases; sep = ""; while (db) @@ -280,13 +301,13 @@ char *sep; /** * Monitor an individual server * + * @param handle The MySQL Monitor object * @param database The database to probe - * @param defaultUser Default username for the monitor - * @param defaultPasswd Default password for the monitor */ static void -monitorDatabase(MONITOR_SERVERS *database, char *defaultUser, char *defaultPasswd) +monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) { +<<<<<<< HEAD MYSQL_ROW row; MYSQL_RES *result; int num_fields; @@ -298,6 +319,21 @@ static int conn_err_count; static int modval = 10; if (database->server->monuser != NULL) +======= +MYSQL_ROW row; +MYSQL_RES *result; +int num_fields; +int ismaster = 0, isslave = 0; +char *uname = handle->defaultUser, *passwd = handle->defaultPasswd; +unsigned long int server_version = 0; +char *server_string; +unsigned long id = handle->id; +int replication_heartbeat = handle->replicationHeartbeat; +static int conn_err_count; +static int modval = 10; + + if (database->server->monuser != NULL) +>>>>>>> develop { uname = database->server->monuser; passwd = database->server->monpw; @@ -306,13 +342,24 @@ static int modval = 10; if (uname == NULL) return; +<<<<<<< HEAD +======= + /* Don't probe servers in maintenance mode */ + if (SERVER_IN_MAINT(database->server)) + return; + +>>>>>>> develop if (database->con == NULL || mysql_ping(database->con) != 0) { char *dpwd = decryptPassword(passwd); int rc; int read_timeout = 1; +<<<<<<< HEAD database->con = mysql_init(NULL); +======= + database->con = mysql_init(NULL); +>>>>>>> develop rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); if (mysql_real_connect(database->con, @@ -324,6 +371,7 @@ static int modval = 10; NULL, 0) == NULL) { +<<<<<<< HEAD if (conn_err_count%modval == 0) { LOGIF(LE, (skygw_log_write_flush( @@ -340,6 +388,16 @@ static int modval = 10; { conn_err_count += 1; } +======= + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Monitor was unable to connect to " + "server %s:%d : \"%s\"", + database->server->name, + database->server->port, + mysql_error(database->con)))); + +>>>>>>> develop free(dpwd); server_clear_status(database->server, SERVER_RUNNING); @@ -386,15 +444,112 @@ static int modval = 10; { /* Log lack of permission */ } - } - else if ((result = mysql_store_result(database->con)) != NULL) - { + + database->server->rlag = -1; + } else if ((result = mysql_store_result(database->con)) != NULL) { num_fields = mysql_num_fields(result); while ((row = mysql_fetch_row(result))) { ismaster = 1; } mysql_free_result(result); + + if (ismaster && replication_heartbeat == 1) { + time_t heartbeat; + time_t purge_time; + char heartbeat_insert_query[128]=""; + char heartbeat_purge_query[128]=""; + + handle->master_id = database->server->node_id; + + /* create the maxscale_schema database */ + if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS maxscale_schema")) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error creating maxscale_schema database in Master server" + ": %s", mysql_error(database->con)))); + + database->server->rlag = -1; + } + + /* create repl_heartbeat table in maxscale_schema database */ + if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS " + "maxscale_schema.replication_heartbeat " + "(maxscale_id INT NOT NULL, " + "master_server_id INT NOT NULL, " + "master_timestamp INT UNSIGNED NOT NULL, " + "PRIMARY KEY ( master_server_id, maxscale_id ) ) " + "ENGINE=MYISAM DEFAULT CHARSET=latin1")) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error creating maxscale_schema.replication_heartbeat table in Master server" + ": %s", mysql_error(database->con)))); + + database->server->rlag = -1; + } + + /* auto purge old values after 48 hours*/ + purge_time = time(0) - (3600 * 48); + + sprintf(heartbeat_purge_query, "DELETE FROM maxscale_schema.replication_heartbeat WHERE master_timestamp < %lu", purge_time); + + if (mysql_query(database->con, heartbeat_purge_query)) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error deleting from maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_purge_query, + mysql_error(database->con)))); + } + + heartbeat = time(0); + + /* set node_ts for master as time(0) */ + database->server->node_ts = heartbeat; + + sprintf(heartbeat_insert_query, "UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = %i AND maxscale_id = %lu", heartbeat, handle->master_id, id); + + /* Try to insert MaxScale timestamp into master */ + if (mysql_query(database->con, heartbeat_insert_query)) { + + database->server->rlag = -1; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error updating maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_insert_query, + mysql_error(database->con)))); + } else { + if (mysql_affected_rows(database->con) == 0) { + heartbeat = time(0); + sprintf(heartbeat_insert_query, "REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %i, %lu, %lu)", handle->master_id, id, heartbeat); + + if (mysql_query(database->con, heartbeat_insert_query)) { + + database->server->rlag = -1; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error inserting into maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_insert_query, + mysql_error(database->con)))); + } else { + /* Set replication lag to 0 for the master */ + database->server->rlag = 0; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: heartbeat table inserted data for %s:%i", database->server->name, database->server->port))); + } + } else { + /* Set replication lag as 0 for the master */ + database->server->rlag = 0; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: heartbeat table updated for %s:%i", database->server->name, database->server->port))); + } + } + } } /* Check if the Slave_SQL_Running and Slave_IO_Running status is @@ -439,6 +594,80 @@ static int modval = 10; } } + /* Get the master_timestamp value from maxscale_schema.replication_heartbeat table */ + if (isslave && replication_heartbeat == 1) { + time_t heartbeat; + char select_heartbeat_query[256] = ""; + + sprintf(select_heartbeat_query, "SELECT master_timestamp " + "FROM maxscale_schema.replication_heartbeat " + "WHERE maxscale_id = %lu AND master_server_id = %i", + id, handle->master_id); + + /* if there is a master then send the query to the slave with master_id*/ + if (handle->master_id >= 0 && (mysql_query(database->con, select_heartbeat_query) == 0 + && (result = mysql_store_result(database->con)) != NULL)) { + num_fields = mysql_num_fields(result); + + while ((row = mysql_fetch_row(result))) { + int rlag = -1; + time_t slave_read; + + heartbeat = time(0); + slave_read = strtoul(row[0], NULL, 10); + + if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 && slave_read == 0)) { + slave_read = 0; + } + + if (slave_read) { + /* set the replication lag */ + rlag = heartbeat - slave_read; + } + + /* set this node_ts as master_timestamp read from replication_heartbeat table */ + database->server->node_ts = slave_read; + + if (rlag >= 0) { + /* store rlag only if greater than monitor sampling interval */ + database->server->rlag = (rlag > (handle->interval / 1000)) ? rlag : 0; + } else { + database->server->rlag = -1; + } + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: replication heartbeat: " + "server %s:%i is %i seconds behind master", + database->server->name, + database->server->port, + database->server->rlag))); + } + mysql_free_result(result); + } else { + database->server->rlag = -1; + database->server->node_ts = 0; + + if (handle->master_id < 0) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: error: replication heartbeat: " + "master_server_id NOT available for %s:%i", + database->server->name, + database->server->port))); + } else { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: error: replication heartbeat: " + "failed selecting from hearthbeat table of %s:%i : [%s], %s", + database->server->name, + database->server->port, + select_heartbeat_query, + mysql_error(database->con)))); + } + } + } + if (ismaster) { server_set_status(database->server, SERVER_MASTER); @@ -492,11 +721,18 @@ static int modval = 10; { unsigned int prev_status = ptr->server->status; +<<<<<<< HEAD monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd); if (ptr->server->status != prev_status || (SERVER_IS_DOWN(ptr->server) && err_count%modval == 0)) +======= + monitorDatabase(handle, ptr); + + if (ptr->server->status != prev_status || + SERVER_IS_DOWN(ptr->server)) +>>>>>>> develop { LOGIF(LM, (skygw_log_write_flush( LOGFILE_MESSAGE, @@ -504,6 +740,7 @@ static int modval = 10; ptr->server->name, ptr->server->port, STRSRVSTATUS(ptr->server)))); +<<<<<<< HEAD err_count = 0; modval += 1; } @@ -511,8 +748,51 @@ static int modval = 10; { err_count += 1; } +======= + } + +>>>>>>> develop ptr = ptr->next; } - thread_millisleep(10000); + thread_millisleep(handle->interval); } } + +/** + * Set the default id to use in the monitor. + * + * @param arg The handle allocated by startMonitor + * @param id The id to set in monitor struct + */ +static void +defaultId(void *arg, unsigned long id) + { +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->id, &id, sizeof(unsigned long)); + } + +/** + * Set the monitor sampling interval. + * + * @param arg The handle allocated by startMonitor + * @param interval The interval to set in monitor struct, in milliseconds + */ +static void +setInterval(void *arg, unsigned long interval) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->interval, &interval, sizeof(unsigned long)); + } + +/** + * Enable/Disable the MySQL Replication hearbeat, detecting slave lag behind master. + * + * @param arg The handle allocated by startMonitor + * @param replicationHeartbeat To enable it 1, disable it with 0 + */ +static void +replicationHeartbeat(void *arg, int replicationHeartbeat) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int)); +} diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index a2c2e364c..8f5bcd704 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -30,6 +30,7 @@ * Date Who Description * 08/07/13 Mark Riddoch Initial implementation * 26/05/14 Massimiliano Pinto Default values for MONITOR_INTERVAL + * 28/05/14 Massimiliano Pinto Addition of new fields in MYSQL_MONITOR struct * * @endverbatim */ @@ -55,6 +56,10 @@ typedef struct { int status; /**< Monitor status */ char *defaultUser; /**< Default username for monitoring */ char *defaultPasswd; /**< Default password for monitoring */ + unsigned long interval; /**< Monitor sampling interval */ + unsigned long id; /**< Monitor ID */ + int replicationHeartbeat; /**< Monitor flag for MySQL replication heartbeat */ + int master_id; /**< Master server-id for MySQL Master/Slave replication */ MONITOR_SERVERS *databases; /**< Linked list of servers to monitor */ } MYSQL_MONITOR; @@ -63,5 +68,6 @@ typedef struct { #define MONITOR_STOPPED 3 #define MONITOR_INTERVAL 10000 // in milliseconds +#define MONITOR_DEFAULT_ID 1UL // unsigned long value #endif diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index f92e49598..7d06264b9 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -39,6 +39,14 @@ #include #include +#include + +MODULE_INFO info = { + MODULE_API_PROTOCOL, + MODULE_IN_DEVELOPMENT, + GWPROTOCOL_VERSION, + "An experimental HTTPD implementation for use in admnistration" +}; #define ISspace(x) isspace((int)(x)) #define HTTP_SERVER_STRING "Gateway(c) v.1.0.0" diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 4072a9201..ccf9ebd46 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -43,6 +43,14 @@ * 27/09/2013 Massimiliano Pinto Changed in gw_read_backend_event the check for dcb_read(), now is if rc < 0 * */ +#include + +MODULE_INFO info = { + MODULE_API_PROTOCOL, + MODULE_ALPHA_RELEASE, + GWPROTOCOL_VERSION, + "The MySQL to backend server protocol" +}; extern int lm_enabled_logfiles_bitmask; @@ -1016,4 +1024,4 @@ static int gw_session(DCB *backend_dcb, void *data) { return 1; } -*/ \ No newline at end of file +*/ diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 94d0ed419..395bea545 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -40,6 +40,14 @@ #include #include #include +#include + +MODULE_INFO info = { + MODULE_API_PROTOCOL, + MODULE_ALPHA_RELEASE, + GWPROTOCOL_VERSION, + "The client to MaxScale MySQL protocol implementation" +}; extern int lm_enabled_logfiles_bitmask; @@ -57,11 +65,7 @@ static int gw_client_hangup_event(DCB *dcb); int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); int MySQLSendHandshake(DCB* dcb); static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue); -static int route_by_statement( - ROUTER* router_instance, - ROUTER_OBJECT* router, - void* rsession, - GWBUF* read_buf); +static int route_by_statement(SESSION *, GWBUF *); /* * The "module object" for the mysqld client protocol module. @@ -791,6 +795,7 @@ int gw_read_client_event(DCB* dcb) { /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { +<<<<<<< HEAD #if defined(ERRHANDLE) /** * Close router session and that closes @@ -799,7 +804,11 @@ int gw_read_client_event(DCB* dcb) { */ dcb_close(dcb); #else - router->routeQuery(router_instance, rsession, read_buffer); + SESSION_ROUTE_QUERY(session, read_buffer); +// router->routeQuery(router_instance, rsession, read_buffer); +======= + SESSION_ROUTE_QUERY(session, read_buffer); +>>>>>>> develop LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [gw_read_client_event] Routed COM_QUIT to " @@ -818,10 +827,7 @@ int gw_read_client_event(DCB* dcb) { * Feed each statement completely and separately * to router. */ - rc = route_by_statement(router_instance, - router, - rsession, - read_buffer); + rc = route_by_statement(session, read_buffer); if (read_buffer != NULL) { /** add incomplete mysql packet to read queue */ @@ -831,9 +837,7 @@ int gw_read_client_event(DCB* dcb) { else { /** Feed whole packet to router */ - rc = router->routeQuery(router_instance, - rsession, - read_buffer); + rc = SESSION_ROUTE_QUERY(session, read_buffer); } /** succeed */ @@ -1436,11 +1440,7 @@ gw_client_hangup_event(DCB *dcb) * Return 1 in success. If the last packet is incomplete return success but * leave incomplete packet to readbuf. */ -static int route_by_statement( - ROUTER* router_instance, - ROUTER_OBJECT* router, - void* rsession, - GWBUF* readbuf) +static int route_by_statement(SESSION *session, GWBUF *readbuf) { int rc = -1; GWBUF* packetbuf; @@ -1452,7 +1452,7 @@ static int route_by_statement( if (packetbuf != NULL) { CHK_GWBUF(packetbuf); - rc = router->routeQuery(router_instance, rsession, packetbuf); + rc = SESSION_ROUTE_QUERY(session, packetbuf); } else { diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index f7d6c1815..86e98f397 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -36,6 +36,14 @@ #include #include #include +#include + +MODULE_INFO info = { + MODULE_API_PROTOCOL, + MODULE_ALPHA_RELEASE, + GWPROTOCOL_VERSION, + "A telnet deamon protocol for simple administration interface" +}; extern int lm_enabled_logfiles_bitmask; @@ -140,9 +148,6 @@ telnetd_read_event(DCB* dcb) int n; GWBUF *head = NULL; SESSION *session = dcb->session; -ROUTER_OBJECT *router = session->service->router; -ROUTER *router_instance = session->service->router_instance; -void *rsession = session->router_session; TELNETD *telnetd = (TELNETD *)dcb->protocol; char *password, *t; @@ -196,7 +201,7 @@ char *password, *t; free(password); break; case TELNETD_STATE_DATA: - router->routeQuery(router_instance, rsession, head); + SESSION_ROUTE_QUERY(session, head); break; } } diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index 61e308a96..bf2404c08 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,14 @@ #include #include + +MODULE_INFO info = { + MODULE_API_ROUTER, + MODULE_ALPHA_RELEASE, + ROUTER_VERSION, + "The debug user interface" +}; + extern int lm_enabled_logfiles_bitmask; static char *version_str = "V1.1.1"; diff --git a/server/modules/routing/debugcmd.c b/server/modules/routing/debugcmd.c index e7720cb30..eb0fc2832 100644 --- a/server/modules/routing/debugcmd.c +++ b/server/modules/routing/debugcmd.c @@ -40,6 +40,7 @@ * 20/05/14 Mark Riddoch Added ability to give server and service names rather * than simply addresses * 23/05/14 Mark Riddoch Added support for developer and user modes + * 29/05/14 Mark Riddoch Add Filter support * * @endverbatim */ @@ -50,6 +51,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +79,7 @@ #define ARG_TYPE_SESSION 6 #define ARG_TYPE_DCB 7 #define ARG_TYPE_MONITOR 8 +#define ARG_TYPE_FILTER 9 /** * The subcommand structure @@ -113,6 +116,14 @@ struct subcommand showoptions[] = { "Show the poll statistics", "Show the poll statistics", {0, 0, 0} }, + { "filter", 0, dprintFilter, + "Show details of a filter, called with a filter name", + "Show details of a filter, called with the address of a filter", + {ARG_TYPE_FILTER, 0, 0} }, + { "filters", 0, dprintAllFilters, + "Show all filters", + "Show all filters", + {0, 0, 0} }, { "modules", 0, dprintAllModules, "Show all currently loaded modules", "Show all currently loaded modules", @@ -157,6 +168,10 @@ struct subcommand showoptions[] = { * The subcommands of the list command */ struct subcommand listoptions[] = { + { "filters", 0, dListFilters, + "List all the filters defined within MaxScale", + "List all the filters defined within MaxScale", + {0, 0, 0} }, { "listeners", 0, dListListeners, "List all the listeners defined within MaxScale", "List all the listeners defined within MaxScale", @@ -493,6 +508,10 @@ SERVICE *service; if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) rval = (unsigned long)monitor_find(arg); return rval; + case ARG_TYPE_FILTER: + if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) + rval = (unsigned long)filter_find(arg); + return rval; } return 0; } @@ -516,7 +535,7 @@ execute_cmd(CLI_SESSION *cli) { DCB *dcb = cli->session->client; int argc, i, j, found = 0; -char *args[MAXARGS]; +char *args[MAXARGS + 1]; unsigned long arg1, arg2, arg3; int in_quotes = 0, escape_next = 0; char *ptr, *lptr; @@ -754,11 +773,13 @@ static struct { char *str; unsigned int bit; } ServerBits[] = { - { "running", SERVER_RUNNING }, - { "master", SERVER_MASTER }, - { "slave", SERVER_SLAVE }, - { "synced", SERVER_JOINED }, - { NULL, 0 } + { "running", SERVER_RUNNING }, + { "master", SERVER_MASTER }, + { "slave", SERVER_SLAVE }, + { "synced", SERVER_JOINED }, + { "maintenance", SERVER_MAINT }, + { "maint", SERVER_MAINT }, + { NULL, 0 } }; /** * Map the server status bit diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 54e910893..8ca0d0b78 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -79,6 +79,7 @@ #include #include #include +#include #include #include @@ -88,6 +89,13 @@ extern int lm_enabled_logfiles_bitmask; +MODULE_INFO info = { + MODULE_API_ROUTER, + MODULE_ALPHA_RELEASE, + ROUTER_VERSION, + "A connection based router to load balance based on connections" +}; + static char *version_str = "V1.0.2"; /* The router entry points */ @@ -345,6 +353,9 @@ int master_host = -1; inst->bitmask))); } + if (SERVER_IN_MAINT(inst->servers[i]->server)) + continue; + /* * If router_options=slave, get the running master * It will be used if there are no running slaves at all diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index d481c5cf9..0e85e9077 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -30,10 +30,19 @@ #include #include #include +#include + +MODULE_INFO info = { + MODULE_API_ROUTER, + MODULE_ALPHA_RELEASE, + ROUTER_VERSION, + "A Read/Write splitting router for enhancement read scalability" +}; #if defined(SS_DEBUG) # include #endif + extern int lm_enabled_logfiles_bitmask; /** @@ -829,8 +838,7 @@ static bool get_dcb( BACKEND* b = backend_ref[i].bref_backend; if (backend_ref[i].bref_state == BREF_IN_USE && - (SERVER_IS_MASTER(b->backend_server) || - SERVER_IS_JOINED(b->backend_server))) + (SERVER_IS_MASTER(b->backend_server))) { *p_dcb = backend_ref[i].bref_dcb; succp = true; @@ -1544,8 +1552,7 @@ static bool select_connect_backend_servers( } } else if (!master_connected && - (SERVER_IS_MASTER(b->backend_server) || - SERVER_IS_JOINED(b->backend_server))) + (SERVER_IS_MASTER(b->backend_server))) { master_found = true; @@ -1659,8 +1666,7 @@ static bool select_connect_backend_servers( "Selected %s in \t%s:%d", (btype == BE_MASTER ? "master" : (btype == BE_SLAVE ? "slave" : - (btype == BE_JOINED ? "galera node" : - "unknown node type"))), + "unknown node type")), b->backend_server->name, b->backend_server->port))); } diff --git a/server/modules/routing/testroute.c b/server/modules/routing/testroute.c index c0bd73fee..ce2ce2ca9 100644 --- a/server/modules/routing/testroute.c +++ b/server/modules/routing/testroute.c @@ -17,9 +17,17 @@ */ #include #include +#include static char *version_str = "V1.0.0"; +MODULE_INFO info = { + MODULE_API_ROUTER, + MODULE_IN_DEVELOPMENT, + ROUTER_VERSION, + "A test router - not for use in real systems" +}; + static ROUTER *createInstance(SERVICE *service, char **options); static void *newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *session); @@ -145,4 +153,4 @@ static uint8_t getCapabilities( void* router_session) { return 0; -} \ No newline at end of file +} diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 6e129152d..3910429a2 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -234,7 +234,7 @@ typedef enum skygw_chk_t { ((SERVER_IS_RUNNING(s) && SERVER_IS_JOINED(s)) ? "RUNNING JOINED" : \ ((SERVER_IS_RUNNING(s) && SERVER_IN_MAINT(s)) ? "RUNNING MAINTENANCE" : \ (SERVER_IS_RUNNING(s) ? "RUNNING (only)" : "NO STATUS"))))) - + #define CHK_MLIST(l) { \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \ l->mlist_chk_tail == CHK_NUM_MLIST), \