From 6fba6b795a8c9109ef2bd5bbee56797a392b53d9 Mon Sep 17 00:00:00 2001 From: Tommi Date: Fri, 28 Jan 2022 09:00:01 +0100 Subject: [PATCH] Reland "(Un/)Subscribe RtpVideoSender for feedback on the transport queue." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a reland of 9d230d54c7eef31ac1100f0aeef1374dd1ac62fa Original change's description: > (Un/)Subscribe RtpVideoSender for feedback on the transport queue. > > * RtpVideoSender now registers/unregisters for feedback callback > inside of SetActive(), which runs on the transport queue. > * Transport feedback is given on the transport queue > * Registration/unregistration for feedback is done on the same > * Removed the last mutex from TransportFeedbackDemuxer. > > Ultimately, this work is related to moving state from the Call > class, that's related to network configuration, but due to the code > is currently written is attached to the worker thread, over to the > Transport, where it's used (e.g. suspended_video_send_ssrcs_). > > Bug: webrtc:13517, webrtc:11993 > Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165 > Reviewed-by: Erik Språng > Commit-Queue: Tomas Gunnarsson > Cr-Commit-Position: refs/heads/main@{#35777} Bug: webrtc:13517, webrtc:11993 Change-Id: I766e569abea8bae96d32267a951fcdc195ced8a7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249782 Reviewed-by: Stefan Holmer Reviewed-by: Erik Språng Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#35863} --- call/rtp_transport_controller_send.cc | 2 +- call/rtp_video_sender.cc | 17 +++-- call/rtp_video_sender.h | 1 + call/rtp_video_sender_unittest.cc | 66 +++++++++++++------ .../rtp/transport_feedback_demuxer.cc | 32 ++++----- .../rtp/transport_feedback_demuxer.h | 8 +-- 6 files changed, 75 insertions(+), 51 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index fc4f483087..230b048ce4 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -572,10 +572,10 @@ void RtpTransportControllerSend::OnAddPacket( void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - feedback_demuxer_.OnTransportFeedback(feedback); auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds()); task_queue_.PostTask([this, feedback, feedback_time]() { RTC_DCHECK_RUN_ON(&task_queue_); + feedback_demuxer_.OnTransportFeedback(feedback); absl::optional feedback_msg = transport_feedback_adapter_.ProcessTransportFeedback(feedback, feedback_time); diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc index 78cf2817b3..35e6beeb7c 100644 --- a/call/rtp_video_sender.cc +++ b/call/rtp_video_sender.cc @@ -445,9 +445,6 @@ RtpVideoSender::RtpVideoSender( fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled()); fec_controller_->SetProtectionCallback(this); - // Signal congestion controller this object is ready for OnPacket* callbacks. - transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver( - rtp_config_.ssrcs, this); // Construction happens on the worker thread (see Call::CreateVideoSendStream) // but subseqeuent calls to the RTP state will happen on one of two threads: @@ -466,8 +463,8 @@ RtpVideoSender::~RtpVideoSender() { SetActiveModulesLocked( std::vector(rtp_streams_.size(), /*active=*/false)); - transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver( - this); + + RTC_DCHECK(!registered_for_feedback_); } void RtpVideoSender::SetActive(bool active) { @@ -475,8 +472,18 @@ void RtpVideoSender::SetActive(bool active) { MutexLock lock(&mutex_); if (active_ == active) return; + const std::vector active_modules(rtp_streams_.size(), active); SetActiveModulesLocked(active_modules); + + auto* feedback_provider = transport_->GetStreamFeedbackProvider(); + if (active && !registered_for_feedback_) { + feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this); + registered_for_feedback_ = true; + } else if (!active && registered_for_feedback_) { + feedback_provider->DeRegisterStreamFeedbackObserver(this); + registered_for_feedback_ = false; + } } void RtpVideoSender::SetActiveModules(const std::vector active_modules) { diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h index ea0d1ee87d..7023804506 100644 --- a/call/rtp_video_sender.h +++ b/call/rtp_video_sender.h @@ -180,6 +180,7 @@ class RtpVideoSender : public RtpVideoSenderInterface, // transport task queue. mutable Mutex mutex_; bool active_ RTC_GUARDED_BY(mutex_); + bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false; const std::unique_ptr fec_controller_; bool fec_allowed_ RTC_GUARDED_BY(mutex_); diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 0644556e2f..c47717da7f 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include "call/rtp_transport_controller_send.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -177,10 +178,33 @@ class RtpVideoSenderTestFixture { /*frame_count_observer=*/nullptr, /*frame_transformer=*/nullptr) {} + ~RtpVideoSenderTestFixture() { SetActive(false); } + RtpVideoSender* router() { return router_.get(); } MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } + void SetActive(bool active) { + RunOnTransportQueue([&]() { router_->SetActive(active); }); + } + + void SetActiveModules(const std::vector& active_modules) { + RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); }); + } + + // Several RtpVideoSender methods expect to be called on the task queue as + // owned by the send transport. While the SequenceChecker may pick up the + // default thread as the transport queue, explicit checks for the transport + // queue (not just using a SequenceChecker) aren't possible unless such a + // queue is actually active. So RunOnTransportQueue is a convenience function + // that allow for running a closure on the transport queue, similar to + // SendTask(). + template + void RunOnTransportQueue(Closure&& task) { + transport_controller_.GetWorkerQueue()->PostTask(std::move(task)); + AdvanceTime(TimeDelta::Millis(0)); + } + private: NiceMock transport_; NiceMock encoder_feedback_; @@ -217,15 +241,15 @@ TEST(RtpVideoSenderTest, SendOnOneModule) { EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(false); + test.SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image, nullptr).error); } @@ -244,7 +268,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { CodecSpecificInfo codec_info; codec_info.codecType = kVideoCodecVP8; - test.router()->SetActive(true); + test.SetActive(true); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); @@ -254,7 +278,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) { test.router()->OnEncodedImage(encoded_image_2, &codec_info).error); // Inactive. - test.router()->SetActive(false); + test.SetActive(false); EXPECT_NE(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); EXPECT_NE(EncodedImageCallback::Result::OK, @@ -284,14 +308,14 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { // Only setting one stream to active will still set the payload router to // active and allow sending data on the active stream. std::vector active_modules({true, false}); - test.router()->SetActiveModules(active_modules); + test.SetActiveModules(active_modules); EXPECT_EQ(EncodedImageCallback::Result::OK, test.router()->OnEncodedImage(encoded_image_1, &codec_info).error); // Setting both streams to inactive will turn the payload router to // inactive. active_modules = {false, false}; - test.router()->SetActiveModules(active_modules); + test.SetActiveModules(active_modules); // An incoming encoded image will not ask the module to send outgoing data // because the payload router is inactive. EXPECT_NE(EncodedImageCallback::Result::OK, @@ -303,7 +327,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) { TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -328,7 +352,7 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, states); - test.router()->SetActive(true); + test.SetActive(true); std::map initial_states = test.router()->GetRtpPayloadStates(); @@ -368,7 +392,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { test.router()->OnEncodedImage(encoded_image, nullptr).error); ::testing::Mock::VerifyAndClearExpectations(&callback); - test.router()->SetActive(true); + test.SetActive(true); FrameCounts frame_counts; EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1)) @@ -397,7 +421,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) { TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); constexpr uint8_t kPayload = 'a'; EncodedImage encoded_image; @@ -496,8 +520,8 @@ TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) { } // This tests that we utilize transport wide feedback to retransmit lost -// packets. This is tested by dropping all ordirary packets from a "lossy" -// stream send along with an secondary untouched stream. The transport wide +// packets. This is tested by dropping all ordinary packets from a "lossy" +// stream sent along with a secondary untouched stream. The transport wide // feedback packets from the secondary stream allows the sending side to // detect and retreansmit the lost packets from the lossy stream. TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { @@ -562,7 +586,7 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) { TEST(RtpVideoSenderTest, EarlyRetransmits) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); const uint8_t kPayload[1] = {'a'}; EncodedImage encoded_image; @@ -657,7 +681,7 @@ TEST(RtpVideoSenderTest, EarlyRetransmits) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -717,7 +741,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -773,7 +797,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) { TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9NotProvidedByEncoder) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -828,7 +852,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { test::ScopedFieldTrials field_trials( "WebRTC-GenericCodecDependencyDescriptor/Enabled/"); RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -874,7 +898,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -932,7 +956,7 @@ TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) { TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) { RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {}); - test.router()->SetActive(true); + test.SetActive(true); RtpHeaderExtensionMap extensions; extensions.Register( @@ -1017,7 +1041,7 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) { kRtpHeaderSizeBytes + kTransportPacketOverheadBytes; RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {}); test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes); - test.router()->SetActive(true); + test.SetActive(true); { test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000), diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc index 29accb5be0..50987b2302 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc @@ -44,11 +44,7 @@ void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( } void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { - // Currently called on the send transport queue. - // TODO(tommi): When registration/unregistration as well as - // OnTransportFeedback callbacks occur on the transport queue, we can remove - // this lock. - MutexLock lock(&lock_); + RTC_DCHECK_RUN_ON(&observer_checker_); StreamFeedbackObserver::StreamPacketInfo info; info.ssrc = packet_info.media_ssrc; @@ -66,24 +62,22 @@ void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) { void TransportFeedbackDemuxer::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { + RTC_DCHECK_RUN_ON(&observer_checker_); + std::vector stream_feedbacks; - { - MutexLock lock(&lock_); - for (const auto& packet : feedback.GetAllPackets()) { - int64_t seq_num = - seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); - auto it = history_.find(seq_num); - if (it != history_.end()) { - auto packet_info = it->second; - packet_info.received = packet.received(); - stream_feedbacks.push_back(std::move(packet_info)); - if (packet.received()) - history_.erase(it); - } + for (const auto& packet : feedback.GetAllPackets()) { + int64_t seq_num = + seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number()); + auto it = history_.find(seq_num); + if (it != history_.end()) { + auto packet_info = it->second; + packet_info.received = packet.received(); + stream_feedbacks.push_back(std::move(packet_info)); + if (packet.received()) + history_.erase(it); } } - RTC_DCHECK_RUN_ON(&observer_checker_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet_info : stream_feedbacks) { diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h index 895288f776..7f4f5750d2 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h @@ -17,7 +17,6 @@ #include "api/sequence_checker.h" #include "modules/include/module_common_types_public.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" -#include "rtc_base/synchronization/mutex.h" #include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -45,15 +44,14 @@ class TransportFeedbackDemuxer final : public StreamFeedbackProvider { void OnTransportFeedback(const rtcp::TransportFeedback& feedback); private: - Mutex lock_; - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); + RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; + SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_); std::map history_ - RTC_GUARDED_BY(&lock_); + RTC_GUARDED_BY(&observer_checker_); // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. - RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; std::vector, StreamFeedbackObserver*>> observers_ RTC_GUARDED_BY(&observer_checker_); };