diff --git a/include/maxscale/monitor.hh b/include/maxscale/monitor.hh index ddc42ed72..9dfe71ee3 100644 --- a/include/maxscale/monitor.hh +++ b/include/maxscale/monitor.hh @@ -16,11 +16,13 @@ #include #include #include +#include namespace maxscale { class MonitorInstance : public MXS_MONITOR_INSTANCE + , private maxscale::Worker { public: MonitorInstance(const MonitorInstance&) = delete; @@ -210,17 +212,19 @@ protected: MXS_MONITORED_SERVER* m_master; /**< Master server */ private: - int32_t m_state; /**< The current state of the monitor. */ - THREAD m_thread; /**< The thread handle of the monitoring thread. */ - int32_t m_shutdown; /**< Non-zero if the monitor should shut down. */ - bool m_checked; /**< Whether server access has been checked. */ - std::string m_script; /**< Launchable script. */ - uint64_t m_events; /**< Enabled monitor events. */ - Semaphore m_semaphore; /**< Semaphore for synchronizing with monitor thread. */ + int32_t m_state; /**< The current state of the monitor. */ + int32_t m_shutdown; /**< Non-zero if the monitor should shut down. */ + bool m_checked; /**< Whether server access has been checked. */ + std::string m_script; /**< Launchable script. */ + uint64_t m_events; /**< Enabled monitor events. */ + Semaphore m_semaphore; /**< Semaphore for synchronizing with monitor thread. */ + int64_t m_loop_called; /**< When was the loop called the last time. */ - void main(); - static void main(void* pArg); - void sleep_until_next_tick(int64_t tick_start_ms); + bool pre_run() final; + void post_run() final; + + bool call_run_one_tick(Worker::Call::action_t action); + void run_one_tick(); }; class MonitorInstanceSimple : public MonitorInstance diff --git a/server/core/gateway.cc b/server/core/gateway.cc index 4266d5e80..44a5f3ddb 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -1927,6 +1927,20 @@ int main(int argc, char **argv) MXS_NOTICE("Module directory: %s", get_libdir()); MXS_NOTICE("Service cache: %s", get_cachedir()); + if (!MessageQueue::init()) + { + MXS_ERROR("Failed to initialize message queue."); + rc = MAXSCALE_INTERNALERROR; + goto return_main; + } + + if (!Worker::init()) + { + MXS_ERROR("Failed to initialize workers."); + rc = MAXSCALE_INTERNALERROR; + goto return_main; + } + if (!config_load(cnf_file_path)) { const char* fprerr = @@ -2013,20 +2027,6 @@ int main(int argc, char **argv) goto return_main; } - if (!MessageQueue::init()) - { - MXS_ERROR("Failed to initialize message queue."); - rc = MAXSCALE_INTERNALERROR; - goto return_main; - } - - if (!Worker::init()) - { - MXS_ERROR("Failed to initialize workers."); - rc = MAXSCALE_INTERNALERROR; - goto return_main; - } - if (!RoutingWorker::init()) { MXS_ERROR("Failed to initialize routing workers."); diff --git a/server/core/monitor.cc b/server/core/monitor.cc index cc48a7071..66eb3d8bd 100644 --- a/server/core/monitor.cc +++ b/server/core/monitor.cc @@ -2513,16 +2513,15 @@ MonitorInstance::MonitorInstance(MXS_MONITOR* pMonitor) : m_monitor(pMonitor) , m_master(NULL) , m_state(MXS_MONITOR_STOPPED) - , m_thread(0) , m_shutdown(0) , m_checked(false) , m_events(0) + , m_loop_called(0) { } MonitorInstance::~MonitorInstance() { - ss_dassert(!m_thread); } int32_t MonitorInstance::state() const @@ -2533,18 +2532,14 @@ int32_t MonitorInstance::state() const void MonitorInstance::stop() { // This is always called in single-thread context. - ss_dassert(m_thread); ss_dassert(m_state == MXS_MONITOR_RUNNING); if (state() == MXS_MONITOR_RUNNING) { atomic_store_int32(&m_state, MXS_MONITOR_STOPPING); - atomic_store_int32(&m_shutdown, 1); - thread_wait(m_thread); + Worker::shutdown(); + Worker::join(); atomic_store_int32(&m_state, MXS_MONITOR_STOPPED); - - m_thread = 0; - m_shutdown = 0; } else { @@ -2593,8 +2588,7 @@ bool MonitorInstance::start(const MXS_CONFIG_PARAMETER* pParams) { bool started = false; - ss_dassert(!m_shutdown); - ss_dassert(!m_thread); + ss_dassert(Worker::state() == Worker::STOPPED); ss_dassert(m_state == MXS_MONITOR_STOPPED); if (state() == MXS_MONITOR_STOPPED) @@ -2619,9 +2613,11 @@ bool MonitorInstance::start(const MXS_CONFIG_PARAMETER* pParams) if (configure(pParams)) { - if (thread_start(&m_thread, &maxscale::MonitorInstance::main, this, 0) == NULL) + m_loop_called = 0; + + if (!Worker::start()) { - MXS_ERROR("Failed to start monitor thread for monitor '%s'.", m_monitor->name); + MXS_ERROR("Failed to start worker for monitor '%s'.", m_monitor->name); } else { @@ -2635,8 +2631,7 @@ bool MonitorInstance::start(const MXS_CONFIG_PARAMETER* pParams) { // Ok, so the initialization failed and the thread will exit. // We need to wait on it so that the thread resources will not leak. - thread_wait(m_thread); - m_thread = 0; + Worker::join(); } } } @@ -2926,76 +2921,81 @@ void MonitorInstance::process_state_changes() mon_process_state_changes(m_monitor, m_script.empty() ? NULL : m_script.c_str(), m_events); } -void MonitorInstance::main() +bool MonitorInstance::pre_run() { - load_server_journal(m_monitor, &m_master); - - pre_loop(); - - while (!m_shutdown) - { - /* Measure the time of monitor loop execution. */ - int64_t loop_start_ms = get_time_ms(); - - monitor_check_maintenance_requests(m_monitor); - - tick(); - atomic_add_uint64(&m_monitor->ticks, 1); - - flush_server_status(); - - process_state_changes(); - - mon_hangup_failed_servers(m_monitor); - store_server_journal(m_monitor, m_master); - sleep_until_next_tick(loop_start_ms); - } - - post_loop(); -} - -/** - * Sleep until the next monitor tick - * - * @param tick_start_ms When was the latest tick started - */ -void MonitorInstance::sleep_until_next_tick(int64_t tick_start_ms) -{ - // Check how much the monitor should sleep to get one full monitor interval. - int64_t sleep_time_remaining = m_monitor->interval - (get_time_ms() - tick_start_ms); - // Sleep at least one base interval. - sleep_time_remaining = MXS_MAX(MXS_MON_BASE_INTERVAL_MS, sleep_time_remaining); - /* Sleep in small increments to react fast to changes. */ - while (sleep_time_remaining > 0 && !should_shutdown() && - atomic_load_int(&m_monitor->check_maintenance_flag) == MAINTENANCE_FLAG_NOCHECK) - { - int small_sleep_ms = (sleep_time_remaining >= MXS_MON_BASE_INTERVAL_MS) ? - MXS_MON_BASE_INTERVAL_MS : sleep_time_remaining; - thread_millisleep(small_sleep_ms); - sleep_time_remaining -= small_sleep_ms; - } -} - -//static -void MonitorInstance::main(void* pArg) -{ - MonitorInstance* pThis = static_cast(pArg); + bool rv = false; if (mysql_thread_init() == 0) { - atomic_store_int32(&pThis->m_state, MXS_MONITOR_RUNNING); - pThis->m_semaphore.post(); + rv = true; - pThis->main(); + atomic_store_int32(&m_state, MXS_MONITOR_RUNNING); + m_semaphore.post(); - mysql_thread_end(); + load_server_journal(m_monitor, &m_master); + + pre_loop(); + + delayed_call(1, &MonitorInstance::call_run_one_tick, this); } else { MXS_ERROR("mysql_thread_init() failed for %s. The monitor cannot start.", - pThis->m_monitor->name); - pThis->m_semaphore.post(); + m_monitor->name); + m_semaphore.post(); } + + return rv; +} + +void MonitorInstance::post_run() +{ + post_loop(); + + mysql_thread_end(); +} + +bool MonitorInstance::call_run_one_tick(Worker::Call::action_t action) +{ + if (action == Worker::Call::EXECUTE) + { + int64_t now = get_time_ms(); + + if ((now - m_loop_called > static_cast(m_monitor->interval)) || + atomic_load_int(&m_monitor->check_maintenance_flag) == MAINTENANCE_FLAG_CHECK) + { + m_loop_called = now; + + run_one_tick(); + + now = get_time_ms(); + } + + int64_t ms_to_next_call = m_monitor->interval - (now - m_loop_called); + // ms_to_next_call will be negative, if the run_one_tick() call took + // longer than one monitor interval. + int64_t delay = ((ms_to_next_call <= 0) || (ms_to_next_call >= MXS_MON_BASE_INTERVAL_MS)) ? + MXS_MON_BASE_INTERVAL_MS : ms_to_next_call; + + delayed_call(delay, &MonitorInstance::call_run_one_tick, this); + } + + return false; +} + +void MonitorInstance::run_one_tick() +{ + monitor_check_maintenance_requests(m_monitor); + + tick(); + atomic_add_uint64(&m_monitor->ticks, 1); + + flush_server_status(); + + process_state_changes(); + + mon_hangup_failed_servers(m_monitor); + store_server_journal(m_monitor, m_master); } }