diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index 9f2e14b94..f2436b79f 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -44,6 +44,7 @@ add_library(maxscale-common SHARED users.cc utils.cc worker.cc + workertask.cc ) if(WITH_JEMALLOC) diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index d5df23452..69f3f49bd 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -13,14 +13,18 @@ */ #include +#include #include #include "messagequeue.hh" #include "poll.h" #include "worker.h" +#include "workertask.hh" namespace maxscale { +class Semaphore; + struct WORKER_STATISTICS { WORKER_STATISTICS() @@ -61,6 +65,8 @@ class Worker : public MXS_WORKER public: typedef WORKER_STATISTICS STATISTICS; + typedef WorkerTask Task; + typedef WorkerDisposableTask DisposableTask; enum state_t { @@ -244,6 +250,76 @@ public: return m_should_shutdown; } + /** + * Executes a task in the context of a Worker. + * + * @param pTask The task to be executed. + * @param pSem If non-NULL, will be posted once the task's `execute` return. + * + * @return True if the task could be *posted*, false otherwise. + * + * @attention The instance must remain valid for as long as it takes for the + * task to be transferred to the worker and its `execute` function + * to be called. + * + * The semaphore can be used for waiting for the task to be finished. + * + * @code + * Semaphore sem; + * MyTask task; + * + * pWorker->execute(&task, &sem); + * sem.wait(); + * + * MyResult& result = task.result(); + * @endcode + */ + bool execute(Task* pTask, Semaphore* pSem = NULL); + + /** + * Executes a disposable task in the context of a Worker. + * + * @param pTask The task to be executed. + * + * @return True if the task could be *posted*, false otherwise. + * + * @attention Once the task has been executed, it will be deleted. + */ + bool execute(std::auto_ptr sTask); + + /** + * Executes a task on all workers. + * + * @param pTask The task to be executed. + * @param pSem If non-NULL, will be posted once per worker when the task's + * `execute` return. + * + * @return How many workers the task was posted to. + * + * @attention The very same task will be posted to all workers. The task + * should either not have any sharable data or then it should + * have data specific to each worker that can be accessed + * without locks. + */ + static size_t execute_on_all(Task* pTask, Semaphore* pSem = NULL); + + /** + * Executes a task on all workers. + * + * @param pTask The task to be executed. + * + * @return How many workers the task was posted to. + * + * @attention The very same task will be posted to all workers. The task + * should either not have any sharable data or then it should + * have data specific to each worker that can be accessed + * without locks. + * + * @attention Once the task has been executed by all workers, it will + * be deleted. + */ + static size_t execute_on_all(std::auto_ptr sTask); + /** * Post a message to a worker. * @@ -337,6 +413,8 @@ private: static Worker* create(int id, int epoll_listener_fd); + bool execute_disposable(DisposableTask* pTask); + void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override static void thread_main(void* arg); diff --git a/server/core/maxscale/workertask.hh b/server/core/maxscale/workertask.hh new file mode 100644 index 000000000..e8186013d --- /dev/null +++ b/server/core/maxscale/workertask.hh @@ -0,0 +1,65 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include + +namespace maxscale +{ + +class Worker; + +/** + * A WorkerTask represents a task to be performed by a Worker. + */ +class WorkerTask +{ +public: + /** + * Destructor + */ + virtual ~WorkerTask(); + + /** + * @brief Called in the context of a specific worker. + * + * @param worker The worker in whose context `execute` is called. + * + * @attention As the function is called by a worker, the body of `execute` + * should execute quickly and not perform any blocking operations. + */ + virtual void execute(Worker& worker) = 0; +}; + +/** + * A WorkerDisposableTask represents a task to be performed by a Worker. + * + * When the task has been executed, the instance will automatically be + * deleted. + */ +class WorkerDisposableTask : public WorkerTask +{ +protected: + WorkerDisposableTask(); + +private: + friend class Worker; + + void inc_count(); + void dec_count(); + +private: + int32_t m_count; +}; + +} diff --git a/server/core/worker.cc b/server/core/worker.cc index 66c3b6d49..0f95fffe2 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include "maxscale/modules.h" #include "maxscale/poll.h" #include "maxscale/statistics.h" @@ -34,6 +35,9 @@ using maxscale::Worker; namespace { +const int MXS_WORKER_MSG_TASK = -1; +const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; + /** * Unit variables. */ @@ -529,6 +533,77 @@ void Worker::set_maxwait(unsigned int maxwait) this_unit.max_poll_sleep = maxwait; } +bool Worker::execute(Task* pTask, Semaphore* pSem) +{ + intptr_t arg1 = reinterpret_cast(pTask); + intptr_t arg2 = reinterpret_cast(pSem); + + return post_message(MXS_WORKER_MSG_TASK, arg1, arg2); +} + +bool Worker::execute(std::auto_ptr sTask) +{ + return execute_disposable(sTask.release()); +} + +// private +bool Worker::execute_disposable(DisposableTask* pTask) +{ + pTask->inc_count(); + + intptr_t arg1 = reinterpret_cast(pTask); + + bool posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0); + + if (!posted) + { + pTask->dec_count(); + } + + return posted; +} + +//static +size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem) +{ + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->execute(pTask, pSem)) + { + ++n; + } + } + + return n; +} + +//static +size_t Worker::execute_on_all(std::auto_ptr sTask) +{ + DisposableTask* pTask = sTask.release(); + pTask->inc_count(); + + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->execute_disposable(pTask)) + { + ++n; + } + } + + pTask->dec_count(); + + return n; +} + bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. @@ -745,6 +820,28 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms } break; + case MXS_WORKER_MSG_TASK: + { + Task *pTask = reinterpret_cast(msg.arg1()); + Semaphore* pSem = reinterpret_cast(msg.arg2()); + + pTask->execute(*this); + + if (pSem) + { + pSem->post(); + } + } + break; + + case MXS_WORKER_MSG_DISPOSABLE_TASK: + { + DisposableTask *pTask = reinterpret_cast(msg.arg1()); + pTask->execute(*this); + pTask->dec_count(); + } + break; + default: MXS_ERROR("Worker received unknown message %d.", msg.id()); } diff --git a/server/core/workertask.cc b/server/core/workertask.cc new file mode 100644 index 000000000..c8d661a22 --- /dev/null +++ b/server/core/workertask.cc @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "maxscale/workertask.hh" +#include + +namespace maxscale +{ + +// +// WorkerTask +// +WorkerTask::~WorkerTask() +{ +} + +// +// WorkerDisposableTask +// +WorkerDisposableTask::WorkerDisposableTask() + : m_count(0) +{ +} + +void WorkerDisposableTask::inc_count() +{ + atomic_add(&m_count, 1); +} + +void WorkerDisposableTask::dec_count() +{ + if (atomic_add(&m_count, -1) == 1) + { + delete this; + } +} + +}