Introduction of a new polling mechanism to make the thread usage more fair and avoid
having busy DCB's being able to block execution of events on less busy DCBs
This commit is contained in:
@ -69,6 +69,11 @@ static GWBITMASK poll_mask;
|
||||
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
|
||||
#endif
|
||||
static int n_waiting = 0; /*< No. of threads in epoll_wait */
|
||||
static int process_pollq(int thread_id);
|
||||
|
||||
|
||||
DCB *eventq = NULL;
|
||||
SPINLOCK pollqlock = SPINLOCK_INIT;
|
||||
|
||||
/**
|
||||
* Thread load average, this is the average number of descriptors in each
|
||||
@ -129,6 +134,8 @@ static struct {
|
||||
int n_nothreads; /*< Number of times no threads are polling */
|
||||
int n_fds[MAXNFDS]; /*< Number of wakeups with particular
|
||||
n_fds value */
|
||||
int evq_length; /*< Event queue length */
|
||||
int evq_max; /*< Maximum event queue length */
|
||||
} pollStats;
|
||||
|
||||
/**
|
||||
@ -338,6 +345,17 @@ return_rc:
|
||||
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
|
||||
* value is given. this improves performance when the gateway is under heavy load.
|
||||
*
|
||||
* In order to provide a fairer means of sharign the threads between the different
|
||||
* DCB's the poll mechanism has been decoupled from the processing of the events.
|
||||
* The events are now recieved via the epoll_wait call, a queue of DCB's that have
|
||||
* events pending is maintained and as new events arrive the DCB is added to the end
|
||||
* of this queue. If an eent arrives for a DCB alreayd in the queue, then the event
|
||||
* bits are added to the DCB but the DCB mantains the same point inthe queue unless
|
||||
* the original events are already being processed. If they are being processed then
|
||||
* the DCB is moved to the back of the queue, this means that a DCB that is receiving
|
||||
* events at a high rate will not block the execution of events for other DCB's and
|
||||
* should result in a fairer polling strategy.
|
||||
*
|
||||
* @param arg The thread ID passed as a void * to satisfy the threading package
|
||||
*/
|
||||
void
|
||||
@ -346,12 +364,7 @@ poll_waitevents(void *arg)
|
||||
struct epoll_event events[MAX_EVENTS];
|
||||
int i, nfds;
|
||||
int thread_id = (int)arg;
|
||||
bool no_op = false;
|
||||
static bool process_zombies_only = false; /*< flag for all threads */
|
||||
DCB *zombies = NULL;
|
||||
#if PROFILE_POLL
|
||||
CYCLES cycles[2];
|
||||
#endif
|
||||
|
||||
/** Add this thread to the bitmask of running polling threads */
|
||||
bitmask_set(&poll_mask, thread_id);
|
||||
@ -365,20 +378,15 @@ CYCLES cycles[2];
|
||||
|
||||
while (1)
|
||||
{
|
||||
/* Process of the queue of waiting requests */
|
||||
while (process_pollq(thread_id))
|
||||
zombies = dcb_process_zombies(thread_id, NULL);
|
||||
|
||||
atomic_add(&n_waiting, 1);
|
||||
#if BLOCKINGPOLL
|
||||
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
|
||||
atomic_add(&n_waiting, -1);
|
||||
#else /* BLOCKINGPOLL */
|
||||
if (!no_op) {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] MaxScale thread "
|
||||
"%d > epoll_wait <",
|
||||
pthread_self(),
|
||||
thread_id)));
|
||||
no_op = TRUE;
|
||||
}
|
||||
#if MUTEX_EPOLL
|
||||
simple_mutex_lock(&epoll_wait_mutex, TRUE);
|
||||
#endif
|
||||
@ -399,32 +407,14 @@ CYCLES cycles[2];
|
||||
pthread_self(),
|
||||
nfds,
|
||||
eno)));
|
||||
no_op = FALSE;
|
||||
}
|
||||
else if (nfds == 0)
|
||||
{
|
||||
atomic_add(&n_waiting, -1);
|
||||
if (process_zombies_only) {
|
||||
#if MUTEX_EPOLL
|
||||
simple_mutex_unlock(&epoll_wait_mutex);
|
||||
#endif
|
||||
goto process_zombies;
|
||||
} else {
|
||||
nfds = epoll_wait(epoll_fd,
|
||||
atomic_add(&n_waiting, 1);
|
||||
nfds = epoll_wait(epoll_fd,
|
||||
events,
|
||||
MAX_EVENTS,
|
||||
EPOLL_TIMEOUT);
|
||||
/*<
|
||||
* When there are zombies to be cleaned up but
|
||||
* no client requests, allow all threads to call
|
||||
* dcb_process_zombies without having to wait
|
||||
* for the timeout.
|
||||
*/
|
||||
if (nfds == 0 && dcb_get_zombies() != NULL)
|
||||
{
|
||||
process_zombies_only = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -460,241 +450,60 @@ CYCLES cycles[2];
|
||||
atomic_add(&load_samples, 1);
|
||||
atomic_add(&load_nfds, nfds);
|
||||
|
||||
/*
|
||||
* Process every DCB that has a new event and add
|
||||
* it to the poll queue.
|
||||
* If the DCB is currently beign processed then we
|
||||
* or in the new eent bits to the pending event bits
|
||||
* and leave it in the queue.
|
||||
* If the DCB was not already in the queue then it was
|
||||
* idle and is added to the queue to process after
|
||||
* setting the eent bits.
|
||||
*/
|
||||
for (i = 0; i < nfds; i++)
|
||||
{
|
||||
DCB *dcb = (DCB *)events[i].data.ptr;
|
||||
__uint32_t ev = events[i].events;
|
||||
|
||||
#if PROFILE_POLL
|
||||
if (i > 0)
|
||||
spinlock_acquire(&pollqlock);
|
||||
if (DCB_POLL_BUSY(dcb))
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Delayed behind event that "
|
||||
"took %ld cycles",
|
||||
cycles[1] - cycles[0])));
|
||||
dcb->evq.pending_events |= ev;
|
||||
}
|
||||
cycles[0] = rdtsc();
|
||||
#endif
|
||||
|
||||
CHK_DCB(dcb);
|
||||
if (thread_data)
|
||||
else
|
||||
{
|
||||
thread_data[thread_id].cur_dcb = dcb;
|
||||
thread_data[thread_id].event = ev;
|
||||
}
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
if (dcb_fake_write_ev[dcb->fd] != 0) {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Added fake events %d to ev %d.",
|
||||
pthread_self(),
|
||||
dcb_fake_write_ev[dcb->fd],
|
||||
ev)));
|
||||
ev |= dcb_fake_write_ev[dcb->fd];
|
||||
dcb_fake_write_ev[dcb->fd] = 0;
|
||||
}
|
||||
#endif
|
||||
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
|
||||
ss_dassert(dcb->state != DCB_STATE_ALLOC);
|
||||
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
|
||||
ss_dassert(dcb->state != DCB_STATE_FREED);
|
||||
ss_debug(spinlock_release(&dcb->dcb_initlock);)
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] event %d dcb %p "
|
||||
"role %s",
|
||||
pthread_self(),
|
||||
ev,
|
||||
dcb,
|
||||
STRDCBROLE(dcb->dcb_role))));
|
||||
|
||||
if (ev & EPOLLOUT)
|
||||
{
|
||||
int eno = 0;
|
||||
eno = gw_getsockerrno(dcb->fd);
|
||||
|
||||
if (eno == 0) {
|
||||
#if MUTEX_BLOCK
|
||||
simple_mutex_lock(
|
||||
&dcb->dcb_write_lock,
|
||||
true);
|
||||
ss_info_dassert(
|
||||
!dcb->dcb_write_active,
|
||||
"Write already active");
|
||||
dcb->dcb_write_active = TRUE;
|
||||
atomic_add(
|
||||
&pollStats.n_write,
|
||||
1);
|
||||
dcb->func.write_ready(dcb);
|
||||
dcb->dcb_write_active = FALSE;
|
||||
simple_mutex_unlock(
|
||||
&dcb->dcb_write_lock);
|
||||
#else
|
||||
atomic_add(&pollStats.n_write,
|
||||
1);
|
||||
dcb_pollout(dcb, thread_id, nfds);
|
||||
#endif
|
||||
} else {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLOUT due %d, %s. "
|
||||
"dcb %p, fd %i",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno),
|
||||
dcb,
|
||||
dcb->fd)));
|
||||
}
|
||||
}
|
||||
if (ev & EPOLLIN)
|
||||
{
|
||||
#if MUTEX_BLOCK
|
||||
simple_mutex_lock(&dcb->dcb_read_lock,
|
||||
true);
|
||||
ss_info_dassert(!dcb->dcb_read_active,
|
||||
"Read already active");
|
||||
dcb->dcb_read_active = TRUE;
|
||||
#endif
|
||||
|
||||
if (dcb->state == DCB_STATE_LISTENING)
|
||||
dcb->evq.pending_events = ev;
|
||||
if (eventq)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Accept in fd %d",
|
||||
pthread_self(),
|
||||
dcb->fd)));
|
||||
atomic_add(
|
||||
&pollStats.n_accept, 1);
|
||||
dcb->func.accept(dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Read in dcb %p fd %d",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->fd)));
|
||||
atomic_add(&pollStats.n_read, 1);
|
||||
#if MUTEX_BLOCK
|
||||
dcb->func.read(dcb);
|
||||
#else
|
||||
dcb_pollin(dcb, thread_id, nfds);
|
||||
#endif
|
||||
}
|
||||
#if MUTEX_BLOCK
|
||||
dcb->dcb_read_active = FALSE;
|
||||
simple_mutex_unlock(
|
||||
&dcb->dcb_read_lock);
|
||||
#endif
|
||||
}
|
||||
if (ev & EPOLLERR)
|
||||
{
|
||||
int eno = gw_getsockerrno(dcb->fd);
|
||||
#if defined(SS_DEBUG)
|
||||
if (eno == 0) {
|
||||
eno = dcb_fake_write_errno[dcb->fd];
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Added fake errno %d. "
|
||||
"%s",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno))));
|
||||
}
|
||||
dcb_fake_write_errno[dcb->fd] = 0;
|
||||
#endif
|
||||
if (eno != 0) {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLERR due %d, %s.",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno))));
|
||||
}
|
||||
atomic_add(&pollStats.n_error, 1);
|
||||
dcb->func.error(dcb);
|
||||
}
|
||||
|
||||
if (ev & EPOLLHUP)
|
||||
{
|
||||
int eno = 0;
|
||||
eno = gw_getsockerrno(dcb->fd);
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLHUP on dcb %p, fd %d. "
|
||||
"Errno %d, %s.",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->fd,
|
||||
eno,
|
||||
strerror(eno))));
|
||||
atomic_add(&pollStats.n_hup, 1);
|
||||
spinlock_acquire(&dcb->dcb_initlock);
|
||||
if ((dcb->flags & DCBF_HUNG) == 0)
|
||||
{
|
||||
dcb->flags |= DCBF_HUNG;
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
dcb->func.hangup(dcb);
|
||||
dcb->evq.prev = eventq->evq.prev;
|
||||
eventq->evq.prev->evq.next = dcb;
|
||||
eventq->evq.prev = dcb;
|
||||
dcb->evq.next = eventq;
|
||||
}
|
||||
else
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
}
|
||||
|
||||
if (ev & EPOLLRDHUP)
|
||||
{
|
||||
int eno = 0;
|
||||
eno = gw_getsockerrno(dcb->fd);
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLRDHUP on dcb %p, fd %d. "
|
||||
"Errno %d, %s.",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->fd,
|
||||
eno,
|
||||
strerror(eno))));
|
||||
atomic_add(&pollStats.n_hup, 1);
|
||||
spinlock_acquire(&dcb->dcb_initlock);
|
||||
if ((dcb->flags & DCBF_HUNG) == 0)
|
||||
{
|
||||
dcb->flags |= DCBF_HUNG;
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
dcb->func.hangup(dcb);
|
||||
eventq = dcb;
|
||||
dcb->evq.prev = dcb;
|
||||
dcb->evq.next = dcb;
|
||||
}
|
||||
pollStats.evq_length++;
|
||||
if (pollStats.evq_length > pollStats.evq_max)
|
||||
{
|
||||
pollStats.evq_max = pollStats.evq_length;
|
||||
}
|
||||
else
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
}
|
||||
#if PROFILE_POLL
|
||||
cycles[1] = rdtsc();
|
||||
#endif
|
||||
} /*< for */
|
||||
no_op = FALSE;
|
||||
spinlock_release(&pollqlock);
|
||||
}
|
||||
|
||||
/*< for */
|
||||
}
|
||||
process_zombies:
|
||||
|
||||
if (thread_data)
|
||||
{
|
||||
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
||||
}
|
||||
zombies = dcb_process_zombies(thread_id, NULL);
|
||||
|
||||
if (zombies == NULL) {
|
||||
process_zombies_only = false;
|
||||
}
|
||||
|
||||
if (do_shutdown)
|
||||
{
|
||||
/*<
|
||||
@ -717,6 +526,316 @@ process_zombies:
|
||||
mysql_thread_end();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process of the queue of DCB's that have outstanding events
|
||||
*
|
||||
* The first event on the queue will be chosen to be executed by this thread,
|
||||
* all other events will be left on the queue and may be picked up by other
|
||||
* threads. When the processing is complete the thread will take the DCB off the
|
||||
* queue if there are no pending events that have arrived since the thread started
|
||||
* to process the DCB. If there are pending events the DCB will be moved to the
|
||||
* back of the queue so that other DCB's will have a share of the threads to
|
||||
* execute events for them.
|
||||
*
|
||||
* @param thread_id The thread ID of the calling thread
|
||||
* @return 0 if no DCB's have been processed
|
||||
*/
|
||||
static int
|
||||
process_pollq(int thread_id)
|
||||
{
|
||||
DCB *dcb;
|
||||
int found = 0;
|
||||
uint32_t ev;
|
||||
|
||||
spinlock_acquire(&pollqlock);
|
||||
if (eventq == NULL)
|
||||
{
|
||||
/* Nothing to process */
|
||||
spinlock_release(&pollqlock);
|
||||
return 0;
|
||||
}
|
||||
dcb = eventq;
|
||||
if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0)
|
||||
{
|
||||
found = 1;
|
||||
dcb->evq.processing = 1;
|
||||
}
|
||||
else if (dcb->evq.next == dcb->evq.prev)
|
||||
{
|
||||
/* Only item in queue is being processed */
|
||||
spinlock_release(&pollqlock);
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
do {
|
||||
dcb = dcb->evq.next;
|
||||
} while (dcb != eventq && dcb->evq.processing == 1);
|
||||
|
||||
if (dcb->evq.processing == 0)
|
||||
{
|
||||
/* Found DCB to process */
|
||||
dcb->evq.processing = 1;
|
||||
found = 1;
|
||||
}
|
||||
}
|
||||
if (found)
|
||||
{
|
||||
ev = dcb->evq.pending_events;
|
||||
dcb->evq.pending_events = 0;
|
||||
}
|
||||
spinlock_release(&pollqlock);
|
||||
|
||||
if (found == 0)
|
||||
return 0;
|
||||
|
||||
|
||||
CHK_DCB(dcb);
|
||||
if (thread_data)
|
||||
{
|
||||
thread_data[thread_id].state = THREAD_PROCESSING;
|
||||
thread_data[thread_id].cur_dcb = dcb;
|
||||
thread_data[thread_id].event = ev;
|
||||
}
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
if (dcb_fake_write_ev[dcb->fd] != 0) {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Added fake events %d to ev %d.",
|
||||
pthread_self(),
|
||||
dcb_fake_write_ev[dcb->fd],
|
||||
ev)));
|
||||
ev |= dcb_fake_write_ev[dcb->fd];
|
||||
dcb_fake_write_ev[dcb->fd] = 0;
|
||||
}
|
||||
#endif
|
||||
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
|
||||
ss_dassert(dcb->state != DCB_STATE_ALLOC);
|
||||
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
|
||||
ss_dassert(dcb->state != DCB_STATE_FREED);
|
||||
ss_debug(spinlock_release(&dcb->dcb_initlock);)
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] event %d dcb %p "
|
||||
"role %s",
|
||||
pthread_self(),
|
||||
ev,
|
||||
dcb,
|
||||
STRDCBROLE(dcb->dcb_role))));
|
||||
|
||||
if (ev & EPOLLOUT)
|
||||
{
|
||||
int eno = 0;
|
||||
eno = gw_getsockerrno(dcb->fd);
|
||||
|
||||
if (eno == 0) {
|
||||
#if MUTEX_BLOCK
|
||||
simple_mutex_lock(
|
||||
&dcb->dcb_write_lock,
|
||||
true);
|
||||
ss_info_dassert(
|
||||
!dcb->dcb_write_active,
|
||||
"Write already active");
|
||||
dcb->dcb_write_active = TRUE;
|
||||
atomic_add(
|
||||
&pollStats.n_write,
|
||||
1);
|
||||
dcb->func.write_ready(dcb);
|
||||
dcb->dcb_write_active = FALSE;
|
||||
simple_mutex_unlock(
|
||||
&dcb->dcb_write_lock);
|
||||
#else
|
||||
atomic_add(&pollStats.n_write,
|
||||
1);
|
||||
dcb->func.write_ready(dcb);
|
||||
#endif
|
||||
} else {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLOUT due %d, %s. "
|
||||
"dcb %p, fd %i",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno),
|
||||
dcb,
|
||||
dcb->fd)));
|
||||
}
|
||||
}
|
||||
if (ev & EPOLLIN)
|
||||
{
|
||||
#if MUTEX_BLOCK
|
||||
simple_mutex_lock(&dcb->dcb_read_lock,
|
||||
true);
|
||||
ss_info_dassert(!dcb->dcb_read_active,
|
||||
"Read already active");
|
||||
dcb->dcb_read_active = TRUE;
|
||||
#endif
|
||||
|
||||
if (dcb->state == DCB_STATE_LISTENING)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Accept in fd %d",
|
||||
pthread_self(),
|
||||
dcb->fd)));
|
||||
atomic_add(
|
||||
&pollStats.n_accept, 1);
|
||||
dcb->func.accept(dcb);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Read in dcb %p fd %d",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->fd)));
|
||||
atomic_add(&pollStats.n_read, 1);
|
||||
dcb->func.read(dcb);
|
||||
}
|
||||
#if MUTEX_BLOCK
|
||||
dcb->dcb_read_active = FALSE;
|
||||
simple_mutex_unlock(
|
||||
&dcb->dcb_read_lock);
|
||||
#endif
|
||||
}
|
||||
if (ev & EPOLLERR)
|
||||
{
|
||||
int eno = gw_getsockerrno(dcb->fd);
|
||||
#if defined(SS_DEBUG)
|
||||
if (eno == 0) {
|
||||
eno = dcb_fake_write_errno[dcb->fd];
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"Added fake errno %d. "
|
||||
"%s",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno))));
|
||||
}
|
||||
dcb_fake_write_errno[dcb->fd] = 0;
|
||||
#endif
|
||||
if (eno != 0) {
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLERR due %d, %s.",
|
||||
pthread_self(),
|
||||
eno,
|
||||
strerror(eno))));
|
||||
}
|
||||
atomic_add(&pollStats.n_error, 1);
|
||||
dcb->func.error(dcb);
|
||||
}
|
||||
|
||||
if (ev & EPOLLHUP)
|
||||
{
|
||||
int eno = 0;
|
||||
eno = gw_getsockerrno(dcb->fd);
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLHUP on dcb %p, fd %d. "
|
||||
"Errno %d, %s.",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->fd,
|
||||
eno,
|
||||
strerror(eno))));
|
||||
atomic_add(&pollStats.n_hup, 1);
|
||||
spinlock_acquire(&dcb->dcb_initlock);
|
||||
if ((dcb->flags & DCBF_HUNG) == 0)
|
||||
{
|
||||
dcb->flags |= DCBF_HUNG;
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
dcb->func.hangup(dcb);
|
||||
}
|
||||
else
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
}
|
||||
|
||||
if (ev & EPOLLRDHUP)
|
||||
{
|
||||
int eno = 0;
|
||||
eno = gw_getsockerrno(dcb->fd);
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [poll_waitevents] "
|
||||
"EPOLLRDHUP on dcb %p, fd %d. "
|
||||
"Errno %d, %s.",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->fd,
|
||||
eno,
|
||||
strerror(eno))));
|
||||
atomic_add(&pollStats.n_hup, 1);
|
||||
spinlock_acquire(&dcb->dcb_initlock);
|
||||
if ((dcb->flags & DCBF_HUNG) == 0)
|
||||
{
|
||||
dcb->flags |= DCBF_HUNG;
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
dcb->func.hangup(dcb);
|
||||
}
|
||||
else
|
||||
spinlock_release(&dcb->dcb_initlock);
|
||||
}
|
||||
|
||||
spinlock_acquire(&pollqlock);
|
||||
if (dcb->evq.pending_events == 0)
|
||||
{
|
||||
/* No pending events so remove from the queue */
|
||||
if (dcb->evq.prev != dcb)
|
||||
{
|
||||
dcb->evq.prev->evq.next = dcb->evq.next;
|
||||
dcb->evq.next->evq.prev = dcb->evq.prev;
|
||||
if (eventq == dcb)
|
||||
eventq = dcb->evq.next;
|
||||
}
|
||||
else
|
||||
{
|
||||
eventq = NULL;
|
||||
}
|
||||
dcb->evq.next = NULL;
|
||||
dcb->evq.prev = NULL;
|
||||
pollStats.evq_length--;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We have a pending event, move to the end of the queue
|
||||
* if there are any other DCB's in the queue.
|
||||
*
|
||||
* If we are the first item on the queue this is easy, we
|
||||
* just bump the eventq pointer.
|
||||
*/
|
||||
if (dcb->evq.prev != dcb)
|
||||
{
|
||||
if (eventq == dcb)
|
||||
eventq = dcb->evq.next;
|
||||
else
|
||||
{
|
||||
dcb->evq.prev->evq.next = dcb->evq.next;
|
||||
dcb->evq.next->evq.prev = dcb->evq.prev;
|
||||
dcb->evq.prev = eventq->evq.prev;
|
||||
dcb->evq.next = eventq;
|
||||
eventq->evq.prev = dcb;
|
||||
dcb->evq.prev->evq.next = dcb;
|
||||
}
|
||||
}
|
||||
}
|
||||
dcb->evq.processing = 0;
|
||||
spinlock_release(&pollqlock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the polling loop
|
||||
*/
|
||||
@ -761,6 +880,10 @@ int i;
|
||||
pollStats.n_accept);
|
||||
dcb_printf(dcb, "Number of times no threads polling: %d\n",
|
||||
pollStats.n_nothreads);
|
||||
dcb_printf(dcb, "Current event queue length: %d\n",
|
||||
pollStats.evq_length);
|
||||
dcb_printf(dcb, "Maximum event queue length: %d\n",
|
||||
pollStats.evq_max);
|
||||
|
||||
dcb_printf(dcb, "No of poll completions with descriptors\n");
|
||||
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
|
||||
|
||||
Reference in New Issue
Block a user