diff --git a/include/maxscale/messagequeue.hh b/include/maxscale/messagequeue.hh index 2d2e9fff2..b56d191df 100644 --- a/include/maxscale/messagequeue.hh +++ b/include/maxscale/messagequeue.hh @@ -193,9 +193,9 @@ public: private: MessageQueue(Handler* pHandler, int read_fd, int write_fd); - uint32_t handle_poll_events(int thread_id, uint32_t events); + uint32_t handle_poll_events(Worker* pWorker, uint32_t events); - static uint32_t poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events); + static uint32_t poll_handler(MXS_POLL_DATA* pData, void* worker, uint32_t events); private: Handler& m_handler; diff --git a/include/maxscale/poll_core.h b/include/maxscale/poll_core.h index 5390df3a3..eaaaa1616 100644 --- a/include/maxscale/poll_core.h +++ b/include/maxscale/poll_core.h @@ -39,12 +39,13 @@ struct mxs_poll_data; * 'struct mxs_poll_data' structure. * * @param data The `mxs_poll_data` instance that contained this function pointer. - * @param wid The worker thread id. + * @param worker The worker. * @param events The epoll events. * * @return A combination of mxs_poll_action_t enumeration values. */ -typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, int wid, uint32_t events); +// TODO: Change worker to mxs::Worker once this is C++-ified. +typedef uint32_t (*mxs_poll_handler_t)(struct mxs_poll_data* data, void* worker, uint32_t events); typedef struct mxs_poll_data { @@ -52,45 +53,4 @@ typedef struct mxs_poll_data void* owner; /*< Owning worker. */ } MXS_POLL_DATA; -/** - * A file descriptor should be added to the poll set of all workers. - */ -#define MXS_WORKER_ALL -1 - -/** - * Add a file descriptor with associated data to the poll set. - * - * @param wid `MXS_WORKER_ALL` if the file descriptor should be added to the - * poll set of all workers, `MXS_WORKER_ANY` if the file descriptor - * should be added to some worker, otherwise the id of a worker. - * @param fd The file descriptor to be added. - * @param events Mask of epoll event types. - * @param data The structure containing the file descriptor to be - * added. - * - * data->handler : Handler that knows how to deal with events - * for this particular type of 'struct mxs_poll_data'. - * data->thread.id: Will be updated by `poll_add_fd_to_worker`. - * - * @attention If the descriptor should be added to all workers, then the worker - * thread id will be 0. - * - * @attention The provided file descriptor *must* be non-blocking. - * - * @return True on success, false on failure. - */ -bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data); - - -/** - * Remove a file descriptor from a poll set. - * - * @param wid `MXS_WORKER_ALL` if the file descriptor should be removed from - * the poll set of all workers; otherwise the id of a worker. - * @param fd The file descriptor to be removed. - * - * @return True on success, false on failure. 
- */ -bool poll_remove_fd_from_worker(int wid, int fd); - MXS_END_DECLS diff --git a/include/maxscale/protocol/mariadb_client.hh b/include/maxscale/protocol/mariadb_client.hh index bee1aae21..3d4f17b98 100644 --- a/include/maxscale/protocol/mariadb_client.hh +++ b/include/maxscale/protocol/mariadb_client.hh @@ -59,7 +59,7 @@ public: private: static LocalClient* create(MXS_SESSION* session, const char* ip, uint64_t port); LocalClient(MXS_SESSION* session, int fd); - static uint32_t poll_handler(struct mxs_poll_data* data, int wid, uint32_t events); + static uint32_t poll_handler(struct mxs_poll_data* data, void* worker, uint32_t events); void process(uint32_t events); GWBUF* read_complete_packet(); void drain_queue(); diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 219bd474d..74167f629 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -45,7 +45,7 @@ enum mxs_worker_msg_id /** * Function call message. * - * arg1: Pointer to function with the prototype: void (*)(int thread_id, void* arg2); + * arg1: Pointer to function with the prototype: void (*)(MXS_WORKER*, void* arg2); * arg2: Second argument for the function passed in arg1. */ MXS_WORKER_MSG_CALL @@ -76,25 +76,4 @@ MXS_WORKER* mxs_worker_get_current(); */ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, intptr_t arg2); -/** - * @brief Convert a worker to JSON format - * - * @param host Hostname of this server - * @param id ID of the worker - * - * @return JSON resource representing the worker - */ -json_t* mxs_worker_to_json(const char* host, int id); - -/** - * Convert workers into JSON format - * - * @param host Hostname of this server - * - * @return A JSON resource collection of workers - * - * @see mxs_json_resource() - */ -json_t* mxs_worker_list_to_json(const char* host); - MXS_END_DECLS diff --git a/include/maxscale/worker.hh b/include/maxscale/worker.hh index 2e3debafd..27936909a 100644 --- a/include/maxscale/worker.hh +++ b/include/maxscale/worker.hh @@ -466,9 +466,9 @@ protected: virtual void tick() = 0; private: - uint32_t handle(int wid, uint32_t events); + uint32_t handle(Worker* pWorker, uint32_t events); - static uint32_t handler(MXS_POLL_DATA* pThis, int wid, uint32_t events); + static uint32_t handler(MXS_POLL_DATA* pThis, void* pWorker, uint32_t events); private: int m_fd; /**< The timerfd descriptor. */ @@ -578,16 +578,6 @@ public: Worker(); virtual ~Worker(); - /** - * Returns the id of the worker - * - * @return The id of the worker. - */ - int id() const - { - return m_id; - } - int load(Load::counter_t counter) { return m_load.percentage(counter); @@ -617,26 +607,6 @@ public: return m_statistics; } - /** - * Returns statistics for all workers. - * - * @return Combined statistics. - * - * @attentions The statistics may no longer be accurate by the time it has - * been returned. The returned values may also not represent a - * 100% consistent set. - */ - static STATISTICS get_statistics(); - - /** - * Return a specific combined statistic value. - * - * @param what What to return. - * - * @return The corresponding value. - */ - static int64_t get_one_statistic(POLL_STAT what); - /** * Return this worker's statistics. * @@ -775,71 +745,6 @@ public: return post(std::auto_ptr(sTask.release()), mode); } - /** - * Posts a task to all workers for execution. - * - * @param pTask The task to be executed. - * @param pSem If non-NULL, will be posted once per worker when the task's - * `execute` return. 
- * - * @return How many workers the task was posted to. - * - * @attention The very same task will be posted to all workers. The task - * should either not have any sharable data or then it should - * have data specific to each worker that can be accessed - * without locks. - */ - static size_t broadcast(Task* pTask, Semaphore* pSem = NULL); - - /** - * Posts a task to all workers for execution. - * - * @param pTask The task to be executed. - * - * @return How many workers the task was posted to. - * - * @attention The very same task will be posted to all workers. The task - * should either not have any sharable data or then it should - * have data specific to each worker that can be accessed - * without locks. - * - * @attention Once the task has been executed by all workers, it will - * be deleted. - */ - static size_t broadcast(std::auto_ptr sTask); - - template - static size_t broadcast(std::auto_ptr sTask) - { - return broadcast(std::auto_ptr(sTask.release())); - } - - /** - * Executes a task on all workers in serial mode (the task is executed - * on at most one worker thread at a time). When the function returns - * the task has been executed on all workers. - * - * @param task The task to be executed. - * - * @return How many workers the task was posted to. - * - * @warning This function is extremely inefficient and will be slow compared - * to the other functions. Only use this function when printing thread-specific - * data to stdout. - */ - static size_t execute_serially(Task& task); - - /** - * Executes a task on all workers concurrently and waits until all workers - * are done. That is, when the function returns the task has been executed - * by all workers. - * - * @param task The task to be executed. - * - * @return How many workers the task was posted to. - */ - static size_t execute_concurrently(Task& task); - /** * Post a message to a worker. * @@ -857,46 +762,6 @@ public: */ bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); - /** - * Broadcast a message to all worker. - * - * @param msg_id The message id. - * @param arg1 Message specific first argument. - * @param arg2 Message specific second argument. - * - * @return The number of messages posted; if less that ne number of workers - * then some postings failed. - * - * @attention The return value tells *only* whether message could be posted, - * *not* that it has reached the worker. - * - * @attentsion Exactly the same arguments are passed to all workers. Take that - * into account if the passed data must be freed. - * - * @attention This function is signal safe. - */ - static size_t broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); - - /** - * Initate shutdown of all workers. - * - * @attention A call to this function will only initiate the shutdowm, - * the workers will not have shut down when the function returns. - * - * @attention This function is signal safe. - */ - static void shutdown_all(); - - /** - * Return the worker associated with the provided worker id. - * - * @param worker_id A worker id. - * - * @return The corresponding worker instance, or NULL if the id does - * not correspond to a worker. - */ - static Worker* get(int worker_id); - /** * Return the worker associated with the current thread. * @@ -904,13 +769,6 @@ public: */ static Worker* get_current(); - /** - * Return the worker id associated with the current thread. - * - * @return A worker instance, or -1 if the current thread does not have a worker. 
- */ - static int get_current_id(); - /** * Push a function for delayed execution. * @@ -1034,6 +892,21 @@ public: bool cancel_delayed_call(uint32_t id); protected: + const int m_epoll_fd; /*< The epoll file descriptor. */ + state_t m_state; /*< The state of the worker */ + + static void inc_ref(WorkerDisposableTask* pTask) + { + pTask->inc_ref(); + } + + static void dec_ref(WorkerDisposableTask* pTask) + { + pTask->dec_ref(); + } + + bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); + /** * Called by Worker::run() before starting the epoll loop. * @@ -1067,11 +940,6 @@ protected: */ static void resolve_poll_error(int fd, int err, int op); -protected: - const int m_id; /*< The id of the worker. */ - const int m_epoll_fd; /*< The epoll file descriptor. */ - state_t m_state; /*< The state of the worker */ - private: class DelayedCall; friend class DelayedCall; @@ -1259,8 +1127,6 @@ private: uint32_t add_delayed_call(DelayedCall* pDelayed_call); void adjust_timer(); - bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); - void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override static void thread_main(void* arg); diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 9ea12fdb9..025bd0218 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -115,11 +115,11 @@ static int dcb_listen_create_socket_unix(const char *path); static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen); static void dcb_add_to_all_list(DCB *dcb); static void dcb_add_to_list(DCB *dcb); -static bool dcb_add_to_worker(int worker_id, DCB *dcb, uint32_t events); +static bool dcb_add_to_worker(Worker* worker, DCB *dcb, uint32_t events); static DCB *dcb_find_free(); static void dcb_remove_from_list(DCB *dcb); -static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events); +static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, void* worker, uint32_t events); static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev); static bool dcb_session_check(DCB *dcb, const char *); static int upstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata); @@ -1102,12 +1102,12 @@ void dcb_close(DCB *dcb) CHK_DCB(dcb); #if defined(SS_DEBUG) - int wid = Worker::get_current_id(); + RoutingWorker* current = RoutingWorker::get_current(); RoutingWorker* owner = static_cast(dcb->poll.owner); - if ((wid != -1) && (owner->id() != wid)) + if (current && (current != owner)) { MXS_ALERT("dcb_close(%p) called by %d, owned by %d.", - dcb, wid, owner->id()); + dcb, current->id(), owner->id()); ss_dassert(owner == RoutingWorker::get_current()); } #endif @@ -1166,7 +1166,7 @@ void dcb_close(DCB *dcb) } } -static void cb_dcb_close_in_owning_thread(int worker_id, void* data) +static void cb_dcb_close_in_owning_thread(MXS_WORKER*, void* data) { DCB* dcb = static_cast(data); ss_dassert(dcb); @@ -1197,13 +1197,13 @@ void dcb_close_in_owning_thread(DCB* dcb) void dcb_final_close(DCB* dcb) { #if defined(SS_DEBUG) - int wid = Worker::get_current_id(); + RoutingWorker* current = RoutingWorker::get_current(); RoutingWorker* owner = static_cast(dcb->poll.owner); - if ((wid != -1) && (owner->id() != wid)) + if (current && (current != owner)) { MXS_ALERT("dcb_final_close(%p) called by %d, owned by %d.", - dcb, wid, owner->id()); - ss_dassert(owner->id() == Worker::get_current_id()); + dcb, current->id(), owner->id()); + ss_dassert(owner == current); } #endif ss_dassert(dcb->n_close 
!= 0); @@ -2000,9 +2000,12 @@ dcb_call_callback(DCB *dcb, DCB_REASON reason) } } -static void dcb_hangup_foreach_worker(int thread_id, struct server* server) +static void dcb_hangup_foreach_worker(MXS_WORKER* worker, struct server* server) { - for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) + RoutingWorker* rworker = static_cast(worker); + int id = rworker->id(); + + for (DCB *dcb = this_unit.all_dcbs[id]; dcb; dcb = dcb->thread.next) { if (dcb->state == DCB_STATE_POLLING && dcb->server && dcb->server == server) @@ -2024,7 +2027,7 @@ dcb_hangup_foreach(struct server* server) intptr_t arg1 = (intptr_t)dcb_hangup_foreach_worker; intptr_t arg2 = (intptr_t)server; - Worker::broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2); + RoutingWorker::broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2); } /** @@ -2917,7 +2920,8 @@ public: void execute(Worker& worker) { - int thread_id = worker.id(); + RoutingWorker& rworker = static_cast(worker); + int thread_id = rworker.id(); for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); @@ -2950,7 +2954,7 @@ private: bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data) { SerialDcbTask task(func, data); - Worker::execute_serially(task); + RoutingWorker::execute_serially(task); return task.more(); } @@ -2967,7 +2971,8 @@ public: void execute(Worker& worker) { - int thread_id = worker.id(); + RoutingWorker& rworker = static_cast(worker); + int thread_id = rworker.id(); for (DCB *dcb = this_unit.all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) { @@ -2986,7 +2991,7 @@ private: void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data) { ParallelDcbTask task(func, data); - Worker::execute_concurrently(task); + RoutingWorker::execute_concurrently(task); } int dcb_get_port(const DCB *dcb) @@ -3014,7 +3019,7 @@ int dcb_get_port(const DCB *dcb) static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t events) { RoutingWorker* owner = static_cast(dcb->poll.owner); - ss_dassert(owner->id() == mxs::Worker::get_current_id() || + ss_dassert(owner == RoutingWorker::get_current() || dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); CHK_DCB(dcb); @@ -3220,7 +3225,7 @@ static uint32_t dcb_handler(DCB* dcb, uint32_t events) return rv; } -static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events) +static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, void* worker, uint32_t events) { uint32_t rval = 0; DCB *dcb = (DCB*)data; @@ -3277,7 +3282,10 @@ public: void execute(Worker& worker) { - if (dcb_is_still_valid(m_dcb, worker.get_current_id())) + ss_dassert(&worker == RoutingWorker::get_current()); + + RoutingWorker& rworker = static_cast(worker); + if (dcb_is_still_valid(m_dcb, rworker.id())) { m_dcb->fakeq = m_buffer; dcb_handler(m_dcb, m_ev); @@ -3423,7 +3431,7 @@ public: ss_dassert(rworker.id() == static_cast(m_dcb->poll.owner)->id()); - bool added = dcb_add_to_worker(worker.id(), m_dcb, m_events); + bool added = dcb_add_to_worker(&rworker, m_dcb, m_events); ss_dassert(added); if (!added) @@ -3439,20 +3447,54 @@ private: } -static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events) +static bool add_fd_to_routing_workers(int fd, uint32_t events, MXS_POLL_DATA* data) +{ + bool rv = true; + void* previous_owner = data->owner; + + rv = RoutingWorker::add_shared_fd(fd, events, data); + + if (rv) + { + // The DCB will appear on the list of the calling thread. 
+ RoutingWorker* worker = RoutingWorker::get_current(); + + if (!worker) + { + // TODO: Listeners are created before the workers have been started. + // TODO: Hence there will be no current worker. So, we just store them + // TODO: in the main worker. + worker = RoutingWorker::get(RoutingWorker::MAIN); + } + + data->owner = worker; + } + else + { + // Restore the situation. + data->owner = previous_owner; + } + + return rv; +} + +static bool dcb_add_to_worker(Worker* worker, DCB* dcb, uint32_t events) { bool rv = false; - if (worker_id == MXS_WORKER_ALL) + if (!worker) { - // A listening DCB, we add it immediately (poll_add_fd_to_worker() is thread-safe). - if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb)) + // No specific worker; indicates the DCB is a listening DCB. + ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER); + + // A listening DCB, we add it immediately. + if (add_fd_to_routing_workers(dcb->fd, events, (MXS_POLL_DATA*)dcb)) { // If this takes place on the main thread (all listening DCBs are - // stored on the main thread), + // stored on the main thread)... if (dcb->poll.owner == RoutingWorker::get_current()) { - // we'll add it immediately to the list, + // ..we'll add it immediately to the list, dcb_add_to_list(dcb); } else @@ -3476,13 +3518,13 @@ static bool dcb_add_to_worker(int worker_id, DCB* dcb, uint32_t events) } else { - ss_dassert(worker_id == static_cast(dcb->poll.owner)->id()); + ss_dassert(worker == dcb->poll.owner); - if (worker_id == RoutingWorker::get_current_id()) + if (worker == RoutingWorker::get_current()) { // If the DCB should end up on the current thread, we can both add it // to the epoll-instance and to the DCB book-keeping immediately. - if (poll_add_fd_to_worker(worker_id, dcb->fd, events, (MXS_POLL_DATA*)dcb)) + if (worker->add_fd(dcb->fd, events, (MXS_POLL_DATA*)dcb)) { dcb_add_to_list(dcb); rv = true; @@ -3534,7 +3576,7 @@ int poll_add_dcb(DCB *dcb) /** Choose new state and worker thread ID according to the role of DCB. */ dcb_state_t new_state; - int worker_id = 0; + RoutingWorker* owner = nullptr; if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) { @@ -3545,7 +3587,6 @@ int poll_add_dcb(DCB *dcb) */ events = EPOLLIN; new_state = DCB_STATE_LISTENING; - worker_id = MXS_WORKER_ALL; } else if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER && (strcasecmp(dcb->service->routerModule, "cli") == 0 || @@ -3557,9 +3598,8 @@ int poll_add_dcb(DCB *dcb) // handled by different worker threads. // See: https://jira.mariadb.org/browse/MXS-1805 and https://jira.mariadb.org/browse/MXS-1833 new_state = DCB_STATE_POLLING; - RoutingWorker* owner = RoutingWorker::get(RoutingWorker::MAIN); + owner = RoutingWorker::get(RoutingWorker::MAIN); dcb->poll.owner = owner; - worker_id = owner->id(); } else { @@ -3569,7 +3609,7 @@ int poll_add_dcb(DCB *dcb) ss_dassert(RoutingWorker::get_current() == dcb->poll.owner); new_state = DCB_STATE_POLLING; - worker_id = static_cast(dcb->poll.owner)->id(); + owner = static_cast(dcb->poll.owner); } /** @@ -3581,7 +3621,7 @@ int poll_add_dcb(DCB *dcb) int rc = 0; - if (!dcb_add_to_worker(worker_id, dcb, events)) + if (!dcb_add_to_worker(owner, dcb, events)) { /** * We failed to add the DCB to a worker. 
Revert the state so that it @@ -3630,24 +3670,24 @@ int poll_remove_dcb(DCB *dcb) if (dcbfd > 0) { - int worker_id; + rc = -1; if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) { - worker_id = MXS_WORKER_ALL; + if (RoutingWorker::remove_shared_fd(dcbfd)) + { + rc = 0; + } } else { - worker_id = static_cast(dcb->poll.owner)->id(); - } + Worker* worker = static_cast(dcb->poll.owner); + ss_dassert(worker); - if (poll_remove_fd_from_worker(worker_id, dcbfd)) - { - rc = 0; - } - else - { - rc = -1; + if (worker->remove_fd(dcbfd)) + { + rc = 0; + } } } return rc; diff --git a/server/core/internal/routingworker.hh b/server/core/internal/routingworker.hh index c817bb830..e263c8aa0 100644 --- a/server/core/internal/routingworker.hh +++ b/server/core/internal/routingworker.hh @@ -82,6 +82,16 @@ public: */ static bool remove_shared_fd(int fd); + /** + * Returns the id of the routing worker + * + * @return The id of the routing worker. + */ + int id() const + { + return m_id; + } + /** * Register zombie for later deletion. * @@ -146,7 +156,130 @@ public: */ static void set_maxwait(unsigned int maxwait); + /** + * Posts a task to all workers for execution. + * + * @param pTask The task to be executed. + * @param pSem If non-NULL, will be posted once per worker when the task's + * `execute` returns. + * + * @return How many workers the task was posted to. + * + * @attention The very same task will be posted to all workers. The task + * should either not have any sharable data or it should + * have data specific to each worker that can be accessed + * without locks. + */ + static size_t broadcast(Task* pTask, Semaphore* pSem = NULL); + + /** + * Posts a task to all workers for execution. + * + * @param pTask The task to be executed. + * + * @return How many workers the task was posted to. + * + * @attention The very same task will be posted to all workers. The task + * should either not have any sharable data or it should + * have data specific to each worker that can be accessed + * without locks. + * + * @attention Once the task has been executed by all workers, it will + * be deleted. + */ + static size_t broadcast(std::auto_ptr sTask); + + template + static size_t broadcast(std::auto_ptr sTask) + { + return broadcast(std::auto_ptr(sTask.release())); + } + + /** + * Executes a task on all workers in serial mode (the task is executed + * on at most one worker thread at a time). When the function returns + * the task has been executed on all workers. + * + * @param task The task to be executed. + * + * @return How many workers the task was posted to. + * + * @warning This function is extremely inefficient and will be slow compared + * to the other functions. Only use this function when printing thread-specific + * data to stdout. + */ + static size_t execute_serially(Task& task); + + /** + * Executes a task on all workers concurrently and waits until all workers + * are done. That is, when the function returns the task has been executed + * by all workers. + * + * @param task The task to be executed. + * + * @return How many workers the task was posted to. + */ + static size_t execute_concurrently(Task& task); + + /** + * Broadcast a message to all workers. + * + * @param msg_id The message id. + * @param arg1 Message specific first argument. + * @param arg2 Message specific second argument. + + * + * @return The number of messages posted; if less than the number of workers + * then some postings failed.
+ * + * @attention The return value tells *only* whether the message could be posted, + * *not* that it has reached the worker. + * + * @attention Exactly the same arguments are passed to all workers. Take that + * into account if the passed data must be freed. + * + * @attention This function is signal safe. + */ + static size_t broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + + /** + * Initiate shutdown of all workers. + * + * @attention A call to this function will only initiate the shutdown, + * the workers will not have shut down when the function returns. + * + * @attention This function is signal safe. + */ + static void shutdown_all(); + + /** + * Returns statistics for all workers. + * + * @return Combined statistics. + * + * @attention The statistics may no longer be accurate by the time they have + * been returned. The returned values may also not represent a + * 100% consistent set. + */ + static STATISTICS get_statistics(); + + /** + * Return a specific combined statistic value. + * + * @param what What to return. + * + * @return The corresponding value. + */ + static int64_t get_one_statistic(POLL_STAT what); + private: + const int m_id; /*< The id of the worker. */ + SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map + * should contain sessions exclusive to this + * worker and not e.g. listener sessions. For now, + * it's up to the protocol to decide whether a new + * session is added to the map. */ + Zombies m_zombies; /*< DCBs to be deleted. */ + RoutingWorker(); virtual ~RoutingWorker(); @@ -158,16 +291,29 @@ private: void delete_zombies(); - static uint32_t epoll_instance_handler(struct mxs_poll_data* data, int wid, uint32_t events); + static uint32_t epoll_instance_handler(struct mxs_poll_data* data, void* worker, uint32_t events); uint32_t handle_epoll_events(uint32_t events); - -private: - SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map - * should contain sessions exclusive to this - * worker and not e.g. listener sessions. For now, - * it's up to the protocol to decide whether a new - * session is added to the map. */ - Zombies m_zombies; /*< DCBs to be deleted. */ }; } + +/** + * @brief Convert a routing worker to JSON format + * + * @param host Hostname of this server + * @param id ID of the worker + * + * @return JSON resource representing the worker + */ +json_t* mxs_rworker_to_json(const char* host, int id); + +/** + * Convert routing workers into JSON format + * + * @param host Hostname of this server + * + * @return A JSON resource collection of workers + * + * @see mxs_json_resource() + */ +json_t* mxs_rworker_list_to_json(const char* host); diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc index c39e85572..9901a9e60 100644 --- a/server/core/messagequeue.cc +++ b/server/core/messagequeue.cc @@ -220,10 +220,12 @@ Worker* MessageQueue::remove_from_worker() return pWorker; } -uint32_t MessageQueue::handle_poll_events(int thread_id, uint32_t events) +uint32_t MessageQueue::handle_poll_events(Worker* pWorker, uint32_t events) { uint32_t rc = MXS_POLL_NOP; + ss_dassert(pWorker == m_pWorker); + // We only expect EPOLLIN events.
ss_dassert(((events & EPOLLIN) != 0) && ((events & ~EPOLLIN) == 0)); @@ -268,11 +270,11 @@ uint32_t MessageQueue::handle_poll_events(int thread_id, uint32_t events) } //static -uint32_t MessageQueue::poll_handler(MXS_POLL_DATA* pData, int thread_id, uint32_t events) +uint32_t MessageQueue::poll_handler(MXS_POLL_DATA* pData, void* pWorker, uint32_t events) { MessageQueue* pThis = static_cast(pData); - return pThis->handle_poll_events(thread_id, events); + return pThis->handle_poll_events(static_cast(pWorker), events); } } diff --git a/server/core/misc.cc b/server/core/misc.cc index aff2e09b2..f8aea36e5 100644 --- a/server/core/misc.cc +++ b/server/core/misc.cc @@ -15,9 +15,8 @@ #include -#include - #include "internal/maxscale.h" +#include "internal/routingworker.hh" #include "internal/service.h" static time_t started; @@ -46,7 +45,7 @@ int maxscale_shutdown() if (n == 0) { service_shutdown(); - mxs::Worker::shutdown_all(); + mxs::RoutingWorker::shutdown_all(); } return n + 1; diff --git a/server/core/poll.cc b/server/core/poll.cc index c6d1cc4fc..aac798c2f 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -54,97 +54,6 @@ poll_init() n_threads = config_threadcount(); } -static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data) -{ - ss_dassert((wid >= 0) && (wid <= n_threads)); - - Worker* worker = Worker::get(wid); - ss_dassert(worker); - - return worker->add_fd(fd, events, data); -} - -static bool add_fd_to_routing_workers(int fd, uint32_t events, MXS_POLL_DATA* data) -{ - bool rv = true; - void* previous_owner = data->owner; - - rv = RoutingWorker::add_shared_fd(fd, events, data); - - if (rv) - { - // The DCB will appear on the list of the calling thread. - RoutingWorker* worker = RoutingWorker::get_current(); - - if (!worker) - { - // TODO: Listeners are created before the workers have been started. - // TODO: Hence the returned id will be -1. We change it to 0, which in - // TODO: practice will mean that they will end up on the Worker running - // TODO: in the main thread. This needs to be sorted out. - worker = RoutingWorker::get(RoutingWorker::MAIN); - } - - data->owner = worker; - } - else - { - // Restore the situation. - data->owner = previous_owner; - } - - return rv; -} - -bool poll_add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data) -{ - bool rv; - - if (wid == MXS_WORKER_ALL) - { - rv = add_fd_to_routing_workers(fd, events, data); - } - else - { - ss_dassert((wid >= 0) && (wid < n_threads)); - - rv = add_fd_to_worker(wid, fd, events, data); - } - - return rv; -} - -static bool remove_fd_from_worker(int wid, int fd) -{ - ss_dassert((wid >= 0) && (wid < n_threads)); - - Worker* worker = Worker::get(wid); - ss_dassert(worker); - - return worker->remove_fd(fd); -} - -static bool remove_fd_from_routing_workers(int fd) -{ - return RoutingWorker::remove_shared_fd(fd); -} - -bool poll_remove_fd_from_worker(int wid, int fd) -{ - bool rv; - - if (wid == MXS_WORKER_ALL) - { - rv = remove_fd_from_routing_workers(fd); - } - else - { - rv = remove_fd_from_worker(wid, fd); - } - - return rv; -} - /** * Set the number of non-blocking poll cycles that will be done before * a blocking poll will take place. Whenever an event arrives on a thread @@ -196,7 +105,7 @@ dprintPollStats(DCB *dcb) { int i; - Worker::STATISTICS s = Worker::get_statistics(); + Worker::STATISTICS s = RoutingWorker::get_statistics(); dcb_printf(dcb, "\nPoll Statistics.\n\n"); dcb_printf(dcb, "No. 
of epoll cycles: %" PRId64 "\n", s.n_polls); @@ -238,7 +147,7 @@ dShowThreads(DCB *dcb) dcb_printf(dcb, "----+------------+---------------------+---------------------+-----------+-----------+-----------+\n"); for (int i = 0; i < n_threads; i++) { - Worker* worker = Worker::get(i); + Worker* worker = RoutingWorker::get(i); ss_dassert(worker); const char *state = "Unknown"; @@ -288,7 +197,7 @@ dShowEventStats(DCB *pdcb) { int i; - Worker::STATISTICS s = Worker::get_statistics(); + Worker::STATISTICS s = RoutingWorker::get_statistics(); dcb_printf(pdcb, "\nEvent statistics.\n"); dcb_printf(pdcb, "Maximum queue time: %3" PRId64 "00ms\n", s.maxqtime); @@ -320,7 +229,7 @@ dShowEventStats(DCB *pdcb) int64_t poll_get_stat(POLL_STAT what) { - return Worker::get_one_statistic(what); + return RoutingWorker::get_one_statistic(what); } namespace @@ -399,7 +308,7 @@ eventTimesGetList() } data->rowno = 0; - data->stats = Worker::get_statistics(); + data->stats = RoutingWorker::get_statistics(); if ((set = resultset_create(eventTimesRowCallback, data)) == NULL) { diff --git a/server/core/resource.cc b/server/core/resource.cc index 8c83e4775..d545b12cc 100644 --- a/server/core/resource.cc +++ b/server/core/resource.cc @@ -574,13 +574,13 @@ HttpResponse cb_flush(const HttpRequest& request) HttpResponse cb_all_threads(const HttpRequest& request) { - return HttpResponse(MHD_HTTP_OK, mxs_worker_list_to_json(request.host())); + return HttpResponse(MHD_HTTP_OK, mxs_rworker_list_to_json(request.host())); } HttpResponse cb_thread(const HttpRequest& request) { int id = atoi(request.last_uri_part().c_str()); - return HttpResponse(MHD_HTTP_OK, mxs_worker_to_json(request.host(), id)); + return HttpResponse(MHD_HTTP_OK, mxs_rworker_to_json(request.host(), id)); } HttpResponse cb_tasks(const HttpRequest& request) @@ -1187,7 +1187,7 @@ private: HttpResponse resource_handle_request(const HttpRequest& request) { - mxs::Worker* worker = mxs::Worker::get(0); + mxs::Worker* worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN); mxs::Semaphore sem; ResourceTask task(request); diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index 472dbd6f8..2a6f5b10e 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -60,6 +60,7 @@ struct this_unit bool initialized; // Whether the initialization has been performed. int nWorkers; // How many routing workers there are. RoutingWorker** ppWorkers; // Array of routing worker instances. + int next_worker_id; // Next worker id // DEPRECATED in 2.3, remove in 2.4. int number_poll_spins; // Maximum non-block polls // DEPRECATED in 2.3, remove in 2.4. @@ -73,6 +74,7 @@ struct this_unit false, // initialized 0, // nWorkers NULL, // ppWorkers + 0, // next_worker_id 0, // number_poll_spins 0, // max_poll_sleep -1, // epoll_listener_fd @@ -81,6 +83,11 @@ struct this_unit WORKER_ABSENT_ID, // id_max_worker }; +int next_worker_id() +{ + return atomic_add(&this_unit.next_worker_id, 1); +} + thread_local struct this_thread { int current_worker_id; // The worker id of the current thread @@ -161,6 +168,7 @@ namespace maxscale { RoutingWorker::RoutingWorker() + : m_id(next_worker_id()) { MXS_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler; MXS_POLL_DATA::owner = this; @@ -541,12 +549,12 @@ void RoutingWorker::epoll_tick() * @return What actions were performed. 
*/ //static -uint32_t RoutingWorker::epoll_instance_handler(struct mxs_poll_data* pData, int wid, uint32_t events) +uint32_t RoutingWorker::epoll_instance_handler(struct mxs_poll_data* pData, void* pWorker, uint32_t events) { - RoutingWorker* pWorker = static_cast(pData); - ss_dassert(pWorker->m_id == wid); + RoutingWorker* pThis = static_cast(pData); + ss_dassert(pThis == pWorker); - return pWorker->handle_epoll_events(events); + return pThis->handle_epoll_events(events); } /** @@ -578,12 +586,288 @@ uint32_t RoutingWorker::handle_epoll_events(uint32_t events) MXS_DEBUG("1 event for worker %d.", m_id); MXS_POLL_DATA* pData = static_cast(epoll_events[0].data.ptr); - actions = pData->handler(pData, m_id, epoll_events[0].events); + actions = pData->handler(pData, this, epoll_events[0].events); } return actions; } +//static +size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem) +{ + // No logging here, function must be signal safe. + size_t n = 0; + + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + if (pWorker->post(pTask, pSem)) + { + ++n; + } + } + + return n; +} + +//static +size_t RoutingWorker::broadcast(std::auto_ptr sTask) +{ + DisposableTask* pTask = sTask.release(); + Worker::inc_ref(pTask); + + size_t n = 0; + + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) + { + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + if (pWorker->post_disposable(pTask)) + { + ++n; + } + } + + Worker::dec_ref(pTask); + + return n; +} + +//static +size_t RoutingWorker::execute_serially(Task& task) +{ + Semaphore sem; + size_t n = 0; + + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) + { + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + if (pWorker->post(&task, &sem)) + { + sem.wait(); + ++n; + } + } + + return n; +} + +//static +size_t RoutingWorker::execute_concurrently(Task& task) +{ + Semaphore sem; + return sem.wait_n(RoutingWorker::broadcast(&task, &sem)); +} + +// static +size_t RoutingWorker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) +{ + // NOTE: No logging here, this function must be signal safe. + + size_t n = 0; + + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + if (pWorker->post_message(msg_id, arg1, arg2)) + { + ++n; + } + } + + return n; +} + +// static +void RoutingWorker::shutdown_all() +{ + // NOTE: No logging here, this function must be signal safe. + ss_dassert((this_unit.next_worker_id == 0) || (this_unit.ppWorkers != NULL)); + + int nWorkers = this_unit.next_worker_id; + for (int i = 0; i < nWorkers; ++i) + { + RoutingWorker* pWorker = this_unit.ppWorkers[i]; + ss_dassert(pWorker); + + pWorker->shutdown(); + } +} + +namespace +{ + +int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type) +{ + int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? 
LONG_MAX : 0); + + int nWorkers = this_unit.next_worker_id; + + for (int i = 0; i < nWorkers; ++i) + { + RoutingWorker* pWorker = RoutingWorker::get(i); + ss_dassert(pWorker); + + const Worker::STATISTICS& s = pWorker->statistics(); + + int64_t value = s.*what; + + switch (type) + { + case TS_STATS_MAX: + if (value > best) + { + best = value; + } + break; + + case TS_STATS_MIX: + if (value < best) + { + best = value; + } + break; + + case TS_STATS_AVG: + case TS_STATS_SUM: + best += value; + break; + } + } + + return type == TS_STATS_AVG ? best / (nWorkers != 0 ? nWorkers : 1) : best; +} + +} + +//static +Worker::STATISTICS RoutingWorker::get_statistics() +{ + STATISTICS cs; + + cs.n_read = one_stats_get(&STATISTICS::n_read, TS_STATS_SUM); + cs.n_write = one_stats_get(&STATISTICS::n_write, TS_STATS_SUM); + cs.n_error = one_stats_get(&STATISTICS::n_error, TS_STATS_SUM); + cs.n_hup = one_stats_get(&STATISTICS::n_hup, TS_STATS_SUM); + cs.n_accept = one_stats_get(&STATISTICS::n_accept, TS_STATS_SUM); + cs.n_polls = one_stats_get(&STATISTICS::n_polls, TS_STATS_SUM); + cs.n_pollev = one_stats_get(&STATISTICS::n_pollev, TS_STATS_SUM); + cs.n_nbpollev = one_stats_get(&STATISTICS::n_nbpollev, TS_STATS_SUM); + cs.evq_length = one_stats_get(&STATISTICS::evq_length, TS_STATS_AVG); + cs.evq_max = one_stats_get(&STATISTICS::evq_max, TS_STATS_MAX); + cs.blockingpolls = one_stats_get(&STATISTICS::blockingpolls, TS_STATS_SUM); + cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX); + cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX); + + for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++) + { + for (int j = 0; j < this_unit.next_worker_id; ++j) + { + Worker* pWorker = RoutingWorker::get(j); + ss_dassert(pWorker); + + cs.n_fds[i] += pWorker->statistics().n_fds[i]; + } + } + + for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i) + { + int nWorkers = this_unit.next_worker_id; + + for (int j = 0; j < nWorkers; ++j) + { + Worker* pWorker = RoutingWorker::get(j); + ss_dassert(pWorker); + + cs.qtimes[i] += pWorker->statistics().qtimes[i]; + cs.exectimes[i] += pWorker->statistics().exectimes[i]; + } + + cs.qtimes[i] /= (nWorkers != 0 ? nWorkers : 1); + cs.exectimes[i] /= (nWorkers != 0 ? 
nWorkers : 1); + } + + return cs; +} + +//static +int64_t RoutingWorker::get_one_statistic(POLL_STAT what) +{ + int64_t rv = 0; + + int64_t Worker::STATISTICS::*member = NULL; + enum ts_stats_type approach; + + switch (what) + { + case POLL_STAT_READ: + member = &Worker::STATISTICS::n_read; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_WRITE: + member = &Worker::STATISTICS::n_write; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_ERROR: + member = &Worker::STATISTICS::n_error; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_HANGUP: + member = &Worker::STATISTICS::n_hup; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_ACCEPT: + member = &Worker::STATISTICS::n_accept; + approach = TS_STATS_SUM; + break; + + case POLL_STAT_EVQ_LEN: + member = &Worker::STATISTICS::evq_length; + approach = TS_STATS_AVG; + break; + + case POLL_STAT_EVQ_MAX: + member = &Worker::STATISTICS::evq_max; + approach = TS_STATS_MAX; + break; + + case POLL_STAT_MAX_QTIME: + member = &Worker::STATISTICS::maxqtime; + approach = TS_STATS_MAX; + break; + + case POLL_STAT_MAX_EXECTIME: + member = &Worker::STATISTICS::maxexectime; + approach = TS_STATS_MAX; + break; + + default: + ss_dassert(!true); + } + + if (member) + { + rv = one_stats_get(member, approach); + } + + return rv; +} + } size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) @@ -626,3 +910,107 @@ int mxs_rworker_get_current_id() { return RoutingWorker::get_current_id(); } + +namespace +{ + +using namespace maxscale; + +class WorkerInfoTask: public maxscale::WorkerTask +{ +public: + WorkerInfoTask(const char* zHost, uint32_t nThreads) + : m_zHost(zHost) + { + m_data.resize(nThreads); + } + + void execute(Worker& worker) + { + RoutingWorker& rworker = static_cast(worker); + + json_t* pStats = json_object(); + const Worker::STATISTICS& s = rworker.get_local_statistics(); + json_object_set_new(pStats, "reads", json_integer(s.n_read)); + json_object_set_new(pStats, "writes", json_integer(s.n_write)); + json_object_set_new(pStats, "errors", json_integer(s.n_error)); + json_object_set_new(pStats, "hangups", json_integer(s.n_hup)); + json_object_set_new(pStats, "accepts", json_integer(s.n_accept)); + json_object_set_new(pStats, "blocking_polls", json_integer(s.blockingpolls)); + json_object_set_new(pStats, "event_queue_length", json_integer(s.evq_length)); + json_object_set_new(pStats, "max_event_queue_length", json_integer(s.evq_max)); + json_object_set_new(pStats, "max_exec_time", json_integer(s.maxexectime)); + json_object_set_new(pStats, "max_queue_time", json_integer(s.maxqtime)); + + uint32_t nCurrent; + uint64_t nTotal; + rworker.get_descriptor_counts(&nCurrent, &nTotal); + json_object_set_new(pStats, "current_descriptors", json_integer(nCurrent)); + json_object_set_new(pStats, "total_descriptors", json_integer(nTotal)); + + json_t* load = json_object(); + json_object_set_new(load, "last_second", json_integer(rworker.load(Worker::Load::ONE_SECOND))); + json_object_set_new(load, "last_minute", json_integer(rworker.load(Worker::Load::ONE_MINUTE))); + json_object_set_new(load, "last_hour", json_integer(rworker.load(Worker::Load::ONE_HOUR))); + json_object_set_new(pStats, "load", load); + + json_t* pAttr = json_object(); + json_object_set_new(pAttr, "stats", pStats); + + int idx = rworker.id(); + stringstream ss; + ss << idx; + + json_t* pJson = json_object(); + json_object_set_new(pJson, CN_TYPE, json_string(CN_THREADS)); + json_object_set_new(pJson, CN_ATTRIBUTES, pAttr); + json_object_set_new(pJson, 
CN_LINKS, mxs_json_self_link(m_zHost, CN_THREADS, ss.str().c_str())); + + ss_dassert((size_t)idx < m_data.size()); + m_data[idx] = pJson; + } + + json_t* resource() + { + json_t* pArr = json_array(); + + for (auto it = m_data.begin(); it != m_data.end(); it++) + { + json_array_append_new(pArr, *it); + } + + return mxs_json_resource(m_zHost, MXS_JSON_API_THREADS, pArr); + } + + json_t* resource(int id) + { + stringstream self; + self << MXS_JSON_API_THREADS << id; + return mxs_json_resource(m_zHost, self.str().c_str(), m_data[id]); + } + +private: + vector m_data; + const char* m_zHost; +}; + +} + +json_t* mxs_rworker_to_json(const char* zHost, int id) +{ + Worker* target = RoutingWorker::get(id); + WorkerInfoTask task(zHost, id + 1); + mxs::Semaphore sem; + + target->post(&task, &sem); + sem.wait(); + + return task.resource(id); +} + +json_t* mxs_rworker_list_to_json(const char* host) +{ + WorkerInfoTask task(host, config_threadcount()); + RoutingWorker::execute_concurrently(task); + return task.resource(); +} diff --git a/server/core/server.cc b/server/core/server.cc index 30986a864..b236d120c 100644 --- a/server/core/server.cc +++ b/server/core/server.cc @@ -47,6 +47,7 @@ using maxscale::Semaphore; +using maxscale::RoutingWorker; using maxscale::Worker; using maxscale::WorkerTask; @@ -466,7 +467,10 @@ public: void execute(Worker& worker) { - int thread_id = worker.get_current_id(); + RoutingWorker& rworker = static_cast(worker); + ss_dassert(&rworker == RoutingWorker::get_current()); + + int thread_id = rworker.id(); dcb_persistent_clean_count(m_server->persistent[thread_id], thread_id, false); } @@ -484,7 +488,7 @@ private: static void cleanup_persistent_connections(const SERVER* server) { CleanupTask task(server); - Worker::execute_concurrently(task); + RoutingWorker::execute_concurrently(task); } /** diff --git a/server/core/test/test_worker.cc b/server/core/test/test_worker.cc index 74c0a9c9f..ceecbd573 100644 --- a/server/core/test/test_worker.cc +++ b/server/core/test/test_worker.cc @@ -15,6 +15,7 @@ #include #include "../internal/poll.h" +using namespace maxscale; using namespace std; namespace @@ -30,44 +31,14 @@ int64_t get_monotonic_time_ms() return ts.tv_sec * 1000 + ts.tv_nsec / 1000000; } -class TestWorker : public maxscale::Worker -{ - TestWorker(const TestWorker&); - TestWorker& operator = (const TestWorker&); - -public: - TestWorker() - { - } - - ~TestWorker() - { - } - -private: - // TODO: Perhaps these could have default implementations, so that - // TODO: Worker could be used as such. 
- bool pre_run() // override - { - return true; - } - - void post_run() // override - { - } - - void epoll_tick() // override - { - } -}; - class TimerTest { public: static int s_ticks; - TimerTest(int* pRv, int32_t delay) + TimerTest(Worker* pWorker, int* pRv, int32_t delay) : m_id(s_id++) + , m_worker(*pWorker) , m_delay(delay) , m_at(get_monotonic_time_ms() + delay) , m_rv(*pRv) @@ -100,7 +71,7 @@ public: if (--s_ticks < 0) { - maxscale::Worker::shutdown_all(); + m_worker.shutdown(); } rv = true; @@ -113,6 +84,7 @@ private: static int s_id; int m_id; + Worker& m_worker; int32_t m_delay; int64_t m_at; int& m_rv; @@ -127,13 +99,13 @@ int run() TimerTest::s_ticks = 100; - TestWorker w; + Worker w; - TimerTest t1(&rv, 200); - TimerTest t2(&rv, 300); - TimerTest t3(&rv, 400); - TimerTest t4(&rv, 500); - TimerTest t5(&rv, 600); + TimerTest t1(&w, &rv, 200); + TimerTest t2(&w, &rv, 300); + TimerTest t3(&w, &rv, 400); + TimerTest t4(&w, &rv, 500); + TimerTest t5(&w, &rv, 600); w.delayed_call(t1.delay(), &TimerTest::tick, &t1); w.delayed_call(t2.delay(), &TimerTest::tick, &t2); diff --git a/server/core/worker.cc b/server/core/worker.cc index 0d26d5fa3..afb818cb0 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -57,27 +57,18 @@ const int MXS_WORKER_MSG_DISPOSABLE_TASK = -2; */ struct this_unit { - bool initialized; // Whether the initialization has been performed. - Worker** ppWorkers; // Array of worker instances. - int next_worker_id; // Next worker id. + bool initialized; // Whether the initialization has been performed. } this_unit = { false, // initialized - NULL, // ppWorkers - 0, // next_worker_id }; -int next_worker_id() -{ - return atomic_add(&this_unit.next_worker_id, 1); -} - thread_local struct this_thread { - int current_worker_id; // The worker id of the current thread + Worker* pCurrent_worker; // The current worker } this_thread = { - WORKER_ABSENT_ID + nullptr }; /** @@ -260,9 +251,9 @@ void WorkerTimer::cancel() start(0); } -uint32_t WorkerTimer::handle(int wid, uint32_t events) +uint32_t WorkerTimer::handle(Worker* pWorker, uint32_t events) { - ss_dassert(wid == m_pWorker->id()); + ss_dassert(pWorker == m_pWorker); ss_dassert(events & EPOLLIN); ss_dassert((events & ~EPOLLIN) == 0); @@ -278,9 +269,9 @@ uint32_t WorkerTimer::handle(int wid, uint32_t events) } //static -uint32_t WorkerTimer::handler(MXS_POLL_DATA* pThis, int wid, uint32_t events) +uint32_t WorkerTimer::handler(MXS_POLL_DATA* pThis, void* pWorker, uint32_t events) { - return static_cast(pThis)->handle(wid, events); + return static_cast(pThis)->handle(static_cast(pWorker), events); } namespace @@ -306,8 +297,7 @@ int create_epoll_instance() uint32_t Worker::s_next_delayed_call_id = 1; Worker::Worker() - : m_id(next_worker_id()) - , m_epoll_fd(create_epoll_instance()) + : m_epoll_fd(create_epoll_instance()) , m_state(STOPPED) , m_pQueue(NULL) , m_thread(0) @@ -336,14 +326,10 @@ Worker::Worker() ss_dassert(!true); } } - - this_unit.ppWorkers[m_id] = this; } Worker::~Worker() { - this_unit.ppWorkers[m_id] = NULL; - ss_dassert(!m_started); delete m_pTimer; @@ -363,19 +349,7 @@ bool Worker::init() { ss_dassert(!this_unit.initialized); - Worker** ppWorkers = new (std::nothrow) Worker* [MXS_MAX_THREADS] (); // Zero initialized array - - if (ppWorkers) - { - this_unit.ppWorkers = ppWorkers; - - this_unit.initialized = true; - } - else - { - MXS_OOM(); - ss_dassert(!true); - } + this_unit.initialized = true; return this_unit.initialized; } @@ -384,175 +358,9 @@ void Worker::finish() { 
ss_dassert(this_unit.initialized); - delete [] this_unit.ppWorkers; - this_unit.ppWorkers = NULL; - this_unit.initialized = false; } -namespace -{ - -int64_t one_stats_get(int64_t Worker::STATISTICS::*what, enum ts_stats_type type) -{ - int64_t best = type == TS_STATS_MAX ? LONG_MIN : (type == TS_STATS_MIX ? LONG_MAX : 0); - - int nWorkers = this_unit.next_worker_id; - - for (int i = 0; i < nWorkers; ++i) - { - Worker* pWorker = Worker::get(i); - ss_dassert(pWorker); - - const Worker::STATISTICS& s = pWorker->statistics(); - - int64_t value = s.*what; - - switch (type) - { - case TS_STATS_MAX: - if (value > best) - { - best = value; - } - break; - - case TS_STATS_MIX: - if (value < best) - { - best = value; - } - break; - - case TS_STATS_AVG: - case TS_STATS_SUM: - best += value; - break; - } - } - - return type == TS_STATS_AVG ? best / (nWorkers != 0 ? nWorkers : 1) : best; -} - -} - -//static -Worker::STATISTICS Worker::get_statistics() -{ - STATISTICS cs; - - cs.n_read = one_stats_get(&STATISTICS::n_read, TS_STATS_SUM); - cs.n_write = one_stats_get(&STATISTICS::n_write, TS_STATS_SUM); - cs.n_error = one_stats_get(&STATISTICS::n_error, TS_STATS_SUM); - cs.n_hup = one_stats_get(&STATISTICS::n_hup, TS_STATS_SUM); - cs.n_accept = one_stats_get(&STATISTICS::n_accept, TS_STATS_SUM); - cs.n_polls = one_stats_get(&STATISTICS::n_polls, TS_STATS_SUM); - cs.n_pollev = one_stats_get(&STATISTICS::n_pollev, TS_STATS_SUM); - cs.n_nbpollev = one_stats_get(&STATISTICS::n_nbpollev, TS_STATS_SUM); - cs.evq_length = one_stats_get(&STATISTICS::evq_length, TS_STATS_AVG); - cs.evq_max = one_stats_get(&STATISTICS::evq_max, TS_STATS_MAX); - cs.blockingpolls = one_stats_get(&STATISTICS::blockingpolls, TS_STATS_SUM); - cs.maxqtime = one_stats_get(&STATISTICS::maxqtime, TS_STATS_MAX); - cs.maxexectime = one_stats_get(&STATISTICS::maxexectime, TS_STATS_MAX); - - for (int i = 0; i < Worker::STATISTICS::MAXNFDS - 1; i++) - { - for (int j = 0; j < this_unit.next_worker_id; ++j) - { - Worker* pWorker = Worker::get(j); - ss_dassert(pWorker); - - cs.n_fds[i] += pWorker->statistics().n_fds[i]; - } - } - - for (int i = 0; i <= Worker::STATISTICS::N_QUEUE_TIMES; ++i) - { - int nWorkers = this_unit.next_worker_id; - - for (int j = 0; j < nWorkers; ++j) - { - Worker* pWorker = Worker::get(j); - ss_dassert(pWorker); - - cs.qtimes[i] += pWorker->statistics().qtimes[i]; - cs.exectimes[i] += pWorker->statistics().exectimes[i]; - } - - cs.qtimes[i] /= (nWorkers != 0 ? nWorkers : 1); - cs.exectimes[i] /= (nWorkers != 0 ? 
nWorkers : 1); - } - - return cs; -} - -//static -int64_t Worker::get_one_statistic(POLL_STAT what) -{ - int64_t rv = 0; - - int64_t Worker::STATISTICS::*member = NULL; - enum ts_stats_type approach; - - switch (what) - { - case POLL_STAT_READ: - member = &Worker::STATISTICS::n_read; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_WRITE: - member = &Worker::STATISTICS::n_write; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_ERROR: - member = &Worker::STATISTICS::n_error; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_HANGUP: - member = &Worker::STATISTICS::n_hup; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_ACCEPT: - member = &Worker::STATISTICS::n_accept; - approach = TS_STATS_SUM; - break; - - case POLL_STAT_EVQ_LEN: - member = &Worker::STATISTICS::evq_length; - approach = TS_STATS_AVG; - break; - - case POLL_STAT_EVQ_MAX: - member = &Worker::STATISTICS::evq_max; - approach = TS_STATS_MAX; - break; - - case POLL_STAT_MAX_QTIME: - member = &Worker::STATISTICS::maxqtime; - approach = TS_STATS_MAX; - break; - - case POLL_STAT_MAX_EXECTIME: - member = &Worker::STATISTICS::maxexectime; - approach = TS_STATS_MAX; - break; - - default: - ss_dassert(!true); - } - - if (member) - { - rv = one_stats_get(member, approach); - } - - return rv; -} - void Worker::get_descriptor_counts(uint32_t* pnCurrent, uint64_t* pnTotal) { *pnCurrent = atomic_load_uint32(&m_nCurrent_descriptors); @@ -606,30 +414,9 @@ bool Worker::remove_fd(int fd) return rv; } -Worker* Worker::get(int worker_id) -{ - ss_dassert((worker_id >= 0) && (worker_id < this_unit.next_worker_id)); - - return this_unit.ppWorkers[worker_id]; -} - Worker* Worker::get_current() { - Worker* pWorker = NULL; - - int worker_id = get_current_id(); - - if (worker_id != WORKER_ABSENT_ID) - { - pWorker = Worker::get(worker_id); - } - - return pWorker; -} - -int Worker::get_current_id() -{ - return this_thread.current_worker_id; + return this_thread.pCurrent_worker; } bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode) @@ -690,81 +477,6 @@ bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode) return posted; } -//static -size_t Worker::broadcast(Task* pTask, Semaphore* pSem) -{ - // No logging here, function must be signal safe. - size_t n = 0; - - int nWorkers = this_unit.next_worker_id; - for (int i = 0; i < nWorkers; ++i) - { - Worker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - if (pWorker->post(pTask, pSem)) - { - ++n; - } - } - - return n; -} - -//static -size_t Worker::broadcast(std::auto_ptr sTask) -{ - DisposableTask* pTask = sTask.release(); - pTask->inc_ref(); - - size_t n = 0; - - int nWorkers = this_unit.next_worker_id; - for (int i = 0; i < nWorkers; ++i) - { - Worker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - if (pWorker->post_disposable(pTask)) - { - ++n; - } - } - - pTask->dec_ref(); - - return n; -} - -//static -size_t Worker::execute_serially(Task& task) -{ - Semaphore sem; - size_t n = 0; - - int nWorkers = this_unit.next_worker_id; - for (int i = 0; i < nWorkers; ++i) - { - Worker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - if (pWorker->post(&task, &sem)) - { - sem.wait(); - ++n; - } - } - - return n; -} - -//static -size_t Worker::execute_concurrently(Task& task) -{ - Semaphore sem; - return sem.wait_n(Worker::broadcast(&task, &sem)); -} - bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. 
@@ -773,40 +485,19 @@ bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) return m_pQueue->post(message); } -size_t Worker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) -{ - // NOTE: No logging here, this function must be signal safe. - - size_t n = 0; - - int nWorkers = this_unit.next_worker_id; - for (int i = 0; i < nWorkers; ++i) - { - Worker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - if (pWorker->post_message(msg_id, arg1, arg2)) - { - ++n; - } - } - - return n; -} - void Worker::run() { - this_thread.current_worker_id = m_id; + this_thread.pCurrent_worker = this; if (pre_run()) { poll_waitevents(); post_run(); - MXS_INFO("Worker %d has shut down.", m_id); + MXS_INFO("Worker %p has shut down.", this); } - this_thread.current_worker_id = WORKER_ABSENT_ID; + this_thread.pCurrent_worker = nullptr; } bool Worker::start(size_t stack_size) @@ -825,9 +516,9 @@ void Worker::join() { if (m_started) { - MXS_INFO("Waiting for worker %d.", m_id); + MXS_INFO("Waiting for worker %p.", this); thread_wait(m_thread); - MXS_INFO("Waited for worker %d.", m_id); + MXS_INFO("Waited for worker %p.", this); m_started = false; } } @@ -845,21 +536,6 @@ void Worker::shutdown() } } -void Worker::shutdown_all() -{ - // NOTE: No logging here, this function must be signal safe. - ss_dassert((this_unit.next_worker_id == 0) || (this_unit.ppWorkers != NULL)); - - int nWorkers = this_unit.next_worker_id; - for (int i = 0; i < nWorkers; ++i) - { - Worker* pWorker = this_unit.ppWorkers[i]; - ss_dassert(pWorker); - - pWorker->shutdown(); - } -} - /** * The worker message handler. * @@ -876,23 +552,23 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms ss_dassert(msg.arg1() == 0); char* zArg2 = reinterpret_cast(msg.arg2()); const char* zMessage = zArg2 ? 
zArg2 : "Alive and kicking"; - MXS_NOTICE("Worker[%d]: %s.", m_id, zMessage); + MXS_NOTICE("Worker[%p]: %s.", this, zMessage); MXS_FREE(zArg2); } break; case MXS_WORKER_MSG_SHUTDOWN: { - MXS_INFO("Worker %d received shutdown message.", m_id); + MXS_INFO("Worker %p received shutdown message.", this); m_should_shutdown = true; } break; case MXS_WORKER_MSG_CALL: { - void (*f)(int, void*) = (void (*)(int, void*))msg.arg1(); + void (*f)(MXS_WORKER*, void*) = (void (*)(MXS_WORKER*, void*))msg.arg1(); - f(m_id, (void*)msg.arg2()); + f(this, (void*)msg.arg2()); } break; @@ -1083,7 +759,7 @@ void Worker::poll_waitevents() MXS_POLL_DATA *data = (MXS_POLL_DATA*)events[i].data.ptr; - uint32_t actions = data->handler(data, m_id, events[i].events); + uint32_t actions = data->handler(data, this, events[i].events); if (actions & MXS_POLL_ACCEPT) { @@ -1294,107 +970,6 @@ bool Worker::cancel_delayed_call(uint32_t id) } -namespace -{ - -class WorkerInfoTask: public maxscale::WorkerTask -{ -public: - WorkerInfoTask(const char* zHost, uint32_t nThreads) - : m_zHost(zHost) - { - m_data.resize(nThreads); - } - - void execute(Worker& worker) - { - json_t* pStats = json_object(); - const Worker::STATISTICS& s = worker.get_local_statistics(); - json_object_set_new(pStats, "reads", json_integer(s.n_read)); - json_object_set_new(pStats, "writes", json_integer(s.n_write)); - json_object_set_new(pStats, "errors", json_integer(s.n_error)); - json_object_set_new(pStats, "hangups", json_integer(s.n_hup)); - json_object_set_new(pStats, "accepts", json_integer(s.n_accept)); - json_object_set_new(pStats, "blocking_polls", json_integer(s.blockingpolls)); - json_object_set_new(pStats, "event_queue_length", json_integer(s.evq_length)); - json_object_set_new(pStats, "max_event_queue_length", json_integer(s.evq_max)); - json_object_set_new(pStats, "max_exec_time", json_integer(s.maxexectime)); - json_object_set_new(pStats, "max_queue_time", json_integer(s.maxqtime)); - - uint32_t nCurrent; - uint64_t nTotal; - worker.get_descriptor_counts(&nCurrent, &nTotal); - json_object_set_new(pStats, "current_descriptors", json_integer(nCurrent)); - json_object_set_new(pStats, "total_descriptors", json_integer(nTotal)); - - json_t* load = json_object(); - json_object_set_new(load, "last_second", json_integer(worker.load(Worker::Load::ONE_SECOND))); - json_object_set_new(load, "last_minute", json_integer(worker.load(Worker::Load::ONE_MINUTE))); - json_object_set_new(load, "last_hour", json_integer(worker.load(Worker::Load::ONE_HOUR))); - json_object_set_new(pStats, "load", load); - - json_t* pAttr = json_object(); - json_object_set_new(pAttr, "stats", pStats); - - int idx = worker.get_current_id(); - stringstream ss; - ss << idx; - - json_t* pJson = json_object(); - json_object_set_new(pJson, CN_ID, json_string(ss.str().c_str())); - json_object_set_new(pJson, CN_TYPE, json_string(CN_THREADS)); - json_object_set_new(pJson, CN_ATTRIBUTES, pAttr); - json_object_set_new(pJson, CN_LINKS, mxs_json_self_link(m_zHost, CN_THREADS, ss.str().c_str())); - - ss_dassert((size_t)idx < m_data.size()); - m_data[idx] = pJson; - } - - json_t* resource() - { - json_t* pArr = json_array(); - - for (auto it = m_data.begin(); it != m_data.end(); it++) - { - json_array_append_new(pArr, *it); - } - - return mxs_json_resource(m_zHost, MXS_JSON_API_THREADS, pArr); - } - - json_t* resource(int id) - { - stringstream self; - self << MXS_JSON_API_THREADS << id; - return mxs_json_resource(m_zHost, self.str().c_str(), m_data[id]); - } - -private: - vector m_data; - const 
char* m_zHost; -}; - -} - -json_t* mxs_worker_to_json(const char* zHost, int id) -{ - Worker* target = Worker::get(id); - WorkerInfoTask task(zHost, id + 1); - mxs::Semaphore sem; - - target->post(&task, &sem); - sem.wait(); - - return task.resource(id); -} - -json_t* mxs_worker_list_to_json(const char* host) -{ - WorkerInfoTask task(host, config_threadcount()); - Worker::execute_concurrently(task); - return task.resource(); -} - MXS_WORKER* mxs_worker_get_current() { return Worker::get_current(); diff --git a/server/modules/protocol/MySQL/mariadb_client.cc b/server/modules/protocol/MySQL/mariadb_client.cc index ec8ca5aa7..8e6f55245 100644 --- a/server/modules/protocol/MySQL/mariadb_client.cc +++ b/server/modules/protocol/MySQL/mariadb_client.cc @@ -230,7 +230,7 @@ void LocalClient::drain_queue() } } -uint32_t LocalClient::poll_handler(struct mxs_poll_data* data, int wid, uint32_t events) +uint32_t LocalClient::poll_handler(struct mxs_poll_data* data, void* worker, uint32_t events) { LocalClient* client = static_cast(data); client->process(events); diff --git a/server/modules/routing/binlogrouter/blr_master.cc b/server/modules/routing/binlogrouter/blr_master.cc index 2e27a97d0..d138db331 100644 --- a/server/modules/routing/binlogrouter/blr_master.cc +++ b/server/modules/routing/binlogrouter/blr_master.cc @@ -109,7 +109,7 @@ extern int blr_write_special_event(ROUTER_INSTANCE *router, extern int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file); static bool blr_handle_missing_files(ROUTER_INSTANCE *router, char *new_file); -static void worker_cb_start_master(int worker_id, void* data); +static void worker_cb_start_master(MXS_WORKER*, void* data); extern void blr_file_update_gtid(ROUTER_INSTANCE *router); static int blr_check_connect_retry(ROUTER_INSTANCE *router); @@ -284,13 +284,13 @@ static void blr_start_master(void* data) /** * Callback start function to be called in the context of the main worker. * - * @param worker_id The id of the worker in whose context the function is called. - * @param data The data to be passed to `blr_start_master` + * @param worker The worker in whose context the function is called. + * @param data The data to be passed to `blr_start_master` */ -static void worker_cb_start_master(int worker_id, void* data) +static void worker_cb_start_master(MXS_WORKER* worker, void* data) { // This is itended to be called only in the main worker. - ss_dassert(worker_id == 0); + ss_dassert(worker == mxs_rworker_get(MXS_RWORKER_MAIN)); blr_start_master(data); } @@ -325,10 +325,10 @@ bool blr_start_master_in_main(void* data) * @param worker_id The id of the worker in whose context the function is called. * @param data The data to be passed to `blr_start_master` */ -static void worker_cb_close_master(int worker_id, void* data) +static void worker_cb_close_master(MXS_WORKER* worker, void* data) { // This is itended to be called only in the main worker. - ss_dassert(worker_id == 0); + ss_dassert(worker == mxs_rworker_get(MXS_RWORKER_MAIN)); blr_master_close(static_cast(data)); }
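
Note on the new poll-handler signature: mxs_poll_handler_t now receives the owning worker as a void* instead of an int thread id, and each handler casts it back (as MessageQueue::poll_handler and WorkerTimer::handler do in this patch). The following is a minimal sketch of a handler written against the new signature; MyPollData and everything in it are illustrative names, not part of this patch, and the includes assume the MaxScale 2.x header layout.

    #include <maxscale/debug.h>
    #include <maxscale/poll_core.h>
    #include <maxscale/worker.hh>

    struct MyPollData : MXS_POLL_DATA
    {
        explicit MyPollData(maxscale::Worker* pWorker)
        {
            handler = &MyPollData::poll_handler; // MXS_POLL_DATA::handler
            owner = pWorker;                     // MXS_POLL_DATA::owner
        }

        static uint32_t poll_handler(MXS_POLL_DATA* pData, void* pWorker, uint32_t events)
        {
            // Until poll_core.h is C++-ified, the worker arrives as a void*
            // and must be cast back to maxscale::Worker.
            maxscale::Worker* pThis_worker = static_cast<maxscale::Worker*>(pWorker);
            ss_dassert(pThis_worker == static_cast<MyPollData*>(pData)->owner);

            // ... handle 'events' here ...
            return MXS_POLL_NOP;
        }
    };

The file descriptor itself would be registered with Worker::add_fd(fd, events, &data) for a specific worker, or with RoutingWorker::add_shared_fd() for descriptors shared by all routing workers, matching dcb_add_to_worker() above.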
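
Note on MXS_WORKER_MSG_CALL: the callback prototype changes from void (*)(int thread_id, void*) to void (*)(MXS_WORKER*, void*), so the callee receives the worker directly instead of looking it up by id. A sketch modeled on the dcb_hangup_foreach() pattern in this patch; my_worker_cb and call_on_all_workers are hypothetical names and the includes are assumed.

    #include <maxscale/worker.h>
    #include "internal/routingworker.hh"

    // Called once on every routing worker; the worker is passed in directly.
    static void my_worker_cb(MXS_WORKER* pWorker, void* pData)
    {
        maxscale::RoutingWorker* pRworker = static_cast<maxscale::RoutingWorker*>(pWorker);
        MXS_NOTICE("Running on routing worker %d.", pRworker->id());
    }

    static void call_on_all_workers(void* pData)
    {
        intptr_t arg1 = (intptr_t)my_worker_cb;
        intptr_t arg2 = (intptr_t)pData;

        // broadcast_message() has moved from Worker to RoutingWorker.
        maxscale::RoutingWorker::broadcast_message(MXS_WORKER_MSG_CALL, arg1, arg2);
    }

As with dcb_hangup_foreach_worker() above, the cast from MXS_WORKER* to RoutingWorker* is safe only for callbacks that are broadcast to routing workers.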
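
Note on the relocated task APIs: broadcast(), execute_serially(), execute_concurrently(), broadcast_message(), shutdown_all() and the combined statistics now live on RoutingWorker, since a plain Worker no longer has an id or a global registry. Callers that previously went through Worker (server.cc, dcb.cc, misc.cc above) must use RoutingWorker instead. A sketch under those assumptions; MyTask and run_on_all_routing_workers are illustrative names.

    #include <maxscale/log_manager.h>
    #include "internal/routingworker.hh"

    class MyTask : public maxscale::WorkerTask
    {
    public:
        void execute(maxscale::Worker& worker) // override
        {
            // The task still receives a Worker&; downcast when the
            // routing-worker id is needed, as SerialDcbTask does above.
            maxscale::RoutingWorker& rworker = static_cast<maxscale::RoutingWorker&>(worker);
            MXS_NOTICE("Executing on routing worker %d.", rworker.id());
        }
    };

    void run_on_all_routing_workers()
    {
        MyTask task;

        // Posts the task to every routing worker and waits until all
        // of them have executed it.
        maxscale::RoutingWorker::execute_concurrently(task);
    }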