diff --git a/server/core/maxscale/poll.h b/server/core/maxscale/poll.h
index 13113be23..d89be55eb 100644
--- a/server/core/maxscale/poll.h
+++ b/server/core/maxscale/poll.h
@@ -50,7 +50,10 @@ enum poll_message
 void poll_init();
 //void poll_finish(); // TODO: Add this.
 
-void poll_waitevents(struct mxs_worker *worker);
+void poll_waitevents(int epoll_fd,
+                     int thread_id,
+                     bool (*should_terminate)(void* data),
+                     void* data);
 
 void poll_set_maxwait(unsigned int);
 void poll_set_nonblocking_polls(unsigned int);
diff --git a/server/core/poll.cc b/server/core/poll.cc
index d3435caef..1461712a7 100644
--- a/server/core/poll.cc
+++ b/server/core/poll.cc
@@ -40,10 +40,12 @@
 #include
 #include "maxscale/poll.h"
-#include "maxscale/worker.h"
+#include "maxscale/worker.hh"
 
 #define PROFILE_POLL 0
 
+using maxscale::Worker;
+
 #if PROFILE_POLL
 extern unsigned long hkheartbeat;
 #endif
@@ -79,7 +81,6 @@ int max_poll_sleep;
  */
 thread_local int current_thread_id; /**< This thread's ID */
 
-static int *epoll_fd;    /*< The epoll file descriptor */
 static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
 static int do_shutdown = 0;   /*< Flag the shutdown of the poll subsystem */

@@ -189,11 +190,6 @@ static struct
  */
 static void poll_loadav(void *);
 
-/**
- * Function to analyse error return from epoll_ctl
- */
-static int poll_resolve_error(int fd, int error, int op);
-
 /**
  * Initialise the polling system we are using for the gateway.
  *
@@ -204,21 +200,6 @@ poll_init()
 {
     n_threads = config_threadcount();
 
-    if (!(epoll_fd = (int*)MXS_MALLOC(sizeof(int) * n_threads)))
-    {
-        return;
-    }
-
-    for (int i = 0; i < n_threads; i++)
-    {
-        if ((epoll_fd[i] = epoll_create(MAX_EVENTS)) == -1)
-        {
-            MXS_ERROR("FATAL: Could not create epoll instance: %s",
-                      mxs_strerror(errno));
-            exit(-1);
-        }
-    }
-
     if ((poll_msg = (int*)MXS_CALLOC(n_threads, sizeof(int))) == NULL)
     {
         exit(-1);
@@ -279,59 +260,49 @@ static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* da
 {
     ss_dassert((wid >= 0) && (wid <= n_threads));
 
-    events |= EPOLLET;
+    Worker* worker = Worker::get(wid);
+    ss_dassert(worker);
 
-    struct epoll_event ev;
-
-    ev.events = events;
-    ev.data.ptr = data;
-
-    data->thread.id = wid;
-    int rc = epoll_ctl(epoll_fd[wid], EPOLL_CTL_ADD, fd, &ev);
-
-    if (rc != 0)
-    {
-        rc = poll_resolve_error(fd, errno, EPOLL_CTL_ADD);
-    }
-
-    return rc == 0;
+    return worker->add_fd(fd, events, data);
 }
 
 static bool add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data)
 {
-    events |= EPOLLET;
-
-    struct epoll_event ev;
-
-    ev.events = events;
-    ev.data.ptr = data;
-    data->thread.id = current_thread_id; // The DCB will appear on the list of the calling thread.
-
-    int stored_errno = 0;
-    int rc = 0;
+    bool rv = true;
+    int thread_id = data->thread.id;
 
     for (int i = 0; i < n_threads; i++)
     {
-        rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_ADD, fd, &ev);
+        Worker* worker = Worker::get(i);
+        ss_dassert(worker);
 
-        if (rc != 0)
+        if (!worker->add_fd(fd, events, data))
         {
-            stored_errno = errno;
            /** Remove the fd from the previous epoll instances */
            for (int j = 0; j < i; j++)
            {
-                epoll_ctl(epoll_fd[j], EPOLL_CTL_DEL, fd, &ev);
+                Worker* worker = Worker::get(j);
+                ss_dassert(worker);
+
+                worker->remove_fd(fd);
            }
+            rv = false;
            break;
        }
    }
 
-    if (rc != 0)
+    if (rv)
    {
-        rc = poll_resolve_error(fd, stored_errno, EPOLL_CTL_ADD);
+        // The DCB will appear on the list of the calling thread.
+        data->thread.id = current_thread_id;
+    }
+    else
+    {
+        // Restore the situation.
+        data->thread.id = thread_id;
    }
 
-    return rc == 0;
+    return rv;
 }
 
 bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
@@ -362,16 +333,10 @@ static bool remove_fd_from_worker(int wid, int fd)
 {
     ss_dassert((wid >= 0) && (wid < n_threads));
 
-    struct epoll_event ev = {};
+    Worker* worker = Worker::get(wid);
+    ss_dassert(worker);
 
-    int rc = epoll_ctl(epoll_fd[wid], EPOLL_CTL_DEL, fd, &ev);
-
-    if (rc == -1)
-    {
-        rc = poll_resolve_error(fd, errno, EPOLL_CTL_DEL);
-    }
-
-    return rc == 0;
+    return worker->remove_fd(fd);
 }
 
 static bool remove_fd_from_workers(int fd)
@@ -380,9 +345,11 @@ static bool remove_fd_from_workers(int fd)
 
     for (int i = 0; i < n_threads; ++i)
     {
-        // We don't store the error, anything serious and the process will
-        // have been taken down in poll_resolve_error().
-        remove_fd_from_worker(i, fd);
+        Worker* worker = Worker::get(i);
+        ss_dassert(worker);
+        // We don't care about the result, anything serious and the
+        // function will not return and the process taken down.
+        worker->remove_fd(fd);
     }
 
     return true;
@@ -404,153 +371,35 @@ bool poll_remove_fd_from_worker(int wid, int fd)
     return rv;
 }
 
-/**
- * Check error returns from epoll_ctl. Most result in a crash since they
- * are "impossible". Adding when already present is assumed non-fatal.
- * Likewise, removing when not present is assumed non-fatal.
- * It is assumed that callers to poll routines can handle the failure
- * that results from hitting system limit, although an error is written
- * here to record the problem.
- *
- * @param errornum The errno set by epoll_ctl
- * @param op Either EPOLL_CTL_ADD or EPOLL_CTL_DEL.
- * @return -1 on error or 0 for possibly revised return code
- */
-static int
-poll_resolve_error(int fd, int errornum, int op)
-{
-    if (op == EPOLL_CTL_ADD)
-    {
-        if (EEXIST == errornum)
-        {
-            MXS_ERROR("%lu [poll_resolve_error] Error : epoll_ctl could not add, "
-                      "already exists for descriptor %d.",
-                      pthread_self(),
-                      fd);
-            // Assume another thread added and no serious harm done
-            return 0;
-        }
-        if (ENOSPC == errornum)
-        {
-            MXS_ERROR("%lu [poll_resolve_error] The limit imposed by "
-                      "/proc/sys/fs/epoll/max_user_watches was "
-                      "encountered while trying to register (EPOLL_CTL_ADD) a new "
-                      "file descriptor on an epoll instance for descriptor %d.",
-                      pthread_self(),
-                      fd);
-            /* Failure - assume handled by callers */
-            return -1;
-        }
-    }
-    else
-    {
-        ss_dassert(op == EPOLL_CTL_DEL);
-
-        /* Must be removing */
-        if (ENOENT == errornum)
-        {
-            MXS_ERROR("%lu [poll_resolve_error] Error : epoll_ctl could not remove, "
-                      "not found, for dcb %d.",
-                      pthread_self(),
-                      fd);
-            // Assume another thread removed and no serious harm done
-            return 0;
-        }
-    }
-    /* Common checks for add or remove - crash MaxScale */
-    if (EBADF == errornum)
-    {
-        raise(SIGABRT);
-    }
-    if (EINVAL == errornum)
-    {
-        raise(SIGABRT);
-    }
-    if (ENOMEM == errornum)
-    {
-        raise(SIGABRT);
-    }
-    if (EPERM == errornum)
-    {
-        raise(SIGABRT);
-    }
-    /* Undocumented error number */
-    raise(SIGABRT);
-    /* The following statement should never be reached, but avoids compiler warning */
-    return -1;
-}
-
 /**
  * The main polling loop
  *
- * This routine does the polling and despatches of IO events
- * to the DCB's. It may be called either directly or as the entry point
- * of a polling thread within the gateway.
- *
- * The routine will loop as long as the variable "shutdown" is set to zero,
- * setting this to a non-zero value will cause the polling loop to return.
- *
- * There are two options for the polling, a debug option that is only useful if
- * you have a single thread. This blocks in epoll_wait until an event occurs.
- *
- * The non-debug option does an epoll with a time out. This allows the checking of
- * shutdown value to be checked in all threads. The algorithm for polling in this
- * mode is to do a poll with no-wait, if no events are detected then the poll is
- * repeated with a time out. This allows for a quick check before making the call
- * with timeout. The call with the timeout differs in that the Linux scheduler may
- * deschedule a process if a timeout is included, but will not do this if a 0 timeout
- * value is given. this improves performance when the gateway is under heavy load.
- *
- * In order to provide a fairer means of sharing the threads between the different
- * DCB's the poll mechanism has been decoupled from the processing of the events.
- * The events are now recieved via the epoll_wait call, a queue of DCB's that have
- * events pending is maintained and as new events arrive the DCB is added to the end
- * of this queue. If an eent arrives for a DCB alreayd in the queue, then the event
- * bits are added to the DCB but the DCB mantains the same point in the queue unless
- * the original events are already being processed. If they are being processed then
- * the DCB is moved to the back of the queue, this means that a DCB that is receiving
- * events at a high rate will not block the execution of events for other DCB's and
- * should result in a fairer polling strategy.
- *
- * The introduction of the ability to inject "fake" write events into the event queue meant
- * that there was a possibility to "starve" new events sicne the polling loop would
- * consume the event queue before looking for new events. If the DCB that inject
- * the fake event then injected another fake event as a result of the first it meant
- * that new events did not get added to the queue. The strategy has been updated to
- * not consume the entire event queue, but process one event before doing a non-blocking
- * call to add any new events before processing any more events. A blocking call to
- * collect events is only made if there are no pending events to be processed on the
- * event queue.
- *
- * Also introduced a "timeout bias" mechanism. This mechansim control the length of
- * of timeout passed to epoll_wait in blocking calls based on previous behaviour.
- * The initial call will block for 10% of the define timeout peroid, this will be
- * increased in increments of 10% until the full timeout value is used. If at any
- * point there is an event to be processed then the value will be reduced to 10% again
- * for the next blocking call.
- *
- * @param arg The thread ID passed as a void * to satisfy the threading package
+ * @param epoll_fd        The epoll descriptor.
+ * @param thread_id       The id of the calling thread.
+ * @param should_shutdown Pointer to function returning true if the polling should
+ *                        be terminated.
+ * @param data            Data provided to the @c should_shutdown function.
  */
-void
-poll_waitevents(MXS_WORKER *worker)
+void poll_waitevents(int epoll_fd,
+                     int thread_id,
+                     bool (*should_shutdown)(void* data),
+                     void* data)
 {
-    current_thread_id = mxs_worker_id(worker);
+    current_thread_id = thread_id;
 
     struct epoll_event events[MAX_EVENTS];
     int i, nfds, timeout_bias = 1;
     int poll_spins = 0;
 
-    int thread_id = current_thread_id;
-
     thread_data[thread_id].state = THREAD_IDLE;
 
-    while (!mxs_worker_should_shutdown(worker))
+    while (!should_shutdown(data))
     {
         atomic_add(&n_waiting, 1);
         thread_data[thread_id].state = THREAD_POLLING;
 
         ts_stats_increment(pollStats.n_polls, thread_id);
-        if ((nfds = epoll_wait(epoll_fd[thread_id], events, MAX_EVENTS, 0)) == -1)
+        if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
         {
             atomic_add(&n_waiting, -1);
             int eno = errno;
@@ -577,7 +426,7 @@ poll_waitevents(MXS_WORKER *worker)
                 timeout_bias++;
             }
             ts_stats_increment(pollStats.blockingpolls, thread_id);
-            nfds = epoll_wait(epoll_fd[thread_id],
+            nfds = epoll_wait(epoll_fd,
                               events,
                               MAX_EVENTS,
                               (max_poll_sleep * timeout_bias) / 10);
diff --git a/server/core/worker.cc b/server/core/worker.cc
index 622c1281d..a41d0d9e9 100644
--- a/server/core/worker.cc
+++ b/server/core/worker.cc
@@ -311,10 +311,20 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
     return Worker::broadcast_message(msg_id, arg1, arg2);
 }
 
+namespace
+{
+
+bool should_shutdown(void* pData)
+{
+    return static_cast<Worker*>(pData)->should_shutdown();
+}
+
+}
+
 void Worker::run()
 {
     this_thread.current_worker_id = m_id;
-    poll_waitevents(this);
+    poll_waitevents(m_epoll_fd, m_id, ::should_shutdown, this);
 
     this_thread.current_worker_id = WORKER_ABSENT_ID;
 
     MXS_NOTICE("Worker %d has shut down.", m_id);
@@ -400,7 +410,7 @@ Worker* Worker::create(int worker_id)
 
     if (pWorker)
     {
-        if (!poll_add_fd_to_worker(worker_id, read_fd, EPOLLIN, &pWorker->m_poll))
+        if (!pWorker->add_fd(read_fd, EPOLLIN, &pWorker->m_poll))
         {
             MXS_ERROR("Could not add read descriptor of worker to poll set: %s", mxs_strerror(errno));
             delete pWorker;
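
A minimal usage sketch of the new poll_waitevents() contract introduced above (illustrative only, not part of the patch; poll_waitevents() and its signature come from maxscale/poll.h as changed here, while MyLoop, my_should_terminate and run_loop are hypothetical stand-ins for whatever owns the epoll descriptor, the way Worker does via m_epoll_fd):

    // Sketch only: the caller now owns the epoll instance and supplies a
    // termination predicate, instead of poll.cc indexing a global epoll_fd[].
    #include <atomic>

    #include "maxscale/poll.h"   // declares the new poll_waitevents() signature

    struct MyLoop                // hypothetical owner of the epoll descriptor
    {
        int epoll_fd;            // e.g. obtained from epoll_create1(0)
        int thread_id;
        std::atomic<bool> stop{false};
    };

    // Matches bool (*should_terminate)(void* data) from the new declaration.
    static bool my_should_terminate(void* data)
    {
        return static_cast<MyLoop*>(data)->stop.load();
    }

    static void run_loop(MyLoop& loop)
    {
        // Blocks here dispatching events until my_should_terminate() returns
        // true, mirroring Worker::run()'s poll_waitevents(m_epoll_fd, m_id, ...).
        poll_waitevents(loop.epoll_fd, loop.thread_id, my_should_terminate, &loop);
    }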