MXS-1754 Add delayed calling to Worker
It's now possible to provide Worker with a function to call at a later time. It's possible to provide a function or a member function (with the object), taking zero or one argument of any kind. The argument must be copyable. There's currently no way to cancel a call, which must be added as typically the delayed calling is associated with a session and if the session is closed before the delayed call is made, bad things are likely to happen.
This commit is contained in:
@ -236,10 +236,10 @@ void WorkerTimer::start(uint64_t interval)
|
||||
{
|
||||
// TODO: Add possibility to set initial delay and interval.
|
||||
time_t initial_sec = interval / 1000;
|
||||
long initial_nsec = (interval - initial_sec * 1000) * 1000;
|
||||
long initial_nsec = (interval - initial_sec * 1000) * 1000000;
|
||||
|
||||
time_t interval_sec = (interval / 1000);
|
||||
long interval_nsec = (interval - interval_sec * 1000) * 1000;
|
||||
long interval_nsec = (interval - interval_sec * 1000) * 1000000;
|
||||
|
||||
struct itimerspec time;
|
||||
|
||||
@ -312,7 +312,7 @@ Worker::Worker()
|
||||
, m_shutdown_initiated(false)
|
||||
, m_nCurrent_descriptors(0)
|
||||
, m_nTotal_descriptors(0)
|
||||
, m_timer(this, this, &Worker::tick)
|
||||
, m_pTimer(new PrivateTimer(this, this, &Worker::tick))
|
||||
{
|
||||
if (m_epoll_fd != -1)
|
||||
{
|
||||
@ -342,6 +342,7 @@ Worker::~Worker()
|
||||
|
||||
ss_dassert(!m_started);
|
||||
|
||||
delete m_pTimer;
|
||||
delete m_pQueue;
|
||||
close(m_epoll_fd);
|
||||
}
|
||||
@ -1108,10 +1109,98 @@ void Worker::poll_waitevents()
|
||||
m_state = STOPPED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
uint64_t get_current_time_ms()
|
||||
{
|
||||
struct timespec ts;
|
||||
ss_debug(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
ss_dassert(rv == 0);
|
||||
|
||||
return ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void Worker::tick()
|
||||
{
|
||||
// TODO: Add timer management here once function for adding delayed calls
|
||||
// TODO: to Worker has been added.
|
||||
uint64_t now = get_current_time_ms();
|
||||
|
||||
ss_dassert(!m_delayed_calls.empty());
|
||||
|
||||
vector<DelayedCall*> repeating_calls;
|
||||
|
||||
DelayedCall* pDelayed_call;
|
||||
|
||||
while (!m_delayed_calls.empty() && (m_delayed_calls.top()->at() <= now))
|
||||
{
|
||||
pDelayed_call = m_delayed_calls.top();
|
||||
m_delayed_calls.pop();
|
||||
|
||||
if (pDelayed_call->call())
|
||||
{
|
||||
repeating_calls.push_back(pDelayed_call);
|
||||
}
|
||||
else
|
||||
{
|
||||
delete pDelayed_call;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto i = repeating_calls.begin(); i != repeating_calls.end(); ++i)
|
||||
{
|
||||
m_delayed_calls.push(*i);
|
||||
}
|
||||
|
||||
adjust_timer();
|
||||
}
|
||||
|
||||
void Worker::add_delayed_call(DelayedCall* pDelayed_call)
|
||||
{
|
||||
bool adjust = true;
|
||||
|
||||
if (!m_delayed_calls.empty())
|
||||
{
|
||||
DelayedCall* pTop = m_delayed_calls.top();
|
||||
|
||||
if (pDelayed_call->at() < pTop->at())
|
||||
{
|
||||
// If the added delayed call needs to be called sooner
|
||||
// than the top-most delayed call, then we must adjust
|
||||
// the timer.
|
||||
adjust = true;
|
||||
}
|
||||
}
|
||||
|
||||
m_delayed_calls.push(pDelayed_call);
|
||||
|
||||
if (adjust)
|
||||
{
|
||||
adjust_timer();
|
||||
}
|
||||
}
|
||||
|
||||
void Worker::adjust_timer()
|
||||
{
|
||||
if (!m_delayed_calls.empty())
|
||||
{
|
||||
DelayedCall* pNext_call = m_delayed_calls.top();
|
||||
|
||||
uint64_t now = get_current_time_ms();
|
||||
int64_t delay = pNext_call->at() - now;
|
||||
|
||||
if (delay <= 0)
|
||||
{
|
||||
delay = 1;
|
||||
}
|
||||
|
||||
m_pTimer->start(delay);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_pTimer->cancel();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user