From 12398bfc26c99abb3019b9cb57108022f75d3148 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 24 Jun 2018 19:46:26 +0300 Subject: [PATCH] MXS-1549: Implement optimistic transaction execution When the `optimistic_trx` mode is enabled, all transactions are started on a slave server. If the client executes a query inside the transaction that is not of a read-only nature, the transaction is rolled back and replayed on the master. --- include/maxscale/queryclassifier.hh | 10 +++ .../readwritesplit/rwsplit_route_stmt.cc | 64 ++++++++++++++++++- .../routing/readwritesplit/rwsplitsession.cc | 39 ++++++++++- .../routing/readwritesplit/rwsplitsession.hh | 30 +++++++++ 4 files changed, 140 insertions(+), 3 deletions(-) diff --git a/include/maxscale/queryclassifier.hh b/include/maxscale/queryclassifier.hh index 1c3e654eb..aefa08e8e 100644 --- a/include/maxscale/queryclassifier.hh +++ b/include/maxscale/queryclassifier.hh @@ -218,6 +218,16 @@ public: return m_trx_is_read_only; } + /** + * Check if the stored route info marks the start of a transaction + * + * @return True if the type mask of m_route_info contains QUERY_TYPE_BEGIN_TRX + */ + bool is_trx_starting() const + { + return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX); + } + /** * @brief Store and process a prepared statement * diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index a4da57030..6f6f24c08 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -151,6 +151,44 @@ void replace_binary_ps_id(GWBUF* buffer, uint32_t id) } +bool RWSplitSession::should_try_trx_on_slave(route_target_t route_target) const +{ + return m_config.optimistic_trx && // Optimistic transactions are enabled + !is_locked_to_master() && // Not locked to master + !m_is_replay_active && // Not replaying a transaction + m_otrx_state == 
OTRX_INACTIVE && // Not yet in optimistic mode + TARGET_IS_MASTER(route_target); // The target type is master +} + +bool RWSplitSession::track_optimistic_trx(GWBUF** buffer) +{ + bool store_stmt = true; + + if (session_trx_is_ending(m_client->session)) + { + m_otrx_state = OTRX_INACTIVE; + } + else if (!m_qc.is_trx_still_read_only()) + { + // Transaction is no longer read-only, roll it back on the slave and replay it on the master + MXS_INFO("Rolling back current optimistic transaction"); + + // Note: This clone is here because routeQuery will always free the buffer + m_current_query.reset(gwbuf_clone(*buffer)); + + /** + * Store the actual statement we were attempting to execute and + * replace it with a ROLLBACK. The storing of the statement is + * done here to avoid storage of the ROLLBACK. + */ + *buffer = modutil_create_query("ROLLBACK"); + store_stmt = false; + m_otrx_state = OTRX_ROLLBACK; + } + + return store_stmt; +} + /** * Routing function. Find out query type, backend type, and target DCB(s). * Then route query to found target(s). 
@@ -161,8 +199,9 @@ void replace_binary_ps_id(GWBUF* buffer, uint32_t id) */ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) { + ss_info_dassert(m_otrx_state != OTRX_ROLLBACK, + "OTRX_ROLLBACK should never happen when routing queries"); bool succp = false; - const QueryClassifier::RouteInfo& info = m_qc.current_route_info(); uint32_t stmt_id = info.stmt_id(); uint8_t command = info.command(); @@ -192,6 +231,20 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) &m_router->stats().n_rw_trx, 1); } + if (m_qc.is_trx_starting() && // A transaction is starting + !session_trx_is_read_only(m_client->session) && // Not explicitly read-only + should_try_trx_on_slave(route_target)) // Qualifies for speculative routing + { + // Speculatively start routing the transaction to a slave + m_otrx_state = OTRX_STARTING; + route_target = TARGET_SLAVE; + } + else if (m_otrx_state == OTRX_STARTING) + { + // Transaction was started, begin active tracking of its progress + m_otrx_state = OTRX_ACTIVE; + } + // If delayed query retry is enabled, we need to store the current statement bool store_stmt = m_config.delayed_retry; @@ -203,6 +256,15 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf) target = m_prev_target; succp = true; } + else if (m_otrx_state == OTRX_ACTIVE) + { + /** We are speculatively executing a transaction on the slave, keep + * routing queries to the same server. If the query modifies data, + * a rollback is initiated on the slave server. 
*/ + store_stmt = track_optimistic_trx(&querybuf); + target = m_prev_target; + succp = true; + } else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target)) { /** diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index 6ffb6a9b4..98a966bf9 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -478,8 +478,20 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) return; // Nothing to route, return } - if (m_config.transaction_replay && m_can_replay_trx && - session_trx_is_active(m_client->session)) + if (m_otrx_state == OTRX_ROLLBACK) + { + /** This is the response to the ROLLBACK. If it fails, we must close + * the connection. The replaying of the transaction can continue + * regardless of the ROLLBACK result. */ + ss_dassert(backend == m_prev_target); + + if (!mxs_mysql_is_ok_packet(writebuf)) + { + poll_fake_hangup_event(backend_dcb); + } + } + else if (m_config.transaction_replay && m_can_replay_trx && + session_trx_is_active(m_client->session)) { if (!backend->has_session_commands()) { @@ -541,6 +553,15 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb) // Server requested a local file, go into data streaming mode m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_ACTIVE); } + + if (m_otrx_state == OTRX_ROLLBACK) + { + // Transaction rolled back, start replaying it on the master + m_otrx_state = OTRX_INACTIVE; + start_trx_replay(); + gwbuf_free(writebuf); + return; + } } else { @@ -787,6 +808,20 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb, can_continue = start_trx_replay(); backend->close(); } + else if (m_otrx_state != OTRX_INACTIVE) + { + /** + * The connection was closed mid-transaction or while we were + * executing the ROLLBACK. In both cases the transaction will + * be closed. 
We can safely start retrying the transaction + * on the master. + */ + + ss_dassert(session_trx_is_active(session)); + m_otrx_state = OTRX_INACTIVE; + can_continue = start_trx_replay(); + backend->close(); + } else { /** Try to replace the failed connection with a new one */ diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index c68ac6213..ea9487465 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -59,6 +59,14 @@ public: TARGET_RLAG_MAX = maxscale::QueryClassifier::TARGET_RLAG_MAX, }; + enum otrx_state + { + OTRX_INACTIVE, // No optimistic transaction in progress + OTRX_STARTING, // Transaction starting on slave + OTRX_ACTIVE, // Transaction open on a slave server + OTRX_ROLLBACK // Transaction being rolled back on the slave server + }; + enum wait_gtid_state { NONE, @@ -149,6 +157,7 @@ public: bool m_can_replay_trx; /**< Whether the transaction can be replayed */ Trx m_replayed_trx; /**< The transaction we are replaying */ mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */ + otrx_state m_otrx_state = OTRX_INACTIVE; /**< Optimistic trx state */ private: RWSplitSession(RWSplit* instance, MXS_SESSION* session, @@ -202,6 +211,27 @@ private: */ bool start_trx_replay(); + /** + * See if the transaction could be done on a slave + * + * @param route_target Target where the query is routed + * + * @return True if the query can be attempted on a slave + */ + bool should_try_trx_on_slave(route_target_t route_target) const; + + /** + * Track optimistic transaction status + * + * Tracks the progress of the optimistic transaction and starts the rollback + * procedure if the transaction turns out to be one that modifies data. 
+ * + * @param buffer Current query, replaced with a ROLLBACK when the rollback is started + * + * @return Whether the current statement should be stored for the duration of the query + */ + bool track_optimistic_trx(GWBUF** buffer); + private: // QueryClassifier::Handler bool lock_to_master();