diff --git a/pc/connection_context.h b/pc/connection_context.h index 71e2f1eeae..02d08a191e 100644 --- a/pc/connection_context.h +++ b/pc/connection_context.h @@ -62,6 +62,7 @@ class ConnectionContext : public rtc::RefCountInterface { // Functions called from PeerConnection and friends SctpTransportFactoryInterface* sctp_transport_factory() const { + RTC_DCHECK_RUN_ON(signaling_thread_); return sctp_factory_.get(); } @@ -122,7 +123,8 @@ class ConnectionContext : public rtc::RefCountInterface { RTC_GUARDED_BY(signaling_thread_); std::unique_ptr media_engine_ RTC_GUARDED_BY(signaling_thread_); - std::unique_ptr const sctp_factory_; + std::unique_ptr const sctp_factory_ + RTC_GUARDED_BY(signaling_thread_); // Accessed both on signaling thread and worker thread. std::unique_ptr const trials_; }; diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc index 28ba899cb3..0ded1de84f 100644 --- a/pc/jsep_transport_controller.cc +++ b/pc/jsep_transport_controller.cc @@ -84,11 +84,13 @@ webrtc::RTCError VerifyCandidates(const cricket::Candidates& candidates) { namespace webrtc { JsepTransportController::JsepTransportController( + rtc::Thread* signaling_thread, rtc::Thread* network_thread, cricket::PortAllocator* port_allocator, AsyncResolverFactory* async_resolver_factory, Config config) - : network_thread_(network_thread), + : signaling_thread_(signaling_thread), + network_thread_(network_thread), port_allocator_(port_allocator), async_resolver_factory_(async_resolver_factory), config_(config), @@ -220,6 +222,12 @@ void JsepTransportController::SetNeedsIceRestartFlag() { bool JsepTransportController::NeedsIceRestart( const std::string& transport_name) const { + if (!network_thread_->IsCurrent()) { + RTC_DCHECK_RUN_ON(signaling_thread_); + return network_thread_->Invoke( + RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); }); + } + RTC_DCHECK_RUN_ON(network_thread_); const cricket::JsepTransport* transport = @@ -406,6 +414,11 @@ RTCError JsepTransportController::RemoveRemoteCandidates( bool JsepTransportController::GetStats(const std::string& transport_name, cricket::TransportStats* stats) { + if (!network_thread_->IsCurrent()) { + return network_thread_->Invoke( + RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); }); + } + RTC_DCHECK_RUN_ON(network_thread_); cricket::JsepTransport* transport = GetJsepTransportByName(transport_name); @@ -1181,24 +1194,35 @@ void JsepTransportController::OnTransportCandidateGathered_n( RTC_NOTREACHED(); return; } - - signal_ice_candidates_gathered_.Send( - transport->transport_name(), std::vector{candidate}); + std::string transport_name = transport->transport_name(); + // TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be + // able to just call this directly here. + invoker_.AsyncInvoke( + RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] { + signal_ice_candidates_gathered_.Send( + transport_name, std::vector{candidate}); + }); } void JsepTransportController::OnTransportCandidateError_n( cricket::IceTransportInternal* transport, const cricket::IceCandidateErrorEvent& event) { - signal_ice_candidate_error_.Send(event); + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { + signal_ice_candidate_error_.Send(event); + }); } void JsepTransportController::OnTransportCandidatesRemoved_n( cricket::IceTransportInternal* transport, const cricket::Candidates& candidates) { - signal_ice_candidates_removed_.Send(candidates); + invoker_.AsyncInvoke( + RTC_FROM_HERE, signaling_thread_, + [this, candidates] { signal_ice_candidates_removed_.Send(candidates); }); } void JsepTransportController::OnTransportCandidatePairChanged_n( const cricket::CandidatePairChangeEvent& event) { - signal_ice_candidate_pair_changed_.Send(event); + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this, event] { + signal_ice_candidate_pair_changed_.Send(event); + }); } void JsepTransportController::OnTransportRoleConflict_n( @@ -1274,7 +1298,10 @@ void JsepTransportController::UpdateAggregateStates_n() { if (ice_connection_state_ != new_connection_state) { ice_connection_state_ = new_connection_state; - signal_ice_connection_state_.Send(new_connection_state); + invoker_.AsyncInvoke( + RTC_FROM_HERE, signaling_thread_, [this, new_connection_state] { + signal_ice_connection_state_.Send(new_connection_state); + }); } // Compute the current RTCIceConnectionState as described in @@ -1330,11 +1357,17 @@ void JsepTransportController::UpdateAggregateStates_n() { new_ice_connection_state == PeerConnectionInterface::kIceConnectionCompleted) { // Ensure that we never skip over the "connected" state. - signal_standardized_ice_connection_state_.Send( - PeerConnectionInterface::kIceConnectionConnected); + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, [this] { + signal_standardized_ice_connection_state_.Send( + PeerConnectionInterface::kIceConnectionConnected); + }); } standardized_ice_connection_state_ = new_ice_connection_state; - signal_standardized_ice_connection_state_.Send(new_ice_connection_state); + invoker_.AsyncInvoke(RTC_FROM_HERE, signaling_thread_, + [this, new_ice_connection_state] { + signal_standardized_ice_connection_state_.Send( + new_ice_connection_state); + }); } // Compute the current RTCPeerConnectionState as described in @@ -1385,7 +1418,10 @@ void JsepTransportController::UpdateAggregateStates_n() { if (combined_connection_state_ != new_combined_state) { combined_connection_state_ = new_combined_state; - signal_connection_state_.Send(new_combined_state); + invoker_.AsyncInvoke( + RTC_FROM_HERE, signaling_thread_, [this, new_combined_state] { + signal_connection_state_.Send(new_combined_state); + }); } // Compute the gathering state. @@ -1398,7 +1434,10 @@ void JsepTransportController::UpdateAggregateStates_n() { } if (ice_gathering_state_ != new_gathering_state) { ice_gathering_state_ = new_gathering_state; - signal_ice_gathering_state_.Send(new_gathering_state); + invoker_.AsyncInvoke( + RTC_FROM_HERE, signaling_thread_, [this, new_gathering_state] { + signal_ice_gathering_state_.Send(new_gathering_state); + }); } } diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h index 949c9ad1dc..59d66a24f2 100644 --- a/pc/jsep_transport_controller.h +++ b/pc/jsep_transport_controller.h @@ -54,6 +54,7 @@ #include "pc/session_description.h" #include "pc/srtp_transport.h" #include "pc/transport_stats.h" +#include "rtc_base/async_invoker.h" #include "rtc_base/callback_list.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/copy_on_write_buffer.h" @@ -136,11 +137,10 @@ class JsepTransportController : public sigslot::has_slots<> { std::function on_dtls_handshake_error_; }; - // The ICE related events are fired on the |network_thread|. - // All the transport related methods are called on the |network_thread| - // and destruction of the JsepTransportController must occur on the - // |network_thread|. - JsepTransportController(rtc::Thread* network_thread, + // The ICE related events are signaled on the |signaling_thread|. + // All the transport related methods are called on the |network_thread|. + JsepTransportController(rtc::Thread* signaling_thread, + rtc::Thread* network_thread, cricket::PortAllocator* port_allocator, AsyncResolverFactory* async_resolver_factory, Config config); @@ -227,28 +227,26 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(const std::string&, const std::vector&) template void SubscribeIceCandidateGathered(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); + // TODO(bugs.webrtc.org/12427): Post this subscription to the network + // thread. signal_ice_candidates_gathered_.AddReceiver(std::forward(callback)); } // F: void(cricket::IceConnectionState) template void SubscribeIceConnectionState(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_ice_connection_state_.AddReceiver(std::forward(callback)); } // F: void(PeerConnectionInterface::PeerConnectionState) template void SubscribeConnectionState(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_connection_state_.AddReceiver(std::forward(callback)); } // F: void(PeerConnectionInterface::IceConnectionState) template void SubscribeStandardizedIceConnectionState(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_standardized_ice_connection_state_.AddReceiver( std::forward(callback)); } @@ -256,65 +254,60 @@ class JsepTransportController : public sigslot::has_slots<> { // F: void(cricket::IceGatheringState) template void SubscribeIceGatheringState(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_ice_gathering_state_.AddReceiver(std::forward(callback)); } // F: void(const cricket::IceCandidateErrorEvent&) template void SubscribeIceCandidateError(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidate_error_.AddReceiver(std::forward(callback)); } // F: void(const std::vector&) template void SubscribeIceCandidatesRemoved(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidates_removed_.AddReceiver(std::forward(callback)); } // F: void(const cricket::CandidatePairChangeEvent&) template void SubscribeIceCandidatePairChanged(F&& callback) { - RTC_DCHECK_RUN_ON(network_thread_); signal_ice_candidate_pair_changed_.AddReceiver(std::forward(callback)); } private: - // All of these callbacks are fired on the network thread. + // All of these callbacks are fired on the signaling thread. // If any transport failed => failed, // Else if all completed => completed, // Else if all connected => connected, // Else => connecting - CallbackList signal_ice_connection_state_ - RTC_GUARDED_BY(network_thread_); + CallbackList signal_ice_connection_state_; CallbackList - signal_connection_state_ RTC_GUARDED_BY(network_thread_); + signal_connection_state_; CallbackList - signal_standardized_ice_connection_state_ RTC_GUARDED_BY(network_thread_); + signal_standardized_ice_connection_state_; // If all transports done gathering => complete, // Else if any are gathering => gathering, // Else => new - CallbackList signal_ice_gathering_state_ - RTC_GUARDED_BY(network_thread_); + CallbackList signal_ice_gathering_state_; // [mid, candidates] + // TODO(bugs.webrtc.org/12427): Protect this with network_thread_. CallbackList&> - signal_ice_candidates_gathered_ RTC_GUARDED_BY(network_thread_); + signal_ice_candidates_gathered_; CallbackList - signal_ice_candidate_error_ RTC_GUARDED_BY(network_thread_); + signal_ice_candidate_error_; CallbackList&> - signal_ice_candidates_removed_ RTC_GUARDED_BY(network_thread_); + signal_ice_candidates_removed_; CallbackList - signal_ice_candidate_pair_changed_ RTC_GUARDED_BY(network_thread_); + signal_ice_candidate_pair_changed_; RTCError ApplyDescription_n(bool local, SdpType type, @@ -459,6 +452,7 @@ class JsepTransportController : public sigslot::has_slots<> { void OnDtlsHandshakeError(rtc::SSLHandshakeError error); + rtc::Thread* const signaling_thread_ = nullptr; rtc::Thread* const network_thread_ = nullptr; cricket::PortAllocator* const port_allocator_ = nullptr; AsyncResolverFactory* const async_resolver_factory_ = nullptr; @@ -496,6 +490,7 @@ class JsepTransportController : public sigslot::has_slots<> { cricket::IceRole ice_role_ = cricket::ICEROLE_CONTROLLING; uint64_t ice_tiebreaker_ = rtc::CreateRandomId64(); rtc::scoped_refptr certificate_; + rtc::AsyncInvoker invoker_; RTC_DISALLOW_COPY_AND_ASSIGN(JsepTransportController); }; diff --git a/pc/jsep_transport_controller_unittest.cc b/pc/jsep_transport_controller_unittest.cc index 0424afe876..9efa205368 100644 --- a/pc/jsep_transport_controller_unittest.cc +++ b/pc/jsep_transport_controller_unittest.cc @@ -74,6 +74,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, void CreateJsepTransportController( JsepTransportController::Config config, + rtc::Thread* signaling_thread = rtc::Thread::Current(), rtc::Thread* network_thread = rtc::Thread::Current(), cricket::PortAllocator* port_allocator = nullptr) { config.transport_observer = this; @@ -83,10 +84,9 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, config.dtls_transport_factory = fake_dtls_transport_factory_.get(); config.on_dtls_handshake_error_ = [](rtc::SSLHandshakeError s) {}; transport_controller_ = std::make_unique( - network_thread, port_allocator, nullptr /* async_resolver_factory */, - config); - network_thread->Invoke(RTC_FROM_HERE, - [&] { ConnectTransportControllerSignals(); }); + signaling_thread, network_thread, port_allocator, + nullptr /* async_resolver_factory */, config); + ConnectTransportControllerSignals(); } void ConnectTransportControllerSignals() { @@ -276,14 +276,18 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, protected: void OnConnectionState(cricket::IceConnectionState state) { - ice_signaled_on_thread_ = rtc::Thread::Current(); + if (!signaling_thread_->IsCurrent()) { + signaled_on_non_signaling_thread_ = true; + } connection_state_ = state; ++connection_state_signal_count_; } void OnStandardizedIceConnectionState( PeerConnectionInterface::IceConnectionState state) { - ice_signaled_on_thread_ = rtc::Thread::Current(); + if (!signaling_thread_->IsCurrent()) { + signaled_on_non_signaling_thread_ = true; + } ice_connection_state_ = state; ++ice_connection_state_signal_count_; } @@ -292,20 +296,26 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, PeerConnectionInterface::PeerConnectionState state) { RTC_LOG(LS_INFO) << "OnCombinedConnectionState: " << static_cast(state); - ice_signaled_on_thread_ = rtc::Thread::Current(); + if (!signaling_thread_->IsCurrent()) { + signaled_on_non_signaling_thread_ = true; + } combined_connection_state_ = state; ++combined_connection_state_signal_count_; } void OnGatheringState(cricket::IceGatheringState state) { - ice_signaled_on_thread_ = rtc::Thread::Current(); + if (!signaling_thread_->IsCurrent()) { + signaled_on_non_signaling_thread_ = true; + } gathering_state_ = state; ++gathering_state_signal_count_; } void OnCandidatesGathered(const std::string& transport_name, const Candidates& candidates) { - ice_signaled_on_thread_ = rtc::Thread::Current(); + if (!signaling_thread_->IsCurrent()) { + signaled_on_non_signaling_thread_ = true; + } candidates_[transport_name].insert(candidates_[transport_name].end(), candidates.begin(), candidates.end()); ++candidates_signal_count_; @@ -350,7 +360,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer, std::unique_ptr fake_ice_transport_factory_; std::unique_ptr fake_dtls_transport_factory_; rtc::Thread* const signaling_thread_ = nullptr; - rtc::Thread* ice_signaled_on_thread_ = nullptr; + bool signaled_on_non_signaling_thread_ = false; // Used to verify the SignalRtpTransportChanged/SignalDtlsTransportChanged are // signaled correctly. std::map changed_rtp_transport_by_mid_; @@ -873,12 +883,11 @@ TEST_F(JsepTransportControllerTest, SignalCandidatesGathered) { EXPECT_EQ(1u, candidates_[kAudioMid1].size()); } -TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) { +TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) { network_thread_ = rtc::Thread::CreateWithSocketServer(); network_thread_->Start(); - EXPECT_EQ(ice_signaled_on_thread_, nullptr); CreateJsepTransportController(JsepTransportController::Config(), - network_thread_.get(), + signaling_thread_, network_thread_.get(), /*port_allocator=*/nullptr); CreateLocalDescriptionAndCompleteConnectionOnNetworkThread(); @@ -894,7 +903,7 @@ TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) { EXPECT_EQ_WAIT(1u, candidates_[kVideoMid1].size(), kTimeout); EXPECT_EQ(2, candidates_signal_count_); - EXPECT_EQ(ice_signaled_on_thread_, network_thread_.get()); + EXPECT_TRUE(!signaled_on_non_signaling_thread_); network_thread_->Invoke(RTC_FROM_HERE, [&] { transport_controller_.reset(); }); diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index b4e4246766..f82fe35c6d 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -88,6 +88,7 @@ const char kSimulcastNumberOfEncodings[] = static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000; + uint32_t ConvertIceTransportTypeToCandidateFilter( PeerConnectionInterface::IceTransportsType type) { switch (type) { @@ -263,20 +264,6 @@ bool HasRtcpMuxEnabled(const cricket::ContentInfo* content) { return content->media_description()->rtcp_mux(); } -bool DtlsEnabled(const PeerConnectionInterface::RTCConfiguration& configuration, - const PeerConnectionFactoryInterface::Options& options, - const PeerConnectionDependencies& dependencies) { - if (options.disable_encryption) - return false; - - // Enable DTLS by default if we have an identity store or a certificate. - bool default_enabled = - (dependencies.cert_generator || !configuration.certificates.empty()); - - // The |configuration| can override the default value. - return configuration.enable_dtls_srtp.value_or(default_enabled); -} - } // namespace bool PeerConnectionInterface::RTCConfiguration::operator==( @@ -434,12 +421,11 @@ RTCErrorOr> PeerConnection::Create( bool is_unified_plan = configuration.sdp_semantics == SdpSemantics::kUnifiedPlan; - bool dtls_enabled = DtlsEnabled(configuration, options, dependencies); // The PeerConnection constructor consumes some, but not all, dependencies. rtc::scoped_refptr pc( new rtc::RefCountedObject( context, options, is_unified_plan, std::move(event_log), - std::move(call), dependencies, dtls_enabled)); + std::move(call), dependencies)); RTCError init_error = pc->Initialize(configuration, std::move(dependencies)); if (!init_error.ok()) { RTC_LOG(LS_ERROR) << "PeerConnection initialization failed"; @@ -454,8 +440,7 @@ PeerConnection::PeerConnection( bool is_unified_plan, std::unique_ptr event_log, std::unique_ptr call, - PeerConnectionDependencies& dependencies, - bool dtls_enabled) + PeerConnectionDependencies& dependencies) : context_(context), options_(options), observer_(dependencies.observer), @@ -468,17 +453,9 @@ PeerConnection::PeerConnection( tls_cert_verifier_(std::move(dependencies.tls_cert_verifier)), call_(std::move(call)), call_ptr_(call_.get()), - dtls_enabled_(dtls_enabled), data_channel_controller_(this), message_handler_(signaling_thread()), - weak_factory_(this) { - worker_thread()->Invoke(RTC_FROM_HERE, [this] { - RTC_DCHECK_RUN_ON(worker_thread()); - worker_thread_safety_ = PendingTaskSafetyFlag::Create(); - if (!call_) - worker_thread_safety_->SetNotAlive(); - }); -} + weak_factory_(this) {} PeerConnection::~PeerConnection() { TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection"); @@ -519,13 +496,15 @@ PeerConnection::~PeerConnection() { RTC_DCHECK_RUN_ON(network_thread()); transport_controller_.reset(); port_allocator_.reset(); - if (network_thread_safety_) + if (network_thread_safety_) { network_thread_safety_->SetNotAlive(); + network_thread_safety_ = nullptr; + } }); // call_ and event_log_ must be destroyed on the worker thread. worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); - worker_thread_safety_->SetNotAlive(); + call_safety_.reset(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -552,6 +531,20 @@ RTCError PeerConnection::Initialize( turn_server.turn_logging_id = configuration.turn_logging_id; } + // The port allocator lives on the network thread and should be initialized + // there. Also set up the task safety flag for canceling pending tasks on + // the network thread when closing. + // TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and + // initialize all the |transport_controller_->Subscribe*| calls below on the + // network thread via this invoke. + const auto pa_result = + network_thread()->Invoke( + RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] { + network_thread_safety_ = PendingTaskSafetyFlag::Create(); + return InitializePortAllocator_n(stun_servers, turn_servers, + configuration); + }); + // Note if STUN or TURN servers were supplied. if (!stun_servers.empty()) { NoteUsageEvent(UsageEvent::STUN_SERVER_ADDED); @@ -560,11 +553,52 @@ RTCError PeerConnection::Initialize( NoteUsageEvent(UsageEvent::TURN_SERVER_ADDED); } + // Send information about IPv4/IPv6 status. + PeerConnectionAddressFamilyCounter address_family; + if (pa_result.enable_ipv6) { + address_family = kPeerConnection_IPv6; + } else { + address_family = kPeerConnection_IPv4; + } + RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, + kPeerConnectionAddressFamilyCounter_Max); + // RFC 3264: The numeric value of the session id and version in the // o line MUST be representable with a "64 bit signed integer". // Due to this constraint session id |session_id_| is max limited to // LLONG_MAX. session_id_ = rtc::ToString(rtc::CreateRandomId64() & LLONG_MAX); + JsepTransportController::Config config; + config.redetermine_role_on_ice_restart = + configuration.redetermine_role_on_ice_restart; + config.ssl_max_version = options_.ssl_max_version; + config.disable_encryption = options_.disable_encryption; + config.bundle_policy = configuration.bundle_policy; + config.rtcp_mux_policy = configuration.rtcp_mux_policy; + // TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove + // this stub. + config.crypto_options = configuration.crypto_options.has_value() + ? *configuration.crypto_options + : options_.crypto_options; + config.transport_observer = this; + config.rtcp_handler = InitializeRtcpCallback(); + config.event_log = event_log_ptr_; +#if defined(ENABLE_EXTERNAL_AUTH) + config.enable_external_auth = true; +#endif + config.active_reset_srtp_params = configuration.active_reset_srtp_params; + + if (options_.disable_encryption) { + dtls_enabled_ = false; + } else { + // Enable DTLS by default if we have an identity store or a certificate. + dtls_enabled_ = + (dependencies.cert_generator || !configuration.certificates.empty()); + // |configuration| can override the default |dtls_enabled_| value. + if (configuration.enable_dtls_srtp) { + dtls_enabled_ = *(configuration.enable_dtls_srtp); + } + } if (configuration.enable_rtp_data_channel) { // Enable creation of RTP data channels if the kEnableRtpDataChannels is @@ -575,27 +609,77 @@ RTCError PeerConnection::Initialize( // DTLS has to be enabled to use SCTP. if (!options_.disable_sctp_data_channels && dtls_enabled_) { data_channel_controller_.set_data_channel_type(cricket::DCT_SCTP); + config.sctp_factory = context_->sctp_transport_factory(); } } - // Network thread initialization. - network_thread()->Invoke(RTC_FROM_HERE, [this, &stun_servers, - &turn_servers, &configuration, - &dependencies] { - RTC_DCHECK_RUN_ON(network_thread()); - network_thread_safety_ = PendingTaskSafetyFlag::Create(); - InitializePortAllocatorResult pa_result = - InitializePortAllocator_n(stun_servers, turn_servers, configuration); - // Send information about IPv4/IPv6 status. - PeerConnectionAddressFamilyCounter address_family = - pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4; - RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family, - kPeerConnectionAddressFamilyCounter_Max); - InitializeTransportController_n(configuration, dependencies); - }); + config.ice_transport_factory = ice_transport_factory_.get(); + config.on_dtls_handshake_error_ = + [weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) { + if (weak_ptr) { + weak_ptr->OnTransportControllerDtlsHandshakeError(s); + } + }; + + transport_controller_.reset(new JsepTransportController( + signaling_thread(), network_thread(), port_allocator_.get(), + async_resolver_factory_.get(), config)); + + // The following RTC_DCHECKs are added by looking at the caller thread. + // If this is incorrect there might not be test failures + // due to lack of unit tests which trigger these scenarios. + // TODO(bugs.webrtc.org/12160): Remove above comments. + // callbacks for signaling_thread. + // TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network + // Invoke(), then perhaps we could post these subscription calls to the + // network thread so that the transport controller doesn't have to do the + // signaling/network handling internally and use AsyncInvoker. + transport_controller_->SubscribeIceConnectionState( + [this](cricket::IceConnectionState s) { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerConnectionState(s); + }); + transport_controller_->SubscribeConnectionState( + [this](PeerConnectionInterface::PeerConnectionState s) { + RTC_DCHECK_RUN_ON(signaling_thread()); + SetConnectionState(s); + }); + transport_controller_->SubscribeStandardizedIceConnectionState( + [this](PeerConnectionInterface::IceConnectionState s) { + RTC_DCHECK_RUN_ON(signaling_thread()); + SetStandardizedIceConnectionState(s); + }); + transport_controller_->SubscribeIceGatheringState( + [this](cricket::IceGatheringState s) { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerGatheringState(s); + }); + transport_controller_->SubscribeIceCandidateGathered( + [this](const std::string& transport, + const std::vector& candidates) { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesGathered(transport, candidates); + }); + transport_controller_->SubscribeIceCandidateError( + [this](const cricket::IceCandidateErrorEvent& event) { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidateError(event); + }); + transport_controller_->SubscribeIceCandidatesRemoved( + [this](const std::vector& c) { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidatesRemoved(c); + }); + transport_controller_->SubscribeIceCandidatePairChanged( + [this](const cricket::CandidatePairChangeEvent& event) { + RTC_DCHECK_RUN_ON(signaling_thread()); + OnTransportControllerCandidateChanged(event); + }); configuration_ = configuration; + transport_controller_->SetIceConfig(ParseIceConfig(configuration)); + stats_ = std::make_unique(this); stats_collector_ = RTCStatsCollector::Create(this); @@ -632,125 +716,6 @@ RTCError PeerConnection::Initialize( return RTCError::OK(); } -void PeerConnection::InitializeTransportController_n( - const RTCConfiguration& configuration, - const PeerConnectionDependencies& dependencies) { - JsepTransportController::Config config; - config.redetermine_role_on_ice_restart = - configuration.redetermine_role_on_ice_restart; - config.ssl_max_version = options_.ssl_max_version; - config.disable_encryption = options_.disable_encryption; - config.bundle_policy = configuration.bundle_policy; - config.rtcp_mux_policy = configuration.rtcp_mux_policy; - // TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove - // this stub. - config.crypto_options = configuration.crypto_options.has_value() - ? *configuration.crypto_options - : options_.crypto_options; - config.transport_observer = this; - config.rtcp_handler = InitializeRtcpCallback(); - config.event_log = event_log_ptr_; -#if defined(ENABLE_EXTERNAL_AUTH) - config.enable_external_auth = true; -#endif - config.active_reset_srtp_params = configuration.active_reset_srtp_params; - - // DTLS has to be enabled to use SCTP. - if (!configuration.enable_rtp_data_channel && - !options_.disable_sctp_data_channels && dtls_enabled_) { - config.sctp_factory = context_->sctp_transport_factory(); - } - - config.ice_transport_factory = ice_transport_factory_.get(); - config.on_dtls_handshake_error_ = - [weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) { - if (weak_ptr) { - weak_ptr->OnTransportControllerDtlsHandshakeError(s); - } - }; - - transport_controller_.reset( - new JsepTransportController(network_thread(), port_allocator_.get(), - async_resolver_factory_.get(), config)); - - transport_controller_->SubscribeIceConnectionState( - [this](cricket::IceConnectionState s) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerConnectionState(s); - })); - }); - transport_controller_->SubscribeConnectionState( - [this](PeerConnectionInterface::PeerConnectionState s) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - SetConnectionState(s); - })); - }); - transport_controller_->SubscribeStandardizedIceConnectionState( - [this](PeerConnectionInterface::IceConnectionState s) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - SetStandardizedIceConnectionState(s); - })); - }); - transport_controller_->SubscribeIceGatheringState( - [this](cricket::IceGatheringState s) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerGatheringState(s); - })); - }); - transport_controller_->SubscribeIceCandidateGathered( - [this](const std::string& transport, - const std::vector& candidates) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), - [this, t = transport, c = candidates]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesGathered(t, c); - })); - }); - transport_controller_->SubscribeIceCandidateError( - [this](const cricket::IceCandidateErrorEvent& event) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask(ToQueuedTask( - signaling_thread_safety_.flag(), [this, event = event]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidateError(event); - })); - }); - transport_controller_->SubscribeIceCandidatesRemoved( - [this](const std::vector& c) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask( - ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidatesRemoved(c); - })); - }); - transport_controller_->SubscribeIceCandidatePairChanged( - [this](const cricket::CandidatePairChangeEvent& event) { - RTC_DCHECK_RUN_ON(network_thread()); - signaling_thread()->PostTask(ToQueuedTask( - signaling_thread_safety_.flag(), [this, event = event]() { - RTC_DCHECK_RUN_ON(signaling_thread()); - OnTransportControllerCandidateChanged(event); - })); - }); - - transport_controller_->SetIceConfig(ParseIceConfig(configuration)); -} - rtc::scoped_refptr PeerConnection::local_streams() { RTC_DCHECK_RUN_ON(signaling_thread()); RTC_CHECK(!IsUnifiedPlan()) << "local_streams is not available with Unified " @@ -1475,7 +1440,6 @@ RTCError PeerConnection::SetConfiguration( if (configuration_.active_reset_srtp_params != modified_config.active_reset_srtp_params) { - // TODO(tommi): move to the network thread - this hides an invoke. transport_controller_->SetActiveResetSrtpParams( modified_config.active_reset_srtp_params); } @@ -1630,7 +1594,6 @@ void PeerConnection::StopRtcEventLog() { rtc::scoped_refptr PeerConnection::LookupDtlsTransportByMid(const std::string& mid) { RTC_DCHECK_RUN_ON(signaling_thread()); - // TODO(tommi): Move to the network thread - this hides an invoke. return transport_controller_->LookupDtlsTransportByMid(mid); } @@ -1734,12 +1697,13 @@ void PeerConnection::Close() { port_allocator_->DiscardCandidatePool(); if (network_thread_safety_) { network_thread_safety_->SetNotAlive(); + network_thread_safety_ = nullptr; } }); worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); - worker_thread_safety_->SetNotAlive(); + call_safety_.reset(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -2180,10 +2144,7 @@ bool PeerConnection::IceRestartPending(const std::string& content_name) const { } bool PeerConnection::NeedsIceRestart(const std::string& content_name) const { - return network_thread()->Invoke(RTC_FROM_HERE, [this, &content_name] { - RTC_DCHECK_RUN_ON(network_thread()); - return transport_controller_->NeedsIceRestart(content_name); - }); + return transport_controller_->NeedsIceRestart(content_name); } void PeerConnection::OnTransportControllerConnectionState( @@ -2526,7 +2487,6 @@ void PeerConnection::OnTransportControllerGatheringState( } void PeerConnection::ReportTransportStats() { - rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls; std::map> media_types_by_transport_name; for (const auto& transceiver : rtp_manager()->transceivers()->List()) { @@ -2548,25 +2508,18 @@ void PeerConnection::ReportTransportStats() { cricket::MEDIA_TYPE_DATA); } - // Run the loop that reports the state on the network thread since the - // transport controller requires the stats to be read there (GetStats()). - network_thread()->PostTask(ToQueuedTask( - network_thread_safety_, [this, media_types_by_transport_name = std::move( - media_types_by_transport_name)] { - for (const auto& entry : media_types_by_transport_name) { - const std::string& transport_name = entry.first; - const std::set media_types = entry.second; - cricket::TransportStats stats; - if (transport_controller_->GetStats(transport_name, &stats)) { - ReportBestConnectionState(stats); - ReportNegotiatedCiphers(dtls_enabled_, stats, media_types); - } - } - })); + for (const auto& entry : media_types_by_transport_name) { + const std::string& transport_name = entry.first; + const std::set media_types = entry.second; + cricket::TransportStats stats; + if (transport_controller_->GetStats(transport_name, &stats)) { + ReportBestConnectionState(stats); + ReportNegotiatedCiphers(stats, media_types); + } + } } // Walk through the ConnectionInfos to gather best connection usage // for IPv4 and IPv6. -// static (no member state required) void PeerConnection::ReportBestConnectionState( const cricket::TransportStats& stats) { for (const cricket::TransportChannelStats& channel_stats : @@ -2614,12 +2567,10 @@ void PeerConnection::ReportBestConnectionState( } } -// static void PeerConnection::ReportNegotiatedCiphers( - bool dtls_enabled, const cricket::TransportStats& stats, const std::set& media_types) { - if (!dtls_enabled || stats.channel_stats.empty()) { + if (!dtls_enabled_ || stats.channel_stats.empty()) { return; } @@ -2770,9 +2721,24 @@ void PeerConnection::RequestUsagePatternReportForTesting() { std::function PeerConnection::InitializeRtcpCallback() { - RTC_DCHECK_RUN_ON(network_thread()); - return [this, flag = worker_thread_safety_]( - const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) { + RTC_DCHECK_RUN_ON(signaling_thread()); + + auto flag = + worker_thread()->Invoke>( + RTC_FROM_HERE, [this] { + RTC_DCHECK_RUN_ON(worker_thread()); + if (!call_) + return rtc::scoped_refptr(); + if (!call_safety_) + call_safety_.reset(new ScopedTaskSafety()); + return call_safety_->flag(); + }); + + if (!flag) + return [](const rtc::CopyOnWriteBuffer&, int64_t) {}; + + return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet, + int64_t packet_time_us) { RTC_DCHECK_RUN_ON(network_thread()); // TODO(bugs.webrtc.org/11993): We should actually be delivering this call // directly to the Call class somehow directly on the network thread and not diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 75af0ae170..92e33d2858 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -455,8 +455,7 @@ class PeerConnection : public PeerConnectionInternal, bool is_unified_plan, std::unique_ptr event_log, std::unique_ptr call, - PeerConnectionDependencies& dependencies, - bool dtls_enabled); + PeerConnectionDependencies& dependencies); ~PeerConnection() override; @@ -464,10 +463,6 @@ class PeerConnection : public PeerConnectionInternal, RTCError Initialize( const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies); - void InitializeTransportController_n( - const RTCConfiguration& configuration, - const PeerConnectionDependencies& dependencies) - RTC_RUN_ON(network_thread()); rtc::scoped_refptr> FindTransceiverBySender(rtc::scoped_refptr sender) @@ -578,12 +573,11 @@ class PeerConnection : public PeerConnectionInternal, void ReportTransportStats() RTC_RUN_ON(signaling_thread()); // Gather the usage of IPv4/IPv6 as best connection. - static void ReportBestConnectionState(const cricket::TransportStats& stats); + void ReportBestConnectionState(const cricket::TransportStats& stats); - static void ReportNegotiatedCiphers( - bool dtls_enabled, - const cricket::TransportStats& stats, - const std::set& media_types); + void ReportNegotiatedCiphers(const cricket::TransportStats& stats, + const std::set& media_types) + RTC_RUN_ON(signaling_thread()); void ReportIceCandidateCollected(const cricket::Candidate& candidate) RTC_RUN_ON(signaling_thread()); @@ -633,9 +627,8 @@ class PeerConnection : public PeerConnectionInternal, // TODO(zstein): |async_resolver_factory_| can currently be nullptr if it // is not injected. It should be required once chromium supplies it. - // This member variable is only used by JsepTransportController so we should - // consider moving ownership to there. - const std::unique_ptr async_resolver_factory_; + const std::unique_ptr async_resolver_factory_ + RTC_GUARDED_BY(signaling_thread()); std::unique_ptr port_allocator_; // TODO(bugs.webrtc.org/9987): Accessed on both // signaling and network thread. @@ -653,7 +646,8 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr call_ RTC_GUARDED_BY(worker_thread()); ScopedTaskSafety signaling_thread_safety_; rtc::scoped_refptr network_thread_safety_; - rtc::scoped_refptr worker_thread_safety_; + std::unique_ptr call_safety_ + RTC_GUARDED_BY(worker_thread()); // Points to the same thing as `call_`. Since it's const, we may read the // pointer from any thread. @@ -687,7 +681,7 @@ class PeerConnection : public PeerConnectionInternal, std::unique_ptr sdp_handler_ RTC_GUARDED_BY(signaling_thread()); - const bool dtls_enabled_; + bool dtls_enabled_ RTC_GUARDED_BY(signaling_thread()) = false; UsagePattern usage_pattern_ RTC_GUARDED_BY(signaling_thread()); bool return_histogram_very_quickly_ RTC_GUARDED_BY(signaling_thread()) = diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 8588ca8dbf..9fa4188e10 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc @@ -2794,7 +2794,7 @@ bool SdpOfferAnswerHandler::IceRestartPending( bool SdpOfferAnswerHandler::NeedsIceRestart( const std::string& content_name) const { - return pc_->NeedsIceRestart(content_name); + return transport_controller()->NeedsIceRestart(content_name); } absl::optional SdpOfferAnswerHandler::GetDtlsRole( diff --git a/test/peer_scenario/scenario_connection.cc b/test/peer_scenario/scenario_connection.cc index fefaa00c72..8e5b3162cb 100644 --- a/test/peer_scenario/scenario_connection.cc +++ b/test/peer_scenario/scenario_connection.cc @@ -97,7 +97,8 @@ ScenarioIceConnectionImpl::ScenarioIceConnectionImpl( port_allocator_( new cricket::BasicPortAllocator(manager_->network_manager())), jsep_controller_( - new JsepTransportController(network_thread_, + new JsepTransportController(signaling_thread_, + network_thread_, port_allocator_.get(), /*async_resolver_factory*/ nullptr, CreateJsepConfig())) {