BasicPortAllocatorSession: migrate to TaskQueue.

Removes dependence on rtc::Thread APIs from BPAS, which removes
the need to remove callbacks via rtc::Thread::Clear().

Bug: webrtc:12840
Change-Id: I0bcc1828c5ab38f521b583f52707174961f28e8a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221366
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34239}
This commit is contained in:
Markus Handell
2021-06-07 13:37:24 +02:00
committed by WebRTC LUCI CQ
parent 637a9eebd7
commit fd89fc75cc
2 changed files with 53 additions and 60 deletions

View File

@ -12,12 +12,14 @@
#include <algorithm> #include <algorithm>
#include <functional> #include <functional>
#include <memory>
#include <set> #include <set>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/algorithm/container.h" #include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "p2p/base/basic_packet_socket_factory.h" #include "p2p/base/basic_packet_socket_factory.h"
#include "p2p/base/port.h" #include "p2p/base/port.h"
#include "p2p/base/stun_port.h" #include "p2p/base/stun_port.h"
@ -27,6 +29,7 @@
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/helpers.h" #include "rtc_base/helpers.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
#include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h" #include "system_wrappers/include/metrics.h"
@ -37,12 +40,7 @@ namespace cricket {
namespace { namespace {
enum { enum {
MSG_CONFIG_START,
MSG_CONFIG_READY,
MSG_ALLOCATE,
MSG_ALLOCATION_PHASE, MSG_ALLOCATION_PHASE,
MSG_SEQUENCEOBJECTS_CREATED,
MSG_CONFIG_STOP,
}; };
const int PHASE_UDP = 0; const int PHASE_UDP = 0;
@ -281,8 +279,6 @@ BasicPortAllocatorSession::~BasicPortAllocatorSession() {
"BasicPortAllocatorSession::~BasicPortAllocatorSession"); "BasicPortAllocatorSession::~BasicPortAllocatorSession");
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
allocator_->network_manager()->StopUpdating(); allocator_->network_manager()->StopUpdating();
if (network_thread_ != NULL)
network_thread_->Clear(this);
for (uint32_t i = 0; i < sequences_.size(); ++i) { for (uint32_t i = 0; i < sequences_.size(); ++i) {
// AllocationSequence should clear it's map entry for turn ports before // AllocationSequence should clear it's map entry for turn ports before
@ -294,8 +290,7 @@ BasicPortAllocatorSession::~BasicPortAllocatorSession() {
for (it = ports_.begin(); it != ports_.end(); it++) for (it = ports_.begin(); it != ports_.end(); it++)
delete it->port(); delete it->port();
for (uint32_t i = 0; i < configs_.size(); ++i) configs_.clear();
delete configs_[i];
for (uint32_t i = 0; i < sequences_.size(); ++i) for (uint32_t i = 0; i < sequences_.size(); ++i)
delete sequences_[i]; delete sequences_[i];
@ -375,7 +370,8 @@ void BasicPortAllocatorSession::StartGettingPorts() {
socket_factory_ = owned_socket_factory_.get(); socket_factory_ = owned_socket_factory_.get();
} }
network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START); network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this] { GetPortConfigurations(); }));
RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy " RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
<< turn_port_prune_policy_; << turn_port_prune_policy_;
@ -391,11 +387,12 @@ void BasicPortAllocatorSession::StopGettingPorts() {
void BasicPortAllocatorSession::ClearGettingPorts() { void BasicPortAllocatorSession::ClearGettingPorts() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->Clear(this, MSG_ALLOCATE); ++allocation_epoch_;
for (uint32_t i = 0; i < sequences_.size(); ++i) { for (uint32_t i = 0; i < sequences_.size(); ++i) {
sequences_[i]->Stop(); sequences_[i]->Stop();
} }
network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_STOP); network_thread_->PostTask(
webrtc::ToQueuedTask(network_safety_, [this] { OnConfigStop(); }));
state_ = SessionState::CLEARED; state_ = SessionState::CLEARED;
} }
@ -579,28 +576,6 @@ bool BasicPortAllocatorSession::CandidatesAllocationDone() const {
ports_, [](const PortData& port) { return port.inprogress(); }); ports_, [](const PortData& port) { return port.inprogress(); });
} }
void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {
switch (message->message_id) {
case MSG_CONFIG_START:
GetPortConfigurations();
break;
case MSG_CONFIG_READY:
OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
break;
case MSG_ALLOCATE:
OnAllocate();
break;
case MSG_SEQUENCEOBJECTS_CREATED:
OnAllocationSequenceObjectsCreated();
break;
case MSG_CONFIG_STOP:
OnConfigStop();
break;
default:
RTC_NOTREACHED();
}
}
void BasicPortAllocatorSession::UpdateIceParametersInternal() { void BasicPortAllocatorSession::UpdateIceParametersInternal() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
for (PortData& port : ports_) { for (PortData& port : ports_) {
@ -612,26 +587,35 @@ void BasicPortAllocatorSession::UpdateIceParametersInternal() {
void BasicPortAllocatorSession::GetPortConfigurations() { void BasicPortAllocatorSession::GetPortConfigurations() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
PortConfiguration* config = auto config = std::make_unique<PortConfiguration>(allocator_->stun_servers(),
new PortConfiguration(allocator_->stun_servers(), username(), password()); username(), password());
for (const RelayServerConfig& turn_server : allocator_->turn_servers()) { for (const RelayServerConfig& turn_server : allocator_->turn_servers()) {
config->AddRelay(turn_server); config->AddRelay(turn_server);
} }
ConfigReady(config); ConfigReady(std::move(config));
} }
void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_READY, config); ConfigReady(absl::WrapUnique(config));
}
void BasicPortAllocatorSession::ConfigReady(
std::unique_ptr<PortConfiguration> config) {
RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this, config = std::move(config)]() mutable {
OnConfigReady(std::move(config));
}));
} }
// Adds a configuration to the list. // Adds a configuration to the list.
void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { void BasicPortAllocatorSession::OnConfigReady(
std::unique_ptr<PortConfiguration> config) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
if (config) { if (config)
configs_.push_back(config); configs_.push_back(std::move(config));
}
AllocatePorts(); AllocatePorts();
} }
@ -669,11 +653,16 @@ void BasicPortAllocatorSession::OnConfigStop() {
void BasicPortAllocatorSession::AllocatePorts() { void BasicPortAllocatorSession::AllocatePorts() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
network_thread_->Post(RTC_FROM_HERE, this, MSG_ALLOCATE); network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this, allocation_epoch = allocation_epoch_] {
OnAllocate(allocation_epoch);
}));
} }
void BasicPortAllocatorSession::OnAllocate() { void BasicPortAllocatorSession::OnAllocate(int allocation_epoch) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
if (allocation_epoch != allocation_epoch_)
return;
if (network_manager_started_ && !IsStopped()) { if (network_manager_started_ && !IsStopped()) {
bool disable_equivalent_phases = true; bool disable_equivalent_phases = true;
@ -779,7 +768,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
done_signal_needed = true; done_signal_needed = true;
} else { } else {
RTC_LOG(LS_INFO) << "Allocate ports on " << networks.size() << " networks"; RTC_LOG(LS_INFO) << "Allocate ports on " << networks.size() << " networks";
PortConfiguration* config = configs_.empty() ? nullptr : configs_.back(); PortConfiguration* config =
configs_.empty() ? nullptr : configs_.back().get();
for (uint32_t i = 0; i < networks.size(); ++i) { for (uint32_t i = 0; i < networks.size(); ++i) {
uint32_t sequence_flags = flags(); uint32_t sequence_flags = flags();
if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
@ -829,7 +819,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
} }
} }
if (done_signal_needed) { if (done_signal_needed) {
network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED); network_thread_->PostTask(webrtc::ToQueuedTask(
network_safety_, [this] { OnAllocationSequenceObjectsCreated(); }));
} }
} }
@ -1663,8 +1654,6 @@ PortConfiguration::PortConfiguration(const ServerAddresses& stun_servers,
webrtc::field_trial::IsDisabled("WebRTC-UseTurnServerAsStunServer"); webrtc::field_trial::IsDisabled("WebRTC-UseTurnServerAsStunServer");
} }
PortConfiguration::~PortConfiguration() = default;
ServerAddresses PortConfiguration::StunServers() { ServerAddresses PortConfiguration::StunServers() {
if (!stun_address.IsNil() && if (!stun_address.IsNil() &&
stun_servers.find(stun_address) == stun_servers.end()) { stun_servers.find(stun_address) == stun_servers.end()) {

View File

@ -22,7 +22,9 @@
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/system/rtc_export.h" #include "rtc_base/system/rtc_export.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread.h" #include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
namespace cricket { namespace cricket {
@ -106,8 +108,9 @@ enum class SessionState {
// process will be started. // process will be started.
}; };
class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession, // This class is thread-compatible and assumes it's created, operated upon and
public rtc::MessageHandler { // destroyed on the network thread.
class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession {
public: public:
BasicPortAllocatorSession(BasicPortAllocator* allocator, BasicPortAllocatorSession(BasicPortAllocator* allocator,
const std::string& content_name, const std::string& content_name,
@ -155,10 +158,11 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession,
// Adds a port configuration that is now ready. Once we have one for each // Adds a port configuration that is now ready. Once we have one for each
// network (or a timeout occurs), we will start allocating ports. // network (or a timeout occurs), we will start allocating ports.
virtual void ConfigReady(PortConfiguration* config); void ConfigReady(std::unique_ptr<PortConfiguration> config);
// TODO(bugs.webrtc.org/12840) Remove once unused in downstream projects.
// MessageHandler. Can be overriden if message IDs do not conflict. ABSL_DEPRECATED(
void OnMessage(rtc::Message* message) override; "Use ConfigReady(std::unique_ptr<PortConfiguration>) instead!")
void ConfigReady(PortConfiguration* config);
private: private:
class PortData { class PortData {
@ -213,10 +217,10 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession,
State state_ = STATE_INPROGRESS; State state_ = STATE_INPROGRESS;
}; };
void OnConfigReady(PortConfiguration* config); void OnConfigReady(std::unique_ptr<PortConfiguration> config);
void OnConfigStop(); void OnConfigStop();
void AllocatePorts(); void AllocatePorts();
void OnAllocate(); void OnAllocate(int allocation_epoch);
void DoAllocate(bool disable_equivalent_phases); void DoAllocate(bool disable_equivalent_phases);
void OnNetworksChanged(); void OnNetworksChanged();
void OnAllocationSequenceObjectsCreated(); void OnAllocationSequenceObjectsCreated();
@ -266,7 +270,7 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession,
bool allocation_started_; bool allocation_started_;
bool network_manager_started_; bool network_manager_started_;
bool allocation_sequences_created_; bool allocation_sequences_created_;
std::vector<PortConfiguration*> configs_; std::vector<std::unique_ptr<PortConfiguration>> configs_;
std::vector<AllocationSequence*> sequences_; std::vector<AllocationSequence*> sequences_;
std::vector<PortData> ports_; std::vector<PortData> ports_;
std::vector<IceCandidateErrorEvent> candidate_error_events_; std::vector<IceCandidateErrorEvent> candidate_error_events_;
@ -274,13 +278,15 @@ class RTC_EXPORT BasicPortAllocatorSession : public PortAllocatorSession,
// Policy on how to prune turn ports, taken from the port allocator. // Policy on how to prune turn ports, taken from the port allocator.
webrtc::PortPrunePolicy turn_port_prune_policy_; webrtc::PortPrunePolicy turn_port_prune_policy_;
SessionState state_ = SessionState::CLEARED; SessionState state_ = SessionState::CLEARED;
int allocation_epoch_ RTC_GUARDED_BY(network_thread_) = 0;
webrtc::ScopedTaskSafety network_safety_;
friend class AllocationSequence; friend class AllocationSequence;
}; };
// Records configuration information useful in creating ports. // Records configuration information useful in creating ports.
// TODO(deadbeef): Rename "relay" to "turn_server" in this struct. // TODO(deadbeef): Rename "relay" to "turn_server" in this struct.
struct RTC_EXPORT PortConfiguration : public rtc::MessageData { struct RTC_EXPORT PortConfiguration {
// TODO(jiayl): remove |stun_address| when Chrome is updated. // TODO(jiayl): remove |stun_address| when Chrome is updated.
rtc::SocketAddress stun_address; rtc::SocketAddress stun_address;
ServerAddresses stun_servers; ServerAddresses stun_servers;
@ -300,8 +306,6 @@ struct RTC_EXPORT PortConfiguration : public rtc::MessageData {
const std::string& username, const std::string& username,
const std::string& password); const std::string& password);
~PortConfiguration() override;
// Returns addresses of both the explicitly configured STUN servers, // Returns addresses of both the explicitly configured STUN servers,
// and TURN servers that should be used as STUN servers. // and TURN servers that should be used as STUN servers.
ServerAddresses StunServers(); ServerAddresses StunServers();