diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 25f4196cb..3a9907eb5 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -20,14 +20,14 @@ MXS_BEGIN_DECLS typedef struct mxs_worker { - MXS_POLL_DATA poll; /*< The poll data used by the polling mechanism. */ - int id; /*< The id of the worker. */ - int read_fd; /*< The file descriptor the worked reads from. */ - int write_fd; /*< The file descriptor used for sending data to the worker. */ - THREAD thread; /*< The thread handle of the worker. */ - bool started; /*< Whether the thread has been started or not. */ - bool should_shutdown; /*< Whether shutdown should be performed. */ - bool shutdown_initiated; /*< Whether shutdown has been initated. */ + MXS_POLL_DATA m_poll; /*< The poll data used by the polling mechanism. */ + int m_id; /*< The id of the worker. */ + int m_read_fd; /*< The file descriptor the worked reads from. */ + int m_write_fd; /*< The file descriptor used for sending data to the worker. */ + THREAD m_thread; /*< The thread handle of the worker. */ + bool m_started; /*< Whether the thread has been started or not. */ + bool m_should_shutdown; /*< Whether shutdown should be performed. */ + bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ } MXS_WORKER; enum mxs_worker_msg_id @@ -91,9 +91,9 @@ int mxs_worker_get_current_id(); * * @return The id of the worker. */ -static inline int mxs_worker_id(MXS_WORKER* worker) +static inline int mxs_worker_id(MXS_WORKER* pWorker) { - return worker->id; + return pWorker->m_id; } /** diff --git a/server/core/maxscale/worker.h b/server/core/maxscale/worker.h index 244576d85..ed6e8fd59 100644 --- a/server/core/maxscale/worker.h +++ b/server/core/maxscale/worker.h @@ -90,7 +90,7 @@ void mxs_worker_shutdown_workers(); */ static inline bool mxs_worker_should_shutdown(MXS_WORKER* worker) { - return worker->should_shutdown; + return worker->m_should_shutdown; } MXS_END_DECLS diff --git a/server/core/worker.cc b/server/core/worker.cc index f85753328..d1a070674 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -11,7 +11,7 @@ * Public License. */ -#include "maxscale/worker.h" +#include "maxscale/worker.hh" #include #include #include @@ -25,13 +25,15 @@ #define WORKER_ABSENT_ID -1 +using maxscale::Worker; + /** * Unit variables. */ static struct this_unit { - int n_workers; // How many workers there are. - MXS_WORKER** workers; // Array of worker instances. + int n_workers; // How many workers there are. + Worker** ppWorkers; // Array of worker instances. } this_unit = { 0, @@ -59,34 +61,46 @@ typedef struct worker_message static MXS_WORKER* worker_create(int worker_id); static void worker_free(MXS_WORKER* worker); -static void worker_message_handler(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2); static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int worker_id, uint32_t events); -static void worker_thread_main(void* arg); static bool modules_thread_init(); static void modules_thread_finish(); +Worker::Worker(int id, int read_fd, int write_fd) +{ + m_poll.handler = &Worker::poll_handler; + m_id = id; + m_read_fd = read_fd; + m_write_fd = write_fd; + m_thread = 0; + m_started = false; + m_should_shutdown = false; + m_shutdown_initiated = false; +} -void mxs_worker_init() +// static +void Worker::init() { this_unit.n_workers = config_threadcount(); - this_unit.workers = (MXS_WORKER**) MXS_CALLOC(this_unit.n_workers, sizeof(MXS_WORKER*)); + this_unit.ppWorkers = new (std::nothrow) Worker* (); // Zero initialized array - if (!this_unit.workers) + if (!this_unit.ppWorkers) { + // If we cannot allocate the array, we just exit. exit(-1); } for (int i = 0; i < this_unit.n_workers; ++i) { - MXS_WORKER* worker = worker_create(i); + Worker* pWorker = Worker::create(i); - if (worker) + if (pWorker) { - this_unit.workers[i] = worker; + this_unit.ppWorkers[i] = pWorker; } else { + // If a worker cannot be created, we just exit. No way we can continue. exit(-1); } } @@ -94,22 +108,37 @@ void mxs_worker_init() MXS_NOTICE("Workers created!"); } -void mxs_worker_finish() +void mxs_worker_init() +{ + Worker::init(); +} + +void Worker::finish() { for (int i = 0; i < this_unit.n_workers; ++i) { - MXS_WORKER* worker = this_unit.workers[i]; + Worker* pWorker = this_unit.ppWorkers[i]; - worker_free(worker); - this_unit.workers[i] = NULL; + delete pWorker; + this_unit.ppWorkers[i] = NULL; } } +void mxs_worker_finish() +{ + Worker::finish(); +} + +Worker* Worker::get(int worker_id) +{ + ss_dassert(worker_id < this_unit.n_workers); + + return this_unit.ppWorkers[worker_id]; +} + MXS_WORKER* mxs_worker_get(int worker_id) { - ss_dassert(worker_id < this_unit.n_workers); - - return this_unit.workers[worker_id]; + return Worker::get(worker_id); } MXS_WORKER* mxs_worker_get_current() @@ -131,18 +160,22 @@ int mxs_worker_get_current_id() return this_thread.current_worker_id; } -bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) +bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. - WORKER_MESSAGE message = { .id = msg_id, .arg1 = arg1, .arg2 = arg2 }; - ssize_t n = write(worker->write_fd, &message, sizeof(message)); + ssize_t n = write(m_write_fd, &message, sizeof(message)); return n == sizeof(message) ? true : false; } -size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + return static_cast(pWorker)->post_message(msg_id, arg1, arg2); +} + +size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. @@ -150,9 +183,9 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg for (int i = 0; i < this_unit.n_workers; ++i) { - MXS_WORKER* worker = this_unit.workers[i]; + Worker* pWorker = this_unit.ppWorkers[i]; - if (mxs_worker_post_message(worker, msg_id, arg1, arg2)) + if (pWorker->post_message(msg_id, arg1, arg2)) { ++n; } @@ -161,61 +194,93 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg return n; } -void mxs_worker_main(MXS_WORKER* worker) +size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { - this_thread.current_worker_id = worker->id; - poll_waitevents(worker); + return Worker::broadcast_message(msg_id, arg1, arg2); +} + +void Worker::run() +{ + this_thread.current_worker_id = m_id; + poll_waitevents(this); this_thread.current_worker_id = WORKER_ABSENT_ID; - MXS_NOTICE("Worker %d has shut down.", worker->id); + MXS_NOTICE("Worker %d has shut down.", m_id); } -bool mxs_worker_start(MXS_WORKER* worker) +void mxs_worker_main(MXS_WORKER* pWorker) { - if (thread_start(&worker->thread, worker_thread_main, worker)) + return static_cast(pWorker)->run(); +} + +bool Worker::start() +{ + m_started = true; + + if (!thread_start(&m_thread, &Worker::thread_main, this)) { - worker->started = true; + m_started = false; } - return worker->started; + return m_started; } -void mxs_worker_join(MXS_WORKER* worker) +bool mxs_worker_start(MXS_WORKER* pWorker) { - if (worker->started) + return static_cast(pWorker)->start(); +} + +void Worker::join() +{ + if (m_started) { - MXS_NOTICE("Waiting for worker %d.", worker->id); - thread_wait(worker->thread); - MXS_NOTICE("Waited for worker %d.", worker->id); - worker->started = false; + MXS_NOTICE("Waiting for worker %d.", m_id); + thread_wait(m_thread); + MXS_NOTICE("Waited for worker %d.", m_id); + m_started = false; } } -void mxs_worker_shutdown(MXS_WORKER* worker) +void mxs_worker_join(MXS_WORKER* pWorker) +{ + static_cast(pWorker)->join(); +} + +void Worker::shutdown() { // NOTE: No logging here, this function must be signal safe. - if (!worker->shutdown_initiated) + if (!m_shutdown_initiated) { - if (mxs_worker_post_message(worker, MXS_WORKER_MSG_SHUTDOWN, 0, 0)) + if (mxs_worker_post_message(this, MXS_WORKER_MSG_SHUTDOWN, 0, 0)) { - worker->shutdown_initiated = true; + m_shutdown_initiated = true; } } } -void mxs_worker_shutdown_workers() +void mxs_worker_shutdown(MXS_WORKER* pWorker) +{ + static_cast(pWorker)->shutdown(); +} + +void Worker::shutdown_all() { // NOTE: No logging here, this function must be signal safe. for (int i = 0; i < this_unit.n_workers; ++i) { - MXS_WORKER* worker = this_unit.workers[i]; + Worker* pWorker = this_unit.ppWorkers[i]; - mxs_worker_shutdown(worker); + pWorker->shutdown(); } } +void mxs_worker_shutdown_workers() +{ + return Worker::shutdown_all(); +} + /** * Creates a worker instance. * - Allocates the structure. @@ -226,42 +291,38 @@ void mxs_worker_shutdown_workers() * * @return A worker instance if successful, otherwise NULL. */ -static MXS_WORKER* worker_create(int worker_id) +//static +Worker* Worker::create(int worker_id) { - MXS_WORKER* worker = (MXS_WORKER*)MXS_CALLOC(1, sizeof(MXS_WORKER)); + Worker* pWorker = NULL; - if (worker) + int fds[2]; + + // We create the pipe in message mode (O_DIRECT), so that we do + // not need to deal with partial messages. + if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0) { - int fds[2]; + int read_fd = fds[0]; + int write_fd = fds[1]; - // We create the pipe in message mode (O_DIRECT), so that we do - // not need to deal with partial messages. - if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0) + pWorker = new (std::nothrow) Worker(worker_id, read_fd, write_fd); + + if (pWorker) { - int read_fd = fds[0]; - int write_fd = fds[1]; - - worker->poll.handler = worker_poll_handler; - worker->id = worker_id; - worker->read_fd = read_fd; - worker->write_fd = write_fd; - - if (poll_add_fd_to_worker(worker->id, worker->read_fd, EPOLLIN, &worker->poll) != 0) + if (poll_add_fd_to_worker(worker_id, read_fd, EPOLLIN, &pWorker->m_poll) != 0) { MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno)); - worker_free(worker); - worker = NULL; + delete pWorker; + pWorker = NULL; } } - else - { - MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno)); - MXS_FREE(worker); - worker = NULL; - } + } + else + { + MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno)); } - return worker; + return pWorker; } /** @@ -269,18 +330,13 @@ static MXS_WORKER* worker_create(int worker_id) * * @param worker The worker instance to be freed. */ -static void worker_free(MXS_WORKER* worker) +Worker::~Worker() { - if (worker) - { - ss_dassert(!worker->started); + ss_dassert(!m_started); - poll_remove_fd_from_worker(worker->id, worker->read_fd); - close(worker->read_fd); - close(worker->write_fd); - - MXS_FREE(worker); - } + poll_remove_fd_from_worker(m_id, m_read_fd); + close(m_read_fd); + close(m_write_fd); } /** @@ -291,23 +347,24 @@ static void worker_free(MXS_WORKER* worker) * @param arg1 Message specific first argument. * @param arg2 Message specific second argument. */ -static void worker_message_handler(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) +void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { switch (msg_id) { case MXS_WORKER_MSG_PING: { ss_dassert(arg1 == 0); - const char* message = arg2 ? (const char*)arg2 : "Alive and kicking"; - MXS_NOTICE("Worker[%d]: %s.", worker->id, message); - MXS_FREE((void*)arg2); + char* zArg2 = reinterpret_cast(arg2); + const char* zMessage = zArg2 ? zArg2 : "Alive and kicking"; + MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage); + MXS_FREE(zArg2); } break; case MXS_WORKER_MSG_SHUTDOWN: { - MXS_NOTICE("Worker %d received shutdown message.", worker->id); - worker->should_shutdown = true; + MXS_NOTICE("Worker %d received shutdown message.", m_id); + m_should_shutdown = true; } break; @@ -315,7 +372,7 @@ static void worker_message_handler(MXS_WORKER *worker, uint32_t msg_id, intptr_t { void (*f)(int, void*) = (void (*)(int,void*))arg1; - f(worker->id, (void*)arg2); + f(m_id, (void*)arg2); } break; @@ -333,11 +390,8 @@ static void worker_message_handler(MXS_WORKER *worker, uint32_t msg_id, intptr_t * * @return What events the handler handled. */ -static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events) +uint32_t Worker::poll(uint32_t events) { - MXS_WORKER *worker = (MXS_WORKER*)data; - ss_dassert(worker->id == thread_id); - int rc = MXS_POLL_NOP; // We only expect EPOLLIN events. @@ -351,11 +405,11 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t do { - n = read(worker->read_fd, &message, sizeof(message)); + n = read(m_read_fd, &message, sizeof(message)); if (n == sizeof(message)) { - worker_message_handler(worker, message.id, message.arg1, message.arg2); + handle_message(message.id, message.arg1, message.arg2); } else if (n == -1) { @@ -383,18 +437,28 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t return rc; } +//static +uint32_t Worker::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events) +{ + Worker* pWorker = reinterpret_cast(pData); + ss_dassert(pWorker->m_id == thread_id); + + return pWorker->poll(events); +} + /** * The entry point of each worker thread. * * @param arg A worker. */ -static void worker_thread_main(void* arg) +//static +void Worker::thread_main(void* pArg) { if (modules_thread_init()) { - MXS_WORKER *worker = (MXS_WORKER*)arg; + Worker* pWorker = static_cast(pArg); - mxs_worker_main(worker); + pWorker->run(); modules_thread_finish(); }