MXS-2313: Use servers of same rank in readwritesplit
When a readwritesplit session has a connection to a master server, servers of the same rank as the master are used. If no master connection is available, the server with the highest rank among all connected servers is used. If there are no open connections, the server with the best rank is chosen and a connection to it is made. Connections with different rank values than what is the current rank value of the session will be discarded. This reduces the use of server with different ranks when the master server of a session fails. Without the active pruning of connections, slave connections to primary clusters without masters would remain in use even after the primary master fails. This guarantees full switchover to a secondary cluster if a master change occurs.
This commit is contained in:
@ -228,6 +228,41 @@ int get_backend_priority(RWBackend* backend, bool masters_accepts_reads)
|
||||
return priority;
|
||||
}
|
||||
|
||||
int64_t RWSplitSession::get_current_rank()
|
||||
{
|
||||
int64_t rv = 1;
|
||||
|
||||
if (m_current_master && m_current_master->in_use())
|
||||
{
|
||||
rv = m_current_master->server()->rank();
|
||||
}
|
||||
else
|
||||
{
|
||||
auto compare = [](RWBackend* a, RWBackend* b) {
|
||||
if (a->in_use() != b->in_use())
|
||||
{
|
||||
return a->in_use();
|
||||
}
|
||||
else if (a->can_connect() != b->can_connect())
|
||||
{
|
||||
return a->can_connect();
|
||||
}
|
||||
else
|
||||
{
|
||||
return a->server()->rank() < b->server()->rank();
|
||||
}
|
||||
};
|
||||
auto it = std::min_element(m_raw_backends.begin(), m_raw_backends.end(), compare);
|
||||
|
||||
if (it != m_raw_backends.end())
|
||||
{
|
||||
rv = (*it)->server()->rank();
|
||||
}
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
RWBackend* RWSplitSession::get_slave_backend(int max_rlag)
|
||||
{
|
||||
thread_local PRWBackends candidates;
|
||||
@ -235,7 +270,7 @@ RWBackend* RWSplitSession::get_slave_backend(int max_rlag)
|
||||
|
||||
auto counts = get_slave_counts(m_raw_backends, m_current_master);
|
||||
int best_priority {INT_MAX};
|
||||
auto best_rank = std::numeric_limits<int64_t>::max();
|
||||
auto current_rank = get_current_rank();
|
||||
|
||||
// Create a list of backends valid for read operations
|
||||
for (auto& backend : m_raw_backends)
|
||||
@ -250,16 +285,15 @@ RWBackend* RWSplitSession::get_slave_backend(int max_rlag)
|
||||
int priority = get_backend_priority(backend, m_config.master_accept_reads);
|
||||
auto rank = backend->server()->rank();
|
||||
|
||||
if (master_or_slave && is_usable && rlag_ok)
|
||||
if (master_or_slave && is_usable && rlag_ok && rank == current_rank)
|
||||
{
|
||||
if (rank < best_rank || (rank == best_rank && priority < best_priority))
|
||||
if (priority < best_priority)
|
||||
{
|
||||
candidates.clear();
|
||||
best_rank = rank;
|
||||
best_priority = priority;
|
||||
}
|
||||
|
||||
if (rank == best_rank && priority == best_priority)
|
||||
if (priority == best_priority)
|
||||
{
|
||||
candidates.push_back(backend);
|
||||
}
|
||||
@ -278,22 +312,6 @@ RWBackend* RWSplitSession::get_slave_backend(int max_rlag)
|
||||
return (rval == candidates.end()) ? nullptr : *rval;
|
||||
}
|
||||
|
||||
void add_backend_with_rank(RWBackend* backend, PRWBackends* candidates, int64_t* best_rank)
|
||||
{
|
||||
auto rank = backend->server()->rank();
|
||||
|
||||
if (rank < *best_rank)
|
||||
{
|
||||
*best_rank = rank;
|
||||
candidates->clear();
|
||||
}
|
||||
|
||||
if (rank == *best_rank)
|
||||
{
|
||||
candidates->push_back(backend);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Log server connections
|
||||
*
|
||||
@ -379,7 +397,18 @@ RWBackend* get_root_master(const PRWBackends& backends, RWBackend* current_maste
|
||||
{
|
||||
if (backend->can_connect() && backend->is_master())
|
||||
{
|
||||
add_backend_with_rank(backend, &candidates, &best_rank);
|
||||
auto rank = backend->server()->rank();
|
||||
|
||||
if (rank < best_rank)
|
||||
{
|
||||
best_rank = rank;
|
||||
candidates.clear();
|
||||
}
|
||||
|
||||
if (rank == best_rank)
|
||||
{
|
||||
candidates.push_back(backend);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -456,15 +485,17 @@ bool RWSplitSession::open_connections()
|
||||
|
||||
int n_slaves = get_slave_counts(m_raw_backends, master).second;
|
||||
int max_nslaves = m_router->max_slave_count();
|
||||
auto best_rank = std::numeric_limits<int64_t>::max();
|
||||
PRWBackends candidates;
|
||||
mxb_assert(n_slaves <= max_nslaves || max_nslaves == 0);
|
||||
auto current_rank = get_current_rank();
|
||||
thread_local PRWBackends candidates;
|
||||
candidates.clear();
|
||||
|
||||
for (auto& pBackend : m_raw_backends)
|
||||
for (auto& backend : m_raw_backends)
|
||||
{
|
||||
if (!pBackend->in_use() && pBackend->can_connect() && valid_for_slave(pBackend, master))
|
||||
if (!backend->in_use() && backend->can_connect() && valid_for_slave(backend, master)
|
||||
&& backend->server()->rank() == current_rank)
|
||||
{
|
||||
add_backend_with_rank(pBackend, &candidates, &best_rank);
|
||||
candidates.push_back(backend);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -563,11 +563,23 @@ static bool server_is_shutting_down(GWBUF* writebuf)
|
||||
|
||||
void RWSplitSession::close_stale_connections()
|
||||
{
|
||||
auto current_rank = get_current_rank();
|
||||
|
||||
for (auto& backend : m_backends)
|
||||
{
|
||||
if (backend->in_use() && !backend->can_connect())
|
||||
if (backend->in_use())
|
||||
{
|
||||
backend->close();
|
||||
if (!backend->can_connect())
|
||||
{
|
||||
MXS_INFO("Discarding connection to '%s': Server is in maintenance", backend->name());
|
||||
backend->close();
|
||||
}
|
||||
else if (backend->server()->rank() != current_rank)
|
||||
{
|
||||
MXS_INFO("Discarding connection to '%s': Server has rank %ld and current rank is %ld",
|
||||
backend->name(), backend->server()->rank(), current_rank);
|
||||
backend->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -144,6 +144,7 @@ private:
|
||||
bool route_stored_query();
|
||||
void close_stale_connections();
|
||||
|
||||
int64_t get_current_rank();
|
||||
mxs::RWBackend* get_hinted_backend(char* name);
|
||||
mxs::RWBackend* get_slave_backend(int max_rlag);
|
||||
mxs::RWBackend* get_master_backend();
|
||||
|
Reference in New Issue
Block a user