MXS-1777 Refactor selection.

This commit refactors slave selection. The comparison is still pair-wise, but it is now isolated into a small run_comparison() function. The function get_slave_candidate(), which is used when new connections are created, was both moved and modified (it had to be moved due to scoping), so its diff does not line up with the old code.
Slave selection for routing: get_slave_backend() now contains the filtering logic from the old get_slave_backend() and from compare_backends(); the latter has been removed.

Backend functions mostly take shared_ptr<SRWBackend> in various forms (as is, by const reference, in a container). Ideally the shared_ptr would be used only where it is really needed, and either naked pointers or references to RWBackend would be used elsewhere. This refactor does not address that issue; it actually compounds it by using even deeper shared_ptr structures. That will be fixed in a future commit.
This commit is contained in:
Niclas Antti 2018-07-30 18:12:56 +03:00
parent d52885d68d
commit de6ad542c4
6 changed files with 306 additions and 243 deletions

View File

@ -20,6 +20,7 @@
#include <maxscale/cdefs.h>
#include <maxbase/jansson.h>
#include <maxbase/average.hh>
#include <maxscale/config.h>
#include <maxscale/dcb.h>
@ -30,7 +31,7 @@ MXS_BEGIN_DECLS
#define MAX_SERVER_MONPW_LEN 1024
#define MAX_SERVER_VERSION_LEN 256
#define MAX_NUM_SLAVES 128 /**< Maximum number of slaves under a single server*/
#define MAX_NUM_SLAVES 128 /**< Maximum number of slaves under a single server*/
/**
* Server configuration parameters names
@ -58,10 +59,10 @@ const int MXS_RLAG_UNDEFINED = -1;
*/
typedef struct server_params
{
char *name; /**< Parameter name */
char *value; /**< Parameter value */
bool active; /**< Whether the parameter is valid */
struct server_params *next; /**< Next Paramter in the linked list */
char* name; /**< Parameter name */
char* value; /**< Parameter value */
bool active; /**< Whether the parameter is valid */
struct server_params* next; /**< Next Paramter in the linked list */
} SERVER_PARAM;
/**
@ -69,13 +70,13 @@ typedef struct server_params
*/
typedef struct
{
int n_connections; /**< Number of connections */
int n_current; /**< Current connections */
int n_current_ops; /**< Current active operations */
int n_persistent; /**< Current persistent pool */
uint64_t n_new_conn; /**< Times the current pool was empty */
uint64_t n_from_pool; /**< Times when a connection was available from the pool */
uint64_t packets; /**< Number of packets routed to this server */
int n_connections; /**< Number of connections */
int n_current; /**< Current connections */
int n_current_ops; /**< Current active operations */
int n_persistent; /**< Current persistent pool */
uint64_t n_new_conn; /**< Times the current pool was empty */
uint64_t n_from_pool; /**< Times when a connection was available from the pool */
uint64_t packets; /**< Number of packets routed to this server */
} SERVER_STATS;
/**
@ -119,49 +120,55 @@ static uint64_t server_encode_version(const SERVER_VERSION* server_version)
typedef struct server
{
// Base settings
char *name; /**< Server config name */
char* name; /**< Server config name */
char address[MAX_SERVER_ADDRESS_LEN]; /**< Server hostname/IP-address */
unsigned short port; /**< Server port */
char *protocol; /**< Backend protocol module name */
char *authenticator; /**< Authenticator module name */
unsigned short port; /**< Server port */
char* protocol; /**< Backend protocol module name */
char* authenticator; /**< Authenticator module name */
// Other settings
char monuser[MAX_SERVER_MONUSER_LEN]; /**< Monitor username, overrides monitor setting */
char monpw[MAX_SERVER_MONPW_LEN]; /**< Monitor password, overrides monitor setting */
long persistpoolmax; /**< Maximum size of persistent connections pool */
long persistmaxtime; /**< Maximum number of seconds connection can live */
bool proxy_protocol; /**< Send proxy-protocol header to backends when connecting
* routing sessions. */
SERVER_PARAM *parameters; /**< Additional custom parameters which may affect routing decisions. */
char monuser[MAX_SERVER_MONUSER_LEN]; /**< Monitor username, overrides monitor setting */
char monpw[MAX_SERVER_MONPW_LEN]; /**< Monitor password, overrides monitor setting */
long persistpoolmax; /**< Maximum size of persistent connections pool */
long persistmaxtime; /**< Maximum number of seconds connection can live */
bool proxy_protocol; /**< Send proxy-protocol header to backends when connecting
* routing sessions. */
SERVER_PARAM* parameters; /**< Additional custom parameters which may affect routing
* decisions. */
// Base variables
SPINLOCK lock; /**< Access lock. Required when modifying server status or settings. */
bool is_active; /**< Server is active and has not been "destroyed" */
void *auth_instance; /**< Authenticator instance data */
SSL_LISTENER *server_ssl; /**< SSL data */
DCB **persistent; /**< List of unused persistent connections to the server */
uint8_t charset; /**< Server character set. Read from backend and sent to client. */
SPINLOCK lock; /**< Access lock. Required when modifying server status or settings. */
bool is_active; /**< Server is active and has not been "destroyed" */
void* auth_instance;/**< Authenticator instance data */
SSL_LISTENER* server_ssl; /**< SSL data */
DCB** persistent; /**< List of unused persistent connections to the server */
uint8_t charset; /**< Server character set. Read from backend and sent to client. */
// Statistics and events
SERVER_STATS stats; /**< The server statistics, e.g. number of connections */
int persistmax; /**< Maximum pool size actually achieved since startup */
int last_event; /**< The last event that occurred on this server */
int64_t triggered_at; /**< Time when the last event was triggered */
// Status descriptors. Updated automatically by a monitor or manually by the admin
uint64_t status; /**< Current status flag bitmap */
int maint_request; /**< Is admin requesting Maintenance=ON/OFF on the server? */
char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string as given by backend */
uint64_t version; /**< Server version numeric representation */
server_type_t server_type; /**< Server type (MariaDB or MySQL), deduced from version string */
long node_id; /**< Node id, server_id for M/S or local_index for Galera */
int rlag; /**< Replication Lag for Master/Slave replication */
unsigned long node_ts; /**< Last timestamp set from M/S monitor module */
long master_id; /**< Master server id of this node */
uint64_t status; /**< Current status flag bitmap */
int maint_request; /**< Is admin requesting Maintenance=ON/OFF on the
* server? */
char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string as given by backend */
uint64_t version; /**< Server version numeric representation */
server_type_t server_type; /**< Server type (MariaDB or MySQL), deduced from
* version string */
long node_id; /**< Node id, server_id for M/S or local_index for
* Galera */
int rlag; /**< Replication Lag for Master/Slave replication
* */
unsigned long node_ts; /**< Last timestamp set from M/S monitor module */
long master_id; /**< Master server id of this node */
// Misc fields
bool master_err_is_logged; /**< If node failed, this indicates whether it is logged. Only used
* by rwsplit. TODO: Move to rwsplit */
bool warn_ssl_not_enabled; /**< SSL not used for an SSL enabled server */
MxsDiskSpaceThreshold* disk_space_threshold; /**< Disk space thresholds */
bool master_err_is_logged; /**< If node failed, this indicates whether it is logged. Only
* used
* by rwsplit. TODO: Move to rwsplit */
bool warn_ssl_not_enabled;/**< SSL not used for an SSL enabled server */
MxsDiskSpaceThreshold* disk_space_threshold;/**< Disk space thresholds */
// TODO, this is a plain ptr to a C++ class. Soonish, when the server is new/deleted
// this will become a std::unique ptr. But not in this commit.
maxbase::EMAverage* response_time; /**< for calculating average response time */
maxbase::EMAverage* response_time; /**< for calculating average response time */
} SERVER;
/**
@ -169,30 +176,53 @@ typedef struct server
* individual bits are independent, not all combinations make sense or are used. The bitfield is 64bits wide.
*/
// Bits used by most monitors
#define SERVER_RUNNING (1 << 0) /**<< The server is up and running */
#define SERVER_MAINT (1 << 1) /**<< Server is in maintenance mode */
#define SERVER_AUTH_ERROR (1 << 2) /**<< Authentication error from monitor */
#define SERVER_MASTER (1 << 3) /**<< The server is a master, i.e. can handle writes */
#define SERVER_SLAVE (1 << 4) /**<< The server is a slave, i.e. can handle reads */
#define SERVER_RUNNING (1 << 0) /**<< The server is up and running */
#define SERVER_MAINT (1 << 1) /**<< Server is in maintenance mode */
#define SERVER_AUTH_ERROR (1 << 2) /**<< Authentication error from monitor */
#define SERVER_MASTER (1 << 3) /**<< The server is a master, i.e. can handle writes */
#define SERVER_SLAVE (1 << 4) /**<< The server is a slave, i.e. can handle reads */
// Bits used by MariaDB Monitor (mostly)
#define SERVER_SLAVE_OF_EXT_MASTER (1 << 5) /**<< Server is slave of a non-monitored master */
#define SERVER_RELAY (1 << 6) /**<< Server is a relay */
#define SERVER_WAS_MASTER (1 << 7) /**<< Server was a master but lost all slaves. */
#define SERVER_SLAVE_OF_EXT_MASTER (1 << 5) /**<< Server is slave of a non-monitored master */
#define SERVER_RELAY (1 << 6) /**<< Server is a relay */
#define SERVER_WAS_MASTER (1 << 7) /**<< Server was a master but lost all slaves. */
// Bits used by other monitors
#define SERVER_JOINED (1 << 8) /**<< The server is joined in a Galera cluster */
#define SERVER_NDB (1 << 9) /**<< The server is part of a MySQL cluster setup */
#define SERVER_MASTER_STICKINESS (1 << 10) /**<< Server Master stickiness */
#define SERVER_JOINED (1 << 8) /**<< The server is joined in a Galera cluster */
#define SERVER_NDB (1 << 9) /**<< The server is part of a MySQL cluster setup */
#define SERVER_MASTER_STICKINESS (1 << 10) /**<< Server Master stickiness */
// Bits providing general information
#define SERVER_DISK_SPACE_EXHAUSTED (1 << 31) /**<< The disk space of the server is exhausted */
#define SERVER_DISK_SPACE_EXHAUSTED (1 << 31) /**<< The disk space of the server is exhausted */
#define STRSRVSTATUS(s) (server_is_master(s) ? "RUNNING MASTER" : \
(server_is_slave(s) ? "RUNNING SLAVE" : \
(server_is_joined(s) ? "RUNNING JOINED" : \
(server_is_ndb(s) ? "RUNNING NDB" : \
((server_is_running(s) && server_is_in_maint(s)) ? "RUNNING MAINTENANCE" : \
(server_is_relay(s) ? "RUNNING RELAY" : \
(server_is_usable(s) ? "RUNNING (only)" : \
(server_is_down(s) ? "DOWN" : "UNKNOWN STATUS"))))))))
#define STRSRVSTATUS(s) \
(server_is_master(s) ? "RUNNING MASTER" \
: (server_is_slave(s) ? "RUNNING SLAVE" \
: (server_is_joined(s) ? "RUNNING JOINED" \
: (server_is_ndb(s) ? \
"RUNNING NDB" \
: (( \
server_is_running( \
s) \
&& \
server_is_in_maint( \
s)) \
? \
"RUNNING MAINTENANCE" \
: ( \
server_is_relay( \
s) \
? \
"RUNNING RELAY" \
: ( \
server_is_usable( \
s) \
? \
"RUNNING (only)" \
: ( \
server_is_down( \
s) \
? \
"DOWN" \
: \
"UNKNOWN STATUS"))))))))
/**
* Is the server valid and active?
@ -223,7 +253,7 @@ inline bool server_is_usable(const SERVER* server)
inline bool status_is_running(uint64_t status)
{
return (status & SERVER_RUNNING);
return status & SERVER_RUNNING;
}
/**
@ -303,8 +333,8 @@ inline bool server_is_slave(const SERVER* server)
inline bool status_is_relay(uint64_t status)
{
return (status & (SERVER_RUNNING | SERVER_RELAY | SERVER_MAINT)) == \
(SERVER_RUNNING | SERVER_RELAY);
return (status & (SERVER_RUNNING | SERVER_RELAY | SERVER_MAINT)) \
== (SERVER_RUNNING | SERVER_RELAY);
}
inline bool server_is_relay(const SERVER* server)
@ -314,8 +344,8 @@ inline bool server_is_relay(const SERVER* server)
inline bool status_is_joined(uint64_t status)
{
return (status & (SERVER_RUNNING | SERVER_JOINED | SERVER_MAINT)) ==
(SERVER_RUNNING | SERVER_JOINED);
return (status & (SERVER_RUNNING | SERVER_JOINED | SERVER_MAINT))
== (SERVER_RUNNING | SERVER_JOINED);
}
/**
@ -341,14 +371,14 @@ inline bool server_is_ndb(const SERVER* server)
inline bool server_is_in_cluster(const SERVER* server)
{
return ((server->status &
(SERVER_MASTER | SERVER_SLAVE | SERVER_RELAY | SERVER_JOINED | SERVER_NDB)) != 0);
return (server->status
& (SERVER_MASTER | SERVER_SLAVE | SERVER_RELAY | SERVER_JOINED | SERVER_NDB)) != 0;
}
inline bool status_is_slave_of_ext_master(uint64_t status)
{
return ((status & (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER)) ==
(SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER));
return (status & (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER))
== (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER);
}
inline bool server_is_slave_of_ext_master(const SERVER* server)
@ -358,7 +388,7 @@ inline bool server_is_slave_of_ext_master(const SERVER* server)
inline bool status_is_disk_space_exhausted(uint64_t status)
{
return (status & SERVER_DISK_SPACE_EXHAUSTED);
return status & SERVER_DISK_SPACE_EXHAUSTED;
}
inline bool server_is_disk_space_exhausted(const SERVER* server)
@ -378,7 +408,7 @@ inline bool server_is_disk_space_exhausted(const SERVER* server)
*
* @return The newly created server or NULL if an error occurred
*/
extern SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params);
extern SERVER* server_alloc(const char* name, MXS_CONFIG_PARAMETER* params);
/**
* @brief Serialize a server to a file
@ -390,7 +420,16 @@ extern SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params);
* @param server Server to serialize
* @return False if the serialization of the server fails, true if it was successful
*/
bool server_serialize(const SERVER *server);
bool server_serialize(const SERVER* server);
/**
* @brief Add a server parameter
*
* @param server Server where the parameter is added
* @param name Parameter name
* @param value Parameter value
*/
void server_add_parameter(SERVER* server, const char* name, const char* value);
/**
* @brief Remove a server parameter
@ -399,7 +438,7 @@ bool server_serialize(const SERVER *server);
* @param name The name of the parameter to remove
* @return True if a parameter was removed
*/
bool server_remove_parameter(SERVER *server, const char *name);
bool server_remove_parameter(SERVER* server, const char* name);
/**
* @brief Set server parameter
@ -416,7 +455,7 @@ void server_set_parameter(SERVER *server, const char *name, const char *value);
* @param server Server to check
* @return True if the server points to a local MaxScale service
*/
bool server_is_mxs_service(const SERVER *server);
bool server_is_mxs_service(const SERVER* server);
/**
* @brief Convert a server to JSON format
@ -445,7 +484,7 @@ json_t* server_list_to_json(const char* host);
*
* @return True, if the provided string is valid and the threshold could be set.
*/
bool server_set_disk_space_threshold(SERVER *server, const char *disk_space_threshold);
bool server_set_disk_space_threshold(SERVER* server, const char* disk_space_threshold);
/**
* @brief Add a response average to the server response average.
@ -455,35 +494,38 @@ bool server_set_disk_space_threshold(SERVER *server, const char *disk_space_thre
* @param num_samples Number of samples the average consists of.
*
*/
void server_add_response_average(SERVER *server, double ave, int num_samples);
void server_add_response_average(SERVER* server, double ave, int num_samples);
extern int server_free(SERVER *server);
extern SERVER *server_find_by_unique_name(const char *name);
extern int server_find_by_unique_names(char **server_names, int size, SERVER*** output);
extern SERVER *server_find(const char *servname, unsigned short port);
extern char *server_status(const SERVER *);
extern void server_clear_set_status_nolock(SERVER *server, uint64_t bits_to_clear, uint64_t bits_to_set);
extern void server_set_status_nolock(SERVER *server, uint64_t bit);
extern void server_clear_status_nolock(SERVER *server, uint64_t bit);
extern void server_transfer_status(SERVER *dest_server, const SERVER *source_server);
extern void server_add_mon_user(SERVER *server, const char *user, const char *passwd);
extern size_t server_get_parameter(const SERVER *server, const char *name, char* out, size_t size);
extern void server_update_credentials(SERVER *server, const char *user, const char *passwd);
extern DCB* server_get_persistent(SERVER *server, const char *user, const char* ip, const char *protocol,
int id);
extern void server_update_address(SERVER *server, const char *address);
extern void server_update_port(SERVER *server, unsigned short port);
extern uint64_t server_map_status(const char *str);
extern void server_set_version_string(SERVER* server, const char* version_string);
extern void server_set_version(SERVER* server, const char* version_string, uint64_t version);
extern int server_free(SERVER* server);
extern SERVER* server_find_by_unique_name(const char* name);
extern int server_find_by_unique_names(char** server_names, int size, SERVER*** output);
extern SERVER* server_find(const char* servname, unsigned short port);
extern char* server_status(const SERVER*);
extern void server_clear_set_status_nolock(SERVER* server, uint64_t bits_to_clear, uint64_t bits_to_set);
extern void server_set_status_nolock(SERVER* server, uint64_t bit);
extern void server_clear_status_nolock(SERVER* server, uint64_t bit);
extern void server_transfer_status(SERVER* dest_server, const SERVER* source_server);
extern void server_add_mon_user(SERVER* server, const char* user, const char* passwd);
extern size_t server_get_parameter(const SERVER* server, const char* name, char* out, size_t size);
extern void server_update_credentials(SERVER* server, const char* user, const char* passwd);
extern DCB* server_get_persistent(SERVER* server,
const char* user,
const char* ip,
const char* protocol,
int id);
extern void server_update_address(SERVER* server, const char* address);
extern void server_update_port(SERVER* server, unsigned short port);
extern uint64_t server_map_status(const char* str);
extern void server_set_version_string(SERVER* server, const char* version_string);
extern void server_set_version(SERVER* server, const char* version_string, uint64_t version);
extern uint64_t server_get_version(const SERVER* server);
extern void printServer(const SERVER *);
extern void printServer(const SERVER*);
extern void printAllServers();
extern void dprintAllServers(DCB *);
extern void dprintAllServersJson(DCB *);
extern void dprintServer(DCB *, const SERVER *);
extern void dprintPersistentDCBs(DCB *, const SERVER *);
extern void dListServers(DCB *);
extern void dprintAllServers(DCB*);
extern void dprintAllServersJson(DCB*);
extern void dprintServer(DCB*, const SERVER*);
extern void dprintPersistentDCBs(DCB*, const SERVER*);
extern void dListServers(DCB*);
MXS_END_DECLS

