Merge pull request #10 from skysql/MAX-94

Max 94
This commit is contained in:
Vilho Raatikka
2014-07-01 17:16:48 +03:00
4 changed files with 180 additions and 78 deletions

View File

@ -721,7 +721,12 @@ int error_count = 0;
} }
obj = obj->next; obj = obj->next;
} } /*< while */
/** TODO: consistency check function */
/**
* error_count += consistency_checks();
*/
if (error_count) if (error_count)
{ {

View File

@ -1008,7 +1008,6 @@ bool service_set_param_value (
{ {
char* p; char* p;
int valint; int valint;
bool percent = false;
bool succp; bool succp;
/** /**
@ -1083,36 +1082,57 @@ static void service_add_qualified_param(
SERVICE* svc, SERVICE* svc,
CONFIG_PARAMETER* param) CONFIG_PARAMETER* param)
{ {
CONFIG_PARAMETER** p;
spinlock_acquire(&svc->spin); spinlock_acquire(&svc->spin);
p = &svc->svc_config_param; if (svc->svc_config_param == NULL)
if ((*p) != NULL)
{ {
do svc->svc_config_param = config_clone_param(param);
{ svc->svc_config_param->next = NULL;
/** If duplicate is found, latter remains */
if (strncasecmp(param->name,
(*p)->name,
strlen(param->name)) == 0)
{
*p = config_clone_param(param);
break;
}
}
while ((*p)->next != NULL);
(*p)->next = config_clone_param(param);
} }
else else
{ {
(*p) = config_clone_param(param); CONFIG_PARAMETER* p = svc->svc_config_param;
CONFIG_PARAMETER* prev = NULL;
while (true)
{
CONFIG_PARAMETER* old;
/** Replace existing parameter in the list, free old */
if (strncasecmp(param->name,
p->name,
strlen(param->name)) == 0)
{
old = p;
p = config_clone_param(param);
p->next = old->next;
if (prev != NULL)
{
prev->next = p;
}
else
{
svc->svc_config_param = p;
}
free(old);
break;
}
prev = p;
p = p->next;
/** Hit end of the list, add new parameter */
if (p == NULL)
{
p = config_clone_param(param);
prev->next = p;
p->next = NULL;
break;
}
}
} }
/** Increment service's configuration version */ /** Increment service's configuration version */
atomic_add(&svc->svc_config_version, 1); atomic_add(&svc->svc_config_version, 1);
(*p)->next = NULL;
spinlock_release(&svc->spin); spinlock_release(&svc->spin);
} }

View File

