Added consistency check for detect_replication_lag and max_slave_replication_lag and router_options=slave_select_criteria=LEAST_BEHIND_MASTER

Fixed a bug in service_add_qualified_param which didn't handle param list correctly.
This commit is contained in:
VilhoRaatikka 2014-07-01 16:54:16 +03:00
parent 3b6b33b7dd
commit 8ffca16368
4 changed files with 179 additions and 78 deletions

View File

@ -349,7 +349,7 @@ int error_count = 0;
param->name,
param->value)));
}
}
}
}
else
{
@ -721,7 +721,12 @@ int error_count = 0;
}
obj = obj->next;
}
} /*< while */
/** TODO: consistency check function */
/**
* error_count += consistency_checks();
*/
if (error_count)
{

View File

@ -1008,7 +1008,6 @@ bool service_set_param_value (
{
char* p;
int valint;
bool percent = false;
bool succp;
/**
@ -1082,37 +1081,58 @@ bool service_set_param_value (
static void service_add_qualified_param(
SERVICE* svc,
CONFIG_PARAMETER* param)
{
CONFIG_PARAMETER** p;
{
spinlock_acquire(&svc->spin);
p = &svc->svc_config_param;
if ((*p) != NULL)
if (svc->svc_config_param == NULL)
{
do
svc->svc_config_param = config_clone_param(param);
svc->svc_config_param->next = NULL;
}
else
{
CONFIG_PARAMETER* p = svc->svc_config_param;
CONFIG_PARAMETER* prev = NULL;
while (true)
{
/** If duplicate is found, latter remains */
CONFIG_PARAMETER* old;
/** Replace existing parameter in the list, free old */
if (strncasecmp(param->name,
(*p)->name,
p->name,
strlen(param->name)) == 0)
{
*p = config_clone_param(param);
{
old = p;
p = config_clone_param(param);
p->next = old->next;
if (prev != NULL)
{
prev->next = p;
}
else
{
svc->svc_config_param = p;
}
free(old);
break;
}
}
while ((*p)->next != NULL);
(*p)->next = config_clone_param(param);
}
else
{
(*p) = config_clone_param(param);
prev = p;
p = p->next;
/** Hit end of the list, add new parameter */
if (p == NULL)
{
p = config_clone_param(param);
prev->next = p;
p->next = NULL;
break;
}
}
}
/** Increment service's configuration version */
atomic_add(&svc->svc_config_version, 1);
(*p)->next = NULL;
spinlock_release(&svc->spin);
}

View File

