Use router template in readconnroute

This commit is contained in:
Markus Mäkelä
2019-01-08 11:32:32 +02:00
parent 54589424f9
commit d6df5a7d6a
2 changed files with 139 additions and 206 deletions

View File

@ -53,28 +53,6 @@
#include <maxscale/modutil.hh>
#include <maxscale/utils.hh>
/* The router entry points */
static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params);
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session);
static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session);
static void freeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session);
static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* queue);
static void diagnostics(MXS_ROUTER* instance, DCB* dcb);
static json_t* diagnostics_json(const MXS_ROUTER* instance);
static void clientReply(MXS_ROUTER* instance,
MXS_ROUTER_SESSION* router_session,
GWBUF* queue,
DCB* backend_dcb);
static void handleError(MXS_ROUTER* instance,
MXS_ROUTER_SESSION* router_session,
GWBUF* errbuf,
DCB* problem_dcb,
mxs_error_action_t action,
bool* succp);
static uint64_t getCapabilities(MXS_ROUTER* instance);
static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params);
static SERVER_REF* get_root_master(SERVER_REF* servers);
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -87,22 +65,6 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
MXS_NOTICE("Initialise readconnroute router module.");
static MXS_ROUTER_OBJECT MyObject =
{
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostics,
diagnostics_json,
clientReply,
handleError,
getCapabilities,
nullptr,
configureInstance
};
static MXS_MODULE info =
{
MXS_MODULE_API_ROUTER,
@ -111,7 +73,7 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
"A connection based router to load balance based on connections",
"V2.0.0",
RCAP_TYPE_RUNTIME_CONFIG,
&MyObject,
&ReadConn::s_object,
nullptr, /* Process init. */
nullptr, /* Process finish. */
nullptr, /* Thread init. */
@ -124,14 +86,35 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
return &info;
}
static inline void free_readconn_instance(ReadConn* router)
/*
* This routine returns the master server from a MariaDB replication tree. The server must be
* running, not in maintenance and have the master bit set. If multiple masters are found,
* the one with the highest weight is chosen.
*
* @param servers The list of servers
*
* @return The Master server
*/
static SERVER_REF* get_root_master(SERVER_REF* servers)
{
delete router;
SERVER_REF* master_host = nullptr;
for (SERVER_REF* ref = servers; ref; ref = ref->next)
{
if (ref->active && ref->server->is_master())
{
// No master found yet or this one has better weight.
if (!master_host || ref->server_weight > master_host->server_weight)
{
master_host = ref;
}
}
}
return master_host;
}
static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params)
bool ReadConn::configure(MXS_CONFIG_PARAMETER* params)
{
ReadConn* inst = static_cast<ReadConn*>(instance);
ReadConn* inst = this;
uint64_t bitmask = 0;
uint64_t bitvalue = 0;
bool ok = true;
@ -189,6 +172,12 @@ static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params
return ok;
}
ReadConn::ReadConn(SERVICE* service)
: mxs::Router<ReadConn, ReadConnSession>(service)
{
}
/**
* Create an instance of the router for a particular service
* within the gateway.
@ -198,24 +187,40 @@ static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params
*
* @return The instance data for this new instance
*/
static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params)
ReadConn* ReadConn::create(SERVICE* service, MXS_CONFIG_PARAMETER* params)
{
ReadConn* inst = new(std::nothrow) ReadConn;
ReadConn* inst = new(std::nothrow) ReadConn(service);
if (inst)
if (inst && !inst->configure(params))
{
inst->service = service;
inst->bitmask_and_bitvalue = 0;
if (!configureInstance(static_cast<MXS_ROUTER*>(inst), params))
{
free_readconn_instance(inst);
inst = nullptr;
}
delete inst;
inst = nullptr;
}
return static_cast<MXS_ROUTER*>(inst);
return inst;
}
ReadConnSession::ReadConnSession(ReadConn* inst, MXS_SESSION* session, SERVER_REF* backend, DCB* dcb,
uint32_t bitmask, uint32_t bitvalue)
: mxs::RouterSession(session)
, instance(inst)
, backend(backend)
, backend_dcb(dcb)
, client_dcb(session->client_dcb)
, bitmask(bitmask)
, bitvalue(bitvalue)
{
}
ReadConnSession::~ReadConnSession()
{
mxb::atomic::add(&backend->connections, -1, mxb::atomic::RELAXED);
}
void ReadConnSession::close()
{
mxb_assert(backend_dcb);
dcb_close(backend_dcb);
}
/**
@ -225,33 +230,18 @@ static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params
* @param session The session itself
* @return Session specific data for this session
*/
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session)
ReadConnSession* ReadConn::newSession(MXS_SESSION* session)
{
ReadConn* inst = static_cast<ReadConn*>(instance);
MXS_DEBUG("%lu [newSession] new router session with session "
"%p, and inst %p.",
pthread_self(),
session,
inst);
ReadConnSession* client_rses = new(std::nothrow) ReadConnSession;
if (!client_rses)
{
return nullptr;
}
client_rses->client_dcb = session->client_dcb;
ReadConn* inst = this;
uint64_t mask = atomic_load_uint64(&inst->bitmask_and_bitvalue);
client_rses->bitmask = mask;
client_rses->bitvalue = mask >> 32;
uint32_t bitmask = mask;
uint32_t bitvalue = mask >> 32;
/**
* Find the Master host from available servers
*/
SERVER_REF* master_host = get_root_master(inst->service->dbref);
SERVER_REF* master_host = get_root_master(inst->m_pService->dbref);
/**
* Find a backend server to connect to. This is the extent of the
@ -272,7 +262,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session
* become the new candidate. This has the effect of spreading the
* connections over different servers during periods of very low load.
*/
for (SERVER_REF* ref = inst->service->dbref; ref; ref = ref->next)
for (SERVER_REF* ref = inst->m_pService->dbref; ref; ref = ref->next)
{
if (!server_ref_is_active(ref) || ref->server->is_in_maint())
{
@ -280,13 +270,12 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session
}
/* Check server status bits against bitvalue from router_options */
if (ref && ref->server->is_usable()
&& (ref->server->status & client_rses->bitmask & client_rses->bitvalue))
if (ref && ref->server->is_usable() && (ref->server->status & bitmask & bitvalue))
{
if (master_host)
{
if (ref == master_host
&& (client_rses->bitvalue & (SERVER_SLAVE | SERVER_MASTER)) == SERVER_SLAVE)
&& (bitvalue & (SERVER_SLAVE | SERVER_MASTER)) == SERVER_SLAVE)
{
/* Skip root master here, as it could also be slave of an external server that
* is not in the configuration. Intermediate masters (Relay Servers) are also
@ -295,7 +284,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session
continue;
}
if (ref == master_host && client_rses->bitvalue == SERVER_MASTER)
if (ref == master_host && bitvalue == SERVER_MASTER)
{
/* If option is "master" return only the root Master as there could be
* intermediate masters (Relay Servers) and they must not be selected.
@ -305,7 +294,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session
break;
}
}
else if (client_rses->bitvalue == SERVER_MASTER)
else if (bitvalue == SERVER_MASTER)
{
/* Master_host is nullptr, no master server. If requested router_option is 'master'
* candidate will be nullptr.
@ -350,33 +339,33 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session
//
// We must do that so that readconnroute in MaxScale 2.2 will again behave
// the same way as it did up until 2.1.12.
if (client_rses->bitvalue & SERVER_SLAVE)
if (bitvalue & SERVER_SLAVE)
{
client_rses->bitvalue |= SERVER_MASTER;
bitvalue |= SERVER_MASTER;
}
}
else
{
MXS_ERROR("Failed to create new routing session. Couldn't find eligible"
" candidate server. Freeing allocated resources.");
MXS_FREE(client_rses);
return nullptr;
}
}
/*
* We now have the server with the least connections.
* Bump the connection count for this server
*/
client_rses->backend = candidate;
/** Open the backend connection */
client_rses->backend_dcb = dcb_connect(candidate->server, session, candidate->server->protocol().c_str());
DCB* backend_dcb = dcb_connect(candidate->server, session, candidate->server->protocol().c_str());
if (!client_rses->backend_dcb)
if (!backend_dcb)
{
/** The failure is reported in dcb_connect() */
delete client_rses;
return nullptr;
}
ReadConnSession* client_rses = new(std::nothrow) ReadConnSession(inst, session, candidate, backend_dcb,
bitmask, bitvalue);
if (!client_rses)
{
return nullptr;
}
@ -388,51 +377,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session
candidate->server->name(),
candidate->connections);
return static_cast<MXS_ROUTER_SESSION*>(client_rses);
}
/**
* @node Unlink from backend server, unlink from router's connection list,
* and free memory of a router client session.
*
* Parameters:
* @param router - <usage>
* <description>
*
* @param router_cli_ses - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses)
{
ReadConn* router = static_cast<ReadConn*>(router_instance);
ReadConnSession* router_cli_ses = static_cast<ReadConnSession*>(router_client_ses);
MXB_AT_DEBUG(int prev_val = ) mxb::atomic::add(&router_cli_ses->backend->connections,
-1,
mxb::atomic::RELAXED);
mxb_assert(prev_val > 0);
delete router_cli_ses;
}
/**
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
*
* @param instance The router instance data
* @param router_session The session being closed
*/
static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session)
{
ReadConnSession* router_cli_ses = static_cast<ReadConnSession*>(router_session);
mxb_assert(router_cli_ses->backend_dcb);
dcb_close(router_cli_ses->backend_dcb);
return client_rses;
}
/** Log routing failure due to closed session */
@ -464,7 +409,7 @@ static void log_closed_session(mxs_mysql_cmd_t mysql_command, SERVER_REF* ref)
*
* @return True if the backend connection is still valid
*/
static inline bool connection_is_valid(ReadConn* inst, ReadConnSession* router_cli_ses)
bool ReadConn::connection_is_valid(ReadConnSession* router_cli_ses)
{
bool rval = false;
@ -481,7 +426,7 @@ static inline bool connection_is_valid(ReadConn* inst, ReadConnSession* router_c
if ((router_cli_ses->bitvalue == SERVER_MASTER) && router_cli_ses->backend->active)
{
// If we're using an active master server, verify that it is still a master
rval = router_cli_ses->backend == get_root_master(inst->service->dbref);
rval = router_cli_ses->backend == get_root_master(m_pService->dbref);
}
else
{
@ -504,15 +449,14 @@ static inline bool connection_is_valid(ReadConn* inst, ReadConnSession* router_c
* This is simply a case of sending it to the connection that was
* chosen when we started the client session.
*
* @param instance The router instance
* @param router_session The router session returned from the newSession call
* @param queue The queue of data buffers to route
*
* @return if succeed 1, otherwise 0
*/
static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* queue)
int ReadConnSession::routeQuery(GWBUF* queue)
{
ReadConn* inst = static_cast<ReadConn*>(instance);
ReadConnSession* router_cli_ses = static_cast<ReadConnSession*>(router_session);
ReadConn* inst = instance;
ReadConnSession* router_cli_ses = this;
int rc = 0;
MySQLProtocol* proto = static_cast<MySQLProtocol*>(router_cli_ses->client_dcb->protocol);
mxs_mysql_cmd_t mysql_command = proto->current_command;
@ -526,7 +470,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
mxb_assert(backend_dcb);
char* trc = nullptr;
if (!connection_is_valid(inst, router_cli_ses))
if (!inst->connection_is_valid(router_cli_ses))
{
log_closed_session(mysql_command, router_cli_ses->backend);
gwbuf_free(queue);
@ -569,17 +513,17 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session,
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static void diagnostics(MXS_ROUTER* router, DCB* dcb)
void ReadConn::diagnostics(DCB* dcb)
{
ReadConn* router_inst = static_cast<ReadConn*>(router);
const char* weightby = serviceGetWeightingParameter(router_inst->service);
ReadConn* router_inst = this;
const char* weightby = serviceGetWeightingParameter(router_inst->m_pService);
dcb_printf(dcb,
"\tNumber of router sessions: %d\n",
router_inst->stats.n_sessions);
dcb_printf(dcb,
"\tCurrent no. of router sessions: %d\n",
router_inst->service->stats.n_current);
router_inst->m_pService->stats.n_current);
dcb_printf(dcb,
"\tNumber of queries forwarded: %d\n",
router_inst->stats.n_queries);
@ -591,7 +535,7 @@ static void diagnostics(MXS_ROUTER* router, DCB* dcb)
weightby);
dcb_printf(dcb,
"\t\tServer Target %% Connections\n");
for (SERVER_REF* ref = router_inst->service->dbref; ref; ref = ref->next)
for (SERVER_REF* ref = router_inst->m_pService->dbref; ref; ref = ref->next)
{
dcb_printf(dcb,
"\t\t%-20s %3.1f%% %d\n",
@ -608,16 +552,16 @@ static void diagnostics(MXS_ROUTER* router, DCB* dcb)
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static json_t* diagnostics_json(const MXS_ROUTER* router)
json_t* ReadConn::diagnostics_json() const
{
const ReadConn* router_inst = static_cast<const ReadConn*>(router);
const ReadConn* router_inst = this;
json_t* rval = json_object();
json_object_set_new(rval, "connections", json_integer(router_inst->stats.n_sessions));
json_object_set_new(rval, "current_connections", json_integer(router_inst->service->stats.n_current));
json_object_set_new(rval, "current_connections", json_integer(router_inst->m_pService->stats.n_current));
json_object_set_new(rval, "queries", json_integer(router_inst->stats.n_queries));
const char* weightby = serviceGetWeightingParameter(router_inst->service);
const char* weightby = serviceGetWeightingParameter(router_inst->m_pService);
if (*weightby)
{
@ -632,15 +576,10 @@ static json_t* diagnostics_json(const MXS_ROUTER* router)
*
* The routine will reply to client data from backend server
*
* @param instance The router instance
* @param router_session The router session
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void clientReply(MXS_ROUTER* instance,
MXS_ROUTER_SESSION* router_session,
GWBUF* queue,
DCB* backend_dcb)
void ReadConnSession::clientReply(GWBUF* queue, DCB* backend_dcb)
{
mxb_assert(backend_dcb->session->client_dcb);
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, queue);
@ -651,20 +590,12 @@ static void clientReply(MXS_ROUTER* instance,
*
* The routine will handle errors that occurred in writes.
*
* @param instance The router instance
* @param router_session The router session
* @param message The error message to reply
* @param problem_dcb The DCB related to the error
* @param action The action: ERRACT_NEW_CONNECTION or ERRACT_REPLY_CLIENT
* @param succp Result of action: true if router can continue
*
*/
static void handleError(MXS_ROUTER* instance,
MXS_ROUTER_SESSION* router_session,
GWBUF* errbuf,
DCB* problem_dcb,
mxs_error_action_t action,
bool* succp)
void ReadConnSession::handleError(GWBUF* errbuf, DCB* problem_dcb, mxs_error_action_t action, bool* succp)
{
mxb_assert(problem_dcb->role == DCB::Role::BACKEND);
@ -676,34 +607,7 @@ static void handleError(MXS_ROUTER* instance,
*succp = false;
}
static uint64_t getCapabilities(MXS_ROUTER* instance)
uint64_t ReadConn::getCapabilities()
{
return RCAP_TYPE_RUNTIME_CONFIG;
}
/*
* This routine returns the master server from a MariaDB replication tree. The server must be
* running, not in maintenance and have the master bit set. If multiple masters are found,
* the one with the highest weight is chosen.
*
* @param servers The list of servers
* @return The Master server
*
*/
static SERVER_REF* get_root_master(SERVER_REF* servers)
{
SERVER_REF* master_host = nullptr;
for (SERVER_REF* ref = servers; ref; ref = ref->next)
{
if (ref->active && ref->server->is_master())
{
// No master found yet or this one has better weight.
if (!master_host || ref->server_weight > master_host->server_weight)
{
master_host = ref;
}
}
}
return master_host;
}

View File

@ -23,11 +23,28 @@
#include <maxscale/service.hh>
#include <maxscale/router.hh>
class ReadConn;
/**
* The client session structure used within this router.
*/
struct ReadConnSession : MXS_ROUTER_SESSION
class ReadConnSession : public mxs::RouterSession
{
public:
ReadConnSession(ReadConn* inst, MXS_SESSION* session, SERVER_REF* backend, DCB* dcb,
uint32_t bitmask, uint32_t bitvalue);
~ReadConnSession();
int routeQuery(GWBUF*);
void close();
void clientReply(GWBUF* pPacket, DCB* pBackend);
void handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess);
ReadConn* instance;
SERVER_REF* backend; /*< Backend used by the client session */
DCB* backend_dcb;/*< DCB Connection to the backend */
DCB* client_dcb; /**< Client DCB */
@ -40,16 +57,28 @@ struct ReadConnSession : MXS_ROUTER_SESSION
*/
struct Stats
{
int n_sessions; /*< Number sessions created */
int n_queries; /*< Number of queries forwarded */
int n_sessions = 0; /*< Number sessions created */
int n_queries = 0; /*< Number of queries forwarded */
};
/**
* The per instance data for the router.
*/
struct ReadConn : public MXS_ROUTER
class ReadConn : public mxs::Router<ReadConn, ReadConnSession>
{
SERVICE* service; /*< Pointer to the service using this router */
uint64_t bitmask_and_bitvalue; /*< Lower 32-bits for bitmask and upper for bitvalue */
Stats stats; /*< Statistics for this router */
public:
static ReadConn* create(SERVICE* service, MXS_CONFIG_PARAMETER* params);
ReadConnSession* newSession(MXS_SESSION* pSession);
void diagnostics(DCB* pDcb);
json_t* diagnostics_json() const;
uint64_t getCapabilities();
bool configure(MXS_CONFIG_PARAMETER* params);
bool connection_is_valid(ReadConnSession* router_cli_ses);
uint64_t bitmask_and_bitvalue = 0; /*< Lower 32-bits for bitmask and upper for bitvalue */
Stats stats; /*< Statistics for this router */
private:
ReadConn(SERVICE* service);
};