diff --git a/include/maxscale/server.h b/include/maxscale/server.h index 0507d1d64..4ba6fc314 100644 --- a/include/maxscale/server.h +++ b/include/maxscale/server.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -30,7 +31,7 @@ MXS_BEGIN_DECLS #define MAX_SERVER_MONPW_LEN 1024 #define MAX_SERVER_VERSION_LEN 256 -#define MAX_NUM_SLAVES 128 /**< Maximum number of slaves under a single server*/ +#define MAX_NUM_SLAVES 128 /**< Maximum number of slaves under a single server*/ /** * Server configuration parameters names @@ -58,10 +59,10 @@ const int MXS_RLAG_UNDEFINED = -1; */ typedef struct server_params { - char *name; /**< Parameter name */ - char *value; /**< Parameter value */ - bool active; /**< Whether the parameter is valid */ - struct server_params *next; /**< Next Paramter in the linked list */ + char* name; /**< Parameter name */ + char* value; /**< Parameter value */ + bool active; /**< Whether the parameter is valid */ + struct server_params* next; /**< Next Paramter in the linked list */ } SERVER_PARAM; /** @@ -69,13 +70,13 @@ typedef struct server_params */ typedef struct { - int n_connections; /**< Number of connections */ - int n_current; /**< Current connections */ - int n_current_ops; /**< Current active operations */ - int n_persistent; /**< Current persistent pool */ - uint64_t n_new_conn; /**< Times the current pool was empty */ - uint64_t n_from_pool; /**< Times when a connection was available from the pool */ - uint64_t packets; /**< Number of packets routed to this server */ + int n_connections; /**< Number of connections */ + int n_current; /**< Current connections */ + int n_current_ops; /**< Current active operations */ + int n_persistent; /**< Current persistent pool */ + uint64_t n_new_conn; /**< Times the current pool was empty */ + uint64_t n_from_pool; /**< Times when a connection was available from the pool */ + uint64_t packets; /**< Number of packets routed to this server */ } SERVER_STATS; /** @@ -119,49 +120,55 @@ static uint64_t server_encode_version(const SERVER_VERSION* server_version) typedef struct server { // Base settings - char *name; /**< Server config name */ + char* name; /**< Server config name */ char address[MAX_SERVER_ADDRESS_LEN]; /**< Server hostname/IP-address */ - unsigned short port; /**< Server port */ - char *protocol; /**< Backend protocol module name */ - char *authenticator; /**< Authenticator module name */ + unsigned short port; /**< Server port */ + char* protocol; /**< Backend protocol module name */ + char* authenticator; /**< Authenticator module name */ // Other settings - char monuser[MAX_SERVER_MONUSER_LEN]; /**< Monitor username, overrides monitor setting */ - char monpw[MAX_SERVER_MONPW_LEN]; /**< Monitor password, overrides monitor setting */ - long persistpoolmax; /**< Maximum size of persistent connections pool */ - long persistmaxtime; /**< Maximum number of seconds connection can live */ - bool proxy_protocol; /**< Send proxy-protocol header to backends when connecting - * routing sessions. */ - SERVER_PARAM *parameters; /**< Additional custom parameters which may affect routing decisions. */ + char monuser[MAX_SERVER_MONUSER_LEN]; /**< Monitor username, overrides monitor setting */ + char monpw[MAX_SERVER_MONPW_LEN]; /**< Monitor password, overrides monitor setting */ + long persistpoolmax; /**< Maximum size of persistent connections pool */ + long persistmaxtime; /**< Maximum number of seconds connection can live */ + bool proxy_protocol; /**< Send proxy-protocol header to backends when connecting + * routing sessions. */ + SERVER_PARAM* parameters; /**< Additional custom parameters which may affect routing + * decisions. */ // Base variables - SPINLOCK lock; /**< Access lock. Required when modifying server status or settings. */ - bool is_active; /**< Server is active and has not been "destroyed" */ - void *auth_instance; /**< Authenticator instance data */ - SSL_LISTENER *server_ssl; /**< SSL data */ - DCB **persistent; /**< List of unused persistent connections to the server */ - uint8_t charset; /**< Server character set. Read from backend and sent to client. */ + SPINLOCK lock; /**< Access lock. Required when modifying server status or settings. */ + bool is_active; /**< Server is active and has not been "destroyed" */ + void* auth_instance;/**< Authenticator instance data */ + SSL_LISTENER* server_ssl; /**< SSL data */ + DCB** persistent; /**< List of unused persistent connections to the server */ + uint8_t charset; /**< Server character set. Read from backend and sent to client. */ // Statistics and events SERVER_STATS stats; /**< The server statistics, e.g. number of connections */ int persistmax; /**< Maximum pool size actually achieved since startup */ int last_event; /**< The last event that occurred on this server */ int64_t triggered_at; /**< Time when the last event was triggered */ // Status descriptors. Updated automatically by a monitor or manually by the admin - uint64_t status; /**< Current status flag bitmap */ - int maint_request; /**< Is admin requesting Maintenance=ON/OFF on the server? */ - char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string as given by backend */ - uint64_t version; /**< Server version numeric representation */ - server_type_t server_type; /**< Server type (MariaDB or MySQL), deduced from version string */ - long node_id; /**< Node id, server_id for M/S or local_index for Galera */ - int rlag; /**< Replication Lag for Master/Slave replication */ - unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ - long master_id; /**< Master server id of this node */ + uint64_t status; /**< Current status flag bitmap */ + int maint_request; /**< Is admin requesting Maintenance=ON/OFF on the + * server? */ + char version_string[MAX_SERVER_VERSION_LEN]; /**< Server version string as given by backend */ + uint64_t version; /**< Server version numeric representation */ + server_type_t server_type; /**< Server type (MariaDB or MySQL), deduced from + * version string */ + long node_id; /**< Node id, server_id for M/S or local_index for + * Galera */ + int rlag; /**< Replication Lag for Master/Slave replication + * */ + unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ + long master_id; /**< Master server id of this node */ // Misc fields - bool master_err_is_logged; /**< If node failed, this indicates whether it is logged. Only used - * by rwsplit. TODO: Move to rwsplit */ - bool warn_ssl_not_enabled; /**< SSL not used for an SSL enabled server */ - MxsDiskSpaceThreshold* disk_space_threshold; /**< Disk space thresholds */ + bool master_err_is_logged; /**< If node failed, this indicates whether it is logged. Only + * used + * by rwsplit. TODO: Move to rwsplit */ + bool warn_ssl_not_enabled;/**< SSL not used for an SSL enabled server */ + MxsDiskSpaceThreshold* disk_space_threshold;/**< Disk space thresholds */ // TODO, this is a plain ptr to a C++ class. Soonish, when the server is new/deleted // this will become a std::unique ptr. But not in this commit. - maxbase::EMAverage* response_time; /**< for calculating average response time */ + maxbase::EMAverage* response_time; /**< for calculating average response time */ } SERVER; /** @@ -169,30 +176,53 @@ typedef struct server * individual bits are independent, not all combinations make sense or are used. The bitfield is 64bits wide. */ // Bits used by most monitors -#define SERVER_RUNNING (1 << 0) /**<< The server is up and running */ -#define SERVER_MAINT (1 << 1) /**<< Server is in maintenance mode */ -#define SERVER_AUTH_ERROR (1 << 2) /**<< Authentication error from monitor */ -#define SERVER_MASTER (1 << 3) /**<< The server is a master, i.e. can handle writes */ -#define SERVER_SLAVE (1 << 4) /**<< The server is a slave, i.e. can handle reads */ +#define SERVER_RUNNING (1 << 0) /**<< The server is up and running */ +#define SERVER_MAINT (1 << 1) /**<< Server is in maintenance mode */ +#define SERVER_AUTH_ERROR (1 << 2) /**<< Authentication error from monitor */ +#define SERVER_MASTER (1 << 3) /**<< The server is a master, i.e. can handle writes */ +#define SERVER_SLAVE (1 << 4) /**<< The server is a slave, i.e. can handle reads */ // Bits used by MariaDB Monitor (mostly) -#define SERVER_SLAVE_OF_EXT_MASTER (1 << 5) /**<< Server is slave of a non-monitored master */ -#define SERVER_RELAY (1 << 6) /**<< Server is a relay */ -#define SERVER_WAS_MASTER (1 << 7) /**<< Server was a master but lost all slaves. */ +#define SERVER_SLAVE_OF_EXT_MASTER (1 << 5) /**<< Server is slave of a non-monitored master */ +#define SERVER_RELAY (1 << 6) /**<< Server is a relay */ +#define SERVER_WAS_MASTER (1 << 7) /**<< Server was a master but lost all slaves. */ // Bits used by other monitors -#define SERVER_JOINED (1 << 8) /**<< The server is joined in a Galera cluster */ -#define SERVER_NDB (1 << 9) /**<< The server is part of a MySQL cluster setup */ -#define SERVER_MASTER_STICKINESS (1 << 10) /**<< Server Master stickiness */ +#define SERVER_JOINED (1 << 8) /**<< The server is joined in a Galera cluster */ +#define SERVER_NDB (1 << 9) /**<< The server is part of a MySQL cluster setup */ +#define SERVER_MASTER_STICKINESS (1 << 10) /**<< Server Master stickiness */ // Bits providing general information -#define SERVER_DISK_SPACE_EXHAUSTED (1 << 31) /**<< The disk space of the server is exhausted */ +#define SERVER_DISK_SPACE_EXHAUSTED (1 << 31) /**<< The disk space of the server is exhausted */ -#define STRSRVSTATUS(s) (server_is_master(s) ? "RUNNING MASTER" : \ - (server_is_slave(s) ? "RUNNING SLAVE" : \ - (server_is_joined(s) ? "RUNNING JOINED" : \ - (server_is_ndb(s) ? "RUNNING NDB" : \ - ((server_is_running(s) && server_is_in_maint(s)) ? "RUNNING MAINTENANCE" : \ - (server_is_relay(s) ? "RUNNING RELAY" : \ - (server_is_usable(s) ? "RUNNING (only)" : \ - (server_is_down(s) ? "DOWN" : "UNKNOWN STATUS")))))))) +#define STRSRVSTATUS(s) \ + (server_is_master(s) ? "RUNNING MASTER" \ + : (server_is_slave(s) ? "RUNNING SLAVE" \ + : (server_is_joined(s) ? "RUNNING JOINED" \ + : (server_is_ndb(s) ? \ + "RUNNING NDB" \ + : (( \ + server_is_running( \ + s) \ + && \ + server_is_in_maint( \ + s)) \ + ? \ + "RUNNING MAINTENANCE" \ + : ( \ + server_is_relay( \ + s) \ + ? \ + "RUNNING RELAY" \ + : ( \ + server_is_usable( \ + s) \ + ? \ + "RUNNING (only)" \ + : ( \ + server_is_down( \ + s) \ + ? \ + "DOWN" \ + : \ + "UNKNOWN STATUS")))))))) /** * Is the server valid and active? @@ -223,7 +253,7 @@ inline bool server_is_usable(const SERVER* server) inline bool status_is_running(uint64_t status) { - return (status & SERVER_RUNNING); + return status & SERVER_RUNNING; } /** @@ -303,8 +333,8 @@ inline bool server_is_slave(const SERVER* server) inline bool status_is_relay(uint64_t status) { - return (status & (SERVER_RUNNING | SERVER_RELAY | SERVER_MAINT)) == \ - (SERVER_RUNNING | SERVER_RELAY); + return (status & (SERVER_RUNNING | SERVER_RELAY | SERVER_MAINT)) \ + == (SERVER_RUNNING | SERVER_RELAY); } inline bool server_is_relay(const SERVER* server) @@ -314,8 +344,8 @@ inline bool server_is_relay(const SERVER* server) inline bool status_is_joined(uint64_t status) { - return (status & (SERVER_RUNNING | SERVER_JOINED | SERVER_MAINT)) == - (SERVER_RUNNING | SERVER_JOINED); + return (status & (SERVER_RUNNING | SERVER_JOINED | SERVER_MAINT)) + == (SERVER_RUNNING | SERVER_JOINED); } /** @@ -341,14 +371,14 @@ inline bool server_is_ndb(const SERVER* server) inline bool server_is_in_cluster(const SERVER* server) { - return ((server->status & - (SERVER_MASTER | SERVER_SLAVE | SERVER_RELAY | SERVER_JOINED | SERVER_NDB)) != 0); + return (server->status + & (SERVER_MASTER | SERVER_SLAVE | SERVER_RELAY | SERVER_JOINED | SERVER_NDB)) != 0; } inline bool status_is_slave_of_ext_master(uint64_t status) { - return ((status & (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER)) == - (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER)); + return (status & (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER)) + == (SERVER_RUNNING | SERVER_SLAVE_OF_EXT_MASTER); } inline bool server_is_slave_of_ext_master(const SERVER* server) @@ -358,7 +388,7 @@ inline bool server_is_slave_of_ext_master(const SERVER* server) inline bool status_is_disk_space_exhausted(uint64_t status) { - return (status & SERVER_DISK_SPACE_EXHAUSTED); + return status & SERVER_DISK_SPACE_EXHAUSTED; } inline bool server_is_disk_space_exhausted(const SERVER* server) @@ -378,7 +408,7 @@ inline bool server_is_disk_space_exhausted(const SERVER* server) * * @return The newly created server or NULL if an error occurred */ -extern SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params); +extern SERVER* server_alloc(const char* name, MXS_CONFIG_PARAMETER* params); /** * @brief Serialize a server to a file @@ -390,7 +420,16 @@ extern SERVER* server_alloc(const char *name, MXS_CONFIG_PARAMETER* params); * @param server Server to serialize * @return False if the serialization of the server fails, true if it was successful */ -bool server_serialize(const SERVER *server); +bool server_serialize(const SERVER* server); + +/** + * @brief Add a server parameter + * + * @param server Server where the parameter is added + * @param name Parameter name + * @param value Parameter value + */ +void server_add_parameter(SERVER* server, const char* name, const char* value); /** * @brief Remove a server parameter @@ -399,7 +438,7 @@ bool server_serialize(const SERVER *server); * @param name The name of the parameter to remove * @return True if a parameter was removed */ -bool server_remove_parameter(SERVER *server, const char *name); +bool server_remove_parameter(SERVER* server, const char* name); /** * @brief Set server parameter @@ -416,7 +455,7 @@ void server_set_parameter(SERVER *server, const char *name, const char *value); * @param server Server to check * @return True if the server points to a local MaxScale service */ -bool server_is_mxs_service(const SERVER *server); +bool server_is_mxs_service(const SERVER* server); /** * @brief Convert a server to JSON format @@ -445,7 +484,7 @@ json_t* server_list_to_json(const char* host); * * @return True, if the provided string is valid and the threshold could be set. */ -bool server_set_disk_space_threshold(SERVER *server, const char *disk_space_threshold); +bool server_set_disk_space_threshold(SERVER* server, const char* disk_space_threshold); /** * @brief Add a response average to the server response average. @@ -455,35 +494,38 @@ bool server_set_disk_space_threshold(SERVER *server, const char *disk_space_thre * @param num_samples Number of samples the average consists of. * */ -void server_add_response_average(SERVER *server, double ave, int num_samples); +void server_add_response_average(SERVER* server, double ave, int num_samples); -extern int server_free(SERVER *server); -extern SERVER *server_find_by_unique_name(const char *name); -extern int server_find_by_unique_names(char **server_names, int size, SERVER*** output); -extern SERVER *server_find(const char *servname, unsigned short port); -extern char *server_status(const SERVER *); -extern void server_clear_set_status_nolock(SERVER *server, uint64_t bits_to_clear, uint64_t bits_to_set); -extern void server_set_status_nolock(SERVER *server, uint64_t bit); -extern void server_clear_status_nolock(SERVER *server, uint64_t bit); -extern void server_transfer_status(SERVER *dest_server, const SERVER *source_server); -extern void server_add_mon_user(SERVER *server, const char *user, const char *passwd); -extern size_t server_get_parameter(const SERVER *server, const char *name, char* out, size_t size); -extern void server_update_credentials(SERVER *server, const char *user, const char *passwd); -extern DCB* server_get_persistent(SERVER *server, const char *user, const char* ip, const char *protocol, - int id); -extern void server_update_address(SERVER *server, const char *address); -extern void server_update_port(SERVER *server, unsigned short port); -extern uint64_t server_map_status(const char *str); -extern void server_set_version_string(SERVER* server, const char* version_string); -extern void server_set_version(SERVER* server, const char* version_string, uint64_t version); +extern int server_free(SERVER* server); +extern SERVER* server_find_by_unique_name(const char* name); +extern int server_find_by_unique_names(char** server_names, int size, SERVER*** output); +extern SERVER* server_find(const char* servname, unsigned short port); +extern char* server_status(const SERVER*); +extern void server_clear_set_status_nolock(SERVER* server, uint64_t bits_to_clear, uint64_t bits_to_set); +extern void server_set_status_nolock(SERVER* server, uint64_t bit); +extern void server_clear_status_nolock(SERVER* server, uint64_t bit); +extern void server_transfer_status(SERVER* dest_server, const SERVER* source_server); +extern void server_add_mon_user(SERVER* server, const char* user, const char* passwd); +extern size_t server_get_parameter(const SERVER* server, const char* name, char* out, size_t size); +extern void server_update_credentials(SERVER* server, const char* user, const char* passwd); +extern DCB* server_get_persistent(SERVER* server, + const char* user, + const char* ip, + const char* protocol, + int id); +extern void server_update_address(SERVER* server, const char* address); +extern void server_update_port(SERVER* server, unsigned short port); +extern uint64_t server_map_status(const char* str); +extern void server_set_version_string(SERVER* server, const char* version_string); +extern void server_set_version(SERVER* server, const char* version_string, uint64_t version); extern uint64_t server_get_version(const SERVER* server); -extern void printServer(const SERVER *); +extern void printServer(const SERVER*); extern void printAllServers(); -extern void dprintAllServers(DCB *); -extern void dprintAllServersJson(DCB *); -extern void dprintServer(DCB *, const SERVER *); -extern void dprintPersistentDCBs(DCB *, const SERVER *); -extern void dListServers(DCB *); +extern void dprintAllServers(DCB*); +extern void dprintAllServersJson(DCB*); +extern void dprintServer(DCB*, const SERVER*); +extern void dprintPersistentDCBs(DCB*, const SERVER*); +extern void dListServers(DCB*); MXS_END_DECLS diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index ff6320967..fd721a126 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -382,6 +382,25 @@ mxs::SRWBackend get_root_master(const mxs::SRWBackendList& backends); */ std::pair get_slave_counts(mxs::SRWBackendList& backends, mxs::SRWBackend& master); + +/* TODO, hopefully temporary */ +using BackendSPtrVec = std::vector; + +/** + * Find the best backend based on categorizing the servers, and then applying + * selection criteria to the best category. + * + * @param backends: vector of SRWBackend + * @param sc: which select_criteria_t to use + * @param master_accept_reads: NOTE: even if this is false, in some cases a master can + * still be selected for reads. + * + * @return Valid iterator into argument backends, or end(backends) if empty + */ +BackendSPtrVec::const_iterator find_best_backend(const BackendSPtrVec& backends, + select_criteria_t sc, + bool masters_accept_reads); + /* * The following are implemented in rwsplit_tmp_table_multi.c */ diff --git a/server/modules/routing/readwritesplit/response_stat.cc b/server/modules/routing/readwritesplit/response_stat.cc index 89cef4d91..e1f7751a0 100644 --- a/server/modules/routing/readwritesplit/response_stat.cc +++ b/server/modules/routing/readwritesplit/response_stat.cc @@ -49,7 +49,7 @@ void ResponseStat::query_ended() if (++m_sample_count == m_num_filter_samples) { - std::sort(begin(m_samples), end(m_samples)); + std::sort(m_samples.begin(), m_samples.end()); maxbase::Duration new_sample = m_samples[m_num_filter_samples / 2]; m_average.add(std::chrono::duration(new_sample).count()); m_sample_count = 0; diff --git a/server/modules/routing/readwritesplit/response_stat.hh b/server/modules/routing/readwritesplit/response_stat.hh index faa6295a6..1c6e0e7e6 100644 --- a/server/modules/routing/readwritesplit/response_stat.hh +++ b/server/modules/routing/readwritesplit/response_stat.hh @@ -12,8 +12,7 @@ */ #pragma once -#include - +#include #include #include @@ -22,7 +21,6 @@ */ namespace maxscale { - /** * Query response statistics. Uses median of N samples to filter noise, then * uses those medians to calculate the average response time. @@ -39,23 +37,24 @@ public: * @param num_synch_samples - this many medians before the average should be synced, or * @param sync_duration - this much time between syncs. */ - ResponseStat(int ignore_first_n = 5, int num_filter_samples = 3, + ResponseStat(int ignore_first_n = 5, + int num_filter_samples = 3, maxbase::Duration sync_duration = std::chrono::seconds(5)); - void query_started(); - void query_ended(); // ok to call without a query_started - bool is_valid() const; - int num_samples() const; + void query_started(); + void query_ended();// ok to call without a query_started + bool is_valid() const; + int num_samples() const; maxbase::Duration average() const; - bool sync_time_reached(int num_synch_medians); // is it time to apply the average? - void reset(); + bool sync_time_reached(int num_synch_medians); // is it time to apply the average? + void reset(); private: - int m_ignore_first_n; - const int m_num_filter_samples; - const maxbase::Duration m_sync_duration; - int m_sample_count; - std::vector m_samples; // N sampels from which median is used - maxbase::CumulativeAverage m_average; - maxbase::TimePoint m_last_start; - maxbase::TimePoint m_next_sync; + int m_ignore_first_n; + const int m_num_filter_samples; + const maxbase::Duration m_sync_duration; + int m_sample_count; + std::vector m_samples; // N sampels from which median is used + maxbase::CumulativeAverage m_average; + maxbase::TimePoint m_last_start; + maxbase::TimePoint m_next_sync; }; } diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index c81c9ecf7..b898077e2 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -38,47 +38,6 @@ using namespace maxscale; extern int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&); -/** - * Find out which of the two backend servers has smaller value for select - * criteria property. - * - * @param cand previously selected candidate - * @param new challenger - * @param sc select criteria - * - * @return pointer to backend reference of that backend server which has smaller - * value in selection criteria. If either reference pointer is NULL then the - * other reference pointer value is returned. - */ -static SRWBackend compare_backends(SRWBackend a, SRWBackend b, select_criteria_t sc) -{ - int (*p)(const SRWBackend&, const SRWBackend&) = criteria_cmpfun[sc]; - - if (!a) - { - return b; - } - else if (!b) - { - return a; - } - - // Prefer servers that are not busy executing session commands - bool a_busy = a->in_use() && a->has_session_commands(); - bool b_busy = b->in_use() && b->has_session_commands(); - - if (a_busy && !b_busy) - { - return b; - } - else if (!a_busy && b_busy) - { - return a; - } - - return p(a, b) <= 0 ? a : b; -} - void RWSplitSession::handle_connection_keepalive(SRWBackend& target) { mxb_assert(target); @@ -582,45 +541,36 @@ SRWBackend RWSplitSession::get_hinted_backend(char *name) SRWBackend RWSplitSession::get_slave_backend(int max_rlag) { - SRWBackend rval; + // create a list of useable backends (includes masters, function name is a bit off), + // then feed that list to compare. + BackendSPtrVec candidates; auto counts = get_slave_counts(m_backends, m_current_master); - for (auto it = m_backends.begin(); it != m_backends.end(); it++) + for (auto& backend : m_backends) { - auto& backend = *it; + bool can_take_slave_into_use = backend->is_slave() + && !backend->in_use() + && can_recover_servers() + && backend->can_connect() + && counts.second < m_router->max_slave_count(); - if ((backend->is_master() || backend->is_slave()) && // Either a master or a slave - rpl_lag_is_ok(backend, max_rlag)) // Not lagging too much + bool master_or_slave = backend->is_master() || backend->is_slave(); + bool is_useable = backend->in_use() || can_take_slave_into_use; + bool not_a_slacker = rpl_lag_is_ok(backend, max_rlag); + + bool server_is_candidate = master_or_slave && is_useable && not_a_slacker; + + if (server_is_candidate) { - if (backend->in_use() || (can_recover_servers() && backend->can_connect())) - { - if (!rval) - { - // No previous candidate, accept any valid server (includes master) - if ((backend->is_master() && backend == m_current_master) || - backend->is_slave()) - { - rval = backend; - } - } - else if (backend->in_use() || counts.second < m_router->max_slave_count()) - { - if (!m_config.master_accept_reads && rval->is_master()) - { - // Pick slaves over masters with master_accept_reads=false - rval = backend; - } - else - { - // Compare the two servers and pick the best one - rval = compare_backends(rval, backend, m_config.slave_selection_criteria); - } - } - } + candidates.push_back(&backend); } } - return rval; + BackendSPtrVec::const_iterator rval = find_best_backend(candidates, + m_config.slave_selection_criteria, + m_config.master_accept_reads); + + return (rval == candidates.end()) ? SRWBackend() : **rval; } SRWBackend RWSplitSession::get_master_backend() diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index c2d94528e..110378ba9 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -45,48 +45,6 @@ static bool valid_for_slave(const SRWBackend& backend, const SRWBackend& master) (!master || backend != master); } -/** - * @brief Find the best slave candidate - * - * This function iterates through @c backend and tries to find the best backend - * reference that is not in use. @c cmpfun will be called to compare the backends. - * - * @param rses Router client session - * @param master The master server - * @param cmpfun qsort() compatible comparison function - * - * @return The best slave backend reference or NULL if no candidates could be found - */ -static SRWBackend get_slave_candidate(const SRWBackendList& backends, const SRWBackend& master, - int (*cmpfun)(const SRWBackend&, const SRWBackend&)) -{ - SRWBackend candidate; - - for (SRWBackendList::const_iterator it = backends.begin(); - it != backends.end(); it++) - { - const SRWBackend& backend = *it; - - if (!backend->in_use() && backend->can_connect() && - valid_for_slave(backend, master)) - { - if (candidate) - { - if (cmpfun(candidate, backend) > 0) - { - candidate = backend; - } - } - else - { - candidate = backend; - } - } - } - - return candidate; -} - /** Compare number of connections from this router in backend servers */ static int backend_cmp_router_conn(const SRWBackend& a, const SRWBackend& b) { @@ -223,6 +181,104 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const SRWBackend&, const SRWBackend&) = backend_cmp_response_time }; +// This is still the current compare method. The response-time compare, along with anything +// using weights, have to change to use the whole array at once to be correct. Id est, everything +// will change to use the whole array in the next iteration. +static BackendSPtrVec::const_iterator run_comparison(const BackendSPtrVec& candidates, + select_criteria_t sc) +{ + if (candidates.empty()) return candidates.end(); + + auto best = candidates.begin(); + + for (auto rival = std::next(best); + rival != candidates.end(); + rival = std::next(rival)) + { + if (criteria_cmpfun[sc](**best, **rival) > 0) + { + best = rival; + } + } + + return best; +} + +/** + * @brief Find the best slave candidate for a new connection. + * + * @param bends backends + * @param master the master server + * @param sc which select_criteria_t to use + * + * @return The best slave backend reference or null if no candidates could be found + */ +static SRWBackend get_slave_candidate(const SRWBackendList& bends, + const SRWBackend& master, + select_criteria_t sc) +{ + // TODO, nantti, see if this and get_slave_backend can be combined to a single function + BackendSPtrVec backends; + for (auto& b : bends) // match intefaces. TODO, should go away in the future. + { + backends.push_back(const_cast(&b)); + } + BackendSPtrVec candidates; + + for (auto& backend : backends) + { + if (!(*backend)->in_use() + && (*backend)->can_connect() + && valid_for_slave(*backend, master)) + { + candidates.push_back(backend); + } + } + + return !candidates.empty() ? **run_comparison(candidates, sc) : SRWBackend(); + +} + +BackendSPtrVec::const_iterator find_best_backend(const BackendSPtrVec& backends, + select_criteria_t sc, + bool masters_accept_reads) +{ + // Divide backends to priorities. The set of highest priority backends will then compete. + std::map priority_map;; + int best_priority {INT_MAX}; // low numbers are high priority + + for (auto& pSBackend : backends) + { + auto& backend = **pSBackend; + bool is_busy = backend.in_use() && backend.has_session_commands(); + bool acts_slave = backend.is_slave() || (backend.is_master() && masters_accept_reads); + + int priority; + if (acts_slave) + { + if (!is_busy) + { + priority = 1; // highest priority, idle servers + } + else + { + priority = 13; // lowest priority, busy servers + } + } + else + { + priority = 2; // idle masters with masters_accept_reads==false + } + + priority_map[priority].push_back(pSBackend); + best_priority = std::min(best_priority, priority); + } + + auto best = run_comparison(priority_map[best_priority], sc); + + return std::find(backends.begin(), backends.end(), *best); +} + /** * @brief Log server connections * @@ -351,10 +407,7 @@ bool RWSplit::select_connect_backend_servers(MXS_SESSION *session, return false; } - /** Check slave selection criteria and set compare function */ - select_criteria_t select_criteria = cnf.slave_selection_criteria; - auto cmpfun = criteria_cmpfun[select_criteria]; - mxb_assert(cmpfun); + auto select_criteria = cnf.slave_selection_criteria; if (mxs_log_is_priority_enabled(LOG_INFO)) { @@ -389,9 +442,9 @@ bool RWSplit::select_connect_backend_servers(MXS_SESSION *session, if (slaves_connected < max_nslaves) { /** Connect to all possible slaves */ - for (SRWBackend backend(get_slave_candidate(backends, master, cmpfun)); + for (SRWBackend backend(get_slave_candidate(backends, master, select_criteria)); backend && slaves_connected < max_nslaves; - backend = get_slave_candidate(backends, master, cmpfun)) + backend = get_slave_candidate(backends, master, select_criteria)) { if (backend->can_connect() && backend->connect(session, sescmd_list)) {