MXS-2219 Use system.membership as primary table

From system.membership we can find out which servers exist in the
cluster while system.nodeinfo contains information about those
servers. If a node goes down, it will disappear from system.nodeinfo,
but not from system.membership. Consequently, we must start from
system.membership and then fetch more information from system.nodeinfo.

Incidentally, a query like

    SELECT ms.nid, ni.iface_ip
    FROM system.membership AS ms
        LEFT JOIN system.nodeinfo AS ni ON ms.nid=ni.nodeid;

should provide all information in one go, but it seems that such joins
are not supported on the system tables.
This commit is contained in:
Johan Wikman 2019-01-17 16:58:05 +02:00
parent f7c840df26
commit 6b556859ce
8 changed files with 564 additions and 163 deletions

View File

@ -1,4 +1,5 @@
# Build the clustrixmon shared library from the sources listed below.
add_library(clustrixmon SHARED
clustrix.cc
clustrixmon.cc
clustrixmonitor.cc
)

View File

@ -0,0 +1,88 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "clustrix.hh"
#include <maxbase/assert.h>
// File-local textual names for the Clustrix::Status and Clustrix::SubState
// values. They are used by the to_string() / *_from_string() conversion
// functions defined in this file.
namespace
{
const char CN_NORMAL[] = "normal";      // SubState::NORMAL
const char CN_QUORUM[] = "quorum";      // Status::QUORUM
const char CN_STATIC[] = "static";      // Status::STATIC
const char CN_UNKNOWN[] = "unknown";    // Status::UNKNOWN and SubState::UNKNOWN
}
/**
 * Convert a node status to its textual name.
 *
 * @param status  The status to convert.
 *
 * @return "quorum", "static" or "unknown". A value outside the enumeration
 *         trips a debug assertion and is reported as "unknown".
 */
std::string Clustrix::to_string(Clustrix::Status status)
{
    const char* zName = nullptr;

    switch (status)
    {
    case Status::QUORUM:
        zName = CN_QUORUM;
        break;

    case Status::STATIC:
        zName = CN_STATIC;
        break;

    case Status::UNKNOWN:
        zName = CN_UNKNOWN;
        break;
    }

    if (!zName)
    {
        // Not one of the enumerators; should never happen.
        mxb_assert(!true);
        zName = CN_UNKNOWN;
    }

    return zName;
}
/**
 * Convert the textual name of a node status to the corresponding enumerator.
 *
 * @param status  The textual status; compared exactly against the known names.
 *
 * @return Status::QUORUM or Status::STATIC for a recognised name. Any other
 *         string causes a warning to be logged and Status::UNKNOWN to be
 *         returned.
 */
Clustrix::Status Clustrix::status_from_string(const std::string& status)
{
    if (status == CN_QUORUM)
    {
        return Status::QUORUM;
    }

    if (status == CN_STATIC)
    {
        return Status::STATIC;
    }

    // Anything else is reported and mapped to UNKNOWN.
    MXB_WARNING("'%s' is an unknown status for a Clustrix node.", status.c_str());
    return Status::UNKNOWN;
}
/**
 * Convert a node sub-state to its textual name.
 *
 * @param substate  The sub-state to convert.
 *
 * @return "normal" or "unknown". A value outside the enumeration trips a
 *         debug assertion and is reported as "unknown".
 */
std::string Clustrix::to_string(Clustrix::SubState substate)
{
    const char* zName = nullptr;

    switch (substate)
    {
    case SubState::NORMAL:
        zName = CN_NORMAL;
        break;

    case SubState::UNKNOWN:
        zName = CN_UNKNOWN;
        break;
    }

    if (!zName)
    {
        // Not one of the enumerators; should never happen.
        mxb_assert(!true);
        zName = CN_UNKNOWN;
    }

    return zName;
}
/**
 * Convert the textual name of a node sub-state to the corresponding enumerator.
 *
 * @param substate  The textual sub-state; compared exactly against the known name.
 *
 * @return SubState::NORMAL for "normal". Any other string causes a warning
 *         to be logged and SubState::UNKNOWN to be returned.
 */
