From 26d4f9cd398fa4d98408daccd54246c9af59a624 Mon Sep 17 00:00:00 2001 From: Artem Titov Date: Mon, 29 Jun 2020 17:37:32 +0200 Subject: [PATCH] Add rtc::Thread invoke policy. Policy will allow explicitly specify thread between which invokes are allowed, or explicitly forbid any invokes. Change-Id: I360e7cba3ce1c21abd5047c6f175d8c4e0e99c6f Bug: webrtc:11728 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177526 Reviewed-by: Tommi Commit-Queue: Artem Titov Cr-Commit-Position: refs/heads/master@{#31604} --- pc/peer_connection_factory.cc | 5 +++ rtc_base/thread.cc | 46 ++++++++++++++++++++++++++++ rtc_base/thread.h | 16 ++++++++++ rtc_base/thread_unittest.cc | 57 +++++++++++++++++++++++++++++++++++ 4 files changed, 124 insertions(+) diff --git a/pc/peer_connection_factory.cc b/pc/peer_connection_factory.cc index 2ff8dee4d0..0a6d75dde5 100644 --- a/pc/peer_connection_factory.cc +++ b/pc/peer_connection_factory.cc @@ -10,6 +10,7 @@ #include "pc/peer_connection_factory.h" +#include #include #include #include @@ -107,6 +108,10 @@ PeerConnectionFactory::PeerConnectionFactory( wraps_current_thread_ = true; } } + signaling_thread_->AllowInvokesToThread(worker_thread_); + signaling_thread_->AllowInvokesToThread(network_thread_); + worker_thread_->AllowInvokesToThread(network_thread_); + network_thread_->DisallowAnyInvoke(); } PeerConnectionFactory::~PeerConnectionFactory() { diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index f8e299afd9..6228b2cfa2 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -34,6 +34,7 @@ #include "rtc_base/critical_section.h" #include "rtc_base/logging.h" #include "rtc_base/null_socket_server.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" @@ -892,6 +893,7 @@ void Thread::Send(const Location& posted_from, AutoThread thread; Thread* current_thread = Thread::Current(); RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this + RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); #if RTC_DCHECK_IS_ON ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, this); @@ -974,6 +976,50 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) { task.release(); } +void Thread::AllowInvokesToThread(Thread* thread) { +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + if (!IsCurrent()) { + PostTask(webrtc::ToQueuedTask( + [thread, this]() { AllowInvokesToThread(thread); })); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.push_back(thread); + invoke_policy_enabled_ = true; +#endif +} + +void Thread::DisallowAnyInvoke() { +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + if (!IsCurrent()) { + PostTask(webrtc::ToQueuedTask([this]() { DisallowAnyInvoke(); })); + return; + } + RTC_DCHECK_RUN_ON(this); + allowed_threads_.clear(); + invoke_policy_enabled_ = true; +#endif +} + +// Returns true if no policies added or if there is at least one policy +// that permits invocation to |target| thread. +bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) { +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + RTC_DCHECK_RUN_ON(this); + if (!invoke_policy_enabled_) { + return true; + } + for (const auto* thread : allowed_threads_) { + if (thread == target) { + return true; + } + } + return false; +#else + return true; +#endif +} + void Thread::PostTask(std::unique_ptr task) { // Though Post takes MessageData by raw pointer (last parameter), it still // takes it with ownership. diff --git a/rtc_base/thread.h b/rtc_base/thread.h index e25ed4ea8c..fbfa7e2c3c 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -338,6 +338,18 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { InvokeInternal(posted_from, functor); } + // Allows invoke to specified |thread|. Thread never will be dereferenced and + // will be used only for reference-based comparison, so instance can be safely + // deleted. If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing. + void AllowInvokesToThread(Thread* thread); + // If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing. + void DisallowAnyInvoke(); + // Returns true if |target| was allowed by AllowInvokesToThread() or if no + // calls were made to AllowInvokesToThread and DisallowAnyInvoke. Otherwise + // returns false. + // If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined always returns true. + bool IsInvokeToThreadAllowed(rtc::Thread* target); + // Posts a task to invoke the functor on |this| thread asynchronously, i.e. // without blocking the thread that invoked PostTask(). Ownership of |functor| // is passed and (usually, see below) destroyed on |this| thread after it is @@ -566,6 +578,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { MessageList messages_ RTC_GUARDED_BY(crit_); PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + std::vector allowed_threads_ RTC_GUARDED_BY(this); + bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false; +#endif CriticalSection crit_; bool fInitialized_; bool fDestroyed_; diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index e1011f4119..d7914d908d 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -288,6 +288,63 @@ TEST(ThreadTest, Wrap) { ThreadManager::Instance()->SetCurrentThread(current_thread); } +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + + thread1->PostTask(ToQueuedTask( + [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} + +TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + auto thread3 = Thread::CreateWithSocketServer(); + auto thread4 = Thread::CreateWithSocketServer(); + + thread1->AllowInvokesToThread(thread2.get()); + thread1->AllowInvokesToThread(thread3.get()); + + thread1->PostTask(ToQueuedTask([&]() { + EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); + EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get())); + EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get())); + })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} + +TEST(ThreadTest, InvokesDisallowedWhenDisallowAnyInvoke) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + + thread1->DisallowAnyInvoke(); + + thread1->PostTask(ToQueuedTask([&]() { + EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); + })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} +#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) + +TEST(ThreadTest, InvokesAllowedByDefault) { + // Create and start the thread. + auto thread1 = Thread::CreateWithSocketServer(); + auto thread2 = Thread::CreateWithSocketServer(); + + thread1->PostTask(ToQueuedTask( + [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); + Thread* th_main = Thread::Current(); + th_main->ProcessMessages(100); +} + TEST(ThreadTest, Invoke) { // Create and start the thread. auto thread = Thread::CreateWithSocketServer();