Merge pull request #8 from skysql/MAX-94

Max 94
This commit is contained in:
Mark Riddoch
2014-07-01 08:18:09 +01:00
6 changed files with 264 additions and 90 deletions

View File

@ -222,6 +222,7 @@ int error_count = 0;
if (router) if (router)
{ {
char* max_slave_conn_str; char* max_slave_conn_str;
char* max_slave_rlag_str;
obj->element = service_alloc(obj->object, router); obj->element = service_alloc(obj->object, router);
char *user = char *user =
@ -254,22 +255,30 @@ int error_count = 0;
if (gateway.version_string) if (gateway.version_string)
((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string); ((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string);
} }
max_slave_conn_str = max_slave_conn_str =
config_get_value(obj->parameters, config_get_value(obj->parameters,
"max_slave_connections"); "max_slave_connections");
max_slave_rlag_str =
config_get_value(obj->parameters,
"max_slave_replication_lag");
if (enable_root_user) if (enable_root_user)
serviceEnableRootUser(obj->element, config_truth_value(enable_root_user)); serviceEnableRootUser(
obj->element,
config_truth_value(enable_root_user));
if (weightby) if (weightby)
serviceWeightBy(obj->element, weightby); serviceWeightBy(obj->element, weightby);
if (!auth) if (!auth)
auth = config_get_value(obj->parameters, "auth"); auth = config_get_value(obj->parameters,
"auth");
if (obj->element && user && auth) if (obj->element && user && auth)
{ {
serviceSetUser(obj->element, user, auth); serviceSetUser(obj->element,
user,
auth);
} }
else if (user && auth == NULL) else if (user && auth == NULL)
{ {
@ -280,6 +289,7 @@ int error_count = 0;
"corresponding password.", "corresponding password.",
obj->object))); obj->object)));
} }
/** Read, validate and set max_slave_connections */
if (max_slave_conn_str != NULL) if (max_slave_conn_str != NULL)
{ {
CONFIG_PARAMETER* param; CONFIG_PARAMETER* param;
@ -288,11 +298,12 @@ int error_count = 0;
param = config_get_param(obj->parameters, param = config_get_param(obj->parameters,
"max_slave_connections"); "max_slave_connections");
succp = service_set_slave_conn_limit( succp = service_set_param_value(
obj->element, obj->element,
param, param,
max_slave_conn_str, max_slave_conn_str,
COUNT_ATMOST); COUNT_ATMOST,
(COUNT_TYPE|PERCENT_TYPE));
if (!succp) if (!succp)
{ {
@ -309,6 +320,36 @@ int error_count = 0;
param->value))); param->value)));
} }
} }
/** Read, validate and set max_slave_replication_lag */
if (max_slave_rlag_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(
obj->parameters,
"max_slave_replication_lag");
succp = service_set_param_value(
obj->element,
param,
max_slave_rlag_str,
COUNT_ATMOST,
COUNT_TYPE);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is <int> for maximum "
"slave replication lag.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
} }
else else
{ {
@ -910,6 +951,7 @@ SERVER *server;
char *auth; char *auth;
char *enable_root_user; char *enable_root_user;
char* max_slave_conn_str; char* max_slave_conn_str;
char* max_slave_rlag_str;
char *version_string; char *version_string;
enable_root_user = config_get_value(obj->parameters, "enable_root_user"); enable_root_user = config_get_value(obj->parameters, "enable_root_user");
@ -934,24 +976,27 @@ SERVER *server;
auth); auth);
if (enable_root_user) if (enable_root_user)
serviceEnableRootUser(service, atoi(enable_root_user)); serviceEnableRootUser(service, atoi(enable_root_user));
/** Read, validate and set max_slave_connections */
max_slave_conn_str = max_slave_conn_str =
config_get_value( config_get_value(
obj->parameters, obj->parameters,
"max_slave_connections"); "max_slave_connections");
if (max_slave_conn_str != NULL) if (max_slave_conn_str != NULL)
{ {
CONFIG_PARAMETER* param; CONFIG_PARAMETER* param;
bool succp; bool succp;
param = config_get_param(obj->parameters, param = config_get_param(obj->parameters,
"max_slave_connections"); "max_slave_connections");
succp = service_set_slave_conn_limit( succp = service_set_param_value(
service, service,
param, param,
max_slave_conn_str, max_slave_conn_str,
COUNT_ATMOST); COUNT_ATMOST,
(PERCENT_TYPE|COUNT_TYPE));
if (!succp) if (!succp)
{ {
@ -968,8 +1013,40 @@ SERVER *server;
param->value))); param->value)));
} }
} }
/** Read, validate and set max_slave_replication_lag */
max_slave_rlag_str =
config_get_value(obj->parameters,
"max_slave_replication_lag");
if (max_slave_rlag_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(
obj->parameters,
"max_slave_replication_lag");
succp = service_set_param_value(
service,
param,
max_slave_rlag_str,
COUNT_ATMOST,
COUNT_TYPE);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is <int> for maximum "
"slave replication lag.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
} }
obj->element = service; obj->element = service;
@ -1204,6 +1281,7 @@ static char *service_params[] =
"passwd", "passwd",
"enable_root_user", "enable_root_user",
"max_slave_connections", "max_slave_connections",
"max_slave_replication_lag",
"version_string", "version_string",
"filters", "filters",
NULL NULL