Clustrix::SubState Clustrix::substate_from_string(const std::string& substate)
{
    if (substate == CN_NORMAL)
    {
        return SubState::NORMAL;
    }

    // Anything else is reported and mapped to UNKNOWN.
    MXB_WARNING("'%s' is an unknown sub-state for a Clustrix node.", substate.c_str());
    return SubState::UNKNOWN;
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include "clustrixmon.hh"
#include <string>
/**
 * Enumerations describing the state of a Clustrix node, together with
 * helpers for converting them to and from their textual form.
 */
namespace Clustrix
{
// Status of a node. UNKNOWN is what status_from_string() yields for any
// text it does not recognise.
enum class Status
{
QUORUM,
STATIC,
UNKNOWN
};
// Converts a textual status to a Status; unrecognised text yields Status::UNKNOWN.
Status status_from_string(const std::string& status);
// Converts a Status to its textual name.
std::string to_string(Status status);
// Sub-state of a node. UNKNOWN is what substate_from_string() yields for any
// text it does not recognise.
enum class SubState
{
NORMAL,
UNKNOWN
};
// Converts a textual sub-state to a SubState; unrecognised text yields SubState::UNKNOWN.
SubState substate_from_string(const std::string& substate);
// Converts a SubState to its textual name.
std::string to_string(SubState sub_state);
}

View File

@ -0,0 +1,83 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include "clustrixmon.hh"
#include <iostream>
#include <sstream>
#include <string>
#include "clustrix.hh"
/**
 * Value object holding the membership data of one node: its id, status,
 * sub-state and instance number.
 *
 * All values are fixed at construction; the class exposes only const
 * accessors.
 */
class ClustrixMembership
{
public:
    /**
     * @param id        Id of the node.
     * @param status    Status of the node.
     * @param substate  Sub-state of the node.
     * @param instance  Instance number of the node.
     */
    ClustrixMembership(int id,
                       Clustrix::Status status,
                       Clustrix::SubState substate,
                       int instance)
        : m_id(id)
        , m_status(status)
        , m_substate(substate)
        , m_instance(instance)
    {
    }

    /** @return The id of the node. */
    int id() const
    {
        return m_id;
    }

    /** @return The status of the node. */
    Clustrix::Status status() const
    {
        return m_status;
    }

    /** @return The sub-state of the node. */
    Clustrix::SubState substate() const
    {
        return m_substate;
    }

    /** @return The instance number of the node. */
    int instance() const
    {
        return m_instance;
    }

    /**
     * @return The membership formatted as "{id, status, substate, instance}",
     *         with status and sub-state in their textual form.
     */
    std::string to_string() const
    {
        std::ostringstream os;

        os << "{" << m_id;
        os << ", " << Clustrix::to_string(m_status);
        os << ", " << Clustrix::to_string(m_substate);
        os << ", " << m_instance;
        os << "}";

        return os.str();
    }

    /**
     * Write the textual representation, as returned by to_string(), to a stream.
     *
     * @param o  The stream to write to.
     */
    void print(std::ostream& o) const
    {
        o << to_string();
    }

private:
    int                m_id;
    Clustrix::Status   m_status;
    Clustrix::SubState m_substate;
    int                m_instance;
};
inline std::ostream& operator << (std::ostream& out, const ClustrixMembership& x)
{
x.print(out);
return out;
}

View File

@ -45,7 +45,7 @@ ClustrixMonitor* ClustrixMonitor::create(MXS_MONITOR* pMonitor)
bool ClustrixMonitor::configure(const MXS_CONFIG_PARAMETER* pParams)
{
m_health_urls.clear();
m_node_infos.clear();
m_nodes.clear();
m_config.set_cluster_monitor_interval(config_get_integer(pParams, CLUSTER_MONITOR_INTERVAL_NAME));
m_config.set_health_check_threshold(config_get_integer(pParams, HEALTH_CHECK_THRESHOLD_NAME));
@ -97,7 +97,7 @@ void ClustrixMonitor::tick()
}
}
void ClustrixMonitor::fetch_cluster_nodes()
void ClustrixMonitor::update_cluster_nodes()
{
auto b = begin(*(m_monitor->monitored_servers));
auto e = end(*(m_monitor->monitored_servers));
@ -112,7 +112,7 @@ void ClustrixMonitor::fetch_cluster_nodes()
if (it != e)
{
MXS_MONITORED_SERVER& ms = *it;
fetch_cluster_nodes_from(ms);
update_cluster_nodes(ms);
m_pMonitored_server = &ms;
}
@ -122,10 +122,14 @@ void ClustrixMonitor::fetch_cluster_nodes()
}
}
void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms)
void ClustrixMonitor::update_cluster_nodes(MXS_MONITORED_SERVER& ms)
{
mxb_assert(ms.con);
map<int, ClustrixMembership> memberships;
check_cluster_membership(ms, &memberships);
const char ZQUERY[] = "SELECT nodeid, iface_ip, mysql_port, healthmon_port FROM system.nodeinfo";
if (mysql_query(ms.con, ZQUERY) == 0)
@ -136,14 +140,13 @@ void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms)
{
mxb_assert(mysql_field_count(ms.con) == 4);
MYSQL_ROW row;
set<int> nids;
for_each(m_node_infos.begin(), m_node_infos.end(),
[&nids](const pair<int, ClustrixNodeInfo>& element) {
for_each(m_nodes.begin(), m_nodes.end(),
[&nids](const pair<int, ClustrixNode>& element) {
nids.insert(element.first);
});
MYSQL_ROW row;
while ((row = mysql_fetch_row(pResult)) != nullptr)
{
if (row[0] && row[1])
@ -152,14 +155,39 @@ void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms)
string ip = row[1];
int mysql_port = row[2] ? atoi(row[2]) : DEFAULT_MYSQL_PORT;
int health_port = row[3] ? atoi(row[3]) : DEFAULT_HEALTH_PORT;
int health_check_threshold = m_config.health_check_threshold();
string name = "@Clustrix-Server-" + std::to_string(id);
auto it = m_node_infos.find(id);
auto nit = m_nodes.find(id);
auto mit = memberships.find(id);
if (it == m_node_infos.end())
if (nit != m_nodes.end())
{
// Existing node.
mxb_assert(SERVER::find_by_unique_name(name));
ClustrixNode& node = nit->second;
if (node.ip() != ip)
{
node.set_ip(ip);
}
if (node.mysql_port() != mysql_port)
{
node.set_mysql_port(mysql_port);
}
if (node.health_port() != health_port)
{
node.set_health_port(health_port);
}
nids.erase(id);
}
else if (mit != memberships.end())
{
// New node.
mxb_assert(!SERVER::find_by_unique_name(name));
if (runtime_create_server(name.c_str(),
@ -172,23 +200,28 @@ void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms)
SERVER* pServer = SERVER::find_by_unique_name(name);
mxb_assert(pServer);
ClustrixNodeInfo info(id, ip, mysql_port, health_port, health_check_threshold, pServer);
const ClustrixMembership& membership = mit->second;
int health_check_threshold = m_config.health_check_threshold();
m_node_infos.insert(make_pair(id, info));
ClustrixNode node(membership, ip, mysql_port, health_port, health_check_threshold, pServer);
m_nodes.insert(make_pair(id, node));
}
else
{
MXS_ERROR("Could not create server %s at %s:%d.",
name.c_str(), ip.c_str(), mysql_port);
}
memberships.erase(mit);
}
else
{
mxb_assert(SERVER::find_by_unique_name(name));
// Node found in system.node_info but not in system.membership
MXS_ERROR("Node %d at %s:%d,%d found in system.node_info "
"but not in system.membership.",
id, ip.c_str(), mysql_port, health_port);
auto it = nids.find(id);
mxb_assert(it != nids.end());
nids.erase(it);
}
}
else
@ -201,19 +234,18 @@ void ClustrixMonitor::fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms)
for_each(nids.begin(), nids.end(),
[this](int nid) {
auto it = m_node_infos.find(nid);
mxb_assert(it != m_node_infos.end());
auto it = m_nodes.find(nid);
mxb_assert(it != m_nodes.end());
ClustrixNodeInfo& info = it->second;
info.deactivate_server();
m_node_infos.erase(it);
ClustrixNode& node = it->second;
node.set_running(false, ClustrixNode::APPROACH_OVERRIDE);
});
vector<string> health_urls;
for_each(m_node_infos.begin(), m_node_infos.end(),
[&health_urls](const pair<int, ClustrixNodeInfo>& element) {
const ClustrixNodeInfo& info = element.second;
string url = "http://" + info.ip() + ":" + std::to_string(info.health_port());
for_each(m_nodes.begin(), m_nodes.end(),
[&health_urls](const pair<int, ClustrixNode>& element) {
const ClustrixNode& node = element.second;
string url = "http://" + node.ip() + ":" + std::to_string(node.health_port());
health_urls.push_back(url);
});
@ -242,22 +274,113 @@ void ClustrixMonitor::refresh_cluster_nodes()
if (mon_connection_is_ok(rv))
{
fetch_cluster_nodes_from(*m_pMonitored_server);
update_cluster_nodes(*m_pMonitored_server);
}
else
{
mysql_close(m_pMonitored_server->con);
m_pMonitored_server->con = nullptr;
fetch_cluster_nodes();
update_cluster_nodes();
}
}
else if (m_monitor->monitored_servers)
{
fetch_cluster_nodes();
update_cluster_nodes();
}
}
/**
 * Query system.membership via the connection of @c ms and reconcile the
 * result with the nodes currently known to the monitor.
 *
 * - A row whose nid matches a known node updates that node's status,
 *   sub-state and instance.
 * - A row whose nid is not known is stored in @c pMemberships.
 * - A known node for which no row was returned is deactivated and removed
 *   from the set of known nodes.
 *
 * @param ms            Monitored server whose connection is used; the
 *                      connection must be open.
 * @param pMemberships  Receives the memberships of nodes not yet known;
 *                      must not be null.
 *
 * @return True if the query was executed and a result set was obtained,
 *         false otherwise.
 */
bool ClustrixMonitor::check_cluster_membership(MXS_MONITORED_SERVER& ms,
                                               std::map<int, ClustrixMembership>* pMemberships)
{
    mxb_assert(ms.con);
    mxb_assert(pMemberships);

    const char ZQUERY[] = "SELECT nid, status, instance, substate FROM system.membership";

    if (mysql_query(ms.con, ZQUERY) != 0)
    {
        MXS_ERROR("Could not execute '%s' on %s: %s",
                  ZQUERY, ms.server->address, mysql_error(ms.con));
        return false;
    }

    MYSQL_RES* pResult = mysql_store_result(ms.con);

    if (!pResult)
    {
        MXS_WARNING("No result returned for '%s'.", ZQUERY);
        return false;
    }

    mxb_assert(mysql_field_count(ms.con) == 4);

    // Start with the ids of all known nodes; every id seen in the result
    // is removed, so what remains are nodes that are no longer members.
    set<int> missing;

    for (const auto& element : m_nodes)
    {
        missing.insert(element.first);
    }

    MYSQL_ROW row;

    while ((row = mysql_fetch_row(pResult)) != nullptr)
    {
        if (!row[0])
        {
            MXS_WARNING("No node id returned in row for '%s'.", ZQUERY);
            continue;
        }

        const int nid = atoi(row[0]);
        const string status = row[1] ? row[1] : "unknown";
        const int instance = row[2] ? atoi(row[2]) : -1;
        const string substate = row[3] ? row[3] : "unknown";

        auto known = m_nodes.find(nid);

        if (known == m_nodes.end())
        {
            // Not a known node; hand the membership over to the caller.
            ClustrixMembership membership(nid,
                                          Clustrix::status_from_string(status),
                                          Clustrix::substate_from_string(substate),
                                          instance);

            pMemberships->insert(make_pair(nid, membership));
        }
        else
        {
            // Known node; refresh its membership data.
            ClustrixNode& node = known->second;

            node.update(Clustrix::status_from_string(status),
                        Clustrix::substate_from_string(substate),
                        instance);

            missing.erase(node.id());
        }
    }

    mysql_free_result(pResult);

    // Deactivate all servers that are no longer members.
    for (int nid : missing)
    {
        auto gone = m_nodes.find(nid);
        mxb_assert(gone != m_nodes.end());

        gone->second.deactivate_server();
        m_nodes.erase(gone);
    }

    return true;
}
void ClustrixMonitor::update_server_statuses()
{
mxb_assert(m_monitor->monitored_servers);
@ -269,15 +392,15 @@ void ClustrixMonitor::update_server_statuses()
[this](MXS_MONITORED_SERVER& ms) {
monitor_stash_current_status(&ms);
auto it = find_if(m_node_infos.begin(), m_node_infos.end(),
[&ms](const std::pair<int,ClustrixNodeInfo>& element) -> bool {
const ClustrixNodeInfo& info = element.second;
auto it = find_if(m_nodes.begin(), m_nodes.end(),
[&ms](const std::pair<int,ClustrixNode>& element) -> bool {
const ClustrixNode& info = element.second;
return ms.server->address == info.ip();
});
if (it != m_node_infos.end())
if (it != m_nodes.end())
{
const ClustrixNodeInfo& info = it->second;
const ClustrixNode& info = it->second;
if (info.is_running())
{
@ -350,7 +473,7 @@ bool ClustrixMonitor::check_http(Call::action_t action)
{
const vector<http::Result>& results = m_http.results();
auto it = m_node_infos.begin();
auto it = m_nodes.begin();
for_each(results.begin(), results.end(),
[&it](const http::Result& result) {

View File

@ -16,7 +16,8 @@
#include <map>
#include <maxscale/monitor.hh>
#include <maxbase/http.hh>
#include "clustrixnodeinfo.hh"
#include "clustrixmembership.hh"
#include "clustrixnode.hh"
class ClustrixMonitor : public maxscale::MonitorInstance
{
@ -71,9 +72,10 @@ private:
void tick();
void fetch_cluster_nodes();
void fetch_cluster_nodes_from(MXS_MONITORED_SERVER& ms);
void update_cluster_nodes();
void update_cluster_nodes(MXS_MONITORED_SERVER& ms);
void refresh_cluster_nodes();
bool check_cluster_membership(MXS_MONITORED_SERVER& ms, std::map<int, ClustrixMembership>* pMemberships);
void update_server_statuses();
void make_health_check();
@ -86,12 +88,11 @@ private:
}
private:
Config m_config;
std::vector<std::string> m_config_servers;
std::map<int, ClustrixNodeInfo> m_node_infos;
std::vector<std::string> m_health_urls;
mxb::http::Async m_http;
uint32_t m_delayed_http_check_id { 0 };
long m_last_cluster_check { 0 };
MXS_MONITORED_SERVER* m_pMonitored_server { nullptr };
Config m_config;
std::map<int, ClustrixNode> m_nodes;
std::vector<std::string> m_health_urls;
mxb::http::Async m_http;
uint32_t m_delayed_http_check_id { 0 };
long m_last_cluster_check { 0 };
MXS_MONITORED_SERVER* m_pMonitored_server { nullptr };
};

View File

@ -0,0 +1,182 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include "clustrixmon.hh"
#include <iostream>
#include <sstream>
#include <string>
#include "clustrix.hh"
/**
 * A node of a Clustrix cluster: its membership data (id, status, sub-state,
 * instance), its address and ports, and the SERVER object that represents it.
 *
 * The node keeps a counter that is reset to the health check threshold
 * whenever the node is reported as running and lowered whenever it is
 * reported as not running; the node is considered running while the counter
 * is positive.
 */
class ClustrixNode
{
public:
    enum
    {
        DEFAULT_MYSQL_PORT  = 3306,
        DEFAULT_HEALTH_PORT = 3581,
    };

    /**
     * How set_running(false, ...) lowers the running counter.
     */
    enum approach_t
    {
        APPROACH_OVERRIDE,  // Drop the counter straight to 0.
        APPROACH_DEFAULT    // Decrement the counter by one.
    };

    /**
     * @param membership              Source of id, status, sub-state and instance.
     * @param ip                      Ip address of the node.
     * @param mysql_port              MySQL port of the node.
     * @param health_port             Health check port of the node.
     * @param health_check_threshold  Initial and reset value of the running counter.
     * @param pServer                 The SERVER associated with the node; it is
     *                                dereferenced unconditionally by several members.
     */
    ClustrixNode(const ClustrixMembership& membership,
                 const std::string& ip,
                 int mysql_port,
                 int health_port,
                 int health_check_threshold,
                 SERVER* pServer)
        : m_id(membership.id())
        , m_status(membership.status())
        , m_substate(membership.substate())
        , m_instance(membership.instance())
        , m_ip(ip)
        , m_mysql_port(mysql_port)
        , m_health_port(health_port)
        , m_health_check_threshold(health_check_threshold)
        , m_nRunning(m_health_check_threshold)
        , m_pServer(pServer)
    {
    }

    /** @return The id of the node. */
    int id() const
    {
        return m_id;
    }

    /** @return The status of the node. */
    Clustrix::Status status() const
    {
        return m_status;
    }

    /** @return The sub-state of the node. */
    Clustrix::SubState substate() const
    {
        return m_substate;
    }

    /** @return The instance number of the node. */
    int instance() const
    {
        return m_instance;
    }

    /** @return The ip address of the node. */
    const std::string& ip() const
    {
        return m_ip;
    }

    /**
     * Store a new ip address and pass it on to the associated SERVER.
     *
     * @param ip  The new ip address.
     */
    void set_ip(const std::string& ip)
    {
        m_ip = ip;
        m_pServer->server_update_address(ip);
    }

    /** @return The MySQL port of the node. */
    int mysql_port() const
    {
        return m_mysql_port;
    }

    /**
     * Store a new MySQL port and pass it on to the associated SERVER.
     *
     * @param port  The new port.
     */
    void set_mysql_port(int port)
    {
        m_mysql_port = port;
        m_pServer->update_port(port);
    }

    /** @return The health check port of the node. */
    int health_port() const
    {
        return m_health_port;
    }

    /**
     * Store a new health check port. Only the node itself is updated.
     *
     * @param port  The new port.
     */
    void set_health_port(int port)
    {
        m_health_port = port;
    }

    /** @return True while the running counter is positive. */
    bool is_running() const
    {
        return m_nRunning > 0;
    }

    /**
     * Report whether the node was found to be running.
     *
     * When @c running is true, the counter is reset to the health check
     * threshold and SERVER_RUNNING is set on the server. When it is false
     * and the counter is still positive, the counter is lowered according
     * to @c approach, and SERVER_RUNNING is cleared once it reaches 0.
     *
     * @param running   Whether the node is running.
     * @param approach  How the counter is lowered; ignored if @c running is true.
     */
    void set_running(bool running, approach_t approach = APPROACH_DEFAULT)
    {
        if (running)
        {
            m_nRunning = m_health_check_threshold;
            m_pServer->set_status(SERVER_RUNNING);
            return;
        }

        if (m_nRunning <= 0)
        {
            // Already regarded as not running; nothing to change.
            return;
        }

        m_nRunning = (approach == APPROACH_OVERRIDE) ? 0 : m_nRunning - 1;

        if (m_nRunning == 0)
        {
            m_pServer->clear_status(SERVER_RUNNING);
        }
    }

    /**
     * Replace the membership data of the node.
     *
     * @param status    The new status.
     * @param substate  The new sub-state.
     * @param instance  The new instance number.
     */
    void update(Clustrix::Status status, Clustrix::SubState substate, int instance)
    {
        m_status = status;
        m_substate = substate;
        m_instance = instance;
    }

    /** Mark the associated SERVER as no longer active. */
    void deactivate_server()
    {
        m_pServer->is_active = false;
    }

    /** @return The node formatted as "{id, ip, mysql_port, health_port}". */
    std::string to_string() const
    {
        std::ostringstream os;

        os << "{" << m_id;
        os << ", " << m_ip;
        os << ", " << m_mysql_port;
        os << ", " << m_health_port;
        os << "}";

        return os.str();
    }

    /**
     * Write the textual representation, as returned by to_string(), to a stream.
     *
     * @param o  The stream to write to.
     */
    void print(std::ostream& o) const
    {
        o << to_string();
    }

private:
    int                m_id;
    Clustrix::Status   m_status;
    Clustrix::SubState m_substate;
    int                m_instance;
    std::string        m_ip;
    int                m_mysql_port { DEFAULT_MYSQL_PORT };
    int                m_health_port { DEFAULT_HEALTH_PORT };
    int                m_health_check_threshold { DEFAULT_HEALTH_CHECK_THRESHOLD_VALUE };
    int                m_nRunning { 0 };    // Positive while the node is considered running.
    SERVER*            m_pServer { nullptr };
};
inline std::ostream& operator << (std::ostream& out, const ClustrixNode& x)
{
x.print(out);
return out;
}

View File

@ -1,117 +0,0 @@
/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include "clustrixmon.hh"
#include <iostream>
#include <sstream>
#include <string>
/**
 * Information about one node of a Clustrix cluster: its id, address and
 * ports, and the SERVER object that represents it.
 *
 * The object keeps a counter that is reset to the health check threshold
 * whenever the node is reported as running and decremented whenever it is
 * reported as not running; the node is considered running while the counter
 * is positive.
 */
class ClustrixNodeInfo
{
public:
    /**
     * @param id                      Id of the node.
     * @param ip                      Ip address of the node.
     * @param mysql_port              MySQL port of the node.
     * @param health_port             Health check port of the node.
     * @param health_check_threshold  Initial and reset value of the running counter.
     * @param pServer                 The SERVER associated with the node; it is
     *                                dereferenced unconditionally by several members.
     */
    ClustrixNodeInfo(int id,
                     const std::string& ip,
                     int mysql_port,
                     int health_port,
                     int health_check_threshold,
                     SERVER* pServer)
        : m_id(id)
        , m_ip(ip)
        , m_mysql_port(mysql_port)
        , m_health_port(health_port)
        , m_health_check_threshold(health_check_threshold)
        , m_nRunning(m_health_check_threshold)
        , m_pServer(pServer)
    {
    }

    /** @return The id of the node. */
    int id() const
    {
        return m_id;
    }

    /** @return The ip address of the node. */
    const std::string& ip() const
    {
        return m_ip;
    }

    /** @return The MySQL port of the node. */
    int mysql_port() const
    {
        return m_mysql_port;
    }

    /** @return The health check port of the node. */
    int health_port() const
    {
        return m_health_port;
    }

    /** @return True while the running counter is positive. */
    bool is_running() const
    {
        return m_nRunning > 0;
    }

    /**
     * Report whether the node was found to be running.
     *
     * When @c running is true, the counter is reset to the health check
     * threshold and SERVER_RUNNING is set on the server. When it is false
     * and the counter is still positive, the counter is decremented, and
     * SERVER_RUNNING is cleared once it reaches 0.
     *
     * @param running  Whether the node is running.
     */
    void set_running(bool running)
    {
        if (running)
        {
            m_nRunning = m_health_check_threshold;
            m_pServer->set_status(SERVER_RUNNING);
            return;
        }

        if (m_nRunning <= 0)
        {
            // Already regarded as not running; nothing to change.
            return;
        }

        --m_nRunning;

        if (m_nRunning == 0)
        {
            m_pServer->clear_status(SERVER_RUNNING);
        }
    }

    /** Mark the associated SERVER as no longer active. */
    void deactivate_server()
    {
        m_pServer->is_active = false;
    }

    /** @return The node formatted as "{id, ip, mysql_port, health_port}". */
    std::string to_string() const
    {
        std::ostringstream os;

        os << "{" << m_id;
        os << ", " << m_ip;
        os << ", " << m_mysql_port;
        os << ", " << m_health_port;
        os << "}";

        return os.str();
    }

    /**
     * Write the textual representation, as returned by to_string(), to a stream.
     *
     * @param o  The stream to write to.
     */
    void print(std::ostream& o) const
    {
        o << to_string();
    }

private:
    int         m_id;
    std::string m_ip;
    int         m_mysql_port;
    int         m_health_port;
    int         m_health_check_threshold;
    int         m_nRunning;     // Positive while the node is considered running.
    SERVER*     m_pServer;
};
inline std::ostream& operator << (std::ostream& out, const ClustrixNodeInfo& x)
{
x.print(out);
return out;
}