From 92ce8a47ba9c6d80b53ed50f31b474d7d08829a9 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 24 Sep 2014 16:34:34 +0100 Subject: [PATCH] Introduction of a new polling mechanism to make the thread usage more fair and avoid having busy DCB's being able to block execution of events on less busy DCBs --- server/core/dcb.c | 113 +------- server/core/poll.c | 619 ++++++++++++++++++++++++++----------------- server/include/dcb.h | 20 +- 3 files changed, 394 insertions(+), 358 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 1706da044..94fad72f6 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -141,6 +141,13 @@ DCB *rval; rval->polloutbusy = 0; rval->writecheck = 0; rval->fd = -1; + + rval->evq.next = NULL; + rval->evq.prev = NULL; + rval->evq.pending_events = 0; + rval->evq.processing = 0; + spinlock_init(&rval->evq.eventqlock); + memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->state = DCB_STATE_ALLOC; bitmask_init(&rval->memdata.bitmask); @@ -446,9 +453,10 @@ bool succp = false; CHK_DCB(ptr); /* - * Skip processing of the excluded DCB + * Skip processing of the excluded DCB or DCB's that are + * in the event queue waiting to be processed. */ - if (ptr == excluded) + if (ptr == excluded || ptr->evq.next || ptr->evq.prev) { lptr = ptr; ptr = ptr->memdata.next; @@ -1316,10 +1324,6 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); - dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); - dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); - dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); if (dcb->flags & DCBF_CLONE) @@ -1427,10 +1431,6 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); - dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); - dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); - dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", @@ -1955,99 +1955,6 @@ int rval = 0; return rval; } -/** - * Called by the EPOLLIN event. Take care of calling the protocol - * read entry point and managing multiple threads competing for the DCB - * without blocking those threads. - * - * This mechanism does away with the need for a mutex on the EPOLLIN event - * and instead implements a queuing mechanism in which nested events are - * queued on the DCB such that when the thread processing the first event - * returns it will read the queued event and process it. This allows the - * thread that would otherwise have to wait to process the nested event - * to return immediately and and process other events. 
- *
- * @param dcb The DCB that has data available
- * @param thread_id The ID of the calling thread
- * @param nozombies If non-zero then do not do zombie processing
- */
-void
-dcb_pollin(DCB *dcb, int thread_id, int nozombies)
-{
-
- spinlock_acquire(&dcb->pollinlock);
- if (dcb->pollinbusy == 0)
- {
- dcb->pollinbusy = 1;
- do {
- if (dcb->readcheck)
- {
- dcb->stats.n_readrechecks++;
- if (!nozombies)
- dcb_process_zombies(thread_id, dcb);
- }
- dcb->readcheck = 0;
- spinlock_release(&dcb->pollinlock);
- dcb->func.read(dcb);
- spinlock_acquire(&dcb->pollinlock);
- } while (dcb->readcheck);
- dcb->pollinbusy = 0;
- }
- else
- {
- dcb->stats.n_busypolls++;
- dcb->readcheck = 1;
- }
- spinlock_release(&dcb->pollinlock);
-}
-
-
-/**
- * Called by the EPOLLOUT event. Take care of calling the protocol
- * write_ready entry point and managing multiple threads competing for the DCB
- * without blocking those threads.
- *
- * This mechanism does away with the need for a mutex on the EPOLLOUT event
- * and instead implements a queuing mechanism in which nested events are
- * queued on the DCB such that when the thread processing the first event
- * returns it will read the queued event and process it. This allows the
- * thread that would otherwise have to wait to process the nested event
- * to return immediately and and process other events.
- *
- * @param dcb The DCB thats available for writes
- * @param thread_id The ID of the calling thread
- * @param nozombies If non-zero then do not do zombie processing
- */
-void
-dcb_pollout(DCB *dcb, int thread_id, int nozombies)
-{
-
- spinlock_acquire(&dcb->polloutlock);
- if (dcb->polloutbusy == 0)
- {
- dcb->polloutbusy = 1;
- do {
- if (dcb->writecheck)
- {
- if (!nozombies)
- dcb_process_zombies(thread_id, dcb);
- dcb->stats.n_writerechecks++;
- }
- dcb->writecheck = 0;
- spinlock_release(&dcb->polloutlock);
- dcb->func.write_ready(dcb);
- spinlock_acquire(&dcb->polloutlock);
- } while (dcb->writecheck);
- dcb->polloutbusy = 0;
- }
- else
- {
- dcb->stats.n_busywrpolls++;
- dcb->writecheck = 1;
- }
- spinlock_release(&dcb->polloutlock);
-}
-
 /**
 * Get the next DCB in the list of all DCB's
diff --git a/server/core/poll.c b/server/core/poll.c
index 781b05b2c..3d396a76d 100644
--- a/server/core/poll.c
+++ b/server/core/poll.c
@@ -69,6 +69,11 @@ static GWBITMASK poll_mask;
 static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
 #endif
 static int n_waiting = 0; /*< No. of threads in epoll_wait */
