diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index c0aadc99d..3dafdb0b9 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -55,6 +55,7 @@ typedef enum GWBUF_TYPE_COLLECT_RESULT = (1 << 2), GWBUF_TYPE_RESULT = (1 << 3), GWBUF_TYPE_REPLY_OK = (1 << 4), + GWBUF_TYPE_REPLAYED = (1 << 5), } gwbuf_type_t; #define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0) @@ -63,6 +64,9 @@ typedef enum #define GWBUF_SHOULD_COLLECT_RESULT(b) ((b)->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT) #define GWBUF_IS_REPLY_OK(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLY_OK) +// True if the query is not initiated by the client but an internal replaying mechanism +#define GWBUF_IS_REPLAYED(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLAYED) + typedef enum { GWBUF_INFO_NONE = 0x0, diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 2d09b153c..b3d21c427 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -94,6 +94,16 @@ void RWSplitSession::retry_query(GWBUF* querybuf, int delay) mxb_assert(querybuf); // Try to route the query again later MXS_SESSION* session = m_client->session; + + /** + * Used to distinct retried queries from new ones while we're doing transaction replay. + * Not the cleanest way to do things but this will have to do for 2.3. + * + * TODO: Figure out a way to "cork" the client DCB as that would remove the need for this and be + * architecturally more clear. + */ + gwbuf_set_type(querybuf, GWBUF_TYPE_REPLAYED); + session_delay_routing(session, router_as_downstream(session), querybuf, delay); ++m_retry_duration; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index d5de1a40d..76615472e 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -131,6 +131,15 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) { int rval = 0; + if (m_is_replay_active && !GWBUF_IS_REPLAYED(querybuf)) + { + MXS_INFO("New query received while transaction replay is active: %s", + mxs::extract_sql(querybuf).c_str()); + mxb_assert(!m_interrupted_query.get()); + m_interrupted_query.reset(querybuf); + return 1; + } + if (m_query_queue == NULL && (m_expected_responses == 0 || m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE @@ -775,7 +784,9 @@ bool RWSplitSession::start_trx_replay() if (m_replayed_trx.have_stmts()) { // Pop the first statement and start replaying the transaction - retry_query(m_replayed_trx.pop_stmt(), 0); + GWBUF* buf = m_replayed_trx.pop_stmt(); + MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str()); + retry_query(buf, 0); } else { @@ -786,6 +797,8 @@ bool RWSplitSession::start_trx_replay() */ mxb_assert_message(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX, "The current query should start a transaction"); + MXS_INFO("Retrying interrupted query: %s", + mxs::extract_sql(m_interrupted_query.get()).c_str()); retry_query(m_interrupted_query.release(), 0); } }