MXS-2313: Clean up readwritesplit connection creation

The connection creation is now internal to RWSplitSession. This makes the
code more readable by removing the need to pass parameters and allowing
easier reuse of existing functions. The various conditions require to
create connections are now also checked in only one place.
This commit is contained in:
Markus Mäkelä
2019-03-08 14:41:16 +02:00
parent 54a09e93df
commit ba448cb12c
5 changed files with 80 additions and 124 deletions

View File

@ -267,12 +267,7 @@ public:
int max_slave_count() const; int max_slave_count() const;
bool have_enough_servers() const; bool have_enough_servers() const;
bool select_connect_backend_servers(MXS_SESSION* session,
mxs::PRWBackends& backends,
mxs::RWBackend** current_master,
mxs::SessionCommandList* sescmd_list,
int* expected_responses,
connection_type type);
// API functions // API functions
/** /**

View File

@ -67,6 +67,27 @@ void RWSplitSession::handle_connection_keepalive(RWBackend* target)
mxb_assert(nserv < m_nbackends); mxb_assert(nserv < m_nbackends);
} }
bool RWSplitSession::prepare_connection(RWBackend* target)
{
mxb_assert(!target->in_use());
bool rval = target->connect(m_client->session, &m_sescmd_list);
if (rval)
{
MXS_INFO("Connected to '%s'", target->name());
if (target->is_waiting_result())
{
mxb_assert_message(!m_sescmd_list.empty() && target->has_session_commands(),
"Session command list must not be empty and target "
"should have unfinished session commands.");
m_expected_responses++;
}
}
return rval;
}
bool RWSplitSession::prepare_target(RWBackend* target, route_target_t route_target) bool RWSplitSession::prepare_target(RWBackend* target, route_target_t route_target)
{ {
bool rval = true; bool rval = true;
@ -76,16 +97,7 @@ bool RWSplitSession::prepare_target(RWBackend* target, route_target_t route_targ
{ {
mxb_assert(target->can_connect() && can_recover_servers()); mxb_assert(target->can_connect() && can_recover_servers());
mxb_assert(!TARGET_IS_MASTER(route_target) || m_config.master_reconnection); mxb_assert(!TARGET_IS_MASTER(route_target) || m_config.master_reconnection);
rval = target->connect(m_client->session, &m_sescmd_list); rval = prepare_connection(target);
MXS_INFO("Connected to '%s'", target->name());
if (rval && target->is_waiting_result())
{
mxb_assert_message(!m_sescmd_list.empty() && target->has_session_commands(),
"Session command list must not be empty and target "
"should have unfinished session commands.");
m_expected_responses++;
}
} }
return rval; return rval;

View File

@ -12,6 +12,7 @@
*/ */
#include "readwritesplit.hh" #include "readwritesplit.hh"
#include "rwsplitsession.hh"
#include <stdio.h> #include <stdio.h>
#include <strings.h> #include <strings.h>
@ -378,93 +379,71 @@ std::pair<int, int> get_slave_counts(PRWBackends& backends, RWBackend* master)
/** /**
* Select and connect to backend servers * Select and connect to backend servers
* *
* @param inst Router instance
* @param session Client session
* @param backends List of backend servers
* @param current_master The current master server
* @param sescmd_list List of session commands to execute
* @param expected_responses Pointer where number of expected responses are written
* @param type Connection type, ALL for all types, SLAVE for slaves only
*
* @return True if session can continue * @return True if session can continue
*/ */
bool RWSplit::select_connect_backend_servers(MXS_SESSION* session, bool RWSplitSession::open_connections()
mxs::PRWBackends& backends,
mxs::RWBackend** current_master,
SessionCommandList* sescmd_list,
int* expected_responses,
connection_type type)
{ {
const Config& cnf {config()}; if (m_config.lazy_connect)
RWBackend* master = get_root_master(backends, *current_master, cnf.backend_select_fct); {
return true; // No need to create connections
}
if ((!master || !master->can_connect()) && cnf.master_failure_mode == RW_FAIL_INSTANTLY) RWBackend* master = get_root_master(m_raw_backends, m_current_master, m_config.backend_select_fct);
if ((!master || !master->can_connect()) && m_config.master_failure_mode == RW_FAIL_INSTANTLY)
{ {
if (!master) if (!master)
{ {
MXS_ERROR("Couldn't find suitable Master from %lu candidates.", backends.size()); MXS_ERROR("Couldn't find suitable Master from %lu candidates.", m_raw_backends.size());
} }
else else
{ {
MXS_ERROR("Master exists (%s), but it is being drained and cannot be used.", MXS_ERROR("Master exists (%s), but it is being drained and cannot be used.",
master->server()->address); master->server()->address);
} }
return false; return false;
} }
auto select_criteria = cnf.slave_selection_criteria;
if (mxs_log_is_priority_enabled(LOG_INFO)) if (mxs_log_is_priority_enabled(LOG_INFO))
{ {
log_server_connections(select_criteria, backends); log_server_connections(m_config.slave_selection_criteria, m_raw_backends);
} }
if (type == ALL && master && master->connect(session)) if (can_recover_servers())
{
// A master connection can be safely attempted
if (master && !master->in_use() && master->can_connect() && prepare_connection(master))
{ {
MXS_INFO("Selected Master: %s", master->name()); MXS_INFO("Selected Master: %s", master->name());
*current_master = master; m_current_master = master;
}
} }
auto counts = get_slave_counts(backends, master); int n_slaves = get_slave_counts(m_raw_backends, master).second;
int slaves_connected = counts.second; int max_nslaves = m_router->max_slave_count();
int max_nslaves = max_slave_count();
mxb_assert(slaves_connected <= max_nslaves || max_nslaves == 0);
PRWBackends candidates; PRWBackends candidates;
for (auto& pBackend : backends) mxb_assert(n_slaves <= max_nslaves || max_nslaves == 0);
for (auto& pBackend : m_raw_backends)
{ {
if (!pBackend->in_use() if (!pBackend->in_use() && pBackend->can_connect() && valid_for_slave(pBackend, master))
&& pBackend->can_connect()
&& valid_for_slave(pBackend, master))
{ {
candidates.push_back(pBackend); candidates.push_back(pBackend);
} }
} }
while (slaves_connected < max_nslaves && candidates.size()) for (auto ite = m_config.backend_select_fct(candidates);
n_slaves < max_nslaves && !candidates.empty() && ite != candidates.end();
ite = m_config.backend_select_fct(candidates))
{ {
auto ite = m_config->backend_select_fct(candidates); if (prepare_connection(*ite))
if (ite == candidates.end())
{ {
break; MXS_INFO("Selected Slave: %s", (*ite)->name());
++n_slaves;
} }
auto& backend = *ite;
if (backend->connect(session, sescmd_list))
{
MXS_INFO("Selected Slave: %s", backend->name());
if (sescmd_list && sescmd_list->size() && expected_responses)
{
(*expected_responses)++;
}
++slaves_connected;
}
candidates.erase(ite); candidates.erase(ite);
} }
return true; return true;
} }

