Remove BACKEND structure from readwritesplit

The BACKEND structure is no longer created for the instance. This allows
sessions to dynamically create new servers.
This commit is contained in:
Markus Makela
2016-11-03 12:00:33 +02:00
parent a097204c70
commit a163e31b9a
8 changed files with 135 additions and 225 deletions

View File

@ -100,6 +100,7 @@ typedef struct server_ref_t
struct server_ref_t *next; /**< Next server reference */ struct server_ref_t *next; /**< Next server reference */
SERVER* server; /**< The actual server */ SERVER* server; /**< The actual server */
int weight; /**< Weight of this server */ int weight; /**< Weight of this server */
int connections; /**< Number of connections created through this reference */
} SERVER_REF; } SERVER_REF;
#define SERVICE_MAX_RETRY_INTERVAL 3600 /*< The maximum interval between service start retries */ #define SERVICE_MAX_RETRY_INTERVAL 3600 /*< The maximum interval between service start retries */

View File

@ -751,6 +751,7 @@ static SERVER_REF* server_ref_alloc(SERVER *server)
sref->next = NULL; sref->next = NULL;
sref->server = server; sref->server = server;
sref->weight = SERVICE_BASE_SERVER_WEIGHT; sref->weight = SERVICE_BASE_SERVER_WEIGHT;
sref->connections = 0;
} }
return sref; return sref;

View File

