MXS-1507: Retry interrupted queries in transactions

Because the current query was added to the transaction log before it
finished, m_current_query contained a duplicate of the latest transaction
log entry. To correctly log only successfully completed statements, a
statement should be added to the log only after it has successfully
completed. This change also removes the unnecessary cloning that took
place when the statement was added to the log before it finished.

With the transaction logging fixed, the value of m_current_query can be
stashed while the transaction is being replayed and retried afterwards. If
the replay completes successfully and the checksums match, the interrupted
query is retried.

Also added a clarifying comment to can_retry_query to explain why a query
inside a transaction cannot be retried.
This commit is contained in:
Markus Mäkelä 2018-04-19 16:10:10 +03:00
parent 01bf5cc8b0
commit c1c942a058
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
3 changed files with 29 additions and 27 deletions

View File

@ -235,22 +235,14 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
// Target server was found and is in the correct state
succp = handle_got_target(querybuf, target, store_stmt);
if (succp)
if (succp && command == MXS_COM_STMT_EXECUTE && not_locked_to_master)
{
if (command == MXS_COM_STMT_EXECUTE && not_locked_to_master)
{
/** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands
* to the same server where the COM_STMT_EXECUTE was done. */
ss_dassert(stmt_id > 0);
m_exec_map[stmt_id] = target;
MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri());
}
if (session_trx_is_active(m_client->session))
{
m_trx.add_stmt(gwbuf_clone(querybuf));
}
/** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands
* to the same server where the COM_STMT_EXECUTE was done. */
ss_dassert(stmt_id > 0);
m_exec_map[stmt_id] = target;
MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri());
}
}
}

View File

@ -387,6 +387,12 @@ void RWSplitSession::handle_trx_replay()
if (chksum == m_replayed_trx.checksum())
{
MXS_INFO("Checksums match, replay successful.");
if (m_interrupted_query.get())
{
MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str());
retry_query(m_interrupted_query.release(), 0);
}
}
else
{
@ -431,12 +437,15 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
return;
}
m_current_query.reset();
if (session_trx_is_active(m_client->session))
{
m_trx.add_stmt(m_current_query.release());
m_trx.add_result(writebuf);
}
else
{
m_current_query.reset();
}
if (backend->reply_is_complete(writebuf))
{
@ -622,16 +631,10 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
}
else if (session_trx_is_active(session))
{
if (m_current_query.get())
{
// TODO: Re-execute interrupted queries in transactions
can_continue = false;
}
else
{
can_continue = true;
start_trx_replay();
}
// Stash any interrupted queries while we replay the transaction
m_interrupted_query.reset(m_current_query.release());
can_continue = true;
start_trx_replay();
}
*succp = can_continue;

View File

@ -140,6 +140,7 @@ public:
Trx m_trx; /**< Current transaction */
bool m_is_replay_active; /**< Whether we are actively replaying a transaction */
Trx m_replayed_trx; /**< The transaction we are replaying */
mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */
private:
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
@ -194,6 +195,12 @@ private:
inline bool can_retry_query() const
{
/** Individual queries can only be retried if we are not inside
* a transaction. If a query in a transaction needs to be retried,
* the whole transaction must be replayed before the retrying is done.
*
* @see handle_trx_replay
*/
return m_config.delayed_retry &&
m_retry_duration < m_config.delayed_retry_timeout &&
!session_trx_is_active(m_client->session);