MXS-1507: Add transaction size limit

Added the `transaction_replay_max_size` parameter that controls the
maximum size of a transaction that can be replayed. If the limit is
exceeded, the stored statements are released thus preventing the
transaction from being replayed.

This limitation prevents accidental misuse of the transaction replaying
system when autocommit is disabled. It also allows the user to control the
amount of memory that MaxScale will use.
This commit is contained in:
Markus Mäkelä 2018-04-25 10:50:43 +03:00
parent cffed86962
commit 092532745d
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
6 changed files with 53 additions and 7 deletions

View File

@ -360,6 +360,13 @@ with transaction replay is discouraged. If such statements are executed
but the results of each reply are identical, the transaction is replayed but the results
are not guaranteed to be consistent on the database level.
### `transaction_replay_max_size`
The limit on transaction size for transaction replay in bytes. Any transaction
that exceeds this limit will not be replayed. The default size limit is 1
MiB. Read [the configuration guide](../Getting-Started/Configuration-Guide.md#sizes)
for more details on size type parameters in MaxScale.
## Routing hints
The readwritesplit router supports routing hints. For a detailed guide on hint

View File

@ -507,6 +507,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
{"delayed_retry", MXS_MODULE_PARAM_BOOL, "false"},
{"delayed_retry_timeout", MXS_MODULE_PARAM_COUNT, "10"},
{"transaction_replay", MXS_MODULE_PARAM_BOOL, "false"},
{"transaction_replay_max_size", MXS_MODULE_PARAM_COUNT, "1Mi"},
{MXS_END_MODULE_PARAMS}
}
};

View File

@ -164,7 +164,8 @@ struct Config
master_reconnection(config_get_bool(params, "master_reconnection")),
delayed_retry(config_get_bool(params, "delayed_retry")),
delayed_retry_timeout(config_get_integer(params, "delayed_retry_timeout")),
transaction_replay(config_get_bool(params, "transaction_replay"))
transaction_replay(config_get_bool(params, "transaction_replay")),
trx_max_size(config_get_size(params, "transaction_replay_max_size"))
{
if (enable_causal_read)
{
@ -195,6 +196,7 @@ struct Config
bool delayed_retry; /**< Delay routing if no target found */
uint64_t delayed_retry_timeout; /**< How long to delay until an error is returned */
bool transaction_replay; /**< Replay failed transactions */
size_t trx_max_size; /**< Max transaction size for replaying */
};
/**

View File

@ -40,7 +40,8 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
m_next_seq(0),
m_qc(this, session, instance->config().use_sql_variables_in),
m_retry_duration(0),
m_is_replay_active(false)
m_is_replay_active(false),
m_can_replay_trx(true)
{
if (m_config.rw_max_slave_conn_percent)
{
@ -437,10 +438,25 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
return;
}
if (m_config.transaction_replay && session_trx_is_active(m_client->session))
if (m_config.transaction_replay && m_can_replay_trx &&
session_trx_is_active(m_client->session))
{
m_trx.add_stmt(m_current_query.release());
m_trx.add_result(writebuf);
size_t size{m_trx.size() + m_current_query.length()};
// A transaction is open and it is eligible for replaying
if (size < m_config.trx_max_size)
{
/** Transaction size is OK, store the statement for replaying and
* update the checksum of the result */
m_trx.add_stmt(m_current_query.release());
m_trx.add_result(writebuf);
}
else
{
MXS_INFO("Transaction is too big (%lu bytes), can't replay if it fails.", size);
m_current_query.reset();
m_trx.close();
m_can_replay_trx = false;
}
}
else
{
@ -474,6 +490,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
else if (m_config.transaction_replay && session_trx_is_ending(m_client->session))
{
m_trx.close();
m_can_replay_trx = true;
}
if (backend->has_session_commands())
@ -529,7 +546,7 @@ bool RWSplitSession::start_trx_replay()
{
bool rval = false;
if (!m_is_replay_active && m_config.transaction_replay)
if (!m_is_replay_active && m_config.transaction_replay && m_can_replay_trx)
{
// Stash any interrupted queries while we replay the transaction
m_interrupted_query.reset(m_current_query.release());

View File

@ -139,6 +139,7 @@ public:
mxs::Buffer m_current_query; /**< Current query being executed */
Trx m_trx; /**< Current transaction */
bool m_is_replay_active; /**< Whether we are actively replaying a transaction */
bool m_can_replay_trx; /**< Whether the transaction can be replayed */
Trx m_replayed_trx; /**< The transaction we are replaying */
mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */

View File

@ -26,6 +26,11 @@ public:
// A log of executed queries, for transaction replay
typedef std::deque<mxs::Buffer> TrxLog;
Trx():
m_size(0)
{
}
/**
* Add a statement to the transaction
*
@ -33,6 +38,7 @@ public:
*/
void add_stmt(GWBUF* buf)
{
m_size += gwbuf_length(buf);
m_log.push_back(buf);
}
@ -86,6 +92,16 @@ public:
return m_log.empty();
}
/**
* Get transaction size in bytes
*
* @return Size of the transaction in bytes
*/
size_t size() const
{
return m_size;
}
/**
* Close the transaction
*
@ -95,6 +111,7 @@ public:
{
m_checksum.reset();
m_log.clear();
m_size = 0;
}
/**
@ -112,4 +129,5 @@ public:
private:
mxs::SHA1Checksum m_checksum; /**< Checksum of the transaction */
TrxLog m_log; /**< The transaction contents */
} ;
size_t m_size; /**< Transaction size in bytes */
};