Create thread specific epoll instances

Having a unique epoll instance for each thread allows a lot of the locking
in poll.c to be removed. The downside is that each session can have only
one thread processing events for it, which might reduce performance at
very low client counts.
This commit is contained in:
Markus Makela
2016-10-20 00:25:05 +03:00
parent 573faff987
commit b79210c760
2 changed files with 48 additions and 168 deletions

View File

@@ -227,6 +227,7 @@ typedef struct dcb
DCBEVENTQ evq; /**< The event queue for this DCB */ DCBEVENTQ evq; /**< The event queue for this DCB */
int fd; /**< The descriptor */ int fd; /**< The descriptor */
dcb_state_t state; /**< Current descriptor state */ dcb_state_t state; /**< Current descriptor state */
int owner; /**< Owning thread */
SSL_STATE ssl_state; /**< Current state of SSL if in use */ SSL_STATE ssl_state; /**< Current state of SSL if in use */
int flags; /**< DCB flags */ int flags; /**< DCB flags */
char *remote; /**< Address of remote end */ char *remote; /**< Address of remote end */

View File

@@ -81,7 +81,8 @@ int max_poll_sleep;
*/ */
#define MUTEX_EPOLL 0 #define MUTEX_EPOLL 0
static int epoll_fd = -1; /*< The epoll file descriptor */ static int *epoll_fd; /*< The epoll file descriptor */
static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */ static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
static GWBITMASK poll_mask; static GWBITMASK poll_mask;
#if MUTEX_EPOLL #if MUTEX_EPOLL
@@ -89,7 +90,7 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif #endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */ static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id); static int process_pollq(int thread_id, struct epoll_event *event);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev); static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev);
static bool poll_dcb_session_check(DCB *dcb, const char *); static bool poll_dcb_session_check(DCB *dcb, const char *);
@@ -206,26 +207,30 @@ static int poll_resolve_error(DCB *, int, bool);
void void
poll_init() poll_init()
{ {
int i; n_threads = config_threadcount();
if (epoll_fd != -1) if (!(epoll_fd = MXS_MALLOC(sizeof(int) * n_threads)))
{ {
return; return;
} }
if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1)
for (int i = 0; i < n_threads; i++)
{
if ((epoll_fd[i] = epoll_create(MAX_EVENTS)) == -1)
{ {
char errbuf[MXS_STRERROR_BUFLEN]; char errbuf[MXS_STRERROR_BUFLEN];
MXS_ERROR("FATAL: Could not create epoll instance: %s", strerror_r(errno, errbuf, sizeof(errbuf))); MXS_ERROR("FATAL: Could not create epoll instance: %s", strerror_r(errno, errbuf, sizeof(errbuf)));
exit(-1); exit(-1);
} }
}
memset(&pollStats, 0, sizeof(pollStats)); memset(&pollStats, 0, sizeof(pollStats));
memset(&queueStats, 0, sizeof(queueStats)); memset(&queueStats, 0, sizeof(queueStats));
bitmask_init(&poll_mask); bitmask_init(&poll_mask);
n_threads = config_threadcount();
thread_data = (THREAD_DATA *)MXS_MALLOC(n_threads * sizeof(THREAD_DATA)); thread_data = (THREAD_DATA *)MXS_MALLOC(n_threads * sizeof(THREAD_DATA));
if (thread_data) if (thread_data)
{ {
for (i = 0; i < n_threads; i++) for (int i = 0; i < n_threads; i++)
{ {
thread_data[i].state = THREAD_STOPPED; thread_data[i].state = THREAD_STOPPED;
} }
@@ -254,13 +259,13 @@ poll_init()
n_avg_samples = 15 * 60 / POLL_LOAD_FREQ; n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
avg_samples = (double *)MXS_MALLOC(sizeof(double) * n_avg_samples); avg_samples = (double *)MXS_MALLOC(sizeof(double) * n_avg_samples);
MXS_ABORT_IF_NULL(avg_samples); MXS_ABORT_IF_NULL(avg_samples);
for (i = 0; i < n_avg_samples; i++) for (int i = 0; i < n_avg_samples; i++)
{ {
avg_samples[i] = 0.0; avg_samples[i] = 0.0;
} }
evqp_samples = (int *)MXS_MALLOC(sizeof(int) * n_avg_samples); evqp_samples = (int *)MXS_MALLOC(sizeof(int) * n_avg_samples);
MXS_ABORT_IF_NULL(evqp_samples); MXS_ABORT_IF_NULL(evqp_samples);
for (i = 0; i < n_avg_samples; i++) for (int i = 0; i < n_avg_samples; i++)
{ {
evqp_samples[i] = 0.0; evqp_samples[i] = 0.0;
} }
@@ -339,7 +344,20 @@ poll_add_dcb(DCB *dcb)
* The only possible failure that will not cause a crash is * The only possible failure that will not cause a crash is
* running out of system resources. * running out of system resources.
*/ */
rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); int owner = 0;
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
{
owner = dcb->session->client_dcb->owner;
}
else
{
owner = (unsigned int)atomic_add(&next_epoll_fd, 1) % n_threads;
}
dcb->owner = owner;
rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev);
if (rc) if (rc)
{ {
/* Some errors are actually considered acceptable */ /* Some errors are actually considered acceptable */
@@ -406,7 +424,7 @@ poll_remove_dcb(DCB *dcb)
spinlock_release(&dcb->dcb_initlock); spinlock_release(&dcb->dcb_initlock);
if (dcbfd > 0) if (dcbfd > 0)
{ {
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcbfd, &ev); rc = epoll_ctl(epoll_fd[dcb->owner], EPOLL_CTL_DEL, dcbfd, &ev);
/** /**
* The poll_resolve_error function will always * The poll_resolve_error function will always
* return 0 or crash. So if it returns non-zero result, * return 0 or crash. So if it returns non-zero result,
@@ -570,11 +588,6 @@ poll_waitevents(void *arg)
while (1) while (1)
{ {
if (pollStats.evq_pending == 0 && timeout_bias < 10)
{
timeout_bias++;
}
atomic_add(&n_waiting, 1); atomic_add(&n_waiting, 1);
#if BLOCKINGPOLL #if BLOCKINGPOLL
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
@@ -589,7 +602,7 @@ poll_waitevents(void *arg)
} }
ts_stats_increment(pollStats.n_polls, thread_id); ts_stats_increment(pollStats.n_polls, thread_id);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1) if ((nfds = epoll_wait(epoll_fd[thread_id], events, MAX_EVENTS, 0)) == -1)
{ {
atomic_add(&n_waiting, -1); atomic_add(&n_waiting, -1);
int eno = errno; int eno = errno;
@@ -609,14 +622,18 @@ poll_waitevents(void *arg)
* We calculate a timeout bias to alter the length of the blocking * We calculate a timeout bias to alter the length of the blocking
* call based on the time since we last received an event to process * call based on the time since we last received an event to process
*/ */
else if (nfds == 0 && pollStats.evq_pending == 0 && poll_spins++ > number_poll_spins) else if (nfds == 0 && poll_spins++ > number_poll_spins)
{ {
if (timeout_bias < 10)
{
timeout_bias++;
}
ts_stats_increment(pollStats.blockingpolls, thread_id); ts_stats_increment(pollStats.blockingpolls, thread_id);
nfds = epoll_wait(epoll_fd, nfds = epoll_wait(epoll_fd[thread_id],
events, events,
MAX_EVENTS, MAX_EVENTS,
(max_poll_sleep * timeout_bias) / 10); (max_poll_sleep * timeout_bias) / 10);
if (nfds == 0 && pollStats.evq_pending) if (nfds == 0)
{ {
atomic_add(&pollStats.wake_evqpending, 1); atomic_add(&pollStats.wake_evqpending, 1);
poll_spins = 0; poll_spins = 0;
@@ -671,47 +688,6 @@ poll_waitevents(void *arg)
* idle and is added to the queue to process after * idle and is added to the queue to process after
* setting the event bits. * setting the event bits.
*/ */
for (i = 0; i < nfds; i++)
{
DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events;
spinlock_acquire(&pollqlock);
if (DCB_POLL_BUSY(dcb))
{
if (dcb->evq.pending_events == 0)
{
pollStats.evq_pending++;
dcb->evq.inserted = hkheartbeat;
}
dcb->evq.pending_events |= ev;
}
else
{
dcb->evq.pending_events = ev;
if (eventq)
{
dcb->evq.prev = eventq->evq.prev;
eventq->evq.prev->evq.next = dcb;
eventq->evq.prev = dcb;
dcb->evq.next = eventq;
}
else
{
eventq = dcb;
dcb->evq.prev = dcb;
dcb->evq.next = dcb;
}
pollStats.evq_length++;
pollStats.evq_pending++;
dcb->evq.inserted = hkheartbeat;
if (pollStats.evq_length > pollStats.evq_max)
{
pollStats.evq_max = pollStats.evq_length;
}
}
spinlock_release(&pollqlock);
}
} }
/* /*
@@ -720,9 +696,10 @@ poll_waitevents(void *arg)
* precautionary measure to avoid issues if the house keeping * precautionary measure to avoid issues if the house keeping
* of the count goes wrong. * of the count goes wrong.
*/ */
if (process_pollq(thread_id))
for (int i = 0; i < nfds; i++)
{ {
timeout_bias = 1; process_pollq(thread_id, &events[i]);
} }
if (check_timeouts && hkheartbeat >= next_timeout_check) if (check_timeouts && hkheartbeat >= next_timeout_check)
@@ -811,61 +788,14 @@ poll_set_maxwait(unsigned int maxwait)
* @return 0 if no DCB's have been processed * @return 0 if no DCB's have been processed
*/ */
static int static int
process_pollq(int thread_id) process_pollq(int thread_id, struct epoll_event *event)
{ {
DCB *dcb;
int found = 0; int found = 0;
uint32_t ev; uint32_t ev = event->events;
unsigned long qtime; unsigned long qtime;
spinlock_acquire(&pollqlock); DCB *dcb = event->data.ptr;
if (eventq == NULL) atomic_add(&pollStats.evq_pending, -1);
{
/* Nothing to process */
spinlock_release(&pollqlock);
return 0;
}
dcb = eventq;
if (dcb->evq.next == dcb->evq.prev && dcb->evq.processing == 0)
{
found = 1;
dcb->evq.processing = 1;
}
else if (dcb->evq.next == dcb->evq.prev)
{
/* Only item in queue is being processed */
spinlock_release(&pollqlock);
return 0;
}
else
{
do
{
dcb = dcb->evq.next;
}
while (dcb != eventq && dcb->evq.processing == 1);
if (dcb->evq.processing == 0)
{
/* Found DCB to process */
dcb->evq.processing = 1;
found = 1;
}
}
if (found)
{
ev = dcb->evq.pending_events;
dcb->evq.processing_events = ev;
dcb->evq.pending_events = 0;
pollStats.evq_pending--;
ss_dassert(pollStats.evq_pending >= 0);
}
spinlock_release(&pollqlock);
if (found == 0)
{
return 0;
}
#if PROFILE_POLL #if PROFILE_POLL
memlog_log(plog, hkheartbeat - dcb->evq.inserted); memlog_log(plog, hkheartbeat - dcb->evq.inserted);
@@ -1100,59 +1030,8 @@ process_pollq(int thread_id)
queueStats.maxexectime = qtime; queueStats.maxexectime = qtime;
} }
spinlock_acquire(&pollqlock);
dcb->evq.processing_events = 0;
if (dcb->evq.pending_events == 0)
{
/* No pending events so remove from the queue */
if (dcb->evq.prev != dcb)
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
if (eventq == dcb)
{
eventq = dcb->evq.next;
}
}
else
{
eventq = NULL;
}
dcb->evq.next = NULL;
dcb->evq.prev = NULL;
pollStats.evq_length--;
}
else
{
/*
* We have a pending event, move to the end of the queue
* if there are any other DCB's in the queue.
*
* If we are the first item on the queue this is easy, we
* just bump the eventq pointer.
*/
if (dcb->evq.prev != dcb)
{
if (eventq == dcb)
{
eventq = dcb->evq.next;
}
else
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
dcb->evq.prev = eventq->evq.prev;
dcb->evq.next = eventq;
eventq->evq.prev = dcb;
dcb->evq.prev->evq.next = dcb;
}
}
}
dcb->evq.processing = 0;
/** Reset session id from thread's local storage */ /** Reset session id from thread's local storage */
mxs_log_tls.li_sesid = 0; mxs_log_tls.li_sesid = 0;
spinlock_release(&pollqlock);
return 1; return 1;
} }