Fold thread state into poll stats

This commit is contained in:
Johan Wikman
2017-04-12 22:54:53 +03:00
parent 30d7f52852
commit 3ac619bfec
5 changed files with 14 additions and 43 deletions

View File

@ -65,18 +65,6 @@ typedef enum
THREAD_ZPROCESSING THREAD_ZPROCESSING
} THREAD_STATE; } THREAD_STATE;
// TODO: Temporarily moved here.
/**
* Thread data used to report the current state and activity related to
* a thread
*/
typedef struct
{
THREAD_STATE state; /*< Current thread state */
} THREAD_DATA;
extern THREAD_DATA *thread_data;
// TODO: Temporarily moved here. // TODO: Temporarily moved here.
/** /**
* The number of buckets used to gather statistics about how many * The number of buckets used to gather statistics about how many
@ -92,6 +80,7 @@ extern THREAD_DATA *thread_data;
// TODO: Temporarily moved here. // TODO: Temporarily moved here.
typedef struct typedef struct
{ {
THREAD_STATE thread_state; /*< The thread state. */
int64_t n_read; /*< Number of read events */ int64_t n_read; /*< Number of read events */
int64_t n_write; /*< Number of write events */ int64_t n_write; /*< Number of write events */
int64_t n_error; /*< Number of error events */ int64_t n_error; /*< Number of error events */

View File

