From 0614ff4c9dde363170ef92e42fbee09cd416826d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 7 Jul 2018 01:42:11 +0300 Subject: [PATCH] Fix handling of transactions with large results If transaction replaying was enabled and a result was returned in more than one call to clientReply, a NULL value would be added to the statement which in turn would trigger a debug assertion. Similarly any following statements in the transaction would be executed regardless of whether the result was complete. Renamed the statement execution function to better describe what it does. Extended the basic functional test case to cover this. --- maxscale-system-test/mxs1507_trx_replay.cpp | 13 ++++++++++++ .../routing/readwritesplit/rwsplitsession.cc | 21 ++++++++++++++++--- .../routing/readwritesplit/rwsplitsession.hh | 2 +- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/maxscale-system-test/mxs1507_trx_replay.cpp b/maxscale-system-test/mxs1507_trx_replay.cpp index 8aefded32..e15dc8ea8 100644 --- a/maxscale-system-test/mxs1507_trx_replay.cpp +++ b/maxscale-system-test/mxs1507_trx_replay.cpp @@ -60,6 +60,19 @@ int main(int argc, char** argv) { } }, + { + "Large result", + { + bind(ok, "BEGIN"), + bind(ok, "SELECT REPEAT('a', 100000)"), + }, + { + bind(ok, "SELECT REPEAT('a', 100000)"), + bind(ok, "COMMIT"), + }, + { + } + }, { "Transaction with a write", { diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 4416ce657..8d7c6e3bb 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -376,7 +376,7 @@ GWBUF* RWSplitSession::handle_causal_read_reply(GWBUF *writebuf, SRWBackend& bac return writebuf; } -void RWSplitSession::handle_trx_replay() +void RWSplitSession::trx_replay_next_stmt() { if (m_replayed_trx.have_stmts()) { @@ -495,8 +495,17 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) { /** Transaction size is OK, store the statement for replaying and * update the checksum of the result */ - m_trx.add_stmt(m_current_query.release()); m_trx.add_result(writebuf); + + if (m_current_query.get()) + { + // TODO: Don't replay transactions interrupted mid-result. Currently + // the client will receive a `Packets out of order` error if this happens. + + // Add the statement to the transaction once the first part + // of the result is received. + m_trx.add_stmt(m_current_query.release()); + } } else { @@ -561,7 +570,12 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) else if (m_is_replay_active) { ss_dassert(m_config->transaction_replay); - handle_trx_replay(); + + if (m_expected_responses == 0) + { + // Current statement is complete, continue with the next one + trx_replay_next_stmt(); + } /** * If the start of the transaction was interrupted, we need to return @@ -598,6 +612,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) } else if (m_expected_responses == 0 && m_query_queue) { + ss_dassert(!m_is_replay_active); // Note: We might currently end up here // All replies received, route any stored queries route_stored_query(); } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index d1836f58d..f93c673e3 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -202,7 +202,7 @@ private: void handle_error_reply_client(DCB *backend_dcb, GWBUF *errmsg); bool handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg); - void handle_trx_replay(); + void trx_replay_next_stmt(); // Do we have at least one open slave connection bool have_connected_slaves() const;