Replace use of RecursiveCriticalSection in VirtualSocketServer

Also change listen_queue_ member to use std::unique_ptr to
manage ownership.

Bug: webrtc:11567
Change-Id: I85171c9cd0253fdbcbce38b1cfebb1adb5bddd9b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/223063
Commit-Queue: Niels Moller <nisse@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34353}
This commit is contained in:
Niels Möller
2021-06-22 10:03:14 +02:00
committed by WebRTC LUCI CQ
parent fe6580fb87
commit c413c5581b
2 changed files with 25 additions and 24 deletions

View File

@ -19,7 +19,6 @@
#include "absl/algorithm/container.h" #include "absl/algorithm/container.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/fake_clock.h" #include "rtc_base/fake_clock.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/physical_socket_server.h" #include "rtc_base/physical_socket_server.h"
@ -164,7 +163,7 @@ int VirtualSocket::Close() {
} }
if (SOCK_STREAM == type_) { if (SOCK_STREAM == type_) {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
// Cancel pending sockets // Cancel pending sockets
if (listen_queue_) { if (listen_queue_) {
@ -175,7 +174,6 @@ int VirtualSocket::Close() {
server_->Disconnect(addr); server_->Disconnect(addr);
listen_queue_->pop_front(); listen_queue_->pop_front();
} }
delete listen_queue_;
listen_queue_ = nullptr; listen_queue_ = nullptr;
} }
// Disconnect stream sockets // Disconnect stream sockets
@ -234,7 +232,7 @@ int VirtualSocket::RecvFrom(void* pv,
*timestamp = -1; *timestamp = -1;
} }
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
// If we don't have a packet, then either error or wait for one to arrive. // If we don't have a packet, then either error or wait for one to arrive.
if (recv_buffer_.empty()) { if (recv_buffer_.empty()) {
if (async_) { if (async_) {
@ -277,7 +275,7 @@ int VirtualSocket::RecvFrom(void* pv,
} }
int VirtualSocket::Listen(int backlog) { int VirtualSocket::Listen(int backlog) {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(SOCK_STREAM == type_); RTC_DCHECK(SOCK_STREAM == type_);
RTC_DCHECK(CS_CLOSED == state_); RTC_DCHECK(CS_CLOSED == state_);
if (local_addr_.IsNil()) { if (local_addr_.IsNil()) {
@ -285,13 +283,13 @@ int VirtualSocket::Listen(int backlog) {
return -1; return -1;
} }
RTC_DCHECK(nullptr == listen_queue_); RTC_DCHECK(nullptr == listen_queue_);
listen_queue_ = new ListenQueue; listen_queue_ = std::make_unique<ListenQueue>();
state_ = CS_CONNECTING; state_ = CS_CONNECTING;
return 0; return 0;
} }
VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
if (nullptr == listen_queue_) { if (nullptr == listen_queue_) {
error_ = EINVAL; error_ = EINVAL;
return nullptr; return nullptr;
@ -310,7 +308,7 @@ VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
delete socket; delete socket;
continue; continue;
} }
socket->CompleteConnect(remote_addr, false); socket->CompleteConnect(remote_addr);
if (paddr) { if (paddr) {
*paddr = remote_addr; *paddr = remote_addr;
} }
@ -349,9 +347,10 @@ int VirtualSocket::SetOption(Option opt, int value) {
void VirtualSocket::OnMessage(Message* pmsg) { void VirtualSocket::OnMessage(Message* pmsg) {
bool signal_read_event = false; bool signal_read_event = false;
bool signal_close_event = false; bool signal_close_event = false;
bool signal_connect_event = false;
int error_to_signal = 0; int error_to_signal = 0;
{ {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
if (pmsg->message_id == MSG_ID_PACKET) { if (pmsg->message_id == MSG_ID_PACKET) {
RTC_DCHECK(nullptr != pmsg->pdata); RTC_DCHECK(nullptr != pmsg->pdata);
Packet* packet = static_cast<Packet*>(pmsg->pdata); Packet* packet = static_cast<Packet*>(pmsg->pdata);
@ -365,7 +364,8 @@ void VirtualSocket::OnMessage(Message* pmsg) {
listen_queue_->push_back(data->addr); listen_queue_->push_back(data->addr);
signal_read_event = async_; signal_read_event = async_;
} else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
CompleteConnect(data->addr, true); CompleteConnect(data->addr);
signal_connect_event = async_;
} else { } else {
RTC_LOG(LS_VERBOSE) RTC_LOG(LS_VERBOSE)
<< "Socket at " << local_addr_.ToString() << " is not listening"; << "Socket at " << local_addr_.ToString() << " is not listening";
@ -386,14 +386,17 @@ void VirtualSocket::OnMessage(Message* pmsg) {
RTC_NOTREACHED(); RTC_NOTREACHED();
} }
} }
// Signal events without holding `crit_`, to avoid lock order inversion with // Signal events without holding `mutex_`, to avoid recursive locking, as well
// sigslot locks. // as issues with sigslot and lock order.
if (signal_read_event) { if (signal_read_event) {
SignalReadEvent(this); SignalReadEvent(this);
} }
if (signal_close_event) { if (signal_close_event) {
SignalCloseEvent(this, error_to_signal); SignalCloseEvent(this, error_to_signal);
} }
if (signal_connect_event) {
SignalConnectEvent(this);
}
} }
int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
@ -427,14 +430,11 @@ int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
return 0; return 0;
} }
void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { void VirtualSocket::CompleteConnect(const SocketAddress& addr) {
RTC_DCHECK(CS_CONNECTING == state_); RTC_DCHECK(CS_CONNECTING == state_);
remote_addr_ = addr; remote_addr_ = addr;
state_ = CS_CONNECTED; state_ = CS_CONNECTED;
server_->AddConnection(remote_addr_, local_addr_, this); server_->AddConnection(remote_addr_, local_addr_, this);
if (async_ && notify) {
SignalConnectEvent(this);
}
} }
int VirtualSocket::SendUdp(const void* pv, int VirtualSocket::SendUdp(const void* pv,
@ -486,7 +486,7 @@ void VirtualSocket::OnSocketServerReadyToSend() {
} }
void VirtualSocket::SetToBlocked() { void VirtualSocket::SetToBlocked() {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
ready_to_send_ = false; ready_to_send_ = false;
error_ = EWOULDBLOCK; error_ = EWOULDBLOCK;
} }
@ -536,7 +536,7 @@ int64_t VirtualSocket::UpdateOrderedDelivery(int64_t ts) {
} }
size_t VirtualSocket::PurgeNetworkPackets(int64_t cur_time) { size_t VirtualSocket::PurgeNetworkPackets(int64_t cur_time) {
CritScope cs(&crit_); webrtc::MutexLock lock(&mutex_);
while (!network_.empty() && (network_.front().done_time <= cur_time)) { while (!network_.empty() && (network_.front().done_time <= cur_time)) {
RTC_DCHECK(network_size_ >= network_.front().size); RTC_DCHECK(network_size_ >= network_.front().size);

View File

@ -17,11 +17,11 @@
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/fake_clock.h" #include "rtc_base/fake_clock.h"
#include "rtc_base/message_handler.h" #include "rtc_base/message_handler.h"
#include "rtc_base/socket_server.h" #include "rtc_base/socket_server.h"
#include "rtc_base/synchronization/mutex.h"
namespace rtc { namespace rtc {
@ -394,7 +394,7 @@ class VirtualSocket : public AsyncSocket,
typedef std::map<Option, int> OptionsMap; typedef std::map<Option, int> OptionsMap;
int InitiateConnect(const SocketAddress& addr, bool use_delay); int InitiateConnect(const SocketAddress& addr, bool use_delay);
void CompleteConnect(const SocketAddress& addr, bool notify); void CompleteConnect(const SocketAddress& addr);
int SendUdp(const void* pv, size_t cb, const SocketAddress& addr); int SendUdp(const void* pv, size_t cb, const SocketAddress& addr);
int SendTcp(const void* pv, size_t cb); int SendTcp(const void* pv, size_t cb);
@ -409,7 +409,8 @@ class VirtualSocket : public AsyncSocket,
SocketAddress remote_addr_; SocketAddress remote_addr_;
// Pending sockets which can be Accepted // Pending sockets which can be Accepted
ListenQueue* listen_queue_ RTC_GUARDED_BY(crit_) RTC_PT_GUARDED_BY(crit_); std::unique_ptr<ListenQueue> listen_queue_ RTC_GUARDED_BY(mutex_)
RTC_PT_GUARDED_BY(mutex_);
// Data which tcp has buffered for sending // Data which tcp has buffered for sending
SendBuffer send_buffer_; SendBuffer send_buffer_;
@ -417,8 +418,8 @@ class VirtualSocket : public AsyncSocket,
// Set back to true when the socket can send again. // Set back to true when the socket can send again.
bool ready_to_send_ = true; bool ready_to_send_ = true;
// Critical section to protect the recv_buffer and listen_queue_ // Mutex to protect the recv_buffer and listen_queue_
RecursiveCriticalSection crit_; webrtc::Mutex mutex_;
// Network model that enforces bandwidth and capacity constraints // Network model that enforces bandwidth and capacity constraints
NetworkQueue network_; NetworkQueue network_;
@ -428,7 +429,7 @@ class VirtualSocket : public AsyncSocket,
int64_t last_delivery_time_ = 0; int64_t last_delivery_time_ = 0;
// Data which has been received from the network // Data which has been received from the network
RecvBuffer recv_buffer_ RTC_GUARDED_BY(crit_); RecvBuffer recv_buffer_ RTC_GUARDED_BY(mutex_);
// The amount of data which is in flight or in recv_buffer_ // The amount of data which is in flight or in recv_buffer_
size_t recv_buffer_size_; size_t recv_buffer_size_;