diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn index 1a70447307..39d4d68192 100644 --- a/modules/congestion_controller/rtp/BUILD.gn +++ b/modules/congestion_controller/rtp/BUILD.gn @@ -61,6 +61,7 @@ rtc_library("transport_feedback") { "../../../rtc_base:rtc_base_approved", "../../../rtc_base/network:sent_packet", "../../../rtc_base/synchronization:mutex", + "../../../rtc_base/system:no_unique_address", "../../../system_wrappers", "../../../system_wrappers:field_trial", "../../rtp_rtcp:rtp_rtcp_format", diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc index 6ab3ad80fa..62b85b10b8 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc @@ -15,10 +15,17 @@ namespace webrtc { namespace { static const size_t kMaxPacketsInHistory = 5000; } + +TransportFeedbackDemuxer::TransportFeedbackDemuxer() { + // In case the construction thread is different from where the registration + // and callbacks occur, detach from the construction thread. + observer_checker_.Detach(); +} + void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver( std::vector ssrcs, StreamFeedbackObserver* observer) { - MutexLock lock(&observers_lock_); + RTC_DCHECK_RUN_ON(&observer_checker_); RTC_DCHECK(observer); RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { return pair.second == observer; @@ -28,7 +35,7 @@ void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver( void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver( StreamFeedbackObserver* observer) { - MutexLock lock(&observers_lock_); + RTC_DCHECK_RUN_ON(&observer_checker_); RTC_DCHECK(observer); const auto it = absl::c_find_if( observers_, [=](const auto& pair) { return pair.second == observer; }); @@ -65,14 +72,14 @@ void TransportFeedbackDemuxer::OnTransportFeedback( if (it != history_.end()) { auto packet_info = it->second; packet_info.received = packet.received(); - stream_feedbacks.push_back(packet_info); + stream_feedbacks.push_back(std::move(packet_info)); if (packet.received()) history_.erase(it); } } } - MutexLock lock(&observers_lock_); + RTC_DCHECK_RUN_ON(&observer_checker_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet_info : stream_feedbacks) { diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h index 634a37ea1a..895288f776 100644 --- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h +++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h @@ -14,14 +14,27 @@ #include #include +#include "api/sequence_checker.h" #include "modules/include/module_common_types_public.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "rtc_base/synchronization/mutex.h" +#include "rtc_base/system/no_unique_address.h" namespace webrtc { -class TransportFeedbackDemuxer : public StreamFeedbackProvider { +// Implementation of StreamFeedbackProvider that provides a way for +// implementations of StreamFeedbackObserver to register for feedback callbacks +// for a given set of SSRCs. +// Registration methods need to be called from the same execution context +// (thread or task queue) and callbacks to +// StreamFeedbackObserver::OnPacketFeedbackVector will be made in that same +// context. +// TODO(tommi): This appears to be the only implementation of this interface. +// Do we need the interface? +class TransportFeedbackDemuxer final : public StreamFeedbackProvider { public: + TransportFeedbackDemuxer(); + // Implements StreamFeedbackProvider interface void RegisterStreamFeedbackObserver( std::vector ssrcs, @@ -40,9 +53,9 @@ class TransportFeedbackDemuxer : public StreamFeedbackProvider { // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. - Mutex observers_lock_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_; std::vector, StreamFeedbackObserver*>> - observers_ RTC_GUARDED_BY(&observers_lock_); + observers_ RTC_GUARDED_BY(&observer_checker_); }; } // namespace webrtc