Turn worker into a C++ class

This is the first step in turning the worker mechanism and everything
around it into a set of C++ classes. In this change, the original C
API is still present, but in subsequent changes that will be removed.
This commit is contained in:
Johan Wikman
2017-04-05 13:01:36 +03:00
parent ece77c4478
commit 0e4e889c15
3 changed files with 171 additions and 107 deletions

View File

@ -20,14 +20,14 @@ MXS_BEGIN_DECLS
typedef struct mxs_worker typedef struct mxs_worker
{ {
MXS_POLL_DATA poll; /*< The poll data used by the polling mechanism. */ MXS_POLL_DATA m_poll; /*< The poll data used by the polling mechanism. */
int id; /*< The id of the worker. */ int m_id; /*< The id of the worker. */
int read_fd; /*< The file descriptor the worked reads from. */ int m_read_fd; /*< The file descriptor the worked reads from. */
int write_fd; /*< The file descriptor used for sending data to the worker. */ int m_write_fd; /*< The file descriptor used for sending data to the worker. */
THREAD thread; /*< The thread handle of the worker. */ THREAD m_thread; /*< The thread handle of the worker. */
bool started; /*< Whether the thread has been started or not. */ bool m_started; /*< Whether the thread has been started or not. */
bool should_shutdown; /*< Whether shutdown should be performed. */ bool m_should_shutdown; /*< Whether shutdown should be performed. */
bool shutdown_initiated; /*< Whether shutdown has been initated. */ bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
} MXS_WORKER; } MXS_WORKER;
enum mxs_worker_msg_id enum mxs_worker_msg_id
@ -91,9 +91,9 @@ int mxs_worker_get_current_id();
* *
* @return The id of the worker. * @return The id of the worker.
*/ */
static inline int mxs_worker_id(MXS_WORKER* worker) static inline int mxs_worker_id(MXS_WORKER* pWorker)
{ {
return worker->id; return pWorker->m_id;
} }
/** /**

View File

@ -90,7 +90,7 @@ void mxs_worker_shutdown_workers();
*/ */
static inline bool mxs_worker_should_shutdown(MXS_WORKER* worker) static inline bool mxs_worker_should_shutdown(MXS_WORKER* worker)
{ {
return worker->should_shutdown; return worker->m_should_shutdown;
} }
MXS_END_DECLS MXS_END_DECLS

View File

