MXS-2276 Use dynamic servers also for cluster check

Once the monitor has been able to connect to a clustrix node
and obtain the clustrix nodes, it'll primarily use those nodes
when looking for a Clustrix node to be used as the "hub".
With this change it is sufficient (but perhaps unwise) to provide
a single node boostrap node in the configuration file.

Some other rearrangements and renamings of functions has also been
made.
This commit is contained in:
Johan Wikman
2019-01-23 10:07:48 +02:00
parent 0fe5b0bec9
commit 42b3402a71
7 changed files with 404 additions and 238 deletions

View File

@ -2,6 +2,7 @@ add_library(clustrixmon SHARED
clustrix.cc
clustrixmon.cc
clustrixmonitor.cc
clustrixnode.cc
)
target_link_libraries(clustrixmon maxscale-common)
set_target_properties(clustrixmon PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)

View File

@ -86,3 +86,92 @@ Clustrix::SubState Clustrix::substate_from_string(const std::string& substate)
return SubState::UNKNOWN;
}
}
bool Clustrix::is_part_of_the_quorum(const SERVER& server, MYSQL* pCon)
{
bool rv = false;
const char* zAddress = server.address;
int port = server.port;
const char ZQUERY_TEMPLATE[] =
"SELECT ms.status FROM system.membership AS ms INNER JOIN system.nodeinfo AS ni "
"ON ni.nodeid = ms.nid WHERE ni.iface_ip = '%s'";
char zQuery[sizeof(ZQUERY_TEMPLATE) + strlen(zAddress)];
sprintf(zQuery, ZQUERY_TEMPLATE, zAddress);
if (mysql_query(pCon, zQuery) == 0)
{
MYSQL_RES* pResult = mysql_store_result(pCon);
if (pResult)
{
mxb_assert(mysql_field_count(pCon) == 1);
MYSQL_ROW row;
while ((row = mysql_fetch_row(pResult)) != nullptr)
{
if (row[0])
{
Clustrix::Status status = Clustrix::status_from_string(row[0]);
switch (status)
{
case Clustrix::Status::QUORUM:
rv = true;
break;
case Clustrix::Status::STATIC:
MXS_NOTICE("Node %s:%d is not part of the quorum, switching to "
"other node for monitoring.", zAddress, port);
break;
case Clustrix::Status::UNKNOWN:
MXS_WARNING("Do not know how to interpret '%s'. Assuming node %s:%d "
"is not part of the quorum.", row[0], zAddress, port);
}
}
else
{
MXS_WARNING("No status returned for '%s' on %s:%d.", zQuery, zAddress, port);
}
}
mysql_free_result(pResult);
}
else
{
MXS_WARNING("No result returned for '%s' on %s:%d.", zQuery, zAddress, port);
}
}
else
{
MXS_ERROR("Could not execute '%s' on %s:%d: %s", zQuery, zAddress, port, mysql_error(pCon));
}
return rv;
}
bool Clustrix::ping_or_connect_to_hub(const MXS_MONITOR& mon, SERVER& server, MYSQL** ppCon)
{
bool connected = false;
mxs_connect_result_t rv = mon_ping_or_connect_to_db(mon, server, ppCon);
if (mon_connection_is_ok(rv))
{
if (Clustrix::is_part_of_the_quorum(server, *ppCon))
{
connected = true;
}
}
else
{
MXS_ERROR("Could either not ping or create connection to %s:%d: %s",
server.address, server.port, mysql_error(*ppCon));
}
return connected;
}

View File