View File

@ -21,18 +21,14 @@
using namespace maxscale; using namespace maxscale;
RWSplitSession::RWSplitSession(RWSplit* instance, RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRWBackends backends)
MXS_SESSION* session,
const Config& config,
mxs::SRWBackends backends,
mxs::RWBackend* master)
: mxs::RouterSession(session) : mxs::RouterSession(session)
, m_backends(std::move(backends)) , m_backends(std::move(backends))
, m_raw_backends(sptr_vec_to_ptr_vec(m_backends)) , m_raw_backends(sptr_vec_to_ptr_vec(m_backends))
, m_current_master(master) , m_current_master(master)
, m_target_node(nullptr) , m_target_node(nullptr)
, m_prev_target(nullptr) , m_prev_target(nullptr)
, m_config(config) , m_config(instance->config())
, m_last_keepalive_check(mxs_clock()) , m_last_keepalive_check(mxs_clock())
, m_nbackends(instance->service()->n_dbref) , m_nbackends(instance->service()->n_dbref)
, m_client(session->client_dcb) , m_client(session->client_dcb)
@ -58,6 +54,11 @@ RWSplitSession::RWSplitSession(RWSplit* instance,
n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1); n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1);
m_config.max_slave_connections = n_conn; m_config.max_slave_connections = n_conn;
} }
for (auto& b : m_raw_backends)
{
m_server_stats[b->server()].start_session();
}
} }
RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session) RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session)
@ -68,31 +69,16 @@ RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session)
{ {
SRWBackends backends = RWBackend::from_servers(router->service()->dbref); SRWBackends backends = RWBackend::from_servers(router->service()->dbref);
/** if ((rses = new(std::nothrow) RWSplitSession(router, session, std::move(backends))))
* At least the master must be found if the router is in the strict mode.
* If sessions without master are allowed, only a slave must be found.
*/
RWBackend* master = nullptr;
const auto& config = router->config();
auto backend_ptrs = sptr_vec_to_ptr_vec(backends);
if (config.lazy_connect
|| router->select_connect_backend_servers(session,
backend_ptrs,
&master,
NULL,
NULL,
connection_type::ALL))
{ {
if ((rses = new RWSplitSession(router, session, config, std::move(backends), master))) if (rses->open_connections())
{ {
router->stats().n_sessions += 1; router->stats().n_sessions += 1;
} }
else
for (auto& b : backends)
{ {
rses->m_server_stats[b->server()].start_session(); delete rses;
rses = nullptr;
} }
} }
} }
@ -1115,25 +1101,9 @@ bool RWSplitSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg
bool succp = false; bool succp = false;
if (m_config.lazy_connect) if (!can_recover_servers())
{ {
// Lazy connect is enabled, don't care whether we have available servers succp = can_continue_session();
succp = true;
}
/**
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
else if (m_recv_sescmd > 0 && m_config.disable_sescmd_history)
{
for (const auto& a : m_raw_backends)
{
if (a->in_use())
{
succp = true;
break;
}
}
if (!succp) if (!succp)
{ {
@ -1143,13 +1113,8 @@ bool RWSplitSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg
} }
else else
{ {
// Try to replace failed connections
succp = m_router->select_connect_backend_servers(ses, succp = open_connections();
m_raw_backends,
&m_current_master,
&m_sescmd_list,
&m_expected_responses,
connection_type::SLAVE);
} }
return succp; return succp;

View File

@ -131,12 +131,9 @@ public:
} }
private: private:
RWSplitSession(RWSplit* instance, RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRWBackends backends);
MXS_SESSION* session,
const Config& config,
mxs::SRWBackends backends,
mxs::RWBackend* master);
bool open_connections();
void process_sescmd_response(mxs::RWBackend* backend, GWBUF** ppPacket); void process_sescmd_response(mxs::RWBackend* backend, GWBUF** ppPacket);
void compress_history(mxs::SSessionCommand& sescmd); void compress_history(mxs::SSessionCommand& sescmd);
@ -163,6 +160,7 @@ private:
bool handle_got_target(GWBUF* querybuf, mxs::RWBackend* target, bool store); bool handle_got_target(GWBUF* querybuf, mxs::RWBackend* target, bool store);
void handle_connection_keepalive(mxs::RWBackend* target); void handle_connection_keepalive(mxs::RWBackend* target);
bool prepare_target(mxs::RWBackend* target, route_target_t route_target); bool prepare_target(mxs::RWBackend* target, route_target_t route_target);
bool prepare_connection(mxs::RWBackend* target);
bool create_one_connection(); bool create_one_connection();
void retry_query(GWBUF* querybuf, int delay = 1); void retry_query(GWBUF* querybuf, int delay = 1);
@ -248,6 +246,13 @@ private:
return !m_config.disable_sescmd_history || m_recv_sescmd == 0; return !m_config.disable_sescmd_history || m_recv_sescmd == 0;
} }
inline bool can_continue_session() const
{
return std::any_of(m_raw_backends.begin(), m_raw_backends.end(), [](mxs::RWBackend* b) {
return b->in_use();
});
}
inline bool is_large_query(GWBUF* buf) inline bool is_large_query(GWBUF* buf)
{ {
uint32_t buflen = gwbuf_length(buf); uint32_t buflen = gwbuf_length(buf);