diff --git a/server/modules/monitor/clustrixmon/clustrixmonitor.cc b/server/modules/monitor/clustrixmon/clustrixmonitor.cc index 1c8690dda..4fe8ab400 100644 --- a/server/modules/monitor/clustrixmon/clustrixmonitor.cc +++ b/server/modules/monitor/clustrixmon/clustrixmonitor.cc @@ -12,13 +12,25 @@ */ #include "clustrixmonitor.hh" +#include namespace http = mxb::http; using namespace std; +namespace +{ + /* Fallback ports used by fetch_cluster_nodes_from() when the mysql_port or healthmon_port column of a fetched row is null. */ +const int DEFAULT_MYSQL_PORT = 3306; +const int DEFAULT_HEALTH_PORT = 3581; + +} + ClustrixMonitor::ClustrixMonitor(MXS_MONITOR* pMonitor) : maxscale::MonitorInstance(pMonitor) - , m_delayed_http_check_id(0) +{ +} + +ClustrixMonitor::~ClustrixMonitor() { } @@ -30,40 +42,42 @@ ClustrixMonitor* ClustrixMonitor::create(MXS_MONITOR* pMonitor) bool ClustrixMonitor::configure(const MXS_CONFIG_PARAMETER* pParams) { + if (!m_monitor->monitored_servers) + { + MXS_WARNING("No servers specified, cannot start monitor."); + return false; + } + m_health_urls.clear(); m_config.set_cluster_monitor_interval(config_get_integer(pParams, CLUSTER_MONITOR_INTERVAL_NAME)); - MXS_MONITORED_SERVER* pMonitored_server = m_monitor->monitored_servers; - - while (pMonitored_server) - { - SERVER* pServer = pMonitored_server->server; - - string url(pServer->address); - url += ":"; - url += "3581"; // TODO: Possibly make configurable. 
- - m_health_urls.push_back(url); - - pMonitored_server = pMonitored_server->next; - } + refresh_cluster_nodes(); /* Initial fetch of the node list; it returns void, so its outcome does not affect the value returned below. */ return true; } void ClustrixMonitor::pre_loop() { - m_http = mxb::http::get_async(m_health_urls); + make_health_check(); +} - if (m_http.status() == http::Async::ERROR) +void ClustrixMonitor::post_loop() /* Closes the connection held by m_pMonitored_server, if there is one, and resets it to nullptr. */ +{ + if (m_pMonitored_server && m_pMonitored_server->con) { - MXS_WARNING("Could not initiate health check to nodes."); + mysql_close(m_pMonitored_server->con); + m_pMonitored_server->con = nullptr; } } void ClustrixMonitor::tick() { + if (now() - m_last_cluster_check > m_config.cluster_monitor_interval()) /* Refresh the node list when more than cluster_monitor_interval has passed since m_last_cluster_check. NOTE(review): presumes the interval uses the same unit as now() - confirm. */ + { + refresh_cluster_nodes(); + } + switch (m_http.status()) { case http::Async::PENDING: @@ -72,25 +86,174 @@ void ClustrixMonitor::tick() case http::Async::ERROR: MXS_WARNING("Health check round ended with general error."); + make_health_check(); + break; + case http::Async::READY: /* For every monitored server, look up the node info with the same address and set SERVER_RUNNING only if that node is marked running; servers without a matching node get it cleared. Then start the next health check. */ + { + auto b = std::begin(*m_monitor->monitored_servers); + auto e = std::end(*m_monitor->monitored_servers); + + for_each(b, e, + [this](MXS_MONITORED_SERVER& ms) { + monitor_stash_current_status(&ms); + + auto it = find_if(m_node_infos.begin(), m_node_infos.end(), + [&ms](const ClustrixNodeInfo& info) -> bool { + return ms.server->address == info.ip(); + }); + + if (it != m_node_infos.end()) + { + if (it->is_running()) + { + monitor_set_pending_status(&ms, SERVER_RUNNING); + } + else + { + monitor_clear_pending_status(&ms, SERVER_RUNNING); + } + } + else + { + monitor_clear_pending_status(&ms, SERVER_RUNNING); + } + + }); + + make_health_check(); + } + break; + } +} + /* Picks the first monitored server for which mon_ping_or_connect_to_db() reports an OK connection, fetches the node list from it and remembers it in m_pMonitored_server; logs an error if no server qualifies. */ +void ClustrixMonitor::fetch_cluster_nodes() +{ + auto b = begin(*(m_monitor->monitored_servers)); + auto e = end(*(m_monitor->monitored_servers)); + + auto it = find_if(b, e, + [this](MXS_MONITORED_SERVER& ms) -> bool { + mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, &ms); + + return mon_connection_is_ok(rv) ? 
true : false; + }); + + if (it != e) + { + MXS_MONITORED_SERVER& ms = *it; + fetch_cluster_nodes_from(ms); + + m_pMonitored_server = &ms; + } + else + { + MXS_ERROR("Could not connect to any server."); + } +} + /* Runs the system.nodeinfo query over ms.con and, when a result set is returned, replaces m_node_infos and m_health_urls and updates m_last_cluster_check; on failure the members are left untouched and a message is logged. */ +void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms) +{ + mxb_assert(ms.con); + + const char ZQUERY[] = "SELECT nodeid, iface_ip, mysql_port, healthmon_port FROM system.nodeinfo"; + + if (mysql_query(ms.con, ZQUERY) == 0) + { + MYSQL_RES* pResult = mysql_store_result(ms.con); + + if (pResult) { - m_http = mxb::http::get_async(m_health_urls); + mxb_assert(mysql_field_count(ms.con) == 4); - switch (m_http.status()) + vector node_infos; + vector health_urls; /* Both are built locally and swapped into the members after the row loop. */ + + MYSQL_ROW row; + + while ((row = mysql_fetch_row(pResult)) != nullptr) { - case http::Async::PENDING: - initiate_delayed_http_check(); - break; + if (row[0] && row[1]) + { + int id = atoi(row[0]); + string ip = row[1]; + int mysql_port = row[2] ? atoi(row[2]) : DEFAULT_MYSQL_PORT; + int health_port = row[3] ? 
atoi(row[3]) : DEFAULT_HEALTH_PORT; - case http::Async::ERROR: - MXS_ERROR("Could not initiate health check."); - break; + node_infos.emplace_back(id, ip, mysql_port, health_port); - case http::Async::READY: - MXS_NOTICE("Health check available immediately."); - break; + string health_url = "http://" + ip + ":" + std::to_string(health_port); + health_urls.push_back(health_url); + } + else + { + MXS_WARNING("Either nodeid and/or iface_ip is missing, " + "ignoring node."); + } } + + mysql_free_result(pResult); + + m_node_infos.swap(node_infos); + m_health_urls.swap(health_urls); /* Both vectors were filled in the same loop iteration, so entry i of one belongs to entry i of the other. */ + + m_last_cluster_check = now(); } + else + { + MXS_WARNING("No result returned for '%s'.", ZQUERY); + } + } + else + { + MXS_ERROR("Could not execute '%s' on %s: %s", + ZQUERY, ms.server->address, mysql_error(ms.con)); + } +} + /* Reuses the remembered server when its connection is still OK; otherwise closes that connection and searches the monitored servers again. Without a remembered server it goes straight to the search. */ +void ClustrixMonitor::refresh_cluster_nodes() +{ + if (m_pMonitored_server) + { + mxs_connect_result_t rv = mon_ping_or_connect_to_db(m_monitor, m_pMonitored_server); + + if (mon_connection_is_ok(rv)) + { + fetch_cluster_nodes_from(*m_pMonitored_server); + } + else + { + mysql_close(m_pMonitored_server->con); + m_pMonitored_server->con = nullptr; + + fetch_cluster_nodes(); + } + } + else + { + fetch_cluster_nodes(); + } +} + /* Asserts that no request is pending, starts an asynchronous GET of all m_health_urls and, if the new request is pending, calls initiate_delayed_http_check(). */ +void ClustrixMonitor::make_health_check() +{ + mxb_assert(m_http.status() != http::Async::PENDING); + + m_http = mxb::http::get_async(m_health_urls); + + switch (m_http.status()) + { + case http::Async::PENDING: + initiate_delayed_http_check(); + break; + + case http::Async::ERROR: + MXS_ERROR("Could not initiate health check."); + break; + + case http::Async::READY: + MXS_NOTICE("Health check available immediately."); + break; } } @@ -126,26 +289,20 @@ bool ClustrixMonitor::check_http(Call::action_t action) { const vector& results = m_http.results(); - MXS_MONITORED_SERVER* pMonitored_server = m_monitor->monitored_servers; - - for (size_t i = 0; i < m_health_urls.size(); ++i) + for (size_t i = 0; i < results.size(); ++i) /* NOTE(review): result i is applied to m_node_infos[i] below; this presumes m_node_infos was not replaced by refresh_cluster_nodes() from tick() while the request was pending - confirm, since a shorter list would be indexed out of range. */ { - 
mxb_assert(pMonitored_server); - - const auto& url = m_health_urls[i]; const auto& result = results[i]; - MXS_INFO("%s: %s", url.c_str(), (result.code == 200) ? "OK" : result.body.c_str()); - - uint64_t bits = 0; + bool running = false; if (result.code == 200) { - bits |= SERVER_RUNNING; + running = true; } - monitor_set_pending_status(pMonitored_server, bits); - pMonitored_server = pMonitored_server->next; + auto& node_info = m_node_infos[i]; /* Only the stored node state is updated here; tick() transfers it onto the monitored servers. */ + + node_info.set_running(running); } } break; diff --git a/server/modules/monitor/clustrixmon/clustrixmonitor.hh b/server/modules/monitor/clustrixmon/clustrixmonitor.hh index 5db7499c9..8faa968c4 100644 --- a/server/modules/monitor/clustrixmon/clustrixmonitor.hh +++ b/server/modules/monitor/clustrixmon/clustrixmonitor.hh @@ -15,6 +15,7 @@ #include "clustrixmon.hh" #include #include +#include "clustrixnodeinfo.hh" class ClustrixMonitor : public maxscale::MonitorInstance { @@ -43,6 +44,8 @@ public: long m_cluster_monitor_interval; }; + ~ClustrixMonitor(); + static ClustrixMonitor* create(MXS_MONITOR* pMonitor); bool configure(const MXS_CONFIG_PARAMETER* pParams) override; @@ -51,15 +54,30 @@ private: ClustrixMonitor(MXS_MONITOR* pMonitor); void pre_loop() override; + void post_loop() override; void tick(); + void fetch_cluster_nodes(); + void fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms); + void refresh_cluster_nodes(); + + void make_health_check(); void initiate_delayed_http_check(); bool check_http(Call::action_t action); + static long now() /* Timestamp taken from mxb::WorkerLoad::get_time_ms(). */ + { + return mxb::WorkerLoad::get_time_ms(); + } + private: - Config m_config; - mxb::http::Async m_http; - std::vector m_health_urls; - uint32_t m_delayed_http_check_id; + Config m_config; + std::vector m_config_servers; /* NOTE(review): not referenced anywhere in the visible part of this change - confirm it is needed. */ + std::vector m_node_infos; /* One entry per row accepted from system.nodeinfo. */ + std::vector m_health_urls; /* Health check URL of m_node_infos[i] at index i. */ + mxb::http::Async m_http; + uint32_t m_delayed_http_check_id { 0 }; + long m_last_cluster_check { 0 }; /* now() value of the last fetch that returned a result set. */ + MXS_MONITORED_SERVER* m_pMonitored_server { nullptr }; /* Server last selected by fetch_cluster_nodes(). */ }; diff --git 
a/server/modules/monitor/clustrixmon/clustrixnodeinfo.hh b/server/modules/monitor/clustrixmon/clustrixnodeinfo.hh new file mode 100644 index 000000000..681f35ac2 --- /dev/null +++ b/server/modules/monitor/clustrixmon/clustrixnodeinfo.hh @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2018 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include "clustrixmon.hh" +#include +#include +#include + /* Holds the data of one node: id, IP address, MySQL port, health check port and a running flag that starts out as true. */ +class ClustrixNodeInfo +{ +public: + ClustrixNodeInfo(int id, + const std::string& ip, + int mysql_port, + int health_port) + : m_id(id) + , m_ip(ip) + , m_mysql_port(mysql_port) + , m_health_port(health_port) + { + } + + int id() const + { + return m_id; + } + + const std::string& ip() const + { + return m_ip; + } + + int mysql_port() const + { + return m_mysql_port; + } + + int health_port() const + { + return m_health_port; + } + + bool is_running() const + { + return m_is_running; + } + + void set_running(bool running) + { + m_is_running = running; + } + + std::string to_string() const /* Formats the node as {id, ip, mysql_port, health_port}; the running flag is not part of the output. */ + { + std::stringstream ss; + ss << "{" << m_id << ", " << m_ip << ", " << m_mysql_port << ", " << m_health_port << "}"; + return ss.str(); + } + + void print(std::ostream& o) const /* Writes to_string() to the given stream. */ + { + o << to_string(); + } + +private: + int m_id; + std::string m_ip; + int m_mysql_port; + int m_health_port; + bool m_is_running { true }; // Assume running, until proven otherwise. +}; + +inline std::ostream& operator << (std::ostream& out, const ClustrixNodeInfo& x) +{ + x.print(out); + return out; +}