From effa2f5674c2990cc524a54972cb1b11527770ab Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 18 Apr 2017 12:37:07 +0300 Subject: [PATCH] poll_waitevents moved to Worker A direct move without any non-essential modifications. Poll_waitevents will be turned into a regular methods using instance variables. --- include/maxscale/poll_core.h | 7 ++ server/core/maxscale/worker.hh | 23 ++++ server/core/poll.cc | 199 +++------------------------------ server/core/worker.cc | 189 ++++++++++++++++++++++++++++++- 4 files changed, 234 insertions(+), 184 deletions(-) diff --git a/include/maxscale/poll_core.h b/include/maxscale/poll_core.h index f202bb44c..f4aa45f57 100644 --- a/include/maxscale/poll_core.h +++ b/include/maxscale/poll_core.h @@ -162,4 +162,11 @@ bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data */ bool poll_remove_fd_from_worker(int wid, int fd); +/** + * Check whether there are cross-thread messages for current thread. + * + * @attention Only to be called by the system. + */ +void poll_check_message(void); + MXS_END_DECLS diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index 1cdf13727..8b73800a1 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -13,6 +13,7 @@ */ #include +#include #include "messagequeue.hh" #include "worker.h" @@ -196,6 +197,21 @@ public: */ static int get_current_id(); + /** + * Set the number of non-blocking poll cycles that will be done before + * a blocking poll will take place. + * + * @param nbpolls Number of non-blocking polls to perform before blocking. + */ + static void set_nonblocking_polls(unsigned int nbpolls); + + /** + * Maximum time to block in epoll_wait. + * + * @param maxwait Maximum wait time in millliseconds. + */ + static void set_maxwait(unsigned int maxwait); + private: Worker(int id, int epoll_fd, @@ -211,6 +227,13 @@ private: static void thread_main(void* arg); + static void poll_waitevents(int epoll_fd, + int thread_id, + POLL_STATS* poll_stats, + QUEUE_STATS* queue_stats, + bool (*should_shutdown)(void* data), + void* data); + private: int m_id; /*< The id of the worker. */ int m_epoll_fd; /*< The epoll file descriptor. */ diff --git a/server/core/poll.cc b/server/core/poll.cc index c8e6aeab2..1659e37df 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -40,11 +40,8 @@ using maxscale::Worker; -static thread_local int current_thread_id; /*< This thread's ID */ static int next_epoll_fd = 0; /*< Which thread handles the next DCB */ static int n_threads; /*< Number of threads */ -static int number_poll_spins; /*< Maximum non-block polls */ -static int max_poll_sleep; /*< Maximum block time */ /** Poll cross-thread messaging variables */ @@ -52,8 +49,6 @@ static volatile int *poll_msg; static void *poll_msg_data = NULL; static SPINLOCK poll_msg_lock = SPINLOCK_INIT; -static void poll_check_message(void); - /** * Initialise the polling system we are using for the gateway. * @@ -68,9 +63,6 @@ poll_init() { exit(-1); } - - number_poll_spins = config_nbpolls(); - max_poll_sleep = config_pollsleep(); } static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data) @@ -111,7 +103,18 @@ static bool add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data) if (rv) { // The DCB will appear on the list of the calling thread. - data->thread.id = current_thread_id; + int wid = Worker::get_current_id(); + + if (wid == -1) + { + // TODO: Listeners are created before the workers have been started. + // TODO: Hence the returned id will be -1. We change it to 0, which in + // TODO: practice will mean that they will end up on the Worker running + // TODO: in the main thread. This needs to be sorted out. + wid = 0; + } + + data->thread.id = wid; } else { @@ -188,174 +191,6 @@ bool poll_remove_fd_from_worker(int wid, int fd) return rv; } -/** - * The main polling loop - * - * @param epoll_fd The epoll descriptor. - * @param thread_id The id of the calling thread. - * @param poll_stats The polling stats of the calling thread. - * @param queue_stats The queue stats of the calling thread. - * @param should_shutdown Pointer to function returning true if the polling should - * be terminated. - * @param data Data provided to the @c should_shutdown function. - */ -void poll_waitevents(int epoll_fd, - int thread_id, - POLL_STATS* poll_stats, - QUEUE_STATS* queue_stats, - bool (*should_shutdown)(void* data), - void* data) -{ - current_thread_id = thread_id; - - struct epoll_event events[MAX_EVENTS]; - int i, nfds, timeout_bias = 1; - int poll_spins = 0; - - poll_stats->thread_state = THREAD_IDLE; - - while (!should_shutdown(data)) - { - poll_stats->thread_state = THREAD_POLLING; - - atomic_add_int64(&poll_stats->n_polls, 1); - if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1) - { - int eno = errno; - errno = 0; - MXS_DEBUG("%lu [poll_waitevents] epoll_wait returned " - "%d, errno %d", - pthread_self(), - nfds, - eno); - } - /* - * If there are no new descriptors from the non-blocking call - * and nothing to process on the event queue then for do a - * blocking call to epoll_wait. - * - * We calculate a timeout bias to alter the length of the blocking - * call based on the time since we last received an event to process - */ - else if (nfds == 0 && poll_spins++ > number_poll_spins) - { - if (timeout_bias < 10) - { - timeout_bias++; - } - atomic_add_int64(&poll_stats->blockingpolls, 1); - nfds = epoll_wait(epoll_fd, - events, - MAX_EVENTS, - (max_poll_sleep * timeout_bias) / 10); - if (nfds == 0) - { - poll_spins = 0; - } - } - - if (nfds > 0) - { - poll_stats->evq_length = nfds; - if (nfds > poll_stats->evq_max) - { - poll_stats->evq_max = nfds; - } - - timeout_bias = 1; - if (poll_spins <= number_poll_spins + 1) - { - atomic_add_int64(&poll_stats->n_nbpollev, 1); - } - poll_spins = 0; - MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds", - pthread_self(), - nfds); - atomic_add_int64(&poll_stats->n_pollev, 1); - - poll_stats->thread_state = THREAD_PROCESSING; - - poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++; - } - - uint64_t cycle_start = hkheartbeat; - - for (int i = 0; i < nfds; i++) - { - /** Calculate event queue statistics */ - int64_t started = hkheartbeat; - int64_t qtime = started - cycle_start; - - if (qtime > N_QUEUE_TIMES) - { - queue_stats->qtimes[N_QUEUE_TIMES]++; - } - else - { - queue_stats->qtimes[qtime]++; - } - - queue_stats->maxqtime = MXS_MAX(queue_stats->maxqtime, qtime); - - MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; - - uint32_t actions = data->handler(data, thread_id, events[i].events); - - if (actions & MXS_POLL_ACCEPT) - { - atomic_add_int64(&poll_stats->n_accept, 1); - } - - if (actions & MXS_POLL_READ) - { - atomic_add_int64(&poll_stats->n_read, 1); - } - - if (actions & MXS_POLL_WRITE) - { - atomic_add_int64(&poll_stats->n_write, 1); - } - - if (actions & MXS_POLL_HUP) - { - atomic_add_int64(&poll_stats->n_hup, 1); - } - - if (actions & MXS_POLL_ERROR) - { - atomic_add_int64(&poll_stats->n_error, 1); - } - - /** Calculate event execution statistics */ - qtime = hkheartbeat - started; - - if (qtime > N_QUEUE_TIMES) - { - queue_stats->exectimes[N_QUEUE_TIMES]++; - } - else - { - queue_stats->exectimes[qtime % N_QUEUE_TIMES]++; - } - - queue_stats->maxexectime = MXS_MAX(queue_stats->maxexectime, qtime); - } - - dcb_process_idle_sessions(thread_id); - - poll_stats->thread_state = THREAD_ZPROCESSING; - - /** Process closed DCBs */ - dcb_process_zombies(thread_id); - - poll_check_message(); - - poll_stats->thread_state = THREAD_IDLE; - } /*< while(1) */ - - poll_stats->thread_state = THREAD_STOPPED; -} - /** * Set the number of non-blocking poll cycles that will be done before * a blocking poll will take place. Whenever an event arrives on a thread @@ -368,7 +203,7 @@ void poll_waitevents(int epoll_fd, void poll_set_nonblocking_polls(unsigned int nbpolls) { - number_poll_spins = nbpolls; + Worker::set_nonblocking_polls(nbpolls); } /** @@ -381,7 +216,7 @@ poll_set_nonblocking_polls(unsigned int nbpolls) void poll_set_maxwait(unsigned int maxwait) { - max_poll_sleep = maxwait; + Worker::set_maxwait(maxwait); } /** @@ -741,7 +576,7 @@ void poll_send_message(enum poll_message msg, void *data) for (int i = 0; i < nthr; i++) { - if (i != current_thread_id) + if (i != Worker::get_current_id()) { while (poll_msg[i] & msg) { @@ -754,9 +589,9 @@ void poll_send_message(enum poll_message msg, void *data) spinlock_release(&poll_msg_lock); } -static void poll_check_message() +void poll_check_message() { - int thread_id = current_thread_id; + int thread_id = Worker::get_current_id(); if (poll_msg[thread_id] & POLL_MSG_CLEAN_PERSISTENT) { diff --git a/server/core/worker.cc b/server/core/worker.cc index afea9b56f..f5d6f6da7 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -18,7 +18,9 @@ #include #include #include +#include #include +#include #include #include #include "maxscale/modules.h" @@ -41,8 +43,11 @@ namespace */ static struct this_unit { - int n_workers; // How many workers there are. - Worker** ppWorkers; // Array of worker instances. + int n_workers; // How many workers there are. + Worker** ppWorkers; // Array of worker instances. + int number_poll_spins; // Maximum non-block polls + int max_poll_sleep; // Maximum block time + } this_unit = { 0, @@ -159,6 +164,8 @@ Worker::~Worker() void Worker::init() { this_unit.n_workers = config_threadcount(); + this_unit.number_poll_spins = config_nbpolls(); + this_unit.max_poll_sleep = config_pollsleep(); pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS)); if (!pollStats) @@ -292,6 +299,18 @@ int Worker::get_current_id() return this_thread.current_worker_id; } +//static +void Worker::set_nonblocking_polls(unsigned int nbpolls) +{ + this_unit.number_poll_spins = nbpolls; +} + +//static +void Worker::set_maxwait(unsigned int maxwait) +{ + this_unit.max_poll_sleep = maxwait; +} + bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. @@ -523,6 +542,172 @@ void Worker::thread_main(void* pArg) } } +/** + * The main polling loop + * + * @param epoll_fd The epoll descriptor. + * @param thread_id The id of the calling thread. + * @param poll_stats The polling stats of the calling thread. + * @param queue_stats The queue stats of the calling thread. + * @param should_shutdown Pointer to function returning true if the polling should + * be terminated. + * @param data Data provided to the @c should_shutdown function. + */ +//static +void Worker::poll_waitevents(int epoll_fd, + int thread_id, + POLL_STATS* poll_stats, + QUEUE_STATS* queue_stats, + bool (*should_shutdown)(void* data), + void* data) +{ + struct epoll_event events[MAX_EVENTS]; + int i, nfds, timeout_bias = 1; + int poll_spins = 0; + + poll_stats->thread_state = THREAD_IDLE; + + while (!should_shutdown(data)) + { + poll_stats->thread_state = THREAD_POLLING; + + atomic_add_int64(&poll_stats->n_polls, 1); + if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1) + { + int eno = errno; + errno = 0; + MXS_DEBUG("%lu [poll_waitevents] epoll_wait returned " + "%d, errno %d", + pthread_self(), + nfds, + eno); + } + /* + * If there are no new descriptors from the non-blocking call + * and nothing to process on the event queue then for do a + * blocking call to epoll_wait. + * + * We calculate a timeout bias to alter the length of the blocking + * call based on the time since we last received an event to process + */ + else if (nfds == 0 && poll_spins++ > this_unit.number_poll_spins) + { + if (timeout_bias < 10) + { + timeout_bias++; + } + atomic_add_int64(&poll_stats->blockingpolls, 1); + nfds = epoll_wait(epoll_fd, + events, + MAX_EVENTS, + (this_unit.max_poll_sleep * timeout_bias) / 10); + if (nfds == 0) + { + poll_spins = 0; + } + } + + if (nfds > 0) + { + poll_stats->evq_length = nfds; + if (nfds > poll_stats->evq_max) + { + poll_stats->evq_max = nfds; + } + + timeout_bias = 1; + if (poll_spins <= this_unit.number_poll_spins + 1) + { + atomic_add_int64(&poll_stats->n_nbpollev, 1); + } + poll_spins = 0; + MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds", + pthread_self(), + nfds); + atomic_add_int64(&poll_stats->n_pollev, 1); + + poll_stats->thread_state = THREAD_PROCESSING; + + poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++; + } + + uint64_t cycle_start = hkheartbeat; + + for (int i = 0; i < nfds; i++) + { + /** Calculate event queue statistics */ + int64_t started = hkheartbeat; + int64_t qtime = started - cycle_start; + + if (qtime > N_QUEUE_TIMES) + { + queue_stats->qtimes[N_QUEUE_TIMES]++; + } + else + { + queue_stats->qtimes[qtime]++; + } + + queue_stats->maxqtime = MXS_MAX(queue_stats->maxqtime, qtime); + + MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; + + uint32_t actions = data->handler(data, thread_id, events[i].events); + + if (actions & MXS_POLL_ACCEPT) + { + atomic_add_int64(&poll_stats->n_accept, 1); + } + + if (actions & MXS_POLL_READ) + { + atomic_add_int64(&poll_stats->n_read, 1); + } + + if (actions & MXS_POLL_WRITE) + { + atomic_add_int64(&poll_stats->n_write, 1); + } + + if (actions & MXS_POLL_HUP) + { + atomic_add_int64(&poll_stats->n_hup, 1); + } + + if (actions & MXS_POLL_ERROR) + { + atomic_add_int64(&poll_stats->n_error, 1); + } + + /** Calculate event execution statistics */ + qtime = hkheartbeat - started; + + if (qtime > N_QUEUE_TIMES) + { + queue_stats->exectimes[N_QUEUE_TIMES]++; + } + else + { + queue_stats->exectimes[qtime % N_QUEUE_TIMES]++; + } + + queue_stats->maxexectime = MXS_MAX(queue_stats->maxexectime, qtime); + } + + dcb_process_idle_sessions(thread_id); + + poll_stats->thread_state = THREAD_ZPROCESSING; + + /** Process closed DCBs */ + dcb_process_zombies(thread_id); + + poll_check_message(); + + poll_stats->thread_state = THREAD_IDLE; + } /*< while(1) */ + + poll_stats->thread_state = THREAD_STOPPED; + } /** * Calls thread_init on all loaded modules. *