From bcc1a765fb6f5be36e4c3c84d0229a0458706119 Mon Sep 17 00:00:00 2001 From: Steve Anton Date: Wed, 11 Dec 2019 11:21:53 -0800 Subject: [PATCH] Add rtc::Thread::PostDelayedTask Earlier, rtc::Thread::PostTask was added as a convenient alternative to MessageHandlers. This CL additionally adds support for posting delayed tasks in a similar manner. Bug: webrtc:10294 Change-Id: I0957b59ca2133a882c980bd2ad109fa03d701a16 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/161740 Commit-Queue: Steve Anton Reviewed-by: Karl Wiberg Cr-Commit-Position: refs/heads/master@{#30069} --- rtc_base/thread.cc | 25 ++++++++++++++++++++++ rtc_base/thread.h | 34 +++++++++++++----------------- rtc_base/thread_unittest.cc | 42 +++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 19 deletions(-) diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 90be695885..7335af7c15 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -63,6 +63,24 @@ class ScopedAutoReleasePool { #endif namespace rtc { +namespace { + +class MessageHandlerWithTask final : public MessageHandler { + public: + MessageHandlerWithTask() = default; + + void OnMessage(Message* msg) override { + static_cast(msg->pdata)->Run(); + delete msg->pdata; + } + + private: + ~MessageHandlerWithTask() override {} + + RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask); +}; + +} // namespace ThreadManager* ThreadManager::Instance() { static ThreadManager* const thread_manager = new ThreadManager(); @@ -612,6 +630,13 @@ bool Thread::IsRunning() { #endif } +// static +MessageHandler* Thread::GetPostTaskMessageHandler() { + // Allocate at first call, never deallocate. + static MessageHandler* handler = new MessageHandlerWithTask; + return handler; +} + AutoThread::AutoThread() : Thread(SocketServer::CreateDefault(), /*do_init=*/false) { if (!ThreadManager::Instance()->CurrentThread()) { diff --git a/rtc_base/thread.h b/rtc_base/thread.h index f433bab1ba..fb40a54b80 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -64,21 +64,6 @@ class MessageWithFunctor final : public MessageLikeTask { RTC_DISALLOW_COPY_AND_ASSIGN(MessageWithFunctor); }; -class MessageHandlerWithTask final : public MessageHandler { - public: - MessageHandlerWithTask() = default; - - void OnMessage(Message* msg) override { - static_cast(msg->pdata)->Run(); - delete msg->pdata; - } - - private: - ~MessageHandlerWithTask() override {} - - RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask); -}; - } // namespace rtc_thread_internal class RTC_EXPORT ThreadManager { @@ -267,13 +252,19 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, // [&x, &y] { x.TrackComputations(y.Compute()); }); template void PostTask(const Location& posted_from, FunctorT&& functor) { - // Allocate at first call, never deallocate. - static auto* const handler = - new rtc_thread_internal::MessageHandlerWithTask; - Post(posted_from, handler, 0, + Post(posted_from, GetPostTaskMessageHandler(), /*id=*/0, new rtc_thread_internal::MessageWithFunctor( std::forward(functor))); } + template + void PostDelayedTask(const Location& posted_from, + FunctorT&& functor, + uint32_t milliseconds) { + PostDelayed(posted_from, milliseconds, GetPostTaskMessageHandler(), + /*id=*/0, + new rtc_thread_internal::MessageWithFunctor( + std::forward(functor))); + } // From TaskQueueBase void PostTask(std::unique_ptr task) override; @@ -347,6 +338,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, public: void OnMessage(Message* msg) override; }; + // Sets the per-thread allow-blocking-calls flag and returns the previous // value. Must be called on this thread. bool SetAllowBlockingCalls(bool allow); @@ -381,6 +373,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, void InvokeInternal(const Location& posted_from, rtc::FunctionView functor); + // Returns a static-lifetime MessageHandler which runs message with + // MessageLikeTask payload data. + static MessageHandler* GetPostTaskMessageHandler(); + std::list<_SendMessage> sendlist_; std::string name_; diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index 8147c90a35..464f2d4b53 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -902,6 +902,48 @@ TEST(ThreadPostTaskTest, InvokesInPostedOrder) { fourth.Wait(Event::kForever); } +TEST(ThreadPostDelayedTaskTest, InvokesAsynchronously) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + // The first event ensures that SendSingleMessage() is not blocking this + // thread. The second event ensures that the message is processed. + Event event_set_by_test_thread; + Event event_set_by_background_thread; + background_thread->PostDelayedTask( + RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &event_set_by_test_thread, + &event_set_by_background_thread), + /*milliseconds=*/10); + event_set_by_test_thread.Set(); + event_set_by_background_thread.Wait(Event::kForever); +} + +TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event first; + Event second; + Event third; + Event fourth; + + background_thread->PostDelayedTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &third, &fourth), + /*milliseconds=*/11); + background_thread->PostDelayedTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &first, &second), + /*milliseconds=*/9); + background_thread->PostDelayedTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &second, &third), + /*milliseconds=*/10); + + // All tasks have been posted before the first one is unblocked. + first.Set(); + // Only if the chain is invoked in posted order will the last event be set. + fourth.Wait(Event::kForever); +} + class ThreadFactory : public webrtc::TaskQueueFactory { public: std::unique_ptr