Move execute_worker_task to MonitorInstance

The function is rather general and may of use to other monitor modules.
This commit is contained in:
Esa Korhonen 2018-07-23 14:35:42 +03:00
parent 27084f1368
commit b421e56d1c
4 changed files with 56 additions and 36 deletions

View File

@ -22,7 +22,7 @@ namespace maxscale
{
class MonitorInstance : public MXS_MONITOR_INSTANCE
, protected maxscale::Worker
, private maxscale::Worker
{
public:
MonitorInstance(const MonitorInstance&) = delete;
@ -112,6 +112,8 @@ public:
virtual json_t* diagnostics_json() const;
protected:
typedef std::function<void ()> GenericFunction;
MonitorInstance(MXS_MONITOR* pMonitor);
const std::string& script() const { return m_script; }
@ -208,6 +210,16 @@ protected:
*/
virtual void process_state_changes();
/**
* Execute a task in the worker thread of this monitor.
*
* @param func The task which should be executed, wrapped in a function object.
* @param mode Execution mode. If EXECUTE_AUTO, the function will only return once the task has
* been executed. Otherwise, the task will be queued and the function returns immediately.
* @return True, if task was sent to the worker
*/
bool execute_worker_task(GenericFunction func, execute_mode_t mode = Worker::EXECUTE_AUTO);
MXS_MONITOR* m_monitor; /**< The generic monitor structure. */
MXS_MONITORED_SERVER* m_master; /**< Master server */

View File

@ -3013,4 +3013,45 @@ void MonitorInstance::run_one_tick()
store_server_journal(m_monitor, m_master);
}
bool MonitorInstance::execute_worker_task(GenericFunction func, execute_mode_t mode)
{
/* The worker message system works on objects of class Task, each representing a different action.
* Let's use a function object inside a task to construct a generic action. */
class CustomTask : public maxscale::Worker::Task
{
public:
CustomTask(GenericFunction func)
: m_func(func)
{}
private:
GenericFunction m_func;
void execute(maxscale::Worker& worker)
{
m_func();
delete this; // Ok, since this object is not touched afterwards.
}
};
CustomTask* task = new (std::nothrow) CustomTask(func);
bool sent = false;
if (mode == Worker::EXECUTE_AUTO)
{
maxscale::Semaphore done(0);
/* Although the current method is being ran in the admin thread, 'post' sends the task to the
* worker thread of "this". */
sent = post(task, &done, mode);
if (sent)
{
done.wait();
}
}
else
{
sent = post(task, NULL, mode);
}
return sent;
}
}

View File

@ -234,6 +234,7 @@ void MariaDBMonitor::diagnostics(DCB *dcb) const
* thread and not the admin thread. Because the diagnostic must be printable even when the monitor is
* not running, the printing must be done outside the normal loop. */
ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN));
/* The 'dcb' is owned by the admin thread (the thread executing this function), and probably
* should not be written to by any other thread. To prevent this, have the monitor thread
* print the diagnostics to a string. */
@ -283,6 +284,7 @@ string MariaDBMonitor::diagnostics_to_string() const
json_t* MariaDBMonitor::diagnostics_json() const
{
ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN));
json_t* rval = NULL;
MariaDBMonitor* mutable_ptr = const_cast<MariaDBMonitor*>(this);
bool func_ran = mutable_ptr->execute_worker_task([this, &rval]
@ -335,39 +337,6 @@ json_t* MariaDBMonitor::diagnostics_to_json() const
return rval;
}
bool MariaDBMonitor::execute_worker_task(GenericFunction func)
{
ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN));
/* The worker message system works on objects of class Task, each representing a different action.
* Let's use a function object inside a task to construct a generic action. */
class CustomTask : public maxscale::Worker::Task
{
public:
CustomTask(GenericFunction func)
: m_func(func)
{}
private:
GenericFunction m_func;
void execute(maxscale::Worker& worker)
{
m_func();
}
};
CustomTask task(func);
maxscale::Semaphore done(0);
/* Although the current method is being ran in the admin thread, 'post' sends the task to the
* worker thread of "this". "task" is a stack parameter, so need to always wait for completion
* even if there were no results to process. */
bool sent = post(&task, &done, Worker::EXECUTE_AUTO);
if (sent)
{
done.wait();
}
return sent;
}
/**
* Connect to and query/update a server.
*

View File

@ -104,7 +104,6 @@ protected:
void process_state_changes();
private:
typedef std::function<void ()> GenericFunction;
struct CycleInfo
{
@ -192,7 +191,6 @@ private:
MariaDBServer* get_server_info(MXS_MONITORED_SERVER* db);
MariaDBServer* get_server(int64_t id);
bool execute_manual_command(GenericFunction command, json_t** error_out);
bool execute_worker_task(GenericFunction func);
std::string diagnostics_to_string() const;
json_t* diagnostics_to_json() const;