Added new parameter RW Split Router.max_slave_connections=[<int>|<int>%] which specifies the maximum number of slaves which read/write split router connects in each routing session.
Parameter it read from config file to CONFIG_CONTEXT's parameter list. It is qualified in service.c:service_set_slave_conn_limit and if qualified, the qualified integer value and the value type are copied to the CONFIG_PARAMETER structure. This CONFIG_PARAMETER struct is cloned (=copied to different memory area) and linked to RW Split SERVICE struct. When RW Split router_instance is created in readwritesplit.c:createInstance, the value is copied to (new) rwsplit_config_t structure from SERVICE's parameter list. When new routing session is created in readwritesplit.c:newSession, the rwsplit_config_t structure is copied to ROUTER_CLIENT_SES struct and the actual max_nslaves value is calculated from the config value (if percentage is used). Tests and many error handling branches are missing but functionality seems to be working.
This commit is contained in:
parent
04313caf82
commit
28bc3509cc
@ -195,6 +195,8 @@ int error_count = 0;
|
||||
"router");
|
||||
if (router)
|
||||
{
|
||||
char* max_slave_conn_str;
|
||||
|
||||
obj->element = service_alloc(obj->object, router);
|
||||
char *user =
|
||||
config_get_value(obj->parameters, "user");
|
||||
@ -203,6 +205,10 @@ int error_count = 0;
|
||||
char *enable_root_user =
|
||||
config_get_value(obj->parameters, "enable_root_user");
|
||||
|
||||
max_slave_conn_str =
|
||||
config_get_value(obj->parameters,
|
||||
"max_slave_connections");
|
||||
|
||||
if (enable_root_user)
|
||||
serviceEnableRootUser(obj->element, atoi(enable_root_user));
|
||||
|
||||
@ -222,6 +228,35 @@ int error_count = 0;
|
||||
"corresponding password.",
|
||||
obj->object)));
|
||||
}
|
||||
if (max_slave_conn_str != NULL)
|
||||
{
|
||||
CONFIG_PARAMETER* param;
|
||||
bool succp;
|
||||
|
||||
param = config_get_param(obj->parameters,
|
||||
"max_slave_connections");
|
||||
|
||||
succp = service_set_slave_conn_limit(
|
||||
obj->element,
|
||||
param,
|
||||
max_slave_conn_str,
|
||||
COUNT_ATMOST);
|
||||
|
||||
if (!succp)
|
||||
{
|
||||
LOGIF(LM, (skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"* Warning : invalid value type "
|
||||
"for parameter \'%s.%s = %s\'\n\tExpected "
|
||||
"type is either <int> for slave connection "
|
||||
"count or\n\t<int>%% for specifying the "
|
||||
"maximum percentage of available the "
|
||||
"slaves that will be connected.",
|
||||
((SERVICE*)obj->element)->name,
|
||||
param->name,
|
||||
param->value)));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -515,6 +550,89 @@ config_get_value(CONFIG_PARAMETER *params, const char *name)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
CONFIG_PARAMETER* config_get_param(
|
||||
CONFIG_PARAMETER* params,
|
||||
const char* name)
|
||||
{
|
||||
while (params)
|
||||
{
|
||||
if (!strcmp(params->name, name))
|
||||
return params;
|
||||
params = params->next;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
config_param_type_t config_get_paramtype(
|
||||
CONFIG_PARAMETER* param)
|
||||
{
|
||||
return param->qfd_param_type;
|
||||
}
|
||||
|
||||
int config_get_valint(
|
||||
CONFIG_PARAMETER* param,
|
||||
const char* name, /*< if NULL examine current param only */
|
||||
config_param_type_t ptype)
|
||||
{
|
||||
int val = -1; /*< -1 indicates failure */
|
||||
|
||||
while (param)
|
||||
{
|
||||
if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN))
|
||||
{
|
||||
switch (ptype) {
|
||||
case COUNT_TYPE:
|
||||
val = param->qfd.valcount;
|
||||
goto return_val;
|
||||
|
||||
case PERCENT_TYPE:
|
||||
val = param->qfd.valpercent;
|
||||
goto return_val;
|
||||
|
||||
case BOOL_TYPE:
|
||||
val = param->qfd.valbool;
|
||||
goto return_val;
|
||||
|
||||
default:
|
||||
goto return_val;
|
||||
}
|
||||
}
|
||||
else if (name == NULL)
|
||||
{
|
||||
goto return_val;
|
||||
}
|
||||
param = param->next;
|
||||
}
|
||||
return_val:
|
||||
return val;
|
||||
}
|
||||
|
||||
|
||||
CONFIG_PARAMETER* config_clone_param(
|
||||
CONFIG_PARAMETER* param)
|
||||
{
|
||||
CONFIG_PARAMETER* p2;
|
||||
|
||||
p2 = (CONFIG_PARAMETER*) malloc(sizeof(CONFIG_PARAMETER));
|
||||
|
||||
if (p2 == NULL)
|
||||
{
|
||||
goto return_p2;
|
||||
}
|
||||
memcpy(p2, param, sizeof(CONFIG_PARAMETER));
|
||||
p2->name = strndup(param->name, MAX_PARAM_LEN);
|
||||
p2->value = strndup(param->value, MAX_PARAM_LEN);
|
||||
|
||||
if (param->qfd_param_type == STRING_TYPE)
|
||||
{
|
||||
p2->qfd.valstr = strndup(param->qfd.valstr, MAX_PARAM_LEN);
|
||||
}
|
||||
|
||||
return_p2:
|
||||
return p2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Free a config tree
|
||||
*
|
||||
@ -861,6 +979,7 @@ static char *service_params[] =
|
||||
"user",
|
||||
"passwd",
|
||||
"enable_root_user",
|
||||
"max_slave_connections",
|
||||
NULL
|
||||
};
|
||||
|
||||
@ -950,3 +1069,47 @@ int i;
|
||||
obj = obj->next;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set qualified parameter value to CONFIG_PARAMETER struct.
|
||||
*/
|
||||
bool config_set_qualified_param(
|
||||
CONFIG_PARAMETER* param,
|
||||
void* val,
|
||||
config_param_type_t type)
|
||||
{
|
||||
bool succp;
|
||||
|
||||
switch (type) {
|
||||
case STRING_TYPE:
|
||||
param->qfd.valstr = strndup((const char *)val, MAX_PARAM_LEN);
|
||||
succp = true;
|
||||
break;
|
||||
|
||||
case COUNT_TYPE:
|
||||
param->qfd.valcount = *(int *)val;
|
||||
succp = true;
|
||||
break;
|
||||
|
||||
case PERCENT_TYPE:
|
||||
param->qfd.valpercent = *(int *)val;
|
||||
succp = true;
|
||||
break;
|
||||
|
||||
case BOOL_TYPE:
|
||||
param->qfd.valbool = *(bool *)val;
|
||||
succp = true;
|
||||
break;
|
||||
|
||||
default:
|
||||
succp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (succp)
|
||||
{
|
||||
param->qfd_param_type = type;
|
||||
}
|
||||
return succp;
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,8 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#include <errno.h>
|
||||
#include <session.h>
|
||||
#include <service.h>
|
||||
#include <server.h>
|
||||
@ -52,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask;
|
||||
static SPINLOCK service_spin = SPINLOCK_INIT;
|
||||
static SERVICE *allServices = NULL;
|
||||
|
||||
static void service_add_qualified_param(
|
||||
SERVICE* svc,
|
||||
CONFIG_PARAMETER* param);
|
||||
|
||||
|
||||
/**
|
||||
* Allocate a new service for the gateway to support
|
||||
*
|
||||
@ -752,3 +759,95 @@ int service_refresh_users(SERVICE *service) {
|
||||
else
|
||||
return 1;
|
||||
}
|
||||
|
||||
bool service_set_slave_conn_limit (
|
||||
SERVICE* service,
|
||||
CONFIG_PARAMETER* param,
|
||||
char* valstr,
|
||||
count_spec_t count_spec)
|
||||
{
|
||||
char* p;
|
||||
int valint;
|
||||
bool percent = false;
|
||||
bool succp;
|
||||
|
||||
/**
|
||||
* Find out whether the value is numeric and ends with '%' or '\0'
|
||||
*/
|
||||
p = valstr;
|
||||
|
||||
while(isdigit(*p)) p++;
|
||||
|
||||
errno = 0;
|
||||
|
||||
if (p == valstr || (*p != '%' && *p != '\0'))
|
||||
{
|
||||
succp = false;
|
||||
}
|
||||
else if (*p == '%')
|
||||
{
|
||||
if (*(p+1) == '\0')
|
||||
{
|
||||
*p = '\0';
|
||||
valint = (int) strtol(valstr, (char **)NULL, 10);
|
||||
|
||||
if (valint == 0 && errno != 0)
|
||||
{
|
||||
succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = true;
|
||||
config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = false;
|
||||
}
|
||||
}
|
||||
else if (*p == '\0')
|
||||
{
|
||||
valint = (int) strtol(valstr, (char **)NULL, 10);
|
||||
|
||||
if (valint == 0 && errno != 0)
|
||||
{
|
||||
succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = true;
|
||||
config_set_qualified_param(param, (void *)&valint, COUNT_TYPE);
|
||||
}
|
||||
}
|
||||
|
||||
if (succp)
|
||||
{
|
||||
service_add_qualified_param(service, param); /*< add param to svc */
|
||||
}
|
||||
return succp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add qualified config parameter to SERVICE struct.
|
||||
*/
|
||||
static void service_add_qualified_param(
|
||||
SERVICE* svc,
|
||||
CONFIG_PARAMETER* param)
|
||||
{
|
||||
CONFIG_PARAMETER** p = &svc->svc_config_param;
|
||||
|
||||
spinlock_acquire(&svc->spin);
|
||||
|
||||
if ((*p) != NULL)
|
||||
{
|
||||
while ((*p)->next != NULL) *p = (*p)->next;
|
||||
(*p)->next = config_clone_param(param);
|
||||
}
|
||||
else
|
||||
{
|
||||
(*p) = config_clone_param(param);
|
||||
}
|
||||
(*p)->next = NULL;
|
||||
spinlock_release(&svc->spin);
|
||||
}
|
@ -17,6 +17,7 @@
|
||||
*
|
||||
* Copyright SkySQL Ab 2013
|
||||
*/
|
||||
#include <skygw_utils.h>
|
||||
|
||||
/**
|
||||
* @file config.h The configuration handling elements
|
||||
@ -30,12 +31,32 @@
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
/**
|
||||
* Maximum length for configuration parameter value.
|
||||
*/
|
||||
enum {MAX_PARAM_LEN=256};
|
||||
|
||||
typedef enum {
|
||||
UNDEFINED_TYPE=0,
|
||||
STRING_TYPE,
|
||||
COUNT_TYPE,
|
||||
PERCENT_TYPE,
|
||||
BOOL_TYPE
|
||||
} config_param_type_t;
|
||||
|
||||
/**
|
||||
* The config parameter
|
||||
*/
|
||||
typedef struct config_parameter {
|
||||
char *name; /**< The name of the parameter */
|
||||
char *value; /**< The value of the parameter */
|
||||
char *value; /**< The value of the parameter */
|
||||
union { /*< qualified parameter value by type */
|
||||
char* valstr; /*< terminated char* array */
|
||||
int valcount; /*< int */
|
||||
int valpercent; /*< int */
|
||||
bool valbool; /*< bool */
|
||||
} qfd;
|
||||
config_param_type_t qfd_param_type;
|
||||
struct config_parameter *next; /**< Next pointer in the linked list */
|
||||
} CONFIG_PARAMETER;
|
||||
|
||||
@ -60,4 +81,18 @@ typedef struct {
|
||||
extern int config_load(char *);
|
||||
extern int config_reload();
|
||||
extern int config_threadcount();
|
||||
CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name);
|
||||
|
||||
bool config_set_qualified_param(
|
||||
CONFIG_PARAMETER* param,
|
||||
void* val,
|
||||
config_param_type_t type);
|
||||
|
||||
CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param);
|
||||
|
||||
int config_get_valint(
|
||||
CONFIG_PARAMETER* param,
|
||||
const char* name, /*< if NULL examine current param only */
|
||||
config_param_type_t ptype);
|
||||
|
||||
#endif
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include <spinlock.h>
|
||||
#include <dcb.h>
|
||||
#include <server.h>
|
||||
#include "config.h"
|
||||
|
||||
/**
|
||||
* @file service.h
|
||||
@ -114,6 +115,7 @@ typedef struct service {
|
||||
SERVICE_STATS stats; /**< The service statistics */
|
||||
struct users *users; /**< The user data for this service */
|
||||
int enable_root; /**< Allow root user access */
|
||||
CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */
|
||||
SPINLOCK
|
||||
users_table_spin; /**< The spinlock for users data refresh */
|
||||
SERVICE_REFRESH_RATE
|
||||
@ -121,6 +123,8 @@ typedef struct service {
|
||||
struct service *next; /**< The next service in the linked list */
|
||||
} SERVICE;
|
||||
|
||||
typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t;
|
||||
|
||||
#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */
|
||||
#define SERVICE_STATE_STARTED 2 /**< The service has been started */
|
||||
|
||||
@ -146,4 +150,11 @@ extern int service_refresh_users(SERVICE *);
|
||||
extern void printService(SERVICE *);
|
||||
extern void printAllServices();
|
||||
extern void dprintAllServices(DCB *);
|
||||
|
||||
bool service_set_slave_conn_limit (
|
||||
SERVICE* service,
|
||||
CONFIG_PARAMETER* param,
|
||||
char* valstr,
|
||||
count_spec_t count_spec);
|
||||
|
||||
#endif
|
||||
|
@ -121,6 +121,12 @@ typedef struct backend {
|
||||
#endif
|
||||
} BACKEND;
|
||||
|
||||
typedef struct rwsplit_config_st {
|
||||
int rw_max_slave_conn_percent;
|
||||
int rw_max_slave_conn_count;
|
||||
} rwsplit_config_t;
|
||||
|
||||
|
||||
/**
|
||||
* The client session structure used within this router.
|
||||
*/
|
||||
@ -135,6 +141,7 @@ struct router_client_session {
|
||||
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];
|
||||
BACKEND* rses_master; /*< Pointer to master */
|
||||
BACKEND** rses_backend; /*< All backends used by client session */
|
||||
rwsplit_config_t rses_config; /*< copied config info from router instance */
|
||||
int rses_nbackends;
|
||||
int rses_capabilities; /*< input type, for example */
|
||||
struct router_client_session* next;
|
||||
@ -164,6 +171,7 @@ typedef struct router_instance {
|
||||
SPINLOCK lock; /*< Lock for the instance data */
|
||||
BACKEND** servers; /*< Backend servers */
|
||||
BACKEND* master; /*< NULL or pointer */
|
||||
rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */
|
||||
unsigned int bitmask; /*< Bitmask to apply to server->status */
|
||||
unsigned int bitvalue; /*< Required value of server->status */
|
||||
ROUTER_STATS stats; /*< Statistics for this router */
|
||||
|
@ -252,10 +252,12 @@ int i, n;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Warning : Unsupported router "
|
||||
"option %s for readconnroute.",
|
||||
LOGIF(LM, (skygw_log_write(
|
||||
LOGFILE_MESSAGE,
|
||||
"* Warning : Unsupported router "
|
||||
"option \'%s\' for readconnroute. "
|
||||
"Expected router options are "
|
||||
"[slave|master|synced]",
|
||||
options[i])));
|
||||
}
|
||||
}
|
||||
|
@ -154,11 +154,7 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
GWBUF* replybuf,
|
||||
sescmd_cursor_t* scur);
|
||||
|
||||
static bool cont_exec_sescmd_in_backend(
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
backend_type_t be_type);
|
||||
|
||||
#if !defined(MAX95)
|
||||
#if 0 /*< disabled for now due multiple slaves changes */
|
||||
static void tracelog_routed_query(
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
char* funcname,
|
||||
@ -227,10 +223,12 @@ static ROUTER* createInstance(
|
||||
SERVICE* service,
|
||||
char** options)
|
||||
{
|
||||
ROUTER_INSTANCE* router;
|
||||
SERVER* server;
|
||||
int nservers;
|
||||
int i;
|
||||
ROUTER_INSTANCE* router;
|
||||
SERVER* server;
|
||||
int nservers;
|
||||
int i;
|
||||
CONFIG_PARAMETER* param;
|
||||
config_param_type_t paramtype;
|
||||
|
||||
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||
return NULL;
|
||||
@ -327,6 +325,27 @@ static ROUTER* createInstance(
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Copy config parameter value from service struct. This becomes the
|
||||
* default value for every new rwsplit router session.
|
||||
*/
|
||||
param = config_get_param(service->svc_config_param, "max_slave_connections");
|
||||
|
||||
if (param != NULL)
|
||||
{
|
||||
paramtype = config_get_paramtype(param);
|
||||
|
||||
if (paramtype == COUNT_TYPE)
|
||||
{
|
||||
router->rwsplit_config.rw_max_slave_conn_count =
|
||||
config_get_valint(param, NULL, paramtype);
|
||||
}
|
||||
else if (paramtype == PERCENT_TYPE)
|
||||
{
|
||||
router->rwsplit_config.rw_max_slave_conn_percent =
|
||||
config_get_valint(param, NULL, paramtype);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* We have completed the creation of the router data, so now
|
||||
* insert this router into the linked list of routers
|
||||
@ -350,7 +369,6 @@ static ROUTER* createInstance(
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
const int conf_max_nslaves = 2; /*< replaces configuration parameter until its developed */
|
||||
|
||||
static void* newSession(
|
||||
ROUTER* router_inst,
|
||||
@ -363,7 +381,8 @@ static void* newSession(
|
||||
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
|
||||
bool succp;
|
||||
int router_nservers = 0; /*< # of servers in total */
|
||||
int max_nslaves; /*< max # of slaves used in this session */
|
||||
int max_nslaves; /*< max # of slaves used in this session */
|
||||
int conf_max_nslaves; /*< value from configuration file */
|
||||
|
||||
b = router->servers;
|
||||
|
||||
@ -376,7 +395,6 @@ static void* newSession(
|
||||
/** log */
|
||||
goto return_rses;
|
||||
}
|
||||
max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves));
|
||||
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||
|
||||
if (client_rses == NULL)
|
||||
@ -384,6 +402,23 @@ static void* newSession(
|
||||
ss_dassert(false);
|
||||
goto return_rses;
|
||||
}
|
||||
/** Copy config struct from router instance */
|
||||
client_rses->rses_config = router->rwsplit_config;
|
||||
|
||||
/**
|
||||
* Either copy direct count of slave connections or calculate the count
|
||||
* from percentage value.
|
||||
*/
|
||||
if (client_rses->rses_config.rw_max_slave_conn_count > 0)
|
||||
{
|
||||
conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count;
|
||||
}
|
||||
else
|
||||
{
|
||||
conf_max_nslaves =
|
||||
(router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100;
|
||||
}
|
||||
max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves));
|
||||
pp_backend = (BACKEND **)calloc(1, (router_nservers)*sizeof(BACKEND *));
|
||||
|
||||
/**
|
||||
@ -1033,7 +1068,7 @@ static void clientReply(
|
||||
}
|
||||
be = router_cli_ses->rses_backend;
|
||||
|
||||
while (be !=NULL)
|
||||
while (*be !=NULL)
|
||||
{
|
||||
if ((*be)->be_dcb == backend_dcb)
|
||||
{
|
||||
@ -1169,7 +1204,7 @@ static bool select_connect_backend_servers(
|
||||
router->bitmask)));
|
||||
|
||||
if (SERVER_IS_RUNNING((*b)->backend_server) &&
|
||||
((*b)->backend_server->status & router->bitmask ==
|
||||
(((*b)->backend_server->status & router->bitmask) ==
|
||||
router->bitvalue))
|
||||
{
|
||||
if (slaves_found < max_nslaves &&
|
||||
@ -1871,8 +1906,6 @@ static bool route_session_write(
|
||||
skygw_query_type_t qtype)
|
||||
{
|
||||
bool succp;
|
||||
DCB* master_dcb;
|
||||
DCB* slave_dcb;
|
||||
rses_property_t* prop;
|
||||
BACKEND** b;
|
||||
|
||||
@ -1894,8 +1927,7 @@ static bool route_session_write(
|
||||
if (packet_type == COM_QUIT)
|
||||
{
|
||||
int rc;
|
||||
int rc2;
|
||||
|
||||
|
||||
succp = true;
|
||||
|
||||
while (*b != NULL)
|
||||
|
Loading…
x
Reference in New Issue
Block a user