Refactor readwritesplit connection creation
The connections for a router session can now be done without a constructed router session. This simplifies the creation of new router session by removing the need to handle memory allocations. Readwritesplit router sessions are now created in the static `create` function which handles the actual creation of the connections and allocation of the session itself.
This commit is contained in:
@ -430,8 +430,9 @@ static bool handle_error_new_connection(RWSplit *inst,
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves,
|
succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves,
|
||||||
myrses->rses_config.slave_selection_criteria,
|
ses, inst->config(), myrses->backends,
|
||||||
ses, inst, myrses,
|
myrses->current_master, &myrses->sescmd_list,
|
||||||
|
&myrses->expected_responses,
|
||||||
connection_type::SLAVE);
|
connection_type::SLAVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -557,10 +558,9 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
|
|||||||
return backend->get_reply_state() == REPLY_STATE_DONE;
|
return backend->get_reply_state() == REPLY_STATE_DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
void close_all_connections(RWSplitSession* rses)
|
void close_all_connections(SRWBackendList& backends)
|
||||||
{
|
{
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::iterator it = backends.begin(); it != backends.end(); it++)
|
||||||
it != rses->backends.end(); it++)
|
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
SRWBackend& backend = *it;
|
||||||
|
|
||||||
@ -674,9 +674,13 @@ bool RWSplit::have_enough_servers() const
|
|||||||
return succp;
|
return succp;
|
||||||
}
|
}
|
||||||
|
|
||||||
RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session):
|
RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
|
const SRWBackendList& backends,
|
||||||
|
const SRWBackend& master):
|
||||||
rses_chk_top(CHK_NUM_ROUTER_SES),
|
rses_chk_top(CHK_NUM_ROUTER_SES),
|
||||||
rses_closed(false),
|
rses_closed(false),
|
||||||
|
backends(backends),
|
||||||
|
current_master(master),
|
||||||
rses_config(instance->config()),
|
rses_config(instance->config()),
|
||||||
rses_nbackends(instance->service()->n_dbref),
|
rses_nbackends(instance->service()->n_dbref),
|
||||||
load_data_state(LOAD_DATA_INACTIVE),
|
load_data_state(LOAD_DATA_INACTIVE),
|
||||||
@ -700,6 +704,43 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session)
|
||||||
|
{
|
||||||
|
RWSplitSession* rses = NULL;
|
||||||
|
|
||||||
|
if (router->have_enough_servers())
|
||||||
|
{
|
||||||
|
SRWBackendList backends;
|
||||||
|
|
||||||
|
for (SERVER_REF *sref = router->service()->dbref; sref; sref = sref->next)
|
||||||
|
{
|
||||||
|
if (sref->active)
|
||||||
|
{
|
||||||
|
backends.push_back(SRWBackend(new RWBackend(sref)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* At least the master must be found if the router is in the strict mode.
|
||||||
|
* If sessions without master are allowed, only a slave must be found.
|
||||||
|
*/
|
||||||
|
|
||||||
|
SRWBackend master;
|
||||||
|
|
||||||
|
if (select_connect_backend_servers(router->service()->n_dbref, router->max_slave_count(),
|
||||||
|
session, router->config(), backends, master,
|
||||||
|
NULL, NULL, connection_type::ALL))
|
||||||
|
{
|
||||||
|
if ((rses = new RWSplitSession(router, session, backends, master)))
|
||||||
|
{
|
||||||
|
router->stats().n_sessions += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rses;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* API function definitions
|
* API function definitions
|
||||||
*/
|
*/
|
||||||
@ -757,43 +798,9 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options)
|
|||||||
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
|
||||||
{
|
{
|
||||||
RWSplit* router = reinterpret_cast<RWSplit*>(router_inst);
|
RWSplit* router = reinterpret_cast<RWSplit*>(router_inst);
|
||||||
|
RWSplitSession* rses = NULL;
|
||||||
if (!router->have_enough_servers())
|
MXS_EXCEPTION_GUARD(rses = RWSplitSession::create(router, session));
|
||||||
{
|
return reinterpret_cast<MXS_ROUTER_SESSION*>(rses);
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
RWSplitSession* client_rses = new (std::nothrow) RWSplitSession(router, session);
|
|
||||||
|
|
||||||
if (client_rses == NULL)
|
|
||||||
{
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (SERVER_REF *sref = router->service()->dbref; sref; sref = sref->next)
|
|
||||||
{
|
|
||||||
if (sref->active)
|
|
||||||
{
|
|
||||||
client_rses->backends.push_back(SRWBackend(new RWBackend(sref)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!select_connect_backend_servers(router->service()->n_dbref, router->max_slave_count(),
|
|
||||||
router->config().slave_selection_criteria,
|
|
||||||
session, router, client_rses,
|
|
||||||
connection_type::ALL))
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* At least the master must be found if the router is in the strict mode.
|
|
||||||
* If sessions without master are allowed, only a slave must be found.
|
|
||||||
*/
|
|
||||||
delete client_rses;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
router->stats().n_sessions += 1;
|
|
||||||
|
|
||||||
return reinterpret_cast<MXS_ROUTER_SESSION*>(client_rses);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -815,7 +822,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
|
|||||||
if (!router_cli_ses->rses_closed)
|
if (!router_cli_ses->rses_closed)
|
||||||
{
|
{
|
||||||
router_cli_ses->rses_closed = true;
|
router_cli_ses->rses_closed = true;
|
||||||
close_all_connections(router_cli_ses);
|
close_all_connections(router_cli_ses->backends);
|
||||||
|
|
||||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
|
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
|
||||||
router_cli_ses->sescmd_list.size())
|
router_cli_ses->sescmd_list.size())
|
||||||
@ -1147,10 +1154,9 @@ static void clientReply(MXS_ROUTER *instance,
|
|||||||
select_connect_backend_servers(
|
select_connect_backend_servers(
|
||||||
rses->rses_nbackends,
|
rses->rses_nbackends,
|
||||||
rses->rses_config.max_slave_connections,
|
rses->rses_config.max_slave_connections,
|
||||||
rses->rses_config.slave_selection_criteria,
|
|
||||||
rses->client_dcb->session,
|
rses->client_dcb->session,
|
||||||
rses->router,
|
rses->router->config(), rses->backends, rses->current_master,
|
||||||
rses,
|
&rses->sescmd_list, &rses->expected_responses,
|
||||||
connection_type::SLAVE);
|
connection_type::SLAVE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -101,12 +101,13 @@ enum connection_type
|
|||||||
|
|
||||||
bool select_connect_backend_servers(int router_nservers,
|
bool select_connect_backend_servers(int router_nservers,
|
||||||
int max_nslaves,
|
int max_nslaves,
|
||||||
select_criteria_t select_criteria,
|
|
||||||
MXS_SESSION *session,
|
MXS_SESSION *session,
|
||||||
RWSplit *router,
|
const Config& config,
|
||||||
RWSplitSession *rses,
|
SRWBackendList& backends,
|
||||||
|
SRWBackend& current_master,
|
||||||
|
mxs::SessionCommandList* sescmd,
|
||||||
|
int* expected_responses,
|
||||||
connection_type type);
|
connection_type type);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The following are implemented in rwsplit_tmp_table_multi.c
|
* The following are implemented in rwsplit_tmp_table_multi.c
|
||||||
*/
|
*/
|
||||||
@ -118,7 +119,7 @@ void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
|||||||
GWBUF *querybuf, uint32_t type);
|
GWBUF *querybuf, uint32_t type);
|
||||||
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
|
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
|
||||||
|
|
||||||
void close_all_connections(RWSplitSession* rses);
|
void close_all_connections(SRWBackendList& backends);
|
||||||
|
|
||||||
uint32_t determine_query_type(GWBUF *querybuf, int command);
|
uint32_t determine_query_type(GWBUF *querybuf, int command);
|
||||||
|
|
||||||
|
|||||||
@ -54,15 +54,15 @@ static bool valid_for_slave(const SERVER *server, const SERVER *master)
|
|||||||
*
|
*
|
||||||
* @return The best slave backend reference or NULL if no candidates could be found
|
* @return The best slave backend reference or NULL if no candidates could be found
|
||||||
*/
|
*/
|
||||||
SRWBackend get_slave_candidate(RWSplitSession* rses, const SERVER *master,
|
SRWBackend get_slave_candidate(const SRWBackendList& backends, const SERVER *master,
|
||||||
int (*cmpfun)(const SRWBackend&, const SRWBackend&))
|
int (*cmpfun)(const SRWBackend&, const SRWBackend&))
|
||||||
{
|
{
|
||||||
SRWBackend candidate;
|
SRWBackend candidate;
|
||||||
|
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::const_iterator it = backends.begin();
|
||||||
it != rses->backends.end(); it++)
|
it != backends.end(); it++)
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
const SRWBackend& backend = *it;
|
||||||
|
|
||||||
if (!backend->in_use() && backend->can_connect() &&
|
if (!backend->in_use() && backend->can_connect() &&
|
||||||
valid_for_slave(backend->server(), master))
|
valid_for_slave(backend->server(), master))
|
||||||
@ -197,14 +197,12 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) =
|
|||||||
* @param criteria Slave selection criteria
|
* @param criteria Slave selection criteria
|
||||||
* @param rses Router client session
|
* @param rses Router client session
|
||||||
*/
|
*/
|
||||||
static void log_server_connections(select_criteria_t criteria,
|
static void log_server_connections(select_criteria_t criteria, const SRWBackendList& backends)
|
||||||
RWSplitSession* rses)
|
|
||||||
{
|
{
|
||||||
MXS_INFO("Servers and %s connection counts:",
|
MXS_INFO("Servers and %s connection counts:",
|
||||||
criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" : "router");
|
criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" : "router");
|
||||||
|
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::const_iterator it = backends.begin(); it != backends.end(); it++)
|
||||||
it != rses->backends.end(); it++)
|
|
||||||
{
|
{
|
||||||
SERVER_REF* b = (*it)->backend();
|
SERVER_REF* b = (*it)->backend();
|
||||||
|
|
||||||
@ -246,12 +244,12 @@ static void log_server_connections(select_criteria_t criteria,
|
|||||||
*
|
*
|
||||||
* @return The root master reference or NULL if no master is found
|
* @return The root master reference or NULL if no master is found
|
||||||
*/
|
*/
|
||||||
static SERVER_REF* get_root_master(RWSplitSession* rses)
|
static SERVER_REF* get_root_master(const SRWBackendList& backends)
|
||||||
{
|
{
|
||||||
SERVER_REF *master_host = NULL;
|
SERVER_REF *master_host = NULL;
|
||||||
|
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::const_iterator it = backends.begin();
|
||||||
it != rses->backends.end(); it++)
|
it != backends.end(); it++)
|
||||||
{
|
{
|
||||||
SERVER_REF* b = (*it)->backend();
|
SERVER_REF* b = (*it)->backend();
|
||||||
|
|
||||||
@ -288,17 +286,19 @@ static SERVER_REF* get_root_master(RWSplitSession* rses)
|
|||||||
*/
|
*/
|
||||||
bool select_connect_backend_servers(int router_nservers,
|
bool select_connect_backend_servers(int router_nservers,
|
||||||
int max_nslaves,
|
int max_nslaves,
|
||||||
select_criteria_t select_criteria,
|
|
||||||
MXS_SESSION *session,
|
MXS_SESSION *session,
|
||||||
RWSplit *router,
|
const Config& config,
|
||||||
RWSplitSession *rses,
|
SRWBackendList& backends,
|
||||||
|
SRWBackend& current_master,
|
||||||
|
mxs::SessionCommandList* sescmd_list,
|
||||||
|
int* expected_responses,
|
||||||
connection_type type)
|
connection_type type)
|
||||||
{
|
{
|
||||||
/* get the root Master */
|
/* get the root Master */
|
||||||
SERVER_REF *master_backend = get_root_master(rses);
|
SERVER_REF *master_backend = get_root_master(backends);
|
||||||
SERVER *master_host = master_backend ? master_backend->server : NULL;
|
SERVER *master_host = master_backend ? master_backend->server : NULL;
|
||||||
|
|
||||||
if (router->config().master_failure_mode == RW_FAIL_INSTANTLY &&
|
if (config.master_failure_mode == RW_FAIL_INSTANTLY &&
|
||||||
(master_host == NULL || SERVER_IS_DOWN(master_host)))
|
(master_host == NULL || SERVER_IS_DOWN(master_host)))
|
||||||
{
|
{
|
||||||
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
|
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
|
||||||
@ -315,15 +315,16 @@ bool select_connect_backend_servers(int router_nservers,
|
|||||||
* Master is already connected or we don't have a master. The function was
|
* Master is already connected or we don't have a master. The function was
|
||||||
* called because new slaves must be selected to replace failed ones.
|
* called because new slaves must be selected to replace failed ones.
|
||||||
*/
|
*/
|
||||||
bool master_connected = type == SLAVE || rses->current_master;
|
bool master_connected = type == SLAVE || current_master;
|
||||||
|
|
||||||
/** Check slave selection criteria and set compare function */
|
/** Check slave selection criteria and set compare function */
|
||||||
|
select_criteria_t select_criteria = config.slave_selection_criteria;
|
||||||
int (*cmpfun)(const SRWBackend&, const SRWBackend&) = criteria_cmpfun[select_criteria];
|
int (*cmpfun)(const SRWBackend&, const SRWBackend&) = criteria_cmpfun[select_criteria];
|
||||||
ss_dassert(cmpfun);
|
ss_dassert(cmpfun);
|
||||||
|
|
||||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
||||||
{
|
{
|
||||||
log_server_connections(select_criteria, rses);
|
log_server_connections(select_criteria, backends);
|
||||||
}
|
}
|
||||||
|
|
||||||
int slaves_found = 0;
|
int slaves_found = 0;
|
||||||
@ -334,26 +335,24 @@ bool select_connect_backend_servers(int router_nservers,
|
|||||||
if (!master_connected)
|
if (!master_connected)
|
||||||
{
|
{
|
||||||
/** Find a master server */
|
/** Find a master server */
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::const_iterator it = backends.begin(); it != backends.end(); it++)
|
||||||
it != rses->backends.end(); it++)
|
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
const SRWBackend& backend = *it;
|
||||||
|
|
||||||
if (backend->can_connect() && master_host && backend->server() == master_host)
|
if (backend->can_connect() && master_host && backend->server() == master_host)
|
||||||
{
|
{
|
||||||
if (backend->connect(session))
|
if (backend->connect(session))
|
||||||
{
|
{
|
||||||
rses->current_master = backend;
|
current_master = backend;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Calculate how many connections we already have */
|
/** Calculate how many connections we already have */
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::const_iterator it = backends.begin(); it != backends.end(); it++)
|
||||||
it != rses->backends.end(); it++)
|
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
const SRWBackend& backend = *it;
|
||||||
|
|
||||||
if (backend->can_connect() && valid_for_slave(backend->server(), master_host))
|
if (backend->can_connect() && valid_for_slave(backend->server(), master_host))
|
||||||
{
|
{
|
||||||
@ -369,19 +368,22 @@ bool select_connect_backend_servers(int router_nservers,
|
|||||||
ss_dassert(slaves_connected < max_nslaves || max_nslaves == 0);
|
ss_dassert(slaves_connected < max_nslaves || max_nslaves == 0);
|
||||||
|
|
||||||
/** Connect to all possible slaves */
|
/** Connect to all possible slaves */
|
||||||
for (SRWBackend backend(get_slave_candidate(rses, master_host, cmpfun));
|
for (SRWBackend backend(get_slave_candidate(backends, master_host, cmpfun));
|
||||||
backend && slaves_connected < max_nslaves;
|
backend && slaves_connected < max_nslaves;
|
||||||
backend = get_slave_candidate(rses, master_host, cmpfun))
|
backend = get_slave_candidate(backends, master_host, cmpfun))
|
||||||
{
|
{
|
||||||
if (backend->can_connect() && backend->connect(session))
|
if (backend->can_connect() && backend->connect(session))
|
||||||
{
|
{
|
||||||
if (rses->sescmd_list.size())
|
if (sescmd_list && sescmd_list->size())
|
||||||
{
|
{
|
||||||
backend->append_session_command(rses->sescmd_list);
|
backend->append_session_command(*sescmd_list);
|
||||||
|
|
||||||
if (backend->execute_session_command())
|
if (backend->execute_session_command())
|
||||||
{
|
{
|
||||||
rses->expected_responses++;
|
if (expected_responses)
|
||||||
|
{
|
||||||
|
(*expected_responses)++;
|
||||||
|
}
|
||||||
slaves_connected++;
|
slaves_connected++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,10 +407,9 @@ bool select_connect_backend_servers(int router_nservers,
|
|||||||
"of %d of them.", slaves_connected, slaves_found);
|
"of %d of them.", slaves_connected, slaves_found);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
for (SRWBackendList::const_iterator it = backends.begin(); it != backends.end(); it++)
|
||||||
it != rses->backends.end(); it++)
|
|
||||||
{
|
{
|
||||||
SRWBackend& backend = *it;
|
const SRWBackend& backend = *it;
|
||||||
if (backend->in_use())
|
if (backend->in_use())
|
||||||
{
|
{
|
||||||
MXS_INFO("Selected %s in \t%s", STRSRVSTATUS(backend->server()),
|
MXS_INFO("Selected %s in \t%s", STRSRVSTATUS(backend->server()),
|
||||||
@ -422,7 +423,7 @@ bool select_connect_backend_servers(int router_nservers,
|
|||||||
MXS_ERROR("Couldn't establish required amount of slave connections for "
|
MXS_ERROR("Couldn't establish required amount of slave connections for "
|
||||||
"router session. Would need between %d and %d slaves but only have %d.",
|
"router session. Would need between %d and %d slaves but only have %d.",
|
||||||
min_nslaves, max_nslaves, slaves_connected);
|
min_nslaves, max_nslaves, slaves_connected);
|
||||||
close_all_connections(rses);
|
close_all_connections(backends);
|
||||||
}
|
}
|
||||||
|
|
||||||
return succp;
|
return succp;
|
||||||
|
|||||||
@ -73,7 +73,16 @@ class RWSplitSession
|
|||||||
RWSplitSession& operator=(const RWSplitSession&);
|
RWSplitSession& operator=(const RWSplitSession&);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
RWSplitSession(RWSplit* instance, MXS_SESSION* session);
|
|
||||||
|
/**
|
||||||
|
* Create a new router session
|
||||||
|
*
|
||||||
|
* @param instance Router instance
|
||||||
|
* @param session The session object
|
||||||
|
*
|
||||||
|
* @return New router session
|
||||||
|
*/
|
||||||
|
static RWSplitSession* create(RWSplit* router, MXS_SESSION* session);
|
||||||
|
|
||||||
// TODO: Make member variables private
|
// TODO: Make member variables private
|
||||||
skygw_chk_t rses_chk_top;
|
skygw_chk_t rses_chk_top;
|
||||||
@ -100,6 +109,10 @@ public:
|
|||||||
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
|
ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */
|
||||||
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
|
||||||
skygw_chk_t rses_chk_tail;
|
skygw_chk_t rses_chk_tail;
|
||||||
|
|
||||||
|
private:
|
||||||
|
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
|
const SRWBackendList& backends, const SRWBackend& master);
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Struct for holding routing related information */
|
/** Struct for holding routing related information */
|
||||||
|
|||||||
Reference in New Issue
Block a user