Merge branch 'develop' into MXS-1075
This commit is contained in:
@ -15,47 +15,34 @@
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/service.h>
|
||||
|
||||
|
||||
Dcb::Dcb(DCB* pDcb)
|
||||
: m_pDcb(pDcb)
|
||||
, m_pRefs(NULL)
|
||||
: m_sInner()
|
||||
{
|
||||
try
|
||||
// A null value for m_pDcb is allowed as a special non-existing dcb
|
||||
if (pDcb)
|
||||
{
|
||||
m_pRefs = new int (1);
|
||||
}
|
||||
catch (const std::exception&)
|
||||
{
|
||||
dcb_close(pDcb);
|
||||
throw;
|
||||
try
|
||||
{
|
||||
m_sInner = SDCB(pDcb, Dcb::deleter);
|
||||
}
|
||||
catch (const std::exception&)
|
||||
{
|
||||
dcb_close(pDcb);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Dcb::Dcb(const Dcb& rhs)
|
||||
: m_pDcb(rhs.m_pDcb)
|
||||
, m_pRefs(rhs.m_pRefs)
|
||||
void Dcb::deleter(DCB* dcb)
|
||||
{
|
||||
++(*m_pRefs);
|
||||
}
|
||||
|
||||
Dcb& Dcb::operator = (Dcb rhs)
|
||||
{
|
||||
swap(rhs);
|
||||
return *this;
|
||||
}
|
||||
|
||||
void Dcb::dec()
|
||||
{
|
||||
ss_dassert(*m_pRefs > 0);
|
||||
|
||||
if (--(*m_pRefs) == 0)
|
||||
if (dcb)
|
||||
{
|
||||
HR_DEBUG("CLOSING dcb");
|
||||
// TODO: You should not need to manually adjust any
|
||||
// TODO: connections number, dcb_close should handle that.
|
||||
SERVER_REF* pSref = m_pDcb->service->dbref;
|
||||
SERVER_REF* pSref = dcb->service->dbref;
|
||||
|
||||
while (pSref && (pSref->server != m_pDcb->server))
|
||||
while (pSref && (pSref->server != dcb->server))
|
||||
{
|
||||
pSref = pSref->next;
|
||||
}
|
||||
@ -64,9 +51,6 @@ void Dcb::dec()
|
||||
{
|
||||
atomic_add(&pSref->connections, -1);
|
||||
}
|
||||
|
||||
dcb_close(m_pDcb);
|
||||
|
||||
delete m_pRefs;
|
||||
dcb_close(dcb);
|
||||
}
|
||||
}
|
||||
|
@ -13,53 +13,46 @@
|
||||
*/
|
||||
|
||||
#include "hintrouterdefs.hh"
|
||||
#include <algorithm>
|
||||
|
||||
#include <tr1/memory>
|
||||
|
||||
#include <maxscale/dcb.h>
|
||||
|
||||
class Dcb
|
||||
{
|
||||
public:
|
||||
explicit Dcb(DCB* pDcb);
|
||||
Dcb(const Dcb& rhs);
|
||||
~Dcb()
|
||||
{
|
||||
dec();
|
||||
}
|
||||
typedef std::tr1::shared_ptr<DCB> SDCB;
|
||||
|
||||
Dcb& operator = (Dcb rhs);
|
||||
explicit Dcb(DCB* pDcb);
|
||||
|
||||
Dcb(const Dcb& rhs)
|
||||
: m_sInner(rhs.m_sInner)
|
||||
{};
|
||||
|
||||
Dcb& operator = (Dcb rhs)
|
||||
{
|
||||
m_sInner.swap(rhs.m_sInner);
|
||||
return *this;
|
||||
}
|
||||
|
||||
struct server* server() const
|
||||
{
|
||||
return m_pDcb->server;
|
||||
return (this->m_sInner.get()) ? m_sInner.get()->server : NULL;
|
||||
}
|
||||
|
||||
DCB* get() const
|
||||
{
|
||||
return m_pDcb;
|
||||
return m_sInner.get();
|
||||
}
|
||||
|
||||
bool write(GWBUF* pPacket)
|
||||
bool write(GWBUF* pPacket) const
|
||||
{
|
||||
ss_dassert(m_pDcb);
|
||||
return m_pDcb->func.write(m_pDcb, pPacket) == 1;
|
||||
ss_dassert(m_sInner.get());
|
||||
return m_sInner.get()->func.write(m_sInner.get(), pPacket) == 1;
|
||||
}
|
||||
|
||||
private:
|
||||
void inc()
|
||||
{
|
||||
++(*m_pRefs);
|
||||
}
|
||||
|
||||
void dec();
|
||||
|
||||
void swap(Dcb& rhs)
|
||||
{
|
||||
std::swap(m_pDcb, rhs.m_pDcb);
|
||||
std::swap(m_pRefs, rhs.m_pRefs);
|
||||
}
|
||||
|
||||
private:
|
||||
DCB* m_pDcb;
|
||||
int* m_pRefs;
|
||||
static void deleter(DCB* dcb);
|
||||
SDCB m_sInner;
|
||||
};
|
||||
|
||||
|
@ -13,79 +13,190 @@
|
||||
|
||||
#define MXS_MODULE_NAME "hintrouter"
|
||||
#include "hintrouter.hh"
|
||||
|
||||
#include <limits>
|
||||
#include <vector>
|
||||
|
||||
#include <maxscale/log_manager.h>
|
||||
#include "dcb.hh"
|
||||
|
||||
static const MXS_ENUM_VALUE default_action_values[] =
|
||||
{
|
||||
{"master", HINT_ROUTE_TO_MASTER},
|
||||
{"slave", HINT_ROUTE_TO_SLAVE},
|
||||
{"named", HINT_ROUTE_TO_NAMED_SERVER},
|
||||
{"all", HINT_ROUTE_TO_ALL},
|
||||
{NULL} /* Last must be NULL */
|
||||
};
|
||||
static const char DEFAULT_ACTION[] = "default_action";
|
||||
static const char DEFAULT_SERVER[] = "default_server";
|
||||
static const char MAX_SLAVES[] = "max_slaves";
|
||||
|
||||
HintRouter::HintRouter(SERVICE* pService)
|
||||
: maxscale::Router<HintRouter, HintRouterSession>(pService)
|
||||
HintRouter::HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server,
|
||||
int max_slaves)
|
||||
: maxscale::Router<HintRouter, HintRouterSession>(pService),
|
||||
m_routed_to_master(0),
|
||||
m_routed_to_slave(0),
|
||||
m_routed_to_named(0),
|
||||
m_routed_to_all(0),
|
||||
m_default_action(default_action),
|
||||
m_default_server(default_server),
|
||||
m_max_slaves(max_slaves),
|
||||
m_total_slave_conns(0)
|
||||
{
|
||||
HR_ENTRY();
|
||||
if (m_max_slaves < 0)
|
||||
{
|
||||
// set a reasonable default value
|
||||
m_max_slaves = pService->n_dbref - 1;
|
||||
}
|
||||
MXS_NOTICE("Hint router [%s] created.", pService->name);
|
||||
}
|
||||
|
||||
|
||||
//static
|
||||
HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions)
|
||||
{
|
||||
HR_ENTRY();
|
||||
return new HintRouter(pService);
|
||||
}
|
||||
|
||||
MXS_CONFIG_PARAMETER* params = pService->svc_config_param;
|
||||
HINT_TYPE default_action = (HINT_TYPE)config_get_enum(params, DEFAULT_ACTION,
|
||||
default_action_values);
|
||||
string default_server(config_get_string(params, DEFAULT_SERVER));
|
||||
int max_slaves = config_get_integer(params, MAX_SLAVES);
|
||||
return new HintRouter(pService, default_action, default_server, max_slaves);
|
||||
}
|
||||
|
||||
HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession)
|
||||
{
|
||||
typedef HintRouterSession::RefArray::size_type array_index;
|
||||
HR_ENTRY();
|
||||
HintRouterSession* pRouterSession = NULL;
|
||||
Dcb master_Dcb(NULL);
|
||||
HintRouterSession::BackendMap all_backends;
|
||||
all_backends.rehash(1 + m_max_slaves);
|
||||
HintRouterSession::BackendArray slave_arr;
|
||||
slave_arr.reserve(m_max_slaves);
|
||||
|
||||
HintRouterSession::Backends backends;
|
||||
SERVER_REF* master_ref = NULL;
|
||||
HintRouterSession::RefArray slave_refs;
|
||||
slave_refs.reserve(m_max_slaves);
|
||||
|
||||
for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next)
|
||||
/* Go through the server references, find master and slaves */
|
||||
for (SERVER_REF* pSref = pSession->service->dbref; pSref; pSref = pSref->next)
|
||||
{
|
||||
if (SERVER_REF_IS_ACTIVE(pSref))
|
||||
{
|
||||
SERVER* pServer = pSref->server;
|
||||
|
||||
HR_DEBUG("Connecting to %s.", pServer->name);
|
||||
|
||||
DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol);
|
||||
|
||||
if (pDcb)
|
||||
if (SERVER_IS_MASTER(pSref->server))
|
||||
{
|
||||
HR_DEBUG("Connected to %p %s.", pDcb, pServer->name);
|
||||
// TODO: What's done here, should be done by dcb_connect().
|
||||
atomic_add(&pSref->connections, 1);
|
||||
pDcb->service = pSession->service;
|
||||
|
||||
backends.push_back(Dcb(pDcb));
|
||||
if (!master_ref)
|
||||
{
|
||||
master_ref = pSref;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("Found multiple master servers when creating session.\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (SERVER_IS_SLAVE(pSref->server))
|
||||
{
|
||||
HR_DEBUG("Failed to connect to %s.", pServer->name);
|
||||
slave_refs.push_back(pSref);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (backends.size() != 0)
|
||||
if (master_ref)
|
||||
{
|
||||
pRouterSession = new HintRouterSession(pSession, this, backends);
|
||||
// Connect to master
|
||||
HR_DEBUG("Connecting to %s.", master_ref->server->unique_name);
|
||||
DCB* master_conn = dcb_connect(master_ref->server, pSession, master_ref->server->protocol);
|
||||
|
||||
if (master_conn)
|
||||
{
|
||||
HR_DEBUG("Connected.");
|
||||
atomic_add(&master_ref->connections, 1);
|
||||
master_conn->service = pSession->service;
|
||||
|
||||
master_Dcb = Dcb(master_conn);
|
||||
string name(master_conn->server->unique_name);
|
||||
all_backends.insert(HintRouterSession::MapElement(name, master_Dcb));
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Connection failed.");
|
||||
}
|
||||
}
|
||||
|
||||
return pRouterSession;
|
||||
/* Different sessions may use different slaves if the 'max_session_slaves'-
|
||||
* setting is low enough. First, set maximal looping limits noting that the
|
||||
* array is treated as a ring. Also, array size may have changed since last
|
||||
* time it was formed. */
|
||||
if (slave_refs.size())
|
||||
{
|
||||
array_index size = slave_refs.size();
|
||||
array_index begin = m_total_slave_conns % size;
|
||||
array_index limit = begin + size;
|
||||
|
||||
int slave_conns = 0;
|
||||
array_index current = begin;
|
||||
for (;
|
||||
(slave_conns < m_max_slaves) && current != limit;
|
||||
current++)
|
||||
{
|
||||
SERVER_REF* slave_ref = slave_refs.at(current % size);
|
||||
// Connect to a slave
|
||||
HR_DEBUG("Connecting to %s.", slave_ref->server->unique_name);
|
||||
DCB* slave_conn = dcb_connect(slave_ref->server, pSession, slave_ref->server->protocol);
|
||||
|
||||
if (slave_conn)
|
||||
{
|
||||
HR_DEBUG("Connected.");
|
||||
atomic_add(&slave_ref->connections, 1);
|
||||
slave_conn->service = pSession->service;
|
||||
Dcb slave_Dcb(slave_conn);
|
||||
slave_arr.push_back(slave_Dcb);
|
||||
|
||||
string name(slave_conn->server->unique_name);
|
||||
all_backends.insert(HintRouterSession::MapElement(name, slave_Dcb));
|
||||
slave_conns++;
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Connection failed.");
|
||||
}
|
||||
}
|
||||
m_total_slave_conns += slave_conns;
|
||||
}
|
||||
if (all_backends.size() != 0)
|
||||
{
|
||||
return new HintRouterSession(pSession, this, all_backends);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void HintRouter::diagnostics(DCB* pOut)
|
||||
{
|
||||
HR_ENTRY();
|
||||
for (int i = 0; default_action_values[i].name; i++)
|
||||
{
|
||||
if (default_action_values[i].enum_value == m_default_action)
|
||||
{
|
||||
dcb_printf(pOut, "\tDefault action: route to %s\n", default_action_values[i].name);
|
||||
}
|
||||
}
|
||||
dcb_printf(pOut, "\tDefault server: %s\n", m_default_server.c_str());
|
||||
dcb_printf(pOut, "\tMaximum slave connections/session: %d\n", m_max_slaves);
|
||||
dcb_printf(pOut, "\tTotal cumulative slave connections: %d\n", m_total_slave_conns);
|
||||
dcb_printf(pOut, "\tQueries routed to master: %d\n", m_routed_to_master);
|
||||
dcb_printf(pOut, "\tQueries routed to single slave: %d\n", m_routed_to_slave);
|
||||
dcb_printf(pOut, "\tQueries routed to named server: %d\n", m_routed_to_named);
|
||||
dcb_printf(pOut, "\tQueries routed to all servers: %d\n", m_routed_to_all);
|
||||
}
|
||||
|
||||
uint64_t HintRouter::getCapabilities()
|
||||
{
|
||||
HR_ENTRY();
|
||||
return RCAP_TYPE_NONE;
|
||||
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT;
|
||||
}
|
||||
|
||||
|
||||
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
||||
{
|
||||
static MXS_MODULE module =
|
||||
@ -95,16 +206,24 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION, /* Implemented module API version */
|
||||
"A hint router", /* Description */
|
||||
"V1.0.0", /* Module version */
|
||||
RCAP_TYPE_STMT_OUTPUT,
|
||||
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT,
|
||||
&HintRouter::s_object,
|
||||
NULL, /* Process init, can be null */
|
||||
NULL, /* Process finish, can be null */
|
||||
NULL, /* Thread init */
|
||||
NULL, /* Thread finish */
|
||||
{
|
||||
{
|
||||
DEFAULT_ACTION,
|
||||
MXS_MODULE_PARAM_ENUM,
|
||||
default_action_values[0].name,
|
||||
MXS_MODULE_OPT_NONE,
|
||||
default_action_values
|
||||
},
|
||||
{DEFAULT_SERVER, MXS_MODULE_PARAM_SERVER, ""},
|
||||
{MAX_SLAVES, MXS_MODULE_PARAM_INT, "-1"},
|
||||
{MXS_END_MODULE_PARAMS}
|
||||
}
|
||||
};
|
||||
|
||||
return &module;
|
||||
}
|
||||
|
@ -13,6 +13,7 @@
|
||||
*/
|
||||
|
||||
#include "hintrouterdefs.hh"
|
||||
|
||||
#include <maxscale/router.hh>
|
||||
#include "hintroutersession.hh"
|
||||
|
||||
@ -20,16 +21,30 @@ class HintRouter : public maxscale::Router<HintRouter, HintRouterSession>
|
||||
{
|
||||
public:
|
||||
static HintRouter* create(SERVICE* pService, char** pzOptions);
|
||||
|
||||
HintRouterSession* newSession(MXS_SESSION *pSession);
|
||||
|
||||
void diagnostics(DCB* pOut);
|
||||
|
||||
uint64_t getCapabilities();
|
||||
|
||||
HINT_TYPE get_default_action() const
|
||||
{
|
||||
return m_default_action;
|
||||
};
|
||||
const string& get_default_server() const
|
||||
{
|
||||
return m_default_server;
|
||||
};
|
||||
/* Simple, approximate statistics */
|
||||
volatile unsigned int m_routed_to_master;
|
||||
volatile unsigned int m_routed_to_slave;
|
||||
volatile unsigned int m_routed_to_named;
|
||||
volatile unsigned int m_routed_to_all;
|
||||
private:
|
||||
HintRouter(SERVICE* pService);
|
||||
HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server,
|
||||
int max_slaves);
|
||||
|
||||
HINT_TYPE m_default_action;
|
||||
string m_default_server;
|
||||
int m_max_slaves;
|
||||
volatile int m_total_slave_conns;
|
||||
private:
|
||||
HintRouter(const HintRouter&);
|
||||
HintRouter& operator = (const HintRouter&);
|
||||
|
@ -13,10 +13,16 @@
|
||||
*/
|
||||
|
||||
#include <maxscale/cppdefs.hh>
|
||||
|
||||
#include <maxscale/dcb.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
#define DEBUG_HINTROUTER
|
||||
//#undef DEBUG_HINTROUTER
|
||||
#else
|
||||
#undef DEBUG_HINTROUTER
|
||||
#endif
|
||||
|
||||
#ifdef DEBUG_HINTROUTER
|
||||
#define HR_DEBUG(msg, ...) MXS_NOTICE(msg, ##__VA_ARGS__)
|
||||
|
@ -13,19 +13,58 @@
|
||||
|
||||
#define MXS_MODULE_NAME "hintrouter"
|
||||
#include "hintroutersession.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include "hintrouter.hh"
|
||||
|
||||
namespace
|
||||
{
|
||||
/**
|
||||
* Writer is a function object that writes a clone of a provided GWBUF,
|
||||
* to each dcb it is called with.
|
||||
*/
|
||||
class Writer : std::unary_function<HintRouterSession::MapElement, bool>
|
||||
{
|
||||
public:
|
||||
Writer(GWBUF* pPacket)
|
||||
: m_pPacket(pPacket)
|
||||
{}
|
||||
|
||||
bool operator()(HintRouterSession::MapElement& elem)
|
||||
{
|
||||
bool rv = false;
|
||||
Dcb& dcb = elem.second;
|
||||
GWBUF* pPacket = gwbuf_clone(m_pPacket);
|
||||
|
||||
if (pPacket)
|
||||
{
|
||||
SERVER* pServer = dcb.server();
|
||||
HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->unique_name : "(null)");
|
||||
rv = dcb.write(pPacket);
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
private:
|
||||
GWBUF* m_pPacket;
|
||||
};
|
||||
}
|
||||
|
||||
HintRouterSession::HintRouterSession(MXS_SESSION* pSession,
|
||||
HintRouter* pRouter,
|
||||
const Backends& backends)
|
||||
const BackendMap& backends)
|
||||
: maxscale::RouterSession(pSession)
|
||||
, m_pRouter(pRouter)
|
||||
, m_router(pRouter)
|
||||
, m_backends(backends)
|
||||
, m_master(NULL)
|
||||
, m_n_routed_to_slave(0)
|
||||
, m_surplus_replies(0)
|
||||
{
|
||||
HR_ENTRY();
|
||||
update_connections();
|
||||
}
|
||||
|
||||
|
||||
@ -38,174 +77,55 @@ HintRouterSession::~HintRouterSession()
|
||||
void HintRouterSession::close()
|
||||
{
|
||||
HR_ENTRY();
|
||||
m_master = Dcb(NULL);
|
||||
m_slaves.clear();
|
||||
m_backends.clear();
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/**
|
||||
* Writer is a function object that writes a clone of a provided GWBUF,
|
||||
* to each dcb it is called with.
|
||||
*/
|
||||
class Writer : std::unary_function<Dcb, bool>
|
||||
{
|
||||
public:
|
||||
Writer(GWBUF* pPacket)
|
||||
: m_pPacket(pPacket)
|
||||
{
|
||||
}
|
||||
|
||||
bool operator()(Dcb& dcb)
|
||||
{
|
||||
bool rv = false;
|
||||
|
||||
GWBUF* pPacket = gwbuf_clone(m_pPacket);
|
||||
|
||||
if (pPacket)
|
||||
{
|
||||
SERVER* pServer = dcb.server();
|
||||
HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->name : "(null)");
|
||||
|
||||
rv = dcb.write(pPacket);
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
private:
|
||||
GWBUF* m_pPacket;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* HintMatcher is a function object that when invoked with a dcb, checks
|
||||
* whether the dcb matches the hint(s) that was given when the HintMatcher
|
||||
* was created.
|
||||
*/
|
||||
class HintMatcher : std::unary_function<const Dcb, bool>
|
||||
{
|
||||
public:
|
||||
HintMatcher(HINT* pHint)
|
||||
: m_pHint(pHint)
|
||||
{
|
||||
}
|
||||
|
||||
bool operator()(const Dcb& dcb)
|
||||
{
|
||||
bool match = false;
|
||||
|
||||
SERVER* pServer = dcb.server();
|
||||
|
||||
if (pServer)
|
||||
{
|
||||
HINT* pHint = m_pHint;
|
||||
|
||||
while (!match && pHint)
|
||||
{
|
||||
switch (pHint->type)
|
||||
{
|
||||
case HINT_ROUTE_TO_MASTER:
|
||||
if (SERVER_IS_MASTER(pServer))
|
||||
{
|
||||
match = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_SLAVE:
|
||||
if (SERVER_IS_SLAVE(pServer))
|
||||
{
|
||||
match = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_NAMED_SERVER:
|
||||
{
|
||||
const char* zName = static_cast<const char*>(pHint->data);
|
||||
|
||||
if (strcmp(zName, pServer->name) == 0)
|
||||
{
|
||||
match = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_UPTODATE_SERVER:
|
||||
case HINT_ROUTE_TO_ALL:
|
||||
case HINT_PARAMETER:
|
||||
MXS_ERROR("HINT not handled.");
|
||||
ss_dassert(false);
|
||||
break;
|
||||
}
|
||||
|
||||
pHint = pHint->next;
|
||||
}
|
||||
}
|
||||
|
||||
return match;
|
||||
}
|
||||
|
||||
private:
|
||||
HINT* m_pHint;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
int32_t HintRouterSession::routeQuery(GWBUF* pPacket)
|
||||
{
|
||||
HR_ENTRY();
|
||||
|
||||
int32_t continue_routing = 1;
|
||||
bool success = false;
|
||||
|
||||
if (pPacket->hint)
|
||||
{
|
||||
// At least one hint => look for match.
|
||||
/* At least one hint => look for match. Only use the later hints if the
|
||||
* first is unsuccessful. */
|
||||
HINT* current_hint = pPacket->hint;
|
||||
HR_DEBUG("Hint, looking for match.");
|
||||
|
||||
Backends::iterator i = std::find_if(m_backends.begin(),
|
||||
m_backends.end(),
|
||||
HintMatcher(pPacket->hint));
|
||||
|
||||
if (i != m_backends.end())
|
||||
while (!success && current_hint)
|
||||
{
|
||||
Dcb& dcb = *i;
|
||||
HR_DEBUG("Writing packet to %s.", dcb.server()->name);
|
||||
|
||||
dcb.write(pPacket);
|
||||
|
||||
// Rotate dcbs so that we get round robin as far as the slaves are concerned.
|
||||
m_backends.push_front(m_backends.back()); // Push last to first
|
||||
m_backends.pop_back(); // Remove last element
|
||||
|
||||
m_surplus_replies = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("No backend to write to.");
|
||||
continue_routing = 0;
|
||||
success = route_by_hint(pPacket, current_hint, false);
|
||||
if (!success)
|
||||
{
|
||||
current_hint = current_hint->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
if (!success)
|
||||
{
|
||||
// No hint => all.
|
||||
HR_DEBUG("No hints, writing to all.");
|
||||
|
||||
size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket));
|
||||
gwbuf_free(pPacket);
|
||||
|
||||
if (n_writes != 0)
|
||||
HR_DEBUG("No hints or hint-based routing failed, falling back to default action.");
|
||||
HINT default_hint = {};
|
||||
default_hint.type = m_router->get_default_action();
|
||||
if (default_hint.type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||
{
|
||||
m_surplus_replies = n_writes - 1;
|
||||
default_hint.data = MXS_STRDUP(m_router->get_default_server().c_str());
|
||||
// Ignore allocation error, it will just result in an error later on
|
||||
}
|
||||
else
|
||||
success = route_by_hint(pPacket, &default_hint, true);
|
||||
if (default_hint.type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||
{
|
||||
MXS_ERROR("Nothing could be written, terminating session.");
|
||||
|
||||
continue_routing = 0;
|
||||
MXS_FREE(default_hint.data);
|
||||
}
|
||||
}
|
||||
|
||||
return continue_routing;
|
||||
if (!success)
|
||||
{
|
||||
gwbuf_free(pPacket);
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
@ -217,20 +137,19 @@ void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend)
|
||||
|
||||
if (m_surplus_replies == 0)
|
||||
{
|
||||
HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)");
|
||||
HR_DEBUG("Returning packet from %s.", pServer ? pServer->unique_name : "(null)");
|
||||
|
||||
MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket);
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)");
|
||||
HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->unique_name : "(null)");
|
||||
|
||||
--m_surplus_replies;
|
||||
gwbuf_free(pPacket);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void HintRouterSession::handleError(GWBUF* pMessage,
|
||||
DCB* pProblem,
|
||||
mxs_error_action_t action,
|
||||
@ -274,3 +193,217 @@ void HintRouterSession::handleError(GWBUF* pMessage,
|
||||
*pSuccess = false;
|
||||
}
|
||||
}
|
||||
|
||||
bool HintRouterSession::route_by_hint(GWBUF* pPacket, HINT* hint, bool print_errors)
|
||||
{
|
||||
bool success = false;
|
||||
switch (hint->type)
|
||||
{
|
||||
case HINT_ROUTE_TO_MASTER:
|
||||
{
|
||||
bool master_ok = false;
|
||||
// The master server should be already known, but may have changed
|
||||
if (m_master.get() && SERVER_IS_MASTER(m_master.server()))
|
||||
{
|
||||
master_ok = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
update_connections();
|
||||
if (m_master.get())
|
||||
{
|
||||
master_ok = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (master_ok)
|
||||
{
|
||||
HR_DEBUG("Writing packet to master: '%s'.", m_master.server()->unique_name);
|
||||
success = m_master.write(pPacket);
|
||||
if (success)
|
||||
{
|
||||
m_router->m_routed_to_master++;
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Write to master failed.");
|
||||
}
|
||||
}
|
||||
else if (print_errors)
|
||||
{
|
||||
MXS_ERROR("Hint suggests routing to master when no master connected.");
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_SLAVE:
|
||||
success = route_to_slave(pPacket, print_errors);
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_NAMED_SERVER:
|
||||
{
|
||||
string backend_name((hint->data) ? (const char*)(hint->data) : "");
|
||||
BackendMap::const_iterator iter = m_backends.find(backend_name);
|
||||
if (iter != m_backends.end())
|
||||
{
|
||||
HR_DEBUG("Writing packet to %s.", iter->second.server()->unique_name);
|
||||
success = iter->second.write(pPacket);
|
||||
if (success)
|
||||
{
|
||||
m_router->m_routed_to_named++;
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Write failed.");
|
||||
}
|
||||
}
|
||||
else if (print_errors)
|
||||
{
|
||||
/* This shouldn't be possible with current setup as server names are
|
||||
* checked on startup. With a different filter and the 'print_errors'
|
||||
* on for the first call this is possible. */
|
||||
MXS_ERROR("Hint suggests routing to backend '%s' when no such backend connected.",
|
||||
backend_name.c_str());
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_ALL:
|
||||
{
|
||||
HR_DEBUG("Writing packet to %lu backends.", m_backends.size());
|
||||
BackendMap::size_type n_writes =
|
||||
std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket));
|
||||
if (n_writes != 0)
|
||||
{
|
||||
m_surplus_replies = n_writes - 1;
|
||||
}
|
||||
BackendMap::size_type size = m_backends.size();
|
||||
success = (n_writes == size);
|
||||
if (success)
|
||||
{
|
||||
gwbuf_free(pPacket);
|
||||
m_router->m_routed_to_all++;
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Write to all failed.");
|
||||
if (print_errors)
|
||||
{
|
||||
MXS_ERROR("Write failed for '%lu' out of '%lu' backends.",
|
||||
(size - n_writes), size);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
MXS_ERROR("Unsupported hint type '%d'", hint->type);
|
||||
break;
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
bool HintRouterSession::route_to_slave(GWBUF* pPacket, bool print_errors)
|
||||
{
|
||||
bool success = false;
|
||||
// Find a valid slave
|
||||
size_type size = m_slaves.size();
|
||||
if (size)
|
||||
{
|
||||
size_type begin = m_n_routed_to_slave % size;
|
||||
size_type limit = begin + size;
|
||||
for (size_type curr = begin; curr != limit; curr++)
|
||||
{
|
||||
Dcb& candidate = m_slaves.at(curr % size);
|
||||
if (SERVER_IS_SLAVE(candidate.server()))
|
||||
{
|
||||
HR_DEBUG("Writing packet to slave: '%s'.", candidate.server()->unique_name);
|
||||
success = candidate.write(pPacket);
|
||||
if (success)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Write to slave failed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* It is (in theory) possible, that none of the slaves in the slave-array are
|
||||
* working (or have been promoted to master) and the previous master is now
|
||||
* a slave. In this situation, re-arranging the dcb:s will help. */
|
||||
if (!success)
|
||||
{
|
||||
update_connections();
|
||||
size = m_slaves.size();
|
||||
if (size)
|
||||
{
|
||||
size_type begin = m_n_routed_to_slave % size;
|
||||
size_type limit = begin + size;
|
||||
for (size_type curr = begin; curr != limit; curr++)
|
||||
{
|
||||
Dcb& candidate = m_slaves.at(curr % size);
|
||||
HR_DEBUG("Writing packet to slave: '%s'.", candidate.server()->unique_name);
|
||||
success = candidate.write(pPacket);
|
||||
if (success)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Write to slave failed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (success)
|
||||
{
|
||||
m_router->m_routed_to_slave++;
|
||||
m_n_routed_to_slave++;
|
||||
}
|
||||
else if (print_errors)
|
||||
{
|
||||
if (!size)
|
||||
{
|
||||
MXS_ERROR("Hint suggests routing to slave when no slaves found.");
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not write to any of '%lu' slaves.", size);
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
void HintRouterSession::update_connections()
|
||||
{
|
||||
/* Attempt to rearrange the dcb:s in the session such that the master and
|
||||
* slave containers are correct again. Do not try to make new connections,
|
||||
* since those would not have the correct session state anyways. */
|
||||
m_master = Dcb(NULL);
|
||||
m_slaves.clear();
|
||||
|
||||
for (BackendMap::const_iterator iter = m_backends.begin();
|
||||
iter != m_backends.end(); iter++)
|
||||
{
|
||||
SERVER* server = iter->second.get()->server;
|
||||
if (SERVER_IS_MASTER(server))
|
||||
{
|
||||
if (!m_master.get())
|
||||
{
|
||||
m_master = iter->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("Found multiple master servers when updating connections.");
|
||||
}
|
||||
}
|
||||
else if (SERVER_IS_SLAVE(server))
|
||||
{
|
||||
m_slaves.push_back(iter->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,20 +13,32 @@
|
||||
*/
|
||||
|
||||
#include "hintrouterdefs.hh"
|
||||
|
||||
#include <deque>
|
||||
#include <tr1/unordered_map>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
#include <maxscale/router.hh>
|
||||
#include "dcb.hh"
|
||||
|
||||
using std::string;
|
||||
|
||||
class HintRouter;
|
||||
|
||||
class HintRouterSession : public maxscale::RouterSession
|
||||
{
|
||||
public:
|
||||
typedef std::deque<Dcb> Backends;
|
||||
typedef std::tr1::unordered_map<string, Dcb> BackendMap; // All backends, indexed by name
|
||||
typedef std::vector<Dcb> BackendArray;
|
||||
typedef std::vector<SERVER_REF*> RefArray;
|
||||
typedef BackendMap::value_type MapElement;
|
||||
typedef BackendArray::size_type size_type;
|
||||
|
||||
HintRouterSession(MXS_SESSION* pSession,
|
||||
HintRouter* pRouter,
|
||||
const Backends& backends);
|
||||
const BackendMap& backends
|
||||
);
|
||||
|
||||
~HintRouterSession();
|
||||
|
||||
@ -42,12 +54,17 @@ public:
|
||||
bool* pSuccess);
|
||||
|
||||
private:
|
||||
HintRouterSession(const HintRouterSession&);
|
||||
HintRouterSession& operator = (const HintRouterSession&);
|
||||
|
||||
HintRouterSession(const HintRouterSession&); // denied
|
||||
HintRouterSession& operator = (const HintRouterSession&); // denied
|
||||
private:
|
||||
HintRouter* m_pRouter;
|
||||
Backends m_backends;
|
||||
size_t m_surplus_replies;
|
||||
bool route_by_hint(GWBUF* pPacket, HINT* current_hint, bool ignore_errors);
|
||||
bool route_to_slave(GWBUF* pPacket, bool print_errors);
|
||||
void update_connections();
|
||||
|
||||
HintRouter* m_router;
|
||||
BackendMap m_backends; // all connections
|
||||
Dcb m_master; // connection to master
|
||||
BackendArray m_slaves; // connections to slaves
|
||||
size_type m_n_routed_to_slave; // packets routed to a single slave, used for rr
|
||||
size_type m_surplus_replies; // how many replies should be ignored
|
||||
};
|
||||
|
Reference in New Issue
Block a user