diff --git a/server/modules/routing/readconnroute/readconnroute.cc b/server/modules/routing/readconnroute/readconnroute.cc index c32d58831..36f5f6b25 100644 --- a/server/modules/routing/readconnroute/readconnroute.cc +++ b/server/modules/routing/readconnroute/readconnroute.cc @@ -53,28 +53,6 @@ #include #include -/* The router entry points */ -static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params); -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session); -static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session); -static void freeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session); -static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* queue); -static void diagnostics(MXS_ROUTER* instance, DCB* dcb); -static json_t* diagnostics_json(const MXS_ROUTER* instance); -static void clientReply(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* queue, - DCB* backend_dcb); -static void handleError(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* errbuf, - DCB* problem_dcb, - mxs_error_action_t action, - bool* succp); -static uint64_t getCapabilities(MXS_ROUTER* instance); -static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params); -static SERVER_REF* get_root_master(SERVER_REF* servers); - /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -87,22 +65,6 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() { MXS_NOTICE("Initialise readconnroute router module."); - static MXS_ROUTER_OBJECT MyObject = - { - createInstance, - newSession, - closeSession, - freeSession, - routeQuery, - diagnostics, - diagnostics_json, - clientReply, - handleError, - getCapabilities, - nullptr, - configureInstance - }; - static MXS_MODULE info = { MXS_MODULE_API_ROUTER, @@ -111,7 +73,7 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() "A connection based router to load balance based on connections", "V2.0.0", RCAP_TYPE_RUNTIME_CONFIG, - &MyObject, + &ReadConn::s_object, nullptr, /* Process init. */ nullptr, /* Process finish. */ nullptr, /* Thread init. */ @@ -124,14 +86,35 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() return &info; } -static inline void free_readconn_instance(ReadConn* router) +/* + * This routine returns the master server from a MariaDB replication tree. The server must be + * running, not in maintenance and have the master bit set. If multiple masters are found, + * the one with the highest weight is chosen. + * + * @param servers The list of servers + * + * @return The Master server + */ +static SERVER_REF* get_root_master(SERVER_REF* servers) { - delete router; + SERVER_REF* master_host = nullptr; + for (SERVER_REF* ref = servers; ref; ref = ref->next) + { + if (ref->active && ref->server->is_master()) + { + // No master found yet or this one has better weight. + if (!master_host || ref->server_weight > master_host->server_weight) + { + master_host = ref; + } + } + } + return master_host; } -static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params) +bool ReadConn::configure(MXS_CONFIG_PARAMETER* params) { - ReadConn* inst = static_cast(instance); + ReadConn* inst = this; uint64_t bitmask = 0; uint64_t bitvalue = 0; bool ok = true; @@ -189,6 +172,12 @@ static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params return ok; } + +ReadConn::ReadConn(SERVICE* service) + : mxs::Router(service) +{ +} + /** * Create an instance of the router for a particular service * within the gateway. @@ -198,24 +187,40 @@ static bool configureInstance(MXS_ROUTER* instance, MXS_CONFIG_PARAMETER* params * * @return The instance data for this new instance */ -static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params) +ReadConn* ReadConn::create(SERVICE* service, MXS_CONFIG_PARAMETER* params) { - ReadConn* inst = new(std::nothrow) ReadConn; + ReadConn* inst = new(std::nothrow) ReadConn(service); - if (inst) + if (inst && !inst->configure(params)) { - - inst->service = service; - inst->bitmask_and_bitvalue = 0; - - if (!configureInstance(static_cast(inst), params)) - { - free_readconn_instance(inst); - inst = nullptr; - } + delete inst; + inst = nullptr; } - return static_cast(inst); + return inst; +} + +ReadConnSession::ReadConnSession(ReadConn* inst, MXS_SESSION* session, SERVER_REF* backend, DCB* dcb, + uint32_t bitmask, uint32_t bitvalue) + : mxs::RouterSession(session) + , instance(inst) + , backend(backend) + , backend_dcb(dcb) + , client_dcb(session->client_dcb) + , bitmask(bitmask) + , bitvalue(bitvalue) +{ +} + +ReadConnSession::~ReadConnSession() +{ + mxb::atomic::add(&backend->connections, -1, mxb::atomic::RELAXED); +} + +void ReadConnSession::close() +{ + mxb_assert(backend_dcb); + dcb_close(backend_dcb); } /** @@ -225,33 +230,18 @@ static MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params * @param session The session itself * @return Session specific data for this session */ -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session) +ReadConnSession* ReadConn::newSession(MXS_SESSION* session) { - ReadConn* inst = static_cast(instance); - - MXS_DEBUG("%lu [newSession] new router session with session " - "%p, and inst %p.", - pthread_self(), - session, - inst); - - ReadConnSession* client_rses = new(std::nothrow) ReadConnSession; - - if (!client_rses) - { - return nullptr; - } - - client_rses->client_dcb = session->client_dcb; + ReadConn* inst = this; uint64_t mask = atomic_load_uint64(&inst->bitmask_and_bitvalue); - client_rses->bitmask = mask; - client_rses->bitvalue = mask >> 32; + uint32_t bitmask = mask; + uint32_t bitvalue = mask >> 32; /** * Find the Master host from available servers */ - SERVER_REF* master_host = get_root_master(inst->service->dbref); + SERVER_REF* master_host = get_root_master(inst->m_pService->dbref); /** * Find a backend server to connect to. This is the extent of the @@ -272,7 +262,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session * become the new candidate. This has the effect of spreading the * connections over different servers during periods of very low load. */ - for (SERVER_REF* ref = inst->service->dbref; ref; ref = ref->next) + for (SERVER_REF* ref = inst->m_pService->dbref; ref; ref = ref->next) { if (!server_ref_is_active(ref) || ref->server->is_in_maint()) { @@ -280,13 +270,12 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session } /* Check server status bits against bitvalue from router_options */ - if (ref && ref->server->is_usable() - && (ref->server->status & client_rses->bitmask & client_rses->bitvalue)) + if (ref && ref->server->is_usable() && (ref->server->status & bitmask & bitvalue)) { if (master_host) { if (ref == master_host - && (client_rses->bitvalue & (SERVER_SLAVE | SERVER_MASTER)) == SERVER_SLAVE) + && (bitvalue & (SERVER_SLAVE | SERVER_MASTER)) == SERVER_SLAVE) { /* Skip root master here, as it could also be slave of an external server that * is not in the configuration. Intermediate masters (Relay Servers) are also @@ -295,7 +284,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session continue; } - if (ref == master_host && client_rses->bitvalue == SERVER_MASTER) + if (ref == master_host && bitvalue == SERVER_MASTER) { /* If option is "master" return only the root Master as there could be * intermediate masters (Relay Servers) and they must not be selected. @@ -305,7 +294,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session break; } } - else if (client_rses->bitvalue == SERVER_MASTER) + else if (bitvalue == SERVER_MASTER) { /* Master_host is nullptr, no master server. If requested router_option is 'master' * candidate will be nullptr. @@ -350,33 +339,33 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session // // We must do that so that readconnroute in MaxScale 2.2 will again behave // the same way as it did up until 2.1.12. - if (client_rses->bitvalue & SERVER_SLAVE) + if (bitvalue & SERVER_SLAVE) { - client_rses->bitvalue |= SERVER_MASTER; + bitvalue |= SERVER_MASTER; } } else { MXS_ERROR("Failed to create new routing session. Couldn't find eligible" " candidate server. Freeing allocated resources."); - MXS_FREE(client_rses); return nullptr; } } - /* - * We now have the server with the least connections. - * Bump the connection count for this server - */ - client_rses->backend = candidate; - /** Open the backend connection */ - client_rses->backend_dcb = dcb_connect(candidate->server, session, candidate->server->protocol().c_str()); + DCB* backend_dcb = dcb_connect(candidate->server, session, candidate->server->protocol().c_str()); - if (!client_rses->backend_dcb) + if (!backend_dcb) { /** The failure is reported in dcb_connect() */ - delete client_rses; + return nullptr; + } + + ReadConnSession* client_rses = new(std::nothrow) ReadConnSession(inst, session, candidate, backend_dcb, + bitmask, bitvalue); + + if (!client_rses) + { return nullptr; } @@ -388,51 +377,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session candidate->server->name(), candidate->connections); - return static_cast(client_rses); -} - -/** - * @node Unlink from backend server, unlink from router's connection list, - * and free memory of a router client session. - * - * Parameters: - * @param router - - * - * - * @param router_cli_ses - - * - * - * @return void - * - * - * @details (write detailed description here) - * - */ -static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses) -{ - ReadConn* router = static_cast(router_instance); - ReadConnSession* router_cli_ses = static_cast(router_client_ses); - - MXB_AT_DEBUG(int prev_val = ) mxb::atomic::add(&router_cli_ses->backend->connections, - -1, - mxb::atomic::RELAXED); - mxb_assert(prev_val > 0); - - delete router_cli_ses; -} - -/** - * Close a session with the router, this is the mechanism - * by which a router may cleanup data structure etc. - * - * @param instance The router instance data - * @param router_session The session being closed - */ -static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) -{ - ReadConnSession* router_cli_ses = static_cast(router_session); - mxb_assert(router_cli_ses->backend_dcb); - dcb_close(router_cli_ses->backend_dcb); + return client_rses; } /** Log routing failure due to closed session */ @@ -464,7 +409,7 @@ static void log_closed_session(mxs_mysql_cmd_t mysql_command, SERVER_REF* ref) * * @return True if the backend connection is still valid */ -static inline bool connection_is_valid(ReadConn* inst, ReadConnSession* router_cli_ses) +bool ReadConn::connection_is_valid(ReadConnSession* router_cli_ses) { bool rval = false; @@ -481,7 +426,7 @@ static inline bool connection_is_valid(ReadConn* inst, ReadConnSession* router_c if ((router_cli_ses->bitvalue == SERVER_MASTER) && router_cli_ses->backend->active) { // If we're using an active master server, verify that it is still a master - rval = router_cli_ses->backend == get_root_master(inst->service->dbref); + rval = router_cli_ses->backend == get_root_master(m_pService->dbref); } else { @@ -504,15 +449,14 @@ static inline bool connection_is_valid(ReadConn* inst, ReadConnSession* router_c * This is simply a case of sending it to the connection that was * chosen when we started the client session. * - * @param instance The router instance - * @param router_session The router session returned from the newSession call * @param queue The queue of data buffers to route + * * @return if succeed 1, otherwise 0 */ -static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* queue) +int ReadConnSession::routeQuery(GWBUF* queue) { - ReadConn* inst = static_cast(instance); - ReadConnSession* router_cli_ses = static_cast(router_session); + ReadConn* inst = instance; + ReadConnSession* router_cli_ses = this; int rc = 0; MySQLProtocol* proto = static_cast(router_cli_ses->client_dcb->protocol); mxs_mysql_cmd_t mysql_command = proto->current_command; @@ -526,7 +470,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, mxb_assert(backend_dcb); char* trc = nullptr; - if (!connection_is_valid(inst, router_cli_ses)) + if (!inst->connection_is_valid(router_cli_ses)) { log_closed_session(mysql_command, router_cli_ses->backend); gwbuf_free(queue); @@ -569,17 +513,17 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, * @param instance Instance of the router * @param dcb DCB to send diagnostics to */ -static void diagnostics(MXS_ROUTER* router, DCB* dcb) +void ReadConn::diagnostics(DCB* dcb) { - ReadConn* router_inst = static_cast(router); - const char* weightby = serviceGetWeightingParameter(router_inst->service); + ReadConn* router_inst = this; + const char* weightby = serviceGetWeightingParameter(router_inst->m_pService); dcb_printf(dcb, "\tNumber of router sessions: %d\n", router_inst->stats.n_sessions); dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", - router_inst->service->stats.n_current); + router_inst->m_pService->stats.n_current); dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", router_inst->stats.n_queries); @@ -591,7 +535,7 @@ static void diagnostics(MXS_ROUTER* router, DCB* dcb) weightby); dcb_printf(dcb, "\t\tServer Target %% Connections\n"); - for (SERVER_REF* ref = router_inst->service->dbref; ref; ref = ref->next) + for (SERVER_REF* ref = router_inst->m_pService->dbref; ref; ref = ref->next) { dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n", @@ -608,16 +552,16 @@ static void diagnostics(MXS_ROUTER* router, DCB* dcb) * @param instance Instance of the router * @param dcb DCB to send diagnostics to */ -static json_t* diagnostics_json(const MXS_ROUTER* router) +json_t* ReadConn::diagnostics_json() const { - const ReadConn* router_inst = static_cast(router); + const ReadConn* router_inst = this; json_t* rval = json_object(); json_object_set_new(rval, "connections", json_integer(router_inst->stats.n_sessions)); - json_object_set_new(rval, "current_connections", json_integer(router_inst->service->stats.n_current)); + json_object_set_new(rval, "current_connections", json_integer(router_inst->m_pService->stats.n_current)); json_object_set_new(rval, "queries", json_integer(router_inst->stats.n_queries)); - const char* weightby = serviceGetWeightingParameter(router_inst->service); + const char* weightby = serviceGetWeightingParameter(router_inst->m_pService); if (*weightby) { @@ -632,15 +576,10 @@ static json_t* diagnostics_json(const MXS_ROUTER* router) * * The routine will reply to client data from backend server * - * @param instance The router instance - * @param router_session The router session * @param backend_dcb The backend DCB * @param queue The GWBUF with reply data */ -static void clientReply(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* queue, - DCB* backend_dcb) +void ReadConnSession::clientReply(GWBUF* queue, DCB* backend_dcb) { mxb_assert(backend_dcb->session->client_dcb); MXS_SESSION_ROUTE_REPLY(backend_dcb->session, queue); @@ -651,20 +590,12 @@ static void clientReply(MXS_ROUTER* instance, * * The routine will handle errors that occurred in writes. * - * @param instance The router instance - * @param router_session The router session * @param message The error message to reply * @param problem_dcb The DCB related to the error * @param action The action: ERRACT_NEW_CONNECTION or ERRACT_REPLY_CLIENT * @param succp Result of action: true if router can continue - * */ -static void handleError(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* errbuf, - DCB* problem_dcb, - mxs_error_action_t action, - bool* succp) +void ReadConnSession::handleError(GWBUF* errbuf, DCB* problem_dcb, mxs_error_action_t action, bool* succp) { mxb_assert(problem_dcb->role == DCB::Role::BACKEND); @@ -676,34 +607,7 @@ static void handleError(MXS_ROUTER* instance, *succp = false; } -static uint64_t getCapabilities(MXS_ROUTER* instance) +uint64_t ReadConn::getCapabilities() { return RCAP_TYPE_RUNTIME_CONFIG; } - -/* - * This routine returns the master server from a MariaDB replication tree. The server must be - * running, not in maintenance and have the master bit set. If multiple masters are found, - * the one with the highest weight is chosen. - * - * @param servers The list of servers - * @return The Master server - * - */ - -static SERVER_REF* get_root_master(SERVER_REF* servers) -{ - SERVER_REF* master_host = nullptr; - for (SERVER_REF* ref = servers; ref; ref = ref->next) - { - if (ref->active && ref->server->is_master()) - { - // No master found yet or this one has better weight. - if (!master_host || ref->server_weight > master_host->server_weight) - { - master_host = ref; - } - } - } - return master_host; -} diff --git a/server/modules/routing/readconnroute/readconnroute.hh b/server/modules/routing/readconnroute/readconnroute.hh index 29b4f9598..3b910961a 100644 --- a/server/modules/routing/readconnroute/readconnroute.hh +++ b/server/modules/routing/readconnroute/readconnroute.hh @@ -23,11 +23,28 @@ #include #include +class ReadConn; + /** * The client session structure used within this router. */ -struct ReadConnSession : MXS_ROUTER_SESSION +class ReadConnSession : public mxs::RouterSession { +public: + ReadConnSession(ReadConn* inst, MXS_SESSION* session, SERVER_REF* backend, DCB* dcb, + uint32_t bitmask, uint32_t bitvalue); + ~ReadConnSession(); + + int routeQuery(GWBUF*); + + void close(); + void clientReply(GWBUF* pPacket, DCB* pBackend); + void handleError(GWBUF* pMessage, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess); + + ReadConn* instance; SERVER_REF* backend; /*< Backend used by the client session */ DCB* backend_dcb;/*< DCB Connection to the backend */ DCB* client_dcb; /**< Client DCB */ @@ -40,16 +57,28 @@ struct ReadConnSession : MXS_ROUTER_SESSION */ struct Stats { - int n_sessions; /*< Number sessions created */ - int n_queries; /*< Number of queries forwarded */ + int n_sessions = 0; /*< Number sessions created */ + int n_queries = 0; /*< Number of queries forwarded */ }; /** * The per instance data for the router. */ -struct ReadConn : public MXS_ROUTER +class ReadConn : public mxs::Router { - SERVICE* service; /*< Pointer to the service using this router */ - uint64_t bitmask_and_bitvalue; /*< Lower 32-bits for bitmask and upper for bitvalue */ - Stats stats; /*< Statistics for this router */ +public: + static ReadConn* create(SERVICE* service, MXS_CONFIG_PARAMETER* params); + + ReadConnSession* newSession(MXS_SESSION* pSession); + void diagnostics(DCB* pDcb); + json_t* diagnostics_json() const; + uint64_t getCapabilities(); + bool configure(MXS_CONFIG_PARAMETER* params); + bool connection_is_valid(ReadConnSession* router_cli_ses); + + uint64_t bitmask_and_bitvalue = 0; /*< Lower 32-bits for bitmask and upper for bitvalue */ + Stats stats; /*< Statistics for this router */ + +private: + ReadConn(SERVICE* service); };