Move execute_worker_task into mxs::Worker

The function has use outside of the monitors as it makes execution of
worker tasks much more convenient. Currently, this change only moves the
code and takes it into use: there should be no functional changes.
This commit is contained in:
Markus Mäkelä
2018-08-02 08:21:10 +03:00
parent 107395f608
commit d412b8d729
5 changed files with 91 additions and 68 deletions

View File

@ -22,7 +22,7 @@ namespace maxscale
{ {
class MonitorInstance : public MXS_MONITOR_INSTANCE class MonitorInstance : public MXS_MONITOR_INSTANCE
, private maxscale::Worker , protected maxscale::Worker
{ {
public: public:
MonitorInstance(const MonitorInstance&) = delete; MonitorInstance(const MonitorInstance&) = delete;
@ -112,8 +112,6 @@ public:
virtual json_t* diagnostics_json() const; virtual json_t* diagnostics_json() const;
protected: protected:
typedef std::function<void ()> GenericFunction;
MonitorInstance(MXS_MONITOR* pMonitor); MonitorInstance(MXS_MONITOR* pMonitor);
const std::string& script() const { return m_script; } const std::string& script() const { return m_script; }
@ -210,16 +208,6 @@ protected:
*/ */
virtual void process_state_changes(); virtual void process_state_changes();
/**
* Execute a task in the worker thread of this monitor.
*
* @param func The task which should be executed, wrapped in a function object.
* @param mode Execution mode. If EXECUTE_AUTO, the function will only return once the task has
* been executed. Otherwise, the task will be queued and the function returns immediately.
* @return True, if task was sent to the worker
*/
bool execute_worker_task(GenericFunction func, execute_mode_t mode = Worker::EXECUTE_AUTO);
MXS_MONITOR* m_monitor; /**< The generic monitor structure. */ MXS_MONITOR* m_monitor; /**< The generic monitor structure. */
MXS_MONITORED_SERVER* m_master; /**< Master server */ MXS_MONITORED_SERVER* m_master; /**< Master server */

View File

@ -13,16 +13,20 @@
*/ */
#include <maxscale/cppdefs.hh> #include <maxscale/cppdefs.hh>
#include <functional>
#include <map> #include <map>
#include <unordered_set>
#include <memory> #include <memory>
#include <unordered_set>
#include <maxscale/platform.h> #include <maxscale/platform.h>
#include <maxscale/session.h> #include <maxscale/session.h>
#include <maxscale/utils.hh> #include <maxscale/utils.hh>
#include <maxscale/worker.h> #include <maxscale/worker.h>
#include "messagequeue.hh"
#include <maxscale/workertask.hh> #include <maxscale/workertask.hh>
#include "messagequeue.hh"
namespace maxscale namespace maxscale
{ {
@ -492,6 +496,7 @@ public:
typedef WorkerDisposableTask DisposableTask; typedef WorkerDisposableTask DisposableTask;
typedef WorkerLoad Load; typedef WorkerLoad Load;
typedef WorkerTimer Timer; typedef WorkerTimer Timer;
typedef std::function<void ()> GenericFunction;
/** /**
* A delegating timer that delegates the timer tick handling * A delegating timer that delegates the timer tick handling
@ -745,6 +750,29 @@ public:
return post(std::auto_ptr<DisposableTask>(sTask.release()), mode); return post(std::auto_ptr<DisposableTask>(sTask.release()), mode);
} }
/**
* Execute a funcion in a worker
*
* @param func The function to call
* @param pSem If non-NULL, will be posted once the task's `execute` return.
* @param mode Execution mode
*
* @return True, if task was posted to the worker
*/
bool post(GenericFunction func, Semaphore* pSem, enum execute_mode_t mode);
/**
* Execute function on worker
*
* This is a convenience wrapper of `post` with automatic waiting on the
* semaphore.
*
* @param func Function to execute
*
* @return True if function was executed on the worker
*/
bool execute(GenericFunction func);
/** /**
* Post a message to a worker. * Post a message to a worker.
* *

View File

@ -2968,45 +2968,4 @@ void MonitorInstance::run_one_tick()
store_server_journal(m_monitor, m_master); store_server_journal(m_monitor, m_master);
} }
bool MonitorInstance::execute_worker_task(GenericFunction func, execute_mode_t mode)
{
/* The worker message system works on objects of class Task, each representing a different action.
* Let's use a function object inside a task to construct a generic action. */
class CustomTask : public maxscale::Worker::Task
{
public:
CustomTask(GenericFunction func)
: m_func(func)
{}
private:
GenericFunction m_func;
void execute(maxscale::Worker& worker)
{
m_func();
delete this; // Ok, since this object is not touched afterwards.
}
};
CustomTask* task = new (std::nothrow) CustomTask(func);
bool sent = false;
if (mode == Worker::EXECUTE_AUTO)
{
maxscale::Semaphore done(0);
/* Although the current method is being ran in the admin thread, 'post' sends the task to the
* worker thread of "this". */
sent = post(task, &done, mode);
if (sent)
{
done.wait();
}
}
else
{
sent = post(task, NULL, mode);
}
return sent;
}
} }

View File

@ -477,6 +477,51 @@ bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode)
return posted; return posted;
} }
bool Worker::post(GenericFunction func, Semaphore* pSem, execute_mode_t mode)
{
class CustomTask : public maxscale::WorkerTask
{
public:
CustomTask(GenericFunction func)
: m_func(func)
{
}
private:
GenericFunction m_func;
void execute(maxscale::Worker& worker)
{
m_func();
// The task needs to delete itself only after the task has been executed
delete this;
}
};
bool rval = false;
CustomTask* task = new (std::nothrow) CustomTask(func);
if (task)
{
if (!(rval = post(task, pSem, mode)))
{
// Posting the task failed, it needs to be deleted now
delete task;
}
}
return rval;
}
bool Worker::execute(GenericFunction func)
{
Semaphore sem;
return post(func, &sem, EXECUTE_AUTO) && sem.wait();
}
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.

