diff --git a/include/maxscale/poll_core.h b/include/maxscale/poll_core.h index 6372b1fd4..b846c3fc0 100644 --- a/include/maxscale/poll_core.h +++ b/include/maxscale/poll_core.h @@ -31,25 +31,24 @@ typedef enum mxs_poll_action MXS_POLL_ERROR = 0x10, } mxs_poll_action_t; +struct mxs_poll_data; +/** Pointer to function that knows how to handle events for a particular + * 'struct mxs_poll_data' structure. + * + * @param data The `mxs_poll_data` instance that contained this pointer. + * @param wid The worker thread id. + * @param events The epoll events. + * + * @return A combination of mxs_poll_action_t enumeration values. + */ +typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, int wid, uint32_t events); + typedef struct mxs_poll_data { - /** Pointer to function that knows how to handle events for this particular - * 'struct mxs_poll_data' structure. - * - * @param data The `mxs_poll_data` instance that contained this pointer. - * @param wid The worker thread id. - * @param events The epoll events. - * - * @return A combination of mxs_poll_action_t enumeration values. - */ - uint32_t (*handler)(struct mxs_poll_data *data, int wid, uint32_t events); - + mxs_poll_handler_t handler; /*< Handler for this particular kind of mxs_poll_data. */ struct { - /** - * The id of the worker thread - */ - int id; + int id; /*< The id of the worker thread. */ } thread; } MXS_POLL_DATA; diff --git a/include/maxscale/poll_core.hh b/include/maxscale/poll_core.hh new file mode 100644 index 000000000..3f2a4d39b --- /dev/null +++ b/include/maxscale/poll_core.hh @@ -0,0 +1,36 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include + +namespace maxscale +{ + +struct MxsPollData : MXS_POLL_DATA +{ + MxsPollData() + { + handler = NULL; + thread.id = 0; + } + + MxsPollData(mxs_poll_handler_t h) + { + handler = h; + thread.id = 0; + } +}; + +} diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index 3d7666ff5..ee6f12201 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -18,6 +18,7 @@ add_library(maxscale-common SHARED maxscale_pcre2.cc misc.cc mlist.cc + messagequeue.cc modulecmd.cc modutil.cc monitor.cc diff --git a/server/core/maxscale/messagequeue.hh b/server/core/maxscale/messagequeue.hh new file mode 100644 index 000000000..32a938358 --- /dev/null +++ b/server/core/maxscale/messagequeue.hh @@ -0,0 +1,192 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include + +namespace maxscale +{ + +class MessageQueue; +class Worker; + +/** + * An instance of @c MessageQueueMessage can be sent over a @c MessageQueue from + * one context to another. The instance will be copied verbatim without any + * interpretation, so if the same message is sent to multiple recipients it is + * the caller's and recipient's responsibility to manage the lifetime and + * concurrent access of anything possibly pointed to from the message. + */ +class MessageQueueMessage /* final */ +{ +public: + /** + * Constructor + * + * @param id The id of the message. The meaning is an affair between the sender + * and the recipient. + * @param arg1 First argument. + * @param arg2 Second argument. + */ + explicit MessageQueueMessage(uint32_t id = 0, intptr_t arg1 = 0, intptr_t arg2 = 0) + : m_id(id) + , m_arg1(arg1) + , m_arg2(arg2) + { + } + + uint32_t id() const + { + return m_id; + } + + intptr_t arg1() const + { + return m_arg1; + } + + intptr_t arg2() const + { + return m_arg2; + } + + MessageQueueMessage& set_id(uint32_t id) + { + m_id = id; + return *this; + } + + MessageQueueMessage& set_arg1(intptr_t arg1) + { + m_arg1 = arg1; + return *this; + } + + MessageQueueMessage& set_arg2(intptr_t arg2) + { + m_arg2 = arg2; + return *this; + } + +private: + uint32_t m_id; + intptr_t m_arg1; + intptr_t m_arg2; +}; + + +/** + * A @c MessageQueueHandler will be delivered messages received over a + * @c MessageQueue. + */ +class MessageQueueHandler +{ +public: + /** + * Message delivery. + * + * @param queue The queue over which the message was received. + * @param message The message. + */ + virtual void handle_message(MessageQueue& queue, const MessageQueueMessage& message) = 0; +}; + + +/** + * The class @c MessageQueue provides a cross thread message queue implemented + * on top of a pipe. + */ +class MessageQueue : private MxsPollData +{ + MessageQueue(const MessageQueue&); + MessageQueue& operator = (const MessageQueue&); + +public: + typedef MessageQueueHandler Handler; + typedef MessageQueueMessage Message; + + /** + * Creates a @c MessageQueue with the provided handler. + * + * @param pHandler The handler that will receive the messages sent over the + * message queue. Note that the handler *must* remain valid + * for the lifetime of the @c MessageQueue. + * + * @return A pointer to a new @c MessageQueue or NULL if an error occurred. + * + * @attention Before the message queue can be used, it must be added to + * a worker. + */ + static MessageQueue* create(Handler* pHandler); + + /** + * Destructor + * + * Removes itself If still added to a worker and closes the pipe. + */ + ~MessageQueue(); + + /** + * Posts a message over the queue to the handler provided when the + * @c MessageQueue was created. + * + * @param message The message to be posted. A bitwise copy of the message + * will be delivered to the handler, after an unspecified time. + * + * @return True if the message could be posted, false otherwise. Note that + * a return value of true only means that the message could successfully + * be posted, not that it has reached the handler. + * + * @attention Note that the message queue must have been added to a worker + * before a message can be posted. + */ + bool post(const Message& message) const; + + /** + * Adds the message queue to a particular worker. + * + * @param pWorker The worker the message queue should be added to. + * Must remain valid until the message queue is removed + * from it. + * + * @return True if the message queue could be added, otherwise false. + * + * @attention If the message queue is currently added to a worker, it + * will first be removed from that worker. + */ + bool add_to_worker(Worker* pWorker); + + /** + * Removes the message queue from the worker it is currently added to. + * + * @return The worker the message queue was associated with, or NULL + * if it was not associated with any. + */ + Worker* remove_from_worker(); + +private: + MessageQueue(Handler* pHandler, int read_fd, int write_fd); + + uint32_t handle_poll_events(int thread_id, uint32_t events); + + static uint32_t poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events); + +private: + Handler& m_handler; + int m_read_fd; + int m_write_fd; + Worker* m_pWorker; +}; + +} diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc new file mode 100644 index 000000000..1b9db0996 --- /dev/null +++ b/server/core/messagequeue.cc @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "maxscale/messagequeue.hh" +#include +#include +#include +#include +#include +#include +#include "maxscale/worker.hh" + +namespace maxscale +{ + +MessageQueue::MessageQueue(Handler* pHandler, int read_fd, int write_fd) + : MxsPollData(&MessageQueue::poll_handler) + , m_handler(*pHandler) + , m_read_fd(read_fd) + , m_write_fd(write_fd) + , m_pWorker(NULL) +{ + ss_dassert(pHandler); + ss_dassert(read_fd); + ss_dassert(write_fd); +} + +MessageQueue::~MessageQueue() +{ + if (m_pWorker) + { + m_pWorker->remove_fd(m_read_fd); + } + + close(m_read_fd); + close(m_write_fd); +} + +//static +MessageQueue* MessageQueue::create(Handler* pHandler) +{ + MessageQueue* pThis = NULL; + + // We create the pipe in message mode (O_DIRECT), so that we do + // not need to deal with partial messages and as non blocking so + // that the descriptor can be added to an epoll instance. + + int fds[2]; + if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0) + { + int read_fd = fds[0]; + int write_fd = fds[1]; + + pThis = new (std::nothrow) MessageQueue(pHandler, read_fd, write_fd); + + if (!pThis) + { + MXS_OOM(); + close(read_fd); + close(write_fd); + } + } + else + { + MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno)); + } + + return pThis; +} + +bool MessageQueue::post(const Message& message) const +{ + // NOTE: No logging here, this function must be signal safe. + bool rv = false; + + ss_dassert(m_pWorker); + if (m_pWorker) + { + ssize_t n = write(m_write_fd, &message, sizeof(message)); + rv = (n == sizeof(message)); + } + else + { + MXS_ERROR("Attempt to post using a message queue that is not added to a worker."); + } + + return rv; +} + +bool MessageQueue::add_to_worker(Worker* pWorker) +{ + if (m_pWorker) + { + m_pWorker->remove_fd(m_read_fd); + m_pWorker = NULL; + } + + if (pWorker->add_fd(m_read_fd, EPOLLIN, this)) + { + m_pWorker = pWorker; + } + + return m_pWorker != NULL; +} + +Worker* MessageQueue::remove_from_worker() +{ + Worker* pWorker = m_pWorker; + + if (m_pWorker) + { + m_pWorker->remove_fd(m_read_fd); + m_pWorker = NULL; + } + + return pWorker; +} + +uint32_t MessageQueue::handle_poll_events(int thread_id, uint32_t events) +{ + uint32_t rc = MXS_POLL_NOP; + + // We only expect EPOLLIN events. + ss_dassert(((events & EPOLLIN) != 0) && ((events & ~EPOLLIN) == 0)); + + if (events & EPOLLIN) + { + Message message; + + ssize_t n; + + do + { + n = read(m_read_fd, &message, sizeof(message)); + + if (n == sizeof(message)) + { + m_handler.handle_message(*this, message); + } + else if (n == -1) + { + if (errno != EWOULDBLOCK) + { + MXS_ERROR("Worker could not read from pipe: %s", mxs_strerror(errno)); + } + } + else if (n != 0) + { + // This really should not happen as the pipe is in message mode. We + // should either get a message, nothing at all or an error. In non-debug + // mode we continue reading in order to empty the pipe as otherwise the + // thread may hang. + MXS_ERROR("MessageQueue could only read %ld bytes from pipe, although " + "expected %lu bytes.", n, sizeof(message)); + ss_dassert(!true); + } + } + while ((n != 0) && (n != -1)); + + rc = MXS_POLL_READ; + } + + return rc; +} + +//static +uint32_t MessageQueue::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events) +{ + MessageQueue* pThis = static_cast(pData); + + return pThis->handle_poll_events(thread_id, events); +} + +}