Adjust Worker terminology

- Posting a task to a worker for execution (without implicit wait)
  is called "post".
- Posting a task to every worker for execution (without implicit wait)
  is called "broadcast".

In these cases the task must be provided as a pointer or auto_ptr, to
indicate that the provided pointer must remain alive for longer than
the duration of the function call.

- Posting a task to a worker for execution *and* waiting for all workers
  to have executed the task is called "execute" and the two variants are
  now called "execute_concurrently" and "execute_serially".

In these cases the task is provided as a reference, since the functions
will return only when all workers have (in concurrent or serial fashion)
executed the task. That is, it need not remain alive for longer than the
duration of the function call.
This commit is contained in:
Johan Wikman
2017-05-02 11:22:37 +03:00
parent 1b58a75f42
commit 00b6c10089
3 changed files with 38 additions and 35 deletions

View File

@ -3069,7 +3069,7 @@ private:
bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data) bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data)
{ {
SerialDcbTask task(func, data); SerialDcbTask task(func, data);
Worker::execute_on_all_serially(&task); Worker::execute_serially(task);
return task.more(); return task.more();
} }
@ -3104,9 +3104,8 @@ private:
void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data) void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data)
{ {
Semaphore sem;
ParallelDcbTask task(func, data); ParallelDcbTask task(func, data);
sem.wait_n(Worker::execute_on_all(&task, &sem)); Worker::execute_concurrently(task);
} }
int dcb_get_port(const DCB *dcb) int dcb_get_port(const DCB *dcb)

View File

@ -254,12 +254,12 @@ public:
} }
/** /**
* Executes a task in the context of a Worker. * Posts a task to a worker for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once the task's `execute` return. * @param pSem If non-NULL, will be posted once the task's `execute` return.
* *
* @return True if the task could be *posted*, false otherwise. * @return True if the task could be posted (i.e. not executed), false otherwise.
* *
* @attention The instance must remain valid for as long as it takes for the * @attention The instance must remain valid for as long as it takes for the
* task to be transferred to the worker and its `execute` function * task to be transferred to the worker and its `execute` function
@ -277,21 +277,21 @@ public:
* MyResult& result = task.result(); * MyResult& result = task.result();
* @endcode * @endcode
*/ */
bool execute(Task* pTask, Semaphore* pSem = NULL); bool post(Task* pTask, Semaphore* pSem = NULL);
/** /**
* Executes a disposable task in the context of a Worker. * Posts a task to a worker for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* *
* @return True if the task could be *posted*, false otherwise. * @return True if the task could be posted (i.e. not executed), false otherwise.
* *
* @attention Once the task has been executed, it will be deleted. * @attention Once the task has been executed, it will be deleted.
*/ */
bool execute(std::auto_ptr<DisposableTask> sTask); bool post(std::auto_ptr<DisposableTask> sTask);
/** /**
* Executes a task on all workers. * Posts a task to all workers for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once per worker when the task's * @param pSem If non-NULL, will be posted once per worker when the task's
@ -304,10 +304,10 @@ public:
* have data specific to each worker that can be accessed * have data specific to each worker that can be accessed
* without locks. * without locks.
*/ */
static size_t execute_on_all(Task* pTask, Semaphore* pSem = NULL); static size_t broadcast(Task* pTask, Semaphore* pSem = NULL);
/** /**
* Executes a task on all workers. * Posts a task to all workers for execution.
* *
* @param pTask The task to be executed. * @param pTask The task to be executed.
* *
@ -321,14 +321,14 @@ public:
* @attention Once the task has been executed by all workers, it will * @attention Once the task has been executed by all workers, it will
* be deleted. * be deleted.
*/ */
static size_t execute_on_all(std::auto_ptr<DisposableTask> sTask); static size_t broadcast(std::auto_ptr<DisposableTask> sTask);
/** /**
* Executes a task on all workers in serial mode. * Executes a task on all workers in serial mode (the task is executed
* on at most one worker thread at a time). When the function returns
* the task has been executed on all workers.
* *
* The task is executed on at most one worker thread at a time. * @param task The task to be executed.
*
* @param pTask The task to be executed.
* *
* @return How many workers the task was posted to. * @return How many workers the task was posted to.
* *
@ -336,17 +336,18 @@ public:
* to the other functions. Only use this function when printing thread-specific * to the other functions. Only use this function when printing thread-specific
* data to stdout. * data to stdout.
*/ */
static size_t execute_on_all_serially(Task* pTask); static size_t execute_serially(Task& task);
/** /**
* Executes a task on all workers concurrently and waits until * Executes a task on all workers concurrently and waits until all workers
* all workers are done. * are done. That is, when the function returns the task has been executed
* by all workers.
* *
* @param pTask The task to be executed. * @param task The task to be executed.
* *
* @return How many workers the task was posted to. * @return How many workers the task was posted to.
*/ */
static size_t execute_on_all_concurrently(Task* pTask); static size_t execute_concurrently(Task& task);
/** /**
* Post a message to a worker. * Post a message to a worker.
@ -441,7 +442,7 @@ private:
static Worker* create(int id, int epoll_listener_fd); static Worker* create(int id, int epoll_listener_fd);
bool execute_disposable(DisposableTask* pTask); bool post_disposable(DisposableTask* pTask);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override

View File

@ -548,21 +548,23 @@ void Worker::set_maxwait(unsigned int maxwait)
this_unit.max_poll_sleep = maxwait; this_unit.max_poll_sleep = maxwait;
} }
bool Worker::execute(Task* pTask, Semaphore* pSem) bool Worker::post(Task* pTask, Semaphore* pSem)
{ {
// No logging here, function must be signal safe.
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask); intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem); intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
return post_message(MXS_WORKER_MSG_TASK, arg1, arg2); return post_message(MXS_WORKER_MSG_TASK, arg1, arg2);
} }
bool Worker::execute(std::auto_ptr<DisposableTask> sTask) bool Worker::post(std::auto_ptr<DisposableTask> sTask)
{ {
return execute_disposable(sTask.release()); // No logging here, function must be signal safe.
return post_disposable(sTask.release());
} }
// private // private
bool Worker::execute_disposable(DisposableTask* pTask) bool Worker::post_disposable(DisposableTask* pTask)
{ {
pTask->inc_ref(); pTask->inc_ref();
@ -579,15 +581,16 @@ bool Worker::execute_disposable(DisposableTask* pTask)
} }
//static //static
size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem) size_t Worker::broadcast(Task* pTask, Semaphore* pSem)
{ {
// No logging here, function must be signal safe.
size_t n = 0; size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute(pTask, pSem)) if (pWorker->post(pTask, pSem))
{ {
++n; ++n;
} }
@ -597,7 +600,7 @@ size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem)
} }
//static //static
size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask) size_t Worker::broadcast(std::auto_ptr<DisposableTask> sTask)
{ {
DisposableTask* pTask = sTask.release(); DisposableTask* pTask = sTask.release();
pTask->inc_ref(); pTask->inc_ref();
@ -608,7 +611,7 @@ size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute_disposable(pTask)) if (pWorker->post_disposable(pTask))
{ {
++n; ++n;
} }
@ -620,7 +623,7 @@ size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
} }
//static //static
size_t Worker::execute_on_all_serially(Task* pTask) size_t Worker::execute_serially(Task& task)
{ {
Semaphore sem; Semaphore sem;
size_t n = 0; size_t n = 0;
@ -629,7 +632,7 @@ size_t Worker::execute_on_all_serially(Task* pTask)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute(pTask, &sem)) if (pWorker->post(&task, &sem))
{ {
sem.wait(); sem.wait();
++n; ++n;
@ -640,10 +643,10 @@ size_t Worker::execute_on_all_serially(Task* pTask)
} }
//static //static
size_t Worker::execute_on_all_concurrently(Task* pTask) size_t Worker::execute_concurrently(Task& task)
{ {
Semaphore sem; Semaphore sem;
return sem.wait_n(Worker::execute_on_all(pTask, &sem)); return sem.wait_n(Worker::broadcast(&task, &sem));
} }
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)