Merge branch 'develop' into MXS-1075
This commit is contained in:
@ -105,13 +105,13 @@ bool avro_handle_convert(const MODULECMD_ARG *args)
|
||||
bool rval = false;
|
||||
|
||||
if (strcmp(args->argv[1].value.string, "start") == 0 &&
|
||||
conversion_task_ctl(args->argv[0].value.service->router_instance, true))
|
||||
conversion_task_ctl((AVRO_INSTANCE*)args->argv[0].value.service->router_instance, true))
|
||||
{
|
||||
MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name);
|
||||
rval = true;
|
||||
}
|
||||
else if (strcmp(args->argv[1].value.string, "stop") == 0 &&
|
||||
conversion_task_ctl(args->argv[0].value.service->router_instance, false))
|
||||
conversion_task_ctl((AVRO_INSTANCE*)args->argv[0].value.service->router_instance, false))
|
||||
{
|
||||
MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name);
|
||||
rval = true;
|
||||
@ -170,6 +170,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION,
|
||||
"Binlogrouter",
|
||||
"V1.0.0",
|
||||
RCAP_TYPE_NO_RSESSION | RCAP_TYPE_NO_AUTH,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
@ -1018,7 +1019,7 @@ errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *mess
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return RCAP_TYPE_NO_RSESSION;
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -173,6 +173,8 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION,
|
||||
"Binlogrouter",
|
||||
"V2.1.0",
|
||||
RCAP_TYPE_NO_RSESSION | RCAP_TYPE_CONTIGUOUS_OUTPUT |
|
||||
RCAP_TYPE_RESULTSET_OUTPUT | RCAP_TYPE_NO_AUTH,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
@ -1942,7 +1944,7 @@ static void rses_end_locked_router_action(ROUTER_SLAVE *rses)
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return RCAP_TYPE_NO_RSESSION | RCAP_TYPE_CONTIGUOUS_OUTPUT | RCAP_TYPE_RESULTSET_OUTPUT;
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -91,6 +91,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION,
|
||||
"The admin user interface",
|
||||
"V1.0.0",
|
||||
RCAP_TYPE_NO_AUTH,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
@ -271,6 +272,8 @@ execute(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
|
||||
queue = gwbuf_consume(queue, GWBUF_LENGTH(queue));
|
||||
}
|
||||
|
||||
MXS_INFO("MaxAdmin: %s", session->cmdbuf);
|
||||
|
||||
execute_cmd(session);
|
||||
return 1;
|
||||
}
|
||||
@ -289,5 +292,5 @@ diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER *instance)
|
||||
{
|
||||
return 0;
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
@ -90,6 +90,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION,
|
||||
"The debug user interface",
|
||||
"V1.1.1",
|
||||
RCAP_TYPE_NO_AUTH,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
@ -294,5 +295,5 @@ diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return 0;
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
add_library(hintrouter SHARED
|
||||
hintrouter.cc
|
||||
hintroutersession.cc
|
||||
dcb.cc
|
||||
)
|
||||
|
||||
target_link_libraries(hintrouter maxscale-common)
|
||||
|
||||
72
server/modules/routing/hintrouter/dcb.cc
Normal file
72
server/modules/routing/hintrouter/dcb.cc
Normal file
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "dcb.hh"
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/service.h>
|
||||
|
||||
|
||||
Dcb::Dcb(DCB* pDcb)
|
||||
: m_pDcb(pDcb)
|
||||
, m_pRefs(NULL)
|
||||
{
|
||||
try
|
||||
{
|
||||
m_pRefs = new int (1);
|
||||
}
|
||||
catch (const std::exception&)
|
||||
{
|
||||
dcb_close(pDcb);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
Dcb::Dcb(const Dcb& rhs)
|
||||
: m_pDcb(rhs.m_pDcb)
|
||||
, m_pRefs(rhs.m_pRefs)
|
||||
{
|
||||
++(*m_pRefs);
|
||||
}
|
||||
|
||||
Dcb& Dcb::operator = (Dcb rhs)
|
||||
{
|
||||
swap(rhs);
|
||||
return *this;
|
||||
}
|
||||
|
||||
void Dcb::dec()
|
||||
{
|
||||
ss_dassert(*m_pRefs > 0);
|
||||
|
||||
if (--(*m_pRefs) == 0)
|
||||
{
|
||||
HR_DEBUG("CLOSING dcb");
|
||||
// TODO: You should not need to manually adjust any
|
||||
// TODO: connections number, dcb_close should handle that.
|
||||
SERVER_REF* pSref = m_pDcb->service->dbref;
|
||||
|
||||
while (pSref && (pSref->server != m_pDcb->server))
|
||||
{
|
||||
pSref = pSref->next;
|
||||
}
|
||||
|
||||
if (pSref)
|
||||
{
|
||||
atomic_add(&pSref->connections, -1);
|
||||
}
|
||||
|
||||
dcb_close(m_pDcb);
|
||||
|
||||
delete m_pRefs;
|
||||
}
|
||||
}
|
||||
65
server/modules/routing/hintrouter/dcb.hh
Normal file
65
server/modules/routing/hintrouter/dcb.hh
Normal file
@ -0,0 +1,65 @@
|
||||
#pragma once
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "hintrouterdefs.hh"
|
||||
#include <algorithm>
|
||||
#include <maxscale/dcb.h>
|
||||
|
||||
class Dcb
|
||||
{
|
||||
public:
|
||||
explicit Dcb(DCB* pDcb);
|
||||
Dcb(const Dcb& rhs);
|
||||
~Dcb()
|
||||
{
|
||||
dec();
|
||||
}
|
||||
|
||||
Dcb& operator = (Dcb rhs);
|
||||
|
||||
struct server* server() const
|
||||
{
|
||||
return m_pDcb->server;
|
||||
}
|
||||
|
||||
DCB* get() const
|
||||
{
|
||||
return m_pDcb;
|
||||
}
|
||||
|
||||
bool write(GWBUF* pPacket)
|
||||
{
|
||||
ss_dassert(m_pDcb);
|
||||
return m_pDcb->func.write(m_pDcb, pPacket) == 1;
|
||||
}
|
||||
|
||||
private:
|
||||
void inc()
|
||||
{
|
||||
++(*m_pRefs);
|
||||
}
|
||||
|
||||
void dec();
|
||||
|
||||
void swap(Dcb& rhs)
|
||||
{
|
||||
std::swap(m_pDcb, rhs.m_pDcb);
|
||||
std::swap(m_pRefs, rhs.m_pRefs);
|
||||
}
|
||||
|
||||
private:
|
||||
DCB* m_pDcb;
|
||||
int* m_pRefs;
|
||||
};
|
||||
|
||||
@ -14,11 +14,13 @@
|
||||
#define MXS_MODULE_NAME "hintrouter"
|
||||
#include "hintrouter.hh"
|
||||
#include <maxscale/log_manager.h>
|
||||
#include "dcb.hh"
|
||||
|
||||
|
||||
HintRouter::HintRouter(SERVICE* pService)
|
||||
: maxscale::Router<HintRouter, HintRouterSession>(pService)
|
||||
{
|
||||
HR_ENTRY();
|
||||
MXS_NOTICE("Hint router [%s] created.", pService->name);
|
||||
}
|
||||
|
||||
@ -26,22 +28,61 @@ HintRouter::HintRouter(SERVICE* pService)
|
||||
//static
|
||||
HintRouter* HintRouter::create(SERVICE* pService, char** pzOptions)
|
||||
{
|
||||
HR_ENTRY();
|
||||
return new HintRouter(pService);
|
||||
}
|
||||
|
||||
|
||||
HintRouterSession* HintRouter::newSession(MXS_SESSION *pSession)
|
||||
{
|
||||
return new HintRouterSession(pSession, this);
|
||||
HR_ENTRY();
|
||||
HintRouterSession* pRouterSession = NULL;
|
||||
|
||||
HintRouterSession::Backends backends;
|
||||
|
||||
for (SERVER_REF* pSref = m_pService->dbref; pSref; pSref = pSref->next)
|
||||
{
|
||||
if (SERVER_REF_IS_ACTIVE(pSref))
|
||||
{
|
||||
SERVER* pServer = pSref->server;
|
||||
|
||||
HR_DEBUG("Connecting to %s.", pServer->name);
|
||||
|
||||
DCB* pDcb = dcb_connect(pServer, pSession, pServer->protocol);
|
||||
|
||||
if (pDcb)
|
||||
{
|
||||
HR_DEBUG("Connected to %p %s.", pDcb, pServer->name);
|
||||
// TODO: What's done here, should be done by dcb_connect().
|
||||
atomic_add(&pSref->connections, 1);
|
||||
pDcb->service = pSession->service;
|
||||
|
||||
backends.push_back(Dcb(pDcb));
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Failed to connect to %s.", pServer->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (backends.size() != 0)
|
||||
{
|
||||
pRouterSession = new HintRouterSession(pSession, this, backends);
|
||||
}
|
||||
|
||||
return pRouterSession;
|
||||
}
|
||||
|
||||
void HintRouter::diagnostics(DCB* pOut)
|
||||
{
|
||||
HR_ENTRY();
|
||||
}
|
||||
|
||||
uint64_t HintRouter::getCapabilities()
|
||||
{
|
||||
return 0;
|
||||
HR_ENTRY();
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
|
||||
@ -54,6 +95,7 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION, /* Implemented module API version */
|
||||
"A hint router", /* Description */
|
||||
"V1.0.0", /* Module version */
|
||||
RCAP_TYPE_STMT_OUTPUT,
|
||||
&HintRouter::s_object,
|
||||
NULL, /* Process init, can be null */
|
||||
NULL, /* Process finish, can be null */
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxscale/cppdefs.hh>
|
||||
#include "hintrouterdefs.hh"
|
||||
#include <maxscale/router.hh>
|
||||
#include "hintroutersession.hh"
|
||||
|
||||
|
||||
27
server/modules/routing/hintrouter/hintrouterdefs.hh
Normal file
27
server/modules/routing/hintrouter/hintrouterdefs.hh
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxscale/cppdefs.hh>
|
||||
#include <maxscale/log_manager.h>
|
||||
|
||||
#define DEBUG_HINTROUTER
|
||||
//#undef DEBUG_HINTROUTER
|
||||
|
||||
#ifdef DEBUG_HINTROUTER
|
||||
#define HR_DEBUG(msg, ...) MXS_NOTICE(msg, ##__VA_ARGS__)
|
||||
#define HR_ENTRY() HR_DEBUG(__func__)
|
||||
#else
|
||||
#define HR_DEBUG(msg, ...)
|
||||
#define HR_ENTRY()
|
||||
#endif
|
||||
@ -13,36 +13,221 @@
|
||||
|
||||
#define MXS_MODULE_NAME "hintrouter"
|
||||
#include "hintroutersession.hh"
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
|
||||
|
||||
HintRouterSession::HintRouterSession(MXS_SESSION* pSession, HintRouter* pRouter)
|
||||
HintRouterSession::HintRouterSession(MXS_SESSION* pSession,
|
||||
HintRouter* pRouter,
|
||||
const Backends& backends)
|
||||
: maxscale::RouterSession(pSession)
|
||||
, m_pRouter(pRouter)
|
||||
, m_backends(backends)
|
||||
, m_surplus_replies(0)
|
||||
{
|
||||
HR_ENTRY();
|
||||
}
|
||||
|
||||
|
||||
HintRouterSession::~HintRouterSession()
|
||||
{
|
||||
HR_ENTRY();
|
||||
}
|
||||
|
||||
|
||||
void HintRouterSession::close()
|
||||
{
|
||||
HR_ENTRY();
|
||||
m_backends.clear();
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/**
|
||||
* Writer is a function object that writes a clone of a provided GWBUF,
|
||||
* to each dcb it is called with.
|
||||
*/
|
||||
class Writer : std::unary_function<Dcb, bool>
|
||||
{
|
||||
public:
|
||||
Writer(GWBUF* pPacket)
|
||||
: m_pPacket(pPacket)
|
||||
{
|
||||
}
|
||||
|
||||
bool operator()(Dcb& dcb)
|
||||
{
|
||||
bool rv = false;
|
||||
|
||||
GWBUF* pPacket = gwbuf_clone(m_pPacket);
|
||||
|
||||
if (pPacket)
|
||||
{
|
||||
SERVER* pServer = dcb.server();
|
||||
HR_DEBUG("Writing packet to %p %s.", dcb.get(), pServer ? pServer->name : "(null)");
|
||||
|
||||
rv = dcb.write(pPacket);
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
private:
|
||||
GWBUF* m_pPacket;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* HintMatcher is a function object that when invoked with a dcb, checks
|
||||
* whether the dcb matches the hint(s) that was given when the HintMatcher
|
||||
* was created.
|
||||
*/
|
||||
class HintMatcher : std::unary_function<const Dcb, bool>
|
||||
{
|
||||
public:
|
||||
HintMatcher(HINT* pHint)
|
||||
: m_pHint(pHint)
|
||||
{
|
||||
}
|
||||
|
||||
bool operator()(const Dcb& dcb)
|
||||
{
|
||||
bool match = false;
|
||||
|
||||
SERVER* pServer = dcb.server();
|
||||
|
||||
if (pServer)
|
||||
{
|
||||
HINT* pHint = m_pHint;
|
||||
|
||||
while (!match && pHint)
|
||||
{
|
||||
switch (pHint->type)
|
||||
{
|
||||
case HINT_ROUTE_TO_MASTER:
|
||||
if (SERVER_IS_MASTER(pServer))
|
||||
{
|
||||
match = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_SLAVE:
|
||||
if (SERVER_IS_SLAVE(pServer))
|
||||
{
|
||||
match = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_NAMED_SERVER:
|
||||
{
|
||||
const char* zName = static_cast<const char*>(pHint->data);
|
||||
|
||||
if (strcmp(zName, pServer->name) == 0)
|
||||
{
|
||||
match = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case HINT_ROUTE_TO_UPTODATE_SERVER:
|
||||
case HINT_ROUTE_TO_ALL:
|
||||
case HINT_PARAMETER:
|
||||
MXS_ERROR("HINT not handled.");
|
||||
ss_dassert(false);
|
||||
break;
|
||||
}
|
||||
|
||||
pHint = pHint->next;
|
||||
}
|
||||
}
|
||||
|
||||
return match;
|
||||
}
|
||||
|
||||
private:
|
||||
HINT* m_pHint;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
int32_t HintRouterSession::routeQuery(GWBUF* pPacket)
|
||||
{
|
||||
MXS_ERROR("routeQuery not implemented yet.");
|
||||
return 0;
|
||||
HR_ENTRY();
|
||||
|
||||
int32_t continue_routing = 1;
|
||||
|
||||
if (pPacket->hint)
|
||||
{
|
||||
// At least one hint => look for match.
|
||||
HR_DEBUG("Hint, looking for match.");
|
||||
|
||||
Backends::iterator i = std::find_if(m_backends.begin(),
|
||||
m_backends.end(),
|
||||
HintMatcher(pPacket->hint));
|
||||
|
||||
if (i != m_backends.end())
|
||||
{
|
||||
Dcb& dcb = *i;
|
||||
HR_DEBUG("Writing packet to %s.", dcb.server()->name);
|
||||
|
||||
dcb.write(pPacket);
|
||||
|
||||
// Rotate dcbs so that we get round robin as far as the slaves are concerned.
|
||||
m_backends.push_front(m_backends.back()); // Push last to first
|
||||
m_backends.pop_back(); // Remove last element
|
||||
|
||||
m_surplus_replies = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("No backend to write to.");
|
||||
continue_routing = 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No hint => all.
|
||||
HR_DEBUG("No hints, writing to all.");
|
||||
|
||||
size_t n_writes = std::count_if(m_backends.begin(), m_backends.end(), Writer(pPacket));
|
||||
gwbuf_free(pPacket);
|
||||
|
||||
if (n_writes != 0)
|
||||
{
|
||||
m_surplus_replies = n_writes - 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Nothing could be written, terminating session.");
|
||||
|
||||
continue_routing = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return continue_routing;
|
||||
}
|
||||
|
||||
|
||||
void HintRouterSession::clientReply(GWBUF* pPacket, DCB* pBackend)
|
||||
{
|
||||
MXS_ERROR("clientReply not implemented yet.");
|
||||
HR_ENTRY();
|
||||
|
||||
SERVER* pServer = pBackend->server;
|
||||
|
||||
if (m_surplus_replies == 0)
|
||||
{
|
||||
HR_DEBUG("Returning packet from %s.", pServer ? pServer->name : "(null)");
|
||||
|
||||
MXS_SESSION_ROUTE_REPLY(pBackend->session, pPacket);
|
||||
}
|
||||
else
|
||||
{
|
||||
HR_DEBUG("Ignoring reply packet from %s.", pServer ? pServer->name : "(null)");
|
||||
|
||||
--m_surplus_replies;
|
||||
gwbuf_free(pPacket);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -51,6 +236,41 @@ void HintRouterSession::handleError(GWBUF* pMessage,
|
||||
mxs_error_action_t action,
|
||||
bool* pSuccess)
|
||||
{
|
||||
HR_ENTRY();
|
||||
|
||||
ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
||||
MXS_ERROR("handleError not implemented yet.");
|
||||
|
||||
MXS_SESSION* pSession = pProblem->session;
|
||||
mxs_session_state_t sesstate = pSession->state;
|
||||
|
||||
switch (action)
|
||||
{
|
||||
case ERRACT_REPLY_CLIENT:
|
||||
{
|
||||
/* React to failed authentication, send message to client */
|
||||
if (sesstate == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
/* Send error report to client */
|
||||
GWBUF* pCopy = gwbuf_clone(pMessage);
|
||||
if (pCopy)
|
||||
{
|
||||
DCB* pClient = pSession->client_dcb;
|
||||
pClient->func.write(pClient, pCopy);
|
||||
}
|
||||
}
|
||||
*pSuccess = false;
|
||||
}
|
||||
break;
|
||||
|
||||
case ERRACT_NEW_CONNECTION:
|
||||
{
|
||||
HR_DEBUG("ERRACT_NEW_CONNECTION");
|
||||
*pSuccess = true;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
*pSuccess = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,16 +12,21 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxscale/cppdefs.hh>
|
||||
#include "hintrouterdefs.hh"
|
||||
#include <deque>
|
||||
#include <maxscale/router.hh>
|
||||
#include "dcb.hh"
|
||||
|
||||
class HintRouter;
|
||||
|
||||
class HintRouterSession : public maxscale::RouterSession
|
||||
{
|
||||
public:
|
||||
HintRouterSession(MXS_SESSION* pSession,
|
||||
HintRouter* pRouter);
|
||||
typedef std::deque<Dcb> Backends;
|
||||
|
||||
HintRouterSession(MXS_SESSION* pSession,
|
||||
HintRouter* pRouter,
|
||||
const Backends& backends);
|
||||
|
||||
~HintRouterSession();
|
||||
|
||||
@ -42,4 +47,7 @@ private:
|
||||
|
||||
private:
|
||||
HintRouter* m_pRouter;
|
||||
Backends m_backends;
|
||||
size_t m_surplus_replies;
|
||||
|
||||
};
|
||||
|
||||
@ -116,6 +116,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION,
|
||||
"The MaxScale Information Schema",
|
||||
"V1.0.0",
|
||||
RCAP_TYPE_NO_AUTH,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
@ -381,7 +382,7 @@ diagnostics(MXS_ROUTER *instance, DCB *dcb)
|
||||
static uint64_t
|
||||
getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return 0;
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -136,6 +136,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_ROUTER_VERSION,
|
||||
"A connection based router to load balance based on connections",
|
||||
"V1.1.0",
|
||||
MXS_NO_MODULE_CAPABILITIES,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
|
||||
@ -161,6 +161,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
|
||||
MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
|
||||
"A Read/Write splitting router for enhancement read scalability",
|
||||
"V1.1.0",
|
||||
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING,
|
||||
&MyObject,
|
||||
NULL, /* Process init. */
|
||||
NULL, /* Process finish. */
|
||||
@ -195,6 +196,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
|
||||
{"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"},
|
||||
{"strict_multi_stmt", MXS_MODULE_PARAM_BOOL, "true"},
|
||||
{"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"},
|
||||
{MXS_END_MODULE_PARAMS}
|
||||
}
|
||||
};
|
||||
@ -277,6 +279,7 @@ static MXS_ROUTER *createInstance(SERVICE *service, char **options)
|
||||
router->rwsplit_config.disable_sescmd_history = config_get_bool(params, "disable_sescmd_history");
|
||||
router->rwsplit_config.max_sescmd_history = config_get_integer(params, "max_sescmd_history");
|
||||
router->rwsplit_config.master_accept_reads = config_get_bool(params, "master_accept_reads");
|
||||
router->rwsplit_config.connection_keepalive = config_get_integer(params, "connection_keepalive");
|
||||
|
||||
if (!handle_max_slaves(router, config_get_string(params, "max_slave_connections")) ||
|
||||
(options && !rwsplit_process_router_options(router, options)))
|
||||
@ -820,7 +823,7 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
*/
|
||||
static uint64_t getCapabilities(MXS_ROUTER* instance)
|
||||
{
|
||||
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING;
|
||||
return RCAP_TYPE_NONE;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -275,6 +275,8 @@ typedef struct rwsplit_config_st
|
||||
enum failure_mode master_failure_mode; /**< Master server failure handling mode.
|
||||
* @see enum failure_mode */
|
||||
bool retry_failed_reads; /**< Retry failed reads on other servers */
|
||||
int connection_keepalive; /**< Send pings to servers that have
|
||||
* been idle for too long */
|
||||
} rwsplit_config_t;
|
||||
|
||||
#if defined(PREP_STMT_CACHING)
|
||||
|
||||
@ -19,9 +19,11 @@
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <maxscale/alloc.h>
|
||||
|
||||
#include <maxscale/router.h>
|
||||
#include <maxscale/modutil.h>
|
||||
|
||||
#include "rwsplit_internal.h"
|
||||
|
||||
/**
|
||||
* @file rwsplit_route_stmt.c The functions that support the routing of
|
||||
* queries to back end servers. All the functions in this module are internal
|
||||
@ -44,6 +46,35 @@ static backend_ref_t *check_candidate_bref(backend_ref_t *cand,
|
||||
select_criteria_t sc);
|
||||
static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses);
|
||||
|
||||
void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
DCB *target_dcb)
|
||||
{
|
||||
ss_dassert(target_dcb);
|
||||
ss_debug(int nserv = 0);
|
||||
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
{
|
||||
/** Each heartbeat is 1/10th of a second */
|
||||
int keepalive = inst->rwsplit_config.connection_keepalive * 10;
|
||||
backend_ref_t *bref = &rses->rses_backend_ref[i];
|
||||
|
||||
if (bref->bref_dcb != target_dcb && BREF_IS_IN_USE(bref) &&
|
||||
!BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
ss_debug(nserv++);
|
||||
int diff = hkheartbeat - bref->bref_dcb->last_read;
|
||||
|
||||
if (diff > keepalive)
|
||||
{
|
||||
MXS_INFO("Pinging %s, idle for %d seconds",
|
||||
bref->bref_dcb->server->unique_name, diff / 10);
|
||||
modutil_ignorable_ping(bref->bref_dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
ss_dassert(nserv < rses->rses_nbackends);
|
||||
}
|
||||
|
||||
/**
|
||||
* Routing function. Find out query type, backend type, and target DCB(s).
|
||||
* Then route query to found target(s).
|
||||
@ -148,6 +179,12 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
}
|
||||
}
|
||||
|
||||
if (succp && inst->rwsplit_config.connection_keepalive &&
|
||||
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
|
||||
{
|
||||
handle_connection_keepalive(inst, rses, target_dcb);
|
||||
}
|
||||
|
||||
return succp;
|
||||
} /* route_single_stmt */
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -110,7 +110,7 @@ typedef enum backend_type_t
|
||||
BE_COUNT
|
||||
} backend_type_t;
|
||||
|
||||
struct router_instance;
|
||||
struct schemarouter_instance;
|
||||
|
||||
/**
|
||||
* Route target types
|
||||
@ -132,7 +132,7 @@ typedef enum
|
||||
#define TARGET_IS_ANY(t) (t & TARGET_ANY)
|
||||
|
||||
typedef struct rses_property_st rses_property_t;
|
||||
typedef struct router_client_session ROUTER_CLIENT_SES;
|
||||
typedef struct schemarouter_session SCHEMAROUTER_SESSION;
|
||||
|
||||
/**
|
||||
* Router session properties
|
||||
@ -189,7 +189,7 @@ struct rses_property_st
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t rses_prop_chk_top;
|
||||
#endif
|
||||
ROUTER_CLIENT_SES* rses_prop_rsession; /*< Parent router session */
|
||||
SCHEMAROUTER_SESSION* rses_prop_rsession; /*< Parent router session */
|
||||
int rses_prop_refcount; /*< Reference count*/
|
||||
rses_property_type_t rses_prop_type; /*< Property type */
|
||||
union rses_prop_data
|
||||
@ -208,7 +208,7 @@ typedef struct sescmd_cursor_st
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t scmd_cur_chk_top;
|
||||
#endif
|
||||
ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */
|
||||
SCHEMAROUTER_SESSION* scmd_cur_rses; /*< pointer to owning router session */
|
||||
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
|
||||
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
|
||||
bool scmd_cur_active; /*< true if command is being executed */
|
||||
@ -313,14 +313,12 @@ typedef struct
|
||||
/**
|
||||
* The client session structure used within this router.
|
||||
*/
|
||||
struct router_client_session
|
||||
struct schemarouter_session
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t rses_chk_top;
|
||||
#endif
|
||||
SPINLOCK rses_lock; /*< protects rses_deleted */
|
||||
int rses_versno; /*< even = no active update, else odd. not used 4/14 */
|
||||
bool rses_closed; /*< true when closeSession is called */
|
||||
bool closed; /*< true when closeSession is called */
|
||||
DCB* rses_client_dcb;
|
||||
MYSQL_session* rses_mysql_session; /*< Session client data (username, password, SHA1). */
|
||||
/** Properties listed by their type */
|
||||
@ -331,10 +329,10 @@ struct router_client_session
|
||||
int rses_nbackends; /*< Number of backends */
|
||||
bool rses_autocommit_enabled; /*< Is autocommit enabled */
|
||||
bool rses_transaction_active; /*< Is a transaction active */
|
||||
struct router_instance *router; /*< The router instance */
|
||||
struct router_client_session* next; /*< List of router sessions */
|
||||
shard_map_t*
|
||||
shardmap; /*< Database hash containing names of the databases mapped to the servers that contain them */
|
||||
struct schemarouter_instance *router; /*< The router instance */
|
||||
struct schemarouter_session* next; /*< List of router sessions */
|
||||
shard_map_t *shardmap; /*< Database hash containing names of the databases
|
||||
* mapped to the servers that contain them */
|
||||
char connect_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Database the user was trying to connect to */
|
||||
char current_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Current active database */
|
||||
init_mask_t init; /*< Initialization state bitmask */
|
||||
@ -353,18 +351,18 @@ struct router_client_session
|
||||
/**
|
||||
* The per instance data for the router.
|
||||
*/
|
||||
typedef struct router_instance
|
||||
typedef struct schemarouter_instance
|
||||
{
|
||||
HASHTABLE* shard_maps; /*< Shard maps hashed by user name */
|
||||
SERVICE* service; /*< Pointer to service */
|
||||
ROUTER_CLIENT_SES* connections; /*< List of client connections */
|
||||
SCHEMAROUTER_SESSION* connections; /*< List of client connections */
|
||||
SPINLOCK lock; /*< Lock for the instance data */
|
||||
schemarouter_config_t schemarouter_config; /*< expanded config info from SERVICE */
|
||||
int schemarouter_version;/*< version number for router's config */
|
||||
unsigned int bitmask; /*< Bitmask to apply to server->status */
|
||||
unsigned int bitvalue; /*< Required value of server->status */
|
||||
ROUTER_STATS stats; /*< Statistics for this router */
|
||||
struct router_instance* next; /*< Next router on the list */
|
||||
struct schemarouter_instance* next; /*< Next router on the list */
|
||||
bool available_slaves; /*< The router has some slaves available */
|
||||
HASHTABLE* ignored_dbs; /*< List of databases to ignore when the
|
||||
* database mapping finds multiple servers
|
||||
@ -374,7 +372,7 @@ typedef struct router_instance
|
||||
* if they are found on more than one server. */
|
||||
pcre2_match_data* ignore_match_data;
|
||||
|
||||
} ROUTER_INSTANCE;
|
||||
} SCHEMAROUTER;
|
||||
|
||||
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
|
||||
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
|
||||
|
||||
Reference in New Issue
Block a user