MXS-1754 Implement delayed call cancellation
There's now double bookkeeping: - All delayed calls are in a map whose key is the next invocation time. Since it's a map (and not an unordered_map) it's sorted just the way we want to have it. - In addition, there's an unordered set for each tag. With this arrangement we can easily invoke the delayed calls in the right order and be able to efficiently remove all delayed calls related to a particular tag.
This commit is contained in:
parent
cb3a98dee8
commit
51d41b312b
@ -14,6 +14,7 @@
|
||||
|
||||
#include <maxscale/cppdefs.hh>
|
||||
#include <map>
|
||||
#include <tr1/unordered_set>
|
||||
#include <memory>
|
||||
#include <maxscale/platform.h>
|
||||
#include <maxscale/session.h>
|
||||
@ -1296,20 +1297,23 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
typedef DelegatingTimer<Worker> PrivateTimer;
|
||||
typedef std::multimap<int64_t, DelayedCall*> DelayedCalls;
|
||||
typedef DelegatingTimer<Worker> PrivateTimer;
|
||||
typedef std::tr1::unordered_set<DelayedCall*> DelayedCalls;
|
||||
typedef std::multimap<int64_t, DelayedCall*> DelayedCallsByTime;
|
||||
typedef std::tr1::unordered_map<intptr_t, DelayedCalls> DelayedCallsByTag;
|
||||
|
||||
STATISTICS m_statistics; /*< Worker statistics. */
|
||||
MessageQueue* m_pQueue; /*< The message queue of the worker. */
|
||||
THREAD m_thread; /*< The thread handle of the worker. */
|
||||
bool m_started; /*< Whether the thread has been started or not. */
|
||||
bool m_should_shutdown; /*< Whether shutdown should be performed. */
|
||||
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
|
||||
uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */
|
||||
uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */
|
||||
Load m_load; /*< The worker load. */
|
||||
PrivateTimer* m_pTimer; /*< The worker's own timer. */
|
||||
DelayedCalls m_delayed_calls; /*< Current delayed calls. */
|
||||
STATISTICS m_statistics; /*< Worker statistics. */
|
||||
MessageQueue* m_pQueue; /*< The message queue of the worker. */
|
||||
THREAD m_thread; /*< The thread handle of the worker. */
|
||||
bool m_started; /*< Whether the thread has been started or not. */
|
||||
bool m_should_shutdown; /*< Whether shutdown should be performed. */
|
||||
bool m_shutdown_initiated; /*< Whether shutdown has been initated. */
|
||||
uint32_t m_nCurrent_descriptors; /*< Current number of descriptors. */
|
||||
uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */
|
||||
Load m_load; /*< The worker load. */
|
||||
PrivateTimer* m_pTimer; /*< The worker's own timer. */
|
||||
DelayedCallsByTime m_delayed_calls; /*< Current delayed calls ordered by time. */
|
||||
DelayedCallsByTag m_tagged_calls; /*< Current delayed calls ordered by tag. */
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1141,12 +1141,22 @@ void Worker::tick()
|
||||
|
||||
m_delayed_calls.erase(i++);
|
||||
|
||||
ss_dassert(m_tagged_calls.find(pDelayed_call->tag()) != m_tagged_calls.end());
|
||||
|
||||
DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()];
|
||||
|
||||
auto j = delayed_calls.find(pDelayed_call);
|
||||
ss_dassert(j != delayed_calls.end());
|
||||
|
||||
if (pDelayed_call->call(Worker::Call::EXECUTE))
|
||||
{
|
||||
repeating_calls.push_back(pDelayed_call);
|
||||
}
|
||||
else
|
||||
{
|
||||
// If the call is not repeated, we remove it from the set
|
||||
// associated with the tag.
|
||||
delayed_calls.erase(j);
|
||||
delete pDelayed_call;
|
||||
}
|
||||
}
|
||||
@ -1178,8 +1188,15 @@ void Worker::add_delayed_call(DelayedCall* pDelayed_call)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the delayed call into the map ordered by invocation time.
|
||||
m_delayed_calls.insert(std::make_pair(pDelayed_call->at(), pDelayed_call));
|
||||
|
||||
// Insert the delayed call into the set associated with the tag of
|
||||
// the delayed call.
|
||||
DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()];
|
||||
ss_dassert(delayed_calls.find(pDelayed_call) == delayed_calls.end());
|
||||
delayed_calls.insert(pDelayed_call);
|
||||
|
||||
if (adjust)
|
||||
{
|
||||
adjust_timer();
|
||||
@ -1210,9 +1227,58 @@ void Worker::adjust_timer()
|
||||
|
||||
int32_t Worker::cancel_delayed_calls(intptr_t tag)
|
||||
{
|
||||
// TODO: Implement
|
||||
ss_dassert(!true);
|
||||
return 0;
|
||||
int n = 0;
|
||||
|
||||
auto i = m_tagged_calls.find(tag);
|
||||
|
||||
if (i != m_tagged_calls.end())
|
||||
{
|
||||
// All delayed calls associated with the provided tag.
|
||||
DelayedCalls& delayed_calls = i->second;
|
||||
ss_debug(int nDelayed_calls = delayed_calls.size());
|
||||
|
||||
for (auto j = delayed_calls.begin(); j != delayed_calls.end(); ++j)
|
||||
{
|
||||
DelayedCall* pDelayed_call = *j;
|
||||
|
||||
// All delayed calls with exactly the same trigger time.
|
||||
// Not particularly likely there will be many of those.
|
||||
auto range = m_delayed_calls.equal_range(pDelayed_call->at());
|
||||
|
||||
auto k = range.first;
|
||||
ss_dassert(k != range.second);
|
||||
ss_debug(bool found = false);
|
||||
|
||||
while (k != range.second)
|
||||
{
|
||||
if (k->second == pDelayed_call)
|
||||
{
|
||||
m_delayed_calls.erase(k);
|
||||
delete pDelayed_call;
|
||||
++n;
|
||||
|
||||
k = range.second;
|
||||
|
||||
ss_debug(found = true);
|
||||
}
|
||||
else
|
||||
{
|
||||
++k;
|
||||
}
|
||||
}
|
||||
|
||||
ss_dassert(found);
|
||||
}
|
||||
|
||||
ss_dassert(nDelayed_calls == n);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(!true);
|
||||
MXS_WARNING("Attempt to remove calls associated with non-existing tag.");
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user