MAX-94, Added configuration parameter, max_slave_replication_lag=<longest allowed replication lag in seconds> to router section.

Parameter can be changed runtime but it requires reloading of config.
This commit is contained in:
VilhoRaatikka 2014-06-30 22:35:27 +03:00
parent aa828e8751
commit b875936a21
6 changed files with 257 additions and 90 deletions

View File

@ -222,6 +222,7 @@ int error_count = 0;
if (router)
{
char* max_slave_conn_str;
char* max_slave_rlag_str;
obj->element = service_alloc(obj->object, router);
char *user =
@ -254,22 +255,30 @@ int error_count = 0;
if (gateway.version_string)
((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string);
}
max_slave_conn_str =
config_get_value(obj->parameters,
"max_slave_connections");
max_slave_rlag_str =
config_get_value(obj->parameters,
"max_slave_replication_lag");
if (enable_root_user)
serviceEnableRootUser(obj->element, config_truth_value(enable_root_user));
serviceEnableRootUser(
obj->element,
config_truth_value(enable_root_user));
if (weightby)
serviceWeightBy(obj->element, weightby);
if (!auth)
auth = config_get_value(obj->parameters, "auth");
auth = config_get_value(obj->parameters,
"auth");
if (obj->element && user && auth)
{
serviceSetUser(obj->element, user, auth);
serviceSetUser(obj->element,
user,
auth);
}
else if (user && auth == NULL)
{
@ -280,6 +289,7 @@ int error_count = 0;
"corresponding password.",
obj->object)));
}
/** Read, validate and set max_slave_connections */
if (max_slave_conn_str != NULL)
{
CONFIG_PARAMETER* param;
@ -288,11 +298,12 @@ int error_count = 0;
param = config_get_param(obj->parameters,
"max_slave_connections");
succp = service_set_slave_conn_limit(
succp = service_set_param_value(
obj->element,
param,
max_slave_conn_str,
COUNT_ATMOST);
COUNT_ATMOST,
(COUNT_TYPE|PERCENT_TYPE));
if (!succp)
{
@ -309,6 +320,36 @@ int error_count = 0;
param->value)));
}
}
/** Read, validate and set max_slave_replication_lag */
if (max_slave_rlag_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(
obj->parameters,
"max_slave_replication_lag");
succp = service_set_param_value(
obj->element,
param,
max_slave_rlag_str,
COUNT_ATMOST,
COUNT_TYPE);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is <int> for maximum "
"slave replication lag.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
}
else
{
@ -910,6 +951,7 @@ SERVER *server;
char *auth;
char *enable_root_user;
char* max_slave_conn_str;
char* max_slave_rlag_str;
char *version_string;
enable_root_user = config_get_value(obj->parameters, "enable_root_user");
@ -934,24 +976,27 @@ SERVER *server;
auth);
if (enable_root_user)
serviceEnableRootUser(service, atoi(enable_root_user));
/** Read, validate and set max_slave_connections */
max_slave_conn_str =
config_get_value(
obj->parameters,
"max_slave_connections");
if (max_slave_conn_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(obj->parameters,
"max_slave_connections");
"max_slave_connections");
succp = service_set_slave_conn_limit(
service,
param,
max_slave_conn_str,
COUNT_ATMOST);
succp = service_set_param_value(
service,
param,
max_slave_conn_str,
COUNT_ATMOST,
(PERCENT_TYPE|COUNT_TYPE));
if (!succp)
{
@ -968,8 +1013,40 @@ SERVER *server;
param->value)));
}
}
/** Read, validate and set max_slave_replication_lag */
max_slave_rlag_str =
config_get_value(obj->parameters,
"max_slave_replication_lag");
if (max_slave_rlag_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(
obj->parameters,
"max_slave_replication_lag");
succp = service_set_param_value(
service,
param,
max_slave_rlag_str,
COUNT_ATMOST,
COUNT_TYPE);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is <int> for maximum "
"slave replication lag.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
}
obj->element = service;
@ -1204,6 +1281,7 @@ static char *service_params[] =
"passwd",
"enable_root_user",
"max_slave_connections",
"max_slave_replication_lag",
"version_string",
"filters",
NULL

View File

