diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 609436a06..06a753fb3 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -20,7 +20,6 @@ MXS_BEGIN_DECLS typedef struct mxs_worker { - MXS_POLL_DATA m_poll; /*< The poll data used by the polling mechanism. */ } MXS_WORKER; enum mxs_worker_msg_id diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index 31c56c926..9fbb7096d 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -13,12 +13,14 @@ */ #include +#include "messagequeue.hh" #include "worker.h" namespace maxscale { class Worker : public MXS_WORKER + , private MessageQueue::Handler { Worker(const Worker&); Worker& operator = (const Worker&); @@ -195,27 +197,23 @@ public: static int get_current_id(); private: - Worker(int id, int epoll_fd, int read_fd, int write_fd); - ~Worker(); + Worker(int id, int epoll_fd); + virtual ~Worker(); static Worker* create(int id); - uint32_t poll(uint32_t events); - static uint32_t poll_handler(MXS_POLL_DATA* pData, int worker_id, uint32_t events); - - void handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override static void thread_main(void* arg); private: - int m_id; /*< The id of the worker. */ - int m_epoll_fd; /*< The epoll file descriptor. */ - int m_read_fd; /*< The file descriptor the worker reads from. */ - int m_write_fd; /*< The file descriptor used for sending data to the worker. */ - THREAD m_thread; /*< The thread handle of the worker. */ - bool m_started; /*< Whether the thread has been started or not. */ - bool m_should_shutdown; /*< Whether shutdown should be performed. */ - bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ + int m_id; /*< The id of the worker. */ + int m_epoll_fd; /*< The epoll file descriptor. */ + MessageQueue* m_pQueue; /*< The message queue of the worker. */ + THREAD m_thread; /*< The thread handle of the worker. */ + bool m_started; /*< Whether the thread has been started or not. */ + bool m_should_shutdown; /*< Whether shutdown should be performed. */ + bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ }; } diff --git a/server/core/worker.cc b/server/core/worker.cc index a41d0d9e9..d562111f2 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -126,27 +126,22 @@ void poll_resolve_error(int wid, int fd, int errornum, int op) static bool modules_thread_init(); static void modules_thread_finish(); -Worker::Worker(int id, int epoll_fd, int read_fd, int write_fd) +Worker::Worker(int id, int epoll_fd) : m_id(id) , m_epoll_fd(epoll_fd) - , m_read_fd(read_fd) - , m_write_fd(write_fd) + , m_pQueue(NULL) + , m_thread(0) + , m_started(false) + , m_should_shutdown(false) + , m_shutdown_initiated(false) { - m_poll.handler = &Worker::poll_handler; - - m_thread = 0; - m_started = false; - m_should_shutdown = false; - m_shutdown_initiated = false; } Worker::~Worker() { ss_dassert(!m_started); - poll_remove_fd_from_worker(m_id, m_read_fd); - close(m_read_fd); - close(m_write_fd); + delete m_pQueue; close(m_epoll_fd); } @@ -272,14 +267,9 @@ int Worker::get_current_id() bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. - WORKER_MESSAGE message = {}; - message.id = msg_id; - message.arg1 = arg1; - message.arg2 = arg2; + MessageQueue::Message message(msg_id, arg1, arg2); - ssize_t n = write(m_write_fd, &message, sizeof(message)); - - return n == sizeof(message) ? true : false; + return m_pQueue->post(message); } bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) @@ -391,42 +381,41 @@ void Worker::shutdown_all() //static Worker* Worker::create(int worker_id) { - Worker* pWorker = NULL; + Worker* pThis = NULL; int epoll_fd = epoll_create(MAX_EVENTS); if (epoll_fd != -1) { - int fds[2]; + pThis = new (std::nothrow) Worker(worker_id, epoll_fd); - // We create the pipe in message mode (O_DIRECT), so that we do - // not need to deal with partial messages. - if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0) + if (pThis) { - int read_fd = fds[0]; - int write_fd = fds[1]; + MessageQueue* pQueue = MessageQueue::create(pThis); - pWorker = new (std::nothrow) Worker(worker_id, epoll_fd, read_fd, write_fd); - - if (pWorker) + if (pQueue) { - if (!pWorker->add_fd(read_fd, EPOLLIN, &pWorker->m_poll)) + if (pQueue->add_to_worker(pThis)) { - MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno)); - delete pWorker; - pWorker = NULL; + pThis->m_pQueue = pQueue; + } + else + { + MXS_ERROR("Could not add message queue to worker."); + delete pThis; + pThis = NULL; } } else { - close(read_fd); - close(write_fd); - close(epoll_fd); + MXS_ERROR("Could not create message queue for worker."); + delete pThis; } } else { - MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno)); + MXS_OOM(); + close(epoll_fd); } } else @@ -434,7 +423,7 @@ Worker* Worker::create(int worker_id) MXS_ERROR("Could not create epoll-instance for worker: %s", mxs_strerror(errno)); } - return pWorker; + return pThis; } /** @@ -444,14 +433,14 @@ Worker* Worker::create(int worker_id) * @param arg1 Message specific first argument. * @param arg2 Message specific second argument. */ -void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg) { - switch (msg_id) + switch (msg.id()) { case MXS_WORKER_MSG_PING: { - ss_dassert(arg1 == 0); - char* zArg2 = reinterpret_cast(arg2); + ss_dassert(msg.arg1() == 0); + char* zArg2 = reinterpret_cast(msg.arg2()); const char* zMessage = zArg2 ? zArg2 : "Alive and kicking"; MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage); MXS_FREE(zArg2); @@ -467,90 +456,17 @@ void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) case MXS_WORKER_MSG_CALL: { - void (*f)(int, void*) = (void (*)(int,void*))arg1; + void (*f)(int, void*) = (void (*)(int,void*))msg.arg1(); - f(m_id, (void*)arg2); + f(m_id, (void*)msg.arg2()); } break; default: - MXS_ERROR("Worker received unknown message %d.", msg_id); + MXS_ERROR("Worker received unknown message %d.", msg.id()); } } -/** - * Worker poll handler. - * - * @param thread_id Id of the thread; same as id of the relevant worker. - * @param events Epoll events. - * - * @return What events the handler handled. - */ -uint32_t Worker::poll(uint32_t events) -{ - int rc = MXS_POLL_NOP; - - // We only expect EPOLLIN events. - ss_dassert(((events & EPOLLIN) != 0) && ((events & ~EPOLLIN) == 0)); - - if (events & EPOLLIN) - { - WORKER_MESSAGE message; - - ssize_t n; - - do - { - n = read(m_read_fd, &message, sizeof(message)); - - if (n == sizeof(message)) - { - handle_message(message.id, message.arg1, message.arg2); - } - else if (n == -1) - { - if (errno != EWOULDBLOCK) - { - MXS_ERROR("Worker could not read from pipe: %s", mxs_strerror(errno)); - } - } - else if (n != 0) - { - // This really should not happen as the pipe is in message mode. We - // should either get a message, nothing at all or an error. In non-debug - // mode we continue reading in order to empty the pipe as otherwise the - // thread may hang. - MXS_ERROR("Worker could only read %ld bytes from pipe, although expected %lu bytes.", - n, sizeof(message)); - ss_dassert(!true); - } - } - while ((n != 0) && (n != -1)); - - rc = MXS_POLL_READ; - } - - return rc; -} - -/** - * Handler for poll events related to the read descriptor of the worker. - * - * @param pData The MXS_POLL_DATA of the worker in question. - * @param thread_id Id of the thread; same as id of the relevant worker. - * @param events Epoll events. - * - * @return What events the handler handled. - */ -//static -uint32_t Worker::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events) -{ - Worker* pWorker = reinterpret_cast(pData); - ss_dassert(pWorker->m_id == thread_id); - - return pWorker->poll(events); -} - /** * The entry point of each worker thread. *