poll_waitevents moved to Worker

A direct move without any non-essential modifications. poll_waitevents will
later be turned into a regular method that uses instance variables.
Johan Wikman
2017-04-18 12:37:07 +03:00
parent 4ed615a6c1
commit effa2f5674
4 changed files with 234 additions and 184 deletions
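
The commit message above notes that poll_waitevents will later become a regular
method using instance variables. The following is a minimal, hypothetical sketch
of that follow-up shape (not part of this commit; m_epoll_fd and m_id already
exist on Worker, while m_should_shutdown and shutdown() are assumptions made
purely for the illustration):

#include <atomic>

class Worker
{
public:
    Worker(int epoll_fd, int id)
        : m_epoll_fd(epoll_fd)
        , m_id(id)
        , m_should_shutdown(false)
    {
    }

    // The static poll_waitevents(epoll_fd, thread_id, stats, ...) would become
    // a plain member function that reads the same state from the instance.
    void poll_waitevents()
    {
        while (!m_should_shutdown.load())
        {
            // epoll_wait(m_epoll_fd, ...) and event dispatch would go here,
            // updating per-worker statistics kept as further members.
        }
    }

    void shutdown()
    {
        m_should_shutdown.store(true);
    }

private:
    int m_epoll_fd;                       // was the epoll_fd argument
    int m_id;                             // was the thread_id argument
    std::atomic<bool> m_should_shutdown;  // replaces the should_shutdown(data) callback
};

With the state held by the instance, the long static argument list and the
should_shutdown callback disappear, which is what first moving the function into
Worker makes possible.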

View File

@@ -162,4 +162,11 @@ bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data
*/
bool poll_remove_fd_from_worker(int wid, int fd);
/**
* Check whether there are cross-thread messages for current thread.
*
* @attention Only to be called by the system.
*/
void poll_check_message(void);
MXS_END_DECLS

View File

@@ -13,6 +13,7 @@
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/platform.h>
#include "messagequeue.hh"
#include "worker.h"
@@ -196,6 +197,21 @@ public:
*/
static int get_current_id();
/**
* Set the number of non-blocking poll cycles that will be done before
* a blocking poll will take place.
*
* @param nbpolls Number of non-blocking polls to perform before blocking.
*/
static void set_nonblocking_polls(unsigned int nbpolls);
/**
* Maximum time to block in epoll_wait.
*
* @param maxwait Maximum wait time in milliseconds.
*/
static void set_maxwait(unsigned int maxwait);
private:
Worker(int id,
int epoll_fd,
@@ -211,6 +227,13 @@ private:
static void thread_main(void* arg);
static void poll_waitevents(int epoll_fd,
int thread_id,
POLL_STATS* poll_stats,
QUEUE_STATS* queue_stats,
bool (*should_shutdown)(void* data),
void* data);
private:
int m_id; /*< The id of the worker. */
int m_epoll_fd; /*< The epoll file descriptor. */

View File

@@ -40,11 +40,8 @@
using maxscale::Worker;
static thread_local int current_thread_id; /*< This thread's ID */
static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
static int n_threads; /*< Number of threads */
static int number_poll_spins; /*< Maximum non-block polls */
static int max_poll_sleep; /*< Maximum block time */
/** Poll cross-thread messaging variables */
@@ -52,8 +49,6 @@ static volatile int *poll_msg;
static void *poll_msg_data = NULL;
static SPINLOCK poll_msg_lock = SPINLOCK_INIT;
static void poll_check_message(void);
/**
* Initialise the polling system we are using for the gateway.
*
@@ -68,9 +63,6 @@ poll_init()
{
exit(-1);
}
number_poll_spins = config_nbpolls();
max_poll_sleep = config_pollsleep();
}
static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
@@ -111,7 +103,18 @@ static bool add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
if (rv)
{
// The DCB will appear on the list of the calling thread.
data->thread.id = current_thread_id;
int wid = Worker::get_current_id();
if (wid == -1)
{
// TODO: Listeners are created before the workers have been started.
// TODO: Hence the returned id will be -1. We change it to 0, which in
// TODO: practice will mean that they will end up on the Worker running
// TODO: in the main thread. This needs to be sorted out.
wid = 0;
}
data->thread.id = wid;
}
else
{
@@ -188,174 +191,6 @@ bool poll_remove_fd_from_worker(int wid, int fd)
return rv;
}
/**
* The main polling loop
*
* @param epoll_fd The epoll descriptor.
* @param thread_id The id of the calling thread.
* @param poll_stats The polling stats of the calling thread.
* @param queue_stats The queue stats of the calling thread.
* @param should_shutdown Pointer to function returning true if the polling should
* be terminated.
* @param data Data provided to the @c should_shutdown function.
*/
void poll_waitevents(int epoll_fd,
int thread_id,
POLL_STATS* poll_stats,
QUEUE_STATS* queue_stats,
bool (*should_shutdown)(void* data),
void* data)
{
current_thread_id = thread_id;
struct epoll_event events[MAX_EVENTS];
int i, nfds, timeout_bias = 1;
int poll_spins = 0;
poll_stats->thread_state = THREAD_IDLE;
while (!should_shutdown(data))
{
poll_stats->thread_state = THREAD_POLLING;
atomic_add_int64(&poll_stats->n_polls, 1);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
{
int eno = errno;
errno = 0;
MXS_DEBUG("%lu [poll_waitevents] epoll_wait returned "
"%d, errno %d",
pthread_self(),
nfds,
eno);
}
/*
* If there are no new descriptors from the non-blocking call
* and nothing to process on the event queue, then do a
* blocking call to epoll_wait.
*
* We calculate a timeout bias to alter the length of the blocking
* call based on the time since we last received an event to process
*/
else if (nfds == 0 && poll_spins++ > number_poll_spins)
{
if (timeout_bias < 10)
{
timeout_bias++;
}
atomic_add_int64(&poll_stats->blockingpolls, 1);
nfds = epoll_wait(epoll_fd,
events,
MAX_EVENTS,
(max_poll_sleep * timeout_bias) / 10);
if (nfds == 0)
{
poll_spins = 0;
}
}
if (nfds > 0)
{
poll_stats->evq_length = nfds;
if (nfds > poll_stats->evq_max)
{
poll_stats->evq_max = nfds;
}
timeout_bias = 1;
if (poll_spins <= number_poll_spins + 1)
{
atomic_add_int64(&poll_stats->n_nbpollev, 1);
}
poll_spins = 0;
MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(),
nfds);
atomic_add_int64(&poll_stats->n_pollev, 1);
poll_stats->thread_state = THREAD_PROCESSING;
poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
}
uint64_t cycle_start = hkheartbeat;
for (int i = 0; i < nfds; i++)
{
/** Calculate event queue statistics */
int64_t started = hkheartbeat;
int64_t qtime = started - cycle_start;
if (qtime > N_QUEUE_TIMES)
{
queue_stats->qtimes[N_QUEUE_TIMES]++;
}
else
{
queue_stats->qtimes[qtime]++;
}
queue_stats->maxqtime = MXS_MAX(queue_stats->maxqtime, qtime);
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
uint32_t actions = data->handler(data, thread_id, events[i].events);
if (actions & MXS_POLL_ACCEPT)
{
atomic_add_int64(&poll_stats->n_accept, 1);
}
if (actions & MXS_POLL_READ)
{
atomic_add_int64(&poll_stats->n_read, 1);
}
if (actions & MXS_POLL_WRITE)
{
atomic_add_int64(&poll_stats->n_write, 1);
}
if (actions & MXS_POLL_HUP)
{
atomic_add_int64(&poll_stats->n_hup, 1);
}
if (actions & MXS_POLL_ERROR)
{
atomic_add_int64(&poll_stats->n_error, 1);
}
/** Calculate event execution statistics */
qtime = hkheartbeat - started;
if (qtime > N_QUEUE_TIMES)
{
queue_stats->exectimes[N_QUEUE_TIMES]++;
}
else
{
queue_stats->exectimes[qtime % N_QUEUE_TIMES]++;
}
queue_stats->maxexectime = MXS_MAX(queue_stats->maxexectime, qtime);
}
dcb_process_idle_sessions(thread_id);
poll_stats->thread_state = THREAD_ZPROCESSING;
/** Process closed DCBs */
dcb_process_zombies(thread_id);
poll_check_message();
poll_stats->thread_state = THREAD_IDLE;
} /*< while(1) */
poll_stats->thread_state = THREAD_STOPPED;
}
/**
* Set the number of non-blocking poll cycles that will be done before
* a blocking poll will take place. Whenever an event arrives on a thread
@@ -368,7 +203,7 @@ void poll_waitevents(int epoll_fd,
void
poll_set_nonblocking_polls(unsigned int nbpolls)
{
number_poll_spins = nbpolls;
Worker::set_nonblocking_polls(nbpolls);
}
/**
@@ -381,7 +216,7 @@ poll_set_nonblocking_polls(unsigned int nbpolls)
void
poll_set_maxwait(unsigned int maxwait)
{
max_poll_sleep = maxwait;
Worker::set_maxwait(maxwait);
}
/**
@@ -741,7 +576,7 @@ void poll_send_message(enum poll_message msg, void *data)
for (int i = 0; i < nthr; i++)
{
if (i != current_thread_id)
if (i != Worker::get_current_id())
{
while (poll_msg[i] & msg)
{
@@ -754,9 +589,9 @@ void poll_send_message(enum poll_message msg, void *data)
spinlock_release(&poll_msg_lock);
}
static void poll_check_message()
void poll_check_message()
{
int thread_id = current_thread_id;
int thread_id = Worker::get_current_id();
if (poll_msg[thread_id] & POLL_MSG_CLEAN_PERSISTENT)
{

View File

@@ -18,7 +18,9 @@
#include <stdlib.h>
#include <unistd.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
#include <maxscale/config.h>
#include <maxscale/hk_heartbeat.h>
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include "maxscale/modules.h"
@@ -43,6 +45,9 @@ static struct this_unit
{
int n_workers; // How many workers there are.
Worker** ppWorkers; // Array of worker instances.
int number_poll_spins; // Maximum non-block polls
int max_poll_sleep; // Maximum block time
} this_unit =
{
0,
@@ -159,6 +164,8 @@ Worker::~Worker()
void Worker::init()
{
this_unit.n_workers = config_threadcount();
this_unit.number_poll_spins = config_nbpolls();
this_unit.max_poll_sleep = config_pollsleep();
pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS));
if (!pollStats)
@@ -292,6 +299,18 @@ int Worker::get_current_id()
return this_thread.current_worker_id;
}
//static
void Worker::set_nonblocking_polls(unsigned int nbpolls)
{
this_unit.number_poll_spins = nbpolls;
}
//static
void Worker::set_maxwait(unsigned int maxwait)
{
this_unit.max_poll_sleep = maxwait;
}
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
// NOTE: No logging here, this function must be signal safe.
@@ -523,6 +542,172 @@ void Worker::thread_main(void* pArg)
}
}
/**
* The main polling loop
*
* @param epoll_fd The epoll descriptor.
* @param thread_id The id of the calling thread.
* @param poll_stats The polling stats of the calling thread.
* @param queue_stats The queue stats of the calling thread.
* @param should_shutdown Pointer to function returning true if the polling should
* be terminated.
* @param data Data provided to the @c should_shutdown function.
*/
//static
void Worker::poll_waitevents(int epoll_fd,
int thread_id,
POLL_STATS* poll_stats,
QUEUE_STATS* queue_stats,
bool (*should_shutdown)(void* data),
void* data)
{
struct epoll_event events[MAX_EVENTS];
int i, nfds, timeout_bias = 1;
int poll_spins = 0;
poll_stats->thread_state = THREAD_IDLE;
while (!should_shutdown(data))
{
poll_stats->thread_state = THREAD_POLLING;
atomic_add_int64(&poll_stats->n_polls, 1);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
{
int eno = errno;
errno = 0;
MXS_DEBUG("%lu [poll_waitevents] epoll_wait returned "
"%d, errno %d",
pthread_self(),
nfds,
eno);
}
/*
* If there are no new descriptors from the non-blocking call
* and nothing to process on the event queue, then do a
* blocking call to epoll_wait.
*
* We calculate a timeout bias to alter the length of the blocking
* call based on the time since we last received an event to process
*/
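// Worked example (illustrative values only): if this_unit.max_poll_sleep is
// 1000 ms, the first blocking epoll_wait below waits (1000 * 2) / 10 == 200 ms,
// because timeout_bias is bumped from 1 to 2 before the call; every further
// idle round adds another 100 ms, capping at 1000 ms once timeout_bias reaches
// 10. A round that processes events resets timeout_bias back to 1.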
else if (nfds == 0 && poll_spins++ > this_unit.number_poll_spins)
{
if (timeout_bias < 10)
{
timeout_bias++;
}
atomic_add_int64(&poll_stats->blockingpolls, 1);
nfds = epoll_wait(epoll_fd,
events,
MAX_EVENTS,
(this_unit.max_poll_sleep * timeout_bias) / 10);
if (nfds == 0)
{
poll_spins = 0;
}
}
if (nfds > 0)
{
poll_stats->evq_length = nfds;
if (nfds > poll_stats->evq_max)
{
poll_stats->evq_max = nfds;
}
timeout_bias = 1;
if (poll_spins <= this_unit.number_poll_spins + 1)
{
atomic_add_int64(&poll_stats->n_nbpollev, 1);
}
poll_spins = 0;
MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(),
nfds);
atomic_add_int64(&poll_stats->n_pollev, 1);
poll_stats->thread_state = THREAD_PROCESSING;
poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
}
uint64_t cycle_start = hkheartbeat;
for (int i = 0; i < nfds; i++)
{
/** Calculate event queue statistics */
int64_t started = hkheartbeat;
int64_t qtime = started - cycle_start;
if (qtime > N_QUEUE_TIMES)
{
queue_stats->qtimes[N_QUEUE_TIMES]++;
}
else
{
queue_stats->qtimes[qtime]++;
}
queue_stats->maxqtime = MXS_MAX(queue_stats->maxqtime, qtime);
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
uint32_t actions = data->handler(data, thread_id, events[i].events);
if (actions & MXS_POLL_ACCEPT)
{
atomic_add_int64(&poll_stats->n_accept, 1);
}
if (actions & MXS_POLL_READ)
{
atomic_add_int64(&poll_stats->n_read, 1);
}
if (actions & MXS_POLL_WRITE)
{
atomic_add_int64(&poll_stats->n_write, 1);
}
if (actions & MXS_POLL_HUP)
{
atomic_add_int64(&poll_stats->n_hup, 1);
}
if (actions & MXS_POLL_ERROR)
{
atomic_add_int64(&poll_stats->n_error, 1);
}
/** Calculate event execution statistics */
qtime = hkheartbeat - started;
if (qtime > N_QUEUE_TIMES)
{
queue_stats->exectimes[N_QUEUE_TIMES]++;
}
else
{
queue_stats->exectimes[qtime % N_QUEUE_TIMES]++;
}
queue_stats->maxexectime = MXS_MAX(queue_stats->maxexectime, qtime);
}
dcb_process_idle_sessions(thread_id);
poll_stats->thread_state = THREAD_ZPROCESSING;
/** Process closed DCBs */
dcb_process_zombies(thread_id);
poll_check_message();
poll_stats->thread_state = THREAD_IDLE;
} /*< while(1) */
poll_stats->thread_state = THREAD_STOPPED;
}
/**
* Calls thread_init on all loaded modules.
*