Fix transaction replay checksum mismatches

The transaction replay could get mixed up with new queries if the client
managed to perform one while the delayed routing was taking place. A
proper way to solve this would be to cork the client DCB until the
transaction is fully replayed. As this change would be relatively more
complex compared to simply labeling queries that are being retried the
corking implementation is left for later when a more complete solution can
be designed.

This commit also adds some of the missing info logging for the transaction
replaying which makes analysis of failures easier.
This commit is contained in:
Markus Mäkelä
2018-11-12 18:14:54 +02:00
parent 0355398425
commit c32bb18862
3 changed files with 28 additions and 1 deletions

View File

@ -55,6 +55,7 @@ typedef enum
GWBUF_TYPE_COLLECT_RESULT = (1 << 2), GWBUF_TYPE_COLLECT_RESULT = (1 << 2),
GWBUF_TYPE_RESULT = (1 << 3), GWBUF_TYPE_RESULT = (1 << 3),
GWBUF_TYPE_REPLY_OK = (1 << 4), GWBUF_TYPE_REPLY_OK = (1 << 4),
GWBUF_TYPE_REPLAYED = (1 << 5),
} gwbuf_type_t; } gwbuf_type_t;
#define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0) #define GWBUF_IS_TYPE_UNDEFINED(b) ((b)->gwbuf_type == 0)
@ -63,6 +64,9 @@ typedef enum
#define GWBUF_SHOULD_COLLECT_RESULT(b) ((b)->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT) #define GWBUF_SHOULD_COLLECT_RESULT(b) ((b)->gwbuf_type & GWBUF_TYPE_COLLECT_RESULT)
#define GWBUF_IS_REPLY_OK(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLY_OK) #define GWBUF_IS_REPLY_OK(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLY_OK)
// True if the query is not initiated by the client but an internal replaying mechanism
#define GWBUF_IS_REPLAYED(b) ((b)->gwbuf_type & GWBUF_TYPE_REPLAYED)
typedef enum typedef enum
{ {
GWBUF_INFO_NONE = 0x0, GWBUF_INFO_NONE = 0x0,

View File

@ -94,6 +94,16 @@ void RWSplitSession::retry_query(GWBUF* querybuf, int delay)
mxb_assert(querybuf); mxb_assert(querybuf);
// Try to route the query again later // Try to route the query again later
MXS_SESSION* session = m_client->session; MXS_SESSION* session = m_client->session;
/**
* Used to distinct retried queries from new ones while we're doing transaction replay.
* Not the cleanest way to do things but this will have to do for 2.3.
*
* TODO: Figure out a way to "cork" the client DCB as that would remove the need for this and be
* architecturally more clear.
*/
gwbuf_set_type(querybuf, GWBUF_TYPE_REPLAYED);
session_delay_routing(session, router_as_downstream(session), querybuf, delay); session_delay_routing(session, router_as_downstream(session), querybuf, delay);
++m_retry_duration; ++m_retry_duration;
} }

View File

@ -131,6 +131,15 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
{ {
int rval = 0; int rval = 0;
if (m_is_replay_active && !GWBUF_IS_REPLAYED(querybuf))
{
MXS_INFO("New query received while transaction replay is active: %s",
mxs::extract_sql(querybuf).c_str());
mxb_assert(!m_interrupted_query.get());
m_interrupted_query.reset(querybuf);
return 1;
}
if (m_query_queue == NULL if (m_query_queue == NULL
&& (m_expected_responses == 0 && (m_expected_responses == 0
|| m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE || m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE
@ -775,7 +784,9 @@ bool RWSplitSession::start_trx_replay()
if (m_replayed_trx.have_stmts()) if (m_replayed_trx.have_stmts())
{ {
// Pop the first statement and start replaying the transaction // Pop the first statement and start replaying the transaction
retry_query(m_replayed_trx.pop_stmt(), 0); GWBUF* buf = m_replayed_trx.pop_stmt();
MXS_INFO("Replaying: %s", mxs::extract_sql(buf, 1024).c_str());
retry_query(buf, 0);
} }
else else
{ {
@ -786,6 +797,8 @@ bool RWSplitSession::start_trx_replay()
*/ */
mxb_assert_message(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX, mxb_assert_message(qc_get_trx_type_mask(m_interrupted_query.get()) & QUERY_TYPE_BEGIN_TRX,
"The current query should start a transaction"); "The current query should start a transaction");
MXS_INFO("Retrying interrupted query: %s",
mxs::extract_sql(m_interrupted_query.get()).c_str());
retry_query(m_interrupted_query.release(), 0); retry_query(m_interrupted_query.release(), 0);
} }
} }