diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 6dda76258a..0d1003ea78 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -14,6 +14,8 @@ #include #elif defined(WEBRTC_POSIX) #include +#else +#error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined." #endif #include "rtc_base/checks.h" @@ -54,8 +56,7 @@ Thread* Thread::Current() { #if defined(WEBRTC_POSIX) #if !defined(WEBRTC_MAC) -ThreadManager::ThreadManager() { - main_thread_ref_ = CurrentThreadRef(); +ThreadManager::ThreadManager() : main_thread_ref_(CurrentThreadRef()) { pthread_key_create(&key_, nullptr); } #endif @@ -65,14 +66,14 @@ Thread *ThreadManager::CurrentThread() { } void ThreadManager::SetCurrentThread(Thread *thread) { + RTC_DCHECK(!CurrentThread() || !thread); pthread_setspecific(key_, thread); } #endif #if defined(WEBRTC_WIN) -ThreadManager::ThreadManager() { - main_thread_ref_ = CurrentThreadRef(); - key_ = TlsAlloc(); +ThreadManager::ThreadManager() + : key_(TlsAlloc()), main_thread_ref_(CurrentThreadRef()) { } Thread *ThreadManager::CurrentThread() { @@ -80,6 +81,7 @@ Thread *ThreadManager::CurrentThread() { } void ThreadManager::SetCurrentThread(Thread *thread) { + RTC_DCHECK(!CurrentThread() || !thread); TlsSetValue(key_, thread); } #endif @@ -118,28 +120,13 @@ Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { // DEPRECATED. Thread::Thread() : Thread(SocketServer::CreateDefault()) {} -Thread::Thread(SocketServer* ss) - : MessageQueue(ss, false), - running_(true, false), -#if defined(WEBRTC_WIN) - thread_(nullptr), - thread_id_(0), -#endif - owned_(true), - blocking_calls_allowed_(true) { +Thread::Thread(SocketServer* ss) : MessageQueue(ss, false) { SetName("Thread", this); // default name DoInit(); } Thread::Thread(std::unique_ptr ss) - : MessageQueue(std::move(ss), false), - running_(true, false), -#if defined(WEBRTC_WIN) - thread_(nullptr), - thread_id_(0), -#endif - owned_(true), - blocking_calls_allowed_(true) { + : MessageQueue(std::move(ss), false) { SetName("Thread", this); // default name DoInit(); } @@ -184,7 +171,8 @@ bool Thread::SleepMs(int milliseconds) { } bool Thread::SetName(const std::string& name, const void* obj) { - if (running()) return false; + RTC_DCHECK(!IsRunning()); + name_ = name; if (obj) { char buf[16]; @@ -195,10 +183,11 @@ bool Thread::SetName(const std::string& name, const void* obj) { } bool Thread::Start(Runnable* runnable) { - RTC_DCHECK(owned_); - if (!owned_) return false; - RTC_DCHECK(!running()); - if (running()) return false; + RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK(!IsRunning()); + + if (IsRunning()) + return false; Restart(); // reset IsQuitting() if the thread is being restarted @@ -206,14 +195,14 @@ bool Thread::Start(Runnable* runnable) { // we start a new thread. ThreadManager::Instance(); + owned_ = true; + ThreadInit* init = new ThreadInit; init->thread = this; init->runnable = runnable; #if defined(WEBRTC_WIN) thread_ = CreateThread(nullptr, 0, PreRun, init, 0, &thread_id_); - if (thread_) { - running_.Set(); - } else { + if (!thread_) { return false; } #elif defined(WEBRTC_POSIX) @@ -223,9 +212,10 @@ bool Thread::Start(Runnable* runnable) { int error_code = pthread_create(&thread_, &attr, PreRun, init); if (0 != error_code) { RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; + thread_ = 0; return false; } - running_.Set(); + RTC_DCHECK(thread_); #endif return true; } @@ -235,6 +225,7 @@ bool Thread::WrapCurrent() { } void Thread::UnwrapCurrent() { + RTC_DCHECK_RUN_ON(&thread_checker_); // Clears the platform-specific thread-specific storage. ThreadManager::Instance()->SetCurrentThread(nullptr); #if defined(WEBRTC_WIN) @@ -244,9 +235,12 @@ void Thread::UnwrapCurrent() { << "When unwrapping thread, failed to close handle."; } thread_ = nullptr; + thread_id_ = 0; } +#elif defined(WEBRTC_POSIX) + thread_ = 0; #endif - running_.Reset(); + thread_checker_.DetachFromThread(); } void Thread::SafeWrapCurrent() { @@ -254,25 +248,27 @@ void Thread::SafeWrapCurrent() { } void Thread::Join() { - if (running()) { - RTC_DCHECK(!IsCurrent()); - if (Current() && !Current()->blocking_calls_allowed_) { - RTC_LOG(LS_WARNING) << "Waiting for the thread to join, " - << "but blocking calls have been disallowed"; - } + RTC_DCHECK_RUN_ON(&thread_checker_); + if (!IsRunning()) + return; + + RTC_DCHECK(!IsCurrent()); + if (Current() && !Current()->blocking_calls_allowed_) { + RTC_LOG(LS_WARNING) << "Waiting for the thread to join, " + << "but blocking calls have been disallowed"; + } #if defined(WEBRTC_WIN) - RTC_DCHECK(thread_ != nullptr); - WaitForSingleObject(thread_, INFINITE); - CloseHandle(thread_); - thread_ = nullptr; - thread_id_ = 0; + RTC_DCHECK(thread_ != nullptr); + WaitForSingleObject(thread_, INFINITE); + CloseHandle(thread_); + thread_ = nullptr; + thread_id_ = 0; #elif defined(WEBRTC_POSIX) - void *pv; - pthread_join(thread_, &pv); + pthread_join(thread_, nullptr); + thread_ = 0; #endif - running_.Reset(); - } + thread_checker_.DetachFromThread(); } bool Thread::SetAllowBlockingCalls(bool allow) { @@ -305,6 +301,7 @@ void* Thread::PreRun(void* pv) { } else { init->thread->Run(); } + ThreadManager::Instance()->SetCurrentThread(nullptr); delete init; #ifdef WEBRTC_WIN return 0; @@ -319,12 +316,16 @@ void Thread::Run() { } bool Thread::IsOwned() { + RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK(IsRunning()); return owned_; } void Thread::Stop() { + RTC_DCHECK_RUN_ON(&thread_checker_); MessageQueue::Quit(); Join(); + thread_checker_.DetachFromThread(); } void Thread::Send(const Location& posted_from, @@ -498,8 +499,8 @@ bool Thread::ProcessMessages(int cmsLoop) { bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, bool need_synchronize_access) { - if (running()) - return false; + RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK(!IsRunning()); #if defined(WEBRTC_WIN) if (need_synchronize_access) { @@ -515,13 +516,20 @@ bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, #elif defined(WEBRTC_POSIX) thread_ = pthread_self(); #endif - owned_ = false; - running_.Set(); thread_manager->SetCurrentThread(this); return true; } +bool Thread::IsRunning() { + RTC_DCHECK_RUN_ON(&thread_checker_); +#if defined(WEBRTC_WIN) + return thread_ != nullptr; +#elif defined(WEBRTC_POSIX) + return thread_ != 0; +#endif +} + AutoThread::AutoThread() : Thread(SocketServer::CreateDefault()) { if (!ThreadManager::Instance()->CurrentThread()) { ThreadManager::Instance()->SetCurrentThread(this); @@ -539,6 +547,9 @@ AutoThread::~AutoThread() { AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) : Thread(ss) { old_thread_ = ThreadManager::Instance()->CurrentThread(); + // Temporarily set the current thread to nullptr so that we can keep checks + // around that catch unintentional pointer overwrites. + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); rtc::ThreadManager::Instance()->SetCurrentThread(this); if (old_thread_) { MessageQueueManager::Remove(old_thread_); @@ -558,6 +569,7 @@ AutoSocketServerThread::~AutoSocketServerThread() { // its contents rely on this thread still being set as the current thread. Stop(); DoDestroy(); + rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_); if (old_thread_) { MessageQueueManager::Add(old_thread_); diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 3e22c63f2d..d198ba83b9 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -22,9 +22,9 @@ #include #endif #include "rtc_base/constructormagic.h" -#include "rtc_base/event.h" #include "rtc_base/messagequeue.h" #include "rtc_base/platform_thread_types.h" +#include "rtc_base/thread_checker.h" #if defined(WEBRTC_WIN) #include "rtc_base/win32.h" @@ -71,11 +71,11 @@ class ThreadManager { #endif #if defined(WEBRTC_WIN) - DWORD key_; + const DWORD key_; #endif // The thread to potentially autowrap. - PlatformThreadRef main_thread_ref_; + const PlatformThreadRef main_thread_ref_; RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager); }; @@ -202,13 +202,13 @@ class RTC_LOCKABLE Thread : public MessageQueue { // You cannot call Start on non-owned threads. bool IsOwned(); - // Expose private method running() for tests. + // Expose private method IsRunning() for tests. // // DANGER: this is a terrible public API. Most callers that might want to // call this likely do not have enough control/knowledge of the Thread in // question to guarantee that the returned value remains true for the duration // of whatever code is conditionally executing because of the return value! - bool RunningForTest() { return running(); } + bool RunningForTest() { return IsRunning(); } // Sets the per-thread allow-blocking-calls flag and returns the previous // value. Must be called on this thread. @@ -256,8 +256,8 @@ class RTC_LOCKABLE Thread : public MessageQueue { bool WrapCurrentWithThreadManager(ThreadManager* thread_manager, bool need_synchronize_access); - // Return true if the thread was started and hasn't yet stopped. - bool running() { return running_.Wait(0); } + // Return true if the thread is currently running. + bool IsRunning(); // Processes received "Send" requests. If |source| is not null, only requests // from |source| are processed, otherwise, all requests are processed. @@ -273,19 +273,31 @@ class RTC_LOCKABLE Thread : public MessageQueue { std::list<_SendMessage> sendlist_; std::string name_; - Event running_; // Signalled means running. + + // Used to check access to unguarded thread control state variables and ensure + // that control functions are called on the right thread. + // The |thread_checker_| might represent a 'parent' thread from which + // Start()/Stop() are called or it could represent the worker thread itself in + // case the thread is wrapped since the functions that modified the control + // state will in this case be called from that thread. + ThreadChecker thread_checker_; #if defined(WEBRTC_POSIX) - pthread_t thread_; + pthread_t thread_ RTC_ACCESS_ON(thread_checker_) = 0; #endif #if defined(WEBRTC_WIN) - HANDLE thread_; - DWORD thread_id_; + HANDLE thread_ RTC_ACCESS_ON(thread_checker_) = nullptr; + DWORD thread_id_ RTC_ACCESS_ON(thread_checker_) = 0; #endif - bool owned_; - bool blocking_calls_allowed_; // By default set to |true|. + // Indicates whether or not ownership of the worker thread lies with + // this instance or not. (i.e. owned_ == !wrapped). + // Must only be modified when the worker thread is not running. + bool owned_ = true; + + // Only touched from the worker thread itself. + bool blocking_calls_allowed_ = true; friend class ThreadManager; diff --git a/rtc_base/thread_darwin.mm b/rtc_base/thread_darwin.mm index ab42d6e4e1..a404849c72 100644 --- a/rtc_base/thread_darwin.mm +++ b/rtc_base/thread_darwin.mm @@ -38,8 +38,7 @@ void InitCocoaMultiThreading() { namespace rtc { -ThreadManager::ThreadManager() { - main_thread_ref_ = CurrentThreadRef(); +ThreadManager::ThreadManager() : main_thread_ref_(CurrentThreadRef()) { pthread_key_create(&key_, nullptr); // This is necessary to alert the cocoa runtime of the fact that // we are running in a multithreaded environment. @@ -58,6 +57,7 @@ void* Thread::PreRun(void* pv) { init->thread->Run(); } } + ThreadManager::Instance()->SetCurrentThread(nullptr); delete init; return nullptr; }