MXS-2300: Add session command pruning

This commit adds a new parameter that, when enabled, prunes the session
command history to a known length. This makes it possible to keep a
client-side pooled connection open indefinitely at the cost of making
reconnections theoretically unsafe. In practice the maximum history length
can be set to a value that encompasses a single session using the pooled
connection with no risk to session state integrity. The default history
length of 50 commands is quite likely to be adequate for the majority of
use-cases.
This commit is contained in:
Markus Mäkelä
2019-01-30 09:14:13 +02:00
parent bf4aa1ab2c
commit 260ce9b8b8
5 changed files with 66 additions and 12 deletions

View File

@ -273,6 +273,9 @@ void RWSplit::diagnostics(DCB* dcb)
dcb_printf(dcb,
"\tstrict_sp_calls: %s\n",
cnf.strict_sp_calls ? "true" : "false");
dcb_printf(dcb,
"\tprune_sescmd_history: %s\n",
cnf.prune_sescmd_history ? "true" : "false");
dcb_printf(dcb,
"\tdisable_sescmd_history: %s\n",
cnf.disable_sescmd_history ? "true" : "false");
@ -504,6 +507,7 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{"max_slave_replication_lag", MXS_MODULE_PARAM_INT, "-1" },
{"max_slave_connections", MXS_MODULE_PARAM_STRING, MAX_SLAVE_COUNT},
{"retry_failed_reads", MXS_MODULE_PARAM_BOOL, "true" },
{"prune_sescmd_history", MXS_MODULE_PARAM_BOOL, "false" },
{"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "false" },
{"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "50" },
{"strict_multi_stmt", MXS_MODULE_PARAM_BOOL, "false" },

View File

@ -151,6 +151,7 @@ struct Config
(enum failure_mode)config_get_enum(
params, "master_failure_mode", master_failure_mode_values))
, max_sescmd_history(config_get_integer(params, "max_sescmd_history"))
, prune_sescmd_history(config_get_bool(params, "prune_sescmd_history"))
, disable_sescmd_history(config_get_bool(params, "disable_sescmd_history"))
, master_accept_reads(config_get_bool(params, "master_accept_reads"))
, strict_multi_stmt(config_get_bool(params, "strict_multi_stmt"))
@ -205,6 +206,7 @@ struct Config
* master or all nodes */
failure_mode master_failure_mode; /**< Master server failure handling mode */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool prune_sescmd_history; /**< Prune session command history */
bool disable_sescmd_history;/**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed to

View File

@ -393,6 +393,23 @@ void RWSplitSession::continue_large_session_write(GWBUF* querybuf, uint32_t type
}
}
void RWSplitSession::prune_to_position(uint64_t pos)
{
/** Prune all completed responses before a certain position */
ResponseMap::iterator it = m_sescmd_responses.lower_bound(pos);
if (it != m_sescmd_responses.end())
{
// Found newer responses that were returned after this position
m_sescmd_responses.erase(m_sescmd_responses.begin(), it);
}
else
{
// All responses are older than the requested position
m_sescmd_responses.clear();
}
}
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
@ -522,20 +539,17 @@ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint3
m_sescmd_list.clear();
}
if (m_config.prune_sescmd_history && !m_sescmd_list.empty()
&& m_sescmd_list.size() + 1 >= m_config.max_sescmd_history)
{
// Close to the history limit, remove the oldest command
prune_to_position(m_sescmd_list.front()->get_position());
m_sescmd_list.pop_front();
}
if (m_config.disable_sescmd_history)
{
/** Prune stored responses */
ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos);
if (it != m_sescmd_responses.end())
{
m_sescmd_responses.erase(m_sescmd_responses.begin(), it);
}
else
{
// All responses processed
m_sescmd_responses.clear();
}
prune_to_position(lowest_pos);
}
else
{

View File

@ -182,6 +182,7 @@ private:
void process_sescmd_response(mxs::SRWBackend& backend, GWBUF** ppPacket);
void compress_history(mxs::SSessionCommand& sescmd);
void prune_to_position(uint64_t pos);
bool route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type);
void continue_large_session_write(GWBUF* querybuf, uint32_t type);
bool route_single_stmt(GWBUF* querybuf);