Add rtc::Thread::PostDelayedTask

Earlier, rtc::Thread::PostTask was added as a convenient
alternative to MessageHandlers. This CL additionally adds support
for posting delayed tasks in a similar manner.

Bug: webrtc:10294
Change-Id: I0957b59ca2133a882c980bd2ad109fa03d701a16
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/161740
Commit-Queue: Steve Anton <steveanton@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30069}
This commit is contained in:
Steve Anton
2019-12-11 11:21:53 -08:00
committed by Commit Bot
parent 26fe811623
commit bcc1a765fb
3 changed files with 82 additions and 19 deletions

View File

@ -63,6 +63,24 @@ class ScopedAutoReleasePool {
#endif
namespace rtc {
namespace {
class MessageHandlerWithTask final : public MessageHandler {
public:
MessageHandlerWithTask() = default;
void OnMessage(Message* msg) override {
static_cast<rtc_thread_internal::MessageLikeTask*>(msg->pdata)->Run();
delete msg->pdata;
}
private:
~MessageHandlerWithTask() override {}
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask);
};
} // namespace
ThreadManager* ThreadManager::Instance() {
static ThreadManager* const thread_manager = new ThreadManager();
@ -612,6 +630,13 @@ bool Thread::IsRunning() {
#endif
}
// static
MessageHandler* Thread::GetPostTaskMessageHandler() {
// Allocate at first call, never deallocate.
static MessageHandler* handler = new MessageHandlerWithTask;
return handler;
}
AutoThread::AutoThread()
: Thread(SocketServer::CreateDefault(), /*do_init=*/false) {
if (!ThreadManager::Instance()->CurrentThread()) {

View File

@ -64,21 +64,6 @@ class MessageWithFunctor final : public MessageLikeTask {
RTC_DISALLOW_COPY_AND_ASSIGN(MessageWithFunctor);
};
class MessageHandlerWithTask final : public MessageHandler {
public:
MessageHandlerWithTask() = default;
void OnMessage(Message* msg) override {
static_cast<MessageLikeTask*>(msg->pdata)->Run();
delete msg->pdata;
}
private:
~MessageHandlerWithTask() override {}
RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask);
};
} // namespace rtc_thread_internal
class RTC_EXPORT ThreadManager {
@ -267,13 +252,19 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
// [&x, &y] { x.TrackComputations(y.Compute()); });
template <class FunctorT>
void PostTask(const Location& posted_from, FunctorT&& functor) {
// Allocate at first call, never deallocate.
static auto* const handler =
new rtc_thread_internal::MessageHandlerWithTask;
Post(posted_from, handler, 0,
Post(posted_from, GetPostTaskMessageHandler(), /*id=*/0,
new rtc_thread_internal::MessageWithFunctor<FunctorT>(
std::forward<FunctorT>(functor)));
}
template <class FunctorT>
void PostDelayedTask(const Location& posted_from,
FunctorT&& functor,
uint32_t milliseconds) {
PostDelayed(posted_from, milliseconds, GetPostTaskMessageHandler(),
/*id=*/0,
new rtc_thread_internal::MessageWithFunctor<FunctorT>(
std::forward<FunctorT>(functor)));
}
// From TaskQueueBase
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
@ -347,6 +338,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
public:
void OnMessage(Message* msg) override;
};
// Sets the per-thread allow-blocking-calls flag and returns the previous
// value. Must be called on this thread.
bool SetAllowBlockingCalls(bool allow);
@ -381,6 +373,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
void InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor);
// Returns a static-lifetime MessageHandler which runs message with
// MessageLikeTask payload data.
static MessageHandler* GetPostTaskMessageHandler();
std::list<_SendMessage> sendlist_;
std::string name_;

View File

@ -902,6 +902,48 @@ TEST(ThreadPostTaskTest, InvokesInPostedOrder) {
fourth.Wait(Event::kForever);
}
TEST(ThreadPostDelayedTaskTest, InvokesAsynchronously) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
// The first event ensures that SendSingleMessage() is not blocking this
// thread. The second event ensures that the message is processed.
Event event_set_by_test_thread;
Event event_set_by_background_thread;
background_thread->PostDelayedTask(
RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &event_set_by_test_thread,
&event_set_by_background_thread),
/*milliseconds=*/10);
event_set_by_test_thread.Set();
event_set_by_background_thread.Wait(Event::kForever);
}
TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event first;
Event second;
Event third;
Event fourth;
background_thread->PostDelayedTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &third, &fourth),
/*milliseconds=*/11);
background_thread->PostDelayedTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &first, &second),
/*milliseconds=*/9);
background_thread->PostDelayedTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &second, &third),
/*milliseconds=*/10);
// All tasks have been posted before the first one is unblocked.
first.Set();
// Only if the chain is invoked in posted order will the last event be set.
fourth.Wait(Event::kForever);
}
class ThreadFactory : public webrtc::TaskQueueFactory {
public:
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>