MXS-2271 Move global monitor list inside a local class

Cleans up list locking and handling.
Esa Korhonen 2019-01-22 15:24:09 +02:00
parent f6cec41dd8
commit 00594cc369
3 changed files with 131 additions and 147 deletions
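
The change replaces the file-scope pointers allMonitors and monLock, and the intrusive Monitor::next link, with a ThisUnit object in an anonymous namespace: the list becomes a std::vector<Monitor*> that owns its own mutex, and call sites traverse it through a callback instead of following next pointers under a global lock. A minimal sketch of how a typical traversal changes, condensed from the monitor_start_all() hunk further down (not a verbatim copy of either side of the diff):

    // Before: each caller locks the global mutex and walks the intrusive list.
    std::lock_guard<std::mutex> guard(monLock);
    for (Monitor* ptr = allMonitors; ptr; ptr = ptr->next)
    {
        if (ptr->active)
        {
            monitor_start(ptr, ptr->parameters);
        }
    }

    // After: the list and its lock are private to ThisUnit; callers pass a callback.
    // Returning true from the callback means "continue with the next monitor".
    this_unit.foreach_monitor([](Monitor* monitor) {
        if (monitor->active)
        {
            monitor_start(monitor, monitor->parameters);
        }
        return true;
    });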

View File

@ -200,13 +200,12 @@ public:
*/
virtual json_t* diagnostics_json() const = 0;
const char* const name; /**< Monitor instance name. TODO: change to string */
const std::string module_name; /**< Name of the monitor module */
Monitor* next; /**< Next monitor in the linked list */
const char* const name; /**< Monitor instance name. TODO: change to string */
const std::string module_name; /**< Name of the monitor module */
bool active = true; /**< True if monitor exists and has not been "destroyed". */
mutable std::mutex lock;
mutable std::mutex lock;
bool active = true; /**< True if monitor exists and has not been "destroyed". */
/** The state of the monitor. This should ONLY be written to by the admin thread. */
monitor_state_t state = MONITOR_STATE_STOPPED;

View File

@ -105,14 +105,6 @@ public:
* @attn Must only be called in single-thread context at system shutdown.
*/
static void destroy_all_monitors();
/**
* Free a monitor, first stop the monitor and then remove the monitor from
* the chain of monitors and free the memory.
*
* @param mon The monitor to free
*/
static void destroy_monitor(Monitor*);
};
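
With destroy_monitor() removed from the public interface, a monitor is taken out of service at runtime by monitor_deactivate(), which only flips the active flag under the list lock; the object stays in the global list (so monitor_repurpose_destroyed() can later reactivate it) and is deleted only at shutdown by destroy_all_monitors(). A condensed sketch of the deactivation path as it appears in the third file of this diff:

    // Deactivation marks the monitor as destroyed but does not free it.
    void monitor_deactivate(Monitor* monitor)
    {
        this_unit.run_behind_lock([monitor]() {
            monitor->active = false;
        });
    }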

View File

@ -81,8 +81,61 @@ const char CN_MONITOR_INTERVAL[] = "monitor_interval";
const char CN_SCRIPT[] = "script";
const char CN_SCRIPT_TIMEOUT[] = "script_timeout";
static Monitor* allMonitors = NULL;
static std::mutex monLock;
namespace
{
class ThisUnit
{
public:
/**
* Call a function on every monitor in the global monitor list.
*
* @param apply The function to apply. If the function returns false, iteration is discontinued.
*/
void foreach_monitor(std::function<bool(Monitor*)> apply)
{
Guard guard(m_all_monitors_lock);
for (Monitor* monitor : m_all_monitors)
{
if (!apply(monitor))
{
break;
}
}
}
/**
* Clear the internal list and return previous contents.
*
* @return Contents before clearing
*/
std::vector<Monitor*> clear()
{
Guard guard(m_all_monitors_lock);
return std::move(m_all_monitors);
}
void insert_front(Monitor* monitor)
{
Guard guard(m_all_monitors_lock);
m_all_monitors.insert(m_all_monitors.begin(), monitor);
}
void run_behind_lock(std::function<void(void)> apply)
{
Guard guard(m_all_monitors_lock);
apply();
}
private:
std::mutex m_all_monitors_lock; /**< Protects access to array */
std::vector<Monitor*> m_all_monitors; /**< Global list of monitors, in configuration file order */
};
ThisUnit this_unit;
}
static void monitor_server_free_all(MXS_MONITORED_SERVER* servers);
static void remove_server_journal(Monitor* monitor);
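
For lookups that want to stop as soon as a match is found, the callback's return value doubles as a continue flag: the lambda records the hit in a captured variable and returns false to end the iteration, which is the shape monitor_find() and monitor_server_in_use() take further down. A minimal sketch of the idiom (variable names are illustrative):

    // Find the first active monitor with a matching name; stop iterating once found.
    Monitor* found = nullptr;
    this_unit.foreach_monitor([&found, name](Monitor* monitor) {
        if (monitor->active && strcmp(monitor->name, name) == 0)
        {
            found = monitor;
        }
        return found == nullptr;    // false as soon as a match has been recorded
    });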
@ -116,9 +169,7 @@ Monitor* MonitorManager::create_monitor(const string& name, const string& module
if (mon->configure_base(params)) // TODO: Move derived class configure() here
{
std::lock_guard<std::mutex> guard(monLock);
mon->next = allMonitors;
allMonitors = mon;
this_unit.insert_front(mon);
}
else
{
@ -176,33 +227,6 @@ bool Monitor::configure_base(const MXS_CONFIG_PARAMETER* params)
return !error;
}
void MonitorManager::destroy_monitor(Monitor* mon)
{
Monitor* ptr;
std::unique_lock<std::mutex> guard(monLock);
if (allMonitors == mon)
{
allMonitors = mon->next;
}
else
{
ptr = allMonitors;
while (ptr->next && ptr->next != mon)
{
ptr = ptr->next;
}
if (ptr->next)
{
ptr->next = mon->next;
}
}
guard.unlock();
delete mon;
}
Monitor::~Monitor()
{
delete disk_space_threshold;
@ -213,14 +237,11 @@ Monitor::~Monitor()
void MonitorManager::destroy_all_monitors()
{
// monitor_destroy() grabs 'monLock', so it cannot be grabbed here
// without additional changes. But this function should only be
// called at system shutdown in single-thread context.
while (allMonitors)
auto monitors = this_unit.clear();
for (auto monitor : monitors)
{
Monitor* monitor = allMonitors;
destroy_monitor(monitor);
mxb_assert(monitor->state == MONITOR_STATE_STOPPED);
delete monitor;
}
}
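
destroy_all_monitors() now empties the list through ThisUnit::clear(), so the monitors are deleted without holding m_all_monitors_lock; the old code could not take monLock here at all because destroy_monitor() grabbed it. One detail worth noting: clear() returns std::move(m_all_monitors), and moving from a std::vector formally leaves it in a valid but unspecified state (in practice empty). If a guaranteed-empty post-condition matters, an explicit swap makes it unambiguous; a hedged alternative sketch of the same member function, using the Guard alias from the hunk above:

    std::vector<Monitor*> clear()
    {
        Guard guard(m_all_monitors_lock);
        std::vector<Monitor*> rval;
        rval.swap(m_all_monitors);    // m_all_monitors is now guaranteed to be empty
        return rval;
    }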
@ -261,18 +282,13 @@ void monitor_start(Monitor* monitor, const MXS_CONFIG_PARAMETER* params)
*/
void monitor_start_all()
{
Monitor* ptr;
std::lock_guard<std::mutex> guard(monLock);
ptr = allMonitors;
while (ptr)
{
if (ptr->active)
this_unit.foreach_monitor([](Monitor* monitor) {
if (monitor->active)
{
monitor_start(ptr, ptr->parameters);
monitor_start(monitor, monitor->parameters);
}
ptr = ptr->next;
}
return true;
});
}
/**
@ -307,8 +323,9 @@ void monitor_stop(Monitor* monitor)
void monitor_deactivate(Monitor* monitor)
{
std::lock_guard<std::mutex> guard(monLock);
monitor->active = false;
this_unit.run_behind_lock([monitor](){
monitor->active = false;
});
}
/**
@ -316,15 +333,13 @@ void monitor_deactivate(Monitor* monitor)
*/
void monitor_stop_all()
{
std::lock_guard<std::mutex> guard(monLock);
for (Monitor* monitor = allMonitors; monitor; monitor = monitor->next)
{
this_unit.foreach_monitor([](Monitor* monitor) {
if (monitor->active)
{
monitor_stop(monitor);
}
}
return true;
});
}
/**
@ -486,15 +501,13 @@ void monitor_add_user(Monitor* mon, const char* user, const char* passwd)
*/
void monitor_show_all(DCB* dcb)
{
std::lock_guard<std::mutex> guard(monLock);
for (Monitor* ptr = allMonitors; ptr; ptr = ptr->next)
{
if (ptr->active)
this_unit.foreach_monitor([dcb](Monitor* monitor) {
if (monitor->active)
{
monitor_show(dcb, ptr);
monitor_show(dcb, monitor);
}
}
return true;
});
}
/**
@ -543,15 +556,11 @@ void monitor_show(DCB* dcb, Monitor* monitor)
*/
void monitor_list(DCB* dcb)
{
Monitor* ptr;
std::lock_guard<std::mutex> guard(monLock);
ptr = allMonitors;
dcb_printf(dcb, "---------------------+---------------------\n");
dcb_printf(dcb, "%-20s | Status\n", "Monitor");
dcb_printf(dcb, "---------------------+---------------------\n");
while (ptr)
{
this_unit.foreach_monitor([dcb](Monitor* ptr){
if (ptr->active)
{
dcb_printf(dcb,
@ -560,8 +569,9 @@ void monitor_list(DCB* dcb)
ptr->state & MONITOR_STATE_RUNNING ?
"Running" : "Stopped");
}
ptr = ptr->next;
}
return true;
});
dcb_printf(dcb, "---------------------+---------------------\n");
}
@ -573,17 +583,15 @@ void monitor_list(DCB* dcb)
*/
Monitor* monitor_find(const char* name)
{
std::lock_guard<std::mutex> guard(monLock);
for (Monitor* ptr = allMonitors; ptr; ptr = ptr->next)
{
Monitor* rval = nullptr;
this_unit.foreach_monitor([&rval, name](Monitor* ptr) {
if (!strcmp(ptr->name, name) && ptr->active)
{
return ptr;
rval = ptr;
}
}
return nullptr;
return (rval == nullptr);
});
return rval;
}
/**
* Find a destroyed monitor by name
@ -594,18 +602,15 @@ Monitor* monitor_find(const char* name)
Monitor* monitor_repurpose_destroyed(const char* name, const char* module)
{
Monitor* rval = NULL;
std::lock_guard<std::mutex> guard(monLock);
for (Monitor* ptr = allMonitors; ptr; ptr = ptr->next)
{
if (strcmp(ptr->name, name) == 0 && (ptr->module_name == module))
this_unit.foreach_monitor([&rval, name, module](Monitor* monitor) {
if (strcmp(monitor->name, name) == 0 && (monitor->module_name == module))
{
mxb_assert(!ptr->active);
ptr->active = true;
rval = ptr;
mxb_assert(!monitor->active);
monitor->active = true;
rval = monitor;
}
}
return (rval == nullptr);
});
return rval;
}
@ -690,14 +695,11 @@ bool monitor_set_network_timeout(Monitor* mon, int type, int value, const char*
std::unique_ptr<ResultSet> monitor_get_list()
{
std::unique_ptr<ResultSet> set = ResultSet::create({"Monitor", "Status"});
std::lock_guard<std::mutex> guard(monLock);
for (Monitor* ptr = allMonitors; ptr; ptr = ptr->next)
{
this_unit.foreach_monitor([&set](Monitor* ptr) {
const char* state = ptr->state & MONITOR_STATE_RUNNING ? "Running" : "Stopped";
set->add_row({ptr->name, state});
}
return true;
});
return set;
}
@ -1484,24 +1486,22 @@ static void mon_log_state_change(MXS_MONITORED_SERVER* ptr)
Monitor* monitor_server_in_use(const SERVER* server)
{
Monitor* rval = NULL;
std::lock_guard<std::mutex> guard(monLock);
for (Monitor* mon = allMonitors; mon && !rval; mon = mon->next)
{
Guard guard(mon->lock);
if (mon->active)
Monitor* rval = nullptr;
this_unit.foreach_monitor([&rval, server](Monitor* monitor) {
Guard guard(monitor->lock);
if (monitor->active)
{
for (MXS_MONITORED_SERVER* db = mon->monitored_servers; db && !rval; db = db->next)
for (MXS_MONITORED_SERVER* db = monitor->monitored_servers; db; db = db->next)
{
if (db->server == server)
{
rval = mon;
rval = monitor;
break;
}
}
}
}
return (rval == nullptr);
});
return rval;
}
@ -1776,11 +1776,7 @@ json_t* monitor_to_json(const Monitor* monitor, const char* host)
json_t* monitor_list_to_json(const char* host)
{
json_t* rval = json_array();
std::unique_lock<std::mutex> guard(monLock);
for (Monitor* mon = allMonitors; mon; mon = mon->next)
{
this_unit.foreach_monitor([rval, host](Monitor* mon) {
if (mon->active)
{
json_t* json = monitor_json_data(mon, host);
@ -1790,9 +1786,8 @@ json_t* monitor_list_to_json(const char* host)
json_array_append_new(rval, json);
}
}
}
guard.unlock();
return true;
});
return mxs_json_resource(host, MXS_JSON_API_MONITORS, rval);
}
@ -1800,10 +1795,7 @@ json_t* monitor_list_to_json(const char* host)
json_t* monitor_relations_to_server(const SERVER* server, const char* host)
{
std::vector<std::string> names;
std::unique_lock<std::mutex> guard(monLock);
for (Monitor* mon = allMonitors; mon; mon = mon->next)
{
this_unit.foreach_monitor([&names, server](Monitor* mon) {
Guard guard(mon->lock);
if (mon->active)
{
@ -1816,12 +1808,10 @@ json_t* monitor_relations_to_server(const SERVER* server, const char* host)
}
}
}
}
guard.unlock();
return true;
});
json_t* rel = NULL;
if (!names.empty())
{
rel = mxs_json_relationship(host, MXS_JSON_API_MONITORS);
@ -2399,29 +2389,32 @@ bool monitor_set_disk_space_threshold(Monitor* monitor, const char* disk_space_t
void monitor_debug_wait()
{
using namespace std::chrono;
std::lock_guard<std::mutex> guard(monLock);
std::map<Monitor*, uint64_t> ticks;
// Get tick values for all monitors
for (Monitor* mon = allMonitors; mon; mon = mon->next)
{
this_unit.foreach_monitor([&ticks](Monitor* mon) {
ticks[mon] = mxb::atomic::load(&mon->ticks);
}
return true;
});
// Wait for all running monitors to advance at least one tick
for (Monitor* mon = allMonitors; mon; mon = mon->next)
{
// Wait for all running monitors to advance at least one tick.
this_unit.foreach_monitor([&ticks](Monitor* mon) {
if (mon->state == MONITOR_STATE_RUNNING)
{
auto start = steady_clock::now();
while (ticks[mon] == mxb::atomic::load(&mon->ticks)
&& steady_clock::now() - start < seconds(60))
// A monitor may have been added in between the two foreach-calls (not if config changes are
// serialized). Check if entry exists.
if (ticks.count(mon) > 0)
{
std::this_thread::sleep_for(milliseconds(100));
auto tick = ticks[mon];
while (mxb::atomic::load(&mon->ticks) == tick && (steady_clock::now() - start < seconds(60)))
{
std::this_thread::sleep_for(milliseconds(100));
}
}
}
}
return true;
});
}
namespace maxscale