MXS-1507: Add initial implementation of transaction replay

Added the initial implementation of transaction replay. Transactions are
only replayed if the master fails when no statement is being executed.

The validity of the replayed transaction is done by verifying that the
checksums of the returned results are equal.

Added a close function into the Trx class to make resetting its state
easier. Also changed the return type of the pop_stmt to GWBUF* as the
places where it is used expect a raw GWBUF pointer.
This commit is contained in:
Markus Mäkelä
2018-04-19 15:36:14 +03:00
parent daecb6980b
commit 01bf5cc8b0
4 changed files with 102 additions and 32 deletions

View File

@ -16,6 +16,7 @@
#include <cmath>
#include <maxscale/modutil.hh>
#include <maxscale/poll.h>
using namespace maxscale;
@ -38,7 +39,8 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
m_waiting_for_gtid(false),
m_next_seq(0),
m_qc(this, session, instance->config().use_sql_variables_in),
m_retry_duration(0)
m_retry_duration(0),
m_is_replay_active(false)
{
if (m_config.rw_max_slave_conn_percent)
{
@ -371,6 +373,36 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac
return writebuf;
}
void RWSplitSession::handle_trx_replay()
{
if (m_replayed_trx.empty())
{
// No more statements to execute
m_is_replay_active = false;
// Check that the checksums match.
SHA1Checksum chksum = m_trx.checksum();
chksum.finalize();
if (chksum == m_replayed_trx.checksum())
{
MXS_INFO("Checksums match, replay successful.");
}
else
{
MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection.");
poll_fake_hangup_event(m_client);
}
}
else
{
// More statements to replay, pop the oldest one and execute it
GWBUF* buf = m_replayed_trx.pop_stmt();
MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str());
retry_query(buf, 0);
}
}
void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
{
DCB *client_dcb = backend_dcb->session->client_dcb;
@ -401,11 +433,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
m_current_query.reset();
if (session_trx_is_ending(m_client->session))
{
close_transaction();
}
else if (session_trx_is_active(m_client->session))
if (session_trx_is_active(m_client->session))
{
m_trx.add_result(writebuf);
}
@ -425,6 +453,19 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
m_expected_responses, backend->name());
}
if (m_is_replay_active)
{
handle_trx_replay();
// Ignore the response, the client doesn't need it
gwbuf_free(writebuf);
return;
}
else if (session_trx_is_ending(m_client->session))
{
m_trx.close();
}
if (backend->has_session_commands())
{
/** Reply to an executed session command */
@ -474,6 +515,24 @@ void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb)
}
}
void RWSplitSession::start_trx_replay()
{
MXS_INFO("Starting transaction replay");
m_is_replay_active = true;
/**
* Copy the transaction for replaying and finalize it. This
* allows the checksums to be compared. The current transaction
* is closed as the replaying opens a new transaction.
*/
m_replayed_trx = m_trx;
m_replayed_trx.finalize();
m_trx.close();
// Pop the first statement and start replaying the transaction
retry_query(m_replayed_trx.pop_stmt(), 0);
}
/**
* @brief Router error handling routine
*
@ -556,11 +615,23 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
}
}
if (session_trx_is_active(session))
if (m_is_replay_active)
{
// We have an open transaction, we can't continue
MXS_INFO("Failed to replay transaction. Closing connection.");
can_continue = false;
close_transaction();
}
else if (session_trx_is_active(session))
{
if (m_current_query.get())
{
// TODO: Re-execute interrupted queries in transactions
can_continue = false;
}
else
{
can_continue = true;
start_trx_replay();
}
}
*succp = can_continue;
@ -759,17 +830,3 @@ bool RWSplitSession::supports_hint(HINT_TYPE hint_type) const
return rv;
}
void RWSplitSession::close_transaction()
{
m_trx.finalize();
MXS_INFO("Checksum of current transaction: %s", m_trx.checksum().hex().c_str());
int i = 1;
while (!m_trx.empty())
{
const int max_len = 1024;
mxs::Buffer buf = m_trx.pop_stmt();
MXS_INFO("%d: %s", i++, mxs::extract_sql(buf.get(), max_len).c_str());
}
}