diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 1725c48f6..f9a129695 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -542,7 +542,8 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old } } - if (!success && rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref)) + if (!success && rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref) && + rses->current_master && rses->current_master->in_use()) { /** * Either we failed to write to the slave or no valid slave was found. @@ -672,7 +673,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst, myrses->rses_nbackends, max_nslaves, max_slave_rlag, myrses->rses_config.slave_selection_criteria, - ses, inst, true); + ses, inst, myrses, true); } return succp; @@ -763,6 +764,8 @@ static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int* { if (sref->active) { + rses->backends.push_back(SRWBackend(new RWBackend(sref))); + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; @@ -919,6 +922,20 @@ bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer) return bref->reply_state == REPLY_STATE_DONE; } +SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref) +{ + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + if ((*it)->backend() == bref->ref) + { + return *it; + } + } + + return SRWBackend(); +} + /** * API function definitions */ @@ -1056,7 +1073,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess if (!select_connect_backend_servers(&master_ref, backend_ref, router_nservers, max_nslaves, max_slave_rlag, client_rses->rses_config.slave_selection_criteria, - session, router, false)) + session, router, client_rses, false)) { /** * Master and at least slaves must be found if the router is @@ -1070,6 +1087,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess /** Copy backend pointers to router session. */ client_rses->rses_master_ref = master_ref; + client_rses->current_master = get_backend_from_bref(client_rses, master_ref); if (client_rses->rses_config.rw_max_slave_conn_percent) { @@ -1158,6 +1176,13 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio } } + for (SRWBackendList::iterator it = router_cli_ses->backends.begin(); + it != router_cli_ses->backends.end(); it++) + { + SRWBackend& backend = *it; + backend->close(); + } + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && router_cli_ses->sescmd_list.size()) { @@ -1487,6 +1512,7 @@ static void clientReply(MXS_ROUTER *instance, router_cli_ses->rses_config.slave_selection_criteria, router_cli_ses->rses_master_ref->bref_dcb->session, router_cli_ses->router, + router_cli_ses, true); } } @@ -1674,6 +1700,7 @@ static void handleError(MXS_ROUTER *instance, problem_dcb->server->unique_name); rses->forced_node = NULL; + rses->target_node.reset(); *succp = false; break; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 14a0672cb..a5ab2d05a 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -27,6 +27,7 @@ #include #include #include +#include #include enum bref_state_t @@ -235,6 +236,39 @@ struct rwsplit_config_t * been idle for too long */ }; +class RWBackend: public mxs::Backend +{ + RWBackend(const RWBackend&); + RWBackend& operator=(const RWBackend&); + +public: + RWBackend(SERVER_REF* ref): + mxs::Backend(ref), + m_reply_state(REPLY_STATE_DONE) + { + } + + ~RWBackend() + { + } + + reply_state_t get_reply_state() const + { + return m_reply_state; + } + + void set_reply_state(reply_state_t state) + { + m_reply_state = state; + } + +private: + reply_state_t m_reply_state; +}; + +typedef std::tr1::shared_ptr SRWBackend; +typedef std::list SRWBackendList; + typedef std::tr1::unordered_set TableSet; /** @@ -247,6 +281,9 @@ struct ROUTER_CLIENT_SES rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /**< Properties listed by their type */ backend_ref_t* rses_master_ref; backend_ref_t* rses_backend_ref; /**< Pointer to backend reference array */ + SRWBackendList backends; /**< List of backend servers */ + SRWBackend current_master; /**< Current master server */ + SRWBackend target_node; /**< The currently locked target node */ rwsplit_config_t rses_config; /**< copied config info from router instance */ int rses_nbackends; int rses_nsescmd; /**< Number of executed session commands */ diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index 89ddd60a1..b071a21c7 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -12,14 +12,13 @@ * Public License. */ -#include -#include +#include -/* This needs to be removed along with dependency on it - see the - * rwsplit_tmp_table_multi functions - */ +#include #include +#include "readwritesplit.hh" + #define RW_CHK_DCB(b, d) \ do{ \ if(d->state == DCB_STATE_DISCONNECTED){ \ @@ -120,6 +119,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, select_criteria_t select_criteria, MXS_SESSION *session, ROUTER_INSTANCE *router, + ROUTER_CLIENT_SES *rses, bool active_session); /* @@ -134,3 +134,4 @@ void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type); uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet); void close_failed_bref(backend_ref_t *bref, bool fatal); +SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index a625e29d0..ecbeadb61 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -190,6 +190,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, { /** Reset the forced node as we're in relaxed multi-statement mode */ rses->forced_node = NULL; + rses->target_node.reset(); } } @@ -958,6 +959,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, if (rses->rses_master_ref) { rses->forced_node = rses->rses_master_ref; + rses->target_node = rses->current_master; MXS_INFO("Multi-statement query, routing all future queries to master."); } else @@ -1289,6 +1291,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, session_trx_is_read_only(rses->client_dcb->session)) { rses->forced_node = bref; + rses->target_node = get_backend_from_bref(rses, bref); MXS_DEBUG("Setting forced_node SLAVE to %s within an opened READ ONLY transaction\n", target_dcb->server->unique_name); } @@ -1356,6 +1359,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, { MXS_DEBUG("An opened READ ONLY transaction ends: forced_node is set to NULL"); rses->forced_node = NULL; + rses->target_node.reset(); } return true; } diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index a9b40b963..805bd7af9 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -147,6 +147,7 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, select_criteria_t select_criteria, MXS_SESSION *session, ROUTER_INSTANCE *router, + ROUTER_CLIENT_SES *rses, bool active_session) { if (p_master_ref == NULL || backend_ref == NULL) @@ -208,6 +209,17 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, { if (connect_server(&backend_ref[i], session, false)) { + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + SRWBackend& backend = *it; + if (backend->backend()->server == serv) + { + backend->connect(session); + break; + } + } + *p_master_ref = &backend_ref[i]; break; } @@ -240,6 +252,17 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, if (connect_server(bref, session, true)) { slaves_connected += 1; + + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + SRWBackend& backend = *it; + if (backend->backend()->server == bref->ref->server) + { + backend->connect(session); + break; + } + } } else {