Move all statistics gathering to poll_waitevents().

The handler callback should now return a bitmask with bits set
according to what it did when it was called. That way the actual
statistics gathering can be done in poll_waitevents() and the
handler need not be aware of any thread structs.

Actually, the only thing that needs any assistance is accept handling,
because in poll_waitevents() we do not know whether a READ event
relates to a listening or a normal socket, that is, should the
event be counted as an accept or as a read.
This commit is contained in:
Johan Wikman 2017-02-15 09:21:15 +02:00
parent dec2bcdab2
commit b11b848e66
2 changed files with 59 additions and 15 deletions

View File

@ -18,6 +18,16 @@
#include <maxscale/cdefs.h>
typedef enum mxs_poll_action
{
MXS_POLL_NOP = 0x00,
MXS_POLL_ACCEPT = 0x01,
MXS_POLL_READ = 0x02,
MXS_POLL_WRITE = 0x04,
MXS_POLL_HUP = 0x08,
MXS_POLL_ERROR = 0x10,
} mxs_poll_action_t;
typedef struct mxs_poll_data
{
/** Pointer to function that knows how to handle events for this particular
@ -26,8 +36,10 @@ typedef struct mxs_poll_data
* @param data The `mxs_poll_data` instance that contained this pointer.
* @param wid The worker thread id.
* @param events The epoll events.
*
* @return A combination of mxs_poll_action_t enumeration values.
*/
void (*handler)(struct mxs_poll_data *data, int wid, uint32_t events);
uint32_t (*handler)(struct mxs_poll_data *data, int wid, uint32_t events);
struct
{

View File

@ -111,8 +111,8 @@ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev);
static void dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events);
static uint32_t process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev);
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, uint32_t ev);
static bool poll_dcb_session_check(DCB *dcb, const char *);
@ -857,7 +857,32 @@ poll_waitevents(void *arg)
thread_data[thread_id].cur_data = data;
thread_data[thread_id].event = events[i].events;
data->handler(data, thread_id, events[i].events);
uint32_t actions = data->handler(data, thread_id, events[i].events);
if (actions & MXS_POLL_ACCEPT)
{
ts_stats_increment(pollStats.n_accept, thread_id);
}
if (actions & MXS_POLL_READ)
{
ts_stats_increment(pollStats.n_read, thread_id);
}
if (actions & MXS_POLL_WRITE)
{
ts_stats_increment(pollStats.n_write, thread_id);
}
if (actions & MXS_POLL_HUP)
{
ts_stats_increment(pollStats.n_hup, thread_id);
}
if (actions & MXS_POLL_ERROR)
{
ts_stats_increment(pollStats.n_error, thread_id);
}
/** Calculate event execution statistics */
qtime = hkheartbeat - started;
@ -949,18 +974,20 @@ poll_set_maxwait(unsigned int maxwait)
* @param thread_id The thread ID of the calling thread
* @return 0 if no DCB's have been processed
*/
static int
static uint32_t
process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
{
ss_dassert(dcb->poll.thread.id == thread_id || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
CHK_DCB(dcb);
uint32_t rc = MXS_POLL_NOP;
/* It isn't obvious that this is impossible */
/* ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); */
if (DCB_STATE_DISCONNECTED == dcb->state)
{
return 0;
return rc;
}
MXS_DEBUG("%lu [poll_waitevents] event %d dcb %p "
@ -977,7 +1004,7 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
if (eno == 0)
{
ts_stats_increment(pollStats.n_write, thread_id);
rc |= MXS_POLL_WRITE;
if (poll_dcb_session_check(dcb, "write_ready"))
{
@ -1004,7 +1031,7 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
"Accept in fd %d",
pthread_self(),
dcb->fd);
ts_stats_increment(pollStats.n_accept, thread_id);
rc |= MXS_POLL_ACCEPT;
if (poll_dcb_session_check(dcb, "accept"))
{
@ -1018,7 +1045,7 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
pthread_self(),
dcb,
dcb->fd);
ts_stats_increment(pollStats.n_read, thread_id);
rc |= MXS_POLL_READ;
if (poll_dcb_session_check(dcb, "read"))
{
@ -1049,7 +1076,7 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
eno,
mxs_strerror(eno));
}
ts_stats_increment(pollStats.n_error, thread_id);
rc |= MXS_POLL_ERROR;
if (poll_dcb_session_check(dcb, "error"))
{
@ -1068,7 +1095,9 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
dcb->fd,
eno,
mxs_strerror(eno));
ts_stats_increment(pollStats.n_hup, thread_id);
rc |= MXS_POLL_HUP;
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
@ -1092,7 +1121,8 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
dcb->fd,
eno,
mxs_strerror(eno));
ts_stats_increment(pollStats.n_hup, thread_id);
rc |= MXS_POLL_HUP;
if ((dcb->flags & DCBF_HUNG) == 0)
{
@ -1105,12 +1135,12 @@ process_pollq_dcb(DCB *dcb, int thread_id, uint32_t ev)
}
}
#endif
return 1;
return rc;
}
static void dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events)
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events)
{
process_pollq_dcb((DCB*)data, wid, events);
uint32_t rc = process_pollq_dcb((DCB*)data, wid, events);
// Since this loop is now here, it will be processed once per extracted epoll
// event and not once per extraction of events, but as this is temporary code
@ -1138,6 +1168,8 @@ static void dcb_poll_handler(MXS_POLL_DATA *data, int wid, uint32_t events)
event = event->next;
MXS_FREE(tmp);
}
return rc;
}
/**