Use router template in readwritesplit

Take the router template into use in readwritesplit.
This commit is contained in:
Markus Mäkelä
2018-04-03 16:11:47 +03:00
parent 90c2d575c5
commit 7d7cef7dcd
4 changed files with 126 additions and 138 deletions

View File

@ -55,8 +55,6 @@ using namespace maxscale;
* The functions that implement the router module API * The functions that implement the router module API
*/ */
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
static bool rwsplit_process_router_options(RWSplit *router, static bool rwsplit_process_router_options(RWSplit *router,
char **options); char **options);
static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses, static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses,
@ -482,7 +480,7 @@ static bool route_stored_query(RWSplitSession *rses)
uint8_t cmd = mxs_mysql_get_command(query_queue); uint8_t cmd = mxs_mysql_get_command(query_queue);
mysql_protocol_set_current_command(rses->client_dcb, (mxs_mysql_cmd_t)cmd); mysql_protocol_set_current_command(rses->client_dcb, (mxs_mysql_cmd_t)cmd);
if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue)) if (!rses->routeQuery(query_queue))
{ {
rval = false; rval = false;
MXS_ERROR("Failed to route queued query."); MXS_ERROR("Failed to route queued query.");
@ -541,6 +539,7 @@ void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb)
} }
RWSplit::RWSplit(SERVICE* service, const Config& config): RWSplit::RWSplit(SERVICE* service, const Config& config):
mxs::Router<RWSplit, RWSplitSession>(service),
m_service(service), m_service(service),
m_config(config) m_config(config)
{ {
@ -565,6 +564,10 @@ Stats& RWSplit::stats()
return m_stats; return m_stats;
} }
const Stats& RWSplit::stats() const
{
return m_stats;
}
int RWSplit::max_slave_count() const int RWSplit::max_slave_count() const
{ {
int router_nservers = m_service->n_dbref; int router_nservers = m_service->n_dbref;
@ -624,18 +627,8 @@ bool RWSplit::have_enough_servers() const
* API function definitions * API function definitions
*/ */
/**
* @brief Create a new readwritesplit router instance RWSplit* RWSplit::create(SERVICE *service, char **options)
*
* An instance of the router is required for each service that uses this router.
* One instance of the router will handle multiple router sessions.
*
* @param service The service this router is being create for
* @param options The options for this query router
*
* @return New router instance or NULL on error
*/
static MXS_ROUTER* createInstance(SERVICE *service, char **options)
{ {
MXS_CONFIG_PARAMETER* params = service->svc_config_param; MXS_CONFIG_PARAMETER* params = service->svc_config_param;
@ -653,33 +646,14 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options)
config.max_sescmd_history = 0; config.max_sescmd_history = 0;
} }
return (MXS_ROUTER*)new (std::nothrow) RWSplit(service, config); return new (std::nothrow) RWSplit(service, config);
} }
/** RWSplitSession* RWSplit::newSession(MXS_SESSION *session)
* @brief Create a new session for this router instance
*
* The session is used to store all the data required by the router for a
* particular client connection. The instance of the router that relates to a
* particular service is passed as the first parameter. The second parameter is
* the session that has been created in response to the request from a client
* for a connection. The passed session contains generic information; this
* function creates the session structure that holds router specific data.
* There is often a one to one relationship between sessions and router
* sessions, although it is possible to create configurations where a
* connection is handled by multiple routers, one after another.
*
* @param instance The router instance data
* @param session The MaxScale session (generic connection data)
*
* @return New router session or NULL on error
*/
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
{ {
RWSplit* router = reinterpret_cast<RWSplit*>(router_inst);
RWSplitSession* rses = NULL; RWSplitSession* rses = NULL;
MXS_EXCEPTION_GUARD(rses = RWSplitSession::create(router, session)); MXS_EXCEPTION_GUARD(rses = RWSplitSession::create(this, session));
return reinterpret_cast<MXS_ROUTER_SESSION*>(rses); return rses;
} }
/** /**
@ -693,9 +667,9 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
* @param instance The router instance data * @param instance The router instance data
* @param session The router session being closed * @param session The router session being closed
*/ */
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session) void RWSplitSession::close()
{ {
RWSplitSession *router_cli_ses = (RWSplitSession *)router_session; RWSplitSession *router_cli_ses = this;
CHK_CLIENT_RSES(router_cli_ses); CHK_CLIENT_RSES(router_cli_ses);
if (!router_cli_ses->rses_closed) if (!router_cli_ses->rses_closed)
@ -721,40 +695,10 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
} }
} }
/** int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
* @brief Free a router session
*
* When a router session has been closed, freeSession can be called to free
* allocated resources.
*
* @param instance The router instance
* @param session The router session
*
*/
static void freeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* session)
{ {
RWSplitSession* rses = reinterpret_cast<RWSplitSession*>(session); RWSplit *inst = router;
delete rses; RWSplitSession *rses = this;
}
/**
* @brief The main routing entry point
*
* The routeQuery function will make the routing decision based on the contents
* of the instance, session and the query itself. The query always represents
* a complete MariaDB/MySQL packet because we define the RCAP_TYPE_STMT_INPUT in
* getCapabilities().
*
* @param instance Router instance
* @param router_session Router session associated with the client
* @param querybuf Buffer containing the query
*
* @return 1 on success, 0 on error
*/
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf)
{
RWSplit *inst = (RWSplit *) instance;
RWSplitSession *rses = (RWSplitSession *) router_session;
int rval = 0; int rval = 0;
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
@ -810,17 +754,9 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
return rval; return rval;
} }
/** void RWSplit::diagnostics(DCB *dcb)
* @brief Diagnostics routine
*
* Print query router statistics to the DCB passed in
*
* @param instance The router instance
* @param dcb The DCB for diagnostic output
*/
static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
{ {
RWSplit *router = (RWSplit *)instance; RWSplit *router = this;
const char *weightby = serviceGetWeightingParameter(router->service()); const char *weightby = serviceGetWeightingParameter(router->service());
double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0; double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
@ -885,16 +821,9 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
} }
} }
/** json_t* RWSplit::diagnostics_json() const
* @brief JSON diagnostics routine
*
* @param instance The router instance
* @param dcb The DCB for diagnostic output
*/
static json_t* diagnostics_json(const MXS_ROUTER *instance)
{ {
RWSplit *router = (RWSplit *)instance; const RWSplit *router = this;
json_t* rval = json_object(); json_t* rval = json_object();
json_object_set_new(rval, "use_sql_variables_in", json_object_set_new(rval, "use_sql_variables_in",
@ -1026,21 +955,10 @@ void correct_packet_sequence(GWBUF *buffer, RWSplitSession *rses)
} }
} }
/** void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
* @brief Client Reply routine
*
* @param instance The router instance
* @param router_session The router session
* @param backend_dcb The backend DCB
* @param queue The Buffer containing the reply
*/
static void clientReply(MXS_ROUTER *instance,
MXS_ROUTER_SESSION *router_session,
GWBUF *writebuf,
DCB *backend_dcb)
{ {
RWSplitSession *rses = (RWSplitSession *)router_session; RWSplitSession *rses = this;
RWSplit *inst = (RWSplit *)instance; RWSplit *inst = router;
DCB *client_dcb = backend_dcb->session->client_dcb; DCB *client_dcb = backend_dcb->session->client_dcb;
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(!rses->rses_closed); ss_dassert(!rses->rses_closed);
@ -1130,11 +1048,7 @@ static void clientReply(MXS_ROUTER *instance,
} }
} }
uint64_t RWSplit::getCapabilities()
/**
* @brief Get router capabilities
*/
static uint64_t getCapabilities(MXS_ROUTER* instance)
{ {
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING |
RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING; RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING;
@ -1155,16 +1069,12 @@ static uint64_t getCapabilities(MXS_ROUTER* instance)
* ERRACT_REPLY_CLIENT * ERRACT_REPLY_CLIENT
* @param succp Result of action: true if router can continue * @param succp Result of action: true if router can continue
*/ */
static void handleError(MXS_ROUTER *instance, void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
MXS_ROUTER_SESSION *router_session, mxs_error_action_t action, bool *succp)
GWBUF *errmsgbuf,
DCB *problem_dcb,
mxs_error_action_t action,
bool *succp)
{ {
ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
RWSplit *inst = (RWSplit *)instance; RWSplit *inst = router;
RWSplitSession *rses = (RWSplitSession *)router_session; RWSplitSession *rses = this;
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
CHK_DCB(problem_dcb); CHK_DCB(problem_dcb);
@ -1288,21 +1198,6 @@ MXS_BEGIN_DECLS
*/ */
MXS_MODULE *MXS_CREATE_MODULE() MXS_MODULE *MXS_CREATE_MODULE()
{ {
static MXS_ROUTER_OBJECT MyObject =
{
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostics,
diagnostics_json,
clientReply,
handleError,
getCapabilities,
NULL
};
static MXS_MODULE info = static MXS_MODULE info =
{ {
MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION, MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
@ -1310,7 +1205,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
"V1.1.0", "V1.1.0",
RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING |
RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING, RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING,
&MyObject, &RWSplit::s_object,
NULL, /* Process init. */ NULL, /* Process init. */
NULL, /* Process finish. */ NULL, /* Process finish. */
NULL, /* Thread init. */ NULL, /* Thread init. */

View File

@ -28,7 +28,7 @@
#include <maxscale/dcb.h> #include <maxscale/dcb.h>
#include <maxscale/hashtable.h> #include <maxscale/hashtable.h>
#include <maxscale/log_manager.h> #include <maxscale/log_manager.h>
#include <maxscale/router.h> #include <maxscale/router.hh>
#include <maxscale/service.h> #include <maxscale/service.h>
#include <maxscale/backend.hh> #include <maxscale/backend.hh>
#include <maxscale/session_command.hh> #include <maxscale/session_command.hh>
@ -228,10 +228,12 @@ public:
uint64_t n_all; /**< Number of stmts sent to all */ uint64_t n_all; /**< Number of stmts sent to all */
}; };
class RWSplitSession;
/** /**
* The per instance data for the router. * The per instance data for the router.
*/ */
class RWSplit class RWSplit: public mxs::Router<RWSplit, RWSplitSession>
{ {
RWSplit(const RWSplit&); RWSplit(const RWSplit&);
RWSplit& operator=(const RWSplit&); RWSplit& operator=(const RWSplit&);
@ -243,9 +245,65 @@ public:
SERVICE* service() const; SERVICE* service() const;
const Config& config() const; const Config& config() const;
Stats& stats(); Stats& stats();
const Stats& stats() const;
int max_slave_count() const; int max_slave_count() const;
bool have_enough_servers() const; bool have_enough_servers() const;
// API functions
/**
* @brief Create a new readwritesplit router instance
*
* An instance of the router is required for each service that uses this router.
* One instance of the router will handle multiple router sessions.
*
* @param service The service this router is being create for
* @param options The options for this query router
*
* @return New router instance or NULL on error
*/
static RWSplit* create(SERVICE* pService, char** pzOptions);
/**
* @brief Create a new session for this router instance
*
* The session is used to store all the data required by the router for a
* particular client connection. The instance of the router that relates to a
* particular service is passed as the first parameter. The second parameter is
* the session that has been created in response to the request from a client
* for a connection. The passed session contains generic information; this
* function creates the session structure that holds router specific data.
* There is often a one to one relationship between sessions and router
* sessions, although it is possible to create configurations where a
* connection is handled by multiple routers, one after another.
*
* @param session The MaxScale session (generic connection data)
*
* @return New router session or NULL on error
*/
RWSplitSession* newSession(MXS_SESSION* pSession);
/**
* @brief Diagnostics routine
*
* Print query router diagnostics to the DCB passed in
*
* @param dcb The DCB for diagnostic output
*/
void diagnostics(DCB* pDcb);
/**
* @brief JSON diagnostics routine
*
* @return The JSON representation of this router instance
*/
json_t* diagnostics_json() const;
/**
* @brief Get router capabilities
*/
uint64_t getCapabilities();
private: private:
SERVICE* m_service; /**< Service where the router belongs*/ SERVICE* m_service; /**< Service where the router belongs*/
Config m_config; Config m_config;

View File

@ -22,6 +22,7 @@ using namespace maxscale;
RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
const SRWBackendList& backends, const SRWBackendList& backends,
const SRWBackend& master): const SRWBackend& master):
mxs::RouterSession(session),
rses_chk_top(CHK_NUM_ROUTER_SES), rses_chk_top(CHK_NUM_ROUTER_SES),
rses_closed(false), rses_closed(false),
backends(backends), backends(backends),

View File

@ -41,7 +41,7 @@ typedef std::tr1::unordered_map<uint32_t, mxs::SRWBackend> ExecMap;
/** /**
* The client session of a RWSplit instance * The client session of a RWSplit instance
*/ */
class RWSplitSession class RWSplitSession: public mxs::RouterSession
{ {
RWSplitSession(const RWSplitSession&) = delete; RWSplitSession(const RWSplitSession&) = delete;
RWSplitSession& operator=(const RWSplitSession&) = delete; RWSplitSession& operator=(const RWSplitSession&) = delete;
@ -58,6 +58,40 @@ public:
*/ */
static RWSplitSession* create(RWSplit* router, MXS_SESSION* session); static RWSplitSession* create(RWSplit* router, MXS_SESSION* session);
/**
* Called when a client session has been closed.
*/
void close();
/**
* Called when a packet being is routed to the backend. The router should
* forward the packet to the appropriate server(s).
*
* @param pPacket A client packet.
*/
int32_t routeQuery(GWBUF* pPacket);
/**
* Called when a packet is routed to the client. The router should
* forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`.
*
* @param pPacket A client packet.
* @param pBackend The backend the packet is coming from.
*/
void clientReply(GWBUF* pPacket, DCB* pBackend);
/**
*
* @param pMessage The error message.
* @param pProblem The DCB on which the error occurred.
* @param action The context.
* @param pSuccess On output, if false, the session will be terminated.
*/
void handleError(GWBUF* pMessage,
DCB* pProblem,
mxs_error_action_t action,
bool* pSuccess);
// TODO: Make member variables private // TODO: Make member variables private
skygw_chk_t rses_chk_top; skygw_chk_t rses_chk_top;
bool rses_closed; /**< true when closeSession is called */ bool rses_closed; /**< true when closeSession is called */