@ -14,6 +14,8 @@
#include "clustrixmon.hh"
#include <string>
#include <maxscale/monitor.hh>
#include <maxscale/server.hh>
namespace Clustrix
{
@ -37,4 +39,58 @@ enum class SubState
SubState substate_from_string(const std::string& substate);
std::string to_string(SubState sub_state);
/**
* Is a particular Clustrix node part of the quorum.
*
* @param server The server object of a Clustrix node.
* @param pCon Valid MYSQL handle to the server.
*
* @return True, if the node is part of the quorum, false otherwise.
*/
bool is_part_of_the_quorum(const SERVER& server, MYSQL* pCon);
/**
* Is a particular Clustrix node part of the quorum.
*
* @param ms The monitored server object of a Clustrix node.
*
* @return True, if the node is part of the quorum, false otherwise.
*/
inline bool is_part_of_the_quorum(MXS_MONITORED_SERVER& ms)
{
mxb_assert(ms.server);
mxb_assert(ms.con);
return is_part_of_the_quorum(*ms.server, ms.con);
}
/**
* Ping or create connection to server and check whether it can be used
* as hub.
*
* @param mon The monitor.
* @param server Server object referring to a Clustrix node.
* @param ppCon Address of pointer to MYSQL object referring to @server
* (@c *ppCon may also be NULL).
*
* @return True, if the server can be used as hub, false otherwise.
*
* @note Upon return @c *ppCon will be non-NULL.
*/
bool ping_or_connect_to_hub(const MXS_MONITOR& mon, SERVER& server, MYSQL** ppCon);
/**
* Ping or create connection to server and check whether it can be used
* as hub.
*
* @param mon The monitor.
* @param ms Monitored server object referring to a Clustrix node.
*
* @return True, if the server can be used as hub, false otherwise.
*/
inline bool ping_or_connect_to_hub(const MXS_MONITOR& mon, MXS_MONITORED_SERVER& ms)
{
return ping_or_connect_to_hub(mon, *ms.server, &ms.con);
}
}

View File

