HintRouter: Support various hint types, round-robin rotate slaves

Supports hint types:
-master
-slave
-named server
-all

A default action, which is performed when no hint exists or on error,
can be set. The different actions are analogous to the hint types.

A maximum connection number for slaves can be set. If more slaves are
configured for the service, the filter will rotate slaves for new sessions.

Within a session with multiple slaves, the "route_to_slave"-hint will
also rotate among the slave backends.
This commit is contained in:
Esa Korhonen 2017-03-27 10:07:51 +03:00
parent ddcd1f960c
commit 90c249a8b2
5 changed files with 377 additions and 93 deletions

View File

@ -13,65 +13,158 @@
#define MXS_MODULE_NAME "hintrouter"
#include "hintrouter.hh"
#include <limits>
#include <vector>
#include <maxscale/log_manager.h>
#include "dcb.hh"
static const MXS_ENUM_VALUE default_action_values[] =
{
{"master", HINT_ROUTE_TO_MASTER},
{"slave", HINT_ROUTE_TO_SLAVE},
{"named_server", HINT_ROUTE_TO_NAMED_SERVER},
{"route_to_all", HINT_ROUTE_TO_ALL},
{NULL} /* Last must be NULL */
};
static const char DEFAULT_ACTION[] = "default_action";
static const char DEFAULT_SERVER[] = "default_server";
static const char MAX_SLAVES[] = "max_slaves";
HintRouter::HintRouter(SERVICE* pService)
: maxscale::Router<HintRouter, HintRouterSession>(pService)
HintRouter::HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server,
int max_slaves)
: maxscale::Router<HintRouter, HintRouterSession>(pService),
m_default_action(default_action),
m_default_server(default_server),
m_max_slaves(max_slaves),
m_total_slave_conns(0)
{
HR_ENTRY();
if (m_max_slaves < 0)
{ // set a reasonable default value
m_max_slaves = pService->n_dbref - 1;
}
MXS_NOTICE("Hint router [%s] created.", pService->name);
}
//static
HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions)
{
HR_ENTRY();
return new HintRouter(pService);
}
MXS_CONFIG_PARAMETER* params = pService->svc_config_param;
HINT_TYPE default_action = (HINT_TYPE)config_get_enum(params, DEFAULT_ACTION,
default_action_values);
string default_server(config_get_string(params, DEFAULT_SERVER));
int max_slaves = config_get_integer(params, MAX_SLAVES);
return new HintRouter(pService, default_action, default_server, max_slaves);
}
HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession)
{
typedef HintRouterSession::RefArray::size_type array_index;
HR_ENTRY();
HintRouterSession* pRouterSession = NULL;
Dcb master_Dcb(NULL);
HintRouterSession::BackendMap all_backends;
all_backends.rehash(1 + m_max_slaves);
HintRouterSession::BackendArray slave_arr;
slave_arr.reserve(m_max_slaves);
HintRouterSession::Backends backends;
SERVER_REF* master_ref = NULL;
HintRouterSession::RefArray slave_refs;
slave_refs.reserve(m_max_slaves);
for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next)
/* Go through the server references, find master and slaves */
for (SERVER_REF* pSref = pSession->service->dbref; pSref; pSref = pSref->next)
{
if (SERVER_REF_IS_ACTIVE(pSref))
{
SERVER* pServer = pSref->server;
HR_DEBUG("Connecting to %s.", pServer->name);
DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol);
if (pDcb)
if (SERVER_IS_MASTER(pSref->server))
{
HR_DEBUG("Connected to %p %s.", pDcb, pServer->name);
// TODO: What's done here, should be done by dcb_connect().
atomic_add(&pSref->connections, 1);
pDcb->service = pSession->service;
backends.push_back(Dcb(pDcb));
if (!master_ref)
{
master_ref = pSref;
}
else
{
MXS_WARNING("Found multiple master servers when creating session.\n");
}
}
else
else if (SERVER_IS_SLAVE(pSref->server))
{
HR_DEBUG("Failed to connect to %s.", pServer->name);
slave_refs.push_back(pSref);
}
}
}
if (backends.size() != 0)
if (master_ref)
{
pRouterSession = new HintRouterSession(pSession, this, backends);
// Connect to master
HR_DEBUG("Connecting to %s.", master_ref->server->name);
DCB* master_conn = dcb_connect(master_ref->server, pSession, master_ref->server->protocol);
if (master_conn)
{
HR_DEBUG("Connected.");
atomic_add(&master_ref->connections, 1);
master_conn->service = pSession->service;
master_Dcb = Dcb(master_conn);
string name(master_conn->server->unique_name);
all_backends.insert(HintRouterSession::MapElement(name, master_Dcb));
}
else
{
HR_DEBUG("Connection failed.");
}
}
return pRouterSession;
/* Different sessions may use different slaves if the 'max_session_slaves'-
* setting is low enough. First, set maximal looping limits noting that the
* array is treated as a ring. Also, array size may have changed since last
* time it was formed. */
if (slave_refs.size())
{
array_index size = slave_refs.size();
array_index begin = m_total_slave_conns % size;
array_index limit = begin + size;
int slave_conns = 0;
array_index current = begin;
for (;
(slave_conns < m_max_slaves) && current != limit;
current++)
{
SERVER_REF* slave_ref = slave_refs.at(current % size);
// Connect to a slave
HR_DEBUG("Connecting to %s.", slave_ref->server->name);
DCB* slave_conn = dcb_connect(slave_ref->server, pSession, slave_ref->server->protocol);
if (slave_conn)
{
HR_DEBUG("Connected.");
atomic_add(&slave_ref->connections, 1);
slave_conn->service = pSession->service;
Dcb slave_Dcb(slave_conn);
slave_arr.push_back(slave_Dcb);
string name(slave_conn->server->unique_name);
all_backends.insert(HintRouterSession::MapElement(name, slave_Dcb));
slave_conns++;
}
else
{
HR_DEBUG("Connection failed.");
}
}
m_total_slave_conns += slave_conns;
}
if (all_backends.size() != 0)
{
return new HintRouterSession(pSession, this, all_backends);
}
return NULL;
}
void HintRouter::diagnostics(DCB* pOut)
@ -82,10 +175,9 @@ void HintRouter::diagnostics(DCB* pOut)
uint64_t HintRouter::getCapabilities()
{
HR_ENTRY();
return RCAP_TYPE_NONE;
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT;
}
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static MXS_MODULE module =
@ -95,16 +187,24 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
MXS_ROUTER_VERSION, /* Implemented module API version */
"A hint router", /* Description */
"V1.0.0", /* Module version */
RCAP_TYPE_STMT_OUTPUT,
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT,
&HintRouter::s_object,
NULL, /* Process init, can be null */
NULL, /* Process finish, can be null */
NULL, /* Thread init */
NULL, /* Thread finish */
{
{
DEFAULT_ACTION,
MXS_MODULE_PARAM_ENUM,
default_action_values[0].name,
MXS_MODULE_OPT_NONE,
default_action_values
},
{DEFAULT_SERVER, MXS_MODULE_PARAM_SERVER, ""},
{MAX_SLAVES, MXS_MODULE_PARAM_INT, "-1"},
{MXS_END_MODULE_PARAMS}
}
};
return &module;
}