@ -183,12 +183,7 @@ ROUTER_OBJECT *GetModuleObject()
static ROUTER *createInstance(SERVICE *service, char **options) static ROUTER *createInstance(SERVICE *service, char **options)
{ {
ROUTER_INSTANCE *router; ROUTER_INSTANCE *router;
SERVER *server;
SERVER_REF *sref;
int nservers;
int i;
CONFIG_PARAMETER *param; CONFIG_PARAMETER *param;
char *weightby;
if ((router = MXS_CALLOC(1, sizeof(ROUTER_INSTANCE))) == NULL) if ((router = MXS_CALLOC(1, sizeof(ROUTER_INSTANCE))) == NULL)
{ {
@ -197,51 +192,6 @@ static ROUTER *createInstance(SERVICE *service, char **options)
router->service = service; router->service = service;
spinlock_init(&router->lock); spinlock_init(&router->lock);
/** Calculate number of servers */
sref = service->dbref;
nservers = 0;
while (sref != NULL)
{
nservers++;
sref = sref->next;
}
router->servers = (BACKEND **)MXS_CALLOC(nservers + 1, sizeof(BACKEND *));
if (router->servers == NULL)
{
free_rwsplit_instance(router);
return NULL;
}
/**
* Create an array of the backend servers in the router structure to
* maintain a count of the number of connections to each
* backend server.
*/
sref = service->dbref;
nservers = 0;
while (sref != NULL)
{
if ((router->servers[nservers] = MXS_MALLOC(sizeof(BACKEND))) == NULL)
{
free_rwsplit_instance(router);
return NULL;
}
router->servers[nservers]->backend_server = sref->server;
router->servers[nservers]->backend_conn_count = 0;
router->servers[nservers]->be_valid = false;
router->servers[nservers]->weight = sref->weight;
#if defined(SS_DEBUG)
router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND;
router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND;
#endif
nservers += 1;
sref = sref->next;
}
router->servers[nservers] = NULL;
/* /*
* Until we know otherwise assume we have some available slaves. * Until we know otherwise assume we have some available slaves.
*/ */
@ -268,6 +218,13 @@ static ROUTER *createInstance(SERVICE *service, char **options)
router->rwsplit_config.rw_max_sescmd_history_size = 0; router->rwsplit_config.rw_max_sescmd_history_size = 0;
} }
int nservers = 0;
for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
{
nservers++;
}
/** /**
* Set default value for max_slave_connections as 100%. This way * Set default value for max_slave_connections as 100%. This way
* LEAST_CURRENT_OPERATIONS allows us to balance evenly across all the * LEAST_CURRENT_OPERATIONS allows us to balance evenly across all the
@ -341,8 +298,7 @@ static ROUTER *createInstance(SERVICE *service, char **options)
*/ */
static void *newSession(ROUTER *router_inst, SESSION *session) static void *newSession(ROUTER *router_inst, SESSION *session)
{ {
backend_ref_t backend_ref_t *backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */
*backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */
backend_ref_t *master_ref = NULL; /*< pointer to selected master */ backend_ref_t *master_ref = NULL; /*< pointer to selected master */
ROUTER_CLIENT_SES *client_rses = NULL; ROUTER_CLIENT_SES *client_rses = NULL;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_inst; ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_inst;
@ -417,7 +373,8 @@ static void *newSession(ROUTER *router_inst, SESSION *session)
* Initialize backend references with BACKEND ptr. * Initialize backend references with BACKEND ptr.
* Initialize session command cursors for each backend reference. * Initialize session command cursors for each backend reference.
*/ */
for (i = 0; i < router_nservers; i++) i = 0;
for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next)
{ {
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
@ -426,13 +383,14 @@ static void *newSession(ROUTER *router_inst, SESSION *session)
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
#endif #endif
backend_ref[i].bref_state = 0; backend_ref[i].bref_state = 0;
backend_ref[i].bref_backend = router->servers[i]; backend_ref[i].ref = sref;
/** store pointers to sescmd list to both cursors */ /** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
i++;
} }
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
max_slave_rlag = rses_get_max_replication_lag(client_rses); max_slave_rlag = rses_get_max_replication_lag(client_rses);
@ -586,7 +544,7 @@ static void closeSession(ROUTER *instance, void *router_session)
*/ */
dcb_close(dcb); dcb_close(dcb);
/** decrease server current connection counters */ /** decrease server current connection counters */
atomic_add(&bref->bref_backend->backend_conn_count, -1); atomic_add(&bref->ref->connections, -1);
} }
else else
{ {
@ -729,7 +687,6 @@ static void diagnostics(ROUTER *instance, DCB *dcb)
ROUTER_CLIENT_SES *router_cli_ses; ROUTER_CLIENT_SES *router_cli_ses;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0; int i = 0;
BACKEND *backend;
char *weightby; char *weightby;
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
@ -770,13 +727,12 @@ static void diagnostics(ROUTER *instance, DCB *dcb)
dcb_printf(dcb, "\t\tServer Target %% Connections " dcb_printf(dcb, "\t\tServer Target %% Connections "
"Operations\n"); "Operations\n");
dcb_printf(dcb, "\t\t Global Router\n"); dcb_printf(dcb, "\t\t Global Router\n");
for (i = 0; router->servers[i]; i++) for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next)
{ {
backend = router->servers[i];
dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n", dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n",
backend->backend_server->unique_name, (float)backend->weight / 10, ref->server->unique_name, (float)ref->weight / 10,
backend->backend_server->stats.n_current, backend->backend_conn_count, ref->server->stats.n_current, ref->connections,
backend->backend_server->stats.n_current_ops); ref->server->stats.n_current_ops);
} }
} }
} }
@ -923,17 +879,15 @@ static void clientReply(ROUTER *instance, void *router_session, GWBUF *writebuf,
{ {
bool succp; bool succp;
MXS_INFO("Backend %s:%d processed reply and starts to execute " MXS_INFO("Backend %s:%d processed reply and starts to execute active cursor.",
"active cursor.", bref->bref_backend->backend_server->name, bref->ref->server->name, bref->ref->server->port);
bref->bref_backend->backend_server->port);
succp = execute_sescmd_in_backend(bref); succp = execute_sescmd_in_backend(bref);
if (!succp) if (!succp)
{ {
MXS_INFO("Backend %s:%d failed to execute session command.", MXS_INFO("Backend %s:%d failed to execute session command.",
bref->bref_backend->backend_server->name, bref->ref->server->name, bref->ref->server->port);
bref->bref_backend->backend_server->port);
} }
} }
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
@ -1092,13 +1046,13 @@ void bref_clear_state(backend_ref_t *bref, bref_state_t state)
else else
{ {
/** Decrease global operation count */ /** Decrease global operation count */
prev2 = atomic_add(&bref->bref_backend->backend_server->stats.n_current_ops, -1); prev2 = atomic_add(&bref->ref->server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0); ss_dassert(prev2 > 0);
if (prev2 <= 0) if (prev2 <= 0)
{ {
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__, bref->bref_backend->backend_server->name, __FUNCTION__, bref->ref->server->name,
bref->bref_backend->backend_server->port); bref->ref->server->port);
} }
} }
} }
@ -1137,19 +1091,16 @@ void bref_set_state(backend_ref_t *bref, bref_state_t state)
if (prev1 < 0) if (prev1 < 0)
{ {
MXS_ERROR("[%s] Error: negative number of connections waiting for " MXS_ERROR("[%s] Error: negative number of connections waiting for "
"results in backend %s:%u", "results in backend %s:%u", __FUNCTION__,
__FUNCTION__, bref->bref_backend->backend_server->name, bref->ref->server->name, bref->ref->server->port);
bref->bref_backend->backend_server->port);
} }
/** Increase global operation count */ /** Increase global operation count */
prev2 = prev2 = atomic_add(&bref->ref->server->stats.n_current_ops, 1);
atomic_add(&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0); ss_dassert(prev2 >= 0);
if (prev2 < 0) if (prev2 < 0)
{ {
MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u",
__FUNCTION__, bref->bref_backend->backend_server->name, __FUNCTION__, bref->ref->server->name, bref->ref->server->port);
bref->bref_backend->backend_server->port);
} }
} }
@ -1316,7 +1267,7 @@ int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data)
bref = (backend_ref_t *)data; bref = (backend_ref_t *)data;
CHK_BACKEND_REF(bref); CHK_BACKEND_REF(bref);
srv = bref->bref_backend->backend_server; srv = bref->ref->server;
if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv)) if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv))
{ {
@ -1531,9 +1482,9 @@ static void handleError(ROUTER *instance, void *router_session,
* handled so that session could continue. * handled so that session could continue.
*/ */
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb && if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb &&
!SERVER_IS_MASTER(rses->rses_master_ref->bref_backend->backend_server)) !SERVER_IS_MASTER(rses->rses_master_ref->ref->server))
{ {
SERVER *srv = rses->rses_master_ref->bref_backend->backend_server; SERVER *srv = rses->rses_master_ref->ref->server;
backend_ref_t *bref; backend_ref_t *bref;
bref = get_bref_from_dcb(rses, problem_dcb); bref = get_bref_from_dcb(rses, problem_dcb);
if (bref != NULL) if (bref != NULL)
@ -1794,9 +1745,8 @@ return_succp:
static int router_get_servercount(ROUTER_INSTANCE *inst) static int router_get_servercount(ROUTER_INSTANCE *inst)
{ {
int router_nservers = 0; int router_nservers = 0;
BACKEND **b = inst->servers;
/** count servers */ for (SERVER_REF *ref = inst->service->dbref; ref; ref = ref->next)
while (*(b++) != NULL)
{ {
router_nservers++; router_nservers++;
} }
@ -2074,14 +2024,6 @@ static void free_rwsplit_instance(ROUTER_INSTANCE *router)
{ {
if (router) if (router)
{ {
if (router->servers)
{
for (int i = 0; router->servers[i]; i++)
{
MXS_FREE(router->servers[i]);
}
}
MXS_FREE(router->servers);
MXS_FREE(router); MXS_FREE(router);
} }
} }

View File

@ -199,27 +199,6 @@ typedef struct sescmd_cursor_st
#endif #endif
} sescmd_cursor_t; } sescmd_cursor_t;
/**
* Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers.
*
* Owned by router_instance, referenced by each routing session.
*/
typedef struct backend_st
{
#if defined(SS_DEBUG)
skygw_chk_t be_chk_top;
#endif
SERVER* backend_server; /*< The server itself */
int backend_conn_count; /*< Number of connections to the server */
bool be_valid; /*< Valid when belongs to the router's configuration */
int weight; /*< Desired weighting on the load. Expressed in .1% increments */
#if defined(SS_DEBUG)
skygw_chk_t be_chk_tail;
#endif
} BACKEND;
/** /**
* Reference to BACKEND. * Reference to BACKEND.
* *
@ -230,7 +209,7 @@ typedef struct backend_ref_st
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t bref_chk_top; skygw_chk_t bref_chk_top;
#endif #endif
BACKEND* bref_backend; SERVER_REF* ref;
DCB* bref_dcb; DCB* bref_dcb;
bref_state_t bref_state; bref_state_t bref_state;
int bref_num_result_wait; int bref_num_result_wait;
@ -348,8 +327,6 @@ typedef struct router_instance
SERVICE* service; /*< Pointer to service */ SERVICE* service; /*< Pointer to service */
ROUTER_CLIENT_SES* connections; /*< List of client connections */ ROUTER_CLIENT_SES* connections; /*< List of client connections */
SPINLOCK lock; /*< Lock for the instance data */ SPINLOCK lock; /*< Lock for the instance data */
BACKEND** servers; /*< Backend servers */
BACKEND* master; /*< NULL or pointer */
rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */ rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */
int rwsplit_version; /*< version number for router's config */ int rwsplit_version; /*< version number for router's config */
ROUTER_STATS stats; /*< Statistics for this router */ ROUTER_STATS stats; /*< Statistics for this router */

View File

@ -404,7 +404,7 @@ void print_error_packet(ROUTER_CLIENT_SES *rses, GWBUF *buf, DCB *dcb)
{ {
if (bref[i].bref_dcb == dcb) if (bref[i].bref_dcb == dcb)
{ {
srv = bref[i].bref_backend->backend_server; srv = bref[i].ref->server;
} }
} }
ss_dassert(srv != NULL); ss_dassert(srv != NULL);
@ -453,8 +453,8 @@ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend
ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf)); ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
MXS_ERROR("Failed to execute session command in %s:%d. Error was: %s %s", MXS_ERROR("Failed to execute session command in %s:%d. Error was: %s %s",
bref->bref_backend->backend_server->name, bref->ref->server->name,
bref->bref_backend->backend_server->port, err, replystr); bref->ref->server->port, err, replystr);
MXS_FREE(err); MXS_FREE(err);
MXS_FREE(replystr); MXS_FREE(replystr);
} }