View File

@ -239,17 +239,19 @@ void MariaDBMonitor::diagnostics(DCB *dcb) const
* should not be written to by any other thread. To prevent this, have the monitor thread * should not be written to by any other thread. To prevent this, have the monitor thread
* print the diagnostics to a string. */ * print the diagnostics to a string. */
string diag_str; string diag_str;
// 'execute_worker_task' is not a const method, although the task we are sending is.
// 'execute' is not a const method, although the task we are sending is.
MariaDBMonitor* mutable_ptr = const_cast<MariaDBMonitor*>(this); MariaDBMonitor* mutable_ptr = const_cast<MariaDBMonitor*>(this);
bool func_ran = mutable_ptr->execute_worker_task([this, &diag_str] auto func = [this, &diag_str]
{ {
diag_str = diagnostics_to_string(); diag_str = diagnostics_to_string();
}); };
if (!func_ran) if (!mutable_ptr->execute(func))
{ {
diag_str = DIAG_ERROR; diag_str = DIAG_ERROR;
} }
dcb_printf(dcb, "%s", diag_str.c_str()); dcb_printf(dcb, "%s", diag_str.c_str());
} }
@ -287,15 +289,16 @@ json_t* MariaDBMonitor::diagnostics_json() const
ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN)); ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN));
json_t* rval = NULL; json_t* rval = NULL;
MariaDBMonitor* mutable_ptr = const_cast<MariaDBMonitor*>(this); MariaDBMonitor* mutable_ptr = const_cast<MariaDBMonitor*>(this);
bool func_ran = mutable_ptr->execute_worker_task([this, &rval] auto func = [this, &rval]
{ {
rval = diagnostics_to_json(); rval = diagnostics_to_json();
}); };
if (!func_ran) if (!mutable_ptr->execute(func))
{ {
rval = mxs_json_error_append(rval, "%s", DIAG_ERROR); rval = mxs_json_error_append(rval, "%s", DIAG_ERROR);
} }
return rval; return rval;
} }