Move VirtualSocket into the .h file to allow unit tests more control over behavior.

BUG=3927
R=pthatcher@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/31289004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@7935 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
guoweis@webrtc.org
2014-12-17 22:03:33 +00:00
parent 6f10ae25ea
commit 0eb6eec5cb
2 changed files with 449 additions and 396 deletions

View File

@ -91,42 +91,51 @@ struct MessageAddress : public MessageData {
SocketAddress addr; SocketAddress addr;
}; };
// Implements the socket interface using the virtual network. Packets are VirtualSocket::VirtualSocket(VirtualSocketServer* server,
// passed as messages using the message queue of the socket server. int family,
class VirtualSocket : public AsyncSocket, public MessageHandler { int type,
public: bool async)
VirtualSocket(VirtualSocketServer* server, int family, int type, bool async) : server_(server),
: server_(server), family_(family), type_(type), async_(async), family_(family),
state_(CS_CLOSED), error_(0), listen_queue_(NULL), type_(type),
async_(async),
state_(CS_CLOSED),
error_(0),
listen_queue_(NULL),
write_enabled_(false), write_enabled_(false),
network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) { network_size_(0),
recv_buffer_size_(0),
bound_(false),
was_any_(false) {
ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams
} }
virtual ~VirtualSocket() { VirtualSocket::~VirtualSocket() {
Close(); Close();
for (RecvBuffer::iterator it = recv_buffer_.begin(); for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end();
it != recv_buffer_.end(); ++it) { ++it) {
delete *it; delete *it;
} }
} }
virtual SocketAddress GetLocalAddress() const { SocketAddress VirtualSocket::GetLocalAddress() const {
if (!alternative_local_addr_.IsNil())
return alternative_local_addr_;
return local_addr_; return local_addr_;
} }
virtual SocketAddress GetRemoteAddress() const { SocketAddress VirtualSocket::GetRemoteAddress() const {
return remote_addr_; return remote_addr_;
} }
// Used by server sockets to set the local address without binding. // Used by server sockets to set the local address without binding.
void SetLocalAddress(const SocketAddress& addr) { void VirtualSocket::SetLocalAddress(const SocketAddress& addr) {
local_addr_ = addr; local_addr_ = addr;
} }
virtual int Bind(const SocketAddress& addr) { int VirtualSocket::Bind(const SocketAddress& addr) {
if (!local_addr_.IsNil()) { if (!local_addr_.IsNil()) {
error_ = EINVAL; error_ = EINVAL;
return -1; return -1;
@ -143,11 +152,11 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return result; return result;
} }
virtual int Connect(const SocketAddress& addr) { int VirtualSocket::Connect(const SocketAddress& addr) {
return InitiateConnect(addr, true); return InitiateConnect(addr, true);
} }
virtual int Close() { int VirtualSocket::Close() {
if (!local_addr_.IsNil() && bound_) { if (!local_addr_.IsNil() && bound_) {
// Remove from the binding table. // Remove from the binding table.
server_->Unbind(local_addr_, this); server_->Unbind(local_addr_, this);
@ -174,7 +183,7 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
server_->LookupConnection(local_addr_, remote_addr_); server_->LookupConnection(local_addr_, remote_addr_);
if (!socket) { if (!socket) {
// Not a server socket child, then see if it is bound. // Not a server socket child, then see if it is bound.
// TODO: If this is indeed a server socket that has no // TODO(tbd): If this is indeed a server socket that has no
// children this will cause the server socket to be // children this will cause the server socket to be
// closed. This might lead to unexpected results, how to fix this? // closed. This might lead to unexpected results, how to fix this?
socket = server_->LookupBinding(remote_addr_); socket = server_->LookupBinding(remote_addr_);
@ -195,8 +204,8 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
MessageAddress* data = static_cast<MessageAddress*>(it->pdata); MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
// Lookup remote side. // Lookup remote side.
VirtualSocket* socket = server_->LookupConnection(local_addr_, VirtualSocket* socket =
data->addr); server_->LookupConnection(local_addr_, data->addr);
if (socket) { if (socket) {
// Server socket, remote side is a socket retreived by // Server socket, remote side is a socket retreived by
// accept. Accepted sockets are not bound so we will not // accept. Accepted sockets are not bound so we will not
@ -220,7 +229,7 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return 0; return 0;
} }
virtual int Send(const void *pv, size_t cb) { int VirtualSocket::Send(const void* pv, size_t cb) {
if (CS_CONNECTED != state_) { if (CS_CONNECTED != state_) {
error_ = ENOTCONN; error_ = ENOTCONN;
return -1; return -1;
@ -232,7 +241,9 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
} }
} }
virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) { int VirtualSocket::SendTo(const void* pv,
size_t cb,
const SocketAddress& addr) {
if (SOCK_DGRAM == type_) { if (SOCK_DGRAM == type_) {
return SendUdp(pv, cb, addr); return SendUdp(pv, cb, addr);
} else { } else {
@ -244,12 +255,12 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
} }
} }
virtual int Recv(void *pv, size_t cb) { int VirtualSocket::Recv(void* pv, size_t cb) {
SocketAddress addr; SocketAddress addr;
return RecvFrom(pv, cb, &addr); return RecvFrom(pv, cb, &addr);
} }
virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) { int VirtualSocket::RecvFrom(void* pv, size_t cb, SocketAddress* paddr) {
// If we don't have a packet, then either error or wait for one to arrive. // If we don't have a packet, then either error or wait for one to arrive.
if (recv_buffer_.empty()) { if (recv_buffer_.empty()) {
if (async_) { if (async_) {
@ -289,7 +300,7 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return static_cast<int>(data_read); return static_cast<int>(data_read);
} }
virtual int Listen(int backlog) { int VirtualSocket::Listen(int backlog) {
ASSERT(SOCK_STREAM == type_); ASSERT(SOCK_STREAM == type_);
ASSERT(CS_CLOSED == state_); ASSERT(CS_CLOSED == state_);
if (local_addr_.IsNil()) { if (local_addr_.IsNil()) {
@ -302,14 +313,13 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return 0; return 0;
} }
virtual VirtualSocket* Accept(SocketAddress *paddr) { VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
if (NULL == listen_queue_) { if (NULL == listen_queue_) {
error_ = EINVAL; error_ = EINVAL;
return NULL; return NULL;
} }
while (!listen_queue_->empty()) { while (!listen_queue_->empty()) {
VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_, VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_, async_);
async_);
// Set the new local address to the same as this server socket. // Set the new local address to the same as this server socket.
socket->SetLocalAddress(local_addr_); socket->SetLocalAddress(local_addr_);
@ -332,19 +342,19 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return NULL; return NULL;
} }
virtual int GetError() const { int VirtualSocket::GetError() const {
return error_; return error_;
} }
virtual void SetError(int error) { void VirtualSocket::SetError(int error) {
error_ = error; error_ = error;
} }
virtual ConnState GetState() const { Socket::ConnState VirtualSocket::GetState() const {
return state_; return state_;
} }
virtual int GetOption(Option opt, int* value) { int VirtualSocket::GetOption(Option opt, int* value) {
OptionsMap::const_iterator it = options_map_.find(opt); OptionsMap::const_iterator it = options_map_.find(opt);
if (it == options_map_.end()) { if (it == options_map_.end()) {
return -1; return -1;
@ -353,19 +363,19 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return 0; // 0 is success to emulate getsockopt() return 0; // 0 is success to emulate getsockopt()
} }
virtual int SetOption(Option opt, int value) { int VirtualSocket::SetOption(Option opt, int value) {
options_map_[opt] = value; options_map_[opt] = value;
return 0; // 0 is success to emulate setsockopt() return 0; // 0 is success to emulate setsockopt()
} }
virtual int EstimateMTU(uint16* mtu) { int VirtualSocket::EstimateMTU(uint16* mtu) {
if (CS_CONNECTED != state_) if (CS_CONNECTED != state_)
return ENOTCONN; return ENOTCONN;
else else
return 65536; return 65536;
} }
void OnMessage(Message *pmsg) { void VirtualSocket::OnMessage(Message* pmsg) {
if (pmsg->message_id == MSG_ID_PACKET) { if (pmsg->message_id == MSG_ID_PACKET) {
// ASSERT(!local_addr_.IsAny()); // ASSERT(!local_addr_.IsAny());
ASSERT(NULL != pmsg->pdata); ASSERT(NULL != pmsg->pdata);
@ -406,22 +416,7 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
} }
} }
bool was_any() { return was_any_; } int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
void set_was_any(bool was_any) { was_any_ = was_any; }
private:
struct NetworkEntry {
size_t size;
uint32 done_time;
};
typedef std::deque<SocketAddress> ListenQueue;
typedef std::deque<NetworkEntry> NetworkQueue;
typedef std::vector<char> SendBuffer;
typedef std::list<Packet*> RecvBuffer;
typedef std::map<Option, int> OptionsMap;
int InitiateConnect(const SocketAddress& addr, bool use_delay) {
if (!remote_addr_.IsNil()) { if (!remote_addr_.IsNil()) {
error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
return -1; return -1;
@ -452,7 +447,7 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return 0; return 0;
} }
void CompleteConnect(const SocketAddress& addr, bool notify) { void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) {
ASSERT(CS_CONNECTING == state_); ASSERT(CS_CONNECTING == state_);
remote_addr_ = addr; remote_addr_ = addr;
state_ = CS_CONNECTED; state_ = CS_CONNECTED;
@ -462,7 +457,9 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
} }
} }
int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) { int VirtualSocket::SendUdp(const void* pv,
size_t cb,
const SocketAddress& addr) {
// If we have not been assigned a local port, then get one. // If we have not been assigned a local port, then get one.
if (local_addr_.IsNil()) { if (local_addr_.IsNil()) {
local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
@ -478,7 +475,7 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
} }
int SendTcp(const void* pv, size_t cb) { int VirtualSocket::SendTcp(const void* pv, size_t cb) {
size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
if (0 == capacity) { if (0 == capacity) {
write_enabled_ = true; write_enabled_ = true;
@ -492,49 +489,6 @@ class VirtualSocket : public AsyncSocket, public MessageHandler {
return static_cast<int>(consumed); return static_cast<int>(consumed);
} }
VirtualSocketServer* server_;
int family_;
int type_;
bool async_;
ConnState state_;
int error_;
SocketAddress local_addr_;
SocketAddress remote_addr_;
// Pending sockets which can be Accepted
ListenQueue* listen_queue_;
// Data which tcp has buffered for sending
SendBuffer send_buffer_;
bool write_enabled_;
// Critical section to protect the recv_buffer and queue_
CriticalSection crit_;
// Network model that enforces bandwidth and capacity constraints
NetworkQueue network_;
size_t network_size_;
// Data which has been received from the network
RecvBuffer recv_buffer_;
// The amount of data which is in flight or in recv_buffer_
size_t recv_buffer_size_;
// Is this socket bound?
bool bound_;
// When we bind a socket to Any, VSS's Bind gives it another address. For
// dual-stack sockets, we want to distinguish between sockets that were
// explicitly given a particular address and sockets that had one picked
// for them by VSS.
bool was_any_;
// Store the options that are set
OptionsMap options_map_;
friend class VirtualSocketServer;
};
VirtualSocketServer::VirtualSocketServer(SocketServer* ss) VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
: server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false), : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
network_delay_(Time()), next_ipv4_(kInitialNextIPv4), network_delay_(Time()), next_ipv4_(kInitialNextIPv4),

View File

@ -21,6 +21,7 @@
namespace rtc { namespace rtc {
class Packet;
class VirtualSocket; class VirtualSocket;
class SocketAddressPair; class SocketAddressPair;
@ -232,6 +233,104 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
DISALLOW_EVIL_CONSTRUCTORS(VirtualSocketServer); DISALLOW_EVIL_CONSTRUCTORS(VirtualSocketServer);
}; };
// Implements the socket interface using the virtual network. Packets are
// passed as messages using the message queue of the socket server.
class VirtualSocket : public AsyncSocket, public MessageHandler {
public:
VirtualSocket(VirtualSocketServer* server, int family, int type, bool async);
virtual ~VirtualSocket();
virtual SocketAddress GetLocalAddress() const;
virtual SocketAddress GetRemoteAddress() const;
// Used by server sockets to set the local address without binding.
void SetLocalAddress(const SocketAddress& addr);
virtual int Bind(const SocketAddress& addr);
virtual int Connect(const SocketAddress& addr);
virtual int Close();
virtual int Send(const void* pv, size_t cb);
virtual int SendTo(const void* pv, size_t cb, const SocketAddress& addr);
virtual int Recv(void* pv, size_t cb);
virtual int RecvFrom(void* pv, size_t cb, SocketAddress* paddr);
virtual int Listen(int backlog);
virtual VirtualSocket* Accept(SocketAddress* paddr);
virtual int GetError() const;
virtual void SetError(int error);
virtual ConnState GetState() const;
virtual int GetOption(Option opt, int* value);
virtual int SetOption(Option opt, int value);
virtual int EstimateMTU(uint16* mtu);
void OnMessage(Message* pmsg);
bool was_any() { return was_any_; }
void set_was_any(bool was_any) { was_any_ = was_any; }
// For testing purpose only. Fired when client socket is bound to an address.
sigslot::signal2<VirtualSocket*, const SocketAddress&> SignalAddressReady;
private:
struct NetworkEntry {
size_t size;
uint32 done_time;
};
typedef std::deque<SocketAddress> ListenQueue;
typedef std::deque<NetworkEntry> NetworkQueue;
typedef std::vector<char> SendBuffer;
typedef std::list<Packet*> RecvBuffer;
typedef std::map<Option, int> OptionsMap;
int InitiateConnect(const SocketAddress& addr, bool use_delay);
void CompleteConnect(const SocketAddress& addr, bool notify);
int SendUdp(const void* pv, size_t cb, const SocketAddress& addr);
int SendTcp(const void* pv, size_t cb);
VirtualSocketServer* server_;
int family_;
int type_;
bool async_;
ConnState state_;
int error_;
SocketAddress local_addr_;
SocketAddress alternative_local_addr_;
SocketAddress remote_addr_;
// Pending sockets which can be Accepted
ListenQueue* listen_queue_;
// Data which tcp has buffered for sending
SendBuffer send_buffer_;
bool write_enabled_;
// Critical section to protect the recv_buffer and queue_
CriticalSection crit_;
// Network model that enforces bandwidth and capacity constraints
NetworkQueue network_;
size_t network_size_;
// Data which has been received from the network
RecvBuffer recv_buffer_;
// The amount of data which is in flight or in recv_buffer_
size_t recv_buffer_size_;
// Is this socket bound?
bool bound_;
// When we bind a socket to Any, VSS's Bind gives it another address. For
// dual-stack sockets, we want to distinguish between sockets that were
// explicitly given a particular address and sockets that had one picked
// for them by VSS.
bool was_any_;
// Store the options that are set
OptionsMap options_map_;
friend class VirtualSocketServer;
};
} // namespace rtc } // namespace rtc
#endif // WEBRTC_BASE_VIRTUALSOCKETSERVER_H_ #endif // WEBRTC_BASE_VIRTUALSOCKETSERVER_H_