From 48b48e5cc9ee293af50c584535ae2a4ebc32b858 Mon Sep 17 00:00:00 2001 From: Tommi Date: Fri, 9 Aug 2019 11:42:32 +0200 Subject: [PATCH] Enable thread check in Call::GetStats(). I've updated all the tests that previously were calling this method on the wrong thread, so we can enable this check now. I've also landed some changes that simplify the threading model in this class and subsequently I've removed some locks and can remove some more in this CL. Added some comments about future improvements for GetStats() to reduce synchronization. Simplified CallStats::OnRttUpdate() to have one fewer async methods. Bug: webrtc:10847 Change-Id: I48e6809172142cc4be4385b7d4aa2affb52a963a Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148588 Reviewed-by: Karl Wiberg Commit-Queue: Tommi Cr-Commit-Position: refs/heads/master@{#28821} --- call/call.cc | 83 +++++++++++++++++++++++---------------------- video/call_stats.cc | 24 ++++++++----- 2 files changed, 59 insertions(+), 48 deletions(-) diff --git a/call/call.cc b/call/call.cc index f676b5f51a..0ee80403b8 100644 --- a/call/call.cc +++ b/call/call.cc @@ -255,6 +255,10 @@ class Call final : public webrtc::Call, void RegisterRateObserver(); + rtc::TaskQueue* network_queue() const { + return transport_send_ptr_->GetWorkerQueue(); + } + Clock* const clock_; TaskQueueFactory* const task_queue_factory_; @@ -268,11 +272,7 @@ class Call final : public webrtc::Call, NetworkState audio_network_state_; NetworkState video_network_state_; - // TODO(tommi): Once tests have been fixed to not call GetStats() on the wrong - // thread, remove this lock and protect aggregate_network_up_crit_ with the - // configuration_sequence_checker_. - rtc::CriticalSection aggregate_network_up_crit_; - bool aggregate_network_up_ RTC_GUARDED_BY(aggregate_network_up_crit_); + bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_); std::unique_ptr receive_crit_; // Audio, Video, and FlexFEC receive streams are owned by the client that @@ -466,6 +466,13 @@ Call::Call(Clock* clock, transport_send_(std::move(transport_send)) { RTC_DCHECK(config.event_log != nullptr); worker_sequence_checker_.Detach(); + + call_stats_->RegisterStatsObserver(&receive_side_cc_); + + module_process_thread_->RegisterModule( + receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE); + module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE); + module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE); } Call::~Call() { @@ -477,30 +484,27 @@ Call::~Call() { RTC_CHECK(audio_receive_streams_.empty()); RTC_CHECK(video_receive_streams_.empty()); + module_process_thread_->Stop(); module_process_thread_->DeRegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true)); module_process_thread_->DeRegisterModule(&receive_side_cc_); module_process_thread_->DeRegisterModule(call_stats_.get()); - module_process_thread_->Stop(); call_stats_->DeregisterStatsObserver(&receive_side_cc_); absl::optional first_sent_packet_ms = transport_send_->GetFirstPacketTime(); + // Only update histograms after process threads have been shut down, so that // they won't try to concurrently update stats. if (first_sent_packet_ms) { rtc::CritScope lock(&bitrate_crit_); UpdateSendHistograms(*first_sent_packet_ms); } + UpdateReceiveHistograms(); UpdateHistograms(); } -// TODO(tommi): Most of this work could be done when Call gets created. -// Starting the process thread itself could be done on demand when streams -// are created and in that case, calling Start() multiple times is harmless -// so holding an extra state variable, |is_target_rate_observer_registered_| -// also shouldn't be necessary. void Call::RegisterRateObserver() { RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); @@ -509,14 +513,10 @@ void Call::RegisterRateObserver() { is_target_rate_observer_registered_ = true; + // This call seems to kick off a number of things, so probably better left + // off being kicked off on request rather than in the ctor. transport_send_ptr_->RegisterTargetTransferRateObserver(this); - call_stats_->RegisterStatsObserver(&receive_side_cc_); - - module_process_thread_->RegisterModule( - receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE); - module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE); - module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE); module_process_thread_->Start(); } @@ -531,6 +531,7 @@ void Call::UpdateHistograms() { (clock_->TimeInMilliseconds() - start_ms_) / 1000); } +// Called from the dtor. void Call::UpdateSendHistograms(Timestamp first_sent_packet) { int64_t elapsed_sec = (clock_->TimeInMilliseconds() - first_sent_packet.ms()) / 1000; @@ -950,35 +951,41 @@ RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { } Call::Stats Call::GetStats() const { - // TODO(solenberg): Some test cases in EndToEndTest use this from a different - // thread. Re-enable once that is fixed. - // RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + + // TODO(tommi): The following stats are managed on the process thread: + // - pacer_delay_ms (PacedSender::Process) + // - rtt_ms + // - recv_bandwidth_bps + // These are delivered on the network TQ: + // - send_bandwidth_bps (see OnTargetTransferRate) + // - max_padding_bitrate_bps (see OnAllocationLimitsChanged) + Stats stats; + // TODO(srte): It is unclear if we only want to report queues if network is + // available. + stats.pacer_delay_ms = + aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0; + + stats.rtt_ms = call_stats_->LastProcessedRtt(); + // Fetch available send/receive bitrates. std::vector ssrcs; uint32_t recv_bandwidth = 0; receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate( &ssrcs, &recv_bandwidth); + stats.recv_bandwidth_bps = recv_bandwidth; { rtc::CritScope cs(&last_bandwidth_bps_crit_); stats.send_bandwidth_bps = last_bandwidth_bps_; } - stats.recv_bandwidth_bps = recv_bandwidth; - // TODO(srte): It is unclear if we only want to report queues if network is - // available. - { - rtc::CritScope cs(&aggregate_network_up_crit_); - stats.pacer_delay_ms = aggregate_network_up_ - ? transport_send_ptr_->GetPacerQueuingDelayMs() - : 0; - } - stats.rtt_ms = call_stats_->LastProcessedRtt(); { rtc::CritScope cs(&bitrate_crit_); stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; } + return stats; } @@ -1048,10 +1055,8 @@ void Call::UpdateAggregateNetworkState() { RTC_LOG(LS_INFO) << "UpdateAggregateNetworkState: aggregate_state=" << (aggregate_network_up ? "up" : "down"); - { - rtc::CritScope cs(&aggregate_network_up_crit_); - aggregate_network_up_ = aggregate_network_up; - } + aggregate_network_up_ = aggregate_network_up; + transport_send_ptr_->OnNetworkAvailability(aggregate_network_up); } @@ -1062,16 +1067,12 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { } void Call::OnStartRateUpdate(DataRate start_rate) { - if (!transport_send_ptr_->GetWorkerQueue()->IsCurrent()) { - transport_send_ptr_->GetWorkerQueue()->PostTask( - [this, start_rate] { this->OnStartRateUpdate(start_rate); }); - return; - } + RTC_DCHECK(network_queue()->IsCurrent()); bitrate_allocator_->UpdateStartRate(start_rate.bps()); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - RTC_DCHECK(transport_send_ptr_->GetWorkerQueue()->IsCurrent()); + RTC_DCHECK(network_queue()->IsCurrent()); RTC_DCHECK_RUN_ON(&worker_sequence_checker_); uint32_t target_bitrate_bps = msg.target_rate.bps(); @@ -1122,7 +1123,9 @@ void Call::OnTargetTransferRate(TargetTransferRate msg) { void Call::OnAllocationLimitsChanged(uint32_t min_send_bitrate_bps, uint32_t max_padding_bitrate_bps, uint32_t total_bitrate_bps) { + RTC_DCHECK(network_queue()->IsCurrent()); RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + transport_send_ptr_->SetAllocatedSendBitrateLimits( min_send_bitrate_bps, max_padding_bitrate_bps, total_bitrate_bps); diff --git a/video/call_stats.cc b/video/call_stats.cc index ce93d04e6d..27e00ee7ca 100644 --- a/video/call_stats.cc +++ b/video/call_stats.cc @@ -122,6 +122,8 @@ void CallStats::Process() { int64_t now = clock_->TimeInMilliseconds(); last_process_time_ = now; + // |avg_rtt_ms_| is allowed to be read on the process thread since that's the + // only thread that modifies the value. int64_t avg_rtt_ms = avg_rtt_ms_; RemoveOldReports(now, &reports_); max_rtt_ms_ = GetMaxRttMs(reports_); @@ -171,20 +173,26 @@ void CallStats::DeregisterStatsObserver(CallStatsObserver* observer) { } int64_t CallStats::LastProcessedRtt() const { + // TODO(tommi): This currently gets called from the construction thread of + // Call as well as from the process thread. Look into restricting this to + // allow only reading this from the process thread (or TQ once we get there) + // so that the lock isn't necessary. + rtc::CritScope cs(&avg_rtt_ms_lock_); return avg_rtt_ms_; } void CallStats::OnRttUpdate(int64_t rtt) { - int64_t now_ms = clock_->TimeInMilliseconds(); - process_thread_->PostTask(ToQueuedTask([rtt, now_ms, this]() { - RTC_DCHECK_RUN_ON(&process_thread_checker_); - reports_.push_back(RttTime(rtt, now_ms)); - if (time_of_first_rtt_ms_ == -1) - time_of_first_rtt_ms_ = now_ms; + RTC_DCHECK_RUN_ON(&process_thread_checker_); - process_thread_->WakeUp(this); - })); + int64_t now_ms = clock_->TimeInMilliseconds(); + reports_.push_back(RttTime(rtt, now_ms)); + if (time_of_first_rtt_ms_ == -1) + time_of_first_rtt_ms_ = now_ms; + + // Make sure Process() will be called and deliver the updates asynchronously. + last_process_time_ -= kUpdateIntervalMs; + process_thread_->WakeUp(this); } void CallStats::UpdateHistograms() {