@ -52,7 +52,6 @@ void poll_init();
void poll_waitevents(int epoll_fd, void poll_waitevents(int epoll_fd,
int thread_id, int thread_id,
THREAD_DATA* thread_data,
POLL_STATS* poll_stats, POLL_STATS* poll_stats,
QUEUE_STATS* queue_stats, QUEUE_STATS* queue_stats,
bool (*should_terminate)(void* data), bool (*should_terminate)(void* data),

View File

@ -199,13 +199,11 @@ public:
private: private:
Worker(int id, Worker(int id,
int epoll_fd, int epoll_fd,
THREAD_DATA* pThread_data,
POLL_STATS* pPoll_stats, POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats); QUEUE_STATS* pQueue_stats);
virtual ~Worker(); virtual ~Worker();
static Worker* create(int id, static Worker* create(int id,
THREAD_DATA* pThread_data,
POLL_STATS* pPoll_stats, POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats); QUEUE_STATS* pQueue_stats);
@ -216,7 +214,6 @@ private:
private: private:
int m_id; /*< The id of the worker. */ int m_id; /*< The id of the worker. */
int m_epoll_fd; /*< The epoll file descriptor. */ int m_epoll_fd; /*< The epoll file descriptor. */
THREAD_DATA* m_pThread_data; /*< The thread data of the worker. */
POLL_STATS* m_pPoll_stats; /*< Statistics for worker. */ POLL_STATS* m_pPoll_stats; /*< Statistics for worker. */
QUEUE_STATS* m_pQueue_stats; /*< Statistics for queue. */ QUEUE_STATS* m_pQueue_stats; /*< Statistics for queue. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */ MessageQueue* m_pQueue; /*< The message queue of the worker. */

View File

@ -272,7 +272,6 @@ bool poll_remove_fd_from_worker(int wid, int fd)
* *
* @param epoll_fd The epoll descriptor. * @param epoll_fd The epoll descriptor.
* @param thread_id The id of the calling thread. * @param thread_id The id of the calling thread.
* @param thread_data The thread data of the calling thread.
* @param poll_stats The polling stats of the calling thread. * @param poll_stats The polling stats of the calling thread.
* @param queue_stats The queue stats of the calling thread. * @param queue_stats The queue stats of the calling thread.
* @param should_shutdown Pointer to function returning true if the polling should * @param should_shutdown Pointer to function returning true if the polling should
@ -281,7 +280,6 @@ bool poll_remove_fd_from_worker(int wid, int fd)
*/ */
void poll_waitevents(int epoll_fd, void poll_waitevents(int epoll_fd,
int thread_id, int thread_id,
THREAD_DATA* thread_data,
POLL_STATS* poll_stats, POLL_STATS* poll_stats,
QUEUE_STATS* queue_stats, QUEUE_STATS* queue_stats,
bool (*should_shutdown)(void* data), bool (*should_shutdown)(void* data),
@ -293,11 +291,11 @@ void poll_waitevents(int epoll_fd,
int i, nfds, timeout_bias = 1; int i, nfds, timeout_bias = 1;
int poll_spins = 0; int poll_spins = 0;
thread_data->state = THREAD_IDLE; poll_stats->thread_state = THREAD_IDLE;
while (!should_shutdown(data)) while (!should_shutdown(data))
{ {
thread_data->state = THREAD_POLLING; poll_stats->thread_state = THREAD_POLLING;
atomic_add_int64(&poll_stats->n_polls, 1); atomic_add_int64(&poll_stats->n_polls, 1);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1) if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
@ -354,7 +352,7 @@ void poll_waitevents(int epoll_fd,
nfds); nfds);
atomic_add_int64(&poll_stats->n_pollev, 1); atomic_add_int64(&poll_stats->n_pollev, 1);
thread_data->state = THREAD_PROCESSING; poll_stats->thread_state = THREAD_PROCESSING;
poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++; poll_stats->n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
@ -440,17 +438,17 @@ void poll_waitevents(int epoll_fd,
dcb_process_idle_sessions(thread_id); dcb_process_idle_sessions(thread_id);
thread_data->state = THREAD_ZPROCESSING; poll_stats->thread_state = THREAD_ZPROCESSING;
/** Process closed DCBs */ /** Process closed DCBs */
dcb_process_zombies(thread_id); dcb_process_zombies(thread_id);
poll_check_message(); poll_check_message();
thread_data->state = THREAD_IDLE; poll_stats->thread_state = THREAD_IDLE;
} /*< while(1) */ } /*< while(1) */
thread_data->state = THREAD_STOPPED; poll_stats->thread_state = THREAD_STOPPED;
} }
/** /**
@ -984,7 +982,7 @@ dShowThreads(DCB *dcb)
dcb_printf(dcb, "----+------------\n"); dcb_printf(dcb, "----+------------\n");
for (i = 0; i < n_threads; i++) for (i = 0; i < n_threads; i++)
{ {
switch (thread_data[i].state) switch (pollStats[i].thread_state)
{ {
case THREAD_STOPPED: case THREAD_STOPPED:
state = "Stopped"; state = "Stopped";

View File

@ -28,8 +28,6 @@
using maxscale::Worker; using maxscale::Worker;
// TODO: Temporarily moved here.
THREAD_DATA *thread_data = NULL;
// TODO: Temporarily moved here. // TODO: Temporarily moved here.
POLL_STATS *pollStats = NULL; POLL_STATS *pollStats = NULL;
// TODO: Temporarily moved here. // TODO: Temporarily moved here.
@ -135,12 +133,10 @@ static void modules_thread_finish();
Worker::Worker(int id, Worker::Worker(int id,
int epoll_fd, int epoll_fd,
THREAD_DATA* pThread_data,
POLL_STATS* pPoll_stats, POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats) QUEUE_STATS* pQueue_stats)
: m_id(id) : m_id(id)
, m_epoll_fd(epoll_fd) , m_epoll_fd(epoll_fd)
, m_pThread_data(pThread_data)
, m_pPoll_stats(pPoll_stats) , m_pPoll_stats(pPoll_stats)
, m_pQueue_stats(pQueue_stats) , m_pQueue_stats(pQueue_stats)
, m_pQueue(NULL) , m_pQueue(NULL)
@ -164,21 +160,15 @@ void Worker::init()
{ {
this_unit.n_workers = config_threadcount(); this_unit.n_workers = config_threadcount();
thread_data = (THREAD_DATA*)MXS_CALLOC(this_unit.n_workers, sizeof(THREAD_DATA)); pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS));
if (!thread_data) if (!pollStats)
{ {
exit(-1); exit(-1);
} }
for (int i = 0; i < this_unit.n_workers; i++) for (int i = 0; i < this_unit.n_workers; i++)
{ {
thread_data[i].state = THREAD_STOPPED; pollStats[i].thread_state = THREAD_STOPPED;
}
pollStats = (POLL_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(POLL_STATS));
if (!pollStats)
{
exit(-1);
} }
queueStats = (QUEUE_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(QUEUE_STATS)); queueStats = (QUEUE_STATS*)MXS_CALLOC(this_unit.n_workers, sizeof(QUEUE_STATS));
@ -197,7 +187,7 @@ void Worker::init()
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = Worker::create(i, &thread_data[i], &pollStats[i], &queueStats[i]); Worker* pWorker = Worker::create(i, &pollStats[i], &queueStats[i]);
if (pWorker) if (pWorker)
{ {
@ -353,7 +343,7 @@ void Worker::run()
{ {
this_thread.current_worker_id = m_id; this_thread.current_worker_id = m_id;
poll_waitevents(m_epoll_fd, m_id, poll_waitevents(m_epoll_fd, m_id,
m_pThread_data, m_pPoll_stats, m_pQueue_stats, m_pPoll_stats, m_pQueue_stats,
::should_shutdown, this); ::should_shutdown, this);
this_thread.current_worker_id = WORKER_ABSENT_ID; this_thread.current_worker_id = WORKER_ABSENT_ID;
@ -415,7 +405,6 @@ void Worker::shutdown_all()
* - Adds the read descriptor to the polling mechanism. * - Adds the read descriptor to the polling mechanism.
* *
* @param worker_id The id of the worker. * @param worker_id The id of the worker.
* @param pThread_data The thread data of the worker.
* @param pPoll_stats The poll statistics of the worker. * @param pPoll_stats The poll statistics of the worker.
* @param pQueue_stats The queue statistics of the worker. * @param pQueue_stats The queue statistics of the worker.
* *
@ -423,7 +412,6 @@ void Worker::shutdown_all()
*/ */
//static //static
Worker* Worker::create(int worker_id, Worker* Worker::create(int worker_id,
THREAD_DATA* pThread_data,
POLL_STATS* pPoll_stats, POLL_STATS* pPoll_stats,
QUEUE_STATS* pQueue_stats) QUEUE_STATS* pQueue_stats)
{ {
@ -433,7 +421,7 @@ Worker* Worker::create(int worker_id,
if (epoll_fd != -1) if (epoll_fd != -1)
{ {
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pThread_data, pPoll_stats, pQueue_stats); pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pPoll_stats, pQueue_stats);
if (pThis) if (pThis)
{ {