From ba4dcc3ed81b91b202db730f1586844fb79e74ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= Date: Thu, 28 Feb 2019 09:34:06 +0100 Subject: [PATCH] rtc::Thread::PostTask() added. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This method allows asynchronously posting tasks, in the form of functors to be invoked, on the thread represented by rtc::Thread. This CL removes PostMessageWithFunctor(), putting the implementation of it directly into rtc::Thread::PostTask(), and moves & updates the test coverage to thread_unittest.cc. Bug: webrtc:10294, webrtc:10293 Change-Id: Ic6cc3e2533a4f7aaff141aff28e9bb3908ee3c96 Reviewed-on: https://webrtc-review.googlesource.com/c/124701 Reviewed-by: Karl Wiberg Commit-Queue: Henrik Boström Cr-Commit-Position: refs/heads/master@{#26894} --- BUILD.gn | 1 - rtc_base/BUILD.gn | 26 -- rtc_base/post_message_with_functor.h | 77 ------ .../post_message_with_functor_unittest.cc | 229 ------------------ rtc_base/thread.h | 63 +++++ rtc_base/thread_unittest.cc | 225 +++++++++++++++++ 6 files changed, 288 insertions(+), 333 deletions(-) delete mode 100644 rtc_base/post_message_with_functor.h delete mode 100644 rtc_base/post_message_with_functor_unittest.cc diff --git a/BUILD.gn b/BUILD.gn index 590345e102..5c3ef433bd 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -491,7 +491,6 @@ if (rtc_include_tests) { "rtc_base:rtc_base_unittests", "rtc_base:rtc_json_unittests", "rtc_base:rtc_numerics_unittests", - "rtc_base:rtc_post_message_with_functor_unittests", "rtc_base:rtc_task_queue_unittests", "rtc_base:sequenced_task_checker_unittests", "rtc_base:sigslot_unittest", diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 8f072bdd1a..6ebb980f29 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -1045,17 +1045,6 @@ rtc_static_library("rtc_base") { } } -rtc_source_set("rtc_post_message_with_functor") { - sources = [ - "post_message_with_functor.h", - ] - deps = [ - ":checks", - ":rtc_base", - ":rtc_event", - ] -} - rtc_source_set("gtest_prod") { visibility = [ "*" ] sources = [ @@ -1390,21 +1379,6 @@ if (rtc_include_tests) { ] } - rtc_source_set("rtc_post_message_with_functor_unittests") { - testonly = true - - sources = [ - "post_message_with_functor_unittest.cc", - ] - deps = [ - ":checks", - ":gunit_helpers", - ":rtc_base", - ":rtc_post_message_with_functor", - "../test:test_support", - ] - } - rtc_source_set("rtc_base_unittests") { testonly = true defines = [] diff --git a/rtc_base/post_message_with_functor.h b/rtc_base/post_message_with_functor.h deleted file mode 100644 index 389724d594..0000000000 --- a/rtc_base/post_message_with_functor.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2019 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ -#define RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ - -#include - -#include "rtc_base/checks.h" -#include "rtc_base/constructor_magic.h" -#include "rtc_base/location.h" -#include "rtc_base/message_handler.h" -#include "rtc_base/thread.h" - -namespace rtc { - -namespace post_message_with_functor_internal { - -template -class SingleMessageHandlerWithFunctor : public MessageHandler { - public: - template - explicit SingleMessageHandlerWithFunctor(F&& functor) - : functor_(std::forward(functor)) {} - - void OnMessage(Message* msg) override { - functor_(); - delete this; - } - - private: - ~SingleMessageHandlerWithFunctor() override {} - - typename std::remove_reference::type functor_; - - RTC_DISALLOW_COPY_AND_ASSIGN(SingleMessageHandlerWithFunctor); -}; - -} // namespace post_message_with_functor_internal - -// Asynchronously posts a message that will invoke |functor| on the target -// thread. Ownership is passed and |functor| is destroyed on the target thread. -// Requirements of FunctorT: -// - FunctorT is movable. -// - FunctorT implements "T operator()()" or "T operator()() const" for some T -// (if T is not void, the return value is discarded on the target thread). -// - FunctorT has a public destructor that can be invoked from the target -// thread after operation() has been invoked. -// - The functor must not cause the thread to quit before -// PostMessageWithFunctor() is done. -template -void PostMessageWithFunctor(const Location& posted_from, - Thread* thread, - FunctorT&& functor) { - thread->Post( - posted_from, - new post_message_with_functor_internal::SingleMessageHandlerWithFunctor< - FunctorT>(std::forward(functor))); - // This DCHECK guarantees that the post was successful. - // Post() doesn't say whether it succeeded, but it will only fail if the - // thread is quitting. DCHECKing that the thread is not quitting *after* - // posting might yield some false positives (where the thread did in fact - // quit, but only after posting), but if we have false positives here then we - // have a race condition anyway. - RTC_DCHECK(!thread->IsQuitting()); -} - -} // namespace rtc - -#endif // RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_ diff --git a/rtc_base/post_message_with_functor_unittest.cc b/rtc_base/post_message_with_functor_unittest.cc deleted file mode 100644 index 025bfd467c..0000000000 --- a/rtc_base/post_message_with_functor_unittest.cc +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright 2019 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/post_message_with_functor.h" - -#include - -#include "rtc_base/bind.h" -#include "rtc_base/checks.h" -#include "rtc_base/event.h" -#include "rtc_base/gunit.h" -#include "rtc_base/ref_counted_object.h" -#include "rtc_base/thread.h" -#include "test/gtest.h" - -namespace rtc { - -namespace { - -void ThreadIsCurrent(Thread* thread, bool* result, Event* event) { - *result = thread->IsCurrent(); - event->Set(); -} - -void WaitAndSetEvent(Event* wait_event, Event* set_event) { - wait_event->Wait(Event::kForever); - set_event->Set(); -} - -// A functor that keeps track of the number of copies and moves. -class LifeCycleFunctor { - public: - struct Stats { - size_t copy_count = 0; - size_t move_count = 0; - }; - - LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {} - LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; } - LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); } - - LifeCycleFunctor& operator=(const LifeCycleFunctor& other) { - stats_ = other.stats_; - event_ = other.event_; - ++stats_->copy_count; - return *this; - } - - LifeCycleFunctor& operator=(LifeCycleFunctor&& other) { - stats_ = other.stats_; - event_ = other.event_; - ++stats_->move_count; - return *this; - } - - void operator()() { event_->Set(); } - - private: - Stats* stats_; - Event* event_; -}; - -// A functor that verifies the thread it was destroyed on. -class DestructionFunctor { - public: - DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event) - : thread_(thread), - thread_was_current_(thread_was_current), - event_(event) {} - ~DestructionFunctor() { - // Only signal the event if this was the functor that was invoked to avoid - // the event being signaled due to the destruction of temporary/moved - // versions of this object. - if (was_invoked_) { - *thread_was_current_ = thread_->IsCurrent(); - event_->Set(); - } - } - - void operator()() { was_invoked_ = true; } - - private: - Thread* thread_; - bool* thread_was_current_; - Event* event_; - bool was_invoked_ = false; -}; - -} // namespace - -TEST(PostMessageWithFunctorTest, InvokesWithBind) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - Event event; - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - Bind(&Event::Set, &event)); - event.Wait(Event::kForever); -} - -TEST(PostMessageWithFunctorTest, InvokesWithLambda) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - Event event; - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - [&event] { event.Set(); }); - event.Wait(Event::kForever); -} - -TEST(PostMessageWithFunctorTest, InvokesWithCopiedFunctor) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - LifeCycleFunctor::Stats stats; - Event event; - LifeCycleFunctor functor(&stats, &event); - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor); - event.Wait(Event::kForever); - - EXPECT_EQ(1u, stats.copy_count); - EXPECT_EQ(0u, stats.move_count); -} - -TEST(PostMessageWithFunctorTest, InvokesWithMovedFunctor) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - LifeCycleFunctor::Stats stats; - Event event; - LifeCycleFunctor functor(&stats, &event); - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - std::move(functor)); - event.Wait(Event::kForever); - - EXPECT_EQ(0u, stats.copy_count); - EXPECT_EQ(1u, stats.move_count); -} - -TEST(PostMessageWithFunctorTest, - InvokesWithCopiedFunctorDestroyedOnTargetThread) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - Event event; - bool was_invoked_on_background_thread = false; - DestructionFunctor functor(background_thread.get(), - &was_invoked_on_background_thread, &event); - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor); - event.Wait(Event::kForever); - - EXPECT_TRUE(was_invoked_on_background_thread); -} - -TEST(PostMessageWithFunctorTest, - InvokesWithMovedFunctorDestroyedOnTargetThread) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - Event event; - bool was_invoked_on_background_thread = false; - DestructionFunctor functor(background_thread.get(), - &was_invoked_on_background_thread, &event); - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - std::move(functor)); - event.Wait(Event::kForever); - - EXPECT_TRUE(was_invoked_on_background_thread); -} - -TEST(PostMessageWithFunctorTest, InvokesOnBackgroundThread) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - Event event; - bool was_invoked_on_background_thread = false; - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - Bind(&ThreadIsCurrent, background_thread.get(), - &was_invoked_on_background_thread, &event)); - event.Wait(Event::kForever); - - EXPECT_TRUE(was_invoked_on_background_thread); -} - -TEST(PostMessageWithFunctorTest, InvokesAsynchronously) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - // The first event ensures that SendSingleMessage() is not blocking this - // thread. The second event ensures that the message is processed. - Event event_set_by_test_thread; - Event event_set_by_background_thread; - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - Bind(&WaitAndSetEvent, &event_set_by_test_thread, - &event_set_by_background_thread)); - event_set_by_test_thread.Set(); - event_set_by_background_thread.Wait(Event::kForever); -} - -TEST(PostMessageWithFunctorTest, InvokesInPostedOrder) { - std::unique_ptr background_thread(rtc::Thread::Create()); - background_thread->Start(); - - Event first; - Event second; - Event third; - Event fourth; - - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - Bind(&WaitAndSetEvent, &first, &second)); - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - Bind(&WaitAndSetEvent, &second, &third)); - PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), - Bind(&WaitAndSetEvent, &third, &fourth)); - - // All tasks have been posted before the first one is unblocked. - first.Set(); - // Only if the chain is invoked in posted order will the last event be set. - fourth.Wait(Event::kForever); -} - -} // namespace rtc diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 3ba3010fd9..7bb7a3c130 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -36,6 +36,29 @@ namespace rtc { class Thread; +namespace rtc_thread_internal { + +template +class SingleMessageHandlerWithFunctor final : public MessageHandler { + public: + explicit SingleMessageHandlerWithFunctor(FunctorT&& functor) + : functor_(std::forward(functor)) {} + + void OnMessage(Message* msg) override { + functor_(); + delete this; + } + + private: + ~SingleMessageHandlerWithFunctor() override {} + + typename std::remove_reference::type functor_; + + RTC_DISALLOW_COPY_AND_ASSIGN(SingleMessageHandlerWithFunctor); +}; + +} // namespace rtc_thread_internal + class ThreadManager { public: static const int kForever = -1; @@ -183,6 +206,8 @@ class RTC_LOCKABLE Thread : public MessageQueue { // &MyFunctionReturningBool); // NOTE: This function can only be called when synchronous calls are allowed. // See ScopedDisallowBlockingCalls for details. + // NOTE: Blocking invokes are DISCOURAGED, consider if what you're doing can + // be achieved with PostTask() and callbacks instead. template ReturnT Invoke(const Location& posted_from, FunctorT&& functor) { FunctorMessageHandler handler( @@ -191,6 +216,44 @@ class RTC_LOCKABLE Thread : public MessageQueue { return handler.MoveResult(); } + // Posts a task to invoke the functor on |this| thread asynchronously, i.e. + // without blocking the thread that invoked PostTask(). Ownership of |functor| + // is passed and destroyed on |this| thread after it is invoked. + // Requirements of FunctorT: + // - FunctorT is movable. + // - FunctorT implements "T operator()()" or "T operator()() const" for some T + // (if T is not void, the return value is discarded on |this| thread). + // - FunctorT has a public destructor that can be invoked from |this| thread + // after operation() has been invoked. + // - The functor must not cause the thread to quit before PostTask() is done. + // + // Example - Calling a class method: + // class Foo { + // public: + // void DoTheThing(); + // }; + // Foo foo; + // thread->PostTask(RTC_FROM_HERE, Bind(&Foo::DoTheThing, &foo)); + // + // Example - Calling a lambda function: + // thread->PostTask(RTC_FROM_HERE, + // [&x, &y] { x.TrackComputations(y.Compute()); }); + template + void PostTask(const Location& posted_from, FunctorT&& functor) { + Post(posted_from, + new rtc_thread_internal::SingleMessageHandlerWithFunctor( + std::forward(functor))); + // This DCHECK guarantees that the post was successful. + // Post() doesn't say whether it succeeded, but it will only fail if the + // thread is quitting. DCHECKing that the thread is not quitting *after* + // posting might yield some false positives (where the thread did in fact + // quit, but only after posting), but if we have false positives here then + // we have a race condition anyway. + // TODO(https://crbug.com/webrtc/10364): When Post() returns a bool we can + // DCHECK the result instead of inferring success from IsQuitting(). + RTC_DCHECK(!IsQuitting()); + } + // From MessageQueue bool IsProcessingMessagesForTesting() override; void Clear(MessageHandler* phandler, diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index d87c332ea2..63edcf8c19 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -671,5 +671,230 @@ TEST_F(GuardedAsyncInvokeTest, FlushWithIds) { EXPECT_TRUE(flag2.get()); } +void ThreadIsCurrent(Thread* thread, bool* result, Event* event) { + *result = thread->IsCurrent(); + event->Set(); +} + +void WaitAndSetEvent(Event* wait_event, Event* set_event) { + wait_event->Wait(Event::kForever); + set_event->Set(); +} + +// A functor that keeps track of the number of copies and moves. +class LifeCycleFunctor { + public: + struct Stats { + size_t copy_count = 0; + size_t move_count = 0; + }; + + LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {} + LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; } + LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); } + + LifeCycleFunctor& operator=(const LifeCycleFunctor& other) { + stats_ = other.stats_; + event_ = other.event_; + ++stats_->copy_count; + return *this; + } + + LifeCycleFunctor& operator=(LifeCycleFunctor&& other) { + stats_ = other.stats_; + event_ = other.event_; + ++stats_->move_count; + return *this; + } + + void operator()() { event_->Set(); } + + private: + Stats* stats_; + Event* event_; +}; + +// A functor that verifies the thread it was destroyed on. +class DestructionFunctor { + public: + DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event) + : thread_(thread), + thread_was_current_(thread_was_current), + event_(event) {} + ~DestructionFunctor() { + // Only signal the event if this was the functor that was invoked to avoid + // the event being signaled due to the destruction of temporary/moved + // versions of this object. + if (was_invoked_) { + *thread_was_current_ = thread_->IsCurrent(); + event_->Set(); + } + } + + void operator()() { was_invoked_ = true; } + + private: + Thread* thread_; + bool* thread_was_current_; + Event* event_; + bool was_invoked_ = false; +}; + +TEST(ThreadPostTaskTest, InvokesWithBind) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + background_thread->PostTask(RTC_FROM_HERE, Bind(&Event::Set, &event)); + event.Wait(Event::kForever); +} + +TEST(ThreadPostTaskTest, InvokesWithLambda) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + background_thread->PostTask(RTC_FROM_HERE, [&event] { event.Set(); }); + event.Wait(Event::kForever); +} + +TEST(ThreadPostTaskTest, InvokesWithCopiedFunctor) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + background_thread->PostTask(RTC_FROM_HERE, functor); + event.Wait(Event::kForever); + + EXPECT_EQ(1u, stats.copy_count); + EXPECT_EQ(0u, stats.move_count); +} + +TEST(ThreadPostTaskTest, InvokesWithMovedFunctor) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + background_thread->PostTask(RTC_FROM_HERE, std::move(functor)); + event.Wait(Event::kForever); + + EXPECT_EQ(0u, stats.copy_count); + EXPECT_EQ(1u, stats.move_count); +} + +TEST(ThreadPostTaskTest, InvokesWithReferencedFunctorShouldCopy) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + LifeCycleFunctor::Stats stats; + Event event; + LifeCycleFunctor functor(&stats, &event); + LifeCycleFunctor& functor_ref = functor; + background_thread->PostTask(RTC_FROM_HERE, functor_ref); + event.Wait(Event::kForever); + + EXPECT_EQ(1u, stats.copy_count); + EXPECT_EQ(0u, stats.move_count); +} + +TEST(ThreadPostTaskTest, InvokesWithCopiedFunctorDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + background_thread->PostTask(RTC_FROM_HERE, functor); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(ThreadPostTaskTest, InvokesWithMovedFunctorDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + background_thread->PostTask(RTC_FROM_HERE, std::move(functor)); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(ThreadPostTaskTest, + InvokesWithReferencedFunctorShouldCopyAndDestroyedOnTargetThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + DestructionFunctor functor(background_thread.get(), + &was_invoked_on_background_thread, &event); + DestructionFunctor& functor_ref = functor; + background_thread->PostTask(RTC_FROM_HERE, functor_ref); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(ThreadPostTaskTest, InvokesOnBackgroundThread) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event event; + bool was_invoked_on_background_thread = false; + background_thread->PostTask(RTC_FROM_HERE, + Bind(&ThreadIsCurrent, background_thread.get(), + &was_invoked_on_background_thread, &event)); + event.Wait(Event::kForever); + + EXPECT_TRUE(was_invoked_on_background_thread); +} + +TEST(ThreadPostTaskTest, InvokesAsynchronously) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + // The first event ensures that SendSingleMessage() is not blocking this + // thread. The second event ensures that the message is processed. + Event event_set_by_test_thread; + Event event_set_by_background_thread; + background_thread->PostTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &event_set_by_test_thread, + &event_set_by_background_thread)); + event_set_by_test_thread.Set(); + event_set_by_background_thread.Wait(Event::kForever); +} + +TEST(ThreadPostTaskTest, InvokesInPostedOrder) { + std::unique_ptr background_thread(rtc::Thread::Create()); + background_thread->Start(); + + Event first; + Event second; + Event third; + Event fourth; + + background_thread->PostTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &first, &second)); + background_thread->PostTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &second, &third)); + background_thread->PostTask(RTC_FROM_HERE, + Bind(&WaitAndSetEvent, &third, &fourth)); + + // All tasks have been posted before the first one is unblocked. + first.Set(); + // Only if the chain is invoked in posted order will the last event be set. + fourth.Wait(Event::kForever); +} + } // namespace } // namespace rtc