View File

@ -382,6 +382,25 @@ mxs::SRWBackend get_root_master(const mxs::SRWBackendList& backends);
*/
std::pair<int, int> get_slave_counts(mxs::SRWBackendList& backends, mxs::SRWBackend& master);
/* TODO, hopefully temporary */
using BackendSPtrVec = std::vector<mxs::SRWBackend*>;
/**
* Find the best backend based on categorizing the servers, and then applying
* selection criteria to the best category.
*
* @param backends: vector of pointers to SRWBackend
* @param sc: which select_criteria_t to use
* @param masters_accept_reads: NOTE: even if this is false, in some cases a master can
* still be selected for reads.
*
* @return Valid iterator into argument backends, or end(backends) if empty
*/
BackendSPtrVec::const_iterator find_best_backend(const BackendSPtrVec& backends,
select_criteria_t sc,
bool masters_accept_reads);
/*
* The following are implemented in rwsplit_tmp_table_multi.c
*/

View File

@ -49,7 +49,7 @@ void ResponseStat::query_ended()
if (++m_sample_count == m_num_filter_samples)
{
std::sort(begin(m_samples), end(m_samples));
std::sort(m_samples.begin(), m_samples.end());
maxbase::Duration new_sample = m_samples[m_num_filter_samples / 2];
m_average.add(std::chrono::duration<double>(new_sample).count());
m_sample_count = 0;

View File

@ -12,8 +12,7 @@
*/
#pragma once
#include <maxscale/cppdefs.hh>
#include <maxscale/ccdefs.hh>
#include <maxbase/stopwatch.hh>
#include <maxbase/average.hh>
@ -22,7 +21,6 @@
*/
namespace maxscale
{
/**
* Query response statistics. Uses median of N samples to filter noise, then
* uses those medians to calculate the average response time.
@ -39,23 +37,24 @@ public:
* @param num_synch_samples - this many medians before the average should be synced, or
* @param sync_duration - this much time between syncs.
*/
ResponseStat(int ignore_first_n = 5, int num_filter_samples = 3,
ResponseStat(int ignore_first_n = 5,
int num_filter_samples = 3,
maxbase::Duration sync_duration = std::chrono::seconds(5));
void query_started();
void query_ended(); // ok to call without a query_started
bool is_valid() const;
int num_samples() const;
void query_started();
void query_ended();// ok to call without a query_started
bool is_valid() const;
int num_samples() const;
maxbase::Duration average() const;
bool sync_time_reached(int num_synch_medians); // is it time to apply the average?
void reset();
bool sync_time_reached(int num_synch_medians); // is it time to apply the average?
void reset();
private:
int m_ignore_first_n;
const int m_num_filter_samples;
const maxbase::Duration m_sync_duration;
int m_sample_count;
std::vector<maxbase::Duration> m_samples; // N sampels from which median is used
maxbase::CumulativeAverage m_average;
maxbase::TimePoint m_last_start;
maxbase::TimePoint m_next_sync;
int m_ignore_first_n;
const int m_num_filter_samples;
const maxbase::Duration m_sync_duration;
int m_sample_count;
std::vector<maxbase::Duration> m_samples; // N sampels from which median is used
maxbase::CumulativeAverage m_average;
maxbase::TimePoint m_last_start;
maxbase::TimePoint m_next_sync;
};
}

View File

@ -38,47 +38,6 @@ using namespace maxscale;
extern int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&);
/**
 * Pick the preferred one of two backends.
 *
 * Decision order:
 *  1. If one of the references is empty, the other one is returned.
 *  2. A backend that is in use and has session commands pending counts as
 *     busy; if exactly one of the two is busy, the other one is returned.
 *  3. Otherwise the comparison function registered for @c sc decides;
 *     @c a is kept unless the function reports @c a as strictly greater.
 *
 * @param a  Previously selected candidate
 * @param b  Challenger
 * @param sc Select criteria, used as index into criteria_cmpfun
 *
 * @return The preferred backend reference (may be empty if both were empty)
 */
