MXS-1754 Add possibility to cancel delayed calls
The interface for canceling calls is now geared towards the needs of sessions. Basically the idea is as follows: class MyFilterSession : public maxscale::FilterSession { ... int MyFilterSession::routeQuery(GWBUF* pPacket) { ... if (needs_to_be_delayed()) { Worker* pWorker = Worker::current(); void* pTag = this; pWorker->delayed_call(5000, pTag, this, &MyFilterSession::delayed_routeQuery, pPacket); return 1; } ... } bool MyFilterSession::delayed_routeQuery(Worker::Call:action_t action, GWBUF* pPacket) { if (action == Worker::Call::EXECUTE) { routeQuery(pPacket); } else { ss_dassert(action == Worker::Call::CANCEL); gwbuf_free(pPacket); } return false; } ~MyFilterSession() { void* pTag = this; Worker::current()->cancel_delayed_calls(pTag); } } The alternative, returning some key that the caller must keep around seems more cumbersome for the general case.
This commit is contained in:
@ -544,6 +544,15 @@ public:
|
||||
EXECUTE_QUEUED /**< Only queue tasks for execution */
|
||||
};
|
||||
|
||||
struct Call
|
||||
{
|
||||
enum action_t
|
||||
{
|
||||
EXECUTE, /**< Execute the call */
|
||||
CANCEL /**< Cancel the call */
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Initialize the worker mechanism.
|
||||
*
|
||||
@ -900,61 +909,157 @@ public:
|
||||
* Push a function for delayed execution.
|
||||
*
|
||||
* @param delay The delay in milliseconds.
|
||||
* @param tag A tag identifying this and possibly other delayed calls.
|
||||
* @param pFunction The function to call.
|
||||
*
|
||||
* @attention When invoked, if the provided function returns true, then it will
|
||||
* be called again after @c delay milliseconds.
|
||||
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
|
||||
* function should perform the delayed call and return @true, if
|
||||
* the function should be called again. If the function returns
|
||||
* @c false, it will not be called again.
|
||||
*
|
||||
* If @c action is @c Worker::Call::CANCEL, then the function
|
||||
* should perform whatever canceling actions are needed. In that
|
||||
* case the return value is ignored and the function will not
|
||||
* be called again.
|
||||
*/
|
||||
void delayed_call(uint32_t delay, bool (*pFunction)())
|
||||
void delayed_call(uint32_t delay,
|
||||
intptr_t tag,
|
||||
bool (*pFunction)(Worker::Call::action_t action))
|
||||
{
|
||||
add_delayed_call(new DelayedCallFunctionVoid(delay, pFunction));
|
||||
add_delayed_call(new DelayedCallFunctionVoid(delay, tag, pFunction));
|
||||
}
|
||||
|
||||
void delayed_call(uint32_t delay,
|
||||
void* tag,
|
||||
bool (*pFunction)(Worker::Call::action_t action))
|
||||
{
|
||||
return delayed_call(delay, reinterpret_cast<intptr_t>(tag), pFunction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a function for delayed execution.
|
||||
*
|
||||
* @param delay The delay in milliseconds.
|
||||
* @param tag A tag identifying this and possibly other delayed calls.
|
||||
* @param pFunction The function to call.
|
||||
* @param data The data to be provided to the function when invoked.
|
||||
*
|
||||
* @attention When invoked, if the provided function returns true, then it will
|
||||
* be called again after @c delay milliseconds.
|
||||
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
|
||||
* function should perform the delayed call and return @true, if
|
||||
* the function should be called again. If the function returns
|
||||
* @c false, it will not be called again.
|
||||
*
|
||||
* If @c action is @c Worker::Call::CANCEL, then the function
|
||||
* should perform whatever canceling actions are needed. In that
|
||||
* case the return value is ignored and the function will not
|
||||
* be called again.
|
||||
*/
|
||||
template<class D>
|
||||
void delayed_call(uint32_t delay, bool (*pFunction)(D data), D data)
|
||||
void delayed_call(uint32_t delay,
|
||||
intptr_t tag,
|
||||
bool (*pFunction)(Worker::Call::action_t action, D data), D data)
|
||||
{
|
||||
add_delayed_call(new DelayedCallFunction<D>(delay, pFunction, data));
|
||||
add_delayed_call(new DelayedCallFunction<D>(delay, tag, pFunction, data));
|
||||
}
|
||||
|
||||
template<class D>
|
||||
void delayed_call(uint32_t delay,
|
||||
void* pTag,
|
||||
bool (*pFunction)(Worker::Call::action_t action, D data), D data)
|
||||
{
|
||||
return delayed_call(delay, reinterpret_cast<intptr_t>(pTag), pFunction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a member function for delayed execution.
|
||||
*
|
||||
* @param delay The delay in milliseconds.
|
||||
* @param pTag A tag identifying this and possibly other delayed calls.
|
||||
* @param pMethod The member function to call.
|
||||
*
|
||||
* @attention When invoked, if the provided function returns true, then it will
|
||||
* be called again after @c delay milliseconds.
|
||||
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
|
||||
* function should perform the delayed call and return @true, if
|
||||
* the function should be called again. If the function returns
|
||||
* @c false, it will not be called again.
|
||||
*
|
||||
* If @c action is @c Worker::Call::CANCEL, then the function
|
||||
* should perform whatever canceling actions are needed. In that
|
||||
* case the return value is ignored and the function will not
|
||||
* be called again.
|
||||
*/
|
||||
template<class T>
|
||||
void delayed_call(uint32_t delay, T* pT, bool (T::*pMethod)())
|
||||
void delayed_call(uint32_t delay,
|
||||
intptr_t tag,
|
||||
T* pT,
|
||||
bool (T::*pMethod)(Worker::Call::action_t action))
|
||||
{
|
||||
add_delayed_call(new DelayedCallMethodVoid<T>(delay, pT, pMethod));
|
||||
add_delayed_call(new DelayedCallMethodVoid<T>(delay, tag, pT, pMethod));
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void delayed_call(uint32_t delay,
|
||||
void* pTag,
|
||||
T* pT,
|
||||
bool (T::*pMethod)(Worker::Call::action_t action))
|
||||
{
|
||||
return delayed_call(delay, reinterpret_cast<intptr_t>(pTag), pT, pMethod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a member function for delayed execution.
|
||||
*
|
||||
* @param delay The delay in milliseconds.
|
||||
* @param tag A tag identifying this and possibly other delayed calls.
|
||||
* @param pMethod The member function to call.
|
||||
* @param data The data to be provided to the function when invoked.
|
||||
*
|
||||
* @attention When invoked, if the provided function returns true, then it will
|
||||
* be called again after @c delay milliseconds.
|
||||
* @attention When invoked, if @c action is @c Worker::Call::EXECUTE, the
|
||||
* function should perform the delayed call and return @true, if
|
||||
* the function should be called again. If the function returns
|
||||
* @c false, it will not be called again.
|
||||
*
|
||||
* If @c action is @c Worker::Call::CANCEL, then the function
|
||||
* should perform whatever canceling actions are needed. In that
|
||||
* case the return value is ignored and the function will not
|
||||
* be called again.
|
||||
*/
|
||||
template<class T, class D>
|
||||
void delayed_call(uint32_t delay, T* pT, bool (T::*pMethod)(D data), D data)
|
||||
void delayed_call(uint32_t delay,
|
||||
intptr_t tag,
|
||||
T* pT,
|
||||
bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
|
||||
{
|
||||
add_delayed_call(new DelayedCallMethod<T, D>(delay, pT, pMethod, data));
|
||||
add_delayed_call(new DelayedCallMethod<T, D>(delay, tag, pT, pMethod, data));
|
||||
}
|
||||
|
||||
template<class T, class D>
|
||||
void delayed_call(uint32_t delay,
|
||||
void* pTag,
|
||||
T* pT,
|
||||
bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
|
||||
{
|
||||
return delayed_call(delay, reinterpret_cast<intptr_t>(pTag), pT, pMethod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel delayed calls.
|
||||
*
|
||||
* When this function is called, all current scheduled delayed calls, that
|
||||
* were scheduled using the specified tag, will be called *synchronously* with
|
||||
* the @c action argument being @c Worker::Call::CANCEL. That is, when this
|
||||
* function returns, all function have been canceled.
|
||||
*
|
||||
* @attention If any of the called function schedules a new delayed call using
|
||||
* the same tag, then they will *not* be canceled as a result of
|
||||
* this call.
|
||||
*
|
||||
* @param tag The tag that was used when a delayed call was scheduled.
|
||||
*/
|
||||
int32_t cancel_delayed_calls(intptr_t tag);
|
||||
|
||||
int32_t cancel_delayed_calls(void* pTag)
|
||||
{
|
||||
return cancel_delayed_calls(reinterpret_cast<intptr_t>(pTag));
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -1009,14 +1114,19 @@ private:
|
||||
return m_delay;
|
||||
}
|
||||
|
||||
intptr_t tag() const
|
||||
{
|
||||
return m_tag;
|
||||
}
|
||||
|
||||
uint64_t at() const
|
||||
{
|
||||
return m_at;
|
||||
}
|
||||
|
||||
bool call()
|
||||
bool call(Worker::Call::action_t action)
|
||||
{
|
||||
bool rv = do_call();
|
||||
bool rv = do_call(action);
|
||||
// We try to invoke the function as often as it was specified. If the
|
||||
// delay is very short and the execution time for the function very long,
|
||||
// then we will not succeed with that and the function will simply be
|
||||
@ -1026,13 +1136,14 @@ private:
|
||||
}
|
||||
|
||||
protected:
|
||||
DelayedCall(uint32_t delay)
|
||||
DelayedCall(uint32_t delay, intptr_t tag)
|
||||
: m_delay(delay)
|
||||
, m_tag(tag)
|
||||
, m_at(get_at(delay))
|
||||
{
|
||||
}
|
||||
|
||||
virtual bool do_call() = 0;
|
||||
virtual bool do_call(Worker::Call::action_t action) = 0;
|
||||
|
||||
private:
|
||||
static uint64_t get_at(uint32_t delay)
|
||||
@ -1046,6 +1157,7 @@ private:
|
||||
|
||||
private:
|
||||
uint32_t m_delay; // The delay in milliseconds.
|
||||
intptr_t m_tag; // Tag identifying the delayed call.
|
||||
uint64_t m_at; // The next time the function should be invoked.
|
||||
};
|
||||
|
||||
@ -1056,21 +1168,23 @@ private:
|
||||
DelayedCallFunction& operator = (const DelayedCallFunction&) = delete;
|
||||
|
||||
public:
|
||||
DelayedCallFunction(uint32_t delay, bool (*pFunction)(D data), D data)
|
||||
: DelayedCall(delay)
|
||||
DelayedCallFunction(uint32_t delay,
|
||||
void* pTag,
|
||||
bool (*pFunction)(Worker::Call::action_t action, D data), D data)
|
||||
: DelayedCall(delay, pTag)
|
||||
, m_pFunction(pFunction)
|
||||
, m_data(data)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
bool do_call()
|
||||
bool do_call(Worker::Call::action_t action)
|
||||
{
|
||||
return m_pFunction(m_data);
|
||||
return m_pFunction(action, m_data);
|
||||
}
|
||||
|
||||
private:
|
||||
bool (*m_pFunction)(D);
|
||||
bool (*m_pFunction)(Worker::Call::action_t, D);
|
||||
D m_data;
|
||||
};
|
||||
|
||||
@ -1081,20 +1195,22 @@ private:
|
||||
DelayedCallFunctionVoid& operator = (const DelayedCallFunctionVoid&) = delete;
|
||||
|
||||
public:
|
||||
DelayedCallFunctionVoid(uint32_t delay, bool (*pFunction)())
|
||||
: DelayedCall(delay)
|
||||
DelayedCallFunctionVoid(uint32_t delay,
|
||||
intptr_t tag,
|
||||
bool (*pFunction)(Worker::Call::action_t action))
|
||||
: DelayedCall(delay, tag)
|
||||
, m_pFunction(pFunction)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
bool do_call()
|
||||
bool do_call(Worker::Call::action_t action)
|
||||
{
|
||||
return m_pFunction();
|
||||
return m_pFunction(action);
|
||||
}
|
||||
|
||||
private:
|
||||
bool (*m_pFunction)();
|
||||
bool (*m_pFunction)(Worker::Call::action_t action);
|
||||
};
|
||||
|
||||
template<class T, class D>
|
||||
@ -1104,8 +1220,11 @@ private:
|
||||
DelayedCallMethod& operator = (const DelayedCallMethod&) = delete;
|
||||
|
||||
public:
|
||||
DelayedCallMethod(uint32_t delay, T* pT, bool (T::*pMethod)(D data), D data)
|
||||
: DelayedCall(delay)
|
||||
DelayedCallMethod(uint32_t delay,
|
||||
intptr_t tag,
|
||||
T* pT,
|
||||
bool (T::*pMethod)(Worker::Call::action_t action, D data), D data)
|
||||
: DelayedCall(delay, tag)
|
||||
, m_pT(pT)
|
||||
, m_pMethod(pMethod)
|
||||
, m_data(data)
|
||||
@ -1113,14 +1232,14 @@ private:
|
||||
}
|
||||
|
||||
private:
|
||||
bool do_call()
|
||||
bool do_call(Worker::Call::action_t action)
|
||||
{
|
||||
return (m_pT->*m_pMethod)(m_data);
|
||||
return (m_pT->*m_pMethod)(action, m_data);
|
||||
}
|
||||
|
||||
private:
|
||||
T* m_pT;
|
||||
bool (T::*m_pMethod)(D);
|
||||
bool (T::*m_pMethod)(Worker::Call::action_t, D);
|
||||
D m_data;
|
||||
};
|
||||
|
||||
@ -1131,22 +1250,25 @@ private:
|
||||
DelayedCallMethodVoid& operator = (const DelayedCallMethodVoid&) = delete;
|
||||
|
||||
public:
|
||||
DelayedCallMethodVoid(uint32_t delay, T* pT, bool (T::*pMethod)())
|
||||
: DelayedCall(delay)
|
||||
DelayedCallMethodVoid(uint32_t delay,
|
||||
intptr_t tag,
|
||||
T* pT,
|
||||
bool (T::*pMethod)(Worker::Call::action_t))
|
||||
: DelayedCall(delay, tag)
|
||||
, m_pT(pT)
|
||||
, m_pMethod(pMethod)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
bool do_call()
|
||||
bool do_call(Worker::Call::action_t action)
|
||||
{
|
||||
return (m_pT->*m_pMethod)();
|
||||
return (m_pT->*m_pMethod)(action);
|
||||
}
|
||||
|
||||
private:
|
||||
T* m_pT;
|
||||
bool (T::*m_pMethod)();
|
||||
bool (T::*m_pMethod)(Worker::Call::action_t);
|
||||
};
|
||||
|
||||
void add_delayed_call(DelayedCall* pDelayed_call);
|
||||
|
@ -78,8 +78,10 @@ public:
|
||||
return m_delay;
|
||||
}
|
||||
|
||||
bool tick()
|
||||
bool tick(mxs::Worker::Call::action_t action)
|
||||
{
|
||||
ss_dassert(action == mxs::Worker::Call::EXECUTE);
|
||||
|
||||
int64_t now = get_monotonic_time_ms();
|
||||
int64_t diff = abs(now - m_at);
|
||||
|
||||
@ -127,11 +129,11 @@ int run()
|
||||
TimerTest t4(&rv, 500);
|
||||
TimerTest t5(&rv, 600);
|
||||
|
||||
w.delayed_call(t1.delay(), &t1, &TimerTest::tick);
|
||||
w.delayed_call(t2.delay(), &t2, &TimerTest::tick);
|
||||
w.delayed_call(t3.delay(), &t3, &TimerTest::tick);
|
||||
w.delayed_call(t4.delay(), &t4, &TimerTest::tick);
|
||||
w.delayed_call(t5.delay(), &t5, &TimerTest::tick);
|
||||
w.delayed_call(t1.delay(), NULL, &t1, &TimerTest::tick);
|
||||
w.delayed_call(t2.delay(), NULL, &t2, &TimerTest::tick);
|
||||
w.delayed_call(t3.delay(), NULL, &t3, &TimerTest::tick);
|
||||
w.delayed_call(t4.delay(), NULL, &t4, &TimerTest::tick);
|
||||
w.delayed_call(t5.delay(), NULL, &t5, &TimerTest::tick);
|
||||
|
||||
w.run();
|
||||
|
||||
|
@ -1138,7 +1138,7 @@ void Worker::tick()
|
||||
pDelayed_call = m_delayed_calls.top();
|
||||
m_delayed_calls.pop();
|
||||
|
||||
if (pDelayed_call->call())
|
||||
if (pDelayed_call->call(Worker::Call::EXECUTE))
|
||||
{
|
||||
repeating_calls.push_back(pDelayed_call);
|
||||
}
|
||||
@ -1203,6 +1203,13 @@ void Worker::adjust_timer()
|
||||
}
|
||||
}
|
||||
|
||||
int32_t Worker::cancel_delayed_calls(intptr_t tag)
|
||||
{
|
||||
// TODO: Implement
|
||||
ss_dassert(!true);
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user