MXS-2219 Perform cluster monitoring as well
Now the monitor - will frequently ping the health port of each server - less frequently check from system.membership the actual number of available nodes and act accordingly. Currently, the updated servers are the ones listed in the conf file. Subsequently this will be changed so that the servers listed in the configuration file are only used for bootstrapping the monitor and server objects are then created dynamically according to what is found in the cluster.
This commit is contained in:
@ -12,13 +12,25 @@
|
||||
*/
|
||||
|
||||
#include "clustrixmonitor.hh"
|
||||
#include <algorithm>
|
||||
|
||||
namespace http = mxb::http;
|
||||
using namespace std;
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
const int DEFAULT_MYSQL_PORT = 3306;
|
||||
const int DEFAULT_HEALTH_PORT = 3581;
|
||||
|
||||
}
|
||||
|
||||
ClustrixMonitor::ClustrixMonitor(MXS_MONITOR* pMonitor)
|
||||
: maxscale::MonitorInstance(pMonitor)
|
||||
, m_delayed_http_check_id(0)
|
||||
{
|
||||
}
|
||||
|
||||
ClustrixMonitor::~ClustrixMonitor()
|
||||
{
|
||||
}
|
||||
|
||||
@ -30,40 +42,42 @@ ClustrixMonitor* ClustrixMonitor::create(MXS_MONITOR* pMonitor)
|
||||
|
||||
bool ClustrixMonitor::configure(const MXS_CONFIG_PARAMETER* pParams)
|
||||
{
|
||||
if (!m_monitor->monitored_servers)
|
||||
{
|
||||
MXS_WARNING("No servers specified, cannot start monitor.");
|
||||
return false;
|
||||
}
|
||||
|
||||
m_health_urls.clear();
|
||||
|
||||
m_config.set_cluster_monitor_interval(config_get_integer(pParams, CLUSTER_MONITOR_INTERVAL_NAME));
|
||||
|
||||
MXS_MONITORED_SERVER* pMonitored_server = m_monitor->monitored_servers;
|
||||
|
||||
while (pMonitored_server)
|
||||
{
|
||||
SERVER* pServer = pMonitored_server->server;
|
||||
|
||||
string url(pServer->address);
|
||||
url += ":";
|
||||
url += "3581"; // TODO: Possibly make configurable.
|
||||
|
||||
m_health_urls.push_back(url);
|
||||
|
||||
pMonitored_server = pMonitored_server->next;
|
||||
}
|
||||
refresh_cluster_nodes();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ClustrixMonitor::pre_loop()
|
||||
{
|
||||
m_http = mxb::http::get_async(m_health_urls);
|
||||
make_health_check();
|
||||
}
|
||||
|
||||
if (m_http.status() == http::Async::ERROR)
|
||||
void ClustrixMonitor::post_loop()
|
||||
{
|
||||
if (m_pMonitored_server && m_pMonitored_server->con)
|
||||
{
|
||||
MXS_WARNING("Could not initiate health check to nodes.");
|
||||
mysql_close(m_pMonitored_server->con);
|
||||
m_pMonitored_server->con = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void ClustrixMonitor::tick()
|
||||
{
|
||||
if (now() - m_last_cluster_check > m_config.cluster_monitor_interval())
|
||||
{
|
||||
refresh_cluster_nodes();
|
||||
}
|
||||
|
||||
switch (m_http.status())
|
||||
{
|
||||
case http::Async::PENDING:
|
||||
@ -72,25 +86,174 @@ void ClustrixMonitor::tick()
|
||||
|
||||
case http::Async::ERROR:
|
||||
MXS_WARNING("Health check round ended with general error.");
|
||||
make_health_check();
|
||||
break;
|
||||
|
||||
case http::Async::READY:
|
||||
{
|
||||
auto b = std::begin(*m_monitor->monitored_servers);
|
||||
auto e = std::end(*m_monitor->monitored_servers);
|
||||
|
||||
for_each(b, e,
|
||||
[this](MXS_MONITORED_SERVER& ms) {
|
||||
monitor_stash_current_status(&ms);
|
||||
|
||||
auto it = find_if(m_node_infos.begin(), m_node_infos.end(),
|
||||
[&ms](const ClustrixNodeInfo& info) -> bool {
|
||||
return ms.server->address == info.ip();
|
||||
});
|
||||
|
||||
if (it != m_node_infos.end())
|
||||
{
|
||||
if (it->is_running())
|
||||
{
|
||||
monitor_set_pending_status(&ms, SERVER_RUNNING);
|
||||
}
|
||||
else
|
||||
{
|
||||
monitor_clear_pending_status(&ms, SERVER_RUNNING);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
monitor_clear_pending_status(&ms, SERVER_RUNNING);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
make_health_check();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ClustrixMonitor::fetch_cluster_nodes()
|
||||
{
|
||||
auto b = begin(*(m_monitor->monitored_servers));
|
||||
auto e = end(*(m_monitor->monitored_servers));
|
||||
|
||||
auto it = find_if(b, e,
|
||||
[this](MXS_MONITORED_SERVER& ms) -> bool {
|
||||
mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, &ms);
|
||||
|
||||
return mon_connection_is_ok(rv) ? true : false;
|
||||
});
|
||||
|
||||
if (it != e)
|
||||
{
|
||||
MXS_MONITORED_SERVER& ms = *it;
|
||||
fetch_cluster_nodes_from(ms);
|
||||
|
||||
m_pMonitored_server = &ms;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not connect to any server.");
|
||||
}
|
||||
}
|
||||
|
||||
void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms)
|
||||
{
|
||||
mxb_assert(ms.con);
|
||||
|
||||
const char ZQUERY[] = "SELECT nodeid, iface_ip, mysql_port, healthmon_port FROM system.nodeinfo";
|
||||
|
||||
if (mysql_query(ms.con, ZQUERY) == 0)
|
||||
{
|
||||
MYSQL_RES* pResult = mysql_store_result(ms.con);
|
||||
|
||||
if (pResult)
|
||||
{
|
||||
m_http = mxb::http::get_async(m_health_urls);
|
||||
mxb_assert(mysql_field_count(ms.con) == 4);
|
||||
|
||||
switch (m_http.status())
|
||||
vector<ClustrixNodeInfo> node_infos;
|
||||
vector<string> health_urls;
|
||||
|
||||
MYSQL_ROW row;
|
||||
|
||||
while ((row = mysql_fetch_row(pResult)) != nullptr)
|
||||
{
|
||||
case http::Async::PENDING:
|
||||
initiate_delayed_http_check();
|
||||
break;
|
||||
if (row[0] && row[1])
|
||||
{
|
||||
int id = atoi(row[0]);
|
||||
string ip = row[1];
|
||||
int mysql_port = row[2] ? atoi(row[2]) : DEFAULT_MYSQL_PORT;
|
||||
int health_port = row[3] ? atoi(row[3]) : DEFAULT_HEALTH_PORT;
|
||||
|
||||
case http::Async::ERROR:
|
||||
MXS_ERROR("Could not initiate health check.");
|
||||
break;
|
||||
node_infos.emplace_back(id, ip, mysql_port, health_port);
|
||||
|
||||
case http::Async::READY:
|
||||
MXS_NOTICE("Health check available immediately.");
|
||||
break;
|
||||
string health_url = "http://" + ip + ":" + std::to_string(health_port);
|
||||
health_urls.push_back(health_url);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("Either nodeid and/or iface_ip is missing, "
|
||||
"ignoring node.");
|
||||
}
|
||||
}
|
||||
|
||||
mysql_free_result(pResult);
|
||||
|
||||
m_node_infos.swap(node_infos);
|
||||
m_health_urls.swap(health_urls);
|
||||
|
||||
m_last_cluster_check = now();
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_WARNING("No result returned for '%s'.", ZQUERY);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not execute '%s' on %s: %s",
|
||||
ZQUERY, ms.server->address, mysql_error(ms.con));
|
||||
}
|
||||
}
|
||||
|
||||
void ClustrixMonitor::refresh_cluster_nodes()
|
||||
{
|
||||
if (m_pMonitored_server)
|
||||
{
|
||||
mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, m_pMonitored_server);
|
||||
|
||||
if (mon_connection_is_ok(rv))
|
||||
{
|
||||
fetch_cluster_nodes_from(*m_pMonitored_server);
|
||||
}
|
||||
else
|
||||
{
|
||||
mysql_close(m_pMonitored_server->con);
|
||||
m_pMonitored_server->con = nullptr;
|
||||
|
||||
fetch_cluster_nodes();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
fetch_cluster_nodes();
|
||||
}
|
||||
}
|
||||
|
||||
void ClustrixMonitor::make_health_check()
|
||||
{
|
||||
mxb_assert(m_http.status() != http::Async::PENDING);
|
||||
|
||||
m_http = mxb::http::get_async(m_health_urls);
|
||||
|
||||
switch (m_http.status())
|
||||
{
|
||||
case http::Async::PENDING:
|
||||
initiate_delayed_http_check();
|
||||
break;
|
||||
|
||||
case http::Async::ERROR:
|
||||
MXS_ERROR("Could not initiate health check.");
|
||||
break;
|
||||
|
||||
case http::Async::READY:
|
||||
MXS_NOTICE("Health check available immediately.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,26 +289,20 @@ bool ClustrixMonitor::check_http(Call::action_t action)
|
||||
{
|
||||
const vector<http::Result>& results = m_http.results();
|
||||
|
||||
MXS_MONITORED_SERVER* pMonitored_server = m_monitor->monitored_servers;
|
||||
|
||||
for (size_t i = 0; i < m_health_urls.size(); ++i)
|
||||
for (size_t i = 0; i < results.size(); ++i)
|
||||
{
|
||||
mxb_assert(pMonitored_server);
|
||||
|
||||
const auto& url = m_health_urls[i];
|
||||
const auto& result = results[i];
|
||||
|
||||
MXS_INFO("%s: %s", url.c_str(), (result.code == 200) ? "OK" : result.body.c_str());
|
||||
|
||||
uint64_t bits = 0;
|
||||
bool running = false;
|
||||
|
||||
if (result.code == 200)
|
||||
{
|
||||
bits |= SERVER_RUNNING;
|
||||
running = true;
|
||||
}
|
||||
|
||||
monitor_set_pending_status(pMonitored_server, bits);
|
||||
pMonitored_server = pMonitored_server->next;
|
||||
auto& node_info = m_node_infos[i];
|
||||
|
||||
node_info.set_running(running);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user