Move statistics to Worker

Now the statistics is in a single structure and the property of the
Worker instance in question. Methods are provided for obtaining the
statistics of all workers in one go.
This commit is contained in:
Johan Wikman
2017-04-18 14:24:09 +03:00
parent 722d6da46f
commit db3153ee4e
5 changed files with 322 additions and 306 deletions

View File

@ -52,56 +52,6 @@ typedef struct mxs_poll_data
} thread;
} MXS_POLL_DATA;
// TODO: Temporarily moved here.
/**
* The number of buckets used to gather statistics about how many
* descriptors where processed on each epoll completion.
*
* An array of wakeup counts is created, with the number of descriptors used
* to index that array. Each time a completion occurs the n_fds - 1 value is
* used to index this array and increment the count held there.
* If n_fds - 1 >= MAXFDS then the count at MAXFDS -1 is incremented.
*/
#define MAXNFDS 10
// TODO: Temporarily moved here.
typedef struct
{
int64_t n_read; /*< Number of read events */
int64_t n_write; /*< Number of write events */
int64_t n_error; /*< Number of error events */
int64_t n_hup; /*< Number of hangup events */
int64_t n_accept; /*< Number of accept events */
int64_t n_polls; /*< Number of poll cycles */
int64_t n_pollev; /*< Number of polls returning events */
int64_t n_nbpollev; /*< Number of polls returning events */
int32_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
int64_t evq_length; /*< Event queue length */
int64_t evq_max; /*< Maximum event queue length */
int64_t blockingpolls; /*< Number of epoll_waits with a timeout specified */
} POLL_STATS;
// TODO: Temporarily moved here.
extern POLL_STATS* pollStats;
// TODO: Temporarily moved here.
#define N_QUEUE_TIMES 30
// TODO: Temporarily moved here.
/**
* The event queue statistics
*/
typedef struct
{
uint32_t qtimes[N_QUEUE_TIMES + 1];
uint32_t exectimes[N_QUEUE_TIMES + 1];
int64_t maxqtime;
int64_t maxexectime;
} QUEUE_STATS;
// TODO: Temporarily moved here.
extern QUEUE_STATS* queueStats;
/**
* A file descriptor should be added to the poll set of all workers.
*/

View File

@ -50,12 +50,6 @@ enum poll_message
void poll_init();
//void poll_finish(); // TODO: Add this.
void poll_waitevents(int epoll_fd,
int thread_id,
POLL_STATS* poll_stats,
QUEUE_STATS* queue_stats,
bool (*should_terminate)(void* data),
void* data);
void poll_set_maxwait(unsigned int);
void poll_set_nonblocking_polls(unsigned int);

View File