@ -99,6 +99,13 @@ static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
#if defined(NOT_USED)
static bool router_option_configured(
ROUTER_INSTANCE* router,
const char* optionstr,
void* data);
#endif
#if defined(PREP_STMT_CACHING) #if defined(PREP_STMT_CACHING)
static prep_stmt_t* prep_stmt_init(prep_stmt_type_t type, void* id); static prep_stmt_t* prep_stmt_init(prep_stmt_type_t type, void* id);
static void prep_stmt_done(prep_stmt_t* pstmt); static void prep_stmt_done(prep_stmt_t* pstmt);
@ -120,7 +127,10 @@ int bref_cmp_current_load(
const void* bref1, const void* bref1,
const void* bref2); const void* bref2);
/**
* The order of functions _must_ match with the order the select criteria are
* listed in select_criteria_t definition in readwritesplit.h
*/
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{ {
NULL, NULL,
@ -145,7 +155,7 @@ static bool get_dcb(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
backend_type_t btype); backend_type_t btype);
static void rwsplit_process_options( static void rwsplit_process_router_options(
ROUTER_INSTANCE* router, ROUTER_INSTANCE* router,
char** options); char** options);
@ -305,6 +315,9 @@ static void refreshInstance(
CONFIG_PARAMETER* param) CONFIG_PARAMETER* param)
{ {
config_param_type_t paramtype; config_param_type_t paramtype;
bool rlag_enabled = false;
bool rlag_limited = false;
paramtype = config_get_paramtype(param); paramtype = config_get_paramtype(param);
@ -322,6 +335,15 @@ static void refreshInstance(
{ {
router->rwsplit_config.rw_max_slave_replication_lag = router->rwsplit_config.rw_max_slave_replication_lag =
config_get_valint(param, NULL, paramtype); config_get_valint(param, NULL, paramtype);
if (router->rwsplit_config.rw_max_slave_replication_lag > 0)
{
rlag_limited = true;
}
}
else if (strncmp(param->name, "detect_replication_lag", MAX_PARAM_LEN) == 0)
{
rlag_enabled = (bool)config_get_valint(param, NULL, paramtype);
} }
} }
else if (paramtype == PERCENT_TYPE) else if (paramtype == PERCENT_TYPE)
@ -333,8 +355,39 @@ static void refreshInstance(
config_get_valint(param, NULL, paramtype); config_get_valint(param, NULL, paramtype);
} }
} }
/**
* If replication lag detection is not enabled the measure can't be
* used in slave selection.
*/
if (!rlag_enabled)
{
if (rlag_limited)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Configuration Failed, max_slave_replication_lag "
"is set to %d,\n\t\t but detect_replication_lag "
"is not enabled. Replication lag will not be checked.",
router->rwsplit_config.rw_max_slave_replication_lag)));
} }
if (router->rwsplit_config.rw_slave_select_criteria ==
LEAST_BEHIND_MASTER)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Configuration Failed, router option "
"\n\t\t slave_selection_criteria=LEAST_BEHIND_MASTER "
"is specified, but detect_replication_lag "
"is not enabled.\n\t\t "
"slave_selection_criteria=%s will be used instead.",
STRCRITERIA(DEFAULT_CRITERIA))));
router->rwsplit_config.rw_slave_select_criteria =
DEFAULT_CRITERIA;
}
}
}
/** /**
* Create an instance of read/write statement router within the MaxScale. * Create an instance of read/write statement router within the MaxScale.
@ -473,9 +526,11 @@ createInstance(SERVICE *service, char **options)
*/ */
router->bitmask = 0; router->bitmask = 0;
router->bitvalue = 0; router->bitvalue = 0;
/** Call this before refreshInstance */
if (options) if (options)
{ {
rwsplit_process_options(router, options); rwsplit_process_router_options(router, options);
} }
/** /**
* Set default value for max_slave_connections and for slave selection * Set default value for max_slave_connections and for slave selection
@ -497,7 +552,6 @@ createInstance(SERVICE *service, char **options)
if (param != NULL) if (param != NULL)
{ {
refreshInstance(router, param); refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
} }
/** /**
* Read default value for slave replication lag upper limit and then * Read default value for slave replication lag upper limit and then
@ -509,8 +563,8 @@ createInstance(SERVICE *service, char **options)
if (param != NULL) if (param != NULL)
{ {
refreshInstance(router, param); refreshInstance(router, param);
router->rwsplit_version = service->svc_config_version;
} }
router->rwsplit_version = service->svc_config_version;
/** /**
* We have completed the creation of the router data, so now * We have completed the creation of the router data, so now
* insert this router into the linked list of routers * insert this router into the linked list of routers
@ -577,7 +631,8 @@ static void* newSession(
} }
router->rwsplit_version = router->service->svc_config_version; router->rwsplit_version = router->service->svc_config_version;
/** Read options */ /** Read options */
rwsplit_process_options(router, router->service->routerOptions); rwsplit_process_router_options(router, router->service->routerOptions);
} }
/** Copy config struct from router instance */ /** Copy config struct from router instance */
client_rses->rses_config = router->rwsplit_config; client_rses->rses_config = router->rwsplit_config;
@ -1553,7 +1608,7 @@ lock_failed:
return; return;
} }
/** Compare nunmber of connections from this router in backend servers */
int bref_cmp_router_conn( int bref_cmp_router_conn(
const void* bref1, const void* bref1,
const void* bref2) const void* bref2)
@ -1565,6 +1620,7 @@ int bref_cmp_router_conn(
- ((1000 * b2->backend_conn_count) / b2->weight); - ((1000 * b2->backend_conn_count) / b2->weight);
} }
/** Compare nunmber of global connections in backend servers */
int bref_cmp_global_conn( int bref_cmp_global_conn(
const void* bref1, const void* bref1,
const void* bref2) const void* bref2)
@ -1577,13 +1633,19 @@ int bref_cmp_global_conn(
} }
/** Compare relication lag between backend servers */
int bref_cmp_behind_master( int bref_cmp_behind_master(
const void* bref1, const void* bref1,
const void* bref2) const void* bref2)
{ {
return 1; BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend;
return ((b1->backend_server->rlag < b2->backend_server->rlag) ? -1 :
((b1->backend_server->rlag > b2->backend_server->rlag) ? 1 : 0));
} }
/** Compare nunmber of current operations in backend servers */
int bref_cmp_current_load( int bref_cmp_current_load(
const void* bref1, const void* bref1,
const void* bref2) const void* bref2)
@ -1597,7 +1659,6 @@ int bref_cmp_current_load(
- ((1000 * s2->stats.n_current_ops) - b2->weight); - ((1000 * s2->stats.n_current_ops) - b2->weight);
} }
static void bref_clear_state( static void bref_clear_state(
backend_ref_t* bref, backend_ref_t* bref,
bref_state_t state) bref_state_t state)
@ -1619,14 +1680,6 @@ static void bref_clear_state(
prev2 = atomic_add( prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1); &bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0); ss_dassert(prev2 > 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d after in %s:%d",
prev1-1,
prev2-1,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
} }
} }
@ -1651,23 +1704,7 @@ static void bref_set_state(
prev2 = atomic_add( prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1); &bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0); ss_dassert(prev2 >= 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d before in %s:%d",
prev1,
prev2,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
} }
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [bref_set_state] Set state %d for %s:%d fd %d",
pthread_self(),
bref->bref_state,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port,
bref->bref_dcb->fd)));
} }
/** /**
@ -1814,7 +1851,9 @@ static bool select_connect_backend_servers(
if (LOG_IS_ENABLED(LOGFILE_TRACE)) if (LOG_IS_ENABLED(LOGFILE_TRACE))
{ {
if (select_criteria == LEAST_GLOBAL_CONNECTIONS || if (select_criteria == LEAST_GLOBAL_CONNECTIONS ||
select_criteria == LEAST_ROUTER_CONNECTIONS) select_criteria == LEAST_ROUTER_CONNECTIONS ||
select_criteria == LEAST_BEHIND_MASTER ||
select_criteria == LEAST_CURRENT_OPERATIONS)
{ {
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Servers and %s connection counts:", "Servers and %s connection counts:",
@ -1828,7 +1867,7 @@ static bool select_connect_backend_servers(
switch(select_criteria) { switch(select_criteria) {
case LEAST_GLOBAL_CONNECTIONS: case LEAST_GLOBAL_CONNECTIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d", "%s:%d MaxScale connections : %d",
b->backend_server->name, b->backend_server->name,
b->backend_server->port, b->backend_server->port,
b->backend_server->stats.n_current))); b->backend_server->stats.n_current)));
@ -1836,7 +1875,7 @@ static bool select_connect_backend_servers(
case LEAST_ROUTER_CONNECTIONS: case LEAST_ROUTER_CONNECTIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d", "%s:%d RWSplit connections : %d",
b->backend_server->name, b->backend_server->name,
b->backend_server->port, b->backend_server->port,
b->backend_conn_count))); b->backend_conn_count)));
@ -1844,11 +1883,18 @@ static bool select_connect_backend_servers(
case LEAST_CURRENT_OPERATIONS: case LEAST_CURRENT_OPERATIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d", "%s:%d current operations : %d",
b->backend_server->name, b->backend_server->name,
b->backend_server->port, b->backend_server->port,
b->backend_server->stats.n_current_ops))); b->backend_server->stats.n_current_ops)));
break; break;
case LEAST_BEHIND_MASTER:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s:%d replication lag : %d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->rlag)));
default: default:
break; break;
} }
@ -1865,21 +1911,13 @@ static bool select_connect_backend_servers(
{ {
BACKEND* b = backend_ref[i].bref_backend; BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Examine server "
"%s:%d %s with %d active operations.",
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server),
b->backend_server->stats.n_current_ops)));
if (SERVER_IS_RUNNING(b->backend_server) && if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) == ((b->backend_server->status & router->bitmask) ==
router->bitvalue)) router->bitvalue))
{ {
/* check also for relay servers and don't take the master_host */ /* check also for relay servers and don't take the master_host */
if (slaves_found < max_nslaves && if (slaves_found < max_nslaves &&
b->backend_server->rlag != -1 && /*< information currently not available */
b->backend_server->rlag <= max_slave_rlag && b->backend_server->rlag <= max_slave_rlag &&
(SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) && (SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) &&
(master_host != NULL && (b->backend_server != master_host->backend_server))) (master_host != NULL && (b->backend_server != master_host->backend_server)))
@ -2967,7 +3005,44 @@ return_succp:
return succp; return succp;
} }
static void rwsplit_process_options( #if defined(NOT_USED)
static bool router_option_configured(
ROUTER_INSTANCE* router,
const char* optionstr,
void* data)
{
bool succp = false;
char** option;
option = router->service->routerOptions;
while (option != NULL)
{
char* value;
if ((value = strchr(options[i], '=')) == NULL)
{
break;
}
else
{
*value = 0;
value++;
if (strcmp(options[i], "slave_selection_criteria") == 0)
{
if (GET_SELECT_CRITERIA(value) == (select_criteria_t *)*data)
{
succp = true;
break;
}
}
}
}
return succp;
}
#endif
static void rwsplit_process_router_options(
ROUTER_INSTANCE* router, ROUTER_INSTANCE* router,
char** options) char** options)
{ {
@ -3004,9 +3079,10 @@ static void rwsplit_process_options(
LOGIF(LE, (skygw_log_write( LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unknown " LOGFILE_ERROR, "Warning : Unknown "
"slave selection criteria \"%s\". " "slave selection criteria \"%s\". "
"Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", " "Allowed values are LEAST_GLOBAL_CONNECTIONS, "
"LEAST_ROUTER_CONNECTIONS, " "LEAST_ROUTER_CONNECTIONS, "
"and \"LEAST_CURRENT_OPERATIONS\".", "LEAST_BEHIND_MASTER,"
"and LEAST_CURRENT_OPERATIONS.",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
} }
else else

View File

@ -228,7 +228,8 @@ typedef enum skygw_chk_t {
#define STRCRITERIA(c) ((c) == UNDEFINED_CRITERIA ? "UNDEFINED_CRITERIA" : \ #define STRCRITERIA(c) ((c) == UNDEFINED_CRITERIA ? "UNDEFINED_CRITERIA" : \
((c) == LEAST_GLOBAL_CONNECTIONS ? "LEAST_GLOBAL_CONNECTIONS" : \ ((c) == LEAST_GLOBAL_CONNECTIONS ? "LEAST_GLOBAL_CONNECTIONS" : \
((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \ ((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \
((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : "Unknown criteria")))) ((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : \
((c) == LEAST_CURRENT_OPERATIONS ? "LEAST_CURRENT_OPERATIONS" : "Unknown criteria")))))
#define STRSRVSTATUS(s) ((SERVER_IS_RUNNING(s) && SERVER_IS_MASTER(s)) ? "RUNNING MASTER" : \ #define STRSRVSTATUS(s) ((SERVER_IS_RUNNING(s) && SERVER_IS_MASTER(s)) ? "RUNNING MASTER" : \
((SERVER_IS_RUNNING(s) && SERVER_IS_SLAVE(s)) ? "RUNNING SLAVE" : \ ((SERVER_IS_RUNNING(s) && SERVER_IS_SLAVE(s)) ? "RUNNING SLAVE" : \