Refinement of polling process

This commit is contained in:
Mark Riddoch
2014-10-02 17:19:18 +01:00
parent 0bc47b90ff
commit 829a4bcbfa

View File

@ -377,13 +377,30 @@ return_rc:
* events at a high rate will not block the execution of events for other DCB's and * events at a high rate will not block the execution of events for other DCB's and
* should result in a fairer polling strategy. * should result in a fairer polling strategy.
* *
* The introduction of the ability to inject "fake" write events into the event queue meant
* that there was a possibility to "starve" new events sicne the polling loop would
* consume the event queue before lookign for new events. If the DCB that inject
* the fake event then injected another fake event as a result of the first it meant
* that new events did not get added to the queue. The strategy has been updated to
* not consume the entire event queue, but process one event before doing a non-blocking
* call to add any new events before processing any more events. A blocking call to
* collect events is only made if there are no pending events to be processed on the
* event queue.
*
* Also introduced a "timeout bias" mechanism. This mechansim control the length of
* of timeout passed to epoll_wait in blocking calls based on previous behaviour.
* The initial call will block for 10% of the define timeout peroid, this will be
* increased in increments of 10% until the full timeout value is used. If at any
* point there is an event to be processed then the value will be reduced to 10% again
* for the next blocking call.
*
* @param arg The thread ID passed as a void * to satisfy the threading package * @param arg The thread ID passed as a void * to satisfy the threading package
*/ */
void void
poll_waitevents(void *arg) poll_waitevents(void *arg)
{ {
struct epoll_event events[MAX_EVENTS]; struct epoll_event events[MAX_EVENTS];
int i, nfds; int i, nfds, timeout_bias = 1;
int thread_id = (int)arg; int thread_id = (int)arg;
DCB *zombies = NULL; DCB *zombies = NULL;
@ -399,14 +416,9 @@ DCB *zombies = NULL;
while (1) while (1)
{ {
/* Process of the queue of waiting requests */ if (pollStats.evq_pending == 0 && timeout_bias < 10)
while (do_shutdown == 0 && process_pollq(thread_id))
{ {
if (thread_data) timeout_bias++;
thread_data[thread_id].state = THREAD_ZPROCESSING;
zombies = dcb_process_zombies(thread_id);
if (thread_data)
thread_data[thread_id].state = THREAD_IDLE;
} }
atomic_add(&n_waiting, 1); atomic_add(&n_waiting, 1);
@ -440,13 +452,16 @@ DCB *zombies = NULL;
* If there are no new descriptors from the non-blocking call * If there are no new descriptors from the non-blocking call
* and nothing to proces on the event queue then for do a * and nothing to proces on the event queue then for do a
* blocking call to epoll_wait. * blocking call to epoll_wait.
*
* We calculate a timeout bias to alter the length of the blocking
* call based on the time since we last received an event to process
*/ */
else if (nfds == 0 && process_pollq(thread_id) == 0) else if (nfds == 0 && pollStats.evq_pending > 0)
{ {
nfds = epoll_wait(epoll_fd, nfds = epoll_wait(epoll_fd,
events, events,
MAX_EVENTS, MAX_EVENTS,
EPOLL_TIMEOUT); (EPOLL_TIMEOUT * timeout_bias) / 10);
if (nfds == 0 && pollStats.evq_pending) if (nfds == 0 && pollStats.evq_pending)
atomic_add(&pollStats.wake_evqpending, 1); atomic_add(&pollStats.wake_evqpending, 1);
} }
@ -463,6 +478,7 @@ DCB *zombies = NULL;
#endif /* BLOCKINGPOLL */ #endif /* BLOCKINGPOLL */
if (nfds > 0) if (nfds > 0)
{ {
timeout_bias = 1;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [poll_waitevents] epoll_wait found %d fds", "%lu [poll_waitevents] epoll_wait found %d fds",
@ -542,20 +558,19 @@ DCB *zombies = NULL;
} }
/* /*
* If there was nothing to process then process the zombie queue * Process of the queue of waiting requests
* This is done without checking the evq_pending count as a
* precautionary measure to avoid issues if the house keeping
* of the count goes wrong.
*/ */
if (process_pollq(thread_id) == 0) if (process_pollq(thread_id))
{ timeout_bias = 1;
if (thread_data) if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING; thread_data[thread_id].state = THREAD_ZPROCESSING;
}
zombies = dcb_process_zombies(thread_id); zombies = dcb_process_zombies(thread_id);
if (thread_data) if (thread_data)
{
thread_data[thread_id].state = THREAD_IDLE; thread_data[thread_id].state = THREAD_IDLE;
}
}
if (do_shutdown) if (do_shutdown)
{ {
@ -1152,7 +1167,11 @@ int new_samples, new_nfds;
* queue. * queue.
* *
* This is used to trigger transmission activity on another DCB from * This is used to trigger transmission activity on another DCB from
* within the event processing routine of a DCB. * within the event processing routine of a DCB. or to allow a DCB
* to defer some further output processing, to allow for other DCBs
* to receive a slice of the processing time. Fake events are added
* to the tail of the event queue, in the same way that real events
* are, so maintain the "fairness" of processing.
* *
* @param dcb DCB to emulate an EPOLLOUT event for * @param dcb DCB to emulate an EPOLLOUT event for
*/ */