MXS-922: Move server weight calculation into the core
The core now does the server weight calculation instead of each router module doing the same thing.
This commit is contained in:
@ -97,8 +97,9 @@ typedef struct
|
|||||||
|
|
||||||
typedef struct server_ref_t
|
typedef struct server_ref_t
|
||||||
{
|
{
|
||||||
struct server_ref_t *next;
|
struct server_ref_t *next; /**< Next server reference */
|
||||||
SERVER* server;
|
SERVER* server; /**< The actual server */
|
||||||
|
int weight; /**< Weight of this server */
|
||||||
} SERVER_REF;
|
} SERVER_REF;
|
||||||
|
|
||||||
#define SERVICE_MAX_RETRY_INTERVAL 3600 /*< The maximum interval between service start retries */
|
#define SERVICE_MAX_RETRY_INTERVAL 3600 /*< The maximum interval between service start retries */
|
||||||
|
@ -66,6 +66,9 @@
|
|||||||
#include <maxscale/alloc.h>
|
#include <maxscale/alloc.h>
|
||||||
#include <maxscale/utils.h>
|
#include <maxscale/utils.h>
|
||||||
|
|
||||||
|
/** Base value for server weights */
|
||||||
|
#define SERVICE_BASE_SERVER_WEIGHT 1000
|
||||||
|
|
||||||
/** To be used with configuration type checks */
|
/** To be used with configuration type checks */
|
||||||
typedef struct typelib_st
|
typedef struct typelib_st
|
||||||
{
|
{
|
||||||
@ -96,6 +99,7 @@ static void service_add_qualified_param(SERVICE* svc,
|
|||||||
CONFIG_PARAMETER* param);
|
CONFIG_PARAMETER* param);
|
||||||
static void service_internal_restart(void *data);
|
static void service_internal_restart(void *data);
|
||||||
static void service_queue_check(void *data);
|
static void service_queue_check(void *data);
|
||||||
|
static void service_calculate_weights(SERVICE *service);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a new service for the gateway to support
|
* Allocate a new service for the gateway to support
|
||||||
@ -474,6 +478,9 @@ static void free_string_array(char** array)
|
|||||||
int
|
int
|
||||||
serviceStart(SERVICE *service)
|
serviceStart(SERVICE *service)
|
||||||
{
|
{
|
||||||
|
/** Calculate the server weights */
|
||||||
|
service_calculate_weights(service);
|
||||||
|
|
||||||
int listeners = 0;
|
int listeners = 0;
|
||||||
char **router_options = copy_string_array(service->routerOptions);
|
char **router_options = copy_string_array(service->routerOptions);
|
||||||
|
|
||||||
@ -729,6 +736,26 @@ int serviceHasProtocol(SERVICE *service, const char *protocol,
|
|||||||
return proto != NULL;
|
return proto != NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a new server reference
|
||||||
|
*
|
||||||
|
* @param server Server to refer to
|
||||||
|
* @return Server reference or NULL on error
|
||||||
|
*/
|
||||||
|
static SERVER_REF* server_ref_alloc(SERVER *server)
|
||||||
|
{
|
||||||
|
SERVER_REF *sref = MXS_MALLOC(sizeof(SERVER_REF));
|
||||||
|
|
||||||
|
if (sref)
|
||||||
|
{
|
||||||
|
sref->next = NULL;
|
||||||
|
sref->server = server;
|
||||||
|
sref->weight = SERVICE_BASE_SERVER_WEIGHT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return sref;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a backend database server to a service
|
* Add a backend database server to a service
|
||||||
*
|
*
|
||||||
@ -738,13 +765,10 @@ int serviceHasProtocol(SERVICE *service, const char *protocol,
|
|||||||
void
|
void
|
||||||
serviceAddBackend(SERVICE *service, SERVER *server)
|
serviceAddBackend(SERVICE *service, SERVER *server)
|
||||||
{
|
{
|
||||||
SERVER_REF *sref = MXS_MALLOC(sizeof(SERVER_REF));
|
SERVER_REF *sref = server_ref_alloc(server);
|
||||||
|
|
||||||
if (sref)
|
if (sref)
|
||||||
{
|
{
|
||||||
sref->next = NULL;
|
|
||||||
sref->server = server;
|
|
||||||
|
|
||||||
spinlock_acquire(&service->spin);
|
spinlock_acquire(&service->spin);
|
||||||
if (service->dbref)
|
if (service->dbref)
|
||||||
{
|
{
|
||||||
@ -2027,3 +2051,76 @@ bool service_all_services_have_listeners()
|
|||||||
spinlock_release(&service_spin);
|
spinlock_release(&service_spin);
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void service_calculate_weights(SERVICE *service)
|
||||||
|
{
|
||||||
|
char *weightby = serviceGetWeightingParameter(service);
|
||||||
|
if (weightby && service->dbref)
|
||||||
|
{
|
||||||
|
/** Service has a weighting parameter and at least one server */
|
||||||
|
int total = 0;
|
||||||
|
|
||||||
|
/** Calculate total weight */
|
||||||
|
for (SERVER_REF *server = service->dbref; server; server = server->next)
|
||||||
|
{
|
||||||
|
server->weight = SERVICE_BASE_SERVER_WEIGHT;
|
||||||
|
char *param = serverGetParameter(server->server, weightby);
|
||||||
|
if (param)
|
||||||
|
{
|
||||||
|
total += atoi(param);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (total == 0)
|
||||||
|
{
|
||||||
|
MXS_WARNING("Weighting Parameter for service '%s' will be ignored as "
|
||||||
|
"no servers have values for the parameter '%s'.",
|
||||||
|
service->name, weightby);
|
||||||
|
}
|
||||||
|
else if (total < 0)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Sum of weighting parameter '%s' for service '%s' exceeds "
|
||||||
|
"maximum value of %d. Weighting will be ignored.",
|
||||||
|
weightby, service->name, INT_MAX);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/** Calculate the relative weight of the servers */
|
||||||
|
for (SERVER_REF *server = service->dbref; server; server = server->next)
|
||||||
|
{
|
||||||
|
char *param = serverGetParameter(server->server, weightby);
|
||||||
|
if (param)
|
||||||
|
{
|
||||||
|
int wght = atoi(param);
|
||||||
|
int perc = (wght * SERVICE_BASE_SERVER_WEIGHT) / total;
|
||||||
|
|
||||||
|
if (perc == 0)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Weighting parameter '%s' with a value of %d for"
|
||||||
|
" server '%s' rounds down to zero with total weight"
|
||||||
|
" of %d for service '%s'. No queries will be "
|
||||||
|
"routed to this server as long as a server with"
|
||||||
|
" positive weight is available.",
|
||||||
|
weightby, wght, server->server->unique_name,
|
||||||
|
total, service->name);
|
||||||
|
}
|
||||||
|
else if (perc < 0)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Weighting parameter '%s' for server '%s' is too large, "
|
||||||
|
"maximum value is %d. No weighting will be used for this "
|
||||||
|
"server.", weightby, server->server->unique_name,
|
||||||
|
INT_MAX / SERVICE_BASE_SERVER_WEIGHT);
|
||||||
|
perc = SERVICE_BASE_SERVER_WEIGHT;
|
||||||
|
}
|
||||||
|
server->weight = perc;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_WARNING("Server '%s' has no parameter '%s' used for weighting"
|
||||||
|
" for service '%s'.", server->server->unique_name,
|
||||||
|
weightby, service->name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -203,11 +203,8 @@ static ROUTER *
|
|||||||
createInstance(SERVICE *service, char **options)
|
createInstance(SERVICE *service, char **options)
|
||||||
{
|
{
|
||||||
ROUTER_INSTANCE *inst;
|
ROUTER_INSTANCE *inst;
|
||||||
SERVER *server;
|
|
||||||
SERVER_REF *sref;
|
SERVER_REF *sref;
|
||||||
int i, n;
|
int i, n;
|
||||||
BACKEND *backend;
|
|
||||||
char *weightby;
|
|
||||||
|
|
||||||
if ((inst = MXS_CALLOC(1, sizeof(ROUTER_INSTANCE))) == NULL)
|
if ((inst = MXS_CALLOC(1, sizeof(ROUTER_INSTANCE))) == NULL)
|
||||||
{
|
{
|
||||||
@ -243,77 +240,11 @@ createInstance(SERVICE *service, char **options)
|
|||||||
}
|
}
|
||||||
inst->servers[n]->server = sref->server;
|
inst->servers[n]->server = sref->server;
|
||||||
inst->servers[n]->current_connection_count = 0;
|
inst->servers[n]->current_connection_count = 0;
|
||||||
inst->servers[n]->weight = 1000;
|
inst->servers[n]->weight = sref->weight;
|
||||||
n++;
|
n++;
|
||||||
}
|
}
|
||||||
inst->servers[n] = NULL;
|
inst->servers[n] = NULL;
|
||||||
|
|
||||||
if ((weightby = serviceGetWeightingParameter(service)) != NULL)
|
|
||||||
{
|
|
||||||
int total = 0;
|
|
||||||
|
|
||||||
for (int n = 0; inst->servers[n]; n++)
|
|
||||||
{
|
|
||||||
BACKEND *backend = inst->servers[n];
|
|
||||||
char *param = serverGetParameter(backend->server, weightby);
|
|
||||||
if (param)
|
|
||||||
{
|
|
||||||
total += atoi(param);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (total == 0)
|
|
||||||
{
|
|
||||||
MXS_WARNING("Weighting Parameter for service '%s' "
|
|
||||||
"will be ignored as no servers have values "
|
|
||||||
"for the parameter '%s'.",
|
|
||||||
service->name, weightby);
|
|
||||||
}
|
|
||||||
else if (total < 0)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Sum of weighting parameter '%s' for service '%s' exceeds "
|
|
||||||
"maximum value of %d. Weighting will be ignored.",
|
|
||||||
weightby, service->name, INT_MAX);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (int n = 0; inst->servers[n]; n++)
|
|
||||||
{
|
|
||||||
BACKEND *backend = inst->servers[n];
|
|
||||||
char *param = serverGetParameter(backend->server, weightby);
|
|
||||||
if (param)
|
|
||||||
{
|
|
||||||
int wght = atoi(param);
|
|
||||||
int perc = (wght * 1000) / total;
|
|
||||||
|
|
||||||
if (perc == 0)
|
|
||||||
{
|
|
||||||
perc = 1;
|
|
||||||
MXS_ERROR("Weighting parameter '%s' with a value of %d for"
|
|
||||||
" server '%s' rounds down to zero with total weight"
|
|
||||||
" of %d for service '%s'. No queries will be "
|
|
||||||
"routed to this server.", weightby, wght,
|
|
||||||
backend->server->unique_name, total,
|
|
||||||
service->name);
|
|
||||||
}
|
|
||||||
else if (perc < 0)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Weighting parameter '%s' for server '%s' is too large, "
|
|
||||||
"maximum value is %d. No weighting will be used for this server.",
|
|
||||||
weightby, backend->server->unique_name, INT_MAX / 1000);
|
|
||||||
perc = 1000;
|
|
||||||
}
|
|
||||||
backend->weight = perc;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_WARNING("Server '%s' has no parameter '%s' used for weighting"
|
|
||||||
" for service '%s'.", backend->server->unique_name,
|
|
||||||
weightby, service->name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Process the options
|
* Process the options
|
||||||
*/
|
*/
|
||||||
|
@ -232,7 +232,7 @@ static ROUTER *createInstance(SERVICE *service, char **options)
|
|||||||
router->servers[nservers]->backend_server = sref->server;
|
router->servers[nservers]->backend_server = sref->server;
|
||||||
router->servers[nservers]->backend_conn_count = 0;
|
router->servers[nservers]->backend_conn_count = 0;
|
||||||
router->servers[nservers]->be_valid = false;
|
router->servers[nservers]->be_valid = false;
|
||||||
router->servers[nservers]->weight = 1000;
|
router->servers[nservers]->weight = sref->weight;
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND;
|
router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND;
|
||||||
router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND;
|
router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND;
|
||||||
@ -247,81 +247,6 @@ static ROUTER *createInstance(SERVICE *service, char **options)
|
|||||||
*/
|
*/
|
||||||
router->available_slaves = true;
|
router->available_slaves = true;
|
||||||
|
|
||||||
/*
|
|
||||||
* If server weighting has been defined calculate the percentage
|
|
||||||
* of load that will be sent to each server. This is only used for
|
|
||||||
* calculating the least connections, either globally or within a
|
|
||||||
* service, or the number of current operations on a server.
|
|
||||||
*/
|
|
||||||
if ((weightby = serviceGetWeightingParameter(service)) != NULL)
|
|
||||||
{
|
|
||||||
int total = 0;
|
|
||||||
|
|
||||||
for (int n = 0; router->servers[n]; n++)
|
|
||||||
{
|
|
||||||
BACKEND *backend = router->servers[n];
|
|
||||||
char *param = serverGetParameter(backend->backend_server, weightby);
|
|
||||||
if (param)
|
|
||||||
{
|
|
||||||
total += atoi(param);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (total == 0)
|
|
||||||
{
|
|
||||||
MXS_WARNING("Weighting Parameter for service '%s' "
|
|
||||||
"will be ignored as no servers have values "
|
|
||||||
"for the parameter '%s'.",
|
|
||||||
service->name, weightby);
|
|
||||||
}
|
|
||||||
else if (total < 0)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Sum of weighting parameter '%s' for service '%s' exceeds "
|
|
||||||
"maximum value of %d. Weighting will be ignored.",
|
|
||||||
weightby, service->name, INT_MAX);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
for (int n = 0; router->servers[n]; n++)
|
|
||||||
{
|
|
||||||
BACKEND *backend = router->servers[n];
|
|
||||||
char *param = serverGetParameter(backend->backend_server, weightby);
|
|
||||||
if (param)
|
|
||||||
{
|
|
||||||
int wght = atoi(param);
|
|
||||||
int perc = (wght * 1000) / total;
|
|
||||||
|
|
||||||
if (perc == 0)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Weighting parameter '%s' with a value of %d for"
|
|
||||||
" server '%s' rounds down to zero with total weight"
|
|
||||||
" of %d for service '%s'. No queries will be "
|
|
||||||
"routed to this server as long as a server with"
|
|
||||||
" positive weight is available.",
|
|
||||||
weightby, wght, backend->backend_server->unique_name,
|
|
||||||
total, service->name);
|
|
||||||
}
|
|
||||||
else if (perc < 0)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Weighting parameter '%s' for server '%s' is too large, "
|
|
||||||
"maximum value is %d. No weighting will be used for this "
|
|
||||||
"server.",
|
|
||||||
weightby, backend->backend_server->unique_name,
|
|
||||||
INT_MAX / 1000);
|
|
||||||
perc = 1000;
|
|
||||||
}
|
|
||||||
backend->weight = perc;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_WARNING("Server '%s' has no parameter '%s' used for weighting"
|
|
||||||
" for service '%s'.",
|
|
||||||
backend->backend_server->unique_name, weightby,
|
|
||||||
service->name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Enable strict multistatement handling by default */
|
/** Enable strict multistatement handling by default */
|
||||||
router->rwsplit_config.rw_strict_multi_stmt = true;
|
router->rwsplit_config.rw_strict_multi_stmt = true;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user