View File

@ -13,6 +13,7 @@
*/
#include "hintrouterdefs.hh"
#include <maxscale/router.hh>
#include "hintroutersession.hh"
@ -20,15 +21,25 @@ class HintRouter : public maxscale::Router<HintRouter, HintRouterSession>
{
public:
static HintRouter* create(SERVICE* pService, char** pzOptions);
HintRouterSession* newSession(MXS_SESSION *pSession);
void diagnostics(DCB* pOut);
uint64_t getCapabilities();
HINT_TYPE get_default_action() const
{
return m_default_action;
};
const string& get_default_server() const
{
return m_default_server;
};
private:
HintRouter(SERVICE* pService);
HintRouter(SERVICE* pService, HINT_TYPE default_action, string& default_server,
int max_slaves);
HINT_TYPE m_default_action;
string m_default_server;
int m_max_slaves;
volatile int m_total_slave_conns;
private:
HintRouter(const HintRouter&);

View File

@ -13,6 +13,8 @@
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/dcb.h>
#include <maxscale/log_manager.h>
#define DEBUG_HINTROUTER

View File

@ -13,19 +13,25 @@
#define MXS_MODULE_NAME "hintrouter"
#include "hintroutersession.hh"
#include <algorithm>
#include <functional>
#include <maxscale/alloc.h>
#include "hintrouter.hh"
HintRouterSession::HintRouterSession(MXS_SESSION* pSession,
HintRouter* pRouter,
const Backends& backends)
const BackendMap& backends)
: maxscale::RouterSession(pSession)
, m_pRouter(pRouter)
, m_router(pRouter)
, m_backends(backends)
, m_master(NULL)
, m_n_routed_to_slave(0)
, m_surplus_replies(0)
{
HR_ENTRY();
update_connections();
}
@ -38,6 +44,8 @@ HintRouterSession::~HintRouterSession()
void HintRouterSession::close()
{
HR_ENTRY();
m_master = Dcb(NULL);
m_slaves.clear();
m_backends.clear();
}
@ -48,18 +56,17 @@ namespace
* Writer is a function object that writes a clone of a provided GWBUF,
* to each dcb it is called with.
*/
class Writer : std::unary_function<Dcb, bool>
class Writer : std::unary_function<HintRouterSession::MapElement, bool>
{
public:
Writer(GWBUF* pPacket)
: m_pPacket(pPacket)
{
}
{}
bool operator()(Dcb& dcb)
bool operator()(HintRouterSession::MapElement& elem)
{
bool rv = false;
Dcb& dcb = elem.second;
GWBUF* pPacket = gwbuf_clone(m_pPacket);
if (pPacket)
@ -77,7 +84,6 @@ private:
GWBUF* m_pPacket;
};
/**
* HintMatcher is a function object that when invoked with a dcb, checks
* whether the dcb matches the hint(s) that was given when the HintMatcher
@ -88,8 +94,7 @@ class HintMatcher : std::unary_function<const Dcb, bool>
public:
HintMatcher(HINT* pHint)
: m_pHint(pHint)
{
}
{}
bool operator()(const Dcb& dcb)
{
@ -151,61 +156,178 @@ private:
}
bool HintRouterSession::route_to_slave(GWBUF* pPacket)
{
bool success = false;
// Find a valid slave
size_type size = m_slaves.size();
size_type begin = m_n_routed_to_slave % size;
size_type limit = begin + size;
for (size_type curr = begin; curr != limit; curr++)
{
Dcb& candidate = m_slaves.at(curr % size);
if (SERVER_IS_SLAVE(candidate.server()))
{
success = candidate.write(pPacket);
if (success)
{
break;
}
}
}
/* It is (in theory) possible, that none of the slaves in the slave-array are
* working (or have been promoted to master) and the previous master is now
* a slave. In this situation, re-arranging the dcb:s will help. */
if (!success)
{
update_connections();
for (size_type curr = begin; curr != limit; curr++)
{
Dcb& candidate = m_slaves.at(curr % size);
success = candidate.write(pPacket);
if (success)
{
break;
}
}
}
if (success)
{
m_n_routed_to_slave++;
}
return success;
}
bool HintRouterSession::route_by_hint(GWBUF* pPacket, HINT* hint, bool ignore_errors)
{
bool success = false;
switch (hint->type)
{
case HINT_ROUTE_TO_MASTER:
if (m_master.get())
{
// The master server should be already known, but may have changed
if (SERVER_IS_MASTER(m_master.server()))
{
success = m_master.write(pPacket);
}
else
{
update_connections();
if (m_master.get())
{
HR_DEBUG("Writing packet to %s.", m_master.server()->name);
success = m_master.write(pPacket);
}
}
}
else if (!ignore_errors)
{
MXS_ERROR("Hint suggests routing to master when no master connected.");
}
break;
case HINT_ROUTE_TO_SLAVE:
if (m_slaves.size())
{
success = route_to_slave(pPacket);
}
else if (!ignore_errors)
{
MXS_ERROR("Hint suggests routing to slave when no slave connected.");
}
break;
case HINT_ROUTE_TO_NAMED_SERVER:
{
string backend_name((hint->data) ? (const char*)(hint->data) : "");
BackendMap::const_iterator iter = m_backends.find(backend_name);
if (iter != m_backends.end())
{
HR_DEBUG("Writing packet to %s.", iter->second.server()->unique_name);
success = iter->second.write(pPacket);
}
else if (!ignore_errors)
{
MXS_ERROR("Hint suggests routing to backend '%s' when no such backend connected.",
backend_name.c_str());
}
}
break;
case HINT_ROUTE_TO_ALL:
{
HR_DEBUG("Writing packet to %lu backends.", m_backends.size());
size_type n_writes =
std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket));
if (n_writes != 0)
{
m_surplus_replies = n_writes - 1;
}
else if (!ignore_errors)
{
MXS_ERROR("Nothing could be written, terminating session.");
}
success = (n_writes == m_backends.size()); // Is this too strict?
if (success)
{
gwbuf_free(pPacket);
}
}
break;
default:
MXS_ERROR("Unsupported hint type '%d'", hint->type);
break;
}
return success;
}
int32_t HintRouterSession::routeQuery(GWBUF* pPacket)
{
HR_ENTRY();
int32_t continue_routing = 1;
bool success = false;
if (pPacket->hint)
{
// At least one hint => look for match.
/* At least one hint => look for match. Only use the later hints if the
* first is unsuccessful. */
HINT* current_hint = pPacket->hint;
HR_DEBUG("Hint, looking for match.");
Backends::iterator i = std::find_if(m_backends.begin(),
m_backends.end(),
HintMatcher(pPacket->hint));
if (i != m_backends.end())
while (!success && current_hint)
{
Dcb& dcb = *i;
HR_DEBUG("Writing packet to %s.", dcb.server()->name);
dcb.write(pPacket);
// Rotate dcbs so that we get round robin as far as the slaves are concerned.
m_backends.push_front(m_backends.back()); // Push last to first
m_backends.pop_back(); // Remove last element
m_surplus_replies = 0;
}
else
{
MXS_ERROR("No backend to write to.");
continue_routing = 0;
success = route_by_hint(pPacket, current_hint, true);
current_hint = current_hint->next;
}
}
else
if (!success)
{
// No hint => all.
HR_DEBUG("No hints, writing to all.");
size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket));
gwbuf_free(pPacket);
if (n_writes != 0)
// No hint => default action.
HR_DEBUG("No hints or hint-based routing failed, falling back to default action.");
HINT fake_hint = {};
fake_hint.type = m_router->get_default_action();
if (fake_hint.type == HINT_ROUTE_TO_NAMED_SERVER)
{
m_surplus_replies = n_writes - 1;
fake_hint.data = MXS_STRDUP(m_router->get_default_server().c_str());
// Ignore allocation error, it will just result in an error later on
}
else
success = route_by_hint(pPacket, &fake_hint, false);
if (fake_hint.type == HINT_ROUTE_TO_NAMED_SERVER)
{
MXS_ERROR("Nothing could be written, terminating session.");
continue_routing = 0;
MXS_FREE(fake_hint.data);
}
}
return continue_routing;
if (!success)
{
gwbuf_free(pPacket);
}
return success;
}
@ -217,20 +339,19 @@ void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend)
if (m_surplus_replies == 0)
{
HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)");
HR_DEBUG("Returning packet from %s.", pServer ? pServer->unique_name : "(null)");
MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket);
}
else
{
HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)");
HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->unique_name : "(null)");
--m_surplus_replies;
gwbuf_free(pPacket);
}
}
void HintRouterSession::handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
@ -274,3 +395,33 @@ void HintRouterSession::handleError(GWBUF* pMessage,
*pSuccess = false;
}
}
void HintRouterSession::update_connections()
{
/* Attempt to rearrange the dcb:s in the session such that the master and
* slave containers are correct again. Do not try to make new connections,
* since those would not have the correct session state anyways. */
m_master = Dcb(NULL);
m_slaves.clear();
for (BackendMap::const_iterator iter = m_backends.begin();
iter != m_backends.end(); iter++)
{
SERVER* server = iter->second.get()->server;
if (SERVER_IS_MASTER(server))
{
if (!m_master.get())
{
m_master = iter->second;
}
else
{
MXS_WARNING("Found multiple master servers when updating connections.");
}
}
else if (SERVER_IS_SLAVE(server))
{
m_slaves.push_back(iter->second);
}
}
}