@ -999,11 +999,12 @@ int service_refresh_users(SERVICE *service) {
return 1;
}
bool service_set_slave_conn_limit (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec)
bool service_set_param_value (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec,
config_param_type_t type)
{
char* p;
int valint;
@ -1034,11 +1035,15 @@ bool service_set_slave_conn_limit (
{
succp = false;
}
else
else if (PARAM_IS_TYPE(type,PERCENT_TYPE))
{
succp = true;
config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE);
}
else
{
/** Log error */
}
}
else
{
@ -1053,11 +1058,15 @@ bool service_set_slave_conn_limit (
{
succp = false;
}
else
else if (PARAM_IS_TYPE(type,COUNT_TYPE))
{
succp = true;
config_set_qualified_param(param, (void *)&valint, COUNT_TYPE);
}
else
{
/** Log error */
}
}
if (succp)

View File

@ -39,13 +39,15 @@
enum {MAX_PARAM_LEN=256};
typedef enum {
UNDEFINED_TYPE=0,
STRING_TYPE,
COUNT_TYPE,
PERCENT_TYPE,
BOOL_TYPE
UNDEFINED_TYPE = 0x00,
STRING_TYPE = 0x01,
COUNT_TYPE = 0x02,
PERCENT_TYPE = 0x04,
BOOL_TYPE = 0x08
} config_param_type_t;
#define PARAM_IS_TYPE(p,t) ((p) & (t))
/**
* The config parameter
*/

View File

@ -166,11 +166,14 @@ extern int service_refresh_users(SERVICE *);
extern void printService(SERVICE *);
extern void printAllServices();
extern void dprintAllServices(DCB *);
bool service_set_slave_conn_limit (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec);
bool service_set_param_value (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec,
config_param_type_t type);
extern void dprintService(DCB *, SERVICE *);
extern void dListServices(DCB *);
extern void dListListeners(DCB *);

View File

@ -99,6 +99,7 @@ typedef enum select_criteria {
/** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1
#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */
#define GET_SELECT_CRITERIA(s) \
(strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \
@ -201,6 +202,7 @@ typedef struct rwsplit_config_st {
int rw_max_slave_conn_percent;
int rw_max_slave_conn_count;
select_criteria_t rw_slave_select_criteria;
int rw_max_slave_replication_lag;
} rwsplit_config_t;

View File

@ -94,6 +94,7 @@ static void handleError(
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
static int router_get_servercount(ROUTER_INSTANCE* router);
static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers);
static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses);
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -128,6 +129,7 @@ static bool select_connect_backend_servers(
backend_ref_t* backend_ref,
int router_nservers,
int max_nslaves,
int max_rlag,
select_criteria_t select_criteria,
SESSION* session,
ROUTER_INSTANCE* router);
@ -244,9 +246,15 @@ static bool handle_error_new_connection(
GWBUF* errmsg);
static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg);
static BACKEND *get_root_master(
backend_ref_t *servers,
int router_nservers);
static BACKEND* get_root_master(
backend_ref_t* servers,
int router_nservers);
static bool have_enough_servers(
ROUTER_CLIENT_SES** rses,
const int nsrv,
int router_nsrv,
ROUTER_INSTANCE* router);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
@ -319,7 +327,7 @@ static void refreshInstance(
/**
* Create an instance of read/write statemtn router within the MaxScale.
* Create an instance of read/write statement router within the MaxScale.
*
*
* @param service The service this router is being create for
@ -416,12 +424,23 @@ static ROUTER* createInstance(
{
router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA;
}
/**
/**
* Copy all config parameters from service to router instance.
* Finally, copy version number to indicate that configs match.
*/
param = config_get_param(service->svc_config_param, "max_slave_connections");
param = config_get_param(service->svc_config_param, "max_slave_connections");
if (param != NULL)
{
refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
}
/**
* Read default value for slave replication lag upper limit and then
* configured value if it exists.
*/
router->rwsplit_config.rw_max_slave_replication_lag = CONFIG_MAX_SLAVE_RLAG;
param = config_get_param(service->svc_config_param, "max_slave_replication_lag");
if (param != NULL)
{
@ -462,6 +481,7 @@ static void* newSession(
bool succp;
int router_nservers = 0; /*< # of servers in total */
int max_nslaves; /*< max # of slaves used in this session */
int max_slave_rlag; /*< max allowed replication lag for any slave */
int i;
const int min_nservers = 1; /*< hard-coded for now */
static uint64_t router_client_ses_seq; /*< ID for client session */
@ -508,61 +528,17 @@ static void* newSession(
router_nservers = router_get_servercount(router);
/** With too few servers session is not created */
if (router_nservers < min_nservers ||
MAX(client_rses->rses_config.rw_max_slave_conn_count,
(router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100)
< min_nservers)
if (!have_enough_servers(&client_rses,
min_nservers,
router_nservers,
router))
{
if (router_nservers < min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers available. Found %d "
"when %d is required.",
router->service->name,
router_nservers,
min_nservers)));
}
else
{
double pct = client_rses->rses_config.rw_max_slave_conn_percent/100;
double nservers = (double)router_nservers*pct;
if (client_rses->rses_config.rw_max_slave_conn_count <
min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.",
router->service->name,
client_rses->rses_config.rw_max_slave_conn_count,
min_nservers)));
}
if (nservers < min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.",
router->service->name,
client_rses->rses_config.rw_max_slave_conn_percent,
min_nservers/(((double)router_nservers)/100))));
}
}
free(client_rses);
client_rses = NULL;
goto return_rses;
}
/**
* Create backend reference objects for this session.
*/
backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t));
backend_ref = (backend_ref_t *)calloc(1, router_nservers*sizeof(backend_ref_t));
if (backend_ref == NULL)
{
@ -593,8 +569,9 @@ static void* newSession(
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
}
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(client_rses);
spinlock_init(&client_rses->rses_lock);
client_rses->rses_backend_ref = backend_ref;
@ -608,6 +585,7 @@ static void* newSession(
backend_ref,
router_nservers,
max_nslaves,
max_slave_rlag,
client_rses->rses_config.rw_slave_select_criteria,
session,
router);
@ -1530,6 +1508,9 @@ static void bref_set_state(
* @param max_nslaves - in, use
* Upper limit for the number of slaves. Configuration parameter or default.
*
* @param max_slave_rlag - in, use
* Maximum allowed replication lag for any slave. Configuration parameter or default.
*
* @param session - in, use
* MaxScale session pointer used when connection to backend is established.
*
@ -1549,6 +1530,7 @@ static bool select_connect_backend_servers(
backend_ref_t* backend_ref,
int router_nservers,
int max_nslaves,
int max_slave_rlag,
select_criteria_t select_criteria,
SESSION* session,
ROUTER_INSTANCE* router)
@ -1713,6 +1695,7 @@ static bool select_connect_backend_servers(
{
/* check also for relay servers and don't take the master_host */
if (slaves_found < max_nslaves &&
b->backend_server->rlag <= max_slave_rlag &&
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(master_host != NULL && (b->backend_server != master_host->backend_server)))
{
@ -2966,6 +2949,7 @@ static bool handle_error_new_connection(
SESSION* ses;
int router_nservers;
int max_nslaves;
int max_slave_rlag;
backend_ref_t* bref;
bool succp;
@ -3018,6 +3002,7 @@ static bool handle_error_new_connection(
router_nservers = router_get_servercount(inst);
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(rses);
/**
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
@ -3027,6 +3012,7 @@ static bool handle_error_new_connection(
rses->rses_backend_ref,
router_nservers,
max_nslaves,
max_slave_rlag,
rses->rses_config.rw_slave_select_criteria,
ses,
inst);
@ -3101,6 +3087,71 @@ static int router_get_servercount(
return router_nservers;
}
static bool have_enough_servers(
ROUTER_CLIENT_SES** p_rses,
const int min_nsrv,
int router_nsrv,
ROUTER_INSTANCE* router)
{
bool succp;
/** With too few servers session is not created */
if (router_nsrv < min_nsrv ||
MAX((*p_rses)->rses_config.rw_max_slave_conn_count,
(router_nsrv*(*p_rses)->rses_config.rw_max_slave_conn_percent)/100)
< min_nsrv)
{
if (router_nsrv < min_nsrv)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers available. Found %d "
"when %d is required.",
router->service->name,
router_nsrv,
min_nsrv)));
}
else
{
double pct = (*p_rses)->rses_config.rw_max_slave_conn_percent/100;
double nservers = (double)router_nsrv*pct;
if ((*p_rses)->rses_config.rw_max_slave_conn_count < min_nsrv)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.",
router->service->name,
(*p_rses)->rses_config.rw_max_slave_conn_count,
min_nsrv)));
}
if (nservers < min_nsrv)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.",
router->service->name,
(*p_rses)->rses_config.rw_max_slave_conn_percent,
min_nsrv/(((double)router_nsrv)/100))));
}
}
free(*p_rses);
*p_rses = NULL;
succp = false;
}
else
{
succp = true;
}
return succp;
}
/**
* Find out the number of read backend servers.
* Depending on the configuration value type, either copy direct count
@ -3129,6 +3180,28 @@ static int rses_get_max_slavecount(
return max_nslaves;
}
static int rses_get_max_replication_lag(
ROUTER_CLIENT_SES* rses)
{
int conf_max_rlag;
CHK_CLIENT_RSES(rses);
/** if there is no configured value, then longest possible int is used */
if (rses->rses_config.rw_max_slave_replication_lag > 0)
{
conf_max_rlag = rses->rses_config.rw_max_slave_replication_lag;
}
else
{
conf_max_rlag = ~(1<<31);
}
return conf_max_rlag;
}
static backend_ref_t* get_bref_from_dcb(
ROUTER_CLIENT_SES* rses,
DCB* dcb)