Merge branch '2.2' into develop
This commit is contained in:
@ -26,6 +26,52 @@
|
||||
* Functions for session command handling
|
||||
*/
|
||||
|
||||
|
||||
static std::string extract_error(GWBUF* buffer)
|
||||
{
|
||||
std::string rval;
|
||||
|
||||
if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(buffer))))
|
||||
{
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer));
|
||||
char replybuf[replylen];
|
||||
gwbuf_copy_data(buffer, 0, gwbuf_length(buffer), (uint8_t*)replybuf);
|
||||
std::string err;
|
||||
std::string msg;
|
||||
err.append(replybuf + 8, 5);
|
||||
msg.append(replybuf + 13, replylen - 4 - 5);
|
||||
rval = err + ": " + msg;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Discards the slave connection if its response differs from the master's response
|
||||
*
|
||||
* @param backend The slave Backend
|
||||
* @param master_cmd Master's reply
|
||||
* @param slave_cmd Slave's reply
|
||||
*
|
||||
* @return True if the responses were different and connection was discarded
|
||||
*/
|
||||
static bool discard_if_response_differs(SRWBackend backend, uint8_t master_cmd, uint8_t slave_cmd)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
if (master_cmd != slave_cmd)
|
||||
{
|
||||
MXS_WARNING("Slave server '%s': response (0x%02hhx) differs "
|
||||
"from master's response(0x%02hhx). Closing slave "
|
||||
"connection due to inconsistent session state.",
|
||||
backend->name(), slave_cmd, master_cmd);
|
||||
backend->close(mxs::Backend::CLOSE_FATAL);
|
||||
rval = true;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
|
||||
GWBUF** ppPacket, bool* pReconnect)
|
||||
{
|
||||
@ -39,6 +85,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
|
||||
uint8_t command = backend->next_session_command()->get_command();
|
||||
uint64_t id = backend->complete_session_command();
|
||||
MXS_PS_RESPONSE resp = {};
|
||||
bool discard = true;
|
||||
|
||||
if (command == MXS_COM_STMT_PREPARE && cmd != MYSQL_REPLY_ERR)
|
||||
{
|
||||
@ -48,41 +95,59 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
|
||||
backend->add_ps_handle(id, resp.id);
|
||||
}
|
||||
|
||||
if (rses->recv_sescmd < rses->sent_sescmd &&
|
||||
id == rses->recv_sescmd + 1 &&
|
||||
(!rses->current_master || !rses->current_master->in_use() || // Session doesn't have a master
|
||||
rses->current_master == backend)) // This is the master's response
|
||||
if (rses->recv_sescmd < rses->sent_sescmd && id == rses->recv_sescmd + 1)
|
||||
{
|
||||
/** First reply to this session command, route it to the client */
|
||||
++rses->recv_sescmd;
|
||||
|
||||
/** Store the master's response so that the slave responses can
|
||||
* be compared to it */
|
||||
rses->sescmd_responses[id] = cmd;
|
||||
|
||||
if (command == MXS_COM_STMT_PREPARE)
|
||||
if (!rses->current_master || !rses->current_master->in_use() || // Session doesn't have a master
|
||||
rses->current_master == backend) // This is the master's response
|
||||
{
|
||||
/** Map the returned response to the internal ID */
|
||||
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
||||
rses->ps_handles[resp.id] = id;
|
||||
/** First reply to this session command, route it to the client */
|
||||
++rses->recv_sescmd;
|
||||
discard = false;
|
||||
|
||||
/** Store the master's response so that the slave responses can
|
||||
* be compared to it */
|
||||
rses->sescmd_responses[id] = cmd;
|
||||
|
||||
if (cmd == MYSQL_REPLY_ERR)
|
||||
{
|
||||
MXS_INFO("Session command no. %lu failed: %s",
|
||||
id, extract_error(*ppPacket).c_str());
|
||||
}
|
||||
else if (command == MXS_COM_STMT_PREPARE)
|
||||
{
|
||||
/** Map the returned response to the internal ID */
|
||||
MXS_INFO("PS ID %u maps to internal ID %lu", resp.id, id);
|
||||
rses->ps_handles[resp.id] = id;
|
||||
}
|
||||
|
||||
// Discard any slave connections that did not return the same result
|
||||
for (SlaveResponseList::iterator it = rses->slave_responses.begin();
|
||||
it != rses->slave_responses.end(); it++)
|
||||
{
|
||||
if (discard_if_response_differs(it->first, cmd, it->second))
|
||||
{
|
||||
*pReconnect = true;
|
||||
}
|
||||
}
|
||||
|
||||
rses->slave_responses.clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Record slave command so that the response can be validated
|
||||
* against the master's response when it arrives. */
|
||||
rses->slave_responses.push_back(std::make_pair(backend, cmd));
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (discard_if_response_differs(backend, rses->sescmd_responses[id], cmd))
|
||||
{
|
||||
*pReconnect = true;
|
||||
}
|
||||
|
||||
if (discard)
|
||||
{
|
||||
/** The reply to this session command has already been sent to
|
||||
* the client, discard it */
|
||||
gwbuf_free(*ppPacket);
|
||||
*ppPacket = NULL;
|
||||
|
||||
if (rses->sescmd_responses[id] != cmd)
|
||||
{
|
||||
MXS_WARNING("Slave server '%s': response (0x%02hhx) differs "
|
||||
"from master's response(0x%02hhx). Closing slave "
|
||||
"connection due to inconsistent session state.",
|
||||
backend->name(), cmd, rses->sescmd_responses[id]);
|
||||
backend->close(mxs::Backend::CLOSE_FATAL);
|
||||
*pReconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -97,6 +97,9 @@ typedef std::list<SRWBackend> SRWBackendList;
|
||||
typedef std::tr1::unordered_set<std::string> TableSet;
|
||||
typedef std::map<uint64_t, uint8_t> ResponseMap;
|
||||
|
||||
/** List of slave responses that arrived before the master */
|
||||
typedef std::list< std::pair<SRWBackend, uint8_t> > SlaveResponseList;
|
||||
|
||||
/** Map of COM_STMT_EXECUTE targets by internal ID */
|
||||
typedef std::tr1::unordered_map<uint32_t, SRWBackend> ExecMap;
|
||||
|
||||
@ -141,6 +144,7 @@ public:
|
||||
TableSet temp_tables; /**< Set of temporary tables */
|
||||
mxs::SessionCommandList sescmd_list; /**< List of executed session commands */
|
||||
ResponseMap sescmd_responses; /**< Response to each session command */
|
||||
SlaveResponseList slave_responses; /**< Slaves that replied before the master */
|
||||
uint64_t sent_sescmd; /**< ID of the last sent session command*/
|
||||
uint64_t recv_sescmd; /**< ID of the most recently completed session command */
|
||||
PSManager ps_manager; /**< Prepared statement manager*/
|
||||
|
Reference in New Issue
Block a user