Added static simple_mutex_t epoll_wait_mutex, which is acquired and released in poll_waitevents.

This commit is contained in:
vraatikka 2013-08-23 22:45:06 +03:00
parent 0a182913a1
commit 6f0ccd7b42

View File

@ -45,6 +45,8 @@
static int epoll_fd = -1; /**< The epoll file descriptor */
static int shutdown = 0; /**< Flag the shutdown of the poll subsystem */
static GWBITMASK poll_mask;
static simple_mutex_t epoll_wait_mutex; /**< serializes calls to epoll_wait */
/**
* The polling statistics
*/
@ -75,6 +77,7 @@ poll_init()
}
memset(&pollStats, 0, sizeof(pollStats));
bitmask_init(&poll_mask);
simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex");
}
/**
@ -87,7 +90,7 @@ poll_init()
int
poll_add_dcb(DCB *dcb)
{
struct epoll_event ev;
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = dcb;
@ -106,14 +109,14 @@ struct epoll_event ev;
int
poll_remove_dcb(DCB *dcb)
{
struct epoll_event ev;
struct epoll_event ev;
return epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
}
#define BLOCKINGPOLL 0 /* Set BLOCKING POLL to 1 if using a single thread and to make
* debugging easier.
*/
* debugging easier.
*/
/**
* The main polling loop
*
@ -140,13 +143,14 @@ struct epoll_event ev;
void
poll_waitevents(void *arg)
{
struct epoll_event events[MAX_EVENTS];
int i, nfds;
int thread_id = (int)arg;
bool no_op = FALSE;
struct epoll_event events[MAX_EVENTS];
int i, nfds;
int thread_id = (int)arg;
bool no_op = FALSE;
/* Add this thread to the bitmask of running polling threads */
bitmask_set(&poll_mask, thread_id);
while (1)
{
#if BLOCKINGPOLL
@ -155,32 +159,34 @@ bool no_op = FALSE;
}
#else
if (!no_op) {
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] > epoll_wait <",
pthread_self());
no_op = TRUE;
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] > epoll_wait <",
pthread_self());
no_op = TRUE;
}
simple_mutex_lock(&epoll_wait_mutex, TRUE);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
{
int eno = errno;
errno = 0;
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] epoll_wait returned "
"%d, errno %d",
pthread_self(),
nfds,
eno);
no_op = FALSE;
int eno = errno;
errno = 0;
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] epoll_wait returned "
"%d, errno %d",
pthread_self(),
nfds,
eno);
no_op = FALSE;
}
else if (nfds == 0)
{
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT);
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT);
if (nfds == -1)
{
}
if (nfds == -1)
{
}
}
simple_mutex_unlock(&epoll_wait_mutex);
#endif
if (nfds > 0)
{
@ -189,8 +195,8 @@ bool no_op = FALSE;
"%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(),
nfds);
atomic_add(&pollStats.n_polls, 1);
for (i = 0; i < nfds; i++)
{
DCB *dcb = (DCB *)events[i].data.ptr;
@ -206,7 +212,8 @@ bool no_op = FALSE;
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] dcb is zombie",
"%lu [poll_waitevents] dcb is "
"zombie",
pthread_self());
continue;
}
@ -231,11 +238,11 @@ bool no_op = FALSE;
}
if (ev & EPOLLOUT)
{
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Write in fd %d",
pthread_self(),
dcb->fd);
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Write in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_write, 1);
dcb->func.write_ready(dcb);
}
@ -243,23 +250,23 @@ bool no_op = FALSE;
{
if (dcb->state == DCB_STATE_LISTENING)
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_accept, 1);
dcb->func.accept(dcb);
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Read in fd %d",
pthread_self(),
dcb->fd);
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Read in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
@ -268,9 +275,13 @@ bool no_op = FALSE;
no_op = FALSE;
}
dcb_process_zombies(thread_id);
if (shutdown)
{
/* Remove this thread from the bitmask of running polling threads */
/**
* Remove this thread from the bitmask of running
* polling threads
*/
bitmask_clear(&poll_mask, thread_id);
return;
}