From 90c249a8b27ad12916ccecdc0ba8ca2e8ba03e4f Mon Sep 17 00:00:00 2001 From: Esa Korhonen Date: Mon, 27 Mar 2017 10:07:51 +0300 Subject: [PATCH] HintRouter: Support various hint types, round-robin rotate slaves Supports hint types: -master -slave -named server -all A default action, which is performed when no hint exists or on error, can be set. The different actions are analogous to the hint types. A maximum connection number for slaves can be set. If more slaves are configured for the service, the filter will rotate slaves for new sessions. Within a session with multiple slaves, the "route_to_slave"-hint will also rotate among the slave backends. --- .../modules/routing/hintrouter/hintrouter.cc | 160 ++++++++--- .../modules/routing/hintrouter/hintrouter.hh | 21 +- .../routing/hintrouter/hintrouterdefs.hh | 2 + .../routing/hintrouter/hintroutersession.cc | 251 ++++++++++++++---- .../routing/hintrouter/hintroutersession.hh | 36 ++- 5 files changed, 377 insertions(+), 93 deletions(-) diff --git a/server/modules/routing/hintrouter/hintrouter.cc b/server/modules/routing/hintrouter/hintrouter.cc index 7d71c1304..8cd4f6fe2 100644 --- a/server/modules/routing/hintrouter/hintrouter.cc +++ b/server/modules/routing/hintrouter/hintrouter.cc @@ -13,65 +13,158 @@ #define MXS_MODULE_NAME "hintrouter" #include "hintrouter.hh" + +#include +#include + #include #include "dcb.hh" +static const MXS_ENUM_VALUE default_action_values[] = +{ + {"master", HINT_ROUTE_TO_MASTER}, + {"slave", HINT_ROUTE_TO_SLAVE}, + {"named_server", HINT_ROUTE_TO_NAMED_SERVER}, + {"route_to_all", HINT_ROUTE_TO_ALL}, + {NULL} /* Last must be NULL */ +}; +static const char DEFAULT_ACTION[] = "default_action"; +static const char DEFAULT_SERVER[] = "default_server"; +static const char MAX_SLAVES[] = "max_slaves"; -HintRouter::HintRouter(SERVICE* pService) - : maxscale::Router(pService) +HintRouter::HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server, + int max_slaves) + : maxscale::Router(pService), + m_default_action(default_action), + m_default_server(default_server), + m_max_slaves(max_slaves), + m_total_slave_conns(0) { HR_ENTRY(); + if (m_max_slaves < 0) + { // set a reasonable default value + m_max_slaves = pService->n_dbref - 1; + } MXS_NOTICE("Hint router [%s] created.", pService->name); } - //static HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions) { HR_ENTRY(); - return new HintRouter(pService); -} + MXS_CONFIG_PARAMETER* params = pService->svc_config_param; + HINT_TYPE default_action = (HINT_TYPE)config_get_enum(params, DEFAULT_ACTION, + default_action_values); + string default_server(config_get_string(params, DEFAULT_SERVER)); + int max_slaves = config_get_integer(params, MAX_SLAVES); + return new HintRouter(pService, default_action, default_server, max_slaves); +} HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession) { + typedef HintRouterSession::RefArray::size_type array_index; HR_ENTRY(); - HintRouterSession* pRouterSession = NULL; + Dcb master_Dcb(NULL); + HintRouterSession::BackendMap all_backends; + all_backends.rehash(1 + m_max_slaves); + HintRouterSession::BackendArray slave_arr; + slave_arr.reserve(m_max_slaves); - HintRouterSession::Backends backends; + SERVER_REF* master_ref = NULL; + HintRouterSession::RefArray slave_refs; + slave_refs.reserve(m_max_slaves); - for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next) + /* Go through the server references, find master and slaves */ + for (SERVER_REF* pSref = pSession->service->dbref; pSref; pSref = pSref->next) { if (SERVER_REF_IS_ACTIVE(pSref)) { - SERVER* pServer = pSref->server; - - HR_DEBUG("Connecting to %s.", pServer->name); - - DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol); - - if (pDcb) + if (SERVER_IS_MASTER(pSref->server)) { - HR_DEBUG("Connected to %p %s.", pDcb, pServer->name); - // TODO: What's done here, should be done by dcb_connect(). - atomic_add(&pSref->connections, 1); - pDcb->service = pSession->service; - - backends.push_back(Dcb(pDcb)); + if (!master_ref) + { + master_ref = pSref; + } + else + { + MXS_WARNING("Found multiple master servers when creating session.\n"); + } } - else + else if (SERVER_IS_SLAVE(pSref->server)) { - HR_DEBUG("Failed to connect to %s.", pServer->name); + slave_refs.push_back(pSref); } } } - if (backends.size() != 0) + if (master_ref) { - pRouterSession = new HintRouterSession(pSession, this, backends); + // Connect to master + HR_DEBUG("Connecting to %s.", master_ref->server->name); + DCB* master_conn = dcb_connect(master_ref->server, pSession, master_ref->server->protocol); + + if (master_conn) + { + HR_DEBUG("Connected."); + atomic_add(&master_ref->connections, 1); + master_conn->service = pSession->service; + + master_Dcb = Dcb(master_conn); + string name(master_conn->server->unique_name); + all_backends.insert(HintRouterSession::MapElement(name, master_Dcb)); + } + else + { + HR_DEBUG("Connection failed."); + } } - return pRouterSession; + /* Different sessions may use different slaves if the 'max_session_slaves'- + * setting is low enough. First, set maximal looping limits noting that the + * array is treated as a ring. Also, array size may have changed since last + * time it was formed. */ + if (slave_refs.size()) + { + array_index size = slave_refs.size(); + array_index begin = m_total_slave_conns % size; + array_index limit = begin + size; + + int slave_conns = 0; + array_index current = begin; + for (; + (slave_conns < m_max_slaves) && current != limit; + current++) + { + SERVER_REF* slave_ref = slave_refs.at(current % size); + // Connect to a slave + HR_DEBUG("Connecting to %s.", slave_ref->server->name); + DCB* slave_conn = dcb_connect(slave_ref->server, pSession, slave_ref->server->protocol); + + if (slave_conn) + { + HR_DEBUG("Connected."); + atomic_add(&slave_ref->connections, 1); + slave_conn->service = pSession->service; + Dcb slave_Dcb(slave_conn); + slave_arr.push_back(slave_Dcb); + + string name(slave_conn->server->unique_name); + all_backends.insert(HintRouterSession::MapElement(name, slave_Dcb)); + slave_conns++; + } + else + { + HR_DEBUG("Connection failed."); + } + } + m_total_slave_conns += slave_conns; + } + if (all_backends.size() != 0) + { + return new HintRouterSession(pSession, this, all_backends); + } + return NULL; } void HintRouter::diagnostics(DCB* pOut) @@ -82,10 +175,9 @@ void HintRouter::diagnostics(DCB* pOut) uint64_t HintRouter::getCapabilities() { HR_ENTRY(); - return RCAP_TYPE_NONE; + return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT; } - extern "C" MXS_MODULE* MXS_CREATE_MODULE() { static MXS_MODULE module = @@ -95,16 +187,24 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() MXS_ROUTER_VERSION, /* Implemented module API version */ "A hint router", /* Description */ "V1.0.0", /* Module version */ - RCAP_TYPE_STMT_OUTPUT, + RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT, &HintRouter::s_object, NULL, /* Process init, can be null */ NULL, /* Process finish, can be null */ NULL, /* Thread init */ NULL, /* Thread finish */ { + { + DEFAULT_ACTION, + MXS_MODULE_PARAM_ENUM, + default_action_values[0].name, + MXS_MODULE_OPT_NONE, + default_action_values + }, + {DEFAULT_SERVER, MXS_MODULE_PARAM_SERVER, ""}, + {MAX_SLAVES, MXS_MODULE_PARAM_INT, "-1"}, {MXS_END_MODULE_PARAMS} } }; - return &module; } diff --git a/server/modules/routing/hintrouter/hintrouter.hh b/server/modules/routing/hintrouter/hintrouter.hh index f49885328..fe8f3c1e1 100644 --- a/server/modules/routing/hintrouter/hintrouter.hh +++ b/server/modules/routing/hintrouter/hintrouter.hh @@ -13,6 +13,7 @@ */ #include "hintrouterdefs.hh" + #include #include "hintroutersession.hh" @@ -20,15 +21,25 @@ class HintRouter : public maxscale::Router { public: static HintRouter* create(SERVICE* pService, char** pzOptions); - HintRouterSession* newSession(MXS_SESSION *pSession); - void diagnostics(DCB* pOut); - uint64_t getCapabilities(); - + HINT_TYPE get_default_action() const + { + return m_default_action; + }; + const string& get_default_server() const + { + return m_default_server; + }; private: - HintRouter(SERVICE* pService); + HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server, + int max_slaves); + + HINT_TYPE m_default_action; + string m_default_server; + int m_max_slaves; + volatile int m_total_slave_conns; private: HintRouter(const HintRouter&); diff --git a/server/modules/routing/hintrouter/hintrouterdefs.hh b/server/modules/routing/hintrouter/hintrouterdefs.hh index edb075cd7..cdbef06ab 100644 --- a/server/modules/routing/hintrouter/hintrouterdefs.hh +++ b/server/modules/routing/hintrouter/hintrouterdefs.hh @@ -13,6 +13,8 @@ */ #include + +#include #include #define DEBUG_HINTROUTER diff --git a/server/modules/routing/hintrouter/hintroutersession.cc b/server/modules/routing/hintrouter/hintroutersession.cc index 2f5dc2ae4..72f9e28f9 100644 --- a/server/modules/routing/hintrouter/hintroutersession.cc +++ b/server/modules/routing/hintrouter/hintroutersession.cc @@ -13,19 +13,25 @@ #define MXS_MODULE_NAME "hintrouter" #include "hintroutersession.hh" + #include #include +#include +#include "hintrouter.hh" HintRouterSession::HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter, - const Backends& backends) + const BackendMap& backends) : maxscale::RouterSession(pSession) - , m_pRouter(pRouter) + , m_router(pRouter) , m_backends(backends) + , m_master(NULL) + , m_n_routed_to_slave(0) , m_surplus_replies(0) { HR_ENTRY(); + update_connections(); } @@ -38,6 +44,8 @@ HintRouterSession::~HintRouterSession() void HintRouterSession::close() { HR_ENTRY(); + m_master = Dcb(NULL); + m_slaves.clear(); m_backends.clear(); } @@ -48,18 +56,17 @@ namespace * Writer is a function object that writes a clone of a provided GWBUF, * to each dcb it is called with. */ -class Writer : std::unary_function +class Writer : std::unary_function { public: Writer(GWBUF* pPacket) : m_pPacket(pPacket) - { - } + {} - bool operator()(Dcb& dcb) + bool operator()(HintRouterSession::MapElement& elem) { bool rv = false; - + Dcb& dcb = elem.second; GWBUF* pPacket = gwbuf_clone(m_pPacket); if (pPacket) @@ -77,7 +84,6 @@ private: GWBUF* m_pPacket; }; - /** * HintMatcher is a function object that when invoked with a dcb, checks * whether the dcb matches the hint(s) that was given when the HintMatcher @@ -88,8 +94,7 @@ class HintMatcher : std::unary_function public: HintMatcher(HINT* pHint) : m_pHint(pHint) - { - } + {} bool operator()(const Dcb& dcb) { @@ -151,61 +156,178 @@ private: } +bool HintRouterSession::route_to_slave(GWBUF* pPacket) +{ + bool success = false; + // Find a valid slave + size_type size = m_slaves.size(); + size_type begin = m_n_routed_to_slave % size; + size_type limit = begin + size; + for (size_type curr = begin; curr != limit; curr++) + { + Dcb& candidate = m_slaves.at(curr % size); + if (SERVER_IS_SLAVE(candidate.server())) + { + success = candidate.write(pPacket); + if (success) + { + break; + } + } + } + /* It is (in theory) possible, that none of the slaves in the slave-array are + * working (or have been promoted to master) and the previous master is now + * a slave. In this situation, re-arranging the dcb:s will help. */ + if (!success) + { + update_connections(); + for (size_type curr = begin; curr != limit; curr++) + { + Dcb& candidate = m_slaves.at(curr % size); + success = candidate.write(pPacket); + if (success) + { + break; + } + } + } + + if (success) + { + m_n_routed_to_slave++; + } + return success; +} + +bool HintRouterSession::route_by_hint(GWBUF* pPacket, HINT* hint, bool ignore_errors) +{ + bool success = false; + switch (hint->type) + { + case HINT_ROUTE_TO_MASTER: + if (m_master.get()) + { + // The master server should be already known, but may have changed + if (SERVER_IS_MASTER(m_master.server())) + { + success = m_master.write(pPacket); + } + else + { + update_connections(); + if (m_master.get()) + { + HR_DEBUG("Writing packet to %s.", m_master.server()->name); + success = m_master.write(pPacket); + } + } + } + else if (!ignore_errors) + { + MXS_ERROR("Hint suggests routing to master when no master connected."); + } + break; + + case HINT_ROUTE_TO_SLAVE: + if (m_slaves.size()) + { + success = route_to_slave(pPacket); + } + else if (!ignore_errors) + { + MXS_ERROR("Hint suggests routing to slave when no slave connected."); + } + break; + + case HINT_ROUTE_TO_NAMED_SERVER: + { + string backend_name((hint->data) ? (const char*)(hint->data) : ""); + BackendMap::const_iterator iter = m_backends.find(backend_name); + if (iter != m_backends.end()) + { + HR_DEBUG("Writing packet to %s.", iter->second.server()->unique_name); + success = iter->second.write(pPacket); + } + else if (!ignore_errors) + { + MXS_ERROR("Hint suggests routing to backend '%s' when no such backend connected.", + backend_name.c_str()); + } + } + break; + + case HINT_ROUTE_TO_ALL: + { + HR_DEBUG("Writing packet to %lu backends.", m_backends.size()); + size_type n_writes = + std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket)); + if (n_writes != 0) + { + m_surplus_replies = n_writes - 1; + } + else if (!ignore_errors) + { + MXS_ERROR("Nothing could be written, terminating session."); + } + + success = (n_writes == m_backends.size()); // Is this too strict? + if (success) + { + gwbuf_free(pPacket); + } + } + break; + + default: + MXS_ERROR("Unsupported hint type '%d'", hint->type); + break; + } + + return success; +} + int32_t HintRouterSession::routeQuery(GWBUF* pPacket) { HR_ENTRY(); - int32_t continue_routing = 1; + bool success = false; if (pPacket->hint) { - // At least one hint => look for match. + /* At least one hint => look for match. Only use the later hints if the + * first is unsuccessful. */ + HINT* current_hint = pPacket->hint; HR_DEBUG("Hint, looking for match."); - - Backends::iterator i = std::find_if(m_backends.begin(), - m_backends.end(), - HintMatcher(pPacket->hint)); - - if (i != m_backends.end()) + while (!success && current_hint) { - Dcb& dcb = *i; - HR_DEBUG("Writing packet to %s.", dcb.server()->name); - - dcb.write(pPacket); - - // Rotate dcbs so that we get round robin as far as the slaves are concerned. - m_backends.push_front(m_backends.back()); // Push last to first - m_backends.pop_back(); // Remove last element - - m_surplus_replies = 0; - } - else - { - MXS_ERROR("No backend to write to."); - continue_routing = 0; + success = route_by_hint(pPacket, current_hint, true); + current_hint = current_hint->next; } } - else + + if (!success) { - // No hint => all. - HR_DEBUG("No hints, writing to all."); - - size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket)); - gwbuf_free(pPacket); - - if (n_writes != 0) + // No hint => default action. + HR_DEBUG("No hints or hint-based routing failed, falling back to default action."); + HINT fake_hint = {}; + fake_hint.type = m_router->get_default_action(); + if (fake_hint.type == HINT_ROUTE_TO_NAMED_SERVER) { - m_surplus_replies = n_writes - 1; + fake_hint.data = MXS_STRDUP(m_router->get_default_server().c_str()); + // Ignore allocation error, it will just result in an error later on } - else + success = route_by_hint(pPacket, &fake_hint, false); + if (fake_hint.type == HINT_ROUTE_TO_NAMED_SERVER) { - MXS_ERROR("Nothing could be written, terminating session."); - - continue_routing = 0; + MXS_FREE(fake_hint.data); } } - return continue_routing; + if (!success) + { + gwbuf_free(pPacket); + } + return success; } @@ -217,20 +339,19 @@ void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend) if (m_surplus_replies == 0) { - HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)"); + HR_DEBUG("Returning packet from %s.", pServer ? pServer->unique_name : "(null)"); MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket); } else { - HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)"); + HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->unique_name : "(null)"); --m_surplus_replies; gwbuf_free(pPacket); } } - void HintRouterSession::handleError(GWBUF* pMessage, DCB* pProblem, mxs_error_action_t action, @@ -274,3 +395,33 @@ void HintRouterSession::handleError(GWBUF* pMessage, *pSuccess = false; } } + +void HintRouterSession::update_connections() +{ + /* Attempt to rearrange the dcb:s in the session such that the master and + * slave containers are correct again. Do not try to make new connections, + * since those would not have the correct session state anyways. */ + m_master = Dcb(NULL); + m_slaves.clear(); + + for (BackendMap::const_iterator iter = m_backends.begin(); + iter != m_backends.end(); iter++) + { + SERVER* server = iter->second.get()->server; + if (SERVER_IS_MASTER(server)) + { + if (!m_master.get()) + { + m_master = iter->second; + } + else + { + MXS_WARNING("Found multiple master servers when updating connections."); + } + } + else if (SERVER_IS_SLAVE(server)) + { + m_slaves.push_back(iter->second); + } + } +} diff --git a/server/modules/routing/hintrouter/hintroutersession.hh b/server/modules/routing/hintrouter/hintroutersession.hh index d07195a41..674210f5a 100644 --- a/server/modules/routing/hintrouter/hintroutersession.hh +++ b/server/modules/routing/hintrouter/hintroutersession.hh @@ -13,26 +13,42 @@ */ #include "hintrouterdefs.hh" + #include +#include +#include +#include + #include #include "dcb.hh" +using std::string; + class HintRouter; class HintRouterSession : public maxscale::RouterSession { public: - typedef std::deque Backends; + typedef std::tr1::unordered_map BackendMap; // All backends, indexed by name + typedef std::vector BackendArray; + typedef std::vector RefArray; + typedef BackendMap::value_type MapElement; + typedef BackendArray::size_type size_type; HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter, - const Backends& backends); + const BackendMap& backends + ); ~HintRouterSession(); void close(); int32_t routeQuery(GWBUF* pPacket); + bool route_by_hint(GWBUF* pPacket, HINT* current_hint, bool ignore_errors); + + bool route_to_slave(GWBUF* pPacket); + void clientReply(GWBUF* pPacket, DCB* pBackend); @@ -41,13 +57,17 @@ public: mxs_error_action_t action, bool* pSuccess); -private: - HintRouterSession(const HintRouterSession&); - HintRouterSession& operator = (const HintRouterSession&); + void update_connections(); private: - HintRouter* m_pRouter; - Backends m_backends; - size_t m_surplus_replies; + HintRouterSession(const HintRouterSession&); // denied + HintRouterSession& operator = (const HintRouterSession&); // denied +private: + HintRouter* m_router; + BackendMap m_backends; // all connections + Dcb m_master; // connection to master + BackendArray m_slaves; // connections to slaves + size_type m_n_routed_to_slave; // packets routed to a single slave, used for rr + size_t m_surplus_replies; // how many replies should be ignored };