diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 5383362686..1fad92a264 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -274,6 +274,7 @@ rtc_library("peerconnection") { "../rtc_base/synchronization:sequence_checker", "../rtc_base/system:file_wrapper", "../rtc_base/system:rtc_export", + "../rtc_base/task_utils:pending_task_safety_flag", "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/base64", "../rtc_base/third_party/sigslot", diff --git a/pc/channel_manager.cc b/pc/channel_manager.cc index 84d74678b5..9d5adcad42 100644 --- a/pc/channel_manager.cc +++ b/pc/channel_manager.cc @@ -193,6 +193,9 @@ VoiceChannel* ChannelManager::CreateVoiceChannel( const webrtc::CryptoOptions& crypto_options, rtc::UniqueRandomIdGenerator* ssrc_generator, const AudioOptions& options) { + // TODO(bugs.webrtc.org/11992): Remove this workaround after updates in + // PeerConnection and add the expectation that we're already on the right + // thread. if (!worker_thread_->IsCurrent()) { return worker_thread_->Invoke(RTC_FROM_HERE, [&] { return CreateVoiceChannel(call, media_config, rtp_transport, @@ -262,6 +265,9 @@ VideoChannel* ChannelManager::CreateVideoChannel( rtc::UniqueRandomIdGenerator* ssrc_generator, const VideoOptions& options, webrtc::VideoBitrateAllocatorFactory* video_bitrate_allocator_factory) { + // TODO(bugs.webrtc.org/11992): Remove this workaround after updates in + // PeerConnection and add the expectation that we're already on the right + // thread. if (!worker_thread_->IsCurrent()) { return worker_thread_->Invoke(RTC_FROM_HERE, [&] { return CreateVideoChannel(call, media_config, rtp_transport, diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index c872550d43..df2fabc93f 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -56,6 +56,7 @@ #include "rtc_base/logging.h" #include "rtc_base/string_encode.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/metrics.h" @@ -1038,6 +1039,8 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory, local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()), data_channel_controller_(this), weak_ptr_factory_(this) { + RTC_DCHECK(factory_); + // Note: call_ appears to be set to nullptr by some callers. operations_chain_->SetOnChainEmptyCallback( [this_weak_ptr = weak_ptr_factory_.GetWeakPtr()]() { if (!this_weak_ptr) @@ -1082,6 +1085,7 @@ PeerConnection::~PeerConnection() { // call_ and event_log_ must be destroyed on the worker thread. worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); + call_safety_.reset(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -1215,25 +1219,7 @@ bool PeerConnection::Initialize( ? *configuration.crypto_options : options.crypto_options; config.transport_observer = this; - // It's safe to pass |this| and using |rtcp_invoker_| and the |call_| pointer - // since the JsepTransportController instance is owned by this PeerConnection - // instance and is destroyed before both |rtcp_invoker_| and the |call_| - // pointer. - config.rtcp_handler = [this](const rtc::CopyOnWriteBuffer& packet, - int64_t packet_time_us) { - RTC_DCHECK_RUN_ON(network_thread()); - rtcp_invoker_.AsyncInvoke( - RTC_FROM_HERE, worker_thread(), [this, packet, packet_time_us] { - RTC_DCHECK_RUN_ON(worker_thread()); - // |call_| is reset on the worker thread in the PeerConnection - // destructor, so we check that it's still valid before propagating - // the packet. - if (call_) { - call_->Receiver()->DeliverPacket(MediaType::ANY, packet, - packet_time_us); - } - }); - }; + config.rtcp_handler = InitializeRtcpCallback(); config.event_log = event_log_ptr_; #if defined(ENABLE_EXTERNAL_AUTH) config.enable_external_auth = true; @@ -3619,6 +3605,13 @@ RTCError PeerConnection::UpdateTransceiverChannel( const cricket::ContentGroup* bundle_group) { RTC_DCHECK(IsUnifiedPlan()); RTC_DCHECK(transceiver); + // TODO(bugs.webrtc.org/11992): This function always returns RTCError::OK(). + // Some of the below methods, specifically Create & Destroy, need to be called + // on the worker thread. Consider if there should be a split here where we do + // things asynchronously in two steps and change the return type of the + // function to be void. Note that in the case of 'create', that would/could + // mean that SetChannel might get called at a much later stage than it happens + // now. cricket::ChannelInterface* channel = transceiver->internal()->channel(); if (content.rejected) { if (channel) { @@ -4462,6 +4455,7 @@ void PeerConnection::Close() { worker_thread()->Invoke(RTC_FROM_HERE, [this] { RTC_DCHECK_RUN_ON(worker_thread()); + call_safety_.reset(); call_.reset(); // The event log must outlive call (and any other object that uses it). event_log_.reset(); @@ -6676,6 +6670,9 @@ cricket::VoiceChannel* PeerConnection::CreateVoiceChannel( const std::string& mid) { RtpTransportInternal* rtp_transport = GetRtpTransport(mid); + // TODO(bugs.webrtc.org/11992): CreateVoiceChannel internally switches to the + // worker thread. We shouldn't be using the |call_ptr_| hack here but simply + // be on the worker thread and use |call_| (update upstream code). cricket::VoiceChannel* voice_channel = channel_manager()->CreateVoiceChannel( call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(), mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_, @@ -6697,6 +6694,9 @@ cricket::VideoChannel* PeerConnection::CreateVideoChannel( const std::string& mid) { RtpTransportInternal* rtp_transport = GetRtpTransport(mid); + // TODO(bugs.webrtc.org/11992): CreateVideoChannel internally switches to the + // worker thread. We shouldn't be using the |call_ptr_| hack here but simply + // be on the worker thread and use |call_| (update upstream code). cricket::VideoChannel* video_channel = channel_manager()->CreateVideoChannel( call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(), mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_, video_options_, @@ -7367,6 +7367,10 @@ void PeerConnection::DestroyDataChannelTransport() { void PeerConnection::DestroyChannelInterface( cricket::ChannelInterface* channel) { + // TODO(bugs.webrtc.org/11992): All the below methods should be called on the + // worker thread. (they switch internally anyway). Change + // DestroyChannelInterface to either be called on the worker thread, or do + // this asynchronously on the worker. RTC_DCHECK(channel); switch (channel->media_type()) { case cricket::MEDIA_TYPE_AUDIO: @@ -7786,4 +7790,37 @@ RTCError PeerConnection::Rollback(SdpType desc_type) { return RTCError::OK(); } +std::function +PeerConnection::InitializeRtcpCallback() { + RTC_DCHECK_RUN_ON(signaling_thread()); + + auto flag = + worker_thread()->Invoke>( + RTC_FROM_HERE, [this] { + RTC_DCHECK_RUN_ON(worker_thread()); + if (!call_) + return rtc::scoped_refptr(); + if (!call_safety_) + call_safety_.reset(new ScopedTaskSafety()); + return call_safety_->flag(); + }); + + if (!flag) + return [](const rtc::CopyOnWriteBuffer&, int64_t) {}; + + return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet, + int64_t packet_time_us) { + RTC_DCHECK_RUN_ON(network_thread()); + // TODO(bugs.webrtc.org/11993): We should actually be delivering this call + // directly to the Call class somehow directly on the network thread and not + // incur this hop here. The DeliverPacket() method will eventually just have + // to hop back over to the network thread. + worker_thread()->PostTask(ToQueuedTask(flag, [this, packet, + packet_time_us] { + RTC_DCHECK_RUN_ON(worker_thread()); + call_->Receiver()->DeliverPacket(MediaType::ANY, packet, packet_time_us); + })); + }; +} } // namespace webrtc diff --git a/pc/peer_connection.h b/pc/peer_connection.h index 1385ce13a6..12775ba232 100644 --- a/pc/peer_connection.h +++ b/pc/peer_connection.h @@ -11,6 +11,7 @@ #ifndef PC_PEER_CONNECTION_H_ #define PC_PEER_CONNECTION_H_ +#include #include #include #include @@ -36,6 +37,7 @@ #include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/operations_chain.h" #include "rtc_base/race_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/unique_id_generator.h" #include "rtc_base/weak_ptr.h" @@ -1149,6 +1151,10 @@ class PeerConnection : public PeerConnectionInternal, // | desc_type | is the type of the description that caused the rollback. RTCError Rollback(SdpType desc_type); + std::function + InitializeRtcpCallback(); + // Storing the factory as a scoped reference pointer ensures that the memory // in the PeerConnectionFactoryImpl remains available as long as the // PeerConnection is running. It is passed to PeerConnection as a raw pointer. @@ -1234,11 +1240,13 @@ class PeerConnection : public PeerConnectionInternal, // The unique_ptr belongs to the worker thread, but the Call object manages // its own thread safety. std::unique_ptr call_ RTC_GUARDED_BY(worker_thread()); - - rtc::AsyncInvoker rtcp_invoker_ RTC_GUARDED_BY(network_thread()); + std::unique_ptr call_safety_ + RTC_GUARDED_BY(worker_thread()); // Points to the same thing as `call_`. Since it's const, we may read the // pointer from any thread. + // TODO(bugs.webrtc.org/11992): Remove this workaround (and potential dangling + // pointer). Call* const call_ptr_; std::unique_ptr stats_