From bf7d3f75948372f214488bfc9d03ab30a97f3390 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Wed, 18 Apr 2018 11:05:00 +0300 Subject: [PATCH] MXS-1754 Add Worker::Timer class Worker::Timer class and Worker::DelegatingTimer templates are timers built on top of timerfd_create(2). As such they consume descriptor and hence cannot be created independently for each timer need. Each Worker has now a private timer member variable on top of which a general timer mechanism will be provided. --- server/core/internal/worker.hh | 100 +++++++++++++++++- server/core/worker.cc | 179 +++++++++++++++++++++++++++++++-- 2 files changed, 269 insertions(+), 10 deletions(-) diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh index ec273bc8e..5a0d7eed9 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/worker.hh @@ -422,6 +422,59 @@ private: Average1 m_load_1_second; /*< The load during the last 1-second period. */ }; +/** + * WorkerTimer is a timer class built on top of timerfd_create(2), + * which means that each WorkerTimer instance will consume one file + * descriptor. The implication of that is that there should not be + * too many WorkerTimer instances. In order to be used, a WorkerTimer + * needs a Worker instance in whose context the timer is triggered. + */ +class WorkerTimer : private MXS_POLL_DATA +{ + WorkerTimer(const WorkerTimer&) = delete; + WorkerTimer& operator = (const WorkerTimer&) = delete; + +public: + ~WorkerTimer(); + + /** + * @brief Start the timer. + * + * @param internal The initial delay before the timer is + * triggered, and the subsequent interval + * between triggers. + * + * @attention A value of 0 means that the timer is cancelled. + */ + void start(uint64_t interval); + + /** + * @brief Cancel the timer. + */ + void cancel(); + +protected: + /** + * @brief Constructor + * + * @param pWorker The worker in whose context the timer is to run. + */ + WorkerTimer(Worker* pWorker); + + /** + * @brief Called when the timer is triggered. + */ + virtual void tick() = 0; + +private: + uint32_t handle(int wid, uint32_t events); + + static uint32_t handler(MXS_POLL_DATA* pThis, int wid, uint32_t events); + +private: + int m_fd; /**< The timerfd descriptor. */ + Worker* m_pWorker; /**< The worker in whose context the timer runs. */ +}; class Worker : public MXS_WORKER , private MessageQueue::Handler @@ -434,6 +487,46 @@ public: typedef WorkerTask Task; typedef WorkerDisposableTask DisposableTask; typedef WorkerLoad Load; + typedef WorkerTimer Timer; + + /** + * A delegating timer that delegates the timer tick handling + * to another object. + */ + template + class DelegatingTimer : public Timer + { + DelegatingTimer(const DelegatingTimer&) = delete; + DelegatingTimer& operator = (const DelegatingTimer&) = delete; + + public: + typedef void (T::*PMethod)(); + + /** + * @brief Constructor + * + * @param pWorker The worker in whose context the timer runs. + * @param pDelegatee The object to whom the timer tick is delivered. + * @param pMethod The method to call on @c pDelegatee when the + * timer is triggered. + */ + DelegatingTimer(Worker* pWorker, T* pDelegatee, PMethod pMethod) + : Timer(pWorker) + , m_pDelegatee(pDelegatee) + , m_pMethod(pMethod) + { + } + + private: + void tick() /* final */ + { + (m_pDelegatee->*m_pMethod)(); + } + + private: + T* m_pDelegatee; + PMethod m_pMethod; + }; enum state_t { @@ -847,7 +940,11 @@ private: void poll_waitevents(); + void tick(); + private: + typedef DelegatingTimer PrivateTimer; + STATISTICS m_statistics; /*< Worker statistics. */ MessageQueue* m_pQueue; /*< The message queue of the worker. */ THREAD m_thread; /*< The thread handle of the worker. */ @@ -856,7 +953,8 @@ private: bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ - Load m_load; + Load m_load; /*< The worker load. */ + PrivateTimer m_timer; /*< The worker's own timer. */ }; } diff --git a/server/core/worker.cc b/server/core/worker.cc index 67326454f..6473bb736 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -141,9 +142,168 @@ uint64_t WorkerLoad::get_time() return t.tv_sec * 1000 + (t.tv_nsec / 1000000); } +namespace +{ + +int create_timerfd() +{ + int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + + if (fd == -1) + { + if (errno == EINVAL) + { + // Ok, we may be running on an old kernel, let's try again but without flags. + fd = timerfd_create(CLOCK_MONOTONIC, 0); + + if (fd != -1) + { + int flags = fcntl(fd, F_GETFL, 0); + if (flags != -1) + { + flags |= O_NONBLOCK; + if (fcntl(fd, F_SETFL, flags) == -1) + { + MXS_ALERT("Could not make timer fd non-blocking, MaxScale will not work: %s", + mxs_strerror(errno)); + close(fd); + fd = -1; + ss_dassert(!true); + } + } + else + { + MXS_ALERT("Could not get timer fd flags, MaxScale will not work: %s", + mxs_strerror(errno)); + close(fd); + fd = -1; + ss_dassert(!true); + } + } + else + { + MXS_ALERT("Could not create timer file descriptor even with no flags, MaxScale " + "will not work: %s", mxs_strerror(errno)); + ss_dassert(!true); + } + } + else + { + MXS_ALERT("Could not create timer file descriptor, MaxScale will not work: %s", + mxs_strerror(errno)); + ss_dassert(!true); + } + } + + return fd; +} + +} + +WorkerTimer::WorkerTimer(Worker* pWorker) + : m_fd(create_timerfd()) + , m_pWorker(pWorker) +{ + MXS_POLL_DATA::handler = handler; + MXS_POLL_DATA::thread.id = m_pWorker->id(); + + if (m_fd != -1) + { + if (!m_pWorker->add_fd(m_fd, EPOLLIN, this)) + { + MXS_ALERT("Could not add timer descriptor to worker, MaxScale will not work."); + ::close(m_fd); + m_fd = -1; + ss_dassert(!true); + } + } +} + +WorkerTimer::~WorkerTimer() +{ + if (m_fd != -1) + { + if (!m_pWorker->remove_fd(m_fd)) + { + MXS_ERROR("Could not remove timer fd from worker."); + } + + ::close(m_fd); + } +} + +void WorkerTimer::start(uint64_t interval) +{ + // TODO: Add possibility to set initial delay and interval. + time_t initial_sec = interval / 1000; + long initial_nsec = (interval - initial_sec * 1000) * 1000; + + time_t interval_sec = (interval / 1000); + long interval_nsec = (interval - interval_sec * 1000) * 1000; + + struct itimerspec time; + + time.it_value.tv_sec = initial_sec; + time.it_value.tv_nsec = initial_nsec; + time.it_interval.tv_sec = interval_sec; + time.it_interval.tv_nsec = interval_nsec; + + if (timerfd_settime(m_fd, 0, &time, NULL) != 0) + { + MXS_ERROR("Could not set timer settings."); + } +} + +void WorkerTimer::cancel() +{ + start(0); +} + +uint32_t WorkerTimer::handle(int wid, uint32_t events) +{ + ss_dassert(wid == m_pWorker->id()); + ss_dassert(events & EPOLLIN); + ss_dassert((events & ~EPOLLIN) == 0); + + // Read all events + uint64_t expirations; + while (read(m_fd, &expirations, sizeof(expirations)) == 0) + { + } + + tick(); + + return MXS_POLL_READ; +} + +//static +uint32_t WorkerTimer::handler(MXS_POLL_DATA* pThis, int wid, uint32_t events) +{ + return static_cast(pThis)->handle(wid, events); +} + +namespace +{ + +int create_epoll_instance() +{ + int fd = ::epoll_create(MAX_EVENTS); + + if (fd == -1) + { + MXS_ALERT("Could not create epoll-instance for worker, MaxScale will not work: %s", + mxs_strerror(errno)); + ss_dassert(!true); + } + + return fd; +} + +} + Worker::Worker() : m_id(next_worker_id()) - , m_epoll_fd(epoll_create(MAX_EVENTS)) + , m_epoll_fd(create_epoll_instance()) , m_state(STOPPED) , m_pQueue(NULL) , m_thread(0) @@ -152,6 +312,7 @@ Worker::Worker() , m_shutdown_initiated(false) , m_nCurrent_descriptors(0) , m_nTotal_descriptors(0) + , m_timer(this, this, &Worker::tick) { if (m_epoll_fd != -1) { @@ -161,22 +322,16 @@ Worker::Worker() { if (!m_pQueue->add_to_worker(this)) { - MXS_ALERT("Could not add message queue to worker. MaxScale will not work."); + MXS_ALERT("Could not add message queue to worker, MaxScale will not work."); ss_dassert(!true); } } else { - MXS_ALERT("Could not create message queue for worker. MaxScale will not work."); + MXS_ALERT("Could not create message queue for worker, MaxScale will not work."); ss_dassert(!true); } } - else - { - MXS_ALERT("Could not create epoll-instance for worker: %s. MaxScale will not work.", - mxs_strerror(errno)); - ss_dassert(!true); - } this_unit.ppWorkers[m_id] = this; } @@ -953,6 +1108,12 @@ void Worker::poll_waitevents() m_state = STOPPED; } +void Worker::tick() +{ + // TODO: Add timer management here once function for adding delayed calls + // TODO: to Worker has been added. +} + }