diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 2cf2aee72..0f7b96b53 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -467,36 +467,8 @@ void RWSplitSession::trx_replay_next_stmt() } } -void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) +void RWSplitSession::manage_transactions(SRWBackend& backend, GWBUF* writebuf) { - DCB* client_dcb = backend_dcb->session->client_dcb; - - SRWBackend& backend = get_backend_from_dcb(backend_dcb); - - if (backend->get_reply_state() == REPLY_STATE_DONE) - { - if (connection_was_killed(writebuf)) - { - // The connection was killed, we can safely ignore it. When the TCP connection is - // closed, the router's error handling will sort it out. - gwbuf_free(writebuf); - } - else - { - /** If we receive an unexpected response from the server, the internal - * logic cannot handle this situation. Routing the reply straight to - * the client should be the safest thing to do at this point. */ - log_unexpected_response(backend, writebuf, m_current_query.get()); - MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); - } - return; - } - - if ((writebuf = handle_causal_read_reply(writebuf, backend)) == NULL) - { - return; // Nothing to route, return - } - if (m_otrx_state == OTRX_ROLLBACK) { /** This is the response to the ROLLBACK. If it fails, we must close @@ -506,7 +478,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) if (!mxs_mysql_is_ok_packet(writebuf)) { - poll_fake_hangup_event(backend_dcb); + poll_fake_hangup_event(backend->dcb()); } } else if (m_config.transaction_replay && m_can_replay_trx @@ -559,6 +531,39 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) * in case the connection breaks in the middle of a resultset. */ m_current_query.reset(); } +} + +void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb) +{ + DCB* client_dcb = backend_dcb->session->client_dcb; + SRWBackend& backend = get_backend_from_dcb(backend_dcb); + + if (backend->get_reply_state() == REPLY_STATE_DONE) + { + if (connection_was_killed(writebuf)) + { + // The connection was killed, we can safely ignore it. When the TCP connection is + // closed, the router's error handling will sort it out. + gwbuf_free(writebuf); + } + else + { + /** If we receive an unexpected response from the server, the internal + * logic cannot handle this situation. Routing the reply straight to + * the client should be the safest thing to do at this point. */ + log_unexpected_response(backend, writebuf, m_current_query.get()); + MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); + } + return; + } + + if ((writebuf = handle_causal_read_reply(writebuf, backend)) == NULL) + { + return; // Nothing to route, return + } + + // Track transaction contents and handle ROLLBACK with aggressive transaction load balancing + manage_transactions(backend, writebuf); backend->process_reply(writebuf); diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 6cc37ad39..5b1311e3e 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -215,6 +215,7 @@ private: void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg); + void manage_transactions(mxs::SRWBackend& backend, GWBUF* writebuf); void trx_replay_next_stmt();