MXS-1507: Make transaction replay configurable
The transaction retrying behavior is now configurable and documented. The `transaction_replay` parameter implicitly enables the required functionality in the router that it needs.
This commit is contained in:
@ -290,6 +290,16 @@ RWSplit* RWSplit::create(SERVICE *service, char **options)
|
||||
config.max_sescmd_history = 0;
|
||||
}
|
||||
|
||||
if (config.transaction_replay)
|
||||
{
|
||||
/**
|
||||
* Replaying transactions requires that we are able to do delayed query
|
||||
* retries and reconnect to a master.
|
||||
*/
|
||||
config.delayed_retry = true;
|
||||
config.master_reconnection = true;
|
||||
}
|
||||
|
||||
return new (std::nothrow) RWSplit(service, config);
|
||||
}
|
||||
|
||||
@ -496,6 +506,7 @@ MXS_MODULE *MXS_CREATE_MODULE()
|
||||
{"master_reconnection", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{"delayed_retry", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{"delayed_retry_timeout", MXS_MODULE_PARAM_COUNT, "10"},
|
||||
{"transaction_replay", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{MXS_END_MODULE_PARAMS}
|
||||
}
|
||||
};
|
||||
|
@ -163,7 +163,8 @@ struct Config
|
||||
causal_read_timeout(config_get_string(params, "causal_read_timeout")),
|
||||
master_reconnection(config_get_bool(params, "master_reconnection")),
|
||||
delayed_retry(config_get_bool(params, "delayed_retry")),
|
||||
delayed_retry_timeout(config_get_integer(params, "delayed_retry_timeout"))
|
||||
delayed_retry_timeout(config_get_integer(params, "delayed_retry_timeout")),
|
||||
transaction_replay(config_get_bool(params, "transaction_replay"))
|
||||
{
|
||||
if (enable_causal_read)
|
||||
{
|
||||
@ -191,8 +192,9 @@ struct Config
|
||||
bool enable_causal_read; /**< Enable causual read */
|
||||
std::string causal_read_timeout; /**< Timeout, second parameter of function master_wait_gtid */
|
||||
bool master_reconnection; /**< Allow changes in master server */
|
||||
bool delayed_retry; /**< Delay routing if no target found */
|
||||
uint64_t delayed_retry_timeout; /**< How long to delay until an error is returned */
|
||||
bool delayed_retry; /**< Delay routing if no target found */
|
||||
uint64_t delayed_retry_timeout; /**< How long to delay until an error is returned */
|
||||
bool transaction_replay; /**< Replay failed transactions */
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -437,7 +437,7 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
return;
|
||||
}
|
||||
|
||||
if (session_trx_is_active(m_client->session))
|
||||
if (m_config.transaction_replay && session_trx_is_active(m_client->session))
|
||||
{
|
||||
m_trx.add_stmt(m_current_query.release());
|
||||
m_trx.add_result(writebuf);
|
||||
@ -464,13 +464,14 @@ void RWSplitSession::clientReply(GWBUF *writebuf, DCB *backend_dcb)
|
||||
|
||||
if (m_is_replay_active)
|
||||
{
|
||||
ss_dassert(m_config.transaction_replay);
|
||||
handle_trx_replay();
|
||||
|
||||
// Ignore the response, the client doesn't need it
|
||||
gwbuf_free(writebuf);
|
||||
return;
|
||||
}
|
||||
else if (session_trx_is_ending(m_client->session))
|
||||
else if (m_config.transaction_replay && session_trx_is_ending(m_client->session))
|
||||
{
|
||||
m_trx.close();
|
||||
}
|
||||
@ -631,10 +632,17 @@ void RWSplitSession::handleError(GWBUF *errmsgbuf, DCB *problem_dcb,
|
||||
}
|
||||
else if (session_trx_is_active(session))
|
||||
{
|
||||
// Stash any interrupted queries while we replay the transaction
|
||||
m_interrupted_query.reset(m_current_query.release());
|
||||
can_continue = true;
|
||||
start_trx_replay();
|
||||
if (m_config.transaction_replay)
|
||||
{
|
||||
// Stash any interrupted queries while we replay the transaction
|
||||
m_interrupted_query.reset(m_current_query.release());
|
||||
can_continue = true;
|
||||
start_trx_replay();
|
||||
}
|
||||
else
|
||||
{
|
||||
can_continue = false;
|
||||
}
|
||||
}
|
||||
|
||||
*succp = can_continue;
|
||||
|
Reference in New Issue
Block a user