Migrate CallStats and RtpStreamsSynchronizer timers over to RepeatingTask

Bug: none
Change-Id: Ib49a3de74c6d3a6d4ea158383a5e4b69a1e58ab9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175000
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31252}
This commit is contained in:
Tommi
2020-05-13 18:27:26 +02:00
committed by Commit Bot
parent 6ee67936bd
commit a0a4480f12
8 changed files with 55 additions and 77 deletions

View File

@ -20,12 +20,14 @@ namespace webrtc_repeating_task_impl {
RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue, RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
TimeDelta first_delay) TimeDelta first_delay)
: task_queue_(task_queue), : task_queue_(task_queue),
next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {} next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {
sequence_checker_.Detach();
}
RepeatingTaskBase::~RepeatingTaskBase() = default; RepeatingTaskBase::~RepeatingTaskBase() = default;
bool RepeatingTaskBase::Run() { bool RepeatingTaskBase::Run() {
RTC_DCHECK_RUN_ON(task_queue_); RTC_DCHECK_RUN_ON(&sequence_checker_);
// Return true to tell the TaskQueue to destruct this object. // Return true to tell the TaskQueue to destruct this object.
if (next_run_time_.IsPlusInfinity()) if (next_run_time_.IsPlusInfinity())
return true; return true;
@ -51,6 +53,7 @@ bool RepeatingTaskBase::Run() {
} }
void RepeatingTaskBase::Stop() { void RepeatingTaskBase::Stop() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
RTC_DCHECK(next_run_time_.IsFinite()); RTC_DCHECK(next_run_time_.IsFinite());
next_run_time_ = Timestamp::PlusInfinity(); next_run_time_ = Timestamp::PlusInfinity();
} }
@ -75,7 +78,6 @@ RepeatingTaskHandle::RepeatingTaskHandle(
void RepeatingTaskHandle::Stop() { void RepeatingTaskHandle::Stop() {
if (repeating_task_) { if (repeating_task_) {
RTC_DCHECK_RUN_ON(repeating_task_->task_queue_);
repeating_task_->Stop(); repeating_task_->Stop();
repeating_task_ = nullptr; repeating_task_ = nullptr;
} }

View File

@ -20,7 +20,6 @@
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
#include "api/units/timestamp.h" #include "api/units/timestamp.h"
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/thread_checker.h"
namespace webrtc { namespace webrtc {
@ -31,18 +30,25 @@ class RepeatingTaskBase : public QueuedTask {
public: public:
RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay); RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay);
~RepeatingTaskBase() override; ~RepeatingTaskBase() override;
virtual TimeDelta RunClosure() = 0;
void Stop();
private: private:
friend class ::webrtc::RepeatingTaskHandle; virtual TimeDelta RunClosure() = 0;
bool Run() final; bool Run() final;
void Stop() RTC_RUN_ON(task_queue_);
TaskQueueBase* const task_queue_; TaskQueueBase* const task_queue_;
// This is always finite, except for the special case where it's PlusInfinity // This is always finite, except for the special case where it's PlusInfinity
// to signal that the task should stop. // to signal that the task should stop.
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); Timestamp next_run_time_ RTC_GUARDED_BY(sequence_checker_);
// We use a SequenceChecker to check for correct usage instead of using
// RTC_DCHECK_RUN_ON(task_queue_). This is to work around a compatibility
// issue with some TQ implementations such as rtc::Thread that don't
// consistently set themselves as the 'current' TQ when running tasks.
// The SequenceChecker detects those implementations differently but gives
// the same effect as far as thread safety goes.
SequenceChecker sequence_checker_;
}; };
// The template closure pattern is based on rtc::ClosureTask. // The template closure pattern is based on rtc::ClosureTask.
@ -61,9 +67,9 @@ class RepeatingTaskImpl final : public RepeatingTaskBase {
""); "");
} }
private:
TimeDelta RunClosure() override { return closure_(); } TimeDelta RunClosure() override { return closure_(); }
private:
typename std::remove_const< typename std::remove_const<
typename std::remove_reference<Closure>::type>::type closure_; typename std::remove_reference<Closure>::type>::type closure_;
}; };

View File

