/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2026-01-04
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

#include <maxscale/routingworker.hh>

#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <climits>

#ifdef HAVE_SYSTEMD
#include <systemd/sd-daemon.h>
#endif

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>

#include <maxbase/atomic.hh>
#include <maxbase/semaphore.hh>

#include "internal/dcb.hh"
#include "internal/modules.hh"
#include "internal/poll.hh"
#include "internal/service.hh"
#include "internal/session.hh"

#define WORKER_ABSENT_ID -1

using maxbase::Semaphore;
using maxbase::Worker;
using maxbase::WorkerLoad;
using maxscale::RoutingWorker;
using maxscale::Closer;
using std::vector;
using std::stringstream;

namespace
{

/**
 * Unit variables.
 */
struct this_unit
{
    bool            initialized;        // Whether the initialization has been performed.
    int             nWorkers;           // How many routing workers there are.
    RoutingWorker** ppWorkers;          // Array of routing worker instances.
    int             next_worker_id;     // Next worker id

    // DEPRECATED in 2.3, remove in 2.4.
    int             number_poll_spins;  // Maximum non-block polls

    // DEPRECATED in 2.3, remove in 2.4.
    int             max_poll_sleep;     // Maximum block time

    int             epoll_listener_fd;  // Shared epoll descriptor for listening descriptors.
    int             id_main_worker;     // The id of the worker running in the main thread.
    int             id_min_worker;      // The smallest routing worker id.
    int             id_max_worker;      // The largest routing worker id.
} this_unit =
{
    false,              // initialized
    0,                  // nWorkers
    NULL,               // ppWorkers
    0,                  // next_worker_id
    0,                  // number_poll_spins
    0,                  // max_poll_sleep
    -1,                 // epoll_listener_fd
    WORKER_ABSENT_ID,   // id_main_worker
    WORKER_ABSENT_ID,   // id_min_worker
    WORKER_ABSENT_ID,   // id_max_worker
};

int next_worker_id()
{
    return mxb::atomic::add(&this_unit.next_worker_id, 1, mxb::atomic::RELAXED);
}

thread_local struct this_thread
{
    int current_worker_id;  // The worker id of the current thread
} this_thread =
{
    WORKER_ABSENT_ID
};

/**
 * Calls thread_init on all loaded modules.
 *
 * @return True, if all modules were successfully initialized.
 */
bool modules_thread_init()
{
    bool initialized = false;

    MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
    MXS_MODULE* module = NULL;

    while ((module = mxs_module_iterator_get_next(&i)) != NULL)
    {
        if (module->thread_init)
        {
            int rc = (module->thread_init)();

            if (rc != 0)
            {
                break;
            }
        }
    }

    if (module)
    {
        // If module is non-NULL it means that the initialization failed for
        // that module. We now need to call finish on all modules that were
        // successfully initialized.
        MXS_MODULE* failed_module = module;
        i = mxs_module_iterator_get(NULL);

        while ((module = mxs_module_iterator_get_next(&i)) != failed_module)
        {
            if (module->thread_finish)
            {
                (module->thread_finish)();
            }
        }
    }
    else
    {
        initialized = true;
    }

    return initialized;
}
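// A minimal sketch (not part of the build) of a module-side thread_init /
// thread_finish pair that satisfies the contract used above: thread_init
// returns 0 on success and non-zero on failure, in which case the loop in
// modules_thread_init() rolls back the modules initialized before it. The
// function names and the thread-local buffer are hypothetical.
#if 0
static thread_local char* zThread_buffer = nullptr;

static int example_thread_init()
{
    zThread_buffer = static_cast<char*>(malloc(1024));
    return zThread_buffer ? 0 : 1;      // Non-zero aborts thread initialization.
}

static void example_thread_finish()
{
    free(zThread_buffer);
    zThread_buffer = nullptr;
}
#endif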
/**
 * Calls thread_finish on all loaded modules.
 */
void modules_thread_finish()
{
    MXS_MODULE_ITERATOR i = mxs_module_iterator_get(NULL);
    MXS_MODULE* module = NULL;

    while ((module = mxs_module_iterator_get_next(&i)) != NULL)
    {
        if (module->thread_finish)
        {
            (module->thread_finish)();
        }
    }
}

bool can_close_dcb(DCB* dcb)
{
    const int SHOW_SHUTDOWN_TIMEOUT = 2;
    auto idle = MXS_CLOCK_TO_SEC(mxs_clock() - dcb->last_read);
    return idle > SHOW_SHUTDOWN_TIMEOUT     // Timed out
           || !dcb->func.can_close         // Not implemented
           || dcb->func.can_close(dcb);    // Protocol says it's OK to close
}
}

namespace maxscale
{

// static
maxbase::Duration RoutingWorker::s_watchdog_interval {0};
// static
maxbase::TimePoint RoutingWorker::s_watchdog_next_check;

class RoutingWorker::WatchdogNotifier
{
    WatchdogNotifier(const WatchdogNotifier&) = delete;
    WatchdogNotifier& operator=(const WatchdogNotifier&) = delete;

public:
    WatchdogNotifier(mxs::RoutingWorker* pOwner)
        : m_owner(*pOwner)
        , m_nClients(0)
        , m_terminate(false)
    {
        m_thread = std::thread([this] {
                uint32_t interval = mxs::RoutingWorker::s_watchdog_interval.secs();
                timespec timeout = {interval, 0};

                while (!mxb::atomic::load(&m_terminate, mxb::atomic::RELAXED))
                {
                    // We will wake up when someone wants the notifier to run,
                    // or when MaxScale is going down.
                    m_sem_start.wait();

                    if (!mxb::atomic::load(&m_terminate, mxb::atomic::RELAXED))
                    {
                        // If MaxScale is not going down...
                        do
                        {
                            // we check the systemd watchdog...
                            m_owner.check_systemd_watchdog();
                        }
                        while (!m_sem_stop.timedwait(timeout));
                        // until the semaphore is actually posted, which it will be
                        // once the notification should stop.
                    }
                }
            });
    }

    ~WatchdogNotifier()
    {
        mxb_assert(m_nClients == 0);
        mxb::atomic::store(&m_terminate, true, mxb::atomic::RELAXED);
        m_sem_start.post();
        m_thread.join();
    }

    void start()
    {
        Guard guard(m_lock);
        mxb::atomic::add(&m_nClients, 1, mxb::atomic::RELAXED);

        if (m_nClients == 1)
        {
            m_sem_start.post();
        }
    }

    void stop()
    {
        Guard guard(m_lock);
        mxb::atomic::add(&m_nClients, -1, mxb::atomic::RELAXED);
        mxb_assert(m_nClients >= 0);

        if (m_nClients == 0)
        {
            m_sem_stop.post();
        }
    }

private:
    using Guard = std::lock_guard<std::mutex>;

    mxs::RoutingWorker& m_owner;
    int                 m_nClients;
    bool                m_terminate;
    std::thread         m_thread;
    std::mutex          m_lock;
    mxb::Semaphore      m_sem_start;
    mxb::Semaphore      m_sem_stop;
};
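// A hypothetical RAII guard built on the notifier above: it keeps the systemd
// watchdog fed while a worker thread makes a blocking call. The class name is
// illustrative; only start_watchdog_workaround()/stop_watchdog_workaround(),
// defined later in this file, are real.
#if 0
class WatchdogWorkaround
{
public:
    WatchdogWorkaround(mxs::RoutingWorker* pWorker)
        : m_pWorker(pWorker)
    {
        m_pWorker->start_watchdog_workaround();     // notifier thread starts pinging
    }

    ~WatchdogWorkaround()
    {
        m_pWorker->stop_watchdog_workaround();      // notifier thread goes idle again
    }

private:
    mxs::RoutingWorker* m_pWorker;
};
#endif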
RoutingWorker::RoutingWorker()
    : m_id(next_worker_id())
    , m_alive(true)
    , m_pWatchdog_notifier(nullptr)
{
    MXB_POLL_DATA::handler = &RoutingWorker::epoll_instance_handler;
    MXB_POLL_DATA::owner = this;

    if (s_watchdog_interval.count() != 0)
    {
        m_pWatchdog_notifier = new WatchdogNotifier(this);
    }
}

RoutingWorker::~RoutingWorker()
{
    for (uint64_t key = 0; key < m_local_data.size(); ++key)
    {
        auto* pData = m_local_data[key];
        auto deleter = m_data_deleters[key];

        if (pData && deleter)
        {
            deleter(pData);
        }
    }

    delete m_pWatchdog_notifier;
}

// static
bool RoutingWorker::init()
{
    mxb_assert(!this_unit.initialized);

    this_unit.number_poll_spins = config_nbpolls();
    this_unit.max_poll_sleep = config_pollsleep();

    this_unit.epoll_listener_fd = epoll_create(MAX_EVENTS);

    if (this_unit.epoll_listener_fd != -1)
    {
        int nWorkers = config_threadcount();
        RoutingWorker** ppWorkers = new(std::nothrow) RoutingWorker* [MXS_MAX_THREADS]();   // 0-inited array

        if (ppWorkers)
        {
            int id_main_worker = WORKER_ABSENT_ID;
            int id_min_worker = INT_MAX;
            int id_max_worker = INT_MIN;

            int i;
            for (i = 0; i < nWorkers; ++i)
            {
                RoutingWorker* pWorker = RoutingWorker::create(this_unit.epoll_listener_fd);

                if (pWorker)
                {
                    int id = pWorker->id();

                    // The first created worker will be the main worker.
                    if (id_main_worker == WORKER_ABSENT_ID)
                    {
                        id_main_worker = id;
                    }

                    if (id < id_min_worker)
                    {
                        id_min_worker = id;
                    }

                    if (id > id_max_worker)
                    {
                        id_max_worker = id;
                    }

                    ppWorkers[i] = pWorker;
                }
                else
                {
                    for (int j = i - 1; j >= 0; --j)
                    {
                        delete ppWorkers[j];
                    }

                    delete[] ppWorkers;
                    ppWorkers = NULL;
                    break;
                }
            }

            if (ppWorkers)
            {
                this_unit.ppWorkers = ppWorkers;
                this_unit.nWorkers = nWorkers;
                this_unit.id_main_worker = id_main_worker;
                this_unit.id_min_worker = id_min_worker;
                this_unit.id_max_worker = id_max_worker;

                this_unit.initialized = true;
            }
        }
        else
        {
            MXS_OOM();
            close(this_unit.epoll_listener_fd);
        }
    }
    else
    {
        MXS_ALERT("Could not allocate an epoll instance.");
    }

    if (this_unit.initialized)
    {
        // When the initialization has successfully been performed, we set the
        // current_worker_id of this thread to 0. That way any connections that
        // are made during service startup (after this function returns, but
        // before the workers have been started) will be handled by the worker
        // that will be running in the main thread.
        this_thread.current_worker_id = 0;

        if (s_watchdog_interval.count() != 0)
        {
            MXS_NOTICE("The systemd watchdog is enabled. Internal timeout = %s\n",
                       to_string(s_watchdog_interval).c_str());
        }
    }

    return this_unit.initialized;
}

void RoutingWorker::finish()
{
    mxb_assert(this_unit.initialized);

    for (int i = this_unit.id_max_worker; i >= this_unit.id_min_worker; --i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        delete pWorker;
        this_unit.ppWorkers[i] = NULL;
    }

    delete[] this_unit.ppWorkers;
    this_unit.ppWorkers = NULL;

    close(this_unit.epoll_listener_fd);
    this_unit.epoll_listener_fd = 0;

    this_unit.initialized = false;
}

// static
bool RoutingWorker::add_shared_fd(int fd, uint32_t events, MXB_POLL_DATA* pData)
{
    bool rv = true;

    // This must be level-triggered. Since this is intended for listening
    // sockets and each worker will call accept() just once before going
    // back to epoll_wait(), using EPOLLET would mean that if there are
    // more clients to be accepted than there are threads returning from
    // epoll_wait() for an event, then some clients would be accepted only
    // when a new client has connected, thus causing a new EPOLLIN event.
    events &= ~EPOLLET;

    struct epoll_event ev;

    ev.events = events;
    ev.data.ptr = pData;

    // The main worker takes ownership of all shared fds
    pData->owner = RoutingWorker::get(RoutingWorker::MAIN);

    if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_ADD, fd, &ev) != 0)
    {
        Worker::resolve_poll_error(fd, errno, EPOLL_CTL_ADD);
        rv = false;
    }

    return rv;
}

// static
bool RoutingWorker::remove_shared_fd(int fd)
{
    bool rv = true;

    struct epoll_event ev = {};

    if (epoll_ctl(this_unit.epoll_listener_fd, EPOLL_CTL_DEL, fd, &ev) != 0)
    {
        Worker::resolve_poll_error(fd, errno, EPOLL_CTL_DEL);
        rv = false;
    }

    return rv;
}

bool mxs_worker_should_shutdown(MXB_WORKER* pWorker)
{
    return static_cast<Worker*>(pWorker)->should_shutdown();
}

RoutingWorker* RoutingWorker::get(int worker_id)
{
    mxb_assert(this_unit.initialized);

    if (worker_id == MAIN)
    {
        worker_id = this_unit.id_main_worker;
    }

    bool valid = (worker_id >= this_unit.id_min_worker && worker_id <= this_unit.id_max_worker);

    return valid ? this_unit.ppWorkers[worker_id] : nullptr;
}
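// Hypothetical usage of the shared descriptor API above: roughly how a
// listener would be published to all workers. The listener_data instance and
// the listen_fd stand-in are illustrative, not code from this file.
#if 0
static MXB_POLL_DATA listener_data;     // handler would point at the accept routine
int listen_fd = -1;                     // stands in for a bound, listening socket

if (RoutingWorker::add_shared_fd(listen_fd, EPOLLIN, &listener_data))
{
    // Every routing worker's epoll_wait() can now report activity on listen_fd.
    // Note that EPOLLET is stripped inside add_shared_fd(), so the registration
    // is always level-triggered.
}

RoutingWorker::remove_shared_fd(listen_fd);     // e.g. when the listener is stopped
#endif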
RoutingWorker* RoutingWorker::get_current()
{
    RoutingWorker* pWorker = NULL;

    int worker_id = get_current_id();

    if (worker_id != WORKER_ABSENT_ID)
    {
        pWorker = RoutingWorker::get(worker_id);
    }

    return pWorker;
}

int RoutingWorker::get_current_id()
{
    return this_thread.current_worker_id;
}

// static
bool RoutingWorker::start_workers()
{
    bool rv = true;

    for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; ++i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (!pWorker->start())
        {
            MXS_ALERT("Could not start routing worker %d of %d.", i, config_threadcount());
            rv = false;
            // At startup, so we don't even try to clean up.
            break;
        }
    }

    return rv;
}

// static
void RoutingWorker::join_workers()
{
    for (int i = this_unit.id_min_worker; i <= this_unit.id_max_worker; i++)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        pWorker->join();
    }
}

// static
void RoutingWorker::set_nonblocking_polls(unsigned int nbpolls)
{
    this_unit.number_poll_spins = nbpolls;
}

// static
void RoutingWorker::set_maxwait(unsigned int maxwait)
{
    this_unit.max_poll_sleep = maxwait;
}

RoutingWorker::SessionsById& RoutingWorker::session_registry()
{
    return m_sessions;
}

void RoutingWorker::register_zombie(DCB* pDcb)
{
    mxb_assert(pDcb->owner == this);

    m_zombies.push_back(pDcb);
}

void RoutingWorker::delete_zombies()
{
    Zombies slow_zombies;

    // An algorithm cannot be used, as the final closing of a DCB may cause
    // other DCBs to be registered in the zombie queue.
    while (!m_zombies.empty())
    {
        DCB* pDcb = m_zombies.back();
        m_zombies.pop_back();

        bool can_close = true;

        if (pDcb->role == DCB::Role::CLIENT)
        {
            // Check if any of the backend DCBs isn't ready to be closed. If so, delay
            // the closing of the client DCB until the backend connections have been
            // fully established and have finished authenticating.
            const auto& dcbs = static_cast<Session*>(pDcb->session)->dcb_set();
            can_close = std::all_of(dcbs.begin(), dcbs.end(), can_close_dcb);
        }

        if (can_close)
        {
            MXS_DEBUG("Ready to close session %lu", pDcb->session->ses_id);
            dcb_final_close(pDcb);
        }
        else
        {
            MXS_DEBUG("Delaying destruction of session %lu", pDcb->session->ses_id);
            slow_zombies.push_back(pDcb);
        }
    }

    mxb_assert(m_zombies.empty());
    m_zombies.insert(m_zombies.end(), slow_zombies.begin(), slow_zombies.end());
}

bool RoutingWorker::pre_run()
{
    this_thread.current_worker_id = m_id;

    bool rv = modules_thread_init() && qc_thread_init(QC_INIT_SELF);

    if (!rv)
    {
        MXS_ERROR("Could not perform thread initialization for all modules. Thread exits.");
        this_thread.current_worker_id = WORKER_ABSENT_ID;
    }

    return rv;
}

void RoutingWorker::post_run()
{
    modules_thread_finish();
    qc_thread_end(QC_INIT_SELF);
    // TODO: Add service_thread_finish().
    this_thread.current_worker_id = WORKER_ABSENT_ID;
}
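// A minimal sketch (illustrative only) of the lifecycle that the static
// functions above imply for the embedding program: initialize the unit,
// start the worker threads, and eventually join and tear them down.
#if 0
if (RoutingWorker::init() && RoutingWorker::start_workers())
{
    // ... MaxScale runs; the first created worker acts as the main worker ...
    RoutingWorker::join_workers();
}

RoutingWorker::finish();
#endif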
/**
 * Creates a worker instance.
 * - Allocates the structure.
 * - Creates a pipe.
 * - Adds the read descriptor to the polling mechanism.
 *
 * @param epoll_listener_fd  The file descriptor of the epoll set to which listening
 *                           sockets will be placed.
 *
 * @return A worker instance if successful, otherwise NULL.
 */
// static
RoutingWorker* RoutingWorker::create(int epoll_listener_fd)
{
    RoutingWorker* pThis = new(std::nothrow) RoutingWorker();

    if (pThis)
    {
        struct epoll_event ev;
        ev.events = EPOLLIN;
        MXB_POLL_DATA* pData = pThis;
        ev.data.ptr = pData;    // Necessary for pointer adjustment, otherwise downcast will not work.

        // The shared epoll instance descriptor is *not* added using EPOLLET (edge-triggered)
        // because we want it to be level-triggered. That way, as long as there is a single
        // active (accept() can be called) listening socket, epoll_wait() will return an event
        // for it. It must be like that because each worker will call accept() just once before
        // calling epoll_wait() again. The end result is that as long as the load of different
        // workers is roughly the same, the client connections will be distributed evenly across
        // the workers. If the load is not the same, then a worker with less load will get more
        // clients than a worker with more load.
        if (epoll_ctl(pThis->m_epoll_fd, EPOLL_CTL_ADD, epoll_listener_fd, &ev) == 0)
        {
            MXS_INFO("Epoll instance for listening sockets added to worker epoll instance.");
        }
        else
        {
            MXS_ERROR("Could not add epoll instance for listening sockets to "
                      "epoll instance of worker: %s",
                      mxs_strerror(errno));
            delete pThis;
            pThis = NULL;
        }
    }
    else
    {
        MXS_OOM();
    }

    return pThis;
}

void RoutingWorker::epoll_tick()
{
    dcb_process_timeouts(m_id);

    delete_zombies();

    check_systemd_watchdog();
}

/**
 * Callback for events occurring on the shared epoll instance.
 *
 * @param pData   Will point to a Worker instance.
 * @param wid     The worker id.
 * @param events  The events.
 *
 * @return What actions were performed.
 */
// static
uint32_t RoutingWorker::epoll_instance_handler(MXB_POLL_DATA* pData, MXB_WORKER* pWorker, uint32_t events)
{
    RoutingWorker* pThis = static_cast<RoutingWorker*>(pData);
    mxb_assert(pThis == pWorker);

    return pThis->handle_epoll_events(events);
}

/**
 * Handler for events occurring in the shared epoll instance.
 *
 * @param events  The events.
 *
 * @return What actions were performed.
 */
uint32_t RoutingWorker::handle_epoll_events(uint32_t events)
{
    struct epoll_event epoll_events[1];

    // We extract just one event
    int nfds = epoll_wait(this_unit.epoll_listener_fd, epoll_events, 1, 0);

    uint32_t actions = MXB_POLL_NOP;

    if (nfds == -1)
    {
        MXS_ERROR("epoll_wait failed: %s", mxs_strerror(errno));
    }
    else if (nfds == 0)
    {
        MXS_DEBUG("No events for worker %d.", m_id);
    }
    else
    {
        MXS_DEBUG("1 event for worker %d.", m_id);
        MXB_POLL_DATA* pData = static_cast<MXB_POLL_DATA*>(epoll_events[0].data.ptr);

        actions = pData->handler(pData, this, epoll_events[0].events);
    }

    return actions;
}
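// Sketch of the event path implied by the two handlers above when a client
// connects to a listening socket shared via add_shared_fd() (the name of the
// final accept callback is illustrative):
//
//   worker's own epoll_wait() sees the shared epoll fd become readable
//     -> epoll_instance_handler()   (registered in the constructor)
//       -> handle_epoll_events()    (drains one event from the shared instance)
//         -> pData->handler(...)    (e.g. the listener's accept callback)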
// static
size_t RoutingWorker::broadcast(Task* pTask, Semaphore* pSem)
{
    // No logging here, function must be signal safe.
    size_t n = 0;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        Worker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (pWorker->execute(pTask, pSem, EXECUTE_AUTO))
        {
            ++n;
        }
    }

    return n;
}

// static
size_t RoutingWorker::broadcast(std::unique_ptr<DisposableTask> sTask)
{
    DisposableTask* pTask = sTask.release();
    Worker::inc_ref(pTask);

    size_t n = 0;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (pWorker->post_disposable(pTask, EXECUTE_AUTO))
        {
            ++n;
        }
    }

    Worker::dec_ref(pTask);

    return n;
}

// static
size_t RoutingWorker::broadcast(std::function<void()> func,
                                mxb::Semaphore* pSem,
                                mxb::Worker::execute_mode_t mode)
{
    size_t n = 0;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (pWorker->execute(func, pSem, mode))
        {
            ++n;
        }
    }

    return n;
}

// static
size_t RoutingWorker::execute_serially(Task& task)
{
    Semaphore sem;
    size_t n = 0;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (pWorker->execute(&task, &sem, EXECUTE_AUTO))
        {
            sem.wait();
            ++n;
        }
    }

    return n;
}

// static
size_t RoutingWorker::execute_serially(std::function<void()> func)
{
    Semaphore sem;
    size_t n = 0;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (pWorker->execute(func, &sem, EXECUTE_AUTO))
        {
            sem.wait();
            ++n;
        }
    }

    return n;
}

// static
size_t RoutingWorker::execute_concurrently(Task& task)
{
    Semaphore sem;
    return sem.wait_n(RoutingWorker::broadcast(&task, &sem));
}

// static
size_t RoutingWorker::execute_concurrently(std::function<void()> func)
{
    Semaphore sem;
    return sem.wait_n(RoutingWorker::broadcast(func, &sem, EXECUTE_AUTO));
}

// static
size_t RoutingWorker::broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
    // NOTE: No logging here, this function must be signal safe.
    size_t n = 0;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        Worker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        if (pWorker->post_message(msg_id, arg1, arg2))
        {
            ++n;
        }
    }

    return n;
}
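// Hypothetical usage of the broadcast helpers above: run a lambda on every
// routing worker and block until all of them have executed it. This is
// exactly what execute_concurrently() does internally.
#if 0
mxb::Semaphore sem;
size_t n = RoutingWorker::broadcast([]() {
        // Per-worker work goes here; it runs on each worker's own thread.
    }, &sem, Worker::EXECUTE_AUTO);
sem.wait_n(n);
#endif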
// static
void RoutingWorker::shutdown_all()
{
    // NOTE: No logging here, this function must be signal safe.
    mxb_assert((this_unit.next_worker_id == 0) || (this_unit.ppWorkers != NULL));

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        RoutingWorker* pWorker = this_unit.ppWorkers[i];
        mxb_assert(pWorker);

        pWorker->shutdown();
    }
}

namespace
{

std::vector<Worker::STATISTICS> get_stats()
{
    std::vector<Worker::STATISTICS> rval;

    int nWorkers = this_unit.next_worker_id;
    for (int i = 0; i < nWorkers; ++i)
    {
        RoutingWorker* pWorker = RoutingWorker::get(i);
        mxb_assert(pWorker);

        rval.push_back(pWorker->statistics());
    }

    return rval;
}
}

// static
Worker::STATISTICS RoutingWorker::get_statistics()
{
    auto s = get_stats();

    STATISTICS cs;

    cs.n_read = mxs::sum(s, &STATISTICS::n_read);
    cs.n_write = mxs::sum(s, &STATISTICS::n_write);
    cs.n_error = mxs::sum(s, &STATISTICS::n_error);
    cs.n_hup = mxs::sum(s, &STATISTICS::n_hup);
    cs.n_accept = mxs::sum(s, &STATISTICS::n_accept);
    cs.n_polls = mxs::sum(s, &STATISTICS::n_polls);
    cs.n_pollev = mxs::sum(s, &STATISTICS::n_pollev);
    cs.evq_avg = mxs::avg(s, &STATISTICS::evq_avg);
    cs.evq_max = mxs::max(s, &STATISTICS::evq_max);
    cs.maxqtime = mxs::max(s, &STATISTICS::maxqtime);
    cs.maxexectime = mxs::max(s, &STATISTICS::maxexectime);
    cs.n_fds = mxs::sum_element(s, &STATISTICS::n_fds);
    cs.n_fds = mxs::min_element(s, &STATISTICS::n_fds);
    cs.n_fds = mxs::max_element(s, &STATISTICS::n_fds);
    cs.qtimes = mxs::avg_element(s, &STATISTICS::qtimes);
    cs.exectimes = mxs::avg_element(s, &STATISTICS::exectimes);

    return cs;
}

// static
int64_t RoutingWorker::get_one_statistic(POLL_STAT what)
{
    auto s = get_stats();

    int64_t rv = 0;

    switch (what)
    {
    case POLL_STAT_READ:
        rv = mxs::sum(s, &STATISTICS::n_read);
        break;

    case POLL_STAT_WRITE:
        rv = mxs::sum(s, &STATISTICS::n_write);
        break;

    case POLL_STAT_ERROR:
        rv = mxs::sum(s, &STATISTICS::n_error);
        break;

    case POLL_STAT_HANGUP:
        rv = mxs::sum(s, &STATISTICS::n_hup);
        break;

    case POLL_STAT_ACCEPT:
        rv = mxs::sum(s, &STATISTICS::n_accept);
        break;

    case POLL_STAT_EVQ_AVG:
        rv = mxs::avg(s, &STATISTICS::evq_avg);
        break;

    case POLL_STAT_EVQ_MAX:
        rv = mxs::max(s, &STATISTICS::evq_max);
        break;

    case POLL_STAT_MAX_QTIME:
        rv = mxs::max(s, &STATISTICS::maxqtime);
        break;

    case POLL_STAT_MAX_EXECTIME:
        rv = mxs::max(s, &STATISTICS::maxexectime);
        break;

    default:
        mxb_assert(!true);
    }

    return rv;
}

// static
bool RoutingWorker::get_qc_stats(int id, QC_CACHE_STATS* pStats)
{
    class Task : public Worker::Task
    {
    public:
        Task(QC_CACHE_STATS* pStats)
            : m_stats(*pStats)
        {
        }

        void execute(Worker&)
        {
            qc_get_cache_stats(&m_stats);
        }

    private:
        QC_CACHE_STATS& m_stats;
    };

    RoutingWorker* pWorker = RoutingWorker::get(id);

    if (pWorker)
    {
        Semaphore sem;
        Task task(pStats);
        pWorker->execute(&task, &sem, EXECUTE_AUTO);
        sem.wait();
    }

    return pWorker != nullptr;
}

// static
void RoutingWorker::get_qc_stats(std::vector<QC_CACHE_STATS>& all_stats)
{
    class Task : public Worker::Task
    {
    public:
        Task(std::vector<QC_CACHE_STATS>* pAll_stats)
            : m_all_stats(*pAll_stats)
        {
            m_all_stats.resize(config_threadcount());
        }

        void execute(Worker& worker)
        {
            int id = mxs::RoutingWorker::get_current_id();
            mxb_assert(id >= 0);

            QC_CACHE_STATS& stats = m_all_stats[id];

            qc_get_cache_stats(&stats);
        }

    private:
        std::vector<QC_CACHE_STATS>& m_all_stats;
    };

    Task task(&all_stats);
    mxs::RoutingWorker::execute_concurrently(task);
}
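// Hypothetical usage of the statistics helpers above: aggregate one counter
// across all workers, or fetch the query classifier cache statistics of a
// single worker by id. The worker id 0 is only an example.
#if 0
int64_t nReads = RoutingWorker::get_one_statistic(POLL_STAT_READ);

QC_CACHE_STATS stats;
if (RoutingWorker::get_qc_stats(0, &stats))
{
    // stats.size, stats.inserts, stats.hits, stats.misses and stats.evictions
    // now describe worker 0's query classifier cache.
}
#endif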
namespace
{

json_t* qc_stats_to_json(const char* zHost, int id, const QC_CACHE_STATS& stats)
{
    json_t* pStats = json_object();

    json_object_set_new(pStats, "size", json_integer(stats.size));
    json_object_set_new(pStats, "inserts", json_integer(stats.inserts));
    json_object_set_new(pStats, "hits", json_integer(stats.hits));
    json_object_set_new(pStats, "misses", json_integer(stats.misses));
    json_object_set_new(pStats, "evictions", json_integer(stats.evictions));

    json_t* pAttributes = json_object();
    json_object_set_new(pAttributes, "stats", pStats);

    json_t* pSelf = mxs_json_self_link(zHost, "qc_stats", std::to_string(id).c_str());

    json_t* pJson = json_object();
    json_object_set_new(pJson, CN_ID, json_string(std::to_string(id).c_str()));
    json_object_set_new(pJson, CN_TYPE, json_string("qc_stats"));
    json_object_set_new(pJson, CN_ATTRIBUTES, pAttributes);
    json_object_set_new(pJson, CN_LINKS, pSelf);

    return pJson;
}
}

// static
std::unique_ptr<json_t> RoutingWorker::get_qc_stats_as_json(const char* zHost, int id)
{
    std::unique_ptr<json_t> sStats;

    QC_CACHE_STATS stats;

    if (get_qc_stats(id, &stats))
    {
        json_t* pJson = qc_stats_to_json(zHost, id, stats);

        stringstream self;
        self << MXS_JSON_API_QC_STATS << id;

        sStats.reset(mxs_json_resource(zHost, self.str().c_str(), pJson));
    }

    return sStats;
}

// static
std::unique_ptr<json_t> RoutingWorker::get_qc_stats_as_json(const char* zHost)
{
    vector<QC_CACHE_STATS> all_stats;

    get_qc_stats(all_stats);

    std::unique_ptr<json_t> sAll_stats(json_array());

    int id = 0;
    for (const auto& stats : all_stats)
    {
        json_t* pJson = qc_stats_to_json(zHost, id, stats);
        json_array_append_new(sAll_stats.get(), pJson);
        ++id;
    }

    return std::unique_ptr<json_t>(mxs_json_resource(zHost, MXS_JSON_API_QC_STATS, sAll_stats.release()));
}

// static
RoutingWorker* RoutingWorker::pick_worker()
{
    static uint32_t id_generator = 0;
    int id = this_unit.id_min_worker
        + (mxb::atomic::add(&id_generator, 1, mxb::atomic::RELAXED) % this_unit.nWorkers);
    return get(id);
}

// static
void maxscale::RoutingWorker::set_watchdog_interval(uint64_t microseconds)
{
    // Do not call anything from here, assume nothing has been initialized (like logging).

    // The internal check interval is half of the systemd configured interval.
    double seconds = 1.0 * microseconds / 2000000;

    s_watchdog_interval = maxbase::Duration(seconds);
    s_watchdog_next_check = maxbase::Clock::now();
}

void maxscale::RoutingWorker::start_watchdog_workaround()
{
    if (m_pWatchdog_notifier)
    {
        m_pWatchdog_notifier->start();
    }
}

void maxscale::RoutingWorker::stop_watchdog_workaround()
{
    if (m_pWatchdog_notifier)
    {
        m_pWatchdog_notifier->stop();
    }
}
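// Worked example of the arithmetic in set_watchdog_interval() above, with a
// hypothetical WatchdogSec=60 in the systemd unit file: systemd reports
// 60000000 microseconds, and 60000000 / 2000000 = 30, so the internal check
// interval becomes 30 seconds, half of the systemd kill interval. This leaves
// a comfortable margin for the keep-alive ping sent below.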
// A note about the below code. While the main worker is turning the "m_alive" values to false,
// it is a possibility that another RoutingWorker sees the old value of "s_watchdog_next_check"
// but its new "m_alive==false" value, marks itself alive and promptly hangs. This would cause a
// watchdog kill delay of about "s_watchdog_interval" time.
// Release-acquire would fix that, but is an unnecessary expense.
void RoutingWorker::check_systemd_watchdog()
{
    if (s_watchdog_interval.count() == 0)   // not turned on
    {
        return;
    }

    maxbase::TimePoint now = maxbase::Clock::now();
    if (now > s_watchdog_next_check)
    {
        if (m_id == this_unit.id_main_worker)
        {
            m_alive.store(true, std::memory_order_relaxed);
            bool all_alive = std::all_of(this_unit.ppWorkers,
                                         this_unit.ppWorkers + this_unit.nWorkers,
                                         [](RoutingWorker* rw) {
                                             return rw->m_alive.load(std::memory_order_relaxed);
                                         });

            if (all_alive)
            {
                s_watchdog_next_check = now + s_watchdog_interval;
#ifdef HAVE_SYSTEMD
                MXS_DEBUG("systemd watchdog keep-alive ping: sd_notify(false, \"WATCHDOG=1\")");
                sd_notify(false, "WATCHDOG=1");
#endif
                std::for_each(this_unit.ppWorkers,
                              this_unit.ppWorkers + this_unit.nWorkers,
                              [](RoutingWorker* rw) {
                                  rw->m_alive.store(false, std::memory_order_relaxed);
                              });
            }
        }
        else
        {
            if (m_alive.load(std::memory_order_relaxed) == false)
            {
                m_alive.store(true, std::memory_order_relaxed);
            }
        }
    }
}
}

size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)
{
    return RoutingWorker::broadcast_message(msg_id, arg1, arg2);
}

bool mxs_rworker_register_session(MXS_SESSION* session)
{
    RoutingWorker* pWorker = RoutingWorker::get_current();
    mxb_assert(pWorker);
    return pWorker->session_registry().add(session);
}

bool mxs_rworker_deregister_session(uint64_t id)
{
    RoutingWorker* pWorker = RoutingWorker::get_current();
    mxb_assert(pWorker);
    return pWorker->session_registry().remove(id);
}

MXS_SESSION* mxs_rworker_find_session(uint64_t id)
{
    RoutingWorker* pWorker = RoutingWorker::get_current();
    mxb_assert(pWorker);
    return pWorker->session_registry().lookup(id);
}

MXB_WORKER* mxs_rworker_get(int worker_id)
{
    return RoutingWorker::get(worker_id);
}

MXB_WORKER* mxs_rworker_get_current()
{
    return RoutingWorker::get_current();
}

int mxs_rworker_get_current_id()
{
    return RoutingWorker::get_current_id();
}
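// Hypothetical usage of the session registry C API above. Sessions live in a
// per-worker registry, so registration and lookup must happen on the worker
// thread that owns the session; the pSession stand-in is illustrative.
#if 0
MXS_SESSION* pSession = /* an existing session owned by this worker */ nullptr;

mxs_rworker_register_session(pSession);
mxb_assert(mxs_rworker_find_session(pSession->ses_id) == pSession);
mxs_rworker_deregister_session(pSession->ses_id);
#endif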
namespace
{

using namespace maxscale;

class WorkerInfoTask : public Worker::Task
{
public:
    WorkerInfoTask(const char* zHost, uint32_t nThreads)
        : m_zHost(zHost)
    {
        m_data.resize(nThreads);
    }

    void execute(Worker& worker)
    {
        RoutingWorker& rworker = static_cast<RoutingWorker&>(worker);

        json_t* pStats = json_object();
        const Worker::STATISTICS& s = rworker.statistics();

        json_object_set_new(pStats, "reads", json_integer(s.n_read));
        json_object_set_new(pStats, "writes", json_integer(s.n_write));
        json_object_set_new(pStats, "errors", json_integer(s.n_error));
        json_object_set_new(pStats, "hangups", json_integer(s.n_hup));
        json_object_set_new(pStats, "accepts", json_integer(s.n_accept));
        json_object_set_new(pStats, "avg_event_queue_length", json_integer(s.evq_avg));
        json_object_set_new(pStats, "max_event_queue_length", json_integer(s.evq_max));
        json_object_set_new(pStats, "max_exec_time", json_integer(s.maxexectime));
        json_object_set_new(pStats, "max_queue_time", json_integer(s.maxqtime));

        uint32_t nCurrent;
        uint64_t nTotal;
        rworker.get_descriptor_counts(&nCurrent, &nTotal);
        json_object_set_new(pStats, "current_descriptors", json_integer(nCurrent));
        json_object_set_new(pStats, "total_descriptors", json_integer(nTotal));

        json_t* load = json_object();
        json_object_set_new(load, "last_second", json_integer(rworker.load(Worker::Load::ONE_SECOND)));
        json_object_set_new(load, "last_minute", json_integer(rworker.load(Worker::Load::ONE_MINUTE)));
        json_object_set_new(load, "last_hour", json_integer(rworker.load(Worker::Load::ONE_HOUR)));
        json_object_set_new(pStats, "load", load);

        json_t* qc = qc_get_cache_stats_as_json();

        if (qc)
        {
            json_object_set_new(pStats, "query_classifier_cache", qc);
        }

        json_t* pAttr = json_object();
        json_object_set_new(pAttr, "stats", pStats);

        int idx = rworker.id();
        stringstream ss;
        ss << idx;

        json_t* pJson = json_object();
        json_object_set_new(pJson, CN_ID, json_string(ss.str().c_str()));
        json_object_set_new(pJson, CN_TYPE, json_string(CN_THREADS));
        json_object_set_new(pJson, CN_ATTRIBUTES, pAttr);
        json_object_set_new(pJson, CN_LINKS, mxs_json_self_link(m_zHost, CN_THREADS, ss.str().c_str()));

        mxb_assert((size_t)idx < m_data.size());
        m_data[idx] = pJson;
    }

    json_t* resource()
    {
        json_t* pArr = json_array();

        for (auto it = m_data.begin(); it != m_data.end(); it++)
        {
            json_array_append_new(pArr, *it);
        }

        return mxs_json_resource(m_zHost, MXS_JSON_API_THREADS, pArr);
    }

    json_t* resource(int id)
    {
        stringstream self;
        self << MXS_JSON_API_THREADS << id;
        return mxs_json_resource(m_zHost, self.str().c_str(), m_data[id]);
    }

private:
    vector<json_t*> m_data;
    const char*     m_zHost;
};

class FunctionTask : public Worker::DisposableTask
{
public:
    FunctionTask(std::function<void()> cb)
        : m_cb(cb)
    {
    }

    void execute(Worker& worker)
    {
        m_cb();
    }

protected:
    std::function<void()> m_cb;
};
}

size_t mxs_rworker_broadcast(void (* cb)(void* data), void* data)
{
    std::unique_ptr<FunctionTask> task(new FunctionTask([cb, data]() {
            cb(data);
        }));

    return RoutingWorker::broadcast(std::move(task));
}

uint64_t mxs_rworker_create_key()
{
    return RoutingWorker::create_key();
}

void mxs_rworker_set_data(uint64_t key, void* data, void (* callback)(void*))
{
    RoutingWorker::get_current()->set_data(key, data, callback);
}

void* mxs_rworker_get_data(uint64_t key)
{
    return RoutingWorker::get_current()->get_data(key);
}

void mxs_rworker_delete_data(uint64_t key)
{
    auto func = [key]() {
            RoutingWorker::get_current()->delete_data(key);
        };

    std::unique_ptr<FunctionTask> task(new FunctionTask(func));
    RoutingWorker::broadcast(std::move(task));
}

json_t* mxs_rworker_to_json(const char* zHost, int id)
{
    Worker* target = RoutingWorker::get(id);
    WorkerInfoTask task(zHost, id + 1);
    Semaphore sem;

    target->execute(&task, &sem, Worker::EXECUTE_AUTO);
    sem.wait();

    return task.resource(id);
}

json_t* mxs_rworker_list_to_json(const char* host)
{
    WorkerInfoTask task(host, config_threadcount());
    RoutingWorker::execute_concurrently(task);
    return task.resource();
}

namespace
{

class WatchdogTask : public Worker::Task
{
public:
    WatchdogTask()
    {
    }

    void execute(Worker& worker)
    {
        // Success if this is called.
    }
};
}

void mxs_rworker_watchdog()
{
    MXS_INFO("MaxScale watchdog called.");
    WatchdogTask task;
    RoutingWorker::execute_concurrently(task);
}
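// Hypothetical usage of the worker-local data C API above: each routing
// worker stores its own value under a shared key, and the cleanup callback
// runs when the data is deleted. The std::string payload is illustrative.
#if 0
static uint64_t key = mxs_rworker_create_key();     // once, at startup

// On each worker thread:
mxs_rworker_set_data(key, new std::string("per-worker state"),
                     [](void* pData) {
                         delete static_cast<std::string*>(pData);
                     });

auto* pState = static_cast<std::string*>(mxs_rworker_get_data(key));

// Later, from any thread; broadcasts the deletion to every worker:
mxs_rworker_delete_data(key);
#endif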