Move thread data to workers

This is a step in the direction that any worker/thread related data
is the property of the worker/thread.
This commit is contained in:
Johan Wikman 2017-04-06 16:12:03 +03:00
parent 052975bccd
commit c11ca1c328
5 changed files with 71 additions and 58 deletions

View File

@ -52,6 +52,35 @@ typedef struct mxs_poll_data
} thread;
} MXS_POLL_DATA;
// TODO: Temporarily moved here.
/**
* Internal MaxScale thread states
*/
typedef enum
{
THREAD_STOPPED,
THREAD_IDLE,
THREAD_POLLING,
THREAD_PROCESSING,
THREAD_ZPROCESSING
} THREAD_STATE;
// TODO: Temporarily moved here.
/**
* Thread data used to report the current state and activity related to
* a thread
*/
typedef struct
{
THREAD_STATE state; /*< Current thread state */
int n_fds; /*< No. of descriptors thread is processing */
MXS_POLL_DATA *cur_data; /*< Current MXS_POLL_DATA being processed */
uint32_t event; /*< Current event being processed */
uint64_t cycle_start; /*< The time when the poll loop was started */
} THREAD_DATA;
extern THREAD_DATA *thread_data;
/**
* A file descriptor should be added to the poll set of all workers.
*/

View File

@ -52,6 +52,7 @@ void poll_init();
void poll_waitevents(int epoll_fd,
int thread_id,
THREAD_DATA* thread_data,
bool (*should_terminate)(void* data),
void* data);
void poll_set_maxwait(unsigned int);

View File

@ -197,10 +197,10 @@ public:
static int get_current_id();
private:
Worker(int id, int epoll_fd);
Worker(int id, int epoll_fd, THREAD_DATA* pThread_data);
virtual ~Worker();
static Worker* create(int id);
static Worker* create(int id, THREAD_DATA* pThread_data);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override
@ -209,6 +209,7 @@ private:
private:
int m_id; /*< The id of the worker. */
int m_epoll_fd; /*< The epoll file descriptor. */
THREAD_DATA* m_pThread_data; /*< The thread data of the worker. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */
THREAD m_thread; /*< The thread handle of the worker. */
bool m_started; /*< Whether the thread has been started or not. */

View File