static SRWBackend compare_backends(SRWBackend a, SRWBackend b, select_criteria_t sc)
{
    // A missing contender loses by default
    if (!a)
    {
        return b;
    }

    if (!b)
    {
        return a;
    }

    // Prefer servers that are not busy executing session commands
    const bool a_busy = a->in_use() && a->has_session_commands();
    const bool b_busy = b->in_use() && b->has_session_commands();

    if (a_busy != b_busy)
    {
        return a_busy ? b : a;
    }

    // Same busy state: let the configured criteria decide, ties go to a
    auto cmp = criteria_cmpfun[sc];
    return cmp(a, b) > 0 ? b : a;
}
void RWSplitSession::handle_connection_keepalive(SRWBackend& target)
{
mxb_assert(target);
@ -582,45 +541,36 @@ SRWBackend RWSplitSession::get_hinted_backend(char *name)
SRWBackend RWSplitSession::get_slave_backend(int max_rlag)
{
SRWBackend rval;
// create a list of useable backends (includes masters, function name is a bit off),
// then feed that list to compare.
BackendSPtrVec candidates;
auto counts = get_slave_counts(m_backends, m_current_master);
for (auto it = m_backends.begin(); it != m_backends.end(); it++)
for (auto& backend : m_backends)
{
auto& backend = *it;
bool can_take_slave_into_use = backend->is_slave()
&& !backend->in_use()
&& can_recover_servers()
&& backend->can_connect()
&& counts.second < m_router->max_slave_count();
if ((backend->is_master() || backend->is_slave()) && // Either a master or a slave
rpl_lag_is_ok(backend, max_rlag)) // Not lagging too much
bool master_or_slave = backend->is_master() || backend->is_slave();
bool is_useable = backend->in_use() || can_take_slave_into_use;
bool not_a_slacker = rpl_lag_is_ok(backend, max_rlag);
bool server_is_candidate = master_or_slave && is_useable && not_a_slacker;
if (server_is_candidate)
{
if (backend->in_use() || (can_recover_servers() && backend->can_connect()))
{
if (!rval)
{
// No previous candidate, accept any valid server (includes master)
if ((backend->is_master() && backend == m_current_master) ||
backend->is_slave())
{
rval = backend;
}
}
else if (backend->in_use() || counts.second < m_router->max_slave_count())
{
if (!m_config.master_accept_reads && rval->is_master())
{
// Pick slaves over masters with master_accept_reads=false
rval = backend;
}
else
{
// Compare the two servers and pick the best one
rval = compare_backends(rval, backend, m_config.slave_selection_criteria);
}
}
}
candidates.push_back(&backend);
}
}
return rval;
BackendSPtrVec::const_iterator rval = find_best_backend(candidates,
m_config.slave_selection_criteria,
m_config.master_accept_reads);
return (rval == candidates.end()) ? SRWBackend() : **rval;
}
SRWBackend RWSplitSession::get_master_backend()

View File

@ -45,48 +45,6 @@ static bool valid_for_slave(const SRWBackend& backend, const SRWBackend& master)
(!master || backend != master);
}
/**
 * @brief Find the best slave candidate
 *
 * Walks through @c backends and considers every backend that is not in use,
 * can be connected to and passes valid_for_slave() against @c master. The
 * first such backend becomes the candidate; a later one replaces it only when
 * @c cmpfun(candidate, later) returns a value greater than zero.
 *
 * @param backends List of backends to choose from
 * @param master   The master server
 * @param cmpfun   Comparison function for two backends
 *
 * @return The best slave backend reference, or an empty reference if no
 *         candidates could be found
 */
