From a0a4480f12612853bd7b87f3346abfcc36cda590 Mon Sep 17 00:00:00 2001 From: Tommi Date: Wed, 13 May 2020 18:27:26 +0200 Subject: [PATCH] Migrate CallStats and RtpStreamsSynchronizer timers over to RepeatingTask Bug: none Change-Id: Ib49a3de74c6d3a6d4ea158383a5e4b69a1e58ab9 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175000 Reviewed-by: Sebastian Jansson Commit-Queue: Tommi Cr-Commit-Position: refs/heads/master@{#31252} --- rtc_base/task_utils/repeating_task.cc | 8 +++-- rtc_base/task_utils/repeating_task.h | 18 +++++++---- video/call_stats2.cc | 29 +++++++----------- video/call_stats2.h | 12 ++++---- video/call_stats2_unittest.cc | 11 +++---- video/rtp_streams_synchronizer2.cc | 43 ++++++++------------------- video/rtp_streams_synchronizer2.h | 9 ++---- video/video_stream_encoder.h | 2 +- 8 files changed, 55 insertions(+), 77 deletions(-) diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc index 4e460bb082..71911e6982 100644 --- a/rtc_base/task_utils/repeating_task.cc +++ b/rtc_base/task_utils/repeating_task.cc @@ -20,12 +20,14 @@ namespace webrtc_repeating_task_impl { RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay) : task_queue_(task_queue), - next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {} + next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) { + sequence_checker_.Detach(); +} RepeatingTaskBase::~RepeatingTaskBase() = default; bool RepeatingTaskBase::Run() { - RTC_DCHECK_RUN_ON(task_queue_); + RTC_DCHECK_RUN_ON(&sequence_checker_); // Return true to tell the TaskQueue to destruct this object. if (next_run_time_.IsPlusInfinity()) return true; @@ -51,6 +53,7 @@ bool RepeatingTaskBase::Run() { } void RepeatingTaskBase::Stop() { + RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK(next_run_time_.IsFinite()); next_run_time_ = Timestamp::PlusInfinity(); } @@ -75,7 +78,6 @@ RepeatingTaskHandle::RepeatingTaskHandle( void RepeatingTaskHandle::Stop() { if (repeating_task_) { - RTC_DCHECK_RUN_ON(repeating_task_->task_queue_); repeating_task_->Stop(); repeating_task_ = nullptr; } diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h index 1545d6f757..75d03bfe5e 100644 --- a/rtc_base/task_utils/repeating_task.h +++ b/rtc_base/task_utils/repeating_task.h @@ -20,7 +20,6 @@ #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "rtc_base/synchronization/sequence_checker.h" -#include "rtc_base/thread_checker.h" namespace webrtc { @@ -31,18 +30,25 @@ class RepeatingTaskBase : public QueuedTask { public: RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay); ~RepeatingTaskBase() override; - virtual TimeDelta RunClosure() = 0; + + void Stop(); private: - friend class ::webrtc::RepeatingTaskHandle; + virtual TimeDelta RunClosure() = 0; bool Run() final; - void Stop() RTC_RUN_ON(task_queue_); TaskQueueBase* const task_queue_; // This is always finite, except for the special case where it's PlusInfinity // to signal that the task should stop. - Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); + Timestamp next_run_time_ RTC_GUARDED_BY(sequence_checker_); + // We use a SequenceChecker to check for correct usage instead of using + // RTC_DCHECK_RUN_ON(task_queue_). This is to work around a compatibility + // issue with some TQ implementations such as rtc::Thread that don't + // consistently set themselves as the 'current' TQ when running tasks. + // The SequenceChecker detects those implementations differently but gives + // the same effect as far as thread safety goes. + SequenceChecker sequence_checker_; }; // The template closure pattern is based on rtc::ClosureTask. @@ -61,9 +67,9 @@ class RepeatingTaskImpl final : public RepeatingTaskBase { ""); } + private: TimeDelta RunClosure() override { return closure_(); } - private: typename std::remove_const< typename std::remove_reference::type>::type closure_; }; diff --git a/video/call_stats2.cc b/video/call_stats2.cc index ce68127490..d190294c7f 100644 --- a/video/call_stats2.cc +++ b/video/call_stats2.cc @@ -64,9 +64,10 @@ int64_t GetNewAvgRttMs(const std::list& reports, } // namespace +constexpr TimeDelta CallStats::kUpdateInterval; + CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue) : clock_(clock), - last_process_time_(clock_->TimeInMilliseconds()), max_rtt_ms_(-1), avg_rtt_ms_(-1), sum_avg_rtt_ms_(0), @@ -75,39 +76,29 @@ CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue) task_queue_(task_queue) { RTC_DCHECK(task_queue_); process_thread_checker_.Detach(); - task_queue_->PostDelayedTask( - ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs); + repeating_task_ = + RepeatingTaskHandle::DelayedStart(task_queue_, kUpdateInterval, [this]() { + UpdateAndReport(); + return kUpdateInterval; + }); } CallStats::~CallStats() { RTC_DCHECK_RUN_ON(&construction_thread_checker_); RTC_DCHECK(observers_.empty()); + repeating_task_.Stop(); + UpdateHistograms(); } -void CallStats::RunTimer() { - RTC_DCHECK_RUN_ON(&construction_thread_checker_); - - UpdateAndReport(); - - uint32_t interval = - last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds(); - - task_queue_->PostDelayedTask( - ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval); -} - void CallStats::UpdateAndReport() { RTC_DCHECK_RUN_ON(&construction_thread_checker_); - int64_t now = clock_->TimeInMilliseconds(); - last_process_time_ = now; - // |avg_rtt_ms_| is allowed to be read on the construction thread since that's // the only thread that modifies the value. int64_t avg_rtt_ms = avg_rtt_ms_; - RemoveOldReports(now, &reports_); + RemoveOldReports(clock_->CurrentTime().ms(), &reports_); max_rtt_ms_ = GetMaxRttMs(reports_); avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms); { diff --git a/video/call_stats2.h b/video/call_stats2.h index 49d2db7d31..8f53358685 100644 --- a/video/call_stats2.h +++ b/video/call_stats2.h @@ -14,6 +14,7 @@ #include #include +#include "api/units/timestamp.h" #include "modules/include/module_common_types.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "rtc_base/constructor_magic.h" @@ -21,6 +22,7 @@ #include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_queue.h" #include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/repeating_task.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -29,7 +31,7 @@ namespace internal { class CallStats { public: // Time interval for updating the observers. - static constexpr int64_t kUpdateIntervalMs = 1000; + static constexpr TimeDelta kUpdateInterval = TimeDelta::Millis(1000); CallStats(Clock* clock, TaskQueueBase* task_queue); ~CallStats(); @@ -70,8 +72,6 @@ class CallStats { void OnRttUpdate(int64_t rtt); int64_t LastProcessedRttFromProcessThread() const; - void RunTimer(); - void UpdateAndReport(); // This method must only be called when the process thread is not @@ -102,8 +102,10 @@ class CallStats { Clock* const clock_; - // The last time 'Process' resulted in statistic update. - int64_t last_process_time_ RTC_GUARDED_BY(construction_thread_checker_); + // Used to regularly call UpdateAndReport(). + RepeatingTaskHandle repeating_task_ + RTC_GUARDED_BY(construction_thread_checker_); + // The last RTT in the statistics update (zero if there is no valid estimate). int64_t max_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_); diff --git a/video/call_stats2_unittest.cc b/video/call_stats2_unittest.cc index 58af6fd386..73fe4b45ca 100644 --- a/video/call_stats2_unittest.cc +++ b/video/call_stats2_unittest.cc @@ -96,12 +96,13 @@ TEST_F(CallStats2Test, ProcessTime) { .Times(2) .WillOnce(InvokeWithoutArgs([this] { // Advance clock and verify we get an update. - fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms()); })) .WillRepeatedly(InvokeWithoutArgs([this] { AsyncSimulateRttUpdate(kRtt2); // Advance clock just too little to get an update. - fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs - 1); + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms() - + 1); })); // In case you're reading this and wondering how this number is arrived at, @@ -256,7 +257,7 @@ TEST_F(CallStats2Test, LastProcessedRtt) { .Times(AnyNumber()) .WillOnce(InvokeWithoutArgs([this] { EXPECT_EQ(kAvgRtt1, call_stats_.LastProcessedRtt()); - fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms()); AsyncSimulateRttUpdate(kRttLow); AsyncSimulateRttUpdate(kRttHigh); })) @@ -272,7 +273,7 @@ TEST_F(CallStats2Test, LastProcessedRtt) { // Set a first values and verify that LastProcessedRtt initially returns the // average rtt. - fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); + fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms()); AsyncSimulateRttUpdate(kRttLow); loop_.Run(); EXPECT_EQ(kAvgRtt2, call_stats_.LastProcessedRtt()); @@ -292,7 +293,7 @@ TEST_F(CallStats2Test, ProducesHistogramMetrics) { AsyncSimulateRttUpdate(kRtt); loop_.Run(); fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds * - CallStats::kUpdateIntervalMs); + CallStats::kUpdateInterval.ms()); AsyncSimulateRttUpdate(kRtt); loop_.Run(); diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc index 7e3bed1467..49be355a38 100644 --- a/video/rtp_streams_synchronizer2.cc +++ b/video/rtp_streams_synchronizer2.cc @@ -23,7 +23,7 @@ namespace internal { namespace { // Time interval for logging stats. constexpr int64_t kStatsLogIntervalMs = 10000; -constexpr uint32_t kSyncIntervalMs = 1000; +constexpr TimeDelta kSyncInterval = TimeDelta::Millis(1000); bool UpdateMeasurements(StreamSynchronization::Measurements* stream, const Syncable::Info& info) { @@ -34,19 +34,20 @@ bool UpdateMeasurements(StreamSynchronization::Measurements* stream, info.capture_time_ntp_secs, info.capture_time_ntp_frac, info.capture_time_source_clock, &new_rtcp_sr); } + } // namespace RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue, Syncable* syncable_video) : task_queue_(main_queue), syncable_video_(syncable_video), - last_sync_time_(rtc::TimeNanos()), last_stats_log_ms_(rtc::TimeMillis()) { RTC_DCHECK(syncable_video); } RtpStreamsSynchronizer::~RtpStreamsSynchronizer() { RTC_DCHECK_RUN_ON(&main_checker_); + repeating_task_.Stop(); } void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { @@ -58,52 +59,32 @@ void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { syncable_audio_ = syncable_audio; sync_.reset(nullptr); - if (!syncable_audio_) + if (!syncable_audio_) { + repeating_task_.Stop(); return; + } sync_.reset( new StreamSynchronization(syncable_video_->id(), syncable_audio_->id())); - QueueTimer(); -} -void RtpStreamsSynchronizer::QueueTimer() { - RTC_DCHECK_RUN_ON(&main_checker_); - if (timer_running_) + if (repeating_task_.Running()) return; - timer_running_ = true; - uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) / - rtc::kNumNanosecsPerMillisec; - if (delay > kSyncIntervalMs) { - // TODO(tommi): |linux_chromium_tsan_rel_ng| bot has shown a failure when - // running WebRtcBrowserTest.CallAndModifyStream, indicating that the - // underlying clock is not reliable. Possibly there's a fake clock being - // used as the tests are flaky. Look into and fix. - RTC_LOG(LS_ERROR) << "Unexpected timer value: " << delay; - delay = kSyncIntervalMs; - } - - RTC_DCHECK_LE(delay, kSyncIntervalMs); - task_queue_->PostDelayedTask(ToQueuedTask(task_safety_, - [this] { - RTC_DCHECK_RUN_ON(&main_checker_); - timer_running_ = false; - UpdateDelay(); - }), - delay); + repeating_task_ = + RepeatingTaskHandle::DelayedStart(task_queue_, kSyncInterval, [this]() { + UpdateDelay(); + return kSyncInterval; + }); } void RtpStreamsSynchronizer::UpdateDelay() { RTC_DCHECK_RUN_ON(&main_checker_); - last_sync_time_ = rtc::TimeNanos(); if (!syncable_audio_) return; RTC_DCHECK(sync_.get()); - QueueTimer(); - bool log_stats = false; const int64_t now_ms = rtc::TimeMillis(); if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) { diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h index 83dd0fb6f2..6a522e801d 100644 --- a/video/rtp_streams_synchronizer2.h +++ b/video/rtp_streams_synchronizer2.h @@ -15,7 +15,7 @@ #include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_queue.h" -#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/repeating_task.h" #include "video/stream_synchronization.h" namespace webrtc { @@ -45,7 +45,6 @@ class RtpStreamsSynchronizer { double* estimated_freq_khz) const; private: - void QueueTimer(); void UpdateDelay(); TaskQueueBase* const task_queue_; @@ -65,12 +64,8 @@ class RtpStreamsSynchronizer { RTC_GUARDED_BY(main_checker_); StreamSynchronization::Measurements video_measurement_ RTC_GUARDED_BY(main_checker_); - int64_t last_sync_time_ RTC_GUARDED_BY(&main_checker_); + RepeatingTaskHandle repeating_task_ RTC_GUARDED_BY(main_checker_); int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_); - bool timer_running_ RTC_GUARDED_BY(main_checker_) = false; - - // Used to signal destruction to potentially pending tasks. - ScopedTaskSafety task_safety_; }; } // namespace internal diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h index 5c72167964..13b2bdf46b 100644 --- a/video/video_stream_encoder.h +++ b/video/video_stream_encoder.h @@ -38,12 +38,12 @@ #include "rtc_base/rate_statistics.h" #include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/task_queue.h" +#include "rtc_base/thread_checker.h" #include "system_wrappers/include/clock.h" #include "video/adaptation/video_stream_encoder_resource_manager.h" #include "video/encoder_bitrate_adjuster.h" #include "video/frame_encode_metadata_writer.h" #include "video/video_source_sink_controller.h" - namespace webrtc { // VideoStreamEncoder represent a video encoder that accepts raw video frames as