Revert of Delete SignalThread class. (patchset #20 id:380001 of https://codereview.webrtc.org/2915253002/ )
Reason for revert:
Seems to be causing new crashes, possibly because of changes to the "Destroy(false)" behavior. Will re-land after investigating these crashes more and addressing the root cause.
Original issue's description:
> Delete SignalThread class.
>
> Rewrite AsyncResolver to use PlatformThread directly, not
> SignalThread, and update includes of peerconnection client to not
> depend on signalthread.h.
>
> BUG=webrtc:6424,webrtc:7723
>
> Review-Url: https://codereview.webrtc.org/2915253002
> Cr-Commit-Position: refs/heads/master@{#18833}
> Committed: bc8feda1db
TBR=tommi@webrtc.org,kwiberg@webrtc.org,nisse@webrtc.org
NOPRESUBMIT=true
NOTRY=true
BUG=webrtc:6424,webrtc:7723
Review-Url: https://codereview.webrtc.org/2979733002
Cr-Commit-Position: refs/heads/master@{#18980}
This commit is contained in:
19
webrtc/base/signalthread.h
Normal file
19
webrtc/base/signalthread.h
Normal file
@ -0,0 +1,19 @@
|
||||
/*
|
||||
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef WEBRTC_BASE_SIGNALTHREAD_H_
|
||||
#define WEBRTC_BASE_SIGNALTHREAD_H_
|
||||
|
||||
|
||||
// This header is deprecated and is just left here temporarily during
|
||||
// refactoring. See https://bugs.webrtc.org/7634 for more details.
|
||||
#include "webrtc/rtc_base/signalthread.h"
|
||||
|
||||
#endif // WEBRTC_BASE_SIGNALTHREAD_H_
|
||||
@ -15,7 +15,6 @@
|
||||
#include "webrtc/rtc_base/logging.h"
|
||||
#include "webrtc/rtc_base/nethelpers.h"
|
||||
#include "webrtc/rtc_base/stringutils.h"
|
||||
#include "webrtc/rtc_base/thread.h"
|
||||
|
||||
#ifdef WIN32
|
||||
#include "webrtc/rtc_base/win32socketserver.h"
|
||||
|
||||
@ -15,9 +15,9 @@
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "webrtc/rtc_base/messagehandler.h"
|
||||
#include "webrtc/rtc_base/nethelpers.h"
|
||||
#include "webrtc/rtc_base/physicalsocketserver.h"
|
||||
#include "webrtc/rtc_base/signalthread.h"
|
||||
#include "webrtc/rtc_base/sigslot.h"
|
||||
|
||||
typedef std::map<int, std::string> Peers;
|
||||
|
||||
@ -404,7 +404,6 @@ rtc_static_library("rtc_base") {
|
||||
libs = []
|
||||
defines = []
|
||||
deps = [
|
||||
":rtc_task_queue",
|
||||
"..:webrtc_common",
|
||||
]
|
||||
public_deps = [
|
||||
@ -480,6 +479,8 @@ rtc_static_library("rtc_base") {
|
||||
"rtccertificate.h",
|
||||
"rtccertificategenerator.cc",
|
||||
"rtccertificategenerator.h",
|
||||
"signalthread.cc",
|
||||
"signalthread.h",
|
||||
"sigslot.cc",
|
||||
"sigslot.h",
|
||||
"socket.h",
|
||||
@ -988,6 +989,7 @@ if (rtc_include_tests) {
|
||||
"rtccertificate_unittest.cc",
|
||||
"rtccertificategenerator_unittest.cc",
|
||||
"sha1digest_unittest.cc",
|
||||
"signalthread_unittest.cc",
|
||||
"sigslot_unittest.cc",
|
||||
"sigslottester_unittest.cc",
|
||||
"stream_unittest.cc",
|
||||
|
||||
@ -43,16 +43,6 @@ class SCOPED_LOCKABLE MarkProcessingCritScope {
|
||||
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
|
||||
};
|
||||
|
||||
class FunctorPostMessageHandler : public MessageHandler {
|
||||
public:
|
||||
void OnMessage(Message* msg) override {
|
||||
RunnableData* data = static_cast<RunnableData*>(msg->pdata);
|
||||
data->Run();
|
||||
delete data;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
//------------------------------------------------------------------
|
||||
@ -546,12 +536,4 @@ void MessageQueue::Dispatch(Message *pmsg) {
|
||||
}
|
||||
}
|
||||
|
||||
void MessageQueue::PostFunctorInternal(const Location& posted_from,
|
||||
RunnableData* message_data) {
|
||||
// Use static to ensure it outlives this scope. Safe since
|
||||
// FunctorPostMessageHandler keeps no state.
|
||||
static FunctorPostMessageHandler handler;
|
||||
Post(posted_from, &handler, 0, message_data);
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
@ -149,23 +149,6 @@ class DisposeData : public MessageData {
|
||||
T* data_;
|
||||
};
|
||||
|
||||
// TODO(nisse): Replace RunnableData and FunctorData by a subclass of Message
|
||||
// owning a QueuedTask.
|
||||
class RunnableData : public MessageData {
|
||||
public:
|
||||
virtual void Run() = 0;
|
||||
};
|
||||
|
||||
template <class FunctorT>
|
||||
class FunctorData : public RunnableData {
|
||||
public:
|
||||
explicit FunctorData(FunctorT functor) : functor_(std::move(functor)) {}
|
||||
void Run() override { functor_(); }
|
||||
|
||||
private:
|
||||
FunctorT functor_;
|
||||
};
|
||||
|
||||
const uint32_t MQID_ANY = static_cast<uint32_t>(-1);
|
||||
const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2);
|
||||
|
||||
@ -255,19 +238,6 @@ class MessageQueue {
|
||||
uint32_t id = 0,
|
||||
MessageData* pdata = nullptr,
|
||||
bool time_sensitive = false);
|
||||
|
||||
// TODO(nisse): Replace with a method for posting a
|
||||
// std::unique_ptr<QueuedTask>, to ease gradual conversion to using TaskQueue.
|
||||
template <class FunctorT,
|
||||
// Additional type check, or else it collides with calls to the
|
||||
// above Post method with the optional arguments omitted.
|
||||
typename std::enable_if<!std::is_pointer<FunctorT>::value>::type* =
|
||||
nullptr>
|
||||
void Post(const Location& posted_from, FunctorT functor) {
|
||||
PostFunctorInternal(posted_from,
|
||||
new FunctorData<FunctorT>(std::move(functor)));
|
||||
}
|
||||
|
||||
virtual void PostDelayed(const Location& posted_from,
|
||||
int cmsDelay,
|
||||
MessageHandler* phandler,
|
||||
@ -344,9 +314,6 @@ class MessageQueue {
|
||||
bool fDestroyed_;
|
||||
|
||||
private:
|
||||
void PostFunctorInternal(const Location& posted_from,
|
||||
RunnableData* message_data);
|
||||
|
||||
volatile int stop_;
|
||||
|
||||
// The SocketServer might not be owned by MessageQueue.
|
||||
|
||||
@ -25,17 +25,13 @@
|
||||
#endif
|
||||
#endif // defined(WEBRTC_POSIX) && !defined(__native_client__)
|
||||
|
||||
#include "webrtc/rtc_base/bind.h"
|
||||
#include "webrtc/rtc_base/byteorder.h"
|
||||
#include "webrtc/rtc_base/checks.h"
|
||||
#include "webrtc/rtc_base/logging.h"
|
||||
#include "webrtc/rtc_base/ptr_util.h"
|
||||
#include "webrtc/rtc_base/task_queue.h"
|
||||
#include "webrtc/rtc_base/thread.h"
|
||||
#include "webrtc/rtc_base/signalthread.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
namespace {
|
||||
int ResolveHostname(const std::string& hostname, int family,
|
||||
std::vector<IPAddress>* addresses) {
|
||||
#ifdef __native_client__
|
||||
@ -85,54 +81,17 @@ int ResolveHostname(const std::string& hostname, int family,
|
||||
return 0;
|
||||
#endif // !__native_client__
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// AsyncResolver
|
||||
AsyncResolver::AsyncResolver() : construction_thread_(Thread::Current()) {
|
||||
RTC_DCHECK(construction_thread_);
|
||||
}
|
||||
AsyncResolver::AsyncResolver()
|
||||
: SignalThread(false /* use_socket_server */), error_(-1) {}
|
||||
|
||||
AsyncResolver::~AsyncResolver() {
|
||||
RTC_DCHECK(construction_thread_->IsCurrent());
|
||||
if (state_)
|
||||
// It's possible that we have a posted message waiting on the MessageQueue
|
||||
// refering to this object. Indirection via the ref-counted state_ object
|
||||
// ensure it doesn't access us after deletion.
|
||||
|
||||
// TODO(nisse): An alternative approach to solve this problem would be to
|
||||
// extend MessageQueue::Clear in some way to let us selectively cancel posts
|
||||
// directed to this object. Then we wouldn't need any ref count, but its a
|
||||
// larger change to the MessageQueue.
|
||||
state_->resolver = nullptr;
|
||||
}
|
||||
AsyncResolver::~AsyncResolver() = default;
|
||||
|
||||
void AsyncResolver::Start(const SocketAddress& addr) {
|
||||
RTC_DCHECK_RUN_ON(construction_thread_);
|
||||
RTC_DCHECK(!resolver_queue_);
|
||||
RTC_DCHECK(!state_);
|
||||
// TODO(nisse): Support injection of task queue at construction?
|
||||
resolver_queue_ = rtc::MakeUnique<TaskQueue>("AsyncResolverQueue");
|
||||
addr_ = addr;
|
||||
state_ = new RefCountedObject<Trampoline>(this);
|
||||
|
||||
// These member variables need to be copied to local variables to make it
|
||||
// possible to capture them, even for capture-by-copy.
|
||||
scoped_refptr<Trampoline> state = state_;
|
||||
rtc::Thread* construction_thread = construction_thread_;
|
||||
resolver_queue_->PostTask([state, addr, construction_thread]() {
|
||||
std::vector<IPAddress> addresses;
|
||||
int error =
|
||||
ResolveHostname(addr.hostname().c_str(), addr.family(), &addresses);
|
||||
// Ensure SignalDone is called on the main thread.
|
||||
// TODO(nisse): Should use move of the address list, but not easy until
|
||||
// C++17. Since this code isn't performance critical, copy should be fine
|
||||
// for now.
|
||||
construction_thread->Post(RTC_FROM_HERE, [state, error, addresses]() {
|
||||
if (!state->resolver)
|
||||
return;
|
||||
state->resolver->ResolveDone(error, std::move(addresses));
|
||||
});
|
||||
});
|
||||
// SignalThred Start will kickoff the resolve process.
|
||||
SignalThread::Start();
|
||||
}
|
||||
|
||||
bool AsyncResolver::GetResolvedAddress(int family, SocketAddress* addr) const {
|
||||
@ -154,41 +113,16 @@ int AsyncResolver::GetError() const {
|
||||
}
|
||||
|
||||
void AsyncResolver::Destroy(bool wait) {
|
||||
RTC_DCHECK_RUN_ON(construction_thread_);
|
||||
RTC_DCHECK(!state_ || state_->resolver);
|
||||
// If we don't wait here, we will nevertheless wait in the destructor.
|
||||
if (wait || !state_) {
|
||||
// Destroy task queue, blocks on any currently running task. If we have a
|
||||
// pending task, it will post a call to attempt to call ResolveDone before
|
||||
// finishing, which we will never handle.
|
||||
delete this;
|
||||
} else {
|
||||
destroyed_ = true;
|
||||
}
|
||||
SignalThread::Destroy(wait);
|
||||
}
|
||||
|
||||
void AsyncResolver::ResolveDone(int error, std::vector<IPAddress> addresses) {
|
||||
RTC_DCHECK_RUN_ON(construction_thread_);
|
||||
error_ = error;
|
||||
addresses_ = std::move(addresses);
|
||||
if (destroyed_) {
|
||||
delete this;
|
||||
return;
|
||||
} else {
|
||||
// Beware that SignalDone may call Destroy.
|
||||
void AsyncResolver::DoWork() {
|
||||
error_ = ResolveHostname(addr_.hostname().c_str(), addr_.family(),
|
||||
&addresses_);
|
||||
}
|
||||
|
||||
// TODO(nisse): Currently allows only Destroy(false) in this case,
|
||||
// and that's what all webrtc code is using. With Destroy(true),
|
||||
// this object would be destructed immediately, and the access
|
||||
// both to |destroyed_| below as well as the sigslot machinery
|
||||
// involved in SignalDone implies invalid use-after-free.
|
||||
SignalDone(this);
|
||||
if (destroyed_) {
|
||||
delete this;
|
||||
return;
|
||||
}
|
||||
}
|
||||
state_ = nullptr;
|
||||
void AsyncResolver::OnWorkDone() {
|
||||
SignalDone(this);
|
||||
}
|
||||
|
||||
const char* inet_ntop(int af, const void *src, char* dst, socklen_t size) {
|
||||
|
||||
@ -19,25 +19,19 @@
|
||||
#endif
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
|
||||
#include "webrtc/rtc_base/asyncresolverinterface.h"
|
||||
#include "webrtc/rtc_base/refcount.h"
|
||||
#include "webrtc/rtc_base/scoped_ref_ptr.h"
|
||||
#include "webrtc/rtc_base/signalthread.h"
|
||||
#include "webrtc/rtc_base/sigslot.h"
|
||||
#include "webrtc/rtc_base/socketaddress.h"
|
||||
#include "webrtc/rtc_base/thread_checker.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
class Thread;
|
||||
class TaskQueue;
|
||||
class AsyncResolverTest;
|
||||
|
||||
// AsyncResolver will perform async DNS resolution, signaling the result on the
|
||||
// SignalDone from AsyncResolverInterface when the operation completes.
|
||||
// SignalDone is fired on the same thread on which the AsyncResolver was
|
||||
// constructed.
|
||||
class AsyncResolver : public AsyncResolverInterface {
|
||||
// AsyncResolver will perform async DNS resolution, signaling the result on
|
||||
// the SignalDone from AsyncResolverInterface when the operation completes.
|
||||
class AsyncResolver : public SignalThread, public AsyncResolverInterface {
|
||||
public:
|
||||
AsyncResolver();
|
||||
~AsyncResolver() override;
|
||||
@ -48,34 +42,16 @@ class AsyncResolver : public AsyncResolverInterface {
|
||||
void Destroy(bool wait) override;
|
||||
|
||||
const std::vector<IPAddress>& addresses() const { return addresses_; }
|
||||
void set_error(int error) { error_ = error; }
|
||||
|
||||
protected:
|
||||
void DoWork() override;
|
||||
void OnWorkDone() override;
|
||||
|
||||
private:
|
||||
void ResolveDone(int error, std::vector<IPAddress> addresses);
|
||||
|
||||
class Trampoline : public RefCountInterface {
|
||||
public:
|
||||
Trampoline(AsyncResolver* resolver) : resolver(resolver) {}
|
||||
// Points back to the resolver, as long as it is alive. Cleared
|
||||
// by the AsyncResolver destructor.
|
||||
AsyncResolver* resolver;
|
||||
};
|
||||
|
||||
// |state_| is non-null while resolution is pending, i.e., set
|
||||
// non-null by Start() and cleared by ResolveDone(). The destructor
|
||||
// clears state_->resolver (assuming |state_| is non-null), to
|
||||
// indicate that the resolver can no longer be accessed.
|
||||
scoped_refptr<Trampoline> state_ ACCESS_ON(construction_thread_);
|
||||
|
||||
Thread* const construction_thread_;
|
||||
// Set to true when Destroy() can't delete the object immediately.
|
||||
// Indicate that the ResolveDone method is now responsible for
|
||||
// deletion. method should delete the object.
|
||||
bool destroyed_ = false;
|
||||
// Queue used only for a single task.
|
||||
std::unique_ptr<TaskQueue> resolver_queue_;
|
||||
SocketAddress addr_;
|
||||
std::vector<IPAddress> addresses_;
|
||||
int error_ = -1;
|
||||
int error_;
|
||||
};
|
||||
|
||||
// rtc namespaced wrappers for inet_ntop and inet_pton so we can avoid
|
||||
|
||||
154
webrtc/rtc_base/signalthread.cc
Normal file
154
webrtc/rtc_base/signalthread.cc
Normal file
@ -0,0 +1,154 @@
|
||||
/*
|
||||
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include "webrtc/base/signalthread.h"
|
||||
|
||||
#include "webrtc/base/checks.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// SignalThread
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
SignalThread::SignalThread(bool use_socket_server)
|
||||
: main_(Thread::Current()),
|
||||
worker_(this, use_socket_server),
|
||||
state_(kInit),
|
||||
refcount_(1) {
|
||||
main_->SignalQueueDestroyed.connect(this,
|
||||
&SignalThread::OnMainThreadDestroyed);
|
||||
worker_.SetName("SignalThread", this);
|
||||
}
|
||||
|
||||
SignalThread::~SignalThread() {
|
||||
RTC_DCHECK(refcount_ == 0);
|
||||
}
|
||||
|
||||
bool SignalThread::SetName(const std::string& name, const void* obj) {
|
||||
EnterExit ee(this);
|
||||
RTC_DCHECK(main_->IsCurrent());
|
||||
RTC_DCHECK(kInit == state_);
|
||||
return worker_.SetName(name, obj);
|
||||
}
|
||||
|
||||
void SignalThread::Start() {
|
||||
EnterExit ee(this);
|
||||
RTC_DCHECK(main_->IsCurrent());
|
||||
if (kInit == state_ || kComplete == state_) {
|
||||
state_ = kRunning;
|
||||
OnWorkStart();
|
||||
worker_.Start();
|
||||
} else {
|
||||
RTC_NOTREACHED();
|
||||
}
|
||||
}
|
||||
|
||||
void SignalThread::Destroy(bool wait) {
|
||||
EnterExit ee(this);
|
||||
RTC_DCHECK(main_->IsCurrent());
|
||||
if ((kInit == state_) || (kComplete == state_)) {
|
||||
refcount_--;
|
||||
} else if (kRunning == state_ || kReleasing == state_) {
|
||||
state_ = kStopping;
|
||||
// OnWorkStop() must follow Quit(), so that when the thread wakes up due to
|
||||
// OWS(), ContinueWork() will return false.
|
||||
worker_.Quit();
|
||||
OnWorkStop();
|
||||
if (wait) {
|
||||
// Release the thread's lock so that it can return from ::Run.
|
||||
cs_.Leave();
|
||||
worker_.Stop();
|
||||
cs_.Enter();
|
||||
refcount_--;
|
||||
}
|
||||
} else {
|
||||
RTC_NOTREACHED();
|
||||
}
|
||||
}
|
||||
|
||||
void SignalThread::Release() {
|
||||
EnterExit ee(this);
|
||||
RTC_DCHECK(main_->IsCurrent());
|
||||
if (kComplete == state_) {
|
||||
refcount_--;
|
||||
} else if (kRunning == state_) {
|
||||
state_ = kReleasing;
|
||||
} else {
|
||||
// if (kInit == state_) use Destroy()
|
||||
RTC_NOTREACHED();
|
||||
}
|
||||
}
|
||||
|
||||
bool SignalThread::ContinueWork() {
|
||||
EnterExit ee(this);
|
||||
RTC_DCHECK(worker_.IsCurrent());
|
||||
return worker_.ProcessMessages(0);
|
||||
}
|
||||
|
||||
void SignalThread::OnMessage(Message *msg) {
|
||||
EnterExit ee(this);
|
||||
if (ST_MSG_WORKER_DONE == msg->message_id) {
|
||||
RTC_DCHECK(main_->IsCurrent());
|
||||
OnWorkDone();
|
||||
bool do_delete = false;
|
||||
if (kRunning == state_) {
|
||||
state_ = kComplete;
|
||||
} else {
|
||||
do_delete = true;
|
||||
}
|
||||
if (kStopping != state_) {
|
||||
// Before signaling that the work is done, make sure that the worker
|
||||
// thread actually is done. We got here because DoWork() finished and
|
||||
// Run() posted the ST_MSG_WORKER_DONE message. This means the worker
|
||||
// thread is about to go away anyway, but sometimes it doesn't actually
|
||||
// finish before SignalWorkDone is processed, and for a reusable
|
||||
// SignalThread this makes an assert in thread.cc fire.
|
||||
//
|
||||
// Calling Stop() on the worker ensures that the OS thread that underlies
|
||||
// the worker will finish, and will be set to null, enabling us to call
|
||||
// Start() again.
|
||||
worker_.Stop();
|
||||
SignalWorkDone(this);
|
||||
}
|
||||
if (do_delete) {
|
||||
refcount_--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SignalThread::Worker::~Worker() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
void SignalThread::Worker::Run() {
|
||||
parent_->Run();
|
||||
}
|
||||
|
||||
void SignalThread::Run() {
|
||||
DoWork();
|
||||
{
|
||||
EnterExit ee(this);
|
||||
if (main_) {
|
||||
main_->Post(RTC_FROM_HERE, this, ST_MSG_WORKER_DONE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void SignalThread::OnMainThreadDestroyed() {
|
||||
EnterExit ee(this);
|
||||
main_ = nullptr;
|
||||
}
|
||||
|
||||
bool SignalThread::Worker::IsProcessingMessages() {
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace rtc
|
||||
161
webrtc/rtc_base/signalthread.h
Normal file
161
webrtc/rtc_base/signalthread.h
Normal file
@ -0,0 +1,161 @@
|
||||
/*
|
||||
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef WEBRTC_RTC_BASE_SIGNALTHREAD_H_
|
||||
#define WEBRTC_RTC_BASE_SIGNALTHREAD_H_
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "webrtc/base/checks.h"
|
||||
#include "webrtc/base/constructormagic.h"
|
||||
#include "webrtc/base/nullsocketserver.h"
|
||||
#include "webrtc/base/sigslot.h"
|
||||
#include "webrtc/base/thread.h"
|
||||
|
||||
namespace rtc {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// SignalThread - Base class for worker threads. The main thread should call
|
||||
// Start() to begin work, and then follow one of these models:
|
||||
// Normal: Wait for SignalWorkDone, and then call Release to destroy.
|
||||
// Cancellation: Call Release(true), to abort the worker thread.
|
||||
// Fire-and-forget: Call Release(false), which allows the thread to run to
|
||||
// completion, and then self-destruct without further notification.
|
||||
// Periodic tasks: Wait for SignalWorkDone, then eventually call Start()
|
||||
// again to repeat the task. When the instance isn't needed anymore,
|
||||
// call Release. DoWork, OnWorkStart and OnWorkStop are called again,
|
||||
// on a new thread.
|
||||
// The subclass should override DoWork() to perform the background task. By
|
||||
// periodically calling ContinueWork(), it can check for cancellation.
|
||||
// OnWorkStart and OnWorkDone can be overridden to do pre- or post-work
|
||||
// tasks in the context of the main thread.
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class SignalThread
|
||||
: public sigslot::has_slots<>,
|
||||
protected MessageHandler {
|
||||
public:
|
||||
explicit SignalThread(bool use_socket_server = true);
|
||||
|
||||
// Context: Main Thread. Call before Start to change the worker's name.
|
||||
bool SetName(const std::string& name, const void* obj);
|
||||
|
||||
// Context: Main Thread. Call to begin the worker thread.
|
||||
void Start();
|
||||
|
||||
// Context: Main Thread. If the worker thread is not running, deletes the
|
||||
// object immediately. Otherwise, asks the worker thread to abort processing,
|
||||
// and schedules the object to be deleted once the worker exits.
|
||||
// SignalWorkDone will not be signalled. If wait is true, does not return
|
||||
// until the thread is deleted.
|
||||
void Destroy(bool wait);
|
||||
|
||||
// Context: Main Thread. If the worker thread is complete, deletes the
|
||||
// object immediately. Otherwise, schedules the object to be deleted once
|
||||
// the worker thread completes. SignalWorkDone will be signalled.
|
||||
void Release();
|
||||
|
||||
// Context: Main Thread. Signalled when work is complete.
|
||||
sigslot::signal1<SignalThread *> SignalWorkDone;
|
||||
|
||||
enum { ST_MSG_WORKER_DONE, ST_MSG_FIRST_AVAILABLE };
|
||||
|
||||
protected:
|
||||
~SignalThread() override;
|
||||
|
||||
Thread* worker() { return &worker_; }
|
||||
|
||||
// Context: Main Thread. Subclass should override to do pre-work setup.
|
||||
virtual void OnWorkStart() { }
|
||||
|
||||
// Context: Worker Thread. Subclass should override to do work.
|
||||
virtual void DoWork() = 0;
|
||||
|
||||
// Context: Worker Thread. Subclass should call periodically to
|
||||
// dispatch messages and determine if the thread should terminate.
|
||||
bool ContinueWork();
|
||||
|
||||
// Context: Worker Thread. Subclass should override when extra work is
|
||||
// needed to abort the worker thread.
|
||||
virtual void OnWorkStop() { }
|
||||
|
||||
// Context: Main Thread. Subclass should override to do post-work cleanup.
|
||||
virtual void OnWorkDone() { }
|
||||
|
||||
// Context: Any Thread. If subclass overrides, be sure to call the base
|
||||
// implementation. Do not use (message_id < ST_MSG_FIRST_AVAILABLE)
|
||||
void OnMessage(Message* msg) override;
|
||||
|
||||
private:
|
||||
enum State {
|
||||
kInit, // Initialized, but not started
|
||||
kRunning, // Started and doing work
|
||||
kReleasing, // Same as running, but to be deleted when work is done
|
||||
kComplete, // Work is done
|
||||
kStopping, // Work is being interrupted
|
||||
};
|
||||
|
||||
class Worker : public Thread {
|
||||
public:
|
||||
explicit Worker(SignalThread* parent, bool use_socket_server)
|
||||
: Thread(use_socket_server
|
||||
? SocketServer::CreateDefault()
|
||||
: std::unique_ptr<SocketServer>(new NullSocketServer())),
|
||||
parent_(parent) {}
|
||||
~Worker() override;
|
||||
void Run() override;
|
||||
bool IsProcessingMessages() override;
|
||||
|
||||
private:
|
||||
SignalThread* parent_;
|
||||
|
||||
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(Worker);
|
||||
};
|
||||
|
||||
class SCOPED_LOCKABLE EnterExit {
|
||||
public:
|
||||
explicit EnterExit(SignalThread* t) EXCLUSIVE_LOCK_FUNCTION(t->cs_)
|
||||
: t_(t) {
|
||||
t_->cs_.Enter();
|
||||
// If refcount_ is zero then the object has already been deleted and we
|
||||
// will be double-deleting it in ~EnterExit()! (shouldn't happen)
|
||||
RTC_DCHECK_NE(0, t_->refcount_);
|
||||
++t_->refcount_;
|
||||
}
|
||||
~EnterExit() UNLOCK_FUNCTION() {
|
||||
bool d = (0 == --t_->refcount_);
|
||||
t_->cs_.Leave();
|
||||
if (d)
|
||||
delete t_;
|
||||
}
|
||||
|
||||
private:
|
||||
SignalThread* t_;
|
||||
|
||||
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(EnterExit);
|
||||
};
|
||||
|
||||
void Run();
|
||||
void OnMainThreadDestroyed();
|
||||
|
||||
Thread* main_;
|
||||
Worker worker_;
|
||||
CriticalSection cs_;
|
||||
State state_;
|
||||
int refcount_;
|
||||
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(SignalThread);
|
||||
};
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
#endif // WEBRTC_RTC_BASE_SIGNALTHREAD_H_
|
||||
210
webrtc/rtc_base/signalthread_unittest.cc
Normal file
210
webrtc/rtc_base/signalthread_unittest.cc
Normal file
@ -0,0 +1,210 @@
|
||||
/*
|
||||
* Copyright 2004 The WebRTC Project Authors. All rights reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "webrtc/base/constructormagic.h"
|
||||
#include "webrtc/base/gunit.h"
|
||||
#include "webrtc/base/signalthread.h"
|
||||
#include "webrtc/base/thread.h"
|
||||
|
||||
using namespace rtc;
|
||||
|
||||
// 10 seconds.
|
||||
static const int kTimeout = 10000;
|
||||
|
||||
class SignalThreadTest : public testing::Test, public sigslot::has_slots<> {
|
||||
public:
|
||||
class SlowSignalThread : public SignalThread {
|
||||
public:
|
||||
SlowSignalThread(SignalThreadTest* harness) : harness_(harness) {}
|
||||
|
||||
virtual ~SlowSignalThread() {
|
||||
EXPECT_EQ(harness_->main_thread_, Thread::Current());
|
||||
++harness_->thread_deleted_;
|
||||
}
|
||||
|
||||
const SignalThreadTest* harness() { return harness_; }
|
||||
|
||||
protected:
|
||||
virtual void OnWorkStart() {
|
||||
ASSERT_TRUE(harness_ != nullptr);
|
||||
++harness_->thread_started_;
|
||||
EXPECT_EQ(harness_->main_thread_, Thread::Current());
|
||||
EXPECT_FALSE(worker()->RunningForTest()); // not started yet
|
||||
}
|
||||
|
||||
virtual void OnWorkStop() {
|
||||
++harness_->thread_stopped_;
|
||||
EXPECT_EQ(harness_->main_thread_, Thread::Current());
|
||||
EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet
|
||||
}
|
||||
|
||||
virtual void OnWorkDone() {
|
||||
++harness_->thread_done_;
|
||||
EXPECT_EQ(harness_->main_thread_, Thread::Current());
|
||||
EXPECT_TRUE(worker()->RunningForTest()); // not stopped yet
|
||||
}
|
||||
|
||||
virtual void DoWork() {
|
||||
EXPECT_NE(harness_->main_thread_, Thread::Current());
|
||||
EXPECT_EQ(worker(), Thread::Current());
|
||||
Thread::Current()->socketserver()->Wait(250, false);
|
||||
}
|
||||
|
||||
private:
|
||||
SignalThreadTest* harness_;
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(SlowSignalThread);
|
||||
};
|
||||
|
||||
void OnWorkComplete(rtc::SignalThread* thread) {
|
||||
SlowSignalThread* t = static_cast<SlowSignalThread*>(thread);
|
||||
EXPECT_EQ(t->harness(), this);
|
||||
EXPECT_EQ(main_thread_, Thread::Current());
|
||||
|
||||
++thread_completed_;
|
||||
if (!called_release_) {
|
||||
thread->Release();
|
||||
}
|
||||
}
|
||||
|
||||
virtual void SetUp() {
|
||||
main_thread_ = Thread::Current();
|
||||
thread_ = new SlowSignalThread(this);
|
||||
thread_->SignalWorkDone.connect(this, &SignalThreadTest::OnWorkComplete);
|
||||
called_release_ = false;
|
||||
thread_started_ = 0;
|
||||
thread_done_ = 0;
|
||||
thread_completed_ = 0;
|
||||
thread_stopped_ = 0;
|
||||
thread_deleted_ = 0;
|
||||
}
|
||||
|
||||
virtual void TearDown() {}
|
||||
|
||||
void ExpectState(int started,
|
||||
int done,
|
||||
int completed,
|
||||
int stopped,
|
||||
int deleted) {
|
||||
EXPECT_EQ(started, thread_started_);
|
||||
EXPECT_EQ(done, thread_done_);
|
||||
EXPECT_EQ(completed, thread_completed_);
|
||||
EXPECT_EQ(stopped, thread_stopped_);
|
||||
EXPECT_EQ(deleted, thread_deleted_);
|
||||
}
|
||||
|
||||
void ExpectStateWait(int started,
|
||||
int done,
|
||||
int completed,
|
||||
int stopped,
|
||||
int deleted,
|
||||
int timeout) {
|
||||
EXPECT_EQ_WAIT(started, thread_started_, timeout);
|
||||
EXPECT_EQ_WAIT(done, thread_done_, timeout);
|
||||
EXPECT_EQ_WAIT(completed, thread_completed_, timeout);
|
||||
EXPECT_EQ_WAIT(stopped, thread_stopped_, timeout);
|
||||
EXPECT_EQ_WAIT(deleted, thread_deleted_, timeout);
|
||||
}
|
||||
|
||||
Thread* main_thread_;
|
||||
SlowSignalThread* thread_;
|
||||
bool called_release_;
|
||||
|
||||
int thread_started_;
|
||||
int thread_done_;
|
||||
int thread_completed_;
|
||||
int thread_stopped_;
|
||||
int thread_deleted_;
|
||||
};
|
||||
|
||||
class OwnerThread : public Thread, public sigslot::has_slots<> {
|
||||
public:
|
||||
explicit OwnerThread(SignalThreadTest* harness)
|
||||
: harness_(harness), has_run_(false) {}
|
||||
|
||||
virtual ~OwnerThread() { Stop(); }
|
||||
|
||||
virtual void Run() {
|
||||
SignalThreadTest::SlowSignalThread* signal_thread =
|
||||
new SignalThreadTest::SlowSignalThread(harness_);
|
||||
signal_thread->SignalWorkDone.connect(this, &OwnerThread::OnWorkDone);
|
||||
signal_thread->Start();
|
||||
Thread::Current()->socketserver()->Wait(100, false);
|
||||
signal_thread->Release();
|
||||
// Delete |signal_thread|.
|
||||
signal_thread->Destroy(true);
|
||||
has_run_ = true;
|
||||
}
|
||||
|
||||
bool has_run() { return has_run_; }
|
||||
void OnWorkDone(SignalThread* signal_thread) {
|
||||
FAIL() << " This shouldn't get called.";
|
||||
}
|
||||
|
||||
private:
|
||||
SignalThreadTest* harness_;
|
||||
bool has_run_;
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(OwnerThread);
|
||||
};
|
||||
|
||||
// Test for when the main thread goes away while the
|
||||
// signal thread is still working. This may happen
|
||||
// when shutting down the process.
|
||||
TEST_F(SignalThreadTest, OwnerThreadGoesAway) {
|
||||
// We don't use |thread_| for this test, so destroy it.
|
||||
thread_->Destroy(true);
|
||||
|
||||
{
|
||||
std::unique_ptr<OwnerThread> owner(new OwnerThread(this));
|
||||
main_thread_ = owner.get();
|
||||
owner->Start();
|
||||
while (!owner->has_run()) {
|
||||
Thread::Current()->socketserver()->Wait(10, false);
|
||||
}
|
||||
}
|
||||
// At this point the main thread has gone away.
|
||||
// Give the SignalThread a little time to do its callback,
|
||||
// which will crash if the signal thread doesn't handle
|
||||
// this situation well.
|
||||
Thread::Current()->socketserver()->Wait(500, false);
|
||||
}
|
||||
|
||||
TEST_F(SignalThreadTest, ThreadFinishes) {
|
||||
thread_->Start();
|
||||
ExpectState(1, 0, 0, 0, 0);
|
||||
ExpectStateWait(1, 1, 1, 0, 1, kTimeout);
|
||||
}
|
||||
|
||||
TEST_F(SignalThreadTest, ReleasedThreadFinishes) {
|
||||
thread_->Start();
|
||||
ExpectState(1, 0, 0, 0, 0);
|
||||
thread_->Release();
|
||||
called_release_ = true;
|
||||
ExpectState(1, 0, 0, 0, 0);
|
||||
ExpectStateWait(1, 1, 1, 0, 1, kTimeout);
|
||||
}
|
||||
|
||||
TEST_F(SignalThreadTest, DestroyedThreadCleansUp) {
|
||||
thread_->Start();
|
||||
ExpectState(1, 0, 0, 0, 0);
|
||||
thread_->Destroy(true);
|
||||
ExpectState(1, 0, 0, 1, 1);
|
||||
Thread::Current()->ProcessMessages(0);
|
||||
ExpectState(1, 0, 0, 1, 1);
|
||||
}
|
||||
|
||||
TEST_F(SignalThreadTest, DeferredDestroyedThreadCleansUp) {
|
||||
thread_->Start();
|
||||
ExpectState(1, 0, 0, 0, 0);
|
||||
thread_->Destroy(false);
|
||||
ExpectState(1, 0, 0, 1, 0);
|
||||
ExpectStateWait(1, 1, 0, 1, 1, kTimeout);
|
||||
}
|
||||
Reference in New Issue
Block a user