+static int process_pollq(int thread_id);
+
+
+DCB *eventq = NULL;
+SPINLOCK pollqlock = SPINLOCK_INIT;
 /**
 * Thread load average, this is the average number of descriptors in each
@@ -129,6 +134,8 @@ static struct {
 int n_nothreads; /*< Number of times no threads are polling */
 int n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
+ int evq_length; /*< Event queue length */
+ int evq_max; /*< Maximum event queue length */
 } pollStats;
 /**
@@ -338,6 +345,17 @@ return_rc:
 * deschedule a process if a timeout is included, but will not do this if a 0 timeout
 * value is given. this improves performance when the gateway is under heavy load.
 *
+ * In order to provide a fairer means of sharing the threads between the different
+ * DCB's the poll mechanism has been decoupled from the processing of the events.
+ * The events are now received via the epoll_wait call, a queue of DCB's that have
+ * events pending is maintained and as new events arrive the DCB is added to the end
+ * of this queue.
+ * If an event arrives for a DCB already in the queue, then the event
+ * bits are added to the DCB but the DCB maintains the same point in the queue unless
+ * the original events are already being processed. If they are being processed then
+ * the DCB is moved to the back of the queue. This means that a DCB that is receiving
+ * events at a high rate will not block the execution of events for other DCB's and
+ * should result in a fairer polling strategy.
+ *
 * @param arg The thread ID passed as a void * to satisfy the threading package
 */
 void
@@ -346,12 +364,7 @@ poll_waitevents(void *arg)
 struct epoll_event events[MAX_EVENTS];
 int i, nfds;
 int thread_id = (int)arg;
-bool no_op = false;
-static bool process_zombies_only = false; /*< flag for all threads */
 DCB *zombies = NULL;
-#if PROFILE_POLL
-CYCLES cycles[2];
-#endif
 /** Add this thread to the bitmask of running polling threads */
 bitmask_set(&poll_mask, thread_id);
@@ -365,20 +378,15 @@ CYCLES cycles[2];
 while (1)
 {
+ /* Process the queue of waiting requests */
+ while (process_pollq(thread_id))
+ zombies = dcb_process_zombies(thread_id, NULL);
+
 atomic_add(&n_waiting, 1);
#if BLOCKINGPOLL
 nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
 atomic_add(&n_waiting, -1);
#else /* BLOCKINGPOLL */
- if (!no_op) {
- LOGIF(LD, (skygw_log_write(
- LOGFILE_DEBUG,
- "%lu [poll_waitevents] MaxScale thread "
- "%d > epoll_wait <",
- pthread_self(),
- thread_id)));
- no_op = TRUE;
- }
#if MUTEX_EPOLL
 simple_mutex_lock(&epoll_wait_mutex, TRUE);
#endif
@@ -399,32 +407,14 @@ CYCLES cycles[2];
 pthread_self(),
 nfds,
 eno)));
