Poll statistics moved to worker
Each worker now has a separate structure for collecting the polling statistics that is passed to epoll_waitevents(). When the stats are asked for, we loop over all separate stats and combine them. So, instead of having every statistics of each thread one cacheline apart, each thread has all its statistics in one lump that, for obvious reasons, are going to be apart. The primary purpose of this excersize is to remove the hardwired nature of the statistics collection. For instance, the admin thread will be doing I/O but that I/O should not be included in the statistics of the workers.
This commit is contained in:
@ -81,6 +81,39 @@ typedef struct
|
|||||||
|
|
||||||
extern THREAD_DATA *thread_data;
|
extern THREAD_DATA *thread_data;
|
||||||
|
|
||||||
|
// TODO: Temporarily moved here.
|
||||||
|
/**
|
||||||
|
* The number of buckets used to gather statistics about how many
|
||||||
|
* descriptors where processed on each epoll completion.
|
||||||
|
*
|
||||||
|
* An array of wakeup counts is created, with the number of descriptors used
|
||||||
|
* to index that array. Each time a completion occurs the n_fds - 1 value is
|
||||||
|
* used to index this array and increment the count held there.
|
||||||
|
* If n_fds - 1 >= MAXFDS then the count at MAXFDS -1 is incremented.
|
||||||
|
*/
|
||||||
|
#define MAXNFDS 10
|
||||||
|
|
||||||
|
// TODO: Temporarily moved here.
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
int64_t n_read; /*< Number of read events */
|
||||||
|
int64_t n_write; /*< Number of write events */
|
||||||
|
int64_t n_error; /*< Number of error events */
|
||||||
|
int64_t n_hup; /*< Number of hangup events */
|
||||||
|
int64_t n_accept; /*< Number of accept events */
|
||||||
|
int64_t n_polls; /*< Number of poll cycles */
|
||||||
|
int64_t n_pollev; /*< Number of polls returning events */
|
||||||
|
int64_t n_nbpollev; /*< Number of polls returning events */
|
||||||
|
int64_t n_nothreads; /*< Number of times no threads are polling */
|
||||||
|
int32_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
|
||||||
|
int64_t evq_length; /*< Event queue length */
|
||||||
|
int64_t evq_max; /*< Maximum event queue length */
|
||||||
|
int64_t blockingpolls; /*< Number of epoll_waits with a timeout specified */
|
||||||
|
} POLL_STATS;
|
||||||
|
|
||||||
|
// TODO: Temporarily moved here.
|
||||||
|
extern POLL_STATS* pollStats;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A file descriptor should be added to the poll set of all workers.
|
* A file descriptor should be added to the poll set of all workers.
|
||||||
*/
|
*/
|
||||||
|
@ -53,6 +53,7 @@ void poll_init();
|
|||||||
void poll_waitevents(int epoll_fd,
|
void poll_waitevents(int epoll_fd,
|
||||||
int thread_id,
|
int thread_id,
|
||||||
THREAD_DATA* thread_data,
|
THREAD_DATA* thread_data,
|
||||||
|
POLL_STATS* pollStats,
|
||||||
bool (*should_terminate)(void* data),
|
bool (*should_terminate)(void* data),
|
||||||
void* data);
|
void* data);
|
||||||
void poll_set_maxwait(unsigned int);
|
void poll_set_maxwait(unsigned int);
|
||||||
|
@ -197,10 +197,10 @@ public:
|
|||||||
static int get_current_id();
|
static int get_current_id();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Worker(int id, int epoll_fd, THREAD_DATA* pThread_data);
|
Worker(int id, int epoll_fd, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats);
|
||||||
virtual ~Worker();
|
virtual ~Worker();
|
||||||
|
|
||||||
static Worker* create(int id, THREAD_DATA* pThread_data);
|
static Worker* create(int id, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats);
|
||||||
|
|
||||||
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
|
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
|
||||||
|
|
||||||
@ -210,6 +210,7 @@ private:
|
|||||||
int m_id; /*< The id of the worker. */
|
int m_id; /*< The id of the worker. */
|
||||||
int m_epoll_fd; /*< The epoll file descriptor. */
|
int m_epoll_fd; /*< The epoll file descriptor. */
|
||||||
THREAD_DATA* m_pThread_data; /*< The thread data of the worker. */
|
THREAD_DATA* m_pThread_data; /*< The thread data of the worker. */
|
||||||
|
POLL_STATS* m_pPoll_stats; /*< Statistics for worker. */
|
||||||
MessageQueue* m_pQueue; /*< The message queue of the worker. */
|
MessageQueue* m_pQueue; /*< The message queue of the worker. */
|
||||||
THREAD m_thread; /*< The thread handle of the worker. */
|
THREAD m_thread; /*< The thread handle of the worker. */
|
||||||
bool m_started; /*< Whether the thread has been started or not. */
|
bool m_started; /*< Whether the thread has been started or not. */
|
||||||
|
@ -110,37 +110,6 @@ static int n_avg_samples;
|
|||||||
/* Thread statistics data */
|
/* Thread statistics data */
|
||||||
static int n_threads; /*< No. of threads */
|
static int n_threads; /*< No. of threads */
|
||||||
|
|
||||||
/**
|
|
||||||
* The number of buckets used to gather statistics about how many
|
|
||||||
* descriptors where processed on each epoll completion.
|
|
||||||
*
|
|
||||||
* An array of wakeup counts is created, with the number of descriptors used
|
|
||||||
* to index that array. Each time a completion occurs the n_fds - 1 value is
|
|
||||||
* used to index this array and increment the count held there.
|
|
||||||
* If n_fds - 1 >= MAXFDS then the count at MAXFDS -1 is incremented.
|
|
||||||
*/
|
|
||||||
#define MAXNFDS 10
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The polling statistics
|
|
||||||
*/
|
|
||||||
static struct
|
|
||||||
{
|
|
||||||
ts_stats_t n_read; /*< Number of read events */
|
|
||||||
ts_stats_t n_write; /*< Number of write events */
|
|
||||||
ts_stats_t n_error; /*< Number of error events */
|
|
||||||
ts_stats_t n_hup; /*< Number of hangup events */
|
|
||||||
ts_stats_t n_accept; /*< Number of accept events */
|
|
||||||
ts_stats_t n_polls; /*< Number of poll cycles */
|
|
||||||
ts_stats_t n_pollev; /*< Number of polls returning events */
|
|
||||||
ts_stats_t n_nbpollev; /*< Number of polls returning events */
|
|
||||||
ts_stats_t n_nothreads; /*< Number of times no threads are polling */
|
|
||||||
int32_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
|
|
||||||
ts_stats_t evq_length; /*< Event queue length */
|
|
||||||
ts_stats_t evq_max; /*< Maximum event queue length */
|
|
||||||
ts_stats_t blockingpolls; /*< Number of epoll_waits with a timeout specified */
|
|
||||||
} pollStats;
|
|
||||||
|
|
||||||
#define N_QUEUE_TIMES 30
|
#define N_QUEUE_TIMES 30
|
||||||
/**
|
/**
|
||||||
* The event queue statistics
|
* The event queue statistics
|
||||||
@ -178,23 +147,10 @@ poll_init()
|
|||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(&pollStats, 0, sizeof(pollStats));
|
|
||||||
memset(&queueStats, 0, sizeof(queueStats));
|
memset(&queueStats, 0, sizeof(queueStats));
|
||||||
|
|
||||||
if ((pollStats.n_read = ts_stats_alloc()) == NULL ||
|
if ((queueStats.maxqtime = ts_stats_alloc()) == NULL ||
|
||||||
(pollStats.n_write = ts_stats_alloc()) == NULL ||
|
(queueStats.maxexectime = ts_stats_alloc()) == NULL)
|
||||||
(pollStats.n_error = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.n_hup = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.n_accept = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.n_polls = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.n_pollev = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.n_nbpollev = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.n_nothreads = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.evq_length = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.evq_max = ts_stats_alloc()) == NULL ||
|
|
||||||
(queueStats.maxqtime = ts_stats_alloc()) == NULL ||
|
|
||||||
(queueStats.maxexectime = ts_stats_alloc()) == NULL ||
|
|
||||||
(pollStats.blockingpolls = ts_stats_alloc()) == NULL)
|
|
||||||
{
|
{
|
||||||
MXS_OOM_MESSAGE("FATAL: Could not allocate statistics data.");
|
MXS_OOM_MESSAGE("FATAL: Could not allocate statistics data.");
|
||||||
exit(-1);
|
exit(-1);
|
||||||
@ -347,6 +303,7 @@ bool poll_remove_fd_from_worker(int wid, int fd)
|
|||||||
void poll_waitevents(int epoll_fd,
|
void poll_waitevents(int epoll_fd,
|
||||||
int thread_id,
|
int thread_id,
|
||||||
THREAD_DATA* thread_data,
|
THREAD_DATA* thread_data,
|
||||||
|
POLL_STATS* poll_stats,
|
||||||
bool (*should_shutdown)(void* data),
|
bool (*should_shutdown)(void* data),
|
||||||
void* data)
|
void* data)
|
||||||
{
|
{
|
||||||
@ -363,7 +320,7 @@ void poll_waitevents(int epoll_fd,
|
|||||||
atomic_add(&n_waiting, 1);
|
atomic_add(&n_waiting, 1);
|
||||||
thread_data->state = THREAD_POLLING;
|
thread_data->state = THREAD_POLLING;
|
||||||
|
|
||||||
ts_stats_increment(pollStats.n_polls, thread_id);
|
atomic_add_int64(&poll_stats->n_polls, 1);
|
||||||
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
|
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
|
||||||
{
|
{
|
||||||
atomic_add(&n_waiting, -1);
|
atomic_add(&n_waiting, -1);
|
||||||
@ -390,7 +347,7 @@ void poll_waitevents(int epoll_fd,
|
|||||||
{
|
{
|
||||||
timeout_bias++;
|
timeout_bias++;
|
||||||
}
|
}
|
||||||
ts_stats_increment(pollStats.blockingpolls, thread_id);
|
atomic_add_int64(&poll_stats->blockingpolls, 1);
|
||||||
nfds = epoll_wait(epoll_fd,
|
nfds = epoll_wait(epoll_fd,
|
||||||
events,
|
events,
|
||||||
MAX_EVENTS,
|
MAX_EVENTS,
|
||||||
@ -407,31 +364,31 @@ void poll_waitevents(int epoll_fd,
|
|||||||
|
|
||||||
if (n_waiting == 0)
|
if (n_waiting == 0)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_nothreads, thread_id);
|
atomic_add_int64(&poll_stats->n_nothreads, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nfds > 0)
|
if (nfds > 0)
|
||||||
{
|
{
|
||||||
ts_stats_set(pollStats.evq_length, nfds, thread_id);
|
poll_stats->evq_length = nfds;
|
||||||
ts_stats_set_max(pollStats.evq_max, nfds, thread_id);
|
poll_stats->evq_max = nfds;
|
||||||
|
|
||||||
timeout_bias = 1;
|
timeout_bias = 1;
|
||||||
if (poll_spins <= number_poll_spins + 1)
|
if (poll_spins <= number_poll_spins + 1)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_nbpollev, thread_id);
|
atomic_add_int64(&poll_stats->n_nbpollev, 1);
|
||||||
}
|
}
|
||||||
poll_spins = 0;
|
poll_spins = 0;
|
||||||
MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds",
|
MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds",
|
||||||
pthread_self(),
|
pthread_self(),
|
||||||
nfds);
|
nfds);
|
||||||
ts_stats_increment(pollStats.n_pollev, thread_id);
|
atomic_add_int64(&poll_stats->n_pollev, 1);
|
||||||
|
|
||||||
thread_data->n_fds = nfds;
|
thread_data->n_fds = nfds;
|
||||||
thread_data->cur_data = NULL;
|
thread_data->cur_data = NULL;
|
||||||
thread_data->event = 0;
|
thread_data->event = 0;
|
||||||
thread_data->state = THREAD_PROCESSING;
|
thread_data->state = THREAD_PROCESSING;
|
||||||
|
|
||||||
pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
|
poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
|
||||||
|
|
||||||
load_average = (load_average * load_samples + nfds) / (load_samples + 1);
|
load_average = (load_average * load_samples + nfds) / (load_samples + 1);
|
||||||
atomic_add(&load_samples, 1);
|
atomic_add(&load_samples, 1);
|
||||||
@ -477,27 +434,27 @@ void poll_waitevents(int epoll_fd,
|
|||||||
|
|
||||||
if (actions & MXS_POLL_ACCEPT)
|
if (actions & MXS_POLL_ACCEPT)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_accept, thread_id);
|
atomic_add_int64(&poll_stats->n_accept, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (actions & MXS_POLL_READ)
|
if (actions & MXS_POLL_READ)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_read, thread_id);
|
atomic_add_int64(&poll_stats->n_read, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (actions & MXS_POLL_WRITE)
|
if (actions & MXS_POLL_WRITE)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_write, thread_id);
|
atomic_add_int64(&poll_stats->n_write, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (actions & MXS_POLL_HUP)
|
if (actions & MXS_POLL_HUP)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_hup, thread_id);
|
atomic_add_int64(&poll_stats->n_hup, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (actions & MXS_POLL_ERROR)
|
if (actions & MXS_POLL_ERROR)
|
||||||
{
|
{
|
||||||
ts_stats_increment(pollStats.n_error, thread_id);
|
atomic_add_int64(&poll_stats->n_error, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Calculate event execution statistics */
|
/** Calculate event execution statistics */
|
||||||
@ -828,6 +785,44 @@ spin_reporter(void *dcb, char *desc, int value)
|
|||||||
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
|
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int64_t poll_stats_get(int64_t POLL_STATS::*what, enum ts_stats_type type)
|
||||||
|
{
|
||||||
|
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
|
||||||
|
|
||||||
|
size_t n_threads = config_threadcount();
|
||||||
|
|
||||||
|
for (size_t i = 0; i < n_threads; ++i)
|
||||||
|
{
|
||||||
|
POLL_STATS* pollStat = &pollStats[i];
|
||||||
|
int64_t value = pollStat->*what;
|
||||||
|
|
||||||
|
switch (type)
|
||||||
|
{
|
||||||
|
case TS_STATS_MAX:
|
||||||
|
if (value > best)
|
||||||
|
{
|
||||||
|
best = value;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TS_STATS_MIX:
|
||||||
|
if (value < best)
|
||||||
|
{
|
||||||
|
best = value;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TS_STATS_AVG:
|
||||||
|
case TS_STATS_SUM:
|
||||||
|
best += value;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return type == TS_STATS_AVG ? best / n_threads : best;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Debug routine to print the polling statistics
|
* Debug routine to print the polling statistics
|
||||||
*
|
*
|
||||||
@ -840,40 +835,51 @@ dprintPollStats(DCB *dcb)
|
|||||||
|
|
||||||
dcb_printf(dcb, "\nPoll Statistics.\n\n");
|
dcb_printf(dcb, "\nPoll Statistics.\n\n");
|
||||||
dcb_printf(dcb, "No. of epoll cycles: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of epoll cycles: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_polls, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_polls, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of epoll cycles with wait: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of epoll cycles with wait: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.blockingpolls, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::blockingpolls, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of epoll calls returning events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of epoll calls returning events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_pollev, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_pollev, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of non-blocking calls returning events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of non-blocking calls returning events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_nbpollev, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_nbpollev, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of read events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of read events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_read, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_read, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of write events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of write events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_write, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_write, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of error events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of error events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_error, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_error, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of hangup events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of hangup events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_hup, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_hup, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of accept events: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of accept events: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_accept, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_accept, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "No. of times no threads polling: %" PRId64 "\n",
|
dcb_printf(dcb, "No. of times no threads polling: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.n_nothreads, TS_STATS_SUM));
|
poll_stats_get(&POLL_STATS::n_nothreads, TS_STATS_SUM));
|
||||||
dcb_printf(dcb, "Total event queue length: %" PRId64 "\n",
|
dcb_printf(dcb, "Total event queue length: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.evq_length, TS_STATS_AVG));
|
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG));
|
||||||
dcb_printf(dcb, "Average event queue length: %" PRId64 "\n",
|
dcb_printf(dcb, "Average event queue length: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.evq_length, TS_STATS_AVG));
|
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG));
|
||||||
dcb_printf(dcb, "Maximum event queue length: %" PRId64 "\n",
|
dcb_printf(dcb, "Maximum event queue length: %" PRId64 "\n",
|
||||||
ts_stats_get(pollStats.evq_max, TS_STATS_MAX));
|
poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX));
|
||||||
|
|
||||||
dcb_printf(dcb, "No of poll completions with descriptors\n");
|
dcb_printf(dcb, "No of poll completions with descriptors\n");
|
||||||
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
|
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
|
||||||
|
int n_threads = config_threadcount();
|
||||||
for (i = 0; i < MAXNFDS - 1; i++)
|
for (i = 0; i < MAXNFDS - 1; i++)
|
||||||
{
|
{
|
||||||
dcb_printf(dcb, "\t%2d\t\t\t%" PRId32 "\n", i + 1, pollStats.n_fds[i]);
|
int64_t v = 0;
|
||||||
|
for (int j = 0; j < n_threads; ++j)
|
||||||
|
{
|
||||||
|
v += pollStats[j].n_fds[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
dcb_printf(dcb, "\t%2d\t\t\t%" PRId64 "\n", i + 1, v);
|
||||||
}
|
}
|
||||||
dcb_printf(dcb, "\t>= %d\t\t\t%" PRId32 "\n", MAXNFDS,
|
int64_t v = 0;
|
||||||
pollStats.n_fds[MAXNFDS - 1]);
|
for (int j = 0; j < n_threads; ++j)
|
||||||
|
{
|
||||||
|
v += pollStats[j].n_fds[MAXNFDS - 1];
|
||||||
|
}
|
||||||
|
dcb_printf(dcb, "\t>= %d\t\t\t%" PRId64 "\n", MAXNFDS, v);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1109,11 +1115,11 @@ dShowEventStats(DCB *pdcb)
|
|||||||
TS_STATS_MAX));
|
TS_STATS_MAX));
|
||||||
dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n", ts_stats_get(queueStats.maxexectime,
|
dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n", ts_stats_get(queueStats.maxexectime,
|
||||||
TS_STATS_MAX));
|
TS_STATS_MAX));
|
||||||
dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n", ts_stats_get(pollStats.evq_max,
|
dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n", poll_stats_get(&POLL_STATS::evq_max,
|
||||||
TS_STATS_MAX));
|
TS_STATS_MAX));
|
||||||
dcb_printf(pdcb, "Total event queue length: %3" PRId64 "\n", ts_stats_get(pollStats.evq_length,
|
dcb_printf(pdcb, "Total event queue length: %3" PRId64 "\n", poll_stats_get(&POLL_STATS::evq_length,
|
||||||
TS_STATS_SUM));
|
TS_STATS_SUM));
|
||||||
dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n", ts_stats_get(pollStats.evq_length,
|
dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n", poll_stats_get(&POLL_STATS::evq_length,
|
||||||
TS_STATS_AVG));
|
TS_STATS_AVG));
|
||||||
dcb_printf(pdcb, "\n");
|
dcb_printf(pdcb, "\n");
|
||||||
dcb_printf(pdcb, " | Number of events\n");
|
dcb_printf(pdcb, " | Number of events\n");
|
||||||
@ -1142,19 +1148,19 @@ poll_get_stat(POLL_STAT stat)
|
|||||||
switch (stat)
|
switch (stat)
|
||||||
{
|
{
|
||||||
case POLL_STAT_READ:
|
case POLL_STAT_READ:
|
||||||
return ts_stats_get(pollStats.n_read, TS_STATS_SUM);
|
return poll_stats_get(&POLL_STATS::n_read, TS_STATS_SUM);
|
||||||
case POLL_STAT_WRITE:
|
case POLL_STAT_WRITE:
|
||||||
return ts_stats_get(pollStats.n_write, TS_STATS_SUM);
|
return poll_stats_get(&POLL_STATS::n_write, TS_STATS_SUM);
|
||||||
case POLL_STAT_ERROR:
|
case POLL_STAT_ERROR:
|
||||||
return ts_stats_get(pollStats.n_error, TS_STATS_SUM);
|
return poll_stats_get(&POLL_STATS::n_error, TS_STATS_SUM);
|
||||||
case POLL_STAT_HANGUP:
|
case POLL_STAT_HANGUP:
|
||||||
return ts_stats_get(pollStats.n_hup, TS_STATS_SUM);
|
return poll_stats_get(&POLL_STATS::n_hup, TS_STATS_SUM);
|
||||||
case POLL_STAT_ACCEPT:
|
case POLL_STAT_ACCEPT:
|
||||||
return ts_stats_get(pollStats.n_accept, TS_STATS_SUM);
|
return poll_stats_get(&POLL_STATS::n_accept, TS_STATS_SUM);
|
||||||
case POLL_STAT_EVQ_LEN:
|
case POLL_STAT_EVQ_LEN:
|
||||||
return ts_stats_get(pollStats.evq_length, TS_STATS_AVG);
|
return poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG);
|
||||||
case POLL_STAT_EVQ_MAX:
|
case POLL_STAT_EVQ_MAX:
|
||||||
return ts_stats_get(pollStats.evq_max, TS_STATS_MAX);
|
return poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX);
|
||||||
case POLL_STAT_MAX_QTIME:
|
case POLL_STAT_MAX_QTIME:
|
||||||
return ts_stats_get(queueStats.maxqtime, TS_STATS_MAX);
|
return ts_stats_get(queueStats.maxqtime, TS_STATS_MAX);
|
||||||
case POLL_STAT_MAX_EXECTIME:
|
case POLL_STAT_MAX_EXECTIME:
|
||||||
|
@ -30,6 +30,8 @@ using maxscale::Worker;
|
|||||||
|
|
||||||
// TODO: Temporarily moved here.
|
// TODO: Temporarily moved here.
|
||||||
THREAD_DATA *thread_data = NULL;
|
THREAD_DATA *thread_data = NULL;
|
||||||
|
// TODO: Temporarily moved here.
|
||||||
|
POLL_STATS *pollStats = NULL;
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
@ -129,10 +131,11 @@ void poll_resolve_error(int wid, int fd, int errornum, int op)
|
|||||||
static bool modules_thread_init();
|
static bool modules_thread_init();
|
||||||
static void modules_thread_finish();
|
static void modules_thread_finish();
|
||||||
|
|
||||||
Worker::Worker(int id, int epoll_fd, THREAD_DATA* pThread_data)
|
Worker::Worker(int id, int epoll_fd, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats)
|
||||||
: m_id(id)
|
: m_id(id)
|
||||||
, m_epoll_fd(epoll_fd)
|
, m_epoll_fd(epoll_fd)
|
||||||
, m_pThread_data(pThread_data)
|
, m_pThread_data(pThread_data)
|
||||||
|
, m_pPoll_stats(pPoll_stats)
|
||||||
, m_pQueue(NULL)
|
, m_pQueue(NULL)
|
||||||
, m_thread(0)
|
, m_thread(0)
|
||||||
, m_started(false)
|
, m_started(false)
|
||||||
@ -154,7 +157,7 @@ void Worker::init()
|
|||||||
{
|
{
|
||||||
this_unit.n_workers = config_threadcount();
|
this_unit.n_workers = config_threadcount();
|
||||||
|
|
||||||
thread_data = (THREAD_DATA *)MXS_CALLOC(this_unit.n_workers, sizeof(THREAD_DATA));
|
thread_data = (THREAD_DATA*)MXS_CALLOC(this_unit.n_workers, sizeof(THREAD_DATA));
|
||||||
if (!thread_data)
|
if (!thread_data)
|
||||||
{
|
{
|
||||||
exit(-1);
|
exit(-1);
|
||||||
@ -165,6 +168,8 @@ void Worker::init()
|
|||||||
thread_data[i].state = THREAD_STOPPED;
|
thread_data[i].state = THREAD_STOPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS));
|
||||||
|
|
||||||
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
|
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
|
||||||
|
|
||||||
if (!this_unit.ppWorkers)
|
if (!this_unit.ppWorkers)
|
||||||
@ -175,7 +180,7 @@ void Worker::init()
|
|||||||
|
|
||||||
for (int i = 0; i < this_unit.n_workers; ++i)
|
for (int i = 0; i < this_unit.n_workers; ++i)
|
||||||
{
|
{
|
||||||
Worker* pWorker = Worker::create(i, &thread_data[i]);
|
Worker* pWorker = Worker::create(i, &thread_data[i], &pollStats[i]);
|
||||||
|
|
||||||
if (pWorker)
|
if (pWorker)
|
||||||
{
|
{
|
||||||
@ -330,7 +335,7 @@ bool should_shutdown(void* pData)
|
|||||||
void Worker::run()
|
void Worker::run()
|
||||||
{
|
{
|
||||||
this_thread.current_worker_id = m_id;
|
this_thread.current_worker_id = m_id;
|
||||||
poll_waitevents(m_epoll_fd, m_id, m_pThread_data, ::should_shutdown, this);
|
poll_waitevents(m_epoll_fd, m_id, m_pThread_data, m_pPoll_stats, ::should_shutdown, this);
|
||||||
this_thread.current_worker_id = WORKER_ABSENT_ID;
|
this_thread.current_worker_id = WORKER_ABSENT_ID;
|
||||||
|
|
||||||
MXS_NOTICE("Worker %d has shut down.", m_id);
|
MXS_NOTICE("Worker %d has shut down.", m_id);
|
||||||
@ -392,11 +397,12 @@ void Worker::shutdown_all()
|
|||||||
*
|
*
|
||||||
* @param worker_id The id of the worker.
|
* @param worker_id The id of the worker.
|
||||||
* @param pThread_data The thread data of the worker.
|
* @param pThread_data The thread data of the worker.
|
||||||
|
* @param pPoll_stats The poll statistics of the worker.
|
||||||
*
|
*
|
||||||
* @return A worker instance if successful, otherwise NULL.
|
* @return A worker instance if successful, otherwise NULL.
|
||||||
*/
|
*/
|
||||||
//static
|
//static
|
||||||
Worker* Worker::create(int worker_id, THREAD_DATA* pThread_data)
|
Worker* Worker::create(int worker_id, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats)
|
||||||
{
|
{
|
||||||
Worker* pThis = NULL;
|
Worker* pThis = NULL;
|
||||||
|
|
||||||
@ -404,7 +410,7 @@ Worker* Worker::create(int worker_id, THREAD_DATA* pThread_data)
|
|||||||
|
|
||||||
if (epoll_fd != -1)
|
if (epoll_fd != -1)
|
||||||
{
|
{
|
||||||
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pThread_data);
|
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pThread_data, pPoll_stats);
|
||||||
|
|
||||||
if (pThis)
|
if (pThis)
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user