@ -50,7 +50,7 @@ bool ClustrixMonitor::configure(const MXS_CONFIG_PARAMETER* pParams)
m_config.set_cluster_monitor_interval(config_get_integer(pParams, CLUSTER_MONITOR_INTERVAL_NAME));
m_config.set_health_check_threshold(config_get_integer(pParams, HEALTH_CHECK_THRESHOLD_NAME));
refresh_cluster_nodes();
check_hub_and_refresh_nodes();
return true;
}
@ -62,18 +62,20 @@ void ClustrixMonitor::pre_loop()
void ClustrixMonitor::post_loop()
{
if (m_pMonitored_server && m_pMonitored_server->con)
if (m_pHub_con)
{
mysql_close(m_pMonitored_server->con);
m_pMonitored_server->con = nullptr;
mysql_close(m_pHub_con);
}
m_pHub_con = nullptr;
m_pHub_server = nullptr;
}
void ClustrixMonitor::tick()
{
if (now() - m_last_cluster_check > m_config.cluster_monitor_interval())
{
refresh_cluster_nodes();
check_hub_and_refresh_nodes();
}
switch (m_http.status())
@ -88,128 +90,79 @@ void ClustrixMonitor::tick()
break;
case http::Async::READY:
if (m_monitor->monitored_servers)
update_server_statuses();
if (!m_health_urls.empty())
{
update_server_statuses();
make_health_check();
}
break;
}
}
namespace
void ClustrixMonitor::choose_hub()
{
mxb_assert(!m_pHub_con);
bool is_part_of_the_quorum(const MXS_MONITORED_SERVER& ms)
{
bool rv = false;
SERVER* pHub_server = nullptr;
MYSQL* pHub_con = nullptr;
const char ZQUERY_TEMPLATE[] =
"SELECT ms.status FROM system.membership AS ms INNER JOIN system.nodeinfo AS ni "
"ON ni.nodeid = ms.nid WHERE ni.iface_ip = '%s'";
set<string> ips;
const char* zAddress = ms.server->address;
char zQuery[sizeof(ZQUERY_TEMPLATE) + strlen(zAddress)];
sprintf(zQuery, ZQUERY_TEMPLATE, zAddress);
if (mysql_query(ms.con, zQuery) == 0)
// First we check the dynamic servers, in case there are.
for (auto it = m_nodes.begin(); !pHub_con && (it != m_nodes.end()); ++it)
{
MYSQL_RES* pResult = mysql_store_result(ms.con);
auto& element = *it;
ClustrixNode& node = element.second;
if (pResult)
if (node.can_be_used_as_hub(*m_monitor))
{
mxb_assert(mysql_field_count(ms.con) == 1);
pHub_con = node.release_connection();
pHub_server = node.server();
}
MYSQL_ROW row;
while ((row = mysql_fetch_row(pResult)) != nullptr)
ips.insert(node.ip());
}
if (!pHub_con)
{
// If that fails, then we check the bootstrap servers, but only if
// it was not checked above.
auto b = begin(*(m_monitor->monitored_servers));
auto e = end(*(m_monitor->monitored_servers));
for (auto it = b; !pHub_con && (it != e); ++it)
{
MXS_MONITORED_SERVER& ms = *it;
if (ips.find(ms.server->address) == ips.end())
{
if (row[0])
if (Clustrix::ping_or_connect_to_hub(*m_monitor, ms))
{
Clustrix::Status status = Clustrix::status_from_string(row[0]);
switch (status)
{
case Clustrix::Status::QUORUM:
rv = true;
break;
case Clustrix::Status::STATIC:
MXS_NOTICE("Node %s is not part of the quorum, switching to "
"other node for monitoring.", zAddress);
break;
case Clustrix::Status::UNKNOWN:
MXS_WARNING("Do not know how to interpret '%s'. Assuming node %s "
"is not part of the quorum.", row[0], zAddress);
}
pHub_con = ms.con;
pHub_server = ms.server;
}
else
else if (ms.con)
{
MXS_WARNING("No status returned for '%s' on %s.", zQuery, zAddress);
mysql_close(ms.con);
}
ms.con = nullptr;
}
mysql_free_result(pResult);
}
else
{
MXS_WARNING("No result returned for '%s' on %s.", zQuery, zAddress);
}
}
else
if (pHub_con)
{
MXS_ERROR("Could not execute '%s' on %s: %s", zQuery, zAddress, mysql_error(ms.con));
}
MXS_NOTICE("Monitoring Clustrix cluster state using node %s:%d.",
pHub_server->address, pHub_server->port);
return rv;
}
}
void ClustrixMonitor::update_cluster_nodes()
{
auto b = begin(*(m_monitor->monitored_servers));
auto e = end(*(m_monitor->monitored_servers));
auto it = find_if(b, e,
[this](MXS_MONITORED_SERVER& ms) -> bool {
mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, &ms);
bool usable = false;
if (mon_connection_is_ok(rv) && is_part_of_the_quorum(ms))
{
usable = true;
}
else if (ms.con)
{
mysql_close(ms.con);
ms.con = nullptr;
}
return usable;
});
if (it != e)
{
MXS_MONITORED_SERVER& ms = *it;
if (!m_pMonitored_server)
{
MXS_NOTICE("Monitoring Clustrix cluster state using node %s.", ms.server->address);
}
else if (m_pMonitored_server != &ms)
{
MXS_NOTICE("Monitoring Clustrix cluster state using %s (used to be %s).",
ms.server->address, m_pMonitored_server->server->address);
}
m_pMonitored_server = &ms;
mxb_assert(m_pMonitored_server->con);
update_cluster_nodes(*m_pMonitored_server);
m_pHub_con = pHub_con;
m_pHub_server = pHub_server;
mxb_assert(m_pHub_con);
mxb_assert(m_pHub_con);
}
else
{
@ -218,198 +171,202 @@ void ClustrixMonitor::update_cluster_nodes()
}
}
void ClustrixMonitor::update_cluster_nodes(MXS_MONITORED_SERVER& ms)
void ClustrixMonitor::refresh_nodes()
{
mxb_assert(ms.con);
mxb_assert(m_pHub_con);
map<int, ClustrixMembership> memberships;
check_cluster_membership(ms, &memberships);
const char ZQUERY[] = "SELECT nodeid, iface_ip, mysql_port, healthmon_port FROM system.nodeinfo";
if (mysql_query(ms.con, ZQUERY) == 0)
if (check_cluster_membership(&memberships))
{
MYSQL_RES* pResult = mysql_store_result(ms.con);
const char ZQUERY[] = "SELECT nodeid, iface_ip, mysql_port, healthmon_port FROM system.nodeinfo";
if (pResult)
if (mysql_query(m_pHub_con, ZQUERY) == 0)
{
mxb_assert(mysql_field_count(ms.con) == 4);
MYSQL_RES* pResult = mysql_store_result(m_pHub_con);
set<int> nids;
for (const auto& element : m_nodes)
if (pResult)
{
const ClustrixNode& node = element.second;
nids.insert(node.id());
}
mxb_assert(mysql_field_count(m_pHub_con) == 4);
MYSQL_ROW row;
while ((row = mysql_fetch_row(pResult)) != nullptr)
{
if (row[0] && row[1])
set<int> nids;
for (const auto& element : m_nodes)
{
int id = atoi(row[0]);
string ip = row[1];
int mysql_port = row[2] ? atoi(row[2]) : DEFAULT_MYSQL_PORT;
int health_port = row[3] ? atoi(row[3]) : DEFAULT_HEALTH_PORT;
const ClustrixNode& node = element.second;
nids.insert(node.id());
}
// '@@' ensures no clash with user created servers.
// Monitor name ensures no clash with other Clustrix monitor instances.
string name = string("@@") + this->name + ":server-" + std::to_string(id);
auto nit = m_nodes.find(id);
auto mit = memberships.find(id);
if (nit != m_nodes.end())
MYSQL_ROW row;
while ((row = mysql_fetch_row(pResult)) != nullptr)
{
if (row[0] && row[1])
{
// Existing node.
mxb_assert(SERVER::find_by_unique_name(name));
int id = atoi(row[0]);
string ip = row[1];
int mysql_port = row[2] ? atoi(row[2]) : DEFAULT_MYSQL_PORT;
int health_port = row[3] ? atoi(row[3]) : DEFAULT_HEALTH_PORT;
ClustrixNode& node = nit->second;
// '@@' ensures no clash with user created servers.
// Monitor name ensures no clash with other Clustrix monitor instances.
string name = string("@@") + this->name + ":server-" + std::to_string(id);
if (node.ip() != ip)
auto nit = m_nodes.find(id);
auto mit = memberships.find(id);
if (nit != m_nodes.end())
{
node.set_ip(ip);
// Existing node.
mxb_assert(SERVER::find_by_unique_name(name));
ClustrixNode& node = nit->second;
if (node.ip() != ip)
{
node.set_ip(ip);
}
if (node.mysql_port() != mysql_port)
{
node.set_mysql_port(mysql_port);
}
if (node.health_port() != health_port)
{
node.set_health_port(health_port);
}
nids.erase(id);
}
if (node.mysql_port() != mysql_port)
else if (mit != memberships.end())
{
node.set_mysql_port(mysql_port);
}
// New node.
mxb_assert(!SERVER::find_by_unique_name(name));
if (node.health_port() != health_port)
{
node.set_health_port(health_port);
}
if (runtime_create_server(name.c_str(),
ip.c_str(),
std::to_string(mysql_port).c_str(),
"mariadbbackend",
"mysqlbackendauth",
false))
{
SERVER* pServer = SERVER::find_by_unique_name(name);
mxb_assert(pServer);
nids.erase(id);
}
else if (mit != memberships.end())
{
// New node.
mxb_assert(!SERVER::find_by_unique_name(name));
const ClustrixMembership& membership = mit->second;
int health_check_threshold = m_config.health_check_threshold();
if (runtime_create_server(name.c_str(),
ip.c_str(),
std::to_string(mysql_port).c_str(),
"mariadbbackend",
"mysqlbackendauth",
false))
{
SERVER* pServer = SERVER::find_by_unique_name(name);
mxb_assert(pServer);
ClustrixNode node(membership, ip, mysql_port, health_port,
health_check_threshold, pServer);
const ClustrixMembership& membership = mit->second;
int health_check_threshold = m_config.health_check_threshold();
m_nodes.insert(make_pair(id, node));
}
else
{
MXS_ERROR("Could not create server %s at %s:%d.",
name.c_str(), ip.c_str(), mysql_port);
}
ClustrixNode node(membership, ip, mysql_port, health_port,
health_check_threshold, pServer);
m_nodes.insert(make_pair(id, node));
memberships.erase(mit);
}
else
{
MXS_ERROR("Could not create server %s at %s:%d.",
name.c_str(), ip.c_str(), mysql_port);
// Node found in system.node_info but not in system.membership
MXS_ERROR("Node %d at %s:%d,%d found in system.node_info "
"but not in system.membership.",
id, ip.c_str(), mysql_port, health_port);
}
memberships.erase(mit);
}
else
{
// Node found in system.node_info but not in system.membership
MXS_ERROR("Node %d at %s:%d,%d found in system.node_info "
"but not in system.membership.",
id, ip.c_str(), mysql_port, health_port);
MXS_WARNING("Either nodeid and/or iface_ip is missing, ignoring node.");
}
}
else
mysql_free_result(pResult);
for (const auto nid : nids)
{
MXS_WARNING("Either nodeid and/or iface_ip is missing, ignoring node.");
auto it = m_nodes.find(nid);
mxb_assert(it != m_nodes.end());
ClustrixNode& node = it->second;
node.set_running(false, ClustrixNode::APPROACH_OVERRIDE);
}
vector<string> health_urls;
for (const auto& element : m_nodes)
{
const ClustrixNode& node = element.second;
string url = "http://" + node.ip() + ":" + std::to_string(node.health_port());
health_urls.push_back(url);
}
m_health_urls.swap(health_urls);
m_last_cluster_check = now();
}
mysql_free_result(pResult);
for (const auto nid : nids)
else
{
auto it = m_nodes.find(nid);
mxb_assert(it != m_nodes.end());
ClustrixNode& node = it->second;
node.set_running(false, ClustrixNode::APPROACH_OVERRIDE);
MXS_WARNING("No result returned for '%s' on %s.", ZQUERY, m_pHub_server->address);
}
vector<string> health_urls;
for (const auto& element : m_nodes)
{
const ClustrixNode& node = element.second;
string url = "http://" + node.ip() + ":" + std::to_string(node.health_port());
health_urls.push_back(url);
}
m_health_urls.swap(health_urls);
m_last_cluster_check = now();
}
else
{
MXS_WARNING("No result returned for '%s' on %s.", ZQUERY, ms.server->address);
MXS_ERROR("Could not execute '%s' on %s: %s",
ZQUERY, m_pHub_server->address, mysql_error(m_pHub_con));
}
}
else
{
MXS_ERROR("Could not execute '%s' on %s: %s",
ZQUERY, ms.server->address, mysql_error(ms.con));
}
}
void ClustrixMonitor::refresh_cluster_nodes()
void ClustrixMonitor::check_hub_and_refresh_nodes()
{
if (m_pMonitored_server)
if (m_pHub_con)
{
mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, m_pMonitored_server);
if (mon_connection_is_ok(rv) && is_part_of_the_quorum(*m_pMonitored_server))
{
update_cluster_nodes(*m_pMonitored_server);
}
else
{
if (m_pMonitored_server->con)
{
mysql_close(m_pMonitored_server->con);
m_pMonitored_server->con = nullptr;
}
update_cluster_nodes();
}
check_hub();
}
else if (m_monitor->monitored_servers)
if (!m_pHub_con)
{
update_cluster_nodes();
choose_hub();
}
if (m_pHub_con)
{
refresh_nodes();
}
}
bool ClustrixMonitor::check_cluster_membership(MXS_MONITORED_SERVER& ms,
std::map<int, ClustrixMembership>* pMemberships)
void ClustrixMonitor::check_hub()
{
mxb_assert(m_pHub_con);
mxb_assert(m_pHub_server);
if (!Clustrix::ping_or_connect_to_hub(*m_monitor, *m_pHub_server, &m_pHub_con))
{
mysql_close(m_pHub_con);
m_pHub_con = nullptr;
}
}
bool ClustrixMonitor::check_cluster_membership(std::map<int, ClustrixMembership>* pMemberships)
{
mxb_assert(ms.con);
mxb_assert(pMemberships);
mxb_assert(m_pHub_con);
mxb_assert(m_pHub_server);
bool rv = false;
const char ZQUERY[] = "SELECT nid, status, instance, substate FROM system.membership";
if (mysql_query(ms.con, ZQUERY) == 0)
if (mysql_query(m_pHub_con, ZQUERY) == 0)
{
MYSQL_RES* pResult = mysql_store_result(ms.con);
MYSQL_RES* pResult = mysql_store_result(m_pHub_con);
if (pResult)
{
mxb_assert(mysql_field_count(ms.con) == 4);
mxb_assert(mysql_field_count(m_pHub_con) == 4);
set<int> nids;
for (const auto& element : m_nodes)
@ -479,7 +436,7 @@ bool ClustrixMonitor::check_cluster_membership(MXS_MONITORED_SERVER& ms,
else
{
MXS_ERROR("Could not execute '%s' on %s: %s",
ZQUERY, ms.server->address, mysql_error(ms.con));
ZQUERY, m_pHub_server->address, mysql_error(m_pHub_con));
}
return rv;

View File

@ -70,12 +70,14 @@ private:
void pre_loop() override;
void post_loop() override;
void tick();
void tick() override;
void check_hub_and_refresh_nodes();
void check_hub();
void choose_hub();
void refresh_nodes();
bool check_cluster_membership(std::map<int, ClustrixMembership>* pMemberships);
void update_cluster_nodes();
void update_cluster_nodes(MXS_MONITORED_SERVER& ms);
void refresh_cluster_nodes();
bool check_cluster_membership(MXS_MONITORED_SERVER& ms, std::map<int, ClustrixMembership>* pMemberships);
void update_server_statuses();
void make_health_check();
@ -94,5 +96,6 @@ private:
mxb::http::Async m_http;
uint32_t m_delayed_http_check_id { 0 };
long m_last_cluster_check { 0 };
MXS_MONITORED_SERVER* m_pMonitored_server { nullptr };
SERVER* m_pHub_server { nullptr };
MYSQL* m_pHub_con { nullptr };
};

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "clustrixnode.hh"
#include "clustrix.hh"
bool ClustrixNode::can_be_used_as_hub(const MXS_MONITOR& mon)
{
mxb_assert(m_pServer);
bool rv = Clustrix::ping_or_connect_to_hub(mon, *m_pServer, &m_pCon);
if (!rv)
{
mysql_close(m_pCon);
m_pCon = nullptr;
}
return rv;
}

View File

@ -17,6 +17,7 @@
#include <sstream>
#include <string>
#include "clustrix.hh"
#include "clustrixmembership.hh"
class ClustrixNode
{
@ -49,9 +50,18 @@ public:
, m_health_check_threshold(health_check_threshold)
, m_nRunning(m_health_check_threshold)
, m_pServer(pServer)
, m_pCon(nullptr)
{
}
~ClustrixNode()
{
if (m_pCon)
{
mysql_close(m_pCon);
}
}
int id() const
{
return m_id;
@ -150,6 +160,25 @@ public:
m_pServer->is_active = false;
}
bool can_be_used_as_hub(const MXS_MONITOR& mon);
SERVER* server() const
{
return m_pServer;
}
MYSQL* connection() const
{
return m_pCon;
}
MYSQL* release_connection()
{
MYSQL* pCon = m_pCon;
m_pCon = nullptr;
return pCon;
}
std::string to_string() const
{
std::stringstream ss;
@ -173,6 +202,7 @@ private:
int m_health_check_threshold { DEFAULT_HEALTH_CHECK_THRESHOLD_VALUE };
int m_nRunning { 0 };
SERVER* m_pServer { nullptr };
MYSQL* m_pCon { nullptr };
};
inline std::ostream& operator << (std::ostream& out, const ClustrixNode& x)