MXS-1754 Identify delayed calls using id and not tag

When a delayed call is scheduled for execution, the caller is
now returned a unique id using which the delayed call can be
cancelled.
This commit is contained in:
Johan Wikman
2018-04-20 14:33:32 +03:00
parent 51d41b312b
commit cbd7e51dd8
3 changed files with 126 additions and 164 deletions

View File

@ -909,9 +909,11 @@ public:
* Push a function for delayed execution. * Push a function for delayed execution.
* *
* @param delay The delay in milliseconds. * @param delay The delay in milliseconds.
* @param tag A tag identifying this and possibly other delayed calls.
* @param pFunction The function to call. * @param pFunction The function to call.
* *
* @return A unique identifier for the delayed call. Using that identifier
* the call can be cancelled.
*
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
* function should perform the delayed call and return @true, if * function should perform the delayed call and return @true, if
* the function should be called again. If the function returns * the function should be called again. If the function returns
@ -922,28 +924,22 @@ public:
* case the return value is ignored and the function will not * case the return value is ignored and the function will not
* be called again. * be called again.
*/ */
void delayed_call(int32_t delay, uint32_t delayed_call(int32_t delay,
intptr_t tag, bool (*pFunction)(Worker::Call::action_t action))
bool (*pFunction)(Worker::Call::action_t action))
{ {
add_delayed_call(new DelayedCallFunctionVoid(delay, tag, pFunction)); return add_delayed_call(new DelayedCallFunctionVoid(delay, pFunction));
}
void delayed_call(int32_t delay,
void* tag,
bool (*pFunction)(Worker::Call::action_t action))
{
return delayed_call(delay, reinterpret_cast<intptr_t>(tag), pFunction);
} }
/** /**
* Push a function for delayed execution. * Push a function for delayed execution.
* *
* @param delay The delay in milliseconds. * @param delay The delay in milliseconds.
* @param tag A tag identifying this and possibly other delayed calls.
* @param pFunction The function to call. * @param pFunction The function to call.
* @param data The data to be provided to the function when invoked. * @param data The data to be provided to the function when invoked.
* *
* @return A unique identifier for the delayed call. Using that identifier
* the call can be cancelled.
*
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
* function should perform the delayed call and return @true, if * function should perform the delayed call and return @true, if
* the function should be called again. If the function returns * the function should be called again. If the function returns
@ -955,28 +951,21 @@ public:
* be called again. * be called again.
*/ */
template<class D> template<class D>
void delayed_call(int32_t delay, uint32_t delayed_call(int32_t delay,
intptr_t tag, bool (*pFunction)(Worker::Call::action_t action, D data), D data)
bool (*pFunction)(Worker::Call::action_t action, D data), D data)
{ {
add_delayed_call(new DelayedCallFunction<D>(delay, tag, pFunction, data)); return add_delayed_call(new DelayedCallFunction<D>(delay, pFunction, data));
}
template<class D>
void delayed_call(int32_t delay,
void* pTag,
bool (*pFunction)(Worker::Call::action_t action, D data), D data)
{
return delayed_call(delay, reinterpret_cast<intptr_t>(pTag), pFunction);
} }
/** /**
* Push a member function for delayed execution. * Push a member function for delayed execution.
* *
* @param delay The delay in milliseconds. * @param delay The delay in milliseconds.
* @param pTag A tag identifying this and possibly other delayed calls.
* @param pMethod The member function to call. * @param pMethod The member function to call.
* *
* @return A unique identifier for the delayed call. Using that identifier
* the call can be cancelled.
*
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
* function should perform the delayed call and return @true, if * function should perform the delayed call and return @true, if
* the function should be called again. If the function returns * the function should be called again. If the function returns
@ -988,31 +977,23 @@ public:
* be called again. * be called again.
*/ */
template<class T> template<class T>
void delayed_call(int32_t delay, uint32_t delayed_call(int32_t delay,
intptr_t tag, T* pT,
T* pT, bool (T::*pMethod)(Worker::Call::action_t action))
bool (T::*pMethod)(Worker::Call::action_t action))
{ {
add_delayed_call(new DelayedCallMethodVoid<T>(delay, tag, pT, pMethod)); return add_delayed_call(new DelayedCallMethodVoid<T>(delay, pT, pMethod));
}
template<class T>
void delayed_call(int32_t delay,
void* pTag,
T* pT,
bool (T::*pMethod)(Worker::Call::action_t action))
{
return delayed_call(delay, reinterpret_cast<intptr_t>(pTag), pT, pMethod);
} }
/** /**
* Push a member function for delayed execution. * Push a member function for delayed execution.
* *
* @param delay The delay in milliseconds. * @param delay The delay in milliseconds.
* @param tag A tag identifying this and possibly other delayed calls.
* @param pMethod The member function to call. * @param pMethod The member function to call.
* @param data The data to be provided to the function when invoked. * @param data The data to be provided to the function when invoked.
* *
* @return A unique identifier for the delayed call. Using that identifier
* the call can be cancelled.
*
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the * @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
* function should perform the delayed call and return @true, if * function should perform the delayed call and return @true, if
* the function should be called again. If the function returns * the function should be called again. If the function returns
@ -1024,43 +1005,25 @@ public:
* be called again. * be called again.
*/ */
template<class T, class D> template<class T, class D>
void delayed_call(int32_t delay, uint32_t delayed_call(int32_t delay,
intptr_t tag, T* pT,
T* pT, bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
{ {
add_delayed_call(new DelayedCallMethod<T, D>(delay, tag, pT, pMethod, data)); return add_delayed_call(new DelayedCallMethod<T, D>(delay, pT, pMethod, data));
}
template<class T, class D>
void delayed_call(int32_t delay,
void* pTag,
T* pT,
bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
{
return delayed_call(delay, reinterpret_cast<intptr_t>(pTag), pT, pMethod);
} }
/** /**
* Cancel delayed calls. * Cancel delayed call.
* *
* When this function is called, all current scheduled delayed calls, that * When this function is called, the delayed call in question will be called
* were scheduled using the specified tag, will be called *synchronously* with * *synchronously* with the @c action argument being @c Worker::Call::CANCEL.
* the @c action argument being @c Worker::Call::CANCEL. That is, when this * That is, when this function returns, the function has been canceled.
* function returns, all function have been canceled.
* *
* @attention If any of the called function schedules a new delayed call using * @param id The id that was returned when the delayed call was scheduled.
* the same tag, then they will *not* be canceled as a result of
* this call.
* *
* @param tag The tag that was used when a delayed call was scheduled. * @return True, if the id represented an existing delayed call.
*/ */
int32_t cancel_delayed_calls(intptr_t tag); bool cancel_delayed_call(uint32_t id);
int32_t cancel_delayed_calls(void* pTag)
{
return cancel_delayed_calls(reinterpret_cast<intptr_t>(pTag));
}
protected: protected:
Worker(); Worker();
@ -1099,6 +1062,17 @@ protected:
state_t m_state; /*< The state of the worker */ state_t m_state; /*< The state of the worker */
private: private:
class DelayedCall;
friend class DelayedCall;
static uint32_t next_delayed_call_id()
{
// Called in single-thread context. Wrapping does not matter
// as it is unlikely there would be 4 billion pending delayed
// calls.
return ++s_next_delayed_call_id;
}
class DelayedCall class DelayedCall
{ {
DelayedCall(const DelayedCall&) = delete;; DelayedCall(const DelayedCall&) = delete;;
@ -1114,9 +1088,9 @@ private:
return m_delay; return m_delay;
} }
intptr_t tag() const uint32_t id() const
{ {
return m_tag; return m_id;
} }
int64_t at() const int64_t at() const
@ -1136,9 +1110,9 @@ private:
} }
protected: protected:
DelayedCall(int32_t delay, intptr_t tag) DelayedCall(int32_t delay)
: m_delay(delay) : m_id(Worker::next_delayed_call_id())
, m_tag(tag) , m_delay(delay)
, m_at(get_at(delay)) , m_at(get_at(delay))
{ {
ss_dassert(delay > 0); ss_dassert(delay > 0);
@ -1159,9 +1133,9 @@ private:
} }
private: private:
int32_t m_delay; // The delay in milliseconds. uint32_t m_id; // The id of the delayed call.
intptr_t m_tag; // Tag identifying the delayed call. int32_t m_delay; // The delay in milliseconds.
int64_t m_at; // The next time the function should be invoked. int64_t m_at; // The next time the function should be invoked.
}; };
template<class D> template<class D>
@ -1172,9 +1146,8 @@ private:
public: public:
DelayedCallFunction(int32_t delay, DelayedCallFunction(int32_t delay,
void* pTag,
bool (*pFunction)(Worker::Call::action_t action, D data), D data) bool (*pFunction)(Worker::Call::action_t action, D data), D data)
: DelayedCall(delay, pTag) : DelayedCall(delay)
, m_pFunction(pFunction) , m_pFunction(pFunction)
, m_data(data) , m_data(data)
{ {
@ -1199,9 +1172,8 @@ private:
public: public:
DelayedCallFunctionVoid(int32_t delay, DelayedCallFunctionVoid(int32_t delay,
intptr_t tag,
bool (*pFunction)(Worker::Call::action_t action)) bool (*pFunction)(Worker::Call::action_t action))
: DelayedCall(delay, tag) : DelayedCall(delay)
, m_pFunction(pFunction) , m_pFunction(pFunction)
{ {
} }
@ -1224,10 +1196,9 @@ private:
public: public:
DelayedCallMethod(int32_t delay, DelayedCallMethod(int32_t delay,
intptr_t tag,
T* pT, T* pT,
bool (T::*pMethod)(Worker::Call::action_t action, D data), D data) bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
: DelayedCall(delay, tag) : DelayedCall(delay)
, m_pT(pT) , m_pT(pT)
, m_pMethod(pMethod) , m_pMethod(pMethod)
, m_data(data) , m_data(data)
@ -1254,10 +1225,9 @@ private:
public: public:
DelayedCallMethodVoid(int32_t delay, DelayedCallMethodVoid(int32_t delay,
intptr_t tag,
T* pT, T* pT,
bool (T::*pMethod)(Worker::Call::action_t)) bool (T::*pMethod)(Worker::Call::action_t))
: DelayedCall(delay, tag) : DelayedCall(delay)
, m_pT(pT) , m_pT(pT)
, m_pMethod(pMethod) , m_pMethod(pMethod)
{ {
@ -1274,7 +1244,7 @@ private:
bool (T::*m_pMethod)(Worker::Call::action_t); bool (T::*m_pMethod)(Worker::Call::action_t);
}; };
void add_delayed_call(DelayedCall* pDelayed_call); uint32_t add_delayed_call(DelayedCall* pDelayed_call);
void adjust_timer(); void adjust_timer();
bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO); bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO);
@ -1298,9 +1268,8 @@ private:
}; };
typedef DelegatingTimer<Worker> PrivateTimer; typedef DelegatingTimer<Worker> PrivateTimer;
typedef std::tr1::unordered_set<DelayedCall*> DelayedCalls;
typedef std::multimap<int64_t, DelayedCall*> DelayedCallsByTime; typedef std::multimap<int64_t, DelayedCall*> DelayedCallsByTime;
typedef std::tr1::unordered_map<intptr_t, DelayedCalls> DelayedCallsByTag; typedef std::tr1::unordered_map<uint32_t, DelayedCall*> DelayedCallsById;
STATISTICS m_statistics; /*< Worker statistics. */ STATISTICS m_statistics; /*< Worker statistics. */
MessageQueue* m_pQueue; /*< The message queue of the worker. */ MessageQueue* m_pQueue; /*< The message queue of the worker. */
@ -1312,8 +1281,10 @@ private:
uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */ uint64_t m_nTotal_descriptors; /*< Total number of descriptors. */
Load m_load; /*< The worker load. */ Load m_load; /*< The worker load. */
PrivateTimer* m_pTimer; /*< The worker's own timer. */ PrivateTimer* m_pTimer; /*< The worker's own timer. */
DelayedCallsByTime m_delayed_calls; /*< Current delayed calls ordered by time. */ DelayedCallsByTime m_sorted_calls; /*< Current delayed calls sorted by time. */
DelayedCallsByTag m_tagged_calls; /*< Current delayed calls ordered by tag. */ DelayedCallsById m_calls; /*< Current delayed calls indexed by id. */
static uint32_t s_next_delayed_call_id; /*< The next delayed call id. */
}; };
} }

