diff --git a/include/maxscale/utils.hh b/include/maxscale/utils.hh index f2befe54b..93d5e7cdd 100644 --- a/include/maxscale/utils.hh +++ b/include/maxscale/utils.hh @@ -480,6 +480,7 @@ public: SHA1Checksum() { SHA1_Init(&m_ctx); + m_sum.fill(0); // CentOS 6 doesn't like aggregate initialization... } void update(GWBUF* buffer) diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 5ba1ee918..572a2c5ba 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -387,39 +387,57 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac void RWSplitSession::handle_trx_replay() { - if (m_replayed_trx.empty()) - { - // No more statements to execute - m_is_replay_active = false; - atomic_add_uint64(&m_router->stats().n_trx_replay, 1); - - // Check that the checksums match. - SHA1Checksum chksum = m_trx.checksum(); - chksum.finalize(); - - if (chksum == m_replayed_trx.checksum()) - { - MXS_INFO("Checksums match, replay successful."); - - if (m_interrupted_query.get()) - { - MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str()); - retry_query(m_interrupted_query.release(), 0); - } - } - else - { - MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection."); - poll_fake_hangup_event(m_client); - } - } - else + if (m_replayed_trx.have_stmts()) { // More statements to replay, pop the oldest one and execute it GWBUF* buf = m_replayed_trx.pop_stmt(); MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str()); retry_query(buf, 0); } + else + { + // No more statements to execute + m_is_replay_active = false; + atomic_add_uint64(&m_router->stats().n_trx_replay, 1); + + if (!m_replayed_trx.empty()) + { + // Check that the checksums match. + SHA1Checksum chksum = m_trx.checksum(); + chksum.finalize(); + + if (chksum == m_replayed_trx.checksum()) + { + MXS_INFO("Checksums match, replay successful."); + + if (m_interrupted_query.get()) + { + MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str()); + retry_query(m_interrupted_query.release(), 0); + } + } + else + { + MXS_INFO("Checksum mismatch, transaction replay failed. Closing connection."); + modutil_send_mysql_err_packet(m_client, 0, 0, 1927, "08S01", + "Transaction checksum mismatch encountered " + "when replaying transaction."); + poll_fake_hangup_event(m_client); + } + } + else + { + /** + * The transaction was "empty". This means that the start of the transaction + * did not finish before we started the replay process. + * + * The transaction that is being currently replayed has a result, + * whereas the original interrupted transaction had none. Due to this, + * the checksums would not match if they were to be compared. + */ + ss_info_dassert(!m_interrupted_query.get(), "Interrupted query should be empty"); + } + } } void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) @@ -501,9 +519,24 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) ss_dassert(m_config.transaction_replay); handle_trx_replay(); - // Ignore the response, the client doesn't need it - gwbuf_free(writebuf); - return; + /** + * If the start of the transaction was interrupted, we need to return + * the result to the client. + * + * This retrying of START TRANSACTION is done with the transaction replay + * mechanism instead of the normal query retry mechanism because the safeguards + * in the routing logic prevent retrying of individual queries inside transactions. + * + * If the transaction was not empty and some results have already been + * sent to the client, we must discard all responses that the client already has. + */ + + if (!m_replayed_trx.empty()) + { + // Client already has this response, discard it + gwbuf_free(writebuf); + return; + } } else if (m_config.transaction_replay && session_trx_is_ending(m_client->session)) { @@ -566,23 +599,47 @@ bool RWSplitSession::start_trx_replay() if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx) { - // Stash any interrupted queries while we replay the transaction - m_interrupted_query.reset(m_current_query.release()); + if (m_trx.have_stmts() || m_current_query.get()) + { + // Stash any interrupted queries while we replay the transaction + m_interrupted_query.reset(m_current_query.release()); - MXS_INFO("Starting transaction replay"); - m_is_replay_active = true; + MXS_INFO("Starting transaction replay"); + m_is_replay_active = true; - /** - * Copy the transaction for replaying and finalize it. This - * allows the checksums to be compared. The current transaction - * is closed as the replaying opens a new transaction. - */ - m_replayed_trx = m_trx; - m_replayed_trx.finalize(); - m_trx.close(); + /** + * Copy the transaction for replaying and finalize it. This + * allows the checksums to be compared. The current transaction + * is closed as the replaying opens a new transaction. + */ + m_replayed_trx = m_trx; + m_replayed_trx.finalize(); + m_trx.close(); + + if (m_replayed_trx.have_stmts()) + { + // Pop the first statement and start replaying the transaction + retry_query(m_replayed_trx.pop_stmt(), 0); + } + else + { + /** + * The transaction was only opened and no queries have been + * executed. The buffer should contain a query that starts + * a transaction. + */ + ss_info_dassert(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX, + "The current query should start a transaction"); + retry_query(m_interrupted_query.release(), 0); + } + } + else + { + ss_info_dassert(!session_is_autocommit(m_client->session), + "Session should have autocommit disabled if the transaction " + "had no statements and no query was interrupted"); + } - // Pop the first statement and start replaying the transaction - retry_query(m_replayed_trx.pop_stmt(), 0); rval = true; } diff --git a/server/modules/routing/readwritesplit/trx.hh b/server/modules/routing/readwritesplit/trx.hh index ace350a0b..11c677c77 100644 --- a/server/modules/routing/readwritesplit/trx.hh +++ b/server/modules/routing/readwritesplit/trx.hh @@ -83,13 +83,27 @@ public: } /** - * Check if transaction is empty + * Check if transaction has statements * - * @return True if transaction has no statements + * @return True if transaction has statements + * + * @note This function should only be used when checking whether a transaction + * that is being replayed has ended. The empty() method can be used + * to check whether statements were added to the transaction. + */ + bool have_stmts() const + { + return !m_log.empty(); + } + + /** + * Check whether the transaction is empty + * + * @return True if no statements have been added to the transaction */ bool empty() const { - return m_log.empty(); + return m_size == 0; } /**