Add MessageQueue to Worker
The Worker no longer creates a pipe and implements the cross worker/thread message mechanism itself. Instead it has a MessageQueue instance variable for that purpose.
This commit is contained in:
@ -20,7 +20,6 @@ MXS_BEGIN_DECLS
|
|||||||
|
|
||||||
typedef struct mxs_worker
|
typedef struct mxs_worker
|
||||||
{
|
{
|
||||||
MXS_POLL_DATA m_poll; /*< The poll data used by the polling mechanism. */
|
|
||||||
} MXS_WORKER;
|
} MXS_WORKER;
|
||||||
|
|
||||||
enum mxs_worker_msg_id
|
enum mxs_worker_msg_id
|
||||||
|
@ -13,12 +13,14 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <maxscale/cppdefs.hh>
|
#include <maxscale/cppdefs.hh>
|
||||||
|
#include "messagequeue.hh"
|
||||||
#include "worker.h"
|
#include "worker.h"
|
||||||
|
|
||||||
namespace maxscale
|
namespace maxscale
|
||||||
{
|
{
|
||||||
|
|
||||||
class Worker : public MXS_WORKER
|
class Worker : public MXS_WORKER
|
||||||
|
, private MessageQueue::Handler
|
||||||
{
|
{
|
||||||
Worker(const Worker&);
|
Worker(const Worker&);
|
||||||
Worker& operator = (const Worker&);
|
Worker& operator = (const Worker&);
|
||||||
@ -195,27 +197,23 @@ public:
|
|||||||
static int get_current_id();
|
static int get_current_id();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Worker(int id, int epoll_fd, int read_fd, int write_fd);
|
Worker(int id, int epoll_fd);
|
||||||
~Worker();
|
virtual ~Worker();
|
||||||
|
|
||||||
static Worker* create(int id);
|
static Worker* create(int id);
|
||||||
|
|
||||||
uint32_t poll(uint32_t events);
|
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
|
||||||
static uint32_t poll_handler(MXS_POLL_DATA* pData, int worker_id, uint32_t events);
|
|
||||||
|
|
||||||
void handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
|
|
||||||
|
|
||||||
static void thread_main(void* arg);
|
static void thread_main(void* arg);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int m_id; /*< The id of the worker. */
|
int m_id; /*< The id of the worker. */
|
||||||
int m_epoll_fd; /*< The epoll file descriptor. */
|
int m_epoll_fd; /*< The epoll file descriptor. */
|
||||||
int m_read_fd; /*< The file descriptor the worker reads from. */
|
MessageQueue* m_pQueue; /*< The message queue of the worker. */
|
||||||
int m_write_fd; /*< The file descriptor used for sending data to the worker. */
|
THREAD m_thread; /*< The thread handle of the worker. */
|
||||||
THREAD m_thread; /*< The thread handle of the worker. */
|
bool m_started; /*< Whether the thread has been started or not. */
|
||||||
bool m_started; /*< Whether the thread has been started or not. */
|
bool m_should_shutdown; /*< Whether shutdown should be performed. */
|
||||||
bool m_should_shutdown; /*< Whether shutdown should be performed. */
|
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
|
||||||
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -126,27 +126,22 @@ void poll_resolve_error(int wid, int fd, int errornum, int op)
|
|||||||
static bool modules_thread_init();
|
static bool modules_thread_init();
|
||||||
static void modules_thread_finish();
|
static void modules_thread_finish();
|
||||||
|
|
||||||
Worker::Worker(int id, int epoll_fd, int read_fd, int write_fd)
|
Worker::Worker(int id, int epoll_fd)
|
||||||
: m_id(id)
|
: m_id(id)
|
||||||
, m_epoll_fd(epoll_fd)
|
, m_epoll_fd(epoll_fd)
|
||||||
, m_read_fd(read_fd)
|
, m_pQueue(NULL)
|
||||||
, m_write_fd(write_fd)
|
, m_thread(0)
|
||||||
|
, m_started(false)
|
||||||
|
, m_should_shutdown(false)
|
||||||
|
, m_shutdown_initiated(false)
|
||||||
{
|
{
|
||||||
m_poll.handler = &Worker::poll_handler;
|
|
||||||
|
|
||||||
m_thread = 0;
|
|
||||||
m_started = false;
|
|
||||||
m_should_shutdown = false;
|
|
||||||
m_shutdown_initiated = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Worker::~Worker()
|
Worker::~Worker()
|
||||||
{
|
{
|
||||||
ss_dassert(!m_started);
|
ss_dassert(!m_started);
|
||||||
|
|
||||||
poll_remove_fd_from_worker(m_id, m_read_fd);
|
delete m_pQueue;
|
||||||
close(m_read_fd);
|
|
||||||
close(m_write_fd);
|
|
||||||
close(m_epoll_fd);
|
close(m_epoll_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,14 +267,9 @@ int Worker::get_current_id()
|
|||||||
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
||||||
{
|
{
|
||||||
// NOTE: No logging here, this function must be signal safe.
|
// NOTE: No logging here, this function must be signal safe.
|
||||||
WORKER_MESSAGE message = {};
|
MessageQueue::Message message(msg_id, arg1, arg2);
|
||||||
message.id = msg_id;
|
|
||||||
message.arg1 = arg1;
|
|
||||||
message.arg2 = arg2;
|
|
||||||
|
|
||||||
ssize_t n = write(m_write_fd, &message, sizeof(message));
|
return m_pQueue->post(message);
|
||||||
|
|
||||||
return n == sizeof(message) ? true : false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
||||||
@ -391,42 +381,41 @@ void Worker::shutdown_all()
|
|||||||
//static
|
//static
|
||||||
Worker* Worker::create(int worker_id)
|
Worker* Worker::create(int worker_id)
|
||||||
{
|
{
|
||||||
Worker* pWorker = NULL;
|
Worker* pThis = NULL;
|
||||||
|
|
||||||
int epoll_fd = epoll_create(MAX_EVENTS);
|
int epoll_fd = epoll_create(MAX_EVENTS);
|
||||||
|
|
||||||
if (epoll_fd != -1)
|
if (epoll_fd != -1)
|
||||||
{
|
{
|
||||||
int fds[2];
|
pThis = new (std::nothrow) Worker(worker_id, epoll_fd);
|
||||||
|
|
||||||
// We create the pipe in message mode (O_DIRECT), so that we do
|
if (pThis)
|
||||||
// not need to deal with partial messages.
|
|
||||||
if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0)
|
|
||||||
{
|
{
|
||||||
int read_fd = fds[0];
|
MessageQueue* pQueue = MessageQueue::create(pThis);
|
||||||
int write_fd = fds[1];
|
|
||||||
|
|
||||||
pWorker = new (std::nothrow) Worker(worker_id, epoll_fd, read_fd, write_fd);
|
if (pQueue)
|
||||||
|
|
||||||
if (pWorker)
|
|
||||||
{
|
{
|
||||||
if (!pWorker->add_fd(read_fd, EPOLLIN, &pWorker->m_poll))
|
if (pQueue->add_to_worker(pThis))
|
||||||
{
|
{
|
||||||
MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno));
|
pThis->m_pQueue = pQueue;
|
||||||
delete pWorker;
|
}
|
||||||
pWorker = NULL;
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Could not add message queue to worker.");
|
||||||
|
delete pThis;
|
||||||
|
pThis = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
close(read_fd);
|
MXS_ERROR("Could not create message queue for worker.");
|
||||||
close(write_fd);
|
delete pThis;
|
||||||
close(epoll_fd);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno));
|
MXS_OOM();
|
||||||
|
close(epoll_fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -434,7 +423,7 @@ Worker* Worker::create(int worker_id)
|
|||||||
MXS_ERROR("Could not create epoll-instance for worker: %s", mxs_strerror(errno));
|
MXS_ERROR("Could not create epoll-instance for worker: %s", mxs_strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
return pWorker;
|
return pThis;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -444,14 +433,14 @@ Worker* Worker::create(int worker_id)
|
|||||||
* @param arg1 Message specific first argument.
|
* @param arg1 Message specific first argument.
|
||||||
* @param arg2 Message specific second argument.
|
* @param arg2 Message specific second argument.
|
||||||
*/
|
*/
|
||||||
void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg)
|
||||||
{
|
{
|
||||||
switch (msg_id)
|
switch (msg.id())
|
||||||
{
|
{
|
||||||
case MXS_WORKER_MSG_PING:
|
case MXS_WORKER_MSG_PING:
|
||||||
{
|
{
|
||||||
ss_dassert(arg1 == 0);
|
ss_dassert(msg.arg1() == 0);
|
||||||
char* zArg2 = reinterpret_cast<char*>(arg2);
|
char* zArg2 = reinterpret_cast<char*>(msg.arg2());
|
||||||
const char* zMessage = zArg2 ? zArg2 : "Alive and kicking";
|
const char* zMessage = zArg2 ? zArg2 : "Alive and kicking";
|
||||||
MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage);
|
MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage);
|
||||||
MXS_FREE(zArg2);
|
MXS_FREE(zArg2);
|
||||||
@ -467,90 +456,17 @@ void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
|
|||||||
|
|
||||||
case MXS_WORKER_MSG_CALL:
|
case MXS_WORKER_MSG_CALL:
|
||||||
{
|
{
|
||||||
void (*f)(int, void*) = (void (*)(int,void*))arg1;
|
void (*f)(int, void*) = (void (*)(int,void*))msg.arg1();
|
||||||
|
|
||||||
f(m_id, (void*)arg2);
|
f(m_id, (void*)msg.arg2());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
MXS_ERROR("Worker received unknown message %d.", msg_id);
|
MXS_ERROR("Worker received unknown message %d.", msg.id());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Worker poll handler.
|
|
||||||
*
|
|
||||||
* @param thread_id Id of the thread; same as id of the relevant worker.
|
|
||||||
* @param events Epoll events.
|
|
||||||
*
|
|
||||||
* @return What events the handler handled.
|
|
||||||
*/
|
|
||||||
uint32_t Worker::poll(uint32_t events)
|
|
||||||
{
|
|
||||||
int rc = MXS_POLL_NOP;
|
|
||||||
|
|
||||||
// We only expect EPOLLIN events.
|
|
||||||
ss_dassert(((events & EPOLLIN) != 0) && ((events & ~EPOLLIN) == 0));
|
|
||||||
|
|
||||||
if (events & EPOLLIN)
|
|
||||||
{
|
|
||||||
WORKER_MESSAGE message;
|
|
||||||
|
|
||||||
ssize_t n;
|
|
||||||
|
|
||||||
do
|
|
||||||
{
|
|
||||||
n = read(m_read_fd, &message, sizeof(message));
|
|
||||||
|
|
||||||
if (n == sizeof(message))
|
|
||||||
{
|
|
||||||
handle_message(message.id, message.arg1, message.arg2);
|
|
||||||
}
|
|
||||||
else if (n == -1)
|
|
||||||
{
|
|
||||||
if (errno != EWOULDBLOCK)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Worker could not read from pipe: %s", mxs_strerror(errno));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (n != 0)
|
|
||||||
{
|
|
||||||
// This really should not happen as the pipe is in message mode. We
|
|
||||||
// should either get a message, nothing at all or an error. In non-debug
|
|
||||||
// mode we continue reading in order to empty the pipe as otherwise the
|
|
||||||
// thread may hang.
|
|
||||||
MXS_ERROR("Worker could only read %ld bytes from pipe, although expected %lu bytes.",
|
|
||||||
n, sizeof(message));
|
|
||||||
ss_dassert(!true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
while ((n != 0) && (n != -1));
|
|
||||||
|
|
||||||
rc = MXS_POLL_READ;
|
|
||||||
}
|
|
||||||
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handler for poll events related to the read descriptor of the worker.
|
|
||||||
*
|
|
||||||
* @param pData The MXS_POLL_DATA of the worker in question.
|
|
||||||
* @param thread_id Id of the thread; same as id of the relevant worker.
|
|
||||||
* @param events Epoll events.
|
|
||||||
*
|
|
||||||
* @return What events the handler handled.
|
|
||||||
*/
|
|
||||||
//static
|
|
||||||
uint32_t Worker::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events)
|
|
||||||
{
|
|
||||||
Worker* pWorker = reinterpret_cast<Worker*>(pData);
|
|
||||||
ss_dassert(pWorker->m_id == thread_id);
|
|
||||||
|
|
||||||
return pWorker->poll(events);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The entry point of each worker thread.
|
* The entry point of each worker thread.
|
||||||
*
|
*
|
||||||
|
Reference in New Issue
Block a user