diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h
index 98237c685..80ac0b32d 100644
--- a/include/maxscale/dcb.h
+++ b/include/maxscale/dcb.h
@@ -227,6 +227,7 @@ typedef struct dcb
     DCBEVENTQ       evq;        /**< The event queue for this DCB */
     int             fd;         /**< The descriptor */
     dcb_state_t     state;      /**< Current descriptor state */
+    int             owner;      /**< Owning thread */
     SSL_STATE       ssl_state;  /**< Current state of SSL if in use */
     int             flags;      /**< DCB flags */
     char            *remote;    /**< Address of remote end */
diff --git a/server/core/poll.c b/server/core/poll.c
index 90295a9bb..7b9b75768 100644
--- a/server/core/poll.c
+++ b/server/core/poll.c
@@ -81,7 +81,8 @@ int max_poll_sleep;
  */
 #define MUTEX_EPOLL 0

-static int epoll_fd = -1; /*< The epoll file descriptor */
+static int *epoll_fd; /*< The epoll file descriptor */
+static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
 static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
 static GWBITMASK poll_mask;
 #if MUTEX_EPOLL
@@ -89,7 +90,7 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
 #endif
 static int n_waiting = 0; /*< No. of threads in epoll_wait */

-static int process_pollq(int thread_id);
+static int process_pollq(int thread_id, struct epoll_event *event);
 static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev);
 static bool poll_dcb_session_check(DCB *dcb, const char *);

@@ -206,26 +207,30 @@ static int poll_resolve_error(DCB *, int, bool);
 void
 poll_init()
 {
-    int i;
+    n_threads = config_threadcount();

-    if (epoll_fd != -1)
+    if (!(epoll_fd = MXS_MALLOC(sizeof(int) * n_threads)))
     {
         return;
     }
-    if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1)
+
+    for (int i = 0; i < n_threads; i++)
     {
-        char errbuf[MXS_STRERROR_BUFLEN];
-        MXS_ERROR("FATAL: Could not create epoll instance: %s", strerror_r(errno, errbuf, sizeof(errbuf)));
-        exit(-1);
+        if ((epoll_fd[i] = epoll_create(MAX_EVENTS)) == -1)
+        {
+            char errbuf[MXS_STRERROR_BUFLEN];
+            MXS_ERROR("FATAL: Could not create epoll instance: %s", strerror_r(errno, errbuf, sizeof(errbuf)));
+            exit(-1);
+        }
     }
+
     memset(&pollStats, 0, sizeof(pollStats));
     memset(&queueStats, 0, sizeof(queueStats));
     bitmask_init(&poll_mask);
-    n_threads = config_threadcount();
     thread_data = (THREAD_DATA *)MXS_MALLOC(n_threads * sizeof(THREAD_DATA));
     if (thread_data)
     {
-        for (i = 0; i < n_threads; i++)
+        for (int i = 0; i < n_threads; i++)
         {
             thread_data[i].state = THREAD_STOPPED;
         }
@@ -254,13 +259,13 @@ poll_init()
     n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
     avg_samples = (double *)MXS_MALLOC(sizeof(double) * n_avg_samples);
     MXS_ABORT_IF_NULL(avg_samples);
-    for (i = 0; i < n_avg_samples; i++)
+    for (int i = 0; i < n_avg_samples; i++)
     {
         avg_samples[i] = 0.0;
     }
     evqp_samples = (int *)MXS_MALLOC(sizeof(int) * n_avg_samples);
     MXS_ABORT_IF_NULL(evqp_samples);
-    for (i = 0; i < n_avg_samples; i++)
+    for (int i = 0; i < n_avg_samples; i++)
     {
         evqp_samples[i] = 0.0;
     }
@@ -339,7 +344,20 @@ poll_add_dcb(DCB *dcb)
      * The only possible failure that will not cause a crash is
      * running out of system resources.
      */
-    rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev);
+    int owner = 0;
+
+    if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
+    {
+        owner = dcb->session->client_dcb->owner;
+    }
+    else
+    {
+        owner = (unsigned int)atomic_add(&next_epoll_fd, 1) % n_threads;
+    }
+
+    dcb->owner = owner;
+
+    rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev);
     if (rc)
     {
         /* Some errors are actually considered acceptable */
@@ -406,7 +424,7 @@ poll_remove_dcb(DCB *dcb)
     spinlock_release(&dcb->dcb_initlock);
     if (dcbfd > 0)
     {
-        rc = epoll_ctl(epoll_fd[dcb->owner], EPOLL_CTL_DEL, dcbfd, &ev);
         /**
          * The poll_resolve_error function will always
          * return 0 or crash. So if it returns non-zero result,
@@ -570,11 +588,6 @@ poll_waitevents(void *arg)

     while (1)
     {
-        if (pollStats.evq_pending == 0 && timeout_bias < 10)
-        {
-            timeout_bias++;
-        }
-
         atomic_add(&n_waiting, 1);
 #if BLOCKINGPOLL
         nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
@@ -589,7 +602,7 @@
         }
         ts_stats_increment(pollStats.n_polls, thread_id);

-        if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
+        if ((nfds = epoll_wait(epoll_fd[thread_id], events, MAX_EVENTS, 0)) == -1)
         {
             atomic_add(&n_waiting, -1);
             int eno = errno;
@@ -609,14 +622,18 @@
          * We calculate a timeout bias to alter the length of the blocking
          * call based on the time since we last received an event to process
          */
-        else if (nfds == 0 && pollStats.evq_pending == 0 && poll_spins++ > number_poll_spins)
+        else if (nfds == 0 && poll_spins++ > number_poll_spins)
         {
+            if (timeout_bias < 10)
+            {
+                timeout_bias++;
+            }
             ts_stats_increment(pollStats.blockingpolls, thread_id);
-            nfds = epoll_wait(epoll_fd,
+            nfds = epoll_wait(epoll_fd[thread_id],
                               events,
                               MAX_EVENTS,
                               (max_poll_sleep * timeout_bias) / 10);
-            if (nfds == 0 && pollStats.evq_pending)
+            if (nfds == 0)
             {
                 atomic_add(&pollStats.wake_evqpending, 1);
                 poll_spins = 0;
@@ -671,47 +688,6 @@ poll_waitevents(void *arg)
              * idle and is added to the queue to process after
              * setting the event bits.
              */
-            for (i = 0; i < nfds; i++)
-            {
-                DCB *dcb = (DCB *)events[i].data.ptr;
-                __uint32_t ev = events[i].events;
-
-                spinlock_acquire(&pollqlock);
-                if (DCB_POLL_BUSY(dcb))
-                {
-                    if (dcb->evq.pending_events == 0)
-                    {
-                        pollStats.evq_pending++;
-                        dcb->evq.inserted = hkheartbeat;
-                    }
-                    dcb->evq.pending_events |= ev;
-                }
-                else
-                {
-                    dcb->evq.pending_events = ev;
-                    if (eventq)
-                    {
-                        dcb->evq.prev = eventq->evq.prev;
-                        eventq->evq.prev->evq.next = dcb;
-                        eventq->evq.prev = dcb;
-                        dcb->evq.next = eventq;
-                    }
-                    else
-                    {
-                        eventq = dcb;
-                        dcb->evq.prev = dcb;
-                        dcb->evq.next = dcb;
-                    }
-                    pollStats.evq_length++;
-                    pollStats.evq_pending++;
-                    dcb->evq.inserted = hkheartbeat;
-                    if (pollStats.evq_length > pollStats.evq_max)
-                    {
-                        pollStats.evq_max = pollStats.evq_length;
-                    }
-                }
-                spinlock_release(&pollqlock);
-            }
         }

         /*
@@ -718,11 +694,12 @@ poll_waitevents(void *arg)
          * Process of the queue of waiting requests
          * This is done without checking the evq_pending count as a
          * precautionary measure to avoid issues if the house keeping
          * of the count goes wrong.
          */
-        if (process_pollq(thread_id))
+
+        for (int i = 0; i < nfds; i++)
         {
-            timeout_bias = 1;
+            process_pollq(thread_id, &events[i]);
         }

         if (check_timeouts && hkheartbeat >= next_timeout_check)
@@ -811,61 +788,14 @@ poll_set_maxwait(unsigned int maxwait)
  * @return 0 if no DCB's have been processed
  */
 static int
-process_pollq(int thread_id)
+process_pollq(int thread_id, struct epoll_event *event)
 {
-    DCB *dcb;
     int found = 0;
-    uint32_t ev;
+    uint32_t ev = event->events;
     unsigned long qtime;

-    spinlock_acquire(&pollqlock);
-    if (eventq == NULL)
-    {
-        /* Nothing to process */
-        spinlock_release(&pollqlock);
-        return 0;
-    }
-    dcb = eventq;
-    if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0)
-    {
-        found = 1;
-        dcb->evq.processing = 1;
-    }
-    else if (dcb->evq.next == dcb->evq.prev)
-    {
-        /* Only item in queue is being processed */
-        spinlock_release(&pollqlock);
-        return 0;
-    }
-    else
-    {
-        do
-        {
-            dcb = dcb->evq.next;
-        }
-        while (dcb != eventq && dcb->evq.processing == 1);
-
-        if (dcb->evq.processing == 0)
-        {
-            /* Found DCB to process */
-            dcb->evq.processing = 1;
-            found = 1;
-        }
-    }
-    if (found)
-    {
-        ev = dcb->evq.pending_events;
-        dcb->evq.processing_events = ev;
-        dcb->evq.pending_events = 0;
-        pollStats.evq_pending--;
-        ss_dassert(pollStats.evq_pending >= 0);
-    }
-    spinlock_release(&pollqlock);
-
-    if (found == 0)
-    {
-        return 0;
-    }
+    DCB *dcb = event->data.ptr;
+    atomic_add(&pollStats.evq_pending, -1);

 #if PROFILE_POLL
     memlog_log(plog, hkheartbeat - dcb->evq.inserted);
@@ -1100,59 +1030,8 @@ process_pollq(int thread_id)
         queueStats.maxexectime = qtime;
     }

-    spinlock_acquire(&pollqlock);
-    dcb->evq.processing_events = 0;
-
-    if (dcb->evq.pending_events == 0)
-    {
-        /* No pending events so remove from the queue */
-        if (dcb->evq.prev != dcb)
-        {
-            dcb->evq.prev->evq.next = dcb->evq.next;
-            dcb->evq.next->evq.prev = dcb->evq.prev;
-            if (eventq == dcb)
-            {
-                eventq = dcb->evq.next;
-            }
-        }
-        else
-        {
-            eventq = NULL;
-        }
-        dcb->evq.next = NULL;
-        dcb->evq.prev = NULL;
-        pollStats.evq_length--;
-    }
-    else
-    {
-        /*
-         * We have a pending event, move to the end of the queue
-         * if there are any other DCB's in the queue.
-         *
-         * If we are the first item on the queue this is easy, we
-         * just bump the eventq pointer.
-         */
-        if (dcb->evq.prev != dcb)
-        {
-            if (eventq == dcb)
-            {
-                eventq = dcb->evq.next;
-            }
-            else
-            {
-                dcb->evq.prev->evq.next = dcb->evq.next;
-                dcb->evq.next->evq.prev = dcb->evq.prev;
-                dcb->evq.prev = eventq->evq.prev;
-                dcb->evq.next = eventq;
-                eventq->evq.prev = dcb;
-                dcb->evq.prev->evq.next = dcb;
-            }
-        }
-    }
-    dcb->evq.processing = 0;
     /** Reset session id from thread's local storage */
     mxs_log_tls.li_sesid = 0;
-    spinlock_release(&pollqlock);

     return 1;
 }
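The sketch below is review context only: a minimal, self-contained illustration of the pattern this patch introduces, namely one epoll instance per worker thread, with every file descriptor bound to a single owning thread chosen by an atomic round-robin counter. The patch additionally pins backend DCBs to their client DCB's owner so a whole session stays on one thread; the sketch uses plain round-robin for everything. All names here (N_THREADS, add_fd, worker) are invented for the example and are not MaxScale identifiers.

/*
 * Per-thread epoll sketch (illustrative only, not MaxScale code).
 * Build: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

#define N_THREADS  4
#define MAX_EVENTS 64

static int epoll_fds[N_THREADS]; /* one epoll instance per worker thread */
static atomic_int next_owner;    /* round-robin owner assignment counter */

/* Register an fd with exactly one worker's epoll instance. Because only
 * that worker ever sees the fd's events, they are processed serially by
 * one thread and no cross-thread event-queue locking is needed. */
static int add_fd(int fd)
{
    int owner = atomic_fetch_add(&next_owner, 1) % N_THREADS;
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };

    if (epoll_ctl(epoll_fds[owner], EPOLL_CTL_ADD, fd, &ev) == -1)
    {
        perror("epoll_ctl");
        return -1;
    }
    return owner;
}

static void *worker(void *arg)
{
    int thread_id = (int)(intptr_t)arg;
    struct epoll_event events[MAX_EVENTS];

    for (;;)
    {
        /* Each worker waits only on its own epoll instance, mirroring
         * the epoll_fd[thread_id] indexing added by the patch. */
        int nfds = epoll_wait(epoll_fds[thread_id], events, MAX_EVENTS, 1000);

        for (int i = 0; i < nfds; i++)
        {
            char buf[256];
            ssize_t n = read(events[i].data.fd, buf, sizeof(buf));
            printf("thread %d: %zd bytes from fd %d\n",
                   thread_id, n, events[i].data.fd);
        }
    }
    return NULL;
}

int main(void)
{
    pthread_t threads[N_THREADS];

    for (int i = 0; i < N_THREADS; i++)
    {
        if ((epoll_fds[i] = epoll_create(MAX_EVENTS)) == -1)
        {
            perror("epoll_create");
            return EXIT_FAILURE;
        }
    }

    for (int i = 0; i < N_THREADS; i++)
    {
        pthread_create(&threads[i], NULL, worker, (void *)(intptr_t)i);
    }

    /* Demo traffic: one pipe per worker, assigned round-robin. */
    for (int i = 0; i < N_THREADS; i++)
    {
        int p[2];
        if (pipe(p) == 0 && add_fd(p[0]) >= 0)
        {
            (void)write(p[1], "ping", 4);
        }
    }

    sleep(1); /* let the workers drain the demo events, then exit */
    return EXIT_SUCCESS;
}

The design point the sketch makes explicit is the one the rest of the patch relies on: since a descriptor is registered with exactly one epoll instance, its events can never be handled by two threads at once, which is what allows process_pollq() to drop the pollqlock-protected event queue and the DCB_POLL_BUSY bookkeeping entirely.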