diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh index fa7daf063..1273f85ca 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/worker.hh @@ -60,6 +60,369 @@ struct WORKER_STATISTICS int64_t maxexectime; }; +/** + * WorkerLoad is a class that calculates the load percentage of a worker + * thread, based upon the relative amount of time the worker spends in + * epoll_wait(). + * + * If during a time period of length T milliseconds, the worker thread + * spends t milliseconds in epoll_wait(), then the load of the worker is + * calculated as 100 * ((T - t) / T). That is, if the worker spends all + * the time in epoll_wait(), then the load is 0 and if the worker spends + * no time waiting in epoll_wait(), then the load is 100. + */ +class WorkerLoad +{ + WorkerLoad(const WorkerLoad&); + WorkerLoad& operator = (const WorkerLoad&); + +public: + enum counter_t + { + TEN_SECONDS = 10 * 1000, + ONE_MINUTE = 6 * TEN_SECONDS, + ONE_HOUR = 60 * ONE_MINUTE, + }; + + enum + { + GRANULARITY = TEN_SECONDS + }; + + /** + * Constructor + */ + WorkerLoad(); + + /** + * Reset the load calculation. Should be called immediately before the + * worker enters its eternal epoll_wait()-loop. + */ + void reset() + { + uint64_t now = get_time(); + + m_start_time = now; + m_wait_start = 0; + m_wait_time = 0; + } + + /** + * To be used for signaling that the worker is about to call epoll_wait(). + * + * @param now The current time. + */ + void about_to_wait(uint64_t now) + { + m_wait_start = now; + } + + void about_to_wait() + { + about_to_wait(get_time()); + } + + /** + * To be used for signaling that the worker has returned from epoll_wait(). + * + * @param now The current time. + */ + void about_to_work(uint64_t now); + + void about_to_work() + { + about_to_work(get_time()); + } + + /** + * Returns the last calculated load, + * + * @return A value between 0 and 100. + */ + uint8_t percentage(counter_t counter) const + { + switch (counter) + { + case TEN_SECONDS: + return m_load_10_seconds.value(); + + case ONE_MINUTE: + return m_load_1_minute.value(); + + case ONE_HOUR: + return m_load_1_hour.value(); + + default: + ss_dassert(!true); + return 0; + }; + } + + /** + * When was the last 10 second period started. + * + * @return The start time. + */ + uint64_t start_time() const + { + return m_start_time; + } + + /** + * Returns the current time using CLOCK_MONOTONIC. + * + * @return Current time in milliseconds. + */ + static uint64_t get_time(); + +private: + /** + * Average is a base class for classes intended to be used for calculating + * averages. An Average may have a dependant Average whose value depends + * upon the value of the first. At certain moments, an Average may trigger + * its dependant Average to update itself. + */ + class Average + { + Average(const Average&); + Average& operator = (const Average&); + + public: + /** + * Constructor + * + * @param pDependant An optional dependant average. + */ + Average(Average* pDependant = NULL) + : m_pDependant(pDependant) + , m_value(0) + {} + + virtual ~Average(); + + /** + * Add a value to the Average. The exact meaning depends upon the + * concrete Average class. + * + * If the addition of the value in some sense represents a full cycle + * in the average calculation, then the instance will call add_value() + * on its dependant, otherwise it will call update_value(). In both cases + * with its own value as argument. + * + * @param value The value to be added. + * + * @return True if the addition of the value caused a full cycle + * in the average calculation, false otherwise. + */ + virtual bool add_value(uint8_t value) = 0; + + /** + * Update the value of the Average. The exact meaning depends upon the + * concrete Average class. Will also call update_value() of its dependant + * with its own value as argument. + * + * @param value The value to be updated. + */ + virtual void update_value(uint8_t value) = 0; + + /** + * Return the average value. + * + * @return The value represented by the Average. + */ + uint8_t value() const + { + return atomic_load_uint32(&m_value); + } + + protected: + Average* m_pDependant; /*< The optional dependant Average. */ + uint32_t m_value; /*< The current average value. */ + + protected: + void set_value(uint32_t value) + { + atomic_store_uint32(&m_value, value); + } + }; + + /** + * An Average consisting of a single value. + */ + class Average1 : public Average + { + public: + Average1(Average* pDependant = NULL) + : Average(pDependant) + { + } + + bool add_value(uint8_t value) + { + set_value(value); + + // Every addition of a value represents a full cycle. + if (m_pDependant) + { + m_pDependant->add_value(value); + } + + return true; + } + + void update_value(uint8_t value) + { + set_value(value); + + if (m_pDependant) + { + m_pDependant->update_value(value); + } + } + }; + + /** + * An Average calculated from N values. + */ + template + class AverageN : public Average + { + public: + AverageN(Average* pDependant = NULL) + : Average(pDependant) + , m_end(m_begin + N) + , m_i(m_begin) + , m_sum(0) + , m_nValues(0) + { + } + + bool add_value(uint8_t value) + { + if (m_nValues == N) + { + // If as many values that fit has been added, then remove the + // least recent value from the sum. + m_sum -= *m_i; + } + else + { + // Otherwise make a note that a new value is added. + ++m_nValues; + } + + *m_i = value; + m_sum += *m_i; // Update the sum of all values. + + m_i = next(m_i); + + uint32_t average = m_sum / m_nValues; + + set_value(average); + + if (m_pDependant) + { + if (m_i == m_begin) + { + // If we have looped around we have performed a full cycle and will + // add a new value to the dependant average. + m_pDependant->add_value(average); + } + else + { + // Otherwise we just update the most recent value. + m_pDependant->update_value(average); + } + } + + return m_i == m_begin; + } + + void update_value(uint8_t value) + { + if (m_nValues == 0) + { + // If no values have been added yet, there's nothing to update but we + // need to add the value. + add_value(value); + } + else + { + // Otherwise we update the most recent value. + uint8_t* p = prev(m_i); + + m_sum -= *p; + *p = value; + m_sum += *p; + + uint32_t average = m_sum / m_nValues; + + set_value(average); + + if (m_pDependant) + { + m_pDependant->update_value(average); + } + } + } + + private: + uint8_t* prev(uint8_t* p) + { + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + if (p > m_begin) + { + --p; + } + else + { + ss_dassert(p == m_begin); + p = m_end - 1; + } + + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + return p; + } + + uint8_t* next(uint8_t* p) + { + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + ++p; + + if (p == m_end) + { + p = m_begin; + } + + ss_dassert(p >= m_begin); + ss_dassert(p < m_end); + + return p; + } + + private: + uint8_t m_begin[N]; /*< Buffer containing values from which the average is calculated. */ + uint8_t* m_end; /*< Points to one past the end of the buffer. */ + uint8_t* m_i; /*< Current position in the buffer. */ + uint32_t m_sum; /*< Sum of all values in the buffer. */ + uint32_t m_nValues; /*< How many values the buffer contains. */ + }; + + uint64_t m_start_time; /*< When was a new 10-second period started. */ + uint64_t m_wait_start; /*< The time when the worker entered epoll_wait(). */ + uint64_t m_wait_time; /*< How much time the worker has spent in epoll_wait(). */ + AverageN<60> m_load_1_hour; /*< The average load during the last hour. */ + AverageN<6> m_load_1_minute; /*< The average load during the last minute. */ + Average1 m_load_10_seconds; /*< The load during the last 10-second period. */ +}; + + class Worker : public MXS_WORKER , private MessageQueue::Handler , private MXS_POLL_DATA @@ -73,6 +436,7 @@ public: typedef WorkerDisposableTask DisposableTask; typedef Registry SessionsById; typedef std::vector Zombies; + typedef WorkerLoad Load; enum state_t { @@ -118,6 +482,11 @@ public: return m_id; } + int load(Load::counter_t counter) + { + return m_load.percentage(counter); + } + /** * Returns the state of the worker. * @@ -533,6 +902,7 @@ private: Zombies m_zombies; /*< DCBs to be deleted. */ uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ + Load m_load; }; } diff --git a/server/core/poll.cc b/server/core/poll.cc index b40cae50a..478761e4d 100644 --- a/server/core/poll.cc +++ b/server/core/poll.cc @@ -240,8 +240,8 @@ dShowThreads(DCB *dcb) { dcb_printf(dcb, "Polling Threads.\n\n"); - dcb_printf(dcb, " ID | State | #descriptors (curr) | #descriptors (tot) |\n"); - dcb_printf(dcb, "----+------------+---------------------+---------------------+\n"); + dcb_printf(dcb, " ID | State | #descriptors (curr) | #descriptors (tot) | Load (10s) | Load (1m) | Load (1h) |\n"); + dcb_printf(dcb, "----+------------+---------------------+---------------------+------------+-----------+-----------+\n"); for (int i = 0; i < n_threads; i++) { Worker* worker = Worker::get(i); @@ -276,7 +276,11 @@ dShowThreads(DCB *dcb) worker->get_descriptor_counts(&nCurrent, &nTotal); - dcb_printf(dcb, " %2d | %10s | %19" PRIu32 " | %19" PRIu64 " |\n", i, state, nCurrent, nTotal); + dcb_printf(dcb, " %2d | %10s | %19" PRIu32 " | %19" PRIu64 " | %10d | %9d | %9d |\n", + i, state, nCurrent, nTotal, + worker->load(Worker::Load::TEN_SECONDS), + worker->load(Worker::Load::ONE_MINUTE), + worker->load(Worker::Load::ONE_HOUR)); } } diff --git a/server/core/worker.cc b/server/core/worker.cc index b9a7b8a25..14396a508 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -41,6 +41,7 @@ #define WORKER_ABSENT_ID -1 using maxscale::Worker; +using maxscale::WorkerLoad; using maxscale::Closer; using maxscale::Semaphore; using std::vector; @@ -154,6 +155,49 @@ void poll_resolve_error(int fd, int errornum, int op) static bool modules_thread_init(); static void modules_thread_finish(); +WorkerLoad::WorkerLoad() + : m_start_time(0) + , m_wait_start(0) + , m_wait_time(0) + , m_load_1_minute(&m_load_1_hour) + , m_load_10_seconds(&m_load_1_minute) +{ +} + +void WorkerLoad::about_to_work(uint64_t now) +{ + uint64_t duration = now - m_start_time; + + m_wait_time += (now - m_wait_start); + + if (duration > TEN_SECONDS) + { + int load_percentage = 100 * ((duration - m_wait_time) / (double)duration); + + m_start_time = now; + m_wait_time = 0; + + m_load_10_seconds.add_value(load_percentage); + } +} + +WorkerLoad::Average::~Average() +{ +} + +//static +uint64_t WorkerLoad::get_time() +{ + uint64_t now; + + timespec t; + + ss_debug(int rv=)clock_gettime(CLOCK_MONOTONIC, &t); + ss_dassert(rv == 0); + + return t.tv_sec * 1000 + (t.tv_nsec / 1000000); +} + Worker::Worker(int id, int epoll_fd) : m_id(id) @@ -1112,13 +1156,31 @@ void Worker::poll_waitevents() m_state = IDLE; + m_load.reset(); + while (!should_shutdown()) { + int nfds; + m_state = POLLING; atomic_add_int64(&m_statistics.n_polls, 1); - int nfds; - if ((nfds = epoll_wait(m_epoll_fd, events, MAX_EVENTS, -1)) == -1) + + uint64_t now = Load::get_time(); + int timeout = Load::GRANULARITY - (now - m_load.start_time()); + + if (timeout < 0) + { + // If the processing of the last batch of events took us past the next + // time boundary, we ensure we return immediately. + timeout = 0; + } + + m_load.about_to_wait(now); + nfds = epoll_wait(m_epoll_fd, events, MAX_EVENTS, timeout); + m_load.about_to_work(); + + if (nfds == -1) { int eno = errno; errno = 0;