diff --git a/include/maxscale/monitor.hh b/include/maxscale/monitor.hh index ea35c12b0..4afa15933 100644 --- a/include/maxscale/monitor.hh +++ b/include/maxscale/monitor.hh @@ -22,7 +22,7 @@ namespace maxscale { class MonitorInstance : public MXS_MONITOR_INSTANCE - , private maxscale::Worker + , protected maxscale::Worker { public: MonitorInstance(const MonitorInstance&) = delete; @@ -112,8 +112,6 @@ public: virtual json_t* diagnostics_json() const; protected: - typedef std::function GenericFunction; - MonitorInstance(MXS_MONITOR* pMonitor); const std::string& script() const { return m_script; } @@ -210,16 +208,6 @@ protected: */ virtual void process_state_changes(); - /** - * Execute a task in the worker thread of this monitor. - * - * @param func The task which should be executed, wrapped in a function object. - * @param mode Execution mode. If EXECUTE_AUTO, the function will only return once the task has - * been executed. Otherwise, the task will be queued and the function returns immediately. - * @return True, if task was sent to the worker - */ - bool execute_worker_task(GenericFunction func, execute_mode_t mode = Worker::EXECUTE_AUTO); - MXS_MONITOR* m_monitor; /**< The generic monitor structure. */ MXS_MONITORED_SERVER* m_master; /**< Master server */ diff --git a/include/maxscale/worker.hh b/include/maxscale/worker.hh index b86a0070c..dace5ffa3 100644 --- a/include/maxscale/worker.hh +++ b/include/maxscale/worker.hh @@ -13,16 +13,20 @@ */ #include + +#include #include -#include #include +#include + #include #include #include #include -#include "messagequeue.hh" #include +#include "messagequeue.hh" + namespace maxscale { @@ -487,11 +491,12 @@ class Worker : public MXS_WORKER Worker& operator = (const Worker&) = delete; public: - typedef WORKER_STATISTICS STATISTICS; - typedef WorkerTask Task; - typedef WorkerDisposableTask DisposableTask; - typedef WorkerLoad Load; - typedef WorkerTimer Timer; + typedef WORKER_STATISTICS STATISTICS; + typedef WorkerTask Task; + typedef WorkerDisposableTask DisposableTask; + typedef WorkerLoad Load; + typedef WorkerTimer Timer; + typedef std::function GenericFunction; /** * A delegating timer that delegates the timer tick handling @@ -745,6 +750,29 @@ public: return post(std::auto_ptr(sTask.release()), mode); } + /** + * Execute a funcion in a worker + * + * @param func The function to call + * @param pSem If non-NULL, will be posted once the task's `execute` return. + * @param mode Execution mode + * + * @return True, if task was posted to the worker + */ + bool post(GenericFunction func, Semaphore* pSem, enum execute_mode_t mode); + + /** + * Execute function on worker + * + * This is a convenience wrapper of `post` with automatic waiting on the + * semaphore. + * + * @param func Function to execute + * + * @return True if function was executed on the worker + */ + bool execute(GenericFunction func); + /** * Post a message to a worker. * diff --git a/server/core/monitor.cc b/server/core/monitor.cc index 30731e224..c717128cd 100644 --- a/server/core/monitor.cc +++ b/server/core/monitor.cc @@ -2968,45 +2968,4 @@ void MonitorInstance::run_one_tick() store_server_journal(m_monitor, m_master); } -bool MonitorInstance::execute_worker_task(GenericFunction func, execute_mode_t mode) -{ - /* The worker message system works on objects of class Task, each representing a different action. - * Let's use a function object inside a task to construct a generic action. */ - class CustomTask : public maxscale::Worker::Task - { - public: - CustomTask(GenericFunction func) - : m_func(func) - {} - - private: - GenericFunction m_func; - void execute(maxscale::Worker& worker) - { - m_func(); - delete this; // Ok, since this object is not touched afterwards. - } - }; - - CustomTask* task = new (std::nothrow) CustomTask(func); - bool sent = false; - - if (mode == Worker::EXECUTE_AUTO) - { - maxscale::Semaphore done(0); - /* Although the current method is being ran in the admin thread, 'post' sends the task to the - * worker thread of "this". */ - sent = post(task, &done, mode); - if (sent) - { - done.wait(); - } - } - else - { - sent = post(task, NULL, mode); - } - return sent; -} - } diff --git a/server/core/worker.cc b/server/core/worker.cc index 61cb58201..9e8244a70 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -477,6 +477,51 @@ bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) return posted; } +bool Worker::post(GenericFunction func, Semaphore* pSem, execute_mode_t mode) +{ + + class CustomTask : public maxscale::WorkerTask + { + public: + + CustomTask(GenericFunction func) + : m_func(func) + { + } + + private: + GenericFunction m_func; + + void execute(maxscale::Worker& worker) + { + m_func(); + + // The task needs to delete itself only after the task has been executed + delete this; + } + }; + + bool rval = false; + CustomTask* task = new (std::nothrow) CustomTask(func); + + if (task) + { + if (!(rval = post(task, pSem, mode))) + { + // Posting the task failed, it needs to be deleted now + delete task; + } + } + + return rval; +} + +bool Worker::execute(GenericFunction func) +{ + Semaphore sem; + return post(func, &sem, EXECUTE_AUTO) && sem.wait(); +} + bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. diff --git a/server/modules/monitor/mariadbmon/mariadbmon.cc b/server/modules/monitor/mariadbmon/mariadbmon.cc index a9b1a3b11..6a4bed111 100644 --- a/server/modules/monitor/mariadbmon/mariadbmon.cc +++ b/server/modules/monitor/mariadbmon/mariadbmon.cc @@ -239,17 +239,19 @@ void MariaDBMonitor::diagnostics(DCB *dcb) const * should not be written to by any other thread. To prevent this, have the monitor thread * print the diagnostics to a string. */ string diag_str; - // 'execute_worker_task' is not a const method, although the task we are sending is. + + // 'execute' is not a const method, although the task we are sending is. MariaDBMonitor* mutable_ptr = const_cast(this); - bool func_ran = mutable_ptr->execute_worker_task([this, &diag_str] + auto func = [this, &diag_str] { diag_str = diagnostics_to_string(); - }); + }; - if (!func_ran) + if (!mutable_ptr->execute(func)) { diag_str = DIAG_ERROR; } + dcb_printf(dcb, "%s", diag_str.c_str()); } @@ -287,15 +289,16 @@ json_t* MariaDBMonitor::diagnostics_json() const ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN)); json_t* rval = NULL; MariaDBMonitor* mutable_ptr = const_cast(this); - bool func_ran = mutable_ptr->execute_worker_task([this, &rval] + auto func = [this, &rval] { rval = diagnostics_to_json(); - }); + }; - if (!func_ran) + if (!mutable_ptr->execute(func)) { rval = mxs_json_error_append(rval, "%s", DIAG_ERROR); } + return rval; }