From fe328ca88a3d8732447bc00dcd8d8eceef3f3d8c Mon Sep 17 00:00:00 2001 From: Tomas Gunnarsson Date: Wed, 16 Feb 2022 20:02:12 +0100 Subject: [PATCH] Add several thread checks to RtpSender classes. Minor related updates to AudioTrack and VideoTrack's sequence checkers. There's more that can be done (or arguably needs to), but this is a start. Bug: none Change-Id: I3ccf8eb9bbb6bef62b83248a23a68871b9fcd9e1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251843 Reviewed-by: Niels Moller Commit-Queue: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#36021} --- pc/BUILD.gn | 2 ++ pc/audio_track.cc | 8 ++++---- pc/audio_track.h | 3 ++- pc/rtp_sender.cc | 25 +++++++++++++++++++++---- pc/rtp_sender.h | 29 +++++++++++++++++++++++------ pc/video_track.h | 1 + 6 files changed, 53 insertions(+), 15 deletions(-) diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 8aed865acc..277657e6a0 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -1261,6 +1261,7 @@ rtc_library("audio_track") { "../api:sequence_checker", "../rtc_base:checks", "../rtc_base:refcount", + "../rtc_base/system:no_unique_address", ] } @@ -1281,6 +1282,7 @@ rtc_library("video_track") { "../rtc_base:refcount", "../rtc_base:rtc_base_approved", "../rtc_base:threading", + "../rtc_base/system:no_unique_address", ] } diff --git a/pc/audio_track.cc b/pc/audio_track.cc index be087f693b..ae8914d634 100644 --- a/pc/audio_track.cc +++ b/pc/audio_track.cc @@ -32,7 +32,7 @@ AudioTrack::AudioTrack(const std::string& label, } AudioTrack::~AudioTrack() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); set_state(MediaStreamTrackInterface::kEnded); if (audio_source_) audio_source_->UnregisterObserver(this); @@ -48,19 +48,19 @@ AudioSourceInterface* AudioTrack::GetSource() const { } void AudioTrack::AddSink(AudioTrackSinkInterface* sink) { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); if (audio_source_) audio_source_->AddSink(sink); } void AudioTrack::RemoveSink(AudioTrackSinkInterface* sink) { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); if (audio_source_) audio_source_->RemoveSink(sink); } void AudioTrack::OnChanged() { - RTC_DCHECK_RUN_ON(&thread_checker_); + RTC_DCHECK_RUN_ON(&signaling_thread_checker_); if (audio_source_->state() == MediaSourceInterface::kEnded) { set_state(kEnded); } else { diff --git a/pc/audio_track.h b/pc/audio_track.h index 8a705cf8fb..f357391987 100644 --- a/pc/audio_track.h +++ b/pc/audio_track.h @@ -17,6 +17,7 @@ #include "api/media_stream_track.h" #include "api/scoped_refptr.h" #include "api/sequence_checker.h" +#include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -53,7 +54,7 @@ class AudioTrack : public MediaStreamTrack, private: const rtc::scoped_refptr audio_source_; - SequenceChecker thread_checker_; + RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker signaling_thread_checker_; }; } // namespace webrtc diff --git a/pc/rtp_sender.cc b/pc/rtp_sender.cc index 957f514205..110b5aae0a 100644 --- a/pc/rtp_sender.cc +++ b/pc/rtp_sender.cc @@ -111,7 +111,8 @@ bool UnimplementedRtpParameterHasValue(const RtpParameters& parameters) { RtpSenderBase::RtpSenderBase(rtc::Thread* worker_thread, const std::string& id, SetStreamsObserver* set_streams_observer) - : worker_thread_(worker_thread), + : signaling_thread_(rtc::Thread::Current()), + worker_thread_(worker_thread), id_(id), set_streams_observer_(set_streams_observer) { RTC_DCHECK(worker_thread); @@ -120,6 +121,7 @@ RtpSenderBase::RtpSenderBase(rtc::Thread* worker_thread, void RtpSenderBase::SetFrameEncryptor( rtc::scoped_refptr frame_encryptor) { + RTC_DCHECK_RUN_ON(signaling_thread_); frame_encryptor_ = std::move(frame_encryptor); // Special Case: Set the frame encryptor to any value on any existing channel. if (media_channel_ && ssrc_ && !stopped_) { @@ -136,6 +138,7 @@ void RtpSenderBase::SetMediaChannel(cricket::MediaChannel* media_channel) { } RtpParameters RtpSenderBase::GetParametersInternal() const { + RTC_DCHECK_RUN_ON(signaling_thread_); if (stopped_) { return RtpParameters(); } @@ -150,6 +153,7 @@ RtpParameters RtpSenderBase::GetParametersInternal() const { } RtpParameters RtpSenderBase::GetParameters() const { + RTC_DCHECK_RUN_ON(signaling_thread_); RtpParameters result = GetParametersInternal(); last_transaction_id_ = rtc::CreateRandomUuid(); result.transaction_id = last_transaction_id_.value(); @@ -157,6 +161,7 @@ RtpParameters RtpSenderBase::GetParameters() const { } RTCError RtpSenderBase::SetParametersInternal(const RtpParameters& parameters) { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(!stopped_); if (UnimplementedRtpParameterHasValue(parameters)) { @@ -186,6 +191,7 @@ RTCError RtpSenderBase::SetParametersInternal(const RtpParameters& parameters) { } RTCError RtpSenderBase::SetParameters(const RtpParameters& parameters) { + RTC_DCHECK_RUN_ON(signaling_thread_); TRACE_EVENT0("webrtc", "RtpSenderBase::SetParameters"); if (is_transceiver_stopped_) { LOG_AND_RETURN_ERROR( @@ -225,6 +231,7 @@ void RtpSenderBase::SetStreams(const std::vector& stream_ids) { } bool RtpSenderBase::SetTrack(MediaStreamTrackInterface* track) { + RTC_DCHECK_RUN_ON(signaling_thread_); TRACE_EVENT0("webrtc", "RtpSenderBase::SetTrack"); if (stopped_) { RTC_LOG(LS_ERROR) << "SetTrack can't be called on a stopped RtpSender."; @@ -266,6 +273,7 @@ bool RtpSenderBase::SetTrack(MediaStreamTrackInterface* track) { } void RtpSenderBase::SetSsrc(uint32_t ssrc) { + RTC_DCHECK_RUN_ON(signaling_thread_); TRACE_EVENT0("webrtc", "RtpSenderBase::SetSsrc"); if (stopped_ || ssrc == ssrc_) { return; @@ -315,6 +323,7 @@ void RtpSenderBase::SetSsrc(uint32_t ssrc) { } void RtpSenderBase::Stop() { + RTC_DCHECK_RUN_ON(signaling_thread_); TRACE_EVENT0("webrtc", "RtpSenderBase::Stop"); // TODO(deadbeef): Need to do more here to fully stop sending packets. if (stopped_) { @@ -335,6 +344,7 @@ void RtpSenderBase::Stop() { RTCError RtpSenderBase::DisableEncodingLayers( const std::vector& rids) { + RTC_DCHECK_RUN_ON(signaling_thread_); if (stopped_) { LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_STATE, "Cannot disable encodings on a stopped sender."); @@ -381,6 +391,7 @@ RTCError RtpSenderBase::DisableEncodingLayers( void RtpSenderBase::SetEncoderToPacketizerFrameTransformer( rtc::scoped_refptr frame_transformer) { + RTC_DCHECK_RUN_ON(signaling_thread_); frame_transformer_ = std::move(frame_transformer); if (media_channel_ && ssrc_ && !stopped_) { worker_thread_->Invoke(RTC_FROM_HERE, [&] { @@ -483,7 +494,7 @@ sigslot::signal0<>* AudioRtpSender::GetOnDestroyedSignal() { } void AudioRtpSender::OnChanged() { - // Running on the signaling thread. + RTC_DCHECK_RUN_ON(signaling_thread_); TRACE_EVENT0("webrtc", "AudioRtpSender::OnChanged"); RTC_DCHECK(!stopped_); if (cached_track_enabled_ != track_->enabled()) { @@ -518,10 +529,12 @@ void AudioRtpSender::RemoveTrackFromStats() { } rtc::scoped_refptr AudioRtpSender::GetDtmfSender() const { + RTC_DCHECK_RUN_ON(signaling_thread_); return dtmf_sender_proxy_; } void AudioRtpSender::SetSend() { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(!stopped_); RTC_DCHECK(can_send_track()); if (!media_channel_) { @@ -552,6 +565,7 @@ void AudioRtpSender::SetSend() { } void AudioRtpSender::ClearSend() { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(ssrc_ != 0); RTC_DCHECK(!stopped_); if (!media_channel_) { @@ -585,7 +599,7 @@ VideoRtpSender::~VideoRtpSender() { } void VideoRtpSender::OnChanged() { - // Running on the signaling thread. + RTC_DCHECK_RUN_ON(signaling_thread_); TRACE_EVENT0("webrtc", "VideoRtpSender::OnChanged"); RTC_DCHECK(!stopped_); @@ -604,11 +618,13 @@ void VideoRtpSender::AttachTrack() { } rtc::scoped_refptr VideoRtpSender::GetDtmfSender() const { - RTC_LOG(LS_ERROR) << "Tried to get DTMF sender from video sender."; + RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DLOG(LS_ERROR) << "Tried to get DTMF sender from video sender."; return nullptr; } void VideoRtpSender::SetSend() { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(!stopped_); RTC_DCHECK(can_send_track()); if (!media_channel_) { @@ -640,6 +656,7 @@ void VideoRtpSender::SetSend() { } void VideoRtpSender::ClearSend() { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(ssrc_ != 0); RTC_DCHECK(!stopped_); if (!media_channel_) { diff --git a/pc/rtp_sender.h b/pc/rtp_sender.h index 4bc16c796f..b87f8d4813 100644 --- a/pc/rtp_sender.h +++ b/pc/rtp_sender.h @@ -104,6 +104,9 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { bool SetTrack(MediaStreamTrackInterface* track) override; rtc::scoped_refptr track() const override { + // This method is currently called from the worker thread by + // RTCStatsCollector::PrepareTransceiverStatsInfosAndCallStats_s_w_n. + // RTC_DCHECK_RUN_ON(signaling_thread_); return track_; } @@ -120,9 +123,17 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { // underlying transport (this occurs if the sender isn't seen in a local // description). void SetSsrc(uint32_t ssrc) override; - uint32_t ssrc() const override { return ssrc_; } + uint32_t ssrc() const override { + // This method is currently called from the worker thread by + // RTCStatsCollector::PrepareTransceiverStatsInfosAndCallStats_s_w_n. + // RTC_DCHECK_RUN_ON(signaling_thread_); + return ssrc_; + } - std::vector stream_ids() const override { return stream_ids_; } + std::vector stream_ids() const override { + RTC_DCHECK_RUN_ON(signaling_thread_); + return stream_ids_; + } void set_stream_ids(const std::vector& stream_ids) override { stream_ids_ = stream_ids; } @@ -135,6 +146,7 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { init_parameters_.encodings = init_send_encodings; } std::vector init_send_encodings() const override { + RTC_DCHECK_RUN_ON(signaling_thread_); return init_parameters_.encodings; } @@ -143,6 +155,7 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { dtls_transport_ = dtls_transport; } rtc::scoped_refptr dtls_transport() const override { + RTC_DCHECK_RUN_ON(signaling_thread_); return dtls_transport_; } @@ -168,7 +181,10 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { void SetEncoderToPacketizerFrameTransformer( rtc::scoped_refptr frame_transformer) override; - void SetTransceiverAsStopped() override { is_transceiver_stopped_ = true; } + void SetTransceiverAsStopped() override { + RTC_DCHECK_RUN_ON(signaling_thread_); + is_transceiver_stopped_ = true; + } protected: // If `set_streams_observer` is not null, it is invoked when SetStreams() @@ -195,10 +211,11 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface { virtual void AddTrackToStats() {} virtual void RemoveTrackFromStats() {} - rtc::Thread* worker_thread_; + rtc::Thread* const signaling_thread_; + rtc::Thread* const worker_thread_; uint32_t ssrc_ = 0; - bool stopped_ = false; - bool is_transceiver_stopped_ = false; + bool stopped_ RTC_GUARDED_BY(signaling_thread_) = false; + bool is_transceiver_stopped_ RTC_GUARDED_BY(signaling_thread_) = false; int attachment_id_ = 0; const std::string id_; diff --git a/pc/video_track.h b/pc/video_track.h index 4328f188af..bf433b5f5a 100644 --- a/pc/video_track.h +++ b/pc/video_track.h @@ -21,6 +21,7 @@ #include "api/video/video_sink_interface.h" #include "api/video/video_source_interface.h" #include "media/base/video_source_base.h" +#include "rtc_base/system/no_unique_address.h" #include "rtc_base/thread.h" #include "rtc_base/thread_annotations.h"