diff --git a/server/core/internal/worker.hh b/server/core/internal/worker.hh index 452914f4f..1963d3bb4 100644 --- a/server/core/internal/worker.hh +++ b/server/core/internal/worker.hh @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -1296,20 +1297,23 @@ private: } }; - typedef DelegatingTimer PrivateTimer; - typedef std::multimap DelayedCalls; + typedef DelegatingTimer PrivateTimer; + typedef std::tr1::unordered_set DelayedCalls; + typedef std::multimap DelayedCallsByTime; + typedef std::tr1::unordered_map DelayedCallsByTag; - STATISTICS m_statistics; /*< Worker statistics. */ - MessageQueue* m_pQueue; /*< The message queue of the worker. */ - THREAD m_thread; /*< The thread handle of the worker. */ - bool m_started; /*< Whether the thread has been started or not. */ - bool m_should_shutdown; /*< Whether shutdown should be performed. */ - bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ - uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ - uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ - Load m_load; /*< The worker load. */ - PrivateTimer* m_pTimer; /*< The worker's own timer. */ - DelayedCalls m_delayed_calls; /*< Current delayed calls. */ + STATISTICS m_statistics; /*< Worker statistics. */ + MessageQueue* m_pQueue; /*< The message queue of the worker. */ + THREAD m_thread; /*< The thread handle of the worker. */ + bool m_started; /*< Whether the thread has been started or not. */ + bool m_should_shutdown; /*< Whether shutdown should be performed. */ + bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ + uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */ + uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ + Load m_load; /*< The worker load. */ + PrivateTimer* m_pTimer; /*< The worker's own timer. */ + DelayedCallsByTime m_delayed_calls; /*< Current delayed calls ordered by time. */ + DelayedCallsByTag m_tagged_calls; /*< Current delayed calls ordered by tag. */ }; } diff --git a/server/core/worker.cc b/server/core/worker.cc index 748827aba..f1f994657 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -1141,12 +1141,22 @@ void Worker::tick() m_delayed_calls.erase(i++); + ss_dassert(m_tagged_calls.find(pDelayed_call->tag()) != m_tagged_calls.end()); + + DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()]; + + auto j = delayed_calls.find(pDelayed_call); + ss_dassert(j != delayed_calls.end()); + if (pDelayed_call->call(Worker::Call::EXECUTE)) { repeating_calls.push_back(pDelayed_call); } else { + // If the call is not repeated, we remove it from the set + // associated with the tag. + delayed_calls.erase(j); delete pDelayed_call; } } @@ -1178,8 +1188,15 @@ void Worker::add_delayed_call(DelayedCall* pDelayed_call) } } + // Insert the delayed call into the map ordered by invocation time. m_delayed_calls.insert(std::make_pair(pDelayed_call->at(), pDelayed_call)); + // Insert the delayed call into the set associated with the tag of + // the delayed call. + DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()]; + ss_dassert(delayed_calls.find(pDelayed_call) == delayed_calls.end()); + delayed_calls.insert(pDelayed_call); + if (adjust) { adjust_timer(); @@ -1210,9 +1227,58 @@ void Worker::adjust_timer() int32_t Worker::cancel_delayed_calls(intptr_t tag) { - // TODO: Implement - ss_dassert(!true); - return 0; + int n = 0; + + auto i = m_tagged_calls.find(tag); + + if (i != m_tagged_calls.end()) + { + // All delayed calls associated with the provided tag. + DelayedCalls& delayed_calls = i->second; + ss_debug(int nDelayed_calls = delayed_calls.size()); + + for (auto j = delayed_calls.begin(); j != delayed_calls.end(); ++j) + { + DelayedCall* pDelayed_call = *j; + + // All delayed calls with exactly the same trigger time. + // Not particularly likely there will be many of those. + auto range = m_delayed_calls.equal_range(pDelayed_call->at()); + + auto k = range.first; + ss_dassert(k != range.second); + ss_debug(bool found = false); + + while (k != range.second) + { + if (k->second == pDelayed_call) + { + m_delayed_calls.erase(k); + delete pDelayed_call; + ++n; + + k = range.second; + + ss_debug(found = true); + } + else + { + ++k; + } + } + + ss_dassert(found); + } + + ss_dassert(nDelayed_calls == n); + } + else + { + ss_dassert(!true); + MXS_WARNING("Attempt to remove calls associated with non-existing tag."); + } + + return n; } }