MXS-1507: Fix replaying of empty transactions
If the starting of a transaction was interrupted by a server failure, the query needs to be retried. This needs to be done as a transaction replay to keep the routing logic consistent and simple. When a non-autocommit transaction is interrupted, there will be no query in progress and no replaying is needed. To handle this case, the replay initialization logic needed to be altered to treat truly empty transactions as a success case.
This commit is contained in:
@ -480,6 +480,7 @@ public:
|
||||
SHA1Checksum()
|
||||
{
|
||||
SHA1_Init(&m_ctx);
|
||||
m_sum.fill(0); // CentOS 6 doesn't like aggregate initialization...
|
||||
}
|
||||
|
||||
void update(GWBUF* buffer)
|
||||
|
@ -387,39 +387,57 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac
|
||||
|
||||
void RWSplitSession::handle_trx_replay()
|
||||
{
|
||||
if (m_replayed_trx.empty())
|
||||
{
|
||||
// No more statements to execute
|
||||
m_is_replay_active = false;
|
||||
atomic_add_uint64(&m_router->stats().n_trx_replay, 1);
|
||||
|
||||
// Check that the checksums match.
|
||||
SHA1Checksum chksum = m_trx.checksum();
|
||||
chksum.finalize();
|
||||
|
||||
if (chksum == m_replayed_trx.checksum())
|
||||
{
|
||||
MXS_INFO("Checksums match, replay successful.");
|
||||
|
||||
if (m_interrupted_query.get())
|
||||
{
|
||||
MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str());
|
||||
retry_query(m_interrupted_query.release(), 0);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection.");
|
||||
poll_fake_hangup_event(m_client);
|
||||
}
|
||||
}
|
||||
else
|
||||
if (m_replayed_trx.have_stmts())
|
||||
{
|
||||
// More statements to replay, pop the oldest one and execute it
|
||||
GWBUF* buf = m_replayed_trx.pop_stmt();
|
||||
MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str());
|
||||
retry_query(buf, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
// No more statements to execute
|
||||
m_is_replay_active = false;
|
||||
atomic_add_uint64(&m_router->stats().n_trx_replay, 1);
|
||||
|
||||
if (!m_replayed_trx.empty())
|
||||
{
|
||||
// Check that the checksums match.
|
||||
SHA1Checksum chksum = m_trx.checksum();
|
||||
chksum.finalize();
|
||||
|
||||
if (chksum == m_replayed_trx.checksum())
|
||||
{
|
||||
MXS_INFO("Checksums match, replay successful.");
|
||||
|
||||
if (m_interrupted_query.get())
|
||||
{
|
||||
MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str());
|
||||
retry_query(m_interrupted_query.release(), 0);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection.");
|
||||
modutil_send_mysql_err_packet(m_client, 0, 0, 1927, "08S01",
|
||||
"Transaction checksum mismatch encountered "
|
||||
"when replaying transaction.");
|
||||
poll_fake_hangup_event(m_client);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* The transaction was "empty". This means that the start of the transaction
|
||||
* did not finish before we started the replay process.
|
||||
*
|
||||
* The transaction that is being currently replayed has a result,
|
||||
* whereas the original interrupted transaction had none. Due to this,
|
||||
* the checksums would not match if they were to be compared.
|
||||
*/
|
||||
ss_info_dassert(!m_interrupted_query.get(), "Interrupted query should be empty");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
@ -501,9 +519,24 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
ss_dassert(m_config.transaction_replay);
|
||||
handle_trx_replay();
|
||||
|
||||
// Ignore the response, the client doesn't need it
|
||||
gwbuf_free(writebuf);
|
||||
return;
|
||||
/**
|
||||
* If the start of the transaction was interrupted, we need to return
|
||||
* the result to the client.
|
||||
*
|
||||
* This retrying of START TRANSACTION is done with the transaction replay
|
||||
* mechanism instead of the normal query retry mechanism because the safeguards
|
||||
* in the routing logic prevent retrying of individual queries inside transactions.
|
||||
*
|
||||
* If the transaction was not empty and some results have already been
|
||||
* sent to the client, we must discard all responses that the client already has.
|
||||
*/
|
||||
|
||||
if (!m_replayed_trx.empty())
|
||||
{
|
||||
// Client already has this response, discard it
|
||||
gwbuf_free(writebuf);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (m_config.transaction_replay && session_trx_is_ending(m_client->session))
|
||||
{
|
||||
@ -566,23 +599,47 @@ bool RWSplitSession::start_trx_replay()
|
||||
|
||||
if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx)
|
||||
{
|
||||
// Stash any interrupted queries while we replay the transaction
|
||||
m_interrupted_query.reset(m_current_query.release());
|
||||
if (m_trx.have_stmts() || m_current_query.get())
|
||||
{
|
||||
// Stash any interrupted queries while we replay the transaction
|
||||
m_interrupted_query.reset(m_current_query.release());
|
||||
|
||||
MXS_INFO("Starting transaction replay");
|
||||
m_is_replay_active = true;
|
||||
MXS_INFO("Starting transaction replay");
|
||||
m_is_replay_active = true;
|
||||
|
||||
/**
|
||||
* Copy the transaction for replaying and finalize it. This
|
||||
* allows the checksums to be compared. The current transaction
|
||||
* is closed as the replaying opens a new transaction.
|
||||
*/
|
||||
m_replayed_trx = m_trx;
|
||||
m_replayed_trx.finalize();
|
||||
m_trx.close();
|
||||
/**
|
||||
* Copy the transaction for replaying and finalize it. This
|
||||
* allows the checksums to be compared. The current transaction
|
||||
* is closed as the replaying opens a new transaction.
|
||||
*/
|
||||
m_replayed_trx = m_trx;
|
||||
m_replayed_trx.finalize();
|
||||
m_trx.close();
|
||||
|
||||
if (m_replayed_trx.have_stmts())
|
||||
{
|
||||
// Pop the first statement and start replaying the transaction
|
||||
retry_query(m_replayed_trx.pop_stmt(), 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* The transaction was only opened and no queries have been
|
||||
* executed. The buffer should contain a query that starts
|
||||
* a transaction.
|
||||
*/
|
||||
ss_info_dassert(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX,
|
||||
"The current query should start a transaction");
|
||||
retry_query(m_interrupted_query.release(), 0);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_info_dassert(!session_is_autocommit(m_client->session),
|
||||
"Session should have autocommit disabled if the transaction "
|
||||
"had no statements and no query was interrupted");
|
||||
}
|
||||
|
||||
// Pop the first statement and start replaying the transaction
|
||||
retry_query(m_replayed_trx.pop_stmt(), 0);
|
||||
rval = true;
|
||||
}
|
||||
|
||||
|
@ -83,13 +83,27 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if transaction is empty
|
||||
* Check if transaction has statements
|
||||
*
|
||||
* @return True if transaction has no statements
|
||||
* @return True if transaction has statements
|
||||
*
|
||||
* @note This function should only be used when checking whether a transaction
|
||||
* that is being replayed has ended. The empty() method can be used
|
||||
* to check whether statements were added to the transaction.
|
||||
*/
|
||||
bool have_stmts() const
|
||||
{
|
||||
return !m_log.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the transaction is empty
|
||||
*
|
||||
* @return True if no statements have been added to the transaction
|
||||
*/
|
||||
bool empty() const
|
||||
{
|
||||
return m_log.empty();
|
||||
return m_size == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
Reference in New Issue
Block a user