From 7165b4306f95fa8942b472813bd625a476b97b06 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Mon, 6 Mar 2017 09:55:06 +0200 Subject: [PATCH] Implement first version of HintRouter The hintrouter is now in principle capable of routing requests to the master or to some slave (in a round robin fashion) based upon hints set by some earlier filter. Note that as the router is completely oblivious of transaction boundaries, using it with transactions and autocommit being off will not make anyone happy. Recognizing transaction boundaries using regexes and then pinning the server until transaction commit would be needed. --- .../modules/routing/hintrouter/CMakeLists.txt | 1 + server/modules/routing/hintrouter/dcb.cc | 72 ++++++ server/modules/routing/hintrouter/dcb.hh | 65 +++++ .../modules/routing/hintrouter/hintrouter.cc | 45 +++- .../modules/routing/hintrouter/hintrouter.hh | 2 +- .../routing/hintrouter/hintrouterdefs.hh | 27 ++ .../routing/hintrouter/hintroutersession.cc | 232 +++++++++++++++++- .../routing/hintrouter/hintroutersession.hh | 14 +- 8 files changed, 446 insertions(+), 12 deletions(-) create mode 100644 server/modules/routing/hintrouter/dcb.cc create mode 100644 server/modules/routing/hintrouter/dcb.hh create mode 100644 server/modules/routing/hintrouter/hintrouterdefs.hh diff --git a/server/modules/routing/hintrouter/CMakeLists.txt b/server/modules/routing/hintrouter/CMakeLists.txt index 3c32e39b9..b7e9fad85 100644 --- a/server/modules/routing/hintrouter/CMakeLists.txt +++ b/server/modules/routing/hintrouter/CMakeLists.txt @@ -1,6 +1,7 @@ add_library(hintrouter SHARED hintrouter.cc hintroutersession.cc + dcb.cc ) target_link_libraries(hintrouter maxscale-common) diff --git a/server/modules/routing/hintrouter/dcb.cc b/server/modules/routing/hintrouter/dcb.cc new file mode 100644 index 000000000..04b44c1c1 --- /dev/null +++ b/server/modules/routing/hintrouter/dcb.cc @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "dcb.hh" +#include +#include + + +Dcb::Dcb(DCB* pDcb) + : m_pDcb(pDcb) + , m_pRefs(NULL) +{ + try + { + m_pRefs = new int (1); + } + catch (const std::exception&) + { + dcb_close(pDcb); + throw; + } +} + +Dcb::Dcb(const Dcb& rhs) + : m_pDcb(rhs.m_pDcb) + , m_pRefs(rhs.m_pRefs) +{ + ++(*m_pRefs); +} + +Dcb& Dcb::operator = (Dcb rhs) +{ + swap(rhs); + return *this; +} + +void Dcb::dec() +{ + ss_dassert(*m_pRefs > 0); + + if (--(*m_pRefs) == 0) + { + HR_DEBUG("CLOSING dcb"); + // TODO: You should not need to manually adjust any + // TODO: connections number, dcb_close should handle that. + SERVER_REF* pSref = m_pDcb->service->dbref; + + while (pSref && (pSref->server != m_pDcb->server)) + { + pSref = pSref->next; + } + + if (pSref) + { + atomic_add(&pSref->connections, -1); + } + + dcb_close(m_pDcb); + + delete m_pRefs; + } +} diff --git a/server/modules/routing/hintrouter/dcb.hh b/server/modules/routing/hintrouter/dcb.hh new file mode 100644 index 000000000..038a8b0f4 --- /dev/null +++ b/server/modules/routing/hintrouter/dcb.hh @@ -0,0 +1,65 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "hintrouterdefs.hh" +#include +#include + +class Dcb +{ +public: + explicit Dcb(DCB* pDcb); + Dcb(const Dcb& rhs); + ~Dcb() + { + dec(); + } + + Dcb& operator = (Dcb rhs); + + struct server* server() const + { + return m_pDcb->server; + } + + DCB* get() const + { + return m_pDcb; + } + + bool write(GWBUF* pPacket) + { + ss_dassert(m_pDcb); + return m_pDcb->func.write(m_pDcb, pPacket) == 1; + } + +private: + void inc() + { + ++(*m_pRefs); + } + + void dec(); + + void swap(Dcb& rhs) + { + std::swap(m_pDcb, rhs.m_pDcb); + std::swap(m_pRefs, rhs.m_pRefs); + } + +private: + DCB* m_pDcb; + int* m_pRefs; +}; + diff --git a/server/modules/routing/hintrouter/hintrouter.cc b/server/modules/routing/hintrouter/hintrouter.cc index e11e21cef..237e55dfa 100644 --- a/server/modules/routing/hintrouter/hintrouter.cc +++ b/server/modules/routing/hintrouter/hintrouter.cc @@ -14,11 +14,13 @@ #define MXS_MODULE_NAME "hintrouter" #include "hintrouter.hh" #include +#include "dcb.hh" HintRouter::HintRouter(SERVICE* pService) : maxscale::Router(pService) { + HR_ENTRY(); MXS_NOTICE("Hint router [%s] created.", pService->name); } @@ -26,22 +28,61 @@ HintRouter::HintRouter(SERVICE* pService) //static HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions) { + HR_ENTRY(); return new HintRouter(pService); } HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession) { - return new HintRouterSession(pSession, this); + HR_ENTRY(); + HintRouterSession* pRouterSession = NULL; + + HintRouterSession::Backends backends; + + for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next) + { + if (SERVER_REF_IS_ACTIVE(pSref)) + { + SERVER* pServer = pSref->server; + + HR_DEBUG("Connecting to %s.", pServer->name); + + DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol); + + if (pDcb) + { + HR_DEBUG("Connected to %p %s.", pDcb, pServer->name); + // TODO: What's done here, should be done by dcb_connect(). + atomic_add(&pSref->connections, 1); + pDcb->service = pSession->service; + + backends.push_back(Dcb(pDcb)); + } + else + { + HR_DEBUG("Failed to connect to %s.", pServer->name); + } + } + } + + if (backends.size() != 0) + { + pRouterSession = new HintRouterSession(pSession, this, backends); + } + + return pRouterSession; } void HintRouter::diagnostics(DCB* pOut) { + HR_ENTRY(); } uint64_t HintRouter::getCapabilities() { - return 0; + HR_ENTRY(); + return RCAP_TYPE_STMT_OUTPUT; } diff --git a/server/modules/routing/hintrouter/hintrouter.hh b/server/modules/routing/hintrouter/hintrouter.hh index f0d1314b4..f49885328 100644 --- a/server/modules/routing/hintrouter/hintrouter.hh +++ b/server/modules/routing/hintrouter/hintrouter.hh @@ -12,7 +12,7 @@ * Public License. */ -#include +#include "hintrouterdefs.hh" #include #include "hintroutersession.hh" diff --git a/server/modules/routing/hintrouter/hintrouterdefs.hh b/server/modules/routing/hintrouter/hintrouterdefs.hh new file mode 100644 index 000000000..edb075cd7 --- /dev/null +++ b/server/modules/routing/hintrouter/hintrouterdefs.hh @@ -0,0 +1,27 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include + +#define DEBUG_HINTROUTER +//#undef DEBUG_HINTROUTER + +#ifdef DEBUG_HINTROUTER +#define HR_DEBUG(msg, ...) MXS_NOTICE(msg, ##__VA_ARGS__) +#define HR_ENTRY() HR_DEBUG(__func__) +#else +#define HR_DEBUG(msg, ...) +#define HR_ENTRY() +#endif diff --git a/server/modules/routing/hintrouter/hintroutersession.cc b/server/modules/routing/hintrouter/hintroutersession.cc index 9c9a4351d..2f5dc2ae4 100644 --- a/server/modules/routing/hintrouter/hintroutersession.cc +++ b/server/modules/routing/hintrouter/hintroutersession.cc @@ -13,36 +13,221 @@ #define MXS_MODULE_NAME "hintrouter" #include "hintroutersession.hh" -#include +#include +#include -HintRouterSession::HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter) +HintRouterSession::HintRouterSession(MXS_SESSION* pSession, + HintRouter* pRouter, + const Backends& backends) : maxscale::RouterSession(pSession) , m_pRouter(pRouter) + , m_backends(backends) + , m_surplus_replies(0) { + HR_ENTRY(); } HintRouterSession::~HintRouterSession() { + HR_ENTRY(); } void HintRouterSession::close() { + HR_ENTRY(); + m_backends.clear(); } +namespace +{ + +/** + * Writer is a function object that writes a clone of a provided GWBUF, + * to each dcb it is called with. + */ +class Writer : std::unary_function +{ +public: + Writer(GWBUF* pPacket) + : m_pPacket(pPacket) + { + } + + bool operator()(Dcb& dcb) + { + bool rv = false; + + GWBUF* pPacket = gwbuf_clone(m_pPacket); + + if (pPacket) + { + SERVER* pServer = dcb.server(); + HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->name : "(null)"); + + rv = dcb.write(pPacket); + } + + return rv; + } + +private: + GWBUF* m_pPacket; +}; + + +/** + * HintMatcher is a function object that when invoked with a dcb, checks + * whether the dcb matches the hint(s) that was given when the HintMatcher + * was created. + */ +class HintMatcher : std::unary_function +{ +public: + HintMatcher(HINT* pHint) + : m_pHint(pHint) + { + } + + bool operator()(const Dcb& dcb) + { + bool match = false; + + SERVER* pServer = dcb.server(); + + if (pServer) + { + HINT* pHint = m_pHint; + + while (!match && pHint) + { + switch (pHint->type) + { + case HINT_ROUTE_TO_MASTER: + if (SERVER_IS_MASTER(pServer)) + { + match = true; + } + break; + + case HINT_ROUTE_TO_SLAVE: + if (SERVER_IS_SLAVE(pServer)) + { + match = true; + } + break; + + case HINT_ROUTE_TO_NAMED_SERVER: + { + const char* zName = static_cast(pHint->data); + + if (strcmp(zName, pServer->name) == 0) + { + match = true; + } + } + break; + + case HINT_ROUTE_TO_UPTODATE_SERVER: + case HINT_ROUTE_TO_ALL: + case HINT_PARAMETER: + MXS_ERROR("HINT not handled."); + ss_dassert(false); + break; + } + + pHint = pHint->next; + } + } + + return match; + } + +private: + HINT* m_pHint; +}; + +} int32_t HintRouterSession::routeQuery(GWBUF* pPacket) { - MXS_ERROR("routeQuery not implemented yet."); - return 0; + HR_ENTRY(); + + int32_t continue_routing = 1; + + if (pPacket->hint) + { + // At least one hint => look for match. + HR_DEBUG("Hint, looking for match."); + + Backends::iterator i = std::find_if(m_backends.begin(), + m_backends.end(), + HintMatcher(pPacket->hint)); + + if (i != m_backends.end()) + { + Dcb& dcb = *i; + HR_DEBUG("Writing packet to %s.", dcb.server()->name); + + dcb.write(pPacket); + + // Rotate dcbs so that we get round robin as far as the slaves are concerned. + m_backends.push_front(m_backends.back()); // Push last to first + m_backends.pop_back(); // Remove last element + + m_surplus_replies = 0; + } + else + { + MXS_ERROR("No backend to write to."); + continue_routing = 0; + } + } + else + { + // No hint => all. + HR_DEBUG("No hints, writing to all."); + + size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket)); + gwbuf_free(pPacket); + + if (n_writes != 0) + { + m_surplus_replies = n_writes - 1; + } + else + { + MXS_ERROR("Nothing could be written, terminating session."); + + continue_routing = 0; + } + } + + return continue_routing; } void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend) { - MXS_ERROR("clientReply not implemented yet."); + HR_ENTRY(); + + SERVER* pServer = pBackend->server; + + if (m_surplus_replies == 0) + { + HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)"); + + MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket); + } + else + { + HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)"); + + --m_surplus_replies; + gwbuf_free(pPacket); + } } @@ -51,6 +236,41 @@ void HintRouterSession::handleError(GWBUF* pMessage, mxs_error_action_t action, bool* pSuccess) { + HR_ENTRY(); + ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER); - MXS_ERROR("handleError not implemented yet."); + + MXS_SESSION* pSession = pProblem->session; + mxs_session_state_t sesstate = pSession->state; + + switch (action) + { + case ERRACT_REPLY_CLIENT: + { + /* React to failed authentication, send message to client */ + if (sesstate == SESSION_STATE_ROUTER_READY) + { + /* Send error report to client */ + GWBUF* pCopy = gwbuf_clone(pMessage); + if (pCopy) + { + DCB* pClient = pSession->client_dcb; + pClient->func.write(pClient, pCopy); + } + } + *pSuccess = false; + } + break; + + case ERRACT_NEW_CONNECTION: + { + HR_DEBUG("ERRACT_NEW_CONNECTION"); + *pSuccess = true; + } + break; + + default: + ss_dassert(!true); + *pSuccess = false; + } } diff --git a/server/modules/routing/hintrouter/hintroutersession.hh b/server/modules/routing/hintrouter/hintroutersession.hh index 43eeae83e..d07195a41 100644 --- a/server/modules/routing/hintrouter/hintroutersession.hh +++ b/server/modules/routing/hintrouter/hintroutersession.hh @@ -12,16 +12,21 @@ * Public License. */ -#include +#include "hintrouterdefs.hh" +#include #include +#include "dcb.hh" class HintRouter; class HintRouterSession : public maxscale::RouterSession { public: - HintRouterSession(MXS_SESSION* pSession, - HintRouter* pRouter); + typedef std::deque Backends; + + HintRouterSession(MXS_SESSION* pSession, + HintRouter* pRouter, + const Backends& backends); ~HintRouterSession(); @@ -42,4 +47,7 @@ private: private: HintRouter* m_pRouter; + Backends m_backends; + size_t m_surplus_replies; + };