From b36f6faa7e13259d24f74b60a84a654c8a0bd689 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Fri, 13 Apr 2018 09:51:27 +0300 Subject: [PATCH] MXS-1754 Reintroduce maxscale::Worker Worker is now the base class of all workers. It has a message queue and can be run in a thread of its own, or in the calling thread. Worker can not be used as such, but a concrete worker class must be derived from it. Currently there is only one concrete class RoutingWorker. There is some overlapping in functionality between Worker and RoutingWorker, as there is e.g. a need for broadcasting a message to all routing workers, but not to other workers. Currently other workers can not be created as the array for holding the pointers to the workers is exactly as large as there will be RoutingWorkers. That will be changed so that the maximum number of threads is hardwired to some ridiculous value such as 128. That's the first step in the path towards a situation where the number of worker threads can be changed at runtime. --- include/maxscale/routingworker.h | 97 ++ include/maxscale/worker.h | 37 +- server/core/CMakeLists.txt | 1 + server/core/dcb.cc | 9 +- server/core/gateway.cc | 47 +- server/core/internal/messagequeue.hh | 14 +- server/core/internal/routingworker.hh | 809 +---------- server/core/internal/worker.hh | 862 ++++++++++++ server/core/internal/workertask.hh | 8 +- server/core/messagequeue.cc | 6 +- server/core/poll.cc | 19 +- server/core/resource.cc | 3 +- server/core/routingworker.cc | 1205 +++-------------- server/core/service.cc | 4 +- server/core/worker.cc | 1071 +++++++++++++++ .../authenticator/MySQLAuth/mysql_auth.c | 4 +- .../MySQL/mariadbclient/mysql_client.cc | 6 +- .../modules/routing/binlogrouter/blr_master.c | 10 +- server/modules/routing/debugcli/debugcmd.c | 18 +- 19 files changed, 2338 insertions(+), 1892 deletions(-) create mode 100644 include/maxscale/routingworker.h create mode 100644 server/core/internal/worker.hh create mode 100644 server/core/worker.cc diff --git a/include/maxscale/routingworker.h b/include/maxscale/routingworker.h new file mode 100644 index 000000000..bfceb90df --- /dev/null +++ b/include/maxscale/routingworker.h @@ -0,0 +1,97 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include + +MXS_BEGIN_DECLS + +#define MXS_RWORKER_MAIN -1 + +/** + * Return the routing worker associated with the provided worker id. + * + * @param worker_id A worker id. If MXS_RWORKER_MAIN is used, the + * routing worker running in the main thread will + * be returned. + * + * @return The corresponding routing worker instance, or NULL if the + * id does not correspond to a routing worker. + */ +MXS_WORKER* mxs_rworker_get(int worker_id); + +/** + * Return the current routing worker. + * + * @return A routing worker, or NULL if there is no current routing worker. + */ +MXS_WORKER* mxs_rworker_get_current(); + +/** + * Return the id of the current routing worker. + * + * @return The id of the routing worker, or -1 if there is no current + * routing worker. + */ +int mxs_rworker_get_current_id(); + +/** + * Broadcast a message to all routing workers. + * + * @param msg_id The message id. + * @param arg1 Message specific first argument. + * @param arg2 Message specific second argument. + * + * @return The number of messages posted; if less that ne number of workers + * then some postings failed. + * + * @attention The return value tells *only* whether message could be posted, + * *not* that it has reached the worker. + * + * @attentsion Exactly the same arguments are passed to all workers. Take that + * into account if the passed data must be freed. + * + * @attention This function is signal safe. + */ +size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + +/** + * Add a session to the current routing worker's session container. Currently + * only required for some special commands e.g. "KILL " to work. + * + * @param session Session to add. + * @return true if successful, false if id already existed in map. + */ +bool mxs_rworker_register_session(MXS_SESSION* session); + +/** + * Remove a session from the current routing worker's session container. Does + * not actually remove anything from an epoll-set or affect the session in any + * way. + * + * @param id Which id to remove. + * @return The removed session or NULL if not found. + */ +bool mxs_rworker_deregister_session(uint64_t id); + +/** + * Find a session in the current routing worker's session container. + * + * @param id Which id to find. + * @return The found session or NULL if not found. + */ +MXS_SESSION* mxs_rworker_find_session(uint64_t id); + +MXS_END_DECLS diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index b3ccec58b..7dd88bcbc 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -70,12 +70,17 @@ MXS_WORKER* mxs_worker_get(int worker_id); */ int mxs_worker_id(MXS_WORKER* pWorker); +/** + * Return the current worker. + * + * @return A worker, or NULL if there is no current worker. + */ +MXS_WORKER* mxs_worker_get_current(); + /** * Return the id of the worker. * - * @return The id of the worker. - * - * @attention If there is no current worker, then -1 will be returned. + * @return The id of the worker, or -1 if there is no current worker. */ int mxs_worker_get_current_id(); @@ -117,32 +122,6 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, */ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); -/** - * Add a session to the current worker's session container. Currently only - * required for some special commands e.g. "KILL " to work. - * - * @param session Session to add. - * @return true if successful, false if id already existed in map. - */ -bool mxs_worker_register_session(MXS_SESSION* session); - -/** - * Remove a session from the current worker's session container. Does not actually - * remove anything from an epoll-set or affect the session in any way. - * - * @param id Which id to remove. - * @return The removed session or NULL if not found. - */ -bool mxs_worker_deregister_session(uint64_t id); - -/** - * Find a session in the current worker's session container. - * - * @param id Which id to find. - * @return The found session or NULL if not found. - */ -MXS_SESSION* mxs_worker_find_session(uint64_t id); - /** * @brief Convert a worker to JSON format * diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index 5b3ece44b..8b468935d 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -52,6 +52,7 @@ add_library(maxscale-common SHARED thread.cc users.cc utils.cc + worker.cc workertask.cc ) diff --git a/server/core/dcb.cc b/server/core/dcb.cc index fa5702452..9f3dbd234 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -61,6 +61,7 @@ #include "internal/session.h" #include "internal/workertask.hh" +using maxscale::RoutingWorker; using maxscale::Worker; using maxscale::WorkerTask; using maxscale::Semaphore; @@ -208,8 +209,8 @@ dcb_alloc(dcb_role_t role, SERV_LISTENER *listener) else { /** Otherwise the DCB is owned by the thread that allocates it */ - ss_dassert(Worker::get_current_id() != -1); - newdcb->poll.thread.id = Worker::get_current_id(); + ss_dassert(RoutingWorker::get_current_id() != -1); + newdcb->poll.thread.id = RoutingWorker::get_current_id(); } return newdcb; @@ -1148,7 +1149,7 @@ void dcb_close(DCB *dcb) } else { - Worker* worker = Worker::get(dcb->poll.thread.id); + RoutingWorker* worker = RoutingWorker::get(dcb->poll.thread.id); ss_dassert(worker); worker->register_zombie(dcb); @@ -1179,7 +1180,7 @@ void dcb_close_in_owning_thread(DCB* dcb) // TODO: reference counted, so that we could addref before posting, thus // TODO: preventing too early a deletion. - MXS_WORKER* worker = mxs_worker_get(dcb->poll.thread.id); // The owning worker + MXS_WORKER* worker = mxs_rworker_get(dcb->poll.thread.id); // The owning worker ss_dassert(worker); intptr_t arg1 = (intptr_t)cb_dcb_close_in_owning_thread; diff --git a/server/core/gateway.cc b/server/core/gateway.cc index 9f98695aa..b6f661efc 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -1399,8 +1399,6 @@ bool set_dirs(const char *basedir) int main(int argc, char **argv) { int rc = MAXSCALE_SHUTDOWN; - int n_threads; /*< number of epoll listener threads */ - size_t thread_stack_size; int n_services; int eno = 0; /*< local variable for errno */ int opt; @@ -2129,6 +2127,13 @@ int main(int argc, char **argv) goto return_main; } + if (!RoutingWorker::init()) + { + MXS_ERROR("Failed to initialize routing workers."); + rc = MAXSCALE_INTERNALERROR; + goto return_main; + } + /* Init MaxScale modules */ if (!modules_process_init()) { @@ -2183,27 +2188,14 @@ int main(int argc, char **argv) } /*< - * Start the polling threads, note this is one less than is - * configured as the main thread will also poll. + * Start the routing workers running in their own thread. */ - n_threads = config_threadcount(); - - /*< - * Start workers. We start from 1, worker 0 will be running in the main thread. - */ - thread_stack_size = config_thread_stack_size(); - for (int i = 1; i < n_threads; i++) + if (!RoutingWorker::start_threaded_workers()) { - worker = Worker::get(i); - ss_dassert(worker); - - if (!worker->start(thread_stack_size)) - { - const char* logerr = "Failed to start worker thread."; - print_log_n_stderr(true, true, logerr, logerr, 0); - rc = MAXSCALE_INTERNALERROR; - goto return_main; - } + const char* logerr = "Failed to start routing workers."; + print_log_n_stderr(true, true, logerr, logerr, 0); + rc = MAXSCALE_INTERNALERROR; + goto return_main; } if (cnf->admin_enabled) @@ -2232,7 +2224,7 @@ int main(int argc, char **argv) } MXS_NOTICE("MaxScale started with %d worker threads, each with a stack size of %lu bytes.", - n_threads, thread_stack_size); + config_threadcount(), config_thread_stack_size()); /** * Successful start, notify the parent process that it can exit. @@ -2246,7 +2238,7 @@ int main(int argc, char **argv) /*< * Run worker 0 in the main thread. */ - worker = Worker::get(0); + worker = RoutingWorker::get(RoutingWorker::MAIN); ss_dassert(worker); worker->run(); @@ -2267,13 +2259,7 @@ int main(int argc, char **argv) /*< * Wait for worker threads to exit. */ - for (int i = 1; i < n_threads; i++) - { - worker = Worker::get(i); - ss_dassert(worker); - - worker->join(); - } + RoutingWorker::join_threaded_workers(); MXS_NOTICE("All workers have shut down."); @@ -2282,6 +2268,7 @@ int main(int argc, char **argv) */ service_destroy_instances(); + RoutingWorker::finish(); Worker::finish(); MessageQueue::finish(); diff --git a/server/core/internal/messagequeue.hh b/server/core/internal/messagequeue.hh index 8a928ac47..8278f670b 100644 --- a/server/core/internal/messagequeue.hh +++ b/server/core/internal/messagequeue.hh @@ -19,7 +19,7 @@ namespace maxscale { class MessageQueue; -class RoutingWorker; +class Worker; /** * An instance of @c MessageQueueMessage can be sent over a @c MessageQueue from @@ -180,7 +180,7 @@ public: * @attention If the message queue is currently added to a worker, it * will first be removed from that worker. */ - bool add_to_worker(RoutingWorker* pWorker); + bool add_to_worker(Worker* pWorker); /** * Removes the message queue from the worker it is currently added to. @@ -188,7 +188,7 @@ public: * @return The worker the message queue was associated with, or NULL * if it was not associated with any. */ - RoutingWorker* remove_from_worker(); + Worker* remove_from_worker(); private: MessageQueue(Handler* pHandler, int read_fd, int write_fd); @@ -198,10 +198,10 @@ private: static uint32_t poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events); private: - Handler& m_handler; - int m_read_fd; - int m_write_fd; - RoutingWorker* m_pWorker; + Handler& m_handler; + int m_read_fd; + int m_write_fd; + Worker* m_pWorker; }; } diff --git a/server/core/internal/routingworker.hh b/server/core/internal/routingworker.hh index fb4b9290c..6a34b62c6 100644 --- a/server/core/internal/routingworker.hh +++ b/server/core/internal/routingworker.hh @@ -13,450 +13,30 @@ */ #include -#include -#include -#include -#include -#include -#include "messagequeue.hh" -#include "poll.h" -#include "worker.h" -#include "workertask.hh" +#include +#include "worker.hh" #include "session.hh" namespace maxscale { -class Semaphore; - -struct WORKER_STATISTICS +class RoutingWorker : public Worker + , private MXS_POLL_DATA { - WORKER_STATISTICS() - { - memset(this, 0, sizeof(WORKER_STATISTICS)); - } - - enum - { - MAXNFDS = 10, - N_QUEUE_TIMES = 30 - }; - - int64_t n_read; /*< Number of read events */ - int64_t n_write; /*< Number of write events */ - int64_t n_error; /*< Number of error events */ - int64_t n_hup; /*< Number of hangup events */ - int64_t n_accept; /*< Number of accept events */ - int64_t n_polls; /*< Number of poll cycles */ - int64_t n_pollev; /*< Number of polls returning events */ - int64_t n_nbpollev; /*< Number of polls returning events */ - int64_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */ - int64_t evq_length; /*< Event queue length */ - int64_t evq_max; /*< Maximum event queue length */ - int64_t blockingpolls; /*< Number of epoll_waits with a timeout specified */ - uint32_t qtimes[N_QUEUE_TIMES + 1]; - uint32_t exectimes[N_QUEUE_TIMES + 1]; - int64_t maxqtime; - int64_t maxexectime; -}; - -/** - * WorkerLoad is a class that calculates the load percentage of a worker - * thread, based upon the relative amount of time the worker spends in - * epoll_wait(). - * - * If during a time period of length T milliseconds, the worker thread - * spends t milliseconds in epoll_wait(), then the load of the worker is - * calculated as 100 * ((T - t) / T). That is, if the worker spends all - * the time in epoll_wait(), then the load is 0 and if the worker spends - * no time waiting in epoll_wait(), then the load is 100. - */ -class WorkerLoad -{ - WorkerLoad(const WorkerLoad&); - WorkerLoad& operator = (const WorkerLoad&); + RoutingWorker(const RoutingWorker&) = delete; + RoutingWorker& operator = (const RoutingWorker&) = delete; public: - enum counter_t - { - ONE_SECOND = 1000, - ONE_MINUTE = 60 * ONE_SECOND, - ONE_HOUR = 60 * ONE_MINUTE, - }; - enum { - GRANULARITY = ONE_SECOND + MAIN = -1 }; - /** - * Constructor - */ - WorkerLoad(); - - /** - * Reset the load calculation. Should be called immediately before the - * worker enters its eternal epoll_wait()-loop. - */ - void reset() - { - uint64_t now = get_time(); - - m_start_time = now; - m_wait_start = 0; - m_wait_time = 0; - } - - /** - * To be used for signaling that the worker is about to call epoll_wait(). - * - * @param now The current time. - */ - void about_to_wait(uint64_t now) - { - m_wait_start = now; - } - - void about_to_wait() - { - about_to_wait(get_time()); - } - - /** - * To be used for signaling that the worker has returned from epoll_wait(). - * - * @param now The current time. - */ - void about_to_work(uint64_t now); - - void about_to_work() - { - about_to_work(get_time()); - } - - /** - * Returns the last calculated load, - * - * @return A value between 0 and 100. - */ - uint8_t percentage(counter_t counter) const - { - switch (counter) - { - case ONE_SECOND: - return m_load_1_second.value(); - - case ONE_MINUTE: - return m_load_1_minute.value(); - - case ONE_HOUR: - return m_load_1_hour.value(); - - default: - ss_dassert(!true); - return 0; - }; - } - - /** - * When was the last 1 second period started. - * - * @return The start time. - */ - uint64_t start_time() const - { - return m_start_time; - } - - /** - * Returns the current time using CLOCK_MONOTONIC. - * - * @return Current time in milliseconds. - */ - static uint64_t get_time(); - -private: - /** - * Average is a base class for classes intended to be used for calculating - * averages. An Average may have a dependant Average whose value depends - * upon the value of the first. At certain moments, an Average may trigger - * its dependant Average to update itself. - */ - class Average - { - Average(const Average&); - Average& operator = (const Average&); - - public: - /** - * Constructor - * - * @param pDependant An optional dependant average. - */ - Average(Average* pDependant = NULL) - : m_pDependant(pDependant) - , m_value(0) - {} - - virtual ~Average(); - - /** - * Add a value to the Average. The exact meaning depends upon the - * concrete Average class. - * - * If the addition of the value in some sense represents a full cycle - * in the average calculation, then the instance will call add_value() - * on its dependant, otherwise it will call update_value(). In both cases - * with its own value as argument. - * - * @param value The value to be added. - * - * @return True if the addition of the value caused a full cycle - * in the average calculation, false otherwise. - */ - virtual bool add_value(uint8_t value) = 0; - - /** - * Update the value of the Average. The exact meaning depends upon the - * concrete Average class. Will also call update_value() of its dependant - * with its own value as argument. - * - * @param value The value to be updated. - */ - virtual void update_value(uint8_t value) = 0; - - /** - * Return the average value. - * - * @return The value represented by the Average. - */ - uint8_t value() const - { - return atomic_load_uint32(&m_value); - } - - protected: - Average* m_pDependant; /*< The optional dependant Average. */ - uint32_t m_value; /*< The current average value. */ - - protected: - void set_value(uint32_t value) - { - atomic_store_uint32(&m_value, value); - } - }; - - /** - * An Average consisting of a single value. - */ - class Average1 : public Average - { - public: - Average1(Average* pDependant = NULL) - : Average(pDependant) - { - } - - bool add_value(uint8_t value) - { - set_value(value); - - // Every addition of a value represents a full cycle. - if (m_pDependant) - { - m_pDependant->add_value(value); - } - - return true; - } - - void update_value(uint8_t value) - { - set_value(value); - - if (m_pDependant) - { - m_pDependant->update_value(value); - } - } - }; - - /** - * An Average calculated from N values. - */ - template - class AverageN : public Average - { - public: - AverageN(Average* pDependant = NULL) - : Average(pDependant) - , m_end(m_begin + N) - , m_i(m_begin) - , m_sum(0) - , m_nValues(0) - { - } - - bool add_value(uint8_t value) - { - if (m_nValues == N) - { - // If as many values that fit has been added, then remove the - // least recent value from the sum. - m_sum -= *m_i; - } - else - { - // Otherwise make a note that a new value is added. - ++m_nValues; - } - - *m_i = value; - m_sum += *m_i; // Update the sum of all values. - - m_i = next(m_i); - - uint32_t average = m_sum / m_nValues; - - set_value(average); - - if (m_pDependant) - { - if (m_i == m_begin) - { - // If we have looped around we have performed a full cycle and will - // add a new value to the dependant average. - m_pDependant->add_value(average); - } - else - { - // Otherwise we just update the most recent value. - m_pDependant->update_value(average); - } - } - - return m_i == m_begin; - } - - void update_value(uint8_t value) - { - if (m_nValues == 0) - { - // If no values have been added yet, there's nothing to update but we - // need to add the value. - add_value(value); - } - else - { - // Otherwise we update the most recent value. - uint8_t* p = prev(m_i); - - m_sum -= *p; - *p = value; - m_sum += *p; - - uint32_t average = m_sum / m_nValues; - - set_value(average); - - if (m_pDependant) - { - m_pDependant->update_value(average); - } - } - } - - private: - uint8_t* prev(uint8_t* p) - { - ss_dassert(p >= m_begin); - ss_dassert(p < m_end); - - if (p > m_begin) - { - --p; - } - else - { - ss_dassert(p == m_begin); - p = m_end - 1; - } - - ss_dassert(p >= m_begin); - ss_dassert(p < m_end); - - return p; - } - - uint8_t* next(uint8_t* p) - { - ss_dassert(p >= m_begin); - ss_dassert(p < m_end); - - ++p; - - if (p == m_end) - { - p = m_begin; - } - - ss_dassert(p >= m_begin); - ss_dassert(p < m_end); - - return p; - } - - private: - uint8_t m_begin[N]; /*< Buffer containing values from which the average is calculated. */ - uint8_t* m_end; /*< Points to one past the end of the buffer. */ - uint8_t* m_i; /*< Current position in the buffer. */ - uint32_t m_sum; /*< Sum of all values in the buffer. */ - uint32_t m_nValues; /*< How many values the buffer contains. */ - }; - - uint64_t m_start_time; /*< When was the current 1-second period started. */ - uint64_t m_wait_start; /*< The time when the worker entered epoll_wait(). */ - uint64_t m_wait_time; /*< How much time the worker has spent in epoll_wait(). */ - AverageN<60> m_load_1_hour; /*< The average load during the last hour. */ - AverageN<60> m_load_1_minute; /*< The average load during the last minute. */ - Average1 m_load_1_second; /*< The load during the last 1-second period. */ -}; - -class RoutingWorker; -typedef RoutingWorker Worker; - -class RoutingWorker : public MXS_WORKER - , private MessageQueue::Handler - , private MXS_POLL_DATA -{ - RoutingWorker(const RoutingWorker&); - RoutingWorker& operator = (const RoutingWorker&); - -public: - typedef WORKER_STATISTICS STATISTICS; - typedef WorkerTask Task; - typedef WorkerDisposableTask DisposableTask; typedef Registry SessionsById; typedef std::vector Zombies; - typedef WorkerLoad Load; - - enum state_t - { - STOPPED, - IDLE, - POLLING, - PROCESSING, - ZPROCESSING - }; - - enum execute_mode_t - { - EXECUTE_AUTO, /**< Execute tasks immediately */ - EXECUTE_QUEUED /**< Only queue tasks for execution */ - }; /** - * Initialize the worker mechanism. + * Initialize the routing worker mechanism. * * To be called once at process startup. This will cause as many workers * to be created as the number of threads defined. @@ -474,102 +54,6 @@ public: */ static void finish(); - /** - * Returns the id of the worker - * - * @return The id of the worker. - */ - int id() const - { - return m_id; - } - - int load(Load::counter_t counter) - { - return m_load.percentage(counter); - } - - /** - * Returns the state of the worker. - * - * @return The current state. - * - * @attentions The state might have changed the moment after the function returns. - */ - state_t state() const - { - return m_state; - } - - /** - * Returns statistics for this worker. - * - * @return The worker specific statistics. - * - * @attentions The statistics may change at any time. - */ - const STATISTICS& statistics() const - { - return m_statistics; - } - - /** - * Returns statistics for all workers. - * - * @return Combined statistics. - * - * @attentions The statistics may no longer be accurate by the time it has - * been returned. The returned values may also not represent a - * 100% consistent set. - */ - static STATISTICS get_statistics(); - - /** - * Return a specific combined statistic value. - * - * @param what What to return. - * - * @return The corresponding value. - */ - static int64_t get_one_statistic(POLL_STAT what); - - /** - * Return this worker's statistics. - * - * @return Local statistics for this worker. - */ - const STATISTICS& get_local_statistics() const - { - return m_statistics; - } - - /** - * Return the count of descriptors. - * - * @param pnCurrent On output the current number of descriptors. - * @param pnTotal On output the total number of descriptors. - */ - void get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal); - - /** - * Add a file descriptor to the epoll instance of the worker. - * - * @param fd The file descriptor to be added. - * @param events Mask of epoll event types. - * @param pData The poll data associated with the descriptor: - * - * data->handler : Handler that knows how to deal with events - * for this particular type of 'struct mxs_poll_data'. - * data->thread.id: Will be updated by the worker. - * - * @attention The provided file descriptor must be non-blocking. - * @attention @c pData must remain valid until the file descriptor is - * removed from the worker. - * - * @return True, if the descriptor could be added, false otherwise. - */ - bool add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData); - /** * Add a file descriptor to the epoll instance shared between all workers. * Events occuring on the provided file descriptor will be handled by all @@ -589,15 +73,6 @@ public: */ static bool add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData); - /** - * Remove a file descriptor from the worker's epoll instance. - * - * @param fd The file descriptor to be removed. - * - * @return True on success, false on failure. - */ - bool remove_fd(int fd); - /** * Remove a file descriptor from the epoll instance shared between all workers. * @@ -616,180 +91,6 @@ public: */ void register_zombie(DCB* pZombie); - /** - * Main function of worker. - * - * The worker will run the poll loop, until it is told to shut down. - * - * @attention This function will run in the calling thread. - */ - void run(); - - /** - * Run worker in separate thread. - * - * This function will start a new thread, in which the `run` - * function will be executed. - * - * @param stack_size The stack size of the new thread. A value of 0 means - * that the pthread default should be used. - * - * @return True if the thread could be started, false otherwise. - */ - bool start(size_t stack_size = 0); - - /** - * Waits for the worker to finish. - */ - void join(); - - /** - * Initate shutdown of worker. - * - * @attention A call to this function will only initiate the shutdowm, - * the worker will not have shut down when the function returns. - * - * @attention This function is signal safe. - */ - void shutdown(); - - /** - * Query whether worker should shutdown. - * - * @return True, if the worker should shut down, false otherwise. - */ - bool should_shutdown() const - { - return m_should_shutdown; - } - - /** - * Posts a task to a worker for execution. - * - * @param pTask The task to be executed. - * @param pSem If non-NULL, will be posted once the task's `execute` return. - * @param mode Execution mode - * - * @return True if the task could be posted (i.e. not executed), false otherwise. - * - * @attention The instance must remain valid for as long as it takes for the - * task to be transferred to the worker and its `execute` function - * to be called. - * - * The semaphore can be used for waiting for the task to be finished. - * - * @code - * Semaphore sem; - * MyTask task; - * - * pWorker->execute(&task, &sem); - * sem.wait(); - * - * MyResult& result = task.result(); - * @endcode - */ - bool post(Task* pTask, Semaphore* pSem = NULL, enum execute_mode_t mode = EXECUTE_AUTO); - - /** - * Posts a task to a worker for execution. - * - * @param pTask The task to be executed. - * @param mode Execution mode - * - * @return True if the task could be posted (i.e. not executed), false otherwise. - * - * @attention Once the task has been executed, it will be deleted. - */ - bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO); - - template - bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO) - { - return post(std::auto_ptr(sTask.release()), mode); - } - - /** - * Posts a task to all workers for execution. - * - * @param pTask The task to be executed. - * @param pSem If non-NULL, will be posted once per worker when the task's - * `execute` return. - * - * @return How many workers the task was posted to. - * - * @attention The very same task will be posted to all workers. The task - * should either not have any sharable data or then it should - * have data specific to each worker that can be accessed - * without locks. - */ - static size_t broadcast(Task* pTask, Semaphore* pSem = NULL); - - /** - * Posts a task to all workers for execution. - * - * @param pTask The task to be executed. - * - * @return How many workers the task was posted to. - * - * @attention The very same task will be posted to all workers. The task - * should either not have any sharable data or then it should - * have data specific to each worker that can be accessed - * without locks. - * - * @attention Once the task has been executed by all workers, it will - * be deleted. - */ - static size_t broadcast(std::auto_ptr sTask); - - template - static size_t broadcast(std::auto_ptr sTask) - { - return broadcast(std::auto_ptr(sTask.release())); - } - - /** - * Executes a task on all workers in serial mode (the task is executed - * on at most one worker thread at a time). When the function returns - * the task has been executed on all workers. - * - * @param task The task to be executed. - * - * @return How many workers the task was posted to. - * - * @warning This function is extremely inefficient and will be slow compared - * to the other functions. Only use this function when printing thread-specific - * data to stdout. - */ - static size_t execute_serially(Task& task); - - /** - * Executes a task on all workers concurrently and waits until all workers - * are done. That is, when the function returns the task has been executed - * by all workers. - * - * @param task The task to be executed. - * - * @return How many workers the task was posted to. - */ - static size_t execute_concurrently(Task& task); - - /** - * Post a message to a worker. - * - * @param msg_id The message id. - * @param arg1 Message specific first argument. - * @param arg2 Message specific second argument. - * - * @return True if the message could be sent, false otherwise. If the message - * posting fails, errno is set appropriately. - * - * @attention The return value tells *only* whether the message could be sent, - * *not* that it has reached the worker. - * - * @attention This function is signal safe. - */ - bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); - /** * Return a reference to the session registry of this worker. * @@ -797,40 +98,11 @@ public: */ SessionsById& session_registry(); - /** - * Broadcast a message to all worker. - * - * @param msg_id The message id. - * @param arg1 Message specific first argument. - * @param arg2 Message specific second argument. - * - * @return The number of messages posted; if less that ne number of workers - * then some postings failed. - * - * @attention The return value tells *only* whether message could be posted, - * *not* that it has reached the worker. - * - * @attentsion Exactly the same arguments are passed to all workers. Take that - * into account if the passed data must be freed. - * - * @attention This function is signal safe. - */ - static size_t broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); - - /** - * Initate shutdown of all workers. - * - * @attention A call to this function will only initiate the shutdowm, - * the workers will not have shut down when the function returns. - * - * @attention This function is signal safe. - */ - static void shutdown_all(); - /** * Return the worker associated with the provided worker id. * - * @param worker_id A worker id. + * @param worker_id A worker id. By specifying MAIN, the routing worker + * running in the main thread will be returned. * * @return The corresponding worker instance, or NULL if the id does * not correspond to a worker. @@ -852,59 +124,50 @@ public: static int get_current_id(); /** - * Set the number of non-blocking poll cycles that will be done before - * a blocking poll will take place. + * Starts all routing workers but the main worker (the one running in + * the main thread). * - * @param nbpolls Number of non-blocking polls to perform before blocking. + * @return True, if all secondary workers could be started. + */ + static bool start_threaded_workers(); + + /** + * Waits for all threaded workers. + */ + static void join_threaded_workers(); + + /** + * Deprecated */ static void set_nonblocking_polls(unsigned int nbpolls); /** - * Maximum time to block in epoll_wait. - * - * @param maxwait Maximum wait time in millliseconds. + * Deprecated */ static void set_maxwait(unsigned int maxwait); private: - RoutingWorker(int id, - int epoll_fd); + RoutingWorker(); virtual ~RoutingWorker(); - static RoutingWorker* create(int id, int epoll_listener_fd); + static RoutingWorker* create(int epoll_listener_fd); + + bool pre_run(); // override + void post_run(); // override + void epoll_tick(); // override void delete_zombies(); - bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); - - void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override - - static void thread_main(void* arg); - - void poll_waitevents(); - static uint32_t epoll_instance_handler(struct mxs_poll_data* data, int wid, uint32_t events); uint32_t handle_epoll_events(uint32_t events); private: - int m_id; /*< The id of the worker. */ - state_t m_state; /*< The state of the worker */ - int m_epoll_fd; /*< The epoll file descriptor. */ - STATISTICS m_statistics; /*< Worker statistics. */ - MessageQueue* m_pQueue; /*< The message queue of the worker. */ - THREAD m_thread; /*< The thread handle of the worker. */ - bool m_started; /*< Whether the thread has been started or not. */ - bool m_should_shutdown; /*< Whether shutdown should be performed. */ - bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ - SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map - * should contain sessions exclusive to this - * worker and not e.g. listener sessions. For now, - * it's up to the protocol to decide whether a new - * session is added to the map. */ - Zombies m_zombies; /*< DCBs to be deleted. */ - uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ - uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ - Load m_load; + SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map + * should contain sessions exclusive to this + * worker and not e.g. listener sessions. For now, + * it's up to the protocol to decide whether a new + * session is added to the map. */ + Zombies m_zombies; /*< DCBs to be deleted. */ }; } diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh new file mode 100644 index 000000000..ec273bc8e --- /dev/null +++ b/server/core/internal/worker.hh @@ -0,0 +1,862 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include +#include +#include +#include "messagequeue.hh" +#include "poll.h" +#include "worker.h" +#include "workertask.hh" +#include "session.hh" + +namespace maxscale +{ + +class Semaphore; + +struct WORKER_STATISTICS +{ + WORKER_STATISTICS() + { + memset(this, 0, sizeof(WORKER_STATISTICS)); + } + + enum + { + MAXNFDS = 10, + N_QUEUE_TIMES = 30 + }; + + int64_t n_read; /*< Number of read events */ + int64_t n_write; /*< Number of write events */ + int64_t n_error; /*< Number of error events */ + int64_t n_hup; /*< Number of hangup events */ + int64_t n_accept; /*< Number of accept events */ + int64_t n_polls; /*< Number of poll cycles */ + int64_t n_pollev; /*< Number of polls returning events */ + int64_t n_nbpollev; /*< Number of polls returning events */ + int64_t n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */ + int64_t evq_length; /*< Event queue length */ + int64_t evq_max; /*< Maximum event queue length */ + int64_t blockingpolls; /*< Number of epoll_waits with a timeout specified */ + uint32_t qtimes[N_QUEUE_TIMES + 1]; + uint32_t exectimes[N_QUEUE_TIMES + 1]; + int64_t maxqtime; + int64_t maxexectime; +}; + +/** + * WorkerLoad is a class that calculates the load percentage of a worker + * thread, based upon the relative amount of time the worker spends in + * epoll_wait(). + * + * If during a time period of length T milliseconds, the worker thread + * spends t milliseconds in epoll_wait(), then the load of the worker is + * calculated as 100 * ((T - t) / T). That is, if the worker spends all + * the time in epoll_wait(), then the load is 0 and if the worker spends + * no time waiting in epoll_wait(), then the load is 100. + */ +class WorkerLoad +{ + WorkerLoad(const WorkerLoad&) = delete; + WorkerLoad& operator = (const WorkerLoad&) = delete; + +public: + enum counter_t + { + ONE_SECOND = 1000, + ONE_MINUTE = 60 * ONE_SECOND, + ONE_HOUR = 60 * ONE_MINUTE, + }; + + enum + { + GRANULARITY = ONE_SECOND + }; + + /** + * Constructor + */ + WorkerLoad(); + + /** + * Reset the load calculation. Should be called immediately before the + * worker enters its eternal epoll_wait()-loop. + */ + void reset() + { + uint64_t now = get_time(); + + m_start_time = now; + m_wait_start = 0; + m_wait_time = 0; + } + + /** + * To be used for signaling that the worker is about to call epoll_wait(). + * + * @param now The current time. + */ + void about_to_wait(uint64_t now) + { + m_wait_start = now; + } + + void about_to_wait() + { + about_to_wait(get_time()); + } + + /** + * To be used for signaling that the worker has returned from epoll_wait(). + * + * @param now The current time. + */ + void about_to_work(uint64_t now); + + void about_to_work() + { + about_to_work(get_time()); + } + + /** + * Returns the last calculated load, + * + * @return A value between 0 and 100. + */ + uint8_t percentage(counter_t counter) const + { + switch (counter) + { + case ONE_SECOND: + return m_load_1_second.value(); + + case ONE_MINUTE: + return m_load_1_minute.value(); + + case ONE_HOUR: + return m_load_1_hour.value(); + + default: + ss_dassert(!true); + return 0; + }; + } + + /** + * When was the last 1 second period started. + * + * @return The start time. + */ + uint64_t start_time() const + { + return m_start_time; + } + + /** + * Returns the current time using CLOCK_MONOTONIC. + * + * @return Current time in milliseconds. + */ + static uint64_t get_time(); + +private: + /** + * Average is a base class for classes intended to be used for calculating + * averages. An Average may have a dependant Average whose value depends + * upon the value of the first. At certain moments, an Average may trigger + * its dependant Average to update itself. + */ + class Average + { + Average(const Average&) = delete; + Average& operator = (const Average&) = delete; + + public: + /** + * Constructor + * + * @param pDependant An optional dependant average. + */ + Average(Average* pDependant = NULL) + : m_pDependant(pDependant) + , m_value(0) + {} + + virtual ~Average(); + + /** + * Add a value to the Average. The exact meaning depends upon the + * concrete Average class. + * + * If the addition of the value in some sense represents a full cycle + * in the average calculation, then the instance will call add_value() + * on its dependant, otherwise it will call update_value(). In both cases + * with its own value as argument. + * + * @param value The value to be added. + * + * @return True if the addition of the value caused a full cycle + * in the average calculation, false otherwise. + */ + virtual bool add_value(uint8_t value) = 0; + + /** + * Update the value of the Average. The exact meaning depends upon the + * concrete Average class. Will also call update_value() of its dependant + * with its own value as argument. + * + * @param value The value to be updated. + */ + virtual void update_value(uint8_t value) = 0; + + /** + * Return the average value. + * + * @return The value represented by the Average. + */ + uint8_t value() const + { + return atomic_load_uint32(&m_value); + } + + protected: + Average* m_pDependant; /*< The optional dependant Average. */ + uint32_t m_value; /*< The current average value. */ + + protected: + void set_value(uint32_t value) + { + atomic_store_uint32(&m_value, value); + } + }; + + /** + * An Average consisting of a single value. + */ + class Average1 : public Average + { + public: + Average1(Average* pDependant = NULL) + : Average(pDependant) + { + } + + bool add_value(uint8_t value) + { + set_value(value); + + // Every addition of a value represents a full cycle. + if (m_pDependant) + { + m_pDependant->add_value(value); + } + + return true; + } + + void update_value(uint8_t value) + { + set_value(value); + + if (m_pDependant) + { + m_pDependant->update_value(value); + } + } + }; + + /** + * An Average calculated from N values. + */ + template + class AverageN : public Average + { + public: + AverageN(Average* pDependant = NULL) + : Average(pDependant) + , m_end(m_begin + N) + , m_i(m_begin) + , m_sum(0) + , m_nValues(0) + { + } + + bool add_value(uint8_t value) + { + if (m_nValues == N) + { + // If as many values that fit has been added, then remove the + // least recent value from the sum. + m_sum -= *m_i; + } + else + { + // Otherwise make a note that a new value is added. + ++m_nValues; + } + + *m_i = value; + m_sum += *m_i; // Update the sum of all values. + + m_i = next(m_i); + + uint32_t average = m_sum / m_nValues; + + set_value(average); + + if (m_pDependant) + { + if (m_i == m_begin) + { + // If we have looped around we have performed a full cycle and will + // add a new value to the dependant average. + m_pDependant->add_value(average); + } + else + { + // Otherwise we just update the most recent value. + m_pDependant->update_value(average); + } + } + + return m_i == m_begin; + } + + void update_value(uint8_t value) + { + if (m_nValues == 0) + { + // If no values have been added yet, there's nothing to update but we + // need to add the value. + add_value(value); + } + else + { + // Otherwise we update the most recent value. + uint8_t* p = prev(m_i); + + m_sum -= *p; + *p = value; + m_sum += *p; + + uint32_t average = m_sum / m_nValues; + + set_value(average); + + if (m_pDependant) + { + m_pDependant->update_value(average); + } + } + } + + private: + uint8_t* prev(uint8_t* p) + { + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + if (p > m_begin) + { + --p; + } + else + { + ss_dassert(p == m_begin); + p = m_end - 1; + } + + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + return p; + } + + uint8_t* next(uint8_t* p) + { + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + ++p; + + if (p == m_end) + { + p = m_begin; + } + + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + return p; + } + + private: + uint8_t m_begin[N]; /*< Buffer containing values from which the average is calculated. */ + uint8_t* m_end; /*< Points to one past the end of the buffer. */ + uint8_t* m_i; /*< Current position in the buffer. */ + uint32_t m_sum; /*< Sum of all values in the buffer. */ + uint32_t m_nValues; /*< How many values the buffer contains. */ + }; + + uint64_t m_start_time; /*< When was the current 1-second period started. */ + uint64_t m_wait_start; /*< The time when the worker entered epoll_wait(). */ + uint64_t m_wait_time; /*< How much time the worker has spent in epoll_wait(). */ + AverageN<60> m_load_1_hour; /*< The average load during the last hour. */ + AverageN<60> m_load_1_minute; /*< The average load during the last minute. */ + Average1 m_load_1_second; /*< The load during the last 1-second period. */ +}; + + +class Worker : public MXS_WORKER + , private MessageQueue::Handler +{ + Worker(const Worker&) = delete; + Worker& operator = (const Worker&) = delete; + +public: + typedef WORKER_STATISTICS STATISTICS; + typedef WorkerTask Task; + typedef WorkerDisposableTask DisposableTask; + typedef WorkerLoad Load; + + enum state_t + { + STOPPED, + IDLE, + POLLING, + PROCESSING, + ZPROCESSING + }; + + enum execute_mode_t + { + EXECUTE_AUTO, /**< Execute tasks immediately */ + EXECUTE_QUEUED /**< Only queue tasks for execution */ + }; + + /** + * Initialize the worker mechanism. + * + * To be called once at process startup. This will cause as many workers + * to be created as the number of threads defined. + * + * @return True if the initialization succeeded, false otherwise. + */ + static bool init(); + + /** + * Finalize the worker mechanism. + * + * To be called once at process shutdown. This will cause all workers + * to be destroyed. When the function is called, no worker should be + * running anymore. + */ + static void finish(); + + /** + * Returns the id of the worker + * + * @return The id of the worker. + */ + int id() const + { + return m_id; + } + + int load(Load::counter_t counter) + { + return m_load.percentage(counter); + } + + /** + * Returns the state of the worker. + * + * @return The current state. + * + * @attentions The state might have changed the moment after the function returns. + */ + state_t state() const + { + return m_state; + } + + /** + * Returns statistics for this worker. + * + * @return The worker specific statistics. + * + * @attentions The statistics may change at any time. + */ + const STATISTICS& statistics() const + { + return m_statistics; + } + + /** + * Returns statistics for all workers. + * + * @return Combined statistics. + * + * @attentions The statistics may no longer be accurate by the time it has + * been returned. The returned values may also not represent a + * 100% consistent set. + */ + static STATISTICS get_statistics(); + + /** + * Return a specific combined statistic value. + * + * @param what What to return. + * + * @return The corresponding value. + */ + static int64_t get_one_statistic(POLL_STAT what); + + /** + * Return this worker's statistics. + * + * @return Local statistics for this worker. + */ + const STATISTICS& get_local_statistics() const + { + return m_statistics; + } + + /** + * Return the count of descriptors. + * + * @param pnCurrent On output the current number of descriptors. + * @param pnTotal On output the total number of descriptors. + */ + void get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal); + + /** + * Add a file descriptor to the epoll instance of the worker. + * + * @param fd The file descriptor to be added. + * @param events Mask of epoll event types. + * @param pData The poll data associated with the descriptor: + * + * data->handler : Handler that knows how to deal with events + * for this particular type of 'struct mxs_poll_data'. + * data->thread.id: Will be updated by the worker. + * + * @attention The provided file descriptor must be non-blocking. + * @attention @c pData must remain valid until the file descriptor is + * removed from the worker. + * + * @return True, if the descriptor could be added, false otherwise. + */ + bool add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData); + + /** + * Remove a file descriptor from the worker's epoll instance. + * + * @param fd The file descriptor to be removed. + * + * @return True on success, false on failure. + */ + bool remove_fd(int fd); + + /** + * Main function of worker. + * + * The worker will run the poll loop, until it is told to shut down. + * + * @attention This function will run in the calling thread. + */ + void run(); + + /** + * Run worker in separate thread. + * + * This function will start a new thread, in which the `run` + * function will be executed. + * + * @param stack_size The stack size of the new thread. A value of 0 means + * that the pthread default should be used. + * + * @return True if the thread could be started, false otherwise. + */ + bool start(size_t stack_size = 0); + + /** + * Waits for the worker to finish. + */ + void join(); + + /** + * Initate shutdown of worker. + * + * @attention A call to this function will only initiate the shutdowm, + * the worker will not have shut down when the function returns. + * + * @attention This function is signal safe. + */ + void shutdown(); + + /** + * Query whether worker should shutdown. + * + * @return True, if the worker should shut down, false otherwise. + */ + bool should_shutdown() const + { + return m_should_shutdown; + } + + /** + * Posts a task to a worker for execution. + * + * @param pTask The task to be executed. + * @param pSem If non-NULL, will be posted once the task's `execute` return. + * @param mode Execution mode + * + * @return True if the task could be posted (i.e. not executed), false otherwise. + * + * @attention The instance must remain valid for as long as it takes for the + * task to be transferred to the worker and its `execute` function + * to be called. + * + * The semaphore can be used for waiting for the task to be finished. + * + * @code + * Semaphore sem; + * MyTask task; + * + * pWorker->execute(&task, &sem); + * sem.wait(); + * + * MyResult& result = task.result(); + * @endcode + */ + bool post(Task* pTask, Semaphore* pSem = NULL, enum execute_mode_t mode = EXECUTE_AUTO); + + /** + * Posts a task to a worker for execution. + * + * @param pTask The task to be executed. + * @param mode Execution mode + * + * @return True if the task could be posted (i.e. not executed), false otherwise. + * + * @attention Once the task has been executed, it will be deleted. + */ + bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO); + + template + bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO) + { + return post(std::auto_ptr(sTask.release()), mode); + } + + /** + * Posts a task to all workers for execution. + * + * @param pTask The task to be executed. + * @param pSem If non-NULL, will be posted once per worker when the task's + * `execute` return. + * + * @return How many workers the task was posted to. + * + * @attention The very same task will be posted to all workers. The task + * should either not have any sharable data or then it should + * have data specific to each worker that can be accessed + * without locks. + */ + static size_t broadcast(Task* pTask, Semaphore* pSem = NULL); + + /** + * Posts a task to all workers for execution. + * + * @param pTask The task to be executed. + * + * @return How many workers the task was posted to. + * + * @attention The very same task will be posted to all workers. The task + * should either not have any sharable data or then it should + * have data specific to each worker that can be accessed + * without locks. + * + * @attention Once the task has been executed by all workers, it will + * be deleted. + */ + static size_t broadcast(std::auto_ptr sTask); + + template + static size_t broadcast(std::auto_ptr sTask) + { + return broadcast(std::auto_ptr(sTask.release())); + } + + /** + * Executes a task on all workers in serial mode (the task is executed + * on at most one worker thread at a time). When the function returns + * the task has been executed on all workers. + * + * @param task The task to be executed. + * + * @return How many workers the task was posted to. + * + * @warning This function is extremely inefficient and will be slow compared + * to the other functions. Only use this function when printing thread-specific + * data to stdout. + */ + static size_t execute_serially(Task& task); + + /** + * Executes a task on all workers concurrently and waits until all workers + * are done. That is, when the function returns the task has been executed + * by all workers. + * + * @param task The task to be executed. + * + * @return How many workers the task was posted to. + */ + static size_t execute_concurrently(Task& task); + + /** + * Post a message to a worker. + * + * @param msg_id The message id. + * @param arg1 Message specific first argument. + * @param arg2 Message specific second argument. + * + * @return True if the message could be sent, false otherwise. If the message + * posting fails, errno is set appropriately. + * + * @attention The return value tells *only* whether the message could be sent, + * *not* that it has reached the worker. + * + * @attention This function is signal safe. + */ + bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + + /** + * Broadcast a message to all worker. + * + * @param msg_id The message id. + * @param arg1 Message specific first argument. + * @param arg2 Message specific second argument. + * + * @return The number of messages posted; if less that ne number of workers + * then some postings failed. + * + * @attention The return value tells *only* whether message could be posted, + * *not* that it has reached the worker. + * + * @attentsion Exactly the same arguments are passed to all workers. Take that + * into account if the passed data must be freed. + * + * @attention This function is signal safe. + */ + static size_t broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + + /** + * Initate shutdown of all workers. + * + * @attention A call to this function will only initiate the shutdowm, + * the workers will not have shut down when the function returns. + * + * @attention This function is signal safe. + */ + static void shutdown_all(); + + /** + * Return the worker associated with the provided worker id. + * + * @param worker_id A worker id. + * + * @return The corresponding worker instance, or NULL if the id does + * not correspond to a worker. + */ + static Worker* get(int worker_id); + + /** + * Return the worker associated with the current thread. + * + * @return The worker instance, or NULL if the current thread does not have a worker. + */ + static Worker* get_current(); + + /** + * Return the worker id associated with the current thread. + * + * @return A worker instance, or -1 if the current thread does not have a worker. + */ + static int get_current_id(); + +protected: + Worker(); + virtual ~Worker(); + + /** + * Called by Worker::run() before starting the epoll loop. + * + * @return True, if the epoll loop should be started, false otherwise. + */ + virtual bool pre_run() = 0; + + /** + * Called by Worker::run() after the epoll loop has finished. + */ + virtual void post_run() = 0; + + /** + * Called by Worker::run() once per epoll loop. + */ + virtual void epoll_tick() = 0; + + /** + * Helper for resolving epoll-errors. In case of fatal ones, SIGABRT + * will be raised. + * + * @param fd The epoll file descriptor. + * @param errnum The errno of the operation. + * @param op Either EPOLL_CTL_ADD or EPOLL_CTL_DEL. + */ + static void resolve_poll_error(int fd, int err, int op); + +protected: + const int m_id; /*< The id of the worker. */ + const int m_epoll_fd; /*< The epoll file descriptor. */ + state_t m_state; /*< The state of the worker */ + +private: + bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); + + void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override + + static void thread_main(void* arg); + + void poll_waitevents(); + +private: + STATISTICS m_statistics; /*< Worker statistics. */ + MessageQueue* m_pQueue; /*< The message queue of the worker. */ + THREAD m_thread; /*< The thread handle of the worker. */ + bool m_started; /*< Whether the thread has been started or not. */ + bool m_should_shutdown; /*< Whether shutdown should be performed. */ + bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ + uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ + uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ + Load m_load; +}; + +} diff --git a/server/core/internal/workertask.hh b/server/core/internal/workertask.hh index e5d0020c7..7510ee8d8 100644 --- a/server/core/internal/workertask.hh +++ b/server/core/internal/workertask.hh @@ -17,7 +17,7 @@ namespace maxscale { -class RoutingWorker; +class Worker; /** * A WorkerTask represents a task to be performed by a Worker. @@ -38,7 +38,7 @@ public: * @attention As the function is called by a worker, the body of `execute` * should execute quickly and not perform any blocking operations. */ - virtual void execute(RoutingWorker& worker) = 0; + virtual void execute(Worker& worker) = 0; }; /** @@ -69,10 +69,10 @@ protected: * @attention As the function is called by a worker, the body of `execute` * should execute quickly and not perform any blocking operations. */ - virtual void execute(RoutingWorker& worker) = 0; + virtual void execute(Worker& worker) = 0; private: - friend class RoutingWorker; + friend class Worker; void inc_ref(); void dec_ref(); diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc index 85279c0f5..30ef13eee 100644 --- a/server/core/messagequeue.cc +++ b/server/core/messagequeue.cc @@ -165,7 +165,7 @@ bool MessageQueue::post(const Message& message) const return rv; } -bool MessageQueue::add_to_worker(RoutingWorker* pWorker) +bool MessageQueue::add_to_worker(Worker* pWorker) { if (m_pWorker) { @@ -181,9 +181,9 @@ bool MessageQueue::add_to_worker(RoutingWorker* pWorker) return m_pWorker != NULL; } -RoutingWorker* MessageQueue::remove_from_worker() +Worker* MessageQueue::remove_from_worker() { - RoutingWorker* pWorker = m_pWorker; + Worker* pWorker = m_pWorker; if (m_pWorker) { diff --git a/server/core/poll.cc b/server/core/poll.cc index e9fb70721..9017da1d7 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -39,6 +39,7 @@ #include "internal/routingworker.hh" using maxscale::Worker; +using maxscale::RoutingWorker; static int n_threads; /*< Number of threads */ @@ -63,17 +64,17 @@ static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* da return worker->add_fd(fd, events, data); } -static bool add_fd_to_workers(int fd, uint32_t events, MXS_POLL_DATA* data) +static bool add_fd_to_routing_workers(int fd, uint32_t events, MXS_POLL_DATA* data) { bool rv = true; int thread_id = data->thread.id; - rv = Worker::add_shared_fd(fd, events, data); + rv = RoutingWorker::add_shared_fd(fd, events, data); if (rv) { // The DCB will appear on the list of the calling thread. - int wid = Worker::get_current_id(); + int wid = RoutingWorker::get_current_id(); if (wid == -1) { @@ -101,7 +102,7 @@ bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data if (wid == MXS_WORKER_ALL) { - rv = add_fd_to_workers(fd, events, data); + rv = add_fd_to_routing_workers(fd, events, data); } else { @@ -123,9 +124,9 @@ static bool remove_fd_from_worker(int wid, int fd) return worker->remove_fd(fd); } -static bool remove_fd_from_workers(int fd) +static bool remove_fd_from_routing_workers(int fd) { - return Worker::remove_shared_fd(fd); + return RoutingWorker::remove_shared_fd(fd); } bool poll_remove_fd_from_worker(int wid, int fd) @@ -134,7 +135,7 @@ bool poll_remove_fd_from_worker(int wid, int fd) if (wid == MXS_WORKER_ALL) { - rv = remove_fd_from_workers(fd); + rv = remove_fd_from_routing_workers(fd); } else { @@ -156,7 +157,7 @@ bool poll_remove_fd_from_worker(int wid, int fd) void poll_set_nonblocking_polls(unsigned int nbpolls) { - Worker::set_nonblocking_polls(nbpolls); + RoutingWorker::set_nonblocking_polls(nbpolls); } /** @@ -169,7 +170,7 @@ poll_set_nonblocking_polls(unsigned int nbpolls) void poll_set_maxwait(unsigned int maxwait) { - Worker::set_maxwait(maxwait); + RoutingWorker::set_maxwait(maxwait); } /** diff --git a/server/core/resource.cc b/server/core/resource.cc index 8c0b3c004..3f3bc8980 100644 --- a/server/core/resource.cc +++ b/server/core/resource.cc @@ -34,7 +34,6 @@ #include "internal/service.h" #include "internal/config_runtime.h" #include "internal/modules.h" -#include "internal/worker.h" #include "internal/routingworker.hh" using std::list; @@ -128,7 +127,7 @@ bool Resource::matching_variable_path(const string& path, const string& target) char* end; int id = strtol(target.c_str(), &end, 10); - if (*end == '\0' && mxs_worker_get(id)) + if (*end == '\0' && mxs_rworker_get(id)) { rval = true; } diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index 0902d6f57..777cf5111 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -59,20 +59,23 @@ const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; struct this_unit { bool initialized; // Whether the initialization has been performed. - int n_workers; // How many workers there are. - RoutingWorker** ppWorkers; // Array of worker instances. + int nWorkers; // How many routing workers there are. + RoutingWorker** ppWorkers; // Array of routing worker instances. // DEPRECATED in 2.3, remove in 2.4. int number_poll_spins; // Maximum non-block polls // DEPRECATED in 2.3, remove in 2.4. int max_poll_sleep; // Maximum block time int epoll_listener_fd; // Shared epoll descriptor for listening descriptors. + int id_main_worker; // The id of the worker running in the main thread. } this_unit = { - false, - 0, - NULL, - 0, - 0 + false, // initialized + 0, // nWorkers + NULL, // ppWorkers + 0, // number_poll_spins + 0, // max_poll_sleep + -1, // epoll_listener_fd + WORKER_ABSENT_ID // id_main_worker }; thread_local struct this_thread @@ -84,143 +87,84 @@ thread_local struct this_thread }; /** - * Structure used for sending cross-thread messages. - */ -typedef struct worker_message -{ - uint32_t id; /*< Message id. */ - intptr_t arg1; /*< Message specific first argument. */ - intptr_t arg2; /*< Message specific second argument. */ -} WORKER_MESSAGE; - -/** - * Check error returns from epoll_ctl; impossible ones lead to crash. + * Calls thread_init on all loaded modules. * - * @param errornum The errno set by epoll_ctl - * @param op Either EPOLL_CTL_ADD or EPOLL_CTL_DEL. + * @return True, if all modules were successfully initialized. */ -void poll_resolve_error(int fd, int errornum, int op) +bool modules_thread_init() { - if (op == EPOLL_CTL_ADD) - { - if (EEXIST == errornum) - { - MXS_ERROR("File descriptor %d already present in an epoll instance.", fd); - return; - } + bool initialized = false; - if (ENOSPC == errornum) + MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); + MXS_MODULE* module = NULL; + + while ((module = mxs_module_iterator_get_next(&i)) != NULL) + { + if (module->thread_init) { - MXS_ERROR("The limit imposed by /proc/sys/fs/epoll/max_user_watches was " - "reached when trying to add file descriptor %d to an epoll instance.", fd); - return; + int rc = (module->thread_init)(); + + if (rc != 0) + { + break; + } + } + } + + if (module) + { + // If module is non-NULL it means that the initialization failed for + // that module. We now need to call finish on all modules that were + // successfully initialized. + MXS_MODULE* failed_module = module; + i = mxs_module_iterator_get(NULL); + + while ((module = mxs_module_iterator_get_next(&i)) != failed_module) + { + if (module->thread_finish) + { + (module->thread_finish)(); + } } } else { - ss_dassert(op == EPOLL_CTL_DEL); + initialized = true; + } - /* Must be removing */ - if (ENOENT == errornum) + return initialized; +} + +/** + * Calls thread_finish on all loaded modules. + */ +void modules_thread_finish() +{ + MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); + MXS_MODULE* module = NULL; + + while ((module = mxs_module_iterator_get_next(&i)) != NULL) + { + if (module->thread_finish) { - MXS_ERROR("File descriptor %d was not found in epoll instance.", fd); - return; + (module->thread_finish)(); } } - - /* Common checks for add or remove - crash MaxScale */ - if (EBADF == errornum) - { - raise(SIGABRT); - } - if (EINVAL == errornum) - { - raise(SIGABRT); - } - if (ENOMEM == errornum) - { - raise(SIGABRT); - } - if (EPERM == errornum) - { - raise(SIGABRT); - } - - /* Undocumented error number */ - raise(SIGABRT); } } -static bool modules_thread_init(); -static void modules_thread_finish(); - -WorkerLoad::WorkerLoad() - : m_start_time(0) - , m_wait_start(0) - , m_wait_time(0) - , m_load_1_minute(&m_load_1_hour) - , m_load_1_second(&m_load_1_minute) +namespace maxscale { -} -void WorkerLoad::about_to_work(uint64_t now) -{ - uint64_t duration = now - m_start_time; - - m_wait_time += (now - m_wait_start); - - if (duration > ONE_SECOND) - { - int load_percentage = 100 * ((duration - m_wait_time) / (double)duration); - - m_start_time = now; - m_wait_time = 0; - - m_load_1_second.add_value(load_percentage); - } -} - -WorkerLoad::Average::~Average() -{ -} - -//static -uint64_t WorkerLoad::get_time() -{ - uint64_t now; - - timespec t; - - ss_debug(int rv=)clock_gettime(CLOCK_MONOTONIC, &t); - ss_dassert(rv == 0); - - return t.tv_sec * 1000 + (t.tv_nsec / 1000000); -} - -RoutingWorker::RoutingWorker(int id, - int epoll_fd) - : m_id(id) - , m_state(STOPPED) - , m_epoll_fd(epoll_fd) - , m_pQueue(NULL) - , m_thread(0) - , m_started(false) - , m_should_shutdown(false) - , m_shutdown_initiated(false) - , m_nCurrent_descriptors(0) - , m_nTotal_descriptors(0) +RoutingWorker::RoutingWorker() { MXS_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler; - MXS_POLL_DATA::thread.id = id; + MXS_POLL_DATA::thread.id = m_id; } RoutingWorker::~RoutingWorker() { - ss_dassert(!m_started); - - delete m_pQueue; - close(m_epoll_fd); } // static @@ -235,17 +179,25 @@ bool RoutingWorker::init() if (this_unit.epoll_listener_fd != -1) { - int n_workers = config_threadcount(); - RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [n_workers] (); // Zero initialized array + int nWorkers = config_threadcount(); + RoutingWorker** ppWorkers = new (std::nothrow) RoutingWorker* [nWorkers] (); // Zero inited array if (ppWorkers) { - for (int i = 0; i < n_workers; ++i) + int id_main_worker = WORKER_ABSENT_ID; + int i; + for (i = 0; i < nWorkers; ++i) { - RoutingWorker* pWorker = RoutingWorker::create(i, this_unit.epoll_listener_fd); + RoutingWorker* pWorker = RoutingWorker::create(this_unit.epoll_listener_fd); if (pWorker) { + // The first created worker will be the main worker. + if (id_main_worker == WORKER_ABSENT_ID) + { + id_main_worker = pWorker->id(); + } + ppWorkers[i] = pWorker; } else @@ -264,19 +216,21 @@ bool RoutingWorker::init() if (ppWorkers) { this_unit.ppWorkers = ppWorkers; - this_unit.n_workers = n_workers; + this_unit.nWorkers = nWorkers; + this_unit.id_main_worker = id_main_worker; this_unit.initialized = true; } } else { + MXS_OOM(); close(this_unit.epoll_listener_fd); } } else { - MXS_ERROR("Could not allocate an epoll instance."); + MXS_ALERT("Could not allocate an epoll instance."); } if (this_unit.initialized) @@ -296,7 +250,7 @@ void RoutingWorker::finish() { ss_dassert(this_unit.initialized); - for (int i = this_unit.n_workers - 1; i >= 0; --i) + for (int i = this_unit.nWorkers - 1; i >= 0; --i) { RoutingWorker* pWorker = this_unit.ppWorkers[i]; @@ -313,199 +267,6 @@ void RoutingWorker::finish() this_unit.initialized = false; } -namespace -{ - -int64_t one_stats_get(int64_t RoutingWorker::STATISTICS::*what, enum ts_stats_type type) -{ - int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0); - - for (int i = 0; i < this_unit.n_workers; ++i) - { - RoutingWorker* pWorker = RoutingWorker::get(i); - ss_dassert(pWorker); - - const RoutingWorker::STATISTICS& s = pWorker->statistics(); - - int64_t value = s.*what; - - switch (type) - { - case TS_STATS_MAX: - if (value > best) - { - best = value; - } - break; - - case TS_STATS_MIX: - if (value < best) - { - best = value; - } - break; - - case TS_STATS_AVG: - case TS_STATS_SUM: - best += value; - break; - } - } - - return type == TS_STATS_AVG ? best / this_unit.n_workers : best; -} - -} - -//static -RoutingWorker::STATISTICS RoutingWorker::get_statistics() -{ - STATISTICS cs; - - cs.n_read = one_stats_get(&STATISTICS::n_read, TS_STATS_SUM); - cs.n_write = one_stats_get(&STATISTICS::n_write, TS_STATS_SUM); - cs.n_error = one_stats_get(&STATISTICS::n_error, TS_STATS_SUM); - cs.n_hup = one_stats_get(&STATISTICS::n_hup, TS_STATS_SUM); - cs.n_accept = one_stats_get(&STATISTICS::n_accept, TS_STATS_SUM); - cs.n_polls = one_stats_get(&STATISTICS::n_polls, TS_STATS_SUM); - cs.n_pollev = one_stats_get(&STATISTICS::n_pollev, TS_STATS_SUM); - cs.n_nbpollev = one_stats_get(&STATISTICS::n_nbpollev, TS_STATS_SUM); - cs.evq_length = one_stats_get(&STATISTICS::evq_length, TS_STATS_AVG); - cs.evq_max = one_stats_get(&STATISTICS::evq_max, TS_STATS_MAX); - cs.blockingpolls = one_stats_get(&STATISTICS::blockingpolls, TS_STATS_SUM); - cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX); - cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX); - - for (int i = 0; i < RoutingWorker::STATISTICS::MAXNFDS - 1; i++) - { - for (int j = 0; j < this_unit.n_workers; ++j) - { - RoutingWorker* pWorker = RoutingWorker::get(j); - ss_dassert(pWorker); - - cs.n_fds[i] += pWorker->statistics().n_fds[i]; - } - } - - for (int i = 0; i <= RoutingWorker::STATISTICS::N_QUEUE_TIMES; ++i) - { - for (int j = 0; j < this_unit.n_workers; ++j) - { - RoutingWorker* pWorker = RoutingWorker::get(j); - ss_dassert(pWorker); - - cs.qtimes[i] += pWorker->statistics().qtimes[i]; - cs.exectimes[i] += pWorker->statistics().exectimes[i]; - } - - cs.qtimes[i] /= this_unit.n_workers; - cs.exectimes[i] /= this_unit.n_workers; - } - - return cs; -} - -//static -int64_t RoutingWorker::get_one_statistic(POLL_STAT what) -{ - int64_t rv = 0; - - int64_t RoutingWorker::STATISTICS::*member = NULL; - enum ts_stats_type approach; - - switch (what) - { - case POLL_STAT_READ: - member = &RoutingWorker::STATISTICS::n_read; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_WRITE: - member = &RoutingWorker::STATISTICS::n_write; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_ERROR: - member = &RoutingWorker::STATISTICS::n_error; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_HANGUP: - member = &RoutingWorker::STATISTICS::n_hup; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_ACCEPT: - member = &RoutingWorker::STATISTICS::n_accept; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_EVQ_LEN: - member = &RoutingWorker::STATISTICS::evq_length; - approach = TS_STATS_AVG; - break; - - case POLL_STAT_EVQ_MAX: - member = &RoutingWorker::STATISTICS::evq_max; - approach = TS_STATS_MAX; - break; - - case POLL_STAT_MAX_QTIME: - member = &RoutingWorker::STATISTICS::maxqtime; - approach = TS_STATS_MAX; - break; - - case POLL_STAT_MAX_EXECTIME: - member = &RoutingWorker::STATISTICS::maxexectime; - approach = TS_STATS_MAX; - break; - - default: - ss_dassert(!true); - } - - if (member) - { - rv = one_stats_get(member, approach); - } - - return rv; -} - -void RoutingWorker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal) -{ - *pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors); - *pnTotal = atomic_load_uint64(&m_nTotal_descriptors); -} - -bool RoutingWorker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) -{ - bool rv = true; - - // Must be edge-triggered. - events |= EPOLLET; - - struct epoll_event ev; - - ev.events = events; - ev.data.ptr = pData; - - pData->thread.id = m_id; - - if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0) - { - atomic_add_uint32(&m_nCurrent_descriptors, 1); - atomic_add_uint64(&m_nTotal_descriptors, 1); - } - else - { - poll_resolve_error(fd, errno, EPOLL_CTL_ADD); - rv = false; - } - - return rv; -} - //static bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) { @@ -528,26 +289,7 @@ bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0) { - poll_resolve_error(fd, errno, EPOLL_CTL_ADD); - rv = false; - } - - return rv; -} - -bool RoutingWorker::remove_fd(int fd) -{ - bool rv = true; - - struct epoll_event ev = {}; - - if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, &ev) == 0) - { - atomic_add_uint32(&m_nCurrent_descriptors, -1); - } - else - { - poll_resolve_error(fd, errno, EPOLL_CTL_DEL); + Worker::resolve_poll_error(fd, errno, EPOLL_CTL_ADD); rv = false; } @@ -563,7 +305,7 @@ bool RoutingWorker::remove_shared_fd(int fd) if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_DEL, fd, &ev) != 0) { - poll_resolve_error(fd, errno, EPOLL_CTL_DEL); + Worker::resolve_poll_error(fd, errno, EPOLL_CTL_DEL); rv = false; } @@ -582,21 +324,16 @@ bool mxs_worker_should_shutdown(MXS_WORKER* pWorker) RoutingWorker* RoutingWorker::get(int worker_id) { - ss_dassert(worker_id < this_unit.n_workers); + if (worker_id == MAIN) + { + worker_id = this_unit.id_main_worker; + } + + ss_dassert(worker_id < this_unit.nWorkers); return this_unit.ppWorkers[worker_id]; } -MXS_WORKER* mxs_worker_get(int worker_id) -{ - return RoutingWorker::get(worker_id); -} - -int mxs_worker_get_current_id() -{ - return RoutingWorker::get_current_id(); -} - RoutingWorker* RoutingWorker::get_current() { RoutingWorker* pWorker = NULL; @@ -616,6 +353,43 @@ int RoutingWorker::get_current_id() return this_thread.current_worker_id; } +//static +bool RoutingWorker::start_threaded_workers() +{ + bool rv = true; + size_t stack_size = config_thread_stack_size(); + + // The first RoutingWorker will be run in the main thread, so + // we start from 1 and not 0. + for (int i = 1; i < this_unit.nWorkers; ++i) + { + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + if (!pWorker->start(stack_size)) + { + MXS_ALERT("Could not start routing worker %d of %d.", i, this_unit.nWorkers); + rv = false; + // At startup, so we don't even try to clean up. + break; + } + } + + return rv; +} + +//static +void RoutingWorker::join_threaded_workers() +{ + for (int i = 1; i < this_unit.nWorkers; i++) + { + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + pWorker->join(); + } +} + //static void RoutingWorker::set_nonblocking_polls(unsigned int nbpolls) { @@ -628,280 +402,11 @@ void RoutingWorker::set_maxwait(unsigned int maxwait) this_unit.max_poll_sleep = maxwait; } -bool RoutingWorker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) -{ - // No logging here, function must be signal safe. - bool rval = true; - - if (mode == RoutingWorker::EXECUTE_AUTO && RoutingWorker::get_current() == this) - { - pTask->execute(*this); - - if (pSem) - { - pSem->post(); - } - } - else - { - intptr_t arg1 = reinterpret_cast(pTask); - intptr_t arg2 = reinterpret_cast(pSem); - - rval = post_message(MXS_WORKER_MSG_TASK, arg1, arg2); - } - - return rval; -} - -bool RoutingWorker::post(std::auto_ptr sTask, enum execute_mode_t mode) -{ - // No logging here, function must be signal safe. - return post_disposable(sTask.release(), mode); -} - -// private -bool RoutingWorker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) -{ - bool posted = true; - - pTask->inc_ref(); - - if (mode == RoutingWorker::EXECUTE_AUTO && RoutingWorker::get_current() == this) - { - pTask->execute(*this); - pTask->dec_ref(); - } - else - { - intptr_t arg1 = reinterpret_cast(pTask); - - posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0); - - if (!posted) - { - pTask->dec_ref(); - } - } - - return posted; -} - -//static -size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem) -{ - // No logging here, function must be signal safe. - size_t n = 0; - - for (int i = 0; i < this_unit.n_workers; ++i) - { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - - if (pWorker->post(pTask, pSem)) - { - ++n; - } - } - - return n; -} - -//static -size_t RoutingWorker::broadcast(std::auto_ptr sTask) -{ - DisposableTask* pTask = sTask.release(); - pTask->inc_ref(); - - size_t n = 0; - - for (int i = 0; i < this_unit.n_workers; ++i) - { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - - if (pWorker->post_disposable(pTask)) - { - ++n; - } - } - - pTask->dec_ref(); - - return n; -} - -//static -size_t RoutingWorker::execute_serially(Task& task) -{ - Semaphore sem; - size_t n = 0; - - for (int i = 0; i < this_unit.n_workers; ++i) - { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - - if (pWorker->post(&task, &sem)) - { - sem.wait(); - ++n; - } - } - - return n; -} - -//static -size_t RoutingWorker::execute_concurrently(Task& task) -{ - Semaphore sem; - return sem.wait_n(RoutingWorker::broadcast(&task, &sem)); -} - -bool RoutingWorker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) -{ - // NOTE: No logging here, this function must be signal safe. - MessageQueue::Message message(msg_id, arg1, arg2); - - return m_pQueue->post(message); -} - -bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) -{ - return static_cast(pWorker)->post_message(msg_id, arg1, arg2); -} - -size_t RoutingWorker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) -{ - // NOTE: No logging here, this function must be signal safe. - - size_t n = 0; - - for (int i = 0; i < this_unit.n_workers; ++i) - { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - - if (pWorker->post_message(msg_id, arg1, arg2)) - { - ++n; - } - } - - return n; -} - -size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) -{ - return RoutingWorker::broadcast_message(msg_id, arg1, arg2); -} - -bool mxs_worker_register_session(MXS_SESSION* session) -{ - RoutingWorker* pWorker = RoutingWorker::get_current(); - ss_dassert(pWorker); - return pWorker->session_registry().add(session); -} - -bool mxs_worker_deregister_session(uint64_t id) -{ - RoutingWorker* pWorker = RoutingWorker::get_current(); - ss_dassert(pWorker); - return pWorker->session_registry().remove(id); -} - -MXS_SESSION* mxs_worker_find_session(uint64_t id) -{ - RoutingWorker* pWorker = RoutingWorker::get_current(); - ss_dassert(pWorker); - return pWorker->session_registry().lookup(id); -} - RoutingWorker::SessionsById& RoutingWorker::session_registry() { return m_sessions; } -class WorkerInfoTask: public maxscale::WorkerTask -{ -public: - WorkerInfoTask(const char* host, uint32_t nthreads): - m_host(host) - { - m_data.resize(nthreads); - } - - void execute(RoutingWorker& worker) - { - json_t* stats = json_object(); - const RoutingWorker::STATISTICS& s = worker.get_local_statistics(); - json_object_set_new(stats, "reads", json_integer(s.n_read)); - json_object_set_new(stats, "writes", json_integer(s.n_write)); - json_object_set_new(stats, "errors", json_integer(s.n_error)); - json_object_set_new(stats, "hangups", json_integer(s.n_hup)); - json_object_set_new(stats, "accepts", json_integer(s.n_accept)); - json_object_set_new(stats, "blocking_polls", json_integer(s.blockingpolls)); - json_object_set_new(stats, "event_queue_length", json_integer(s.evq_length)); - json_object_set_new(stats, "max_event_queue_length", json_integer(s.evq_max)); - json_object_set_new(stats, "max_exec_time", json_integer(s.maxexectime)); - json_object_set_new(stats, "max_queue_time", json_integer(s.maxqtime)); - - json_t* attr = json_object(); - json_object_set_new(attr, "stats", stats); - - int idx = worker.get_current_id(); - stringstream ss; - ss << idx; - - json_t* json = json_object(); - json_object_set_new(json, CN_ID, json_string(ss.str().c_str())); - json_object_set_new(json, CN_TYPE, json_string(CN_THREADS)); - json_object_set_new(json, CN_ATTRIBUTES, attr); - json_object_set_new(json, CN_LINKS, mxs_json_self_link(m_host, CN_THREADS, ss.str().c_str())); - - ss_dassert((size_t)idx < m_data.size()); - m_data[idx] = json; - } - - json_t* resource() - { - json_t* arr = json_array(); - - for (vector::iterator it = m_data.begin(); it != m_data.end(); it++) - { - json_array_append_new(arr, *it); - } - - return mxs_json_resource(m_host, MXS_JSON_API_THREADS, arr); - } - - json_t* resource(int id) - { - stringstream self; - self << MXS_JSON_API_THREADS << id; - return mxs_json_resource(m_host, self.str().c_str(), m_data[id]); - } - -private: - vector m_data; - const char* m_host; -}; - -json_t* mxs_worker_to_json(const char* host, int id) -{ - RoutingWorker* target = RoutingWorker::get(id); - WorkerInfoTask task(host, id + 1); - Semaphore sem; - - target->post(&task, &sem); - sem.wait(); - - return task.resource(id); -} - -json_t* mxs_worker_list_to_json(const char* host) -{ - WorkerInfoTask task(host, config_threadcount()); - RoutingWorker::execute_concurrently(task); - return task.resource(); -} - void RoutingWorker::register_zombie(DCB* pDcb) { ss_dassert(pDcb->poll.thread.id == m_id); @@ -917,376 +422,91 @@ void RoutingWorker::delete_zombies() while (!m_zombies.empty()) { DCB* pDcb = m_zombies.back(); - m_zombies.resize(m_zombies.size() - 1); + m_zombies.pop_back(); dcb_final_close(pDcb); } } -void RoutingWorker::run() +bool RoutingWorker::pre_run() { this_thread.current_worker_id = m_id; - if (modules_thread_init() && service_thread_init()) - { - poll_waitevents(); + bool rv = modules_thread_init() && service_thread_init(); - MXS_INFO("Worker %d has shut down.", m_id); - modules_thread_finish(); - } - else + if (!rv) { MXS_ERROR("Could not perform thread initialization for all modules. Thread exits."); + this_thread.current_worker_id = WORKER_ABSENT_ID; } + return rv; +} + +void RoutingWorker::post_run() +{ + modules_thread_finish(); + // TODO: Add sercice_thread_finish(). this_thread.current_worker_id = WORKER_ABSENT_ID; } -bool RoutingWorker::start(size_t stack_size) -{ - m_started = true; - - if (!thread_start(&m_thread, &RoutingWorker::thread_main, this, stack_size)) - { - m_started = false; - } - - return m_started; -} - -void RoutingWorker::join() -{ - if (m_started) - { - MXS_INFO("Waiting for worker %d.", m_id); - thread_wait(m_thread); - MXS_INFO("Waited for worker %d.", m_id); - m_started = false; - } -} - -void RoutingWorker::shutdown() -{ - // NOTE: No logging here, this function must be signal safe. - - if (!m_shutdown_initiated) - { - if (post_message(MXS_WORKER_MSG_SHUTDOWN, 0, 0)) - { - m_shutdown_initiated = true; - } - } -} - -void RoutingWorker::shutdown_all() -{ - // NOTE: No logging here, this function must be signal safe. - ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL)); - - for (int i = 0; i < this_unit.n_workers; ++i) - { - RoutingWorker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - pWorker->shutdown(); - } -} - /** * Creates a worker instance. * - Allocates the structure. * - Creates a pipe. * - Adds the read descriptor to the polling mechanism. * - * @param worker_id The id of the worker. * @param epoll_listener_fd The file descriptor of the epoll set to which listening * sockets will be placed. * * @return A worker instance if successful, otherwise NULL. */ //static -RoutingWorker* RoutingWorker::create(int worker_id, int epoll_listener_fd) +RoutingWorker* RoutingWorker::create(int epoll_listener_fd) { - RoutingWorker* pThis = NULL; + RoutingWorker* pThis = new (std::nothrow) RoutingWorker(); - int epoll_fd = epoll_create(MAX_EVENTS); - - if (epoll_fd != -1) + if (pThis) { - pThis = new (std::nothrow) RoutingWorker(worker_id, epoll_fd); + struct epoll_event ev; + ev.events = EPOLLIN; + MXS_POLL_DATA* pData = pThis; + ev.data.ptr = pData; // Necessary for pointer adjustment, otherwise downcast will not work. - if (pThis) + // The shared epoll instance descriptor is *not* added using EPOLLET (edge-triggered) + // because we want it to be level-triggered. That way, as long as there is a single + // active (accept() can be called) listening socket, epoll_wait() will return an event + // for it. It must be like that because each worker will call accept() just once before + // calling epoll_wait() again. The end result is that as long as the load of different + // workers is roughly the same, the client connections will be distributed evenly across + // the workers. If the load is not the same, then a worker with less load will get more + // clients that a worker with more load. + if (epoll_ctl(pThis->m_epoll_fd, EPOLL_CTL_ADD, epoll_listener_fd, &ev) == 0) { - struct epoll_event ev; - ev.events = EPOLLIN; - MXS_POLL_DATA* pData = pThis; - ev.data.ptr = pData; // Necessary for pointer adjustment, otherwise downcast will not work. - - // The shared epoll instance descriptor is *not* added using EPOLLET (edge-triggered) - // because we want it to be level-triggered. That way, as long as there is a single - // active (accept() can be called) listening socket, epoll_wait() will return an event - // for it. It must be like that because each worker will call accept() just once before - // calling epoll_wait() again. The end result is that as long as the load of different - // workers is roughly the same, the client connections will be distributed evenly across - // the workers. If the load is not the same, then a worker with less load will get more - // clients that a worker with more load. - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, epoll_listener_fd, &ev) == 0) - { - MXS_INFO("Epoll instance for listening sockets added to worker epoll instance."); - - MessageQueue* pQueue = MessageQueue::create(pThis); - - if (pQueue) - { - if (pQueue->add_to_worker(pThis)) - { - pThis->m_pQueue = pQueue; - } - else - { - MXS_ERROR("Could not add message queue to worker."); - delete pThis; - pThis = NULL; - } - } - else - { - MXS_ERROR("Could not create message queue for worker."); - delete pThis; - pThis = NULL; - } - } - else - { - MXS_ERROR("Could not add epoll instance for listening sockets to " - "epoll instance of worker: %s", mxs_strerror(errno)); - delete pThis; - pThis = NULL; - } + MXS_INFO("Epoll instance for listening sockets added to worker epoll instance."); } else { - MXS_OOM(); - close(epoll_fd); + MXS_ERROR("Could not add epoll instance for listening sockets to " + "epoll instance of worker: %s", mxs_strerror(errno)); + delete pThis; + pThis = NULL; } } else { - MXS_ERROR("Could not create epoll-instance for worker: %s", mxs_strerror(errno)); + MXS_OOM(); } return pThis; } -/** - * The worker message handler. - * - * @param msg_id The message id. - * @param arg1 Message specific first argument. - * @param arg2 Message specific second argument. - */ -void RoutingWorker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg) +void RoutingWorker::epoll_tick() { - switch (msg.id()) - { - case MXS_WORKER_MSG_PING: - { - ss_dassert(msg.arg1() == 0); - char* zArg2 = reinterpret_cast(msg.arg2()); - const char* zMessage = zArg2 ? zArg2 : "Alive and kicking"; - MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage); - MXS_FREE(zArg2); - } - break; + dcb_process_idle_sessions(m_id); - case MXS_WORKER_MSG_SHUTDOWN: - { - MXS_INFO("Worker %d received shutdown message.", m_id); - m_should_shutdown = true; - } - break; + m_state = ZPROCESSING; - case MXS_WORKER_MSG_CALL: - { - void (*f)(int, void*) = (void (*)(int, void*))msg.arg1(); - - f(m_id, (void*)msg.arg2()); - } - break; - - case MXS_WORKER_MSG_TASK: - { - Task *pTask = reinterpret_cast(msg.arg1()); - Semaphore* pSem = reinterpret_cast(msg.arg2()); - - pTask->execute(*this); - - if (pSem) - { - pSem->post(); - } - } - break; - - case MXS_WORKER_MSG_DISPOSABLE_TASK: - { - DisposableTask *pTask = reinterpret_cast(msg.arg1()); - pTask->execute(*this); - pTask->dec_ref(); - } - break; - - default: - MXS_ERROR("Worker received unknown message %d.", msg.id()); - } -} - -/** - * The entry point of each worker thread. - * - * @param arg A worker. - */ -//static -void RoutingWorker::thread_main(void* pArg) -{ - RoutingWorker* pWorker = static_cast(pArg); - pWorker->run(); -} - -/** - * The main polling loop - */ -void RoutingWorker::poll_waitevents() -{ - struct epoll_event events[MAX_EVENTS]; - - m_state = IDLE; - - m_load.reset(); - - while (!should_shutdown()) - { - int nfds; - - m_state = POLLING; - - atomic_add_int64(&m_statistics.n_polls, 1); - - uint64_t now = Load::get_time(); - int timeout = Load::GRANULARITY - (now - m_load.start_time()); - - if (timeout < 0) - { - // If the processing of the last batch of events took us past the next - // time boundary, we ensure we return immediately. - timeout = 0; - } - - m_load.about_to_wait(now); - nfds = epoll_wait(m_epoll_fd, events, MAX_EVENTS, timeout); - m_load.about_to_work(); - - if (nfds == -1 && errno != EINTR) - { - int eno = errno; - errno = 0; - MXS_ERROR("%lu [poll_waitevents] epoll_wait returned " - "%d, errno %d", - pthread_self(), - nfds, - eno); - } - - if (nfds > 0) - { - m_statistics.evq_length = nfds; - if (nfds > m_statistics.evq_max) - { - m_statistics.evq_max = nfds; - } - - MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds", - pthread_self(), - nfds); - atomic_add_int64(&m_statistics.n_pollev, 1); - - m_state = PROCESSING; - - m_statistics.n_fds[(nfds < STATISTICS::MAXNFDS ? (nfds - 1) : STATISTICS::MAXNFDS - 1)]++; - } - - uint64_t cycle_start = mxs_clock(); - - for (int i = 0; i < nfds; i++) - { - /** Calculate event queue statistics */ - int64_t started = mxs_clock(); - int64_t qtime = started - cycle_start; - - if (qtime > STATISTICS::N_QUEUE_TIMES) - { - m_statistics.qtimes[STATISTICS::N_QUEUE_TIMES]++; - } - else - { - m_statistics.qtimes[qtime]++; - } - - m_statistics.maxqtime = MXS_MAX(m_statistics.maxqtime, qtime); - - MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; - - uint32_t actions = data->handler(data, m_id, events[i].events); - - if (actions & MXS_POLL_ACCEPT) - { - atomic_add_int64(&m_statistics.n_accept, 1); - } - - if (actions & MXS_POLL_READ) - { - atomic_add_int64(&m_statistics.n_read, 1); - } - - if (actions & MXS_POLL_WRITE) - { - atomic_add_int64(&m_statistics.n_write, 1); - } - - if (actions & MXS_POLL_HUP) - { - atomic_add_int64(&m_statistics.n_hup, 1); - } - - if (actions & MXS_POLL_ERROR) - { - atomic_add_int64(&m_statistics.n_error, 1); - } - - /** Calculate event execution statistics */ - qtime = mxs_clock() - started; - - if (qtime > STATISTICS::N_QUEUE_TIMES) - { - m_statistics.exectimes[STATISTICS::N_QUEUE_TIMES]++; - } - else - { - m_statistics.exectimes[qtime % STATISTICS::N_QUEUE_TIMES]++; - } - - m_statistics.maxexectime = MXS_MAX(m_statistics.maxexectime, qtime); - } - - dcb_process_idle_sessions(m_id); - - m_state = ZPROCESSING; - - delete_zombies(); - - m_state = IDLE; - } /*< while(1) */ - - m_state = STOPPED; + delete_zombies(); } /** @@ -1342,68 +562,45 @@ uint32_t RoutingWorker::handle_epoll_events(uint32_t events) return actions; } -/** - * Calls thread_init on all loaded modules. - * - * @return True, if all modules were successfully initialized. - */ -static bool modules_thread_init() -{ - bool initialized = false; - - MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); - MXS_MODULE* module = NULL; - - while ((module = mxs_module_iterator_get_next(&i)) != NULL) - { - if (module->thread_init) - { - int rc = (module->thread_init)(); - - if (rc != 0) - { - break; - } - } - } - - if (module) - { - // If module is non-NULL it means that the initialization failed for - // that module. We now need to call finish on all modules that were - // successfully initialized. - MXS_MODULE* failed_module = module; - i = mxs_module_iterator_get(NULL); - - while ((module = mxs_module_iterator_get_next(&i)) != failed_module) - { - if (module->thread_finish) - { - (module->thread_finish)(); - } - } - } - else - { - initialized = true; - } - - return initialized; } -/** - * Calls thread_finish on all loaded modules. - */ -static void modules_thread_finish() +size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { - MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL); - MXS_MODULE* module = NULL; - - while ((module = mxs_module_iterator_get_next(&i)) != NULL) - { - if (module->thread_finish) - { - (module->thread_finish)(); - } - } + return RoutingWorker::broadcast_message(msg_id, arg1, arg2); +} + +bool mxs_rworker_register_session(MXS_SESSION* session) +{ + RoutingWorker* pWorker = RoutingWorker::get_current(); + ss_dassert(pWorker); + return pWorker->session_registry().add(session); +} + +bool mxs_rworker_deregister_session(uint64_t id) +{ + RoutingWorker* pWorker = RoutingWorker::get_current(); + ss_dassert(pWorker); + return pWorker->session_registry().remove(id); +} + +MXS_SESSION* mxs_rworker_find_session(uint64_t id) +{ + RoutingWorker* pWorker = RoutingWorker::get_current(); + ss_dassert(pWorker); + return pWorker->session_registry().lookup(id); +} + +MXS_WORKER* mxs_rworker_get(int worker_id) +{ + return RoutingWorker::get(worker_id); +} + +MXS_WORKER* mxs_rworker_get_current() +{ + return RoutingWorker::get_current(); +} + +int mxs_rworker_get_current_id() +{ + return RoutingWorker::get_current_id(); } diff --git a/server/core/service.cc b/server/core/service.cc index 5ce580a67..2a1bdc9ee 100644 --- a/server/core/service.cc +++ b/server/core/service.cc @@ -50,7 +50,7 @@ #include #include #include -#include +#include #include "internal/config.h" #include "internal/filter.h" @@ -1695,7 +1695,7 @@ int service_refresh_users(SERVICE *service) { ss_dassert(service); int ret = 1; - int self = mxs_worker_get_current_id(); + int self = mxs_rworker_get_current_id(); ss_dassert(self >= 0); time_t now = time(NULL); diff --git a/server/core/worker.cc b/server/core/worker.cc new file mode 100644 index 000000000..5bd87526b --- /dev/null +++ b/server/core/worker.cc @@ -0,0 +1,1071 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "internal/worker.hh" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "internal/dcb.h" +#include "internal/modules.h" +#include "internal/poll.h" +#include "internal/service.h" +#include "internal/statistics.h" +#include "internal/workertask.hh" + +#define WORKER_ABSENT_ID -1 + +using std::vector; +using std::stringstream; + +namespace +{ + +using maxscale::Worker; + +const int MXS_WORKER_MSG_TASK = -1; +const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; + +/** + * Unit variables. + */ +struct this_unit +{ + bool initialized; // Whether the initialization has been performed. + int n_workers; // How many workers there are. + Worker** ppWorkers; // Array of worker instances. + int next_worker_id; // Next worker id. +} this_unit = +{ + false, // initialized + 0, // n_workers + NULL, // ppWorkers + 0, // next_worker_id +}; + +int next_worker_id() +{ + return atomic_add(&this_unit.next_worker_id, 1); +} + +thread_local struct this_thread +{ + int current_worker_id; // The worker id of the current thread +} this_thread = +{ + WORKER_ABSENT_ID +}; + +/** + * Structure used for sending cross-thread messages. + */ +typedef struct worker_message +{ + uint32_t id; /*< Message id. */ + intptr_t arg1; /*< Message specific first argument. */ + intptr_t arg2; /*< Message specific second argument. */ +} WORKER_MESSAGE; + +} + +static bool modules_thread_init(); +static void modules_thread_finish(); + +namespace maxscale +{ + +WorkerLoad::WorkerLoad() + : m_start_time(0) + , m_wait_start(0) + , m_wait_time(0) + , m_load_1_minute(&m_load_1_hour) + , m_load_1_second(&m_load_1_minute) +{ +} + +void WorkerLoad::about_to_work(uint64_t now) +{ + uint64_t duration = now - m_start_time; + + m_wait_time += (now - m_wait_start); + + if (duration > ONE_SECOND) + { + int load_percentage = 100 * ((duration - m_wait_time) / (double)duration); + + m_start_time = now; + m_wait_time = 0; + + m_load_1_second.add_value(load_percentage); + } +} + +WorkerLoad::Average::~Average() +{ +} + +//static +uint64_t WorkerLoad::get_time() +{ + uint64_t now; + + timespec t; + + ss_debug(int rv=)clock_gettime(CLOCK_MONOTONIC, &t); + ss_dassert(rv == 0); + + return t.tv_sec * 1000 + (t.tv_nsec / 1000000); +} + +Worker::Worker() + : m_id(next_worker_id()) + , m_epoll_fd(epoll_create(MAX_EVENTS)) + , m_state(STOPPED) + , m_pQueue(NULL) + , m_thread(0) + , m_started(false) + , m_should_shutdown(false) + , m_shutdown_initiated(false) + , m_nCurrent_descriptors(0) + , m_nTotal_descriptors(0) +{ + if (m_epoll_fd != -1) + { + m_pQueue = MessageQueue::create(this); + + if (m_pQueue) + { + if (!m_pQueue->add_to_worker(this)) + { + MXS_ALERT("Could not add message queue to worker. MaxScale will not work."); + ss_dassert(!true); + } + } + else + { + MXS_ALERT("Could not create message queue for worker. MaxScale will not work."); + ss_dassert(!true); + } + } + else + { + MXS_ALERT("Could not create epoll-instance for worker: %s. MaxScale will not work.", + mxs_strerror(errno)); + ss_dassert(!true); + } + + this_unit.ppWorkers[m_id] = this; +} + +Worker::~Worker() +{ + this_unit.ppWorkers[m_id] = NULL; + + ss_dassert(!m_started); + + delete m_pQueue; + close(m_epoll_fd); +} + +// static +bool Worker::init() +{ + ss_dassert(!this_unit.initialized); + + int n_workers = config_threadcount(); + Worker** ppWorkers = new (std::nothrow) Worker* [n_workers] (); // Zero initialized array + + if (ppWorkers) + { + this_unit.ppWorkers = ppWorkers; + this_unit.n_workers = n_workers; + + this_unit.initialized = true; + } + else + { + MXS_OOM(); + ss_dassert(!true); + } + + return this_unit.initialized; +} + +void Worker::finish() +{ + ss_dassert(this_unit.initialized); + + delete [] this_unit.ppWorkers; + this_unit.ppWorkers = NULL; + + this_unit.initialized = false; +} + +namespace +{ + +int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type) +{ + int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0); + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = Worker::get(i); + ss_dassert(pWorker); + + const Worker::STATISTICS& s = pWorker->statistics(); + + int64_t value = s.*what; + + switch (type) + { + case TS_STATS_MAX: + if (value > best) + { + best = value; + } + break; + + case TS_STATS_MIX: + if (value < best) + { + best = value; + } + break; + + case TS_STATS_AVG: + case TS_STATS_SUM: + best += value; + break; + } + } + + return type == TS_STATS_AVG ? best / this_unit.n_workers : best; +} + +} + +//static +Worker::STATISTICS Worker::get_statistics() +{ + STATISTICS cs; + + cs.n_read = one_stats_get(&STATISTICS::n_read, TS_STATS_SUM); + cs.n_write = one_stats_get(&STATISTICS::n_write, TS_STATS_SUM); + cs.n_error = one_stats_get(&STATISTICS::n_error, TS_STATS_SUM); + cs.n_hup = one_stats_get(&STATISTICS::n_hup, TS_STATS_SUM); + cs.n_accept = one_stats_get(&STATISTICS::n_accept, TS_STATS_SUM); + cs.n_polls = one_stats_get(&STATISTICS::n_polls, TS_STATS_SUM); + cs.n_pollev = one_stats_get(&STATISTICS::n_pollev, TS_STATS_SUM); + cs.n_nbpollev = one_stats_get(&STATISTICS::n_nbpollev, TS_STATS_SUM); + cs.evq_length = one_stats_get(&STATISTICS::evq_length, TS_STATS_AVG); + cs.evq_max = one_stats_get(&STATISTICS::evq_max, TS_STATS_MAX); + cs.blockingpolls = one_stats_get(&STATISTICS::blockingpolls, TS_STATS_SUM); + cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX); + cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX); + + for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++) + { + for (int j = 0; j < this_unit.n_workers; ++j) + { + Worker* pWorker = Worker::get(j); + ss_dassert(pWorker); + + cs.n_fds[i] += pWorker->statistics().n_fds[i]; + } + } + + for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i) + { + for (int j = 0; j < this_unit.n_workers; ++j) + { + Worker* pWorker = Worker::get(j); + ss_dassert(pWorker); + + cs.qtimes[i] += pWorker->statistics().qtimes[i]; + cs.exectimes[i] += pWorker->statistics().exectimes[i]; + } + + cs.qtimes[i] /= this_unit.n_workers; + cs.exectimes[i] /= this_unit.n_workers; + } + + return cs; +} + +//static +int64_t Worker::get_one_statistic(POLL_STAT what) +{ + int64_t rv = 0; + + int64_t Worker::STATISTICS::*member = NULL; + enum ts_stats_type approach; + + switch (what) + { + case POLL_STAT_READ: + member = &Worker::STATISTICS::n_read; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_WRITE: + member = &Worker::STATISTICS::n_write; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_ERROR: + member = &Worker::STATISTICS::n_error; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_HANGUP: + member = &Worker::STATISTICS::n_hup; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_ACCEPT: + member = &Worker::STATISTICS::n_accept; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_EVQ_LEN: + member = &Worker::STATISTICS::evq_length; + approach = TS_STATS_AVG; + break; + + case POLL_STAT_EVQ_MAX: + member = &Worker::STATISTICS::evq_max; + approach = TS_STATS_MAX; + break; + + case POLL_STAT_MAX_QTIME: + member = &Worker::STATISTICS::maxqtime; + approach = TS_STATS_MAX; + break; + + case POLL_STAT_MAX_EXECTIME: + member = &Worker::STATISTICS::maxexectime; + approach = TS_STATS_MAX; + break; + + default: + ss_dassert(!true); + } + + if (member) + { + rv = one_stats_get(member, approach); + } + + return rv; +} + +void Worker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal) +{ + *pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors); + *pnTotal = atomic_load_uint64(&m_nTotal_descriptors); +} + +bool Worker::add_fd(int fd, uint32_t events, MXS_POLL_DATA* pData) +{ + bool rv = true; + + // Must be edge-triggered. + events |= EPOLLET; + + struct epoll_event ev; + + ev.events = events; + ev.data.ptr = pData; + + pData->thread.id = m_id; + + if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0) + { + atomic_add_uint32(&m_nCurrent_descriptors, 1); + atomic_add_uint64(&m_nTotal_descriptors, 1); + } + else + { + resolve_poll_error(fd, errno, EPOLL_CTL_ADD); + rv = false; + } + + return rv; +} + +bool Worker::remove_fd(int fd) +{ + bool rv = true; + + struct epoll_event ev = {}; + + if (epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, &ev) == 0) + { + atomic_add_uint32(&m_nCurrent_descriptors, -1); + } + else + { + resolve_poll_error(fd, errno, EPOLL_CTL_DEL); + rv = false; + } + + return rv; +} + +Worker* Worker::get(int worker_id) +{ + ss_dassert(worker_id < this_unit.n_workers); + + return this_unit.ppWorkers[worker_id]; +} + +Worker* Worker::get_current() +{ + Worker* pWorker = NULL; + + int worker_id = get_current_id(); + + if (worker_id != WORKER_ABSENT_ID) + { + pWorker = Worker::get(worker_id); + } + + return pWorker; +} + +int Worker::get_current_id() +{ + return this_thread.current_worker_id; +} + +bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) +{ + // No logging here, function must be signal safe. + bool rval = true; + + if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this) + { + pTask->execute(*this); + + if (pSem) + { + pSem->post(); + } + } + else + { + intptr_t arg1 = reinterpret_cast(pTask); + intptr_t arg2 = reinterpret_cast(pSem); + + rval = post_message(MXS_WORKER_MSG_TASK, arg1, arg2); + } + + return rval; +} + +bool Worker::post(std::auto_ptr sTask, enum execute_mode_t mode) +{ + // No logging here, function must be signal safe. + return post_disposable(sTask.release(), mode); +} + +// private +bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) +{ + bool posted = true; + + pTask->inc_ref(); + + if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this) + { + pTask->execute(*this); + pTask->dec_ref(); + } + else + { + intptr_t arg1 = reinterpret_cast(pTask); + + posted = post_message(MXS_WORKER_MSG_DISPOSABLE_TASK, arg1, 0); + + if (!posted) + { + pTask->dec_ref(); + } + } + + return posted; +} + +//static +size_t Worker::broadcast(Task* pTask, Semaphore* pSem) +{ + // No logging here, function must be signal safe. + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->post(pTask, pSem)) + { + ++n; + } + } + + return n; +} + +//static +size_t Worker::broadcast(std::auto_ptr sTask) +{ + DisposableTask* pTask = sTask.release(); + pTask->inc_ref(); + + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->post_disposable(pTask)) + { + ++n; + } + } + + pTask->dec_ref(); + + return n; +} + +//static +size_t Worker::execute_serially(Task& task) +{ + Semaphore sem; + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->post(&task, &sem)) + { + sem.wait(); + ++n; + } + } + + return n; +} + +//static +size_t Worker::execute_concurrently(Task& task) +{ + Semaphore sem; + return sem.wait_n(Worker::broadcast(&task, &sem)); +} + +bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + // NOTE: No logging here, this function must be signal safe. + MessageQueue::Message message(msg_id, arg1, arg2); + + return m_pQueue->post(message); +} + +size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + // NOTE: No logging here, this function must be signal safe. + + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->post_message(msg_id, arg1, arg2)) + { + ++n; + } + } + + return n; +} + +void Worker::run() +{ + this_thread.current_worker_id = m_id; + + if (pre_run()) + { + poll_waitevents(); + + post_run(); + MXS_INFO("Worker %d has shut down.", m_id); + } + + this_thread.current_worker_id = WORKER_ABSENT_ID; +} + +bool Worker::start(size_t stack_size) +{ + m_started = true; + + if (!thread_start(&m_thread, &Worker::thread_main, this, stack_size)) + { + m_started = false; + } + + return m_started; +} + +void Worker::join() +{ + if (m_started) + { + MXS_INFO("Waiting for worker %d.", m_id); + thread_wait(m_thread); + MXS_INFO("Waited for worker %d.", m_id); + m_started = false; + } +} + +void Worker::shutdown() +{ + // NOTE: No logging here, this function must be signal safe. + + if (!m_shutdown_initiated) + { + if (post_message(MXS_WORKER_MSG_SHUTDOWN, 0, 0)) + { + m_shutdown_initiated = true; + } + } +} + +void Worker::shutdown_all() +{ + // NOTE: No logging here, this function must be signal safe. + ss_dassert((this_unit.n_workers == 0) || (this_unit.ppWorkers != NULL)); + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + pWorker->shutdown(); + } +} + +/** + * The worker message handler. + * + * @param msg_id The message id. + * @param arg1 Message specific first argument. + * @param arg2 Message specific second argument. + */ +void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& msg) +{ + switch (msg.id()) + { + case MXS_WORKER_MSG_PING: + { + ss_dassert(msg.arg1() == 0); + char* zArg2 = reinterpret_cast(msg.arg2()); + const char* zMessage = zArg2 ? zArg2 : "Alive and kicking"; + MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage); + MXS_FREE(zArg2); + } + break; + + case MXS_WORKER_MSG_SHUTDOWN: + { + MXS_INFO("Worker %d received shutdown message.", m_id); + m_should_shutdown = true; + } + break; + + case MXS_WORKER_MSG_CALL: + { + void (*f)(int, void*) = (void (*)(int, void*))msg.arg1(); + + f(m_id, (void*)msg.arg2()); + } + break; + + case MXS_WORKER_MSG_TASK: + { + Task *pTask = reinterpret_cast(msg.arg1()); + Semaphore* pSem = reinterpret_cast(msg.arg2()); + + pTask->execute(*this); + + if (pSem) + { + pSem->post(); + } + } + break; + + case MXS_WORKER_MSG_DISPOSABLE_TASK: + { + DisposableTask *pTask = reinterpret_cast(msg.arg1()); + pTask->execute(*this); + pTask->dec_ref(); + } + break; + + default: + MXS_ERROR("Worker received unknown message %d.", msg.id()); + } +} + +/** + * The entry point of each worker thread. + * + * @param arg A worker. + */ +//static +void Worker::thread_main(void* pArg) +{ + Worker* pWorker = static_cast(pArg); + pWorker->run(); +} + +// static +void Worker::resolve_poll_error(int fd, int errornum, int op) +{ + if (op == EPOLL_CTL_ADD) + { + if (EEXIST == errornum) + { + MXS_ERROR("File descriptor %d already present in an epoll instance.", fd); + return; + } + + if (ENOSPC == errornum) + { + MXS_ERROR("The limit imposed by /proc/sys/fs/epoll/max_user_watches was " + "reached when trying to add file descriptor %d to an epoll instance.", fd); + return; + } + } + else + { + ss_dassert(op == EPOLL_CTL_DEL); + + /* Must be removing */ + if (ENOENT == errornum) + { + MXS_ERROR("File descriptor %d was not found in epoll instance.", fd); + return; + } + } + + /* Common checks for add or remove - crash MaxScale */ + if (EBADF == errornum) + { + raise(SIGABRT); + } + if (EINVAL == errornum) + { + raise(SIGABRT); + } + if (ENOMEM == errornum) + { + raise(SIGABRT); + } + if (EPERM == errornum) + { + raise(SIGABRT); + } + + /* Undocumented error number */ + raise(SIGABRT); +} + +/** + * The main polling loop + */ +void Worker::poll_waitevents() +{ + struct epoll_event events[MAX_EVENTS]; + + m_state = IDLE; + + m_load.reset(); + + while (!should_shutdown()) + { + int nfds; + + m_state = POLLING; + + atomic_add_int64(&m_statistics.n_polls, 1); + + uint64_t now = Load::get_time(); + int timeout = Load::GRANULARITY - (now - m_load.start_time()); + + if (timeout < 0) + { + // If the processing of the last batch of events took us past the next + // time boundary, we ensure we return immediately. + timeout = 0; + } + + m_load.about_to_wait(now); + nfds = epoll_wait(m_epoll_fd, events, MAX_EVENTS, timeout); + m_load.about_to_work(); + + if (nfds == -1 && errno != EINTR) + { + int eno = errno; + errno = 0; + MXS_ERROR("%lu [poll_waitevents] epoll_wait returned " + "%d, errno %d", + pthread_self(), + nfds, + eno); + } + + if (nfds > 0) + { + m_statistics.evq_length = nfds; + if (nfds > m_statistics.evq_max) + { + m_statistics.evq_max = nfds; + } + + MXS_DEBUG("%lu [poll_waitevents] epoll_wait found %d fds", + pthread_self(), + nfds); + atomic_add_int64(&m_statistics.n_pollev, 1); + + m_state = PROCESSING; + + m_statistics.n_fds[(nfds < STATISTICS::MAXNFDS ? (nfds - 1) : STATISTICS::MAXNFDS - 1)]++; + } + + uint64_t cycle_start = mxs_clock(); + + for (int i = 0; i < nfds; i++) + { + /** Calculate event queue statistics */ + int64_t started = mxs_clock(); + int64_t qtime = started - cycle_start; + + if (qtime > STATISTICS::N_QUEUE_TIMES) + { + m_statistics.qtimes[STATISTICS::N_QUEUE_TIMES]++; + } + else + { + m_statistics.qtimes[qtime]++; + } + + m_statistics.maxqtime = MXS_MAX(m_statistics.maxqtime, qtime); + + MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; + + uint32_t actions = data->handler(data, m_id, events[i].events); + + if (actions & MXS_POLL_ACCEPT) + { + atomic_add_int64(&m_statistics.n_accept, 1); + } + + if (actions & MXS_POLL_READ) + { + atomic_add_int64(&m_statistics.n_read, 1); + } + + if (actions & MXS_POLL_WRITE) + { + atomic_add_int64(&m_statistics.n_write, 1); + } + + if (actions & MXS_POLL_HUP) + { + atomic_add_int64(&m_statistics.n_hup, 1); + } + + if (actions & MXS_POLL_ERROR) + { + atomic_add_int64(&m_statistics.n_error, 1); + } + + /** Calculate event execution statistics */ + qtime = mxs_clock() - started; + + if (qtime > STATISTICS::N_QUEUE_TIMES) + { + m_statistics.exectimes[STATISTICS::N_QUEUE_TIMES]++; + } + else + { + m_statistics.exectimes[qtime % STATISTICS::N_QUEUE_TIMES]++; + } + + m_statistics.maxexectime = MXS_MAX(m_statistics.maxexectime, qtime); + } + + epoll_tick(); + + m_state = IDLE; + } /*< while(1) */ + + m_state = STOPPED; +} + +} + + +namespace +{ + +class WorkerInfoTask: public maxscale::WorkerTask +{ +public: + WorkerInfoTask(const char* zHost, uint32_t nThreads) + : m_zHost(zHost) + { + m_data.resize(nThreads); + } + + void execute(Worker& worker) + { + json_t* pStats = json_object(); + const Worker::STATISTICS& s = worker.get_local_statistics(); + json_object_set_new(pStats, "reads", json_integer(s.n_read)); + json_object_set_new(pStats, "writes", json_integer(s.n_write)); + json_object_set_new(pStats, "errors", json_integer(s.n_error)); + json_object_set_new(pStats, "hangups", json_integer(s.n_hup)); + json_object_set_new(pStats, "accepts", json_integer(s.n_accept)); + json_object_set_new(pStats, "blocking_polls", json_integer(s.blockingpolls)); + json_object_set_new(pStats, "event_queue_length", json_integer(s.evq_length)); + json_object_set_new(pStats, "max_event_queue_length", json_integer(s.evq_max)); + json_object_set_new(pStats, "max_exec_time", json_integer(s.maxexectime)); + json_object_set_new(pStats, "max_queue_time", json_integer(s.maxqtime)); + + json_t* pAttr = json_object(); + json_object_set_new(pAttr, "stats", pStats); + + int idx = worker.get_current_id(); + stringstream ss; + ss << idx; + + json_t* pJson = json_object(); + json_object_set_new(pJson, CN_ID, json_string(ss.str().c_str())); + json_object_set_new(pJson, CN_TYPE, json_string(CN_THREADS)); + json_object_set_new(pJson, CN_ATTRIBUTES, pAttr); + json_object_set_new(pJson, CN_LINKS, mxs_json_self_link(m_zHost, CN_THREADS, ss.str().c_str())); + + ss_dassert((size_t)idx < m_data.size()); + m_data[idx] = pJson; + } + + json_t* resource() + { + json_t* pArr = json_array(); + + for (auto it = m_data.begin(); it != m_data.end(); it++) + { + json_array_append_new(pArr, *it); + } + + return mxs_json_resource(m_zHost, MXS_JSON_API_THREADS, pArr); + } + + json_t* resource(int id) + { + stringstream self; + self << MXS_JSON_API_THREADS << id; + return mxs_json_resource(m_zHost, self.str().c_str(), m_data[id]); + } + +private: + vector m_data; + const char* m_zHost; +}; + +} + +json_t* mxs_worker_to_json(const char* zHost, int id) +{ + Worker* target = Worker::get(id); + WorkerInfoTask task(zHost, id + 1); + mxs::Semaphore sem; + + target->post(&task, &sem); + sem.wait(); + + return task.resource(id); +} + +json_t* mxs_worker_list_to_json(const char* host) +{ + WorkerInfoTask task(host, config_threadcount()); + Worker::execute_concurrently(task); + return task.resource(); +} + +size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + return Worker::broadcast_message(msg_id, arg1, arg2); +} + +MXS_WORKER* mxs_worker_get(int worker_id) +{ + return Worker::get(worker_id); +} + +MXS_WORKER* mxs_worker_get_current() +{ + return Worker::get_current(); +} + +int mxs_worker_get_current_id() +{ + return Worker::get_current_id(); +} + +int mxs_worker_id(MXS_WORKER* pWorker) +{ + return static_cast(pWorker)->id(); +} + +bool mxs_worker_should_shutdown(MXS_WORKER* pWorker) +{ + return static_cast(pWorker)->should_shutdown(); +} + +bool mxs_worker_post_message(MXS_WORKER* pWorker, uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + return static_cast(pWorker)->post_message(msg_id, arg1, arg2); +} diff --git a/server/modules/authenticator/MySQLAuth/mysql_auth.c b/server/modules/authenticator/MySQLAuth/mysql_auth.c index d1665bdf6..7a4635aee 100644 --- a/server/modules/authenticator/MySQLAuth/mysql_auth.c +++ b/server/modules/authenticator/MySQLAuth/mysql_auth.c @@ -34,7 +34,7 @@ #include #include #include -#include +#include static void* mysql_auth_init(char **options); static bool mysql_auth_set_protocol_data(DCB *dcb, GWBUF *buf); @@ -137,7 +137,7 @@ static bool open_instance_database(const char *path, sqlite3 **handle) sqlite3* get_handle(MYSQL_AUTH* instance) { - int i = mxs_worker_get_current_id(); + int i = mxs_rworker_get_current_id(); ss_dassert(i >= 0); if (instance->handles[i] == NULL) diff --git a/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc b/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc index 5c5407f14..cd4ceea78 100644 --- a/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc +++ b/server/modules/protocol/MySQL/mariadbclient/mysql_client.cc @@ -33,10 +33,10 @@ #include #include #include +#include #include #include #include -#include #include "setparser.hh" #include "sqlmodeparser.hh" @@ -711,7 +711,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) // For the time being only the sql_mode is stored in MXS_SESSION::client_protocol_data. session->client_protocol_data = QC_SQL_MODE_DEFAULT; protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE; - ss_debug(bool check = ) mxs_worker_register_session(session); + ss_debug(bool check = ) mxs_rworker_register_session(session); ss_dassert(check); mxs_mysql_send_ok(dcb, next_sequence, 0, NULL); @@ -1439,7 +1439,7 @@ static int gw_client_close(DCB *dcb) { ss_dassert(target->state == SESSION_STATE_ROUTER_READY || target->state == SESSION_STATE_STOPPING); - ss_debug(bool removed = ) mxs_worker_deregister_session(target->ses_id); + ss_debug(bool removed = ) mxs_rworker_deregister_session(target->ses_id); ss_dassert(removed); session_close(target); } diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 8fc5bc408..e9e13de31 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -44,13 +44,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include static GWBUF *blr_make_query(DCB *dcb, char *query); static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); @@ -149,7 +149,7 @@ typedef enum static void blr_start_master(void* data) { ROUTER_INSTANCE *router = (ROUTER_INSTANCE*)data; - ss_dassert(mxs_worker_get_current_id() == 0); + ss_dassert(mxs_rworker_get_current() == mxs_rworker_get(MXS_RWORKER_MAIN)); if (router->client) { @@ -235,7 +235,7 @@ static void blr_start_master(void* data) * 'client' is the fake DCB that emulates a client session: * we need to set the poll.thread.id for the "dummy client" */ - client->session->client_dcb->poll.thread.id = mxs_worker_get_current_id(); + client->session->client_dcb->poll.thread.id = mxs_rworker_get_current_id(); /* Connect to configured master server */ if ((router->master = dcb_connect(router->service->dbref->server, @@ -329,7 +329,7 @@ bool blr_start_master_in_main(void* data) // The master should be connected to in the main worker, so we post it a // message and call `blr_start_master` there. - MXS_WORKER* worker = mxs_worker_get(0); // The worker running in the main thread. + MXS_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN); // The worker running in the main thread. ss_dassert(worker); intptr_t arg1 = (intptr_t)worker_cb_start_master; @@ -367,7 +367,7 @@ void blr_close_master_in_main(void* data) // The master should be connected to in the main worker, so we post it a // message and call `blr_master_close` there. - MXS_WORKER* worker = mxs_worker_get(0); // The worker running in the main thread. + MXS_WORKER* worker = mxs_rworker_get(MXS_RWORKER_MAIN); // The worker running in the main thread. ss_dassert(worker); intptr_t arg1 = (intptr_t)worker_cb_close_master; diff --git a/server/modules/routing/debugcli/debugcmd.c b/server/modules/routing/debugcli/debugcmd.c index 4253e2552..8651747cf 100644 --- a/server/modules/routing/debugcli/debugcmd.c +++ b/server/modules/routing/debugcli/debugcmd.c @@ -52,7 +52,7 @@ #include #include #include -#include +#include #include @@ -876,21 +876,9 @@ static void cmd_AddServer(DCB *dcb, SERVER *server, char *v1, char *v2, char *v3 */ void ping_workers(DCB* dcb) { - int n_workers = config_threadcount(); + int n = mxs_rworker_broadcast_message(MXS_WORKER_MSG_PING, 0, 0); - for (int i = 0; i < n_workers; ++i) - { - MXS_WORKER *worker = mxs_worker_get(i); - - if (mxs_worker_post_message(worker, MXS_WORKER_MSG_PING, 0, 0)) - { - dcb_printf(dcb, "Posted message to worker %d.\n", i); - } - else - { - dcb_printf(dcb, "Could not post message to worker %d: %s\n", i, mxs_strerror(errno)); - } - } + dcb_printf(dcb, "Broadcasted ping message to %d workers.\n", n); } struct subcommand pingoptions[] =