View File

@ -999,11 +999,12 @@ int service_refresh_users(SERVICE *service) {
return 1; return 1;
} }
bool service_set_slave_conn_limit ( bool service_set_param_value (
SERVICE* service, SERVICE* service,
CONFIG_PARAMETER* param, CONFIG_PARAMETER* param,
char* valstr, char* valstr,
count_spec_t count_spec) count_spec_t count_spec,
config_param_type_t type)
{ {
char* p; char* p;
int valint; int valint;
@ -1034,11 +1035,15 @@ bool service_set_slave_conn_limit (
{ {
succp = false; succp = false;
} }
else else if (PARAM_IS_TYPE(type,PERCENT_TYPE))
{ {
succp = true; succp = true;
config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE); config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE);
} }
else
{
/** Log error */
}
} }
else else
{ {
@ -1053,11 +1058,15 @@ bool service_set_slave_conn_limit (
{ {
succp = false; succp = false;
} }
else else if (PARAM_IS_TYPE(type,COUNT_TYPE))
{ {
succp = true; succp = true;
config_set_qualified_param(param, (void *)&valint, COUNT_TYPE); config_set_qualified_param(param, (void *)&valint, COUNT_TYPE);
} }
else
{
/** Log error */
}
} }
if (succp) if (succp)

View File

@ -39,13 +39,15 @@
enum {MAX_PARAM_LEN=256}; enum {MAX_PARAM_LEN=256};
typedef enum { typedef enum {
UNDEFINED_TYPE=0, UNDEFINED_TYPE = 0x00,
STRING_TYPE, STRING_TYPE = 0x01,
COUNT_TYPE, COUNT_TYPE = 0x02,
PERCENT_TYPE, PERCENT_TYPE = 0x04,
BOOL_TYPE BOOL_TYPE = 0x08
} config_param_type_t; } config_param_type_t;
#define PARAM_IS_TYPE(p,t) ((p) & (t))
/** /**
* The config parameter * The config parameter
*/ */

View File

@ -166,11 +166,14 @@ extern int service_refresh_users(SERVICE *);
extern void printService(SERVICE *); extern void printService(SERVICE *);
extern void printAllServices(); extern void printAllServices();
extern void dprintAllServices(DCB *); extern void dprintAllServices(DCB *);
bool service_set_slave_conn_limit (
SERVICE* service, bool service_set_param_value (
CONFIG_PARAMETER* param, SERVICE* service,
char* valstr, CONFIG_PARAMETER* param,
count_spec_t count_spec); char* valstr,
count_spec_t count_spec,
config_param_type_t type);
extern void dprintService(DCB *, SERVICE *); extern void dprintService(DCB *, SERVICE *);
extern void dListServices(DCB *); extern void dListServices(DCB *);
extern void dListListeners(DCB *); extern void dListListeners(DCB *);

View File

@ -102,6 +102,7 @@ typedef enum select_criteria {
/** default values for rwsplit configuration parameters */ /** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1 #define CONFIG_MAX_SLAVE_CONN 1
#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */
#define GET_SELECT_CRITERIA(s) \ #define GET_SELECT_CRITERIA(s) \
(strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \ (strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \
@ -207,6 +208,7 @@ typedef struct rwsplit_config_st {
int rw_max_slave_conn_percent; int rw_max_slave_conn_percent;
int rw_max_slave_conn_count; int rw_max_slave_conn_count;
select_criteria_t rw_slave_select_criteria; select_criteria_t rw_slave_select_criteria;
int rw_max_slave_replication_lag;
} rwsplit_config_t; } rwsplit_config_t;

View File

@ -94,6 +94,7 @@ static void handleError(
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
static int router_get_servercount(ROUTER_INSTANCE* router); static int router_get_servercount(ROUTER_INSTANCE* router);
static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers); static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers);
static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses);
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb); static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -134,6 +135,7 @@ static bool select_connect_backend_servers(
backend_ref_t* backend_ref, backend_ref_t* backend_ref,
int router_nservers, int router_nservers,
int max_nslaves, int max_nslaves,
int max_rlag,
select_criteria_t select_criteria, select_criteria_t select_criteria,
SESSION* session, SESSION* session,
ROUTER_INSTANCE* router); ROUTER_INSTANCE* router);
@ -247,9 +249,15 @@ static bool handle_error_new_connection(
GWBUF* errmsg); GWBUF* errmsg);
static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg); static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg);
static BACKEND *get_root_master( static BACKEND* get_root_master(
backend_ref_t *servers, backend_ref_t* servers,
int router_nservers); int router_nservers);
static bool have_enough_servers(
ROUTER_CLIENT_SES** rses,
const int nsrv,
int router_nsrv,
ROUTER_INSTANCE* router);
static SPINLOCK instlock; static SPINLOCK instlock;
static ROUTER_INSTANCE* instances; static ROUTER_INSTANCE* instances;
@ -308,6 +316,13 @@ static void refreshInstance(
router->rwsplit_config.rw_max_slave_conn_count = router->rwsplit_config.rw_max_slave_conn_count =
config_get_valint(param, NULL, paramtype); config_get_valint(param, NULL, paramtype);
} }
else if (strncmp(param->name,
"max_slave_replication_lag",
MAX_PARAM_LEN) == 0)
{
router->rwsplit_config.rw_max_slave_replication_lag =
config_get_valint(param, NULL, paramtype);
}
} }
else if (paramtype == PERCENT_TYPE) else if (paramtype == PERCENT_TYPE)
{ {
@ -322,7 +337,7 @@ static void refreshInstance(
/** /**
* Create an instance of read/write statemtn router within the MaxScale. * Create an instance of read/write statement router within the MaxScale.
* *
* *
* @param service The service this router is being create for * @param service The service this router is being create for
@ -419,12 +434,23 @@ static ROUTER* createInstance(
{ {
router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA; router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA;
} }
/**
/**
* Copy all config parameters from service to router instance. * Copy all config parameters from service to router instance.
* Finally, copy version number to indicate that configs match. * Finally, copy version number to indicate that configs match.
*/ */
param = config_get_param(service->svc_config_param, "max_slave_connections"); param = config_get_param(service->svc_config_param, "max_slave_connections");
if (param != NULL)
{
refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
}
/**
* Read default value for slave replication lag upper limit and then
* configured value if it exists.
*/
router->rwsplit_config.rw_max_slave_replication_lag = CONFIG_MAX_SLAVE_RLAG;
param = config_get_param(service->svc_config_param, "max_slave_replication_lag");
if (param != NULL) if (param != NULL)
{ {
@ -465,6 +491,7 @@ static void* newSession(
bool succp; bool succp;
int router_nservers = 0; /*< # of servers in total */ int router_nservers = 0; /*< # of servers in total */
int max_nslaves; /*< max # of slaves used in this session */ int max_nslaves; /*< max # of slaves used in this session */
int max_slave_rlag; /*< max allowed replication lag for any slave */
int i; int i;
const int min_nservers = 1; /*< hard-coded for now */ const int min_nservers = 1; /*< hard-coded for now */
@ -510,61 +537,17 @@ static void* newSession(
router_nservers = router_get_servercount(router); router_nservers = router_get_servercount(router);
/** With too few servers session is not created */ if (!have_enough_servers(&client_rses,
if (router_nservers < min_nservers || min_nservers,
MAX(client_rses->rses_config.rw_max_slave_conn_count, router_nservers,
(router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100) router))
< min_nservers)
{ {
if (router_nservers < min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers available. Found %d "
"when %d is required.",
router->service->name,
router_nservers,
min_nservers)));
}
else
{
double pct = client_rses->rses_config.rw_max_slave_conn_percent/100;
double nservers = (double)router_nservers*pct;
if (client_rses->rses_config.rw_max_slave_conn_count <
min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.",
router->service->name,
client_rses->rses_config.rw_max_slave_conn_count,
min_nservers)));
}
if (nservers < min_nservers)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.",
router->service->name,
client_rses->rses_config.rw_max_slave_conn_percent,
min_nservers/(((double)router_nservers)/100))));
}
}
free(client_rses);
client_rses = NULL;
goto return_rses; goto return_rses;
} }
/** /**
* Create backend reference objects for this session. * Create backend reference objects for this session.
*/ */
backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t)); backend_ref = (backend_ref_t *)calloc(1, router_nservers*sizeof(backend_ref_t));
if (backend_ref == NULL) if (backend_ref == NULL)
{ {
@ -595,8 +578,9 @@ static void* newSession(
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
} }
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(client_rses);
spinlock_init(&client_rses->rses_lock); spinlock_init(&client_rses->rses_lock);
client_rses->rses_backend_ref = backend_ref; client_rses->rses_backend_ref = backend_ref;
@ -610,6 +594,7 @@ static void* newSession(
backend_ref, backend_ref,
router_nservers, router_nservers,
max_nslaves, max_nslaves,
max_slave_rlag,
client_rses->rses_config.rw_slave_select_criteria, client_rses->rses_config.rw_slave_select_criteria,
session, session,
router); router);
@ -1621,6 +1606,9 @@ static void bref_set_state(
* @param max_nslaves - in, use * @param max_nslaves - in, use
* Upper limit for the number of slaves. Configuration parameter or default. * Upper limit for the number of slaves. Configuration parameter or default.
* *
* @param max_slave_rlag - in, use
* Maximum allowed replication lag for any slave. Configuration parameter or default.
*
* @param session - in, use * @param session - in, use
* MaxScale session pointer used when connection to backend is established. * MaxScale session pointer used when connection to backend is established.
* *
@ -1640,6 +1628,7 @@ static bool select_connect_backend_servers(
backend_ref_t* backend_ref, backend_ref_t* backend_ref,
int router_nservers, int router_nservers,
int max_nslaves, int max_nslaves,
int max_slave_rlag,
select_criteria_t select_criteria, select_criteria_t select_criteria,
SESSION* session, SESSION* session,
ROUTER_INSTANCE* router) ROUTER_INSTANCE* router)
@ -1809,6 +1798,7 @@ static bool select_connect_backend_servers(
{ {
/* check also for relay servers and don't take the master_host */ /* check also for relay servers and don't take the master_host */
if (slaves_found < max_nslaves && if (slaves_found < max_nslaves &&
b->backend_server->rlag <= max_slave_rlag &&
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) && (SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(master_host != NULL && (b->backend_server != master_host->backend_server))) (master_host != NULL && (b->backend_server != master_host->backend_server)))
{ {
@ -3048,6 +3038,7 @@ static bool handle_error_new_connection(
SESSION* ses; SESSION* ses;
int router_nservers; int router_nservers;
int max_nslaves; int max_nslaves;
int max_slave_rlag;
backend_ref_t* bref; backend_ref_t* bref;
bool succp; bool succp;
@ -3098,6 +3089,7 @@ static bool handle_error_new_connection(
router_nservers = router_get_servercount(inst); router_nservers = router_get_servercount(inst);
max_nslaves = rses_get_max_slavecount(rses, router_nservers); max_nslaves = rses_get_max_slavecount(rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(rses);
/** /**
* Try to get replacement slave or at least the minimum * Try to get replacement slave or at least the minimum
* number of slave connections for router session. * number of slave connections for router session.
@ -3107,6 +3099,7 @@ static bool handle_error_new_connection(
rses->rses_backend_ref, rses->rses_backend_ref,
router_nservers, router_nservers,
max_nslaves, max_nslaves,
max_slave_rlag,
rses->rses_config.rw_slave_select_criteria, rses->rses_config.rw_slave_select_criteria,
ses, ses,
inst); inst);
@ -3181,6 +3174,71 @@ static int router_get_servercount(
return router_nservers; return router_nservers;
} }
static bool have_enough_servers(
ROUTER_CLIENT_SES** p_rses,
const int min_nsrv,
int router_nsrv,
ROUTER_INSTANCE* router)
{
bool succp;
/** With too few servers session is not created */
if (router_nsrv < min_nsrv ||
MAX((*p_rses)->rses_config.rw_max_slave_conn_count,
(router_nsrv*(*p_rses)->rses_config.rw_max_slave_conn_percent)/100)
< min_nsrv)
{
if (router_nsrv < min_nsrv)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers available. Found %d "
"when %d is required.",
router->service->name,
router_nsrv,
min_nsrv)));
}
else
{
double pct = (*p_rses)->rses_config.rw_max_slave_conn_percent/100;
double nservers = (double)router_nsrv*pct;
if ((*p_rses)->rses_config.rw_max_slave_conn_count < min_nsrv)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d when %d is required.",
router->service->name,
(*p_rses)->rses_config.rw_max_slave_conn_count,
min_nsrv)));
}
if (nservers < min_nsrv)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to start %s service. There are "
"too few backend servers configured in "
"MaxScale.cnf. Found %d%% when at least %.0f%% "
"would be required.",
router->service->name,
(*p_rses)->rses_config.rw_max_slave_conn_percent,
min_nsrv/(((double)router_nsrv)/100))));
}
}
free(*p_rses);
*p_rses = NULL;
succp = false;
}
else
{
succp = true;
}
return succp;
}
/** /**
* Find out the number of read backend servers. * Find out the number of read backend servers.
* Depending on the configuration value type, either copy direct count * Depending on the configuration value type, either copy direct count
@ -3209,6 +3267,28 @@ static int rses_get_max_slavecount(
return max_nslaves; return max_nslaves;
} }
static int rses_get_max_replication_lag(
ROUTER_CLIENT_SES* rses)
{
int conf_max_rlag;
CHK_CLIENT_RSES(rses);
/** if there is no configured value, then longest possible int is used */
if (rses->rses_config.rw_max_slave_replication_lag > 0)
{
conf_max_rlag = rses->rses_config.rw_max_slave_replication_lag;
}
else
{
conf_max_rlag = ~(1<<31);
}
return conf_max_rlag;
}
static backend_ref_t* get_bref_from_dcb( static backend_ref_t* get_bref_from_dcb(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
DCB* dcb) DCB* dcb)