diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index e472b50c0..3ad79d8e7 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -55,8 +55,6 @@ using namespace maxscale; * The functions that implement the router module API */ -static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue); - static bool rwsplit_process_router_options(RWSplit *router, char **options); static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses, @@ -482,7 +480,7 @@ static bool route_stored_query(RWSplitSession *rses) uint8_t cmd = mxs_mysql_get_command(query_queue); mysql_protocol_set_current_command(rses->client_dcb, (mxs_mysql_cmd_t)cmd); - if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue)) + if (!rses->routeQuery(query_queue)) { rval = false; MXS_ERROR("Failed to route queued query."); @@ -541,6 +539,7 @@ void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb) } RWSplit::RWSplit(SERVICE* service, const Config& config): + mxs::Router(service), m_service(service), m_config(config) { @@ -565,6 +564,10 @@ Stats& RWSplit::stats() return m_stats; } +const Stats& RWSplit::stats() const +{ + return m_stats; +} int RWSplit::max_slave_count() const { int router_nservers = m_service->n_dbref; @@ -624,18 +627,8 @@ bool RWSplit::have_enough_servers() const * API function definitions */ -/** - * @brief Create a new readwritesplit router instance - * - * An instance of the router is required for each service that uses this router. - * One instance of the router will handle multiple router sessions. - * - * @param service The service this router is being create for - * @param options The options for this query router - * - * @return New router instance or NULL on error - */ -static MXS_ROUTER* createInstance(SERVICE *service, char **options) + +RWSplit* RWSplit::create(SERVICE *service, char **options) { MXS_CONFIG_PARAMETER* params = service->svc_config_param; @@ -653,33 +646,14 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) config.max_sescmd_history = 0; } - return (MXS_ROUTER*)new (std::nothrow) RWSplit(service, config); + return new (std::nothrow) RWSplit(service, config); } -/** - * @brief Create a new session for this router instance - * - * The session is used to store all the data required by the router for a - * particular client connection. The instance of the router that relates to a - * particular service is passed as the first parameter. The second parameter is - * the session that has been created in response to the request from a client - * for a connection. The passed session contains generic information; this - * function creates the session structure that holds router specific data. - * There is often a one to one relationship between sessions and router - * sessions, although it is possible to create configurations where a - * connection is handled by multiple routers, one after another. - * - * @param instance The router instance data - * @param session The MaxScale session (generic connection data) - * - * @return New router session or NULL on error - */ -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *session) +RWSplitSession* RWSplit::newSession(MXS_SESSION *session) { - RWSplit* router = reinterpret_cast(router_inst); RWSplitSession* rses = NULL; - MXS_EXCEPTION_GUARD(rses = RWSplitSession::create(router, session)); - return reinterpret_cast(rses); + MXS_EXCEPTION_GUARD(rses = RWSplitSession::create(this, session)); + return rses; } /** @@ -693,9 +667,9 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess * @param instance The router instance data * @param session The router session being closed */ -static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session) +void RWSplitSession::close() { - RWSplitSession *router_cli_ses = (RWSplitSession *)router_session; + RWSplitSession *router_cli_ses = this; CHK_CLIENT_RSES(router_cli_ses); if (!router_cli_ses->rses_closed) @@ -721,40 +695,10 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio } } -/** - * @brief Free a router session - * - * When a router session has been closed, freeSession can be called to free - * allocated resources. - * - * @param instance The router instance - * @param session The router session - * - */ -static void freeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* session) +int32_t RWSplitSession::routeQuery(GWBUF* querybuf) { - RWSplitSession* rses = reinterpret_cast(session); - delete rses; -} - -/** - * @brief The main routing entry point - * - * The routeQuery function will make the routing decision based on the contents - * of the instance, session and the query itself. The query always represents - * a complete MariaDB/MySQL packet because we define the RCAP_TYPE_STMT_INPUT in - * getCapabilities(). - * - * @param instance Router instance - * @param router_session Router session associated with the client - * @param querybuf Buffer containing the query - * - * @return 1 on success, 0 on error - */ -static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf) -{ - RWSplit *inst = (RWSplit *) instance; - RWSplitSession *rses = (RWSplitSession *) router_session; + RWSplit *inst = router; + RWSplitSession *rses = this; int rval = 0; CHK_CLIENT_RSES(rses); @@ -810,17 +754,9 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, return rval; } -/** - * @brief Diagnostics routine - * - * Print query router statistics to the DCB passed in - * - * @param instance The router instance - * @param dcb The DCB for diagnostic output - */ -static void diagnostics(MXS_ROUTER *instance, DCB *dcb) +void RWSplit::diagnostics(DCB *dcb) { - RWSplit *router = (RWSplit *)instance; + RWSplit *router = this; const char *weightby = serviceGetWeightingParameter(router->service()); double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0; @@ -885,16 +821,9 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb) } } -/** - * @brief JSON diagnostics routine - * - * @param instance The router instance - * @param dcb The DCB for diagnostic output - */ -static json_t* diagnostics_json(const MXS_ROUTER *instance) +json_t* RWSplit::diagnostics_json() const { - RWSplit *router = (RWSplit *)instance; - + const RWSplit *router = this; json_t* rval = json_object(); json_object_set_new(rval, "use_sql_variables_in", @@ -1026,21 +955,10 @@ void correct_packet_sequence(GWBUF *buffer, RWSplitSession *rses) } } -/** - * @brief Client Reply routine - * - * @param instance The router instance - * @param router_session The router session - * @param backend_dcb The backend DCB - * @param queue The Buffer containing the reply - */ -static void clientReply(MXS_ROUTER *instance, - MXS_ROUTER_SESSION *router_session, - GWBUF *writebuf, - DCB *backend_dcb) +void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) { - RWSplitSession *rses = (RWSplitSession *)router_session; - RWSplit *inst = (RWSplit *)instance; + RWSplitSession *rses = this; + RWSplit *inst = router; DCB *client_dcb = backend_dcb->session->client_dcb; CHK_CLIENT_RSES(rses); ss_dassert(!rses->rses_closed); @@ -1130,11 +1048,7 @@ static void clientReply(MXS_ROUTER *instance, } } - -/** - * @brief Get router capabilities - */ -static uint64_t getCapabilities(MXS_ROUTER* instance) +uint64_t RWSplit::getCapabilities() { return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING; @@ -1155,16 +1069,12 @@ static uint64_t getCapabilities(MXS_ROUTER* instance) * ERRACT_REPLY_CLIENT * @param succp Result of action: true if router can continue */ -static void handleError(MXS_ROUTER *instance, - MXS_ROUTER_SESSION *router_session, - GWBUF *errmsgbuf, - DCB *problem_dcb, - mxs_error_action_t action, - bool *succp) +void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb, + mxs_error_action_t action, bool *succp) { ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); - RWSplit *inst = (RWSplit *)instance; - RWSplitSession *rses = (RWSplitSession *)router_session; + RWSplit *inst = router; + RWSplitSession *rses = this; CHK_CLIENT_RSES(rses); CHK_DCB(problem_dcb); @@ -1288,21 +1198,6 @@ MXS_BEGIN_DECLS */ MXS_MODULE *MXS_CREATE_MODULE() { - static MXS_ROUTER_OBJECT MyObject = - { - createInstance, - newSession, - closeSession, - freeSession, - routeQuery, - diagnostics, - diagnostics_json, - clientReply, - handleError, - getCapabilities, - NULL - }; - static MXS_MODULE info = { MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION, @@ -1310,7 +1205,7 @@ MXS_MODULE *MXS_CREATE_MODULE() "V1.1.0", RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT | RCAP_TYPE_SESSION_STATE_TRACKING, - &MyObject, + &RWSplit::s_object, NULL, /* Process init. */ NULL, /* Process finish. */ NULL, /* Thread init. */ diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index d900ea163..ce5796518 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include #include #include @@ -228,10 +228,12 @@ public: uint64_t n_all; /**< Number of stmts sent to all */ }; +class RWSplitSession; + /** * The per instance data for the router. */ -class RWSplit +class RWSplit: public mxs::Router { RWSplit(const RWSplit&); RWSplit& operator=(const RWSplit&); @@ -243,9 +245,65 @@ public: SERVICE* service() const; const Config& config() const; Stats& stats(); + const Stats& stats() const; int max_slave_count() const; bool have_enough_servers() const; + // API functions + + /** + * @brief Create a new readwritesplit router instance + * + * An instance of the router is required for each service that uses this router. + * One instance of the router will handle multiple router sessions. + * + * @param service The service this router is being create for + * @param options The options for this query router + * + * @return New router instance or NULL on error + */ + static RWSplit* create(SERVICE* pService, char** pzOptions); + + /** + * @brief Create a new session for this router instance + * + * The session is used to store all the data required by the router for a + * particular client connection. The instance of the router that relates to a + * particular service is passed as the first parameter. The second parameter is + * the session that has been created in response to the request from a client + * for a connection. The passed session contains generic information; this + * function creates the session structure that holds router specific data. + * There is often a one to one relationship between sessions and router + * sessions, although it is possible to create configurations where a + * connection is handled by multiple routers, one after another. + * + * @param session The MaxScale session (generic connection data) + * + * @return New router session or NULL on error + */ + RWSplitSession* newSession(MXS_SESSION* pSession); + + /** + * @brief Diagnostics routine + * + * Print query router diagnostics to the DCB passed in + * + * @param dcb The DCB for diagnostic output + */ + void diagnostics(DCB* pDcb); + + /** + * @brief JSON diagnostics routine + * + * @return The JSON representation of this router instance + */ + json_t* diagnostics_json() const; + + /** + * @brief Get router capabilities + */ + uint64_t getCapabilities(); + private: SERVICE* m_service; /**< Service where the router belongs*/ Config m_config; diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index ff2ea438d..8385446e0 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -22,6 +22,7 @@ using namespace maxscale; RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, const SRWBackendList& backends, const SRWBackend& master): + mxs::RouterSession(session), rses_chk_top(CHK_NUM_ROUTER_SES), rses_closed(false), backends(backends), diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 0d71688d3..d80b63033 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -41,7 +41,7 @@ typedef std::tr1::unordered_map ExecMap; /** * The client session of a RWSplit instance */ -class RWSplitSession +class RWSplitSession: public mxs::RouterSession { RWSplitSession(const RWSplitSession&) = delete; RWSplitSession& operator=(const RWSplitSession&) = delete; @@ -58,6 +58,40 @@ public: */ static RWSplitSession* create(RWSplit* router, MXS_SESSION* session); + /** + * Called when a client session has been closed. + */ + void close(); + + /** + * Called when a packet being is routed to the backend. The router should + * forward the packet to the appropriate server(s). + * + * @param pPacket A client packet. + */ + int32_t routeQuery(GWBUF* pPacket); + + /** + * Called when a packet is routed to the client. The router should + * forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`. + * + * @param pPacket A client packet. + * @param pBackend The backend the packet is coming from. + */ + void clientReply(GWBUF* pPacket, DCB* pBackend); + + /** + * + * @param pMessage The error message. + * @param pProblem The DCB on which the error occurred. + * @param action The context. + * @param pSuccess On output, if false, the session will be terminated. + */ + void handleError(GWBUF* pMessage, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess); + // TODO: Make member variables private skygw_chk_t rses_chk_top; bool rses_closed; /**< true when closeSession is called */