From 445eece95bd06b6f29fa746a85890568caeec7f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 3 Jun 2018 19:16:03 +0300 Subject: [PATCH] MXS-1507: Fix replaying of empty transactions If the starting of a transaction was interrupted by a server failure, the query needs to be retried. This needs to be done as a transaction replay to keep the routing logic consistent and simple. When a non-autocommit transaction is interrupted, there will be no query in progress and no replaying is needed. To handle this case, the replay initialization logic needed to be altered to treat truly empty transactions as a success case. --- include/maxscale/utils.hh | 1 + .../routing/readwritesplit/rwsplitsession.cc | 145 ++++++++++++------ server/modules/routing/readwritesplit/trx.hh | 20 ++- 3 files changed, 119 insertions(+), 47 deletions(-) diff --git a/include/maxscale/utils.hh b/include/maxscale/utils.hh index f2befe54b..93d5e7cdd 100644 --- a/include/maxscale/utils.hh +++ b/include/maxscale/utils.hh @@ -480,6 +480,7 @@ public: SHA1Checksum() { SHA1_Init(&m_ctx); + m_sum.fill(0); // CentOS 6 doesn't like aggregate initialization... } void update(GWBUF* buffer) diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 5ba1ee918..572a2c5ba 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -387,39 +387,57 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac void RWSplitSession::handle_trx_replay() { - if (m_replayed_trx.empty()) - { - // No more statements to execute - m_is_replay_active = false; - atomic_add_uint64(&m_router->stats().n_trx_replay, 1); - - // Check that the checksums match. - SHA1Checksum chksum = m_trx.checksum(); - chksum.finalize(); - - if (chksum == m_replayed_trx.checksum()) - { - MXS_INFO("Checksums match, replay successful."); - - if (m_interrupted_query.get()) - { - MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str()); - retry_query(m_interrupted_query.release(), 0); - } - } - else - { - MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection."); - poll_fake_hangup_event(m_client); - } - } - else + if (m_replayed_trx.have_stmts()) { // More statements to replay, pop the oldest one and execute it GWBUF* buf = m_replayed_trx.pop_stmt(); MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str()); retry_query(buf, 0); } + else + { + // No more statements to execute + m_is_replay_active = false; + atomic_add_uint64(&m_router->stats().n_trx_replay, 1); + + if (!m_replayed_trx.empty()) + { + // Check that the checksums match. + SHA1Checksum chksum = m_trx.checksum(); + chksum.finalize(); + + if (chksum == m_replayed_trx.checksum()) + { + MXS_INFO("Checksums match, replay successful."); + + if (m_interrupted_query.get()) + { + MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str()); + retry_query(m_interrupted_query.release(), 0); + } + } + else + { + MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection."); + modutil_send_mysql_err_packet(m_client, 0, 0, 1927, "08S01", + "Transaction checksum mismatch encountered " + "when replaying transaction."); + poll_fake_hangup_event(m_client); + } + } + else + { + /** + * The transaction was "empty". This means that the start of the transaction + * did not finish before we started the replay process. + * + * The transaction that is being currently replayed has a result, + * whereas the original interrupted transaction had none. Due to this, + * the checksums would not match if they were to be compared. + */ + ss_info_dassert(!m_interrupted_query.get(), "Interrupted query should be empty"); + } + } } void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) @@ -501,9 +519,24 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) ss_dassert(m_config.transaction_replay); handle_trx_replay(); - // Ignore the response, the client doesn't need it - gwbuf_free(writebuf); - return; + /** + * If the start of the transaction was interrupted, we need to return + * the result to the client. + * + * This retrying of START TRANSACTION is done with the transaction replay + * mechanism instead of the normal query retry mechanism because the safeguards + * in the routing logic prevent retrying of individual queries inside transactions. + * + * If the transaction was not empty and some results have already been + * sent to the client, we must discard all responses that the client already has. + */ + + if (!m_replayed_trx.empty()) + { + // Client already has this response, discard it + gwbuf_free(writebuf); + return; + } } else if (m_config.transaction_replay && session_trx_is_ending(m_client->session)) { @@ -566,23 +599,47 @@ bool RWSplitSession::start_trx_replay() if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx) { - // Stash any interrupted queries while we replay the transaction - m_interrupted_query.reset(m_current_query.release()); + if (m_trx.have_stmts() || m_current_query.get()) + { + // Stash any interrupted queries while we replay the transaction + m_interrupted_query.reset(m_current_query.release()); - MXS_INFO("Starting transaction replay"); - m_is_replay_active = true; + MXS_INFO("Starting transaction replay"); + m_is_replay_active = true; - /** - * Copy the transaction for replaying and finalize it. This - * allows the checksums to be compared. The current transaction - * is closed as the replaying opens a new transaction. - */ - m_replayed_trx = m_trx; - m_replayed_trx.finalize(); - m_trx.close(); + /** + * Copy the transaction for replaying and finalize it. This + * allows the checksums to be compared. The current transaction + * is closed as the replaying opens a new transaction. + */ + m_replayed_trx = m_trx; + m_replayed_trx.finalize(); + m_trx.close(); + + if (m_replayed_trx.have_stmts()) + { + // Pop the first statement and start replaying the transaction + retry_query(m_replayed_trx.pop_stmt(), 0); + } + else + { + /** + * The transaction was only opened and no queries have been + * executed. The buffer should contain a query that starts + * a transaction. + */ + ss_info_dassert(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX, + "The current query should start a transaction"); + retry_query(m_interrupted_query.release(), 0); + } + } + else + { + ss_info_dassert(!session_is_autocommit(m_client->session), + "Session should have autocommit disabled if the transaction " + "had no statements and no query was interrupted"); + } - // Pop the first statement and start replaying the transaction - retry_query(m_replayed_trx.pop_stmt(), 0); rval = true; } diff --git a/server/modules/routing/readwritesplit/trx.hh b/server/modules/routing/readwritesplit/trx.hh index ace350a0b..11c677c77 100644 --- a/server/modules/routing/readwritesplit/trx.hh +++ b/server/modules/routing/readwritesplit/trx.hh @@ -83,13 +83,27 @@ public: } /** - * Check if transaction is empty + * Check if transaction has statements * - * @return True if transaction has no statements + * @return True if transaction has statements + * + * @note This function should only be used when checking whether a transaction + * that is being replayed has ended. The empty() method can be used + * to check whether statements were added to the transaction. + */ + bool have_stmts() const + { + return !m_log.empty(); + } + + /** + * Check whether the transaction is empty + * + * @return True if no statements have been added to the transaction */ bool empty() const { - return m_log.empty(); + return m_size == 0; } /**