MXS-1549: Implement optimistic transaction execution
When the `optimistic_trx` mode is enabled, all transactions are started on a slave server. If the client executes a query inside the transaction that is not of a read-only nature, the transaction is rolled back and replayed on the master.
This commit is contained in:
@ -218,6 +218,16 @@ public:
|
|||||||
return m_trx_is_read_only;
|
return m_trx_is_read_only;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if current transaction is still a read-only transaction
|
||||||
|
*
|
||||||
|
* @return True if no statements have been executed that modify data
|
||||||
|
*/
|
||||||
|
bool is_trx_starting() const
|
||||||
|
{
|
||||||
|
return qc_query_is_type(m_route_info.type_mask(), QUERY_TYPE_BEGIN_TRX);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Store and process a prepared statement
|
* @brief Store and process a prepared statement
|
||||||
*
|
*
|
||||||
|
|||||||
@ -151,6 +151,44 @@ void replace_binary_ps_id(GWBUF* buffer, uint32_t id)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RWSplitSession::should_try_trx_on_slave(route_target_t route_target) const
|
||||||
|
{
|
||||||
|
return m_config.optimistic_trx && // Optimistic transactions are enabled
|
||||||
|
!is_locked_to_master() && // Not locked to master
|
||||||
|
!m_is_replay_active && // Not replaying a transaction
|
||||||
|
m_otrx_state == OTRX_INACTIVE && // Not yet in optimistic mode
|
||||||
|
TARGET_IS_MASTER(route_target); // The target type is master
|
||||||
|
}
|
||||||
|
|
||||||
|
bool RWSplitSession::track_optimistic_trx(GWBUF** buffer)
|
||||||
|
{
|
||||||
|
bool store_stmt = true;
|
||||||
|
|
||||||
|
if (session_trx_is_ending(m_client->session))
|
||||||
|
{
|
||||||
|
m_otrx_state = OTRX_INACTIVE;
|
||||||
|
}
|
||||||
|
else if (!m_qc.is_trx_still_read_only())
|
||||||
|
{
|
||||||
|
// Not a plain SELECT, roll it back on the slave and start it on the master
|
||||||
|
MXS_INFO("Rolling back current optimistic transaction");
|
||||||
|
|
||||||
|
// Note: This clone is here because routeQuery will always free the buffer
|
||||||
|
m_current_query.reset(gwbuf_clone(*buffer));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the actual statement we were attempting to execute and
|
||||||
|
* replace it with a ROLLBACK. The storing of the statement is
|
||||||
|
* done here to avoid storage of the ROLLBACK.
|
||||||
|
*/
|
||||||
|
*buffer = modutil_create_query("ROLLBACK");
|
||||||
|
store_stmt = false;
|
||||||
|
m_otrx_state = OTRX_ROLLBACK;
|
||||||
|
}
|
||||||
|
|
||||||
|
return store_stmt;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Routing function. Find out query type, backend type, and target DCB(s).
|
* Routing function. Find out query type, backend type, and target DCB(s).
|
||||||
* Then route query to found target(s).
|
* Then route query to found target(s).
|
||||||
@ -161,8 +199,9 @@ void replace_binary_ps_id(GWBUF* buffer, uint32_t id)
|
|||||||
*/
|
*/
|
||||||
bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
||||||
{
|
{
|
||||||
|
ss_info_dassert(m_otrx_state != OTRX_ROLLBACK,
|
||||||
|
"OTRX_ROLLBACK should never happen when routing queries");
|
||||||
bool succp = false;
|
bool succp = false;
|
||||||
|
|
||||||
const QueryClassifier::RouteInfo& info = m_qc.current_route_info();
|
const QueryClassifier::RouteInfo& info = m_qc.current_route_info();
|
||||||
uint32_t stmt_id = info.stmt_id();
|
uint32_t stmt_id = info.stmt_id();
|
||||||
uint8_t command = info.command();
|
uint8_t command = info.command();
|
||||||
@ -192,6 +231,20 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
|||||||
&m_router->stats().n_rw_trx, 1);
|
&m_router->stats().n_rw_trx, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (m_qc.is_trx_starting() && // A transaction is starting
|
||||||
|
!session_trx_is_read_only(m_client->session) && // Not explicitly read-only
|
||||||
|
should_try_trx_on_slave(route_target)) // Qualifies for speculative routing
|
||||||
|
{
|
||||||
|
// Speculatively start routing the transaction to a slave
|
||||||
|
m_otrx_state = OTRX_STARTING;
|
||||||
|
route_target = TARGET_SLAVE;
|
||||||
|
}
|
||||||
|
else if (m_otrx_state == OTRX_STARTING)
|
||||||
|
{
|
||||||
|
// Transaction was started, begin active tracking of its progress
|
||||||
|
m_otrx_state = OTRX_ACTIVE;
|
||||||
|
}
|
||||||
|
|
||||||
// If delayed query retry is enabled, we need to store the current statement
|
// If delayed query retry is enabled, we need to store the current statement
|
||||||
bool store_stmt = m_config.delayed_retry;
|
bool store_stmt = m_config.delayed_retry;
|
||||||
|
|
||||||
@ -203,6 +256,15 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf)
|
|||||||
target = m_prev_target;
|
target = m_prev_target;
|
||||||
succp = true;
|
succp = true;
|
||||||
}
|
}
|
||||||
|
else if (m_otrx_state == OTRX_ACTIVE)
|
||||||
|
{
|
||||||
|
/** We are speculatively executing a transaction to the slave, keep
|
||||||
|
* routing queries to the same server. If the query modifies data,
|
||||||
|
* a rollback is initiated on the slave server. */
|
||||||
|
store_stmt = track_optimistic_trx(&querybuf);
|
||||||
|
target = m_prev_target;
|
||||||
|
succp = true;
|
||||||
|
}
|
||||||
else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target))
|
else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target))
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -478,8 +478,20 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
return; // Nothing to route, return
|
return; // Nothing to route, return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_config.transaction_replay && m_can_replay_trx &&
|
if (m_otrx_state == OTRX_ROLLBACK)
|
||||||
session_trx_is_active(m_client->session))
|
{
|
||||||
|
/** This is the response to the ROLLBACK. If it fails, we must close
|
||||||
|
* the connection. The replaying of the transaction can continue
|
||||||
|
* regardless of the ROLLBACK result. */
|
||||||
|
ss_dassert(backend == m_prev_target);
|
||||||
|
|
||||||
|
if (!mxs_mysql_is_ok_packet(writebuf))
|
||||||
|
{
|
||||||
|
poll_fake_hangup_event(backend_dcb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (m_config.transaction_replay && m_can_replay_trx &&
|
||||||
|
session_trx_is_active(m_client->session))
|
||||||
{
|
{
|
||||||
if (!backend->has_session_commands())
|
if (!backend->has_session_commands())
|
||||||
{
|
{
|
||||||
@ -541,6 +553,15 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
|||||||
// Server requested a local file, go into data streaming mode
|
// Server requested a local file, go into data streaming mode
|
||||||
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_ACTIVE);
|
m_qc.set_load_data_state(QueryClassifier::LOAD_DATA_ACTIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (m_otrx_state == OTRX_ROLLBACK)
|
||||||
|
{
|
||||||
|
// Transaction rolled back, start replaying it on the master
|
||||||
|
m_otrx_state = OTRX_INACTIVE;
|
||||||
|
start_trx_replay();
|
||||||
|
gwbuf_free(writebuf);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -787,6 +808,20 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
|||||||
can_continue = start_trx_replay();
|
can_continue = start_trx_replay();
|
||||||
backend->close();
|
backend->close();
|
||||||
}
|
}
|
||||||
|
else if (m_otrx_state != OTRX_INACTIVE)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* The connection was closed mid-transaction or while we were
|
||||||
|
* executing the ROLLBACK. In both cases the transaction will
|
||||||
|
* be closed. We can safely start retrying the transaction
|
||||||
|
* on the master.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ss_dassert(session_trx_is_active(session));
|
||||||
|
m_otrx_state = OTRX_INACTIVE;
|
||||||
|
can_continue = start_trx_replay();
|
||||||
|
backend->close();
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Try to replace the failed connection with a new one */
|
/** Try to replace the failed connection with a new one */
|
||||||
|
|||||||
@ -59,6 +59,14 @@ public:
|
|||||||
TARGET_RLAG_MAX = maxscale::QueryClassifier::TARGET_RLAG_MAX,
|
TARGET_RLAG_MAX = maxscale::QueryClassifier::TARGET_RLAG_MAX,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum otrx_state
|
||||||
|
{
|
||||||
|
OTRX_INACTIVE, // No open transactions
|
||||||
|
OTRX_STARTING, // Transaction starting on slave
|
||||||
|
OTRX_ACTIVE, // Transaction open on a slave server
|
||||||
|
OTRX_ROLLBACK // Transaction being rolled back on the slave server
|
||||||
|
};
|
||||||
|
|
||||||
enum wait_gtid_state
|
enum wait_gtid_state
|
||||||
{
|
{
|
||||||
NONE,
|
NONE,
|
||||||
@ -149,6 +157,7 @@ public:
|
|||||||
bool m_can_replay_trx; /**< Whether the transaction can be replayed */
|
bool m_can_replay_trx; /**< Whether the transaction can be replayed */
|
||||||
Trx m_replayed_trx; /**< The transaction we are replaying */
|
Trx m_replayed_trx; /**< The transaction we are replaying */
|
||||||
mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */
|
mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */
|
||||||
|
otrx_state m_otrx_state = OTRX_INACTIVE; /**< Optimistic trx state*/
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
RWSplitSession(RWSplit* instance, MXS_SESSION* session,
|
||||||
@ -202,6 +211,27 @@ private:
|
|||||||
*/
|
*/
|
||||||
bool start_trx_replay();
|
bool start_trx_replay();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See if the transaction could be done on a slave
|
||||||
|
*
|
||||||
|
* @param route_target Target where the query is routed
|
||||||
|
*
|
||||||
|
* @return True if the query can be attempted on a slave
|
||||||
|
*/
|
||||||
|
bool should_try_trx_on_slave(route_target_t route_target) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Track optimistic transaction status
|
||||||
|
*
|
||||||
|
* Tracks the progress of the optimistic transaction and starts the rollback
|
||||||
|
* procedure if the transaction turns out to be one that modifies data.
|
||||||
|
*
|
||||||
|
* @param buffer Current query
|
||||||
|
*
|
||||||
|
* @return Whether the current statement should be stored for the duration of the query
|
||||||
|
*/
|
||||||
|
bool track_optimistic_trx(GWBUF** buffer);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// QueryClassifier::Handler
|
// QueryClassifier::Handler
|
||||||
bool lock_to_master();
|
bool lock_to_master();
|
||||||
|
|||||||
Reference in New Issue
Block a user