diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index 9ff490016..5b3ece44b 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(maxscale-common SHARED resultset.cc resource.cc router.cc + routingworker.cc secrets.cc semaphore.cc server.cc @@ -51,7 +52,6 @@ add_library(maxscale-common SHARED thread.cc users.cc utils.cc - worker.cc workertask.cc ) diff --git a/server/core/dcb.cc b/server/core/dcb.cc index d34b6f9f6..fa5702452 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -57,8 +57,8 @@ #include #include "internal/modules.h" +#include "internal/routingworker.hh" #include "internal/session.h" -#include "internal/worker.hh" #include "internal/workertask.hh" using maxscale::Worker; diff --git a/server/core/gateway.cc b/server/core/gateway.cc index 9f37fdb51..9f98695aa 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -53,16 +53,16 @@ #include #include +#include "internal/admin.hh" #include "internal/config.h" #include "internal/maxscale.h" #include "internal/messagequeue.hh" #include "internal/modules.h" #include "internal/monitor.h" #include "internal/poll.h" +#include "internal/routingworker.hh" #include "internal/service.h" #include "internal/statistics.h" -#include "internal/admin.hh" -#include "internal/worker.hh" using namespace maxscale; diff --git a/server/core/internal/messagequeue.hh b/server/core/internal/messagequeue.hh index 8278f670b..8a928ac47 100644 --- a/server/core/internal/messagequeue.hh +++ b/server/core/internal/messagequeue.hh @@ -19,7 +19,7 @@ namespace maxscale { class MessageQueue; -class Worker; +class RoutingWorker; /** * An instance of @c MessageQueueMessage can be sent over a @c MessageQueue from @@ -180,7 +180,7 @@ public: * @attention If the message queue is currently added to a worker, it * will first be removed from that worker. */ - bool add_to_worker(Worker* pWorker); + bool add_to_worker(RoutingWorker* pWorker); /** * Removes the message queue from the worker it is currently added to. @@ -188,7 +188,7 @@ public: * @return The worker the message queue was associated with, or NULL * if it was not associated with any. */ - Worker* remove_from_worker(); + RoutingWorker* remove_from_worker(); private: MessageQueue(Handler* pHandler, int read_fd, int write_fd); @@ -198,10 +198,10 @@ private: static uint32_t poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events); private: - Handler& m_handler; - int m_read_fd; - int m_write_fd; - Worker* m_pWorker; + Handler& m_handler; + int m_read_fd; + int m_write_fd; + RoutingWorker* m_pWorker; }; } diff --git a/server/core/internal/worker.hh b/server/core/internal/routingworker.hh similarity index 98% rename from server/core/internal/worker.hh rename to server/core/internal/routingworker.hh index 7ccf86078..fb4b9290c 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/routingworker.hh @@ -422,13 +422,15 @@ private: Average1 m_load_1_second; /*< The load during the last 1-second period. */ }; +class RoutingWorker; +typedef RoutingWorker Worker; -class Worker : public MXS_WORKER +class RoutingWorker : public MXS_WORKER , private MessageQueue::Handler , private MXS_POLL_DATA { - Worker(const Worker&); - Worker& operator = (const Worker&); + RoutingWorker(const RoutingWorker&); + RoutingWorker& operator = (const RoutingWorker&); public: typedef WORKER_STATISTICS STATISTICS; @@ -833,14 +835,14 @@ public: * @return The corresponding worker instance, or NULL if the id does * not correspond to a worker. */ - static Worker* get(int worker_id); + static RoutingWorker* get(int worker_id); /** * Return the worker associated with the current thread. * * @return The worker instance, or NULL if the current thread does not have a worker. */ - static Worker* get_current(); + static RoutingWorker* get_current(); /** * Return the worker id associated with the current thread. @@ -865,11 +867,11 @@ public: static void set_maxwait(unsigned int maxwait); private: - Worker(int id, - int epoll_fd); - virtual ~Worker(); + RoutingWorker(int id, + int epoll_fd); + virtual ~RoutingWorker(); - static Worker* create(int id, int epoll_listener_fd); + static RoutingWorker* create(int id, int epoll_listener_fd); void delete_zombies(); diff --git a/server/core/internal/workertask.hh b/server/core/internal/workertask.hh index 7510ee8d8..e5d0020c7 100644 --- a/server/core/internal/workertask.hh +++ b/server/core/internal/workertask.hh @@ -17,7 +17,7 @@ namespace maxscale { -class Worker; +class RoutingWorker; /** * A WorkerTask represents a task to be performed by a Worker. @@ -38,7 +38,7 @@ public: * @attention As the function is called by a worker, the body of `execute` * should execute quickly and not perform any blocking operations. */ - virtual void execute(Worker& worker) = 0; + virtual void execute(RoutingWorker& worker) = 0; }; /** @@ -69,10 +69,10 @@ protected: * @attention As the function is called by a worker, the body of `execute` * should execute quickly and not perform any blocking operations. */ - virtual void execute(Worker& worker) = 0; + virtual void execute(RoutingWorker& worker) = 0; private: - friend class Worker; + friend class RoutingWorker; void inc_ref(); void dec_ref(); diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc index 8d52c798e..85279c0f5 100644 --- a/server/core/messagequeue.cc +++ b/server/core/messagequeue.cc @@ -18,7 +18,7 @@ #include #include #include -#include "internal/worker.hh" +#include "internal/routingworker.hh" namespace { @@ -165,7 +165,7 @@ bool MessageQueue::post(const Message& message) const return rv; } -bool MessageQueue::add_to_worker(Worker* pWorker) +bool MessageQueue::add_to_worker(RoutingWorker* pWorker) { if (m_pWorker) { @@ -181,9 +181,9 @@ bool MessageQueue::add_to_worker(Worker* pWorker) return m_pWorker != NULL; } -Worker* MessageQueue::remove_from_worker() +RoutingWorker* MessageQueue::remove_from_worker() { - Worker* pWorker = m_pWorker; + RoutingWorker* pWorker = m_pWorker; if (m_pWorker) { diff --git a/server/core/poll.cc b/server/core/poll.cc index 1472923dc..e9fb70721 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -36,7 +36,7 @@ #include #include #include "internal/poll.h" -#include "internal/worker.hh" +#include "internal/routingworker.hh" using maxscale::Worker; diff --git a/server/core/resource.cc b/server/core/resource.cc index ebd52b64d..8c0b3c004 100644 --- a/server/core/resource.cc +++ b/server/core/resource.cc @@ -35,7 +35,7 @@ #include "internal/config_runtime.h" #include "internal/modules.h" #include "internal/worker.h" -#include "internal/worker.hh" +#include "internal/routingworker.hh" using std::list; using std::map; diff --git a/server/core/worker.cc b/server/core/routingworker.cc similarity index 83% rename from server/core/worker.cc rename to server/core/routingworker.cc index acccec3d4..0902d6f57 100644 --- a/server/core/worker.cc +++ b/server/core/routingworker.cc @@ -11,7 +11,7 @@ * Public License. */ -#include "internal/worker.hh" +#include "internal/routingworker.hh" #include #include @@ -40,7 +40,7 @@ #define WORKER_ABSENT_ID -1 -using maxscale::Worker; +using maxscale::RoutingWorker; using maxscale::WorkerLoad; using maxscale::Closer; using maxscale::Semaphore; @@ -58,14 +58,14 @@ const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; */ struct this_unit { - bool initialized; // Whether the initialization has been performed. - int n_workers; // How many workers there are. - Worker** ppWorkers; // Array of worker instances. + bool initialized; // Whether the initialization has been performed. + int n_workers; // How many workers there are. + RoutingWorker** ppWorkers; // Array of worker instances. // DEPRECATED in 2.3, remove in 2.4. - int number_poll_spins; // Maximum non-block polls + int number_poll_spins; // Maximum non-block polls // DEPRECATED in 2.3, remove in 2.4. - int max_poll_sleep; // Maximum block time - int epoll_listener_fd; // Shared epoll descriptor for listening descriptors. + int max_poll_sleep; // Maximum block time + int epoll_listener_fd; // Shared epoll descriptor for listening descriptors. } this_unit = { false, @@ -198,7 +198,7 @@ uint64_t WorkerLoad::get_time() return t.tv_sec * 1000 + (t.tv_nsec / 1000000); } -Worker::Worker(int id, +RoutingWorker::RoutingWorker(int id, int epoll_fd) : m_id(id) , m_state(STOPPED) @@ -211,11 +211,11 @@ Worker::Worker(int id, , m_nCurrent_descriptors(0) , m_nTotal_descriptors(0) { - MXS_POLL_DATA::handler = &Worker::epoll_instance_handler; + MXS_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler; MXS_POLL_DATA::thread.id = id; } -Worker::~Worker() +RoutingWorker::~RoutingWorker() { ss_dassert(!m_started); @@ -224,7 +224,7 @@ Worker::~Worker() } // static -bool Worker::init() +bool RoutingWorker::init() { ss_dassert(!this_unit.initialized); @@ -236,13 +236,13 @@ bool Worker::init() if (this_unit.epoll_listener_fd != -1) { int n_workers = config_threadcount(); - Worker** ppWorkers = new (std::nothrow) Worker* [n_workers] (); // Zero initialized array + RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [n_workers] (); // Zero initialized array if (ppWorkers) { for (int i = 0; i < n_workers; ++i) { - Worker* pWorker = Worker::create(i, this_unit.epoll_listener_fd); + RoutingWorker* pWorker = RoutingWorker::create(i, this_unit.epoll_listener_fd); if (pWorker) { @@ -292,13 +292,13 @@ bool Worker::init() return this_unit.initialized; } -void Worker::finish() +void RoutingWorker::finish() { ss_dassert(this_unit.initialized); for (int i = this_unit.n_workers - 1; i >= 0; --i) { - Worker* pWorker = this_unit.ppWorkers[i]; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; delete pWorker; this_unit.ppWorkers[i] = NULL; @@ -316,16 +316,16 @@ void Worker::finish() namespace { -int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type) +int64_t one_stats_get(int64_t RoutingWorker::STATISTICS::*what, enum ts_stats_type type) { int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0); for (int i = 0; i < this_unit.n_workers; ++i) { - Worker* pWorker = Worker::get(i); + RoutingWorker* pWorker = RoutingWorker::get(i); ss_dassert(pWorker); - const Worker::STATISTICS& s = pWorker->statistics(); + const RoutingWorker::STATISTICS& s = pWorker->statistics(); int64_t value = s.*what; @@ -358,7 +358,7 @@ int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type } //static -Worker::STATISTICS Worker::get_statistics() +RoutingWorker::STATISTICS RoutingWorker::get_statistics() { STATISTICS cs; @@ -376,22 +376,22 @@ Worker::STATISTICS Worker::get_statistics() cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX); cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX); - for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++) + for (int i = 0; i < RoutingWorker::STATISTICS::MAXNFDS - 1; i++) { for (int j = 0; j < this_unit.n_workers; ++j) { - Worker* pWorker = Worker::get(j); + RoutingWorker* pWorker = RoutingWorker::get(j); ss_dassert(pWorker); cs.n_fds[i] += pWorker->statistics().n_fds[i]; } } - for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i) + for (int i = 0; i <= RoutingWorker::STATISTICS::N_QUEUE_TIMES; ++i) { for (int j = 0; j < this_unit.n_workers; ++j) { - Worker* pWorker = Worker::get(j); + RoutingWorker* pWorker = RoutingWorker::get(j); ss_dassert(pWorker); cs.qtimes[i] += pWorker->statistics().qtimes[i]; @@ -406,57 +406,57 @@ Worker::STATISTICS Worker::get_statistics() } //static -int64_t Worker::get_one_statistic(POLL_STAT what) +int64_t RoutingWorker::get_one_statistic(POLL_STAT what) { int64_t rv = 0; - int64_t Worker::STATISTICS::*member = NULL; + int64_t RoutingWorker::STATISTICS::*member = NULL; enum ts_stats_type approach; switch (what) { case POLL_STAT_READ: - member = &Worker::STATISTICS::n_read; + member = &RoutingWorker::STATISTICS::n_read; approach = TS_STATS_SUM; break; case POLL_STAT_WRITE: - member = &Worker::STATISTICS::n_write; + member = &RoutingWorker::STATISTICS::n_write; approach = TS_STATS_SUM; break; case POLL_STAT_ERROR: - member = &Worker::STATISTICS::n_error; + member = &RoutingWorker::STATISTICS::n_error; approach = TS_STATS_SUM; break; case POLL_STAT_HANGUP: - member = &Worker::STATISTICS::n_hup; + member = &RoutingWorker::STATISTICS::n_hup; approach = TS_STATS_SUM; break; case POLL_STAT_ACCEPT: - member = &Worker::STATISTICS::n_accept; + member = &RoutingWorker::STATISTICS::n_accept; approach = TS_STATS_SUM; break; case POLL_STAT_EVQ_LEN: - member = &Worker::STATISTICS::evq_length; + member = &RoutingWorker::STATISTICS::evq_length; approach = TS_STATS_AVG; break; case POLL_STAT_EVQ_MAX: - member = &Worker::STATISTICS::evq_max; + member = &RoutingWorker::STATISTICS::evq_max; approach = TS_STATS_MAX; break; case POLL_STAT_MAX_QTIME: - member = &Worker::STATISTICS::maxqtime; + member = &RoutingWorker::STATISTICS::maxqtime; approach = TS_STATS_MAX; break; case POLL_STAT_MAX_EXECTIME: - member = &Worker::STATISTICS::maxexectime; + member = &RoutingWorker::STATISTICS::maxexectime; approach = TS_STATS_MAX; break; @@ -472,13 +472,13 @@ int64_t Worker::get_one_statistic(POLL_STAT what) return rv; } -void Worker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal) +void RoutingWorker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal) { *pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors); *pnTotal = atomic_load_uint64(&m_nTotal_descriptors); } -bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) +bool RoutingWorker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) { bool rv = true; @@ -507,7 +507,7 @@ bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) } //static -bool Worker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) +bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) { bool rv = true; @@ -535,7 +535,7 @@ bool Worker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) return rv; } -bool Worker::remove_fd(int fd) +bool RoutingWorker::remove_fd(int fd) { bool rv = true; @@ -555,7 +555,7 @@ bool Worker::remove_fd(int fd) } //static -bool Worker::remove_shared_fd(int fd) +bool RoutingWorker::remove_shared_fd(int fd) { bool rv = true; @@ -572,15 +572,15 @@ bool Worker::remove_shared_fd(int fd) int mxs_worker_id(MXS_WORKER* pWorker) { - return static_cast(pWorker)->id(); + return static_cast(pWorker)->id(); } bool mxs_worker_should_shutdown(MXS_WORKER* pWorker) { - return static_cast(pWorker)->should_shutdown(); + return static_cast(pWorker)->should_shutdown(); } -Worker* Worker::get(int worker_id) +RoutingWorker* RoutingWorker::get(int worker_id) { ss_dassert(worker_id < this_unit.n_workers); @@ -589,51 +589,51 @@ Worker* Worker::get(int worker_id) MXS_WORKER* mxs_worker_get(int worker_id) { - return Worker::get(worker_id); + return RoutingWorker::get(worker_id); } int mxs_worker_get_current_id() { - return Worker::get_current_id(); + return RoutingWorker::get_current_id(); } -Worker* Worker::get_current() +RoutingWorker* RoutingWorker::get_current() { - Worker* pWorker = NULL; + RoutingWorker* pWorker = NULL; int worker_id = get_current_id(); if (worker_id != WORKER_ABSENT_ID) { - pWorker = Worker::get(worker_id); + pWorker = RoutingWorker::get(worker_id); } return pWorker; } -int Worker::get_current_id() +int RoutingWorker::get_current_id() { return this_thread.current_worker_id; } //static -void Worker::set_nonblocking_polls(unsigned int nbpolls) +void RoutingWorker::set_nonblocking_polls(unsigned int nbpolls) { this_unit.number_poll_spins = nbpolls; } //static -void Worker::set_maxwait(unsigned int maxwait) +void RoutingWorker::set_maxwait(unsigned int maxwait) { this_unit.max_poll_sleep = maxwait; } -bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) +bool RoutingWorker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) { // No logging here, function must be signal safe. bool rval = true; - if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this) + if (mode == RoutingWorker::EXECUTE_AUTO && RoutingWorker::get_current() == this) { pTask->execute(*this); @@ -653,20 +653,20 @@ bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) return rval; } -bool Worker::post(std::auto_ptr sTask, enum execute_mode_t mode) +bool RoutingWorker::post(std::auto_ptr sTask, enum execute_mode_t mode) { // No logging here, function must be signal safe. return post_disposable(sTask.release(), mode); } // private -bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) +bool RoutingWorker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) { bool posted = true; pTask->inc_ref(); - if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this) + if (mode == RoutingWorker::EXECUTE_AUTO && RoutingWorker::get_current() == this) { pTask->execute(*this); pTask->dec_ref(); @@ -687,14 +687,14 @@ bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) } //static -size_t Worker::broadcast(Task* pTask, Semaphore* pSem) +size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem) { // No logging here, function must be signal safe. size_t n = 0; for (int i = 0; i < this_unit.n_workers; ++i) { - Worker* pWorker = this_unit.ppWorkers[i]; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; if (pWorker->post(pTask, pSem)) { @@ -706,7 +706,7 @@ size_t Worker::broadcast(Task* pTask, Semaphore* pSem) } //static -size_t Worker::broadcast(std::auto_ptr sTask) +size_t RoutingWorker::broadcast(std::auto_ptr sTask) { DisposableTask* pTask = sTask.release(); pTask->inc_ref(); @@ -715,7 +715,7 @@ size_t Worker::broadcast(std::auto_ptr sTask) for (int i = 0; i < this_unit.n_workers; ++i) { - Worker* pWorker = this_unit.ppWorkers[i]; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; if (pWorker->post_disposable(pTask)) { @@ -729,14 +729,14 @@ size_t Worker::broadcast(std::auto_ptr sTask) } //static -size_t Worker::execute_serially(Task& task) +size_t RoutingWorker::execute_serially(Task& task) { Semaphore sem; size_t n = 0; for (int i = 0; i < this_unit.n_workers; ++i) { - Worker* pWorker = this_unit.ppWorkers[i]; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; if (pWorker->post(&task, &sem)) { @@ -749,13 +749,13 @@ size_t Worker::execute_serially(Task& task) } //static -size_t Worker::execute_concurrently(Task& task) +size_t RoutingWorker::execute_concurrently(Task& task) { Semaphore sem; - return sem.wait_n(Worker::broadcast(&task, &sem)); + return sem.wait_n(RoutingWorker::broadcast(&task, &sem)); } -bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +bool RoutingWorker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. MessageQueue::Message message(msg_id, arg1, arg2); @@ -765,10 +765,10 @@ bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) { - return static_cast(pWorker)->post_message(msg_id, arg1, arg2); + return static_cast(pWorker)->post_message(msg_id, arg1, arg2); } -size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +size_t RoutingWorker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. @@ -776,7 +776,7 @@ size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) for (int i = 0; i < this_unit.n_workers; ++i) { - Worker* pWorker = this_unit.ppWorkers[i]; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; if (pWorker->post_message(msg_id, arg1, arg2)) { @@ -789,31 +789,31 @@ size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { - return Worker::broadcast_message(msg_id, arg1, arg2); + return RoutingWorker::broadcast_message(msg_id, arg1, arg2); } bool mxs_worker_register_session(MXS_SESSION* session) { - Worker* worker = Worker::get_current(); - ss_dassert(worker); - return worker->session_registry().add(session); + RoutingWorker* pWorker = RoutingWorker::get_current(); + ss_dassert(pWorker); + return pWorker->session_registry().add(session); } bool mxs_worker_deregister_session(uint64_t id) { - Worker* worker = Worker::get_current(); - ss_dassert(worker); - return worker->session_registry().remove(id); + RoutingWorker* pWorker = RoutingWorker::get_current(); + ss_dassert(pWorker); + return pWorker->session_registry().remove(id); } MXS_SESSION* mxs_worker_find_session(uint64_t id) { - Worker* worker = Worker::get_current(); - ss_dassert(worker); - return worker->session_registry().lookup(id); + RoutingWorker* pWorker = RoutingWorker::get_current(); + ss_dassert(pWorker); + return pWorker->session_registry().lookup(id); } -Worker::SessionsById& Worker::session_registry() +RoutingWorker::SessionsById& RoutingWorker::session_registry() { return m_sessions; } @@ -827,10 +827,10 @@ public: m_data.resize(nthreads); } - void execute(Worker& worker) + void execute(RoutingWorker& worker) { json_t* stats = json_object(); - const Worker::STATISTICS& s = worker.get_local_statistics(); + const RoutingWorker::STATISTICS& s = worker.get_local_statistics(); json_object_set_new(stats, "reads", json_integer(s.n_read)); json_object_set_new(stats, "writes", json_integer(s.n_write)); json_object_set_new(stats, "errors", json_integer(s.n_error)); @@ -885,7 +885,7 @@ private: json_t* mxs_worker_to_json(const char* host, int id) { - Worker* target = Worker::get(id); + RoutingWorker* target = RoutingWorker::get(id); WorkerInfoTask task(host, id + 1); Semaphore sem; @@ -898,18 +898,18 @@ json_t* mxs_worker_to_json(const char* host, int id) json_t* mxs_worker_list_to_json(const char* host) { WorkerInfoTask task(host, config_threadcount()); - Worker::execute_concurrently(task); + RoutingWorker::execute_concurrently(task); return task.resource(); } -void Worker::register_zombie(DCB* pDcb) +void RoutingWorker::register_zombie(DCB* pDcb) { ss_dassert(pDcb->poll.thread.id == m_id); m_zombies.push_back(pDcb); } -void Worker::delete_zombies() +void RoutingWorker::delete_zombies() { // An algorithm cannot be used, as the final closing of a DCB may cause // other DCBs to be registered in the zombie queue. @@ -922,7 +922,7 @@ void Worker::delete_zombies() } } -void Worker::run() +void RoutingWorker::run() { this_thread.current_worker_id = m_id; @@ -941,11 +941,11 @@ void Worker::run() this_thread.current_worker_id = WORKER_ABSENT_ID; } -bool Worker::start(size_t stack_size) +bool RoutingWorker::start(size_t stack_size) { m_started = true; - if (!thread_start(&m_thread, &Worker::thread_main, this, stack_size)) + if (!thread_start(&m_thread, &RoutingWorker::thread_main, this, stack_size)) { m_started = false; } @@ -953,7 +953,7 @@ bool Worker::start(size_t stack_size) return m_started; } -void Worker::join() +void RoutingWorker::join() { if (m_started) { @@ -964,7 +964,7 @@ void Worker::join() } } -void Worker::shutdown() +void RoutingWorker::shutdown() { // NOTE: No logging here, this function must be signal safe. @@ -977,14 +977,14 @@ void Worker::shutdown() } } -void Worker::shutdown_all() +void RoutingWorker::shutdown_all() { // NOTE: No logging here, this function must be signal safe. ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL)); for (int i = 0; i < this_unit.n_workers; ++i) { - Worker* pWorker = this_unit.ppWorkers[i]; + RoutingWorker* pWorker = this_unit.ppWorkers[i]; ss_dassert(pWorker); pWorker->shutdown(); @@ -1004,15 +1004,15 @@ void Worker::shutdown_all() * @return A worker instance if successful, otherwise NULL. */ //static -Worker* Worker::create(int worker_id, int epoll_listener_fd) +RoutingWorker* RoutingWorker::create(int worker_id, int epoll_listener_fd) { - Worker* pThis = NULL; + RoutingWorker* pThis = NULL; int epoll_fd = epoll_create(MAX_EVENTS); if (epoll_fd != -1) { - pThis = new (std::nothrow) Worker(worker_id, epoll_fd); + pThis = new (std::nothrow) RoutingWorker(worker_id, epoll_fd); if (pThis) { @@ -1084,7 +1084,7 @@ Worker* Worker::create(int worker_id, int epoll_listener_fd) * @param arg1 Message specific first argument. * @param arg2 Message specific second argument. */ -void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg) +void RoutingWorker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg) { switch (msg.id()) { @@ -1146,16 +1146,16 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms * @param arg A worker. */ //static -void Worker::thread_main(void* pArg) +void RoutingWorker::thread_main(void* pArg) { - Worker* pWorker = static_cast(pArg); + RoutingWorker* pWorker = static_cast(pArg); pWorker->run(); } /** * The main polling loop */ -void Worker::poll_waitevents() +void RoutingWorker::poll_waitevents() { struct epoll_event events[MAX_EVENTS]; @@ -1299,9 +1299,9 @@ void Worker::poll_waitevents() * @return What actions were performed. */ //static -uint32_t Worker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, uint32_t events) +uint32_t RoutingWorker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, uint32_t events) { - Worker* pWorker = static_cast(pData); + RoutingWorker* pWorker = static_cast(pData); ss_dassert(pWorker->m_id == wid); return pWorker->handle_epoll_events(events); @@ -1314,7 +1314,7 @@ uint32_t Worker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, ui * * @return What actions were performed. */ -uint32_t Worker::handle_epoll_events(uint32_t events) +uint32_t RoutingWorker::handle_epoll_events(uint32_t events) { struct epoll_event epoll_events[1]; diff --git a/server/core/server.cc b/server/core/server.cc index fc3eba990..fe752f8f6 100644 --- a/server/core/server.cc +++ b/server/core/server.cc @@ -44,7 +44,7 @@ #include "internal/monitor.h" #include "internal/poll.h" #include "internal/workertask.hh" -#include "internal/worker.hh" +#include "internal/routingworker.hh" using maxscale::Semaphore; using maxscale::Worker; diff --git a/server/core/session.cc b/server/core/session.cc index 9757a84c1..7a888e0d8 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -41,9 +41,9 @@ #include #include "internal/dcb.h" -#include "internal/session.h" #include "internal/filter.h" -#include "internal/worker.hh" +#include "internal/routingworker.hh" +#include "internal/session.h" #include "internal/workertask.hh" using std::string; diff --git a/server/core/test/test_dcb.cc b/server/core/test/test_dcb.cc index b555693d2..9dc52554b 100644 --- a/server/core/test/test_dcb.cc +++ b/server/core/test/test_dcb.cc @@ -38,7 +38,7 @@ #include #include "../internal/messagequeue.hh" -#include "../internal/worker.hh" +#include "../internal/routingworker.hh" #include "../dcb.cc" /** diff --git a/server/core/test/test_utils.h b/server/core/test/test_utils.h index 72c4cc8cc..861dd2c34 100644 --- a/server/core/test/test_utils.h +++ b/server/core/test/test_utils.h @@ -27,8 +27,8 @@ #include #include "../internal/poll.h" +#include "../internal/routingworker.hh" #include "../internal/statistics.h" -#include "../internal/worker.hh" void init_test_env(char *path) diff --git a/server/modules/protocol/MySQL/mariadb_client.cc b/server/modules/protocol/MySQL/mariadb_client.cc index 009c731a9..fbb029bd1 100644 --- a/server/modules/protocol/MySQL/mariadb_client.cc +++ b/server/modules/protocol/MySQL/mariadb_client.cc @@ -15,7 +15,7 @@ #include // TODO: Find a way to cleanly expose this -#include "../../../core/internal/worker.hh" +#include "../../../core/internal/routingworker.hh" #ifdef EPOLLRDHUP #define ERROR_EVENTS (EPOLLRDHUP | EPOLLHUP | EPOLLERR)