diff --git a/Documentation/Routers/ReadWriteSplit.md b/Documentation/Routers/ReadWriteSplit.md index f24dc0601..5ede6413b 100644 --- a/Documentation/Routers/ReadWriteSplit.md +++ b/Documentation/Routers/ReadWriteSplit.md @@ -68,7 +68,7 @@ master_failure_mode=fail_on_write ### `connection_keepalive` Send keepalive pings to backend servers. This feature was introduced in MaxScale -2.1.2 and is disabled by default. +2.2.0 and is disabled by default. The parameter value is the interval in seconds between each keepalive ping. A keepalive ping will be sent to a backend server if the connection is idle and it diff --git a/server/core/poll.cc b/server/core/poll.cc index 38918a525..92930e688 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -595,10 +595,6 @@ poll_resolve_error(DCB *dcb, int errornum, bool adding) return -1; } -#define BLOCKINGPOLL 0 /*< Set BLOCKING POLL to 1 if using a single thread and to make - * debugging easier. - */ - /** * The main polling loop * @@ -665,10 +661,6 @@ poll_waitevents(void *arg) while (1) { atomic_add(&n_waiting, 1); -#if BLOCKINGPOLL - nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); - atomic_add(&n_waiting, -1); -#else /* BLOCKINGPOLL */ #if MUTEX_EPOLL simple_mutex_lock(&epoll_wait_mutex, TRUE); #endif @@ -723,7 +715,6 @@ poll_waitevents(void *arg) #if MUTEX_EPOLL simple_mutex_unlock(&epoll_wait_mutex); #endif -#endif /* BLOCKINGPOLL */ if (nfds > 0) { ts_stats_set(pollStats.evq_length, nfds, thread_id); diff --git a/server/modules/routing/hintrouter/dcb.cc b/server/modules/routing/hintrouter/dcb.cc index 04b44c1c1..76b0b22b2 100644 --- a/server/modules/routing/hintrouter/dcb.cc +++ b/server/modules/routing/hintrouter/dcb.cc @@ -15,47 +15,34 @@ #include #include - Dcb::Dcb(DCB* pDcb) - : m_pDcb(pDcb) - , m_pRefs(NULL) + : m_sInner() { - try + // A null value for m_pDcb is allowed as a special non-existing dcb + if (pDcb) { - m_pRefs = new int (1); - } - catch (const std::exception&) - { - dcb_close(pDcb); - throw; + try + { + m_sInner = SDCB(pDcb, Dcb::deleter); + } + catch (const std::exception&) + { + dcb_close(pDcb); + throw; + } } } -Dcb::Dcb(const Dcb& rhs) - : m_pDcb(rhs.m_pDcb) - , m_pRefs(rhs.m_pRefs) +void Dcb::deleter(DCB* dcb) { - ++(*m_pRefs); -} - -Dcb& Dcb::operator = (Dcb rhs) -{ - swap(rhs); - return *this; -} - -void Dcb::dec() -{ - ss_dassert(*m_pRefs > 0); - - if (--(*m_pRefs) == 0) + if (dcb) { HR_DEBUG("CLOSING dcb"); // TODO: You should not need to manually adjust any // TODO: connections number, dcb_close should handle that. - SERVER_REF* pSref = m_pDcb->service->dbref; + SERVER_REF* pSref = dcb->service->dbref; - while (pSref && (pSref->server != m_pDcb->server)) + while (pSref && (pSref->server != dcb->server)) { pSref = pSref->next; } @@ -64,9 +51,6 @@ void Dcb::dec() { atomic_add(&pSref->connections, -1); } - - dcb_close(m_pDcb); - - delete m_pRefs; + dcb_close(dcb); } } diff --git a/server/modules/routing/hintrouter/dcb.hh b/server/modules/routing/hintrouter/dcb.hh index 038a8b0f4..fbb462449 100644 --- a/server/modules/routing/hintrouter/dcb.hh +++ b/server/modules/routing/hintrouter/dcb.hh @@ -13,53 +13,46 @@ */ #include "hintrouterdefs.hh" -#include + +#include + #include class Dcb { public: - explicit Dcb(DCB* pDcb); - Dcb(const Dcb& rhs); - ~Dcb() - { - dec(); - } + typedef std::tr1::shared_ptr SDCB; - Dcb& operator = (Dcb rhs); + explicit Dcb(DCB* pDcb); + + Dcb(const Dcb& rhs) + : m_sInner(rhs.m_sInner) + {}; + + Dcb& operator = (Dcb rhs) + { + m_sInner.swap(rhs.m_sInner); + return *this; + } struct server* server() const { - return m_pDcb->server; + return (this->m_sInner.get()) ? m_sInner.get()->server : NULL; } DCB* get() const { - return m_pDcb; + return m_sInner.get(); } - bool write(GWBUF* pPacket) + bool write(GWBUF* pPacket) const { - ss_dassert(m_pDcb); - return m_pDcb->func.write(m_pDcb, pPacket) == 1; + ss_dassert(m_sInner.get()); + return m_sInner.get()->func.write(m_sInner.get(), pPacket) == 1; } private: - void inc() - { - ++(*m_pRefs); - } - - void dec(); - - void swap(Dcb& rhs) - { - std::swap(m_pDcb, rhs.m_pDcb); - std::swap(m_pRefs, rhs.m_pRefs); - } - -private: - DCB* m_pDcb; - int* m_pRefs; + static void deleter(DCB* dcb); + SDCB m_sInner; }; diff --git a/server/modules/routing/hintrouter/hintrouter.cc b/server/modules/routing/hintrouter/hintrouter.cc index 7d71c1304..265d57cba 100644 --- a/server/modules/routing/hintrouter/hintrouter.cc +++ b/server/modules/routing/hintrouter/hintrouter.cc @@ -13,79 +13,190 @@ #define MXS_MODULE_NAME "hintrouter" #include "hintrouter.hh" + +#include +#include + #include #include "dcb.hh" +static const MXS_ENUM_VALUE default_action_values[] = +{ + {"master", HINT_ROUTE_TO_MASTER}, + {"slave", HINT_ROUTE_TO_SLAVE}, + {"named", HINT_ROUTE_TO_NAMED_SERVER}, + {"all", HINT_ROUTE_TO_ALL}, + {NULL} /* Last must be NULL */ +}; +static const char DEFAULT_ACTION[] = "default_action"; +static const char DEFAULT_SERVER[] = "default_server"; +static const char MAX_SLAVES[] = "max_slaves"; -HintRouter::HintRouter(SERVICE* pService) - : maxscale::Router(pService) +HintRouter::HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server, + int max_slaves) + : maxscale::Router(pService), + m_routed_to_master(0), + m_routed_to_slave(0), + m_routed_to_named(0), + m_routed_to_all(0), + m_default_action(default_action), + m_default_server(default_server), + m_max_slaves(max_slaves), + m_total_slave_conns(0) { HR_ENTRY(); + if (m_max_slaves < 0) + { + // set a reasonable default value + m_max_slaves = pService->n_dbref - 1; + } MXS_NOTICE("Hint router [%s] created.", pService->name); } - //static HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions) { HR_ENTRY(); - return new HintRouter(pService); -} + MXS_CONFIG_PARAMETER* params = pService->svc_config_param; + HINT_TYPE default_action = (HINT_TYPE)config_get_enum(params, DEFAULT_ACTION, + default_action_values); + string default_server(config_get_string(params, DEFAULT_SERVER)); + int max_slaves = config_get_integer(params, MAX_SLAVES); + return new HintRouter(pService, default_action, default_server, max_slaves); +} HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession) { + typedef HintRouterSession::RefArray::size_type array_index; HR_ENTRY(); - HintRouterSession* pRouterSession = NULL; + Dcb master_Dcb(NULL); + HintRouterSession::BackendMap all_backends; + all_backends.rehash(1 + m_max_slaves); + HintRouterSession::BackendArray slave_arr; + slave_arr.reserve(m_max_slaves); - HintRouterSession::Backends backends; + SERVER_REF* master_ref = NULL; + HintRouterSession::RefArray slave_refs; + slave_refs.reserve(m_max_slaves); - for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next) + /* Go through the server references, find master and slaves */ + for (SERVER_REF* pSref = pSession->service->dbref; pSref; pSref = pSref->next) { if (SERVER_REF_IS_ACTIVE(pSref)) { - SERVER* pServer = pSref->server; - - HR_DEBUG("Connecting to %s.", pServer->name); - - DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol); - - if (pDcb) + if (SERVER_IS_MASTER(pSref->server)) { - HR_DEBUG("Connected to %p %s.", pDcb, pServer->name); - // TODO: What's done here, should be done by dcb_connect(). - atomic_add(&pSref->connections, 1); - pDcb->service = pSession->service; - - backends.push_back(Dcb(pDcb)); + if (!master_ref) + { + master_ref = pSref; + } + else + { + MXS_WARNING("Found multiple master servers when creating session.\n"); + } } - else + else if (SERVER_IS_SLAVE(pSref->server)) { - HR_DEBUG("Failed to connect to %s.", pServer->name); + slave_refs.push_back(pSref); } } } - if (backends.size() != 0) + if (master_ref) { - pRouterSession = new HintRouterSession(pSession, this, backends); + // Connect to master + HR_DEBUG("Connecting to %s.", master_ref->server->unique_name); + DCB* master_conn = dcb_connect(master_ref->server, pSession, master_ref->server->protocol); + + if (master_conn) + { + HR_DEBUG("Connected."); + atomic_add(&master_ref->connections, 1); + master_conn->service = pSession->service; + + master_Dcb = Dcb(master_conn); + string name(master_conn->server->unique_name); + all_backends.insert(HintRouterSession::MapElement(name, master_Dcb)); + } + else + { + HR_DEBUG("Connection failed."); + } } - return pRouterSession; + /* Different sessions may use different slaves if the 'max_session_slaves'- + * setting is low enough. First, set maximal looping limits noting that the + * array is treated as a ring. Also, array size may have changed since last + * time it was formed. */ + if (slave_refs.size()) + { + array_index size = slave_refs.size(); + array_index begin = m_total_slave_conns % size; + array_index limit = begin + size; + + int slave_conns = 0; + array_index current = begin; + for (; + (slave_conns < m_max_slaves) && current != limit; + current++) + { + SERVER_REF* slave_ref = slave_refs.at(current % size); + // Connect to a slave + HR_DEBUG("Connecting to %s.", slave_ref->server->unique_name); + DCB* slave_conn = dcb_connect(slave_ref->server, pSession, slave_ref->server->protocol); + + if (slave_conn) + { + HR_DEBUG("Connected."); + atomic_add(&slave_ref->connections, 1); + slave_conn->service = pSession->service; + Dcb slave_Dcb(slave_conn); + slave_arr.push_back(slave_Dcb); + + string name(slave_conn->server->unique_name); + all_backends.insert(HintRouterSession::MapElement(name, slave_Dcb)); + slave_conns++; + } + else + { + HR_DEBUG("Connection failed."); + } + } + m_total_slave_conns += slave_conns; + } + if (all_backends.size() != 0) + { + return new HintRouterSession(pSession, this, all_backends); + } + return NULL; } void HintRouter::diagnostics(DCB* pOut) { HR_ENTRY(); + for (int i = 0; default_action_values[i].name; i++) + { + if (default_action_values[i].enum_value == m_default_action) + { + dcb_printf(pOut, "\tDefault action: route to %s\n", default_action_values[i].name); + } + } + dcb_printf(pOut, "\tDefault server: %s\n", m_default_server.c_str()); + dcb_printf(pOut, "\tMaximum slave connections/session: %d\n", m_max_slaves); + dcb_printf(pOut, "\tTotal cumulative slave connections: %d\n", m_total_slave_conns); + dcb_printf(pOut, "\tQueries routed to master: %d\n", m_routed_to_master); + dcb_printf(pOut, "\tQueries routed to single slave: %d\n", m_routed_to_slave); + dcb_printf(pOut, "\tQueries routed to named server: %d\n", m_routed_to_named); + dcb_printf(pOut, "\tQueries routed to all servers: %d\n", m_routed_to_all); } uint64_t HintRouter::getCapabilities() { HR_ENTRY(); - return RCAP_TYPE_NONE; + return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT; } - extern "C" MXS_MODULE* MXS_CREATE_MODULE() { static MXS_MODULE module = @@ -95,16 +206,24 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() MXS_ROUTER_VERSION, /* Implemented module API version */ "A hint router", /* Description */ "V1.0.0", /* Module version */ - RCAP_TYPE_STMT_OUTPUT, + RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT, &HintRouter::s_object, NULL, /* Process init, can be null */ NULL, /* Process finish, can be null */ NULL, /* Thread init */ NULL, /* Thread finish */ { + { + DEFAULT_ACTION, + MXS_MODULE_PARAM_ENUM, + default_action_values[0].name, + MXS_MODULE_OPT_NONE, + default_action_values + }, + {DEFAULT_SERVER, MXS_MODULE_PARAM_SERVER, ""}, + {MAX_SLAVES, MXS_MODULE_PARAM_INT, "-1"}, {MXS_END_MODULE_PARAMS} } }; - return &module; } diff --git a/server/modules/routing/hintrouter/hintrouter.hh b/server/modules/routing/hintrouter/hintrouter.hh index f49885328..2048113ce 100644 --- a/server/modules/routing/hintrouter/hintrouter.hh +++ b/server/modules/routing/hintrouter/hintrouter.hh @@ -13,6 +13,7 @@ */ #include "hintrouterdefs.hh" + #include #include "hintroutersession.hh" @@ -20,16 +21,30 @@ class HintRouter : public maxscale::Router { public: static HintRouter* create(SERVICE* pService, char** pzOptions); - HintRouterSession* newSession(MXS_SESSION *pSession); - void diagnostics(DCB* pOut); - uint64_t getCapabilities(); - + HINT_TYPE get_default_action() const + { + return m_default_action; + }; + const string& get_default_server() const + { + return m_default_server; + }; + /* Simple, approximate statistics */ + volatile unsigned int m_routed_to_master; + volatile unsigned int m_routed_to_slave; + volatile unsigned int m_routed_to_named; + volatile unsigned int m_routed_to_all; private: - HintRouter(SERVICE* pService); + HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server, + int max_slaves); + HINT_TYPE m_default_action; + string m_default_server; + int m_max_slaves; + volatile int m_total_slave_conns; private: HintRouter(const HintRouter&); HintRouter& operator = (const HintRouter&); diff --git a/server/modules/routing/hintrouter/hintrouterdefs.hh b/server/modules/routing/hintrouter/hintrouterdefs.hh index edb075cd7..b435ad799 100644 --- a/server/modules/routing/hintrouter/hintrouterdefs.hh +++ b/server/modules/routing/hintrouter/hintrouterdefs.hh @@ -13,10 +13,16 @@ */ #include + +#include #include +#if defined(SS_DEBUG) #define DEBUG_HINTROUTER //#undef DEBUG_HINTROUTER +#else +#undef DEBUG_HINTROUTER +#endif #ifdef DEBUG_HINTROUTER #define HR_DEBUG(msg, ...) MXS_NOTICE(msg, ##__VA_ARGS__) diff --git a/server/modules/routing/hintrouter/hintroutersession.cc b/server/modules/routing/hintrouter/hintroutersession.cc index 2f5dc2ae4..18ad85af1 100644 --- a/server/modules/routing/hintrouter/hintroutersession.cc +++ b/server/modules/routing/hintrouter/hintroutersession.cc @@ -13,19 +13,58 @@ #define MXS_MODULE_NAME "hintrouter" #include "hintroutersession.hh" + #include #include +#include +#include "hintrouter.hh" + +namespace +{ +/** + * Writer is a function object that writes a clone of a provided GWBUF, + * to each dcb it is called with. + */ +class Writer : std::unary_function +{ +public: + Writer(GWBUF* pPacket) + : m_pPacket(pPacket) + {} + + bool operator()(HintRouterSession::MapElement& elem) + { + bool rv = false; + Dcb& dcb = elem.second; + GWBUF* pPacket = gwbuf_clone(m_pPacket); + + if (pPacket) + { + SERVER* pServer = dcb.server(); + HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->unique_name : "(null)"); + rv = dcb.write(pPacket); + } + return rv; + } + +private: + GWBUF* m_pPacket; +}; +} HintRouterSession::HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter, - const Backends& backends) + const BackendMap& backends) : maxscale::RouterSession(pSession) - , m_pRouter(pRouter) + , m_router(pRouter) , m_backends(backends) + , m_master(NULL) + , m_n_routed_to_slave(0) , m_surplus_replies(0) { HR_ENTRY(); + update_connections(); } @@ -38,174 +77,55 @@ HintRouterSession::~HintRouterSession() void HintRouterSession::close() { HR_ENTRY(); + m_master = Dcb(NULL); + m_slaves.clear(); m_backends.clear(); } -namespace -{ - -/** - * Writer is a function object that writes a clone of a provided GWBUF, - * to each dcb it is called with. - */ -class Writer : std::unary_function -{ -public: - Writer(GWBUF* pPacket) - : m_pPacket(pPacket) - { - } - - bool operator()(Dcb& dcb) - { - bool rv = false; - - GWBUF* pPacket = gwbuf_clone(m_pPacket); - - if (pPacket) - { - SERVER* pServer = dcb.server(); - HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->name : "(null)"); - - rv = dcb.write(pPacket); - } - - return rv; - } - -private: - GWBUF* m_pPacket; -}; - - -/** - * HintMatcher is a function object that when invoked with a dcb, checks - * whether the dcb matches the hint(s) that was given when the HintMatcher - * was created. - */ -class HintMatcher : std::unary_function -{ -public: - HintMatcher(HINT* pHint) - : m_pHint(pHint) - { - } - - bool operator()(const Dcb& dcb) - { - bool match = false; - - SERVER* pServer = dcb.server(); - - if (pServer) - { - HINT* pHint = m_pHint; - - while (!match && pHint) - { - switch (pHint->type) - { - case HINT_ROUTE_TO_MASTER: - if (SERVER_IS_MASTER(pServer)) - { - match = true; - } - break; - - case HINT_ROUTE_TO_SLAVE: - if (SERVER_IS_SLAVE(pServer)) - { - match = true; - } - break; - - case HINT_ROUTE_TO_NAMED_SERVER: - { - const char* zName = static_cast(pHint->data); - - if (strcmp(zName, pServer->name) == 0) - { - match = true; - } - } - break; - - case HINT_ROUTE_TO_UPTODATE_SERVER: - case HINT_ROUTE_TO_ALL: - case HINT_PARAMETER: - MXS_ERROR("HINT not handled."); - ss_dassert(false); - break; - } - - pHint = pHint->next; - } - } - - return match; - } - -private: - HINT* m_pHint; -}; - -} - int32_t HintRouterSession::routeQuery(GWBUF* pPacket) { HR_ENTRY(); - int32_t continue_routing = 1; + bool success = false; if (pPacket->hint) { - // At least one hint => look for match. + /* At least one hint => look for match. Only use the later hints if the + * first is unsuccessful. */ + HINT* current_hint = pPacket->hint; HR_DEBUG("Hint, looking for match."); - - Backends::iterator i = std::find_if(m_backends.begin(), - m_backends.end(), - HintMatcher(pPacket->hint)); - - if (i != m_backends.end()) + while (!success && current_hint) { - Dcb& dcb = *i; - HR_DEBUG("Writing packet to %s.", dcb.server()->name); - - dcb.write(pPacket); - - // Rotate dcbs so that we get round robin as far as the slaves are concerned. - m_backends.push_front(m_backends.back()); // Push last to first - m_backends.pop_back(); // Remove last element - - m_surplus_replies = 0; - } - else - { - MXS_ERROR("No backend to write to."); - continue_routing = 0; + success = route_by_hint(pPacket, current_hint, false); + if (!success) + { + current_hint = current_hint->next; + } } } - else + + if (!success) { - // No hint => all. - HR_DEBUG("No hints, writing to all."); - - size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket)); - gwbuf_free(pPacket); - - if (n_writes != 0) + HR_DEBUG("No hints or hint-based routing failed, falling back to default action."); + HINT default_hint = {}; + default_hint.type = m_router->get_default_action(); + if (default_hint.type == HINT_ROUTE_TO_NAMED_SERVER) { - m_surplus_replies = n_writes - 1; + default_hint.data = MXS_STRDUP(m_router->get_default_server().c_str()); + // Ignore allocation error, it will just result in an error later on } - else + success = route_by_hint(pPacket, &default_hint, true); + if (default_hint.type == HINT_ROUTE_TO_NAMED_SERVER) { - MXS_ERROR("Nothing could be written, terminating session."); - - continue_routing = 0; + MXS_FREE(default_hint.data); } } - return continue_routing; + if (!success) + { + gwbuf_free(pPacket); + } + return success; } @@ -217,20 +137,19 @@ void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend) if (m_surplus_replies == 0) { - HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)"); + HR_DEBUG("Returning packet from %s.", pServer ? pServer->unique_name : "(null)"); MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket); } else { - HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)"); + HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->unique_name : "(null)"); --m_surplus_replies; gwbuf_free(pPacket); } } - void HintRouterSession::handleError(GWBUF* pMessage, DCB* pProblem, mxs_error_action_t action, @@ -274,3 +193,217 @@ void HintRouterSession::handleError(GWBUF* pMessage, *pSuccess = false; } } + +bool HintRouterSession::route_by_hint(GWBUF* pPacket, HINT* hint, bool print_errors) +{ + bool success = false; + switch (hint->type) + { + case HINT_ROUTE_TO_MASTER: + { + bool master_ok = false; + // The master server should be already known, but may have changed + if (m_master.get() && SERVER_IS_MASTER(m_master.server())) + { + master_ok = true; + } + else + { + update_connections(); + if (m_master.get()) + { + master_ok = true; + } + } + + if (master_ok) + { + HR_DEBUG("Writing packet to master: '%s'.", m_master.server()->unique_name); + success = m_master.write(pPacket); + if (success) + { + m_router->m_routed_to_master++; + } + else + { + HR_DEBUG("Write to master failed."); + } + } + else if (print_errors) + { + MXS_ERROR("Hint suggests routing to master when no master connected."); + } + } + break; + + case HINT_ROUTE_TO_SLAVE: + success = route_to_slave(pPacket, print_errors); + break; + + case HINT_ROUTE_TO_NAMED_SERVER: + { + string backend_name((hint->data) ? (const char*)(hint->data) : ""); + BackendMap::const_iterator iter = m_backends.find(backend_name); + if (iter != m_backends.end()) + { + HR_DEBUG("Writing packet to %s.", iter->second.server()->unique_name); + success = iter->second.write(pPacket); + if (success) + { + m_router->m_routed_to_named++; + } + else + { + HR_DEBUG("Write failed."); + } + } + else if (print_errors) + { + /* This shouldn't be possible with current setup as server names are + * checked on startup. With a different filter and the 'print_errors' + * on for the first call this is possible. */ + MXS_ERROR("Hint suggests routing to backend '%s' when no such backend connected.", + backend_name.c_str()); + } + } + break; + + case HINT_ROUTE_TO_ALL: + { + HR_DEBUG("Writing packet to %lu backends.", m_backends.size()); + BackendMap::size_type n_writes = + std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket)); + if (n_writes != 0) + { + m_surplus_replies = n_writes - 1; + } + BackendMap::size_type size = m_backends.size(); + success = (n_writes == size); + if (success) + { + gwbuf_free(pPacket); + m_router->m_routed_to_all++; + } + else + { + HR_DEBUG("Write to all failed."); + if (print_errors) + { + MXS_ERROR("Write failed for '%lu' out of '%lu' backends.", + (size - n_writes), size); + } + } + } + break; + + default: + MXS_ERROR("Unsupported hint type '%d'", hint->type); + break; + } + return success; +} + +bool HintRouterSession::route_to_slave(GWBUF* pPacket, bool print_errors) +{ + bool success = false; + // Find a valid slave + size_type size = m_slaves.size(); + if (size) + { + size_type begin = m_n_routed_to_slave % size; + size_type limit = begin + size; + for (size_type curr = begin; curr != limit; curr++) + { + Dcb& candidate = m_slaves.at(curr % size); + if (SERVER_IS_SLAVE(candidate.server())) + { + HR_DEBUG("Writing packet to slave: '%s'.", candidate.server()->unique_name); + success = candidate.write(pPacket); + if (success) + { + break; + } + else + { + HR_DEBUG("Write to slave failed."); + } + } + } + } + + /* It is (in theory) possible, that none of the slaves in the slave-array are + * working (or have been promoted to master) and the previous master is now + * a slave. In this situation, re-arranging the dcb:s will help. */ + if (!success) + { + update_connections(); + size = m_slaves.size(); + if (size) + { + size_type begin = m_n_routed_to_slave % size; + size_type limit = begin + size; + for (size_type curr = begin; curr != limit; curr++) + { + Dcb& candidate = m_slaves.at(curr % size); + HR_DEBUG("Writing packet to slave: '%s'.", candidate.server()->unique_name); + success = candidate.write(pPacket); + if (success) + { + break; + } + else + { + HR_DEBUG("Write to slave failed."); + } + } + } + } + + if (success) + { + m_router->m_routed_to_slave++; + m_n_routed_to_slave++; + } + else if (print_errors) + { + if (!size) + { + MXS_ERROR("Hint suggests routing to slave when no slaves found."); + } + else + { + MXS_ERROR("Could not write to any of '%lu' slaves.", size); + } + } + return success; +} + +void HintRouterSession::update_connections() +{ + /* Attempt to rearrange the dcb:s in the session such that the master and + * slave containers are correct again. Do not try to make new connections, + * since those would not have the correct session state anyways. */ + m_master = Dcb(NULL); + m_slaves.clear(); + + for (BackendMap::const_iterator iter = m_backends.begin(); + iter != m_backends.end(); iter++) + { + SERVER* server = iter->second.get()->server; + if (SERVER_IS_MASTER(server)) + { + if (!m_master.get()) + { + m_master = iter->second; + } + else + { + MXS_WARNING("Found multiple master servers when updating connections."); + } + } + else if (SERVER_IS_SLAVE(server)) + { + m_slaves.push_back(iter->second); + } + } +} diff --git a/server/modules/routing/hintrouter/hintroutersession.hh b/server/modules/routing/hintrouter/hintroutersession.hh index d07195a41..003890470 100644 --- a/server/modules/routing/hintrouter/hintroutersession.hh +++ b/server/modules/routing/hintrouter/hintroutersession.hh @@ -13,20 +13,32 @@ */ #include "hintrouterdefs.hh" + #include +#include +#include +#include + #include #include "dcb.hh" +using std::string; + class HintRouter; class HintRouterSession : public maxscale::RouterSession { public: - typedef std::deque Backends; + typedef std::tr1::unordered_map BackendMap; // All backends, indexed by name + typedef std::vector BackendArray; + typedef std::vector RefArray; + typedef BackendMap::value_type MapElement; + typedef BackendArray::size_type size_type; HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter, - const Backends& backends); + const BackendMap& backends + ); ~HintRouterSession(); @@ -42,12 +54,17 @@ public: bool* pSuccess); private: - HintRouterSession(const HintRouterSession&); - HintRouterSession& operator = (const HintRouterSession&); - + HintRouterSession(const HintRouterSession&); // denied + HintRouterSession& operator = (const HintRouterSession&); // denied private: - HintRouter* m_pRouter; - Backends m_backends; - size_t m_surplus_replies; + bool route_by_hint(GWBUF* pPacket, HINT* current_hint, bool ignore_errors); + bool route_to_slave(GWBUF* pPacket, bool print_errors); + void update_connections(); + HintRouter* m_router; + BackendMap m_backends; // all connections + Dcb m_master; // connection to master + BackendArray m_slaves; // connections to slaves + size_type m_n_routed_to_slave; // packets routed to a single slave, used for rr + size_type m_surplus_replies; // how many replies should be ignored };