View File

@ -13,26 +13,42 @@
*/
#include "hintrouterdefs.hh"
#include <deque>
#include <tr1/unordered_map>
#include <vector>
#include <string>
#include <maxscale/router.hh>
#include "dcb.hh"
using std::string;
class HintRouter;
class HintRouterSession : public maxscale::RouterSession
{
public:
typedef std::deque<Dcb> Backends;
typedef std::tr1::unordered_map<string, Dcb> BackendMap; // All backends, indexed by name
typedef std::vector<Dcb> BackendArray;
typedef std::vector<SERVER_REF*> RefArray;
typedef BackendMap::value_type MapElement;
typedef BackendArray::size_type size_type;
HintRouterSession(MXS_SESSION* pSession,
HintRouter* pRouter,
const Backends& backends);
const BackendMap& backends
);
~HintRouterSession();
void close();
int32_t routeQuery(GWBUF* pPacket);
bool route_by_hint(GWBUF* pPacket, HINT* current_hint, bool ignore_errors);
bool route_to_slave(GWBUF* pPacket);
void clientReply(GWBUF* pPacket, DCB* pBackend);
@ -41,13 +57,17 @@ public:
mxs_error_action_t action,
bool* pSuccess);
private:
HintRouterSession(const HintRouterSession&);
HintRouterSession& operator = (const HintRouterSession&);
void update_connections();
private:
HintRouter* m_pRouter;
Backends m_backends;
size_t m_surplus_replies;
HintRouterSession(const HintRouterSession&); // denied
HintRouterSession& operator = (const HintRouterSession&); // denied
private:
HintRouter* m_router;
BackendMap m_backends; // all connections
Dcb m_master; // connection to master
BackendArray m_slaves; // connections to slaves
size_type m_n_routed_to_slave; // packets routed to a single slave, used for rr
size_t m_surplus_replies; // how many replies should be ignored
};