From 01bf5cc8b012e2505be1903728d009b1dfa19ce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 19 Apr 2018 15:36:14 +0300 Subject: [PATCH] MXS-1507: Add initial implementation of transaction replay Added the initial implementation of transaction replay. Transactions are only replayed if the master fails when no statement is being executed. The validity of the replayed transaction is checked by verifying that the checksums of the returned results are equal. Added a close function to the Trx class to make resetting its state easier. Also changed the return type of pop_stmt to GWBUF* as the places where it is used expect a raw GWBUF pointer. --- .../readwritesplit/rwsplit_route_stmt.cc | 4 +- .../routing/readwritesplit/rwsplitsession.cc | 103 ++++++++++++++---- .../routing/readwritesplit/rwsplitsession.hh | 6 +- server/modules/routing/readwritesplit/trx.hh | 21 +++- 4 files changed, 102 insertions(+), 32 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 2102a6de7..e784bf9e9 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -254,7 +254,7 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) } } } - else if (can_retry_query()) + else if (can_retry_query() || m_is_replay_active) { retry_query(gwbuf_clone(querybuf)); succp = true; @@ -819,7 +819,7 @@ bool RWSplitSession::should_replace_master(SRWBackend& target) // We have a target server and it's not the current master target && target != m_current_master && // We are not inside a transaction (also checks for autocommit=1) - !session_trx_is_active(m_client->session) && + (!session_trx_is_active(m_client->session) || m_is_replay_active) && // We are not locked to the old master !is_locked_to_master(); } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc 
b/server/modules/routing/readwritesplit/rwsplitsession.cc index c8cb3d5e3..bfab332ab 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -16,6 +16,7 @@ #include #include +#include using namespace maxscale; @@ -38,7 +39,8 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, m_waiting_for_gtid(false), m_next_seq(0), m_qc(this, session, instance->config().use_sql_variables_in), - m_retry_duration(0) + m_retry_duration(0), + m_is_replay_active(false) { if (m_config.rw_max_slave_conn_percent) { @@ -371,6 +373,36 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac return writebuf; } +void RWSplitSession::handle_trx_replay() +{ + if (m_replayed_trx.empty()) + { + // No more statements to execute + m_is_replay_active = false; + + // Check that the checksums match. + SHA1Checksum chksum = m_trx.checksum(); + chksum.finalize(); + + if (chksum == m_replayed_trx.checksum()) + { + MXS_INFO("Checksums match, replay successful."); + } + else + { + MXS_INFO("Checksum mismatch, transaction replay failed. 
Closing connection."); + poll_fake_hangup_event(m_client); + } + } + else + { + // More statements to replay, pop the oldest one and execute it + GWBUF* buf = m_replayed_trx.pop_stmt(); + MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str()); + retry_query(buf, 0); + } +} + void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) { DCB *client_dcb = backend_dcb->session->client_dcb; @@ -401,11 +433,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) m_current_query.reset(); - if (session_trx_is_ending(m_client->session)) - { - close_transaction(); - } - else if (session_trx_is_active(m_client->session)) + if (session_trx_is_active(m_client->session)) { m_trx.add_result(writebuf); } @@ -425,6 +453,19 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) m_expected_responses, backend->name()); } + if (m_is_replay_active) + { + handle_trx_replay(); + + // Ignore the response, the client doesn't need it + gwbuf_free(writebuf); + return; + } + else if (session_trx_is_ending(m_client->session)) + { + m_trx.close(); + } + if (backend->has_session_commands()) { /** Reply to an executed session command */ @@ -474,6 +515,24 @@ void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb) } } +void RWSplitSession::start_trx_replay() +{ + MXS_INFO("Starting transaction replay"); + m_is_replay_active = true; + + /** + * Copy the transaction for replaying and finalize it. This + * allows the checksums to be compared. The current transaction + * is closed as the replaying opens a new transaction. 
+ */ + m_replayed_trx = m_trx; + m_replayed_trx.finalize(); + m_trx.close(); + + // Pop the first statement and start replaying the transaction + retry_query(m_replayed_trx.pop_stmt(), 0); +} + /** * @brief Router error handling routine * @@ -556,11 +615,23 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb, } } - if (session_trx_is_active(session)) + if (m_is_replay_active) { - // We have an open transaction, we can't continue + MXS_INFO("Failed to replay transaction. Closing connection."); can_continue = false; - close_transaction(); + } + else if (session_trx_is_active(session)) + { + if (m_current_query.get()) + { + // TODO: Re-execute interrupted queries in transactions + can_continue = false; + } + else + { + can_continue = true; + start_trx_replay(); + } } *succp = can_continue; @@ -759,17 +830,3 @@ bool RWSplitSession::supports_hint(HINT_TYPE hint_type) const return rv; } - -void RWSplitSession::close_transaction() -{ - m_trx.finalize(); - MXS_INFO("Checksum of current transaction: %s", m_trx.checksum().hex().c_str()); - int i = 1; - - while (!m_trx.empty()) - { - const int max_len = 1024; - mxs::Buffer buf = m_trx.pop_stmt(); - MXS_INFO("%d: %s", i++, mxs::extract_sql(buf.get(), max_len).c_str()); - } -} diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 8e2453643..7266edfee 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -138,6 +138,8 @@ public: uint64_t m_retry_duration; /**< Total time spent retrying queries */ mxs::Buffer m_current_query; /**< Current query being executed */ Trx m_trx; /**< Current transaction */ + bool m_is_replay_active; /**< Whether we are actively replaying a transaction */ + Trx m_replayed_trx; /**< The transaction we are replaying */ private: RWSplitSession(RWSplit* instance, MXS_SESSION* session, @@ -181,8 +183,8 @@ private: void 
handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg); bool handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg); - // Currently only for diagnostic purposes - void close_transaction(); + void handle_trx_replay(); + void start_trx_replay(); private: // QueryClassifier::Handler diff --git a/server/modules/routing/readwritesplit/trx.hh b/server/modules/routing/readwritesplit/trx.hh index d6c2912fa..29eac5f1a 100644 --- a/server/modules/routing/readwritesplit/trx.hh +++ b/server/modules/routing/readwritesplit/trx.hh @@ -19,13 +19,12 @@ #include #include -// A log of executed queries, for transaction replay -typedef std::deque TrxLog; - // A transaction class Trx { public: + // A log of executed queries, for transaction replay + typedef std::deque TrxLog; /** * Add a statement to the transaction @@ -57,9 +56,10 @@ public: * * @return The oldest statement in this transaction */ - mxs::Buffer pop_stmt() + GWBUF* pop_stmt() { - mxs::Buffer rval = m_log.front(); + ss_dassert(!m_log.empty()); + GWBUF* rval = m_log.front().release(); m_log.pop_front(); return rval; } @@ -86,6 +86,17 @@ public: return m_log.empty(); } + /** + * Close the transaction + * + * This clears out the stored statements and resets the checksum state. + */ + void close() + { + m_checksum.reset(); + m_log.clear(); + } + /** * Return the current checksum *