Move fake events to a thread-specific queue

Fake poll events are now stored in thread-specific queues. This
removes the need for the global poll event queue.
Markus Makela 2016-10-25 13:19:25 +03:00
parent b79210c760
commit 8efdaa1ea6
7 changed files with 92 additions and 199 deletions
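
To make the scheme concrete before the per-file diffs: the commit replaces the shared poll event queue with one fake-event list per worker thread. The following is a minimal, self-contained sketch of that idea, assuming a fixed thread count and a plain int payload in place of the DCB and GWBUF; fake_event_t mirrors the struct added to poll.c below, but this is an illustration, not the MaxScale code itself.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

/* Simplified stand-in for the fake_event_t added in poll.c */
typedef struct fake_event
{
    int                payload; /* stands in for the DCB and GWBUF data */
    uint32_t           event;   /* epoll event type, e.g. EPOLLIN */
    struct fake_event *tail;    /* last event; only meaningful on the head */
    struct fake_event *next;    /* next event in this thread's queue */
} fake_event_t;

#define N_THREADS 4
static fake_event_t *fake_events[N_THREADS]; /* one queue per poll thread */

/* Enqueue mirrors poll_add_event_to_dcb(): append at the tail so events
 * stay in FIFO order. The real code does this under pollqlock because
 * any thread may enqueue onto another thread's queue. */
static void enqueue_fake_event(int thr, int payload, uint32_t ev)
{
    fake_event_t *event = malloc(sizeof(*event));
    if (event == NULL)
    {
        return;
    }
    event->payload = payload;
    event->event = ev;
    event->next = NULL;
    event->tail = event;

    if (fake_events[thr])
    {
        fake_events[thr]->tail->next = event;
        fake_events[thr]->tail = event;
    }
    else
    {
        fake_events[thr] = event;
    }
}

/* Drain mirrors the new loop in poll_waitevents(): the owning thread pops
 * its own queue after handling the real epoll events. */
static void drain_fake_events(int thr)
{
    while (fake_events[thr])
    {
        fake_event_t *event = fake_events[thr];
        fake_events[thr] = event->next;
        if (fake_events[thr])
        {
            /* carry the tail pointer to the new head so later
             * enqueues keep appending at the end */
            fake_events[thr]->tail = event->tail;
        }
        printf("thread %d: fake event 0x%x, payload %d\n",
               thr, event->event, event->payload);
        free(event);
    }
}

int main(void)
{
    enqueue_fake_event(0, 1, EPOLLIN);
    enqueue_fake_event(0, 2, EPOLLOUT);
    drain_fake_events(0);
    return 0;
}

Note the asymmetry the diff relies on: enqueueing is guarded by a global lock (pollqlock) because any thread may generate a fake event for a DCB owned by another thread, while draining needs no lock since only the owning thread pops its own queue.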

View File

@@ -248,6 +248,7 @@ typedef struct dcb
SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */
GWBUF *delayq; /**< Delay Backend Write Data Queue */
GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */
GWBUF *dcb_fakequeue; /**< Fake event queue for generated events */
SPINLOCK authlock; /**< Generic Authorization spinlock */
DCBSTATS stats; /**< DCB related statistics */

View File

@@ -420,7 +420,11 @@ dcb_free_all_memory(DCB *dcb)
gwbuf_free(dcb->dcb_readqueue);
dcb->dcb_readqueue = NULL;
}
if (dcb->dcb_fakequeue)
{
gwbuf_free(dcb->dcb_fakequeue);
dcb->dcb_fakequeue = NULL;
}
spinlock_acquire(&dcb->cb_lock);
while ((cb_dcb = dcb->callbacks) != NULL)
{
@@ -913,11 +917,15 @@ int dcb_read(DCB *dcb,
if (dcb->dcb_readqueue)
{
spinlock_acquire(&dcb->authlock);
*head = gwbuf_append(*head, dcb->dcb_readqueue);
dcb->dcb_readqueue = NULL;
nreadtotal = gwbuf_length(*head);
spinlock_release(&dcb->authlock);
}
else if (dcb->dcb_fakequeue)
{
*head = gwbuf_append(*head, dcb->dcb_fakequeue);
dcb->dcb_fakequeue = NULL;
nreadtotal = gwbuf_length(*head);
}
if (SSL_HANDSHAKE_DONE == dcb->ssl_state || SSL_ESTABLISHED == dcb->ssl_state)
@@ -1661,7 +1669,7 @@ dcb_grab_writeq(DCB *dcb, bool first_time)
if (first_time && dcb->ssl_read_want_write)
{
poll_fake_event(dcb, EPOLLIN);
poll_fake_read_event(dcb);
}
if (first_time && dcb->draining_flag)
@@ -3554,7 +3562,5 @@ dcb_role_name(DCB *dcb)
*/
void dcb_append_readqueue(DCB *dcb, GWBUF *buffer)
{
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buffer);
spinlock_release(&dcb->authlock);
}

View File

@@ -81,8 +81,19 @@ int max_poll_sleep;
*/
#define MUTEX_EPOLL 0
/** Fake epoll event struct */
typedef struct fake_event
{
DCB *dcb; /*< The DCB where this event was generated */
GWBUF *data; /*< Fake data, placed in the DCB's read queue */
uint32_t event; /*< The EPOLL event type */
struct fake_event *tail; /*< The last event */
struct fake_event *next; /*< The next event */
} fake_event_t;
static int *epoll_fd; /*< The epoll file descriptor */
static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
static fake_event_t **fake_events; /*< Thread-specific fake event queue */
static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
static GWBITMASK poll_mask;
#if MUTEX_EPOLL
@@ -91,7 +102,7 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id, struct epoll_event *event);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev);
static bool poll_dcb_session_check(DCB *dcb, const char *);
DCB *eventq = NULL;
@@ -224,6 +235,11 @@ poll_init()
}
}
if ((fake_events = MXS_CALLOC(sizeof(fake_event_t*), n_threads)) == NULL)
{
exit(-1);
}
memset(&pollStats, 0, sizeof(pollStats));
memset(&queueStats, 0, sizeof(queueStats));
bitmask_init(&poll_mask);
@@ -339,7 +355,7 @@ poll_add_dcb(DCB *dcb)
STRDCBSTATE(dcb->state));
}
dcb->state = new_state;
spinlock_release(&dcb->dcb_initlock);
/*
* The only possible failure that will not cause a crash is
* running out of system resources.
@@ -356,6 +372,7 @@ poll_add_dcb(DCB *dcb)
}
dcb->owner = owner;
spinlock_release(&dcb->dcb_initlock);
rc = epoll_ctl(epoll_fd[owner], EPOLL_CTL_ADD, dcb->fd, &ev);
if (rc)
@@ -702,6 +719,19 @@ poll_waitevents(void *arg)
process_pollq(thread_id, &events[i]);
}
/** Process fake events */
while (fake_events[thread_id])
{
fake_event_t *event = fake_events[thread_id];
fake_events[thread_id] = fake_events[thread_id]->next;
struct epoll_event ev;
event->dcb->dcb_fakequeue = event->data;
ev.data.ptr = event->dcb;
ev.events = event->event;
process_pollq(thread_id, &ev);
}
if (check_timeouts && hkheartbeat >= next_timeout_check)
{
process_idle_sessions();
@@ -795,7 +825,6 @@ process_pollq(int thread_id, struct epoll_event *event)
unsigned long qtime;
DCB *dcb = event->data.ptr;
atomic_add(&pollStats.evq_pending, -1);
#if PROFILE_POLL
memlog_log(plog, hkheartbeat - dcb->evq.inserted);
@@ -1132,8 +1161,6 @@ dprintPollStats(DCB *dcb)
pollStats.evq_length);
dcb_printf(dcb, "Maximum event queue length: %" PRId32 "\n",
pollStats.evq_max);
dcb_printf(dcb, "No. of DCBs with pending events: %" PRId32 "\n",
pollStats.evq_pending);
dcb_printf(dcb, "No. of wakeups with pending queue: %" PRId32 "\n",
pollStats.wake_evqpending);
@@ -1366,7 +1393,6 @@ poll_loadav(void *data)
current_avg = 0.0;
}
avg_samples[next_sample] = current_avg;
evqp_samples[next_sample] = pollStats.evq_pending;
next_sample++;
if (next_sample >= n_avg_samples)
{
@@ -1396,50 +1422,30 @@ void poll_add_epollin_event_to_dcb(DCB* dcb,
static void poll_add_event_to_dcb(DCB* dcb,
GWBUF* buf,
__uint32_t ev)
uint32_t ev)
{
/** Add buf to readqueue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, buf);
spinlock_release(&dcb->authlock);
fake_event_t *event = MXS_MALLOC(sizeof(*event));
spinlock_acquire(&pollqlock);
if (event)
{
event->data = buf;
event->dcb = dcb;
event->event = ev;
event->next = NULL;
event->tail = event;
/** Set event to DCB */
if (DCB_POLL_BUSY(dcb))
{
if (dcb->evq.pending_events == 0)
int thr = dcb->owner;
if (fake_events[thr])
{
pollStats.evq_pending++;
}
dcb->evq.pending_events |= ev;
}
else
{
dcb->evq.pending_events = ev;
/** Add DCB to eventqueue if it isn't already there */
if (eventq)
{
dcb->evq.prev = eventq->evq.prev;
eventq->evq.prev->evq.next = dcb;
eventq->evq.prev = dcb;
dcb->evq.next = eventq;
fake_events[thr]->tail->next = event;
fake_events[thr]->tail = event;
}
else
{
eventq = dcb;
dcb->evq.prev = dcb;
dcb->evq.next = dcb;
}
pollStats.evq_length++;
pollStats.evq_pending++;
if (pollStats.evq_length > pollStats.evq_max)
{
pollStats.evq_max = pollStats.evq_length;
fake_events[thr] = event;
}
}
spinlock_release(&pollqlock);
}
/*
@@ -1458,7 +1464,7 @@ static void poll_add_event_to_dcb(DCB* dcb,
void
poll_fake_write_event(DCB *dcb)
{
poll_fake_event(dcb, EPOLLOUT);
poll_add_event_to_dcb(dcb, NULL, EPOLLOUT);
}
/*
@@ -1477,79 +1483,7 @@ poll_fake_write_event(DCB *dcb)
void
poll_fake_read_event(DCB *dcb)
{
poll_fake_event(dcb, EPOLLIN);
}
/*
* Insert a fake completion event for a DCB into the polling queue.
*
* This is used to trigger transmission activity on another DCB from
within the event processing routine of a DCB, or to allow a DCB
* to defer some further output processing, to allow for other DCBs
* to receive a slice of the processing time. Fake events are added
* to the tail of the event queue, in the same way that real events
are, to maintain the "fairness" of processing.
*
* @param dcb DCB to emulate an event for
* @param ev Event to emulate
*/
void
poll_fake_event(DCB *dcb, enum EPOLL_EVENTS ev)
{
spinlock_acquire(&pollqlock);
/*
* If the DCB is already on the queue, there are no pending events and
* there are other events on the queue, then
* take it off the queue. This stops the DCB hogging the threads.
*/
if (DCB_POLL_BUSY(dcb) && dcb->evq.pending_events == 0 && dcb->evq.prev != dcb)
{
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
if (eventq == dcb)
{
eventq = dcb->evq.next;
}
dcb->evq.next = NULL;
dcb->evq.prev = NULL;
pollStats.evq_length--;
}
if (DCB_POLL_BUSY(dcb))
{
if (dcb->evq.pending_events == 0)
{
pollStats.evq_pending++;
}
dcb->evq.pending_events |= ev;
}
else
{
dcb->evq.pending_events = ev;
dcb->evq.inserted = hkheartbeat;
if (eventq)
{
dcb->evq.prev = eventq->evq.prev;
eventq->evq.prev->evq.next = dcb;
eventq->evq.prev = dcb;
dcb->evq.next = eventq;
}
else
{
eventq = dcb;
dcb->evq.prev = dcb;
dcb->evq.next = dcb;
}
pollStats.evq_length++;
pollStats.evq_pending++;
dcb->evq.inserted = hkheartbeat;
if (pollStats.evq_length > pollStats.evq_max)
{
pollStats.evq_max = pollStats.evq_length;
}
}
spinlock_release(&pollqlock);
poll_add_event_to_dcb(dcb, NULL, EPOLLIN);
}
/*
@@ -1567,42 +1501,7 @@ poll_fake_hangup_event(DCB *dcb)
#else
uint32_t ev = EPOLLHUP;
#endif
spinlock_acquire(&pollqlock);
if (DCB_POLL_BUSY(dcb))
{
if (dcb->evq.pending_events == 0)
{
pollStats.evq_pending++;
}
dcb->evq.pending_events |= ev;
}
else
{
dcb->evq.pending_events = ev;
dcb->evq.inserted = hkheartbeat;
if (eventq)
{
dcb->evq.prev = eventq->evq.prev;
eventq->evq.prev->evq.next = dcb;
eventq->evq.prev = dcb;
dcb->evq.next = eventq;
}
else
{
eventq = dcb;
dcb->evq.prev = dcb;
dcb->evq.next = dcb;
}
pollStats.evq_length++;
pollStats.evq_pending++;
dcb->evq.inserted = hkheartbeat;
if (pollStats.evq_length > pollStats.evq_max)
{
pollStats.evq_max = pollStats.evq_length;
}
}
spinlock_release(&pollqlock);
poll_add_event_to_dcb(dcb, NULL, ev);
}
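
With the old queue manipulation gone, all three fake-event helpers reduce to a single call into poll_add_event_to_dcb(). The sketch below restates the new entry points in compilable form, with DCB and GWBUF stubbed and the enqueue function reduced to a print; in the diff above the real function allocates a fake_event_t and appends it to fake_events[dcb->owner] under pollqlock.

#include <stdint.h>
#include <stdio.h>
#include <sys/epoll.h>

typedef struct dcb { int owner; } DCB; /* stub: the real DCB carries much more */
typedef void GWBUF;                    /* opaque stub for the buffer type */

static void poll_add_event_to_dcb(DCB *dcb, GWBUF *buf, uint32_t ev)
{
    (void)buf; /* NULL for plain fake events; fake read data otherwise */
    printf("enqueue event 0x%x for thread %d\n", ev, dcb->owner);
}

void poll_fake_write_event(DCB *dcb)
{
    poll_add_event_to_dcb(dcb, NULL, EPOLLOUT);
}

void poll_fake_read_event(DCB *dcb)
{
    poll_add_event_to_dcb(dcb, NULL, EPOLLIN);
}

void poll_fake_hangup_event(DCB *dcb)
{
#ifdef EPOLLRDHUP
    uint32_t ev = EPOLLRDHUP;
#else
    uint32_t ev = EPOLLHUP;
#endif
    poll_add_event_to_dcb(dcb, NULL, ev);
}

int main(void)
{
    DCB dcb = { .owner = 0 };
    poll_fake_read_event(&dcb);
    poll_fake_write_event(&dcb);
    poll_fake_hangup_event(&dcb);
    return 0;
}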
/**
@@ -1696,14 +1595,14 @@ poll_get_stat(POLL_STAT stat)
return ts_stats_sum(pollStats.n_accept);
case POLL_STAT_EVQ_LEN:
return pollStats.evq_length;
case POLL_STAT_EVQ_PENDING:
return pollStats.evq_pending;
case POLL_STAT_EVQ_MAX:
return pollStats.evq_max;
case POLL_STAT_MAX_QTIME:
return (int)queueStats.maxqtime;
case POLL_STAT_MAX_EXECTIME:
return (int)queueStats.maxexectime;
default:
break;
}
return 0;
}

View File

@@ -599,10 +599,8 @@ gw_read_backend_event(DCB *dcb)
if (proto->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
{
/** Authentication completed successfully */
spinlock_acquire(&dcb->authlock);
GWBUF *localq = dcb->delayq;
dcb->delayq = NULL;
spinlock_release(&dcb->authlock);
if (localq)
{
@@ -746,9 +744,9 @@ gw_read_and_write(DCB *dcb)
{
GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
/* Put any residue into the read queue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = read_buffer;
spinlock_release(&dcb->authlock);
if (tmp == NULL)
{
/** No complete packets */
@@ -1012,7 +1010,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
gwbuf_free(queue);
rc = 0;
spinlock_release(&dcb->authlock);
break;
case MXS_AUTH_STATE_COMPLETE:
@@ -1027,7 +1025,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state));
spinlock_release(&dcb->authlock);
/**
* Statement type is used in readwrite split router.
* Command is *not* set for readconn router.
@@ -1082,7 +1080,7 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* connected with auth ok
*/
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
rc = 1;
}
break;
@@ -1807,9 +1805,9 @@ static GWBUF* process_response_data(DCB* dcb,
/** Store the already read data into the readqueue of the DCB
* and restore the response status to the initial number of packets */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(outbuf, dcb->dcb_readqueue);
spinlock_release(&dcb->authlock);
protocol_set_response_status(p, initial_packets, initial_bytes);
return NULL;
}

View File

@@ -433,19 +433,19 @@ int gw_read_client_event(DCB* dcb)
* will be changed to MYSQL_IDLE (see below).
*
*/
case MXS_AUTH_STATE_MESSAGE_READ:
/* After this call read_buffer will point to freed data */
if (nbytes_read < 3 || (0 == max_bytes && nbytes_read <
(MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) ||
(0 != max_bytes && nbytes_read < max_bytes))
{
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = read_buffer;
spinlock_release(&dcb->authlock);
return 0;
}
return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
break;
case MXS_AUTH_STATE_MESSAGE_READ:
/* After this call read_buffer will point to freed data */
if (nbytes_read < 3 || (0 == max_bytes && nbytes_read <
(MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) ||
(0 != max_bytes && nbytes_read < max_bytes))
{
dcb->dcb_readqueue = read_buffer;
return 0;
}
return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
break;
/**
*
@@ -861,9 +861,9 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
if (nbytes_read < 3 || nbytes_read <
(MYSQL_GET_PACKET_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4))
{
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = read_buffer;
spinlock_release(&dcb->authlock);
return 0;
}
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
@@ -904,9 +904,9 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
{
/* Must have been data left over */
/* Add incomplete mysql packet to read queue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
spinlock_release(&dcb->authlock);
}
}
else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION)))

View File

@@ -1039,9 +1039,9 @@ bool read_complete_packet(DCB *dcb, GWBUF **readbuf)
if (localbuf)
{
/** Store any extra data in the DCB's readqueue */
spinlock_acquire(&dcb->authlock);
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, localbuf);
spinlock_release(&dcb->authlock);
}
}
@@ -1061,7 +1061,6 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
CHK_DCB(dcb);
CHK_SESSION(dcb->session);
spinlock_acquire(&dcb->session->ses_lock);
if (dcb->session->state != SESSION_STATE_ALLOC &&
dcb->session->state != SESSION_STATE_DUMMY)
@@ -1076,7 +1075,7 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
pthread_self(), dcb->session->state);
rval = false;
}
spinlock_release(&dcb->session->ses_lock);
return rval;
}

View File

@@ -1044,15 +1044,6 @@ maxinfo_event_queue_length()
return poll_get_stat(POLL_STAT_EVQ_LEN);
}
/**
* Interface to poll stats for event pending queue length
*/
static int
maxinfo_event_pending_queue_length()
{
return poll_get_stat(POLL_STAT_EVQ_PENDING);
}
/**
* Interface to poll stats for max event queue length
*/
@@ -1108,7 +1099,6 @@ static struct
{ "Error_events", VT_INT, (STATSFUNC)maxinfo_error_events },
{ "Accept_events", VT_INT, (STATSFUNC)maxinfo_accept_events },
{ "Event_queue_length", VT_INT, (STATSFUNC)maxinfo_event_queue_length },
{ "Pending_events", VT_INT, (STATSFUNC)maxinfo_event_pending_queue_length },
{ "Max_event_queue_length", VT_INT, (STATSFUNC)maxinfo_max_event_queue_length },
{ "Max_event_queue_time", VT_INT, (STATSFUNC)maxinfo_max_event_queue_time },
{ "Max_event_execution_time", VT_INT, (STATSFUNC)maxinfo_max_event_exec_time },