Implement first version of HintRouter

The hintrouter is now in principle capable of routing requests
to the master or to some slave (in a round robin fashion) based
upon hints set by some earlier filter.

Note that as the router is completely oblivious of transaction
boundaries, using it with transactions and autocommit being off
will not make anyone happy.

Recognizing transaction boundaries using regexes and then pinning
the server until transaction commit would be needed.
This commit is contained in:
Johan Wikman
2017-03-06 09:55:06 +02:00
parent 9dbabb666a
commit 7165b4306f
8 changed files with 446 additions and 12 deletions

View File

@ -1,6 +1,7 @@
add_library(hintrouter SHARED
hintrouter.cc
hintroutersession.cc
dcb.cc
)
target_link_libraries(hintrouter maxscale-common)

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "dcb.hh"
#include <maxscale/atomic.h>
#include <maxscale/service.h>
Dcb::Dcb(DCB* pDcb)
: m_pDcb(pDcb)
, m_pRefs(NULL)
{
try
{
m_pRefs = new int (1);
}
catch (const std::exception&)
{
dcb_close(pDcb);
throw;
}
}
Dcb::Dcb(const Dcb& rhs)
: m_pDcb(rhs.m_pDcb)
, m_pRefs(rhs.m_pRefs)
{
++(*m_pRefs);
}
Dcb& Dcb::operator = (Dcb rhs)
{
swap(rhs);
return *this;
}
void Dcb::dec()
{
ss_dassert(*m_pRefs > 0);
if (--(*m_pRefs) == 0)
{
HR_DEBUG("CLOSING dcb");
// TODO: You should not need to manually adjust any
// TODO: connections number, dcb_close should handle that.
SERVER_REF* pSref = m_pDcb->service->dbref;
while (pSref && (pSref->server != m_pDcb->server))
{
pSref = pSref->next;
}
if (pSref)
{
atomic_add(&pSref->connections, -1);
}
dcb_close(m_pDcb);
delete m_pRefs;
}
}

View File

@ -0,0 +1,65 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "hintrouterdefs.hh"
#include <algorithm>
#include <maxscale/dcb.h>
class Dcb
{
public:
explicit Dcb(DCB* pDcb);
Dcb(const Dcb& rhs);
~Dcb()
{
dec();
}
Dcb& operator = (Dcb rhs);
struct server* server() const
{
return m_pDcb->server;
}
DCB* get() const
{
return m_pDcb;
}
bool write(GWBUF* pPacket)
{
ss_dassert(m_pDcb);
return m_pDcb->func.write(m_pDcb, pPacket) == 1;
}
private:
void inc()
{
++(*m_pRefs);
}
void dec();
void swap(Dcb& rhs)
{
std::swap(m_pDcb, rhs.m_pDcb);
std::swap(m_pRefs, rhs.m_pRefs);
}
private:
DCB* m_pDcb;
int* m_pRefs;
};

View File

@ -14,11 +14,13 @@
#define MXS_MODULE_NAME "hintrouter"
#include "hintrouter.hh"
#include <maxscale/log_manager.h>
#include "dcb.hh"
HintRouter::HintRouter(SERVICE* pService)
: maxscale::Router<HintRouter, HintRouterSession>(pService)
{
HR_ENTRY();
MXS_NOTICE("Hint router [%s] created.", pService->name);
}
@ -26,22 +28,61 @@ HintRouter::HintRouter(SERVICE* pService)
//static
HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions)
{
HR_ENTRY();
return new HintRouter(pService);
}
HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession)
{
return new HintRouterSession(pSession, this);
HR_ENTRY();
HintRouterSession* pRouterSession = NULL;
HintRouterSession::Backends backends;
for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next)
{
if (SERVER_REF_IS_ACTIVE(pSref))
{
SERVER* pServer = pSref->server;
HR_DEBUG("Connecting to %s.", pServer->name);
DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol);
if (pDcb)
{
HR_DEBUG("Connected to %p %s.", pDcb, pServer->name);
// TODO: What's done here, should be done by dcb_connect().
atomic_add(&pSref->connections, 1);
pDcb->service = pSession->service;
backends.push_back(Dcb(pDcb));
}
else
{
HR_DEBUG("Failed to connect to %s.", pServer->name);
}
}
}
if (backends.size() != 0)
{
pRouterSession = new HintRouterSession(pSession, this, backends);
}
return pRouterSession;
}
void HintRouter::diagnostics(DCB* pOut)
{
HR_ENTRY();
}
uint64_t HintRouter::getCapabilities()
{
return 0;
HR_ENTRY();
return RCAP_TYPE_STMT_OUTPUT;
}

View File

@ -12,7 +12,7 @@
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include "hintrouterdefs.hh"
#include <maxscale/router.hh>
#include "hintroutersession.hh"

View File

@ -0,0 +1,27 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/log_manager.h>
#define DEBUG_HINTROUTER
//#undef DEBUG_HINTROUTER
#ifdef DEBUG_HINTROUTER
#define HR_DEBUG(msg, ...) MXS_NOTICE(msg, ##__VA_ARGS__)
#define HR_ENTRY() HR_DEBUG(__func__)
#else
#define HR_DEBUG(msg, ...)
#define HR_ENTRY()
#endif

View File

