MXS-2455 Recognize transaction rollbacks
All transaction rollback errors have an sql_state like "40XXX". So, when an error reply is received we check for that and act accordingly.
This commit is contained in:
@ -569,80 +569,56 @@ void RWSplitSession::close_stale_connections()
|
|||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
// TODO: It is not OK that knowledge about Clustrix is embedded into RWS.
|
inline bool is_transaction_rollback(uint8_t* pData)
|
||||||
// TODO: The capacity for recovery should be abstracted into SERVER, of
|
|
||||||
// TODO: which there then would be backend specific concrete specializations.
|
|
||||||
|
|
||||||
const int CLUSTRIX_ERROR_CODE = 1;
|
|
||||||
|
|
||||||
// NOTE: Keep these alphabetically ordered!
|
|
||||||
const char CLUSTRIX_ERROR_1[] = "[16389] Group change during GTM operation";
|
|
||||||
|
|
||||||
const struct ClustrixError
|
|
||||||
{
|
{
|
||||||
const void* message;
|
bool rv = false;
|
||||||
int len;
|
|
||||||
|
|
||||||
bool operator == (const ClustrixError& rhs) const
|
// The 'sql_state' of all transaction rollbacks is "40XXX". In an error
|
||||||
|
// packet, the 'sql_state' is found in the payload at offset 4, after the one
|
||||||
|
// byte 'header', the two byte 'error_code', and the 1 byte 'sql_state_marker'.
|
||||||
|
uint8_t* p = pData + MYSQL_HEADER_LEN + 1 + 2 + 1;
|
||||||
|
|
||||||
|
if (*p++ == '4' && *p == '0')
|
||||||
{
|
{
|
||||||
return len == rhs.len && memcmp(message, rhs.message, len) == 0;
|
rv = true;
|
||||||
}
|
|
||||||
|
|
||||||
bool operator < (const ClustrixError& rhs) const
|
if (mxb_log_is_priority_enabled(LOG_INFO))
|
||||||
{
|
|
||||||
int rv = memcmp(message, rhs.message, std::min(len, rhs.len));
|
|
||||||
|
|
||||||
if (rv == 0)
|
|
||||||
{
|
{
|
||||||
rv = len - rhs.len;
|
// p now points at the second byte of the 5 byte long 'sql_state'.
|
||||||
|
p += 4; // Now at the start of the human readable error message
|
||||||
|
|
||||||
|
uint8_t* end = pData + MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(pData);
|
||||||
|
int len = end - p;
|
||||||
|
char message[len + 1];
|
||||||
|
memcpy(message, p, len);
|
||||||
|
message[len] = 0;
|
||||||
|
|
||||||
|
MXS_NOTICE("A retryable Clustrix error: %s", message);
|
||||||
}
|
}
|
||||||
|
|
||||||
return rv < 0 ? true : false;
|
|
||||||
}
|
}
|
||||||
} clustrix_errors[] =
|
|
||||||
{
|
|
||||||
{ CLUSTRIX_ERROR_1, sizeof(CLUSTRIX_ERROR_1) - 1 }
|
|
||||||
};
|
|
||||||
|
|
||||||
const int nClustrix_errors = sizeof(clustrix_errors) / sizeof(clustrix_errors[0]);
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
bool is_manageable_clustrix_error(GWBUF* writebuf)
|
bool is_transaction_rollback(GWBUF* writebuf)
|
||||||
{
|
{
|
||||||
bool rv = false;
|
bool rv = false;
|
||||||
|
|
||||||
if (MYSQL_IS_ERROR_PACKET(GWBUF_DATA(writebuf)))
|
if (MYSQL_IS_ERROR_PACKET(GWBUF_DATA(writebuf)))
|
||||||
{
|
{
|
||||||
uint8_t* pData = GWBUF_DATA(writebuf);
|
if (GWBUF_IS_CONTIGUOUS(writebuf))
|
||||||
uint8_t data[MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(pData)];
|
|
||||||
|
|
||||||
if (!GWBUF_IS_CONTIGUOUS(writebuf))
|
|
||||||
{
|
{
|
||||||
gwbuf_copy_data(writebuf, 0, sizeof(data), data);
|
rv = is_transaction_rollback(GWBUF_DATA(writebuf));
|
||||||
pData = data;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
uint16_t code = MYSQL_GET_ERRCODE(pData);
|
|
||||||
|
|
||||||
if (code == CLUSTRIX_ERROR_CODE)
|
|
||||||
{
|
{
|
||||||
// May be a recoverable error.
|
uint8_t* pData = GWBUF_DATA(writebuf);
|
||||||
uint8_t* pMessage;
|
int len = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(pData);
|
||||||
uint16_t nMessage;
|
uint8_t data[len];
|
||||||
extract_error_message(pData, &pMessage, &nMessage);
|
|
||||||
|
|
||||||
if (std::binary_search(clustrix_errors, clustrix_errors + nClustrix_errors,
|
gwbuf_copy_data(writebuf, 0, len, data);
|
||||||
ClustrixError { pMessage, nMessage }))
|
|
||||||
{
|
|
||||||
if (mxb_log_is_priority_enabled(LOG_INFO))
|
|
||||||
{
|
|
||||||
char message[nMessage + 1];
|
|
||||||
memcpy(message, pMessage, nMessage);
|
|
||||||
message[nMessage] = 0;
|
|
||||||
|
|
||||||
MXS_INFO("A recoverable Clustrix error: %s", message);
|
rv = is_transaction_rollback(data);
|
||||||
}
|
|
||||||
rv = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -689,7 +665,7 @@ void RWSplitSession::clientReply(GWBUF* writebuf, DCB* backend_dcb)
|
|||||||
|
|
||||||
backend->process_reply(writebuf);
|
backend->process_reply(writebuf);
|
||||||
|
|
||||||
if (m_config.transaction_replay && is_manageable_clustrix_error(writebuf))
|
if (m_config.transaction_replay && is_transaction_rollback(writebuf))
|
||||||
{
|
{
|
||||||
// writebuf was an error that can be handled by replaying the transaction.
|
// writebuf was an error that can be handled by replaying the transaction.
|
||||||
m_expected_responses--;
|
m_expected_responses--;
|
||||||
|
Reference in New Issue
Block a user