Merge branch 'develop' into MXS-936
This commit is contained in:
@ -43,7 +43,7 @@ MODULE_INFO info =
|
||||
* by the entry point functions. Some of these are used by functions in other
|
||||
* modules of the read write split router, others are used only within this
|
||||
* module.
|
||||
*
|
||||
*
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
@ -65,6 +65,9 @@ MODULE_INFO info =
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
/** Maximum number of slaves */
|
||||
#define MAX_SLAVE_COUNT 255
|
||||
|
||||
static char *version_str = "V1.1.0";
|
||||
|
||||
/*
|
||||
@ -105,13 +108,6 @@ static ROUTER_OBJECT MyObject =
|
||||
NULL
|
||||
};
|
||||
|
||||
/*
|
||||
* A couple of static variables that are used throughout the router
|
||||
*/
|
||||
|
||||
static SPINLOCK instlock;
|
||||
static ROUTER_INSTANCE *instances;
|
||||
|
||||
/*
|
||||
* Declaration of functions that are used only within this module, and are
|
||||
* not part of the API.
|
||||
@ -127,10 +123,9 @@ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
|
||||
static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
ROUTER_CLIENT_SES **rses,
|
||||
DCB *backend_dcb, GWBUF *errmsg);
|
||||
static int router_get_servercount(ROUTER_INSTANCE *inst);
|
||||
static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv,
|
||||
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
|
||||
int router_nsrv, ROUTER_INSTANCE *router);
|
||||
|
||||
static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* n_backend);
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
@ -148,8 +143,6 @@ char *version()
|
||||
void ModuleInit()
|
||||
{
|
||||
MXS_NOTICE("Initializing statement-based read/write split router module.");
|
||||
spinlock_init(&instlock);
|
||||
instances = NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -171,7 +164,7 @@ ROUTER_OBJECT *GetModuleObject()
|
||||
|
||||
/**
|
||||
* @brief Create an instance of the read/write router (API).
|
||||
*
|
||||
*
|
||||
* Create an instance of read/write statement router within the MaxScale. One
|
||||
* instance of the router is required for each service that is defined in the
|
||||
* configuration as using this router. One instance of the router will handle
|
||||
@ -219,19 +212,12 @@ static ROUTER *createInstance(SERVICE *service, char **options)
|
||||
router->rwsplit_config.rw_max_sescmd_history_size = 0;
|
||||
}
|
||||
|
||||
int nservers = 0;
|
||||
|
||||
for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
|
||||
{
|
||||
nservers++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set default value for max_slave_connections as 100%. This way
|
||||
* LEAST_CURRENT_OPERATIONS allows us to balance evenly across all the
|
||||
* configured slaves.
|
||||
*/
|
||||
router->rwsplit_config.rw_max_slave_conn_count = nservers;
|
||||
router->rwsplit_config.rw_max_slave_conn_count = MAX_SLAVE_COUNT;
|
||||
|
||||
if (router->rwsplit_config.rw_slave_select_criteria == UNDEFINED_CRITERIA)
|
||||
{
|
||||
@ -267,15 +253,6 @@ static ROUTER *createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
refreshInstance(router, param);
|
||||
}
|
||||
/**
|
||||
* We have completed the creation of the router data, so now
|
||||
* insert this router into the linked list of routers
|
||||
* that have been created with this module.
|
||||
*/
|
||||
spinlock_acquire(&instlock);
|
||||
router->next = instances;
|
||||
instances = router;
|
||||
spinlock_release(&instlock);
|
||||
|
||||
return (ROUTER *)router;
|
||||
}
|
||||
@ -283,14 +260,14 @@ static ROUTER *createInstance(SERVICE *service, char **options)
|
||||
/**
|
||||
* @brief Associate a new session with this instance of the router (API).
|
||||
*
|
||||
* The session is used to store all the data required by the router for a
|
||||
* particular client connection. The instance of the router that relates to a
|
||||
* The session is used to store all the data required by the router for a
|
||||
* particular client connection. The instance of the router that relates to a
|
||||
* particular service is passed as the first parameter. The second parameter is
|
||||
* the session that has been created in response to the request from a client
|
||||
* for a connection. The passed session contains generic information; this
|
||||
* function creates the session structure that holds router specific data.
|
||||
* There is often a one to one relationship between sessions and router
|
||||
* sessions, although it is possible to create configurations where a
|
||||
* sessions, although it is possible to create configurations where a
|
||||
* connection is handled by multiple routers, one after another.
|
||||
*
|
||||
* @param instance The router instance data
|
||||
@ -299,23 +276,12 @@ static ROUTER *createInstance(SERVICE *service, char **options)
|
||||
*/
|
||||
static void *newSession(ROUTER *router_inst, SESSION *session)
|
||||
{
|
||||
backend_ref_t *backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */
|
||||
backend_ref_t *master_ref = NULL; /*< pointer to selected master */
|
||||
ROUTER_CLIENT_SES *client_rses = NULL;
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_inst;
|
||||
bool succp;
|
||||
int router_nservers = 0; /*< # of servers in total */
|
||||
int max_nslaves; /*< max # of slaves used in this session */
|
||||
int max_slave_rlag; /*< max allowed replication lag for any slave */
|
||||
int i;
|
||||
const int min_nservers = 1; /*< hard-coded for now */
|
||||
|
||||
client_rses = (ROUTER_CLIENT_SES *)MXS_CALLOC(1, sizeof(ROUTER_CLIENT_SES));
|
||||
ROUTER_CLIENT_SES *client_rses = (ROUTER_CLIENT_SES *)MXS_CALLOC(1, sizeof(ROUTER_CLIENT_SES));
|
||||
|
||||
if (client_rses == NULL)
|
||||
{
|
||||
ss_dassert(false);
|
||||
goto return_rses;
|
||||
return NULL;
|
||||
}
|
||||
#if defined(SS_DEBUG)
|
||||
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
|
||||
@ -324,119 +290,57 @@ static void *newSession(ROUTER *router_inst, SESSION *session)
|
||||
|
||||
client_rses->router = router;
|
||||
client_rses->client_dcb = session->client_dcb;
|
||||
/**
|
||||
* If service config has been changed, reload config from service to
|
||||
* router instance first.
|
||||
*/
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
if (router->service->svc_config_version > router->rwsplit_version)
|
||||
{
|
||||
/** re-read all parameters to rwsplit config structure */
|
||||
refreshInstance(router, NULL); /*< scan through all parameters */
|
||||
/** increment rwsplit router's config version number */
|
||||
router->rwsplit_version = router->service->svc_config_version;
|
||||
/** Read options */
|
||||
rwsplit_process_router_options(router, router->service->routerOptions);
|
||||
}
|
||||
/** Copy config struct from router instance */
|
||||
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(rwsplit_config_t));
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
/**
|
||||
* Set defaults to session variables.
|
||||
*/
|
||||
client_rses->rses_autocommit_enabled = true;
|
||||
client_rses->rses_transaction_active = false;
|
||||
client_rses->have_tmp_tables = false;
|
||||
client_rses->forced_node = NULL;
|
||||
spinlock_init(&client_rses->rses_lock);
|
||||
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));
|
||||
|
||||
router_nservers = router_get_servercount(router);
|
||||
int router_nservers = router->service->n_dbref;
|
||||
const int min_nservers = 1; /*< hard-coded for now */
|
||||
|
||||
if (!have_enough_servers(&client_rses, min_nservers, router_nservers, router))
|
||||
if (!have_enough_servers(client_rses, min_nservers, router_nservers, router))
|
||||
{
|
||||
goto return_rses;
|
||||
MXS_FREE(client_rses);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create backend reference objects for this session.
|
||||
*/
|
||||
backend_ref = (backend_ref_t *)MXS_CALLOC(1, router_nservers * sizeof(backend_ref_t));
|
||||
backend_ref_t *backend_ref;
|
||||
|
||||
if (backend_ref == NULL)
|
||||
if (!create_backends(client_rses, &backend_ref, &router_nservers))
|
||||
{
|
||||
/** log this */
|
||||
MXS_FREE(client_rses);
|
||||
MXS_FREE(backend_ref);
|
||||
client_rses = NULL;
|
||||
goto return_rses;
|
||||
return NULL;
|
||||
}
|
||||
/**
|
||||
* Initialize backend references with BACKEND ptr.
|
||||
* Initialize session command cursors for each backend reference.
|
||||
*/
|
||||
i = 0;
|
||||
for (SERVER_REF *sref = router->service->dbref; sref; sref = sref->next)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
|
||||
backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
|
||||
#endif
|
||||
backend_ref[i].bref_state = 0;
|
||||
backend_ref[i].ref = sref;
|
||||
/** store pointers to sescmd list to both cursors */
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
|
||||
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
|
||||
i++;
|
||||
}
|
||||
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
|
||||
max_slave_rlag = rses_get_max_replication_lag(client_rses);
|
||||
|
||||
spinlock_init(&client_rses->rses_lock);
|
||||
int max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
|
||||
int max_slave_rlag = rses_get_max_replication_lag(client_rses);
|
||||
|
||||
client_rses->rses_backend_ref = backend_ref;
|
||||
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
||||
|
||||
/**
|
||||
* Find a backend servers to connect to.
|
||||
* This command requires that rsession's lock is held.
|
||||
*/
|
||||
|
||||
succp = rses_begin_locked_router_action(client_rses);
|
||||
|
||||
if (!succp)
|
||||
backend_ref_t *master_ref = NULL; /*< pointer to selected master */
|
||||
if (!select_connect_backend_servers(&master_ref, backend_ref, router_nservers,
|
||||
max_nslaves, max_slave_rlag,
|
||||
client_rses->rses_config.rw_slave_select_criteria,
|
||||
session, router))
|
||||
{
|
||||
/**
|
||||
* Master and at least <min_nslaves> slaves must be found if the router is
|
||||
* in the strict mode. If sessions without master are allowed, only
|
||||
* <min_nslaves> slaves must be found.
|
||||
*/
|
||||
MXS_FREE(client_rses->rses_backend_ref);
|
||||
MXS_FREE(client_rses);
|
||||
client_rses = NULL;
|
||||
goto return_rses;
|
||||
}
|
||||
succp = select_connect_backend_servers(&master_ref, backend_ref, router_nservers,
|
||||
max_nslaves, max_slave_rlag,
|
||||
client_rses->rses_config.rw_slave_select_criteria,
|
||||
session, router);
|
||||
|
||||
rses_end_locked_router_action(client_rses);
|
||||
|
||||
/**
|
||||
* Master and at least <min_nslaves> slaves must be found if the router is
|
||||
* in the strict mode. If sessions without master are allowed, only
|
||||
* <min_nslaves> slaves must be found.
|
||||
*/
|
||||
if (!succp)
|
||||
{
|
||||
MXS_FREE(client_rses->rses_backend_ref);
|
||||
MXS_FREE(client_rses);
|
||||
client_rses = NULL;
|
||||
goto return_rses;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/** Copy backend pointers to router session. */
|
||||
client_rses->rses_master_ref = master_ref;
|
||||
client_rses->rses_backend_ref = backend_ref;
|
||||
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
|
||||
|
||||
if (client_rses->rses_config.rw_max_slave_conn_percent)
|
||||
{
|
||||
@ -448,35 +352,15 @@ static void *newSession(ROUTER *router_inst, SESSION *session)
|
||||
|
||||
router->stats.n_sessions += 1;
|
||||
|
||||
/**
|
||||
* Version is bigger than zero once initialized.
|
||||
*/
|
||||
atomic_add(&client_rses->rses_versno, 2);
|
||||
ss_dassert(client_rses->rses_versno == 2);
|
||||
/**
|
||||
* Add this session to end of the list of active sessions in router.
|
||||
*/
|
||||
spinlock_acquire(&router->lock);
|
||||
client_rses->next = router->connections;
|
||||
router->connections = client_rses;
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
return_rses:
|
||||
#if defined(SS_DEBUG)
|
||||
if (client_rses != NULL)
|
||||
{
|
||||
CHK_CLIENT_RSES(client_rses);
|
||||
}
|
||||
#endif
|
||||
return (void *)client_rses;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Close a router session (API).
|
||||
*
|
||||
* Close a session with the router, this is the mechanism by which a router
|
||||
* may cleanup data structure etc. The instance of the router that relates to
|
||||
* the relevant service is passed, along with the router session that is to
|
||||
*
|
||||
* Close a session with the router, this is the mechanism by which a router
|
||||
* may cleanup data structure etc. The instance of the router that relates to
|
||||
* the relevant service is passed, along with the router session that is to
|
||||
* be closed. Typically the function is used in conjunction with freeSession
|
||||
* which will release the resources used by a router session (see below).
|
||||
*
|
||||
@ -485,66 +369,38 @@ return_rses:
|
||||
*/
|
||||
static void closeSession(ROUTER *instance, void *router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses;
|
||||
backend_ref_t *backend_ref;
|
||||
|
||||
MXS_DEBUG("%lu [RWSplit:closeSession]", pthread_self());
|
||||
|
||||
/**
|
||||
* router session can be NULL if newSession failed and it is discarding
|
||||
* its connections and DCB's.
|
||||
*/
|
||||
if (router_session == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
backend_ref = router_cli_ses->rses_backend_ref;
|
||||
/**
|
||||
* Lock router client session for secure read and update.
|
||||
*/
|
||||
if (!router_cli_ses->rses_closed &&
|
||||
rses_begin_locked_router_action(router_cli_ses))
|
||||
if (!router_cli_ses->rses_closed && rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
int i;
|
||||
/**
|
||||
* This sets router closed. Nobody is allowed to use router
|
||||
* without checking this first.
|
||||
* Mark router session as closed. @c rses_closed is checked at the start
|
||||
* of every API function to quickly stop the processing of closed sessions.
|
||||
*/
|
||||
router_cli_ses->rses_closed = true;
|
||||
|
||||
for (i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
|
||||
{
|
||||
backend_ref_t *bref = &backend_ref[i];
|
||||
DCB *dcb = bref->bref_dcb;
|
||||
/** Close those which had been connected */
|
||||
backend_ref_t *bref = &router_cli_ses->rses_backend_ref[i];
|
||||
|
||||
if (BREF_IS_IN_USE(bref))
|
||||
{
|
||||
/** This backend is in use and it needs to be closed */
|
||||
DCB *dcb = bref->bref_dcb;
|
||||
CHK_DCB(dcb);
|
||||
#if defined(SS_DEBUG)
|
||||
/**
|
||||
* session must be moved to SESSION_STATE_STOPPING state before
|
||||
* router session is closed.
|
||||
*/
|
||||
if (dcb->session != NULL)
|
||||
{
|
||||
ss_dassert(dcb->session->state == SESSION_STATE_STOPPING);
|
||||
}
|
||||
#endif
|
||||
/** Clean operation counter in bref and in SERVER */
|
||||
ss_dassert(dcb->session->state == SESSION_STATE_STOPPING);
|
||||
|
||||
if (BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
/** This backend was executing a query when the session was closed */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
bref_clear_state(bref, BREF_IN_USE);
|
||||
bref_set_state(bref, BREF_CLOSED);
|
||||
/**
|
||||
* closes protocol and dcb
|
||||
*/
|
||||
dcb_close(dcb);
|
||||
/** decrease server current connection counters */
|
||||
|
||||
/** Decrease server reference connection count */
|
||||
atomic_add(&bref->ref->connections, -1);
|
||||
}
|
||||
else
|
||||
@ -561,15 +417,15 @@ static void closeSession(ROUTER *instance, void *router_session)
|
||||
}
|
||||
}
|
||||
}
|
||||
/** Unlock */
|
||||
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Free a router session (API).
|
||||
*
|
||||
* When a router session has been closed, freeSession can be called to free
|
||||
*
|
||||
* When a router session has been closed, freeSession can be called to free
|
||||
* allocated resources.
|
||||
*
|
||||
* @param router_instance The router instance the session belongs to
|
||||
@ -578,40 +434,13 @@ static void closeSession(ROUTER *instance, void *router_session)
|
||||
*/
|
||||
static void freeSession(ROUTER *router_instance, void *router_client_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses;
|
||||
ROUTER_INSTANCE *router;
|
||||
int i;
|
||||
|
||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
|
||||
router = (ROUTER_INSTANCE *)router_instance;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
if (router->connections == router_cli_ses)
|
||||
{
|
||||
router->connections = router_cli_ses->next;
|
||||
}
|
||||
else
|
||||
{
|
||||
ROUTER_CLIENT_SES *ptr = router->connections;
|
||||
|
||||
while (ptr && ptr->next != router_cli_ses)
|
||||
{
|
||||
ptr = ptr->next;
|
||||
}
|
||||
|
||||
if (ptr)
|
||||
{
|
||||
ptr->next = router_cli_ses->next;
|
||||
}
|
||||
}
|
||||
spinlock_release(&router->lock);
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
|
||||
|
||||
/**
|
||||
* For each property type, walk through the list, finalize properties
|
||||
* and free the allocated memory.
|
||||
*/
|
||||
for (i = RSES_PROP_TYPE_FIRST; i < RSES_PROP_TYPE_COUNT; i++)
|
||||
for (int i = RSES_PROP_TYPE_FIRST; i < RSES_PROP_TYPE_COUNT; i++)
|
||||
{
|
||||
rses_property_t *p = router_cli_ses->rses_properties[i];
|
||||
rses_property_t *q = p;
|
||||
@ -623,11 +452,7 @@ static void freeSession(ROUTER *router_instance, void *router_client_session)
|
||||
p = q;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* We are no longer in the linked list, free
|
||||
* all the memory and other resources associated
|
||||
* to the client session.
|
||||
*/
|
||||
|
||||
MXS_FREE(router_cli_ses->rses_backend_ref);
|
||||
MXS_FREE(router_cli_ses);
|
||||
return;
|
||||
@ -685,20 +510,8 @@ static int routeQuery(ROUTER *instance, void *router_session, GWBUF *querybuf)
|
||||
*/
|
||||
static void diagnostics(ROUTER *instance, DCB *dcb)
|
||||
{
|
||||
ROUTER_CLIENT_SES *router_cli_ses;
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
int i = 0;
|
||||
char *weightby;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
router_cli_ses = router->connections;
|
||||
while (router_cli_ses)
|
||||
{
|
||||
i++;
|
||||
router_cli_ses = router_cli_ses->next;
|
||||
}
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
|
||||
|
||||
if (router->stats.n_queries > 0)
|
||||
@ -710,7 +523,8 @@ static void diagnostics(ROUTER *instance, DCB *dcb)
|
||||
|
||||
dcb_printf(dcb, "\tNumber of router sessions: %d\n",
|
||||
router->stats.n_sessions);
|
||||
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i);
|
||||
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n",
|
||||
router->service->stats.n_current);
|
||||
dcb_printf(dcb, "\tNumber of queries forwarded: %d\n",
|
||||
router->stats.n_queries);
|
||||
dcb_printf(dcb, "\tNumber of queries forwarded to master: %d (%.2f%%)\n",
|
||||
@ -731,9 +545,9 @@ static void diagnostics(ROUTER *instance, DCB *dcb)
|
||||
for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next)
|
||||
{
|
||||
dcb_printf(dcb, "\t\t%-20s %3.1f%% %-6d %-6d %d\n",
|
||||
ref->server->unique_name, (float)ref->weight / 10,
|
||||
ref->server->stats.n_current, ref->connections,
|
||||
ref->server->stats.n_current_ops);
|
||||
ref->server->unique_name, (float)ref->weight / 10,
|
||||
ref->server->stats.n_current, ref->connections,
|
||||
ref->server->stats.n_current_ops);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -767,7 +581,7 @@ static void clientReply(ROUTER *instance, void *router_session, GWBUF *writebuf,
|
||||
*/
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
print_error_packet(router_cli_ses, writebuf, backend_dcb);
|
||||
gwbuf_free(writebuf);
|
||||
goto lock_failed;
|
||||
}
|
||||
/** Holding lock ensures that router session remains open */
|
||||
@ -818,7 +632,7 @@ static void clientReply(ROUTER *instance, void *router_session, GWBUF *writebuf,
|
||||
if (sescmd_cursor_is_active(scur))
|
||||
{
|
||||
check_session_command_reply(writebuf, scur, bref);
|
||||
|
||||
|
||||
if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf))
|
||||
{
|
||||
/**
|
||||
@ -898,7 +712,7 @@ static void clientReply(ROUTER *instance, void *router_session, GWBUF *writebuf,
|
||||
CHK_GWBUF(bref->bref_pending_cmd);
|
||||
|
||||
if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
|
||||
gwbuf_clone(bref->bref_pending_cmd))) == 1)
|
||||
gwbuf_clone(bref->bref_pending_cmd))) == 1)
|
||||
{
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
atomic_add(&inst->stats.n_queries, 1);
|
||||
@ -1014,12 +828,12 @@ void rses_end_locked_router_action(ROUTER_CLIENT_SES *rses)
|
||||
|
||||
/*
|
||||
* @brief Clear one or more bits in the backend reference state
|
||||
*
|
||||
* The router session holds details of the backend servers that are
|
||||
* involved in the routing for this particular service. Each backend
|
||||
* server has a state bit string, and this function (along with
|
||||
*
|
||||
* The router session holds details of the backend servers that are
|
||||
* involved in the routing for this particular service. Each backend
|
||||
* server has a state bit string, and this function (along with
|
||||
* bref_set_state) is used to manage the state.
|
||||
*
|
||||
*
|
||||
* @param bref The backend reference to be modified
|
||||
* @param state A bit string where the 1 bits indicate bits that should
|
||||
* be turned off in the bref state.
|
||||
@ -1063,12 +877,12 @@ void bref_clear_state(backend_ref_t *bref, bref_state_t state)
|
||||
|
||||
/*
|
||||
* @brief Set one or more bits in the backend reference state
|
||||
*
|
||||
* The router session holds details of the backend servers that are
|
||||
* involved in the routing for this particular service. Each backend
|
||||
* server has a state bit string, and this function (along with
|
||||
*
|
||||
* The router session holds details of the backend servers that are
|
||||
* involved in the routing for this particular service. Each backend
|
||||
* server has a state bit string, and this function (along with
|
||||
* bref_clear_state) is used to manage the state.
|
||||
*
|
||||
*
|
||||
* @param bref The backend reference to be modified
|
||||
* @param state A bit string where the 1 bits indicate bits that should
|
||||
* be turned on in the bref state.
|
||||
@ -1110,9 +924,9 @@ void bref_set_state(backend_ref_t *bref, bref_state_t state)
|
||||
|
||||
/**
|
||||
* @brief Free resources belonging to a property
|
||||
*
|
||||
*
|
||||
* Property is freed at the end of router client session.
|
||||
*
|
||||
*
|
||||
* @param prop The property whose resources are to be released
|
||||
*/
|
||||
void rses_property_done(rses_property_t *prop)
|
||||
@ -1146,16 +960,16 @@ void rses_property_done(rses_property_t *prop)
|
||||
|
||||
/**
|
||||
* @brief Get count of backend servers that are slaves.
|
||||
*
|
||||
*
|
||||
* Find out the number of read backend servers.
|
||||
* Depending on the configuration value type, either copy direct count
|
||||
* of slave connections or calculate the count from percentage value.
|
||||
*
|
||||
*
|
||||
* @param rses Router client session
|
||||
* @param router_nservers The number of backend servers in total
|
||||
*/
|
||||
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses,
|
||||
int router_nservers)
|
||||
int router_nservers)
|
||||
{
|
||||
int conf_max_nslaves;
|
||||
int max_nslaves;
|
||||
@ -1177,7 +991,7 @@ int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses,
|
||||
|
||||
/*
|
||||
* @brief Get the maximum replication lag for this router
|
||||
*
|
||||
*
|
||||
* @param rses Router client session
|
||||
* @return Replication lag from configuration or very large number
|
||||
*/
|
||||
@ -1202,10 +1016,10 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
|
||||
|
||||
/**
|
||||
* @brief Find a back end reference that matches the given DCB
|
||||
*
|
||||
*
|
||||
* Finds out if there is a backend reference pointing at the DCB given as
|
||||
* parameter.
|
||||
*
|
||||
*
|
||||
* @param rses router client session
|
||||
* @param dcb DCB
|
||||
*
|
||||
@ -1239,14 +1053,14 @@ backend_ref_t *get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
|
||||
|
||||
/**
|
||||
* @brief Call hang up function
|
||||
*
|
||||
*
|
||||
* Calls hang-up function for DCB if it is not both running and in
|
||||
* master/slave/joined/ndb role. Called by DCB's callback routine.
|
||||
*
|
||||
*
|
||||
* @param dcb DCB relating to a backend server
|
||||
* @param reason The reason for the state change
|
||||
* @param data Data is a backend reference structure belonging to this router
|
||||
*
|
||||
*
|
||||
* @return 1 for success, 0 for failure
|
||||
*/
|
||||
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data)
|
||||
@ -1346,10 +1160,10 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
|
||||
if (c == UNDEFINED_CRITERIA)
|
||||
{
|
||||
MXS_ERROR("Unknown slave selection criteria \"%s\". "
|
||||
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
|
||||
"LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
|
||||
"and LEAST_CURRENT_OPERATIONS.",
|
||||
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria));
|
||||
"Allowed values are LEAST_GLOBAL_CONNECTIONS, "
|
||||
"LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
|
||||
"and LEAST_CURRENT_OPERATIONS.",
|
||||
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria));
|
||||
success = false;
|
||||
}
|
||||
else
|
||||
@ -1407,9 +1221,9 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
|
||||
|
||||
/**
|
||||
* @brief Router error handling routine (API)
|
||||
*
|
||||
*
|
||||
* Error Handler routine to resolve _backend_ failures. If it succeeds then
|
||||
* there are enough operative backends available and connected. Otherwise it
|
||||
* there are enough operative backends available and connected. Otherwise it
|
||||
* fails, and session is terminated.
|
||||
*
|
||||
* @param instance The router instance
|
||||
@ -1472,8 +1286,8 @@ static void handleError(ROUTER *instance, void *router_session,
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
{
|
||||
close_dcb = false; /* With the assumption that if the router session is closed,
|
||||
* then so is the dcb.
|
||||
*/
|
||||
* then so is the dcb.
|
||||
*/
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
@ -1570,8 +1384,8 @@ static void handleError(ROUTER *instance, void *router_session,
|
||||
|
||||
if (close_dcb)
|
||||
{
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1657,7 +1471,6 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
{
|
||||
ROUTER_CLIENT_SES *myrses;
|
||||
SESSION *ses;
|
||||
int router_nservers;
|
||||
int max_nslaves;
|
||||
int max_slave_rlag;
|
||||
backend_ref_t *bref;
|
||||
@ -1711,8 +1524,8 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
*/
|
||||
dcb_remove_callback(backend_dcb, DCB_REASON_NOT_RESPONDING,
|
||||
&router_handle_state_switch, (void *)bref);
|
||||
router_nservers = router_get_servercount(inst);
|
||||
max_nslaves = rses_get_max_slavecount(myrses, router_nservers);
|
||||
|
||||
max_nslaves = rses_get_max_slavecount(myrses, myrses->rses_nbackends);
|
||||
max_slave_rlag = rses_get_max_replication_lag(myrses);
|
||||
/**
|
||||
* Try to get replacement slave or at least the minimum
|
||||
@ -1720,13 +1533,13 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
*/
|
||||
if (inst->rwsplit_config.rw_disable_sescmd_hist)
|
||||
{
|
||||
succp = have_enough_servers(&myrses, 1, router_nservers, inst) ? true : false;
|
||||
succp = have_enough_servers(myrses, 1, myrses->rses_nbackends, inst) ? true : false;
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = select_connect_backend_servers(&myrses->rses_master_ref,
|
||||
myrses->rses_backend_ref,
|
||||
router_nservers,
|
||||
myrses->rses_nbackends,
|
||||
max_nslaves, max_slave_rlag,
|
||||
myrses->rses_config.rw_slave_select_criteria,
|
||||
ses, inst);
|
||||
@ -1736,25 +1549,6 @@ return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Calculate the number of backend servers
|
||||
*
|
||||
* @param inst Router instance
|
||||
*
|
||||
* @return int - count of servers
|
||||
*/
|
||||
static int router_get_servercount(ROUTER_INSTANCE *inst)
|
||||
{
|
||||
int router_nservers = 0;
|
||||
|
||||
for (SERVER_REF *ref = inst->service->dbref; ref; ref = ref->next)
|
||||
{
|
||||
router_nservers++;
|
||||
}
|
||||
|
||||
return router_nservers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Calculate whether we have enough servers to route a query
|
||||
*
|
||||
@ -1765,16 +1559,16 @@ static int router_get_servercount(ROUTER_INSTANCE *inst)
|
||||
*
|
||||
* @return bool - whether enough, side effect is error logging
|
||||
*/
|
||||
static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv,
|
||||
static bool have_enough_servers(ROUTER_CLIENT_SES *rses, const int min_nsrv,
|
||||
int router_nsrv, ROUTER_INSTANCE *router)
|
||||
{
|
||||
bool succp;
|
||||
|
||||
/** With too few servers session is not created */
|
||||
if (router_nsrv < min_nsrv ||
|
||||
MXS_MAX((*p_rses)->rses_config.rw_max_slave_conn_count,
|
||||
(router_nsrv * (*p_rses)->rses_config.rw_max_slave_conn_percent) /
|
||||
100) < min_nsrv)
|
||||
MXS_MAX((rses)->rses_config.rw_max_slave_conn_count,
|
||||
(router_nsrv * (rses)->rses_config.rw_max_slave_conn_percent) /
|
||||
100) < min_nsrv)
|
||||
{
|
||||
if (router_nsrv < min_nsrv)
|
||||
{
|
||||
@ -1785,16 +1579,16 @@ static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv,
|
||||
}
|
||||
else
|
||||
{
|
||||
int pct = (*p_rses)->rses_config.rw_max_slave_conn_percent / 100;
|
||||
int pct = (rses)->rses_config.rw_max_slave_conn_percent / 100;
|
||||
int nservers = router_nsrv * pct;
|
||||
|
||||
if ((*p_rses)->rses_config.rw_max_slave_conn_count < min_nsrv)
|
||||
if ((rses)->rses_config.rw_max_slave_conn_count < min_nsrv)
|
||||
{
|
||||
MXS_ERROR("Unable to start %s service. There are "
|
||||
"too few backend servers configured in "
|
||||
"MaxScale.cnf. Found %d when %d is required.",
|
||||
router->service->name,
|
||||
(*p_rses)->rses_config.rw_max_slave_conn_count, min_nsrv);
|
||||
(rses)->rses_config.rw_max_slave_conn_count, min_nsrv);
|
||||
}
|
||||
if (nservers < min_nsrv)
|
||||
{
|
||||
@ -1804,11 +1598,9 @@ static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv,
|
||||
"MaxScale.cnf. Found %d%% when at least %.0f%% "
|
||||
"would be required.",
|
||||
router->service->name,
|
||||
(*p_rses)->rses_config.rw_max_slave_conn_percent, dbgpct);
|
||||
(rses)->rses_config.rw_max_slave_conn_percent, dbgpct);
|
||||
}
|
||||
}
|
||||
MXS_FREE(*p_rses);
|
||||
*p_rses = NULL;
|
||||
succp = false;
|
||||
}
|
||||
else
|
||||
@ -1818,62 +1610,11 @@ static bool have_enough_servers(ROUTER_CLIENT_SES **p_rses, const int min_nsrv,
|
||||
return succp;
|
||||
}
|
||||
|
||||
#if defined(PREP_STMT_CACHING)
|
||||
#define MAX_STMT_LEN 1024
|
||||
|
||||
static prep_stmt_t *prep_stmt_init(prep_stmt_type_t type, void *id)
|
||||
{
|
||||
prep_stmt_t *pstmt;
|
||||
|
||||
pstmt = (prep_stmt_t *)MXS_CALLOC(1, sizeof(prep_stmt_t));
|
||||
|
||||
if (pstmt != NULL)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
pstmt->pstmt_chk_top = CHK_NUM_PREP_STMT;
|
||||
pstmt->pstmt_chk_tail = CHK_NUM_PREP_STMT;
|
||||
#endif
|
||||
pstmt->pstmt_state = PREP_STMT_ALLOC;
|
||||
pstmt->pstmt_type = type;
|
||||
|
||||
if (type == PREP_STMT_NAME)
|
||||
{
|
||||
pstmt->pstmt_id.name = strndup((char *)id, MAX_STMT_LEN);
|
||||
}
|
||||
else
|
||||
{
|
||||
pstmt->pstmt_id.seq = 0;
|
||||
}
|
||||
}
|
||||
CHK_PREP_STMT(pstmt);
|
||||
return pstmt;
|
||||
}
|
||||
|
||||
static void prep_stmt_done(prep_stmt_t *pstmt)
|
||||
{
|
||||
CHK_PREP_STMT(pstmt);
|
||||
|
||||
if (pstmt->pstmt_type == PREP_STMT_NAME)
|
||||
{
|
||||
MXS_FREE(pstmt->pstmt_id.name);
|
||||
}
|
||||
MXS_FREE(pstmt);
|
||||
}
|
||||
|
||||
static bool prep_stmt_drop(prep_stmt_t *pstmt)
|
||||
{
|
||||
CHK_PREP_STMT(pstmt);
|
||||
|
||||
pstmt->pstmt_state = PREP_STMT_DROPPED;
|
||||
return true;
|
||||
}
|
||||
#endif /*< PREP_STMT_CACHING */
|
||||
|
||||
/**
|
||||
* @brief Refresh the instance by the given parameter value.
|
||||
*
|
||||
* Used by createInstance and newSession
|
||||
*
|
||||
*
|
||||
* @param router Router instance
|
||||
* @param singleparam Parameter fo be reloaded
|
||||
*
|
||||
@ -1973,53 +1714,15 @@ static void refreshInstance(ROUTER_INSTANCE *router,
|
||||
}
|
||||
param = param->next;
|
||||
}
|
||||
|
||||
#if defined(NOT_USED) /*< can't read monitor config parameters */
|
||||
if ((*router->servers)->backend_server->rlag == -2)
|
||||
{
|
||||
rlag_enabled = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
rlag_enabled = true;
|
||||
}
|
||||
/**
|
||||
* If replication lag detection is not enabled the measure can't be
|
||||
* used in slave selection.
|
||||
*/
|
||||
if (!rlag_enabled)
|
||||
{
|
||||
if (rlag_limited)
|
||||
{
|
||||
MXS_WARNING("Configuration Failed, max_slave_replication_lag "
|
||||
"is set to %d,\n\t\t but detect_replication_lag "
|
||||
"is not enabled. Replication lag will not be checked.",
|
||||
router->rwsplit_config.rw_max_slave_replication_lag);
|
||||
}
|
||||
|
||||
if (router->rwsplit_config.rw_slave_select_criteria ==
|
||||
LEAST_BEHIND_MASTER)
|
||||
{
|
||||
MXS_WARNING("Configuration Failed, router option "
|
||||
"\n\t\t slave_selection_criteria=LEAST_BEHIND_MASTER "
|
||||
"is specified, but detect_replication_lag "
|
||||
"is not enabled.\n\t\t "
|
||||
"slave_selection_criteria=%s will be used instead.",
|
||||
STRCRITERIA(DEFAULT_CRITERIA));
|
||||
|
||||
router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA;
|
||||
}
|
||||
}
|
||||
#endif /*< NOT_USED */
|
||||
}
|
||||
|
||||
/*
|
||||
* @brief Release resources when createInstance fails to complete
|
||||
*
|
||||
*
|
||||
* Internal to createInstance
|
||||
*
|
||||
*
|
||||
* @param router Router instance
|
||||
*
|
||||
*
|
||||
*/
|
||||
static void free_rwsplit_instance(ROUTER_INSTANCE *router)
|
||||
{
|
||||
@ -2029,3 +1732,57 @@ static void free_rwsplit_instance(ROUTER_INSTANCE *router)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Create backend server references
|
||||
*
|
||||
* This creates a new set of backend references for the client session. Currently
|
||||
* this is only used on startup but it could be used to dynamically change the
|
||||
* set of used servers.
|
||||
*
|
||||
* @param rses Client router session
|
||||
* @param dest Destination where the array of backens is stored
|
||||
* @param n_backend Number of items in the array
|
||||
* @return True on success, false on error
|
||||
*/
|
||||
static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* n_backend)
|
||||
{
|
||||
backend_ref_t *backend_ref = (backend_ref_t *)MXS_CALLOC(1, *n_backend * sizeof(backend_ref_t));
|
||||
|
||||
if (backend_ref == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
|
||||
for (SERVER_REF *sref = rses->router->service->dbref; sref && i < *n_backend; sref = sref->next)
|
||||
{
|
||||
if (sref->active)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
|
||||
backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
|
||||
#endif
|
||||
backend_ref[i].bref_state = 0;
|
||||
backend_ref[i].ref = sref;
|
||||
/** store pointers to sescmd list to both cursors */
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = rses;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
|
||||
&rses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
if (i < *n_backend)
|
||||
{
|
||||
MXS_INFO("The service reported %d servers but only took %d into use.", *n_backend, i);
|
||||
*n_backend = i;
|
||||
}
|
||||
|
||||
*dest = backend_ref;
|
||||
return true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user