View File

@ -129,11 +129,11 @@ int run()
TimerTest t4(&rv, 500); TimerTest t4(&rv, 500);
TimerTest t5(&rv, 600); TimerTest t5(&rv, 600);
w.delayed_call(t1.delay(), NULL, &t1, &TimerTest::tick); w.delayed_call(t1.delay(), &t1, &TimerTest::tick);
w.delayed_call(t2.delay(), NULL, &t2, &TimerTest::tick); w.delayed_call(t2.delay(), &t2, &TimerTest::tick);
w.delayed_call(t3.delay(), NULL, &t3, &TimerTest::tick); w.delayed_call(t3.delay(), &t3, &TimerTest::tick);
w.delayed_call(t4.delay(), NULL, &t4, &TimerTest::tick); w.delayed_call(t4.delay(), &t4, &TimerTest::tick);
w.delayed_call(t5.delay(), NULL, &t5, &TimerTest::tick); w.delayed_call(t5.delay(), &t5, &TimerTest::tick);
w.run(); w.run();

View File

@ -303,6 +303,9 @@ int create_epoll_instance()
} }
//static
uint32_t Worker::s_next_delayed_call_id = 1;
Worker::Worker() Worker::Worker()
: m_id(next_worker_id()) : m_id(next_worker_id())
, m_epoll_fd(create_epoll_instance()) , m_epoll_fd(create_epoll_instance())
@ -1129,57 +1132,55 @@ void Worker::tick()
{ {
int64_t now = get_current_time_ms(); int64_t now = get_current_time_ms();
ss_dassert(!m_delayed_calls.empty());
vector<DelayedCall*> repeating_calls; vector<DelayedCall*> repeating_calls;
auto i = m_delayed_calls.begin(); auto i = m_sorted_calls.begin();
while ((i != m_delayed_calls.end()) && (i->first <= now)) // i->first is the time when the first call should be invoked.
while (!m_sorted_calls.empty() && (i->first <= now))
{ {
DelayedCall* pDelayed_call = i->second; DelayedCall* pCall = i->second;
m_delayed_calls.erase(i++); auto j = m_calls.find(pCall->id());
ss_dassert(j != m_calls.end());
ss_dassert(m_tagged_calls.find(pDelayed_call->tag()) != m_tagged_calls.end()); m_sorted_calls.erase(i);
m_calls.erase(j);
DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()]; if (pCall->call(Worker::Call::EXECUTE))
auto j = delayed_calls.find(pDelayed_call);
ss_dassert(j != delayed_calls.end());
if (pDelayed_call->call(Worker::Call::EXECUTE))
{ {
repeating_calls.push_back(pDelayed_call); repeating_calls.push_back(pCall);
} }
else else
{ {
// If the call is not repeated, we remove it from the set delete pCall;
// associated with the tag.
delayed_calls.erase(j);
delete pDelayed_call;
} }
// NOTE: Must be reassigned, ++i will not work in case a delayed
// NOTE: call cancels another delayed call.
i = m_sorted_calls.begin();
} }
for (auto i = repeating_calls.begin(); i != repeating_calls.end(); ++i) for (auto i = repeating_calls.begin(); i != repeating_calls.end(); ++i)
{ {
DelayedCall* pCall = *i; DelayedCall* pCall = *i;
m_delayed_calls.insert(std::make_pair(pCall->at(), pCall)); m_sorted_calls.insert(std::make_pair(pCall->at(), pCall));
m_calls.insert(std::make_pair(pCall->id(), pCall));
} }
adjust_timer(); adjust_timer();
} }
void Worker::add_delayed_call(DelayedCall* pDelayed_call) uint32_t Worker::add_delayed_call(DelayedCall* pCall)
{ {
bool adjust = true; bool adjust = true;
if (!m_delayed_calls.empty()) if (!m_sorted_calls.empty())
{ {
DelayedCall* pFirst = m_delayed_calls.begin()->second; DelayedCall* pFirst = m_sorted_calls.begin()->second;
if (pDelayed_call->at() > pFirst->at()) if (pCall->at() > pFirst->at())
{ {
// If the added delayed call needs to be called later // If the added delayed call needs to be called later
// than the first delayed call, then we do not need to // than the first delayed call, then we do not need to
@ -1189,28 +1190,28 @@ void Worker::add_delayed_call(DelayedCall* pDelayed_call)
} }
// Insert the delayed call into the map ordered by invocation time. // Insert the delayed call into the map ordered by invocation time.
m_delayed_calls.insert(std::make_pair(pDelayed_call->at(), pDelayed_call)); m_sorted_calls.insert(std::make_pair(pCall->at(), pCall));
// Insert the delayed call into the set associated with the tag of // Insert the delayed call into the map indexed by id.
// the delayed call. ss_dassert(m_calls.find(pCall->id()) == m_calls.end());
DelayedCalls& delayed_calls = m_tagged_calls[pDelayed_call->tag()]; m_calls.insert(std::make_pair(pCall->id(), pCall));
ss_dassert(delayed_calls.find(pDelayed_call) == delayed_calls.end());
delayed_calls.insert(pDelayed_call);
if (adjust) if (adjust)
{ {
adjust_timer(); adjust_timer();
} }
return pCall->id();
} }
void Worker::adjust_timer() void Worker::adjust_timer()
{ {
if (!m_delayed_calls.empty()) if (!m_sorted_calls.empty())
{ {
DelayedCall* pNext_call = m_delayed_calls.begin()->second; DelayedCall* pCall = m_sorted_calls.begin()->second;
uint64_t now = get_current_time_ms(); uint64_t now = get_current_time_ms();
int64_t delay = pNext_call->at() - now; int64_t delay = pCall->at() - now;
if (delay <= 0) if (delay <= 0)
{ {
@ -1225,60 +1226,50 @@ void Worker::adjust_timer()
} }
} }
int32_t Worker::cancel_delayed_calls(intptr_t tag) bool Worker::cancel_delayed_call(uint32_t id)
{ {
int n = 0; bool found = false;
auto i = m_tagged_calls.find(tag); auto i = m_calls.find(id);
if (i != m_tagged_calls.end()) if (i != m_calls.end())
{ {
// All delayed calls associated with the provided tag. DelayedCall* pCall = i->second;
DelayedCalls& delayed_calls = i->second; m_calls.erase(i);
ss_debug(int nDelayed_calls = delayed_calls.size());
for (auto j = delayed_calls.begin(); j != delayed_calls.end(); ++j) // All delayed calls with exactly the same trigger time.
// Not particularly likely there will be many of those.
auto range = m_sorted_calls.equal_range(pCall->at());
auto k = range.first;
ss_dassert(k != range.second);
while (k != range.second)
{ {
DelayedCall* pDelayed_call = *j; if (k->second == pCall)
// All delayed calls with exactly the same trigger time.
// Not particularly likely there will be many of those.
auto range = m_delayed_calls.equal_range(pDelayed_call->at());
auto k = range.first;
ss_dassert(k != range.second);
ss_debug(bool found = false);
while (k != range.second)
{ {
if (k->second == pDelayed_call) m_sorted_calls.erase(k);
{ delete pCall;
m_delayed_calls.erase(k);
delete pDelayed_call;
++n;
k = range.second; k = range.second;
ss_debug(found = true); found = true;
} }
else else
{ {
++k; ++k;
}
} }
ss_dassert(found);
} }
ss_dassert(nDelayed_calls == n); ss_dassert(found);
} }
else else
{ {
ss_dassert(!true); ss_dassert(!true);
MXS_WARNING("Attempt to remove calls associated with non-existing tag."); MXS_WARNING("Attempt to remove a delayed call, associated with non-existing id.");
} }
return n; return found;
} }
} }