diff --git a/call/adaptation/BUILD.gn b/call/adaptation/BUILD.gn index b5c72efbb1..dac6b2db5e 100644 --- a/call/adaptation/BUILD.gn +++ b/call/adaptation/BUILD.gn @@ -45,6 +45,7 @@ rtc_library("resource_adaptation") { "../../rtc_base:rtc_task_queue", "../../rtc_base/experiments:balanced_degradation_settings", "../../rtc_base/synchronization:sequence_checker", + "../../rtc_base/task_utils:to_queued_task", "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/types:optional", ] @@ -70,6 +71,7 @@ if (rtc_include_tests) { "../../api/video:video_adaptation", "../../api/video_codecs:video_codecs_api", "../../rtc_base:checks", + "../../rtc_base:gunit_helpers", "../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_task_queue", "../../rtc_base:task_queue_for_test", diff --git a/call/adaptation/resource.h b/call/adaptation/resource.h index ddc0fe855f..a58b69fed5 100644 --- a/call/adaptation/resource.h +++ b/call/adaptation/resource.h @@ -12,11 +12,8 @@ #define CALL_ADAPTATION_RESOURCE_H_ #include -#include -#include "absl/types/optional.h" #include "api/scoped_refptr.h" -#include "api/task_queue/task_queue_base.h" #include "call/adaptation/video_source_restrictions.h" #include "call/adaptation/video_stream_input_state.h" #include "rtc_base/ref_count.h" @@ -38,20 +35,18 @@ class ResourceListener { public: virtual ~ResourceListener(); - // Informs the listener of a new measurement of resource usage. This means - // that |resource->usage_state()| is now up-to-date. virtual void OnResourceUsageStateMeasured( - rtc::scoped_refptr resource) = 0; + rtc::scoped_refptr resource, + ResourceUsageState usage_state) = 0; }; -// A Resource monitors an implementation-specific system resource. It may report +// A Resource monitors an implementation-specific resource. It may report // kOveruse or kUnderuse when resource usage is high or low enough that we // should perform some sort of mitigation to fulfil the resource's constraints. // -// All methods defined in this interface, except SetResourceListener(), MUST be -// invoked on the resource adaptation task queue. +// The methods on this interface are invoked on the adaptation task queue. +// Resource usage measurements may be performed on an any task queue. // -// Usage measurements may be performed on an implementation-specific task queue. // The Resource is reference counted to prevent use-after-free when posting // between task queues. As such, the implementation MUST NOT make any // assumptions about which task queue Resource is destructed on. @@ -62,18 +57,9 @@ class Resource : public rtc::RefCountInterface { ~Resource() override; virtual std::string Name() const = 0; - // The listener MUST be informed any time UsageState() changes. + // The |listener| may be informed of resource usage measurements on any task + // queue, but not after this method is invoked with the null argument. virtual void SetResourceListener(ResourceListener* listener) = 0; - // Within a single task running on the adaptation task queue, UsageState() - // MUST return the same value every time it is called. - // TODO(https://crbug.com/webrtc/11618): Remove the UsageState() getter in - // favor of passing the use usage state directly to the ResourceListener. This - // gets rid of this strange requirement of having to return the same thing - // every time. - virtual absl::optional UsageState() const = 0; - // Invalidates current usage measurements, i.e. in response to the system load - // changing. Example: an adaptation was just applied. - virtual void ClearUsageState() = 0; }; } // namespace webrtc diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc index ed8f78ddfd..c1a9c5139e 100644 --- a/call/adaptation/resource_adaptation_processor.cc +++ b/call/adaptation/resource_adaptation_processor.cc @@ -16,10 +16,47 @@ #include "absl/algorithm/container.h" #include "rtc_base/logging.h" +#include "rtc_base/ref_counted_object.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/task_utils/to_queued_task.h" namespace webrtc { +ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate( + ResourceAdaptationProcessor* processor) + : resource_adaptation_queue_(nullptr), processor_(processor) {} + +void ResourceAdaptationProcessor::ResourceListenerDelegate:: + SetResourceAdaptationQueue(TaskQueueBase* resource_adaptation_queue) { + RTC_DCHECK(!resource_adaptation_queue_); + RTC_DCHECK(resource_adaptation_queue); + resource_adaptation_queue_ = resource_adaptation_queue; + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); +} + +void ResourceAdaptationProcessor::ResourceListenerDelegate:: + OnProcessorDestroyed() { + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); + processor_ = nullptr; +} + +void ResourceAdaptationProcessor::ResourceListenerDelegate:: + OnResourceUsageStateMeasured(rtc::scoped_refptr resource, + ResourceUsageState usage_state) { + if (!resource_adaptation_queue_->IsCurrent()) { + resource_adaptation_queue_->PostTask(ToQueuedTask( + [this_ref = rtc::scoped_refptr(this), + resource, usage_state] { + this_ref->OnResourceUsageStateMeasured(resource, usage_state); + })); + return; + } + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); + if (processor_) { + processor_->OnResourceUsageStateMeasured(resource, usage_state); + } +} + ResourceAdaptationProcessor::MitigationResultAndLogMessage:: MitigationResultAndLogMessage() : result(MitigationResult::kAdaptationApplied), message() {} @@ -31,7 +68,9 @@ ResourceAdaptationProcessor::MitigationResultAndLogMessage:: ResourceAdaptationProcessor::ResourceAdaptationProcessor( VideoStreamInputStateProvider* input_state_provider, VideoStreamEncoderObserver* encoder_stats_observer) - : sequence_checker_(), + : resource_adaptation_queue_(nullptr), + resource_listener_delegate_( + new rtc::RefCountedObject(this)), is_resource_adaptation_enabled_(false), input_state_provider_(input_state_provider), encoder_stats_observer_(encoder_stats_observer), @@ -42,12 +81,10 @@ ResourceAdaptationProcessor::ResourceAdaptationProcessor( stream_adapter_(std::make_unique()), last_reported_source_restrictions_(), previous_mitigation_results_(), - processing_in_progress_(false) { - sequence_checker_.Detach(); -} + processing_in_progress_(false) {} ResourceAdaptationProcessor::~ResourceAdaptationProcessor() { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(!is_resource_adaptation_enabled_); RTC_DCHECK(restrictions_listeners_.empty()) << "There are restrictions listener(s) depending on a " @@ -61,38 +98,43 @@ ResourceAdaptationProcessor::~ResourceAdaptationProcessor() { RTC_DCHECK(adaptation_listeners_.empty()) << "There are listener(s) attached to a ResourceAdaptationProcessor " << "being destroyed."; + resource_listener_delegate_->OnProcessorDestroyed(); } -void ResourceAdaptationProcessor::InitializeOnResourceAdaptationQueue() { - // Allows |sequence_checker_| to attach to the resource adaptation queue. - // The caller is responsible for ensuring that this is the current queue. - RTC_DCHECK_RUN_ON(&sequence_checker_); +void ResourceAdaptationProcessor::SetResourceAdaptationQueue( + TaskQueueBase* resource_adaptation_queue) { + RTC_DCHECK(!resource_adaptation_queue_); + RTC_DCHECK(resource_adaptation_queue); + resource_adaptation_queue_ = resource_adaptation_queue; + resource_listener_delegate_->SetResourceAdaptationQueue( + resource_adaptation_queue); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); } DegradationPreference ResourceAdaptationProcessor::degradation_preference() const { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); return degradation_preference_; } DegradationPreference ResourceAdaptationProcessor::effective_degradation_preference() const { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); return effective_degradation_preference_; } void ResourceAdaptationProcessor::StartResourceAdaptation() { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); if (is_resource_adaptation_enabled_) return; for (const auto& resource : resources_) { - resource->SetResourceListener(this); + resource->SetResourceListener(resource_listener_delegate_); } is_resource_adaptation_enabled_ = true; } void ResourceAdaptationProcessor::StopResourceAdaptation() { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); if (!is_resource_adaptation_enabled_) return; for (const auto& resource : resources_) { @@ -103,7 +145,7 @@ void ResourceAdaptationProcessor::StopResourceAdaptation() { void ResourceAdaptationProcessor::AddRestrictionsListener( VideoSourceRestrictionsListener* restrictions_listener) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(std::find(restrictions_listeners_.begin(), restrictions_listeners_.end(), restrictions_listener) == restrictions_listeners_.end()); @@ -112,7 +154,7 @@ void ResourceAdaptationProcessor::AddRestrictionsListener( void ResourceAdaptationProcessor::RemoveRestrictionsListener( VideoSourceRestrictionsListener* restrictions_listener) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); auto it = std::find(restrictions_listeners_.begin(), restrictions_listeners_.end(), restrictions_listener); RTC_DCHECK(it != restrictions_listeners_.end()); @@ -121,7 +163,7 @@ void ResourceAdaptationProcessor::RemoveRestrictionsListener( void ResourceAdaptationProcessor::AddResource( rtc::scoped_refptr resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); // TODO(hbos): Allow adding resources while |is_resource_adaptation_enabled_| // by registering as a listener of the resource on adding it. RTC_DCHECK(!is_resource_adaptation_enabled_); @@ -132,7 +174,7 @@ void ResourceAdaptationProcessor::AddResource( void ResourceAdaptationProcessor::RemoveResource( rtc::scoped_refptr resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); // TODO(hbos): Allow removing resources while // |is_resource_adaptation_enabled_| by unregistering as a listener of the // resource on removing it. @@ -144,7 +186,7 @@ void ResourceAdaptationProcessor::RemoveResource( void ResourceAdaptationProcessor::AddAdaptationConstraint( AdaptationConstraint* adaptation_constraint) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(std::find(adaptation_constraints_.begin(), adaptation_constraints_.end(), adaptation_constraint) == adaptation_constraints_.end()); @@ -153,7 +195,7 @@ void ResourceAdaptationProcessor::AddAdaptationConstraint( void ResourceAdaptationProcessor::RemoveAdaptationConstraint( AdaptationConstraint* adaptation_constraint) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); auto it = std::find(adaptation_constraints_.begin(), adaptation_constraints_.end(), adaptation_constraint); RTC_DCHECK(it != adaptation_constraints_.end()); @@ -162,7 +204,7 @@ void ResourceAdaptationProcessor::RemoveAdaptationConstraint( void ResourceAdaptationProcessor::AddAdaptationListener( AdaptationListener* adaptation_listener) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(std::find(adaptation_listeners_.begin(), adaptation_listeners_.end(), adaptation_listener) == adaptation_listeners_.end()); @@ -171,7 +213,7 @@ void ResourceAdaptationProcessor::AddAdaptationListener( void ResourceAdaptationProcessor::RemoveAdaptationListener( AdaptationListener* adaptation_listener) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); auto it = std::find(adaptation_listeners_.begin(), adaptation_listeners_.end(), adaptation_listener); RTC_DCHECK(it != adaptation_listeners_.end()); @@ -180,19 +222,19 @@ void ResourceAdaptationProcessor::RemoveAdaptationListener( void ResourceAdaptationProcessor::SetDegradationPreference( DegradationPreference degradation_preference) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); degradation_preference_ = degradation_preference; MaybeUpdateEffectiveDegradationPreference(); } void ResourceAdaptationProcessor::SetIsScreenshare(bool is_screenshare) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); is_screenshare_ = is_screenshare; MaybeUpdateEffectiveDegradationPreference(); } void ResourceAdaptationProcessor::MaybeUpdateEffectiveDegradationPreference() { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); effective_degradation_preference_ = (is_screenshare_ && degradation_preference_ == DegradationPreference::BALANCED) @@ -203,7 +245,7 @@ void ResourceAdaptationProcessor::MaybeUpdateEffectiveDegradationPreference() { } void ResourceAdaptationProcessor::ResetVideoSourceRestrictions() { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_LOG(INFO) << "Resetting restrictions"; stream_adapter_->ClearRestrictions(); adaptations_counts_by_resource_.clear(); @@ -212,7 +254,7 @@ void ResourceAdaptationProcessor::ResetVideoSourceRestrictions() { void ResourceAdaptationProcessor::MaybeUpdateVideoSourceRestrictions( rtc::scoped_refptr reason) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); VideoSourceRestrictions new_source_restrictions = FilterRestrictionsByDegradationPreference( stream_adapter_->source_restrictions(), @@ -235,10 +277,9 @@ void ResourceAdaptationProcessor::MaybeUpdateVideoSourceRestrictions( } void ResourceAdaptationProcessor::OnResourceUsageStateMeasured( - rtc::scoped_refptr resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); - RTC_DCHECK(resource->UsageState().has_value()); - ResourceUsageState usage_state = resource->UsageState().value(); + rtc::scoped_refptr resource, + ResourceUsageState usage_state) { + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); MitigationResultAndLogMessage result_and_message; switch (usage_state) { case ResourceUsageState::kOveruse: @@ -269,7 +310,7 @@ void ResourceAdaptationProcessor::OnResourceUsageStateMeasured( bool ResourceAdaptationProcessor::HasSufficientInputForAdaptation( const VideoStreamInputState& input_state) const { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); return input_state.HasInputFrameSizeAndFramesPerSecond() && (effective_degradation_preference_ != DegradationPreference::MAINTAIN_RESOLUTION || @@ -279,16 +320,9 @@ bool ResourceAdaptationProcessor::HasSufficientInputForAdaptation( ResourceAdaptationProcessor::MitigationResultAndLogMessage ResourceAdaptationProcessor::OnResourceUnderuse( rtc::scoped_refptr reason_resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(!processing_in_progress_); processing_in_progress_ = true; - // Clear all usage states. In order to re-run adaptation logic, resources need - // to provide new resource usage measurements. - // TODO(hbos): Support not unconditionally clearing usage states by having the - // ResourceAdaptationProcessor check in on its resources at certain intervals. - for (const auto& resource : resources_) { - resource->ClearUsageState(); - } if (effective_degradation_preference_ == DegradationPreference::DISABLED) { processing_in_progress_ = false; return MitigationResultAndLogMessage( @@ -358,16 +392,9 @@ ResourceAdaptationProcessor::OnResourceUnderuse( ResourceAdaptationProcessor::MitigationResultAndLogMessage ResourceAdaptationProcessor::OnResourceOveruse( rtc::scoped_refptr reason_resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(!processing_in_progress_); processing_in_progress_ = true; - // Clear all usage states. In order to re-run adaptation logic, resources need - // to provide new resource usage measurements. - // TODO(hbos): Support not unconditionally clearing usage states by having the - // ResourceAdaptationProcessor check in on its resources at certain intervals. - for (const auto& resource : resources_) { - resource->ClearUsageState(); - } if (effective_degradation_preference_ == DegradationPreference::DISABLED) { processing_in_progress_ = false; return MitigationResultAndLogMessage( @@ -419,7 +446,7 @@ ResourceAdaptationProcessor::OnResourceOveruse( void ResourceAdaptationProcessor::TriggerAdaptationDueToFrameDroppedDueToSize( rtc::scoped_refptr reason_resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_LOG(INFO) << "TriggerAdaptationDueToFrameDroppedDueToSize called"; VideoAdaptationCounters counters_before = stream_adapter_->adaptation_counters(); @@ -439,7 +466,7 @@ void ResourceAdaptationProcessor::TriggerAdaptationDueToFrameDroppedDueToSize( void ResourceAdaptationProcessor::UpdateResourceDegradationCounts( rtc::scoped_refptr resource) { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(resource); int delta = stream_adapter_->adaptation_counters().Total(); for (const auto& adaptations : adaptations_counts_by_resource_) { @@ -453,7 +480,7 @@ void ResourceAdaptationProcessor::UpdateResourceDegradationCounts( bool ResourceAdaptationProcessor::IsResourceAllowedToAdaptUp( rtc::scoped_refptr resource) const { - RTC_DCHECK_RUN_ON(&sequence_checker_); + RTC_DCHECK_RUN_ON(resource_adaptation_queue_); RTC_DCHECK(resource); const auto& adaptations = adaptations_counts_by_resource_.find(resource); return adaptations != adaptations_counts_by_resource_.end() && diff --git a/call/adaptation/resource_adaptation_processor.h b/call/adaptation/resource_adaptation_processor.h index 7988439002..f052993df1 100644 --- a/call/adaptation/resource_adaptation_processor.h +++ b/call/adaptation/resource_adaptation_processor.h @@ -19,6 +19,7 @@ #include "absl/types/optional.h" #include "api/rtp_parameters.h" #include "api/scoped_refptr.h" +#include "api/task_queue/task_queue_base.h" #include "api/video/video_frame.h" #include "api/video/video_stream_encoder_observer.h" #include "call/adaptation/adaptation_constraint.h" @@ -29,7 +30,6 @@ #include "call/adaptation/video_stream_adapter.h" #include "call/adaptation/video_stream_input_state.h" #include "call/adaptation/video_stream_input_state_provider.h" -#include "rtc_base/synchronization/sequence_checker.h" namespace webrtc { @@ -57,7 +57,8 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, VideoStreamEncoderObserver* encoder_stats_observer); ~ResourceAdaptationProcessor() override; - void InitializeOnResourceAdaptationQueue() override; + void SetResourceAdaptationQueue( + TaskQueueBase* resource_adaptation_queue) override; // ResourceAdaptationProcessorInterface implementation. DegradationPreference degradation_preference() const override; @@ -86,8 +87,8 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, // ResourceListener implementation. // Triggers OnResourceUnderuse() or OnResourceOveruse(). - void OnResourceUsageStateMeasured( - rtc::scoped_refptr resource) override; + void OnResourceUsageStateMeasured(rtc::scoped_refptr resource, + ResourceUsageState usage_state) override; // May trigger 1-2 adaptations. It is meant to reduce resolution but this is // not guaranteed. It may adapt frame rate, which does not address the issue. @@ -99,6 +100,27 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, bool HasSufficientInputForAdaptation( const VideoStreamInputState& input_state) const; + // If resource usage measurements happens off the adaptation task queue, this + // class takes care of posting the measurement for the processor to handle it + // on the adaptation task queue. + class ResourceListenerDelegate : public rtc::RefCountInterface, + public ResourceListener { + public: + explicit ResourceListenerDelegate(ResourceAdaptationProcessor* processor); + + void SetResourceAdaptationQueue(TaskQueueBase* resource_adaptation_queue); + void OnProcessorDestroyed(); + + // ResourceListener implementation. + void OnResourceUsageStateMeasured(rtc::scoped_refptr resource, + ResourceUsageState usage_state) override; + + private: + TaskQueueBase* resource_adaptation_queue_; + ResourceAdaptationProcessor* processor_ + RTC_GUARDED_BY(resource_adaptation_queue_); + }; + enum class MitigationResult { kDisabled, kInsufficientInput, @@ -141,39 +163,41 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, // restrictions rather than just the counters. bool IsResourceAllowedToAdaptUp(rtc::scoped_refptr resource) const; - webrtc::SequenceChecker sequence_checker_; - bool is_resource_adaptation_enabled_ RTC_GUARDED_BY(sequence_checker_); + TaskQueueBase* resource_adaptation_queue_; + rtc::scoped_refptr resource_listener_delegate_; + bool is_resource_adaptation_enabled_ + RTC_GUARDED_BY(resource_adaptation_queue_); // Input and output. VideoStreamInputStateProvider* const input_state_provider_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); VideoStreamEncoderObserver* const encoder_stats_observer_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); std::vector restrictions_listeners_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); std::vector> resources_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); std::vector adaptation_constraints_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); std::vector adaptation_listeners_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); // Purely used for statistics, does not ensure mapped resources stay alive. std::map adaptations_counts_by_resource_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); // Adaptation strategy settings. DegradationPreference degradation_preference_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); DegradationPreference effective_degradation_preference_ - RTC_GUARDED_BY(sequence_checker_); - bool is_screenshare_ RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); + bool is_screenshare_ RTC_GUARDED_BY(resource_adaptation_queue_); // Responsible for generating and applying possible adaptations. const std::unique_ptr stream_adapter_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); VideoSourceRestrictions last_reported_source_restrictions_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); // Keeps track of previous mitigation results per resource since the last // successful adaptation. Used to avoid RTC_LOG spam. std::map previous_mitigation_results_ - RTC_GUARDED_BY(sequence_checker_); + RTC_GUARDED_BY(resource_adaptation_queue_); // Prevents recursion. // // This is used to prevent triggering resource adaptation in the process of @@ -185,7 +209,7 @@ class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface, // Resource::OnAdaptationApplied() -> // Resource::OnResourceUsageStateMeasured() -> // ResourceAdaptationProcessor::OnResourceOveruse() // Boom, not allowed. - bool processing_in_progress_ RTC_GUARDED_BY(sequence_checker_); + bool processing_in_progress_ RTC_GUARDED_BY(resource_adaptation_queue_); }; } // namespace webrtc diff --git a/call/adaptation/resource_adaptation_processor_interface.h b/call/adaptation/resource_adaptation_processor_interface.h index 8dafefaf2c..d482409ba9 100644 --- a/call/adaptation/resource_adaptation_processor_interface.h +++ b/call/adaptation/resource_adaptation_processor_interface.h @@ -14,6 +14,7 @@ #include "absl/types/optional.h" #include "api/rtp_parameters.h" #include "api/scoped_refptr.h" +#include "api/task_queue/task_queue_base.h" #include "api/video/video_adaptation_counters.h" #include "api/video/video_frame.h" #include "call/adaptation/adaptation_constraint.h" @@ -21,7 +22,6 @@ #include "call/adaptation/encoder_settings.h" #include "call/adaptation/resource.h" #include "call/adaptation/video_source_restrictions.h" -#include "rtc_base/task_queue.h" namespace webrtc { @@ -48,7 +48,8 @@ class ResourceAdaptationProcessorInterface { public: virtual ~ResourceAdaptationProcessorInterface(); - virtual void InitializeOnResourceAdaptationQueue() = 0; + virtual void SetResourceAdaptationQueue( + TaskQueueBase* resource_adaptation_queue) = 0; virtual DegradationPreference degradation_preference() const = 0; // Reinterprets "balanced + screenshare" as "maintain-resolution". diff --git a/call/adaptation/resource_adaptation_processor_unittest.cc b/call/adaptation/resource_adaptation_processor_unittest.cc index 6ff24b165f..6cb11ce6ac 100644 --- a/call/adaptation/resource_adaptation_processor_unittest.cc +++ b/call/adaptation/resource_adaptation_processor_unittest.cc @@ -20,7 +20,9 @@ #include "call/adaptation/test/fake_resource.h" #include "call/adaptation/video_source_restrictions.h" #include "call/adaptation/video_stream_input_state_provider.h" +#include "rtc_base/critical_section.h" #include "rtc_base/event.h" +#include "rtc_base/gunit.h" #include "rtc_base/task_queue_for_test.h" #include "test/gtest.h" @@ -30,6 +32,7 @@ namespace { const int kDefaultFrameRate = 30; const int kDefaultFrameSize = 1280 * 720; +const int kDefaultTimeoutMs = 5000; class VideoSourceRestrictionsListenerForTesting : public VideoSourceRestrictionsListener { @@ -42,19 +45,28 @@ class VideoSourceRestrictionsListenerForTesting ~VideoSourceRestrictionsListenerForTesting() override {} size_t restrictions_updated_count() const { + rtc::CritScope crit(&lock_); return restrictions_updated_count_; } - const VideoSourceRestrictions& restrictions() const { return restrictions_; } - const VideoAdaptationCounters& adaptation_counters() const { + VideoSourceRestrictions restrictions() const { + rtc::CritScope crit(&lock_); + return restrictions_; + } + VideoAdaptationCounters adaptation_counters() const { + rtc::CritScope crit(&lock_); return adaptation_counters_; } - rtc::scoped_refptr reason() const { return reason_; } + rtc::scoped_refptr reason() const { + rtc::CritScope crit(&lock_); + return reason_; + } // VideoSourceRestrictionsListener implementation. void OnVideoSourceRestrictionsUpdated( VideoSourceRestrictions restrictions, const VideoAdaptationCounters& adaptation_counters, rtc::scoped_refptr reason) override { + rtc::CritScope crit(&lock_); ++restrictions_updated_count_; restrictions_ = restrictions; adaptation_counters_ = adaptation_counters; @@ -62,10 +74,11 @@ class VideoSourceRestrictionsListenerForTesting } private: - size_t restrictions_updated_count_; - VideoSourceRestrictions restrictions_; - VideoAdaptationCounters adaptation_counters_; - rtc::scoped_refptr reason_; + rtc::CriticalSection lock_; + size_t restrictions_updated_count_ RTC_GUARDED_BY(lock_); + VideoSourceRestrictions restrictions_ RTC_GUARDED_BY(lock_); + VideoAdaptationCounters adaptation_counters_ RTC_GUARDED_BY(lock_); + rtc::scoped_refptr reason_ RTC_GUARDED_BY(lock_); }; class ResourceAdaptationProcessorTest : public ::testing::Test { @@ -81,31 +94,26 @@ class ResourceAdaptationProcessorTest : public ::testing::Test { processor_(std::make_unique( &input_state_provider_, /*encoder_stats_observer=*/&frame_rate_provider_)) { - rtc::Event event; - resource_adaptation_queue_.PostTask([this, &event] { - processor_->InitializeOnResourceAdaptationQueue(); - processor_->AddRestrictionsListener(&restrictions_listener_); - processor_->AddResource(resource_); - processor_->AddResource(other_resource_); - processor_->AddAdaptationConstraint(&adaptation_constraint_); - processor_->AddAdaptationListener(&adaptation_listener_); - event.Set(); - }); - event.Wait(rtc::Event::kForever); + resource_adaptation_queue_.SendTask( + [this] { + processor_->SetResourceAdaptationQueue( + resource_adaptation_queue_.Get()); + processor_->AddRestrictionsListener(&restrictions_listener_); + processor_->AddResource(resource_); + processor_->AddResource(other_resource_); + processor_->AddAdaptationConstraint(&adaptation_constraint_); + processor_->AddAdaptationListener(&adaptation_listener_); + }, + RTC_FROM_HERE); } ~ResourceAdaptationProcessorTest() override { - rtc::Event event; - resource_adaptation_queue_.PostTask([this, &event] { - processor_->StopResourceAdaptation(); - processor_->RemoveRestrictionsListener(&restrictions_listener_); - processor_->RemoveResource(resource_); - processor_->RemoveResource(other_resource_); - processor_->RemoveAdaptationConstraint(&adaptation_constraint_); - processor_->RemoveAdaptationListener(&adaptation_listener_); - processor_.reset(); - event.Set(); - }); - event.Wait(rtc::Event::kForever); + resource_adaptation_queue_.SendTask( + [this] { + if (processor_) { + DestroyProcessor(); + } + }, + RTC_FROM_HERE); } void SetInputStates(bool has_input, int fps, int frame_size) { @@ -122,6 +130,17 @@ class ResourceAdaptationProcessorTest : public ::testing::Test { : restrictions.max_pixels_per_frame().value_or(kDefaultFrameSize)); } + void DestroyProcessor() { + RTC_DCHECK_RUN_ON(&resource_adaptation_queue_); + processor_->StopResourceAdaptation(); + processor_->RemoveRestrictionsListener(&restrictions_listener_); + processor_->RemoveResource(resource_); + processor_->RemoveResource(other_resource_); + processor_->RemoveAdaptationConstraint(&adaptation_constraint_); + processor_->RemoveAdaptationListener(&adaptation_listener_); + processor_.reset(); + } + protected: TaskQueueForTest resource_adaptation_queue_; FakeFrameRateProvider frame_rate_provider_; @@ -394,33 +413,6 @@ TEST_F(ResourceAdaptationProcessorTest, AdaptingTriggersOnAdaptationApplied) { RTC_FROM_HERE); } -TEST_F(ResourceAdaptationProcessorTest, AdaptingClearsResourceUsageState) { - resource_adaptation_queue_.SendTask( - [this] { - processor_->SetDegradationPreference( - DegradationPreference::MAINTAIN_FRAMERATE); - processor_->StartResourceAdaptation(); - SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize); - resource_->SetUsageState(ResourceUsageState::kOveruse); - EXPECT_EQ(1u, restrictions_listener_.restrictions_updated_count()); - EXPECT_FALSE(resource_->UsageState().has_value()); - }, - RTC_FROM_HERE); -} - -TEST_F(ResourceAdaptationProcessorTest, - FailingAdaptingAlsoClearsResourceUsageState) { - resource_adaptation_queue_.SendTask( - [this] { - processor_->SetDegradationPreference(DegradationPreference::DISABLED); - processor_->StartResourceAdaptation(); - resource_->SetUsageState(ResourceUsageState::kOveruse); - EXPECT_EQ(0u, restrictions_listener_.restrictions_updated_count()); - EXPECT_FALSE(resource_->UsageState().has_value()); - }, - RTC_FROM_HERE); -} - TEST_F(ResourceAdaptationProcessorTest, AdaptsDownWhenOtherResourceIsAlwaysUnderused) { resource_adaptation_queue_.SendTask( @@ -447,4 +439,46 @@ TEST_F(ResourceAdaptationProcessorTest, RTC_FROM_HERE); } +TEST_F(ResourceAdaptationProcessorTest, + TriggerOveruseNotOnAdaptationTaskQueue) { + resource_adaptation_queue_.SendTask( + [this] { + processor_->SetDegradationPreference( + DegradationPreference::MAINTAIN_FRAMERATE); + processor_->StartResourceAdaptation(); + SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize); + }, + RTC_FROM_HERE); + resource_->SetUsageState(ResourceUsageState::kOveruse); + EXPECT_EQ_WAIT(1u, restrictions_listener_.restrictions_updated_count(), + kDefaultTimeoutMs); +} + +TEST_F(ResourceAdaptationProcessorTest, + DestroyProcessorWhileResourceListenerDelegateHasTaskInFlight) { + resource_adaptation_queue_.SendTask( + [this] { + processor_->SetDegradationPreference( + DegradationPreference::MAINTAIN_FRAMERATE); + processor_->StartResourceAdaptation(); + SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize); + }, + RTC_FROM_HERE); + // Block the destruction of the processor. This ensures that the adaptation + // queue is blocked until the ResourceListenerDelegate has had time to post + // its task. + rtc::Event destroy_processor_event; + resource_adaptation_queue_.PostTask([this, &destroy_processor_event] { + destroy_processor_event.Wait(rtc::Event::kForever); + DestroyProcessor(); + }); + resource_->SetUsageState(ResourceUsageState::kOveruse); + // Unblock destruction and delegate task. + destroy_processor_event.Set(); + resource_adaptation_queue_.WaitForPreviouslyPostedTasks(); + // Because the processor was destroyed by the time the delegate's task ran, + // the overuse signal must not have been handled. + EXPECT_EQ(0u, restrictions_listener_.restrictions_updated_count()); +} + } // namespace webrtc diff --git a/call/adaptation/resource_unittest.cc b/call/adaptation/resource_unittest.cc index afa32f0879..49b932420f 100644 --- a/call/adaptation/resource_unittest.cc +++ b/call/adaptation/resource_unittest.cc @@ -26,7 +26,8 @@ class MockResourceListener : public ResourceListener { public: MOCK_METHOD(void, OnResourceUsageStateMeasured, - (rtc::scoped_refptr resource), + (rtc::scoped_refptr resource, + ResourceUsageState usage_state), (override)); }; @@ -41,10 +42,11 @@ class ResourceTest : public ::testing::Test { TEST_F(ResourceTest, RegisteringListenerReceivesCallbacks) { StrictMock resource_listener; fake_resource_->SetResourceListener(&resource_listener); - EXPECT_CALL(resource_listener, OnResourceUsageStateMeasured(_)) + EXPECT_CALL(resource_listener, OnResourceUsageStateMeasured(_, _)) .Times(1) - .WillOnce([](rtc::scoped_refptr resource) { - EXPECT_EQ(ResourceUsageState::kOveruse, resource->UsageState()); + .WillOnce([](rtc::scoped_refptr resource, + ResourceUsageState usage_state) { + EXPECT_EQ(ResourceUsageState::kOveruse, usage_state); }); fake_resource_->SetUsageState(ResourceUsageState::kOveruse); fake_resource_->SetResourceListener(nullptr); @@ -54,7 +56,7 @@ TEST_F(ResourceTest, UnregisteringListenerStopsCallbacks) { StrictMock resource_listener; fake_resource_->SetResourceListener(&resource_listener); fake_resource_->SetResourceListener(nullptr); - EXPECT_CALL(resource_listener, OnResourceUsageStateMeasured(_)).Times(0); + EXPECT_CALL(resource_listener, OnResourceUsageStateMeasured(_, _)).Times(0); fake_resource_->SetUsageState(ResourceUsageState::kOveruse); } diff --git a/call/adaptation/test/fake_resource.cc b/call/adaptation/test/fake_resource.cc index 113f4b5450..fa69e886bf 100644 --- a/call/adaptation/test/fake_resource.cc +++ b/call/adaptation/test/fake_resource.cc @@ -23,17 +23,13 @@ rtc::scoped_refptr FakeResource::Create(std::string name) { } FakeResource::FakeResource(std::string name) - : Resource(), - name_(std::move(name)), - listener_(nullptr), - usage_state_(absl::nullopt) {} + : Resource(), name_(std::move(name)), listener_(nullptr) {} FakeResource::~FakeResource() {} void FakeResource::SetUsageState(ResourceUsageState usage_state) { - usage_state_ = usage_state; if (listener_) { - listener_->OnResourceUsageStateMeasured(this); + listener_->OnResourceUsageStateMeasured(this, usage_state); } } @@ -45,12 +41,4 @@ void FakeResource::SetResourceListener(ResourceListener* listener) { listener_ = listener; } -absl::optional FakeResource::UsageState() const { - return usage_state_; -} - -void FakeResource::ClearUsageState() { - usage_state_ = absl::nullopt; -} - } // namespace webrtc diff --git a/call/adaptation/test/fake_resource.h b/call/adaptation/test/fake_resource.h index c67dc3af3d..0f42c51351 100644 --- a/call/adaptation/test/fake_resource.h +++ b/call/adaptation/test/fake_resource.h @@ -33,13 +33,10 @@ class FakeResource : public Resource { // Resource implementation. std::string Name() const override; void SetResourceListener(ResourceListener* listener) override; - absl::optional UsageState() const override; - void ClearUsageState() override; private: const std::string name_; ResourceListener* listener_; - absl::optional usage_state_; }; } // namespace webrtc diff --git a/video/adaptation/video_stream_encoder_resource.cc b/video/adaptation/video_stream_encoder_resource.cc index 9a2db1fb87..4e99a1dbb3 100644 --- a/video/adaptation/video_stream_encoder_resource.cc +++ b/video/adaptation/video_stream_encoder_resource.cc @@ -20,7 +20,6 @@ VideoStreamEncoderResource::VideoStreamEncoderResource(std::string name) name_(std::move(name)), encoder_queue_(nullptr), resource_adaptation_queue_(nullptr), - usage_state_(absl::nullopt), listener_(nullptr) {} VideoStreamEncoderResource::~VideoStreamEncoderResource() { @@ -64,23 +63,11 @@ std::string VideoStreamEncoderResource::Name() const { return name_; } -absl::optional VideoStreamEncoderResource::UsageState() - const { - RTC_DCHECK_RUN_ON(resource_adaptation_queue()); - return usage_state_; -} - -void VideoStreamEncoderResource::ClearUsageState() { - RTC_DCHECK_RUN_ON(resource_adaptation_queue()); - usage_state_ = absl::nullopt; -} - void VideoStreamEncoderResource::OnResourceUsageStateMeasured( ResourceUsageState usage_state) { RTC_DCHECK_RUN_ON(resource_adaptation_queue()); - usage_state_ = usage_state; if (listener_) { - listener_->OnResourceUsageStateMeasured(this); + listener_->OnResourceUsageStateMeasured(this, usage_state); } } diff --git a/video/adaptation/video_stream_encoder_resource.h b/video/adaptation/video_stream_encoder_resource.h index f561a63ce4..0802c5cd61 100644 --- a/video/adaptation/video_stream_encoder_resource.h +++ b/video/adaptation/video_stream_encoder_resource.h @@ -34,8 +34,6 @@ class VideoStreamEncoderResource : public Resource { // Resource implementation. std::string Name() const override; void SetResourceListener(ResourceListener* listener) override; - absl::optional UsageState() const override; - void ClearUsageState() override; // Provides a pointer to the adaptation task queue. After this call, all // methods defined in this interface, including @@ -74,8 +72,6 @@ class VideoStreamEncoderResource : public Resource { // Treated as const after initialization. TaskQueueBase* encoder_queue_; TaskQueueBase* resource_adaptation_queue_ RTC_GUARDED_BY(lock_); - absl::optional usage_state_ - RTC_GUARDED_BY(resource_adaptation_queue()); ResourceListener* listener_ RTC_GUARDED_BY(resource_adaptation_queue()); }; diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc index 7014138091..23569bea27 100644 --- a/video/video_stream_encoder.cc +++ b/video/video_stream_encoder.cc @@ -284,7 +284,8 @@ VideoStreamEncoder::VideoStreamEncoder( rtc::Event initialize_processor_event; resource_adaptation_queue_.PostTask([this, &initialize_processor_event] { RTC_DCHECK_RUN_ON(&resource_adaptation_queue_); - resource_adaptation_processor_->InitializeOnResourceAdaptationQueue(); + resource_adaptation_processor_->SetResourceAdaptationQueue( + resource_adaptation_queue_.Get()); stream_resource_manager_.SetAdaptationProcessor( resource_adaptation_processor_.get()); resource_adaptation_processor_->AddRestrictionsListener(