diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh index 8f596e007..74cf628a1 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/worker.hh @@ -544,6 +544,15 @@ public: EXECUTE_QUEUED /**< Only queue tasks for execution */ }; + struct Call + { + enum action_t + { + EXECUTE, /**< Execute the call */ + CANCEL /**< Cancel the call */ + }; + }; + /** * Initialize the worker mechanism. * @@ -900,61 +909,157 @@ public: * Push a function for delayed execution. * * @param delay The delay in milliseconds. + * @param tag A tag identifying this and possibly other delayed calls. * @param pFunction The function to call. * - * @attention When invoked, if the provided function returns true, then it will - * be called again after @c delay milliseconds. + * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the + * function should perform the delayed call and return @true, if + * the function should be called again. If the function returns + * @c false, it will not be called again. + * + * If @c action is @c Worker::Call::CANCEL, then the function + * should perform whatever canceling actions are needed. In that + * case the return value is ignored and the function will not + * be called again. */ - void delayed_call(uint32_t delay, bool (*pFunction)()) + void delayed_call(uint32_t delay, + intptr_t tag, + bool (*pFunction)(Worker::Call::action_t action)) { - add_delayed_call(new DelayedCallFunctionVoid(delay, pFunction)); + add_delayed_call(new DelayedCallFunctionVoid(delay, tag, pFunction)); + } + + void delayed_call(uint32_t delay, + void* tag, + bool (*pFunction)(Worker::Call::action_t action)) + { + return delayed_call(delay, reinterpret_cast(tag), pFunction); } /** * Push a function for delayed execution. * * @param delay The delay in milliseconds. + * @param tag A tag identifying this and possibly other delayed calls. * @param pFunction The function to call. * @param data The data to be provided to the function when invoked. * - * @attention When invoked, if the provided function returns true, then it will - * be called again after @c delay milliseconds. + * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the + * function should perform the delayed call and return @true, if + * the function should be called again. If the function returns + * @c false, it will not be called again. + * + * If @c action is @c Worker::Call::CANCEL, then the function + * should perform whatever canceling actions are needed. In that + * case the return value is ignored and the function will not + * be called again. */ template - void delayed_call(uint32_t delay, bool (*pFunction)(D data), D data) + void delayed_call(uint32_t delay, + intptr_t tag, + bool (*pFunction)(Worker::Call::action_t action, D data), D data) { - add_delayed_call(new DelayedCallFunction(delay, pFunction, data)); + add_delayed_call(new DelayedCallFunction(delay, tag, pFunction, data)); + } + + template + void delayed_call(uint32_t delay, + void* pTag, + bool (*pFunction)(Worker::Call::action_t action, D data), D data) + { + return delayed_call(delay, reinterpret_cast(pTag), pFunction); } /** * Push a member function for delayed execution. * * @param delay The delay in milliseconds. + * @param pTag A tag identifying this and possibly other delayed calls. * @param pMethod The member function to call. * - * @attention When invoked, if the provided function returns true, then it will - * be called again after @c delay milliseconds. + * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the + * function should perform the delayed call and return @true, if + * the function should be called again. If the function returns + * @c false, it will not be called again. + * + * If @c action is @c Worker::Call::CANCEL, then the function + * should perform whatever canceling actions are needed. In that + * case the return value is ignored and the function will not + * be called again. */ template - void delayed_call(uint32_t delay, T* pT, bool (T::*pMethod)()) + void delayed_call(uint32_t delay, + intptr_t tag, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action)) { - add_delayed_call(new DelayedCallMethodVoid(delay, pT, pMethod)); + add_delayed_call(new DelayedCallMethodVoid(delay, tag, pT, pMethod)); + } + + template + void delayed_call(uint32_t delay, + void* pTag, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action)) + { + return delayed_call(delay, reinterpret_cast(pTag), pT, pMethod); } /** * Push a member function for delayed execution. * * @param delay The delay in milliseconds. + * @param tag A tag identifying this and possibly other delayed calls. * @param pMethod The member function to call. * @param data The data to be provided to the function when invoked. * - * @attention When invoked, if the provided function returns true, then it will - * be called again after @c delay milliseconds. + * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the + * function should perform the delayed call and return @true, if + * the function should be called again. If the function returns + * @c false, it will not be called again. + * + * If @c action is @c Worker::Call::CANCEL, then the function + * should perform whatever canceling actions are needed. In that + * case the return value is ignored and the function will not + * be called again. */ template - void delayed_call(uint32_t delay, T* pT, bool (T::*pMethod)(D data), D data) + void delayed_call(uint32_t delay, + intptr_t tag, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) { - add_delayed_call(new DelayedCallMethod(delay, pT, pMethod, data)); + add_delayed_call(new DelayedCallMethod(delay, tag, pT, pMethod, data)); + } + + template + void delayed_call(uint32_t delay, + void* pTag, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) + { + return delayed_call(delay, reinterpret_cast(pTag), pT, pMethod); + } + + /** + * Cancel delayed calls. + * + * When this function is called, all current scheduled delayed calls, that + * were scheduled using the specified tag, will be called *synchronously* with + * the @c action argument being @c Worker::Call::CANCEL. That is, when this + * function returns, all function have been canceled. + * + * @attention If any of the called function schedules a new delayed call using + * the same tag, then they will *not* be canceled as a result of + * this call. + * + * @param tag The tag that was used when a delayed call was scheduled. + */ + int32_t cancel_delayed_calls(intptr_t tag); + + int32_t cancel_delayed_calls(void* pTag) + { + return cancel_delayed_calls(reinterpret_cast(pTag)); } protected: @@ -1009,14 +1114,19 @@ private: return m_delay; } + intptr_t tag() const + { + return m_tag; + } + uint64_t at() const { return m_at; } - bool call() + bool call(Worker::Call::action_t action) { - bool rv = do_call(); + bool rv = do_call(action); // We try to invoke the function as often as it was specified. If the // delay is very short and the execution time for the function very long, // then we will not succeed with that and the function will simply be @@ -1026,13 +1136,14 @@ private: } protected: - DelayedCall(uint32_t delay) + DelayedCall(uint32_t delay, intptr_t tag) : m_delay(delay) + , m_tag(tag) , m_at(get_at(delay)) { } - virtual bool do_call() = 0; + virtual bool do_call(Worker::Call::action_t action) = 0; private: static uint64_t get_at(uint32_t delay) @@ -1046,6 +1157,7 @@ private: private: uint32_t m_delay; // The delay in milliseconds. + intptr_t m_tag; // Tag identifying the delayed call. uint64_t m_at; // The next time the function should be invoked. }; @@ -1056,21 +1168,23 @@ private: DelayedCallFunction& operator = (const DelayedCallFunction&) = delete; public: - DelayedCallFunction(uint32_t delay, bool (*pFunction)(D data), D data) - : DelayedCall(delay) + DelayedCallFunction(uint32_t delay, + void* pTag, + bool (*pFunction)(Worker::Call::action_t action, D data), D data) + : DelayedCall(delay, pTag) , m_pFunction(pFunction) , m_data(data) { } private: - bool do_call() + bool do_call(Worker::Call::action_t action) { - return m_pFunction(m_data); + return m_pFunction(action, m_data); } private: - bool (*m_pFunction)(D); + bool (*m_pFunction)(Worker::Call::action_t, D); D m_data; }; @@ -1081,20 +1195,22 @@ private: DelayedCallFunctionVoid& operator = (const DelayedCallFunctionVoid&) = delete; public: - DelayedCallFunctionVoid(uint32_t delay, bool (*pFunction)()) - : DelayedCall(delay) + DelayedCallFunctionVoid(uint32_t delay, + intptr_t tag, + bool (*pFunction)(Worker::Call::action_t action)) + : DelayedCall(delay, tag) , m_pFunction(pFunction) { } private: - bool do_call() + bool do_call(Worker::Call::action_t action) { - return m_pFunction(); + return m_pFunction(action); } private: - bool (*m_pFunction)(); + bool (*m_pFunction)(Worker::Call::action_t action); }; template @@ -1104,8 +1220,11 @@ private: DelayedCallMethod& operator = (const DelayedCallMethod&) = delete; public: - DelayedCallMethod(uint32_t delay, T* pT, bool (T::*pMethod)(D data), D data) - : DelayedCall(delay) + DelayedCallMethod(uint32_t delay, + intptr_t tag, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) + : DelayedCall(delay, tag) , m_pT(pT) , m_pMethod(pMethod) , m_data(data) @@ -1113,14 +1232,14 @@ private: } private: - bool do_call() + bool do_call(Worker::Call::action_t action) { - return (m_pT->*m_pMethod)(m_data); + return (m_pT->*m_pMethod)(action, m_data); } private: T* m_pT; - bool (T::*m_pMethod)(D); + bool (T::*m_pMethod)(Worker::Call::action_t, D); D m_data; }; @@ -1131,22 +1250,25 @@ private: DelayedCallMethodVoid& operator = (const DelayedCallMethodVoid&) = delete; public: - DelayedCallMethodVoid(uint32_t delay, T* pT, bool (T::*pMethod)()) - : DelayedCall(delay) + DelayedCallMethodVoid(uint32_t delay, + intptr_t tag, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t)) + : DelayedCall(delay, tag) , m_pT(pT) , m_pMethod(pMethod) { } private: - bool do_call() + bool do_call(Worker::Call::action_t action) { - return (m_pT->*m_pMethod)(); + return (m_pT->*m_pMethod)(action); } private: T* m_pT; - bool (T::*m_pMethod)(); + bool (T::*m_pMethod)(Worker::Call::action_t); }; void add_delayed_call(DelayedCall* pDelayed_call); diff --git a/server/core/test/test_worker.cc b/server/core/test/test_worker.cc index f00bcf720..b124fe8c0 100644 --- a/server/core/test/test_worker.cc +++ b/server/core/test/test_worker.cc @@ -78,8 +78,10 @@ public: return m_delay; } - bool tick() + bool tick(mxs::Worker::Call::action_t action) { + ss_dassert(action == mxs::Worker::Call::EXECUTE); + int64_t now = get_monotonic_time_ms(); int64_t diff = abs(now - m_at); @@ -127,11 +129,11 @@ int run() TimerTest t4(&rv, 500); TimerTest t5(&rv, 600); - w.delayed_call(t1.delay(), &t1, &TimerTest::tick); - w.delayed_call(t2.delay(), &t2, &TimerTest::tick); - w.delayed_call(t3.delay(), &t3, &TimerTest::tick); - w.delayed_call(t4.delay(), &t4, &TimerTest::tick); - w.delayed_call(t5.delay(), &t5, &TimerTest::tick); + w.delayed_call(t1.delay(), NULL, &t1, &TimerTest::tick); + w.delayed_call(t2.delay(), NULL, &t2, &TimerTest::tick); + w.delayed_call(t3.delay(), NULL, &t3, &TimerTest::tick); + w.delayed_call(t4.delay(), NULL, &t4, &TimerTest::tick); + w.delayed_call(t5.delay(), NULL, &t5, &TimerTest::tick); w.run(); diff --git a/server/core/worker.cc b/server/core/worker.cc index 1c2b75e96..e53fb9bbe 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -1138,7 +1138,7 @@ void Worker::tick() pDelayed_call = m_delayed_calls.top(); m_delayed_calls.pop(); - if (pDelayed_call->call()) + if (pDelayed_call->call(Worker::Call::EXECUTE)) { repeating_calls.push_back(pDelayed_call); } @@ -1203,6 +1203,13 @@ void Worker::adjust_timer() } } +int32_t Worker::cancel_delayed_calls(intptr_t tag) +{ + // TODO: Implement + ss_dassert(!true); + return 0; +} + }