static SRWBackend get_slave_candidate(const SRWBackendList& backends, const SRWBackend& master,
                                      int (*cmpfun)(const SRWBackend&, const SRWBackend&))
{
    SRWBackend best;

    for (const auto& contender : backends)
    {
        const bool eligible = !contender->in_use()
            && contender->can_connect()
            && valid_for_slave(contender, master);

        if (!eligible)
        {
            continue;
        }

        // First eligible backend wins outright; later ones must beat it
        if (!best || cmpfun(best, contender) > 0)
        {
            best = contender;
        }
    }

    return best;
}
/** Compare number of connections from this router in backend servers */
static int backend_cmp_router_conn(const SRWBackend& a, const SRWBackend& b)
{
@ -223,6 +181,104 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) =
backend_cmp_response_time
};
// Pair-wise selection, which is still the current compare method: the first
// candidate is the initial winner and each following candidate replaces it only
// when the comparison function for 'sc' returns a value greater than zero.
// The response-time compare, along with anything using weights, has to change to
// use the whole array at once to be correct. Id est, everything will change to
// use the whole array in the next iteration.
//
// Returns an iterator into 'candidates', or candidates.end() if it is empty.
static BackendSPtrVec::const_iterator run_comparison(const BackendSPtrVec& candidates,
                                                     select_criteria_t sc)
{
    auto winner = candidates.begin();
    const auto last = candidates.end();

    if (winner == last)
    {
        return last;
    }

    const auto cmp = criteria_cmpfun[sc];

    for (auto challenger = winner + 1; challenger != last; ++challenger)
    {
        if (cmp(**winner, **challenger) > 0)
        {
            winner = challenger;
        }
    }

    return winner;
}
/**
 * @brief Find the best slave candidate for a new connection.
 *
 * A backend is a candidate when it is not in use, can be connected to and
 * passes valid_for_slave() against @c master. The candidates are then handed
 * to run_comparison().
 *
 * @param bends  backends
 * @param master the master server
 * @param sc     which select_criteria_t to use
 *
 * @return The best slave backend reference or null if no candidates could be found
 */
