Merge branch '2.3' into develop

This commit is contained in:
Markus Mäkelä
2019-02-01 13:55:54 +02:00
31 changed files with 743 additions and 175 deletions

View File

@ -272,6 +272,9 @@ void RWSplit::diagnostics(DCB* dcb)
dcb_printf(dcb,
"\tstrict_sp_calls: %s\n",
cnf.strict_sp_calls ? "true" : "false");
dcb_printf(dcb,
"\tprune_sescmd_history: %s\n",
cnf.prune_sescmd_history ? "true" : "false");
dcb_printf(dcb,
"\tdisable_sescmd_history: %s\n",
cnf.disable_sescmd_history ? "true" : "false");
@ -503,6 +506,7 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{"max_slave_replication_lag", MXS_MODULE_PARAM_INT, "-1" },
{"max_slave_connections", MXS_MODULE_PARAM_STRING, MAX_SLAVE_COUNT},
{"retry_failed_reads", MXS_MODULE_PARAM_BOOL, "true" },
{"prune_sescmd_history", MXS_MODULE_PARAM_BOOL, "false" },
{"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "false" },
{"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "50" },
{"strict_multi_stmt", MXS_MODULE_PARAM_BOOL, "false" },

View File

@ -141,6 +141,7 @@ struct Config
, master_failure_mode(
(enum failure_mode)params->get_enum("master_failure_mode", master_failure_mode_values))
, max_sescmd_history(params->get_integer("max_sescmd_history"))
, prune_sescmd_history(config_get_bool(params, "prune_sescmd_history"))
, disable_sescmd_history(config_get_bool(params, "disable_sescmd_history"))
, master_accept_reads(config_get_bool(params, "master_accept_reads"))
, strict_multi_stmt(config_get_bool(params, "strict_multi_stmt"))
@ -195,6 +196,7 @@ struct Config
* master or all nodes */
failure_mode master_failure_mode; /**< Master server failure handling mode */
uint64_t max_sescmd_history; /**< Maximum amount of session commands to store */
bool prune_sescmd_history; /**< Prune session command history */
bool disable_sescmd_history;/**< Disable session command history */
bool master_accept_reads; /**< Use master for reads */
bool strict_multi_stmt; /**< Force non-multistatement queries to be routed to

View File

@ -341,8 +341,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
else
{
MXS_ERROR("Could not find valid server for target type %s, closing "
"connection.",
STRTARGET(route_target));
"connection.", route_target_to_string(route_target));
}
}
@ -391,6 +390,23 @@ void RWSplitSession::continue_large_session_write(GWBUF* querybuf, uint32_t type
}
}
void RWSplitSession::prune_to_position(uint64_t pos)
{
/** Prune all completed responses before a certain position */
ResponseMap::iterator it = m_sescmd_responses.lower_bound(pos);
if (it != m_sescmd_responses.end())
{
// Found newer responses that were returned after this position
m_sescmd_responses.erase(m_sescmd_responses.begin(), it);
}
else
{
// All responses are older than the requested position
m_sescmd_responses.clear();
}
}
/**
* Execute in backends used by current router session.
* Save session variable commands to router session property
@ -498,7 +514,7 @@ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint3
}
}
if (m_config.max_sescmd_history > 0 && m_sescmd_list.size() >= m_config.max_sescmd_history)
if (m_config.max_sescmd_history > 0 && m_sescmd_list.size() > m_config.max_sescmd_history)
{
static bool warn_history_exceeded = true;
if (warn_history_exceeded)
@ -520,20 +536,17 @@ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint3
m_sescmd_list.clear();
}
if (m_config.prune_sescmd_history && !m_sescmd_list.empty()
&& m_sescmd_list.size() + 1 > m_config.max_sescmd_history)
{
// Close to the history limit, remove the oldest command
prune_to_position(m_sescmd_list.front()->get_position());
m_sescmd_list.pop_front();
}
if (m_config.disable_sescmd_history)
{
/** Prune stored responses */
ResponseMap::iterator it = m_sescmd_responses.lower_bound(lowest_pos);
if (it != m_sescmd_responses.end())
{
m_sescmd_responses.erase(m_sescmd_responses.begin(), it);
}
else
{
// All responses processed
m_sescmd_responses.clear();
}
prune_to_position(lowest_pos);
}
else
{
@ -554,8 +567,16 @@ bool RWSplitSession::route_session_write(GWBUF* querybuf, uint8_t command, uint3
}
else
{
MXS_ERROR("Could not route session command: %s", attempted_write ? "Write to all backends failed" :
"All connections have failed");
std::string status;
for (const auto& a : m_backends)
{
status += "\n";
status += a->get_verbose_status();
}
MXS_ERROR("Could not route session command: %s. Connection information: %s",
attempted_write ? "Write to all backends failed" : "All connections have failed",
status.c_str());
}
return nsucc;
@ -695,7 +716,6 @@ RWBackend* RWSplitSession::get_target_backend(backend_type_t btype,
if (name) /*< Choose backend by name from a hint */
{
mxb_assert(btype != BE_MASTER);
btype = BE_SLAVE;
rval = get_hinted_backend(name);
}
@ -800,10 +820,19 @@ RWBackend* RWSplitSession::handle_hinted_target(GWBUF* querybuf, route_target_t
{
if (TARGET_IS_NAMED_SERVER(route_target))
{
MXS_INFO("Was supposed to route to named server "
"%s but couldn't find the server in a "
"suitable state.",
named_server);
std::string status = "Could not find server";
for (const auto& a : m_backends)
{
if (strcmp(a->server()->name(), named_server) == 0)
{
status = a->server()->status_string();
break;
}
}
MXS_INFO("Was supposed to route to named server %s but couldn't find the server in a "
"suitable state. Server state: %s", named_server, status.c_str());
}
else if (TARGET_IS_RLAG_MAX(route_target))
{
@ -1013,6 +1042,7 @@ bool RWSplitSession::handle_master_is_target(RWBackend** dest)
if (m_current_master && m_current_master->in_use())
{
m_current_master->close();
m_current_master->set_close_reason("The original master is not available");
}
}
else if (!m_config.delayed_retry

View File

@ -78,6 +78,7 @@ static void discard_if_response_differs(RWBackend* backend,
STRPACKETTYPE(cmd),
query.empty() ? "<no query>" : query.c_str());
backend->close(mxs::Backend::CLOSE_FATAL);
backend->set_close_reason("Invalid response to: " + query);
}
}
@ -160,5 +161,32 @@ void RWSplitSession::process_sescmd_response(RWBackend* backend, GWBUF** ppPacke
gwbuf_free(*ppPacket);
*ppPacket = NULL;
}
if (m_expected_responses == 0
&& (command == MXS_COM_CHANGE_USER || command == MXS_COM_RESET_CONNECTION))
{
mxb_assert_message(m_slave_responses.empty(), "All responses should've been processed");
// This is the last session command to finish that resets the session state, reset the history
MXS_INFO("Resetting session command history (length: %lu)", m_sescmd_list.size());
/**
* Since new connections need to perform the COM_CHANGE_USER, pop it off the list along
* with the expected response to it.
*/
SSessionCommand latest = m_sescmd_list.back();
cmd = m_sescmd_responses[latest->get_position()];
m_sescmd_list.clear();
m_sescmd_responses.clear();
// Push the response back as the first executed session command
m_sescmd_list.push_back(latest);
m_sescmd_responses[latest->get_position()] = cmd;
// Adjust counters to match the number of stored session commands
m_recv_sescmd = 1;
m_sent_sescmd = 1;
m_sescmd_count = 2;
}
}
}

View File

@ -949,6 +949,7 @@ void RWSplitSession::handleError(GWBUF* errmsgbuf,
}
backend->close();
backend->set_close_reason("Master connection failed: " + extract_error(errmsgbuf));
}
else
{
@ -962,6 +963,7 @@ void RWSplitSession::handleError(GWBUF* errmsgbuf,
// Try to replay the transaction on another node
can_continue = start_trx_replay();
backend->close();
backend->set_close_reason("Read-only trx failed: " + extract_error(errmsgbuf));
if (!can_continue)
{
@ -982,6 +984,7 @@ void RWSplitSession::handleError(GWBUF* errmsgbuf,
m_otrx_state = OTRX_INACTIVE;
can_continue = start_trx_replay();
backend->close();
backend->set_close_reason("Optimistic trx failed: " + extract_error(errmsgbuf));
}
else
{
@ -1071,6 +1074,7 @@ bool RWSplitSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg
* is closed, it's possible that the routing logic will pick the failed
* server as the target. */
backend->close();
backend->set_close_reason("Slave connection failed: " + extract_error(errmsg));
if (route_stored)
{

View File

@ -139,6 +139,7 @@ private:
void process_sescmd_response(mxs::RWBackend* backend, GWBUF** ppPacket);
void compress_history(mxs::SSessionCommand& sescmd);
void prune_to_position(uint64_t pos);
bool route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type);
void continue_large_session_write(GWBUF* querybuf, uint32_t type);
bool route_single_stmt(GWBUF* querybuf);
@ -277,7 +278,7 @@ private:
int m_last_keepalive_check; /**< When the last ping was done */
int m_nbackends; /**< Number of backend servers (obsolete) */
DCB* m_client; /**< The client DCB */
uint64_t m_sescmd_count; /**< Number of executed session commands */
uint64_t m_sescmd_count; /**< Number of executed session commands (starts from 1) */
int m_expected_responses; /**< Number of expected responses to the current
* query */
GWBUF* m_query_queue; /**< Queued commands waiting to be executed */
@ -322,22 +323,35 @@ private:
*/
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);
#define STRTARGET(t) \
(t == TARGET_ALL ? "TARGET_ALL" \
: (t == TARGET_MASTER ? "TARGET_MASTER" \
: (t == TARGET_SLAVE ? "TARGET_SLAVE" \
: (t \
== TARGET_NAMED_SERVER \
? "TARGET_NAMED_SERVER" \
: (t \
== \
TARGET_RLAG_MAX \
? "TARGET_RLAG_MAX" \
: ( \
t \
== \
TARGET_UNDEFINED \
? \
"TARGET_UNDEFINED" \
: \
"Unknown target value"))))))
static inline const char* route_target_to_string(route_target_t target)
{
if (TARGET_IS_MASTER(target))
{
return "TARGET_MASTER";
}
else if (TARGET_IS_SLAVE(target))
{
return "TARGET_SLAVE";
}
else if (TARGET_IS_NAMED_SERVER(target))
{
return "TARGET_NAMED_SERVER";
}
else if (TARGET_IS_ALL(target))
{
return "TARGET_ALL";
}
else if (TARGET_IS_RLAG_MAX(target))
{
return "TARGET_RLAG_MAX";
}
else if (TARGET_IS_LAST_USED(target))
{
return "TARGET_LAST_USED";
}
else
{
mxb_assert(!true);
return "Unknown target value";
}
}