@ -64,9 +64,10 @@ int64_t GetNewAvgRttMs(const std::list<CallStats::RttTime>& reports,
} // namespace } // namespace
constexpr TimeDelta CallStats::kUpdateInterval;
CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue) CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue)
: clock_(clock), : clock_(clock),
last_process_time_(clock_->TimeInMilliseconds()),
max_rtt_ms_(-1), max_rtt_ms_(-1),
avg_rtt_ms_(-1), avg_rtt_ms_(-1),
sum_avg_rtt_ms_(0), sum_avg_rtt_ms_(0),
@ -75,39 +76,29 @@ CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue)
task_queue_(task_queue) { task_queue_(task_queue) {
RTC_DCHECK(task_queue_); RTC_DCHECK(task_queue_);
process_thread_checker_.Detach(); process_thread_checker_.Detach();
task_queue_->PostDelayedTask( repeating_task_ =
ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs); RepeatingTaskHandle::DelayedStart(task_queue_, kUpdateInterval, [this]() {
UpdateAndReport();
return kUpdateInterval;
});
} }
CallStats::~CallStats() { CallStats::~CallStats() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_); RTC_DCHECK_RUN_ON(&construction_thread_checker_);
RTC_DCHECK(observers_.empty()); RTC_DCHECK(observers_.empty());
repeating_task_.Stop();
UpdateHistograms(); UpdateHistograms();
} }
void CallStats::RunTimer() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
UpdateAndReport();
uint32_t interval =
last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds();
task_queue_->PostDelayedTask(
ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval);
}
void CallStats::UpdateAndReport() { void CallStats::UpdateAndReport() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_); RTC_DCHECK_RUN_ON(&construction_thread_checker_);
int64_t now = clock_->TimeInMilliseconds();
last_process_time_ = now;
// |avg_rtt_ms_| is allowed to be read on the construction thread since that's // |avg_rtt_ms_| is allowed to be read on the construction thread since that's
// the only thread that modifies the value. // the only thread that modifies the value.
int64_t avg_rtt_ms = avg_rtt_ms_; int64_t avg_rtt_ms = avg_rtt_ms_;
RemoveOldReports(now, &reports_); RemoveOldReports(clock_->CurrentTime().ms(), &reports_);
max_rtt_ms_ = GetMaxRttMs(reports_); max_rtt_ms_ = GetMaxRttMs(reports_);
avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms); avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms);
{ {

View File

@ -14,6 +14,7 @@
#include <list> #include <list>
#include <memory> #include <memory>
#include "api/units/timestamp.h"
#include "modules/include/module_common_types.h" #include "modules/include/module_common_types.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
@ -21,6 +22,7 @@
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
namespace webrtc { namespace webrtc {
@ -29,7 +31,7 @@ namespace internal {
class CallStats { class CallStats {
public: public:
// Time interval for updating the observers. // Time interval for updating the observers.
static constexpr int64_t kUpdateIntervalMs = 1000; static constexpr TimeDelta kUpdateInterval = TimeDelta::Millis(1000);
CallStats(Clock* clock, TaskQueueBase* task_queue); CallStats(Clock* clock, TaskQueueBase* task_queue);
~CallStats(); ~CallStats();
@ -70,8 +72,6 @@ class CallStats {
void OnRttUpdate(int64_t rtt); void OnRttUpdate(int64_t rtt);
int64_t LastProcessedRttFromProcessThread() const; int64_t LastProcessedRttFromProcessThread() const;
void RunTimer();
void UpdateAndReport(); void UpdateAndReport();
// This method must only be called when the process thread is not // This method must only be called when the process thread is not
@ -102,8 +102,10 @@ class CallStats {
Clock* const clock_; Clock* const clock_;
// The last time 'Process' resulted in statistic update. // Used to regularly call UpdateAndReport().
int64_t last_process_time_ RTC_GUARDED_BY(construction_thread_checker_); RepeatingTaskHandle repeating_task_
RTC_GUARDED_BY(construction_thread_checker_);
// The last RTT in the statistics update (zero if there is no valid estimate). // The last RTT in the statistics update (zero if there is no valid estimate).
int64_t max_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_); int64_t max_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_);

View File

@ -96,12 +96,13 @@ TEST_F(CallStats2Test, ProcessTime) {
.Times(2) .Times(2)
.WillOnce(InvokeWithoutArgs([this] { .WillOnce(InvokeWithoutArgs([this] {
// Advance clock and verify we get an update. // Advance clock and verify we get an update.
fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms());
})) }))
.WillRepeatedly(InvokeWithoutArgs([this] { .WillRepeatedly(InvokeWithoutArgs([this] {
AsyncSimulateRttUpdate(kRtt2); AsyncSimulateRttUpdate(kRtt2);
// Advance clock just too little to get an update. // Advance clock just too little to get an update.
fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs - 1); fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms() -
1);
})); }));
// In case you're reading this and wondering how this number is arrived at, // In case you're reading this and wondering how this number is arrived at,
@ -256,7 +257,7 @@ TEST_F(CallStats2Test, LastProcessedRtt) {
.Times(AnyNumber()) .Times(AnyNumber())
.WillOnce(InvokeWithoutArgs([this] { .WillOnce(InvokeWithoutArgs([this] {
EXPECT_EQ(kAvgRtt1, call_stats_.LastProcessedRtt()); EXPECT_EQ(kAvgRtt1, call_stats_.LastProcessedRtt());
fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms());
AsyncSimulateRttUpdate(kRttLow); AsyncSimulateRttUpdate(kRttLow);
AsyncSimulateRttUpdate(kRttHigh); AsyncSimulateRttUpdate(kRttHigh);
})) }))
@ -272,7 +273,7 @@ TEST_F(CallStats2Test, LastProcessedRtt) {
// Set a first values and verify that LastProcessedRtt initially returns the // Set a first values and verify that LastProcessedRtt initially returns the
// average rtt. // average rtt.
fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs); fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms());
AsyncSimulateRttUpdate(kRttLow); AsyncSimulateRttUpdate(kRttLow);
loop_.Run(); loop_.Run();
EXPECT_EQ(kAvgRtt2, call_stats_.LastProcessedRtt()); EXPECT_EQ(kAvgRtt2, call_stats_.LastProcessedRtt());
@ -292,7 +293,7 @@ TEST_F(CallStats2Test, ProducesHistogramMetrics) {
AsyncSimulateRttUpdate(kRtt); AsyncSimulateRttUpdate(kRtt);
loop_.Run(); loop_.Run();
fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds * fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds *
CallStats::kUpdateIntervalMs); CallStats::kUpdateInterval.ms());
AsyncSimulateRttUpdate(kRtt); AsyncSimulateRttUpdate(kRtt);
loop_.Run(); loop_.Run();

View File

@ -23,7 +23,7 @@ namespace internal {
namespace { namespace {
// Time interval for logging stats. // Time interval for logging stats.
constexpr int64_t kStatsLogIntervalMs = 10000; constexpr int64_t kStatsLogIntervalMs = 10000;
constexpr uint32_t kSyncIntervalMs = 1000; constexpr TimeDelta kSyncInterval = TimeDelta::Millis(1000);
bool UpdateMeasurements(StreamSynchronization::Measurements* stream, bool UpdateMeasurements(StreamSynchronization::Measurements* stream,
const Syncable::Info& info) { const Syncable::Info& info) {
@ -34,19 +34,20 @@ bool UpdateMeasurements(StreamSynchronization::Measurements* stream,
info.capture_time_ntp_secs, info.capture_time_ntp_frac, info.capture_time_ntp_secs, info.capture_time_ntp_frac,
info.capture_time_source_clock, &new_rtcp_sr); info.capture_time_source_clock, &new_rtcp_sr);
} }
} // namespace } // namespace
RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue, RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue,
Syncable* syncable_video) Syncable* syncable_video)
: task_queue_(main_queue), : task_queue_(main_queue),
syncable_video_(syncable_video), syncable_video_(syncable_video),
last_sync_time_(rtc::TimeNanos()),
last_stats_log_ms_(rtc::TimeMillis()) { last_stats_log_ms_(rtc::TimeMillis()) {
RTC_DCHECK(syncable_video); RTC_DCHECK(syncable_video);
} }
RtpStreamsSynchronizer::~RtpStreamsSynchronizer() { RtpStreamsSynchronizer::~RtpStreamsSynchronizer() {
RTC_DCHECK_RUN_ON(&main_checker_); RTC_DCHECK_RUN_ON(&main_checker_);
repeating_task_.Stop();
} }
void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) { void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) {
@ -58,52 +59,32 @@ void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) {
syncable_audio_ = syncable_audio; syncable_audio_ = syncable_audio;
sync_.reset(nullptr); sync_.reset(nullptr);
if (!syncable_audio_) if (!syncable_audio_) {
repeating_task_.Stop();
return; return;
}
sync_.reset( sync_.reset(
new StreamSynchronization(syncable_video_->id(), syncable_audio_->id())); new StreamSynchronization(syncable_video_->id(), syncable_audio_->id()));
QueueTimer();
}
void RtpStreamsSynchronizer::QueueTimer() { if (repeating_task_.Running())
RTC_DCHECK_RUN_ON(&main_checker_);
if (timer_running_)
return; return;
timer_running_ = true; repeating_task_ =
uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) / RepeatingTaskHandle::DelayedStart(task_queue_, kSyncInterval, [this]() {
rtc::kNumNanosecsPerMillisec; UpdateDelay();
if (delay > kSyncIntervalMs) { return kSyncInterval;
// TODO(tommi): |linux_chromium_tsan_rel_ng| bot has shown a failure when });
// running WebRtcBrowserTest.CallAndModifyStream, indicating that the
// underlying clock is not reliable. Possibly there's a fake clock being
// used as the tests are flaky. Look into and fix.
RTC_LOG(LS_ERROR) << "Unexpected timer value: " << delay;
delay = kSyncIntervalMs;
}
RTC_DCHECK_LE(delay, kSyncIntervalMs);
task_queue_->PostDelayedTask(ToQueuedTask(task_safety_,
[this] {
RTC_DCHECK_RUN_ON(&main_checker_);
timer_running_ = false;
UpdateDelay();
}),
delay);
} }
void RtpStreamsSynchronizer::UpdateDelay() { void RtpStreamsSynchronizer::UpdateDelay() {
RTC_DCHECK_RUN_ON(&main_checker_); RTC_DCHECK_RUN_ON(&main_checker_);
last_sync_time_ = rtc::TimeNanos();
if (!syncable_audio_) if (!syncable_audio_)
return; return;
RTC_DCHECK(sync_.get()); RTC_DCHECK(sync_.get());
QueueTimer();
bool log_stats = false; bool log_stats = false;
const int64_t now_ms = rtc::TimeMillis(); const int64_t now_ms = rtc::TimeMillis();
if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) { if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) {

View File

@ -15,7 +15,7 @@
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/repeating_task.h"
#include "video/stream_synchronization.h" #include "video/stream_synchronization.h"
namespace webrtc { namespace webrtc {
@ -45,7 +45,6 @@ class RtpStreamsSynchronizer {
double* estimated_freq_khz) const; double* estimated_freq_khz) const;
private: private:
void QueueTimer();
void UpdateDelay(); void UpdateDelay();
TaskQueueBase* const task_queue_; TaskQueueBase* const task_queue_;
@ -65,12 +64,8 @@ class RtpStreamsSynchronizer {
RTC_GUARDED_BY(main_checker_); RTC_GUARDED_BY(main_checker_);
StreamSynchronization::Measurements video_measurement_ StreamSynchronization::Measurements video_measurement_
RTC_GUARDED_BY(main_checker_); RTC_GUARDED_BY(main_checker_);
int64_t last_sync_time_ RTC_GUARDED_BY(&main_checker_); RepeatingTaskHandle repeating_task_ RTC_GUARDED_BY(main_checker_);
int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_); int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_);
bool timer_running_ RTC_GUARDED_BY(main_checker_) = false;
// Used to signal destruction to potentially pending tasks.
ScopedTaskSafety task_safety_;
}; };
} // namespace internal } // namespace internal

View File

@ -38,12 +38,12 @@
#include "rtc_base/rate_statistics.h" #include "rtc_base/rate_statistics.h"
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
#include "rtc_base/thread_checker.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
#include "video/adaptation/video_stream_encoder_resource_manager.h" #include "video/adaptation/video_stream_encoder_resource_manager.h"
#include "video/encoder_bitrate_adjuster.h" #include "video/encoder_bitrate_adjuster.h"
#include "video/frame_encode_metadata_writer.h" #include "video/frame_encode_metadata_writer.h"
#include "video/video_source_sink_controller.h" #include "video/video_source_sink_controller.h"
namespace webrtc { namespace webrtc {
// VideoStreamEncoder represent a video encoder that accepts raw video frames as // VideoStreamEncoder represent a video encoder that accepts raw video frames as