static SRWBackend get_slave_candidate(const SRWBackendList& bends,
                                      const SRWBackend& master,
                                      select_criteria_t sc)
{
    // TODO, nantti, see if this and get_slave_backend can be combined to a single function
    BackendSPtrVec candidates;

    for (const auto& b : bends)
    {
        const bool eligible = !b->in_use()
            && b->can_connect()
            && valid_for_slave(b, master);

        if (eligible)
        {
            // BackendSPtrVec stores non-const pointers, so the const is cast
            // away to match interfaces. TODO, should go away in the future.
            candidates.push_back(const_cast<SRWBackend*>(&b));
        }
    }

    if (candidates.empty())
    {
        return SRWBackend();
    }

    return **run_comparison(candidates, sc);
}
/**
 * Find the best backend by first sorting the backends into priority classes and
 * then letting only the members of the best (numerically lowest) class compete
 * in run_comparison().
 *
 * A backend is "busy" when it is in use and has session commands pending. It
 * "acts as a slave" when it is a slave, or when it is a master and
 * @c masters_accept_reads is true.
 *
 * Priority classes (low number wins):
 *   1  - acts as a slave and is not busy
 *   2  - does not act as a slave; the busy state is not looked at
 *   13 - acts as a slave but is busy
 *
 * @param backends             Backends to choose from
 * @param sc                   Which select_criteria_t to use
 * @param masters_accept_reads Whether a master counts as acting as a slave
 *
 * @return Iterator into @c backends pointing at the chosen backend, or
 *         backends.end() if @c backends is empty
 */
