diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 6cd97509a..930347ade 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -481,21 +481,35 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) if (m_config.transaction_replay && m_can_replay_trx && session_trx_is_active(m_client->session)) { - size_t size{m_trx.size() + m_current_query.length()}; - // A transaction is open and it is eligible for replaying - if (size < m_config.trx_max_size) + if (!backend->has_session_commands()) { - /** Transaction size is OK, store the statement for replaying and - * update the checksum of the result */ - m_trx.add_stmt(m_current_query.release()); - m_trx.add_result(writebuf); - } - else - { - MXS_INFO("Transaction is too big (%lu bytes), can't replay if it fails.", size); - m_current_query.reset(); - m_trx.close(); - m_can_replay_trx = false; + /** + * Session commands are tracked separately from the transaction. + * We must not put any response to a session command into + * the transaction as they are tracked separately. + * + * TODO: It might be wise to include the session commands to guarantee + * that the session state during the transaction replay remains + * consistent if the state change in the middle of the transaction + * is intentional. + */ + + size_t size{m_trx.size() + m_current_query.length()}; + // A transaction is open and it is eligible for replaying + if (size < m_config.trx_max_size) + { + /** Transaction size is OK, store the statement for replaying and + * update the checksum of the result */ + m_trx.add_stmt(m_current_query.release()); + m_trx.add_result(writebuf); + } + else + { + MXS_INFO("Transaction is too big (%lu bytes), can't replay if it fails.", size); + m_current_query.reset(); + m_trx.close(); + m_can_replay_trx = false; + } } } else @@ -531,7 +545,13 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) m_expected_responses, backend->name()); } - if (m_is_replay_active) + if (backend->has_session_commands()) + { + /** Process the reply to an executed session command. This function can + * close the backend if it's a slave. */ + process_sescmd_response(backend, &writebuf); + } + else if (m_is_replay_active) { ss_dassert(m_config.transaction_replay); handle_trx_replay(); @@ -561,13 +581,6 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) m_can_replay_trx = true; } - if (backend->has_session_commands()) - { - /** Process the reply to an executed session command. This function can - * close the backend if it's a slave. */ - process_sescmd_response(backend, &writebuf); - } - if (backend->in_use() && backend->has_session_commands()) { // Backend is still in use and has more session commands to execute diff --git a/server/modules/routing/readwritesplit/trx.hh b/server/modules/routing/readwritesplit/trx.hh index 91ec0b7a5..8803f6121 100644 --- a/server/modules/routing/readwritesplit/trx.hh +++ b/server/modules/routing/readwritesplit/trx.hh @@ -18,6 +18,7 @@ #include #include +#include // A transaction class Trx @@ -38,6 +39,13 @@ public: */ void add_stmt(GWBUF* buf) { + ss_info_dassert(buf, "Trx::add_stmt: Buffer must not be empty"); + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + MXS_INFO("Adding to trx: %s", mxs::extract_sql(buf, 512).c_str()); + } + m_size += gwbuf_length(buf); m_log.emplace_back(buf); } @@ -123,6 +131,7 @@ public: */ void close() { + MXS_INFO("Transaction is complete"); m_checksum.reset(); m_log.clear(); m_size = 0;