Add rtc::Thread invoke policy.
Policy will allow explicitly specify thread between which invokes are allowed, or explicitly forbid any invokes. Change-Id: I360e7cba3ce1c21abd5047c6f175d8c4e0e99c6f Bug: webrtc:11728 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177526 Reviewed-by: Tommi <tommi@webrtc.org> Commit-Queue: Artem Titov <titovartem@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31604}
This commit is contained in:
@ -10,6 +10,7 @@
|
||||
|
||||
#include "pc/peer_connection_factory.h"
|
||||
|
||||
#include <cstdio>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
@ -107,6 +108,10 @@ PeerConnectionFactory::PeerConnectionFactory(
|
||||
wraps_current_thread_ = true;
|
||||
}
|
||||
}
|
||||
signaling_thread_->AllowInvokesToThread(worker_thread_);
|
||||
signaling_thread_->AllowInvokesToThread(network_thread_);
|
||||
worker_thread_->AllowInvokesToThread(network_thread_);
|
||||
network_thread_->DisallowAnyInvoke();
|
||||
}
|
||||
|
||||
PeerConnectionFactory::~PeerConnectionFactory() {
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "rtc_base/critical_section.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/null_socket_server.h"
|
||||
#include "rtc_base/synchronization/sequence_checker.h"
|
||||
#include "rtc_base/task_utils/to_queued_task.h"
|
||||
#include "rtc_base/time_utils.h"
|
||||
#include "rtc_base/trace_event.h"
|
||||
@ -892,6 +893,7 @@ void Thread::Send(const Location& posted_from,
|
||||
AutoThread thread;
|
||||
Thread* current_thread = Thread::Current();
|
||||
RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this
|
||||
RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
|
||||
#if RTC_DCHECK_IS_ON
|
||||
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
|
||||
this);
|
||||
@ -974,6 +976,50 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
|
||||
task.release();
|
||||
}
|
||||
|
||||
void Thread::AllowInvokesToThread(Thread* thread) {
|
||||
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
|
||||
if (!IsCurrent()) {
|
||||
PostTask(webrtc::ToQueuedTask(
|
||||
[thread, this]() { AllowInvokesToThread(thread); }));
|
||||
return;
|
||||
}
|
||||
RTC_DCHECK_RUN_ON(this);
|
||||
allowed_threads_.push_back(thread);
|
||||
invoke_policy_enabled_ = true;
|
||||
#endif
|
||||
}
|
||||
|
||||
void Thread::DisallowAnyInvoke() {
|
||||
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
|
||||
if (!IsCurrent()) {
|
||||
PostTask(webrtc::ToQueuedTask([this]() { DisallowAnyInvoke(); }));
|
||||
return;
|
||||
}
|
||||
RTC_DCHECK_RUN_ON(this);
|
||||
allowed_threads_.clear();
|
||||
invoke_policy_enabled_ = true;
|
||||
#endif
|
||||
}
|
||||
|
||||
// Returns true if no policies added or if there is at least one policy
|
||||
// that permits invocation to |target| thread.
|
||||
bool Thread::IsInvokeToThreadAllowed(rtc::Thread* target) {
|
||||
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
|
||||
RTC_DCHECK_RUN_ON(this);
|
||||
if (!invoke_policy_enabled_) {
|
||||
return true;
|
||||
}
|
||||
for (const auto* thread : allowed_threads_) {
|
||||
if (thread == target) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
#else
|
||||
return true;
|
||||
#endif
|
||||
}
|
||||
|
||||
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
|
||||
// Though Post takes MessageData by raw pointer (last parameter), it still
|
||||
// takes it with ownership.
|
||||
|
@ -338,6 +338,18 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
||||
InvokeInternal(posted_from, functor);
|
||||
}
|
||||
|
||||
// Allows invoke to specified |thread|. Thread never will be dereferenced and
|
||||
// will be used only for reference-based comparison, so instance can be safely
|
||||
// deleted. If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing.
|
||||
void AllowInvokesToThread(Thread* thread);
|
||||
// If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined do nothing.
|
||||
void DisallowAnyInvoke();
|
||||
// Returns true if |target| was allowed by AllowInvokesToThread() or if no
|
||||
// calls were made to AllowInvokesToThread and DisallowAnyInvoke. Otherwise
|
||||
// returns false.
|
||||
// If NDEBUG is defined and DCHECK_ALWAYS_ON is undefined always returns true.
|
||||
bool IsInvokeToThreadAllowed(rtc::Thread* target);
|
||||
|
||||
// Posts a task to invoke the functor on |this| thread asynchronously, i.e.
|
||||
// without blocking the thread that invoked PostTask(). Ownership of |functor|
|
||||
// is passed and (usually, see below) destroyed on |this| thread after it is
|
||||
@ -566,6 +578,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
|
||||
MessageList messages_ RTC_GUARDED_BY(crit_);
|
||||
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
|
||||
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
|
||||
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
|
||||
std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this);
|
||||
bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false;
|
||||
#endif
|
||||
CriticalSection crit_;
|
||||
bool fInitialized_;
|
||||
bool fDestroyed_;
|
||||
|
@ -288,6 +288,63 @@ TEST(ThreadTest, Wrap) {
|
||||
ThreadManager::Instance()->SetCurrentThread(current_thread);
|
||||
}
|
||||
|
||||
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
|
||||
TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) {
|
||||
// Create and start the thread.
|
||||
auto thread1 = Thread::CreateWithSocketServer();
|
||||
auto thread2 = Thread::CreateWithSocketServer();
|
||||
|
||||
thread1->PostTask(ToQueuedTask(
|
||||
[&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
|
||||
Thread* th_main = Thread::Current();
|
||||
th_main->ProcessMessages(100);
|
||||
}
|
||||
|
||||
TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) {
|
||||
// Create and start the thread.
|
||||
auto thread1 = Thread::CreateWithSocketServer();
|
||||
auto thread2 = Thread::CreateWithSocketServer();
|
||||
auto thread3 = Thread::CreateWithSocketServer();
|
||||
auto thread4 = Thread::CreateWithSocketServer();
|
||||
|
||||
thread1->AllowInvokesToThread(thread2.get());
|
||||
thread1->AllowInvokesToThread(thread3.get());
|
||||
|
||||
thread1->PostTask(ToQueuedTask([&]() {
|
||||
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
|
||||
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
|
||||
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
|
||||
}));
|
||||
Thread* th_main = Thread::Current();
|
||||
th_main->ProcessMessages(100);
|
||||
}
|
||||
|
||||
TEST(ThreadTest, InvokesDisallowedWhenDisallowAnyInvoke) {
|
||||
// Create and start the thread.
|
||||
auto thread1 = Thread::CreateWithSocketServer();
|
||||
auto thread2 = Thread::CreateWithSocketServer();
|
||||
|
||||
thread1->DisallowAnyInvoke();
|
||||
|
||||
thread1->PostTask(ToQueuedTask([&]() {
|
||||
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
|
||||
}));
|
||||
Thread* th_main = Thread::Current();
|
||||
th_main->ProcessMessages(100);
|
||||
}
|
||||
#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
|
||||
|
||||
TEST(ThreadTest, InvokesAllowedByDefault) {
|
||||
// Create and start the thread.
|
||||
auto thread1 = Thread::CreateWithSocketServer();
|
||||
auto thread2 = Thread::CreateWithSocketServer();
|
||||
|
||||
thread1->PostTask(ToQueuedTask(
|
||||
[&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
|
||||
Thread* th_main = Thread::Current();
|
||||
th_main->ProcessMessages(100);
|
||||
}
|
||||
|
||||
TEST(ThreadTest, Invoke) {
|
||||
// Create and start the thread.
|
||||
auto thread = Thread::CreateWithSocketServer();
|
||||
|
Reference in New Issue
Block a user