Clean up readwritesplit routing functions

Moved the more verbose parts of the routing code into subfunctions and
rearranged it so that related parts are closer to each other. Also
added the SQL statement that is being delayed to the log message.
Markus Mäkelä
2019-03-15 14:01:14 +02:00
parent 2bf55ed675
commit 0001babd26
3 changed files with 82 additions and 85 deletions
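Below, as a rough illustration of the refactoring described in the commit message, is a
self-contained sketch of the pattern being applied: a verbose inline block is lifted out of
the routing function into a named helper, so the call site reads as a single decision. This
is not MaxScale code; every type and function name in the sketch (Target, route_sketch,
start_migration_sketch) is invented for illustration.

#include <iostream>
#include <string>

struct Target
{
    std::string name;
};

// Helper extracted from the routing function: the detailed steps and their
// explanatory comments live here instead of at the call site.
bool start_migration_sketch(const Target& from, const Target& to)
{
    std::cout << "Starting transaction migration from '" << from.name
              << "' to '" << to.name << "'\n";
    // ... stash the current query, then start the replay ...
    return true;
}

// The routing function now reads as one decision per branch.
bool route_sketch(const Target& current_master, const Target& target,
                  bool succp, bool should_migrate)
{
    if (!succp && should_migrate)
    {
        return start_migration_sketch(current_master, target);
    }

    return succp;
}

int main()
{
    Target master{"server1"};
    Target slave{"server2"};
    std::cout << std::boolalpha << route_sketch(master, slave, false, true) << '\n';
}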

View File

@@ -322,23 +322,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
         if (!succp && should_migrate_trx(target))
         {
-            MXS_INFO("Starting transaction migration from '%s' to '%s'",
-                     m_current_master->name(),
-                     target->name());
-
-            /**
-             * Stash the current query so that the transaction replay treats
-             * it as if the query was interrupted.
-             */
-            m_current_query.copy_from(querybuf);
-
-            /**
-             * After the transaction replay has been started, the rest of
-             * the query processing needs to be skipped. This is done to avoid
-             * the error logging done when no valid target is found for a query
-             * as well as to prevent retrying of queries in the wrong order.
-             */
-            return start_trx_replay();
+            return start_trx_migration(target, querybuf);
         }
     }
@@ -362,15 +346,6 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf)
         {
             // Target server was found and is in the correct state
             succp = handle_got_target(querybuf, target, store_stmt);
-
-            if (succp && command == MXS_COM_STMT_EXECUTE && !is_locked_to_master())
-            {
-                /** Track the targets of the COM_STMT_EXECUTE statements. This
-                 * information is used to route all COM_STMT_FETCH commands
-                 * to the same server where the COM_STMT_EXECUTE was done. */
-                m_exec_map[stmt_id] = target;
-                MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri());
-            }
         }
     }
     else if (can_retry_query() || can_continue_trx_replay())
@@ -995,6 +970,25 @@ bool RWSplitSession::should_migrate_trx(RWBackend* target)
            m_can_replay_trx;
 }
 
+bool RWSplitSession::start_trx_migration(RWBackend* target, GWBUF* querybuf)
+{
+    MXS_INFO("Starting transaction migration from '%s' to '%s'", m_current_master->name(), target->name());
+
+    /**
+     * Stash the current query so that the transaction replay treats
+     * it as if the query was interrupted.
+     */
+    m_current_query.copy_from(querybuf);
+
+    /**
+     * After the transaction replay has been started, the rest of
+     * the query processing needs to be skipped. This is done to avoid
+     * the error logging done when no valid target is found for a query
+     * as well as to prevent retrying of queries in the wrong order.
+     */
+    return start_trx_replay();
+}
+
 /**
  * @brief Handle master is the target
  *
@@ -1125,30 +1119,31 @@ GWBUF* RWSplitSession::add_prefix_wait_gtid(SERVER* server, GWBUF* origin)
  */
 bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool store)
 {
-    mxb_assert(target->in_use());
+    mxb_assert_message(target->in_use(), "Target must be in use before routing to it");
+    mxb_assert_message(!target->has_session_commands(), "The session command cursor must not be active");
+
     /**
-     * If the transaction is READ ONLY set forced_node to this backend.
-     * This SLAVE backend will be used until the COMMIT is seen.
+     * TODO: This effectively disables pipelining of queries, very bad for batch insert performance. Replace
+     *       with proper, per server tracking of which responses need to be sent to the client. This would
+     *       also solve MXS-2009 by speeding up session commands.
      */
+    mxb_assert_message(target->get_reply_state() == REPLY_STATE_DONE || m_qc.large_query(),
+                       "Node must be idle when routing queries to it");
+
+    MXS_INFO("Route query to %s: %s \t%s <", target->is_master() ? "master" : "slave",
+             target->name(), target->uri());
+
     if (!m_target_node && session_trx_is_read_only(m_client->session))
     {
+        // Lock the session to this node until the read-only transaction ends
         m_target_node = target;
     }
 
-    MXS_INFO("Route query to %s: %s \t%s <",
-             target->is_master() ? "master" : "slave",
-             target->name(),
-             target->uri());
-
-    /** The session command cursor must not be active */
-    mxb_assert(!target->has_session_commands());
-
     mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
     uint8_t cmd = mxs_mysql_get_command(querybuf);
     GWBUF* send_buf = gwbuf_clone(querybuf);
 
-    if (m_config.causal_reads && cmd == COM_QUERY && !m_gtid_pos.empty()
-        && target->is_slave())
+    if (m_config.causal_reads && cmd == COM_QUERY && !m_gtid_pos.empty() && target->is_slave())
     {
         // Perform the causal read only when the query is routed to a slave
         send_buf = add_prefix_wait_gtid(target->server(), send_buf);
@@ -1165,16 +1160,6 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool
     }
 
     bool large_query = is_large_query(querybuf);
-
-    /**
-     * We should not be routing a query to a server that is busy processing a result.
-     *
-     * TODO: This effectively disables pipelining of queries, very bad for batch insert performance. Replace
-     *       with proper, per server tracking of which responses need to be sent to the client. This would
-     *       also solve MXS-2009 by speeding up session commands.
-     */
-    mxb_assert(target->get_reply_state() == REPLY_STATE_DONE || m_qc.large_query());
 
     uint32_t orig_id = 0;
 
     if (!is_locked_to_master() && mxs_mysql_is_ps_command(cmd) && !m_qc.large_query())
@@ -1226,13 +1211,11 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool
         // Store the current target
         m_prev_target = target;
 
-        /**
-         * If a READ ONLY transaction is ending set forced_node to NULL
-         */
         if (m_target_node
             && session_trx_is_read_only(m_client->session)
             && session_trx_is_ending(m_client->session))
         {
+            // Read-only transaction is over, stop routing queries to a specific node
            m_target_node = nullptr;
         }
     }
@@ -1247,5 +1230,14 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool
         MXS_ERROR("Routing query failed.");
     }
 
+    if (success && cmd == MXS_COM_STMT_EXECUTE && !is_locked_to_master())
+    {
+        /** Track the targets of the COM_STMT_EXECUTE statements. This
+         * information is used to route all COM_STMT_FETCH commands
+         * to the same server where the COM_STMT_EXECUTE was done. */
+        m_exec_map[m_qc.current_route_info().stmt_id()] = target;
+        MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri());
+    }
+
     return success;
 }
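The block added at the end of handle_got_target above records which backend served each
COM_STMT_EXECUTE so that later COM_STMT_FETCH commands can be sent to the same server. Here is
a minimal, self-contained sketch of how such a map can be consulted on the fetch side
(illustrative only; ExecTracker, Backend and target_for_fetch are invented names, not the
readwritesplit API):

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

struct Backend
{
    std::string name;
};

class ExecTracker
{
public:
    // Called after a COM_STMT_EXECUTE has been routed successfully.
    void remember(uint32_t stmt_id, Backend* target)
    {
        m_exec_map[stmt_id] = target;
    }

    // Called when a COM_STMT_FETCH for the same statement id arrives. Returns
    // nullptr if no execute was recorded (e.g. the session was locked to the
    // master, in which case no entry is stored).
    Backend* target_for_fetch(uint32_t stmt_id) const
    {
        auto it = m_exec_map.find(stmt_id);
        return it != m_exec_map.end() ? it->second : nullptr;
    }

private:
    std::unordered_map<uint32_t, Backend*> m_exec_map;
};

int main()
{
    Backend slave{"server2"};
    ExecTracker tracker;
    tracker.remember(42, &slave);   // the COM_STMT_EXECUTE went to server2

    if (Backend* b = tracker.target_for_fetch(42))
    {
        std::cout << "COM_STMT_FETCH routed to " << b->name << '\n';
    }
}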

View File

@@ -25,7 +25,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRW
     : mxs::RouterSession(session)
     , m_backends(std::move(backends))
     , m_raw_backends(sptr_vec_to_ptr_vec(m_backends))
-    , m_current_master(master)
+    , m_current_master(nullptr)
     , m_target_node(nullptr)
     , m_prev_target(nullptr)
     , m_config(instance->config())
@@ -88,10 +88,8 @@ RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session)
 void close_all_connections(PRWBackends& backends)
 {
-    for (PRWBackends::iterator it = backends.begin(); it != backends.end(); it++)
+    for (auto& backend : backends)
     {
-        RWBackend* backend = *it;
-
         if (backend->in_use())
         {
             backend->close();
@@ -133,31 +131,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
         return 1;
     }
 
-    if (m_query_queue == NULL
-        && (m_expected_responses == 0
-            || m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE
-            || m_qc.large_query()))
+    if (can_route_queries())
     {
         /** Gather the information required to make routing decisions */
-        QueryClassifier::current_target_t current_target;
-
-        if (m_target_node == NULL)
-        {
-            current_target = QueryClassifier::CURRENT_TARGET_UNDEFINED;
-        }
-        else if (m_target_node == m_current_master)
-        {
-            current_target = QueryClassifier::CURRENT_TARGET_MASTER;
-        }
-        else
-        {
-            current_target = QueryClassifier::CURRENT_TARGET_SLAVE;
-        }
-
         if (!m_qc.large_query())
         {
-            m_qc.update_route_info(current_target, querybuf);
+            m_qc.update_route_info(get_current_target(), querybuf);
         }
 
         /** No active or pending queries */
@@ -168,15 +147,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf)
     }
     else
     {
-        /**
-         * We are already processing a request from the client. Store the
-         * new query and wait for the previous one to complete.
-         */
+        // Already busy executing a query, put the query in a queue and route it later
         mxb_assert(m_expected_responses > 0 || m_query_queue);
-        MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
-                 gwbuf_length(querybuf),
-                 GWBUF_DATA(querybuf)[4],
-                 m_expected_responses);
+        MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command: %s",
+                 gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses,
+                 mxs::extract_sql(querybuf, 1024).c_str());
         m_query_queue = gwbuf_append(m_query_queue, querybuf);
         querybuf = NULL;
         rval = 1;
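The change above gates routing on can_route_queries() (defined in the header diff below):
queries are routed only while the query queue is empty and no responses are outstanding,
otherwise they are appended to m_query_queue and logged together with their SQL. Here is a
minimal sketch of that store-and-route-later behaviour, with invented names (PipelineGuard,
on_reply) and std::deque standing in for the GWBUF chain:

#include <deque>
#include <iostream>
#include <string>

class PipelineGuard
{
public:
    void route(const std::string& sql)
    {
        if (m_expected_responses == 0 && m_queue.empty())
        {
            send(sql);
        }
        else
        {
            // Already busy executing a query, store it and route it later
            std::cout << "Storing query, expecting " << m_expected_responses
                      << " replies: " << sql << '\n';
            m_queue.push_back(sql);
        }
    }

    // Called when the backend's reply to the current query is complete.
    void on_reply()
    {
        --m_expected_responses;

        if (m_expected_responses == 0 && !m_queue.empty())
        {
            std::string next = m_queue.front();
            m_queue.pop_front();
            send(next);
        }
    }

private:
    void send(const std::string& sql)
    {
        ++m_expected_responses;
        std::cout << "Route query: " << sql << '\n';
    }

    std::deque<std::string> m_queue;
    int m_expected_responses = 0;
};

int main()
{
    PipelineGuard g;
    g.route("SELECT 1");    // routed immediately
    g.route("SELECT 2");    // stored, reply to SELECT 1 still pending
    g.on_reply();           // reply arrives, SELECT 2 is routed
}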

View File

@@ -168,6 +168,7 @@ private:
     bool should_replace_master(mxs::RWBackend* target);
     void replace_master(mxs::RWBackend* target);
     bool should_migrate_trx(mxs::RWBackend* target);
+    bool start_trx_migration(mxs::RWBackend* target, GWBUF* querybuf);
     void log_master_routing_failure(bool found,
                                     mxs::RWBackend* old_master,
                                     mxs::RWBackend* curr_master);
@@ -266,6 +267,34 @@ private:
         return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
     }
 
+    inline bool can_route_queries() const
+    {
+        return m_query_queue == NULL
+               && (m_expected_responses == 0
+                   || m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE
+                   || m_qc.large_query());
+    }
+
+    inline mxs::QueryClassifier::current_target_t get_current_target() const
+    {
+        mxs::QueryClassifier::current_target_t current_target;
+
+        if (m_target_node == NULL)
+        {
+            current_target = mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED;
+        }
+        else if (m_target_node == m_current_master)
+        {
+            current_target = mxs::QueryClassifier::CURRENT_TARGET_MASTER;
+        }
+        else
+        {
+            current_target = mxs::QueryClassifier::CURRENT_TARGET_SLAVE;
+        }
+
+        return current_target;
+    }
+
     void update_trx_statistics()
     {
         if (session_trx_is_ending(m_client->session))