View File

@ -253,10 +253,10 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
BREF_IS_IN_USE((&backend_ref[i]))) BREF_IS_IN_USE((&backend_ref[i])))
{ {
MXS_INFO("Route query to %s \t%s:%d%s", MXS_INFO("Route query to %s \t%s:%d%s",
(SERVER_IS_MASTER(backend_ref[i].bref_backend->backend_server) (SERVER_IS_MASTER(backend_ref[i].ref->server)
? "master" : "slave"), ? "master" : "slave"),
backend_ref[i].bref_backend->backend_server->name, backend_ref[i].ref->server->name,
backend_ref[i].bref_backend->backend_server->port, backend_ref[i].ref->server->port,
(i + 1 == router_cli_ses->rses_nbackends ? " <" : " ")); (i + 1 == router_cli_ses->rses_nbackends ? " <" : " "));
} }
@ -368,10 +368,10 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{ {
MXS_INFO("Route query to %s \t%s:%d%s", MXS_INFO("Route query to %s \t%s:%d%s",
(SERVER_IS_MASTER(backend_ref[i].bref_backend->backend_server) (SERVER_IS_MASTER(backend_ref[i].ref->server)
? "master" : "slave"), ? "master" : "slave"),
backend_ref[i].bref_backend->backend_server->name, backend_ref[i].ref->server->name,
backend_ref[i].bref_backend->backend_server->port, backend_ref[i].ref->server->port,
(i + 1 == router_cli_ses->rses_nbackends ? " <" : " ")); (i + 1 == router_cli_ses->rses_nbackends ? " <" : " "));
} }
@ -391,8 +391,8 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
{ {
nsucc += 1; nsucc += 1;
MXS_INFO("Backend %s:%d already executing sescmd.", MXS_INFO("Backend %s:%d already executing sescmd.",
backend_ref[i].bref_backend->backend_server->name, backend_ref[i].ref->server->name,
backend_ref[i].bref_backend->backend_server->port); backend_ref[i].ref->server->port);
} }
else else
{ {
@ -403,8 +403,8 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
else else
{ {
MXS_ERROR("Failed to execute session command in %s:%d", MXS_ERROR("Failed to execute session command in %s:%d",
backend_ref[i].bref_backend->backend_server->name, backend_ref[i].ref->server->name,
backend_ref[i].bref_backend->backend_server->port); backend_ref[i].ref->server->port);
} }
} }
} }
@ -533,9 +533,9 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
for (i = 0; i < rses->rses_nbackends; i++) for (i = 0; i < rses->rses_nbackends; i++)
{ {
BACKEND *b = backend_ref[i].bref_backend; SERVER_REF *b = backend_ref[i].ref;
SERVER server; SERVER server;
server.status = backend_ref[i].bref_backend->backend_server->status; server.status = b->server->status;
/** /**
* To become chosen: * To become chosen:
* backend must be in use, name must match, * backend must be in use, name must match,
@ -543,7 +543,7 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
* server, or master. * server, or master.
*/ */
if (BREF_IS_IN_USE((&backend_ref[i])) && if (BREF_IS_IN_USE((&backend_ref[i])) &&
(strncasecmp(name, b->backend_server->unique_name, PATH_MAX) == 0) && (strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) &&
(SERVER_IS_SLAVE(&server) || SERVER_IS_RELAY_SERVER(&server) || (SERVER_IS_SLAVE(&server) || SERVER_IS_RELAY_SERVER(&server) ||
SERVER_IS_MASTER(&server))) SERVER_IS_MASTER(&server)))
{ {
@ -569,10 +569,10 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
for (i = 0; i < rses->rses_nbackends; i++) for (i = 0; i < rses->rses_nbackends; i++)
{ {
BACKEND *b = (&backend_ref[i])->bref_backend; SERVER_REF *b = backend_ref[i].ref;
SERVER server; SERVER server;
SERVER candidate; SERVER candidate;
server.status = backend_ref[i].bref_backend->backend_server->status; server.status = b->server->status;
/** /**
* Unused backend or backend which is not master nor * Unused backend or backend which is not master nor
* slave can't be used * slave can't be used
@ -596,7 +596,7 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
{ {
/** found master */ /** found master */
candidate_bref = &backend_ref[i]; candidate_bref = &backend_ref[i];
candidate.status = candidate_bref->bref_backend->backend_server->status; candidate.status = candidate_bref->ref->server->status;
succp = true; succp = true;
} }
/** /**
@ -605,12 +605,12 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
* maximum allowed replication lag. * maximum allowed replication lag.
*/ */
else if (max_rlag == MAX_RLAG_UNDEFINED || else if (max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE && (b->server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) b->server->rlag <= max_rlag))
{ {
/** found slave */ /** found slave */
candidate_bref = &backend_ref[i]; candidate_bref = &backend_ref[i];
candidate.status = candidate_bref->bref_backend->backend_server->status; candidate.status = candidate_bref->ref->server->status;
succp = true; succp = true;
} }
} }
@ -620,13 +620,13 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
*/ */
else if (SERVER_IS_MASTER(&candidate) && SERVER_IS_SLAVE(&server) && else if (SERVER_IS_MASTER(&candidate) && SERVER_IS_SLAVE(&server) &&
(max_rlag == MAX_RLAG_UNDEFINED || (max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE && (b->server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) && b->server->rlag <= max_rlag)) &&
!rses->rses_config.rw_master_reads) !rses->rses_config.rw_master_reads)
{ {
/** found slave */ /** found slave */
candidate_bref = &backend_ref[i]; candidate_bref = &backend_ref[i];
candidate.status = candidate_bref->bref_backend->backend_server->status; candidate.status = candidate_bref->ref->server->status;
succp = true; succp = true;
} }
/** /**
@ -637,21 +637,17 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
else if (SERVER_IS_SLAVE(&server)) else if (SERVER_IS_SLAVE(&server))
{ {
if (max_rlag == MAX_RLAG_UNDEFINED || if (max_rlag == MAX_RLAG_UNDEFINED ||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE && (b->server->rlag != MAX_RLAG_NOT_AVAILABLE &&
b->backend_server->rlag <= max_rlag)) b->server->rlag <= max_rlag))
{ {
candidate_bref = candidate_bref = check_candidate_bref(candidate_bref, &backend_ref[i],
check_candidate_bref(candidate_bref, &backend_ref[i], rses->rses_config.rw_slave_select_criteria);
rses->rses_config.rw_slave_select_criteria); candidate.status = candidate_bref->ref->server->status;
candidate.status =
candidate_bref->bref_backend->backend_server->status;
} }
else else
{ {
MXS_INFO("Server %s:%d is too much behind the " MXS_INFO("Server %s:%d is too much behind the master, %d s. and can't be chosen.",
"master, %d s. and can't be chosen.", b->server->name, b->server->port, b->server->rlag);
b->backend_server->name, b->backend_server->port,
b->backend_server->rlag);
} }
} }
} /*< for */ } /*< for */
@ -675,7 +671,7 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
* so copying it locally will make possible error messages * so copying it locally will make possible error messages
* easier to understand */ * easier to understand */
SERVER server; SERVER server;
server.status = master_bref->bref_backend->backend_server->status; server.status = master_bref->ref->server->status;
if (BREF_IS_IN_USE(master_bref) && SERVER_IS_MASTER(&server)) if (BREF_IS_IN_USE(master_bref) && SERVER_IS_MASTER(&server))
{ {
*p_dcb = master_bref->bref_dcb; *p_dcb = master_bref->bref_dcb;
@ -687,8 +683,8 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
{ {
MXS_ERROR("Server at %s:%d should be master but " MXS_ERROR("Server at %s:%d should be master but "
"is %s instead and can't be chosen to master.", "is %s instead and can't be chosen to master.",
master_bref->bref_backend->backend_server->name, master_bref->ref->server->name,
master_bref->bref_backend->backend_server->port, master_bref->ref->server->port,
STRSRVSTATUS(&server)); STRSRVSTATUS(&server));
succp = false; succp = false;
} }
@ -1191,9 +1187,8 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
ss_dassert(target_dcb != NULL); ss_dassert(target_dcb != NULL);
MXS_INFO("Route query to %s \t%s:%d <", MXS_INFO("Route query to %s \t%s:%d <",
(SERVER_IS_MASTER(bref->bref_backend->backend_server) ? "master" (SERVER_IS_MASTER(bref->ref->server) ? "master"
: "slave"), bref->bref_backend->backend_server->name, : "slave"), bref->ref->server->name, bref->ref->server->port);
bref->bref_backend->backend_server->port);
/** /**
* Store current stmt if execution of previous session command * Store current stmt if execution of previous session command
* haven't completed yet. * haven't completed yet.
@ -1372,14 +1367,13 @@ static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses)
if (bref == rses->rses_master_ref) if (bref == rses->rses_master_ref)
{ {
/** Store master state for better error reporting */ /** Store master state for better error reporting */
master.status = bref->bref_backend->backend_server->status; master.status = bref->ref->server->status;
} }
if (bref->bref_backend->backend_server->status & SERVER_MASTER) if (SERVER_IS_MASTER(bref->ref->server))
{ {
if (candidate_bref == NULL || if (candidate_bref == NULL ||
(bref->bref_backend->backend_server->depth < (bref->ref->server->depth < candidate_bref->ref->server->depth))
candidate_bref->bref_backend->backend_server->depth))
{ {
candidate_bref = bref; candidate_bref = bref;
} }

View File

@ -38,7 +38,7 @@ static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_h
static void log_server_connections(select_criteria_t select_criteria, static void log_server_connections(select_criteria_t select_criteria,
backend_ref_t *backend_ref, int router_nservers); backend_ref_t *backend_ref, int router_nservers);
static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers); static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers);
static int bref_cmp_global_conn(const void *bref1, const void *bref2); static int bref_cmp_global_conn(const void *bref1, const void *bref2);
@ -103,10 +103,10 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
} }
/* get the root Master */ /* get the root Master */
BACKEND *master_host = get_root_master(backend_ref, router_nservers); SERVER_REF *master_host = get_root_master(backend_ref, router_nservers);
if (router->rwsplit_config.rw_master_failure_mode == RW_FAIL_INSTANTLY && if (router->rwsplit_config.rw_master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || SERVER_IS_DOWN(master_host->backend_server))) (master_host == NULL || SERVER_IS_DOWN(master_host->server)))
{ {
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers); MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
return false; return false;
@ -145,7 +145,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
for (int i = 0; i < router_nservers && for (int i = 0; i < router_nservers &&
(slaves_connected < max_nslaves || !master_connected); i++) (slaves_connected < max_nslaves || !master_connected); i++)
{ {
SERVER *serv = backend_ref[i].bref_backend->backend_server; SERVER *serv = backend_ref[i].ref->server;
if (!BREF_HAS_FAILED(&backend_ref[i]) && SERVER_IS_RUNNING(serv)) if (!BREF_HAS_FAILED(&backend_ref[i]) && SERVER_IS_RUNNING(serv))
{ {
@ -155,7 +155,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
(serv->rlag != MAX_RLAG_NOT_AVAILABLE && (serv->rlag != MAX_RLAG_NOT_AVAILABLE &&
serv->rlag <= max_slave_rlag)) && serv->rlag <= max_slave_rlag)) &&
(SERVER_IS_SLAVE(serv) || SERVER_IS_RELAY_SERVER(serv)) && (SERVER_IS_SLAVE(serv) || SERVER_IS_RELAY_SERVER(serv)) &&
(master_host == NULL || (serv != master_host->backend_server))) (master_host == NULL || (serv != master_host->server)))
{ {
slaves_found += 1; slaves_found += 1;
@ -166,7 +166,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
} }
} }
/* take the master_host for master */ /* take the master_host for master */
else if (master_host && (serv == master_host->backend_server)) else if (master_host && (serv == master_host->server))
{ {
/** p_master_ref must be assigned with this backend_ref pointer /** p_master_ref must be assigned with this backend_ref pointer
* because its original value may have been lost when backend * because its original value may have been lost when backend
@ -205,9 +205,9 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
if (BREF_IS_IN_USE((&backend_ref[i]))) if (BREF_IS_IN_USE((&backend_ref[i])))
{ {
MXS_INFO("Selected %s in \t%s:%d", MXS_INFO("Selected %s in \t%s:%d",
STRSRVSTATUS(backend_ref[i].bref_backend->backend_server), STRSRVSTATUS(backend_ref[i].ref->server),
backend_ref[i].bref_backend->backend_server->name, backend_ref[i].ref->server->name,
backend_ref[i].bref_backend->backend_server->port); backend_ref[i].ref->server->port);
} }
} /* for */ } /* for */
} }
@ -226,12 +226,12 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
{ {
if (BREF_IS_IN_USE((&backend_ref[i]))) if (BREF_IS_IN_USE((&backend_ref[i])))
{ {
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0); ss_dassert(backend_ref[i].ref->connections > 0);
/** disconnect opened connections */ /** disconnect opened connections */
bref_clear_state(&backend_ref[i], BREF_IN_USE); bref_clear_state(&backend_ref[i], BREF_IN_USE);
/** Decrease backend's connection counter. */ /** Decrease backend's connection counter. */
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); atomic_add(&backend_ref[i].ref->connections, -1);
dcb_close(backend_ref[i].bref_dcb); dcb_close(backend_ref[i].bref_dcb);
} }
} }
@ -243,13 +243,12 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
/** Compare number of connections from this router in backend servers */ /** Compare number of connections from this router in backend servers */
static int bref_cmp_router_conn(const void *bref1, const void *bref2) static int bref_cmp_router_conn(const void *bref1, const void *bref2)
{ {
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend; SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend; SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
if (b1->weight == 0 && b2->weight == 0) if (b1->weight == 0 && b2->weight == 0)
{ {
return b1->backend_server->stats.n_current - return b1->connections - b2->connections;
b2->backend_server->stats.n_current;
} }
else if (b1->weight == 0) else if (b1->weight == 0)
{ {
@ -260,20 +259,20 @@ static int bref_cmp_router_conn(const void *bref1, const void *bref2)
return -1; return -1;
} }
return ((1000 + 1000 * b1->backend_conn_count) / b1->weight) - return ((1000 + 1000 * b1->connections) / b1->weight) -
((1000 + 1000 * b2->backend_conn_count) / b2->weight); ((1000 + 1000 * b2->connections) / b2->weight);
} }
/** Compare number of global connections in backend servers */ /** Compare number of global connections in backend servers */
static int bref_cmp_global_conn(const void *bref1, const void *bref2) static int bref_cmp_global_conn(const void *bref1, const void *bref2)
{ {
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend; SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend; SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
if (b1->weight == 0 && b2->weight == 0) if (b1->weight == 0 && b2->weight == 0)
{ {
return b1->backend_server->stats.n_current - return b1->server->stats.n_current -
b2->backend_server->stats.n_current; b2->server->stats.n_current;
} }
else if (b1->weight == 0) else if (b1->weight == 0)
{ {
@ -284,32 +283,29 @@ static int bref_cmp_global_conn(const void *bref1, const void *bref2)
return -1; return -1;
} }
return ((1000 + 1000 * b1->backend_server->stats.n_current) / b1->weight) - return ((1000 + 1000 * b1->server->stats.n_current) / b1->weight) -
((1000 + 1000 * b2->backend_server->stats.n_current) / b2->weight); ((1000 + 1000 * b2->server->stats.n_current) / b2->weight);
} }
/** Compare replication lag between backend servers */ /** Compare replication lag between backend servers */
static int bref_cmp_behind_master(const void *bref1, const void *bref2) static int bref_cmp_behind_master(const void *bref1, const void *bref2)
{ {
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend; SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend; SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
return ((b1->backend_server->rlag < b2->backend_server->rlag) ? -1 return b1->server->rlag - b2->server->rlag;
: ((b1->backend_server->rlag > b2->backend_server->rlag) ? 1 : 0));
} }
/** Compare number of current operations in backend servers */ /** Compare number of current operations in backend servers */
static int bref_cmp_current_load(const void *bref1, const void *bref2) static int bref_cmp_current_load(const void *bref1, const void *bref2)
{ {
SERVER *s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server; SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref;
SERVER *s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server; SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref;
BACKEND *b1 = ((backend_ref_t *)bref1)->bref_backend;
BACKEND *b2 = ((backend_ref_t *)bref2)->bref_backend;
if (b1->weight == 0 && b2->weight == 0) if (b1->weight == 0 && b2->weight == 0)
{ {
return b1->backend_server->stats.n_current - // TODO: Fix this so that operations are used instead of connections
b2->backend_server->stats.n_current; return b1->server->stats.n_current - b2->server->stats.n_current;
} }
else if (b1->weight == 0) else if (b1->weight == 0)
{ {
@ -320,8 +316,8 @@ static int bref_cmp_current_load(const void *bref1, const void *bref2)
return -1; return -1;
} }
return ((1000 * s1->stats.n_current_ops) - b1->weight) - return ((1000 * b1->server->stats.n_current_ops) - b1->weight) -
((1000 * s2->stats.n_current_ops) - b2->weight); ((1000 * b2->server->stats.n_current_ops) - b2->weight);
} }
/** /**
@ -338,7 +334,7 @@ static int bref_cmp_current_load(const void *bref1, const void *bref2)
*/ */
static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_history) static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_history)
{ {
SERVER *serv = bref->bref_backend->backend_server; SERVER *serv = bref->ref->server;
bool rval = false; bool rval = false;
bref->bref_dcb = dcb_connect(serv, session, serv->protocol); bref->bref_dcb = dcb_connect(serv, session, serv->protocol);
@ -354,16 +350,16 @@ static bool connect_server(backend_ref_t *bref, SESSION *session, bool execute_h
&router_handle_state_switch, (void *) bref); &router_handle_state_switch, (void *) bref);
bref->bref_state = 0; bref->bref_state = 0;
bref_set_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_IN_USE);
atomic_add(&bref->bref_backend->backend_conn_count, 1); atomic_add(&bref->ref->connections, 1);
rval = true; rval = true;
} }
else else
{ {
MXS_ERROR("Failed to execute session command in %s (%s:%d). See earlier " MXS_ERROR("Failed to execute session command in %s (%s:%d). See earlier "
"errors for more details.", "errors for more details.",
bref->bref_backend->backend_server->unique_name, bref->ref->server->unique_name,
bref->bref_backend->backend_server->name, bref->ref->server->name,
bref->bref_backend->backend_server->port); bref->ref->server->port);
dcb_close(bref->bref_dcb); dcb_close(bref->bref_dcb);
bref->bref_dcb = NULL; bref->bref_dcb = NULL;
} }
@ -398,33 +394,33 @@ static void log_server_connections(select_criteria_t select_criteria,
for (int i = 0; i < router_nservers; i++) for (int i = 0; i < router_nservers; i++)
{ {
BACKEND *b = backend_ref[i].bref_backend; SERVER_REF *b = backend_ref[i].ref;
switch (select_criteria) switch (select_criteria)
{ {
case LEAST_GLOBAL_CONNECTIONS: case LEAST_GLOBAL_CONNECTIONS:
MXS_INFO("MaxScale connections : %d in \t%s:%d %s", MXS_INFO("MaxScale connections : %d in \t%s:%d %s",
b->backend_server->stats.n_current, b->backend_server->name, b->server->stats.n_current, b->server->name,
b->backend_server->port, STRSRVSTATUS(b->backend_server)); b->server->port, STRSRVSTATUS(b->server));
break; break;
case LEAST_ROUTER_CONNECTIONS: case LEAST_ROUTER_CONNECTIONS:
MXS_INFO("RWSplit connections : %d in \t%s:%d %s", MXS_INFO("RWSplit connections : %d in \t%s:%d %s",
b->backend_conn_count, b->backend_server->name, b->connections, b->server->name,
b->backend_server->port, STRSRVSTATUS(b->backend_server)); b->server->port, STRSRVSTATUS(b->server));
break; break;
case LEAST_CURRENT_OPERATIONS: case LEAST_CURRENT_OPERATIONS:
MXS_INFO("current operations : %d in \t%s:%d %s", MXS_INFO("current operations : %d in \t%s:%d %s",
b->backend_server->stats.n_current_ops, b->server->stats.n_current_ops,
b->backend_server->name, b->backend_server->port, b->server->name, b->server->port,
STRSRVSTATUS(b->backend_server)); STRSRVSTATUS(b->server));
break; break;
case LEAST_BEHIND_MASTER: case LEAST_BEHIND_MASTER:
MXS_INFO("replication lag : %d in \t%s:%d %s", MXS_INFO("replication lag : %d in \t%s:%d %s",
b->backend_server->rlag, b->backend_server->name, b->server->rlag, b->server->name,
b->backend_server->port, STRSRVSTATUS(b->backend_server)); b->server->port, STRSRVSTATUS(b->server));
default: default:
break; break;
} }
@ -445,27 +441,26 @@ static void log_server_connections(select_criteria_t select_criteria,
* @return The Master found * @return The Master found
* *
*/ */
static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers)
{ {
int i = 0; int i = 0;
BACKEND *master_host = NULL; SERVER_REF *master_host = NULL;
for (i = 0; i < router_nservers; i++) for (i = 0; i < router_nservers; i++)
{ {
BACKEND *b; if (servers[i].ref == NULL)
if (servers[i].bref_backend == NULL)
{ {
/** This should not happen */
ss_dassert(false);
continue; continue;
} }
b = servers[i].bref_backend; SERVER_REF *b = servers[i].ref;
if ((b->backend_server->status & (SERVER_MASTER | SERVER_MAINT)) == if (SERVER_IS_MASTER(b->server))
SERVER_MASTER)
{ {
if (master_host == NULL || if (master_host == NULL ||
(b->backend_server->depth < master_host->backend_server->depth)) (b->server->depth < master_host->server->depth))
{ {
master_host = b; master_host = b;
} }

View File

@ -173,7 +173,7 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
{ {
MXS_ERROR("Slave server '%s': response differs from master's response. " MXS_ERROR("Slave server '%s': response differs from master's response. "
"Closing connection due to inconsistent session state.", "Closing connection due to inconsistent session state.",
bref->bref_backend->backend_server->unique_name); bref->ref->server->unique_name);
sescmd_cursor_set_active(scur, false); sescmd_cursor_set_active(scur, false);
bref_clear_state(bref, BREF_QUERY_ACTIVE); bref_clear_state(bref, BREF_QUERY_ACTIVE);
bref_clear_state(bref, BREF_IN_USE); bref_clear_state(bref, BREF_IN_USE);
@ -205,7 +205,7 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
scmd->reply_cmd = *((unsigned char *)replybuf->start + 4); scmd->reply_cmd = *((unsigned char *)replybuf->start + 4);
MXS_INFO("Server '%s' responded to a session command, sending the response " MXS_INFO("Server '%s' responded to a session command, sending the response "
"to the client.", bref->bref_backend->backend_server->unique_name); "to the client.", bref->ref->server->unique_name);
for (int i = 0; i < ses->rses_nbackends; i++) for (int i = 0; i < ses->rses_nbackends; i++)
{ {
@ -226,8 +226,8 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
*reconnect = true; *reconnect = true;
MXS_INFO("Disabling slave %s:%d, result differs from " MXS_INFO("Disabling slave %s:%d, result differs from "
"master's result. Master: %d Slave: %d", "master's result. Master: %d Slave: %d",
ses->rses_backend_ref[i].bref_backend->backend_server->name, ses->rses_backend_ref[i].ref->server->name,
ses->rses_backend_ref[i].bref_backend->backend_server->port, ses->rses_backend_ref[i].ref->server->port,
bref->reply_cmd, ses->rses_backend_ref[i].reply_cmd); bref->reply_cmd, ses->rses_backend_ref[i].reply_cmd);
} }
} }
@ -237,11 +237,11 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
else else
{ {
MXS_INFO("Slave '%s' responded before master to a session command. Result: %d", MXS_INFO("Slave '%s' responded before master to a session command. Result: %d",
bref->bref_backend->backend_server->unique_name, bref->ref->server->unique_name,
(int)bref->reply_cmd); (int)bref->reply_cmd);
if (bref->reply_cmd == 0xff) if (bref->reply_cmd == 0xff)
{ {
SERVER *serv = bref->bref_backend->backend_server; SERVER *serv = bref->ref->server;
MXS_ERROR("Slave '%s' (%s:%u) failed to execute session command.", MXS_ERROR("Slave '%s' (%s:%u) failed to execute session command.",
serv->unique_name, serv->name, serv->port); serv->unique_name, serv->name, serv->port);
} }