diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc index a190fb75da..b74365d682 100644 --- a/p2p/client/basic_port_allocator.cc +++ b/p2p/client/basic_port_allocator.cc @@ -12,12 +12,14 @@ #include #include +#include #include #include #include #include #include "absl/algorithm/container.h" +#include "absl/memory/memory.h" #include "p2p/base/basic_packet_socket_factory.h" #include "p2p/base/port.h" #include "p2p/base/stun_port.h" @@ -27,6 +29,7 @@ #include "rtc_base/checks.h" #include "rtc_base/helpers.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/metrics.h" @@ -37,12 +40,7 @@ namespace cricket { namespace { enum { - MSG_CONFIG_START, - MSG_CONFIG_READY, - MSG_ALLOCATE, MSG_ALLOCATION_PHASE, - MSG_SEQUENCEOBJECTS_CREATED, - MSG_CONFIG_STOP, }; const int PHASE_UDP = 0; @@ -281,8 +279,6 @@ BasicPortAllocatorSession::~BasicPortAllocatorSession() { "BasicPortAllocatorSession::~BasicPortAllocatorSession"); RTC_DCHECK_RUN_ON(network_thread_); allocator_->network_manager()->StopUpdating(); - if (network_thread_ != NULL) - network_thread_->Clear(this); for (uint32_t i = 0; i < sequences_.size(); ++i) { // AllocationSequence should clear it's map entry for turn ports before @@ -294,8 +290,7 @@ BasicPortAllocatorSession::~BasicPortAllocatorSession() { for (it = ports_.begin(); it != ports_.end(); it++) delete it->port(); - for (uint32_t i = 0; i < configs_.size(); ++i) - delete configs_[i]; + configs_.clear(); for (uint32_t i = 0; i < sequences_.size(); ++i) delete sequences_[i]; @@ -375,7 +370,8 @@ void BasicPortAllocatorSession::StartGettingPorts() { socket_factory_ = owned_socket_factory_.get(); } - network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this] { GetPortConfigurations(); })); RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy " << turn_port_prune_policy_; @@ -391,11 +387,12 @@ void BasicPortAllocatorSession::StopGettingPorts() { void BasicPortAllocatorSession::ClearGettingPorts() { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->Clear(this, MSG_ALLOCATE); + ++allocation_epoch_; for (uint32_t i = 0; i < sequences_.size(); ++i) { sequences_[i]->Stop(); } - network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_STOP); + network_thread_->PostTask( + webrtc::ToQueuedTask(network_safety_, [this] { OnConfigStop(); })); state_ = SessionState::CLEARED; } @@ -579,28 +576,6 @@ bool BasicPortAllocatorSession::CandidatesAllocationDone() const { ports_, [](const PortData& port) { return port.inprogress(); }); } -void BasicPortAllocatorSession::OnMessage(rtc::Message* message) { - switch (message->message_id) { - case MSG_CONFIG_START: - GetPortConfigurations(); - break; - case MSG_CONFIG_READY: - OnConfigReady(static_cast(message->pdata)); - break; - case MSG_ALLOCATE: - OnAllocate(); - break; - case MSG_SEQUENCEOBJECTS_CREATED: - OnAllocationSequenceObjectsCreated(); - break; - case MSG_CONFIG_STOP: - OnConfigStop(); - break; - default: - RTC_NOTREACHED(); - } -} - void BasicPortAllocatorSession::UpdateIceParametersInternal() { RTC_DCHECK_RUN_ON(network_thread_); for (PortData& port : ports_) { @@ -612,26 +587,35 @@ void BasicPortAllocatorSession::UpdateIceParametersInternal() { void BasicPortAllocatorSession::GetPortConfigurations() { RTC_DCHECK_RUN_ON(network_thread_); - PortConfiguration* config = - new PortConfiguration(allocator_->stun_servers(), username(), password()); + auto config = std::make_unique(allocator_->stun_servers(), + username(), password()); for (const RelayServerConfig& turn_server : allocator_->turn_servers()) { config->AddRelay(turn_server); } - ConfigReady(config); + ConfigReady(std::move(config)); } void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_READY, config); + ConfigReady(absl::WrapUnique(config)); +} + +void BasicPortAllocatorSession::ConfigReady( + std::unique_ptr config) { + RTC_DCHECK_RUN_ON(network_thread_); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this, config = std::move(config)]() mutable { + OnConfigReady(std::move(config)); + })); } // Adds a configuration to the list. -void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { +void BasicPortAllocatorSession::OnConfigReady( + std::unique_ptr config) { RTC_DCHECK_RUN_ON(network_thread_); - if (config) { - configs_.push_back(config); - } + if (config) + configs_.push_back(std::move(config)); AllocatePorts(); } @@ -669,11 +653,16 @@ void BasicPortAllocatorSession::OnConfigStop() { void BasicPortAllocatorSession::AllocatePorts() { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->Post(RTC_FROM_HERE, this, MSG_ALLOCATE); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this, allocation_epoch = allocation_epoch_] { + OnAllocate(allocation_epoch); + })); } -void BasicPortAllocatorSession::OnAllocate() { +void BasicPortAllocatorSession::OnAllocate(int allocation_epoch) { RTC_DCHECK_RUN_ON(network_thread_); + if (allocation_epoch != allocation_epoch_) + return; if (network_manager_started_ && !IsStopped()) { bool disable_equivalent_phases = true; @@ -779,7 +768,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) { done_signal_needed = true; } else { RTC_LOG(LS_INFO) << "Allocate ports on " << networks.size() << " networks"; - PortConfiguration* config = configs_.empty() ? nullptr : configs_.back(); + PortConfiguration* config = + configs_.empty() ? nullptr : configs_.back().get(); for (uint32_t i = 0; i < networks.size(); ++i) { uint32_t sequence_flags = flags(); if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { @@ -829,7 +819,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) { } } if (done_signal_needed) { - network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this] { OnAllocationSequenceObjectsCreated(); })); } } @@ -1663,8 +1654,6 @@ PortConfiguration::PortConfiguration(const ServerAddresses& stun_servers, webrtc::field_trial::IsDisabled("WebRTC-UseTurnServerAsStunServer"); } -PortConfiguration::~PortConfiguration() = default; - ServerAddresses PortConfiguration::StunServers() { if (!stun_address.IsNil() && stun_servers.find(stun_address) == stun_servers.end()) { diff --git a/p2p/client/basic_port_allocator.h b/p2p/client/basic_port_allocator.h index b27016a1dc..2964daf97a 100644 --- a/p2p/client/basic_port_allocator.h +++ b/p2p/client/basic_port_allocator.h @@ -22,7 +22,9 @@ #include "rtc_base/checks.h" #include "rtc_base/network.h" #include "rtc_base/system/rtc_export.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread.h" +#include "rtc_base/thread_annotations.h" namespace cricket { @@ -106,8 +108,9 @@ enum class SessionState { // process will be started. }; -class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession, - public rtc::MessageHandler { +// This class is thread-compatible and assumes it's created, operated upon and +// destroyed on the network thread. +class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession { public: BasicPortAllocatorSession(BasicPortAllocator* allocator, const std::string& content_name, @@ -155,10 +158,11 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession, // Adds a port configuration that is now ready. Once we have one for each // network (or a timeout occurs), we will start allocating ports. - virtual void ConfigReady(PortConfiguration* config); - - // MessageHandler. Can be overriden if message IDs do not conflict. - void OnMessage(rtc::Message* message) override; + void ConfigReady(std::unique_ptr config); + // TODO(bugs.webrtc.org/12840) Remove once unused in downstream projects. + ABSL_DEPRECATED( + "Use ConfigReady(std::unique_ptr) instead!") + void ConfigReady(PortConfiguration* config); private: class PortData { @@ -213,10 +217,10 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession, State state_ = STATE_INPROGRESS; }; - void OnConfigReady(PortConfiguration* config); + void OnConfigReady(std::unique_ptr config); void OnConfigStop(); void AllocatePorts(); - void OnAllocate(); + void OnAllocate(int allocation_epoch); void DoAllocate(bool disable_equivalent_phases); void OnNetworksChanged(); void OnAllocationSequenceObjectsCreated(); @@ -266,7 +270,7 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession, bool allocation_started_; bool network_manager_started_; bool allocation_sequences_created_; - std::vector configs_; + std::vector> configs_; std::vector sequences_; std::vector ports_; std::vector candidate_error_events_; @@ -274,13 +278,15 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession, // Policy on how to prune turn ports, taken from the port allocator. webrtc::PortPrunePolicy turn_port_prune_policy_; SessionState state_ = SessionState::CLEARED; + int allocation_epoch_ RTC_GUARDED_BY(network_thread_) = 0; + webrtc::ScopedTaskSafety network_safety_; friend class AllocationSequence; }; // Records configuration information useful in creating ports. // TODO(deadbeef): Rename "relay" to "turn_server" in this struct. -struct RTC_EXPORT PortConfiguration : public rtc::MessageData { +struct RTC_EXPORT PortConfiguration { // TODO(jiayl): remove |stun_address| when Chrome is updated. rtc::SocketAddress stun_address; ServerAddresses stun_servers; @@ -300,8 +306,6 @@ struct RTC_EXPORT PortConfiguration : public rtc::MessageData { const std::string& username, const std::string& password); - ~PortConfiguration() override; - // Returns addresses of both the explicitly configured STUN servers, // and TURN servers that should be used as STUN servers. ServerAddresses StunServers();