MXS-1653: Fix slave session command processing

The responses of slaves that arrived before the master were always
compared to the empty value of 0x00. If the slave connection replied after
the master, the comparison was correct.

This commit introduces a map of slaves and their responses that
are handled once the master's response arrives.
This commit is contained in:
Markus Mäkelä 2018-02-07 17:20:40 +02:00
parent bff4f05e3b
commit 2504ff19b3
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
2 changed files with 72 additions and 28 deletions

View File

@ -26,6 +26,32 @@
* Functions for session command handling
*/
/**
* Discards the slave connection if its response differs from the master's response
*
* @param backend The slave Backend
* @param master_cmd Master's reply
* @param slave_cmd Slave's reply
*
* @return True if the responses were different and connection was discarded
*/
static bool discard_if_response_differs(SRWBackend backend, uint8_t master_cmd, uint8_t slave_cmd)
{
bool rval = false;
if (master_cmd != slave_cmd)
{
MXS_WARNING("Slave server '%s': response (0x%02hhx) differs "
"from master's response(0x%02hhx). Closing slave "
"connection due to inconsistent session state.",
backend->name(), slave_cmd, master_cmd);
backend->close(mxs::Backend::CLOSE_FATAL);
rval = true;
}
return rval;
}
void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
GWBUF** ppPacket, bool* pReconnect)
{
@ -39,6 +65,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
uint8_t command = backend->next_session_command()->get_command();
uint64_t id = backend->complete_session_command();
MXS_PS_RESPONSE resp = {};
bool discard = true;
if (command == MXS_COM_STMT_PREPARE && cmd != MYSQL_REPLY_ERR)
{
@ -48,41 +75,54 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
backend->add_ps_handle(id, resp.id);
}
if (rses->recv_sescmd < rses->sent_sescmd &&
id == rses->recv_sescmd + 1 &&
(!rses->current_master || !rses->current_master->in_use() || // Session doesn't have a master
rses->current_master == backend)) // This is the master's response
if (rses->recv_sescmd < rses->sent_sescmd && id == rses->recv_sescmd + 1)
{
/** First reply to this session command, route it to the client */
++rses->recv_sescmd;
/** Store the master's response so that the slave responses can
* be compared to it */
rses->sescmd_responses[id] = cmd;
if (command == MXS_COM_STMT_PREPARE)
if (!rses->current_master || !rses->current_master->in_use() || // Session doesn't have a master
rses->current_master == backend) // This is the master's response
{
/** Map the returned response to the internal ID */
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
rses->ps_handles[resp.id] = id;
/** First reply to this session command, route it to the client */
++rses->recv_sescmd;
discard = false;
/** Store the master's response so that the slave responses can
* be compared to it */
rses->sescmd_responses[id] = cmd;
if (command == MXS_COM_STMT_PREPARE)
{
/** Map the returned response to the internal ID */
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
rses->ps_handles[resp.id] = id;
}
// Discard any slave connections that did not return the same result
for (SlaveResponseList::iterator it = rses->slave_responses.begin();
it != rses->slave_responses.end(); it++)
{
if (discard_if_response_differs(it->first, cmd, it->second))
{
*pReconnect = true;
}
}
rses->slave_responses.clear();
}
else
{
/** Record slave command so that the response can be validated
* against the master's response when it arrives. */
rses->slave_responses.push_back(std::make_pair(backend, cmd));
}
}
else
else if (discard_if_response_differs(backend, rses->sescmd_responses[id], cmd))
{
*pReconnect = true;
}
if (discard)
{
/** The reply to this session command has already been sent to
* the client, discard it */
gwbuf_free(*ppPacket);
*ppPacket = NULL;
if (rses->sescmd_responses[id] != cmd)
{
MXS_WARNING("Slave server '%s': response (0x%02hhx) differs "
"from master's response(0x%02hhx). Closing slave "
"connection due to inconsistent session state.",
backend->name(), cmd, rses->sescmd_responses[id]);
backend->close(mxs::Backend::CLOSE_FATAL);
*pReconnect = true;
}
}
}
}

View File

@ -89,6 +89,9 @@ typedef std::list<SRWBackend> SRWBackendList;
typedef std::tr1::unordered_set<std::string> TableSet;
typedef std::map<uint64_t, uint8_t> ResponseMap;
/** List of slave responses that arrived before the master */
typedef std::list< std::pair<SRWBackend, uint8_t> > SlaveResponseList;
/** Map of COM_STMT_EXECUTE targets by internal ID */
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
@ -133,6 +136,7 @@ public:
TableSet temp_tables; /**< Set of temporary tables */
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
ResponseMap sescmd_responses; /**< Response to each session command */
SlaveResponseList slave_responses; /**< Slaves that replied before the master */
uint64_t sent_sescmd; /**< ID of the last sent session command*/
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
PSManager ps_manager; /**< Prepared statement manager*/