- no_op = FALSE;
 }
 else if (nfds == 0)
 {
- atomic_add(&n_waiting, -1);
- if (process_zombies_only) {
-#if MUTEX_EPOLL
- simple_mutex_unlock(&epoll_wait_mutex);
-#endif
- goto process_zombies;
- } else {
- nfds = epoll_wait(epoll_fd,
+ atomic_add(&n_waiting, 1);
+ nfds = epoll_wait(epoll_fd,
 events,
 MAX_EVENTS,
 EPOLL_TIMEOUT);
- /*<
- * When there are zombies to be cleaned up but
- * no client requests, allow all threads to call
- * dcb_process_zombies without having to wait
- * for the timeout.
- */
- if (nfds == 0 && dcb_get_zombies() != NULL)
- {
- process_zombies_only = true;
- }
- }
 }
 else
 {
@@ -460,241 +450,60 @@ CYCLES cycles[2];
 atomic_add(&load_samples, 1);
 atomic_add(&load_nfds, nfds);
+ /*
+ * Process every DCB that has a new event and add
+ * it to the poll queue.
+ * If the DCB is currently being processed then we
+ * OR in the new event bits to the pending event bits
+ * and leave it in the queue.
+ * If the DCB was not already in the queue then it was
+ * idle and is added to the queue to process after
+ * setting the event bits.
+ */ for (i = 0; i < nfds; i++) { DCB *dcb = (DCB *)events[i].data.ptr; __uint32_t ev = events[i].events; -#if PROFILE_POLL - if (i > 0) + spinlock_acquire(&pollqlock); + if (DCB_POLL_BUSY(dcb)) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Delayed behind event that " - "took %ld cycles", - cycles[1] - cycles[0]))); + dcb->evq.pending_events |= ev; } - cycles[0] = rdtsc(); -#endif - - CHK_DCB(dcb); - if (thread_data) + else { - thread_data[thread_id].cur_dcb = dcb; - thread_data[thread_id].event = ev; - } - -#if defined(SS_DEBUG) - if (dcb_fake_write_ev[dcb->fd] != 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Added fake events %d to ev %d.", - pthread_self(), - dcb_fake_write_ev[dcb->fd], - ev))); - ev |= dcb_fake_write_ev[dcb->fd]; - dcb_fake_write_ev[dcb->fd] = 0; - } -#endif - ss_debug(spinlock_acquire(&dcb->dcb_initlock);) - ss_dassert(dcb->state != DCB_STATE_ALLOC); - ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); - ss_dassert(dcb->state != DCB_STATE_FREED); - ss_debug(spinlock_release(&dcb->dcb_initlock);) - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] event %d dcb %p " - "role %s", - pthread_self(), - ev, - dcb, - STRDCBROLE(dcb->dcb_role)))); - - if (ev & EPOLLOUT) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - if (eno == 0) { -#if MUTEX_BLOCK - simple_mutex_lock( - &dcb->dcb_write_lock, - true); - ss_info_dassert( - !dcb->dcb_write_active, - "Write already active"); - dcb->dcb_write_active = TRUE; - atomic_add( - &pollStats.n_write, - 1); - dcb->func.write_ready(dcb); - dcb->dcb_write_active = FALSE; - simple_mutex_unlock( - &dcb->dcb_write_lock); -#else - atomic_add(&pollStats.n_write, - 1); - dcb_pollout(dcb, thread_id, nfds); -#endif - } else { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLOUT due %d, %s. " - "dcb %p, fd %i", - pthread_self(), - eno, - strerror(eno), - dcb, - dcb->fd))); - } - } - if (ev & EPOLLIN) - { -#if MUTEX_BLOCK - simple_mutex_lock(&dcb->dcb_read_lock, - true); - ss_info_dassert(!dcb->dcb_read_active, - "Read already active"); - dcb->dcb_read_active = TRUE; -#endif - - if (dcb->state == DCB_STATE_LISTENING) + dcb->evq.pending_events = ev; + if (eventq) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Accept in fd %d", - pthread_self(), - dcb->fd))); - atomic_add( - &pollStats.n_accept, 1); - dcb->func.accept(dcb); - } - else - { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Read in dcb %p fd %d", - pthread_self(), - dcb, - dcb->fd))); - atomic_add(&pollStats.n_read, 1); -#if MUTEX_BLOCK - dcb->func.read(dcb); -#else - dcb_pollin(dcb, thread_id, nfds); -#endif - } -#if MUTEX_BLOCK - dcb->dcb_read_active = FALSE; - simple_mutex_unlock( - &dcb->dcb_read_lock); -#endif - } - if (ev & EPOLLERR) - { - int eno = gw_getsockerrno(dcb->fd); -#if defined(SS_DEBUG) - if (eno == 0) { - eno = dcb_fake_write_errno[dcb->fd]; - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Added fake errno %d. 
" - "%s", - pthread_self(), - eno, - strerror(eno)))); - } - dcb_fake_write_errno[dcb->fd] = 0; -#endif - if (eno != 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLERR due %d, %s.", - pthread_self(), - eno, - strerror(eno)))); - } - atomic_add(&pollStats.n_error, 1); - dcb->func.error(dcb); - } - - if (ev & EPOLLHUP) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLHUP on dcb %p, fd %d. " - "Errno %d, %s.", - pthread_self(), - dcb, - dcb->fd, - eno, - strerror(eno)))); - atomic_add(&pollStats.n_hup, 1); - spinlock_acquire(&dcb->dcb_initlock); - if ((dcb->flags & DCBF_HUNG) == 0) - { - dcb->flags |= DCBF_HUNG; - spinlock_release(&dcb->dcb_initlock); - dcb->func.hangup(dcb); + dcb->evq.prev = eventq->evq.prev; + eventq->evq.prev->evq.next = dcb; + eventq->evq.prev = dcb; + dcb->evq.next = eventq; } else - spinlock_release(&dcb->dcb_initlock); - } - - if (ev & EPOLLRDHUP) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLRDHUP on dcb %p, fd %d. " - "Errno %d, %s.", - pthread_self(), - dcb, - dcb->fd, - eno, - strerror(eno)))); - atomic_add(&pollStats.n_hup, 1); - spinlock_acquire(&dcb->dcb_initlock); - if ((dcb->flags & DCBF_HUNG) == 0) { - dcb->flags |= DCBF_HUNG; - spinlock_release(&dcb->dcb_initlock); - dcb->func.hangup(dcb); + eventq = dcb; + dcb->evq.prev = dcb; + dcb->evq.next = dcb; + } + pollStats.evq_length++; + if (pollStats.evq_length > pollStats.evq_max) + { + pollStats.evq_max = pollStats.evq_length; } - else - spinlock_release(&dcb->dcb_initlock); } -#if PROFILE_POLL - cycles[1] = rdtsc(); -#endif - } /*< for */ - no_op = FALSE; + spinlock_release(&pollqlock); + } + + /*< for */ } -process_zombies: + if (thread_data) { thread_data[thread_id].state = THREAD_ZPROCESSING; } zombies = dcb_process_zombies(thread_id, NULL); - if (zombies == NULL) { - process_zombies_only = false; - } - if (do_shutdown) { /*< @@ -717,6 +526,316 @@ process_zombies: mysql_thread_end(); } +/** + * Process of the queue of DCB's that have outstanding events + * + * The first event on the queue will be chosen to be executed by this thread, + * all other events will be left on the queue and may be picked up by other + * threads. When the processing is complete the thread will take the DCB off the + * queue if there are no pending events that have arrived since the thread started + * to process the DCB. If there are pending events the DCB will be moved to the + * back of the queue so that other DCB's will have a share of the threads to + * execute events for them. 
+ * + * @param thread_id The thread ID of the calling thread + * @return 0 if no DCB's have been processed + */ +static int +process_pollq(int thread_id) +{ +DCB *dcb; +int found = 0; +uint32_t ev; + + spinlock_acquire(&pollqlock); + if (eventq == NULL) + { + /* Nothing to process */ + spinlock_release(&pollqlock); + return 0; + } + dcb = eventq; + if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0) + { + found = 1; + dcb->evq.processing = 1; + } + else if (dcb->evq.next == dcb->evq.prev) + { + /* Only item in queue is being processed */ + spinlock_release(&pollqlock); + return 0; + } + else + { + do { + dcb = dcb->evq.next; + } while (dcb != eventq && dcb->evq.processing == 1); + + if (dcb->evq.processing == 0) + { + /* Found DCB to process */ + dcb->evq.processing = 1; + found = 1; + } + } + if (found) + { + ev = dcb->evq.pending_events; + dcb->evq.pending_events = 0; + } + spinlock_release(&pollqlock); + + if (found == 0) + return 0; + + + CHK_DCB(dcb); + if (thread_data) + { + thread_data[thread_id].state = THREAD_PROCESSING; + thread_data[thread_id].cur_dcb = dcb; + thread_data[thread_id].event = ev; + } + +#if defined(SS_DEBUG) + if (dcb_fake_write_ev[dcb->fd] != 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Added fake events %d to ev %d.", + pthread_self(), + dcb_fake_write_ev[dcb->fd], + ev))); + ev |= dcb_fake_write_ev[dcb->fd]; + dcb_fake_write_ev[dcb->fd] = 0; + } +#endif + ss_debug(spinlock_acquire(&dcb->dcb_initlock);) + ss_dassert(dcb->state != DCB_STATE_ALLOC); + ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); + ss_dassert(dcb->state != DCB_STATE_FREED); + ss_debug(spinlock_release(&dcb->dcb_initlock);) + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] event %d dcb %p " + "role %s", + pthread_self(), + ev, + dcb, + STRDCBROLE(dcb->dcb_role)))); + + if (ev & EPOLLOUT) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + if (eno == 0) { +#if MUTEX_BLOCK + simple_mutex_lock( + &dcb->dcb_write_lock, + true); + ss_info_dassert( + !dcb->dcb_write_active, + "Write already active"); + dcb->dcb_write_active = TRUE; + atomic_add( + &pollStats.n_write, + 1); + dcb->func.write_ready(dcb); + dcb->dcb_write_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_write_lock); +#else + atomic_add(&pollStats.n_write, + 1); + dcb->func.write_ready(dcb); +#endif + } else { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLOUT due %d, %s. 
" + "dcb %p, fd %i", + pthread_self(), + eno, + strerror(eno), + dcb, + dcb->fd))); + } + } + if (ev & EPOLLIN) + { +#if MUTEX_BLOCK + simple_mutex_lock(&dcb->dcb_read_lock, + true); + ss_info_dassert(!dcb->dcb_read_active, + "Read already active"); + dcb->dcb_read_active = TRUE; +#endif + + if (dcb->state == DCB_STATE_LISTENING) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Accept in fd %d", + pthread_self(), + dcb->fd))); + atomic_add( + &pollStats.n_accept, 1); + dcb->func.accept(dcb); + } + else + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Read in dcb %p fd %d", + pthread_self(), + dcb, + dcb->fd))); + atomic_add(&pollStats.n_read, 1); + dcb->func.read(dcb); + } +#if MUTEX_BLOCK + dcb->dcb_read_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_read_lock); +#endif + } + if (ev & EPOLLERR) + { + int eno = gw_getsockerrno(dcb->fd); +#if defined(SS_DEBUG) + if (eno == 0) { + eno = dcb_fake_write_errno[dcb->fd]; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Added fake errno %d. " + "%s", + pthread_self(), + eno, + strerror(eno)))); + } + dcb_fake_write_errno[dcb->fd] = 0; +#endif + if (eno != 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLERR due %d, %s.", + pthread_self(), + eno, + strerror(eno)))); + } + atomic_add(&pollStats.n_error, 1); + dcb->func.error(dcb); + } + + if (ev & EPOLLHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); + } + + if (ev & EPOLLRDHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLRDHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); + } + + spinlock_acquire(&pollqlock); + if (dcb->evq.pending_events == 0) + { + /* No pending events so remove from the queue */ + if (dcb->evq.prev != dcb) + { + dcb->evq.prev->evq.next = dcb->evq.next; + dcb->evq.next->evq.prev = dcb->evq.prev; + if (eventq == dcb) + eventq = dcb->evq.next; + } + else + { + eventq = NULL; + } + dcb->evq.next = NULL; + dcb->evq.prev = NULL; + pollStats.evq_length--; + } + else + { + /* + * We have a pending event, move to the end of the queue + * if there are any other DCB's in the queue. + * + * If we are the first item on the queue this is easy, we + * just bump the eventq pointer. 
+ */ + if (dcb->evq.prev != dcb) + { + if (eventq == dcb) + eventq = dcb->evq.next; + else + { + dcb->evq.prev->evq.next = dcb->evq.next; + dcb->evq.next->evq.prev = dcb->evq.prev; + dcb->evq.prev = eventq->evq.prev; + dcb->evq.next = eventq; + eventq->evq.prev = dcb; + dcb->evq.prev->evq.next = dcb; + } + } + } + dcb->evq.processing = 0; + spinlock_release(&pollqlock); +} + /** * Shutdown the polling loop */ @@ -761,6 +880,10 @@ int i; pollStats.n_accept); dcb_printf(dcb, "Number of times no threads polling: %d\n", pollStats.n_nothreads); + dcb_printf(dcb, "Current event queue length: %d\n", + pollStats.evq_length); + dcb_printf(dcb, "Maximum event queue length: %d\n", + pollStats.evq_max); dcb_printf(dcb, "No of poll completions with descriptors\n"); dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n"); diff --git a/server/include/dcb.h b/server/include/dcb.h index 06687f349..5e14fc235 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -53,7 +53,8 @@ struct service; * 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb * 07/05/2014 Mark Riddoch Addition of callback mechanism * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks - * 27/08/2014 Mark Ridddoch Addition of write event queuing + * 27/08/2014 Mark Riddoch Addition of write event queuing + * 23/09/2014 Mark Riddoch New poll processing queue * * @endverbatim */ @@ -97,6 +98,14 @@ typedef struct gw_protocol { int (*session)(struct dcb *, void *); } GWPROTOCOL; +typedef struct { + struct dcb *next; + struct dcb *prev; + uint32_t pending_events; + int processing; + SPINLOCK eventqlock; +} DCBEVENTQ; + /** * The GWPROTOCOL version data. The following should be updated whenever * the GWPROTOCOL structure is changed. See the rules defined in modinfo.h @@ -114,10 +123,6 @@ typedef struct dcbstats { int n_buffered; /*< Number of buffered writes */ int n_high_water; /*< Number of crosses of high water mark */ int n_low_water; /*< Number of crosses of low water mark */ - int n_busypolls; /*< Number of read polls whiel reading */ - int n_readrechecks; /*< Number of rechecks for reads */ - int n_busywrpolls; /*< Number of write polls while writing */ - int n_writerechecks;/*< Number of rechecks for writes */ } DCBSTATS; /** @@ -204,6 +209,7 @@ typedef struct dcb { #endif dcb_role_t dcb_role; SPINLOCK dcb_initlock; + DCBEVENTQ evq; /**< The event queue for this DCB */ #if 1 simple_mutex_t dcb_read_lock; simple_mutex_t dcb_write_lock; @@ -271,8 +277,8 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) -void dcb_pollin(DCB *, int, int); -void dcb_pollout(DCB *, int, int); +#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL) + DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG)