Introduce concept of Worker tasks

A Worker::Task is an object that can be sent to a worker for
execution. The task is sent to the worker using the messaging
mechanism where the `execute` function of the task will be
called in the thread context of the worker.

There are two kinds of tasks; regular tasks and disposable tasks.
The former are just sent to the worker for execution while the
latter are sent and subsequently disposed of, once the task has
been executed.

A disposable task can be sent to either one worker or to all
workers. In the latter case, the task will be deleted once it
has been executed by all workers.

A semaphore can be associated with a regular task. Once the task
has been executed by the worker, the semaphore will automatically
be posted. That way, it is trivial to send a task for execution
to a worker and wait until the task has been executed. For instance:

    Semaphore sem;
    MyTask task;

    pWorker->execute(&task, &sem);
    sem.wait();

    const MyResult& result = task.result();

The low level mechanism for posting and broadcasting messages will
be removed.
This commit is contained in:
Johan Wikman 2017-04-24 14:38:32 +03:00
parent 172cdbc5a3
commit 8174690f77
5 changed files with 289 additions and 0 deletions

View File

@ -44,6 +44,7 @@ add_library(maxscale-common SHARED
users.cc
utils.cc
worker.cc
workertask.cc
)
if(WITH_JEMALLOC)

View File

@ -13,14 +13,18 @@
*/
#include <maxscale/cppdefs.hh>
#include <memory>
#include <maxscale/platform.h>
#include "messagequeue.hh"
#include "poll.h"
#include "worker.h"
#include "workertask.hh"
namespace maxscale
{
class Semaphore;
struct WORKER_STATISTICS
{
WORKER_STATISTICS()
@ -61,6 +65,8 @@ class Worker : public MXS_WORKER
public:
typedef WORKER_STATISTICS STATISTICS;
typedef WorkerTask Task;
typedef WorkerDisposableTask DisposableTask;
enum state_t
{
@ -244,6 +250,76 @@ public:
return m_should_shutdown;
}
/**
* Executes a task in the context of a Worker.
*
* @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once the task's `execute` return.
*
* @return True if the task could be *posted*, false otherwise.
*
* @attention The instance must remain valid for as long as it takes for the
* task to be transferred to the worker and its `execute` function
* to be called.
*
* The semaphore can be used for waiting for the task to be finished.
*
* @code
* Semaphore sem;
* MyTask task;
*
* pWorker->execute(&task, &sem);
* sem.wait();
*
* MyResult& result = task.result();
* @endcode
*/
bool execute(Task* pTask, Semaphore* pSem = NULL);
/**
* Executes a disposable task in the context of a Worker.
*
* @param pTask The task to be executed.
*
* @return True if the task could be *posted*, false otherwise.
*
* @attention Once the task has been executed, it will be deleted.
*/
bool execute(std::auto_ptr<DisposableTask> sTask);
/**
* Executes a task on all workers.
*
* @param pTask The task to be executed.
* @param pSem If non-NULL, will be posted once per worker when the task's
* `execute` return.
*
* @return How many workers the task was posted to.
*
* @attention The very same task will be posted to all workers. The task
* should either not have any sharable data or then it should
* have data specific to each worker that can be accessed
* without locks.
*/
static size_t execute_on_all(Task* pTask, Semaphore* pSem = NULL);
/**
* Executes a task on all workers.
*
* @param pTask The task to be executed.
*
* @return How many workers the task was posted to.
*
* @attention The very same task will be posted to all workers. The task
* should either not have any sharable data or then it should
* have data specific to each worker that can be accessed
* without locks.
*
* @attention Once the task has been executed by all workers, it will
* be deleted.
*/
static size_t execute_on_all(std::auto_ptr<DisposableTask> sTask);
/**
* Post a message to a worker.
*
@ -337,6 +413,8 @@ private:
static Worker* create(int id, int epoll_listener_fd);
bool execute_disposable(DisposableTask* pTask);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
static void thread_main(void* arg);

View File

@ -0,0 +1,65 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
namespace maxscale
{
class Worker;
/**
* A WorkerTask represents a task to be performed by a Worker.
*/
class WorkerTask
{
public:
/**
* Destructor
*/
virtual ~WorkerTask();
/**
* @brief Called in the context of a specific worker.
*
* @param worker The worker in whose context `execute` is called.
*
* @attention As the function is called by a worker, the body of `execute`
* should execute quickly and not perform any blocking operations.
*/
virtual void execute(Worker& worker) = 0;
};
/**
* A WorkerDisposableTask represents a task to be performed by a Worker.
*
* When the task has been executed, the instance will automatically be
* deleted.
*/
class WorkerDisposableTask : public WorkerTask
{
protected:
WorkerDisposableTask();
private:
friend class Worker;
void inc_count();
void dec_count();
private:
int32_t m_count;
};
}

View File

@ -23,6 +23,7 @@
#include <maxscale/hk_heartbeat.h>
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include <maxscale/semaphore.hh>
#include "maxscale/modules.h"
#include "maxscale/poll.h"
#include "maxscale/statistics.h"
@ -34,6 +35,9 @@ using maxscale::Worker;
namespace
{
const int MXS_WORKER_MSG_TASK = -1;
const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2;
/**
* Unit variables.
*/
@ -529,6 +533,77 @@ void Worker::set_maxwait(unsigned int maxwait)
this_unit.max_poll_sleep = maxwait;
}
bool Worker::execute(Task* pTask, Semaphore* pSem)
{
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
return post_message(MXS_WORKER_MSG_TASK, arg1, arg2);
}
bool Worker::execute(std::auto_ptr<DisposableTask> sTask)
{
return execute_disposable(sTask.release());
}
// private
bool Worker::execute_disposable(DisposableTask* pTask)
{
pTask->inc_count();
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
bool posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0);
if (!posted)
{
pTask->dec_count();
}
return posted;
}
//static
size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem)
{
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute(pTask, pSem))
{
++n;
}
}
return n;
}
//static
size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
{
DisposableTask* pTask = sTask.release();
pTask->inc_count();
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute_disposable(pTask))
{
++n;
}
}
pTask->dec_count();
return n;
}
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
// NOTE: No logging here, this function must be signal safe.
@ -745,6 +820,28 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
}
break;
case MXS_WORKER_MSG_TASK:
{
Task *pTask = reinterpret_cast<Task*>(msg.arg1());
Semaphore* pSem = reinterpret_cast<Semaphore*>(msg.arg2());
pTask->execute(*this);
if (pSem)
{
pSem->post();
}
}
break;
case MXS_WORKER_MSG_DISPOSABLE_TASK:
{
DisposableTask *pTask = reinterpret_cast<DisposableTask*>(msg.arg1());
pTask->execute(*this);
pTask->dec_count();
}
break;
default:
MXS_ERROR("Worker received unknown message %d.", msg.id());
}

48
server/core/workertask.cc Normal file
View File

@ -0,0 +1,48 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "maxscale/workertask.hh"
#include <maxscale/atomic.h>
namespace maxscale
{
//
// WorkerTask
//
WorkerTask::~WorkerTask()
{
}
//
// WorkerDisposableTask
//
WorkerDisposableTask::WorkerDisposableTask()
: m_count(0)
{
}
void WorkerDisposableTask::inc_count()
{
atomic_add(&m_count, 1);
}
void WorkerDisposableTask::dec_count()
{
if (atomic_add(&m_count, -1) == 1)
{
delete this;
}
}
}