Adds clearer function to create rtc::Thread without Physical SocketServer

Moves logic of default SocketServer from MessageQueue to SocketServer

Review-Url: https://codereview.webrtc.org/1891293002
Cr-Commit-Position: refs/heads/master@{#12541}
This commit is contained in:
danilchap
2016-04-28 01:32:48 -07:00
committed by Commit bot
parent 84583e03c7
commit bebf54cad1
11 changed files with 130 additions and 76 deletions

View File

@ -274,6 +274,7 @@ static_library("rtc_base") {
"network.h",
"networkmonitor.cc",
"networkmonitor.h",
"nullsocketserver.cc",
"nullsocketserver.h",
"openssl.h",
"openssladapter.cc",

View File

@ -207,6 +207,7 @@
'network.h',
'networkmonitor.cc',
'networkmonitor.h',
'nullsocketserver.cc',
'nullsocketserver.h',
'openssl.h',
'openssladapter.cc',

View File

@ -7,23 +7,12 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#if defined(WEBRTC_POSIX)
#include <sys/time.h>
#endif
#include <algorithm>
#include "webrtc/base/checks.h"
#include "webrtc/base/common.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/messagequeue.h"
#if defined(__native_client__)
#include "webrtc/base/nullsocketserver.h"
typedef rtc::NullSocketServer DefaultSocketServer;
#else
#include "webrtc/base/physicalsocketserver.h"
typedef rtc::PhysicalSocketServer DefaultSocketServer;
#endif
namespace rtc {
@ -115,25 +104,26 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) {
//------------------------------------------------------------------
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
: fStop_(false), fPeekKeep_(false),
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
if (!ss_) {
// Currently, MessageQueue holds a socket server, and is the base class for
// Thread. It seems like it makes more sense for Thread to hold the socket
// server, and provide it to the MessageQueue, since the Thread controls
// the I/O model, and MQ is agnostic to those details. Anyway, this causes
// messagequeue_unittest to depend on network libraries... yuck.
default_ss_.reset(new DefaultSocketServer());
ss_ = default_ss_.get();
}
RTC_DCHECK(ss);
// Currently, MessageQueue holds a socket server, and is the base class for
// Thread. It seems like it makes more sense for Thread to hold the socket
// server, and provide it to the MessageQueue, since the Thread controls
// the I/O model, and MQ is agnostic to those details. Anyway, this causes
// messagequeue_unittest to depend on network libraries... yuck.
ss_->SetMessageQueue(this);
if (init_queue) {
DoInit();
}
}
MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
: MessageQueue(ss.get(), init_queue) {
own_ss_ = std::move(ss);
}
MessageQueue::~MessageQueue() {
DoDestroy();
}
@ -178,7 +168,7 @@ void MessageQueue::set_socketserver(SocketServer* ss) {
// Other places that only read "ss_" can use a shared lock as simultaneous
// read access is allowed.
ExclusiveScope es(&ss_lock_);
ss_ = ss ? ss : default_ss_.get();
ss_ = ss ? ss : own_ss_.get();
ss_->SetMessageQueue(this);
}

View File

@ -175,8 +175,8 @@ class MessageQueue {
// init_queue and call DoInit() from their constructor to prevent races
// with the MessageQueueManager using the object while the vtable is still
// being created.
explicit MessageQueue(SocketServer* ss = NULL,
bool init_queue = true);
MessageQueue(SocketServer* ss, bool init_queue);
MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
// NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL
// DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race
@ -276,13 +276,13 @@ class MessageQueue {
bool fDestroyed_;
private:
// The SocketServer is not owned by MessageQueue.
// The SocketServer might not be owned by MessageQueue.
SocketServer* ss_ GUARDED_BY(ss_lock_);
// If a server isn't supplied in the constructor, use this one.
std::unique_ptr<SocketServer> default_ss_;
// Used if SocketServer ownership lies with |this|.
std::unique_ptr<SocketServer> own_ss_;
SharedExclusiveLock ss_lock_;
RTC_DISALLOW_COPY_AND_ASSIGN(MessageQueue);
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
};
} // namespace rtc

View File

@ -21,6 +21,7 @@ using namespace rtc;
class MessageQueueTest: public testing::Test, public MessageQueue {
public:
MessageQueueTest() : MessageQueue(SocketServer::CreateDefault(), true) {}
bool IsLocked_Worker() {
if (!crit_.TryEnter()) {
return true;
@ -72,10 +73,11 @@ static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
TEST_F(MessageQueueTest,
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
MessageQueue q;
MessageQueue q(SocketServer::CreateDefault(), true);
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
NullSocketServer nullss;
MessageQueue q_nullss(&nullss);
MessageQueue q_nullss(&nullss, true);
DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/base/checks.h"
#include "webrtc/base/nullsocketserver.h"
namespace rtc {
NullSocketServer::NullSocketServer() : event_(false, false) {}
NullSocketServer::~NullSocketServer() {}
bool NullSocketServer::Wait(int cms, bool process_io) {
event_.Wait(cms);
return true;
}
void NullSocketServer::WakeUp() {
event_.Set();
}
rtc::Socket* NullSocketServer::CreateSocket(int /* type */) {
RTC_NOTREACHED();
return nullptr;
}
rtc::Socket* NullSocketServer::CreateSocket(int /* family */, int /* type */) {
RTC_NOTREACHED();
return nullptr;
}
rtc::AsyncSocket* NullSocketServer::CreateAsyncSocket(int /* type */) {
RTC_NOTREACHED();
return nullptr;
}
rtc::AsyncSocket* NullSocketServer::CreateAsyncSocket(int /* family */,
int /* type */) {
RTC_NOTREACHED();
return nullptr;
}
} // namespace rtc

View File

@ -12,48 +12,25 @@
#define WEBRTC_BASE_NULLSOCKETSERVER_H_
#include "webrtc/base/event.h"
#include "webrtc/base/physicalsocketserver.h"
#include "webrtc/base/socketserver.h"
namespace rtc {
// NullSocketServer
class NullSocketServer : public rtc::SocketServer {
class NullSocketServer : public SocketServer {
public:
NullSocketServer() : event_(false, false) {}
NullSocketServer();
~NullSocketServer() override;
virtual bool Wait(int cms, bool process_io) {
event_.Wait(cms);
return true;
}
virtual void WakeUp() {
event_.Set();
}
virtual rtc::Socket* CreateSocket(int type) {
ASSERT(false);
return NULL;
}
virtual rtc::Socket* CreateSocket(int family, int type) {
ASSERT(false);
return NULL;
}
virtual rtc::AsyncSocket* CreateAsyncSocket(int type) {
ASSERT(false);
return NULL;
}
virtual rtc::AsyncSocket* CreateAsyncSocket(int family, int type) {
ASSERT(false);
return NULL;
}
bool Wait(int cms, bool process_io) override;
void WakeUp() override;
Socket* CreateSocket(int type) override;
Socket* CreateSocket(int family, int type) override;
AsyncSocket* CreateAsyncSocket(int type) override;
AsyncSocket* CreateAsyncSocket(int family, int type) override;
private:
rtc::Event event_;
Event event_;
};
} // namespace rtc

View File

@ -46,6 +46,7 @@
#include "webrtc/base/common.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/networkmonitor.h"
#include "webrtc/base/nullsocketserver.h"
#include "webrtc/base/timeutils.h"
#include "webrtc/base/winping.h"
#include "webrtc/base/win32socketinit.h"
@ -62,6 +63,14 @@ typedef char* SockOptArg;
namespace rtc {
std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
#if defined(__native_client__)
return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
#else
return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
#endif
}
#if defined(WEBRTC_WIN)
// Standard MTUs, from RFC 1191
const uint16_t PACKET_MAXIMUMS[] = {

View File

@ -11,6 +11,7 @@
#ifndef WEBRTC_BASE_SOCKETSERVER_H_
#define WEBRTC_BASE_SOCKETSERVER_H_
#include <memory>
#include "webrtc/base/socketfactory.h"
namespace rtc {
@ -27,6 +28,7 @@ class SocketServer : public SocketFactory {
public:
static const int kForever = -1;
static std::unique_ptr<SocketServer> CreateDefault();
// When the socket server is installed into a Thread, this function is
// called to allow the socket server to use the thread's message queue for
// any messaging that it might need to perform.

View File

@ -22,6 +22,7 @@
#include "webrtc/base/common.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/nullsocketserver.h"
#include "webrtc/base/platform_thread.h"
#include "webrtc/base/stringutils.h"
#include "webrtc/base/timeutils.h"
@ -138,7 +139,9 @@ Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
thread_->SetAllowBlockingCalls(previous_state_);
}
Thread::Thread(SocketServer* ss, bool init_queue)
Thread::Thread() : Thread(SocketServer::CreateDefault()) {}
Thread::Thread(SocketServer* ss)
: MessageQueue(ss, false),
running_(true, false),
#if defined(WEBRTC_WIN)
@ -148,9 +151,20 @@ Thread::Thread(SocketServer* ss, bool init_queue)
owned_(true),
blocking_calls_allowed_(true) {
SetName("Thread", this); // default name
if (init_queue) {
DoInit();
}
DoInit();
}
Thread::Thread(std::unique_ptr<SocketServer> ss)
: MessageQueue(std::move(ss), false),
running_(true, false),
#if defined(WEBRTC_WIN)
thread_(NULL),
thread_id_(0),
#endif
owned_(true),
blocking_calls_allowed_(true) {
SetName("Thread", this); // default name
DoInit();
}
Thread::~Thread() {
@ -158,6 +172,15 @@ Thread::~Thread() {
DoDestroy();
}
std::unique_ptr<Thread> Thread::CreateWithSocketServer() {
return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));
}
std::unique_ptr<Thread> Thread::Create() {
return std::unique_ptr<Thread>(
new Thread(std::unique_ptr<SocketServer>(new NullSocketServer())));
}
bool Thread::SleepMs(int milliseconds) {
AssertBlockingIsAllowedOnCurrentThread();
@ -513,7 +536,7 @@ bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
return true;
}
AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
AutoThread::AutoThread() {
if (!ThreadManager::Instance()->CurrentThread()) {
ThreadManager::Instance()->SetCurrentThread(this);
}

View File

@ -95,11 +95,9 @@ class Runnable {
class Thread : public MessageQueue {
public:
// Create a new Thread and optionally assign it to the passed SocketServer.
// Subclasses that override Clear should pass false for init_queue and call
// DoInit() from their constructor to prevent races with the
// MessageQueueManager already using the object while the vtable is still
// being created.
explicit Thread(SocketServer* ss = nullptr, bool init_queue = true);
Thread();
explicit Thread(SocketServer* ss);
explicit Thread(std::unique_ptr<SocketServer> ss);
// NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
// guarantee Stop() is explicitly called before the subclass is destroyed).
@ -107,6 +105,8 @@ class Thread : public MessageQueue {
// vtable, and the Thread::PreRun calling the virtual method Run().
~Thread() override;
static std::unique_ptr<Thread> CreateWithSocketServer();
static std::unique_ptr<Thread> Create();
static Thread* Current();
// Used to catch performance regressions. Use this to disallow blocking calls
@ -291,7 +291,7 @@ class Thread : public MessageQueue {
class AutoThread : public Thread {
public:
explicit AutoThread(SocketServer* ss = nullptr);
AutoThread();
~AutoThread() override;
private: