diff --git a/include/maxscale/backend.hh b/include/maxscale/backend.hh index 0fc213165..d50640fc6 100644 --- a/include/maxscale/backend.hh +++ b/include/maxscale/backend.hh @@ -86,7 +86,7 @@ public: /** * @brief Check if backend has session commands * - * @return True if backend has session commands + * @return Number of session commands */ size_t session_command_count() const; @@ -97,6 +97,20 @@ public: */ SERVER_REF* backend() const; + /** + * @brief Get pointer to server + * + * @return Pointer to server + */ + SERVER* server() const; + + /** + * @brief Check if a connection to this backend can be made + * + * @return True if the backend has not failed and a connection can be attempted + */ + bool can_connect() const; + /** * @brief Create a new connection * diff --git a/server/core/backend.cc b/server/core/backend.cc index 88f72a281..16021f1c2 100644 --- a/server/core/backend.cc +++ b/server/core/backend.cc @@ -152,9 +152,21 @@ void Backend::set_state(backend_state state) SERVER_REF* Backend::backend() const { + ss_dassert(m_backend); return m_backend; } +SERVER* Backend::server() const +{ + ss_dassert(m_backend); + return m_backend->server; +} + +bool Backend::can_connect() const +{ + return !has_failed() && SERVER_IS_RUNNING(m_backend->server); +} + bool Backend::connect(MXS_SESSION* session) { bool rval = false; @@ -165,6 +177,10 @@ bool Backend::connect(MXS_SESSION* session) atomic_add(&m_backend->connections, 1); rval = true; } + else + { + m_state = FATAL_FAILURE; + } return rval; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index f9a129695..ef4d4b9c2 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -936,6 +936,23 @@ SRWBackend get_backend_from_bref(ROUTER_CLIENT_SES* rses, backend_ref_t* bref) return SRWBackend(); } +void close_all_connections(ROUTER_CLIENT_SES* rses) +{ + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) + { + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + SRWBackend& bref = *it; + + if (bref->in_use()) + { + bref->close(); + } + } + } +} + /** * API function definitions */ diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index a5ab2d05a..2c3a1ed48 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -195,26 +195,6 @@ enum reply_state_t REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */ }; -/** - * Reference to BACKEND. - * - * Owned by router client session. - */ -struct backend_ref_t -{ - skygw_chk_t bref_chk_top; - SERVER_REF* ref; - DCB* bref_dcb; - int bref_state; - int bref_num_result_wait; - sescmd_cursor_t bref_sescmd_cur; - unsigned char reply_cmd; /**< The reply the backend server sent to a session command. - * Used to detect slaves that fail to execute session command. */ - reply_state_t reply_state; /**< Reply state of the current query */ - skygw_chk_t bref_chk_tail; - int closed_at; /** DEBUG: Line number where this backend reference was closed */ -}; - struct rwsplit_config_t { int rw_max_slave_conn_percent; /**< Maximum percentage of slaves @@ -279,8 +259,6 @@ struct ROUTER_CLIENT_SES skygw_chk_t rses_chk_top; bool rses_closed; /**< true when closeSession is called */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; /**< Properties listed by their type */ - backend_ref_t* rses_master_ref; - backend_ref_t* rses_backend_ref; /**< Pointer to backend reference array */ SRWBackendList backends; /**< List of backend servers */ SRWBackend current_master; /**< Current master server */ SRWBackend target_node; /**< The currently locked target node */ @@ -292,7 +270,6 @@ struct ROUTER_CLIENT_SES uint64_t rses_load_data_sent; /**< How much data has been sent */ DCB* client_dcb; uint64_t pos_generator; - backend_ref_t *forced_node; /**< Current server where all queries should be sent */ int expected_responses; /**< Number of expected responses to the current query */ GWBUF* query_queue; /**< Queued commands waiting to be executed */ struct ROUTER_INSTANCE *router; /**< The router instance */ diff --git a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc index 805bd7af9..5dfd2c388 100644 --- a/server/modules/routing/readwritesplit/rwsplit_select_backends.cc +++ b/server/modules/routing/readwritesplit/rwsplit_select_backends.cc @@ -28,12 +28,10 @@ * not intended to be called from elsewhere. */ -static bool connect_server(backend_ref_t *bref, MXS_SESSION *session, bool execute_history); - static void log_server_connections(select_criteria_t select_criteria, - backend_ref_t *backend_ref, int router_nservers); + ROUTER_CLIENT_SES* rses); -static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers); +static SERVER_REF *get_root_master(ROUTER_CLIENT_SES* rses); static int bref_cmp_global_conn(const void *bref1, const void *bref2); @@ -56,17 +54,6 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *) = bref_cmp_current_load }; -/** - * @brief Check whether it's possible to connect to this server - * - * @param bref Backend reference - * @return True if a connection to this server can be attempted - */ -static bool bref_valid_for_connect(const backend_ref_t *bref) -{ - return !BREF_HAS_FAILED(bref) && SERVER_IS_RUNNING(bref->ref->server); -} - /** * Check whether it's possible to use this server as a slave * @@ -74,10 +61,8 @@ static bool bref_valid_for_connect(const backend_ref_t *bref) * @param master_host The master server * @return True if this server is a valid slave candidate */ -static bool bref_valid_for_slave(const backend_ref_t *bref, const SERVER *master_host) +static bool valid_for_slave(const SERVER *server, const SERVER *master_host) { - SERVER *server = bref->ref->server; - return (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) && (master_host == NULL || (server != master_host)); } @@ -94,27 +79,29 @@ static bool bref_valid_for_slave(const backend_ref_t *bref, const SERVER *master * @param cmpfun qsort() compatible comparison function * @return The best slave backend reference or NULL if no candidates could be found */ -backend_ref_t* get_slave_candidate(backend_ref_t *bref, int n, const SERVER *master, +SRWBackend* get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master, int (*cmpfun)(const void *, const void *)) { - backend_ref_t *candidate = NULL; + SRWBackend candidate; - for (int i = 0; i < n; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - if (!BREF_IS_IN_USE(&bref[i]) && - bref_valid_for_connect(&bref[i]) && - bref_valid_for_slave(&bref[i], master)) + SRWBackend& bref = *it; + + if (!bref->in_use() && bref->can_connect() && + valid_for_slave(bref->server(), master)) { if (candidate) { - if (cmpfun(candidate, &bref[i]) > 0) + if (cmpfun(&candidate, &bref) > 0) { - candidate = &bref[i]; + candidate = bref; } } else { - candidate = &bref[i]; + candidate = bref; } } } @@ -140,9 +127,8 @@ backend_ref_t* get_slave_candidate(backend_ref_t *bref, int n, const SERVER *mas * @param router Router instance * @return true, if at least one master and one slave was found. */ -bool select_connect_backend_servers(backend_ref_t **p_master_ref, - backend_ref_t *backend_ref, - int router_nservers, int max_nslaves, +bool select_connect_backend_servers(int router_nservers, + int max_nslaves, int max_slave_rlag, select_criteria_t select_criteria, MXS_SESSION *session, @@ -150,16 +136,8 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, ROUTER_CLIENT_SES *rses, bool active_session) { - if (p_master_ref == NULL || backend_ref == NULL) - { - MXS_ERROR("Master reference (%p) or backend reference (%p) is NULL.", - p_master_ref, backend_ref); - ss_dassert(false); - return false; - } - /* get the root Master */ - SERVER_REF *master_backend = get_root_master(backend_ref, router_nservers); + SERVER_REF *master_backend = get_root_master(rses); SERVER *master_host = master_backend ? master_backend->server : NULL; if (router->rwsplit_config.master_failure_mode == RW_FAIL_INSTANTLY && @@ -179,17 +157,15 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, * Master is already connected or we don't have a master. The function was * called because new slaves must be selected to replace failed ones. */ - bool master_connected = active_session || *p_master_ref != NULL; + bool master_connected = active_session || rses->current_master; /** Check slave selection criteria and set compare function */ - int (*p)(const void *, const void *) = criteria_cmpfun[select_criteria]; - ss_dassert(p); - - SERVER *old_master = *p_master_ref ? (*p_master_ref)->ref->server : NULL; + int (*cmpfun)(const void *, const void *) = criteria_cmpfun[select_criteria]; + ss_dassert(cmpfun); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { - log_server_connections(select_criteria, backend_ref, router_nservers); + log_server_connections(select_criteria, rses); } int slaves_found = 0; @@ -200,42 +176,32 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, if (!master_connected) { /** Find a master server */ - for (int i = 0; i < router_nservers; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - SERVER *serv = backend_ref[i].ref->server; + SRWBackend& bref = *it; - if (bref_valid_for_connect(&backend_ref[i]) && - master_host && serv == master_host) + if (bref->can_connect() && master_host && bref->server() == master_host) { - if (connect_server(&backend_ref[i], session, false)) + if (bref->connect(session)) { - for (SRWBackendList::iterator it = rses->backends.begin(); - it != rses->backends.end(); it++) - { - SRWBackend& backend = *it; - if (backend->backend()->server == serv) - { - backend->connect(session); - break; - } - } - - *p_master_ref = &backend_ref[i]; - break; + rses->current_master = bref; } } } } /** Calculate how many connections we already have */ - for (int i = 0; i < router_nservers; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - if (bref_valid_for_connect(&backend_ref[i]) && - bref_valid_for_slave(&backend_ref[i], master_host)) + SRWBackend& bref = *it; + + if (bref->can_connect() && valid_for_slave(bref->server(), master_host)) { slaves_found += 1; - if (BREF_IS_IN_USE(&backend_ref[i])) + if (bref->in_use()) { slaves_connected += 1; } @@ -244,38 +210,19 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, ss_dassert(slaves_connected < max_nslaves || max_nslaves == 0); - backend_ref_t *bref = get_slave_candidate(backend_ref, router_nservers, master_host, p); - /** Connect to all possible slaves */ - while (bref && slaves_connected < max_nslaves) + for (SRWBackend bref(get_slave_candidate(rses, router_nservers, master_host, cmpfun)); + bref && slaves_connected < max_nslaves; + bref = get_slave_candidate(rses, master_host, cmpfun)) { - if (connect_server(bref, session, true)) + if (bref->connect(session) && + (bref->session_command_count() == 0 || + bref->execute_session_command())) { slaves_connected += 1; - - for (SRWBackendList::iterator it = rses->backends.begin(); - it != rses->backends.end(); it++) - { - SRWBackend& backend = *it; - if (backend->backend()->server == bref->ref->server) - { - backend->connect(session); - break; - } - } } - else - { - /** Failed to connect, mark server as failed */ - bref_set_state(bref, BREF_FATAL_FAILURE); - } - - bref = get_slave_candidate(backend_ref, router_nservers, master_host, p); } - /** - * Successful cases - */ if (slaves_connected >= min_nslaves && slaves_connected <= max_nslaves) { succp = true; @@ -289,41 +236,24 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, "of %d of them.", slaves_connected, slaves_found); } - for (int i = 0; i < router_nservers; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - if (BREF_IS_IN_USE((&backend_ref[i]))) + SRWBackend& bref = *it; + if (bref->in_use()) { - MXS_INFO("Selected %s in \t[%s]:%d", - STRSRVSTATUS(backend_ref[i].ref->server), - backend_ref[i].ref->server->name, - backend_ref[i].ref->server->port); + MXS_INFO("Selected %s in \t[%s]:%d", STRSRVSTATUS(bref->server()), + bref->server()->name, bref->server()->port); } - } /* for */ + } } } - /** Failure cases */ else { MXS_ERROR("Couldn't establish required amount of slave connections for " "router session. Would need between %d and %d slaves but only have %d.", min_nslaves, max_nslaves, slaves_connected); - - /** Clean up connections */ - for (int i = 0; i < router_nservers; i++) - { - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - ss_dassert(backend_ref[i].ref->connections > 0); - - close_failed_bref(&backend_ref[i], true); - - /** Decrease backend's connection counter. */ - atomic_add(&backend_ref[i].ref->connections, -1); - RW_CHK_DCB(&backend_ref[i], backend_ref[i].bref_dcb); - dcb_close(backend_ref[i].bref_dcb); - RW_CLOSE_BREF(&backend_ref[i]); - } - } + close_all_connections(rses); } return succp; @@ -332,8 +262,10 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref, /** Compare number of connections from this router in backend servers */ static int bref_cmp_router_conn(const void *bref1, const void *bref2) { - SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref; - SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref; + const SRWBackend& a = *reinterpret_cast(bref1); + const SRWBackend& b = *reinterpret_cast(bref2); + SERVER_REF *b1 = a->backend(); + SERVER_REF *b2 = b->backend(); if (b1->weight == 0 && b2->weight == 0) { @@ -355,8 +287,10 @@ static int bref_cmp_router_conn(const void *bref1, const void *bref2) /** Compare number of global connections in backend servers */ static int bref_cmp_global_conn(const void *bref1, const void *bref2) { - SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref; - SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref; + const SRWBackend& a = *reinterpret_cast(bref1); + const SRWBackend& b = *reinterpret_cast(bref2); + SERVER_REF *b1 = a->backend(); + SERVER_REF *b2 = b->backend(); if (b1->weight == 0 && b2->weight == 0) { @@ -379,8 +313,10 @@ static int bref_cmp_global_conn(const void *bref1, const void *bref2) /** Compare replication lag between backend servers */ static int bref_cmp_behind_master(const void *bref1, const void *bref2) { - SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref; - SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref; + const SRWBackend& a = *reinterpret_cast(bref1); + const SRWBackend& b = *reinterpret_cast(bref2); + SERVER_REF *b1 = a->backend(); + SERVER_REF *b2 = b->backend(); if (b1->weight == 0 && b2->weight == 0) { @@ -403,8 +339,10 @@ static int bref_cmp_behind_master(const void *bref1, const void *bref2) /** Compare number of current operations in backend servers */ static int bref_cmp_current_load(const void *bref1, const void *bref2) { - SERVER_REF *b1 = ((backend_ref_t *)bref1)->ref; - SERVER_REF *b2 = ((backend_ref_t *)bref2)->ref; + const SRWBackend& a = *reinterpret_cast(bref1); + const SRWBackend& b = *reinterpret_cast(bref2); + SERVER_REF *b1 = a->backend(); + SERVER_REF *b2 = b->backend(); if (b1->weight == 0 && b2->weight == 0) { @@ -423,59 +361,6 @@ static int bref_cmp_current_load(const void *bref1, const void *bref2) ((1000 + 1000 * b2->server->stats.n_current_ops) / b2->weight); } -/** - * @brief Connect a server - * - * Connects to a server, adds callbacks to the created DCB and updates - * router statistics. If @p execute_history is true, the session command - * history will be executed on this server. - * - * @param b Router's backend structure for the server - * @param session Client's session object - * @param execute_history Execute session command history - * @return True if successful, false if an error occurred - */ -static bool connect_server(backend_ref_t *bref, MXS_SESSION *session, bool execute_history) -{ - SERVER *serv = bref->ref->server; - bool rval = false; - - bref->bref_dcb = dcb_connect(serv, session, serv->protocol); - - if (bref->bref_dcb != NULL) - { - bref_clear_state(bref, BREF_CLOSED); - bref->closed_at = 0; - - if (!execute_history || execute_sescmd_history(bref)) - { - bref->bref_state = 0; - bref_set_state(bref, BREF_IN_USE); - atomic_add(&bref->ref->connections, 1); - rval = true; - } - else - { - MXS_ERROR("Failed to execute session command in %s ([%s]:%d). See earlier " - "errors for more details.", - bref->ref->server->unique_name, - bref->ref->server->name, - bref->ref->server->port); - RW_CHK_DCB(bref, bref->bref_dcb); - dcb_close(bref->bref_dcb); - RW_CLOSE_BREF(bref); - bref->bref_dcb = NULL; - } - } - else - { - MXS_ERROR("Unable to establish connection with server [%s]:%d", - serv->name, serv->port); - } - - return rval; -} - /** * @brief Log server connections * @@ -484,23 +369,19 @@ static bool connect_server(backend_ref_t *bref, MXS_SESSION *session, bool execu * @param router_nservers Number of backends in @p backend_ref */ static void log_server_connections(select_criteria_t select_criteria, - backend_ref_t *backend_ref, int router_nservers) + ROUTER_CLIENT_SES* rses) { - if (select_criteria == LEAST_GLOBAL_CONNECTIONS || - select_criteria == LEAST_ROUTER_CONNECTIONS || - select_criteria == LEAST_BEHIND_MASTER || - select_criteria == LEAST_CURRENT_OPERATIONS) + MXS_INFO("Servers and %s connection counts:", + select_criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" + : "router"); + + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - MXS_INFO("Servers and %s connection counts:", - select_criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" - : "router"); + SERVER_REF* b = (*it)->backend(); - for (int i = 0; i < router_nservers; i++) + switch (select_criteria) { - SERVER_REF *b = backend_ref[i].ref; - - switch (select_criteria) - { case LEAST_GLOBAL_CONNECTIONS: MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s", b->server->stats.n_current, b->server->name, @@ -525,12 +406,11 @@ static void log_server_connections(select_criteria_t select_criteria, b->server->rlag, b->server->name, b->server->port, STRSRVSTATUS(b->server)); default: + ss_dassert(!true); break; - } } } } - /******************************** * This routine returns the root master server from MySQL replication tree * Get the root Master rule: @@ -544,21 +424,14 @@ static void log_server_connections(select_criteria_t select_criteria, * @return The Master found * */ -static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers) +static SERVER_REF *get_root_master(ROUTER_CLIENT_SES* rses) { - int i = 0; SERVER_REF *master_host = NULL; - for (i = 0; i < router_nservers; i++) + for (SRWBackendList::iterator it = rses->backends.begin(); + it != rses->backends.end(); it++) { - if (servers[i].ref == NULL) - { - /** This should not happen */ - ss_dassert(false); - continue; - } - - SERVER_REF *b = servers[i].ref; + SERVER_REF* b = (*it)->backend(); if (SERVER_IS_MASTER(b->server)) { @@ -569,5 +442,6 @@ static SERVER_REF *get_root_master(backend_ref_t *servers, int router_nservers) } } } + return master_host; }