rtc::Thread::PostTask() added.

This method allows asynchronously posting tasks, in the form of
functors to be invoked, on the thread represented by rtc::Thread.

This CL removes PostMessageWithFunctor(), putting the implementation of
it directly into rtc::Thread::PostTask(), and moves & updates the test
coverage to thread_unittest.cc.

Bug: webrtc:10294, webrtc:10293
Change-Id: Ic6cc3e2533a4f7aaff141aff28e9bb3908ee3c96
Reviewed-on: https://webrtc-review.googlesource.com/c/124701
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26894}
This commit is contained in:
Henrik Boström
2019-02-28 09:34:06 +01:00
committed by Commit Bot
parent 8f385e39fa
commit ba4dcc3ed8
6 changed files with 288 additions and 333 deletions

View File

@ -491,7 +491,6 @@ if (rtc_include_tests) {
"rtc_base:rtc_base_unittests",
"rtc_base:rtc_json_unittests",
"rtc_base:rtc_numerics_unittests",
"rtc_base:rtc_post_message_with_functor_unittests",
"rtc_base:rtc_task_queue_unittests",
"rtc_base:sequenced_task_checker_unittests",
"rtc_base:sigslot_unittest",

View File

@ -1045,17 +1045,6 @@ rtc_static_library("rtc_base") {
}
}
rtc_source_set("rtc_post_message_with_functor") {
sources = [
"post_message_with_functor.h",
]
deps = [
":checks",
":rtc_base",
":rtc_event",
]
}
rtc_source_set("gtest_prod") {
visibility = [ "*" ]
sources = [
@ -1390,21 +1379,6 @@ if (rtc_include_tests) {
]
}
rtc_source_set("rtc_post_message_with_functor_unittests") {
testonly = true
sources = [
"post_message_with_functor_unittest.cc",
]
deps = [
":checks",
":gunit_helpers",
":rtc_base",
":rtc_post_message_with_functor",
"../test:test_support",
]
}
rtc_source_set("rtc_base_unittests") {
testonly = true
defines = []

View File

@ -1,77 +0,0 @@
/*
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_
#define RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_
#include <utility>
#include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/location.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/thread.h"
namespace rtc {
namespace post_message_with_functor_internal {
template <class FunctorT>
class SingleMessageHandlerWithFunctor : public MessageHandler {
public:
template <class F>
explicit SingleMessageHandlerWithFunctor(F&& functor)
: functor_(std::forward<F>(functor)) {}
void OnMessage(Message* msg) override {
functor_();
delete this;
}
private:
~SingleMessageHandlerWithFunctor() override {}
typename std::remove_reference<FunctorT>::type functor_;
RTC_DISALLOW_COPY_AND_ASSIGN(SingleMessageHandlerWithFunctor);
};
} // namespace post_message_with_functor_internal
// Asynchronously posts a message that will invoke |functor| on the target
// thread. Ownership is passed and |functor| is destroyed on the target thread.
// Requirements of FunctorT:
// - FunctorT is movable.
// - FunctorT implements "T operator()()" or "T operator()() const" for some T
// (if T is not void, the return value is discarded on the target thread).
// - FunctorT has a public destructor that can be invoked from the target
// thread after operation() has been invoked.
// - The functor must not cause the thread to quit before
// PostMessageWithFunctor() is done.
template <class FunctorT>
void PostMessageWithFunctor(const Location& posted_from,
Thread* thread,
FunctorT&& functor) {
thread->Post(
posted_from,
new post_message_with_functor_internal::SingleMessageHandlerWithFunctor<
FunctorT>(std::forward<FunctorT>(functor)));
// This DCHECK guarantees that the post was successful.
// Post() doesn't say whether it succeeded, but it will only fail if the
// thread is quitting. DCHECKing that the thread is not quitting *after*
// posting might yield some false positives (where the thread did in fact
// quit, but only after posting), but if we have false positives here then we
// have a race condition anyway.
RTC_DCHECK(!thread->IsQuitting());
}
} // namespace rtc
#endif // RTC_BASE_POST_MESSAGE_WITH_FUNCTOR_H_

View File

@ -1,229 +0,0 @@
/*
* Copyright 2019 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/post_message_with_functor.h"
#include <memory>
#include "rtc_base/bind.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
namespace rtc {
namespace {
void ThreadIsCurrent(Thread* thread, bool* result, Event* event) {
*result = thread->IsCurrent();
event->Set();
}
void WaitAndSetEvent(Event* wait_event, Event* set_event) {
wait_event->Wait(Event::kForever);
set_event->Set();
}
// A functor that keeps track of the number of copies and moves.
class LifeCycleFunctor {
public:
struct Stats {
size_t copy_count = 0;
size_t move_count = 0;
};
LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {}
LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; }
LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); }
LifeCycleFunctor& operator=(const LifeCycleFunctor& other) {
stats_ = other.stats_;
event_ = other.event_;
++stats_->copy_count;
return *this;
}
LifeCycleFunctor& operator=(LifeCycleFunctor&& other) {
stats_ = other.stats_;
event_ = other.event_;
++stats_->move_count;
return *this;
}
void operator()() { event_->Set(); }
private:
Stats* stats_;
Event* event_;
};
// A functor that verifies the thread it was destroyed on.
class DestructionFunctor {
public:
DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event)
: thread_(thread),
thread_was_current_(thread_was_current),
event_(event) {}
~DestructionFunctor() {
// Only signal the event if this was the functor that was invoked to avoid
// the event being signaled due to the destruction of temporary/moved
// versions of this object.
if (was_invoked_) {
*thread_was_current_ = thread_->IsCurrent();
event_->Set();
}
}
void operator()() { was_invoked_ = true; }
private:
Thread* thread_;
bool* thread_was_current_;
Event* event_;
bool was_invoked_ = false;
};
} // namespace
TEST(PostMessageWithFunctorTest, InvokesWithBind) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
Bind(&Event::Set, &event));
event.Wait(Event::kForever);
}
TEST(PostMessageWithFunctorTest, InvokesWithLambda) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
[&event] { event.Set(); });
event.Wait(Event::kForever);
}
TEST(PostMessageWithFunctorTest, InvokesWithCopiedFunctor) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
LifeCycleFunctor::Stats stats;
Event event;
LifeCycleFunctor functor(&stats, &event);
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor);
event.Wait(Event::kForever);
EXPECT_EQ(1u, stats.copy_count);
EXPECT_EQ(0u, stats.move_count);
}
TEST(PostMessageWithFunctorTest, InvokesWithMovedFunctor) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
LifeCycleFunctor::Stats stats;
Event event;
LifeCycleFunctor functor(&stats, &event);
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
std::move(functor));
event.Wait(Event::kForever);
EXPECT_EQ(0u, stats.copy_count);
EXPECT_EQ(1u, stats.move_count);
}
TEST(PostMessageWithFunctorTest,
InvokesWithCopiedFunctorDestroyedOnTargetThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
DestructionFunctor functor(background_thread.get(),
&was_invoked_on_background_thread, &event);
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(), functor);
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(PostMessageWithFunctorTest,
InvokesWithMovedFunctorDestroyedOnTargetThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
DestructionFunctor functor(background_thread.get(),
&was_invoked_on_background_thread, &event);
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
std::move(functor));
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(PostMessageWithFunctorTest, InvokesOnBackgroundThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
Bind(&ThreadIsCurrent, background_thread.get(),
&was_invoked_on_background_thread, &event));
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(PostMessageWithFunctorTest, InvokesAsynchronously) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
// The first event ensures that SendSingleMessage() is not blocking this
// thread. The second event ensures that the message is processed.
Event event_set_by_test_thread;
Event event_set_by_background_thread;
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
Bind(&WaitAndSetEvent, &event_set_by_test_thread,
&event_set_by_background_thread));
event_set_by_test_thread.Set();
event_set_by_background_thread.Wait(Event::kForever);
}
TEST(PostMessageWithFunctorTest, InvokesInPostedOrder) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event first;
Event second;
Event third;
Event fourth;
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
Bind(&WaitAndSetEvent, &first, &second));
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
Bind(&WaitAndSetEvent, &second, &third));
PostMessageWithFunctor(RTC_FROM_HERE, background_thread.get(),
Bind(&WaitAndSetEvent, &third, &fourth));
// All tasks have been posted before the first one is unblocked.
first.Set();
// Only if the chain is invoked in posted order will the last event be set.
fourth.Wait(Event::kForever);
}
} // namespace rtc

View File

@ -36,6 +36,29 @@ namespace rtc {
class Thread;
namespace rtc_thread_internal {
template <class FunctorT>
class SingleMessageHandlerWithFunctor final : public MessageHandler {
public:
explicit SingleMessageHandlerWithFunctor(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
void OnMessage(Message* msg) override {
functor_();
delete this;
}
private:
~SingleMessageHandlerWithFunctor() override {}
typename std::remove_reference<FunctorT>::type functor_;
RTC_DISALLOW_COPY_AND_ASSIGN(SingleMessageHandlerWithFunctor);
};
} // namespace rtc_thread_internal
class ThreadManager {
public:
static const int kForever = -1;
@ -183,6 +206,8 @@ class RTC_LOCKABLE Thread : public MessageQueue {
// &MyFunctionReturningBool);
// NOTE: This function can only be called when synchronous calls are allowed.
// See ScopedDisallowBlockingCalls for details.
// NOTE: Blocking invokes are DISCOURAGED, consider if what you're doing can
// be achieved with PostTask() and callbacks instead.
template <class ReturnT, class FunctorT>
ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
FunctorMessageHandler<ReturnT, FunctorT> handler(
@ -191,6 +216,44 @@ class RTC_LOCKABLE Thread : public MessageQueue {
return handler.MoveResult();
}
// Posts a task to invoke the functor on |this| thread asynchronously, i.e.
// without blocking the thread that invoked PostTask(). Ownership of |functor|
// is passed and destroyed on |this| thread after it is invoked.
// Requirements of FunctorT:
// - FunctorT is movable.
// - FunctorT implements "T operator()()" or "T operator()() const" for some T
// (if T is not void, the return value is discarded on |this| thread).
// - FunctorT has a public destructor that can be invoked from |this| thread
// after operation() has been invoked.
// - The functor must not cause the thread to quit before PostTask() is done.
//
// Example - Calling a class method:
// class Foo {
// public:
// void DoTheThing();
// };
// Foo foo;
// thread->PostTask(RTC_FROM_HERE, Bind(&Foo::DoTheThing, &foo));
//
// Example - Calling a lambda function:
// thread->PostTask(RTC_FROM_HERE,
// [&x, &y] { x.TrackComputations(y.Compute()); });
template <class FunctorT>
void PostTask(const Location& posted_from, FunctorT&& functor) {
Post(posted_from,
new rtc_thread_internal::SingleMessageHandlerWithFunctor<FunctorT>(
std::forward<FunctorT>(functor)));
// This DCHECK guarantees that the post was successful.
// Post() doesn't say whether it succeeded, but it will only fail if the
// thread is quitting. DCHECKing that the thread is not quitting *after*
// posting might yield some false positives (where the thread did in fact
// quit, but only after posting), but if we have false positives here then
// we have a race condition anyway.
// TODO(https://crbug.com/webrtc/10364): When Post() returns a bool we can
// DCHECK the result instead of inferring success from IsQuitting().
RTC_DCHECK(!IsQuitting());
}
// From MessageQueue
bool IsProcessingMessagesForTesting() override;
void Clear(MessageHandler* phandler,

View File

@ -671,5 +671,230 @@ TEST_F(GuardedAsyncInvokeTest, FlushWithIds) {
EXPECT_TRUE(flag2.get());
}
void ThreadIsCurrent(Thread* thread, bool* result, Event* event) {
*result = thread->IsCurrent();
event->Set();
}
void WaitAndSetEvent(Event* wait_event, Event* set_event) {
wait_event->Wait(Event::kForever);
set_event->Set();
}
// A functor that keeps track of the number of copies and moves.
class LifeCycleFunctor {
public:
struct Stats {
size_t copy_count = 0;
size_t move_count = 0;
};
LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {}
LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; }
LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); }
LifeCycleFunctor& operator=(const LifeCycleFunctor& other) {
stats_ = other.stats_;
event_ = other.event_;
++stats_->copy_count;
return *this;
}
LifeCycleFunctor& operator=(LifeCycleFunctor&& other) {
stats_ = other.stats_;
event_ = other.event_;
++stats_->move_count;
return *this;
}
void operator()() { event_->Set(); }
private:
Stats* stats_;
Event* event_;
};
// A functor that verifies the thread it was destroyed on.
class DestructionFunctor {
public:
DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event)
: thread_(thread),
thread_was_current_(thread_was_current),
event_(event) {}
~DestructionFunctor() {
// Only signal the event if this was the functor that was invoked to avoid
// the event being signaled due to the destruction of temporary/moved
// versions of this object.
if (was_invoked_) {
*thread_was_current_ = thread_->IsCurrent();
event_->Set();
}
}
void operator()() { was_invoked_ = true; }
private:
Thread* thread_;
bool* thread_was_current_;
Event* event_;
bool was_invoked_ = false;
};
TEST(ThreadPostTaskTest, InvokesWithBind) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
background_thread->PostTask(RTC_FROM_HERE, Bind(&Event::Set, &event));
event.Wait(Event::kForever);
}
TEST(ThreadPostTaskTest, InvokesWithLambda) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
background_thread->PostTask(RTC_FROM_HERE, [&event] { event.Set(); });
event.Wait(Event::kForever);
}
TEST(ThreadPostTaskTest, InvokesWithCopiedFunctor) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
LifeCycleFunctor::Stats stats;
Event event;
LifeCycleFunctor functor(&stats, &event);
background_thread->PostTask(RTC_FROM_HERE, functor);
event.Wait(Event::kForever);
EXPECT_EQ(1u, stats.copy_count);
EXPECT_EQ(0u, stats.move_count);
}
TEST(ThreadPostTaskTest, InvokesWithMovedFunctor) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
LifeCycleFunctor::Stats stats;
Event event;
LifeCycleFunctor functor(&stats, &event);
background_thread->PostTask(RTC_FROM_HERE, std::move(functor));
event.Wait(Event::kForever);
EXPECT_EQ(0u, stats.copy_count);
EXPECT_EQ(1u, stats.move_count);
}
TEST(ThreadPostTaskTest, InvokesWithReferencedFunctorShouldCopy) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
LifeCycleFunctor::Stats stats;
Event event;
LifeCycleFunctor functor(&stats, &event);
LifeCycleFunctor& functor_ref = functor;
background_thread->PostTask(RTC_FROM_HERE, functor_ref);
event.Wait(Event::kForever);
EXPECT_EQ(1u, stats.copy_count);
EXPECT_EQ(0u, stats.move_count);
}
TEST(ThreadPostTaskTest, InvokesWithCopiedFunctorDestroyedOnTargetThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
DestructionFunctor functor(background_thread.get(),
&was_invoked_on_background_thread, &event);
background_thread->PostTask(RTC_FROM_HERE, functor);
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(ThreadPostTaskTest, InvokesWithMovedFunctorDestroyedOnTargetThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
DestructionFunctor functor(background_thread.get(),
&was_invoked_on_background_thread, &event);
background_thread->PostTask(RTC_FROM_HERE, std::move(functor));
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(ThreadPostTaskTest,
InvokesWithReferencedFunctorShouldCopyAndDestroyedOnTargetThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
DestructionFunctor functor(background_thread.get(),
&was_invoked_on_background_thread, &event);
DestructionFunctor& functor_ref = functor;
background_thread->PostTask(RTC_FROM_HERE, functor_ref);
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(ThreadPostTaskTest, InvokesOnBackgroundThread) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event event;
bool was_invoked_on_background_thread = false;
background_thread->PostTask(RTC_FROM_HERE,
Bind(&ThreadIsCurrent, background_thread.get(),
&was_invoked_on_background_thread, &event));
event.Wait(Event::kForever);
EXPECT_TRUE(was_invoked_on_background_thread);
}
TEST(ThreadPostTaskTest, InvokesAsynchronously) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
// The first event ensures that SendSingleMessage() is not blocking this
// thread. The second event ensures that the message is processed.
Event event_set_by_test_thread;
Event event_set_by_background_thread;
background_thread->PostTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &event_set_by_test_thread,
&event_set_by_background_thread));
event_set_by_test_thread.Set();
event_set_by_background_thread.Wait(Event::kForever);
}
TEST(ThreadPostTaskTest, InvokesInPostedOrder) {
std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
background_thread->Start();
Event first;
Event second;
Event third;
Event fourth;
background_thread->PostTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &first, &second));
background_thread->PostTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &second, &third));
background_thread->PostTask(RTC_FROM_HERE,
Bind(&WaitAndSetEvent, &third, &fourth));
// All tasks have been posted before the first one is unblocked.
first.Set();
// Only if the chain is invoked in posted order will the last event be set.
fourth.Wait(Event::kForever);
}
} // namespace
} // namespace rtc