MXS-1754 Rename mxs::Worker to mxs::RoutingWorker

A new class mxs::Worker will be introduced and mxs::RoutingWorker
will be inherited from that. mxs::Worker will basically only be a
thread with a message-loop.

Once available, all current non-worker threads (but the one
implicitly created by microhttpd) can be creating by inheriting
from that; in practice that means the housekeeping thread, all
monitor threads and possibly the logging thread.

The benefit of this arrangement is that there then will be a general
mechanism for cross thread communication without having to use any
shared data structures.
This commit is contained in:
Johan Wikman
2018-04-12 13:04:54 +03:00
parent fa3143cedf
commit 230876cd69
15 changed files with 140 additions and 138 deletions

View File

@ -38,6 +38,7 @@ add_library(maxscale-common SHARED
resultset.cc resultset.cc
resource.cc resource.cc
router.cc router.cc
routingworker.cc
secrets.cc secrets.cc
semaphore.cc semaphore.cc
server.cc server.cc
@ -51,7 +52,6 @@ add_library(maxscale-common SHARED
thread.cc thread.cc
users.cc users.cc
utils.cc utils.cc
worker.cc
workertask.cc workertask.cc
) )

View File

@ -57,8 +57,8 @@
#include <maxscale/semaphore.hh> #include <maxscale/semaphore.hh>
#include "internal/modules.h" #include "internal/modules.h"
#include "internal/routingworker.hh"
#include "internal/session.h" #include "internal/session.h"
#include "internal/worker.hh"
#include "internal/workertask.hh" #include "internal/workertask.hh"
using maxscale::Worker; using maxscale::Worker;

View File

@ -53,16 +53,16 @@
#include <maxscale/version.h> #include <maxscale/version.h>
#include <maxscale/random_jkiss.h> #include <maxscale/random_jkiss.h>
#include "internal/admin.hh"
#include "internal/config.h" #include "internal/config.h"
#include "internal/maxscale.h" #include "internal/maxscale.h"
#include "internal/messagequeue.hh" #include "internal/messagequeue.hh"
#include "internal/modules.h" #include "internal/modules.h"
#include "internal/monitor.h" #include "internal/monitor.h"
#include "internal/poll.h" #include "internal/poll.h"
#include "internal/routingworker.hh"
#include "internal/service.h" #include "internal/service.h"
#include "internal/statistics.h" #include "internal/statistics.h"
#include "internal/admin.hh"
#include "internal/worker.hh"
using namespace maxscale; using namespace maxscale;

View File

@ -19,7 +19,7 @@ namespace maxscale
{ {
class MessageQueue; class MessageQueue;
class Worker; class RoutingWorker;
/** /**
* An instance of @c MessageQueueMessage can be sent over a @c MessageQueue from * An instance of @c MessageQueueMessage can be sent over a @c MessageQueue from
@ -180,7 +180,7 @@ public:
* @attention If the message queue is currently added to a worker, it * @attention If the message queue is currently added to a worker, it
* will first be removed from that worker. * will first be removed from that worker.
*/ */
bool add_to_worker(Worker* pWorker); bool add_to_worker(RoutingWorker* pWorker);
/** /**
* Removes the message queue from the worker it is currently added to. * Removes the message queue from the worker it is currently added to.
@ -188,7 +188,7 @@ public:
* @return The worker the message queue was associated with, or NULL * @return The worker the message queue was associated with, or NULL
* if it was not associated with any. * if it was not associated with any.
*/ */
Worker* remove_from_worker(); RoutingWorker* remove_from_worker();
private: private:
MessageQueue(Handler* pHandler, int read_fd, int write_fd); MessageQueue(Handler* pHandler, int read_fd, int write_fd);
@ -198,10 +198,10 @@ private:
static uint32_t poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events); static uint32_t poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events);
private: private:
Handler& m_handler; Handler& m_handler;
int m_read_fd; int m_read_fd;
int m_write_fd; int m_write_fd;
Worker* m_pWorker; RoutingWorker* m_pWorker;
}; };
} }

View File

