Allow execution of tasks from within worker threads

It is now possible to define whether tasks are executed immediately or put
on the event queue of the worker thread. If task execution is in automatic
mode and the current executing thread is a worker thread, the
Task->execute method is called immediately.

This allows tasks to be posted from within worker threads. This is
intended to be used when purging of stale persistent connections and
printing diagnostic output via MaxAdmin. All of these actions are done
from within a worker thread.
This commit is contained in:
Markus Mäkelä 2017-04-28 23:29:20 +03:00
parent 6f24c04a4f
commit 4c6e0c75a5
2 changed files with 42 additions and 7 deletions

View File

@ -80,6 +80,12 @@ public:
ZPROCESSING
};
enum execute_mode_t
{
EXECUTE_AUTO, /**< Execute tasks immediately */
EXECUTE_QUEUED /**< Only queue tasks for execution */
};
/**
* Initialize the worker mechanism.
*
@ -258,6 +264,7 @@ public:
*
* @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once the task's `execute` return.
* @param mode Execution mode
*
* @return True if the task could be posted (i.e. not executed), false otherwise.
*
@ -283,6 +290,7 @@ public:
* Posts a task to a worker for execution.
*
* @param pTask The task to be executed.
* @param mode Execution mode
*
* @return True if the task could be posted (i.e. not executed), false otherwise.
*

View File

@ -554,7 +554,24 @@ bool Worker::post(Task* pTask, Semaphore* pSem)
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
return post_message(MXS_WORKER_MSG_TASK, arg1, arg2);
if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this)
{
pTask->execute(*this);
if (pSem)
{
pSem->post();
}
}
else
{
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
rval = post_message(MXS_WORKER_MSG_TASK, arg1, arg2);
}
return rval;
}
bool Worker::post(std::auto_ptr<DisposableTask> sTask)
@ -566,16 +583,26 @@ bool Worker::post(std::auto_ptr<DisposableTask> sTask)
// private
bool Worker::post_disposable(DisposableTask* pTask)
{
bool posted = true;
pTask->inc_ref();
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
bool posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0);
if (!posted)
if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this)
{
pTask->execute(*this);
pTask->dec_ref();
}
else
{
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0);
if (!posted)
{
pTask->dec_ref();
}
}
return posted;
}
@ -859,7 +886,7 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
case MXS_WORKER_MSG_CALL:
{
void (*f)(int, void*) = (void (*)(int,void*))msg.arg1();
void (*f)(int, void*) = (void (*)(int, void*))msg.arg1();
f(m_id, (void*)msg.arg2());
}