@ -15,11 +15,43 @@
#include <maxscale/cppdefs.hh>
#include <maxscale/platform.h>
#include "messagequeue.hh"
#include "poll.h"
#include "worker.h"
namespace maxscale
{
struct WORKER_STATISTICS
{
WORKER_STATISTICS()
{
memset(this, 0, sizeof(WORKER_STATISTICS));
}
enum
{
MAXNFDS = 10,
N_QUEUE_TIMES = 30
};
int64_t n_read; /*< Number of read events */
int64_t n_write; /*< Number of write events */
int64_t n_error; /*< Number of error events */
int64_t n_hup; /*< Number of hangup events */
int64_t n_accept; /*< Number of accept events */
int64_t n_polls; /*< Number of poll cycles */
int64_t n_pollev; /*< Number of polls returning events */
int64_t n_nbpollev; /*< Number of polls returning events */
int64_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */
int64_t evq_length; /*< Event queue length */
int64_t evq_max; /*< Maximum event queue length */
int64_t blockingpolls; /*< Number of epoll_waits with a timeout specified */
uint32_t qtimes[N_QUEUE_TIMES + 1];
uint32_t exectimes[N_QUEUE_TIMES + 1];
int64_t maxqtime;
int64_t maxexectime;
};
class Worker : public MXS_WORKER
, private MessageQueue::Handler
{
@ -27,6 +59,8 @@ class Worker : public MXS_WORKER
Worker& operator = (const Worker&);
public:
typedef WORKER_STATISTICS STATISTICS;
enum state_t
{
STOPPED,
@ -75,6 +109,38 @@ public:
return m_state;
}
/**
* Returns statistics for this worker.
*
* @return The worker specific statistics.
*
* @attentions The statistics may change at any time.
*/
const STATISTICS& statistics() const
{
return m_statistics;
}
/**
* Returns statistics for all workers.
*
* @return Combined statistics.
*
* @attentions The statistics may no longer be accurate by the time it has
* been returned. The returned values may also not represent a
* 100% consistent set.
*/
static STATISTICS get_statistics();
/**
* Return a specific combined statistic value.
*
* @param what What to return.
*
* @return The corresponding value.
*/
static int64_t get_one_statistic(POLL_STAT what);
/**
* Add a file descriptor to the epoll instance of the worker.
*
@ -235,27 +301,22 @@ public:
private:
Worker(int id,
int epoll_fd,
POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats);
int epoll_fd);
virtual ~Worker();
static Worker* create(int id,
POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats);
static Worker* create(int id);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
static void thread_main(void* arg);
void poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats);
void poll_waitevents();
private:
int m_id; /*< The id of the worker. */
state_t m_state; /*< The state of the worker */
int m_epoll_fd; /*< The epoll file descriptor. */
POLL_STATS* m_pPoll_stats; /*< Statistics for worker. */
QUEUE_STATS* m_pQueue_stats; /*< Statistics for queue. */
STATISTICS m_statistics; /*< Worker statistics. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */
THREAD m_thread; /*< The thread handle of the worker. */
bool m_started; /*< Whether the thread has been started or not. */

View File

@ -232,57 +232,6 @@ spin_reporter(void *dcb, char *desc, int value)
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
}
namespace
{
template<class T>
int64_t stats_get(T* ts, int64_t T::*what, enum ts_stats_type type)
{
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
for (int i = 0; i < n_threads; ++i)
{
T* t = &ts[i];
int64_t value = t->*what;
switch (type)
{
case TS_STATS_MAX:
if (value > best)
{
best = value;
}
break;
case TS_STATS_MIX:
if (value < best)
{
best = value;
}
break;
case TS_STATS_AVG:
case TS_STATS_SUM:
best += value;
break;
}
}
return type == TS_STATS_AVG ? best / n_threads : best;
}
inline int64_t poll_stats_get(int64_t POLL_STATS::*what, enum ts_stats_type type)
{
return stats_get(pollStats, what, type);
}
inline int64_t queue_stats_get(int64_t QUEUE_STATS::*what, enum ts_stats_type type)
{
return stats_get(queueStats, what, type);
}
}
/**
* Debug routine to print the polling statistics
*
@ -293,50 +242,31 @@ dprintPollStats(DCB *dcb)
{
int i;
Worker::STATISTICS s = Worker::get_statistics();
dcb_printf(dcb, "\nPoll Statistics.\n\n");
dcb_printf(dcb, "No. of epoll cycles: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_polls, TS_STATS_SUM));
dcb_printf(dcb, "No. of epoll cycles with wait: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::blockingpolls, TS_STATS_SUM));
dcb_printf(dcb, "No. of epoll calls returning events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_pollev, TS_STATS_SUM));
dcb_printf(dcb, "No. of non-blocking calls returning events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_nbpollev, TS_STATS_SUM));
dcb_printf(dcb, "No. of read events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_read, TS_STATS_SUM));
dcb_printf(dcb, "No. of write events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_write, TS_STATS_SUM));
dcb_printf(dcb, "No. of error events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_error, TS_STATS_SUM));
dcb_printf(dcb, "No. of hangup events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_hup, TS_STATS_SUM));
dcb_printf(dcb, "No. of accept events: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::n_accept, TS_STATS_SUM));
dcb_printf(dcb, "Total event queue length: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG));
dcb_printf(dcb, "Average event queue length: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG));
dcb_printf(dcb, "Maximum event queue length: %" PRId64 "\n",
poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX));
dcb_printf(dcb, "No. of epoll cycles: %" PRId64 "\n", s.n_polls);
dcb_printf(dcb, "No. of epoll cycles with wait: %" PRId64 "\n", s.blockingpolls);
dcb_printf(dcb, "No. of epoll calls returning events: %" PRId64 "\n", s.n_pollev);
dcb_printf(dcb, "No. of non-blocking calls returning events: %" PRId64 "\n", s.n_nbpollev);
dcb_printf(dcb, "No. of read events: %" PRId64 "\n", s.n_read);
dcb_printf(dcb, "No. of write events: %" PRId64 "\n", s.n_write);
dcb_printf(dcb, "No. of error events: %" PRId64 "\n", s.n_error);
dcb_printf(dcb, "No. of hangup events: %" PRId64 "\n", s.n_hup);
dcb_printf(dcb, "No. of accept events: %" PRId64 "\n", s.n_accept);
dcb_printf(dcb, "Total event queue length: %" PRId64 "\n", s.evq_length);
dcb_printf(dcb, "Average event queue length: %" PRId64 "\n", s.evq_length);
dcb_printf(dcb, "Maximum event queue length: %" PRId64 "\n", s.evq_max);
dcb_printf(dcb, "No of poll completions with descriptors\n");
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
for (i = 0; i < MAXNFDS - 1; i++)
for (i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++)
{
int64_t v = 0;
for (int j = 0; j < n_threads; ++j)
{
v += pollStats[j].n_fds[i];
}
dcb_printf(dcb, "\t%2d\t\t\t%" PRId64 "\n", i + 1, s.n_fds[i]);
}
dcb_printf(dcb, "\t%2d\t\t\t%" PRId64 "\n", i + 1, v);
}
int64_t v = 0;
for (int j = 0; j < n_threads; ++j)
{
v += pollStats[j].n_fds[MAXNFDS - 1];
}
dcb_printf(dcb, "\t>= %d\t\t\t%" PRId64 "\n", MAXNFDS, v);
dcb_printf(dcb, "\t>= %d\t\t\t%" PRId64 "\n",
Worker::STATISTICS::MAXNFDS, s.n_fds[Worker::STATISTICS::MAXNFDS - 1]);
}
@ -385,29 +315,6 @@ dShowThreads(DCB *dcb)
}
}
namespace
{
void get_queue_times(int index, int32_t* qtimes, int32_t* exectimes)
{
int64_t q = 0;
int64_t e = 0;
for (int j = 0; j < n_threads; ++j)
{
q += queueStats[j].qtimes[index];
e += queueStats[j].exectimes[index];
}
q /= n_threads;
e /= n_threads;
*qtimes = q;
*exectimes = e;
}
}
/**
* Print the event queue statistics
*
@ -418,72 +325,50 @@ dShowEventStats(DCB *pdcb)
{
int i;
Worker::STATISTICS s = Worker::get_statistics();
dcb_printf(pdcb, "\nEvent statistics.\n");
dcb_printf(pdcb, "Maximum queue time: %3" PRId64 "00ms\n",
queue_stats_get(&QUEUE_STATS::maxqtime, TS_STATS_MAX));
dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n",
queue_stats_get(&QUEUE_STATS::maxexectime, TS_STATS_MAX));
dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n",
poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX));
dcb_printf(pdcb, "Total event queue length: %3" PRId64 "\n",
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_SUM));
dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n",
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG));
dcb_printf(pdcb, "Maximum queue time: %3" PRId64 "00ms\n", s.maxqtime);
dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n", s.maxexectime);
dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n", s.evq_max);
dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n", s.evq_length);
dcb_printf(pdcb, "\n");
dcb_printf(pdcb, " | Number of events\n");
dcb_printf(pdcb, "Duration | Queued | Executed\n");
dcb_printf(pdcb, "---------------+------------+-----------\n");
int32_t qtimes;
int32_t exectimes;
dcb_printf(pdcb, " < 100ms | %-10d | %-10d\n", s.qtimes[0], s.exectimes[0]);
get_queue_times(0, &qtimes, &exectimes);
dcb_printf(pdcb, " < 100ms | %-10d | %-10d\n", qtimes, exectimes);
for (i = 1; i < N_QUEUE_TIMES; i++)
for (i = 1; i < Worker::STATISTICS::N_QUEUE_TIMES; i++)
{
get_queue_times(i, &qtimes, &exectimes);
dcb_printf(pdcb, " %2d00 - %2d00ms | %-10d | %-10d\n", i, i + 1, qtimes, exectimes);
dcb_printf(pdcb, " %2d00 - %2d00ms | %-10d | %-10d\n", i, i + 1, s.qtimes[i], s.exectimes[i]);
}
get_queue_times(N_QUEUE_TIMES, &qtimes, &exectimes);
dcb_printf(pdcb, " > %2d00ms | %-10d | %-10d\n", N_QUEUE_TIMES, qtimes, exectimes);
dcb_printf(pdcb, " > %2d00ms | %-10d | %-10d\n", Worker::STATISTICS::N_QUEUE_TIMES,
s.qtimes[Worker::STATISTICS::N_QUEUE_TIMES], s.exectimes[Worker::STATISTICS::N_QUEUE_TIMES]);
}
/**
* Return a poll statistic from the polling subsystem
*
* @param stat The required statistic
* @param what The required statistic
* @return The value of that statistic
*/
int
poll_get_stat(POLL_STAT stat)
poll_get_stat(POLL_STAT what)
{
switch (stat)
{
case POLL_STAT_READ:
return poll_stats_get(&POLL_STATS::n_read, TS_STATS_SUM);
case POLL_STAT_WRITE:
return poll_stats_get(&POLL_STATS::n_write, TS_STATS_SUM);
case POLL_STAT_ERROR:
return poll_stats_get(&POLL_STATS::n_error, TS_STATS_SUM);
case POLL_STAT_HANGUP:
return poll_stats_get(&POLL_STATS::n_hup, TS_STATS_SUM);
case POLL_STAT_ACCEPT:
return poll_stats_get(&POLL_STATS::n_accept, TS_STATS_SUM);
case POLL_STAT_EVQ_LEN:
return poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG);
case POLL_STAT_EVQ_MAX:
return poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX);
case POLL_STAT_MAX_QTIME:
return queue_stats_get(&QUEUE_STATS::maxqtime, TS_STATS_MAX);
case POLL_STAT_MAX_EXECTIME:
return queue_stats_get(&QUEUE_STATS::maxexectime, TS_STATS_MAX);
default:
ss_dassert(false);
break;
}
return 0;
return Worker::get_one_statistic(what);
}
namespace
{
struct EVENT_TIMES_CB_DATA
{
int rowno;
Worker::STATISTICS* stats;
};
}
/**
@ -494,44 +379,43 @@ poll_get_stat(POLL_STAT stat)
* @return The next row or NULL
*/
static RESULT_ROW *
eventTimesRowCallback(RESULTSET *set, void *data)
eventTimesRowCallback(RESULTSET *set, void *v)
{
int *rowno = (int *)data;
EVENT_TIMES_CB_DATA* data = (EVENT_TIMES_CB_DATA*)v;
char buf[40];
RESULT_ROW *row;
if (*rowno >= N_QUEUE_TIMES)
if (data->rowno >= Worker::STATISTICS::N_QUEUE_TIMES)
{
MXS_FREE(data);
return NULL;
}
row = resultset_make_row(set);
if (*rowno == 0)
if (data->rowno == 0)
{
resultset_row_set(row, 0, "< 100ms");
}
else if (*rowno == N_QUEUE_TIMES - 1)
else if (data->rowno == Worker::STATISTICS::N_QUEUE_TIMES - 1)
{
snprintf(buf, 39, "> %2d00ms", N_QUEUE_TIMES);
snprintf(buf, 39, "> %2d00ms", Worker::STATISTICS::N_QUEUE_TIMES);
buf[39] = '\0';
resultset_row_set(row, 0, buf);
}
else
{
snprintf(buf, 39, "%2d00 - %2d00ms", *rowno, (*rowno) + 1);
snprintf(buf, 39, "%2d00 - %2d00ms", data->rowno, data->rowno + 1);
buf[39] = '\0';
resultset_row_set(row, 0, buf);
}
int32_t qtimes;
int32_t exectimes;
get_queue_times(*rowno, &qtimes, &exectimes);
snprintf(buf, 39, "%u", qtimes);
snprintf(buf, 39, "%u", data->stats->qtimes[data->rowno]);
buf[39] = '\0';
resultset_row_set(row, 1, buf);
snprintf(buf, 39, "%u", exectimes);
snprintf(buf, 39, "%u", data->stats->exectimes[data->rowno]);
buf[39] = '\0';
resultset_row_set(row, 2, buf);
(*rowno)++;
data->rowno++;
return row;
}
@ -544,13 +428,18 @@ RESULTSET *
eventTimesGetList()
{
RESULTSET *set;
int *data;
EVENT_TIMES_CB_DATA *data;
if ((data = (int *)MXS_MALLOC(sizeof(int))) == NULL)
if ((data = (EVENT_TIMES_CB_DATA*)MXS_MALLOC(sizeof(EVENT_TIMES_CB_DATA))) == NULL)
{
return NULL;
}
*data = 0;
Worker::STATISTICS s = Worker::get_statistics();
data->rowno = 0;
data->stats = &s;
if ((set = resultset_create(eventTimesRowCallback, data)) == NULL)
{
MXS_FREE(data);

View File

@ -25,16 +25,12 @@
#include <maxscale/platform.h>
#include "maxscale/modules.h"
#include "maxscale/poll.h"
#include "maxscale/statistics.h"
#define WORKER_ABSENT_ID -1
using maxscale::Worker;
// TODO: Temporarily moved here.
POLL_STATS *pollStats = NULL;
// TODO: Temporarily moved here.
QUEUE_STATS* queueStats = NULL;
namespace
{
@ -137,14 +133,10 @@ static bool modules_thread_init();
static void modules_thread_finish();
Worker::Worker(int id,
int epoll_fd,
POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats)
int epoll_fd)
: m_id(id)
, m_state(STOPPED)
, m_epoll_fd(epoll_fd)
, m_pPoll_stats(pPoll_stats)
, m_pQueue_stats(pQueue_stats)
, m_pQueue(NULL)
, m_thread(0)
, m_started(false)
@ -168,18 +160,6 @@ void Worker::init()
this_unit.number_poll_spins = config_nbpolls();
this_unit.max_poll_sleep = config_pollsleep();
pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS));
if (!pollStats)
{
exit(-1);
}
queueStats = (QUEUE_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(QUEUE_STATS));
if (!queueStats)
{
exit(-1);
}
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
if (!this_unit.ppWorkers)
@ -190,7 +170,7 @@ void Worker::init()
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = Worker::create(i, &pollStats[i], &queueStats[i]);
Worker* pWorker = Worker::create(i);
if (pWorker)
{
@ -217,6 +197,165 @@ void Worker::finish()
}
}
namespace
{
int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type)
{
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = Worker::get(i);
ss_dassert(pWorker);
const Worker::STATISTICS& s = pWorker->statistics();
int64_t value = s.*what;
switch (type)
{
case TS_STATS_MAX:
if (value > best)
{
best = value;
}
break;
case TS_STATS_MIX:
if (value < best)
{
best = value;
}
break;
case TS_STATS_AVG:
case TS_STATS_SUM:
best += value;
break;
}
}
return type == TS_STATS_AVG ? best / this_unit.n_workers : best;
}
}
//static
Worker::STATISTICS Worker::get_statistics()
{
STATISTICS cs;
cs.n_read = one_stats_get(&STATISTICS::n_read, TS_STATS_SUM);
cs.n_write = one_stats_get(&STATISTICS::n_write, TS_STATS_SUM);
cs.n_error = one_stats_get(&STATISTICS::n_error, TS_STATS_SUM);
cs.n_hup = one_stats_get(&STATISTICS::n_hup, TS_STATS_SUM);
cs.n_accept = one_stats_get(&STATISTICS::n_accept, TS_STATS_SUM);
cs.n_polls = one_stats_get(&STATISTICS::n_polls, TS_STATS_SUM);
cs.n_pollev = one_stats_get(&STATISTICS::n_pollev, TS_STATS_SUM);
cs.n_nbpollev = one_stats_get(&STATISTICS::n_nbpollev, TS_STATS_SUM);
cs.evq_length = one_stats_get(&STATISTICS::evq_length, TS_STATS_AVG);
cs.evq_max = one_stats_get(&STATISTICS::evq_max, TS_STATS_MAX);
cs.blockingpolls = one_stats_get(&STATISTICS::blockingpolls, TS_STATS_SUM);
cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX);
cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX);
for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++)
{
for (int j = 0; j < this_unit.n_workers; ++j)
{
Worker* pWorker = Worker::get(j);
ss_dassert(pWorker);
cs.n_fds[i] += pWorker->statistics().n_fds[i];
}
}
for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i)
{
for (int j = 0; j < this_unit.n_workers; ++j)
{
Worker* pWorker = Worker::get(j);
ss_dassert(pWorker);
cs.qtimes[i] += pWorker->statistics().qtimes[i];
cs.exectimes[i] += pWorker->statistics().exectimes[i];
}
cs.qtimes[i] /= this_unit.n_workers;
cs.exectimes[i] /= this_unit.n_workers;
}
return cs;
}
//static
int64_t Worker::get_one_statistic(POLL_STAT what)
{
int64_t rv = 0;
int64_t Worker::STATISTICS::*member = NULL;
enum ts_stats_type approach;
switch (what)
{
case POLL_STAT_READ:
member = &Worker::STATISTICS::n_read;
approach = TS_STATS_SUM;
break;
case POLL_STAT_WRITE:
member = &Worker::STATISTICS::n_write;
approach = TS_STATS_SUM;
break;
case POLL_STAT_ERROR:
member = &Worker::STATISTICS::n_error;
approach = TS_STATS_SUM;
break;
case POLL_STAT_HANGUP:
member = &Worker::STATISTICS::n_hup;
approach = TS_STATS_SUM;
break;
case POLL_STAT_ACCEPT:
member = &Worker::STATISTICS::n_accept;
approach = TS_STATS_SUM;
break;
case POLL_STAT_EVQ_LEN:
member = &Worker::STATISTICS::evq_length;
approach = TS_STATS_AVG;
break;
case POLL_STAT_EVQ_MAX:
member = &Worker::STATISTICS::evq_max;
approach = TS_STATS_MAX;
break;
case POLL_STAT_MAX_QTIME:
member = &Worker::STATISTICS::maxqtime;
approach = TS_STATS_MAX;
break;
case POLL_STAT_MAX_EXECTIME:
member = &Worker::STATISTICS::maxexectime;
approach = TS_STATS_MAX;
break;
default:
ss_dassert(!true);
}
if (member)
{
rv = one_stats_get(member, approach);
}
return rv;
}
bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{
bool rv = true;
@ -344,20 +483,10 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
return Worker::broadcast_message(msg_id, arg1, arg2);
}
namespace
{
bool should_shutdown(void* pData)
{
return static_cast<Worker*>(pData)->should_shutdown();
}
}
void Worker::run()
{
this_thread.current_worker_id = m_id;
poll_waitevents(m_pPoll_stats, m_pQueue_stats);
poll_waitevents();
this_thread.current_worker_id = WORKER_ABSENT_ID;
MXS_NOTICE("Worker %d has shut down.", m_id);
@ -417,16 +546,12 @@ void Worker::shutdown_all()
* - Creates a pipe.
* - Adds the read descriptor to the polling mechanism.
*
* @param worker_id The id of the worker.
* @param pPoll_stats The poll statistics of the worker.
* @param pQueue_stats The queue statistics of the worker.
* @param worker_id The id of the worker.
*
* @return A worker instance if successful, otherwise NULL.
*/
//static
Worker* Worker::create(int worker_id,
POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats)
Worker* Worker::create(int worker_id)
{
Worker* pThis = NULL;
@ -434,7 +559,7 @@ Worker* Worker::create(int worker_id,
if (epoll_fd != -1)
{
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pPoll_stats, pQueue_stats);
pThis = new (std::nothrow) Worker(worker_id, epoll_fd);
if (pThis)
{
@ -538,11 +663,8 @@ void Worker::thread_main(void* pArg)
/**
* The main polling loop
*
* @param poll_stats The polling stats of the calling thread.
* @param queue_stats The queue stats of the calling thread.
*/
void Worker::poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats)
void Worker::poll_waitevents()
{
struct epoll_event events[MAX_EVENTS];
int i, nfds, timeout_bias = 1;
@ -554,7 +676,7 @@ void Worker::poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats)
{
m_state = POLLING;
atomic_add_int64(&poll_stats->n_polls, 1);
atomic_add_int64(&m_statistics.n_polls, 1);
if ((nfds = epoll_wait(m_epoll_fd, events, MAX_EVENTS, 0)) == -1)
{
int eno = errno;
@ -579,7 +701,7 @@ void Worker::poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats)
{
timeout_bias++;
}
atomic_add_int64(&poll_stats->blockingpolls, 1);
atomic_add_int64(&m_statistics.blockingpolls, 1);
nfds = epoll_wait(m_epoll_fd,
events,
MAX_EVENTS,
@ -592,26 +714,26 @@ void Worker::poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats)
if (nfds > 0)
{
poll_stats->evq_length = nfds;
if (nfds > poll_stats->evq_max)
m_statistics.evq_length = nfds;
if (nfds > m_statistics.evq_max)
{
poll_stats->evq_max = nfds;
m_statistics.evq_max = nfds;
}
timeout_bias = 1;
if (poll_spins <= this_unit.number_poll_spins + 1)
{
atomic_add_int64(&poll_stats->n_nbpollev, 1);
atomic_add_int64(&m_statistics.n_nbpollev, 1);
}
poll_spins = 0;
MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(),
nfds);
atomic_add_int64(&poll_stats->n_pollev, 1);
atomic_add_int64(&m_statistics.n_pollev, 1);
m_state = PROCESSING;
poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
m_statistics.n_fds[(nfds < STATISTICS::MAXNFDS ? (nfds - 1) : STATISTICS::MAXNFDS - 1)]++;
}
uint64_t cycle_start = hkheartbeat;
@ -622,16 +744,16 @@ void Worker::poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats)
int64_t started = hkheartbeat;
int64_t qtime = started - cycle_start;
if (qtime > N_QUEUE_TIMES)
if (qtime > STATISTICS::N_QUEUE_TIMES)
{
queue_stats->qtimes[N_QUEUE_TIMES]++;
m_statistics.qtimes[STATISTICS::N_QUEUE_TIMES]++;
}
else
{
queue_stats->qtimes[qtime]++;
m_statistics.qtimes[qtime]++;
}
queue_stats->maxqtime = MXS_MAX(queue_stats->maxqtime, qtime);
m_statistics.maxqtime = MXS_MAX(m_statistics.maxqtime, qtime);
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
@ -639,42 +761,42 @@ void Worker::poll_waitevents(POLL_STATS* poll_stats, QUEUE_STATS* queue_stats)
if (actions & MXS_POLL_ACCEPT)
{
atomic_add_int64(&poll_stats->n_accept, 1);
atomic_add_int64(&m_statistics.n_accept, 1);
}
if (actions & MXS_POLL_READ)
{
atomic_add_int64(&poll_stats->n_read, 1);
atomic_add_int64(&m_statistics.n_read, 1);
}
if (actions & MXS_POLL_WRITE)
{
atomic_add_int64(&poll_stats->n_write, 1);
atomic_add_int64(&m_statistics.n_write, 1);
}
if (actions & MXS_POLL_HUP)
{
atomic_add_int64(&poll_stats->n_hup, 1);
atomic_add_int64(&m_statistics.n_hup, 1);
}
if (actions & MXS_POLL_ERROR)
{
atomic_add_int64(&poll_stats->n_error, 1);
atomic_add_int64(&m_statistics.n_error, 1);
}
/** Calculate event execution statistics */
qtime = hkheartbeat - started;
if (qtime > N_QUEUE_TIMES)
if (qtime > STATISTICS::N_QUEUE_TIMES)
{
queue_stats->exectimes[N_QUEUE_TIMES]++;
m_statistics.exectimes[STATISTICS::N_QUEUE_TIMES]++;
}
else
{
queue_stats->exectimes[qtime % N_QUEUE_TIMES]++;
m_statistics.exectimes[qtime % STATISTICS::N_QUEUE_TIMES]++;
}
queue_stats->maxexectime = MXS_MAX(queue_stats->maxexectime, qtime);
m_statistics.maxexectime = MXS_MAX(m_statistics.maxexectime, qtime);
}
dcb_process_idle_sessions(m_id);