@ -11,7 +11,7 @@
* Public License. * Public License.
*/ */
#include "maxscale/worker.h" #include "maxscale/worker.hh"
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <stdlib.h> #include <stdlib.h>
@ -25,13 +25,15 @@
#define WORKER_ABSENT_ID -1 #define WORKER_ABSENT_ID -1
using maxscale::Worker;
/** /**
* Unit variables. * Unit variables.
*/ */
static struct this_unit static struct this_unit
{ {
int n_workers; // How many workers there are. int n_workers; // How many workers there are.
MXS_WORKER** workers; // Array of worker instances. Worker** ppWorkers; // Array of worker instances.
} this_unit = } this_unit =
{ {
0, 0,
@ -59,34 +61,46 @@ typedef struct worker_message
static MXS_WORKER* worker_create(int worker_id); static MXS_WORKER* worker_create(int worker_id);
static void worker_free(MXS_WORKER* worker); static void worker_free(MXS_WORKER* worker);
static void worker_message_handler(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2);
static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int worker_id, uint32_t events); static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int worker_id, uint32_t events);
static void worker_thread_main(void* arg);
static bool modules_thread_init(); static bool modules_thread_init();
static void modules_thread_finish(); static void modules_thread_finish();
Worker::Worker(int id, int read_fd, int write_fd)
{
m_poll.handler = &Worker::poll_handler;
m_id = id;
m_read_fd = read_fd;
m_write_fd = write_fd;
m_thread = 0;
m_started = false;
m_should_shutdown = false;
m_shutdown_initiated = false;
}
void mxs_worker_init() // static
void Worker::init()
{ {
this_unit.n_workers = config_threadcount(); this_unit.n_workers = config_threadcount();
this_unit.workers = (MXS_WORKER**) MXS_CALLOC(this_unit.n_workers, sizeof(MXS_WORKER*)); this_unit.ppWorkers = new (std::nothrow) Worker* (); // Zero initialized array
if (!this_unit.workers) if (!this_unit.ppWorkers)
{ {
// If we cannot allocate the array, we just exit.
exit(-1); exit(-1);
} }
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
MXS_WORKER* worker = worker_create(i); Worker* pWorker = Worker::create(i);
if (worker) if (pWorker)
{ {
this_unit.workers[i] = worker; this_unit.ppWorkers[i] = pWorker;
} }
else else
{ {
// If a worker cannot be created, we just exit. No way we can continue.
exit(-1); exit(-1);
} }
} }
@ -94,22 +108,37 @@ void mxs_worker_init()
MXS_NOTICE("Workers created!"); MXS_NOTICE("Workers created!");
} }
void mxs_worker_finish() void mxs_worker_init()
{
Worker::init();
}
void Worker::finish()
{ {
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
MXS_WORKER* worker = this_unit.workers[i]; Worker* pWorker = this_unit.ppWorkers[i];
worker_free(worker); delete pWorker;
this_unit.workers[i] = NULL; this_unit.ppWorkers[i] = NULL;
} }
} }
void mxs_worker_finish()
{
Worker::finish();
}
Worker* Worker::get(int worker_id)
{
ss_dassert(worker_id < this_unit.n_workers);
return this_unit.ppWorkers[worker_id];
}
MXS_WORKER* mxs_worker_get(int worker_id) MXS_WORKER* mxs_worker_get(int worker_id)
{ {
ss_dassert(worker_id < this_unit.n_workers); return Worker::get(worker_id);
return this_unit.workers[worker_id];
} }
MXS_WORKER* mxs_worker_get_current() MXS_WORKER* mxs_worker_get_current()
@ -131,18 +160,22 @@ int mxs_worker_get_current_id()
return this_thread.current_worker_id; return this_thread.current_worker_id;
} }
bool mxs_worker_post_message(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
WORKER_MESSAGE message = { .id = msg_id, .arg1 = arg1, .arg2 = arg2 }; WORKER_MESSAGE message = { .id = msg_id, .arg1 = arg1, .arg2 = arg2 };
ssize_t n = write(worker->write_fd, &message, sizeof(message)); ssize_t n = write(m_write_fd, &message, sizeof(message));
return n == sizeof(message) ? true : false; return n == sizeof(message) ? true : false;
} }
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
return static_cast<Worker*>(pWorker)->post_message(msg_id, arg1, arg2);
}
size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
@ -150,9 +183,9 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
MXS_WORKER* worker = this_unit.workers[i]; Worker* pWorker = this_unit.ppWorkers[i];
if (mxs_worker_post_message(worker, msg_id, arg1, arg2)) if (pWorker->post_message(msg_id, arg1, arg2))
{ {
++n; ++n;
} }
@ -161,61 +194,93 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
return n; return n;
} }
void mxs_worker_main(MXS_WORKER* worker) size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
this_thread.current_worker_id = worker->id; return Worker::broadcast_message(msg_id, arg1, arg2);
poll_waitevents(worker); }
void Worker::run()
{
this_thread.current_worker_id = m_id;
poll_waitevents(this);
this_thread.current_worker_id = WORKER_ABSENT_ID; this_thread.current_worker_id = WORKER_ABSENT_ID;
MXS_NOTICE("Worker %d has shut down.", worker->id); MXS_NOTICE("Worker %d has shut down.", m_id);
} }
bool mxs_worker_start(MXS_WORKER* worker) void mxs_worker_main(MXS_WORKER* pWorker)
{ {
if (thread_start(&worker->thread, worker_thread_main, worker)) return static_cast<Worker*>(pWorker)->run();
{
worker->started = true;
} }
return worker->started; bool Worker::start()
{
m_started = true;
if (!thread_start(&m_thread, &Worker::thread_main, this))
{
m_started = false;
} }
void mxs_worker_join(MXS_WORKER* worker) return m_started;
}
bool mxs_worker_start(MXS_WORKER* pWorker)
{ {
if (worker->started) return static_cast<Worker*>(pWorker)->start();
}
void Worker::join()
{ {
MXS_NOTICE("Waiting for worker %d.", worker->id); if (m_started)
thread_wait(worker->thread); {
MXS_NOTICE("Waited for worker %d.", worker->id); MXS_NOTICE("Waiting for worker %d.", m_id);
worker->started = false; thread_wait(m_thread);
MXS_NOTICE("Waited for worker %d.", m_id);
m_started = false;
} }
} }
void mxs_worker_shutdown(MXS_WORKER* worker) void mxs_worker_join(MXS_WORKER* pWorker)
{
static_cast<Worker*>(pWorker)->join();
}
void Worker::shutdown()
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
if (!worker->shutdown_initiated) if (!m_shutdown_initiated)
{ {
if (mxs_worker_post_message(worker, MXS_WORKER_MSG_SHUTDOWN, 0, 0)) if (mxs_worker_post_message(this, MXS_WORKER_MSG_SHUTDOWN, 0, 0))
{ {
worker->shutdown_initiated = true; m_shutdown_initiated = true;
} }
} }
} }
void mxs_worker_shutdown_workers() void mxs_worker_shutdown(MXS_WORKER* pWorker)
{
static_cast<Worker*>(pWorker)->shutdown();
}
void Worker::shutdown_all()
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
MXS_WORKER* worker = this_unit.workers[i]; Worker* pWorker = this_unit.ppWorkers[i];
mxs_worker_shutdown(worker); pWorker->shutdown();
} }
} }
void mxs_worker_shutdown_workers()
{
return Worker::shutdown_all();
}
/** /**
* Creates a worker instance. * Creates a worker instance.
* - Allocates the structure. * - Allocates the structure.
@ -226,12 +291,11 @@ void mxs_worker_shutdown_workers()
* *
* @return A worker instance if successful, otherwise NULL. * @return A worker instance if successful, otherwise NULL.
*/ */
static MXS_WORKER* worker_create(int worker_id) //static
Worker* Worker::create(int worker_id)
{ {
MXS_WORKER* worker = (MXS_WORKER*)MXS_CALLOC(1, sizeof(MXS_WORKER)); Worker* pWorker = NULL;
if (worker)
{
int fds[2]; int fds[2];
// We create the pipe in message mode (O_DIRECT), so that we do // We create the pipe in message mode (O_DIRECT), so that we do
@ -241,27 +305,24 @@ static MXS_WORKER* worker_create(int worker_id)
int read_fd = fds[0]; int read_fd = fds[0];
int write_fd = fds[1]; int write_fd = fds[1];
worker->poll.handler = worker_poll_handler; pWorker = new (std::nothrow) Worker(worker_id, read_fd, write_fd);
worker->id = worker_id;
worker->read_fd = read_fd;
worker->write_fd = write_fd;
if (poll_add_fd_to_worker(worker->id, worker->read_fd, EPOLLIN, &worker->poll) != 0) if (pWorker)
{
if (poll_add_fd_to_worker(worker_id, read_fd, EPOLLIN, &pWorker->m_poll) != 0)
{ {
MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno)); MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno));
worker_free(worker); delete pWorker;
worker = NULL; pWorker = NULL;
}
} }
} }
else else
{ {
MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno)); MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno));
MXS_FREE(worker);
worker = NULL;
}
} }
return worker; return pWorker;
} }
/** /**
@ -269,18 +330,13 @@ static MXS_WORKER* worker_create(int worker_id)
* *
* @param worker The worker instance to be freed. * @param worker The worker instance to be freed.
*/ */
static void worker_free(MXS_WORKER* worker) Worker::~Worker()
{ {
if (worker) ss_dassert(!m_started);
{
ss_dassert(!worker->started);
poll_remove_fd_from_worker(worker->id, worker->read_fd); poll_remove_fd_from_worker(m_id, m_read_fd);
close(worker->read_fd); close(m_read_fd);
close(worker->write_fd); close(m_write_fd);
MXS_FREE(worker);
}
} }
/** /**
@ -291,23 +347,24 @@ static void worker_free(MXS_WORKER* worker)
* @param arg1 Message specific first argument. * @param arg1 Message specific first argument.
* @param arg2 Message specific second argument. * @param arg2 Message specific second argument.
*/ */
static void worker_message_handler(MXS_WORKER *worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
switch (msg_id) switch (msg_id)
{ {
case MXS_WORKER_MSG_PING: case MXS_WORKER_MSG_PING:
{ {
ss_dassert(arg1 == 0); ss_dassert(arg1 == 0);
const char* message = arg2 ? (const char*)arg2 : "Alive and kicking"; char* zArg2 = reinterpret_cast<char*>(arg2);
MXS_NOTICE("Worker[%d]: %s.", worker->id, message); const char* zMessage = zArg2 ? zArg2 : "Alive and kicking";
MXS_FREE((void*)arg2); MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage);
MXS_FREE(zArg2);
} }
break; break;
case MXS_WORKER_MSG_SHUTDOWN: case MXS_WORKER_MSG_SHUTDOWN:
{ {
MXS_NOTICE("Worker %d received shutdown message.", worker->id); MXS_NOTICE("Worker %d received shutdown message.", m_id);
worker->should_shutdown = true; m_should_shutdown = true;
} }
break; break;
@ -315,7 +372,7 @@ static void worker_message_handler(MXS_WORKER *worker, uint32_t msg_id, intptr_t
{ {
void (*f)(int, void*) = (void (*)(int,void*))arg1; void (*f)(int, void*) = (void (*)(int,void*))arg1;
f(worker->id, (void*)arg2); f(m_id, (void*)arg2);
} }
break; break;
@ -333,11 +390,8 @@ static void worker_message_handler(MXS_WORKER *worker, uint32_t msg_id, intptr_t
* *
* @return What events the handler handled. * @return What events the handler handled.
*/ */
static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events) uint32_t Worker::poll(uint32_t events)
{ {
MXS_WORKER *worker = (MXS_WORKER*)data;
ss_dassert(worker->id == thread_id);
int rc = MXS_POLL_NOP; int rc = MXS_POLL_NOP;
// We only expect EPOLLIN events. // We only expect EPOLLIN events.
@ -351,11 +405,11 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t
do do
{ {
n = read(worker->read_fd, &message, sizeof(message)); n = read(m_read_fd, &message, sizeof(message));
if (n == sizeof(message)) if (n == sizeof(message))
{ {
worker_message_handler(worker, message.id, message.arg1, message.arg2); handle_message(message.id, message.arg1, message.arg2);
} }
else if (n == -1) else if (n == -1)
{ {
@ -383,18 +437,28 @@ static uint32_t worker_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t
return rc; return rc;
} }
//static
uint32_t Worker::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events)
{
Worker* pWorker = reinterpret_cast<Worker*>(pData);
ss_dassert(pWorker->m_id == thread_id);
return pWorker->poll(events);
}
/** /**
* The entry point of each worker thread. * The entry point of each worker thread.
* *
* @param arg A worker. * @param arg A worker.
*/ */
static void worker_thread_main(void* arg) //static
void Worker::thread_main(void* pArg)
{ {
if (modules_thread_init()) if (modules_thread_init())
{ {
MXS_WORKER *worker = (MXS_WORKER*)arg; Worker* pWorker = static_cast<Worker*>(pArg);
mxs_worker_main(worker); pWorker->run();
modules_thread_finish(); modules_thread_finish();
} }