@ -13,36 +13,221 @@
#define MXS_MODULE_NAME "hintrouter"
#include "hintroutersession.hh"
#include <maxscale/log_manager.h>
#include <algorithm>
#include <functional>
HintRouterSession::HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter)
HintRouterSession::HintRouterSession(MXS_SESSION* pSession,
HintRouter* pRouter,
const Backends& backends)
: maxscale::RouterSession(pSession)
, m_pRouter(pRouter)
, m_backends(backends)
, m_surplus_replies(0)
{
HR_ENTRY();
}
HintRouterSession::~HintRouterSession()
{
HR_ENTRY();
}
void HintRouterSession::close()
{
HR_ENTRY();
m_backends.clear();
}
namespace
{
/**
* Writer is a function object that writes a clone of a provided GWBUF,
* to each dcb it is called with.
*/
class Writer : std::unary_function<Dcb, bool>
{
public:
Writer(GWBUF* pPacket)
: m_pPacket(pPacket)
{
}
bool operator()(Dcb& dcb)
{
bool rv = false;
GWBUF* pPacket = gwbuf_clone(m_pPacket);
if (pPacket)
{
SERVER* pServer = dcb.server();
HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->name : "(null)");
rv = dcb.write(pPacket);
}
return rv;
}
private:
GWBUF* m_pPacket;
};
/**
* HintMatcher is a function object that when invoked with a dcb, checks
* whether the dcb matches the hint(s) that was given when the HintMatcher
* was created.
*/
class HintMatcher : std::unary_function<const Dcb, bool>
{
public:
HintMatcher(HINT* pHint)
: m_pHint(pHint)
{
}
bool operator()(const Dcb& dcb)
{
bool match = false;
SERVER* pServer = dcb.server();
if (pServer)
{
HINT* pHint = m_pHint;
while (!match && pHint)
{
switch (pHint->type)
{
case HINT_ROUTE_TO_MASTER:
if (SERVER_IS_MASTER(pServer))
{
match = true;
}
break;
case HINT_ROUTE_TO_SLAVE:
if (SERVER_IS_SLAVE(pServer))
{
match = true;
}
break;
case HINT_ROUTE_TO_NAMED_SERVER:
{
const char* zName = static_cast<const char*>(pHint->data);
if (strcmp(zName, pServer->name) == 0)
{
match = true;
}
}
break;
case HINT_ROUTE_TO_UPTODATE_SERVER:
case HINT_ROUTE_TO_ALL:
case HINT_PARAMETER:
MXS_ERROR("HINT not handled.");
ss_dassert(false);
break;
}
pHint = pHint->next;
}
}
return match;
}
private:
HINT* m_pHint;
};
}
int32_t HintRouterSession::routeQuery(GWBUF* pPacket)
{
MXS_ERROR("routeQuery not implemented yet.");
return 0;
HR_ENTRY();
int32_t continue_routing = 1;
if (pPacket->hint)
{
// At least one hint => look for match.
HR_DEBUG("Hint, looking for match.");
Backends::iterator i = std::find_if(m_backends.begin(),
m_backends.end(),
HintMatcher(pPacket->hint));
if (i != m_backends.end())
{
Dcb& dcb = *i;
HR_DEBUG("Writing packet to %s.", dcb.server()->name);
dcb.write(pPacket);
// Rotate dcbs so that we get round robin as far as the slaves are concerned.
m_backends.push_front(m_backends.back()); // Push last to first
m_backends.pop_back(); // Remove last element
m_surplus_replies = 0;
}
else
{
MXS_ERROR("No backend to write to.");
continue_routing = 0;
}
}
else
{
// No hint => all.
HR_DEBUG("No hints, writing to all.");
size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket));
gwbuf_free(pPacket);
if (n_writes != 0)
{
m_surplus_replies = n_writes - 1;
}
else
{
MXS_ERROR("Nothing could be written, terminating session.");
continue_routing = 0;
}
}
return continue_routing;
}
void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend)
{
MXS_ERROR("clientReply not implemented yet.");
HR_ENTRY();
SERVER* pServer = pBackend->server;
if (m_surplus_replies == 0)
{
HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)");
MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket);
}
else
{
HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)");
--m_surplus_replies;
gwbuf_free(pPacket);
}
}
@ -51,6 +236,41 @@ void HintRouterSession::handleError(GWBUF* pMessage,
mxs_error_action_t action,
bool* pSuccess)
{
HR_ENTRY();
ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER);
MXS_ERROR("handleError not implemented yet.");
MXS_SESSION* pSession = pProblem->session;
mxs_session_state_t sesstate = pSession->state;
switch (action)
{
case ERRACT_REPLY_CLIENT:
{
/* React to failed authentication, send message to client */
if (sesstate == SESSION_STATE_ROUTER_READY)
{
/* Send error report to client */
GWBUF* pCopy = gwbuf_clone(pMessage);
if (pCopy)
{
DCB* pClient = pSession->client_dcb;
pClient->func.write(pClient, pCopy);
}
}
*pSuccess = false;
}
break;
case ERRACT_NEW_CONNECTION:
{
HR_DEBUG("ERRACT_NEW_CONNECTION");
*pSuccess = true;
}
break;
default:
ss_dassert(!true);
*pSuccess = false;
}
}

View File

@ -12,16 +12,21 @@
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include "hintrouterdefs.hh"
#include <deque>
#include <maxscale/router.hh>
#include "dcb.hh"
class HintRouter;
class HintRouterSession : public maxscale::RouterSession
{
public:
HintRouterSession(MXS_SESSION* pSession,
HintRouter* pRouter);
typedef std::deque<Dcb> Backends;
HintRouterSession(MXS_SESSION* pSession,
HintRouter* pRouter,
const Backends& backends);
~HintRouterSession();
@ -42,4 +47,7 @@ private:
private:
HintRouter* m_pRouter;
Backends m_backends;
size_t m_surplus_replies;
};