Files
MaxScale/server/core/worker.cc
Johan Wikman d20c89be37 Use epoll instance of Worker
Now the epoll instance of the Worker is used when polling. The
work is still done in poll.cc and the worker provides the descriptor
and thread id.

The comments of poll_waitevents() have been removed; they were not
accurate anymore so better to let the code speak for itself.
2017-04-20 13:51:16 +03:00

641 lines
14 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "maxscale/worker.hh"
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <maxscale/alloc.h>
#include <maxscale/config.h>
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include "maxscale/modules.h"
#include "maxscale/poll.h"
#define WORKER_ABSENT_ID -1
using maxscale::Worker;
namespace
{
/**
* Unit variables.
*/
static struct this_unit
{
int n_workers; // How many workers there are.
Worker** ppWorkers; // Array of worker instances.
} this_unit =
{
0,
NULL
};
static thread_local struct this_thread
{
int current_worker_id; // The worker id of the current thread
} this_thread =
{
WORKER_ABSENT_ID
};
/**
* Structure used for sending cross-thread messages.
*/
typedef struct worker_message
{
uint32_t id; /*< Message id. */
intptr_t arg1; /*< Message specific first argument. */
intptr_t arg2; /*< Message specific second argument. */
} WORKER_MESSAGE;
/**
* Check error returns from epoll_ctl; impossible ones lead to crash.
*
* @param wid Worker id.
* @param errornum The errno set by epoll_ctl
* @param op Either EPOLL_CTL_ADD or EPOLL_CTL_DEL.
*/
void poll_resolve_error(int wid, int fd, int errornum, int op)
{
if (op == EPOLL_CTL_ADD)
{
if (EEXIST == errornum)
{
MXS_ERROR("File descriptor %d already added to epoll instance of worker %d.", fd, wid);
return;
}
if (ENOSPC == errornum)
{
MXS_ERROR("The limit imposed by /proc/sys/fs/epoll/max_user_watches was "
"reached when trying to add file descriptor %d to epoll instance "
"of worker %d.", fd, wid);
return;
}
}
else
{
ss_dassert(op == EPOLL_CTL_DEL);
/* Must be removing */
if (ENOENT == errornum)
{
MXS_ERROR("File descriptor %d was not found in epoll instance of worker %d.", fd, wid);
return;
}
}
/* Common checks for add or remove - crash MaxScale */
if (EBADF == errornum)
{
raise(SIGABRT);
}
if (EINVAL == errornum)
{
raise(SIGABRT);
}
if (ENOMEM == errornum)
{
raise(SIGABRT);
}
if (EPERM == errornum)
{
raise(SIGABRT);
}
/* Undocumented error number */
raise(SIGABRT);
}
}
static bool modules_thread_init();
static void modules_thread_finish();
Worker::Worker(int id, int epoll_fd, int read_fd, int write_fd)
: m_id(id)
, m_epoll_fd(epoll_fd)
, m_read_fd(read_fd)
, m_write_fd(write_fd)
{
m_poll.handler = &Worker::poll_handler;
m_thread = 0;
m_started = false;
m_should_shutdown = false;
m_shutdown_initiated = false;
}
Worker::~Worker()
{
ss_dassert(!m_started);
poll_remove_fd_from_worker(m_id, m_read_fd);
close(m_read_fd);
close(m_write_fd);
close(m_epoll_fd);
}
// static
void Worker::init()
{
this_unit.n_workers = config_threadcount();
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
if (!this_unit.ppWorkers)
{
// If we cannot allocate the array, we just exit.
exit(-1);
}
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = Worker::create(i);
if (pWorker)
{
this_unit.ppWorkers[i] = pWorker;
}
else
{
// If a worker cannot be created, we just exit. No way we can continue.
exit(-1);
}
}
MXS_NOTICE("Workers created!");
}
void Worker::finish()
{
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
delete pWorker;
this_unit.ppWorkers[i] = NULL;
}
}
bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{
bool rv = true;
events |= EPOLLET;
struct epoll_event ev;
ev.events = events;
ev.data.ptr = pData;
pData->thread.id = m_id;
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
{
poll_resolve_error(m_id, fd, errno, EPOLL_CTL_ADD);
rv = false;
}
return rv;
}
bool Worker::remove_fd(int fd)
{
bool rv = true;
struct epoll_event ev = {};
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, &ev) != 0)
{
poll_resolve_error(m_id, fd, errno, EPOLL_CTL_DEL);
rv = false;
}
return rv;
}
int mxs_worker_id(MXS_WORKER* pWorker)
{
return static_cast<Worker*>(pWorker)->id();
}
bool mxs_worker_should_shutdown(MXS_WORKER* pWorker)
{
return static_cast<Worker*>(pWorker)->should_shutdown();
}
Worker* Worker::get(int worker_id)
{
ss_dassert(worker_id < this_unit.n_workers);
return this_unit.ppWorkers[worker_id];
}
MXS_WORKER* mxs_worker_get(int worker_id)
{
return Worker::get(worker_id);
}
Worker* Worker::get_current()
{
Worker* pWorker = NULL;
int worker_id = get_current_id();
if (worker_id != WORKER_ABSENT_ID)
{
pWorker = Worker::get(worker_id);
}
return pWorker;
}
int Worker::get_current_id()
{
return this_thread.current_worker_id;
}
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
// NOTE: No logging here, this function must be signal safe.
WORKER_MESSAGE message = {};
message.id = msg_id;
message.arg1 = arg1;
message.arg2 = arg2;
ssize_t n = write(m_write_fd, &message, sizeof(message));
return n == sizeof(message) ? true : false;
}
bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
return static_cast<Worker*>(pWorker)->post_message(msg_id, arg1, arg2);
}
size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
// NOTE: No logging here, this function must be signal safe.
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->post_message(msg_id, arg1, arg2))
{
++n;
}
}
return n;
}
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
return Worker::broadcast_message(msg_id, arg1, arg2);
}
namespace
{
bool should_shutdown(void* pData)
{
return static_cast<Worker*>(pData)->should_shutdown();
}
}
void Worker::run()
{
this_thread.current_worker_id = m_id;
poll_waitevents(m_epoll_fd, m_id, ::should_shutdown, this);
this_thread.current_worker_id = WORKER_ABSENT_ID;
MXS_NOTICE("Worker %d has shut down.", m_id);
}
bool Worker::start()
{
m_started = true;
if (!thread_start(&m_thread, &Worker::thread_main, this))
{
m_started = false;
}
return m_started;
}
void Worker::join()
{
if (m_started)
{
MXS_NOTICE("Waiting for worker %d.", m_id);
thread_wait(m_thread);
MXS_NOTICE("Waited for worker %d.", m_id);
m_started = false;
}
}
void Worker::shutdown()
{
// NOTE: No logging here, this function must be signal safe.
if (!m_shutdown_initiated)
{
if (post_message(MXS_WORKER_MSG_SHUTDOWN, 0, 0))
{
m_shutdown_initiated = true;
}
}
}
void Worker::shutdown_all()
{
// NOTE: No logging here, this function must be signal safe.
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
pWorker->shutdown();
}
}
/**
* Creates a worker instance.
* - Allocates the structure.
* - Creates a pipe.
* - Adds the read descriptor to the polling mechanism.
*
* @param worker_id The id of the worker.
*
* @return A worker instance if successful, otherwise NULL.
*/
//static
Worker* Worker::create(int worker_id)
{
Worker* pWorker = NULL;
int epoll_fd = epoll_create(MAX_EVENTS);
if (epoll_fd != -1)
{
int fds[2];
// We create the pipe in message mode (O_DIRECT), so that we do
// not need to deal with partial messages.
if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0)
{
int read_fd = fds[0];
int write_fd = fds[1];
pWorker = new (std::nothrow) Worker(worker_id, epoll_fd, read_fd, write_fd);
if (pWorker)
{
if (!pWorker->add_fd(read_fd, EPOLLIN, &pWorker->m_poll))
{
MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno));
delete pWorker;
pWorker = NULL;
}
}
else
{
close(read_fd);
close(write_fd);
close(epoll_fd);
}
}
else
{
MXS_ERROR("Could not create pipe for worker: %s", mxs_strerror(errno));
}
}
else
{
MXS_ERROR("Could not create epoll-instance for worker: %s", mxs_strerror(errno));
}
return pWorker;
}
/**
* The worker message handler.
*
* @param msg_id The message id.
* @param arg1 Message specific first argument.
* @param arg2 Message specific second argument.
*/
void Worker::handle_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
switch (msg_id)
{
case MXS_WORKER_MSG_PING:
{
ss_dassert(arg1 == 0);
char* zArg2 = reinterpret_cast<char*>(arg2);
const char* zMessage = zArg2 ? zArg2 : "Alive and kicking";
MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage);
MXS_FREE(zArg2);
}
break;
case MXS_WORKER_MSG_SHUTDOWN:
{
MXS_NOTICE("Worker %d received shutdown message.", m_id);
m_should_shutdown = true;
}
break;
case MXS_WORKER_MSG_CALL:
{
void (*f)(int, void*) = (void (*)(int,void*))arg1;
f(m_id, (void*)arg2);
}
break;
default:
MXS_ERROR("Worker received unknown message %d.", msg_id);
}
}
/**
* Worker poll handler.
*
* @param thread_id Id of the thread; same as id of the relevant worker.
* @param events Epoll events.
*
* @return What events the handler handled.
*/
uint32_t Worker::poll(uint32_t events)
{
int rc = MXS_POLL_NOP;
// We only expect EPOLLIN events.
ss_dassert(((events & EPOLLIN) != 0) && ((events & ~EPOLLIN) == 0));
if (events & EPOLLIN)
{
WORKER_MESSAGE message;
ssize_t n;
do
{
n = read(m_read_fd, &message, sizeof(message));
if (n == sizeof(message))
{
handle_message(message.id, message.arg1, message.arg2);
}
else if (n == -1)
{
if (errno != EWOULDBLOCK)
{
MXS_ERROR("Worker could not read from pipe: %s", mxs_strerror(errno));
}
}
else if (n != 0)
{
// This really should not happen as the pipe is in message mode. We
// should either get a message, nothing at all or an error. In non-debug
// mode we continue reading in order to empty the pipe as otherwise the
// thread may hang.
MXS_ERROR("Worker could only read %ld bytes from pipe, although expected %lu bytes.",
n, sizeof(message));
ss_dassert(!true);
}
}
while ((n != 0) && (n != -1));
rc = MXS_POLL_READ;
}
return rc;
}
/**
* Handler for poll events related to the read descriptor of the worker.
*
* @param pData The MXS_POLL_DATA of the worker in question.
* @param thread_id Id of the thread; same as id of the relevant worker.
* @param events Epoll events.
*
* @return What events the handler handled.
*/
//static
uint32_t Worker::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events)
{
Worker* pWorker = reinterpret_cast<Worker*>(pData);
ss_dassert(pWorker->m_id == thread_id);
return pWorker->poll(events);
}
/**
* The entry point of each worker thread.
*
* @param arg A worker.
*/
//static
void Worker::thread_main(void* pArg)
{
if (modules_thread_init())
{
Worker* pWorker = static_cast<Worker*>(pArg);
pWorker->run();
modules_thread_finish();
}
else
{
MXS_ERROR("Could not perform thread initialization for all modules. Thread exits.");
}
}
/**
* Calls thread_init on all loaded modules.
*
* @return True, if all modules were successfully initialized.
*/
static bool modules_thread_init()
{
bool initialized = false;
MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
MXS_MODULE* module = NULL;
while ((module = mxs_module_iterator_get_next(&i)) != NULL)
{
if (module->thread_init)
{
int rc = (module->thread_init)();
if (rc != 0)
{
break;
}
}
}
if (module)
{
// If module is non-NULL it means that the initialization failed for
// that module. We now need to call finish on all modules that were
// successfully initialized.
MXS_MODULE* failed_module = module;
i = mxs_module_iterator_get(NULL);
while ((module = mxs_module_iterator_get_next(&i)) != failed_module)
{
if (module->thread_finish)
{
(module->thread_finish)();
}
}
}
else
{
initialized = true;
}
return initialized;
}
/**
* Calls thread_finish on all loaded modules.
*/
static void modules_thread_finish()
{
MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
MXS_MODULE* module = NULL;
while ((module = mxs_module_iterator_get_next(&i)) != NULL)
{
if (module->thread_finish)
{
(module->thread_finish)();
}
}
}