Implementation of server routing generic functionality and for readconnrouter
This commit is contained in:
parent
52646a9942
commit
25d4f784bc
@ -230,6 +230,8 @@ int error_count = 0;
|
||||
config_get_value(obj->parameters, "passwd");
|
||||
char *enable_root_user =
|
||||
config_get_value(obj->parameters, "enable_root_user");
|
||||
char *weightby =
|
||||
config_get_value(obj->parameters, "weightby");
|
||||
|
||||
char *version_string = config_get_value(obj->parameters, "version_string");
|
||||
|
||||
@ -259,6 +261,8 @@ int error_count = 0;
|
||||
|
||||
if (enable_root_user)
|
||||
serviceEnableRootUser(obj->element, config_truth_value(enable_root_user));
|
||||
if (weightby)
|
||||
serviceWeightBy(obj->element, weightby);
|
||||
|
||||
if (!auth)
|
||||
auth = config_get_value(obj->parameters, "auth");
|
||||
@ -362,6 +366,30 @@ int error_count = 0;
|
||||
"defined but no corresponding password.",
|
||||
obj->object)));
|
||||
}
|
||||
if (obj->element)
|
||||
{
|
||||
CONFIG_PARAMETER *params = obj->parameters;
|
||||
while (params)
|
||||
{
|
||||
if (strcmp(params->name, "address")
|
||||
&& strcmp(params->name, "port")
|
||||
&& strcmp(params->name,
|
||||
"protocol")
|
||||
&& strcmp(params->name,
|
||||
"monitoruser")
|
||||
&& strcmp(params->name,
|
||||
"monitorpw")
|
||||
&& strcmp(params->name,
|
||||
"type")
|
||||
)
|
||||
{
|
||||
serverAddParameter(obj->element,
|
||||
params->name,
|
||||
params->value);
|
||||
}
|
||||
params = params->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!strcmp(type, "filter"))
|
||||
{
|
||||
@ -1238,8 +1266,6 @@ int i;
|
||||
{
|
||||
if (!strcmp(type, "service"))
|
||||
param_set = service_params;
|
||||
else if (!strcmp(type, "server"))
|
||||
param_set = server_params;
|
||||
else if (!strcmp(type, "listener"))
|
||||
param_set = listener_params;
|
||||
else if (!strcmp(type, "monitor"))
|
||||
|
@ -28,6 +28,7 @@
|
||||
* 20/05/14 Massimiliano Pinto Addition of server_string
|
||||
* 21/05/14 Massimiliano Pinto Addition of node_id
|
||||
* 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields
|
||||
* 26/06/14 Mark Riddoch Addition of server parameters
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -76,6 +77,7 @@ SERVER *server;
|
||||
server->node_id = -1;
|
||||
server->rlag = -1;
|
||||
server->node_ts = 0;
|
||||
server->parameters = NULL;
|
||||
|
||||
spinlock_acquire(&server_spin);
|
||||
server->next = allServers;
|
||||
@ -274,7 +276,8 @@ char *stat;
|
||||
void
|
||||
dprintServer(DCB *dcb, SERVER *server)
|
||||
{
|
||||
char *stat;
|
||||
char *stat;
|
||||
SERVER_PARAM *param;
|
||||
|
||||
dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name);
|
||||
dcb_printf(dcb, "\tServer: %s\n", server->name);
|
||||
@ -294,6 +297,16 @@ char *stat;
|
||||
if (server->node_ts > 0) {
|
||||
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", server->node_ts);
|
||||
}
|
||||
if ((param = server->parameters) != NULL)
|
||||
{
|
||||
dcb_printf(dcb, "\tServer Parameters:\n");
|
||||
while (param)
|
||||
{
|
||||
dcb_printf(dcb, "\t\t%-20s %s\n", param->name,
|
||||
param->value);
|
||||
param = param->next;
|
||||
}
|
||||
}
|
||||
dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections);
|
||||
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current);
|
||||
}
|
||||
@ -444,3 +457,59 @@ server_update(SERVER *server, char *protocol, char *user, char *passwd)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add a server parameter to a server.
|
||||
*
|
||||
* Server parameters may be used by routing to weight the load
|
||||
* balancing they apply to the server.
|
||||
*
|
||||
* @param server The server we are adding the parameter to
|
||||
* @param name The parameter name
|
||||
* @param value The parameter value
|
||||
*/
|
||||
void
|
||||
serverAddParameter(SERVER *server, char *name, char *value)
|
||||
{
|
||||
SERVER_PARAM *param;
|
||||
|
||||
if ((param = (SERVER_PARAM *)malloc(sizeof(SERVER_PARAM))) == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if ((param->name = strdup(name)) == NULL)
|
||||
{
|
||||
free(param);
|
||||
return;
|
||||
}
|
||||
if ((param->value = strdup(value)) == NULL)
|
||||
{
|
||||
free(param->value);
|
||||
free(param);
|
||||
return;
|
||||
}
|
||||
|
||||
param->next = server->parameters;
|
||||
server->parameters = param;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retreive a parameter value from a server
|
||||
*
|
||||
* @param server The server we are looking for a parameter of
|
||||
* @param name The name of the parameter we require
|
||||
* @return The parameter value or NULL if not found
|
||||
*/
|
||||
char *
|
||||
serverGetParameter(SERVER *server, char *name)
|
||||
{
|
||||
SERVER_PARAM *param = server->parameters;
|
||||
|
||||
while (param)
|
||||
{
|
||||
if (strcmp(param->name, name) == 0)
|
||||
return param->value;
|
||||
param = param->next;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
@ -114,6 +114,7 @@ SERVICE *service;
|
||||
service->svc_config_version = 0;
|
||||
service->filters = NULL;
|
||||
service->n_filters = 0;
|
||||
service->weightby = 0;
|
||||
spinlock_init(&service->spin);
|
||||
spinlock_init(&service->users_table_spin);
|
||||
memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE));
|
||||
@ -818,6 +819,8 @@ int i;
|
||||
server->protocol);
|
||||
server = server->nextdb;
|
||||
}
|
||||
if (service->weightby)
|
||||
dcb_printf(dcb, "\tRouting weight parameter: %s\n", service->weightby);
|
||||
dcb_printf(dcb, "\tUsers data: %p\n", service->users);
|
||||
dcb_printf(dcb, "\tTotal connections: %d\n", service->stats.n_sessions);
|
||||
dcb_printf(dcb, "\tCurrently connected: %d\n", service->stats.n_current);
|
||||
@ -1097,8 +1100,38 @@ static void service_add_qualified_param(
|
||||
spinlock_release(&svc->spin);
|
||||
}
|
||||
|
||||
char* service_get_name(
|
||||
SERVICE* svc)
|
||||
/**
|
||||
* Return the name of the service
|
||||
*
|
||||
* @param svc The service
|
||||
*/
|
||||
char *
|
||||
service_get_name(SERVICE *svc)
|
||||
{
|
||||
return svc->name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the weighting parameter for the service
|
||||
*
|
||||
* @param service The service pointer
|
||||
* @param weightby The parameter name to weight the routing by
|
||||
*/
|
||||
void
|
||||
serviceWeightBy(SERVICE *service, char *weightby)
|
||||
{
|
||||
if (service->weightby)
|
||||
free(service->weightby);
|
||||
service->weightby = strdup(weightby);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the parameter the wervice shoudl use to weight connections
|
||||
* by
|
||||
* @param service The Service pointer
|
||||
*/
|
||||
char *
|
||||
serviceGetWeightingParameter(SERVICE *service)
|
||||
{
|
||||
return service->weightby;
|
||||
}
|
||||
|
@ -36,10 +36,22 @@
|
||||
* 20/05/14 Massimiliano Pinto Addition of node_id field
|
||||
* 23/05/14 Massimiliano Pinto Addition of rlag and node_ts fields
|
||||
* 03/06/14 Mark Riddoch Addition of maintainance mode
|
||||
* 26/06/14 Mark Riddoch Adidtion of server parameters
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
/**
|
||||
* The server parameters used for weighting routing decissions
|
||||
*
|
||||
*/
|
||||
typedef struct server_params {
|
||||
char *name; /**< Parameter name */
|
||||
char *value; /**< Parameter value */
|
||||
struct server_params
|
||||
*next; /**< Next Paramter in the linked list */
|
||||
} SERVER_PARAM;
|
||||
|
||||
/**
|
||||
* The server statistics structure
|
||||
*
|
||||
@ -70,6 +82,7 @@ typedef struct server {
|
||||
long node_id; /**< Node id, server_id for M/S or local_index for Galera */
|
||||
int rlag; /**< Replication Lag for Master / Slave replication */
|
||||
unsigned long node_ts; /**< Last timestamp set from M/S monitor module */
|
||||
SERVER_PARAM *parameters; /**< Parameters of a server that may be used to weight routing decisions */
|
||||
} SERVER;
|
||||
|
||||
/**
|
||||
@ -135,6 +148,8 @@ extern char *server_status(SERVER *);
|
||||
extern void server_set_status(SERVER *, int);
|
||||
extern void server_clear_status(SERVER *, int);
|
||||
extern void serverAddMonUser(SERVER *, char *, char *);
|
||||
extern void serverAddParameter(SERVER *, char *, char *);
|
||||
extern char *serverGetParameter(SERVER *, char *);
|
||||
extern void server_update(SERVER *, char *, char *, char *);
|
||||
extern void server_set_unique_name(SERVER *, char *);
|
||||
#endif
|
||||
|
@ -43,6 +43,7 @@
|
||||
* 07/05/14 Massimiliano Pinto Added version_string field to service
|
||||
* struct
|
||||
* 29/05/14 Mark Riddoch Filter API mechanism
|
||||
* 26/06/14 Mark Riddoch Added WeightBy support
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -130,6 +131,7 @@ typedef struct service {
|
||||
rate_limit; /**< The refresh rate limit for users table */
|
||||
FILTER_DEF **filters; /**< Ordered list of filters */
|
||||
int n_filters; /**< Number of filters */
|
||||
char *weightby;
|
||||
struct service *next; /**< The next service in the linked list */
|
||||
} SERVICE;
|
||||
|
||||
@ -157,6 +159,8 @@ extern int serviceSetUser(SERVICE *, char *, char *);
|
||||
extern int serviceGetUser(SERVICE *, char **, char **);
|
||||
extern void serviceSetFilters(SERVICE *, char *);
|
||||
extern int serviceEnableRootUser(SERVICE *, int );
|
||||
extern void serviceWeightBy(SERVICE *, char *);
|
||||
extern char *serviceGetWeightingParameter(SERVICE *);
|
||||
extern void service_update(SERVICE *, char *, char *, char *);
|
||||
extern int service_refresh_users(SERVICE *);
|
||||
extern void printService(SERVICE *);
|
||||
|
@ -26,6 +26,7 @@
|
||||
*
|
||||
* Date Who Description
|
||||
* 14/06/13 Mark Riddoch Initial implementation
|
||||
* 27/06/14 Mark Riddoch Addition of server weight percentage
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -39,6 +40,7 @@
|
||||
typedef struct backend {
|
||||
SERVER *server; /*< The server itself */
|
||||
int current_connection_count; /*< Number of connections to the server */
|
||||
int weight; /*< Desired routing weight */
|
||||
} BACKEND;
|
||||
|
||||
/**
|
||||
|
@ -65,6 +65,7 @@
|
||||
* or take different actions such as open a new backend connection
|
||||
* 20/02/2014 Massimiliano Pinto If router_options=slave, route traffic to master if no slaves available
|
||||
* 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession
|
||||
* 27/06/2014 Mark Riddoch Addition of server weighting
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -96,7 +97,7 @@ MODULE_INFO info = {
|
||||
"A connection based router to load balance based on connections"
|
||||
};
|
||||
|
||||
static char *version_str = "V1.0.2";
|
||||
static char *version_str = "V1.1.0";
|
||||
|
||||
/* The router entry points */
|
||||
static ROUTER *createInstance(SERVICE *service, char **options);
|
||||
@ -196,6 +197,8 @@ createInstance(SERVICE *service, char **options)
|
||||
ROUTER_INSTANCE *inst;
|
||||
SERVER *server;
|
||||
int i, n;
|
||||
BACKEND *backend;
|
||||
char *weightby;
|
||||
|
||||
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||
return NULL;
|
||||
@ -231,10 +234,53 @@ int i, n;
|
||||
}
|
||||
inst->servers[n]->server = server;
|
||||
inst->servers[n]->current_connection_count = 0;
|
||||
inst->servers[n]->weight = 100;
|
||||
n++;
|
||||
}
|
||||
inst->servers[n] = NULL;
|
||||
|
||||
if ((weightby = serviceGetWeightingParameter(service)) != NULL)
|
||||
{
|
||||
int total = 0;
|
||||
for (n = 0; inst->servers[n]; n++)
|
||||
{
|
||||
backend = inst->servers[n];
|
||||
total += atoi(serverGetParameter(backend->server,
|
||||
weightby));
|
||||
}
|
||||
if (total == 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"WARNING: Weighting Parameter for service '%s' "
|
||||
"will be ignored as no servers have values "
|
||||
"for the parameter '%s'.\n",
|
||||
service->name, weightby)));
|
||||
}
|
||||
else
|
||||
{
|
||||
for (n = 0; inst->servers[n]; n++)
|
||||
{
|
||||
int perc;
|
||||
backend = inst->servers[n];
|
||||
perc = (atoi(serverGetParameter(backend->server,
|
||||
weightby)) * 100) / total;
|
||||
backend->weight = perc;
|
||||
if (perc == 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Server '%s' has no value "
|
||||
"for weighting parameter '%s', "
|
||||
"no queries will be routed to "
|
||||
"this server.\n",
|
||||
server->unique_name,
|
||||
weightby)));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Process the options
|
||||
*/
|
||||
@ -262,11 +308,11 @@ int i, n;
|
||||
else
|
||||
{
|
||||
LOGIF(LM, (skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"* Warning : Unsupported router "
|
||||
"option \'%s\' for readconnroute. "
|
||||
"Expected router options are "
|
||||
"[slave|master|synced]",
|
||||
LOGFILE_MESSAGE,
|
||||
"* Warning : Unsupported router "
|
||||
"option \'%s\' for readconnroute. "
|
||||
"Expected router options are "
|
||||
"[slave|master|synced]",
|
||||
options[i])));
|
||||
}
|
||||
}
|
||||
@ -377,15 +423,19 @@ int master_host = -1;
|
||||
{
|
||||
candidate = inst->servers[i];
|
||||
}
|
||||
else if (inst->servers[i]->current_connection_count <
|
||||
candidate->current_connection_count)
|
||||
else if ((inst->servers[i]->current_connection_count
|
||||
* 100) / inst->servers[i]->weight <
|
||||
(candidate->current_connection_count *
|
||||
100) / candidate->weight)
|
||||
{
|
||||
/* This running server has fewer
|
||||
connections, set it as a new candidate */
|
||||
candidate = inst->servers[i];
|
||||
}
|
||||
else if (inst->servers[i]->current_connection_count ==
|
||||
candidate->current_connection_count &&
|
||||
else if ((inst->servers[i]->current_connection_count
|
||||
* 100) / inst->servers[i]->weight ==
|
||||
(candidate->current_connection_count *
|
||||
100) / candidate->weight &&
|
||||
inst->servers[i]->server->stats.n_connections <
|
||||
candidate->server->stats.n_connections)
|
||||
{
|
||||
@ -649,6 +699,8 @@ diagnostics(ROUTER *router, DCB *dcb)
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
|
||||
ROUTER_CLIENT_SES *session;
|
||||
int i = 0;
|
||||
BACKEND *backend;
|
||||
char *weightby;
|
||||
|
||||
spinlock_acquire(&router_inst->lock);
|
||||
session = router_inst->connections;
|
||||
@ -664,6 +716,23 @@ int i = 0;
|
||||
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i);
|
||||
dcb_printf(dcb, "\tNumber of queries forwarded: %d\n",
|
||||
router_inst->stats.n_queries);
|
||||
if ((weightby = serviceGetWeightingParameter(router_inst->service))
|
||||
!= NULL)
|
||||
{
|
||||
dcb_printf(dcb, "\tConnection distribution based on %s\n",
|
||||
weightby);
|
||||
dcb_printf(dcb,
|
||||
"\t\tServer Target %% Connections\n");
|
||||
for (i = 0; router_inst->servers[i]; i++)
|
||||
{
|
||||
backend = router_inst->servers[i];
|
||||
dcb_printf(dcb, "\t\t%-20s %3d%% %d\n",
|
||||
backend->server->unique_name,
|
||||
backend->weight,
|
||||
backend->current_connection_count);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user