Move queue statistics to Worker
Just like the thread stats and poll stats earlier, the queue stats are now moved to worker. A litte refactoring still, and the polling will only work on local data.
This commit is contained in:
parent
76825eb2c5
commit
f952a11eb8
@ -114,6 +114,24 @@ typedef struct
|
||||
// TODO: Temporarily moved here.
|
||||
extern POLL_STATS* pollStats;
|
||||
|
||||
// TODO: Temporarily moved here.
|
||||
#define N_QUEUE_TIMES 30
|
||||
|
||||
// TODO: Temporarily moved here.
|
||||
/**
|
||||
* The event queue statistics
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
uint32_t qtimes[N_QUEUE_TIMES + 1];
|
||||
uint32_t exectimes[N_QUEUE_TIMES + 1];
|
||||
int64_t maxqtime;
|
||||
int64_t maxexectime;
|
||||
} QUEUE_STATS;
|
||||
|
||||
// TODO: Temporarily moved here.
|
||||
extern QUEUE_STATS* queueStats;
|
||||
|
||||
/**
|
||||
* A file descriptor should be added to the poll set of all workers.
|
||||
*/
|
||||
|
@ -53,7 +53,8 @@ void poll_init();
|
||||
void poll_waitevents(int epoll_fd,
|
||||
int thread_id,
|
||||
THREAD_DATA* thread_data,
|
||||
POLL_STATS* pollStats,
|
||||
POLL_STATS* poll_stats,
|
||||
QUEUE_STATS* queue_stats,
|
||||
bool (*should_terminate)(void* data),
|
||||
void* data);
|
||||
void poll_set_maxwait(unsigned int);
|
||||
|
@ -197,10 +197,17 @@ public:
|
||||
static int get_current_id();
|
||||
|
||||
private:
|
||||
Worker(int id, int epoll_fd, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats);
|
||||
Worker(int id,
|
||||
int epoll_fd,
|
||||
THREAD_DATA* pThread_data,
|
||||
POLL_STATS* pPoll_stats,
|
||||
QUEUE_STATS* pQueue_stats);
|
||||
virtual ~Worker();
|
||||
|
||||
static Worker* create(int id, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats);
|
||||
static Worker* create(int id,
|
||||
THREAD_DATA* pThread_data,
|
||||
POLL_STATS* pPoll_stats,
|
||||
QUEUE_STATS* pQueue_stats);
|
||||
|
||||
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
|
||||
|
||||
@ -211,6 +218,7 @@ private:
|
||||
int m_epoll_fd; /*< The epoll file descriptor. */
|
||||
THREAD_DATA* m_pThread_data; /*< The thread data of the worker. */
|
||||
POLL_STATS* m_pPoll_stats; /*< Statistics for worker. */
|
||||
QUEUE_STATS* m_pQueue_stats; /*< Statistics for queue. */
|
||||
MessageQueue* m_pQueue; /*< The message queue of the worker. */
|
||||
THREAD m_thread; /*< The thread handle of the worker. */
|
||||
bool m_started; /*< Whether the thread has been started or not. */
|
||||
|
@ -110,18 +110,6 @@ static int n_avg_samples;
|
||||
/* Thread statistics data */
|
||||
static int n_threads; /*< No. of threads */
|
||||
|
||||
#define N_QUEUE_TIMES 30
|
||||
/**
|
||||
* The event queue statistics
|
||||
*/
|
||||
static struct
|
||||
{
|
||||
uint32_t qtimes[N_QUEUE_TIMES + 1];
|
||||
uint32_t exectimes[N_QUEUE_TIMES + 1];
|
||||
ts_stats_t maxqtime;
|
||||
ts_stats_t maxexectime;
|
||||
} queueStats;
|
||||
|
||||
/**
|
||||
* How frequently to call the poll_loadav function used to monitor the load
|
||||
* average of the poll subsystem.
|
||||
@ -147,15 +135,6 @@ poll_init()
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
memset(&queueStats, 0, sizeof(queueStats));
|
||||
|
||||
if ((queueStats.maxqtime = ts_stats_alloc()) == NULL ||
|
||||
(queueStats.maxexectime = ts_stats_alloc()) == NULL)
|
||||
{
|
||||
MXS_OOM_MESSAGE("FATAL: Could not allocate statistics data.");
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ);
|
||||
n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
|
||||
avg_samples = (double *)MXS_MALLOC(sizeof(double) * n_avg_samples);
|
||||
@ -296,6 +275,8 @@ bool poll_remove_fd_from_worker(int wid, int fd)
|
||||
* @param epoll_fd The epoll descriptor.
|
||||
* @param thread_id The id of the calling thread.
|
||||
* @param thread_data The thread data of the calling thread.
|
||||
* @param poll_stats The polling stats of the calling thread.
|
||||
* @param queue_stats The queue stats of the calling thread.
|
||||
* @param should_shutdown Pointer to function returning true if the polling should
|
||||
* be terminated.
|
||||
* @param data Data provided to the @c should_shutdown function.
|
||||
@ -304,6 +285,7 @@ void poll_waitevents(int epoll_fd,
|
||||
int thread_id,
|
||||
THREAD_DATA* thread_data,
|
||||
POLL_STATS* poll_stats,
|
||||
QUEUE_STATS* queue_stats,
|
||||
bool (*should_shutdown)(void* data),
|
||||
void* data)
|
||||
{
|
||||
@ -412,19 +394,19 @@ void poll_waitevents(int epoll_fd,
|
||||
for (int i = 0; i < nfds; i++)
|
||||
{
|
||||
/** Calculate event queue statistics */
|
||||
uint64_t started = hkheartbeat;
|
||||
uint64_t qtime = started - thread_data->cycle_start;
|
||||
int64_t started = hkheartbeat;
|
||||
int64_t qtime = started - thread_data->cycle_start;
|
||||
|
||||
if (qtime > N_QUEUE_TIMES)
|
||||
{
|
||||
queueStats.qtimes[N_QUEUE_TIMES]++;
|
||||
queue_stats->qtimes[N_QUEUE_TIMES]++;
|
||||
}
|
||||
else
|
||||
{
|
||||
queueStats.qtimes[qtime]++;
|
||||
queue_stats->qtimes[qtime]++;
|
||||
}
|
||||
|
||||
ts_stats_set_max(queueStats.maxqtime, qtime, thread_id);
|
||||
queue_stats->maxqtime = MXS_MAX(queue_stats->maxqtime, qtime);
|
||||
|
||||
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
|
||||
thread_data->cur_data = data;
|
||||
@ -462,14 +444,14 @@ void poll_waitevents(int epoll_fd,
|
||||
|
||||
if (qtime > N_QUEUE_TIMES)
|
||||
{
|
||||
queueStats.exectimes[N_QUEUE_TIMES]++;
|
||||
queue_stats->exectimes[N_QUEUE_TIMES]++;
|
||||
}
|
||||
else
|
||||
{
|
||||
queueStats.exectimes[qtime % N_QUEUE_TIMES]++;
|
||||
queue_stats->exectimes[qtime % N_QUEUE_TIMES]++;
|
||||
}
|
||||
|
||||
ts_stats_set_max(queueStats.maxexectime, qtime, thread_id);
|
||||
queue_stats->maxexectime = MXS_MAX(queue_stats->maxexectime, qtime);
|
||||
}
|
||||
|
||||
dcb_process_idle_sessions(thread_id);
|
||||
@ -785,16 +767,18 @@ spin_reporter(void *dcb, char *desc, int value)
|
||||
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
|
||||
}
|
||||
|
||||
static int64_t poll_stats_get(int64_t POLL_STATS::*what, enum ts_stats_type type)
|
||||
namespace
|
||||
{
|
||||
|
||||
template<class T>
|
||||
int64_t stats_get(T* ts, int64_t T::*what, enum ts_stats_type type)
|
||||
{
|
||||
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
|
||||
|
||||
size_t n_threads = config_threadcount();
|
||||
|
||||
for (size_t i = 0; i < n_threads; ++i)
|
||||
for (int i = 0; i < n_threads; ++i)
|
||||
{
|
||||
POLL_STATS* pollStat = &pollStats[i];
|
||||
int64_t value = pollStat->*what;
|
||||
T* t = &ts[i];
|
||||
int64_t value = t->*what;
|
||||
|
||||
switch (type)
|
||||
{
|
||||
@ -822,6 +806,17 @@ static int64_t poll_stats_get(int64_t POLL_STATS::*what, enum ts_stats_type type
|
||||
return type == TS_STATS_AVG ? best / n_threads : best;
|
||||
}
|
||||
|
||||
inline int64_t poll_stats_get(int64_t POLL_STATS::*what, enum ts_stats_type type)
|
||||
{
|
||||
return stats_get(pollStats, what, type);
|
||||
}
|
||||
|
||||
inline int64_t queue_stats_get(int64_t QUEUE_STATS::*what, enum ts_stats_type type)
|
||||
{
|
||||
return stats_get(queueStats, what, type);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Debug routine to print the polling statistics
|
||||
@ -863,7 +858,6 @@ dprintPollStats(DCB *dcb)
|
||||
|
||||
dcb_printf(dcb, "No of poll completions with descriptors\n");
|
||||
dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n");
|
||||
int n_threads = config_threadcount();
|
||||
for (i = 0; i < MAXNFDS - 1; i++)
|
||||
{
|
||||
int64_t v = 0;
|
||||
@ -1100,6 +1094,29 @@ poll_loadav(void *data)
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void get_queue_times(int index, int32_t* qtimes, int32_t* exectimes)
|
||||
{
|
||||
int64_t q = 0;
|
||||
int64_t e = 0;
|
||||
|
||||
for (int j = 0; j < n_threads; ++j)
|
||||
{
|
||||
q += queueStats[j].qtimes[index];
|
||||
e += queueStats[j].exectimes[index];
|
||||
}
|
||||
|
||||
q /= n_threads;
|
||||
e /= n_threads;
|
||||
|
||||
*qtimes = q;
|
||||
*exectimes = e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Print the event queue statistics
|
||||
*
|
||||
@ -1111,29 +1128,35 @@ dShowEventStats(DCB *pdcb)
|
||||
int i;
|
||||
|
||||
dcb_printf(pdcb, "\nEvent statistics.\n");
|
||||
dcb_printf(pdcb, "Maximum queue time: %3" PRId64 "00ms\n", ts_stats_get(queueStats.maxqtime,
|
||||
TS_STATS_MAX));
|
||||
dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n", ts_stats_get(queueStats.maxexectime,
|
||||
TS_STATS_MAX));
|
||||
dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n", poll_stats_get(&POLL_STATS::evq_max,
|
||||
TS_STATS_MAX));
|
||||
dcb_printf(pdcb, "Total event queue length: %3" PRId64 "\n", poll_stats_get(&POLL_STATS::evq_length,
|
||||
TS_STATS_SUM));
|
||||
dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n", poll_stats_get(&POLL_STATS::evq_length,
|
||||
TS_STATS_AVG));
|
||||
dcb_printf(pdcb, "Maximum queue time: %3" PRId64 "00ms\n",
|
||||
queue_stats_get(&QUEUE_STATS::maxqtime, TS_STATS_MAX));
|
||||
dcb_printf(pdcb, "Maximum execution time: %3" PRId64 "00ms\n",
|
||||
queue_stats_get(&QUEUE_STATS::maxexectime, TS_STATS_MAX));
|
||||
dcb_printf(pdcb, "Maximum event queue length: %3" PRId64 "\n",
|
||||
poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX));
|
||||
dcb_printf(pdcb, "Total event queue length: %3" PRId64 "\n",
|
||||
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_SUM));
|
||||
dcb_printf(pdcb, "Average event queue length: %3" PRId64 "\n",
|
||||
poll_stats_get(&POLL_STATS::evq_length, TS_STATS_AVG));
|
||||
dcb_printf(pdcb, "\n");
|
||||
dcb_printf(pdcb, " | Number of events\n");
|
||||
dcb_printf(pdcb, "Duration | Queued | Executed\n");
|
||||
dcb_printf(pdcb, "---------------+------------+-----------\n");
|
||||
dcb_printf(pdcb, " < 100ms | %-10d | %-10d\n",
|
||||
queueStats.qtimes[0], queueStats.exectimes[0]);
|
||||
|
||||
int32_t qtimes;
|
||||
int32_t exectimes;
|
||||
|
||||
get_queue_times(0, &qtimes, &exectimes);
|
||||
dcb_printf(pdcb, " < 100ms | %-10d | %-10d\n", qtimes, exectimes);
|
||||
|
||||
for (i = 1; i < N_QUEUE_TIMES; i++)
|
||||
{
|
||||
dcb_printf(pdcb, " %2d00 - %2d00ms | %-10d | %-10d\n", i, i + 1,
|
||||
queueStats.qtimes[i], queueStats.exectimes[i]);
|
||||
get_queue_times(i, &qtimes, &exectimes);
|
||||
dcb_printf(pdcb, " %2d00 - %2d00ms | %-10d | %-10d\n", i, i + 1, qtimes, exectimes);
|
||||
}
|
||||
dcb_printf(pdcb, " > %2d00ms | %-10d | %-10d\n", N_QUEUE_TIMES,
|
||||
queueStats.qtimes[N_QUEUE_TIMES], queueStats.exectimes[N_QUEUE_TIMES]);
|
||||
|
||||
get_queue_times(N_QUEUE_TIMES, &qtimes, &exectimes);
|
||||
dcb_printf(pdcb, " > %2d00ms | %-10d | %-10d\n", N_QUEUE_TIMES, qtimes, exectimes);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1162,9 +1185,9 @@ poll_get_stat(POLL_STAT stat)
|
||||
case POLL_STAT_EVQ_MAX:
|
||||
return poll_stats_get(&POLL_STATS::evq_max, TS_STATS_MAX);
|
||||
case POLL_STAT_MAX_QTIME:
|
||||
return ts_stats_get(queueStats.maxqtime, TS_STATS_MAX);
|
||||
return queue_stats_get(&QUEUE_STATS::maxqtime, TS_STATS_MAX);
|
||||
case POLL_STAT_MAX_EXECTIME:
|
||||
return ts_stats_get(queueStats.maxexectime, TS_STATS_MAX);
|
||||
return queue_stats_get(&QUEUE_STATS::maxexectime, TS_STATS_MAX);
|
||||
default:
|
||||
ss_dassert(false);
|
||||
break;
|
||||
@ -1208,10 +1231,13 @@ eventTimesRowCallback(RESULTSET *set, void *data)
|
||||
buf[39] = '\0';
|
||||
resultset_row_set(row, 0, buf);
|
||||
}
|
||||
snprintf(buf, 39, "%u", queueStats.qtimes[*rowno]);
|
||||
int32_t qtimes;
|
||||
int32_t exectimes;
|
||||
get_queue_times(*rowno, &qtimes, &exectimes);
|
||||
snprintf(buf, 39, "%u", qtimes);
|
||||
buf[39] = '\0';
|
||||
resultset_row_set(row, 1, buf);
|
||||
snprintf(buf, 39, "%u", queueStats.exectimes[*rowno]);
|
||||
snprintf(buf, 39, "%u", exectimes);
|
||||
buf[39] = '\0';
|
||||
resultset_row_set(row, 2, buf);
|
||||
(*rowno)++;
|
||||
|
@ -32,6 +32,8 @@ using maxscale::Worker;
|
||||
THREAD_DATA *thread_data = NULL;
|
||||
// TODO: Temporarily moved here.
|
||||
POLL_STATS *pollStats = NULL;
|
||||
// TODO: Temporarily moved here.
|
||||
QUEUE_STATS* queueStats = NULL;
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -131,11 +133,16 @@ void poll_resolve_error(int wid, int fd, int errornum, int op)
|
||||
static bool modules_thread_init();
|
||||
static void modules_thread_finish();
|
||||
|
||||
Worker::Worker(int id, int epoll_fd, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats)
|
||||
Worker::Worker(int id,
|
||||
int epoll_fd,
|
||||
THREAD_DATA* pThread_data,
|
||||
POLL_STATS* pPoll_stats,
|
||||
QUEUE_STATS* pQueue_stats)
|
||||
: m_id(id)
|
||||
, m_epoll_fd(epoll_fd)
|
||||
, m_pThread_data(pThread_data)
|
||||
, m_pPoll_stats(pPoll_stats)
|
||||
, m_pQueue_stats(pQueue_stats)
|
||||
, m_pQueue(NULL)
|
||||
, m_thread(0)
|
||||
, m_started(false)
|
||||
@ -169,6 +176,16 @@ void Worker::init()
|
||||
}
|
||||
|
||||
pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS));
|
||||
if (!pollStats)
|
||||
{
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
queueStats = (QUEUE_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(QUEUE_STATS));
|
||||
if (!queueStats)
|
||||
{
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
|
||||
|
||||
@ -180,7 +197,7 @@ void Worker::init()
|
||||
|
||||
for (int i = 0; i < this_unit.n_workers; ++i)
|
||||
{
|
||||
Worker* pWorker = Worker::create(i, &thread_data[i], &pollStats[i]);
|
||||
Worker* pWorker = Worker::create(i, &thread_data[i], &pollStats[i], &queueStats[i]);
|
||||
|
||||
if (pWorker)
|
||||
{
|
||||
@ -335,7 +352,9 @@ bool should_shutdown(void* pData)
|
||||
void Worker::run()
|
||||
{
|
||||
this_thread.current_worker_id = m_id;
|
||||
poll_waitevents(m_epoll_fd, m_id, m_pThread_data, m_pPoll_stats, ::should_shutdown, this);
|
||||
poll_waitevents(m_epoll_fd, m_id,
|
||||
m_pThread_data, m_pPoll_stats, m_pQueue_stats,
|
||||
::should_shutdown, this);
|
||||
this_thread.current_worker_id = WORKER_ABSENT_ID;
|
||||
|
||||
MXS_NOTICE("Worker %d has shut down.", m_id);
|
||||
@ -398,11 +417,15 @@ void Worker::shutdown_all()
|
||||
* @param worker_id The id of the worker.
|
||||
* @param pThread_data The thread data of the worker.
|
||||
* @param pPoll_stats The poll statistics of the worker.
|
||||
* @param pQueue_stats The queue statistics of the worker.
|
||||
*
|
||||
* @return A worker instance if successful, otherwise NULL.
|
||||
*/
|
||||
//static
|
||||
Worker* Worker::create(int worker_id, THREAD_DATA* pThread_data, POLL_STATS* pPoll_stats)
|
||||
Worker* Worker::create(int worker_id,
|
||||
THREAD_DATA* pThread_data,
|
||||
POLL_STATS* pPoll_stats,
|
||||
QUEUE_STATS* pQueue_stats)
|
||||
{
|
||||
Worker* pThis = NULL;
|
||||
|
||||
@ -410,7 +433,7 @@ Worker* Worker::create(int worker_id, THREAD_DATA* pThread_data, POLL_STATS* pPo
|
||||
|
||||
if (epoll_fd != -1)
|
||||
{
|
||||
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pThread_data, pPoll_stats);
|
||||
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pThread_data, pPoll_stats, pQueue_stats);
|
||||
|
||||
if (pThis)
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user