Replace RecursiveCriticalSection with Mutex in EmulatedEndpointImpl

Bug: webrtc:11567
Change-Id: Ie9a1f123e7d2858c03414336875d8c537be67702
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/208403
Commit-Queue: Niels Moller <nisse@webrtc.org>
Reviewed-by: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33348}
This commit is contained in:
Niels Möller
2021-02-26 09:24:51 +01:00
committed by Commit Bot
parent 07a01d09e4
commit 376cf384ac
7 changed files with 65 additions and 37 deletions

View File

@ -222,17 +222,21 @@ class EmulatedEndpoint : public EmulatedNetworkReceiverInterface {
// |desired_port| != 0 and is free or will be the one, selected by endpoint) // |desired_port| != 0 and is free or will be the one, selected by endpoint)
// or absl::nullopt if desired_port in used. Also fails if there are no more // or absl::nullopt if desired_port in used. Also fails if there are no more
// free ports to bind to. // free ports to bind to.
//
// The Bind- and Unbind-methods must not be called from within a bound
// receiver's OnPacketReceived method.
virtual absl::optional<uint16_t> BindReceiver( virtual absl::optional<uint16_t> BindReceiver(
uint16_t desired_port, uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver) = 0; EmulatedNetworkReceiverInterface* receiver) = 0;
// Unbinds receiver from the specified port. Do nothing if no receiver was // Unbinds receiver from the specified port. Do nothing if no receiver was
// binded before. // bound before. After this method returns, no more packets can be delivered
// to the receiver, and it is safe to destroy it.
virtual void UnbindReceiver(uint16_t port) = 0; virtual void UnbindReceiver(uint16_t port) = 0;
// Binds receiver that will accept all packets which arrived on any port // Binds receiver that will accept all packets which arrived on any port
// for which there are no binded receiver. // for which there are no bound receiver.
virtual void BindDefaultReceiver( virtual void BindDefaultReceiver(
EmulatedNetworkReceiverInterface* receiver) = 0; EmulatedNetworkReceiverInterface* receiver) = 0;
// Unbinds default receiver. Do nothing if no default receiver was binded // Unbinds default receiver. Do nothing if no default receiver was bound
// before. // before.
virtual void UnbindDefaultReceiver() = 0; virtual void UnbindDefaultReceiver() = 0;
virtual rtc::IPAddress GetPeerLocalAddress() const = 0; virtual rtc::IPAddress GetPeerLocalAddress() const = 0;

View File

@ -513,7 +513,20 @@ void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver( absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
uint16_t desired_port, uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver) { EmulatedNetworkReceiverInterface* receiver) {
rtc::CritScope crit(&receiver_lock_); return BindReceiverInternal(desired_port, receiver, /*is_one_shot=*/false);
}
absl::optional<uint16_t> EmulatedEndpointImpl::BindOneShotReceiver(
uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver) {
return BindReceiverInternal(desired_port, receiver, /*is_one_shot=*/true);
}
absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiverInternal(
uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver,
bool is_one_shot) {
MutexLock lock(&receiver_lock_);
uint16_t port = desired_port; uint16_t port = desired_port;
if (port == 0) { if (port == 0) {
// Because client can specify its own port, next_port_ can be already in // Because client can specify its own port, next_port_ can be already in
@ -530,7 +543,8 @@ absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
} }
RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint " RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
<< options_.log_name << "; id=" << options_.id; << options_.log_name << "; id=" << options_.id;
bool result = port_to_receiver_.insert({port, receiver}).second; bool result =
port_to_receiver_.insert({port, {receiver, is_one_shot}}).second;
if (!result) { if (!result) {
RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
<< " in endpoint " << options_.log_name << " in endpoint " << options_.log_name
@ -553,7 +567,7 @@ uint16_t EmulatedEndpointImpl::NextPort() {
} }
void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) { void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
rtc::CritScope crit(&receiver_lock_); MutexLock lock(&receiver_lock_);
RTC_LOG(INFO) << "Receiver is removed on port " << port << " from endpoint " RTC_LOG(INFO) << "Receiver is removed on port " << port << " from endpoint "
<< options_.log_name << "; id=" << options_.id; << options_.log_name << "; id=" << options_.id;
port_to_receiver_.erase(port); port_to_receiver_.erase(port);
@ -561,7 +575,7 @@ void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
void EmulatedEndpointImpl::BindDefaultReceiver( void EmulatedEndpointImpl::BindDefaultReceiver(
EmulatedNetworkReceiverInterface* receiver) { EmulatedNetworkReceiverInterface* receiver) {
rtc::CritScope crit(&receiver_lock_); MutexLock lock(&receiver_lock_);
RTC_CHECK(!default_receiver_.has_value()) RTC_CHECK(!default_receiver_.has_value())
<< "Endpoint " << options_.log_name << "; id=" << options_.id << "Endpoint " << options_.log_name << "; id=" << options_.id
<< " already has default receiver"; << " already has default receiver";
@ -571,7 +585,7 @@ void EmulatedEndpointImpl::BindDefaultReceiver(
} }
void EmulatedEndpointImpl::UnbindDefaultReceiver() { void EmulatedEndpointImpl::UnbindDefaultReceiver() {
rtc::CritScope crit(&receiver_lock_); MutexLock lock(&receiver_lock_);
RTC_LOG(INFO) << "Default receiver is removed from endpoint " RTC_LOG(INFO) << "Default receiver is removed from endpoint "
<< options_.log_name << "; id=" << options_.id; << options_.log_name << "; id=" << options_.id;
default_receiver_ = absl::nullopt; default_receiver_ = absl::nullopt;
@ -589,7 +603,7 @@ void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
<< packet.to.ipaddr().ToString() << packet.to.ipaddr().ToString()
<< "; Receiver options_.ip=" << options_.ip.ToString(); << "; Receiver options_.ip=" << options_.ip.ToString();
} }
rtc::CritScope crit(&receiver_lock_); MutexLock lock(&receiver_lock_);
stats_builder_.OnPacketReceived(clock_->CurrentTime(), packet.from.ipaddr(), stats_builder_.OnPacketReceived(clock_->CurrentTime(), packet.from.ipaddr(),
DataSize::Bytes(packet.ip_packet_size()), DataSize::Bytes(packet.ip_packet_size()),
options_.stats_gathering_mode); options_.stats_gathering_mode);
@ -610,10 +624,14 @@ void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
options_.stats_gathering_mode); options_.stats_gathering_mode);
return; return;
} }
// Endpoint assumes frequent calls to bind and unbind methods, so it holds // Endpoint holds lock during packet processing to ensure that a call to
// lock during packet processing to ensure that receiver won't be deleted // UnbindReceiver followed by a delete of the receiver cannot race with this
// before call to OnPacketReceived. // call to OnPacketReceived.
it->second->OnPacketReceived(std::move(packet)); it->second.receiver->OnPacketReceived(std::move(packet));
if (it->second.is_one_shot) {
port_to_receiver_.erase(it);
}
} }
void EmulatedEndpointImpl::Enable() { void EmulatedEndpointImpl::Enable() {

View File

@ -30,6 +30,7 @@
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/network_constants.h" #include "rtc_base/network_constants.h"
#include "rtc_base/socket_address.h" #include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue_for_test.h" #include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -532,6 +533,11 @@ class EmulatedEndpointImpl : public EmulatedEndpoint {
absl::optional<uint16_t> BindReceiver( absl::optional<uint16_t> BindReceiver(
uint16_t desired_port, uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver) override; EmulatedNetworkReceiverInterface* receiver) override;
// Binds a receiver, and automatically removes the binding after first call to
// OnPacketReceived.
absl::optional<uint16_t> BindOneShotReceiver(
uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver);
void UnbindReceiver(uint16_t port) override; void UnbindReceiver(uint16_t port) override;
void BindDefaultReceiver(EmulatedNetworkReceiverInterface* receiver) override; void BindDefaultReceiver(EmulatedNetworkReceiverInterface* receiver) override;
void UnbindDefaultReceiver() override; void UnbindDefaultReceiver() override;
@ -550,10 +556,20 @@ class EmulatedEndpointImpl : public EmulatedEndpoint {
std::unique_ptr<EmulatedNetworkStats> stats() const; std::unique_ptr<EmulatedNetworkStats> stats() const;
private: private:
struct ReceiverBinding {
EmulatedNetworkReceiverInterface* receiver;
bool is_one_shot;
};
absl::optional<uint16_t> BindReceiverInternal(
uint16_t desired_port,
EmulatedNetworkReceiverInterface* receiver,
bool is_one_shot);
static constexpr uint16_t kFirstEphemeralPort = 49152; static constexpr uint16_t kFirstEphemeralPort = 49152;
uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_); uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_);
rtc::RecursiveCriticalSection receiver_lock_; Mutex receiver_lock_;
SequenceChecker enabled_state_checker_; SequenceChecker enabled_state_checker_;
const Options options_; const Options options_;
@ -566,7 +582,7 @@ class EmulatedEndpointImpl : public EmulatedEndpoint {
uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_); uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_);
absl::optional<EmulatedNetworkReceiverInterface*> default_receiver_ absl::optional<EmulatedNetworkReceiverInterface*> default_receiver_
RTC_GUARDED_BY(receiver_lock_); RTC_GUARDED_BY(receiver_lock_);
std::map<uint16_t, EmulatedNetworkReceiverInterface*> port_to_receiver_ std::map<uint16_t, ReceiverBinding> port_to_receiver_
RTC_GUARDED_BY(receiver_lock_); RTC_GUARDED_BY(receiver_lock_);
EmulatedNetworkStatsBuilder stats_builder_ RTC_GUARDED_BY(task_queue_); EmulatedNetworkStatsBuilder stats_builder_ RTC_GUARDED_BY(task_queue_);

