From 84b2156508700e43ed5eb59169634ae86519eb79 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Thu, 19 Apr 2018 15:18:05 +0300 Subject: [PATCH] MXS-1754 Add delayed calling to Worker It's now possible to provide Worker with a function to call at a later time. It's possible to provide a function or a member function (with the object), taking zero or one argument of any kind. The argument must be copyable. There's currently no way to cancel a call, which must be added as typically the delayed calling is associated with a session and if the session is closed before the delayed call is made, bad things are likely to happen. --- server/core/internal/worker.hh | 240 +++++++++++++++++++++++++++++++- server/core/test/CMakeLists.txt | 3 + server/core/test/test_worker.cc | 159 +++++++++++++++++++++ server/core/worker.cc | 99 ++++++++++++- 4 files changed, 492 insertions(+), 9 deletions(-) create mode 100644 server/core/test/test_worker.cc diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh index 5a0d7eed9..8f596e007 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/worker.hh @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -435,13 +436,13 @@ class WorkerTimer : private MXS_POLL_DATA WorkerTimer& operator = (const WorkerTimer&) = delete; public: - ~WorkerTimer(); + virtual ~WorkerTimer(); /** * @brief Start the timer. * - * @param internal The initial delay before the timer is - * triggered, and the subsequent interval + * @param interval The initial delay in milliseconds before the + * timer is triggered, and the subsequent interval * between triggers. * * @attention A value of 0 means that the timer is cancelled. @@ -895,6 +896,67 @@ public: */ static int get_current_id(); + /** + * Push a function for delayed execution. + * + * @param delay The delay in milliseconds. + * @param pFunction The function to call. + * + * @attention When invoked, if the provided function returns true, then it will + * be called again after @c delay milliseconds. + */ + void delayed_call(uint32_t delay, bool (*pFunction)()) + { + add_delayed_call(new DelayedCallFunctionVoid(delay, pFunction)); + } + + /** + * Push a function for delayed execution. + * + * @param delay The delay in milliseconds. + * @param pFunction The function to call. + * @param data The data to be provided to the function when invoked. + * + * @attention When invoked, if the provided function returns true, then it will + * be called again after @c delay milliseconds. + */ + template + void delayed_call(uint32_t delay, bool (*pFunction)(D data), D data) + { + add_delayed_call(new DelayedCallFunction(delay, pFunction, data)); + } + + /** + * Push a member function for delayed execution. + * + * @param delay The delay in milliseconds. + * @param pMethod The member function to call. + * + * @attention When invoked, if the provided function returns true, then it will + * be called again after @c delay milliseconds. + */ + template + void delayed_call(uint32_t delay, T* pT, bool (T::*pMethod)()) + { + add_delayed_call(new DelayedCallMethodVoid(delay, pT, pMethod)); + } + + /** + * Push a member function for delayed execution. + * + * @param delay The delay in milliseconds. + * @param pMethod The member function to call. + * @param data The data to be provided to the function when invoked. + * + * @attention When invoked, if the provided function returns true, then it will + * be called again after @c delay milliseconds. + */ + template + void delayed_call(uint32_t delay, T* pT, bool (T::*pMethod)(D data), D data) + { + add_delayed_call(new DelayedCallMethod(delay, pT, pMethod, data)); + } + protected: Worker(); virtual ~Worker(); @@ -932,6 +994,164 @@ protected: state_t m_state; /*< The state of the worker */ private: + class DelayedCall + { + DelayedCall(const DelayedCall&) = delete;; + DelayedCall& operator = (const DelayedCall&) = delete; + + public: + virtual ~DelayedCall() + { + } + + uint32_t delay() const + { + return m_delay; + } + + uint64_t at() const + { + return m_at; + } + + bool call() + { + bool rv = do_call(); + // We try to invoke the function as often as it was specified. If the + // delay is very short and the execution time for the function very long, + // then we will not succeed with that and the function will simply be + // invoked as frequently as possible. + m_at += m_delay; + return rv; + } + + protected: + DelayedCall(uint32_t delay) + : m_delay(delay) + , m_at(get_at(delay)) + { + } + + virtual bool do_call() = 0; + + private: + static uint64_t get_at(uint32_t delay) + { + struct timespec ts; + ss_debug(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts); + ss_dassert(rv == 0); + + return delay + (ts.tv_sec * 1000 + ts.tv_nsec / 1000000); + } + + private: + uint32_t m_delay; // The delay in milliseconds. + uint64_t m_at; // The next time the function should be invoked. + }; + + template + class DelayedCallFunction : public DelayedCall + { + DelayedCallFunction(const DelayedCallFunction&) = delete; + DelayedCallFunction& operator = (const DelayedCallFunction&) = delete; + + public: + DelayedCallFunction(uint32_t delay, bool (*pFunction)(D data), D data) + : DelayedCall(delay) + , m_pFunction(pFunction) + , m_data(data) + { + } + + private: + bool do_call() + { + return m_pFunction(m_data); + } + + private: + bool (*m_pFunction)(D); + D m_data; + }; + + // Explicit specialization requires namespace scope + class DelayedCallFunctionVoid : public DelayedCall + { + DelayedCallFunctionVoid(const DelayedCallFunctionVoid&) = delete; + DelayedCallFunctionVoid& operator = (const DelayedCallFunctionVoid&) = delete; + + public: + DelayedCallFunctionVoid(uint32_t delay, bool (*pFunction)()) + : DelayedCall(delay) + , m_pFunction(pFunction) + { + } + + private: + bool do_call() + { + return m_pFunction(); + } + + private: + bool (*m_pFunction)(); + }; + + template + class DelayedCallMethod : public DelayedCall + { + DelayedCallMethod(const DelayedCallMethod&) = delete; + DelayedCallMethod& operator = (const DelayedCallMethod&) = delete; + + public: + DelayedCallMethod(uint32_t delay, T* pT, bool (T::*pMethod)(D data), D data) + : DelayedCall(delay) + , m_pT(pT) + , m_pMethod(pMethod) + , m_data(data) + { + } + + private: + bool do_call() + { + return (m_pT->*m_pMethod)(m_data); + } + + private: + T* m_pT; + bool (T::*m_pMethod)(D); + D m_data; + }; + + template + class DelayedCallMethodVoid : public DelayedCall + { + DelayedCallMethodVoid(const DelayedCallMethodVoid&) = delete; + DelayedCallMethodVoid& operator = (const DelayedCallMethodVoid&) = delete; + + public: + DelayedCallMethodVoid(uint32_t delay, T* pT, bool (T::*pMethod)()) + : DelayedCall(delay) + , m_pT(pT) + , m_pMethod(pMethod) + { + } + + private: + bool do_call() + { + return (m_pT->*m_pMethod)(); + } + + private: + T* m_pT; + bool (T::*m_pMethod)(); + }; + + void add_delayed_call(DelayedCall* pDelayed_call); + void adjust_timer(); + bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override @@ -943,7 +1163,17 @@ private: void tick(); private: + class LaterAt : public std::binary_function + { + public: + bool operator () (const DelayedCall* pLhs, const DelayedCall* pRhs) + { + return pLhs->at() > pRhs->at(); + } + }; + typedef DelegatingTimer PrivateTimer; + typedef std::priority_queue, LaterAt> DelayedCalls; STATISTICS m_statistics; /*< Worker statistics. */ MessageQueue* m_pQueue; /*< The message queue of the worker. */ @@ -954,7 +1184,9 @@ private: uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ Load m_load; /*< The worker load. */ - PrivateTimer m_timer; /*< The worker's own timer. */ + PrivateTimer* m_pTimer; /*< The worker's own timer. */ + DelayedCalls m_delayed_calls; /*< Current delayed calls. */ + uint64_t m_last_delayed_call; /*< When was the last delayed call made. */ }; } diff --git a/server/core/test/CMakeLists.txt b/server/core/test/CMakeLists.txt index e5d7ae549..30ce55d58 100644 --- a/server/core/test/CMakeLists.txt +++ b/server/core/test/CMakeLists.txt @@ -27,6 +27,7 @@ add_executable(test_trxtracking test_trxtracking.cc) add_executable(test_users test_users.cc) add_executable(test_utils test_utils.cc) add_executable(test_session_track test_session_track.cc) +add_executable(test_worker test_worker.cc) target_link_libraries(profile_trxboundaryparser maxscale-common) target_link_libraries(test_adminusers maxscale-common) @@ -57,6 +58,7 @@ target_link_libraries(test_trxtracking maxscale-common) target_link_libraries(test_users maxscale-common) target_link_libraries(test_utils maxscale-common) target_link_libraries(test_session_track mysqlcommon) +target_link_libraries(test_worker maxscale-common) add_test(test_adminusers test_adminusers) add_test(test_atomic test_atomic) @@ -93,6 +95,7 @@ add_test(test_trxtracking test_trxtracking) add_test(test_users test_users) add_test(test_utils test_utils) add_test(test_session_track test_session_track) +add_test(test_worker test_worker) add_subdirectory(rest-api) add_subdirectory(canonical_tests) diff --git a/server/core/test/test_worker.cc b/server/core/test/test_worker.cc new file mode 100644 index 000000000..f00bcf720 --- /dev/null +++ b/server/core/test/test_worker.cc @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include "../internal/worker.hh" + +using namespace std; + +namespace +{ + +// TODO: Put this in some common place. +int64_t get_monotonic_time_ms() +{ + struct timespec ts; + ss_debug(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts); + ss_dassert(rv == 0); + + return ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} + +class TestWorker : public maxscale::Worker +{ + TestWorker(const TestWorker&); + TestWorker& operator = (const TestWorker&); + +public: + TestWorker() + { + } + + ~TestWorker() + { + } + +private: + // TODO: Perhaps these could have default implementations, so that + // TODO: Worker could be used as such. + bool pre_run() // override + { + return true; + } + + void post_run() // override + { + } + + void epoll_tick() // override + { + } +}; + +class TimerTest +{ +public: + static int s_ticks; + + TimerTest(int* pRv, int32_t delay) + : m_id(s_id++) + , m_delay(delay) + , m_at(get_monotonic_time_ms() + delay) + , m_rv(*pRv) + { + } + + int32_t delay() const + { + return m_delay; + } + + bool tick() + { + int64_t now = get_monotonic_time_ms(); + int64_t diff = abs(now - m_at); + + cout << m_id << ": " << diff << endl; + + if (diff > 50) + { + cout << "Error: Difference between expected and happened > 50: " << diff << endl; + m_rv = EXIT_FAILURE; + } + + m_at += m_delay; + + if (--s_ticks < 0) + { + maxscale::Worker::shutdown_all(); + } + + return true; + } + +private: + static int s_id; + + int m_id; + int32_t m_delay; + int64_t m_at; + int& m_rv; +}; + +int TimerTest::s_id = 1; +int TimerTest::s_ticks; + +int run() +{ + int rv = EXIT_SUCCESS; + + TimerTest::s_ticks = 100; + + TestWorker w; + + TimerTest t1(&rv, 200); + TimerTest t2(&rv, 300); + TimerTest t3(&rv, 400); + TimerTest t4(&rv, 500); + TimerTest t5(&rv, 600); + + w.delayed_call(t1.delay(), &t1, &TimerTest::tick); + w.delayed_call(t2.delay(), &t2, &TimerTest::tick); + w.delayed_call(t3.delay(), &t3, &TimerTest::tick); + w.delayed_call(t4.delay(), &t4, &TimerTest::tick); + w.delayed_call(t5.delay(), &t5, &TimerTest::tick); + + w.run(); + + return EXIT_SUCCESS; +} + +} + +int main() +{ + int rv = EXIT_FAILURE; + + if (mxs_log_init(NULL, NULL, MXS_LOG_TARGET_STDOUT)) + { + poll_init(); + maxscale::MessageQueue::init(); + maxscale::Worker::init(); + + rv = run(); + + mxs_log_finish(); + } + + return rv; +} diff --git a/server/core/worker.cc b/server/core/worker.cc index 6473bb736..1c2b75e96 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -236,10 +236,10 @@ void WorkerTimer::start(uint64_t interval) { // TODO: Add possibility to set initial delay and interval. time_t initial_sec = interval / 1000; - long initial_nsec = (interval - initial_sec * 1000) * 1000; + long initial_nsec = (interval - initial_sec * 1000) * 1000000; time_t interval_sec = (interval / 1000); - long interval_nsec = (interval - interval_sec * 1000) * 1000; + long interval_nsec = (interval - interval_sec * 1000) * 1000000; struct itimerspec time; @@ -312,7 +312,7 @@ Worker::Worker() , m_shutdown_initiated(false) , m_nCurrent_descriptors(0) , m_nTotal_descriptors(0) - , m_timer(this, this, &Worker::tick) + , m_pTimer(new PrivateTimer(this, this, &Worker::tick)) { if (m_epoll_fd != -1) { @@ -342,6 +342,7 @@ Worker::~Worker() ss_dassert(!m_started); + delete m_pTimer; delete m_pQueue; close(m_epoll_fd); } @@ -1108,10 +1109,98 @@ void Worker::poll_waitevents() m_state = STOPPED; } +namespace +{ + +uint64_t get_current_time_ms() +{ + struct timespec ts; + ss_debug(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts); + ss_dassert(rv == 0); + + return ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} + +} + void Worker::tick() { - // TODO: Add timer management here once function for adding delayed calls - // TODO: to Worker has been added. + uint64_t now = get_current_time_ms(); + + ss_dassert(!m_delayed_calls.empty()); + + vector repeating_calls; + + DelayedCall* pDelayed_call; + + while (!m_delayed_calls.empty() && (m_delayed_calls.top()->at() <= now)) + { + pDelayed_call = m_delayed_calls.top(); + m_delayed_calls.pop(); + + if (pDelayed_call->call()) + { + repeating_calls.push_back(pDelayed_call); + } + else + { + delete pDelayed_call; + } + } + + for (auto i = repeating_calls.begin(); i != repeating_calls.end(); ++i) + { + m_delayed_calls.push(*i); + } + + adjust_timer(); +} + +void Worker::add_delayed_call(DelayedCall* pDelayed_call) +{ + bool adjust = true; + + if (!m_delayed_calls.empty()) + { + DelayedCall* pTop = m_delayed_calls.top(); + + if (pDelayed_call->at() < pTop->at()) + { + // If the added delayed call needs to be called sooner + // than the top-most delayed call, then we must adjust + // the timer. + adjust = true; + } + } + + m_delayed_calls.push(pDelayed_call); + + if (adjust) + { + adjust_timer(); + } +} + +void Worker::adjust_timer() +{ + if (!m_delayed_calls.empty()) + { + DelayedCall* pNext_call = m_delayed_calls.top(); + + uint64_t now = get_current_time_ms(); + int64_t delay = pNext_call->at() - now; + + if (delay <= 0) + { + delay = 1; + } + + m_pTimer->start(delay); + } + else + { + m_pTimer->cancel(); + } } }