MXS-1506: Use session_delay_routing with retry_failed_reads
Using the same functionality with the failed read retrying mechanism removes the need to have multiple versions of the target deduction logic.
This commit is contained in:
@ -205,8 +205,6 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MXS_SESSION* session = m_client->session;
|
|
||||||
|
|
||||||
if (succp && target)
|
if (succp && target)
|
||||||
{
|
{
|
||||||
// We have a valid target, reset retry duration
|
// We have a valid target, reset retry duration
|
||||||
@ -240,12 +238,11 @@ bool RWSplitSession::route_single_stmt(GWBUF *querybuf, const RouteInfo& info)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (m_config.query_retry_interval > 0 &&
|
else if (can_retry_query())
|
||||||
m_retry_duration < m_config.query_retry_timeout &&
|
|
||||||
!session_trx_is_active(session))
|
|
||||||
{
|
{
|
||||||
// Try to route the query again later
|
// Try to route the query again later
|
||||||
uint64_t interval = m_config.query_retry_interval;
|
uint64_t interval = m_config.query_retry_interval;
|
||||||
|
MXS_SESSION* session = m_client->session;
|
||||||
session_delay_routing(session, router_as_downstream(session),
|
session_delay_routing(session, router_as_downstream(session),
|
||||||
gwbuf_clone(querybuf), interval);
|
gwbuf_clone(querybuf), interval);
|
||||||
m_retry_duration += interval;
|
m_retry_duration += interval;
|
||||||
|
|||||||
@ -211,56 +211,6 @@ bool RWSplitSession::route_stored_query()
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RWSplitSession::reroute_stored_statement(const SRWBackend& old, GWBUF *stored)
|
|
||||||
{
|
|
||||||
bool success = false;
|
|
||||||
|
|
||||||
if (!session_trx_is_active(m_client->session))
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Only try to retry the read if autocommit is enabled and we are
|
|
||||||
* outside of a transaction
|
|
||||||
*/
|
|
||||||
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
|
|
||||||
{
|
|
||||||
SRWBackend& backend = *it;
|
|
||||||
|
|
||||||
if (backend->in_use() && backend != old &&
|
|
||||||
!backend->is_master() && backend->is_slave())
|
|
||||||
{
|
|
||||||
/** Found a valid candidate; a non-master slave that's in use */
|
|
||||||
if (backend->write(stored))
|
|
||||||
{
|
|
||||||
MXS_INFO("Retrying failed read at '%s'.", backend->name());
|
|
||||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
|
||||||
backend->set_reply_state(REPLY_STATE_START);
|
|
||||||
m_expected_responses++;
|
|
||||||
success = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!success && m_current_master && m_current_master->in_use())
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Either we failed to write to the slave or no valid slave was found.
|
|
||||||
* Try to retry the read on the master.
|
|
||||||
*/
|
|
||||||
if (m_current_master->write(stored))
|
|
||||||
{
|
|
||||||
MXS_INFO("Retrying failed read at '%s'.", m_current_master->name());
|
|
||||||
ss_dassert(m_current_master->get_reply_state() == REPLY_STATE_DONE);
|
|
||||||
m_current_master->set_reply_state(REPLY_STATE_START);
|
|
||||||
m_expected_responses++;
|
|
||||||
success = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return success;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @bref discard the result of wait gtid statment, the result will be an error
|
* @bref discard the result of wait gtid statment, the result will be an error
|
||||||
* packet or an error packet.
|
* packet or an error packet.
|
||||||
@ -664,32 +614,31 @@ bool RWSplitSession::handle_error_new_connection(DCB *backend_dcb, GWBUF *errmsg
|
|||||||
*/
|
*/
|
||||||
GWBUF *stored = NULL;
|
GWBUF *stored = NULL;
|
||||||
const SERVER *target = NULL;
|
const SERVER *target = NULL;
|
||||||
if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
|
|
||||||
target != backend->backend()->server ||
|
if (session_take_stmt(backend_dcb->session, &stored, &target) &&
|
||||||
!reroute_stored_statement(backend, stored))
|
m_config.retry_failed_reads)
|
||||||
|
{
|
||||||
|
ss_dassert(target == backend->server());
|
||||||
|
MXS_INFO("Re-routing failed read after server '%s' failed", backend->name());
|
||||||
|
MXS_SESSION* session = m_client->session;
|
||||||
|
|
||||||
|
// Try to route the failed read as often as possible
|
||||||
|
session_delay_routing(session, router_as_downstream(session), stored, 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
/**
|
|
||||||
* We failed to route the stored statement or no statement was
|
|
||||||
* stored for this server. Either way we can safely free the buffer
|
|
||||||
* and decrement the expected response count.
|
|
||||||
*/
|
|
||||||
gwbuf_free(stored);
|
gwbuf_free(stored);
|
||||||
|
|
||||||
if (!backend->has_session_commands())
|
if (!backend->has_session_commands())
|
||||||
{
|
{
|
||||||
/**
|
/** The backend was not executing a session command so the client
|
||||||
* The backend was executing a command that requires a reply.
|
* is expecting a response. Send an error so they know to proceed. */
|
||||||
* Send an error to the client to let it know the query has
|
m_client->func.write(m_client, gwbuf_clone(errmsg));
|
||||||
* failed.
|
|
||||||
*/
|
|
||||||
DCB *client_dcb = ses->client_dcb;
|
|
||||||
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_expected_responses == 0)
|
if (m_expected_responses == 0)
|
||||||
{
|
{
|
||||||
/** The response from this server was the last one, try to
|
// This was the last response, try to route pending queries
|
||||||
* route all queued queries */
|
|
||||||
route_stored = true;
|
route_stored = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -138,7 +138,6 @@ private:
|
|||||||
bool route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type);
|
bool route_session_write(GWBUF *querybuf, uint8_t command, uint32_t type);
|
||||||
bool route_single_stmt(GWBUF *querybuf, const RouteInfo& info);
|
bool route_single_stmt(GWBUF *querybuf, const RouteInfo& info);
|
||||||
bool route_stored_query();
|
bool route_stored_query();
|
||||||
bool reroute_stored_statement(const mxs::SRWBackend& old, GWBUF *stored);
|
|
||||||
|
|
||||||
mxs::SRWBackend get_hinted_backend(char *name);
|
mxs::SRWBackend get_hinted_backend(char *name);
|
||||||
mxs::SRWBackend get_slave_backend(int max_rlag);
|
mxs::SRWBackend get_slave_backend(int max_rlag);
|
||||||
@ -185,6 +184,13 @@ private:
|
|||||||
bool lock_to_master();
|
bool lock_to_master();
|
||||||
bool is_locked_to_master() const;
|
bool is_locked_to_master() const;
|
||||||
bool supports_hint(HINT_TYPE hint_type) const;
|
bool supports_hint(HINT_TYPE hint_type) const;
|
||||||
|
|
||||||
|
inline bool can_retry_query() const
|
||||||
|
{
|
||||||
|
return m_config.query_retry_interval > 0 &&
|
||||||
|
m_retry_duration < m_config.query_retry_timeout &&
|
||||||
|
!session_trx_is_active(m_client->session);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user