@ -422,13 +422,15 @@ private:
Average1 m_load_1_second; /*< The load during the last 1-second period. */ Average1 m_load_1_second; /*< The load during the last 1-second period. */
}; };
class RoutingWorker;
typedef RoutingWorker Worker;
class Worker : public MXS_WORKER class RoutingWorker : public MXS_WORKER
, private MessageQueue::Handler , private MessageQueue::Handler
, private MXS_POLL_DATA , private MXS_POLL_DATA
{ {
Worker(const Worker&); RoutingWorker(const RoutingWorker&);
Worker& operator = (const Worker&); RoutingWorker& operator = (const RoutingWorker&);
public: public:
typedef WORKER_STATISTICS STATISTICS; typedef WORKER_STATISTICS STATISTICS;
@ -833,14 +835,14 @@ public:
* @return The corresponding worker instance, or NULL if the id does * @return The corresponding worker instance, or NULL if the id does
* not correspond to a worker. * not correspond to a worker.
*/ */
static Worker* get(int worker_id); static RoutingWorker* get(int worker_id);
/** /**
* Return the worker associated with the current thread. * Return the worker associated with the current thread.
* *
* @return The worker instance, or NULL if the current thread does not have a worker. * @return The worker instance, or NULL if the current thread does not have a worker.
*/ */
static Worker* get_current(); static RoutingWorker* get_current();
/** /**
* Return the worker id associated with the current thread. * Return the worker id associated with the current thread.
@ -865,11 +867,11 @@ public:
static void set_maxwait(unsigned int maxwait); static void set_maxwait(unsigned int maxwait);
private: private:
Worker(int id, RoutingWorker(int id,
int epoll_fd); int epoll_fd);
virtual ~Worker(); virtual ~RoutingWorker();
static Worker* create(int id, int epoll_listener_fd); static RoutingWorker* create(int id, int epoll_listener_fd);
void delete_zombies(); void delete_zombies();

View File

@ -17,7 +17,7 @@
namespace maxscale namespace maxscale
{ {
class Worker; class RoutingWorker;
/** /**
* A WorkerTask represents a task to be performed by a Worker. * A WorkerTask represents a task to be performed by a Worker.
@ -38,7 +38,7 @@ public:
* @attention As the function is called by a worker, the body of `execute` * @attention As the function is called by a worker, the body of `execute`
* should execute quickly and not perform any blocking operations. * should execute quickly and not perform any blocking operations.
*/ */
virtual void execute(Worker& worker) = 0; virtual void execute(RoutingWorker& worker) = 0;
}; };
/** /**
@ -69,10 +69,10 @@ protected:
* @attention As the function is called by a worker, the body of `execute` * @attention As the function is called by a worker, the body of `execute`
* should execute quickly and not perform any blocking operations. * should execute quickly and not perform any blocking operations.
*/ */
virtual void execute(Worker& worker) = 0; virtual void execute(RoutingWorker& worker) = 0;
private: private:
friend class Worker; friend class RoutingWorker;
void inc_ref(); void inc_ref();
void dec_ref(); void dec_ref();

View File

