diff --git a/server/core/config.c b/server/core/config.c index 8fd53318b..efd72ea7d 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -230,6 +230,8 @@ int error_count = 0; config_get_value(obj->parameters, "passwd"); char *enable_root_user = config_get_value(obj->parameters, "enable_root_user"); + char *weightby = + config_get_value(obj->parameters, "weightby"); char *version_string = config_get_value(obj->parameters, "version_string"); @@ -259,6 +261,8 @@ int error_count = 0; if (enable_root_user) serviceEnableRootUser(obj->element, config_truth_value(enable_root_user)); + if (weightby) + serviceWeightBy(obj->element, weightby); if (!auth) auth = config_get_value(obj->parameters, "auth"); @@ -362,6 +366,30 @@ int error_count = 0; "defined but no corresponding password.", obj->object))); } + if (obj->element) + { + CONFIG_PARAMETER *params = obj->parameters; + while (params) + { + if (strcmp(params->name, "address") + && strcmp(params->name, "port") + && strcmp(params->name, + "protocol") + && strcmp(params->name, + "monitoruser") + && strcmp(params->name, + "monitorpw") + && strcmp(params->name, + "type") + ) + { + serverAddParameter(obj->element, + params->name, + params->value); + } + params = params->next; + } + } } else if (!strcmp(type, "filter")) { @@ -1238,8 +1266,6 @@ int i; { if (!strcmp(type, "service")) param_set = service_params; - else if (!strcmp(type, "server")) - param_set = server_params; else if (!strcmp(type, "listener")) param_set = listener_params; else if (!strcmp(type, "monitor")) diff --git a/server/core/server.c b/server/core/server.c index c1bec6189..f56b39ded 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -28,6 +28,7 @@ * 20/05/14 Massimiliano Pinto Addition of server_string * 21/05/14 Massimiliano Pinto Addition of node_id * 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields + * 26/06/14 Mark Riddoch Addition of server parameters * * @endverbatim */ @@ -76,6 +77,7 @@ SERVER *server; server->node_id = -1; server->rlag = -1; server->node_ts = 0; + server->parameters = NULL; spinlock_acquire(&server_spin); server->next = allServers; @@ -274,7 +276,8 @@ char *stat; void dprintServer(DCB *dcb, SERVER *server) { -char *stat; +char *stat; +SERVER_PARAM *param; dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name); dcb_printf(dcb, "\tServer: %s\n", server->name); @@ -294,6 +297,16 @@ char *stat; if (server->node_ts > 0) { dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", server->node_ts); } + if ((param = server->parameters) != NULL) + { + dcb_printf(dcb, "\tServer Parameters:\n"); + while (param) + { + dcb_printf(dcb, "\t\t%-20s %s\n", param->name, + param->value); + param = param->next; + } + } dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current); } @@ -444,3 +457,59 @@ server_update(SERVER *server, char *protocol, char *user, char *passwd) } } + +/** + * Add a server parameter to a server. + * + * Server parameters may be used by routing to weight the load + * balancing they apply to the server. + * + * @param server The server we are adding the parameter to + * @param name The parameter name + * @param value The parameter value + */ +void +serverAddParameter(SERVER *server, char *name, char *value) +{ +SERVER_PARAM *param; + + if ((param = (SERVER_PARAM *)malloc(sizeof(SERVER_PARAM))) == NULL) + { + return; + } + if ((param->name = strdup(name)) == NULL) + { + free(param); + return; + } + if ((param->value = strdup(value)) == NULL) + { + free(param->value); + free(param); + return; + } + + param->next = server->parameters; + server->parameters = param; +} + +/** + * Retreive a parameter value from a server + * + * @param server The server we are looking for a parameter of + * @param name The name of the parameter we require + * @return The parameter value or NULL if not found + */ +char * +serverGetParameter(SERVER *server, char *name) +{ +SERVER_PARAM *param = server->parameters; + + while (param) + { + if (strcmp(param->name, name) == 0) + return param->value; + param = param->next; + } + return NULL; +} diff --git a/server/core/service.c b/server/core/service.c index d3db48390..2e61c44c1 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -114,6 +114,7 @@ SERVICE *service; service->svc_config_version = 0; service->filters = NULL; service->n_filters = 0; + service->weightby = 0; spinlock_init(&service->spin); spinlock_init(&service->users_table_spin); memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE)); @@ -818,6 +819,8 @@ int i; server->protocol); server = server->nextdb; } + if (service->weightby) + dcb_printf(dcb, "\tRouting weight parameter: %s\n", service->weightby); dcb_printf(dcb, "\tUsers data: %p\n", service->users); dcb_printf(dcb, "\tTotal connections: %d\n", service->stats.n_sessions); dcb_printf(dcb, "\tCurrently connected: %d\n", service->stats.n_current); @@ -1097,8 +1100,38 @@ static void service_add_qualified_param( spinlock_release(&svc->spin); } -char* service_get_name( - SERVICE* svc) +/** + * Return the name of the service + * + * @param svc The service + */ +char * +service_get_name(SERVICE *svc) { return svc->name; } + +/** + * Set the weighting parameter for the service + * + * @param service The service pointer + * @param weightby The parameter name to weight the routing by + */ +void +serviceWeightBy(SERVICE *service, char *weightby) +{ + if (service->weightby) + free(service->weightby); + service->weightby = strdup(weightby); +} + +/** + * Return the parameter the wervice shoudl use to weight connections + * by + * @param service The Service pointer + */ +char * +serviceGetWeightingParameter(SERVICE *service) +{ + return service->weightby; +} diff --git a/server/include/server.h b/server/include/server.h index d32413bb7..3768d98f0 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -36,10 +36,22 @@ * 20/05/14 Massimiliano Pinto Addition of node_id field * 23/05/14 Massimiliano Pinto Addition of rlag and node_ts fields * 03/06/14 Mark Riddoch Addition of maintainance mode + * 26/06/14 Mark Riddoch Adidtion of server parameters * * @endverbatim */ +/** + * The server parameters used for weighting routing decissions + * + */ +typedef struct server_params { + char *name; /**< Parameter name */ + char *value; /**< Parameter value */ + struct server_params + *next; /**< Next Paramter in the linked list */ +} SERVER_PARAM; + /** * The server statistics structure * @@ -70,6 +82,7 @@ typedef struct server { long node_id; /**< Node id, server_id for M/S or local_index for Galera */ int rlag; /**< Replication Lag for Master / Slave replication */ unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ + SERVER_PARAM *parameters; /**< Parameters of a server that may be used to weight routing decisions */ } SERVER; /** @@ -135,6 +148,8 @@ extern char *server_status(SERVER *); extern void server_set_status(SERVER *, int); extern void server_clear_status(SERVER *, int); extern void serverAddMonUser(SERVER *, char *, char *); +extern void serverAddParameter(SERVER *, char *, char *); +extern char *serverGetParameter(SERVER *, char *); extern void server_update(SERVER *, char *, char *, char *); extern void server_set_unique_name(SERVER *, char *); #endif diff --git a/server/include/service.h b/server/include/service.h index 1c6fd4822..43374e50d 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -43,6 +43,7 @@ * 07/05/14 Massimiliano Pinto Added version_string field to service * struct * 29/05/14 Mark Riddoch Filter API mechanism + * 26/06/14 Mark Riddoch Added WeightBy support * * @endverbatim */ @@ -130,6 +131,7 @@ typedef struct service { rate_limit; /**< The refresh rate limit for users table */ FILTER_DEF **filters; /**< Ordered list of filters */ int n_filters; /**< Number of filters */ + char *weightby; struct service *next; /**< The next service in the linked list */ } SERVICE; @@ -157,6 +159,8 @@ extern int serviceSetUser(SERVICE *, char *, char *); extern int serviceGetUser(SERVICE *, char **, char **); extern void serviceSetFilters(SERVICE *, char *); extern int serviceEnableRootUser(SERVICE *, int ); +extern void serviceWeightBy(SERVICE *, char *); +extern char *serviceGetWeightingParameter(SERVICE *); extern void service_update(SERVICE *, char *, char *, char *); extern int service_refresh_users(SERVICE *); extern void printService(SERVICE *); diff --git a/server/modules/include/readconnection.h b/server/modules/include/readconnection.h index c6f19995c..589296302 100644 --- a/server/modules/include/readconnection.h +++ b/server/modules/include/readconnection.h @@ -26,6 +26,7 @@ * * Date Who Description * 14/06/13 Mark Riddoch Initial implementation + * 27/06/14 Mark Riddoch Addition of server weight percentage * * @endverbatim */ @@ -39,6 +40,7 @@ typedef struct backend { SERVER *server; /*< The server itself */ int current_connection_count; /*< Number of connections to the server */ + int weight; /*< Desired routing weight */ } BACKEND; /** diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 5f9b5225d..5b64f8815 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -65,6 +65,7 @@ * or take different actions such as open a new backend connection * 20/02/2014 Massimiliano Pinto If router_options=slave, route traffic to master if no slaves available * 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession + * 27/06/2014 Mark Riddoch Addition of server weighting * * @endverbatim */ @@ -96,7 +97,7 @@ MODULE_INFO info = { "A connection based router to load balance based on connections" }; -static char *version_str = "V1.0.2"; +static char *version_str = "V1.1.0"; /* The router entry points */ static ROUTER *createInstance(SERVICE *service, char **options); @@ -196,6 +197,8 @@ createInstance(SERVICE *service, char **options) ROUTER_INSTANCE *inst; SERVER *server; int i, n; +BACKEND *backend; +char *weightby; if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -231,10 +234,53 @@ int i, n; } inst->servers[n]->server = server; inst->servers[n]->current_connection_count = 0; + inst->servers[n]->weight = 100; n++; } inst->servers[n] = NULL; + if ((weightby = serviceGetWeightingParameter(service)) != NULL) + { + int total = 0; + for (n = 0; inst->servers[n]; n++) + { + backend = inst->servers[n]; + total += atoi(serverGetParameter(backend->server, + weightby)); + } + if (total == 0) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "WARNING: Weighting Parameter for service '%s' " + "will be ignored as no servers have values " + "for the parameter '%s'.\n", + service->name, weightby))); + } + else + { + for (n = 0; inst->servers[n]; n++) + { + int perc; + backend = inst->servers[n]; + perc = (atoi(serverGetParameter(backend->server, + weightby)) * 100) / total; + backend->weight = perc; + if (perc == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Server '%s' has no value " + "for weighting parameter '%s', " + "no queries will be routed to " + "this server.\n", + server->unique_name, + weightby))); + } + + } + } + } + /* * Process the options */ @@ -262,11 +308,11 @@ int i, n; else { LOGIF(LM, (skygw_log_write( - LOGFILE_MESSAGE, - "* Warning : Unsupported router " - "option \'%s\' for readconnroute. " - "Expected router options are " - "[slave|master|synced]", + LOGFILE_MESSAGE, + "* Warning : Unsupported router " + "option \'%s\' for readconnroute. " + "Expected router options are " + "[slave|master|synced]", options[i]))); } } @@ -377,15 +423,19 @@ int master_host = -1; { candidate = inst->servers[i]; } - else if (inst->servers[i]->current_connection_count < - candidate->current_connection_count) + else if ((inst->servers[i]->current_connection_count + * 100) / inst->servers[i]->weight < + (candidate->current_connection_count * + 100) / candidate->weight) { /* This running server has fewer connections, set it as a new candidate */ candidate = inst->servers[i]; } - else if (inst->servers[i]->current_connection_count == - candidate->current_connection_count && + else if ((inst->servers[i]->current_connection_count + * 100) / inst->servers[i]->weight == + (candidate->current_connection_count * + 100) / candidate->weight && inst->servers[i]->server->stats.n_connections < candidate->server->stats.n_connections) { @@ -649,6 +699,8 @@ diagnostics(ROUTER *router, DCB *dcb) ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; ROUTER_CLIENT_SES *session; int i = 0; +BACKEND *backend; +char *weightby; spinlock_acquire(&router_inst->lock); session = router_inst->connections; @@ -664,6 +716,23 @@ int i = 0; dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i); dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", router_inst->stats.n_queries); + if ((weightby = serviceGetWeightingParameter(router_inst->service)) + != NULL) + { + dcb_printf(dcb, "\tConnection distribution based on %s\n", + weightby); + dcb_printf(dcb, + "\t\tServer Target %% Connections\n"); + for (i = 0; router_inst->servers[i]; i++) + { + backend = router_inst->servers[i]; + dcb_printf(dcb, "\t\t%-20s %3d%% %d\n", + backend->server->unique_name, + backend->weight, + backend->current_connection_count); + } + + } } /**