Add shared epoll instance to workers

All workers share an epoll instance that is registered, level-triggered, with
the epoll instance of each worker. It is intended to be used together with
listening sockets.

When a listening socket is added to the shared epoll instance, EPOLLIN becomes
active on the shared instance whenever a connection is pending on any
listening socket that has been added to it.

When that occurs, the epoll_wait() calls of all workers return. When a worker
subsequently calls epoll_wait() on the shared epoll instance, that call
returns an event, provided no other worker has already called accept() on the
listening socket.

As each worker extracts just one event at a time and calls accept() just once
before calling epoll_wait() again, client connections are distributed evenly
across all workers, provided the load on the workers is roughly the same. If
it is not, a worker with less load will pick up more connections, which evens
out the load.
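
The mechanism can be condensed into a minimal standalone sketch (illustrative
names and a simplified worker loop, not MaxScale code; error handling is
mostly omitted):

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// One epoll instance shared by all workers; it holds only listening
// sockets and is deliberately level-triggered, so EPOLLIN stays set
// for as long as any listener has a pending connection.
static int shared_epoll_fd = epoll_create(1);

// Register a listening socket with the shared instance.
void add_listener(int listen_fd)
{
    struct epoll_event ev = {};
    ev.events = EPOLLIN;              // no EPOLLET: level-triggered on purpose
    ev.data.fd = listen_fd;
    epoll_ctl(shared_epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
}

// Create a worker's own epoll instance with the shared instance inside it.
// An epoll descriptor can itself be polled; it reads as ready whenever its
// ready list is non-empty.
int make_worker_epoll()
{
    int worker_fd = epoll_create(1);
    struct epoll_event ev = {};
    ev.events = EPOLLIN;              // level-triggered in the worker's set too
    ev.data.fd = shared_epoll_fd;
    epoll_ctl(worker_fd, EPOLL_CTL_ADD, shared_epoll_fd, &ev);
    return worker_fd;
}

// Simplified worker loop; this sketch assumes the worker's epoll set
// contains only the shared instance.
void worker_loop(int worker_fd)
{
    struct epoll_event ev;
    while (epoll_wait(worker_fd, &ev, 1, -1) > 0)
    {
        // Extract exactly ONE event from the shared instance, without
        // blocking; another worker may already have consumed it.
        struct epoll_event lev;
        if (epoll_wait(shared_epoll_fd, &lev, 1, 0) == 1)
        {
            int client_fd = accept(lev.data.fd, NULL, NULL);
            if (client_fd != -1)
            {
                // ... hand client_fd to this worker, then loop back
                // to epoll_wait() on the worker's own instance.
            }
        }
    }
}

Because the shared instance is level-triggered in every worker's set, all
workers wake up, but each drains at most one event and performs at most one
accept() before sleeping again; that is what spreads the connections.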
Author: Johan Wikman
Date:   2017-04-21 21:04:11 +03:00
Parent: a27bec5e88
Commit: 56b411aea9

2 changed files with 209 additions and 37 deletions

@@ -54,6 +54,7 @@ struct WORKER_STATISTICS
class Worker : public MXS_WORKER
, private MessageQueue::Handler
, private MXS_POLL_DATA
{
Worker(const Worker&);
Worker& operator = (const Worker&);
@@ -163,14 +164,42 @@ public:
bool add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData);
/**
* Remove a file descriptor from a poll set.
* Add a file descriptor to the epoll instance shared between all workers.
* Events occurring on the provided file descriptor will be handled by all
* workers. This is primarily intended for listening sockets where the
* only event is EPOLLIN, signaling that accept() can be used on the listening
* socket for creating a connected socket to a client.
*
* @param fd The file descriptor to be removed.
* @param fd The file descriptor to be added.
* @param events Mask of epoll event types.
* @param pData The poll data associated with the descriptor:
*
* data->handler : Handler that knows how to deal with events
* for this particular type of 'struct mxs_poll_data'.
* data->thread.id: 0
*
* @return True, if the descriptor could be added, false otherwise.
*/
static bool add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData);
/**
* Remove a file descriptor from the worker's epoll instance.
*
* @param fd The file descriptor to be removed.
*
* @return True on success, false on failure.
*/
bool remove_fd(int fd);
/**
* Remove a file descriptor from the epoll instance shared between all workers.
*
* @param fd The file descriptor to be removed.
*
* @return True on success, false on failure.
*/
static bool remove_shared_fd(int fd);
/**
* Main function of worker.
*
@@ -306,7 +335,7 @@ private:
int epoll_fd);
virtual ~Worker();
static Worker* create(int id);
static Worker* create(int id, int epoll_listener_fd);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
@@ -314,6 +343,9 @@ private:
void poll_waitevents();
static uint32_t epoll_instance_handler(struct mxs_poll_data* data, int wid, uint32_t events);
uint32_t handle_epoll_events(uint32_t events);
private:
int m_id; /*< The id of the worker. */
state_t m_state; /*< The state of the worker */

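For orientation, a hedged sketch of how a listener might use the entry points
declared above (the handler and the start/stop helpers are hypothetical, not
part of this commit):

// Hypothetical handler, invoked on whichever worker extracted the event.
uint32_t listener_handler(struct mxs_poll_data* pData, int wid, uint32_t events)
{
    // accept() one client connection and attach it to worker `wid` ...
    return MXS_POLL_NOP; // real code would return the actions it performed
}

static MXS_POLL_DATA listener_data;

bool start_listener(int listen_fd)
{
    listener_data.handler = listener_handler;
    listener_data.thread.id = 0;  // as documented for add_shared_fd()

    return Worker::add_shared_fd(listen_fd, EPOLLIN, &listener_data);
}

bool stop_listener(int listen_fd)
{
    return Worker::remove_shared_fd(listen_fd);
}
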
@@ -37,14 +37,14 @@ namespace
/**
* Unit variables.
*/
static struct this_unit
struct this_unit
{
bool initialized; // Whether the initialization has been performed.
int n_workers; // How many workers there are.
Worker** ppWorkers; // Array of worker instances.
int number_poll_spins; // Maximum non-block polls
int max_poll_sleep; // Maximum block time
int epoll_listener_fd; // Shared epoll descriptor for listening descriptors.
} this_unit =
{
false,
@@ -54,7 +54,7 @@ static struct this_unit
0
};
static thread_local struct this_thread
thread_local struct this_thread
{
int current_worker_id; // The worker id of the current thread
} this_thread =
@@ -75,25 +75,23 @@ typedef struct worker_message
/**
* Check error returns from epoll_ctl; impossible ones lead to a crash.
*
* @param wid Worker id.
* @param errornum The errno set by epoll_ctl
* @param op Either EPOLL_CTL_ADD or EPOLL_CTL_DEL.
*/
void poll_resolve_error(int wid, int fd, int errornum, int op)
void poll_resolve_error(int fd, int errornum, int op)
{
if (op == EPOLL_CTL_ADD)
{
if (EEXIST == errornum)
{
MXS_ERROR("File descriptor %d already added to epoll instance of worker %d.", fd, wid);
MXS_ERROR("File descriptor %d already present in an epoll instance.", fd);
return;
}
if (ENOSPC == errornum)
{
MXS_ERROR("The limit imposed by /proc/sys/fs/epoll/max_user_watches was "
"reached when trying to add file descriptor %d to epoll instance "
"of worker %d.", fd, wid);
"reached when trying to add file descriptor %d to an epoll instance.", fd);
return;
}
}
@@ -104,7 +102,7 @@ void poll_resolve_error(int wid, int fd, int errornum, int op)
/* Must be removing */
if (ENOENT == errornum)
{
MXS_ERROR("File descriptor %d was not found in epoll instance of worker %d.", fd, wid);
MXS_ERROR("File descriptor %d was not found in epoll instance.", fd);
return;
}
}
@@ -147,6 +145,8 @@ Worker::Worker(int id,
, m_should_shutdown(false)
, m_shutdown_initiated(false)
{
MXS_POLL_DATA::handler = &Worker::epoll_instance_handler;
MXS_POLL_DATA::thread.id = id;
}
Worker::~Worker()
@@ -165,28 +165,34 @@ bool Worker::init()
this_unit.n_workers = config_threadcount();
this_unit.number_poll_spins = config_nbpolls();
this_unit.max_poll_sleep = config_pollsleep();
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
if (this_unit.ppWorkers)
this_unit.epoll_listener_fd = epoll_create(MAX_EVENTS);
if (this_unit.epoll_listener_fd != -1)
{
for (int i = 0; i < this_unit.n_workers; ++i)
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
if (this_unit.ppWorkers)
{
Worker* pWorker = Worker::create(i);
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = Worker::create(i, this_unit.epoll_listener_fd);
if (pWorker)
{
this_unit.ppWorkers[i] = pWorker;
}
else
{
for (int j = i - 1; j >= 0; --j)
if (pWorker)
{
delete this_unit.ppWorkers[j];
this_unit.ppWorkers[i] = pWorker;
}
else
{
for (int j = i - 1; j >= 0; --j)
{
delete this_unit.ppWorkers[j];
}
delete this_unit.ppWorkers;
this_unit.ppWorkers = NULL;
break;
delete this_unit.ppWorkers;
this_unit.ppWorkers = NULL;
break;
}
}
if (this_unit.ppWorkers)
@@ -194,6 +200,14 @@ bool Worker::init()
this_unit.initialized = true;
}
}
else
{
close(this_unit.epoll_listener_fd);
}
}
else
{
MXS_ERROR("Could not allocate an epoll instance.");
}
return this_unit.initialized;
@@ -213,6 +227,10 @@ void Worker::finish()
delete [] this_unit.ppWorkers;
this_unit.ppWorkers = NULL;
close(this_unit.epoll_listener_fd);
this_unit.epoll_listener_fd = 0;
this_unit.initialized = false;
}
@@ -379,6 +397,7 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{
bool rv = true;
// Must be edge-triggered.
events |= EPOLLET;
struct epoll_event ev;
@@ -390,7 +409,31 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
{
poll_resolve_error(m_id, fd, errno, EPOLL_CTL_ADD);
poll_resolve_error(fd, errno, EPOLL_CTL_ADD);
rv = false;
}
return rv;
}
//static
bool Worker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{
bool rv = true;
// TODO: Does this really need to be level-triggered?
events &= ~EPOLLET;
struct epoll_event ev;
ev.events = events;
ev.data.ptr = pData;
pData->thread.id = 0; // TODO: Remove the thread id altogether.
if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
{
poll_resolve_error(fd, errno, EPOLL_CTL_ADD);
rv = false;
}
@@ -405,7 +448,23 @@ bool Worker::remove_fd(int fd)
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, &ev) != 0)
{
poll_resolve_error(m_id, fd, errno, EPOLL_CTL_DEL);
poll_resolve_error(fd, errno, EPOLL_CTL_DEL);
rv = false;
}
return rv;
}
//static
bool Worker::remove_shared_fd(int fd)
{
bool rv = true;
struct epoll_event ev = {};
if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_DEL, fd, &ev) != 0)
{
poll_resolve_error(fd, errno, EPOLL_CTL_DEL);
rv = false;
}
@@ -565,12 +624,14 @@ void Worker::shutdown_all()
* - Creates a pipe.
* - Adds the read descriptor to the polling mechanism.
*
* @param worker_id The id of the worker.
* @param worker_id The id of the worker.
* @param epoll_listener_fd The file descriptor of the epoll set to which listening
* sockets will be added.
*
* @return A worker instance if successful, otherwise NULL.
*/
//static
Worker* Worker::create(int worker_id)
Worker* Worker::create(int worker_id, int epoll_listener_fd)
{
Worker* pThis = NULL;
@@ -582,25 +643,51 @@ Worker* Worker::create(int worker_id)
if (pThis)
{
MessageQueue* pQueue = MessageQueue::create(pThis);
struct epoll_event ev;
ev.events = EPOLLIN;
MXS_POLL_DATA* pData = pThis;
ev.data.ptr = pData; // Necessary for pointer adjustment, otherwise downcast will not work.
if (pQueue)
// The shared epoll instance descriptor is *not* added using EPOLLET (edge-triggered)
// because we want it to be level-triggered. That way, as long as there is a single
// active (accept() can be called) listening socket, epoll_wait() will return an event
// for it. It must be like that because each worker will call accept() just once before
// calling epoll_wait() again. The end result is that as long as the load of different
// workers is roughly the same, the client connections will be distributed evenly across
// the workers. If the load is not the same, then a worker with less load will get more
// clients than a worker with more load.
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, epoll_listener_fd, &ev) == 0)
{
if (pQueue->add_to_worker(pThis))
MXS_NOTICE("Epoll instance for listening sockets added to worker epoll instance.");
MessageQueue* pQueue = MessageQueue::create(pThis);
if (pQueue)
{
pThis->m_pQueue = pQueue;
if (pQueue->add_to_worker(pThis))
{
pThis->m_pQueue = pQueue;
}
else
{
MXS_ERROR("Could not add message queue to worker.");
delete pThis;
pThis = NULL;
}
}
else
{
MXS_ERROR("Could not add message queue to worker.");
MXS_ERROR("Could not create message queue for worker.");
delete pThis;
pThis = NULL;
}
}
else
{
MXS_ERROR("Could not create message queue for worker.");
MXS_ERROR("Could not add epoll instance for listening sockets to "
"epoll instance of worker: %s", mxs_strerror(errno));
delete pThis;
pThis = NULL;
}
}
else
@@ -833,6 +920,59 @@ void Worker::poll_waitevents()
m_state = STOPPED;
}
/**
* Callback for events occurring on the shared epoll instance.
*
* @param pData Will point to a Worker instance.
* @param wid The worker id.
* @param events The events.
*
* @return What actions were performed.
*/
//static
uint32_t Worker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, uint32_t events)
{
Worker* pWorker = static_cast<Worker*>(pData);
ss_dassert(pWorker->m_id == wid);
return pWorker->handle_epoll_events(events);
}
/**
* Handler for events occurring in the shared epoll instance.
*
* @param events The events.
*
* @return What actions were performed.
*/
uint32_t Worker::handle_epoll_events(uint32_t events)
{
struct epoll_event epoll_events[1];
// We extract just one event
int nfds = epoll_wait(this_unit.epoll_listener_fd, epoll_events, 1, 0);
uint32_t actions = MXS_POLL_NOP;
if (nfds == -1)
{
MXS_ERROR("epoll_wait failed: %s", mxs_strerror(errno));
}
else if (nfds == 0)
{
MXS_DEBUG("No events for worker %d.", m_id);
}
else
{
MXS_DEBUG("1 event for worker %d.", m_id);
MXS_POLL_DATA* pData = static_cast<MXS_POLL_DATA*>(epoll_events[0].data.ptr);
actions = pData->handler(pData, m_id, epoll_events[0].events);
}
return actions;
}
/**
* Calls thread_init on all loaded modules.
*