Reland of Make the default ctor of rtc::Thread, protected

This is a partial re-land. The change doesn't make the default Thread ctor protected anymore but it does mark it as deprecated and updates all use of it in WebRTC.

Original issue's description:

Make the default ctor of rtc::Thread, protected.
The goal is to force use of Thread::Create or Thread::CreateWithSocketServer.

The default constructor constructs a 'default' socket server, which is usually a 'physical' socket server, but not always. Not every instance of Thread actually needs to have network support, so it's better to have this be explicit instead of unknowingly instantiate one.

BUG=none

Review-Url: https://codereview.webrtc.org/2977953002
Cr-Commit-Position: refs/heads/master@{#19031}
This commit is contained in:
tommi
2017-07-14 14:44:46 -07:00
committed by Commit Bot
parent bc266bc867
commit e7251599a3
19 changed files with 178 additions and 153 deletions

View File

@ -38,7 +38,9 @@ static const VideoCodec kVideoCodecs[] = {
class ChannelManagerTest : public testing::Test { class ChannelManagerTest : public testing::Test {
protected: protected:
ChannelManagerTest() ChannelManagerTest()
: fme_(new cricket::FakeMediaEngine()), : network_(rtc::Thread::CreateWithSocketServer()),
worker_(rtc::Thread::Create()),
fme_(new cricket::FakeMediaEngine()),
fdme_(new cricket::FakeDataEngine()), fdme_(new cricket::FakeDataEngine()),
cm_(new cricket::ChannelManager( cm_(new cricket::ChannelManager(
std::unique_ptr<MediaEngineInterface>(fme_), std::unique_ptr<MediaEngineInterface>(fme_),
@ -52,8 +54,8 @@ class ChannelManagerTest : public testing::Test {
} }
webrtc::RtcEventLogNullImpl event_log_; webrtc::RtcEventLogNullImpl event_log_;
rtc::Thread network_; std::unique_ptr<rtc::Thread> network_;
rtc::Thread worker_; std::unique_ptr<rtc::Thread> worker_;
// |fme_| and |fdme_| are actually owned by |cm_|. // |fme_| and |fdme_| are actually owned by |cm_|.
cricket::FakeMediaEngine* fme_; cricket::FakeMediaEngine* fme_;
cricket::FakeDataEngine* fdme_; cricket::FakeDataEngine* fdme_;
@ -74,14 +76,14 @@ TEST_F(ChannelManagerTest, StartupShutdown) {
// Test that we startup/shutdown properly with a worker thread. // Test that we startup/shutdown properly with a worker thread.
TEST_F(ChannelManagerTest, StartupShutdownOnThread) { TEST_F(ChannelManagerTest, StartupShutdownOnThread) {
network_.Start(); network_->Start();
worker_.Start(); worker_->Start();
EXPECT_FALSE(cm_->initialized()); EXPECT_FALSE(cm_->initialized());
EXPECT_EQ(rtc::Thread::Current(), cm_->worker_thread()); EXPECT_EQ(rtc::Thread::Current(), cm_->worker_thread());
EXPECT_TRUE(cm_->set_network_thread(&network_)); EXPECT_TRUE(cm_->set_network_thread(network_.get()));
EXPECT_EQ(&network_, cm_->network_thread()); EXPECT_EQ(network_.get(), cm_->network_thread());
EXPECT_TRUE(cm_->set_worker_thread(&worker_)); EXPECT_TRUE(cm_->set_worker_thread(worker_.get()));
EXPECT_EQ(&worker_, cm_->worker_thread()); EXPECT_EQ(worker_.get(), cm_->worker_thread());
EXPECT_TRUE(cm_->Init()); EXPECT_TRUE(cm_->Init());
EXPECT_TRUE(cm_->initialized()); EXPECT_TRUE(cm_->initialized());
// Setting the network or worker thread while initialized should fail. // Setting the network or worker thread while initialized should fail.
@ -121,13 +123,13 @@ TEST_F(ChannelManagerTest, CreateDestroyChannels) {
// Test that we can create and destroy a voice and video channel with a worker. // Test that we can create and destroy a voice and video channel with a worker.
TEST_F(ChannelManagerTest, CreateDestroyChannelsOnThread) { TEST_F(ChannelManagerTest, CreateDestroyChannelsOnThread) {
network_.Start(); network_->Start();
worker_.Start(); worker_->Start();
EXPECT_TRUE(cm_->set_worker_thread(&worker_)); EXPECT_TRUE(cm_->set_worker_thread(worker_.get()));
EXPECT_TRUE(cm_->set_network_thread(&network_)); EXPECT_TRUE(cm_->set_network_thread(network_.get()));
EXPECT_TRUE(cm_->Init()); EXPECT_TRUE(cm_->Init());
transport_controller_.reset( transport_controller_.reset(new cricket::FakeTransportController(
new cricket::FakeTransportController(&network_, ICEROLE_CONTROLLING)); network_.get(), ICEROLE_CONTROLLING));
cricket::DtlsTransportInternal* rtp_transport = cricket::DtlsTransportInternal* rtp_transport =
transport_controller_->CreateDtlsTransport( transport_controller_->CreateDtlsTransport(
cricket::CN_AUDIO, cricket::ICE_CANDIDATE_COMPONENT_RTP); cricket::CN_AUDIO, cricket::ICE_CANDIDATE_COMPONENT_RTP);

View File

@ -54,12 +54,14 @@ class PeerConnectionEndToEndTest
DataChannelList; DataChannelList;
PeerConnectionEndToEndTest() { PeerConnectionEndToEndTest() {
RTC_CHECK(network_thread_.Start()); network_thread_ = rtc::Thread::CreateWithSocketServer();
RTC_CHECK(worker_thread_.Start()); worker_thread_ = rtc::Thread::Create();
RTC_CHECK(network_thread_->Start());
RTC_CHECK(worker_thread_->Start());
caller_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>( caller_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>(
"caller", &network_thread_, &worker_thread_); "caller", network_thread_.get(), worker_thread_.get());
callee_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>( callee_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>(
"callee", &network_thread_, &worker_thread_); "callee", network_thread_.get(), worker_thread_.get());
webrtc::PeerConnectionInterface::IceServer ice_server; webrtc::PeerConnectionInterface::IceServer ice_server;
ice_server.uri = "stun:stun.l.google.com:19302"; ice_server.uri = "stun:stun.l.google.com:19302";
config_.servers.push_back(ice_server); config_.servers.push_back(ice_server);
@ -165,8 +167,8 @@ class PeerConnectionEndToEndTest
} }
protected: protected:
rtc::Thread network_thread_; std::unique_ptr<rtc::Thread> network_thread_;
rtc::Thread worker_thread_; std::unique_ptr<rtc::Thread> worker_thread_;
rtc::scoped_refptr<PeerConnectionTestWrapper> caller_; rtc::scoped_refptr<PeerConnectionTestWrapper> caller_;
rtc::scoped_refptr<PeerConnectionTestWrapper> callee_; rtc::scoped_refptr<PeerConnectionTestWrapper> callee_;
DataChannelList caller_signaled_data_channels_; DataChannelList caller_signaled_data_channels_;

View File

@ -1278,14 +1278,15 @@ TEST_F(PeerConnectionInterfaceTest, CreatePeerConnectionWithPooledCandidates) {
// and on the correct thread. // and on the correct thread.
TEST_F(PeerConnectionInterfaceTest, TEST_F(PeerConnectionInterfaceTest,
CreatePeerConnectionInitializesPortAllocator) { CreatePeerConnectionInitializesPortAllocator) {
rtc::Thread network_thread; std::unique_ptr<rtc::Thread> network_thread(
network_thread.Start(); rtc::Thread::CreateWithSocketServer());
network_thread->Start();
rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> pc_factory( rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> pc_factory(
webrtc::CreatePeerConnectionFactory( webrtc::CreatePeerConnectionFactory(
&network_thread, rtc::Thread::Current(), rtc::Thread::Current(), network_thread.get(), rtc::Thread::Current(), rtc::Thread::Current(),
nullptr, nullptr, nullptr)); nullptr, nullptr, nullptr));
std::unique_ptr<cricket::FakePortAllocator> port_allocator( std::unique_ptr<cricket::FakePortAllocator> port_allocator(
new cricket::FakePortAllocator(&network_thread, nullptr)); new cricket::FakePortAllocator(network_thread.get(), nullptr));
cricket::FakePortAllocator* raw_port_allocator = port_allocator.get(); cricket::FakePortAllocator* raw_port_allocator = port_allocator.get();
PeerConnectionInterface::RTCConfiguration config; PeerConnectionInterface::RTCConfiguration config;
rtc::scoped_refptr<PeerConnectionInterface> pc( rtc::scoped_refptr<PeerConnectionInterface> pc(

View File

@ -95,7 +95,7 @@ class SignalingProxyTest : public testing::Test {
protected: protected:
void SetUp() override { void SetUp() override {
signaling_thread_.reset(new rtc::Thread()); signaling_thread_ = rtc::Thread::Create();
ASSERT_TRUE(signaling_thread_->Start()); ASSERT_TRUE(signaling_thread_->Start());
fake_ = Fake::Create(); fake_ = Fake::Create();
fake_signaling_proxy_ = fake_signaling_proxy_ =
@ -182,8 +182,8 @@ class ProxyTest : public testing::Test {
protected: protected:
void SetUp() override { void SetUp() override {
signaling_thread_.reset(new rtc::Thread()); signaling_thread_ = rtc::Thread::Create();
worker_thread_.reset(new rtc::Thread()); worker_thread_ = rtc::Thread::Create();
ASSERT_TRUE(signaling_thread_->Start()); ASSERT_TRUE(signaling_thread_->Start());
ASSERT_TRUE(worker_thread_->Start()); ASSERT_TRUE(worker_thread_->Start());
fake_ = Fake::Create(); fake_ = Fake::Create();
@ -283,20 +283,22 @@ END_PROXY_MAP()
class OwnedProxyTest : public testing::Test { class OwnedProxyTest : public testing::Test {
public: public:
OwnedProxyTest() OwnedProxyTest()
: foo_(new Foo()), : signaling_thread_(rtc::Thread::Create()),
foo_proxy_(FooProxy::Create(&signaling_thread_, worker_thread_(rtc::Thread::Create()),
&worker_thread_, foo_(new Foo()),
foo_proxy_(FooProxy::Create(signaling_thread_.get(),
worker_thread_.get(),
std::unique_ptr<FooInterface>(foo_))) { std::unique_ptr<FooInterface>(foo_))) {
signaling_thread_.Start(); signaling_thread_->Start();
worker_thread_.Start(); worker_thread_->Start();
} }
void CheckSignalingThread() { EXPECT_TRUE(signaling_thread_.IsCurrent()); } void CheckSignalingThread() { EXPECT_TRUE(signaling_thread_->IsCurrent()); }
void CheckWorkerThread() { EXPECT_TRUE(worker_thread_.IsCurrent()); } void CheckWorkerThread() { EXPECT_TRUE(worker_thread_->IsCurrent()); }
protected: protected:
rtc::Thread signaling_thread_; std::unique_ptr<rtc::Thread> signaling_thread_;
rtc::Thread worker_thread_; std::unique_ptr<rtc::Thread> worker_thread_;
Foo* foo_; // Owned by foo_proxy_, not this class. Foo* foo_; // Owned by foo_proxy_, not this class.
std::unique_ptr<FooInterface> foo_proxy_; std::unique_ptr<FooInterface> foo_proxy_;
}; };

View File

@ -34,14 +34,15 @@ const int64_t kGetStatsTimeoutMs = 10000;
class RTCStatsIntegrationTest : public testing::Test { class RTCStatsIntegrationTest : public testing::Test {
public: public:
RTCStatsIntegrationTest() RTCStatsIntegrationTest()
: network_thread_(&virtual_socket_server_), worker_thread_() { : network_thread_(new rtc::Thread(&virtual_socket_server_)),
RTC_CHECK(network_thread_.Start()); worker_thread_(rtc::Thread::Create()) {
RTC_CHECK(worker_thread_.Start()); RTC_CHECK(network_thread_->Start());
RTC_CHECK(worker_thread_->Start());
caller_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>( caller_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>(
"caller", &network_thread_, &worker_thread_); "caller", network_thread_.get(), worker_thread_.get());
callee_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>( callee_ = new rtc::RefCountedObject<PeerConnectionTestWrapper>(
"callee", &network_thread_, &worker_thread_); "callee", network_thread_.get(), worker_thread_.get());
} }
void StartCall() { void StartCall() {
@ -96,8 +97,8 @@ class RTCStatsIntegrationTest : public testing::Test {
// |network_thread_| uses |virtual_socket_server_| so they must be // |network_thread_| uses |virtual_socket_server_| so they must be
// constructed/destructed in the correct order. // constructed/destructed in the correct order.
rtc::VirtualSocketServer virtual_socket_server_; rtc::VirtualSocketServer virtual_socket_server_;
rtc::Thread network_thread_; std::unique_ptr<rtc::Thread> network_thread_;
rtc::Thread worker_thread_; std::unique_ptr<rtc::Thread> worker_thread_;
rtc::scoped_refptr<PeerConnectionTestWrapper> caller_; rtc::scoped_refptr<PeerConnectionTestWrapper> caller_;
rtc::scoped_refptr<PeerConnectionTestWrapper> callee_; rtc::scoped_refptr<PeerConnectionTestWrapper> callee_;
}; };

View File

@ -624,7 +624,7 @@ bool FakeAudioCaptureModule::ShouldStartProcessing() {
void FakeAudioCaptureModule::UpdateProcessing(bool start) { void FakeAudioCaptureModule::UpdateProcessing(bool start) {
if (start) { if (start) {
if (!process_thread_) { if (!process_thread_) {
process_thread_.reset(new rtc::Thread()); process_thread_ = rtc::Thread::Create();
process_thread_->Start(); process_thread_->Start();
} }
process_thread_->Post(RTC_FROM_HERE, this, MSG_START_PROCESS); process_thread_->Post(RTC_FROM_HERE, this, MSG_START_PROCESS);

View File

@ -201,7 +201,7 @@ struct CompareAndSwapOp {
void StartThreads(std::vector<std::unique_ptr<Thread>>* threads, void StartThreads(std::vector<std::unique_ptr<Thread>>* threads,
MessageHandler* handler) { MessageHandler* handler) {
for (int i = 0; i < kNumThreads; ++i) { for (int i = 0; i < kNumThreads; ++i) {
std::unique_ptr<Thread> thread(new Thread()); std::unique_ptr<Thread> thread(Thread::Create());
thread->Start(); thread->Start();
thread->Post(RTC_FROM_HERE, handler); thread->Post(RTC_FROM_HERE, handler);
threads->push_back(std::move(thread)); threads->push_back(std::move(thread));

View File

@ -8,9 +8,10 @@
* be found in the AUTHORS file in the root of the source tree. * be found in the AUTHORS file in the root of the source tree.
*/ */
#include "webrtc/rtc_base/logging.h"
#include "webrtc/rtc_base/fileutils.h" #include "webrtc/rtc_base/fileutils.h"
#include "webrtc/rtc_base/gunit.h" #include "webrtc/rtc_base/gunit.h"
#include "webrtc/rtc_base/logging.h"
#include "webrtc/rtc_base/nullsocketserver.h"
#include "webrtc/rtc_base/pathutils.h" #include "webrtc/rtc_base/pathutils.h"
#include "webrtc/rtc_base/stream.h" #include "webrtc/rtc_base/stream.h"
#include "webrtc/rtc_base/thread.h" #include "webrtc/rtc_base/thread.h"
@ -88,6 +89,8 @@ TEST(LogTest, MultipleStreams) {
// We should restore the correct global state at the end. // We should restore the correct global state at the end.
class LogThread : public Thread { class LogThread : public Thread {
public: public:
LogThread() : Thread(std::unique_ptr<SocketServer>(new NullSocketServer())) {}
~LogThread() override { ~LogThread() override {
Stop(); Stop();
} }

View File

@ -38,9 +38,9 @@ class MessageQueueTest: public testing::Test, public MessageQueue {
bool IsLocked() { bool IsLocked() {
// We have to do this on a worker thread, or else the TryEnter will // We have to do this on a worker thread, or else the TryEnter will
// succeed, since our critical sections are reentrant. // succeed, since our critical sections are reentrant.
Thread worker; std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
worker.Start(); worker->Start();
return worker.Invoke<bool>( return worker->Invoke<bool>(
RTC_FROM_HERE, rtc::Bind(&MessageQueueTest::IsLocked_Worker, this)); RTC_FROM_HERE, rtc::Bind(&MessageQueueTest::IsLocked_Worker, this));
} }
}; };
@ -152,10 +152,10 @@ TEST(MessageQueueManager, Clear) {
// all registered message queues. // all registered message queues.
TEST(MessageQueueManager, ProcessAllMessageQueues) { TEST(MessageQueueManager, ProcessAllMessageQueues) {
Event entered_process_all_message_queues(true, false); Event entered_process_all_message_queues(true, false);
Thread a; auto a = Thread::CreateWithSocketServer();
Thread b; auto b = Thread::CreateWithSocketServer();
a.Start(); a->Start();
b.Start(); b->Start();
volatile int messages_processed = 0; volatile int messages_processed = 0;
FunctorMessageHandler<void, std::function<void()>> incrementer( FunctorMessageHandler<void, std::function<void()>> incrementer(
@ -173,10 +173,10 @@ TEST(MessageQueueManager, ProcessAllMessageQueues) {
}); });
// Post messages (both delayed and non delayed) to both threads. // Post messages (both delayed and non delayed) to both threads.
a.Post(RTC_FROM_HERE, &incrementer); a->Post(RTC_FROM_HERE, &incrementer);
b.Post(RTC_FROM_HERE, &incrementer); b->Post(RTC_FROM_HERE, &incrementer);
a.PostDelayed(RTC_FROM_HERE, 0, &incrementer); a->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
b.PostDelayed(RTC_FROM_HERE, 0, &incrementer); b->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler); rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
MessageQueueManager::ProcessAllMessageQueues(); MessageQueueManager::ProcessAllMessageQueues();
@ -185,9 +185,9 @@ TEST(MessageQueueManager, ProcessAllMessageQueues) {
// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting. // Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) { TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
Thread t; auto t = Thread::CreateWithSocketServer();
t.Start(); t->Start();
t.Quit(); t->Quit();
MessageQueueManager::ProcessAllMessageQueues(); MessageQueueManager::ProcessAllMessageQueues();
} }
@ -195,8 +195,8 @@ TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
// messages. // messages.
TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) { TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
Event entered_process_all_message_queues(true, false); Event entered_process_all_message_queues(true, false);
Thread t; auto t = Thread::CreateWithSocketServer();
t.Start(); t->Start();
FunctorMessageHandler<void, std::function<void()>> clearer( FunctorMessageHandler<void, std::function<void()>> clearer(
[&entered_process_all_message_queues] { [&entered_process_all_message_queues] {
@ -213,7 +213,7 @@ TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
}); });
// Post messages (both delayed and non delayed) to both threads. // Post messages (both delayed and non delayed) to both threads.
t.Post(RTC_FROM_HERE, &clearer); t->Post(RTC_FROM_HERE, &clearer);
rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler); rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
MessageQueueManager::ProcessAllMessageQueues(); MessageQueueManager::ProcessAllMessageQueues();
} }
@ -231,7 +231,7 @@ class EmptyHandler : public MessageHandler {
}; };
TEST(MessageQueueManager, ClearReentrant) { TEST(MessageQueueManager, ClearReentrant) {
Thread t; std::unique_ptr<Thread> t(Thread::Create());
EmptyHandler handler; EmptyHandler handler;
RefCountedHandler* inner_handler( RefCountedHandler* inner_handler(
new rtc::RefCountedObject<RefCountedHandler>()); new rtc::RefCountedObject<RefCountedHandler>());
@ -242,7 +242,7 @@ TEST(MessageQueueManager, ClearReentrant) {
// The inner handler will be removed in a re-entrant fashion from the // The inner handler will be removed in a re-entrant fashion from the
// message queue of the thread while the outer handler is removed, verifying // message queue of the thread while the outer handler is removed, verifying
// that the iterator is not invalidated in "MessageQueue::Clear". // that the iterator is not invalidated in "MessageQueue::Clear".
t.Post(RTC_FROM_HERE, inner_handler, 0); t->Post(RTC_FROM_HERE, inner_handler, 0);
t.Post(RTC_FROM_HERE, &handler, 0, t->Post(RTC_FROM_HERE, &handler, 0,
new ScopedRefMessageData<RefCountedHandler>(inner_handler)); new ScopedRefMessageData<RefCountedHandler>(inner_handler));
} }

View File

@ -28,9 +28,9 @@ class NullSocketServerTest
}; };
TEST_F(NullSocketServerTest, WaitAndSet) { TEST_F(NullSocketServerTest, WaitAndSet) {
Thread thread; auto thread = Thread::Create();
EXPECT_TRUE(thread.Start()); EXPECT_TRUE(thread->Start());
thread.Post(RTC_FROM_HERE, this, 0); thread->Post(RTC_FROM_HERE, this, 0);
// The process_io will be ignored. // The process_io will be ignored.
const bool process_io = true; const bool process_io = true;
EXPECT_TRUE_WAIT(ss_.Wait(SocketServer::kForever, process_io), kTimeout); EXPECT_TRUE_WAIT(ss_.Wait(SocketServer::kForever, process_io), kTimeout);

View File

@ -610,7 +610,7 @@ TEST_F(PosixSignalDeliveryTest, SignalOnDifferentThread) {
// Start a new thread that raises it. It will have to be delivered to that // Start a new thread that raises it. It will have to be delivered to that
// thread. Our implementation should safely handle it and dispatch // thread. Our implementation should safely handle it and dispatch
// RecordSignal() on this thread. // RecordSignal() on this thread.
std::unique_ptr<Thread> thread(new Thread()); std::unique_ptr<Thread> thread(Thread::CreateWithSocketServer());
std::unique_ptr<RaiseSigTermRunnable> runnable(new RaiseSigTermRunnable()); std::unique_ptr<RaiseSigTermRunnable> runnable(new RaiseSigTermRunnable());
thread->Start(runnable.get()); thread->Start(runnable.get());
EXPECT_TRUE(ss_->Wait(1500, true)); EXPECT_TRUE(ss_->Wait(1500, true));

View File

@ -24,7 +24,7 @@ class RTCCertificateGeneratorFixture : public RTCCertificateGeneratorCallback {
public: public:
RTCCertificateGeneratorFixture() RTCCertificateGeneratorFixture()
: signaling_thread_(Thread::Current()), : signaling_thread_(Thread::Current()),
worker_thread_(new Thread()), worker_thread_(Thread::Create()),
generate_async_completed_(false) { generate_async_completed_(false) {
RTC_CHECK(signaling_thread_); RTC_CHECK(signaling_thread_);
RTC_CHECK(worker_thread_->Start()); RTC_CHECK(worker_thread_->Start());

View File

@ -681,7 +681,7 @@ void SocketTest::SocketServerWaitInternal(const IPAddress& loopback) {
EXPECT_FALSE(sink.Check(accepted.get(), SSE_READ)); EXPECT_FALSE(sink.Check(accepted.get(), SSE_READ));
// Shouldn't signal when blocked in a thread Send, where process_io is false. // Shouldn't signal when blocked in a thread Send, where process_io is false.
std::unique_ptr<Thread> thread(new Thread()); std::unique_ptr<Thread> thread(Thread::CreateWithSocketServer());
thread->Start(); thread->Start();
Sleeper sleeper; Sleeper sleeper;
TypedMessageData<AsyncSocket*> data(client.get()); TypedMessageData<AsyncSocket*> data(client.get());

View File

@ -44,7 +44,7 @@ Thread* Thread::Current() {
#ifndef NO_MAIN_THREAD_WRAPPING #ifndef NO_MAIN_THREAD_WRAPPING
// Only autowrap the thread which instantiated the ThreadManager. // Only autowrap the thread which instantiated the ThreadManager.
if (!thread && manager->IsMainThread()) { if (!thread && manager->IsMainThread()) {
thread = new Thread(); thread = new Thread(SocketServer::CreateDefault());
thread->WrapCurrentWithThreadManager(manager, true); thread->WrapCurrentWithThreadManager(manager, true);
} }
#endif #endif
@ -87,7 +87,7 @@ void ThreadManager::SetCurrentThread(Thread *thread) {
Thread *ThreadManager::WrapCurrentThread() { Thread *ThreadManager::WrapCurrentThread() {
Thread* result = CurrentThread(); Thread* result = CurrentThread();
if (nullptr == result) { if (nullptr == result) {
result = new Thread(); result = new Thread(SocketServer::CreateDefault());
result->WrapCurrentWithThreadManager(this, true); result->WrapCurrentWithThreadManager(this, true);
} }
return result; return result;
@ -115,6 +115,7 @@ Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
thread_->SetAllowBlockingCalls(previous_state_); thread_->SetAllowBlockingCalls(previous_state_);
} }
// DEPRECATED.
Thread::Thread() : Thread(SocketServer::CreateDefault()) {} Thread::Thread() : Thread(SocketServer::CreateDefault()) {}
Thread::Thread(SocketServer* ss) Thread::Thread(SocketServer* ss)
@ -520,7 +521,7 @@ bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
return true; return true;
} }
AutoThread::AutoThread() { AutoThread::AutoThread() : Thread(SocketServer::CreateDefault()) {
if (!ThreadManager::Instance()->CurrentThread()) { if (!ThreadManager::Instance()->CurrentThread()) {
ThreadManager::Instance()->SetCurrentThread(this); ThreadManager::Instance()->SetCurrentThread(this);
} }

View File

@ -102,8 +102,13 @@ class Runnable {
class LOCKABLE Thread : public MessageQueue { class LOCKABLE Thread : public MessageQueue {
public: public:
// Create a new Thread and optionally assign it to the passed SocketServer. // DEPRECATED.
// The default constructor should not be used because it hides whether or
// not a socket server will be associated with the thread. Most instances
// of Thread do actually not need one, so please use either of the Create*
// methods to construct an instance of Thread.
Thread(); Thread();
explicit Thread(SocketServer* ss); explicit Thread(SocketServer* ss);
explicit Thread(std::unique_ptr<SocketServer> ss); explicit Thread(std::unique_ptr<SocketServer> ss);

View File

@ -14,6 +14,7 @@
#include "webrtc/rtc_base/checks.h" #include "webrtc/rtc_base/checks.h"
#include "webrtc/rtc_base/constructormagic.h" #include "webrtc/rtc_base/constructormagic.h"
#include "webrtc/rtc_base/nullsocketserver.h"
#include "webrtc/rtc_base/task_queue.h" #include "webrtc/rtc_base/task_queue.h"
#include "webrtc/rtc_base/thread.h" #include "webrtc/rtc_base/thread.h"
#include "webrtc/rtc_base/thread_checker.h" #include "webrtc/rtc_base/thread_checker.h"
@ -52,7 +53,7 @@ class ThreadCheckerClass : public ThreadChecker {
class CallDoStuffOnThread : public Thread { class CallDoStuffOnThread : public Thread {
public: public:
explicit CallDoStuffOnThread(ThreadCheckerClass* thread_checker_class) explicit CallDoStuffOnThread(ThreadCheckerClass* thread_checker_class)
: Thread(), : Thread(std::unique_ptr<SocketServer>(new rtc::NullSocketServer())),
thread_checker_class_(thread_checker_class) { thread_checker_class_(thread_checker_class) {
SetName("call_do_stuff_on_thread", nullptr); SetName("call_do_stuff_on_thread", nullptr);
} }
@ -75,9 +76,9 @@ class CallDoStuffOnThread : public Thread {
class DeleteThreadCheckerClassOnThread : public Thread { class DeleteThreadCheckerClassOnThread : public Thread {
public: public:
explicit DeleteThreadCheckerClassOnThread( explicit DeleteThreadCheckerClassOnThread(
ThreadCheckerClass* thread_checker_class) std::unique_ptr<ThreadCheckerClass> thread_checker_class)
: Thread(), : Thread(std::unique_ptr<SocketServer>(new rtc::NullSocketServer())),
thread_checker_class_(thread_checker_class) { thread_checker_class_(std::move(thread_checker_class)) {
SetName("delete_thread_checker_class_on_thread", nullptr); SetName("delete_thread_checker_class_on_thread", nullptr);
} }
@ -89,6 +90,8 @@ class DeleteThreadCheckerClassOnThread : public Thread {
Thread::Join(); Thread::Join();
} }
bool has_been_deleted() const { return !thread_checker_class_; }
private: private:
std::unique_ptr<ThreadCheckerClass> thread_checker_class_; std::unique_ptr<ThreadCheckerClass> thread_checker_class_;
@ -115,10 +118,14 @@ TEST(ThreadCheckerTest, DestructorAllowedOnDifferentThread) {
// Verify that the destructor doesn't assert // Verify that the destructor doesn't assert
// when called on a different thread. // when called on a different thread.
DeleteThreadCheckerClassOnThread delete_on_thread( DeleteThreadCheckerClassOnThread delete_on_thread(
thread_checker_class.release()); std::move(thread_checker_class));
EXPECT_FALSE(delete_on_thread.has_been_deleted());
delete_on_thread.Start(); delete_on_thread.Start();
delete_on_thread.Join(); delete_on_thread.Join();
EXPECT_TRUE(delete_on_thread.has_been_deleted());
} }
TEST(ThreadCheckerTest, DetachFromThread) { TEST(ThreadCheckerTest, DetachFromThread) {

View File

@ -14,6 +14,7 @@
#include "webrtc/rtc_base/asyncudpsocket.h" #include "webrtc/rtc_base/asyncudpsocket.h"
#include "webrtc/rtc_base/event.h" #include "webrtc/rtc_base/event.h"
#include "webrtc/rtc_base/gunit.h" #include "webrtc/rtc_base/gunit.h"
#include "webrtc/rtc_base/nullsocketserver.h"
#include "webrtc/rtc_base/physicalsocketserver.h" #include "webrtc/rtc_base/physicalsocketserver.h"
#include "webrtc/rtc_base/sigslot.h" #include "webrtc/rtc_base/sigslot.h"
#include "webrtc/rtc_base/socketaddress.h" #include "webrtc/rtc_base/socketaddress.h"
@ -106,7 +107,8 @@ class MessageClient : public MessageHandler, public TestGenerator {
class CustomThread : public rtc::Thread { class CustomThread : public rtc::Thread {
public: public:
CustomThread() {} CustomThread()
: Thread(std::unique_ptr<SocketServer>(new rtc::NullSocketServer())) {}
virtual ~CustomThread() { Stop(); } virtual ~CustomThread() { Stop(); }
bool Start() { return false; } bool Start() { return false; }
@ -124,8 +126,8 @@ class CustomThread : public rtc::Thread {
class SignalWhenDestroyedThread : public Thread { class SignalWhenDestroyedThread : public Thread {
public: public:
SignalWhenDestroyedThread(Event* event) SignalWhenDestroyedThread(Event* event)
: event_(event) { : Thread(std::unique_ptr<SocketServer>(new NullSocketServer())),
} event_(event) {}
virtual ~SignalWhenDestroyedThread() { virtual ~SignalWhenDestroyedThread() {
Stop(); Stop();
@ -195,24 +197,24 @@ TEST(ThreadTest, DISABLED_Main) {
const SocketAddress addr("127.0.0.1", 0); const SocketAddress addr("127.0.0.1", 0);
// Create the messaging client on its own thread. // Create the messaging client on its own thread.
Thread th1; auto th1 = Thread::CreateWithSocketServer();
Socket* socket = th1.socketserver()->CreateAsyncSocket(addr.family(), Socket* socket =
SOCK_DGRAM); th1->socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
MessageClient msg_client(&th1, socket); MessageClient msg_client(th1.get(), socket);
// Create the socket client on its own thread. // Create the socket client on its own thread.
Thread th2; auto th2 = Thread::CreateWithSocketServer();
AsyncSocket* asocket = AsyncSocket* asocket =
th2.socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM); th2->socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
SocketClient sock_client(asocket, addr, &th1, &msg_client); SocketClient sock_client(asocket, addr, th1.get(), &msg_client);
socket->Connect(sock_client.address()); socket->Connect(sock_client.address());
th1.Start(); th1->Start();
th2.Start(); th2->Start();
// Get the messages started. // Get the messages started.
th1.PostDelayed(RTC_FROM_HERE, 100, &msg_client, 0, new TestMessage(1)); th1->PostDelayed(RTC_FROM_HERE, 100, &msg_client, 0, new TestMessage(1));
// Give the clients a little while to run. // Give the clients a little while to run.
// Messages will be processed at 100, 300, 500, 700, 900. // Messages will be processed at 100, 300, 500, 700, 900.
@ -221,9 +223,9 @@ TEST(ThreadTest, DISABLED_Main) {
// Stop the sending client. Give the receiver a bit longer to run, in case // Stop the sending client. Give the receiver a bit longer to run, in case
// it is running on a machine that is under load (e.g. the build machine). // it is running on a machine that is under load (e.g. the build machine).
th1.Stop(); th1->Stop();
th_main->ProcessMessages(200); th_main->ProcessMessages(200);
th2.Stop(); th2->Stop();
// Make sure the results were correct // Make sure the results were correct
EXPECT_EQ(5, msg_client.count); EXPECT_EQ(5, msg_client.count);
@ -236,23 +238,19 @@ TEST(ThreadTest, DISABLED_Main) {
// There's no easy way to verify the name was set properly at this time. // There's no easy way to verify the name was set properly at this time.
TEST(ThreadTest, Names) { TEST(ThreadTest, Names) {
// Default name // Default name
Thread *thread; auto thread = Thread::CreateWithSocketServer();
thread = new Thread();
EXPECT_TRUE(thread->Start()); EXPECT_TRUE(thread->Start());
thread->Stop(); thread->Stop();
delete thread;
thread = new Thread();
// Name with no object parameter // Name with no object parameter
thread = Thread::CreateWithSocketServer();
EXPECT_TRUE(thread->SetName("No object", nullptr)); EXPECT_TRUE(thread->SetName("No object", nullptr));
EXPECT_TRUE(thread->Start()); EXPECT_TRUE(thread->Start());
thread->Stop(); thread->Stop();
delete thread;
// Really long name // Really long name
thread = new Thread(); thread = Thread::CreateWithSocketServer();
EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this)); EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this));
EXPECT_TRUE(thread->Start()); EXPECT_TRUE(thread->Start());
thread->Stop(); thread->Stop();
delete thread;
} }
TEST(ThreadTest, Wrap) { TEST(ThreadTest, Wrap) {
@ -270,21 +268,21 @@ TEST(ThreadTest, Wrap) {
TEST(ThreadTest, Invoke) { TEST(ThreadTest, Invoke) {
// Create and start the thread. // Create and start the thread.
Thread thread; auto thread = Thread::CreateWithSocketServer();
thread.Start(); thread->Start();
// Try calling functors. // Try calling functors.
EXPECT_EQ(42, thread.Invoke<int>(RTC_FROM_HERE, FunctorA())); EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, FunctorA()));
AtomicBool called; AtomicBool called;
FunctorB f2(&called); FunctorB f2(&called);
thread.Invoke<void>(RTC_FROM_HERE, f2); thread->Invoke<void>(RTC_FROM_HERE, f2);
EXPECT_TRUE(called.get()); EXPECT_TRUE(called.get());
// Try calling bare functions. // Try calling bare functions.
struct LocalFuncs { struct LocalFuncs {
static int Func1() { return 999; } static int Func1() { return 999; }
static void Func2() {} static void Func2() {}
}; };
EXPECT_EQ(999, thread.Invoke<int>(RTC_FROM_HERE, &LocalFuncs::Func1)); EXPECT_EQ(999, thread->Invoke<int>(RTC_FROM_HERE, &LocalFuncs::Func1));
thread.Invoke<void>(RTC_FROM_HERE, &LocalFuncs::Func2); thread->Invoke<void>(RTC_FROM_HERE, &LocalFuncs::Func2);
} }
// Verifies that two threads calling Invoke on each other at the same time does // Verifies that two threads calling Invoke on each other at the same time does
@ -294,8 +292,8 @@ TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
Thread* current_thread = Thread::Current(); Thread* current_thread = Thread::Current();
ASSERT_TRUE(current_thread != nullptr); ASSERT_TRUE(current_thread != nullptr);
Thread other_thread; auto other_thread = Thread::CreateWithSocketServer();
other_thread.Start(); other_thread->Start();
struct LocalFuncs { struct LocalFuncs {
static void Set(bool* out) { *out = true; } static void Set(bool* out) { *out = true; }
@ -305,7 +303,7 @@ TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
}; };
bool called = false; bool called = false;
other_thread.Invoke<void>( other_thread->Invoke<void>(
RTC_FROM_HERE, Bind(&LocalFuncs::InvokeSet, current_thread, &called)); RTC_FROM_HERE, Bind(&LocalFuncs::InvokeSet, current_thread, &called));
EXPECT_TRUE(called); EXPECT_TRUE(called);
@ -317,9 +315,10 @@ TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
TEST(ThreadTest, ThreeThreadsInvoke) { TEST(ThreadTest, ThreeThreadsInvoke) {
AutoThread thread; AutoThread thread;
Thread* thread_a = Thread::Current(); Thread* thread_a = Thread::Current();
Thread thread_b, thread_c; auto thread_b = Thread::CreateWithSocketServer();
thread_b.Start(); auto thread_c = Thread::CreateWithSocketServer();
thread_c.Start(); thread_b->Start();
thread_c->Start();
class LockedBool { class LockedBool {
public: public:
@ -377,9 +376,9 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
// Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
// Thread B returns when C receives the call and C should be blocked until A // Thread B returns when C receives the call and C should be blocked until A
// starts to process messages. // starts to process messages.
thread_b.Invoke<void>(RTC_FROM_HERE, thread_b->Invoke<void>(RTC_FROM_HERE,
Bind(&LocalFuncs::AsyncInvokeSetAndWait, &invoker, Bind(&LocalFuncs::AsyncInvokeSetAndWait, &invoker,
&thread_c, thread_a, &thread_a_called)); thread_c.get(), thread_a, &thread_a_called));
EXPECT_FALSE(thread_a_called.Get()); EXPECT_FALSE(thread_a_called.Get());
EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000); EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000);
@ -406,9 +405,9 @@ class SetNameOnSignalQueueDestroyedTester : public sigslot::has_slots<> {
}; };
TEST(ThreadTest, SetNameOnSignalQueueDestroyed) { TEST(ThreadTest, SetNameOnSignalQueueDestroyed) {
Thread* thread1 = new Thread(); auto thread1 = Thread::CreateWithSocketServer();
SetNameOnSignalQueueDestroyedTester tester1(thread1); SetNameOnSignalQueueDestroyedTester tester1(thread1.get());
delete thread1; thread1.reset();
Thread* thread2 = new AutoThread(); Thread* thread2 = new AutoThread();
SetNameOnSignalQueueDestroyedTester tester2(thread2); SetNameOnSignalQueueDestroyedTester tester2(thread2);
@ -438,12 +437,13 @@ class AsyncInvokeTest : public testing::Test {
TEST_F(AsyncInvokeTest, FireAndForget) { TEST_F(AsyncInvokeTest, FireAndForget) {
AsyncInvoker invoker; AsyncInvoker invoker;
// Create and start the thread. // Create and start the thread.
Thread thread; auto thread = Thread::CreateWithSocketServer();
thread.Start(); thread->Start();
// Try calling functor. // Try calling functor.
AtomicBool called; AtomicBool called;
invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, FunctorB(&called)); invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), FunctorB(&called));
EXPECT_TRUE_WAIT(called.get(), kWaitTimeout); EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
thread->Stop();
} }
TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) { TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
@ -454,12 +454,12 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
Event functor_continue(false, false); Event functor_continue(false, false);
Event functor_finished(false, false); Event functor_finished(false, false);
Thread thread; auto thread = Thread::CreateWithSocketServer();
thread.Start(); thread->Start();
volatile bool invoker_destroyed = false; volatile bool invoker_destroyed = false;
{ {
AsyncInvoker invoker; AsyncInvoker invoker;
invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(),
[&functor_started, &functor_continue, [&functor_started, &functor_continue,
&functor_finished, &invoker_destroyed] { &functor_finished, &invoker_destroyed] {
functor_started.Set(); functor_started.Set();
@ -550,7 +550,7 @@ struct CreateInvoker {
// Test that we can call AsyncInvoke<void>() after the thread died. // Test that we can call AsyncInvoke<void>() after the thread died.
TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) { TEST_F(GuardedAsyncInvokeTest, KillThreadFireAndForget) {
// Create and start the thread. // Create and start the thread.
std::unique_ptr<Thread> thread(new Thread()); std::unique_ptr<Thread> thread(Thread::Create());
thread->Start(); thread->Start();
std::unique_ptr<GuardedAsyncInvoker> invoker; std::unique_ptr<GuardedAsyncInvoker> invoker;
// Create the invoker on |thread|. // Create the invoker on |thread|.

View File

@ -349,8 +349,8 @@ TEST(FakeClock, SettingTimeWakesThreads) {
FakeClock clock; FakeClock clock;
SetClockForTesting(&clock); SetClockForTesting(&clock);
Thread worker; std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
worker.Start(); worker->Start();
// Post an event that won't be executed for 10 seconds. // Post an event that won't be executed for 10 seconds.
Event message_handler_dispatched(false, false); Event message_handler_dispatched(false, false);
@ -358,7 +358,7 @@ TEST(FakeClock, SettingTimeWakesThreads) {
message_handler_dispatched.Set(); message_handler_dispatched.Set();
}; };
FunctorMessageHandler<void, decltype(functor)> handler(functor); FunctorMessageHandler<void, decltype(functor)> handler(functor);
worker.PostDelayed(RTC_FROM_HERE, 60000, &handler); worker->PostDelayed(RTC_FROM_HERE, 60000, &handler);
// Wait for a bit for the worker thread to be started and enter its socket // Wait for a bit for the worker thread to be started and enter its socket
// select(). Otherwise this test would be trivial since the worker thread // select(). Otherwise this test would be trivial since the worker thread
@ -369,7 +369,7 @@ TEST(FakeClock, SettingTimeWakesThreads) {
// and dispatch the message instantly. // and dispatch the message instantly.
clock.AdvanceTime(TimeDelta::FromSeconds(60u)); clock.AdvanceTime(TimeDelta::FromSeconds(60u));
EXPECT_TRUE(message_handler_dispatched.Wait(0)); EXPECT_TRUE(message_handler_dispatched.Wait(0));
worker.Stop(); worker->Stop();
SetClockForTesting(nullptr); SetClockForTesting(nullptr);

View File

@ -55,7 +55,7 @@ namespace webrtc_jni {
// Logging macros. // Logging macros.
#define TAG_DECODER "MediaCodecVideoDecoder" #define TAG_DECODER "MediaCodecVideoDecoder"
#ifdef TRACK_BUFFER_TIMING #ifdef TRACK_BUFFER_TIMING
#define ALOGV(...) #define ALOGV(...) \
__android_log_print(ANDROID_LOG_VERBOSE, TAG_DECODER, __VA_ARGS__) __android_log_print(ANDROID_LOG_VERBOSE, TAG_DECODER, __VA_ARGS__)
#else #else
#define ALOGV(...) #define ALOGV(...)
@ -180,14 +180,15 @@ class MediaCodecVideoDecoder : public webrtc::VideoDecoder,
std::vector<jobject> input_buffers_; std::vector<jobject> input_buffers_;
}; };
MediaCodecVideoDecoder::MediaCodecVideoDecoder( MediaCodecVideoDecoder::MediaCodecVideoDecoder(JNIEnv* jni,
JNIEnv* jni, VideoCodecType codecType, jobject render_egl_context) : VideoCodecType codecType,
codecType_(codecType), jobject render_egl_context)
: codecType_(codecType),
render_egl_context_(render_egl_context), render_egl_context_(render_egl_context),
key_frame_required_(true), key_frame_required_(true),
inited_(false), inited_(false),
sw_fallback_required_(false), sw_fallback_required_(false),
codec_thread_(new Thread()), codec_thread_(Thread::Create()),
j_media_codec_video_decoder_class_( j_media_codec_video_decoder_class_(
jni, jni,
FindClass(jni, "org/webrtc/MediaCodecVideoDecoder")), FindClass(jni, "org/webrtc/MediaCodecVideoDecoder")),