View File

@ -87,7 +87,7 @@ NetworkEmulationManagerImpl::NodeBuilder() {
return SimulatedNetworkNode::Builder(this); return SimulatedNetworkNode::Builder(this);
} }
EmulatedEndpoint* NetworkEmulationManagerImpl::CreateEndpoint( EmulatedEndpointImpl* NetworkEmulationManagerImpl::CreateEndpoint(
EmulatedEndpointConfig config) { EmulatedEndpointConfig config) {
absl::optional<rtc::IPAddress> ip = config.ip; absl::optional<rtc::IPAddress> ip = config.ip;
if (!ip) { if (!ip) {
@ -109,7 +109,7 @@ EmulatedEndpoint* NetworkEmulationManagerImpl::CreateEndpoint(
auto node = std::make_unique<EmulatedEndpointImpl>( auto node = std::make_unique<EmulatedEndpointImpl>(
EmulatedEndpointImpl::Options(next_node_id_++, *ip, config), EmulatedEndpointImpl::Options(next_node_id_++, *ip, config),
config.start_as_enabled, &task_queue_, clock_); config.start_as_enabled, &task_queue_, clock_);
EmulatedEndpoint* out = node.get(); EmulatedEndpointImpl* out = node.get();
endpoints_.push_back(std::move(node)); endpoints_.push_back(std::move(node));
return out; return out;
} }
@ -226,7 +226,7 @@ TcpMessageRoute* NetworkEmulationManagerImpl::CreateTcpRoute(
CrossTrafficRoute* NetworkEmulationManagerImpl::CreateCrossTrafficRoute( CrossTrafficRoute* NetworkEmulationManagerImpl::CreateCrossTrafficRoute(
const std::vector<EmulatedNetworkNode*>& via_nodes) { const std::vector<EmulatedNetworkNode*>& via_nodes) {
RTC_CHECK(!via_nodes.empty()); RTC_CHECK(!via_nodes.empty());
EmulatedEndpoint* endpoint = CreateEndpoint(EmulatedEndpointConfig()); EmulatedEndpointImpl* endpoint = CreateEndpoint(EmulatedEndpointConfig());
// Setup a route via specified nodes. // Setup a route via specified nodes.
EmulatedNetworkNode* cur_node = via_nodes[0]; EmulatedNetworkNode* cur_node = via_nodes[0];

View File

@ -50,7 +50,7 @@ class NetworkEmulationManagerImpl : public NetworkEmulationManager {
SimulatedNetworkNode::Builder NodeBuilder() override; SimulatedNetworkNode::Builder NodeBuilder() override;
EmulatedEndpoint* CreateEndpoint(EmulatedEndpointConfig config) override; EmulatedEndpointImpl* CreateEndpoint(EmulatedEndpointConfig config) override;
void EnableEndpoint(EmulatedEndpoint* endpoint) override; void EnableEndpoint(EmulatedEndpoint* endpoint) override;
void DisableEndpoint(EmulatedEndpoint* endpoint) override; void DisableEndpoint(EmulatedEndpoint* endpoint) override;

View File

@ -29,26 +29,15 @@ class NullReceiver : public EmulatedNetworkReceiverInterface {
class ActionReceiver : public EmulatedNetworkReceiverInterface { class ActionReceiver : public EmulatedNetworkReceiverInterface {
public: public:
ActionReceiver(std::function<void()> action, EmulatedEndpoint* endpoint) explicit ActionReceiver(std::function<void()> action) : action_(action) {}
: action_(action), endpoint_(endpoint) {}
~ActionReceiver() override = default; ~ActionReceiver() override = default;
void OnPacketReceived(EmulatedIpPacket packet) override { void OnPacketReceived(EmulatedIpPacket packet) override {
RTC_DCHECK(port_);
action_(); action_();
endpoint_->UnbindReceiver(port_.value());
} }
// We can't set port in constructor, because port will be provided by
// endpoint, when this receiver will be binded to that endpoint.
void SetPort(uint16_t port) { port_ = port; }
private: private:
std::function<void()> action_; std::function<void()> action_;
// Endpoint and port will be used to free port in the endpoint after action
// will be done.
EmulatedEndpoint* endpoint_;
absl::optional<uint16_t> port_ = absl::nullopt;
}; };
} // namespace } // namespace
@ -56,7 +45,7 @@ class ActionReceiver : public EmulatedNetworkReceiverInterface {
CrossTrafficRouteImpl::CrossTrafficRouteImpl( CrossTrafficRouteImpl::CrossTrafficRouteImpl(
Clock* clock, Clock* clock,
EmulatedNetworkReceiverInterface* receiver, EmulatedNetworkReceiverInterface* receiver,
EmulatedEndpoint* endpoint) EmulatedEndpointImpl* endpoint)
: clock_(clock), receiver_(receiver), endpoint_(endpoint) { : clock_(clock), receiver_(receiver), endpoint_(endpoint) {
null_receiver_ = std::make_unique<NullReceiver>(); null_receiver_ = std::make_unique<NullReceiver>();
absl::optional<uint16_t> port = absl::optional<uint16_t> port =
@ -75,11 +64,12 @@ void CrossTrafficRouteImpl::TriggerPacketBurst(size_t num_packets,
void CrossTrafficRouteImpl::NetworkDelayedAction(size_t packet_size, void CrossTrafficRouteImpl::NetworkDelayedAction(size_t packet_size,
std::function<void()> action) { std::function<void()> action) {
auto action_receiver = std::make_unique<ActionReceiver>(action, endpoint_); auto action_receiver = std::make_unique<ActionReceiver>(action);
// BindOneShotReceiver arranges to free the port in the endpoint after the
// action is done.
absl::optional<uint16_t> port = absl::optional<uint16_t> port =
endpoint_->BindReceiver(0, action_receiver.get()); endpoint_->BindOneShotReceiver(0, action_receiver.get());
RTC_DCHECK(port); RTC_DCHECK(port);
action_receiver->SetPort(port.value());
actions_.push_back(std::move(action_receiver)); actions_.push_back(std::move(action_receiver));
SendPacket(packet_size, port.value()); SendPacket(packet_size, port.value());
} }

View File

@ -28,7 +28,7 @@ class CrossTrafficRouteImpl final : public CrossTrafficRoute {
public: public:
CrossTrafficRouteImpl(Clock* clock, CrossTrafficRouteImpl(Clock* clock,
EmulatedNetworkReceiverInterface* receiver, EmulatedNetworkReceiverInterface* receiver,
EmulatedEndpoint* endpoint); EmulatedEndpointImpl* endpoint);
~CrossTrafficRouteImpl(); ~CrossTrafficRouteImpl();
// Triggers sending of dummy packets with size |packet_size| bytes. // Triggers sending of dummy packets with size |packet_size| bytes.
@ -44,7 +44,7 @@ class CrossTrafficRouteImpl final : public CrossTrafficRoute {
Clock* const clock_; Clock* const clock_;
EmulatedNetworkReceiverInterface* const receiver_; EmulatedNetworkReceiverInterface* const receiver_;
EmulatedEndpoint* const endpoint_; EmulatedEndpointImpl* const endpoint_;
uint16_t null_receiver_port_; uint16_t null_receiver_port_;
std::unique_ptr<EmulatedNetworkReceiverInterface> null_receiver_; std::unique_ptr<EmulatedNetworkReceiverInterface> null_receiver_;