From 880842e55da7300ac261fde170591e2af1b4d732 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Wed, 9 Jan 2019 14:13:46 +0200 Subject: [PATCH] MXS-2219 Perform cluster monitoring as well Now the monitor - will frequently ping the health port of each server - less frequently check from system.membership the actual number of available nodes and act accordingly. Currently, the updated servers are the ones listed in the conf file. Subsequently this will be changed so that the servers listed in the configuration file are only used for bootstrapping the monitor and server objects are then created dynamically according to what is found in the cluster. --- .../monitor/clustrixmon/clustrixmonitor.cc | 239 +++++++++++++++--- .../monitor/clustrixmon/clustrixmonitor.hh | 26 +- .../monitor/clustrixmon/clustrixnodeinfo.hh | 88 +++++++ 3 files changed, 308 insertions(+), 45 deletions(-) create mode 100644 server/modules/monitor/clustrixmon/clustrixnodeinfo.hh diff --git a/server/modules/monitor/clustrixmon/clustrixmonitor.cc b/server/modules/monitor/clustrixmon/clustrixmonitor.cc index 1c8690dda..4fe8ab400 100644 --- a/server/modules/monitor/clustrixmon/clustrixmonitor.cc +++ b/server/modules/monitor/clustrixmon/clustrixmonitor.cc @@ -12,13 +12,25 @@ */ #include "clustrixmonitor.hh" +#include namespace http = mxb::http; using namespace std; +namespace +{ + +const int DEFAULT_MYSQL_PORT = 3306; +const int DEFAULT_HEALTH_PORT = 3581; + +} + ClustrixMonitor::ClustrixMonitor(MXS_MONITOR* pMonitor) : maxscale::MonitorInstance(pMonitor) - , m_delayed_http_check_id(0) +{ +} + +ClustrixMonitor::~ClustrixMonitor() { } @@ -30,40 +42,42 @@ ClustrixMonitor* ClustrixMonitor::create(MXS_MONITOR* pMonitor) bool ClustrixMonitor::configure(const MXS_CONFIG_PARAMETER* pParams) { + if (!m_monitor->monitored_servers) + { + MXS_WARNING("No servers specified, cannot start monitor."); + return false; + } + m_health_urls.clear(); m_config.set_cluster_monitor_interval(config_get_integer(pParams, CLUSTER_MONITOR_INTERVAL_NAME)); - MXS_MONITORED_SERVER* pMonitored_server = m_monitor->monitored_servers; - - while (pMonitored_server) - { - SERVER* pServer = pMonitored_server->server; - - string url(pServer->address); - url += ":"; - url += "3581"; // TODO: Possibly make configurable. - - m_health_urls.push_back(url); - - pMonitored_server = pMonitored_server->next; - } + refresh_cluster_nodes(); return true; } void ClustrixMonitor::pre_loop() { - m_http = mxb::http::get_async(m_health_urls); + make_health_check(); +} - if (m_http.status() == http::Async::ERROR) +void ClustrixMonitor::post_loop() +{ + if (m_pMonitored_server && m_pMonitored_server->con) { - MXS_WARNING("Could not initiate health check to nodes."); + mysql_close(m_pMonitored_server->con); + m_pMonitored_server->con = nullptr; } } void ClustrixMonitor::tick() { + if (now() - m_last_cluster_check > m_config.cluster_monitor_interval()) + { + refresh_cluster_nodes(); + } + switch (m_http.status()) { case http::Async::PENDING: @@ -72,25 +86,174 @@ void ClustrixMonitor::tick() case http::Async::ERROR: MXS_WARNING("Health check round ended with general error."); + make_health_check(); + break; + case http::Async::READY: + { + auto b = std::begin(*m_monitor->monitored_servers); + auto e = std::end(*m_monitor->monitored_servers); + + for_each(b, e, + [this](MXS_MONITORED_SERVER& ms) { + monitor_stash_current_status(&ms); + + auto it = find_if(m_node_infos.begin(), m_node_infos.end(), + [&ms](const ClustrixNodeInfo& info) -> bool { + return ms.server->address == info.ip(); + }); + + if (it != m_node_infos.end()) + { + if (it->is_running()) + { + monitor_set_pending_status(&ms, SERVER_RUNNING); + } + else + { + monitor_clear_pending_status(&ms, SERVER_RUNNING); + } + } + else + { + monitor_clear_pending_status(&ms, SERVER_RUNNING); + } + + }); + + make_health_check(); + } + break; + } +} + +void ClustrixMonitor::fetch_cluster_nodes() +{ + auto b = begin(*(m_monitor->monitored_servers)); + auto e = end(*(m_monitor->monitored_servers)); + + auto it = find_if(b, e, + [this](MXS_MONITORED_SERVER& ms) -> bool { + mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, &ms); + + return mon_connection_is_ok(rv) ? true : false; + }); + + if (it != e) + { + MXS_MONITORED_SERVER& ms = *it; + fetch_cluster_nodes_from(ms); + + m_pMonitored_server = &ms; + } + else + { + MXS_ERROR("Could not connect to any server."); + } +} + +void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms) +{ + mxb_assert(ms.con); + + const char ZQUERY[] = "SELECT nodeid, iface_ip, mysql_port, healthmon_port FROM system.nodeinfo"; + + if (mysql_query(ms.con, ZQUERY) == 0) + { + MYSQL_RES* pResult = mysql_store_result(ms.con); + + if (pResult) { - m_http = mxb::http::get_async(m_health_urls); + mxb_assert(mysql_field_count(ms.con) == 4); - switch (m_http.status()) + vector node_infos; + vector health_urls; + + MYSQL_ROW row; + + while ((row = mysql_fetch_row(pResult)) != nullptr) { - case http::Async::PENDING: - initiate_delayed_http_check(); - break; + if (row[0] && row[1]) + { + int id = atoi(row[0]); + string ip = row[1]; + int mysql_port = row[2] ? atoi(row[2]) : DEFAULT_MYSQL_PORT; + int health_port = row[3] ? atoi(row[3]) : DEFAULT_HEALTH_PORT; - case http::Async::ERROR: - MXS_ERROR("Could not initiate health check."); - break; + node_infos.emplace_back(id, ip, mysql_port, health_port); - case http::Async::READY: - MXS_NOTICE("Health check available immediately."); - break; + string health_url = "http://" + ip + ":" + std::to_string(health_port); + health_urls.push_back(health_url); + } + else + { + MXS_WARNING("Either nodeid and/or iface_ip is missing, " + "ignoring node."); + } } + + mysql_free_result(pResult); + + m_node_infos.swap(node_infos); + m_health_urls.swap(health_urls); + + m_last_cluster_check = now(); } + else + { + MXS_WARNING("No result returned for '%s'.", ZQUERY); + } + } + else + { + MXS_ERROR("Could not execute '%s' on %s: %s", + ZQUERY, ms.server->address, mysql_error(ms.con)); + } +} + +void ClustrixMonitor::refresh_cluster_nodes() +{ + if (m_pMonitored_server) + { + mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, m_pMonitored_server); + + if (mon_connection_is_ok(rv)) + { + fetch_cluster_nodes_from(*m_pMonitored_server); + } + else + { + mysql_close(m_pMonitored_server->con); + m_pMonitored_server->con = nullptr; + + fetch_cluster_nodes(); + } + } + else + { + fetch_cluster_nodes(); + } +} + +void ClustrixMonitor::make_health_check() +{ + mxb_assert(m_http.status() != http::Async::PENDING); + + m_http = mxb::http::get_async(m_health_urls); + + switch (m_http.status()) + { + case http::Async::PENDING: + initiate_delayed_http_check(); + break; + + case http::Async::ERROR: + MXS_ERROR("Could not initiate health check."); + break; + + case http::Async::READY: + MXS_NOTICE("Health check available immediately."); + break; } } @@ -126,26 +289,20 @@ bool ClustrixMonitor::check_http(Call::action_t action) { const vector& results = m_http.results(); - MXS_MONITORED_SERVER* pMonitored_server = m_monitor->monitored_servers; - - for (size_t i = 0; i < m_health_urls.size(); ++i) + for (size_t i = 0; i < results.size(); ++i) { - mxb_assert(pMonitored_server); - - const auto& url = m_health_urls[i]; const auto& result = results[i]; - MXS_INFO("%s: %s", url.c_str(), (result.code == 200) ? "OK" : result.body.c_str()); - - uint64_t bits = 0; + bool running = false; if (result.code == 200) { - bits |= SERVER_RUNNING; + running = true; } - monitor_set_pending_status(pMonitored_server, bits); - pMonitored_server = pMonitored_server->next; + auto& node_info = m_node_infos[i]; + + node_info.set_running(running); } } break; diff --git a/server/modules/monitor/clustrixmon/clustrixmonitor.hh b/server/modules/monitor/clustrixmon/clustrixmonitor.hh index 5db7499c9..8faa968c4 100644 --- a/server/modules/monitor/clustrixmon/clustrixmonitor.hh +++ b/server/modules/monitor/clustrixmon/clustrixmonitor.hh @@ -15,6 +15,7 @@ #include "clustrixmon.hh" #include #include +#include "clustrixnodeinfo.hh" class ClustrixMonitor : public maxscale::MonitorInstance { @@ -43,6 +44,8 @@ public: long m_cluster_monitor_interval; }; + ~ClustrixMonitor(); + static ClustrixMonitor* create(MXS_MONITOR* pMonitor); bool configure(const MXS_CONFIG_PARAMETER* pParams) override; @@ -51,15 +54,30 @@ private: ClustrixMonitor(MXS_MONITOR* pMonitor); void pre_loop() override; + void post_loop() override; void tick(); + void fetch_cluster_nodes(); + void fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms); + void refresh_cluster_nodes(); + + void make_health_check(); void initiate_delayed_http_check(); bool check_http(Call::action_t action); + static long now() + { + return mxb::WorkerLoad::get_time_ms(); + } + private: - Config m_config; - mxb::http::Async m_http; - std::vector m_health_urls; - uint32_t m_delayed_http_check_id; + Config m_config; + std::vector m_config_servers; + std::vector m_node_infos; + std::vector m_health_urls; + mxb::http::Async m_http; + uint32_t m_delayed_http_check_id { 0 }; + long m_last_cluster_check { 0 }; + MXS_MONITORED_SERVER* m_pMonitored_server { nullptr }; }; diff --git a/server/modules/monitor/clustrixmon/clustrixnodeinfo.hh b/server/modules/monitor/clustrixmon/clustrixnodeinfo.hh new file mode 100644 index 000000000..681f35ac2 --- /dev/null +++ b/server/modules/monitor/clustrixmon/clustrixnodeinfo.hh @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include "clustrixmon.hh" +#include +#include +#include + +class ClustrixNodeInfo +{ +public: + ClustrixNodeInfo(int id, + const std::string& ip, + int mysql_port, + int health_port) + : m_id(id) + , m_ip(ip) + , m_mysql_port(mysql_port) + , m_health_port(health_port) + { + } + + int id() const + { + return m_id; + } + + const std::string& ip() const + { + return m_ip; + } + + int mysql_port() const + { + return m_mysql_port; + } + + int health_port() const + { + return m_health_port; + } + + bool is_running() const + { + return m_is_running; + } + + void set_running(bool running) + { + m_is_running = running; + } + + std::string to_string() const + { + std::stringstream ss; + ss << "{" << m_id << ", " << m_ip << ", " << m_mysql_port << ", " << m_health_port << "}"; + return ss.str(); + } + + void print(std::ostream& o) const + { + o << to_string(); + } + +private: + int m_id; + std::string m_ip; + int m_mysql_port; + int m_health_port; + bool m_is_running { true }; // Assume running, until proven otherwise. +}; + +inline std::ostream& operator << (std::ostream& out, const ClustrixNodeInfo& x) +{ + x.print(out); + return out; +}