MXS-1915 Move Monitors on top of mxs::Worker

Monitors are now workers, so the path for making all interaction
between MaxScale proper and the monitors message based is now
open.
This commit is contained in:
Johan Wikman
2018-06-21 15:11:09 +03:00
parent 8ea7d8898a
commit 1eddb29d91
3 changed files with 101 additions and 97 deletions

View File

@ -1927,6 +1927,20 @@ int main(int argc, char **argv)
MXS_NOTICE("Module directory: %s", get_libdir());
MXS_NOTICE("Service cache: %s", get_cachedir());
if (!MessageQueue::init())
{
MXS_ERROR("Failed to initialize message queue.");
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
if (!Worker::init())
{
MXS_ERROR("Failed to initialize workers.");
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
if (!config_load(cnf_file_path))
{
const char* fprerr =
@ -2013,20 +2027,6 @@ int main(int argc, char **argv)
goto return_main;
}
if (!MessageQueue::init())
{
MXS_ERROR("Failed to initialize message queue.");
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
if (!Worker::init())
{
MXS_ERROR("Failed to initialize workers.");
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
if (!RoutingWorker::init())
{
MXS_ERROR("Failed to initialize routing workers.");

View File

@ -2513,16 +2513,15 @@ MonitorInstance::MonitorInstance(MXS_MONITOR* pMonitor)
: m_monitor(pMonitor)
, m_master(NULL)
, m_state(MXS_MONITOR_STOPPED)
, m_thread(0)
, m_shutdown(0)
, m_checked(false)
, m_events(0)
, m_loop_called(0)
{
}
MonitorInstance::~MonitorInstance()
{
ss_dassert(!m_thread);
}
int32_t MonitorInstance::state() const
@ -2533,18 +2532,14 @@ int32_t MonitorInstance::state() const
void MonitorInstance::stop()
{
// This is always called in single-thread context.
ss_dassert(m_thread);
ss_dassert(m_state == MXS_MONITOR_RUNNING);
if (state() == MXS_MONITOR_RUNNING)
{
atomic_store_int32(&m_state, MXS_MONITOR_STOPPING);
atomic_store_int32(&m_shutdown, 1);
thread_wait(m_thread);
Worker::shutdown();
Worker::join();
atomic_store_int32(&m_state, MXS_MONITOR_STOPPED);
m_thread = 0;
m_shutdown = 0;
}
else
{
@ -2593,8 +2588,7 @@ bool MonitorInstance::start(const MXS_CONFIG_PARAMETER* pParams)
{
bool started = false;
ss_dassert(!m_shutdown);
ss_dassert(!m_thread);
ss_dassert(Worker::state() == Worker::STOPPED);
ss_dassert(m_state == MXS_MONITOR_STOPPED);
if (state() == MXS_MONITOR_STOPPED)
@ -2619,9 +2613,11 @@ bool MonitorInstance::start(const MXS_CONFIG_PARAMETER* pParams)
if (configure(pParams))
{
if (thread_start(&m_thread, &maxscale::MonitorInstance::main, this, 0) == NULL)
m_loop_called = 0;
if (!Worker::start())
{
MXS_ERROR("Failed to start monitor thread for monitor '%s'.", m_monitor->name);
MXS_ERROR("Failed to start worker for monitor '%s'.", m_monitor->name);
}
else
{
@ -2635,8 +2631,7 @@ bool MonitorInstance::start(const MXS_CONFIG_PARAMETER* pParams)
{
// Ok, so the initialization failed and the thread will exit.
// We need to wait on it so that the thread resources will not leak.
thread_wait(m_thread);
m_thread = 0;
Worker::join();
}
}
}
@ -2926,76 +2921,81 @@ void MonitorInstance::process_state_changes()
mon_process_state_changes(m_monitor, m_script.empty() ? NULL : m_script.c_str(), m_events);
}
void MonitorInstance::main()
bool MonitorInstance::pre_run()
{
load_server_journal(m_monitor, &m_master);
pre_loop();
while (!m_shutdown)
{
/* Measure the time of monitor loop execution. */
int64_t loop_start_ms = get_time_ms();
monitor_check_maintenance_requests(m_monitor);
tick();
atomic_add_uint64(&m_monitor->ticks, 1);
flush_server_status();
process_state_changes();
mon_hangup_failed_servers(m_monitor);
store_server_journal(m_monitor, m_master);
sleep_until_next_tick(loop_start_ms);
}
post_loop();
}
/**
* Sleep until the next monitor tick
*
* @param tick_start_ms When was the latest tick started
*/
void MonitorInstance::sleep_until_next_tick(int64_t tick_start_ms)
{
// Check how much the monitor should sleep to get one full monitor interval.
int64_t sleep_time_remaining = m_monitor->interval - (get_time_ms() - tick_start_ms);
// Sleep at least one base interval.
sleep_time_remaining = MXS_MAX(MXS_MON_BASE_INTERVAL_MS, sleep_time_remaining);
/* Sleep in small increments to react fast to changes. */
while (sleep_time_remaining > 0 && !should_shutdown() &&
atomic_load_int(&m_monitor->check_maintenance_flag) == MAINTENANCE_FLAG_NOCHECK)
{
int small_sleep_ms = (sleep_time_remaining >= MXS_MON_BASE_INTERVAL_MS) ?
MXS_MON_BASE_INTERVAL_MS : sleep_time_remaining;
thread_millisleep(small_sleep_ms);
sleep_time_remaining -= small_sleep_ms;
}
}
//static
void MonitorInstance::main(void* pArg)
{
MonitorInstance* pThis = static_cast<MonitorInstance*>(pArg);
bool rv = false;
if (mysql_thread_init() == 0)
{
atomic_store_int32(&pThis->m_state, MXS_MONITOR_RUNNING);
pThis->m_semaphore.post();
rv = true;
pThis->main();
atomic_store_int32(&m_state, MXS_MONITOR_RUNNING);
m_semaphore.post();
mysql_thread_end();
load_server_journal(m_monitor, &m_master);
pre_loop();
delayed_call(1, &MonitorInstance::call_run_one_tick, this);
}
else
{
MXS_ERROR("mysql_thread_init() failed for %s. The monitor cannot start.",
pThis->m_monitor->name);
pThis->m_semaphore.post();
m_monitor->name);
m_semaphore.post();
}
return rv;
}
void MonitorInstance::post_run()
{
post_loop();
mysql_thread_end();
}
bool MonitorInstance::call_run_one_tick(Worker::Call::action_t action)
{
if (action == Worker::Call::EXECUTE)
{
int64_t now = get_time_ms();
if ((now - m_loop_called > static_cast<int64_t>(m_monitor->interval)) ||
atomic_load_int(&m_monitor->check_maintenance_flag) == MAINTENANCE_FLAG_CHECK)
{
m_loop_called = now;
run_one_tick();
now = get_time_ms();
}
int64_t ms_to_next_call = m_monitor->interval - (now - m_loop_called);
// ms_to_next_call will be negative, if the run_one_tick() call took
// longer than one monitor interval.
int64_t delay = ((ms_to_next_call <= 0) || (ms_to_next_call >= MXS_MON_BASE_INTERVAL_MS)) ?
MXS_MON_BASE_INTERVAL_MS : ms_to_next_call;
delayed_call(delay, &MonitorInstance::call_run_one_tick, this);
}
return false;
}
void MonitorInstance::run_one_tick()
{
monitor_check_maintenance_requests(m_monitor);
tick();
atomic_add_uint64(&m_monitor->ticks, 1);
flush_server_status();
process_state_changes();
mon_hangup_failed_servers(m_monitor);
store_server_journal(m_monitor, m_master);
}
}