@ -110,33 +110,6 @@ static int n_avg_samples;
/* Thread statistics data */
static int n_threads; /*< No. of threads */
/**
* Internal MaxScale thread states
*/
typedef enum
{
THREAD_STOPPED,
THREAD_IDLE,
THREAD_POLLING,
THREAD_PROCESSING,
THREAD_ZPROCESSING
} THREAD_STATE;
/**
* Thread data used to report the current state and activity related to
* a thread
*/
typedef struct
{
THREAD_STATE state; /*< Current thread state */
int n_fds; /*< No. of descriptors thread is processing */
MXS_POLL_DATA *cur_data; /*< Current MXS_POLL_DATA being processed */
uint32_t event; /*< Current event being processed */
uint64_t cycle_start; /*< The time when the poll loop was started */
} THREAD_DATA;
static THREAD_DATA *thread_data = NULL; /*< Status of each thread */
/**
* The number of buckets used to gather statistics about how many
* descriptors where processed on each epoll completion.
@ -207,16 +180,6 @@ poll_init()
memset(&pollStats, 0, sizeof(pollStats));
memset(&queueStats, 0, sizeof(queueStats));
thread_data = (THREAD_DATA *)MXS_MALLOC(n_threads * sizeof(THREAD_DATA));
if (!thread_data)
{
exit(-1);
}
for (int i = 0; i < n_threads; i++)
{
thread_data[i].state = THREAD_STOPPED;
}
if ((pollStats.n_read = ts_stats_alloc()) == NULL ||
(pollStats.n_write = ts_stats_alloc()) == NULL ||
@ -376,12 +339,14 @@ bool poll_remove_fd_from_worker(int wid, int fd)
*
* @param epoll_fd The epoll descriptor.
* @param thread_id The id of the calling thread.
* @param thread_data The thread data of the calling thread.
* @param should_shutdown Pointer to function returning true if the polling should
* be terminated.
* @param data Data provided to the @c should_shutdown function.
*/
void poll_waitevents(int epoll_fd,
int thread_id,
THREAD_DATA* thread_data,
bool (*should_shutdown)(void* data),
void* data)
{
@ -391,12 +356,12 @@ void poll_waitevents(int epoll_fd,
int i, nfds, timeout_bias = 1;
int poll_spins = 0;
thread_data[thread_id].state = THREAD_IDLE;
thread_data->state = THREAD_IDLE;
while (!should_shutdown(data))
{
atomic_add(&n_waiting, 1);
thread_data[thread_id].state = THREAD_POLLING;
thread_data->state = THREAD_POLLING;
ts_stats_increment(pollStats.n_polls, thread_id);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
@ -461,10 +426,10 @@ void poll_waitevents(int epoll_fd,
nfds);
ts_stats_increment(pollStats.n_pollev, thread_id);
thread_data[thread_id].n_fds = nfds;
thread_data[thread_id].cur_data = NULL;
thread_data[thread_id].event = 0;
thread_data[thread_id].state = THREAD_PROCESSING;
thread_data->n_fds = nfds;
thread_data->cur_data = NULL;
thread_data->event = 0;
thread_data->state = THREAD_PROCESSING;
pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++;
@ -484,14 +449,14 @@ void poll_waitevents(int epoll_fd,
*/
}
thread_data[thread_id].cycle_start = hkheartbeat;
thread_data->cycle_start = hkheartbeat;
/* Process of the queue of waiting requests */
for (int i = 0; i < nfds; i++)
{
/** Calculate event queue statistics */
uint64_t started = hkheartbeat;
uint64_t qtime = started - thread_data[thread_id].cycle_start;
uint64_t qtime = started - thread_data->cycle_start;
if (qtime > N_QUEUE_TIMES)
{
@ -505,9 +470,9 @@ void poll_waitevents(int epoll_fd,
ts_stats_set_max(queueStats.maxqtime, qtime, thread_id);
MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr;
thread_data[thread_id].cur_data = data;
thread_data->cur_data = data;
thread_data[thread_id].event = events[i].events;
thread_data->event = events[i].events;
uint32_t actions = data->handler(data, thread_id, events[i].events);
if (actions & MXS_POLL_ACCEPT)
@ -552,17 +517,17 @@ void poll_waitevents(int epoll_fd,
dcb_process_idle_sessions(thread_id);
thread_data[thread_id].state = THREAD_ZPROCESSING;
thread_data->state = THREAD_ZPROCESSING;
/** Process closed DCBs */
dcb_process_zombies(thread_id);
poll_check_message();
thread_data[thread_id].state = THREAD_IDLE;
thread_data->state = THREAD_IDLE;
} /*< while(1) */
thread_data[thread_id].state = THREAD_STOPPED;
thread_data->state = THREAD_STOPPED;
}
/**

View File

@ -28,6 +28,9 @@
using maxscale::Worker;
// TODO: Temporarily moved here.
THREAD_DATA *thread_data = NULL;
namespace
{
@ -126,9 +129,10 @@ void poll_resolve_error(int wid, int fd, int errornum, int op)
static bool modules_thread_init();
static void modules_thread_finish();
Worker::Worker(int id, int epoll_fd)
Worker::Worker(int id, int epoll_fd, THREAD_DATA* pThread_data)
: m_id(id)
, m_epoll_fd(epoll_fd)
, m_pThread_data(pThread_data)
, m_pQueue(NULL)
, m_thread(0)
, m_started(false)
@ -149,6 +153,18 @@ Worker::~Worker()
void Worker::init()
{
this_unit.n_workers = config_threadcount();
thread_data = (THREAD_DATA *)MXS_CALLOC(this_unit.n_workers, sizeof(THREAD_DATA));
if (!thread_data)
{
exit(-1);
}
for (int i = 0; i < this_unit.n_workers; i++)
{
thread_data[i].state = THREAD_STOPPED;
}
this_unit.ppWorkers = new (std::nothrow) Worker* [this_unit.n_workers] (); // Zero initialized array
if (!this_unit.ppWorkers)
@ -159,7 +175,7 @@ void Worker::init()
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = Worker::create(i);
Worker* pWorker = Worker::create(i, &thread_data[i]);
if (pWorker)
{
@ -314,7 +330,7 @@ bool should_shutdown(void* pData)
void Worker::run()
{
this_thread.current_worker_id = m_id;
poll_waitevents(m_epoll_fd, m_id, ::should_shutdown, this);
poll_waitevents(m_epoll_fd, m_id, m_pThread_data, ::should_shutdown, this);
this_thread.current_worker_id = WORKER_ABSENT_ID;
MXS_NOTICE("Worker %d has shut down.", m_id);
@ -374,12 +390,13 @@ void Worker::shutdown_all()
* - Creates a pipe.
* - Adds the read descriptor to the polling mechanism.
*
* @param worker_id The id of the worker.
* @param worker_id The id of the worker.
* @param pThread_data The thread data of the worker.
*
* @return A worker instance if successful, otherwise NULL.
*/
//static
Worker* Worker::create(int worker_id)
Worker* Worker::create(int worker_id, THREAD_DATA* pThread_data)
{
Worker* pThis = NULL;
@ -387,7 +404,7 @@ Worker* Worker::create(int worker_id)
if (epoll_fd != -1)
{
pThis = new (std::nothrow) Worker(worker_id, epoll_fd);
pThis = new (std::nothrow) Worker(worker_id, epoll_fd, pThread_data);
if (pThis)
{