From c32bb18862ca2cac83a92aa67c713a9258fe0599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 12 Nov 2018 18:14:54 +0200 Subject: [PATCH] Fix transaction replay checksum mismatches The transaction replay could get mixed up with new queries if the client managed to send one while the delayed routing was taking place. A proper way to solve this would be to cork the client DCB until the transaction is fully replayed. As that change would be considerably more complex than simply labeling the queries that are being retried, the corking implementation is left for later, when a more complete solution can be designed. This commit also adds some of the missing info-level logging for transaction replay, which makes failures easier to analyze. --- include/maxscale/buffer.h | 4 ++++ .../routing/readwritesplit/rwsplit_route_stmt.cc | 10 ++++++++++ .../routing/readwritesplit/rwsplitsession.cc | 15 ++++++++++++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index c0aadc99d..3dafdb0b9 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -55,6 +55,7 @@ typedef enum GWBUF_TYPE_COLLECT_RESULT = (1 << 2), GWBUF_TYPE_RESULT = (1 << 3), GWBUF_TYPE_REPLY_OK = (1 << 4), + GWBUF_TYPE_REPLAYED = (1 << 5), } gwbuf_type_t; #define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0) @@ -63,6 +64,9 @@ typedef enum #define GWBUF_SHOULD_COLLECT_RESULT(b) ((b)->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT) #define GWBUF_IS_REPLY_OK(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLY_OK) +// True if the query is not initiated by the client but an internal replaying mechanism +#define GWBUF_IS_REPLAYED(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLAYED) + typedef enum { GWBUF_INFO_NONE = 0x0, diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 2d09b153c..b3d21c427 100644 ---
a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -94,6 +94,16 @@ void RWSplitSession::retry_query(GWBUF* querybuf, int delay) mxb_assert(querybuf); // Try to route the query again later MXS_SESSION* session = m_client->session; + + /** + * Used to distinguish retried queries from new ones while we're doing transaction replay. + * Not the cleanest way to do things, but this will have to do for 2.3. + * + * TODO: Figure out a way to "cork" the client DCB as that would remove the need for this and be + * architecturally clearer. + */ + gwbuf_set_type(querybuf, GWBUF_TYPE_REPLAYED); + session_delay_routing(session, router_as_downstream(session), querybuf, delay); ++m_retry_duration; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index d5de1a40d..76615472e 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -131,6 +131,15 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) { int rval = 0; + if (m_is_replay_active && !GWBUF_IS_REPLAYED(querybuf)) + { + MXS_INFO("New query received while transaction replay is active: %s", + mxs::extract_sql(querybuf).c_str()); + mxb_assert(!m_interrupted_query.get()); + m_interrupted_query.reset(querybuf); + return 1; + } + if (m_query_queue == NULL && (m_expected_responses == 0 || m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE @@ -775,7 +784,9 @@ bool RWSplitSession::start_trx_replay() if (m_replayed_trx.have_stmts()) { // Pop the first statement and start replaying the transaction - retry_query(m_replayed_trx.pop_stmt(), 0); + GWBUF* buf = m_replayed_trx.pop_stmt(); + MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str()); + retry_query(buf, 0); } else { @@ -786,6 +797,8 @@ bool RWSplitSession::start_trx_replay() */
mxb_assert_message(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX, "The current query should start a transaction"); + MXS_INFO("Retrying interrupted query: %s", + mxs::extract_sql(m_interrupted_query.get()).c_str()); retry_query(m_interrupted_query.release(), 0); } }