Delete friendship between VirtualSocket and VirtualSocketServer

Bug: webrtc:11567
Change-Id: I07d01b9eed798a69ba798e899b2bae57409ce332
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/207181
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33276}
This commit is contained in:
Niels Möller
2021-02-16 09:25:52 +01:00
committed by Commit Bot
parent 686ad4ff06
commit c79bd433a1
2 changed files with 232 additions and 150 deletions

View File

@ -149,9 +149,6 @@ int VirtualSocket::Bind(const SocketAddress& addr) {
} else {
bound_ = true;
was_any_ = addr.IsAnyIP();
// Post a message here such that test case could have chance to
// process the local address. (i.e. SetAlternativeLocalAddress).
server_->msg_queue_->Post(RTC_FROM_HERE, this, MSG_ID_ADDRESS_BOUND);
}
return result;
}
@ -174,7 +171,7 @@ int VirtualSocket::Close() {
SocketAddress addr = listen_queue_->front();
// Disconnect listening socket.
server_->Disconnect(server_->LookupBinding(addr));
server_->Disconnect(addr);
listen_queue_->pop_front();
}
delete listen_queue_;
@ -182,51 +179,14 @@ int VirtualSocket::Close() {
}
// Disconnect stream sockets
if (CS_CONNECTED == state_) {
// Disconnect remote socket, check if it is a child of a server socket.
VirtualSocket* socket =
server_->LookupConnection(local_addr_, remote_addr_);
if (!socket) {
// Not a server socket child, then see if it is bound.
// TODO(tbd): If this is indeed a server socket that has no
// children this will cause the server socket to be
// closed. This might lead to unexpected results, how to fix this?
socket = server_->LookupBinding(remote_addr_);
}
server_->Disconnect(socket);
// Remove mapping for both directions.
server_->RemoveConnection(remote_addr_, local_addr_);
server_->RemoveConnection(local_addr_, remote_addr_);
server_->Disconnect(local_addr_, remote_addr_);
}
// Cancel potential connects
MessageList msgs;
if (server_->msg_queue_) {
server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
}
for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
RTC_DCHECK(nullptr != it->pdata);
MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
// Lookup remote side.
VirtualSocket* socket =
server_->LookupConnection(local_addr_, data->addr);
if (socket) {
// Server socket, remote side is a socket retreived by
// accept. Accepted sockets are not bound so we will not
// find it by looking in the bindings table.
server_->Disconnect(socket);
server_->RemoveConnection(local_addr_, data->addr);
} else {
server_->Disconnect(server_->LookupBinding(data->addr));
}
delete data;
}
server_->CancelConnects(this);
}
// Clear incoming packets and disconnect messages
if (server_->msg_queue_) {
server_->msg_queue_->Clear(this);
}
server_->Clear(this);
state_ = CS_CLOSED;
local_addr_.Clear();
@ -279,9 +239,7 @@ int VirtualSocket::RecvFrom(void* pv,
return -1;
}
while (recv_buffer_.empty()) {
Message msg;
server_->msg_queue_->Get(&msg);
server_->msg_queue_->Dispatch(&msg);
server_->ProcessOneMessage();
}
}
@ -301,18 +259,14 @@ int VirtualSocket::RecvFrom(void* pv,
// To behave like a real socket, SignalReadEvent should fire in the next
// message loop pass if there's still data buffered.
if (!recv_buffer_.empty()) {
// Clear the message so it doesn't end up posted multiple times.
server_->msg_queue_->Clear(this, MSG_ID_SIGNALREADEVENT);
server_->msg_queue_->Post(RTC_FROM_HERE, this, MSG_ID_SIGNALREADEVENT);
server_->PostSignalReadEvent(this);
}
if (SOCK_STREAM == type_) {
bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity());
recv_buffer_size_ -= data_read;
if (was_full) {
VirtualSocket* sender = server_->LookupBinding(remote_addr_);
RTC_DCHECK(nullptr != sender);
server_->SendTcp(sender);
server_->SendTcp(remote_addr_);
}
}
@ -410,7 +364,7 @@ void VirtualSocket::OnMessage(Message* pmsg) {
} else {
RTC_LOG(LS_VERBOSE) << "Socket at " << local_addr_.ToString()
<< " is not listening";
server_->Disconnect(server_->LookupBinding(data->addr));
server_->Disconnect(data->addr);
}
delete data;
} else if (pmsg->message_id == MSG_ID_DISCONNECT) {
@ -494,7 +448,7 @@ int VirtualSocket::SendUdp(const void* pv,
}
int VirtualSocket::SendTcp(const void* pv, size_t cb) {
size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
size_t capacity = server_->send_buffer_capacity() - send_buffer_.size();
if (0 == capacity) {
ready_to_send_ = false;
error_ = EWOULDBLOCK;
@ -523,6 +477,67 @@ void VirtualSocket::OnSocketServerReadyToSend() {
}
}
void VirtualSocket::SetToBlocked() {
CritScope cs(&crit_);
ready_to_send_ = false;
error_ = EWOULDBLOCK;
}
void VirtualSocket::UpdateRecv(size_t data_size) {
recv_buffer_size_ += data_size;
}
void VirtualSocket::UpdateSend(size_t data_size) {
size_t new_buffer_size = send_buffer_.size() - data_size;
// Avoid undefined access beyond the last element of the vector.
// This only happens when new_buffer_size is 0.
if (data_size < send_buffer_.size()) {
// memmove is required for potentially overlapping source/destination.
memmove(&send_buffer_[0], &send_buffer_[data_size], new_buffer_size);
}
send_buffer_.resize(new_buffer_size);
}
void VirtualSocket::MaybeSignalWriteEvent(size_t capacity) {
if (!ready_to_send_ && (send_buffer_.size() < capacity)) {
ready_to_send_ = true;
SignalWriteEvent(this);
}
}
uint32_t VirtualSocket::AddPacket(int64_t cur_time, size_t packet_size) {
network_size_ += packet_size;
uint32_t send_delay =
server_->SendDelay(static_cast<uint32_t>(network_size_));
NetworkEntry entry;
entry.size = packet_size;
entry.done_time = cur_time + send_delay;
network_.push_back(entry);
return send_delay;
}
int64_t VirtualSocket::UpdateOrderedDelivery(int64_t ts) {
// Ensure that new packets arrive after previous ones
ts = std::max(ts, last_delivery_time_);
// A socket should not have both ordered and unordered delivery, so its last
// delivery time only needs to be updated when it has ordered delivery.
last_delivery_time_ = ts;
return ts;
}
size_t VirtualSocket::PurgeNetworkPackets(int64_t cur_time) {
CritScope cs(&crit_);
while (!network_.empty() && (network_.front().done_time <= cur_time)) {
RTC_DCHECK(network_size_ >= network_.front().size);
network_size_ -= network_.front().size;
network_.pop_front();
}
return network_size_;
}
VirtualSocketServer::VirtualSocketServer() : VirtualSocketServer(nullptr) {}
VirtualSocketServer::VirtualSocketServer(ThreadProcessingFakeClock* fake_clock)
@ -596,9 +611,7 @@ AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
}
VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
VirtualSocket* socket = new VirtualSocket(this, family, type, true);
SignalSocketCreated(socket);
return socket;
return new VirtualSocket(this, family, type, true);
}
void VirtualSocketServer::SetMessageQueue(Thread* msg_queue) {
@ -683,7 +696,13 @@ int VirtualSocketServer::Bind(VirtualSocket* socket,
SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
AddressMap::value_type entry(normalized, socket);
return bindings_->insert(entry).second ? 0 : -1;
if (bindings_->insert(entry).second) {
// Post a message here such that test case could have chance to
// process the local address. (i.e. SetAlternativeLocalAddress).
msg_queue_->Post(RTC_FROM_HERE, socket, MSG_ID_ADDRESS_BOUND);
return 0;
}
return -1;
}
int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
@ -814,15 +833,79 @@ bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
return false;
}
bool VirtualSocketServer::Disconnect(const SocketAddress& addr) {
return Disconnect(LookupBinding(addr));
}
bool VirtualSocketServer::Disconnect(const SocketAddress& local_addr,
const SocketAddress& remote_addr) {
// Disconnect remote socket, check if it is a child of a server socket.
VirtualSocket* socket = LookupConnection(local_addr, remote_addr);
if (!socket) {
// Not a server socket child, then see if it is bound.
// TODO(tbd): If this is indeed a server socket that has no
// children this will cause the server socket to be
// closed. This might lead to unexpected results, how to fix this?
socket = LookupBinding(remote_addr);
}
Disconnect(socket);
// Remove mapping for both directions.
RemoveConnection(remote_addr, local_addr);
RemoveConnection(local_addr, remote_addr);
return socket != nullptr;
}
void VirtualSocketServer::CancelConnects(VirtualSocket* socket) {
MessageList msgs;
if (msg_queue_) {
msg_queue_->Clear(socket, MSG_ID_CONNECT, &msgs);
}
for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
RTC_DCHECK(nullptr != it->pdata);
MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
SocketAddress local_addr = socket->GetLocalAddress();
// Lookup remote side.
VirtualSocket* socket = LookupConnection(local_addr, data->addr);
if (socket) {
// Server socket, remote side is a socket retreived by
// accept. Accepted sockets are not bound so we will not
// find it by looking in the bindings table.
Disconnect(socket);
RemoveConnection(local_addr, data->addr);
} else {
Disconnect(data->addr);
}
delete data;
}
}
void VirtualSocketServer::Clear(VirtualSocket* socket) {
// Clear incoming packets and disconnect messages
if (msg_queue_) {
msg_queue_->Clear(socket);
}
}
void VirtualSocketServer::ProcessOneMessage() {
Message msg;
msg_queue_->Get(&msg);
msg_queue_->Dispatch(&msg);
}
void VirtualSocketServer::PostSignalReadEvent(VirtualSocket* socket) {
// Clear the message so it doesn't end up posted multiple times.
msg_queue_->Clear(socket, MSG_ID_SIGNALREADEVENT);
msg_queue_->Post(RTC_FROM_HERE, socket, MSG_ID_SIGNALREADEVENT);
}
int VirtualSocketServer::SendUdp(VirtualSocket* socket,
const char* data,
size_t data_size,
const SocketAddress& remote_addr) {
++sent_packets_;
if (sending_blocked_) {
CritScope cs(&socket->crit_);
socket->ready_to_send_ = false;
socket->error_ = EWOULDBLOCK;
socket->SetToBlocked();
return -1;
}
@ -856,10 +939,8 @@ int VirtualSocketServer::SendUdp(VirtualSocket* socket,
}
{
CritScope cs(&socket->crit_);
int64_t cur_time = TimeMillis();
PurgeNetworkPackets(socket, cur_time);
size_t network_size = socket->PurgeNetworkPackets(cur_time);
// Determine whether we have enough bandwidth to accept this packet. To do
// this, we need to update the send queue. Once we know it's current size,
@ -870,7 +951,7 @@ int VirtualSocketServer::SendUdp(VirtualSocket* socket,
// simulation of what a normal network would do.
size_t packet_size = data_size + UDP_HEADER_SIZE;
if (socket->network_size_ + packet_size > network_capacity_) {
if (network_size + packet_size > network_capacity_) {
RTC_LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
return static_cast<int>(data_size);
}
@ -898,45 +979,36 @@ void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
// Lookup the local/remote pair in the connections table.
VirtualSocket* recipient =
LookupConnection(socket->local_addr_, socket->remote_addr_);
LookupConnection(socket->GetLocalAddress(), socket->GetRemoteAddress());
if (!recipient) {
RTC_LOG(LS_VERBOSE) << "Sending data to no one.";
return;
}
CritScope cs(&socket->crit_);
int64_t cur_time = TimeMillis();
PurgeNetworkPackets(socket, cur_time);
socket->PurgeNetworkPackets(cur_time);
while (true) {
size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size();
size_t max_data_size =
std::min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
size_t data_size = std::min(socket->send_buffer_.size(), max_data_size);
size_t data_size = std::min(socket->send_buffer_size(), max_data_size);
if (0 == data_size)
break;
AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
AddPacketToNetwork(socket, recipient, cur_time, socket->send_buffer_data(),
data_size, TCP_HEADER_SIZE, true);
recipient->recv_buffer_size_ += data_size;
size_t new_buffer_size = socket->send_buffer_.size() - data_size;
// Avoid undefined access beyond the last element of the vector.
// This only happens when new_buffer_size is 0.
if (data_size < socket->send_buffer_.size()) {
// memmove is required for potentially overlapping source/destination.
memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
new_buffer_size);
}
socket->send_buffer_.resize(new_buffer_size);
recipient->UpdateRecv(data_size);
socket->UpdateSend(data_size);
}
if (!socket->ready_to_send_ &&
(socket->send_buffer_.size() < send_buffer_capacity_)) {
socket->ready_to_send_ = true;
socket->SignalWriteEvent(socket);
}
socket->MaybeSignalWriteEvent(send_buffer_capacity_);
}
void VirtualSocketServer::SendTcp(const SocketAddress& addr) {
VirtualSocket* sender = LookupBinding(addr);
RTC_DCHECK(nullptr != sender);
SendTcp(sender);
}
void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
@ -946,13 +1018,7 @@ void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
size_t data_size,
size_t header_size,
bool ordered) {
VirtualSocket::NetworkEntry entry;
entry.size = data_size + header_size;
sender->network_size_ += entry.size;
uint32_t send_delay = SendDelay(static_cast<uint32_t>(sender->network_size_));
entry.done_time = cur_time + send_delay;
sender->network_.push_back(entry);
uint32_t send_delay = sender->AddPacket(cur_time, data_size + header_size);
// Find the delay for crossing the many virtual hops of the network.
uint32_t transit_delay = GetTransitDelay(sender);
@ -960,7 +1026,7 @@ void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
// When the incoming packet is from a binding of the any address, translate it
// to the default route here such that the recipient will see the default
// route.
SocketAddress sender_addr = sender->local_addr_;
SocketAddress sender_addr = sender->GetLocalAddress();
IPAddress default_ip = GetDefaultRoute(sender_addr.ipaddr().family());
if (sender_addr.IsAnyIP() && !IPIsUnspec(default_ip)) {
sender_addr.SetIP(default_ip);
@ -971,25 +1037,11 @@ void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
int64_t ts = TimeAfter(send_delay + transit_delay);
if (ordered) {
// Ensure that new packets arrive after previous ones
ts = std::max(ts, sender->last_delivery_time_);
// A socket should not have both ordered and unordered delivery, so its last
// delivery time only needs to be updated when it has ordered delivery.
sender->last_delivery_time_ = ts;
ts = sender->UpdateOrderedDelivery(ts);
}
msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p);
}
void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
int64_t cur_time) {
while (!socket->network_.empty() &&
(socket->network_.front().done_time <= cur_time)) {
RTC_DCHECK(socket->network_size_ >= socket->network_.front().size);
socket->network_size_ -= socket->network_.front().size;
socket->network_.pop_front();
}
}
uint32_t VirtualSocketServer::SendDelay(uint32_t size) {
if (bandwidth_ == 0)
return 0;

View File

@ -151,25 +151,12 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
// socket server. Intended to be used for test assertions.
uint32_t sent_packets() const { return sent_packets_; }
// For testing purpose only. Fired when a client socket is created.
sigslot::signal1<VirtualSocket*> SignalSocketCreated;
protected:
// Returns a new IP not used before in this network.
IPAddress GetNextIP(int family);
uint16_t GetNextPort();
VirtualSocket* CreateSocketInternal(int family, int type);
// Binds the given socket to addr, assigning and IP and Port if necessary
int Bind(VirtualSocket* socket, SocketAddress* addr);
// Binds the given socket to the given (fully-defined) address.
int Bind(VirtualSocket* socket, const SocketAddress& addr);
// Find the socket bound to the given address
VirtualSocket* LookupBinding(const SocketAddress& addr);
int Unbind(const SocketAddress& addr, VirtualSocket* socket);
// Adds a mapping between this socket pair and the socket.
@ -177,13 +164,6 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
const SocketAddress& server,
VirtualSocket* socket);
// Find the socket pair corresponding to this server address.
VirtualSocket* LookupConnection(const SocketAddress& client,
const SocketAddress& server);
void RemoveConnection(const SocketAddress& client,
const SocketAddress& server);
// Connects the given socket to the socket at the given address
int Connect(VirtualSocket* socket,
const SocketAddress& remote_addr,
@ -192,6 +172,13 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
// Sends a disconnect message to the socket at the given address
bool Disconnect(VirtualSocket* socket);
// Lookup address, and disconnect corresponding socket.
bool Disconnect(const SocketAddress& addr);
// Lookup connection, close corresponding socket.
bool Disconnect(const SocketAddress& local_addr,
const SocketAddress& remote_addr);
// Sends the given packet to the socket at the given address (if one exists).
int SendUdp(VirtualSocket* socket,
const char* data,
@ -201,6 +188,44 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
// Moves as much data as possible from the sender's buffer to the network
void SendTcp(VirtualSocket* socket);
// Like above, but lookup sender by address.
void SendTcp(const SocketAddress& addr);
// Computes the number of milliseconds required to send a packet of this size.
uint32_t SendDelay(uint32_t size);
// Cancel attempts to connect to a socket that is being closed.
void CancelConnects(VirtualSocket* socket);
// Clear incoming messages for a socket that is being closed.
void Clear(VirtualSocket* socket);
void ProcessOneMessage();
void PostSignalReadEvent(VirtualSocket* socket);
// Sending was previously blocked, but now isn't.
sigslot::signal0<> SignalReadyToSend;
protected:
// Returns a new IP not used before in this network.
IPAddress GetNextIP(int family);
// Find the socket bound to the given address
VirtualSocket* LookupBinding(const SocketAddress& addr);
private:
uint16_t GetNextPort();
VirtualSocket* CreateSocketInternal(int family, int type);
// Find the socket pair corresponding to this server address.
VirtualSocket* LookupConnection(const SocketAddress& client,
const SocketAddress& server);
void RemoveConnection(const SocketAddress& client,
const SocketAddress& server);
// Places a packet on the network.
void AddPacketToNetwork(VirtualSocket* socket,
VirtualSocket* recipient,
@ -210,12 +235,6 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
size_t header_size,
bool ordered);
// Removes stale packets from the network
void PurgeNetworkPackets(VirtualSocket* socket, int64_t cur_time);
// Computes the number of milliseconds required to send a packet of this size.
uint32_t SendDelay(uint32_t size);
// If the delay has been set for the address of the socket, returns the set
// delay. Otherwise, returns a random transit delay chosen from the
// appropriate distribution.
@ -253,12 +272,6 @@ class VirtualSocketServer : public SocketServer, public sigslot::has_slots<> {
// NB: This scheme doesn't permit non-dualstack IPv6 sockets.
static bool CanInteractWith(VirtualSocket* local, VirtualSocket* remote);
private:
friend class VirtualSocket;
// Sending was previously blocked, but now isn't.
sigslot::signal0<> SignalReadyToSend;
typedef std::map<SocketAddress, VirtualSocket*> AddressMap;
typedef std::map<SocketAddressPair, VirtualSocket*> ConnectionMap;
@ -331,9 +344,31 @@ class VirtualSocket : public AsyncSocket,
int SetOption(Option opt, int value) override;
void OnMessage(Message* pmsg) override;
size_t recv_buffer_size() const { return recv_buffer_size_; }
size_t send_buffer_size() const { return send_buffer_.size(); }
const char* send_buffer_data() const { return send_buffer_.data(); }
// Used by server sockets to set the local address without binding.
void SetLocalAddress(const SocketAddress& addr);
bool was_any() { return was_any_; }
void set_was_any(bool was_any) { was_any_ = was_any; }
void SetToBlocked();
void UpdateRecv(size_t data_size);
void UpdateSend(size_t data_size);
void MaybeSignalWriteEvent(size_t capacity);
// Adds a packet to be sent. Returns delay, based on network_size_.
uint32_t AddPacket(int64_t cur_time, size_t packet_size);
int64_t UpdateOrderedDelivery(int64_t ts);
// Removes stale packets from the network. Returns current size.
size_t PurgeNetworkPackets(int64_t cur_time);
// For testing purpose only. Fired when client socket is bound to an address.
sigslot::signal2<VirtualSocket*, const SocketAddress&> SignalAddressReady;
@ -354,9 +389,6 @@ class VirtualSocket : public AsyncSocket,
int SendUdp(const void* pv, size_t cb, const SocketAddress& addr);
int SendTcp(const void* pv, size_t cb);
// Used by server sockets to set the local address without binding.
void SetLocalAddress(const SocketAddress& addr);
void OnSocketServerReadyToSend();
VirtualSocketServer* server_;
@ -402,8 +434,6 @@ class VirtualSocket : public AsyncSocket,
// Store the options that are set
OptionsMap options_map_;
friend class VirtualSocketServer;
};
} // namespace rtc