MXS-2384 Update servers concurrently in MariaDB-Monitor
All servers are now updated in their own threads simultaneously. This should reduce the possibility of having significantly different gtid:s shown for different servers.
This commit is contained in:
@ -16,6 +16,7 @@
|
|||||||
*/
|
*/
|
||||||
#include "mariadbmon.hh"
|
#include "mariadbmon.hh"
|
||||||
|
|
||||||
|
#include <future>
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <maxbase/assert.h>
|
#include <maxbase/assert.h>
|
||||||
@ -83,15 +84,15 @@ void MariaDBMonitor::reset_server_info()
|
|||||||
for (auto mon_server : Monitor::m_servers)
|
for (auto mon_server : Monitor::m_servers)
|
||||||
{
|
{
|
||||||
m_servers.push_back(new MariaDBServer(mon_server, m_servers.size(),
|
m_servers.push_back(new MariaDBServer(mon_server, m_servers.size(),
|
||||||
m_assume_unique_hostnames, m_handle_event_scheduler));
|
m_assume_unique_hostnames, m_handle_event_scheduler));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MariaDBMonitor::clear_server_info()
|
void MariaDBMonitor::clear_server_info()
|
||||||
{
|
{
|
||||||
for (auto iter = m_servers.begin(); iter != m_servers.end(); iter++)
|
for (auto server : m_servers)
|
||||||
{
|
{
|
||||||
delete *iter;
|
delete server;
|
||||||
}
|
}
|
||||||
// All MariaDBServer*:s are now invalid, as well as any dependant data.
|
// All MariaDBServer*:s are now invalid, as well as any dependant data.
|
||||||
m_servers.clear();
|
m_servers.clear();
|
||||||
@ -329,73 +330,6 @@ json_t* MariaDBMonitor::to_json() const
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Connect to and query/update a server.
|
|
||||||
*
|
|
||||||
* @param server The server to update
|
|
||||||
*/
|
|
||||||
void MariaDBMonitor::update_server(MariaDBServer* server, bool time_to_update_disk_space)
|
|
||||||
{
|
|
||||||
MonitorServer* mon_srv = server->m_server_base;
|
|
||||||
mxs_connect_result_t conn_status = mon_srv->ping_or_connect(m_settings.conn_settings);
|
|
||||||
MYSQL* conn = mon_srv->con; // mon_ping_or_connect_to_db() may have reallocated the MYSQL struct.
|
|
||||||
|
|
||||||
if (connection_is_ok(conn_status))
|
|
||||||
{
|
|
||||||
server->set_status(SERVER_RUNNING);
|
|
||||||
if (conn_status == MONITOR_CONN_NEWCONN_OK)
|
|
||||||
{
|
|
||||||
// Is a new connection or a reconnection. Check server version.
|
|
||||||
server->update_server_version();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (server->m_capabilities.basic_support
|
|
||||||
|| server->m_srv_type == MariaDBServer::server_type::BINLOG_ROUTER)
|
|
||||||
{
|
|
||||||
// Check permissions if permissions failed last time or if this is a new connection.
|
|
||||||
if (server->had_status(SERVER_AUTH_ERROR) || conn_status == MONITOR_CONN_NEWCONN_OK)
|
|
||||||
{
|
|
||||||
server->check_permissions();
|
|
||||||
}
|
|
||||||
|
|
||||||
// If permissions are ok, continue.
|
|
||||||
if (!server->has_status(SERVER_AUTH_ERROR))
|
|
||||||
{
|
|
||||||
if (time_to_update_disk_space && mon_srv->can_update_disk_space_status())
|
|
||||||
{
|
|
||||||
mon_srv->update_disk_space_status();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query MariaDBServer specific data
|
|
||||||
server->monitor_server();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* The current server is not running. Clear all but the stale master bit as it is used to detect
|
|
||||||
* masters that went down but came up. */
|
|
||||||
server->clear_status(~SERVER_WAS_MASTER);
|
|
||||||
auto conn_errno = mysql_errno(conn);
|
|
||||||
if (conn_errno == ER_ACCESS_DENIED_ERROR || conn_errno == ER_ACCESS_DENIED_NO_PASSWORD_ERROR)
|
|
||||||
{
|
|
||||||
server->set_status(SERVER_AUTH_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Log connect failure only once, that is, if server was RUNNING or MAINTENANCE during last
|
|
||||||
* iteration. */
|
|
||||||
if (server->had_status(SERVER_RUNNING) || server->had_status(SERVER_MAINT))
|
|
||||||
{
|
|
||||||
mon_srv->log_connect_error(conn_status);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Increase or reset the error count of the server. */
|
|
||||||
bool is_running = server->is_running();
|
|
||||||
bool in_maintenance = server->is_in_maintenance();
|
|
||||||
mon_srv->mon_err_count = (is_running || in_maintenance) ? 0 : mon_srv->mon_err_count + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MariaDBMonitor::pre_loop()
|
void MariaDBMonitor::pre_loop()
|
||||||
{
|
{
|
||||||
// MonitorInstance reads the journal and has the last known master in its m_master member variable.
|
// MonitorInstance reads the journal and has the last known master in its m_master member variable.
|
||||||
@ -434,11 +368,27 @@ void MariaDBMonitor::tick()
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool should_update_disk_space = check_disk_space_this_tick();
|
bool should_update_disk_space = check_disk_space_this_tick();
|
||||||
|
const auto& conn_settings = m_settings.conn_settings;
|
||||||
|
|
||||||
|
auto update_task = [should_update_disk_space, conn_settings](MariaDBServer* server) {
|
||||||
|
server->update_server(should_update_disk_space, conn_settings);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Asynchronously query all servers for their status.
|
||||||
|
std::vector<std::future<void>> futures;
|
||||||
|
futures.reserve(m_servers.size());
|
||||||
|
for (auto server : m_servers)
|
||||||
|
{
|
||||||
|
futures.emplace_back(std::async(std::launch::async, update_task, server));
|
||||||
|
}
|
||||||
|
// Wait for all updates to complete.
|
||||||
|
for (auto& f : futures)
|
||||||
|
{
|
||||||
|
f.get();
|
||||||
|
}
|
||||||
|
|
||||||
// Query all servers for their status.
|
|
||||||
for (MariaDBServer* server : m_servers)
|
for (MariaDBServer* server : m_servers)
|
||||||
{
|
{
|
||||||
update_server(server, should_update_disk_space);
|
|
||||||
if (server->m_topology_changed)
|
if (server->m_topology_changed)
|
||||||
{
|
{
|
||||||
m_cluster_topology_changed = true;
|
m_cluster_topology_changed = true;
|
||||||
|
|||||||
@ -107,8 +107,8 @@ public:
|
|||||||
bool run_manual_reset_replication(SERVER* master_server, json_t** error_out);
|
bool run_manual_reset_replication(SERVER* master_server, json_t** error_out);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void pre_loop();
|
void pre_loop() override;
|
||||||
void tick();
|
void tick() override;
|
||||||
void process_state_changes() override;
|
void process_state_changes() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -255,7 +255,6 @@ private:
|
|||||||
MariaDBServer* get_server(SERVER* server);
|
MariaDBServer* get_server(SERVER* server);
|
||||||
|
|
||||||
// Cluster discovery and status assignment methods, top levels
|
// Cluster discovery and status assignment methods, top levels
|
||||||
void update_server(MariaDBServer* server, bool time_to_update_disk_space);
|
|
||||||
void update_topology();
|
void update_topology();
|
||||||
void build_replication_graph();
|
void build_replication_graph();
|
||||||
void assign_new_master(MariaDBServer* new_master);
|
void assign_new_master(MariaDBServer* new_master);
|
||||||
|
|||||||
@ -2177,4 +2177,73 @@ bool MariaDBServer::update_enabled_events()
|
|||||||
|
|
||||||
m_enabled_events = std::move(full_names);
|
m_enabled_events = std::move(full_names);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to and query/update a server.
|
||||||
|
*
|
||||||
|
* @param server The server to update
|
||||||
|
*/
|
||||||
|
void MariaDBServer::update_server(bool time_to_update_disk_space,
|
||||||
|
const MonitorServer::ConnectionSettings& conn_settings)
|
||||||
|
{
|
||||||
|
auto server = this;
|
||||||
|
MonitorServer* mon_srv = server->m_server_base;
|
||||||
|
mxs_connect_result_t conn_status = mon_srv->ping_or_connect(conn_settings);
|
||||||
|
MYSQL* conn = mon_srv->con; // mon_ping_or_connect_to_db() may have reallocated the MYSQL struct.
|
||||||
|
|
||||||
|
if (mxs::Monitor::connection_is_ok(conn_status))
|
||||||
|
{
|
||||||
|
server->set_status(SERVER_RUNNING);
|
||||||
|
if (conn_status == MONITOR_CONN_NEWCONN_OK)
|
||||||
|
{
|
||||||
|
// Is a new connection or a reconnection. Check server version.
|
||||||
|
server->update_server_version();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (server->m_capabilities.basic_support
|
||||||
|
|| server->m_srv_type == MariaDBServer::server_type::BINLOG_ROUTER)
|
||||||
|
{
|
||||||
|
// Check permissions if permissions failed last time or if this is a new connection.
|
||||||
|
if (server->had_status(SERVER_AUTH_ERROR) || conn_status == MONITOR_CONN_NEWCONN_OK)
|
||||||
|
{
|
||||||
|
server->check_permissions();
|
||||||
|
}
|
||||||
|
|
||||||
|
// If permissions are ok, continue.
|
||||||
|
if (!server->has_status(SERVER_AUTH_ERROR))
|
||||||
|
{
|
||||||
|
if (time_to_update_disk_space && mon_srv->can_update_disk_space_status())
|
||||||
|
{
|
||||||
|
mon_srv->update_disk_space_status();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query MariaDBServer specific data
|
||||||
|
server->monitor_server();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* The current server is not running. Clear all but the stale master bit as it is used to detect
|
||||||
|
* masters that went down but came up. */
|
||||||
|
server->clear_status(~SERVER_WAS_MASTER);
|
||||||
|
auto conn_errno = mysql_errno(conn);
|
||||||
|
if (conn_errno == ER_ACCESS_DENIED_ERROR || conn_errno == ER_ACCESS_DENIED_NO_PASSWORD_ERROR)
|
||||||
|
{
|
||||||
|
server->set_status(SERVER_AUTH_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Log connect failure only once, that is, if server was RUNNING or MAINTENANCE during last
|
||||||
|
* iteration. */
|
||||||
|
if (server->had_status(SERVER_RUNNING) || server->had_status(SERVER_MAINT))
|
||||||
|
{
|
||||||
|
mon_srv->log_connect_error(conn_status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Increase or reset the error count of the server. */
|
||||||
|
bool is_running = server->is_running();
|
||||||
|
bool in_maintenance = server->is_in_maintenance();
|
||||||
|
mon_srv->mon_err_count = (is_running || in_maintenance) ? 0 : mon_srv->mon_err_count + 1;
|
||||||
|
}
|
||||||
|
|||||||
@ -116,7 +116,6 @@ public:
|
|||||||
bool log_slave_updates = false; /* Does the slave write replicated events to binlog? */
|
bool log_slave_updates = false; /* Does the slave write replicated events to binlog? */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/* Monitored server base class/struct. MariaDBServer does not own the struct, it is not freed
|
/* Monitored server base class/struct. MariaDBServer does not own the struct, it is not freed
|
||||||
* (or connection closed) when a MariaDBServer is destroyed. */
|
* (or connection closed) when a MariaDBServer is destroyed. */
|
||||||
mxs::MonitorServer* m_server_base = NULL;
|
mxs::MonitorServer* m_server_base = NULL;
|
||||||
@ -168,6 +167,9 @@ public:
|
|||||||
*/
|
*/
|
||||||
std::string diagnostics() const;
|
std::string diagnostics() const;
|
||||||
|
|
||||||
|
void update_server(bool time_to_update_disk_space,
|
||||||
|
const mxs::MonitorServer::ConnectionSettings& conn_settings);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Query this server.
|
* Query this server.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user