BackendSPtrVec::const_iterator find_best_backend(const BackendSPtrVec& backends,
                                                 select_criteria_t sc,
                                                 bool masters_accept_reads)
{
    // Without this guard the code below would dereference the end iterator of an
    // empty priority class.
    if (backends.empty())
    {
        return backends.end();
    }

    // Divide backends to priorities. The set of highest priority backends will then compete.
    std::map<int, BackendSPtrVec> priority_map;
    int best_priority {INT_MAX};    // low numbers are high priority

    for (auto& pSBackend : backends)
    {
        auto& backend = **pSBackend;
        bool is_busy = backend.in_use() && backend.has_session_commands();
        bool acts_slave = backend.is_slave() || (backend.is_master() && masters_accept_reads);

        int priority;
        if (acts_slave)
        {
            if (!is_busy)
            {
                priority = 1;   // highest priority, idle servers
            }
            else
            {
                priority = 13;  // lowest priority, busy servers
            }
        }
        else
        {
            priority = 2;   // not acting as a slave, busy or not
        }

        priority_map[priority].push_back(pSBackend);
        best_priority = std::min(best_priority, priority);
    }

    // backends is non-empty here, so the best class holds at least one entry and
    // run_comparison() returns a dereferenceable iterator.
    auto best = run_comparison(priority_map[best_priority], sc);

    // Translate the winner back to an iterator into the caller's container.
    return std::find(backends.begin(), backends.end(), *best);
}
/**
* @brief Log server connections
*
@ -351,10 +407,7 @@ bool RWSplit::select_connect_backend_servers(MXS_SESSION *session,
return false;
}
/** Check slave selection criteria and set compare function */
select_criteria_t select_criteria = cnf.slave_selection_criteria;
auto cmpfun = criteria_cmpfun[select_criteria];
mxb_assert(cmpfun);
auto select_criteria = cnf.slave_selection_criteria;
if (mxs_log_is_priority_enabled(LOG_INFO))
{
@ -389,9 +442,9 @@ bool RWSplit::select_connect_backend_servers(MXS_SESSION *session,
if (slaves_connected < max_nslaves)
{
/** Connect to all possible slaves */
for (SRWBackend backend(get_slave_candidate(backends, master, cmpfun));
for (SRWBackend backend(get_slave_candidate(backends, master, select_criteria));
backend && slaves_connected < max_nslaves;
backend = get_slave_candidate(backends, master, cmpfun))
backend = get_slave_candidate(backends, master, select_criteria))
{
if (backend->can_connect() && backend->connect(session, sescmd_list))
{