diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh index 1963d3bb4..61c60f128 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/worker.hh @@ -909,9 +909,11 @@ public: * Push a function for delayed execution. * * @param delay The delay in milliseconds. - * @param tag A tag identifying this and possibly other delayed calls. * @param pFunction The function to call. * + * @return A unique identifier for the delayed call. Using that identifier + * the call can be cancelled. + * * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * function should perform the delayed call and return @true, if * the function should be called again. If the function returns @@ -922,28 +924,22 @@ public: * case the return value is ignored and the function will not * be called again. */ - void delayed_call(int32_t delay, - intptr_t tag, - bool (*pFunction)(Worker::Call::action_t action)) + uint32_t delayed_call(int32_t delay, + bool (*pFunction)(Worker::Call::action_t action)) { - add_delayed_call(new DelayedCallFunctionVoid(delay, tag, pFunction)); - } - - void delayed_call(int32_t delay, - void* tag, - bool (*pFunction)(Worker::Call::action_t action)) - { - return delayed_call(delay, reinterpret_cast(tag), pFunction); + return add_delayed_call(new DelayedCallFunctionVoid(delay, pFunction)); } /** * Push a function for delayed execution. * * @param delay The delay in milliseconds. - * @param tag A tag identifying this and possibly other delayed calls. * @param pFunction The function to call. * @param data The data to be provided to the function when invoked. * + * @return A unique identifier for the delayed call. Using that identifier + * the call can be cancelled. + * * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * function should perform the delayed call and return @true, if * the function should be called again. If the function returns @@ -955,28 +951,21 @@ public: * be called again. */ template - void delayed_call(int32_t delay, - intptr_t tag, - bool (*pFunction)(Worker::Call::action_t action, D data), D data) + uint32_t delayed_call(int32_t delay, + bool (*pFunction)(Worker::Call::action_t action, D data), D data) { - add_delayed_call(new DelayedCallFunction(delay, tag, pFunction, data)); - } - - template - void delayed_call(int32_t delay, - void* pTag, - bool (*pFunction)(Worker::Call::action_t action, D data), D data) - { - return delayed_call(delay, reinterpret_cast(pTag), pFunction); + return add_delayed_call(new DelayedCallFunction(delay, pFunction, data)); } /** * Push a member function for delayed execution. * * @param delay The delay in milliseconds. - * @param pTag A tag identifying this and possibly other delayed calls. * @param pMethod The member function to call. * + * @return A unique identifier for the delayed call. Using that identifier + * the call can be cancelled. + * * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * function should perform the delayed call and return @true, if * the function should be called again. If the function returns @@ -988,31 +977,23 @@ public: * be called again. */ template - void delayed_call(int32_t delay, - intptr_t tag, - T* pT, - bool (T::*pMethod)(Worker::Call::action_t action)) + uint32_t delayed_call(int32_t delay, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action)) { - add_delayed_call(new DelayedCallMethodVoid(delay, tag, pT, pMethod)); - } - - template - void delayed_call(int32_t delay, - void* pTag, - T* pT, - bool (T::*pMethod)(Worker::Call::action_t action)) - { - return delayed_call(delay, reinterpret_cast(pTag), pT, pMethod); + return add_delayed_call(new DelayedCallMethodVoid(delay, pT, pMethod)); } /** * Push a member function for delayed execution. * * @param delay The delay in milliseconds. - * @param tag A tag identifying this and possibly other delayed calls. * @param pMethod The member function to call. * @param data The data to be provided to the function when invoked. * + * @return A unique identifier for the delayed call. Using that identifier + * the call can be cancelled. + * * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * function should perform the delayed call and return @true, if * the function should be called again. If the function returns @@ -1024,43 +1005,25 @@ public: * be called again. */ template - void delayed_call(int32_t delay, - intptr_t tag, - T* pT, - bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) + uint32_t delayed_call(int32_t delay, + T* pT, + bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) { - add_delayed_call(new DelayedCallMethod(delay, tag, pT, pMethod, data)); - } - - template - void delayed_call(int32_t delay, - void* pTag, - T* pT, - bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) - { - return delayed_call(delay, reinterpret_cast(pTag), pT, pMethod); + return add_delayed_call(new DelayedCallMethod(delay, pT, pMethod, data)); } /** - * Cancel delayed calls. + * Cancel delayed call. * - * When this function is called, all current scheduled delayed calls, that - * were scheduled using the specified tag, will be called *synchronously* with - * the @c action argument being @c Worker::Call::CANCEL. That is, when this - * function returns, all function have been canceled. + * When this function is called, the delayed call in question will be called + * *synchronously* with the @c action argument being @c Worker::Call::CANCEL. + * That is, when this function returns, the function has been canceled. * - * @attention If any of the called function schedules a new delayed call using - * the same tag, then they will *not* be canceled as a result of - * this call. + * @param id The id that was returned when the delayed call was scheduled. * - * @param tag The tag that was used when a delayed call was scheduled. + * @return True, if the id represented an existing delayed call. */ - int32_t cancel_delayed_calls(intptr_t tag); - - int32_t cancel_delayed_calls(void* pTag) - { - return cancel_delayed_calls(reinterpret_cast(pTag)); - } + bool cancel_delayed_call(uint32_t id); protected: Worker(); @@ -1099,6 +1062,17 @@ protected: state_t m_state; /*< The state of the worker */ private: + class DelayedCall; + friend class DelayedCall; + + static uint32_t next_delayed_call_id() + { + // Called in single-thread context. Wrapping does not matter + // as it is unlikely there would be 4 billion pending delayed + // calls. + return ++s_next_delayed_call_id; + } + class DelayedCall { DelayedCall(const DelayedCall&) = delete;; @@ -1114,9 +1088,9 @@ private: return m_delay; } - intptr_t tag() const + uint32_t id() const { - return m_tag; + return m_id; } int64_t at() const @@ -1136,9 +1110,9 @@ private: } protected: - DelayedCall(int32_t delay, intptr_t tag) - : m_delay(delay) - , m_tag(tag) + DelayedCall(int32_t delay) + : m_id(Worker::next_delayed_call_id()) + , m_delay(delay) , m_at(get_at(delay)) { ss_dassert(delay > 0); @@ -1159,9 +1133,9 @@ private: } private: - int32_t m_delay; // The delay in milliseconds. - intptr_t m_tag; // Tag identifying the delayed call. - int64_t m_at; // The next time the function should be invoked. + uint32_t m_id; // The id of the delayed call. + int32_t m_delay; // The delay in milliseconds. + int64_t m_at; // The next time the function should be invoked. }; template @@ -1172,9 +1146,8 @@ private: public: DelayedCallFunction(int32_t delay, - void* pTag, bool (*pFunction)(Worker::Call::action_t action, D data), D data) - : DelayedCall(delay, pTag) + : DelayedCall(delay) , m_pFunction(pFunction) , m_data(data) { @@ -1199,9 +1172,8 @@ private: public: DelayedCallFunctionVoid(int32_t delay, - intptr_t tag, bool (*pFunction)(Worker::Call::action_t action)) - : DelayedCall(delay, tag) + : DelayedCall(delay) , m_pFunction(pFunction) { } @@ -1224,10 +1196,9 @@ private: public: DelayedCallMethod(int32_t delay, - intptr_t tag, T* pT, bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) - : DelayedCall(delay, tag) + : DelayedCall(delay) , m_pT(pT) , m_pMethod(pMethod) , m_data(data) @@ -1254,10 +1225,9 @@ private: public: DelayedCallMethodVoid(int32_t delay, - intptr_t tag, T* pT, bool (T::*pMethod)(Worker::Call::action_t)) - : DelayedCall(delay, tag) + : DelayedCall(delay) , m_pT(pT) , m_pMethod(pMethod) { @@ -1274,7 +1244,7 @@ private: bool (T::*m_pMethod)(Worker::Call::action_t); }; - void add_delayed_call(DelayedCall* pDelayed_call); + uint32_t add_delayed_call(DelayedCall* pDelayed_call); void adjust_timer(); bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); @@ -1298,9 +1268,8 @@ private: }; typedef DelegatingTimer PrivateTimer; - typedef std::tr1::unordered_set DelayedCalls; typedef std::multimap DelayedCallsByTime; - typedef std::tr1::unordered_map DelayedCallsByTag; + typedef std::tr1::unordered_map DelayedCallsById; STATISTICS m_statistics; /*< Worker statistics. */ MessageQueue* m_pQueue; /*< The message queue of the worker. */ @@ -1312,8 +1281,10 @@ private: uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ Load m_load; /*< The worker load. */ PrivateTimer* m_pTimer; /*< The worker's own timer. */ - DelayedCallsByTime m_delayed_calls; /*< Current delayed calls ordered by time. */ - DelayedCallsByTag m_tagged_calls; /*< Current delayed calls ordered by tag. */ + DelayedCallsByTime m_sorted_calls; /*< Current delayed calls sorted by time. */ + DelayedCallsById m_calls; /*< Current delayed calls indexed by id. */ + + static uint32_t s_next_delayed_call_id; /*< The next delayed call id. */ }; } diff --git a/server/core/test/test_worker.cc b/server/core/test/test_worker.cc index b124fe8c0..202e9dc7d 100644 --- a/server/core/test/test_worker.cc +++ b/server/core/test/test_worker.cc @@ -129,11 +129,11 @@ int run() TimerTest t4(&rv, 500); TimerTest t5(&rv, 600); - w.delayed_call(t1.delay(), NULL, &t1, &TimerTest::tick); - w.delayed_call(t2.delay(), NULL, &t2, &TimerTest::tick); - w.delayed_call(t3.delay(), NULL, &t3, &TimerTest::tick); - w.delayed_call(t4.delay(), NULL, &t4, &TimerTest::tick); - w.delayed_call(t5.delay(), NULL, &t5, &TimerTest::tick); + w.delayed_call(t1.delay(), &t1, &TimerTest::tick); + w.delayed_call(t2.delay(), &t2, &TimerTest::tick); + w.delayed_call(t3.delay(), &t3, &TimerTest::tick); + w.delayed_call(t4.delay(), &t4, &TimerTest::tick); + w.delayed_call(t5.delay(), &t5, &TimerTest::tick); w.run(); diff --git a/server/core/worker.cc b/server/core/worker.cc index f1f994657..eec722868 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -303,6 +303,9 @@ int create_epoll_instance() } +//static +uint32_t Worker::s_next_delayed_call_id = 1; + Worker::Worker() : m_id(next_worker_id()) , m_epoll_fd(create_epoll_instance()) @@ -1129,57 +1132,55 @@ void Worker::tick() { int64_t now = get_current_time_ms(); - ss_dassert(!m_delayed_calls.empty()); - vector repeating_calls; - auto i = m_delayed_calls.begin(); + auto i = m_sorted_calls.begin(); - while ((i != m_delayed_calls.end()) && (i->first <= now)) + // i->first is the time when the first call should be invoked. + while (!m_sorted_calls.empty() && (i->first <= now)) { - DelayedCall* pDelayed_call = i->second; + DelayedCall* pCall = i->second; - m_delayed_calls.erase(i++); + auto j = m_calls.find(pCall->id()); + ss_dassert(j != m_calls.end()); - ss_dassert(m_tagged_calls.find(pDelayed_call->tag()) != m_tagged_calls.end()); + m_sorted_calls.erase(i); + m_calls.erase(j); - DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()]; - - auto j = delayed_calls.find(pDelayed_call); - ss_dassert(j != delayed_calls.end()); - - if (pDelayed_call->call(Worker::Call::EXECUTE)) + if (pCall->call(Worker::Call::EXECUTE)) { - repeating_calls.push_back(pDelayed_call); + repeating_calls.push_back(pCall); } else { - // If the call is not repeated, we remove it from the set - // associated with the tag. - delayed_calls.erase(j); - delete pDelayed_call; + delete pCall; } + + // NOTE: Must be reassigned, ++i will not work in case a delayed + // NOTE: call cancels another delayed call. + i = m_sorted_calls.begin(); } for (auto i = repeating_calls.begin(); i != repeating_calls.end(); ++i) { DelayedCall* pCall = *i; - m_delayed_calls.insert(std::make_pair(pCall->at(), pCall)); + m_sorted_calls.insert(std::make_pair(pCall->at(), pCall)); + m_calls.insert(std::make_pair(pCall->id(), pCall)); } adjust_timer(); } -void Worker::add_delayed_call(DelayedCall* pDelayed_call) +uint32_t Worker::add_delayed_call(DelayedCall* pCall) { bool adjust = true; - if (!m_delayed_calls.empty()) + if (!m_sorted_calls.empty()) { - DelayedCall* pFirst = m_delayed_calls.begin()->second; + DelayedCall* pFirst = m_sorted_calls.begin()->second; - if (pDelayed_call->at() > pFirst->at()) + if (pCall->at() > pFirst->at()) { // If the added delayed call needs to be called later // than the first delayed call, then we do not need to @@ -1189,28 +1190,28 @@ void Worker::add_delayed_call(DelayedCall* pDelayed_call) } // Insert the delayed call into the map ordered by invocation time. - m_delayed_calls.insert(std::make_pair(pDelayed_call->at(), pDelayed_call)); + m_sorted_calls.insert(std::make_pair(pCall->at(), pCall)); - // Insert the delayed call into the set associated with the tag of - // the delayed call. - DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()]; - ss_dassert(delayed_calls.find(pDelayed_call) == delayed_calls.end()); - delayed_calls.insert(pDelayed_call); + // Insert the delayed call into the map indexed by id. + ss_dassert(m_calls.find(pCall->id()) == m_calls.end()); + m_calls.insert(std::make_pair(pCall->id(), pCall)); if (adjust) { adjust_timer(); } + + return pCall->id(); } void Worker::adjust_timer() { - if (!m_delayed_calls.empty()) + if (!m_sorted_calls.empty()) { - DelayedCall* pNext_call = m_delayed_calls.begin()->second; + DelayedCall* pCall = m_sorted_calls.begin()->second; uint64_t now = get_current_time_ms(); - int64_t delay = pNext_call->at() - now; + int64_t delay = pCall->at() - now; if (delay <= 0) { @@ -1225,60 +1226,50 @@ void Worker::adjust_timer() } } -int32_t Worker::cancel_delayed_calls(intptr_t tag) +bool Worker::cancel_delayed_call(uint32_t id) { - int n = 0; + bool found = false; - auto i = m_tagged_calls.find(tag); + auto i = m_calls.find(id); - if (i != m_tagged_calls.end()) + if (i != m_calls.end()) { - // All delayed calls associated with the provided tag. - DelayedCalls& delayed_calls = i->second; - ss_debug(int nDelayed_calls = delayed_calls.size()); + DelayedCall* pCall = i->second; + m_calls.erase(i); - for (auto j = delayed_calls.begin(); j != delayed_calls.end(); ++j) + // All delayed calls with exactly the same trigger time. + // Not particularly likely there will be many of those. + auto range = m_sorted_calls.equal_range(pCall->at()); + + auto k = range.first; + ss_dassert(k != range.second); + + while (k != range.second) { - DelayedCall* pDelayed_call = *j; - - // All delayed calls with exactly the same trigger time. - // Not particularly likely there will be many of those. - auto range = m_delayed_calls.equal_range(pDelayed_call->at()); - - auto k = range.first; - ss_dassert(k != range.second); - ss_debug(bool found = false); - - while (k != range.second) + if (k->second == pCall) { - if (k->second == pDelayed_call) - { - m_delayed_calls.erase(k); - delete pDelayed_call; - ++n; + m_sorted_calls.erase(k); + delete pCall; - k = range.second; + k = range.second; - ss_debug(found = true); - } - else - { - ++k; - } + found = true; + } + else + { + ++k; } - - ss_dassert(found); } - ss_dassert(nDelayed_calls == n); + ss_dassert(found); } else { ss_dassert(!true); - MXS_WARNING("Attempt to remove calls associated with non-existing tag."); + MXS_WARNING("Attempt to remove a delayed call, associated with non-existing id."); } - return n; + return found; } }