From b875936a2112d9175c63e27e631a261e2f5164a0 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 30 Jun 2014 22:35:27 +0300 Subject: [PATCH 1/2] MAX-94, Added configuration parameter, max_slave_replication_lag= to router section. Parameter can be changed runtime but it requires reloading of config. --- server/core/config.c | 108 ++++++++-- server/core/service.c | 23 ++- server/include/config.h | 12 +- server/include/service.h | 13 +- server/modules/include/readwritesplit.h | 2 + .../routing/readwritesplit/readwritesplit.c | 189 ++++++++++++------ 6 files changed, 257 insertions(+), 90 deletions(-) diff --git a/server/core/config.c b/server/core/config.c index efd72ea7d..a29b4dca4 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -222,6 +222,7 @@ int error_count = 0; if (router) { char* max_slave_conn_str; + char* max_slave_rlag_str; obj->element = service_alloc(obj->object, router); char *user = @@ -254,22 +255,30 @@ int error_count = 0; if (gateway.version_string) ((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string); } - max_slave_conn_str = config_get_value(obj->parameters, "max_slave_connections"); - + + max_slave_rlag_str = + config_get_value(obj->parameters, + "max_slave_replication_lag"); + if (enable_root_user) - serviceEnableRootUser(obj->element, config_truth_value(enable_root_user)); + serviceEnableRootUser( + obj->element, + config_truth_value(enable_root_user)); if (weightby) serviceWeightBy(obj->element, weightby); if (!auth) - auth = config_get_value(obj->parameters, "auth"); + auth = config_get_value(obj->parameters, + "auth"); if (obj->element && user && auth) { - serviceSetUser(obj->element, user, auth); + serviceSetUser(obj->element, + user, + auth); } else if (user && auth == NULL) { @@ -280,6 +289,7 @@ int error_count = 0; "corresponding password.", obj->object))); } + /** Read, validate and set max_slave_connections */ if (max_slave_conn_str != NULL) { CONFIG_PARAMETER* param; @@ -288,11 +298,12 @@ int error_count = 0; param = config_get_param(obj->parameters, "max_slave_connections"); - succp = service_set_slave_conn_limit( + succp = service_set_param_value( obj->element, param, max_slave_conn_str, - COUNT_ATMOST); + COUNT_ATMOST, + (COUNT_TYPE|PERCENT_TYPE)); if (!succp) { @@ -309,6 +320,36 @@ int error_count = 0; param->value))); } } + /** Read, validate and set max_slave_replication_lag */ + if (max_slave_rlag_str != NULL) + { + CONFIG_PARAMETER* param; + bool succp; + + param = config_get_param( + obj->parameters, + "max_slave_replication_lag"); + + succp = service_set_param_value( + obj->element, + param, + max_slave_rlag_str, + COUNT_ATMOST, + COUNT_TYPE); + + if (!succp) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is for maximum " + "slave replication lag.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + } + } } else { @@ -910,6 +951,7 @@ SERVER *server; char *auth; char *enable_root_user; char* max_slave_conn_str; + char* max_slave_rlag_str; char *version_string; enable_root_user = config_get_value(obj->parameters, "enable_root_user"); @@ -934,24 +976,27 @@ SERVER *server; auth); if (enable_root_user) serviceEnableRootUser(service, atoi(enable_root_user)); + + /** Read, validate and set max_slave_connections */ max_slave_conn_str = config_get_value( obj->parameters, "max_slave_connections"); - + if (max_slave_conn_str != NULL) { CONFIG_PARAMETER* param; bool succp; param = config_get_param(obj->parameters, - "max_slave_connections"); + "max_slave_connections"); - succp = service_set_slave_conn_limit( - service, - param, - max_slave_conn_str, - COUNT_ATMOST); + succp = service_set_param_value( + service, + param, + max_slave_conn_str, + COUNT_ATMOST, + (PERCENT_TYPE|COUNT_TYPE)); if (!succp) { @@ -968,8 +1013,40 @@ SERVER *server; param->value))); } } - + /** Read, validate and set max_slave_replication_lag */ + max_slave_rlag_str = + config_get_value(obj->parameters, + "max_slave_replication_lag"); + if (max_slave_rlag_str != NULL) + { + CONFIG_PARAMETER* param; + bool succp; + + param = config_get_param( + obj->parameters, + "max_slave_replication_lag"); + + succp = service_set_param_value( + service, + param, + max_slave_rlag_str, + COUNT_ATMOST, + COUNT_TYPE); + + if (!succp) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is for maximum " + "slave replication lag.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + } + } } obj->element = service; @@ -1204,6 +1281,7 @@ static char *service_params[] = "passwd", "enable_root_user", "max_slave_connections", + "max_slave_replication_lag", "version_string", "filters", NULL diff --git a/server/core/service.c b/server/core/service.c index 3e558c4f2..7da419835 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -999,11 +999,12 @@ int service_refresh_users(SERVICE *service) { return 1; } -bool service_set_slave_conn_limit ( - SERVICE* service, - CONFIG_PARAMETER* param, - char* valstr, - count_spec_t count_spec) +bool service_set_param_value ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec, + config_param_type_t type) { char* p; int valint; @@ -1034,11 +1035,15 @@ bool service_set_slave_conn_limit ( { succp = false; } - else + else if (PARAM_IS_TYPE(type,PERCENT_TYPE)) { succp = true; config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE); } + else + { + /** Log error */ + } } else { @@ -1053,11 +1058,15 @@ bool service_set_slave_conn_limit ( { succp = false; } - else + else if (PARAM_IS_TYPE(type,COUNT_TYPE)) { succp = true; config_set_qualified_param(param, (void *)&valint, COUNT_TYPE); } + else + { + /** Log error */ + } } if (succp) diff --git a/server/include/config.h b/server/include/config.h index dc94e3ad9..59cb096e8 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -39,13 +39,15 @@ enum {MAX_PARAM_LEN=256}; typedef enum { - UNDEFINED_TYPE=0, - STRING_TYPE, - COUNT_TYPE, - PERCENT_TYPE, - BOOL_TYPE + UNDEFINED_TYPE = 0x00, + STRING_TYPE = 0x01, + COUNT_TYPE = 0x02, + PERCENT_TYPE = 0x04, + BOOL_TYPE = 0x08 } config_param_type_t; +#define PARAM_IS_TYPE(p,t) ((p) & (t)) + /** * The config parameter */ diff --git a/server/include/service.h b/server/include/service.h index 43374e50d..cd13d411b 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -166,11 +166,14 @@ extern int service_refresh_users(SERVICE *); extern void printService(SERVICE *); extern void printAllServices(); extern void dprintAllServices(DCB *); -bool service_set_slave_conn_limit ( - SERVICE* service, - CONFIG_PARAMETER* param, - char* valstr, - count_spec_t count_spec); + +bool service_set_param_value ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec, + config_param_type_t type); + extern void dprintService(DCB *, SERVICE *); extern void dListServices(DCB *); extern void dListListeners(DCB *); diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 4db72ae5e..4af967d47 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -99,6 +99,7 @@ typedef enum select_criteria { /** default values for rwsplit configuration parameters */ #define CONFIG_MAX_SLAVE_CONN 1 +#define CONFIG_MAX_SLAVE_RLAG -1 /*< not used */ #define GET_SELECT_CRITERIA(s) \ (strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \ @@ -201,6 +202,7 @@ typedef struct rwsplit_config_st { int rw_max_slave_conn_percent; int rw_max_slave_conn_count; select_criteria_t rw_slave_select_criteria; + int rw_max_slave_replication_lag; } rwsplit_config_t; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 0bfd1b4f7..046aa59b4 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -94,6 +94,7 @@ static void handleError( static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static int router_get_servercount(ROUTER_INSTANCE* router); static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers); +static int rses_get_max_replication_lag(ROUTER_CLIENT_SES* rses); static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -128,6 +129,7 @@ static bool select_connect_backend_servers( backend_ref_t* backend_ref, int router_nservers, int max_nslaves, + int max_rlag, select_criteria_t select_criteria, SESSION* session, ROUTER_INSTANCE* router); @@ -244,9 +246,15 @@ static bool handle_error_new_connection( GWBUF* errmsg); static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg); -static BACKEND *get_root_master( - backend_ref_t *servers, - int router_nservers); +static BACKEND* get_root_master( + backend_ref_t* servers, + int router_nservers); + +static bool have_enough_servers( + ROUTER_CLIENT_SES** rses, + const int nsrv, + int router_nsrv, + ROUTER_INSTANCE* router); static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -319,7 +327,7 @@ static void refreshInstance( /** - * Create an instance of read/write statemtn router within the MaxScale. + * Create an instance of read/write statement router within the MaxScale. * * * @param service The service this router is being create for @@ -416,12 +424,23 @@ static ROUTER* createInstance( { router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA; } - - /** + /** * Copy all config parameters from service to router instance. * Finally, copy version number to indicate that configs match. */ - param = config_get_param(service->svc_config_param, "max_slave_connections"); + param = config_get_param(service->svc_config_param, "max_slave_connections"); + + if (param != NULL) + { + refreshInstance(router, param); + router->rwsplit_version = service->svc_config_version; + } + /** + * Read default value for slave replication lag upper limit and then + * configured value if it exists. + */ + router->rwsplit_config.rw_max_slave_replication_lag = CONFIG_MAX_SLAVE_RLAG; + param = config_get_param(service->svc_config_param, "max_slave_replication_lag"); if (param != NULL) { @@ -462,6 +481,7 @@ static void* newSession( bool succp; int router_nservers = 0; /*< # of servers in total */ int max_nslaves; /*< max # of slaves used in this session */ + int max_slave_rlag; /*< max allowed replication lag for any slave */ int i; const int min_nservers = 1; /*< hard-coded for now */ static uint64_t router_client_ses_seq; /*< ID for client session */ @@ -508,61 +528,17 @@ static void* newSession( router_nservers = router_get_servercount(router); - /** With too few servers session is not created */ - if (router_nservers < min_nservers || - MAX(client_rses->rses_config.rw_max_slave_conn_count, - (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100) - < min_nservers) + if (!have_enough_servers(&client_rses, + min_nservers, + router_nservers, + router)) { - if (router_nservers < min_nservers) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to start %s service. There are " - "too few backend servers available. Found %d " - "when %d is required.", - router->service->name, - router_nservers, - min_nservers))); - } - else - { - double pct = client_rses->rses_config.rw_max_slave_conn_percent/100; - double nservers = (double)router_nservers*pct; - - if (client_rses->rses_config.rw_max_slave_conn_count < - min_nservers) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to start %s service. There are " - "too few backend servers configured in " - "MaxScale.cnf. Found %d when %d is required.", - router->service->name, - client_rses->rses_config.rw_max_slave_conn_count, - min_nservers))); - } - if (nservers < min_nservers) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to start %s service. There are " - "too few backend servers configured in " - "MaxScale.cnf. Found %d%% when at least %.0f%% " - "would be required.", - router->service->name, - client_rses->rses_config.rw_max_slave_conn_percent, - min_nservers/(((double)router_nservers)/100)))); - } - } - free(client_rses); - client_rses = NULL; goto return_rses; } /** * Create backend reference objects for this session. */ - backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t)); + backend_ref = (backend_ref_t *)calloc(1, router_nservers*sizeof(backend_ref_t)); if (backend_ref == NULL) { @@ -593,8 +569,9 @@ static void* newSession( &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; } - max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); - + max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); + max_slave_rlag = rses_get_max_replication_lag(client_rses); + spinlock_init(&client_rses->rses_lock); client_rses->rses_backend_ref = backend_ref; @@ -608,6 +585,7 @@ static void* newSession( backend_ref, router_nservers, max_nslaves, + max_slave_rlag, client_rses->rses_config.rw_slave_select_criteria, session, router); @@ -1530,6 +1508,9 @@ static void bref_set_state( * @param max_nslaves - in, use * Upper limit for the number of slaves. Configuration parameter or default. * + * @param max_slave_rlag - in, use + * Maximum allowed replication lag for any slave. Configuration parameter or default. + * * @param session - in, use * MaxScale session pointer used when connection to backend is established. * @@ -1549,6 +1530,7 @@ static bool select_connect_backend_servers( backend_ref_t* backend_ref, int router_nservers, int max_nslaves, + int max_slave_rlag, select_criteria_t select_criteria, SESSION* session, ROUTER_INSTANCE* router) @@ -1713,6 +1695,7 @@ static bool select_connect_backend_servers( { /* check also for relay servers and don't take the master_host */ if (slaves_found < max_nslaves && + b->backend_server->rlag <= max_slave_rlag && (SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) && (master_host != NULL && (b->backend_server != master_host->backend_server))) { @@ -2966,6 +2949,7 @@ static bool handle_error_new_connection( SESSION* ses; int router_nservers; int max_nslaves; + int max_slave_rlag; backend_ref_t* bref; bool succp; @@ -3018,6 +3002,7 @@ static bool handle_error_new_connection( router_nservers = router_get_servercount(inst); max_nslaves = rses_get_max_slavecount(rses, router_nservers); + max_slave_rlag = rses_get_max_replication_lag(rses); /** * Try to get replacement slave or at least the minimum * number of slave connections for router session. @@ -3027,6 +3012,7 @@ static bool handle_error_new_connection( rses->rses_backend_ref, router_nservers, max_nslaves, + max_slave_rlag, rses->rses_config.rw_slave_select_criteria, ses, inst); @@ -3101,6 +3087,71 @@ static int router_get_servercount( return router_nservers; } +static bool have_enough_servers( + ROUTER_CLIENT_SES** p_rses, + const int min_nsrv, + int router_nsrv, + ROUTER_INSTANCE* router) +{ + bool succp; + + /** With too few servers session is not created */ + if (router_nsrv < min_nsrv || + MAX((*p_rses)->rses_config.rw_max_slave_conn_count, + (router_nsrv*(*p_rses)->rses_config.rw_max_slave_conn_percent)/100) + < min_nsrv) + { + if (router_nsrv < min_nsrv) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers available. Found %d " + "when %d is required.", + router->service->name, + router_nsrv, + min_nsrv))); + } + else + { + double pct = (*p_rses)->rses_config.rw_max_slave_conn_percent/100; + double nservers = (double)router_nsrv*pct; + + if ((*p_rses)->rses_config.rw_max_slave_conn_count < min_nsrv) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers configured in " + "MaxScale.cnf. Found %d when %d is required.", + router->service->name, + (*p_rses)->rses_config.rw_max_slave_conn_count, + min_nsrv))); + } + if (nservers < min_nsrv) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers configured in " + "MaxScale.cnf. Found %d%% when at least %.0f%% " + "would be required.", + router->service->name, + (*p_rses)->rses_config.rw_max_slave_conn_percent, + min_nsrv/(((double)router_nsrv)/100)))); + } + } + free(*p_rses); + *p_rses = NULL; + succp = false; + } + else + { + succp = true; + } + return succp; +} + /** * Find out the number of read backend servers. * Depending on the configuration value type, either copy direct count @@ -3129,6 +3180,28 @@ static int rses_get_max_slavecount( return max_nslaves; } + +static int rses_get_max_replication_lag( + ROUTER_CLIENT_SES* rses) +{ + int conf_max_rlag; + + CHK_CLIENT_RSES(rses); + + /** if there is no configured value, then longest possible int is used */ + if (rses->rses_config.rw_max_slave_replication_lag > 0) + { + conf_max_rlag = rses->rses_config.rw_max_slave_replication_lag; + } + else + { + conf_max_rlag = ~(1<<31); + } + + return conf_max_rlag; +} + + static backend_ref_t* get_bref_from_dcb( ROUTER_CLIENT_SES* rses, DCB* dcb) From 3b6b33b7dd143ff79faa6ad44f59d93231a277da Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 30 Jun 2014 22:50:31 +0300 Subject: [PATCH 2/2] Modified refreshInstance to support max_slave_replication_lag. --- server/modules/routing/readwritesplit/readwritesplit.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 7b6826c5b..f2d4debc5 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -316,6 +316,13 @@ static void refreshInstance( router->rwsplit_config.rw_max_slave_conn_count = config_get_valint(param, NULL, paramtype); } + else if (strncmp(param->name, + "max_slave_replication_lag", + MAX_PARAM_LEN) == 0) + { + router->rwsplit_config.rw_max_slave_replication_lag = + config_get_valint(param, NULL, paramtype); + } } else if (paramtype == PERCENT_TYPE) {