Reindent server/core/poll.c

This commit is contained in:
Johan Wikman
2015-11-30 15:53:14 +02:00
parent c0615408aa
commit 6c401b9085
2 changed files with 1258 additions and 1160 deletions

View File

@ -88,6 +88,7 @@ static GWBITMASK poll_mask;
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */
static int process_pollq(int thread_id);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev);
static bool poll_dcb_session_check(DCB *dcb, const char *);
@ -114,15 +115,21 @@ static int n_threads; /*< No. of threads */
/**
* Internal MaxScale thread states
*/
typedef enum { THREAD_STOPPED, THREAD_IDLE,
THREAD_POLLING, THREAD_PROCESSING,
THREAD_ZPROCESSING } THREAD_STATE;
typedef enum
{
THREAD_STOPPED,
THREAD_IDLE,
THREAD_POLLING,
THREAD_PROCESSING,
THREAD_ZPROCESSING
} THREAD_STATE;
/**
* Thread data used to report the current state and activity related to
* a thread
*/
typedef struct {
typedef struct
{
THREAD_STATE state; /*< Current thread state */
int n_fds; /*< No. of descriptors thread is processing */
DCB *cur_dcb; /*< Current DCB being processed */
@ -145,7 +152,8 @@ static THREAD_DATA *thread_data = NULL; /*< Status of each thread */
/**
* The polling statistics
*/
static struct {
static struct
{
int n_read; /*< Number of read events */
int n_write; /*< Number of write events */
int n_error; /*< Number of error events */
@ -155,8 +163,7 @@ static struct {
int n_pollev; /*< Number of polls returning events */
int n_nbpollev; /*< Number of polls returning events */
int n_nothreads; /*< Number of times no threads are polling */
int n_fds[MAXNFDS]; /*< Number of wakeups with particular
n_fds value */
int n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
int evq_length; /*< Event queue length */
int evq_pending; /*< Number of pending descriptors in event queue */
int evq_max; /*< Maximum event queue length */
@ -168,7 +175,8 @@ static struct {
/**
* The event queue statistics
*/
static struct {
static struct
{
unsigned int qtimes[N_QUEUE_TIMES+1];
unsigned int exectimes[N_QUEUE_TIMES+1];
unsigned long maxqtime;
@ -201,7 +209,9 @@ poll_init()
int i;
if (epoll_fd != -1)
{
return;
}
if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1)
{
perror("epoll_create");
@ -211,8 +221,7 @@ int i;
memset(&queueStats, 0, sizeof(queueStats));
bitmask_init(&poll_mask);
n_threads = config_threadcount();
if ((thread_data =
(THREAD_DATA *)malloc(n_threads * sizeof(THREAD_DATA))) != NULL)
if ((thread_data = (THREAD_DATA *)malloc(n_threads * sizeof(THREAD_DATA))) != NULL)
{
for (i = 0; i < n_threads; i++)
{
@ -227,10 +236,14 @@ int i;
n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
avg_samples = (double *)malloc(sizeof(double) * n_avg_samples);
for (i = 0; i < n_avg_samples; i++)
{
avg_samples[i] = 0.0;
}
evqp_samples = (int *)malloc(sizeof(int) * n_avg_samples);
for (i = 0; i < n_avg_samples; i++)
{
evqp_samples[i] = 0.0;
}
number_poll_spins = config_nbpolls();
max_poll_sleep = config_pollsleep();
@ -376,8 +389,14 @@ poll_remove_dcb(DCB *dcb)
* return 0 or crash. So if it returns non-zero result,
* things have gone wrong and we crash.
*/
if (rc) rc = poll_resolve_error(dcb, errno, false);
if (rc) raise(SIGABRT);
if (rc)
{
rc = poll_resolve_error(dcb, errno, false);
}
if (rc)
{
raise(SIGABRT);
}
}
return rc;
}
@ -434,10 +453,22 @@ poll_resolve_error(DCB *dcb, int errornum, bool adding)
}
}
/* Common checks for add or remove - crash MaxScale */
if (EBADF == errornum) raise(SIGABRT);
if (EINVAL == errornum) raise(SIGABRT);
if (ENOMEM == errornum) raise(SIGABRT);
if (EPERM == errornum) raise(SIGABRT);
if (EBADF == errornum)
{
raise(SIGABRT);
}
if (EINVAL == errornum)
{
raise(SIGABRT);
}
if (ENOMEM == errornum)
{
raise(SIGABRT);
}
if (EPERM == errornum)
{
raise(SIGABRT);
}
/* Undocumented error number */
raise(SIGABRT);
/* The following statement should never be reached, but avoids compiler warning */
@ -577,7 +608,9 @@ int poll_spins = 0;
}
if (n_waiting == 0)
{
atomic_add(&pollStats.n_nothreads, 1);
}
#if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex);
#endif
@ -586,7 +619,9 @@ int poll_spins = 0;
{
timeout_bias = 1;
if (poll_spins <= number_poll_spins + 1)
{
atomic_add(&pollStats.n_nbpollev, 1);
}
poll_spins = 0;
MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(),
@ -602,8 +637,7 @@ int poll_spins = 0;
pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
load_average = (load_average * load_samples + nfds)
/ (load_samples + 1);
load_average = (load_average * load_samples + nfds) / (load_samples + 1);
atomic_add(&load_samples, 1);
atomic_add(&load_nfds, nfds);
@ -667,13 +701,19 @@ int poll_spins = 0;
* of the count goes wrong.
*/
if (process_pollq(thread_id))
{
timeout_bias = 1;
}
if (thread_data)
{
thread_data[thread_id].state = THREAD_ZPROCESSING;
}
dcb_process_zombies(thread_id);
if (thread_data)
{
thread_data[thread_id].state = THREAD_IDLE;
}
if (do_shutdown)
{
@ -776,9 +816,11 @@ unsigned long qtime;
}
else
{
do {
do
{
dcb = dcb->evq.next;
} while (dcb != eventq && dcb->evq.processing == 1);
}
while (dcb != eventq && dcb->evq.processing == 1);
if (dcb->evq.processing == 0)
{
@ -798,7 +840,9 @@ unsigned long qtime;
spinlock_release(&pollqlock);
if (found == 0)
{
return 0;
}
#if PROFILE_POLL
memlog_log(plog, hkheartbeat - dcb->evq.inserted);
@ -807,11 +851,17 @@ unsigned long qtime;
dcb->evq.started = hkheartbeat;
if (qtime > N_QUEUE_TIMES)
{
queueStats.qtimes[N_QUEUE_TIMES]++;
}
else
{
queueStats.qtimes[qtime]++;
}
if (qtime > queueStats.maxqtime)
{
queueStats.maxqtime = qtime;
}
CHK_DCB(dcb);
@ -823,7 +873,8 @@ unsigned long qtime;
}
#if defined(FAKE_CODE)
if (dcb_fake_write_ev[dcb->fd] != 0) {
if (dcb_fake_write_ev[dcb->fd] != 0)
{
MXS_DEBUG("%lu [poll_waitevents] "
"Added fake events %d to ev %d.",
pthread_self(),
@ -833,7 +884,7 @@ unsigned long qtime;
dcb_fake_write_ev[dcb->fd] = 0;
}
#endif /* FAKE_CODE */
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_debug(spinlock_acquire(&dcb->dcb_initlock));
ss_dassert(dcb->state != DCB_STATE_ALLOC);
/* It isn't obvious that this is impossible */
/* ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); */
@ -855,7 +906,8 @@ unsigned long qtime;
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
if (eno == 0) {
if (eno == 0)
{
atomic_add(&pollStats.n_write, 1);
/** Read session id to thread's local storage */
dcb_get_ses_log_info(dcb,
@ -866,7 +918,9 @@ unsigned long qtime;
{
dcb->func.write_ready(dcb);
}
} else {
}
else
{
char errbuf[STRERROR_BUFLEN];
MXS_DEBUG("%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
@ -886,8 +940,7 @@ unsigned long qtime;
"Accept in fd %d",
pthread_self(),
dcb->fd);
atomic_add(
&pollStats.n_accept, 1);
atomic_add(&pollStats.n_accept, 1);
dcb_get_ses_log_info(dcb,
&mxs_log_tls.li_sesid,
&mxs_log_tls.li_enabled_priorities);
@ -920,7 +973,8 @@ unsigned long qtime;
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(FAKE_CODE)
if (eno == 0) {
if (eno == 0)
{
eno = dcb_fake_write_errno[dcb->fd];
char errbuf[STRERROR_BUFLEN];
MXS_DEBUG("%lu [poll_waitevents] "
@ -932,7 +986,8 @@ unsigned long qtime;
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif /* FAKE_CODE */
if (eno != 0) {
if (eno != 0)
{
char errbuf[STRERROR_BUFLEN];
MXS_DEBUG("%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
@ -982,8 +1037,10 @@ unsigned long qtime;
}
}
else
{
spinlock_release(&dcb->dcb_initlock);
}
}
#ifdef EPOLLRDHUP
if (ev & EPOLLRDHUP)
@ -1016,17 +1073,25 @@ unsigned long qtime;
}
}
else
{
spinlock_release(&dcb->dcb_initlock);
}
}
#endif
qtime = hkheartbeat - dcb->evq.started;
if (qtime > N_QUEUE_TIMES)
{
queueStats.exectimes[N_QUEUE_TIMES]++;
}
else
{
queueStats.exectimes[qtime % N_QUEUE_TIMES]++;
}
if (qtime > queueStats.maxexectime)
{
queueStats.maxexectime = qtime;
}
spinlock_acquire(&pollqlock);
dcb->evq.processing_events = 0;
@ -1110,7 +1175,6 @@ poll_dcb_session_check(DCB *dcb, const char *function)
}
/**
*
* Shutdown the polling loop
*/
void
@ -1212,7 +1276,9 @@ char *str;
str = malloc(22); // 22 is max returned string length
if (str == NULL)
{
return NULL;
}
*str = 0;
if (event & EPOLLIN)
{
@ -1227,20 +1293,26 @@ char *str;
if (event & EPOLLERR)
{
if (*str)
{
strcat(str, "|");
}
strcat(str, "ERR");
}
if (event & EPOLLHUP)
{
if (*str)
{
strcat(str, "|");
}
strcat(str, "HUP");
}
#ifdef EPOLLRDHUP
if (event & EPOLLRDHUP)
{
if (*str)
{
strcat(str, "|");
}
strcat(str, "RDHUP");
}
#endif
@ -1261,7 +1333,6 @@ char *state;
double avg1 = 0.0, avg5 = 0.0, avg15 = 0.0;
double qavg1 = 0.0, qavg5 = 0.0, qavg15 = 0.0;
dcb_printf(dcb, "Polling Threads.\n\n");
dcb_printf(dcb, "Historic Thread Load Average: %.2f.\n", load_average);
dcb_printf(dcb, "Current Thread Load Average: %.2f.\n", current_avg);
@ -1279,7 +1350,9 @@ double qavg1 = 0.0, qavg5 = 0.0, qavg15 = 0.0;
n = 5 * 60 / POLL_LOAD_FREQ;
i = next_sample - (n + 1);
if (i < 0)
{
i += n_avg_samples;
}
for (j = i; j < i + n; j++)
{
avg5 += avg_samples[j % n_avg_samples];
@ -1292,7 +1365,9 @@ double qavg1 = 0.0, qavg5 = 0.0, qavg15 = 0.0;
n = 60 / POLL_LOAD_FREQ;
i = next_sample - (n + 1);
if (i < 0)
{
i += n_avg_samples;
}
for (j = i; j < i + n; j++)
{
avg1 += avg_samples[j % n_avg_samples];
@ -1308,7 +1383,9 @@ double qavg1 = 0.0, qavg5 = 0.0, qavg15 = 0.0;
"1 Minute Average: %.2f\n\n", qavg15, qavg5, qavg1);
if (thread_data == NULL)
{
return;
}
dcb_printf(dcb, " ID | State | # fds | Descriptor | Running | Event\n");
dcb_printf(dcb, "----+------------+--------+------------------+----------+---------------\n");
for (i = 0; i < n_threads; i++)
@ -1332,17 +1409,20 @@ double qavg1 = 0.0, qavg5 = 0.0, qavg15 = 0.0;
break;
}
if (thread_data[i].state != THREAD_PROCESSING)
{
dcb_printf(dcb,
" %2d | %-10s | | | |\n",
i, state);
}
else if (thread_data[i].cur_dcb == NULL)
{
dcb_printf(dcb,
" %2d | %-10s | %6d | | |\n",
i, state, thread_data[i].n_fds);
}
else
{
char *event_string
= event_to_string(thread_data[i].event);
char *event_string = event_to_string(thread_data[i].event);
bool from_heap;
if (event_string == NULL)
@ -1387,15 +1467,21 @@ int new_samples, new_nfds;
/* POLL_LOAD_FREQ average is... */
if (new_samples)
{
current_avg = new_nfds / new_samples;
}
else
{
current_avg = 0.0;
}
avg_samples[next_sample] = current_avg;
evqp_samples[next_sample] = pollStats.evq_pending;
next_sample++;
if (next_sample >= n_avg_samples)
{
next_sample = 0;
}
}
/**
* Add given GWBUF to DCB's readqueue and add a pending EPOLLIN event for DCB.
@ -1406,8 +1492,7 @@ int new_samples, new_nfds;
* @param buf GWBUF including the data
*
*/
void poll_add_epollin_event_to_dcb(
DCB* dcb,
void poll_add_epollin_event_to_dcb(DCB* dcb,
GWBUF* buf)
{
__uint32_t ev;
@ -1418,8 +1503,7 @@ void poll_add_epollin_event_to_dcb(
}
static void poll_add_event_to_dcb(
DCB* dcb,
static void poll_add_event_to_dcb(DCB* dcb,
GWBUF* buf,
__uint32_t ev)
{
@ -1496,7 +1580,9 @@ uint32_t ev = EPOLLOUT;
dcb->evq.prev->evq.next = dcb->evq.next;
dcb->evq.next->evq.prev = dcb->evq.prev;
if (eventq == dcb)
{
eventq = dcb->evq.next;
}
dcb->evq.next = NULL;
dcb->evq.prev = NULL;
pollStats.evq_length--;
@ -1505,7 +1591,9 @@ uint32_t ev = EPOLLOUT;
if (DCB_POLL_BUSY(dcb))
{
if (dcb->evq.pending_events == 0)
{
pollStats.evq_pending++;
}
dcb->evq.pending_events |= ev;
}
else
@ -1552,7 +1640,9 @@ uint32_t ev = EPOLLRDHUP;
if (DCB_POLL_BUSY(dcb))
{
if (dcb->evq.pending_events == 0)
{
pollStats.evq_pending++;
}
dcb->evq.pending_events |= ev;
}
else
@ -1606,7 +1696,8 @@ char *tmp1, *tmp2;
dcb_printf(pdcb, "%-16s | %-10s | %-18s | %s\n", "DCB", "Status", "Processing Events",
"Pending Events");
dcb_printf(pdcb, "-----------------+------------+--------------------+-------------------\n");
do {
do
{
dcb_printf(pdcb, "%-16p | %-10s | %-18s | %-18s\n", dcb,
dcb->evq.processing ? "Processing" : "Pending",
(tmp1 = event_to_string(dcb->evq.processing_events)),
@ -1614,7 +1705,8 @@ char *tmp1, *tmp2;
free(tmp1);
free(tmp2);
dcb = dcb->evq.next;
} while (dcb != eventq);
}
while (dcb != eventq);
spinlock_release(&pollqlock);
}
@ -1705,7 +1797,9 @@ RESULT_ROW *row;
}
row = resultset_make_row(set);
if (*rowno == 0)
{
resultset_row_set(row, 0, "< 100ms");
}
else if (*rowno == N_QUEUE_TIMES - 1)
{
snprintf(buf,39, "> %2d00ms", N_QUEUE_TIMES);
@ -1740,7 +1834,9 @@ RESULTSET *set;
int *data;
if ((data = (int *)malloc(sizeof(int))) == NULL)
{
return NULL;
}
*data = 0;
if ((set = resultset_create(eventTimesRowCallback, data)) == NULL)
{

View File

@ -38,7 +38,8 @@
/**
* A statistic identifier that can be returned by poll_get_stat
*/
typedef enum {
typedef enum
{
POLL_STAT_READ,
POLL_STAT_WRITE,
POLL_STAT_ERROR,
@ -68,4 +69,5 @@ extern int poll_get_stat(POLL_STAT stat);
extern RESULTSET* eventTimesGetList();
extern void poll_fake_hangup_event(DCB *dcb);
extern void poll_fake_write_event(DCB *dcb);
#endif