diff --git a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc index 31d9ea6ac..238b60650 100644 --- a/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc +++ b/server/modules/routing/readwritesplit/rwsplit_session_cmd.cc @@ -26,6 +26,32 @@ * Functions for session command handling */ +/** + * Discards the slave connection if its response differs from the master's response + * + * @param backend The slave Backend + * @param master_cmd Master's reply + * @param slave_cmd Slave's reply + * + * @return True if the responses were different and connection was discarded + */ +static bool discard_if_response_differs(SRWBackend backend, uint8_t master_cmd, uint8_t slave_cmd) +{ + bool rval = false; + + if (master_cmd != slave_cmd) + { + MXS_WARNING("Slave server '%s': response (0x%02hhx) differs " + "from master's response(0x%02hhx). Closing slave " + "connection due to inconsistent session state.", + backend->name(), slave_cmd, master_cmd); + backend->close(mxs::Backend::CLOSE_FATAL); + rval = true; + } + + return rval; +} + void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, GWBUF** ppPacket, bool* pReconnect) { @@ -39,6 +65,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, uint8_t command = backend->next_session_command()->get_command(); uint64_t id = backend->complete_session_command(); MXS_PS_RESPONSE resp = {}; + bool discard = true; if (command == MXS_COM_STMT_PREPARE && cmd != MYSQL_REPLY_ERR) { @@ -48,41 +75,54 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend, backend->add_ps_handle(id, resp.id); } - if (rses->recv_sescmd < rses->sent_sescmd && - id == rses->recv_sescmd + 1 && - (!rses->current_master || !rses->current_master->in_use() || // Session doesn't have a master - rses->current_master == backend)) // This is the master's response + if (rses->recv_sescmd < rses->sent_sescmd && id == rses->recv_sescmd + 1) { - /** First reply to this session command, route it to the client */ - ++rses->recv_sescmd; - - /** Store the master's response so that the slave responses can - * be compared to it */ - rses->sescmd_responses[id] = cmd; - - if (command == MXS_COM_STMT_PREPARE) + if (!rses->current_master || !rses->current_master->in_use() || // Session doesn't have a master + rses->current_master == backend) // This is the master's response { - /** Map the returned response to the internal ID */ - MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); - rses->ps_handles[resp.id] = id; + /** First reply to this session command, route it to the client */ + ++rses->recv_sescmd; + discard = false; + + /** Store the master's response so that the slave responses can + * be compared to it */ + rses->sescmd_responses[id] = cmd; + + if (command == MXS_COM_STMT_PREPARE) + { + /** Map the returned response to the internal ID */ + MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id); + rses->ps_handles[resp.id] = id; + } + + // Discard any slave connections that did not return the same result + for (SlaveResponseList::iterator it = rses->slave_responses.begin(); + it != rses->slave_responses.end(); it++) + { + if (discard_if_response_differs(it->first, cmd, it->second)) + { + *pReconnect = true; + } + } + + rses->slave_responses.clear(); + } + else + { + /** Record slave command so that the response can be validated + * against the master's response when it arrives. */ + rses->slave_responses.push_back(std::make_pair(backend, cmd)); } } - else + else if (discard_if_response_differs(backend, rses->sescmd_responses[id], cmd)) + { + *pReconnect = true; + } + + if (discard) { - /** The reply to this session command has already been sent to - * the client, discard it */ gwbuf_free(*ppPacket); *ppPacket = NULL; - - if (rses->sescmd_responses[id] != cmd) - { - MXS_WARNING("Slave server '%s': response (0x%02hhx) differs " - "from master's response(0x%02hhx). Closing slave " - "connection due to inconsistent session state.", - backend->name(), cmd, rses->sescmd_responses[id]); - backend->close(mxs::Backend::CLOSE_FATAL); - *pReconnect = true; - } } } } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 29511104d..1de6fb696 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -89,6 +89,9 @@ typedef std::list SRWBackendList; typedef std::tr1::unordered_set TableSet; typedef std::map ResponseMap; +/** List of slave responses that arrived before the master */ +typedef std::list< std::pair > SlaveResponseList; + /** Map of COM_STMT_EXECUTE targets by internal ID */ typedef std::tr1::unordered_map ExecMap; @@ -133,6 +136,7 @@ public: TableSet temp_tables; /**< Set of temporary tables */ mxs::SessionCommandList sescmd_list; /**< List of executed session commands */ ResponseMap sescmd_responses; /**< Response to each session command */ + SlaveResponseList slave_responses; /**< Slaves that replied before the master */ uint64_t sent_sescmd; /**< ID of the last sent session command*/ uint64_t recv_sescmd; /**< ID of the most recently completed session command */ PSManager ps_manager; /**< Prepared statement manager*/