From c1c942a058fa673ba808c4c6717264127ba72df3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 19 Apr 2018 16:10:10 +0300 Subject: [PATCH] MXS-1507: Retry interrupted queries in transactions As the current query was added to the transaction log before it finished, the m_current_query contained a duplicate of the latest transaction log entry. To correctly log only successful transactions, the statement should be added only after it has successfully completed. This change also removed the unnecessary cloning that took place when the statement was added to the log before it finished. With the fixed transaction logging, the value of m_current_query can be stashed for later retrying while the replay process is happening. If the replay completes successfully and the checksums match, the interrupted query is retried. Also added a clarifying comment to can_retry_query to explain why a query inside a transaction cannot be retried. --- .../readwritesplit/rwsplit_route_stmt.cc | 22 +++++---------- .../routing/readwritesplit/rwsplitsession.cc | 27 ++++++++++--------- .../routing/readwritesplit/rwsplitsession.hh | 7 +++++ 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index e784bf9e9..93315eee2 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -235,22 +235,14 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) // Target server was found and is in the correct state succp = handle_got_target(querybuf, target, store_stmt); - if (succp) + if (succp && command == MXS_COM_STMT_EXECUTE && not_locked_to_master) { - if (command == MXS_COM_STMT_EXECUTE && not_locked_to_master) - { - /** Track the targets of the COM_STMT_EXECUTE statements. This - * information is used to route all COM_STMT_FETCH commands - * to the same server where the COM_STMT_EXECUTE was done. */ - ss_dassert(stmt_id > 0); - m_exec_map[stmt_id] = target; - MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri()); - } - - if (session_trx_is_active(m_client->session)) - { - m_trx.add_stmt(gwbuf_clone(querybuf)); - } + /** Track the targets of the COM_STMT_EXECUTE statements. This + * information is used to route all COM_STMT_FETCH commands + * to the same server where the COM_STMT_EXECUTE was done. */ + ss_dassert(stmt_id > 0); + m_exec_map[stmt_id] = target; + MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri()); } } } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index bfab332ab..31bd1a108 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -387,6 +387,12 @@ void RWSplitSession::handle_trx_replay() if (chksum == m_replayed_trx.checksum()) { MXS_INFO("Checksums match, replay successful."); + + if (m_interrupted_query.get()) + { + MXS_INFO("Resuming execution: %s", mxs::extract_sql(m_interrupted_query.get()).c_str()); + retry_query(m_interrupted_query.release(), 0); + } } else { @@ -431,12 +437,15 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) return; } - m_current_query.reset(); - if (session_trx_is_active(m_client->session)) { + m_trx.add_stmt(m_current_query.release()); m_trx.add_result(writebuf); } + else + { + m_current_query.reset(); + } if (backend->reply_is_complete(writebuf)) { @@ -622,16 +631,10 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb, } else if (session_trx_is_active(session)) { - if (m_current_query.get()) - { - // TODO: Re-execute interrupted queries in transactions - can_continue = false; - } - else - { - can_continue = true; - start_trx_replay(); - } + // Stash any interrupted queries while we replay the transaction + m_interrupted_query.reset(m_current_query.release()); + can_continue = true; + start_trx_replay(); } *succp = can_continue; diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 7266edfee..d07f03ae1 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -140,6 +140,7 @@ public: Trx m_trx; /**< Current transaction */ bool m_is_replay_active; /**< Whether we are actively replaying a transaction */ Trx m_replayed_trx; /**< The transaction we are replaying */ + mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */ private: RWSplitSession(RWSplit* instance, MXS_SESSION* session, @@ -194,6 +195,12 @@ private: inline bool can_retry_query() const { + /** Individual queries can only be retried if we are not inside + * a transaction. If a query in a transaction needs to be retried, + * the whole transaction must be replayed before the retrying is done. + * + * @see handle_trx_replay + */ return m_config.delayed_retry && m_retry_duration < m_config.delayed_retry_timeout && !session_trx_is_active(m_client->session);