@ -18,7 +18,7 @@
#include <unistd.h> #include <unistd.h>
#include <maxscale/debug.h> #include <maxscale/debug.h>
#include <maxscale/log_manager.h> #include <maxscale/log_manager.h>
#include "internal/worker.hh" #include "internal/routingworker.hh"
namespace namespace
{ {
@ -165,7 +165,7 @@ bool MessageQueue::post(const Message& message) const
return rv; return rv;
} }
bool MessageQueue::add_to_worker(Worker* pWorker) bool MessageQueue::add_to_worker(RoutingWorker* pWorker)
{ {
if (m_pWorker) if (m_pWorker)
{ {
@ -181,9 +181,9 @@ bool MessageQueue::add_to_worker(Worker* pWorker)
return m_pWorker != NULL; return m_pWorker != NULL;
} }
Worker* MessageQueue::remove_from_worker() RoutingWorker* MessageQueue::remove_from_worker()
{ {
Worker* pWorker = m_pWorker; RoutingWorker* pWorker = m_pWorker;
if (m_pWorker) if (m_pWorker)
{ {

View File

@ -36,7 +36,7 @@
#include <maxscale/server.h> #include <maxscale/server.h>
#include <maxscale/statistics.h> #include <maxscale/statistics.h>
#include "internal/poll.h" #include "internal/poll.h"
#include "internal/worker.hh" #include "internal/routingworker.hh"
using maxscale::Worker; using maxscale::Worker;

View File

@ -35,7 +35,7 @@
#include "internal/config_runtime.h" #include "internal/config_runtime.h"
#include "internal/modules.h" #include "internal/modules.h"
#include "internal/worker.h" #include "internal/worker.h"
#include "internal/worker.hh" #include "internal/routingworker.hh"
using std::list; using std::list;
using std::map; using std::map;

View File

@ -11,7 +11,7 @@
* Public License. * Public License.
*/ */
#include "internal/worker.hh" #include "internal/routingworker.hh"
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
@ -40,7 +40,7 @@
#define WORKER_ABSENT_ID -1 #define WORKER_ABSENT_ID -1
using maxscale::Worker; using maxscale::RoutingWorker;
using maxscale::WorkerLoad; using maxscale::WorkerLoad;
using maxscale::Closer; using maxscale::Closer;
using maxscale::Semaphore; using maxscale::Semaphore;
@ -58,14 +58,14 @@ const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2;
*/ */
struct this_unit struct this_unit
{ {
bool initialized; // Whether the initialization has been performed. bool initialized; // Whether the initialization has been performed.
int n_workers; // How many workers there are. int n_workers; // How many workers there are.
Worker** ppWorkers; // Array of worker instances. RoutingWorker** ppWorkers; // Array of worker instances.
// DEPRECATED in 2.3, remove in 2.4. // DEPRECATED in 2.3, remove in 2.4.
int number_poll_spins; // Maximum non-block polls int number_poll_spins; // Maximum non-block polls
// DEPRECATED in 2.3, remove in 2.4. // DEPRECATED in 2.3, remove in 2.4.
int max_poll_sleep; // Maximum block time int max_poll_sleep; // Maximum block time
int epoll_listener_fd; // Shared epoll descriptor for listening descriptors. int epoll_listener_fd; // Shared epoll descriptor for listening descriptors.
} this_unit = } this_unit =
{ {
false, false,
@ -198,7 +198,7 @@ uint64_t WorkerLoad::get_time()
return t.tv_sec * 1000 + (t.tv_nsec / 1000000); return t.tv_sec * 1000 + (t.tv_nsec / 1000000);
} }
Worker::Worker(int id, RoutingWorker::RoutingWorker(int id,
int epoll_fd) int epoll_fd)
: m_id(id) : m_id(id)
, m_state(STOPPED) , m_state(STOPPED)
@ -211,11 +211,11 @@ Worker::Worker(int id,
, m_nCurrent_descriptors(0) , m_nCurrent_descriptors(0)
, m_nTotal_descriptors(0) , m_nTotal_descriptors(0)
{ {
MXS_POLL_DATA::handler = &Worker::epoll_instance_handler; MXS_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler;
MXS_POLL_DATA::thread.id = id; MXS_POLL_DATA::thread.id = id;
} }
Worker::~Worker() RoutingWorker::~RoutingWorker()
{ {
ss_dassert(!m_started); ss_dassert(!m_started);
@ -224,7 +224,7 @@ Worker::~Worker()
} }
// static // static
bool Worker::init() bool RoutingWorker::init()
{ {
ss_dassert(!this_unit.initialized); ss_dassert(!this_unit.initialized);
@ -236,13 +236,13 @@ bool Worker::init()
if (this_unit.epoll_listener_fd != -1) if (this_unit.epoll_listener_fd != -1)
{ {
int n_workers = config_threadcount(); int n_workers = config_threadcount();
Worker** ppWorkers = new (std::nothrow) Worker* [n_workers] (); // Zero initialized array RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [n_workers] (); // Zero initialized array
if (ppWorkers) if (ppWorkers)
{ {
for (int i = 0; i < n_workers; ++i) for (int i = 0; i < n_workers; ++i)
{ {
Worker* pWorker = Worker::create(i, this_unit.epoll_listener_fd); RoutingWorker* pWorker = RoutingWorker::create(i, this_unit.epoll_listener_fd);
if (pWorker) if (pWorker)
{ {
@ -292,13 +292,13 @@ bool Worker::init()
return this_unit.initialized; return this_unit.initialized;
} }
void Worker::finish() void RoutingWorker::finish()
{ {
ss_dassert(this_unit.initialized); ss_dassert(this_unit.initialized);
for (int i = this_unit.n_workers - 1; i >= 0; --i) for (int i = this_unit.n_workers - 1; i >= 0; --i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; RoutingWorker* pWorker = this_unit.ppWorkers[i];
delete pWorker; delete pWorker;
this_unit.ppWorkers[i] = NULL; this_unit.ppWorkers[i] = NULL;
@ -316,16 +316,16 @@ void Worker::finish()
namespace namespace
{ {
int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type) int64_t one_stats_get(int64_t RoutingWorker::STATISTICS::*what, enum ts_stats_type type)
{ {
int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0); int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0);
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = Worker::get(i); RoutingWorker* pWorker = RoutingWorker::get(i);
ss_dassert(pWorker); ss_dassert(pWorker);
const Worker::STATISTICS& s = pWorker->statistics(); const RoutingWorker::STATISTICS& s = pWorker->statistics();
int64_t value = s.*what; int64_t value = s.*what;
@ -358,7 +358,7 @@ int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type
} }
//static //static
Worker::STATISTICS Worker::get_statistics() RoutingWorker::STATISTICS RoutingWorker::get_statistics()
{ {
STATISTICS cs; STATISTICS cs;
@ -376,22 +376,22 @@ Worker::STATISTICS Worker::get_statistics()
cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX); cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX);
cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX); cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX);
for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++) for (int i = 0; i < RoutingWorker::STATISTICS::MAXNFDS - 1; i++)
{ {
for (int j = 0; j < this_unit.n_workers; ++j) for (int j = 0; j < this_unit.n_workers; ++j)
{ {
Worker* pWorker = Worker::get(j); RoutingWorker* pWorker = RoutingWorker::get(j);
ss_dassert(pWorker); ss_dassert(pWorker);
cs.n_fds[i] += pWorker->statistics().n_fds[i]; cs.n_fds[i] += pWorker->statistics().n_fds[i];
} }
} }
for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i) for (int i = 0; i <= RoutingWorker::STATISTICS::N_QUEUE_TIMES; ++i)
{ {
for (int j = 0; j < this_unit.n_workers; ++j) for (int j = 0; j < this_unit.n_workers; ++j)
{ {
Worker* pWorker = Worker::get(j); RoutingWorker* pWorker = RoutingWorker::get(j);
ss_dassert(pWorker); ss_dassert(pWorker);
cs.qtimes[i] += pWorker->statistics().qtimes[i]; cs.qtimes[i] += pWorker->statistics().qtimes[i];
@ -406,57 +406,57 @@ Worker::STATISTICS Worker::get_statistics()
} }
//static //static
int64_t Worker::get_one_statistic(POLL_STAT what) int64_t RoutingWorker::get_one_statistic(POLL_STAT what)
{ {
int64_t rv = 0; int64_t rv = 0;
int64_t Worker::STATISTICS::*member = NULL; int64_t RoutingWorker::STATISTICS::*member = NULL;
enum ts_stats_type approach; enum ts_stats_type approach;
switch (what) switch (what)
{ {
case POLL_STAT_READ: case POLL_STAT_READ:
member = &Worker::STATISTICS::n_read; member = &RoutingWorker::STATISTICS::n_read;
approach = TS_STATS_SUM; approach = TS_STATS_SUM;
break; break;
case POLL_STAT_WRITE: case POLL_STAT_WRITE:
member = &Worker::STATISTICS::n_write; member = &RoutingWorker::STATISTICS::n_write;
approach = TS_STATS_SUM; approach = TS_STATS_SUM;
break; break;
case POLL_STAT_ERROR: case POLL_STAT_ERROR:
member = &Worker::STATISTICS::n_error; member = &RoutingWorker::STATISTICS::n_error;
approach = TS_STATS_SUM; approach = TS_STATS_SUM;
break; break;
case POLL_STAT_HANGUP: case POLL_STAT_HANGUP:
member = &Worker::STATISTICS::n_hup; member = &RoutingWorker::STATISTICS::n_hup;
approach = TS_STATS_SUM; approach = TS_STATS_SUM;
break; break;
case POLL_STAT_ACCEPT: case POLL_STAT_ACCEPT:
member = &Worker::STATISTICS::n_accept; member = &RoutingWorker::STATISTICS::n_accept;
approach = TS_STATS_SUM; approach = TS_STATS_SUM;
break; break;
case POLL_STAT_EVQ_LEN: case POLL_STAT_EVQ_LEN:
member = &Worker::STATISTICS::evq_length; member = &RoutingWorker::STATISTICS::evq_length;
approach = TS_STATS_AVG; approach = TS_STATS_AVG;
break; break;
case POLL_STAT_EVQ_MAX: case POLL_STAT_EVQ_MAX:
member = &Worker::STATISTICS::evq_max; member = &RoutingWorker::STATISTICS::evq_max;
approach = TS_STATS_MAX; approach = TS_STATS_MAX;
break; break;
case POLL_STAT_MAX_QTIME: case POLL_STAT_MAX_QTIME:
member = &Worker::STATISTICS::maxqtime; member = &RoutingWorker::STATISTICS::maxqtime;
approach = TS_STATS_MAX; approach = TS_STATS_MAX;
break; break;
case POLL_STAT_MAX_EXECTIME: case POLL_STAT_MAX_EXECTIME:
member = &Worker::STATISTICS::maxexectime; member = &RoutingWorker::STATISTICS::maxexectime;
approach = TS_STATS_MAX; approach = TS_STATS_MAX;
break; break;
@ -472,13 +472,13 @@ int64_t Worker::get_one_statistic(POLL_STAT what)
return rv; return rv;
} }
void Worker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal) void RoutingWorker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal)
{ {
*pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors); *pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors);
*pnTotal = atomic_load_uint64(&m_nTotal_descriptors); *pnTotal = atomic_load_uint64(&m_nTotal_descriptors);
} }
bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) bool RoutingWorker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{ {
bool rv = true; bool rv = true;
@ -507,7 +507,7 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
} }
//static //static
bool Worker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
{ {
bool rv = true; bool rv = true;
@ -535,7 +535,7 @@ bool Worker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData)
return rv; return rv;
} }
bool Worker::remove_fd(int fd) bool RoutingWorker::remove_fd(int fd)
{ {
bool rv = true; bool rv = true;
@ -555,7 +555,7 @@ bool Worker::remove_fd(int fd)
} }
//static //static
bool Worker::remove_shared_fd(int fd) bool RoutingWorker::remove_shared_fd(int fd)
{ {
bool rv = true; bool rv = true;
@ -572,15 +572,15 @@ bool Worker::remove_shared_fd(int fd)
int mxs_worker_id(MXS_WORKER* pWorker) int mxs_worker_id(MXS_WORKER* pWorker)
{ {
return static_cast<Worker*>(pWorker)->id(); return static_cast<RoutingWorker*>(pWorker)->id();
} }
bool mxs_worker_should_shutdown(MXS_WORKER* pWorker) bool mxs_worker_should_shutdown(MXS_WORKER* pWorker)
{ {
return static_cast<Worker*>(pWorker)->should_shutdown(); return static_cast<RoutingWorker*>(pWorker)->should_shutdown();
} }
Worker* Worker::get(int worker_id) RoutingWorker* RoutingWorker::get(int worker_id)
{ {
ss_dassert(worker_id < this_unit.n_workers); ss_dassert(worker_id < this_unit.n_workers);
@ -589,51 +589,51 @@ Worker* Worker::get(int worker_id)
MXS_WORKER* mxs_worker_get(int worker_id) MXS_WORKER* mxs_worker_get(int worker_id)
{ {
return Worker::get(worker_id); return RoutingWorker::get(worker_id);
} }
int mxs_worker_get_current_id() int mxs_worker_get_current_id()
{ {
return Worker::get_current_id(); return RoutingWorker::get_current_id();
} }
Worker* Worker::get_current() RoutingWorker* RoutingWorker::get_current()
{ {
Worker* pWorker = NULL; RoutingWorker* pWorker = NULL;
int worker_id = get_current_id(); int worker_id = get_current_id();
if (worker_id != WORKER_ABSENT_ID) if (worker_id != WORKER_ABSENT_ID)
{ {
pWorker = Worker::get(worker_id); pWorker = RoutingWorker::get(worker_id);
} }
return pWorker; return pWorker;
} }
int Worker::get_current_id() int RoutingWorker::get_current_id()
{ {
return this_thread.current_worker_id; return this_thread.current_worker_id;
} }
//static //static
void Worker::set_nonblocking_polls(unsigned int nbpolls) void RoutingWorker::set_nonblocking_polls(unsigned int nbpolls)
{ {
this_unit.number_poll_spins = nbpolls; this_unit.number_poll_spins = nbpolls;
} }
//static //static
void Worker::set_maxwait(unsigned int maxwait) void RoutingWorker::set_maxwait(unsigned int maxwait)
{ {
this_unit.max_poll_sleep = maxwait; this_unit.max_poll_sleep = maxwait;
} }
bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) bool RoutingWorker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode)
{ {
// No logging here, function must be signal safe. // No logging here, function must be signal safe.
bool rval = true; bool rval = true;
if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this) if (mode == RoutingWorker::EXECUTE_AUTO && RoutingWorker::get_current() == this)
{ {
pTask->execute(*this); pTask->execute(*this);
@ -653,20 +653,20 @@ bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode)
return rval; return rval;
} }
bool Worker::post(std::auto_ptr<DisposableTask> sTask, enum execute_mode_t mode) bool RoutingWorker::post(std::auto_ptr<DisposableTask> sTask, enum execute_mode_t mode)
{ {
// No logging here, function must be signal safe. // No logging here, function must be signal safe.
return post_disposable(sTask.release(), mode); return post_disposable(sTask.release(), mode);
} }
// private // private
bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) bool RoutingWorker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode)
{ {
bool posted = true; bool posted = true;
pTask->inc_ref(); pTask->inc_ref();
if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this) if (mode == RoutingWorker::EXECUTE_AUTO && RoutingWorker::get_current() == this)
{ {
pTask->execute(*this); pTask->execute(*this);
pTask->dec_ref(); pTask->dec_ref();
@ -687,14 +687,14 @@ bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode)
} }
//static //static
size_t Worker::broadcast(Task* pTask, Semaphore* pSem) size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem)
{ {
// No logging here, function must be signal safe. // No logging here, function must be signal safe.
size_t n = 0; size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; RoutingWorker* pWorker = this_unit.ppWorkers[i];
if (pWorker->post(pTask, pSem)) if (pWorker->post(pTask, pSem))
{ {
@ -706,7 +706,7 @@ size_t Worker::broadcast(Task* pTask, Semaphore* pSem)
} }
//static //static
size_t Worker::broadcast(std::auto_ptr<DisposableTask> sTask) size_t RoutingWorker::broadcast(std::auto_ptr<DisposableTask> sTask)
{ {
DisposableTask* pTask = sTask.release(); DisposableTask* pTask = sTask.release();
pTask->inc_ref(); pTask->inc_ref();
@ -715,7 +715,7 @@ size_t Worker::broadcast(std::auto_ptr<DisposableTask> sTask)
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; RoutingWorker* pWorker = this_unit.ppWorkers[i];
if (pWorker->post_disposable(pTask)) if (pWorker->post_disposable(pTask))
{ {
@ -729,14 +729,14 @@ size_t Worker::broadcast(std::auto_ptr<DisposableTask> sTask)
} }
//static //static
size_t Worker::execute_serially(Task& task) size_t RoutingWorker::execute_serially(Task& task)
{ {
Semaphore sem; Semaphore sem;
size_t n = 0; size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; RoutingWorker* pWorker = this_unit.ppWorkers[i];
if (pWorker->post(&task, &sem)) if (pWorker->post(&task, &sem))
{ {
@ -749,13 +749,13 @@ size_t Worker::execute_serially(Task& task)
} }
//static //static
size_t Worker::execute_concurrently(Task& task) size_t RoutingWorker::execute_concurrently(Task& task)
{ {
Semaphore sem; Semaphore sem;
return sem.wait_n(Worker::broadcast(&task, &sem)); return sem.wait_n(RoutingWorker::broadcast(&task, &sem));
} }
bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool RoutingWorker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
MessageQueue::Message message(msg_id, arg1, arg2); MessageQueue::Message message(msg_id, arg1, arg2);
@ -765,10 +765,10 @@ bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
return static_cast<Worker*>(pWorker)->post_message(msg_id, arg1, arg2); return static_cast<RoutingWorker*>(pWorker)->post_message(msg_id, arg1, arg2);
} }
size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) size_t RoutingWorker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
@ -776,7 +776,7 @@ size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; RoutingWorker* pWorker = this_unit.ppWorkers[i];
if (pWorker->post_message(msg_id, arg1, arg2)) if (pWorker->post_message(msg_id, arg1, arg2))
{ {
@ -789,31 +789,31 @@ size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{ {
return Worker::broadcast_message(msg_id, arg1, arg2); return RoutingWorker::broadcast_message(msg_id, arg1, arg2);
} }
bool mxs_worker_register_session(MXS_SESSION* session) bool mxs_worker_register_session(MXS_SESSION* session)
{ {
Worker* worker = Worker::get_current(); RoutingWorker* pWorker = RoutingWorker::get_current();
ss_dassert(worker); ss_dassert(pWorker);
return worker->session_registry().add(session); return pWorker->session_registry().add(session);
} }
bool mxs_worker_deregister_session(uint64_t id) bool mxs_worker_deregister_session(uint64_t id)
{ {
Worker* worker = Worker::get_current(); RoutingWorker* pWorker = RoutingWorker::get_current();
ss_dassert(worker); ss_dassert(pWorker);
return worker->session_registry().remove(id); return pWorker->session_registry().remove(id);
} }
MXS_SESSION* mxs_worker_find_session(uint64_t id) MXS_SESSION* mxs_worker_find_session(uint64_t id)
{ {
Worker* worker = Worker::get_current(); RoutingWorker* pWorker = RoutingWorker::get_current();
ss_dassert(worker); ss_dassert(pWorker);
return worker->session_registry().lookup(id); return pWorker->session_registry().lookup(id);
} }
Worker::SessionsById& Worker::session_registry() RoutingWorker::SessionsById& RoutingWorker::session_registry()
{ {
return m_sessions; return m_sessions;
} }
@ -827,10 +827,10 @@ public:
m_data.resize(nthreads); m_data.resize(nthreads);
} }
void execute(Worker& worker) void execute(RoutingWorker& worker)
{ {
json_t* stats = json_object(); json_t* stats = json_object();
const Worker::STATISTICS& s = worker.get_local_statistics(); const RoutingWorker::STATISTICS& s = worker.get_local_statistics();
json_object_set_new(stats, "reads", json_integer(s.n_read)); json_object_set_new(stats, "reads", json_integer(s.n_read));
json_object_set_new(stats, "writes", json_integer(s.n_write)); json_object_set_new(stats, "writes", json_integer(s.n_write));
json_object_set_new(stats, "errors", json_integer(s.n_error)); json_object_set_new(stats, "errors", json_integer(s.n_error));
@ -885,7 +885,7 @@ private:
json_t* mxs_worker_to_json(const char* host, int id) json_t* mxs_worker_to_json(const char* host, int id)
{ {
Worker* target = Worker::get(id); RoutingWorker* target = RoutingWorker::get(id);
WorkerInfoTask task(host, id + 1); WorkerInfoTask task(host, id + 1);
Semaphore sem; Semaphore sem;
@ -898,18 +898,18 @@ json_t* mxs_worker_to_json(const char* host, int id)
json_t* mxs_worker_list_to_json(const char* host) json_t* mxs_worker_list_to_json(const char* host)
{ {
WorkerInfoTask task(host, config_threadcount()); WorkerInfoTask task(host, config_threadcount());
Worker::execute_concurrently(task); RoutingWorker::execute_concurrently(task);
return task.resource(); return task.resource();
} }
void Worker::register_zombie(DCB* pDcb) void RoutingWorker::register_zombie(DCB* pDcb)
{ {
ss_dassert(pDcb->poll.thread.id == m_id); ss_dassert(pDcb->poll.thread.id == m_id);
m_zombies.push_back(pDcb); m_zombies.push_back(pDcb);
} }
void Worker::delete_zombies() void RoutingWorker::delete_zombies()
{ {
// An algorithm cannot be used, as the final closing of a DCB may cause // An algorithm cannot be used, as the final closing of a DCB may cause
// other DCBs to be registered in the zombie queue. // other DCBs to be registered in the zombie queue.
@ -922,7 +922,7 @@ void Worker::delete_zombies()
} }
} }
void Worker::run() void RoutingWorker::run()
{ {
this_thread.current_worker_id = m_id; this_thread.current_worker_id = m_id;
@ -941,11 +941,11 @@ void Worker::run()
this_thread.current_worker_id = WORKER_ABSENT_ID; this_thread.current_worker_id = WORKER_ABSENT_ID;
} }
bool Worker::start(size_t stack_size) bool RoutingWorker::start(size_t stack_size)
{ {
m_started = true; m_started = true;
if (!thread_start(&m_thread, &Worker::thread_main, this, stack_size)) if (!thread_start(&m_thread, &RoutingWorker::thread_main, this, stack_size))
{ {
m_started = false; m_started = false;
} }
@ -953,7 +953,7 @@ bool Worker::start(size_t stack_size)
return m_started; return m_started;
} }
void Worker::join() void RoutingWorker::join()
{ {
if (m_started) if (m_started)
{ {
@ -964,7 +964,7 @@ void Worker::join()
} }
} }
void Worker::shutdown() void RoutingWorker::shutdown()
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
@ -977,14 +977,14 @@ void Worker::shutdown()
} }
} }
void Worker::shutdown_all() void RoutingWorker::shutdown_all()
{ {
// NOTE: No logging here, this function must be signal safe. // NOTE: No logging here, this function must be signal safe.
ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL)); ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL));
for (int i = 0; i < this_unit.n_workers; ++i) for (int i = 0; i < this_unit.n_workers; ++i)
{ {
Worker* pWorker = this_unit.ppWorkers[i]; RoutingWorker* pWorker = this_unit.ppWorkers[i];
ss_dassert(pWorker); ss_dassert(pWorker);
pWorker->shutdown(); pWorker->shutdown();
@ -1004,15 +1004,15 @@ void Worker::shutdown_all()
* @return A worker instance if successful, otherwise NULL. * @return A worker instance if successful, otherwise NULL.
*/ */
//static //static
Worker* Worker::create(int worker_id, int epoll_listener_fd) RoutingWorker* RoutingWorker::create(int worker_id, int epoll_listener_fd)
{ {
Worker* pThis = NULL; RoutingWorker* pThis = NULL;
int epoll_fd = epoll_create(MAX_EVENTS); int epoll_fd = epoll_create(MAX_EVENTS);
if (epoll_fd != -1) if (epoll_fd != -1)
{ {
pThis = new (std::nothrow) Worker(worker_id, epoll_fd); pThis = new (std::nothrow) RoutingWorker(worker_id, epoll_fd);
if (pThis) if (pThis)
{ {
@ -1084,7 +1084,7 @@ Worker* Worker::create(int worker_id, int epoll_listener_fd)
* @param arg1 Message specific first argument. * @param arg1 Message specific first argument.
* @param arg2 Message specific second argument. * @param arg2 Message specific second argument.
*/ */
void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg) void RoutingWorker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg)
{ {
switch (msg.id()) switch (msg.id())
{ {
@ -1146,16 +1146,16 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
* @param arg A worker. * @param arg A worker.
*/ */
//static //static
void Worker::thread_main(void* pArg) void RoutingWorker::thread_main(void* pArg)
{ {
Worker* pWorker = static_cast<Worker*>(pArg); RoutingWorker* pWorker = static_cast<RoutingWorker*>(pArg);
pWorker->run(); pWorker->run();
} }
/** /**
* The main polling loop * The main polling loop
*/ */
void Worker::poll_waitevents() void RoutingWorker::poll_waitevents()
{ {
struct epoll_event events[MAX_EVENTS]; struct epoll_event events[MAX_EVENTS];
@ -1299,9 +1299,9 @@ void Worker::poll_waitevents()
* @return What actions were performed. * @return What actions were performed.
*/ */
//static //static
uint32_t Worker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, uint32_t events) uint32_t RoutingWorker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, uint32_t events)
{ {
Worker* pWorker = static_cast<Worker*>(pData); RoutingWorker* pWorker = static_cast<RoutingWorker*>(pData);
ss_dassert(pWorker->m_id == wid); ss_dassert(pWorker->m_id == wid);
return pWorker->handle_epoll_events(events); return pWorker->handle_epoll_events(events);
@ -1314,7 +1314,7 @@ uint32_t Worker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, ui
* *
* @return What actions were performed. * @return What actions were performed.
*/ */
uint32_t Worker::handle_epoll_events(uint32_t events) uint32_t RoutingWorker::handle_epoll_events(uint32_t events)
{ {
struct epoll_event epoll_events[1]; struct epoll_event epoll_events[1];

View File

@ -44,7 +44,7 @@
#include "internal/monitor.h" #include "internal/monitor.h"
#include "internal/poll.h" #include "internal/poll.h"
#include "internal/workertask.hh" #include "internal/workertask.hh"
#include "internal/worker.hh" #include "internal/routingworker.hh"
using maxscale::Semaphore; using maxscale::Semaphore;
using maxscale::Worker; using maxscale::Worker;

View File

@ -41,9 +41,9 @@
#include <maxscale/protocol/mysql.h> #include <maxscale/protocol/mysql.h>
#include "internal/dcb.h" #include "internal/dcb.h"
#include "internal/session.h"
#include "internal/filter.h" #include "internal/filter.h"
#include "internal/worker.hh" #include "internal/routingworker.hh"
#include "internal/session.h"
#include "internal/workertask.hh" #include "internal/workertask.hh"
using std::string; using std::string;

View File

@ -38,7 +38,7 @@
#include <maxscale/listener.h> #include <maxscale/listener.h>
#include "../internal/messagequeue.hh" #include "../internal/messagequeue.hh"
#include "../internal/worker.hh" #include "../internal/routingworker.hh"
#include "../dcb.cc" #include "../dcb.cc"
/** /**

View File

@ -27,8 +27,8 @@
#include <sys/stat.h> #include <sys/stat.h>
#include "../internal/poll.h" #include "../internal/poll.h"
#include "../internal/routingworker.hh"
#include "../internal/statistics.h" #include "../internal/statistics.h"
#include "../internal/worker.hh"
void init_test_env(char *path) void init_test_env(char *path)

View File

@ -15,7 +15,7 @@
#include <maxscale/utils.h> #include <maxscale/utils.h>
// TODO: Find a way to cleanly expose this // TODO: Find a way to cleanly expose this
#include "../../../core/internal/worker.hh" #include "../../../core/internal/routingworker.hh"
#ifdef EPOLLRDHUP #ifdef EPOLLRDHUP
#define ERROR_EVENTS (EPOLLRDHUP | EPOLLHUP | EPOLLERR) #define ERROR_EVENTS (EPOLLRDHUP | EPOLLHUP | EPOLLERR)