@ -99,6 +99,13 @@ static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
#if defined(NOT_USED)
static bool router_option_configured(
ROUTER_INSTANCE* router,
const char* optionstr,
void* data);
#endif
#if defined(PREP_STMT_CACHING)
static prep_stmt_t* prep_stmt_init(prep_stmt_type_t type, void* id);
static void prep_stmt_done(prep_stmt_t* pstmt);
@ -120,7 +127,10 @@ int bref_cmp_current_load(
const void* bref1,
const void* bref2);
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
*/
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{
NULL,
@ -145,7 +155,7 @@ static bool get_dcb(
ROUTER_CLIENT_SES* rses,
backend_type_t btype);
static void rwsplit_process_options(
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
char** options);
@ -305,6 +315,9 @@ static void refreshInstance(
CONFIG_PARAMETER* param)
{
config_param_type_t paramtype;
bool rlag_enabled = false;
bool rlag_limited = false;
paramtype = config_get_paramtype(param);
@ -322,8 +335,17 @@ static void refreshInstance(
{
router->rwsplit_config.rw_max_slave_replication_lag =
config_get_valint(param, NULL, paramtype);
if (router->rwsplit_config.rw_max_slave_replication_lag > 0)
{
rlag_limited = true;
}
}
}
else if (strncmp(param->name, "detect_replication_lag", MAX_PARAM_LEN) == 0)
{
rlag_enabled = (bool)config_get_valint(param, NULL, paramtype);
}
}
else if (paramtype == PERCENT_TYPE)
{
if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0)
@ -333,9 +355,40 @@ static void refreshInstance(
config_get_valint(param, NULL, paramtype);
}
}
/**
* If replication lag detection is not enabled the measure can't be
* used in slave selection.
*/
if (!rlag_enabled)
{
if (rlag_limited)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Configuration Failed, max_slave_replication_lag "
"is set to %d,\n\t\t but detect_replication_lag "
"is not enabled. Replication lag will not be checked.",
router->rwsplit_config.rw_max_slave_replication_lag)));
}
if (router->rwsplit_config.rw_slave_select_criteria ==
LEAST_BEHIND_MASTER)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Configuration Failed, router option "
"\n\t\t slave_selection_criteria=LEAST_BEHIND_MASTER "
"is specified, but detect_replication_lag "
"is not enabled.\n\t\t "
"slave_selection_criteria=%s will be used instead.",
STRCRITERIA(DEFAULT_CRITERIA))));
router->rwsplit_config.rw_slave_select_criteria =
DEFAULT_CRITERIA;
}
}
}
/**
* Create an instance of read/write statement router within the MaxScale.
*
@ -419,9 +472,11 @@ static ROUTER* createInstance(
*/
router->bitmask = 0;
router->bitvalue = 0;
/** Call this before refreshInstance */
if (options)
{
rwsplit_process_options(router, options);
rwsplit_process_router_options(router, options);
}
/**
* Set default value for max_slave_connections and for slave selection
@ -443,7 +498,6 @@ static ROUTER* createInstance(
if (param != NULL)
{
refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
}
/**
* Read default value for slave replication lag upper limit and then
@ -455,8 +509,8 @@ static ROUTER* createInstance(
if (param != NULL)
{
refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
}
router->rwsplit_version = service->svc_config_version;
/**
* We have completed the creation of the router data, so now
* insert this router into the linked list of routers
@ -523,7 +577,8 @@ static void* newSession(
}
router->rwsplit_version = router->service->svc_config_version;
/** Read options */
rwsplit_process_options(router, router->service->routerOptions);
rwsplit_process_router_options(router, router->service->routerOptions);
}
/** Copy config struct from router instance */
client_rses->rses_config = router->rwsplit_config;
@ -1473,7 +1528,7 @@ lock_failed:
return;
}
/** Compare nunmber of connections from this router in backend servers */
int bref_cmp_router_conn(
const void* bref1,
const void* bref2)
@ -1485,6 +1540,7 @@ int bref_cmp_router_conn(
((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0));
}
/** Compare nunmber of global connections in backend servers */
int bref_cmp_global_conn(
const void* bref1,
const void* bref2)
@ -1497,13 +1553,19 @@ int bref_cmp_global_conn(
}
/** Compare relication lag between backend servers */
int bref_cmp_behind_master(
const void* bref1,
const void* bref2)
{
return 1;
BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
return ((b1->backend_server->rlag < b2->backend_server->rlag) ? -1 :
((b1->backend_server->rlag > b2->backend_server->rlag) ? 1 : 0));
}
/** Compare nunmber of current operations in backend servers */
int bref_cmp_current_load(
const void* bref1,
const void* bref2)
@ -1515,7 +1577,6 @@ int bref_cmp_current_load(
((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 1 : 0));
}
static void bref_clear_state(
backend_ref_t* bref,
bref_state_t state)
@ -1536,15 +1597,7 @@ static void bref_clear_state(
/** Decrease global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d after in %s:%d",
prev1-1,
prev2-1,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
ss_dassert(prev2 > 0);
}
}
@ -1568,24 +1621,8 @@ static void bref_set_state(
/** Increase global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d before in %s:%d",
prev1,
prev2,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
ss_dassert(prev2 >= 0);
}
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [bref_set_state] Set state %d for %s:%d fd %d",
pthread_self(),
bref->bref_state,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port,
bref->bref_dcb->fd)));
}
/**
@ -1732,7 +1769,9 @@ static bool select_connect_backend_servers(
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
if (select_criteria == LEAST_GLOBAL_CONNECTIONS ||
select_criteria == LEAST_ROUTER_CONNECTIONS)
select_criteria == LEAST_ROUTER_CONNECTIONS ||
select_criteria == LEAST_BEHIND_MASTER ||
select_criteria == LEAST_CURRENT_OPERATIONS)
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Servers and %s connection counts:",
@ -1746,7 +1785,7 @@ static bool select_connect_backend_servers(
switch(select_criteria) {
case LEAST_GLOBAL_CONNECTIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
"%s:%d MaxScale connections : %d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->stats.n_current)));
@ -1754,7 +1793,7 @@ static bool select_connect_backend_servers(
case LEAST_ROUTER_CONNECTIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
"%s:%d RWSplit connections : %d",
b->backend_server->name,
b->backend_server->port,
b->backend_conn_count)));
@ -1762,11 +1801,18 @@ static bool select_connect_backend_servers(
case LEAST_CURRENT_OPERATIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
"%s:%d current operations : %d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->stats.n_current_ops)));
break;
case LEAST_BEHIND_MASTER:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s:%d replication lag : %d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->rlag)));
default:
break;
}
@ -1782,15 +1828,6 @@ static bool select_connect_backend_servers(
i++)
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Examine server "
"%s:%d %s with %d active operations.",
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server),
b->backend_server->stats.n_current_ops)));
if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) ==
@ -2885,7 +2922,44 @@ return_succp:
return succp;
}
static void rwsplit_process_options(
#if defined(NOT_USED)
static bool router_option_configured(
ROUTER_INSTANCE* router,
const char* optionstr,
void* data)
{
bool succp = false;
char** option;
option = router->service->routerOptions;
while (option != NULL)
{
char* value;
if ((value = strchr(options[i], '=')) == NULL)
{
break;
}
else
{
*value = 0;
value++;
if (strcmp(options[i], "slave_selection_criteria") == 0)
{
if (GET_SELECT_CRITERIA(value) == (select_criteria_t *)*data)
{
succp = true;
break;
}
}
}
}
return succp;
}
#endif
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router,
char** options)
{
@ -2922,9 +2996,10 @@ static void rwsplit_process_options(
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unknown "
"slave selection criteria \"%s\". "
"Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", "
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
"LEAST_ROUTER_CONNECTIONS, "
"and \"LEAST_CURRENT_OPERATIONS\".",
"LEAST_BEHIND_MASTER,"
"and LEAST_CURRENT_OPERATIONS.",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
}
else

View File

@ -228,7 +228,8 @@ typedef enum skygw_chk_t {
#define STRCRITERIA(c) ((c) == UNDEFINED_CRITERIA ? "UNDEFINED_CRITERIA" : \
((c) == LEAST_GLOBAL_CONNECTIONS ? "LEAST_GLOBAL_CONNECTIONS" : \
((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \
((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : "Unknown criteria"))))
((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : \
((c) == LEAST_CURRENT_OPERATIONS ? "LEAST_CURRENT_OPERATIONS" : "Unknown criteria")))))
#define STRSRVSTATUS(s) ((SERVER_IS_RUNNING(s) && SERVER_IS_MASTER(s)) ? "RUNNING MASTER" : \
((SERVER_IS_RUNNING(s) && SERVER_IS_SLAVE(s)) ? "RUNNING SLAVE" : \