diff --git a/server/core/config.c b/server/core/config.c index d964ba5fa..00fd947ce 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -195,6 +195,8 @@ int error_count = 0; "router"); if (router) { + char* max_slave_conn_str; + obj->element = service_alloc(obj->object, router); char *user = config_get_value(obj->parameters, "user"); @@ -203,6 +205,10 @@ int error_count = 0; char *enable_root_user = config_get_value(obj->parameters, "enable_root_user"); + max_slave_conn_str = + config_get_value(obj->parameters, + "max_slave_connections"); + if (enable_root_user) serviceEnableRootUser(obj->element, atoi(enable_root_user)); @@ -222,6 +228,35 @@ int error_count = 0; "corresponding password.", obj->object))); } + if (max_slave_conn_str != NULL) + { + CONFIG_PARAMETER* param; + bool succp; + + param = config_get_param(obj->parameters, + "max_slave_connections"); + + succp = service_set_slave_conn_limit( + obj->element, + param, + max_slave_conn_str, + COUNT_ATMOST); + + if (!succp) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is either for slave connection " + "count or\n\t%% for specifying the " + "maximum percentage of available the " + "slaves that will be connected.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + } + } } else { @@ -515,6 +550,89 @@ config_get_value(CONFIG_PARAMETER *params, const char *name) return NULL; } + +CONFIG_PARAMETER* config_get_param( + CONFIG_PARAMETER* params, + const char* name) +{ + while (params) + { + if (!strcmp(params->name, name)) + return params; + params = params->next; + } + return NULL; +} + +config_param_type_t config_get_paramtype( + CONFIG_PARAMETER* param) +{ + return param->qfd_param_type; +} + +int config_get_valint( + CONFIG_PARAMETER* param, + const char* name, /*< if NULL examine current param only */ + config_param_type_t ptype) +{ + int val = -1; /*< -1 indicates failure */ + + while (param) + { + if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN)) + { + switch (ptype) { + case COUNT_TYPE: + val = param->qfd.valcount; + goto return_val; + + case PERCENT_TYPE: + val = param->qfd.valpercent; + goto return_val; + + case BOOL_TYPE: + val = param->qfd.valbool; + goto return_val; + + default: + goto return_val; + } + } + else if (name == NULL) + { + goto return_val; + } + param = param->next; + } +return_val: + return val; +} + + +CONFIG_PARAMETER* config_clone_param( + CONFIG_PARAMETER* param) +{ + CONFIG_PARAMETER* p2; + + p2 = (CONFIG_PARAMETER*) malloc(sizeof(CONFIG_PARAMETER)); + + if (p2 == NULL) + { + goto return_p2; + } + memcpy(p2, param, sizeof(CONFIG_PARAMETER)); + p2->name = strndup(param->name, MAX_PARAM_LEN); + p2->value = strndup(param->value, MAX_PARAM_LEN); + + if (param->qfd_param_type == STRING_TYPE) + { + p2->qfd.valstr = strndup(param->qfd.valstr, MAX_PARAM_LEN); + } + +return_p2: + return p2; +} + /** * Free a config tree * @@ -861,6 +979,7 @@ static char *service_params[] = "user", "passwd", "enable_root_user", + "max_slave_connections", NULL }; @@ -950,3 +1069,47 @@ int i; obj = obj->next; } } + +/** + * Set qualified parameter value to CONFIG_PARAMETER struct. + */ +bool config_set_qualified_param( + CONFIG_PARAMETER* param, + void* val, + config_param_type_t type) +{ + bool succp; + + switch (type) { + case STRING_TYPE: + param->qfd.valstr = strndup((const char *)val, MAX_PARAM_LEN); + succp = true; + break; + + case COUNT_TYPE: + param->qfd.valcount = *(int *)val; + succp = true; + break; + + case PERCENT_TYPE: + param->qfd.valpercent = *(int *)val; + succp = true; + break; + + case BOOL_TYPE: + param->qfd.valbool = *(bool *)val; + succp = true; + break; + + default: + succp = false; + break; + } + + if (succp) + { + param->qfd_param_type = type; + } + return succp; +} + diff --git a/server/core/service.c b/server/core/service.c index db41f881a..16532a1fa 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include #include #include @@ -52,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask; static SPINLOCK service_spin = SPINLOCK_INIT; static SERVICE *allServices = NULL; +static void service_add_qualified_param( + SERVICE* svc, + CONFIG_PARAMETER* param); + + /** * Allocate a new service for the gateway to support * @@ -752,3 +759,95 @@ int service_refresh_users(SERVICE *service) { else return 1; } + +bool service_set_slave_conn_limit ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec) +{ + char* p; + int valint; + bool percent = false; + bool succp; + + /** + * Find out whether the value is numeric and ends with '%' or '\0' + */ + p = valstr; + + while(isdigit(*p)) p++; + + errno = 0; + + if (p == valstr || (*p != '%' && *p != '\0')) + { + succp = false; + } + else if (*p == '%') + { + if (*(p+1) == '\0') + { + *p = '\0'; + valint = (int) strtol(valstr, (char **)NULL, 10); + + if (valint == 0 && errno != 0) + { + succp = false; + } + else + { + succp = true; + config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE); + } + } + else + { + succp = false; + } + } + else if (*p == '\0') + { + valint = (int) strtol(valstr, (char **)NULL, 10); + + if (valint == 0 && errno != 0) + { + succp = false; + } + else + { + succp = true; + config_set_qualified_param(param, (void *)&valint, COUNT_TYPE); + } + } + + if (succp) + { + service_add_qualified_param(service, param); /*< add param to svc */ + } + return succp; +} + +/** + * Add qualified config parameter to SERVICE struct. + */ +static void service_add_qualified_param( + SERVICE* svc, + CONFIG_PARAMETER* param) +{ + CONFIG_PARAMETER** p = &svc->svc_config_param; + + spinlock_acquire(&svc->spin); + + if ((*p) != NULL) + { + while ((*p)->next != NULL) *p = (*p)->next; + (*p)->next = config_clone_param(param); + } + else + { + (*p) = config_clone_param(param); + } + (*p)->next = NULL; + spinlock_release(&svc->spin); +} \ No newline at end of file diff --git a/server/include/config.h b/server/include/config.h index 88620f015..af91bbf5a 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -17,6 +17,7 @@ * * Copyright SkySQL Ab 2013 */ +#include /** * @file config.h The configuration handling elements @@ -30,12 +31,32 @@ * @endverbatim */ +/** + * Maximum length for configuration parameter value. + */ +enum {MAX_PARAM_LEN=256}; + +typedef enum { + UNDEFINED_TYPE=0, + STRING_TYPE, + COUNT_TYPE, + PERCENT_TYPE, + BOOL_TYPE +} config_param_type_t; + /** * The config parameter */ typedef struct config_parameter { char *name; /**< The name of the parameter */ - char *value; /**< The value of the parameter */ + char *value; /**< The value of the parameter */ + union { /*< qualified parameter value by type */ + char* valstr; /*< terminated char* array */ + int valcount; /*< int */ + int valpercent; /*< int */ + bool valbool; /*< bool */ + } qfd; + config_param_type_t qfd_param_type; struct config_parameter *next; /**< Next pointer in the linked list */ } CONFIG_PARAMETER; @@ -60,4 +81,18 @@ typedef struct { extern int config_load(char *); extern int config_reload(); extern int config_threadcount(); +CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); + +bool config_set_qualified_param( + CONFIG_PARAMETER* param, + void* val, + config_param_type_t type); + +CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); + +int config_get_valint( + CONFIG_PARAMETER* param, + const char* name, /*< if NULL examine current param only */ + config_param_type_t ptype); + #endif diff --git a/server/include/service.h b/server/include/service.h index d52a7eccf..ea44c7f37 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -22,6 +22,7 @@ #include #include #include +#include "config.h" /** * @file service.h @@ -114,6 +115,7 @@ typedef struct service { SERVICE_STATS stats; /**< The service statistics */ struct users *users; /**< The user data for this service */ int enable_root; /**< Allow root user access */ + CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */ SPINLOCK users_table_spin; /**< The spinlock for users data refresh */ SERVICE_REFRESH_RATE @@ -121,6 +123,8 @@ typedef struct service { struct service *next; /**< The next service in the linked list */ } SERVICE; +typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t; + #define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */ #define SERVICE_STATE_STARTED 2 /**< The service has been started */ @@ -146,4 +150,11 @@ extern int service_refresh_users(SERVICE *); extern void printService(SERVICE *); extern void printAllServices(); extern void dprintAllServices(DCB *); + +bool service_set_slave_conn_limit ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec); + #endif diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index ab17b2c35..9a63ab6f0 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -121,6 +121,12 @@ typedef struct backend { #endif } BACKEND; +typedef struct rwsplit_config_st { + int rw_max_slave_conn_percent; + int rw_max_slave_conn_count; +} rwsplit_config_t; + + /** * The client session structure used within this router. */ @@ -135,6 +141,7 @@ struct router_client_session { rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; BACKEND* rses_master; /*< Pointer to master */ BACKEND** rses_backend; /*< All backends used by client session */ + rwsplit_config_t rses_config; /*< copied config info from router instance */ int rses_nbackends; int rses_capabilities; /*< input type, for example */ struct router_client_session* next; @@ -164,6 +171,7 @@ typedef struct router_instance { SPINLOCK lock; /*< Lock for the instance data */ BACKEND** servers; /*< Backend servers */ BACKEND* master; /*< NULL or pointer */ + rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */ unsigned int bitmask; /*< Bitmask to apply to server->status */ unsigned int bitvalue; /*< Required value of server->status */ ROUTER_STATS stats; /*< Statistics for this router */ diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index f587b2c63..2bfba7efc 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -252,10 +252,12 @@ int i, n; } else { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "Warning : Unsupported router " - "option %s for readconnroute.", + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Unsupported router " + "option \'%s\' for readconnroute. " + "Expected router options are " + "[slave|master|synced]", options[i]))); } } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index c177d6d62..f056da724 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -154,11 +154,7 @@ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, sescmd_cursor_t* scur); -static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - -#if !defined(MAX95) +#if 0 /*< disabled for now due multiple slaves changes */ static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, @@ -227,10 +223,12 @@ static ROUTER* createInstance( SERVICE* service, char** options) { - ROUTER_INSTANCE* router; - SERVER* server; - int nservers; - int i; + ROUTER_INSTANCE* router; + SERVER* server; + int nservers; + int i; + CONFIG_PARAMETER* param; + config_param_type_t paramtype; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -327,6 +325,27 @@ static ROUTER* createInstance( } } } + /** + * Copy config parameter value from service struct. This becomes the + * default value for every new rwsplit router session. + */ + param = config_get_param(service->svc_config_param, "max_slave_connections"); + + if (param != NULL) + { + paramtype = config_get_paramtype(param); + + if (paramtype == COUNT_TYPE) + { + router->rwsplit_config.rw_max_slave_conn_count = + config_get_valint(param, NULL, paramtype); + } + else if (paramtype == PERCENT_TYPE) + { + router->rwsplit_config.rw_max_slave_conn_percent = + config_get_valint(param, NULL, paramtype); + } + } /** * We have completed the creation of the router data, so now * insert this router into the linked list of routers @@ -350,7 +369,6 @@ static ROUTER* createInstance( * @param session The session itself * @return Session specific data for this session */ - const int conf_max_nslaves = 2; /*< replaces configuration parameter until its developed */ static void* newSession( ROUTER* router_inst, @@ -363,7 +381,8 @@ static void* newSession( ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; int router_nservers = 0; /*< # of servers in total */ - int max_nslaves; /*< max # of slaves used in this session */ + int max_nslaves; /*< max # of slaves used in this session */ + int conf_max_nslaves; /*< value from configuration file */ b = router->servers; @@ -376,7 +395,6 @@ static void* newSession( /** log */ goto return_rses; } - max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) @@ -384,6 +402,23 @@ static void* newSession( ss_dassert(false); goto return_rses; } + /** Copy config struct from router instance */ + client_rses->rses_config = router->rwsplit_config; + + /** + * Either copy direct count of slave connections or calculate the count + * from percentage value. + */ + if (client_rses->rses_config.rw_max_slave_conn_count > 0) + { + conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count; + } + else + { + conf_max_nslaves = + (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); pp_backend = (BACKEND **)calloc(1, (router_nservers)*sizeof(BACKEND *)); /** @@ -1033,7 +1068,7 @@ static void clientReply( } be = router_cli_ses->rses_backend; - while (be !=NULL) + while (*be !=NULL) { if ((*be)->be_dcb == backend_dcb) { @@ -1169,7 +1204,7 @@ static bool select_connect_backend_servers( router->bitmask))); if (SERVER_IS_RUNNING((*b)->backend_server) && - ((*b)->backend_server->status & router->bitmask == + (((*b)->backend_server->status & router->bitmask) == router->bitvalue)) { if (slaves_found < max_nslaves && @@ -1871,8 +1906,6 @@ static bool route_session_write( skygw_query_type_t qtype) { bool succp; - DCB* master_dcb; - DCB* slave_dcb; rses_property_t* prop; BACKEND** b; @@ -1894,8 +1927,7 @@ static bool route_session_write( if (packet_type == COM_QUIT) { int rc; - int rc2; - + succp = true; while (*b != NULL)