From 05fef4b23e6168ae2b5edb5b2c84d15b84fc340b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 15 Jun 2017 12:16:17 +0300 Subject: [PATCH] Duplicate connections to the Backend class Creating duplicate connections using the Backend class allows the connections and their handling to be tested at the same time that the old system is in place. This should make it somewhat easier to grasp what changes and where when the new implementation is taken into use. --- .../routing/readwritesplit/readwritesplit.cc | 33 +++++++++++++++-- .../routing/readwritesplit/readwritesplit.hh | 37 +++++++++++++++++++ .../readwritesplit/rwsplit_internal.hh | 11 +++--- .../readwritesplit/rwsplit_route_stmt.cc | 4 ++ .../readwritesplit/rwsplit_select_backends.cc | 23 ++++++++++++ 5 files changed, 100 insertions(+), 8 deletions(-) diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 1725c48f6..f9a129695 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -542,7 +542,8 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old } } - if (!success && rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref)) + if (!success && rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref) && + rses->current_master && rses->current_master->in_use()) { /** * Either we failed to write to the slave or no valid slave was found. @@ -672,7 +673,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, myrses->rses_nbackends, max_nslaves, max_slave_rlag, myrses->rses_config.slave_selection_criteria, - ses, inst, true); + ses, inst, myrses, true); } return succp; @@ -763,6 +764,8 @@ static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* { if (sref->active) { + rses->backends.push_back(SRWBackend(new RWBackend(sref))); + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; @@ -919,6 +922,20 @@ bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer) return bref->reply_state == REPLY_STATE_DONE; } +SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref) +{ + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + if ((*it)->backend() == bref->ref) + { + return *it; + } + } + + return SRWBackend(); +} + /** * API function definitions */ @@ -1056,7 +1073,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess if (!select_connect_backend_servers(&master_ref, backend_ref, router_nservers, max_nslaves, max_slave_rlag, client_rses->rses_config.slave_selection_criteria, - session, router, false)) + session, router, client_rses, false)) { /** * Master and at least slaves must be found if the router is @@ -1070,6 +1087,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess /** Copy backend pointers to router session. */ client_rses->rses_master_ref = master_ref; + client_rses->current_master = get_backend_from_bref(client_rses, master_ref); if (client_rses->rses_config.rw_max_slave_conn_percent) { @@ -1158,6 +1176,13 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio } } + for (SRWBackendList::iterator it = router_cli_ses->backends.begin(); + it != router_cli_ses->backends.end(); it++) + { + SRWBackend& backend = *it; + backend->close(); + } + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && router_cli_ses->sescmd_list.size()) { @@ -1487,6 +1512,7 @@ static void clientReply(MXS_ROUTER *instance, router_cli_ses->rses_config.slave_selection_criteria, router_cli_ses->rses_master_ref->bref_dcb->session, router_cli_ses->router, + router_cli_ses, true); } } @@ -1674,6 +1700,7 @@ static void handleError(MXS_ROUTER *instance, problem_dcb->server->unique_name); rses->forced_node = NULL; + rses->target_node.reset(); *succp = false; break; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 14a0672cb..a5ab2d05a 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -27,6 +27,7 @@ #include #include #include +#include #include enum bref_state_t @@ -235,6 +236,39 @@ struct rwsplit_config_t * been idle for too long */ }; +class RWBackend: public mxs::Backend +{ + RWBackend(const RWBackend&); + RWBackend& operator=(const RWBackend&); + +public: + RWBackend(SERVER_REF* ref): + mxs::Backend(ref), + m_reply_state(REPLY_STATE_DONE) + { + } + + ~RWBackend() + { + } + + reply_state_t get_reply_state() const + { + return m_reply_state; + } + + void set_reply_state(reply_state_t state) + { + m_reply_state = state; + } + +private: + reply_state_t m_reply_state; +}; + +typedef std::tr1::shared_ptr SRWBackend; +typedef std::list SRWBackendList; + typedef std::tr1::unordered_set TableSet; /** @@ -247,6 +281,9 @@ struct ROUTER_CLIENT_SES rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /**< Properties listed by their type */ backend_ref_t* rses_master_ref; backend_ref_t* rses_backend_ref; /**< Pointer to backend reference array */ + SRWBackendList backends; /**< List of backend servers */ + SRWBackend current_master; /**< Current master server */ + SRWBackend target_node; /**< The currently locked target node */ rwsplit_config_t rses_config; /**< copied config info from router instance */ int rses_nbackends; int rses_nsescmd; /**< Number of executed session commands */ diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index 89ddd60a1..b071a21c7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -12,14 +12,13 @@ * Public License. */ -#include -#include +#include -/* This needs to be removed along with dependency on it - see the - * rwsplit_tmp_table_multi functions - */ +#include #include +#include "readwritesplit.hh" + #define RW_CHK_DCB(b, d) \ do{ \ if(d->state == DCB_STATE_DISCONNECTED){ \ @@ -120,6 +119,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, select_criteria_t select_criteria, MXS_SESSION *session, ROUTER_INSTANCE *router, + ROUTER_CLIENT_SES *rses, bool active_session); /* @@ -134,3 +134,4 @@ void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type); uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet); void close_failed_bref(backend_ref_t *bref, bool fatal); +SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index a625e29d0..ecbeadb61 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -190,6 +190,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, { /** Reset the forced node as we're in relaxed multi-statement mode */ rses->forced_node = NULL; + rses->target_node.reset(); } } @@ -958,6 +959,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, if (rses->rses_master_ref) { rses->forced_node = rses->rses_master_ref; + rses->target_node = rses->current_master; MXS_INFO("Multi-statement query, routing all future queries to master."); } else @@ -1289,6 +1291,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, session_trx_is_read_only(rses->client_dcb->session)) { rses->forced_node = bref; + rses->target_node = get_backend_from_bref(rses, bref); MXS_DEBUG("Setting forced_node SLAVE to %s within an opened READ ONLY transaction\n", target_dcb->server->unique_name); } @@ -1356,6 +1359,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, { MXS_DEBUG("An opened READ ONLY transaction ends: forced_node is set to NULL"); rses->forced_node = NULL; + rses->target_node.reset(); } return true; } diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index a9b40b963..805bd7af9 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -147,6 +147,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, select_criteria_t select_criteria, MXS_SESSION *session, ROUTER_INSTANCE *router, + ROUTER_CLIENT_SES *rses, bool active_session) { if (p_master_ref == NULL || backend_ref == NULL) @@ -208,6 +209,17 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, { if (connect_server(&backend_ref[i], session, false)) { + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + SRWBackend& backend = *it; + if (backend->backend()->server == serv) + { + backend->connect(session); + break; + } + } + *p_master_ref = &backend_ref[i]; break; } @@ -240,6 +252,17 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, if (connect_server(bref, session, true)) { slaves_connected += 1; + + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + SRWBackend& backend = *it; + if (backend->backend()->server == bref->ref->server) + { + backend->connect(session); + break; + } + } } else {