diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 0a37d04d5..db345687c 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -267,12 +267,7 @@ public: int max_slave_count() const; bool have_enough_servers() const; - bool select_connect_backend_servers(MXS_SESSION* session, - mxs::PRWBackends& backends, - mxs::RWBackend** current_master, - mxs::SessionCommandList* sescmd_list, - int* expected_responses, - connection_type type); + // API functions /** diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index a03f2f5f1..19e6f4f69 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -67,6 +67,27 @@ void RWSplitSession::handle_connection_keepalive(RWBackend* target) mxb_assert(nserv < m_nbackends); } +bool RWSplitSession::prepare_connection(RWBackend* target) +{ + mxb_assert(!target->in_use()); + bool rval = target->connect(m_client->session, &m_sescmd_list); + + if (rval) + { + MXS_INFO("Connected to '%s'", target->name()); + + if (target->is_waiting_result()) + { + mxb_assert_message(!m_sescmd_list.empty() && target->has_session_commands(), + "Session command list must not be empty and target " + "should have unfinished session commands."); + m_expected_responses++; + } + } + + return rval; +} + bool RWSplitSession::prepare_target(RWBackend* target, route_target_t route_target) { bool rval = true; @@ -76,16 +97,7 @@ bool RWSplitSession::prepare_target(RWBackend* target, route_target_t route_targ { mxb_assert(target->can_connect() && can_recover_servers()); mxb_assert(!TARGET_IS_MASTER(route_target) || m_config.master_reconnection); - rval = target->connect(m_client->session, &m_sescmd_list); - MXS_INFO("Connected to '%s'", target->name()); - - if (rval && target->is_waiting_result()) - { - mxb_assert_message(!m_sescmd_list.empty() && target->has_session_commands(), - "Session command list must not be empty and target " - "should have unfinished session commands."); - m_expected_responses++; - } + rval = prepare_connection(target); } return rval; diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index 4f1e2e3c3..af9d14f23 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -12,6 +12,7 @@ */ #include "readwritesplit.hh" +#include "rwsplitsession.hh" #include #include @@ -378,93 +379,71 @@ std::pair get_slave_counts(PRWBackends& backends, RWBackend* master) /** * Select and connect to backend servers * - * @param inst Router instance - * @param session Client session - * @param backends List of backend servers - * @param current_master The current master server - * @param sescmd_list List of session commands to execute - * @param expected_responses Pointer where number of expected responses are written - * @param type Connection type, ALL for all types, SLAVE for slaves only - * * @return True if session can continue */ -bool RWSplit::select_connect_backend_servers(MXS_SESSION* session, - mxs::PRWBackends& backends, - mxs::RWBackend** current_master, - SessionCommandList* sescmd_list, - int* expected_responses, - connection_type type) +bool RWSplitSession::open_connections() { - const Config& cnf {config()}; - RWBackend* master = get_root_master(backends, *current_master, cnf.backend_select_fct); + if (m_config.lazy_connect) + { + return true; // No need to create connections + } - if ((!master || !master->can_connect()) && cnf.master_failure_mode == RW_FAIL_INSTANTLY) + RWBackend* master = get_root_master(m_raw_backends, m_current_master, m_config.backend_select_fct); + + if ((!master || !master->can_connect()) && m_config.master_failure_mode == RW_FAIL_INSTANTLY) { if (!master) { - MXS_ERROR("Couldn't find suitable Master from %lu candidates.", backends.size()); + MXS_ERROR("Couldn't find suitable Master from %lu candidates.", m_raw_backends.size()); } else { MXS_ERROR("Master exists (%s), but it is being drained and cannot be used.", master->server()->address); } - return false; } - auto select_criteria = cnf.slave_selection_criteria; - if (mxs_log_is_priority_enabled(LOG_INFO)) { - log_server_connections(select_criteria, backends); + log_server_connections(m_config.slave_selection_criteria, m_raw_backends); } - if (type == ALL && master && master->connect(session)) + if (can_recover_servers()) { - MXS_INFO("Selected Master: %s", master->name()); - *current_master = master; + // A master connection can be safely attempted + if (master && !master->in_use() && master->can_connect() && prepare_connection(master)) + { + MXS_INFO("Selected Master: %s", master->name()); + m_current_master = master; + } } - auto counts = get_slave_counts(backends, master); - int slaves_connected = counts.second; - int max_nslaves = max_slave_count(); - - mxb_assert(slaves_connected <= max_nslaves || max_nslaves == 0); - + int n_slaves = get_slave_counts(m_raw_backends, master).second; + int max_nslaves = m_router->max_slave_count(); PRWBackends candidates; - for (auto& pBackend : backends) + mxb_assert(n_slaves <= max_nslaves || max_nslaves == 0); + + for (auto& pBackend : m_raw_backends) { - if (!pBackend->in_use() - && pBackend->can_connect() - && valid_for_slave(pBackend, master)) + if (!pBackend->in_use() && pBackend->can_connect() && valid_for_slave(pBackend, master)) { candidates.push_back(pBackend); } } - while (slaves_connected < max_nslaves && candidates.size()) + for (auto ite = m_config.backend_select_fct(candidates); + n_slaves < max_nslaves && !candidates.empty() && ite != candidates.end(); + ite = m_config.backend_select_fct(candidates)) { - auto ite = m_config->backend_select_fct(candidates); - if (ite == candidates.end()) + if (prepare_connection(*ite)) { - break; + MXS_INFO("Selected Slave: %s", (*ite)->name()); + ++n_slaves; } - auto& backend = *ite; - - if (backend->connect(session, sescmd_list)) - { - MXS_INFO("Selected Slave: %s", backend->name()); - - if (sescmd_list && sescmd_list->size() && expected_responses) - { - (*expected_responses)++; - } - - ++slaves_connected; - } candidates.erase(ite); } + return true; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 2d58d390b..85bf7b669 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -21,18 +21,14 @@ using namespace maxscale; -RWSplitSession::RWSplitSession(RWSplit* instance, - MXS_SESSION* session, - const Config& config, - mxs::SRWBackends backends, - mxs::RWBackend* master) +RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRWBackends backends) : mxs::RouterSession(session) , m_backends(std::move(backends)) , m_raw_backends(sptr_vec_to_ptr_vec(m_backends)) , m_current_master(master) , m_target_node(nullptr) , m_prev_target(nullptr) - , m_config(config) + , m_config(instance->config()) , m_last_keepalive_check(mxs_clock()) , m_nbackends(instance->service()->n_dbref) , m_client(session->client_dcb) @@ -58,6 +54,11 @@ RWSplitSession::RWSplitSession(RWSplit* instance, n_conn = MXS_MAX(floor((double)m_nbackends * pct), 1); m_config.max_slave_connections = n_conn; } + + for (auto& b : m_raw_backends) + { + m_server_stats[b->server()].start_session(); + } } RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session) @@ -68,31 +69,16 @@ RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session) { SRWBackends backends = RWBackend::from_servers(router->service()->dbref); - /** - * At least the master must be found if the router is in the strict mode. - * If sessions without master are allowed, only a slave must be found. - */ - - RWBackend* master = nullptr; - const auto& config = router->config(); - auto backend_ptrs = sptr_vec_to_ptr_vec(backends); - - if (config.lazy_connect - || router->select_connect_backend_servers(session, - backend_ptrs, - &master, - NULL, - NULL, - connection_type::ALL)) + if ((rses = new(std::nothrow) RWSplitSession(router, session, std::move(backends)))) { - if ((rses = new RWSplitSession(router, session, config, std::move(backends), master))) + if (rses->open_connections()) { router->stats().n_sessions += 1; } - - for (auto& b : backends) + else { - rses->m_server_stats[b->server()].start_session(); + delete rses; + rses = nullptr; } } } @@ -1115,25 +1101,9 @@ bool RWSplitSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg bool succp = false; - if (m_config.lazy_connect) + if (!can_recover_servers()) { - // Lazy connect is enabled, don't care whether we have available servers - succp = true; - } - /** - * Try to get replacement slave or at least the minimum - * number of slave connections for router session. - */ - else if (m_recv_sescmd > 0 && m_config.disable_sescmd_history) - { - for (const auto& a : m_raw_backends) - { - if (a->in_use()) - { - succp = true; - break; - } - } + succp = can_continue_session(); if (!succp) { @@ -1143,13 +1113,8 @@ bool RWSplitSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg } else { - - succp = m_router->select_connect_backend_servers(ses, - m_raw_backends, - &m_current_master, - &m_sescmd_list, - &m_expected_responses, - connection_type::SLAVE); + // Try to replace failed connections + succp = open_connections(); } return succp; diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 28d81a589..a3c0aeef3 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -131,12 +131,9 @@ public: } private: - RWSplitSession(RWSplit* instance, - MXS_SESSION* session, - const Config& config, - mxs::SRWBackends backends, - mxs::RWBackend* master); + RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRWBackends backends); + bool open_connections(); void process_sescmd_response(mxs::RWBackend* backend, GWBUF** ppPacket); void compress_history(mxs::SSessionCommand& sescmd); @@ -163,6 +160,7 @@ private: bool handle_got_target(GWBUF* querybuf, mxs::RWBackend* target, bool store); void handle_connection_keepalive(mxs::RWBackend* target); bool prepare_target(mxs::RWBackend* target, route_target_t route_target); + bool prepare_connection(mxs::RWBackend* target); bool create_one_connection(); void retry_query(GWBUF* querybuf, int delay = 1); @@ -248,6 +246,13 @@ private: return !m_config.disable_sescmd_history || m_recv_sescmd == 0; } + inline bool can_continue_session() const + { + return std::any_of(m_raw_backends.begin(), m_raw_backends.end(), [](mxs::RWBackend* b) { + return b->in_use(); + }); + } + inline bool is_large_query(GWBUF* buf) { uint32_t buflen = gwbuf_length(buf);